补充下终极任务二今天研究情况。
首先非常感谢乔帮主群里的指点,最后发现我无法驱动成功sd卡的原因有三个:
- sdcard类有问题,之前是从micropython下载的(也是坛友推荐的),一直没怀疑它,我用了乔帮主分享的立马就能初始化成功;
- sd卡兼容性有问题,之前用的2G小容量的,在arduino下访问没问题,但是用micropython就不行了,然后换了张新的32G的就没事;
- 手头没杜邦线,用的4Pin端子连接线,接头上了点焊锡后查到排母里,貌似有点接触不良;
这里我把乔帮主分享的文件传上来,也给大家做参考。(大家根据自己接线方式修改Pin定义)
一、SD卡驱动
软件代码
1、测试SD卡
# Filename: tfcard_test.py
import uos # os/uos
import machine
import sdcard
from machine import SPI, Pin
spi = SPI(1, sck=Pin(10), mosi=Pin(11), miso=Pin(12))
cs = Pin(13)
sd = sdcard.SDCard(spi,cs)
# 挂载文件到sd
uos.mount(sd,"/sd")
# 列出MicroSD/TF卡中的目录文件
print(uos.listdir('/sd'))
# 写文件测试
f = open('/sd/test.txt','w',encoding='utf-8')
f.write('MicroSD/TF存储卡访问测试!')
f.close()
# 读文件测试
f = open('/sd/test.txt','r')
print(f.read())
f.close()
2、运行结果:
二、FTP服务器搭建
1、软件代码
端口及参数定义
import gc
import uos
import time
import socket
import network
from time import localtime
from machine import Pin, SPI
from micropython import const
/'初始化引脚'/
_LED_PIN = const(25) # LED 指示灯引脚
_SPI_SPEED = const(2_000_000) # SPI 速率
_MOSI_PIN = const(19) # SPI MOSI 引脚
_MISO_PIN = const(16) # SPI MISO 引脚
_SCK_PIN = const(18) # SPI SCK 引脚
_CS_PIN = const(17) # SPI CS 引脚
_RST_PIN = const(20) # SPI RESET 引脚
FTP_ROOT_PATH = const("/") # FTP 根目录
month_name = ["","Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec",
]
# SPI 定义
spi = SPI(0, _SPI_SPEED, mosi=Pin(_MOSI_PIN), miso=Pin(_MISO_PIN), sck=Pin(_SCK_PIN))
nic = None
W5500初始化
""" W5500 初始化 """
def w5x00_init():
global nic
# 网口初始化
nic = network.WIZNET5K(spi, Pin(_CS_PIN), Pin(_RST_PIN)) # spi,cs,reset pin
nic.active(True)
# 配置网络
# If you use the Dynamic IP(DHCP), you must use the "nic.ifconfig('dhcp')".
nic.ifconfig("dhcp")
# If you use the Static IP, you must use the "nic.ifconfig("IP","subnet","Gateway","DNS")".
# nic.ifconfig(('192.168.0.106','255.255.255.0','192.168.0.1','114.114.114.114'))
while not nic.isconnected():
time.sleep(1)
print(nic.regs())
print("IP地址: %s" % nic.ifconfig()[0])
print("子网掩码: %s" % nic.ifconfig()[1])
print("网关: %s" % nic.ifconfig()[2])
print("DNS: %s" % nic.ifconfig()[3])
文件列表请求响应函数
'/ 响应文件列表请求 /'
def send_list_data(path, dataclient, full):
try: # whether path is a directory name
for fname in uos.listdir(path):
dataclient.sendall(make_description(path, fname, full))
except: # path may be a file name or pattern
pattern = path.split("/")[-1]
path = path[: -(len(pattern) + 1)]
if path == "":
path = "/"
for fname in uos.listdir(path):
if fncmp(fname, pattern) == True:
dataclient.sendall(make_description(path, fname, full))
目录详情
""" 列出目录详情 """
def make_description(path, fname, full):
if full:
stat = uos.stat(get_absolute_path(path, fname))
file_permissions = (
"drwxr-xr-x" if (stat[0] & 0o170000 == 0o040000) else "-rw-r--r--"
)
file_size = stat[6]
tm = localtime(stat[7])
if tm[0] != localtime()[0]:
description = "{} 1 owner group {:>10} {} {:2} {:>5} {}\r\n".format(
file_permissions, file_size, month_name[tm[1]], tm[2], tm[0], fname
)
else:
description = (
"{} 1 owner group {:>10} {} {:2} {:02}:{:02} {}\r\n".format(
file_permissions,
file_size,
month_name[tm[1]],
tm[2],
tm[3],
tm[4],
fname,
)
)
else:
description = fname + "\r\n"
return description
文件发送函数
""" 发送文件数据 """
def send_file_data(path, dataclient):
try:
with open(path, "rb") as file:
chunk = file.read(512)
print("chunk 0: ", len(chunk))
while len(chunk) > 0:
print("chunk: ", len(chunk))
dataclient.sendall(chunk)
chunk = file.read(512)
except Exception as err:
print("error: ", err.args, err.value, err.errno)
文件数据保存
""" 保存文件上传数据 """
def save_file_data(path, dataclient, mode):
with open(path, mode) as file:
chunk = dataclient.read(512)
while len(chunk) > 0:
file.write(chunk)
chunk = dataclient.read(512)
文件路径获取函数
""" 获取文件绝对路径 """
def get_absolute_path(cwd, payload):
# Just a few special cases "..", "." and ""
# If payload start's with /, set cwd to /
# and consider the remainder a relative path
if payload.startswith("/"):
cwd = "/"
for token in payload.split("/"):
if token == "..":
if cwd != "/":
cwd = "/".join(cwd.split("/")[:-1])
if cwd == "":
cwd = "/"
elif token != "." and token != "":
if cwd == "/":
cwd += token
else:
cwd = cwd + "/" + token
return cwd
文件名比较函数
""" 文件名比较 """
def fncmp(fname, pattern):
pi = 0
si = 0
while pi < len(pattern) and si < len(fname):
if (fname[si] == pattern[pi]) or (pattern[pi] == "?"):
si += 1
pi += 1
else:
if pattern[pi] == "*": # recurse
if (pi + 1) == len(pattern):
return True
while si < len(fname):
if fncmp(fname[si:], pattern[pi + 1 :]) == True:
return True
else:
si += 1
return False
else:
return False
if pi == len(pattern.rstrip("*")) and si == len(fname):
return True
else:
return False
FTP服务启动
""" 启动FTP服务 """
def ftpserver():
DATA_PORT = 13333
ftpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
datasocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ftpsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
datasocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ftpsocket.bind(socket.getaddrinfo("0.0.0.0", 21)[0][4])
datasocket.bind(socket.getaddrinfo("0.0.0.0", DATA_PORT)[0][4])
ftpsocket.listen(1)
datasocket.listen(1)
datasocket.settimeout(10)
print("FTP服务启动成功!监听端口:21")
msg_250_OK = "250 OK\r\n"
msg_550_fail = "550 Failed\r\n"
try:
dataclient = None
fromname = None
while True:
cl, remote_addr = ftpsocket.accept()
cl.settimeout(300)
cwd = FTP_ROOT_PATH
try:
print("新的FTP连接来自: %s:%s" % (remote_addr[0], remote_addr[1]))
cl.sendall("220 Welcome! This is the W5500_EVB_PICO!\r\n")
while True:
gc.collect()
data = cl.readline().decode("utf-8").rstrip("\r\n")
if len(data) <= 0:
print("Client disappeared")
break
command = data.split(" ")[0].upper()
payload = data[len(command) :].lstrip()
path = get_absolute_path(cwd, payload)
print("命令={}, 参数={}, 路径={}".format(command, payload, path))
if command == "USER":
cl.sendall("230 Logged in.\r\n")
elif command == "SYST":
cl.sendall("215 UNIX Type: L8\r\n")
elif command == "NOOP":
cl.sendall("200 OK\r\n")
elif command == "FEAT":
cl.sendall("211 no-features\r\n")
elif command == "PWD":
cl.sendall('257 "{}"\r\n'.format(cwd))
elif command == "CWD":
try:
files = uos.listdir(path)
cwd = path
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "CDUP":
cwd = get_absolute_path(cwd, "..")
cl.sendall(msg_250_OK)
elif command == "TYPE":
# probably should switch between binary and not
cl.sendall("200 Transfer mode set\r\n")
elif command == "SIZE":
try:
size = uos.stat(path)[6]
cl.sendall("213 {}\r\n".format(size))
except:
cl.sendall(msg_550_fail)
elif command == "QUIT":
cl.sendall("221 Bye.\r\n")
break
elif command == "PASV":
addr = nic.ifconfig()[0]
cl.sendall(
"227 Entering Passive Mode ({},{},{}).\r\n".format(
addr.replace(".", ","), DATA_PORT >> 8, DATA_PORT % 256
)
)
dataclient, data_addr = datasocket.accept()
print("新的FTP数据连接来自: %s:%s" % (data_addr[0], data_addr[1]))
elif command == "LIST" or command == "NLST":
if not payload.startswith("-"):
place = path
else:
place = cwd
try:
send_list_data(
place, dataclient, command == "LIST" or payload == "-l"
)
cl.sendall("150 Here comes the directory listing.\r\n")
cl.sendall("226 Listed.\r\n")
except:
cl.sendall(msg_550_fail)
if dataclient is not None:
dataclient.close()
dataclient = None
elif command == "RETR":
try:
send_file_data(path, dataclient)
cl.sendall("150 Opening data connection.\r\n")
cl.sendall("226 Transfer complete.\r\n")
except:
cl.sendall(msg_550_fail)
if dataclient is not None:
dataclient.close()
dataclient = None
elif command == "STOR":
try:
cl.sendall("150 Ok to send data.\r\n")
save_file_data(path, dataclient, "wb")
cl.sendall("226 Transfer complete.\r\n")
except:
cl.sendall(msg_550_fail)
if dataclient is not None:
dataclient.close()
dataclient = None
elif command == "APPE":
try:
cl.sendall("150 Ok to send data.\r\n")
save_file_data(path, dataclient, "a")
cl.sendall("226 Transfer complete.\r\n")
except:
cl.sendall(msg_550_fail)
if dataclient is not None:
dataclient.close()
dataclient = None
elif command == "DELE":
try:
uos.remove(path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "RMD":
try:
uos.rmdir(path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "MKD":
try:
uos.mkdir(path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "RNFR":
fromname = path
cl.sendall("350 Rename from\r\n")
elif command == "RNTO":
if fromname is not None:
try:
uos.rename(fromname, path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
else:
cl.sendall(msg_550_fail)
fromname = None
else:
cl.sendall("502 Unsupported command.\r\n")
# print("Unsupported command {} with payload {}".format(command, payload))
except Exception as err:
print(err)
finally:
cl.close()
cl = None
finally:
datasocket.close()
ftpsocket.close()
if dataclient is not None:
dataclient.close()
if __name__ == "__main__":
print("run in main")
w5x00_init() # 初始化网络
ftpserver() # 运行 FTP Server
三、视频演示
进阶任务
四、完整代码
import gc
import uos
import time
import socket
import network
from time import localtime
from machine import Pin, SPI
from micropython import const
import sdcard
"""初始化引脚"""
_LED_PIN = const(25) # LED 指示灯引脚
_SPI_SPEED = const(2_000_000) # SPI 速率
_MOSI_PIN = const(19) # SPI MOSI 引脚
_MISO_PIN = const(16) # SPI MISO 引脚
_SCK_PIN = const(18) # SPI SCK 引脚
_CS_PIN = const(17) # SPI CS 引脚
_RST_PIN = const(20) # SPI RESET 引脚
FTP_ROOT_PATH = const("/sd") # FTP 根目录
month_name = ["","Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec",
]
# SPI 定义
spi = SPI(0, _SPI_SPEED, mosi=Pin(_MOSI_PIN), miso=Pin(_MISO_PIN), sck=Pin(_SCK_PIN))
nic = None
spi1 = SPI(1, sck=Pin(10), mosi=Pin(11), miso=Pin(12))
cs = Pin(13)
sd = sdcard.SDCard(spi1,cs)
# 挂载文件到sd
uos.mount(sd,"/sd")
# 列出MicroSD/TF卡中的目录文件
print(uos.listdir('/sd'))
# 写文件测试
f = open('/sd/test.txt','w',encoding='utf-8')
f.write('MicroSD/TF存储卡访问测试!')
f.close()
# 读文件测试
f = open('/sd/test.txt','r')
print(f.read())
f.close()
""" W5500 初始化 """
def w5x00_init():
global nic
# 网口初始化
nic = network.WIZNET5K(spi, Pin(_CS_PIN), Pin(_RST_PIN)) # spi,cs,reset pin
nic.active(True)
# 配置网络
# If you use the Dynamic IP(DHCP), you must use the "nic.ifconfig('dhcp')".
nic.ifconfig("dhcp")
# If you use the Static IP, you must use the "nic.ifconfig("IP","subnet","Gateway","DNS")".
# nic.ifconfig(('192.168.0.106','255.255.255.0','192.168.0.1','114.114.114.114'))
while not nic.isconnected():
time.sleep(1)
print(nic.regs())
print("IP地址: %s" % nic.ifconfig()[0])
print("子网掩码: %s" % nic.ifconfig()[1])
print("网关: %s" % nic.ifconfig()[2])
print("DNS: %s" % nic.ifconfig()[3])
'/ 响应文件列表请求 /'
def send_list_data(path, dataclient, full):
try: # whether path is a directory name
for fname in uos.listdir(path):
dataclient.sendall(make_description(path, fname, full))
except: # path may be a file name or pattern
pattern = path.split("/")[-1]
path = path[: -(len(pattern) + 1)]
if path == "":
path = "/"
for fname in uos.listdir(path):
if fncmp(fname, pattern) == True:
dataclient.sendall(make_description(path, fname, full))
""" 列出目录详情 """
def make_description(path, fname, full):
if full:
stat = uos.stat(get_absolute_path(path, fname))
file_permissions = (
"drwxr-xr-x" if (stat[0] & 0o170000 == 0o040000) else "-rw-r--r--"
)
file_size = stat[6]
tm = localtime(stat[7])
if tm[0] != localtime()[0]:
description = "{} 1 owner group {:>10} {} {:2} {:>5} {}\r\n".format(
file_permissions, file_size, month_name[tm[1]], tm[2], tm[0], fname
)
else:
description = (
"{} 1 owner group {:>10} {} {:2} {:02}:{:02} {}\r\n".format(
file_permissions,
file_size,
month_name[tm[1]],
tm[2],
tm[3],
tm[4],
fname,
)
)
else:
description = fname + "\r\n"
return description
""" 发送文件数据 """
def send_file_data(path, dataclient):
try:
with open(path, "rb") as file:
chunk = file.read(512)
print("chunk 0: ", len(chunk))
while len(chunk) > 0:
print("chunk: ", len(chunk))
dataclient.sendall(chunk)
chunk = file.read(512)
except Exception as err:
print("error: ", err.args, err.value, err.errno)
""" 保存文件上传数据 """
def save_file_data(path, dataclient, mode):
with open(path, mode) as file:
chunk = dataclient.read(512)
while len(chunk) > 0:
file.write(chunk)
chunk = dataclient.read(512)
""" 获取文件绝对路径 """
def get_absolute_path(cwd, payload):
# Just a few special cases "..", "." and ""
# If payload start's with /, set cwd to /
# and consider the remainder a relative path
if payload.startswith("/"):
cwd = "/"
for token in payload.split("/"):
if token == "..":
if cwd != "/":
cwd = "/".join(cwd.split("/")[:-1])
if cwd == "":
cwd = "/"
elif token != "." and token != "":
if cwd == "/":
cwd += token
else:
cwd = cwd + "/" + token
return cwd
""" 文件名比较 """
def fncmp(fname, pattern):
pi = 0
si = 0
while pi < len(pattern) and si < len(fname):
if (fname[si] == pattern[pi]) or (pattern[pi] == "?"):
si += 1
pi += 1
else:
if pattern[pi] == "*": # recurse
if (pi + 1) == len(pattern):
return True
while si < len(fname):
if fncmp(fname[si:], pattern[pi + 1 :]) == True:
return True
else:
si += 1
return False
else:
return False
if pi == len(pattern.rstrip("*")) and si == len(fname):
return True
else:
return False
""" 启动FTP服务 """
def ftpserver():
DATA_PORT = 13333
ftpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
datasocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ftpsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
datasocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ftpsocket.bind(socket.getaddrinfo("0.0.0.0", 21)[0][4])
datasocket.bind(socket.getaddrinfo("0.0.0.0", DATA_PORT)[0][4])
ftpsocket.listen(1)
datasocket.listen(1)
datasocket.settimeout(10)
print("FTP服务启动成功!监听端口:21")
msg_250_OK = "250 OK\r\n"
msg_550_fail = "550 Failed\r\n"
try:
dataclient = None
fromname = None
while True:
cl, remote_addr = ftpsocket.accept()
cl.settimeout(300)
cwd = FTP_ROOT_PATH
try:
print("新的FTP连接来自: %s:%s" % (remote_addr[0], remote_addr[1]))
cl.sendall("220 Welcome! This is the W5500_EVB_PICO!\r\n")
while True:
gc.collect()
data = cl.readline().decode("utf-8").rstrip("\r\n")
if len(data) <= 0:
print("Client disappeared")
break
command = data.split(" ")[0].upper()
payload = data[len(command) :].lstrip()
path = get_absolute_path(cwd, payload)
print("命令={}, 参数={}, 路径={}".format(command, payload, path))
if command == "USER":
cl.sendall("230 Logged in.\r\n")
elif command == "SYST":
cl.sendall("215 UNIX Type: L8\r\n")
elif command == "NOOP":
cl.sendall("200 OK\r\n")
elif command == "FEAT":
cl.sendall("211 no-features\r\n")
elif command == "PWD":
cl.sendall('257 "{}"\r\n'.format(cwd))
elif command == "CWD":
try:
files = uos.listdir(path)
cwd = path
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "CDUP":
cwd = get_absolute_path(cwd, "..")
cl.sendall(msg_250_OK)
elif command == "TYPE":
# probably should switch between binary and not
cl.sendall("200 Transfer mode set\r\n")
elif command == "SIZE":
try:
size = uos.stat(path)[6]
cl.sendall("213 {}\r\n".format(size))
except:
cl.sendall(msg_550_fail)
elif command == "QUIT":
cl.sendall("221 Bye.\r\n")
break
elif command == "PASV":
addr = nic.ifconfig()[0]
cl.sendall(
"227 Entering Passive Mode ({},{},{}).\r\n".format(
addr.replace(".", ","), DATA_PORT >> 8, DATA_PORT % 256
)
)
dataclient, data_addr = datasocket.accept()
print("新的FTP数据连接来自: %s:%s" % (data_addr[0], data_addr[1]))
elif command == "LIST" or command == "NLST":
if not payload.startswith("-"):
place = path
else:
place = cwd
try:
send_list_data(
place, dataclient, command == "LIST" or payload == "-l"
)
cl.sendall("150 Here comes the directory listing.\r\n")
cl.sendall("226 Listed.\r\n")
except:
cl.sendall(msg_550_fail)
if dataclient is not None:
dataclient.close()
dataclient = None
elif command == "RETR":
try:
send_file_data(path, dataclient)
cl.sendall("150 Opening data connection.\r\n")
cl.sendall("226 Transfer complete.\r\n")
except:
cl.sendall(msg_550_fail)
if dataclient is not None:
dataclient.close()
dataclient = None
elif command == "STOR":
try:
cl.sendall("150 Ok to send data.\r\n")
save_file_data(path, dataclient, "wb")
cl.sendall("226 Transfer complete.\r\n")
except:
cl.sendall(msg_550_fail)
if dataclient is not None:
dataclient.close()
dataclient = None
elif command == "APPE":
try:
cl.sendall("150 Ok to send data.\r\n")
save_file_data(path, dataclient, "a")
cl.sendall("226 Transfer complete.\r\n")
except:
cl.sendall(msg_550_fail)
if dataclient is not None:
dataclient.close()
dataclient = None
elif command == "DELE":
try:
uos.remove(path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "RMD":
try:
uos.rmdir(path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "MKD":
try:
uos.mkdir(path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
elif command == "RNFR":
fromname = path
cl.sendall("350 Rename from\r\n")
elif command == "RNTO":
if fromname is not None:
try:
uos.rename(fromname, path)
cl.sendall(msg_250_OK)
except:
cl.sendall(msg_550_fail)
else:
cl.sendall(msg_550_fail)
fromname = None
else:
cl.sendall("502 Unsupported command.\r\n")
# print("Unsupported command {} with payload {}".format(command, payload))
except Exception as err:
print(err)
finally:
cl.close()
cl = None
finally:
datasocket.close()
ftpsocket.close()
if dataclient is not None:
dataclient.close()
if __name__ == "__main__":
print("run in main")
w5x00_init() # 初始化网络
ftpserver() # 运行 FTP Server
|