Module robotic_manipulator_rloa.rl_framework
Expand source code
import json
import logging
import os
import re
from dataclasses import dataclass
from typing import List, Union, Optional
import matplotlib.pyplot as plt
import numpy as np
import pybullet as p
import pybullet_data
import torch
from robotic_manipulator_rloa.environment.environment import Environment, EnvironmentConfiguration
from robotic_manipulator_rloa.utils.exceptions import (
EnvironmentNotInitialized,
NAFAgentNotInitialized,
ConfigurationIncomplete,
InvalidHyperParameter,
InvalidNAFAgentParameter
)
from robotic_manipulator_rloa.utils.logger import get_global_logger, Logger
from robotic_manipulator_rloa.naf_components.naf_algorithm import NAFAgent
# Silence noisy third-party loggers so framework output stays readable:
# matplotlib reports only critical errors, PIL only warnings and above.
plt.set_loglevel('critical')
logging.getLogger('PIL').setLevel(logging.WARNING)
# Initialize the framework-wide logger and apply its handler/format setup.
logger = get_global_logger()
Logger.set_logger_setup()
@dataclass
class HyperParameters:
    """
    Data Class for the storage of the required hyper-parameters.

    The defaults below are the values the framework starts with; individual
    values can be changed via ManipulatorFramework.set_hyperparameter().
    """
    buffer_size: int = 100000  # replay buffer capacity; must be a positive int
    batch_size: int = 128  # mini-batch size sampled from the buffer; positive int
    gamma: float = 0.99  # discount factor; validated to lie in (0, 1)
    tau: float = 0.001  # soft-update interpolation factor; validated to lie in [0, 1]
    learning_rate: float = 0.001  # optimizer learning rate; must be > 0
    update_freq: int = 1  # timesteps between network updates; positive int
    num_updates: int = 1  # number of update steps per update event; positive int
class ManipulatorFramework:
    """
    Main class of the package. Orchestrates the Environment, the NAF Agent and
    the hyper-parameter configuration used for training and testing.
    """

    def __init__(self) -> None:
        """
        Main class of the package. Initializes the required hyper-parameters to their default value.
        """
        # Environment and agent are created later via initialize_environment()
        # / initialize_naf_agent(); None means "not configured yet".
        self.env: Optional[Environment] = None
        self.naf_agent: Optional[NAFAgent] = None
        self._hyperparameters: Optional[HyperParameters] = None
        self._initialize_hyperparameters()
        logger.info('The Framework has been initialized with the default hyperparameters configuration')
        logger.debug('* Custom hyperparameters can be set via the set_hyperparameter() method')
        logger.debug('* All the required hyperparameters can be printed via the get_required_hyperparameters() method')
        logger.debug('* Load a manipulator via the initialize_environment() method to start with the training '
                     'configuration')
def _initialize_hyperparameters(self) -> None:
    """
    Initializes the hyper-parameters with their default values.

    Relies on the defaults declared on the HyperParameters dataclass, so the
    default configuration is defined in exactly one place (previously the same
    seven values were duplicated here as explicit keyword arguments).
    """
    self._hyperparameters = HyperParameters()
@staticmethod
def set_log_level(log_level: int) -> None:
    """
    Set Log Level for the Logger.
    Args:
        log_level: Log level to be set.\n
            Valid values: 10 (DEBUG), 20 (INFO), 30 (WARNING), 40 (ERROR), 50 (CRITICAL)
    """
    level_name_mappings = {10: 'DEBUG', 20: 'INFO', 30: 'WARNING', 40: 'ERROR', 50: 'CRITICAL'}
    # Set Log level
    if log_level in level_name_mappings:
        # BUGFIX: apply the new level BEFORE logging the confirmation. The
        # previous code formatted the message from logger.level first, so it
        # reported the OLD level, and raised KeyError when the current level
        # was not one of the five mapped values (e.g. NOTSET).
        logger.setLevel(log_level)
        logger.info(f'Log Level has been set to {log_level} ({level_name_mappings[log_level]})')
    else:
        logger.error(
            f'The Log level provided is invalid, so the previous Log Level is maintained ({logger.level}))')
        logger.error('Valid values: 10 (DEBUG), 20 (INFO), 30 (WARNING), 40 (ERROR), 50 (CRITICAL)')
@staticmethod
def get_required_hyperparameters() -> None:
    """
    Logs (at DEBUG level) the set of configurable hyper-parameters together
    with a reference link for each one. Purely informative: no state is read
    or modified. When the logger level is above DEBUG, an error message is
    logged instead and the method returns without printing the list.
    """
    if logger.level > 10:
        logger.error('get_required_hyperparameters() only shows information for DEBUG log level. '
                     'Try running this method after setting the log level to DEBUG by calling '
                     'set_log_level(10) class method')
        return
    reference_links = [
        ('Buffer Size', 'https://www.tensorflow.org/agents/tutorials/5_replay_buffers_tutorial?hl=es-419'),
        ('Batch Size', 'https://www.kaggle.com/general/276990'),
        ('Gamma (discount factor)', 'https://arxiv.org/pdf/2007.02040.pdf'),
        ('Tau', 'https://arxiv.org/abs/1603.00748'),
        ('Learning Rate', 'https://machinelearningmastery.com/understand-the-dynamics-of-learning-rate-on-deep'
                          '-learning-neural-networks/'),
        ('Update Frequency', 'https://medium.com/towards-data-science/applied-reinforcement-learning-v-normalized'
                             '-advantage-function-naf-for-continuous-control-62ad143d3095'),
        ('Number of Updates', 'https://medium.com/towards-data-science/applied-reinforcement-learning-v-normalized'
                              '-advantage-function-naf-for-continuous-control-62ad143d3095'),
    ]
    logger.debug('Required Hyperparameters:')
    row_template = '{:<25} (see {:<10})'
    for name, link in reference_links:
        logger.debug(row_template.format(name, link))
@staticmethod
def plot_training_rewards(episode: int, mean_range: int = 50) -> None:
    """
    Plots the mean reward of each batch of {mean_range} episodes for the "scores.txt" file stored
    in the checkpoints/ folder, corresponding to the episode received as parameter.
    Args:
        episode: Episode number from which to plot the results. The episode provided must be one of the
            checkpoints generated in the /checkpoints directory.
        mean_range: Range of episodes on which the mean is calculated. If the execution lasted for
            200 episodes, and the mean_range is set to 50, 4 metric points will be generated.
            A trailing partial batch (fewer than mean_range episodes) is discarded.
    Raises:
        FileNotFoundError: Raises if the episode provided is not present as a directory in the /checkpoints
            directory generated after executing a training.
    """
    try:
        with open(f'checkpoints/{episode}/scores.txt', 'r') as f:
            scores = json.loads(f.read())
    except FileNotFoundError as err:
        logger.error(f'File "scores.txt" located in checkpoints/{episode}/ folder was not found')
        raise err
    counter, cumulative_reward, values_to_plot = 0, [], []
    # BUGFIX(hygiene): the loop variable used to be named `episode`, shadowing
    # the method parameter; use a dedicated name instead.
    for _episode_key, result in scores.items():
        # result[0] is the episode's score entry as serialized in scores.txt.
        cumulative_reward.append(result[0])
        counter += 1
        if counter % mean_range == 0:
            values_to_plot.append(sum(cumulative_reward) / len(cumulative_reward))
            cumulative_reward = []
    plt.plot(range(len(values_to_plot)), values_to_plot)
    plt.show()
def set_hyperparameter(self, hyperparameter: str, value: Union[float, int]) -> None:
    """
    Sets the specified value on the given hyper-parameter. Checks are performed to
    ensure that the new value for the hyper-parameter is a valid value.
    Args:
        hyperparameter: Name of the hyper-parameter to be updated.
            Allowed names are:\n
            - buffer_size/buffersize/BUFFER_SIZE/BUFFERSIZE\n
            - batch_size/batchsize/BATCH_SIZE/BATCHSIZE\n
            - gamma/GAMMA\n
            - tau/TAU\n
            - learning_rate/learningrate/LEARNING_RATE/LEARNINGRATE\n
            - update_freq/updatefreq/UPDATE_FREQ/UPDATEFREQ\n
            - num_update/numupdate/NUMUPDATE/NUM_UPDATE\n
        value: New value for the given hyper-parameter.
    Raises:
        InvalidHyperParameter: The hyperparameter received has an invalid value/type, or the hyperparameter
            name received is not one of the accepted values.
    """
    # Set BUFFER SIZE
    if re.match(r'^(buffer_size|buffersize|BUFFER_SIZE|BUFFERSIZE)$', hyperparameter):
        if not (isinstance(value, int) and value > 0):
            raise InvalidHyperParameter('Buffer Size is not an int or has a value lower than 0')
        self._hyperparameters.buffer_size = value
        logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
    # Set BATCH SIZE
    elif re.match(r'^(batch_size|batchsize|BATCH_SIZE|BATCHSIZE)$', hyperparameter):
        if not (isinstance(value, int) and value > 0):
            raise InvalidHyperParameter('Batch Size is not an int or has a value lower than 0')
        self._hyperparameters.batch_size = value
        logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
    # Set GAMMA
    elif re.match(r'^(gamma|GAMMA)$', hyperparameter):
        if not (isinstance(value, (int, float)) and 0 < value < 1):
            raise InvalidHyperParameter('Gamma is not a float or its value is out of range (0, 1)')
        self._hyperparameters.gamma = value
        logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
    # Set TAU
    elif re.match(r'^(tau|TAU)$', hyperparameter):
        if not (isinstance(value, (int, float)) and 0 <= value <= 1):
            raise InvalidHyperParameter('Tau is not a float or its value is out of range [0, 1]')
        self._hyperparameters.tau = value
        logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
    # Set LEARNING RATE
    elif re.match(r'^(learning_rate|learningrate|LEARNING_RATE|LEARNINGRATE)$', hyperparameter):
        if not (isinstance(value, (int, float)) and value > 0):
            raise InvalidHyperParameter('Learning Rate is not a float or has a value lower than 0')
        self._hyperparameters.learning_rate = value
        logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
    # Set UPDATE FREQUENCY
    elif re.match(r'^(update_freq|updatefreq|UPDATE_FREQ|UPDATEFREQ)$', hyperparameter):
        if not (isinstance(value, int) and value > 0):
            raise InvalidHyperParameter('Update Frequency is not an int or has a value lower than 0')
        self._hyperparameters.update_freq = value
        logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
    # Set NUMBER OF UPDATES
    elif re.match(r'^(num_update|numupdate|NUMUPDATE|NUM_UPDATE)$', hyperparameter):
        if not (isinstance(value, int) and value > 0):
            # BUGFIX: this branch previously raised with the message
            # 'Buffer Size ...', copied from the first branch.
            raise InvalidHyperParameter('Number of Updates is not an int or has a value lower than 0')
        self._hyperparameters.num_updates = value
        logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
    else:
        raise InvalidHyperParameter(
            'The hyperparameter name passed as parameter is not valid. Valid hyperparameters are: '
            '["buffer_size", "batch_size", "gamma", "tau", "learning_rate", "update_freq", "num_update"]')
def load_pretrained_parameters_from_weights_file(self, parameters_file_path: str) -> None:
    """
    Loads pretrained network weights from a .p file into the NAFAgent's neural
    networks. Both the Environment and the NAFAgent must already be initialized,
    since the weights live inside the networks owned by the NAFAgent instance.
    Args:
        parameters_file_path: Path to the .p file where the weights are stored.
    Raises:
        EnvironmentNotInitialized: The Environment class has not been initialized.
        NAFAgentNotInitialized: The NAFAgentNotInitialized class has not been initialized.
        MissingWeightsFile: The .p file path provided does not exist.
    """
    # Validate both required components in one place before delegating.
    for component, missing_error in ((self.env, EnvironmentNotInitialized),
                                     (self.naf_agent, NAFAgentNotInitialized)):
        if not component:
            raise missing_error
    self.naf_agent.initialize_pretrained_agent_from_weights_file(parameters_file_path)
