Commit 51a3266d by huangjy

feat,平台订阅设备消息

parent bbcde6d1
......@@ -84,6 +84,16 @@
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>dysmsapi20170525</artifactId>
</dependency>
......
package com.makeit.module.iot.mqtt;
import lombok.Data;
/**
* 接收设备的数据实体类
*/
@Data
public class DeviceInfo {
private String deviceId;
private Long timestamp;
private String messageType;
private HeaderInfo headers;
private String properties;
/**
* {
* "deviceId": "1694547143952007168",
* "headers": {
* "bindings": [{
* "id": "1694915814306185216",
* "type": "org"
* }],
* "deviceName": "设备测试",
* "productName": "测试产品2",
* "productId": "1694546594502377472",
* "_uid": "Iopp3Ep8avkvFYCekKxrwmhDCdJ3y4YO",
* "creatorId": "1199596756811550720",
* "traceparent": "00-c141385bf0a33bd88f7b50e41c2a7d03-76e248a7fa43f731-01"* },
* "messageType": "REPORT_PROPERTY",
* "properties": {
* "test": 555
* } ,
* "timestamp": 1693993159292
* }
*/
}
package com.makeit.module.iot.mqtt;
import lombok.Data;
import java.util.List;
@Data
public class HeaderInfo {
private String deviceName;
private String productName;
private String productId;
private String _uid;
private String creatorId;
private String traceparent;
private List<Bind> bindings;
@Data
public class Bind {
private String id;
private String type;
}
}
package com.makeit.module.iot.mqtt;
import com.makeit.module.iot.service.IotTokenService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties("mqtt")
public class MqttConfig {
@Autowired
private MqttPushClient mqttPushClient;
@Autowired
private IotTokenService iotTokenService;
private String username;
private String password;
private String hostUrl;
private String clientId;
private String defaultTopic;
private int timeout;
private int keepalive;
public MqttConfig() {
super();
}
public MqttConfig(MqttPushClient mqttPushClient, String username, String password, String hostUrl, String clientId,
String defaultTopic, int timeout, int keepalive) {
super();
this.mqttPushClient = mqttPushClient;
this.username = username;
this.password = password;
this.hostUrl = hostUrl;
this.clientId = clientId;
this.defaultTopic = defaultTopic;
this.timeout = timeout;
this.keepalive = keepalive;
}
@Bean
public MqttPushClient getMqttPushClient() {
String iotToken = iotTokenService.getIotToken();
clientId = StringUtils.isNotEmpty(iotToken) ? iotToken : clientId;
mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
// 订阅主题
mqttPushClient.subscribe("/device/*/*/**", 0);
return mqttPushClient;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getHostUrl() {
return hostUrl;
}
public void setHostUrl(String hostUrl) {
this.hostUrl = hostUrl;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getDefaultTopic() {
return defaultTopic;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getKeepalive() {
return keepalive;
}
public void setKeepalive(int keepalive) {
this.keepalive = keepalive;
}
public void setMqttPushClient(MqttPushClient mqttPushClient) {
this.mqttPushClient = mqttPushClient;
}
}
package com.makeit.module.iot.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MqttPushClient {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
private static MqttClient mqttClient;
public static MqttClient getMqttClient() {
return mqttClient;
}
public static void setMqttClient(MqttClient mqttClient) {
MqttPushClient.mqttClient = mqttClient;
}
/**
* 客户端连接
*
* @param host
* @param clientId
* @param username
* @param password
* @param timeout
* @param keepalive
*/
public void connect(String host, String clientId, String username, String password, int timeout, int keepalive) {
MqttClient client;
try {
client = new MqttClient(host, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
MqttPushClient.setMqttClient(client);
try {
client.setCallback(pushCallback);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 发布
*
* @param qos
* @param retained
* @param topic
* @param pushMessage
*/
public void publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mqttTopic = MqttPushClient.getMqttClient().getTopic(topic);
if (mqttTopic == null) {
logger.error("topic not exist");
}
MqttDeliveryToken token;
try {
token = mqttTopic.publish(message);
token.waitForCompletion();
} catch (Exception e) {
e.printStackTrace();
}
}
public void subscribe(String defaultTopic, int qos) {
logger.info("开始订阅主题" + defaultTopic);
try {
MqttPushClient.getMqttClient().subscribe(defaultTopic, qos);
} catch (MqttException e) {
e.printStackTrace();
logger.error(e.getMessage());
}
}
}
package com.makeit.module.iot.mqtt;
import com.alibaba.fastjson.JSON;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class PushCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private MqttConfig mqttConfig;
private static MqttClient client;
@Override
public void connectionLost(Throwable cause) {
logger.info("连接断开,可以重连");
if (client == null || !client.isConnected()) {
mqttConfig.getMqttPushClient();
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// 收到消息并设置返回字符串格式
String payload = new String(message.getPayload(), "UTF-8");
logger.info("接收消息主题:{}, 接收消息QoS:{}", topic, message.getQos());
logger.info("接收消息内容:payload格式:{}", payload);
// 解析数据
DeviceInfo device = JSON.parseObject(payload, DeviceInfo.class);
// todo
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
logger.info("deliveryComplete--------------" + token.isComplete());
}
}
package com.makeit.module.iot.service;
import com.makeit.module.iot.util.HttpRequest;
import com.makeit.module.iot.vo.ResponseMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* iot 获取token服务类
*/
@Component
@Slf4j
public class IotTokenService extends IotCommonService {
/**
* 获取iotToken
*/
public String getIotToken() {
String url = iotUrl + "token";
String body = "{\"expires\": -1}";
HttpRequest request = buildRequest(url, body);
try {
ResponseMessage responseMessage = sendPost(url, request);
if (responseMessage.getStatus() == 200) {
return responseMessage.getResult().toString();
}
log.error("获取token接口失败:{}", responseMessage.getMessage());
} catch (IOException e) {
log.error("调用:{}接口异常:{}", url, e.getMessage());
}
return "";
}
}
......@@ -17,7 +17,9 @@ public class DeviceInstanceEntity {
private String name;
@Schema(description = "设备类型")
@Schema(description = "设备类型 device(\"直连设备\"),\n" +
" childrenDevice(\"网关子设备\"),\n" +
" gateway(\"网关设备\")")
private String deviceType;
@Schema(description = "说明")
......
......@@ -91,4 +91,13 @@ libreOffice: C:\\Program Files\\LibreOffice\\program\\soffice
iot:
url: http://iot.meiqicloud.com/api/
clientId: fyxmb5h52iKwE2Hi
secureKey: 22fZbnH36wdHn7ZTyKKHraFw233npcez
\ No newline at end of file
secureKey: 22fZbnH36wdHn7ZTyKKHraFw233npcez
mqtt:
username: admin|1693982115969
password: 8e3795ef7b5e95869fa8c323b865b3a9
hostUrl: tcp://124.71.33.17:11883
clientId: ab3a2fd694c8c838aba2686df3a80e7b
defaultTopic:
timeout: 10
keepalive: 60
\ No newline at end of file
......@@ -3,6 +3,7 @@ package com.makeit.iotapi;
import com.makeit.module.admin.vo.plat.PlatTenantVO;
import com.makeit.module.iot.service.IotOrgService;
import com.makeit.module.iot.service.IotProductDeviceService;
import com.makeit.module.iot.service.IotTokenService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
......@@ -11,6 +12,8 @@ import org.springframework.boot.test.context.SpringBootTest;
public class IotTest {
@Autowired
private IotTokenService tokenService;
@Autowired
private IotOrgService iotOrgService;
@Autowired
private IotProductDeviceService iotProductDeviceService;
......@@ -25,6 +28,11 @@ public class IotTest {
@Test
void getIotToken() {
tokenService.getIotToken();
}
@Test
void getEquipmentInfo() {
iotProductDeviceService.getEquipmentInfo();
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment