Quellcode durchsuchen

实现数据越限报警

舍得 vor 1 Jahr
Ursprung
Commit
ff7a8f8da7

+ 34 - 2
AlarmRecognition/Module.cpp

@@ -8,6 +8,27 @@
 #include <stdlib.h>
 #include <math.h>
 
+void split(const std::string source,const std::string delim,std::vector<std::string>& dest)
+{
+    long n = delim.length();
+    long lt = source.length();
+
+    long last = 0;
+    size_t index = source.find_first_of(delim,last);
+    while(index != std::string::npos)
+    {
+        dest.push_back(source.substr(last,index-last));
+        last = index + n;
+        index = source.find_first_of(delim,last);
+    }
+
+    long len = source.length();
+    if( len - last > 0)
+    {
+        dest.push_back(source.substr(last,len - last));
+    }
+}
+
 Module::Module()
     :BaseModule()
 {
@@ -63,7 +84,7 @@ void Module::Setup(ModuleInfo mi)
             }
 
             std::string szOp = json["Operator"].toString().toLocal8Bit().toStdString();
-            if( strcasecmp(szOp.c_str(), "rang") == 0)
+            if( strcasecmp(szOp.c_str(), "range") == 0)
             {
                 tr.dbMin = dbOperator1;
                 tr.dbMax = dbOperator2;
@@ -87,7 +108,18 @@ void Module::OnSubData(std::string name,std::string val)
     {
         return;
     }
+
+    if( val.find(' ') != std::string::npos )
+    {
+        std::vector<std::string> dest;
+        split(val," ",dest);
+        val = *dest.rbegin();
+    }
     uint nMaxWinSize = m_mpRangeConfig[name].uWinSize;
+    if( nMaxWinSize <= 0 )
+    {
+        nMaxWinSize = 1;
+    }
 
     // add data
     m_objDataLock.lockForWrite();
@@ -126,7 +158,7 @@ void Module::OnSubData(std::string name,std::string val)
     }
     m_objDataLock.unlock();
 
-    qDebug() << __FILE__ << __LINE__ << name.c_str() << m_mpRangeConfig[name].nStatus;
+    //qDebug() << __FILE__ << __LINE__ << name.c_str() << m_mpRangeConfig[name].nStatus;
     std::vector<std::string>::iterator itr;
     for( itr = m_szOutputs.begin(); itr != m_szOutputs.end(); ++itr )
     {

+ 11 - 10
EngineAlarm/DataEngine.cpp

@@ -31,7 +31,7 @@ void DataEngine::OnData(std::string name,QVariant val)
 //    }
     if( m_pTDengine != nullptr )
     {
-        m_pTDengine->Publish(name, val);
+ //       m_pTDengine->Publish(name, val);
     }
     else
     {
@@ -47,7 +47,7 @@ void DataEngine::OnData(std::string name,QVariant val)
     }
 }
 
-void DataEngine::sltSubData(char* topic, char* msg)
+void DataEngine::sltSubData(const std::string topic, const std::string msg)
 {
     if(m_mpDevControler.find(topic) == m_mpDevControler.end())
     {
@@ -112,7 +112,10 @@ void DataEngine::Startup()
     m_pRedis->Connect(mpConfig["redis"]);           // redis
 
     m_pTDengine = new TDAgent();
-    m_pTDengine->Connect(mpConfig["tdengine"]);       // td
+    m_pTDengine->Connect(mpConfig["tdengine"]);     // td
+
+    m_pMqtt = new MQTTAgent();
+    m_pMqtt->Connect(mpConfig["mqtt"]);             // mqtt
 
     // server.id
     Config config;
@@ -132,16 +135,14 @@ void DataEngine::Startup()
         std::vector<Setting>::iterator itrT;
         for( itrT = di.ModuleInfo.vSettings.begin(); itrT != di.ModuleInfo.vSettings.end(); ++itrT )
         {
-            lstTopics.push_back("topic_" + itrT->Name);
+            std::string szTopic = "topic_" + itrT->Name;
+            lstTopics.push_back(szTopic);
+            m_mpDevControler[szTopic] = pControler;
         }
         m_pTDengine->Subscribe(lstTopics);
 
-        connect(m_pTDengine,SIGNAL(sigEvent(char*, char*)),this,SLOT(sltSubData(char*, char*)));
-        std::list<std::string>::iterator itrI;
-        for( itrI = di.ModuleInfo.lstInputs.begin(); itrI != di.ModuleInfo.lstInputs.end(); ++itrI )
-        {
-            m_mpDevControler[*itrI] = pControler;
-        }
+        qRegisterMetaType<std::string>("std::string");
+        connect(m_pTDengine,SIGNAL(sigEvent(const std::string, const std::string)),this,SLOT(sltSubData(const std::string, const std::string)));
     }
 }
 

+ 1 - 1
EngineAlarm/DataEngine.h

@@ -23,7 +23,7 @@ private:
              DeviceController*>  m_mpDevControler;
 
 public slots:
-    void sltSubData(char* topic, char* msg);
+    void sltSubData(const std::string, const std::string);
 
 public:
     virtual void OnData(std::string name,QVariant val);

+ 1 - 1
EngineAlarm/DeviceManagerProxy.cpp

@@ -92,7 +92,7 @@ std::list<DeviceInfo> DeviceManagerProxy::loadDevices(std::string id,std::string
                 szJson += "{";
                 szJson += QString("\"Operator\":\"%1\",").arg(ar.Operator.c_str());
                 szJson += QString("\"Operator1\":%1,").arg(ar.dbOperator1);
-                szJson += QString("\"Operator2\":%1,").arg(ar.dbOperator1);
+                szJson += QString("\"Operator2\":%1,").arg(ar.dbOperator2);
                 szJson += QString("\"KeepTimes\":%1").arg(ar.nKeepTimes);
                 szJson += "}";
                 Setting oSet;

+ 6 - 2
MQTTAgent/MQTTAgent.cpp

@@ -50,7 +50,7 @@ bool MQTTAgent::Connect(tagSetup ts)
     m_pMqtt->setCleanSession(true); //非持久化连接,上线时,将不再关心之前所有的订阅关系以及离线消息
 
     m_pMqtt->setVersion(QMQTT::V3_1_1);
-    qDebug()<< __FILE__ << __LINE__<< "ver" << m_pMqtt->version();
+    //qDebug()<< __FILE__ << __LINE__<< "ver" << m_pMqtt->version();
 
     m_pMqtt->connectToHost();
 
@@ -101,7 +101,11 @@ void MQTTAgent::Publish(std::string szKey,QVariant v)
     message.setTopic(szKey.c_str());
     message.setPayload(szContent.c_str());
     message.setRetain(true); //保留最后一条数据
-    m_pMqtt->publish(message);
+    if( m_pMqtt != nullptr )
+    {
+        m_pMqtt->publish(message);
+        qDebug() << __FILE__ << __LINE__ << szKey.c_str() << " " << szContent.c_str();
+    }
 }
 
 QStringList MQTTAgent::hvals(QString k)

+ 2 - 2
MQTTAgent/MQTTAgent.h

@@ -23,10 +23,10 @@ private:
 private:
 
 signals:
-    void sigEvent(char* topic, char* msg);
+    void sigEvent(const std::string topic, const std::string msg);
 
 private:
-    //void SubCB(char* topic, char* msg);
+    void SubCB(const std::string topic, const std::string msg){};
 
 public:
     bool Connect(tagSetup ts);

+ 1 - 1
RedisAgent/RedisAgent.cpp

@@ -9,7 +9,7 @@ RedisAgent::~RedisAgent()
 {
 }
 
-void RedisAgent::SubCB(char* topic, char* msg)
+void RedisAgent::SubCB(const std::string topic, const std::string msg)
 {
     emit sigEvent(topic, msg);
 }

+ 2 - 2
RedisAgent/RedisAgent.h

@@ -24,10 +24,10 @@ private:
     Redis objRedis;
 
 signals:
-    void sigEvent(char* topic, char* msg);
+    void sigEvent(const std::string topic, const std::string msg);
 
 private:
-    void SubCB(char* topic, char* msg);
+    void SubCB(const std::string topic, const std::string msg);
 
 public:
     bool Connect(tagSetup ts);

+ 1 - 1
TDAgent/TDAgent.cpp

@@ -11,7 +11,7 @@ TDAgent::~TDAgent()
 {
 }
 
-void TDAgent::SubCB(char* topic, char* msg)
+void TDAgent::SubCB(const std::string topic, const std::string msg)
 {
     emit sigEvent(topic, msg);
 }

+ 2 - 2
TDAgent/TDAgent.h

@@ -25,10 +25,10 @@ private:
 private:
 
 signals:
-    void sigEvent(char* topic, char* msg);
+    void sigEvent(const std::string topic, const std::string msg);
 
 private:
-    void SubCB(char* topic, char* msg);
+    void SubCB(const std::string topic, const std::string msg);
 
 public:
     bool Connect(tagSetup ts);

+ 10 - 7
TDAgent/TDengine.cpp

@@ -24,9 +24,9 @@ int32_t TDengine::msgProcess(TAOS_RES* msg)
     const char* dbName = tmq_get_db_name(msg);
     int32_t     vgroupId = tmq_get_vgroup_id(msg);
 
-    qDebug() << __FILE__ << __LINE__ << "topic: " << topicName;
-    qDebug() << __FILE__ << __LINE__ << "db: " << dbName;
-    qDebug() << __FILE__ << __LINE__ << "vgroup id:" << vgroupId;
+    //qDebug() << __FILE__ << __LINE__ << "db: " << dbName;
+    //qDebug() << __FILE__ << __LINE__ << "topic: " << topicName;
+    //qDebug() << __FILE__ << __LINE__ << "vgroup id:" << vgroupId;
 
     while (1)
     {
@@ -42,9 +42,11 @@ int32_t TDengine::msgProcess(TAOS_RES* msg)
         taos_print_row(buf, row, fields, numOfFields);
         if( g_pSubCB != nullptr )
         {
-            g_pSubCB->SubCB((char*)topicName,buf);
+            std::string topic = topicName;
+            std::string content = buf;
+            g_pSubCB->SubCB((char*)topic.c_str(),(char*)content.c_str());
         }
-        qDebug() << __FILE__ << __LINE__ << "row content: " << buf;
+        //qDebug() << __FILE__ << __LINE__ << "row content: " << buf;
     }
 
     return rows;
@@ -109,7 +111,7 @@ tmq_t* TDengine::buildConsumer()
     //tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); //禁用提交回调
 
     tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
-    qDebug()<< __FILE__ << __LINE__ << "tmq" << tmq;
+    //qDebug()<< __FILE__ << __LINE__ << "tmq" << tmq;
     tmq_conf_destroy(conf);
     return tmq;
 }
@@ -122,7 +124,8 @@ tmq_list_t* TDengine::buildTopicList()
     std::list<std::string>::iterator itr;
     for( itr = g_lstTopics.begin(); itr != g_lstTopics.end(); ++itr )
     {
-        int32_t code = tmq_list_append(topicList, itr->c_str());
+        std::string szTopic = *itr;
+        int32_t code = tmq_list_append(topicList, szTopic.c_str());
         if (code) {
             return NULL;
         }

+ 1 - 1
include/MWareInterface.h

@@ -14,5 +14,5 @@ struct tagSetup
 class EventSubInterface
 {
 public:
-    virtual void SubCB(char* topic, char* msg) = 0;
+    virtual void SubCB(const std::string topic, const std::string msg) = 0;
 };