def load_pretrained_parameters_from_episode(self, episode: int) -> None:
    """
    Loads pretrained weights generated on a previous training run into the
    NAFAgent's networks. The weights are looked up by episode number among the
    checkpoints stored in the checkpoints/ folder, so the given episode must
    exist there. Requires both the Environment and the NAFAgent to be
    initialized beforehand.
    Args:
        episode: Episode in the /checkpoints folder from which to retrieve the pretrained weights.
    Raises:
        EnvironmentNotInitialized: The Environment class has not been initialized.
        NAFAgentNotInitialized: The NAFAgentNotInitialized class has not been initialized.
        MissingWeightsFile: The weights.p file does not exist in the folder provided.
    """
    # Validate both required components in one place before delegating.
    for component, missing_error in ((self.env, EnvironmentNotInitialized),
                                     (self.naf_agent, NAFAgentNotInitialized)):
        if not component:
            raise missing_error
    self.naf_agent.initialize_pretrained_agent_from_episode(episode)
def get_environment_configuration(self) -> None:
    """
    Dumps the current Environment configuration to the log at INFO level.
    Logs an error and returns early when no Environment has been initialized.
    """
    if not self.env:
        logger.error("Environment is not initialized yet, can't show configuration")
        return
    env = self.env
    logger.info('Environment Configuration:')
    for detail in (f'* Manipulator File: {env.manipulator_file}',
                   f'* End Effector index: {env.endeffector_index}',
                   f'* List of fixed Joints: {env.fixed_joints}',
                   f'* List of Joints involved in training: {env.involved_joints}',
                   f'* Position of the Target: {env.target_pos}',
                   f'* Position of the Obstacle: {env.obstacle_pos}',
                   f'* Initial position of joints: {env.initial_joint_positions}',
                   f'* Initial variation range of joints: {env.initial_positions_variation_range}',
                   f'* Max Force to be applied on joints: {env.max_force}',
                   f'* Visualize mode: {env.visualize}',
                   f'* Instance of the Environment: {env}'):
        logger.info(detail)
def get_nafagent_configuration(self) -> None:
    """
    Dumps the current NAFAgent configuration to the log at INFO level.
    Logs an error and returns early when no NAFAgent has been initialized.
    """
    if not self.naf_agent:
        logger.error("NAFAgent is not initialized yet, can't show configuration")
        return
    agent = self.naf_agent
    logger.info('NAFAgent Configuration:')
    for detail in (f'* Environment Instance: {agent.environment}',
                   f'* State Size: {agent.state_size}',
                   f'* Action Size: {agent.action_size}',
                   f'* Size of layers of the Neural Network: {agent.layer_size}',
                   f'* Batch Size: {agent.batch_size}',
                   f'* Buffer Size: {agent.buffer_size}',
                   f'* Learning Rate: {agent.learning_rate}',
                   f'* Tau: {agent.tau}',
                   f'* Gamma: {agent.gamma}',
                   f'* Update Frequency: {agent.update_freq}',
                   f'* Number of Updates: {agent.num_updates}',
                   f'* Checkpoint frequency: {agent.checkpoint_frequency}',
                   f'* Device: {agent.device}'):
        logger.info(detail)
def test_trained_model(self, n_episodes: int, frames: int) -> None:
    """
    Tests a previously trained agent through the execution of {n_episodes} test episodes,
    for {frames} timesteps each. When the test concludes, the results of the test are logged on terminal.
    Args:
        n_episodes: Number of test episodes to execute.
        frames: Number of timesteps per test episode.
    Raises:
        ConfigurationIncomplete: Either the Environment class or the NAFAgent class are not initialized.
    """
    # Check if Environment and NAFAgent initialized
    if not self.naf_agent or not self.env:
        raise ConfigurationIncomplete
    # Initialize Test result's history.
    # `results` holds one (success, last_frame_index) tuple per episode;
    # `num_collisions` counts episodes that terminated without the success reward.
    results, num_collisions = list(), 0
    for _ in range(n_episodes):
        state = self.env.reset()
        for frame in range(frames):
            action = self.naf_agent.act(state)
            next_state, reward, done = self.env.step(action)
            state = next_state
            if done:
                # NOTE(review): a terminal reward of exactly 250 is treated as
                # "goal reached"; any other terminal reward is counted as a
                # collision — confirm against Environment.step()'s reward scheme.
                if reward == 250:
                    results.append((True, frame))
                    break
                else:
                    results.append((False, frame))
                    num_collisions += 1
                    break
            # Episode exhausted its frame budget without terminating: failure.
            if frame == frames - 1 and not done:
                results.append((False, frame))
                break
        logger.info('Test Episode number {ep} completed\n'.format(ep=_ + 1))
    logger.info('RESULTS OF THE TEST:')
    for i, result in enumerate(results):
        logger.info(f'Results of Iteration {i + 1}: COMPLETED: {result[0]}. FRAMES: {result[1]}')
    # NOTE(review): with n_episodes == 0 the success-rate line divides by zero,
    # and np.mean over an empty selection yields nan when no episode succeeded
    # — confirm whether these edge cases can occur in practice.
    logger.info(f'Number of successful executions: '
                f'{[res[0] for res in results].count(True)}/{len(results)} '
                f'({([res[0] for res in results].count(True) / len(results)) * 100}%)')
    logger.info(f'Average number of frames required to complete an episode: '
                f'{np.mean(np.array([res[1] for res in results if res[0]]))}')
    logger.info(f'Number of episodes terminated because of collisions: {num_collisions}')
def initialize_environment(
        self,
        manipulator_file: str,
        endeffector_index: int,
        fixed_joints: List[int],
        involved_joints: List[int],
        target_position: List[float],
        obstacle_position: List[float],
        initial_joint_positions: Optional[List[float]] = None,
        initial_positions_variation_range: Optional[List[float]] = None,
        max_force: float = 200.,
        visualize: bool = True) -> None:
    """
    Initialize the Environment by creating an instance of the Environment class.
    Args:
        manipulator_file: Path to the manipulator's URDF or SDF file.
        endeffector_index: Index of the manipulator's end-effector.
        fixed_joints: List containing the indices of every joint not involved in the training.
        involved_joints: List containing the indices of every joint involved in the training.
        target_position: List containing the position of the target object, as 3D Cartesian coordinates.
        obstacle_position: List containing the position of the obstacle, as 3D Cartesian coordinates.
        initial_joint_positions: List containing as many items as the number of joints of the manipulator.
            Each item in the list corresponds to the initial position wanted for the joint with that same index.
        initial_positions_variation_range: List containing as many items as the number of joints of the manipulator.
            Each item in the list corresponds to the variation range wanted for the joint with that same index.
        max_force: Maximum force to be applied on the joints.
        visualize: Visualization mode.
    Raises:
        InvalidEnvironmentParameter: One or more parameters provided for the Environment initialization are invalid.
        InvalidManipulatorFile: The URDF/SDF file provided does not exist or cannot be loaded
            into the Pybullet Environment.
    """
    logger.debug('Initializing Pybullet Environment...')
    environment_config = EnvironmentConfiguration(
        endeffector_index=endeffector_index,
        fixed_joints=fixed_joints,
        involved_joints=involved_joints,
        target_position=target_position,
        obstacle_position=obstacle_position,
        initial_joint_positions=initial_joint_positions,
        initial_positions_variation_range=initial_positions_variation_range,
        max_force=max_force,
        visualize=visualize)
    self.env = Environment(manipulator_file=manipulator_file,
                           environment_config=environment_config)
    logger.info('Pybullet Environment successfully initialized')
    # Plain string: the original used an f-string with no placeholders.
    logger.debug('* The NAF Agent can now be initialized via the initialize_naf_agent() method')
def delete_environment(self) -> None:
    """
    Deletes the existing Environment instance, and disconnects the current Pybullet instance.
    Logs an error when there is no Environment to remove.
    """
    if self.env:
        # Disconnect the Pybullet client before dropping our reference to it.
        p.disconnect(self.env.physics_client)
        self.env = None
        logger.info('Environment instance has been successfully removed')
    else:
        logger.error('No existing instance of Environment found')
def initialize_naf_agent(self, checkpoint_frequency: int = 500, seed: int = 0) -> None:
    """
    Initialize the NAF Agent by creating an instance of the NAFAgent class, wired
    to the current Environment and to the configured hyper-parameters.
    Args:
        checkpoint_frequency: Number of episodes required to generate a checkpoint.
        seed: Random seed.
    Raises:
        EnvironmentNotInitialized: Environment class has not been initialized.
        InvalidNAFAgentParameter: Either "checkpoint_frequency" or "seed" parameters have an invalid value/type.
    """
    if not self.env:
        raise EnvironmentNotInitialized
    if not (isinstance(checkpoint_frequency, int) and isinstance(seed, int)):
        raise InvalidNAFAgentParameter('Checkpoint Frequency or Seed received is not an integer')
    logger.debug('Initializing NAF Agent...')
    # Train on GPU when CUDA is available, otherwise fall back to CPU.
    compute_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    hp = self._hyperparameters
    self.naf_agent = NAFAgent(
        environment=self.env,
        state_size=self.env.observation_space.shape[0],
        action_size=self.env.action_space.shape[0],
        layer_size=256,
        batch_size=hp.batch_size,
        buffer_size=hp.buffer_size,
        learning_rate=hp.learning_rate,
        tau=hp.tau,
        gamma=hp.gamma,
        update_freq=hp.update_freq,
        num_updates=hp.num_updates,
        checkpoint_frequency=checkpoint_frequency,
        device=compute_device,
        seed=seed)
    logger.info('NAF Agent successfully initialized')
    logger.debug('* The Robotic Manipulator training can now be launched via the run_training() method')
def delete_naf_agent(self) -> None:
    """
    Deletes the existing NAFAgent instance. Logs an error when there is no
    NAFAgent to remove.
    """
    if self.naf_agent:
        self.naf_agent = None
        logger.info('NAFAgent instance has been successfully removed')
    else:
        logger.error('No existing instance of NAFAgent found')
def run_training(self, episodes: int, frames: Optional[int] = 500, verbose: bool = True) -> None:
    """
    Execute a training on the configured Environment with the configured NAF Agent. The training is
    executed for {episodes}, and for {frames} timesteps per episode.
    Args:
        episodes: Maximum number of episodes to execute.
        frames: Maximum number of timesteps to execute per episode.
        verbose: Verbose mode.\n
            - If set to True, each timestep will log information about the current state,
            the action chosen from that state, the current reward and the cummulative reward until that timestep.
            In addition, each time an episode ends information about the total reward obtained, the number of
            frames required to complete the episode, the mean reward obtained along the episode and the
            execution time of the episode are logged.\n
            - If set to False, each time an episode ends information about the total reward obtained, the number of
            frames required to complete the episode, the mean reward obtained along the episode and the
            execution time of the episode are logged.\n
            It is recommended to use the verbose mode only in a development/debugging context, since logging
            information for each timestep greatly reduces the visibility of what is happening during the training.
    Raises:
        ConfigurationIncomplete: Either NAFAgent or Environment has not been initialized.
    """
    # Both components must exist before delegating to the agent's training loop.
    if not self.naf_agent or not self.env:
        raise ConfigurationIncomplete
    # NOTE(review): NAFAgent.run() is called as (frames, episodes) — reversed
    # relative to this method's parameter order; confirm against NAFAgent.run().
    self.naf_agent.run(frames, episodes, verbose)
