李百仪 发表于 2021-12-22 18:02

【安信可NB-IoT开发板EC-01F-Kit测评】06.基于STM32+EC-01F MQTT接入阿里云

<div class='showpostmsg'> 本帖最后由 李百仪 于 2021-12-22 21:12 编辑

<p>&nbsp;</p>

<p>上一篇介绍了系统架构:各任务均有独立的状态机,添加MQTT任务时只要新增状态机和其他任务互斥即可;</p>

<p>&nbsp;</p>

<p>MQTT任务状态机包含:查询连接状态-&gt;是否需要断开之前连接-&gt;配置mqtt接入平台-&gt;配置mqtt客户端参数-&gt;打开客户端-&gt;创建连接-&gt;订阅主题1-&gt;订阅主题2-&gt;发布主题-&gt;异常处理等</p>

<p>任务间互斥:由于每次发送AT指令前清空接收缓冲区,所以初始化任务、周期性查询NB状态任务、MQTT/Socket任务之间要互斥,否则可能导致NB回复给任务A的指令被任务B清除。</p>

<p>&nbsp;</p>

<pre>
<code class="language-cpp">void NB_MQTT_Aliyun_ReportProcess(void)
{
        //static U8 status = 0;
        U8 res = 0;
        static char ack        =        {0};
        static char cmd         =        {0};

        if((NB.CycleCheckStatus &gt; 1) || (NB.GpsReportStatus &gt; 1))
        {
                return;
        }
        if(NB.mqtt.err_code != 0)
        {
                //return;
        }
       
        switch(NB.ReportStatus)
        {
                case 0:
                        if((NB.CON == 0xFF)
                        &amp;&amp; (NB.RegStatus == 1)
                        &amp;&amp; (NB.attach == 1)
                        &amp;&amp; (NB.csq != 99)
                        &amp;&amp; (NB.csq != 0)
                        &amp;&amp; (NB.CGACT_state == 1))
                        {
                                if(NB.isConnect &gt; 0)//已经建立连接,延长处理间隔
                                {
                                        NB.ReportTime = (200*100);
                                }
                                else
                                {
                                        NB.ReportTime = (200*10);
                                }
                                NB.ReportStatus = 1;
                                uprintf("\r\n NB.ReportTime %d", NB.ReportTime);
                        }
                break;
                case 1:
                        if(NB.ReportTime == 0)
                        {
                                NB.ReportStatus = 2;
                                memset(ack, 0, 200);
                                memset(cmd,0, 100);
                                uprintf("\r\n NB.ReportTimeout");
                        }
                break;
                case 2:
                        NB.t = 23.0f;//(float)DS18B20_Get_Temp();
            if(NB.t != 85.0f)
            {
                                uprintf("\r\n NB.isConnect %d", NB.isConnect);
                                if(NB.isConnect &lt;= 0)
                          {
                                NB.ReportStatus = 31;//creat connect
                                NB.net.socket = 0;
                                NB.mqtt.tcpconnectID = 0;
                                        NB.ReportStatus = 28;//config keep_alive_time
                                NB.mqtt.keep_alive_time = 120;
                          }
                          else
                          {
                                        NB.ReportStatus = 36;
                          }
            }
                break;
                case 28://config keep_alive_time
                        sprintf(cmd, "AT+ECMTCFG=\"keepalive\",%d,%d\r\n",NB.mqtt.tcpconnectID, NB.mqtt.keep_alive_time);
                        res = NB_AT_CMD((U8 *)cmd, 1000, "OK", 2);
                if(res == ATCMD_REVOK)
                {
                                uprintf("\r\n ECMTCFG OK");
                                NB.ReportStatus = 29;//Disable Sleep Mode
                }
                else if(res == ATCMD_TIMEOUT)
                {
                                uprintf("\r\n ECMTCFG_TIMEOUT");
                                NB.ReportStatus = 29;
                }
                break;
                case 29://查询连接状态
                        res = NB_AT_CMD("AT+ECMTCONN?\r\n", 1000, "+ECMTCONN:", 2);
                if(res == ATCMD_REVOK)
                {
                                uprintf("\r\n ECMTCONN1 OK");
                                if(!NB_parseMqttConStatus(&amp;EC01RxCache, EC01_handle.rcv_size))
                          {
                                       
                          }
                          if(NB.mqtt.state == 3)//3 MQTT 已经连接成功
                          {
                                        NB.ReportStatus = 100;//跳转到断开连接
                          }
                          else
                          {
                                        NB.ReportStatus = 31;
                          }
                }
                else if(res == ATCMD_TIMEOUT)
                {
                                uprintf("\r\n ECMTCONN1_TIMEOUT");
                                NB.ReportStatus = 31;
                }
                break;
                case 100://断开客户端和服务器连接
                        //+ECMTDISC: 0,0
                        sprintf(cmd, "AT+ECMTDISC=%d\r\n",NB.mqtt.tcpconnectID);
                        sprintf(ack, "+ECMTDISC: %d,0",NB.mqtt.tcpconnectID);
                        res = NB_AT_CMD((U8 *)cmd, 1000, ack, 2);//Close Socket
                if(res == ATCMD_REVOK)
                {
                                uprintf("\r\n ECMTDISC OK");
                                //NB.ReportStatus = 31;
                                NB.ReportStatus = 29;
                }
                else if(res == ATCMD_TIMEOUT)
                {
                                uprintf("\r\n ECMTDISC_TIMEOUT");
                                NB.ReportStatus = 31;
                }
                break;
               
                #if 1
                case 30://关闭客户端
                                //+ECMTCLOSE: 0,0
                                sprintf(cmd, "AT+ECMTCLOSE=%d\r\n",NB.mqtt.tcpconnectID);
                                sprintf(ack, "+ECMTCLOSE: %d,0",NB.mqtt.tcpconnectID);
                                res = NB_AT_CMD((U8 *)cmd, 1000, ack, 2);//Close Socket
                                if(res == ATCMD_REVOK)
                                {
                                        uprintf("\r\n ECMTCLOSE OK");
                                        NB.ReportStatus = 31;
                                }
                                else if(res == ATCMD_TIMEOUT)
                                {
                                        uprintf("\r\n ECMTCLOSE_TIMEOUT");
                                        NB.ReportStatus = 31;
                                }
                break;
#endif

                case 31://配置mqtt客户端参数
                        //AT+QMTCFG="aliauth",0,"a1KApkvNNDZ","EVN_2021_4_17","cd9b161999b754c3613f1bc1baf78ea5"\r               
                        sprintf(cmd, "AT+ECMTCFG=\"aliauth\",%d,"PRODUCT_KEY","DEVICE_NAME","DEVICE_SECRET"\r\n",
                        NB.mqtt.tcpconnectID);
                        res = NB_AT_CMD((U8 *)cmd, 1000, "OK", 2);
                       
                if(res == ATCMD_REVOK)
                {
                                uprintf("\r\n ECMTCFG OK");
                                NB.ReportStatus = 32;
                                //NB.isConnect = 1;
                }
                else if(res == ATCMD_TIMEOUT)
                {
                                uprintf("\r\n ECMTCFG_TIMEOUT");
                                NB.ReportStatus = 0;
                }
                break;
                case 32://建立tcp连接mqtt服务器
                        //+ECMTOPEN: 0,0
                        sprintf(cmd, "AT+ECMTOPEN=%d,\"%s\",%d\r\n",
                        NB.mqtt.tcpconnectID, HOST_NAME,HOST_PORT);
                        sprintf(ack, "+ECMTOPEN: %d,",NB.mqtt.tcpconnectID);
                        res = NB_AT_CMD((U8 *)cmd, 5000, ack, 2);
                       
                if(res == ATCMD_REVOK)
                {
                                uprintf("\r\n ECMTOPEN OK");
                                NB.ReportStatus = 33;
                }
                else if(res == ATCMD_TIMEOUT)
                {
                                uprintf("\r\n ECMTOPEN_TIMEOUT");
                                NB.ReportStatus = 30;
                }
                break;
                case 33://登陆mqtt服务器
                        //+ECMTCONN: 0,0,0
                        NB.mqtt.result = 0;//0 打开网络成功
                        NB.mqtt.ret_code = 0;//0 接受连接
                        sprintf(cmd, "AT+ECMTCONN=%d,\"12345_33\"\r\n",
                        NB.mqtt.tcpconnectID);
                        sprintf(ack, "+ECMTCONN: %d,%d,%d",NB.mqtt.tcpconnectID, NB.mqtt.result, NB.mqtt.ret_code);
                        res = NB_AT_CMD((U8 *)cmd, 5000, ack, 2);
                       
                if(res == ATCMD_REVOK)
                {
                                uprintf("\r\n ECMTCONN OK");
                                NB.ReportStatus = 34;
                }
                else if(res == ATCMD_TIMEOUT)
                {
                                uprintf("\r\n ECMTCONN_TIMEOUT");
                                NB.ReportStatus = 0;
                }
                break;
                case 34://订阅服务器设置
                        //+ECMTSUB: 0,1,0,1
                        NB.mqtt.tcpconnectID = 0;
                        NB.mqtt.msgID= 1;
                        NB.mqtt.qos = 0;
                        NB.mqtt.result = 0;//0 数据包发送成功且接收到服务器的 ACK
                       
                        sprintf(cmd, "AT+ECMTSUB=%d,%d,%s,%d\r\n",
                        NB.mqtt.tcpconnectID, NB.mqtt.msgID, DEVICE_SUBSCRIBE_SET, NB.mqtt.qos);

                        sprintf(ack, "+ECMTSUB: %d,%d,%d,%d",NB.mqtt.tcpconnectID, NB.mqtt.msgID, NB.mqtt.result, 1);
                        res = NB_AT_CMD((U8 *)cmd, 5000, ack, 2);
                       
                if(res == ATCMD_REVOK)
                {
                                uprintf("\r\n ECMTSUB1 OK");
                                NB.ReportStatus = 35;
                                NB.isConnect = 1;
                }
                else if(res == ATCMD_TIMEOUT)
                {
                                uprintf("\r\n ECMTSUB1_TIMEOUT");
                                NB.ReportStatus = 0;
                }
                break;
                case 35://订阅服务器应答
                        //+ECMTSUB: 0,2,0,1
                        NB.mqtt.tcpconnectID = 0;
                        NB.mqtt.msgID= 2;
                        NB.mqtt.qos = 1;
                        NB.mqtt.result = 0;//0 数据包发送成功且接收到服务器的 ACK
                       
                        sprintf(cmd, "AT+ECMTSUB=%d,%d,%s,%d\r\n",
                        NB.mqtt.tcpconnectID, NB.mqtt.msgID, DEVICE_SUBSCRIBE_POST_RELAY, NB.mqtt.qos);

                        sprintf(ack, "+ECMTSUB: %d,%d,%d,%d",NB.mqtt.tcpconnectID, NB.mqtt.msgID, NB.mqtt.result, 1);
                        res = NB_AT_CMD((U8 *)cmd, 2000, ack, 2);
                       
                if(res == ATCMD_REVOK)
                {
                                uprintf("\r\n ECMTSUB2 OK");
                                NB.ReportStatus = 36;
                                NB.isConnect = 1;
                }
                else if(res == ATCMD_TIMEOUT)
                {
                                uprintf("\r\n ECMTSUB2_TIMEOUT");
                                NB.ReportStatus = 0;
                }
                break;
                case 36://发布
                        //+ECMTPUB: 0,0,0
                        NB.mqtt.retain= 0;
                        NB.mqtt.msgID = 0;
                        NB.mqtt.qos = 0;
                        NB.mqtt.result = 0;//0 数据包发送成功且接收到服务器的 ACK

                       
                        #ifdef USER_206
                        sprintf((char*)payload_out, PAYLOAD, Temp.temp, MQTT.Index);
                        #elif defined USER_174
                        sprintf((char*)payload_out, PAYLOAD, Temp.temp, CO2.co2,
                        Temp.low_threshold, Temp.high_threshold, CO2.low_threshold, CO2.high_threshold,
                        CO2.fanSwitch, Temp.CoolingSwitch,
                        GeoLocation.CoordinateSystem, GeoLocation.Latitude, GeoLocation.Longitude, GeoLocation.Altitude,
                        MQTT.Index);
                        #endif
                        sprintf(cmd, "AT+ECMTPUB=%d,%d,%d,%d,%s,%s\r\n",
                        NB.mqtt.tcpconnectID, NB.mqtt.msgID, NB.mqtt.qos, NB.mqtt.retain, DEVICE_PUBLISH, payload_out);
                        //uprintf("\r\n payload_out:%s", payload_out);

                        sprintf(ack, "+ECMTPUB: %d,%d,%d",NB.mqtt.tcpconnectID, NB.mqtt.msgID, NB.mqtt.result);
                        res = NB_AT_CMD((U8 *)cmd, 3000, ack, 2);
                       
                if(res == ATCMD_REVOK)
                {
                        uprintf("\r\n payload_out:%s", payload_out);
                                uprintf("\r\n ECMTPUB OK");
                                MQTT.Index++;
                                GeoLocation.Latitude += 0.00001f;
                                GeoLocation.Longitude += 0.00001f;
                                GeoLocation.Altitude++;
                                uprintf("\r\n Latitude %f Longitude %f", GeoLocation.Latitude, GeoLocation.Longitude );
                               
                                NB.ReportStatus = 0;
                }
                else if(res == ATCMD_TIMEOUT)
                {
                                uprintf("\r\n ECMTPUB_TIMEOUT");
                                NB.ReportStatus = 37;
                }
                
                break;
                case 37://查询连接状态
                        /*
                        +QMTCONN: 0,3
                       
                        OK
                        */

                        res = NB_AT_CMD("AT+ECMTCONN?\r\n", 1000, "+ECMTCONN:", 2);//Close Socket
                if(res == ATCMD_REVOK)
                {
                                uprintf("\r\n ECMTCONN1 OK");
                                if(!NB_parseMqttConStatus(&amp;EC01RxCache, EC01_handle.rcv_size))
                          {
                                        NB.ReportStatus = 0;
                          }
                          else
                          {
                                        NB.ReportStatus = 30;
                          }
                }
                else if(res == ATCMD_TIMEOUT)
                {
                                uprintf("\r\n ECMTCONN1_TIMEOUT");
                                NB.ReportStatus = 30;
                                NB_ClearStatus();
                                NB.CON = 0;
                }
                break;
               
                case 0xFE://关闭mqtt客户端
                        //+ECMTCLOSE: 0,0
                        sprintf(cmd, "AT+ECMTCLOSE=%d\r\n",NB.mqtt.tcpconnectID);
                        sprintf(ack, "+ECMTCLOSE: %d,0",NB.mqtt.tcpconnectID);
                        res = NB_AT_CMD((U8 *)cmd, 1000, ack, 2);//Close Socket
                if(res == ATCMD_REVOK)
                {
                                uprintf("\r\n ECMTCLOSE OK");
                                NB.net.socket = -1;
                                NB_ClearStatus();
                          NB.ReportStatus = 0xFF;
                }
                else if(res == ATCMD_TIMEOUT)
                {
                                uprintf("\r\n ECMTCLOSE_TIMEOUT");
                                NB.ReportStatus = 0;
                }
                break;
                default:
                       
                break;
        }
}</code></pre>

<p>轮询消息添加解析MQTT服务器指令:</p>

<pre>
<code class="language-cpp">void NB_Polling_Msg(void)
{
        EC01_handle.rcv_size = NB_CopyFIFOToParseBuf(EC01RxCache);
       
        if((EC01_handle.rcv_size != 0)
        &amp;&amp; (NB.CON == 0xFF))//初始化完成允许轮询消息,否则可能导致数据冲突
        {
               
                if(NB_parseDefaultMsg(&amp;EC01RxCache, EC01_handle.rcv_size))
                {
                       
                }
                if(!NB_parseMqttRECV(&amp;EC01RxCache, EC01_handle.rcv_size))//解析MQTT服务器指令
                {
                       
                }
        }       
}
/*
+QMTRECV: 0,0,"/sys/a1KApkvNND
Z/EVN_2021_4_17/thing/service/pr

*/
u8 NB_parseMqttRECV(u8* cmd, u32 len)
{
        char *ptr = NULL;
        S16 uiResult;
        char str;

        U16 uiIndexStart = 0, uiAckIndexStart = 0, uiAckIndexEnd = 0;

        S8 res = 0xFF;

        if(cmd == NULL) return 1;

        sprintf(str, "+ECMTRECV: %d,",NB.mqtt.tcpconnectID);
       
        ptr = (char*)cmd;
       
        if(!NB_strcmp(ptr,str,0,len))
        {

                if(!NB_GetHexFromStr((char*)ptr, str, ",", 0, len, &amp;uiResult))
                {
                        NB.mqtt.msgID = (S8)uiResult;
                        uprintf("\r\n NB.mqtt.msgID=%d", NB.mqtt.msgID);
                        res = 3;
                }
               
                sprintf(str, "+ECMTRECV: %d,%d,",NB.mqtt.tcpconnectID, NB.mqtt.msgID);


                while(uiAckIndexEnd &lt; (len - 2))
                {
                       
                        if(!NB_GetStrFromStr2((char*)ptr, str, "\r\n", uiIndexStart, len, (char *)NB.net.rxdata, &amp;uiAckIndexStart, &amp;uiAckIndexEnd))
                        {
                                if(!NB_strcmp(( char*)NB.net.rxdata, "\"fanSwitch\":1", 0, 200))
                                {
                                        CO2.fanSwitch = 1;
                                        GPIO_ResetBits(GPIOD, GPIO_Pin_2);
                                        uprintf("\r\n fanSwitch=1");
                                }
                                if(!NB_strcmp(( char*)NB.net.rxdata, "\"fanSwitch\":0", 0, 200))
                                {
                                        CO2.fanSwitch = 0;
                                        GPIO_SetBits(GPIOD, GPIO_Pin_2);
                                        uprintf("\r\n fanSwitch=0");
                                }
                                if(!NB_strcmp(( char*)NB.net.rxdata, "\"CoolingSwitch\":1", 0, 200))
                                {
                                        Temp.CoolingSwitch = 1;
                                        GPIO_ResetBits(GPIOD, GPIO_Pin_2);
                                        uprintf("\r\n CoolingSwitch=1");
                                }
                                if(!NB_strcmp(( char*)NB.net.rxdata, "\"CoolingSwitch\":0", 0, 200))
                                {
                                        Temp.CoolingSwitch = 0;
                                        GPIO_SetBits(GPIOD, GPIO_Pin_2);
                                        uprintf("\r\n CoolingSwitch=0");
                                }
                                res = 0;
                        }
                        else
                        {
                                break;
                        }
                        uiIndexStart = uiAckIndexEnd;
                }
               
        }
       
        return res;
}</code></pre>

