TDengineSubscriber.cpp 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. #include "TDengineSubscriber.h"
  2. #include <QDebug>
  3. TDengineSubscriber::TDengineSubscriber() {
  4. }
  5. TDengineSubscriber::~TDengineSubscriber()
  6. {
  7. if(tdengine){
  8. delete tdengine;
  9. tdengine = nullptr;
  10. }
  11. }
  12. #define REDISTOPIC "test"
  13. static void fnRedisCallback(const char* topic, const char* data, void* usr){
  14. TDengineSubscriber* subscriber = static_cast<TDengineSubscriber*>(usr);
  15. subscriber->enqueue(topic, data);
  16. }
  17. void TDengineSubscriber::enqueue(const QString &key, const QString &val)
  18. {
  19. emit pubData("tdengine", key, val);
  20. /*mutex.lock();
  21. msgQueue.push_back({key, val});
  22. cond.wakeAll();
  23. mutex.unlock();*/
  24. }
  25. void TDengineSubscriber::Run()
  26. {
  27. //subscribe redis topic
  28. tdengine = new TDengineClient();
  29. tdengine->subscribe(REDISTOPIC, fnRedisCallback, this);
  30. tdengine->start();
  31. //start();
  32. }
  33. void TDengineSubscriber::setLoader(QLibrary *)
  34. {
  35. }
  36. void TDengineSubscriber::run(){
  37. QThread::msleep(1000);
  38. //redis message emit to onData signal.
  39. /*while(!isInterruptionRequested()){
  40. QString usr = "Redis";
  41. QString key = "key";
  42. QVariant val = "val";
  43. Msg msg;
  44. mutex.lock();
  45. if(cond.wait(&mutex), 500)
  46. {
  47. msg = msgQueue.dequeue();
  48. emit pubData(usr, msg.key, msg.val);
  49. qDebug() << __FILE__ << __FUNCTION__;
  50. }
  51. mutex.unlock();
  52. //QThread::msleep(1000);
  53. }*/
  54. }
  55. Publisher* instance()
  56. {
  57. return new TDengineSubscriber();
  58. }
  59. void destroy(Publisher* pInstance)
  60. {
  61. if( pInstance )
  62. {
  63. delete pInstance;
  64. }
  65. }