def run_demo_training(self, demo_type: str, verbose: bool = False) -> None:
    """
    Run a demo from a preconfigured Environment and NAFAgent, which shows how the framework works.
    The training is executed for 20 episodes. Do not expect good results, as this is just a demo of the training
    configuration process and the number of episodes is not enough to achieve a good performance in the manipulator.
    Args:
        demo_type: Valid values:\n
            - 'kuka_training': training with the KUKA IIWA Robotic Manipulator.\n
            - 'xarm6_training': training with the XArm6 Robotic Manipulator.\n
        verbose: Verbose mode. Verbose mode functionality is applied on the same way as for the
            run_training() method.
    """
    logger.warning('Both the demo testing and the demo training are executed with the Log level '
                   'set to DEBUG, so that the framework can be understood at a low level.')
    old_level = logger.level
    logger.setLevel(10)
    # BUGFIX: restore the log level in a finally block — previously the early
    # returns below (user declining the overwrite) and any exception raised by
    # the demo left the logger stuck at DEBUG.
    try:
        # CHECK EXISTENT ENVIRONMENT AND NAF AGENT
        # Overwrite previous Environment configuration if present
        if self.env:
            # NOTE(review): pressing Enter counts as "no" even though the prompt
            # capitalizes Y — only an explicit 'y'/'Y' answer overwrites.
            overwrite_env = input('Environment instance found. Overwrite? [Y/n] ').lower() == 'y'
            if not overwrite_env:
                logger.info('Demo could not run due to the presence of a user-configured Environment instance')
                return
            self.delete_environment()
        # Overwrite previous NAFAgent configuration if present
        if self.naf_agent:
            overwrite_naf_agent = input('NAFAgent instance found. Overwrite? [Y/n] ').lower() == 'y'
            if not overwrite_naf_agent:
                logger.info('Demo could not run due to the presence of a user-configured NAFAgent instance')
                return
            self.delete_naf_agent()
        # START DEMO
        if demo_type == 'kuka_training':
            logger.info('Initializing demo Environment instance...')
            # BUGFIX: resolve the KUKA model through pybullet_data, exactly as
            # run_demo_testing() does; a bare relative path only works when the
            # Pybullet search path happens to include the data directory.
            kuka_path = os.path.join(pybullet_data.getDataPath(), 'kuka_iiwa/kuka_with_gripper2.sdf')
            self.initialize_environment(manipulator_file=kuka_path,
                                        endeffector_index=13,
                                        fixed_joints=[6, 7, 8, 9, 10, 11, 12, 13],
                                        involved_joints=[0, 1, 2, 3, 4, 5],
                                        target_position=[0.4, 0.85, 0.71],
                                        obstacle_position=[0.45, 0.55, 0.55],
                                        initial_joint_positions=[0.9, 0.45, 0, 0, 0, 0],
                                        initial_positions_variation_range=[0, 0, 0, 0, 0, 0],
                                        visualize=True)
            logger.info('Initializing demo NAFAgent instance...')
            self.initialize_naf_agent()
            logger.info('Running training for 20 episodes. Do not expect good results, '
                        'this is just a demo of the training configuration process')
            self.run_training(20, 400, verbose=verbose)
            # Reset Environment and NAFAgent
            self.delete_environment()
            self.delete_naf_agent()
        elif demo_type == 'xarm6_training':
            logger.info('Initializing demo Environment instance...')
            xarm_path = os.path.join(pybullet_data.getDataPath(), 'xarm/xarm6_with_gripper.urdf')
            self.initialize_environment(manipulator_file=xarm_path,
                                        endeffector_index=12,
                                        fixed_joints=[0, 7, 8, 9, 10, 11, 12, 13],
                                        involved_joints=[1, 2, 3, 4, 5, 6],
                                        target_position=[0.3, 0.47, 0.61],
                                        obstacle_position=[0.25, 0.27, 0.5],
                                        initial_joint_positions=[0., 1., 0., -2.3, 0., 0., 0.],
                                        initial_positions_variation_range=[0, 0, 0, 0.3, 1, 1, 1],
                                        visualize=True)
            logger.info('Initializing demo NAFAgent instance...')
            self.initialize_naf_agent()
            # BUGFIX: typo 'confiuration' corrected in the log message below.
            logger.info('Running training for 20 episodes. Do not expect good results, '
                        'this is just a demo of the configuration process')
            self.run_training(20, 400, verbose=verbose)
            # Reset Environment and NAFAgent
            self.delete_environment()
            self.delete_naf_agent()
        else:
            logger.error('Incorrect demo type!')
    finally:
        # Reset log level
        logger.setLevel(old_level)
        logger.warning('Log level has been reset to its original value')
def run_demo_testing(self, demo_type: str) -> None:
    """
    Run a demo testing from a preconfigured Environment and NAFAgent, which shows how a robotic
    manipulator learns with the framework. The demo loads pretrained weights and executes 50 test episodes.
    Args:
        demo_type: Valid values:\n
            - 'kuka_testing': testing with the KUKA IIWA Robotic Manipulator.\n
            - 'xarm6_testing': testing with the XArm6 Robotic Manipulator.\n
    """
    logger.warning('Both the demo testing and the demo training are executed with the Log level '
                   'set to DEBUG, so that the framework can be understood at a low level.')
    old_level = logger.level
    logger.setLevel(10)
    # CHECK EXISTENT ENVIRONMENT AND NAF AGENT
    # Overwrite previous Environment configuration if present
    if self.env:
        # NOTE(review): pressing Enter counts as "no" even though the prompt
        # capitalizes Y — only an explicit 'y'/'Y' answer overwrites. Also note
        # these early returns skip the logger.setLevel(old_level) reset at the
        # bottom, leaving the logger at DEBUG — confirm if intended.
        overwrite_env = input('Environment instance found. Overwrite? [Y/n] ').lower() == 'y'
        if not overwrite_env:
            logger.info('Demo could not run due to the presence of a user-configured Environment instance')
            return
        self.delete_environment()
    # Overwrite previous NAFAgent configuration if present
    if self.naf_agent:
        overwrite_naf_agent = input('NAFAgent instance found. Overwrite? [Y/n] ').lower() == 'y'
        if not overwrite_naf_agent:
            logger.info('Demo could not run due to the presence of a user-configured NAFAgent instance')
            return
        self.delete_naf_agent()
    # START DEMO
    if demo_type == 'kuka_testing':
        logger.info('Initializing demo Environment instance...')
        # Resolve the bundled KUKA model through pybullet_data's data directory.
        kuka_path = os.path.join(pybullet_data.getDataPath(), 'kuka_iiwa/kuka_with_gripper2.sdf')
        self.initialize_environment(manipulator_file=kuka_path,
                                    endeffector_index=13,
                                    fixed_joints=[6, 7, 8, 9, 10, 11, 12, 13],
                                    involved_joints=[0, 1, 2, 3, 4, 5],
                                    target_position=[0.4, 0.85, 0.71],
                                    obstacle_position=[0.45, 0.55, 0.55],
                                    initial_joint_positions=[0.9, 0.45, 0, 0, 0, 0],
                                    initial_positions_variation_range=[0, 0, .5, .5, .5, .5])
        logger.info('Initializing demo NAFAgent instance...')
        self.initialize_naf_agent()
        logger.info('Loading demo pretrained parameters')
        # Pretrained weights ship with the package, next to the NAF components.
        self.load_pretrained_parameters_from_weights_file(os.path.dirname(
            os.path.realpath(__file__)) + '/naf_components/demo_weights/weights_kuka.p')
        logger.info('Running 50 test episodes...')
        self.test_trained_model(50, 750)
        # Reset Environment and NAFAgent
        self.delete_environment()
        self.delete_naf_agent()
    elif demo_type == 'xarm6_testing':
        logger.info('Initializing demo Environment instance...')
        xarm_path = os.path.join(pybullet_data.getDataPath(), 'xarm/xarm6_with_gripper.urdf')
        self.initialize_environment(manipulator_file=xarm_path,
                                    endeffector_index=12,
                                    fixed_joints=[0, 7, 8, 9, 10, 11, 12, 13],
                                    involved_joints=[1, 2, 3, 4, 5, 6],
                                    target_position=[0.3, 0.47, 0.61],
                                    obstacle_position=[0.25, 0.27, 0.5],
                                    initial_joint_positions=[0., 1., 0., -2.3, 0., 0., 0.],
                                    initial_positions_variation_range=[0, 0, 0, 0.3, 1, 1, 1],
                                    max_force=200,
                                    visualize=True)
        logger.info('Initializing demo NAFAgent instance...')
        self.initialize_naf_agent()
        logger.info('Loading demo pretrained parameters')
        self.load_pretrained_parameters_from_weights_file(
            os.path.dirname(os.path.realpath(__file__)) + '/naf_components/demo_weights/weights_xarm6.p')
        logger.info('Running 50 test episodes...')
        self.test_trained_model(50, 750)
        # Reset Environment and NAFAgent
        self.delete_environment()
        self.delete_naf_agent()
    else:
        logger.error('Incorrect demo type!')
    # Reset log level
    logger.setLevel(old_level)
    logger.warning('Log level has been reset to its original value')
