#include "RedisSubscriber.h"

#include <QDebug>
#include <QString>
#include <QThread>

RedisSubscriber::RedisSubscriber()
{
}

// Default topic name. The subscription itself uses ModuleInfo::Topic (see Run()).
#define REDISTOPIC "test"

/// Decodes one reply element as UTF-8 text.
///
/// Returns a null QString when the element or its string buffer is missing
/// (integer elements carry no string). The explicit length is used so a
/// payload containing embedded NUL bytes is not truncated.
static QString replyText(const redisReply* element)
{
    if (element == nullptr || element->str == nullptr) {
        return QString();
    }
    return QString::fromUtf8(element->str, static_cast<int>(element->len));
}

/// hiredis async callback registered by RedisSubscriber::Run().
///
/// @param ctx   async context that produced the reply (unused).
/// @param r     the reply, passed as void*; may be nullptr.
/// @param data  the privdata given at subscribe time; Run() passes the
///              RedisSubscriber instance.
///
/// Replies whose first element is "message" are forwarded to
/// RedisSubscriber::enqueue() as (channel, payload). Everything else is only
/// logged.
static void fnRedisCallback(redisAsyncContext* ctx, void* r, void* data)
{
    Q_UNUSED(ctx);

    const redisReply* reply = static_cast<redisReply*>(r);
    RedisSubscriber* subscriber = static_cast<RedisSubscriber*>(data);

    if (reply == nullptr) {
        qDebug() << "The reply is nullptr";
        return;
    }
    if (subscriber == nullptr) {
        qDebug() << "The subscriber is nullptr";
        return;
    }

    // Error/status replies have no element array; indexing element[0..2]
    // without this check would dereference a null pointer.
    if (reply->element == nullptr || reply->elements < 3) {
        qDebug() << "Unexpected reply, type:" << reply->type
                 << "elements:" << static_cast<qulonglong>(reply->elements);
        return;
    }

    const redisReply* kind = reply->element[0];
    const redisReply* channel = reply->element[1];
    const redisReply* payload = reply->element[2];
    if (kind == nullptr || channel == nullptr || payload == nullptr) {
        qDebug() << "Reply contains a null element";
        return;
    }

    qDebug() << kind->str << " " << channel->str << " " << payload->str << " ";

    if (replyText(kind) != QLatin1String("message")) {
        qDebug() << "NOT A MESSAGE";
        return;
    }

    // Received a published message. The payload is forwarded verbatim for
    // every channel: the earlier JSON parse of the "engine" field led to the
    // same enqueue() call on every branch (and its Latin-1 round trip mangled
    // non-Latin-1 text), so it has been dropped.
    subscriber->enqueue(replyText(channel), replyText(payload));
}

/// Publishes one received message through the pubData signal, tagged with the
/// source name "redis".
///
/// @param key  channel the message arrived on.
/// @param val  message payload.
void RedisSubscriber::enqueue(const QString &key, const QString &val)
{
    emit pubData("redis", key, val);
    /*mutex.lock();
    msgQueue.push_back({key, val});
    cond.wakeAll();
    mutex.unlock();*/
}

/// Creates and starts a RedisClient, then subscribes to mi.Topic with
/// fnRedisCallback as the handler and this object as its user data.
///
/// NOTE(review): every call allocates a new RedisClient; whether the previous
/// one is released elsewhere is not visible from this file - confirm before
/// calling Run() more than once per instance.
void RedisSubscriber::Run(const ModuleInfo& mi)
{
    // Subscribe to the redis topic.
    redis = new RedisClient();
    redis->start();
    redis->subscribe(mi.Topic.c_str(), fnRedisCallback, this);
    //start();
}

/// Intentionally a no-op: the loader handle is not used by this class.
void RedisSubscriber::setLoader(QLibrary *)
{
}

/// Thread body: sleeps for one second and returns. The queue-draining loop
/// below is disabled because enqueue() emits pubData directly.
void RedisSubscriber::run()
{
    QThread::msleep(1000);
    // Redis messages are emitted through the pubData signal.
    /*while(!isInterruptionRequested()){
        qDebug() << __FILE__ << __FUNCTION__;
        QString usr = "Redis";
        QString key = "key";
        QVariant val = "val";
        Msg msg;
        mutex.lock();
        if(cond.wait(&mutex), 500)
        {
            msg = msgQueue.dequeue();
            emit pubData(usr, msg.key, msg.val);
            qDebug() << __FILE__ << __FUNCTION__;
        };
        mutex.unlock();
        //QThread::msleep(1000);
    }*/
}

/// Factory entry point: returns a new RedisSubscriber that the caller must
/// release with destroy().
Publisher* instance()
{
    return new RedisSubscriber();
}

/// Releases an object obtained from instance(). Passing nullptr is allowed
/// (deleting a null pointer is a no-op).
void destroy(Publisher* pInstance)
{
    delete pInstance;
}