Commit d6b5cd43 by huangjy

mqtt协议包修改

parent 4f1c0825
...@@ -3,6 +3,7 @@ package org.jetlinks.protocol.official; ...@@ -3,6 +3,7 @@ package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.device.DeviceConfigKey; import org.jetlinks.core.device.DeviceConfigKey;
...@@ -10,12 +11,20 @@ import org.jetlinks.core.message.DeviceMessage; ...@@ -10,12 +11,20 @@ import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage; import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message; import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*; import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.server.session.DeviceSession; import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.protocol.official.entity.CustomMessage;
import org.jetlinks.protocol.official.entity.CustomReadMessage;
import org.jetlinks.protocol.official.entity.CustomWriteMessage;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
/** /**
* <pre> * <pre>
...@@ -86,11 +95,49 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec { ...@@ -86,11 +95,49 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
if (message instanceof DeviceMessage) { if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message); DeviceMessage deviceMessage = ((DeviceMessage) message);
TopicPayload convertResult;
if (deviceMessage instanceof ReadPropertyMessage) {
CustomReadMessage customSendMessage = new CustomReadMessage();
CustomReadMessage.Header header = new CustomReadMessage.Header();
header.setTimestamp(deviceMessage.getTimestamp());
header.setVersion(deviceMessage.getHeaders().get("_uid").toString());
customSendMessage.setHeader(header);
customSendMessage.setDeviceId(((DeviceMessage) message).getDeviceId());
CustomReadMessage.PayLoad payLoad = new CustomReadMessage.PayLoad();
payLoad.setDeviceId(deviceMessage.getDeviceId());
payLoad.setProperties(((ReadPropertyMessage) deviceMessage).getProperties());
customSendMessage.setPayload(payLoad);
customSendMessage.setTimestamp(deviceMessage.getTimestamp());
customSendMessage.setMessageId(deviceMessage.getMessageId());
convertResult = TopicMessageCodec.encode(mapper, customSendMessage);
}else if (deviceMessage instanceof WritePropertyMessage) {
CustomWriteMessage customSendMessage = new CustomWriteMessage();
CustomWriteMessage.Header header = new CustomWriteMessage.Header();
header.setTimestamp(deviceMessage.getTimestamp());
header.setVersion(deviceMessage.getHeaders().get("_uid").toString());
customSendMessage.setHeader(header);
customSendMessage.setDeviceId(((DeviceMessage) message).getDeviceId());
CustomWriteMessage.PayLoad payLoad = new CustomWriteMessage.PayLoad();
payLoad.setDeviceId(deviceMessage.getDeviceId());
payLoad.setProperties(((WritePropertyMessage) deviceMessage).getProperties());
customSendMessage.setPayload(payLoad);
customSendMessage.setTimestamp(deviceMessage.getTimestamp());
customSendMessage.setMessageId(deviceMessage.getMessageId());
convertResult = TopicMessageCodec.encode(mapper, customSendMessage);
} else {
convertResult = TopicMessageCodec.encode(mapper, deviceMessage);
}
TopicPayload convertResult = TopicMessageCodec.encode(mapper, deviceMessage);
if (convertResult == null) { if (convertResult == null) {
return Mono.empty(); return Mono.empty();
} }
TopicPayload finalConvertResult = convertResult;
return Mono return Mono
.justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf)) .justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf))
.switchIfEmpty(context.getDevice(deviceMessage.getDeviceId()) .switchIfEmpty(context.getDevice(deviceMessage.getDeviceId())
...@@ -100,9 +147,9 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec { ...@@ -100,9 +147,9 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
.map(productId -> SimpleMqttMessage .map(productId -> SimpleMqttMessage
.builder() .builder()
.clientId(deviceMessage.getDeviceId()) .clientId(deviceMessage.getDeviceId())
.topic("/".concat(productId).concat(convertResult.getTopic())) .topic("/".concat(productId).concat(finalConvertResult.getTopic()))
.payloadType(MessagePayloadType.JSON) .payloadType(MessagePayloadType.JSON)
.payload(Unpooled.wrappedBuffer(convertResult.getPayload())) .payload(Unpooled.wrappedBuffer(finalConvertResult.getPayload()))
.build()); .build());
} else { } else {
return Mono.empty(); return Mono.empty();
...@@ -114,7 +161,43 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec { ...@@ -114,7 +161,43 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
@Override @Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) { public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
MqttMessage message = (MqttMessage) context.getMessage(); MqttMessage message = (MqttMessage) context.getMessage();
byte[] payload = message.payloadAsBytes(); ByteBuf buf = message.getPayload();
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
String body;
try {
body = new String(req,"UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
CustomMessage customMessage = JSON.parseObject(body, CustomMessage.class);
String properties = customMessage.getPayload().getProperties();
Map<String,Object> map = new HashMap<>();
String result;
if (message.getTopic().contains("read") || message.getTopic().contains("write")) {
String messageId = customMessage.getHeader().getUuid();
JSONObject jsonObject = JSON.parseObject(properties);
Set<String> keySet = jsonObject.keySet();
Map<String,Object> propertiesMap = new HashMap<>();
for (String key : keySet) {
propertiesMap.put(key, ((JSONObject) jsonObject.get(key)).get("value"));
}
map.put("properties", propertiesMap);
map.put("success",true);
map.put("messageId",messageId);
map.put("timestamp",System.currentTimeMillis());
result = JSON.toJSONString(map);
} else {
map.put("properties", JSON.parseObject(properties));
result = JSON.toJSONString(map);
}
byte[] bytes;
try {
bytes = result.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
byte[] payload = bytes;
return TopicMessageCodec return TopicMessageCodec
.decode(mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payload) .decode(mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payload)
......
...@@ -13,6 +13,8 @@ import org.jetlinks.core.message.state.DeviceStateCheckMessage; ...@@ -13,6 +13,8 @@ import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply; import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.route.MqttRoute; import org.jetlinks.core.route.MqttRoute;
import org.jetlinks.core.utils.TopicUtils; import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.protocol.official.entity.CustomReadMessage;
import org.jetlinks.protocol.official.entity.CustomWriteMessage;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
...@@ -35,7 +37,7 @@ public enum TopicMessageCodec { ...@@ -35,7 +37,7 @@ public enum TopicMessageCodec {
.example("{\"properties\":{\"属性ID\":\"属性值\"}}")), .example("{\"properties\":{\"属性ID\":\"属性值\"}}")),
//读取属性 //读取属性
readProperty("/*/properties/read", readProperty("/*/properties/read",
ReadPropertyMessage.class, CustomReadMessage.class,
route -> route route -> route
.upstream(false) .upstream(false)
.downstream(true) .downstream(true)
...@@ -53,7 +55,7 @@ public enum TopicMessageCodec { ...@@ -53,7 +55,7 @@ public enum TopicMessageCodec {
.example("{\"messageId\":\"消息ID,与读取指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")), .example("{\"messageId\":\"消息ID,与读取指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性 //修改属性
writeProperty("/*/properties/write", writeProperty("/*/properties/write",
WritePropertyMessage.class, CustomWriteMessage.class,
route -> route route -> route
.upstream(false) .upstream(false)
.downstream(true) .downstream(true)
......
package org.jetlinks.protocol.official.entity;
import lombok.Data;
@Data
public class CustomMessage {
private Header header;
private PayLoad payload;
private boolean success;
private String messageId;
private Long timestamp;
@Data
public class Header {
private String version;
private String uuid;
private Long timestamp;
}
@Data
public class PayLoad {
private String deviceId;
private String properties;
}
@Data
public class DeviceAttrInfo {
private String key;
private String value;
}
}
package org.jetlinks.protocol.official.entity;
import lombok.Data;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import java.util.List;
@Data
public class CustomReadMessage extends ReadPropertyMessage {
private Header header;
private PayLoad payload;
private String messageId;
private long timestamp;
@Data
public static class Header {
private String version;
private String uuid;
private Long timestamp;
}
@Data
public static class PayLoad {
private String deviceId;
private List<String> properties;
}
}
package org.jetlinks.protocol.official.entity;
import lombok.Data;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import java.util.Map;
@Data
public class CustomWriteMessage extends ReadPropertyMessage {
private Header header;
private PayLoad payload;
private String messageId;
private long timestamp;
@Data
public static class Header {
private String version;
private String uuid;
private Long timestamp;
}
@Data
public static class PayLoad {
private String deviceId;
private Map<String, Object> properties;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment