ZenZ пре 1 година
родитељ
комит
26e10f1c69

+ 22 - 5
DataConsumer/TDengine/TDengine.cpp

@@ -7,15 +7,32 @@ void TDengine::Run()
     start();
 }
 
-void TDengine::OnData(QString user, QString key, QVariant val)
+void TDengine::OnData(const QString& user, const QString& key, const QVariant& val)
 {
-    qDebug() << user << key << val;
+    qDebug() << __FILE__ << __FUNCTION__<< __LINE__ << user << key << val;
+}
+
+void TDengine::setLoader(QLibrary *)
+{
+
 }
 
 void TDengine::run()
 {
-    while (!isInterruptionRequested()) {
-        //check connection
-        sleep(1);
+    while(!isInterruptionRequested()){
+        qDebug() << __FILE__ << __FUNCTION__;
+        QThread::msleep(1000);
+    }
+}
+Client* instance()
+{
+    return new TDengine();
+}
+
+void destroy(Client* pInstance)
+{
+    if( pInstance )
+    {
+        delete pInstance;
     }
 }

+ 8 - 2
DataConsumer/TDengine/TDengine.h

@@ -4,14 +4,20 @@
 #include "TDengine_global.h"
 #include "Client.h"
 
-class TDENGINE_EXPORT TDengine:public Client, public QThread
+class TDENGINE_EXPORT TDengine:public Client
 {
 public:
     TDengine();
     virtual void Run();
-    virtual void OnData(QString user, QString key, QVariant val);
+    virtual void OnData(const QString& user, const QString& key, const QVariant& val);
+    virtual void setLoader(QLibrary*);
 private:
     virtual void run();
 };
 
+extern "C" {//一定要添加上
+TDENGINE_EXPORT Client* instance();
+TDENGINE_EXPORT void destroy(Client*);
+}
+
 #endif // TDENGINE_H

+ 2 - 0
DataConsumer/TDengine/TDengine.pro

@@ -11,6 +11,8 @@ CONFIG += c++17
 
 INCLUDEPATH += ../../include
 
+HEADERS += ../../include/Client.h
+
 SOURCES += \
     TDengine.cpp
 

+ 21 - 24
DataManagerMain/DataConsumer.cpp

@@ -1,9 +1,6 @@
 #include "DataConsumer.h"
-#include <QMutex>
-#include <QSemaphore>
 #include <QThread>
 #include <QDateTime>
-#include <QQueue>
 #include <QVariant>
 #include "LibraryLoader.h"
 #include "Client.h"
@@ -19,28 +16,31 @@ DataConsumer::~DataConsumer()
 
 
 
-/*void DataConsumer::run()
-{
-    while(!isInterruptionRequested()){
+// void DataConsumer::run()
+// {
 
-    }
-}*/
+//     while(!isInterruptionRequested()){
+//         qDebug() << __FILE__ << __FUNCTION__;
+//         QThread::msleep(1000);
+//     }
+
+// }
 
 void DataConsumer::Setup(ConsumerInfo &ci)
 {
     std::string assemblyName = ci.AssemblyName;
     std::string className = ci.ClassName;
-
-    BaseModule* pModule = nullptr;// = (BaseModule)Assembly.LoadFile(Path.GetFullPath(@".\" + assemblyName)).CreateInstance(className);
-    pModule = LibraryLoader::load<BaseModule>(assemblyName);
+    dataName = className;
+    Client* pModule = nullptr;
+    // = (BaseModule)Assembly.LoadFile(Path.GetFullPath(@".\" + assemblyName)).CreateInstance(className);
+    pModule = LibraryLoader::load<Client>(assemblyName);
     if( pModule == nullptr )
     {
-        //        qCritical() << LOG_HEADER << " load " << szPlugin.c_str() << " failed.";
+        qCritical() << " load " << assemblyName.c_str() << " failed.";
         return;
     }
     if( pModule == nullptr )
     {
-
         qCritical() << __FILE__ << __LINE__ << " " << assemblyName.c_str() << " load failed.";
         return;
     }
@@ -57,22 +57,19 @@ std::string DataConsumer::getTypeList()
     return "DataStorage::RunnableModule";
 }
 
-void DataConsumer::setLoader(QLibrary *)
-{
+// void DataConsumer::setLoader(QLibrary *)
+// {
 
-}
+// }
 
-/*void DataConsumer::regConsumer(DataConsumer *dc)
-{
-    dataConsumer = dc;
-}*/
 
-/*void DataConsumer::Run()
+void DataConsumer::Run()
 {
     //start();
-}*/
+}
 
-void DataConsumer::OnData(QString user, QString key, QVariant val)
+void DataConsumer::OnData(const QString&usr ,const QString& key,const QVariant&val)
 {
-    runable->OnData(user, key, val);
+    qDebug() << __FILE__ <<__FUNCTION__<< usr << key << val ;
+    runable->OnData(usr, key, val);
 }

+ 11 - 12
DataManagerMain/DataConsumer.h

@@ -1,35 +1,34 @@
 #ifndef DATACONSUMER_H
 #define DATACONSUMER_H
 
-
 #include <string>
 #include <QVariant>
 #include "Define.h"
-#include "../include/RunnableModule.h"
-
-
+#include <QThread>
+#include <QLibrary>
 class Client;
-class DataConsumer:public Receiver, public BaseModule
+class DataConsumer:public QObject//, public BaseModule
 {
-
+    Q_OBJECT
 public:
     DataConsumer();
     ~DataConsumer();
-
+    std::string dataName;
     virtual void Setup(ConsumerInfo& ci);
 
     virtual std::string getTypeList();
 
-    virtual void setLoader(QLibrary*);
+    //virtual void setLoader(QLibrary*);
 
-    //virtual void Run() ;
-    virtual void OnData(QString user, QString key, QVariant val);
-private:
+    virtual void Run() ;
+public slots:
+    virtual void OnData(const QString& ,const QString& ,const QVariant&);
+//private:
     //virtual void run();
 private:
     QLibrary* library = nullptr;
     DataConsumer* dataConsumer = nullptr;
-    std::string dataName;
+    //std::string dataName;
     ConsumerInfo ci;
     Client* runable = nullptr;
 

+ 13 - 7
DataManagerMain/DataManager.cpp

@@ -3,6 +3,7 @@
 #include "DataManagerProxy.h"
 #include "DataSubscribe.h"
 #include "DataConsumer.h"
+#include <QDebug>
 DataManager::DataManager() {
 
 }
@@ -19,20 +20,25 @@ void DataManager::Startup()
     auto lstModules = dmp.loadModuleInfos(config.serverId, config.appName);
 
     for(auto itr = lstModules.consumers.begin(); itr != lstModules.consumers.end(); ++itr){
-        QScopedPointer<DataConsumer> consumer {new DataConsumer()};
-        //DataConsumer * consumer = new DataConsumer();
+        //ScopedPointer<DataConsumer> consumer {new DataConsumer()};
+        DataConsumer * consumer = new DataConsumer();
         ConsumerInfo ci = *itr;
         consumer->Setup(ci);
-        consumerMap.insert(QString(itr->name.c_str()),consumer.get());
+        consumerMap.insert(QString(itr->SubscribeName.c_str()),consumer);
     }
 
     for(auto itr = lstModules.modules.begin();itr!= lstModules.modules.end();++itr)
     {
         ModuleInfo mi = *itr;
-        QScopedPointer<DataSubscribe> subscriber(new DataSubscribe());
-        subscriber->regConsumer(consumerMap[itr->AssemblyName.c_str()]);
-        subscriber->Setup(mi);
-        subscriber->Run();
+
+        if(consumerMap.contains(itr->ClassName.c_str())){
+            //QScopedPointer<DataSubscribe> subscriber(new DataSubscribe());
+            DataSubscribe* subscriber = new DataSubscribe();
+            subscriber->regConsumer(consumerMap[itr->ClassName.c_str()]);
+            subscriber->Setup(mi);
+            //subscriber->Run();
+            subscribers.append(subscriber);
+        }
     }
 
 }

+ 4 - 1
DataManagerMain/DataManager.h

@@ -1,9 +1,10 @@
 #ifndef DATAMANAGER_H
 #define DATAMANAGER_H
-#include <string>
 #include <QVariant>
 #include <QMap>
+#include <QList>
 #include "DataConsumer.h"
+#include "DataSubscribe.h"
 #include <QSharedDataPointer>
 class DataManager
 {
@@ -12,8 +13,10 @@ public:
 public:
     //virtual void OnData(std::string key, QVariant val);
     void Startup();
+
 private:
     QMap<QString, DataConsumer*> consumerMap;
+    QList<DataSubscribe*> subscribers;
 };
 
 #endif // DATAMANAGER_H

+ 7 - 7
DataManagerMain/DataManagerMain.pro

