在第一篇中在米尔T527开发板自带的安卓系统上搭建了linux开发环境,并测试了mqtt协议。本篇我们进一步实现一个简易的协议管理器,这对于边缘网关来说很重要。因为在边缘网关中通常需要处理各种常见的物联网通讯协议,一个优秀的协议管理器对项目成败至关重要。因为用来做产品原型测试,所以我们继续用python作为开发语言,原因无他,主要是因为生态好,库多,上手快。为了简单起见,接下来我们主要展示如何同时管理socket 和 MQTT,并根据不同的协议进行数据的收发。
因为要管理多种协议,我们自然想到使用工厂模式实现 ProtocolManager 类,从而可以使代码更加灵活和可扩展。工厂模式允许我们根据传入的参数动态地创建和返回协议实例,而不需要在 ProtocolManager 中硬编码所有协议的具体实现。
class Protocol:
def send(self, data):
raise NotImplementedError("Protocol must implement send method")
def receive(self):
raise NotImplementedError("Protocol must implement receive method")
class TCPProtocol(Protocol):
def __init__(self, host, port):
self.host = host
self.port = port
# 初始化 TCP 连接等
def send(self, data):
# 实现 TCP 发送逻辑
pass
def receive(self):
# 实现 TCP 接收逻辑
pass
class MQTTProtocol(Protocol):
def __init__(self, broker_address):
self.broker_address = broker_address
# 初始化 MQTT 连接等
def send(self, data):
# 实现 MQTT 发送逻辑
pass
def receive(self):
# 实现 MQTT 接收逻辑
pass
class ProtocolFactory:
@staticmethod
def create_protocol(protocol_type, *args, **kwargs):
if protocol_type == 'tcp':
return TCPProtocol(*args, **kwargs)
elif protocol_type == 'mqtt':
return MQTTProtocol(*args, **kwargs)
else:
raise ValueError(f"Unknown protocol type: {protocol_type}")
class ProtocolManager:
def __init__(self):
self.protocols = {}
def connect(self, protocol_type, *args, **kwargs):
protocol_instance = ProtocolFactory.create_protocol(protocol_type, *args, **kwargs)
self.protocols[protocol_type] = protocol_instance
def send_data(self, protocol_type, data):
if protocol_type in self.protocols:
protocol_instance = self.protocols[protocol_type]
protocol_instance.send(data)
else:
print(f"No connection established for protocol: {protocol_type}")
def receive_data(self, protocol_type):
if protocol_type in self.protocols:
protocol_instance = self.protocols[protocol_type]
return protocol_instance.receive()
else:
print(f"No connection established for protocol: {protocol_type}")
return None
# 使用示例
manager = ProtocolManager()
# 连接 TCP 和 MQTT
manager.connect('tcp', 'localhost', 12345)
manager.connect('mqtt', 'localhost')
# 发送数据
manager.send_data('tcp', b'Hello, TCP!')
manager.send_data('mqtt', 'Hello, MQTT!')
# 接收数据(这里只是示例,实际使用时可能需要异步接收或轮询)
tcp_data = manager.receive_data('tcp')
mqtt_data = manager.receive_data('mqtt')
print(f"Received TCP data: {tcp_data}")
print(f"Received MQTT data: {mqtt_data}")
在上面的示例中,我们定义了 Protocol 基类,并创建了两个继承自 Protocol 的具体协议类 TCPProtocol 和 MQTTProtocol。ProtocolFactory 类是一个静态工厂,负责根据传入的协议类型创建相应的协议实例。ProtocolManager 类使用 connect 方法通过工厂创建协议实例,并将其存储在 protocols 字典中。send_data 和 receive_data 方法现在根据协议类型从字典中获取协议实例,并调用其 send 和 receive 方法。这种方式使得添加新的协议变得非常简单,只需要定义一个新的协议类,并在 ProtocolFactory 中添加相应的创建逻辑即可,而不需要修改 ProtocolManager 类的代码。
但是在实际应用中,接收数据通常涉及到监听数据流的异步事件或回调,并且通常使用队列或缓存来存储接收到的数据,直到它们被处理。python中可以使用asyncio库和queue模块来实现异步队列, 我们将上述接收数据部分改写为异步缓存处理,修改后的代码如下:
import asyncio
import queue
class Protocol:
async def receive(self):
raise NotImplementedError
async def send_data(self, data):
raise NotImplementedError
class AsyncProtocol(Protocol):
def __init__(self):
self.receive_queue = asyncio.Queue()
async def receive(self):
while True:
data = await self._receive_data()
await self.receive_queue.put(data)
async def _receive_data(self):
# 这里模拟从网络或其他IO源接收数据
await asyncio.sleep(1)
return f"Received data from {self.__class__.__name__}"
async def get_received_data(self):
return await self.receive_queue.get()
async def send_data(self, data):
# 这里应该实现具体的发送逻辑
# 暂时打印数据作为模拟
print(f"Sending data to {self.__class__.__name__}: {data}")
class TCPProtocol(AsyncProtocol):
pass
class MQTTProtocol(AsyncProtocol):
pass
class ProtocolFactory:
@staticmethod
async def create_protocol(protocol_type):
if protocol_type == 'tcp':
return TCPProtocol()
elif protocol_type == 'mqtt':
return MQTTProtocol()
else:
raise ValueError(f"Unknown protocol type: {protocol_type}")
class ProtocolManager:
def __init__(self):
self.protocols = {}
async def connect(self, protocol_type):
protocol_instance = await ProtocolFactory.create_protocol(protocol_type)
self.protocols[protocol_type] = protocol_instance
asyncio.create_task(protocol_instance.receive())
async def send_data(self, protocol_type, data):
if protocol_type in self.protocols:
await self.protocols[protocol_type].send_data(data)
else:
print(f"No connection established for protocol: {protocol_type}")
async def receive_data(self, protocol_type):
if protocol_type in self.protocols:
protocol_instance = self.protocols[protocol_type]
return await protocol_instance.get_received_data()
else:
print(f"No connection established for protocol: {protocol_type}")
return None
# 使用示例
async def main():
manager = ProtocolManager()
await manager.connect('tcp')
await manager.connect('mqtt')
# 发送数据
await manager.send_data('tcp', 'Hello TCP!')
await manager.send_data('mqtt', 'Hello MQTT!')
# 接收数据
tcp_data = await manager.receive_data('tcp')
mqtt_data = await manager.receive_data('mqtt')
print(f"TCP received: {tcp_data}")
print(f"MQTT received: {mqtt_data}")
# 运行事件循环
asyncio.run(main())
修改后的代码中,send_data方法现在只是简单地打印出要发送的数据和协议类型,以模拟发送逻辑。在实际应用中,需要在这些方法中加入真正的网络发送和接收逻辑。
运行代码,结果如下:
至此,一个简易的协议管理框架已经大家完毕,下一篇会基于该框架进行更多的物联网协议测试,以实现网关基础的收发功能。