TDengineSubscriber.cpp 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. #include "TDengineSubscriber.h"
  2. #include <QDebug>
  3. TDengineSubscriber::TDengineSubscriber() {
  4. }
  5. #define REDISTOPIC "test"
  6. static void fnRedisCallback(const char* topic, const char* data, void* usr){
  7. TDengineSubscriber* subscriber = static_cast<TDengineSubscriber*>(usr);
  8. subscriber->enqueue(topic, data);
  9. }
  10. void TDengineSubscriber::enqueue(const QString &key, const QString &val)
  11. {
  12. emit pubData("tdengine", key, val);
  13. /*mutex.lock();
  14. msgQueue.push_back({key, val});
  15. cond.wakeAll();
  16. mutex.unlock();*/
  17. }
  18. void TDengineSubscriber::Run()
  19. {
  20. //subscribe redis topic
  21. TDengine = new TDengineClient();
  22. TDengine->start();
  23. TDengine->subscribe(REDISTOPIC, fnRedisCallback, this);
  24. start();
  25. }
  26. void TDengineSubscriber::setLoader(QLibrary *)
  27. {
  28. }
  29. void TDengineSubscriber::run(){
  30. QThread::msleep(1000);
  31. //redis message emit to onData signal.
  32. while(!isInterruptionRequested()){
  33. /*QString usr = "Redis";
  34. QString key = "key";
  35. QVariant val = "val";
  36. Msg msg;
  37. mutex.lock();
  38. if(cond.wait(&mutex), 500)
  39. {
  40. msg = msgQueue.dequeue();
  41. emit pubData(usr, msg.key, msg.val);
  42. qDebug() << __FILE__ << __FUNCTION__;
  43. }
  44. mutex.unlock();
  45. */
  46. /*if(!key.isEmpty()){
  47. emit pubData(usr, key, val);
  48. key.clear();
  49. }*/
  50. QThread::msleep(1000);
  51. }
  52. }
  53. Publisher* instance()
  54. {
  55. return new TDengineSubscriber();
  56. }
  57. void destroy(Publisher* pInstance)
  58. {
  59. if( pInstance )
  60. {
  61. delete pInstance;
  62. }
  63. }