Module robotic_manipulator_rloa.utils.replay_buffer
import random
from collections import deque, namedtuple
from typing import Tuple
import numpy as np
import torch
from numpy.typing import NDArray
# deque is a double-ended queue that provides O(1) append and pop from both ends
# namedtuple is a tuple whose fields can be accessed both by index and by attribute name
class ReplayBuffer:
def __init__(self, buffer_size: int, batch_size: int, device: torch.device, seed: int):
"""
Buffer to store experience tuples. Each experience has the following structure:
(state, action, reward, next_state, done)
Args:
            buffer_size: Maximum size for the buffer. A higher buffer size implies higher RAM consumption.
batch_size: Number of experiences to be retrieved from the ReplayBuffer per batch.
device: CUDA device.
seed: Random seed.
"""
self.device = device
self.memory = deque(maxlen=buffer_size)
self.batch_size = batch_size
self.experience = namedtuple("Experience", field_names=["state", "action", "reward", "next_state", "done"])
random.seed(seed)
def add(self, state: NDArray, action: NDArray, reward: float, next_state: NDArray, done: int) -> None:
"""
Add a new experience to the Replay Buffer.
Args:
state: NDArray of the current state.
action: NDArray of the action taken from state {state}.
reward: Reward obtained after performing action {action} from state {state}.
next_state: NDArray of the state reached after performing action {action} from state {state}.
done: Integer (0 or 1) indicating whether the next_state is a terminal state.
"""
# Create namedtuple object from the experience
exp = self.experience(state, action, reward, next_state, done)
# Add the experience object to memory
self.memory.append(exp)
def sample(self) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
"""
Randomly sample a batch of experiences from memory.
Returns:
Tuple of 5 elements, which are (states, actions, rewards, next_states, dones). Each element
in the tuple is a torch Tensor composed of {batch_size} items.
"""
# Randomly sample a batch of experiences
experiences = random.sample(self.memory, k=self.batch_size)
states = torch.from_numpy(
np.stack([e.state if not isinstance(e.state, tuple) else e.state[0] for e in experiences])).float().to(
self.device)
actions = torch.from_numpy(np.vstack([e.action for e in experiences if e is not None])).long().to(self.device)
rewards = torch.from_numpy(np.vstack([e.reward for e in experiences if e is not None])).float().to(self.device)
next_states = torch.from_numpy(np.stack([e.next_state for e in experiences if e is not None])).float().to(
self.device)
dones = torch.from_numpy(np.vstack([e.done for e in experiences if e is not None]).astype(np.uint8)).float().to(
self.device)
return states, actions, rewards, next_states, dones
def __len__(self) -> int:
"""
        Return the current size of the Replay Buffer.
Returns:
Size of Replay Buffer
"""
return len(self.memory)
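A minimal usage sketch (not part of the module source above). The state and action dimensionalities, buffer size and batch size are made-up placeholders; the buffer itself is agnostic to them.

import numpy as np
import torch
from robotic_manipulator_rloa.utils.replay_buffer import ReplayBuffer

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Hypothetical hyperparameters, chosen only for illustration
buffer = ReplayBuffer(buffer_size=100_000, batch_size=64, device=device, seed=0)

# Hypothetical 14-dimensional state and 7-dimensional action
state = np.zeros(14, dtype=np.float32)
action = np.zeros(7, dtype=np.float32)
next_state = np.ones(14, dtype=np.float32)

buffer.add(state, action, reward=1.0, next_state=next_state, done=0)
print(len(buffer))  # 1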
Classes
class ReplayBuffer (buffer_size: int, batch_size: int, device: torch.device, seed: int)
Buffer to store experience tuples. Each experience has the following structure: (state, action, reward, next_state, done)
Args
buffer_size: Maximum size for the buffer. A higher buffer size implies higher RAM consumption (the eviction sketch below illustrates what happens when the buffer is full).
batch_size: Number of experiences to be retrieved from the ReplayBuffer per batch.
device: CUDA device.
seed: Random seed.
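Because the memory is a deque created with maxlen=buffer_size, the oldest experiences are silently evicted once the buffer is full; nothing raises or blocks. A tiny sketch, with deliberately small and purely illustrative sizes:

import numpy as np
import torch
from robotic_manipulator_rloa.utils.replay_buffer import ReplayBuffer

buffer = ReplayBuffer(buffer_size=2, batch_size=2, device=torch.device('cpu'), seed=0)

s = np.zeros(3, dtype=np.float32)  # placeholder state
for reward in (0.0, 1.0, 2.0):
    buffer.add(s, np.zeros(1, dtype=np.float32), reward, s, 0)

print(len(buffer))                        # 2 -> capped at buffer_size
print([e.reward for e in buffer.memory])  # [1.0, 2.0] -> the reward=0.0 experience was evicted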
Methods
def add(self, state: NDArray, action: NDArray, reward: float, next_state: NDArray, done: int) -> None
Add a new experience to the Replay Buffer.
Args
state: NDArray of the current state.
action: NDArray of the action taken from state {state}.
reward: Reward obtained after performing action {action} from state {state}.
next_state: NDArray of the state reached after performing action {action} from state {state}.
done: Integer (0 or 1) indicating whether the next_state is a terminal state.
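A sketch of how add is typically driven by an environment loop. DummyEnv below is a hypothetical stand-in with a Gym-style step API; neither it nor the action dimensionality is defined by this module.

import numpy as np
import torch
from robotic_manipulator_rloa.utils.replay_buffer import ReplayBuffer

class DummyEnv:
    """Stand-in for a Gym-style environment (illustration only, not part of the library)."""
    def reset(self):
        return np.zeros(14, dtype=np.float32)
    def step(self, action):
        next_state = np.random.randn(14).astype(np.float32)
        reward = float(-np.linalg.norm(action))   # placeholder reward
        done = np.random.rand() < 0.05            # placeholder termination condition
        return next_state, reward, done, {}

env = DummyEnv()
buffer = ReplayBuffer(buffer_size=10_000, batch_size=64, device=torch.device('cpu'), seed=0)

state = env.reset()
for _ in range(200):
    action = np.random.uniform(-1.0, 1.0, size=7)              # placeholder random action
    next_state, reward, done, _ = env.step(action)
    buffer.add(state, action, reward, next_state, int(done))   # store done as 0/1
    state = env.reset() if done else next_state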
def sample(self) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]
Randomly sample a batch of experiences from memory.
Returns
Tuple of 5 elements, which are (states, actions, rewards, next_states, dones). Each element in the tuple is a torch Tensor composed of {batch_size} items.
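Sampling should only be attempted once the buffer holds at least batch_size experiences; otherwise random.sample raises a ValueError. Continuing the made-up 14-dimensional states and 7-dimensional actions from the sketches above:

if len(buffer) >= buffer.batch_size:
    states, actions, rewards, next_states, dones = buffer.sample()
    print(states.shape)       # torch.Size([64, 14])
    print(actions.shape)      # torch.Size([64, 7])
    print(rewards.shape)      # torch.Size([64, 1])
    print(next_states.shape)  # torch.Size([64, 14])
    print(dones.shape)        # torch.Size([64, 1])

Note that actions come back as a long (integer) tensor, mirroring the .long() cast visible in the source above.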