基於深度強化學習(DQN)的迷宮尋路演演算法

2023-04-23 06:00:41

QLearning方法有著明顯的侷限性,當狀態和動作空間是離散的且維數不高時可使用Q-Table儲存每個狀態動作的Q值,而當狀態和動作時高維連續時,該方法便不太適用。可以將Q-Table的更新問題變成一個函數擬合問題,通過更新引數θ使得Q函數逼近最優Q值。DL是解決引數學習的有效方法,可以通過引進DL來解決強化學習RL中擬合Q值函數問題,但是要先解決一系列問題:

  1. DL需要大量帶標籤的樣本進行監督學習,但RL只有reward返回值
  2. DL樣本獨立,但RL前後State狀態有關
  3. DL目標分佈固定,但RL的分佈一直變化
  4. 使用非線性網路表示值函數時會不穩定

QLearning實現:https://www.cnblogs.com/N3ptune/p/17341434.html

Deep Q-Network

此處將使用DQN來解決上述問題,其演演算法流程包括:

  1. 首先初始化深度神經網路,它將作為 Q 函數的近似值函數
  2. 初始化經驗回放緩衝區,用於儲存智慧體的經驗,其中包括狀態、動作、獎勵、下一狀態等資訊
  3. 智慧體在環境中採取行動,根據行動獲得獎勵,得到下一個狀態,並將這些經驗新增到經驗回放緩衝區中
  4. 從經驗回放緩衝區中取樣一批經驗,用於訓練神經網路
  5. 根據神經網路計算每個動作的 Q 值
  6. 選擇一個動作,可以使用 ε-greedy 策略或者 softmax 策略等
  7. 根據選擇的動作與環境互動,得到獎勵和下一個狀態,將經驗新增到經驗回放緩衝區中
  8. 使用經驗回放緩衝區中的資料對神經網路進行訓練,目標是最小化 Q 值函數的平均誤差
  9. 將神經網路中的引數複製到目標網路中,每隔一段時間更新目標網路,以提高穩定性和收斂性
  10. 重複執行步驟3-9,直到達到指定的訓練輪數或者 Q 值函數收斂

此處要說明的是,DQN要使用Reward來構造標籤,通過經驗回放來解決相關性以及非靜態分佈問題,使用一個CNN(Policy-Net)產生當前Q值,使用另外一個CNN(Target-Net)產生Target Q值

在本問題中,動作空間依然是上下左右四個方向,以整個迷宮為狀態,用0來標記道路、-1表示障礙、1表示起點和終點,2表示已經走過的路徑

損失函數

Q的目標值:

