Commit 161d8467 by 罗志长

feat: 小程序语音通话

parent 96edab64
......@@ -7,16 +7,19 @@
- MySql 5.7+ (初始化脚本db/init/init.sql)
- ElasticSearch 6.8.11
- Kibana 6.8.11 (可选)
- GLIBC 2.18(声网RTM SDK需要),使用strings /lib64/libc.so.6 | grep -i '^glibc'命令查看
## 项目打包
- 拉取源码http://git.xmmakeit.com/huangjiay/iot-platform-server.git
- 切换分支(master生产环境,dev测试环境)
- 首次请将lib/agora_rtm.jar依赖安装到本地maven仓库,mvn install:install-file -Dfile=lib/agora_rtm.jar -DgroupId=io.agora.rtm -DartifactId=agora-rtm-sdk -Dversion=1.0 -Dpackaging=jar
- 在项目根目录下执行mvn clean package进行打包
## 部署
- 在服务器上创建目录/opt/iot-platform-server
- 创建lib目录,将lib\libagora_rtm_sdk.so上传至/opt/iot-platform-serve/lib目录下
- 将jar包上传至服务器/opt/iot-platform-server目录下
- 正式环境在/opt/iot-platform-server/config目录下新建application.yaml文件,请注意修改对应配置信息!!!
- application.yaml
......@@ -148,6 +151,7 @@
customerKey: b3b5f44e536a4fc191358926c6716b7b
customerSecret: bd81828a133140a58dfb04e9d80eba43
pid: C567D6F40C6E48E293AA4F9B7AE9BA30
callbackSecret: bv5SbvvhJ
aliyun:
oss:
......@@ -169,12 +173,12 @@
- 测试环境
```sh
nohup java -jar server-web.jar --spring.profiles.active=test &
nohup java -Djava.library.path=lib/ -jar server-web.jar --spring.profiles.active=test &
```
- 正式环境
```sh
nohup java -jar server-web.jar &
nohup java -Djava.library.path=lib/ -jar server-web.jar &
```
No preview for this file type
The file could not be displayed because it is too large.
......@@ -302,6 +302,12 @@
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>io.agora.rtm</groupId>
<artifactId>agora-rtm-sdk</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
</dependencyManagement>
......
......@@ -189,6 +189,11 @@
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
<dependency>
<groupId>io.agora.rtm</groupId>
<artifactId>agora-rtm-sdk</artifactId>
</dependency>
</dependencies>
<profiles>
......
......@@ -19,6 +19,7 @@ public class ShengwangProperties {
private String customerSecret;
private int tokenExpirationInSeconds;
private int privilegeExpirationInSeconds;
private String callbackSecret;
}
......@@ -111,7 +111,7 @@ public class RedisConst {
public static final String PLAT_IOT_DEVICE_FAULT_STATE_PREFIX = "plat:iot:device:faultState:";
public static final String RTC_EVENT_CALLBACK_PREFIX = "rtc:event:callback:";
}
......
package com.makeit.shengwang.agora.dto;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
@Data
public class RTMSendPeerMessageDTO {
@NotBlank
@ApiModelProperty(value = "RTM userId")
private String userId;
@NotBlank
@ApiModelProperty(value = "设备id")
private String deviceId;
@NotBlank
@ApiModelProperty(value = "RTC token")
private String rtcToken;
@NotBlank
@ApiModelProperty(value = "RTM token")
private String rtmToken;
@NotBlank
@ApiModelProperty(value = "频道")
private String channel;
}
package com.makeit.shengwang.agora.dto;
import com.alibaba.fastjson.JSONObject;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
@Data
public class WebhookDTO {
@NotBlank
@ApiModelProperty(value = "通知 ID,标识来自声网业务服务器的一次事件通知")
private String noticeId;
@NotNull
@ApiModelProperty(value = "业务 ID。值为 1 表示实时通信业务")
private Integer productId;
@NotNull
@ApiModelProperty(value = "通知的事件类型")
private Integer eventType;
@NotNull
@ApiModelProperty(value = "声网消息服务器向你的服务器发送事件通知的 Unix 时间戳 (ms)。通知重试时该值会更新")
private Long notifyMs;
@ApiModelProperty(value = "会话 ID")
private String sid;
@NotNull
@ApiModelProperty(value = "通知事件的具体内容。payload 因 eventType 而异")
private JSONObject payload;
}
package com.makeit.shengwang.agora.rtm;
import io.agora.rtm.*;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@Slf4j
public class RtmInstance {
public static final String APP_ID = "883078934ecd4193aa7a62a3cdacd810";
private static final RtmClient rtmClient;
static {
rtmClient = init();
}
private RtmInstance() {
}
private static RtmClient init() {
RtmClient rtmClient = null;
try {
rtmClient = RtmClient.createInstance(APP_ID, new RtmClientListener() {
@Override
public void onConnectionStateChanged(int state, int reason) {
log.info("on connection state changed to " + state + " reason: " + reason);
}
@Override
public void onMessageReceived(RtmMessage rtmMessage, String peerId) {
String msg = rtmMessage.getText();
log.info("Receive message: " + msg + " from " + peerId);
}
@Override
public void onTokenExpired() {
log.info("on token expired");
}
@Override
public void onTokenPrivilegeWillExpire() {
}
@Override
public void onPeersOnlineStatusChanged(Map<String, Integer> peersStatus) {
}
});
} catch (Exception e) {
log.error("RtmClient初始化失败", e);
throw new RuntimeException("Need to check rtm sdk init process");
}
rtmClient.setLogFilter(RtmClient.LOG_FILTER_OFF);
log.info("rtm sdk init success");
return rtmClient;
}
public static void logout() {
rtmClient.logout(new ResultCallback<Void>() {
@Override
public void onSuccess(Void responseInfo) {
log.info("logout success!");
}
@Override
public void onFailure(ErrorInfo errorInfo) {
log.info("logout failed, error info = {}", errorInfo);
}
});
}
public static void sendPeerMessage(String token, String userId, String dst, String message) {
rtmClient.login(token, userId, new ResultCallback<Void>() {
@Override
public void onSuccess(Void responseInfo) {
log.info("{} login success!", userId);
RtmMessage msg = rtmClient.createMessage();
msg.setText(message);
rtmClient.sendMessageToPeer(dst, msg, new ResultCallback<Void>() {
@Override
public void onSuccess(Void aVoid) {
log.info("send message to peer success, message: {}!", message);
logout();
}
@Override
public void onFailure(ErrorInfo errorInfo) {
log.info("send message to peer failed, error info = {}", errorInfo);
}
});
}
@Override
public void onFailure(ErrorInfo errorInfo) {
log.info("{} login failure: {}!", userId, errorInfo);
}
});
}
}
\ No newline at end of file
package com.makeit.shengwang.agora.service;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import com.makeit.config.ShengwangProperties;
import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO;
import com.makeit.shengwang.agora.media.RtcTokenBuilder2;
import com.makeit.shengwang.agora.rtm.RtmTokenBuilder2;
import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
......@@ -13,8 +18,12 @@ import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@Slf4j
@Component
public class ShengwangService {
......@@ -59,4 +68,34 @@ public class ShengwangService {
platAlarmCallDeviceVO.setChannelName(channelName);
return platAlarmCallDeviceVO;
}
public void disbandChannel(String channel) {
Map<String, Object> params = new HashMap<>();
params.put("appid", shengwangProperties.getAppId());
params.put("cname", channel);
params.put("privileges", Lists.newArrayList("join_channel"));
params.put("time", 0);
HttpResponse response = HttpRequest.post("http://api.sd-rtn.com/dev/v1/kicking-rule")
.header("Authorization", getAuthorizationHeader())
.body(JSONObject.toJSONString(params)).execute();
String body = response.body();
log.info("解散rtc频道 {}, resp:{}", channel, body);
}
public String getAuthorizationHeader() {
final String customerKey = shengwangProperties.getCustomerKey();
final String customerSecret = shengwangProperties.getCustomerSecret();
String plainCredentials = customerKey + ":" + customerSecret;
String base64Credentials = new String(Base64.getEncoder().encode(plainCredentials.getBytes()));
return "Basic " + base64Credentials;
}
public static void main(String[] args) {
final String customerKey = "b3b5f44e536a4fc191358926c6716b7b";
final String customerSecret = "bd81828a133140a58dfb04e9d80eba43";
String plainCredentials = customerKey + ":" + customerSecret;
String base64Credentials = new String(Base64.getEncoder().encode(plainCredentials.getBytes()));
System.out.println("base64Credentials = Basic " + base64Credentials);
}
}
package com.makeit.shengwang.agora.signature;
import javax.crypto.Mac;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
public class HmacSha {
// 将加密后的字节数组转换成字符串
public static String bytesToHex(byte[] bytes) {
StringBuffer sb = new StringBuffer();
for (int i = 0; i < bytes.length; i++) {
String hex = Integer.toHexString(bytes[i] & 0xFF);
if (hex.length() < 2) {
sb.append(0);
}
sb.append(hex);
}
return sb.toString();
}
// HMAC/SHA256 加密,返回加密后的字符串
public static String hmacSha256(String message, String secret) {
try {
SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(
"utf-8"), "HmacSHA256");
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(signingKey);
byte[] rawHmac = mac.doFinal(message.getBytes("utf-8"));
return bytesToHex(rawHmac);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static void main(String[] args) {
String request_body = "{\"noticeId\":\"884fecc29d7a477383bb68b355fc39aa\",\"productId\":1,\"eventType\":104,\"notifyMs\":1709882033147,\"payload\":{\"channelName\":\"test_webhook\",\"platform\":1,\"reason\":1,\"ts\":1560853385,\"uid\":12121212}}";
String secret = "secret";
System.out.println(hmacSha256(request_body, secret));
}
}
......@@ -8,9 +8,11 @@ import com.makeit.common.response.ApiResponseEntity;
import com.makeit.common.response.ApiResponseUtils;
import com.makeit.dto.wechat.device.PlatChildDeviceDTO;
import com.makeit.dto.wechat.device.PlatDeviceSetupDTO;
import com.makeit.global.annotation.AuthIgnore;
import com.makeit.global.aspect.tenant.TenantIdIgnore;
import com.makeit.service.platform.device.PlatDeviceService;
import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO;
import com.makeit.shengwang.agora.dto.RTMSendPeerMessageDTO;
import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO;
import com.makeit.vo.platform.device.PlatChildDeviceListVO;
import com.makeit.vo.platform.device.PlatDeviceViewVO;
......@@ -67,5 +69,21 @@ public class PlatDeviceChildrenController {
return ApiResponseUtils.success(platDeviceService.callingDevice(dto));
}
@ApiOperation("设备获取登录RTMtoken")
@PostMapping("getDeviceRtmToken")
@AuthIgnore
@TenantIdIgnore
public ApiResponseEntity<PlatAlarmCallDeviceVO> getDeviceRtmToken(@RequestBody PlatCallingDeviceDTO dto) {
return ApiResponseUtils.success(platDeviceService.getDeviceRtmToken(dto));
}
@ApiOperation("发送设备消息")
@PostMapping("sendPeerMessage")
@TenantIdIgnore
public ApiResponseEntity<Void> sendPeerMessage(@Validated @RequestBody RTMSendPeerMessageDTO dto) {
platDeviceService.deviceJoinChannel(dto);
return ApiResponseUtils.success();
}
}
......@@ -2,7 +2,6 @@ package com.makeit.module.controller.wechat.device;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import com.makeit.common.dto.BaseIdDTO;
import com.makeit.common.page.PageReqDTO;
import com.makeit.common.page.PageVO;
......@@ -14,27 +13,26 @@ import com.makeit.dto.wechat.device.PlatDeviceEditWechatDTO;
import com.makeit.dto.wechat.device.PlatDeviceNetAttrWechatDTO;
import com.makeit.dto.wechat.device.PlatDeviceSetupDTO;
import com.makeit.entity.platform.auth.PlatOrg;
import com.makeit.entity.platform.auth.PlatRole;
import com.makeit.entity.platform.device.PlatDevice;
import com.makeit.enums.HeaderConst;
import com.makeit.global.annotation.AuthIgnore;
import com.makeit.global.aspect.tenant.TenantIdIgnore;
import com.makeit.module.iot.vo.DeviceProperties;
import com.makeit.service.platform.auth.PlatOrgService;
import com.makeit.service.platform.auth.PlatRoleService;
import com.makeit.service.platform.device.PlatDeviceService;
import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO;
import com.makeit.shengwang.agora.dto.RTMSendPeerMessageDTO;
import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO;
import com.makeit.utils.data.validate.CollectionUtils;
import com.makeit.utils.old.StringUtils;
import com.makeit.utils.request.RequestUtil;
import com.makeit.vo.platform.device.PlatDeviceListVO;
import com.makeit.vo.platform.device.PlatDeviceViewVO;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.stream.Collectors;
......@@ -133,5 +131,21 @@ public class PlatDeviceWechatController {
return ApiResponseUtils.success(platDeviceService.callingDevice(dto));
}
@ApiOperation("设备获取登录RTMtoken")
@PostMapping("getDeviceRtmToken")
@AuthIgnore
@TenantIdIgnore
public ApiResponseEntity<PlatAlarmCallDeviceVO> getDeviceRtmToken(@RequestBody PlatCallingDeviceDTO dto) {
return ApiResponseUtils.success(platDeviceService.getDeviceRtmToken(dto));
}
@ApiOperation("发送设备消息")
@PostMapping("sendPeerMessage")
@TenantIdIgnore
public ApiResponseEntity<Void> sendPeerMessage(@Validated @RequestBody RTMSendPeerMessageDTO dto) {
platDeviceService.deviceJoinChannel(dto);
return ApiResponseUtils.success();
}
}
......@@ -14,6 +14,7 @@ import com.makeit.entity.platform.device.PlatDevice;
import com.makeit.module.iot.vo.DeviceInfo;
import com.makeit.module.iot.vo.DeviceProperties;
import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO;
import com.makeit.shengwang.agora.dto.RTMSendPeerMessageDTO;
import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO;
import com.makeit.vo.platform.device.PlatChildDeviceListVO;
import com.makeit.vo.platform.device.PlatDeviceListVO;
......@@ -105,4 +106,10 @@ public interface PlatDeviceService extends IService<PlatDevice> {
* 故障处理
*/
void handleFault(DeviceInfo deviceInfo);
/**
* 通过RTM向设备发送点对点消息(告知设备加入RTC频道)
*/
void deviceJoinChannel(RTMSendPeerMessageDTO dto);
}
......@@ -64,8 +64,10 @@ import com.makeit.service.platform.space.PlatSpaceService;
import com.makeit.service.saas.PlatTenantService;
import com.makeit.service.saas.SaasPidManageService;
import com.makeit.shengwang.agora.dto.PlatCallingDeviceDTO;
import com.makeit.shengwang.agora.dto.RTMSendPeerMessageDTO;
import com.makeit.shengwang.agora.http.ShengwangHttpUtil;
import com.makeit.shengwang.agora.media.RtcTokenBuilder2;
import com.makeit.shengwang.agora.rtm.RtmInstance;
import com.makeit.shengwang.agora.service.ShengwangService;
import com.makeit.shengwang.agora.vo.PlatAlarmCallDeviceVO;
import com.makeit.utils.DeviceCacheUtil;
......@@ -1339,4 +1341,23 @@ public class PlatDeviceServiceImpl extends ServiceImpl<PlatDeviceMapper, PlatDev
}
}
@Override
public void deviceJoinChannel(RTMSendPeerMessageDTO dto) {
PlatDevice platDevice = getOne(new QueryWrapper<PlatDevice>().lambda()
.eq(PlatDevice::getOriDeviceId, dto.getDeviceId())
.last("limit 1"));
if (!com.makeit.module.iot.enums.DeviceState.online.getValue().equals(platDevice.getStatus())) {
throw new RuntimeException("设备:" + dto.getDeviceId() + "未在线");
}
Map<String, Object> params = new HashMap<>();
params.put("from", 2); // 1.web 2.uniapp 3.server 4.device
params.put("type", 1); // 1.进入 2.挂断
Map<String, Object> data = new HashMap<>();
data.put("sn", dto.getDeviceId());
data.put("token", dto.getRtcToken());
data.put("channel", dto.getChannel());
params.put("data", data);
RtmInstance.sendPeerMessage(dto.getRtmToken(), dto.getUserId(), dto.getDeviceId(), JSONObject.toJSONString(params));
}
}
package com.makeit.shengwang;
import com.alibaba.fastjson.JSONObject;
import com.makeit.common.response.ApiResponseEntity;
import com.makeit.common.response.ApiResponseUtils;
import com.makeit.config.ShengwangProperties;
import com.makeit.enums.redis.RedisConst;
import com.makeit.global.annotation.AuthIgnore;
import com.makeit.global.aspect.tenant.TenantIdIgnore;
import com.makeit.shengwang.agora.dto.WebhookDTO;
import com.makeit.shengwang.agora.service.ShengwangService;
import com.makeit.shengwang.agora.signature.HmacSha;
import com.makeit.utils.redis.RedisUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@Slf4j
@Api(tags = "声网")
@RestController
@RequestMapping("/shengwang")
public class ShengWangController {
@Autowired
private ShengwangService shengwangService;
@Autowired
private ShengwangProperties shengwangProperties;
@Autowired
private ThreadPoolTaskExecutor taskExecutor;
@ApiOperation("事件回调")
@AuthIgnore
@TenantIdIgnore
@PostMapping("/notify")
public ApiResponseEntity<Void> notifyCallback(@RequestHeader("Agora-Signature-V2") String signature,
@RequestBody String message) {
log.info("声网RTC频道事件回调:signature:{}, message:{}", signature, message);
WebhookDTO webhookDTO = JSONObject.parseObject(message, WebhookDTO.class);
boolean absent = RedisUtil.setIfAbsent(RedisConst.RTC_EVENT_CALLBACK_PREFIX + webhookDTO.getNoticeId(), 1, 1, TimeUnit.MINUTES);
if (!absent) {
return ApiResponseUtils.success();
}
String hmacSha256 = HmacSha.hmacSha256(message, shengwangProperties.getCallbackSecret());
boolean equals = hmacSha256.equals(signature);
log.info("声网RTC频道事件回调签名校验结果:{}", equals);
if (!equals) {
return ApiResponseUtils.success();
}
JSONObject payload = webhookDTO.getPayload();
String channelName = payload.getString("channelName");
String uid = payload.getString("uid");
Integer platform = payload.getInteger("platform"); // 6:linux
Integer clientType = payload.getInteger("clientType"); // 8:小程序
// 测试事件
if ("test_webhook".equals(channelName) && "12121212".equals(uid)) {
return ApiResponseUtils.success();
}
// 主播离开频道 如果是小程序直接解散
if (webhookDTO.getEventType().equals(104) && platform.equals(6) && clientType.equals(8)) {
CompletableFuture.runAsync(() -> {
shengwangService.disbandChannel(channelName);
}, taskExecutor);
}
return ApiResponseUtils.success();
}
}
......@@ -115,7 +115,7 @@ iot:
clientId: fyxmb5h52iKwE2Hi
secureKey: 22fZbnH36wdHn7ZTyKKHraFw233npcez
sync:
enable: true
enable: false
mqtt:
username: admin|1693982115969
......@@ -166,6 +166,7 @@ shengwang:
customerKey: b3b5f44e536a4fc191358926c6716b7b
customerSecret: bd81828a133140a58dfb04e9d80eba43
pid: 9851781E9E31453DA3C572A4A4AF9402
callbackSecret: bv5SbvvhJ
aliyun:
......
......@@ -165,6 +165,7 @@ shengwang:
customerKey: b3b5f44e536a4fc191358926c6716b7b
customerSecret: bd81828a133140a58dfb04e9d80eba43
pid: 9851781E9E31453DA3C572A4A4AF9402
callbackSecret: bv5SbvvhJ
aliyun:
oss:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment