这次以一个小项目为目标,对STM32MP135F开发板进行了测评。测评中实现了开发板的系统烧写、Wi-Fi联网、网络校时、SSH连接、使用Python控制GPIO、MQTT服务器的搭建、MQTT客户端的编程、GTK桌面程序编程、发送邮件、数据库保存数据等功能。
下面的视频演示了利用上述功能实现的一个网关:该网关部署MQTT服务,接收传感器设备上传的温度数据并保存到数据库中;用户可以通过桌面程序查看各传感器上传的数据;当温度超过26度时,该网关会自动发送邮件进行报警。
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk
from control_database import MyDB
class ListBoxRowWithData(Gtk.ListBoxRow):
    """List box row that shows ``data`` as a label and keeps it on ``self.data``."""

    def __init__(self, data):
        super().__init__()
        # Build the visible label first, then remember the payload so that
        # signal handlers can read it back from the row.
        caption = Gtk.Label(label=data)
        self.add(caption)
        self.data = data
class ListBoxWindow(Gtk.Window):
    """Window listing the database tables and the readings of the activated one.

    A "Sensors" caption sits above a horizontal box holding two list boxes:
    the first lists the table names returned by ``MyDB.get_table_list()``,
    the second shows the rows of whichever table was activated in the first.
    """

    def __init__(self):
        super().__init__(title="sensor data")
        self.set_border_width(10)

        box_outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
        self.add(box_outer)

        # Keyword form: positional GObject constructor arguments are
        # deprecated in PyGObject.
        label = Gtk.Label(label="Sensors")
        box_outer.pack_start(label, True, True, 0)

        box_data = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        box_outer.pack_start(box_data, True, True, 0)

        # First list: one row per table; item[0] is the table name.
        self._sensor_list = Gtk.ListBox()
        for item in MyDB().get_table_list():
            self._sensor_list.add(ListBoxRowWithData(item[0]))
        self._sensor_list.connect("row-activated", self._on_row_activated)
        box_data.pack_start(self._sensor_list, True, True, 0)

        # Second list: filled on demand by _on_row_activated.
        self._reading_list = Gtk.ListBox()
        box_data.pack_start(self._reading_list, True, True, 0)

        self._sensor_list.show_all()
        self._reading_list.show_all()

    def _on_row_activated(self, listbox_widget, row):
        """Load every row of the activated table and display it."""
        print(row.data)
        readings = MyDB().read_id_all_data(row.data)
        # Columns 1 and 2 of each row are formatted as "<float> <text>".
        self._show_readings(
            ["%02.2f %s" % (reading[1], reading[2]) for reading in readings]
        )

    def _show_readings(self, items):
        """Replace the contents of the second list box with ``items``."""
        for child in self._reading_list.get_children():
            self._reading_list.remove(child)
        for item in items:
            self._reading_list.add(ListBoxRowWithData(item))
        self._reading_list.show_all()
