#include "redis.h"

// NOTE(review): the names of the two angle-bracket includes that followed "redis.h" were not
// readable in the reviewed copy of this file. The headers below are the ones this translation
// unit visibly needs; confirm against the original include list.
#include <QByteArray>
#include <QDebug>
#include <QList>
#include <QTimer>
#include <QVector>

#include <memory>

namespace
{

// Releases a hiredis reply when the owning ReplyPtr goes out of scope.
struct ReplyDeleter
{
    void operator()(redisReply *reply) const
    {
        if (reply != nullptr)
        {
            freeReplyObject(reply);
        }
    }
};

// Owning handle for a reply returned by the synchronous hiredis API.
using ReplyPtr = std::unique_ptr<redisReply, ReplyDeleter>;

// Flattens a list of byte arrays into the parallel pointer/length arrays hiredis expects.
// The view only borrows the data, so the list must outlive it.
struct ArgvView
{
    QVector<const char *> ptrs;
    QVector<size_t> lens;

    explicit ArgvView(const QList<QByteArray> &args)
    {
        ptrs.reserve(args.size());
        lens.reserve(args.size());
        for (int i = 0; i < args.size(); ++i)
        {
            ptrs.append(args.at(i).constData());
            lens.append(static_cast<size_t>(args.at(i).size()));
        }
    }
};

// Returns the textual payload of a reply decoded as UTF-8, or a null QString when the reply
// is missing, nil, or empty.
QString replyText(const redisReply *reply)
{
    if (reply == nullptr || reply->str == nullptr || reply->len == 0)
    {
        return QString();
    }
    return QString::fromUtf8(reply->str, static_cast<int>(reply->len));
}

// Splits a whitespace-separated string into UTF-8 words. This keeps the historical behaviour
// of commands that were assembled as "cmd " + text, where several keys/channels could be
// passed in a single string.
QList<QByteArray> splitWords(const QString &text)
{
    QList<QByteArray> words;
    const QStringList parts = text.simplified().split(QLatin1Char(' '));
    for (const QString &part : parts)
    {
        if (!part.isEmpty())
        {
            words.append(part.toUtf8());
        }
    }
    return words;
}

// Runs one command on the synchronous connection.
//
// Arguments are sent with explicit lengths, so they are binary safe and are never interpreted
// as a printf-style format string.
//
// Returns the reply on success. Returns an empty handle when
//   - there is no connection or the transport failed (`connected` is set to false so that the
//     reconnect timer re-establishes the link), or
//   - the server answered with an error reply (logged; `connected` is left untouched).
ReplyPtr execSync(redisContext *ctx, bool &connected, const QList<QByteArray> &args)
{
    if (ctx == nullptr)
    {
        connected = false;
        return ReplyPtr();
    }

    ArgvView argv(args);
    ReplyPtr reply(static_cast<redisReply *>(
        redisCommandArgv(ctx, static_cast<int>(argv.ptrs.size()), argv.ptrs.data(), argv.lens.data())));

    if (ctx->err || !reply)
    {
        connected = false;
        qDebug() << ctx->errstr;
        return ReplyPtr();
    }
    if (reply->type == REDIS_REPLY_ERROR)
    {
        qDebug() << "redis error reply:" << replyText(reply.get());
        return ReplyPtr();
    }
    return reply;
}

// Queues one command on the asynchronous connection; returns the hiredis status code.
int sendAsync(redisAsyncContext *ctx, redisCallbackFn *fn, const QList<QByteArray> &args)
{
    ArgvView argv(args);
    return redisAsyncCommandArgv(ctx, fn, nullptr, static_cast<int>(argv.ptrs.size()), argv.ptrs.data(),
                                 argv.lens.data());
}

// Clears the owner's pointer to an async context that hiredis is about to free.
// openAsync() stores the address of that pointer in the context's user-data field.
void forgetAsyncContext(const redisAsyncContext *c)
{
    if (c != nullptr && c->data != nullptr)
    {
        *static_cast<redisAsyncContext **>(c->data) = nullptr;
    }
}

} // namespace

