pyads通知机制的高效数据处理:基于类的设计与优化实践

DDD
发布: 2025-08-11 18:22:02
原创
551人浏览过

pyads通知机制的高效数据处理:基于类的设计与优化实践

本文深入探讨了如何利用pyads库高效处理PLC实时数据通知。针对高并发、大数据量场景,我们提出并详细阐述了基于类封装的解决方案,以避免全局变量,实现更清晰的状态管理和数据积累。同时,文章还介绍了优化数据解析性能的关键技巧,包括利用原始字节数据与NumPy进行批量处理,旨在帮助开发者构建健壮、高性能的PLC数据采集系统。

pyads通知机制与挑战

pyads库提供了强大的功能,允许Python程序与倍福(Beckhoff)PLC进行ADS通信。其中,ADS通知(Notification)机制是实现实时数据采集的关键。通过注册通知,PLC变量的值一旦发生变化,pyads就会立即触发相应的回调函数,从而避免了轮询(Polling)带来的延迟和资源浪费。

然而,在处理高频、大数据量的PLC变量通知时,开发者常会遇到一些挑战:

  1. 数据积累与状态管理: 当需要将每次通知接收到的数据片段组合成更大的数据集(例如,将每次1000个值累积到100,000个值的数组)时,如何有效地管理这些数据,避免使用全局变量成为一个痛点。
  2. Pythonic设计: 如何以一种结构清晰、易于维护的Pythonic方式来组织代码,尤其是当pyads的@plc.notification装饰器无法直接应用于类内部方法时。
  3. 性能瓶颈: 对于包含大量变量的结构体,默认的字节到Python类型转换(如pyads.dict_from_bytes)可能会成为性能瓶颈,尤其是在200ms甚至更短的周期内处理15个信号、每个信号包含1000个值的情况下。

解决这些问题的核心在于采用基于类的设计模式,并结合高效的数据解析策略。

基于类的pyads通知管理

为了避免全局变量并更好地管理状态,推荐将pyads连接、通知注册以及回调函数封装在一个类中。这种方法提供了更好的封装性、模块化和可维护性。

核心思想:

  • 在类的构造函数中初始化pyads连接。
  • 定义一个方法来设置所有必要的ADS通知,并将其回调函数指向类的内部方法。
  • 回调方法作为类的成员,可以直接访问和修改类的实例变量,从而实现数据的累积和状态的更新,而无需依赖全局变量。

示例:基础通知管理类

以下是一个简单的示例,展示如何在一个类中管理pyads连接和注册通知,以监控一个PLC心跳变量:

import pyads
import ctypes
import time

# 假设PLC连接参数和变量名
PLC_AMS_NET_ID = '192.168.1.100.1.1' # 替换为你的PLC AMS Net ID
PLC_AMS_PORT = 851 # 替换为你的PLC AMS Port
VAR_NAME_HEARTBEAT = 'GVL.heartbeat' # 假设PLC中有一个布尔型心跳变量

class PlcDataManager:
    def __init__(self, plc_ams_net_id, plc_ams_port):
        """
        初始化PLC连接和数据管理器。
        """
        self.plc = pyads.Connection(plc_ams_net_id, plc_ams_port)
        self.heartbeat_count = 0
        self.data_buffer = [] # 用于累积数据的缓冲区
        self._is_connected = False

        try:
            self.plc.open()
            self._is_connected = True
            print(f"成功连接到PLC: {plc_ams_net_id}:{plc_ams_port}")
            self.setup_notifications()
        except pyads.ADSError as e:
            print(f"连接PLC失败或设置通知失败: {e}")
            self.plc = None # 确保连接失败时plc对象为None

    def setup_notifications(self):
        """
        设置所有ADS通知。
        """
        if not self._is_connected:
            print("PLC未连接,无法设置通知。")
            return

        # 注册心跳变量的通知
        # 注意:这里不能使用 @plc.notification 装饰器,而是使用 add_device_notification 方法
        self.plc.add_device_notification(
            VAR_NAME_HEARTBEAT,
            pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_BOOL)),
            self.on_plc_heartbeat, # 将类的成员方法作为回调函数
        )
        print(f"已注册通知: {VAR_NAME_HEARTBEAT}")

        # 示例:注册一个结构体变量的通知 (与原始问题类似)
        # 假设PLC中有一个名为 'global.sample_structure' 的结构体
        # structure_def = (
        #     ('nVar', pyads.PLCTYPE_DINT, 1000),
        #     ('nVar2', pyads.PLCTYPE_DINT, 1000),
        # )
        # size_of_struct = pyads.size_of_structure(structure_def)
        # self.plc.add_device_notification(
        #     'global.sample_structure',
        #     pyads.NotificationAttrib(size_of_struct),
        #     lambda handle, name, timestamp, value: self.on_sample_structure_data(handle, name, timestamp, value, structure_def)
        # )
        # print(f"已注册通知: global.sample_structure")

    def on_plc_heartbeat(self, handle, name, timestamp, value):
        """
        心跳变量通知回调函数。
        """
        # value 是 ctypes.c_ubyte 数组,需要解析
        # 对于布尔类型,通常是单个字节
        bool_value = bool(value[0])
        self.heartbeat_count += 1
        print(f"[{time.time():.3f}] 心跳计数: {self.heartbeat_count}, 值: {bool_value}")
        # 在这里可以进行数据处理或累积
        self.data_buffer.append(bool_value)

    # def on_sample_structure_data(self, handle, name, timestamp, value, structure_def):
    #     """
    #     结构体数据通知回调函数。
    #     """
    #     # 将字节数据解析为Python字典
    #     values = pyads.dict_from_bytes(value, structure_def)
    #     print(f"[{time.time():.3f}] 接收到结构体数据: {name}, 值: {values}")
    #     # 将数据添加到类的缓冲区
    #     self.data_buffer.extend(values['nVar']) # 假设只关心nVar

    def close(self):
        """
        关闭PLC连接。
        """
        if self._is_connected and self.plc:
            self.plc.close()
            print("PLC连接已关闭。")
            self._is_connected = False

# 示例使用
if __name__ == "__main__":
    # 替换为你的PLC连接参数
    manager = PlcDataManager(PLC_AMS_NET_ID, PLC_AMS_PORT)

    if manager._is_connected:
        try:
            # 保持主线程运行,等待通知回调
            # pyads.Connection.run() 会阻塞并处理所有通知
            # 或者在一个循环中执行其他任务并等待通知
            print("\n等待PLC通知 (按 Ctrl+C 退出)...")
            pyads.Connection.run() # 这会阻塞并处理所有已注册的通知

        except KeyboardInterrupt:
            print("\n程序终止。")
        finally:
            manager.close()
            print(f"总心跳次数: {manager.heartbeat_count}")
            # print(f"累积数据量: {len(manager.data_buffer)}")
登录后复制

注意事项:

  • pyads.Connection.run() 方法会阻塞当前线程,持续监听并处理ADS通知。在实际应用中,你可能需要将其放在单独的线程中,或者在主循环中结合其他任务。
  • 回调函数中的value参数通常是一个ctypes.c_ubyte数组,需要根据PLC变量的类型进行适当的解析。

高级应用:监控PLC状态变化

除了用户自定义的变量,pyads还允许监听PLC的内部状态变化,例如PLC从运行模式(Run Mode)切换到配置模式(Config Mode)。这对于诊断通信问题和确保系统稳定性至关重要。

PLC状态通知通过特定的ADS组(Group)和偏移量(Offset)来注册:

  • ADSIGRP_DEVICE_DATA (0xF100): 设备数据组。
  • ADSIOFFS_DEVDATA_ADSSTATE (0x0000): ADS状态偏移量。

以下是如何在类中实现PLC状态变化的监控:

# ... PlcDataManager 类定义不变 ...

class PlcDataManager:
    # ... __init__ 和 setup_notifications 方法的其余部分 ...

    def setup_notifications(self):
        # ... 现有通知设置 ...

        # 注册PLC状态变化的通知
        self.plc.add_device_notification(
            (pyads.ADSIGRP_DEVICE_DATA, pyads.ADSIOFFS_DEVDATA_ADSSTATE), # ADS组和偏移量
            pyads.NotificationAttrib(ctypes.sizeof(pyads.PLCTYPE_INT)), # PLC状态通常是INT类型
            self._on_plc_status_change,
        )
        print("已注册PLC状态变化通知。")

    def _on_plc_status_change(self, notification, _):
        """
        PLC状态变化通知回调函数。
        当PLC状态(如从运行到配置)改变时触发。
        """
        # 使用 parse_notification 解析通知数据
        # parse_notification 会返回 handle, name, timestamp, value
        # 我们只关心 value
        *_, value = self.plc.parse_notification(notification, pyads.PLCTYPE_INT)

        print(f"[{time.time():.3f}] PLC状态改变为: {value} ({pyads.ADSSTATE_NAMES.get(value, '未知状态')})")

        if value != pyads.ADSSTATE_RUN:
            print("警告: PLC已退出运行模式!")
            # 在这里可以触发错误处理、日志记录或系统报警
            self._plc_fatal_com_error_message = "PLC exited run mode"
登录后复制

优化大数据量通知数据处理

对于原始问题中提到的,每个周期需要处理1000个值,总计累积100,000个值的场景,默认的pyads.dict_from_bytes方法可能会因为其逐个变量的解析机制而效率不高。为了提高性能,尤其是在数据类型统一的情况下,推荐以下优化策略:

  1. 获取原始字节数据: 在add_device_notification中,回调函数接收到的value参数就是原始的字节数据(ctypes.c_ubyte数组)。
  2. 利用read_by_name与return_ctypes=True: 尽管这里是通知回调,但了解pyads的read_by_name(return_ctypes=True)是一个重要的性能优化方向。它允许你直接获取ctypes对象,而不是Python基本类型,从而避免了pyads内部的类型转换开销。对于通知,你已经获得了原始字节数据,所以重点在于如何高效地解析它。
  3. 使用NumPy进行批量转换: 如果你的数据都是相同类型(例如,所有1000个值都是DINT),那么将原始字节数据转换为NumPy数组可以显著提高解析速度。NumPy的向量化操作远比Python的循环快。

示例:高效解析结构体字节数据

假设你有一个包含1000个DINT类型变量的结构体:

import numpy as np

# ... PlcDataManager 类定义 ...

class PlcDataManager:
    # ... 其他方法 ...

    # 假设你的结构体定义如下
    STRUCTURE_DEF_LARGE_ARRAY = (
        ('data_array', pyads.PLCTYPE_DINT, 1000), # 1000个DINT
        # ... 其他变量 ...
    )
    SIZE_OF_LARGE_STRUCT = pyads.size_of_structure(STRUCTURE_DEF_LARGE_ARRAY)

    def setup_notifications(self):
        # ... 现有通知设置 ...

        # 注册大型结构体通知
        self.plc.add_device_notification(
            'global.large_data_structure', # 假设PLC中这个结构体的名称
            pyads.NotificationAttrib(self.SIZE_OF_LARGE_STRUCT),
            self.on_large_data_notification,
        )
        print("已注册大型数据结构体通知。")

    def on_large_data_notification(self, handle, name, timestamp, value):
        """
        大型数据结构体通知回调函数,使用NumPy优化解析。
        """
        # value 是 ctypes.c_ubyte * SIZE_OF_LARGE_STRUCT 类型
        # 可以直接将其转换为bytes
        byte_data = bytes(value)

        # 假设结构体中只有一个名为 'data_array' 的DINT数组
        # DINT是32位整数 (4字节)
        dint_size = ctypes.sizeof(pyads.PLCTYPE_DINT)
        num_elements = 1000
        expected_bytes = dint_size * num_elements

        if len(byte_data) >= expected_bytes:
            # 使用np.frombuffer直接从字节缓冲区创建NumPy数组
            # dtype='<i4' 表示小端32位有符号整数 (DINT)
            # 如果是无符号DINT (UDINT),则使用 '<u4'
            # offset 参数用于跳过结构体中其他变量的字节
            # 这里假设 'data_array' 是结构体的第一个成员
            np_array = np.frombuffer(byte_data, dtype='<i4', count=num_elements, offset=0)

            # 可以在这里对 np_array 进行进一步处理或累积
            self.data_buffer.extend(np_array.tolist()) # 将NumPy数组转换为Python列表并累积
            print(f"[{time.time():.3f}] 接收到大型数据: {name}, 数组前5个值: {np_array[:5]}, 累积数据量: {len(self.data_buffer)}")
        else:
            print(f"[{time.time():.3f}] 警告: 接收到的字节数据长度不匹配预期。")
登录后复制

NumPy解析注意事项:

  • dtype参数非常重要,它指定了字节数据的解释方式。对于pyads的PLCTYPE_DINT,通常对应于C语言的long或int,在Python中是32位有符号整数。根据PLC实际变量类型选择正确的NumPy dtype(例如,'
  • offset参数用于指定从字节数据中开始读取的位置。如果你的数组不是结构体的第一个成员,你需要计算其在结构体中的字节偏移量。
  • 这种方法对于结构体中所有成员都是相同类型数组的情况尤其有效。如果结构体包含多种类型或复杂的嵌套,你可能需要更精细的字节解析逻辑,但NumPy仍然可以用于处理其中的连续同类型数据块。

总结与最佳实践

通过采用基于类的设计模式,我们能够以更清晰、更可维护的方式管理pyads的PLC通信和通知回调。这种方法有效地解决了全局变量的依赖问题,并使得数据在类实例内部得以安全地累积和处理。

对于高吞吐量的数据采集场景,仅仅依赖回调机制是不够的,还需要关注数据解析的效率。利用NumPy等科学计算库直接处理从pyads通知中获取的原始字节数据,可以显著提升数据转换和处理的性能。

最佳实践建议:

  • 封装性: 始终将pyads连接和相关逻辑封装在类中,以实现更好的模块化和状态管理。
  • 回调设计: 将回调函数设计为类的成员方法,可以直接访问和修改类实例的属性,从而实现数据的累积和处理。
  • 性能优化: 对于大数据量和高频通知,考虑跳过pyads的默认高层解析,直接处理原始字节数据,并结合NumPy等库进行批量、向量化的数据转换。
  • 错误处理: 注册PLC状态通知,并实现相应的回调逻辑,以便在PLC通信出现问题时及时发现并采取措施。
  • 异步处理: 对于需要同时执行其他任务的应用程序,考虑将pyads.Connection.run()放在单独的线程中,或使用asyncio等异步框架来管理通知。

遵循这些原则,开发者可以构建出健壮、高效且易于维护的pyads应用程序,以满足严苛的工业数据采集需求。

以上就是pyads通知机制的高效数据处理:基于类的设计与优化实践的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号