<p>运行效果手机APP远程控制LED:</p>

<p><iframe allowfullscreen="true" frameborder="0" height="510" src="https://training.eeworld.com.cn/shareOpenCourseAPI?isauto=true&amp;lessonid=32080" style="background:#eee;margin-bottom:10px;" width="550"></iframe><br />
至此,本次测评告一段落。</p>

<p>还有一些问题待解决。</p>

<p><a href="https://bbs.eeworld.com.cn/thread-1189547-1-1.html#pid3111773" target="_blank">https://bbs.eeworld.com.cn/thread-1189547-1-1.html#pid3111773</a></p>

<p>&nbsp;</p>
</div><script>                                        var loginstr = '<div class="locked">查看本帖全部内容,请<a href="javascript:;"   style="color:#e60000" class="loginf">登录</a>或者<a href="https://bbs.eeworld.com.cn/member.php?mod=register_eeworld.php&action=wechat" style="color:#e60000" target="_blank">注册</a></div>';
                                       
                                        if(parseInt(discuz_uid)==0){
                                                                                                (function($){
                                                        var postHeight = getTextHeight(400);
                                                        $(".showpostmsg").html($(".showpostmsg").html());
                                                        $(".showpostmsg").after(loginstr);
                                                        $(".showpostmsg").css({height:postHeight,overflow:"hidden"});
                                                })(jQuery);
                                        }                </script><script type="text/javascript">(function(d,c){var a=d.createElement("script"),m=d.getElementsByTagName("script"),eewurl="//counter.eeworld.com.cn/pv/count/";a.src=eewurl+c;m.parentNode.insertBefore(a,m)})(document,523)</script>

Jacktang 发表于 2021-12-23 07:33

<p>在添加MQTT任务时,需要保证新增状态机和其他任务互斥,添加新增状态时有什么技巧,有时不互斥怎么处理</p>

李百仪 发表于 2021-12-23 13:48

Jacktang 发表于 2021-12-23 07:33
在添加MQTT任务时,需要保证新增状态机和其他任务互斥,添加新增状态时有什么技巧,有时不互斥怎么处理

<p>1.添加状态机技巧:规划新任务流程图,流程上评审没问题,翻译成代码即可。</p>

<p>2.任务不互斥需要在使用公共资源时添加互斥,RTOS也是这样处理的。</p>

Jacktang 发表于 2021-12-23 19:44

李百仪 发表于 2021-12-23 13:48
1.添加状态机技巧:规划新任务流程图,流程上评审没问题,翻译成代码即可。

2.任务不互斥需要在使用公 ...

<p>明白了一些,谢谢答复</p>
页: [1]
查看完整版本: 【安信可NB-IoT开发板EC-01F-Kit测评】06.基于STM32+EC-01F MQTT接入阿里云