Classes
class HyperParameters (buffer_size: int = 100000, batch_size: int = 128, gamma: float = 0.99, tau: float = 0.001, learning_rate: float = 0.001, update_freq: int = 1, num_updates: int = 1)
-
Data Class for the storage of the required hyper-parameters.
Expand source code
class HyperParameters: """ Data Class for the storage of the required hyper-parameters. """ buffer_size: int = 100000 batch_size: int = 128 gamma: float = 0.99 tau: float = 0.001 learning_rate: float = 0.001 update_freq: int = 1 num_updates: int = 1
Class variables
var batch_size : int
var buffer_size : int
var gamma : float
var learning_rate : float
var num_updates : int
var tau : float
var update_freq : int
class ManipulatorFramework
-
Main class of the package. Initializes the required hyper-parameters to their default value.
Expand source code
class ManipulatorFramework:
    """
    Main class of the package. Holds the Environment and NAFAgent instances and
    drives training/testing. Hyper-parameters are initialized to their default
    values on construction.
    """

    def __init__(self) -> None:
        """
        Main class of the package. Initializes the required hyper-parameters to their default value.
        """
        self.env: Union[Environment, None] = None
        self.naf_agent: Union[NAFAgent, None] = None
        self._hyperparameters: Union[HyperParameters, None] = None
        self._initialize_hyperparameters()
        logger.info('The Framework has been initialized with the default hyperparameters configuration')
        logger.debug('* Custom hyperparameters can be set via the set_hyperparameter() method')
        logger.debug('* All the required hyperparameters can be printed via the get_required_hyperparameters() method')
        logger.debug('* Load a manipulator via the initialize_environment() method to start with the training '
                     'configuration')

    def _initialize_hyperparameters(self) -> None:
        """
        Initializes the hyper-parameters with their default values.
        """
        self._hyperparameters = HyperParameters(buffer_size=100000,
                                                batch_size=128,
                                                gamma=0.99,
                                                tau=0.001,
                                                learning_rate=0.001,
                                                update_freq=1,
                                                num_updates=1)

    @staticmethod
    def set_log_level(log_level: int) -> None:
        """
        Set Log Level for the Logger.

        Args:
            log_level: Log level to be set.
                Valid values: 10 (DEBUG), 20 (INFO), 30 (WARNING), 40 (ERROR), 50 (CRITICAL)
        """
        level_name_mappings = {10: 'DEBUG', 20: 'INFO', 30: 'WARNING', 40: 'ERROR', 50: 'CRITICAL'}
        if log_level in [10, 20, 30, 40, 50]:
            # Bug fix: apply the new level BEFORE reporting it. The original code
            # logged logger.level prior to setLevel(), so the confirmation message
            # always showed the previous level instead of the one just set.
            logger.setLevel(log_level)
            logger.info(f'Log Level has been set to {logger.level} ({level_name_mappings[logger.level]})')
        else:
            logger.error(
                f'The Log level provided is invalid, so the previous Log Level is maintained ({logger.level}))')
            logger.error('Valid values: 10 (DEBUG), 20 (INFO), 30 (WARNING), 40 (ERROR), 50 (CRITICAL)')

    @staticmethod
    def get_required_hyperparameters() -> None:
        """
        Log (as DEBUG messages) the required hyper-parameters with a reference link
        for each one. Purely informative; if the log level is above DEBUG an error
        is logged and nothing is shown.
        """
        if logger.level > 10:
            logger.error('get_required_hyperparameters() only shows information for DEBUG log level. '
                         'Try running this method after setting the log level to DEBUG by calling '
                         'set_log_level(10) class method')
            return
        hyperparameters_info_map = {
            'Buffer Size': 'https://www.tensorflow.org/agents/tutorials/5_replay_buffers_tutorial?hl=es-419',
            'Batch Size': 'https://www.kaggle.com/general/276990',
            'Gamma (discount factor)': 'https://arxiv.org/pdf/2007.02040.pdf',
            'Tau': 'https://arxiv.org/abs/1603.00748',
            'Learning Rate': 'https://machinelearningmastery.com/understand-the-dynamics-of-learning-rate-on-deep'
                             '-learning-neural-networks/',
            'Update Frequency': 'https://medium.com/towards-data-science/applied-reinforcement-learning-v-normalized'
                                '-advantage-function-naf-for-continuous-control-62ad143d3095',
            'Number of Updates': 'https://medium.com/towards-data-science/applied-reinforcement-learning-v-normalized'
                                 '-advantage-function-naf-for-continuous-control-62ad143d3095'}
        logger.debug('Required Hyperparameters:')
        for hyperparam, info in hyperparameters_info_map.items():
            logger.debug('{:<25} (see {:<10})'.format(hyperparam, info))

    @staticmethod
    def plot_training_rewards(episode: int, mean_range: int = 50) -> None:
        """
        Plot the mean reward of each batch of {mean_range} episodes from the
        "scores.txt" file stored under checkpoints/{episode}/.

        Args:
            episode: Episode number (a checkpoint directory) from which to plot.
            mean_range: Number of episodes averaged into each plotted point.

        Raises:
            FileNotFoundError: The given episode has no checkpoints/{episode}/scores.txt.
        """
        try:
            with open(f'checkpoints/{episode}/scores.txt', 'r') as f:
                file = f.read()
            scores = json.loads(file)
        except FileNotFoundError as err:
            logger.error(f'File "scores.txt" located in checkpoints/{episode}/ folder was not found')
            raise err
        counter, cummulative_reward, values_to_plot = 0, list(), list()
        # Bug fix: loop variable renamed so it no longer shadows the `episode` parameter.
        for ep, result in scores.items():
            cummulative_reward.append(result[0])
            counter += 1
            if counter % mean_range == 0:
                mean = sum(cummulative_reward) / len(cummulative_reward)
                values_to_plot.append(mean)
                cummulative_reward = list()
        plt.plot(range(len(values_to_plot)), values_to_plot)
        plt.show()

    def set_hyperparameter(self, hyperparameter: str, value: Union[float, int]) -> None:
        """
        Set the specified value on the given hyper-parameter after validating it.

        Args:
            hyperparameter: Name of the hyper-parameter to be updated. Accepted
                spellings per parameter: snake_case, nounderscore, UPPER_SNAKE
                and UPPERNOUNDERSCORE (e.g. buffer_size/buffersize/BUFFER_SIZE/BUFFERSIZE).
            value: New value for the given hyper-parameter.

        Raises:
            InvalidHyperParameter: The value is invalid for the hyper-parameter,
                or the hyper-parameter name is not one of the accepted values.
        """
        if re.match(r'^(buffer_size|buffersize|BUFFER_SIZE|BUFFERSIZE)$', hyperparameter):
            if not (isinstance(value, int) and value > 0):
                raise InvalidHyperParameter('Buffer Size is not an int or has a value lower than 0')
            self._hyperparameters.buffer_size = value
            logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
        elif re.match(r'^(batch_size|batchsize|BATCH_SIZE|BATCHSIZE)$', hyperparameter):
            if not (isinstance(value, int) and value > 0):
                raise InvalidHyperParameter('Batch Size is not an int or has a value lower than 0')
            self._hyperparameters.batch_size = value
            logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
        elif re.match(r'^(gamma|GAMMA)$', hyperparameter):
            if not (isinstance(value, (int, float)) and 0 < value < 1):
                raise InvalidHyperParameter('Gamma is not a float or its value is out of range (0, 1)')
            self._hyperparameters.gamma = value
            logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
        elif re.match(r'^(tau|TAU)$', hyperparameter):
            if not (isinstance(value, (int, float)) and 0 <= value <= 1):
                raise InvalidHyperParameter('Tau is not a float or its value is out of range [0, 1]')
            self._hyperparameters.tau = value
            logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
        elif re.match(r'^(learning_rate|learningrate|LEARNING_RATE|LEARNINGRATE)$', hyperparameter):
            if not (isinstance(value, (int, float)) and value > 0):
                raise InvalidHyperParameter('Learning Rate is not a float or has a value lower than 0')
            self._hyperparameters.learning_rate = value
            logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
        elif re.match(r'^(update_freq|updatefreq|UPDATE_FREQ|UPDATEFREQ)$', hyperparameter):
            if not (isinstance(value, int) and value > 0):
                raise InvalidHyperParameter('Update Frequency is not an int or has a value lower than 0')
            self._hyperparameters.update_freq = value
            logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
        elif re.match(r'^(num_update|numupdate|NUMUPDATE|NUM_UPDATE)$', hyperparameter):
            if not (isinstance(value, int) and value > 0):
                # Bug fix: error message previously said 'Buffer Size' (copy-paste error).
                raise InvalidHyperParameter('Number of Updates is not an int or has a value lower than 0')
            self._hyperparameters.num_updates = value
            logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
        else:
            raise InvalidHyperParameter(
                'The hyperparameter name passed as parameter is not valid. Valid hyperparameters are: '
                '["buffer_size", "batch_size", "gamma", "tau", "learning_rate", "update_freq", "num_update"]')

    def load_pretrained_parameters_from_weights_file(self, parameters_file_path: str) -> None:
        """
        Load a pretrained network's weights (from the given .p file) into the
        NAFAgent's networks.

        Args:
            parameters_file_path: Path to the .p file where the weights are stored.

        Raises:
            EnvironmentNotInitialized: The Environment class has not been initialized.
            NAFAgentNotInitialized: The NAFAgent class has not been initialized.
            MissingWeightsFile: The .p file path provided does not exist.
        """
        if not self.env:
            raise EnvironmentNotInitialized
        if not self.naf_agent:
            raise NAFAgentNotInitialized
        self.naf_agent.initialize_pretrained_agent_from_weights_file(parameters_file_path)

    def load_pretrained_parameters_from_episode(self, episode: int) -> None:
        """
        Load previously trained weights from a training checkpoint into the
        NAFAgent's networks. The episode must exist under the checkpoints/ folder.

        Args:
            episode: Episode in the /checkpoints folder from which to retrieve the weights.

        Raises:
            EnvironmentNotInitialized: The Environment class has not been initialized.
            NAFAgentNotInitialized: The NAFAgent class has not been initialized.
            MissingWeightsFile: The weights.p file does not exist in the folder provided.
        """
        if not self.env:
            raise EnvironmentNotInitialized
        if not self.naf_agent:
            raise NAFAgentNotInitialized
        self.naf_agent.initialize_pretrained_agent_from_episode(episode)

    def get_environment_configuration(self) -> None:
        """
        Shows the Environment configuration as logs on stdout.
        """
        if not self.env:
            logger.error("Environment is not initialized yet, can't show configuration")
            return
        logger.info('Environment Configuration:')
        logger.info(f'* Manipulator File: {self.env.manipulator_file}')
        logger.info(f'* End Effector index: {self.env.endeffector_index}')
        logger.info(f'* List of fixed Joints: {self.env.fixed_joints}')
        logger.info(f'* List of Joints involved in training: {self.env.involved_joints}')
        logger.info(f'* Position of the Target: {self.env.target_pos}')
        logger.info(f'* Position of the Obstacle: {self.env.obstacle_pos}')
        logger.info(f'* Initial position of joints: {self.env.initial_joint_positions}')
        logger.info(f'* Initial variation range of joints: {self.env.initial_positions_variation_range}')
        logger.info(f'* Max Force to be applied on joints: {self.env.max_force}')
        logger.info(f'* Visualize mode: {self.env.visualize}')
        logger.info(f'* Instance of the Environment: {self.env}')

    def get_nafagent_configuration(self) -> None:
        """
        Shows the NAFAgent configuration as logs on stdout.
        """
        if not self.naf_agent:
            logger.error("NAFAgent is not initialized yet, can't show configuration")
            return
        logger.info('NAFAgent Configuration:')
        logger.info(f'* Environment Instance: {self.naf_agent.environment}')
        logger.info(f'* State Size: {self.naf_agent.state_size}')
        logger.info(f'* Action Size: {self.naf_agent.action_size}')
        logger.info(f'* Size of layers of the Neural Network: {self.naf_agent.layer_size}')
        logger.info(f'* Batch Size: {self.naf_agent.batch_size}')
        logger.info(f'* Buffer Size: {self.naf_agent.buffer_size}')
        logger.info(f'* Learning Rate: {self.naf_agent.learning_rate}')
        logger.info(f'* Tau: {self.naf_agent.tau}')
        logger.info(f'* Gamma: {self.naf_agent.gamma}')
        logger.info(f'* Update Frequency: {self.naf_agent.update_freq}')
        logger.info(f'* Number of Updates: {self.naf_agent.num_updates}')
        logger.info(f'* Checkpoint frequency: {self.naf_agent.checkpoint_frequency}')
        logger.info(f'* Device: {self.naf_agent.device}')

    def test_trained_model(self, n_episodes: int, frames: int) -> None:
        """
        Test a previously trained agent for {n_episodes} episodes of {frames}
        timesteps each, then log a summary of the results.

        Args:
            n_episodes: Number of test episodes to execute.
            frames: Number of timesteps per test episode.

        Raises:
            ConfigurationIncomplete: Either the Environment or the NAFAgent is not initialized.
        """
        if not self.naf_agent or not self.env:
            raise ConfigurationIncomplete
        results, num_collisions = list(), 0
        for episode_idx in range(n_episodes):
            state = self.env.reset()
            for frame in range(frames):
                action = self.naf_agent.act(state)
                next_state, reward, done = self.env.step(action)
                state = next_state
                if done:
                    # Reward of 250 marks a successful episode (target reached);
                    # any other terminal reward is counted as a collision.
                    if reward == 250:
                        results.append((True, frame))
                        break
                    else:
                        results.append((False, frame))
                        num_collisions += 1
                        break
                if frame == frames - 1 and not done:
                    # Episode exhausted its frame budget without terminating.
                    results.append((False, frame))
                    break
            logger.info('Test Episode number {ep} completed\n'.format(ep=episode_idx + 1))
        logger.info('RESULTS OF THE TEST:')
        for i, result in enumerate(results):
            logger.info(f'Results of Iteration {i + 1}: COMPLETED: {result[0]}. FRAMES: {result[1]}')
        logger.info(f'Number of successful executions: '
                    f'{[res[0] for res in results].count(True)}/{len(results)} '
                    f'({([res[0] for res in results].count(True) / len(results)) * 100}%)')
        logger.info(f'Average number of frames required to complete an episode: '
                    f'{np.mean(np.array([res[1] for res in results if res[0]]))}')
        logger.info(f'Number of episodes terminated because of collisions: {num_collisions}')

    def initialize_environment(
            self,
            manipulator_file: str,
            endeffector_index: int,
            fixed_joints: List[int],
            involved_joints: List[int],
            target_position: List[float],
            obstacle_position: List[float],
            initial_joint_positions: Optional[List[float]] = None,
            initial_positions_variation_range: Optional[List[float]] = None,
            max_force: float = 200.,
            visualize: bool = True) -> None:
        """
        Initialize the Environment by creating an instance of the Environment class.

        Args:
            manipulator_file: Path to the manipulator's URDF or SDF file.
            endeffector_index: Index of the manipulator's end-effector.
            fixed_joints: Indices of every joint not involved in the training.
            involved_joints: Indices of every joint involved in the training.
            target_position: Position of the target object (3D Cartesian coordinates).
            obstacle_position: Position of the obstacle (3D Cartesian coordinates).
            initial_joint_positions: Initial position wanted for each joint (one item per joint).
            initial_positions_variation_range: Variation range wanted for each joint (one item per joint).
            max_force: Maximum force to be applied on the joints.
            visualize: Visualization mode.

        Raises:
            InvalidEnvironmentParameter: One or more parameters are invalid.
            InvalidManipulatorFile: The URDF/SDF file does not exist or cannot be loaded.
        """
        logger.debug('Initializing Pybullet Environment...')
        environment_config = EnvironmentConfiguration(
            endeffector_index=endeffector_index,
            fixed_joints=fixed_joints,
            involved_joints=involved_joints,
            target_position=target_position,
            obstacle_position=obstacle_position,
            initial_joint_positions=initial_joint_positions,
            initial_positions_variation_range=initial_positions_variation_range,
            max_force=max_force,
            visualize=visualize)
        self.env = Environment(manipulator_file=manipulator_file, environment_config=environment_config)
        logger.info('Pybullet Environment successfully initialized')
        logger.debug('* The NAF Agent can now be initialized via the initialize_naf_agent() method')

    def delete_environment(self) -> None:
        """
        Deletes the existing Environment instance, and disconnects the current Pybullet instance.
        """
        if not self.env:
            logger.error('No existing instance of Environment found')
            return
        p.disconnect(self.env.physics_client)
        self.env = None
        logger.info('Environment instance has been successfully removed')

    def initialize_naf_agent(self, checkpoint_frequency: int = 500, seed: int = 0) -> None:
        """
        Initialize the NAF Agent by creating an instance of the NAFAgent class.

        Args:
            checkpoint_frequency: Number of episodes required to generate a checkpoint.
            seed: Random seed.

        Raises:
            EnvironmentNotInitialized: Environment class has not been initialized.
            InvalidNAFAgentParameter: "checkpoint_frequency" or "seed" has an invalid value/type.
        """
        if not self.env:
            raise EnvironmentNotInitialized
        if not isinstance(checkpoint_frequency, int) or not isinstance(seed, int):
            raise InvalidNAFAgentParameter('Checkpoint Frequency or Seed received is not an integer')
        logger.debug('Initializing NAF Agent...')
        device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.naf_agent = NAFAgent(environment=self.env,
                                  state_size=self.env.observation_space.shape[0],
                                  action_size=self.env.action_space.shape[0],
                                  layer_size=256,
                                  batch_size=self._hyperparameters.batch_size,
                                  buffer_size=self._hyperparameters.buffer_size,
                                  learning_rate=self._hyperparameters.learning_rate,
                                  tau=self._hyperparameters.tau,
                                  gamma=self._hyperparameters.gamma,
                                  update_freq=self._hyperparameters.update_freq,
                                  num_updates=self._hyperparameters.num_updates,
                                  checkpoint_frequency=checkpoint_frequency,
                                  device=device,
                                  seed=seed)
        logger.info('NAF Agent successfully initialized')
        logger.debug('* The Robotic Manipulator training can now be launched via the run_training() method')

    def delete_naf_agent(self) -> None:
        """
        Deletes the existing NAFAgent instance.
        """
        if not self.naf_agent:
            logger.error('No existing instance of NAFAgent found')
            return
        self.naf_agent = None
        logger.info('NAFAgent instance has been successfully removed')

    def run_training(self, episodes: int, frames: Optional[int] = 500, verbose: bool = True):
        """
        Execute a training on the configured Environment with the configured NAF Agent.

        Args:
            episodes: Maximum number of episodes to execute.
            frames: Maximum number of timesteps to execute per episode.
            verbose: Verbose mode; if True, per-timestep information is logged in
                addition to the per-episode summary. Recommended only for
                development/debugging.

        Raises:
            ConfigurationIncomplete: Either NAFAgent or Environment has not been initialized.
        """
        if not self.naf_agent or not self.env:
            raise ConfigurationIncomplete
        self.naf_agent.run(frames, episodes, verbose)

    def _confirm_demo_overwrite(self) -> bool:
        """
        Ask the user whether existing Environment/NAFAgent instances may be
        overwritten before running a demo. Returns True when the demo can proceed
        (instances absent or deleted), False when the user declined.
        """
        # NOTE(review): the '[Y/n]' prompt suggests 'yes' as the default, but an
        # empty answer is treated as 'no' — confirm intended behavior.
        if self.env:
            overwrite_env = input('Environment instance found. Overwrite? [Y/n] ').lower() == 'y'
            if not overwrite_env:
                logger.info('Demo could not run due to the presence of a user-configured Environment instance')
                return False
            self.delete_environment()
        if self.naf_agent:
            overwrite_naf_agent = input('NAFAgent instance found. Overwrite? [Y/n] ').lower() == 'y'
            if not overwrite_naf_agent:
                logger.info('Demo could not run due to the presence of a user-configured NAFAgent instance')
                return False
            self.delete_naf_agent()
        return True

    def run_demo_training(self, demo_type: str, verbose: bool = False) -> None:
        """
        Run a training demo (20 episodes) on a preconfigured Environment and
        NAFAgent. Do not expect good results; this only demonstrates the training
        configuration process.

        Args:
            demo_type: 'kuka_training' (KUKA IIWA) or 'xarm6_training' (XArm6).
            verbose: Verbose mode, applied the same way as in run_training().
        """
        logger.warning('Both the demo testing and the demo training are executed with the Log level '
                       'set to DEBUG, so that the framework can be understood at a low level.')
        old_level = logger.level
        logger.setLevel(10)
        # Bug fix: the original returned early (user declined overwrite) without
        # restoring the log level; try/finally guarantees the reset on every path.
        try:
            if not self._confirm_demo_overwrite():
                return
            if demo_type == 'kuka_training':
                logger.info('Initializing demo Environment instance...')
                self.initialize_environment(manipulator_file='kuka_iiwa/kuka_with_gripper2.sdf',
                                            endeffector_index=13,
                                            fixed_joints=[6, 7, 8, 9, 10, 11, 12, 13],
                                            involved_joints=[0, 1, 2, 3, 4, 5],
                                            target_position=[0.4, 0.85, 0.71],
                                            obstacle_position=[0.45, 0.55, 0.55],
                                            initial_joint_positions=[0.9, 0.45, 0, 0, 0, 0],
                                            initial_positions_variation_range=[0, 0, 0, 0, 0, 0],
                                            visualize=True)
                logger.info('Initializing demo NAFAgent instance...')
                self.initialize_naf_agent()
                logger.info('Running training for 20 episodes. Do not expect good results, '
                            'this is just a demo of the training configuration process')
                self.run_training(20, 400, verbose=verbose)
                # Reset Environment and NAFAgent
                self.delete_environment()
                self.delete_naf_agent()
            elif demo_type == 'xarm6_training':
                logger.info('Initializing demo Environment instance...')
                xarm_path = os.path.join(pybullet_data.getDataPath(), 'xarm/xarm6_with_gripper.urdf')
                self.initialize_environment(manipulator_file=xarm_path,
                                            endeffector_index=12,
                                            fixed_joints=[0, 7, 8, 9, 10, 11, 12, 13],
                                            involved_joints=[1, 2, 3, 4, 5, 6],
                                            target_position=[0.3, 0.47, 0.61],
                                            obstacle_position=[0.25, 0.27, 0.5],
                                            initial_joint_positions=[0., 1., 0., -2.3, 0., 0., 0.],
                                            initial_positions_variation_range=[0, 0, 0, 0.3, 1, 1, 1],
                                            visualize=True)
                logger.info('Initializing demo NAFAgent instance...')
                self.initialize_naf_agent()
                # Bug fix: 'confiuration' typo corrected in the log message below.
                logger.info('Running training for 20 episodes. Do not expect good results, '
                            'this is just a demo of the configuration process')
                self.run_training(20, 400, verbose=verbose)
                # Reset Environment and NAFAgent
                self.delete_environment()
                self.delete_naf_agent()
            else:
                logger.error('Incorrect demo type!')
        finally:
            # Reset log level
            logger.setLevel(old_level)
            logger.warning('Log level has been reset to its original value')

    def run_demo_testing(self, demo_type: str) -> None:
        """
        Run a testing demo on a preconfigured Environment and NAFAgent: loads
        pretrained weights and executes 50 test episodes.

        Args:
            demo_type: 'kuka_testing' (KUKA IIWA) or 'xarm6_testing' (XArm6).
        """
        logger.warning('Both the demo testing and the demo training are executed with the Log level '
                       'set to DEBUG, so that the framework can be understood at a low level.')
        old_level = logger.level
        logger.setLevel(10)
        # Bug fix: restore the log level on early-return paths as well (try/finally).
        try:
            if not self._confirm_demo_overwrite():
                return
            if demo_type == 'kuka_testing':
                logger.info('Initializing demo Environment instance...')
                kuka_path = os.path.join(pybullet_data.getDataPath(), 'kuka_iiwa/kuka_with_gripper2.sdf')
                self.initialize_environment(manipulator_file=kuka_path,
                                            endeffector_index=13,
                                            fixed_joints=[6, 7, 8, 9, 10, 11, 12, 13],
                                            involved_joints=[0, 1, 2, 3, 4, 5],
                                            target_position=[0.4, 0.85, 0.71],
                                            obstacle_position=[0.45, 0.55, 0.55],
                                            initial_joint_positions=[0.9, 0.45, 0, 0, 0, 0],
                                            initial_positions_variation_range=[0, 0, .5, .5, .5, .5])
                logger.info('Initializing demo NAFAgent instance...')
                self.initialize_naf_agent()
                logger.info('Loading demo pretrained parameters')
                self.load_pretrained_parameters_from_weights_file(os.path.dirname(
                    os.path.realpath(__file__)) + '/naf_components/demo_weights/weights_kuka.p')
                logger.info('Running 50 test episodes...')
                self.test_trained_model(50, 750)
                # Reset Environment and NAFAgent
                self.delete_environment()
                self.delete_naf_agent()
            elif demo_type == 'xarm6_testing':
                logger.info('Initializing demo Environment instance...')
                xarm_path = os.path.join(pybullet_data.getDataPath(), 'xarm/xarm6_with_gripper.urdf')
                self.initialize_environment(manipulator_file=xarm_path,
                                            endeffector_index=12,
                                            fixed_joints=[0, 7, 8, 9, 10, 11, 12, 13],
                                            involved_joints=[1, 2, 3, 4, 5, 6],
                                            target_position=[0.3, 0.47, 0.61],
                                            obstacle_position=[0.25, 0.27, 0.5],
                                            initial_joint_positions=[0., 1., 0., -2.3, 0., 0., 0.],
                                            initial_positions_variation_range=[0, 0, 0, 0.3, 1, 1, 1],
                                            max_force=200,
                                            visualize=True)
                logger.info('Initializing demo NAFAgent instance...')
                self.initialize_naf_agent()
                logger.info('Loading demo pretrained parameters')
                self.load_pretrained_parameters_from_weights_file(
                    os.path.dirname(os.path.realpath(__file__)) + '/naf_components/demo_weights/weights_xarm6.p')
                logger.info('Running 50 test episodes...')
                self.test_trained_model(50, 750)
                # Reset Environment and NAFAgent
                self.delete_environment()
                self.delete_naf_agent()
            else:
                logger.error('Incorrect demo type!')
        finally:
            # Reset log level
            logger.setLevel(old_level)
            logger.warning('Log level has been reset to its original value')
