772|2

22

帖子

0

TA的资源

一粒金砂(中级)

楼主
 

【得捷电子Follow me第2期】任务5:通过网络控制WS2812B [复制链接]

  本帖最后由 怀66 于 2023-9-10 11:41 编辑

任务5:结合123,在手机上通过网络控制板载Neopixel LED的显示和颜色切换,屏幕同步显示状态

这里我找到了三个解决方案:

    参考官方的mqtt服务器:Adafruit IO:发送和接收数据 |阿达水果 ESP32-S3 TFT 羽毛 |阿达果学习系统,最重的是注册adafruit io账号并拿到key,链接里有详细介绍一步一步来就行。

    就是抄的大佬作业了,要写一个网页,我不会,所以就恬不知耻的抄了大老的啦,链接:【得捷电子Follow me第2期】任务5:通过网络控制WS2812B - DigiKey得捷电子技术专区 - 电子工程世界-论坛 (eeworld.com.cn)【得捷电子Follow me第2期】CircuitPython入门到完成任务1-5详细教程 - DigiKey得捷电子技术专区 - 电子工程世界-论坛 (eeworld.com.cn)

    就是我自己原来写的tcp实验,服务端发rgb数据给esp32,esp32处理一下信息就好了,稍微加一点代码就可以了。tcp实验:【得捷电子Follow me第2期】任务2:网络功能使用 - DigiKey得捷电子技术专区 - 电子工程世界-论坛 (eeworld.com.cn)

 

上效果:

播放器加载失败: 未检测到Flash Player,请到安装
202309101127

 

①和②代码分别是参考官方和大佬的,我自己也都做了修改。③就是自己写的,所以比较简陋。

代码①:

  • import time
  • import ssl
  • import os
  • from random import randint
  • import microcontroller
  • import socketpool
  • import wifi
  • import board
  • import neopixel
  • import adafruit_minimqtt.adafruit_minimqtt as MQTT
  • from adafruit_io.adafruit_io import IO_MQTT
  • import displayio
  • from adafruit_display_text import label
  • from adafruit_st7789 import ST7789
  • from adafruit_bitmap_font import bitmap_font
  • #-----------------------显示设置-------------------------------------------------
  • # First set some parameters used for shapes and text
  • BORDER = 20
  • FONTSCALE = 2
  • BACKGROUND_COLOR = 0xffffff # Bright Green
  • TEXT_COLOR = 0x000000
  • font_file = "wenquanyi_13px.pcf"
  • font = bitmap_font.load_font(font_file)
  • # Release any resources currently in use for the displays
  • displayio.release_displays()
  • spi = board.SPI()
  • tft_cs = board.TFT_CS
  • tft_dc = board.TFT_DC
  • display_bus = displayio.FourWire(spi, command=tft_dc, chip_select=tft_cs)
  • display = ST7789(
  • display_bus, rotation=270, width=240, height=135, rowstart=40, colstart=53
  • )
  • # Make the display context
  • splash = displayio.Group()
  • display.show(splash)
  • color_bitmap = displayio.Bitmap(display.width, display.height, 1)
  • color_palette = displayio.Palette(1)
  • color_palette[0] = BACKGROUND_COLOR
  • bg_sprite = displayio.TileGrid(color_bitmap, pixel_shader=color_palette, x=0, y=0)
  • splash.append(bg_sprite)
  • # Draw a label
  • text = "这里显示颜色"
  • text_area = label.Label(font, text=text, color=TEXT_COLOR)
  • text_width = text_area.bounding_box[2] * FONTSCALE
  • text_group = displayio.Group(
  • scale=FONTSCALE,
  • x=display.width // 2 - text_width // 2,
  • y=display.height // 2,
  • )
  • text_group.append(text_area) # Subgroup for text scaling
  • splash.append(text_group)
  • #------------------------------分割线------------------------------------------------------
  • # WiFi
  • try:
  • print("Connecting to %s" % os.getenv("WIFI_SSID"))
  • wifi.radio.connect(os.getenv("WIFI_SSID"), os.getenv("WIFI_PASSWORD"))
  • print("Connected to %s!" % os.getenv("WIFI_SSID"))
  • # Wi-Fi connectivity fails with error messages, not specific errors, so this except is broad.
  • except Exception as e: # pylint: disable=broad-except
  • print("Failed to connect to WiFi. Error:", e, "\nBoard will hard reset in 30 seconds.")
  • time.sleep(30)
  • microcontroller.reset()
  • # Initialise NeoPixel
  • pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
  • # Define callback functions which will be called when certain events happen.
  • def connected(client):
  • print("Connected to Adafruit IO! Listening for NeoPixel changes...")
  • # Subscribe to Adafruit IO feed called "neopixel"
  • client.subscribe("neopixel")
  • def message(client, feed_id, payload): # pylint: disable=unused-argument
  • print("Feed {0} received new value: {1}".format(feed_id, payload))
  • if feed_id == "neopixel":
  • pixel.fill(int(payload[1:], 16))
  • color_palette[0] = int(payload[1:], 16)
  • # Create a socket pool
  • pool = socketpool.SocketPool(wifi.radio)
  • # Initialize a new MQTT Client object
  • mqtt_client = MQTT.MQTT(
  • broker="io.adafruit.com",
  • username=os.getenv("AIO_USERNAME"),
  • password=os.getenv("AIO_KEY"),
  • socket_pool=pool,
  • ssl_context=ssl.create_default_context(),
  • )
  • # Initialize Adafruit IO MQTT "helper"
  • io = IO_MQTT(mqtt_client)
  • # Set up the callback methods above
  • io.on_connect = connected
  • io.on_message = message
  • timestamp = 0
  • while True:
  • try:
  • # If Adafruit IO is not connected...
  • if not io.is_connected:
  • # Connect the client to the MQTT broker.
  • print("Connecting to Adafruit IO...")
  • io.connect()
  • # Explicitly pump the message loop.
  • io.loop()
  • # Obtain the "random" value, print it and publish it to Adafruit IO every 10 seconds.
  • if (time.monotonic() - timestamp) >= 10:
  • random_number = "{}".format(randint(0, 255))
  • print("Current 'random' number: {}".format(random_number))
  • io.publish("random", random_number)
  • timestamp = time.monotonic()
  • # Adafruit IO fails with internal error types and WiFi fails with specific messages.
  • # This except is broad to handle any possible failure.
  • except Exception as e: # pylint: disable=broad-except
  • print("Failed to get or send data, or connect. Error:", e,
  • "\nBoard will hard reset in 30 seconds.")
  • time.sleep(30)
  • microcontroller.reset()

