浏览代码

1、通信报警根据要求修改
2、两个报警,由于配置差异较大,分开实现加载
3、Runnable模块,消息模式更改为信号槽机制,无法直接调用mqtt

舍得 1 年之前
父节点
当前提交
2b230beb8d
共有 5 个文件被更改,包括 107 次插入54 次删除
  1. 34 20
      CommRecognition/Module.cpp
  2. 14 2
      CommRecognition/Module.h
  3. 53 30
      EngineAlarm/DataEngine.cpp
  4. 4 0
      EngineAlarm/DataEngine.h
  5. 2 2
      EngineAlarm/DeviceManagerProxy.cpp

+ 34 - 20
CommRecognition/Module.cpp

@@ -37,7 +37,7 @@ Module::Module()
     m_szOutputs.clear();
     m_mpParamConfig.clear();
     m_mpDataValue.clear();
-    m_nLoopIntervalMS = UINT_FAST32_MAX;
+    m_nLoopIntervalMS = 100;//UINT_FAST32_MAX;
 }
 
 bool Module::isInheritedFrom(std::string tp)
@@ -57,7 +57,7 @@ void Module::Setup(ModuleInfo mi)
     std::list<DataItem>::iterator itrO;
     for( itrO = mi.Properties.begin(); itrO != mi.Properties.end(); ++itrO )
     {
-        m_szOutputs.push_back(mi.Code + "." + itrO->Code);
+        m_szOutputs.push_back(mi.Code + "_" + itrO->Code);
     }
 
     std::vector<Setting>::iterator itr;
@@ -115,40 +115,41 @@ void Module::Check()
         std::map<std::string,QDateTime>::iterator itrR;
         for( itrR = mpCache.begin(); itrR != mpCache.end(); ++itrR )
         {
-            if( m_mpParamConfig.find(itrR->first) == m_mpParamConfig.end())
+            std::string szTable = itrR->first;
+            if( m_mpParamConfig.find(szTable) == m_mpParamConfig.end())
             {
                 continue;
             }
 
-            uint nIntervalMS = m_mpParamConfig[itrR->first].uIntervalMS;
+            uint nIntervalMS = m_mpParamConfig[szTable].uIntervalMS;
             qint64 tDiff = itrR->second.msecsTo(dtNow);
             if( tDiff > nIntervalMS )
             {
-                if( m_mpParamConfig[itrR->first].nStatus != EAS_Breaked )
+                if( m_mpParamConfig[szTable].nStatus != EAS_Breaked )
                 {
-                    m_mpParamConfig[itrR->first].nStatus = EAS_Breaked;
-                    m_mpParamConfig[itrR->first].bChanged = true;
+                    m_mpParamConfig[szTable].nStatus = EAS_Breaked;
+                    m_mpParamConfig[szTable].bChanged = true;
 
-                    pubStatus(itrR->first,EAS_Breaked);
+                    pubStatus(szTable,EAS_Breaked);
                 }
                 else
                 {
-                    m_mpParamConfig[itrR->first].bChanged = false;
+                    m_mpParamConfig[szTable].bChanged = false;
                 }
             }
             else
             {
-                if( m_mpParamConfig[itrR->first].nStatus == EAS_Breaked )
+                if( m_mpParamConfig[szTable].nStatus == EAS_Breaked )
                 {
-                    m_mpParamConfig[itrR->first].nStatus = EAS_Recover;
-                    m_mpParamConfig[itrR->first].bChanged = true;
+                    m_mpParamConfig[szTable].nStatus = EAS_Recover;
+                    m_mpParamConfig[szTable].bChanged = true;
 
-                    pubStatus(itrR->first,EAS_Breaked);
+                    pubStatus(szTable,EAS_Recover);
                 }
                 else
                 {
-                    m_mpParamConfig[itrR->first].nStatus = EAS_Normal;
-                    m_mpParamConfig[itrR->first].bChanged = false;
+                    m_mpParamConfig[szTable].nStatus = EAS_Normal;
+                    m_mpParamConfig[szTable].bChanged = false;
                 }
             }
         }
@@ -162,31 +163,44 @@ void Module::pubStatus(std::string name,int nStatus)
     std::vector<std::string>::iterator itr;
     for( itr = m_szOutputs.begin(); itr != m_szOutputs.end(); ++itr )
     {
+        std::string topic = *itr;
         QString szStatus;
         switch( nStatus )
         {
         case EAS_Normal:
-            szStatus = "Normal";
+            szStatus = "O";
             break;
         case EAS_Breaked:
-            szStatus = "Breaked";
+            szStatus = "B";
             break;
         case EAS_Recover:
-            szStatus = "Recover";
+            szStatus = "R";
             break;
         }
 
-        QString szMSG = QString("{\"%1\":\"%2\"}").arg(name.c_str()).arg(szStatus);
-        m_pDataConsumer->OnData(*itr,QVariant(szMSG));
+        QString szJson;
+        szJson += "{";
+        szJson += QString("\"%1\":\"%2\"").arg(name.c_str()).arg(szStatus);
+        szJson += "}";
+
+        emit sigMqttMsg(topic,szJson);
+        //qDebug() << __FILE__ << __LINE__ << QDateTime::currentDateTime().toString("hh:mm:ss.zzz ") << szMSG;
     }
 }
 
 void Module::Run()
 {
+    connect(this,SIGNAL(sigMqttMsg(std::string,QString)),this,SLOT(sltMqttMsg(std::string,QString)),Qt::QueuedConnection);
+
     std::thread t(&Module::Check,this);
     t.detach();
 }
 
