【得捷电子Follow me第2期】使用socket遥控控制的天气钟
[复制链接]
本帖最后由 eew_dy9f48 于 2023-10-3 12:49 编辑
第一部分:
视频演示的这个效果就是将下文中的任务1至任务5合并在一起的效果。看完视频演示,我们在逐一将各个模块拆分,详细的讲解一下。
第二部分:
任务1:控制屏幕显示中文
使用Circuitpython显示中文比较方便,可以直接使用现成的pcf字体来显示。wqy在网上有一套bitmap字体可以直接下载,里面有各种不同大小可以选择。由于开发板内存仅有4M,相对比较紧张,因此我们选用较小的wenquanyi_13px。先初始化好屏幕,定义好常见颜色,并加载字体。
- display = board.DISPLAY
- group = displayio.Group()
- display.show(group)
-
-
- black = 0x000000
- white = 0xFFFFFF
- red = 0xFF0000
- green = 0x00FF00
- yellow = 0xFFFF00
- blue = 0x0000FF
- magenta = 0xFF00FF
- cyan = 0x00FFFF
- font = bitmap_font.load_font("/wenquanyi_13px.pcf")
接着我们单独定义每一个想要显示在屏幕中的元素:
- def splash():
- global time_area, weather_area, temperature_area
- time_area = label.Label(font, color=yellow)
- time_area.y = 60
- time_area.x = 14
- time_area.background_color = None
- time_area.line_spacing = 1.0
- time_area.scale = 4
- group.append(time_area)
-
- weather_area = label.Label(font, color=blue)
- weather_area.y = 20
- weather_area.x = 10
- weather_area.background_color = None
- weather_area.line_spacing = 1.0
- weather_area.scale = 2
- group.append(weather_area)
-
- temperature_area = label.Label(font, color=green)
- temperature_area.y = 110
- temperature_area.x = 10
- temperature_area.background_color = None
- temperature_area.line_spacing = 1.0
- temperature_area.scale = 2
- group.append(temperature_area)
最后只需要按需修改不同元素的内容,就可以显示出相对应的内容来。
任务2:网络功能使用
使用网络功能,仅需在settings.toml中填入WIFI信息,然后RESET开发板即可。
-
- CIRCUITPY_WIFI_SSID="SSID"
- CIRCUITPY_WIFI_PASSWORD="PASSWORD"
任务3:控制WS2812B
开发板上有一颗板载WS2812B灯珠,不是常见的5050封装,要小很多。但使用起来是完全一样的。在Circuitpython中有一个叫做neopixel的官方库,可以让我们非常方便的控制它。控制起来非常简单,只需要两行代码。第一行是创建一个灯珠对象,第二行是设置该对象的颜色。颜色格式是RGB,就这么简单。
- pixels = neopixel.NeoPixel(board.NEOPIXEL, 1, brightness=0.1)
- pixels[0] = (100,255,255)
按键的使用也十分方便,板载的BOOT按钮由于连接的是GPIO,因此可以复用。我们只需要两行代码就可以配置好按键,接下来使用value方法就可以读取到按键的值。
- switch = digitalio.DigitalInOut(board.BUTTON)
- switch.switch_to_input(pull=digitalio.Pull.UP)
- print(switch.value)
任务4:日历&时钟
由于这块板子上有一个2M的PSRAM,因此在这个项目中我并没有使用常规的天气API来获取天气数据,而是尝试直接在开发板上运行爬虫,从web上爬取天气数据。
字符串处理比较消耗内存,我实际测试下来,只要注意内存的分配,并及时用collect方法释放内存,完全可以正常长期工作。
- def weather_get():
- global weather, lost, total
-
- for i in range(5):
- try:
- _url = "https://www.tianqi.com/" + config['city_code'] + "/"
- _headers = {
- 'User-Agent':'Mozilla/5.0 (Linux; Android 12; Pixel 6 Build/SQ3A.220705.004; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/116.0.0.0 Mobile Safari/537.36 [FB_IAB/FB4A;FBAV/407.0.0.0.65;]',
- }
- text = requests.get(_url, headers=_headers).text
- text1=re.search('<p class="now">' + "(.*?)" + "<h6>", text).group(1)
- weather[0] = re.search("<span><b>" + "(.*?)" + "</b>", text1).group(1)
- weather[1] = re.search(weather[0] +"</b>" + "(.*?)" + " ~ ", text1).group(1)
- weather[2] = re.search(" ~ " + "(.*?)" + "\u2103", text1).group(1)
- weather[3] = weather[0]
- weather[4] = re.search("\u7a7a\u6c14\u8d28\u91cf\uff1a" + "(.*?)" + "</h5>", text1).group(1)
- weather[5] = re.search("\u98ce\u5411\uff1a" + "(.*?)" + " ", text1).group(1)
- weather[6] = re.search(weather[5] + " " + "(.*?)" + "</b><b>", text1).group(1)
- weather[7] = re.search("<b>" + "(.*?)" + "</b><i>", text1).group(1)
- weather[8] = re.search("\u6e7f\u5ea6\uff1a" + "(.*?)" + "%</b><b>", text1).group(1)
-
- total += 1
-
- return None
-
- except Exception as error:
- lost += 1
- print("Can not get weather!", lost)
- print(error)
- gc.collect()
- time.sleep(1)
获取到天期信息后,只需要按任务一中在屏幕上定义的各个元素中填入相应获取到的内容即可。
时间信息的获取是直接在上电后通过ntp服务器校准一次时间,随后跟随天气信息一起每间隔一段时间同步一次。
- ntp = adafruit_ntp.NTP(pool, tz_offset=8)
- rtc.RTC().datetime = ntp.datetime
任务5:通过网络控制WS2812B
由于在活动中我还一并购买了一个ATOMS3,它的主控也是ESP32S3,可以连接网络。因此这个任务我打算用socket方式连接两块ESP32S3,Adafruit作为服务端,Atom作为客户端,然后用atom通过网络给服务器发送socket包,以此控制板载的WS2812B灯珠。在这里稍有麻烦的地方是socket服务器是一种blocking的方法,而circuitpython又不支持多线程,因此我们需要用select来让socket server以非blocking的方法运行,不干扰天气钟的正常工作。
- while True:
- gc.collect()
-
- if switch.value == 0:
- while switch.value == 0:
- time.sleep(0.1)
- if sw == "ON":
- sw = "OFF"
- else:
- sw = "ON"
- print(sw)
-
- r, w, err = select.select((sock,), (), (), 1)
- if r:
- for readable in r:
- conn, addr = sock.accept()
- print(6)
- with conn:
- print("Connected by", addr)
- buff = bytearray(128)
- print("Receiving")
- numbytes = conn.recvfrom_into(buff)
- sw = buff[: numbytes[0]]
- print(sw)
- if sw == "ON":
- pixels[0] = (100,255,255)
- elif sw == "OFF":
- pixels[0] = (0,0,0)
-
- try:
- wifi.radio.connect(os.getenv("CIRCUITPY_WIFI_SSID"), os.getenv("CIRCUITPY_WIFI_PASSWORD"))
- loop()
- except Exception as error:
- print(error)
- time.sleep(2)
AtomS3这边就是一个简单的socket client功能,当按下按键后发送对应信息即可。
- import digitalio
- import board
- import wifi
- import socketpool
- import time
-
- switch = digitalio.DigitalInOut(board.BTN)
- switch.switch_to_input(pull=digitalio.Pull.UP)
-
- HOST = "192.168.8.161"
- PORT = 8888
-
- pool = socketpool.SocketPool(wifi.radio)
-
- def sock_send(_msg):
- with pool.socket(pool.AF_INET, pool.SOCK_STREAM) as s:
- s.connect((HOST, PORT))
- s.send(_msg)
-
- msg = [b"OFF", b"ON"]
- index = 0
- while True:
- time.sleep(0.1)
- if not switch.value:
- while not switch.value:
- pass
- index = not index
- try:
- sock_send(msg[index])
- print(msg[index])
- except Exception as error:
- print(error)
第三部分:
https://download.eeworld.com.cn/detail/eew_dy9f48/629303
|