pyads库提供了强大的功能,允许Python程序与倍福(Beckhoff)PLC进行ADS通信。其中,ADS通知(Notification)机制是实现实时数据采集的关键。通过注册通知,PLC变量的值一旦发生变化,pyads就会立即触发相应的回调函数,从而避免了轮询(Polling)带来的延迟和资源浪费。
然而,在处理高频、大数据量的PLC变量通知时,开发者常会遇到一些挑战:
解决这些问题的核心在于采用基于类的设计模式,并结合高效的数据解析策略。
为了避免全局变量并更好地管理状态,推荐将pyads连接、通知注册以及回调函数封装在一个类中。这种方法提供了更好的封装性、模块化和可维护性。
核心思想:
示例:基础通知管理类
以下是一个简单的示例,展示如何在一个类中管理pyads连接和注册通知,以监控一个PLC心跳变量:
import pyads import ctypes import time # 假设PLC连接参数和变量名 PLC_AMS_NET_ID = '192.168.1.100.1.1' # 替换为你的PLC AMS Net ID PLC_AMS_PORT = 851 # 替换为你的PLC AMS Port VAR_NAME_HEARTBEAT = 'GVL.heartbeat' # 假设PLC中有一个布尔型心跳变量 class PlcDataManager: def __init__(self, plc_ams_net_id, plc_ams_port): """ 初始化PLC连接和数据管理器。 """ self.plc = pyads.Connection(plc_ams_net_id, plc_ams_port) self.heartbeat_count = 0 self.data_buffer = [] # 用于累积数据的缓冲区 self._is_connected = False try: self.plc.open() self._is_connected = True print(f"成功连接到PLC: {plc_ams_net_id}:{plc_ams_port}") self.setup_notifications() except pyads.ADSError as e: print(f"连接PLC失败或设置通知失败: {e}") self.plc = None # 确保连接失败时plc对象为None def setup_notifications(self): """ 设置所有ADS通知。 """ if not self._is_connected: print("PLC未连接,无法设置通知。") return # 注册心跳变量的通知 # 注意:这里不能使用 @plc.notification 装饰器,而是使用 add_device_notification 方法 self.plc.add_device_notification( VAR_NAME_HEARTBEAT, pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_BOOL)), self.on_plc_heartbeat, # 将类的成员方法作为回调函数 ) print(f"已注册通知: {VAR_NAME_HEARTBEAT}") # 示例:注册一个结构体变量的通知 (与原始问题类似) # 假设PLC中有一个名为 'global.sample_structure' 的结构体 # structure_def = ( # ('nVar', pyads.PLCTYPE_DINT, 1000), # ('nVar2', pyads.PLCTYPE_DINT, 1000), # ) # size_of_struct = pyads.size_of_structure(structure_def) # self.plc.add_device_notification( # 'global.sample_structure', # pyads.NotificationAttrib(size_of_struct), # lambda handle, name, timestamp, value: self.on_sample_structure_data(handle, name, timestamp, value, structure_def) # ) # print(f"已注册通知: global.sample_structure") def on_plc_heartbeat(self, handle, name, timestamp, value): """ 心跳变量通知回调函数。 """ # value 是 ctypes.c_ubyte 数组,需要解析 # 对于布尔类型,通常是单个字节 bool_value = bool(value[0]) self.heartbeat_count += 1 print(f"[{time.time():.3f}] 心跳计数: {self.heartbeat_count}, 值: {bool_value}") # 在这里可以进行数据处理或累积 self.data_buffer.append(bool_value) # def on_sample_structure_data(self, handle, name, timestamp, value, structure_def): # """ # 结构体数据通知回调函数。 # """ # # 将字节数据解析为Python字典 # values = pyads.dict_from_bytes(value, structure_def) # print(f"[{time.time():.3f}] 接收到结构体数据: {name}, 值: {values}") # # 将数据添加到类的缓冲区 # self.data_buffer.extend(values['nVar']) # 假设只关心nVar def close(self): """ 关闭PLC连接。 """ if self._is_connected and self.plc: self.plc.close() print("PLC连接已关闭。") self._is_connected = False # 示例使用 if __name__ == "__main__": # 替换为你的PLC连接参数 manager = PlcDataManager(PLC_AMS_NET_ID, PLC_AMS_PORT) if manager._is_connected: try: # 保持主线程运行,等待通知回调 # pyads.Connection.run() 会阻塞并处理所有通知 # 或者在一个循环中执行其他任务并等待通知 print("\n等待PLC通知 (按 Ctrl+C 退出)...") pyads.Connection.run() # 这会阻塞并处理所有已注册的通知 except KeyboardInterrupt: print("\n程序终止。") finally: manager.close() print(f"总心跳次数: {manager.heartbeat_count}") # print(f"累积数据量: {len(manager.data_buffer)}")
注意事项:
除了用户自定义的变量,pyads还允许监听PLC的内部状态变化,例如PLC从运行模式(Run Mode)切换到配置模式(Config Mode)。这对于诊断通信问题和确保系统稳定性至关重要。
PLC状态通知通过特定的ADS组(Group)和偏移量(Offset)来注册:
以下是如何在类中实现PLC状态变化的监控:
# ... PlcDataManager 类定义不变 ... class PlcDataManager: # ... __init__ 和 setup_notifications 方法的其余部分 ... def setup_notifications(self): # ... 现有通知设置 ... # 注册PLC状态变化的通知 self.plc.add_device_notification( (pyads.ADSIGRP_DEVICE_DATA, pyads.ADSIOFFS_DEVDATA_ADSSTATE), # ADS组和偏移量 pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT)), # PLC状态通常是INT类型 self._on_plc_status_change, ) print("已注册PLC状态变化通知。") def _on_plc_status_change(self, notification, _): """ PLC状态变化通知回调函数。 当PLC状态(如从运行到配置)改变时触发。 """ # 使用 parse_notification 解析通知数据 # parse_notification 会返回 handle, name, timestamp, value # 我们只关心 value *_, value = self.plc.parse_notification(notification, pyads.PLCTYPE_INT) print(f"[{time.time():.3f}] PLC状态改变为: {value} ({pyads.ADSSTATE_NAMES.get(value, '未知状态')})") if value != pyads.ADSSTATE_RUN: print("警告: PLC已退出运行模式!") # 在这里可以触发错误处理、日志记录或系统报警 self._plc_fatal_com_error_message = "PLC exited run mode"
对于原始问题中提到的,每个周期需要处理1000个值,总计累积100,000个值的场景,默认的pyads.dict_from_bytes方法可能会因为其逐个变量的解析机制而效率不高。为了提高性能,尤其是在数据类型统一的情况下,推荐以下优化策略:
示例:高效解析结构体字节数据
假设你有一个包含1000个DINT类型变量的结构体:
import numpy as np # ... PlcDataManager 类定义 ... class PlcDataManager: # ... 其他方法 ... # 假设你的结构体定义如下 STRUCTURE_DEF_LARGE_ARRAY = ( ('data_array', pyads.PLCTYPE_DINT, 1000), # 1000个DINT # ... 其他变量 ... ) SIZE_OF_LARGE_STRUCT = pyads.size_of_structure(STRUCTURE_DEF_LARGE_ARRAY) def setup_notifications(self): # ... 现有通知设置 ... # 注册大型结构体通知 self.plc.add_device_notification( 'global.large_data_structure', # 假设PLC中这个结构体的名称 pyads.NotificationAttrib(self.SIZE_OF_LARGE_STRUCT), self.on_large_data_notification, ) print("已注册大型数据结构体通知。") def on_large_data_notification(self, handle, name, timestamp, value): """ 大型数据结构体通知回调函数,使用NumPy优化解析。 """ # value 是 ctypes.c_ubyte * SIZE_OF_LARGE_STRUCT 类型 # 可以直接将其转换为bytes byte_data = bytes(value) # 假设结构体中只有一个名为 'data_array' 的DINT数组 # DINT是32位整数 (4字节) dint_size = ctypes.sizeof(pyads.PLCTYPE_DINT) num_elements = 1000 expected_bytes = dint_size * num_elements if len(byte_data) >= expected_bytes: # 使用np.frombuffer直接从字节缓冲区创建NumPy数组 # dtype='<i4' 表示小端32位有符号整数 (DINT) # 如果是无符号DINT (UDINT),则使用 '<u4' # offset 参数用于跳过结构体中其他变量的字节 # 这里假设 'data_array' 是结构体的第一个成员 np_array = np.frombuffer(byte_data, dtype='<i4', count=num_elements, offset=0) # 可以在这里对 np_array 进行进一步处理或累积 self.data_buffer.extend(np_array.tolist()) # 将NumPy数组转换为Python列表并累积 print(f"[{time.time():.3f}] 接收到大型数据: {name}, 数组前5个值: {np_array[:5]}, 累积数据量: {len(self.data_buffer)}") else: print(f"[{time.time():.3f}] 警告: 接收到的字节数据长度不匹配预期。")
NumPy解析注意事项:
通过采用基于类的设计模式,我们能够以更清晰、更可维护的方式管理pyads的PLC通信和通知回调。这种方法有效地解决了全局变量的依赖问题,并使得数据在类实例内部得以安全地累积和处理。
对于高吞吐量的数据采集场景,仅仅依赖回调机制是不够的,还需要关注数据解析的效率。利用NumPy等科学计算库直接处理从pyads通知中获取的原始字节数据,可以显著提升数据转换和处理的性能。
最佳实践建议:
遵循这些原则,开发者可以构建出健壮、高效且易于维护的pyads应用程序,以满足严苛的工业数据采集需求。
以上就是pyads通知机制的高效数据处理:基于类的设计与优化实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号