123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- #include "RedisSubscriber.h"
- #include <QDebug>
- RedisSubscriber::RedisSubscriber() {
- }
- #define REDISTOPIC "test"
- static void fnRedisCallback(redisAsyncContext* ctx, void* r, void* data){
- redisReply *reply = static_cast<redisReply*>(r);
- RedisSubscriber* subscriber = static_cast<RedisSubscriber*>(data);
- if(reply == nullptr){
- qDebug() << "The reply is nullptr";
- return;
- }
- qDebug() << reply->element[0]->str << " "
- << reply->element[1]->str << " "
- << reply->element[2]->str << " ";
- if (QString(reply->element[0]->str) == "message"){ //收到订阅消息
- QString ch = QString(reply->element[1]->str);
- // QString cmd = QString(reply->element[2]->str);
- QString js = QString(reply->element[2]->str);
- //qDebug() << js;
- QJsonDocument doc = QJsonDocument::fromJson(js.toLatin1());
- if(ch == REDISTOPIC){
- QJsonObject jsonObj = doc.object(); //转成本地编码(utf-8), 中文正常
- if(jsonObj["engine"].toString().toLower() == "on"){
- subscriber->enqueue(ch, js);
- }else if(jsonObj["engine"].toString().toLower() == "off"){
- subscriber->enqueue(ch, js);
- }else{
- subscriber->enqueue(ch, js);
- }
- }else{
- subscriber->enqueue(ch, js);
- }
- }else{
- qDebug() << "NOT A MESSAGE";
- }
- }
- void RedisSubscriber::enqueue(const QString &key, const QString &val)
- {
- emit pubData("redis", key, val);
- /*mutex.lock();
- msgQueue.push_back({key, val});
- cond.wakeAll();
- mutex.unlock();*/
- }
- void RedisSubscriber::Run()
- {
- //subscribe redis topic
- redis = new RedisClient();
- redis->start();
- redis->subscribe(REDISTOPIC, fnRedisCallback, this);
- //start();
- }
- void RedisSubscriber::setLoader(QLibrary *)
- {
- }
- void RedisSubscriber::run(){
- QThread::msleep(1000);
- //redis message emit to onData signal.
- /*while(!isInterruptionRequested()){
- qDebug() << __FILE__ << __FUNCTION__;
- QString usr = "Redis";
- QString key = "key";
- QVariant val = "val";
- Msg msg;
- mutex.lock();
- if(cond.wait(&mutex), 500)
- {
- msg = msgQueue.dequeue();
- emit pubData(usr, msg.key, msg.val);
- qDebug() << __FILE__ << __FUNCTION__;
- };
- mutex.unlock();
- //QThread::msleep(1000);
- }*/
- }
- Publisher* instance()
- {
- return new RedisSubscriber();
- }
- void destroy(Publisher* pInstance)
- {
- if( pInstance )
- {
- delete pInstance;
- }
- }
|