1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- #include "TDengineSubscriber.h"
- #include <QDebug>
- TDengineSubscriber::TDengineSubscriber() {
- }
- #define REDISTOPIC "test"
- static void fnRedisCallback(const char* topic, const char* data, void* usr){
- TDengineSubscriber* subscriber = static_cast<TDengineSubscriber*>(usr);
- subscriber->enqueue(topic, data);
- }
- void TDengineSubscriber::enqueue(const QString &key, const QString &val)
- {
- emit pubData("tdengine", key, val);
- /*mutex.lock();
- msgQueue.push_back({key, val});
- cond.wakeAll();
- mutex.unlock();*/
- }
- void TDengineSubscriber::Run()
- {
- //subscribe redis topic
- TDengine = new TDengineClient();
- TDengine->start();
- TDengine->subscribe(REDISTOPIC, fnRedisCallback, this);
- start();
- }
- void TDengineSubscriber::setLoader(QLibrary *)
- {
- }
- void TDengineSubscriber::run(){
- QThread::msleep(1000);
- //redis message emit to onData signal.
- while(!isInterruptionRequested()){
- /*QString usr = "Redis";
- QString key = "key";
- QVariant val = "val";
- Msg msg;
- mutex.lock();
- if(cond.wait(&mutex), 500)
- {
- msg = msgQueue.dequeue();
- emit pubData(usr, msg.key, msg.val);
- qDebug() << __FILE__ << __FUNCTION__;
- }
- mutex.unlock();
- */
- /*if(!key.isEmpty()){
- emit pubData(usr, key, val);
- key.clear();
- }*/
- QThread::msleep(1000);
- }
- }
- Publisher* instance()
- {
- return new TDengineSubscriber();
- }
- void destroy(Publisher* pInstance)
- {
- if( pInstance )
- {
- delete pInstance;
- }
- }
|