Static methods
def get_required_hyperparameters() ‑> None
-
Returns a list with the required hyper-parameters. This function does not imply any logic, as it is only intended to help the user to know what hyper-parameters can be set. The required hyper-parameters are shown as DEBUG logs, so if the Log level is set to INFO or higher the function will not return anything.
Expand source code
@staticmethod
def get_required_hyperparameters() -> None:
    """
    Print (as DEBUG logs) the hyper-parameters that can be configured, each with
    a reference link. Purely informative: when the log level is above DEBUG an
    error is logged and nothing else is shown.
    """
    if logger.level > 10:
        logger.error('get_required_hyperparameters() only shows information for DEBUG log level. '
                     'Try running this method after setting the log level to DEBUG by calling '
                     'set_log_level(10) class method')
        return
    reference_links = (
        ('Buffer Size', 'https://www.tensorflow.org/agents/tutorials/5_replay_buffers_tutorial?hl=es-419'),
        ('Batch Size', 'https://www.kaggle.com/general/276990'),
        ('Gamma (discount factor)', 'https://arxiv.org/pdf/2007.02040.pdf'),
        ('Tau', 'https://arxiv.org/abs/1603.00748'),
        ('Learning Rate', 'https://machinelearningmastery.com/understand-the-dynamics-of-learning-rate-on-deep'
                          '-learning-neural-networks/'),
        ('Update Frequency', 'https://medium.com/towards-data-science/applied-reinforcement-learning-v-normalized'
                             '-advantage-function-naf-for-continuous-control-62ad143d3095'),
        ('Number of Updates', 'https://medium.com/towards-data-science/applied-reinforcement-learning-v-normalized'
                              '-advantage-function-naf-for-continuous-control-62ad143d3095'),
    )
    logger.debug('Required Hyperparameters:')
    for hyperparam_name, info_link in reference_links:
        logger.debug('{:<25} (see {:<10})'.format(hyperparam_name, info_link))
def plot_training_rewards(episode: int, mean_range: int = 50) ‑> None
-
Plots the mean reward of each batch of {mean_range} episodes for the "scores.txt" file stored in the checkpoints/ folder, corresponding to the episode received as parameter.
Args
episode
- Episode number from which to plot the results. The episode provided must be one of the checkpoints generated in the /checkpoints directory.
mean_range
- Range of episodes on which the mean is calculated. If the execution lasted for 200 episodes, and the mean_range is set to 50, 4 metric points will be generated.
Raises
FileNotFoundError
- Raises if the episode provided is not present as a directory in the /checkpoints directory generated after executing a training.
Expand source code
@staticmethod
def plot_training_rewards(episode: int, mean_range: int = 50) -> None:
    """
    Plot the mean reward of each batch of {mean_range} episodes from the
    "scores.txt" file stored under checkpoints/{episode}/.

    Args:
        episode: Episode number (a checkpoint directory) from which to plot.
        mean_range: Number of episodes averaged into each plotted point; e.g.
            200 episodes with mean_range=50 produce 4 metric points.

    Raises:
        FileNotFoundError: checkpoints/{episode}/scores.txt does not exist.
    """
    try:
        with open(f'checkpoints/{episode}/scores.txt', 'r') as f:
            file = f.read()
        scores = json.loads(file)
    except FileNotFoundError as err:
        logger.error(f'File "scores.txt" located in checkpoints/{episode}/ folder was not found')
        raise err
    counter, cummulative_reward, values_to_plot = 0, list(), list()
    # Bug fix: the loop variable previously shadowed the `episode` parameter,
    # which made the code confusing and would corrupt any later use of it.
    for ep, result in scores.items():
        cummulative_reward.append(result[0])
        counter += 1
        if counter % mean_range == 0:
            # Full batch collected: store its mean and start a new batch.
            # (A trailing partial batch is intentionally not plotted.)
            mean = sum(cummulative_reward) / len(cummulative_reward)
            values_to_plot.append(mean)
            cummulative_reward = list()
    plt.plot(range(len(values_to_plot)), values_to_plot)
    plt.show()
def set_log_level(log_level: int) ‑> None
-
Set Log Level for the Logger.
Args
log_level
-
Log level to be set.
Valid values: 10 (DEBUG), 20 (INFO), 30 (WARNING), 40 (ERROR), 50 (CRITICAL)
Expand source code
@staticmethod
def set_log_level(log_level: int) -> None:
    """
    Set Log Level for the Logger.

    Args:
        log_level: Log level to be set.
            Valid values: 10 (DEBUG), 20 (INFO), 30 (WARNING), 40 (ERROR), 50 (CRITICAL)
    """
    level_name_mappings = {10: 'DEBUG', 20: 'INFO', 30: 'WARNING', 40: 'ERROR', 50: 'CRITICAL'}
    if log_level in [10, 20, 30, 40, 50]:
        # Bug fix: apply the new level BEFORE reporting it. The original code
        # logged logger.level prior to setLevel(), so the confirmation message
        # always reported the previous level instead of the one just set.
        logger.setLevel(log_level)
        logger.info(f'Log Level has been set to {logger.level} ({level_name_mappings[logger.level]})')
    else:
        logger.error(
            f'The Log level provided is invalid, so the previous Log Level is maintained ({logger.level}))')
        logger.error('Valid values: 10 (DEBUG), 20 (INFO), 30 (WARNING), 40 (ERROR), 50 (CRITICAL)')
Methods
def delete_environment(self) ‑> None
-
Deletes the existing Environment instance, and disconnects the current Pybullet instance.
Expand source code
def delete_environment(self) -> None:
    """
    Remove the current Environment instance and disconnect its Pybullet client.
    """
    env = self.env
    if not env:
        logger.error('No existing instance of Environment found')
        return
    # Tear down the Pybullet connection before dropping the reference.
    p.disconnect(env.physics_client)
    self.env = None
    logger.info('Environment instance has been successfully removed')
def delete_naf_agent(self) ‑> None
-
Deletes the existing NAFAgent instance.
Expand source code
def delete_naf_agent(self) -> None:
    """
    Remove the current NAFAgent instance, if one exists.
    """
    if self.naf_agent:
        self.naf_agent = None
        logger.info('NAFAgent instance has been successfully removed')
    else:
        logger.error('No existing instance of NAFAgent found')
def get_environment_configuration(self) ‑> None
-
Shows the Environment configuration as logs on stdout.
Expand source code
def get_environment_configuration(self) -> None:
    """
    Shows the Environment configuration as logs on stdout.
    """
    if not self.env:
        logger.error("Environment is not initialized yet, can't show configuration")
        return
    env = self.env
    # (label, value) pairs, logged one per line in a fixed order
    settings = [
        ('Manipulator File', env.manipulator_file),
        ('End Effector index', env.endeffector_index),
        ('List of fixed Joints', env.fixed_joints),
        ('List of Joints involved in training', env.involved_joints),
        ('Position of the Target', env.target_pos),
        ('Position of the Obstacle', env.obstacle_pos),
        ('Initial position of joints', env.initial_joint_positions),
        ('Initial variation range of joints', env.initial_positions_variation_range),
        ('Max Force to be applied on joints', env.max_force),
        ('Visualize mode', env.visualize),
        ('Instance of the Environment', env),
    ]
    logger.info('Environment Configuration:')
    for label, value in settings:
        logger.info(f'* {label}: {value}')
def get_nafagent_configuration(self) ‑> None
-
Shows the NAFAgent configuration as logs on stdout.
Expand source code
def get_nafagent_configuration(self) -> None:
    """
    Shows the NAFAgent configuration as logs on stdout.
    """
    if not self.naf_agent:
        logger.error("NAFAgent is not initialized yet, can't show configuration")
        return
    agent = self.naf_agent
    # (label, value) pairs, logged one per line in a fixed order
    settings = [
        ('Environment Instance', agent.environment),
        ('State Size', agent.state_size),
        ('Action Size', agent.action_size),
        ('Size of layers of the Neural Network', agent.layer_size),
        ('Batch Size', agent.batch_size),
        ('Buffer Size', agent.buffer_size),
        ('Learning Rate', agent.learning_rate),
        ('Tau', agent.tau),
        ('Gamma', agent.gamma),
        ('Update Frequency', agent.update_freq),
        ('Number of Updates', agent.num_updates),
        ('Checkpoint frequency', agent.checkpoint_frequency),
        ('Device', agent.device),
    ]
    logger.info('NAFAgent Configuration:')
    for label, value in settings:
        logger.info(f'* {label}: {value}')
def initialize_environment(self, manipulator_file: str, endeffector_index: int, fixed_joints: List[int], involved_joints: List[int], target_position: List[float], obstacle_position: List[float], initial_joint_positions: List[float] = None, initial_positions_variation_range: List[float] = None, max_force: float = 200.0, visualize: bool = True) ‑> None
-
Initialize the Environment by creating an instance of the Environment class.
Args
manipulator_file
- Path to the manipulator's URDF or SDF file.
endeffector_index
- Index of the manipulator's end-effector.
fixed_joints
- List containing the indices of every joint not involved in the training.
involved_joints
- List containing the indices of every joint involved in the training.
target_position
- List containing the position of the target object, as 3D Cartesian coordinates.
obstacle_position
- List containing the position of the obstacle, as 3D Cartesian coordinates.
initial_joint_positions
- List containing as many items as the number of joints of the manipulator. Each item in the list corresponds to the initial position wanted for the joint with that same index.
initial_positions_variation_range
- List containing as many items as the number of joints of the manipulator. Each item in the list corresponds to the variation range wanted for the joint with that same index.
max_force
- Maximum force to be applied on the joints.
visualize
- Visualization mode.
Raises
InvalidEnvironmentParameter
- One or more parameters provided for the Environment initialization are invalid.
InvalidManipulatorFile
- The URDF/SDF file provided does not exist or cannot be loaded into the Pybullet Environment.
Expand source code
def initialize_environment(self,
                           manipulator_file: str,
                           endeffector_index: int,
                           fixed_joints: List[int],
                           involved_joints: List[int],
                           target_position: List[float],
                           obstacle_position: List[float],
                           initial_joint_positions: List[float] = None,
                           initial_positions_variation_range: List[float] = None,
                           max_force: float = 200.,
                           visualize: bool = True) -> None:
    """
    Initialize the Environment by creating an instance of the Environment class.

    Args:
        manipulator_file: Path to the manipulator's URDF or SDF file.
        endeffector_index: Index of the manipulator's end-effector.
        fixed_joints: List containing the indices of every joint not involved in the training.
        involved_joints: List containing the indices of every joint involved in the training.
        target_position: List containing the position of the target object, as 3D Cartesian coordinates.
        obstacle_position: List containing the position of the obstacle, as 3D Cartesian coordinates.
        initial_joint_positions: List containing as many items as the number of joints of the
            manipulator. Each item corresponds to the initial position wanted for the joint
            with that same index.
        initial_positions_variation_range: List containing as many items as the number of joints
            of the manipulator. Each item corresponds to the variation range wanted for the
            joint with that same index.
        max_force: Maximum force to be applied on the joints.
        visualize: Visualization mode.

    Raises:
        InvalidEnvironmentParameter: One or more parameters provided for the Environment
            initialization are invalid.
        InvalidManipulatorFile: The URDF/SDF file provided does not exist or cannot be loaded
            into the Pybullet Environment.
    """
    logger.debug('Initializing Pybullet Environment...')
    # Bundle every setting into the configuration object consumed by Environment;
    # parameter validation happens inside EnvironmentConfiguration/Environment.
    config = EnvironmentConfiguration(
        endeffector_index=endeffector_index,
        fixed_joints=fixed_joints,
        involved_joints=involved_joints,
        target_position=target_position,
        obstacle_position=obstacle_position,
        initial_joint_positions=initial_joint_positions,
        initial_positions_variation_range=initial_positions_variation_range,
        max_force=max_force,
        visualize=visualize,
    )
    self.env = Environment(manipulator_file=manipulator_file, environment_config=config)
    logger.info('Pybullet Environment successfully initialized')
    logger.debug('* The NAF Agent can now be initialized via the initialize_naf_agent() method')
