Redis.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655
  1. #include "redis.h"
  2. #include <QThread>
  3. #include <QTimer>
  4. // void Test()
  5. //{
  6. // struct st
  7. // {
  8. // int a;
  9. // int b;
  10. // char c;
  11. // int d;
  12. // };
  13. // st t;
  14. // t.a = 1;
  15. // t.b = 2;
  16. // t.c = 'a';
  17. // t.d = 4;
  18. // const char *v[3];
  19. // size_t vlen[3];
  20. // v[0] = "set";
  21. // vlen[0] = strlen("set");
  22. // v[1] = "st";
  23. // vlen[1] = strlen("st");
  24. // v[2] = (const char *)&t;
  25. // vlen[2] = sizeof(struct st);
  26. // redisReply *r = (redisReply *)redisCommandArgv(rc, sizeof(v) / sizeof(v[0]), v, vlen);
  27. // r = (redisReply *)redisCommand(rc, "get st");
  28. // if (!r) {
  29. // printf("empty reply\n");
  30. // exit(1);
  31. // }
  32. // st *t2 = (st*)(r->str);
  33. // qDebug()<< "st2" << t2->a << t2->b << t2->c << t2->d;
  34. //}
  35. void getCallback(redisAsyncContext *ctx, void *r, void *privdata)
  36. {
  37. qDebug() << "getCallback called";
  38. redisReply *reply = static_cast<redisReply *>(r);
  39. if (reply == nullptr)
  40. {
  41. qDebug() << "The reply is nullptr";
  42. return;
  43. }
  44. for (int i = 0; i < reply->elements; i++)
  45. {
  46. qDebug() << "key: " << reply->element[i]->str;
  47. }
  48. if (QString(reply->element[0]->str) == "message") // 收到订阅消息
  49. {
  50. QString ch = QString(reply->element[1]->str);
  51. if (ch == "DevCh")
  52. {
  53. qDebug() << "devch...";
  54. // qDebug()<<"img:" << ++cnt;
  55. // QString js = QString(reply->element[2]->str);
  56. // QJsonObject jsonObj = QJsonDocument::fromJson(js.toLocal8Bit()).object(); //转成本地编码(utf-8), 中文正常
  57. // //quint32 type = jsonObj.value("type").toInt(); //toInt() 会丢数据
  58. // //qDebug()<<"type:" << type;
  59. // //QString base64Str = QString(reply->element[2]->str);
  60. // QString base64Str = jsonObj["img"].toString();
  61. // //qDebug()<<"str size:" << base64Str.size();
  62. // QPixmap baseimage;
  63. // baseimage.loadFromData(QByteArray::fromBase64(base64Str.toLatin1()));
  64. // QPixmap scaledPixmap = baseimage.scaled(500, 500, Qt::KeepAspectRatio); //设置图片大小并设置为按比例缩放
  65. // lab->setPixmap(scaledPixmap); //label放置图片并显示
  66. }
  67. else if (ch == "Alarm")
  68. {
  69. qDebug() << "alarm...";
  70. }
  71. else if (ch == "Led")
  72. {
  73. qDebug() << "led...";
  74. }
  75. }
  76. redisAsyncCommand(ctx, getCallback, nullptr, "blpop lTest 0");
  77. }
  78. void connectCallback(const redisAsyncContext *c, int status)
  79. {
  80. if (status != REDIS_OK)
  81. {
  82. qDebug("Error1: %s\n", c->errstr);
  83. // redisOk = false;
  84. return;
  85. }
  86. qDebug("Connected...\n");
  87. }
  88. void disconnectCallback(const redisAsyncContext *c, int status)
  89. {
  90. if (status != REDIS_OK)
  91. {
  92. qDebug("Error2: %s\n", c->errstr);
  93. // redisOk = false;
  94. return;
  95. }
  96. qDebug("Disconnected...\n");
  97. }
  98. EventSubInterface* g_pSubCB; // 订阅回调
  99. void RedisCB(redisAsyncContext *ctx, void *r, void *privdata)
  100. {
  101. redisReply *reply = static_cast<redisReply*>(r);
  102. if( reply->elements < 3)
  103. {
  104. return;
  105. }
  106. std::string type = reply->element[0]->str;
  107. if(strcasecmp(type.c_str(), "message") != 0)
  108. {
  109. return;
  110. }
  111. std::string chanel = reply->element[1]->str;
  112. std::string msg = reply->element[2]->str;
  113. if( g_pSubCB != nullptr )
  114. {
  115. g_pSubCB->SubCB((char*)chanel.c_str(), (char*)msg.c_str());
  116. }
  117. }
  118. Redis::Redis(QObject *parent) : QObject(parent)
  119. {
  120. redisOk = false;
  121. g_pSubCB = nullptr;
  122. // conn();
  123. }
  124. Redis::~Redis()
  125. {
  126. delete rc;
  127. delete rac;
  128. }
  129. void Redis::conn()
  130. {
  131. if (!redisOk) // 未连接
  132. {
  133. // 同步连接
  134. rc = redisConnect(ip.toUtf8(), port);
  135. if (rc == nullptr || rc->err)
  136. {
  137. qDebug() << "redis err:" << rc->errstr;
  138. }
  139. else
  140. {
  141. QString authCmd = "auth " + auth; // 认证
  142. redisReply *reply = (redisReply *)redisCommand(rc, authCmd.toUtf8());
  143. // qDebug()<< "auth:" << reply->str;
  144. if (QString::compare(reply->str, "OK"))
  145. {
  146. qDebug() << "redis connect fail.";
  147. }
  148. else
  149. {
  150. redisOk = true;
  151. qDebug() << "redis connect successfully.";
  152. }
  153. }
  154. }
  155. else
  156. {
  157. // qDebug()<< "~ ";
  158. }
  159. }
  160. void Redis::start()
  161. {
  162. conn();
  163. //开启一个定时器, 每秒检测一次是否断开, 断开后自动连接redis服务器
  164. QTimer *redisTimer = new QTimer();
  165. connect(redisTimer, SIGNAL(timeout()), this, SLOT(conn()));
  166. redisTimer->start(1000);
  167. }
  168. bool Redis::hset(QString k, QString f, QString v)
  169. {
  170. // 如果 field 是哈希表中的一个新建域,并且值设置成功,reply->integer为1
  171. // 如果哈希表中域 field 已经存在且旧值已被新值覆盖,reply->integer为0
  172. // 无法区别是否成功
  173. bool ret = false;
  174. redisReply *reply = (redisReply *)redisCommand(rc, "hset %s %s %s", k.toStdString().c_str(), f.toStdString().c_str(), v.toStdString().c_str()); // 支持空格
  175. if (rc->err)
  176. {
  177. redisOk = false;
  178. qDebug() << rc->errstr;
  179. }
  180. else
  181. {
  182. // qDebug() <<"type:" << reply->type << reply->integer; //type = 3
  183. ret = true;
  184. }
  185. return ret;
  186. }
  187. QString Redis::hget(QString k, QString f)
  188. {
  189. QString ret;
  190. redisReply *reply = (redisReply *)redisCommand(rc, QString("hget " + k + " " + f).toStdString().c_str());
  191. if (rc->err)
  192. {
  193. redisOk = false;
  194. qDebug() << rc->errstr;
  195. }
  196. else
  197. {
  198. // type:4表示不存在,type:1表示返回的为字符串
  199. // qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:1
  200. if (reply->len)
  201. {
  202. ret = QString(reply->str);
  203. }
  204. }
  205. return ret;
  206. }
  207. QStringList Redis::hkeys(QString k)
  208. {
  209. QStringList ret;
  210. redisReply *reply = (redisReply *)redisCommand(rc, QString("hkeys " + k).toStdString().c_str());
  211. if (rc->err)
  212. {
  213. redisOk = false;
  214. qDebug() << rc->errstr;
  215. }
  216. else if (reply->type == REDIS_REPLY_ARRAY)
  217. {
  218. // type:4表示不存在,type:1表示返回的为字符串
  219. // qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:1
  220. for (size_t i = 0; i < reply->elements; i++)
  221. {
  222. // qDebug() << "key: " << i << reply->element[i]->str;
  223. ret.append(QString(reply->element[i]->str));
  224. }
  225. }
  226. return ret;
  227. }
  228. QStringList Redis::hvals(QString k)
  229. {
  230. QStringList ret;
  231. redisReply *reply = (redisReply *)redisCommand(rc, QString("hvals " + k).toStdString().c_str());
  232. if (rc->err)
  233. {
  234. redisOk = false;
  235. qDebug() << rc->errstr;
  236. }
  237. else if (reply->type == REDIS_REPLY_ARRAY)
  238. {
  239. // type:4表示不存在,type:1表示返回的为字符串
  240. // qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:1
  241. for (size_t i = 0; i < reply->elements; i++)
  242. {
  243. // qDebug() << "key: " << i << reply->element[i]->str;
  244. ret.append(QString(reply->element[i]->str));
  245. }
  246. }
  247. return ret;
  248. }
  249. QHash<QString, QString> Redis::hgetall(QString k)
  250. {
  251. QHash<QString, QString> ret;
  252. redisReply *reply = (redisReply *)redisCommand(rc, QString("hgetall " + k).toStdString().c_str());
  253. if (rc->err)
  254. {
  255. redisOk = false;
  256. qDebug() << rc->errstr;
  257. }
  258. else if (reply->type == REDIS_REPLY_ARRAY)
  259. {
  260. // type:4表示不存在,type:1表示返回的为字符串
  261. // qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:1
  262. for (size_t i = 0; i < reply->elements; i += 2)
  263. {
  264. // qDebug() << "key: " << i << reply->element[i]->str;
  265. ret.insert(QString(reply->element[i]->str), QString(reply->element[i + 1]->str));
  266. }
  267. }
  268. return ret;
  269. }
  270. // 向队列尾(右)部加入字符串数据
  271. bool Redis::rpush(QString lData, QString s)
  272. {
  273. bool ret = false;
  274. // redisReply * reply = (redisReply*)redisCommand(rc, QString("rpush " + lData + " " + js).toStdString().c_str());
  275. redisReply *reply = (redisReply *)redisCommand(rc, "rpush %s %s", lData.toStdString().c_str(), s.toStdString().c_str()); // 支持空格
  276. if (rc->err)
  277. {
  278. redisOk = false;
  279. qDebug() << rc->errstr;
  280. }
  281. else
  282. {
  283. // qDebug() <<"type:" << reply->type << reply->integer; //type = 3
  284. // reply->integer为队列的个数,理论应该大于0
  285. if (reply->integer)
  286. {
  287. ret = true;
  288. }
  289. }
  290. return ret;
  291. }
  292. // 向队列尾(右)部加入二进制数据
  293. bool Redis::rpushb(QString lData, QByteArray ba)
  294. {
  295. bool ret = false;
  296. const char *arg[3]; // 3个参数(cmd, k, v)
  297. size_t vlen[3]; // 3个参数长度
  298. arg[0] = "rpush";
  299. vlen[0] = strlen("rpush");
  300. arg[1] = lData.toStdString().c_str();
  301. vlen[1] = lData.size();
  302. arg[2] = (const char *)ba.data();
  303. vlen[2] = ba.size();
  304. redisReply *reply = (redisReply *)redisCommandArgv(rc, sizeof(arg) / sizeof(arg[0]), arg, vlen);
  305. // redisReply * reply = (redisReply*)redisCommand(rc, QString("rpush " + lData + " " + js).toStdString().c_str());
  306. if (rc->err)
  307. {
  308. redisOk = false;
  309. qDebug() << rc->errstr;
  310. }
  311. else
  312. {
  313. // qDebug() <<"type:" << reply->type << reply->integer; //type = 3
  314. // reply->integer为队列的个数,理论应该大于0
  315. if (reply->integer)
  316. {
  317. ret = true;
  318. }
  319. }
  320. return ret;
  321. }
  322. QString Redis::lpop(QString lData)
  323. {
  324. QString ret;
  325. redisReply *reply = (redisReply *)redisCommand(rc, QString("lpop " + lData).toStdString().c_str());
  326. if (rc->err)
  327. {
  328. redisOk = false;
  329. qDebug() << rc->errstr;
  330. }
  331. else if (reply->len)
  332. {
  333. ret = QString(reply->str);
  334. }
  335. return ret;
  336. }
  337. // 阻塞从队列头部获取最早数据
  338. bool Redis::blpop(QString lData, redisCallbackFn *fn)
  339. {
  340. if (rac == nullptr)
  341. {
  342. // 异步连接
  343. rac = redisAsyncConnect(ip.toUtf8(), port);
  344. if (rac->err)
  345. {
  346. qDebug() << "error: " << rac->errstr;
  347. redisAsyncFree(rac);
  348. }
  349. else
  350. {
  351. adapter.setContext(rac);
  352. redisAsyncSetConnectCallback(rac, connectCallback);
  353. redisAsyncSetDisconnectCallback(rac, disconnectCallback);
  354. QString authCmd = "auth " + auth; // 认证
  355. redisAsyncCommand(rac, nullptr, nullptr, authCmd.toUtf8());
  356. qDebug() << "redis ok.";
  357. redisOk = true;
  358. }
  359. }
  360. if (rac && redisOk)
  361. {
  362. redisAsyncCommand(rac, fn, nullptr, ("blpop " + lData + " 0").toStdString().c_str());
  363. return true;
  364. }
  365. else
  366. {
  367. return false;
  368. }
  369. }
  370. // 同步阻塞一定时间返回数据
  371. QString Redis::blpop(QString lData, quint32 timeout)
  372. {
  373. // 最小为1,防止0时永久阻塞
  374. if (timeout < 1)
  375. {
  376. timeout = 1;
  377. }
  378. QString ret;
  379. redisReply *reply = (redisReply *)redisCommand(rc, QString("blpop " + lData + " " + QString::number(timeout)).toStdString().c_str());
  380. // for(int i = 0; i < reply->elements; i++){
  381. // qDebug() << "key: " << reply->element[i]->str;
  382. // }
  383. if (rc->err)
  384. {
  385. redisOk = false;
  386. qDebug() << rc->errstr;
  387. }
  388. else if (reply->elements > 1)
  389. {
  390. ret = QString(reply->element[1]->str);
  391. }
  392. return ret;
  393. }
  394. bool Redis::set(QString k, QString v)
  395. {
  396. bool ret = false;
  397. // redisReply * reply = (redisReply*)redisCommand(rc, QString("set " + k + " " + v).toStdString().c_str()); //不支持空格
  398. redisReply *reply = (redisReply *)redisCommand(rc, "set %s %s", k.toStdString().c_str(), v.toStdString().c_str()); // 支持空格
  399. if (rc->err)
  400. {
  401. redisOk = false;
  402. qDebug() << rc->errstr;
  403. }
  404. else
  405. {
  406. // qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:5 str:OK len:2
  407. if (reply->type == 6)
  408. {
  409. qDebug() << "err:" << v;
  410. }
  411. // 返回OK为成功,长度2为str长度
  412. ret = true;
  413. }
  414. return ret;
  415. }
  416. bool Redis::setb(QString k, QByteArray &v)
  417. {
  418. bool ret = false;
  419. const char *arg[3]; // 3个参数(cmd, k, v)
  420. size_t vlen[3]; // 3个参数长度
  421. arg[0] = "set";
  422. vlen[0] = strlen("set");
  423. arg[1] = k.toStdString().c_str();
  424. vlen[1] = k.size();
  425. arg[2] = (const char *)v.data();
  426. vlen[2] = v.size();
  427. redisReply *reply = (redisReply *)redisCommandArgv(rc, sizeof(arg) / sizeof(arg[0]), arg, vlen);
  428. if (rc->err)
  429. {
  430. redisOk = false;
  431. qDebug() << rc->errstr;
  432. }
  433. else
  434. {
  435. // qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:5 str:OK len:2
  436. if (reply->type == 6)
  437. {
  438. qDebug() << "err:" << v;
  439. }
  440. // 返回OK为成功,长度2为str长度
  441. ret = true;
  442. }
  443. return ret;
  444. }
  445. QString Redis::get(QString k)
  446. {
  447. QString ret;
  448. redisReply *reply = (redisReply *)redisCommand(rc, QString("get " + k).toStdString().c_str());
  449. if (rc->err)
  450. {
  451. redisOk = false;
  452. qDebug() << rc->errstr;
  453. }
  454. else if (reply->len)
  455. {
  456. ret = QString(reply->str);
  457. }
  458. return ret;
  459. }
  460. QByteArray Redis::getb(QString k)
  461. {
  462. QByteArray ret;
  463. redisReply *reply = (redisReply *)redisCommand(rc, QString("get " + k).toStdString().c_str());
  464. if (rc->err)
  465. {
  466. redisOk = false;
  467. qDebug() << rc->errstr;
  468. }
  469. else if (reply->len)
  470. {
  471. ret = QByteArray(reply->str, reply->len); // 加上长度,保证二进制安全
  472. // qDebug()<< reply->len << ret.size();
  473. }
  474. return ret;
  475. }
  476. bool Redis::expire(QString k, int sec)
  477. {
  478. bool ret = false;
  479. redisReply *reply = (redisReply *)redisCommand(rc, QString("expire " + k + " " + QString::number(sec, 10)).toStdString().c_str());
  480. if (rc->err)
  481. {
  482. redisOk = false;
  483. qDebug() << rc->errstr;
  484. }
  485. else
  486. {
  487. // qDebug() <<"type:" << reply->type << reply->integer; //type = 3
  488. // K不存在为0,存在为1
  489. if (reply->integer)
  490. {
  491. ret = true;
  492. }
  493. }
  494. return ret;
  495. }
  496. bool Redis::publish(QString ch, QString s)
  497. {
  498. bool ret = false;
  499. // redisReply * reply = (redisReply*)redisCommand(rc, QString("publish " + ch + " " + js).toStdString().c_str());
  500. redisReply *reply = (redisReply *)redisCommand(rc, "publish %s %s", ch.toStdString().c_str(), s.toStdString().c_str()); // 支持空格
  501. if (rc->err)
  502. {
  503. redisOk = false;
  504. qDebug() << rc->errstr;
  505. }
  506. else
  507. {
  508. // qDebug() <<"type:" << reply->type << reply->integer; //type:3, integer:2
  509. // integer为2,不懂什么意思,这个用于实时,无需判断是否成功,连接正常即可
  510. ret = true;
  511. }
  512. return ret;
  513. }
  514. bool Redis::publishb(QString ch, QByteArray ba)
  515. {
  516. bool ret = false;
  517. const char *arg[3]; // 3个参数(cmd, k, v)
  518. size_t vlen[3]; // 3个参数长度
  519. arg[0] = "publish";
  520. vlen[0] = strlen("publish");
  521. arg[1] = ch.toStdString().c_str();
  522. vlen[1] = ch.size();
  523. arg[2] = (const char *)ba.data();
  524. vlen[2] = ba.size();
  525. redisReply *reply = (redisReply *)redisCommandArgv(rc, sizeof(arg) / sizeof(arg[0]), arg, vlen);
  526. // redisReply * reply = (redisReply*)redisCommand(rc, QString("publish " + ch + " " + js).toStdString().c_str());
  527. if (rc->err)
  528. {
  529. redisOk = false;
  530. qDebug() << rc->errstr;
  531. }
  532. else
  533. {
  534. qDebug() << "type:" << reply->type << reply->integer; // type:3, integer:2
  535. // integer为2,不懂什么意思,这个用于实时,无需判断是否成功,连接正常即可
  536. ret = true;
  537. }
  538. return ret;
  539. }
  540. void Redis::subscribe(QString ch, EventSubInterface *fn)
  541. {
  542. g_pSubCB = fn;
  543. if (rac == nullptr)
  544. {
  545. // 异步连接
  546. rac = redisAsyncConnect(ip.toUtf8(), port);
  547. if (rac->err)
  548. {
  549. qDebug() << "error: " << rac->errstr;
  550. redisAsyncFree(rac);
  551. }
  552. else
  553. {
  554. adapter.setContext(rac);
  555. redisAsyncSetConnectCallback(rac, connectCallback);
  556. redisAsyncSetDisconnectCallback(rac, disconnectCallback);
  557. QString authCmd = "auth " + auth; // 认证
  558. redisAsyncCommand(rac, nullptr, nullptr, authCmd.toUtf8());
  559. qDebug() << "redis ok.";
  560. redisOk = true;
  561. }
  562. }
  563. if (rac && redisOk)
  564. {
  565. redisAsyncCommand(rac, RedisCB, nullptr, QString("subscribe " + ch).toStdString().c_str());
  566. }
  567. }
  568. void Redis::psubscribe(QString ch, EventSubInterface *fn)
  569. {
  570. g_pSubCB = fn;
  571. if (rac == nullptr)
  572. {
  573. // 异步连接
  574. rac = redisAsyncConnect(ip.toUtf8(), port);
  575. if (rac->err)
  576. {
  577. qDebug() << "error: " << rac->errstr;
  578. redisAsyncFree(rac);
  579. }
  580. else
  581. {
  582. adapter.setContext(rac);
  583. redisAsyncSetConnectCallback(rac, connectCallback);
  584. redisAsyncSetDisconnectCallback(rac, disconnectCallback);
  585. QString authCmd = "auth " + auth; // 认证
  586. redisAsyncCommand(rac, nullptr, nullptr, authCmd.toUtf8());
  587. qDebug() << "redis ok.";
  588. redisOk = true;
  589. }
  590. }
  591. if (rac && redisOk)
  592. {
  593. redisAsyncCommand(rac, RedisCB, nullptr, QString("psubscribe " + ch).toStdString().c_str());
  594. }
  595. }
  596. void Redis::Setup(tagSetup ts)
  597. {
  598. ip = ts.addr.c_str();
  599. port = ts.port;
  600. auth = ts.password.c_str();
  601. redisOk = false;
  602. }