Prechádzať zdrojové kódy

1、AlarmRuler增加如下,及相关业务流程

    std::string alarmRuleId;        // 节点-标识码
    int alarmRuleVersion;
    int level;
    int status;
    std::string object;
2、MQTTAgent.publish 接受到的数据就是json,无需处理了
舍得 1 rok pred
rodič
commit
1e8b0f0c6c

+ 110 - 85
AlarmRecognition/Module.cpp

@@ -8,27 +8,6 @@
 #include <stdlib.h>
 #include <math.h>
 
-void split(const std::string source,const std::string delim,std::vector<std::string>& dest)
-{
-    long n = delim.length();
-    long lt = source.length();
-
-    long last = 0;
-    size_t index = source.find_first_of(delim,last);
-    while(index != std::string::npos)
-    {
-        dest.push_back(source.substr(last,index-last));
-        last = index + n;
-        index = source.find_first_of(delim,last);
-    }
-
-    long len = source.length();
-    if( len - last > 0)
-    {
-        dest.push_back(source.substr(last,len - last));
-    }
-}
-
 Module::Module()
     :BaseModule()
 {
@@ -54,7 +33,7 @@ void Module::Setup(ModuleInfo mi)
     std::list<DataItem>::iterator itrO;
     for( itrO = mi.Properties.begin(); itrO != mi.Properties.end(); ++itrO )
     {
-        m_szOutputs.push_back(mi.Code + "." + itrO->Code);
+        m_szOutputs.push_back(mi.Code + "_" + itrO->Code);
     }
 
     std::vector<Setting>::iterator itr;
@@ -65,10 +44,31 @@ void Module::Setup(ModuleInfo mi)
         QStringList keys = json.keys();
 
         tagRange tr;
+        tr.status = 0;
         if( keys.contains("KeepTimes",Qt::CaseInsensitive) )
         {
             tr.uWinSize = abs(json["KeepTimes"].toInt());
         }
+        if( keys.contains("DataName",Qt::CaseInsensitive) )
+        {
+            tr.szName = json["DataName"].toString();
+        }
+        if( keys.contains("alarmRuleId",Qt::CaseInsensitive) )
+        {
+            tr.alarmRuleId = json["alarmRuleId"].toString().toLocal8Bit().toStdString();
+        }
+        if( keys.contains("alarmRuleVersion",Qt::CaseInsensitive) )
+        {
+            tr.alarmRuleVersion = json["alarmRuleVersion"].toInt();
+        }
+        if( keys.contains("level",Qt::CaseInsensitive) )
+        {
+            tr.level = json["level"].toInt();
+        }
+        if( keys.contains("object",Qt::CaseInsensitive) )
+        {
+            tr.object = json["object"].toString().toLocal8Bit().toStdString();
+        }
         if( keys.contains("Operator",Qt::CaseInsensitive) )
         {
             double dbOperator1 = 0;
@@ -97,90 +97,115 @@ void Module::Setup(ModuleInfo mi)
             {
                 tr.dbMin = dbOperator1;
             }
-            m_mpRangeConfig["topic_" + itr->Name] = tr;
+            m_mpRangeConfig[tr.szName.toLocal8Bit().toStdString()] = tr;
         }
     }
 }
 
