ZenZ 1 жил өмнө
parent
commit
e616b3f7db

+ 43 - 0
DataConsumer/Alarm/Alarm.cpp

@@ -0,0 +1,43 @@
+#include "Alarm.h"
+#include <QDebug>
+Alarm::Alarm() {
+    //AlarmClient* tdclient = new AlarmClient();
+    //tdclient->start();
+}
+Alarm::~Alarm(){
+    qDebug() << __FILE__ << __FUNCTION__<< __LINE__;
+}
+void Alarm::Run(const ConsumerInfo& ci)
+{
+    //start();
+}
+
+void Alarm::OnData(const QString& user, const QString& key, const QVariant& val)
+{
+    qDebug() << __FILE__ << __FUNCTION__<< __LINE__ << user << key << val;
+}
+
+void Alarm::setLoader(QLibrary *)
+{
+
+}
+
+void Alarm::run()
+{
+    /*while(!isInterruptionRequested()){
+        qDebug() << __FILE__ << __FUNCTION__;
+        QThread::msleep(1000);
+    }*/
+}
+Client* instance()
+{
+    return new Alarm();
+}
+
+void destroy(Client* pInstance)
+{
+    if( pInstance )
+    {
+        delete pInstance;
+    }
+}

+ 24 - 0
DataConsumer/Alarm/Alarm.h

@@ -0,0 +1,24 @@
+#ifndef Alarm_H
+#define Alarm_H
+
+#include "Alarm_global.h"
+#include "Client.h"
+
+class ALARM_EXPORT Alarm:public Client
+{
+public:
+    Alarm();
+    ~Alarm();
+    virtual void Run(const ConsumerInfo& ci);
+    virtual void OnData(const QString& user, const QString& key, const QVariant& val);
+    virtual void setLoader(QLibrary*);
+private:
+    virtual void run();
+};
+
+extern "C" {//一定要添加上
+ALARM_EXPORT Client* instance();
+ALARM_EXPORT void destroy(Client*);
+}
+
+#endif // Alarm_H

+ 43 - 0
DataConsumer/Alarm/Alarm.pro

@@ -0,0 +1,43 @@
+QT -= gui
+
+TEMPLATE = lib
+DEFINES += ALARM_LIBRARY
+
+CONFIG += c++17
+
+# You can make your code fail to compile if it uses deprecated APIs.
+# In order to do so, uncomment the following line.
+#DEFINES += QT_DISABLE_DEPRECATED_BEFORE=0x060000    # disables all the APIs deprecated before Qt 6.0.0
+
+INCLUDEPATH += ../../include
+
+HEADERS += ../../include/Client.h
+
+SOURCES += \
+    Alarm.cpp
+
+HEADERS += \
+    Alarm_global.h \
+    Alarm.h
+
+# TDENGINE_DIR =$$PWD/../../thirdparty/tdengine
+# LIBS += -L$${TDENGINE_DIR}/lib -ltaos
+# INCLUDEPATH += $${TDENGINE_DIR}/include
+# INCLUDEPATH += $$PWD/../../modules/TDengineClient
+# LIBS +=-L$$PWD/../../bin/plugins -lTDengineClient
+
+HIREDIS_DIR =$$PWD/../../thirdparty/hiredis
+HEADERS += $${HIREDIS_DIR}/include/hiredis/adapters/qt.h
+LIBS += -L$${HIREDIS_DIR}/lib -lhiredis
+#LIBS += ../thirdparty/hiredis/lib/libhiredis.a
+INCLUDEPATH += $${HIREDIS_DIR}/include
+
+
+# Default rules for deployment.
+unix {
+    target.path = /usr/lib
+}else{
+    DESTDIR = $$PWD/../../bin/plugins
+}
+
+!isEmpty(target.path): INSTALLS += target

+ 12 - 0
DataConsumer/Alarm/Alarm_global.h

@@ -0,0 +1,12 @@
+#ifndef ALARM_GLOBAL_H
+#define ALARM_GLOBAL_H
+
+#include <QtCore/qglobal.h>
+
+#if defined(ALARM_LIBRARY)
+#define ALARM_EXPORT Q_DECL_EXPORT
+#else
+#define ALARM_EXPORT Q_DECL_IMPORT
+#endif
+
+#endif // TDENGINE_GLOBAL_H

+ 1 - 1
DataConsumer/TDengine/TDengine.cpp

@@ -7,7 +7,7 @@ TDengine::TDengine() {
 TDengine::~TDengine(){
     qDebug() << __FILE__ << __FUNCTION__<< __LINE__;
 }
-void TDengine::Run()
+void TDengine::Run(const ConsumerInfo& ci)
 {
     //start();
 }

+ 1 - 1
DataConsumer/TDengine/TDengine.h

@@ -9,7 +9,7 @@ class TDENGINE_EXPORT TDengine:public Client
 public:
     TDengine();
     ~TDengine();
-    virtual void Run();
+    virtual void Run(const ConsumerInfo& ci);
     virtual void OnData(const QString& user, const QString& key, const QVariant& val);
     virtual void setLoader(QLibrary*);
 private:

+ 2 - 3
DataManagerMain/DataConsumer.cpp

@@ -29,8 +29,7 @@ DataConsumer::~DataConsumer()
 void DataConsumer::Setup(ConsumerInfo &ci)
 {
     std::string assemblyName = ci.AssemblyName;
-    std::string className = ci.ClassName;
-    dataName = className;
+    dataName = ci.Name;
     Client* pModule = nullptr;
     // = (BaseModule)Assembly.LoadFile(Path.GetFullPath(@".\" + assemblyName)).CreateInstance(className);
     pModule = LibraryLoader::load<Client>(assemblyName);
@@ -48,7 +47,7 @@ void DataConsumer::Setup(ConsumerInfo &ci)
     {
         //Client*
         runable = dynamic_cast<Client*>(pModule);
-        runable->Run();
+        runable->Run(ci);
     }
 }
 

+ 24 - 20
DataManagerMain/DataManager.cpp

@@ -25,27 +25,31 @@ void DataManager::Startup(Config& config)
 {
     //Config config;
     DataManagerProxy dmp;
-    auto lstModules = dmp.loadModuleInfos(config.AppId.toStdString().c_str(), config.AppName.toStdString().c_str());
-
-    for(auto itr = lstModules.consumers.begin(); itr != lstModules.consumers.end(); ++itr){
-        //ScopedPointer<DataConsumer> consumer {new DataConsumer()};
-        DataConsumer * consumer = new DataConsumer();
-        ConsumerInfo ci = *itr;
-        consumer->Setup(ci);
-        consumerMap.insert(QString(itr->SubscribeName.c_str()),consumer);
-    }
+    auto lstModules = dmp.loadModuleInfos(config);
+    for(auto itModules = lstModules.begin(); itModules != lstModules.end(); itModules++){
+        for(auto itr = itModules->consumers.begin(); itr != itModules->consumers.end(); ++itr){
+            //ScopedPointer<DataConsumer> consumer {new DataConsumer()};
+            DataConsumer * consumer = new DataConsumer();
+            ConsumerInfo ci = *itr;
+            consumer->Setup(ci);
+            consumerMap.insert(QString(itr->SubscribeName.c_str()),consumer);
+        }
 
-    for(auto itr = lstModules.modules.begin();itr!= lstModules.modules.end();++itr)
-    {
-        ModuleInfo mi = *itr;
-
-        if(consumerMap.contains(itr->ClassName.c_str())){
-            //QScopedPointer<DataSubscribe> subscriber(new DataSubscribe());
-            DataSubscribe* subscriber = new DataSubscribe();
-            subscriber->regConsumer(consumerMap[itr->ClassName.c_str()]);
-            subscriber->Setup(mi);
-            //subscriber->Run();
-            subscribers.append(subscriber);
+        for(auto itr = itModules->modules.begin();itr!= itModules->modules.end();++itr)
+        {
+            ModuleInfo mi = *itr;
+            if(consumerMap.contains(itr->Name.c_str())){
+                //QScopedPointer<DataSubscribe> subscriber(new DataSubscribe());
+                DataSubscribe* subscriber = new DataSubscribe();
+
+                auto mapIt = consumerMap.find(itr->Name.c_str());
+                while(mapIt != consumerMap.end() && mapIt.key() == itr->Name.c_str()){
+                    subscriber->regConsumer(mapIt.value());
+                }
+                subscriber->Setup(mi);
+                //subscriber->Run();
+                subscribers.append(subscriber);
+            }
         }
     }
 

+ 1 - 1
DataManagerMain/DataManager.h

@@ -17,7 +17,7 @@ public:
     void Startup(Config& config);
 
 private:
-    QMap<QString, DataConsumer*> consumerMap;
+    QMultiMap<QString, DataConsumer*> consumerMap;
     QList<DataSubscribe*> subscribers;
 };
 

+ 66 - 25
DataManagerMain/DataManagerProxy.cpp

@@ -1,48 +1,89 @@
 #include "DataManagerProxy.h"
 #include "RedisClient.h"
+#include <QJsonArray>
+#include <QJsonObject>
 DataManagerProxy::DataManagerProxy() {
 
 }
 
-QList<DataManagerInfo> DataManagerProxy::loadModuleInfos(const char* id, const char* app)
+QList<DataManagerInfo> DataManagerProxy::loadModuleInfos(const Config& config)
 {
-
-    RedisClient* redis = new RedisClient();
+    RedisClient redis;
     QList<DataManagerInfo > listDataManageInfo;
-    QString key = QString("%1:%2").arg(app).arg(id);
+    QString key = QString("%1:%2").arg(config.AppName, config.AppId);
     DeviceInfo di;
-    if( redis != nullptr)
+
+    redis.Setup(config.redisCfg.host, config.redisCfg.port, config.redisCfg.au);
+    // QHash<QString, QString> h = redis->hgetall(key);
+    // for(auto it = h.begin(); it != h.end(); it++)
+    // {
+    //     qDebug()<< "key" << it.key();
+    //     //qDebug()<< "val" << it.value();
+    //     QJsonArray ja = QJsonDocument::fromJson(it.value().toUtf8()).array();
+    //     // QJsonObject jo = QJsonDocument::fromJson(it.value().toUtf8()).object();
+    // }
+
+    QStringList lst = redis.hvals(key);
+    foreach (QString str, lst)
     {
-        QStringList lst = redis->hvals(key);
-        foreach (QString str, lst)
+        if( str.isEmpty() )
         {
-            if( str.isEmpty() )
-            {
-                continue;
-            }
+            continue;
+        }
+        QJsonParseError jsonParseError;
+        QJsonDocument jsonDocument(QJsonDocument::fromJson(str.toUtf8(), &jsonParseError));
+        if(QJsonParseError::NoError != jsonParseError.error)
+        {
+            //LOGERROR("parse json file {} error", fullpath.toStdString().c_str());
+            continue;
+        }
+        if(!jsonDocument.isArray()){
+            continue;
+        }
+        QJsonArray ja = jsonDocument.array();
+        foreach (auto var, ja) {
+            QJsonObject item = var.toObject();
+            QString topic = item["topic"].toString();
+            QJsonArray clients = item["Client"].toArray();
+            DataManagerInfo dataManageInfo;
             ModuleInfo mi;
-            mi.Name = "RedisSubscriber";
-
-            mi.AssemblyName = "plugins/RedisSubscriber.dll";
-            mi.ClassName = "RedisSubscriber";
+            mi.Name = "TDengineSubscriber";
+            mi.AssemblyName = "plugins/TDengineSubscriber.dll";
+            // mi.ClassName = "TDengineSubscriber";
+            mi.Code = topic.toStdString();
+            foreach (auto client, clients) {
+                ConsumerInfo ci;
+                ci.Name = client["Name"].toString().toStdString();
+                ci.AssemblyName = client["AssemblyName"].toString().toStdString();
+                //ci.ClassName = "Alarm";
+                ci.SubscribeName = topic.toStdString();
+                ci.Settings = client["Settings"].toVariant().toString().toStdString();
+                dataManageInfo.consumers.push_back(ci);
+            }
+            dataManageInfo.modules.push_back(mi);
+            listDataManageInfo.push_back(dataManageInfo);
+        }
+    }
 
-            ConsumerInfo ci;
+#if 0
 
-            ci.name = "TDengine";
+            ModuleInfo mi;
+            mi.Name = "TDengineSubscriber";
+            mi.AssemblyName = "plugins/TDengineSubscriber.dll";
+            mi.ClassName = "TDengineSubscriber";
+            mi.Code = "topic:xxx";
 
-            ci.AssemblyName = "plugins/TDengine.dll";
-            ci.ClassName = "TDengine";
-            ci.SubscribeName = "RedisSubscriber";
+            ConsumerInfo ci;
+            ci.Name = "Alarm";
+            ci.AssemblyName = "plugins/Alarm.dll";
+            ci.ClassName = "Alarm";
+            ci.SubscribeName = "TDengineSubscriber";
             DataManagerInfo dataManageInfo;
             dataManageInfo.consumers.push_back(ci);
             dataManageInfo.modules.push_back(mi);
             listDataManageInfo.push_back(dataManageInfo);
-
-        }
-    }
-
+#endif
     //return di;
 
-
     return listDataManageInfo;
 }

+ 2 - 1
DataManagerMain/DataManagerProxy.h

@@ -3,13 +3,14 @@
 #include "Define.h"
 #include <QMap>
 #include <QList>
+#include "Config.h"
 class DataManagerProxy
 {
 public:
     DataManagerProxy();
 
 public:
-    QList<DataManagerInfo> loadModuleInfos(const char* id,const char* app);
+    QList<DataManagerInfo> loadModuleInfos(const Config& config);
 };
 
 #endif // DATAMANAGERPROXY_H

+ 5 - 4
DataManagerMain/DataSubscribe.cpp

@@ -17,8 +17,7 @@ DataSubscribe::~DataSubscribe()
 void DataSubscribe::Setup(ModuleInfo &mi)
 {
     std::string assemblyName = mi.AssemblyName;
-    std::string className = mi.ClassName;
-    dataName = className;
+    dataName = mi.Name;
     Publisher* pModule = nullptr;// = (BaseModule)Assembly.LoadFile(Path.GetFullPath(@".\" + assemblyName)).CreateInstance(className);
     pModule = LibraryLoader::load<Publisher>(assemblyName);
     if( pModule == nullptr )
@@ -36,7 +35,9 @@ void DataSubscribe::Setup(ModuleInfo &mi)
     {
         Publisher* runable = dynamic_cast<Publisher*>(pModule);
         //runable->shares(shares);
-        connect(runable, SIGNAL(pubData(const QString& ,const QString& ,const QVariant& )), dataConsumer, SLOT(OnData(const QString& ,const QString& ,const QVariant&)));
+        for(auto it =  dataConsumerList.begin(); it != dataConsumerList.end(); it++){
+            connect(runable, SIGNAL(pubData(const QString& ,const QString& ,const QVariant& )), *it, SLOT(OnData(const QString& ,const QString& ,const QVariant&)));
+        }
         runable->Run();
     }
 }
@@ -55,7 +56,7 @@ std::string DataSubscribe::getTypeList()
 
 void DataSubscribe::regConsumer(DataConsumer *dc)
 {
-    dataConsumer = dc;
+    dataConsumerList.append(dc);
     qDebug() << "register consumer";
 }
 

+ 1 - 1
DataManagerMain/DataSubscribe.h

@@ -25,7 +25,7 @@ public:
 //     virtual void setLoader(QLibrary*);
 private:
     QLibrary* library = nullptr;
-    DataConsumer* dataConsumer = nullptr;
+    QList<DataConsumer*> dataConsumerList;
     std::string dataName;
     ModuleInfo mi;
     SharedData* shares;

+ 1 - 0
DataManater.pro

@@ -7,6 +7,7 @@ SUBDIRS += \
     modules/RedisClient \
     modules/TDengineClient \
     DataConsumer/TDengine \
+    DataConsumer/Alarm \
     DataManagerMain/DataManagerMain.pro \
     DataSubscribe/RedisSubscriber   \
     DataSubscribe/TDengineSubscriber

+ 2 - 1
include/Client.h

@@ -2,6 +2,7 @@
 #include <QtCore/QThread>
 #include <QString>
 #include <QtCore/QLibrary>
+#include "Define.h"
 class Client{
 
 public:
@@ -9,7 +10,7 @@ public:
     virtual ~Client(){}
     virtual void setLoader(QLibrary*) = 0;
     virtual void OnData(const QString& user, const QString& key, const QVariant& val) = 0;
-    virtual void Run() = 0;
+    virtual void Run(const ConsumerInfo& ci) = 0;
 };
 
 

+ 2 - 1
include/Define.h

@@ -54,10 +54,11 @@ struct DeviceInfo
 };
 
 struct ConsumerInfo{
-    std::string name;
+    std::string Name;
     std::string AssemblyName;       // 组件程序(dll、jar)名字,含路径
     std::string ClassName;          // 组件的 类名
     std::string SubscribeName;
+    std::string Settings;
 };
 
 struct DataManagerInfo{

+ 8 - 8
modules/RedisClient/RedisClient.cpp

@@ -336,7 +336,7 @@ QString RedisClient::lpop(const QString& lData)
 }
 
 //阻塞从队列头部获取最早数据
