用Python打通工业现场:手把手教你用pymodbus实时读取PLC线圈状态
在一条自动化产线上,设备是否运行、气缸有没有动作、报警灯亮没亮——这些看似简单的“是/否”问题,背后都依赖着一个关键环节:上位机如何准确获取PLC的实时状态?
传统方式往往依赖HMI面板或商业组态软件,但它们不够灵活、成本高、难以集成。而今天,我们换一种思路:用几行Python代码,直接与PLC对话。
本文将带你从零开始,使用纯Python实现的pymodbus库,通过Modbus TCP协议连接真实PLC(或模拟器),读取数字输出点(即“线圈”)的状态。整个过程无需专用开发环境,不依赖昂贵软件,适合树莓派、边缘网关甚至笔记本电脑部署。
你将获得一个可复用的数据采集模块,为后续构建监控系统、报警推送、数据记录打下坚实基础。
为什么选择 pymodbus?
在工业通信领域,Modbus 是当之无愧的“老前辈”。它简单、开放、兼容性极强,几乎每台现代PLC都原生支持。而pymodbus正是让Python成为Modbus主站(Master)的那把钥匙。
它不是某个大厂闭源工具的附属品,而是一个由社区驱动的开源项目(MIT许可),完全用Python编写,安装只需一行命令:
pip install pymodbus更棒的是,它同时支持Modbus TCP(以太网)和Modbus RTU(串口),无论是西门子、三菱、欧姆龙还是国产PLC,只要启用了Modbus服务,就能连。
它到底能做什么?
- ✅ 作为客户端主动读写PLC数据
- ✅ 模拟PLC响应请求(用于测试)
- ✅ 支持功能码0x01~0x17,覆盖所有常见操作
- ✅ 内置日志系统,可以打印原始报文,调试超方便
- ✅ 提供同步和异步(asyncio)版本,适应不同场景
换句话说,你想知道PLC里某个继电器有没有吸合?想远程查看指示灯状态?甚至想做个轻量级SCADA前端?pymodbus都能帮你搞定。
线圈是什么?为什么先读它?
在线圈(Coil)是Modbus中最基础的数据类型之一,对应PLC的数字输出点(DO),比如控制电机启停的继电器、点亮警示灯的开关信号等。
每个线圈就是一个布尔值:
-True→ ON / 置位(Set)
-False→ OFF / 复位(Reset)
它的地址空间独立于其他变量,通常标记为Q0.0,Q0.1… 在Modbus中统一编号为0x0000 起始的连续地址区。
我们要做的第一件事,就是向PLC发送一个“读线圈”指令(功能码0x01),告诉它:“请把从地址X开始的N个线圈状态发给我”。
这个操作之所以适合作为入门实践,是因为:
- 数据结构最简单(只有true/false)
- 不涉及写操作,安全性高
- 报文短小,失败也容易排查
- 几乎所有PLC默认开启该功能
协议细节不用怕,我来拆给你看
很多人被“协议”两个字吓退,其实 Modbus TCP 的通信格式非常清晰。我们来看一次典型的“读8个线圈”的请求和响应。
请求报文长什么样?
当你的Python程序调用read_coils(0, 8)时,pymodbus会自动生成如下字节流:
| 字段 | 值 | 说明 |
|---|---|---|
| Transaction ID | 0x0001 | 事务ID,用于匹配请求与响应 |
| Protocol ID | 0x0000 | 固定为0,表示Modbus协议 |
| Length | 0x0006 | 后续数据长度(6字节) |
| Unit ID | 0x01 | 目标PLC的从站地址(Slave ID) |
| Function Code | 0x01 | 功能码:读线圈 |
| Start Address | 0x0000 | 起始地址(第0个线圈) |
| Quantity | 0x0008 | 读取数量(8个) |
总共12字节,通过Socket直接发往PLC的502端口。
PLC怎么回应?
如果一切正常,PLC返回如下响应:
| 字段 | 值 | 说明 |
|---|---|---|
| Transaction ID | 0x0001 | 保持一致 |
| Protocol ID | 0x0000 | 同上 |
| Length | 0x0003 | 后续3字节 |
| Unit ID | 0x01 | 从站地址 |
| Function Code | 0x01 | 功能码回显 |
| Byte Count | 0x01 | 表示接下来有1个字节的数据 |
| Coil Status Data | 0xAA | 实际线圈状态(10101010b) |
这里的0xAA就是重点!转换成二进制是10101010,每一位代表一个线圈状态:
bit7 bit6 bit5 bit4 bit3 bit2 bit1 bit0 1 0 1 0 1 0 1 0所以第0、2、4、6位为ON,其余为OFF。
⚠️ 注意:pymodbus 在接收到这个字节后会自动解析成一个布尔列表
response.bits,开发者无需手动位运算。
手把手写代码:连接PLC并读取线圈
下面这段代码,就是你通往工业通信世界的第一扇门。
from pymodbus.client import ModbusTcpClient import logging # 启用详细日志输出(调试时很有用) logging.basicConfig() log = logging.getLogger() log.setLevel(logging.INFO) # 可改为DEBUG查看完整报文 # ========== 配置参数 ========== PLC_IP = "192.168.1.100" # 替换为你的PLC实际IP PORT = 502 # Modbus标准端口 UNIT_ID = 1 # PLC的从站地址(Slave ID) COIL_START_ADDR = 0 # 起始线圈地址(如Q0.0) COIL_COUNT = 8 # 读取数量(最多2000个) def read_coils(): # 创建TCP客户端 client = ModbusTcpClient(host=PLC_IP, port=PORT, timeout=2) try: # 尝试建立连接 if not client.connect(): print("❌ 无法连接到PLC,请检查网络、IP或防火墙设置") return print(f"✅ 成功连接到PLC {PLC_IP}:{PORT}") # 发起读线圈请求(功能码0x01) response = client.read_coils( address=COIL_START_ADDR, count=COIL_COUNT, slave=UNIT_ID ) # 判断是否有错误 if response.is_error(): print(f"⚠️ Modbus错误响应: {response}") return # 提取线圈状态(返回的是位列表) coil_states = response.bits[:COIL_COUNT] # 截取所需长度 print("🔍 当前8个线圈状态:") for i, state in enumerate(coil_states): addr = COIL_START_ADDR + i print(f" Q{addr // 8}.{addr % 8} (地址{addr}): {'ON' if state else 'OFF'}") except Exception as e: print(f"🚨 程序异常: {e}") finally: # 无论成功与否,关闭连接 client.close() print("🔌 连接已关闭") if __name__ == "__main__": read_coils()关键点解读:
ModbusTcpClient:这是pymodbus提供的同步客户端类,适合脚本化任务。timeout=2:设置2秒超时,避免因网络问题导致程序卡死。slave=UNIT_ID:非常重要!很多初学者忘记设置这个参数,结果收不到响应。response.bits:核心数据字段,返回一个bool类型的列表,索引0对应起始地址。- 异常处理与资源释放:确保即使出错也能安全断开连接。
实战常见坑点与避坑指南
别急着跑代码,先看看别人踩过的坑,你能省下至少半天时间。
❌ 坑1:地址对不上,明明Q0.0亮了却读不出来
原因:有些PLC厂商(尤其是日系品牌)在文档中标注“地址从1开始”,但pymodbus默认从0开始。
例如:
- PLC手册说“Q0.0 对应 Modbus地址 1”
- 你在代码中就必须写address=0(因为是从0计数)
👉解决方法:查阅PLC手册确认偏移规则,必要时做-1处理。
❌ 坑2:连接失败,提示“Connection refused”
可能原因:
- PLC未启用Modbus TCP服务
- IP地址填错或不在同一子网
- 防火墙阻止了502端口
- 使用了非标准端口(某些PLC可改)
👉排查步骤:
1. 用ping测试PLC是否可达;
2. 用telnet 192.168.1.100 502测试端口是否开放;
3. 登录PLC配置界面,确认Modbus已启用且监听正确IP。
❌ 坑3:偶尔读到乱码或超时
原因:轮询频率太高,PLC来不及响应。
👉建议:两次读取之间加延时,特别是多任务轮询时:
import time time.sleep(0.3) # 至少200ms以上✅ 秘籍:开启调试日志,看清每一帧报文
当你怀疑通信有问题时,把日志级别调到DEBUG:
log.setLevel(logging.DEBUG)你会看到类似这样的输出:
DEBUG:pymodbus.transaction:Current transaction state - IDLE DEBUG:pymodbus.transaction:Running transaction 1 DEBUG:pymodbus.client.sync:New Transaction state 'SENDING' DEBUG:pymodbus.transport:send: 0x1 0x0 0x0 0x0 0x0 0x6 0x1 0x1 0x0 0x0 0x0 0x8最后那一串十六进制就是原始请求报文,可以直接拿去和Wireshark抓包对比,定位问题快准狠。
更进一步:不只是“读一下”
你现在拥有的不仅是一段脚本,而是一个可扩展的通信引擎。以下是几个实用升级方向:
🔁 自动轮询 + 状态变化检测
import time last_state = None while True: response = client.read_coils(0, 8) if not response.is_error(): current = response.bits[:8] if current != last_state: print("💡 状态发生变化:", current) last_state = current time.sleep(0.5)可用于触发报警、记录事件时间戳。
💾 存入数据库或发送到Web API
import json import requests data = { "timestamp": time.time(), "coil_states": [bool(x) for x in coil_states] } requests.post("http://your-server/api/coils", json=data)轻松接入自己的监控平台。
🔄 多PLC轮询(工厂级应用)
plcs = [ {"ip": "192.168.1.100", "unit_id": 1}, {"ip": "192.168.1.101", "unit_id": 2}, ] for plc in plcs: client = ModbusTcpClient(plc["ip"]) if client.connect(): res = client.read_coils(0, 8, slave=plc["unit_id"]) # 处理结果... client.close()适用于多台设备集中监控。
总结:这不仅仅是个读状态的小脚本
你刚刚完成的,其实是打通IT与OT层的关键一步。
过去,工业控制属于“黑箱”,数据锁在PLC里,只能靠工程师现场查;而现在,借助像pymodbus这样的工具,你可以用最通用的语言(Python)去触达最底层的设备信号。
这段代码虽短,但它意味着:
- 你可以构建专属的轻量监控系统;
- 可以把PLC数据接入MySQL、InfluxDB、Grafana;
- 可以为老旧设备添加远程诊断能力;
- 甚至可以用树莓派+Python做一个低成本边缘网关。
更重要的是,你掌握了理解工业通信的本质逻辑——请求、响应、解析、处理。下一步无论是写寄存器、读模拟量、还是转用异步模式提升性能,都不再是难题。
如果你正在做智能制造、设备联网、工业物联网相关项目,不妨试试把这个小模块嵌入进去。也许下一次汇报时,你就可以自豪地说:
“我们的系统已经实现了对产线PLC的实时感知。”
而这一切,始于一段不到50行的Python代码。
💬欢迎在评论区分享你的应用场景或遇到的问题,我们一起讨论如何让工业通信变得更简单。