Commit adf4f15f by 罗志长

feat: 心跳上报

parent c408f8e6
...@@ -111,6 +111,8 @@ public class RedisConst { ...@@ -111,6 +111,8 @@ public class RedisConst {
public static final String PLAT_IOT_DEVICE_FAULT_STATE_PREFIX = "plat:iot:device:faultState:"; public static final String PLAT_IOT_DEVICE_FAULT_STATE_PREFIX = "plat:iot:device:faultState:";
public static final String PLAT_IOT_DEVICE_HEARTBEAT_PREFIX = "plat:iot:device:heartbeat:";
public static final String RTC_EVENT_CALLBACK_PREFIX = "rtc:event:callback:"; public static final String RTC_EVENT_CALLBACK_PREFIX = "rtc:event:callback:";
public static final String RTM_RESIDENT_USER_TOKEN_PREFIX = "rtm:resident:user:"; public static final String RTM_RESIDENT_USER_TOKEN_PREFIX = "rtm:resident:user:";
......
...@@ -19,6 +19,7 @@ import com.makeit.external.strategy.OpenApiBaseStrategy; ...@@ -19,6 +19,7 @@ import com.makeit.external.strategy.OpenApiBaseStrategy;
import com.makeit.external.strategy.OpenApiBaseStrategyFactory; import com.makeit.external.strategy.OpenApiBaseStrategyFactory;
import com.makeit.global.aspect.tenant.TenantIdIgnore; import com.makeit.global.aspect.tenant.TenantIdIgnore;
import com.makeit.module.iot.enums.DeviceState; import com.makeit.module.iot.enums.DeviceState;
import com.makeit.module.iot.service.IotDevicePropertiesOperateService;
import com.makeit.module.iot.vo.DeviceInfo; import com.makeit.module.iot.vo.DeviceInfo;
import com.makeit.module.iot.vo.HeaderInfo; import com.makeit.module.iot.vo.HeaderInfo;
import com.makeit.service.platform.alarm.PlatAlarmConfigService; import com.makeit.service.platform.alarm.PlatAlarmConfigService;
...@@ -41,6 +42,7 @@ import org.springframework.stereotype.Component; ...@@ -41,6 +42,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
...@@ -69,6 +71,7 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -69,6 +71,7 @@ public class PushCallback implements MqttCallbackExtended {
public static final String WRITE_PROPERTY = "WRITE_PROPERTY"; public static final String WRITE_PROPERTY = "WRITE_PROPERTY";
public static final String WRITE_PROPERTY_REPLY = "WRITE_PROPERTY_REPLY"; public static final String WRITE_PROPERTY_REPLY = "WRITE_PROPERTY_REPLY";
public static final String HEARTBEAT = "LOG";
public static final String OPEN_API_KEY_COUNT = "open:api:key:count"; public static final String OPEN_API_KEY_COUNT = "open:api:key:count";
...@@ -98,6 +101,8 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -98,6 +101,8 @@ public class PushCallback implements MqttCallbackExtended {
@Autowired @Autowired
private PlatTenantService platTenantService; private PlatTenantService platTenantService;
@Autowired @Autowired
private IotDevicePropertiesOperateService devicePropertiesOperateService;
@Autowired
private ThreadPoolTaskExecutor taskExecutor; private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
...@@ -109,8 +114,7 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -109,8 +114,7 @@ public class PushCallback implements MqttCallbackExtended {
public void messageArrived(String topic, MqttMessage message) { public void messageArrived(String topic, MqttMessage message) {
try { try {
// 收到消息并设置返回字符串格式 // 收到消息并设置返回字符串格式
String payload = new String(message.getPayload(), "UTF-8"); String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
//logger.info("接收消息内容:payload格式:{}", payload); //logger.info("接收消息内容:payload格式:{}", payload);
// 解析数据 // 解析数据
...@@ -124,6 +128,8 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -124,6 +128,8 @@ public class PushCallback implements MqttCallbackExtended {
handleStatus(device); handleStatus(device);
heartbeat(device);
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
checkAlarm(device); checkAlarm(device);
}, taskExecutor); }, taskExecutor);
...@@ -288,6 +294,26 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -288,6 +294,26 @@ public class PushCallback implements MqttCallbackExtended {
} }
/** /**
* 心跳上报
*/
private void heartbeat(DeviceInfo device) {
CompletableFuture.runAsync(() -> {
if (!HEARTBEAT.equals(device.getMessageType())) {
return;
}
String deviceId = device.getDeviceId();
String key = RedisConst.PLAT_IOT_DEVICE_HEARTBEAT_PREFIX + deviceId;
if (RedisUtil.exist(key)) {
return;
}
// 正常应该一分钟上报一次,避免频繁读取设备属性造成压力
RedisUtil.set(key, 1, 55, TimeUnit.SECONDS);
List<String> properties = Lists.newArrayList("sysNetworkType", "sysRssi", "sysCsq");
devicePropertiesOperateService.deviceRead(deviceId, properties);
}, taskExecutor);
}
/**
* 离床预警 * 离床预警
* @param platDevice * @param platDevice
* @param properties * @param properties
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment