https://gitee.com/zhu-cancan/wit_imu_ros2置ROS2环境与多IMU数据发布使用小鱼工具快速配置ROS2环境wget http://fishros.com/install -O fishros . fishros创建工作空间与功能包部署在home目录下创建并解压功能包mkdir -p wit_ros2_imu_ws/src unzip wit_ros2_imu_src.zip -d wit_ros2_imu_ws/src/ cd wit_ros2_imu_ws进入成功后如下图所示修改设备参数编辑Python脚本以匹配硬件配置nano ./src/wit_ros2_imu/wit_ros2_imu/wit_ros2_imu.py更改成自己设备的类型与波特率1 、将self.declare parameter(protocol RS485_HIGH这句的红框处更改为RS485_STD。2、将self.declare parameter(baudrate 230400这句的红框处根据传感器的实际波特率修改一般设备出厂默认都是9600的。3.将self.declare parameter(modbusid 0x50改成设备对应的设备id更改完毕后按ctrlx然后按y后再按回车便可保存硬件连接与USB绑定使用485转USB模块连接IMUA-A, B-B, VCC-VCC, GND-GND。获取设备信息并绑定输入udevadm info --queryall --name/dev/ttyUSB0 | grep -E ID_VENDOR_ID|ID_MODEL_ID输入完毕后按下回车记住这两个值接着输入nano src/wit_ros2_imu/script/imu_usb.rules输入完毕后按下回车进入如图界面将红框框起的字符修改成上一步骤记录的字符修改完毕后按ctrlx再按下Y再按下回车键若按下ctrlx后窗口直接关闭了的话不需要理会在终端中输入sudo bash src/wit_ros2_imu/script/bind_usb.sh出现Finish即为写入绑定规则成功重新拔插usb在终端中输入ls -l /dev/imu_usb出现下图字段即为绑定成功运行功能包绑定完毕USB设备后输入ros2 run wit_ros2_imu wit_ros2_imu输入完毕后按下回车即可驱动imu模块返回数值运行成功示意图。若是提示[imu_driver_node]: Serial port opening failure请按下ctrlc输入ls -l /dev/imu_usb查看是否存在/dev/imu_usb这个设备若出现ls: 无法访问 /dev/imu_usb: 没有那个文件或目录请重新拔插USB直到出现/dev/imu_usb这个设备多IMU数据发布配置修改代码为import math import serial import struct import numpy as np import rclpy from enum import Enum from rclpy.node import Node from sensor_msgs.msg import Imu from imu_msg.msg import ImuData class protocolType(Enum): TTL_STD 1 TTL_HIGH 2 CAN_STD 3 CAN_HIGH 4 RS485_STD 5 RS485_HIGH 6 class imuDriverNode(Node): def __init__(self): super().__init__(imuDriverNode) # 参数声明 self.declare_parameter(port, /dev/imu_usb) self.declare_parameter(baudrate, 115200) self.declare_parameter(protocol, RS485_STD) self.declare_parameter(modbusIDs, [0x51, 0x52, 0x53]) self.port self.get_parameter(port).value self.baudrate self.get_parameter(baudrate).value self.modbusIDs self.get_parameter(modbusIDs).value protocolStr self.get_parameter(protocol).value self.protocol protocolType[protocolStr] # 串口初始化 self.serialPort serial.Serial( self.port, self.baudrate, timeout0.001, write_timeout0 ) self.get_logger().info(fSerial opened: {self.port} {self.baudrate}) # 多IMU轮询相关变量 self.modbusAddrList [0x34, 0x37, 0x3A, 0x3D] self.modbusIndex 0 self.imuIndex 0 self.waitingResponse False self.lastSendTime 0 self.currentRequestAddr None # 接收缓存 self.rxBuffer bytearray() # 为每个IMU创建独立的数据存储和发布器 self.imuData {} self.imuPublishers {} for mid in self.modbusIDs: self.imuData[mid] { acc: np.zeros(3), gyro: np.zeros(3), angle: np.zeros(3), mag: np.zeros(3), } suffix fimu_{mid:02x} self.imuPublishers[mid] { imu: self.create_publisher(Imu, fimu/data/{suffix}, 10), rpy: self.create_publisher(ImuData, fimu/rpy/{suffix}, 10), } # 定时器放在最后创建避免上下文错误 self.create_timer(0.001, self.timerCallback) self.create_timer(0.1, self.printMsg) def timerCallback(self): if self.protocol in (protocolType.RS485_STD, protocolType.RS485_HIGH): now self.get_clock().now().nanoseconds if not self.waitingResponse: current_id self.modbusIDs[self.imuIndex] addr self.modbusAddrList[self.modbusIndex] self.currentRequestAddr addr if self.protocol protocolType.RS485_HIGH and addr 0x3D: cmd self.buildModbusReadCmd(current_id, addr, 6) else: cmd self.buildModbusReadCmd(current_id, addr, 3) self.serialPort.write(cmd) self.waitingResponse True self.lastSendTime now elif now - self.lastSendTime 50000000: self.waitingResponse False self.currentRequestAddr None self.modbusIndex (self.modbusIndex 1) % len(self.modbusAddrList) if self.modbusIndex 0: self.imuIndex (self.imuIndex 1) % len(self.modbusIDs) self.readSerial() self.parseBuffer() def readSerial(self): cnt self.serialPort.in_waiting if cnt: self.rxBuffer.extend(self.serialPort.read(cnt)) def parseBuffer(self): if self.protocol not in (protocolType.RS485_STD, protocolType.RS485_HIGH): return while len(self.rxBuffer) 5: dev_id self.rxBuffer[0] if dev_id not in self.modbusIDs: self.rxBuffer.pop(0) continue if self.rxBuffer[1] ! 0x03: self.rxBuffer.pop(0) continue byteCnt self.rxBuffer[2] frameLen 3 byteCnt 2 if len(self.rxBuffer) frameLen: return frame self.rxBuffer[:frameLen] recv_crc (frame[-2] 8) | frame[-1] calc_crc self.modbusCRC(frame[:-2]) calc_crc ((calc_crc 0xFF) 8) | (calc_crc 8) if recv_crc ! calc_crc: self.rxBuffer.pop(0) continue del self.rxBuffer[:frameLen] self.handleModbusFrame(frame, dev_id) def handleModbusFrame(self, frame, dev_id): byteCnt frame[2] data frame[3:3byteCnt] addr self.currentRequestAddr imu self.imuData[dev_id] if self.protocol protocolType.RS485_STD: if len(data) 6: v struct.unpack(hhh, data) if addr 0x34: imu[acc] np.array(v) if addr 0x37: imu[gyro] np.array(v) if addr 0x3A: imu[mag] np.array(v) if addr 0x3D: imu[angle] np.array(v) else: if addr 0x3D and len(data) 12: def p32(o): raw (data[o] 24) | (data[o1] 16) | (data[o2] 8) | data[o3] return raw if raw 0x80000000 else raw - 0x100000000 imu[angle] np.array([p32(0), p32(4), p32(8)]) elif len(data) 6: v struct.unpack(hhh, data) if addr 0x34: imu[acc] np.array(v) if addr 0x37: imu[gyro] np.array(v) if addr 0x3A: imu[mag] np.array(v) self.publishImu(dev_id) self.waitingResponse False self.modbusIndex (self.modbusIndex 1) % len(self.modbusAddrList) if self.modbusIndex 0: self.imuIndex (self.imuIndex 1) % len(self.modbusIDs) def publishImu(self, dev_id): imu self.imuData[dev_id] acc imu[acc] * 16 / 32768.0 gyro np.radians(imu[gyro] * 2000 / 32768.0) mag imu[mag] if self.protocol protocolType.RS485_HIGH: angle np.radians(imu[angle] / 1000.0) else: angle np.radians(imu[angle] * 180.0 / 32768.0) q self.eulerToQuaternion(angle[0], angle[1], angle[2]) msg Imu() msg.header.stamp self.get_clock().now().to_msg() msg.header.frame_id fimu_{dev_id:02x}_link msg.linear_acceleration.x, msg.linear_acceleration.y, msg.linear_acceleration.z acc.tolist() msg.angular_velocity.x, msg.angular_velocity.y, msg.angular_velocity.z gyro.tolist() msg.orientation.x, msg.orientation.y, msg.orientation.z, msg.orientation.w q rpy ImuData() rpy.header msg.header rpy.imu msg rpy.roll, rpy.pitch, rpy.yaw map(math.degrees, angle) self.imuPublishers[dev_id][imu].publish(msg) self.imuPublishers[dev_id][rpy].publish(rpy) def buildModbusReadCmd(self, mid, addr, length): f bytearray([mid, 0x03, (addr 8) 0xFF, addr 0xFF, (length 8) 0xFF, length 0xFF]) crc self.modbusCRC(f) f.extend([crc 0xFF, (crc 8) 0xFF]) return f staticmethod def modbusCRC(data): crc 0xFFFF for b in data: crc ^ b for _ in range(8): crc (crc 1) ^ 0xA001 if crc 1 else crc 1 return crc staticmethod def eulerToQuaternion(r, p, y): cr, cp, cy math.cos(r/2), math.cos(p/2), math.cos(y/2) sr, sp, sy math.sin(r/2), math.sin(p/2), math.sin(y/2) return [ sr*cp*cy - cr*sp*sy, cr*sp*cy sr*cp*sy, cr*cp*sy - sr*sp*cy, cr*cp*cy sr*sp*sy ] def printMsg(self): print(\033[H\033[J, end) for mid in self.modbusIDs: imu self.imuData[mid] acc imu[acc] * 16 / 32768.0 gyro imu[gyro] * 2000 / 32768.0 angle imu[angle] * (180.0 / 32768.0 if self.protocol protocolType.RS485_STD else 0.1) print(f IMU {hex(mid)} ) print(fAccel: x{acc[0]:8.3f} | y{acc[1]:8.3f} | z{acc[2]:8.3f} m/s²) print(fGyro: x{gyro[0]:8.3f} | y{gyro[1]:8.3f} | z{gyro[2]:8.3f} °/s) print(fAngle: x{angle[0]:8.3f} | y{angle[1]:8.3f} | z{angle[2]:8.3f} °) print(fMag: x{imu[mag][0]:8.0f} | y{imu[mag][1]:8.0f} | z{imu[mag][2]:8.0f}\n) def shutdown(self): if self.serialPort and self.serialPort.is_open: self.serialPort.close() self.get_logger().info(Serial port closed) def main(): rclpy.init() node imuDriverNode() try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.shutdown() rclpy.shutdown() if __name__ __main__: main()主要修改点修改驱动节点以支持多设备self.declare_parameter(modbusIDs, [0x51, 0x52, 0x53]) # 三个IMU的ID for mid in self.modbusIDs: suffix fimu_{mid:02x} self.create_publisher(Imu, fimu/data/{suffix}, 10) # 独立Topic轮询机制处理多IMU数据def timerCallback(self): current_id self.modbusIDs[self.imuIndex] addr self.modbusAddrList[self.modbusIndex] cmd self.buildModbusReadCmd(current_id, addr, 3) self.serialPort.write(cmd) self.imuIndex (self.imuIndex 1) % len(self.modbusIDs)启动驱动节点并检查Topicros2 run wit_ros2_imu wit_ros2_imu ros2 topic list # 应显示三个IMU的Topic运行界面