1171|2

501

帖子

4

TA的资源

纯净的硅(高级)

楼主
 

【得捷电子Follow me第1期】+3.同步网络时间 [复制链接]

  本帖最后由 qinyunti 于 2023-4-9 21:11 编辑

转自公众号,欢迎关注

Raspberry Pi Pico w网络编程-wifi连接,性能测试,http服务,网络时间同步 (qq.com)

73a03124076d5c34dd8a4083fba1ef3b

 

一.准备

参考Connecting-to-the-internet-with-pico-w.pdf

 

net库参考

https://docs.micropython.org/en/latest/library/network.WLAN.html

 

二.连接网络

 

代码如下net1.py

import time
import network

ssid = 'qiqiqiqi'
password = 'cqmygysdss'

wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect(ssid, password)

# Wait for connect or fail
max_wait = 10
while max_wait > 0:
    if wlan.status() < 0 or wlan.status() >= 3:
        break
    max_wait -= 1
    print('waiting for connection...')
    time.sleep(1)

# Handle connection error
if wlan.status() != 3:
    raise RuntimeError('network connection failed')
else:
    print('connected')
    status = wlan.ifconfig()
    print( 'ip = ' + status[0] )
    
import ntptime
def sync_ntp():
     ntptime.NTP_DELTA = 3155644800   # 可选 UTC+8偏移时间(秒),不设置就是UTC0
     ntptime.host = 'ntp1.aliyun.com'  # 可选,ntp服务器,默认是"pool.ntp.org"
     ntptime.settime()   # 修改设备时间,到这就已经设置好了

sync_ntp()
# 已经设置好了,随便用
from machine import RTC
rtc = RTC()
print(rtc.datetime())


 

运行可以看到连接到WIFI并打印了IP信息

 

 

三.网页控制LED

代码如下net2.py

import network
import socket
import time

from machine import Pin
import uasyncio as asyncio

led = Pin("LED", Pin.OUT)
onboard = Pin("LED", Pin.OUT, value=0)

ssid = 'qiqiqiqi'
password = 'cqmygysdss'

html = """<!DOCTYPE html>
<html>
    <head> <title>Pico W</title> </head>
    <body> <h1>Pico W</h1>
        <p>%s</p>
    </body>
</html>
"""

wlan = network.WLAN(network.STA_IF)

def connect_to_network():
    wlan.active(True)
    wlan.config(pm = 0xa11140) # Disable power-save mode
    wlan.connect(ssid, password)

    max_wait = 10
    while max_wait > 0:
        if wlan.status() < 0 or wlan.status() >= 3:
            break
        max_wait -= 1
        print('waiting for connection...')
        time.sleep(1)

    if wlan.status() != 3:
        raise RuntimeError('network connection failed')
    else:
        print('connected')
        status = wlan.ifconfig()
        print('ip = ' + status[0])

async def serve_client(reader, writer):
    print("Client connected")
    request_line = await reader.readline()
    print("Request:", request_line)
    # We are not interested in HTTP request headers, skip them
    while await reader.readline() != b"\r\n":
        pass

    request = str(request_line)
    led_on = request.find('/light/on')
    led_off = request.find('/light/off')
    print( 'led on = ' + str(led_on))
    print( 'led off = ' + str(led_off))

    stateis = ""
    if led_on == 6:
        print("led on")
        led.value(1)
        stateis = "LED is ON"

    if led_off == 6:
        print("led off")
        led.value(0)
        stateis = "LED is OFF"

    response = html % stateis
    writer.write('HTTP/1.0 200 OK\r\nContent-type: text/html\r\n\r\n')
    writer.write(response)

    await writer.drain()
    await writer.wait_closed()
    print("Client disconnected")

async def main():
    print('Connecting to Network...')
    connect_to_network()

    print('Setting up webserver...')
    asyncio.create_task(asyncio.start_server(serve_client, "0.0.0.0", 80))
    while True:
        #onboard.on()
        print("heartbeat")
        await asyncio.sleep(0.25)
        #onboard.off()
        await asyncio.sleep(5)