-bool RedisClient::blpop(QString lData, redisCallbackFn *fn)
+bool RedisClient::blpop(const QString& lData, redisCallbackFn *fn)
 {
     if (rac == nullptr)
     {
@@ -370,7 +370,7 @@ bool RedisClient::blpop(QString lData, redisCallbackFn *fn)
 }
 
 //同步阻塞一定时间返回数据
-QString RedisClient::blpop(QString lData, quint32 timeout)
+QString RedisClient::blpop(const QString& lData, quint32 timeout)
 {
     //最小为1,防止0时永久阻塞
     if (timeout < 1)
@@ -394,7 +394,7 @@ QString RedisClient::blpop(QString lData, quint32 timeout)
     return ret;
 }
 
-bool RedisClient::set(QString k, QString v)
+bool RedisClient::set(const QString& k, const QString& v)
 {
     bool ret = false;
     redisReply * reply = (redisReply*)redisCommand(rc, QString("set " + k + " " + v).toStdString().c_str()); //支持空格
@@ -416,7 +416,7 @@ bool RedisClient::set(QString k, QString v)
     return ret;
 }
 
-bool RedisClient::setb(QString k, QByteArray &v)
+bool RedisClient::setb(const QString& k, const QByteArray &v)
 {
     bool ret = false;
     const char *arg[3]; //3个参数(cmd, k, v)
@@ -449,7 +449,7 @@ bool RedisClient::setb(QString k, QByteArray &v)
     return ret;
 }
 
-QString RedisClient::get(QString k)
+QString RedisClient::get(const QString& k)
 {
     QString ret;
     redisReply * reply = (redisReply*)redisCommand(rc, QString("get " + k).toStdString().c_str());
@@ -465,7 +465,7 @@ QString RedisClient::get(QString k)
     return ret;
 }
 
-QByteArray RedisClient::getb(QString k)
+QByteArray RedisClient::getb(const QString& k)
 {
     QByteArray ret;
     redisReply * reply = (redisReply*)redisCommand(rc, QString("get " + k).toStdString().c_str());
@@ -482,7 +482,7 @@ QByteArray RedisClient::getb(QString k)
     return ret;
 }
 
-bool RedisClient::expire(QString k,int sec)
+bool RedisClient::expire(const QString& k,int sec)
 {
     bool ret = false;
     redisReply * reply = (redisReply*)redisCommand(rc, QString("expire " + k + " " + QString::number(sec, 10)).toStdString().c_str());
@@ -618,6 +618,6 @@ void RedisClient::Setup(const QString& addr, uint _port,const QString&password)
     ip = addr;
     port = _port;
     auth = password;
-
     redisOk = false;
+    conn();
 }

+ 3 - 3
modules/RedisClient/RedisClient.h

@@ -23,7 +23,7 @@ public slots:
     QString hget(const QString& m, const QString& k);
     bool set(const QString& k, const QString& v);
     bool setb(const QString& k,const QByteArray &v);
-    const QString& get(const QString& k);
+    QString get(const QString& k);
     QByteArray getb(const QString& k);
     QStringList hkeys(const QString& k);
     QStringList hvals(const QString& k);
@@ -33,8 +33,8 @@ public slots:
     QString lpop(const QString& lData);
     QString blpop(const QString& lData, quint32 timeout);          //同步阻塞一定时间返回数据
     bool blpop(const QString& lData, redisCallbackFn *fn);         //阻塞从队列头部获取最早数据
-    bool publish(const QString&& ch, const QString&& js);
-    bool publishb(const QString&& ch, const QByteArray& ba);
+    bool publish(const QString& ch, const QString& js);
+    bool publishb(const QString& ch, const QByteArray& ba);
     bool expire(const QString& k,int sec);
     void subscribe(const QString& ch, redisCallbackFn *fn, void* data = nullptr);        // 订阅
     void psubscribe(const QString& ch, redisCallbackFn *fn, void* priData = nullptr);       // 订阅:模式匹配

+ 3 - 3
modules/TDengineClient/TDengineClient.cpp

@@ -118,7 +118,7 @@ tmq_list_t* TDengineClient::buildTopicList()
 {
     tmq_list_t* tmqList = tmq_list_new();
 
-    std::list<std::string>::iterator itr;
+    std::set<std::string>::iterator itr;
     for( itr = topicList.begin(); itr != topicList.end(); ++itr )
     {
         std::string szTopic = *itr;
@@ -209,14 +209,14 @@ void TDengineClient::subscribe(QString ch, std::function<void(const char* topic,
 {
     callback = fn;
     usrData = usrdata;
-    topicList.push_back(ch.toLocal8Bit().toStdString());
+    topicList.insert(ch.toLocal8Bit().toStdString());
 }
 
 void TDengineClient::psubscribe(QString ch, std::function<void(const char* topic, const char* data, void*usr)> fn, void*usrdata)
 {
     callback = fn;
     usrData = usrdata;
-    topicList.push_back(ch.toLocal8Bit().toStdString());
+    topicList.insert(ch.toLocal8Bit().toStdString());
 }
 
 void TDengineClient::Setup(const char* _host, const char* _user, const char* _passwd, uint16_t _port)

+ 2 - 1
modules/TDengineClient/TDengineClient.h

@@ -7,6 +7,7 @@
 #include "libaray_symbols.h"
 #include <functional>
 #include <QFuture>
+#include <set>
 class TDENGINECLIENT_EXPORT TDengineClient : public QObject
 {
     Q_OBJECT
@@ -41,7 +42,7 @@ private:
 
     //TAOS* pConn = NULL;
     //EventSubInterface*  g_pSubCB;       // 订阅回调
-    std::list<std::string>  topicList;
+    std::set<std::string>  topicList;
 
     void * usrData;
     QFuture<void> future;