#include "MQTTClient.h"
#include "qmqtt.h"

#include <QDebug>

namespace {

/// Closes the connection of @p client and schedules the object for deletion.
/// Accepts nullptr, in which case nothing happens.
void releaseClient(QMQTT::Client *client)
{
    if (!client) {
        return;
    }
    // Turn auto-reconnect off first so the client does not try to come back
    // while it is waiting for its deferred deletion.
    client->setAutoReconnect(false);
    client->disconnectFromHost();
    client->deleteLater();
}

} // namespace

/// Creates a client wrapper without an underlying connection.
/// Call connect2Host() before subscribe() or publish().
MQTTClient::MQTTClient(QObject *parent)
    : QObject(parent)
    , mqtt(nullptr) // no connection yet; every other method checks for this
{
}

/// Creates a QMQTT client, configures it and starts connecting to the broker.
///
/// @param host      broker host name
/// @param port      broker TCP port
/// @param usr       user name sent to the broker
/// @param passwd    password, sent UTF-8 encoded
/// @param clientID  client id; must be unique, two clients with the same id
///                  cannot be connected at the same time
///
/// Calling this again replaces the previous client, which is disconnected
/// and deleted. Topics subscribed on the previous client are not subscribed
/// again on the new one.
void MQTTClient::connect2Host(const QString &host, const uint16_t port,
                              const QString &usr, const QString &passwd,
                              const QString &clientID)
{
    if (mqtt) {
        // Stop the old client from delivering messages to us, then drop it
        // instead of leaking it together with its open connection.
        mqtt->disconnect(this);
        releaseClient(mqtt);
        mqtt = nullptr;
    }

    mqtt = new QMQTT::Client();

    connect(mqtt, &QMQTT::Client::connected, [] {
        qDebug() << __FILE__ << __LINE__ << "connected";
    });
    connect(mqtt, &QMQTT::Client::disconnected, [] {
        qDebug() << __FILE__ << __LINE__ << "disconnected";
    });
    connect(mqtt, &QMQTT::Client::received, this, &MQTTClient::onReceived);

    mqtt->setHostName(host);
    mqtt->setPort(port);
    mqtt->setKeepAlive(60);
    mqtt->setClientId(clientID);
    mqtt->setUsername(usr);
    mqtt->setPassword(passwd.toUtf8());
    mqtt->setAutoReconnect(true);  // reconnect automatically after a drop
    // Non-persistent session: on connect the broker discards all previous
    // subscriptions and queued offline messages for this client id.
    mqtt->setCleanSession(true);
    mqtt->setVersion(QMQTT::V3_1_1);
    qDebug() << "ver" << mqtt->version();

    mqtt->connectToHost();
}

/// Disconnects from the broker (if a client exists) and schedules the
/// client object for deletion.
MQTTClient::~MQTTClient()
{
    // disconnectFromHost() actually closes the connection; calling the
    // disconnected() signal would only emit it and leave the socket open.
    releaseClient(mqtt);
    mqtt = nullptr;
}

/// Slot for incoming messages.
///
/// A callback registered for exactly this topic takes precedence; if there
/// is none, the shared (wildcard) callback is used when one is set.
void MQTTClient::onReceived(const QMQTT::Message &message)
{
    const QString topic = message.topic();

    MQTTCallbackFn cbFunc = hCbFuncs.value(topic);
    if (cbFunc) {
        cbFunc(topic, message.payload());
    } else if (shareCbFunc) {
        shareCbFunc(topic, message.payload());
    }
}

/// Subscribes to @p topic with QoS 1 and registers @p func as its handler.
///
/// A topic containing "+", "#" or "*" is treated as a wildcard filter and
/// @p func becomes the single shared callback, replacing any earlier one.
/// Any other topic gets its own dedicated callback.
/// Examples of filters: "alarm/led", "+/led", "alarm/#".
///
/// Does nothing (apart from logging a warning) before connect2Host().
void MQTTClient::subscribe(const QString &topic, MQTTCallbackFn &func)
{
    if (!mqtt) {
        qWarning() << "MQTTClient::subscribe ignored, connect2Host() has not been called:" << topic;
        return;
    }

    const bool isWildcard =
        topic.contains("+") || topic.contains("#") || topic.contains("*");
    if (isWildcard) {
        shareCbFunc = func;            // shared callback
    } else {
        hCbFuncs.insert(topic, func);  // dedicated callback for this topic
    }

    mqtt->subscribe(topic, 1);
}

/// Publishes @p payload on @p topic as a retained message, so the broker
/// keeps the last value for later subscribers.
///
/// Does nothing (apart from logging a warning) before connect2Host().
void MQTTClient::publish(const QString &topic, const QByteArray &payload)
{
    if (!mqtt) {
        qWarning() << "MQTTClient::publish ignored, connect2Host() has not been called:" << topic;
        return;
    }

    QMQTT::Message message;
    message.setTopic(topic);
    message.setPayload(payload);
    message.setRetain(true);  // broker keeps the last message on this topic
    mqtt->publish(message);
}