// Example (disabled): storing and reading back a raw struct through the binary-safe API.
// void Test()
//{
//    struct st
//    {
//        int a;
//        int b;
//        char c;
//        int d;
//    };
//    st t;
//    t.a = 1;
//    t.b = 2;
//    t.c = 'a';
//    t.d = 4;
//    const char *v[3];
//    size_t vlen[3];
//    v[0] = "set";
//    vlen[0] = strlen("set");
//    v[1] = "st";
//    vlen[1] = strlen("st");
//    v[2] = (const char *)&t;
//    vlen[2] = sizeof(struct st);
//    redisReply *r = (redisReply *)redisCommandArgv(rc, sizeof(v) / sizeof(v[0]), v, vlen);
//    r = (redisReply *)redisCommand(rc, "get st");
//    if (!r) {
//        printf("empty reply\n");
//        exit(1);
//    }
//    st *t2 = (st*)(r->str);
//    qDebug()<< "st2" << t2->a << t2->b << t2->c << t2->d;
//}

// Sample asynchronous callback: logs every element of the reply, recognises pub/sub
// "message" replies for a few known channels, then re-arms itself with another
// "blpop lTest 0".
void getCallback(redisAsyncContext *ctx, void *r, void *privdata)
{
    Q_UNUSED(privdata);
    qDebug() << "getCallback called";

    const redisReply *reply = static_cast<redisReply *>(r);
    if (reply == nullptr)
    {
        // hiredis passes a null reply when the context is disconnecting or being freed.
        qDebug() << "The reply is nullptr";
        return;
    }

    for (size_t i = 0; i < reply->elements; ++i)
    {
        const redisReply *item = reply->element[i];
        qDebug() << "key: " << ((item != nullptr && item->str != nullptr) ? item->str : "");
    }

    // Only touch element[0] / element[1] when the reply really carries them.
    const bool hasHeader = reply->elements >= 2 && reply->element[0] != nullptr && reply->element[1] != nullptr;
    if (hasHeader && qstrcmp(reply->element[0]->str, "message") == 0) // subscription message received
    {
        const QString ch = QString::fromUtf8(reply->element[1]->str);
        if (ch == "DevCh")
        {
            qDebug() << "devch...";
            // Disabled sample: decode a base64 image carried in a JSON payload and show it.
            // qDebug()<<"img:" << ++cnt;
            // QString js = QString(reply->element[2]->str);
            // QJsonObject jsonObj = QJsonDocument::fromJson(js.toLocal8Bit()).object(); // local encoding (utf-8)
            // //quint32 type = jsonObj.value("type").toInt(); // toInt() would lose data
            // //qDebug()<<"type:" << type;
            // //QString base64Str = QString(reply->element[2]->str);
            // QString base64Str = jsonObj["img"].toString();
            // //qDebug()<<"str size:" << base64Str.size();
            // QPixmap baseimage;
            // baseimage.loadFromData(QByteArray::fromBase64(base64Str.toLatin1()));
            // QPixmap scaledPixmap = baseimage.scaled(500, 500, Qt::KeepAspectRatio); // scale, keep ratio
            // lab->setPixmap(scaledPixmap); // show the image in the label
        }
        else if (ch == "Alarm")
        {
            qDebug() << "alarm...";
        }
        else if (ch == "Led")
        {
            qDebug() << "led...";
        }
    }

    redisAsyncCommand(ctx, getCallback, nullptr, "blpop lTest 0");
}

// Connect notification for the asynchronous context.
void connectCallback(const redisAsyncContext *c, int status)
{
    if (status != REDIS_OK)
    {
        qDebug("Error1: %s\n", c->errstr);
        // hiredis frees the context after a failed connect; drop the owner's pointer to it.
        forgetAsyncContext(c);
        return;
    }
    qDebug("Connected...\n");
}

// Disconnect notification for the asynchronous context.
void disconnectCallback(const redisAsyncContext *c, int status)
{
    // hiredis frees the context once this callback returns, whatever the status, so the
    // owner must not keep using its pointer.
    forgetAsyncContext(c);

    if (status != REDIS_OK)
    {
        qDebug("Error2: %s\n", c->errstr);
        return;
    }
    qDebug("Disconnected...\n");
}

// Receiver of subscription messages; shared by every Redis instance in the process.
EventSubInterface *g_pSubCB = nullptr;

// Subscription callback: forwards (channel, payload) of each published message to g_pSubCB.
//
// Handles both reply layouts used by Redis pub/sub:
//   SUBSCRIBE  -> ["message",  channel, payload]
//   PSUBSCRIBE -> ["pmessage", pattern, channel, payload]
// Everything else (subscribe confirmations, errors, null replies on disconnect) is ignored.
void RedisCB(redisAsyncContext *ctx, void *r, void *privdata)
{
    Q_UNUSED(ctx);
    Q_UNUSED(privdata);

    const redisReply *reply = static_cast<redisReply *>(r);
    if (reply == nullptr || reply->element == nullptr || reply->elements < 3)
    {
        return;
    }

    const redisReply *kind = reply->element[0];
    if (kind == nullptr || kind->str == nullptr)
    {
        return;
    }

    size_t channelIdx = 0;
    size_t payloadIdx = 0;
    if (qstricmp(kind->str, "message") == 0)
    {
        channelIdx = 1;
        payloadIdx = 2;
    }
    else if (qstricmp(kind->str, "pmessage") == 0 && reply->elements >= 4)
    {
        channelIdx = 2;
        payloadIdx = 3;
    }
    else
    {
        return;
    }

    redisReply *channel = reply->element[channelIdx];
    redisReply *payload = reply->element[payloadIdx];
    if (channel == nullptr || payload == nullptr || channel->str == nullptr || payload->str == nullptr)
    {
        return;
    }

    if (g_pSubCB != nullptr)
    {
        g_pSubCB->SubCB(channel->str, payload->str);
    }
}

namespace
{

// Creates the asynchronous connection, attaches it to the Qt adapter, installs the
// connect/disconnect callbacks and queues AUTH when a password is configured.
//
// On success `ctx` holds the new context and true is returned. On failure `ctx` is left
// null (never dangling) and false is returned.
template <typename Adapter>
bool openAsync(redisAsyncContext *&ctx, Adapter &adapter, const QString &host, int port, const QString &password)
{
    ctx = redisAsyncConnect(host.toUtf8().constData(), port);
    if (ctx == nullptr)
    {
        qDebug() << "error: " << "cannot allocate async redis context";
        return false;
    }
    if (ctx->err)
    {
        qDebug() << "error: " << ctx->errstr;
        redisAsyncFree(ctx);
        ctx = nullptr;
        return false;
    }

    // Lets the connect/disconnect callbacks reset the owner's pointer when hiredis frees
    // the context on its own.
    ctx->data = &ctx;

    adapter.setContext(ctx);
    redisAsyncSetConnectCallback(ctx, connectCallback);
    redisAsyncSetDisconnectCallback(ctx, disconnectCallback);

    if (!password.isEmpty())
    {
        // authenticate
        sendAsync(ctx, nullptr, {QByteArray("auth"), password.toUtf8()});
    }
    qDebug() << "redis ok.";
    return true;
}

} // namespace

// Creates a disconnected client; call Setup() and then start() (or conn()) to connect.
Redis::Redis(QObject *parent) : QObject(parent)
{
    rc = nullptr;
    rac = nullptr;
    redisOk = false;
    g_pSubCB = nullptr;
    // conn();
}

// Releases both hiredis contexts with the matching hiredis deallocators.
Redis::~Redis()
{
    if (rc != nullptr)
    {
        redisFree(rc);
        rc = nullptr;
    }
    if (rac != nullptr)
    {
        // Pending callbacks are invoked with a null reply while the context is torn down.
        redisAsyncFree(rac);
        rac = nullptr;
    }
}

// (Re)establishes the synchronous connection when the client is not marked as connected.
// Does nothing while redisOk is true.
void Redis::conn()
{
    if (redisOk)
    {
        return;
    }

    // Drop a previous, broken context before reconnecting so it is not leaked.
    if (rc != nullptr)
    {
        redisFree(rc);
        rc = nullptr;
    }

    // synchronous connection
    rc = redisConnect(ip.toUtf8().constData(), port);
    if (rc == nullptr)
    {
        qDebug() << "redis err:" << "cannot allocate redis context";
        return;
    }
    if (rc->err)
    {
        qDebug() << "redis err:" << rc->errstr;
        return;
    }

    if (!auth.isEmpty())
    {
        // authenticate
        const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("auth"), auth.toUtf8()});
        if (!reply || replyText(reply.get()) != QLatin1String("OK"))
        {
            qDebug() << "redis connect fail.";
            return;
        }
    }

    redisOk = true;
    qDebug() << "redis connect successfully.";
}

// Connects immediately and starts a timer that calls conn() once per second, so a dropped
// connection is re-established automatically.
void Redis::start()
{
    conn();

    // Parented to this object so the timer is destroyed together with it.
    QTimer *redisTimer = new QTimer(this);
    connect(redisTimer, SIGNAL(timeout()), this, SLOT(conn()));
    redisTimer->start(1000);
}

// HSET k f v. Returns true when the command was executed without a transport or server error.
// The integer reply (1 = new field, 0 = existing field overwritten) is not inspected, because
// both values mean the write succeeded.
bool Redis::hset(QString k, QString f, QString v)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("hset"), k.toUtf8(), f.toUtf8(), v.toUtf8()});
    return static_cast<bool>(reply);
}

// HGET k f. Returns the field value, or a null QString when the field does not exist or the
// command failed.
QString Redis::hget(QString k, QString f)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("hget"), k.toUtf8(), f.toUtf8()});
    return replyText(reply.get());
}

// HKEYS k. Returns all field names of the hash; empty on error or when the key is missing.
QStringList Redis::hkeys(QString k)
{
    QStringList ret;
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("hkeys"), k.toUtf8()});
    if (!reply || reply->type != REDIS_REPLY_ARRAY)
    {
        return ret;
    }
    for (size_t i = 0; i < reply->elements; ++i)
    {
        ret.append(replyText(reply->element[i]));
    }
    return ret;
}

// HVALS k. Returns all values of the hash; empty on error or when the key is missing.
QStringList Redis::hvals(QString k)
{
    QStringList ret;
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("hvals"), k.toUtf8()});
    if (!reply || reply->type != REDIS_REPLY_ARRAY)
    {
        return ret;
    }
    for (size_t i = 0; i < reply->elements; ++i)
    {
        ret.append(replyText(reply->element[i]));
    }
    return ret;
}

// HGETALL k. Returns the hash as field -> value; empty on error or when the key is missing.
QHash<QString, QString> Redis::hgetall(QString k)
{
    QHash<QString, QString> ret;
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("hgetall"), k.toUtf8()});
    if (!reply || reply->type != REDIS_REPLY_ARRAY)
    {
        return ret;
    }
    // The reply alternates field, value; stop before a trailing unpaired element.
    for (size_t i = 0; i + 1 < reply->elements; i += 2)
    {
        ret.insert(replyText(reply->element[i]), replyText(reply->element[i + 1]));
    }
    return ret;
}

// Appends a string to the tail (right end) of a list.
// Returns true when the server reports a non-zero list length afterwards.
bool Redis::rpush(QString lData, QString s)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("rpush"), lData.toUtf8(), s.toUtf8()});
    return reply && reply->integer != 0;
}

// Appends binary data to the tail (right end) of a list.
// Returns true when the server reports a non-zero list length afterwards.
bool Redis::rpushb(QString lData, QByteArray ba)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("rpush"), lData.toUtf8(), ba});
    return reply && reply->integer != 0;
}

// LPOP lData. Returns the popped element, or a null QString when the list is empty or the
// command failed.
QString Redis::lpop(QString lData)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("lpop"), lData.toUtf8()});
    return replyText(reply.get());
}

// Asynchronous blocking pop from the head of a list: queues "blpop <lData> 0" and delivers the
// reply to `fn`. The asynchronous connection is created on first use.
// Returns true when the command was queued.
// NOTE(review): redisOk is shared with the synchronous connection, so the command is skipped
// while that connection is marked as down; confirm this coupling is intended.
bool Redis::blpop(QString lData, redisCallbackFn *fn)
{
    if (rac == nullptr)
    {
        // asynchronous connection
        if (openAsync(rac, adapter, ip, port, auth))
        {
            redisOk = true;
        }
    }

    if (rac == nullptr || !redisOk)
    {
        return false;
    }

    QList<QByteArray> args = splitWords(lData);
    args.prepend(QByteArray("blpop"));
    args.append(QByteArray("0"));
    return sendAsync(rac, fn, args) == REDIS_OK;
}

// Synchronous blocking pop: waits up to `timeout` seconds and returns the popped value, or a
// null QString when nothing arrived or the command failed.
QString Redis::blpop(QString lData, quint32 timeout)
{
    // At least 1, because 0 would block forever.
    if (timeout < 1)
    {
        timeout = 1;
    }

    QList<QByteArray> args = splitWords(lData);
    args.prepend(QByteArray("blpop"));
    args.append(QByteArray::number(timeout));

    const ReplyPtr reply = execSync(rc, redisOk, args);
    if (!reply || reply->elements < 2)
    {
        return QString();
    }
    // The reply is [key, value].
    return replyText(reply->element[1]);
}

// SET k v. Returns true when the server accepted the command.
bool Redis::set(QString k, QString v)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("set"), k.toUtf8(), v.toUtf8()});
    return static_cast<bool>(reply);
}

// SET k v for binary values. Returns true when the server accepted the command.
bool Redis::setb(QString k, QByteArray &v)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("set"), k.toUtf8(), v});
    return static_cast<bool>(reply);
}

// GET k. Returns the value, or a null QString when the key is missing or the command failed.
QString Redis::get(QString k)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("get"), k.toUtf8()});
    return replyText(reply.get());
}

// GET k for binary values. Returns the raw bytes, or an empty array when the key is missing
// or the command failed.
QByteArray Redis::getb(QString k)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("get"), k.toUtf8()});
    if (!reply || reply->str == nullptr || reply->len == 0)
    {
        return QByteArray();
    }
    // Pass the length explicitly to stay binary safe.
    return QByteArray(reply->str, static_cast<int>(reply->len));
}

// EXPIRE k sec. Returns true when the timeout was set (the key exists), false otherwise.
bool Redis::expire(QString k, int sec)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("expire"), k.toUtf8(), QByteArray::number(sec)});
    return reply && reply->integer != 0;
}

// PUBLISH ch s. Returns true when the command was executed. The integer reply is the number
// of clients that received the message and is deliberately not treated as a success criterion.
bool Redis::publish(QString ch, QString s)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("publish"), ch.toUtf8(), s.toUtf8()});
    return static_cast<bool>(reply);
}

// PUBLISH ch <binary payload>. Returns true when the command was executed. The integer reply
// is the number of clients that received the message.
bool Redis::publishb(QString ch, QByteArray ba)
{
    const ReplyPtr reply = execSync(rc, redisOk, {QByteArray("publish"), ch.toUtf8(), ba});
    if (!reply)
    {
        return false;
    }
    qDebug() << "type:" << reply->type << reply->integer;
    return true;
}

// Subscribes to one or more (space separated) channels. Messages are forwarded to `fn`.
// The asynchronous connection is created on first use.
void Redis::subscribe(QString ch, EventSubInterface *fn)
{
    g_pSubCB = fn;

    if (rac == nullptr)
    {
        // asynchronous connection
        if (openAsync(rac, adapter, ip, port, auth))
        {
            redisOk = true;
        }
    }

    if (rac != nullptr && redisOk)
    {
        QList<QByteArray> args = splitWords(ch);
        args.prepend(QByteArray("subscribe"));
        sendAsync(rac, RedisCB, args);
    }
}

// Subscribes to one or more (space separated) channel patterns. Messages are forwarded to `fn`.
// The asynchronous connection is created on first use.
void Redis::psubscribe(QString ch, EventSubInterface *fn)
{
    g_pSubCB = fn;

    if (rac == nullptr)
    {
        // asynchronous connection
        if (openAsync(rac, adapter, ip, port, auth))
        {
            redisOk = true;
        }
    }

    if (rac != nullptr && redisOk)
    {
        QList<QByteArray> args = splitWords(ch);
        args.prepend(QByteArray("psubscribe"));
        sendAsync(rac, RedisCB, args);
    }
}

// Stores the server address, port and password and marks the client as disconnected, so the
// next conn() uses the new settings.
void Redis::Setup(tagSetup ts)
{
    ip = ts.addr.c_str();
    port = ts.port;
    auth = ts.password.c_str();
    redisOk = false;
}