Two problems with the current implementation:
1. Only local merge rewards: the reward is currently just the score gained when tiles merge, so it captures only the immediate payoff of each step and gives no extra incentive for reaching the final goal of 2048. Without a sufficiently strong reward signal for reaching 2048, the agent may never treat it as an important objective.
2. Poor hardware utilization during training: the code does not use a GPU for acceleration and defaults to the CPU, which is slow.
The code changes are as follows:
In the step function, add a large bonus reward (and end the episode) once a 2048 tile appears, and append max_tile, the largest tile on the board, to the flattened board so the input dimension grows by one:
```python
# Bonus reward and early termination once the 2048 tile appears
if 2048 in self.board:
    reward += 10000
    done = True
# Append the largest tile so the state has 17 elements instead of 16
state = self.board.flatten()
max_tile = np.max(self.board)
state = np.append(state, max_tile)
return state, reward, done
```
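Because the state now has 17 elements (16 board cells plus max_tile), the network's input size has to match; in the full code below this shows up as:

```python
input_size = 17  # 16 flattened board cells + 1 for max_tile
```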
Check whether a GPU is available in the system; if so, use it for computation, otherwise fall back to the CPU:
```python
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
```
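As an optional sanity check (not part of the original script), you can print which device was actually selected before training starts:

```python
# Optional: confirm which device will be used for training
print(f"Training on: {device}")
if device.type == "cuda":
    # Prints the GPU model name, e.g. the installed NVIDIA card
    print(torch.cuda.get_device_name(0))
```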
In train, after creating the model instances, call .to(device) to move them onto the chosen device (GPU or CPU):
```python
model = DQN(input_size, output_size).to(device)
target_model = DQN(input_size, output_size).to(device)
```
During training and inference, also move the input data (states, actions, rewards, and so on) onto the same device:
```python
state = torch.FloatTensor(state).unsqueeze(0).to(device)

next_state = torch.FloatTensor(next_state).unsqueeze(0).to(device)

states = torch.FloatTensor(states).to(device)
actions = torch.LongTensor(actions).to(device)
rewards = torch.FloatTensor(rewards).to(device)
next_states = torch.FloatTensor(next_states).to(device)
dones = torch.FloatTensor(dones).to(device)
```
When writing to the replay buffer, move state and next_state back to the CPU with .cpu() before converting them to NumPy arrays with .numpy():
```python
replay_buffer.add(state.cpu().squeeze(0).numpy(), action, reward, next_state.cpu().squeeze(0).numpy(), done)
```
Without this change, the call fails with TypeError: can't convert cuda:0 device type tensor to numpy.
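A related PyTorch rule worth keeping in mind (general behaviour, not specific to this script): a tensor that requires gradients must also be detached before the NumPy conversion. The states here do not require gradients, so .cpu().numpy() alone is enough, but the general pattern (with some_tensor as a placeholder name) is:

```python
# Only needed for tensors that require gradients
arr = some_tensor.detach().cpu().numpy()
```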
The complete code is as follows:
```python
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from matplotlib.table import Table

# Use the GPU when available, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class Game2048:
    def __init__(self):
        self.board = np.zeros((4, 4), dtype=int)
        self.add_random_tile()
        self.add_random_tile()

    def add_random_tile(self):
        empty_cells = np.argwhere(self.board == 0)
        if len(empty_cells) > 0:
            index = random.choice(empty_cells)
            self.board[index[0], index[1]] = 2 if random.random() < 0.9 else 4

    def move_left(self):
        reward = 0
        new_board = np.copy(self.board)
        for row in range(4):
            line = new_board[row]
            non_zero = line[line != 0]
            merged = []
            i = 0
            # Merge equal neighbouring tiles; the merge score is the step reward
            while i < len(non_zero):
                if i + 1 < len(non_zero) and non_zero[i] == non_zero[i + 1]:
                    merged.append(2 * non_zero[i])
                    reward += 2 * non_zero[i]
                    i += 2
                else:
                    merged.append(non_zero[i])
                    i += 1
            new_board[row] = np.pad(merged, (0, 4 - len(merged)), 'constant')
        if not np.array_equal(new_board, self.board):
            self.board = new_board
            self.add_random_tile()
        return reward

    def move_right(self):
        self.board = np.fliplr(self.board)
        reward = self.move_left()
        self.board = np.fliplr(self.board)
        return reward

    def move_up(self):
        self.board = self.board.T
        reward = self.move_left()
        self.board = self.board.T
        return reward

    def move_down(self):
        self.board = self.board.T
        reward = self.move_right()
        self.board = self.board.T
        return reward

    def step(self, action):
        if action == 0:
            reward = self.move_left()
        elif action == 1:
            reward = self.move_right()
        elif action == 2:
            reward = self.move_up()
        elif action == 3:
            reward = self.move_down()
        done = not np.any(self.board == 0) and all([
            np.all(self.board[:, i] != self.board[:, i + 1]) for i in range(3)
        ]) and all([
            np.all(self.board[i, :] != self.board[i + 1, :]) for i in range(3)
        ])
        # Bonus reward and early termination once the 2048 tile appears
        if 2048 in self.board:
            reward += 10000
            done = True
        # Append the largest tile so the state has 17 elements
        state = self.board.flatten()
        max_tile = np.max(self.board)
        state = np.append(state, max_tile)
        return state, reward, done

    def reset(self):
        self.board = np.zeros((4, 4), dtype=int)
        self.add_random_tile()
        self.add_random_tile()
        state = self.board.flatten()
        max_tile = np.max(self.board)
        state = np.append(state, max_tile)
        return state


class DQN(nn.Module):
    def __init__(self, input_size, output_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_size, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, output_size)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)


class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(dones)

    def __len__(self):
        return len(self.buffer)


def visualize_board(board, ax):
    ax.clear()
    table = Table(ax, bbox=[0, 0, 1, 1])
    nrows, ncols = board.shape
    width, height = 1.0 / ncols, 1.0 / nrows

    cmap = mcolors.LinearSegmentedColormap.from_list("", ["white", "yellow", "orange", "red"])

    for (i, j), val in np.ndenumerate(board):
        color = cmap(np.log2(val + 1) / np.log2(2048 + 1)) if val > 0 else "white"
        table.add_cell(i, j, width, height, text=val if val > 0 else "",
                       loc='center', facecolor=color)

    ax.add_table(table)
    ax.set_axis_off()
    plt.draw()
    plt.pause(0.1)


def train():
    env = Game2048()
    input_size = 17  # 16 flattened board cells + 1 for max_tile
    output_size = 4
    # Move both networks onto the selected device
    model = DQN(input_size, output_size).to(device)
    target_model = DQN(input_size, output_size).to(device)
    target_model.load_state_dict(model.state_dict())
    target_model.eval()

    optimizer = optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.MSELoss()
    replay_buffer = ReplayBuffer(capacity=10000)
    batch_size = 32
    gamma = 0.99
    epsilon = 1.0
    epsilon_decay = 0.995
    epsilon_min = 0.01
    update_target_freq = 10

    num_episodes = 1000
    fig, ax = plt.subplots()
    for episode in range(num_episodes):
        state = env.reset()
        state = torch.FloatTensor(state).unsqueeze(0).to(device)
        done = False
        total_reward = 0
        while not done:
            visualize_board(env.board, ax)
            if random.random() < epsilon:
                action = random.randint(0, output_size - 1)
            else:
                q_values = model(state)
                action = torch.argmax(q_values, dim=1).item()

            next_state, reward, done = env.step(action)
            next_state = torch.FloatTensor(next_state).unsqueeze(0).to(device)
            # Move tensors back to the CPU before converting them to NumPy arrays
            replay_buffer.add(state.cpu().squeeze(0).numpy(), action, reward, next_state.cpu().squeeze(0).numpy(), done)

            if len(replay_buffer) >= batch_size:
                states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
                states = torch.FloatTensor(states).to(device)
                actions = torch.LongTensor(actions).to(device)
                rewards = torch.FloatTensor(rewards).to(device)
                next_states = torch.FloatTensor(next_states).to(device)
                dones = torch.FloatTensor(dones).to(device)

                q_values = model(states)
                q_values = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)

                next_q_values = target_model(next_states)
                next_q_values = next_q_values.max(1)[0]
                target_q_values = rewards + gamma * (1 - dones) * next_q_values

                loss = criterion(q_values, target_q_values)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            state = next_state
            total_reward += reward

        if episode % update_target_freq == 0:
            target_model.load_state_dict(model.state_dict())

        epsilon = max(epsilon * epsilon_decay, epsilon_min)
        print(f"Episode {episode}: Total Reward = {total_reward}, Epsilon = {epsilon}")

    plt.close()


if __name__ == "__main__":
    train()
```
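As a quick way to confirm the changed pieces fit together without launching a full training run (a minimal sketch, assuming the classes above are pasted into the same session), you can exercise the environment directly:

```python
# Minimal smoke test of the environment and the new state shape
env = Game2048()
state = env.reset()
print(state.shape)                   # (17,): 16 board cells + max_tile
state, reward, done = env.step(0)    # action 0 = move left
print(reward, done)
```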