613|0

274

帖子

8

TA的资源

纯净的硅(初级)

楼主
 

【STM32MP135F-DK】10-项目演示和汇总 [复制链接]

这次以一个小项目为目标,对STM32MP135F开发板进行了测评,在测评中实现了开发板的系统烧写、wifi联网、网络校时、SSH连接、使用python控制GPIO、MQTT服务器的搭建、MQTT客户端的编程、GTK桌面程序编程、发送邮件、数据库保存数据等功能。
下面的视频演示了使用上面的功能实现的一个网关,该网关部署MQTT服务,接收传感器设备上传的温度数据,并保存到数据库中,用户可以通过桌面程序查看各传感器上传的数据,当判断温度超过26度时,该网关会自动发送邮件进行报警。
邮件内容如下图所示
 
 
功能演示视频
202401121555

 
 
源代码
桌面应用 app.py
import gi

gi.require_version("Gtk", "3.0")
from gi.repository import Gtk
from control_database import MyDB

class ListBoxRowWithData(Gtk.ListBoxRow):
    def __init__(self, data):
        super().__init__()
        self.data = data
        self.add(Gtk.Label(label=data))


class ListBoxWindow(Gtk.Window):
    def __init__(self):
        super().__init__(title="sensor data")
        self.set_border_width(10)

        box_outer = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
        self.add(box_outer)

        label = Gtk.Label("Sensors")
        box_outer.pack_start(label,True,True,0)

        box_data = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        box_outer.pack_start(box_data,True,True,0)

        listbox_2 = Gtk.ListBox()
        db = MyDB()
        items = db.get_table_list()

        for item in items:
            listbox_2.add(ListBoxRowWithData(item[0]))

        def on_row_activated(listbox_widget, row):
            print(row.data)
            db = MyDB()
            datas = db.read_id_all_data(row.data)
            items = []
            for data in datas:
                items.append("%02.2f    %s"%(data[1],data[2]))
            fresh_list3(items)

        def fresh_list3(items):
            children = listbox_3.get_children()
            for child in children:
                listbox_3.remove(child)
            for item in items:
                listbox_3.add(ListBoxRowWithData(item))
                listbox_3.show_all()

        listbox_2.connect("row-activated", on_row_activated)

        box_data.pack_start(listbox_2, True, True, 0)
        listbox_3 = Gtk.ListBox()

        box_data.pack_start(listbox_3, True, True, 0)
        listbox_2.show_all()
        listbox_3.show_all()

win = ListBoxWindow()
win.set_default_size(480, 272)
win.move(0, 0)
win.connect("destroy", Gtk.main_quit)
win.show_all()
Gtk.main()

数据库操作 control_database.py

import sqlite3
from sqlite3 import Error
import datetime

sensor_db = 'sensor_database.db'

class MyDB:
    db = None
    def sql_connection(self):
        if self.db == None:
            try:
                self.db = sqlite3.connect(sensor_db)
            except Error:
                print(Error)

    def sql_close(self):
        self.db.close()
        self.db = None

    def put_data_to_db(self,id='',temperature=0):
        cursorObj = self.db.cursor()
        cmd = 'create table if not exists %s(id text, temperature real, time text)'%id
        cursorObj.execute(cmd)
        time = datetime.datetime.now()
        time = time.strftime('%Y-%m-%d %H:%M:%S')
        cmd = 'INSERT INTO %s VALUES (?,?,?)'%id
        cursorObj.execute(cmd,(id,temperature,time))
        self.db.commit()

    def get_id_data_number(self,id):
        cursorObj = self.db.cursor()
        cmd = 'SELECT * FROM %s '%id
        cursorObj.execute(cmd)
        rows = cursorObj.fetchall()
        number = len(rows)
        print(f'sensor-{id} have {number} datas')        
        return number

    def get_id_all_data_from_db(self,id):
        cursorObj = self.db.cursor()
        cmd = 'SELECT * FROM %s '%id
        cursorObj.execute(cmd)
        rows = cursorObj.fetchall()
        return rows
        
    def get_data_from_db(self,id,index):
        number = self.get_id_data_number(id)
        if index >= number:
            return []
        else:
            cursorObj = self.db.cursor()
            cmd = 'SELECT * FROM %s '%id
            cursorObj.execute(cmd)
            rows = cursorObj.fetchall()
            return rows[index]
        
    def write_data_to_db(self,id,temperature):
        self.sql_connection()
        self.put_data_to_db(id,temperature)
        self.sql_close()

    def read_data_from_db(self,id,index):
        self.sql_connection()
        self.get_data_from_db(id,index)
        self.sql_close()

    def read_id_data_number(self,id):
        self.sql_connection()
        number = self.get_id_data_number(id)
        self.sql_close()
        return number
    
    def read_id_all_data(self,id):
        self.sql_connection()
        datas = self.get_id_all_data_from_db(id)
        self.sql_close()
        return datas
    
    def get_table_list(self):
        self.sql_connection()
        cursorObj = self.db.cursor()
        cursorObj.execute('SELECT name from sqlite_master where type= "table"')
        table_list = cursorObj.fetchall()
        self.sql_close()
        return table_list
    