代码②:

  • import board
  • import digitalio
  • import terminalio
  • import busio
  • import neopixel
  • import rtc
  • import time
  • import os
  • #网络相关库
  • import wifi
  • import socketpool
  • import ssl
  • import adafruit_requests
  • from adafruit_httpserver import Server, Request, Response, Websocket, GET
  • #显示相关库
  • import displayio
  • from adafruit_display_text import label
  • from adafruit_st7789 import ST7789
  • from adafruit_bitmap_font import bitmap_font
  • #-----------------------显示设置-------------------------------------------------
  • # First set some parameters used for shapes and text
  • BORDER = 20
  • FONTSCALE = 2
  • BACKGROUND_COLOR = 0xffffff # Bright Green
  • TEXT_COLOR = 0x000000
  • font_file = "wenquanyi_13px.pcf"
  • font = bitmap_font.load_font(font_file)
  • # Release any resources currently in use for the displays
  • displayio.release_displays()
  • spi = board.SPI()
  • tft_cs = board.TFT_CS
  • tft_dc = board.TFT_DC
  • display_bus = displayio.FourWire(spi, command=tft_dc, chip_select=tft_cs)
  • display = ST7789(
  • display_bus, rotation=270, width=240, height=135, rowstart=40, colstart=53
  • )
  • # Make the display context
  • splash = displayio.Group()
  • display.show(splash)
  • color_bitmap = displayio.Bitmap(display.width, display.height, 1)
  • color_palette = displayio.Palette(1)
  • color_palette[0] = BACKGROUND_COLOR
  • bg_sprite = displayio.TileGrid(color_bitmap, pixel_shader=color_palette, x=0, y=0)
  • splash.append(bg_sprite)
  • # Draw a label
  • text = "开始初始化中"
  • text_area = label.Label(font, text=text, color=TEXT_COLOR)
  • text_width = text_area.bounding_box[2] * FONTSCALE
  • text_group = displayio.Group(
  • scale=FONTSCALE,
  • x=display.width // 2 - text_width // 2,
  • y=display.height // 2,
  • )
  • text_group.append(text_area) # Subgroup for text scaling
  • splash.append(text_group)
  • time.sleep(2)
  • #------------------------------分割线------------------------------------------------------
  • #wifi设置
  • #如果开启了wifi,esp32优先连接wifi,如果没有wifi,esp32开启热点,电脑连接热点后可访问
  • text_area.text = '尝试连接WIFI'
  • time.sleep(2)
  • try:
  • wifi.radio.connect(os.getenv("WIFI_SSID"), os.getenv("WIFI_PASSWORD"))
  • print(f"My IP address: {wifi.radio.ipv4_address}")
  • text_area.text = '连接WIFI成功'
  • time.sleep(2)
  • except:
  • text_area.text = '连接WIFI失败'
  • time.sleep(2)
  • text_area.text = '尝试开启热点'
  • time.sleep(2)
  • if(wifi.radio.connected == False):
  • wifi.radio.start_ap(ssid = 'Esp32控制小灯', password = '')
  • print("未连接WIFI开启热点模式,WIFI名称为'Esp32控制小灯'")
  • text_area.text = '开启热点成功'
  • time.sleep(2)
  • Wifi_Mode = 'STA' if(wifi.radio.connected == True) else 'AP'
  • Wifi_ip = wifi.radio.ipv4_address if(wifi.radio.connected == True) else wifi.radio.ipv4_address_ap
  • #---------------------------网页服务器----------------------------------------------------------
  • # GetandSetHttp_Begin
  • pool = socketpool.SocketPool(wifi.radio) # 网络管理,触发http操作
  • server = Server(pool, debug=True)
  • websocket: Websocket = None
  • next_message_time = time.monotonic()#数据更新时钟,以秒为单位变化
  • HTML_TEMPLATE = """
  • <html lang="en">
  • <head>
  • <!--注释:强制html使用utf-8编码-->
  • <meta http-equiv="Content-Type" content="text/html;charset=utf-8"/>
  • <title>Web控制中心</title>
  • </head>
  • <body>
  • <p>ESP32S3: <strong>-</strong></p>
  • <p>RGB 颜色: <input type="color"></p>
  • <script>
  • const Button_count = document.querySelector('strong');
  • const colorPicker = document.querySelector('input[type="color"]');
  • let ws = new WebSocket('ws://' + location.host + '/connect-websocket');
  • ws.onopen = () => console.log('WebSocket connection opened');
  • ws.onclose = () => console.log('WebSocket connection closed');
  • ws.onmessage = event => Button_count.textContent = event.data;
  • ws.onerror = error => Button_count.textContent = error;
  • colorPicker.oninput = debounce(() => ws.send(colorPicker.value), 200);
  • <!--注释:创建一个延时函数,消除RGB调整抖动-->
  • function debounce(callback, delay = 1000) {
  • let timeout
  • return (...args) => {
  • clearTimeout(timeout)
  • timeout = setTimeout(() => {
  • callback(...args)
  • }, delay)
  • }
  • }
  • </script>
  • </body>
  • </html>
  • """
  • #创建Http服务器
  • @server.route("/client", GET)
  • def client(request: Request):
  • return Response(request, HTML_TEMPLATE, content_type="text/html")
  • #创建websocket服务
  • @server.route("/connect-websocket", GET)
  • def connect_client(request: Request):
  • global websocket # pylint: disable=global-statement
  • if websocket is not None:
  • print("websocket.close")
  • websocket.close() # Close any existing connection
  • websocket = Websocket(request)
  • return websocket
  • # ---------------------------------分割线---------------------------------------------------------
  • #判断网络模式,设定location.host
  • if(wifi.radio.connected == True):
  • server.start(str(wifi.radio.ipv4_address), port = 1080)
  • print('http://'+str(wifi.radio.ipv4_address)+':1080'+'/client')
  • else:
  • server.start(str(wifi.radio.ipv4_address_ap), port = 1080)
  • print('http://'+str(wifi.radio.ipv4_address_ap)+':1080'+'/client')
  • #rgb小灯初始化
  • pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
  • text_area.text = '这里显示颜色'
  • # GetandSetHttp_Begin
  • time.sleep(2)#等待2秒进入循环
  • while True:
  • #httpSeverLoop_Begin
  • server.poll()
  • # Check for incoming messages from client
  • if websocket is not None:
  • if (data := websocket.receive(True)) is not None:
  • RGBr, RGBg, RGBb = int(data[1:3], 16), int(data[3:5], 16), int(data[5:7], 16)
  • pixel.fill((RGBr, RGBg, RGBb))
  • color_palette[0] = (RGBr, RGBg, RGBb)
  • # Send a message every second
  • if websocket is not None and next_message_time < time.monotonic():
  • time.sleep(0.2)
  • if websocket is not None:
  • websocket.send_message(str('666'))
  • next_message_time = time.monotonic() + 1
  • #httpSeverLoop_End

代码三:

电脑服务端:

  • #!/usr/bin/python3
  • # 文件名:server.py
  • # 导入 socket、sys 模块
  • import socket
  • import sys
  • import time
  • # 创建 socket 对象
  • serversocket = socket.socket(
  • socket.AF_INET, socket.SOCK_STREAM)
  • # 获取本地主机名
  • host = socket.gethostname()
  • port = 9999
  • # 绑定端口号
  • serversocket.bind(('192.168.1.103', port))
  • # 设置最大连接数,超过后排队
  • serversocket.listen(5)
  • # 建立客户端连接
  • print('\n'+'{:-^30}'.format('等待客户连接'))
  • clientsocket,addr = serversocket.accept()
  • print("连接地址: %s" % str(addr))
  • print(clientsocket.recv(1024))
  • msg='HELLO WORLD!\r\n'
  • # clientsocket.send(msg.encode('utf-8'))
  • while True:
  • # clientsocket,addr = serversocket.accept()
  • # print("连接地址: %s" % str(addr))
  • print('{:*^20}'.format('请输入信息'))
  • msg = input()
  • clientsocket.send(msg.encode('utf-8'))
  • time.sleep(0.5) #等对面发回来
  • msg_back = clientsocket.recv(1024)
  • # while msg_back == 0:
  • # msg_back = clientsocket.recv(1024)
  • print(msg_back)
  • clientsocket.close()

ESP32:

  • import wifi, os, time
  • import neopixel, board
  • import displayio
  • from adafruit_display_text import label
  • from adafruit_st7789 import ST7789
  • from adafruit_bitmap_font import bitmap_font
  • pixel = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=1)
  • # pixel2 = neopixel.NeoPixel(board.D5, 12, brightness=0.1)
  • # pixel.fill((255,255,255))
  • # pixel2.fill((255,255,255))
  • #-----------------------显示设置-------------------------------------------------
  • # First set some parameters used for shapes and text
  • BORDER = 20
  • FONTSCALE = 2
  • BACKGROUND_COLOR = 0xffffff # Bright Green
  • TEXT_COLOR = 0x000000
  • font_file = "wenquanyi_13px.pcf"
  • font = bitmap_font.load_font(font_file)
  • # Release any resources currently in use for the displays
  • displayio.release_displays()
  • spi = board.SPI()
  • tft_cs = board.TFT_CS
  • tft_dc = board.TFT_DC
  • display_bus = displayio.FourWire(spi, command=tft_dc, chip_select=tft_cs)
  • display = ST7789(
  • display_bus, rotation=270, width=240, height=135, rowstart=40, colstart=53
  • )
  • # Make the display context
  • splash = displayio.Group()
  • display.show(splash)
  • color_bitmap = displayio.Bitmap(display.width, display.height, 1)
  • color_palette = displayio.Palette(1)
  • color_palette[0] = BACKGROUND_COLOR
  • bg_sprite = displayio.TileGrid(color_bitmap, pixel_shader=color_palette, x=0, y=0)
  • splash.append(bg_sprite)
  • # Draw a label
  • text = "这里显示颜色"
  • text_area = label.Label(font, text=text, color=TEXT_COLOR)
  • text_width = text_area.bounding_box[2] * FONTSCALE
  • text_group = displayio.Group(
  • scale=FONTSCALE,
  • x=display.width // 2 - text_width // 2,
  • y=display.height // 2,
  • )
  • text_group.append(text_area) # Subgroup for text scaling
  • splash.append(text_group)
  • #------------------------------分割线------------------------------------------------------
  • while True:
  • try:
  • wifi.radio.connect(os.getenv("WIFI_SSID"), os.getenv("WIFI_PASSWORD"))
  • break
  • except:
  • print('正在尝试连接WIFI')
  • print(f"My IP address: {wifi.radio.ipv4_address}")
  • import socketpool
  • import time
  • HOST = "192.168.1.103"
  • PORT = 9999
  • TIMEOUT = 30
  • pool = socketpool.SocketPool(wifi.radio)
  • print("Creating Socket")
  • def solve_rgb(buff):
  • r,g,b = str(buff).split(',')
  • r = r.split("'")[-1]
  • b = b.split('\\')[0]
  • return int(r),int(g),int(b)
  • while True:
  • try:
  • sock=pool.socket(pool.AF_INET, pool.SOCK_STREAM)
  • sock.settimeout(TIMEOUT)
  • sock.connect((HOST, PORT))
  • print('已连接上服务端')
  • sock.send(b'client is ok, ready to change color')
  • break
  • except:
  • print('请检查TCP服务器,尝试重新连接')
  • # time.sleep(1)
  • # sent = sock.send(b"start\n")
  • while True:
  • buff = bytearray(15)
  • numbytes = sock.recv_into(buff)
  • if(numbytes >= 0):
  • print(buff)
  • try:
  • pixel.fill(solve_rgb(buff))
  • # pixel2.fill(solve_rgb(buff))
  • color_palette[0] = solve_rgb(buff)
  • sent = sock.send(bytearray('color has change'))
  • except:
  • print('换色失败')
  • sent = sock.send(bytearray('color change default'))
  • # sent = sock.send(bytearray('client has recieve message:'))
  • # sent = sock.send(buff)
  • # time.sleep(1)

 

最新回复

这个不叫抄的大佬作业啊,有创新点   详情 回复 发表于 2023-9-10 21:06
点赞 关注
 
 

回复
举报

7145

帖子

11

TA的资源

版主

沙发
 

感谢分享详细的任务帖子,这样后面的就有好的学习材料了。

 
 
 

回复

6909

帖子

0

TA的资源

五彩晶圆(高级)

板凳
 

这个不叫抄的大佬作业啊,有创新点

 
 
 

回复
您需要登录后才可以回帖 登录 | 注册

随便看看
查找数据手册?

EEWorld Datasheet 技术支持

相关文章 更多>>
关闭
站长推荐上一条 1/10 下一条
艾睿电子& Silicon Labs 有奖直播 | 全新蓝牙信道探测:从技术创新到实际应用
直播时间:3月12日(周三)上午10:00
直播奖励:多功能榨汁机、蓝牙音箱、手机支架

查看 »

 
EEWorld订阅号

 
EEWorld服务号

 
汽车开发圈

About Us 关于我们 客户服务 联系方式 器件索引 网站地图 最新更新 手机版

站点相关: 国产芯 安防电子 汽车电子 手机便携 工业控制 家用电子 医疗电子 测试测量 网络通信 物联网

北京市海淀区中关村大街18号B座15层1530室 电话:(010)82350740 邮编:100190

电子工程世界版权所有 京B2-20211791 京ICP备10001474号-1 电信业务审批[2006]字第258号函 京公网安备 11010802033920号 Copyright © 2005-2025 EEWORLD.com.cn, Inc. All rights reserved
快速回复 返回顶部 返回列表