#include "Module.h"

// NOTE(review): the header names of the original #include lines were lost
// (only the bare `#include` keywords survived). The list below is
// reconstructed from the symbols this file uses -- confirm against the build.
#include <QDateTime>
#include <QDebug>
#include <QJsonArray>
#include <QJsonDocument>
#include <QReadWriteLock>
#include <QString>
#include <QVariant>

#include <strings.h>

#include <chrono>
#include <map>
#include <string>
#include <thread>
#include <vector>

/// @brief Splits @p source on every occurrence of the delimiter string.
///
/// Tokens between delimiters are appended to @p dest in order; empty tokens
/// in the middle are kept, an empty trailing token is dropped.
///
/// @param source Text to split.
/// @param delim  Delimiter, matched as a whole string. An empty delimiter
///               means "no delimiter": a non-empty @p source is appended
///               unchanged.
/// @param dest   Receives the tokens (appended; existing content is kept).
void split(const std::string source, const std::string delim, std::vector<std::string>& dest)
{
    if (delim.empty()) {
        if (!source.empty()) {
            dest.push_back(source);
        }
        return;
    }

    const std::size_t step = delim.length();
    std::size_t begin = 0;

    // Search for the whole delimiter. The previous find_first_of() matched any
    // single character of `delim` while still skipping delim.length()
    // characters, which lost data for multi-character delimiters.
    for (std::size_t hit = source.find(delim, begin);
         hit != std::string::npos;
         hit = source.find(delim, begin)) {
        dest.push_back(source.substr(begin, hit - begin));
        begin = hit + step;
    }

    if (begin < source.length()) {
        dest.push_back(source.substr(begin));
    }
}

/// @brief Creates a module with no consumer, no outputs, no watched
///        parameters and a 100 ms polling interval.
Module::Module()
    : RunnableModule()
{
    m_pDataConsumer = nullptr;
    m_szOutputs.clear();
    m_mpParamConfig.clear();
    m_mpDataValue.clear();
    m_nLoopIntervalMS = 100; // alternative considered in the original: UINT_FAST32_MAX
}

/// @brief Reports whether @p tp names one of this module's base types.
/// @param tp Type name, compared case-insensitively.
/// @return true for "BaseModule" or "RunnableModule", false otherwise.
bool Module::isInheritedFrom(std::string tp)
{
    const char* szType = tp.c_str();
    return strcasecmp(szType, "BaseModule") == 0
        || strcasecmp(szType, "RunnableModule") == 0;
}

/// @brief Stores the consumer that sltMqttMsg() forwards messages to.
/// @param pDC Consumer pointer; stored as-is, not owned by this function.
void Module::regConsumer(DataConsumer* pDC)
{
    m_pDataConsumer = pDC;
}

/// @brief Builds the output topic list and the set of watched parameters.
///
/// One topic "<mi.Code>_<property Code>" is added per entry in mi.Properties.
/// Every entry in mi.vSettings is parsed as a JSON array of strings; each
/// string becomes a key in m_mpParamConfig with a default-constructed tagParam.
///
/// @param mi Module description providing Code, Properties and vSettings.
void Module::Setup(ModuleInfo mi)
{
    for (const auto& prop : mi.Properties) {
        m_szOutputs.push_back(mi.Code + "_" + prop.Code);
    }

    for (const auto& setting : mi.vSettings) {
        const QString szConfig = setting.qValue.toString();

        // QJsonDocument::fromJson() expects UTF-8 input; the local 8-bit
        // encoding used before can corrupt non-ASCII text on non-UTF-8 locales.
        const QJsonDocument jsonDoc = QJsonDocument::fromJson(szConfig.toUtf8());
        const QJsonArray ja = jsonDoc.array();

        for (const auto& entry : ja) {
            const QString item = entry.toString();
            // Key encoding left unchanged so it keeps matching the names
            // passed to OnSubData().
            m_mpParamConfig[item.toLocal8Bit().toStdString()] = tagParam();
        }
    }
}

/// @brief Records the arrival time of data for a watched parameter.
///
/// Names that are not keys of m_mpParamConfig are ignored. The payload
/// (second argument) is not used.
///
/// @param name Parameter name the data belongs to.
void Module::OnSubData(std::string name, std::string)
{
    if (m_mpParamConfig.find(name) == m_mpParamConfig.end()) {
        return;
    }

    // Scoped lock: released on every exit path, including exceptions.
    QWriteLocker guard(&m_objDataLock);
    m_mpDataValue[name] = QDateTime::currentDateTime();
}

/// @brief Worker loop: publishes status changes based on data arrival times.
///
/// First publishes EAS_Breaked for every watched parameter. Then, every
/// m_nLoopIntervalMS milliseconds, compares the age of each parameter's last
/// arrival with its uIntervalMS:
///  - older than the interval and not yet EAS_Breaked -> publish EAS_Breaked;
///  - within the interval and currently EAS_Breaked   -> publish EAS_Recover;
///  - within the interval otherwise                   -> EAS_Normal, nothing published.
/// Nothing is evaluated while no consumer is registered. This function never
/// returns.
///
/// NOTE(review): m_mpParamConfig is read and modified here without a lock;
/// confirm that Setup() always completes before Run() starts this loop.
void Module::Check()
{
    for (const auto& entry : m_mpParamConfig) {
        pubStatus(entry.first, EAS_Breaked);
    }

    while (true) {
        if (m_pDataConsumer != nullptr) {
            // Snapshot the arrival times so the lock is held only for the copy.
            decltype(m_mpDataValue) mpCache;
            {
                QReadLocker guard(&m_objDataLock);
                mpCache = m_mpDataValue;
            }

            const QDateTime dtNow = QDateTime::currentDateTime();

            for (const auto& sample : mpCache) {
                const auto cfgIt = m_mpParamConfig.find(sample.first);
                if (cfgIt == m_mpParamConfig.end()) {
                    continue;
                }

                auto& param = cfgIt->second;
                const uint nIntervalMS = param.uIntervalMS;
                const qint64 tDiff = sample.second.msecsTo(dtNow);
                const bool bTimedOut = tDiff > nIntervalMS;

                if (bTimedOut) {
                    if (param.nStatus != EAS_Breaked) {
                        param.nStatus = EAS_Breaked;
                        param.bChanged = true;
                        pubStatus(sample.first, EAS_Breaked);
                    } else {
                        param.bChanged = false;
                    }
                } else if (param.nStatus == EAS_Breaked) {
                    param.nStatus = EAS_Recover;
                    param.bChanged = true;
                    pubStatus(sample.first, EAS_Recover);
                } else {
                    param.nStatus = EAS_Normal;
                    param.bChanged = false;
                }
            }
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(m_nLoopIntervalMS));
    }
}

/// @brief Emits sigMqttMsg once per output topic with a one-entry JSON object.
///
/// The payload is {"<name>":"<code>"} where code is "O" (EAS_Normal),
/// "B" (EAS_Breaked) or "R" (EAS_Recover); any other status yields an empty
/// code.
///
/// NOTE(review): @p name is inserted without JSON escaping -- confirm names
/// never contain quotes or backslashes.
///
/// @param name    Parameter name used as the JSON key.
/// @param nStatus One of the EAS_* status values.
void Module::pubStatus(std::string name, int nStatus)
{
    QString szStatus;
    switch (nStatus) {
    case EAS_Normal:
        szStatus = "O";
        break;
    case EAS_Breaked:
        szStatus = "B";
        break;
    case EAS_Recover:
        szStatus = "R";
        break;
    }

    // The payload is the same for every topic, so build it once.
    QString szJson;
    szJson += "{";
    szJson += QString("\"%1\":\"%2\"").arg(name.c_str()).arg(szStatus);
    szJson += "}";

    for (const auto& output : m_szOutputs) {
        std::string topic = output;
        emit sigMqttMsg(topic, szJson);
    }
}

/// @brief Connects sigMqttMsg to sltMqttMsg (queued) and starts Check() on a
///        detached thread.
///
/// NOTE(review): the thread is detached and Check() never returns, so it can
/// still be running after this object is deleted -- confirm the module's
/// lifetime. Queued delivery of a std::string argument also presumably
/// requires that type to be registered with Qt's meta-type system -- verify
/// that this is done elsewhere.
void Module::Run()
{
    connect(this, SIGNAL(sigMqttMsg(std::string,QString)),
            this, SLOT(sltMqttMsg(std::string,QString)),
            Qt::QueuedConnection);

    std::thread worker(&Module::Check, this);
    worker.detach();
}

/// @brief Forwards a published message to the registered consumer.
///
/// Messages are dropped while no consumer is registered: Check() publishes
/// the initial status before it tests m_pDataConsumer, so this slot can run
/// with a null consumer.
///
/// @param topic  Output topic.
/// @param szJson JSON payload, handed over wrapped in a QVariant.
void Module::sltMqttMsg(std::string topic, QString szJson)
{
    if (m_pDataConsumer == nullptr) {
        return;
    }
    m_pDataConsumer->OnData(topic, QVariant(szJson));
}

/// @brief Factory entry point: allocates a new Module.
/// @return The new instance; release it with destroy().
BaseModule* instance()
{
    return new Module();
}

/// @brief Deletes an instance obtained from instance().
/// @param pInstance Instance to delete; may be null.
void destroy(BaseModule* pInstance)
{
    delete pInstance; // deleting a null pointer is a no-op
}