#include "RedisSubscriber.h"
// NOTE(review): the target of the second #include was lost in this copy of the
// file; <QDebug> is what the qDebug() calls below require — confirm against VCS.
#include <QDebug>

// Redis channel this subscriber listens on.
#define REDISTOPIC "test"

namespace {
// Upper bound on one wait for a queued message, so that the worker loop
// re-checks isInterruptionRequested() at least this often.
constexpr unsigned long kQueueWaitTimeoutMs = 500UL;
}

RedisSubscriber::RedisSubscriber()
{
}

/**
 * @brief hiredis async callback invoked for every reply on the subscription.
 *
 * Replies whose first element is "message" are forwarded to the owning
 * RedisSubscriber as a (channel, payload) pair. Anything else is only logged.
 *
 * @param r    The reply, passed by hiredis as void*; may be nullptr.
 * @param data The RedisSubscriber registered in RedisSubscriber::Run().
 */
static void fnRedisCallback(redisAsyncContext* /*ctx*/, void* r, void* data)
{
    redisReply* reply = static_cast<redisReply*>(r);
    RedisSubscriber* subscriber = static_cast<RedisSubscriber*>(data);

    if (reply == nullptr) {
        qDebug() << "The reply is nullptr";
        return;
    }

    // The code below indexes element[0..2]; make sure they exist before
    // touching them instead of reading past the end of a shorter reply.
    if (reply->type != REDIS_REPLY_ARRAY || reply->elements < 3) {
        qDebug() << "NOT A MESSAGE";
        return;
    }

    qDebug() << reply->element[0]->str << " "
             << reply->element[1]->str << " "
             << reply->element[2]->str << " ";

    if (QString::fromUtf8(reply->element[0]->str) != QLatin1String("message")) {
        qDebug() << "NOT A MESSAGE";
        return;
    }

    // Received a published message: element[1] is the channel, element[2] the payload.
    const QString channel = QString::fromUtf8(reply->element[1]->str);
    const QString payload = QString::fromUtf8(reply->element[2]->str);

    // The payload is forwarded unchanged for every channel; consumers of
    // pubData() are responsible for interpreting it.
    if (subscriber != nullptr) {
        subscriber->enqueue(channel, payload);
    }
}

/**
 * @brief Appends a (key, value) message to the queue and wakes the worker thread.
 *
 * Called from the Redis callback; the queue is guarded by `mutex`.
 *
 * @param key Channel the message arrived on.
 * @param val Raw message payload.
 */
void RedisSubscriber::enqueue(const QString &key, const QString &val)
{
    QMutexLocker locker(&mutex);
    msgQueue.push_back({key, val});
    cond.wakeAll();
}

/**
 * @brief Creates the Redis client, subscribes to REDISTOPIC and starts the worker thread.
 *
 * NOTE(review): a second call replaces `redis` without releasing the previous
 * object — confirm that Run() is only ever called once per instance.
 */
void RedisSubscriber::Run()
{
    // Subscribe to the Redis topic.
    redis = new Redis();
    redis->start();
    redis->subscribe(REDISTOPIC, fnRedisCallback, this);
    start();
}

/// Intentionally empty: this publisher does not use the loader handle.
void RedisSubscriber::setLoader(QLibrary *)
{
}

/**
 * @brief Worker loop: drains queued Redis messages and re-emits them via pubData().
 *
 * Runs until interruption is requested on the thread. Each iteration waits at
 * most kQueueWaitTimeoutMs for a message so that the interruption flag is
 * polled regularly.
 */
void RedisSubscriber::run()
{
    // Initial delay kept from the original implementation
    // (presumably lets the Redis connection come up — TODO confirm).
    QThread::msleep(1000);

    const QString usr = "Redis";

    while (!isInterruptionRequested()) {
        Msg msg;
        {
            QMutexLocker locker(&mutex);

            // Only wait when there is nothing queued; a message enqueued while
            // we were busy emitting must not be stranded until the next wake-up.
            if (msgQueue.isEmpty()) {
                cond.wait(&mutex, kQueueWaitTimeoutMs);
            }

            // Timeout or spurious wake-up: nothing to deliver this round.
            if (msgQueue.isEmpty()) {
                continue;
            }

            msg = msgQueue.dequeue();
        }

        // Emit outside the lock so that a directly connected slot calling
        // enqueue() cannot deadlock on `mutex`.
        emit pubData(usr, msg.key, msg.val);
        qDebug() << __FILE__ << __FUNCTION__;
    }
}

/// Factory entry point: returns a new RedisSubscriber owned by the caller.
Publisher* instance()
{
    return new RedisSubscriber();
}

/// Destroys an instance previously returned by instance(); nullptr is accepted.
void destroy(Publisher* pInstance)
{
    delete pInstance;
}