最近需要需要将测试结果,使用python写入PLC,尝试后代码记录如下:
1.python连接PLC与写入数据
import os import tracebackimport snap7 from snap7.util import * import time print(snap7.__file__) # 看看实际加载的是哪个文件 print(snap7.__version__) # 能输出版本号即安装成功 print("DLL 路径:", os.path.join(os.path.dirname(snap7.__file__), 'snap7.dll'))import snap7 try: # 0.11+AREA_DB = snap7.Area.DB except AttributeError: # 0.10 及更早AREA_DB = snap7.types.S7AreaDBclass PLCWriter:def __init__(self, plc_ip, rack=0, slot=1):"""初始化PLC连接器Args:plc_ip: PLC的IP地址(如:'192.168.0.1')rack: 机架号(通常为0)slot: 槽位号(西门子300系列通常为1,1500系列通常为2)"""self.plc_ip = plc_ipself.rack = rackself.slot = slotself.plc = Noneself.connected = Falsedef read_real(self, db_number, offset):"""从PLC读取Real(浮点数)值"""if not self.connected:print("未连接到PLC")return Nonetry:# 读取4字节数据data = self.plc.read_area(AREA_DB, db_number, offset, 4)# 解析为浮点数value = get_real(data, 0)return float(value)except Exception as e:print(f"读取浮点数失败: {e}")traceback.print_exc()return Nonedef read_int16(self, db_number, offset):"""从PLC读取16位整数"""if not self.connected:print("未连接到PLC")return Nonetry:# 读取2字节数据data = self.plc.read_area(AREA_DB, db_number, offset, 2)# 解析为整数value = get_int(data, 0)return int(value)except Exception as e:print("98: read 16 int failed",e)traceback.print_exc()# 新增:专门读取DB48中PFLong的值def read_PFLong_from_DB48(plc_writer, pflong_offset=8):"""读取DB48中PFLong的值参数说明:- pflong_offset: PFLong在DB48中的偏移量(字节)根据你的描述:第三行NAME,int,0,第四行PFLong,Real,0.0NAME是int类型占2字节,所以PFLong的偏移量应该是2(从0开始)但如果前面还有static或其他变量,需要调整"""# DB48号块,根据实际情况调整偏移量db_number = 48# 读取PFLong值(Real类型,占4字节)value = plc_writer.read_real(db_number, pflong_offset)if value is not None:print(f"[读取DB48] PFLong值: {value}")return valueelse:print("[读取DB48] 读取失败")return Nonedef connect(self):try:self.plc = snap7.client.Client()self.plc.set_param(snap7.types.PingTimeout, 1000)self.plc.connect(self.plc_ip, self.rack, self.slot)self.connected = Trueprint('[PLC] 已连接')return Trueexcept Exception as e:print(f'[PLC] 连接失败: {e}')return Falsedef write_int16(self, db_number, offset, value):"""写入16位整数到PLC"""if not self.connected:print("未连接到PLC")return Falsetry:# 创建2字节数据data = bytearray(2)set_int(data, 0, value)# 写入到PLC self.plc.write_area(AREA_DB, db_number, offset, data)# 可选:读取验证# read_data = self.plc.read_area(snap7.types.Areas.DB, db_number, offset, 2)# verify_value = get_int(read_data, 0)# print(f"验证写入值: {verify_value}")return Trueexcept Exception as e:print(f"写入失败: {e}")return Falsedef write_float(self, db_number, offset, value):"""写入浮点数到PLC"""if not self.connected:print("未连接到PLC")return Falsetry:# 创建4字节数据data = bytearray(4)set_real(data, 0, value)# 写入到PLCtry:self.plc.write_area(AREA_DB, db_number, offset, data)except Exception as e:traceback.print_exc()return Trueexcept Exception as e:print(f"写入浮点数失败: {e}")return Falsedef write_bool(self, db_number, byte_offset, bit_offset, value):"""写入布尔值到PLC"""if not self.connected:print("未连接到PLC")return Falsetry:# 读取当前字节current_data = self.plc.read_area(AREA_DB, db_number, byte_offset, 1)# 修改指定位 set_bool(current_data, 0, bit_offset, value)# 写回PLC self.plc.write_area(AREA_DB, db_number, byte_offset, current_data)return Trueexcept Exception as e:print(f"写入布尔值失败: {e}")return Falsedef disconnect(self):"""断开连接"""if self.connected and self.plc:self.plc.disconnect()self.connected = Falseprint("已断开PLC连接")# 你的数据生成函数(示例) def my_function():"""你的数据生成函数返回一个包含数据的字典"""# 这里是你实际的数据生成逻辑import randomreturn {'temperature': round(random.uniform(20.0, 30.0), 2), # 温度值'pressure': random.randint(100, 200), # 压力值'status': random.choice([True, False]), # 状态'speed': random.randint(500, 1500) # 速度 }def main():"""主函数:循环读取数据并写入PLC"""# ================ 配置部分(需要你修改)================PLC_IP = "192.168.0.1" # PLC的IP地址DB_NUMBER = 1 # DB块号# 数据在PLC中的存储位置(需要和PLC程序对应)ADDRESS_MAP = {'temperature': {'type': 'float', 'offset': 0}, # DB1.DBD0'pressure': {'type': 'int16', 'offset': 4}, # DB1.DBW4'status': {'type': 'bool', 'offset': 6, 'bit': 0}, # DB1.DBX6.0'speed': {'type': 'int16', 'offset': 8} # DB1.DBW8 }LOOP_INTERVAL = 1.0 # 循环间隔(秒)MAX_ITERATIONS = 10 # 最大循环次数(设为None表示无限循环)# ===================================================# 1. 创建PLC写入器print("初始化PLC写入器...")writer = PLCWriter(PLC_IP)# 2. 连接PLCif not writer.connect():print("无法连接PLC,程序退出")return# 3. 主循环print(f"\n开始数据写入循环,间隔: {LOOP_INTERVAL}秒")iteration = 0try:while True:if MAX_ITERATIONS and iteration >= MAX_ITERATIONS:print(f"达到最大循环次数: {MAX_ITERATIONS}")breakiteration += 1print(f"\n=== 第 {iteration} 次循环 ===")# 3.1 获取数据print("获取数据...")try:data = my_function()print(f"获取到数据: {data}")except Exception as e:print(f"获取数据失败: {e}")time.sleep(LOOP_INTERVAL)continue# 3.2 写入数据到PLCsuccess_count = 0total_count = len(data)for key, value in data.items():if key not in ADDRESS_MAP:print(f"警告: {key} 没有对应的PLC地址配置")continueaddr_info = ADDRESS_MAP[key]try:if addr_info['type'] == 'int16':success = writer.write_int16(DB_NUMBER, addr_info['offset'], value)elif addr_info['type'] == 'float':success = writer.write_float(DB_NUMBER, addr_info['offset'], value)elif addr_info['type'] == 'bool':success = writer.write_bool(DB_NUMBER, addr_info['offset'], addr_info.get('bit', 0), value)else:print(f"未知数据类型: {addr_info['type']}")continueif success:success_count += 1print(f" ✓ {key}: {value} → PLC成功")else:print(f" ✗ {key}: {value} → PLC失败")except Exception as e:print(f" ✗ {key}写入异常: {e}")# 3.3 统计结果print(f"写入结果: {success_count}/{total_count} 成功")# 3.4 等待下一次循环print(f"等待 {LOOP_INTERVAL} 秒...")time.sleep(LOOP_INTERVAL)except KeyboardInterrupt:print("\n用户中断程序")except Exception as e:print(f"程序异常: {e}")finally:# 4. 断开连接 writer.disconnect()print("程序结束")if __name__ == "__main__":# 测试用:简化版配置检查print("=== PLC数据写入程序 ===")# 快速连接测试test_writer = PLCWriter("192.168.1.110",rack=0, slot=1)if test_writer.connect():print("连接测试成功!")# test_writer.disconnect()db_number=34offset=4test_writer.write_float(db_number, offset, 1.23) # else:print("连接测试失败,请检查配置")
PLCWriter("192.168.1.110",rack=0, slot=1)中的参数分别是PLC的IP地址,机架号,曹号。
test_writer.write_float(db_number, offset, 1.23)表明将数据写入DB34,偏移量为4的地址,写入数据为浮点数1.23
2.测试读取是否受保护
当读取或者写入失败后,考虑是否PLC工程师将那边DB块保护起来了。
import snap7from PLC_deal_test import PLCWriterdef diagnose_protection_issues():"""诊断PLC保护设置"""writer = PLCWriter("192.168.1.110", rack=0, slot=1)if not writer.connect():returntry:print("=== PLC保护诊断 ===")# 1. 获取CPU信息print("\n1. CPU基本信息:")try:cpu_info = writer.plc.get_cpu_info()print(f" 模块类型: {cpu_info.ModuleTypeName.decode('utf-8', errors='ignore')}")print(f" 序列号: {cpu_info.SerialNumber.decode('utf-8', errors='ignore')}")print(f" AS名称: {cpu_info.ASName.decode('utf-8', errors='ignore')}")print(f" 版权: {cpu_info.Copyright.decode('utf-8', errors='ignore')}")print(f" 模块名称: {cpu_info.ModuleName.decode('utf-8', errors='ignore')}")except Exception as e:print(f" 获取CPU信息失败: {e}")# 2. 检查CPU状态print("\n2. CPU状态检查:")try:cpu_state = writer.plc.get_cpu_state()print(f" CPU状态: {cpu_state}")# 检查是否在RUN模式if cpu_state == 'S7CpuStatusRun':print(" ✓ CPU在RUN模式")else:print(f" ⚠ CPU在 {cpu_state} 模式,可能影响访问")except Exception as e:print(f" 获取CPU状态失败: {e}")# 3. 检查保护级别print("\n3. 保护级别检查:")try:# 尝试读取系统状态列表(需要密码时可能失败)ssl_data = writer.plc.read_szl(0x0111, 0x0001) # CPU特性print(f" 可以读取系统状态列表")except snap7.exceptions.Snap7Exception as e:if "CPU" in str(e) and "denied" in str(e).lower():print(f" ✗ 访问被拒绝 - 可能需要密码")else:print(f" 读取SSL失败: {e}")# 4. 测试不同区域的访问print("\n4. 内存区域访问测试:")test_areas = [("Merker (M)", snap7.types.S7AreaMK, 0, 10), # M区("Input (I)", snap7.types.S7AreaPE, 0, 10), # 输入("Output (Q)", snap7.types.S7AreaPA, 0, 10), # 输出 ]for area_name, area_code, offset, size in test_areas:try:data = writer.plc.read_area(area_code, 0, offset, size)print(f" ✓ {area_name}区可访问")except snap7.exceptions.Snap7Exception as e:print(f" ✗ {area_name}区访问失败: {e}")# 5. 测试其他DB块的访问print("\n5. DB块访问测试:")# 首先获取所有DB块try:db_list = writer.plc.list_blocks_of_type('DB', 50)print(f" 找到 {len(db_list)} 个DB块")# 测试几个常见的DB块test_db_numbers = []if db_list:# 取前5个DB块测试test_db_numbers = db_list[:5]else:# 如果没有获取到列表,测试常见DB号test_db_numbers = [1, 2, 3, 100, 200]for db_num in test_db_numbers:try:# 尝试读取1字节data = writer.plc.db_read(db_num, 0, 1)print(f" ✓ DB{db_num} 可访问")# 如果可以访问,尝试读取更多信息try:# 尝试读取DB长度(某些PLC支持)length = writer.plc.get_db_length(db_num)print(f" DB{db_num} 大小: {length} 字节")except:# 如果不能获取长度,尝试估计大小for test_size in [10, 50, 100]:try:data = writer.plc.db_read(db_num, 0, test_size)if len(data) == test_size:print(f" 至少 {test_size} 字节")except:breakexcept snap7.exceptions.Snap7Exception as e:error_msg = str(e)if "address out of range" in error_msg:print(f" ✗ DB{db_num} 不存在或大小为零")elif "access denied" in error_msg.lower() or "protection" in error_msg.lower():print(f" ✗ DB{db_num} 访问被拒绝(保护)")else:print(f" ✗ DB{db_num} 访问失败: {e}")except Exception as e:print(f" ✗ DB{db_num} 测试异常: {e}")except Exception as e:print(f" 获取DB列表失败: {e}")# 6. 专门测试DB48print("\n6. DB48专门测试:")if 48 in db_list:print(" DB48在列表中,尝试不同访问方式...")# 测试不同大小的读取read_success = Falsefor read_size in [1, 2, 4, 10]:try:data = writer.plc.db_read(48, 0, read_size)print(f" ✓ 可读取 {read_size} 字节")print(f" 数据: {data.hex()}")read_success = True# 如果成功,尝试解析if read_size >= 4:try:# 尝试作为Real解析(偏移量0)import structreal_value = struct.unpack('>f', data[:4])[0]print(f" 作为Real(0): {real_value}")except:passif read_size >= 6:try:# 尝试作为Real解析(偏移量2)real_value = struct.unpack('>f', data[2:6])[0]print(f" 作为Real(2): {real_value}")except:passexcept snap7.exceptions.Snap7Exception as e:if read_size == 1:print(f" ✗ 连1字节都无法读取: {e}")print(f" → DB48可能被完全保护或大小为0")breakif not read_success:print(" DB48存在但无法读取任何数据 → 很可能受保护")else:print(" DB48不在DB列表中")except Exception as e:print(f"诊断过程中出错: {e}")import tracebacktraceback.print_exc()finally:writer.disconnect()print("\n=== 诊断结束 ===")# 尝试写入DB48来测试是否写保护 def test_db_protection():writer = PLCWriter("192.168.1.110", rack=0, slot=1)writer.connect()try:# 测试1: 尝试写入# test_data = bytearray([0x00, 0x01])# try:# writer.plc.db_write(48, 0, test_data)# print("✓ 可以写入DB48 → 无保护或只读保护")# except snap7.exceptions.Snap7Exception as e:# if "access denied" in str(e).lower():# print("✗ 写入被拒绝 → 写保护或完全保护")# 测试2: 尝试读取try:data = writer.plc.db_read(48, 0, 2)print("✓ 可以读取DB48 → 至少可以读取")except:print("✗ 读取也被拒绝 → 完全保护")finally:writer.disconnect()if __name__ == '__main__':test_db_protection()
小结:以上程序现场测试过没有问题,如果报错,考虑snap7.dll是否存在。要记得安装python-snap7,版本与PLC型号也可能造成程序运行出问题。