新鲜出炉的MQTT库
================

作者:陈之敏 时间:2020年08月15日
关键字:csdk、RDA8910、二次开发、MQTT、PAHO

目录
----

`点击这里查看所有博文 <https://blog.csdn.net/weixin_44570083/article/details/104285283>`__

  最近听说有的小伙伴看了我的教程后,有一些问题都跑到官方的gitee上面去问去了。导致官方的人没搞懂问的是啥,小伙伴们也没能知道自己想要的答案。给大家造成了困扰,这里我说声抱歉。

  既然出现了这个问题,我这里就声明一下,本系列教程所涉及的内容(demo)不是官方的作品。我个人觉得官方的demo内容太多太全,往往都是把一个模块内所有的东西全部放在一起。这样的话对新手不是很友好,阅读起来也比较费劲。我就把官方的部分demo进行相关的简化,并推出教程这样的话可能会对新手朋友们有一定的帮助。

  有时候周末闲暇时间我也会加上一些我觉得好玩的模块在里面,这些可能在官方的demo都没有,比如cJSON、PAHO-MQTT、http-client。

  这就是官方的代码仓库。

.. code:: c

   git clone --recursive https://gitee.com/openLuat/Luat_CSDK_Air724U.git

  当然各位小伙伴在看本教程时,我建议还是使用我下面提供的仓库比较好,看完之后在迁移到官方的仓库⇧。

.. code:: c

   git clone --recursive https://gitee.com/chenxiahuaxu/RDA8910_CSDK.git

  再看本教程过程中如果遇到了问题,可以在本人的代码仓库下面评论。也可以在本人的\ `博客 <https://blog.csdn.net/weixin_44570083/article/details/104285283>`__\ 下面评论。我要是看到的话,并且这个问题在我的能力范围的话我会尽力解答的(非官方,不要对我要求太多哦,要求太多我可能就不管啦)。

一、前言
--------

  MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
|在这里插入图片描述|
  物联网应用中都离不开MQTT。玩物联网的人不会用MQTT那就是没有灵魂。目前官方提供的CSDK也是\ ``没有``\ MQTT库的,所以为了大家,在下花了一点时间移植了一套开源的用于嵌入式领域的MQTT库。
|image1|

  有人讲你那个HTTP库都自己写了,这个MQTT库怎么不自己写。这里声明一下HTTP的库不是我想自己写的,是因为我没找到合适的开源的HTTP库,移植起来很困难。被逼无奈自己写了一个简单的请求库,用在嵌入式平台正常情况下也够用了。
|image2|

  闲话少说,开始干正事。 ## 二、编写测试程序 ##
2.1、了解本例程所用到的函数
  使用MQTT服务需要包含\ ``#include "MQTTClient.h""``\ 头文件,我们这里只用到了7个函数,分别是:
>/**新的网络设置 > @param
n:指向网络结构的指针,包含网络的配置信息。\ @return 无**/

-  void ``NewNetwork``\ (Network \*n)

..

   /*\*\ *@简短连接网络功能*\ @param n:指向网络结构的指针
   *包含网络的配置信息。*\ ip:服务器IP。 \*port:服务器端口。 \**/

-  int ``ConnectNetwork``\ (Network *n, char*\ ip, int port)

..

   /*创建一个MQTT客户端对象*\ @参数客户端 *@参数网络*\ @param
   command_timeout_ms \*@参数 \**/

-  DLLExport void ``MQTTClientInit``\ (MQTTClient\* client, Network\*
   network, unsigned int command_timeout_ms,unsigned char\* sendbuf,
   size_t sendbuf_size, unsigned char\* readbuf, size_t readbuf_size);

..

   /*MQTT
   Connect-在网络上发送MQTT连接数据包并等待Connack*\ 调用此对象之前,必须将nework对象连接到网络端点
   *@param选项-连接选项*\ @返回成功代码 \**/

-  DLLExport int ``MQTTConnect``\ (MQTTClient\* client,
   MQTTPacket_connectData\* options);

..

   /*MQTT订阅-发送MQTT订阅数据包,并在返回之前等待suback。*\ @param
   client-要使用的客户端对象 *@param
   topicFilter-要订阅的主题过滤器*\ @param消息-要发送的消息
   \*@返回成功代码 \**/

-  DLLExport int ``MQTTSubscribe``\ (MQTTClient\* client, const char\*
   topicFilter, enum QoS, messageHandler);

..

   /*MQTT发布-发送MQTT发布数据包,并等待所有ack完成所有QoS*\ @param
   client-要使用的客户端对象
   *@param主题-要发布到的主题*\ @param消息-要发送的消息 \*@返回成功代码
   \**/

-  DLLExport int ``MQTTPublish``\ (MQTTClient\* client, const char\ *,
   MQTTMessage*);

..

   /*需要死循环调用,这个函数负责库内部一些事件处理。*\ @param
   client-要使用的客户端对象 *@param
   time-产生的时间(以毫秒为单位)*\ @返回成功代码 \**/

-  DLLExport int ``MQTTYield``\ (MQTTClient\* client, int time);

2.2、编写初始化程序
-------------------

  初始化程序负责建立socket套接字、连接网络、初始化MQTT对象、连接MQTT服务这几件事

.. code:: c

           NewNetwork(&myNet);
           ConnectNetwork(&myNet, MQTT_IP, MQTT_PORT);
           MQTTClientInit(&myclient, &myNet, 1000, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf));
           MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
           data.willFlag = 0;
           data.MQTTVersion = 3;
           data.clientID.cstring = (char *)"RDA8910 MQTT Test";
           data.username.cstring = "Air724UG";
           data.password.cstring = NULL;
           data.keepAliveInterval = 120;
           data.cleansession = 1;
           int rc = MQTTConnect(&myclient, &data);

2.3、编写订阅消息程序
---------------------

  订阅程序分成两块。第一块是向服务器发送订阅消息。

.. code:: c

           rc = MQTTSubscribe(&myclient, "FX", QOS1, MQTT_CB);
           iot_debug_print("[MQTT_Test] Subscribed %d\r\n", rc);

  第二块是根据服务器下发的订阅消息,解析报文。

.. code:: c

   void MQTT_CB(MessageData *md)
   {
       MQTTMessage *message = md->message;
       uint32 datalen = (int)message->payloadlen;

       uint8 data[20] = {0};
       memcpy(data, (char *)message->payload, datalen);

       iot_debug_print("[MQTT_Test] topicName:%s,data:%s,datalen:%d", md->topicName, data, datalen);
   }

2.4、编写发布消息程序
---------------------

  在任务内循环发布不同的消息,5s发一次。

.. code:: c

   static void Publish(void *param)
   {
       char PubData[20] = {0};
       uint8 num = 0;
       uint8 datalen = 0;
       MQTTMessage message = {0};
       message.qos = QOS0;
       while (1)
       {
           datalen = sprintf(PubData, "RDA8910 Sent:%d", num);
           PubData[datalen] = '\0';
           message.payload = PubData;
           message.payloadlen = datalen;
           MQTTPublish(&myclient, "RDA8910", &message);
           num += 1;
           iot_os_sleep(5000);
       }
   }

三、编译并下载程序
------------------

  完整代码中上线会订阅\ ``FX``\ 这个主题,紧接着直接向\ ``RDA8910``\ 这个主题发送一条\ ``RDA8910 Online!``\ ,随后会定时发送其他的消息。
  完整代码在这,自取。