@@ -7,13 +7,13 @@ CONFIG += c++17 cmdline
 #DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0
 INCLUDEPATH += ../include/
 
-# HEADERS += \
-#     ../include/BaseModule.h \
-#     ../include/Define.h \
-#     ../include/JobModule.h \
-#     ../include/LibraryLoader.h \
-#     ../include/RunnableModule.h \
-#     ../include/Publisher.h
+HEADERS += \
+    ../include/BaseModule.h \
+    ../include/Define.h \
+    ../include/JobModule.h \
+    ../include/LibraryLoader.h \
+    ../include/RunnableModule.h \
+    ../include/Publisher.h
 
 
 HEADERS += \

+ 17 - 2
DataManagerMain/DataManagerProxy.cpp

@@ -6,6 +6,7 @@ DataManagerProxy::DataManagerProxy() {
 
 DataManagerInfo DataManagerProxy::loadModuleInfos(int id, std::string app)
 {
+/*
     DataItem objDataItem;
     objDataItem.Id = 1;
     objDataItem.Code = "CEJINGYI-001.TEMP";
@@ -33,9 +34,23 @@ DataManagerInfo DataManagerProxy::loadModuleInfos(int id, std::string app)
     objDevice.Description = "aaaaa";
     objDevice.ModuleInfo = objModule;
     objDevice.Properties.push_back(objDataItem);
+*/
+    ModuleInfo mi;
+    mi.Name = "RedisSubscriber";
+
+    mi.AssemblyName = "plugins/RedisSubscriber.dll";
+    mi.ClassName = "RedisSubscriber";
+    ConsumerInfo ci;
+
+    ci.name = "TDengine";
+
+    ci.AssemblyName = "plugins/TDengine.dll";
+    ci.ClassName = "TDengine";
+    ci.SubscribeName = "RedisSubscriber";
 
-    std::list<DeviceInfo> lstDevices;
-    lstDevices.push_back(objDevice);
     DataManagerInfo listDataManageInfo;
+    listDataManageInfo.consumers.push_back(ci);
+    listDataManageInfo.modules.push_back(mi);
+
     return listDataManageInfo;
 }

+ 16 - 24
DataManagerMain/DataSubscribe.cpp

@@ -2,33 +2,25 @@
 #include <QVariant>
 #include "LibraryLoader.h"
 #include "Publisher.h"
-
+#include <QDebug>
 DataSubscribe::DataSubscribe()
 {
-
+    //shares = new SharedData();
 }
 
-
-
-void DataSubscribe::run()
+DataSubscribe::~DataSubscribe()
 {
-    while(!isInterruptionRequested()){
-        if(shares.sem->tryAcquire(1, 500)){
-            shares.mutex->lock();
-            shares.commandList.dequeue();
-            shares.mutex->unlock();
-            dataConsumer->OnData(QString(dataName.c_str()), "time", QVariant());
-        }
-    }
+    //delete shares;
 }
 
+
 void DataSubscribe::Setup(ModuleInfo &mi)
 {
     std::string assemblyName = mi.AssemblyName;
     std::string className = mi.ClassName;
-
-    BaseModule* pModule = nullptr;// = (BaseModule)Assembly.LoadFile(Path.GetFullPath(@".\" + assemblyName)).CreateInstance(className);
-    pModule = LibraryLoader::load<BaseModule>(assemblyName);
+    dataName = className;
+    Publisher* pModule = nullptr;// = (BaseModule)Assembly.LoadFile(Path.GetFullPath(@".\" + assemblyName)).CreateInstance(className);
+    pModule = LibraryLoader::load<Publisher>(assemblyName);
     if( pModule == nullptr )
     {
         //        qCritical() << LOG_HEADER << " load " << szPlugin.c_str() << " failed.";
@@ -43,7 +35,8 @@ void DataSubscribe::Setup(ModuleInfo &mi)
 
     {
         Publisher* runable = dynamic_cast<Publisher*>(pModule);
-        runable->shares(shares);
+        //runable->shares(shares);
+        connect(runable, SIGNAL(pubData(const QString& ,const QString& ,const QVariant& )), dataConsumer, SLOT(OnData(const QString& ,const QString& ,const QVariant&)));
         runable->Run();
     }
 }
@@ -55,20 +48,19 @@ std::string DataSubscribe::getTypeList()
     return "DataStorage::RunnableModule";
 }
 
-void DataSubscribe::setLoader(QLibrary *)
-{
+// void DataSubscribe::setLoader(QLibrary *)
+// {
 
-}
+// }
 
 void DataSubscribe::regConsumer(DataConsumer *dc)
 {
     dataConsumer = dc;
+    qDebug() << "register consumer";
 }
 
-void DataSubscribe::Run()
-{
-    start();
-}
+
+
 
 
 

+ 9 - 8
DataManagerMain/DataSubscribe.h

@@ -1,33 +1,34 @@
 #ifndef DATASUBSCRIBE_H
 #define DATASUBSCRIBE_H
-#include "../include/RunnableModule.h"
 
+//#include "BaseModule.h"
 #include <QtCore/QThread>
 #include "DataConsumer.h"
 #include "Define.h"
-
 #include <QDateTime>
 
-class DataSubscribe : public RunnableModule, public QThread
+class SharedData;
+
+class DataSubscribe : public QObject//, public BaseModule
 {
 public:
     DataSubscribe();
+    ~DataSubscribe();
     virtual void Setup(ModuleInfo& mi);
 
     virtual std::string getTypeList();
 
     virtual void regConsumer(DataConsumer* dc) ;
-    virtual void Run() ;
 
-private:
-    virtual void run();
-    virtual void setLoader(QLibrary*);
+    //void OnData(QString, QString, QVariant);
+// private:
+//     virtual void setLoader(QLibrary*);
 private:
     QLibrary* library = nullptr;
     DataConsumer* dataConsumer = nullptr;
     std::string dataName;
     ModuleInfo mi;
-    SharedData shares;
+    SharedData* shares;
 
 };
 

+ 10 - 9
DataManater.pro

@@ -5,15 +5,16 @@ SUBDIRS += \
     #DataConsumer \
     #DataSubscribe \
     DataConsumer/TDengine \
-    DataManagerMain/DataManagerMain.pro
+    DataManagerMain/DataManagerMain.pro \
+    DataSubscribe/RedisSubscriber
     
 
 
-HEADERS += \
-    include/BaseModule.h \
-    include/Define.h \
-    include/JobModule.h \
-    include/LibraryLoader.h \
-    include/RunnableModule.h \
-    include/Publisher.h \
-    include/Client.h
+# HEADERS += \
+#     include/BaseModule.h \
+#     include/Define.h \
+#     include/JobModule.h \
+#     include/LibraryLoader.h \
+#     include/RunnableModule.h \
+#     include/Publisher.h \
+#     include/Client.h

+ 50 - 0
DataSubscribe/RedisSubscriber/RedisSubscriber.cpp

@@ -0,0 +1,50 @@
+#include "RedisSubscriber.h"
+#include <QDebug>
+RedisSubscriber::RedisSubscriber() {}
+
+// void RedisSubscriber::shares(SharedData *share_)
+// {
+//     share = share_;
+// }
+
+void RedisSubscriber::Run()
+{
+
+    //subscribe redis topic
+    start();
+}
+
+void RedisSubscriber::setLoader(QLibrary *)
+{
+
+}
+
+void RedisSubscriber::run(){
+    QThread::msleep(1000);
+    //redis message emit to onData signal.
+    while(!isInterruptionRequested()){
+        QString usr = "Redis";
+        QString key = "key";
+        QVariant val = "val";
+
+        //qDebug() << __FILE__ << __FUNCTION__;
+
+        emit pubData(usr, key, val);
+        QThread::msleep(1000);
+    }
+}
+
+
+Publisher* instance()
+{
+    return new RedisSubscriber();
+}
+
+void destroy(Publisher* pInstance)
+{
+    if( pInstance )
+    {
+        delete pInstance;
+    }
+}
+

+ 22 - 0
DataSubscribe/RedisSubscriber/RedisSubscriber.h

@@ -0,0 +1,22 @@
+#pragma once
+#include "RedisSubscriber_global.h"
+#include "Publisher.h"
+class REDISSUBSCRIBER_EXPORT RedisSubscriber : public Publisher
+{
+    Q_OBJECT
+public:
+    RedisSubscriber();
+    // virtual void shares(SharedData * share);
+    virtual void Run();
+    virtual void setLoader(QLibrary*);
+protected:
+    virtual void run();
+signals:
+    void pubData(const QString& ,const QString& ,const QVariant&);
+
+};
+
+extern "C" {//一定要添加上
+REDISSUBSCRIBER_EXPORT Publisher* instance();
+REDISSUBSCRIBER_EXPORT void destroy(Publisher*);
+}

+ 31 - 0
DataSubscribe/RedisSubscriber/RedisSubscriber.pro

@@ -0,0 +1,31 @@
+QT -= gui
+
+TEMPLATE = lib
+DEFINES += REDISSUBSCRIBER_LIBRARY
+
+CONFIG += c++17
+
+# You can make your code fail to compile if it uses deprecated APIs.
+# In order to do so, uncomment the following line.
+#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0
+
+INCLUDEPATH += ../../include
+
+HEADERS += \
+    ../../include/LibraryLoader.h \
+    ../../include/Publisher.h
+
+SOURCES += \
+    RedisSubscriber.cpp
+
+HEADERS += \
+    RedisSubscriber_global.h \
+    RedisSubscriber.h
+
+# Default rules for deployment.
+unix {
+    target.path = /usr/lib
+}else{
+    DESTDIR = $$PWD/../../bin/plugins
+}
+!isEmpty(target.path): INSTALLS += target

+ 12 - 0
DataSubscribe/RedisSubscriber/RedisSubscriber_global.h

@@ -0,0 +1,12 @@
+#ifndef REDISSUBSCRIBER_GLOBAL_H
+#define REDISSUBSCRIBER_GLOBAL_H
+
+#include <QtCore/qglobal.h>
+
+#if defined(REDISSUBSCRIBER_LIBRARY)
+#define REDISSUBSCRIBER_EXPORT Q_DECL_EXPORT
+#else
+#define REDISSUBSCRIBER_EXPORT Q_DECL_IMPORT
+#endif
+
+#endif // REDISSUBSCRIBER_GLOBAL_H

+ 1 - 1
include/BaseModule.h

@@ -8,7 +8,7 @@
 class Receiver
 {
 public:
-    virtual void OnData(QString user, QString key, QVariant val) = 0;
+    virtual void OnData(const QString& user, const QString& key, const QVariant& val) = 0;
 };
 
 

+ 7 - 5
include/Client.h

@@ -1,13 +1,15 @@
 #pragma once
-#include "RunnableModule.h"
 #include <QtCore/QThread>
-#include "Define.h"
 #include <QString>
+#include <QtCore/QLibrary>
+class Client :public QThread{
 
-class Client : public RunnableModule{
 public:
-    virtual void OnData(QString user, QString key, QVariant val) = 0;
-
+    Client(){}
+    virtual ~Client(){}
+    virtual void setLoader(QLibrary*) = 0;
+    virtual void OnData(const QString& user, const QString& key, const QVariant& val) = 0;
+    virtual void Run() = 0;
 };
 
 

+ 1 - 11
include/Define.h

@@ -57,6 +57,7 @@ struct ConsumerInfo{
     std::string name;
     std::string AssemblyName;       // 组件程序(dll、jar)名字,含路径
     std::string ClassName;          // 组件的 类名
+    std::string SubscribeName;
 };
 
 struct DataManagerInfo{
@@ -77,16 +78,5 @@ struct Config
     int serverId;
     std::string appName;
 };
-#include <QMutex>
-#include <QSemaphore>
-#include <QQueue>
 
-struct PublishData{
 
-};
-
-struct SharedData{
-    QMutex* mutex;
-    QSemaphore* sem;
-    QQueue<PublishData>commandList;
-};

+ 13 - 7
include/Publisher.h

@@ -1,13 +1,19 @@
 #pragma once
-#include "RunnableModule.h"
-#include <QtCore/QThread>
-#include "DataConsumer.h"
-#include "Define.h"
 
-class Publisher : public RunnableModule, public QThread{
-public:
-    virtual void shares(SharedData & share) = 0;
+#include <QtCore/QLibrary>
+#include <QObject>
+#include <QThread>
 
+class Publisher : public QThread{
+    Q_OBJECT
+public:
+    Publisher(){}
+    virtual ~Publisher(){}
+    // virtual void shares(SharedData * share) = 0;
+    virtual void Run() = 0;
+    virtual void setLoader(QLibrary*) = 0;
+signals:
+    void pubData(const QString& ,const QString& ,const QVariant&);
 
 };
 

+ 3 - 2
include/RunnableModule.h

@@ -1,9 +1,10 @@
 #pragma once
 #include "BaseModule.h"
 #include <QtCore/QObject>
-
-class RunnableModule: public BaseModule
+#include <QtCore/QThread>
+class RunnableModule: public BaseModule,public QThread
 {
+
 public:
  //   virtual void onData(QObject data) = 0;
     virtual void Run() = 0;