2147|2

7044

帖子

11

TA的资源

版主

楼主
 

【迪文串口屏】核酸采样登记系统之六 增加采样工作站 [复制链接]

 

【迪文串口屏】核酸采样登记系统之一 - 国产芯片交流 - 电子工程世界-论坛 (eeworld.com.cn)

【迪文串口屏】核酸采样登记系统之二:准备背景图片 - 国产芯片交流 - 电子工程世界-论坛 (eeworld.com.cn)

【迪文串口屏】核酸采样登记系统之三 连上迪文云 - 国产芯片交流 - 电子工程世界-论坛 (eeworld.com.cn)

【迪文串口屏】核酸采样登记系统之四 查询数据并显示 - 国产芯片交流 - 电子工程世界-论坛 (eeworld.com.cn)

【迪文串口屏】核酸采样登记系统之五 数据保存 - 国产芯片交流 - 电子工程世界-论坛 (eeworld.com.cn)

【前言】上个星期去单位试用了,没有正式投入,这个星期再进行改进,增加了采样工作站。

       采样站的主要功能就是被采样人登记好后,会在采样工作显示姓名,采样到第一个试管了,序号是多少,好核实信息等等。采样完后发送信号给服务器,服务清空数据,等待下一个采样人员进行登记。 

服务器代码已经更新可以同时操作两个串口:

import os
import sys

import time

import serial  # 导入模块
import threading
from win32com.client import Dispatch
import pythoncom

from excel import *

STRGLO = ""  # 读取的数据
BOOL = True  # 读取标志位
ser10 = None
ser7 = None
import win32com.client

cmd_read_phone = bytes([0x5a, 0xa5, 0x04, 0x83, 0x10, 0x60, 0x13])
cmd_read_ID_number = bytes([0x5a, 0xa5, 0x04, 0x83, 0x10, 0x20, 0x18])
cmd_write_to_enter_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x10])
cmd_write_to_main_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x00])
cmd_write_to_waite_page = bytes([0x5a, 0xa5, 0x07, 0x82, 0x00, 0x84, 0x5a, 0x01, 0x00, 0x12])
send_list = [0x5a, 0xa5, 0x00, 0x00, 0x00, 0x00]
all_range = None
"""
# 等待接收命令: wait_infor
# 查询状态     query_infor
# 确认信息:   confirmation_infor
# 保存信息:   save_infor
"""
work_state = 'wait_enter'
path = sys.path[0]
dict_in_data = {
    'tube_number': '',
    'name': '',
    'phone': '',
    'ID': '',
    'sex': '',
    'company': '',
    'test_date': ''
}
save_dict = None


# 姓名数据组装
def creat_gbk_list(mydata, my_commad, H_address, L_address, ser):
    # 发送的list
    send_list = [0x5a, 0xa5, 0x00, 0x00, 0x00, 0x00]
    send_len = 1 + 2 + 2 + len(mydata) * 2
    send_list[2] = send_len
    # 添加 命令
    send_list[3] = my_commad
    send_list[4] = H_address
    send_list[5] = L_address
    mysend_list = bytes(send_list) + mydata.encode("gbk") + b'\xFF\xFF'
    print(mysend_list)
    DWritePort(ser, mysend_list)
# 清空数据
def clear_show_data():
    dict_in_data['name'] = ''
    dict_in_data['sex'] = ''
    dict_in_data['company'] = ''
    dict_in_data['phone'] = ''
    dict_in_data['phone'] = ''
    dict_in_data['ID'] = ''
    creat_gbk_list(dict_in_data['sex'], 0x82, 0x11, 0x20, ser10)
    creat_gbk_list(dict_in_data['phone'], 0x82, 0x10, 0x60, ser10)
    creat_gbk_list(dict_in_data['company'], 0x82, 0x11, 0x40, ser10)
    creat_gbk_list(dict_in_data['name'], 0x82, 0x11, 0x60, ser10)
    creat_gbk_list(dict_in_data['company'], 0x82, 0x11, 0x40, ser10)
    creat_gbk_list(dict_in_data['name'], 0x82, 0x11, 0x60, ser10)
    creat_gbk_list(dict_in_data['ID'], 0x82, 0x10, 0x20, ser10)
    # 清理护士站的
    creat_gbk_list(dict_in_data['name'], 0x82, 0x20, 0x20, ser7)
def ReadData(ser):
    global all_range, work_state, dict_in_data, ser10,ser7
    while BOOL:
        if ser.in_waiting:
            # STRGLO = ser.read(ser.in_waiting).decode("gbk")
            STRGLO = ser.read(ser.in_waiting)
            print(STRGLO)
            print(ser)
            print("长度为:" + str(len(STRGLO)))
            if STRGLO[0] == 0x5a and STRGLO[1] == 0xa5:
                # 获取地址
                addres = STRGLO[4] << 8 | STRGLO[5]
                commd = STRGLO[3]
                recv_len = STRGLO[2]
                print("地址为:" + hex(addres) + "  长度为:" + str(recv_len) + " 命令为:" + hex(commd))
                try:
                    if recv_len == 0x06 and commd == 0x83:
                        if addres == 0x1080:
                            print("按手机号码查询")
                            # 读取手机号码数据
                            print("手机号码为:" + dict_in_data['phone'])
                            my_index = None
                            try:
                                for index, value in enumerate(all_range):
                                    xls_phone_code = str(int(value[2]))
                                    if phone_code == xls_phone_code:
                                        my_index = index
                                        break
                                if xls_phone_code == phone_code and (my_index is not None):
                                    dict_in_data['name'] = all_range[my_index][1]
                                    dict_in_data['sex'] = all_range[my_index][4]
                                    dict_in_data['company'] = all_range[my_index][5]
                                    dict_in_data['ID'] = all_range[my_index][3]
                                    creat_gbk_list(dict_in_data['sex'], 0x82, 0x11, 0x20, ser10)
                                    creat_gbk_list(dict_in_data['ID'], 0x82, 0x10, 0x20,  ser10)
                                    creat_gbk_list(dict_in_data['company'], 0x82, 0x11, 0x40, ser10)
                                    creat_gbk_list(dict_in_data['name'], 0x82, 0x11, 0x60, ser10)
                                    creat_gbk_list(dict_in_data['company'], 0x82, 0x11, 0x40, ser10)
                                    creat_gbk_list(dict_in_data['name'], 0x82, 0x11, 0x60, ser10)
                                    # 页面转移到确认页面 为什么要写两遍,BUG——
                                    DWritePort(ser10, cmd_write_to_enter_page)
                                    DWritePort(ser10, cmd_write_to_enter_page)
                            except Exception as e:
                                print("出错啦:" + str(e))
                        elif addres == 0x1090:
                            print("按身份证查询")
                            print("身份证号码为:" + dict_in_data['ID'])
                            try:
                                my_index = None
                                for index, value in enumerate(all_range):
                                    xls_ID_code = value[3]
                                    if dict_in_data['ID'] == xls_ID_code:
                                        my_index = index
                                        break
                                dict_in_data['name'] = all_range[my_index][1]
                                dict_in_data['sex'] = all_range[my_index][4]
                                dict_in_data['company'] = all_range[my_index][5]
                                dict_in_data['phone'] = str(int(all_range[my_index][2]))
                                creat_gbk_list(dict_in_data['sex'], 0x82, 0x11, 0x20, ser10)
                                creat_gbk_list(dict_in_data['phone'], 0x82, 0x10, 0x60, ser10)
                                creat_gbk_list(dict_in_data['company'], 0x82, 0x11, 0x40, ser10)
                                creat_gbk_list(dict_in_data['name'], 0x82, 0x11, 0x60, ser10)
                                creat_gbk_list(dict_in_data['company'], 0x82, 0x11, 0x40, ser10)
                                creat_gbk_list(dict_in_data['name'], 0x82, 0x11, 0x60, ser10)
                                # 页面转移到确认页面 为什么要写两遍,BUG——
                                # 页面转移到确认页面 为什么要写两遍,BUG——
                                DWritePort(ser10, cmd_write_to_enter_page)
                                DWritePort(ser10, cmd_write_to_enter_page)
                            except Exception as e:
                                print("出错啦:" + str(e))
                            # DWritePort(ser, cmd_read_ID_number)
                        elif addres == 0x1200:
                            # 增加时间,保存数据
                            print("确认登记数据")
                            # 转到等待画面
                            DWritePort(ser10, cmd_write_to_waite_page)
                            DWritePort(ser10, cmd_write_to_waite_page)
                            pythoncom.CoInitialize()
                            tub, xh = save_test_file(dict_in_data)
                            pythoncom.CoUninitialize()
                            if tub is not None:
                                creat_gbk_list(dict_in_data['name'], 0x82, 0x20, 0x20, ser7)
                                creat_gbk_list(str(int(tub)), 0x82, 0x20, 0x40, ser7)
                                creat_gbk_list(str(int(tub)), 0x82, 0x20, 0x40, ser7)
                                creat_gbk_list(str(xh), 0x82, 0x20, 0x60, ser7)
                                creat_gbk_list(str(xh), 0x82, 0x20, 0x60, ser7)
                            else:
                                # 弹出失败
                                pass
                            # 转到主页面
                        elif addres == 0x1210:
                            print('保存数据')
                            pythoncom.CoInitialize()
                            save_state = save_infor_file(dict_in_data)
                            reade_excel()
                            pythoncom.CoUninitialize()
                            DWritePort(ser, cmd_write_to_enter_page)
                        elif addres == 0x2004:
                            clear_show_data()
                            DWritePort(ser10, cmd_write_to_main_page)
                    elif commd == 0x83 and recv_len == 0x12 and addres == 0x1060:
                        print("接收到手机号码")
                        recv_phone_len = STRGLO[6]
                        if recv_phone_len == 0x07:
                            str_ph = STRGLO[7:18]
                            phone_code = str_ph.decode()
                            # 开始查找是否有手机号存在
                            dict_in_data['phone'] = phone_code
                        else:
                            print("接收到的电话号码长度不对 长度为:" + str((recv_phone_len - 8)))
                    elif commd == 0x83 and recv_len == 24 and addres == 0x1020:
                        print("接收到身份证号码")
                        recv_ID_len = STRGLO[6]
                        if recv_ID_len == 10:
                            dict_in_data['ID'] = STRGLO[7:25].decode()
                        else:
                            print("接收到的身份证长度不对 长度为:" + str((recv_ID_len - 8)))
                    elif commd == 0x83 and addres == 0x1160:
                        # 接收到姓名
                        recv_name_len = STRGLO[6] - 1
                        if 0 < recv_name_len < 5:
                            dict_in_data['name'] = (STRGLO[7:(7 + recv_name_len * 2)]).decode("gbk")
                        else:
                            print("姓名的长度不符合")
                        print(dict_in_data['name'])
                    elif commd == 0x83 and addres == 0x1140:
                        # 接收到姓名
                        recv_company_len = STRGLO[6] - 1
                        if 1 < recv_company_len < 6:
                            dict_in_data['company'] = (STRGLO[7:(7 + recv_company_len * 2)]).decode("gbk")
                        else:
                            print("单位的长度不符合")
                        print(dict_in_data['company'])
                    elif commd == 0x83 and addres == 0x1120:
                        # 接收到性别
                        recv_sex_len = STRGLO[6]
                        if recv_sex_len == 2:
                            str_sex = (STRGLO[7:9]).decode("gbk")
                            if str_sex == '男' or str_sex == '女':
                                dict_in_data['sex'] = str_sex
                            else:
                                print("性别必须为男或者女")
                                dict_in_data['sex'] = ''
                        else:
                            print("姓名的长度不符合")
                        print(dict_in_data['sex'])
                except Exception as e:
                    print(str(e))
            else:
                print("非法数据")


# 打开串口
# 端口,GNU / Linux上的/ dev / ttyUSB0 等 或 Windows上的 COM3 等
# 波特率,标准值之一:50,75,110,134,150,200,300,600,1200,1800,2400,4800,9600,19200,38400,57600,115200
# 超时设置,None:永远等待操作,0为立即返回请求结果,其他值为等待超时时间(单位为秒)
def DOpenPort(portx, bps, timeout):
    ret = False
    try:
        # 打开串口,并得到串口对象
        ser = serial.Serial(portx, bps, timeout=timeout)
        # 判断是否打开成功
        if (ser.is_open):
            ret = True
            threading.Thread(target=ReadData, args=(ser,)).start()
    except Exception as e:
        print("---异常---:", e)
    return ser, ret


# 关闭串口
def DColsePort(ser):
    global BOOL
    BOOL = False
    ser.close()


# 写数据
def DWritePort(ser, text):
    result = ser.write(text)  # 写数据
    return result


# 读数据
def DReadPort():
    global STRGLO
    str = STRGLO
    STRGLO = ""  # 清空当次读取
    return str


def reade_excel():
    global all_range
    # 循环接收数据,此为死循环,可用线程实现
    try:
        print(path)
        my_xls_path = path + '\核酸数据库.xlsx'
        my_xls = easyExcel(my_xls_path)
        cols = my_xls.getCols('Sheet1')
        print("总列数为:" + str(cols))
        rows = my_xls.getRows('Sheet1') + 1
        print("总行数为:" + str(rows))
        # 获取全部数据到内存
        all_range = my_xls.getRange('Sheet1', 2, 1, rows - 1, cols)
        my_xls.close()
    except Exception as e:
        print("出错啦:" + str(e))

