学习前的铺垫(点击直达): Q learning算法 TF神经网络 神经网络 本文设计的源代码,点击直达
DQN可以理解为是对Q learning算法升级而来的,升级部分有: 1.增加记忆库 2.添加神经网络 3.暂时冻结q_target参数(切断相关性) 之前的算法是直接记录每个状态,但现实情况是,在复杂的情况下,你没有那么大的内存来存储这些状态,也没有那么多时间来寻找或计算他们,因此,之前的Q表被添加至神经网络中,通过不断训练来更改里面的参数,这样之后只需要对神经网络输入当前状态,就可以得到每种行为的Q值,这就是DQN的算法思想!
首先需要两个神经网络,一个是计算估计值的神经网络(下面简述为估计神经网),也就是最新的Q值(即Q估计),该神经网络的参数是一直更新的。另一个神经网络是计算现实值的(下面简述为现实神经网),也就是现实情况下的Q值(即Q现实)。 补充一下: 现实神经网不是说直接可以计算Q现实的,而是说,在计算Q现实时,我们会用到下一状态的最优行为的Q值,即下图中的maxQ(St+1,a;θt^-)。这时计算该Q值时使用现实神经网。而实际的Q现实,还是按照下面的公式计算!
为了切断数据之间的相关性,也可以让神经网络能更好的被训练,首先为为数据增加记忆库,训练时从库中随机抽取部分数据假如网络中,同时,每次训练时只更新估计神经网,暂时不更新现实神经网,那么计算Q估计和Q现实时就会有一个时间差,就切断了数据之间的相关性。 那么什么时候更新现实神经网呢?实际上现实神经网不用训练,而是隔一段时间后直接把训练好的估计神经网的参数直接复制到现实神经网上(这两个神经网络的结构是一模一样的)
源代码上面链接已经给出了,我这里只给RL_brain的代码和run_this的代码,附带自己的理解注释: 1.RL_brain:
import numpy as np import pandas as pd import tensorflow as tf np.random.seed(1) tf.set_random_seed(1) # Deep Q Network off-policy class DeepQNetwork: def __init__( self, # 即上下左右4个行为 n_actions, #这是状态的标志,可以直接理解为小方块左上角的坐标(横纵坐标,2个数),实际代码中是(该坐标/4个方块的面积) n_features, #和Q learning一样 learning_rate=0.01, reward_decay=0.9, e_greedy=0.9, #每隔这么多步更新现实神经网 replace_target_iter=300, #记忆库大小 memory_size=200, #每次从记忆库中抽取数据的数量 batch_size=32, #e_greedy这个值是否增加,其实神经网络训练的越久,就可以不随机选择行为了,直接选择Q值最大的行为 e_greedy_increment=None, #如果为True,就可以使用tensorboard查看graph结构 output_graph=False, ): self.n_actions = n_actions self.n_features = n_features self.lr = learning_rate self.gamma = reward_decay self.epsilon_max = e_greedy self.replace_target_iter = replace_target_iter self.memory_size = memory_size self.batch_size = batch_size self.epsilon_increment = e_greedy_increment self.epsilon = 0 if e_greedy_increment is not None else self.epsilon_max # 记录学习的步数 self.learn_step_counter = 0 # 创建记忆库,每一行的内容是:当前状态的横纵坐标(2个)、选择的行为、该行为得到的奖励、下一个状态的横纵坐标(2个) self.memory = np.zeros((self.memory_size, n_features * 2 + 2)) # 创建两个神经网络 self._build_net() # 用于存储两个神经网络参数的集合 t_params = tf.get_collection('target_net_params') e_params = tf.get_collection('eval_net_params') # 将估计神经网的参数复制到显示神经网中,即更新现实神经网 self.replace_target_op = [tf.assign(t, e) for t, e in zip(t_params, e_params)] self.sess = tf.Session() # tensorboard文件 if output_graph: # $ tensorboard --logdir=logs # tf.train.SummaryWriter soon be deprecated, use following tf.summary.FileWriter("logs/", self.sess.graph) # 初始化 self.sess.run(tf.global_variables_initializer()) # 记录误差的集合 self.cost_his = [] def _build_net(self): # ------------------ 创建估计神经网------------------ self.s = tf.placeholder(tf.float32, [None, self.n_features], name='s') # input self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_target') # for calculating loss # 正向传递,fp with tf.variable_scope('eval_net'): # c_names(collections_names) are the collections to store variables c_names, n_l1, w_initializer, b_initializer = \ ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES], 10, \ tf.random_normal_initializer(0., 0.3), tf.constant_initializer(0.1) # config of layers # first layer. collections is used later when assign to target net with tf.variable_scope('l1'): w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names) b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names) l1 = tf.nn.relu(tf.matmul(self.s, w1) + b1) # second layer. collections is used later when assign to target net with tf.variable_scope('l2'): w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names) b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names) self.q_eval = tf.matmul(l1, w2) + b2 # 反向传递,bp,计算损失值,优化 with tf.variable_scope('loss'): self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval)) with tf.variable_scope('train'): self._train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss) # ------------------ 创建现实神经网 ------------------ self.s_ = tf.placeholder(tf.float32, [None, self.n_features], name='s_') # input with tf.variable_scope('target_net'): # c_names(collections_names) are the collections to store variables c_names = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES] # first layer. collections is used later when assign to target net with tf.variable_scope('l1'): w1 = tf.get_variable('w1', [self.n_features, n_l1], initializer=w_initializer, collections=c_names) b1 = tf.get_variable('b1', [1, n_l1], initializer=b_initializer, collections=c_names) l1 = tf.nn.relu(tf.matmul(self.s_, w1) + b1) # second layer. collections is used later when assign to target net with tf.variable_scope('l2'): w2 = tf.get_variable('w2', [n_l1, self.n_actions], initializer=w_initializer, collections=c_names) b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collections=c_names) self.q_next = tf.matmul(l1, w2) + b2 #现实神经网不进行训练,没有bp过程 # 往记忆库中添加记录 def store_transition(self, s, a, r, s_): if not hasattr(self, 'memory_counter'): self.memory_counter = 0 transition = np.hstack((s, [a, r], s_)) # 记忆库大小有限,当满了后,会替换掉之前旧的数据 index = self.memory_counter % self.memory_size self.memory[index, :] = transition self.memory_counter += 1 # 选择行为 def choose_action(self, observation): # to have batch dimension when feed into tf placeholder observation = observation[np.newaxis, :] # 训练越久,epsilon逐渐增大,那么就越不会随机选择行为 if np.random.uniform() < self.epsilon: # forward feed the observation and get q value for every actions actions_value = self.sess.run(self.q_eval, feed_dict={self.s: observation}) action = np.argmax(actions_value) else: action = np.random.randint(0, self.n_actions) return action # 学习,更新估计神经网参数 def learn(self): # 检查是否要更新现实神经网的参数 if self.learn_step_counter % self.replace_target_iter == 0: self.sess.run(self.replace_target_op) print('\ntarget_params_replaced\n') # 抽取部分数据 if self.memory_counter > self.memory_size: sample_index = np.random.choice(self.memory_size, size=self.batch_size) else: sample_index = np.random.choice(self.memory_counter, size=self.batch_size) batch_memory = self.memory[sample_index, :] # 计算所有选出来的状态的Q估计和Q现实 q_next, q_eval = self.sess.run( [self.q_next, self.q_eval], feed_dict={ self.s_: batch_memory[:, -self.n_features:], # fixed params self.s: batch_memory[:, :self.n_features], # newest params }) # 下面是计算误差的方法,我会单独提出来说,这里不做描述 q_target = q_eval.copy() batch_index = np.arange(self.batch_size, dtype=np.int32) eval_act_index = batch_memory[:, self.n_features].astype(int) reward = batch_memory[:, self.n_features + 1] q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1) # 计算Q现实和Q估计后,bp,训练估计神经网 _, self.cost = self.sess.run([self._train_op, self.loss], feed_dict={self.s: batch_memory[:, :self.n_features], self.q_target: q_target}) # 添加损失值 self.cost_his.append(self.cost) # 逐渐增加epsilon self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max self.learn_step_counter += 1 def plot_cost(self): import matplotlib.pyplot as plt plt.plot(np.arange(len(self.cost_his)), self.cost_his) plt.ylabel('Cost') plt.xlabel('training steps') plt.show()2.run_this:
from maze_env import Maze from RL_brain import DeepQNetwork def run_maze(): step = 0 for episode in range(300): # 获取初始状态 observation = env.reset() while True: # fresh env env.render() # RL choose action based on observation action = RL.choose_action(observation) # RL take action and get next observation and reward observation_, reward, done = env.step(action) RL.store_transition(observation, action, reward, observation_) # 即200条数据前不训练,之后隔5条数据训练一次,即往记忆库中添加5条数据,再随机抽取数据训练 if (step > 200) and (step % 5 == 0): RL.learn() # swap observation observation = observation_ # break while loop when end of this episode if done: break step += 1 # end of game print('game over') env.destroy() if __name__ == "__main__": # maze game env = Maze() RL = DeepQNetwork(env.n_actions, env.n_features, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9, replace_target_iter=200, memory_size=2000, # output_graph=True ) env.after(100, run_maze) env.mainloop() RL.plot_cost()重点!!! 首次补充代码中没说完的地方,即计算Q现实和Q估计的误差。代码如下:
# 下面是计算误差的方法,我会单独提出来说,这里不做描述 q_target = q_eval.copy() #获取数据的行索引 batch_index = np.arange(self.batch_size, dtype=np.int32) #获取每个状态所选的行为 eval_act_index = batch_memory[:, self.n_features].astype(int) #获取每个状态所选行为后的奖励 reward = batch_memory[:, self.n_features + 1] #更新q_target,将现实q值存入对应位置,即[batch_index, eval_act_index]位置 q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)这之前我们获得了两个数据q_next, q_eval,他们分别是对应状态下各个行为的Q值。 我们需要的误差,是我们通过选择行为后而来的,也就是说我们只需要一个行为的Q值,其他的并不需要,所以我们将其他的 action 值全变成 0, 将用到的 action 误差值 反向传递回去, 作为更新凭据。 同Q learning一样,Q估计是选择行为的Q值,Q现实是下一个状态的最优行为的Q值。举个例子: 假如q_eval为【0.3,0.4,0.2,0.1】,q_next为【0.5,0.2,0.1,0.2】,对于q_eval,假如选择的是1(即0.4),而q_next肯定是0(0.5最大),我们希望的结果是q_next-q_eval = 【0,0.1,0,0】(假设误差为0.1),也就是只要1位置上的误差,其他位置上为0.上面代码就是做这个工作的,首先用一个新变量q_target = q_eval ,那么q_target - q_eval后所有位置全为0,再相减之前,需要把<通过q_next中最大的Q值计算的现实Q值>放入q_target ,存入位置和q_eval中选择的行为一致,这个位置(即选择的行为)在记忆库中可以找到。 所以q_target = q_eval =【0.3,0.4,0.2,0.1】,通过记忆库找到位置1,计算现实值r+γ*Qmax,假设为0.5,那么q_target 为【0.3,0.5,0.2,0.1】,所以q_target - q_eval=【0,0.1,0,0】 现在再返回去看注释,相信你不迷惑了!
下面是实际代码中的数据:
这是第一条数据的q_next,最大的是0.18115267,后面会用:
这是第一条数据的q_eval: 这是记忆库中第一条数据: 这是行索引坐标,这里我们只看第一个,即0 这里每次选择的行为,这里我们只看第一个,即1 这是选择行为后的奖励数据,还是只看第一个,即0(R) 通过计算,Q现实 = R + γQmax = 0 + 0.90.17923227 = 0.16303374,将这个数放入q_target 中的位置1(行为1),结果去下: 最后再说一下这个代码的大致运行过程: 1.初试状态,选择行为,得到分数和下一状态,保存记忆库,如果记忆库中没有200条数据,跳入下一状态继续重复步骤1. 2.如果记忆库中有200+数据,那么学习一次,之后每存5条数据到记忆库就学习一次。反复执行 3.在学习过程中,Q现实和Q估计是两个神经网计算出来的,那么,每学习200次后更新一次现实神经网!
完整算法思路讲解,请点击