#include "RedisClient.h" #include #include //void Test() //{ // struct st // { // int a; // int b; // char c; // int d; // }; // st t; // t.a = 1; // t.b = 2; // t.c = 'a'; // t.d = 4; // const char *v[3]; // size_t vlen[3]; // v[0] = "set"; // vlen[0] = strlen("set"); // v[1] = "st"; // vlen[1] = strlen("st"); // v[2] = (const char *)&t; // vlen[2] = sizeof(struct st); // redisReply *r = (redisReply *)redisCommandArgv(rc, sizeof(v) / sizeof(v[0]), v, vlen); // r = (redisReply *)redisCommand(rc, "get st"); // if (!r) { // printf("empty reply\n"); // exit(1); // } // st *t2 = (st*)(r->str); // qDebug()<< "st2" << t2->a << t2->b << t2->c << t2->d; //} void getCallback(redisAsyncContext *ctx, void *r, void *privdata){ qDebug() << "getCallback called"; redisReply *reply = static_cast(r); if(reply == nullptr){ qDebug() << "The reply is nullptr"; return; } for(int i = 0; i < reply->elements; i++){ qDebug() << "key: " << reply->element[i]->str; } if (QString(reply->element[0]->str) == "message") //收到订阅消息 { QString ch = QString(reply->element[1]->str); if (ch == "DevCh") { qDebug()<<"devch..."; // qDebug()<<"img:" << ++cnt; // QString js = QString(reply->element[2]->str); // QJsonObject jsonObj = QJsonDocument::fromJson(js.toLocal8Bit()).object(); //转成本地编码(utf-8), 中文正常 // //quint32 type = jsonObj.value("type").toInt(); //toInt() 会丢数据 // //qDebug()<<"type:" << type; // //QString base64Str = QString(reply->element[2]->str); // QString base64Str = jsonObj["img"].toString(); // //qDebug()<<"str size:" << base64Str.size(); // QPixmap baseimage; // baseimage.loadFromData(QByteArray::fromBase64(base64Str.toLatin1())); // QPixmap scaledPixmap = baseimage.scaled(500, 500, Qt::KeepAspectRatio); //设置图片大小并设置为按比例缩放 // lab->setPixmap(scaledPixmap); //label放置图片并显示 } else if (ch == "Alarm") { qDebug()<<"alarm..."; } else if (ch == "Led") { qDebug()<<"led..."; } } redisAsyncCommand(ctx, getCallback, nullptr, "blpop lTest 0"); } void connectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { qDebug("Error1: %s\n", c->errstr); //redisOk = false; return; } qDebug("Connected...\n"); } void disconnectCallback(const redisAsyncContext *c, int status) { if (status != REDIS_OK) { qDebug("Error2: %s\n", c->errstr); //redisOk = false; return; } qDebug("Disconnected...\n"); } RedisClient::RedisClient(QObject *parent) : QObject(parent) { redisOk = false; // conn(); } RedisClient::~RedisClient() { delete rc; delete rac; } void RedisClient::conn() { if (!redisOk) //未连接 { //同步连接 rc = redisConnect(ip.toUtf8(), port); if(rc == nullptr || rc->err) { qDebug()<< "redis err:" << rc->errstr; } else { QString authCmd = "auth " + auth; //认证 redisReply * reply = (redisReply*)redisCommand(rc, authCmd.toUtf8()); //qDebug()<< "auth:" << reply->str; if (QString::compare(reply->str, "OK")) { qDebug() << "redis connect fail."; } else { redisOk = true; qDebug() << "redis connect successfully."; } } } else { //qDebug()<< "~ "; } } void RedisClient::start() { conn(); //开启一个定时器, 每秒检测一次是否断开, 断开后自动连接redis服务器 QTimer *redisTimer = new QTimer(); connect(redisTimer, SIGNAL(timeout()), this, SLOT(conn())); redisTimer->start(1000); } bool RedisClient::hset(QString m, QString k, QString v) { //如果 field 是哈希表中的一个新建域,并且值设置成功,reply->integer为1 //如果哈希表中域 field 已经存在且旧值已被新值覆盖,reply->integer为0 //无法区别是否成功 bool ret = false; redisReply * reply = (redisReply*)redisCommand(rc, QString("hset " + m + " " + k + " " + v).toStdString().c_str()); if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else { //qDebug() <<"type:" << reply->type << reply->integer; //type = 3 ret = true; } return ret; } QString RedisClient::hget(QString m, QString k) { QString ret; redisReply * reply = (redisReply*)redisCommand(rc, QString("hget " + m + " " + k).toStdString().c_str()); if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else { //type:4表示不存在,type:1表示返回的为字符串 //qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:1 if (reply->len) { ret = QString(reply->str); } } return ret; } //向队列尾(右)部加入字符串数据 bool RedisClient::rpush(QString lData, QString js) { bool ret = false; redisReply * reply = (redisReply*)redisCommand(rc, QString("rpush " + lData + " " + js).toStdString().c_str()); if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else { //qDebug() <<"type:" << reply->type << reply->integer; //type = 3 //reply->integer为队列的个数,理论应该大于0 if (reply->integer) { ret = true; } } return ret; } //向队列尾(右)部加入二进制数据 bool RedisClient::rpushb(QString lData, QByteArray ba) { bool ret = false; const char *arg[3]; //3个参数(cmd, k, v) size_t vlen[3]; //3个参数长度 arg[0] = "rpush"; vlen[0] = strlen("rpush"); arg[1] = lData.toStdString().c_str(); vlen[1] = lData.size(); arg[2] = (const char *)ba.data(); vlen[2] = ba.size(); redisReply *reply = (redisReply *)redisCommandArgv(rc, sizeof(arg) / sizeof(arg[0]), arg, vlen); //redisReply * reply = (redisReply*)redisCommand(rc, QString("rpush " + lData + " " + js).toStdString().c_str()); if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else { //qDebug() <<"type:" << reply->type << reply->integer; //type = 3 //reply->integer为队列的个数,理论应该大于0 if (reply->integer) { ret = true; } } return ret; } QString RedisClient::lpop(QString lData) { QString ret; redisReply * reply = (redisReply*)redisCommand(rc, QString("lpop " + lData).toStdString().c_str()); if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else if (reply->len) { ret = QString(reply->str); } return ret; } //阻塞从队列头部获取最早数据 bool RedisClient::blpop(QString lData, redisCallbackFn *fn) { if (rac == nullptr) { //异步连接 rac = redisAsyncConnect(ip.toUtf8(), port); if(rac->err) { qDebug()<< "error: " << rac->errstr; redisAsyncFree(rac); } else { adapter.setContext(rac); redisAsyncSetConnectCallback(rac, connectCallback); redisAsyncSetDisconnectCallback(rac, disconnectCallback); QString authCmd = "auth " + auth; //认证 redisAsyncCommand(rac, nullptr, nullptr, authCmd.toUtf8()); qDebug()<< "redis ok."; redisOk = true; } } if (rac && redisOk) { redisAsyncCommand(rac, fn, nullptr, ("blpop " + lData + " 0").toStdString().c_str()); return true; } else { return false; } } //同步阻塞一定时间返回数据 QString RedisClient::blpop(QString lData, quint32 timeout) { //最小为1,防止0时永久阻塞 if (timeout < 1) { timeout = 1; } QString ret; redisReply * reply = (redisReply*)redisCommand(rc, QString("blpop " + lData + " " + QString::number(timeout)).toStdString().c_str()); // for(int i = 0; i < reply->elements; i++){ // qDebug() << "key: " << reply->element[i]->str; // } if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else if (reply->elements > 1) { ret = QString(reply->element[1]->str); } return ret; } bool RedisClient::set(QString k, QString v) { bool ret = false; redisReply * reply = (redisReply*)redisCommand(rc, QString("set " + k + " " + v).toStdString().c_str()); //支持空格 if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else { //qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:5 str:OK len:2 if (reply->type == 6) { qDebug()<<"err:" << v; } //返回OK为成功,长度2为str长度 ret = true; } return ret; } bool RedisClient::setb(QString k, QByteArray &v) { bool ret = false; const char *arg[3]; //3个参数(cmd, k, v) size_t vlen[3]; //3个参数长度 arg[0] = "set"; vlen[0] = strlen("set"); arg[1] = k.toStdString().c_str(); vlen[1] = k.size(); arg[2] = (const char *)v.data(); vlen[2] = v.size(); redisReply *reply = (redisReply *)redisCommandArgv(rc, sizeof(arg) / sizeof(arg[0]), arg, vlen); if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else { //qDebug() <<"type:" << reply->type << reply->str << reply->len; //type:5 str:OK len:2 if (reply->type == 6) { qDebug()<<"err:" << v; } //返回OK为成功,长度2为str长度 ret = true; } return ret; } QString RedisClient::get(QString k) { QString ret; redisReply * reply = (redisReply*)redisCommand(rc, QString("get " + k).toStdString().c_str()); if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else if (reply->len) { ret = QString(reply->str); } return ret; } QByteArray RedisClient::getb(QString k) { QByteArray ret; redisReply * reply = (redisReply*)redisCommand(rc, QString("get " + k).toStdString().c_str()); if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else if (reply->len) { ret = QByteArray(reply->str, reply->len); //加上长度,保证二进制安全 //qDebug()<< reply->len << ret.size(); } return ret; } bool RedisClient::expire(QString k,int sec) { bool ret = false; redisReply * reply = (redisReply*)redisCommand(rc, QString("expire " + k + " " + QString::number(sec, 10)).toStdString().c_str()); if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else { //qDebug() <<"type:" << reply->type << reply->integer; //type = 3 //K不存在为0,存在为1 if (reply->integer) { ret = true; } } return ret; } bool RedisClient::publish(const QString& ch, const QString& js) { bool ret = false; QString pub = QString("publish " + ch + " " + js); redisReply * reply = (redisReply*)redisCommand(rc, pub.toStdString().c_str()); qDebug() << __FUNCTION__ << pub; if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else { //qDebug() <<"type:" << reply->type << reply->integer; //type:3, integer:2 //integer为2,不懂什么意思,这个用于实时,无需判断是否成功,连接正常即可 ret = true; } return ret; } bool RedisClient::publishb(const QString& ch,const QByteArray& ba) { bool ret = false; const char *arg[3]; //3个参数(cmd, k, v) size_t vlen[3]; //3个参数长度 arg[0] = "publish"; vlen[0] = strlen("publish"); arg[1] = ch.toStdString().c_str(); vlen[1] = ch.size(); arg[2] = (const char *)ba.data(); vlen[2] = ba.size(); redisReply *reply = (redisReply *)redisCommandArgv(rc, sizeof(arg) / sizeof(arg[0]), arg, vlen); //redisReply * reply = (redisReply*)redisCommand(rc, QString("publish " + ch + " " + js).toStdString().c_str()); if (rc->err) { redisOk = false; qDebug() << rc->errstr; } else { qDebug() <<"type:" << reply->type << reply->integer; //type:3, integer:2 //integer为2,不懂什么意思,这个用于实时,无需判断是否成功,连接正常即可 ret = true; } return ret; } void RedisClient::subscribe(QString ch, redisCallbackFn *fn, void* data) { if (rac == nullptr) { //异步连接 rac = redisAsyncConnect(ip.toUtf8(), port); if(rac->err) { qDebug()<< "error: " << rac->errstr; redisAsyncFree(rac); } else { adapter.setContext(rac); redisAsyncSetConnectCallback(rac, connectCallback); redisAsyncSetDisconnectCallback(rac, disconnectCallback); QString authCmd = "auth " + auth; //认证 redisAsyncCommand(rac, nullptr, data, authCmd.toUtf8()); qDebug()<< "redis ok."; redisOk = true; } } if (rac && redisOk) { redisAsyncCommand(rac, fn, data, QString("subscribe " + ch).toStdString().c_str()); } } void RedisClient::psubscribe(QString ch, redisCallbackFn *fn, void* data) { if (rac == nullptr) { //异步连接 rac = redisAsyncConnect(ip.toUtf8(), port); if(rac->err) { qDebug()<< "error: " << rac->errstr; redisAsyncFree(rac); } else { adapter.setContext(rac); redisAsyncSetConnectCallback(rac, connectCallback); redisAsyncSetDisconnectCallback(rac, disconnectCallback); QString authCmd = "auth " + auth; //认证 redisAsyncCommand(rac, nullptr, data, authCmd.toUtf8()); qDebug()<< "redis ok."; redisOk = true; } } if (rac && redisOk) { redisAsyncCommand(rac, fn, data, QString("psubscribe " + ch).toStdString().c_str()); } } void RedisClient::Setup(std::string addr, uint _port, std::string password) { ip = addr.c_str(); port = _port; auth = password.c_str(); redisOk = false; }