DQN Plays 2048 Hands-On | Part 3: Optimizing the Network, Using the GPU, and Improving the Env Reward

 


 

 

Video walkthrough:


 

The previous version still has two problems:

1. Only local merge rewards are considered: the reward is currently just the score gained when tiles merge, so the agent only sees the immediate payoff of each move and gets no extra incentive for actually reaching the 2048 tile. Without a strong reward signal for reaching 2048, the agent may never treat it as an important goal (see the short target-value sketch after this list).

2. Training hardware is under-utilized: the code runs on the CPU by default and does not use a GPU for acceleration, which makes training slow.
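As a quick illustration (my own sketch, not from the original post; the numbers are made up), here is how the terminal bonus shows up in the DQN target used later in the training loop:

    # DQN target: target = r + gamma * (1 - done) * max_a' Q(s', a')
    gamma = 0.99
    max_next_q = 50.0                                         # assumed bootstrap value, for illustration only
    target_merge = 8 + gamma * (1 - 0) * max_next_q           # ordinary merge of two 4s: ~57.5
    target_win = (8 + 10000) + gamma * (1 - 1) * max_next_q   # reaching 2048: 10008, a much stronger signal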

The code changes are as follows:

In the step function, add a large bonus (and end the episode) when a 2048 tile appears, and append max_tile, the largest tile on the board, to the state, which adds one dimension to the network input:

    if 2048 in self.board:
        reward += 10000
        done = True
    state = self.board.flatten()
    max_tile = np.max(self.board)
    state = np.append(state, max_tile)
    return state, reward, done
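Since the flattened 4x4 board contributes 16 values and max_tile adds one more, the network input size in train() must match; this is the corresponding one-line change (already present in the complete code below):

    input_size = 17  # 16 flattened board cells + 1 max_tile feature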

Check whether a GPU is available; if so, use it for computation, otherwise fall back to the CPU:

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
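As an optional sanity check (my addition, not part of the original post), print which device was selected so you can confirm the GPU is actually being used:

    print(f"Using device: {device}")  # "cuda" when PyTorch can see a GPU, otherwise "cpu"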

In train(), after creating the model instances, move them to the selected device (GPU or CPU) with .to(device):

    model = DQN(input_size, output_size).to(device)
    target_model = DQN(input_size, output_size).to(device)

During training and inference, move the input data (states, actions, rewards, and so on) to the same device as well:

    state = torch.FloatTensor(state).unsqueeze(0).to(device)
    next_state = torch.FloatTensor(next_state).unsqueeze(0).to(device)
    states = torch.FloatTensor(states).to(device)
    actions = torch.LongTensor(actions).to(device)
    rewards = torch.FloatTensor(rewards).to(device)
    next_states = torch.FloatTensor(next_states).to(device)
    dones = torch.FloatTensor(dones).to(device)

When adding transitions to the replay buffer, move state and next_state back to the CPU with .cpu() before converting them to NumPy arrays with .numpy():

    replay_buffer.add(state.cpu().squeeze(0).numpy(), action, reward, next_state.cpu().squeeze(0).numpy(), done)

Without this change you get: TypeError: can't convert cuda:0 device type tensor to numpy. NumPy can only work with tensors that live in host (CPU) memory.
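A minimal standalone sketch (assuming a CUDA-capable machine) that reproduces the error and shows the fix:

    import torch

    t = torch.zeros(4, device="cuda")
    # t.numpy()            # raises: TypeError: can't convert cuda:0 device type tensor to numpy
    arr = t.cpu().numpy()  # copy the tensor back to host memory first, then convert to NumPy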

The complete code is as follows:

    import numpy as np
    import torch
    import torch.nn as nn
    import torch.optim as optim
    import random
    from collections import deque
    import matplotlib.pyplot as plt
    import matplotlib.colors as mcolors
    from matplotlib.table import Table

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # 2048 game environment class
    class Game2048:
        def __init__(self):
            self.board = np.zeros((4, 4), dtype=int)
            self.add_random_tile()
            self.add_random_tile()

        def add_random_tile(self):
            empty_cells = np.argwhere(self.board == 0)
            if len(empty_cells) > 0:
                index = random.choice(empty_cells)
                self.board[index[0], index[1]] = 2 if random.random() < 0.9 else 4

        def move_left(self):
            reward = 0
            new_board = np.copy(self.board)
            for row in range(4):
                line = new_board[row]
                non_zero = line[line != 0]
                merged = []
                i = 0
                while i < len(non_zero):
                    if i + 1 < len(non_zero) and non_zero[i] == non_zero[i + 1]:
                        merged.append(2 * non_zero[i])
                        reward += 2 * non_zero[i]
                        i += 2
                    else:
                        merged.append(non_zero[i])
                        i += 1
                new_board[row] = np.pad(merged, (0, 4 - len(merged)), 'constant')
            if not np.array_equal(new_board, self.board):
                self.board = new_board
                self.add_random_tile()
            return reward

        def move_right(self):
            self.board = np.fliplr(self.board)
            reward = self.move_left()
            self.board = np.fliplr(self.board)
            return reward

        def move_up(self):
            self.board = self.board.T
            reward = self.move_left()
            self.board = self.board.T
            return reward

        def move_down(self):
            self.board = self.board.T
            reward = self.move_right()
            self.board = self.board.T
            return reward

        def step(self, action):
            if action == 0:
                reward = self.move_left()
            elif action == 1:
                reward = self.move_right()
            elif action == 2:
                reward = self.move_up()
            elif action == 3:
                reward = self.move_down()
            # Game over: no empty cells and no adjacent equal tiles in any row or column
            done = not np.any(self.board == 0) and all([
                np.all(self.board[:, i] != self.board[:, i + 1]) for i in range(3)
            ]) and all([
                np.all(self.board[i, :] != self.board[i + 1, :]) for i in range(3)
            ])
            # Extra bonus and episode end once the 2048 tile is reached
            if 2048 in self.board:
                reward += 10000
                done = True
            state = self.board.flatten()
            max_tile = np.max(self.board)
            state = np.append(state, max_tile)
            return state, reward, done

        def reset(self):
            self.board = np.zeros((4, 4), dtype=int)
            self.add_random_tile()
            self.add_random_tile()
            state = self.board.flatten()
            max_tile = np.max(self.board)
            state = np.append(state, max_tile)
            return state

    # Deep Q-network class
    class DQN(nn.Module):
        def __init__(self, input_size, output_size):
            super(DQN, self).__init__()
            self.fc1 = nn.Linear(input_size, 128)
            self.fc2 = nn.Linear(128, 128)
            self.fc3 = nn.Linear(128, output_size)

        def forward(self, x):
            x = torch.relu(self.fc1(x))
            x = torch.relu(self.fc2(x))
            return self.fc3(x)

    # Experience replay buffer class
    class ReplayBuffer:
        def __init__(self, capacity):
            self.buffer = deque(maxlen=capacity)

        def add(self, state, action, reward, next_state, done):
            self.buffer.append((state, action, reward, next_state, done))

        def sample(self, batch_size):
            batch = random.sample(self.buffer, batch_size)
            states, actions, rewards, next_states, dones = zip(*batch)
            return np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(dones)

        def __len__(self):
            return len(self.buffer)

    # Visualization function
    def visualize_board(board, ax):
        ax.clear()
        table = Table(ax, bbox=[0, 0, 1, 1])
        nrows, ncols = board.shape
        width, height = 1.0 / ncols, 1.0 / nrows
        # Define the color map
        cmap = mcolors.LinearSegmentedColormap.from_list("", ["white", "yellow", "orange", "red"])
        for (i, j), val in np.ndenumerate(board):
            color = cmap(np.log2(val + 1) / np.log2(2048 + 1)) if val > 0 else "white"
            table.add_cell(i, j, width, height, text=val if val > 0 else "",
                           loc='center', facecolor=color)
        ax.add_table(table)
        ax.set_axis_off()
        plt.draw()
        plt.pause(0.1)

    # Training function
    def train():
        env = Game2048()
        input_size = 17  # 16 flattened board cells + 1 max_tile feature
        output_size = 4
        model = DQN(input_size, output_size).to(device)
        target_model = DQN(input_size, output_size).to(device)
        target_model.load_state_dict(model.state_dict())
        target_model.eval()
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        criterion = nn.MSELoss()
        replay_buffer = ReplayBuffer(capacity=10000)
        batch_size = 32
        gamma = 0.99
        epsilon = 1.0
        epsilon_decay = 0.995
        epsilon_min = 0.01
        update_target_freq = 10
        num_episodes = 1000
        fig, ax = plt.subplots()
        for episode in range(num_episodes):
            state = env.reset()
            state = torch.FloatTensor(state).unsqueeze(0).to(device)
            done = False
            total_reward = 0
            while not done:
                visualize_board(env.board, ax)
                # Epsilon-greedy action selection
                if random.random() < epsilon:
                    action = random.randint(0, output_size - 1)
                else:
                    q_values = model(state)
                    action = torch.argmax(q_values, dim=1).item()
                next_state, reward, done = env.step(action)
                next_state = torch.FloatTensor(next_state).unsqueeze(0).to(device)
                replay_buffer.add(state.cpu().squeeze(0).numpy(), action, reward, next_state.cpu().squeeze(0).numpy(), done)
                if len(replay_buffer) >= batch_size:
                    states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
                    states = torch.FloatTensor(states).to(device)
                    actions = torch.LongTensor(actions).to(device)
                    rewards = torch.FloatTensor(rewards).to(device)
                    next_states = torch.FloatTensor(next_states).to(device)
                    dones = torch.FloatTensor(dones).to(device)
                    q_values = model(states)
                    q_values = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
                    next_q_values = target_model(next_states)
                    next_q_values = next_q_values.max(1)[0]
                    target_q_values = rewards + gamma * (1 - dones) * next_q_values
                    loss = criterion(q_values, target_q_values)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                state = next_state
                total_reward += reward
            if episode % update_target_freq == 0:
                target_model.load_state_dict(model.state_dict())
            epsilon = max(epsilon * epsilon_decay, epsilon_min)
            print(f"Episode {episode}: Total Reward = {total_reward}, Epsilon = {epsilon}")
        plt.close()

    if __name__ == "__main__":
        train()

 

 

 
