// DBStorage.cpp (15 KB)
  #include "DBStorage.h"

  #include <QDateTime>
  #include <QDebug>
  #include <QException>
  #include <QJsonArray>
  #include <QJsonDocument>
  #include <QJsonObject>
  #include <QSqlDatabase>
  #include <QSqlError>
  #include <QSqlQuery>
  #include <QStringList>
  #include <QVariant>
  9. // static const QString clientID = "dbStorage";
  /// Result codes used as the return value of DBStorage::open().
  enum emOpenError
  {
      emDBError_Success = 0, ///< The connection is open.
      emDBError_User    = 1, ///< The connection is not open after the attempt.
      emDBError_Server  = 2, ///< The driver error text contained "Can't connect".
      emDBError_Driver  = 3, ///< Presumably a driver problem (name-based guess; verify against callers).
      emDBError_Config  = 4  ///< Presumably a configuration problem (name-based guess; verify against callers).
  };
  18. inline QString connectName()
  19. {
  20. return QString("thread%1").arg((std::size_t)QThread::currentThreadId());
  21. }
  22. DBStorage::DBStorage() {
  23. // mqtt = new MQTTClient();
  24. //tdclient->start();
  25. }
  26. DBStorage::~DBStorage(){
  27. // delete mqtt;
  28. qDebug() << __FILE__ << __FUNCTION__<< __LINE__;
  29. }
  30. void DBStorage::Run(const ConsumerInfo& ci)
  31. {
  32. // QString host = "192.168.9.6";
  33. // quint16 port = 1883;
  34. // QString usr = "root";
  35. // QString passwd = "N6pNXbZjspDRqNGnxMmc";
  36. // QString clientID = "dbStorage";
  37. // QString type;
  38. // QString server;
  39. // QString DBName;
  40. // QString user;
  41. // QString password;
  42. QJsonParseError jsonParseError;
  43. QJsonDocument jsonDocument(QJsonDocument::fromJson(QString::fromStdString(ci.Settings.c_str()).toUtf8(), &jsonParseError));
  44. if(QJsonParseError::NoError != jsonParseError.error)
  45. {
  46. //LOGERROR("parse json file {} error", fullpath.toStdString().c_str());
  47. qDebug() << "ERROR:" << __FILE__ << __FUNCTION__<< __LINE__;
  48. }
  49. //QJsonArray ja = jsonDocument.array();
  50. QJsonObject jo = jsonDocument.object();
  51. // foreach (auto var, ja) {
  52. dbtype = jo["type"].toString();
  53. server = jo["host"].toString();
  54. usr = jo["usr"].toString();
  55. DBName = jo["database"].toString();
  56. // port = var["port"].toVariant().toInt();
  57. passwd = jo["passwd"].toString();
  58. // dstTable = jo["table"].toString();
  59. // tableColumns = jo["columns"].toString();
  60. // }
  61. QJsonArray targets = jo["target"].toArray();
  62. foreach(auto target , targets){
  63. TargetTable targetTable;
  64. QString tgtName = target["table"].toString();
  65. QJsonArray sources = target["sourcetable"].toArray();
  66. QList<QString>sourceList;
  67. foreach(auto src , sources){
  68. sourceTableMap.insert(src.toString(),tgtName);
  69. sourceList.append(src.toString());
  70. }
  71. QJsonArray columns = target["column"].toArray();
  72. QList<ColumnDef>columnList;
  73. QString columnString = "";
  74. foreach (auto col, columns) {
  75. ColumnDef colDef;
  76. colDef.from = col["from"].toString();
  77. QStringList strlist = colDef.from.split(QLatin1Char('.'), Qt::SkipEmptyParts);
  78. colDef.name = strlist[1];
  79. colDef.to = col["to"].toString();
  80. columnList.append(colDef);
  81. if(!columnString.isEmpty()){
  82. columnString += ",";
  83. }
  84. columnString += col["to"].toString();
  85. }
  86. targetTable.columnDefs = columnList;
  87. targetTable.sourceTables = sourceList;
  88. targetTable.columns = columnString;
  89. targetTableMap.insert(tgtName, targetTable);
  90. }
  91. // clientID = QString("%1@%2").arg(usr, passwd);
  92. dataItems = ci.dataItems;
  93. topicsMap = ci.topicsMap;
  94. // foreach (auto var, ci.dataItems) {
  95. // dataMap.insert({var, {}});
  96. // }
  97. open(dbtype, server, DBName, usr, passwd);
  98. // connect(this, SIGNAL(received), this, SLOT(pushData));
  99. // mqtt->connect2Host(host, port, usr, passwd, clientID);
  100. //start();
  101. }
  102. void DBStorage::OnData(const QString& user, const QString& key, const QVariant& val)
  103. {
  104. std::unordered_map<QString, QJsonObject>dataMap;
  105. dataMap.clear();
  106. qDebug() << __FILE__ << __FUNCTION__<< __LINE__ << user << key << val;
  107. QString topic = key;
  108. // QByteArray payload = "data";
  109. // QByteArray byteArray(val.c_str(), val.length());
  110. QJsonParseError err;
  111. QJsonDocument jsonDoc(QJsonDocument::fromJson(val.toString().toLatin1(),&err));
  112. QJsonObject deviceObject = jsonDoc.object();
  113. // QStringList keys = jsonObject.keys();
  114. // auto itTopic = topicsMap.find(topic.toStdString());
  115. // auto range = topicsMap.equal_range(key.toStdString());
  116. // for (auto it = range.first; it != range.second; ++it) {
  117. // qDebug() << QString(it->first.c_str()) << QString(it->second.c_str()) << '\n';
  118. // QJsonObject val = deviceObject[QString::fromStdString(it->second)].toObject();
  119. // QString dataName = QString("%1.%2").arg(key, it->second.c_str());
  120. // dataMap[dataName] = val;
  121. // }
  122. QString targetTable = "";
  123. if(sourceTableMap.contains(key)){
  124. targetTable = sourceTableMap[key];
  125. }
  126. if(!targetTable.isEmpty()){
  127. auto tableIt = targetTableMap.find(targetTable);
  128. for(auto columnDefsIt = tableIt->columnDefs.begin(); columnDefsIt != tableIt->columnDefs.end(); columnDefsIt++){
  129. QJsonObject ov = deviceObject[columnDefsIt->name].toObject();
  130. dataMap.insert({columnDefsIt->from, ov});
  131. }
  132. insert2Data(key, dataMap);
  133. }
  134. // mqtt->publish(topic, val.toString().toLatin1());
  135. }
  136. int DBStorage::open(QString type,QString wsServer,QString wsDBName,QString wsUser,QString wsPassword)
  137. {
  138. int nPort = 3306;
  139. QString qsServer = wsServer;
  140. if( qsServer.contains(":") )
  141. {
  142. nPort = qsServer.mid(qsServer.indexOf(":")+1).toUInt();
  143. qsServer = qsServer.mid(0,qsServer.indexOf(":"));
  144. }
  145. QSqlDatabase objCnn = QSqlDatabase::database(wsDBName + connectName());
  146. if( objCnn.isOpen() == true )
  147. {
  148. close();
  149. }
  150. if ( type == "MYSQL")
  151. {
  152. driver = "QMYSQL";
  153. // serverIp = wsServer;
  154. // dBName = wsDBName;
  155. // userName = wsUser;
  156. // password = wsPassword;
  157. try
  158. {
  159. QStringList drivers = QSqlDatabase::drivers();
  160. // if( m_bTraceDebug )
  161. {
  162. QString strDrivers = "SQLDrivers:";
  163. foreach(QString driver, drivers)
  164. strDrivers += " " + driver;
  165. qDebug() << __FILE__<<__FUNCTION__<<__LINE__<< strDrivers;
  166. }
  167. objCnn = QSqlDatabase::addDatabase(driver,DBName + connectName());
  168. objCnn.setConnectOptions("MYSQL_OPT_RECONNECT=1;MYSQL_OPT_CONNECT_TIMEOUT=3");
  169. objCnn.setHostName(qsServer);
  170. objCnn.setPort(nPort);
  171. objCnn.setDatabaseName(wsDBName);
  172. objCnn.setUserName(wsUser);
  173. objCnn.setPassword(wsPassword);
  174. if( !objCnn.open() )
  175. {
  176. QSqlError errDBase = objCnn.lastError();
  177. QString wsError = errDBase.text();
  178. qCritical() << __FILE__<<__FUNCTION__<<__LINE__
  179. << " mysql.error : " << wsError;
  180. if( wsError.contains("Can't connect") == true )
  181. {
  182. return emDBError_Server;
  183. }
  184. }
  185. }
  186. catch (QException& e)
  187. {
  188. qCritical() << __FILE__<<__FUNCTION__<<__LINE__ << " mysql.error : " << e.what();
  189. }
  190. }
  191. else if (type == "ODBC")
  192. {
  193. try
  194. {
  195. // QString strHost = ".\\SQLEXPRESS1";
  196. // int port = 1433;
  197. // QString strDbName = "wxd";
  198. // QString strUserName = "sa";
  199. // QString strUserPwd = "111111";
  200. // dBName = wsDBName;
  201. QString strconn = QString("Driver={SQL SERVER};SERVER=%1;DATABASE=%2;UID=%3;PWD=%4;")
  202. .arg(wsServer)
  203. //.arg(port)
  204. .arg(wsDBName)
  205. .arg(wsUser)
  206. .arg(wsPassword);
  207. objCnn = QSqlDatabase::addDatabase("QODBC",DBName + connectName());
  208. objCnn.setConnectOptions("MYSQL_OPT_RECONNECT=1;MYSQL_OPT_CONNECT_TIMEOUT=3");
  209. objCnn.setDatabaseName(strconn);
  210. objCnn.setConnectOptions();
  211. if (!objCnn.open())
  212. {
  213. QSqlError errDBase = objCnn.lastError();
  214. QString wsError = errDBase.text();
  215. if( wsError.contains("Can't connect") == true )
  216. {
  217. return emDBError_Server;
  218. }
  219. }
  220. }
  221. catch (QException& e)
  222. {
  223. qCritical()<< __FILE__<<__FUNCTION__<<__LINE__<< e.what();
  224. }
  225. }
  226. // if( m_bTraceDebug )
  227. {
  228. qDebug() << __FILE__<<__FUNCTION__<<__LINE__<< " db:" << DBName << (objCnn.isOpen() ? "connected.":"connect faild.");
  229. }
  230. // if (objCnn.isOpen())
  231. // {
  232. // GetTableColumnTypes();
  233. // }
  234. return objCnn.isOpen() ? emDBError_Success : emDBError_User;
  235. }
  236. #if 1
  237. bool DBStorage::insert2Data(const QString& sourceTable, const std::unordered_map<QString, QJsonObject>& dataMap)
  238. {
  239. bool bRet = false;
  240. QString strInsertSql;
  241. QString valData = "";
  242. QString dstTable = sourceTableMap[sourceTable];
  243. QString columnString = targetTableMap[dstTable].columns;
  244. QList<ColumnDef> columnDefines = targetTableMap[dstTable].columnDefs;
  245. // QList<ColumnDef>::Iterator columnIt = columnDefines.begin();
  246. for(auto columnIt = columnDefines.begin(); columnIt != columnDefines.end(); columnIt++){
  247. qDebug() << columnIt->from << " -> " << columnIt->to;
  248. auto it = dataMap.find(columnIt->from);
  249. if(it == dataMap.end()){
  250. qDebug() << "ERROR :" << __FILE__ << __FUNCTION__<< __LINE__;
  251. return false;
  252. }
  253. QJsonObject val = it->second;
  254. // QString fromColumn = QString("%1.%2").arg(sourceTable,val["name"].toString());
  255. // if(fromColumn.compare(columnIt->from) != 0){
  256. // continue;
  257. // }
  258. if(!valData.isEmpty()){
  259. valData += ",";
  260. }
  261. int type = val["type"].toInt();
  262. switch (type) {
  263. case QMetaType::Type::QDateTime:
  264. if(dbtype == "MYSQL"){
  265. qint64 timestamp = val["value"].toVariant().toULongLong();
  266. QDateTime dt = QDateTime::fromMSecsSinceEpoch(timestamp/1000);
  267. valData += QString("\'%1.%2\'").arg(dt.toString("yyyy-MM-dd hh:mm:ss"),QString("%1").arg(timestamp%1000));
  268. // valData += QString("'%1'").arg(val["value"].toVariant().toULongLong());
  269. }
  270. break;
  271. case QMetaType::Type::ULongLong:
  272. valData += QString("%1").arg(val["value"].toVariant().toULongLong());
  273. break;
  274. case QMetaType::Type::QString:
  275. valData += QString("'%1'").arg(val["value"].toString());
  276. break;
  277. case QMetaType::Type::Double:
  278. valData += QString("%1").arg(val["value"].toVariant().toDouble());
  279. break;
  280. case QMetaType::Type::Float:
  281. valData += QString("%1").arg(val["value"].toVariant().toFloat());
  282. break;
  283. case QMetaType::Type::Int:
  284. valData += QString("%1").arg(val["value"].toVariant().toInt());
  285. break;
  286. case QMetaType::Type::LongLong:
  287. valData += QString("%1").arg(val["value"].toVariant().toLongLong());
  288. break;
  289. default:
  290. break;
  291. }
  292. }
  293. QString message;
  294. // 生成插入语句
  295. strInsertSql = QString::fromLocal8Bit("INSERT INTO %1(%2) VALUES(%3)").arg(dstTable).arg(columnString).arg(valData);
  296. // 执行插入语句
  297. bRet = excSql(strInsertSql);
  298. if (!bRet)
  299. {
  300. message = QString::fromLocal8Bit("数据入库失败");
  301. qCritical() << __FILE__<<__FUNCTION__<<__LINE__ << message << strInsertSql;
  302. }
  303. return bRet;
  304. }
  305. #endif
  306. bool DBStorage::isConnected()
  307. {
  308. QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
  309. return objCnn.isOpen();
  310. }
  311. void DBStorage::close()
  312. {
  313. QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
  314. if( objCnn.isOpen() )
  315. {
  316. objCnn.close();
  317. }
  318. }
  319. bool DBStorage::excSql(QStringList sqlList,int& id)
  320. {
  321. if( sqlList.length() <= 0 )
  322. {
  323. return false;
  324. }
  325. QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
  326. if( objCnn.isOpen() == false )
  327. {
  328. open(driver, server,DBName, usr, passwd);
  329. }
  330. objCnn = QSqlDatabase::database(DBName + connectName());
  331. QSqlQuery query(objCnn);
  332. objCnn.transaction();
  333. bool bExecute = true;
  334. for( int n = 0 ; n < sqlList.length(); n++ )
  335. {
  336. query.exec( sqlList[n] );
  337. int nCount = query.numRowsAffected();
  338. if( 0 >= nCount )
  339. {
  340. QSqlError errQuery = query.lastError();
  341. QSqlError errDBase = objCnn.lastError();
  342. QString wsError = errDBase.text() + ":" + errQuery.text();
  343. qCritical()<< __FILE__<<__FUNCTION__<<__LINE__ << wsError;
  344. bExecute = false;
  345. break;
  346. }
  347. }
  348. if( bExecute )
  349. {
  350. objCnn.commit();
  351. if( query.exec( "select LAST_INSERT_ID()" ) )
  352. {
  353. if( query.next() )
  354. {
  355. id = query.value(0).toInt();
  356. }
  357. }
  358. }
  359. else
  360. {
  361. objCnn.rollback();
  362. }
  363. return bExecute;
  364. }
  365. bool DBStorage::excSql(QString strSql)
  366. {
  367. QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
  368. if( objCnn.isOpen() == false )
  369. {
  370. open(driver, server,DBName, usr, passwd);
  371. }
  372. objCnn = QSqlDatabase::database(DBName + connectName());
  373. QSqlQuery query(objCnn);
  374. bool bRet = query.exec(strSql);
  375. if( bRet == false )
  376. {
  377. QSqlError errDBase = objCnn.lastError();
  378. QSqlError errQuery = query.lastError();
  379. qCritical()<< __FILE__<<__FUNCTION__<<__LINE__ << "query.sql = " << strSql << " QueryError : " << errQuery.text();
  380. }
  381. return bRet;
  382. }
  383. QSqlQuery DBStorage::query(QString strSql)
  384. {
  385. QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
  386. if( objCnn.isOpen() == false )
  387. {
  388. open(driver,server,DBName, usr, passwd);
  389. }
  390. objCnn = QSqlDatabase::database(DBName + connectName());
  391. QSqlQuery query(objCnn);
  392. query.setForwardOnly(true);
  393. query.exec(strSql) ;
  394. return query;
  395. }
  396. void DBStorage::setLoader(QLibrary *)
  397. {
  398. }
  399. void DBStorage::run()
  400. {
  401. /*while(!isInterruptionRequested()){
  402. qDebug() << __FILE__ << __FUNCTION__;
  403. QThread::msleep(1000);
  404. }*/
  405. }
  406. void DBStorage::pushData()
  407. {
  408. // dataMap[dataName.toStdString()] = val;
  409. }
  410. Client* instance()
  411. {
  412. return new DBStorage();
  413. }
  414. void destroy(Client* pInstance)
  415. {
  416. if( pInstance )
  417. {
  418. delete pInstance;
  419. }
  420. }