Commit 95e46f36 by 罗志长

fix: mqtt断线重连

parent f16fa585
......@@ -42,18 +42,21 @@ public class MqttPushClient {
try {
client = new MqttClient(host, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setCleanSession(false);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setAutomaticReconnect(true);
MqttPushClient.setMqttClient(client);
if (!client.isConnected()) {
try {
client.setCallback(pushCallback);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
......
......@@ -30,10 +30,7 @@ import com.makeit.utils.DeviceCacheUtil;
import com.makeit.utils.old.StringUtils;
import com.makeit.utils.redis.RedisUtil;
import org.apache.commons.collections4.CollectionUtils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
......@@ -50,7 +47,7 @@ import java.util.concurrent.TimeUnit;
@Component
public class PushCallback implements MqttCallback {
public class PushCallback implements MqttCallbackExtended {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
public static final String DEVICE_BR_DATA = "device:brhr:data:";
......@@ -102,8 +99,21 @@ public class PushCallback implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开,可以重连");
if (client == null || !client.isConnected()) {
mqttConfig.getMqttPushClient();
int times = 1;
while (client == null || !client.isConnected()) {
logger.info("重新连接, 第" + (times++) + "次");
MqttPushClient mqttPushClient = mqttConfig.getMqttPushClient();
if (mqttPushClient != null && mqttPushClient.getMqttClient() != null && mqttPushClient.getMqttClient().isConnected()) {
logger.info("重连成功");
break;
}
logger.info("重连失败");
// 每隔10秒重试一次
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
......@@ -127,6 +137,11 @@ public class PushCallback implements MqttCallback {
logger.info("deliveryComplete--------------" + token.isComplete());
}
@Override
public void connectComplete(boolean b, String s) {
logger.info("连接成功,连接方式:{}", b ? "重连" : "直连");
}
/**
* IOT端设备属性修改后同步
* @param device
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment