JerryZhen 发表于 2024-3-24 22:25

【米尔-全志 T527 开发板-试用评测】-第二篇 网关协议管理功能的实现

<div class='showpostmsg'><p>在第一篇中在米尔T527开发板自带的安卓系统上搭建了linux开发环境,并测试了mqtt协议。本篇我们进一步实现一个简易的协议管理器,这对于边缘网关来说很重要。因为在边缘网关中通常需要处理各种常见的物联网通讯协议,一个优秀的协议管理器对项目成败至关重要。因为用来做产品原型测试,所以我们继续用python作为开发语言,原因无他,主要是因为生态好,库多,上手快。为了简单起见,接下来我们主要展示如何同时管理socket 和 MQTT,并根据不同的协议进行数据的收发。</p>

<p>&nbsp;</p>

<p>因为要管理多种协议,我们自然想到使用工厂模式实现 ProtocolManager 类,从而可以使代码更加灵活和可扩展。工厂模式允许我们根据传入的参数动态地创建和返回协议实例,而不需要在 ProtocolManager 中硬编码所有协议的具体实现。</p>

<pre>
<code class="language-python">class Protocol:
    def send(self, data):
      raise NotImplementedError("Protocol must implement send method")

    def receive(self):
      raise NotImplementedError("Protocol must implement receive method")

class TCPProtocol(Protocol):
    def __init__(self, host, port):
      self.host = host
      self.port = port
      # 初始化 TCP 连接等

    def send(self, data):
      # 实现 TCP 发送逻辑
      pass

    def receive(self):
      # 实现 TCP 接收逻辑
      pass

class MQTTProtocol(Protocol):
    def __init__(self, broker_address):
      self.broker_address = broker_address
      # 初始化 MQTT 连接等

    def send(self, data):
      # 实现 MQTT 发送逻辑
      pass

    def receive(self):
      # 实现 MQTT 接收逻辑
      pass

class ProtocolFactory:
    @staticmethod
    def create_protocol(protocol_type, *args, **kwargs):
      if protocol_type == 'tcp':
            return TCPProtocol(*args, **kwargs)
      elif protocol_type == 'mqtt':
            return MQTTProtocol(*args, **kwargs)
      else:
            raise ValueError(f"Unknown protocol type: {protocol_type}")

class ProtocolManager:
    def __init__(self):
      self.protocols = {}

    def connect(self, protocol_type, *args, **kwargs):
      protocol_instance = ProtocolFactory.create_protocol(protocol_type, *args, **kwargs)
      self.protocols = protocol_instance

    def send_data(self, protocol_type, data):
      if protocol_type in self.protocols:
            protocol_instance = self.protocols
            protocol_instance.send(data)
      else:
            print(f"No connection established for protocol: {protocol_type}")

    def receive_data(self, protocol_type):
      if protocol_type in self.protocols:
            protocol_instance = self.protocols
            return protocol_instance.receive()
      else:
            print(f"No connection established for protocol: {protocol_type}")
            return None

# 使用示例
manager = ProtocolManager()

# 连接 TCP 和 MQTT
manager.connect('tcp', 'localhost', 12345)
manager.connect('mqtt', 'localhost')

# 发送数据
manager.send_data('tcp', b'Hello, TCP!')
manager.send_data('mqtt', 'Hello, MQTT!')

# 接收数据(这里只是示例,实际使用时可能需要异步接收或轮询)
tcp_data = manager.receive_data('tcp')
mqtt_data = manager.receive_data('mqtt')

print(f"Received TCP data: {tcp_data}")
print(f"Received MQTT data: {mqtt_data}")</code></pre>

<p>在上面的示例中,我们定义了 Protocol 基类,并创建了两个继承自 Protocol 的具体协议类 TCPProtocol 和 MQTTProtocol。ProtocolFactory 类是一个静态工厂,负责根据传入的协议类型创建相应的协议实例。ProtocolManager 类使用 connect 方法通过工厂创建协议实例,并将其存储在 protocols 字典中。send_data 和 receive_data 方法现在根据协议类型从字典中获取协议实例,并调用其 send 和 receive 方法。这种方式使得添加新的协议变得非常简单,只需要定义一个新的协议类,并在 ProtocolFactory 中添加相应的创建逻辑即可,而不需要修改 ProtocolManager 类的代码。</p>

<p>&nbsp;</p>

<p>但是在实际应用中,接收数据通常涉及到监听数据流的异步事件或回调,并且通常使用队列或缓存来存储接收到的数据,直到它们被处理。python中可以使用asyncio库和queue模块来实现异步队列, 我们将上述接收数据部分改写为异步缓存处理,修改后的代码如下:</p>

<pre>
<code class="language-python">import asyncio
import queue

class Protocol:
    async def receive(self):
      raise NotImplementedError

    async def send_data(self, data):
      raise NotImplementedError

class AsyncProtocol(Protocol):
    def __init__(self):
      self.receive_queue = asyncio.Queue()

    async def receive(self):
      while True:
            data = await self._receive_data()
            await self.receive_queue.put(data)

    async def _receive_data(self):
      # 这里模拟从网络或其他IO源接收数据
      await asyncio.sleep(1)
      return f"Received data from {self.__class__.__name__}"

    async def get_received_data(self):
      return await self.receive_queue.get()

    async def send_data(self, data):
      # 这里应该实现具体的发送逻辑
      # 暂时打印数据作为模拟
      print(f"Sending data to {self.__class__.__name__}: {data}")

class TCPProtocol(AsyncProtocol):
    pass

class MQTTProtocol(AsyncProtocol):
    pass

class ProtocolFactory:
    @staticmethod
    async def create_protocol(protocol_type):
      if protocol_type == 'tcp':
            return TCPProtocol()
      elif protocol_type == 'mqtt':
            return MQTTProtocol()
      else:
            raise ValueError(f"Unknown protocol type: {protocol_type}")

class ProtocolManager:
    def __init__(self):
      self.protocols = {}

    async def connect(self, protocol_type):
      protocol_instance = await ProtocolFactory.create_protocol(protocol_type)
      self.protocols = protocol_instance
      asyncio.create_task(protocol_instance.receive())

    async def send_data(self, protocol_type, data):
      if protocol_type in self.protocols:
            await self.protocols.send_data(data)
      else:
            print(f"No connection established for protocol: {protocol_type}")

    async def receive_data(self, protocol_type):
      if protocol_type in self.protocols:
            protocol_instance = self.protocols
            return await protocol_instance.get_received_data()
      else:
            print(f"No connection established for protocol: {protocol_type}")
            return None

# 使用示例
async def main():
    manager = ProtocolManager()
    await manager.connect('tcp')
    await manager.connect('mqtt')

    # 发送数据
    await manager.send_data('tcp', 'Hello TCP!')
    await manager.send_data('mqtt', 'Hello MQTT!')

    # 接收数据
    tcp_data = await manager.receive_data('tcp')
    mqtt_data = await manager.receive_data('mqtt')
    print(f"TCP received: {tcp_data}")
    print(f"MQTT received: {mqtt_data}")

# 运行事件循环
asyncio.run(main())</code></pre>

<p>修改后的代码中,send_data方法现在只是简单地打印出要发送的数据和协议类型,以模拟发送逻辑。在实际应用中,需要在这些方法中加入真正的网络发送和接收逻辑。</p>

<p>运行代码,结果如下:</p>

<div style="text-align: center;"></div>

<div style="text-align: center;">&nbsp;</div>

<div>至此,一个简易的协议管理框架已经大家完毕,下一篇会基于该框架进行更多的物联网协议测试,以实现网关基础的收发功能。</div>

<p>&nbsp;</p>
</div><script>                                        var loginstr = '<div class="locked">查看本帖全部内容,请<a href="javascript:;"   style="color:#e60000" class="loginf">登录</a>或者<a href="https://bbs.eeworld.com.cn/member.php?mod=register_eeworld.php&action=wechat" style="color:#e60000" target="_blank">注册</a></div>';
                                       
                                        if(parseInt(discuz_uid)==0){
                                                                                                (function($){
                                                        var postHeight = getTextHeight(400);
                                                        $(".showpostmsg").html($(".showpostmsg").html());
                                                        $(".showpostmsg").after(loginstr);
                                                        $(".showpostmsg").css({height:postHeight,overflow:"hidden"});
                                                })(jQuery);
                                        }                </script><script type="text/javascript">(function(d,c){var a=d.createElement("script"),m=d.getElementsByTagName("script"),eewurl="//counter.eeworld.com.cn/pv/count/";a.src=eewurl+c;m.parentNode.insertBefore(a,m)})(document,523)</script>

Jacktang 发表于 2024-3-25 07:27

<p>米尔-全志 T527 开发板做的还算牛叉</p>

beyond_笑谈 发表于 2024-4-26 09:15

Jacktang 发表于 2024-3-25 07:27
米尔-全志 T527 开发板做的还算牛叉

<p>感觉米尔的开发板硬件设计达不到工业应用环境的要求</p>
页: [1]
查看完整版本: 【米尔-全志 T527 开发板-试用评测】-第二篇 网关协议管理功能的实现