try:
    asyncio.run(main())
finally:
    asyncio.new_event_loop()

 

网页输入http://192.168.31.109/light/off看到LED灭

 

输入http://192.168.31.109/light/on,LED亮

 

 

 

四.性能测试

4.1准备

Pc端安装iperf3

https://iperf.fr/iperf-download.php下下载

 

iperf-3.1.3-win64.zip

解压

 

 

开发板命令行中输入以下指令安装iperf3

import network

wlan = network.WLAN(network.STA_IF)

wlan.active(True)

wlan.connect('qiqiqiqi', 'cqmygysdss')

wlan.status() # 如果打印3说明连接成功



import upip

upip.install("uiperf3")

 

其中

'qiqiqiqi', 'cqmygysdss'为WIF的用户名和密码

192.168.31.236为PC的IP地址

4.2PC端运行

.\iperf3.exe -s -i 2

4.3开发板端运行

net3.py

import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect('qiqiqiqi', 'cqmygysdss')
wlan.status()  #如果打印3说明连接成功
import uiperf3
uiperf3.client('192.168.31.236')

 

可以看到速度可达600KB/S左右

 

五.网络时间

 

5.1安装ntptime

import network

wlan = network.WLAN(network.STA_IF)

wlan.active(True)

wlan.connect('qiqiqiqi', 'cqmygysdss')

wlan.status() #如果打印3说明连接成功



import upip

upip.install("ntptime")

或者从

链接已隐藏,如需查看请登录或者注册

"""
Pure Python, iperf3-compatible network performance test tool.

MIT license; Copyright (c) 2018-2019 Damien P. George

Supported modes: server & client, TCP & UDP, normal & reverse

Usage:
    import uiperf3
    uiperf3.server()
    uiperf3.client('192.168.1.5')
    uiperf3.client('192.168.1.5', udp=True, reverse=True)
"""

import sys, os, struct
import time, select, socket
import json

DEBUG = False

# iperf3 cookie size, last byte is null byte
COOKIE_SIZE = 37

# iperf3 commands
TEST_START = 1
TEST_RUNNING = 2
TEST_END = 4
PARAM_EXCHANGE = 9
CREATE_STREAMS = 10
EXCHANGE_RESULTS = 13
DISPLAY_RESULTS = 14
IPERF_DONE = 16

if DEBUG:
    cmd_string = {
        TEST_START: 'TEST_START',
        TEST_RUNNING: 'TEST_RUNNING',
        TEST_END: 'TEST_END',
        PARAM_EXCHANGE: 'PARAM_EXCHANGE',
        CREATE_STREAMS: 'CREATE_STREAMS',
        EXCHANGE_RESULTS: 'EXCHANGE_RESULTS',
        DISPLAY_RESULTS: 'DISPLAY_RESULTS',
        IPERF_DONE: 'IPERF_DONE',
    }

def fmt_size(val, div):
    for mult in ('', 'K', 'M', 'G'):
        if val < 10:
            return '% 5.2f %s' % (val, mult)
        elif val < 100:
            return '% 5.1f %s' % (val, mult)
        elif mult == 'G' or val < 1000:
            return '% 5.0f %s' % (val, mult)
        else:
            val /= div

