爆肝24小时,用 (Micro)Python 做了个墨水屏桌面摆件!【附源码】
[复制链接]
本帖最后由 ningh 于 2025-3-21 22:08 编辑
储备了不少墨水屏,一直没派上用场,继【用三色墨水屏显示哪吒】之后,又摆弄了一下 7.5 寸 7 色墨水屏(悬空放了两个小时,弯了-。-),换了个 3.7 寸双色继续折腾。
由于之前大致摸清楚了图像抖动算法,突发奇想,只需要将网页转换为图片,不就可以在墨水屏上显示任何内容了?毕竟网页界面制作起来工作量可小多了,而且有丰富的库可以使用,说干就干!
首先,确定技术方案:服务端用 Python + Playwright 渲染网页并提供接口;客户端用 MicroPython 连接 WiFi,定期请求服务端以更新屏幕。
服务端源码:
- import asyncio
- import logging
- import time
- import os
- import uuid
- from io import BytesIO
- from urllib.parse import urljoin
- from secrets import compare_digest
-
- from aiohttp import web
- from dotenv import load_dotenv
- from playwright.async_api import async_playwright
- from PIL import Image
-
logger = logging.getLogger(__name__)
# Merge settings from a local .env file into os.environ before reading them.
load_dotenv()


# Bind address and port passed to web.run_app(). HTTP_HOST may stay unset
# (None). HTTP_PORT now has a default so a missing variable no longer aborts
# the import with ``int(None)``; 1988 is the port the bundled simulator
# targets by default.
HOST = os.environ.get('HTTP_HOST')
PORT = int(os.environ.get('HTTP_PORT', 1988))

# Seconds Playwright may spend loading the template before giving up.
RENDER_TIMEOUT = int(os.environ.get('RENDER_TIMEOUT', 30))
# HTML file that gets rendered; resolved relative to this source file.
TEMPLATE_PATH = os.path.join(os.path.dirname(__file__), os.environ.get('TEMPLATE_PATH', 'templates/hello.html'))
# Seconds between two refreshes while an image is already available.
UPDATE_INTERVAL = int(os.environ.get('UPDATE_INTERVAL', 60 * 15))

# Random URL path segment under which the rendered image is served;
# regenerated every time the process starts.
IMAGE_URL = str(uuid.uuid4())
-
-
def _get_mac():
    """Return the node id from ``uuid.getnode()`` formatted as ``AA:BB:CC:DD:EE:FF``.

    The integer is rendered as twelve upper-case hex digits (zero padded)
    and split into six colon-separated pairs.
    """
    digits = format(uuid.getnode(), '012X')
    octets = [digits[start:start + 2] for start in range(0, 12, 2)]
    return ':'.join(octets)
-
-
# Description of the device this server stands in for, plus shared settings.
# Only 'key', 'width' and 'height' are read by the server code in this file.
_device = dict(
    model='matters-370-opensource',
    firmware='0.0.1',
    # Token that POST requests must present in the Access-Token header.
    key=os.environ.get('API_KEY'),

    mac=_get_mac(),
    battery=0.0,
    rssi=0,
    ota='',

    # Viewport size (pixels) used when rendering the template.
    width=416,
    height=240,
)

# Most recent rendered BMP as bytes; None until a render has succeeded.
_image = None
# Unix timestamp (whole seconds) of the last successful render; 0 if none.
_last_update = 0
# Background asyncio task running render(), or None if none was started yet.
_task = None
# Guards the check-and-create of _task inside ping().
_task_lock = asyncio.Lock()
-
-
async def render(wait_for=UPDATE_INTERVAL):
    """Render the HTML template into a 1-bit BMP and cache it in ``_image``.

    Optionally sleeps first, then loads the template in headless Chromium at
    the device resolution, takes a PNG screenshot, dithers it to black and
    white (Floyd-Steinberg) and stores the BMP bytes in the module-level
    ``_image``. ``_last_update`` is set to the current Unix time on success.

    Args:
        wait_for: Seconds to sleep before rendering; a falsy value renders
            immediately.

    Returns:
        None. Failures are logged and leave ``_image`` / ``_last_update``
        untouched.
    """
    global _image, _last_update
    if wait_for:
        logger.info('Waiting for %s seconds...', wait_for)
        await asyncio.sleep(wait_for)
    logger.info('Rendering...')
    async with async_playwright() as p:
        try:
            browser = await p.chromium.launch()
            context = await browser.new_context()
            page = await context.new_page()
            await page.set_viewport_size({'width': _device['width'], 'height': _device['height']})

            # Close the file deterministically and decode as UTF-8 instead of
            # the platform's locale encoding, so non-ASCII templates render
            # the same on every OS.
            with open(TEMPLATE_PATH, encoding='utf-8') as template:
                html = template.read()
            # RENDER_TIMEOUT is in seconds; Playwright expects milliseconds.
            await page.set_content(html, timeout=RENDER_TIMEOUT * 1000, wait_until='networkidle')
            png_bytes = await page.screenshot(type='png')
            img = Image.open(BytesIO(png_bytes))
            img = img.convert('1', dither=Image.Dither.FLOYDSTEINBERG)
            img_bytes = BytesIO()
            img.save(img_bytes, format='BMP')
            _image = img_bytes.getvalue()
            _last_update = int(time.time())
        except Exception as e:
            # Best effort: a failed render keeps serving the previous image.
            logger.error('Render failed:', exc_info=e)
            return
    logger.info('Render √')
-
-
@web.middleware
async def auth_middleware(req, handler):
    """Reject POST requests whose ``Access-Token`` header does not match the API key.

    Non-POST requests pass through unchecked.

    Raises:
        web.HTTPForbidden: If the request is a POST and the token is missing,
            wrong, or no API key has been configured on the server.
    """
    if req.method == 'POST':
        expected = _device['key']
        supplied = req.headers.get('Access-Token', '')
        # Fail closed when no key is configured (None or ''): otherwise an
        # empty key would match a request that sends no token at all.
        # compare_digest() raises TypeError for non-ASCII str, so compare the
        # encoded bytes; surrogateescape keeps undecodable header bytes from
        # raising during encoding.
        if not expected or not compare_digest(
            supplied.encode('utf-8', 'surrogateescape'),
            expected.encode('utf-8', 'surrogateescape'),
        ):
            raise web.HTTPForbidden(reason='Invalid API key')
    return await handler(req)
-
-
async def ping(req: web.Request) -> web.Response:
    """Answer a device check-in and make sure a render is scheduled.

    The JSON reply tells the client where to fetch the current image
    (``display``, empty while nothing has been rendered), when that image was
    produced (``last_update``) and how many seconds to wait before the next
    check-in (``next_update``).

    If no render task is pending, one is started. It waits
    ``next_update - RENDER_TIMEOUT`` seconds (never less than zero) before
    rendering, which means it renders immediately while no image exists yet.
    """
    global _task
    has_image = bool(_image)
    next_update = UPDATE_INTERVAL if has_image else RENDER_TIMEOUT
    if has_image:
        # Resolve the image path against the URL the client used for this request.
        display = urljoin(f'{req.scheme}://{req.host}{req.path}', IMAGE_URL)
    else:
        display = ''

    payload = {
        'status': 200,
        'message': 'OK',
        'display': display,
        'last_update': _last_update,

        'key': '',
        'ota': '',
        'next_update': next_update,
    }

    async with _task_lock:
        if _task is None or _task.done():
            delay = max(next_update - RENDER_TIMEOUT, 0)
            _task = asyncio.create_task(render(delay))
    return web.json_response(payload)
-
-
async def image(req: web.Request) -> web.Response:
    """Serve the most recently rendered BMP.

    Declared ``async`` because aiohttp expects request handlers to be
    coroutines; plain functions are deprecated as handlers.

    Raises:
        web.HTTPNotFound: If no image has been rendered yet.
    """
    if not _image:
        raise web.HTTPNotFound()

    return web.Response(body=_image, content_type='image/bmp')
-
-
def create_app() -> web.Application:
    """Build the aiohttp application with the auth middleware and both routes.

    Routes:
        POST /ping          -> ping
        GET  /<IMAGE_URL>   -> image
    """
    routes = [
        web.post('/ping', ping),
        web.get(f'/{IMAGE_URL}', image),
    ]
    app = web.Application(middlewares=[auth_middleware])
    app.add_routes(routes)
    return app
-
-
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # Without a configured API_KEY, create one for this run and log it so it
    # can be copied into the client.
    if not _device['key']:
        generated_key = uuid.uuid4().hex
        _device['key'] = generated_key
        logger.info('API key generated: %s', generated_key)
    logger.info('Starting server...')
    app = create_app()
    web.run_app(app, host=HOST, port=PORT)
-
为了方便测试,还用 Python + tkinter 实现了一个简单的模拟器,效果如下:
模拟器源码:
- import threading
- import time
- import tkinter as tk
- import uuid
- from datetime import datetime, timedelta
- from io import BytesIO
- from tkinter import ttk
-
- import requests
- from PIL import ImageTk, Image
-
-
# Base URL of the render server; '/ping' is appended to it when polling.
API_HOST: str = 'http://127.0.0.1:1988'
# Token sent in the Access-Token header of every poll.
API_KEY: str = ''
-
-
def _get_mac():
    """Format the ``uuid.getnode()`` value as six colon-separated hex pairs.

    The result is upper-case and zero padded, e.g. ``00:1A:2B:3C:4D:5E``.
    """
    digits = f'{uuid.getnode():012X}'
    # Pair each even-positioned digit with the odd-positioned one after it.
    high_nibbles = digits[0:12:2]
    low_nibbles = digits[1:12:2]
    return ':'.join(hi + lo for hi, lo in zip(high_nibbles, low_nibbles))
-
# State of the simulated device. 'key' and 'last_update' are updated at
# runtime by the polling loop; 'width' / 'height' size the window and image.
_device = dict(
    model='matters-370-opensource',
    firmware='0.0.1',

    mac=_get_mac(),
    battery=0.0,
    rssi=0,
    ota='',

    width=416,
    height=240,

    key=API_KEY,
    last_update=0,
)

# JSON body posted to /ping on every poll: identity fields copied from
# _device plus fixed battery / rssi readings.
_req = {
    **{field: _device[field] for field in ('model', 'firmware', 'mac')},
    'battery': 0.0,
    'rssi': 0,
}
-
-
class Simulator:
    """Tk window that imitates the e-paper device by polling the render server.

    A daemon thread posts to ``{API_HOST}/ping`` in a loop, downloads the
    image whenever the server reports a new ``last_update`` and sleeps for the
    number of seconds the server asks for. Every widget change is handed to
    the Tk event loop through ``master.after`` so that widgets are only
    touched from the thread running ``mainloop``.
    """

    # Seconds to wait before polling again after a failed attempt.
    RETRY_DELAY = 30
    # Timeout in seconds for each HTTP request.
    HTTP_TIMEOUT = 10

    def __init__(self, master):
        """Build the widgets inside *master* and start the polling thread."""
        self.master = master
        master.title(f'Simulator: {_device["model"]}')
        master.resizable(0, 0)

        # Fixed-size frame standing in for the panel; propagation is disabled
        # so the frame keeps the device resolution regardless of its content.
        self.frame = ttk.Frame(master, width=_device['width'], height=_device['height'])
        self.frame.pack_propagate(0)
        self.frame.pack()

        self.img_label = ttk.Label(self.frame)
        self.img_label.pack(expand=1)

        self.status = ttk.Label(master, text='Initializing...',
                                relief=tk.SUNKEN, anchor=tk.W)
        self.status.pack(side=tk.BOTTOM, fill=tk.X)

        threading.Thread(target=self.poll_forever, daemon=True).start()

    def poll_forever(self):
        """Poll the server until the process exits (runs in the worker thread)."""
        while True:
            self.update_status('Connecting...')
            try:
                time.sleep(self._poll_once())
            except Exception as exc:
                # Convert to text right away: the exception name is unbound
                # once the except block ends, so a deferred lambda reading it
                # would depend on callback timing.
                self.master.after(0, self.show_error, str(exc))
                time.sleep(self.RETRY_DELAY)

    # Backward-compatible alias for the original (misspelled) method name.
    upate = poll_forever

    def _poll_once(self):
        """Do one check-in, refresh the image if needed, return the next delay in seconds."""
        resp = requests.post(url=f'{API_HOST}/ping', json=_req, timeout=self.HTTP_TIMEOUT, headers={
            'Access-Token': _device['key'],
        })
        resp.raise_for_status()
        data = resp.json()
        self.update_status(data['message'])

        # A non-empty key in the reply replaces the token used from now on.
        if data['key']:
            _device['key'] = data['key']

        if data['last_update'] != _device['last_update']:
            resp = requests.get(data['display'], timeout=self.HTTP_TIMEOUT)
            resp.raise_for_status()

            # Decode and scale in this thread; the Tk image object itself is
            # created later on the Tk thread inside show_image().
            pil_img = Image.open(BytesIO(resp.content)).resize((_device['width'], _device['height']))
            self.master.after(0, self.show_image, pil_img)
            _device['last_update'] = data['last_update']

        next_update = datetime.now() + (timedelta(seconds=data['next_update']))
        self.update_status(f'Next update: {next_update}')
        return data['next_update']

    def show_image(self, pil_img=None):
        """Display *pil_img*, or re-display ``self.tk_img`` when called without one."""
        if pil_img is not None:
            # Stored on the instance so the PhotoImage stays referenced while
            # the label is showing it.
            self.tk_img = ImageTk.PhotoImage(pil_img)
        self.img_label.configure(image=self.tk_img)

    def update_status(self, message):
        """Queue *message* for display in the status bar; safe to call from any thread."""
        self.master.after(0, self._set_status, message)

    def _set_status(self, message):
        """Write *message* into the status label (runs as a Tk callback)."""
        self.status.config(text=message)
        self.status.update_idletasks()

    def show_error(self, message):
        """Show *message* in the status bar prefixed with ``Error: ``."""
        self.update_status(f'Error: {message}')
-
-
if __name__ == '__main__':
    root = tk.Tk()
    # Window is 20 px taller than the panel, presumably to leave room for the
    # status bar packed at the bottom.
    window_size = '{}x{}'.format(_device['width'], _device['height'] + 20)
    root.geometry(window_size)
    app = Simulator(root)
    root.mainloop()
-
最后,把模拟器的逻辑移植到 MicroPython,最终效果:
先到这,过几天整理发布到 GitHub 上。
|