Commit d391d917 by 罗志长

fix: mqtt订阅调整

parent 78262ce0
package com.makeit.mqtt; package com.makeit.mqtt;
import com.makeit.module.iot.service.IotTokenService; import cn.hutool.core.util.IdUtil;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.boot.context.properties.ConfigurationProperties;
...@@ -18,8 +18,6 @@ public class MqttConfig { ...@@ -18,8 +18,6 @@ public class MqttConfig {
@Autowired @Autowired
private MqttPushClient mqttPushClient; private MqttPushClient mqttPushClient;
@Autowired @Autowired
private IotTokenService iotTokenService;
@Autowired
private StringRedisTemplate redisTemplate; private StringRedisTemplate redisTemplate;
public static final String DEVICE_BR_ANALYSIS = "device:brhr:analysis:"; public static final String DEVICE_BR_ANALYSIS = "device:brhr:analysis:";
...@@ -56,7 +54,7 @@ public class MqttConfig { ...@@ -56,7 +54,7 @@ public class MqttConfig {
if (!msgSwitch) { if (!msgSwitch) {
return null; return null;
} }
clientId = StringUtils.isNotEmpty(clientId) ? clientId : iotTokenService.getIotToken(); clientId = StringUtils.isNotEmpty(clientId) ? clientId : IdUtil.fastUUID();
mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive); mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
// 订阅主题 // 订阅主题
mqttPushClient.subscribe(defaultTopic, 0); mqttPushClient.subscribe(defaultTopic, 0);
......
...@@ -36,13 +36,14 @@ import org.slf4j.Logger; ...@@ -36,13 +36,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
...@@ -96,6 +97,8 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -96,6 +97,8 @@ public class PushCallback implements MqttCallbackExtended {
private OpenApiBaseStrategyFactory openApiBaseStrategyFactory; private OpenApiBaseStrategyFactory openApiBaseStrategyFactory;
@Autowired @Autowired
private PlatTenantService platTenantService; private PlatTenantService platTenantService;
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
...@@ -103,7 +106,6 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -103,7 +106,6 @@ public class PushCallback implements MqttCallbackExtended {
} }
@Override @Override
@Async
public void messageArrived(String topic, MqttMessage message) { public void messageArrived(String topic, MqttMessage message) {
try { try {
// 收到消息并设置返回字符串格式 // 收到消息并设置返回字符串格式
...@@ -120,9 +122,11 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -120,9 +122,11 @@ public class PushCallback implements MqttCallbackExtended {
return; return;
} }
syncProperties(device); CompletableFuture.runAsync(() -> {
checkAlarm(device);
}, taskExecutor);
checkAlarm(device); syncProperties(device);
} catch (Exception e) { } catch (Exception e) {
logger.error("消息接收处理异常", e); logger.error("消息接收处理异常", e);
} }
...@@ -167,8 +171,10 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -167,8 +171,10 @@ public class PushCallback implements MqttCallbackExtended {
if (!"0".equals(success)) { if (!"0".equals(success)) {
return; return;
} }
platDeviceService.syncIotProperties(device.getDeviceId(), properties); CompletableFuture.runAsync(() -> {
RedisUtil.delete(redisKey); platDeviceService.syncIotProperties(device.getDeviceId(), properties);
RedisUtil.delete(redisKey);
}, taskExecutor);
} }
} }
......
...@@ -120,12 +120,12 @@ iot: ...@@ -120,12 +120,12 @@ iot:
mqtt: mqtt:
username: admin|1693982115969 username: admin|1693982115969
password: 8e3795ef7b5e95869fa8c323b865b3a9 password: 8e3795ef7b5e95869fa8c323b865b3a9
hostUrl: tcp://124.71.33.17:11883 hostUrl: tcp://124.71.33.17:3883
clientId: clientId:
defaultTopic: /device/*/*/** defaultTopic: $share/ks//push_data
timeout: 10 timeout: 10
keepalive: 60 keepalive: 60
msgSwitch: false msgSwitch: true
wx: wx:
miniapp: miniapp:
......
...@@ -118,9 +118,9 @@ iot: ...@@ -118,9 +118,9 @@ iot:
mqtt: mqtt:
username: admin|1693982115969 username: admin|1693982115969
password: 8e3795ef7b5e95869fa8c323b865b3a9 password: 8e3795ef7b5e95869fa8c323b865b3a9
hostUrl: tcp://124.71.33.17:11883 hostUrl: tcp://124.71.33.17:3883
clientId: clientId:
defaultTopic: /device/*/*/** defaultTopic: $share/ks//push_data
timeout: 10 timeout: 10
keepalive: 60 keepalive: 60
msgSwitch: true msgSwitch: true
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment