RedisSubscriber.cpp 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. #include "RedisSubscriber.h"
  2. #include <QDebug>
  3. RedisSubscriber::RedisSubscriber() {
  4. }
  5. #define REDISTOPIC "test"
  6. static void fnRedisCallback(redisAsyncContext* ctx, void* r, void* data){
  7. redisReply *reply = static_cast<redisReply*>(r);
  8. RedisSubscriber* subscriber = static_cast<RedisSubscriber*>(data);
  9. if(reply == nullptr){
  10. qDebug() << "The reply is nullptr";
  11. return;
  12. }
  13. qDebug() << reply->element[0]->str << " "
  14. << reply->element[1]->str << " "
  15. << reply->element[2]->str << " ";
  16. if (QString(reply->element[0]->str) == "message"){ //收到订阅消息
  17. QString ch = QString(reply->element[1]->str);
  18. // QString cmd = QString(reply->element[2]->str);
  19. QString js = QString(reply->element[2]->str);
  20. //qDebug() << js;
  21. QJsonDocument doc = QJsonDocument::fromJson(js.toLatin1());
  22. if(ch == REDISTOPIC){
  23. QJsonObject jsonObj = doc.object(); //转成本地编码(utf-8), 中文正常
  24. if(jsonObj["engine"].toString().toLower() == "on"){
  25. subscriber->enqueue(ch, js);
  26. }else if(jsonObj["engine"].toString().toLower() == "off"){
  27. subscriber->enqueue(ch, js);
  28. }else{
  29. subscriber->enqueue(ch, js);
  30. }
  31. }else{
  32. subscriber->enqueue(ch, js);
  33. }
  34. }else{
  35. qDebug() << "NOT A MESSAGE";
  36. }
  37. }
  38. void RedisSubscriber::enqueue(const QString &key, const QString &val)
  39. {
  40. emit pubData("redis", key, val);
  41. /*
  42. mutex.lock();
  43. msgQueue.push_back({key, val});
  44. cond.wakeAll();
  45. mutex.unlock();
  46. */
  47. }
  48. void RedisSubscriber::Run()
  49. {
  50. //subscribe redis topic
  51. redis = new RedisClient();
  52. redis->start();
  53. redis->subscribe(REDISTOPIC, fnRedisCallback, this);
  54. start();
  55. }
  56. void RedisSubscriber::setLoader(QLibrary *)
  57. {
  58. }
  59. void RedisSubscriber::run(){
  60. QThread::msleep(1000);
  61. //redis message emit to onData signal.
  62. while(!isInterruptionRequested()){
  63. /*QString usr = "Redis";
  64. QString key = "key";
  65. QVariant val = "val";
  66. Msg msg;
  67. mutex.lock();
  68. if(cond.wait(&mutex), 500)
  69. {
  70. msg = msgQueue.dequeue();
  71. emit pubData(usr, msg.key, msg.val);
  72. qDebug() << __FILE__ << __FUNCTION__;
  73. };
  74. mutex.unlock();
  75. */
  76. /*if(!key.isEmpty()){
  77. emit pubData(usr, key, val);
  78. key.clear();
  79. }*/
  80. QThread::msleep(1000);
  81. }
  82. }
  83. Publisher* instance()
  84. {
  85. return new RedisSubscriber();
  86. }
  87. void destroy(Publisher* pInstance)
  88. {
  89. if( pInstance )
  90. {
  91. delete pInstance;
  92. }
  93. }