123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466 |
- #include "DBStorage.h"
- #include <QDebug>
- #include <QJsonDocument>
- #include <QJsonArray>
- #include <QJsonObject>
- #include <QSqlError>
- #include <QException>
- #include <QDateTime>
- // static const QString clientID = "dbStorage";
/// Result codes produced by DBStorage::open().
enum emOpenError
{
    emDBError_Success = 0, ///< The connection is open after the attempt.
    emDBError_User    = 1, ///< Returned by open() whenever the connection is not open at the end.
    emDBError_Server  = 2, ///< Returned by open() when the driver error text contains "Can't connect".
    emDBError_Driver  = 3, ///< Not returned by any code visible in this file.
    emDBError_Config  = 4  ///< Not returned by any code visible in this file.
};
- inline QString connectName()
- {
- return QString("thread%1").arg((std::size_t)QThread::currentThreadId());
- }
- DBStorage::DBStorage() {
- // mqtt = new MQTTClient();
- //tdclient->start();
- }
- DBStorage::~DBStorage(){
- // delete mqtt;
- qDebug() << __FILE__ << __FUNCTION__<< __LINE__;
- }
- void DBStorage::Run(const ConsumerInfo& ci)
- {
- // QString host = "192.168.9.6";
- // quint16 port = 1883;
- // QString usr = "root";
- // QString passwd = "N6pNXbZjspDRqNGnxMmc";
- // QString clientID = "dbStorage";
- // QString type;
- // QString server;
- // QString DBName;
- // QString user;
- // QString password;
- QJsonParseError jsonParseError;
- QJsonDocument jsonDocument(QJsonDocument::fromJson(QString::fromStdString(ci.Settings.c_str()).toUtf8(), &jsonParseError));
- if(QJsonParseError::NoError != jsonParseError.error)
- {
- //LOGERROR("parse json file {} error", fullpath.toStdString().c_str());
- qDebug() << "ERROR:" << __FILE__ << __FUNCTION__<< __LINE__;
- }
- //QJsonArray ja = jsonDocument.array();
- QJsonObject jo = jsonDocument.object();
- // foreach (auto var, ja) {
- dbtype = jo["type"].toString();
- server = jo["host"].toString();
- usr = jo["usr"].toString();
- DBName = jo["database"].toString();
- // port = var["port"].toVariant().toInt();
- passwd = jo["passwd"].toString();
- // dstTable = jo["table"].toString();
- // tableColumns = jo["columns"].toString();
- // }
- QJsonArray targets = jo["target"].toArray();
- foreach(auto target , targets){
- TargetTable targetTable;
- QString tgtName = target["table"].toString();
- QJsonArray sources = target["sourcetable"].toArray();
- QList<QString>sourceList;
- foreach(auto src , sources){
- sourceTableMap.insert(src.toString(),tgtName);
- sourceList.append(src.toString());
- }
- QJsonArray columns = target["column"].toArray();
- QList<ColumnDef>columnList;
- QString columnString = "";
- foreach (auto col, columns) {
- ColumnDef colDef;
- colDef.from = col["from"].toString();
- QStringList strlist = colDef.from.split(QLatin1Char('.'), Qt::SkipEmptyParts);
- colDef.name = strlist[1];
- colDef.to = col["to"].toString();
- columnList.append(colDef);
- if(!columnString.isEmpty()){
- columnString += ",";
- }
- columnString += col["to"].toString();
- }
- targetTable.columnDefs = columnList;
- targetTable.sourceTables = sourceList;
- targetTable.columns = columnString;
- targetTableMap.insert(tgtName, targetTable);
- }
- // clientID = QString("%1@%2").arg(usr, passwd);
- // dataItems = ci.dataItems;
- // topicsMap = ci.topicsMap;
- // foreach (auto var, ci.dataItems) {
- // dataMap.insert({var, {}});
- // }
- open(dbtype, server, DBName, usr, passwd);
- // connect(this, SIGNAL(received), this, SLOT(pushData));
- // mqtt->connect2Host(host, port, usr, passwd, clientID);
- //start();
- }
- void DBStorage::OnData(const QString& user, const QString& key, const QVariant& val)
- {
- QMap<QString, QJsonObject>dataMap;
- dataMap.clear();
- qDebug() << __FILE__ << __FUNCTION__<< __LINE__ << user << key << val;
- QString topic = key;
- // QByteArray payload = "data";
- // QByteArray byteArray(val.c_str(), val.length());
- QJsonParseError err;
- QJsonDocument jsonDoc(QJsonDocument::fromJson(val.toString().toLatin1(),&err));
- QJsonObject deviceObject = jsonDoc.object();
- // QStringList keys = jsonObject.keys();
- // auto itTopic = topicsMap.find(topic.toStdString());
- // auto range = topicsMap.equal_range(key.toStdString());
- // for (auto it = range.first; it != range.second; ++it) {
- // qDebug() << QString(it->first.c_str()) << QString(it->second.c_str()) << '\n';
- // QJsonObject val = deviceObject[QString::fromStdString(it->second)].toObject();
- // QString dataName = QString("%1.%2").arg(key, it->second.c_str());
- // dataMap[dataName] = val;
- // }
- QString targetTable = "";
- if(sourceTableMap.contains(key)){
- targetTable = sourceTableMap[key];
- }
- if(!targetTable.isEmpty()){
- auto tableIt = targetTableMap.find(targetTable);
- for(auto columnDefsIt = tableIt->columnDefs.begin(); columnDefsIt != tableIt->columnDefs.end(); columnDefsIt++){
- QJsonObject ov = deviceObject[columnDefsIt->name].toObject();
- // qDebug() << " >> " << columnDefsIt->from ;
- dataMap.insert(columnDefsIt->from, ov);
- }
- insert2Data(key, dataMap);
- }
- // mqtt->publish(topic, val.toString().toLatin1());
- }
- int DBStorage::open(QString type,QString wsServer,QString wsDBName,QString wsUser,QString wsPassword)
- {
- int nPort = 3306;
- QString qsServer = wsServer;
- if( qsServer.contains(":") )
- {
- nPort = qsServer.mid(qsServer.indexOf(":")+1).toUInt();
- qsServer = qsServer.mid(0,qsServer.indexOf(":"));
- }
- QSqlDatabase objCnn = QSqlDatabase::database(wsDBName + connectName());
- if( objCnn.isOpen() == true )
- {
- close();
- }
- if ( type == "MYSQL")
- {
- driver = "QMYSQL";
- // serverIp = wsServer;
- // dBName = wsDBName;
- // userName = wsUser;
- // password = wsPassword;
- try
- {
- QStringList drivers = QSqlDatabase::drivers();
- // if( m_bTraceDebug )
- // {
- // QString strDrivers = "SQLDrivers:";
- // foreach(QString driver, drivers)
- // strDrivers += " " + driver;
- // qDebug() << __FILE__<<__FUNCTION__<<__LINE__<< strDrivers;
- // }
- objCnn = QSqlDatabase::addDatabase(driver,DBName + connectName());
- objCnn.setConnectOptions("MYSQL_OPT_RECONNECT=1;MYSQL_OPT_CONNECT_TIMEOUT=3");
- objCnn.setHostName(qsServer);
- objCnn.setPort(nPort);
- objCnn.setDatabaseName(wsDBName);
- objCnn.setUserName(wsUser);
- objCnn.setPassword(wsPassword);
- if( !objCnn.open() )
- {
- QSqlError errDBase = objCnn.lastError();
- QString wsError = errDBase.text();
- qCritical() << __FILE__<<__FUNCTION__<<__LINE__
- << " mysql.error : " << wsError;
- if( wsError.contains("Can't connect") == true )
- {
- return emDBError_Server;
- }
- }
- }
- catch (QException& e)
- {
- qCritical() << __FILE__<<__FUNCTION__<<__LINE__ << " mysql.error : " << e.what();
- }
- }
- else if (type == "ODBC")
- {
- try
- {
- // QString strHost = ".\\SQLEXPRESS1";
- // int port = 1433;
- // QString strDbName = "wxd";
- // QString strUserName = "sa";
- // QString strUserPwd = "111111";
- // dBName = wsDBName;
- QString strconn = QString("Driver={SQL SERVER};SERVER=%1;DATABASE=%2;UID=%3;PWD=%4;")
- .arg(wsServer)
- //.arg(port)
- .arg(wsDBName)
- .arg(wsUser)
- .arg(wsPassword);
- objCnn = QSqlDatabase::addDatabase("QODBC",DBName + connectName());
- objCnn.setConnectOptions("MYSQL_OPT_RECONNECT=1;MYSQL_OPT_CONNECT_TIMEOUT=3");
- objCnn.setDatabaseName(strconn);
- objCnn.setConnectOptions();
- if (!objCnn.open())
- {
- QSqlError errDBase = objCnn.lastError();
- QString wsError = errDBase.text();
- if( wsError.contains("Can't connect") == true )
- {
- return emDBError_Server;
- }
- }
- }
- catch (QException& e)
- {
- qCritical()<< __FILE__<<__FUNCTION__<<__LINE__<< e.what();
- }
- }
- // if( m_bTraceDebug )
- {
- qDebug() << __FILE__<<__FUNCTION__<<__LINE__<< " db:" << DBName << (objCnn.isOpen() ? "connected.":"connect faild.");
- }
- // if (objCnn.isOpen())
- // {
- // GetTableColumnTypes();
- // }
- return objCnn.isOpen() ? emDBError_Success : emDBError_User;
- }
- #if 1
- bool DBStorage::insert2Data(const QString& sourceTable, const QMap<QString, QJsonObject>& dataMap)
- {
- bool bRet = false;
- QString strInsertSql;
- QString valData = "";
- QString dstTable = sourceTableMap[sourceTable];
- QString columnString = targetTableMap[dstTable].columns;
- QList<ColumnDef> columnDefines = targetTableMap[dstTable].columnDefs;
- // QList<ColumnDef>::Iterator columnIt = columnDefines.begin();
- for(auto columnIt = columnDefines.begin(); columnIt != columnDefines.end(); columnIt++){
- // qDebug() << columnIt->from << " -> " << columnIt->to;
- auto it = dataMap.find(columnIt->from);
- if(it == dataMap.end()){
- qDebug() << "ERROR :" << __FILE__ << __FUNCTION__<< __LINE__;
- return false;
- }
- QJsonObject val = it.value();
- // QString fromColumn = QString("%1.%2").arg(sourceTable,val["name"].toString());
- // if(fromColumn.compare(columnIt->from) != 0){
- // continue;
- // }
- if(!valData.isEmpty()){
- valData += ",";
- }
- int type = val["type"].toInt();
- switch (type) {
- case QMetaType::Type::QDateTime:
- if(dbtype == "MYSQL"){
- qint64 timestamp = val["value"].toVariant().toULongLong();
- QDateTime dt = QDateTime::fromMSecsSinceEpoch(timestamp/1000);
- valData += QString("\'%1.%2\'").arg(dt.toString("yyyy-MM-dd hh:mm:ss"),QString("%1").arg(timestamp%1000));
- // valData += QString("'%1'").arg(val["value"].toVariant().toULongLong());
- }
- break;
- case QMetaType::Type::ULongLong:
- valData += QString("%1").arg(val["value"].toVariant().toULongLong());
- break;
- case QMetaType::Type::QString:
- valData += QString("'%1'").arg(val["value"].toString());
- break;
- case QMetaType::Type::Double:
- valData += QString("%1").arg(val["value"].toVariant().toDouble());
- break;
- case QMetaType::Type::Float:
- valData += QString("%1").arg(val["value"].toVariant().toFloat());
- break;
- case QMetaType::Type::Int:
- valData += QString("%1").arg(val["value"].toVariant().toInt());
- break;
- case QMetaType::Type::LongLong:
- valData += QString("%1").arg(val["value"].toVariant().toLongLong());
- break;
- default:
- break;
- }
- }
- QString message;
- // 生成插入语句
- strInsertSql = QString::fromLocal8Bit("INSERT INTO %1(%2) VALUES(%3)").arg(dstTable).arg(columnString).arg(valData);
- // 执行插入语句
- bRet = excSql(strInsertSql);
- if (!bRet)
- {
- message = QString::fromLocal8Bit("数据入库失败");
- qCritical() << __FILE__<<__FUNCTION__<<__LINE__ << message << strInsertSql;
- }else{
- qDebug() << __FILE__<<__FUNCTION__<<__LINE__ << " insert success : " << strInsertSql;
- }
- return bRet;
- }
- #endif
- bool DBStorage::isConnected()
- {
- QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
- return objCnn.isOpen();
- }
- void DBStorage::close()
- {
- QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
- if( objCnn.isOpen() )
- {
- objCnn.close();
- }
- }
- bool DBStorage::excSql(QStringList sqlList,int& id)
- {
- if( sqlList.length() <= 0 )
- {
- return false;
- }
- QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
- if( objCnn.isOpen() == false )
- {
- open(dbtype, server,DBName, usr, passwd);
- }
- objCnn = QSqlDatabase::database(DBName + connectName());
- QSqlQuery query(objCnn);
- objCnn.transaction();
- bool bExecute = true;
- for( int n = 0 ; n < sqlList.length(); n++ )
- {
- query.exec( sqlList[n] );
- int nCount = query.numRowsAffected();
- if( 0 >= nCount )
- {
- QSqlError errQuery = query.lastError();
- QSqlError errDBase = objCnn.lastError();
- QString wsError = errDBase.text() + ":" + errQuery.text();
- qCritical()<< __FILE__<<__FUNCTION__<<__LINE__ << wsError;
- bExecute = false;
- break;
- }
- }
- if( bExecute )
- {
- objCnn.commit();
- if( query.exec( "select LAST_INSERT_ID()" ) )
- {
- if( query.next() )
- {
- id = query.value(0).toInt();
- }
- }
- }
- else
- {
- objCnn.rollback();
- }
- return bExecute;
- }
- bool DBStorage::excSql(QString strSql)
- {
- QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
- if( objCnn.isOpen() == false )
- {
- open(dbtype, server,DBName, usr, passwd);
- }
- objCnn = QSqlDatabase::database(DBName + connectName());
- QSqlQuery query(objCnn);
- bool bRet = query.exec(strSql);
- if( bRet == false )
- {
- QSqlError errDBase = objCnn.lastError();
- QSqlError errQuery = query.lastError();
- qCritical()<< __FILE__<<__FUNCTION__<<__LINE__ << "query.sql = " << strSql << " QueryError : " << errQuery.text();
- }
- return bRet;
- }
- QSqlQuery DBStorage::query(QString strSql)
- {
- QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
- if( objCnn.isOpen() == false )
- {
- open(dbtype,server,DBName, usr, passwd);
- }
- objCnn = QSqlDatabase::database(DBName + connectName());
- QSqlQuery query(objCnn);
- query.setForwardOnly(true);
- query.exec(strSql) ;
- return query;
- }
- void DBStorage::setLoader(QLibrary *)
- {
- }
- void DBStorage::run()
- {
- /*while(!isInterruptionRequested()){
- qDebug() << __FILE__ << __FUNCTION__;
- QThread::msleep(1000);
- }*/
- }
- void DBStorage::pushData()
- {
- // dataMap[dataName.toStdString()] = val;
- }
- Client* instance()
- {
- return new DBStorage();
- }
- void destroy(Client* pInstance)
- {
- if( pInstance )
- {
- delete pInstance;
- }
- }
|