1759|2

12

帖子

0

TA的资源

一粒金砂(中级)

楼主
 

【米尔-全志 T527 开发板-试用评测】-第二篇 网关协议管理功能的实现 [复制链接]

在第一篇中在米尔T527开发板自带的安卓系统上搭建了linux开发环境,并测试了mqtt协议。本篇我们进一步实现一个简易的协议管理器,这对于边缘网关来说很重要。因为在边缘网关中通常需要处理各种常见的物联网通讯协议,一个优秀的协议管理器对项目成败至关重要。因为用来做产品原型测试,所以我们继续用python作为开发语言,原因无他,主要是因为生态好,库多,上手快。为了简单起见,接下来我们主要展示如何同时管理socket 和 MQTT,并根据不同的协议进行数据的收发。

 

因为要管理多种协议,我们自然想到使用工厂模式实现 ProtocolManager 类,从而可以使代码更加灵活和可扩展。工厂模式允许我们根据传入的参数动态地创建和返回协议实例,而不需要在 ProtocolManager 中硬编码所有协议的具体实现。

class Protocol:  
    def send(self, data):  
        raise NotImplementedError("Protocol must implement send method")  
  
    def receive(self):  
        raise NotImplementedError("Protocol must implement receive method")  
  
class TCPProtocol(Protocol):  
    def __init__(self, host, port):  
        self.host = host  
        self.port = port  
        # 初始化 TCP 连接等  
  
    def send(self, data):  
        # 实现 TCP 发送逻辑  
        pass  
  
    def receive(self):  
        # 实现 TCP 接收逻辑  
        pass  
  
class MQTTProtocol(Protocol):  
    def __init__(self, broker_address):  
        self.broker_address = broker_address  
        # 初始化 MQTT 连接等  
  
    def send(self, data):  
        # 实现 MQTT 发送逻辑  
        pass  
  
    def receive(self):  
        # 实现 MQTT 接收逻辑  
        pass  
  
class ProtocolFactory:  
    @staticmethod  
    def create_protocol(protocol_type, *args, **kwargs):  
        if protocol_type == 'tcp':  
            return TCPProtocol(*args, **kwargs)  
        elif protocol_type == 'mqtt':  
            return MQTTProtocol(*args, **kwargs)  
        else:  
            raise ValueError(f"Unknown protocol type: {protocol_type}")  
  
class ProtocolManager:  
    def __init__(self):  
        self.protocols = {}  
  
    def connect(self, protocol_type, *args, **kwargs):  
        protocol_instance = ProtocolFactory.create_protocol(protocol_type, *args, **kwargs)  
        self.protocols[protocol_type] = protocol_instance  
  
    def send_data(self, protocol_type, data):  
        if protocol_type in self.protocols:  
            protocol_instance = self.protocols[protocol_type]  
            protocol_instance.send(data)  
        else:  
            print(f"No connection established for protocol: {protocol_type}")  
  
    def receive_data(self, protocol_type):  
        if protocol_type in self.protocols:  
            protocol_instance = self.protocols[protocol_type]  
            return protocol_instance.receive()  
        else:  
            print(f"No connection established for protocol: {protocol_type}")  
            return None  
  
# 使用示例  
manager = ProtocolManager()  
  
# 连接 TCP 和 MQTT  
manager.connect('tcp', 'localhost', 12345)  
manager.connect('mqtt', 'localhost')  
  
# 发送数据  
manager.send_data('tcp', b'Hello, TCP!')  
manager.send_data('mqtt', 'Hello, MQTT!')  
  
# 接收数据(这里只是示例,实际使用时可能需要异步接收或轮询)  
tcp_data = manager.receive_data('tcp')  
mqtt_data = manager.receive_data('mqtt')  
  
print(f"Received TCP data: {tcp_data}")  
print(f"Received MQTT data: {mqtt_data}")

在上面的示例中,我们定义了 Protocol 基类,并创建了两个继承自 Protocol 的具体协议类 TCPProtocol 和 MQTTProtocol。ProtocolFactory 类是一个静态工厂,负责根据传入的协议类型创建相应的协议实例。ProtocolManager 类使用 connect 方法通过工厂创建协议实例,并将其存储在 protocols 字典中。send_data 和 receive_data 方法现在根据协议类型从字典中获取协议实例,并调用其 send 和 receive 方法。这种方式使得添加新的协议变得非常简单,只需要定义一个新的协议类,并在 ProtocolFactory 中添加相应的创建逻辑即可,而不需要修改 ProtocolManager 类的代码。

 

但是在实际应用中,接收数据通常涉及到监听数据流的异步事件或回调,并且通常使用队列或缓存来存储接收到的数据,直到它们被处理。python中可以使用asyncio库和queue模块来实现异步队列, 我们将上述接收数据部分改写为异步缓存处理,修改后的代码如下:

import asyncio  
import queue  
  
class Protocol:  
    async def receive(self):  
        raise NotImplementedError  
  
    async def send_data(self, data):  
        raise NotImplementedError  
  
class AsyncProtocol(Protocol):  
    def __init__(self):  
        self.receive_queue = asyncio.Queue()  
  
    async def receive(self):  
        while True:  
            data = await self._receive_data()  
            await self.receive_queue.put(data)  
  
    async def _receive_data(self):  
        # 这里模拟从网络或其他IO源接收数据  
        await asyncio.sleep(1)  
        return f"Received data from {self.__class__.__name__}"  
  
    async def get_received_data(self):  
        return await self.receive_queue.get()  
  
    async def send_data(self, data):  
        # 这里应该实现具体的发送逻辑  
        # 暂时打印数据作为模拟  
        print(f"Sending data to {self.__class__.__name__}: {data}")  
  
class TCPProtocol(AsyncProtocol):  
    pass  
  
class MQTTProtocol(AsyncProtocol):  
    pass  
  
class ProtocolFactory:  
    @staticmethod  
    async def create_protocol(protocol_type):  
        if protocol_type == 'tcp':  
            return TCPProtocol()  
        elif protocol_type == 'mqtt':  
            return MQTTProtocol()  
        else:  
            raise ValueError(f"Unknown protocol type: {protocol_type}")  
  
class ProtocolManager:  
    def __init__(self):  
        self.protocols = {}  
  
    async def connect(self, protocol_type):  
        protocol_instance = await ProtocolFactory.create_protocol(protocol_type)  
        self.protocols[protocol_type] = protocol_instance  
        asyncio.create_task(protocol_instance.receive())  
  
    async def send_data(self, protocol_type, data):  
        if protocol_type in self.protocols:  
            await self.protocols[protocol_type].send_data(data)  
        else:  
            print(f"No connection established for protocol: {protocol_type}")  
  
    async def receive_data(self, protocol_type):  
        if protocol_type in self.protocols:  
            protocol_instance = self.protocols[protocol_type]  
            return await protocol_instance.get_received_data()  
        else:  
            print(f"No connection established for protocol: {protocol_type}")  
            return None  
  
# 使用示例  
async def main():  
    manager = ProtocolManager()  
    await manager.connect('tcp')  
    await manager.connect('mqtt')  
  
    # 发送数据  
    await manager.send_data('tcp', 'Hello TCP!')  
    await manager.send_data('mqtt', 'Hello MQTT!')  
  
    # 接收数据  
    tcp_data = await manager.receive_data('tcp')  
    mqtt_data = await manager.receive_data('mqtt')  
    print(f"TCP received: {tcp_data}")  
    print(f"MQTT received: {mqtt_data}")  
  
# 运行事件循环  
asyncio.run(main())

修改后的代码中,send_data方法现在只是简单地打印出要发送的数据和协议类型,以模拟发送逻辑。在实际应用中,需要在这些方法中加入真正的网络发送和接收逻辑。

运行代码,结果如下:

 
至此,一个简易的协议管理框架已经大家完毕,下一篇会基于该框架进行更多的物联网协议测试,以实现网关基础的收发功能。

 

最新回复

感觉米尔的开发板硬件设计达不到工业应用环境的要求   详情 回复 发表于 2024-4-26 09:15
点赞 关注(1)
 

回复
举报

6822

帖子

0

TA的资源

五彩晶圆(高级)

沙发
 

米尔-全志 T527 开发板做的还算牛叉

点评

感觉米尔的开发板硬件设计达不到工业应用环境的要求  详情 回复 发表于 2024-4-26 09:15
 
 

回复

1286

帖子

4

TA的资源

版主

板凳
 
Jacktang 发表于 2024-3-25 07:27 米尔-全志 T527 开发板做的还算牛叉

感觉米尔的开发板硬件设计达不到工业应用环境的要求

 
 

回复
您需要登录后才可以回帖 登录 | 注册

随便看看
查找数据手册?

EEWorld Datasheet 技术支持

相关文章 更多>>
关闭
站长推荐上一条 1/8 下一条

 
EEWorld订阅号

 
EEWorld服务号

 
汽车开发圈

About Us 关于我们 客户服务 联系方式 器件索引 网站地图 最新更新 手机版

站点相关: 国产芯 安防电子 汽车电子 手机便携 工业控制 家用电子 医疗电子 测试测量 网络通信 物联网

北京市海淀区中关村大街18号B座15层1530室 电话:(010)82350740 邮编:100190

电子工程世界版权所有 京B2-20211791 京ICP备10001474号-1 电信业务审批[2006]字第258号函 京公网安备 11010802033920号 Copyright © 2005-2025 EEWORLD.com.cn, Inc. All rights reserved
快速回复 返回顶部 返回列表