# Create the main window and set its default size to 480x272.
win = ListBoxWindow()
win.set_default_size(480, 272)
# Request placement at screen coordinates (0, 0).
win.move(0, 0)
# Quit the GTK main loop when the window is destroyed.
win.connect("destroy", Gtk.main_quit)
win.show_all()
# Enter the GTK main loop; this call blocks until Gtk.main_quit runs.
Gtk.main()
数据库操作 control_database.py
import sqlite3
from sqlite3 import Error
import datetime
# Path of the SQLite database file that MyDB.sql_connection opens.
sensor_db = 'sensor_database.db'
class MyDB:
    """SQLite helper that stores temperature readings, one table per sensor id.

    The ``read_*`` / ``write_*`` / ``get_table_list`` methods open the
    connection, run one operation and always close the connection again.
    The ``put_*`` / ``get_*_from_db`` / ``get_id_data_number`` methods expect
    the caller to have called ``sql_connection()`` first.

    Args:
        db_path: Database file to open. ``None`` (the default) uses the
            module-level ``sensor_db`` path.
    """

    # Open sqlite3 connection, or None while disconnected.
    db = None
    # Database file override; None means "use the module-level sensor_db".
    db_path = None

    def __init__(self, db_path=None):
        self.db = None
        self.db_path = db_path

    @staticmethod
    def _quote_identifier(name):
        """Return ``name`` as a double-quoted SQL identifier.

        Table names cannot be bound as ``?`` parameters, so they are quoted
        (embedded double quotes doubled) before being placed into the SQL
        text. This also makes purely numeric ids such as ``001`` usable as
        table names.
        """
        return '"%s"' % str(name).replace('"', '""')

    def sql_connection(self):
        """Open the database connection if it is not already open.

        Raises:
            sqlite3.Error: If the database cannot be opened. The error is
                printed first and then re-raised, so callers do not go on
                to use a missing connection.
        """
        if self.db is not None:
            return
        path = sensor_db if self.db_path is None else self.db_path
        try:
            self.db = sqlite3.connect(path)
        except Error as err:
            print(err)
            raise

    def sql_close(self):
        """Close the connection if one is open and reset ``self.db`` to None."""
        if self.db is not None:
            self.db.close()
            self.db = None

    def put_data_to_db(self, id='', temperature=0):
        """Insert one reading into the table named after ``id``.

        The table (columns: id text, temperature real, time text) is created
        on first use. The time column holds the current local time formatted
        as ``%Y-%m-%d %H:%M:%S``. Requires an open connection.
        """
        table = self._quote_identifier(id)
        cursor = self.db.cursor()
        cursor.execute(
            'create table if not exists %s(id text, temperature real, time text)'
            % table
        )
        timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        cursor.execute(
            'INSERT INTO %s VALUES (?,?,?)' % table,
            (id, temperature, timestamp),
        )
        self.db.commit()

    def get_id_data_number(self, id):
        """Print and return the number of rows stored for ``id``.

        Requires an open connection.
        """
        cursor = self.db.cursor()
        # COUNT(*) lets SQLite count instead of fetching every row.
        cursor.execute('SELECT COUNT(*) FROM %s' % self._quote_identifier(id))
        number = cursor.fetchone()[0]
        print(f'sensor-{id} have {number} datas')
        return number

    def get_id_all_data_from_db(self, id):
        """Return all rows of the table for ``id`` as a list of tuples.

        Requires an open connection.
        """
        cursor = self.db.cursor()
        cursor.execute('SELECT * FROM %s' % self._quote_identifier(id))
        return cursor.fetchall()

    def get_data_from_db(self, id, index):
        """Return the row at position ``index`` for ``id``.

        Returns an empty list when ``index`` is at or beyond the row count.
        Requires an open connection.
        """
        number = self.get_id_data_number(id)
        if index >= number:
            return []
        return self.get_id_all_data_from_db(id)[index]

    def write_data_to_db(self, id, temperature):
        """Open the database, store one reading for ``id``, then close it."""
        self.sql_connection()
        try:
            self.put_data_to_db(id, temperature)
        finally:
            # Close even when the insert fails so the handle is not leaked.
            self.sql_close()

    def read_data_from_db(self, id, index):
        """Open the database, return the row at ``index`` for ``id``, close it.

        Returns an empty list when ``index`` is at or beyond the row count.
        """
        self.sql_connection()
        try:
            return self.get_data_from_db(id, index)
        finally:
            self.sql_close()

    def read_id_data_number(self, id):
        """Open the database, return the row count for ``id``, close it."""
        self.sql_connection()
        try:
            return self.get_id_data_number(id)
        finally:
            self.sql_close()

    def read_id_all_data(self, id):
        """Open the database, return all rows for ``id``, close it."""
        self.sql_connection()
        try:
            return self.get_id_all_data_from_db(id)
        finally:
            self.sql_close()

    def get_table_list(self):
        """Return the names of all tables as a list of 1-tuples."""
        self.sql_connection()
        try:
            cursor = self.db.cursor()
            # 'table' is a single-quoted string literal; double quotes denote
            # identifiers in SQL and only work as strings via a legacy fallback.
            cursor.execute("SELECT name from sqlite_master where type= 'table'")
            return cursor.fetchall()
        finally:
            self.sql_close()
if __name__ == '__main__':
    # Manual smoke test: store one reading for sensor '001', then print
    # whatever read_data_from_db returns for index 1.
    store = MyDB()
    store.write_data_to_db('001', 27)
    print(store.read_data_from_db('001', 1))