def initialize_naf_agent(self, checkpoint_frequency: int = 500, seed: int = 0) ‑> None
-
Initialize the NAF Agent by creating an instance of the NAFAgent class.
Args
checkpoint_frequency
- Number of episodes required to generate a checkpoint.
seed
- Random seed.
Raises
EnvironmentNotInitialized
- Environment class has not been initialized.
InvalidNAFAgentParameter
- Either "checkpoint_frequency" or "seed" parameters have an invalid value/type.
Expand source code
def initialize_naf_agent(self, checkpoint_frequency: int = 500, seed: int = 0) -> None:
    """
    Initialize the NAF Agent by creating an instance of the NAFAgent class.

    Args:
        checkpoint_frequency: Number of episodes required to generate a checkpoint.
        seed: Random seed.

    Raises:
        EnvironmentNotInitialized: Environment class has not been initialized.
        InvalidNAFAgentParameter: Either "checkpoint_frequency" or "seed" parameters
            have an invalid value/type.
    """
    if not self.env:
        raise EnvironmentNotInitialized
    if not (isinstance(checkpoint_frequency, int) and isinstance(seed, int)):
        raise InvalidNAFAgentParameter('Checkpoint Frequency or Seed received is not an integer')
    logger.debug('Initializing NAF Agent...')
    # Use the first GPU when available, otherwise fall back to the CPU
    compute_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    hp = self._hyperparameters
    self.naf_agent = NAFAgent(environment=self.env,
                              state_size=self.env.observation_space.shape[0],
                              action_size=self.env.action_space.shape[0],
                              layer_size=256,
                              batch_size=hp.batch_size,
                              buffer_size=hp.buffer_size,
                              learning_rate=hp.learning_rate,
                              tau=hp.tau,
                              gamma=hp.gamma,
                              update_freq=hp.update_freq,
                              num_updates=hp.num_updates,
                              checkpoint_frequency=checkpoint_frequency,
                              device=compute_device,
                              seed=seed)
    logger.info('NAF Agent successfully initialized')
    logger.debug('* The Robotic Manipulator training can now be launched via the run_training() method')
def load_pretrained_parameters_from_episode(self, episode: int) ‑> None
-
Loads previously trained weights into the current networks. The pretrained weights are retrieved from the checkpoints generated on a training execution, so the episode provided must be present in the checkpoints/ folder. As the weights are loaded in the neural networks contained in the NAFAgent class, the method will raise an error if either the Environment or the NAFAgent class are not initialized.
Args
episode
- Episode in the /checkpoints folder from which to retrieve the pretrained weights.
Raises
EnvironmentNotInitialized
- The Environment class has not been initialized.
NAFAgentNotInitialized
- The NAFAgent class has not been initialized.
MissingWeightsFile
- The weights.p file does not exist in the folder provided.
Expand source code
def load_pretrained_parameters_from_episode(self, episode: int) -> None:
    """
    Loads previously trained weights into the current networks. The pretrained weights
    are retrieved from the checkpoints generated on a training execution, so the episode
    provided must be present in the checkpoints/ folder. As the weights are loaded into
    the neural networks contained in the NAFAgent class, the method raises an error if
    either the Environment or the NAFAgent class is not initialized.

    Args:
        episode: Episode in the /checkpoints folder from which to retrieve the
            pretrained weights.

    Raises:
        EnvironmentNotInitialized: The Environment class has not been initialized.
        NAFAgentNotInitialized: The NAFAgent class has not been initialized.
        MissingWeightsFile: The weights.p file does not exist in the folder provided.
    """
    # Both pieces of the configuration must exist before weights can be loaded
    if not self.env:
        raise EnvironmentNotInitialized
    if not self.naf_agent:
        raise NAFAgentNotInitialized
    # Delegate the actual checkpoint lookup and loading to the agent
    self.naf_agent.initialize_pretrained_agent_from_episode(episode)
def load_pretrained_parameters_from_weights_file(self, parameters_file_path: str) ‑> None
-
Loads a pretrained network's weights into the current networks. The weights are loaded from the path provided in the {parameters_file_path} parameter. As the weights are loaded in the neural networks contained in the NAFAgent class, the method will raise an error if either the Environment or the NAFAgent class are not initialized.
Args
parameters_file_path
- Path to the .p file where the weights are stored.
Raises
EnvironmentNotInitialized
- The Environment class has not been initialized.
NAFAgentNotInitialized
- The NAFAgent class has not been initialized.
MissingWeightsFile
- The .p file path provided does not exist.
Expand source code
def load_pretrained_parameters_from_weights_file(self, parameters_file_path: str) -> None:
    """
    Loads a pretrained network's weights into the current networks. The weights are
    loaded from the path provided in the {parameters_file_path} parameter. As the
    weights are loaded into the neural networks contained in the NAFAgent class, the
    method raises an error if either the Environment or the NAFAgent class is not
    initialized.

    Args:
        parameters_file_path: Path to the .p file where the weights are stored.

    Raises:
        EnvironmentNotInitialized: The Environment class has not been initialized.
        NAFAgentNotInitialized: The NAFAgent class has not been initialized.
        MissingWeightsFile: The .p file path provided does not exist.
    """
    # Both pieces of the configuration must exist before weights can be loaded
    if not self.env:
        raise EnvironmentNotInitialized
    if not self.naf_agent:
        raise NAFAgentNotInitialized
    # Delegate the file lookup and loading to the agent
    self.naf_agent.initialize_pretrained_agent_from_weights_file(parameters_file_path)
def run_demo_testing(self, demo_type: str) ‑> None
-
Run a demo testing from a preconfigured Environment and NAFAgent, which shows how a robotic manipulator learns with the framework. The demo loads pretrained weights and executes 50 test episodes.
Args
demo_type
- Valid values:
-
'kuka_testing': testing with the KUKA IIWA Robotic Manipulator.
-
'xarm6_testing': testing with the XArm6 Robotic Manipulator.
Expand source code
def run_demo_testing(self, demo_type: str) -> None:
    """
    Run a demo testing from a preconfigured Environment and NAFAgent, which shows how a
    robotic manipulator learns with the framework. The demo loads pretrained weights and
    executes 50 test episodes.

    Args:
        demo_type: Valid values:\n
            - 'kuka_testing': testing with the KUKA IIWA Robotic Manipulator.\n
            - 'xarm6_testing': testing with the XArm6 Robotic Manipulator.\n
    """
    logger.warning('Both the demo testing and the demo training are executed with the Log level '
                   'set to DEBUG, so that the framework can be understood at a low level.')
    # Force DEBUG (10) logging for the whole demo; the previous level is restored at the end
    old_level = logger.level
    logger.setLevel(10)
    # CHECK EXISTENT ENVIRONMENT AND NAF AGENT
    # Overwrite previous Environment configuration if present
    # (any answer other than 'y'/'Y' aborts the demo)
    if self.env:
        overwrite_env = input('Environment instance found. Overwrite? [Y/n] ').lower() == 'y'
        if not overwrite_env:
            logger.info('Demo could not run due to the presence of a user-configured Environment instance')
            return
        self.delete_environment()
    # Overwrite previous NAFAgent configuration if present
    if self.naf_agent:
        overwrite_naf_agent = input('NAFAgent instance found. Overwrite? [Y/n] ').lower() == 'y'
        if not overwrite_naf_agent:
            logger.info('Demo could not run due to the presence of a user-configured NAFAgent instance')
            return
        self.delete_naf_agent()
    # START DEMO
    if demo_type == 'kuka_testing':
        logger.info('Initializing demo Environment instance...')
        # Resolve the KUKA model inside Pybullet's bundled data directory
        kuka_path = os.path.join(pybullet_data.getDataPath(), 'kuka_iiwa/kuka_with_gripper2.sdf')
        self.initialize_environment(manipulator_file=kuka_path,
                                    endeffector_index=13,
                                    fixed_joints=[6, 7, 8, 9, 10, 11, 12, 13],
                                    involved_joints=[0, 1, 2, 3, 4, 5],
                                    target_position=[0.4, 0.85, 0.71],
                                    obstacle_position=[0.45, 0.55, 0.55],
                                    initial_joint_positions=[0.9, 0.45, 0, 0, 0, 0],
                                    initial_positions_variation_range=[0, 0, .5, .5, .5, .5])
        logger.info('Initializing demo NAFAgent instance...')
        self.initialize_naf_agent()
        logger.info('Loading demo pretrained parameters')
        # Pretrained weights ship with the package, next to this module
        self.load_pretrained_parameters_from_weights_file(os.path.dirname(
            os.path.realpath(__file__)) + '/naf_components/demo_weights/weights_kuka.p')
        logger.info('Running 50 test episodes...')
        self.test_trained_model(50, 750)
        # Reset Environment and NAFAgent
        self.delete_environment()
        self.delete_naf_agent()
    elif demo_type == 'xarm6_testing':
        logger.info('Initializing demo Environment instance...')
        xarm_path = os.path.join(pybullet_data.getDataPath(), 'xarm/xarm6_with_gripper.urdf')
        self.initialize_environment(manipulator_file=xarm_path,
                                    endeffector_index=12,
                                    fixed_joints=[0, 7, 8, 9, 10, 11, 12, 13],
                                    involved_joints=[1, 2, 3, 4, 5, 6],
                                    target_position=[0.3, 0.47, 0.61],
                                    obstacle_position=[0.25, 0.27, 0.5],
                                    initial_joint_positions=[0., 1., 0., -2.3, 0., 0., 0.],
                                    initial_positions_variation_range=[0, 0, 0, 0.3, 1, 1, 1],
                                    max_force=200,
                                    visualize=True)
        logger.info('Initializing demo NAFAgent instance...')
        self.initialize_naf_agent()
        logger.info('Loading demo pretrained parameters')
        self.load_pretrained_parameters_from_weights_file(
            os.path.dirname(os.path.realpath(__file__)) + '/naf_components/demo_weights/weights_xarm6.p')
        logger.info('Running 50 test episodes...')
        self.test_trained_model(50, 750)
        # Reset Environment and NAFAgent
        self.delete_environment()
        self.delete_naf_agent()
    else:
        logger.error('Incorrect demo type!')
    # Reset log level
    # NOTE(review): on the early-return paths above the level is NOT restored —
    # the logger stays at DEBUG; confirm whether that is intended.
    logger.setLevel(old_level)
    logger.warning('Log level has been reset to its original value')
def run_demo_training(self, demo_type: str, verbose: bool = False) ‑> None
-
Run a demo from a preconfigured Environment and NAFAgent, which shows how the framework works. The training is executed for 20 episodes. Do not expect good results, as this is just a demo of the training configuration process and the number of episodes is not enough to achieve a good performance in the manipulator.
Args
demo_type
- Valid values:
-
'kuka_training': training with the KUKA IIWA Robotic Manipulator.
-
'xarm6_training': training with the XArm6 Robotic Manipulator.
verbose
- Verbose mode. Verbose mode functionality is applied on the same way as for the run_training() method.
Expand source code
def run_demo_training(self, demo_type: str, verbose: bool = False) -> None:
    """
    Run a demo from a preconfigured Environment and NAFAgent, which shows how the
    framework works. The training is executed for 20 episodes. Do not expect good
    results, as this is just a demo of the training configuration process and the
    number of episodes is not enough to achieve a good performance in the manipulator.

    Args:
        demo_type: Valid values:\n
            - 'kuka_training': training with the KUKA IIWA Robotic Manipulator.\n
            - 'xarm6_training': training with the XArm6 Robotic Manipulator.\n
        verbose: Verbose mode. Verbose mode functionality is applied on the same way
            as for the run_training() method.
    """
    logger.warning('Both the demo testing and the demo training are executed with the Log level '
                   'set to DEBUG, so that the framework can be understood at a low level.')
    # Force DEBUG (10) logging for the whole demo; the previous level is restored at the end
    old_level = logger.level
    logger.setLevel(10)
    # CHECK EXISTENT ENVIRONMENT AND NAF AGENT
    # Overwrite previous Environment configuration if present
    # (any answer other than 'y'/'Y' aborts the demo)
    if self.env:
        overwrite_env = input('Environment instance found. Overwrite? [Y/n] ').lower() == 'y'
        if not overwrite_env:
            logger.info('Demo could not run due to the presence of a user-configured Environment instance')
            return
        self.delete_environment()
    # Overwrite previous NAFAgent configuration if present
    if self.naf_agent:
        overwrite_naf_agent = input('NAFAgent instance found. Overwrite? [Y/n] ').lower() == 'y'
        if not overwrite_naf_agent:
            logger.info('Demo could not run due to the presence of a user-configured NAFAgent instance')
            return
        self.delete_naf_agent()
    # START DEMO
    if demo_type == 'kuka_training':
        logger.info('Initializing demo Environment instance...')
        # Fix: resolve the KUKA model through pybullet_data.getDataPath(), exactly as
        # run_demo_testing does. The file was previously passed as a bare relative
        # path, which depends on Pybullet's search-path state.
        kuka_path = os.path.join(pybullet_data.getDataPath(), 'kuka_iiwa/kuka_with_gripper2.sdf')
        self.initialize_environment(manipulator_file=kuka_path,
                                    endeffector_index=13,
                                    fixed_joints=[6, 7, 8, 9, 10, 11, 12, 13],
                                    involved_joints=[0, 1, 2, 3, 4, 5],
                                    target_position=[0.4, 0.85, 0.71],
                                    obstacle_position=[0.45, 0.55, 0.55],
                                    initial_joint_positions=[0.9, 0.45, 0, 0, 0, 0],
                                    initial_positions_variation_range=[0, 0, 0, 0, 0, 0],
                                    visualize=True)
        logger.info('Initializing demo NAFAgent instance...')
        self.initialize_naf_agent()
        logger.info('Running training for 20 episodes. Do not expect good results, '
                    'this is just a demo of the training configuration process')
        self.run_training(20, 400, verbose=verbose)
        # Reset Environment and NAFAgent
        self.delete_environment()
        self.delete_naf_agent()
    elif demo_type == 'xarm6_training':
        logger.info('Initializing demo Environment instance...')
        xarm_path = os.path.join(pybullet_data.getDataPath(), 'xarm/xarm6_with_gripper.urdf')
        self.initialize_environment(manipulator_file=xarm_path,
                                    endeffector_index=12,
                                    fixed_joints=[0, 7, 8, 9, 10, 11, 12, 13],
                                    involved_joints=[1, 2, 3, 4, 5, 6],
                                    target_position=[0.3, 0.47, 0.61],
                                    obstacle_position=[0.25, 0.27, 0.5],
                                    initial_joint_positions=[0., 1., 0., -2.3, 0., 0., 0.],
                                    initial_positions_variation_range=[0, 0, 0, 0.3, 1, 1, 1],
                                    visualize=True)
        logger.info('Initializing demo NAFAgent instance...')
        self.initialize_naf_agent()
        # Fix: corrected the typo 'confiuration' in the log message below
        logger.info('Running training for 20 episodes. Do not expect good results, '
                    'this is just a demo of the configuration process')
        self.run_training(20, 400, verbose=verbose)
        # Reset Environment and NAFAgent
        self.delete_environment()
        self.delete_naf_agent()
    else:
        logger.error('Incorrect demo type!')
    # Reset log level
    logger.setLevel(old_level)
    logger.warning('Log level has been reset to its original value')
def run_training(self, episodes: int, frames: Optional[int] = 500, verbose: bool = True)
-
Execute a training on the configured Environment with the configured NAF Agent. The training is executed for {episodes}, and for {frames} timesteps per episode.
Args
episodes
- Maximum number of episodes to execute.
frames
- Maximum number of timesteps to execute per episode.
verbose
- Verbose mode.
-
If set to True, each timestep will log information about the current state, the action chosen from that state, the current reward and the cumulative reward until that timestep. In addition, each time an episode ends information about the total reward obtained, the number of frames required to complete the episode, the mean reward obtained along the episode and the execution time of the episode are logged.
-
If set to False, each time an episode ends information about the total reward obtained, the number of frames required to complete the episode, the mean reward obtained along the episode and the execution time of the episode are logged.
It is recommended to use the verbose mode only in a development/debugging context, since logging information for each timestep greatly reduces the visibility of what is happening during the training.
Raises
ConfigurationIncomplete
- Either NAFAgent or Environment has not been initialized.
Expand source code
def run_training(self, episodes: int, frames: Optional[int] = 500, verbose: bool = True):
    """
    Execute a training on the configured Environment with the configured NAF Agent.
    The training is executed for {episodes}, and for {frames} timesteps per episode.

    Args:
        episodes: Maximum number of episodes to execute.
        frames: Maximum number of timesteps to execute per episode.
        verbose: Verbose mode.\n
            - If set to True, each timestep logs the current state, the action chosen
              from that state, the current reward and the cumulative reward up to that
              timestep; each episode end additionally logs the total reward, the number
              of frames required, the mean reward along the episode and its execution
              time.\n
            - If set to False, only the per-episode summary described above is logged.\n
            Verbose mode is recommended only for development/debugging, since per-timestep
            logging greatly reduces the visibility of what happens during the training.

    Raises:
        ConfigurationIncomplete: Either NAFAgent or Environment has not been initialized.
    """
    # Both components must be configured before a training can start
    fully_configured = bool(self.naf_agent) and bool(self.env)
    if not fully_configured:
        raise ConfigurationIncomplete
    self.naf_agent.run(frames, episodes, verbose)
def set_hyperparameter(self, hyperparameter: str, value: Union[float, int]) ‑> None
-
Sets the specified value on the given hyper-parameter. Checks are performed to ensure that the new value for the hyper-parameter is a valid value.
Args
hyperparameter
-
Name of the hyper-parameter to be updated. Allowed names are:
-
buffer_size/buffersize/BUFFER_SIZE/BUFFERSIZE
-
batch_size/batchsize/BATCH_SIZE/BATCHSIZE
-
gamma/GAMMA
-
tau/TAU
-
learning_rate/learningrate/LEARNING_RATE/LEARNINGRATE
-
update_freq/updatefreq/UPDATE_FREQ/UPDATEFREQ
-
num_update/numupdate/NUMUPDATE/NUM_UPDATE
-
value
- New value for the given hyper-parameter.
Raises
InvalidHyperParameter
- The hyperparameter received has an invalid value/type, or the hyperparameter name received is not one of the accepted values.
Expand source code
def set_hyperparameter(self, hyperparameter: str, value: Union[float, int]) -> None:
    """
    Sets the specified value on the given hyper-parameter. Checks are performed to
    ensure that the new value for the hyper-parameter is a valid value.

    Args:
        hyperparameter: Name of the hyper-parameter to be updated. Allowed names are:\n
            - buffer_size/buffersize/BUFFER_SIZE/BUFFERSIZE\n
            - batch_size/batchsize/BATCH_SIZE/BATCHSIZE\n
            - gamma/GAMMA\n
            - tau/TAU\n
            - learning_rate/learningrate/LEARNING_RATE/LEARNINGRATE\n
            - update_freq/updatefreq/UPDATE_FREQ/UPDATEFREQ\n
            - num_update/numupdate/NUMUPDATE/NUM_UPDATE\n
        value: New value for the given hyper-parameter.

    Raises:
        InvalidHyperParameter: The hyperparameter received has an invalid value/type,
            or the hyperparameter name received is not one of the accepted values.
    """
    # One rule per hyper-parameter: (name pattern, attribute on HyperParameters,
    # validity predicate, error message raised when the predicate fails).
    # Fix: the num_update rule previously raised the Buffer Size error message
    # (copy-paste error); it now names the correct hyper-parameter.
    rules = [
        (r'^(buffer_size|buffersize|BUFFER_SIZE|BUFFERSIZE)$', 'buffer_size',
         lambda v: isinstance(v, int) and v > 0,
         'Buffer Size is not an int or has a value lower than 0'),
        (r'^(batch_size|batchsize|BATCH_SIZE|BATCHSIZE)$', 'batch_size',
         lambda v: isinstance(v, int) and v > 0,
         'Batch Size is not an int or has a value lower than 0'),
        (r'^(gamma|GAMMA)$', 'gamma',
         lambda v: isinstance(v, (int, float)) and 0 < v < 1,
         'Gamma is not a float or its value is out of range (0, 1)'),
        (r'^(tau|TAU)$', 'tau',
         lambda v: isinstance(v, (int, float)) and 0 <= v <= 1,
         'Tau is not a float or its value is out of range [0, 1]'),
        (r'^(learning_rate|learningrate|LEARNING_RATE|LEARNINGRATE)$', 'learning_rate',
         lambda v: isinstance(v, (int, float)) and v > 0,
         'Learning Rate is not a float or has a value lower than 0'),
        (r'^(update_freq|updatefreq|UPDATE_FREQ|UPDATEFREQ)$', 'update_freq',
         lambda v: isinstance(v, int) and v > 0,
         'Update Frequency is not an int or has a value lower than 0'),
        (r'^(num_update|numupdate|NUMUPDATE|NUM_UPDATE)$', 'num_updates',
         lambda v: isinstance(v, int) and v > 0,
         'Number of Updates is not an int or has a value lower than 0'),
    ]
    for pattern, attribute, is_valid, error_message in rules:
        if re.match(pattern, hyperparameter):
            if not is_valid(value):
                raise InvalidHyperParameter(error_message)
            setattr(self._hyperparameters, attribute, value)
            logger.info(f'Hyperparameter {hyperparameter} has been set to {value}')
            return
    raise InvalidHyperParameter(
        'The hyperparameter name passed as parameter is not valid. Valid hyperparameters are: '
        '["buffer_size", "batch_size", "gamma", "tau", "learning_rate", "update_freq", "num_update"]')
def test_trained_model(self, n_episodes: int, frames: int) ‑> None
-
Tests a previously trained agent through the execution of {n_episodes} test episodes, for {frames} timesteps each. When the test concludes, the results of the test are logged on terminal.
Args
n_episodes
- Number of test episodes to execute.
frames
- Number of timesteps per test episode.
Raises
ConfigurationIncomplete
- Either the Environment class or the NAFAgent class are not initialized.
Expand source code
def test_trained_model(self, n_episodes: int, frames: int) -> None:
    """
    Tests a previously trained agent through the execution of {n_episodes} test
    episodes, for {frames} timesteps each. When the test concludes, the results
    of the test are logged on terminal.

    Args:
        n_episodes: Number of test episodes to execute.
        frames: Number of timesteps per test episode.

    Raises:
        ConfigurationIncomplete: Either the Environment class or the NAFAgent class
            are not initialized.
    """
    # Check if Environment and NAFAgent initialized
    if not self.naf_agent or not self.env:
        raise ConfigurationIncomplete
    # results holds one (completed, last_frame) tuple per episode
    results, num_collisions = [], 0
    for episode_number in range(1, n_episodes + 1):
        state = self.env.reset()
        for frame in range(frames):
            state, reward, done = self.env.step(self.naf_agent.act(state))
            if done:
                # A terminal reward of 250 marks a successful episode; any other
                # terminal reward is counted as a collision
                success = (reward == 250)
                results.append((success, frame))
                if not success:
                    num_collisions += 1
                break
            if frame == frames - 1:
                # Ran out of timesteps without reaching a terminal state
                results.append((False, frame))
                break
        logger.info(f'Test Episode number {episode_number} completed\n')
    logger.info('RESULTS OF THE TEST:')
    for iteration, (completed, last_frame) in enumerate(results, start=1):
        logger.info(f'Results of Iteration {iteration}: COMPLETED: {completed}. FRAMES: {last_frame}')
    successful = [completed for completed, _ in results].count(True)
    logger.info(f'Number of successful executions: '
                f'{successful}/{len(results)} '
                f'({(successful / len(results)) * 100}%)')
    logger.info(f'Average number of frames required to complete an episode: '
                f'{np.mean(np.array([last_frame for completed, last_frame in results if completed]))}')
    logger.info(f'Number of episodes terminated because of collisions: {num_collisions}')