if __name__ == '__main__':
    db = MyDB()
    db.write_data_to_db('001',27)
    value = db.read_data_from_db('001',1)
    print(value)

mqtt客户端 save_sensor_data.py

import paho.mqtt.client as mqtt
import json
from control_database import MyDB
import send_email

sub_topic = 'sensor/temperature'
mqtt_server = "127.0.0.1"


def unpack_json_data(data):
    raw_data = json.loads(data)
    print("id:"+raw_data['id']+" "+"value:" + str(raw_data['value']))
    return raw_data

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

def on_message(client, userdata, msg):
    print("get sensor data")
    data = unpack_json_data(msg.payload)
    db = MyDB()
    db.write_data_to_db(data['id'],data['value'])
    if float(data['value']) > 26:
        send_email.send(f"传感器:{data['id']}温度超限,当前温度{data['value']}")
    

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(mqtt_server, 1883, 60)
client.subscribe(topic=sub_topic)

client.loop_forever()

发送邮件 send_email.py

import smtplib
from email.mime.text import MIMEText

mailhost = 'smtp.163.com'
mailuser = '15**********@163.com'
mailpass = 'T**********A'
sender = '15**********@163.com'

def send(message_content,recev='51*******93@qq.com'):
    receivers = [recev]
    message = MIMEText(message_content)
    message['Subject'] = '温度报警数据'
    message['From'] = sender
    message['To'] = receivers[0]
    try:
        smtpObj = smtplib.SMTP()
        smtpObj.connect(mailhost,25)
        smtpObj.login(mailuser,mailpass)
        smtpObj.sendmail(sender,receivers,message.as_string())

        smtpObj.quit()
        print("send sucess")
    except smtplib.SMTPException as e:
        print("error",e)

if __name__ == "__main__":
    send("test smtp email")

 

源代码.zip (2.71 KB, 下载次数: 2)
相关帖子:
总结
ST的开发板资料很丰富,官方提供了wiki网站,能够很方便的帮助我们开始编程设计。而且官方提供的linux的资源也很丰富,比如可以直接使用python进行编程,而且在编程中用到的很多python包都能够在线安装,大大简化了开发的流程,节约了开发的时间。
此帖出自stm32/stm8论坛
点赞 关注(1)
 

回复
举报
您需要登录后才可以回帖 登录 | 注册

查找数据手册?

EEWorld Datasheet 技术支持

相关文章 更多>>
关闭
站长推荐上一条 1/8 下一条

 
EEWorld订阅号

 
EEWorld服务号

 
汽车开发圈

About Us 关于我们 客户服务 联系方式 器件索引 网站地图 最新更新 手机版

站点相关: 国产芯 安防电子 汽车电子 手机便携 工业控制 家用电子 医疗电子 测试测量 网络通信 物联网

北京市海淀区中关村大街18号B座15层1530室 电话:(010)82350740 邮编:100190

电子工程世界版权所有 京B2-20211791 京ICP备10001474号-1 电信业务审批[2006]字第258号函 京公网安备 11010802033920号 Copyright © 2005-2025 EEWORLD.com.cn, Inc. All rights reserved
快速回复 返回顶部 返回列表