-void Module::OnSubData(std::string name,std::string val)
+void Module::OnSubData(std::string table,std::string val)
 {
-    if( m_mpRangeConfig.find(name) == m_mpRangeConfig.end())
-    {
-        return;
-    }
-
-    if( val.find(' ') != std::string::npos )
-    {
-        std::vector<std::string> dest;
-        split(val," ",dest);
-        val = *dest.rbegin();
-    }
-    uint nMaxWinSize = m_mpRangeConfig[name].uWinSize;
-    if( nMaxWinSize <= 0 )
-    {
-        nMaxWinSize = 1;
-    }
-
-    // add data
-    m_objDataLock.lockForWrite();
-    if( m_mpDataValue.find(name) == m_mpDataValue.end())
-    {
-        m_mpDataValue[name].push_back(tagKeyValue(name,val.length()>0 ? std::stod(val):0));
-    }
-    else
+    // iterate data
+    QJsonParseError err;
+    QJsonDocument jsonDoc(QJsonDocument::fromJson(val.c_str(),&err));
+    QJsonObject jsonObject = jsonDoc.object();
+    QStringList keys = jsonObject.keys();
+    foreach(QString dn,keys)
     {
-        while( m_mpDataValue[name].size() >= nMaxWinSize)
+        double dbDataValue = jsonObject[dn].toDouble();
+        std::string szDataName = dn.toLocal8Bit().toStdString();
+        // has table
+        if( m_mpRangeConfig.find(szDataName) == m_mpRangeConfig.end())
         {
-            m_mpDataValue[name].erase(m_mpDataValue[name].begin());
+            continue;
         }
-        m_mpDataValue[name].push_back(tagKeyValue(name,val.length()>0 ? std::stod(val):0));
-    }
 
-    // alarm recognition
-    m_mpRangeConfig[name].nStatus = EAS_Both;
-    std::list<tagKeyValue>::iterator itrV;
-    for( itrV = m_mpDataValue[name].begin(); itrV != m_mpDataValue[name].end(); ++itrV)
-    {
-        tagKeyValue tKV = *itrV;
-
-        if( itrV->value > m_mpRangeConfig[name].dbMax )
-        {
-            m_mpRangeConfig[name].nStatus &= 0x01;
-        }
-        else if( itrV->value < m_mpRangeConfig[name].dbMin )
+        // cache data
+        m_objDataLock.lockForWrite();
+        tagKeyValue kv(szDataName,dbDataValue);
+        if( m_mpDataValue.find(szDataName) == m_mpDataValue.end())
         {
-            m_mpRangeConfig[name].nStatus &= 0x02;
+            m_mpDataValue[szDataName].push_back(kv);
         }
         else
         {
-            m_mpRangeConfig[name].nStatus = EAS_Normal;
+            // win size
+            uint nMaxWinSize = m_mpRangeConfig[szDataName].uWinSize;
+            if( nMaxWinSize <= 0 )
+            {
+                nMaxWinSize = 1;
+            }
+
+            while( m_mpDataValue[szDataName].size() >= nMaxWinSize)
+            {
+                m_mpDataValue[szDataName].erase(m_mpDataValue[szDataName].begin());
+            }
+            m_mpDataValue[szDataName].push_back(kv);
         }
+        m_objDataLock.unlock();
     }
-    m_objDataLock.unlock();
 
-    //qDebug() << __FILE__ << __LINE__ << name.c_str() << m_mpRangeConfig[name].nStatus;
-    std::vector<std::string>::iterator itr;
-    for( itr = m_szOutputs.begin(); itr != m_szOutputs.end(); ++itr )
+    // alarm recognition
+    m_objDataLock.lockForRead();
+    std::map<std::string,std::list<tagKeyValue>>::iterator itr;
+    for( itr = m_mpDataValue.begin(); itr != m_mpDataValue.end(); ++itr )
     {
-        QString szStatus;
-        switch( m_mpRangeConfig[name].nStatus )
+        std::string szDataName = itr->first;
+
+        m_mpRangeConfig[szDataName].nStatus = EAS_Normal;
+        std::list<tagKeyValue>::iterator itrV;
+        for( itrV = m_mpDataValue[szDataName].begin(); itrV != m_mpDataValue[szDataName].end(); ++itrV)
         {
-        case EAS_Normal:
-            szStatus = "Normal";
-            break;
-        case EAS_OverThan:
-            szStatus = "OverThan";
-            break;
-        case EAS_UnderThan:
-            szStatus = "OverThan";
-            break;
-        case EAS_Both:
-            szStatus = "Both";
-            break;
+            tagKeyValue tKV = *itrV;
+
+            if( itrV->value > m_mpRangeConfig[szDataName].dbMax )
+            {
+                m_mpRangeConfig[szDataName].nStatus |= 0x01;
+            }
+            else if( itrV->value < m_mpRangeConfig[szDataName].dbMin )
+            {
+                m_mpRangeConfig[szDataName].nStatus |= 0x02;
+            }
+            else
+            {
+                m_mpRangeConfig[szDataName].nStatus = EAS_Normal;
+            }
         }
 
-        m_pDataConsumer->OnData(*itr,QVariant(szStatus));
+        //qDebug() << __FILE__ << __LINE__ << name.c_str() << m_mpRangeConfig[name].nStatus;
+        std::vector<std::string>::iterator itr;
+        for( itr = m_szOutputs.begin(); itr != m_szOutputs.end(); ++itr )
+        {
+            QString szStatus;
+            switch( m_mpRangeConfig[szDataName].nStatus )
+            {
+            case EAS_Normal:
+                szStatus = "[]";
+                break;
+            case EAS_OverThan:
+                szStatus = ">";
+                break;
+            case EAS_UnderThan:
+                szStatus = "<";
+                break;
+            case EAS_Both:
+                szStatus = "><";
+                break;
+            }
+
+            QString szJson;
+            szJson += "{";
+            szJson += QString("\"occurTime\":\"%1\",").arg(QDateTime::currentDateTime().toString("yyyy-MM-dd HH:mm:ss:zzz"));
+            szJson += QString("\"status\":%1,").arg(m_mpRangeConfig[szDataName].status);
+            szJson += QString("\"content\":\"%1 %2\",").arg(szDataName.c_str()).arg(szStatus);
+            szJson += QString("\"alarmRuleId\":\"%1\",").arg(m_mpRangeConfig[szDataName].alarmRuleId.c_str());
+            szJson += QString("\"alarmRuleVersion\":%1,").arg(m_mpRangeConfig[szDataName].alarmRuleVersion);
+            szJson += QString("\"level\":%1,").arg(m_mpRangeConfig[szDataName].level);
+            szJson += QString("\"object\":\"%1\"").arg(m_mpRangeConfig[szDataName].object.c_str());
+            szJson += "}";
+            m_pDataConsumer->OnData(*itr,QVariant(szJson));
+        }
     }
+    m_objDataLock.unlock();
 }
 
 BaseModule* instance()

+ 21 - 2
AlarmRecognition/Module.h

@@ -15,6 +15,10 @@ enum EAlarmStatus
 
 // 全局唯一实例
 // 数据越限报警
+// CREATE TABLE IF NOT EXISTS lanpengdb.mokuai2(ts timestamp, dim1 double);
+// create topic mokuai2 as select * from lanpengdb.mokuai2;
+// delete from lanpengdb.mokuai2;
+// insert into lanpengdb.mokuai2 values(NOW,12.345);
 class ALARMRECOGNITION_EXPORT Module : public BaseModule
 {
 public:
@@ -22,10 +26,25 @@ public:
 
 private:
     struct tagRange{
+        QString szName;                 // data name
         double dbMax = __DBL_MAX__;
         double dbMin = __DBL_MIN__;
         uint uWinSize = 5;
         int nStatus = EAS_Normal;
+
+        std::string alarmRuleId;        // 节点-标识码
+        int alarmRuleVersion;
+        int level;
+        int status;
+        std::string object;
+        tagRange()
+        {
+            szName.clear();
+            dbMax = __DBL_MAX__;
+            dbMin = __DBL_MIN__;
+            uWinSize = 5;
+            nStatus = EAS_Normal;
+        }
     };
     struct  tagKeyValue{
         std::string name = "";
@@ -45,8 +64,8 @@ private:
 
 private:
     DataConsumer*                       m_pDataConsumer = nullptr;
-    std::vector<std::string>            m_szOutputs;         // 数据名称
-    std::map<std::string,tagRange>      m_mpRangeConfig;     // 数据范围
+    std::vector<std::string>            m_szOutputs;         // 输出名称
+    std::map<std::string,tagRange>      m_mpRangeConfig;     // 数据范围:data name - confit
 
 private:
     std::map<std::string,std::list<tagKeyValue>>

+ 20 - 7
EngineAlarm/DataEngine.cpp

@@ -54,7 +54,15 @@ void DataEngine::sltSubData(const std::string topic, const std::string msg)
         return;
     }
 
-    m_mpDevControler[topic]->OnSubData(topic,msg);
+    std::list<DeviceController*>::iterator itr;
+    for( itr = m_mpDevControler[topic].begin(); itr != m_mpDevControler[topic].end(); ++itr)
+    {
+        DeviceController* pControler = *itr;
+        if( pControler != nullptr )
+        {
+            pControler->OnSubData(topic,msg);
+        }
+    }
 }
 
 void loadConfig(std::string szFile,Config& config)
@@ -138,13 +146,18 @@ void DataEngine::Startup()
         std::vector<Setting>::iterator itrT;
         for( itrT = di.ModuleInfo.vSettings.begin(); itrT != di.ModuleInfo.vSettings.end(); ++itrT )
         {
-            lstTopics.push_back(itrT->Name);
-            m_mpDevControler[itrT->Name] = pControler;
-        }
-        m_pTDengine->Subscribe(lstTopics);
+            std::string topic = itrT->Name;
 
-        qRegisterMetaType<std::string>("std::string");
-        connect(m_pTDengine,SIGNAL(sigEvent(const std::string, const std::string)),this,SLOT(sltSubData(const std::string, const std::string)));
+            lstTopics.push_back(topic);
+            m_mpDevControler[topic].push_back(pControler);
+
+            qRegisterMetaType<std::string>("std::string");
+            connect(m_pTDengine,SIGNAL(sigEvent(const std::string, const std::string)),this,SLOT(sltSubData(const std::string, const std::string)));
+        }
+        if( lstTopics.begin() != lstTopics.end())
+        {
+            m_pTDengine->Subscribe(lstTopics);
+        }
     }
 }
 

+ 4 - 4
EngineAlarm/DataEngine.h

@@ -15,12 +15,12 @@ public:
     DataEngine();
 
 private:
-    RedisAgent*               m_pRedis;          // 读取配置
-    TDAgent*                  m_pTDengine;       // 订阅发布
-    MQTTAgent*                m_pMqtt;           // 订阅发布
+    RedisAgent*                             m_pRedis;          // 读取配置
+    TDAgent*                                m_pTDengine;       // 订阅发布
+    MQTTAgent*                            	m_pMqtt;           // 订阅发布
 
     std::map<std::string,
-             DeviceController*>  m_mpDevControler;
+             std::list<DeviceController*>>  m_mpDevControler;   // 订阅列表
 
 public slots:
     void sltSubData(const std::string, const std::string);

+ 84 - 62
EngineAlarm/DeviceManagerProxy.cpp

@@ -39,7 +39,11 @@ bool DeviceManagerProxy::parse(QString cfg,AlarmRuler& ar)
     {
         if( key.compare("code",Qt::CaseInsensitive) == 0)
         {
-            ar.NodeCode = jsonObject["code"].toString().toLocal8Bit().toStdString();
+            ar.alarmRuleId = jsonObject["code"].toString().toLocal8Bit().toStdString();
+        }
+        else if( key.compare("nodeCode",Qt::CaseInsensitive) == 0)
+        {
+            ar.NodeCode = jsonObject["nodeCode"].toString().toLocal8Bit().toStdString();
         }
         else if( key.compare("dataItemCode",Qt::CaseInsensitive) == 0)
         {
@@ -57,11 +61,21 @@ bool DeviceManagerProxy::parse(QString cfg,AlarmRuler& ar)
         {
             ar.dbOperator2 = jsonObject["operator2"].toDouble();
         }
-        else if( key.compare("operator2",Qt::CaseInsensitive) == 0)
+        else if( key.compare("continuousTimes",Qt::CaseInsensitive) == 0)
+        {
+            ar.nKeepTimes = jsonObject["continuousTimes"].toInt();
+        }
+        else if( key.compare("version",Qt::CaseInsensitive) == 0)
+        {
+            ar.alarmRuleVersion = jsonObject["version"].toInt();
+        }
+        else if( key.compare("level",Qt::CaseInsensitive) == 0)
         {
-            ar.nKeepTimes = jsonObject["operator2"].toInt();
+            ar.level = jsonObject["level"].toInt();
         }
     }
+    ar.object = ar.NodeCode + "." + ar.DataItemCode;
+    ar.status = 0;
 
     return true;
 }
@@ -69,80 +83,88 @@ bool DeviceManagerProxy::parse(QString cfg,AlarmRuler& ar)
 DeviceInfo DeviceManagerProxy::loadDataAlarm(std::string id,std::string app)
 {
     DeviceInfo di;
-    if( m_pRedis != nullptr)
+    if( m_pRedis == nullptr)
     {
-        ModuleInfo mi;
-        mi.Code = "alarm";
-        mi.AssemblyName = app;
-
-        DataItem oDT;
-        oDT.Code = "data";
-        mi.Properties.push_back(oDT);
-        QStringList lst = m_pRedis->hvals(id.c_str());
-        foreach (QString str, lst)
+        return di;
+    }
+
+    ModuleInfo mi;
+    mi.Code = "active";
+    mi.AssemblyName = app;
+
+    DataItem oDT;
+    oDT.Code = "alarm";
+    mi.Properties.push_back(oDT);
+    QStringList lst = m_pRedis->hvals(id.c_str());
+    foreach (QString str, lst)
+    {
+        if( str.isEmpty() )
+        {
+            continue;
+        }
+
+        AlarmRuler ar;
+        if( parse(str,ar) )
         {
-            if( str.isEmpty() )
-            {
-                continue;
-            }
-
-            AlarmRuler ar;
-            if( parse(str,ar) )
-            {
-                QString szJson;
-                szJson += "{";
-                szJson += QString("\"Operator\":\"%1\",").arg(ar.Operator.c_str());
-                szJson += QString("\"Operator1\":%1,").arg(ar.dbOperator1);
-                szJson += QString("\"Operator2\":%1,").arg(ar.dbOperator2);
-                szJson += QString("\"KeepTimes\":%1").arg(ar.nKeepTimes);
-                szJson += "}";
-                Setting oSet;
-                oSet.Name = ar.NodeCode;
-                oSet.qValue = szJson;
-
-                mi.vSettings.push_back(oSet);
-            }
+            QString szJson;
+            szJson += "{";
+            szJson += QString("\"DataName\":\"%1\",").arg(ar.DataItemCode.c_str());
+            szJson += QString("\"Operator\":\"%1\",").arg(ar.Operator.c_str());
+            szJson += QString("\"Operator1\":%1,").arg(ar.dbOperator1);
+            szJson += QString("\"Operator2\":%1,").arg(ar.dbOperator2);
+            szJson += QString("\"KeepTimes\":%1,").arg(ar.nKeepTimes);
+            szJson += QString("\"alarmRuleId\":\"%1\",").arg(ar.alarmRuleId.c_str());
+            szJson += QString("\"alarmRuleVersion\":%1,").arg(ar.alarmRuleVersion);
+            szJson += QString("\"level\":%1,").arg(ar.level);
+            szJson += QString("\"object\":\"%1\"").arg(ar.object.c_str());
+            szJson += "}";
+            Setting oSet;
+            oSet.Name = ar.NodeCode;
+            oSet.qValue = szJson;
+
+            mi.vSettings.push_back(oSet);
         }
-        di.ModuleInfo = mi;
     }
 
+    di.ModuleInfo = mi;
     return di;
 }
 
 DeviceInfo DeviceManagerProxy::loadCommAlarm(std::string id,std::string app)
 {
     DeviceInfo di;
-    if( m_pRedis != nullptr)
+    if( m_pRedis == nullptr)
     {
-        ModuleInfo mi;
-        mi.Code = "alarm";
-        mi.AssemblyName = app;
-
-        DataItem oDT;
-        oDT.Code = "comm";
-        mi.Properties.push_back(oDT);
-        QHash<QString, QString> mps = m_pRedis->hgetall(id.c_str());
-        QHash<QString, QString>::iterator itr;
-        for( itr = mps.begin(); itr != mps.end(); ++itr)
-        {
-            if( itr.key().isEmpty() || itr.value().isEmpty() )
-            {
-                continue;
-            }
+        return di;
+    }
 
-            if( itr.key().compare(DEVICE_NODE,Qt::CaseInsensitive) != 0 )
-            {
-                continue;
-            }
+    ModuleInfo mi;
+    mi.Code = "alarm";
+    mi.AssemblyName = app;
 
-            Setting oSet;
-            oSet.Name = itr.key().toLocal8Bit().toStdString();
-            oSet.qValue = itr.value();
-            mi.vSettings.push_back(oSet);
-            break;
+    DataItem oDT;
+    oDT.Code = "comm";
+    mi.Properties.push_back(oDT);
+    QHash<QString, QString> mps = m_pRedis->hgetall(id.c_str());
+    QHash<QString, QString>::iterator itr;
+    for( itr = mps.begin(); itr != mps.end(); ++itr)
+    {
+        if( itr.key().isEmpty() || itr.value().isEmpty() )
+        {
+            continue;
         }
-        di.ModuleInfo = mi;
-    }
 
+        if( itr.key().compare(DEVICE_NODE,Qt::CaseInsensitive) != 0 )
+        {
+            continue;
+        }
+
+        Setting oSet;
+        oSet.Name = itr.key().toLocal8Bit().toStdString();
+        oSet.qValue = itr.value();
+        mi.vSettings.push_back(oSet);
+        break;
+    }
+    di.ModuleInfo = mi;
     return di;
 }

+ 1 - 0
EngineAlarm/EngineAlarm.pro

@@ -9,6 +9,7 @@ TEMPLATE = app
 unix{
 }
 else{
+    QMAKE_PRE_LINK += cp $${PWD}/../bin/*.a $${PWD}/../lib/
     DESTDIR = $$PWD/../bin
 }
 

+ 4 - 4
MQTTAgent/MQTTAgent.cpp

@@ -71,7 +71,7 @@ bool MQTTAgent::Subscribe(std::list<std::string> topics)
 
 void MQTTAgent::Publish(std::string szKey,QVariant v)
 {
-    std::string szContent;
+/*    std::string szContent;
     switch( v.type() )
     {
     case QMetaType::Bool:
@@ -96,15 +96,15 @@ void MQTTAgent::Publish(std::string szKey,QVariant v)
         szContent = v.toString().toLocal8Bit().toStdString();
         break;
     }
-
+*/
     QMQTT::Message message;
     message.setTopic(szKey.c_str());
-    message.setPayload(szContent.c_str());
+    message.setPayload(v.toByteArray());
     message.setRetain(true); //保留最后一条数据
     if( m_pMqtt != nullptr )
     {
         m_pMqtt->publish(message);
-        qDebug() << __FILE__ << __LINE__ << szKey.c_str() << " " << szContent.c_str();
+        qDebug() << __FILE__ << __LINE__ << szKey.c_str() << " " << v.toString();
     }
 }
 

+ 1 - 1
RedisAgent/RedisAgent.pro

@@ -10,7 +10,7 @@ unix{
 }
 else{
 #DESTDIR = $$PWD/../bin/plugins
-DESTDIR = $$PWD/../lib
+DESTDIR = $$PWD/../bin
 }
 
 #DEFINES += REDISAGENT_LIBRARY

+ 16 - 4
TDAgent/TDAgent.cpp

@@ -25,15 +25,26 @@ bool TDAgent::Connect(tagSetup ts)
 
 bool TDAgent::Subscribe(std::list<std::string> topics)
 {
-    bool ret = true;
+    std::map<std::string,int> mpTopics;
     std::list<std::string>::iterator itr;
     for (itr = topics.begin(); itr != topics.end(); ++itr)
     {
-        objTD.subscribe(itr->c_str(), this);
+        std::string key = *itr;
+        if( key.find(".") != std::string::npos)
+        {
+            key = key.substr(0,key.find("."));
+        }
+        mpTopics[key] = 1;
+    }
+
+    std::map<std::string,int>::iterator itrM;
+    for( itrM = mpTopics.begin(); itrM != mpTopics.end(); ++itrM)
+    {
+        objTD.subscribe(itrM->first.c_str(), this);
     }
 
     objTD.start();
-    return ret;
+    return true;
 }
 
 void TDAgent::Publish(std::string szKey,QVariant v)
@@ -64,5 +75,6 @@ void TDAgent::Publish(std::string szKey,QVariant v)
         break;
     }
 
-//    objRedis.publish(szKey.c_str(), szContent.c_str()); //把数据发送到对应主题
+    objTD.publish(szKey.c_str(), szContent.c_str());
+    //objRedis.publish(szKey.c_str(), szContent.c_str()); //把数据发送到对应主题
 }

+ 1 - 1
TDAgent/TDAgent.pro

@@ -9,7 +9,7 @@ TEMPLATE = lib
 unix{
 }
 else{
-    DESTDIR = $$PWD/../lib
+    DESTDIR = $$PWD/../bin
 }
 
 DEFINES += TDEGENT_LIBRARY

+ 45 - 8
TDAgent/TDengine.cpp

@@ -13,7 +13,6 @@
 #include <string.h>
 #include <time.h>
 
-
 TAOS* pConn = NULL;
 EventSubInterface*  g_pSubCB;       // 订阅回调
 std::list<std::string>  g_lstTopics;
@@ -82,15 +81,14 @@ int32_t TDengine::msgProcess(TAOS_RES* msg)
             }
         }
 
