7.3 D*算法实战
在本章前面的内容中已经讲解了D*算法是核心知识,在本节的内容中,将通过具体实例来展示实现D*算法的过程,展示D*算法在机器人、自动驾驶车辆等智能体面临未知环境或者环境动态变化时,重新规划路径以避开障碍物或者优化路径的方法。
7.3.1 实战案例:迷宫中的D*算法探索与动态障碍物处理
请看我们的迷宫问题:在一个迷宫中,有一个机器人需要从起点出发,找到通往终点的最短路径。机器人在探索路径时,会遇到障碍物,并且障碍物的位置会发生变化。当障碍物发生变化时,机器人需要重新规划路径。本题要求你完成以下任务:
- 实现D*算法,帮助机器人找到起点到终点的最短路径。
- 使用Matplotlib库在迷宫地图上可视化机器人的路径探索过程。
- 模拟障碍物的动态变化,当障碍物发生变化时,重新规划机器人的路径,并使用Matplotlib库显示更新后的路径。
实例7-1:迷宫中的D*算法探索与动态障碍物处理(codes/7/easy.py)
实例文件easy.py具体实现流程如下所示。
(1)下面的代码实现了一个迷宫中状态的表示类(State),以及一个用于路径规划的D*算法。该算法可以在迷宫地图中找到起点到终点的最短路径,并且能够处理动态障碍物的变化。迷宫地图中的状态包括起点、终点、障碍物以及空白状态,算法通过不断更新节点的代价值来实现路径的探索和更新。
import math import matplotlib.pyplot as plt from sys import maxsize # 导入最大数,2^63-1 class State(object): def __init__(self, x, y): self.x = x self.y = y self.parent = None self.state = "." self.t = "new" self.h = 0 self.k = 0 # k即为f(2)定义类State中的方法cost,用于计算当前状态与目标状态之间的代价。如果当前状态或目标状态为障碍物(#),则返回最大数值,表示距离无穷大;否则,返回当前状态与目标状态之间的欧氏距离。
def cost(self, state): if self.state == "#" or state.state == "#": return maxsize # 存在障碍物时,距离无穷大 return math.sqrt(math.pow((self.x - state.x), 2) + math.pow((self.y - state.y), 2))(3)定义类State中的方法set_state,用于设置当前状态的状态值。如果传入的状态值不是预定义的状态值(包括起点S、空白.、障碍物#、终点E、更新的路径*和标记+),则不做任何操作;否则,将当前状态的状态值设置为传入的状态值。
def set_state(self, state): if state not in ["S", ".", "#", "E", "*", "+"]: return self.state = state(4)下面这段代码定义了类Map,用于创建地图对象。在初始化方法__init__中,传入地图的行数和列数,并调用init_map方法初始化地图。
class Map(object): ''' 创建地图 ''' def __init__(self, row, col): self.row = row self.col = col self.map = self.init_map()(5)下面这段代码定义了方法init_map,用于初始化地图。方法init_map通过嵌套的循环遍历每个网格的行和列,然后创建一个State对象,将其添加到临时列表中,并将该临时列表添加到地图列表中。最后,返回完整的地图列表。
def init_map(self): # 初始化map map_list = [] for i in range(self.row): tmp = [] for j in range(self.col): tmp.append(State(i, j)) map_list.append(tmp) return map_list(6)下面这段代码定义了print_map方法,用于打印地图。方法print_map通过遍历地图的行和列,获取每个网格的状态,并将其打印出来,以空格分隔。这样就可以在控制台中打印出地图的可视化表示。
def print_map(self): for i in range(self.row): tmp = "" for j in range(self.col): tmp += self.map[i][j].state + " " print(tmp)(7)下面这段代码定义了方法plot_map,用于绘制地图的可视化表示。方法plot_map使用 matplotlib 库创建一个图形窗口,然后根据每个网格的状态绘制不同的标记:
- "#" 表示障碍物,用黑色方块表示。
- "S" 表示起点,用蓝色圆形表示。
- "E" 表示终点,用红色圆形表示。
- "+" 表示路径,用绿色三角形表示。
- "*" 表示更新的路径,用黄色三角形表示。
- "." 表示空白,用白色圆形表示。
绘制地图完成后,调用方法plt.show()显示绘制的可视化图形。
def plot_map(self): plt.figure(figsize=(8, 8)) for i in range(self.row): for j in range(self.col): if self.map[i][j].state == "#": plt.plot(j, i, 'sk', markersize=20) # 障碍物 elif self.map[i][j].state == "S": plt.plot(j, i, 'bo', markersize=10) # 起点 elif self.map[i][j].state == "E": plt.plot(j, i, 'ro', markersize=10) # 终点 elif self.map[i][j].state == "+": plt.plot(j, i, 'g^', markersize=10) # 路径 elif self.map[i][j].state == "*": plt.plot(j, i, 'y^', markersize=10) # 更新的路径 else: plt.plot(j, i, 'wo', markersize=5) # 空白 plt.grid(True) plt.title('Map') plt.show()(8)定义方法get_neighbers,用于获取给定状态的八个相邻状态(八邻域)。具体实现步骤如下:
- 初始化一个空列表 state_list 用于存储相邻状态。
- 使用双重循环遍历八个可能的偏移量:[-1, 0, 1]。
- 在循环中,跳过偏移量为 (0, 0) 的情况,因为它表示当前状态本身。
- 对于每个偏移量 (i, j),检查是否超出了地图边界,如果超出则跳过。
- 如果偏移量在合理范围内,将相邻状态添加到 state_list 中。
- 最后返回包含所有相邻状态的 state_list。
方法get_neighbers的目的是为了在地图中获取给定状态的所有八个相邻状态,以便在路径搜索或者规划算法中使用。
def get_neighbers(self, state): # 获取8邻域 state_list = [] for i in [-1, 0, 1]: for j in [-1, 0, 1]: if i == 0 and j == 0: continue if state.x + i < 0 or state.x + i >= self.row: continue if state.y + j < 0 or state.y + j >= self.col: continue state_list.append(self.map[state.x + i][state.y + j]) return state_list(9)定义方法set_obstacle,用于在地图上设置障碍物的位置。接受一个由 (x, y) 坐标对组成的列表 point_list,表示要设置障碍物的位置。对于 point_list 中的每个 (x, y) 坐标对,执行以下操作:
- 如果 x 小于 0 或者大于等于地图的行数 self.row,或者 y 小于 0 或者大于等于地图的列数 self.col,则说明该坐标超出了地图范围,跳过该坐标。
- 否则,将地图中 (x, y) 坐标对应的状态设置为 "#",表示障碍物。
方法set_obstacle的目的是根据输入的坐标列表,在地图上将指定位置标记为障碍物,以便在路径规划算法中避开这些障碍物。
def set_obstacle(self, point_list): # 设置障碍物的位置 for x, y in point_list: if x < 0 or x >= self.row or y < 0 or y >= self.col: continue self.map[x][y].set_state("#")(10)下面这段代码定义了类Dstar,用于执行D*算法。__init__ 方法初始化了D*算法的实例对象。参数maps传入了地图对象,表示该D*算法实例针对的地图。open_list 是一个集合,用于存储待处理的状态节点。
class Dstar(object): def __init__(self, maps): self.map = maps self.open_list = set() # 创建空集合(11)方法process_state()实现了D*算法核心功能,包括如下所示的三个关键步骤:
- 选择当前状态节点x:从open_list列表中选择具有最小k值(即f值)的节点作为当前处理节点x。
- 更新节点状态:根据当前状态x和其邻居节点的关系,更新节点的父节点、代价h值等信息。
- 调整open_list:根据节点的代价h值和状态信息,调整节点在open_list中的位置,确保open_list中节点的顺序符合算法要求。
通过以上步骤,方法process_state()实现了D*算法的核心逻辑,用于在地图上寻找最优路径,并在需要时对路径进行动态调整以应对障碍物的变化。
def process_state(self): ''' D*算法的主要过程 :return: ''' x = self.min_state() # 获取open_list列表中最小k的节点 if x is None: return -1 k_old = self.get_kmin() # 获取open_list列表中最小k节点的k值 self.remove(x) # 从open_list中移除 # 判断open_list中 if k_old < x.h: for y in self.map.get_neighbers(x): if y.h <= k_old and x.h > y.h + x.cost(y): x.parent = y x.h = y.h + x.cost(y) elif k_old == x.h: for y in self.map.get_neighbers(x): if y.t == "new" or (y.parent == x and y.h != x.h + x.cost(y)) \ or (y.parent != x and y.h > x.h + x.cost(y)): y.parent = x self.insert(y, x.h + x.cost(y)) else: for y in self.map.get_neighbers(x): if y.t == "new" or y.parent == x and y.h != x.h + x.cost(y): y.parent = x self.insert(y, x.h + x.cost(y)) else: if y.parent != x and y.h > x.h + x.cost(y): self.insert(x, x.h) else: if y.parent != x and x.h > y.h + x.cost(y) \ and y.t == "close" and y.h > k_old: self.insert(y, y.h) return self.get_kmin()(12)下面代码定义了D*算法中用于获取open_list中代价k(即f值)最小的状态节点的方法 min_state(),该方法通过 min() 函数和 lambda 表达式,在open_list中选择具有最小k值的节点,并返回该节点作为当前处理节点。如果open_list为空,则返回 None。
def min_state(self): if not self.open_list: return None min_state = min(self.open_list, key=lambda x: x.k) # 获取openlist中k值最小对应的节点 return min_state(13)定义方法 get_kmin(),用于获取open list中状态节点的最小k值(即f值)。首先,检查open list是否为空,如果为空,则返回 -1 表示未找到最小的k值。如果open_list 不为空,它使用列表解析 [x.k for x in self.open_list] 获取open_list 中所有节点的k值,并通过 min() 函数找到其中的最小值,然后将其返回。
def get_kmin(self): # 获取 open_list 中最小的 k 值(即最小的估价函数值) if not self.open_list: return -1 k_min = min([x.k for x in self.open_list]) return k_min(14)定义方法 insert,用于向open_list 中插入状态节点,并根据节点的状态进行相应的操作。方法的具体功能如下所示:
- 如果状态为 "new",则将节点的k值设置为新的h值。
- 如果状态为 "open",则将节点的k值更新为当前k值和新的h值中的较小值。
- 如果状态为 "close",则将节点的k值更新为当前h值和新的h值中的较小值。
- 将节点的h值更新为新的h值。
- 将节点的状态设置为 "open"。
- 将节点添加到open list中。
def insert(self, state, h_new): if state.t == "new": state.k = h_new elif state.t == "open": state.k = min(state.k, h_new) elif state.t == "close": state.k = min(state.h, h_new) state.h = h_new state.t = "open" self.open_list.add(state)(15)定义方法remove ,用于从open_list中移除状态节点。如果状态为 "open",则将节点的状态更新为 "close",然后从open_list中移除该节点。
def remove(self, state): if state.t == "open": state.t = "close" self.open_list.remove(state)(16)定义方法modify_cost,用于修改节点的代价。如果节点的状态为 "close",则通过其父节点递推整条路径上的代价,并将新的代价值插入到open_listt中。
def modify_cost(self, x): if x.t == "close": #如果节点已关闭,则通过其父节点递推更新整条路径上的 cost 值 self.insert(x, x.parent.h + x.cost(x.parent))(17)下面这段代码定义了方法run,用于执行D*算法的主要流程,并可视化地图搜索结果。方法 run的具体实现流程如下所示。
- 首先,将终点 end 插入到open list中,并将其代价设置为0。
- 然后,循环执行 process_state 方法,直到起点 start 的状态变为 "close"。
- 将起点的状态设置为 "S",并从终点开始向父节点回溯,将沿途节点的状态设置为 "+",直到回溯到起点。
- 将终点的状态设置为 "E"。
- 打印出障碍物未发生变化时的搜索路径,并通过调用 plot_map 方法可视化地图。
- 设置障碍物位置,并通过调用 set_obstacle 方法更新地图中的障碍物。
- 从起点开始,沿着更新后的路径行进,遇到障碍物时重新修改代价,并继续寻找路径,直到到达终点。
- 打印输出障碍物发生变化时的搜索路径,并再次通过 plot_map 方法绘制可视化地图。
def run(self, start, end): self.insert(end, 0) while True: self.process_state() if start.t == "close": break start.set_state("S") s = start while s != end: s = s.parent s.set_state("+") s.set_state("E") print('障碍物未发生变化时,搜索的路径如下:') self.map.print_map() self.map.plot_map() tmp = start # 起始点不变 self.map.set_obstacle( [(9, 3), (9, 4), (9, 5), (9, 6), (9, 7), (9, 8)]) # 障碍物发生变化 ''' 从起始点开始,往目标点行进,当遇到障碍物时,重新修改代价,再寻找路径 ''' while tmp != end: tmp.set_state("*") if tmp.parent.state == "#": self.modify(tmp) continue tmp = tmp.parent tmp.set_state("E") print('障碍物发生变化时,搜索的路径如下(*为更新的路径):') self.map.print_map() self.map.plot_map()(18)定义方法modify,用于在障碍物发生变化时更新路径的代价。首先,调用 modify_cost 方法来更新受影响节点及其后继节点的路径代价。然后,循环执行 process_state 方法,直到当前最小代价大于等于参数 state 的路径代价 state.h。
def modify(self, state): ''' 当障碍物发生变化时,从目标点往起始点回推,更新由于障碍物发生变化而引起的路径代价的变化 :param state: :return: ''' self.modify_cost(state) while True: k_min = self.process_state() if k_min >= state.h: break(19)下面代码首先创建了一个大小为 20x20 的地图,并在初始位置设置了障碍物。然后指定了起点和终点的位置,并使用 D* 算法寻找起点到终点的最优路径。最后,通过可视化展示了搜索过程中障碍物未发生变化和障碍物发生变化时的路径情况。
if __name__ == '__main__': m = Map(20, 20) m.set_obstacle([(4, 3), (4, 4), (4, 5), (4, 6), (5, 3), (6, 3), (7, 3)]) # 障碍物初始位置 start = m.map[1][2] # 起点位置 end = m.map[17][11] # 终点位置 dstar = Dstar(m) dstar.run(start, end)执行后会打印输出:
障碍物未发生变化时,搜索的路径如下障碍物发生变化时,搜索的路径如下(*为更新的路径以上输出结果显示了一个二维地图,在地图上标记了起点(S)、终点(E)、障碍物(#)、路径(+)以及更新的路径(*)。在障碍物未发生变化时,程序根据D算法找到了起点到终点的路径,并在地图上标记出来。障碍物发生变化后,程序重新计算了路径,并在地图上更新了路径标记。
并且执行后还会使用库Matplotlib实现地图的可视化,如图7-1所示。其中障碍物用黑色方块表示,起点用蓝色圆圈表示,终点用红色圆圈表示,搜索的路径用绿色三角形表示,更新的路径用黄色三角形表示。
障碍物未发生变化时的路径
障碍物发生变化时的路径
图7-1 路径可视化