class Stats:
    def __init__(self, param):
        self.pacing_timer_us = param['pacing_timer'] * 1000
        self.udp = param.get('udp', False)
        self.reverse = param.get('reverse', False)
        self.running = False

    def start(self):
        self.running = True
        self.t0 = self.t1 = ticks_us()
        self.nb0 = self.nb1 = 0 # num bytes
        self.np0 = self.np1 = 0 # num packets
        self.nm0 = self.nm1 = 0 # num lost packets
        if self.udp:
            if self.reverse:
                extra = '         Jitter    Lost/Total Datagrams'
            else:
                extra = '         Total Datagrams'
        else:
            extra = ''
        print('Interval           Transfer     Bitrate' + extra)

    def max_dt_ms(self):
        if not self.running:
            return -1
        return max(0, (self.pacing_timer_us - ticks_diff(ticks_us(), self.t1)) // 1000)

    def add_bytes(self, n):
        if not self.running:
            return
        self.nb0 += n
        self.nb1 += n
        self.np0 += 1
        self.np1 += 1

    def add_lost_packets(self, n):
        self.np0 += n
        self.np1 += n
        self.nm0 += n
        self.nm1 += n

    def print_line(self, ta, tb, nb, np, nm, extra=''):
        dt = tb - ta
        print(' %5.2f-%-5.2f  sec %sBytes %sbits/sec' % (ta, tb, fmt_size(nb, 1024), fmt_size(nb * 8 / dt, 1000)), end='')
        if self.udp:
            if self.reverse:
                print(' %6.3f ms  %u/%u (%.1f%%)' % (0, nm, np, 100 * nm / (max(1, np + nm))), end='')
            else:
                print('  %u' % np, end='')
        print(extra)

    def update(self, final=False):
        if not self.running:
            return
        t2 = ticks_us()
        dt = ticks_diff(t2, self.t1)
        if final or dt > self.pacing_timer_us:
            ta = ticks_diff(self.t1, self.t0) * 1e-6
            tb = ticks_diff(t2, self.t0) * 1e-6
            self.print_line(ta, tb, self.nb1, self.np1, self.nm1)
            self.t1 = t2
            self.nb1 = 0
            self.np1 = 0
            self.nm1 = 0

    def stop(self):
        self.update(True)
        self.running = False
        self.t3 = ticks_us()
        dt = ticks_diff(self.t3, self.t0)
        print('- ' * 30)
        self.print_line(0, dt * 1e-6, self.nb0, self.np0, self.nm0, '  sender')

    def report_receiver(self, stats):
        st = stats['streams'][0]
        dt = st['end_time'] - st['start_time']
        self.print_line(st['start_time'], st['end_time'], st['bytes'], st['packets'], st['errors'], '  receiver')
        return

def recvn(s, n):
    data = b''
    while len(data) < n:
        data += s.recv(n - len(data))
    return data

def recvinto(s, buf):
    if hasattr(s, 'readinto'):
        return s.readinto(buf)
    else:
        return s.recv_into(buf)

def recvninto(s, buf):
    if hasattr(s, 'readinto'):
        n = s.readinto(buf)
        assert n == len(buf)
    else:
        mv = memoryview(buf)
        off = 0
        while off < len(buf):
            off += s.recv_into(mv[off:])

def make_cookie():
    cookie_chars = b'abcdefghijklmnopqrstuvwxyz234567'
    cookie = bytearray(COOKIE_SIZE)
    for i, x in enumerate(os.urandom(COOKIE_SIZE - 1)):
        cookie[i] = cookie_chars[x & 31]
    return cookie

def server_once():
    # Listen for a connection
    ai = socket.getaddrinfo('0.0.0.0', 5201)
    ai = ai[0]
    print('Server listening on', ai[-1])
    s_listen = socket.socket(ai[0], socket.SOCK_STREAM)
    s_listen.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s_listen.bind(ai[-1])
    s_listen.listen(1)
    s_ctrl, addr = s_listen.accept()

    # Read client's cookie
    cookie = recvn(s_ctrl, COOKIE_SIZE)
    if DEBUG:
        print(cookie)

    # Ask for parameters
    s_ctrl.sendall(bytes([PARAM_EXCHANGE]))

    # Get parameters
    n = struct.unpack('>I', recvn(s_ctrl, 4))[0]
    param = recvn(s_ctrl, n)
    param = json.loads(str(param, 'ascii'))
    if DEBUG:
        print(param)
    reverse = param.get('reverse', False)

    # Ask to create streams
    s_ctrl.sendall(bytes([CREATE_STREAMS]))

    if param.get('tcp', False):
        # Accept stream
        s_data, addr = s_listen.accept()
        print('Accepted connection:', addr)
        recvn(s_data, COOKIE_SIZE)
    elif param.get('udp', False):
        # Close TCP connection and open UDP "connection"
        s_listen.close()
        s_data = socket.socket(ai[0], socket.SOCK_DGRAM)
        s_data.bind(ai[-1])
        data, addr = s_data.recvfrom(4)
        s_data.sendto(b'\x12\x34\x56\x78', addr)
    else:
        assert False

    # Start test
    s_ctrl.sendall(bytes([TEST_START]))

    # Run test
    s_ctrl.sendall(bytes([TEST_RUNNING]))

    # Read data, and wait for client to send TEST_END
    poll = select.poll()
    poll.register(s_ctrl, select.POLLIN)
    if reverse:
        poll.register(s_data, select.POLLOUT)
    else:
        poll.register(s_data, select.POLLIN)
    stats = Stats(param)
    stats.start()
    running = True
    data_buf = bytearray(os.urandom(param['len']))
    while running:
        for pollable in poll.poll(stats.max_dt_ms()):
            if pollable_is_sock(pollable, s_ctrl):
                cmd = recvn(s_ctrl, 1)[0]
                if DEBUG:
                    print(cmd_string.get(cmd, 'UNKNOWN_COMMAND'))
                if cmd == TEST_END:
                    running = False
            elif pollable_is_sock(pollable, s_data):
                if reverse:
                    n = s_data.send(data_buf)
                    stats.add_bytes(n)
                else:
                    recvninto(s_data, data_buf)
                    stats.add_bytes(len(data_buf))
        stats.update()

    # Need to continue writing so other side doesn't get blocked waiting for data
    if reverse:
        while True:
            for pollable in poll.poll(0):
                if pollable_is_sock(pollable, s_data):
                    s_data.send(data_buf)
                    break
            else:
                break

    stats.stop()

    # Ask to exchange results
    s_ctrl.sendall(bytes([EXCHANGE_RESULTS]))

    # Get client results
    n = struct.unpack('>I', recvn(s_ctrl, 4))[0]
    results = recvn(s_ctrl, n)
    results = json.loads(str(results, 'ascii'))
    if DEBUG:
        print(results)

    # Send our results
    results = { 
        'cpu_util_total': 1,
        'cpu_util_user': 0.5,
        'cpu_util_system': 0.5,
        'sender_has_retransmits': 1,
        'congestion_used': 'cubic',
        'streams': [{
            'id': 1,
            'bytes': stats.nb0,
            'retransmits': 0,
            'jitter': 0,
            'errors': 0,
            'packets': stats.np0,
            'start_time': 0,
            'end_time': ticks_diff(stats.t3, stats.t0) * 1e-6
        }]
    }
    results = json.dumps(results)
    s_ctrl.sendall(struct.pack('>I', len(results))) 
    s_ctrl.sendall(bytes(results, 'ascii'))

    # Ask to display results
    s_ctrl.sendall(bytes([DISPLAY_RESULTS]))

    # Wait for client to send IPERF_DONE
    cmd = recvn(s_ctrl, 1)[0]
    assert cmd == IPERF_DONE

    # Close all sockets
    s_data.close()
    s_ctrl.close()
    s_listen.close()

def server():
    while True:
        server_once()

def client(host, udp=False, reverse=False, bandwidth=10*1024*1024):
    print('CLIENT MODE:', 'UDP' if udp else 'TCP', 'receiving' if reverse else 'sending')

    param = {
        'client_version': '3.6',
        'omit': 0,
        'parallel': 1,
        'pacing_timer':1000,
        'time': 10,
    }

    if udp:
        param['udp'] = True
        param['len'] = 1500 - 42
        param['bandwidth'] = bandwidth # this should be should be intended bits per second
        udp_interval = 1000000 * 8 * param['len'] // param['bandwidth']
    else:
        param['tcp'] = True
        param['len'] = 3000

    if reverse:
        param['reverse'] = True

    # Connect to server
    ai = socket.getaddrinfo(host, 5201)[0]
    print('Connecting to', ai[-1])
    s_ctrl = socket.socket(ai[0], socket.SOCK_STREAM)
    s_ctrl.connect(ai[-1])

    # Send our cookie
    cookie = make_cookie()
    if DEBUG:
        print(cookie)
    s_ctrl.sendall(cookie)

    # Object to gather statistics about the run
    stats = Stats(param)

    # Run the main loop, waiting for incoming commands and dat
    ticks_us_end = param['time'] * 1000000
    poll = select.poll()
    poll.register(s_ctrl, select.POLLIN)
    s_data = None
    start = None
    udp_packet_id = 0
    while True:
        for pollable in poll.poll(stats.max_dt_ms()):
            if pollable_is_sock(pollable, s_data):
                # Data socket is writable/readable
                t = ticks_us()
                if ticks_diff(t, start) > ticks_us_end:
                    if reverse:
                        # Continue to drain any incoming data
                        recvinto(s_data, buf)
                    if stats.running:
                        # End of run
                        s_ctrl.sendall(bytes([TEST_END]))
                        stats.stop()
                else:
                    # Send/receiver data
                    if udp:
                        if reverse:
                            recvninto(s_data, buf)
                            udp_in_sec, udp_in_usec, udp_in_id = struct.unpack_from('>III', buf, 0)
                            #print(udp_in_sec, udp_in_usec, udp_in_id)
                            if udp_in_id != udp_packet_id + 1:
                                stats.add_lost_packets(udp_in_id - (udp_packet_id + 1))
                            udp_packet_id = udp_in_id
                            stats.add_bytes(len(buf))
                        else:
                            #print('UDP send', udp_last_send, t, udp_interval)
                            if t - udp_last_send > udp_interval:
                                udp_last_send += udp_interval
                                udp_packet_id += 1
                                struct.pack_into('>III', buf, 0, t // 1000000, t % 1000000, udp_packet_id)
                                n = s_data.sendto(buf, ai[-1])
                                stats.add_bytes(n)
                    else:
                        if reverse:
                            recvninto(s_data, buf)
                            n = len(buf)
                        else:
                            #print('TCP send', len(buf))
                            n = s_data.send(buf)
                        stats.add_bytes(n)

            elif pollable_is_sock(pollable, s_ctrl):
                # Receive command
                cmd = recvn(s_ctrl, 1)[0]
                if DEBUG:
                    print(cmd_string.get(cmd, 'UNKNOWN_COMMAND'))
                if cmd == TEST_START:
                    if reverse:
                        # Start receiving data now, because data socket is open
                        poll.register(s_data, select.POLLIN)
                        start = ticks_us()
                        stats.start()
                elif cmd == TEST_RUNNING:
                    if not reverse:
                        # Start sending data now
                        poll.register(s_data, select.POLLOUT)
                        start = ticks_us()
                        if udp:
                            udp_last_send = start - udp_interval
                        stats.start()
                elif cmd == PARAM_EXCHANGE:
                    param_j = json.dumps(param)
                    s_ctrl.sendall(struct.pack('>I', len(param_j))) 
                    s_ctrl.sendall(bytes(param_j, 'ascii'))
                elif cmd == CREATE_STREAMS:
                    if udp:
                        s_data = socket.socket(ai[0], socket.SOCK_DGRAM)
                        s_data.sendto(struct.pack('<I', 123456789), ai[-1])
                        recvn(s_data, 4) # get dummy response from server (=987654321)
                    else:
                        s_data = socket.socket(ai[0], socket.SOCK_STREAM)
                        s_data.connect(ai[-1])
                        s_data.sendall(cookie)
                    buf = bytearray(os.urandom(param['len']))
                elif cmd == EXCHANGE_RESULTS:
                    # Close data socket now that server knows we are finished, to prevent it flooding us
                    poll.unregister(s_data)
                    s_data.close()
                    s_data = None

                    results = {
                        'cpu_util_total': 1,
                        'cpu_util_user': 0.5,
                        'cpu_util_system': 0.5,
                        'sender_has_retransmits': 1,
                        'congestion_used': 'cubic',
                        'streams': [{
                            'id': 1,
                            'bytes': stats.nb0,
                            'retransmits': 0,
                            'jitter': 0,
                            'errors': stats.nm0,
                            'packets': stats.np0,
                            'start_time': 0,
                            'end_time': ticks_diff(stats.t3, stats.t0) * 1e-6
                        }]
                    }
                    results = json.dumps(results)
                    s_ctrl.sendall(struct.pack('>I', len(results))) 
                    s_ctrl.sendall(bytes(results, 'ascii'))

                    n = struct.unpack('>I', recvn(s_ctrl, 4))[0]
                    results = recvn(s_ctrl, n)
                    results = json.loads(str(results, 'ascii'))
                    stats.report_receiver(results)

                elif cmd == DISPLAY_RESULTS:
                    s_ctrl.sendall(bytes([IPERF_DONE]))
                    s_ctrl.close()
                    time.sleep(1) # delay so server is ready for any subsequent client connections
                    return

        stats.update()

def main():
    opt_mode = None
    opt_udp = False
    opt_reverse = False

    sys.argv.pop(0)
    while sys.argv:
        opt = sys.argv.pop(0)
        if opt == '-R':
            opt_reverse = True
        elif opt == '-u':
            opt_udp = True
        elif opt == '-s':
            opt_mode = opt
        elif opt == '-c':
            opt_mode = opt
            opt_host = sys.argv.pop(0)
        else:
            print('unknown option:', opt)
            raise SystemExit(1)

    if opt_mode == '-s':
        server()
    else:
        client(opt_host, opt_udp, opt_reverse)

if sys.platform == 'linux':
    def pollable_is_sock(pollable, sock):
        return sock is not None and pollable[0] == sock.fileno()
    def ticks_us():
        return int(time.time() * 1e6)
    def ticks_diff(a, b):
        return a - b
    if __name__ == '__main__':
        main()
else:
    def pollable_is_sock(pollable, sock):
        return pollable[0] == sock
    from time import ticks_us, ticks_diff

 

保存到开发板中。


5.2同步时间

编写代码net4.py如下

import network
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.connect('qiqiqiqi', 'cqmygysdss')
wlan.status()  #如果打印3说明连接成功


import ntptime
def sync_ntp():
     ntptime.NTP_DELTA = 3155644800   # 可选 UTC+8偏移时间(秒),不设置就是UTC0
     ntptime.host = 'ntp1.aliyun.com'  # 可选,ntp服务器,默认是"pool.ntp.org"
     ntptime.settime()   # 修改设备时间,到这就已经设置好了

sync_ntp()
# 已经设置好了,随便用
from machine import RTC
rtc = RTC()
print(rtc.datetime())

 

运行结果如下

 

 

 

 

最新回复

很给力   详情 回复 发表于 2023-6-4 12:43
点赞 关注
 
 

回复
举报

6841

帖子

11

TA的资源

版主

沙发
 
这帖发得,开挂似的呀,感谢分享!
 
 
 

回复

158

帖子

0

TA的资源

一粒金砂(中级)

板凳
 

很给力

 
 
 

回复
您需要登录后才可以回帖 登录 | 注册

随便看看
查找数据手册?

EEWorld Datasheet 技术支持

相关文章 更多>>
关闭
站长推荐上一条 1/10 下一条

 
EEWorld订阅号

 
EEWorld服务号

 
汽车开发圈

About Us 关于我们 客户服务 联系方式 器件索引 网站地图 最新更新 手机版

站点相关: 国产芯 安防电子 汽车电子 手机便携 工业控制 家用电子 医疗电子 测试测量 网络通信 物联网

北京市海淀区中关村大街18号B座15层1530室 电话:(010)82350740 邮编:100190

电子工程世界版权所有 京B2-20211791 京ICP备10001474号-1 电信业务审批[2006]字第258号函 京公网安备 11010802033920号 Copyright © 2005-2024 EEWORLD.com.cn, Inc. All rights reserved
快速回复 返回顶部 返回列表