.. code:: c

   /*
    * @Author: your name
    * @Date: 2020-05-19 14:05:32
    * @LastEditTime: 2020-06-02 11:22:21
    * @LastEditors: Please set LastEditors
    * @Description: In User Settings Edit
    * @FilePath: \RDA8910_CSDK\USER\user_main.c
    */

   #include "string.h"
   //#include "cs_types.h"

   #include "osi_log.h"
   #include "osi_api.h"

   #include "am_openat.h"
   #include "am_openat_vat.h"
   #include "am_openat_common.h"

   #include "iot_debug.h"
   #include "iot_uart.h"
   #include "iot_os.h"
   #include "iot_gpio.h"
   #include "iot_pmd.h"
   #include "iot_adc.h"
   #include "iot_vat.h"
   #include "iot_network.h"
   #include "iot_socket.h"

   //#include "http_client.h"
   #include "MQTTClient.h"

   HANDLE TestTask_HANDLE = NULL;
   uint8 NetWorkCbMessage = 0;

   #define MQTT_IP "换成自己的ip"   //例如192.168.0.1
   #define MQTT_PORT 1883      //自己改端口

   Network myNet;
   MQTTClient myclient;
   unsigned char sendbuf[1024] = {0};
   unsigned char recvbuf[1024] = {0};

   void MQTT_CB(MessageData *md)
   {
       MQTTMessage *message = md->message;
       uint32 datalen = (int)message->payloadlen;

       uint8 data[20] = {0};
       memcpy(data, (char *)message->payload, datalen);

       iot_debug_print("[MQTT_Test] topicName:%s,data:%s,datalen:%d", md->topicName, data, datalen);
   }
   static void Publish(void *param)
   {
       char PubData[20] = {0};
       uint8 num = 0;
       uint8 datalen = 0;
       MQTTMessage message = {0};
       message.qos = QOS0;
       while (1)
       {
           datalen = sprintf(PubData, "RDA8910 Sent:%d", num);
           PubData[datalen] = '\0';
           message.payload = PubData;
           message.payloadlen = datalen;
           MQTTPublish(&myclient, "RDA8910", &message);
           num += 1;
           iot_os_sleep(5000);
       }
   }
   static void TestTask(void *param)
   {
       bool NetLink = FALSE;
       while (NetLink == FALSE)
       {
           T_OPENAT_NETWORK_CONNECT networkparam = {0};
           switch (NetWorkCbMessage)
           {
           case OPENAT_NETWORK_DISCONNECT: //网络断开 表示GPRS网络不可用澹,无法进行数据连接,有可能可以打电话
               iot_debug_print("[socket] OPENAT_NETWORK_DISCONNECT");
               iot_os_sleep(10000);
               break;
           case OPENAT_NETWORK_READY: //网络已连接 表示GPRS网络可用,可以进行链路激活
               iot_debug_print("[socket] OPENAT_NETWORK_READY");
               memcpy(networkparam.apn, "CMNET", strlen("CMNET"));
               //建立网络连接,实际为pdp激活流程
               iot_network_connect(&networkparam);
               iot_os_sleep(500);
               break;
           case OPENAT_NETWORK_LINKED: //链路已经激活 PDP已经激活,可以通过socket接口建立数据连接
               iot_debug_print("[socket] OPENAT_NETWORK_LINKED");
               NetLink = TRUE;
               break;
           }
       }
       if (NetLink == TRUE)
       {
           NewNetwork(&myNet);
           ConnectNetwork(&myNet, MQTT_IP, MQTT_PORT);
           MQTTClientInit(&myclient, &myNet, 1000, sendbuf, sizeof(sendbuf), recvbuf, sizeof(recvbuf));
           MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
           data.willFlag = 0;
           data.MQTTVersion = 3;
           data.clientID.cstring = (char *)"RDA8910 MQTT Test";
           data.username.cstring = "Air724UG";
           data.password.cstring = NULL;
           data.keepAliveInterval = 120;
           data.cleansession = 1;
           int rc = MQTTConnect(&myclient, &data);
           iot_debug_print("[MQTT_Test] Connected %d\r\n", rc);
           rc = MQTTSubscribe(&myclient, "FX", QOS1, MQTT_CB);
           iot_debug_print("[MQTT_Test] Subscribed %d\r\n", rc);

           MQTTMessage message = {0};
           message.qos = QOS0;
           message.payload = "RDA8910 Online!";
           message.payloadlen = sizeof("RDA8910 Online!");
           MQTTPublish(&myclient, "RDA8910", &message);
           iot_os_create_task(Publish, NULL, 2048, 10, OPENAT_OS_CREATE_DEFAULT, "Publish");
           while (1)
           {
               MQTTYield(&myclient, 500);
               iot_os_sleep(500);
           }
       }
       iot_os_delete_task(TestTask_HANDLE);
   }
   static void NetWorkCb(E_OPENAT_NETWORK_STATE state)
   {
       NetWorkCbMessage = state;
   }
   //main函数
   int appimg_enter(void *param)
   {
       //系统休眠
       iot_os_sleep(10000);
       //注册网络状态回调函数
       iot_network_set_cb(NetWorkCb);
       //创建一个任务
       //TestTask_HANDLE =
       TestTask_HANDLE = iot_os_create_task(TestTask, NULL, 2048, 10, OPENAT_OS_CREATE_DEFAULT, "TestTask");
       return 0;
   }

   //退出提示
   void appimg_exit(void)
   {
       OSI_LOGI(0, "application image exit");
   }

四、分析结果
------------

  我这里使用\ ``MQTT.fx``\ 测试。使用MQTT.fx订阅\ ``RDA8910``\ 这个主题,并且随机向FX这个主题发布消息。
  可以看到MQTT.fx收到了开发板发布的消息。 |image3|
  开发板也能收到MQTT.fx发布的消息。 |image4|
  通过EMQ的控制台,看到开发板\ ``11:01``\ 开始上线。现在时间看右小角是\ ``11:55``\ ,运行了接近一个小时。也没出现掉线现象。稳定性也还是可以的。
|image5|

五、总结
--------

  除了上面我们使用的函数外。MQTT还具有以下其他的库。这里做简略介绍 >
/*MQTT
Connect-在网络上发送MQTT连接数据包并等待Connack*\ 调用此对象之前,必须将nework对象连接到网络端点
*@param选项-连接选项*\ @返回成功代码 \**/

-  DLLExport int ``MQTTConnectWithResults``\ (MQTTClient\* client,
   MQTTPacket_connectData\* options, MQTTConnackData\* data);

..

   /*MQTT SetMessageHandler-设置或删除每个主题的消息处理程序*\ @param
   client-要使用的客户端对象 *@param
   topicFilter-主题过滤器为设置消息处理程序*\ @param
   messageHandler-指向消息处理程序函数的指针或要删除的NULL
   \*@返回成功代码 \**/

-  DLLExport int ``MQTTSetMessageHandler``\ (MQTTClient\* c, const
   char\* topicFilter, messageHandler messageHandler);

..

   /*MQTT订阅-发送MQTT订阅数据包,并在返回之前等待suback。*\ @param
   client-要使用的客户端对象 *@param
   topicFilter-要订阅的主题过滤器*\ @param消息-要发送的消息
   *@param数据-子包授予的QoS返回*\ @返回成功代码 \**/

-  DLLExport int ``MQTTSubscribeWithResults``\ (MQTTClient\* client,
   const char\* topicFilter, enum QoS, messageHandler, MQTTSubackData\*
   data);

..

   /*MQTT订阅-发送MQTT取消订阅数据包并等待取消回复,然后再返回。*\ @param
   client-要使用的客户端对象 *@param
   topicFilter-要取消订阅的主题过滤器*\ @返回成功代码 \**/

-  DLLExport int ``MQTTUnsubscribe``\ (MQTTClient\* client, const char\*
   topicFilter);

..

   /*MQTT断开连接-发送MQTT断开连接数据包并关闭连接*\ @param
   client-要使用的客户端对象 \*@返回成功代码 \**/

-  DLLExport int ``MQTTDisconnect``\ (MQTTClient\* client); >
   /*MQTT已连接*\ @param client-要使用的客户端对象
   \*@return真值,指示客户端是否连接到服务器 \**/

-  DLLExport int ``MQTTIsConnected``\ (MQTTClient\* client);

..

   不会下载的\ `点击这里 <https://blog.csdn.net/weixin_44570083/article/details/104285283>`__\ ,进去查看我的\ ``RDA8910 CSDK二次开发入门教程``\ 专题第一篇博文\ ``1、RDA8910CSDK二次开发:环境搭建``\ 里面讲了怎么下载
   这里只是我的学习笔记,拿出来给大家分享,欢迎大家批评指正,本篇教程到此结束

.. |在这里插入图片描述| image:: https://img-blog.csdnimg.cn/20200602104417908.png
.. |image1| image:: https://img-blog.csdnimg.cn/2020060210575885.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NDU3MDA4Mw==,size_16,color_FFFFFF,t_70
.. |image2| image:: https://img-blog.csdnimg.cn/20200602105436132.png
.. |image3| image:: https://img-blog.csdnimg.cn/20200602115220133.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NDU3MDA4Mw==,size_16,color_FFFFFF,t_70
.. |image4| image:: https://img-blog.csdnimg.cn/20200602115324287.png
.. |image5| image:: https://img-blog.csdnimg.cn/20200602115553862.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80NDU3MDA4Mw==,size_16,color_FFFFFF,t_70