-        std::string content = QJsonDocument(jsonObject).toJson(QJsonDocument::Compact).toStdString();
-
         if( g_pSubCB != nullptr )
         {
             std::string topic = topicName;
-            std::string content = buf;
-            g_pSubCB->SubCB((char*)topic.c_str(),(char*)content.c_str());
+            //std::string content = buf;
+            std::string content = QJsonDocument(jsonObject).toJson(QJsonDocument::Compact).toStdString();
+            g_pSubCB->SubCB(topic,content);
+            //qDebug() << __FILE__ << __LINE__ << "SubCB: " << topic.c_str() << " - " << content.c_str();
         }
-        //qDebug() << __FILE__ << __LINE__ << "row content: " << buf;
     }
 
     return rows;
@@ -169,7 +167,7 @@ tmq_list_t* TDengine::buildTopicList()
     for( itr = g_lstTopics.begin(); itr != g_lstTopics.end(); ++itr )
     {
         std::string szTopic = *itr;
-        qDebug() << __FILE__ << __LINE__ << " topic:" << szTopic.c_str();
+        //qDebug() << __FILE__ << __LINE__ << " topic:" << szTopic.c_str();
         int32_t code = tmq_list_append(topicList, szTopic.c_str());
         if (code) {
             return NULL;
@@ -191,7 +189,14 @@ void TDengine::topicLoop()
     tmq_list_t* topic_list = buildTopicList();
     code = tmq_subscribe(tmq, topic_list);
     if ( code ) {
-        qDebug() << __FILE__ << __LINE__ << "Failed to tmq_subscribe(): " << tmq_err2str(code);
+        std::string szTopics;
+        std::list<std::string>::iterator itr;
+        for( itr = g_lstTopics.begin(); itr != g_lstTopics.end(); ++itr )
+        {
+            szTopics += " " + *itr;
+        }
+
+        qDebug() << __FILE__ << __LINE__ << tmq_err2str(code) << " : " << szTopics.c_str();
     }
 
     tmq_list_destroy(topic_list);
@@ -262,6 +267,38 @@ void TDengine::psubscribe(QString ch, EventSubInterface *fn)
     g_lstTopics.push_back(ch.toLocal8Bit().toStdString());
 }
 
+void TDengine::publish(QString key,QString val)
+{
+    qDebug() << __FILE__ << __LINE__ << " publish " << key << " " << val;
+
+    std::string szTable = key.toLocal8Bit().toStdString();
+    std::string szColume = key.toLocal8Bit().toStdString();
+    if( key.contains(".") )
+    {
+        szTable = key.left(key.indexOf(".")).toLocal8Bit().toStdString();
+        szColume = key.mid(key.indexOf(".")+1,-1).toLocal8Bit().toStdString();
+    }
+
+    std::string sql = "insert into ";
+    sql += dbName.toLocal8Bit().toStdString() + ".";
+    sql += szTable;
+    sql += " (ts, " + szColume;
+    sql += " ) values(NOW,";
+    sql += val.toLocal8Bit().toStdString();
+    sql += " );";
+    TAOS_RES* res = taos_query(pConn, sql.c_str());
+    int code = taos_errno(res);
+    if (code != 0)
+    {
+        qCritical() << __FILE__ << __LINE__ << " execute sql failed. code = " << code << " msg = " << taos_errstr(res);
+        qCritical() << __FILE__ << __LINE__ << " slq :" << sql.c_str();
+        taos_free_result(res);
+    }
+    int affectedRows = taos_affected_rows(res);
+    taos_free_result(res);
+    //return affectedRows;
+}
+
 void TDengine::Setup(tagSetup ts)
 {
     host = ts.addr.c_str();

+ 1 - 0
TDAgent/TDengine.h

@@ -37,5 +37,6 @@ public:
     void Setup(tagSetup ts);
     void subscribe(QString ch, EventSubInterface *fn);      // 订阅
     void psubscribe(QString ch, EventSubInterface *fn);     // 订阅:模式匹配
+    void publish(QString key,QString val);                  // 发布
     void start();
 };

+ 7 - 1
include/Define.h

@@ -72,7 +72,13 @@ struct AlarmRuler
     std::string Operator;            // 运算符
     double dbOperator1;              // 操作数1
     double dbOperator2;              // 操作数2
-    uint nKeepTimes;                 // 保持次数
+    uint nKeepTimes = 5;             // 保持次数
+
+    std::string alarmRuleId;        // 节点-标识码
+    int alarmRuleVersion;
+    int level;
+    int status;
+    std::string object;
 };
 
 struct Config