岳阳市网站建设_网站建设公司_ASP.NET_seo优化
2026/1/20 2:34:37 网站建设 项目流程

零基础也能懂:用 Python 玩转 Modbus 通信,从请求到响应的完整旅程

你有没有遇到过这样的场景?
一台 PLC 在工厂角落默默运行,传感器数据不断产生,而你想把这些信息读出来——不是靠 HMI 触摸屏,而是自己写代码去“对话”。听起来很酷,但协议复杂、寄存器地址一堆十六进制数,让人望而却步?

别担心。今天我们就来拆解这个过程,用最简单的 Python 代码,带你一步步看懂 Modbus 是怎么工作的。核心工具就是pymodbus——一个纯 Python 实现的工业通信库。

重点不在于记住所有 API,而在于理解:主站是怎么发请求的?从站又是如何回应的?中间发生了什么?


为什么是 pymodbus?它解决了什么问题?

在工业自动化中,Modbus 就像设备间的“普通话”:简单、通用、历史悠久。无论是温湿度传感器、电表还是变频器,很多都支持 Modbus 协议。

传统的做法是用 C/C++ 调用底层驱动(比如 libmodbus),但这对初学者门槛太高。而pymodbus 的出现,让 Python 成为上位机开发的新选择

它的优势非常明显:
- ✅ 纯 Python 编写,无需编译,跨平台(Windows/Linux/树莓派都能跑)
- ✅ 支持 Modbus TCP 和 RTU(串口),覆盖主流通信方式
- ✅ 内置模拟服务器功能,没有硬件也能动手实验
- ✅ 提供高级封装的同时,也允许你深入报文细节

换句话说:你可以先学会“说话”,再研究“语法结构”


先跑通一个例子:看看“请求-响应”长什么样

我们不急着讲原理,先来个“Hello World”级别的实战。

第一步:安装依赖

pip install pymodbus

建议使用较新版本(如 3.6+),API 更统一,文档更清晰。


第二步:启动一个“假设备”——模拟 Modbus 从站

想象你要连接的是一台真实设备,但现在手头啥也没有。没关系,我们可以用代码造一个!

# server.py from pymodbus.server import StartTcpServer from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext from pymodbus.datastore import ModbusSequentialDataBlock def run_server(): # 创建保持寄存器区域,预设一些测试数据 store = ModbusSlaveContext( hr=ModbusSequentialDataBlock(0, [100, 101, 102, 103] * 25) # 地址0开始,共100个寄存器 ) context = ModbusServerContext(slaves=store, single=True) print("🚀 Modbus Server 启动中... 监听 localhost:5020") StartTcpServer(context=context, address=("localhost", 5020)) if __name__ == "__main__": run_server()

这段代码做了三件事:
1. 定义了一块内存空间(4x 寄存器区),起始地址为 0,内容是[100, 101, 102, 103, ...]
2. 把这块内存挂载到一个虚拟从站上
3. 启动 TCP 服务,监听本地 5020 端口

📌 注意:这里的hr=表示 Holding Register(保持寄存器),对应 Modbus 功能码 0x03 和 0x06。

运行它,你就拥有了一个随时待命的“设备”。


第三步:作为主站发起请求,读取数据

新开一个终端,运行下面的客户端代码:

# client.py from pymodbus.client import ModbusTcpClient def read_data(): client = ModbusTcpClient('localhost', port=5020) if not client.connect(): print("❌ 连接失败,请检查服务是否已启动") return try: # 发起请求:读取地址0开始的4个保持寄存器 result = client.read_holding_registers(address=0, count=4, slave=0) if result.isError(): print(f"⚠️ 通信异常:{result}") else: print(f"✅ 成功读取数据:{result.registers}") # 应输出 [100, 101, 102, 103] finally: client.close() if __name__ == "__main__": read_data()

执行流程如下:
1. 主站建立 TCP 连接到localhost:5020
2. 构造一条“读保持寄存器”的指令,目标地址 0,数量 4
3. 从站收到后,在自己的数据区查找这四个值
4. 返回结果,主站解析并打印

一次完整的请求 → 响应流程就此完成。

💡 小技巧:如果你看到返回的是[0, 0, 0, 0],可能是服务端还没启动;如果连接超时,检查防火墙或端口占用情况。


深入一点:这条请求背后到底发生了什么?

你以为只是调了个函数?其实背后有一整套协议在运转。

Modbus TCP 报文结构揭秘

Modbus TCP 并不只是发送几个数字那么简单。每条消息都包含两个部分:MBAP 头 + PDU 体

字段长度含义
Transaction ID2 字节事务编号,用于匹配请求和响应
Protocol ID2 字节固定为 0(表示 Modbus)
Length2 字节后续字节数
Unit ID1 字节从站地址(以前叫 Slave Address)
Function Code1 字节功能码(如 0x03 表示读保持寄存器)
DataN 字节参数或实际数据

举个例子,当你调用:

client.read_holding_registers(address=0, count=4)

pymodbus 实际上会打包成这样一组字节(十六进制):

00 01 00 00 00 06 01 03 00 00 00 04 ↑↑ ↑↑ ↑↑ ↑ ↑↑ ↑↑ ↑↑ ↑↑ TID PID Len UID FC Addr Count

当服务器收到后,也会原样回传一个响应包:

00 01 00 00 00 07 01 03 08 64 00 65 00 66 00 67 00 ←----------------→ 4个寄存器数据(每个占2字节)

其中64 00是 100 的大端格式(Big Endian),这也是 Modbus 默认的字节序。

⚠️ 坑点提醒:如果你发现读出来的数值不对,比如25600而不是100,那很可能是因为字节序没处理好!有些设备用小端模式,需要手动转换。


异常处理:不是每次通信都能成功

理想情况下,请求发出就能拿到数据。但现实中,各种问题层出不穷。

pymodbus 已经帮你把常见错误封装好了。只要判断.isError()就行:

result = client.read_holding_registers(0, 4) if result.isError(): print(f"出错了:{result}")

常见的异常类型包括:

异常码含义可能原因
0x01非法功能码设备不支持该操作(例如尝试写只读寄存器)
0x02非法数据地址访问了超出范围的地址(比如只有 10 个寄存器却读第 20 个)
0x03非法数据值写入的数据超出允许范围
0x04从站故障设备内部错误(如硬件异常)

这些错误都会导致返回的功能码最高位置 1。例如正常0x03,异常就变成0x83,后面紧跟一个异常码。

所以当你看到类似<Exception Response(131, 2)>的输出时,就知道这是个功能码 0x83(即 131),异常码 2——说明地址非法。


如何避免踩坑?这些经验值得收藏

我在调试过程中总结了几条实用建议,分享给你:

✅ 1. 给连接加超时,别让程序卡死

client = ModbusTcpClient('localhost', port=5020, timeout=3)

默认可能阻塞很久,设置 3 秒足够了。

✅ 2. 用with管理连接,防止资源泄漏

with ModbusTcpClient('localhost', 5020) as client: result = client.read_holding_registers(0, 4) if not result.isError(): print(result.registers)

自动关闭连接,更安全。

✅ 3. 批量读取,减少网络开销

不要一个个寄存器去读,尽量一次性读多个:

# ❌ 不推荐 for i in range(10): read_one_register(i) # ✅ 推荐 read_holding_registers(0, 10) # 一次搞定

✅ 4. 明确四种数据区的区别

区域功能码读写性常见用途
0x 线圈(Coils)0x01 / 0x05可读写开关量控制(启停电机)
1x 离散输入0x02只读数字输入信号(限位开关)
3x 输入寄存器0x04只读模拟量采集(温度、电压)
4x 保持寄存器0x03 / 0x06可读写参数配置、状态存储

搞不清容易写错地址,一定要查设备手册确认映射关系!

✅ 5. 开启日志,看清每一帧通信

调试时加上 logging,能看到完整的收发报文:

import logging logging.basicConfig(level=logging.DEBUG) # 输出示例: # DEBUG:pymodbus.transaction:SEND: 0x0 0x1 0x0 ... # DEBUG:pymodbus.transaction:RECV: 0x0 0x1 0x0 ...

这对分析通信失败特别有帮助。


进阶思路:做一个定时轮询器,像 SCADA 一样工作

现在你知道怎么读一次数据了,那能不能让它自动定时读?

当然可以!下面是一个简易轮询器:

import time from pymodbus.client import ModbusTcpClient class ModbusPoller: def __init__(self, host, port=5020): self.client = ModbusTcpClient(host, port, timeout=3) def connect(self): return self.client.connect() def poll(self, addr, count): result = self.client.read_holding_registers(addr, count) return None if result.isError() else result.registers def run(self, interval=2): if not self.connect(): print("无法连接,请检查服务状态") return print("开始轮询...") while True: data = self.poll(0, 4) if data: timestamp = time.strftime("%H:%M:%S") print(f"[{timestamp}] 当前数据: {data}") else: print("⚠️ 读取失败,继续下一轮") time.sleep(interval) # 使用 if __name__ == "__main__": poller = ModbusPoller("localhost") poller.run()

运行效果:

[14:23:01] 当前数据: [100, 101, 102, 103] [14:23:03] 当前数据: [100, 101, 102, 103] ...

这就是一个极简版的监控系统雏形。下一步你可以把它接入数据库、绘图界面,甚至做成 Web API。


总结一下:你真正学会了什么?

通过这篇文章,你不只是学会了怎么安装pymodbus或调用read_holding_registers(),更重要的是:

🧠你理解了一个典型的工业通信流程是如何运作的:

  1. 主站构造请求(含功能码、地址、数量)
  2. 协议栈自动封装成标准 Modbus TCP 报文
  3. 通过网络发送给从站
  4. 从站解析请求,在本地数据区查找对应值
  5. 生成响应报文并返回
  6. 主站解析数据或处理异常

这套请求-响应模型,不仅是 Modbus 的核心,也是大多数工业协议的基础(如 OPC UA、MQTT 等)。

🎯 对于零基础学习者来说,“先让它跑起来”比“完全弄明白”更重要。当你亲手让一段数据从“虚拟设备”中被读出来时,那种成就感会驱使你继续探索:能不能写寄存器?能不能支持多从站?能不能异步并发?

这些问题的答案,都在pymodbus的官方文档里等着你。


下一步你可以尝试……

  • 🔁 试着修改服务器端的数据,每隔几秒自动更新寄存器值,模拟动态传感器
  • ✍️ 添加写寄存器功能(write_register/write_registers),实现远程控制
  • 🌐 把 server 部署到树莓派上,用手机或笔记本远程读取
  • 🧩 结合 Flask 做个网页仪表盘,实时显示数据
  • ⚡ 改用asyncio版本实现高并发采集(from pymodbus.async_io import AsyncModbusTcpClient

技术的成长从来不是一蹴而就。每一个复杂的系统,都是由无数个“第一次成功通信”堆叠而成的

你现在迈出的这一步,也许就是未来构建智能工厂、边缘网关、IoT 平台的第一块基石。

如果你在实践中遇到了其他问题,欢迎留言讨论,我们一起解决。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询