MQTT客户端 save_sensor_data.py
import paho.mqtt.client as mqtt
import json
from control_database import MyDB
import send_email
# Topic this client subscribes to for incoming sensor readings.
sub_topic = 'sensor/temperature'
# Address of the MQTT broker to connect to (loopback, i.e. this machine).
mqtt_server = "127.0.0.1"
def unpack_json_data(data):
    """Decode a JSON sensor message, log its id and value, and return it.

    Args:
        data: JSON text (str, bytes or bytearray) holding an object with
            ``id`` and ``value`` keys.

    Returns:
        The decoded JSON object.

    Raises:
        ValueError: If ``data`` is not valid JSON (``json.JSONDecodeError``).
        KeyError: If ``id`` or ``value`` is missing.
    """
    raw_data = json.loads(data)
    # Formatting instead of "+" concatenation, so that a non-string id
    # (e.g. a JSON number) is logged rather than raising TypeError.
    print(f"id:{raw_data['id']} value:{raw_data['value']}")
    return raw_data
def on_connect(client, userdata, flags, rc):
    """Connection callback: log the result code and (re)subscribe.

    Subscribing here means the subscription to ``sub_topic`` is issued again
    every time the client connects successfully, including after a reconnect.

    Args:
        client: The MQTT client that connected.
        userdata: Unused.
        flags: Unused.
        rc: Connection result code; 0 is treated as success.
    """
    print("Connected with result code "+str(rc))
    if rc == 0:
        client.subscribe(sub_topic)
def on_message(client, userdata, msg):
    """Message callback: store the reading and e-mail an alert above 26.

    The payload is decoded with ``unpack_json_data`` and its ``value`` is
    checked to be numeric *before* anything is written, so a malformed
    message is logged and skipped instead of raising out of the callback or
    leaving a non-numeric value in the database.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Received message; only ``msg.payload`` is read.
    """
    print("get sensor data")
    try:
        data = unpack_json_data(msg.payload)
        sensor_id = data['id']
        temperature = float(data['value'])
    except (ValueError, KeyError, TypeError) as err:
        # json.JSONDecodeError and UnicodeDecodeError are ValueError subclasses.
        print("ignore malformed sensor message:", err)
        return
    db = MyDB()
    db.write_data_to_db(sensor_id, data['value'])
    if temperature > 26:
        send_email.send(f"传感器:{data['id']}温度超限,当前温度{data['value']}")
# Create the MQTT client and register the two callbacks defined above.
# NOTE(review): paho-mqtt 2.x changed the Client() constructor (callback API
# version argument) -- confirm against the installed version.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# Connect to mqtt_server on port 1883; 60 is the third connect() argument
# (paho's keepalive interval in seconds).
client.connect(mqtt_server, 1883, 60)
# Subscribe to the sensor topic, then hand control to the network loop.
client.subscribe(topic=sub_topic)
# Blocks and dispatches on_connect / on_message.
client.loop_forever()
发送邮件 send_email.py
import smtplib
from email.mime.text import MIMEText
# SMTP server used by send().
mailhost = 'smtp.163.com'
# Login name and password passed to SMTP.login().
# NOTE(review): credentials are hard-coded in source (masked here); consider
# loading them from the environment or a config file kept out of version control.
mailuser = '15**********@163.com'
mailpass = 'T**********A'
# Address used both as the envelope sender and as the From header.
sender = '15**********@163.com'
def send(message_content, recev='51*******93@qq.com', timeout=10):
    """Send ``message_content`` as a plain-text alert e-mail.

    Failures are printed, not raised, so a mail problem does not propagate
    to the caller.

    Args:
        message_content: Body text of the e-mail.
        recev: Recipient address.
        timeout: Seconds to wait on the SMTP connection before giving up,
            so an unreachable server cannot block the caller indefinitely.
    """
    receivers = [recev]
    message = MIMEText(message_content)
    message['Subject'] = '温度报警数据'
    message['From'] = sender
    message['To'] = receivers[0]
    try:
        # The context manager sends QUIT and closes the socket even when
        # login or sendmail fails.
        # NOTE(review): port 25 without STARTTLS sends the login in clear
        # text -- consider SMTP_SSL if the provider supports it.
        with smtplib.SMTP(mailhost, 25, timeout=timeout) as smtp_obj:
            smtp_obj.login(mailuser, mailpass)
            smtp_obj.sendmail(sender, receivers, message.as_string())
        print("send sucess")
    except (smtplib.SMTPException, OSError) as e:
        # OSError also covers DNS failures, refused connections and timeouts.
        print("error", e)
if __name__ == "__main__":
    # Manual check: send one test message to the default recipient.
    send("test smtp email")
ST的开发板资料很丰富,官方提供了wiki网站,能够很方便地帮助我们开始编程设计。官方提供的Linux资源也很丰富,比如可以直接使用Python进行编程,而且编程中用到的很多Python包都能够在线安装,大大简化了开发流程,节约了开发时间。