\[y_i=r+\gamma\max_{a'}Q(s',a';\theta^-) \]

Q的預測值:

\[Q(s,a;\theta) \]

因此損失函數為:

\[L(\theta) = \mathbb{E}[(r + \gamma \max_{a'} Q_{\text{target}}(s', a', \theta^{-}) - Q(s, a, \theta))^2] \]

經驗回放

經驗回放機制,不斷地將智慧體與環境互動產生的經驗儲存到一個經驗池中,然後從這個經驗池中隨機抽取一定數量的經驗,用於訓練神經網路,避免了資料的相關性和非靜態分佈性。

經驗回放機制的優點在於可以將不同時間點收集到的經驗混合在一起,使得訓練的樣本具有更大的多樣性,避免了訓練樣本的相關性,從而提高了訓練的穩定性和效率。此外,經驗回放機制還可以減少因為樣本分佈的改變而造成的訓練不穩定問題。

在DQN中,經驗回放機制的具體實現方式是將智慧體與環境的互動序列(state, action, reward, next state)儲存在一個經驗池中,當神經網路進行訓練時,從經驗池中隨機抽取一定數量的經驗序列,用於訓練網路。這種方法可以減少資料的相關性,同時還可以重複利用之前的經驗,提高資料的利用率。

程式碼實現

首先實現一個神經網路,如上述分析,該網路用於擬合Q函數,接收一個狀態作為輸入,然後在其隱藏層中執行一系列非線性轉換,最終輸出狀態下所有可能動作的Q值。這些Q值可以被用來選擇下一步要執行的動作。

# Deep Q Network
class DQNet(nn.Module):
    def __init__(self):
        super(DQNet,self).__init__()
        self.conv1 = nn.Conv2d(1,32,kernel_size=3,stride=1,padding=1)
        self.conv2 = nn.Conv2d(32,64,kernel_size=3,stride=1,padding=1)
        self.fc1 = nn.Linear(64*8*8,256)
        self.dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(256,4)

    def forward(self,x):
        x = x.view(-1,1,8,8)
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.view(-1,64*8*8)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

定義經驗回放緩衝

class ReplayBuffer:
    # 初始化緩衝區
    def __init__(self,capacity):
        self.capacity = capacity
        self.buffer = []

    # 將一條經驗資料新增到緩衝區中
    def push(self,state,action,reward,next_state,done):
        if len(self.buffer) >= self.capacity:
            self.buffer.pop(0)
        self.buffer.append((state,action,reward,next_state,done))

    # 隨機從緩衝區抽取batch_size大小的經驗資料
    def sample(self,batch_size):
        states,actions,rewards,next_states,dones = zip(*random.sample(self.buffer,batch_size))
        return states,actions,rewards,next_states,dones

    def __len__(self):
        return len(self.buffer)

定義智慧體:

class DQNAgent:
    def __init__(self,state_size,action_size):
        self.state_size = state_size  # 狀態空間
        self.action_size = action_size # 動作空間
        self.q_net = DQNet()  # 估計動作價值 神經網路
        self.target_q_net = DQNet() # 計算目標值 神經網路
        self.target_q_net.load_state_dict(self.q_net.state_dict())
        self.optimizer = optim.Adam(self.q_net.parameters(),lr=0.001)  # 初始化Adam優化器
        self.memory = ReplayBuffer(capacity=10000)  # 經驗回放緩衝區
        self.gamma = 0.99 # 折扣因子
        self.epsilon = 1.0 # 探索率
        self.epsilon_decay = 0.99995 # 衰減因子
        self.epsilon_min = 0.01 # 探索率最小值
        self.batch_size = 64  # 經驗回放每個批次大小
        self.update_rate = 200 # 網路更新頻率
        self.steps = 0 # 總步數

    # 探索策略 在給定狀態下采取動作
    def get_action(self,state):
        if np.random.rand() <= self.epsilon:
            return np.random.choice(self.action_size) # 隨機選擇動作
        state = torch.from_numpy(state).float().unsqueeze(0)
        q_values = self.q_net(state)
        return torch.argmax(q_values,dim=1).item()

    # 將狀態轉移元組儲存到經驗回放緩衝區
    def remember(self,state,action,reward,next_state,done):
        self.memory.push(state,action,reward,next_state,done)

    # 從經驗回放緩衝區抽取一個批次的轉移樣本
    def relay(self):
        if len(self.memory) < self.batch_size:
            return

        # 從回放經驗中抽取資料
        states,actions,rewards,next_states,dones = self.memory.sample(self.batch_size)
        states = torch.from_numpy(np.array(states)).float()
        actions = torch.from_numpy(np.array(actions)).long()
        rewards = torch.from_numpy(np.array(rewards)).float()
        next_states = torch.from_numpy(np.array(next_states)).float()
        dones = torch.from_numpy(np.array(dones)).long()

        q_targets = self.target_q_net(next_states).detach()  # 計算下一狀態Q值
        q_targets[dones] = 0.0 # 對於已完成狀態 將Q值設定為0

        # 計算目標Q值
        q_targets = rewards.unsqueeze(1) + self.gamma * torch.max(q_targets,dim=1)[0].unsqueeze(1)
        q_expected = self.q_net(states).gather(1,actions.unsqueeze(1)) # 計算當前狀態Q值

        # 計算損失值
        loss = F.mse_loss(q_expected,q_targets)

        # 通過反向傳播更新神經網路的引數
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        self.steps += 1

        # 隔一定步數 更新目標網路
        if self.steps % self.update_rate == 0:
            self.target_q_net.load_state_dict(self.q_net.state_dict())

        # 更新epsilon值 使得探索時間衰減
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def train(self,env,episodes):
        steps = []
        for episode in range(episodes):
            env.reset(complete=False)
            step = 0
            while True:
                step += 1
                action = self.get_action(env.state)  # 獲取動作
                next_state,reward,done = env.step(action)  # 執行動作
                agent.remember(env.state,action,reward,next_state,done)
                agent.relay()
                env.state = next_state  # 更新地圖狀態
                if done or step > 200:
                    break
            steps.append(step)
        return steps

    def test(self,env):
        step = 0
        while True:
            step += 1
            action = self.get_action(env.state)
            next_state,reward,done = env.step(action)
            env.state = next_state
            if done or step > 1000:
                break

    def save(self,path):
        torch.save(self.q_net.state_dict(),path+"/value_model.pt")
        torch.save(self.target_q_net.state_dict(),path+"/target_model.pt")

    def load(self,path):
        self.q_net.load_state_dict(torch.load(path+"/value_model.pt"))
        self.target_q_net.load_state_dict(torch.load(path+"/target_model.pt"))

在定義中,init函數用於初始化物件:

def __init__(self,state_size,action_size):
    self.state_size = state_size  # 狀態空間
    self.action_size = action_size # 動作空間
    self.q_net = DQNet()  # 估計動作價值 神經網路
    self.target_q_net = DQNet() # 計算目標值 神經網路
    self.target_q_net.load_state_dict(self.q_net.state_dict())
    self.optimizer = optim.Adam(self.q_net.parameters(),lr=0.001)  # 初始化Adam優化器
    self.memory = ReplayBuffer(capacity=10000)  # 經驗回放緩衝區
    self.gamma = 0.99 # 折扣因子
    self.epsilon = 1.0 # 探索率
    self.epsilon_decay = 0.99995 # 衰減因子
    self.epsilon_min = 0.01 # 探索率最小值
    self.batch_size = 64  # 經驗回放每個批次大小
    self.update_rate = 200 # 網路更新頻率
    self.steps = 0 # 總步數

上述包含了一些DQN的重要引數

在智慧體選取動作時,依然使用QL中的貪婪策略

# 探索策略 在給定狀態下采取動作
def get_action(self,state):
    if np.random.rand() <= self.epsilon:
        return np.random.choice(self.action_size) # 隨機選擇動作
    state = torch.from_numpy(state).float().unsqueeze(0)
    q_values = self.q_net(state)
    return torch.argmax(q_values,dim=1).item()

與QL不同的是,Q值由神經網路求得

下述函數用於將五元組儲存到經驗回放緩衝區

# 將狀態轉移元組儲存到經驗回放緩衝區
def remember(self,state,action,reward,next_state,done):
    self.memory.push(state,action,reward,next_state,done)

經驗回放:

# 從經驗回放緩衝區抽取一個批次的轉移樣本
def relay(self):
    if len(self.memory) < self.batch_size:
        return

    # 從回放經驗中抽取資料
    states,actions,rewards,next_states,dones = self.memory.sample(self.batch_size)
    states = torch.from_numpy(np.array(states)).float()
    actions = torch.from_numpy(np.array(actions)).long()
    rewards = torch.from_numpy(np.array(rewards)).float()
    next_states = torch.from_numpy(np.array(next_states)).float()
    dones = torch.from_numpy(np.array(dones)).long()

    q_targets = self.target_q_net(next_states).detach()  # 計算下一狀態Q值
    q_targets[dones] = 0.0 # 對於已完成狀態 將Q值設定為0

    # 計算目標Q值
    q_targets = rewards.unsqueeze(1) + self.gamma * torch.max(q_targets,dim=1)[0].unsqueeze(1)
    q_expected = self.q_net(states).gather(1,actions.unsqueeze(1)) # 計算當前狀態Q值

    # 計算損失值
    loss = F.mse_loss(q_expected,q_targets)

    # 通過反向傳播更新神經網路的引數
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

    self.steps += 1

    # 隔一定步數 更新目標網路
    if self.steps % self.update_rate == 0:
        self.target_q_net.load_state_dict(self.q_net.state_dict())

    # 更新epsilon值 使得探索時間衰減
    if self.epsilon > self.epsilon_min:
        self.epsilon *= self.epsilon_decay

在每個時間步,從經驗回放池中隨機選擇一批先前觀察到的狀態和動作,然後計算它們的Q值。之後可以使用這些Q值來計算一個損失函數,該函數衡量當前的Q函數與理論上的Q函數之間的差距。最後使用反向傳播演演算法來更新神經網路的權重,以最小化損失函數。

訓練函數:

def train(self,env,episodes):
    steps = []
    for episode in range(episodes):
        env.reset(complete=False)
        step = 0
        while True:
            step += 1
            action = self.get_action(env.state)  # 獲取動作
            next_state,reward,done = env.step(action)  # 執行動作
            agent.remember(env.state,action,reward,next_state,done)
            agent.relay()
            env.state = next_state  # 更新地圖狀態
            if done or step > 200:
                break
        steps.append(step)
    return steps

在該函數中會讓智慧體進行數次遊戲,每次遊戲開始時會重置狀態,但不重置迷宮,並且設定一個閾值,讓智慧體步數達到這個值時終止遊戲,否則智慧體有概率不斷滯留。在智慧體每次選擇動作並執行後,會將這次的狀態和動作以及獎賞儲存到經驗池中,之後進行經驗回放,訓練網路。

定義環境

定義一個迷宮環境,和智慧體進行互動:

class MazeEnv:
    def __init__(self,size):
        self.size = size
        self.actions = [0,1,2,3]
        self.maze,self.start,self.end = self.generate(size)
        self.state = np.expand_dims(self.maze,axis=2).copy()

    def reset(self,complete=False):
        if complete:
            # 重置迷宮
            self.maze,self.start,self.end = self.generate(self.size)
        self.state = np.expand_dims(self.maze,axis=2)
        self.position = self.start
        self.goal = self.end
        self.path = [self.start]
        return self.state

    def step(self, action):
        # 執行動作
        next_position = None
        if action == 0 and self.position[0] > 0:
            next_position = (self.position[0]-1, self.position[1])
        elif action == 1 and self.position[0] < self.size-1:
            next_position = (self.position[0]+1, self.position[1])
        elif action == 2 and self.position[1] > 0:
            next_position = (self.position[0], self.position[1]-1)
        elif action == 3 and self.position[1] < self.size-1:
            next_position = (self.position[0], self.position[1]+1)
        else:
            next_position = self.position

        if next_position == self.goal:
            reward = 500
        elif self.maze[next_position] == -1:
            reward = -300
        else:
            reward = -10

        self.position = next_position  # 更新位置
        self.path.append(self.position)  # 加入路徑

        next_state = self.state.copy()
        next_state[self.position] = 2 # 標記路徑

        done = (self.position == self.goal)  # 判斷是否結束
        return next_state, reward, done

    @staticmethod
    # 生成迷宮影象
    def generate(size):
        maze = np.zeros((size, size))
        # Start and end points
        start = (random.randint(0, size-1), 0)
        end = (random.randint(0, size-1), size-1)
        maze[start] = 1
        maze[end] = 1
        # Generate maze walls
        for i in range(size * size):
            x, y = random.randint(0, size-1), random.randint(0, size-1)
            if (x, y) == start or (x, y) == end:
                continue
            if random.random() < 0.2:
                maze[x, y] = -1
            if np.sum(np.abs(maze)) == size*size - 2:
                break
        return maze, start, end

    @staticmethod
    # BFS求出路徑
    def solve_maze(maze, start, end):
        size = maze.shape[0]
        visited = np.zeros((size, size))
        solve = np.zeros((size,size))
        queue = [start]
        visited[start[0],start[1]] = 1
        while queue:
            x, y = queue.pop(0)
            if (x, y) == end:
                break
            for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
                nx, ny = x + dx, y + dy
                if nx < 0 or nx >= size or ny < 0 or ny >= size or visited[nx, ny] or maze[nx, ny] == -1:
                    continue
                queue.append((nx, ny))
                visited[nx, ny] = visited[x, y] + 1
        if visited[end[0],end[1]] == 0:
            return solve,[]
        path = [end]
        x, y = end
        while (x, y) != start:
            for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
                nx, ny = x + dx, y + dy
                if nx < 0 or nx >= size or ny < 0 or ny >= size or visited[nx, ny] != visited[x, y] - 1:
                    continue
                path.append((nx, ny))
                x, y = nx, ny
                break

        points = path[::-1]  # 倒序
        for point in points:
            solve[point[0]][point[1]] = 1
        return solve, points

模型訓練

初始化,這裡針對8*8迷宮

maze_size = 8
input_shape = (maze_size,maze_size,1)
num_actions = 4
agent = DQNAgent(input_shape,num_actions)
env = MazeEnv(maze_size)

定義一個函數,用於繪製迷宮:

from PIL import Image

def maze_to_image(maze, path):
    size = maze.shape[0]
    img = Image.new('RGB', (size, size), (255, 255, 255))
    pixels = img.load()
    for i in range(size):
        for j in range(size):
            if maze[i, j] == -1:
                pixels[j, i] = (0, 0, 0)
            elif maze[i, j] == 1:
                pixels[j, i] = (0, 255, 0)
    for x, y in path:
        pixels[y, x] = (255, 0, 0)
    return np.array(img)

執行訓練:

for epoch in range(100):
    steps = agent.train(env,50)

    plt.imshow(maze_to_image(env.maze,[]))
    plt.savefig(f"mazes/{epoch+1}.png") # 儲存迷宮原始影象
    plt.clf()

    plt.plot(steps)
    plt.xlabel('Episode')
    plt.ylabel('Steps')
    plt.title('Training Steps')
    plt.savefig(f"train/{epoch+1}.png") # 儲存訓練影象
    plt.clf()

    solve = maze_to_image(env.maze,env.path)

    plt.imshow(solve)
    plt.savefig(f"solves/{epoch+1}.png") # 儲存最後一次路線
    plt.clf()

    env.reset(complete=True)  # 完全重置環境

agent.save("model")

抽取一些訓練時的圖片:

第1次訓練:

迷宮影象:

最後一次路線圖:

訓練影象:

執行步數不穩定,有多次超出閾值

第10次:

第50次:

儘管效率很高,但依然觸碰了障礙物

第100次:

這次不僅沒有觸碰障礙物,並且非常接近最優解