《三十六计》有计名为“调虎离山”,解辞云:待天以困之,用人以诱之。
前文书中说到,楼主解决了几个核心的问题,准备完成整个系统了。但是据主办方要求,这次要按项目背景、作品简介、系统框图、各部分功能说明、视频演示、项目源码、发布的博文、项目总结共8个部分交作业,所以,准备听书的客官恐怕要失望了。下面把依题而做的八股文章献于众民公面前,各位上眼啊……
作品名称:泄洪道安全警告控制系统
作者:nemon
一、项目背景
常言道“水火无情”。近年来由于洪水造成的灾害多发,其中由于在泄洪道逗留、游玩甚至居住导致财产损失、人身伤亡事故的屡有发生。如2022年8月14日四川省彭州市龙漕沟突发洪水,造成7人死亡、8人轻伤的悲剧,重要原因之一就是把泄洪地当成“网红打卡地”且在灾害即将发生前未及时撤离。
我国为了实现“双碳”目标,近年来在可再生能源领域持续发力,传统的水电也获得了更多重视。水电生产运营过程中,特别是汛期到来时,必须及时发现泄洪道上的滞留人员,否则极易发生人身伤亡事故。由于水电企业特点,长期驻场人员数量不是很多,因此有必要采取自动化手段监控泄洪道,降低安全风险。
二、作品简介
"泄洪道安全警告控制系统"由3部分组成:
1、基于LicheePi 4A的探测模块:
负责拾取图像。使用Yolo V5检测摄像头视野中是否存在人员,并将结果通过UDP广播出去。
2、在PC环境里运行的广播驱离模块:
负责播放驱离音频。在接到存在人员的UDP报文后,播放3遍MP3文件,内容是“泄洪道有危险,请滞留人员马上撤离!”
3、用Pico W演示泄洪闸门开启控制模块:
负责控制闸门的开启。收到不存在人员的UDP报文后,led点亮,可以按按钮控制闸门开启;当接到存在人员的UDP报文后,只有按下强制开启按钮才能打卡闸门。
三、系统框图
硬件设备功能见“作品简介”,不再赘述。
四、各部分功能说明
在不同的硬件设备上,部署各自的软件系统,每个系统里有多个部分,实现各自的功能:
2.1、基于LicheePi 4A的探测模块:
-
监控图像获取:负责从USB摄像头获取图像并保存文件。
-
检测存在人员:使用Yolo V5检测图片文件中是否存在人员;
-
广播监测结果:将检测结果通过UDP广播出去。
2.2、在PC环境里运行的广播驱离模块:
-
收听广播结果:在接到是否存在人员的UDP报文后,判断是否调用“驱离音频播放”;
-
驱离音频播放:播放3遍MP3文件,内容是“泄洪道有危险,请滞留人员马上撤离!”
2.3、用Pico W演示泄洪闸门开启控制模块:
-
收听广播结果:在接到是否存在人员的UDP报文后,更新记录是否存在滞留人员的is_any_person变量,如果没有人则点亮led,否则熄灭;
-
控制舵机角度:舵机角度对应的是闸门的开闭;
-
响应按钮中断:如果是强制打开按钮,则调用“控制舵机角度”功能打开闸门,否则判断是否存在滞留人员的is_any_person变量,如果有人就关闭闸门,否则打开闸门。
五、视频演示(视频简介+链接)
https://www.bilibili.com/video/BV1ac411D7QX/
六、项目源码
1、LicheePi 4A的 监控图像获取探测模块:
import cv2,time
v=cv2.VideoCapture(0)
while v.isOpened():
try:
rtn,fra = v.read()
if rtn :
cv2.imwrite('/home/sipeed/cap.jpg',fra)
except:
pass
time.sleep(10)
v.release()
2、LicheePi 4A的检测存在人员和广播监测结果模块:
import numpy as np
import cv2
import os
from socket import *
import time
#UDP 参数
port = 5000
address = ("<broadcast>", port)
s = socket(AF_INET, SOCK_DGRAM)
s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
def image_preprocess(image, target_size):
ih, iw = target_size
h, w, _ = image.shape
scale = min(iw/w, ih/h)
nw, nh = int(scale * w), int(scale * h)
image_resized = cv2.resize(image, (nw, nh))
image_padded = np.full(shape=[ih, iw, 3], fill_value=128.0)
dw, dh = (iw - nw) // 2, (ih-nh) // 2
image_padded[dh:nh+dh, dw:nw+dw, :] = image_resized
return image_padded
def read_class_names(class_file_name):
'''loads class name from a file'''
names = {}
with open(class_file_name, 'r') as data:
for ID, name in enumerate(data):
names[ID] = name.strip('\n')
return names
def check_result_has_person(bboxes):
"""
bboxes: [x_min, y_min, x_max, y_max, probability, cls_id] format coordinates.
"""
for i, bbox in enumerate(bboxes):
class_ind = int(bbox[5])
score = bbox[4]
if class_ind==0 and score>=.6 :
return True
return False
# input size
input_hight = 384
input_width = 640
while True:
# 获取图像
#v=cv2.VideoCapture(0)
#v.isOpened()
#o,original_image = v.read()
original_image = cv2.imread( '/home/sipeed/cap.jpg')
#cv2.imwrite('/home/sipeed/tmp.jpg',original_image)
# 调整大小
rgb_image = cv2.cvtColor(original_image, cv2.COLOR_BGR2RGB)
image_preprocessed = image_preprocess(np.copy(rgb_image), [input_hight, input_width])
# 归一化
image_preprocessed = image_preprocessed / 255.0
img_ndarray = np.array(image_preprocessed).astype("float32")
img_ndarray = img_ndarray.transpose(2, 0, 1)
img_ndarray.tofile("image_preprocessed.tensor", "\n")
img_ndarray.tofile("image_preprocessed.bin")
#print(" ******* run yolov5 and postprocess *******")
# 调用处理
model_inference_command = "./yolov5n_example ./hhb_out/hhb.bm image_preprocessed.bin"
os.system(model_inference_command)
#输出
bboxes = []
with open("detect.txt", 'r') as f:
x_min = f.readline().strip()
while x_min:
y_min = f.readline().strip()
x_max = f.readline().strip()
y_max = f.readline().strip()
probability = f.readline().strip()
cls_id = f.readline().strip()
bbox = [float(x_min), float(y_min), float(x_max), float(y_max), float(probability), int(cls_id)]
print(bbox)
bboxes.append(bbox)
x_min = f.readline().strip()
# 输出
flag = check_result_has_person( bboxes)
if flag:
s.sendto("1".encode(), address)
else:
s.sendto("0".encode(), address)
3、在PC环境里运行的广播驱离模块:
# coding=UTF-8
import pygame
pygame.mixer.init()
pygame.mixer.music.load("danger.mp3")
from socket import *
import time
# IP地址为空""表示接收任何网段的广播消息
# IP地址也可以填 "0.0.0.0"
address = ("0.0.0.0", 5000)
# 创建流式socket
s = socket(AF_INET, SOCK_DGRAM)
# 设置socket属性
s.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
s.settimeout(3)
# 绑定本地ip地址和端口
s.bind(address)
print("wait recv...")
while True:
# 接收消息
try:
data, address_rx = s.recvfrom(65536)
print("[recv form %s:%d]:%s" % (address_rx[0], address_rx[1], data))
buf=data.decode('utf-8')
print(buf,buf[0]=="0")
if buf[0]!="0":
for _ in range(3):
pygame.mixer.music.play()
while pygame.mixer.music.get_busy() == True:
pass
time.sleep(0.01)
except:
pass
# 关闭socket
s.close()
4、用Pico W演示泄洪闸门开启控制模块:
from machine import Pin,PWM
import network
import socket
import time,utime
class STATE:
OPEN = 1
CLOSE = 0
# 常数定义
ssid = "FAST_538C80"
key = "12345678"
led = Pin('WL_GPIO0', Pin.OUT) # 板载LED连到WL_GPIO0
led.value(0) # 板载LED熄灭
"""
将伺服电机的电源线连接到 Raspberry Pi Pico W 上的 3.3V 引脚。
将伺服电机的地线连接到 Raspberry Pi Pico W 上的任意 GND 引脚。
将伺服电机的控制信号线连接到 Raspberry Pi Pico W 上的 GPIO0 引脚。
"""
pwm = PWM(Pin(0))
pwm.freq(50)
def setServoCycle(position):
pwm.duty_u16(position)
time.sleep(0.01)
def setServoDegree(deg):
setServoCycle(1000+int(deg*400/9))#1000 + int( deg*8000/180 )
print("setServoDegree:",deg)
is_any_person =True
# 定义连接WiFi函数
def connect():
global wlan
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, key)
while wlan.isconnected() == False:
print("等待连接...")
time.sleep(1)
print("已连接...")
connect()
port = 5000
UDP = ("", port)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(UDP)
# 舵机控制
def ServoCtrl(state = STATE.CLOSE):
if state == STATE.OPEN:
setServoDegree(180)
else:
setServoDegree(0)
# 响应按钮中断
def hadleBtnPressed20(pin):
#print("hadleBtnPressed--------20",pin)
#p20.irq(None, Pin.IRQ_FALLING)
if is_any_person:
ServoCtrl(STATE.CLOSE)
else:
ServoCtrl(STATE.OPEN)
#time.sleep(0.1)
#p20.irq(hadleBtnPressed20, Pin.IRQ_FALLING)
def hadleBtnPressed21(pin):
#print("hadleBtnPressed----21",pin)
#p21.irq(None, Pin.IRQ_FALLING)
ServoCtrl(STATE.OPEN)
#time.sleep(0.1)
#p21.irq(hadleBtnPressed21, Pin.IRQ_FALLING)
#注册按钮中断
def button_irq_handler(btn_number,pin_object):
remove_button_irq_handler(p20)
remove_button_irq_handler(p21)
print("pin:",btn_number," IRQ with flags:", pin_object.irq().flags(),utime.localtime(),pin_object)
if btn_number == 20 :
hadleBtnPressed20(20)
else:
hadleBtnPressed21(21)
#self.update_screen()
utime.sleep_ms(300)
#恢复中断
set_button_irq(20,p20)
set_button_irq(21,p21)
def set_button_irq(btn_number,pin_object):
pin_object.irq(lambda pin: button_irq_handler(btn_number,pin), Pin.IRQ_FALLING)
def remove_button_irq_handler(pin_object):
pin_object.irq(lambda pin: pin, Pin.IRQ_FALLING)
p20 = Pin(20, Pin.IN, Pin.PULL_UP)
p21 = Pin(21, Pin.IN, Pin.PULL_UP)
###setup()
#global msg_id,msg_num,selIDX
set_button_irq(20,p20)
set_button_irq(21,p21)
# 主循环
while True:
buf, addr = sock.recvfrom(1024)
if buf:
buf = buf.decode('utf-8')
print(buf,buf[0]=="0")
if buf[0]=="0":
led.value(1)
is_any_person = False
else:
led.value(0)
is_any_person = True
七、发布的博文(附上标题和平头哥发布链接)
1 |
||
2 |
||
3 |
||
4 |
八、项目总结
本项目由于条件和法规限制未能使用真实系统实地测试,比较遗憾。但是本项目测试了在LicheePi 4A上运行Yolo V5的能力,并通过多图对比,估算出实地环境布置摄像头的标准(人像宽度须不小于2%),并采用广播的方式,实现了多设备的协同支持。总的来讲,验证了LicheePi 4A的性能和网络功能,可以认为基本完成了原设计功能。如果有机会继续,可以考虑使用PLC和网路摄像头进行集成,会更加实用。
九、其他
感谢主办方“平头哥”玄铁官网(www.xrvm.cn)和 “电子工程世界”(www.eeworld.com.cn)的支持以及各位网友的帮助。谢谢你们!