# 保存基本信息数据
def save_infor_file(data):
    try:
        my_xls_path = path + '\核酸数据库.xlsx'
        save_infor_xls = easyExcel(my_xls_path)
        rows = save_infor_xls.getRows('Sheet1') + 1
        save_infor_xls.setCell('Sheet1', rows, 1, rows)  # 增加姓名
        save_infor_xls.setCell('Sheet1', rows, 2, data['name'])  # 增加姓名
        save_infor_xls.setCell('Sheet1', rows, 5, data['sex'])  # 增加性别
        save_infor_xls.setCell('Sheet1', rows, 3, data['phone'])  # 增加电话号码
        save_infor_xls.setCell('Sheet1', rows, 4, data['ID'])  # 增加身份证号
        save_infor_xls.setCell('Sheet1', rows, 6, data['company'])  # 增加单位
        save_infor_xls.save()
        save_infor_xls.close()
        return 1
    except Exception as e:
        print("保存基本信息出错:" + str(e))

# 保存采样信息数据
def save_test_file(data):
    # 先检查是否有文件,没有新建一个,有测打开
    """
    :param data: dict_in_data = {
            'tube_number': '',
            'name': '',
            'phone': '',
            'ID': '',
            'sex': '',
            'company': '',
            'test_date': ''
        }
    :return: None
    """
    try:
        my_save_xls_path = path + '\送检单.xlsx'
        save_xls = easyExcel(my_save_xls_path)
        max_row = save_xls.getRows('Sheet1')
        if max_row == 1:
            print("当天第一次写数据,初始化一下")
            # 试号管为 1
            data['tube_number'] = 1
            this_row = 1
            tube = 1
        else:
            tube = save_xls.getCell('Sheet1', max_row, 2)
            if ((max_row-1) % 10) == 0:  # 10混1
                tube = tube + 1
        # 表行号增加一行
        max_row = max_row + 1
        save_xls.setCell('Sheet1', max_row, 1, max_row - 1)  # 增加序号
        save_xls.setCell('Sheet1', max_row, 2, tube)  # 增加管号
        save_xls.setCell('Sheet1', max_row, 3, data['name'])  # 增加姓名
        save_xls.setCell('Sheet1', max_row, 4, data['sex'])  # 增加性别
        save_xls.setCell('Sheet1', max_row, 5, data['phone'])  # 增加电话号码
        save_xls.setCell('Sheet1', max_row, 6, data['ID'])  # 增加身份证号
        save_xls.setCell('Sheet1', max_row, 7, data['company'])  # 增加单位
        save_xls.setCell('Sheet1', max_row, 8, time.strftime("%Y-%m-%d", time.localtime()))  # 增加检测时间
        save_xls.save()
        save_xls.close()
        save_xls = None
        print("保存数据成功")
        xh = max_row - 1   # 修正序号
        return tube, xh
    except Exception as e:
        save_xls.close()
        print("保存数据出错:" + str(e))
        return 0


if __name__ == "__main__":
    ser10, ret = DOpenPort("COM3", 115200, None)
    if ret:  # 判断串口是否成功打开
        reade_excel()
    ser7, ret1 = DOpenPort('COM23',115200,None)
    if ret1:
        print("打开串口2成功")


【总结】现在还有一个缺点,就是读取EXCEL文件还很慢,差不多要2秒钟才能反应过来。下一步考虑是不是用数据库。还有一些细节要处理好。明天到单位再试用一下,看看效果。

最新回复

看起来挺方便的   详情 回复 发表于 2022-5-13 09:10
点赞 关注
 
 

回复
举报

7244

帖子

2

TA的资源

版主

沙发
 

再加个摄像头就和真正采样核酸差不多了!!

 
 
 

回复

2万

帖子

74

TA的资源

管理员

板凳
 

看起来挺方便的

加EE小助手好友,
入技术交流群
EE服务号
精彩活动e手掌握
EE订阅号
热门资讯e网打尽
聚焦汽车电子软硬件开发
认真关注技术本身
个人签名

加油!在电子行业默默贡献自己的力量!:)

 
 
 

回复
您需要登录后才可以回帖 登录 | 注册

查找数据手册?

EEWorld Datasheet 技术支持

相关文章 更多>>
关闭
站长推荐上一条 1/7 下一条

 
EEWorld订阅号

 
EEWorld服务号

 
汽车开发圈

About Us 关于我们 客户服务 联系方式 器件索引 网站地图 最新更新 手机版

站点相关: 国产芯 安防电子 汽车电子 手机便携 工业控制 家用电子 医疗电子 测试测量 网络通信 物联网

北京市海淀区中关村大街18号B座15层1530室 电话:(010)82350740 邮编:100190

电子工程世界版权所有 京B2-20211791 京ICP备10001474号-1 电信业务审批[2006]字第258号函 京公网安备 11010802033920号 Copyright © 2005-2025 EEWORLD.com.cn, Inc. All rights reserved
快速回复 返回顶部 返回列表