#include "DBStorage.h"

// Qt facilities used by this translation unit: JSON parsing, SQL access,
// logging and the per-thread connection naming.
#include <QDateTime>
#include <QDebug>
#include <QJsonArray>
#include <QJsonDocument>
#include <QJsonObject>
#include <QJsonParseError>
#include <QMap>
#include <QMetaType>
#include <QSqlDatabase>
#include <QSqlError>
#include <QSqlQuery>
#include <QStringList>
#include <QThread>
#include <QVariant>

#include <cstddef>

/**
 * Result codes returned by DBStorage::open().
 */
enum emOpenError
{
    emDBError_Success,  ///< The connection is open.
    emDBError_User,     ///< The connection could not be opened (any reason not covered below).
    emDBError_Server,   ///< The driver reported "Can't connect".
    emDBError_Driver,   ///< The requested Qt SQL driver is not available.
    emDBError_Config    ///< The configured database type is not supported.
};

/**
 * Builds the per-thread suffix of the Qt connection name ("thread<id>").
 * Every connection is registered as DBName + connectName(), so each thread
 * that touches the storage gets its own QSqlDatabase connection.
 */
inline QString connectName()
{
    return QString("thread%1").arg(reinterpret_cast<std::size_t>(QThread::currentThreadId()));
}

DBStorage::DBStorage()
{
}

DBStorage::~DBStorage()
{
    qDebug() << __FILE__ << __FUNCTION__ << __LINE__;
}

/**
 * Configures the storage from the JSON text in ci.Settings and opens the
 * database connection.
 *
 * Keys read from the settings object:
 *   - "type", "host", "usr", "database", "passwd": connection parameters;
 *   - "target": array of objects with
 *       - "table":       name of the destination table,
 *       - "sourcetable": array of source names routed to that table,
 *       - "column":      array of {"from": "<source>.<field>", "to": "<column>"}.
 *
 * Nothing is configured when the settings cannot be parsed. Column entries
 * whose "from" does not have the "<source>.<field>" shape are skipped.
 */
void DBStorage::Run(const ConsumerInfo& ci)
{
    QJsonParseError jsonParseError;
    const QJsonDocument jsonDocument = QJsonDocument::fromJson(
        QString::fromStdString(ci.Settings.c_str()).toUtf8(), &jsonParseError);
    if (QJsonParseError::NoError != jsonParseError.error)
    {
        qDebug() << "ERROR:" << __FILE__ << __FUNCTION__ << __LINE__ << jsonParseError.errorString();
        return;
    }

    const QJsonObject jo = jsonDocument.object();
    dbtype = jo["type"].toString();
    server = jo["host"].toString();
    usr = jo["usr"].toString();
    DBName = jo["database"].toString();
    passwd = jo["passwd"].toString();

    const QJsonArray targets = jo["target"].toArray();
    for (const auto& targetValue : targets)
    {
        const QJsonObject target = targetValue.toObject();
        const QString tgtName = target["table"].toString();

        TargetTable targetTable;

        const QJsonArray sources = target["sourcetable"].toArray();
        for (const auto& src : sources)
        {
            sourceTableMap.insert(src.toString(), tgtName);
            targetTable.sourceTables.append(src.toString());
        }

        QString columnString;
        const QJsonArray columns = target["column"].toArray();
        for (const auto& colValue : columns)
        {
            const QJsonObject col = colValue.toObject();

            ColumnDef colDef;
            colDef.from = col["from"].toString();

            // "from" must look like "<source>.<field>"; the second part is the
            // field name looked up in incoming messages.
            const QStringList parts = colDef.from.split(QLatin1Char('.'), Qt::SkipEmptyParts);
            if (parts.size() < 2)
            {
                qCritical() << __FILE__ << __FUNCTION__ << __LINE__
                            << "invalid column source, expected <source>.<field>:" << colDef.from;
                continue;
            }
            colDef.name = parts[1];
            colDef.to = col["to"].toString();
            targetTable.columnDefs.append(colDef);

            if (!columnString.isEmpty())
            {
                columnString += ",";
            }
            columnString += colDef.to;
        }

        targetTable.columns = columnString;
        targetTableMap.insert(tgtName, targetTable);
    }

    if (open(dbtype, server, DBName, usr, passwd) != emDBError_Success)
    {
        // Not fatal: the statement helpers try to open the connection again
        // before executing.
        qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << "initial database connection failed";
    }
}

/**
 * Handles one incoming message.
 *
 * @param user  Sender identification (only logged).
 * @param key   Source name; messages whose key has no configured target
 *              table are ignored.
 * @param val   JSON object text. For every configured column the member
 *              named ColumnDef::name is extracted (as an object) and the row
 *              is handed to insert2Data().
 */
