【得捷电子Follow me第2期】任务5:通过网络控制WS2812B
[复制链接]
本帖最后由 怀66 于 2023-9-10 11:41 编辑
任务5:结合123,在手机上通过网络控制板载Neopixel LED的显示和颜色切换,屏幕同步显示状态
这里我找到了三个解决方案:
①参考官方的mqtt服务器:Adafruit IO:发送和接收数据 |阿达水果 ESP32-S3 TFT 羽毛 |阿达果学习系统,最重的是注册adafruit io账号并拿到key,链接里有详细介绍一步一步来就行。
②就是抄的大佬作业了,要写一个网页,我不会,所以就恬不知耻的抄了大老的啦,链接:【得捷电子Follow me第2期】任务5:通过网络控制WS2812B - DigiKey得捷电子技术专区 - 电子工程世界-论坛 (eeworld.com.cn),【得捷电子Follow me第2期】CircuitPython入门到完成任务1-5详细教程 - DigiKey得捷电子技术专区 - 电子工程世界-论坛 (eeworld.com.cn)
③就是我自己原来写的tcp实验,服务端发rgb数据给esp32,esp32处理一下信息就好了,稍微加一点代码就可以了。tcp实验:【得捷电子Follow me第2期】任务2:网络功能使用 - DigiKey得捷电子技术专区 - 电子工程世界-论坛 (eeworld.com.cn)。
上效果:
播放器加载失败: 未检测到Flash Player,请到 安装
202309101127
①和②代码分别是参考官方和大佬的,我自己也都做了修改。③就是自己写的,所以比较简陋。
代码①:
- import time
- import ssl
- import os
- from random import randint
- import microcontroller
- import socketpool
- import wifi
- import board
- import neopixel
- import adafruit_minimqtt.adafruit_minimqtt as MQTT
- from adafruit_io.adafruit_io import IO_MQTT
-
- import displayio
- from adafruit_display_text import label
- from adafruit_st7789 import ST7789
- from adafruit_bitmap_font import bitmap_font
-
-
-
- BORDER = 20
- FONTSCALE = 2
- BACKGROUND_COLOR = 0xffffff
- TEXT_COLOR = 0x000000
- font_file = "wenquanyi_13px.pcf"
- font = bitmap_font.load_font(font_file)
-
-
- displayio.release_displays()
-
- spi = board.SPI()
- tft_cs = board.TFT_CS
- tft_dc = board.TFT_DC
-
- display_bus = displayio.FourWire(spi, command=tft_dc, chip_select=tft_cs)
- display = ST7789(
- display_bus, rotation=270, width=240, height=135, rowstart=40, colstart=53
- )
-
-
- splash = displayio.Group()
- display.show(splash)
-
- color_bitmap = displayio.Bitmap(display.width, display.height, 1)
- color_palette = displayio.Palette(1)
- color_palette[0] = BACKGROUND_COLOR
-
- bg_sprite = displayio.TileGrid(color_bitmap, pixel_shader=color_palette, x=0, y=0)
- splash.append(bg_sprite)
-
-
- text = "这里显示颜色"
- text_area = label.Label(font, text=text, color=TEXT_COLOR)
- text_width = text_area.bounding_box[2] * FONTSCALE
- text_group = displayio.Group(
- scale=FONTSCALE,
- x=display.width // 2 - text_width // 2,
- y=display.height // 2,
- )
- text_group.append(text_area)
- splash.append(text_group)
-
-
-
- try:
- print("Connecting to %s" % os.getenv("WIFI_SSID"))
- wifi.radio.connect(os.getenv("WIFI_SSID"), os.getenv("WIFI_PASSWORD"))
- print("Connected to %s!" % os.getenv("WIFI_SSID"))
-
- except Exception as e:
- print("Failed to connect to WiFi. Error:", e, "\nBoard will hard reset in 30 seconds.")
- time.sleep(30)
- microcontroller.reset()
-
-
- pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
-
-
-
- def connected(client):
- print("Connected to Adafruit IO! Listening for NeoPixel changes...")
-
- client.subscribe("neopixel")
-
-
- def message(client, feed_id, payload):
- print("Feed {0} received new value: {1}".format(feed_id, payload))
- if feed_id == "neopixel":
- pixel.fill(int(payload[1:], 16))
- color_palette[0] = int(payload[1:], 16)
-
-
-
-
- pool = socketpool.SocketPool(wifi.radio)
-
-
- mqtt_client = MQTT.MQTT(
- broker="io.adafruit.com",
- username=os.getenv("AIO_USERNAME"),
- password=os.getenv("AIO_KEY"),
- socket_pool=pool,
- ssl_context=ssl.create_default_context(),
- )
-
-
- io = IO_MQTT(mqtt_client)
-
-
- io.on_connect = connected
- io.on_message = message
-
- timestamp = 0
- while True:
- try:
-
- if not io.is_connected:
-
- print("Connecting to Adafruit IO...")
- io.connect()
-
-
- io.loop()
-
- if (time.monotonic() - timestamp) >= 10:
- random_number = "{}".format(randint(0, 255))
- print("Current 'random' number: {}".format(random_number))
- io.publish("random", random_number)
- timestamp = time.monotonic()
-
-
-
- except Exception as e:
- print("Failed to get or send data, or connect. Error:", e,
- "\nBoard will hard reset in 30 seconds.")
- time.sleep(30)
- microcontroller.reset()
-
-
-
代码②:
- import board
- import digitalio
- import terminalio
- import busio
- import neopixel
- import rtc
- import time
- import os
-
- import wifi
- import socketpool
- import ssl
- import adafruit_requests
- from adafruit_httpserver import Server, Request, Response, Websocket, GET
-
- import displayio
- from adafruit_display_text import label
- from adafruit_st7789 import ST7789
- from adafruit_bitmap_font import bitmap_font
-
-
-
- BORDER = 20
- FONTSCALE = 2
- BACKGROUND_COLOR = 0xffffff
- TEXT_COLOR = 0x000000
- font_file = "wenquanyi_13px.pcf"
- font = bitmap_font.load_font(font_file)
-
-
- displayio.release_displays()
-
- spi = board.SPI()
- tft_cs = board.TFT_CS
- tft_dc = board.TFT_DC
-
- display_bus = displayio.FourWire(spi, command=tft_dc, chip_select=tft_cs)
- display = ST7789(
- display_bus, rotation=270, width=240, height=135, rowstart=40, colstart=53
- )
-
-
- splash = displayio.Group()
- display.show(splash)
-
- color_bitmap = displayio.Bitmap(display.width, display.height, 1)
- color_palette = displayio.Palette(1)
- color_palette[0] = BACKGROUND_COLOR
-
- bg_sprite = displayio.TileGrid(color_bitmap, pixel_shader=color_palette, x=0, y=0)
- splash.append(bg_sprite)
-
-
- text = "开始初始化中"
- text_area = label.Label(font, text=text, color=TEXT_COLOR)
- text_width = text_area.bounding_box[2] * FONTSCALE
- text_group = displayio.Group(
- scale=FONTSCALE,
- x=display.width // 2 - text_width // 2,
- y=display.height // 2,
- )
- text_group.append(text_area)
- splash.append(text_group)
- time.sleep(2)
-
-
-
-
- text_area.text = '尝试连接WIFI'
- time.sleep(2)
- try:
- wifi.radio.connect(os.getenv("WIFI_SSID"), os.getenv("WIFI_PASSWORD"))
- print(f"My IP address: {wifi.radio.ipv4_address}")
- text_area.text = '连接WIFI成功'
- time.sleep(2)
- except:
- text_area.text = '连接WIFI失败'
- time.sleep(2)
- text_area.text = '尝试开启热点'
- time.sleep(2)
- if(wifi.radio.connected == False):
- wifi.radio.start_ap(ssid = 'Esp32控制小灯', password = '')
- print("未连接WIFI开启热点模式,WIFI名称为'Esp32控制小灯'")
- text_area.text = '开启热点成功'
- time.sleep(2)
- Wifi_Mode = 'STA' if(wifi.radio.connected == True) else 'AP'
- Wifi_ip = wifi.radio.ipv4_address if(wifi.radio.connected == True) else wifi.radio.ipv4_address_ap
-
-
-
-
- pool = socketpool.SocketPool(wifi.radio)
- server = Server(pool, debug=True)
-
- websocket: Websocket = None
- next_message_time = time.monotonic()
-
- HTML_TEMPLATE = """
- <html lang="en">
- <head>
- <!--注释:强制html使用utf-8编码-->
- <meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
- <title>Web控制中心</title>
- </head>
- <body>
- <p>ESP32S3: <strong>-</strong></p>
- <p>RGB 颜色: <input type="color"></p>
- <script>
- const Button_count = document.querySelector('strong');
- const colorPicker = document.querySelector('input[type="color"]');
-
- let ws = new WebSocket('ws://' + location.host + '/connect-websocket');
-
- ws.onopen = () => console.log('WebSocket connection opened');
- ws.onclose = () => console.log('WebSocket connection closed');
- ws.onmessage = event => Button_count.textContent = event.data;
- ws.onerror = error => Button_count.textContent = error;
-
- colorPicker.oninput = debounce(() => ws.send(colorPicker.value), 200);
- <!--注释:创建一个延时函数,消除RGB调整抖动-->
- function debounce(callback, delay = 1000) {
- let timeout
- return (...args) => {
- clearTimeout(timeout)
- timeout = setTimeout(() => {
- callback(...args)
- }, delay)
- }
- }
- </script>
- </body>
- </html>
- """
-
- @server.route("/client", GET)
- def client(request: Request):
- return Response(request, HTML_TEMPLATE, content_type="text/html")
-
- @server.route("/connect-websocket", GET)
- def connect_client(request: Request):
- global websocket
- if websocket is not None:
- print("websocket.close")
- websocket.close()
- websocket = Websocket(request)
- return websocket
-
-
-
- if(wifi.radio.connected == True):
- server.start(str(wifi.radio.ipv4_address), port = 1080)
- print('http://'+str(wifi.radio.ipv4_address)+':1080'+'/client')
- else:
- server.start(str(wifi.radio.ipv4_address_ap), port = 1080)
- print('http://'+str(wifi.radio.ipv4_address_ap)+':1080'+'/client')
-
-
- pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
- text_area.text = '这里显示颜色'
-
-
- time.sleep(2)
-
- while True:
-
- server.poll()
-
- if websocket is not None:
- if (data := websocket.receive(True)) is not None:
- RGBr, RGBg, RGBb = int(data[1:3], 16), int(data[3:5], 16), int(data[5:7], 16)
- pixel.fill((RGBr, RGBg, RGBb))
- color_palette[0] = (RGBr, RGBg, RGBb)
-
-
- if websocket is not None and next_message_time < time.monotonic():
- time.sleep(0.2)
- if websocket is not None:
- websocket.send_message(str('666'))
- next_message_time = time.monotonic() + 1
-
-
-
代码三:
电脑服务端:
-
-
-
-
- import socket
- import sys
- import time
-
-
- serversocket = socket.socket(
- socket.AF_INET, socket.SOCK_STREAM)
-
-
- host = socket.gethostname()
-
- port = 9999
-
-
- serversocket.bind(('192.168.1.103', port))
-
-
- serversocket.listen(5)
-
- print('\n'+'{:-^30}'.format('等待客户连接'))
- clientsocket,addr = serversocket.accept()
- print("连接地址: %s" % str(addr))
- print(clientsocket.recv(1024))
- msg='HELLO WORLD!\r\n'
-
-
- while True:
-
-
- print('{:*^20}'.format('请输入信息'))
- msg = input()
- clientsocket.send(msg.encode('utf-8'))
- time.sleep(0.5)
- msg_back = clientsocket.recv(1024)
-
-
- print(msg_back)
- clientsocket.close()
ESP32:
- import wifi, os, time
- import neopixel, board
- import displayio
- from adafruit_display_text import label
- from adafruit_st7789 import ST7789
- from adafruit_bitmap_font import bitmap_font
-
- pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
-
-
-
-
-
-
- BORDER = 20
- FONTSCALE = 2
- BACKGROUND_COLOR = 0xffffff
- TEXT_COLOR = 0x000000
- font_file = "wenquanyi_13px.pcf"
- font = bitmap_font.load_font(font_file)
-
-
- displayio.release_displays()
-
- spi = board.SPI()
- tft_cs = board.TFT_CS
- tft_dc = board.TFT_DC
-
- display_bus = displayio.FourWire(spi, command=tft_dc, chip_select=tft_cs)
- display = ST7789(
- display_bus, rotation=270, width=240, height=135, rowstart=40, colstart=53
- )
-
-
- splash = displayio.Group()
- display.show(splash)
-
- color_bitmap = displayio.Bitmap(display.width, display.height, 1)
- color_palette = displayio.Palette(1)
- color_palette[0] = BACKGROUND_COLOR
-
- bg_sprite = displayio.TileGrid(color_bitmap, pixel_shader=color_palette, x=0, y=0)
- splash.append(bg_sprite)
-
-
- text = "这里显示颜色"
- text_area = label.Label(font, text=text, color=TEXT_COLOR)
- text_width = text_area.bounding_box[2] * FONTSCALE
- text_group = displayio.Group(
- scale=FONTSCALE,
- x=display.width // 2 - text_width // 2,
- y=display.height // 2,
- )
- text_group.append(text_area)
- splash.append(text_group)
-
-
- while True:
- try:
- wifi.radio.connect(os.getenv("WIFI_SSID"), os.getenv("WIFI_PASSWORD"))
- break
- except:
- print('正在尝试连接WIFI')
- print(f"My IP address: {wifi.radio.ipv4_address}")
-
- import socketpool
- import time
-
- HOST = "192.168.1.103"
- PORT = 9999
- TIMEOUT = 30
-
- pool = socketpool.SocketPool(wifi.radio)
- print("Creating Socket")
-
- def solve_rgb(buff):
- r,g,b = str(buff).split(',')
- r = r.split("'")[-1]
- b = b.split('\\')[0]
- return int(r),int(g),int(b)
-
- while True:
- try:
- sock=pool.socket(pool.AF_INET, pool.SOCK_STREAM)
- sock.settimeout(TIMEOUT)
- sock.connect((HOST, PORT))
- print('已连接上服务端')
- sock.send(b'client is ok, ready to change color')
- break
- except:
- print('请检查TCP服务器,尝试重新连接')
-
-
- while True:
- buff = bytearray(15)
- numbytes = sock.recv_into(buff)
- if(numbytes >= 0):
- print(buff)
- try:
- pixel.fill(solve_rgb(buff))
-
- color_palette[0] = solve_rgb(buff)
- sent = sock.send(bytearray('color has change'))
- except:
- print('换色失败')
- sent = sock.send(bytearray('color change default'))
-
-
-
-
-
-
-
|