Commit 72744f3e by 罗志长

fix: mqtt连接

parent 94bf8942
...@@ -70,7 +70,7 @@ public class HeaderUtils { ...@@ -70,7 +70,7 @@ public class HeaderUtils {
headers.put("X-Client-Id", clientId); headers.put("X-Client-Id", clientId);
headers.put("X-Timestamp", xTimestamp); headers.put("X-Timestamp", xTimestamp);
log.info("请求头信息:{}",Hex.encodeHexString(digest.digest()) +"|"+clientId+"|"+ xTimestamp); // log.info("请求头信息:{}",Hex.encodeHexString(digest.digest()) +"|"+clientId+"|"+ xTimestamp);
return headers; return headers;
} }
......
package com.makeit.module.iot.vo; package com.makeit.module.iot.vo;
import com.google.common.collect.Lists;
import lombok.Data; import lombok.Data;
import java.util.List; import java.util.List;
...@@ -13,7 +14,7 @@ public class HeaderInfo { ...@@ -13,7 +14,7 @@ public class HeaderInfo {
private String _uid; private String _uid;
private String creatorId; private String creatorId;
private String traceparent; private String traceparent;
private List<Bind> bindings; private List<Bind> bindings = Lists.newArrayList();
@Data @Data
......
...@@ -47,6 +47,7 @@ public class MqttPushClient { ...@@ -47,6 +47,7 @@ public class MqttPushClient {
options.setPassword(password.toCharArray()); options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout); options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive); options.setKeepAliveInterval(keepalive);
options.setAutomaticReconnect(true);
MqttPushClient.setMqttClient(client); MqttPushClient.setMqttClient(client);
if (!client.isConnected()) { if (!client.isConnected()) {
try { try {
......
...@@ -98,41 +98,46 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -98,41 +98,46 @@ public class PushCallback implements MqttCallbackExtended {
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
logger.info("连接断开,可以重连"); logger.info("连接断开,可以重连", cause);
if (client == null || !client.isConnected()) { int times = 1;
mqttConfig.getMqttPushClient(); while (client == null || !client.isConnected()) {
if (times > 5) {
logger.info("已达最大次数");
break;
}
logger.info("重新连接, 第" + (times++) + "次");
MqttPushClient mqttPushClient = mqttConfig.getMqttPushClient();
if (mqttPushClient != null && mqttPushClient.getMqttClient() != null && mqttPushClient.getMqttClient().isConnected()) {
logger.info("重连成功");
break;
}
logger.info("重连失败");
// 每隔10秒重试一次
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
// int times = 1;
// while (client == null || !client.isConnected()) {
// logger.info("重新连接, 第" + (times++) + "次");
// MqttPushClient mqttPushClient = mqttConfig.getMqttPushClient();
// if (mqttPushClient != null && mqttPushClient.getMqttClient() != null && mqttPushClient.getMqttClient().isConnected()) {
// logger.info("重连成功");
// break;
// }
// logger.info("重连失败");
// // 每隔10秒重试一次
// try {
// TimeUnit.SECONDS.sleep(10);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
} }
@Override @Override
public void messageArrived(String topic, MqttMessage message) throws Exception { public void messageArrived(String topic, MqttMessage message) {
// 收到消息并设置返回字符串格式 try {
String payload = new String(message.getPayload(), "UTF-8"); // 收到消息并设置返回字符串格式
String payload = new String(message.getPayload(), "UTF-8");
//logger.info("接收消息内容:payload格式:{}", payload); //logger.info("接收消息内容:payload格式:{}", payload);
// 解析数据 // 解析数据
DeviceInfo device = JSON.parseObject(payload, DeviceInfo.class); DeviceInfo device = JSON.parseObject(payload, DeviceInfo.class);
syncProperties(device); syncProperties(device);
checkAlarm(device); checkAlarm(device);
} catch (Exception e) {
logger.error("消息接收处理异常", e);
}
} }
@Override @Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment