708|1

32

帖子

4

TA的资源

一粒金砂(中级)

楼主
 

【得捷Follow me第4期】简易FTP文件服务器 [复制链接]

  本帖最后由 鲜de芒果 于 2024-2-22 14:22 编辑

5.1 任务说明

这里我选择实现任务二,实现一个简易FTP文件服务器。

 

5.2 功能实现

本任务实现使用的 MicroPython 作为开发语言,需要下载 W5500-EVB-Pico 对应的 MicroPython 固件,详见:固件下载页面 本次任务使用的 MicroPython 固件版本为当前最新版本:v1.22.1 (2024-01-05) .uf2 实现步骤:

  1. 下载最新 MicroPython 固件并烧录到 W5500-EVB-Pico 开发板,具体的烧录步骤请稳步之前的文章,这里不作展开说明。
  2. 建立 www 目录,并存放一个测试文件。(可选步骤)
  3. 简单实现 ftp 协议。

 

5.3 功能代码

import gc
import uos
import time
import socket
import network
from time import localtime
from machine import Pin, SPI
from micropython import const



_LED_PIN = const(25) # 绿色 LED 引脚
_SPI_SPEED = const(2_000_000) # SPI 速率
_MOSI_PIN = const(19) # SPI MOSI 引脚
_MISO_PIN = const(16) # SPI MISO 引脚
_SCK_PIN = const(18) # SPI SCK 引脚
_CS_PIN = const(17) # SPI CS 引脚
_RST_PIN = const(20) # SPI RESET 引脚
FTP_ROOT_PATH = const("/www") # FTP 根目录

month_name = ["", "Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]

# SPI 定义
spi=SPI(0, _SPI_SPEED, mosi=Pin(_MOSI_PIN), miso=Pin(_MISO_PIN), sck=Pin(_SCK_PIN))
nic = None

""" W5500 初始化 """
def w5x00_init():
    global nic
    # 网口初始化
    nic = network.WIZNET5K(spi, Pin(_CS_PIN), Pin(_RST_PIN)) #spi,cs,reset pin
    nic.active(True)
    # 配置网络
    nic.ifconfig(('192.168.1.101','255.255.255.0','192.168.1.1','8.8.8.8'))
    while not nic.isconnected():
        time.sleep(1)
        print(nic.regs())
    
    print("IP地址:  %s" %nic.ifconfig()[0])
    print("子网掩码: %s" %nic.ifconfig()[1])
    print("网关:    %s" %nic.ifconfig()[2])
    print("DNS:     %s" %nic.ifconfig()[3])


""" 响应文件列表请求 """
def send_list_data(path, dataclient, full):
    try: # whether path is a directory name
        for fname in uos.listdir(path):
            dataclient.sendall(make_description(path, fname, full))
    except: # path may be a file name or pattern
        pattern = path.split("/")[-1]
        path = path[:-(len(pattern) + 1)]
        if path == "": path = "/"
        for fname in uos.listdir(path):
            if fncmp(fname, pattern) == True:
                dataclient.sendall(make_description(path, fname, full))

""" 列出目录详情 """
def make_description(path, fname, full):
    if full:
        stat = uos.stat(get_absolute_path(path,fname))
        file_permissions = "drwxr-xr-x" if (stat[0] & 0o170000 == 0o040000) else "-rw-r--r--"
        file_size = stat[6]
        tm = localtime(stat[7])
        if tm[0] != localtime()[0]:
            description = "{}    1 owner group {:>10} {} {:2} {:>5} {}\r\n".format(
                file_permissions, file_size, month_name[tm[1]], tm[2], tm[0], fname)
        else:
            description = "{}    1 owner group {:>10} {} {:2} {:02}:{:02} {}\r\n".format(
                file_permissions, file_size, month_name[tm[1]], tm[2], tm[3], tm[4], fname)
    else:
        description = fname + "\r\n"
    return description

""" 发送文件数据 """
def send_file_data(path, dataclient):
    try:
        with open(path, "rb") as file:
            chunk = file.read(512)
            print("chunk 0: ", len(chunk))
            while len(chunk) > 0:
                print("chunk: ", len(chunk))
                dataclient.sendall(chunk)
                chunk = file.read(512)
    except Exception as err:
        print("error: ", err.args, err.value, err.errno)


""" 保存文件上传数据 """
def save_file_data(path, dataclient, mode):
    with open(path, mode) as file:
        chunk = dataclient.read(512)
        while len(chunk) > 0:
            file.write(chunk)
            chunk = dataclient.read(512)

""" 获取文件绝对路径 """
def get_absolute_path(cwd, payload):
    # Just a few special cases "..", "." and ""
    # If payload start's with /, set cwd to / 
    # and consider the remainder a relative path
    if payload.startswith('/'):
        cwd = "/"
    for token in payload.split("/"):
        if token == '..':
            if cwd != '/':
                cwd = '/'.join(cwd.split('/')[:-1])
                if cwd == '': 
                    cwd = '/'
        elif token != '.' and token != '':
            if cwd == '/':
                cwd += token
            else:
                cwd = cwd + '/' + token
    return cwd


""" 文件名比较 """
def fncmp(fname, pattern):
    pi = 0
    si = 0
    while pi < len(pattern) and si < len(fname):
        if (fname[si] == pattern[pi]) or (pattern[pi] == '?'):
            si += 1
            pi += 1
        else:
            if pattern[pi] == '*': # recurse
                if (pi + 1) == len(pattern):
                    return True
                while si < len(fname):
                    if fncmp(fname[si:], pattern[pi+1:]) == True:
                        return True
                    else:
                        si += 1
                return False
            else:
                return False
    if pi == len(pattern.rstrip("*"))  and si == len(fname):
        return True
    else:
        return False

""" 启动FTP服务 """
def ftpserver():
    DATA_PORT = 13333
    ftpsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    datasocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    ftpsocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    datasocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    ftpsocket.bind(socket.getaddrinfo("0.0.0.0", 21)[0][4])
    datasocket.bind(socket.getaddrinfo("0.0.0.0", DATA_PORT)[0][4])

    ftpsocket.listen(1)
    datasocket.listen(1)
    datasocket.settimeout(10)
    
    print("FTP服务启动成功!监听端口:21");

    msg_250_OK = '250 OK\r\n'
    msg_550_fail = '550 Failed\r\n'
    try:
        dataclient = None
        fromname = None
        while True:
            cl, remote_addr = ftpsocket.accept()
            cl.settimeout(300)
            cwd = FTP_ROOT_PATH
            try:
                print("新的FTP连接来自: %s:%s" %(remote_addr[0], remote_addr[1]))
                cl.sendall("220 Welcome! This is the W5500_EVB_PICO!\r\n")
                while True:
                    gc.collect()
                    data = cl.readline().decode("utf-8").rstrip("\r\n")
                    if len(data) <= 0:
                        print("Client disappeared")
                        break
                    
                    command = data.split(" ")[0].upper()
                    payload = data[len(command):].lstrip()

                    path = get_absolute_path(cwd, payload)
                    
                    print("命令={}, 参数={}, 路径={}".format(command, payload, path))
                    
                    if command == "USER":
                        cl.sendall("230 Logged in.\r\n")
                    elif command == "SYST":
                        cl.sendall("215 UNIX Type: L8\r\n")
                    elif command == "NOOP":
                        cl.sendall("200 OK\r\n")
                    elif command == "FEAT":
                        cl.sendall("211 no-features\r\n")
                    elif command == "PWD":
                        cl.sendall('257 "{}"\r\n'.format(cwd))
                    elif command == "CWD":
                        try:
                            files = uos.listdir(path)
                            cwd = path
                            cl.sendall(msg_250_OK)
                        except:
                            cl.sendall(msg_550_fail)
                    elif command == "CDUP":
                        cwd = get_absolute_path(cwd, "..")
                        cl.sendall(msg_250_OK)
                    elif command == "TYPE":
                        # probably should switch between binary and not
                        cl.sendall('200 Transfer mode set\r\n')
                    elif command == "SIZE":
                        try:
                            size = uos.stat(path)[6]
                            cl.sendall('213 {}\r\n'.format(size))
                        except:
                            cl.sendall(msg_550_fail)
                    elif command == "QUIT":
                        cl.sendall('221 Bye.\r\n')
                        break
                    elif command == "PASV":
                        addr = nic.ifconfig()[0]
                        cl.sendall('227 Entering Passive Mode ({},{},{}).\r\n'.format(
                            addr.replace('.',','), DATA_PORT>>8, DATA_PORT%256))
                        dataclient, data_addr = datasocket.accept()
                        print("新的FTP数据连接来自: %s:%s" %(data_addr[0], data_addr[1]))
                    elif command == "LIST" or command == "NLST":
                        if not payload.startswith("-"):
                            place = path
                        else: 
                            place = cwd
                        try:
                            send_list_data(place, dataclient, command == "LIST" or payload == "-l")
                            cl.sendall("150 Here comes the directory listing.\r\n")
                            cl.sendall("226 Listed.\r\n")
                        except:
                            cl.sendall(msg_550_fail)
                        if dataclient is not None:
                            dataclient.close()
                            dataclient = None
                    elif command == "RETR":
                        try:
                            send_file_data(path, dataclient)
                            cl.sendall("150 Opening data connection.\r\n")
                            cl.sendall("226 Transfer complete.\r\n")
                        except:
                            cl.sendall(msg_550_fail)
                        if dataclient is not None:
                            dataclient.close()
                            dataclient = None
                    elif command == "STOR":
                        try:
                            cl.sendall("150 Ok to send data.\r\n")
                            save_file_data(path, dataclient, "wb")
                            cl.sendall("226 Transfer complete.\r\n")
                        except:
                            cl.sendall(msg_550_fail)
                        if dataclient is not None:
                            dataclient.close()
                            dataclient = None
                    elif command == "APPE":
                        try:
                            cl.sendall("150 Ok to send data.\r\n")
                            save_file_data(path, dataclient, "a")
                            cl.sendall("226 Transfer complete.\r\n")
                        except:
                            cl.sendall(msg_550_fail)
                        if dataclient is not None:
                            dataclient.close()
                            dataclient = None
                    elif command == "DELE":
                        try:
                            uos.remove(path)
                            cl.sendall(msg_250_OK)
                        except:
                            cl.sendall(msg_550_fail)
                    elif command == "RMD":
                        try:
                            uos.rmdir(path)
                            cl.sendall(msg_250_OK)
                        except:
                            cl.sendall(msg_550_fail)
                    elif command == "MKD":
                        try:
                            uos.mkdir(path)
                            cl.sendall(msg_250_OK)
                        except:
                            cl.sendall(msg_550_fail)
                    elif command == "RNFR":
                            fromname = path
                            cl.sendall("350 Rename from\r\n")
                    elif command == "RNTO":
                            if fromname is not None: 
                                try:
                                    uos.rename(fromname, path)
                                    cl.sendall(msg_250_OK)
                                except:
                                    cl.sendall(msg_550_fail)
                            else:
                                cl.sendall(msg_550_fail)
                            fromname = None
                    else:
                        cl.sendall("502 Unsupported command.\r\n")
                        # print("Unsupported command {} with payload {}".format(command, payload))
            except Exception as err:
                print(err)  

            finally:          
                cl.close()
                cl = None
    finally:
        datasocket.close()
        ftpsocket.close()
        if dataclient is not None:
            dataclient.close()


if __name__ == "__main__":
    print("run in main")
    w5x00_init() # 初始化网络
    ftpserver()  # 运行 FTP Server

 

5.4 效果展示

启动服务

 

建立连接

 

上传文件

 

 

删除文件 

 

下载文件

 

5.5 总结

FTP 的实现使用 MicroPython 的原因是为了简化存储的访问。CircuitPython 对存储访问的管理较为严格,实现文件存储比较复杂。

 

 

5.6 源码

W5500-EVB-Pico-ftp.zip (3.65 KB, 下载次数: 2)

 

 

5.7 视频演示


 

最新回复

CircuitPython 对存储访问的管理较为严格,实现文件存储能看出来还是比较复杂的   详情 回复 发表于 2024-2-24 08:08
点赞 关注
 
 

回复
举报

6809

帖子

0

TA的资源

五彩晶圆(高级)

沙发
 

CircuitPython 对存储访问的管理较为严格,实现文件存储能看出来还是比较复杂的

 
 
 

回复
您需要登录后才可以回帖 登录 | 注册

查找数据手册?

EEWorld Datasheet 技术支持

相关文章 更多>>
关闭
站长推荐上一条 1/8 下一条

 
EEWorld订阅号

 
EEWorld服务号

 
汽车开发圈

About Us 关于我们 客户服务 联系方式 器件索引 网站地图 最新更新 手机版

站点相关: 国产芯 安防电子 汽车电子 手机便携 工业控制 家用电子 医疗电子 测试测量 网络通信 物联网

北京市海淀区中关村大街18号B座15层1530室 电话:(010)82350740 邮编:100190

电子工程世界版权所有 京B2-20211791 京ICP备10001474号-1 电信业务审批[2006]字第258号函 京公网安备 11010802033920号 Copyright © 2005-2025 EEWORLD.com.cn, Inc. All rights reserved
快速回复 返回顶部 返回列表