【得捷电子Follow me第3期】植物养护监测器
[复制链接]
一、介绍视频
二、活动任务及项目说明
活动链接
Follow me 第3期!与得捷一起解锁开发板超能力! (eeworld.com.cn)
任务1:使用MicroPython系统(必做任务)
熟悉Seeed Studio XIAO ESP32C3开发板基本操作,安装esptool,并给开发板刷写MicroPython系统,完成入门程序的运行
搭配器件:Seeed Studio XIAO ESP32C3
安装MicroPython
- 先安装好python以及pip,然后使用pip安装esptool,命令为python -m pip install esptool,具体可参考 官网 (espressif.com)
- 下载MicroPython固件,选择 MCU: esp32c3,链接为MicroPython - Python for microcontrollers,固件为ESP32_GENERIC_C3-20231005-v1.21.0.bin,大小约1.5MB
- 将设备连接好电脑后,查看USB串口号,如windows10环境下为COM14。刷写固件前可以先备份原固件,相关的安装命令为
-
# 1.读取flash信息
esptool --port COM14 flash_id
# 输出结果为 ------------
esptool.py v4.6.2
Serial port COM15
Connecting...
Detecting chip type... ESP32-C3
Chip is ESP32-C3 (revision v0.4)
Features: WiFi, BLE
Crystal is 40MHz
MAC: 34:85:00:00:00:00
Uploading stub...
Running stub...
Stub running...
Manufacturer: 20
Device: 4016
Detected flash size: 4MB
Hard resetting via RTS pin...
# 2.可选择备份当前flash,可从以上结果中看到flash大小为4M,固备份时参数为0x400000
# esptool --port COM14 read_flash 0x00000 0x400000 XIAO_ESP32C3.bin
# 3. 擦除flash
esptool --chip esp32c3 --port COM14 erase_flash
# 4.写入flash-ESP32S3
esptool --chip esp32c3 --port COM14 --baud 460800 write_flash -z 0x0 ESP32_GENERIC_C3-20231005-v1.21.0.bin
安装IDE-Thonny
- 下载Thonny并安装
- 电脑连接XIAO ESP32C3,打开Thonny,点击右下角配置解释器,选择interpreter- Micropython (ESP32)和Port >>>单击“OK”
- 如下图,在shell窗口中已经成功连接上了ESP32C3
-
- 现在就可以在esp32c3上执行python命令了,如下
-
任务2:驱动扩展板上的OLED屏幕(必做任务)
使用扩展板上的OLED屏幕显示文字和图形
搭配器件:Seeed Studio XIAO ESP32C3、Seeed Studio Expansion Board Base for XIAO
安装lib包
- 在Thonny中选择“工具”-“包管理”,搜索micropython-ssd1306
-
- 其中XIAO ESP32C3和展板的io定义可在官网文档Getting Started | Seeed Studio Wiki和 Expansion Board Base for XIAO | Seeed Studio Wiki中找到,具体IO定义如下图。可从图中看到OLED使用的IIC端口为GPIO6和GPIO7,以下代码中会用到
-
- 编写代码并保存为boot.py,代码参考这里 MicroPython for ESP32C3 | Seeed Studio Wiki
- 显示文字图形代码
-
import time
from machine import Pin, SoftI2C
import ssd1306
import math
# ESP32C3 OLED屏 引脚定义
i2c = SoftI2C(scl=Pin(7), sda=Pin(6))
oled_width = 128 # 屏宽
oled_height = 64 # 屏高
oled = ssd1306.SSD1306_I2C(oled_width, oled_height, i2c)
oled.fill(0) # 清屏
oled.text("Hello, ESP32C3!", 10, 15) # 文字
oled.text("\(^o^)/", 30, 40)
# 绘制分隔线
#oled.line(64, 0, 64, 63, 1) # 竖线
oled.line(0, 32, 127, 32, 1) # 横线
oled.show() # 显示
-
运行效果如下
-
更复杂的显示可参考以下扩展库
UI库:
任务3:控制蜂鸣器播放音乐(必做任务)
使用Seeed Studio XIAO ESP32C3控制蜂鸣器发出不同频率的声音,并播放一段音乐
搭配器件:Seeed Studio XIAO ESP32C3、Seeed Studio Expansion Board Base for XIAO
控制蜂鸣器:板子安装有一个无源蜂鸣器,位于GPIO5。可以通过PWM来控制其振动频率,发出不同的声音。经测试最低频率可设置为5 Hz,最高为40M Hz,范围还是挺广的。不过正常能听到的最高频率不到20000 Hz。如果要播放音乐,可以模拟不同音符对应的频率,如C: 261.63 Hz、D: 293.66 Hz、E: 329.63 Hz等。
from machine import Pin, PWM
pin = Pin(5, machine.Pin.OUT)
pwm = PWM(pin)
# 低频声
pwm.freq(100)
# 高频声
pwm.freq(10000)
# 更高频率就提示错误了
>>> bz.freq(40000001)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ValueError: frequency must be from 1Hz to 40MHz
当然可调用现有 播放音乐, 作者给出了使用方法,在https://onlinesequencer.net/网站上可找到音乐。并复制其中的编码。如以下代码中所示,音乐定义是分号分隔的,其中每段中定义了音符、这里选了一首土耳其进行曲。由于音乐文件是多轨的,使用一个蜂鸣器有些单调,但旋律还能分辨出来的。
from buzzer_music import music
from time import sleep
from machine import Pin
# 示例曲子 - 土耳其进行曲(片断)
# https://onlinesequencer.net/3066052#t323 Mozart – Turkish March
song = '0 B5 1 41;1 A5 1 41;2 G#5 1 41;3 A5 1 41;4 C6 1 41;4 A4 1 41;8 D6 1 41;9 C6 1 41;10 B5 1 41;11 C6 1 41;12 E6 1 41;16 F6 1 41;17 E6 1 41;18 D#6 1 41;19 E6 1 41;6 C5 1 41;6 E5 1 41;8 E5 1 41;8 C5 1 41;10 C5 1 41;10 E5 1 41;12 A4 1 41;14 C5 1 41;16 C5 1 41;18 C5 1 41;14 E5 1 41;16 E5 1 41;18 E5 1 41;20 B6 1 41;21 A6 1 41;23 A6 1 41;22 G#6 1 41;26 G#6 1 41;24 B6 1 41;25 A6 1 41;27 A6 1 41;20 A4 1 41;22 C5 1 41;22 E5 1 41;24 A4 1 41;26 C5 1 41;26 E5 1 41;28 C7 1 41;32 A6 1 41;34 C7 1 41;28 A4 1 41;30 C5 1 41;30 E5 1 41;32 C5 1 41;32 E5 1 41;34 C5 1 41;34 E5 1 41;36 G6 0.5 41;36.5 A6 0.5 41;37 B6 1 41;38 A6 1 41;38 F#6 1 41;40 G6 1 41;40 E6 1 41;42 A6 1 41;42 F#6 1 41;36 E4 1 41;38 E5 1 41;38 B4 1 41;40 E5 1 41;40 B4 1 41;42 E5 1 41;42 B4 1 41;44 G6 0.5 41;44.5 A6 0.5 41;45 B6 1 41;46 A6 1 41;46 F#6 1 41;48 G6 1 41;48 E6 1 41;50 A6 1 41;50 F#6 1 41;44 E4 1 41;46 E5 1 41;46 B4 1 41;48 E5 1 41;48 B4 1 41;50 E5 1 41;50 B4 1 41;52 E4 1 41;54 B4 1 41;54 E5 1 41;56 B3 1 41;58 B4 1 41;52 G6 0.5 41;52.5 A6 0.5 41;53 B6 1 41;54 A6 1 41;54 F#6 1 41;56 G6 1 41;56 E6 1 41;58 F#6 1 41;58 D#6 1 41;60 E6 4 41;60 E4 4 41;64 B5 1 41;65 A5 1 41;66 G#5 1 41;67 A5 1 41;68 C6 1 41;68 A4 1 41;72 D6 1 41;73 C6 1 41;74 B5 1 41;75 C6 1 41;76 E6 1 41;80 F6 1 41;81 E6 1 41;82 D#6 1 41;83 E6 1 41;70 C5 1 41;70 E5 1 41;72 E5 1 41;72 C5 1 41;74 C5 1 41;74 E5 1 41;76 A4 1 41;78 C5 1 41;80 C5 1 41;82 C5 1 41;78 E5 1 41;80 E5 1 41;82 E5 1 41;84 B6 1 41;85 A6 1 41;87 A6 1 41;86 G#6 1 41;90 G#6 1 41;88 B6 1 41;89 A6 1 41;91 A6 1 41;84 A4 1 41;86 C5 1 41;86 E5 1 41;88 A4 1 41;90 C5 1 41;90 E5 1 41;92 C7 1 41;96 A6 1 41;98 C7 1 41;92 A4 1 41;94 C5 1 41;94 E5 1 41;96 C5 1 41;96 E5 1 41;98 C5 1 41;98 E5 1 41;100 G6 0.5 41;100.5 A6 0.5 41;101 B6 1 41;102 A6 1 41;102 F#6 1 41;104 G6 1 41;104 E6 1 41;106 A6 1 41;106 F#6 1 41;100 E4 1 41;102 E5 1 41;102 B4 1 41;104 E5 1 41;104 B4 1 41;106 E5 1 41;106 B4 1 41;108 G6 0.5 41;108.5 A6 0.5 41;109 B6 1 41;110 A6 1 41;110 F#6 1 41;112 G6 1 41;112 E6 1 41;114 A6 1 41;114 F#6 1 41;108 E4 1 41;110 E5 1 41;110 B4 1 41;112 E5 1 41;112 B4 1 41;114 E5 1 41;114 B4 1 41;116 E4 1 41;118 B4 1 41;118 E5 1 41;120 B3 1 41;122 B4 1 41;116 G6 0.5 41;116.5 A6 0.5 41;117 B6 1 41;118 A6 1 41;118 F#6 1 41;120 G6 1 41;120 E6 1 41;122 F#6 1 41;122 D#6 1 41;124 E6 4 41;124 E4 4 41;132 G6 1 41;132 E6 1 41;128 E6 1 41;130 F6 1 41;130 D6 1 41;128 C6 1 41;134 G6 1 41;134 E6 1 41;136 A6 1 41;137 G6 1 41;138 F6 1 41;139 E6 1 41;132 C4 1 41;134 C5 1 41;136 E4 1 41;138 E5 1 41;140 G4 4 41;140 B5 1 41;140 D6 1 41;142 G5 1 41;148 G6 1 41;148 E6 1 41;144 E6 1 41;146 F6 1 41;146 D6 1 41;144 C6 1 41;150 G6 1 41;150 E6 1 41;152 A6 1 41;153 G6 1 41;154 F6 1 41;155 E6 1 41;150 C5 1 41;152 E4 1 41;154 E5 1 41;156 G4 4 41;156 B5 4 41;156 D6 4 41;160 A5 1 41;160 C6 1 41;162 B5 1 41;162 D6 1 41;164 C6 1 41;164 E6 1 41;166 C6 1 41;166 E6 1 41;168 F6 1 41;169 E6 1 41;170 D6 1 41;171 C6 1 41;166 A4 1 41;164 A3 1 41;168 C4 1 41;170 C5 1 41;172 B5 1 41;172 G#5 1 41;174 E5 1 41;172 E4 4 41;176 A5 1 41;176 C6 1 41;178 B5 1 41;178 D6 1 41;180 C6 1 41;180 E6 1 41;182 C6 1 41;182 E6 1 41;184 F6 1 41;185 E6 1 41;186 D6 1 41;187 C6 1 41;182 A4 1 41;180 A3 1 41;184 C4 1 41;186 C5 1 41;188 B5 4 41;188 E4 4 41;188 G#5 4 41;148 C4 1 41;192 B5 1 41;193 A5 1 41;194 G#5 1 41;195 A5 1 41;196 C6 1 41;196 A4 1 41;200 D6 1 41;201 C6 1 41;202 B5 1 41;203 C6 1 41;204 E6 1 41;208 F6 1 41;209 E6 1 41;210 D#6 1 41;211 E6 1 41;198 C5 1 41;198 E5 1 41;200 E5 1 41;200 C5 1 41;202 C5 1 41;202 E5 1 41;204 A4 1 41;206 C5 1 41;208 C5 1 41;210 C5 1 41;206 E5 1 41;208 E5 1 41;210 E5 1 41;212 B6 1 41;213 A6 1 41;215 A6 1 41;214 G#6 1 41;218 G#6 1 41;216 B6 1 41;217 A6 1 41;219 A6 1 41;212 A4 1 41;214 C5 1 41;214 E5 1 41;216 A4 1 41;218 C5 1 41;218 E5 1 41;220 C7 1 41;220 F4 1 41;222 A4 1 41;222 D#5 1 41;224 D#5 1 41;226 D#5 1 41;224 A4 1 41;226 A4 1 41;228 E4 1 41;230 A4 1 41;230 E5 1 41;232 D4 1 41;234 B4 1 41;234 F4 1 41;224 A6 1 41;226 B6 1 41;228 C7 1 41;230 B6 1 41;232 A6 1 41;234 G#6 1 41;236 A6 1 41;238 E6 1 41;240 F6 1 41;242 D6 1 41;236 C4 1 41;238 E4 1 41;238 A4 1 41;240 D4 1 41;242 F4 1 41;242 B4 1 41;244 C6 4 41;244 A4 2 41;246 A4 2 41;248 G#4 2 41;250 G#4 2 41;244 E4 2 41;246 E4 2 41;248 E4 2 41;250 E4 2 41;248 B5 0.5 41;248.5 C6 0.5 41;249 B5 0.5 41;249.5 C6 0.5 41;250 B5 0.5 41;250.5 C6 0.5 41;251 A5 0.5 41;251.5 B5 0.5 41;252 A5 4 41;252 A4 4 41;252 A3 4 41;260 G6 1 41;260 E6 1 41;256 E6 1 41;258 F6 1 41;258 D6 1 41;256 C6 1 41;262 G6 1 41;262 E6 1 41;264 A6 1 41;265 G6 1 41;266 F6 1 41;267 E6 1 41;260 C4 1 41;262 C5 1 41;264 E4 1 41;266 E5 1 41;268 G4 4 41;268 B5 1 41;268 D6 1 41;270 G5 1 41;276 G6 1 41;276 E6 1 41;272 E6 1 41;274 F6 1 41;274 D6 1 41;272 C6 1 41;278 G6 1 41;278 E6 1 41;280 A6 1 41;281 G6 1 41;282 F6 1 41;283 E6 1 41;278 C5 1 41;280 E4 1 41;282 E5 1 41;284 G4 4 41;284 B5 4 41;284 D6 4 41;288 A5 1 41;288 C6 1 41;290 B5 1 41;290 D6 1 41;292 C6 1 41;292 E6 1 41;294 C6 1 41;294 E6 1 41;296 F6 1 41;297 E6 1 41;298 D6 1 41;299 C6 1 41;294 A4 1 41;292 A3 1 41;296 C4 1 41;298 C5 1 41;300 B5 1 41;300 G#5 1 41;302 E5 1 41;300 E4 4 41;304 A5 1 41;304 C6 1 41;306 B5 1 41;306 D6 1 41;308 C6 1 41;308 E6 1 41;310 C6 1 41;310 E6 1 41;312 F6 1 41;313 E6 1 41;314 D6 1 41;315 C6 1 41;310 A4 1 41;308 A3 1 41;312 C4 1 41;314 C5 1 41;316 B5 4 41;316 E4 4 41;316 G#5 4 41'
"""
Find a piece of music on onlinesequencer.net, click edit,
then select all notes with CTRL+A and copy them with CTRL+C
Paste string as shown above after removing ";:" from
the end and "Online Sequencer:120233:" from the start
"""
mySong = music(song, pins=[Pin(5)])
while True:
print(mySong.tick())
sleep(0.04)
任务4:连接WiFi网络(必做任务)
将Seeed Studio XIAO ESP32C3连接到WiFi网络,并访问互联网信息
搭配器件:Seeed Studio XIAO ESP32C3、Seeed Studio Expansion Board Base for XIAO、RF ANT 2.4GHZ/5.5GHZ PCB TRACE
连接WIFI的方法其实是比较简单的,但想要可靠的进行网络连接,需要对异常情况进行处理。以下代码是经过学习后整理出来的比较稳定的用法,但功能仍不完善。其中有几点可以供大家参考:
- wifi账号密码不要硬编码在代码中,可写入配置文件里
- wifi.connect()方法是非阻塞的,调用完后直接返回,但些时wifi可能没有连接成功。需要使用wifi.isconnected()进行判断,返回True表示连接成功
- 如果是用在最终产品中,需要提供方便的方式供用户来设置账号密码。如转为AP模式,使用web页面来设置,或者使用smartconfig之类的工具来设置。由于时间有限,这里并没有实现。
import network
import utime
# 保存 Wi-Fi MQTT 信息到配置文件,此函数需要事先手动执行
# 使用以下命令保存wifi账号和密码
'''
import ujson
save_wifi_config('SSID','xxx','mqtt.my',1883,'mqtt','xxx','xiao_esp32c3')
'''
def save_wifi_config(ssid, password, mqtt_host=None, mqtt_port=1883, mqtt_user=None, mqtt_password=None, client_id=None):
config = {
'wifi_ssid': ssid,
'wifi_password': password,
'mqtt_host': mqtt_host,
'mqtt_port': mqtt_port,
'mqtt_user': mqtt_user,
'mqtt_password': mqtt_password,
'client_id': client_id
}
with open('config.json', 'w') as f:
ujson.dump(config, f)
# 当前设备联网状态
class Internet:
def __init__(self, led_pin=2, max_retry=10):
self.state = False
self.wait = 2.0
self.max_retry = max_retry
def load_config(self):
try:
with open('config.json', 'r') as f:
config = ujson.load(f)
return config['wifi_ssid'], config['wifi_password']
except OSError:
return None, None
def connect_to_wifi(self, ssid, password):
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
if not wifi.isconnected():
print("连接到WiFi网络...")
wifi.connect(ssid, password)
retry_count = 0
while not wifi.isconnected() and retry_count < self.max_retry:
utime.sleep(1)
retry_count += 1
if wifi.isconnected():
print("已成功连接到WiFi网络")
print("网络信息:", wifi.ifconfig())
self.state = True
return True
else:
print("WiFi连接失败")
self.state = False
self.setup_wifi()
return False
else:
print("已连接到WiFi网络")
self.state = True
return True
def setup_wifi(self):
ssid, password = self.load_config()
if ssid and password:
if self.connect_to_wifi(ssid, password):
return
# TODO 实现用手机配置网络
print("无法读取配置文件或配置文件不存在,请使用save_wifi_config来设置")
internet = Internet()
ssid, password = internet.load_config()
while True:
if internet.connect_to_wifi(ssid, password):
internet.wait = 60
else:
internet.wait = 2
utime.sleep(internet.wait)
访问网络,同步NTP时间。这里添加了时区设置,时间同步后,使用utime.localtime()可获得正确的本地时间
import utime
import ntptime
# 时区
TIME_ZONE = 8
try:
tm = utime.gmtime(ntptime.time())
RTC().datetime((tm[0], tm[1], tm[2], tm[6] +
1, tm[3] + TIME_ZONE, tm[4], tm[5], 0))
utime.localtime()
except OSError as error:
print("Failed to get ntptime\n", error)
输出结果为
任务5:使用外部传感器(必做任务)
连接环境光传感器或温湿度传感器,获取传感器的数值,并转换成真实的物理量
搭配器件: Seeed Studio XIAO ESP32C3、Seeed Studio Expansion Board Base for XIAO、 DHT22温湿度传感器、GY30光照传感器、土壤湿度传感器
本次外部传感器使用的是自己已有的一些,同时可是为了完成一个综合项目——植物养护监测器。各传感器的型号和连接如下:
温湿度传感器-DHT22:GPIO4
土壤湿度传感器:GPIO5(AO)、GPIO21(DO)
光照传感器-GY-30:GPIO6(SDA)、GPIO7(SCL)
温湿度采集
import dht
import machine
d = dht.DHT22(machine.Pin(4))
d.measure()
d.temperature() #温度
d.humidity() #湿度
GY-30光照采集
GY-30使用的是BH1750,它的读取协议为,其读取地址为0100011(0x23)。
按照协议的说明,读取时分3步。
1.发送测量模式和分辨率指令;2.等待约180ms;3.读取两个字节的光照强度,然后按格式转为光照数值
i2c读取代码
from machine import Pin, I2C
i2c = I2C(0,scl = Pin(7),sda = Pin(6),freq = 1_000_000)
# 结果为 [35, 60, 81]
scan = i2c.scan()
print(f'地址:{hex(scan[0])}')
# 输出内容为
# 地址:0x23
BH1750_I2C_ADD = 0x23
# 读取指令列表
BH1750_CMD_POWERDOWN = 0x0
BH1750_CMD_POWERON = 0x1
BH1750_CMD_RESET = 0x7
BH1750_CMD_H_RESOLUTION = 0x10
BH1750_CMD_H_RESOLUTION2 = 0x11
BH1750_CMD_L_RESOLUTION = 0x13
BH1750_CMD_ONETIME_H = 0x20
BH1750_CMD_ONETIME_H2 = 0x21
BH1750_CMD_ONETIME_L = 0x23
# 设置精度
buf = bytearray(1)
buf[0] = BH1750_CMD_H_RESOLUTION
i2c.writeto(BH1750_I2C_ADD, buf)
time.sleep_ms(200)
while True:
# 读取两个字节
buf = i2c.readfrom(BH1750_I2C_ADD, 0x2)
data = buf[0] * 256 + buf[1]
print(data)
time.sleep_ms(200)
土壤湿度采集
使用ADC时需要将传感器接入到AO口,因为adc读取的是个绝对数值,为了换算成相对湿度,可测量最潮湿的值和最干燥的值,然后用以下公式来计算个相对湿度,该值仅供参考,无严格的物理含义。
from machine import Pin, ADC
# 土壤传感器 模拟信号
adc = ADC(Pin(5))
adc.atten(ADC.ATTN_11DB)
adc.read()
# 读取的值在0-4095之间,可使用以下公式换算为相对湿度,其中1000为最潮湿情况下的adc值
soil_humidity = round(
100.0 * (adc_raw - 4095.0) / (1000.0 - 4095.0), 1)
任务6:综合实践(选做,非必做)
植物养护监测器
一、硬件设备
1.主控:XIAO_ESP32C3、远程监控设备AtomS3(芯片为ESP32S3,是本次活动一起下单的,用来做mqtt消息订阅实验)
2.传感器:
温湿度传感器-DHT22:GPIO4
土壤湿度传感器:GPIO5(AO)、GPIO21(DO)
光照传感器-GY-30:GPIO6(SDA)、GPIO7(SCL)
RGB灯-WS2812:GPIO2
二、系统功能
数据采集:设备定时采集环境温度、湿度、土壤湿度、光照信息,上传至mqtt服务器;
远程监控:设备本机OLED可用来查看当前的状态,远程设备通过订阅mqtt来同步展示信息;
历史趋势:远程使用HomeAssistant来订阅mqtt数据并展示当前数据和历史趋势;
事件提醒:当土壤湿度过低时,控制RGB亮灯用来提示用户需要浇水了。
三、开发环境
1. 设备端 MicroPython v1.21.0、系统端 Mosquitto、 HomeAssistant
2. 外部库:ssd1306、dht
3. 相关技术:MQTT、asyncio协程、Timer
四、成品展示
主控及传感器
远程监控屏幕
HomeAssistant看板
五、核心代码
1.主控 ESP32C3-MicroPython
import ujson
import network
import ntptime
import utime
from umqtt.simple import MQTTClient
from machine import Pin, SoftI2C, ADC, I2C, Timer, RTC
import neopixel
import dht
# 需要手动安装,搜索ssd1306
import ssd1306
import asyncio
# 定义全局变量
# 引脚定义
# RGB灯
WS2812_PIN = 2
# 板载按键
BTN_PIN = 3
# 温湿度传感器-DHT22
DHT22_PIN = 4
# 土壤湿度传感器
SOIL_AO_PIN = 5
SOIL_DO_PIN = 21
# GY-30 IIC
SDA_PIN = 6
SCL_PIN = 7
BH1750_I2C_ADD = 0x23
BH1750_CMD_H_RESOLUTION = 0x10
# 蜂鸣器
BUZZER_PIN = 20
# 时区
TIME_ZONE = 8
# 保存 Wi-Fi MQTT 信息到配置文件,此函数需要事先手动执行
# 使用以下命令保存wifi账号和密码
'''
import ujson
save_wifi_config('SSID','xxx','mqtt.my',1883,'mqtt','xxx','xiao_esp32c3')
'''
def save_wifi_config(ssid, password, mqtt_host=None, mqtt_port=1883, mqtt_user=None, mqtt_password=None, client_id=None):
config = {
'wifi_ssid': ssid,
'wifi_password': password,
'mqtt_host': mqtt_host,
'mqtt_port': mqtt_port,
'mqtt_user': mqtt_user,
'mqtt_password': mqtt_password,
'client_id': client_id
}
with open('config.json', 'w') as f:
ujson.dump(config, f)
# 从配置文件中读取 MQTT 配置
def load_mqtt_config():
try:
with open('config.json', 'r') as f:
config = ujson.load(f)
return config['mqtt_host'], config['mqtt_port'], config['mqtt_user'], config['mqtt_password'], config['client_id']
except OSError:
return None, None, None, None, None
# 当前设备联网状态
class Internet:
def __init__(self, led_pin=2, max_retry=10):
self.state = False
self.wait = 2.0
self.led_pin = led_pin
self.max_retry = max_retry
def load_config(self):
try:
with open('config.json', 'r') as f:
config = ujson.load(f)
return config['wifi_ssid'], config['wifi_password']
except OSError:
return None, None
def connect_to_wifi(self, ssid, password):
wifi = network.WLAN(network.STA_IF)
wifi.active(True)
if not wifi.isconnected():
print("连接到WiFi网络...")
wifi.connect(ssid, password)
retry_count = 0
while not wifi.isconnected() and retry_count < self.max_retry:
utime.sleep(1)
retry_count += 1
if wifi.isconnected():
print("已成功连接到WiFi网络")
print("网络信息:", wifi.ifconfig())
self.state = True
return True
else:
print("WiFi连接失败")
self.state = False
return False
else:
print("已连接到WiFi网络")
self.state = True
return True
def setup_wifi(self):
ssid, password = self.load_config()
if ssid and password:
if self.connect_to_wifi(ssid, password):
return
# TODO 实现用手机配置网络
print("无法读取配置文件或配置文件不存在,请使用save_wifi_config来设置")
# 当前日期时间状态
class NTPDatetime:
def __init__(self):
self.datetime = None
self.ntp = None
self.wait = 36000
def sync_time(self):
try:
tm = utime.gmtime(ntptime.time())
RTC().datetime((tm[0], tm[1], tm[2], tm[6] +
1, tm[3] + TIME_ZONE, tm[4], tm[5], 0))
return True
except OSError as error:
print("Failed to get ntptime\n", error)
return False
# MQTT
class MqttClient:
def __init__(self, internet, plantCare):
self.mqtt = None
self.internet = internet
self.is_connected = False
self.plantCare = plantCare
host, port, user, password, client_id = load_mqtt_config()
print(host, port, user, client_id)
self.client_id = client_id
mqtt = MQTTClient(
client_id=client_id,
server=host,
port=port,
user=user,
password=password
)
self._mqtt = mqtt
# mqtt.set_callback = self.message
self.last_update_json = None
self.wait = 10
# 更新传感器状态到HomeAssistant
def update(self):
if self.mqtt:
# 构建传感器数据的字典
sensor_data = {
'temperature': self.plantCare.temperature,
'humidity': self.plantCare.humidity,
'soil_humidity': self.plantCare.soil_humidity,
'light_intensity': self.plantCare.light_intensity
}
# 将传感器数据转换为JSON字符串
json_data = ujson.dumps(sensor_data)
# 如果传感器未变化,则不更新mqtt
if self.last_update_json == json_data:
return
self.last_update_json = json_data
print(json_data)
# 发布MQTT消息
try:
self.mqtt.connect()
self.mqtt.publish(
f"homeassistant/sensor/{self.client_id}/state", json_data, retain=True)
self.mqtt.disconnect()
except OSError as error:
print("MQTT publish error:", error)
# 自动注册设备到HomeAssistant
def register_device(self):
if not self.mqtt:
return
# 设备自动发现配置信息
device_identifier = self.client_id
device_name = "Plant Care"
device_manufacturer = "Follow Me 2023Q3"
device_model = "Xiao ESP32C3"
# 传感器自动发现配置信息
temperature_sensor_name = "Temperature"
humidity_sensor_name = "Humidity"
soil_humidity_sensor_name = "Soid Humidity"
light_intensity_sensor_name = "Light Intensity"
# 构建设备自动发现配置信息的字典
device_config = {
"identifiers": [device_identifier],
"name": device_name,
"manufacturer": device_manufacturer,
"model": device_model,
"sw_version": "1.0",
"via_device": "mqtt_bridge",
"platform": "mqtt"
}
# 将设备自动发现配置信息转换为JSON字符串
device_config_json = ujson.dumps(device_config)
# 构建传感器自动发现配置信息的字典
temperature_sensor_config = {
"device_class": "temperature",
"state_topic": f"homeassistant/sensor/{device_identifier}/state",
"value_template": "{{ value_json.temperature}}",
"unit_of_measurement": "°C",
"unique_id": f"{device_identifier}_temperature",
"device": {
"identifiers": [
f"{device_identifier}"
],
"name": temperature_sensor_name
}
}
humidity_sensor_config = {
"device_class": "humidity",
"state_topic": f"homeassistant/sensor/{device_identifier}/state",
"value_template": "{{ value_json.humidity}}",
"unit_of_measurement": "%",
"unique_id": f"{device_identifier}_humidity",
"device": {
"identifiers": [
f"{device_identifier}"
],
"name": humidity_sensor_name
}
}
soil_intensity_sensor_config = {
"device_class": "humidity",
"state_topic": f"homeassistant/sensor/{device_identifier}/state",
"value_template": "{{ value_json.soil_humidity}}",
"unit_of_measurement": "%",
"unique_id": f"{device_identifier}_soil_humidity",
"device": {
"identifiers": [
f"{device_identifier}"
],
"name": soil_humidity_sensor_name
}
}
light_intensity_sensor_config = {
"device_class": "illuminance",
"state_topic": f"homeassistant/sensor/{device_identifier}/state",
"value_template": "{{ value_json.light_intensity}}",
"unit_of_measurement": "lx",
"unique_id": f"{device_identifier}_illuminance",
"device": {
"identifiers": [
f"{device_identifier}"
],
"name": light_intensity_sensor_name
}
}
# 将传感器自动发现配置信息转换为JSON字符串
temperature_sensor_config_json = ujson.dumps(temperature_sensor_config)
humidity_sensor_config_json = ujson.dumps(humidity_sensor_config)
soil_humidity_sensor_config_json = ujson.dumps(
soil_intensity_sensor_config)
light_intensity_sensor_config_json = ujson.dumps(
light_intensity_sensor_config)
self.mqtt.connect()
# 发布传感器自动发现配置信息到自动发现主题
self.mqtt.publish(
f"homeassistant/sensor/{device_identifier}/temperature/config", temperature_sensor_config_json)
self.mqtt.publish(
f"homeassistant/sensor/{device_identifier}/humidity/config", humidity_sensor_config_json)
self.mqtt.publish(
f"homeassistant/sensor/{device_identifier}/soil_humidity/config", soil_humidity_sensor_config_json)
self.mqtt.publish(
f"homeassistant/sensor/{device_identifier}/light_intensity/config", light_intensity_sensor_config_json)
self.mqtt.disconnect()
print(f"homeassistant/sensor/{device_identifier}/temperature/config")
print(temperature_sensor_config_json)
# 断开MQTT连接
self.mqtt.disconnect()
def message(self, client, topic, message):
print(f"New message on topic {topic}: {message}")
# 0-关,1-关,2-切换
if topic == self.state_topic:
if message == '0':
self.led = False
elif message == '1':
self.led = True
# 植物环境信息
class PlantCare:
def __init__(self, i2c):
self.i2c = i2c
self.temperature = 0.0
self.humidity = 0.0
self.soil_humidity = 0.0
self.light_intensity = 0
self.weather_text = None
self.weather_temperature = None
self.wait = 5
# 定义一个回调函数列表
self.callbacks = []
# 初始化设备
def init(self):
self.dht22 = dht.DHT22(Pin(DHT22_PIN))
self.adc = ADC(Pin(SOIL_AO_PIN))
self.adc.atten(ADC.ATTN_11DB)
# self.i2c = I2C(0,scl = Pin(SCL_PIN),sda = Pin(SDA_PIN),freq = 1_000_000)
buf = bytearray(1)
buf[0] = BH1750_CMD_H_RESOLUTION
self.i2c.writeto(BH1750_I2C_ADD, buf)
pass
# 更新传感器
def update_sensor(self):
try:
self.dht22.measure()
self.temperature = self.dht22.temperature() # 温度
self.humidity = self.dht22.humidity() # 湿度
print(f"{self.temperature}, {self.humidity}%")
except OSError as error:
print("Failed to get dht22\n", error)
adc_raw = self.adc.read()
print(f"adc_raw={adc_raw}")
self.soil_humidity = round(
100.0 * (adc_raw - 4096.0) / (1000.0 - 4095.0), 1)
print(f"{self.soil_humidity}%")
buf = self.i2c.readfrom(BH1750_I2C_ADD, 0x2)
self.light_intensity = buf[0] * 256 + buf[1]
print(f"{self.light_intensity} lux")
# 注册回调函数
def register_callback(self, callback):
# 检查回调函数的参数
if len(callback.__code__.co_varnames) != 2:
raise ValueError(
"Callback function must have exactly 2 parameters.")
self.callbacks.append(callback)
# 解除回调函数
def unregister_callback(self, callback):
if callback in self.callbacks:
self.callbacks.remove(callback)
# 调用所有注册的回调函数
def call_callbacks(self, *args, **kwargs):
for callback in self.callbacks:
# 检查回调函数的参数
if len(args) != 2:
raise ValueError(
"Callback function must have exactly 2 arguments.")
callback(*args, **kwargs)
class OLED:
def __init__(self, i2c):
# 初始化屏幕
self.oled_width = 128 # 屏宽
self.oled_height = 64 # 屏高
oled = ssd1306.SSD1306_I2C(self.oled_width, self.oled_height, i2c)
self.oled = oled
self.power = None
self.wait = 5
# 屏幕超时关闭定时器
self.timer = Timer(0)
self.poweron()
def poweron(self):
self.oled.poweron()
self.power = True
# 设置超时关闭
self.set_oled_timeout()
def poweroff(self, timer=None):
self.oled.poweroff()
self.power = False
def show_text(self, temperature, humidity, soil_humidity, light):
global oled_is_show
# 全局变量控制oled是否显示
if self.power:
oled = self.oled
# 设置字体大小
oled.fill(0)
# 绘制十字分隔线
oled.line(64, 0, 64, 63, 1)
oled.line(0, 32, 127, 32, 1)
# 在每个部分显示数值
oled.text(f'{temperature:.1f} C', 8, 10, 2)
oled.text(f'{humidity:.1f} %', 72, 10, 2)
oled.text(f'{soil_humidity:.1f} %', 8, 42, 2)
oled.text(f'{light} lx', 72, 42, 2)
oled.show()
# 屏幕超时关闭
def set_oled_timeout(self):
self.timer.init(period=60000, mode=Timer.PERIODIC,
callback=self.poweroff)
class ButtonControl:
def __init__(self, oled):
self.oled = oled
# 配置按钮中断
button = Pin(BTN_PIN, Pin.IN, Pin.PULL_UP)
button.irq(trigger=Pin.IRQ_FALLING | Pin.IRQ_RISING,
handler=self.button_interrupt)
def button_interrupt(self, button):
goled = self.oled
# 获取当前时间戳
current_time = utime.ticks_ms()
# 检测按钮状态
if button.value() == 0:
print(f'button press:current_time={current_time}')
# 等待按键防抖延时
utime.sleep_ms(50)
# 再次检测按钮状态,确保按键状态稳定
if button.value() == 0:
if goled.power:
goled.power = False
goled.poweroff()
else:
goled.power = True
goled.poweron()
# 连接wifi
async def wifi_connect(internet):
failure_count = 0
ssid, password = internet.load_config()
while True:
if internet.connect_to_wifi(ssid, password):
internet.wait = 60
else:
internet.wait = 2
await asyncio.sleep(internet.wait)
# 获取NTP时间
async def ntp_time(internet, ntp_datetime):
tz_offset = TIME_ZONE
while True:
if internet.state:
ntp_datetime.sync_time()
await asyncio.sleep(ntp_datetime.wait)
async def plant_care_control(plant_care):
plant_care.init()
while True:
plant_care.update_sensor()
await asyncio.sleep(plant_care.wait)
# MQTT消息
async def mqtt_connect(internet, mqtt_client):
while True:
if mqtt_client.mqtt:
mqtt_client.update()
elif internet.state:
print("Set up a MiniMQTT Client")
try:
mqtt_client._mqtt.connect()
mqtt_client._mqtt.disconnect()
mqtt_client.mqtt = mqtt_client._mqtt
mqtt_client.register_device()
except OSError as error:
print("Failed to set mqtt\n", error)
await asyncio.sleep(mqtt_client.wait)
async def pixels_led(internet, plant_care):
pixel_pin = Pin(WS2812_PIN)
pixel_num = 1
pixels = neopixel.NeoPixel(pixel_pin, pixel_num)
while True:
if not internet.state:
pixels[0] = (0, 0, 255) # 设为蓝色全亮
pixels.write()
await asyncio.sleep(internet.wait)
# 土壤湿度小于10%时亮红灯提醒
else:
if plant_care.soil_humidity < 0.1:
pixels[0] = (255, 0, 0) # 设为红色全亮
pixels.write()
else:
if pixels[0] != (0, 0, 0):
pixels[0] = (0, 0, 0)
pixels.write()
await asyncio.sleep(plant_care.wait)
# OLED屏幕显示
async def oled_show(oled, plant_care):
while True:
temperature = plant_care.temperature
humidity = plant_care.humidity
soil_humidity = plant_care.soil_humidity
light_intensity = plant_care.light_intensity
oled.show_text(temperature, humidity, soil_humidity, light_intensity)
await asyncio.sleep(oled.wait)
def main():
i2c = SoftI2C(scl=Pin(SCL_PIN), sda=Pin(SDA_PIN))
# 共享变量设置
internet = Internet()
ntp_datetime = NTPDatetime()
plant_care = PlantCare(i2c)
mqtt_client = MqttClient(internet, plant_care)
oled = OLED(i2c)
button_control = ButtonControl(oled)
# 设置全局变量,在回调函数中使用
global goled
goled = oled
# 协程函数定义
loop = asyncio.get_event_loop()
loop.create_task(wifi_connect(internet))
loop.create_task(ntp_time(internet, ntp_datetime))
loop.create_task(mqtt_connect(internet, mqtt_client))
loop.create_task(plant_care_control(plant_care))
loop.create_task(pixels_led(internet, plant_care))
loop.create_task(oled_show(oled, plant_care))
# 启动协程
loop.run_forever()
main()
2. 远程监控 ESP32S3-CircuitPython
import board
import os
import displayio
import wifi
import json
import socketpool
from digitalio import DigitalInOut, Direction, Pull
# 需要导入adafruit_bitmap_font库
from adafruit_bitmap_font import bitmap_font
# 需要导入adafruit_display_text库
from adafruit_display_text import bitmap_label
# 需要导入adafruit_debouncer库
from adafruit_debouncer import Button
# 需要导入adafruit_minimqtt库
import adafruit_minimqtt.adafruit_minimqtt as MQTT
# 需要导入asyncio、adafruit_ticks库
import asyncio
# 当前设备联网状态
class Internet:
def __init__(self, max_retry=10):
self.state = False
self.wait = 2.0
self.max_retry = max_retry
def connect_to_wifi(self):
ssid = os.getenv("CIRCUITPY_WIFI_SSID")
password = os.getenv("CIRCUITPY_WIFI_PASSWORD")
try:
wifi.radio.connect(ssid, password)
except Exception as e:
print(f"Wi-Fi 连接失败:{e}")
retry_count = 0
while not wifi.radio.connected and retry_count < self.max_retry:
# time.sleep(1)
print(f"Wi-Fi 连接失败+1")
retry_count += 1
if wifi.radio.connected:
print("已成功连接到WiFi网络")
print("网络信息:", wifi.radio.ipv4_address)
self.state = True
return True
else:
print("WiFi连接失败")
self.setup_wifi()
self.state = False
return False
def setup_wifi(self):
# TODO 实现用手机配置网络
print("无法读取配置文件或配置文件不存在,请使用save_wifi_config来设置")
def run(self):
# 如果WiFi连接断开,进行断网重连
if wifi.radio.connected:
# print(f"已连接到WiFi网络:{wifi.radio.ipv4_address}")
self.wait = 120
self.state = True
else:
self.connect_to_wifi()
# MQTT消息
class MqttClient:
def __init__(self, internet, plant_care, lcd):
self.mqtt = None
self.is_connected = False
self.wait = 1
self.wait_mqtt = 10
topic = os.getenv("MQTT_TOPIC")
self.state_topic = topic
pool = socketpool.SocketPool(wifi.radio)
mqtt = MQTT.MQTT(
broker=os.getenv("MQTT_HOST"),
port=os.getenv("MQTT_PORT"),
username=os.getenv("MQTT_USER"),
password=os.getenv("MQTT_PASSWORD"),
socket_pool=pool,
client_id=os.getenv("MQTT_CLIENT_ID")
)
mqtt.on_connect = self.on_connect
mqtt.on_disconnect = self.on_disconnect
mqtt.on_message = self.on_message
self.mqtt = mqtt
self.plant_care = plant_care
def connect(self):
try:
self.mqtt.connect()
except (Exception, RuntimeError) as e:
print("Failed to mqtt connect\n", e)
def update(self):
if self.is_connected:
try:
self.mqtt.loop()
except (Exception, RuntimeError) as e:
print("Failed to loop mqtt", e)
def on_connect(self, mqtt_cli, userdata, flags, rc):
print(f"Connected to MQTT")
self.is_connected = True
self.mqtt.subscribe(self.state_topic)
def on_disconnect(self, mqtt_cli, userdata, rc):
print(f"Disconnected from MQTT")
self.is_connected = False
def on_message(self, client, topic, message):
print(f"New message on topic {topic}: {message}")
# 解析JSON数据
data = json.loads(message)
# 提取Temperature等信息
self.plant_care.temperature = data["temperature"]
self.plant_care.humidity = data["humidity"]
self.plant_care.soil_humidity = data["soil_humidity"]
self.plant_care.light_intensity = data["light_intensity"]
# 植物环境信息
class PlantCare:
def __init__(self):
self.temperature = 0.0
self.humidity = 0.0
self.soil_humidity = 0.0
self.light_intensity = 0
self.weather_text = None
self.weather_temperature = None
# LCD显示
class LCD:
def __init__(self, plant_care):
self.index = 0
self.wait = 5
self.plant_care = plant_care
display = board.DISPLAY
# 中文字体文件放在font目录下
font_file = "font/wenquanyi_13px.pcf"
font = bitmap_font.load_font(font_file)
# terminalio.FONT
group = displayio.Group()
# 设置显示
name_label = bitmap_label.Label(font, color=0x00FF00, scale=2)
name_label.anchor_point = (0.5, 0.5)
name_label.anchored_position = (display.width // 2, 20)
name_label.text = ""
# 设置显示
data_label = bitmap_label.Label(font, scale=3)
data_label.anchor_point = (0.5, 0.5)
data_label.anchored_position = (
display.width // 2, display.height // 1.8)
data_label.text = ""
group.append(name_label)
group.append(data_label)
self.name_label = name_label
self.data_label = data_label
# 创建根group
main_group = displayio.Group()
main_group.append(group)
# 展示
display.root_group = main_group
def show(self):
current_screen = self.index
plant_care = self.plant_care
if current_screen == 0:
lcd_text = ("温度", f"{plant_care.temperature:.1f}°C")
elif current_screen == 1:
lcd_text = ("湿度", f"{plant_care.humidity:.1f} %")
elif current_screen == 2:
lcd_text = ("土壤", f"{plant_care.soil_humidity:.1f} %")
elif current_screen == 3:
lcd_text = ("光照", f"{plant_care.light_intensity} lx")
self.name_label.text, self.data_label.text = lcd_text
def show_next(self):
self.index = (self.index + 1) % 4
self.show()
async def wifi_connect(internet):
while True:
internet.run()
await asyncio.sleep(internet.wait)
async def mqtt_connect(internet, mqtt_client):
while True:
# 等待WiFi连接成功
if not internet.state:
print('wifi 未连接')
await asyncio.sleep(internet.wait)
continue
if mqtt_client.is_connected:
try:
print("mqtt update")
mqtt_client.update()
except (ValueError, RuntimeError) as e:
print("Failed to get data, retrying\n", e)
await asyncio.sleep(mqtt_client.wait)
elif internet.state:
try:
mqtt_client.connect()
except (ValueError, RuntimeError) as e:
print("Failed to mqtt connect\n", e)
await asyncio.sleep(mqtt_client.wait_mqtt)
else:
await asyncio.sleep(internet.wait)
# 屏幕显示
async def lcd_display(lcd, plant_care):
while True:
lcd.show_next()
await asyncio.sleep(lcd.wait)
# 按钮点击检测
async def monitor_buttons(lcd):
pin = DigitalInOut(board.BTN)
button = Button(pin)
while True:
button.update()
# if button.long_press:
# print("Long Press")
if button.short_count == 1:
print(f"Short Press Count = {button.short_count}")
lcd.show_next()
# elif button.short_count == 2:
# print(f"Short Press Count = {button.short_count}")
# elif button.short_count == 3:
# print(f"Short Press Count = {button.short_count}")
# elif button.short_count == 4:
# print(f"Short Press Count = {button.short_count}")
# elif button.short_count > 1:
# print(f"Short Press Count = {button.short_count}")
# if button.long_press and button.short_count == 1:
# print("That's a long double press !")
await asyncio.sleep(0.01)
async def main():
# 共享变量设置
internet = Internet()
plant_care = PlantCare()
lcd = LCD(plant_care)
mqtt_client = MqttClient(internet, plant_care, lcd)
# 协程函数定义
wifi_connect_task = asyncio.create_task(wifi_connect(internet))
mqtt_task = asyncio.create_task(mqtt_connect(internet, mqtt_client))
monitor_buttons_task = asyncio.create_task(monitor_buttons(lcd))
lcd_display_task = asyncio.create_task(
lcd_display(lcd, plant_care))
# 启动协程
await asyncio.gather(wifi_connect_task, mqtt_task, monitor_buttons_task, lcd_display_task)
asyncio.run(main())
参考文档
MicroPython libraries — MicroPython latest documentation
MicroPython for ESP32C3 | Seeed Studio Wiki
Digi-Key得捷电子技术专区 (eeworld.com.cn)
心得体会
心得体会:这次参加的FollowMe活动让我受益匪浅。通过学习大咖们的教程,我掌握了esp32s3开发板的基础知识和实际应用技巧,包括CircuitPython的刷写、屏幕显示和网络访问等操作。通过将所学知识整理成文档和操作视频,不仅加深了自己的理解,也能与他人分享。对于电子爱好者来说,这次活动的难度适宜,主办方组织得有条不紊,工作人员耐心指导,再次点赞表示感谢。这次经历让我意识到学习和分享的重要性,学习群里大家都很活跃。希望未来能继续参与类似活动,不断学习、分享、进步。同时也祝愿FollowMe活动越办越好,共同推动中国电子领域的创新发展!
三、项目代码下载
FollowMe第3期-植物养护监测器-MicroPython-XiaoESP32C3-嵌入式开发相关资料下载-EEWORLD下载中心
|