【2024 DigiKey 创意大赛】基于Raspberry Pi 5的植物生长监管系统—2、数据采集&存储
[复制链接]
本帖最后由 Wenyou 于 2024-9-23 22:10 编辑
这次我来分享数据采集和存储过程,数据采集主要由下位机ESP32完成,采集的数据通过mqtt传输给上位机树莓派5,并由树莓派5存储进sqlite数据库中完成数据获取和存储的过程。
首先搭建mqtt环境,使用emqx作为mqtt服务端。
在https://www.emqx.com/zh/downloads-and-install/broker选择适合自己系统的版本。
页面中给出了下载安装和启动的代码,复制执行即可。
启动成功后通过树莓派的IP地址加上18083端口进入登录界面,使用默认账号密码admin/public登录。
登录成功即可看到mqtt管理界面,这里可以实时显示mqtt的连接数量与信息发布接收情况,便于开发。
服务端安装完成,下面给ESP32安装mqtt。
上节中我们给ESP32安装了CircuitPython开发环境,CircuitPython默认并没有支持mqtt的库,需要我们手动编写或添加,前往下载对应CircuitPython版本的捆绑包,此捆绑包中包含了大量可能会用到的CircuitPython库。
通过Thonny连接ESP32,然后根据下图所示方式即可上传mqtt库。
mqtt库可能会依赖其他库,缺少什么根据上面的方法上传即可。
库上传好了,找一个示例代码验证连接。
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT
import os
import ssl
import time
import ipaddress
import socketpool
import wifi
import adafruit_minimqtt.adafruit_minimqtt as MQTT
#连接WIFI
wifi.radio.connect(os.getenv("SSID"), os.getenv("PASSWORD"))
#mqtt服务端IP
ping_ip = ipaddress.IPv4Address(os.getenv("mqtt_broker"))
ping = wifi.radio.ping(ip=ping_ip)
if ping is None:
print("连接失败")
else:
print(f"Pinging 'mqttserver' took: {ping * 1000} ms")
# Setup a feed named 'photocell' for publishing to a feed
photocell_feed ="/feeds/photocell"
# Setup a feed named 'onoff' for subscribing to changes
onoff_feed = "/feeds/onoff"
def connected(client, userdata, flags, rc):
# This function will be called when the client is connected
# successfully to the broker.
print(f"Connected to Adafruit IO! Listening for topic changes on {onoff_feed}")
# Subscribe to all changes on the onoff_feed.
client.subscribe(onoff_feed)
def disconnected(client, userdata, rc):
# This method is called when the client is disconnected
print("Disconnected from Adafruit IO!")
def message(client, topic, message):
# This method is called when a topic the client is subscribed to
# has a new message.
print(f"New message on topic {topic}: {message}")
# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)
# Set up a MiniMQTT Client
mqtt_client = MQTT.MQTT(
broker=os.getenv("mqtt_broker"),
port=1883,
socket_pool=pool,
)
# Setup the callback methods above
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message
# Connect the client to the MQTT broker.
print("Connecting to MQTT...")
mqtt_client.connect()
photocell_val = 0
while True:
# Poll the message queue
mqtt_client.loop(timeout=1)
# Send a new message
print(f"Sending photocell value: {photocell_val}...")
mqtt_client.publish(photocell_feed, photocell_val)
print("Sent!")
photocell_val += 1
time.sleep(2)
接下来进行数据采集。
首先是必选物料中的D6T-1A-01,用来获取温度。
这个模块文档中并没有给出适用于CircuitPython的示例代码,倒是有C写的适用于Raspberry Pi平台的示例,那么我们直接拿来用。
为代码地址。
顺便把D6T-1A-01的代码贴出来。
/*
* MIT License
* Copyright (c) 2019, 2018 - present OMRON Corporation
* All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/
/* includes */
#include <stdio.h>
#include <stdint.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/ioctl.h>
#include <linux/i2c-dev.h>
#include <stdbool.h>
#include <time.h>
/* defines */
#define D6T_ADDR 0x0A // for I2C 7bit address
#define D6T_CMD 0x4C // for D6T-44L-06/06H, D6T-8L-09/09H, for D6T-1A-01/02
#define N_ROW 1
#define N_PIXEL 1
#define N_READ ((N_PIXEL + 1) * 2 + 1)
#define RASPBERRY_PI_I2C "/dev/i2c-1"
#define I2CDEV RASPBERRY_PI_I2C
uint8_t rbuf[N_READ];
double ptat;
double pix_data[N_PIXEL];
/* I2C functions */
/** <!-- i2c_read_reg8 {{{1 --> I2C read function for bytes transfer.
*/
uint32_t i2c_read_reg8(uint8_t devAddr, uint8_t regAddr,
uint8_t *data, int length
) {
int fd = open(I2CDEV, O_RDWR);
if (fd < 0) {
fprintf(stderr, "Failed to open device: %s\n", strerror(errno));
return 21;
}
int err = 0;
do {
if (ioctl(fd, I2C_SLAVE, devAddr) < 0) {
fprintf(stderr, "Failed to select device: %s\n", strerror(errno));
err = 22; break;
}
if (write(fd, ®Addr, 1) != 1) {
fprintf(stderr, "Failed to write reg: %s\n", strerror(errno));
err = 23; break;
}
int count = read(fd, data, length);
if (count < 0) {
fprintf(stderr, "Failed to read device(%d): %s\n",
count, strerror(errno));
err = 24; break;
} else if (count != length) {
fprintf(stderr, "Short read from device, expected %d, got %d\n",
length, count);
err = 25; break;
}
} while (false);
close(fd);
return err;
}
/** <!-- i2c_write_reg8 {{{1 --> I2C read function for bytes transfer.
*/
uint32_t i2c_write_reg8(uint8_t devAddr,
uint8_t *data, int length
) {
int fd = open(I2CDEV, O_RDWR);
if (fd < 0) {
fprintf(stderr, "Failed to open device: %s\n", strerror(errno));
return 21;
}
int err = 0;
do {
if (ioctl(fd, I2C_SLAVE, devAddr) < 0) {
fprintf(stderr, "Failed to select device: %s\n", strerror(errno));
err = 22; break;
}
if (write(fd, data, length) != length) {
fprintf(stderr, "Failed to write reg: %s\n", strerror(errno));
err = 23; break;
}
} while (false);
close(fd);
return err;
}
uint8_t calc_crc(uint8_t data) {
int index;
uint8_t temp;
for (index = 0; index < 8; index++) {
temp = data;
data <<= 1;
if (temp & 0x80) {data ^= 0x07;}
}
return data;
}
/** <!-- D6T_checkPEC {{{ 1--> D6T PEC(Packet Error Check) calculation.
* calculate the data sequence,
* from an I2C Read client address (8bit) to thermal data end.
*/
bool D6T_checkPEC(uint8_t buf[], int n) {
int i;
uint8_t crc = calc_crc((D6T_ADDR << 1) | 1); // I2C Read address (8bit)
for (i = 0; i < n; i++) {
crc = calc_crc(buf[i] ^ crc);
}
bool ret = crc != buf[n];
if (ret) {
fprintf(stderr,
"PEC check failed: %02X(cal)-%02X(get)\n", crc, buf[n]);
}
return ret;
}
/** <!-- conv8us_s16_le {{{1 --> convert a 16bit data from the byte stream.
*/
int16_t conv8us_s16_le(uint8_t* buf, int n) {
uint16_t ret;
ret = (uint16_t)buf[n];
ret += ((uint16_t)buf[n + 1]) << 8;
return (int16_t)ret; // and convert negative.
}
void delay(int msec) {
struct timespec ts = {.tv_sec = msec / 1000,
.tv_nsec = (msec % 1000) * 1000000};
nanosleep(&ts, NULL);
}
void initialSetting(void) {
}
/** <!-- main - Thermal sensor {{{1 -->
* Read data
*/
int main() {
int i;
int16_t itemp;
delay(220);
while(1){
// Read data via I2C
memset(rbuf, 0, N_READ);
uint32_t ret = i2c_read_reg8(D6T_ADDR, D6T_CMD, rbuf, N_READ);
D6T_checkPEC(rbuf, N_READ - 1);
//Convert to temperature data (degC)
ptat = (double)conv8us_s16_le(rbuf, 0) / 10.0;
for (i = 0; i < N_PIXEL; i++) {
itemp = conv8us_s16_le(rbuf, 2 + 2*i);
pix_data[i] = (double)itemp / 10.0;
}
//Output results
printf("PTAT: %4.1f [degC], Temperature: ", ptat);
for (i = 0; i < N_PIXEL; i++) {
printf("%4.1f, ", pix_data[i]);
}
printf("[degC]\n");
delay(100);
}
}
// vi: ft=c:fdm=marker:et:sw=4:tw=80
根据图示连接对应引脚,SDA连接树莓派GPIO2,SCL连接树莓派GPIO3。
示例网站中给出了详细的编译运行步骤。
编译成功后尝试执行d6t-1a,发现报错。
去代码中看一下报错位置,应该是找不到I2C接口。
想起来还没有开启I2C接口,需要先配置一下,使用sudo raspi-config打开配置界面,选择第三项接口配置。
选择第五项I2C。
选择Yes。
如下图所示则开启成功。
重新运行示例代码,这次成功获取到了当前的环境温度,示例程序的采样速率还是很快的,差不多一秒十行。手掌从传感器上方划过,示数可以很快变化。
验证了传感器可用,接下来将它连接到下位机ESP32上进行开发,我们需要获取当前温度并通过mqtt传输给上位机树莓派。
首先进行接线,SDA连接GPIO6,SCL连接GPIO7,需要注意的是ESP32需要外接上拉电阻。
接线如下图所示。
接好线后使用下面的代码进行I2C地址遍历,查询总线上的I2C地址。
import time
import board
import busio
# List of potential I2C busses
ALL_I2C = ("board.I2C()", "board.STEMMA_I2C()", "busio.I2C(board.IO7, board.IO6)")
# Determine which busses are valid
found_i2c = []
for name in ALL_I2C:
try:
print("Checking {}...".format(name), end="")
bus = eval(name)
bus.unlock()
found_i2c.append((name, bus))
print("ADDED.")
except Exception as e:
print("SKIPPED:", e)
# Scan valid busses
if len(found_i2c):
print("-" * 40)
print("I2C SCAN")
print("-" * 40)
while True:
for bus_info in found_i2c:
name = bus_info[0]
bus = bus_info[1]
while not bus.try_lock():
pass
print(
name,
"addresses found:",
[hex(device_address) for device_address in bus.scan()],
)
bus.unlock()
time.sleep(2)
else:
print("No valid I2C bus found.")
可以看到成功查找到了D6T-1A的地址0xa,说明我们的接线没有问题。
接下来进行代码编写。将官方给出的C示例代码,转为适用于CircuitPython的代码。
import time
import board
import busio
D6T_ADDR = 0x0A # I2C设备地址
D6T_CMD = 0x4C # 设备命令
# 数据长度定义
N_PIXEL = 1
N_READ = (N_PIXEL + 1) * 2 + 1
# 初始化I2C总线
i2c = busio.I2C(board.IO7, board.IO6)
# crc校验
def calc_crc(data):
crc = 0
data &= 0xFF # 确保输入为8位
for _ in range(8):
if (crc ^ data) & 0x80:
crc = ((crc << 1) ^ 0x07) & 0xFF
else:
crc = (crc << 1) & 0xFF
data <<= 1
return crc
# PEC校验
def D6T_checkPEC(buf):
crc = calc_crc((D6T_ADDR << 1) | 1)
for b in buf[:-1]:
crc = calc_crc(b ^ crc)
if crc != buf[-1]:
print(f"PEC校验失败: {crc:02X}(calculated) - {buf[-1]:02X}(received)")
return False
return True
def conv8us_s16_le(buf, n):
return buf[n] + (buf[n + 1] << 8)
def read_thermal_data():
rbuf = bytearray(N_READ)
# 锁定总线
if not i2c.try_lock():
print("无法锁定I2C总线")
return None, None
try:
# 发送命令并读取数据
i2c.writeto(D6T_ADDR, bytes([D6T_CMD]))
i2c.readfrom_into(D6T_ADDR, rbuf)
except Exception as e:
print(f"I2C 读取失败: {e}")
return None, None
finally:
# 解锁总线
i2c.unlock()
# 校验PEC
if not D6T_checkPEC(rbuf):
return None, None
# 转换温度数据
ptat = conv8us_s16_le(rbuf, 0) / 10.0
pix_data = [conv8us_s16_le(rbuf, 2 + 2 * i) / 10.0 for i in range(N_PIXEL)]
return ptat, pix_data
def main():
time.sleep(0.22) # 稳定I2C总线
while True:
ptat, pix_data = read_thermal_data()
if ptat is not None:
print(f"PTAT: {ptat:.1f} [degC], Temperature: ", end="")
for pix in pix_data:
print(f"{pix:.1f}, ", end="")
print("[degC]")
else:
print("读取数据时出现错误")
time.sleep(0.1)
if __name__ == '__main__':
main()
然后将数据用mqtt传输给上位机,并由上位机存储进Sqlite数据库。
目前上位机只安装了mqtt服务端,要使用python接收数据,我们还需要安装客户端。
pip install paho-mqtt
ESP32连接mqtt发送数据:
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT
import os
import ssl
import time
import ipaddress
import socketpool
import wifi
import adafruit_minimqtt.adafruit_minimqtt as MQTT
#通过d6t1a01采集数据
from d6t1a01 import read_thermal_data
#搜索WIFI
# for network in wifi.radio.start_scanning_networks():
# print(f"SSID:{str(network.ssid, 'utf-8')},RSSI:{network.rssi},Channel:{network.channel}")
# wifi.radio.stop_scanning_networks()
#连接WIFI
print(f"正在连接WIFI")
wifi.radio.connect(os.getenv("SSID"), os.getenv("PASSWORD"))
#mqtt服务端IP
ping_ip = ipaddress.IPv4Address(os.getenv("mqtt_broker"))
ping = wifi.radio.ping(ip=ping_ip)
if ping is None:
print(f"ping {os.getenv('mqtt_broker')} 失败")
else:
print(f"Pinging 'mqttserver' took: {ping * 1000} ms")
# Setup a feed named 'photocell' for publishing to a feed
temperature_feed ="/ESP32/d6t"
# Setup a feed named 'onoff' for subscribing to changes
onoff_feed = "/Pi/onoff"
def connected(client, userdata, flags, rc):
# 连接成功触发
print(f"连接成功,订阅主题: {onoff_feed}")
client.subscribe(onoff_feed)
def disconnected(client, userdata, rc):
# 断开连接触发
print("断开连接!")
def message(client, topic, message):
# 接收消息触发
print(f"接收数据:{topic}: {message}")
# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)
# Set up a MiniMQTT Client
mqtt_client = MQTT.MQTT(
broker=os.getenv("mqtt_broker"),
port=1883,
socket_pool=pool,
)
# Setup the callback methods above
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message
# Connect the client to the MQTT broker.
print("正在连接 MQTT...")
mqtt_client.connect()
while True:
# 检查是否有消息到达或其他需要处理的事件
mqtt_client.loop(timeout=1)
temperature = read_thermal_data()
print(f"发送温度数据: {temperature[1][0]}")
mqtt_client.publish(temperature_feed, temperature[1][0])
time.sleep(2)
树莓派5连接mqtt接收数据并存储:
import time
import datetime
import sqlite3
from paho.mqtt import client as mqtt
conn = sqlite3.connect('test.db',check_same_thread=False)
c = conn.cursor()
#创建数据表,执行一次后注释即可,重复执行会报错
c.execute('''CREATE TABLE temp
(id INTEGER PRIMARY KEY AUTOINCREMENT,
temperature float NOT NULL,
time DateTime NOT NULL
);''')
def on_connect(client,userdata,flags,rc):
if rc==0:
print("连接成功")
client.subscribe("/ESP32/d6t")
else:
print("连接失败")
def on_message(client,userdata,msg):
'''接收端回调函数'''
temperature = float(msg.payload.decode('utf-8'))
print(temperature)
c.execute(f'INSERT INTO temp (temperature,time) VALUES ("{temperature}","{datetime.datetime.now()}")')
conn.commit()
if temperature > 28:
client.publish(f"/Pi/onoff",True,0,False)
else:
client.publish(f"/Pi/onoff",False,0,False)
if __name__ == '__main__':
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
try:
client.connect("127.0.0.1",1883,60)
client.loop_start() #开启一个独立的循环监听线程 loop_forever()阻塞式线程循环
except KeyboardInterrupt:
client.disconnect()
while True:
time.sleep(1)
运行代码,ESP32每隔两秒与D6T通信采集一次数据,并将数据发送给树莓派5,树莓派5接收数据后将数据存储进数据库,并根据数据的值向ESP32发送控制信号。
数据库中存储温度值与接收时间,此处间隔3秒是因为树莓派还设有1s延时。
以上便是整个系统最核心的部分,后面要做的只是数据包装与模块组装。其他传感器只需照猫画虎即可实现数据采集与存储功能。
下面简单列出其他传感器的数据采集代码,首先是DNT11,用于获取湿度,虽然它也可以获取温度,但我们有D6T-1A-01了,那个更准确。
import board
import time
import adafruit_dht
def get_Temp_and_Humidity(dht):
try:
temperature = dht.temperature
humidity = dht.humidity
return (temperature, humidity)
except RuntimeError as e:
return (-1, -1)
if __name__ == "__main__":
dht = adafruit_dht.DHT11(board.IO21)
while True:
temperature,humidity = get_Temp_and_Humidity(dht)
print("temperature{:.1f}*c\t Humidity:{}%".format(temperature,humidity))
time.sleep(2.5)
然后是光照传感器,用于接收环境光照强度以判断何时打开补光灯,上面提到的CircuitPython捆绑包中含有适用于此传感器的库。
# SPDX-FileCopyrightText: 2020 Bryan Siepert, written for Adafruit Industries
# SPDX-License-Identifier: Unlicense
import time
import board
import adafruit_bh1750
import busio
i2c = busio.I2C(board.IO22,board.IO23)
sensor = adafruit_bh1750.BH1750(i2c)
while True:
print("%.2f Lux" % sensor.lux)
time.sleep(2)
接着是土壤湿度传感器与水位传感器。土壤湿度传感器用于监测土壤湿度,以判断何时打开水电磁阀浇水。水位传感器用来判断是否有水漫出花盆,防止浇水过多造成溢出。它们的原理相似,所以放在一起来介绍。
import analogio
import board
import digitalio
import time
pin = analogio.AnalogIn(board.IO3)
while True:
print(pin.value)
if(pin.value > 10000):
print("True")
else:
print("False")
time.sleep(1.5)
水位传感器:
土壤湿度传感器:
至此,本系统的数据采集与存储部分介绍完毕,下节计划分享Web界面部分,感谢大家的观看。
|