#include "TDengine.h"
#include "taos.h"

// NOTE(review): the targets of the angle-bracket includes were missing from
// this file; the list below is what the code in this translation unit uses
// (qDebug, QtConcurrent::run, fprintf, std::atomic, std::list, std::string).
// Confirm against the original build before merging.
#include <QDebug>
#include <QtConcurrent>

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <list>
#include <string>

// Connection used by exec(); opened in start(), closed in the destructor.
TAOS* pConn = nullptr;
// Subscription callback invoked for every row received from a topic.
EventSubInterface* g_pSubCB;
// Names of the topics to subscribe to when the polling loop starts.
std::list<std::string> g_lstTopics;

namespace {
// Set by the destructor to ask topicLoop() to leave its polling loop so that
// the consumer can be closed. Without it the loop never terminates.
std::atomic<bool> g_stopTopicLoop{false};
}  // namespace

/**
 * Drains every row of one TMQ message and forwards each row, rendered as
 * text by taos_print_row(), to the registered subscription callback.
 *
 * @param msg  message returned by tmq_consumer_poll().
 * @return number of rows fetched from the message.
 */
int32_t TDengine::msgProcess(TAOS_RES* msg)
{
    // NOTE(review): taos_print_row() takes no buffer size, so a row whose text
    // form exceeds this buffer would overflow it. If the client library in use
    // provides a size-bounded variant, prefer that.
    char buf[1024];
    int32_t rows = 0;

    // Guard against a null topic name: building std::string from a null
    // pointer is undefined behaviour.
    const char* topicName = tmq_get_topic_name(msg);
    std::string topic = (topicName != nullptr) ? topicName : "";

    while (true) {
        TAOS_ROW row = taos_fetch_row(msg);
        if (row == nullptr) {
            break;
        }

        // Fetched per row, exactly as before, rather than hoisted out of the loop.
        TAOS_FIELD* fields = taos_fetch_fields(msg);
        const int32_t numOfFields = taos_field_count(msg);
        ++rows;

        taos_print_row(buf, row, fields, numOfFields);

        if (g_pSubCB != nullptr) {
            std::string content = buf;
            g_pSubCB->SubCB(const_cast<char*>(topic.c_str()),
                            const_cast<char*>(content.c_str()));
        }
    }

    return rows;
}

/**
 * Builds a TMQ consumer from the host/user/password stored by Setup().
 *
 * @return the new consumer, or nullptr if any option is rejected or the
 *         consumer cannot be created.
 */
tmq_t* TDengine::buildConsumer()
{
    tmq_conf_t* conf = tmq_conf_new();
    if (conf == nullptr) {
        qDebug() << __FILE__ << __LINE__ << "tmq_conf_new failed";
        return nullptr;
    }

    // Keep the converted strings alive for as long as the table refers to them.
    const std::string hostStr = host.toStdString();
    const std::string userStr = user.toStdString();
    const std::string passStr = password.toStdString();

    struct ConfEntry {
        const char* key;
        const char* value;
    };
    // Applied in the same order as before.
    // Auto commit is disabled, so "auto.commit.interval.ms" is not set and no
    // commit callback is installed.
    // "auto.offset.reset" is left at its default, which is "latest" since v3.2.
    const ConfEntry entries[] = {
        {"td.connect.ip", hostStr.c_str()},
        {"enable.auto.commit", "false"},
        {"group.id", "cgrpName"},
        {"client.id", "user defined name"},
        {"td.connect.user", userStr.c_str()},
        {"td.connect.pass", passStr.c_str()},
    };

    for (const ConfEntry& entry : entries) {
        if (tmq_conf_set(conf, entry.key, entry.value) != TMQ_CONF_OK) {
            qDebug() << __FILE__ << __LINE__ << "tmq_conf_set failed for" << entry.key;
            tmq_conf_destroy(conf);
            return nullptr;
        }
    }

    char errstr[256] = {0};
    tmq_t* tmq = tmq_consumer_new(conf, errstr, static_cast<int32_t>(sizeof(errstr)));
    if (tmq == nullptr) {
        qDebug() << __FILE__ << __LINE__ << "tmq_consumer_new failed:" << errstr;
    }
    tmq_conf_destroy(conf);
    return tmq;
}

/**
 * Builds the TMQ topic list from g_lstTopics.
 *
 * @return the list (caller destroys it with tmq_list_destroy), or nullptr if
 *         the list cannot be created or a topic cannot be appended.
 */
tmq_list_t* TDengine::buildTopicList()
{
    tmq_list_t* topicList = tmq_list_new();
    if (topicList == nullptr) {
        return nullptr;
    }

    for (const std::string& topic : g_lstTopics) {
        qDebug() << __FILE__ << __LINE__ << " topic:" << topic.c_str();
        const int32_t code = tmq_list_append(topicList, topic.c_str());
        if (code) {
            // Release the partially built list instead of leaking it.
            tmq_list_destroy(topicList);
            return nullptr;
        }
    }
    return topicList;
}

/**
 * Polling loop: creates a consumer, subscribes to all registered topics and
 * dispatches every received message through msgProcess() until a stop is
 * requested, then closes the consumer.
 */
void TDengine::topicLoop()
{
    tmq_t* tmq = buildConsumer();
    if (tmq == nullptr) {
        qDebug() << __FILE__ << __LINE__ << "failed to build tmq consumer";
        return;
    }

    tmq_list_t* topicList = buildTopicList();
    if (topicList == nullptr) {
        qDebug() << __FILE__ << __LINE__ << "failed to build topic list";
        tmq_consumer_close(tmq);
        return;
    }

    int32_t code = tmq_subscribe(tmq, topicList);
    tmq_list_destroy(topicList);
    if (code) {
        qDebug() << __FILE__ << __LINE__ << "Failed to tmq_subscribe(): " << tmq_err2str(code);
        // Nothing is subscribed, so polling would spin forever without data.
        tmq_consumer_close(tmq);
        return;
    }

    // All topics share this single polling thread; too long a timeout would
    // hurt the timeliness of the other topics.
    const int32_t timeout = 100;

    while (!g_stopTopicLoop.load()) {
        TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout);
        if (tmqmsg != nullptr) {
            msgProcess(tmqmsg);
            taos_free_result(tmqmsg);
        }
        // A null result means the poll timed out; simply poll again.
    }

    code = tmq_consumer_close(tmq);
    if (code) {
        fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
    } else {
        fprintf(stderr, "%% Consumer closed\n");
    }
}

/// Resets the file-level connection, callback, topic list and stop flag.
TDengine::TDengine(QObject *parent) : QObject(parent)
{
    pConn = nullptr;
    g_pSubCB = nullptr;
    g_lstTopics.clear();
    g_stopTopicLoop = false;
}

/// Asks the polling loop to stop and closes the query connection.
/// NOTE(review): this does not wait for the polling thread to finish.
TDengine::~TDengine()
{
    g_stopTopicLoop = true;
    if (pConn != nullptr) {
        taos_close(pConn);
        pConn = nullptr;
    }
}

/**
 * Executes one SQL statement on the shared connection and logs any error.
 * Does nothing when no connection is open.
 *
 * @param sql  statement text.
 */
void TDengine::exec(QString sql)
{
    if (pConn == nullptr) {
        return;
    }

    TAOS_RES* pRes = taos_query(pConn, sql.toStdString().c_str());
    if (taos_errno(pRes) != 0) {
        qDebug() << __FILE__ << __LINE__ << "failed to insert into tab, reason:" << taos_errstr(pRes);
        qDebug() << __FILE__ << __LINE__ << "err sql" << sql;
    }
    taos_free_result(pRes);
}

/**
 * Registers a topic to be subscribed when the polling loop starts and sets
 * the callback that receives its rows. The callback is shared by all topics;
 * the most recent registration wins.
 *
 * @param ch  topic name.
 * @param fn  callback receiving (topic, row text).
 */
void TDengine::subscribe(QString ch, EventSubInterface *fn)
{
    g_pSubCB = fn;
    g_lstTopics.push_back(ch.toLocal8Bit().toStdString());
}

/// Same behaviour as subscribe().
void TDengine::psubscribe(QString ch, EventSubInterface *fn)
{
    subscribe(ch, fn);
}

/// Stores the connection parameters used by start() and buildConsumer().
void TDengine::Setup(tagSetup ts)
{
    host = ts.addr.c_str();
    user = ts.user.c_str();
    dbName = ts.db.c_str();
    password = ts.password.c_str();
    m_nPort = ts.port;
}

/**
 * Opens the query connection and, on success, starts the topic polling loop
 * on a QtConcurrent worker thread.
 */
void TDengine::start()
{
    pConn = taos_connect(host.toStdString().c_str(),
                         user.toStdString().c_str(),
                         password.toStdString().c_str(),
                         dbName.toStdString().c_str(),
                         m_nPort);
    if (pConn == nullptr) {
        // pConn is null here, so this reports the most recent client error.
        qDebug() << __FILE__ << __LINE__ << "taos_connect failed:" << taos_errstr(pConn);
        qDebug() << __FILE__ << __LINE__ << "td conn err.";
        return;
    }

    // Start a worker thread that polls the subscribed topics.
    QtConcurrent::run(this, &TDengine::topicLoop);
}