void DBStorage::OnData(const QString& user, const QString& key, const QVariant& val)
{
    qDebug() << __FILE__ << __FUNCTION__ << __LINE__ << user << key << val;

    const QString targetTable = sourceTableMap.value(key);
    if (targetTable.isEmpty())
    {
        return;
    }

    const auto tableIt = targetTableMap.constFind(targetTable);
    if (tableIt == targetTableMap.constEnd())
    {
        qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << "no definition for target table" << targetTable;
        return;
    }

    // JSON text is UTF-8; encoding it as Latin-1 would corrupt non-ASCII data.
    QJsonParseError err;
    const QJsonDocument jsonDoc = QJsonDocument::fromJson(val.toString().toUtf8(), &err);
    if (QJsonParseError::NoError != err.error)
    {
        qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << "invalid payload for" << key << ":" << err.errorString();
        return;
    }
    const QJsonObject deviceObject = jsonDoc.object();

    QMap<QString, QJsonObject> dataMap;
    for (const auto& columnDef : tableIt->columnDefs)
    {
        dataMap.insert(columnDef.from, deviceObject[columnDef.name].toObject());
    }

    insert2Data(key, dataMap);
}

/**
 * (Re)creates and opens the connection of the calling thread.
 *
 * The connection is always registered as DBName + connectName(), the name
 * every other method of this class uses to look it up. A connection already
 * registered under that name is closed and removed first.
 *
 * @param type        "MYSQL" or "ODBC".
 * @param wsServer    Host, optionally "host:port" (the port is honoured for
 *                    MYSQL only; 3306 is used when absent or invalid).
 * @param wsDBName    Database (schema) name.
 * @param wsUser      Login name.
 * @param wsPassword  Login password.
 * @return One of the emOpenError codes; emDBError_Success (0) when open.
 */
int DBStorage::open(QString type, QString wsServer, QString wsDBName, QString wsUser, QString wsPassword)
{
    const QString connName = DBName + connectName();

    if (QSqlDatabase::contains(connName))
    {
        {
            // Scoped so that no handle is alive when the connection is removed.
            QSqlDatabase previous = QSqlDatabase::database(connName, false);
            if (previous.isOpen())
            {
                previous.close();
            }
        }
        QSqlDatabase::removeDatabase(connName);
    }

    QSqlDatabase objCnn;
    const char* errorTag = "";

    if (type == "MYSQL")
    {
        driver = "QMYSQL";
        errorTag = " mysql.error : ";

        int nPort = 3306;
        QString qsServer = wsServer;
        const auto colonPos = wsServer.indexOf(QLatin1Char(':'));
        if (colonPos >= 0)
        {
            bool ok = false;
            const uint parsedPort = wsServer.mid(colonPos + 1).toUInt(&ok);
            if (ok && parsedPort > 0 && parsedPort <= 65535)
            {
                nPort = static_cast<int>(parsedPort);
            }
            else
            {
                qWarning() << __FILE__ << __FUNCTION__ << __LINE__
                           << "invalid port in" << wsServer << ", using" << nPort;
            }
            qsServer = wsServer.left(colonPos);
        }

        objCnn = QSqlDatabase::addDatabase(driver, connName);
        objCnn.setConnectOptions("MYSQL_OPT_RECONNECT=1;MYSQL_OPT_CONNECT_TIMEOUT=3");
        objCnn.setHostName(qsServer);
        objCnn.setPort(nPort);
        objCnn.setDatabaseName(wsDBName);
        objCnn.setUserName(wsUser);
        objCnn.setPassword(wsPassword);
    }
    else if (type == "ODBC")
    {
        errorTag = " odbc.error : ";

        // Multi-argument arg() substitutes all placeholders at once, so a
        // value that itself contains "%N" cannot be mistaken for a placeholder.
        const QString strconn = QString("Driver={SQL SERVER};SERVER=%1;DATABASE=%2;UID=%3;PWD=%4;")
                                    .arg(wsServer, wsDBName, wsUser, wsPassword);

        objCnn = QSqlDatabase::addDatabase("QODBC", connName);
        objCnn.setDatabaseName(strconn);
    }
    else
    {
        qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << "unsupported database type:" << type;
        return emDBError_Config;
    }

    if (!objCnn.isValid())
    {
        qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << "SQL driver not available for type" << type;
        return emDBError_Driver;
    }

    if (!objCnn.open())
    {
        const QString wsError = objCnn.lastError().text();
        qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << errorTag << wsError;
        if (wsError.contains("Can't connect"))
        {
            return emDBError_Server;
        }
    }

    qDebug() << __FILE__ << __FUNCTION__ << __LINE__ << " db:" << DBName
             << (objCnn.isOpen() ? "connected." : "connect faild.");

    return objCnn.isOpen() ? emDBError_Success : emDBError_User;
}

/**
 * Inserts one row into the target table configured for @p sourceTable.
 *
 * @param sourceTable  Source name as configured under "sourcetable".
 * @param dataMap      Maps ColumnDef::from to an object carrying
 *                     "type" (a QMetaType::Type id) and "value".
 * @return true when the row was inserted; false when the source is not
 *         configured, a column value is missing or has an unsupported type,
 *         or the statement fails.
 *
 * Values are sent as bound parameters of a prepared statement instead of
 * being pasted into the SQL text: message contents come from outside the
 * process, and quoting them by hand breaks on embedded quotes and allows SQL
 * injection. Table and column names come from the local configuration and
 * are still inserted textually.
 */
bool DBStorage::insert2Data(const QString& sourceTable, const QMap<QString, QJsonObject>& dataMap)
{
    const QString dstTable = sourceTableMap.value(sourceTable);
    const auto tableIt = targetTableMap.constFind(dstTable);
    if (dstTable.isEmpty() || tableIt == targetTableMap.constEnd())
    {
        qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << "no target table configured for" << sourceTable;
        return false;
    }

    QVariantList boundValues;
    QStringList placeholders;

    for (const auto& columnDef : tableIt->columnDefs)
    {
        const auto it = dataMap.constFind(columnDef.from);
        if (it == dataMap.constEnd())
        {
            qDebug() << "ERROR :" << __FILE__ << __FUNCTION__ << __LINE__;
            return false;
        }

        const QJsonObject& val = it.value();
        const QVariant raw = val["value"].toVariant();
        const int type = val["type"].toInt();

        switch (type)
        {
        case QMetaType::Type::QDateTime:
        {
            if (dbtype != "MYSQL")
            {
                qCritical() << __FILE__ << __FUNCTION__ << __LINE__
                            << "date-time columns are only supported for MYSQL, column" << columnDef.to;
                return false;
            }
            // timestamp / 1000 is used as milliseconds since the epoch, i.e.
            // the incoming value is treated as microseconds. The fraction is
            // the millisecond part followed by the zero-padded microsecond
            // remainder.
            // NOTE(review): confirm the producer really sends microseconds.
            const qint64 timestamp = static_cast<qint64>(raw.toULongLong());
            const QDateTime dt = QDateTime::fromMSecsSinceEpoch(timestamp / 1000);
            const QString microPart = QString::number(timestamp % 1000).rightJustified(3, QLatin1Char('0'));
            boundValues.append(QVariant(dt.toString("yyyy-MM-dd hh:mm:ss.zzz") + microPart));
            break;
        }
        case QMetaType::Type::ULongLong:
            boundValues.append(QVariant(raw.toULongLong()));
            break;
        case QMetaType::Type::QString:
            boundValues.append(QVariant(val["value"].toString()));
            break;
        case QMetaType::Type::Double:
            // Bound as a number: formatting with QString::arg() would keep
            // only six significant digits.
            boundValues.append(QVariant(raw.toDouble()));
            break;
        case QMetaType::Type::Float:
            boundValues.append(QVariant(raw.toFloat()));
            break;
        case QMetaType::Type::Int:
            boundValues.append(QVariant(raw.toInt()));
            break;
        case QMetaType::Type::LongLong:
            boundValues.append(QVariant(raw.toLongLong()));
            break;
        default:
            // Emitting nothing here would leave the column list and the value
            // list out of step, so the row is rejected instead.
            qCritical() << __FILE__ << __FUNCTION__ << __LINE__
                        << "unsupported value type" << type << "for column" << columnDef.to;
            return false;
        }

        placeholders.append(QStringLiteral("?"));
    }

    if (boundValues.isEmpty())
    {
        qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << "no columns configured for table" << dstTable;
        return false;
    }

    // Build the insert statement.
    const QString columnString = tableIt->columns;
    const QString strInsertSql = QString("INSERT INTO %1(%2) VALUES(%3)")
                                     .arg(dstTable, columnString, placeholders.join(","));

    // Execute the insert statement on this thread's connection, opening the
    // connection first when necessary.
    if (!isConnected())
    {
        open(dbtype, server, DBName, usr, passwd);
    }
    QSqlQuery insertQuery(QSqlDatabase::database(DBName + connectName()));

    bool bRet = insertQuery.prepare(strInsertSql);
    if (bRet)
    {
        for (const QVariant& value : boundValues)
        {
            insertQuery.addBindValue(value);
        }
        bRet = insertQuery.exec();
    }

    if (!bRet)
    {
        const QString message = QString::fromLocal8Bit("数据入库失败");
        qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << message << strInsertSql << boundValues
                    << insertQuery.lastError().text();
    }
    else
    {
        qDebug() << __FILE__ << __FUNCTION__ << __LINE__ << " insert success : " << strInsertSql << boundValues;
    }

    return bRet;
}

