-
-
-
- import time
- import serial
- import threading
- import urllib3
- import json
- import time
-
- url = "http://192.168.3.192:9000/man"
- http = urllib3.PoolManager()
-
- STRGLO = ""
-
- cmd_read_phone = bytes([0x5a, 0xa5, 0x04, 0x83, 0x10, 0x60, 0x13])
- cmd_read_ID_number = bytes([0x5a, 0xa5, 0x04, 0x83, 0x10, 0x20, 0x18])
- cmd_write_to_enter_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x10])
- cmd_write_to_main_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x00])
- cmd_write_to_waite_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x12])
- cmd_write_to_noinfor_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x13])
- cmd_write_to_ERROR_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x14])
- send_list = [0x5a, 0xa5, 0x00, 0x00, 0x00, 0x00]
-
- all_range = None
-
- dict_in_data = {
- 'tube_number': '',
- 'name': '',
- 'phone': '',
- 'ID_card': '',
- 'company': '',
- 'test_date': ''
- }
-
- class sampleNet:
- def post(self, dict_info):
- try:
- encoded_data = json.dumps(dict_info).encode("gbk")
- resp = http.request(
- "POST",
- url,
- body=encoded_data,
- headers={
- 'x-env-code': 'mafutian',
- 'content-type': 'application/json;charset=gbk'
- }
- )
- return resp
- except Exception as e:
- print("post ERR:" + str(e))
-
- def phone_get(phone):
- net_post = sampleNet()
- data = {"command": "getPersonnel", "phone": phone}
- try:
- resp = net_post.post(data)
- if resp.status == 200:
- data = json.loads(resp.data)
- print(data)
- return data
- else:
- return None
- except Exception as e:
- print(e)
-
- def ID_get(phone):
- net_post = sampleNet()
- data = {"command": "getPersonnel", "ID_card": phone}
- try:
- resp = net_post.post(data)
- if resp.status == 200:
- data = json.loads(resp.data)
- print(data)
- return data
- else:
- return None
- except Exception as e:
- print(e)
-
-
- def save_data():
- net_post = sampleNet()
- data = {"command": "appendPersonnel",
- "name": dict_in_data['name'],
- "phone": dict_in_data['phone'],
- "ID_card": dict_in_data['ID_card'],
- "uint": dict_in_data['company']}
- try:
- resp = net_post.post(data)
- if resp.status == 200:
- data = json.loads(resp.data)
- print(data)
- return data
- else:
- return None
- except Exception as e:
- print(e)
-
- def creat_gbk_list(mydata, my_commad, H_address, L_address):
-
- send_list = [0x5a, 0xa5, 0x00, 0x00, 0x00, 0x00]
- send_len = 1 + 2 + 2 + len(mydata) * 2
- send_list[2] = send_len
-
- send_list[3] = my_commad
- send_list[4] = H_address
- send_list[5] = L_address
- mysend_list = bytes(send_list) + mydata.encode("gbk") + b'\xFF\xFF'
- print(mysend_list)
- DWritePort(ser, mysend_list)
-
-
- def ReadData(ser):
- global all_range
- while BOOL:
- if ser.in_waiting:
- try:
-
- STRGLO = ser.read(ser.in_waiting)
- print("收到数据")
- print("长度为:" + str(len(STRGLO)))
- if STRGLO[0] == 0x5a and STRGLO[1] == 0xa5:
-
- addres = STRGLO[4] << 8 | STRGLO[5]
- commd = STRGLO[3]
- recv_len = STRGLO[2]
- print("地址为:" + hex(addres) + " 长度为:" + str(recv_len) + " 命令为:" + hex(commd))
- if recv_len == 0x06 and commd == 0x83:
- if addres == 0x1080:
- print("按手机号码查询")
-
-
- try:
- data = phone_get(dict_in_data['phone'] )
- if data is not None:
- if data['code'] == 1000:
-
- creat_gbk_list(data['ID_card'], 0x82, 0x10, 0x20)
-
- creat_gbk_list(data['uint'], 0x82, 0x11, 0x40)
-
- creat_gbk_list(data['name'], 0x82, 0x11, 0x60)
-
- creat_gbk_list(data['uint'], 0x82, 0x11, 0x40)
-
- creat_gbk_list(data['name'], 0x82, 0x11, 0x60)
- DWritePort(ser, cmd_write_to_enter_page)
-
- except Exception as e:
- print("出错啦:" + str(e))
- elif addres == 0x1090:
- print("按身份证查询")
- try:
- data = ID_get(dict_in_data['ID_card'])
- if data is not None:
- if data['code'] == 1000:
-
-
-
- creat_gbk_list(data['phone'], 0x82, 0x10, 0x60)
-
-
- creat_gbk_list(data['uint'], 0x82, 0x11, 0x40)
-
-
- creat_gbk_list(data['name'], 0x82, 0x11, 0x60)
-
- creat_gbk_list(data['uint'], 0x82, 0x11, 0x40)
-
-
- creat_gbk_list(data['name'], 0x82, 0x11, 0x60)
- DWritePort(ser, cmd_write_to_enter_page)
- except Exception as e:
- print("出错啦:" + str(e))
- elif addres == 0x1210:
- print("保存数据")
- if dict_in_data['name'] == "" or \
- dict_in_data['company'] == "" or\
- dict_in_data['phone'] == "" or\
- dict_in_data['ID_card'] == "":
- print("data ERROR")
- print("数据中有空字段")
- else:
- data = save_data()
- if data is not None:
- if data['code'] == 1000:
- DWritePort(ser, cmd_write_to_enter_page)
-
- elif commd == 0x83 and recv_len == 18 and addres == 0x1060:
- print("接收到手机号码")
- print(STRGLO)
- recv_phone_len = STRGLO[6]
- if (recv_phone_len == 0x07):
- str_ph = STRGLO[7:18]
- dict_in_data['phone'] = str_ph.decode()
- print(dict_in_data['phone'])
- else:
- print("接收到的电话号码长度不对 长度为:" + str((recv_phone_len - 8)))
-
- elif commd == 0x83 and recv_len == 24 and addres == 0x1020:
- print("接收到身份证号码")
- recv_ID_len = STRGLO[6]
- print(STRGLO)
- print(recv_ID_len)
- if (recv_ID_len == 10):
- str_ID = STRGLO[7:25]
- dict_in_data['ID_card'] = str_ID.decode()
- else:
- print("接收到的电话号码长度不对 长度为:" + str((recv_phone_len - 8)))
-
- elif commd == 0x83 and addres == 0x1160:
-
- recv_name_len = STRGLO[6] - 1
- if 0 < recv_name_len < 5:
- dict_in_data['name'] = (STRGLO[7:(7 + recv_name_len * 2)]).decode("gbk")
- else:
- print("姓名的长度不符合")
- creat_gbk_list("姓名的长度不符合!", 0x82, 0x18, 0x00)
- print(dict_in_data['name'])
- elif commd == 0x83 and addres == 0x1140:
-
- recv_company_len = STRGLO[6] - 1
- if 1 < recv_company_len < 6:
- dict_in_data['company'] = (STRGLO[7:(7 + recv_company_len * 2)]).decode("gbk")
- else:
- creat_gbk_list("单位的长度不符合!", 0x82, 0x18, 0x00)
- print("单位的长度不符合")
- print(dict_in_data['company'])
- else:
- print("非法数据")
- except Exception as e:
- print("错误:" + str(e))
-
-
-
-
-
-
- def DOpenPort(portx, bps, timeout):
- ret = False
- try:
-
- ser = serial.Serial(portx, bps, timeout=timeout)
-
- if (ser.is_open):
- ret = True
- threading.Thread(target=ReadData, args=(ser,)).start()
- except Exception as e:
- print("---异常---:", e)
- return ser, ret
-
-
-
- def DColsePort(ser):
- global BOOL
- BOOL = False
- ser.close()
-
-
-
- def DWritePort(ser, text):
- result = ser.write(text)
- return result
-
-
-
- def DReadPort():
- global STRGLO
- str = STRGLO
- STRGLO = ""
- return str
-
-
- if __name__ == "__main__":
- ser, ret = DOpenPort("/dev/ttyUSB0", 115200, None)
- if (ret == True):
- print("打开串口成功1")
-
-
-
-