DBStorage.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. #include "DBStorage.h"
  2. #include <QDebug>
  3. #include <QJsonDocument>
  4. #include <QJsonArray>
  5. #include <QJsonObject>
  6. #include <QSqlError>
  7. #include <QException>
  8. #include <QDateTime>
  9. // static const QString clientID = "dbStorage";
  /// Status codes used as the integer result of DBStorage::open().
  enum emOpenError
  {
      emDBError_Success = 0, ///< The connection is open.
      emDBError_User    = 1, ///< Generic failure: the connection is not open.
      emDBError_Server  = 2, ///< The driver reported that the server could not be reached.
      emDBError_Driver  = 3, ///< Presumably a driver problem; unused in the visible code.
      emDBError_Config  = 4  ///< Presumably a configuration problem; unused in the visible code.
  };
  18. inline QString connectName()
  19. {
  20. return QString("thread%1").arg((std::size_t)QThread::currentThreadId());
  21. }
  22. DBStorage::DBStorage() {
  23. // mqtt = new MQTTClient();
  24. //tdclient->start();
  25. }
  26. DBStorage::~DBStorage(){
  27. // delete mqtt;
  28. qDebug() << __FILE__ << __FUNCTION__<< __LINE__;
  29. }
  30. void DBStorage::Run(const ConsumerInfo& ci)
  31. {
  32. // QString host = "192.168.9.6";
  33. // quint16 port = 1883;
  34. // QString usr = "root";
  35. // QString passwd = "N6pNXbZjspDRqNGnxMmc";
  36. // QString clientID = "dbStorage";
  37. // QString type;
  38. // QString server;
  39. // QString DBName;
  40. // QString user;
  41. // QString password;
  42. QJsonParseError jsonParseError;
  43. QJsonDocument jsonDocument(QJsonDocument::fromJson(QString::fromStdString(ci.Settings.c_str()).toUtf8(), &jsonParseError));
  44. if(QJsonParseError::NoError != jsonParseError.error)
  45. {
  46. //LOGERROR("parse json file {} error", fullpath.toStdString().c_str());
  47. qDebug() << "ERROR:" << __FILE__ << __FUNCTION__<< __LINE__;
  48. }
  49. //QJsonArray ja = jsonDocument.array();
  50. QJsonObject jo = jsonDocument.object();
  51. // foreach (auto var, ja) {
  52. dbtype = jo["type"].toString();
  53. server = jo["host"].toString();
  54. usr = jo["usr"].toString();
  55. DBName = jo["database"].toString();
  56. // port = var["port"].toVariant().toInt();
  57. passwd = jo["passwd"].toString();
  58. // dstTable = jo["table"].toString();
  59. // tableColumns = jo["columns"].toString();
  60. // }
  61. QJsonArray targets = jo["target"].toArray();
  62. foreach(auto target , targets){
  63. TargetTable targetTable;
  64. QString tgtName = target["table"].toString();
  65. QJsonArray sources = target["sourcetable"].toArray();
  66. QList<QString>sourceList;
  67. foreach(auto src , sources){
  68. sourceTableMap.insert(src.toString(),tgtName);
  69. sourceList.append(src.toString());
  70. }
  71. QJsonArray columns = target["column"].toArray();
  72. QList<ColumnDef>columnList;
  73. QString columnString = "";
  74. foreach (auto col, columns) {
  75. ColumnDef colDef;
  76. colDef.from = col["from"].toString();
  77. QStringList strlist = colDef.from.split(QLatin1Char('.'), Qt::SkipEmptyParts);
  78. colDef.name = strlist[1];
  79. colDef.to = col["to"].toString();
  80. columnList.append(colDef);
  81. if(!columnString.isEmpty()){
  82. columnString += ",";
  83. }
  84. columnString += col["to"].toString();
  85. }
  86. targetTable.columnDefs = columnList;
  87. targetTable.sourceTables = sourceList;
  88. targetTable.columns = columnString;
  89. targetTableMap.insert(tgtName, targetTable);
  90. }
  91. // clientID = QString("%1@%2").arg(usr, passwd);
  92. // dataItems = ci.dataItems;
  93. // topicsMap = ci.topicsMap;
  94. // foreach (auto var, ci.dataItems) {
  95. // dataMap.insert({var, {}});
  96. // }
  97. open(dbtype, server, DBName, usr, passwd);
  98. // connect(this, SIGNAL(received), this, SLOT(pushData));
  99. // mqtt->connect2Host(host, port, usr, passwd, clientID);
  100. //start();
  101. }
  102. void DBStorage::OnData(const QString& user, const QString& key, const QVariant& val)
  103. {
  104. QMap<QString, QJsonObject>dataMap;
  105. dataMap.clear();
  106. qDebug() << __FILE__ << __FUNCTION__<< __LINE__ << user << key << val;
  107. QString topic = key;
  108. // QByteArray payload = "data";
  109. // QByteArray byteArray(val.c_str(), val.length());
  110. QJsonParseError err;
  111. QJsonDocument jsonDoc(QJsonDocument::fromJson(val.toString().toLatin1(),&err));
  112. QJsonObject deviceObject = jsonDoc.object();
  113. // QStringList keys = jsonObject.keys();
  114. // auto itTopic = topicsMap.find(topic.toStdString());
  115. // auto range = topicsMap.equal_range(key.toStdString());
  116. // for (auto it = range.first; it != range.second; ++it) {
  117. // qDebug() << QString(it->first.c_str()) << QString(it->second.c_str()) << '\n';
  118. // QJsonObject val = deviceObject[QString::fromStdString(it->second)].toObject();
  119. // QString dataName = QString("%1.%2").arg(key, it->second.c_str());
  120. // dataMap[dataName] = val;
  121. // }
  122. QString targetTable = "";
  123. if(sourceTableMap.contains(key)){
  124. targetTable = sourceTableMap[key];
  125. }
  126. if(!targetTable.isEmpty()){
  127. auto tableIt = targetTableMap.find(targetTable);
  128. for(auto columnDefsIt = tableIt->columnDefs.begin(); columnDefsIt != tableIt->columnDefs.end(); columnDefsIt++){
  129. QJsonObject ov = deviceObject[columnDefsIt->name].toObject();
  130. // qDebug() << " >> " << columnDefsIt->from ;
  131. dataMap.insert(columnDefsIt->from, ov);
  132. }
  133. insert2Data(key, dataMap);
  134. }
  135. // mqtt->publish(topic, val.toString().toLatin1());
  136. }
  137. int DBStorage::open(QString type,QString wsServer,QString wsDBName,QString wsUser,QString wsPassword)
  138. {
  139. int nPort = 3306;
  140. QString qsServer = wsServer;
  141. if( qsServer.contains(":") )
  142. {
  143. nPort = qsServer.mid(qsServer.indexOf(":")+1).toUInt();
  144. qsServer = qsServer.mid(0,qsServer.indexOf(":"));
  145. }
  146. QSqlDatabase objCnn = QSqlDatabase::database(wsDBName + connectName());
  147. if( objCnn.isOpen() == true )
  148. {
  149. close();
  150. }
  151. if ( type == "MYSQL")
  152. {
  153. driver = "QMYSQL";
  154. // serverIp = wsServer;
  155. // dBName = wsDBName;
  156. // userName = wsUser;
  157. // password = wsPassword;
  158. try
  159. {
  160. QStringList drivers = QSqlDatabase::drivers();
  161. // if( m_bTraceDebug )
  162. // {
  163. // QString strDrivers = "SQLDrivers:";
  164. // foreach(QString driver, drivers)
  165. // strDrivers += " " + driver;
  166. // qDebug() << __FILE__<<__FUNCTION__<<__LINE__<< strDrivers;
  167. // }
  168. objCnn = QSqlDatabase::addDatabase(driver,DBName + connectName());
  169. objCnn.setConnectOptions("MYSQL_OPT_RECONNECT=1;MYSQL_OPT_CONNECT_TIMEOUT=3");
  170. objCnn.setHostName(qsServer);
  171. objCnn.setPort(nPort);
  172. objCnn.setDatabaseName(wsDBName);
  173. objCnn.setUserName(wsUser);
  174. objCnn.setPassword(wsPassword);
  175. if( !objCnn.open() )
  176. {
  177. QSqlError errDBase = objCnn.lastError();
  178. QString wsError = errDBase.text();
  179. qCritical() << __FILE__<<__FUNCTION__<<__LINE__
  180. << " mysql.error : " << wsError;
  181. if( wsError.contains("Can't connect") == true )
  182. {
  183. return emDBError_Server;
  184. }
  185. }
  186. }
  187. catch (QException& e)
  188. {
  189. qCritical() << __FILE__<<__FUNCTION__<<__LINE__ << " mysql.error : " << e.what();
  190. }
  191. }
  192. else if (type == "ODBC")
  193. {
  194. try
  195. {
  196. // QString strHost = ".\\SQLEXPRESS1";
  197. // int port = 1433;
  198. // QString strDbName = "wxd";
  199. // QString strUserName = "sa";
  200. // QString strUserPwd = "111111";
  201. // dBName = wsDBName;
  202. QString strconn = QString("Driver={SQL SERVER};SERVER=%1;DATABASE=%2;UID=%3;PWD=%4;")
  203. .arg(wsServer)
  204. //.arg(port)
  205. .arg(wsDBName)
  206. .arg(wsUser)
  207. .arg(wsPassword);
  208. objCnn = QSqlDatabase::addDatabase("QODBC",DBName + connectName());
  209. objCnn.setConnectOptions("MYSQL_OPT_RECONNECT=1;MYSQL_OPT_CONNECT_TIMEOUT=3");
  210. objCnn.setDatabaseName(strconn);
  211. objCnn.setConnectOptions();
  212. if (!objCnn.open())
  213. {
  214. QSqlError errDBase = objCnn.lastError();
  215. QString wsError = errDBase.text();
  216. if( wsError.contains("Can't connect") == true )
  217. {
  218. return emDBError_Server;
  219. }
  220. }
  221. }
  222. catch (QException& e)
  223. {
  224. qCritical()<< __FILE__<<__FUNCTION__<<__LINE__<< e.what();
  225. }
  226. }
  227. // if( m_bTraceDebug )
  228. {
  229. qDebug() << __FILE__<<__FUNCTION__<<__LINE__<< " db:" << DBName << (objCnn.isOpen() ? "connected.":"connect faild.");
  230. }
  231. // if (objCnn.isOpen())
  232. // {
  233. // GetTableColumnTypes();
  234. // }
  235. return objCnn.isOpen() ? emDBError_Success : emDBError_User;
  236. }
  237. #if 1
  238. bool DBStorage::insert2Data(const QString& sourceTable, const QMap<QString, QJsonObject>& dataMap)
  239. {
  240. bool bRet = false;
  241. QString strInsertSql;
  242. QString valData = "";
  243. QString dstTable = sourceTableMap[sourceTable];
  244. QString columnString = targetTableMap[dstTable].columns;
  245. QList<ColumnDef> columnDefines = targetTableMap[dstTable].columnDefs;
  246. // QList<ColumnDef>::Iterator columnIt = columnDefines.begin();
  247. for(auto columnIt = columnDefines.begin(); columnIt != columnDefines.end(); columnIt++){
  248. // qDebug() << columnIt->from << " -> " << columnIt->to;
  249. auto it = dataMap.find(columnIt->from);
  250. if(it == dataMap.end()){
  251. qDebug() << "ERROR :" << __FILE__ << __FUNCTION__<< __LINE__;
  252. return false;
  253. }
  254. QJsonObject val = it.value();
  255. // QString fromColumn = QString("%1.%2").arg(sourceTable,val["name"].toString());
  256. // if(fromColumn.compare(columnIt->from) != 0){
  257. // continue;
  258. // }
  259. if(!valData.isEmpty()){
  260. valData += ",";
  261. }
  262. int type = val["type"].toInt();
  263. switch (type) {
  264. case QMetaType::Type::QDateTime:
  265. if(dbtype == "MYSQL"){
  266. qint64 timestamp = val["value"].toVariant().toULongLong();
  267. QDateTime dt = QDateTime::fromMSecsSinceEpoch(timestamp/1000);
  268. valData += QString("\'%1.%2\'").arg(dt.toString("yyyy-MM-dd hh:mm:ss"),QString("%1").arg(timestamp%1000));
  269. // valData += QString("'%1'").arg(val["value"].toVariant().toULongLong());
  270. }
  271. break;
  272. case QMetaType::Type::ULongLong:
  273. valData += QString("%1").arg(val["value"].toVariant().toULongLong());
  274. break;
  275. case QMetaType::Type::QString:
  276. valData += QString("'%1'").arg(val["value"].toString());
  277. break;
  278. case QMetaType::Type::Double:
  279. valData += QString("%1").arg(val["value"].toVariant().toDouble());
  280. break;
  281. case QMetaType::Type::Float:
  282. valData += QString("%1").arg(val["value"].toVariant().toFloat());
  283. break;
  284. case QMetaType::Type::Int:
  285. valData += QString("%1").arg(val["value"].toVariant().toInt());
  286. break;
  287. case QMetaType::Type::LongLong:
  288. valData += QString("%1").arg(val["value"].toVariant().toLongLong());
  289. break;
  290. default:
  291. break;
  292. }
  293. }
  294. QString message;
  295. // 生成插入语句
  296. strInsertSql = QString::fromLocal8Bit("INSERT INTO %1(%2) VALUES(%3)").arg(dstTable).arg(columnString).arg(valData);
  297. // 执行插入语句
  298. bRet = excSql(strInsertSql);
  299. if (!bRet)
  300. {
  301. message = QString::fromLocal8Bit("数据入库失败");
  302. qCritical() << __FILE__<<__FUNCTION__<<__LINE__ << message << strInsertSql;
  303. }else{
  304. qDebug() << __FILE__<<__FUNCTION__<<__LINE__ << " insert success : " << strInsertSql;
  305. }
  306. return bRet;
  307. }
  308. #endif
  309. bool DBStorage::isConnected()
  310. {
  311. QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
  312. return objCnn.isOpen();
  313. }
  314. void DBStorage::close()
  315. {
  316. QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
  317. if( objCnn.isOpen() )
  318. {
  319. objCnn.close();
  320. }
  321. }
  322. bool DBStorage::excSql(QStringList sqlList,int& id)
  323. {
  324. if( sqlList.length() <= 0 )
  325. {
  326. return false;
  327. }
  328. QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
  329. if( objCnn.isOpen() == false )
  330. {
  331. open(dbtype, server,DBName, usr, passwd);
  332. }
  333. objCnn = QSqlDatabase::database(DBName + connectName());
  334. QSqlQuery query(objCnn);
  335. objCnn.transaction();
  336. bool bExecute = true;
  337. for( int n = 0 ; n < sqlList.length(); n++ )
  338. {
  339. query.exec( sqlList[n] );
  340. int nCount = query.numRowsAffected();
  341. if( 0 >= nCount )
  342. {
  343. QSqlError errQuery = query.lastError();
  344. QSqlError errDBase = objCnn.lastError();
  345. QString wsError = errDBase.text() + ":" + errQuery.text();
  346. qCritical()<< __FILE__<<__FUNCTION__<<__LINE__ << wsError;
  347. bExecute = false;
  348. break;
  349. }
  350. }
  351. if( bExecute )
  352. {
  353. objCnn.commit();
  354. if( query.exec( "select LAST_INSERT_ID()" ) )
  355. {
  356. if( query.next() )
  357. {
  358. id = query.value(0).toInt();
  359. }
  360. }
  361. }
  362. else
  363. {
  364. objCnn.rollback();
  365. }
  366. return bExecute;
  367. }
  368. bool DBStorage::excSql(QString strSql)
  369. {
  370. QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
  371. if( objCnn.isOpen() == false )
  372. {
  373. open(dbtype, server,DBName, usr, passwd);
  374. }
  375. objCnn = QSqlDatabase::database(DBName + connectName());
  376. QSqlQuery query(objCnn);
  377. bool bRet = query.exec(strSql);
  378. if( bRet == false )
  379. {
  380. QSqlError errDBase = objCnn.lastError();
  381. QSqlError errQuery = query.lastError();
  382. qCritical()<< __FILE__<<__FUNCTION__<<__LINE__ << "query.sql = " << strSql << " QueryError : " << errQuery.text();
  383. }
  384. return bRet;
  385. }
  386. QSqlQuery DBStorage::query(QString strSql)
  387. {
  388. QSqlDatabase objCnn = QSqlDatabase::database(DBName + connectName());
  389. if( objCnn.isOpen() == false )
  390. {
  391. open(dbtype,server,DBName, usr, passwd);
  392. }
  393. objCnn = QSqlDatabase::database(DBName + connectName());
  394. QSqlQuery query(objCnn);
  395. query.setForwardOnly(true);
  396. query.exec(strSql) ;
  397. return query;
  398. }
  399. void DBStorage::setLoader(QLibrary *)
  400. {
  401. }
  402. void DBStorage::run()
  403. {
  404. /*while(!isInterruptionRequested()){
  405. qDebug() << __FILE__ << __FUNCTION__;
  406. QThread::msleep(1000);
  407. }*/
  408. }
  409. void DBStorage::pushData()
  410. {
  411. // dataMap[dataName.toStdString()] = val;
  412. }
  413. Client* instance()
  414. {
  415. return new DBStorage();
  416. }
  417. void destroy(Client* pInstance)
  418. {
  419. if( pInstance )
  420. {
  421. delete pInstance;
  422. }
  423. }