+void Module::sltMqttMsg(std::string topic,QString szJson)
+{
+    m_pDataConsumer->OnData(topic,QVariant(szJson));
+}
+
 BaseModule* instance()
 {
     return new Module();

+ 14 - 2
CommRecognition/Module.h

@@ -4,6 +4,7 @@
 #include "LockFreeQueue.h"
 #include <QtCore/QDateTime>
 #include <QtCore/QReadWriteLock>
+//#include <QtCore/QThread>
 
 enum EAlarmStatus
 {
@@ -14,15 +15,20 @@ enum EAlarmStatus
 
 // 全局唯一实例
 // 数据通断告警
-class COMMRECOGNITION_EXPORT Module : public RunnableModule
+// CREATE TABLE IF NOT EXISTS lanpengdb.mokuai3(ts timestamp, dim1 double);
+// create topic mokuai3 as select * from lanpengdb.mokuai3;
+// delete from lanpengdb.mokuai3;
+// insert into lanpengdb.mokuai3 values(NOW,12.345);
+class COMMRECOGNITION_EXPORT Module : public QObject,public RunnableModule
 {
+    Q_OBJECT
 public:
     Module();
 
 private:
     struct tagParam{
         uint uWinSize = 3;
-        uint uIntervalMS = 1000;
+        uint uIntervalMS = 10000;
         int nStatus = EAS_Breaked;
         bool bChanged = false;
     };
@@ -37,6 +43,12 @@ private:
     std::map<std::string,QDateTime>     m_mpDataValue;       // 测量数据
     QReadWriteLock                      m_objDataLock;
 
+signals:
+    void sigMqttMsg(std::string,QString);
+
+public slots:
+    void sltMqttMsg(std::string,QString);
+
 private:
     void pubStatus(std::string,int);
     void Check();

+ 53 - 30
EngineAlarm/DataEngine.cpp

@@ -81,6 +81,54 @@ void loadConfig(std::string szFile,Config& config)
     }
 }
 
+void DataEngine::startup_DataAlarm(DeviceInfo di)
+{
+    DeviceController* pControler = new DeviceController();
+    pControler->regConsumer(this);
+    pControler->CreateDevice(di);
+
+    std::list<std::string> lstTopics;
+    std::vector<Setting>::iterator itr;
+    for( itr = di.ModuleInfo.vSettings.begin(); itr != di.ModuleInfo.vSettings.end(); ++itr )
+    {
+        std::string topic = itr->Name;
+
+        lstTopics.push_back(topic);
+        m_mpDevControler[topic].push_back(pControler);
+    }
+    if( lstTopics.begin() != lstTopics.end())
+    {
+        m_pTDengine->Subscribe(lstTopics);
+    }
+}
+
+void DataEngine::startup_CommAlarm(DeviceInfo di)
+{
+    DeviceController* pControler = new DeviceController();
+    pControler->regConsumer(this);
+    pControler->CreateDevice(di);
+
+    std::list<std::string> lstTopics;
+    std::vector<Setting>::iterator itr;
+    for( itr = di.ModuleInfo.vSettings.begin(); itr != di.ModuleInfo.vSettings.end(); ++itr )
+    {
+        Setting oSet = *itr;
+        QJsonDocument jsonDoc(QJsonDocument::fromJson(oSet.qValue.toString().toLocal8Bit()));
+        QJsonArray json = jsonDoc.array();
+        for(auto i : json)
+        {
+            std::string topic = i.toString().toLocal8Bit().toStdString();
+
+            lstTopics.push_back(topic);
+            m_mpDevControler[topic].push_back(pControler);
+        }
+    }
+    if( lstTopics.begin() != lstTopics.end())
+    {
+        m_pTDengine->Subscribe(lstTopics);
+    }
+}
+
 void DataEngine::Startup()
 {
     // merge file name
@@ -121,6 +169,8 @@ void DataEngine::Startup()
 
     m_pTDengine = new TDAgent();
     m_pTDengine->Connect(mpConfig["tdengine"]);     // td
+    qRegisterMetaType<std::string>("std::string");
+    connect(m_pTDengine,SIGNAL(sigEvent(const std::string, const std::string)),this,SLOT(sltSubData(const std::string, const std::string)));
 
     m_pMqtt = new MQTTAgent();
     m_pMqtt->Connect(mpConfig["mqtt"]);             // mqtt
@@ -128,36 +178,9 @@ void DataEngine::Startup()
     // server.id
     DeviceManagerProxy dm(m_pRedis);
     DeviceInfo da = dm.loadDataAlarm("alarmrule", "plugins/AlarmRecognition.dll");
-    DeviceInfo ca = dm.loadCommAlarm("alarmruledevicestatus", "plugins/CommRecognition.dll");
-
-    std::list<DeviceInfo> lstDevices;
-    lstDevices.push_back(da);
-    lstDevices.push_back(ca);
-
-    std::list<DeviceInfo>::iterator itr;
-    for( itr = lstDevices.begin();itr!= lstDevices.end();++itr)
-    {
-        DeviceInfo di = *itr;
-        DeviceController* pControler = new DeviceController();
-        pControler->regConsumer(this);
-        pControler->CreateDevice(di);
-
-        std::list<std::string> lstTopics;
-        std::vector<Setting>::iterator itrT;
-        for( itrT = di.ModuleInfo.vSettings.begin(); itrT != di.ModuleInfo.vSettings.end(); ++itrT )
-        {
-            std::string topic = itrT->Name;
+    startup_DataAlarm(da);
 
-            lstTopics.push_back(topic);
-            m_mpDevControler[topic].push_back(pControler);
-
-            qRegisterMetaType<std::string>("std::string");
-            connect(m_pTDengine,SIGNAL(sigEvent(const std::string, const std::string)),this,SLOT(sltSubData(const std::string, const std::string)));
-        }
-        if( lstTopics.begin() != lstTopics.end())
-        {
-            m_pTDengine->Subscribe(lstTopics);
-        }
-    }
+    DeviceInfo ca = dm.loadCommAlarm("alarmruledevicestatus", "plugins/CommRecognition.dll");
+    startup_CommAlarm(ca);
 }
 

+ 4 - 0
EngineAlarm/DataEngine.h

@@ -25,6 +25,10 @@ private:
 public slots:
     void sltSubData(const std::string, const std::string);
 
+private:
+    void startup_DataAlarm(DeviceInfo di);
+    void startup_CommAlarm(DeviceInfo di);
+
 public:
     virtual void OnData(std::string name,QVariant val);
     void Startup();

+ 2 - 2
EngineAlarm/DeviceManagerProxy.cpp

@@ -139,11 +139,11 @@ DeviceInfo DeviceManagerProxy::loadCommAlarm(std::string id,std::string app)
     }
 
     ModuleInfo mi;
-    mi.Code = "alarm";
+    mi.Code = "device";
     mi.AssemblyName = app;
 
     DataItem oDT;
-    oDT.Code = "comm";
+    oDT.Code = "alarm";
     mi.Properties.push_back(oDT);
     QHash<QString, QString> mps = m_pRedis->hgetall(id.c_str());
     QHash<QString, QString>::iterator itr;