Module robotic_manipulator_rloa.naf_components.naf_algorithm

Expand source code
import json
import os
import random
import time
from typing import Tuple, Dict

import torch
import torch.nn.functional as F
import torch.optim as optim
from numpy.typing import NDArray
from torch.nn.utils import clip_grad_norm_

from robotic_manipulator_rloa.utils.logger import get_global_logger
from robotic_manipulator_rloa.environment.environment import Environment
from robotic_manipulator_rloa.utils.exceptions import MissingWeightsFile
from robotic_manipulator_rloa.naf_components.naf_neural_network import NAF
from robotic_manipulator_rloa.utils.replay_buffer import ReplayBuffer


logger = get_global_logger()


class NAFAgent:

    MODEL_PATH = 'model.p'  # Filename where the parameters of the trained torch neural network are stored

    def __init__(self,
                 environment: Environment,
                 state_size: int,
                 action_size: int,
                 layer_size: int,
                 batch_size: int,
                 buffer_size: int,
                 learning_rate: float,
                 tau: float,
                 gamma: float,
                 update_freq: int,
                 num_updates: int,
                 checkpoint_frequency: int,
                 device: torch.device,
                 seed: int) -> None:
        """
        Interacts with and learns from the environment via the NAF algorithm.
        Args:
            environment: Instance of Environment class.
            state_size: Dimension of the states.
            action_size: Dimension of the actions.
            layer_size: Size for the hidden layers of the neural network.
            batch_size: Number of experiences to train with per training batch.
            buffer_size: Maximum number of experiences to be stored in Replay Buffer.
            learning_rate: Learning rate for neural network's optimizer.
            tau: Hyperparameter for soft updating the target network.
            gamma: Discount factor.
            update_freq: Number of timesteps after which the main neural network is updated.
            num_updates: Number of updates performed when learning.
            checkpoint_frequency: Number of episodes after which a checkpoint is generated.
            device: Device used (CPU or CUDA).
            seed: Random seed.
        """
        # Create required parent directory
        os.makedirs('checkpoints/', exist_ok=True)

        self.environment = environment
        self.state_size = state_size
        self.action_size = action_size
        self.layer_size = layer_size
        self.buffer_size = buffer_size
        self.learning_rate = learning_rate
        random.seed(seed)
        self.device = device
        self.tau = tau
        self.gamma = gamma
        self.update_freq = update_freq
        self.num_updates = num_updates
        self.batch_size = batch_size
        self.checkpoint_frequency = checkpoint_frequency

        # Initialize Q-Networks
        self.qnetwork_main = NAF(state_size, action_size, layer_size, seed, device).to(device)
        self.qnetwork_target = NAF(state_size, action_size, layer_size, seed, device).to(device)

        # Define Adam as optimizer
        self.optimizer = optim.Adam(self.qnetwork_main.parameters(), lr=learning_rate)

        # Initialize Replay memory
        self.memory = ReplayBuffer(buffer_size, batch_size, self.device, seed)

        # Initialize update time step counter (for updating every {update_freq} steps)
        self.update_t_step = 0

    def initialize_pretrained_agent_from_episode(self, episode: int) -> None:
        """
        Loads the previously trained weights into the main and target neural networks.
        The pretrained weights are retrieved from the checkpoints generated on a training execution, so
        the episode provided must be present in the checkpoints/ folder.
        Args:
            episode: Episode from which to retrieve the pretrained weights.
        Raises:
            MissingWeightsFile: The weights.p file is not present in the checkpoints/{episode}/ folder provided.
        """
        # Check if file is present in checkpoints/{episode}/ directory
        if not os.path.isfile(f'checkpoints/{episode}/weights.p'):
            raise MissingWeightsFile

        logger.debug(f'Loading model weights from the checkpoint generated at episode {episode}...')
        self.qnetwork_main.load_state_dict(torch.load(f'checkpoints/{episode}/weights.p'))
        self.qnetwork_target.load_state_dict(torch.load(f'checkpoints/{episode}/weights.p'))
        logger.info(f'Loaded model weights from the checkpoint generated at episode {episode}')

    def initialize_pretrained_agent_from_weights_file(self, weights_path: str) -> None:
        """
        Loads the previously trained weights into the main and target neural networks.
        The pretrained weights are retrieved from a .p file containing the weights, located in 
        the {weights_path} path.
        Args:
            weights_path: Path to the .p file containing the network's weights.
        Raises:
            MissingWeightsFile: The file path provided does not exist.
        """
        # Check if file is present
        if not os.path.isfile(weights_path):
            raise MissingWeightsFile

        logger.debug('Loading pre-trained model weights...')
        self.qnetwork_main.load_state_dict(torch.load(weights_path))
        self.qnetwork_target.load_state_dict(torch.load(weights_path))
        logger.info('Loaded pre-trained weights for the NN')

    def step(self, state: NDArray, action: NDArray, reward: float, next_state: NDArray, done: int) -> None:
        """
        Stores in the ReplayBuffer the new experience composed of the parameters received,
        and learns only if the Buffer contains enough experiences to fill a batch. The
        learning will occur if the update frequency {update_freq} is reached, in which case it
        will learn {num_updates} times.
        Args:
            state: Current state.
            action: Action performed from state {state}.
            reward: Reward obtained after performing action {action} from state {state}.
            next_state: New state reached after performing action {action} from state {state}.
            done: Integer (0 or 1) indicating whether a terminal state has been reached.
        """

        # Save experience in replay memory
        self.memory.add(state, action, reward, next_state, done)

        # Learning will be performed every {update_freq} time-steps.
        self.update_t_step = (self.update_t_step + 1) % self.update_freq  # Update time step counter
        if self.update_t_step == 0:
            # If enough samples are available in memory, get random subset and learn
            if len(self.memory) > self.batch_size:
                for _ in range(self.num_updates):
                    # Pick random batch of experiences from memory
                    experiences = self.memory.sample()

                    # Learn from experiences and get loss
                    self.learn(experiences)

    def act(self, state: NDArray) -> NDArray:
        """
        Extracts the action which maximizes the Q-Function, by getting the output of the mu layer
        of the main neural network.
        Args:
            state: Current state from which to pick the best action.
        Returns:
            Action which maximizes Q-Function.
        """
        state = torch.from_numpy(state).float().to(self.device)

        # Set evaluation mode on the main network to obtain a prediction
        self.qnetwork_main.eval()
        with torch.no_grad():
            # Get the action with maximum Q-Value from the local network
            action, _, _ = self.qnetwork_main(state.unsqueeze(0))

        # Set training mode back on the main network for future use
        self.qnetwork_main.train()

        return action.cpu().squeeze().numpy()

    def learn(self, experiences: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]) -> None:
        """
        Calculate the Q-Function estimate from the main neural network, the Target value from
        the target neural network, and calculate the loss with both values, all by feeding the received
        batch of experience tuples to both networks. After loss is calculated, backpropagation is performed on the
        main network from the given loss, so that the weights of the main network are updated.
        Args:
            experiences: Tuple of five elements, where each element is a torch.Tensor of length {batch_size}.
        """
        # Set gradients of all optimized torch Tensors to zero
        self.optimizer.zero_grad()
        states, actions, rewards, next_states, dones = experiences

        # Get the Value Function for the next state from the target network (no_grad() disables gradient calculation)
        with torch.no_grad():
            _, _, V_ = self.qnetwork_target(next_states)

        # Compute the target Value Functions for the given experiences.
        # The target value is calculated as target_val = r + gamma * V(s')
        target_values = rewards + (self.gamma * V_)

        # Compute the expected Value Function from main network
        _, q_estimate, _ = self.qnetwork_main(states, actions)

        # Compute loss between target value and expected Q value
        loss = F.mse_loss(q_estimate, target_values)

        # Perform backpropagation for minimizing loss
        loss.backward()
        clip_grad_norm_(self.qnetwork_main.parameters(), 1)
        self.optimizer.step()

        # Update the target network softly with the local one
        self.soft_update(self.qnetwork_main, self.qnetwork_target)

        # return loss.detach().cpu().numpy()

    def soft_update(self, main_nn: NAF, target_nn: NAF) -> None:
        """
        Soft update of the network parameters, following this formula:\n
                    θ_target = τ*θ_local + (1 - τ)*θ_target
        Args:
            main_nn: Main torch neural network.
            target_nn: Target torch neural network.
        """
        for target_param, main_param in zip(target_nn.parameters(), main_nn.parameters()):
            target_param.data.copy_(self.tau * main_param.data + (1. - self.tau) * target_param.data)

    def run(self, frames: int = 1000, episodes: int = 1000, verbose: bool = True) -> Dict[int, Tuple[float, int]]:
        """
        Execute training flow of the NAF algorithm on the given environment.
        Args:
            frames: Maximum number of frames (timesteps) per episode.
            episodes: Number of episodes to train for.
            verbose: If True, detailed per-frame logs are shown; if False, only per-episode summaries.
        Returns:
            The score history generated throughout the training.
        """
        logger.info('Training started')
        # Initialize 'scores' dictionary to store rewards and timesteps executed for each episode
        scores = {episode: (0, 0) for episode in range(1, episodes + 1)}

        # Iterate through every episode
        for episode in range(episodes):
            logger.info(f'Running Episode {episode + 1}')
            start = time.time()  # Timer to measure execution time per episode
            state = self.environment.reset(verbose)
            score, mean = 0, list()

            for frame in range(1, frames + 1):
                if verbose: logger.info(f'Running frame {frame} in episode {episode + 1}')

                # Pick action according to current state
                if verbose: logger.info(f'Current State: {state}')
                action = self.act(state)
                if verbose: logger.info(f'Action chosen for the given state is: {action}')

                # Perform action on environment and get new state and reward
                next_state, reward, done = self.environment.step(action)

                # Save the experience in the ReplayBuffer, and learn from previous experiences if applicable
                self.step(state, action, reward, next_state, done)

                state = next_state  # Update state to next state
                score += reward
                mean.append(reward)

                if verbose: logger.info(f'Reward: {reward}  -  Cumulative reward: {score}\n')

                if done:
                    break

            # Updates scores history
            scores[episode + 1] = (score, frame)  # save most recent score and last frame
            logger.info(f'Reward:                             {score}')
            logger.info(f'Number of frames:                   {frame}')
            logger.info(f'Mean of rewards on this episode:    {sum(mean) / frames}')
            logger.info(f'Time taken for this episode:        {round(time.time() - start, 3)} secs\n')

            # Save the episode's performance if it is a checkpoint episode
            if (episode + 1) % self.checkpoint_frequency == 0:
                # Create parent directory for current episode
                os.makedirs(f'checkpoints/{episode + 1}/', exist_ok=True)
                # Save model weights
                torch.save(self.qnetwork_main.state_dict(), f'checkpoints/{episode + 1}/weights.p')
                # Save the model's performance metrics
                with open(f'checkpoints/{episode + 1}/scores.txt', 'w') as f:
                    f.write(json.dumps(scores))

        torch.save(self.qnetwork_main.state_dict(), self.MODEL_PATH)
        logger.info(f'Model has been successfully saved in {self.MODEL_PATH}')

        return scores

Classes

class NAFAgent (environment: Environment, state_size: int, action_size: int, layer_size: int, batch_size: int, buffer_size: int, learning_rate: float, tau: float, gamma: float, update_freq: int, num_updates: int, checkpoint_frequency: int, device: torch.device, seed: int)

Interacts with and learns from the environment via the NAF algorithm.

Args

environment
Instance of Environment class.
state_size
Dimension of the states.
action_size
Dimension of the actions.
layer_size
Size for the hidden layers of the neural network.
batch_size
Number of experiences to train with per training batch.
buffer_size
Maximum number of experiences to be stored in Replay Buffer.
learning_rate
Learning rate for neural network's optimizer.
tau
Hyperparameter for soft updating the target network.
gamma
Discount factor.
update_freq
Number of timesteps after which the main neural network is updated.
num_updates
Number of updates performed when learning.
checkpoint_frequency
Number of episodes after which a checkpoint is generated.
device
Device used (CPU or CUDA).
seed
Random seed.
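
Example

A minimal construction sketch (not taken from the package docs): it assumes an Environment instance env has already been created elsewhere, and the hyperparameter values below are illustrative placeholders rather than recommended defaults.

import torch
from robotic_manipulator_rloa.naf_components.naf_algorithm import NAFAgent

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

agent = NAFAgent(environment=env,        # env: a pre-configured Environment instance (assumed)
                 state_size=12,          # illustrative state dimension
                 action_size=6,          # illustrative action dimension
                 layer_size=256,
                 batch_size=128,
                 buffer_size=100000,
                 learning_rate=1e-3,
                 tau=1e-3,
                 gamma=0.99,
                 update_freq=4,
                 num_updates=1,
                 checkpoint_frequency=100,
                 device=device,
                 seed=0)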
Expand source code
class NAFAgent:

    MODEL_PATH = 'model.p'  # Filename where the parameters of the trained torch neural network are stored

    def __init__(self,
                 environment: Environment,
                 state_size: int,
                 action_size: int,
                 layer_size: int,
                 batch_size: int,
                 buffer_size: int,
                 learning_rate: float,
                 tau: float,
                 gamma: float,
                 update_freq: int,
                 num_updates: int,
                 checkpoint_frequency: int,
                 device: torch.device,
                 seed: int) -> None:
        """
        Interacts with and learns from the environment via the NAF algorithm.
        Args:
            environment: Instance of Environment class.
            state_size: Dimension of the states.
            action_size: Dimension of the actions.
            layer_size: Size for the hidden layers of the neural network.
            batch_size: Number of experiences to train with per training batch.
            buffer_size: Maximum number of experiences to be stored in Replay Buffer.
            learning_rate: Learning rate for neural network's optimizer.
            tau: Hyperparameter for soft updating the target network.
            gamma: Discount factor.
            update_freq: Number of timesteps after which the main neural network is updated.
            num_updates: Number of updates performed when learning.
            checkpoint_frequency: Number of episodes after which a checkpoint is generated.
            device: Device used (CPU or CUDA).
            seed: Random seed.
        """
        # Create required parent directory
        os.makedirs('checkpoints/', exist_ok=True)

        self.environment = environment
        self.state_size = state_size
        self.action_size = action_size
        self.layer_size = layer_size
        self.buffer_size = buffer_size
        self.learning_rate = learning_rate
        random.seed(seed)
        self.device = device
        self.tau = tau
        self.gamma = gamma
        self.update_freq = update_freq
        self.num_updates = num_updates
        self.batch_size = batch_size
        self.checkpoint_frequency = checkpoint_frequency

        # Initialize Q-Networks
        self.qnetwork_main = NAF(state_size, action_size, layer_size, seed, device).to(device)
        self.qnetwork_target = NAF(state_size, action_size, layer_size, seed, device).to(device)

        # Define Adam as optimizer
        self.optimizer = optim.Adam(self.qnetwork_main.parameters(), lr=learning_rate)

        # Initialize Replay memory
        self.memory = ReplayBuffer(buffer_size, batch_size, self.device, seed)

        # Initialize update time step counter (for updating every {update_freq} steps)
        self.update_t_step = 0

    def initialize_pretrained_agent_from_episode(self, episode: int) -> None:
        """
        Loads the previously trained weights into the main and target neural networks.
        The pretrained weights are retrieved from the checkpoints generated on a training execution, so
        the episode provided must be present in the checkpoints/ folder.
        Args:
            episode: Episode from which to retrieve the pretrained weights.
        Raises:
            MissingWeightsFile: The weights.p file is not present in the checkpoints/{episode}/ folder provided.
        """
        # Check if file is present in checkpoints/{episode}/ directory
        if not os.path.isfile(f'checkpoints/{episode}/weights.p'):
            raise MissingWeightsFile

        logger.debug(f'Loading model weights from the checkpoint generated at episode {episode}...')
        self.qnetwork_main.load_state_dict(torch.load(f'checkpoints/{episode}/weights.p'))
        self.qnetwork_target.load_state_dict(torch.load(f'checkpoints/{episode}/weights.p'))
        logger.info(f'Loaded model weights from the checkpoint generated at episode {episode}')

    def initialize_pretrained_agent_from_weights_file(self, weights_path: str) -> None:
        """
        Loads the previously trained weights into the main and target neural networks.
        The pretrained weights are retrieved from a .p file containing the weights, located in 
        the {weights_path} path.
        Args:
            weights_path: Path to the .p file containing the network's weights.
        Raises:
            MissingWeightsFile: The file path provided does not exist.
        """
        # Check if file is present
        if not os.path.isfile(weights_path):
            raise MissingWeightsFile

        logger.debug('Loading pre-trained model weights...')
        self.qnetwork_main.load_state_dict(torch.load(weights_path))
        self.qnetwork_target.load_state_dict(torch.load(weights_path))
        logger.info('Loaded pre-trained weights for the NN')

    def step(self, state: NDArray, action: NDArray, reward: float, next_state: NDArray, done: int) -> None:
        """
        Stores in the ReplayBuffer the new experience composed of the parameters received,
        and learns only if the Buffer contains enough experiences to fill a batch. The
        learning will occur if the update frequency {update_freq} is reached, in which case it
        will learn {num_updates} times.
        Args:
            state: Current state.
            action: Action performed from state {state}.
            reward: Reward obtained after performing action {action} from state {state}.
            next_state: New state reached after performing action {action} from state {state}.
            done: Integer (0 or 1) indicating whether a terminal state has been reached.
        """

        # Save experience in replay memory
        self.memory.add(state, action, reward, next_state, done)

        # Learning will be performed every {update_freq} time-steps.
        self.update_t_step = (self.update_t_step + 1) % self.update_freq  # Update time step counter
        if self.update_t_step == 0:
            # If enough samples are available in memory, get random subset and learn
            if len(self.memory) > self.batch_size:
                for _ in range(self.num_updates):
                    # Pick random batch of experiences from memory
                    experiences = self.memory.sample()

                    # Learn from experiences and get loss
                    self.learn(experiences)

    def act(self, state: NDArray) -> NDArray:
        """
        Extracts the action which maximizes the Q-Function, by getting the output of the mu layer
        of the main neural network.
        Args:
            state: Current state from which to pick the best action.
        Returns:
            Action which maximizes Q-Function.
        """
        state = torch.from_numpy(state).float().to(self.device)

        # Set evaluation mode on the main network to obtain a prediction
        self.qnetwork_main.eval()
        with torch.no_grad():
            # Get the action with maximum Q-Value from the local network
            action, _, _ = self.qnetwork_main(state.unsqueeze(0))

        # Set training mode back on the main network for future use
        self.qnetwork_main.train()

        return action.cpu().squeeze().numpy()

    def learn(self, experiences: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]) -> None:
        """
        Calculate the Q-Function estimate from the main neural network, the Target value from
        the target neural network, and calculate the loss with both values, all by feeding the received
        batch of experience tuples to both networks. After loss is calculated, backpropagation is performed on the
        main network from the given loss, so that the weights of the main network are updated.
        Args:
            experiences: Tuple of five elements, where each element is a torch.Tensor of length {batch_size}.
        """
        # Set gradients of all optimized torch Tensors to zero
        self.optimizer.zero_grad()
        states, actions, rewards, next_states, dones = experiences

        # Get the Value Function for the next state from the target network (no_grad() disables gradient calculation)
        with torch.no_grad():
            _, _, V_ = self.qnetwork_target(next_states)

        # Compute the target Value Functions for the given experiences.
        # The target value is calculated as target_val = r + gamma * V(s')
        target_values = rewards + (self.gamma * V_)

        # Compute the expected Value Function from main network
        _, q_estimate, _ = self.qnetwork_main(states, actions)

        # Compute loss between target value and expected Q value
        loss = F.mse_loss(q_estimate, target_values)

        # Perform backpropagation for minimizing loss
        loss.backward()
        clip_grad_norm_(self.qnetwork_main.parameters(), 1)
        self.optimizer.step()

        # Update the target network softly with the local one
        self.soft_update(self.qnetwork_main, self.qnetwork_target)

        # return loss.detach().cpu().numpy()

    def soft_update(self, main_nn: NAF, target_nn: NAF) -> None:
        """
        Soft update of the network parameters, following this formula:\n
                    θ_target = τ*θ_local + (1 - τ)*θ_target
        Args:
            main_nn: Main torch neural network.
            target_nn: Target torch neural network.
        """
        for target_param, main_param in zip(target_nn.parameters(), main_nn.parameters()):
            target_param.data.copy_(self.tau * main_param.data + (1. - self.tau) * target_param.data)

    def run(self, frames: int = 1000, episodes: int = 1000, verbose: bool = True) -> Dict[int, Tuple[float, int]]:
        """
        Execute training flow of the NAF algorithm on the given environment.
        Args:
            frames: Maximum number of frames (timesteps) per episode.
            episodes: Number of episodes to train for.
            verbose: If True, detailed per-frame logs are shown; if False, only per-episode summaries.
        Returns:
            The score history generated throughout the training.
        """
        logger.info('Training started')
        # Initialize 'scores' dictionary to store rewards and timesteps executed for each episode
        scores = {episode: (0, 0) for episode in range(1, episodes + 1)}

        # Iterate through every episode
        for episode in range(episodes):
            logger.info(f'Running Episode {episode + 1}')
            start = time.time()  # Timer to measure execution time per episode
            state = self.environment.reset(verbose)
            score, mean = 0, list()

            for frame in range(1, frames + 1):
                if verbose: logger.info(f'Running frame {frame} in episode {episode + 1}')

                # Pick action according to current state
                if verbose: logger.info(f'Current State: {state}')
                action = self.act(state)
                if verbose: logger.info(f'Action chosen for the given state is: {action}')

                # Perform action on environment and get new state and reward
                next_state, reward, done = self.environment.step(action)

                # Save the experience in the ReplayBuffer, and learn from previous experiences if applicable
                self.step(state, action, reward, next_state, done)

                state = next_state  # Update state to next state
                score += reward
                mean.append(reward)

                if verbose: logger.info(f'Reward: {reward}  -  Cumulative reward: {score}\n')

                if done:
                    break

            # Updates scores history
            scores[episode + 1] = (score, frame)  # save most recent score and last frame
            logger.info(f'Reward:                             {score}')
            logger.info(f'Number of frames:                   {frame}')
            logger.info(f'Mean of rewards on this episode:    {sum(mean) / frames}')
            logger.info(f'Time taken for this episode:        {round(time.time() - start, 3)} secs\n')

            # Save the episode's performance if it is a checkpoint episode
            if (episode + 1) % self.checkpoint_frequency == 0:
                # Create parent directory for current episode
                os.makedirs(f'checkpoints/{episode + 1}/', exist_ok=True)
                # Save model weights
                torch.save(self.qnetwork_main.state_dict(), f'checkpoints/{episode + 1}/weights.p')
                # Save the model's performance metrics
                with open(f'checkpoints/{episode + 1}/scores.txt', 'w') as f:
                    f.write(json.dumps(scores))

        torch.save(self.qnetwork_main.state_dict(), self.MODEL_PATH)
        logger.info(f'Model has been successfully saved in {self.MODEL_PATH}')

        return scores

Class variables

var MODEL_PATH

Methods

def act(self, state: numpy.ndarray[typing.Any, numpy.dtype[+ScalarType]]) ‑> numpy.ndarray[typing.Any, numpy.dtype[+ScalarType]]

Extracts the action which maximizes the Q-Function, by getting the output of the mu layer of the main neural network.

Args

state
Current state from which to pick the best action.

Returns

Action which maximizes Q-Function.
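
Example

A short usage sketch (assuming agent has been constructed as shown above and its environment is ready; Environment.reset() and Environment.step() are called exactly as run() calls them):

state = agent.environment.reset(False)                      # initial state as a NumPy array
action = agent.act(state)                                   # greedy action from the mu head
next_state, reward, done = agent.environment.step(action)   # apply it to the environment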

Expand source code
def act(self, state: NDArray) -> NDArray:
    """
    Extracts the action which maximizes the Q-Function, by getting the output of the mu layer
    of the main neural network.
    Args:
        state: Current state from which to pick the best action.
    Returns:
        Action which maximizes Q-Function.
    """
    state = torch.from_numpy(state).float().to(self.device)

    # Set evaluation mode on the main network to obtain a prediction
    self.qnetwork_main.eval()
    with torch.no_grad():
        # Get the action with maximum Q-Value from the local network
        action, _, _ = self.qnetwork_main(state.unsqueeze(0))

    # Set training mode back on the main network for future use
    self.qnetwork_main.train()

    return action.cpu().squeeze().numpy()
def initialize_pretrained_agent_from_episode(self, episode: int) ‑> None

Loads the previously trained weights into the main and target neural networks. The pretrained weights are retrieved from the checkpoints generated on a training execution, so the episode provided must be present in the checkpoints/ folder.

Args

episode
Episode from which to retrieve the pretrained weights.

Raises

MissingWeightsFile
The weights.p file is not present in the checkpoints/{episode}/ folder provided.
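
Example

A brief sketch, assuming a previous training run produced the file checkpoints/300/weights.p (the episode number is illustrative):

agent.initialize_pretrained_agent_from_episode(300)   # loads checkpoints/300/weights.p into both networks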
Expand source code
def initialize_pretrained_agent_from_episode(self, episode: int) -> None:
    """
    Loads the previously trained weights into the main and target neural networks.
    The pretrained weights are retrieved from the checkpoints generated on a training execution, so
    the episode provided must be present in the checkpoints/ folder.
    Args:
        episode: Episode from which to retrieve the pretrained weights.
    Raises:
        MissingWeightsFile: The weights.p file is not present in the checkpoints/{episode}/ folder provided.
    """
    # Check if file is present in checkpoints/{episode}/ directory
    if not os.path.isfile(f'checkpoints/{episode}/weights.p'):
        raise MissingWeightsFile

    logger.debug(f'Loading model weights from the checkpoint generated at episode {episode}...')
    self.qnetwork_main.load_state_dict(torch.load(f'checkpoints/{episode}/weights.p'))
    self.qnetwork_target.load_state_dict(torch.load(f'checkpoints/{episode}/weights.p'))
    logger.info(f'Loaded model weights from the checkpoint generated at episode {episode}')
def initialize_pretrained_agent_from_weights_file(self, weights_path: str) ‑> None

Loads the previously trained weights into the main and target neural networks. The pretrained weights are retrieved from a .p file containing the weights, located in the {weights_path} path.

Args

weights_path
Path to the .p file containing the network's weights.

Raises

MissingWeightsFile
The file path provided does not exist.
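
Example

A brief sketch using the model.p file that run() writes at the end of training; any .p file produced by torch.save() of the network's state_dict should work the same way:

agent.initialize_pretrained_agent_from_weights_file('model.p')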
Expand source code
def initialize_pretrained_agent_from_weights_file(self, weights_path: str) -> None:
    """
    Loads the previously trained weights into the main and target neural networks.
    The pretrained weights are retrieved from a .p file containing the weights, located in 
    the {weights_path} path.
    Args:
        weights_path: Path to the .p file containing the network's weights.
    Raises:
        MissingWeightsFile: The file path provided does not exist.
    """
    # Check if file is present
    if not os.path.isfile(weights_path):
        raise MissingWeightsFile

    logger.debug('Loading pre-trained model weights...')
    self.qnetwork_main.load_state_dict(torch.load(weights_path))
    self.qnetwork_target.load_state_dict(torch.load(weights_path))
    logger.info('Loaded pre-trained weights for the NN')
def learn(self, experiences: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]) ‑> None

Calculate the Q-Function estimate from the main neural network, the Target value from the target neural network, and calculate the loss with both values, all by feeding the received batch of experience tuples to both networks. After loss is calculated, backpropagation is performed on the main network from the given loss, so that the weights of the main network are updated.

Args

experiences
Tuple of five elements, where each element is a torch.Tensor of length {batch_size}.
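
Example

A minimal sketch of the target computation performed inside learn(), using stand-in tensors instead of a sampled batch (shapes assume a batch of 4 experiences with column-vector rewards and values, matching the target_val = r + gamma * V(s') update shown in the source):

import torch
import torch.nn.functional as F

gamma = 0.99
rewards = torch.rand(4, 1)      # r, stand-in for the sampled rewards
V_next = torch.rand(4, 1)       # V(s') as produced by the target network
q_estimate = torch.rand(4, 1)   # Q(s, a) as produced by the main network

target_values = rewards + gamma * V_next
loss = F.mse_loss(q_estimate, target_values)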
Expand source code
def learn(self, experiences: Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]) -> None:
    """
    Calculate the Q-Function estimate from the main neural network, the Target value from
    the target neural network, and calculate the loss with both values, all by feeding the received
    batch of experience tuples to both networks. After loss is calculated, backpropagation is performed on the
    main network from the given loss, so that the weights of the main network are updated.
    Args:
        experiences: Tuple of five elements, where each element is a torch.Tensor of length {batch_size}.
    """
    # Set gradients of all optimized torch Tensors to zero
    self.optimizer.zero_grad()
    states, actions, rewards, next_states, dones = experiences

    # Get the Value Function for the next state from the target network (no_grad() disables gradient calculation)
    with torch.no_grad():
        _, _, V_ = self.qnetwork_target(next_states)

    # Compute the target Value Functions for the given experiences.
    # The target value is calculated as target_val = r + gamma * V(s')
    target_values = rewards + (self.gamma * V_)

    # Compute the expected Value Function from main network
    _, q_estimate, _ = self.qnetwork_main(states, actions)

    # Compute loss between target value and expected Q value
    loss = F.mse_loss(q_estimate, target_values)

    # Perform backpropagation for minimizing loss
    loss.backward()
    clip_grad_norm_(self.qnetwork_main.parameters(), 1)
    self.optimizer.step()

    # Update the target network softly with the local one
    self.soft_update(self.qnetwork_main, self.qnetwork_target)

    # return loss.detach().cpu().numpy()
def run(self, frames: int = 1000, episodes: int = 1000, verbose: bool = True) ‑> Dict[int, Tuple[float, int]]

Execute training flow of the NAF algorithm on the given environment.

Args

frames
Maximum number of frames (timesteps) per episode.
episodes
Number of episodes to train for.
verbose
If True, detailed per-frame logs are shown; if False, only per-episode summaries.

Returns

The score history generated throughout the training.
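
Example

A typical invocation sketch (frame and episode counts are illustrative):

scores = agent.run(frames=500, episodes=200, verbose=False)
print(scores[1])   # (cumulative reward, frames executed) for the first episode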

Expand source code
def run(self, frames: int = 1000, episodes: int = 1000, verbose: bool = True) -> Dict[int, Tuple[float, int]]:
    """
    Execute training flow of the NAF algorithm on the given environment.
    Args:
        frames: Maximum number of frames (timesteps) per episode.
        episodes: Number of episodes to train for.
        verbose: If True, detailed per-frame logs are shown; if False, only per-episode summaries.
    Returns:
        The score history generated throughout the training.
    """
    logger.info('Training started')
    # Initialize 'scores' dictionary to store rewards and timesteps executed for each episode
    scores = {episode: (0, 0) for episode in range(1, episodes + 1)}

    # Iterate through every episode
    for episode in range(episodes):
        logger.info(f'Running Episode {episode + 1}')
        start = time.time()  # Timer to measure execution time per episode
        state = self.environment.reset(verbose)
        score, mean = 0, list()

        for frame in range(1, frames + 1):
            if verbose: logger.info(f'Running frame {frame} in episode {episode + 1}')

            # Pick action according to current state
            if verbose: logger.info(f'Current State: {state}')
            action = self.act(state)
            if verbose: logger.info(f'Action chosen for the given state is: {action}')

            # Perform action on environment and get new state and reward
            next_state, reward, done = self.environment.step(action)

            # Save the experience in the ReplayBuffer, and learn from previous experiences if applicable
            self.step(state, action, reward, next_state, done)

            state = next_state  # Update state to next state
            score += reward
            mean.append(reward)

            if verbose: logger.info(f'Reward: {reward}  -  Cumulative reward: {score}\n')

            if done:
                break

        # Updates scores history
        scores[episode + 1] = (score, frame)  # save most recent score and last frame
        logger.info(f'Reward:                             {score}')
        logger.info(f'Number of frames:                   {frame}')
        logger.info(f'Mean of rewards on this episode:    {sum(mean) / frames}')
        logger.info(f'Time taken for this episode:        {round(time.time() - start, 3)} secs\n')

        # Save the episode's performance if it is a checkpoint episode
        if (episode + 1) % self.checkpoint_frequency == 0:
            # Create parent directory for current episode
            os.makedirs(f'checkpoints/{episode + 1}/', exist_ok=True)
            # Save model weights
            torch.save(self.qnetwork_main.state_dict(), f'checkpoints/{episode + 1}/weights.p')
            # Save the model's performance metrics
            with open(f'checkpoints/{episode + 1}/scores.txt', 'w') as f:
                f.write(json.dumps(scores))

    torch.save(self.qnetwork_main.state_dict(), self.MODEL_PATH)
    logger.info(f'Model has been successfully saved in {self.MODEL_PATH}')

    return scores
def soft_update(self, main_nn: NAF, target_nn: NAF) ‑> None

Soft update of the network parameters, following this formula:

        θ_target = τ*θ_local + (1 - τ)*θ_target

Args

main_nn
Main torch neural network.
target_nn
Target torch neural network.
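
Example

A small numeric sketch of the soft update rule applied to plain tensors (tau = 0.001, values are arbitrary):

import torch

tau = 0.001
theta_local = torch.tensor([1.0, 2.0])
theta_target = torch.tensor([0.0, 0.0])

theta_target = tau * theta_local + (1. - tau) * theta_target
# theta_target is now tensor([0.0010, 0.0020]): it drifts slowly towards the local weights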
Expand source code
def soft_update(self, main_nn: NAF, target_nn: NAF) -> None:
    """
    Soft update of the network parameters, following this formula:\n
                θ_target = τ*θ_local + (1 - τ)*θ_target
    Args:
        main_nn: Main torch neural network.
        target_nn: Target torch neural network.
    """
    for target_param, main_param in zip(target_nn.parameters(), main_nn.parameters()):
        target_param.data.copy_(self.tau * main_param.data + (1. - self.tau) * target_param.data)
def step(self, state: numpy.ndarray[typing.Any, numpy.dtype[+ScalarType]], action: numpy.ndarray[typing.Any, numpy.dtype[+ScalarType]], reward: float, next_state: numpy.ndarray[typing.Any, numpy.dtype[+ScalarType]], done: int) ‑> None

Stores in the ReplayBuffer the new experience composed of the parameters received, and learns only if the Buffer contains enough experiences to fill a batch. The learning will occur if the update frequency {update_freq} is reached, in which case it will learn {num_updates} times.

Args

state
Current state.
action
Action performed from state {state}.
reward
Reward obtained after performing action {action} from state {state}.
next_state
New state reached after performing action {action} from state {state}.
done
Integer (0 or 1) indicating whether a terminal state has been reached.
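
Example

A minimal sketch of the update-frequency gate used by step(), with update_freq = 4 for illustration; learning is attempted only on every fourth call, and only once the buffer holds more than batch_size experiences:

update_freq = 4
update_t_step = 0
for t in range(1, 13):
    update_t_step = (update_t_step + 1) % update_freq
    if update_t_step == 0:
        print(f'learning would be triggered at call {t}')   # calls 4, 8 and 12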
Expand source code
def step(self, state: NDArray, action: NDArray, reward: float, next_state: NDArray, done: int) -> None:
    """
    Stores in the ReplayBuffer the new experience composed of the parameters received,
    and learns only if the Buffer contains enough experiences to fill a batch. The
    learning will occur if the update frequency {update_freq} is reached, in which case it
    will learn {num_updates} times.
    Args:
        state: Current state.
        action: Action performed from state {state}.
        reward: Reward obtained after performing action {action} from state {state}.
        next_state: New state reached after performing action {action} from state {state}.
        done: Integer (0 or 1) indicating whether a terminal state has been reached.
    """

    # Save experience in replay memory
    self.memory.add(state, action, reward, next_state, done)

    # Learning will be performed every {update_freq} time-steps.
    self.update_t_step = (self.update_t_step + 1) % self.update_freq  # Update time step counter
    if self.update_t_step == 0:
        # If enough samples are available in memory, get random subset and learn
        if len(self.memory) > self.batch_size:
            for _ in range(self.num_updates):
                # Pick random batch of experiences from memory
                experiences = self.memory.sample()

                # Learn from experiences and get loss
                self.learn(experiences)