RedisClient.cpp 14 KB


  1. #include "RedisClient.h"
  2. #include <QThread>
  3. #include <QTimer>
  4. //void Test()
  5. //{
  6. // struct st
  7. // {
  8. // int a;
  9. // int b;
  10. // char c;
  11. // int d;
  12. // };
  13. // st t;
  14. // t.a = 1;
  15. // t.b = 2;
  16. // t.c = 'a';
  17. // t.d = 4;
  18. // const char *v[3];
  19. // size_t vlen[3];
  20. // v[0] = "set";
  21. // vlen[0] = strlen("set");
  22. // v[1] = "st";
  23. // vlen[1] = strlen("st");
  24. // v[2] = (const char *)&t;
  25. // vlen[2] = sizeof(struct st);
  26. // redisReply *r = (redisReply *)redisCommandArgv(rc, sizeof(v) / sizeof(v[0]), v, vlen);
  27. // r = (redisReply *)redisCommand(rc, "get st");
  28. // if (!r) {
  29. // printf("empty reply\n");
  30. // exit(1);
  31. // }
  32. // st *t2 = (st*)(r->str);
  33. // qDebug()<< "st2" << t2->a << t2->b << t2->c << t2->d;
  34. //}
  35. void getCallback(redisAsyncContext *ctx, void *r, void *privdata){
  36. qDebug() << "getCallback called";
  37. redisReply *reply = static_cast<redisReply*>(r);
  38. if(reply == nullptr){
  39. qDebug() << "The reply is nullptr";
  40. return;
  41. }
  42. for(int i = 0; i < reply->elements; i++){
  43. qDebug() << "key: " << reply->element[i]->str;
  44. }
  45. if (QString(reply->element[0]->str) == "message") //收到订阅消息
  46. {
  47. QString ch = QString(reply->element[1]->str);
  48. if (ch == "DevCh")
  49. {
  50. qDebug()<<"devch...";
  51. // qDebug()<<"img:" << ++cnt;
  52. // QString js = QString(reply->element[2]->str);
  53. // QJsonObject jsonObj = QJsonDocument::fromJson(js.toLocal8Bit()).object(); //转成本地编码(utf-8), 中文正常
  54. // //quint32 type = jsonObj.value("type").toInt(); //toInt() 会丢数据
  55. // //qDebug()<<"type:" << type;
  56. // //QString base64Str = QString(reply->element[2]->str);
  57. // QString base64Str = jsonObj["img"].toString();
  58. // //qDebug()<<"str size:" << base64Str.size();
  59. // QPixmap baseimage;
  60. // baseimage.loadFromData(QByteArray::fromBase64(base64Str.toLatin1()));
  61. // QPixmap scaledPixmap = baseimage.scaled(500, 500, Qt::KeepAspectRatio); //设置图片大小并设置为按比例缩放
  62. // lab->setPixmap(scaledPixmap); //label放置图片并显示
  63. }
  64. else if (ch == "Alarm")
  65. {
  66. qDebug()<<"alarm...";
  67. }
  68. else if (ch == "Led")
  69. {
  70. qDebug()<<"led...";
  71. }
  72. }
  73. redisAsyncCommand(ctx, getCallback, nullptr, "blpop lTest 0");
  74. }
  75. void connectCallback(const redisAsyncContext *c, int status) {
  76. if (status != REDIS_OK) {
  77. qDebug("Error1: %s\n", c->errstr);
  78. //redisOk = false;
  79. return;
  80. }
  81. qDebug("Connected...\n");
  82. }
  83. void disconnectCallback(const redisAsyncContext *c, int status) {
  84. if (status != REDIS_OK) {
  85. qDebug("Error2: %s\n", c->errstr);
  86. //redisOk = false;
  87. return;
  88. }
  89. qDebug("Disconnected...\n");
  90. }
  91. RedisClient::RedisClient(QObject *parent) : QObject(parent)
  92. {
  93. redisOk = false;
  94. // conn();
  95. }
  96. RedisClient::~RedisClient()
  97. {
  98. delete rc;
  99. delete rac;
  100. }
  101. void RedisClient::conn()
  102. {
  103. if (!redisOk) //未连接
  104. {
  105. //同步连接
  106. rc = redisConnect(ip.toUtf8(), port);
  107. if(rc == nullptr || rc->err)
  108. {
  109. qDebug()<< "redis err:" << rc->errstr;
  110. }
  111. else
  112. {
  113. QString authCmd = "auth " + auth; //认证
  114. redisReply * reply = (redisReply*)redisCommand(rc, authCmd.toUtf8());
  115. //qDebug()<< "auth:" << reply->str;
  116. if (QString::compare(reply->str, "OK"))
  117. {
  118. qDebug() << "redis connect fail.";
  119. }
  120. else
  121. {
  122. redisOk = true;
  123. qDebug() << "redis connect successfully.";
  124. }
  125. }
  126. }
  127. else
  128. {
  129. //qDebug()<< "~ ";
  130. }
  131. }
  132. void RedisClient::start()
  133. {
  134. conn();
  135. //开启一个定时器, 每秒检测一次是否断开, 断开后自动连接redis服务器
  136. QTimer *redisTimer = new QTimer();
  137. connect(redisTimer, SIGNAL(timeout()), this, SLOT(conn()));
  138. redisTimer->start(1000);
  139. }
  140. bool RedisClient::hset(QString m, QString k, QString v)
  141. {
  142. //如果 field 是哈希表中的一个新建域,并且值设置成功,reply->integer为1
  143. //如果哈希表中域 field 已经存在且旧值已被新值覆盖,reply->integer为0
  144. //无法区别是否成功
  145. bool ret = false;
  146. redisReply * reply = (redisReply*)redisCommand(rc, QString("hset " + m + " " + k + " " + v).toStdString().c_str());
  147. if (rc->err)
  148. {
  149. redisOk = false;
  150. qDebug() << rc->errstr;
  151. }
  152. else
  153. {
  154. //qDebug() <<"type:" << reply->type << reply->integer; //type = 3
  155. ret = true;
  156. }
  157. return ret;
  158. }
  159. QString RedisClient::hget(QString m, QString k)
  160. {
  161. QString ret;
  162. redisReply * reply = (redisReply*)redisCommand(rc, QString("hget " + m + " " + k).toStdString().c_str());
  163. if (rc->err)
  164. {
  165. redisOk = false;
  166. qDebug() << rc->errstr;
  167. }
  168. else
  169. {
  170. //type:4表示不存在,type:1表示返回的为字符串
  171. //qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:1
  172. if (reply->len)
  173. {
  174. ret = QString(reply->str);
  175. }
  176. }
  177. return ret;
  178. }
  179. //向队列尾(右)部加入字符串数据
  180. bool RedisClient::rpush(QString lData, QString js)
  181. {
  182. bool ret = false;
  183. redisReply * reply = (redisReply*)redisCommand(rc, QString("rpush " + lData + " " + js).toStdString().c_str());
  184. if (rc->err)
  185. {
  186. redisOk = false;
  187. qDebug() << rc->errstr;
  188. }
  189. else
  190. {
  191. //qDebug() <<"type:" << reply->type << reply->integer; //type = 3
  192. //reply->integer为队列的个数,理论应该大于0
  193. if (reply->integer)
  194. {
  195. ret = true;
  196. }
  197. }
  198. return ret;
  199. }
  200. //向队列尾(右)部加入二进制数据
  201. bool RedisClient::rpushb(QString lData, QByteArray ba)
  202. {
  203. bool ret = false;
  204. const char *arg[3]; //3个参数(cmd, k, v)
  205. size_t vlen[3]; //3个参数长度
  206. arg[0] = "rpush";
  207. vlen[0] = strlen("rpush");
  208. arg[1] = lData.toStdString().c_str();
  209. vlen[1] = lData.size();
  210. arg[2] = (const char *)ba.data();
  211. vlen[2] = ba.size();
  212. redisReply *reply = (redisReply *)redisCommandArgv(rc, sizeof(arg) / sizeof(arg[0]), arg, vlen);
  213. //redisReply * reply = (redisReply*)redisCommand(rc, QString("rpush " + lData + " " + js).toStdString().c_str());
  214. if (rc->err)
  215. {
  216. redisOk = false;
  217. qDebug() << rc->errstr;
  218. }
  219. else
  220. {
  221. //qDebug() <<"type:" << reply->type << reply->integer; //type = 3
  222. //reply->integer为队列的个数,理论应该大于0
  223. if (reply->integer)
  224. {
  225. ret = true;
  226. }
  227. }
  228. return ret;
  229. }
  230. QString RedisClient::lpop(QString lData)
  231. {
  232. QString ret;
  233. redisReply * reply = (redisReply*)redisCommand(rc, QString("lpop " + lData).toStdString().c_str());
  234. if (rc->err)
  235. {
  236. redisOk = false;
  237. qDebug() << rc->errstr;
  238. }
  239. else if (reply->len)
  240. {
  241. ret = QString(reply->str);
  242. }
  243. return ret;
  244. }
  245. //阻塞从队列头部获取最早数据
  246. bool RedisClient::blpop(QString lData, redisCallbackFn *fn)
  247. {
  248. if (rac == nullptr)
  249. {
  250. //异步连接
  251. rac = redisAsyncConnect(ip.toUtf8(), port);
  252. if(rac->err)
  253. {
  254. qDebug()<< "error: " << rac->errstr;
  255. redisAsyncFree(rac);
  256. }
  257. else
  258. {
  259. adapter.setContext(rac);
  260. redisAsyncSetConnectCallback(rac, connectCallback);
  261. redisAsyncSetDisconnectCallback(rac, disconnectCallback);
  262. QString authCmd = "auth " + auth; //认证
  263. redisAsyncCommand(rac, nullptr, nullptr, authCmd.toUtf8());
  264. qDebug()<< "redis ok.";
  265. redisOk = true;
  266. }
  267. }
  268. if (rac && redisOk)
  269. {
  270. redisAsyncCommand(rac, fn, nullptr, ("blpop " + lData + " 0").toStdString().c_str());
  271. return true;
  272. }
  273. else
  274. {
  275. return false;
  276. }
  277. }
  278. //同步阻塞一定时间返回数据
  279. QString RedisClient::blpop(QString lData, quint32 timeout)
  280. {
  281. //最小为1,防止0时永久阻塞
  282. if (timeout < 1)
  283. {
  284. timeout = 1;
  285. }
  286. QString ret;
  287. redisReply * reply = (redisReply*)redisCommand(rc, QString("blpop " + lData + " " + QString::number(timeout)).toStdString().c_str());
  288. // for(int i = 0; i < reply->elements; i++){
  289. // qDebug() << "key: " << reply->element[i]->str;
  290. // }
  291. if (rc->err)
  292. {
  293. redisOk = false;
  294. qDebug() << rc->errstr;
  295. }
  296. else if (reply->elements > 1)
  297. {
  298. ret = QString(reply->element[1]->str);
  299. }
  300. return ret;
  301. }
  302. bool RedisClient::set(QString k, QString v)
  303. {
  304. bool ret = false;
  305. redisReply * reply = (redisReply*)redisCommand(rc, QString("set " + k + " " + v).toStdString().c_str()); //支持空格
  306. if (rc->err)
  307. {
  308. redisOk = false;
  309. qDebug() << rc->errstr;
  310. }
  311. else
  312. {
  313. //qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:5 str:OK len:2
  314. if (reply->type == 6)
  315. {
  316. qDebug()<<"err:" << v;
  317. }
  318. //返回OK为成功,长度2为str长度
  319. ret = true;
  320. }
  321. return ret;
  322. }
  323. bool RedisClient::setb(QString k, QByteArray &v)
  324. {
  325. bool ret = false;
  326. const char *arg[3]; //3个参数(cmd, k, v)
  327. size_t vlen[3]; //3个参数长度
  328. arg[0] = "set";
  329. vlen[0] = strlen("set");
  330. arg[1] = k.toStdString().c_str();
  331. vlen[1] = k.size();
  332. arg[2] = (const char *)v.data();
  333. vlen[2] = v.size();
  334. redisReply *reply = (redisReply *)redisCommandArgv(rc, sizeof(arg) / sizeof(arg[0]), arg, vlen);
  335. if (rc->err)
  336. {
  337. redisOk = false;
  338. qDebug() << rc->errstr;
  339. }
  340. else
  341. {
  342. //qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:5 str:OK len:2
  343. if (reply->type == 6)
  344. {
  345. qDebug()<<"err:" << v;
  346. }
  347. //返回OK为成功,长度2为str长度
  348. ret = true;
  349. }
  350. return ret;
  351. }
  352. QString RedisClient::get(QString k)
  353. {
  354. QString ret;
  355. redisReply * reply = (redisReply*)redisCommand(rc, QString("get " + k).toStdString().c_str());
  356. if (rc->err)
  357. {
  358. redisOk = false;
  359. qDebug() << rc->errstr;
  360. }
  361. else if (reply->len)
  362. {
  363. ret = QString(reply->str);
  364. }
  365. return ret;
  366. }
  367. QByteArray RedisClient::getb(QString k)
  368. {
  369. QByteArray ret;
  370. redisReply * reply = (redisReply*)redisCommand(rc, QString("get " + k).toStdString().c_str());
  371. if (rc->err)
  372. {
  373. redisOk = false;
  374. qDebug() << rc->errstr;
  375. }
  376. else if (reply->len)
  377. {
  378. ret = QByteArray(reply->str, reply->len); //加上长度,保证二进制安全
  379. //qDebug()<< reply->len << ret.size();
  380. }
  381. return ret;
  382. }
  383. bool RedisClient::expire(QString k,int sec)
  384. {
  385. bool ret = false;
  386. redisReply * reply = (redisReply*)redisCommand(rc, QString("expire " + k + " " + QString::number(sec, 10)).toStdString().c_str());
  387. if (rc->err)
  388. {
  389. redisOk = false;
  390. qDebug() << rc->errstr;
  391. }
  392. else
  393. {
  394. //qDebug() <<"type:" << reply->type << reply->integer; //type = 3
  395. //K不存在为0,存在为1
  396. if (reply->integer)
  397. {
  398. ret = true;
  399. }
  400. }
  401. return ret;
  402. }
  403. bool RedisClient::publish(const QString& ch, const QString& js)
  404. {
  405. bool ret = false;
  406. QString pub = QString("publish " + ch + " " + js);
  407. redisReply * reply = (redisReply*)redisCommand(rc, pub.toStdString().c_str());
  408. qDebug() << __FUNCTION__ << pub;
  409. if (rc->err)
  410. {
  411. redisOk = false;
  412. qDebug() << rc->errstr;
  413. }
  414. else
  415. {
  416. //qDebug() <<"type:" << reply->type << reply->integer; //type:3, integer:2
  417. //integer为2,不懂什么意思,这个用于实时,无需判断是否成功,连接正常即可
  418. ret = true;
  419. }
  420. return ret;
  421. }
  422. bool RedisClient::publishb(const QString& ch,const QByteArray& ba)
  423. {
  424. bool ret = false;
  425. const char *arg[3]; //3个参数(cmd, k, v)
  426. size_t vlen[3]; //3个参数长度
  427. arg[0] = "publish";
  428. vlen[0] = strlen("publish");
  429. arg[1] = ch.toStdString().c_str();
  430. vlen[1] = ch.size();
  431. arg[2] = (const char *)ba.data();
  432. vlen[2] = ba.size();
  433. redisReply *reply = (redisReply *)redisCommandArgv(rc, sizeof(arg) / sizeof(arg[0]), arg, vlen);
  434. //redisReply * reply = (redisReply*)redisCommand(rc, QString("publish " + ch + " " + js).toStdString().c_str());
  435. if (rc->err)
  436. {
  437. redisOk = false;
  438. qDebug() << rc->errstr;
  439. }
  440. else
  441. {
  442. qDebug() <<"type:" << reply->type << reply->integer; //type:3, integer:2
  443. //integer为2,不懂什么意思,这个用于实时,无需判断是否成功,连接正常即可
  444. ret = true;
  445. }
  446. return ret;
  447. }
  448. void RedisClient::subscribe(QString ch, redisCallbackFn *fn, void* data)
  449. {
  450. if (rac == nullptr)
  451. {
  452. //异步连接
  453. rac = redisAsyncConnect(ip.toUtf8(), port);
  454. if(rac->err)
  455. {
  456. qDebug()<< "error: " << rac->errstr;
  457. redisAsyncFree(rac);
  458. }
  459. else
  460. {
  461. adapter.setContext(rac);
  462. redisAsyncSetConnectCallback(rac, connectCallback);
  463. redisAsyncSetDisconnectCallback(rac, disconnectCallback);
  464. QString authCmd = "auth " + auth; //认证
  465. redisAsyncCommand(rac, nullptr, data, authCmd.toUtf8());
  466. qDebug()<< "redis ok.";
  467. redisOk = true;
  468. }
  469. }
  470. if (rac && redisOk)
  471. {
  472. redisAsyncCommand(rac, fn, data, QString("subscribe " + ch).toStdString().c_str());
  473. }
  474. }
  475. void RedisClient::psubscribe(QString ch, redisCallbackFn *fn, void* data)
  476. {
  477. if (rac == nullptr)
  478. {
  479. //异步连接
  480. rac = redisAsyncConnect(ip.toUtf8(), port);
  481. if(rac->err)
  482. {
  483. qDebug()<< "error: " << rac->errstr;
  484. redisAsyncFree(rac);
  485. }
  486. else
  487. {
  488. adapter.setContext(rac);
  489. redisAsyncSetConnectCallback(rac, connectCallback);
  490. redisAsyncSetDisconnectCallback(rac, disconnectCallback);
  491. QString authCmd = "auth " + auth; //认证
  492. redisAsyncCommand(rac, nullptr, data, authCmd.toUtf8());
  493. qDebug()<< "redis ok.";
  494. redisOk = true;
  495. }
  496. }
  497. if (rac && redisOk)
  498. {
  499. redisAsyncCommand(rac, fn, data, QString("psubscribe " + ch).toStdString().c_str());
  500. }
  501. }
  502. void RedisClient::Setup(std::string addr, uint _port, std::string password)
  503. {
  504. ip = addr.c_str();
  505. port = _port;
  506. auth = password.c_str();
  507. redisOk = false;
  508. }