【2024 DigiKey创意大赛】
<p> </p><p><strong>前言</strong></p>
<p> </p>
<code class="language-html"><!DOCTYPE html>
<title>WebRTC Camera Stream</title>
<script src="https://webrtc.github.io/adapter/adapter-latest.js"></script>
<video id="localVideo" autoplay muted></video>
<button id="startButton">Start</button>
<script type="text/javascript">
'use strict';

// Page elements: the local preview and the button that starts streaming.
const localVideo = document.querySelector('video#localVideo');
const startButton = document.querySelector('button#startButton');

// Connection state shared by the event handlers below.
let localStream;
let pc;
let ws;

/**
 * Starts the session when the button is clicked: captures the camera, opens
 * the signaling WebSocket and, once it is open, sends a WebRTC offer.
 */
startButton.onclick = async () => {
  // Prevent a second click from opening a parallel stream/socket/connection.
  startButton.disabled = true;
  try {
    // Capture the camera stream (video only) and show it in the preview.
    const stream = await navigator.mediaDevices.getUserMedia({video: true, audio: false});
    localVideo.srcObject = stream;
    localStream = stream;

    // Open the signaling WebSocket connection.
    ws = new WebSocket('ws://localhost:8765');

    // The handler is async because creating the offer requires `await`.
    ws.onopen = async () => {
      console.log('WebSocket connection established');
      try {
        // Create the peer connection.
        pc = new RTCPeerConnection({
          iceServers: [
            {urls: 'stun:stun.l.google.com:19302'},
          ],
        });

        // Attach every local track to the peer connection.
        localStream.getTracks().forEach((track) => pc.addTrack(track, localStream));

        // Forward each locally gathered ICE candidate to the signaling server.
        pc.onicecandidate = (event) => {
          if (event.candidate) {
            ws.send(JSON.stringify({type: 'candidate', candidate: event.candidate}));
          }
        };

        // Create the offer and apply it as the local description.
        const offer = await pc.createOffer();
        await pc.setLocalDescription(offer);

        // Send the offer to the signaling server. `sdp` carries the SDP text
        // itself, mirroring the answer message whose `sdp` is also a string.
        ws.send(JSON.stringify({type: 'offer', sdp: pc.localDescription.sdp}));
      } catch (e) {
        console.error('Failed to create the WebRTC offer:', e);
      }
    };

    ws.onmessage = async (event) => {
      try {
        const data = JSON.parse(event.data);
        if (data.type === 'answer') {
          // Apply the remote description.
          await pc.setRemoteDescription(new RTCSessionDescription(data));
        } else if (data.type === 'candidate') {
          // Add the remote ICE candidate.
          await pc.addIceCandidate(new RTCIceCandidate(data.candidate));
        }
      } catch (e) {
        console.error('Failed to handle signaling message:', e);
      }
    };

    ws.onclose = () => console.log('WebSocket connection closed');
    ws.onerror = (error) => console.error('WebSocket error:', error);
  } catch (e) {
    console.error('getUserMedia() error:', e);
    // Allow a retry after a failed start.
    startButton.disabled = false;
  }
};
<code class="language-python">import asyncio
import json
import websockets
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate
import cv2
import numpy as np
# Module-level set of peer connections; signal() iterates over it when it
# applies an incoming ICE candidate.
pcs = set()
# Room label; nothing in the visible snippet reads it — TODO confirm it is still needed.
room = 'default-room'
async def handle_offer(offer):
    """Create a peer connection for a browser offer and return the answer SDP.

    Args:
        offer: The offer SDP as a string, or a dict with an ``"sdp"`` key
            (the shape produced by serialising a browser session description).

    Returns:
        The SDP text of the local answer description.
    """
    # Local import so this block does not depend on the file-level import list.
    from aiortc.mediastreams import MediaStreamError

    pc = RTCPeerConnection()
    # Register the connection so signal() can route ICE candidates to it.
    pcs.add(pc)

    @pc.on("icecandidate")
    async def on_icecandidate(candidate):
        # NOTE(review): aiortc appears to gather its candidates while setting the
        # local description and may never emit this event — confirm before
        # relying on this handler.
        if candidate:
            await send_to_client(json.dumps({"type": "candidate", "candidate": candidate.to_json()}))

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        # Drop finished connections so the shared set does not grow without bound.
        if pc.connectionState in ("failed", "closed"):
            await pc.close()
            pcs.discard(pc)

    @pc.on("track")
    def on_track(track):
        print("Track %s received" % track.kind)
        if track.kind != "video":
            return

        # Process video frames until the track ends.
        async def process_frame():
            while True:
                try:
                    frame = await track.recv()
                except MediaStreamError:
                    print("Track %s ended" % track.kind)
                    return
                # Convert the received frame into an OpenCV-compatible BGR array.
                image = frame.to_ndarray(format="bgr24")
                # Save the picture (each frame overwrites the previous file).
                cv2.imwrite('received_frame.jpg', image)
                print("Frame saved")

        # Run the frame loop in the background; on_track itself must not block.
        asyncio.ensure_future(process_frame())

    # Accept both the raw SDP text and a {"type": ..., "sdp": ...} mapping.
    sdp = offer["sdp"] if isinstance(offer, dict) else offer
    await pc.setRemoteDescription(RTCSessionDescription(sdp, type="offer"))
    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)
    return pc.localDescription.sdp
async def send_to_client(message):
    """Placeholder for pushing a signaling message to the browser client.

    Args:
        message: The JSON text that should be delivered to the client.

    Returns:
        None. The message is currently dropped.
    """
    # TODO: implement the logic that sends the message to the client here.
    # Until then the message is dropped, with a log line so the gap is visible.
    print("send_to_client is not implemented; message dropped")
