232|2

9

帖子

1

TA的资源

一粒金砂(中级)

楼主
 

【2024 DigiKey 创意大赛】基于Raspberry Pi 5的植物生长监管系统—2、数据采集&存储 [复制链接]

  本帖最后由 Wenyou 于 2024-9-23 22:10 编辑

这次我来分享数据采集和存储过程,数据采集主要由下位机ESP32完成,采集的数据通过mqtt传输给上位机树莓派5,并由树莓派5存储进sqlite数据库中完成数据获取和存储的过程。

首先搭建mqtt环境,使用emqx作为mqtt服务端。

https://www.emqx.com/zh/downloads-and-install/broker选择适合自己系统的版本。

页面中给出了下载安装和启动的代码,复制执行即可。
启动成功后通过树莓派的IP地址加上18083端口进入登录界面,使用默认账号密码admin/public登录。
登录成功即可看到mqtt管理界面,这里可以实时显示mqtt的连接数量与信息发布接收情况,便于开发。
服务端安装完成,下面给ESP32安装mqtt。
上节中我们给ESP32安装了CircuitPython开发环境,CircuitPython默认并没有支持mqtt的库,需要我们手动编写或添加,前往
链接已隐藏,如需查看请登录或者注册
下载对应CircuitPython版本的捆绑包,此捆绑包中包含了大量可能会用到的CircuitPython库。
通过Thonny连接ESP32,然后根据下图所示方式即可上传mqtt库。
mqtt库可能会依赖其他库,缺少什么根据上面的方法上传即可。
库上传好了,找一个示例代码验证连接。
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT
import os
import ssl
import time
import ipaddress
import socketpool
import wifi
import adafruit_minimqtt.adafruit_minimqtt as MQTT
#连接WIFI
wifi.radio.connect(os.getenv("SSID"), os.getenv("PASSWORD"))
#mqtt服务端IP
ping_ip = ipaddress.IPv4Address(os.getenv("mqtt_broker"))
ping = wifi.radio.ping(ip=ping_ip)
if ping is None:
    print("连接失败")
else:
    print(f"Pinging 'mqttserver' took: {ping * 1000} ms")

# Setup a feed named 'photocell' for publishing to a feed
photocell_feed ="/feeds/photocell"
# Setup a feed named 'onoff' for subscribing to changes
onoff_feed = "/feeds/onoff"

def connected(client, userdata, flags, rc):
    # This function will be called when the client is connected
    # successfully to the broker.
    print(f"Connected to Adafruit IO! Listening for topic changes on {onoff_feed}")
    # Subscribe to all changes on the onoff_feed.
    client.subscribe(onoff_feed)


def disconnected(client, userdata, rc):
    # This method is called when the client is disconnected
    print("Disconnected from Adafruit IO!")

def message(client, topic, message):
    # This method is called when a topic the client is subscribed to
    # has a new message.
    print(f"New message on topic {topic}: {message}")

# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)

# Set up a MiniMQTT Client
mqtt_client = MQTT.MQTT(
    broker=os.getenv("mqtt_broker"),
    port=1883,
    socket_pool=pool,
)

# Setup the callback methods above
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message

# Connect the client to the MQTT broker.
print("Connecting to MQTT...")
mqtt_client.connect()

photocell_val = 0
while True:
    # Poll the message queue
    mqtt_client.loop(timeout=1)
    # Send a new message
    print(f"Sending photocell value: {photocell_val}...")
    mqtt_client.publish(photocell_feed, photocell_val)
    print("Sent!")
    photocell_val += 1
    time.sleep(2)
    

接下来进行数据采集。

首先是必选物料中的D6T-1A-01,用来获取温度。

这个模块文档中并没有给出适用于CircuitPython的示例代码,倒是有C写的适用于Raspberry Pi平台的示例,那么我们直接拿来用。

链接已隐藏,如需查看请登录或者注册
为代码地址。

顺便把D6T-1A-01的代码贴出来。

/*
 * MIT License
 * Copyright (c) 2019, 2018 - present OMRON Corporation
 * All rights reserved.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a
 * copy of this software and associated documentation files (the "Software"),
 * to deal in the Software without restriction, including without limitation
 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
 * and/or sell copies of the Software, and to permit persons to whom the
 * Software is furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
 * DEALINGS IN THE SOFTWARE.
 */

/* includes */
#include <stdio.h>
#include <stdint.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <sys/ioctl.h>
#include <linux/i2c-dev.h>
#include <stdbool.h>
#include <time.h>

/* defines */
#define D6T_ADDR 0x0A  // for I2C 7bit address
#define D6T_CMD 0x4C  // for D6T-44L-06/06H, D6T-8L-09/09H, for D6T-1A-01/02

#define N_ROW 1
#define N_PIXEL 1
#define N_READ ((N_PIXEL + 1) * 2 + 1)
#define RASPBERRY_PI_I2C    "/dev/i2c-1"
#define I2CDEV              RASPBERRY_PI_I2C

uint8_t rbuf[N_READ];
double ptat;
double pix_data[N_PIXEL];

/* I2C functions */
/** <!-- i2c_read_reg8 {{{1 --> I2C read function for bytes transfer.
 */
uint32_t i2c_read_reg8(uint8_t devAddr, uint8_t regAddr,
                       uint8_t *data, int length
) {
    int fd = open(I2CDEV, O_RDWR);

    if (fd < 0) {
        fprintf(stderr, "Failed to open device: %s\n", strerror(errno));
        return 21;
    }
    int err = 0;
    do {
        if (ioctl(fd, I2C_SLAVE, devAddr) < 0) {
            fprintf(stderr, "Failed to select device: %s\n", strerror(errno));
            err = 22; break;
        }
        if (write(fd, ®Addr, 1) != 1) {
            fprintf(stderr, "Failed to write reg: %s\n", strerror(errno));
            err = 23; break;
        }
        int count = read(fd, data, length);
        if (count < 0) {
            fprintf(stderr, "Failed to read device(%d): %s\n",
                    count, strerror(errno));
            err = 24; break;
        } else if (count != length) {
            fprintf(stderr, "Short read  from device, expected %d, got %d\n",
                    length, count);
            err = 25; break;
        }
    } while (false);
    close(fd);
    return err;
}

/** <!-- i2c_write_reg8 {{{1 --> I2C read function for bytes transfer.
 */
uint32_t i2c_write_reg8(uint8_t devAddr,
                        uint8_t *data, int length
) {
    int fd = open(I2CDEV, O_RDWR);
    if (fd < 0) {
        fprintf(stderr, "Failed to open device: %s\n", strerror(errno));
        return 21;
    }
    int err = 0;
    do {
        if (ioctl(fd, I2C_SLAVE, devAddr) < 0) {
            fprintf(stderr, "Failed to select device: %s\n", strerror(errno));
            err = 22; break;
        }
        if (write(fd, data, length) != length) {
            fprintf(stderr, "Failed to write reg: %s\n", strerror(errno));
            err = 23; break;
        }
    } while (false);
    close(fd);
    return err;
}

uint8_t calc_crc(uint8_t data) {
    int index;
    uint8_t temp;
    for (index = 0; index < 8; index++) {
        temp = data;
        data <<= 1;
        if (temp & 0x80) {data ^= 0x07;}
    }
    return data;
}

/** <!-- D6T_checkPEC {{{ 1--> D6T PEC(Packet Error Check) calculation.
 * calculate the data sequence,
 * from an I2C Read client address (8bit) to thermal data end.
 */
bool D6T_checkPEC(uint8_t buf[], int n) {
    int i;
    uint8_t crc = calc_crc((D6T_ADDR << 1) | 1);  // I2C Read address (8bit)
    for (i = 0; i < n; i++) {
        crc = calc_crc(buf[i] ^ crc);
    }
    bool ret = crc != buf[n];
    if (ret) {
        fprintf(stderr,
                "PEC check failed: %02X(cal)-%02X(get)\n", crc, buf[n]);
    }
    return ret;
}


/** <!-- conv8us_s16_le {{{1 --> convert a 16bit data from the byte stream.
 */
int16_t conv8us_s16_le(uint8_t* buf, int n) {
    uint16_t ret;
    ret = (uint16_t)buf[n];
    ret += ((uint16_t)buf[n + 1]) << 8;
    return (int16_t)ret;   // and convert negative.
}


void delay(int msec) {
    struct timespec ts = {.tv_sec = msec / 1000,
                          .tv_nsec = (msec % 1000) * 1000000};
    nanosleep(&ts, NULL);
}

void initialSetting(void) {
}

/** <!-- main - Thermal sensor {{{1 -->
 * Read data
 */
int main() {
    int i;
	int16_t itemp;
	
	delay(220);	
	
	while(1){
		// Read data via I2C
		memset(rbuf, 0, N_READ);
		uint32_t ret = i2c_read_reg8(D6T_ADDR, D6T_CMD, rbuf, N_READ);
		D6T_checkPEC(rbuf, N_READ - 1);
		
        //Convert to temperature data (degC)
		ptat = (double)conv8us_s16_le(rbuf, 0) / 10.0;
		for (i = 0; i < N_PIXEL; i++) {
			itemp = conv8us_s16_le(rbuf, 2 + 2*i);
			pix_data[i] = (double)itemp / 10.0;
		}
		
        //Output results		
		printf("PTAT: %4.1f [degC], Temperature: ", ptat);
		for (i = 0; i < N_PIXEL; i++) {
		    printf("%4.1f, ", pix_data[i]);
		}
		printf("[degC]\n");
		
		delay(100);
	}
}
// vi: ft=c:fdm=marker:et:sw=4:tw=80
根据图示连接对应引脚,SDA连接树莓派GPIO2,SCL连接树莓派GPIO3。
 

示例网站中给出了详细的编译运行步骤。

编译成功后尝试执行d6t-1a,发现报错。
去代码中看一下报错位置,应该是找不到I2C接口。
想起来还没有开启I2C接口,需要先配置一下,使用sudo raspi-config打开配置界面,选择第三项接口配置。
选择第五项I2C。
选择Yes。
如下图所示则开启成功。
重新运行示例代码,这次成功获取到了当前的环境温度,示例程序的采样速率还是很快的,差不多一秒十行。手掌从传感器上方划过,示数可以很快变化。
验证了传感器可用,接下来将它连接到下位机ESP32上进行开发,我们需要获取当前温度并通过mqtt传输给上位机树莓派。
首先进行接线,SDA连接GPIO6,SCL连接GPIO7,需要注意的是ESP32需要外接上拉电阻。
接线如下图所示。
接好线后使用下面的代码进行I2C地址遍历,查询总线上的I2C地址。
import time
import board
import busio

# List of potential I2C busses
ALL_I2C = ("board.I2C()", "board.STEMMA_I2C()", "busio.I2C(board.IO7, board.IO6)")

# Determine which busses are valid
found_i2c = []
for name in ALL_I2C:
    try:
        print("Checking {}...".format(name), end="")
        bus = eval(name)
        bus.unlock()
        found_i2c.append((name, bus))
        print("ADDED.")
    except Exception as e:
        print("SKIPPED:", e)

# Scan valid busses
if len(found_i2c):
    print("-" * 40)
    print("I2C SCAN")
    print("-" * 40)
    while True:
        for bus_info in found_i2c:
            name = bus_info[0]
            bus = bus_info[1]

            while not bus.try_lock():
                pass

            print(
                name,
                "addresses found:",
                [hex(device_address) for device_address in bus.scan()],
            )

            bus.unlock()

        time.sleep(2)
else:
    print("No valid I2C bus found.")
可以看到成功查找到了D6T-1A的地址0xa,说明我们的接线没有问题。
接下来进行代码编写。将官方给出的C示例代码,转为适用于CircuitPython的代码。
import time
import board
import busio

D6T_ADDR = 0x0A  # I2C设备地址
D6T_CMD = 0x4C  # 设备命令

# 数据长度定义
N_PIXEL = 1
N_READ = (N_PIXEL + 1) * 2 + 1

# 初始化I2C总线
i2c = busio.I2C(board.IO7, board.IO6)

# crc校验
def calc_crc(data):
    crc = 0
    data &= 0xFF  # 确保输入为8位
    for _ in range(8):
        if (crc ^ data) & 0x80:
            crc = ((crc << 1) ^ 0x07) & 0xFF
        else:
            crc = (crc << 1) & 0xFF
        data <<= 1
    return crc

# PEC校验
def D6T_checkPEC(buf):
    crc = calc_crc((D6T_ADDR << 1) | 1)
    for b in buf[:-1]:
        crc = calc_crc(b ^ crc)
    if crc != buf[-1]:
        print(f"PEC校验失败: {crc:02X}(calculated) - {buf[-1]:02X}(received)")
        return False
    return True

def conv8us_s16_le(buf, n):
    return buf[n] + (buf[n + 1] << 8)

def read_thermal_data():
    rbuf = bytearray(N_READ)
    # 锁定总线
    if not i2c.try_lock():
        print("无法锁定I2C总线")
        return None, None

    try:
        # 发送命令并读取数据
        i2c.writeto(D6T_ADDR, bytes([D6T_CMD]))
        i2c.readfrom_into(D6T_ADDR, rbuf)
    except Exception as e:
        print(f"I2C 读取失败: {e}")
        return None, None
    finally:
        # 解锁总线
        i2c.unlock()

    # 校验PEC
    if not D6T_checkPEC(rbuf):
        return None, None

    # 转换温度数据
    ptat = conv8us_s16_le(rbuf, 0) / 10.0
    pix_data = [conv8us_s16_le(rbuf, 2 + 2 * i) / 10.0 for i in range(N_PIXEL)]
    
    return ptat, pix_data

def main():
    time.sleep(0.22)  # 稳定I2C总线
    
    while True:
        ptat, pix_data = read_thermal_data()
        if ptat is not None:
            print(f"PTAT: {ptat:.1f} [degC], Temperature: ", end="")
            for pix in pix_data:
                print(f"{pix:.1f}, ", end="")
            print("[degC]")
        else:
            print("读取数据时出现错误")
        
        time.sleep(0.1)

if __name__ == '__main__':
    main()
然后将数据用mqtt传输给上位机,并由上位机存储进Sqlite数据库。
目前上位机只安装了mqtt服务端,要使用python接收数据,我们还需要安装客户端。
pip install paho-mqtt
ESP32连接mqtt发送数据:
# SPDX-FileCopyrightText: 2021 ladyada for Adafruit Industries
# SPDX-License-Identifier: MIT
import os
import ssl
import time
import ipaddress
import socketpool
import wifi
import adafruit_minimqtt.adafruit_minimqtt as MQTT

#通过d6t1a01采集数据
from d6t1a01 import read_thermal_data

#搜索WIFI
# for network in wifi.radio.start_scanning_networks():
#     print(f"SSID:{str(network.ssid, 'utf-8')},RSSI:{network.rssi},Channel:{network.channel}")
# wifi.radio.stop_scanning_networks()

#连接WIFI
print(f"正在连接WIFI")
wifi.radio.connect(os.getenv("SSID"), os.getenv("PASSWORD"))

#mqtt服务端IP
ping_ip = ipaddress.IPv4Address(os.getenv("mqtt_broker"))
ping = wifi.radio.ping(ip=ping_ip)
if ping is None:
    print(f"ping {os.getenv('mqtt_broker')} 失败")
else:
    print(f"Pinging 'mqttserver' took: {ping * 1000} ms")

# Setup a feed named 'photocell' for publishing to a feed
temperature_feed ="/ESP32/d6t"
# Setup a feed named 'onoff' for subscribing to changes
onoff_feed = "/Pi/onoff"

def connected(client, userdata, flags, rc):
    # 连接成功触发
    print(f"连接成功,订阅主题: {onoff_feed}")
    client.subscribe(onoff_feed)

def disconnected(client, userdata, rc):
    # 断开连接触发
    print("断开连接!")

def message(client, topic, message):
    # 接收消息触发
    print(f"接收数据:{topic}: {message}")
    
# Create a socket pool
pool = socketpool.SocketPool(wifi.radio)

# Set up a MiniMQTT Client
mqtt_client = MQTT.MQTT(
    broker=os.getenv("mqtt_broker"),
    port=1883,
    socket_pool=pool,
)

# Setup the callback methods above
mqtt_client.on_connect = connected
mqtt_client.on_disconnect = disconnected
mqtt_client.on_message = message

# Connect the client to the MQTT broker.
print("正在连接 MQTT...")
mqtt_client.connect()

while True:
    # 检查是否有消息到达或其他需要处理的事件
    mqtt_client.loop(timeout=1)
    temperature = read_thermal_data()
    
    print(f"发送温度数据: {temperature[1][0]}")
    mqtt_client.publish(temperature_feed, temperature[1][0])
    
    time.sleep(2)

树莓派5连接mqtt接收数据并存储:

import time
import datetime
import sqlite3
from paho.mqtt import client as mqtt

conn = sqlite3.connect('test.db',check_same_thread=False)
c = conn.cursor()

#创建数据表,执行一次后注释即可,重复执行会报错
c.execute('''CREATE TABLE temp
       (id INTEGER PRIMARY KEY AUTOINCREMENT,
       temperature float NOT NULL,
        time DateTime NOT NULL
        );''')

def on_connect(client,userdata,flags,rc):
    if rc==0:
        print("连接成功")
        client.subscribe("/ESP32/d6t")
    else:
        print("连接失败")

def on_message(client,userdata,msg):
    '''接收端回调函数'''
    temperature = float(msg.payload.decode('utf-8'))
    print(temperature)
    c.execute(f'INSERT INTO temp (temperature,time) VALUES ("{temperature}","{datetime.datetime.now()}")')
    conn.commit()
    
    if temperature > 28:
        client.publish(f"/Pi/onoff",True,0,False)
    else:
        client.publish(f"/Pi/onoff",False,0,False)

if __name__ == '__main__':
    client = mqtt.Client()

    client.on_connect = on_connect

    client.on_message = on_message

    try:
        client.connect("127.0.0.1",1883,60)
        client.loop_start() #开启一个独立的循环监听线程 loop_forever()阻塞式线程循环
    except KeyboardInterrupt:
        client.disconnect()
    while True:
        time.sleep(1)

运行代码,ESP32每隔两秒与D6T通信采集一次数据,并将数据发送给树莓派5,树莓派5接收数据后将数据存储进数据库,并根据数据的值向ESP32发送控制信号。

数据库中存储温度值与接收时间,此处间隔3秒是因为树莓派还设有1s延时。
以上便是整个系统最核心的部分,后面要做的只是数据包装与模块组装。其他传感器只需照猫画虎即可实现数据采集与存储功能。
下面简单列出其他传感器的数据采集代码,首先是DNT11,用于获取湿度,虽然它也可以获取温度,但我们有D6T-1A-01了,那个更准确。
链接已隐藏,如需查看请登录或者注册
import board
import time
import adafruit_dht

def get_Temp_and_Humidity(dht):
    try:
        temperature = dht.temperature
        humidity = dht.humidity
        return (temperature, humidity)
    except RuntimeError as e:
        return (-1, -1)

if __name__ == "__main__":
    dht = adafruit_dht.DHT11(board.IO21)
    while True:
        temperature,humidity = get_Temp_and_Humidity(dht)
        print("temperature{:.1f}*c\t Humidity:{}%".format(temperature,humidity))
        time.sleep(2.5)

然后是光照传感器,用于接收环境光照强度以判断何时打开补光灯,上面提到的CircuitPython捆绑包中含有适用于此传感器的库。

# SPDX-FileCopyrightText: 2020 Bryan Siepert, written for Adafruit Industries
# SPDX-License-Identifier: Unlicense
import time
import board
import adafruit_bh1750
import busio

i2c = busio.I2C(board.IO22,board.IO23)
sensor = adafruit_bh1750.BH1750(i2c)

while True:
    print("%.2f Lux" % sensor.lux)
    time.sleep(2)

接着是土壤湿度传感器与水位传感器。土壤湿度传感器用于监测土壤湿度,以判断何时打开水电磁阀浇水。水位传感器用来判断是否有水漫出花盆,防止浇水过多造成溢出。它们的原理相似,所以放在一起来介绍。

import analogio
import board
import digitalio
import time
pin = analogio.AnalogIn(board.IO3)
while True:
    print(pin.value)
    if(pin.value > 10000):
        print("True")
    else:
        print("False")
    time.sleep(1.5)
水位传感器:
土壤湿度传感器:

至此,本系统的数据采集与存储部分介绍完毕,下节计划分享Web界面部分,感谢大家的观看。

 

 

 

 

 

 

 

 

 

 

最新回复

NICE,点赞   详情 回复 发表于 2024-10-29 19:22
点赞 关注
 
 

回复
举报

6748

帖子

2

TA的资源

版主

沙发
 

mqtt服务器可以下发数据给mqtt客户端吧?

 
 
 

回复

50

帖子

0

TA的资源

一粒金砂(中级)

板凳
 

NICE,点赞

 
 
 

回复
您需要登录后才可以回帖 登录 | 注册

随便看看
查找数据手册?

EEWorld Datasheet 技术支持

相关文章 更多>>
关闭
站长推荐上一条 1/10 下一条

 
EEWorld订阅号

 
EEWorld服务号

 
汽车开发圈

About Us 关于我们 客户服务 联系方式 器件索引 网站地图 最新更新 手机版

站点相关: 国产芯 安防电子 汽车电子 手机便携 工业控制 家用电子 医疗电子 测试测量 网络通信 物联网

北京市海淀区中关村大街18号B座15层1530室 电话:(010)82350740 邮编:100190

电子工程世界版权所有 京B2-20211791 京ICP备10001474号-1 电信业务审批[2006]字第258号函 京公网安备 11010802033920号 Copyright © 2005-2024 EEWORLD.com.cn, Inc. All rights reserved
快速回复 返回顶部 返回列表