#include "TDengine.h" #include "taos.h" #include #include #include #include #include #include #include #include #include #include TAOS* pConn = NULL; EventSubInterface* g_pSubCB; // 订阅回调 std::list g_lstTopics; int32_t TDengine::msgProcess(TAOS_RES* msg) { char buf[1024]; int32_t rows = 0; const char* topicName = tmq_get_topic_name(msg); const char* dbName = tmq_get_db_name(msg); int32_t vgroupId = tmq_get_vgroup_id(msg); //qDebug() << __FILE__ << __LINE__ << "db: " << dbName; //qDebug() << __FILE__ << __LINE__ << "topic: " << topicName; //qDebug() << __FILE__ << __LINE__ << "vgroup id:" << vgroupId; while (1) { TAOS_ROW row = taos_fetch_row(msg); if (row == NULL) break; TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); // int32_t* length = taos_fetch_lengths(msg); // int32_t precision = taos_result_precision(msg); rows++; taos_print_row(buf, row, fields, numOfFields); QJsonObject jsonObject; for(int k = 0;k < numOfFields;k++) { QString fieldName = fields[k].name; if(fields[k].type == TSDB_DATA_TYPE_TIMESTAMP) { uint64_t value = *((int64_t *)row[k]); int intvalue = (int)value; jsonObject.insert(fieldName,intvalue); } else if(fields[k].type == TSDB_DATA_TYPE_VARCHAR) { QString value((char *)row[k]); jsonObject.insert(fieldName,value); } else if(fields[k].type == TSDB_DATA_TYPE_DOUBLE) { double value = *(double *)(row[k]); jsonObject.insert(fieldName,value); } else if(fields[k].type == TSDB_DATA_TYPE_FLOAT) { float value = *(float *)(row[k]); jsonObject.insert(fieldName,value); } else if(fields[k].type == TSDB_DATA_TYPE_INT) { int value = *(int *)(row[k]); jsonObject.insert(fieldName,value); } else if(fields[k].type == TSDB_DATA_TYPE_BIGINT) { int64_t value = *(int64_t *)(row[k]); int intvalue = (int)value; jsonObject.insert(fieldName,intvalue); } } if( g_pSubCB != nullptr ) { std::string topic = topicName; //std::string content = buf; std::string content = QJsonDocument(jsonObject).toJson(QJsonDocument::Compact).toStdString(); g_pSubCB->SubCB(topic,content); //qDebug() << __FILE__ << __LINE__ << "SubCB: " << topic.c_str() << " - " << content.c_str(); } } return rows; } //构建消费者 tmq_t* TDengine::buildConsumer() { tmq_conf_res_t code; tmq_conf_t* conf = tmq_conf_new(); code = tmq_conf_set(conf, "td.connect.ip", host.toStdString().c_str()); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } //code = tmq_conf_set(conf, "enable.auto.commit", "true"); code = tmq_conf_set(conf, "enable.auto.commit", "false"); //禁用提交回调 if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } // code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); // if (TMQ_CONF_OK != code) { // tmq_conf_destroy(conf); // return NULL; // } code = tmq_conf_set(conf, "group.id", "cgrpName"); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } code = tmq_conf_set(conf, "client.id", "user defined name"); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } code = tmq_conf_set(conf, "td.connect.user", user.toStdString().c_str()); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } code = tmq_conf_set(conf, "td.connect.pass", password.toStdString().c_str()); if (TMQ_CONF_OK != code) { tmq_conf_destroy(conf); return NULL; } //v3.2以后默认为最新的latest // code = tmq_conf_set(conf, "auto.offset.reset", "latest"); // if (TMQ_CONF_OK != code) { // tmq_conf_destroy(conf); // return NULL; // } //tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); //禁用提交回调 tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); //qDebug()<< __FILE__ << __LINE__ << "tmq" << tmq; tmq_conf_destroy(conf); return tmq; } //构建主题 tmq_list_t* TDengine::buildTopicList() { tmq_list_t* topicList = tmq_list_new(); std::list::iterator itr; for( itr = g_lstTopics.begin(); itr != g_lstTopics.end(); ++itr ) { std::string szTopic = *itr; //qDebug() << __FILE__ << __LINE__ << " topic:" << szTopic.c_str(); int32_t code = tmq_list_append(topicList, szTopic.c_str()); if (code) { return NULL; } } return topicList; } //轮询主题 void TDengine::topicLoop() { int32_t code; tmq_t* tmq = buildConsumer(); if (NULL == tmq) { qDebug()<< __FILE__ << __LINE__ << "err" << stderr; return; } tmq_list_t* topic_list = buildTopicList(); code = tmq_subscribe(tmq, topic_list); if ( code ) { std::string szTopics; std::list::iterator itr; for( itr = g_lstTopics.begin(); itr != g_lstTopics.end(); ++itr ) { szTopics += " " + *itr; } qDebug() << __FILE__ << __LINE__ << tmq_err2str(code) << " : " << szTopics.c_str(); } tmq_list_destroy(topic_list); int32_t totalRows = 0; int32_t msgCnt = 0; int32_t timeout = 100; //多个主题共用1个线程轮询,超时时间太长会影响其它主题实时性 while (true) { TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout); if (tmqmsg) { msgCnt++; totalRows += msgProcess(tmqmsg); taos_free_result(tmqmsg); } else { //超时 //break; } } code = tmq_consumer_close(tmq); if (code) { fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); } else { fprintf(stderr, "%% Consumer closed\n"); } qDebug()<< __FILE__ << __LINE__ << "stderr" << stderr; } TDengine::TDengine(QObject *parent) : QObject(parent) { pConn = nullptr; g_pSubCB = nullptr; g_lstTopics.clear(); } TDengine::~TDengine() { taos_close(pConn); } void TDengine::exec(QString sql) { if (pConn) { TAOS_RES* pRes = taos_query(pConn, sql.toStdString().c_str()); if (taos_errno(pRes) != 0) { qDebug() << __FILE__ << __LINE__ << "failed to insert into tab, reason:" << taos_errstr(pRes); qDebug()<< __FILE__ << __LINE__ << "err sql" << sql; } else { //qDebug()<< "exec ok"; } taos_free_result(pRes); } } void TDengine::subscribe(QString ch, EventSubInterface *fn) { g_pSubCB = fn; g_lstTopics.push_back(ch.toLocal8Bit().toStdString()); } void TDengine::psubscribe(QString ch, EventSubInterface *fn) { g_pSubCB = fn; g_lstTopics.push_back(ch.toLocal8Bit().toStdString()); } void TDengine::publish(QString key,QString val) { qDebug() << __FILE__ << __LINE__ << " publish " << key << " " << val; std::string szTable = key.toLocal8Bit().toStdString(); std::string szColume = key.toLocal8Bit().toStdString(); if( key.contains(".") ) { szTable = key.left(key.indexOf(".")).toLocal8Bit().toStdString(); szColume = key.mid(key.indexOf(".")+1,-1).toLocal8Bit().toStdString(); } std::string sql = "insert into "; sql += dbName.toLocal8Bit().toStdString() + "."; sql += szTable; sql += " (ts, " + szColume; sql += " ) values(NOW,"; sql += val.toLocal8Bit().toStdString(); sql += " );"; TAOS_RES* res = taos_query(pConn, sql.c_str()); int code = taos_errno(res); if (code != 0) { qCritical() << __FILE__ << __LINE__ << " execute sql failed. code = " << code << " msg = " << taos_errstr(res); qCritical() << __FILE__ << __LINE__ << " slq :" << sql.c_str(); taos_free_result(res); } int affectedRows = taos_affected_rows(res); taos_free_result(res); //return affectedRows; } void TDengine::Setup(tagSetup ts) { host = ts.addr.c_str(); user = ts.user.c_str(); dbName = ts.db.c_str(); password = ts.password.c_str(); m_nPort = ts.port; } void TDengine::start() { pConn = taos_connect(host.toStdString().c_str(), user.toStdString().c_str(), password.toStdString().c_str(), dbName.toStdString().c_str(), m_nPort); if (pConn == NULL) { qDebug() << __FILE__ << __LINE__ << "taos_connect failed:" << taos_errstr(pConn); qDebug()<< __FILE__ << __LINE__ << "td conn err."; return; } //开启一个线程轮询订阅的主题 QtConcurrent::run(this, &TDengine::topicLoop); }