#include "TDengineSubscriber.h"

#include <QDebug>   // qDebug(), referenced by the disabled diagnostics in run()
#include <QThread>  // QThread::msleep()

/// Topic name passed to TDengineClient::subscribe() in Run().
#define REDISTOPIC "test"

/// Constructs the subscriber. No work is done here; the client is created in Run().
TDengineSubscriber::TDengineSubscriber()
{
}

/**
 * @brief Callback registered with TDengineClient::subscribe().
 *
 * Forwards every received (topic, data) pair to TDengineSubscriber::enqueue().
 *
 * @param topic Topic the message arrived on.
 * @param data  Message payload.
 * @param usr   User pointer supplied at subscription time; Run() passes `this`.
 */
static void fnRedisCallback(const char* topic, const char* data, void* usr)
{
    TDengineSubscriber* subscriber = static_cast<TDengineSubscriber*>(usr);
    if (subscriber == nullptr) {
        // Nothing to deliver to; avoid dereferencing a null user pointer.
        return;
    }
    subscriber->enqueue(topic, data);
}

/**
 * @brief Publishes one received message.
 *
 * Emits pubData() immediately with the fixed source name "tdengine".
 *
 * @param key Message key (the topic when called from fnRedisCallback).
 * @param val Message value (the payload when called from fnRedisCallback).
 */
void TDengineSubscriber::enqueue(const QString &key, const QString &val)
{
    emit pubData("tdengine", key, val);

    // Disabled alternative: hand the message to run() through a queue.
    // mutex.lock();
    // msgQueue.push_back({key, val});
    // cond.wakeAll();
    // mutex.unlock();
}

/**
 * @brief Creates the TDengine client, subscribes to REDISTOPIC and starts this object.
 *
 * NOTE(review): every call allocates a new TDengineClient and overwrites the
 * TDengine member; no matching delete is visible in this file. Confirm that
 * Run() is called only once and that the client is released elsewhere.
 */
void TDengineSubscriber::Run()
{
    // Subscribe to the topic; messages arrive through fnRedisCallback.
    TDengine = new TDengineClient();
    TDengine->start();
    TDengine->subscribe(REDISTOPIC, fnRedisCallback, this);

    // Presumably QThread::start(), which would execute run() on a new thread;
    // confirm against the class declaration.
    start();
}

/// Accepts the plugin loader handle; this implementation ignores it.
void TDengineSubscriber::setLoader(QLibrary *)
{
}

/**
 * @brief Idle loop that sleeps in one-second steps until interruption is requested.
 *
 * Messages are currently published directly from enqueue(), so this loop does
 * no message handling of its own.
 */
void TDengineSubscriber::run()
{
    QThread::msleep(1000);

    while (!isInterruptionRequested()) {
        // Disabled queue consumer kept for reference.
        // NOTE(review): `if(cond.wait(&mutex), 500)` uses the comma operator, so
        // the 500 is never passed as a timeout and the condition is always true.
        // It has to become `cond.wait(&mutex, 500)` before this is re-enabled.
        //
        // QString usr = "Redis";
        // QString key = "key";
        // QVariant val = "val";
        // Msg msg;
        // mutex.lock();
        // if(cond.wait(&mutex), 500)
        // {
        //     msg = msgQueue.dequeue();
        //     emit pubData(usr, msg.key, msg.val);
        //     qDebug() << __FILE__ << __FUNCTION__;
        // }
        // mutex.unlock();
        //
        // if(!key.isEmpty()){
        //     emit pubData(usr, key, val);
        //     key.clear();
        // }

        QThread::msleep(1000);
    }
}

/**
 * @brief Factory entry point: returns a newly allocated TDengineSubscriber.
 *
 * The caller owns the returned object and releases it with destroy().
 */
Publisher* instance()
{
    return new TDengineSubscriber();
}

/**
 * @brief Releases an object obtained from instance().
 *
 * Passing a null pointer is a no-op, since deleting a null pointer is well defined.
 * NOTE(review): deleting through Publisher* requires Publisher to have a
 * virtual destructor; confirm in its declaration.
 *
 * @param pInstance Object to delete; may be null.
 */
void destroy(Publisher* pInstance)
{
    delete pInstance;
}