Spaces:
Sleeping
Sleeping
import math | |
import torch | |
import torch.nn as nn | |
import copy | |
from torch.optim import Adam, SGD, AdamW | |
from torch.optim.lr_scheduler import LambdaLR | |
import logging | |
from typing import List, Dict, Any, Tuple, Union, Optional | |
from collections import namedtuple | |
from easydict import EasyDict | |
from ding.policy import Policy | |
from ding.model import model_wrap | |
from ding.torch_utils import to_device, to_list | |
from ding.utils import EasyTimer | |
from ding.utils.data import default_collate, default_decollate | |
from ding.rl_utils import get_nstep_return_data, get_train_sample | |
from ding.utils import POLICY_REGISTRY | |
from ding.torch_utils.loss.cross_entropy_loss import LabelSmoothCELoss | |
class BehaviourCloningPolicy(Policy): | |
""" | |
Overview: | |
Behaviour Cloning (BC) policy class, which supports both discrete and continuous action space. \ | |
The policy is trained by supervised learning, and the data is a offline dataset collected by expert. | |
""" | |
config = dict( | |
type='bc', | |
cuda=False, | |
on_policy=False, | |
continuous=False, | |
action_shape=19, | |
learn=dict( | |
update_per_collect=1, | |
batch_size=32, | |
learning_rate=1e-5, | |
lr_decay=False, | |
decay_epoch=30, | |
decay_rate=0.1, | |
warmup_lr=1e-4, | |
warmup_epoch=3, | |
optimizer='SGD', | |
momentum=0.9, | |
weight_decay=1e-4, | |
ce_label_smooth=False, | |
show_accuracy=False, | |
tanh_mask=False, # if actions always converge to 1 or -1, use this. | |
), | |
collect=dict( | |
unroll_len=1, | |
noise=False, | |
noise_sigma=0.2, | |
noise_range=dict( | |
min=-0.5, | |
max=0.5, | |
), | |
), | |
eval=dict(), # for compatibility | |
) | |
def default_model(self) -> Tuple[str, List[str]]: | |
""" | |
Overview: | |
Return this algorithm default neural network model setting for demonstration. ``__init__`` method will \ | |
automatically call this method to get the default model setting and create model. | |
Returns: | |
- model_info (:obj:`Tuple[str, List[str]]`): The registered model name and model's import_names. | |
.. note:: | |
The user can define and use customized network model but must obey the same inferface definition indicated \ | |
by import_names path. For example about discrete BC, its registered name is ``discrete_bc`` and the \ | |
import_names is ``ding.model.template.bc``. | |
""" | |
if self._cfg.continuous: | |
return 'continuous_bc', ['ding.model.template.bc'] | |
else: | |
return 'discrete_bc', ['ding.model.template.bc'] | |
def _init_learn(self) -> None: | |
""" | |
Overview: | |
Initialize the learn mode of policy, including related attributes and modules. For BC, it mainly contains \ | |
optimizer, algorithm-specific arguments such as lr_scheduler, loss, etc. \ | |
This method will be called in ``__init__`` method if ``learn`` field is in ``enable_field``. | |
.. note:: | |
For the member variables that need to be saved and loaded, please refer to the ``_state_dict_learn`` \ | |
and ``_load_state_dict_learn`` methods. | |
.. note:: | |
For the member variables that need to be monitored, please refer to the ``_monitor_vars_learn`` method. | |
.. note:: | |
If you want to set some spacial member variables in ``_init_learn`` method, you'd better name them \ | |
with prefix ``_learn_`` to avoid conflict with other modes, such as ``self._learn_attr1``. | |
""" | |
assert self._cfg.learn.optimizer in ['SGD', 'Adam'], self._cfg.learn.optimizer | |
if self._cfg.learn.optimizer == 'SGD': | |
self._optimizer = SGD( | |
self._model.parameters(), | |
lr=self._cfg.learn.learning_rate, | |
weight_decay=self._cfg.learn.weight_decay, | |
momentum=self._cfg.learn.momentum | |
) | |
elif self._cfg.learn.optimizer == 'Adam': | |
if self._cfg.learn.weight_decay is None: | |
self._optimizer = Adam( | |
self._model.parameters(), | |
lr=self._cfg.learn.learning_rate, | |
) | |
else: | |
self._optimizer = AdamW( | |
self._model.parameters(), | |
lr=self._cfg.learn.learning_rate, | |
weight_decay=self._cfg.learn.weight_decay | |
) | |
if self._cfg.learn.lr_decay: | |
def lr_scheduler_fn(epoch): | |
if epoch <= self._cfg.learn.warmup_epoch: | |
return self._cfg.learn.warmup_lr / self._cfg.learn.learning_rate | |
else: | |
ratio = (epoch - self._cfg.learn.warmup_epoch) // self._cfg.learn.decay_epoch | |
return math.pow(self._cfg.learn.decay_rate, ratio) | |
self._lr_scheduler = LambdaLR(self._optimizer, lr_scheduler_fn) | |
self._timer = EasyTimer(cuda=True) | |
self._learn_model = model_wrap(self._model, 'base') | |
self._learn_model.reset() | |
if self._cfg.continuous: | |
if self._cfg.loss_type == 'l1_loss': | |
self._loss = nn.L1Loss() | |
elif self._cfg.loss_type == 'mse_loss': | |
self._loss = nn.MSELoss() | |
else: | |
raise KeyError("not support loss type: {}".format(self._cfg.loss_type)) | |
else: | |
if not self._cfg.learn.ce_label_smooth: | |
self._loss = nn.CrossEntropyLoss() | |
else: | |
self._loss = LabelSmoothCELoss(0.1) | |
def _forward_learn(self, data: List[Dict[str, Any]]) -> Dict[str, Any]: | |
""" | |
Overview: | |
Policy forward function of learn mode (training policy and updating parameters). Forward means \ | |
that the policy inputs some training batch data from the replay buffer and then returns the output \ | |
result, including various training information such as loss and time. | |
Arguments: | |
- data (:obj:`List[Dict[int, Any]]`): The input data used for policy forward, including a batch of \ | |
training samples. For each element in list, the key of the dict is the name of data items and the \ | |
value is the corresponding data. Usually, the value is torch.Tensor or np.ndarray or there dict/list \ | |
combinations. In the ``_forward_learn`` method, data often need to first be stacked in the batch \ | |
dimension by some utility functions such as ``default_preprocess_learn``. \ | |
For BC, each element in list is a dict containing at least the following keys: ``obs``, ``action``. | |
Returns: | |
- info_dict (:obj:`Dict[str, Any]`): The information dict that indicated training result, which will be \ | |
recorded in text log and tensorboard, values must be python scalar or a list of scalars. For the \ | |
detailed definition of the dict, refer to the code of ``_monitor_vars_learn`` method. | |
.. note:: | |
The input value can be torch.Tensor or dict/list combinations and current policy supports all of them. \ | |
For the data type that not supported, the main reason is that the corresponding model does not support it. \ | |
You can implement you own model rather than use the default model. For more information, please raise an \ | |
issue in GitHub repo and we will continue to follow up. | |
""" | |
if isinstance(data, list): | |
data = default_collate(data) | |
if self._cuda: | |
data = to_device(data, self._device) | |
self._learn_model.train() | |
with self._timer: | |
obs, action = data['obs'], data['action'].squeeze() | |
if self._cfg.continuous: | |
if self._cfg.learn.tanh_mask: | |
"""tanh_mask | |
We mask the action out of range of [tanh(-1),tanh(1)], model will learn information | |
and produce action in [-1,1]. So the action won't always converge to -1 or 1. | |
""" | |
mu = self._eval_model.forward(data['obs'])['action'] | |
bound = 1 - 2 / (math.exp(2) + 1) # tanh(1): (e-e**(-1))/(e+e**(-1)) | |
mask = mu.ge(-bound) & mu.le(bound) | |
mask_percent = 1 - mask.sum().item() / mu.numel() | |
if mask_percent > 0.8: # if there is too little data to learn(<80%). So we use all data. | |
loss = self._loss(mu, action.detach()) | |
else: | |
loss = self._loss(mu.masked_select(mask), action.masked_select(mask).detach()) | |
else: | |
mu = self._learn_model.forward(data['obs'])['action'] | |
# When we use bco, action is predicted by idm, gradient is not expected. | |
loss = self._loss(mu, action.detach()) | |
else: | |
a_logit = self._learn_model.forward(obs) | |
# When we use bco, action is predicted by idm, gradient is not expected. | |
loss = self._loss(a_logit['logit'], action.detach()) | |
if self._cfg.learn.show_accuracy: | |
# Calculate the overall accuracy and the accuracy of each class | |
total_accuracy = (a_logit['action'] == action.view(-1)).float().mean() | |
self.total_accuracy_in_dataset.append(total_accuracy) | |
logging.info(f'the total accuracy in current train mini-batch is: {total_accuracy.item()}') | |
for action_unique in to_list(torch.unique(action)): | |
action_index = (action == action_unique).nonzero(as_tuple=True)[0] | |
action_accuracy = (a_logit['action'][action_index] == action.view(-1)[action_index] | |
).float().mean() | |
if math.isnan(action_accuracy): | |
action_accuracy = 0.0 | |
self.action_accuracy_in_dataset[action_unique].append(action_accuracy) | |
logging.info( | |
f'the accuracy of action {action_unique} in current train mini-batch is: ' | |
f'{action_accuracy.item()}, ' | |
f'(nan means the action does not appear in the mini-batch)' | |
) | |
forward_time = self._timer.value | |
with self._timer: | |
self._optimizer.zero_grad() | |
loss.backward() | |
backward_time = self._timer.value | |
with self._timer: | |
if self._cfg.multi_gpu: | |
self.sync_gradients(self._learn_model) | |
sync_time = self._timer.value | |
self._optimizer.step() | |
cur_lr = [param_group['lr'] for param_group in self._optimizer.param_groups] | |
cur_lr = sum(cur_lr) / len(cur_lr) | |
return { | |
'cur_lr': cur_lr, | |
'total_loss': loss.item(), | |
'forward_time': forward_time, | |
'backward_time': backward_time, | |
'sync_time': sync_time, | |
} | |
def _monitor_vars_learn(self) -> List[str]: | |
""" | |
Overview: | |
Return the necessary keys for logging the return dict of ``self._forward_learn``. The logger module, such \ | |
as text logger, tensorboard logger, will use these keys to save the corresponding data. | |
Returns: | |
- necessary_keys (:obj:`List[str]`): The list of the necessary keys to be logged. | |
""" | |
return ['cur_lr', 'total_loss', 'forward_time', 'backward_time', 'sync_time'] | |
def _init_eval(self): | |
""" | |
Overview: | |
Initialize the eval mode of policy, including related attributes and modules. For BC, it contains the \ | |
eval model to greedily select action with argmax q_value mechanism for discrete action space. | |
This method will be called in ``__init__`` method if ``eval`` field is in ``enable_field``. | |
.. note:: | |
If you want to set some spacial member variables in ``_init_eval`` method, you'd better name them \ | |
with prefix ``_eval_`` to avoid conflict with other modes, such as ``self._eval_attr1``. | |
""" | |
if self._cfg.continuous: | |
self._eval_model = model_wrap(self._model, wrapper_name='base') | |
else: | |
self._eval_model = model_wrap(self._model, wrapper_name='argmax_sample') | |
self._eval_model.reset() | |
def _forward_eval(self, data: Dict[int, Any]) -> Dict[int, Any]: | |
""" | |
Overview: | |
Policy forward function of eval mode (evaluation policy performance by interacting with envs). Forward \ | |
means that the policy gets some necessary data (mainly observation) from the envs and then returns the \ | |
action to interact with the envs. | |
Arguments: | |
- data (:obj:`Dict[int, Any]`): The input data used for policy forward, including at least the obs. The \ | |
key of the dict is environment id and the value is the corresponding data of the env. | |
Returns: | |
- output (:obj:`Dict[int, Any]`): The output data of policy forward, including at least the action. The \ | |
key of the dict is the same as the input data, i.e. environment id. | |
.. note:: | |
The input value can be torch.Tensor or dict/list combinations and current policy supports all of them. \ | |
For the data type that not supported, the main reason is that the corresponding model does not support it. \ | |
You can implement you own model rather than use the default model. For more information, please raise an \ | |
issue in GitHub repo and we will continue to follow up. | |
""" | |
tensor_input = isinstance(data, torch.Tensor) | |
if tensor_input: | |
data = default_collate(list(data)) | |
else: | |
data_id = list(data.keys()) | |
data = default_collate(list(data.values())) | |
if self._cuda: | |
data = to_device(data, self._device) | |
self._eval_model.eval() | |
with torch.no_grad(): | |
output = self._eval_model.forward(data) | |
if self._cuda: | |
output = to_device(output, 'cpu') | |
if tensor_input: | |
return output | |
else: | |
output = default_decollate(output) | |
return {i: d for i, d in zip(data_id, output)} | |
def _init_collect(self) -> None: | |
""" | |
Overview: | |
BC policy uses offline dataset so it does not need to collect data. However, sometimes we need to use the \ | |
trained BC policy to collect data for other purposes. | |
""" | |
self._unroll_len = self._cfg.collect.unroll_len | |
if self._cfg.continuous: | |
self._collect_model = model_wrap( | |
self._model, | |
wrapper_name='action_noise', | |
noise_type='gauss', | |
noise_kwargs={ | |
'mu': 0.0, | |
'sigma': self._cfg.collect.noise_sigma.start | |
}, | |
noise_range=self._cfg.collect.noise_range | |
) | |
else: | |
self._collect_model = model_wrap(self._model, wrapper_name='eps_greedy_sample') | |
self._collect_model.reset() | |
def _forward_collect(self, data: Dict[int, Any], **kwargs) -> Dict[int, Any]: | |
data_id = list(data.keys()) | |
data = default_collate(list(data.values())) | |
if self._cuda: | |
data = to_device(data, self._device) | |
self._collect_model.eval() | |
with torch.no_grad(): | |
if self._cfg.continuous: | |
# output = self._collect_model.forward(data) | |
output = self._collect_model.forward(data, **kwargs) | |
else: | |
output = self._collect_model.forward(data, **kwargs) | |
if self._cuda: | |
output = to_device(output, 'cpu') | |
output = default_decollate(output) | |
return {i: d for i, d in zip(data_id, output)} | |
def _process_transition(self, obs: Any, policy_output: dict, timestep: namedtuple) -> dict: | |
transition = { | |
'obs': obs, | |
'next_obs': timestep.obs, | |
'action': policy_output['action'], | |
'reward': timestep.reward, | |
'done': timestep.done, | |
} | |
return EasyDict(transition) | |
def _get_train_sample(self, data: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | |
data = get_nstep_return_data(data, 1, 1) | |
return get_train_sample(data, self._unroll_len) | |