Deep Deterministic Policy Gradient (DDPG)

Author: amifunny
Date created: 2020/06/04
Last modified: 2024/03/23
Description: Implementing the DDPG algorithm on the Inverted Pendulum problem.

ⓘ This example uses Keras 3

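Deep Deterministic Policy Gradient (DDPG) is a model-free, off-policy algorithm for learning continuous actions.

It combines ideas from DPG (Deterministic Policy Gradient) and DQN (Deep Q-Network). It takes experience replay and slow-learning target networks from DQN, and it is based on DPG, which can operate over continuous action spaces.

This tutorial closely follows the paper "Continuous control with deep reinforcement learning".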


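We are trying to solve the classic Inverted Pendulum control problem. In this setting, the agent can push the pendulum in only two directions: swing left or swing right.

What makes this problem challenging for Q-learning algorithms is that the actions are continuous rather than discrete. That is, instead of choosing between two discrete actions such as -1 or +1, we have to select from infinitely many actions ranging from -2 to +2.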


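Just like the Actor-Critic method, we have two networks:

  1. Actor - It proposes an action given a state.
  2. Critic - It predicts whether the action is good (positive value) or bad (negative value), given a state and an action.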

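DDPG uses two more techniques that are not present in the original DQN.

First, it uses two target networks.

Why? Because this adds stability to training. In short, we are learning from estimated targets, and the target networks are updated slowly, which keeps those estimated targets stable.

Conceptually, this is like saying, "I have an idea of how to play this well, and I'm going to try it out for a bit until I find something better", as opposed to saying, "I'm going to re-learn how to play this entire game after every move". See this StackOverflow answer.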


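Second, it uses experience replay.

We store a list of tuples (state, action, reward, next_state), and instead of learning only from the most recent experience, we learn by sampling from all of the experience accumulated so far.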
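As a rough illustration of the idea, the sketch below stores transitions in a bounded buffer and samples a random batch from everything kept so far. It is a minimal, dependency-free sketch and not the Buffer class used later in this tutorial: the names SimpleReplayBuffer, record and sample are hypothetical, and the states are placeholder integers rather than real observations.

```python
import random
from collections import deque


class SimpleReplayBuffer:
    """Hypothetical minimal replay buffer, for illustration only."""

    def __init__(self, capacity):
        # A deque with maxlen silently drops the oldest item once it is full.
        self.storage = deque(maxlen=capacity)

    def record(self, state, action, reward, next_state):
        self.storage.append((state, action, reward, next_state))

    def sample(self, batch_size):
        # Sample with replacement, so a batch can be drawn even when fewer
        # than batch_size transitions have been stored.
        return random.choices(list(self.storage), k=batch_size)


random.seed(0)
replay = SimpleReplayBuffer(capacity=3)
for t in range(5):
    # Placeholder transition: state t, action 0.0, reward -1.0, next state t + 1.
    replay.record(t, 0.0, -1.0, t + 1)

batch = replay.sample(batch_size=4)
print(len(replay.storage))  # 3: the two oldest transitions were evicted
print(len(batch))           # 4
```

Sampling with replacement is used here because np.random.choice, which the tutorial's Buffer relies on, also samples with replacement by default.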


import os

os.environ["KERAS_BACKEND"] = "tensorflow"

import keras
from keras import layers

import tensorflow as tf
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt

We use Gymnasium to create the environment. We will use the upper_bound parameter to scale our actions later.

# Specify the `render_mode` parameter to show the attempts of the agent in a pop up window.
env = gym.make("Pendulum-v1", render_mode="human")

num_states = env.observation_space.shape[0]
print("Size of State Space ->  {}".format(num_states))
num_actions = env.action_space.shape[0]
print("Size of Action Space ->  {}".format(num_actions))

upper_bound = env.action_space.high[0]
lower_bound = env.action_space.low[0]

print("Max Value of Action ->  {}".format(upper_bound))
print("Min Value of Action ->  {}".format(lower_bound))
Size of State Space ->  3
Size of Action Space ->  1
Max Value of Action ->  2.0
Min Value of Action ->  -2.0

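To get better exploration from the Actor network, we perturb its actions with noise, specifically noise generated by an Ornstein-Uhlenbeck process, as described in the paper. It samples noise from a correlated normal distribution.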

class OUActionNoise:
    def __init__(self, mean, std_deviation, theta=0.15, dt=1e-2, x_initial=None):
        self.theta = theta
        self.mean = mean
        self.std_dev = std_deviation
        self.dt = dt
        self.x_initial = x_initial
        # Initialize x_prev so that the first call has a previous value
        self.reset()

    def __call__(self):
        # Formula taken from https://www.wikipedia.org/wiki/Ornstein-Uhlenbeck_process
        x = (
            self.x_prev
            + self.theta * (self.mean - self.x_prev) * self.dt
            + self.std_dev * np.sqrt(self.dt) * np.random.normal(size=self.mean.shape)
        )
        # Store x into x_prev
        # Makes next noise dependent on current one
        self.x_prev = x
        return x

    def reset(self):
        if self.x_initial is not None:
            self.x_prev = self.x_initial
        else:
            self.x_prev = np.zeros_like(self.mean)
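To see what "correlated" means here, the following sketch simulates a one-dimensional Ornstein-Uhlenbeck path with the same update rule and default parameters as the class above, and compares its lag-1 autocorrelation with that of independent Gaussian noise. The helper names ou_path and lag1_autocorrelation are hypothetical, and plain Python is used instead of NumPy only to keep the sketch dependency-free.

```python
import math
import random


def ou_path(n_steps, theta=0.15, sigma=0.2, dt=1e-2, mean=0.0):
    """Simulate a 1-D Ornstein-Uhlenbeck path, one Euler step at a time."""
    x, path = 0.0, []
    for _ in range(n_steps):
        drift = theta * (mean - x) * dt
        diffusion = sigma * math.sqrt(dt) * random.gauss(0.0, 1.0)
        x = x + drift + diffusion
        path.append(x)
    return path


def lag1_autocorrelation(xs):
    """Sample correlation between each value and the one that follows it."""
    m = sum(xs) / len(xs)
    num = sum((a - m) * (b - m) for a, b in zip(xs, xs[1:]))
    den = sum((a - m) ** 2 for a in xs)
    return num / den


random.seed(0)
ou_corr = lag1_autocorrelation(ou_path(1000))
white_corr = lag1_autocorrelation([random.gauss(0.0, 0.2) for _ in range(1000)])
print("OU noise lag-1 autocorrelation:   ", ou_corr)
print("White noise lag-1 autocorrelation:", white_corr)
```

Consecutive OU samples stay close to each other, so the first value should come out near 1, while the white-noise value should sit near 0. That smoothness is what lets the agent push in one direction for several steps in a row instead of jittering.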

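The Buffer class implements experience replay.


Critic loss - The mean squared error of y - Q(s, a), where y is the expected return as seen by the target networks and Q(s, a) is the action value predicted by the Critic network. y is a moving target that the Critic model tries to reach; we keep this target stable by updating the target models slowly.

Actor loss - This is computed as the mean of the values that the Critic network assigns to the actions taken by the Actor network. We seek to maximize this quantity.

Hence, we update the Actor network so that it produces actions that get the maximum predicted value, as seen by the Critic, for a given state.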
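The two losses can be worked through by hand on a tiny batch. The sketch below uses made-up numbers in place of network outputs: target_q_next stands in for the target Critic's value of the target Actor's action in the next state, critic_q for Q(s, a) on the stored action, and actor_q for the Critic's value of the action the current Actor would take. All of these values and variable names are assumptions chosen for illustration.

```python
gamma = 0.99

# Made-up batch of two transitions (illustrative values only).
rewards = [-1.0, -0.5]
target_q_next = [-10.0, -8.0]  # target_critic(s', target_actor(s'))
critic_q = [-11.0, -8.0]       # critic(s, a) for the stored action a
actor_q = [-9.0, -7.0]         # critic(s, actor(s)) for the current actor

# Target y = r + gamma * Q'(s', mu'(s'))
y = [r + gamma * q_next for r, q_next in zip(rewards, target_q_next)]

# Critic loss: mean squared error between y and Q(s, a)
critic_loss = sum((t - q) ** 2 for t, q in zip(y, critic_q)) / len(y)

# Actor loss: negative mean critic value, so minimizing it maximizes the value
actor_loss = -sum(actor_q) / len(actor_q)

print(y)            # approximately [-10.9, -8.42]
print(critic_loss)  # approximately 0.0932
print(actor_loss)   # 8.0
```

The sign flip in actor_loss is what turns "maximize the Critic's value" into a quantity that a gradient-descent optimizer can minimize.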

class Buffer:
    def __init__(self, buffer_capacity=100000, batch_size=64):
        # Number of "experiences" to store at max
        self.buffer_capacity = buffer_capacity
        # Num of tuples to train on.
        self.batch_size = batch_size

        # Tells us the number of times record() was called.
        self.buffer_counter = 0

        # Instead of a list of tuples, as the experience replay concept suggests,
        # we use a separate np.array for each tuple element
        self.state_buffer = np.zeros((self.buffer_capacity, num_states))
        self.action_buffer = np.zeros((self.buffer_capacity, num_actions))
        self.reward_buffer = np.zeros((self.buffer_capacity, 1))
        self.next_state_buffer = np.zeros((self.buffer_capacity, num_states))

    # Takes (s,a,r,s') observation tuple as input
    def record(self, obs_tuple):
        # Set index to zero if buffer_capacity is exceeded,
        # replacing old records
        index = self.buffer_counter % self.buffer_capacity

        self.state_buffer[index] = obs_tuple[0]
        self.action_buffer[index] = obs_tuple[1]
        self.reward_buffer[index] = obs_tuple[2]
        self.next_state_buffer[index] = obs_tuple[3]

        self.buffer_counter += 1

    # Eager execution is turned on by default in TensorFlow 2. Decorating with tf.function allows
    # TensorFlow to build a static graph out of the logic and computations in our function.
    # This provides a large speed up for blocks of code that contain many small TensorFlow operations such as this one.
    @tf.function
    def update(
        self,
        state_batch,
        action_batch,
        reward_batch,
        next_state_batch,
    ):
        # Training and updating Actor & Critic networks.
        # See Pseudo Code.
        with tf.GradientTape() as tape:
            target_actions = target_actor(next_state_batch, training=True)
            y = reward_batch + gamma * target_critic(
                [next_state_batch, target_actions], training=True
            )
            critic_value = critic_model([state_batch, action_batch], training=True)
            critic_loss = keras.ops.mean(keras.ops.square(y - critic_value))

        critic_grad = tape.gradient(critic_loss, critic_model.trainable_variables)
        critic_optimizer.apply_gradients(
            zip(critic_grad, critic_model.trainable_variables)
        )

        with tf.GradientTape() as tape:
            actions = actor_model(state_batch, training=True)
            critic_value = critic_model([state_batch, actions], training=True)
            # Used `-value` as we want to maximize the value given
            # by the critic for our actions
            actor_loss = -keras.ops.mean(critic_value)

        actor_grad = tape.gradient(actor_loss, actor_model.trainable_variables)
        actor_optimizer.apply_gradients(
            zip(actor_grad, actor_model.trainable_variables)
        )
    # We compute the loss and update parameters
    def learn(self):
        # Get sampling range
        record_range = min(self.buffer_counter, self.buffer_capacity)
        # Randomly sample indices
        batch_indices = np.random.choice(record_range, self.batch_size)

        # Convert to tensors
        state_batch = keras.ops.convert_to_tensor(self.state_buffer[batch_indices])
        action_batch = keras.ops.convert_to_tensor(self.action_buffer[batch_indices])
        reward_batch = keras.ops.convert_to_tensor(self.reward_buffer[batch_indices])
        reward_batch = keras.ops.cast(reward_batch, dtype="float32")
        next_state_batch = keras.ops.convert_to_tensor(
            self.next_state_buffer[batch_indices]
        )

        self.update(state_batch, action_batch, reward_batch, next_state_batch)

# This updates the target parameters slowly,
# based on rate `tau`, which is much less than one.
def update_target(target, original, tau):
    target_weights = target.get_weights()
    original_weights = original.get_weights()

    for i in range(len(target_weights)):
        target_weights[i] = original_weights[i] * tau + target_weights[i] * (1 - tau)

    # Write the blended weights back into the target model
    target.set_weights(target_weights)
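To get a feel for how slowly the target network follows the online network, the sketch below applies the same blending rule to plain lists of numbers instead of Keras weights. The function name soft_update and the example weights are hypothetical; only the formula tau * original + (1 - tau) * target comes from the code above.

```python
def soft_update(target_weights, online_weights, tau):
    """Blend each target weight a small step toward its online counterpart."""
    return [
        tau * w_online + (1 - tau) * w_target
        for w_target, w_online in zip(target_weights, online_weights)
    ]


tau = 0.005
online = [1.0, -2.0]  # frozen here; in real training these keep changing
target = [0.0, 0.0]

first_step = soft_update(target, online, tau)
print(first_step)  # approximately [0.005, -0.01]

for _ in range(1000):
    target = soft_update(target, online, tau)

# After n steps the remaining gap has shrunk by a factor of (1 - tau) ** n.
remaining_gap = (1 - tau) ** 1000
print(target[0], 1 - remaining_gap)
```

With tau = 0.005, a single update moves the target only 0.5% of the way toward the online weights, and it takes on the order of a thousand updates to close more than 99% of the gap. This lag is what keeps the bootstrapped target y from shifting abruptly.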


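Here we define the Actor and Critic networks. These are basic Dense models with ReLU activation.

Note: We need the initialization for the last layer of the Actor to be between -0.003 and 0.003. This prevents the output from sitting at 1 or -1 in the initial stages, which would squash our gradients to zero because we use the tanh activation.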
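The effect described in the note can be checked numerically, since the derivative of tanh(z) is 1 - tanh(z)^2. The sketch below evaluates that derivative at a small pre-activation and at a large one. The two input values are assumptions picked for illustration, not measurements from the Actor network, and tanh_grad is a hypothetical helper.

```python
import math


def tanh_grad(z):
    """Derivative of tanh at pre-activation z: 1 - tanh(z)^2."""
    return 1.0 - math.tanh(z) ** 2


small_z = 0.003  # assumed pre-activation near zero, as small initial weights encourage
large_z = 5.0    # assumed pre-activation deep in the saturated region

grad_small = tanh_grad(small_z)
grad_large = tanh_grad(large_z)
print(grad_small)  # close to 1: gradients pass through almost unchanged
print(grad_large)  # close to 0: the output is pinned near 1 and barely learns
```

Keeping the last layer's initial weights tiny keeps its pre-activations near zero, so the Actor starts in the region where tanh still passes useful gradients.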

def get_actor():
    # Initialize weights between -3e-3 and 3e-3
    last_init = keras.initializers.RandomUniform(minval=-0.003, maxval=0.003)

    inputs = layers.Input(shape=(num_states,))
    out = layers.Dense(256, activation="relu")(inputs)
    out = layers.Dense(256, activation="relu")(out)
    outputs = layers.Dense(1, activation="tanh", kernel_initializer=last_init)(out)

    # Our upper bound is 2.0 for Pendulum.
    outputs = outputs * upper_bound
    model = keras.Model(inputs, outputs)
    return model

def get_critic():
    # State as input
    state_input = layers.Input(shape=(num_states,))
    state_out = layers.Dense(16, activation="relu")(state_input)
    state_out = layers.Dense(32, activation="relu")(state_out)

    # Action as input
    action_input = layers.Input(shape=(num_actions,))
    action_out = layers.Dense(32, activation="relu")(action_input)

    # Both are passed through separate layer before concatenating
    concat = layers.Concatenate()([state_out, action_out])

    out = layers.Dense(256, activation="relu")(concat)
    out = layers.Dense(256, activation="relu")(out)
    outputs = layers.Dense(1)(out)

    # Outputs a single value for a given state-action pair
    model = keras.Model([state_input, action_input], outputs)

    return model

policy() returns an action sampled from our Actor network, plus some noise for exploration.

def policy(state, noise_object):
    sampled_actions = keras.ops.squeeze(actor_model(state))
    noise = noise_object()
    # Adding noise to action
    sampled_actions = sampled_actions.numpy() + noise

    # We make sure action is within bounds
    legal_action = np.clip(sampled_actions, lower_bound, upper_bound)

    return [np.squeeze(legal_action)]


std_dev = 0.2
ou_noise = OUActionNoise(mean=np.zeros(1), std_deviation=float(std_dev) * np.ones(1))

actor_model = get_actor()
critic_model = get_critic()

target_actor = get_actor()
target_critic = get_critic()

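# Making the weights equal initially
target_actor.set_weights(actor_model.get_weights())
target_critic.set_weights(critic_model.get_weights())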

# Learning rate for actor-critic models
critic_lr = 0.002
actor_lr = 0.001

critic_optimizer = keras.optimizers.Adam(critic_lr)
actor_optimizer = keras.optimizers.Adam(actor_lr)

total_episodes = 100
# Discount factor for future rewards
gamma = 0.99
# Used to update target networks
tau = 0.005

buffer = Buffer(50000, 64)

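Now we implement our main training loop and iterate over episodes. We sample actions using policy() and train with learn() at each time step, while updating the target networks at rate tau.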

# To store reward history of each episode
ep_reward_list = []
# To store average reward history of last few episodes
avg_reward_list = []

# Takes about 4 min to train
for ep in range(total_episodes):
    prev_state, _ = env.reset()
    episodic_reward = 0

    while True:
        tf_prev_state = keras.ops.expand_dims(
            keras.ops.convert_to_tensor(prev_state), 0
        )

        action = policy(tf_prev_state, ou_noise)
        # Receive state and reward from environment.
        state, reward, done, truncated, _ = env.step(action)

        buffer.record((prev_state, action, reward, state))
        episodic_reward += reward

        # Train on a batch sampled from the replay buffer
        buffer.learn()

        update_target(target_actor, actor_model, tau)
        update_target(target_critic, critic_model, tau)

        # End this episode when `done` or `truncated` is True
        if done or truncated:
            break

        prev_state = state

    ep_reward_list.append(episodic_reward)

    # Mean of last 40 episodes
    avg_reward = np.mean(ep_reward_list[-40:])
    print("Episode * {} * Avg Reward is ==> {}".format(ep, avg_reward))
    avg_reward_list.append(avg_reward)

# Plotting graph
# Episodes versus Avg. Rewards
plt.plot(avg_reward_list)
plt.xlabel("Episode")
plt.ylabel("Avg. Episodic Reward")
plt.show()

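Feel free to try different learning rates, tau values, and architectures for the Actor and Critic networks.

The Inverted Pendulum problem has low complexity, but DDPG also works well on many other problems.

Another great environment to try this on is the continuous version of LunarLander-v2, but it will take more episodes to obtain good results.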

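# Save the weights
# (file names are illustrative; Keras 3 expects the `.weights.h5` suffix)
actor_model.save_weights("pendulum_actor.weights.h5")
critic_model.save_weights("pendulum_critic.weights.h5")

target_actor.save_weights("pendulum_target_actor.weights.h5")
target_critic.save_weights("pendulum_target_critic.weights.h5")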




After 100 episodes