/**
 * Reports whether this thread's connection is open.
 *
 * The lookup uses QSqlDatabase::database() with its default arguments, so a
 * registered but closed connection is given a chance to open before the
 * state is read.
 */
bool DBStorage::isConnected()
{
    QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
    return objCnn.isOpen();
}

/**
 * Closes this thread's connection when it is open.
 */
void DBStorage::close()
{
    // open = false: closing must not first open a connection that is closed.
    QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName(), false);
    if (objCnn.isOpen())
    {
        objCnn.close();
    }
}

/**
 * Executes a list of statements as one transaction.
 *
 * @param sqlList  Statements to run in order. A statement that fails or
 *                 affects no rows aborts the batch and rolls it back.
 * @param id       Receives the result of "select LAST_INSERT_ID()" after a
 *                 successful commit; left untouched when that query yields
 *                 no row.
 * @return true when every statement affected at least one row and the
 *         transaction was committed; false otherwise (including an empty
 *         list).
 */
bool DBStorage::excSql(QStringList sqlList, int& id)
{
    if (sqlList.isEmpty())
    {
        return false;
    }

    if (!isConnected())
    {
        open(dbtype, server, DBName, usr, passwd);
    }
    QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
    QSqlQuery sqlQuery(objCnn);

    const bool inTransaction = objCnn.transaction();
    if (!inTransaction)
    {
        // Without a transaction the statements below take effect one by one.
        qWarning() << __FILE__ << __FUNCTION__ << __LINE__
                   << "could not start a transaction:" << objCnn.lastError().text();
    }

    bool bExecute = true;
    for (int n = 0; n < sqlList.length(); ++n)
    {
        sqlQuery.exec(sqlList.at(n));
        // A failed statement reports -1 rows, so this also covers exec errors.
        if (sqlQuery.numRowsAffected() <= 0)
        {
            const QString wsError = objCnn.lastError().text() + ":" + sqlQuery.lastError().text();
            qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << wsError;
            bExecute = false;
            break;
        }
    }

    if (!bExecute)
    {
        if (inTransaction)
        {
            objCnn.rollback();
        }
        return false;
    }

    if (inTransaction && !objCnn.commit())
    {
        qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << "commit failed:" << objCnn.lastError().text();
        objCnn.rollback();
        return false;
    }

    if (sqlQuery.exec("select LAST_INSERT_ID()") && sqlQuery.next())
    {
        id = sqlQuery.value(0).toInt();
    }

    return true;
}

/**
 * Executes a single statement on this thread's connection, opening the
 * connection first when necessary.
 *
 * @return true when the statement executed successfully.
 */
bool DBStorage::excSql(QString strSql)
{
    if (!isConnected())
    {
        open(dbtype, server, DBName, usr, passwd);
    }

    QSqlQuery sqlQuery(QSqlDatabase::database(DBName + connectName()));
    const bool bRet = sqlQuery.exec(strSql);
    if (!bRet)
    {
        qCritical() << __FILE__ << __FUNCTION__ << __LINE__ << "query.sql = " << strSql
                    << " QueryError : " << sqlQuery.lastError().text();
    }
    return bRet;
}

/**
 * Runs a statement and returns the forward-only query object so the caller
 * can iterate the result. The execution result is not checked here; callers
 * inspect the returned query.
 */
QSqlQuery DBStorage::query(QString strSql)
{
    if (!isConnected())
    {
        open(dbtype, server, DBName, usr, passwd);
    }

    QSqlQuery result(QSqlDatabase::database(DBName + connectName()));
    result.setForwardOnly(true);
    result.exec(strSql);
    return result;
}

/** Intentionally empty: this client does not use the loader it is given. */
void DBStorage::setLoader(QLibrary*)
{
}

/** Intentionally empty: no background work is performed. */
void DBStorage::run()
{
}

/** Intentionally empty. */
void DBStorage::pushData()
{
}

/**
 * Factory entry point: creates a DBStorage. The returned object must be
 * released with destroy().
 */
Client* instance()
{
    return new DBStorage();
}

/**
 * Releases an object obtained from instance(). Passing a null pointer is
 * allowed and does nothing.
 */
void destroy(Client* pInstance)
{
    delete pInstance;
}