ZenZ 1 ano atrás
pai
commit
d8c979d4ec

+ 5 - 1
DataConsumer/TDengine/TDengine.cpp

@@ -1,6 +1,10 @@
 #include "TDengine.h"
 #include <QDebug>
-TDengine::TDengine() {}
+#include "TDengineClient.h"
+TDengine::TDengine() {
+    TDengineClient* tdclient = new TDengineClient();
+    tdclient->start();
+}
 
 void TDengine::Run()
 {

+ 7 - 0
DataConsumer/TDengine/TDengine.pro

@@ -20,6 +20,13 @@ HEADERS += \
     TDengine_global.h \
     TDengine.h
 
+TDENGINE_DIR =$$PWD/../../thirdparty/tdengine
+LIBS += -L$${TDENGINE_DIR}/lib -ltaos
+INCLUDEPATH += $${TDENGINE_DIR}/include
+INCLUDEPATH += $$PWD/../../modules/TDengineClient
+LIBS +=-L$$PWD/../../bin/plugins -lTDengineClient
+
+
 # Default rules for deployment.
 unix {
     target.path = /usr/lib

+ 10 - 0
DataManagerMain/DataManagerMain.pro

@@ -29,6 +29,16 @@ SOURCES += \
         DataSubscribe.cpp \
         main.cpp
 
+HIREDIS_DIR =$$PWD/../thirdparty/hiredis
+HEADERS += $${HIREDIS_DIR}/include/hiredis/adapters/qt.h
+
+LIBS += -L$${HIREDIS_DIR}/lib -lhiredis
+#LIBS += ../thirdparty/hiredis/lib/libhiredis.a
+INCLUDEPATH += $${HIREDIS_DIR}/include
+
+INCLUDEPATH += $$PWD/../modules/RedisClient
+LIBS +=-L$$PWD/../bin/plugins -lRedisClient
+
 TRANSLATIONS += \
     DataManager_zh_CN.ts
 CONFIG += lrelease

+ 42 - 1
DataManagerMain/DataManagerProxy.cpp

@@ -1,5 +1,5 @@
 #include "DataManagerProxy.h"
-
+#include "RedisClient.h"
 DataManagerProxy::DataManagerProxy() {
 
 }
@@ -35,7 +35,48 @@ DataManagerInfo DataManagerProxy::loadModuleInfos(int id, std::string app)
     objDevice.ModuleInfo = objModule;
     objDevice.Properties.push_back(objDataItem);
 */
+    RedisClient* redis = new RedisClient();
+
+    /*DeviceInfo di;
+    if( redis != nullptr)
+    {
+        ModuleInfo mi;
+        mi.Code = "alarm";
+        mi.AssemblyName = app;
+
+        DataItem oDT;
+        oDT.Code = "data";
+        mi.Properties.push_back(oDT);
+        QStringList lst = redis->hvals(id.c_str());
+        foreach (QString str, lst)
+        {
+            if( str.isEmpty() )
+            {
+                continue;
+            }
+
+            AlarmRuler ar;
+            if( parse(str,ar) )
+            {
+                QString szJson;
+                szJson += "{";
+                szJson += QString("\"Operator\":\"%1\",").arg(ar.Operator.c_str());
+                szJson += QString("\"Operator1\":%1,").arg(ar.dbOperator1);
+                szJson += QString("\"Operator2\":%1,").arg(ar.dbOperator2);
+                szJson += QString("\"KeepTimes\":%1").arg(ar.nKeepTimes);
+                szJson += "}";
+                Setting oSet;
+                oSet.Name = ar.NodeCode;
+                oSet.qValue = szJson;
+
+                mi.vSettings.push_back(oSet);
+            }
+        }
+        di.ModuleInfo = mi;
+    }
 
+    return di;
+    */
     ModuleInfo mi;
     mi.Name = "RedisSubscriber";
 

+ 4 - 1
DataManater.pro

@@ -4,9 +4,12 @@ CONFIG += ordered
 SUBDIRS += \
     #DataConsumer \
     #DataSubscribe \
+    modules/RedisClient \
+    modules/TDengineClient \
     DataConsumer/TDengine \
     DataManagerMain/DataManagerMain.pro \
-    DataSubscribe/RedisSubscriber
+    DataSubscribe/RedisSubscriber   \
+    DataSubscribe/TDengineSubscriber
     
 
 

+ 7 - 4
DataSubscribe/RedisSubscriber/RedisSubscriber.cpp

@@ -47,16 +47,19 @@ static void fnRedisCallback(redisAsyncContext* ctx, void* r, void* data){
 
 void RedisSubscriber::enqueue(const QString &key, const QString &val)
 {
+    emit pubData("redis", key, val);
+    /*
     mutex.lock();
     msgQueue.push_back({key, val});
     cond.wakeAll();
     mutex.unlock();
+    */
 }
 
 void RedisSubscriber::Run()
 {
     //subscribe redis topic
-    redis = new Redis();
+    redis = new RedisClient();
     redis->start();
     redis->subscribe(REDISTOPIC, fnRedisCallback, this);
     start();
@@ -71,7 +74,7 @@ void RedisSubscriber::run(){
     QThread::msleep(1000);
     //redis message emit to onData signal.
     while(!isInterruptionRequested()){
-        QString usr = "Redis";
+        /*QString usr = "Redis";
         QString key = "key";
         QVariant val = "val";
         Msg msg;
@@ -84,14 +87,14 @@ void RedisSubscriber::run(){
         };
 
         mutex.unlock();
-
+        */
         /*if(!key.isEmpty()){
             emit pubData(usr, key, val);
 
             key.clear();
         }*/
 
-        //QThread::msleep(1000);
+        QThread::msleep(1000);
     }
 }
 

+ 2 - 2
DataSubscribe/RedisSubscriber/RedisSubscriber.h

@@ -1,7 +1,7 @@
 #pragma once
 #include "RedisSubscriber_global.h"
 #include "Publisher.h"
-#include "Redis.h"
+#include "RedisClient.h"
 #include <QQueue>
 #include <QMutex>
 #include <QWaitCondition>
@@ -23,7 +23,7 @@ protected:
 signals:
     void pubData(const QString& ,const QString& ,const QVariant&);
 private:
-    Redis* redis;
+    RedisClient* redis;
     QMutex mutex;
     QWaitCondition cond;
     QQueue<Msg> msgQueue;

+ 5 - 3
DataSubscribe/RedisSubscriber/RedisSubscriber.pro

@@ -13,8 +13,8 @@ INCLUDEPATH += ../../include
 
 HEADERS += \
     ../../include/LibraryLoader.h \
-    ../../include/Publisher.h \
-    Redis.h
+    ../../include/Publisher.h
+    #Redis.h
 
 HIREDIS_DIR =$$PWD/../../thirdparty/hiredis
 HEADERS += $${HIREDIS_DIR}/include/hiredis/adapters/qt.h
@@ -24,13 +24,15 @@ LIBS += -L$${HIREDIS_DIR}/lib -lhiredis
 INCLUDEPATH += $${HIREDIS_DIR}/include
 
 SOURCES += \
-    Redis.cpp \
+    #Redis.cpp \
     RedisSubscriber.cpp
 
 HEADERS += \
     RedisSubscriber_global.h \
     RedisSubscriber.h
 
+INCLUDEPATH += $$PWD/../../modules/RedisClient
+LIBS +=-L$$PWD/../../bin/plugins -lRedisClient
 
 # Default rules for deployment.
 unix {

+ 80 - 0
DataSubscribe/TDengineSubscriber/TDengineSubscriber.cpp

@@ -0,0 +1,80 @@
+#include "TDengineSubscriber.h"
+#include <QDebug>
+TDengineSubscriber::TDengineSubscriber() {
+
+}
+
+
+#define REDISTOPIC "test"
+
+static void fnRedisCallback(const char* topic, const char* data, void* usr){
+
+    TDengineSubscriber* subscriber = static_cast<TDengineSubscriber*>(usr);
+    subscriber->enqueue(topic, data);
+}
+
+void TDengineSubscriber::enqueue(const QString &key, const QString &val)
+{
+    emit pubData("tdengine", key, val);
+    /*mutex.lock();
+    msgQueue.push_back({key, val});
+    cond.wakeAll();
+    mutex.unlock();*/
+}
+
+void TDengineSubscriber::Run()
+{
+    //subscribe redis topic
+    TDengine = new TDengineClient();
+    TDengine->start();
+    TDengine->subscribe(REDISTOPIC, fnRedisCallback, this);
+    start();
+}
+
+void TDengineSubscriber::setLoader(QLibrary *)
+{
+
+}
+
+void TDengineSubscriber::run(){
+    QThread::msleep(1000);
+    //redis message emit to onData signal.
+    while(!isInterruptionRequested()){
+        /*QString usr = "Redis";
+        QString key = "key";
+        QVariant val = "val";
+        Msg msg;
+        mutex.lock();
+        if(cond.wait(&mutex), 500)
+        {
+            msg = msgQueue.dequeue();
+            emit pubData(usr, msg.key, msg.val);
+            qDebug() << __FILE__ << __FUNCTION__;
+        }
+
+        mutex.unlock();
+        */
+        /*if(!key.isEmpty()){
+            emit pubData(usr, key, val);
+
+            key.clear();
+        }*/
+
+        QThread::msleep(1000);
+    }
+}
+
+
+Publisher* instance()
+{
+    return new TDengineSubscriber();
+}
+
+void destroy(Publisher* pInstance)
+{
+    if( pInstance )
+    {
+        delete pInstance;
+    }
+}
+

+ 36 - 0
DataSubscribe/TDengineSubscriber/TDengineSubscriber.h

@@ -0,0 +1,36 @@
+#pragma once
+#include "TDengineSubscriber_global.h"
+#include "Publisher.h"
+#include "TDengineClient.h"
+#include <QQueue>
+#include <QMutex>
+#include <QWaitCondition>
+struct Msg{
+    QString key;
+    QString val;
+};
+class TDENGINESUBSCRIBER_EXPORT TDengineSubscriber : public Publisher
+{
+    Q_OBJECT
+public:
+    TDengineSubscriber();
+    // virtual void shares(SharedData * share);
+    virtual void Run();
+    virtual void setLoader(QLibrary*);
+    void enqueue(const QString& key,const QString& val);
+protected:
+    virtual void run();
+signals:
+    void pubData(const QString& ,const QString& ,const QVariant&);
+private:
+    TDengineClient* TDengine;
+    QMutex mutex;
+    QWaitCondition cond;
+    QQueue<Msg> msgQueue;
+};
+
+
+extern "C" {//一定要添加上
+TDENGINESUBSCRIBER_EXPORT Publisher* instance();
+TDENGINESUBSCRIBER_EXPORT void destroy(Publisher*);
+}

+ 40 - 0
DataSubscribe/TDengineSubscriber/TDengineSubscriber.pro

@@ -0,0 +1,40 @@
+QT -= gui
+
+TEMPLATE = lib
+DEFINES += TDENGINESUBSCRIBER_LIBRARY
+
+CONFIG += c++17
+
+# You can make your code fail to compile if it uses deprecated APIs.
+# In order to do so, uncomment the following line.
+#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0
+
+INCLUDEPATH += ../../include
+
+HEADERS += \
+    ../../include/LibraryLoader.h \
+    ../../include/Publisher.h
+    #Redis.h
+
+SOURCES += \
+    TDengineSubscriber.cpp
+    #Redis.cpp \
+
+HEADERS += \
+    TDengineSubscriber.h \
+    TDengineSubscriber_global.h
+
+TDENGINE_DIR =$$PWD/../../thirdparty/tdengine
+LIBS += -L$${TDENGINE_DIR}/lib -ltaos
+INCLUDEPATH += $${TDENGINE_DIR}/include
+INCLUDEPATH += $$PWD/../../modules/TDengineClient
+LIBS +=-L$$PWD/../../bin/plugins -lTDengineClient
+
+
+# Default rules for deployment.
+unix {
+    target.path = /usr/lib
+}else{
+    DESTDIR = $$PWD/../../bin/plugins
+}
+!isEmpty(target.path): INSTALLS += target

+ 12 - 0
DataSubscribe/TDengineSubscriber/TDengineSubscriber_global.h

@@ -0,0 +1,12 @@
+#ifndef TDENGINESUBSCRIBER_GLOBAL_H
+#define TDENGINESUBSCRIBER_GLOBAL_H
+
+#include <QtCore/qglobal.h>
+
+#if defined(TDENGINESUBSCRIBER_LIBRARY)
+#define TDENGINESUBSCRIBER_EXPORT Q_DECL_EXPORT
+#else
+#define TDENGINESUBSCRIBER_EXPORT Q_DECL_IMPORT
+#endif
+
+#endif // TDENGINESUBSCRIBER_GLOBAL_H

+ 23 - 23
DataSubscribe/RedisSubscriber/Redis.cpp → modules/RedisClient/RedisClient.cpp

@@ -1,4 +1,4 @@
-#include "redis.h"
+#include "RedisClient.h"
 #include <QThread>
 #include <QTimer>
 
@@ -103,19 +103,19 @@ void disconnectCallback(const redisAsyncContext *c, int status) {
     qDebug("Disconnected...\n");
 }
 
-Redis::Redis(QObject *parent) : QObject(parent)
+RedisClient::RedisClient(QObject *parent) : QObject(parent)
 {
     redisOk = false;
 //    conn();
 }
 
-Redis::~Redis()
+RedisClient::~RedisClient()
 {
     delete rc;
     delete rac;
 }
 
-void Redis::conn()
+void RedisClient::conn()
 {
     if (!redisOk) //未连接
     {
@@ -147,7 +147,7 @@ void Redis::conn()
     }
 }
 
-void Redis::start()
+void RedisClient::start()
 {
     conn();
     //开启一个定时器, 每秒检测一次是否断开, 断开后自动连接redis服务器
@@ -156,7 +156,7 @@ void Redis::start()
     redisTimer->start(1000);
 }
 
-bool Redis::hset(QString m, QString k, QString v)
+bool RedisClient::hset(QString m, QString k, QString v)
 {
     //如果 field 是哈希表中的一个新建域,并且值设置成功,reply->integer为1
     //如果哈希表中域 field 已经存在且旧值已被新值覆盖,reply->integer为0
@@ -176,7 +176,7 @@ bool Redis::hset(QString m, QString k, QString v)
     return ret;
 }
 
-QString Redis::hget(QString m, QString k)
+QString RedisClient::hget(QString m, QString k)
 {
     QString ret;
     redisReply * reply = (redisReply*)redisCommand(rc, QString("hget " + m + " " + k).toStdString().c_str());
@@ -198,7 +198,7 @@ QString Redis::hget(QString m, QString k)
 }
 
 //向队列尾(右)部加入字符串数据
-bool Redis::rpush(QString lData, QString js)
+bool RedisClient::rpush(QString lData, QString js)
 {
     bool ret = false;
     redisReply * reply = (redisReply*)redisCommand(rc, QString("rpush " + lData + " " + js).toStdString().c_str());
@@ -220,7 +220,7 @@ bool Redis::rpush(QString lData, QString js)
 }
 
 //向队列尾(右)部加入二进制数据
-bool Redis::rpushb(QString lData, QByteArray ba)
+bool RedisClient::rpushb(QString lData, QByteArray ba)
 {
     bool ret = false;
     const char *arg[3]; //3个参数(cmd, k, v)
@@ -253,7 +253,7 @@ bool Redis::rpushb(QString lData, QByteArray ba)
     return ret;
 }
 
-QString Redis::lpop(QString lData)
+QString RedisClient::lpop(QString lData)
 {
     QString ret;
     redisReply * reply = (redisReply*)redisCommand(rc, QString("lpop " + lData).toStdString().c_str());
@@ -270,7 +270,7 @@ QString Redis::lpop(QString lData)
 }
 
 //阻塞从队列头部获取最早数据
-bool Redis::blpop(QString lData, redisCallbackFn *fn)
+bool RedisClient::blpop(QString lData, redisCallbackFn *fn)
 {
     if (rac == nullptr)
     {
@@ -304,7 +304,7 @@ bool Redis::blpop(QString lData, redisCallbackFn *fn)
 }
 
 //同步阻塞一定时间返回数据
-QString Redis::blpop(QString lData, quint32 timeout)
+QString RedisClient::blpop(QString lData, quint32 timeout)
 {
     //最小为1,防止0时永久阻塞
     if (timeout < 1)
@@ -328,7 +328,7 @@ QString Redis::blpop(QString lData, quint32 timeout)
     return ret;
 }
 
-bool Redis::set(QString k, QString v)
+bool RedisClient::set(QString k, QString v)
 {
     bool ret = false;
     redisReply * reply = (redisReply*)redisCommand(rc, QString("set " + k + " " + v).toStdString().c_str()); //支持空格
@@ -350,7 +350,7 @@ bool Redis::set(QString k, QString v)
     return ret;
 }
 
-bool Redis::setb(QString k, QByteArray &v)
+bool RedisClient::setb(QString k, QByteArray &v)
 {
     bool ret = false;
     const char *arg[3]; //3个参数(cmd, k, v)
@@ -383,7 +383,7 @@ bool Redis::setb(QString k, QByteArray &v)
     return ret;
 }
 
-QString Redis::get(QString k)
+QString RedisClient::get(QString k)
 {
     QString ret;
     redisReply * reply = (redisReply*)redisCommand(rc, QString("get " + k).toStdString().c_str());
@@ -399,7 +399,7 @@ QString Redis::get(QString k)
     return ret;
 }
 
-QByteArray Redis::getb(QString k)
+QByteArray RedisClient::getb(QString k)
 {
     QByteArray ret;
     redisReply * reply = (redisReply*)redisCommand(rc, QString("get " + k).toStdString().c_str());
@@ -416,7 +416,7 @@ QByteArray Redis::getb(QString k)
     return ret;
 }
 
-bool Redis::expire(QString k,int sec)
+bool RedisClient::expire(QString k,int sec)
 {
     bool ret = false;
     redisReply * reply = (redisReply*)redisCommand(rc, QString("expire " + k + " " + QString::number(sec, 10)).toStdString().c_str());
@@ -437,7 +437,7 @@ bool Redis::expire(QString k,int sec)
     return ret;
 }
 
-bool Redis::publish(QString ch, QString js)
+bool RedisClient::publish(const QString& ch, const QString& js)
 {
     bool ret = false;
     QString pub = QString("publish " + ch + " " + js);
@@ -458,7 +458,7 @@ bool Redis::publish(QString ch, QString js)
     return ret;
 }
 
-bool Redis::publishb(QString ch, QByteArray ba)
+bool RedisClient::publishb(const QString& ch,const QByteArray& ba)
 {
     bool ret = false;
     const char *arg[3]; //3个参数(cmd, k, v)
@@ -488,7 +488,7 @@ bool Redis::publishb(QString ch, QByteArray ba)
     return ret;
 }
 
-void Redis::subscribe(QString ch, redisCallbackFn *fn, void* data)
+void RedisClient::subscribe(QString ch, redisCallbackFn *fn, void* data)
 {
     if (rac == nullptr)
     {
@@ -517,7 +517,7 @@ void Redis::subscribe(QString ch, redisCallbackFn *fn, void* data)
     }
 }
 
-void Redis::psubscribe(QString ch, redisCallbackFn *fn, void* data)
+void RedisClient::psubscribe(QString ch, redisCallbackFn *fn, void* data)
 {
     if (rac == nullptr)
     {
@@ -547,10 +547,10 @@ void Redis::psubscribe(QString ch, redisCallbackFn *fn, void* data)
     }
 }
 
-void Redis::Setup(std::string addr, uint port, std::string password)
+void RedisClient::Setup(std::string addr, uint _port, std::string password)
 {
     ip = addr.c_str();
-    port = port;
+    port = _port;
     auth = password.c_str();
 
     redisOk = false;

+ 6 - 5
DataSubscribe/RedisSubscriber/Redis.h → modules/RedisClient/RedisClient.h

@@ -7,13 +7,14 @@
 #include <QDebug>
 #include <QJsonObject>
 #include <QJsonDocument>
+#include "RedisClient_global.h"
 
-class Redis : public QObject
+class REDISCLIENT_EXPORT RedisClient : public QObject
 {
     Q_OBJECT
 public:
-    explicit Redis(QObject *parent = 0);
-    ~Redis();
+    explicit RedisClient(QObject *parent = 0);
+    ~RedisClient();
 
 public slots:
     void start();
@@ -29,8 +30,8 @@ public slots:
     QString lpop(QString lData);
     QString blpop(QString lData, quint32 timeout);          //同步阻塞一定时间返回数据
     bool blpop(QString lData, redisCallbackFn *fn);         //阻塞从队列头部获取最早数据
-    bool publish(QString ch, QString js);
-    bool publishb(QString ch, QByteArray ba);
+    bool publish(const QString& ch, const QString& js);
+    bool publishb(const QString& ch, const QByteArray& ba);
     bool expire(QString k,int sec);
     void subscribe(QString ch, redisCallbackFn *fn, void* data = nullptr);        // 订阅
     void psubscribe(QString ch, redisCallbackFn *fn, void* priData = nullptr);       // 订阅:模式匹配

+ 38 - 0
modules/RedisClient/RedisClient.pro

@@ -0,0 +1,38 @@
+QT -= gui
+
+TEMPLATE = lib
+DEFINES += REDISCLIENT_LIBRARY
+
+CONFIG += c++17
+
+# You can make your code fail to compile if it uses deprecated APIs.
+# In order to do so, uncomment the following line.
+#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0
+
+INCLUDEPATH += ../../include
+
+HEADERS += \
+    RedisClient.h
+
+HIREDIS_DIR =$$PWD/../../thirdparty/hiredis
+HEADERS += $${HIREDIS_DIR}/include/hiredis/adapters/qt.h
+
+LIBS += -L$${HIREDIS_DIR}/lib -lhiredis
+#LIBS += ../thirdparty/hiredis/lib/libhiredis.a
+INCLUDEPATH += $${HIREDIS_DIR}/include
+
+SOURCES += \
+    RedisClient.cpp
+
+
+HEADERS += \
+    RedisClient_global.h
+
+
+# Default rules for deployment.
+unix {
+    target.path = /usr/lib
+}else{
+    DESTDIR = $$PWD/../../bin/plugins
+}
+!isEmpty(target.path): INSTALLS += target

+ 9 - 0
modules/RedisClient/RedisClient_global.h

@@ -0,0 +1,9 @@
+#pragma once
+#include <QtCore/qglobal.h>
+
+#if defined(REDISCLIENT_LIBRARY)
+#define REDISCLIENT_EXPORT Q_DECL_EXPORT
+#else
+#define REDISCLIENT_EXPORT Q_DECL_IMPORT
+#endif
+

+ 246 - 0
modules/TDengineClient/TDengineClient.cpp

@@ -0,0 +1,246 @@
+#include "TDengineClient.h"
+#include "taos.h"
+#include <QtCore/QDebug>
+#include <QtConcurrent/QtConcurrent>
+
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include "taos.h"
+
+
+
+int32_t TDengineClient::msgProcess(TAOS_RES* msg)
+{
+    char    buf[1024];
+    int32_t rows = 0;
+
+    const char* topicName = tmq_get_topic_name(msg);
+    const char* dbName = tmq_get_db_name(msg);
+    int32_t     vgroupId = tmq_get_vgroup_id(msg);
+
+    //qDebug() << __FILE__ << __LINE__ << "db: " << dbName;
+    //qDebug() << __FILE__ << __LINE__ << "topic: " << topicName;
+    //qDebug() << __FILE__ << __LINE__ << "vgroup id:" << vgroupId;
+
+    while (1)
+    {
+        TAOS_ROW row = taos_fetch_row(msg);
+        if (row == NULL)
+            break;
+
+        TAOS_FIELD* fields = taos_fetch_fields(msg);
+        int32_t     numOfFields = taos_field_count(msg);
+        //        int32_t*    length = taos_fetch_lengths(msg);
+        //        int32_t     precision = taos_result_precision(msg);
+        rows++;
+        taos_print_row(buf, row, fields, numOfFields);
+        //if( g_pSubCB != nullptr )
+        {
+            std::string topic = topicName;
+            std::string content = buf;
+            callback((char*)topic.c_str(),(char*)content.c_str(), usrData);
+        }
+        //qDebug() << __FILE__ << __LINE__ << "row content: " << buf;
+    }
+
+    return rows;
+}
+
+//构建消费者
+tmq_t* TDengineClient::buildConsumer()
+{
+    tmq_conf_res_t code;
+    tmq_conf_t*    conf = tmq_conf_new();
+
+    code = tmq_conf_set(conf, "td.connect.ip", host.toStdString().c_str());
+    if (TMQ_CONF_OK != code) {
+        tmq_conf_destroy(conf);
+        return NULL;
+    }
+
+    //code = tmq_conf_set(conf, "enable.auto.commit", "true");
+    code = tmq_conf_set(conf, "enable.auto.commit", "false"); //禁用提交回调
+    if (TMQ_CONF_OK != code) {
+        tmq_conf_destroy(conf);
+        return NULL;
+    }
+
+    //    code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
+    //    if (TMQ_CONF_OK != code) {
+    //        tmq_conf_destroy(conf);
+    //        return NULL;
+    //    }
+
+    code = tmq_conf_set(conf, "group.id", "cgrpName");
+    if (TMQ_CONF_OK != code) {
+        tmq_conf_destroy(conf);
+        return NULL;
+    }
+
+    code = tmq_conf_set(conf, "client.id", "user defined name");
+    if (TMQ_CONF_OK != code) {
+        tmq_conf_destroy(conf);
+        return NULL;
+    }
+
+    code = tmq_conf_set(conf, "td.connect.user", user.toStdString().c_str());
+    if (TMQ_CONF_OK != code) {
+        tmq_conf_destroy(conf);
+        return NULL;
+    }
+
+    code = tmq_conf_set(conf, "td.connect.pass", password.toStdString().c_str());
+    if (TMQ_CONF_OK != code) {
+        tmq_conf_destroy(conf);
+        return NULL;
+    }
+
+    //v3.2以后默认为最新的latest
+//    code = tmq_conf_set(conf, "auto.offset.reset", "latest");
+//    if (TMQ_CONF_OK != code) {
+//        tmq_conf_destroy(conf);
+//        return NULL;
+//    }
+
+    //tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); //禁用提交回调
+
+    tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
+    //qDebug()<< __FILE__ << __LINE__ << "tmq" << tmq;
+    tmq_conf_destroy(conf);
+    return tmq;
+}
+
+//构建主题
+tmq_list_t* TDengineClient::buildTopicList()
+{
+    tmq_list_t* tmqList = tmq_list_new();
+
+    std::list<std::string>::iterator itr;
+    for( itr = topicList.begin(); itr != topicList.end(); ++itr )
+    {
+        std::string szTopic = *itr;
+        int32_t code = tmq_list_append(tmqList, szTopic.c_str());
+        if (code) {
+            return NULL;
+        }
+    }
+    return tmqList;
+}
+
+//轮询主题
+void TDengineClient::topicLoop()
+{
+    int32_t code;
+    tmq_t* tmq = buildConsumer();
+    if (NULL == tmq) {
+        qDebug()<< __FILE__ << __LINE__ << "err" << stderr;
+        return;
+    }
+
+    tmq_list_t* topic_list = buildTopicList();
+    code = tmq_subscribe(tmq, topic_list);
+    if ( code ) {
+        qDebug() << __FILE__ << __LINE__ << "Failed to tmq_subscribe(): " << tmq_err2str(code);
+    }
+
+    tmq_list_destroy(topic_list);
+
+    int32_t totalRows = 0;
+    int32_t msgCnt = 0;
+    int32_t timeout = 100; //多个主题共用1个线程轮询,超时时间太长会影响其它主题实时性
+    while (true) {
+        TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
+        if (tmqmsg) {
+            msgCnt++;
+            totalRows += msgProcess(tmqmsg);
+            taos_free_result(tmqmsg);
+        } else {
+            //超时
+            //break;
+        }
+    }
+
+    code = tmq_consumer_close(tmq);
+    if (code) {
+        fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
+    } else {
+        fprintf(stderr, "%% Consumer closed\n");
+    }
+    qDebug()<< __FILE__ << __LINE__ << "stderr" << stderr;
+}
+
+TDengineClient::TDengineClient(QObject *parent)
+    : QObject(parent)
+{
+    pConn = nullptr;
+    //g_pSubCB = nullptr;
+    topicList.clear();
+}
+
+TDengineClient::~TDengineClient()
+{
+    taos_close(pConn);
+}
+
+void TDengineClient::exec(QString sql)
+{
+    if (pConn)
+    {
+        TAOS_RES* pRes = taos_query(pConn, sql.toStdString().c_str());
+        if (taos_errno(pRes) != 0) {
+            qDebug() << __FILE__ << __LINE__ << "failed to insert into tab, reason:" << taos_errstr(pRes);
+            qDebug()<< __FILE__ << __LINE__ << "err sql" << sql;
+        }
+        else
+        {
+            //qDebug()<< "exec ok";
+        }
+        taos_free_result(pRes);
+    }
+}
+
+void TDengineClient::subscribe(QString ch, std::function<void(const char* topic, const char* data, void*usr)> fn , void* usrdata)
+{
+    callback = fn;
+    usrData = usrdata;
+    topicList.push_back(ch.toLocal8Bit().toStdString());
+}
+
+void TDengineClient::psubscribe(QString ch, std::function<void(const char* topic, const char* data, void*usr)> fn, void*usrdata)
+{
+    callback = fn;
+    usrData = usrdata;
+    topicList.push_back(ch.toLocal8Bit().toStdString());
+}
+
+void TDengineClient::Setup(const char* _host, const char* _user, const char* _passwd, uint _port)
+{
+    // host = ts.addr.c_str();
+    // user = ts.user.c_str();
+    // dbName = ts.db.c_str();
+    // password = ts.password.c_str();
+    // port = ts.port;
+    host = _host;
+    user = _user;
+    password = _passwd;
+    port = _port;
+}
+
+void TDengineClient::start()
+{
+    pConn = taos_connect(host.toStdString().c_str(),
+                         user.toStdString().c_str(),
+                         password.toStdString().c_str(),
+                         dbName.toStdString().c_str(),
+                         port);
+    if (pConn == NULL) {
+        qDebug()<< __FILE__ << __LINE__ << "td conn err.";
+        return;
+    }
+
+    //开启一个线程轮询订阅的主题
+    QtConcurrent::run(this, &TDengineClient::topicLoop);
+}

+ 52 - 0
modules/TDengineClient/TDengineClient.h

@@ -0,0 +1,52 @@
+#pragma once
+
+//#include "MWareInterface.h"
+#include "taos.h"
+#include <QtCore/QObject>
+#include <QtCore/QDebug>
+#include "libaray_symbols.h"
+#include <functional>
+class TDENGINECLIENT_EXPORT TDengineClient : public QObject
+{
+    Q_OBJECT
+
+public:
+    explicit TDengineClient(QObject *parent = 0);
+    TDengineClient(QString password,
+             std::function<void(const char *, const char *, void*)> callback)
+        : password(std::move(password)), callback(std::move(callback)) {}
+    ~TDengineClient();
+
+    void exec(QString sql);
+
+public:
+    int32_t msgProcess(TAOS_RES* msg);  //消息处理
+    tmq_t* buildConsumer();             //构建消费者
+    tmq_list_t* buildTopicList();       //构建主题
+    void topicLoop();                   //轮询主题
+
+signals:
+public slots:
+
+private:
+    QString host = "192.168.9.6";
+    uint    port = 6041;
+    QString user = "root";
+    QString dbName = "lanpengdb";
+    QString password = "x=gheLw7QMAD4zjQh3d9";
+
+    TAOS* pConn = NULL;
+    std::function<void(const char* topic, const char* data, void*usr)> callback;
+
+    //TAOS* pConn = NULL;
+    //EventSubInterface*  g_pSubCB;       // 订阅回调
+    std::list<std::string>  topicList;
+
+    void * usrData;
+public:
+    //void Setup(tagSetup ts);
+    void Setup(const char* host, const char* user, const char* passwd, uint port);
+    void subscribe(QString ch, std::function<void(const char* topic, const char* data, void*usr)> fn, void* usrdata);      // 订阅
+    void psubscribe(QString ch, std::function<void(const char* topic, const char* data, void*usr)> fn, void*usrdata);     // 订阅:模式匹配
+    void start();
+};

+ 31 - 0
modules/TDengineClient/TDengineClient.pro

@@ -0,0 +1,31 @@
+QT = core network
+QT -= gui
+
+CONFIG += c++17 cmdline
+
+TARGET = TDengineClient
+TEMPLATE = lib
+
+unix{
+}
+else{
+     DESTDIR = $$PWD/../../bin/plugins
+}
+
+DEFINES += TDENGINECLIENT_LIBRARY
+
+INCLUDEPATH += ../include
+
+TDENGINE_DIR =$$PWD/../../thirdparty/tdengine
+
+LIBS += -L$${TDENGINE_DIR}/lib -ltaos
+#LIBS += ../thirdparty/hiredis/lib/libhiredis.a
+INCLUDEPATH += $${TDENGINE_DIR}/include
+
+
+HEADERS += \
+    TDengineClient.h \
+    libaray_symbols.h
+
+SOURCES += \
+    TDengineClient.cpp

+ 9 - 0
modules/TDengineClient/libaray_symbols.h

@@ -0,0 +1,9 @@
+#pragma once
+#include <QtCore/QGlobal.h>
+
+#if defined(TDENGINECLIENT_LIBRARY)
+#  define TDENGINECLIENT_EXPORT Q_DECL_EXPORT
+#else
+#  define TDENGINECLIENT_EXPORT Q_DECL_IMPORT
+#endif
+

BIN
modules/lib/TDengineClient.dll


BIN
modules/lib/libTDengineClient.a


+ 362 - 0
thirdparty/tdengine/include/taos.h

@@ -0,0 +1,362 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef TDENGINE_TAOS_H
+#define TDENGINE_TAOS_H
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef void   TAOS;
+typedef void   TAOS_STMT;
+typedef void   TAOS_RES;
+typedef void **TAOS_ROW;
+typedef void   TAOS_SUB;
+
+// Data type definition
+#define TSDB_DATA_TYPE_NULL       0   // 1 bytes
+#define TSDB_DATA_TYPE_BOOL       1   // 1 bytes
+#define TSDB_DATA_TYPE_TINYINT    2   // 1 byte
+#define TSDB_DATA_TYPE_SMALLINT   3   // 2 bytes
+#define TSDB_DATA_TYPE_INT        4   // 4 bytes
+#define TSDB_DATA_TYPE_BIGINT     5   // 8 bytes
+#define TSDB_DATA_TYPE_FLOAT      6   // 4 bytes
+#define TSDB_DATA_TYPE_DOUBLE     7   // 8 bytes
+#define TSDB_DATA_TYPE_VARCHAR    8   // string, alias for varchar
+#define TSDB_DATA_TYPE_TIMESTAMP  9   // 8 bytes
+#define TSDB_DATA_TYPE_NCHAR      10  // unicode string
+#define TSDB_DATA_TYPE_UTINYINT   11  // 1 byte
+#define TSDB_DATA_TYPE_USMALLINT  12  // 2 bytes
+#define TSDB_DATA_TYPE_UINT       13  // 4 bytes
+#define TSDB_DATA_TYPE_UBIGINT    14  // 8 bytes
+#define TSDB_DATA_TYPE_JSON       15  // json string
+#define TSDB_DATA_TYPE_VARBINARY  16  // binary
+#define TSDB_DATA_TYPE_DECIMAL    17  // decimal
+#define TSDB_DATA_TYPE_BLOB       18  // binary
+#define TSDB_DATA_TYPE_MEDIUMBLOB 19
+#define TSDB_DATA_TYPE_BINARY     TSDB_DATA_TYPE_VARCHAR  // string
+#define TSDB_DATA_TYPE_GEOMETRY   20  // geometry
+#define TSDB_DATA_TYPE_MAX        21
+
+typedef enum {
+  TSDB_OPTION_LOCALE,
+  TSDB_OPTION_CHARSET,
+  TSDB_OPTION_TIMEZONE,
+  TSDB_OPTION_CONFIGDIR,
+  TSDB_OPTION_SHELL_ACTIVITY_TIMER,
+  TSDB_OPTION_USE_ADAPTER,
+  TSDB_MAX_OPTIONS
+} TSDB_OPTION;
+
+typedef enum {
+  TSDB_SML_UNKNOWN_PROTOCOL = 0,
+  TSDB_SML_LINE_PROTOCOL = 1,
+  TSDB_SML_TELNET_PROTOCOL = 2,
+  TSDB_SML_JSON_PROTOCOL = 3,
+} TSDB_SML_PROTOCOL_TYPE;
+
+typedef enum {
+  TSDB_SML_TIMESTAMP_NOT_CONFIGURED = 0,
+  TSDB_SML_TIMESTAMP_HOURS,
+  TSDB_SML_TIMESTAMP_MINUTES,
+  TSDB_SML_TIMESTAMP_SECONDS,
+  TSDB_SML_TIMESTAMP_MILLI_SECONDS,
+  TSDB_SML_TIMESTAMP_MICRO_SECONDS,
+  TSDB_SML_TIMESTAMP_NANO_SECONDS,
+} TSDB_SML_TIMESTAMP_TYPE;
+
+typedef struct taosField {
+  char    name[65];
+  int8_t  type;
+  int32_t bytes;
+} TAOS_FIELD;
+
+typedef struct TAOS_FIELD_E {
+  char    name[65];
+  int8_t  type;
+  uint8_t precision;
+  uint8_t scale;
+  int32_t bytes;
+} TAOS_FIELD_E;
+
+#ifdef WINDOWS
+#define DLL_EXPORT __declspec(dllexport)
+#else
+#define DLL_EXPORT
+#endif
+
+typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *res, int code);
+typedef void (*__taos_notify_fn_t)(void *param, void *ext, int type);
+
+typedef struct TAOS_MULTI_BIND {
+  int       buffer_type;
+  void     *buffer;
+  uintptr_t buffer_length;
+  int32_t  *length;
+  char     *is_null;
+  int       num;
+} TAOS_MULTI_BIND;
+
+typedef enum {
+  SET_CONF_RET_SUCC = 0,
+  SET_CONF_RET_ERR_PART = -1,
+  SET_CONF_RET_ERR_INNER = -2,
+  SET_CONF_RET_ERR_JSON_INVALID = -3,
+  SET_CONF_RET_ERR_JSON_PARSE = -4,
+  SET_CONF_RET_ERR_ONLY_ONCE = -5,
+  SET_CONF_RET_ERR_TOO_LONG = -6
+} SET_CONF_RET_CODE;
+
+typedef enum {
+  TAOS_NOTIFY_PASSVER = 0,
+} TAOS_NOTIFY_TYPE;
+
+#define RET_MSG_LENGTH 1024
+typedef struct setConfRet {
+  SET_CONF_RET_CODE retCode;
+  char              retMsg[RET_MSG_LENGTH];
+} setConfRet;
+
+typedef struct TAOS_VGROUP_HASH_INFO {
+  int32_t  vgId;
+  uint32_t hashBegin;
+  uint32_t hashEnd;
+} TAOS_VGROUP_HASH_INFO;
+
+typedef struct TAOS_DB_ROUTE_INFO {
+  int32_t                routeVersion;
+  int16_t                hashPrefix;
+  int16_t                hashSuffix;
+  int8_t                 hashMethod;
+  int32_t                vgNum;
+  TAOS_VGROUP_HASH_INFO *vgHash;
+} TAOS_DB_ROUTE_INFO;
+
+DLL_EXPORT void       taos_cleanup(void);
+DLL_EXPORT int        taos_options(TSDB_OPTION option, const void *arg, ...);
+DLL_EXPORT setConfRet taos_set_config(const char *config);
+DLL_EXPORT int        taos_init(void);
+DLL_EXPORT TAOS      *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
+DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
+DLL_EXPORT void  taos_close(TAOS *taos);
+
+DLL_EXPORT const char *taos_data_type(int type);
+
+DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
+DLL_EXPORT TAOS_STMT *taos_stmt_init_with_reqid(TAOS *taos, int64_t reqid);
+DLL_EXPORT int        taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
+DLL_EXPORT int        taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
+DLL_EXPORT int        taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
+DLL_EXPORT int        taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
+DLL_EXPORT int        taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
+DLL_EXPORT int        taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
+DLL_EXPORT int        taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields);
+// let stmt to reclaim TAOS_FIELD_E that was allocated by `taos_stmt_get_tag_fields`/`taos_stmt_get_col_fields`
+DLL_EXPORT void taos_stmt_reclaim_fields(TAOS_STMT *stmt, TAOS_FIELD_E *fields);
+
+DLL_EXPORT int       taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
+DLL_EXPORT int       taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
+DLL_EXPORT int       taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
+DLL_EXPORT int       taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
+DLL_EXPORT int       taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
+DLL_EXPORT int       taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
+DLL_EXPORT int       taos_stmt_add_batch(TAOS_STMT *stmt);
+DLL_EXPORT int       taos_stmt_execute(TAOS_STMT *stmt);
+DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
+DLL_EXPORT int       taos_stmt_close(TAOS_STMT *stmt);
+DLL_EXPORT char     *taos_stmt_errstr(TAOS_STMT *stmt);
+DLL_EXPORT int       taos_stmt_affected_rows(TAOS_STMT *stmt);
+DLL_EXPORT int       taos_stmt_affected_rows_once(TAOS_STMT *stmt);
+
+DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
+DLL_EXPORT TAOS_RES *taos_query_with_reqid(TAOS *taos, const char *sql, int64_t reqId);
+
+DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res);
+DLL_EXPORT int      taos_result_precision(TAOS_RES *res);  // get the time precision of result
+DLL_EXPORT void     taos_free_result(TAOS_RES *res);
+DLL_EXPORT void     taos_kill_query(TAOS *taos);
+DLL_EXPORT int      taos_field_count(TAOS_RES *res);
+DLL_EXPORT int      taos_num_fields(TAOS_RES *res);
+DLL_EXPORT int      taos_affected_rows(TAOS_RES *res);
+DLL_EXPORT int64_t  taos_affected_rows64(TAOS_RES *res);
+
+DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res);
+DLL_EXPORT int         taos_select_db(TAOS *taos, const char *db);
+DLL_EXPORT int         taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
+DLL_EXPORT void        taos_stop_query(TAOS_RES *res);
+DLL_EXPORT bool        taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
+DLL_EXPORT bool        taos_is_update_query(TAOS_RES *res);
+DLL_EXPORT int         taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
+DLL_EXPORT int         taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows);
+DLL_EXPORT int         taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData);
+DLL_EXPORT int        *taos_get_column_data_offset(TAOS_RES *res, int columnIndex);
+DLL_EXPORT int         taos_validate_sql(TAOS *taos, const char *sql);
+DLL_EXPORT void        taos_reset_current_db(TAOS *taos);
+
+DLL_EXPORT int      *taos_fetch_lengths(TAOS_RES *res);
+DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
+
+DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
+DLL_EXPORT const char *taos_get_client_info();
+DLL_EXPORT int         taos_get_current_db(TAOS *taos, char *database, int len, int *required);
+
+DLL_EXPORT const char *taos_errstr(TAOS_RES *res);
+DLL_EXPORT int         taos_errno(TAOS_RES *res);
+
+DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
+DLL_EXPORT void taos_query_a_with_reqid(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param, int64_t reqid);
+DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
+DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
+DLL_EXPORT const void *taos_get_raw_block(TAOS_RES *res);
+
+DLL_EXPORT int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInfo);
+DLL_EXPORT int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId);
+DLL_EXPORT int taos_get_tables_vgId(TAOS *taos, const char *db, const char *table[], int tableNum, int *vgId);
+
+DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
+
+// set heart beat thread quit mode , if quicByKill 1 then kill thread else quit from inner
+DLL_EXPORT void taos_set_hb_quit(int8_t quitByKill);
+
+DLL_EXPORT int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type);
+
+/*  --------------------------schemaless INTERFACE------------------------------- */
+
+DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
+DLL_EXPORT TAOS_RES *taos_schemaless_insert_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol,
+                                                       int precision, int64_t reqid);
+DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
+                                                int precision);
+DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows,
+                                                           int protocol, int precision, int64_t reqid);
+DLL_EXPORT TAOS_RES *taos_schemaless_insert_ttl(TAOS *taos, char *lines[], int numLines, int protocol, int precision,
+                                                int32_t ttl);
+DLL_EXPORT TAOS_RES *taos_schemaless_insert_ttl_with_reqid(TAOS *taos, char *lines[], int numLines, int protocol,
+                                                           int precision, int32_t ttl, int64_t reqid);
+DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_ttl(TAOS *taos, char *lines, int len, int32_t *totalRows, int protocol,
+                                                    int precision, int32_t ttl);
+DLL_EXPORT TAOS_RES *taos_schemaless_insert_raw_ttl_with_reqid(TAOS *taos, char *lines, int len, int32_t *totalRows,
+                                                               int protocol, int precision, int32_t ttl, int64_t reqid);
+
+/* --------------------------TMQ INTERFACE------------------------------- */
+
+typedef struct tmq_t      tmq_t;
+typedef struct tmq_conf_t tmq_conf_t;
+typedef struct tmq_list_t tmq_list_t;
+
+typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param));
+
+DLL_EXPORT tmq_list_t *tmq_list_new();
+DLL_EXPORT int32_t     tmq_list_append(tmq_list_t *, const char *);
+DLL_EXPORT void        tmq_list_destroy(tmq_list_t *);
+DLL_EXPORT int32_t     tmq_list_get_size(const tmq_list_t *);
+DLL_EXPORT char      **tmq_list_to_c_array(const tmq_list_t *);
+
+DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
+
+DLL_EXPORT const char *tmq_err2str(int32_t code);
+
+/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
+typedef struct tmq_topic_assignment {
+  int32_t vgId;
+  int64_t currentOffset;
+  int64_t begin;
+  int64_t end;
+} tmq_topic_assignment;
+
+DLL_EXPORT int32_t   tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list);
+DLL_EXPORT int32_t   tmq_unsubscribe(tmq_t *tmq);
+DLL_EXPORT int32_t   tmq_subscription(tmq_t *tmq, tmq_list_t **topics);
+DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout);
+DLL_EXPORT int32_t   tmq_consumer_close(tmq_t *tmq);
+DLL_EXPORT int32_t   tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
+DLL_EXPORT void      tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
+DLL_EXPORT int32_t   tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment,
+                                              int32_t *numOfAssignment);
+DLL_EXPORT void      tmq_free_assignment(tmq_topic_assignment* pAssignment);
+DLL_EXPORT int32_t   tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset);
+
+/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
+
+enum tmq_conf_res_t {
+  TMQ_CONF_UNKNOWN = -2,
+  TMQ_CONF_INVALID = -1,
+  TMQ_CONF_OK = 0,
+};
+
+typedef enum tmq_conf_res_t tmq_conf_res_t;
+
+DLL_EXPORT tmq_conf_t    *tmq_conf_new();
+DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
+DLL_EXPORT void           tmq_conf_destroy(tmq_conf_t *conf);
+DLL_EXPORT void           tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param);
+
+/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
+
+DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
+DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
+DLL_EXPORT int32_t     tmq_get_vgroup_id(TAOS_RES *res);
+DLL_EXPORT int64_t     tmq_get_vgroup_offset(TAOS_RES* res);
+
+/* ------------------------------ TAOSX -----------------------------------*/
+// note: following apis are unstable
+enum tmq_res_t {
+  TMQ_RES_INVALID = -1,
+  TMQ_RES_DATA = 1,
+  TMQ_RES_TABLE_META = 2,
+  TMQ_RES_METADATA = 3,
+};
+
+typedef struct tmq_raw_data {
+  void    *raw;
+  uint32_t raw_len;
+  uint16_t raw_type;
+} tmq_raw_data;
+
+typedef enum tmq_res_t tmq_res_t;
+
+DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
+DLL_EXPORT tmq_res_t   tmq_get_res_type(TAOS_RES *res);
+DLL_EXPORT int32_t     tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
+DLL_EXPORT int32_t     tmq_write_raw(TAOS *taos, tmq_raw_data raw);
+DLL_EXPORT int         taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname);
+DLL_EXPORT int         taos_write_raw_block_with_fields(TAOS *taos, int rows, char *pData, const char *tbname,
+                                                        TAOS_FIELD *fields, int numFields);
+DLL_EXPORT void        tmq_free_raw(tmq_raw_data raw);
+// Returning null means error. Returned result need to be freed by tmq_free_json_meta
+DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res);
+DLL_EXPORT void  tmq_free_json_meta(char *jsonMeta);
+
+/* ---------------------------- TAOSX END -------------------------------- */
+
+typedef enum {
+  TSDB_SRV_STATUS_UNAVAILABLE = 0,
+  TSDB_SRV_STATUS_NETWORK_OK = 1,
+  TSDB_SRV_STATUS_SERVICE_OK = 2,
+  TSDB_SRV_STATUS_SERVICE_DEGRADED = 3,
+  TSDB_SRV_STATUS_EXTING = 4,
+} TSDB_SERVER_STATUS;
+
+DLL_EXPORT TSDB_SERVER_STATUS taos_check_server_status(const char *fqdn, int port, char *details, int maxlen);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif

BIN
thirdparty/tdengine/lib/taos.lib