【国产RISC-V Linux板 昉·星光VisionFive试用报告】完成被采样人数据保存读取
[复制链接]
今天完成采样用户录入、以及以身份证号码查询:
main主程序更新如下:
# -*- coding: gbk -*-
#!/usr/bin/python
import time
import serial # 导入模块
import threading
import urllib3
import json
import time
url = "http://192.168.3.192:9000/man"
http = urllib3.PoolManager()
STRGLO = "" # 读取的数据
cmd_read_phone = bytes([0x5a, 0xa5, 0x04, 0x83, 0x10, 0x60, 0x13])
cmd_read_ID_number = bytes([0x5a, 0xa5, 0x04, 0x83, 0x10, 0x20, 0x18])
cmd_write_to_enter_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x10])
cmd_write_to_main_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x00])
cmd_write_to_waite_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x12])
cmd_write_to_noinfor_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x13])
cmd_write_to_ERROR_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x14])
send_list = [0x5a, 0xa5, 0x00, 0x00, 0x00, 0x00]
all_range = None
dict_in_data = {
'tube_number': '',
'name': '',
'phone': '',
'ID_card': '',
'company': '',
'test_date': ''
}
class sampleNet:
def post(self, dict_info):
try:
encoded_data = json.dumps(dict_info).encode("gbk")
resp = http.request(
"POST",
url,
body=encoded_data,
headers={
'x-env-code': 'mafutian',
'content-type': 'application/json;charset=gbk'
}
)
return resp
except Exception as e:
print("post ERR:" + str(e))
# 按手机号码查询
def phone_get(phone):
net_post = sampleNet()
data = {"command": "getPersonnel", "phone": phone}
try:
resp = net_post.post(data)
if resp.status == 200:
data = json.loads(resp.data)
print(data)
return data
else:
return None
except Exception as e:
print(e)
# 按身份证号码查询
def ID_get(phone):
net_post = sampleNet()
data = {"command": "getPersonnel", "ID_card": phone}
try:
resp = net_post.post(data)
if resp.status == 200:
data = json.loads(resp.data)
print(data)
return data
else:
return None
except Exception as e:
print(e)
# 保存数据
def save_data():
net_post = sampleNet()
data = {"command": "appendPersonnel",
"name": dict_in_data['name'],
"phone": dict_in_data['phone'],
"ID_card": dict_in_data['ID_card'],
"uint": dict_in_data['company']}
try:
resp = net_post.post(data)
if resp.status == 200:
data = json.loads(resp.data)
print(data)
return data
else:
return None
except Exception as e:
print(e)
# 姓名数据组装
def creat_gbk_list(mydata, my_commad, H_address, L_address):
# 发送的list
send_list = [0x5a, 0xa5, 0x00, 0x00, 0x00, 0x00]
send_len = 1 + 2 + 2 + len(mydata) * 2
send_list[2] = send_len
# 添加 命令
send_list[3] = my_commad
send_list[4] = H_address
send_list[5] = L_address
mysend_list = bytes(send_list) + mydata.encode("gbk") + b'\xFF\xFF'
print(mysend_list)
DWritePort(ser, mysend_list)
def ReadData(ser):
global all_range
while BOOL:
if ser.in_waiting:
try:
# STRGLO = ser.read(ser.in_waiting).decode("gbk")
STRGLO = ser.read(ser.in_waiting)
print("收到数据")
print("长度为:" + str(len(STRGLO)))
if STRGLO[0] == 0x5a and STRGLO[1] == 0xa5:
# 获取地址
addres = STRGLO[4] << 8 | STRGLO[5]
commd = STRGLO[3]
recv_len = STRGLO[2]
print("地址为:" + hex(addres) + " 长度为:" + str(recv_len) + " 命令为:" + hex(commd))
if recv_len == 0x06 and commd == 0x83:
if addres == 0x1080:
print("按手机号码查询")
# 读取手机号码数据
# 开始查找是否有手机号存在
try:
data = phone_get(dict_in_data['phone'] )
if data is not None:
if data['code'] == 1000:
# 获取身份证号码
creat_gbk_list(data['ID_card'], 0x82, 0x10, 0x20)
# 写职业
creat_gbk_list(data['uint'], 0x82, 0x11, 0x40)
# 写姓名
creat_gbk_list(data['name'], 0x82, 0x11, 0x60)
# 写职业
creat_gbk_list(data['uint'], 0x82, 0x11, 0x40)
# 写姓名
creat_gbk_list(data['name'], 0x82, 0x11, 0x60)
DWritePort(ser, cmd_write_to_enter_page)
except Exception as e:
print("出错啦:" + str(e))
elif addres == 0x1090:
print("按身份证查询")
try:
data = ID_get(dict_in_data['ID_card'])
if data is not None:
if data['code'] == 1000:
# 获取身份证号码
# 写性别
#creat_gbk_list(my_xb, 0x82, 0x11, 0x20)
creat_gbk_list(data['phone'], 0x82, 0x10, 0x60)
# 写职业
creat_gbk_list(data['uint'], 0x82, 0x11, 0x40)
# 写姓名
creat_gbk_list(data['name'], 0x82, 0x11, 0x60)
# 写职业
creat_gbk_list(data['uint'], 0x82, 0x11, 0x40)
# 页面转移到确认页面
# 写姓名
creat_gbk_list(data['name'], 0x82, 0x11, 0x60)
DWritePort(ser, cmd_write_to_enter_page)
except Exception as e:
print("出错啦:" + str(e))
elif addres == 0x1210:
print("保存数据")
if dict_in_data['name'] == "" or \
dict_in_data['company'] == "" or\
dict_in_data['phone'] == "" or\
dict_in_data['ID_card'] == "":
print("data ERROR")
print("数据中有空字段")
else:
data = save_data()
if data is not None:
if data['code'] == 1000:
DWritePort(ser, cmd_write_to_enter_page)
elif commd == 0x83 and recv_len == 18 and addres == 0x1060:
print("接收到手机号码")
print(STRGLO)
recv_phone_len = STRGLO[6]
if (recv_phone_len == 0x07):
str_ph = STRGLO[7:18]
dict_in_data['phone'] = str_ph.decode()
print(dict_in_data['phone'])
else:
print("接收到的电话号码长度不对 长度为:" + str((recv_phone_len - 8)))
# 查找手机号
elif commd == 0x83 and recv_len == 24 and addres == 0x1020:
print("接收到身份证号码")
recv_ID_len = STRGLO[6]
print(STRGLO)
print(recv_ID_len)
if (recv_ID_len == 10):
str_ID = STRGLO[7:25]
dict_in_data['ID_card'] = str_ID.decode()
else:
print("接收到的电话号码长度不对 长度为:" + str((recv_phone_len - 8)))
# 查找手机号码
elif commd == 0x83 and addres == 0x1160:
# 接收到姓名
recv_name_len = STRGLO[6] - 1
if 0 < recv_name_len < 5:
dict_in_data['name'] = (STRGLO[7:(7 + recv_name_len * 2)]).decode("gbk")
else:
print("姓名的长度不符合")
creat_gbk_list("姓名的长度不符合!", 0x82, 0x18, 0x00)
print(dict_in_data['name'])
elif commd == 0x83 and addres == 0x1140:
# 接收到姓名
recv_company_len = STRGLO[6] - 1
if 1 < recv_company_len < 6:
dict_in_data['company'] = (STRGLO[7:(7 + recv_company_len * 2)]).decode("gbk")
else:
creat_gbk_list("单位的长度不符合!", 0x82, 0x18, 0x00)
print("单位的长度不符合")
print(dict_in_data['company'])
else:
print("非法数据")
except Exception as e:
print("错误:" + str(e))
# 打开串口
# 端口,GNU / Linux上的/ dev / ttyUSB0 等 或 Windows上的 COM3 等
# 波特率,标准值之一:50,75,110,134,150,200,300,600,1200,1800,2400,4800,9600,19200,38400,57600,115200
# 超时设置,None:永远等待操作,0为立即返回请求结果,其他值为等待超时时间(单位为秒)
def DOpenPort(portx, bps, timeout):
ret = False
try:
# 打开串口,并得到串口对象
ser = serial.Serial(portx, bps, timeout=timeout)
# 判断是否打开成功
if (ser.is_open):
ret = True
threading.Thread(target=ReadData, args=(ser,)).start()
except Exception as e:
print("---异常---:", e)
return ser, ret
# 关闭串口
def DColsePort(ser):
global BOOL
BOOL = False
ser.close()
# 写数据
def DWritePort(ser, text):
result = ser.write(text) # 写数据
return result
# 读数据
def DReadPort():
global STRGLO
str = STRGLO
STRGLO = "" # 清空当次读取
return str
if __name__ == "__main__":
ser, ret = DOpenPort("/dev/ttyUSB0", 115200, None)
if (ret == True): # 判断串口是否成功打开
print("打开串口成功1")
# count=DWritePort(ser,"我是东小东,哈哈")
# print("写入字节数:",count)
# DReadPort() #读串口数据
# DColsePort(ser) #关闭串口
用户录端项目基本完成,还剩下错误数据处理与展示。
|