|
@@ -3,12 +3,15 @@
|
|
#include <QtCore/QDebug>
|
|
#include <QtCore/QDebug>
|
|
#include <QtConcurrent/QtConcurrent>
|
|
#include <QtConcurrent/QtConcurrent>
|
|
|
|
|
|
|
|
+#include <QJsonArray>
|
|
|
|
+#include <QJsonObject>
|
|
|
|
+#include <QJsonDocument>
|
|
|
|
+
|
|
#include <assert.h>
|
|
#include <assert.h>
|
|
#include <stdio.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <string.h>
|
|
#include <time.h>
|
|
#include <time.h>
|
|
-#include "taos.h"
|
|
|
|
|
|
|
|
|
|
|
|
TAOS* pConn = NULL;
|
|
TAOS* pConn = NULL;
|
|
@@ -40,6 +43,47 @@ int32_t TDengine::msgProcess(TAOS_RES* msg)
|
|
// int32_t precision = taos_result_precision(msg);
|
|
// int32_t precision = taos_result_precision(msg);
|
|
rows++;
|
|
rows++;
|
|
taos_print_row(buf, row, fields, numOfFields);
|
|
taos_print_row(buf, row, fields, numOfFields);
|
|
|
|
+
|
|
|
|
+ QJsonObject jsonObject;
|
|
|
|
+ for(int k = 0;k < numOfFields;k++)
|
|
|
|
+ {
|
|
|
|
+ QString fieldName = fields[k].name;
|
|
|
|
+ if(fields[k].type == TSDB_DATA_TYPE_TIMESTAMP)
|
|
|
|
+ {
|
|
|
|
+ uint64_t value = *((int64_t *)row[k]);
|
|
|
|
+ int intvalue = (int)value;
|
|
|
|
+ jsonObject.insert(fieldName,intvalue);
|
|
|
|
+ }
|
|
|
|
+ else if(fields[k].type == TSDB_DATA_TYPE_VARCHAR)
|
|
|
|
+ {
|
|
|
|
+ QString value((char *)row[k]);
|
|
|
|
+ jsonObject.insert(fieldName,value);
|
|
|
|
+ }
|
|
|
|
+ else if(fields[k].type == TSDB_DATA_TYPE_DOUBLE)
|
|
|
|
+ {
|
|
|
|
+ double value = *(double *)(row[k]);
|
|
|
|
+ jsonObject.insert(fieldName,value);
|
|
|
|
+ }
|
|
|
|
+ else if(fields[k].type == TSDB_DATA_TYPE_FLOAT)
|
|
|
|
+ {
|
|
|
|
+ float value = *(float *)(row[k]);
|
|
|
|
+ jsonObject.insert(fieldName,value);
|
|
|
|
+ }
|
|
|
|
+ else if(fields[k].type == TSDB_DATA_TYPE_INT)
|
|
|
|
+ {
|
|
|
|
+ int value = *(int *)(row[k]);
|
|
|
|
+ jsonObject.insert(fieldName,value);
|
|
|
|
+ }
|
|
|
|
+ else if(fields[k].type == TSDB_DATA_TYPE_BIGINT)
|
|
|
|
+ {
|
|
|
|
+ int64_t value = *(int64_t *)(row[k]);
|
|
|
|
+ int intvalue = (int)value;
|
|
|
|
+ jsonObject.insert(fieldName,intvalue);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ std::string content = QJsonDocument(jsonObject).toJson(QJsonDocument::Compact).toStdString();
|
|
|
|
+
|
|
if( g_pSubCB != nullptr )
|
|
if( g_pSubCB != nullptr )
|
|
{
|
|
{
|
|
std::string topic = topicName;
|
|
std::string topic = topicName;
|
|
@@ -102,11 +146,11 @@ tmq_t* TDengine::buildConsumer()
|
|
}
|
|
}
|
|
|
|
|
|
//v3.2以后默认为最新的latest
|
|
//v3.2以后默认为最新的latest
|
|
-// code = tmq_conf_set(conf, "auto.offset.reset", "latest");
|
|
|
|
-// if (TMQ_CONF_OK != code) {
|
|
|
|
-// tmq_conf_destroy(conf);
|
|
|
|
-// return NULL;
|
|
|
|
-// }
|
|
|
|
|
|
+ // code = tmq_conf_set(conf, "auto.offset.reset", "latest");
|
|
|
|
+ // if (TMQ_CONF_OK != code) {
|
|
|
|
+ // tmq_conf_destroy(conf);
|
|
|
|
+ // return NULL;
|
|
|
|
+ // }
|
|
|
|
|
|
//tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); //禁用提交回调
|
|
//tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); //禁用提交回调
|
|
|
|
|