Code:https://download.eeworld.com.cn/detail/eew_dy9f48/631268
前言:
在这个项目中,我使用到了wiznet W5500-EVB-Pico和M5stack Cardputer两块开发板。其中W5500-EVB-Pico作为主控,Cardputer作为显示器使用。两者通信使用的是tcp进行的。Cardputer由于内置了电池,因此不需要连线就可以使用,可以作为一款很不错的无线移动设备使用。
入门任务:
开发环境的搭建没有任何门槛,直接去micropython官网下载好W5500-EVB-Pico的uf2固件,第一次上电W5500-EVB-Pico时会自动跳出一个U盘,将uf2文件拷贝进去就可以完成固件烧写。IDE使用Thonny就可以,可以直接去官网下载安装包安装即可。
电灯的任务比较简单,用以下代码创建一个led对象,然后在循环中点亮,延迟,熄灭,再延迟就可以。需要注意的是,由于活动的任务较多,而这些任务按顺序执行,会导致执行到后面的任务时内存被占满。因此这里我们需要使用到gc库,当一个任务完成时,及时清理掉不再需要的对象,并释放内存,以免内存碎片化。
led = machine.Pin(25, machine.Pin.OUT)
for i in range(10):
led.high()
time.sleep(0.5)
led.low()
time.sleep(0.5)
屏幕显示的任务较为麻烦。因为两块开发板之间通讯依靠了tcp,因此我必须在这个任务中就完成联网的工作。具体联网方式会在下一个任务中详述,这里我们先讲讲联网完成后要怎么做。
连接网络后,我们要把W5500-EVB-Pico作为一个socket client,向cardputer发送需要显示的内容,Cardputer作为socket server,接收到信息后将其展示在屏幕上。
W5500-EVB-Pico部分代码如下:
import socket
def send(addr,port,text):
s = socket.socket()
try:
s.connect(socket.getaddrinfo(addr, port)[0][-1])
except Exception as e:
print(e)
try:
s.send(bytes(text, 'utf8'))
except Exception as e:
print(e)
而Cardputer比较麻烦,因为Cardputer的屏幕使用了ESP32S3的33,34这两个引脚。而由于这两个引脚在PSRAM版本上会有占用,因此乐鑫官方的模块根本就没有引出他们,而导致市面上现成的固件,无论是micropython或是circuitpython,都缺少这两个引脚的定义。这也就意味着我们需要重新编译固件。
由于我电脑上有现成的circuitpython编译环境,因此这里我就使用了circuitpython。具体编译的细节由于和任务关系不大,这里就不提了。附件中我会放入编译好的固件。
代码部分主要分了两块,一块是socket server部分,负责接收从W5500-EVB-Pico传来的字符串;另一部分是显示部分,显示器对象我已经直接编译在了固件中,因此这里不需要再初始化,直接使用即可。为了显示中文,这里还需要再导入一个wenquanyi的pcf字体文件。
import board
import wifi
import socketpool
import select
import displayio
from adafruit_bitmap_font import bitmap_font
from adafruit_display_text import label, wrap_text_to_pixels
while not wifi.radio.ipv4_address:
pass
RES = (240, 135)
pool = socketpool.SocketPool(wifi.radio)
sock = pool.socket(pool.AF_INET, pool.SOCK_STREAM)
sock.settimeout(None)
sock.setblocking(True)
sock.bind(("0.0.0.0", 8888))
sock.listen(1)
display = board.DISPLAY
group = displayio.Group()
display.root_group = group
COLOR = {
"black": 0x000000,
"white": 0xFFFFFF,
"red": 0xFF0000,
"green": 0x00FF00,
"blue": 0x0000FF,
"cyan": 0x00FFFF,
"magenta": 0xFF00FF,
"yellow": 0xFFFF00,
}
bg_bitmap = displayio.Bitmap(RES[0], RES[1], 1)
bg_palette = displayio.Palette(1)
bg_palette[0] = COLOR["magenta"]
bg_tg = displayio.TileGrid(bg_bitmap, pixel_shader=bg_palette, x=0, y=0)
group.append(bg_tg)
FONT = bitmap_font.load_font("/wenquanyi_13px.pcf")
text_tg = label.Label(FONT, color=COLOR["yellow"])
text_tg.anchor_point = (0.5, 0.5)
text_tg.anchored_position = (RES[0] / 2, RES[1] / 2)
text_tg.background_color = None
text_tg.line_spacing = 1.1
text_tg.scale = 2
group.append(text_tg)
def handlereq():
data = None
r, w, err = select.select((sock,), (), (), 0)
if r:
for readable in r:
conn, addr = sock.accept()
with conn:
print("Connected by", addr)
data = bytearray(1024)
print("Receiving")
conn.settimeout(None)
numbytes = conn.recv_into(data)
data = data[:numbytes]
print(data)
return data
def loop():
data = handlereq()
if data is not None:
text = data.decode("utf-8")
text_list = wrap_text_to_pixels(text, RES[0] / text_tg.scale, font=FONT)
show = ""
for i in text_list:
show += i.replace("-", "") + "\n"
print(text_list)
text_tg.text = show
text_tg.text = str(wifi.radio.ipv4_address)
print("Ready")
while True:
try:
loop()
except Exception as e:
print(e)
当然,从首行代码就可以看出来,这个代码要想顺利运行,需要先确保cardputer已经连上网络。circuitpython联网非常简单,在settings.toml中写入以下内容,然后按rst重置即可。
CIRCUITPY_WIFI_SSID="your_ssid"
CIRCUITPY_WIFI_PASSWORD="your_password"
一切顺利的话,重置过后代码自动运行,我们就可以看到cardputer的ip地址显示在屏幕上,我们就可以用这个地址来给它发消息。
lcd = "192.168.199.139"
lcd_port = 8888
send(lcd, lcd_port, "入门任务")
基础任务一
联网的方法在官方的github中,可以直接拿出来用。这里我把它写在一个单独的文件中,在主程序中直接导入调用就可以,这样可以让代码更加清晰易读。
from machine import Pin,SPI
import network
import time
def w5500(config): # could be "dhcp" or ('192.168.x.xxx','255.255.255.0','192.168.x.1','8.8.8.8')
#spi init
spi=SPI(0,2_000_000, mosi=Pin(19),miso=Pin(16),sck=Pin(18))
nic = network.WIZNET5K(spi,Pin(17),Pin(20)) #spi,cs,reset pin
nic.active(True)#nicwork active
nic.ifconfig(config)
while not nic.isconnected():
time.sleep(1)
# print(nic.regs())#Print register information
#Print nicwork address information
print("IP Address:",nic.ifconfig()[0])
print("Subnic Mask:",nic.ifconfig()[1])
print("Gateway:",nic.ifconfig()[2])
print("DNS:",nic.ifconfig()[3])
return nic
from w5500 import w5500
from ping import ping
ip = "192.168.199.121"
gate = ip.split(".")
gate[-1] = "1"
gate = ".".join(gate)
nic = w5500((ip, "255.255.255.0", gate, "8.8.8.8"))
ping(gate, quiet=True)
最后的quiet ping是为了阻塞代码,确保已经联网后再执行后续代码。ping方法使用的是基于8266的uping修改而来,两者socket方法一致,修改的仅仅是ping的超时判断逻辑,以及增加使用方法,修改默认ping次数。同样我们把它单独放在一个文件中,导入使用。
ping完后,将汇总信息显示到LCD上:
# μPing (MicroPing) for MicroPython
# copyright (c) 2018 Shawwwn <shawwwn1@gmail.com>
# License: MIT
# Internet Checksum Algorithm
# Author: Olav Morken
# https://github.com/olavmrk/python-ping/blob/master/ping.py
# @data: bytes
def checksum(data):
if len(data) & 0x1: # Odd number of bytes
data += b'\0'
cs = 0
for pos in range(0, len(data), 2):
b1 = data[pos]
b2 = data[pos + 1]
cs += (b1 << 8) + b2
while cs >= 0x10000:
cs = (cs & 0xffff) + (cs >> 16)
cs = ~cs & 0xffff
return cs
def ping(host, count=4, timeout=30, interval=1, quiet=False, size=64):
import time
import select
import uctypes
import socket
import struct
import random
# prepare packet
assert size >= 16, "pkt size too small"
pkt = b'Q'*size
pkt_desc = {
"type": uctypes.UINT8 | 0,
"code": uctypes.UINT8 | 1,
"checksum": uctypes.UINT16 | 2,
"id": uctypes.UINT16 | 4,
"seq": uctypes.INT16 | 6,
"timestamp": uctypes.UINT64 | 8,
} # packet header descriptor
h = uctypes.struct(uctypes.addressof(pkt), pkt_desc, uctypes.BIG_ENDIAN)
h.type = 8 # ICMP_ECHO_REQUEST
h.code = 0
h.checksum = 0
h.id = random.getrandbits(16)
h.seq = 1
# init socket
sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1)
sock.setblocking(0)
sock.settimeout(timeout/1000)
c = 0
addr = None
while c < 10:
try:
c += 1
time.sleep_ms(100)
addr = socket.getaddrinfo(host, 1)[0][-1][0] # ip address
break
except:
pass
if not addr:
print("DNS lookup failed")
return
sock.connect((addr, 1))
not quiet and print("PING %s (%s): %u data bytes" % (host, addr, len(pkt)))
seqs = list(range(1, count+1)) # [1,2,...,count]
c = 1
t = time.time()
tstart = time.time()
n_trans = 0
n_recv = 0
finish = False
while time.time() - tstart < timeout:
if time.time() - t >= interval and c<=count:
# send packet
h.checksum = 0
h.seq = c
h.timestamp = time.ticks_us()
h.checksum = checksum(pkt)
if sock.send(pkt) == size:
n_trans += 1
t = time.time() # reset timeout
else:
seqs.remove(c)
# recv packet
socks, _, _ = select.select([sock], [], [], 0)
if socks:
resp = socks[0].recv(4096)
resp_mv = memoryview(resp)
h2 = uctypes.struct(uctypes.addressof(resp_mv[20:]), pkt_desc, uctypes.BIG_ENDIAN)
# TODO: validate checksum (optional)
seq = h2.seq
if h2.type==0 and h2.id==h.id and (seq in seqs): # 0: ICMP_ECHO_REPLY
t_elasped = (time.ticks_us()-h2.timestamp) / 1000
ttl = struct.unpack('!B', resp_mv[8:9])[0] # time-to-live
n_recv += 1
c += 1
not quiet and print("%u bytes from %s: icmp_seq=%u, ttl=%u, time=%f ms" % (len(resp), addr, seq, ttl, t_elasped))
seqs.remove(seq)
if len(seqs) == 0:
finish = True
if finish:
break
# close
sock.close()
ret = (n_trans, n_recv)
not quiet and print("%u packets transmitted, %u packets received" % (n_trans, n_recv))
return (n_trans, n_recv)
send(lcd, lcd_port, "基础任务一")
time.sleep(2)
send(lcd, lcd_port, "Pinging")
(n_trans, n_recv) = ping("digikey.cn")
send(lcd, lcd_port, "Ping digikey.cn\n%u transmitted\n%u received" % (n_trans, n_recv))
time.sleep(5)
del led
del ping
gc.collect()
input("Input anything to continue.")
在代码最后我们通过input方式阻塞程序,这样我们有时间可以去完成电脑ping开发板并抓包的任务
打开wireshark,开始抓包,然后设定筛选内容为ip.src==192.168.199.121 or ip.dst==192.168.199.121,这里的ip是在上面联网功能中设定的。
接着打开cmd ping 192.168.199.121,看到下面输出,就说明成功ping通。
查看wireshark抓到的包,就可以看到具体数据包。协议是ICMP, INFO是 echo ping,说明一切正常。
基础任务二:
我们之前让TCP的显示屏正常,说明已经完成了TCP Client和TCP Server的实现。但TCP Server并不在W5500-EVB-Pico上。因此这里我们再在W5500-EVB-Pico上建立一个TCP 服务器,由电脑向该服务器发送文字,W5500-EVB-Pico收到后,再作为TCP Client,把该文字发送给cardputer进行显示。
send(lcd, lcd_port, "基础任务二")
addr = socket.getaddrinfo('0.0.0.0', 8080)[0][-1]
server = socket.socket()
server.bind(addr)
server.listen(1)
print('listening on', addr)
for i in range(3):
conn, addr = server.accept()
print("Connected by", addr)
data = conn.recv(1024)
conn.close()
send(lcd, lcd_port, data.decode())
del conn
del addr
del data
gc.collect()
server.close()
del server
gc.collect()
time.sleep(2)
在pc端我们使用windows工具TCPUDP网络调试助手来发送信息。
发送完该信息后,我们看到串口显示了服务器接收到内容,同时cardputer也显示了我们发送的文字。
再看看wireshark,也成功抓到了我们发送的包,信息的具体内容以NTF-8编码的形式显示。
如果我们发送的是英文信息,属于ASCII,就可以直接显示出来。
进阶任务:
ntp实现不再需要自己写库,micropython中内置的ntptime可以直接实现。
import ntptime
send(lcd, lcd_port, "进阶任务")
time.sleep(2)
for i in range(10):
try:
ntptime.settime()
t = time.localtime(time.time() + 8*3600)
output = "%s-%s-%s %s:%s:%s" % (t[0],t[1],t[2],t[3],t[4],t[5])
print("ntp time(BeiJing): " + output)
send(lcd, lcd_port, "北京时间:\n" + output)
break
except:
print("Can not get time!")
time.sleep_ms(1000)
time.sleep(5)
del ntptime
del t
del output
gc.collect()
运行过后,就可以在串口看到时间,屏幕也会打印出来。
终极任务:
这个任务跟ping一样,我们同样使用8266的uftp库进行一些修改。socket方法两者都一致,唯一的不同是我们需要外部传入W5500-EVB-PICO的ip地址,来组装PASV命令的返回信息。
#
# Small ftp server for ESP8266 ans ESP32 Micropython
#
# Based on the work of chrisgp - Christopher Popp and pfalcon - Paul Sokolovsky
#
# The server accepts passive mode only.
# It runs in foreground and quits, when it receives a quit command
# Start the server with:
#
# import ftp
#
# Copyright (c) 2016 Christopher Popp (initial ftp server framework)
# Copyright (c) 2016 Robert Hammelrath (putting the pieces together
# and a few extensions)
# Distributed under MIT License
#
import socket
import network
import uos
import gc
def send_list_data(path, dataclient, full):
try: # whether path is a directory name
for fname in sorted(uos.listdir(path), key=str.lower):
dataclient.sendall(make_description(path, fname, full))
except: # path may be a file name or pattern
pattern = path.split("/")[-1]
path = path[:-(len(pattern) + 1)]
if path == "":
path = "/"
for fname in sorted(uos.listdir(path), key=str.lower):
if fncmp(fname, pattern):
dataclient.sendall(make_description(path, fname, full))
def make_description(path, fname, full):
if full:
stat = uos.stat(get_absolute_path(path, fname))
file_permissions = ("drwxr-xr-x"
if (stat[0] & 0o170000 == 0o040000)
else "-rw-r--r--")
file_size = stat[6]
description = "{} 1 owner group {:>10} Jan 1 2000 {}\r\n".format(
file_permissions, file_size, fname)
else:
description = fname + "\r\n"
return description
def send_file_data(path, dataclient):
with open(path, "rb") as file:
chunk = file.read(512)
while len(chunk) > 0:
dataclient.sendall(chunk)
chunk = file.read(512)
def save_file_data(path, dataclient):
with open(path, "wb") as file:
chunk = dataclient.recv(512)
while len(chunk) > 0:
file.write(chunk)
chunk = dataclient.recv(512)
def get_absolute_path(cwd, payload):
# Just a few special cases "..", "." and ""
# If payload start's with /, set cwd to /
# and consider the remainder a relative path
if payload.startswith('/'):
cwd = "/"
for token in payload.split("/"):
if token == '..':
if cwd != '/':
cwd = '/'.join(cwd.split('/')[:-1])
if cwd == '':
cwd = '/'
elif token != '.' and token != '':
if cwd == '/':
cwd += token
else:
cwd = cwd + '/' + token
return cwd
# compare fname against pattern. Pattern may contain
# wildcards ? and *.
def fncmp(fname, pattern):
pi = 0
si = 0
while pi < len(pattern) and si < len(fname):
if (fname[si] == pattern[pi]) or (pattern[pi] == '?'):
si += 1
pi += 1
else:
if pattern[pi] == '*': # recurse
if (pi + 1) == len(pattern):
return True
while si < len(fname):
if fncmp(fname[si:], pattern[pi+1:]):
return True
else:
si += 1
return False
else:
return False
if pi == len(pattern.rstrip("*")) and si == len(fname):
return True
else:
return False
def ftpserver(net, port=21, timeout=None):
DATA_PORT = 13333
ftpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
datasocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ftpsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
datasocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ftpsocket.bind(socket.getaddrinfo("0.0.0.0", port)[0][4])
datasocket.bind(socket.getaddrinfo("0.0.0.0", DATA_PORT)[0][4])
ftpsocket.listen(1)
ftpsocket.settimeout(timeout)
datasocket.listen(1)
datasocket.settimeout(None)
msg_250_OK = '250 OK\r\n'
msg_550_fail = '550 Failed\r\n'
addr = net.ifconfig()[0]
print("FTP Server started on ", addr)
try:
dataclient = None
fromname = None
do_run = True
while do_run:
cl, remote_addr = ftpsocket.accept()
cl.settimeout(300)
cwd = '/'
try:
# print("FTP connection from:", remote_addr)
cl.sendall("220 Hello, this is Micropython.\r\n")
while True:
gc.collect()
data = cl.readline().decode("utf-8").rstrip("\r\n")
if len(data) <= 0:
print("Client disappeared")
do_run = False
break
command = data.split(" ")[0].upper()
payload = data[len(command):].lstrip()
path = get_absolute_path(cwd, payload)
print("Command={}, Payload={}".format(command, payload))
if command == "USER":
cl.sendall("230 Logged in.\r\n")
elif command == "SYST":
cl.sendall("215 UNIX Type: L8\r\n")
elif command == "NOOP":
cl.sendall("200 OK\r\n")
elif command == "FEAT":
cl.sendall("211 no-features\r\n")
elif command == "PWD" or command == "XPWD":
cl.sendall('257 "{}"\r\n'.format(cwd))
elif command == "CWD" or command == "XCWD":
try:
files = uos.listdir(path)
cwd = path
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "CDUP":
cwd = get_absolute_path(cwd, "..")
cl.sendall(msg_250_OK)
elif command == "TYPE":
# probably should switch between binary and not
cl.sendall('200 Transfer mode set\r\n')
elif command == "SIZE":
try:
size = uos.stat(path)[6]
cl.sendall('213 {}\r\n'.format(size))
except:
cl.sendall(msg_550_fail)
elif command == "QUIT":
cl.sendall('221 Bye.\r\n')
do_run = False
break
elif command == "PASV":
cl.sendall('227 Entering Passive Mode ({},{},{}).\r\n'.
format(addr.replace('.', ','), DATA_PORT >> 8,
DATA_PORT % 256))
dataclient, data_addr = datasocket.accept()
print("FTP Data connection from:", data_addr)
DATA_PORT = 13333
active = False
elif command == "PORT":
items = payload.split(",")
if len(items) >= 6:
data_addr = '.'.join(items[:4])
# replace by command session addr
if data_addr == "127.0.1.1":
data_addr = remote_addr
DATA_PORT = int(items[4]) * 256 + int(items[5])
dataclient = socket.socket(socket.AF_INET,
socket.SOCK_STREAM)
dataclient.settimeout(10)
dataclient.connect((data_addr, DATA_PORT))
print("FTP Data connection with:", data_addr)
cl.sendall('200 OK\r\n')
active = True
else:
cl.sendall('504 Fail\r\n')
elif command == "LIST" or command == "NLST":
if not payload.startswith("-"):
place = path
else:
place = cwd
try:
cl.sendall("150 Here comes the directory listing.\r\n")
send_list_data(place, dataclient,
command == "LIST" or payload == "-l")
cl.sendall("226 Listed.\r\n")
except:
cl.sendall(msg_550_fail)
if dataclient is not None:
dataclient.close()
dataclient = None
elif command == "RETR":
try:
cl.sendall("150 Opening data connection.\r\n")
send_file_data(path, dataclient)
cl.sendall("226 Transfer complete.\r\n")
except:
cl.sendall(msg_550_fail)
if dataclient is not None:
dataclient.close()
dataclient = None
elif command == "STOR":
try:
cl.sendall("150 Ok to send data.\r\n")
save_file_data(path, dataclient)
cl.sendall("226 Transfer complete.\r\n")
except:
cl.sendall(msg_550_fail)
if dataclient is not None:
dataclient.close()
dataclient = None
elif command == "DELE":
try:
uos.remove(path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "RMD" or command == "XRMD":
try:
uos.rmdir(path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "MKD" or command == "XMKD":
try:
uos.mkdir(path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "RNFR":
fromname = path
cl.sendall("350 Rename from\r\n")
elif command == "RNTO":
if fromname is not None:
try:
uos.rename(fromname, path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
else:
cl.sendall(msg_550_fail)
fromname = None
elif command == "MDTM":
try:
tm=localtime(uos.stat(path)[8])
cl.sendall('213 {:04d}{:02d}{:02d}{:02d}{:02d}{:02d}\r\n'.format(*tm[0:6]))
except:
cl.sendall('550 Fail\r\n')
elif command == "STAT":
if payload == "":
cl.sendall("211-Connected to ({})\r\n"
" Data address ({})\r\n"
"211 TYPE: Binary STRU: File MODE:"
" Stream\r\n".format(
remote_addr[0], addr))
else:
cl.sendall("213-Directory listing:\r\n")
send_list_data(path, cl, True)
cl.sendall("213 Done.\r\n")
else:
cl.sendall("502 Unsupported command.\r\n")
print("Unsupported command {} with payload {}".format(
command, payload))
except Exception as err:
print(err)
finally:
cl.close()
cl = None
except Exception as e:
print(e)
finally:
datasocket.close()
ftpsocket.close()
if dataclient is not None:
dataclient.close()
# ftpserver()
主程序导入并运行后,我们就可用window的文件资源管理器来上传下载文件,只需要输入地址ftp://192.168.199.121
活动心得体会
这次的任务由于发现了一块以前没有听说过的cardputer,实现起来变得有意思了很多。对于主控板wiznet W5500-EVB-Pico来说,还是有一些美中不足的地方。原本我打算全部在circuitpython上实现功能的,但在使用的过程中发现,虽然官方有支持circuitpython,但里面的socket方法有很多并不全,且circuitpython本身并不支持uastype,导致在移植micropython的ping方法时发现不太可行,由于时间所限,不得已才按论坛中大部分人的做法使用了micropython。未来有时间,我想要学习一下网络底层的用法,看看是否能补全官方库,让这块开发板在circuitpython环境下也能够顺畅跑起来。
|