async def signal(websocket, path=None):
    """Handle signaling messages from one WebSocket client.

    An ``offer`` message is answered with the SDP returned by handle_offer();
    a ``candidate`` message is applied to every registered peer connection.

    Args:
        websocket: The client connection; iterated for incoming messages.
        path: Request path. Optional so that the handler also works when the
            server calls it with the connection only.
    """
    # Local import so this block does not depend on the file-level import list.
    from aiortc.sdp import candidate_from_sdp

    try:
        async for message in websocket:
            data = json.loads(message)
            kind = data.get('type')
            if kind == 'offer':
                answer = await handle_offer(data['sdp'])
                await websocket.send(json.dumps({"type": "answer", "sdp": answer}))
            elif kind == 'candidate':
                payload = data.get('candidate') or {}
                line = payload.get('candidate')
                if not line:
                    # An empty candidate marks the end of gathering; nothing to add.
                    continue
                # The browser sends "candidate:<fields>"; the parser takes the fields only.
                candidate = candidate_from_sdp(line.split(':', 1)[-1])
                candidate.sdpMid = payload.get('sdpMid')
                candidate.sdpMLineIndex = payload.get('sdpMLineIndex')
                # Iterate over a copy: the set may change while awaiting.
                for pc in list(pcs):
                    await pc.addIceCandidate(candidate)
    except websockets.ConnectionClosed:
        # Handle the client disconnecting abruptly.
        print("Client disconnected")
# Create the WebSocket server for signal() on localhost:8765.
# NOTE(review): nothing visible in this snippet awaits start_server or runs an
# event loop — confirm that the startup code exists elsewhere.
start_server = websockets.serve(signal, "localhost", 8765)
<code class="language-python">import asyncio
import json
import websockets
# Maps each connected websocket to the room it joined (None until it joins).
clients = {}


async def signal(websocket, path=None):
    """Relay signaling messages between the clients of one room.

    A ``join`` action stores the sender's room. ``offer``, ``answer`` and
    ``ice-candidate`` actions are forwarded unchanged to every other client
    registered in the same room.

    Args:
        websocket: The client connection; iterated for incoming messages.
        path: Request path. Optional so that the handler also works when the
            server calls it with the connection only.
    """
    # Register the client; it has no room until it sends a join action.
    clients[websocket] = None
    print("New client connected")
    try:
        async for message in websocket:
            data = json.loads(message)
            action = data.get('action')
            if action == 'join':
                # Store the room ID for this client.
                clients[websocket] = data.get('room')
            elif action in ('offer', 'answer', 'ice-candidate'):
                room = clients.get(websocket)
                if room is None:
                    # A client that has not joined shares a room with nobody.
                    continue
                # Forward the message to the other clients in the same room.
                # Iterate over a copy: the registry may change while awaiting.
                for client, client_room in list(clients.items()):
                    if client_room == room and client is not websocket:
                        await client.send(json.dumps(data))
    finally:
        # Unregister the client, even when the connection ended with an error.
        clients.pop(websocket, None)
        print("Client disconnected")
# Create the WebSocket server for the relay handler signal() on localhost:8765.
# NOTE(review): nothing visible in this snippet awaits start_server or runs an
# event loop — confirm that the startup code exists elsewhere.
start_server = websockets.serve(signal, "localhost", 8765)
<code class="language-python">import base64
# 生成HTML内容
def generate_html(imgs, describe):
    """Build a standalone HTML gallery page from base64-encoded JPEG images.

    Args:
        imgs: Iterable of base64-encoded JPEG payloads (``str`` or ``bytes``).
        describe: Iterable of descriptions, paired with ``imgs`` by position;
            each one becomes the ``alt`` text of its image.

    Returns:
        The complete HTML document as a string. Pairing stops at the shorter
        of the two iterables.
    """
    # Local import so this block does not depend on the file-level import list.
    from html import escape

    html_content = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<style>
.image-container {
    display: flex;
    flex-wrap: wrap;
    justify-content: space-around;
}
.image-item {
    border: 1px solid #ddd;
    border-radius: 8px;
    padding: 10px;
    margin: 10px;
    text-align: center;
}
.image-item img {
    max-width: 300px;
    max-height: 300px;
    width: auto;
    height: auto;
}
</style>
</head>
<body>
<div class="image-container">
"""
    for img, desc in zip(imgs, describe):
        # base64.b64encode() returns bytes; decode so no "b'...'" repr leaks into the page.
        if isinstance(img, (bytes, bytearray)):
            img = bytes(img).decode("ascii")
        # Insert the base64-encoded image data into the img tag. Both values are
        # escaped so that quotes or markup cannot break out of the attributes.
        html_content += f"""<div class="image-item">
<img src="data:image/jpeg;base64,{escape(str(img))}" alt="{escape(str(desc))}">
</div>
"""
    html_content += """</div>
</body>
</html>
"""
    return html_content
<p> </p>
<p> </p>
<p><a href="https://www.bilibili.com/video/BV1pA4m1V7WP/?spm_id_from=333.337.search-card.all.click&vd_source=c13e5621e1c5e79fc4965d6a679342eb" target="_blank">https://www.bilibili.com/video/BV1pA4m1V7WP/?spm_id_from=333.337.search-card.all.click&vd_source=c13e5621e1c5e79fc4965d6a679342eb</a></p>
<p>我们使用you-get项目获取上面的测试视频。<br />