Commit c5305438 by huangjy

mqtt协议包,功能调用toic

parent 13f928ae
...@@ -14,6 +14,8 @@ import org.jetlinks.core.message.DisconnectDeviceMessage; ...@@ -14,6 +14,8 @@ import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message; import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*; import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage; import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionParameter;
import org.jetlinks.core.message.property.ReadPropertyMessage; import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage; import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.protocol.official.entity.*; import org.jetlinks.protocol.official.entity.*;
...@@ -22,6 +24,7 @@ import reactor.core.publisher.Mono; ...@@ -22,6 +24,7 @@ import reactor.core.publisher.Mono;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
...@@ -176,6 +179,30 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec { ...@@ -176,6 +179,30 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
customUpgradeFirmwareMessage.setMessageId(deviceMessage.getMessageId()); customUpgradeFirmwareMessage.setMessageId(deviceMessage.getMessageId());
customUpgradeFirmwareMessage.setDeviceId(deviceMessage.getDeviceId()); customUpgradeFirmwareMessage.setDeviceId(deviceMessage.getDeviceId());
convertResult = TopicMessageCodec.encode(mapper, customUpgradeFirmwareMessage); convertResult = TopicMessageCodec.encode(mapper, customUpgradeFirmwareMessage);
}else if (deviceMessage instanceof FunctionInvokeMessage) {
CustomFunctionInvokeMessage functionInvokeMessage = new CustomFunctionInvokeMessage();
CustomFunctionInvokeMessage.Header header = new CustomFunctionInvokeMessage.Header();
header.setTimestamp(deviceMessage.getTimestamp());
header.setVersion(1);
header.setUuid(deviceMessage.getMessageId());
functionInvokeMessage.setHeader(header);
CustomFunctionInvokeMessage.PayLoad payLoad = new CustomFunctionInvokeMessage.PayLoad();
CustomFunctionInvokeMessage.Inputs inputs = new CustomFunctionInvokeMessage.Inputs();
CustomFunctionInvokeMessage.Value value = new CustomFunctionInvokeMessage.Value();
List<FunctionParameter> functionParameterList = ((FunctionInvokeMessage) deviceMessage).getInputs();
if (functionParameterList != null && !functionParameterList.isEmpty()) {
FunctionParameter functionParameter = functionParameterList.get(0);
inputs.setName(functionParameter.getName());
inputs.setValue(functionParameter.getValue());
}
payLoad.setFunctionId(((FunctionInvokeMessage) deviceMessage).getFunctionId());
payLoad.setInputs(inputs);
functionInvokeMessage.setPayload(payLoad);
functionInvokeMessage.setTimestamp(deviceMessage.getTimestamp());
functionInvokeMessage.setMessageId(deviceMessage.getMessageId());
functionInvokeMessage.setDeviceId(((DeviceMessage) message).getDeviceId());
convertResult = TopicMessageCodec.encode(mapper, functionInvokeMessage);
}else { }else {
convertResult = TopicMessageCodec.encode(mapper, deviceMessage); convertResult = TopicMessageCodec.encode(mapper, deviceMessage);
} }
...@@ -232,7 +259,6 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec { ...@@ -232,7 +259,6 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
} else { } else {
propertiesMap.put(key, value); propertiesMap.put(key, value);
} }
} }
map.put("properties", propertiesMap); map.put("properties", propertiesMap);
map.put("success",true); map.put("success",true);
...@@ -260,17 +286,20 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec { ...@@ -260,17 +286,20 @@ public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
String properties = customMessage.getPayload().getProperties(); String properties = customMessage.getPayload().getProperties();
map.put("properties", JSON.parseObject(properties)); map.put("properties", JSON.parseObject(properties));
result = JSON.toJSONString(map); result = JSON.toJSONString(map);
} else if (message.getTopic().contains("function/invoke")) {
CustomFunctionMessage customFunctionMessage = JSON.parseObject(body, CustomFunctionMessage.class);
CustomFunctionMessage.PayLoad payload = customFunctionMessage.getPayload();
map.put("messageId",customFunctionMessage.getHeader().getUuid());
map.put("timestamp",System.currentTimeMillis());
if (payload.getOutput().getRetCode() == 1) {
map.put("output",payload.getOutput().getValue());
map.put("success",true);
}
result = JSON.toJSONString(map);
} else { } else {
result = body; result = body;
} }
byte[] bytes; byte[] payload = result.getBytes(StandardCharsets.UTF_8);
try {
bytes = result.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
byte[] payload = bytes;
return TopicMessageCodec return TopicMessageCodec
.decode(mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payload) .decode(mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payload)
//如果不能直接解码,可能是其他设备功能 //如果不能直接解码,可能是其他设备功能
......
...@@ -13,6 +13,7 @@ import org.jetlinks.core.message.state.DeviceStateCheckMessage; ...@@ -13,6 +13,7 @@ import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply; import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.route.MqttRoute; import org.jetlinks.core.route.MqttRoute;
import org.jetlinks.core.utils.TopicUtils; import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.protocol.official.entity.CustomFunctionInvokeMessage;
import org.jetlinks.protocol.official.entity.CustomReadMessage; import org.jetlinks.protocol.official.entity.CustomReadMessage;
import org.jetlinks.protocol.official.entity.CustomUpgradeFirmwareMessage; import org.jetlinks.protocol.official.entity.CustomUpgradeFirmwareMessage;
import org.jetlinks.protocol.official.entity.CustomWriteMessage; import org.jetlinks.protocol.official.entity.CustomWriteMessage;
...@@ -106,7 +107,7 @@ public enum TopicMessageCodec { ...@@ -106,7 +107,7 @@ public enum TopicMessageCodec {
//调用功能 //调用功能
functionInvoke("/*/function/invoke", functionInvoke("/*/function/invoke",
FunctionInvokeMessage.class, CustomFunctionInvokeMessage.class,
route -> route route -> route
.upstream(false) .upstream(false)
.downstream(true) .downstream(true)
......
package org.jetlinks.protocol.official.entity;
import lombok.Data;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
@Data
public class CustomFunctionInvokeMessage extends FunctionInvokeMessage {
private CustomFunctionInvokeMessage.Header header;
private CustomFunctionInvokeMessage.PayLoad payload;
@Data
public static class Header {
private int version;
private String uuid;
private Long timestamp;
}
@Data
public static class PayLoad {
private String functionId;
private Inputs inputs;
}
@Data
public static class Inputs {
private String name;
private Object value;
}
@Data
public static class Value {
private String url;
private Long timestamp;
}
}
package org.jetlinks.protocol.official.entity;
import lombok.Data;
@Data
public class CustomFunctionMessage {
private Header header;
private PayLoad payload;
@Data
public static class Header {
private String version;
private String uuid;
private Long timestamp;
}
@Data
public static class PayLoad {
private Output output;
}
@Data
public static class Output {
private Integer retCode;
private String value;
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment