Commit 8a23b819 by huangjy

mqtt协议包,新增远程升级

parent cb92d1d0
......@@ -2,6 +2,7 @@ package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
......@@ -12,12 +13,10 @@ import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.protocol.official.entity.CustomMessage;
import org.jetlinks.protocol.official.entity.CustomReadMessage;
import org.jetlinks.protocol.official.entity.CustomWriteMessage;
import org.jetlinks.protocol.official.entity.UserServerInfo;
import org.jetlinks.protocol.official.entity.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
......@@ -127,6 +126,7 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
CustomWriteMessage.PayLoad payLoad = new CustomWriteMessage.PayLoad();
payLoad.setDeviceId(deviceMessage.getDeviceId());
Map<String, Object> properties = ((WritePropertyMessage) deviceMessage).getProperties();
Object result = properties.get("usrServerInfo");
if (result != null) {
......@@ -137,12 +137,47 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
} else {
payLoad.setProperties(properties);
}
customSendMessage.setPayload(payLoad);
customSendMessage.setTimestamp(deviceMessage.getTimestamp());
customSendMessage.setMessageId(deviceMessage.getMessageId());
mapper.configure(JsonGenerator.Feature.ESCAPE_NON_ASCII, true);
convertResult = TopicMessageCodec.encode(mapper, customSendMessage);
} else {
} else if (deviceMessage instanceof UpgradeFirmwareMessage) {
CustomUpgradeFirmwareMessage customUpgradeFirmwareMessage = new CustomUpgradeFirmwareMessage();
CustomUpgradeFirmwareMessage.Header header = new CustomUpgradeFirmwareMessage.Header();
header.setTimestamp(deviceMessage.getTimestamp());
header.setVersion(1);
header.setUuid(deviceMessage.getMessageId());
customUpgradeFirmwareMessage.setHeader(header);
CustomUpgradeFirmwareMessage.PayLoad payLoad = new CustomUpgradeFirmwareMessage.PayLoad();
String url = ((UpgradeFirmwareMessage) deviceMessage).getUrl();
payLoad.setUrl(url);
Map<String, Object> parameters = ((UpgradeFirmwareMessage) deviceMessage).getParameters();
payLoad.setParameters(parameters);
String signMethod = ((UpgradeFirmwareMessage) deviceMessage).getSignMethod();
payLoad.setSignMethod(signMethod);
long size = ((UpgradeFirmwareMessage) deviceMessage).getSize();
payLoad.setSize(size);
String sign = ((UpgradeFirmwareMessage) deviceMessage).getSign();
payLoad.setSign(sign);
String version = ((UpgradeFirmwareMessage) deviceMessage).getVersion();
payLoad.setFirmwareVersion(version);
payLoad.setFirmwareId(((UpgradeFirmwareMessage) deviceMessage).getFirmwareId());
customUpgradeFirmwareMessage.setPayload(payLoad);
customUpgradeFirmwareMessage.setMessageId(deviceMessage.getMessageId());
customUpgradeFirmwareMessage.setDeviceId(deviceMessage.getDeviceId());
convertResult = TopicMessageCodec.encode(mapper, customUpgradeFirmwareMessage);
}else {
convertResult = TopicMessageCodec.encode(mapper, deviceMessage);
}
......@@ -182,11 +217,11 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
CustomMessage customMessage = JSON.parseObject(body, CustomMessage.class);
String properties = customMessage.getPayload().getProperties();
Map<String,Object> map = new HashMap<>();
String result;
Map<String,Object> map = new HashMap<>();
if (message.getTopic().contains("read") || message.getTopic().contains("write")) {
CustomMessage customMessage = JSON.parseObject(body, CustomMessage.class);
String properties = customMessage.getPayload().getProperties();
String messageId = customMessage.getHeader().getUuid();
JSONObject jsonObject = JSON.parseObject(properties);
Set<String> keySet = jsonObject.keySet();
......@@ -199,9 +234,29 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
map.put("messageId",messageId);
map.put("timestamp",System.currentTimeMillis());
result = JSON.toJSONString(map);
} else {
} else if (message.getTopic().contains("firmware/upgrade")) {
CustomFirmwareMessage customFirmwareMessage = JSON.parseObject(body, CustomFirmwareMessage.class);
CustomFirmwareMessage.PayLoad payload = customFirmwareMessage.getPayload();
if (message.getTopic().contains("firmware/upgrade/reply")) {
map.put("success",true);
map.put("messageId",customFirmwareMessage.getHeader().getUuid());
} else {
map.put("success",payload.getRetCode() == 0);
map.put("progress",payload.getProgress());
map.put("complete",payload.getProgress() == 100);
map.put("errorReason",payload.getValue());
map.put("version",payload.getFirmwareVersion());
map.put("firmwareId",payload.getFirmwareId());
}
map.put("timestamp",System.currentTimeMillis());
result = JSON.toJSONString(map);
} else if (message.getTopic().contains("properties/report")){
CustomMessage customMessage = JSON.parseObject(body, CustomMessage.class);
String properties = customMessage.getPayload().getProperties();
map.put("properties", JSON.parseObject(properties));
result = JSON.toJSONString(map);
} else {
result = body;
}
byte[] bytes;
try {
......
......@@ -14,6 +14,7 @@ import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.route.MqttRoute;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.protocol.official.entity.CustomReadMessage;
import org.jetlinks.protocol.official.entity.CustomUpgradeFirmwareMessage;
import org.jetlinks.protocol.official.entity.CustomWriteMessage;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
......@@ -28,58 +29,58 @@ import java.util.function.Function;
public enum TopicMessageCodec {
//上报属性数据
reportProperty("/*/properties/report",
ReportPropertyMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("属性上报")
.description("上报物模型属性数据")
.example("{\"properties\":{\"属性ID\":\"属性值\"}}")),
ReportPropertyMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("属性上报")
.description("上报物模型属性数据")
.example("{\"properties\":{\"属性ID\":\"属性值\"}}")),
//读取属性
readProperty("/*/properties/read",
CustomReadMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("读取属性")
.description("平台下发读取物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":[\"属性ID\"]}")),
route -> route
.upstream(false)
.downstream(true)
.group("读取属性")
.description("平台下发读取物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":[\"属性ID\"]}")),
//读取属性回复
readPropertyReply("/*/properties/read/reply",
ReadPropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("读取属性")
.description("对平台下发的读取属性指令进行响应")
.example("{\"messageId\":\"消息ID,与读取指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
ReadPropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("读取属性")
.description("对平台下发的读取属性指令进行响应")
.example("{\"messageId\":\"消息ID,与读取指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性
writeProperty("/*/properties/write",
CustomWriteMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("修改属性")
.description("平台下发修改物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
route -> route
.upstream(false)
.downstream(true)
.group("修改属性")
.description("平台下发修改物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性回复
writePropertyReply("/*/properties/write/reply",
WritePropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("修改属性")
.description("对平台下发的修改属性指令进行响应")
.example("{\"messageId\":\"消息ID,与修改指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
WritePropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("修改属性")
.description("对平台下发的修改属性指令进行响应")
.example("{\"messageId\":\"消息ID,与修改指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//事件上报
event("/*/event/*",
EventMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("事件上报")
.description("上报物模型事件数据")
.example("{\"data\":{\"key\":\"value\"}}")) {
EventMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("事件上报")
.description("上报物模型事件数据")
.example("{\"data\":{\"key\":\"value\"}}")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{eventId:事件ID}";
......@@ -90,9 +91,9 @@ public enum TopicMessageCodec {
String event = topic[topic.length - 1];
return Mono.from(super.doDecode(mapper, topic, payload))
.cast(EventMessage.class)
.doOnNext(e -> e.setEvent(event))
.cast(DeviceMessage.class);
.cast(EventMessage.class)
.doOnNext(e -> e.setEvent(event))
.cast(DeviceMessage.class);
}
@Override
......@@ -105,33 +106,33 @@ public enum TopicMessageCodec {
//调用功能
functionInvoke("/*/function/invoke",
FunctionInvokeMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("调用功能")
.description("平台下发功能调用指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\"," +
"\"functionId\":\"功能标识\"," +
"\"inputs\":[{\"name\":\"参数名\",\"value\":\"参数值\"}]}")),
FunctionInvokeMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("调用功能")
.description("平台下发功能调用指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\"," +
"\"functionId\":\"功能标识\"," +
"\"inputs\":[{\"name\":\"参数名\",\"value\":\"参数值\"}]}")),
//调用功能回复
functionInvokeReply("/*/function/invoke/reply",
FunctionInvokeMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("调用功能")
.description("设备响应平台下发的功能调用指令")
.example("{\"messageId\":\"消息ID,与下发指令中的messageId一致.\"," +
"\"output\":\"输出结果,格式与物模型中定义的类型一致\"")),
FunctionInvokeMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("调用功能")
.description("设备响应平台下发的功能调用指令")
.example("{\"messageId\":\"消息ID,与下发指令中的messageId一致.\"," +
"\"output\":\"输出结果,格式与物模型中定义的类型一致\"")),
//子设备消息
child("/*/child/*/**",
ChildDeviceMessage.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关上报或者平台下发子设备消息")) {
ChildDeviceMessage.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关上报或者平台下发子设备消息")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
......@@ -174,12 +175,12 @@ public enum TopicMessageCodec {
}
}, //子设备消息回复
childReply("/*/child-reply/*/**",
ChildDeviceMessageReply.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关回复平台下发给子设备的指令结果")) {
ChildDeviceMessageReply.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关回复平台下发给子设备的指令结果")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
......@@ -223,18 +224,18 @@ public enum TopicMessageCodec {
},
//更新标签
updateTag("/*/tags",
UpdateTagMessage.class,
route -> route.upstream(true)
.downstream(false)
.group("更新标签")
.description("更新标签数据")
.example("{\"tags\":{\"key\",\"value\"}}")),
UpdateTagMessage.class,
route -> route.upstream(true)
.downstream(false)
.group("更新标签")
.description("更新标签数据")
.example("{\"tags\":{\"key\",\"value\"}}")),
//注册
register("/*/register", DeviceRegisterMessage.class),
//注销
unregister("/*/unregister", DeviceUnRegisterMessage.class),
//更新固件消息
upgradeFirmware("/*/firmware/upgrade", UpgradeFirmwareMessage.class),
upgradeFirmware("/*/firmware/upgrade", CustomUpgradeFirmwareMessage.class),
//更新固件消息回复
upgradeFirmwareReply("/*/firmware/upgrade/reply", UpgradeFirmwareMessageReply.class),
//更新固件升级进度消息
......@@ -376,6 +377,7 @@ public enum TopicMessageCodec {
return TopicPayload.of(String.join("/", topics), mapper.writeValueAsBytes(message));
}
@SneakyThrows
TopicPayload doEncode(ObjectMapper mapper, DeviceMessage message) {
String[] topics = Arrays.copyOf(pattern, pattern.length);
......
package org.jetlinks.protocol.official.entity;
import lombok.Data;
@Data
public class CustomFirmwareMessage {
private Header header;
private PayLoad payload;
private boolean success;
private String messageId;
private Long timestamp;
@Data
public class Header {
private String version;
private String uuid;
private Long timestamp;
}
@Data
public class PayLoad {
private Integer progress;
private Integer retCode;
private String firmwareVersion;
private String firmwareId;
private String value;
}
@Data
public class DeviceAttrInfo {
private String key;
private String value;
}
}
package org.jetlinks.protocol.official.entity;
import lombok.Data;
import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
import java.util.Map;
@Data
public class CustomUpgradeFirmwareMessage extends UpgradeFirmwareMessage {
private Header header;
private PayLoad payload;
@Data
public static class Header {
private int version;
private String uuid;
private Long timestamp;
}
@Data
public static class PayLoad {
private String url;
private String firmwareVersion;
private String sign;
private String signMethod;
private String firmwareId;
private long size;
private Map<String, Object> parameters;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment