874|1

11

帖子

3

TA的资源

一粒金砂(中级)

楼主
 

【得捷电子Follow me第2期】WS2812B灯板控制及MQTT通讯实验 [复制链接]

 

大家好,我是一名业余电子DIY爱好者,这次活动的硬件是Adafruit ESP32-S3 TFT Feather,这块板子的flash有4M,PSRAM有2M,自带了一块1.14寸的IPS显示屏,还附带了一个锂电池充放管理电路,可以做一些可穿戴的开发。

 

必做任务1:控制屏幕显示中文

第一个任务我们先来尝试一下如何使用Circuitpython来显示中文字符。

在传统的嵌入式开发中,显示中文字符一直是个比较麻烦的事情,因为要先自行对需要显示的汉字进行统计,随后进行取模。如果显示的汉字比较多的话还可能出现占用内存过大的问题。但是,由于这块板子内存足够大,我们甚至可以导入一个完整的中文字体库,这样就不再需要事先统计好需要显示哪些汉字了。这在某些应用,比如网页浏览器,或是电子书等功能实现上非常有用。

 

我们先来看看代码实现的方式。Circuitpython固件中已经自带了屏幕的驱动,因此用起来非常方便,初始化代码仅需要三行:

import board
import displayio
from adafruit_bitmap_font import bitmap_font
from adafruit_display_text import label

display = board.DISPLAY
group = displayio.Group()
display.show(group)

 

重点来了,因为我们需要显示中文,所以要自己下载PCF字体文件并加载进代码中。

font = bitmap_font.load_font("/wenquanyi_13px.pcf")

 

接下来我们创建一个文字元素,并把它添加到屏幕中显示,所有的设置过程就完成了。在这里我们先预先定义一个常用颜色字典,方便使用。

color = {
        "black" : 0x000000,
        "white" : 0xFFFFFF,
        "red" : 0xFF0000,
        "green" : 0x00FF00,
        "blue" : 0x0000FF,
        "cyan" : 0x00FFFF,
        "magenta" : 0xFF00FF,
        "yellow" : 0xFFFF00,
        }

text_tg = label.Label(font, color=color["white"])
text_tg.y = 60
text_tg.x = 0
text_tg.background_color = None
text_tg.line_spacing = 1.0
text_tg.scale = 3
group.append(text_tg)

 

最后,只需要在代码中设定该元素的文字内容即可。当这个文字内容发生变化时,新的文字会同步出现在屏幕上。

text_tg.text = "你好,世界!"

必做任务2:网络功能使用

Circuitpython 连接网络也十分方便,只需要导入对应的WIFI库即可。连接上网络后,我们可以把获取到的IP显示在屏幕上,已验证是否成功连接到网络。

import wifi


wifi.radio.connect(ssid="SSID", password="PWD")
print("my IP addr:", wifi.radio.ipv4_address)
text_tg.text = str(wifi.radio.ipv4_address) # 利用任务一中初始化好的显示代码
while 1:
    True


必做任务3:控制WS2812B

这块板子上自带了一个WS2812B灯珠,我们可以先利用这个灯珠测试一下neopixel这个库的功能,测试过后再将其拓展到灯板上。

先导入相应的库,然后创建好灯珠对象:

import board
import neopixel

pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.04)

为了实现彩虹灯的变换效果,我们需要让RGB三个通道的数值来回扫描,在这里我们进行了一些简单的数学变换来实现这个效果,并把它封装成一个函数:

i = 0
res = 255
step = 10
def breathing(reverse = False):
    global pixels, i
    if reverse:
        j = 2*res - i
    else:
        j = i 
    pixel[0] = (abs((j+int(0))%(res*2)-res),abs((j+int(res*2/3))%(res*2)-res),abs((j+int(res*2*2/3))%(res*2)-res))
    i = (i + step)%(2*res)

while True:
	breathing()

我们就可以看到效果:

按键控制部分我放在了第四部分一起完成。

 

必做任务4:WS2812B效果控制

接下来我们将上面的代码拓展到灯板上,单色灯显示比较简单,因为每一个灯的颜色都是一样的,我们直接用fill方法就可以实现。但流水灯在这里会比较麻烦,因为每个灯的颜色都不一样,所以在这里使用了循环来实现。

def breathing_matrix(reverse = False):
    global pixels, i
    for k in range(64):
        j = (i + (k * step))%(2*res)
        if reverse:
            o = 63-k
        else:
            o = k
        if int(o/8)%2:
            l = o
        else:
            l = (7-(o%8)) + (int(o/8)*8)
        # l = o
        pixels[l] = (abs((j+int(0))%(res*2)-res),abs((j+int(res*2/3))%(res*2)-res),abs((j+int(res*2*2/3))%(res*2)-res))
    i = (i + step)%(2*res)
    pixels.show()

 

至此,功能模块部分都完成了,我们需要把上面做过的屏幕显示和WB2812B控制部分放到一起加入主循环。别忘了在这部分添加按键切换。

list1 = [
        color["black"],
        color["white"],
        color["red"],
        color["green"],
        color["blue"],
        color["cyan"],
        color["magenta"],
        color["yellow"],
        ]

list2 = [
        "关闭",
        "白色",
        "红色",
        "绿色",
        "蓝色",
        "青色",
        "品红",
        "黄色",
        ]


idx = 8
def loop():
    global idx
    if not btn.value:
        while not btn.value:
            pass
        idx = (idx + 1) % (len(list1) + 2)
    if idx < len(list1):
        pixel.fill(list1[idx])
        pixels.fill(list1[idx])
        pixels.show()
        text_tg.text = list2[idx]
    else:
        if idx == len(list1):
            breathing()
            breathing_matrix()
            
            text_tg.text = "彩虹正"
        else:
            breathing(True)
            breathing_matrix(True)
            text_tg.text = "彩虹反"

while True:
    gc.collect()
    loop()
        

 

最终显示效果如下:


可选任务5:通过网络控制WS2812B.

我在采购物料的时候多带了两块RPI-PICO-W,刚好在这次的项目上可以尝试下MQTT多对一通讯。我打算让两块RPI-PICO-W都作为遥控发射端,两边同时控制ESP32的WS2812B灯板,看是否可行。

 

MQTT服务端我使用的是本机架设的broker,当然如果处于实验的目的最简单还是直接用公共broker,无需任何配置直接就可用。这块网上教程很多,且与本项目相关性不大,就不再赘述了。我们先编写一下RPI-PICO-W这边的代码,需要注意的是,这里我把WIFI连接写在了SETTINGS中,并没有在代码中体现:

import wifi
import socketpool
import adafruit_minimqtt.adafruit_minimqtt as MQTT


mqtt_server = '192.168.50.100'
port = 1883
user = ''
pwd = ''
client_id = 'rpi_2'
services = 'homeassistant'
topic_state = services + "/binary_sensor/" + 'rpi_1' + "/state"
last_message = 0
message_interval = 1

def connect(mqtt_client, userdata, flags, rc):
    # This function will be called when the mqtt_client is connected
    # successfully to the broker.
    print("Connected to MQTT Broker!")
    print("Flags: {0}\n RC: {1}".format(flags, rc))
    mqtt_client.subscribe(topic_state)


def disconnect(mqtt_client, userdata, rc):
    # This method is called when the mqtt_client disconnects
    # from the broker.
    print("Disconnected from MQTT Broker!")


def subscribe(mqtt_client, userdata, topic, granted_qos):
    # This method is called when the mqtt_client subscribes to a new feed.
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))


def unsubscribe(mqtt_client, userdata, topic, pid):
    # This method is called when the mqtt_client unsubscribes from a feed.
    print("Unsubscribed from {0} with PID {1}".format(topic, pid))


def publish(mqtt_client, userdata, topic, pid):
    # This method is called when the mqtt_client publishes data to a feed.
    print("Published to {0} with PID {1}".format(topic, pid))


def message(client, topic, message):
    print("New message on topic {0}: {1}".format(topic, message))
    

# Set up a MiniMQTT Client
pool = socketpool.SocketPool(wifi.radio)
mqtt_client = MQTT.MQTT(
    broker=mqtt_server,
    port=port,
    username=user,
    password=pwd,
    socket_pool=pool,
    client_id=client_id
)

# Connect callback handlers to mqtt_client
mqtt_client.on_connect = connect
mqtt_client.on_disconnect = disconnect
mqtt_client.on_subscribe = subscribe
mqtt_client.on_unsubscribe = unsubscribe
mqtt_client.on_publish = publish
mqtt_client.on_message = message

mqtt_client.connect()

while True:
    # Poll the message queue
    mqtt_client.loop()
    mqtt_client.publish(topic_state, input("\nCMD:> "))
    

 

这是其中一个的代码,另一个RPI-PICO-W只需要改一下client_id = 'rpi_1'即可,这样可以避免设备出现重名。

 

接下来我们再改一下ESP32部分的代码,首先先增加有关MQTT的配置部分,并在每次订阅的topic收到消息后,将WS2812B切换为下一个状态。:

mqtt_server = '192.168.50.100'
port = 1883
user = ''
pwd = ''
client_id = 'esp32'
services = 'homeassistant'
topic_state = services + "/binary_sensor/" + 'rpi_1' + "/state"
last_message = 0
message_interval = 1
def connect(mqtt_client, userdata, flags, rc):
    # This function will be called when the mqtt_client is connected
    # successfully to the broker.
    print("Connected to MQTT Broker!")
    print("Flags: {0}\n RC: {1}".format(flags, rc))
    mqtt_client.subscribe(topic_state)
def subscribe(mqtt_client, userdata, topic, granted_qos):
    # This method is called when the mqtt_client subscribes to a new feed.
    print("Subscribed to {0} with QOS level {1}".format(topic, granted_qos))
def message(client, topic, message):
    global idx
    print("New message on topic {0}: {1}".format(topic, message))
    idx = (idx + 1) % (len(list1) + 2)
pool = socketpool.SocketPool(wifi.radio)
mqtt_client = MQTT.MQTT(
    broker=mqtt_server,
    port=port,
    username=user,
    password=pwd,
    socket_pool=pool,
    client_id=client_id
)
# Connect callback handlers to mqtt_client
mqtt_client.on_connect = connect
mqtt_client.on_subscribe = subscribe
mqtt_client.on_message = message

mqtt_client.connect()

 

配置完成后我们还需要在loop中添加MQTT轮询命令。由于该命令比较耗时,如果在每一次loop都执行的话就会让流水灯显示非常的慢,几乎无法实现流水效果。因此我们增加一个计数器,每30次loop查询一下MQTT服务:

idx = 8
times = 0
def loop():
    global idx, times
    times = (times + 1)%30
    if not times:
        mqtt_client.loop()
    if not btn.value:
        while not btn.value:
            pass
        idx = (idx + 1) % (len(list1) + 2)
    if idx < len(list1):
        pixel.fill(list1[idx])
        pixels.fill(list1[idx])
        pixels.show()
        text_tg.text = list2[idx]
    else:
        if idx == len(list1):
            breathing()
            breathing_matrix()
            
            text_tg.text = "彩虹正"
        else:
            breathing(True)
            breathing_matrix(True)
            text_tg.text = "彩虹反"

 

至此全部代码完成,我们看一下效果,可以看到,我开了两个串口终端来分别控制两个RPI-PICO-W,我在任意一个串口终端中打入命令,都可以正确的遥控到ESP32去改变WS2812B的状态。

FOLLOWME2.rar (387.67 KB, 下载次数: 1)

https://download.eeworld.com.cn/detail/eew_AG2DvH/629301

 

心得体会

经过这次EEWORLD与得捷电子共同举办的followme活动,我已经可以完全上手Circuitpython的使用,用它完成一些日常的小项目。希望未来两期的活动可以有更多有意思的硬件让大家体验,希望活动越办越好。

 

最新回复

显示中文通常是比较麻烦的事,楼主的是个好方法,方便   详情 回复 发表于 2023-10-4 10:09
点赞 关注
 
 

回复
举报

6828

帖子

0

TA的资源

五彩晶圆(高级)

沙发
 

显示中文通常是比较麻烦的事,楼主的是个好方法,方便

 
 
 

回复
您需要登录后才可以回帖 登录 | 注册

随便看看
查找数据手册?

EEWorld Datasheet 技术支持

相关文章 更多>>
关闭
站长推荐上一条 1/7 下一条

 
EEWorld订阅号

 
EEWorld服务号

 
汽车开发圈

About Us 关于我们 客户服务 联系方式 器件索引 网站地图 最新更新 手机版

站点相关: 国产芯 安防电子 汽车电子 手机便携 工业控制 家用电子 医疗电子 测试测量 网络通信 物联网

北京市海淀区中关村大街18号B座15层1530室 电话:(010)82350740 邮编:100190

电子工程世界版权所有 京B2-20211791 京ICP备10001474号-1 电信业务审批[2006]字第258号函 京公网安备 11010802033920号 Copyright © 2005-2025 EEWORLD.com.cn, Inc. All rights reserved
快速回复 返回顶部 返回列表