【2024 DigiKey创意大赛】
<p> </p><p><strong>前言</strong></p>
<p>我们的系统是一个智能猫咪行为监测系统,主要由树莓派和视觉传感器组成。树莓派作为系统的核心处理单元,负责运行程序、处理数据和通信。视觉传感器安装在树莓派上,用于捕捉猫咪的活动画面。通过视觉识别算法,实时处理图像并识别跟踪猫咪的活动。Gemini多模态模型被用来分析这些图像和视频,识别猫咪的各种行为,如吃东西、玩耍、睡觉等,并将这些行为数据记录下来,形成猫咪的行为日志。长时间的记录可以帮助识别猫咪的日常行为模式,提供行为健康分析。这个系统支持远程监控,宠物主人可以通过手机或电脑远程查看猫咪的行为记录,了解其健康状态和日常活动。通过行为数据分析,系统可以早期发现猫咪的异常行为,及时采取健康干预措施。同时,系统的数据存储在本地或通过加密的方式上传到云端,确保数据的隐私和安全。这个系统不仅能帮助宠物主人更好地了解和照顾他们的猫咪,还能为宠物健康和行为研究提供有价值的数据。</p>
<p><strong>设计</strong></p>
<p> </p>
<p><strong>实现</strong></p>
<p>这段代码创建了一个网页,允许用户通过WebRTC技术启动摄像头并将视频流发送到一个WebSocket服务器。它首先获取用户的摄像头流,然后通过WebSocket连接到服务器,创建一个PeerConnection来管理视频流的传输。代码还处理了ICE候选者和信令过程,以确保视频流能够正确地传输到服务器。</p>
<pre>
<code class="language-html"><!DOCTYPE html>
<html>
<head>
<title>WebRTC Camera Stream</title>
<script src="https://webrtc.github.io/adapter/adapter-latest.js"></script>
</head>
<body>
<video id="localVideo" autoplay muted></video>
<button id="startButton">Start</button>
<script type="text/javascript">
'use strict';
const localVideo = document.querySelector('video#localVideo');
const startButton = document.querySelector('button#startButton');
let localStream, pc, ws;
startButton.onclick = async () => {
try {
// 获取摄像头流
const stream = await navigator.mediaDevices.getUserMedia({video: true, audio: false});
localVideo.srcObject = stream;
localStream = stream;
// 初始化WebSocket连接
ws = new WebSocket('ws://localhost:8765');
ws.onopen = () => {
console.log('WebSocket connection established');
// 创建PeerConnection
pc = new RTCPeerConnection({
iceServers: [
{urls: 'stun:stun.l.google.com:19302'}
]
});
// 添加本地流到PeerConnection
localStream.getTracks().forEach(track => pc.addTrack(track, localStream));
// 处理ICE候选者
pc.onicecandidate = event => {
if (event.candidate) {
ws.send(JSON.stringify({type: 'candidate', candidate: event.candidate}));
}
};
// 创建offer
const offer = await pc.createOffer();
await pc.setLocalDescription(offer);
// 发送offer到信令服务器
ws.send(JSON.stringify({type: 'offer', sdp: pc.localDescription}));
};
ws.onmessage = async (event) => {
const data = JSON.parse(event.data);
if (data.type === 'answer') {
// 设置远程描述
await pc.setRemoteDescription(new RTCSessionDescription(data));
} else if (data.type === 'candidate') {
// 添加ICE候选者
await pc.addIceCandidate(new RTCIceCandidate(data.candidate));
}
};
ws.onclose = () => console.log('WebSocket connection closed');
ws.onerror = (error) => console.error('WebSocket error:', error);
} catch (e) {
console.error('getUserMedia() error:', e);
}
};
</script>
</body>
</html>
</code></pre>
<p>这段代码实现了一个WebSocket服务器,用于处理WebRTC视频流。它创建了一个异步函数来处理客户端的offer,设置远程描述,创建并发送answer,同时处理ICE候选者。服务器还负责接收视频轨道,利用OpenCV将WebRTC视频帧转换并保存为图片。最后,服务器通过WebSocket监听客户端的信令消息,处理offer和candidate,并保持运行以接收新的连接。</p>
<pre>
<code class="language-python">import asyncio
import json
import websockets
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate
import cv2
import numpy as np
pcs = set()
room = 'default-room'
async def handle_offer(offer):
pc = RTCPeerConnection()
pcs.add(pc)
@pc.on("icecandidate")
async def on_icecandidate(candidate):
if candidate:
await send_to_client(json.dumps({"type": "candidate", "candidate": candidate.to_json()}))
@pc.on("track")
def on_track(track):
print("Track %s received" % track.kind)
# 处理视频帧
async def process_frame():
while True:
frame = await track.recv()
if frame:
# 将WebRTC帧转换为OpenCV格式
frame = cv2.cvtColor(np.frombuffer(frame, dtype=np.uint8).reshape((frame.height, frame.width, 3)), cv2.COLOR_RGBA2BGR)
# 保存图片
cv2.imwrite('received_frame.jpg', frame)
print("Frame saved")
asyncio.ensure_future(process_frame())
await pc.setRemoteDescription(RTCSessionDescription(offer, type="offer"))
answer = await pc.createAnswer()
await pc.setLocalDescription(answer)
return pc.localDescription.sdp
async def send_to_client(message):
# 这里应该实现向客户端发送消息的逻辑
pass
async def signal(websocket, path):
try:
async for message in websocket:
data = json.loads(message)
if data['type'] == 'offer':
answer = await handle_offer(data['sdp'])
await websocket.send(json.dumps({"type": "answer", "sdp": answer}))
elif data['type'] == 'candidate':
for pc in pcs:
await pc.addIceCandidate(RTCIceCandidate.from_json(data['candidate']))
finally:
# 处理客户端断开连接
pass
start_server = websockets.serve(signal, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
</code></pre>
<p>这段代码实现了一个简单的WebRTC信令服务器。它通过WebSocket处理客户端的连接和消息。服务器允许客户端加入房间,并在同一房间内的客户端之间转发offer、answer和ICE候选者消息。每个客户端连接时会被注册到一个字典中,包含其所在的房间信息。服务器会监听客户端的消息,并根据消息类型进行相应的转发操作。当客户端断开连接时,服务器会将其从字典中移除。</p>
<pre>
<code class="language-python">import asyncio
import json
import websockets
clients = {}
async def signal(websocket, path):
try:
# Register the client
clients = None
print("New client connected")
async for message in websocket:
data = json.loads(message)
action = data.get('action')
if action == 'join':
# Store the room ID for this client
clients = data['room']
elif action == 'offer' or action == 'answer':
# Forward the offer or answer to the other client in the same room
for client in clients:
if clients == clients and client != websocket:
await client.send(json.dumps(data))
break
elif action == 'ice-candidate':
# Forward ICE candidate to the other client in the same room
for client in clients:
if clients == clients and client != websocket:
await client.send(json.dumps(data))
break
finally:
# Unregister the client
if websocket in clients:
del clients
print("Client disconnected")
start_server = websockets.serve(signal, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
</code></pre>
<p>然后从数据库中生成网页报告</p>
<pre>
<code class="language-python">import base64
# 生成HTML内容
def generate_html(imgs, describe):
html_content = """
```html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>图片与描述展示</title>
<style>
.image-container {
display: flex;
flex-wrap: wrap;
justify-content: space-around;
}
.image-item {
border: 1px solid #ddd;
border-radius: 8px;
padding: 10px;
margin: 10px;
text-align: center;
}
.image-item img {
max-width: 300px;
max-height: 300px;
width: auto;
height: auto;
}
</style>
</head>
<body>
<div class="image-container">
"""
for img, desc in zip(imgs, describe):
# 将base64编码的图片数据插入到img标签中
html_content += f"""
<div class="image-item">
<img src="data:image/jpeg;base64,{img}" alt="{desc}">
<p>{desc}</p>
</div>
"""
html_content += """
</div>
</body>
</html>
</code></pre>
<p> </p>
<p> </p>
<p>测试视频获取。</p>
<p><a href="https://www.bilibili.com/video/BV1pA4m1V7WP/?spm_id_from=333.337.search-card.all.click&vd_source=c13e5621e1c5e79fc4965d6a679342eb" target="_blank">https://www.bilibili.com/video/BV1pA4m1V7WP/?spm_id_from=333.337.search-card.all.click&vd_source=c13e5621e1c5e79fc4965d6a679342eb</a></p>
<p>我们使用you-get项目获取上面的测试视频。<br />
</p>
<p>很想知道ICE候选者和信令过程代码实现的流程</p>
页:
[1]