Commit ad14d110 by 罗志长

Merge branch 'dev'

parents 8359d9a6 b7912ad0
Showing with 628 additions and 39 deletions
...@@ -7,16 +7,19 @@ ...@@ -7,16 +7,19 @@
- MySql 5.7+ (初始化脚本db/init/init.sql) - MySql 5.7+ (初始化脚本db/init/init.sql)
- ElasticSearch 6.8.11 - ElasticSearch 6.8.11
- Kibana 6.8.11 (可选) - Kibana 6.8.11 (可选)
- GLIBC 2.18(声网RTM SDK需要),使用strings /lib64/libc.so.6 | grep -i '^glibc'命令查看
## 项目打包 ## 项目打包
- 拉取源码http://git.xmmakeit.com/huangjiay/iot-platform-server.git - 拉取源码http://git.xmmakeit.com/huangjiay/iot-platform-server.git
- 切换分支(master生产环境,dev测试环境) - 切换分支(master生产环境,dev测试环境)
- 首次请将lib/agora_rtm.jar依赖安装到本地maven仓库,mvn install:install-file -Dfile=lib/agora_rtm.jar -DgroupId=io.agora.rtm -DartifactId=agora-rtm-sdk -Dversion=1.0 -Dpackaging=jar
- 在项目根目录下执行mvn clean package进行打包 - 在项目根目录下执行mvn clean package进行打包
## 部署 ## 部署
- 在服务器上创建目录/opt/iot-platform-server - 在服务器上创建目录/opt/iot-platform-server
- 创建lib目录,将lib\libagora_rtm_sdk.so上传至/opt/iot-platform-serve/lib目录下
- 将jar包上传至服务器/opt/iot-platform-server目录下 - 将jar包上传至服务器/opt/iot-platform-server目录下
- 正式环境在/opt/iot-platform-server/config目录下新建application.yaml文件,请注意修改对应配置信息!!! - 正式环境在/opt/iot-platform-server/config目录下新建application.yaml文件,请注意修改对应配置信息!!!
- application.yaml - application.yaml
...@@ -142,12 +145,13 @@ ...@@ -142,12 +145,13 @@
appId: 883078934ecd4193aa7a62a3cdacd810 appId: 883078934ecd4193aa7a62a3cdacd810
appCertificate: b29be69c9c034120a68f1d5c199d2e74 appCertificate: b29be69c9c034120a68f1d5c199d2e74
channelName: 1 channelName: 1
uid: 0 uid: 10010 #RTM用户id唯一,不同应用请自行修改
tokenExpirationInSeconds: 3600 tokenExpirationInSeconds: 3600
privilegeExpirationInSeconds: 3600 privilegeExpirationInSeconds: 3600
customerKey: b3b5f44e536a4fc191358926c6716b7b customerKey: b3b5f44e536a4fc191358926c6716b7b
customerSecret: bd81828a133140a58dfb04e9d80eba43 customerSecret: bd81828a133140a58dfb04e9d80eba43
pid: C567D6F40C6E48E293AA4F9B7AE9BA30 pid: C567D6F40C6E48E293AA4F9B7AE9BA30
callbackSecret: bv5SbvvhJ
aliyun: aliyun:
oss: oss:
...@@ -169,12 +173,12 @@ ...@@ -169,12 +173,12 @@
- 测试环境 - 测试环境
```sh ```sh
nohup java -jar server-web.jar --spring.profiles.active=test & nohup java -Djava.library.path=lib/ -jar server-web.jar --spring.profiles.active=test &
``` ```
- 正式环境 - 正式环境
```sh ```sh
nohup java -jar server-web.jar & nohup java -Djava.library.path=lib/ -jar server-web.jar &
``` ```
No preview for this file type
The file could not be displayed because it is too large.
...@@ -302,6 +302,12 @@ ...@@ -302,6 +302,12 @@
<version>${elasticsearch.version}</version> <version>${elasticsearch.version}</version>
</dependency> </dependency>
<dependency>
<groupId>io.agora.rtm</groupId>
<artifactId>agora-rtm-sdk</artifactId>
<version>1.0</version>
</dependency>
</dependencies> </dependencies>
</dependencyManagement> </dependencyManagement>
......
...@@ -189,6 +189,11 @@ ...@@ -189,6 +189,11 @@
<artifactId>elasticsearch-rest-high-level-client</artifactId> <artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.agora.rtm</groupId>
<artifactId>agora-rtm-sdk</artifactId>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>
......
package com.makeit.config;
import org.springframework.context.annotation.Condition;
import org.springframework.context.annotation.ConditionContext;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotatedTypeMetadata;
public class LinuxCondition implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
Environment environment = context.getEnvironment();
String osName = environment.getProperty("os.name");
return osName != null && osName.startsWith("Linux");
}
}
...@@ -19,6 +19,7 @@ public class ShengwangProperties { ...@@ -19,6 +19,7 @@ public class ShengwangProperties {
private String customerSecret; private String customerSecret;
private int tokenExpirationInSeconds; private int tokenExpirationInSeconds;
private int privilegeExpirationInSeconds; private int privilegeExpirationInSeconds;
private String callbackSecret;
} }
...@@ -111,7 +111,13 @@ public class RedisConst { ...@@ -111,7 +111,13 @@ public class RedisConst {
public static final String PLAT_IOT_DEVICE_FAULT_STATE_PREFIX = "plat:iot:device:faultState:"; public static final String PLAT_IOT_DEVICE_FAULT_STATE_PREFIX = "plat:iot:device:faultState:";
public static final String PLAT_IOT_DEVICE_HEARTBEAT_PREFIX = "plat:iot:device:heartbeat:";
public static final String RTC_EVENT_CALLBACK_PREFIX = "rtc:event:callback:";
public static final String RTM_RESIDENT_USER_TOKEN_PREFIX = "rtm:resident:user:";
public static final String RTM_RESIDENT_USER_LOGIN_STATUS_PREFIX = "rtm:resident:user:login:";
} }
......
...@@ -256,17 +256,21 @@ public class IotProductDeviceService extends IotCommonService { ...@@ -256,17 +256,21 @@ public class IotProductDeviceService extends IotCommonService {
for (DeviceInfoContentBreathe deviceInfoContentBreathe : deviceInfoContentBreathes) { for (DeviceInfoContentBreathe deviceInfoContentBreathe : deviceInfoContentBreathes) {
timestamp = deviceInfoContentBreathe.getTimestamp(); timestamp = deviceInfoContentBreathe.getTimestamp();
breatheProperties = deviceInfoContentBreathe.getProperties(); breatheProperties = deviceInfoContentBreathe.getProperties();
if (breatheProperties == null || breatheProperties.getPerson() == null
|| breatheProperties.getBr() == null || breatheProperties.getHr() == null) {
continue;
}
if (breatheProperties.getPerson() == 0) { if (breatheProperties.getPerson() == 0) {
noPersonCount++; noPersonCount++;
continue; continue;
} }
// 0和255直接跳过 // 0和255直接跳过
Integer propertiesHr = breatheProperties.getHr() != null ? breatheProperties.getHr() : 0; Integer propertiesHr = breatheProperties.getHr();
if (propertiesHr == 255 || propertiesHr == 0 ) { if (propertiesHr == 255 || propertiesHr == 0 ) {
noPersonCount++; noPersonCount++;
continue; continue;
} }
Integer propertiesBr = breatheProperties.getBr() != null ? breatheProperties.getBr() : 0; Integer propertiesBr = breatheProperties.getBr();
if (propertiesBr == 255 || propertiesBr == 0) { if (propertiesBr == 255 || propertiesBr == 0) {
noPersonCount++; noPersonCount++;
continue; continue;
......
package com.makeit.shengwang.agora.dto;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
@Data
public class RTMSendPeerMessageDTO {
@NotBlank
@ApiModelProperty(value = "设备id")
private String deviceId;
@NotBlank
@ApiModelProperty(value = "RTC token")
private String rtcToken;
@NotBlank
@ApiModelProperty(value = "频道")
private String channel;
}
package com.makeit.shengwang.agora.dto;
import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
@Data
public class WebhookDTO {
@NotBlank
@ApiModelProperty(value = "通知 ID,标识来自声网业务服务器的一次事件通知")
private String noticeId;
@NotNull
@ApiModelProperty(value = "业务 ID。值为 1 表示实时通信业务")
private Integer productId;
@NotNull
@ApiModelProperty(value = "通知的事件类型")
private Integer eventType;
@NotNull
@ApiModelProperty(value = "声网消息服务器向你的服务器发送事件通知的 Unix 时间戳 (ms)。通知重试时该值会更新")
private Long notifyMs;
@ApiModelProperty(value = "会话 ID")
private String sid;
@NotNull
@ApiModelProperty(value = "通知事件的具体内容。payload 因 eventType 而异")
private JSONObject payload;
}
...@@ -25,7 +25,7 @@ public class ShengwangHttpUtil { ...@@ -25,7 +25,7 @@ public class ShengwangHttpUtil {
Map<String,String> reqMap = new HashMap<>(); Map<String,String> reqMap = new HashMap<>();
reqMap.put("pid",shengwangProperties.getPid()); reqMap.put("pid", pid);
reqMap.put("licenseKey",licenseKey); reqMap.put("licenseKey",licenseKey);
reqMap.put("appid",shengwangProperties.getAppId()); reqMap.put("appid",shengwangProperties.getAppId());
String toParams = HttpUtil.toParams(reqMap); String toParams = HttpUtil.toParams(reqMap);
......
package com.makeit.shengwang.agora.rtm;
import com.makeit.config.LinuxCondition;
import com.makeit.enums.redis.RedisConst;
import com.makeit.utils.old.StringUtils;
import com.makeit.utils.redis.RedisUtil;
import io.agora.rtm.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Conditional;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
@Conditional(LinuxCondition.class)
public class RtmInstance {
private static String APP_ID;
private static String UID;
private static RtmClient RTM_CLIENT;
private RtmInstance() {
}
@PostConstruct
private void init() {
try {
RTM_CLIENT = RtmClient.createInstance(APP_ID, new RtmClientListener() {
@Override
public void onConnectionStateChanged(int state, int reason) {
log.info("on connection state changed to " + state + " reason: " + reason);
}
@Override
public void onMessageReceived(RtmMessage rtmMessage, String peerId) {
String msg = rtmMessage.getText();
log.info("Receive message: " + msg + " from " + peerId);
}
@Override
public void onTokenExpired() {
log.info("on token expired");
}
// 当前使用的 RTM Token 还有 30 秒过期
@Override
public void onTokenPrivilegeWillExpire() {
RedisUtil.delete(RedisConst.RTM_RESIDENT_USER_TOKEN_PREFIX + UID);
RedisUtil.delete(RedisConst.RTM_RESIDENT_USER_LOGIN_STATUS_PREFIX + UID);
logout();
}
@Override
public void onPeersOnlineStatusChanged(Map<String, Integer> peersStatus) {
}
});
} catch (Exception e) {
log.error("RtmClient初始化失败", e);
throw new RuntimeException("Need to check rtm sdk init process");
}
RTM_CLIENT.setLogFilter(RtmClient.LOG_FILTER_OFF);
log.info("rtm sdk init success");
}
public static void logout() {
RTM_CLIENT.logout(new ResultCallback<Void>() {
@Override
public void onSuccess(Void responseInfo) {
log.info("logout success!");
}
@Override
public void onFailure(ErrorInfo errorInfo) {
log.info("logout failed, error info = {}", errorInfo);
}
});
}
public static void sendPeerMessage(String token, String userId, String dst, String message) {
String key = RedisConst.RTM_RESIDENT_USER_LOGIN_STATUS_PREFIX + userId;
String value = RedisUtil.get(key);
if (StringUtils.isBlank(value)) {
RTM_CLIENT.login(token, userId, new ResultCallback<Void>() {
@Override
public void onSuccess(Void responseInfo) {
log.info("{} login success!", userId);
RedisUtil.set(key, "1", 1, TimeUnit.HOURS);
sendPeerMessage(dst, message);
}
@Override
public void onFailure(ErrorInfo errorInfo) {
log.info("{} login failure: {}!", userId, errorInfo);
}
});
} else {
sendPeerMessage(dst, message);
}
}
public static void sendPeerMessage(String dst, String message) {
RtmMessage msg = RTM_CLIENT.createMessage();
msg.setText(message);
RTM_CLIENT.sendMessageToPeer(dst, msg, new ResultCallback<Void>() {
@Override
public void onSuccess(Void aVoid) {
log.info("send message to peer success, message: {}!", message);
}
@Override
public void onFailure(ErrorInfo errorInfo) {
log.info("send message to peer failed, error info = {}", errorInfo);
}
});
}
@Value("${shengwang.appId}")
public void setAppId(String appId) {
RtmInstance.APP_ID = appId;
}
@Value("${shengwang.uid}")
public void setUID(String UID) {
RtmInstance.UID = UID;
}
}
\ No newline at end of file
package com.makeit.shengwang.agora.service; package com.makeit.shengwang.agora.service;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.makeit.config.ShengwangProperties; import com.makeit.config.ShengwangProperties;
import com.makeit.enums.redis.RedisConst;
import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO; import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO;
import com.makeit.shengwang.agora.media.RtcTokenBuilder2; import com.makeit.shengwang.agora.media.RtcTokenBuilder2;
import com.makeit.shengwang.agora.rtm.RtmTokenBuilder2; import com.makeit.shengwang.agora.rtm.RtmTokenBuilder2;
import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO; import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO;
import com.makeit.utils.redis.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Slf4j
@Component @Component
public class ShengwangService { public class ShengwangService {
public static final String REDIS_DEVICE_CALL = "device::call";
@Autowired @Autowired
private ShengwangProperties shengwangProperties; private ShengwangProperties shengwangProperties;
@Autowired
private StringRedisTemplate redisTemplate;
public PlatAlarmCallDeviceVO callingDeviceAuthIgnoreRtm(PlatCallingDeviceDTO dto) { public PlatAlarmCallDeviceVO callingDeviceAuthIgnoreRtm(PlatCallingDeviceDTO dto) {
String redisResult = redisTemplate.opsForValue().get(REDIS_DEVICE_CALL + dto.getUserId()); String key = RedisConst.RTM_RESIDENT_USER_TOKEN_PREFIX + dto.getUserId();
String redisResult = RedisUtil.get(key);
if (StringUtils.isNotBlank(redisResult)) { if (StringUtils.isNotBlank(redisResult)) {
return JSON.parseObject(redisResult,PlatAlarmCallDeviceVO.class); return JSON.parseObject(redisResult,PlatAlarmCallDeviceVO.class);
} }
...@@ -40,8 +47,7 @@ public class ShengwangService { ...@@ -40,8 +47,7 @@ public class ShengwangService {
platAlarmCallDeviceVO.setChannelName(format); platAlarmCallDeviceVO.setChannelName(format);
platAlarmCallDeviceVO.setAppId(shengwangProperties.getAppId()); platAlarmCallDeviceVO.setAppId(shengwangProperties.getAppId());
platAlarmCallDeviceVO.setUserId(dto.getUserId()); platAlarmCallDeviceVO.setUserId(dto.getUserId());
redisTemplate.opsForValue().set(REDIS_DEVICE_CALL + dto.getUserId(),JSON.toJSONString(platAlarmCallDeviceVO), RedisUtil.set(key, JSON.toJSONString(platAlarmCallDeviceVO), shengwangProperties.getTokenExpirationInSeconds(), TimeUnit.SECONDS);
shengwangProperties.getTokenExpirationInSeconds(), TimeUnit.SECONDS);
return platAlarmCallDeviceVO; return platAlarmCallDeviceVO;
} }
...@@ -59,4 +65,34 @@ public class ShengwangService { ...@@ -59,4 +65,34 @@ public class ShengwangService {
platAlarmCallDeviceVO.setChannelName(channelName); platAlarmCallDeviceVO.setChannelName(channelName);
return platAlarmCallDeviceVO; return platAlarmCallDeviceVO;
} }
public void disbandChannel(String channel) {
Map<String, Object> params = new HashMap<>();
params.put("appid", shengwangProperties.getAppId());
params.put("cname", channel);
params.put("privileges", Lists.newArrayList("join_channel"));
params.put("time", 0);
HttpResponse response = HttpRequest.post("http://api.sd-rtn.com/dev/v1/kicking-rule")
.header("Authorization", getAuthorizationHeader())
.body(JSONObject.toJSONString(params)).execute();
String body = response.body();
log.info("解散rtc频道 {}, resp:{}", channel, body);
}
public String getAuthorizationHeader() {
final String customerKey = shengwangProperties.getCustomerKey();
final String customerSecret = shengwangProperties.getCustomerSecret();
String plainCredentials = customerKey + ":" + customerSecret;
String base64Credentials = new String(Base64.getEncoder().encode(plainCredentials.getBytes()));
return "Basic " + base64Credentials;
}
public static void main(String[] args) {
final String customerKey = "b3b5f44e536a4fc191358926c6716b7b";
final String customerSecret = "bd81828a133140a58dfb04e9d80eba43";
String plainCredentials = customerKey + ":" + customerSecret;
String base64Credentials = new String(Base64.getEncoder().encode(plainCredentials.getBytes()));
System.out.println("base64Credentials = Basic " + base64Credentials);
}
} }
package com.makeit.shengwang.agora.signature;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
public class HmacSha {
// 将加密后的字节数组转换成字符串
public static String bytesToHex(byte[] bytes) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < bytes.length; i++) {
String hex = Integer.toHexString(bytes[i] & 0xFF);
if (hex.length() < 2) {
sb.append(0);
}
sb.append(hex);
}
return sb.toString();
}
// HMAC/SHA256 加密,返回加密后的字符串
public static String hmacSha256(String message, String secret) {
try {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(
"utf-8"), "HmacSHA256");
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(message.getBytes("utf-8"));
return bytesToHex(rawHmac);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
String request_body = "{\"noticeId\":\"884fecc29d7a477383bb68b355fc39aa\",\"productId\":1,\"eventType\":104,\"notifyMs\":1709882033147,\"payload\":{\"channelName\":\"test_webhook\",\"platform\":1,\"reason\":1,\"ts\":1560853385,\"uid\":12121212}}";
String secret = "secret";
System.out.println(hmacSha256(request_body, secret));
}
}
package com.makeit.startup.runner;
import com.makeit.config.ShengwangProperties;
import com.makeit.enums.redis.RedisConst;
import com.makeit.utils.redis.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
@Component
public class RtmRunner implements ApplicationRunner {
@Autowired
private ShengwangProperties shengwangProperties;
@Override
public void run(ApplicationArguments args) throws Exception {
String uid = shengwangProperties.getUid();
RedisUtil.delete(RedisConst.RTM_RESIDENT_USER_TOKEN_PREFIX + uid);
RedisUtil.delete(RedisConst.RTM_RESIDENT_USER_LOGIN_STATUS_PREFIX + uid);
}
}
...@@ -15,13 +15,13 @@ public class TestRunner implements ApplicationRunner { ...@@ -15,13 +15,13 @@ public class TestRunner implements ApplicationRunner {
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
logger.info("TestRunner run"); // logger.info("TestRunner run");
} }
@Order(1) @Order(1)
@Runner @Runner
public void run() throws Exception { public void run() throws Exception {
logger.info("TestRunner run 2"); // logger.info("TestRunner run 2");
} }
} }
...@@ -11,6 +11,7 @@ import com.makeit.dto.wechat.device.PlatDeviceSetupDTO; ...@@ -11,6 +11,7 @@ import com.makeit.dto.wechat.device.PlatDeviceSetupDTO;
import com.makeit.global.aspect.tenant.TenantIdIgnore; import com.makeit.global.aspect.tenant.TenantIdIgnore;
import com.makeit.service.platform.device.PlatDeviceService; import com.makeit.service.platform.device.PlatDeviceService;
import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO; import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO;
import com.makeit.shengwang.agora.dto.RTMSendPeerMessageDTO;
import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO; import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO;
import com.makeit.vo.platform.device.PlatChildDeviceListVO; import com.makeit.vo.platform.device.PlatChildDeviceListVO;
import com.makeit.vo.platform.device.PlatDeviceViewVO; import com.makeit.vo.platform.device.PlatDeviceViewVO;
...@@ -67,5 +68,13 @@ public class PlatDeviceChildrenController { ...@@ -67,5 +68,13 @@ public class PlatDeviceChildrenController {
return ApiResponseUtils.success(platDeviceService.callingDevice(dto)); return ApiResponseUtils.success(platDeviceService.callingDevice(dto));
} }
@ApiOperation("发送设备消息")
@PostMapping("sendPeerMessage")
@TenantIdIgnore
public ApiResponseEntity<Void> sendPeerMessage(@Validated @RequestBody RTMSendPeerMessageDTO dto) {
platDeviceService.deviceJoinChannel(dto);
return ApiResponseUtils.success();
}
} }
...@@ -6,10 +6,7 @@ import com.makeit.common.page.PageReqDTO; ...@@ -6,10 +6,7 @@ import com.makeit.common.page.PageReqDTO;
import com.makeit.common.page.PageVO; import com.makeit.common.page.PageVO;
import com.makeit.common.response.ApiResponseEntity; import com.makeit.common.response.ApiResponseEntity;
import com.makeit.common.response.ApiResponseUtils; import com.makeit.common.response.ApiResponseUtils;
import com.makeit.dto.platform.device.PlatDeviceAttrDTO; import com.makeit.dto.platform.device.*;
import com.makeit.dto.platform.device.PlatDeviceBindOrgDTO;
import com.makeit.dto.platform.device.PlatDeviceEditDTO;
import com.makeit.dto.platform.device.PlatDeviceQueryDTO;
import com.makeit.dto.wechat.device.PlatDeviceAttrWechatDTO; import com.makeit.dto.wechat.device.PlatDeviceAttrWechatDTO;
import com.makeit.dto.wechat.device.PlatDeviceNetAttrWechatDTO; import com.makeit.dto.wechat.device.PlatDeviceNetAttrWechatDTO;
import com.makeit.entity.platform.device.PlatDevice; import com.makeit.entity.platform.device.PlatDevice;
...@@ -131,5 +128,17 @@ public class PlatDeviceController { ...@@ -131,5 +128,17 @@ public class PlatDeviceController {
public ApiResponseEntity<PlatAlarmCallDeviceVO> getDeviceRtmToken(@RequestBody PlatCallingDeviceDTO dto) { public ApiResponseEntity<PlatAlarmCallDeviceVO> getDeviceRtmToken(@RequestBody PlatCallingDeviceDTO dto) {
return ApiResponseUtils.success(platDeviceService.getDeviceRtmToken(dto)); return ApiResponseUtils.success(platDeviceService.getDeviceRtmToken(dto));
} }
@ApiOperation("时间同步")
@PostMapping("timeSync")
@AuthIgnore
@TenantIdIgnore
public ApiResponseEntity<PlatDeviceTimeSyncDTO> timeSync() {
PlatDeviceTimeSyncDTO dto = new PlatDeviceTimeSyncDTO();
dto.setReceiveTime(System.currentTimeMillis());
dto.setSendTime(System.currentTimeMillis());
return ApiResponseUtils.success(dto);
}
} }
...@@ -88,6 +88,16 @@ public class PlatElderSleepController { ...@@ -88,6 +88,16 @@ public class PlatElderSleepController {
} }
@ApiOperation("测试") @ApiOperation("测试")
@GetMapping("test44")
@AuthIgnore
@TenantIdIgnore
public ApiResponseEntity<Void> test44(@RequestParam Integer month,
@RequestParam Integer day) {
platElderSleepService.test44(month, day);
return ApiResponseUtils.success();
}
@ApiOperation("测试")
@PostMapping("test5") @PostMapping("test5")
@AuthIgnore @AuthIgnore
@TenantIdIgnore @TenantIdIgnore
......
...@@ -2,7 +2,6 @@ package com.makeit.module.controller.wechat.device; ...@@ -2,7 +2,6 @@ package com.makeit.module.controller.wechat.device;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import com.makeit.common.dto.BaseIdDTO; import com.makeit.common.dto.BaseIdDTO;
import com.makeit.common.page.PageReqDTO; import com.makeit.common.page.PageReqDTO;
import com.makeit.common.page.PageVO; import com.makeit.common.page.PageVO;
...@@ -14,27 +13,26 @@ import com.makeit.dto.wechat.device.PlatDeviceEditWechatDTO; ...@@ -14,27 +13,26 @@ import com.makeit.dto.wechat.device.PlatDeviceEditWechatDTO;
import com.makeit.dto.wechat.device.PlatDeviceNetAttrWechatDTO; import com.makeit.dto.wechat.device.PlatDeviceNetAttrWechatDTO;
import com.makeit.dto.wechat.device.PlatDeviceSetupDTO; import com.makeit.dto.wechat.device.PlatDeviceSetupDTO;
import com.makeit.entity.platform.auth.PlatOrg; import com.makeit.entity.platform.auth.PlatOrg;
import com.makeit.entity.platform.auth.PlatRole;
import com.makeit.entity.platform.device.PlatDevice; import com.makeit.entity.platform.device.PlatDevice;
import com.makeit.enums.HeaderConst;
import com.makeit.global.annotation.AuthIgnore; import com.makeit.global.annotation.AuthIgnore;
import com.makeit.global.aspect.tenant.TenantIdIgnore; import com.makeit.global.aspect.tenant.TenantIdIgnore;
import com.makeit.module.iot.vo.DeviceProperties; import com.makeit.module.iot.vo.DeviceProperties;
import com.makeit.service.platform.auth.PlatOrgService; import com.makeit.service.platform.auth.PlatOrgService;
import com.makeit.service.platform.auth.PlatRoleService;
import com.makeit.service.platform.device.PlatDeviceService; import com.makeit.service.platform.device.PlatDeviceService;
import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO; import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO;
import com.makeit.shengwang.agora.dto.RTMSendPeerMessageDTO;
import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO; import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO;
import com.makeit.utils.data.validate.CollectionUtils; import com.makeit.utils.data.validate.CollectionUtils;
import com.makeit.utils.old.StringUtils;
import com.makeit.utils.request.RequestUtil;
import com.makeit.vo.platform.device.PlatDeviceListVO; import com.makeit.vo.platform.device.PlatDeviceListVO;
import com.makeit.vo.platform.device.PlatDeviceViewVO; import com.makeit.vo.platform.device.PlatDeviceViewVO;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
...@@ -133,5 +131,13 @@ public class PlatDeviceWechatController { ...@@ -133,5 +131,13 @@ public class PlatDeviceWechatController {
return ApiResponseUtils.success(platDeviceService.callingDevice(dto)); return ApiResponseUtils.success(platDeviceService.callingDevice(dto));
} }
@ApiOperation("发送设备消息")
@PostMapping("sendPeerMessage")
@TenantIdIgnore
public ApiResponseEntity<Void> sendPeerMessage(@Validated @RequestBody RTMSendPeerMessageDTO dto) {
platDeviceService.deviceJoinChannel(dto);
return ApiResponseUtils.success();
}
} }
package com.makeit.dto.platform.device;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
@Data
public class PlatDeviceTimeSyncDTO {
@ApiModelProperty(value = "接收时间")
private long receiveTime;
@ApiModelProperty(value = "发送时间")
private long sendTime;
}
...@@ -968,6 +968,8 @@ public class PlatUserServiceImpl extends ServiceImpl<PlatUserMapper, PlatUser> ...@@ -968,6 +968,8 @@ public class PlatUserServiceImpl extends ServiceImpl<PlatUserMapper, PlatUser>
getRoleList(userVO); getRoleList(userVO);
PlatTenant platTenant = platTenantService.getById(userVO.getTenantId()); PlatTenant platTenant = platTenantService.getById(userVO.getTenantId());
platTenant.setAppid(null);
platTenant.setSecret(null);
userVO.setPlatTenant(platTenant); userVO.setPlatTenant(platTenant);
return userVO; return userVO;
} }
......
...@@ -14,6 +14,7 @@ import com.makeit.entity.platform.device.PlatDevice; ...@@ -14,6 +14,7 @@ import com.makeit.entity.platform.device.PlatDevice;
import com.makeit.module.iot.vo.DeviceInfo; import com.makeit.module.iot.vo.DeviceInfo;
import com.makeit.module.iot.vo.DeviceProperties; import com.makeit.module.iot.vo.DeviceProperties;
import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO; import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO;
import com.makeit.shengwang.agora.dto.RTMSendPeerMessageDTO;
import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO; import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO;
import com.makeit.vo.platform.device.PlatChildDeviceListVO; import com.makeit.vo.platform.device.PlatChildDeviceListVO;
import com.makeit.vo.platform.device.PlatDeviceListVO; import com.makeit.vo.platform.device.PlatDeviceListVO;
...@@ -105,4 +106,10 @@ public interface PlatDeviceService extends IService<PlatDevice> { ...@@ -105,4 +106,10 @@ public interface PlatDeviceService extends IService<PlatDevice> {
* 故障处理 * 故障处理
*/ */
void handleFault(DeviceInfo deviceInfo); void handleFault(DeviceInfo deviceInfo);
/**
* 通过RTM向设备发送点对点消息(告知设备加入RTC频道)
*/
void deviceJoinChannel(RTMSendPeerMessageDTO dto);
} }
...@@ -65,8 +65,10 @@ import com.makeit.service.platform.space.PlatSpaceService; ...@@ -65,8 +65,10 @@ import com.makeit.service.platform.space.PlatSpaceService;
import com.makeit.service.saas.PlatTenantService; import com.makeit.service.saas.PlatTenantService;
import com.makeit.service.saas.SaasPidManageService; import com.makeit.service.saas.SaasPidManageService;
import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO; import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO;
import com.makeit.shengwang.agora.dto.RTMSendPeerMessageDTO;
import com.makeit.shengwang.agora.http.ShengwangHttpUtil; import com.makeit.shengwang.agora.http.ShengwangHttpUtil;
import com.makeit.shengwang.agora.media.RtcTokenBuilder2; import com.makeit.shengwang.agora.media.RtcTokenBuilder2;
import com.makeit.shengwang.agora.rtm.RtmInstance;
import com.makeit.shengwang.agora.service.ShengwangService; import com.makeit.shengwang.agora.service.ShengwangService;
import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO; import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO;
import com.makeit.utils.DeviceCacheUtil; import com.makeit.utils.DeviceCacheUtil;
...@@ -1345,4 +1347,26 @@ public class PlatDeviceServiceImpl extends ServiceImpl<PlatDeviceMapper, PlatDev ...@@ -1345,4 +1347,26 @@ public class PlatDeviceServiceImpl extends ServiceImpl<PlatDeviceMapper, PlatDev
} }
} }
@Override
public void deviceJoinChannel(RTMSendPeerMessageDTO dto) {
PlatDevice platDevice = getOne(new QueryWrapper<PlatDevice>().lambda()
.eq(PlatDevice::getOriDeviceId, dto.getDeviceId())
.last("limit 1"));
if (!com.makeit.module.iot.enums.DeviceState.online.getValue().equals(platDevice.getStatus())) {
throw new RuntimeException("设备:" + dto.getDeviceId() + "未在线");
}
Map<String, Object> params = new HashMap<>();
params.put("from", 2); // 1.web 2.uniapp 3.server 4.device
params.put("type", 1); // 1.进入 2.挂断
Map<String, Object> data = new HashMap<>();
data.put("sn", dto.getDeviceId());
data.put("token", dto.getRtcToken());
data.put("channel", dto.getChannel());
params.put("data", data);
PlatCallingDeviceDTO callingDeviceDTO = new PlatCallingDeviceDTO();
callingDeviceDTO.setUserId(shengwangProperties.getUid());
PlatAlarmCallDeviceVO rtmToken = this.getDeviceRtmToken(callingDeviceDTO);
RtmInstance.sendPeerMessage(rtmToken.getAccessToken(), rtmToken.getUserId(), dto.getDeviceId(), JSONObject.toJSONString(params));
}
} }
...@@ -22,4 +22,5 @@ public interface PlatElderSleepService extends IService<PlatElderSleep> { ...@@ -22,4 +22,5 @@ public interface PlatElderSleepService extends IService<PlatElderSleep> {
String calculateScores(long daySleepTime, long dayRestTime, long deepTime, long soberTime, long lightTime, String calculateScores(long daySleepTime, long dayRestTime, long deepTime, long soberTime, long lightTime,
SaasSleepEvaluateStandardReport evaluateStandardReport); SaasSleepEvaluateStandardReport evaluateStandardReport);
void test44(Integer month, Integer day);
} }
...@@ -99,7 +99,7 @@ public class DeviceLogServiceImpl implements DeviceLogService { ...@@ -99,7 +99,7 @@ public class DeviceLogServiceImpl implements DeviceLogService {
TreeMap<String, AnalysisVO> result = new TreeMap<>(); TreeMap<String, AnalysisVO> result = new TreeMap<>();
collect1.forEach(result::put); collect1.forEach(result::put);
getDayMinute(1,10).forEach(minute -> { getDayMinute(3,21).forEach(minute -> {
boolean key = result.containsKey(minute); boolean key = result.containsKey(minute);
if (!key) { if (!key) {
AnalysisVO analysisVO = buildDefaultData(minute); AnalysisVO analysisVO = buildDefaultData(minute);
......
...@@ -159,6 +159,10 @@ public class PlatElderBreatheAnalysisServiceImpl extends ServiceImpl<PlatElderBr ...@@ -159,6 +159,10 @@ public class PlatElderBreatheAnalysisServiceImpl extends ServiceImpl<PlatElderBr
deviceInfoContentBreatheList.forEach(deviceInfo -> { deviceInfoContentBreatheList.forEach(deviceInfo -> {
DeviceInfoContentBreathe.Properties deviceProperties = deviceInfo.getProperties(); DeviceInfoContentBreathe.Properties deviceProperties = deviceInfo.getProperties();
if (deviceProperties == null || deviceProperties.getPerson() == null
|| deviceProperties.getHr() == null || deviceProperties.getBr() == null) {
return;
}
int br = deviceProperties.getBr(); int br = deviceProperties.getBr();
int hr = deviceProperties.getHr(); int hr = deviceProperties.getHr();
Integer hasPerson = deviceProperties.getPerson(); // 0无人,1有人 Integer hasPerson = deviceProperties.getPerson(); // 0无人,1有人
......
...@@ -123,6 +123,10 @@ public class PlatElderSleepServiceImpl extends ServiceImpl<PlatElderSleepMapper, ...@@ -123,6 +123,10 @@ public class PlatElderSleepServiceImpl extends ServiceImpl<PlatElderSleepMapper,
for (DeviceInfoContentBreathe infoContentBreathe : deviceInfoContentBreathes) { for (DeviceInfoContentBreathe infoContentBreathe : deviceInfoContentBreathes) {
// 体动指数 // 体动指数
breatheProperties = infoContentBreathe.getProperties(); breatheProperties = infoContentBreathe.getProperties();
if (breatheProperties == null || breatheProperties.getPerson() == null
|| breatheProperties.getBr() == null || breatheProperties.getHr() == null) {
continue;
}
Integer bodyMove = breatheProperties.getBodymove(); Integer bodyMove = breatheProperties.getBodymove();
int br = breatheProperties.getBr(); int br = breatheProperties.getBr();
int hr = breatheProperties.getHr(); int hr = breatheProperties.getHr();
...@@ -240,8 +244,9 @@ public class PlatElderSleepServiceImpl extends ServiceImpl<PlatElderSleepMapper, ...@@ -240,8 +244,9 @@ public class PlatElderSleepServiceImpl extends ServiceImpl<PlatElderSleepMapper,
for (Map.Entry<String, AnalysisVO> entry : totalMap.entrySet()) { for (Map.Entry<String, AnalysisVO> entry : totalMap.entrySet()) {
AnalysisVO analysisVO = entry.getValue(); AnalysisVO analysisVO = entry.getValue();
boolean isValid = analysisVO.getAvgBr() == 255 && analysisVO.getAvgHr() == 255;
// 非离床且体动值<20 // 非离床且体动值<20
if (!analysisVO.getIsAction() && !analysisVO.getIsMoveBed()) { if (!analysisVO.getIsAction() && !analysisVO.getIsMoveBed() && !isValid) {
if (StrUtil.isBlank(startSleepTime)) { if (StrUtil.isBlank(startSleepTime)) {
startSleepTime = entry.getKey(); startSleepTime = entry.getKey();
} }
...@@ -955,6 +960,44 @@ public class PlatElderSleepServiceImpl extends ServiceImpl<PlatElderSleepMapper, ...@@ -955,6 +960,44 @@ public class PlatElderSleepServiceImpl extends ServiceImpl<PlatElderSleepMapper,
return result; return result;
} }
/**
* iot线上机器数据插入表,方便分析睡眠
* @param month
* @param day
*/
@Override
public void test44(Integer month, Integer day) {
List<String> dayHourRangeList = getLastDayHourRange(month, day);
String oriDeviceId = "H6V01241050037T";
SaasSleepAnalysisModel analysisModel = saasSleepAnalysisModelService.getOne(new QueryWrapper<SaasSleepAnalysisModel>().lambda()
.orderByDesc(BaseEntity::getCreateBy)
.last("limit 1"));
for (String hourRange : dayHourRangeList) {
String[] hourRangeArray = hourRange.split("~");
List<DeviceOperationLogEntity> deviceOperationLogEntities = productDeviceService.getDeviceLogByTimeRange(oriDeviceId,
"reportProperty", 10000, hourRangeArray[0], hourRangeArray[1]);
if (CollUtil.isEmpty(deviceOperationLogEntities)) {
continue;
}
List<DeviceInfoContentBreathe> deviceInfoContentBreatheList = deviceOperationLogEntities.stream()
.filter(deviceOperationLogEntity -> deviceOperationLogEntity.getType().contains("reportProperty"))
.map(deviceOperationLogEntity -> {
DeviceInfoContentBreathe deviceInfoContentBreathe = JsonUtil.toObj((String) deviceOperationLogEntity.getContent(), DeviceInfoContentBreathe.class);
assert deviceInfoContentBreathe != null;
deviceInfoContentBreathe.setReportTime(formatLongTime(deviceInfoContentBreathe.getTimestamp()));
return deviceInfoContentBreathe;
}).collect(Collectors.toList());
Map<String, List<DeviceInfoContentBreathe>> minuteMap = StreamUtil.groupBy(deviceInfoContentBreatheList, DeviceInfoContentBreathe::getReportTime);
deviceLogService.save(deviceOperationLogEntities);
deviceOperationLogEntities.clear();
TreeMap<String, AnalysisVO> statisticsMap = getPerMinuteData(minuteMap, analysisModel);
if (CollUtil.isNotEmpty(statisticsMap)) {
//测试环境数据 device_minute_info
deviceLogService.saveEntity(statisticsMap, "1765616197461037058");
}
}
}
@Override @Override
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
@TenantIdIgnore @TenantIdIgnore
...@@ -1024,6 +1067,7 @@ public class PlatElderSleepServiceImpl extends ServiceImpl<PlatElderSleepMapper, ...@@ -1024,6 +1067,7 @@ public class PlatElderSleepServiceImpl extends ServiceImpl<PlatElderSleepMapper,
if (CollUtil.isEmpty(totalMap)) { if (CollUtil.isEmpty(totalMap)) {
continue; continue;
} }
// 测试环境数据 device_minute_info totalMap = deviceLogService.getData();
// 没有上报数据的时间,填默认值 // 没有上报数据的时间,填默认值
deviceLogService.fillDefaultData(totalMap, month, day); deviceLogService.fillDefaultData(totalMap, month, day);
// 记录长者不同类型的睡眠时间 // 记录长者不同类型的睡眠时间
......
...@@ -71,7 +71,12 @@ public class PlatRegionSettingServiceImpl extends ServiceImpl<PlatRegionSettingM ...@@ -71,7 +71,12 @@ public class PlatRegionSettingServiceImpl extends ServiceImpl<PlatRegionSettingM
platDeviceOthers = platDeviceOthers.stream().filter(platDeviceOther -> StringUtils.isNotEmpty(platDeviceOther.getAttribute())).collect(Collectors.toList()); platDeviceOthers = platDeviceOthers.stream().filter(platDeviceOther -> StringUtils.isNotEmpty(platDeviceOther.getAttribute())).collect(Collectors.toList());
Map<String,String> map = platDeviceOthers.stream().collect(Collectors.toMap(PlatDeviceOther::getDeviceId,PlatDeviceOther::getAttribute)); Map<String,String> map = platDeviceOthers.stream().collect(Collectors.toMap(PlatDeviceOther::getDeviceId,PlatDeviceOther::getAttribute));
List<PlatRegionSetting> list = new ArrayList<>(); List<PlatRegionSetting> list = new ArrayList<>();
List<PlatRegionSetting> settings = list(new QueryWrapper<PlatRegionSetting>().lambda().eq(PlatRegionSetting::getRoomId, roomId).in(PlatRegionSetting::getDeviceId, listDeviceId));
Map<String, PlatRegionSetting> settingMap = settings.stream().collect(Collectors.toMap(PlatRegionSetting::getDeviceId, Function.identity(), (k1, k2) -> k1));
for (String item : listDeviceId) { for (String item : listDeviceId) {
if (settingMap.containsKey(item)) {
continue;
}
PlatRegionSetting platRegionSetting = new PlatRegionSetting(); PlatRegionSetting platRegionSetting = new PlatRegionSetting();
platRegionSetting.setDeviceId(item); platRegionSetting.setDeviceId(item);
platRegionSetting.setRoomId(roomId); platRegionSetting.setRoomId(roomId);
......
...@@ -57,7 +57,7 @@ ...@@ -57,7 +57,7 @@
</insert> </insert>
<select id="getData" resultType="com.makeit.module.iot.vo.analysis.AnalysisVO"> <select id="getData" resultType="com.makeit.module.iot.vo.analysis.AnalysisVO">
select * from device_minute_info where device_id = '1732647368962203650' order by created_time select * from device_minute_info where device_id = '1765616197461037058' order by created_time
</select> </select>
......
...@@ -19,6 +19,7 @@ import com.makeit.external.strategy.OpenApiBaseStrategy; ...@@ -19,6 +19,7 @@ import com.makeit.external.strategy.OpenApiBaseStrategy;
import com.makeit.external.strategy.OpenApiBaseStrategyFactory; import com.makeit.external.strategy.OpenApiBaseStrategyFactory;
import com.makeit.global.aspect.tenant.TenantIdIgnore; import com.makeit.global.aspect.tenant.TenantIdIgnore;
import com.makeit.module.iot.enums.DeviceState; import com.makeit.module.iot.enums.DeviceState;
import com.makeit.module.iot.service.IotDevicePropertiesOperateService;
import com.makeit.module.iot.vo.DeviceInfo; import com.makeit.module.iot.vo.DeviceInfo;
import com.makeit.module.iot.vo.HeaderInfo; import com.makeit.module.iot.vo.HeaderInfo;
import com.makeit.service.platform.alarm.PlatAlarmConfigService; import com.makeit.service.platform.alarm.PlatAlarmConfigService;
...@@ -41,6 +42,7 @@ import org.springframework.stereotype.Component; ...@@ -41,6 +42,7 @@ import org.springframework.stereotype.Component;
import javax.annotation.Resource; import javax.annotation.Resource;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
...@@ -69,6 +71,7 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -69,6 +71,7 @@ public class PushCallback implements MqttCallbackExtended {
public static final String WRITE_PROPERTY = "WRITE_PROPERTY"; public static final String WRITE_PROPERTY = "WRITE_PROPERTY";
public static final String WRITE_PROPERTY_REPLY = "WRITE_PROPERTY_REPLY"; public static final String WRITE_PROPERTY_REPLY = "WRITE_PROPERTY_REPLY";
public static final String HEARTBEAT = "LOG";
public static final String OPEN_API_KEY_COUNT = "open:api:key:count"; public static final String OPEN_API_KEY_COUNT = "open:api:key:count";
...@@ -98,6 +101,8 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -98,6 +101,8 @@ public class PushCallback implements MqttCallbackExtended {
@Autowired @Autowired
private PlatTenantService platTenantService; private PlatTenantService platTenantService;
@Autowired @Autowired
private IotDevicePropertiesOperateService devicePropertiesOperateService;
@Autowired
private ThreadPoolTaskExecutor taskExecutor; private ThreadPoolTaskExecutor taskExecutor;
@Override @Override
...@@ -109,8 +114,7 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -109,8 +114,7 @@ public class PushCallback implements MqttCallbackExtended {
public void messageArrived(String topic, MqttMessage message) { public void messageArrived(String topic, MqttMessage message) {
try { try {
// 收到消息并设置返回字符串格式 // 收到消息并设置返回字符串格式
String payload = new String(message.getPayload(), "UTF-8"); String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
//logger.info("接收消息内容:payload格式:{}", payload); //logger.info("接收消息内容:payload格式:{}", payload);
// 解析数据 // 解析数据
...@@ -124,6 +128,8 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -124,6 +128,8 @@ public class PushCallback implements MqttCallbackExtended {
handleStatus(device); handleStatus(device);
heartbeat(device);
CompletableFuture.runAsync(() -> { CompletableFuture.runAsync(() -> {
checkAlarm(device); checkAlarm(device);
}, taskExecutor); }, taskExecutor);
...@@ -193,22 +199,35 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -193,22 +199,35 @@ public class PushCallback implements MqttCallbackExtended {
Long timestamp = device.getTimestamp(); Long timestamp = device.getTimestamp();
long currentSecond = timestamp / 1000; long currentSecond = timestamp / 1000;
// 先通过产品名称判断 // 先通过产品名称判断
JSONObject properties = device.getProperties();
if (properties == null) {
return;
}
if (REPORT_PROPERTY.equals(device.getMessageType()) && headers.getProductName().contains("呼吸")) { if (REPORT_PROPERTY.equals(device.getMessageType()) && headers.getProductName().contains("呼吸")) {
if (!properties.containsKey("hr") || !properties.containsKey("br")) {
return;
}
cacheBrDeviceData(device, currentSecond); cacheBrDeviceData(device, currentSecond);
redisTemplate.opsForValue().set(DEVICE_BR_DATA + device.getDeviceId(),JSON.toJSONString(device.getProperties()), redisTemplate.opsForValue().set(DEVICE_BR_DATA + device.getDeviceId(),JSON.toJSONString(properties),
5000, TimeUnit.MILLISECONDS); 5000, TimeUnit.MILLISECONDS);
} }
if (REPORT_PROPERTY.equals(device.getMessageType()) && headers.getProductName().contains("空间")) { if (REPORT_PROPERTY.equals(device.getMessageType()) && headers.getProductName().contains("空间")) {
if (!properties.containsKey("personState")) {
return;
}
cacheSpaceFallDeviceData(DEVICE_SPACE_TEMP_DATA, device, currentSecond); cacheSpaceFallDeviceData(DEVICE_SPACE_TEMP_DATA, device, currentSecond);
redisTemplate.opsForValue().set(DEVICE_SPACE_DATA + device.getDeviceId(),JSON.toJSONString(device.getProperties()), redisTemplate.opsForValue().set(DEVICE_SPACE_DATA + device.getDeviceId(),JSON.toJSONString(properties),
5000, TimeUnit.MILLISECONDS); 5000, TimeUnit.MILLISECONDS);
} }
if (REPORT_PROPERTY.equals(device.getMessageType()) && headers.getProductName().contains("跌倒")) { if (REPORT_PROPERTY.equals(device.getMessageType()) && headers.getProductName().contains("跌倒")) {
if (!properties.containsKey("personState")) {
return;
}
cacheSpaceFallDeviceData(DEVICE_FALL_TEMP_DATA, device, currentSecond); cacheSpaceFallDeviceData(DEVICE_FALL_TEMP_DATA, device, currentSecond);
redisTemplate.opsForValue().set(DEVICE_FALL_DATA + device.getDeviceId(),JSON.toJSONString(device.getProperties()), redisTemplate.opsForValue().set(DEVICE_FALL_DATA + device.getDeviceId(),JSON.toJSONString(properties),
5000, TimeUnit.MILLISECONDS); 5000, TimeUnit.MILLISECONDS);
} }
String deviceId = device.getDeviceId(); String deviceId = device.getDeviceId();
...@@ -220,7 +239,6 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -220,7 +239,6 @@ public class PushCallback implements MqttCallbackExtended {
for (HeaderInfo.Bind binding : bindings) { for (HeaderInfo.Bind binding : bindings) {
try { try {
String iot_tenantId = binding.getId(); String iot_tenantId = binding.getId();
JSONObject properties = device.getProperties();
// 设备上报到第三方,一分钟一次 // 设备上报到第三方,一分钟一次
reportBrData(device, iot_tenantId); reportBrData(device, iot_tenantId);
...@@ -288,6 +306,26 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -288,6 +306,26 @@ public class PushCallback implements MqttCallbackExtended {
} }
/** /**
* 心跳上报
*/
private void heartbeat(DeviceInfo device) {
CompletableFuture.runAsync(() -> {
if (!HEARTBEAT.equals(device.getMessageType())) {
return;
}
String deviceId = device.getDeviceId();
String key = RedisConst.PLAT_IOT_DEVICE_HEARTBEAT_PREFIX + deviceId;
if (RedisUtil.exist(key)) {
return;
}
// 正常应该一分钟上报一次,避免频繁读取设备属性造成压力
RedisUtil.set(key, 1, 55, TimeUnit.SECONDS);
List<String> properties = Lists.newArrayList("sysNetworkType", "sysRssi", "sysCsq");
devicePropertiesOperateService.deviceRead(deviceId, properties);
}, taskExecutor);
}
/**
* 离床预警 * 离床预警
* @param platDevice * @param platDevice
* @param properties * @param properties
......
package com.makeit.shengwang;
import com.alibaba.fastjson.JSONObject;
import com.makeit.common.response.ApiResponseEntity;
import com.makeit.common.response.ApiResponseUtils;
import com.makeit.config.ShengwangProperties;
import com.makeit.enums.redis.RedisConst;
import com.makeit.global.annotation.AuthIgnore;
import com.makeit.global.aspect.tenant.TenantIdIgnore;
import com.makeit.shengwang.agora.dto.WebhookDTO;
import com.makeit.shengwang.agora.service.ShengwangService;
import com.makeit.shengwang.agora.signature.HmacSha;
import com.makeit.utils.redis.RedisUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
@Api(tags = "声网")
@RestController
@RequestMapping("/shengwang")
public class ShengWangController {
@Autowired
private ShengwangService shengwangService;
@Autowired
private ShengwangProperties shengwangProperties;
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@ApiOperation("事件回调")
@AuthIgnore
@TenantIdIgnore
@PostMapping("/notify")
public ApiResponseEntity<Void> notifyCallback(@RequestHeader("Agora-Signature-V2") String signature,
@RequestBody String message) {
log.info("声网RTC频道事件回调:signature:{}, message:{}", signature, message);
WebhookDTO webhookDTO = JSONObject.parseObject(message, WebhookDTO.class);
boolean absent = RedisUtil.setIfAbsent(RedisConst.RTC_EVENT_CALLBACK_PREFIX + webhookDTO.getNoticeId(), 1, 1, TimeUnit.MINUTES);
if (!absent) {
return ApiResponseUtils.success();
}
String hmacSha256 = HmacSha.hmacSha256(message, shengwangProperties.getCallbackSecret());
boolean equals = hmacSha256.equals(signature);
log.info("声网RTC频道事件回调签名校验结果:{}", equals);
if (!equals) {
return ApiResponseUtils.success();
}
JSONObject payload = webhookDTO.getPayload();
String channelName = payload.getString("channelName");
String uid = payload.getString("uid");
Integer platform = payload.getInteger("platform"); // 6:linux
Integer clientType = payload.getInteger("clientType"); // 8:小程序
// 测试事件
if ("test_webhook".equals(channelName) && "12121212".equals(uid)) {
return ApiResponseUtils.success();
}
// 主播离开频道 如果是小程序直接解散
if (webhookDTO.getEventType().equals(104) && platform.equals(6) && clientType.equals(8)) {
CompletableFuture.runAsync(() -> {
shengwangService.disbandChannel(channelName);
}, taskExecutor);
}
return ApiResponseUtils.success();
}
}
...@@ -115,7 +115,16 @@ iot: ...@@ -115,7 +115,16 @@ iot:
clientId: fyxmb5h52iKwE2Hi clientId: fyxmb5h52iKwE2Hi
secureKey: 22fZbnH36wdHn7ZTyKKHraFw233npcez secureKey: 22fZbnH36wdHn7ZTyKKHraFw233npcez
sync: sync:
enable: true enable: false
# 线上iot
#iot:
# url: https://iot.quanthium.com.cn/api/
# uploadUrl: https://saas.qa.insightica.cn/api/saas/device/devicePushLog
# clientId: SGtXMT3nXNjDrexH
# secureKey: SK48ztMZDMEs8FmaPHjGQGmQBA4CjrPt
# sync:
# enable: false
mqtt: mqtt:
username: admin|1693982115969 username: admin|1693982115969
...@@ -160,12 +169,13 @@ shengwang: ...@@ -160,12 +169,13 @@ shengwang:
appId: 883078934ecd4193aa7a62a3cdacd810 appId: 883078934ecd4193aa7a62a3cdacd810
appCertificate: b29be69c9c034120a68f1d5c199d2e74 appCertificate: b29be69c9c034120a68f1d5c199d2e74
channelName: 1 channelName: 1
uid: 0 uid: 10000 #RTM用户id,不同应用请修改
tokenExpirationInSeconds: 3600 tokenExpirationInSeconds: 3600
privilegeExpirationInSeconds: 3600 privilegeExpirationInSeconds: 3600
customerKey: b3b5f44e536a4fc191358926c6716b7b customerKey: b3b5f44e536a4fc191358926c6716b7b
customerSecret: bd81828a133140a58dfb04e9d80eba43 customerSecret: bd81828a133140a58dfb04e9d80eba43
pid: 9851781E9E31453DA3C572A4A4AF9402 pid: 9851781E9E31453DA3C572A4A4AF9402
callbackSecret: bv5SbvvhJ
aliyun: aliyun:
......
...@@ -159,12 +159,13 @@ shengwang: ...@@ -159,12 +159,13 @@ shengwang:
appId: 883078934ecd4193aa7a62a3cdacd810 appId: 883078934ecd4193aa7a62a3cdacd810
appCertificate: b29be69c9c034120a68f1d5c199d2e74 appCertificate: b29be69c9c034120a68f1d5c199d2e74
channelName: 1 channelName: 1
uid: 0 uid: 10001 #RTM用户id,不同应用请修改
tokenExpirationInSeconds: 3600 tokenExpirationInSeconds: 3600
privilegeExpirationInSeconds: 3600 privilegeExpirationInSeconds: 3600
customerKey: b3b5f44e536a4fc191358926c6716b7b customerKey: b3b5f44e536a4fc191358926c6716b7b
customerSecret: bd81828a133140a58dfb04e9d80eba43 customerSecret: bd81828a133140a58dfb04e9d80eba43
pid: 9851781E9E31453DA3C572A4A4AF9402 pid: 9851781E9E31453DA3C572A4A4AF9402
callbackSecret: bv5SbvvhJ
aliyun: aliyun:
oss: oss:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment