【得捷电子Follow me第2期】WS2812B灯板控制及MQTT通讯实验
[复制链接]
大家好,我是一名业余电子DIY爱好者,这次活动的硬件是Adafruit ESP32-S3 TFT Feather,这块板子的flash有4M,PSRAM有2M,自带了一块1.14寸的IPS显示屏,还附带了一个锂电池充放管理电路,可以做一些可穿戴的开发。
必做任务1:控制屏幕显示中文
第一个任务我们先来尝试一下如何使用Circuitpython来显示中文字符。
在传统的嵌入式开发中,显示中文字符一直是个比较麻烦的事情,因为要先自行对需要显示的汉字进行统计,随后进行取模。如果显示的汉字比较多的话还可能出现占用内存过大的问题。但是,由于这块板子内存足够大,我们甚至可以导入一个完整的中文字体库,这样就不再需要事先统计好需要显示哪些汉字了。这在某些应用,比如网页浏览器,或是电子书等功能实现上非常有用。
我们先来看看代码实现的方式。Circuitpython固件中已经自带了屏幕的驱动,因此用起来非常方便,初始化代码仅需要三行:
import board
import displayio
from adafruit_bitmap_font import bitmap_font
from adafruit_display_text import label
display = board.DISPLAY
group = displayio.Group()
display.show(group)
重点来了,因为我们需要显示中文,所以要自己下载PCF字体文件并加载进代码中。
font = bitmap_font.load_font("/wenquanyi_13px.pcf")
接下来我们创建一个文字元素,并把它添加到屏幕中显示,所有的设置过程就完成了。在这里我们先预先定义一个常用颜色字典,方便使用。
color = {
"black" : 0x000000,
"white" : 0xFFFFFF,
"red" : 0xFF0000,
"green" : 0x00FF00,
"blue" : 0x0000FF,
"cyan" : 0x00FFFF,
"magenta" : 0xFF00FF,
"yellow" : 0xFFFF00,
}
text_tg = label.Label(font, color=color["white"])
text_tg.y = 60
text_tg.x = 0
text_tg.background_color = None
text_tg.line_spacing = 1.0
text_tg.scale = 3
group.append(text_tg)
最后,只需要在代码中设定该元素的文字内容即可。当这个文字内容发生变化时,新的文字会同步出现在屏幕上。
text_tg.text = "你好,世界!"
必做任务2:网络功能使用
Circuitpython 连接网络也十分方便,只需要导入对应的WIFI库即可。连接上网络后,我们可以把获取到的IP显示在屏幕上,已验证是否成功连接到网络。
import wifi
wifi.radio.connect(ssid="SSID", password="PWD")
print("my IP addr:", wifi.radio.ipv4_address)
text_tg.text = str(wifi.radio.ipv4_address) # 利用任务一中初始化好的显示代码
while 1:
True
必做任务3:控制WS2812B
这块板子上自带了一个WS2812B灯珠,我们可以先利用这个灯珠测试一下neopixel这个库的功能,测试过后再将其拓展到灯板上。
先导入相应的库,然后创建好灯珠对象:
import board
import neopixel
pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.04)
为了实现彩虹灯的变换效果,我们需要让RGB三个通道的数值来回扫描,在这里我们进行了一些简单的数学变换来实现这个效果,并把它封装成一个函数:
i = 0
res = 255
step = 10
def breathing(reverse = False):
global pixels, i
if reverse:
j = 2*res - i
else:
j = i
pixel[0] = (abs((j+int(0))%(res*2)-res),abs((j+int(res*2/3))%(res*2)-res),abs((j+int(res*2*2/3))%(res*2)-res))
i = (i + step)%(2*res)
while True:
breathing()
我们就可以看到效果:
按键控制部分我放在了第四部分一起完成。
必做任务4:WS2812B效果控制
接下来我们将上面的代码拓展到灯板上,单色灯显示比较简单,因为每一个灯的颜色都是一样的,我们直接用fill方法就可以实现。但流水灯在这里会比较麻烦,因为每个灯的颜色都不一样,所以在这里使用了循环来实现。
def breathing_matrix(reverse = False):
global pixels, i
for k in range(64):
j = (i + (k * step))%(2*res)
if reverse:
o = 63-k
else:
o = k
if int(o/8)%2:
l = o
else:
l = (7-(o%8)) + (int(o/8)*8)
# l = o
pixels[l] = (abs((j+int(0))%(res*2)-res),abs((j+int(res*2/3))%(res*2)-res),abs((j+int(res*2*2/3))%(res*2)-res))
i = (i + step)%(2*res)
pixels.show()
至此,功能模块部分都完成了,我们需要把上面做过的屏幕显示和WB2812B控制部分放到一起加入主循环。别忘了在这部分添加按键切换。
list1 = [
color["black"],
color["white"],
color["red"],
color["green"],
color["blue"],
color["cyan"],
color["magenta"],
color["yellow"],
]
list2 = [
"关闭",
"白色",
"红色",
"绿色",
"蓝色",
"青色",
"品红",
"黄色",
]
idx = 8
def loop():
global idx
if not btn.value:
while not btn.value:
pass
idx = (idx + 1) % (len(list1) + 2)
if idx < len(list1):
pixel.fill(list1[idx])
pixels.fill(list1[idx])
pixels.show()
text_tg.text = list2[idx]
else:
if idx == len(list1):
breathing()
breathing_matrix()
text_tg.text = "彩虹正"
else:
breathing(True)
breathing_matrix(True)
text_tg.text = "彩虹反"
while True:
gc.collect()
loop()
最终显示效果如下:
可选任务5:通过网络控制WS2812B.
我在采购物料的时候多带了两块RPI-PICO-W,刚好在这次的项目上可以尝试下MQTT多对一通讯。我打算让两块RPI-PICO-W都作为遥控发射端,两边同时控制ESP32的WS2812B灯板,看是否可行。
MQTT服务端我使用的是本机架设的broker,当然如果处于实验的目的最简单还是直接用公共broker,无需任何配置直接就可用。这块网上教程很多,且与本项目相关性不大,就不再赘述了。我们先编写一下RPI-PICO-W这边的代码,需要注意的是,这里我把WIFI连接写在了SETTINGS中,并没有在代码中体现:
import wifi
import socketpool
import adafruit_minimqtt.adafruit_minimqtt as MQTT
mqtt_server = '192.168.50.100'
port = 1883
user = ''
pwd = ''
client_id = 'rpi_2'
services = 'homeassistant'
topic_state = services + "/binary_sensor/" + 'rpi_1' + "/state"
last_message = 0
message_interval = 1
def connect(mqtt_client, userdata, flags, rc):
# This function will be called when the mqtt_client is connected
# successfully to the broker.
print("Connected to MQTT Broker!")
print("Flags: {0}\n RC: {1}".format(flags, rc))
mqtt_client.subscribe(topic_state)
def disconnect(mqtt_client, userdata, rc):
# This method is called when the mqtt_client disconnects
# from the broker.
print("Disconnected from MQTT Broker!")
def subscribe(mqtt_client, userdata, topic, granted_qos):
# This method is called when the mqtt_client subscribes to a new feed.
print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
def unsubscribe(mqtt_client, userdata, topic, pid):
# This method is called when the mqtt_client unsubscribes from a feed.
print("Unsubscribed from {0} with PID {1}".format(topic, pid))
def publish(mqtt_client, userdata, topic, pid):
# This method is called when the mqtt_client publishes data to a feed.
print("Published to {0} with PID {1}".format(topic, pid))
def message(client, topic, message):
print("New message on topic {0}: {1}".format(topic, message))
# Set up a MiniMQTT Client
pool = socketpool.SocketPool(wifi.radio)
mqtt_client = MQTT.MQTT(
broker=mqtt_server,
port=port,
username=user,
password=pwd,
socket_pool=pool,
client_id=client_id
)
# Connect callback handlers to mqtt_client
mqtt_client.on_connect = connect
mqtt_client.on_disconnect = disconnect
mqtt_client.on_subscribe = subscribe
mqtt_client.on_unsubscribe = unsubscribe
mqtt_client.on_publish = publish
mqtt_client.on_message = message
mqtt_client.connect()
while True:
# Poll the message queue
mqtt_client.loop()
mqtt_client.publish(topic_state, input("\nCMD:> "))
这是其中一个的代码,另一个RPI-PICO-W只需要改一下client_id = 'rpi_1'即可,这样可以避免设备出现重名。
接下来我们再改一下ESP32部分的代码,首先先增加有关MQTT的配置部分,并在每次订阅的topic收到消息后,将WS2812B切换为下一个状态。:
mqtt_server = '192.168.50.100'
port = 1883
user = ''
pwd = ''
client_id = 'esp32'
services = 'homeassistant'
topic_state = services + "/binary_sensor/" + 'rpi_1' + "/state"
last_message = 0
message_interval = 1
def connect(mqtt_client, userdata, flags, rc):
# This function will be called when the mqtt_client is connected
# successfully to the broker.
print("Connected to MQTT Broker!")
print("Flags: {0}\n RC: {1}".format(flags, rc))
mqtt_client.subscribe(topic_state)
def subscribe(mqtt_client, userdata, topic, granted_qos):
# This method is called when the mqtt_client subscribes to a new feed.
print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
def message(client, topic, message):
global idx
print("New message on topic {0}: {1}".format(topic, message))
idx = (idx + 1) % (len(list1) + 2)
pool = socketpool.SocketPool(wifi.radio)
mqtt_client = MQTT.MQTT(
broker=mqtt_server,
port=port,
username=user,
password=pwd,
socket_pool=pool,
client_id=client_id
)
# Connect callback handlers to mqtt_client
mqtt_client.on_connect = connect
mqtt_client.on_subscribe = subscribe
mqtt_client.on_message = message
mqtt_client.connect()
配置完成后我们还需要在loop中添加MQTT轮询命令。由于该命令比较耗时,如果在每一次loop都执行的话就会让流水灯显示非常的慢,几乎无法实现流水效果。因此我们增加一个计数器,每30次loop查询一下MQTT服务:
idx = 8
times = 0
def loop():
global idx, times
times = (times + 1)%30
if not times:
mqtt_client.loop()
if not btn.value:
while not btn.value:
pass
idx = (idx + 1) % (len(list1) + 2)
if idx < len(list1):
pixel.fill(list1[idx])
pixels.fill(list1[idx])
pixels.show()
text_tg.text = list2[idx]
else:
if idx == len(list1):
breathing()
breathing_matrix()
text_tg.text = "彩虹正"
else:
breathing(True)
breathing_matrix(True)
text_tg.text = "彩虹反"
至此全部代码完成,我们看一下效果,可以看到,我开了两个串口终端来分别控制两个RPI-PICO-W,我在任意一个串口终端中打入命令,都可以正确的遥控到ESP32去改变WS2812B的状态。
https://download.eeworld.com.cn/detail/eew_AG2DvH/629301
心得体会
经过这次EEWORLD与得捷电子共同举办的followme活动,我已经可以完全上手Circuitpython的使用,用它完成一些日常的小项目。希望未来两期的活动可以有更多有意思的硬件让大家体验,希望活动越办越好。
|