- import gc
- import uos
- import time
- import socket
- import network
- from time import localtime
- from machine import Pin, SPI
- from micropython import const
-
-
-
- _LED_PIN = const(25)
- _SPI_SPEED = const(2_000_000)
- _MOSI_PIN = const(19)
- _MISO_PIN = const(16)
- _SCK_PIN = const(18)
- _CS_PIN = const(17)
- _RST_PIN = const(20)
- FTP_ROOT_PATH = const("/www")
-
- month_name = ["", "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]
-
-
- spi=SPI(0, _SPI_SPEED, mosi=Pin(_MOSI_PIN), miso=Pin(_MISO_PIN), sck=Pin(_SCK_PIN))
- nic = None
-
- """ W5500 初始化 """
- def w5x00_init():
- global nic
-
- nic = network.WIZNET5K(spi, Pin(_CS_PIN), Pin(_RST_PIN))
- nic.active(True)
-
- nic.ifconfig(('192.168.1.101','255.255.255.0','192.168.1.1','8.8.8.8'))
- while not nic.isconnected():
- time.sleep(1)
- print(nic.regs())
-
- print("IP地址: %s" %nic.ifconfig()[0])
- print("子网掩码: %s" %nic.ifconfig()[1])
- print("网关: %s" %nic.ifconfig()[2])
- print("DNS: %s" %nic.ifconfig()[3])
-
-
- """ 响应文件列表请求 """
- def send_list_data(path, dataclient, full):
- try:
- for fname in uos.listdir(path):
- dataclient.sendall(make_description(path, fname, full))
- except:
- pattern = path.split("/")[-1]
- path = path[:-(len(pattern) + 1)]
- if path == "": path = "/"
- for fname in uos.listdir(path):
- if fncmp(fname, pattern) == True:
- dataclient.sendall(make_description(path, fname, full))
-
- """ 列出目录详情 """
- def make_description(path, fname, full):
- if full:
- stat = uos.stat(get_absolute_path(path,fname))
- file_permissions = "drwxr-xr-x" if (stat[0] & 0o170000 == 0o040000) else "-rw-r--r--"
- file_size = stat[6]
- tm = localtime(stat[7])
- if tm[0] != localtime()[0]:
- description = "{} 1 owner group {:>10} {} {:2} {:>5} {}\r\n".format(
- file_permissions, file_size, month_name[tm[1]], tm[2], tm[0], fname)
- else:
- description = "{} 1 owner group {:>10} {} {:2} {:02}:{:02} {}\r\n".format(
- file_permissions, file_size, month_name[tm[1]], tm[2], tm[3], tm[4], fname)
- else:
- description = fname + "\r\n"
- return description
-
- """ 发送文件数据 """
- def send_file_data(path, dataclient):
- try:
- with open(path, "rb") as file:
- chunk = file.read(512)
- print("chunk 0: ", len(chunk))
- while len(chunk) > 0:
- print("chunk: ", len(chunk))
- dataclient.sendall(chunk)
- chunk = file.read(512)
- except Exception as err:
- print("error: ", err.args, err.value, err.errno)
-
-
- """ 保存文件上传数据 """
- def save_file_data(path, dataclient, mode):
- with open(path, mode) as file:
- chunk = dataclient.read(512)
- while len(chunk) > 0:
- file.write(chunk)
- chunk = dataclient.read(512)
-
- """ 获取文件绝对路径 """
- def get_absolute_path(cwd, payload):
-
-
-
- if payload.startswith('/'):
- cwd = "/"
- for token in payload.split("/"):
- if token == '..':
- if cwd != '/':
- cwd = '/'.join(cwd.split('/')[:-1])
- if cwd == '':
- cwd = '/'
- elif token != '.' and token != '':
- if cwd == '/':
- cwd += token
- else:
- cwd = cwd + '/' + token
- return cwd
-
-
- """ 文件名比较 """
- def fncmp(fname, pattern):
- pi = 0
- si = 0
- while pi < len(pattern) and si < len(fname):
- if (fname[si] == pattern[pi]) or (pattern[pi] == '?'):
- si += 1
- pi += 1
- else:
- if pattern[pi] == '*':
- if (pi + 1) == len(pattern):
- return True
- while si < len(fname):
- if fncmp(fname[si:], pattern[pi+1:]) == True:
- return True
- else:
- si += 1
- return False
- else:
- return False
- if pi == len(pattern.rstrip("*")) and si == len(fname):
- return True
- else:
- return False
-
- """ 启动FTP服务 """
- def ftpserver():
- DATA_PORT = 13333
- ftpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- datasocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-
- ftpsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- datasocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- ftpsocket.bind(socket.getaddrinfo("0.0.0.0", 21)[0][4])
- datasocket.bind(socket.getaddrinfo("0.0.0.0", DATA_PORT)[0][4])
-
- ftpsocket.listen(1)
- datasocket.listen(1)
- datasocket.settimeout(10)
-
- print("FTP服务启动成功!监听端口:21");
-
- msg_250_OK = '250 OK\r\n'
- msg_550_fail = '550 Failed\r\n'
- try:
- dataclient = None
- fromname = None
- while True:
- cl, remote_addr = ftpsocket.accept()
- cl.settimeout(300)
- cwd = FTP_ROOT_PATH
- try:
- print("新的FTP连接来自: %s:%s" %(remote_addr[0], remote_addr[1]))
- cl.sendall("220 Welcome! This is the W5500_EVB_PICO!\r\n")
- while True:
- gc.collect()
- data = cl.readline().decode("utf-8").rstrip("\r\n")
- if len(data) <= 0:
- print("Client disappeared")
- break
-
- command = data.split(" ")[0].upper()
- payload = data[len(command):].lstrip()
-
- path = get_absolute_path(cwd, payload)
-
- print("命令={}, 参数={}, 路径={}".format(command, payload, path))
-
- if command == "USER":
- cl.sendall("230 Logged in.\r\n")
- elif command == "SYST":
- cl.sendall("215 UNIX Type: L8\r\n")
- elif command == "NOOP":
- cl.sendall("200 OK\r\n")
- elif command == "FEAT":
- cl.sendall("211 no-features\r\n")
- elif command == "PWD":
- cl.sendall('257 "{}"\r\n'.format(cwd))
- elif command == "CWD":
- try:
- files = uos.listdir(path)
- cwd = path
- cl.sendall(msg_250_OK)
- except:
- cl.sendall(msg_550_fail)
- elif command == "CDUP":
- cwd = get_absolute_path(cwd, "..")
- cl.sendall(msg_250_OK)
- elif command == "TYPE":
-
- cl.sendall('200 Transfer mode set\r\n')
- elif command == "SIZE":
- try:
- size = uos.stat(path)[6]
- cl.sendall('213 {}\r\n'.format(size))
- except:
- cl.sendall(msg_550_fail)
- elif command == "QUIT":
- cl.sendall('221 Bye.\r\n')
- break
- elif command == "PASV":
- addr = nic.ifconfig()[0]
- cl.sendall('227 Entering Passive Mode ({},{},{}).\r\n'.format(
- addr.replace('.',','), DATA_PORT>>8, DATA_PORT%256))
- dataclient, data_addr = datasocket.accept()
- print("新的FTP数据连接来自: %s:%s" %(data_addr[0], data_addr[1]))
- elif command == "LIST" or command == "NLST":
- if not payload.startswith("-"):
- place = path
- else:
- place = cwd
- try:
- send_list_data(place, dataclient, command == "LIST" or payload == "-l")
- cl.sendall("150 Here comes the directory listing.\r\n")
- cl.sendall("226 Listed.\r\n")
- except:
- cl.sendall(msg_550_fail)
- if dataclient is not None:
- dataclient.close()
- dataclient = None
- elif command == "RETR":
- try:
- send_file_data(path, dataclient)
- cl.sendall("150 Opening data connection.\r\n")
- cl.sendall("226 Transfer complete.\r\n")
- except:
- cl.sendall(msg_550_fail)
- if dataclient is not None:
- dataclient.close()
- dataclient = None
- elif command == "STOR":
- try:
- cl.sendall("150 Ok to send data.\r\n")
- save_file_data(path, dataclient, "wb")
- cl.sendall("226 Transfer complete.\r\n")
- except:
- cl.sendall(msg_550_fail)
- if dataclient is not None:
- dataclient.close()
- dataclient = None
- elif command == "APPE":
- try:
- cl.sendall("150 Ok to send data.\r\n")
- save_file_data(path, dataclient, "a")
- cl.sendall("226 Transfer complete.\r\n")
- except:
- cl.sendall(msg_550_fail)
- if dataclient is not None:
- dataclient.close()
- dataclient = None
- elif command == "DELE":
- try:
- uos.remove(path)
- cl.sendall(msg_250_OK)
- except:
- cl.sendall(msg_550_fail)
- elif command == "RMD":
- try:
- uos.rmdir(path)
- cl.sendall(msg_250_OK)
- except:
- cl.sendall(msg_550_fail)
- elif command == "MKD":
- try:
- uos.mkdir(path)
- cl.sendall(msg_250_OK)
- except:
- cl.sendall(msg_550_fail)
- elif command == "RNFR":
- fromname = path
- cl.sendall("350 Rename from\r\n")
- elif command == "RNTO":
- if fromname is not None:
- try:
- uos.rename(fromname, path)
- cl.sendall(msg_250_OK)
- except:
- cl.sendall(msg_550_fail)
- else:
- cl.sendall(msg_550_fail)
- fromname = None
- else:
- cl.sendall("502 Unsupported command.\r\n")
-
- except Exception as err:
- print(err)
-
- finally:
- cl.close()
- cl = None
- finally:
- datasocket.close()
- ftpsocket.close()
- if dataclient is not None:
- dataclient.close()
-
-
- if __name__ == "__main__":
- print("run in main")
- w5x00_init()
- ftpserver()