【得捷Follow me第4期】W5500-EVB-Pico的使用 - 作品提交
[复制链接]
第一部分:任务视频介绍
第二部分:任务/项目总结报告
PS:不知道为什么,现在的 md 编辑器的 TOC、图片缩放、代码块都不能用了。下文为 EEWorld 自动转码后的内容;为方便查看,请下载 WORD/PDF 版本。
入门任务
开发环境搭建,BLINK,驱动液晶显示器进行显示(没有则串口HelloWorld)
烧录 Micropython:
- 前往 Micropython 下载Wiznet W5500-EVB-Pico的专属固件:MicroPython - Python for microcontrollers
- 下载好W5500_EVB_PICO-20240222-v1.22.2.uf2后,进入烧录模式,可以直接拖入 uf2 文件进行烧录
- 烧录完成重启
BLINK:
from machine import Pin
import time
# Blink demo: configure GPIO25 as an output and toggle it once per second.
led_pin = Pin(25, Pin.OUT)
while True:
    # One full period: high for 1 s, then low for 1 s.
    for level in (1, 0):
        led_pin.value(level)
        time.sleep(1)
驱动液晶显示器:
我使用的是合宙的 1.8 英寸 128x160 RGB TFT_LCD,连接如下:
GPIO6 -------------- SCL
GPIO7 -------------- SDA
GPIO8 -------------- RST
GPIO9 -------------- CS
GPIO10 -------------- DC
前往下载st77xx.py、ufont.py和unifont-14-12917-16.v3.bmf,并上传到根目录,运行:
from machine import SPI, Pin
import ufont
from st77xx import ST7735
# GPIO numbers of the LCD signals, matching the wiring table in the text.
LCD_SCL, LCD_SDA, LCD_RST, LCD_CS, LCD_DC = 6, 7, 8, 9, 10

# SPI bus 0 at 30 MHz; only clock and data-out are wired to the panel.
spi = SPI(0, 30000000, sck=Pin(LCD_SCL), mosi=Pin(LCD_SDA))

# ST7735 panel driver; no backlight pin is passed (bl=None).
display = ST7735(
    spi=spi,
    cs=LCD_CS,
    dc=LCD_DC,
    rst=LCD_RST,
    bl=None,
    width=160,
    height=128,
    rotate=1,
)

# Load the bitmap font file uploaded to the board's root directory.
font = ufont.BMFont("unifont-14-12917-16.v3.bmf")

# Draw a multi-line greeting starting at the top-left corner and refresh.
greeting = "EEWorld &\nDigiKey Follow Me 4\nICS \n你好"
font.text(display, greeting, 0, 0, show=True)
基础任务
■ 完成主控板W5500初始化(静态IP配置),并能使用局域网电脑ping通,同时W5500可以ping通互联网站点;通过抓包软件(Wireshark、Sniffer等)抓取本地PC的ping报文,展示并分析。
我们可以通过 DHCP 直接获取 IP 地址
import network
# Bring the W5500 interface up and request an address via DHCP.
nic = network.WIZNET5K()
nic.active(True)
nic.ifconfig("dhcp")

# Verify: once the link reports connected, print the interface configuration.
if nic.isconnected():
    print(nic.ifconfig())
    # ('192.168.199.188', '255.255.255.0', '192.168.199.1', '192.168.199.1')
当然,也可以指定IP
import network
# Static configuration passed to ifconfig() as a 4-tuple of address strings.
STATIC_CONFIG = ('192.168.199.20', '255.255.255.0', '192.168.199.1', '223.5.5.5')

# Bring the W5500 interface up and apply the fixed address.
nic = network.WIZNET5K()
nic.active(True)
nic.ifconfig(STATIC_CONFIG)

# Verify: once the link reports connected, print the interface configuration.
if nic.isconnected():
    print(nic.ifconfig())
这里我们以第一次PING进行解析,我们来对比一下:
第一个数据包:
0000 02 d7 27 7c 62 9a a0 29 42 99 88 60 08 00 45 00
0010 00 3c b7 0f 00 00 80 01 00 00 c0 a8 c7 d0 c0 a8
0020 c7 14 08 00 4c 64 00 01 00 f7 61 62 63 64 65 66
0030 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76
0040 77 61 62 63 64 65 66 67 68 69
- 以太网头部(Ethernet Header):
- 目的MAC地址:02:d7:27:7c:62:9a
- 源MAC地址:a0:29:42:99:88:60
- 以太网类型:0x0800(表示IP数据包)
- IP头部(IP Header):
- 版本:4 (IPv4)
- 头部长度:20 bytes
- 生存时间(TTL):128
- 协议:ICMP (0x01)
- 源IP地址:192.168.199.208
- 目的IP地址:192.168.199.20
- ICMP数据部分:
- 类型:8 (Echo (ping) request)
- 代码:0
- 标识符(Identifier):0x0001
- 序列号(Sequence Number):0x00f7
- 数据:abcdefghijklmnopqrstuvwabcdefghi(共 32 字节,即 a–w 之后再从 a 重复到 i)
第二个数据包:
0000 a0 29 42 99 88 60 02 d7 27 7c 62 9a 08 00 45 00
0010 00 3c b7 0f 00 00 ff 01 f4 7a c0 a8 c7 14 c0 a8
0020 c7 d0 00 00 54 64 00 01 00 f7 61 62 63 64 65 66
0030 67 68 69 6a 6b 6c 6d 6e 6f 70 71 72 73 74 75 76
0040 77 61 62 63 64 65 66 67 68 69
- 以太网头部(Ethernet Header):
- 目的MAC地址:a0:29:42:99:88:60
- 源MAC地址:02:d7:27:7c:62:9a
- 以太网类型:0x0800(表示IP数据包)
- IP头部(IP Header):
- 生存时间(TTL):255
- 源IP地址:192.168.199.20
- 目的IP地址:192.168.199.208
- ICMP数据部分:
- 类型:0 (Echo (ping) reply)
- 代码:0
- 标识符(Identifier):0x0001
- 序列号(Sequence Number):0x00f7
- 数据:abcdefghijklmnopqrstuvwabcdefghi(与请求报文中的数据完全相同)
■ 主控板建立TCPIP或UDP服务器,局域网PC使用TCPIP或UDP客户端进行连接并发送数据,主控板接收到数据后,送液晶屏显示(没有则通过串口打印显示);通过抓包软件抓取交互报文,展示并分析。(TCP和UDP二选一,或者全都操作)
使用socket模块可以进行socket通讯,下面就创建一个TCP的服务端:
import socket
# Address and port the server binds to.
SERVER_IP = '0.0.0.0'  # listen on all network interfaces
SERVER_PORT = 12345

# Create the listening TCP socket, bind it and start listening.
server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_sock.bind((SERVER_IP, SERVER_PORT))
server_sock.listen(1)
print("Server listening on", SERVER_IP, "port", SERVER_PORT)

# Serve clients one after another. The original accepted a single client and
# then looped on recv() forever, which spins once the peer has disconnected.
while True:
    client_sock, client_addr = server_sock.accept()
    print("Client connected from", client_addr)
    try:
        while True:
            data = client_sock.recv(1024)
            if not data:
                # An empty read means the peer closed the connection.
                print("Client disconnected")
                break
            print("Received:", data.decode())
            # Answer every received chunk with a fixed greeting.
            response = "Hello ICS"
            client_sock.send(response.encode())
    except OSError as err:
        # Connection reset or similar: report it and wait for the next client.
        print("Connection error:", err)
    finally:
        # Always release the per-client socket before accepting again.
        client_sock.close()
■ 从NTP服务器(注意数据交互格式的解析)同步时间,获取时间送显示屏(串口)显示。
在开发板上新建一个文件,命名为ntpics.py,内容如下:
from machine import SPI, Pin,Timer
import network
import ufont
from st77xx import ST7735
import ntplc
import time
# LCD on SPI bus 0 at 30 MHz (SCK=GPIO6, MOSI=GPIO7).
spi = SPI(0, 30000000, sck=Pin(6), mosi=Pin(7))
display = ST7735(
    spi=spi,
    cs=9,
    dc=10,
    rst=8,
    bl=None,
    width=160,
    height=128,
    rotate=1,
)
font = ufont.BMFont("unifont-14-12917-16.v3.bmf")

# Placeholder message shown until the clock has been set.
font.text(display, "等待授时完成", 0, 0, show=True)

# Bring the W5500 interface up and obtain an address via DHCP.
nic = network.WIZNET5K()
nic.active(True)
nic.ifconfig("dhcp")

# Verify: print the interface configuration once the link is up.
if nic.isconnected():
    print(nic.ifconfig())

# Set the clock through the ntplc module.
# NOTE(review): the source lost its indentation; this call is assumed to sit
# at top level rather than inside the isconnected() check - confirm.
ntplc.settime()
def timer_callback(timer):
    """Redraw the current date and time on the LCD.

    Intended as the callback of a periodic Timer. ``timer`` is the argument
    the timer passes to its callback and is not used here.

    Month, day, hour, minute and second are zero-padded to two digits so the
    rendered text keeps a constant width (the original printed e.g. "16:4:5").
    """
    # Current timestamp in seconds, shifted by 28800 s (8 hours) before
    # being broken down into calendar fields.
    shifted = time.time() + 28800
    year, month, day, hour, minute, second = time.localtime(shifted)[:6]

    text = "{}-{:02d}-{:02d}\n{:02d}:{:02d}:{:02d}".format(
        year, month, day, hour, minute, second)
    # clear=True is passed so each redraw starts from a cleared display.
    font.text(display, text, 48, 32, font_size=16, clear=True)
# Create a timer object
tim = Timer(-1)
# Fire timer_callback periodically, once every 1000 ms
tim.init(period=1000, mode=Timer.PERIODIC, callback=timer_callback)
终极任务
■ 使用外部存储器,组建简易FTP文件服务器,并能正常上传下载文件。
# 参考:https://github.com/hosseinghaheri/MicroPython-FTP-Server
import socket
import network
import uos
import gc
import sys
import errno
from time import sleep_ms, localtime
from micropython import alloc_emergency_exception_buf
# constant definitions
_CHUNK_SIZE = const(1024)  # size of the buffer used when copying file data
_SO_REGISTER_HANDLER = const(20)  # setsockopt() option used to attach/detach a socket callback
_COMMAND_TIMEOUT = const(300)  # value passed to settimeout() on command connections
_DATA_TIMEOUT = const(100)  # value passed to settimeout() on active-mode data connections
_DATA_PORT = const(13333)  # port the passive-mode data socket is bound to
# Global variables
ftpsockets = []  # listening command sockets created by start()
datasocket = None  # listening socket for passive-mode data connections
client_list = []  # FTP_client objects of the currently open command connections
verbose_l = 0  # log level; log_msg() prints messages whose level is <= this
client_busy = False  # True while one command is being processed
# NOTE(review): the next comment describes interface tuples that are not
# defined anywhere in this file - looks like a leftover; confirm and remove.
# Interfaces: (IP-Address (string), IP-Address (integer), Netmask (integer))
# Month abbreviations indexed by month number (index 0 is an unused placeholder)
_month_name = ("", "Jan", "Feb", "Mar", "Apr", "May", "Jun",
"Jul", "Aug", "Sep", "Oct", "Nov", "Dec")
class FTP_client:
    """State and command handling for one FTP command connection.

    Creating an instance accepts the pending connection on the listening
    socket, registers ``exec_ftp_command`` as the socket's handler and sends
    the 220 greeting. The instance then tracks the working directory and the
    data-connection mode (active or passive) of that client.
    """

    def __init__(self, ftpsocket, local_addr):
        """Accept a command connection from ``ftpsocket`` and greet the client.

        ``local_addr`` is the address string reported to the client in the
        reply to PASV.
        """
        self.command_client, self.remote_addr = ftpsocket.accept()
        self.remote_addr = self.remote_addr[0]
        self.command_client.settimeout(_COMMAND_TIMEOUT)
        log_msg(1, "FTP Command connection from:", self.remote_addr)
        # Session state is initialised BEFORE the handler is registered so
        # that exec_ftp_command can never run on a half-built object.
        self.cwd = '/'
        self.fromname = None  # source path remembered between RNFR and RNTO
        # self.logged_in = False
        self.act_data_addr = self.remote_addr
        self.DATA_PORT = 20
        self.active = True  # True: active mode (PORT); False: passive (PASV)
        self.pasv_data_addr = local_addr
        self.command_client.setsockopt(socket.SOL_SOCKET,
                                       _SO_REGISTER_HANDLER,
                                       self.exec_ftp_command)
        self.command_client.sendall(
            "220 Hello, this is the {}.\r\n".format(sys.platform))

    def send_list_data(self, path, data_client, full):
        """Send a listing of ``path`` over ``data_client``.

        ``path`` is first tried as a directory. If that fails it is split into
        directory and wildcard pattern, and only matching names are sent.
        Errors in the second attempt are ignored (best effort).
        """
        try:
            for fname in uos.listdir(path):
                data_client.sendall(self.make_description(path, fname, full))
        except Exception:  # path may be a file name or pattern
            path, pattern = self.split_path(path)
            try:
                for fname in uos.listdir(path):
                    if self.fncmp(fname, pattern):
                        data_client.sendall(
                            self.make_description(path, fname, full))
            except Exception:
                pass

    def make_description(self, path, fname, full):
        """Return one listing line for ``fname`` in ``path``.

        With ``full`` false the line is just the name plus CRLF. Otherwise it
        is a long-format line built from ``uos.stat``; the year is shown
        instead of hh:mm when the timestamp is not from the current year.
        """
        if full:
            stat = uos.stat(self.get_absolute_path(path, fname))
            file_permissions = ("drwxr-xr-x"
                                if (stat[0] & 0o170000 == 0o040000)
                                else "-rw-r--r--")
            file_size = stat[6]
            # Interpret the 32-bit timestamp as a signed value.
            tm = stat[7] & 0xffffffff
            tm = localtime(tm if tm < 0x80000000 else tm - 0x100000000)
            if tm[0] != localtime()[0]:
                description = "{} 1 owner group {:>10} {} {:2} {:>5} {}\r\n".\
                    format(file_permissions, file_size,
                           _month_name[tm[1]], tm[2], tm[0], fname)
            else:
                description = "{} 1 owner group {:>10} {} {:2} {:02}:{:02} {}\r\n".\
                    format(file_permissions, file_size,
                           _month_name[tm[1]], tm[2], tm[3], tm[4], fname)
        else:
            description = fname + "\r\n"
        return description

    def send_file_data(self, path, data_client):
        """Copy the file at ``path`` to ``data_client`` in chunks, then close it."""
        buffer = bytearray(_CHUNK_SIZE)
        mv = memoryview(buffer)
        with open(path, "rb") as file:
            bytes_read = file.readinto(buffer)
            while bytes_read > 0:
                data_client.write(mv[0:bytes_read])
                bytes_read = file.readinto(buffer)
            data_client.close()

    def save_file_data(self, path, data_client, mode):
        """Write everything read from ``data_client`` to ``path``, then close it.

        ``mode`` is the file open mode chosen by the caller ("wb" or "ab").
        """
        buffer = bytearray(_CHUNK_SIZE)
        mv = memoryview(buffer)
        with open(path, mode) as file:
            bytes_read = data_client.readinto(buffer)
            while bytes_read > 0:
                file.write(mv[0:bytes_read])
                bytes_read = data_client.readinto(buffer)
            data_client.close()

    def get_absolute_path(self, cwd, payload):
        """Resolve ``payload`` against ``cwd`` and return the resulting path.

        Only "..", "." and empty components are treated specially. A payload
        starting with "/" is resolved from the root.
        """
        if payload.startswith('/'):
            cwd = "/"
        for token in payload.split("/"):
            if token == '..':
                cwd = self.split_path(cwd)[0]
            elif token != '.' and token != '':
                if cwd == '/':
                    cwd += token
                else:
                    cwd = cwd + '/' + token
        return cwd

    def split_path(self, path):  # instead of path.rpartition('/')
        """Split ``path`` at its last "/" into ``(head, tail)``.

        An empty head is returned as "/".
        """
        tail = path.split('/')[-1]
        head = path[:-(len(tail) + 1)]
        return ('/' if head == '' else head, tail)

    def fncmp(self, fname, pattern):
        """Return True if ``fname`` matches ``pattern``.

        ``pattern`` may contain the wildcards ``?`` (any one character) and
        ``*`` (any run of characters).
        """
        pi = 0
        si = 0
        while pi < len(pattern) and si < len(fname):
            if (fname[si] == pattern[pi]) or (pattern[pi] == '?'):
                si += 1
                pi += 1
            else:
                if pattern[pi] == '*':  # recurse
                    if pi == len(pattern.rstrip("*?")):  # only wildcards left
                        return True
                    while si < len(fname):
                        if self.fncmp(fname[si:], pattern[pi + 1:]):
                            return True
                        else:
                            si += 1
                    return False
                else:
                    return False
        if pi == len(pattern.rstrip("*")) and si == len(fname):
            return True
        else:
            return False

    def open_dataclient(self):
        """Open and return the data connection for the current transfer.

        In active mode a new socket connects to the address and port taken
        from PORT; in passive mode a connection is accepted on the module's
        ``datasocket``.
        """
        if self.active:  # active mode
            data_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            data_client.settimeout(_DATA_TIMEOUT)
            data_client.connect((self.act_data_addr, self.DATA_PORT))
            log_msg(1, "FTP Data connection with:", self.act_data_addr)
        else:  # passive mode
            data_client, data_addr = datasocket.accept()
            log_msg(1, "FTP Data connection with:", data_addr[0])
        return data_client

    def exec_ftp_command(self, cl):
        """Read one command line from ``cl`` and execute it.

        Registered as the command socket's handler. An empty line is treated
        as a disconnect. While another command is in progress the client is
        told "400 Device busy". OSErrors are logged and close the client on
        ECONNABORTED/ENOTCONN; any other exception is only logged.
        """
        global client_busy

        try:
            gc.collect()

            data = cl.readline().decode("utf-8").rstrip("\r\n")

            if len(data) <= 0:
                # No data, close
                # This part is NOT CLEAN; there is still a chance that a
                # closing data connection will be signalled as closing
                # command connection
                log_msg(1, "*** No data, assume QUIT")
                close_client(cl)
                return

            if client_busy:  # check if another client is busy
                cl.sendall("400 Device busy.\r\n")  # tell so the remote client
                return  # and quit
            client_busy = True  # now it's my turn

            # A check for the log-in state may be done here, e.g. reply
            # "530 Not logged in." to anything but USER, PASS and QUIT.

            command = data.split()[0].upper()
            payload = data[len(command):].lstrip()  # partition is missing
            path = self.get_absolute_path(self.cwd, payload)
            log_msg(1, "Command={}, Payload={}".format(command, payload))

            if command == "USER":
                # self.logged_in = True
                cl.sendall("230 Logged in.\r\n")
                # If you want to see a password, return
                #   "331 Need password.\r\n" instead
                # If you want to reject an user, return
                #   "530 Not logged in.\r\n"
            elif command == "PASS":
                # you may check here for a valid password and return
                # "530 Not logged in.\r\n" in case it's wrong
                # self.logged_in = True
                cl.sendall("230 Logged in.\r\n")
            elif command == "SYST":
                cl.sendall("215 UNIX Type: L8\r\n")
            elif command in ("TYPE", "NOOP", "ABOR"):  # just accept & ignore
                cl.sendall('200 OK\r\n')
            elif command == "QUIT":
                cl.sendall('221 Bye.\r\n')
                close_client(cl)
            elif command == "PWD" or command == "XPWD":
                cl.sendall('257 "{}"\r\n'.format(self.cwd))
            elif command == "CWD" or command == "XCWD":
                try:
                    if (uos.stat(path)[0] & 0o170000) == 0o040000:
                        self.cwd = path
                        cl.sendall('250 OK\r\n')
                    else:
                        cl.sendall('550 Fail\r\n')
                except Exception:
                    cl.sendall('550 Fail\r\n')
            elif command == "PASV":
                cl.sendall('227 Entering Passive Mode ({},{},{}).\r\n'.format(
                    self.pasv_data_addr.replace('.', ','),
                    _DATA_PORT >> 8, _DATA_PORT % 256))
                self.active = False
            elif command == "PORT":
                items = payload.split(",")
                if len(items) >= 6:
                    self.act_data_addr = '.'.join(items[:4])
                    if self.act_data_addr == "127.0.1.1":
                        # replace by command session addr
                        self.act_data_addr = self.remote_addr
                    self.DATA_PORT = int(items[4]) * 256 + int(items[5])
                    cl.sendall('200 OK\r\n')
                    self.active = True
                else:
                    cl.sendall('504 Fail\r\n')
            elif command == "LIST" or command == "NLST":
                if payload.startswith("-"):
                    option = payload.split()[0].lower()
                    path = self.get_absolute_path(
                        self.cwd, payload[len(option):].lstrip())
                else:
                    option = ""
                # Pre-bind so the handler below can test it even when
                # open_dataclient() itself raised (was a NameError before).
                data_client = None
                try:
                    data_client = self.open_dataclient()
                    cl.sendall("150 Directory listing:\r\n")
                    self.send_list_data(path, data_client,
                                        command == "LIST" or 'l' in option)
                    cl.sendall("226 Done.\r\n")
                    data_client.close()
                except Exception:
                    cl.sendall('550 Fail\r\n')
                    if data_client is not None:
                        data_client.close()
            elif command == "RETR":
                data_client = None  # see LIST: avoid an unbound name
                try:
                    data_client = self.open_dataclient()
                    cl.sendall("150 Opened data connection.\r\n")
                    self.send_file_data(path, data_client)
                    # if the next statement is reached,
                    # the data_client was closed.
                    data_client = None
                    cl.sendall("226 Done.\r\n")
                except Exception:
                    cl.sendall('550 Fail\r\n')
                    if data_client is not None:
                        data_client.close()
            elif command == "STOR" or command == "APPE":
                data_client = None  # see LIST: avoid an unbound name
                try:
                    data_client = self.open_dataclient()
                    cl.sendall("150 Opened data connection.\r\n")
                    self.save_file_data(path, data_client,
                                        "wb" if command == "STOR" else "ab")
                    # if the next statement is reached,
                    # the data_client was closed.
                    data_client = None
                    cl.sendall("226 Done.\r\n")
                except Exception:
                    cl.sendall('550 Fail\r\n')
                    if data_client is not None:
                        data_client.close()
            elif command == "SIZE":
                try:
                    cl.sendall('213 {}\r\n'.format(uos.stat(path)[6]))
                except Exception:
                    cl.sendall('550 Fail\r\n')
            elif command == "MDTM":
                try:
                    tm = localtime(uos.stat(path)[8])
                    cl.sendall('213 {:04d}{:02d}{:02d}{:02d}{:02d}{:02d}\r\n'.format(*tm[0:6]))
                except Exception:
                    cl.sendall('550 Fail\r\n')
            elif command == "STAT":
                if payload == "":
                    cl.sendall("211-Connected to ({})\r\n"
                               " Data address ({})\r\n"
                               " TYPE: Binary STRU: File MODE: Stream\r\n"
                               " Session timeout {}\r\n"
                               "211 Client count is {}\r\n".format(
                                   self.remote_addr, self.pasv_data_addr,
                                   _COMMAND_TIMEOUT, len(client_list)))
                else:
                    cl.sendall("213-Directory listing:\r\n")
                    self.send_list_data(path, cl, True)
                    cl.sendall("213 Done.\r\n")
            elif command == "DELE":
                try:
                    uos.remove(path)
                    cl.sendall('250 OK\r\n')
                except Exception:
                    cl.sendall('550 Fail\r\n')
            elif command == "RNFR":
                try:
                    # just test if the name exists, exception if not
                    uos.stat(path)
                    self.fromname = path
                    cl.sendall("350 Rename from\r\n")
                except Exception:
                    cl.sendall('550 Fail\r\n')
            elif command == "RNTO":
                try:
                    uos.rename(self.fromname, path)
                    cl.sendall('250 OK\r\n')
                except Exception:
                    cl.sendall('550 Fail\r\n')
                self.fromname = None
            elif command == "CDUP" or command == "XCUP":
                self.cwd = self.get_absolute_path(self.cwd, "..")
                cl.sendall('250 OK\r\n')
            elif command == "RMD" or command == "XRMD":
                try:
                    uos.rmdir(path)
                    cl.sendall('250 OK\r\n')
                except Exception:
                    cl.sendall('550 Fail\r\n')
            elif command == "MKD" or command == "XMKD":
                try:
                    uos.mkdir(path)
                    cl.sendall('250 OK\r\n')
                except Exception:
                    cl.sendall('550 Fail\r\n')
            elif command == "SITE":
                # SECURITY: this executes text sent by the (unauthenticated)
                # FTP client as Python code. Kept for compatibility; remove
                # this branch unless the network is fully trusted.
                try:
                    exec(payload.replace('\0', '\n'))
                    cl.sendall('250 OK\r\n')
                except Exception:
                    cl.sendall('550 Fail\r\n')
            else:
                cl.sendall("502 Unsupported command.\r\n")
        except OSError as err:
            if verbose_l > 0:
                log_msg(1, "Exception in exec_ftp_command:")
                sys.print_exception(err)
            if err.errno in (errno.ECONNABORTED, errno.ENOTCONN):
                close_client(cl)
        # handle unexpected errors
        except Exception as err:
            log_msg(1, "Exception in exec_ftp_command: {}".format(err))
        # tidy up before leaving
        client_busy = False
def log_msg(level, *args):
    """Print ``args`` if the module log level ``verbose_l`` is at least ``level``."""
    if level <= verbose_l:
        print(*args)
# Close a command connection and drop its entry from client_list.
def close_client(cl):
    """Detach the handler from ``cl``, close it and forget its FTP_client.

    Only the first entry of ``client_list`` whose ``command_client`` equals
    ``cl`` is removed.
    """
    # Unregister the callback before closing the socket.
    cl.setsockopt(socket.SOL_SOCKET, _SO_REGISTER_HANDLER, None)
    cl.close()
    for entry in client_list:
        if entry.command_client == cl:
            client_list.remove(entry)
            break
def accept_ftp_connect(ftpsocket, local_addr):
    """Handle a new connection on the listening socket ``ftpsocket``.

    A new FTP_client is built (which accepts the connection) and added to
    ``client_list``. If that fails, the connection is accepted and closed
    immediately as a best-effort rejection.
    """
    try:
        new_client = FTP_client(ftpsocket, local_addr)
        client_list.append(new_client)
    except:
        log_msg(1, "Attempt to connect failed")
        # At least try to reject the caller.
        try:
            rejected, _peer = ftpsocket.accept()
            rejected.close()
        except:
            pass
def num_ip(ip):
    """Convert a dotted address string such as "192.168.1.1" to an integer.

    The first four dot-separated fields are combined with the first field as
    the most significant byte.
    """
    fields = ip.split(".")
    value = 0
    for index in range(4):
        # Shift the accumulated value up one byte and merge the next field.
        value = (value << 8) | int(fields[index])
    return value
def stop():
    """Shut the server down.

    Closes every client command socket, every listening command socket and
    the passive-mode data socket, and resets the module-level state.
    """
    global ftpsockets, datasocket, client_list, client_busy

    # Detach handlers first so no callback fires on a closing socket.
    for session in client_list:
        command_sock = session.command_client
        command_sock.setsockopt(socket.SOL_SOCKET, _SO_REGISTER_HANDLER, None)
        command_sock.close()
    client_list = []
    client_busy = False

    for listener in ftpsockets:
        listener.setsockopt(socket.SOL_SOCKET, _SO_REGISTER_HANDLER, None)
        listener.close()
    ftpsockets = []

    if datasocket is not None:
        datasocket.close()
        datasocket = None
# Bring up the network and start listening for FTP connections (port 21 by default).
def start(port=21, verbose=0, splash=True):
    """Start the FTP server.

    port    -- TCP port of the command listener.
    verbose -- log level stored in ``verbose_l``.
    splash  -- when true, print the address the server was started on.

    The W5500 interface is activated with DHCP, a command listener is bound
    to the obtained address, and a second listener for passive-mode data
    connections is bound to ``_DATA_PORT`` on all interfaces.
    """
    global ftpsockets, datasocket, verbose_l, client_list, client_busy

    alloc_emergency_exception_buf(100)
    verbose_l = verbose
    client_list = []
    client_busy = False

    # Network bring-up via DHCP.
    nic = network.WIZNET5K()
    nic.active(True)
    nic.ifconfig("dhcp")
    net_config = nic.ifconfig()
    local_ip = net_config[0]

    # Command listener; new connections are handed to accept_ftp_connect.
    addr_info = socket.getaddrinfo(local_ip, port)
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(addr_info[0][4])
    listener.listen(1)
    listener.setsockopt(socket.SOL_SOCKET,
                        _SO_REGISTER_HANDLER,
                        lambda s: accept_ftp_connect(s, local_ip))
    ftpsockets.append(listener)
    if splash:
        print("FTP server started on {}:{}".format(local_ip, port))

    # Listener for passive-mode data connections.
    datasocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    datasocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    datasocket.bind(('0.0.0.0', _DATA_PORT))
    datasocket.listen(1)
    datasocket.settimeout(10)
def restart(port=21, verbose=0, splash=True):
    """Stop the server, wait 200 ms, then start it again with the given settings."""
    stop()
    sleep_ms(200)
    start(port=port, verbose=verbose, splash=splash)
# Start the server as soon as this file is run or imported.
start(splash=True)
第三部分:可编译下载的代码
下载详情请查看:download.eeworld.com.cn/detail/ICS/631418
参与这次活动不仅提高了我的电子技术水平,也激发了我的创造力和热情,让我感受到了编程的乐趣和创造的成就。
非常感谢得捷和EEWorld为这次活动提供了优质的物料、平台、资源和支持,让我能够顺利地完成我的作品,并有机会获得丰厚的奖励和认可。
希望能够继续举办更多类似的活动,让更多的电子爱好者能够参与进来,一起学习实用的电子技术知识,一起积攒DIY经验,一起变成更好的自己!
|
参考
补充内容 (2024-3-9 16:44): 标题写错了:应该是第四期的!!!!
|