Commit afde87e9 by huangjy

mqtt协议包初始化

parents
Showing with 4334 additions and 0 deletions
## JetLinks 官方设备接入协议
JetLinks官方实现的设备接入协议,可用于参考实现自定义协议开发.
注意: 本协议仅用于参考自定义协议开发,在实际使用中请根据不同的场景进行调整.如认证方式,加密等.
### MQTT
[查看TOPIC说明](http://doc.jetlinks.cn/dev-guide/jetlinks-protocol-support.html)
用户名密码可以使用[生成工具进行生成](http://doc.jetlinks.cn/basics-guide/mqtt-auth-generator.html)
### HTTP
HTTP接入时需要使用`Bearer`
认证,URL和[MQTT的接入Topic]((http://doc.jetlinks.cn/basics-guide/jetlinks-protocol-support.html))一致.
```http request
POST /{productId}/{deviceId}/properties/report
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"properties":{
"temp":38.5
}
}
```
### TCP
报文格式说明:
第0-4字节对应的32位整型值为接下来报文的长度,
后续为报文数据,
具体报文格式见: [二进制格式说明](binary-protocol.md)
创建连接后第一个数据包需要发送[认证包](binary-protocol.md#0x01-online-首次连接),
密钥需要在`产品-设备接入`或者`设备详情`中进行配置
### UDP
报文格式说明:
`0`字节表示认证类型,目前固定为0x00.
`1-n`字节为`密钥信息`,编码使用`STRING`见: [数据类型定义](binary-protocol.md#数据类型)
密钥需要在`产品-设备接入`或者`设备详情`中进行配置
后续为报文数据,具体报文格式见: [二进制格式说明](binary-protocol.md)
UDP无需发送认证包,但是需要每个报文中都包含密钥信息.
除了ACK以外,其他平台下发的指令也都会包含认证密钥信息,用于设备侧校验请求.
### 测试
可以使用[模拟器](http://github.com/jetlinks/device-simulator)进行模拟测试
## 头信息
| 字节序 | 类型 | 字段 |
|:----:|---------|:-----------:|
| 0 | INT8 | 消息类型 |
| 1-8 | INT64 | UTC时间戳 |
| 9-11 | INT16 | 消息序号 |
| 12-n | STRING | 设备ID |
| ... | MESSAGE | 消息类型对应的编码规则 |
## 数据类型
所有数据类型均采用`大端`编码
| Byte | Type | 编码规则 |
|:----:|:-------:|---------------------------------------------|
| 0x00 | NULL | 0x01 |
| 0x01 | BOOLEAN | 1字节 0x00为false 其他为true |
| 0x02 | INT8 | 1字节 (byte) |
| 0x03 | INT16 | 2字节整型 (short) |
| 0x04 | INT32 | 4字节整型 (int) |
| 0x05 | INT64 | 8字节整型 (long) |
| 0x06 | UINT8 | 1字节无符号整型 |
| 0x07 | UINT16 | 2字节无符号整型 |
| 0x08 | UINT32 | 4字节无符号整型 |
| 0x09 | FLOAT | 4字节 IEEE 754浮点数 |
| 0x0a | DOUBLE | 8字节 IEEE 754浮点数 |
| 0x0b | STRING | 前`2字节无符号整型`表示字符串长度,接下来长度的字节为字符串内容,UTF8编码 |
| 0x0c | BINARY | 前`2字节无符号整型`表示数据长度,接下来长度的字节为数据内容 |
| 0x0d | ARRAY | 前`2字节无符号整型`表述数组长度,接下来根据后续报文类型来解析元素 |
| 0x0e | OBJECT | 前`2字节无符号整型`表述对象字段长度,接下来根据后续报文类型来解析key value |
## 消息类型定义
| Byte | Type | 说明 |
|:----:|:-------------------|--------|
| 0x00 | keepalive | 心跳 |
| 0x01 | online | 首次连接 |
| 0x02 | ack | 应答 |
| 0x03 | reportProperty | 上报属性 |
| 0x04 | readProperty | 读取属性 |
| 0x05 | readPropertyReply | 读取属性回复 |
| 0x06 | writeProperty | 修改属性 |
| 0x07 | writePropertyReply | 修改属性回复 |
| 0x08 | function | 功能调用 |
| 0x09 | functionReply | 功能调用回复 |
### 0x00 keepalive 心跳
[ 0x00 ]
### 0x01 online 首次连接
[ 0x01,STRING:密钥信息 ]
### 0x02 ack 应答
[ 0x02,应答码 ]
应答码: 0x00:ok , 0x01: 未认证, 0x02: 不支持.
### 0x03 reportProperty 上报属性
[ 0x03,属性数据:OBJECT类型 ]
### 0x04 readProperty 读取属性
[ 0x04,属性列表:ARRAY类型 ]
### 0x05 readPropertyReply 读取属性回复
读取成功:
[ 0x05,`0x01`,属性数据:OBJECT类型 ]
读取失败:
[ 0x05,`0x00`,错误码:动态类型,错误消息:动态类型 ]
`动态读取`表示类型不确定,根据对应的`数据类型`来定义类型.
如: 无错误信息
[ 0x05,0x00,`0x00`,`0x00` ]
`INT8(0x02)`类型错误码:`0x04`
[ 0x05,0x00,`0x02,0x04`,0x00 ]
TODO 更多消息类型
\ No newline at end of file
No preview for this file type
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-official-protocol</artifactId>
<version>3.0.0</version>
<name>JetLinks</name>
<url>https://jetlinks.org</url>
<inceptionYear>2019</inceptionYear>
<description>JetLinks 物联网平台</description>
<licenses>
<license>
<name>The Apache License, Version 2.0</name>
<url>https://www.apache.org/licenses/LICENSE-2.0.txt</url>
</license>
</licenses>
<developers>
<developer>
<name>zhouhao</name>
<email>i@hsweb.me</email>
<roles>
<role>Owner</role>
</roles>
<timezone>+8</timezone>
<url>https://github.com/zhou-hao</url>
</developer>
</developers>
<scm>
<connection>scm:git:https://github.com/jetlinks/jetlinks-official-protocol.git</connection>
<developerConnection>scm:git:https://github.com/jetlinks/jetlinks-official-protocol.git</developerConnection>
<url>https://github.com/jetlinks/jetlinks-official-protocol</url>
<tag>${project.version}</tag>
</scm>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.build.locales>zh_CN</project.build.locales>
<java.version>1.8</java.version>
<project.build.jdk>${java.version}</project.build.jdk>
<spring.boot.version>2.2.8.RELEASE</spring.boot.version>
<hsweb.framework.version>4.0.3</hsweb.framework.version>
<hsweb.expands.version>3.0.2</hsweb.expands.version>
<reactor.version>2020.0.6</reactor.version>
</properties>
<profiles>
<profile>
<id>release</id>
<build>
<plugins>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.3</version>
<extensions>true</extensions>
<configuration>
<serverId>sonatype-releases</serverId>
<nexusUrl>https://oss.sonatype.org/</nexusUrl>
<autoReleaseAfterClose>true</autoReleaseAfterClose>
<stagingProgressTimeoutMinutes>120</stagingProgressTimeoutMinutes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
<configuration>
<autoVersionSubmodules>true</autoVersionSubmodules>
<useReleaseProfile>false</useReleaseProfile>
<releaseProfiles>release</releaseProfiles>
<goals>deploy</goals>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>1.5</version>
<executions>
<execution>
<id>sign-artifacts</id>
<phase>verify</phase>
<goals>
<goal>sign</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<version>2.9.1</version>
<configuration>
<additionalparam>-Xdoclint:none</additionalparam>
</configuration>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<distributionManagement>
<repository>
<id>sonatype-releases</id>
<name>sonatype repository</name>
<url>https://oss.sonatype.org/service/local/staging/deploy/maven2</url>
</repository>
<snapshotRepository>
<id>sonatype-snapshots</id>
<name>Nexus Snapshot Repository</name>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
</snapshotRepository>
</distributionManagement>
</profile>
<profile>
<id>all-in-one</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- 设置主类 -->
<mainClass>org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec</mainClass>
</transformer>
</transformers>
<!-- <artifactSet>-->
<!-- <includes>-->
<!-- &lt;!&ndash; 添加需要打包在一起的第三方依赖信息 &ndash;&gt;-->
<!-- &lt;!&ndash; <include>com.domain:*</include> &ndash;&gt;-->
<!-- </includes>-->
<!-- </artifactSet>-->
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<version>2.4</version>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar-no-fork</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.jetlinks</groupId>
<artifactId>jetlinks-supports</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.eclipse.californium</groupId>
<artifactId>californium-core</artifactId>
<version>3.5.0</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.18</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>4.3.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.fasterxml.jackson</groupId>
<artifactId>jackson-bom</artifactId>
<version>2.11.4</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<distributionManagement>
<repository>
<id>releases</id>
<name>Nexus Release Repository</name>
<url>https://nexus.jetlinks.cn/content/repositories/releases/</url>
</repository>
<snapshotRepository>
<id>snapshots</id>
<name>Nexus Snapshot Repository</name>
<url>https://nexus.jetlinks.cn/content/repositories/snapshots/</url>
</snapshotRepository>
</distributionManagement>
<repositories>
<repository>
<id>hsweb-nexus</id>
<name>Nexus Release Repository</name>
<url>https://nexus.jetlinks.cn/content/groups/public/</url>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
</snapshots>
</repository>
<repository>
<id>aliyun-nexus</id>
<name>aliyun</name>
<url>https://maven.aliyun.com/nexus/content/groups/public/</url>
</repository>
</repositories>
</project>
package org.jetlinks.protocol.official;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.*;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@Slf4j
public abstract class AbstractCoapDeviceMessageCodec implements DeviceMessageCodec {
protected abstract Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response);
protected String getPath(CoapMessage message){
String path = message.getPath();
if (!path.startsWith("/")) {
path = "/" + path;
}
return path;
}
protected String getDeviceId(CoapMessage message){
String deviceId = message.getStringOption(2100).orElse(null);
String[] paths = TopicMessageCodec.removeProductPath(getPath(message));
if (StringUtils.isEmpty(deviceId) && paths.length > 1) {
deviceId = paths[1];
}
return deviceId;
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof CoapExchangeMessage) {
CoapExchangeMessage exchangeMessage = ((CoapExchangeMessage) context.getMessage());
AtomicBoolean alreadyReply = new AtomicBoolean();
Consumer<Object> responseHandler = (resp) -> {
if (alreadyReply.compareAndSet(false, true)) {
if (resp instanceof CoAP.ResponseCode) {
exchangeMessage.getExchange().respond(((CoAP.ResponseCode) resp));
}
if (resp instanceof String) {
exchangeMessage.getExchange().respond(((String) resp));
}
if (resp instanceof byte[]) {
exchangeMessage.getExchange().respond(CoAP.ResponseCode.CONTENT, ((byte[]) resp));
}
}
};
return this
.decode(exchangeMessage, context, responseHandler)
.doOnComplete(() -> responseHandler.accept(CoAP.ResponseCode.CREATED))
.doOnError(error -> {
log.error("decode coap message error", error);
responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST);
})
.switchIfEmpty(Mono.fromRunnable(() -> responseHandler.accept(CoAP.ResponseCode.BAD_REQUEST)));
}
if (context.getMessage() instanceof CoapMessage) {
return decode(((CoapMessage) context.getMessage()), context, resp -> {
log.info("skip response coap request:{}", resp);
});
}
return Flux.empty();
}
@Nonnull
@Override
public Publisher<? extends EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.empty();
}
}
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.protocol.official.functional.TimeSyncRequest;
import org.jetlinks.protocol.official.functional.TimeSyncResponse;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.Optional;
import java.util.function.Function;
/**
* 功能性的topic,不和平台交互
*/
public enum FunctionalTopicHandlers {
//同步时间
timeSync("/*/*/time-sync") {
@SneakyThrows
@SuppressWarnings("all")
Mono<DeviceMessage> doHandle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender) {
TopicPayload topicPayload = new TopicPayload();
topicPayload.setTopic(String.join("/", topic) + "/reply");
TimeSyncRequest msg = mapper.readValue(payload, TimeSyncRequest.class);
TimeSyncResponse response = TimeSyncResponse.of(msg.getMessageId(), System.currentTimeMillis());
topicPayload.setPayload(mapper.writeValueAsBytes(response));
//直接回复给设备
return sender
.apply(topicPayload)
.then(Mono.empty());
}
};
FunctionalTopicHandlers(String topic) {
this.pattern = topic.split("/");
}
private final String[] pattern;
abstract Publisher<DeviceMessage> doHandle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender);
public static Publisher<DeviceMessage> handle(DeviceOperator device,
String[] topic,
byte[] payload,
ObjectMapper mapper,
Function<TopicPayload, Mono<Void>> sender) {
return Mono
.justOrEmpty(fromTopic(topic))
.flatMapMany(handler -> handler.doHandle(device, topic, payload, mapper, sender));
}
static Optional<FunctionalTopicHandlers> fromTopic(String[] topic) {
for (FunctionalTopicHandlers value : values()) {
if (TopicUtils.match(value.pattern, topic)) {
return Optional.of(value);
}
}
return Optional.empty();
}
}
package org.jetlinks.protocol.official;
import org.apache.commons.codec.digest.DigestUtils;
import org.jetlinks.core.Value;
import org.jetlinks.core.defaults.Authenticator;
import org.jetlinks.core.device.*;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.concurrent.TimeUnit;
public class JetLinksAuthenticator implements Authenticator {
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) {
MqttAuthenticationRequest mqtt = ((MqttAuthenticationRequest) request);
return registry
.getDevice(mqtt.getClientId())
.flatMap(device -> authenticate(request, device));
}
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator deviceOperation) {
if (request instanceof MqttAuthenticationRequest) {
MqttAuthenticationRequest mqtt = ((MqttAuthenticationRequest) request);
// secureId|timestamp
String username = mqtt.getUsername();
// md5(secureId|timestamp|secureKey)
String password = mqtt.getPassword();
String requestSecureId;
try {
String[] arr = username.split("[|]");
if (arr.length <= 1) {
return Mono.just(AuthenticationResponse.error(401, "用户名格式错误"));
}
requestSecureId = arr[0];
long time = Long.parseLong(arr[1]);
//和设备时间差大于5分钟则认为无效
/* if (Math.abs(System.currentTimeMillis() - time) > TimeUnit.MINUTES.toMillis(5)) {
return Mono.just(AuthenticationResponse.error(401, "设备时间不同步"));
}*/
return deviceOperation.getConfigs("secureId", "secureKey")
.map(conf -> {
String secureId = conf.getValue("secureId").map(Value::asString).orElse(null);
String secureKey = conf.getValue("secureKey").map(Value::asString).orElse(null);
//签名
String digest = DigestUtils.md5Hex(username + "|" + secureKey);
if (requestSecureId.equals(secureId) && digest.equals(password)) {
return AuthenticationResponse.success(deviceOperation.getDeviceId());
} else {
return AuthenticationResponse.error(401, "密钥错误");
}
});
} catch (NumberFormatException e) {
return Mono.just(AuthenticationResponse.error(401, "用户名格式错误"));
}
}
return Mono.just(AuthenticationResponse.error(400, "不支持的授权类型:" + request));
}
}
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.OptionNumberRegistry;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.function.Consumer;
@Slf4j
public class JetLinksCoapDTLSDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
private static final DefaultConfigMetadata coapDTLSConfig = new DefaultConfigMetadata(
"CoAP DTLS配置",
"使用CoAP DTLS 进行数据上报需要先进行签名认证获取token.\n" +
"之后上报数据需要在Option中携带token信息. \n" +
"自定义Option: 2110,sign ; 2111,token ")
.add("secureKey", "密钥", "认证签名密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.CoAP_DTLS;
}
public Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
if (context.getDevice() == null) {
return Flux.empty();
}
return Flux.defer(() -> {
String path = getPath(message);
String deviceId = getDeviceId(message);
String sign = message.getStringOption(2110).orElse(null);
String token = message.getStringOption(2111).orElse(null);
byte[] payload = message.payloadAsBytes();
boolean cbor = message
.getStringOption(OptionNumberRegistry.CONTENT_FORMAT)
.map(MediaType::valueOf)
.map(MediaType.APPLICATION_CBOR::includes)
.orElse(false);
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
if (StringUtils.isEmpty(deviceId)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
// TODO: 2021/7/30 移到 FunctionalTopicHandlers
if (path.endsWith("/request-token")) {
//认证
return context
.getDevice(deviceId)
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMap(device -> device
.getConfig("secureKey")
.flatMap(sk -> {
String secureKey = sk.asString();
if (!verifySign(secureKey, deviceId, payload, sign)) {
response.accept(CoAP.ResponseCode.BAD_REQUEST);
return Mono.empty();
}
String newToken = IDGenerator.MD5.generate();
return device
.setConfig("coap-token", newToken)
.doOnSuccess(success -> {
JSONObject json = new JSONObject();
json.put("token", newToken);
response.accept(json.toJSONString());
});
}))
.then(Mono.empty());
}
if (StringUtils.isEmpty(token)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return context
.getDevice(deviceId)
.flatMapMany(device -> device
.getSelfConfig("coap-token")
.switchIfEmpty(Mono.fromRunnable(() -> response.accept(CoAP.ResponseCode.UNAUTHORIZED)))
.flatMapMany(value -> {
String tk = value.asString();
if (!token.equals(tk)) {
response.accept(CoAP.ResponseCode.UNAUTHORIZED);
return Mono.empty();
}
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
//如果不能直接解码,可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(device,
path.split("/"),
payload,
objectMapper,
reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload()))));
}))
.doOnComplete(() -> response.accept(CoAP.ResponseCode.CREATED))
.doOnError(error -> {
log.error("decode coap message error", error);
response.accept(CoAP.ResponseCode.BAD_REQUEST);
});
});
}
protected boolean verifySign(String secureKey, String deviceId, byte[] payload, String sign) {
//验证签名
byte[] secureKeyBytes = secureKey.getBytes();
byte[] signPayload = Arrays.copyOf(payload, payload.length + secureKeyBytes.length);
System.arraycopy(secureKeyBytes, 0, signPayload, 0, secureKeyBytes.length);
if (StringUtils.isEmpty(secureKey) || !DigestUtils.md5Hex(signPayload).equalsIgnoreCase(sign)) {
log.info("device [{}] coap sign [{}] error, payload:{}", deviceId, sign, payload);
return false;
}
return true;
}
}
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.OptionNumberRegistry;
import org.jetlinks.core.Value;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.CoapMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.DeviceConfigScope;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.cipher.Ciphers;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.function.Consumer;
@Slf4j
public class JetLinksCoapDeviceMessageCodec extends AbstractCoapDeviceMessageCodec {
public static final DefaultConfigMetadata coapConfig = new DefaultConfigMetadata(
"CoAP认证配置",
"使用CoAP进行数据上报时,需要对数据进行加密:" +
"encrypt(payload,secureKey);")
.add("encAlg", "加密算法", "加密算法", new EnumType()
.addElement(EnumType.Element.of("AES", "AES加密(ECB,PKCS#5)", "加密模式:ECB,填充方式:PKCS#5")), DeviceConfigScope.product)
.add("secureKey", "密钥", "16位密钥KEY", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.CoAP;
}
protected Flux<DeviceMessage> decode(CoapMessage message, MessageDecodeContext context, Consumer<Object> response) {
String path = getPath(message);
String deviceId = getDeviceId(message);
boolean cbor = message
.getStringOption(OptionNumberRegistry.CONTENT_FORMAT)
.map(MediaType::valueOf)
.map(MediaType.APPLICATION_CBOR::includes)
.orElse(false);
ObjectMapper objectMapper = cbor ? ObjectMappers.CBOR_MAPPER : ObjectMappers.JSON_MAPPER;
return context
.getDevice(deviceId)
.flatMapMany(device -> device
.getConfigs("encAlg", "secureKey")
.flatMapMany(configs -> {
Ciphers ciphers = configs
.getValue("encAlg")
.map(Value::asString)
.flatMap(Ciphers::of)
.orElse(Ciphers.AES);
String secureKey = configs.getValue("secureKey").map(Value::asString).orElse(null);
byte[] payload = ciphers.decrypt(message.payloadAsBytes(), secureKey);
//解码
return TopicMessageCodec
.decode(objectMapper, TopicMessageCodec.removeProductPath(path), payload)
//如果不能直接解码,可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(device,
path.split("/"),
payload,
objectMapper,
reply -> Mono.fromRunnable(() -> response.accept(reply.getPayload()))));
}));
}
}
package org.jetlinks.protocol.official;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.server.session.DeviceSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Map;
/**
* <pre>
* 下行Topic:
* 读取设备属性: /{productId}/{deviceId}/properties/read
* 修改设备属性: /{productId}/{deviceId}/properties/write
* 调用设备功能: /{productId}/{deviceId}/function/invoke
*
* //网关设备
* 读取子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/read
* 修改子设备属性: /{productId}/{deviceId}/child/{childDeviceId}/properties/write
* 调用子设备功能: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke
*
* 上行Topic:
* 读取属性回复: /{productId}/{deviceId}/properties/read/reply
* 修改属性回复: /{productId}/{deviceId}/properties/write/reply
* 调用设备功能: /{productId}/{deviceId}/function/invoke/reply
* 上报设备事件: /{productId}/{deviceId}/event/{eventId}
* 上报设备属性: /{productId}/{deviceId}/properties/report
* 上报设备派生物模型: /{productId}/{deviceId}/metadata/derived
*
* //网关设备
* 子设备上线消息: /{productId}/{deviceId}/child/{childDeviceId}/connected
* 子设备下线消息: /{productId}/{deviceId}/child/{childDeviceId}/disconnect
* 读取子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/read/reply
* 修改子设备属性回复: /{productId}/{deviceId}/child/{childDeviceId}/properties/write/reply
* 调用子设备功能回复: /{productId}/{deviceId}/child/{childDeviceId}/function/invoke/reply
* 上报子设备事件: /{productId}/{deviceId}/child/{childDeviceId}/event/{eventId}
* 上报子设备派生物模型: /{productId}/{deviceId}/child/{childDeviceId}/metadata/derived
*
* </pre>
* 基于jet links 的消息编解码器
*
* @author zhouhao
* @since 1.0.0
*/
@Slf4j
public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
private final Transport transport;
private final ObjectMapper mapper;
public JetLinksMqttDeviceMessageCodec(Transport transport) {
this.transport = transport;
this.mapper = ObjectMappers.JSON_MAPPER;
}
public JetLinksMqttDeviceMessageCodec() {
this(DefaultTransport.MQTT);
}
@Override
public Transport getSupportTransport() {
return transport;
}
@Nonnull
public Mono<MqttMessage> encode(@Nonnull MessageEncodeContext context) {
return Mono.defer(() -> {
Message message = context.getMessage();
if (message instanceof DisconnectDeviceMessage) {
return ((ToDeviceMessageContext) context)
.disconnect()
.then(Mono.empty());
}
if (message instanceof DeviceMessage) {
DeviceMessage deviceMessage = ((DeviceMessage) message);
TopicPayload convertResult = TopicMessageCodec.encode(mapper, deviceMessage);
if (convertResult == null) {
return Mono.empty();
}
return Mono
.justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf))
.switchIfEmpty(context.getDevice(deviceMessage.getDeviceId())
.flatMap(device -> device.getSelfConfig(DeviceConfigKey.productId))
)
.defaultIfEmpty("null")
.map(productId -> SimpleMqttMessage
.builder()
.clientId(deviceMessage.getDeviceId())
.topic("/".concat(productId).concat(convertResult.getTopic()))
.payloadType(MessagePayloadType.JSON)
.payload(Unpooled.wrappedBuffer(convertResult.getPayload()))
.build());
} else {
return Mono.empty();
}
});
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
MqttMessage message = (MqttMessage) context.getMessage();
byte[] payload = message.payloadAsBytes();
return TopicMessageCodec
.decode(mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payload)
//如果不能直接解码,可能是其他设备功能
.switchIfEmpty(FunctionalTopicHandlers
.handle(context.getDevice(),
message.getTopic().split("/"),
payload,
mapper,
reply -> doReply(context, reply)))
;
}
private Mono<Void> doReply(MessageCodecContext context, TopicPayload reply) {
if (context instanceof FromDeviceMessageContext) {
return ((FromDeviceMessageContext) context)
.getSession()
.send(SimpleMqttMessage
.builder()
.topic(reply.getTopic())
.payload(reply.getPayload())
.build())
.then();
} else if (context instanceof ToDeviceMessageContext) {
return ((ToDeviceMessageContext) context)
.sendToDevice(SimpleMqttMessage
.builder()
.topic(reply.getTopic())
.payload(reply.getPayload())
.build())
.then();
}
return Mono.empty();
}
}
package org.jetlinks.protocol.official;
import org.jetlinks.core.defaults.CompositeProtocolSupport;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.route.HttpRoute;
import org.jetlinks.core.route.WebsocketRoute;
import org.jetlinks.core.spi.ProtocolSupportProvider;
import org.jetlinks.core.spi.ServiceContext;
import org.jetlinks.protocol.official.http.JetLinksHttpDeviceMessageCodec;
import org.jetlinks.protocol.official.tcp.TcpDeviceMessageCodec;
import org.jetlinks.protocol.official.udp.UDPDeviceMessageCodec;
import org.jetlinks.supports.official.JetLinksDeviceMetadataCodec;
import org.springframework.http.HttpMethod;
import org.springframework.http.MediaType;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class JetLinksProtocolSupportProvider implements ProtocolSupportProvider {
private static final DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata(
"MQTT认证配置"
, "MQTT认证时需要的配置,mqtt用户名,密码算法:\n" +
"username=secureId|timestamp\n" +
"password=md5(secureId|timestamp|secureKey)\n" +
"\n" +
"timestamp为时间戳,与服务时间不能相差5分钟")
.add("secureId", "secureId", "密钥ID", new StringType())
.add("secureKey", "secureKey", "密钥KEY", new PasswordType());
@Override
public Mono<CompositeProtocolSupport> create(ServiceContext context) {
return Mono.defer(() -> {
CompositeProtocolSupport support = new CompositeProtocolSupport();
support.setId("jetlinks.v3.0");
support.setName("JetLinks V3.0");
support.setDescription("JetLinks Protocol Version 3.0");
support.addRoutes(DefaultTransport.MQTT, Arrays
.stream(TopicMessageCodec.values())
.map(TopicMessageCodec::getRoute)
.filter(Objects::nonNull)
.collect(Collectors.toList())
);
support.setDocument(DefaultTransport.MQTT,
"document-mqtt.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
support.addRoutes(DefaultTransport.HTTP, Stream
.of(TopicMessageCodec.reportProperty,
TopicMessageCodec.event,
TopicMessageCodec.online,
TopicMessageCodec.offline)
.map(TopicMessageCodec::getRoute)
.filter(route -> route != null && route.isUpstream())
.map(route -> HttpRoute
.builder()
.address(route.getTopic())
.group(route.getGroup())
.contentType(MediaType.APPLICATION_JSON)
.method(HttpMethod.POST)
.description(route.getDescription())
.example(route.getExample())
.build())
.collect(Collectors.toList())
);
support.setDocument(DefaultTransport.HTTP,
"document-http.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
support.addAuthenticator(DefaultTransport.MQTT, new JetLinksAuthenticator());
support.setMetadataCodec(new JetLinksDeviceMetadataCodec());
support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig);
//TCP
support.addConfigMetadata(DefaultTransport.TCP, TcpDeviceMessageCodec.tcpConfig);
support.addMessageCodecSupport(new TcpDeviceMessageCodec());
support.setDocument(DefaultTransport.TCP,
"document-tcp.md",
JetLinksProtocolSupportProvider.class.getClassLoader());
//UDP
support.addConfigMetadata(DefaultTransport.UDP, UDPDeviceMessageCodec.udpConfig);
support.addMessageCodecSupport(new UDPDeviceMessageCodec());
//MQTT
support.addMessageCodecSupport(new JetLinksMqttDeviceMessageCodec());
//HTTP
support.addConfigMetadata(DefaultTransport.HTTP, JetLinksHttpDeviceMessageCodec.httpConfig);
support.addMessageCodecSupport(new JetLinksHttpDeviceMessageCodec());
//Websocket
JetLinksHttpDeviceMessageCodec codec = new JetLinksHttpDeviceMessageCodec(DefaultTransport.WebSocket);
support.addMessageCodecSupport(codec);
support.addAuthenticator(DefaultTransport.WebSocket, codec);
support.addRoutes(
DefaultTransport.WebSocket,
Collections.singleton(
WebsocketRoute
.builder()
.path("/{productId:产品ID}/{productId:设备ID}/socket")
.description("通过Websocket接入平台")
.build()
));
//CoAP
support.addConfigMetadata(DefaultTransport.CoAP, JetLinksCoapDeviceMessageCodec.coapConfig);
support.addMessageCodecSupport(new JetLinksCoapDeviceMessageCodec());
return Mono.just(support);
});
}
}
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.springframework.http.converter.json.Jackson2ObjectMapperBuilder;
public class ObjectMappers {
public static final ObjectMapper JSON_MAPPER;
public static final ObjectMapper CBOR_MAPPER;
static {
JSON_MAPPER = Jackson2ObjectMapperBuilder
.json()
.build()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
;
CBOR_MAPPER = Jackson2ObjectMapperBuilder
.cbor()
.build()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
}
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.firmware.*;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.core.message.state.DeviceStateCheckMessage;
import org.jetlinks.core.message.state.DeviceStateCheckMessageReply;
import org.jetlinks.core.route.MqttRoute;
import org.jetlinks.core.utils.TopicUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.StringJoiner;
import java.util.function.Function;
public enum TopicMessageCodec {
//上报属性数据
reportProperty("/*/properties/report",
ReportPropertyMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("属性上报")
.description("上报物模型属性数据")
.example("{\"properties\":{\"属性ID\":\"属性值\"}}")),
//读取属性
readProperty("/*/properties/read",
ReadPropertyMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("读取属性")
.description("平台下发读取物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":[\"属性ID\"]}")),
//读取属性回复
readPropertyReply("/*/properties/read/reply",
ReadPropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("读取属性")
.description("对平台下发的读取属性指令进行响应")
.example("{\"messageId\":\"消息ID,与读取指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性
writeProperty("/*/properties/write",
WritePropertyMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("修改属性")
.description("平台下发修改物模型属性数据指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//修改属性回复
writePropertyReply("/*/properties/write/reply",
WritePropertyMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("修改属性")
.description("对平台下发的修改属性指令进行响应")
.example("{\"messageId\":\"消息ID,与修改指令中的ID一致.\",\"properties\":{\"属性ID\":\"属性值\"}}")),
//事件上报
event("/*/event/*",
EventMessage.class,
route -> route
.upstream(true)
.downstream(false)
.group("事件上报")
.description("上报物模型事件数据")
.example("{\"data\":{\"key\":\"value\"}}")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{eventId:事件ID}";
}
@Override
Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String event = topic[topic.length - 1];
return Mono.from(super.doDecode(mapper, topic, payload))
.cast(EventMessage.class)
.doOnNext(e -> e.setEvent(event))
.cast(DeviceMessage.class);
}
@Override
void refactorTopic(String[] topics, DeviceMessage message) {
super.refactorTopic(topics, message);
EventMessage event = ((EventMessage) message);
topics[topics.length - 1] = String.valueOf(event.getEvent());
}
},
//调用功能
functionInvoke("/*/function/invoke",
FunctionInvokeMessage.class,
route -> route
.upstream(false)
.downstream(true)
.group("调用功能")
.description("平台下发功能调用指令")
.example("{\"messageId\":\"消息ID,回复时需要一致.\"," +
"\"functionId\":\"功能标识\"," +
"\"inputs\":[{\"name\":\"参数名\",\"value\":\"参数值\"}]}")),
//调用功能回复
functionInvokeReply("/*/function/invoke/reply",
FunctionInvokeMessageReply.class,
route -> route
.upstream(true)
.downstream(false)
.group("调用功能")
.description("设备响应平台下发的功能调用指令")
.example("{\"messageId\":\"消息ID,与下发指令中的messageId一致.\"," +
"\"output\":\"输出结果,格式与物模型中定义的类型一致\"")),
//子设备消息
child("/*/child/*/**",
ChildDeviceMessage.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关上报或者平台下发子设备消息")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
topic[topic.length - 2] = "{childDeviceId:子设备ID}";
}
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
_topic[0] = "";// topic以/开头所有第一位是空白
return TopicMessageCodec
.decode(mapper, _topic, payload)
.map(childMsg -> {
ChildDeviceMessage msg = new ChildDeviceMessage();
msg.setDeviceId(topic[1]);
msg.setChildDeviceMessage(childMsg);
msg.setTimestamp(childMsg.getTimestamp());
msg.setMessageId(childMsg.getMessageId());
return msg;
});
}
@Override
protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
ChildDeviceMessage deviceMessage = ((ChildDeviceMessage) message);
DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage());
TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage);
String[] childTopic = payload.getTopic().split("/");
String[] topic = new String[topics.length + childTopic.length - 3];
//合并topic
System.arraycopy(topics, 0, topic, 0, topics.length - 1);
System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1);
refactorTopic(topic, message);
payload.setTopic(String.join("/", topic));
return payload;
}
}, //子设备消息回复
childReply("/*/child-reply/*/**",
ChildDeviceMessageReply.class,
route -> route
.upstream(true)
.downstream(true)
.group("子设备消息")
.description("网关回复平台下发给子设备的指令结果")) {
@Override
protected void transMqttTopic(String[] topic) {
topic[topic.length - 1] = "{#:子设备相应操作的topic}";
topic[topic.length - 2] = "{childDeviceId:子设备ID}";
}
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
String[] _topic = Arrays.copyOfRange(topic, 2, topic.length);
_topic[0] = "";// topic以/开头所有第一位是空白
return TopicMessageCodec
.decode(mapper, _topic, payload)
.map(childMsg -> {
ChildDeviceMessageReply msg = new ChildDeviceMessageReply();
msg.setDeviceId(topic[1]);
msg.setChildDeviceMessage(childMsg);
msg.setTimestamp(childMsg.getTimestamp());
msg.setMessageId(childMsg.getMessageId());
return msg;
});
}
@Override
protected TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
ChildDeviceMessageReply deviceMessage = ((ChildDeviceMessageReply) message);
DeviceMessage childMessage = ((DeviceMessage) deviceMessage.getChildDeviceMessage());
TopicPayload payload = TopicMessageCodec.encode(mapper, childMessage);
String[] childTopic = payload.getTopic().split("/");
String[] topic = new String[topics.length + childTopic.length - 3];
//合并topic
System.arraycopy(topics, 0, topic, 0, topics.length - 1);
System.arraycopy(childTopic, 1, topic, topics.length - 2, childTopic.length - 1);
refactorTopic(topic, message);
payload.setTopic(String.join("/", topic));
return payload;
}
},
//更新标签
updateTag("/*/tags",
UpdateTagMessage.class,
route -> route.upstream(true)
.downstream(false)
.group("更新标签")
.description("更新标签数据")
.example("{\"tags\":{\"key\",\"value\"}}")),
//注册
register("/*/register", DeviceRegisterMessage.class),
//注销
unregister("/*/unregister", DeviceUnRegisterMessage.class),
//更新固件消息
upgradeFirmware("/*/firmware/upgrade", UpgradeFirmwareMessage.class),
//更新固件消息回复
upgradeFirmwareReply("/*/firmware/upgrade/reply", UpgradeFirmwareMessageReply.class),
//更新固件升级进度消息
upgradeProcessFirmware("/*/firmware/upgrade/progress", UpgradeFirmwareProgressMessage.class),
//拉取固件
requestFirmware("/*/firmware/pull", RequestFirmwareMessage.class),
//拉取固件更新回复
requestFirmwareReply("/*/firmware/pull/reply", RequestFirmwareMessageReply.class),
//上报固件版本
reportFirmware("/*/firmware/report", ReportFirmwareMessage.class),
//读取固件回复
readFirmware("/*/firmware/read", ReadFirmwareMessage.class),
//读取固件回复
readFirmwareReply("/*/firmware/read/reply", ReadFirmwareMessageReply.class),
//派生物模型上报
derivedMetadata("/*/metadata/derived", DerivedMetadataMessage.class),
//透传设备消息
direct("/*/direct", DirectDeviceMessage.class) {
@Override
public Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
DirectDeviceMessage message = new DirectDeviceMessage();
message.setDeviceId(topic[1]);
message.setPayload(payload);
return Mono.just(message);
}
},
//断开连接消息
disconnect("/*/disconnect", DisconnectDeviceMessage.class),
//断开连接回复
disconnectReply("/*/disconnect/reply", DisconnectDeviceMessageReply.class),
//上线
online("/*/online", DeviceOnlineMessage.class, builder -> builder
.upstream(true)
.group("状态管理")
.description("设备上线")),
//离线
offline("/*/offline", DeviceOfflineMessage.class, builder -> builder
.upstream(true)
.group("状态管理")
.description("设备离线")),
//日志
log("/*/log", DeviceLogMessage.class),
//状态检查
stateCheck("/*/state-check", DeviceStateCheckMessage.class),
stateCheckReply("/*/state-check/reply", DeviceStateCheckMessageReply.class),
;
TopicMessageCodec(String topic,
Class<? extends DeviceMessage> type,
Function<MqttRoute.Builder, MqttRoute.Builder> routeCustom) {
this.pattern = topic.split("/");
this.type = type;
this.route = routeCustom.apply(toRoute()).build();
}
TopicMessageCodec(String topic,
Class<? extends DeviceMessage> type) {
this.pattern = topic.split("/");
this.type = type;
this.route = null;
}
private final String[] pattern;
private final MqttRoute route;
private final Class<? extends DeviceMessage> type;
protected void transMqttTopic(String[] topic) {
}
@SneakyThrows
private MqttRoute.Builder toRoute() {
String[] topics = new String[pattern.length];
System.arraycopy(pattern, 0, topics, 0, pattern.length);
topics[0] = "{productId:产品ID}";
topics[1] = "{deviceId:设备ID}";
transMqttTopic(topics);
StringJoiner joiner = new StringJoiner("/", "/", "");
for (String topic : topics) {
joiner.add(topic);
}
return MqttRoute
.builder(joiner.toString())
.qos(1);
}
public MqttRoute getRoute() {
return route;
}
public static Flux<DeviceMessage> decode(ObjectMapper mapper, String[] topics, byte[] payload) {
return Mono
.justOrEmpty(fromTopic(topics))
.flatMapMany(topicMessageCodec -> topicMessageCodec.doDecode(mapper, topics, payload));
}
public static Flux<DeviceMessage> decode(ObjectMapper mapper, String topic, byte[] payload) {
return decode(mapper, topic.split("/"), payload);
}
public static TopicPayload encode(ObjectMapper mapper, DeviceMessage message) {
return fromMessage(message)
.orElseThrow(() -> new UnsupportedOperationException("unsupported message:" + message.getMessageType()))
.doEncode(mapper, message);
}
static Optional<TopicMessageCodec> fromTopic(String[] topic) {
for (TopicMessageCodec value : values()) {
if (TopicUtils.match(value.pattern, topic)) {
return Optional.of(value);
}
}
return Optional.empty();
}
static Optional<TopicMessageCodec> fromMessage(DeviceMessage message) {
for (TopicMessageCodec value : values()) {
if (value.type == message.getClass()) {
return Optional.of(value);
}
}
return Optional.empty();
}
Publisher<DeviceMessage> doDecode(ObjectMapper mapper, String[] topic, byte[] payload) {
return Mono
.fromCallable(() -> {
DeviceMessage message = mapper.readValue(payload, type);
FastBeanCopier.copy(Collections.singletonMap("deviceId", topic[1]), message);
return message;
});
}
@SneakyThrows
TopicPayload doEncode(ObjectMapper mapper, String[] topics, DeviceMessage message) {
refactorTopic(topics, message);
return TopicPayload.of(String.join("/", topics), mapper.writeValueAsBytes(message));
}
@SneakyThrows
TopicPayload doEncode(ObjectMapper mapper, DeviceMessage message) {
String[] topics = Arrays.copyOf(pattern, pattern.length);
return doEncode(mapper, topics, message);
}
void refactorTopic(String[] topics, DeviceMessage message) {
topics[1] = message.getDeviceId();
}
/**
* 移除topic中的产品信息,topic第一个层为产品ID,在解码时,不需要此信息,所以需要移除之.
*
* @param topic topic
* @return 移除后的topic
*/
public static String[] removeProductPath(String topic) {
if (!topic.startsWith("/")) {
topic = "/" + topic;
}
String[] topicArr = topic.split("/");
String[] topics = Arrays.copyOfRange(topicArr, 1, topicArr.length);
topics[0] = "";
return topics;
}
}
package org.jetlinks.protocol.official;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class TopicPayload {
private String topic;
private byte[] payload;
}
package org.jetlinks.protocol.official.binary;
public enum AckCode {
ok,
noAuth,
unsupportedMessage
}
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.HeaderKey;
public class BinaryAcknowledgeDeviceMessage implements BinaryMessage<AcknowledgeDeviceMessage> {
public static final HeaderKey<String> codeHeader = HeaderKey.of("code", AckCode.ok.name());
private AcknowledgeDeviceMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.ack;
}
@Override
public void read(ByteBuf buf) {
message = new AcknowledgeDeviceMessage();
AckCode code = AckCode.values()[buf.readUnsignedByte()];
message.addHeader(codeHeader, code.name());
}
@Override
public void write(ByteBuf buf) {
AckCode code = AckCode.valueOf(this.message.getHeaderOrDefault(codeHeader));
buf.writeByte(code.ordinal());
}
@Override
public void setMessage(AcknowledgeDeviceMessage message) {
this.message = message;
}
@Override
public AcknowledgeDeviceMessage getMessage() {
return message;
}
}
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.HeaderKey;
/**
*
*/
public class BinaryDeviceOnlineMessage implements BinaryMessage<DeviceOnlineMessage> {
public static final HeaderKey<String> loginToken = HeaderKey.of("token", null);
private DeviceOnlineMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.online;
}
@Override
public void read(ByteBuf buf) {
message = new DeviceOnlineMessage();
message.addHeader(loginToken, (String) DataType.STRING.read(buf));
}
@Override
public void write(ByteBuf buf) {
DataType.STRING
.write(
buf, message.getHeader(loginToken).orElse("")
);
}
@Override
public void setMessage(DeviceOnlineMessage message) {
this.message = message;
}
@Override
public DeviceOnlineMessage getMessage() {
return message;
}
}
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.event.EventMessage;
/**
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryEventMessage implements BinaryMessage<EventMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.event;
}
private EventMessage message;
@Override
public void read(ByteBuf buf) {
message = new EventMessage();
message.setEvent((String) DataType.STRING.read(buf));
message.setData(DataType.OBJECT.read(buf));
}
@Override
public void write(ByteBuf buf) {
DataType.STRING.write(buf,message.getEvent());
DataType.OBJECT.write(buf, message.getData());
}
@Override
public void setMessage(EventMessage message) {
this.message = message;
}
@Override
public EventMessage getMessage() {
return message;
}
}
\ No newline at end of file
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionParameter;
import java.util.Map;
import java.util.stream.Collectors;
public class BinaryFunctionInvokeMessage implements BinaryMessage<FunctionInvokeMessage> {
private FunctionInvokeMessage message;
@Override
public BinaryMessageType getType() {
return BinaryMessageType.function;
}
@Override
public void read(ByteBuf buf) {
message = new FunctionInvokeMessage();
message.setFunctionId((String) DataType.STRING.read(buf));
@SuppressWarnings("all")
Map<String, Object> params = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setInputs(
params
.entrySet()
.stream()
.map(e -> new FunctionParameter(e.getKey(), e.getValue()))
.collect(Collectors.toList())
);
}
@Override
public void write(ByteBuf buf) {
DataType.STRING.write(buf,message.getFunctionId());
DataType.OBJECT.write(buf,message.inputsToMap());
}
@Override
public void setMessage(FunctionInvokeMessage message) {
this.message = message;
}
@Override
public FunctionInvokeMessage getMessage() {
return message;
}
}
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryFunctionInvokeMessageReply extends BinaryReplyMessage<FunctionInvokeMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.functionReply;
}
@Override
protected FunctionInvokeMessageReply newMessage() {
return new FunctionInvokeMessageReply();
}
@Override
protected void doReadSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) {
msg.setFunctionId((String) DataType.readFrom(buf));
msg.setOutput(DataType.readFrom(buf));
}
@Override
protected void doWriteSuccess(FunctionInvokeMessageReply msg, ByteBuf buf) {
DataType.writeTo(getMessage().getFunctionId(), buf);
DataType.writeTo(msg.getOutput(), buf);
}
}
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceMessage;
import reactor.core.publisher.Flux;
public interface BinaryMessage<T extends DeviceMessage> {
BinaryMessageType getType();
void read(ByteBuf buf);
void write(ByteBuf buf);
void setMessage(T message);
T getMessage();
}
package org.jetlinks.protocol.official.binary;
import com.google.common.cache.CacheBuilder;
import io.netty.buffer.ByteBuf;
import lombok.SneakyThrows;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.HeaderKey;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import java.time.Duration;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Supplier;
public enum BinaryMessageType {
//0x00
keepalive(null, null),
//0x01
online(DeviceOnlineMessage.class, BinaryDeviceOnlineMessage::new),
//0x02
ack(AcknowledgeDeviceMessage.class, BinaryAcknowledgeDeviceMessage::new),
//0x03
reportProperty(ReportPropertyMessage.class, BinaryReportPropertyMessage::new),
//0x04
readProperty(ReadPropertyMessage.class, BinaryReadPropertyMessage::new),
//0x05
readPropertyReply(ReadPropertyMessageReply.class, BinaryReadPropertyMessageReply::new),
writeProperty(WritePropertyMessage.class, BinaryWritePropertyMessage::new),
writePropertyReply(WritePropertyMessageReply.class, BinaryWritePropertyMessageReply::new),
function(FunctionInvokeMessage.class, BinaryFunctionInvokeMessage::new),
functionReply(FunctionInvokeMessageReply.class, BinaryFunctionInvokeMessageReply::new),
event(EventMessage.class, BinaryEventMessage::new);
private final Class<? extends DeviceMessage> forDevice;
private final Supplier<BinaryMessage<DeviceMessage>> forTcp;
private static final BinaryMessageType[] VALUES = values();
public static final HeaderKey<Integer> HEADER_MSG_SEQ = HeaderKey.of("_seq", 0, Integer.class);
@SuppressWarnings("all")
BinaryMessageType(Class<? extends DeviceMessage> forDevice,
Supplier<? extends BinaryMessage<?>> forTcp) {
this.forDevice = forDevice;
this.forTcp = (Supplier) forTcp;
}
private static final Map<String, MsgIdHolder> cache = CacheBuilder
.newBuilder()
.expireAfterWrite(Duration.ofHours(1))
.<String, MsgIdHolder>build()
.asMap();
private static class MsgIdHolder {
private int msgId = 0;
private final Map<Integer, String> cached = CacheBuilder
.newBuilder()
.expireAfterWrite(Duration.ofSeconds(30))
.<Integer, String>build()
.asMap();
public int next(String id) {
if (id == null) {
return -1;
}
do {
if (msgId++ < 0) {
msgId = 0;
}
} while (cached.putIfAbsent(msgId, id) != null);
return msgId;
}
public String getAndRemove(int id) {
if (id < 0) {
return null;
}
return cached.remove(id);
}
}
@SneakyThrows
private static MsgIdHolder takeHolder(String deviceId) {
return cache.computeIfAbsent(deviceId, (ignore) -> new MsgIdHolder());
}
public static ByteBuf write(DeviceMessage message, ByteBuf data) {
int msgId = message.getHeaderOrElse(HEADER_MSG_SEQ, () -> takeHolder(message.getDeviceId()).next(message.getMessageId()));
return write(message, msgId, data);
}
public static ByteBuf write(BinaryMessageType type, ByteBuf data) {
// 第0个字节是消息类型
data.writeByte(type.ordinal());
// 0-4字节 时间戳
data.writeLong(System.currentTimeMillis());
return data;
}
public static ByteBuf write(DeviceMessage message, int msgId, ByteBuf data) {
BinaryMessageType type = lookup(message);
// 第0个字节是消息类型
data.writeByte(type.ordinal());
// 第1-8字节 时间戳
data.writeLong(message.getTimestamp());
// 9-11字节 消息序号
data.writeShort(msgId);
// 12... 字节 设备ID
DataType.STRING.write(data, message.getDeviceId());
// 创建消息对象
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
tcp.setMessage(message);
//写出数据到ByteBuf
tcp.write(data);
return data;
}
public static DeviceMessage read(ByteBuf data) {
return read(data, null);
}
public static <T> T read(ByteBuf data,
String deviceIdMaybe,
BiFunction<DeviceMessage, Integer, T> handler) {
//第0个字节是消息类型
BinaryMessageType type = VALUES[data.readByte()];
if (type.forTcp == null) {
return null;
}
// 1-4字节 时间戳
long timestamp = data.readLong();
// 5-6字节 消息序号
int msgId = data.readUnsignedShort();
// 7... 字节 设备ID
String deviceId = (String) DataType.STRING.read(data);
if (deviceId == null) {
deviceId = deviceIdMaybe;
}
// 创建消息对象
BinaryMessage<DeviceMessage> tcp = type.forTcp.get();
//从ByteBuf读取
tcp.read(data);
DeviceMessage message = tcp.getMessage();
message.thingId(DeviceThingType.device, deviceId);
if (timestamp > 0) {
message.timestamp(timestamp);
}
message.addHeader(HEADER_MSG_SEQ, msgId);
return handler.apply(message, msgId);
}
public static DeviceMessage read(ByteBuf data, String deviceIdMaybe) {
return read(data, deviceIdMaybe, (message, msgId) -> {
String messageId = null;
if (message.getDeviceId() != null) {
//获取实际平台下发的消息ID
MsgIdHolder holder = cache.get(message.getDeviceId());
if (holder != null) {
messageId = holder.getAndRemove(msgId);
}
}
if (messageId == null && msgId > 0) {
messageId = String.valueOf(msgId);
}
message.messageId(messageId);
return message;
});
}
public static BinaryMessageType lookup(DeviceMessage message) {
for (BinaryMessageType value : VALUES) {
if (value.forDevice != null && value.forDevice.isInstance(message)) {
return value;
}
}
throw new UnsupportedOperationException("unsupported device message " + message.getMessageType());
}
public static void main(String[] args) {
System.out.println("| Byte | Type |");
System.out.println("| ---- | ---- |");
for (BinaryMessageType value : BinaryMessageType.values()) {
System.out.print("|");
System.out.print("0x0"+Integer.toString(value.ordinal(),16));
System.out.print("|");
System.out.print(value.name());
System.out.print("|");
System.out.println();
}
System.out.println();
}
}
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.List;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryReadPropertyMessage implements BinaryMessage<ReadPropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readProperty;
}
private ReadPropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new ReadPropertyMessage();
@SuppressWarnings("all")
List<String> list = (List<String>) DataType.ARRAY.read(buf);
message.setProperties(list);
}
@Override
public void write(ByteBuf buf) {
DataType.ARRAY.write(buf, message.getProperties());
}
@Override
public void setMessage(ReadPropertyMessage message) {
this.message = message;
}
@Override
public ReadPropertyMessage getMessage() {
return message;
}
}
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryReadPropertyMessageReply extends BinaryReplyMessage<ReadPropertyMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readPropertyReply;
}
@Override
protected ReadPropertyMessageReply newMessage() {
return new ReadPropertyMessageReply();
}
@Override
protected void doWriteSuccess(ReadPropertyMessageReply msg, ByteBuf buf) {
DataType.OBJECT.write(buf, msg.getProperties());
}
@Override
protected void doReadSuccess(ReadPropertyMessageReply msg, ByteBuf buf) {
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
msg.setProperties(map);
}
}
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import org.jetlinks.core.message.DeviceMessageReply;
import java.util.Map;
public abstract class BinaryReplyMessage<T extends DeviceMessageReply> implements BinaryMessage<T> {
private T message;
protected abstract T newMessage();
@Override
public final void read(ByteBuf buf) {
message = newMessage();
boolean success = buf.readBoolean();
if (success) {
doReadSuccess(message, buf);
} else {
message.success(false);
message.code(String.valueOf(DataType.readFrom(buf)));
message.message(String.valueOf(DataType.readFrom(buf)));
}
}
protected abstract void doReadSuccess(T msg, ByteBuf buf);
protected abstract void doWriteSuccess(T msg, ByteBuf buf);
@Override
public final void write(ByteBuf buf) {
buf.writeBoolean(message.isSuccess());
if (message.isSuccess()) {
doWriteSuccess(message, buf);
} else {
DataType.writeTo(message.getCode(), buf);
DataType.writeTo(message.getMessage(), buf);
}
}
@Override
public void setMessage(T message) {
this.message = message;
}
@Override
public T getMessage() {
return message;
}
}
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryReportPropertyMessage implements BinaryMessage<ReportPropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.reportProperty;
}
private ReportPropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new ReportPropertyMessage();
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setProperties(map);
}
@Override
public void write(ByteBuf buf) {
DataType.OBJECT.write(buf, message.getProperties());
}
@Override
public void setMessage(ReportPropertyMessage message) {
this.message = message;
}
@Override
public ReportPropertyMessage getMessage() {
return message;
}
}
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
@AllArgsConstructor
@NoArgsConstructor
public class BinaryWritePropertyMessage implements BinaryMessage<WritePropertyMessage> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.writeProperty;
}
private WritePropertyMessage message;
@Override
public void read(ByteBuf buf) {
message = new WritePropertyMessage();
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
message.setProperties(map);
}
@Override
public void write(ByteBuf buf) {
DataType.OBJECT.write(buf, message.getProperties());
}
@Override
public void setMessage(WritePropertyMessage message) {
this.message = message;
}
@Override
public WritePropertyMessage getMessage() {
return message;
}
}
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import org.jetlinks.core.message.property.ReadPropertyMessageReply;
import org.jetlinks.core.message.property.WritePropertyMessageReply;
import java.util.Map;
/**
* @author zhouhao
* @since 1.0
*/
public class BinaryWritePropertyMessageReply extends BinaryReplyMessage<WritePropertyMessageReply> {
@Override
public BinaryMessageType getType() {
return BinaryMessageType.readPropertyReply;
}
@Override
protected WritePropertyMessageReply newMessage() {
return new WritePropertyMessageReply();
}
@Override
protected void doReadSuccess(WritePropertyMessageReply msg, ByteBuf buf) {
@SuppressWarnings("all")
Map<String, Object> map = (Map<String, Object>) DataType.OBJECT.read(buf);
msg.setProperties(map);
}
@Override
protected void doWriteSuccess(WritePropertyMessageReply msg, ByteBuf buf) {
DataType.OBJECT.write(buf, msg.getProperties());
}
}
package org.jetlinks.protocol.official.binary;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.jetlinks.protocol.official.ObjectMappers;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public enum DataType {
//0x00
NULL {
@Override
public Object read(ByteBuf buf) {
return null;
}
@Override
public void write(ByteBuf buf, Object value) {
}
},
//0x01
BOOLEAN {
@Override
public Object read(ByteBuf buf) {
return buf.readBoolean();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeBoolean((Boolean) value);
}
},
//0x02
INT8 {
@Override
public Object read(ByteBuf buf) {
return buf.readByte();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeByte((Byte) value);
}
},
//0x03
INT16 {
@Override
public Object read(ByteBuf buf) {
return buf.readShort();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeShort((Short) value);
}
},
//0x04
INT32 {
@Override
public Object read(ByteBuf buf) {
return buf.readInt();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeInt((Integer) value);
}
},
//0x05
INT64 {
@Override
public Object read(ByteBuf buf) {
return buf.readLong();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeLong((Long) value);
}
},
//0x06
UINT8 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedByte();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeByte((Byte) value);
}
},
//0x07
UINT16 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedShort();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeShort((Short) value);
}
},
//0x08
UINT32 {
@Override
public Object read(ByteBuf buf) {
return buf.readUnsignedInt();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeInt((Integer) value);
}
},
//0x09
FLOAT {
@Override
public Object read(ByteBuf buf) {
return buf.readFloat();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeFloat((Float) value);
}
},
//0x0A
DOUBLE {
@Override
public Object read(ByteBuf buf) {
return buf.readDouble();
}
@Override
public void write(ByteBuf buf, Object value) {
buf.writeDouble((Double) value);
}
},
//0x0B
STRING {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.readBytes(bytes);
return new String(bytes, StandardCharsets.UTF_8);
}
@Override
public void write(ByteBuf buf, Object value) {
byte[] bytes = ((String) value).getBytes();
buf.writeShort(bytes.length);
buf.writeBytes(bytes);
}
},
//0x0C
BINARY {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
byte[] bytes = new byte[len];
buf.readBytes(bytes);
return bytes;
}
@Override
public void write(ByteBuf buf, Object value) {
byte[] bytes = (byte[]) value;
buf.writeShort(bytes.length);
buf.writeBytes(bytes);
}
},
//0x0D
ARRAY {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
List<Object> array = new ArrayList<>(len);
for (int i = 0; i < len; i++) {
array.add(readFrom(buf));
}
return array;
}
@Override
public void write(ByteBuf buf, Object value) {
Collection<Object> array = (Collection<Object>) value;
buf.writeShort(array.size());
for (Object o : array) {
writeTo(o, buf);
}
}
},
//0x0E
OBJECT {
@Override
public Object read(ByteBuf buf) {
int len = buf.readUnsignedShort();
Map<String, Object> data = Maps.newLinkedHashMapWithExpectedSize(len);
for (int i = 0; i < len; i++) {
data.put((String) STRING.read(buf), readFrom(buf));
}
return data;
}
@Override
public void write(ByteBuf buf, Object value) {
Map<String, Object> data = value instanceof Map ? ((Map) value) : ObjectMappers.JSON_MAPPER.convertValue(value, Map.class);
buf.writeShort(data.size());
for (Map.Entry<String, Object> entry : data.entrySet()) {
STRING.write(buf, entry.getKey());
writeTo(entry.getValue(), buf);
}
}
};
private final static DataType[] VALUES = values();
public abstract Object read(ByteBuf buf);
public abstract void write(ByteBuf buf, Object value);
public static Object readFrom(ByteBuf buf) {
return VALUES[buf.readUnsignedByte()].read(buf);
}
public static void writeTo(Object data, ByteBuf buf) {
DataType type = loopUpType(data);
buf.writeByte(type.ordinal());
type.write(buf, data);
}
private static DataType loopUpType(Object data) {
if (data == null) {
return NULL;
} else if (data instanceof Boolean) {
return BOOLEAN;
} else if (data instanceof Byte) {
return INT8;
} else if (data instanceof Short) {
return INT16;
} else if (data instanceof Integer) {
return INT32;
} else if (data instanceof Long) {
return INT64;
} else if (data instanceof Float) {
return FLOAT;
} else if (data instanceof Double) {
return DOUBLE;
} else if (data instanceof String) {
return STRING;
} else if (data instanceof byte[]) {
return BINARY;
} else if (data instanceof Collection) {
return ARRAY;
} else if (data instanceof Map) {
return OBJECT;
} else {
throw new IllegalArgumentException("Unsupported data type: " + data.getClass());
}
}
public static void main(String[] args) {
System.out.println("| Byte | Type |");
System.out.println("| ---- | ---- |");
for (DataType value : DataType.values()) {
System.out.print("|");
System.out.print("0x0"+Integer.toString(value.ordinal(),16));
System.out.print("|");
System.out.print(value.name());
System.out.print("|");
System.out.println();
}
System.out.println();
}
}
package org.jetlinks.protocol.official.cipher;
import lombok.SneakyThrows;
import org.apache.commons.codec.binary.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;
import java.util.Optional;
public enum Ciphers {
AES {
@SneakyThrows
public byte[] encrypt(byte[] src, String key) {
if (key == null || key.length() != 16) {
throw new IllegalArgumentException("illegal key");
}
SecretKeySpec skeySpec = new SecretKeySpec(key.getBytes(), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.ENCRYPT_MODE, skeySpec);
return cipher.doFinal(src);
}
@SneakyThrows
public byte[] decrypt(byte[] src, String key) {
if (key == null || key.length() != 16) {
throw new IllegalArgumentException("illegal key");
}
SecretKeySpec skeySpec = new SecretKeySpec(key.getBytes(), "AES");
Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
cipher.init(Cipher.DECRYPT_MODE, skeySpec);
return cipher.doFinal(src);
}
};
public static Optional<Ciphers> of(String name) {
try {
return Optional.of(valueOf(name.toUpperCase()));
} catch (Exception e) {
return Optional.empty();
}
}
public abstract byte[] encrypt(byte[] src, String key);
public abstract byte[] decrypt(byte[] src, String key);
String encryptBase64(String src, String key) {
return Base64.encodeBase64String(encrypt(src.getBytes(), key));
}
byte[] decryptBase64(String src, String key) {
return decrypt(Base64.decodeBase64(src), key);
}
}
package org.jetlinks.protocol.official.functional;
import lombok.Getter;
import lombok.Setter;
@Getter
@Setter
public class TimeSyncRequest {
private String messageId;
}
package org.jetlinks.protocol.official.functional;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Getter
@Setter
@AllArgsConstructor(staticName = "of")
@NoArgsConstructor
public class TimeSyncResponse {
private String messageId;
private long timestamp;
}
package org.jetlinks.protocol.official.http;
import com.alibaba.fastjson.JSONObject;
import com.fasterxml.jackson.core.JsonParseException;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import lombok.extern.slf4j.Slf4j;
import org.jetlinks.core.defaults.Authenticator;
import org.jetlinks.core.device.*;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.codec.http.Header;
import org.jetlinks.core.message.codec.http.HttpExchangeMessage;
import org.jetlinks.core.message.codec.http.SimpleHttpResponseMessage;
import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketSessionMessage;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.protocol.official.ObjectMappers;
import org.jetlinks.protocol.official.TopicMessageCodec;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.Objects;
/**
* Http 的消息编解码器
*
* @author zhouhao
* @since 3.0.0
*/
@Slf4j
public class JetLinksHttpDeviceMessageCodec implements DeviceMessageCodec, Authenticator {
public static final DefaultConfigMetadata httpConfig = new DefaultConfigMetadata(
"HTTP认证配置"
, "使用HTTP Bearer Token进行认证")
.add("bearer_token", "Token", "Token", new PasswordType());
private final Transport transport;
public JetLinksHttpDeviceMessageCodec(Transport transport) {
this.transport = transport;
}
public JetLinksHttpDeviceMessageCodec() {
this(DefaultTransport.HTTP);
}
@Override
public Transport getSupportTransport() {
return transport;
}
@Nonnull
public Mono<EncodedMessage> encode(@Nonnull MessageEncodeContext context) {
JSONObject json = context.getMessage().toJson();
//通过websocket下发
return Mono.just(DefaultWebSocketMessage.of(
WebSocketMessage.Type.TEXT,
Unpooled.wrappedBuffer(json.toJSONString().getBytes())));
}
private static SimpleHttpResponseMessage unauthorized(String msg) {
return SimpleHttpResponseMessage
.builder()
.contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":false,\"code\":\"unauthorized\",\"message\":\"" + msg + "\"}")
.status(401)
.build();
}
private static SimpleHttpResponseMessage badRequest() {
return SimpleHttpResponseMessage
.builder()
.contentType(MediaType.APPLICATION_JSON)
.body("{\"success\":false,\"code\":\"bad_request\"}")
.status(400)
.build();
}
@Nonnull
@Override
public Flux<DeviceMessage> decode(@Nonnull MessageDecodeContext context) {
if (context.getMessage() instanceof HttpExchangeMessage) {
return decodeHttp(context);
}
if (context.getMessage() instanceof WebSocketSessionMessage) {
return decodeWebsocket(context);
}
return Flux.empty();
}
private Flux<DeviceMessage> decodeWebsocket(MessageDecodeContext context) {
WebSocketSessionMessage msg = ((WebSocketSessionMessage) context.getMessage());
return Mono
.justOrEmpty(MessageType.convertMessage(msg.payloadAsJson()))
.cast(DeviceMessage.class)
.flux();
}
private Flux<DeviceMessage> decodeHttp(MessageDecodeContext context) {
HttpExchangeMessage message = (HttpExchangeMessage) context.getMessage();
//校验请求头中的Authorization header,格式:
// Authorization: Bearer <token>
Header header = message.getHeader(HttpHeaders.AUTHORIZATION).orElse(null);
if (header == null || header.getValue() == null || header.getValue().length == 0) {
return message
.response(unauthorized("Authorization header is required"))
.thenMany(Mono.empty());
}
String[] token = header.getValue()[0].split(" ");
if (token.length == 1) {
return message
.response(unauthorized("Illegal token format"))
.thenMany(Mono.empty());
}
String basicToken = token[1];
String[] paths = TopicMessageCodec.removeProductPath(message.getPath());
if (paths.length < 1) {
return message
.response(badRequest())
.thenMany(Mono.empty());
}
String deviceId = paths[1];
return context
.getDevice(deviceId)
.flatMap(device -> device.getConfig("bearer_token"))
//校验token
.filter(value -> Objects.equals(value.asString(), basicToken))
//设备或者配置不对
.switchIfEmpty(Mono.defer(() -> message
.response(unauthorized("Device no register or token not match"))
.then(Mono.empty())))
//解码
.flatMapMany(ignore -> doDecode(message, paths))
.switchOnFirst((s, flux) -> {
Mono<Void> handler;
//有结果则认为成功
if (s.hasValue()) {
handler = message.ok("{\"success\":true}");
} else {
return message
.response(badRequest())
.then(Mono.empty());
}
return handler.thenMany(flux);
})
.onErrorResume(err -> message
.error(500, getErrorMessage(err))
.then(Mono.error(err)))
//跟踪信息
.as(FluxTracer
.create(DeviceTracer.SpanName.decode(deviceId),
builder -> builder.setAttribute(DeviceTracer.SpanKey.message, message.print())));
}
private Flux<DeviceMessage> doDecode(HttpExchangeMessage message, String[] paths) {
return message
.payload()
.flatMapMany(buf -> {
byte[] body = ByteBufUtil.getBytes(buf);
return TopicMessageCodec.decode(ObjectMappers.JSON_MAPPER, paths, body);
});
}
public String getErrorMessage(Throwable err) {
if (err instanceof JsonParseException) {
return "{\"success\":false,\"code\":\"request_body_format_error\"}";
}
return "{\"success\":false,\"code\":\"server_error\"}";
}
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceOperator device) {
if (!(request instanceof WebsocketAuthenticationRequest)) {
return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式"));
}
WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request);
String token = req
.getSocketSession()
.getQueryParameters()
.get("token");
if (StringUtils.isEmpty(token)) {
return Mono.just(AuthenticationResponse.error(401, "认证参数错误"));
}
return device
.getConfig("bearer_token")
//校验token
.filter(value -> Objects.equals(value.asString(), token))
.map(ignore -> AuthenticationResponse.success(device.getDeviceId()))
//未配置或者配置不对
.switchIfEmpty(Mono.fromSupplier(() -> AuthenticationResponse.error(401, "token错误")));
}
static AuthenticationResponse deviceNotFound = AuthenticationResponse.error(404, "设备不存在");
@Override
public Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request, @Nonnull DeviceRegistry registry) {
if (!(request instanceof WebsocketAuthenticationRequest)) {
return Mono.just(AuthenticationResponse.error(400, "不支持的认证方式"));
}
WebsocketAuthenticationRequest req = ((WebsocketAuthenticationRequest) request);
String[] paths = TopicMessageCodec.removeProductPath(req.getSocketSession().getPath());
if (paths.length < 1) {
return Mono.just(AuthenticationResponse.error(400, "URL格式错误"));
}
return registry
.getDevice(paths[1])
.flatMap(device -> authenticate(request, device))
.defaultIfEmpty(deviceNotFound);
}
}
package org.jetlinks.protocol.official.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.parsetools.RecordParser;
import lombok.SneakyThrows;
import org.jetlinks.core.message.AcknowledgeDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage;
import org.jetlinks.protocol.official.binary.BinaryMessageType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
public class TcpDevice {
@SneakyThrows
public static void main(String[] args) {
Vertx vertx = Vertx.vertx();
int start = args.length > 0 ? Integer.parseInt(args[0]) : 1;
int count = args.length > 1 ? Integer.parseInt(args[1]) : 8000;
String[] hosts = args.length > 2 ? args[2].split(",") : new String[]{"0.0.0.0"};
Flux.range(start, count)
.flatMap(i -> Mono
.create(sink -> {
NetClientOptions conf = new NetClientOptions().setTcpKeepAlive(true);
conf.setLocalAddress(hosts[i % hosts.length]);
vertx
.createNetClient(conf)
.connect(8802, "localhost")
.onFailure(err -> {
System.out.println(err.getMessage());
sink.success();
})
.onSuccess(socket -> {
RecordParser parser = RecordParser.newFixed(4);
AtomicReference<Buffer> buffer = new AtomicReference<>();
parser.handler(buf -> {
buffer.accumulateAndGet(buf, (a, b) -> {
if (a == null) {
parser.fixedSizeMode(buf.getInt(0));
return b;
}
parser.fixedSizeMode(4);
sink.success("tcp-off-" + i + ":" + socket.localAddress());
BinaryMessageType
.read(b.getByteBuf(),
null,
(downstream, seq) -> {
handleDownStream(downstream, seq, socket);
return null;
});
return null;
});
});
socket
.closeHandler((s) -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + "closed");
sink.success();
})
.exceptionHandler(er -> {
System.out.println("tcp-off-" + i + ":" + socket.localAddress() + " " + er.getMessage());
sink.success();
})
.handler(parser);
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "test");
message.setDeviceId("tcp-off-" + i);
socket.write(Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))));
});
}),
1024
)
.count()
.subscribe(System.out::println);
System.in.read();
}
protected static void handleDownStream(DeviceMessage downstream, int seq, NetSocket socket) {
if (!(downstream instanceof AcknowledgeDeviceMessage)) {
// System.out.println(downstream);
}
DeviceMessage reply = null;
if (downstream instanceof ReadPropertyMessage) {
reply = ((ReadPropertyMessage) downstream)
.newReply()
.success(Collections.singletonMap(
"temp0",
ThreadLocalRandom
.current()
.nextFloat() * 100
));
} else if (downstream instanceof WritePropertyMessage) {
reply = ((WritePropertyMessage) downstream)
.newReply()
.success(((WritePropertyMessage) downstream).getProperties());
}
if (reply != null) {
socket.write(
Buffer.buffer(TcpDeviceMessageCodec.wrapByteByf(BinaryMessageType.write(
reply
, seq, Unpooled.buffer())))
);
}
}
}
package org.jetlinks.protocol.official.tcp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.NonNull;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.binary.AckCode;
import org.jetlinks.protocol.official.binary.BinaryAcknowledgeDeviceMessage;
import org.jetlinks.protocol.official.binary.BinaryDeviceOnlineMessage;
import org.jetlinks.protocol.official.binary.BinaryMessageType;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import java.util.Objects;
public class TcpDeviceMessageCodec implements DeviceMessageCodec {
public static final String CONFIG_KEY_SECURE_KEY = "secureKey";
public static final DefaultConfigMetadata tcpConfig = new DefaultConfigMetadata(
"TCP认证配置"
, "")
.add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.TCP;
}
@NonNull
@Override
public Publisher<? extends Message> decode(@NonNull MessageDecodeContext context) {
ByteBuf payload = context.getMessage().getPayload();
//read index
payload.readInt();
//处理tcp连接后的首次消息
if (context.getDevice() == null) {
return handleLogin(payload, context);
}
return Mono.justOrEmpty(BinaryMessageType.read(payload, context.getDevice().getDeviceId()));
}
private Mono<DeviceMessage> handleLogin(ByteBuf payload, MessageDecodeContext context) {
DeviceMessage message = BinaryMessageType.read(payload);
if (message instanceof DeviceOnlineMessage) {
String token = message
.getHeader(BinaryDeviceOnlineMessage.loginToken)
.orElse(null);
String deviceId = message.getDeviceId();
return context
.getDevice(deviceId)
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.flatMap(config -> {
if (Objects.equals(config.asString(), token)) {
return ack(message, AckCode.ok, context)
.thenReturn(message);
}
return Mono.empty();
}))
.switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context)));
} else {
return ack(message, AckCode.noAuth, context);
}
}
public static ByteBuf wrapByteByf(ByteBuf payload) {
return Unpooled.wrappedBuffer(
Unpooled.buffer().writeInt(payload.writerIndex()),
payload);
}
private <T> Mono<T> ack(DeviceMessage source, AckCode code, MessageDecodeContext context) {
if(source==null){
return Mono.empty();
}
AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name());
message.setDeviceId(source.getDeviceId());
message.setMessageId(source.getMessageId());
message.setCode(code.name());
message.setSuccess(code == AckCode.ok);
source.getHeader(BinaryMessageType.HEADER_MSG_SEQ)
.ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq));
return ((FromDeviceMessageContext) context)
.getSession()
.send(EncodedMessage.simple(
wrapByteByf(BinaryMessageType.write(message, Unpooled.buffer()))
))
.then(Mono.fromRunnable(() -> {
if (source instanceof DeviceOnlineMessage && code != AckCode.ok) {
((FromDeviceMessageContext) context).getSession().close();
}
}));
}
@NonNull
@Override
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
if (deviceMessage instanceof DisconnectDeviceMessage) {
return Mono.empty();
}
return Mono.just(EncodedMessage.simple(
wrapByteByf(
BinaryMessageType.write(deviceMessage, Unpooled.buffer())
)
));
}
}
package org.jetlinks.protocol.official.udp;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.NonNull;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.*;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.PasswordType;
import org.jetlinks.protocol.official.binary.*;
import org.reactivestreams.Publisher;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Mono;
import java.util.Objects;
public class UDPDeviceMessageCodec implements DeviceMessageCodec {
public static final String CONFIG_KEY_SECURE_KEY = "secureKey";
public static final DefaultConfigMetadata udpConfig = new DefaultConfigMetadata(
"UDP认证配置"
, "")
.add(CONFIG_KEY_SECURE_KEY, "secureKey", "密钥", new PasswordType());
@Override
public Transport getSupportTransport() {
return DefaultTransport.UDP;
}
@NonNull
@Override
public Publisher<? extends Message> decode(@NonNull MessageDecodeContext context) {
ByteBuf payload = context.getMessage().getPayload();
//todo 认证类型, 0 token,1 sign
byte authType = payload.readByte();
//前面是token
String token = (String) DataType.STRING.read(payload);
//接下来是消息
DeviceMessage message = BinaryMessageType.read(payload);
return context
.getDevice(message.getDeviceId())
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.flatMap(config -> {
if (Objects.equals(config.asString(), token)) {
return ack(message, AckCode.ok, context)
.thenReturn(message);
}
return Mono.empty();
}))
.switchIfEmpty(Mono.defer(() -> ack(message, AckCode.noAuth, context)));
}
public static ByteBuf wrapByteByf(ByteBuf payload) {
return payload;
}
private <T> Mono<T> ack(DeviceMessage source, AckCode code, MessageDecodeContext context) {
AcknowledgeDeviceMessage message = new AcknowledgeDeviceMessage();
message.addHeader(BinaryAcknowledgeDeviceMessage.codeHeader, code.name());
message.setDeviceId(source.getDeviceId());
message.setMessageId(source.getMessageId());
message.setCode(code.name());
message.setSuccess(code == AckCode.ok);
source.getHeader(BinaryMessageType.HEADER_MSG_SEQ)
.ifPresent(seq -> message.addHeader(BinaryMessageType.HEADER_MSG_SEQ, seq));
return ((FromDeviceMessageContext) context)
.getSession()
.send(doEncode(message, ""))
.then(Mono.fromRunnable(() -> {
if (source instanceof DeviceOnlineMessage && code != AckCode.ok) {
((FromDeviceMessageContext) context).getSession().close();
}
}));
}
@NonNull
@Override
public Publisher<? extends EncodedMessage> encode(@NonNull MessageEncodeContext context) {
DeviceMessage deviceMessage = ((DeviceMessage) context.getMessage());
if (deviceMessage instanceof DisconnectDeviceMessage) {
return Mono.empty();
}
return context
.getDevice(deviceMessage.getDeviceId())
.flatMap(device -> device
.getConfig(CONFIG_KEY_SECURE_KEY)
.map(config -> doEncode(deviceMessage, config.asString())));
}
private EncodedMessage doEncode(DeviceMessage message, String token) {
ByteBuf buf = Unpooled.buffer();
//todo 认证类型, 0 token,1 sign
buf.writeByte(0);
//token
DataType.STRING.write(buf, token);
//指令
return EncodedMessage.simple(wrapByteByf(BinaryMessageType.write(message, buf)));
}
}
#### 使用HTTP推送设备数据
上报属性例子:
```http request
POST /{productId}/{deviceId}/properties/report
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"properties":{
"temp":38.5
}
}
```
上报事件例子:
```http request
POST /{productId}/{deviceId}/event/{eventId}
Authorization: Bearer {产品或者设备中配置的Token}
Content-Type: application/json
{
"data":{
"address": ""
}
}
```
\ No newline at end of file
### 认证说明
CONNECT报文:
```text
clientId: 设备ID
username: secureId+"|"+timestamp
password: md5(secureId+"|"+timestamp+"|"+secureKey)
```
说明: secureId以及secureKey在创建设备产品或设备实例时进行配置.
timestamp为当前时间戳(毫秒),与服务器时间不能相差5分钟.
md5为32位,不区分大小写.
\ No newline at end of file
package org.jetlinks.protocol.official;
import lombok.SneakyThrows;
import org.eclipse.californium.core.CoapClient;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.CoapResponse;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.coap.Option;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.network.CoapEndpoint;
import org.eclipse.californium.core.network.Endpoint;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.californium.core.server.resources.Resource;
import org.hswebframework.utils.RandomUtil;
import org.jetlinks.core.defaults.CompositeProtocolSupports;
import org.jetlinks.core.device.DeviceInfo;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.CoapExchangeMessage;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.protocol.official.cipher.Ciphers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import java.util.concurrent.atomic.AtomicReference;
public class JetLinksCoapDeviceMessageCodecTest {
JetLinksCoapDeviceMessageCodec codec = new JetLinksCoapDeviceMessageCodec();
DeviceOperator device;
private final String key = RandomUtil.randomChar(16);
private TestDeviceRegistry registry;
AtomicReference<Message> messageRef = new AtomicReference<>();
private CoapServer server;
@After
public void shutdown(){
server.stop();
}
@Before
public void init() {
registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
device = registry
.register(DeviceInfo.builder()
.id("test")
.protocol("jetlinks")
.build())
.flatMap(operator -> operator.setConfig("secureKey", key).thenReturn(operator))
.block();
server = new CoapServer() {
@Override
protected Resource createRoot() {
return new CoapResource("/", true) {
@Override
public void handlePOST(CoapExchange exchange) {
codec
.decode(new MessageDecodeContext() {
@Nonnull
@Override
public EncodedMessage getMessage() {
return new CoapExchangeMessage(exchange);
}
@Override
public DeviceOperator getDevice() {
return device;
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
})
.doOnNext(messageRef::set)
.doOnError(Throwable::printStackTrace)
.subscribe();
}
@Override
public Resource getChild(String name) {
return this;
}
};
}
};
Endpoint endpoint = new CoapEndpoint.Builder()
.setPort(12341).build();
server.addEndpoint(endpoint);
server.start();
}
@Test
@SneakyThrows
public void test() {
CoapClient coapClient = new CoapClient();
Request request = Request.newPost();
String payload = "{\"data\":1}";
request.setURI("coap://localhost:12341/test/test/event/event1");
request.setPayload(Ciphers.AES.encrypt(payload.getBytes(), key));
// request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
CoapResponse response = coapClient.advanced(request);
Assert.assertTrue(response.isSuccess());
Assert.assertNotNull(messageRef.get());
Assert.assertTrue(messageRef.get() instanceof EventMessage);
System.out.println(messageRef.get());
}
@Test
@SneakyThrows
public void testTimeSync() {
CoapClient coapClient = new CoapClient();
Request request = Request.newPost();
String payload = "{\"messageId\":1}";
request.setURI("coap://localhost:12341/test/test/time-sync");
request.setPayload(Ciphers.AES.encrypt(payload.getBytes(), key));
// request.getOptions().setContentFormat(MediaTypeRegistry.APPLICATION_JSON);
CoapResponse response = coapClient.advanced(request);
Assert.assertTrue(response.isSuccess());
Assert.assertTrue(response.getResponseText().contains("timestamp"));
}
}
\ No newline at end of file
package org.jetlinks.protocol.official;
import io.netty.buffer.Unpooled;
import org.jetlinks.core.defaults.CompositeProtocolSupports;
import org.jetlinks.core.device.DeviceInfo;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.ProductInfo;
import org.jetlinks.core.device.StandaloneDeviceMessageBroker;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.DerivedMetadataMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.*;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.firmware.UpgradeFirmwareMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.jetlinks.core.server.session.DeviceSession;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import reactor.core.publisher.Mono;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
public class JetLinksMqttDeviceMessageCodecTest {
JetLinksMqttDeviceMessageCodec codec = new JetLinksMqttDeviceMessageCodec();
TestDeviceRegistry registry;
private EncodedMessage currentReply;
@Before
public void init() {
registry = new TestDeviceRegistry(new CompositeProtocolSupports(), new StandaloneDeviceMessageBroker());
registry.register(ProductInfo.builder()
.id("product1")
.protocol("jetlinks")
.build())
.flatMap(product -> registry.register(DeviceInfo.builder()
.id("device1")
.productId("product1")
.build()))
.block();
}
@Test
public void testReadProperty() {
ReadPropertyMessage message = new ReadPropertyMessage();
message.setDeviceId("device1");
message.setMessageId("test");
message.setProperties(Arrays.asList("name", "sn"));
MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/properties/read");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testReadChildProperty() {
ReadPropertyMessage message = new ReadPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test");
message.setProperties(Arrays.asList("name", "sn"));
ChildDeviceMessage childDeviceMessage = new ChildDeviceMessage();
childDeviceMessage.setChildDeviceMessage(message);
childDeviceMessage.setChildDeviceId("test");
childDeviceMessage.setDeviceId("device1");
MqttMessage encodedMessage = codec.encode(createMessageContext(childDeviceMessage)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/test/properties/read");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testReadPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ReadPropertyMessageReply);
ReadPropertyMessageReply reply = ((ReadPropertyMessageReply) message);
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties().get("sn"), "test");
System.out.println(reply);
}
@Test
public void testChildReadPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/read/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
ReadPropertyMessageReply reply = (ReadPropertyMessageReply) childReply.getChildDeviceMessage();
;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties().get("sn"), "test");
System.out.println(reply);
}
@Test
public void testWriteProperty() {
WritePropertyMessage message = new WritePropertyMessage();
message.setDeviceId("device1");
message.setMessageId("test");
message.setProperties(Collections.singletonMap("sn", "123"));
MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/properties/write");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testChildWriteProperty() {
WritePropertyMessage message = new WritePropertyMessage();
message.setDeviceId("device1");
message.setMessageId("test");
message.setProperties(Collections.singletonMap("sn", "123"));
ChildDeviceMessage childDeviceMessage = new ChildDeviceMessage();
childDeviceMessage.setChildDeviceMessage(message);
childDeviceMessage.setChildDeviceId("test");
childDeviceMessage.setDeviceId("device1");
MqttMessage encodedMessage = codec.encode(createMessageContext(childDeviceMessage)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/child/device1/properties/write");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testWritePropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof WritePropertyMessageReply);
WritePropertyMessageReply reply = ((WritePropertyMessageReply) message);
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties().get("sn"), "test");
System.out.println(reply);
}
@Test
public void testWriteChildPropertyReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/write/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
WritePropertyMessageReply reply = (WritePropertyMessageReply) childReply.getChildDeviceMessage();
;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties().get("sn"), "test");
System.out.println(reply);
}
@Test
public void testInvokeFunction() {
FunctionInvokeMessage message = new FunctionInvokeMessage();
message.setDeviceId("device1");
message.setMessageId("test");
message.setFunctionId("playVoice");
message.addInput("file", "http://baidu.com/1.mp3");
MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/function/invoke");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testFirmwareUpgrade() {
UpgradeFirmwareMessage message = new UpgradeFirmwareMessage();
message.setDeviceId("device1");
message.setMessageId("test");
message.setVersion("1.0");
message.setUrl("http://baidu.com/1.mp3");
MqttMessage encodedMessage = codec.encode(createMessageContext(message)).block();
Assert.assertNotNull(encodedMessage);
Assert.assertEquals(encodedMessage.getTopic(), "/product1/device1/firmware/upgrade");
System.out.println(encodedMessage.getPayload().toString(StandardCharsets.UTF_8));
}
@Test
public void testInvokeFunctionReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof FunctionInvokeMessageReply);
FunctionInvokeMessageReply reply = ((FunctionInvokeMessageReply) message);
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getOutput(), "ok");
System.out.println(reply);
}
@Test
public void testInvokeChildFunctionReply() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/function/invoke/reply")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"output\":\"ok\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
ChildDeviceMessage childReply = ((ChildDeviceMessage) message);
Assert.assertEquals(childReply.getDeviceId(), "device1");
Assert.assertEquals(childReply.getMessageId(), "test");
FunctionInvokeMessageReply reply = (FunctionInvokeMessageReply) childReply.getChildDeviceMessage();
;
Assert.assertTrue(reply.isSuccess());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getOutput(), "ok");
System.out.println(reply);
}
@Test
public void testEvent() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof EventMessage);
EventMessage reply = ((EventMessage) message);
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getData(), 100);
System.out.println(reply);
}
@Test
public void testChildEvent() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/event/temp")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"data\":100}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
EventMessage reply = ((EventMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getData(), 100);
System.out.println(reply);
}
@Test
public void testPropertiesReport() {
Message message = codec.decode(createMessageContext(
SimpleMqttMessage
.builder()
.topic("/product1/device1/properties/report")
.payload(Unpooled.wrappedBuffer(("{\"messageId\":\"test\"," +
"\"properties\":{\"sn\":\"test\"}," +
"\"propertySourceTimes\":{\"sn\":10086}" +
"}").getBytes()))
.build()))
.blockFirst();
Assert.assertTrue(message instanceof ReportPropertyMessage);
ReportPropertyMessage reply = ((ReportPropertyMessage) message);
Assert.assertNotNull(reply.getPropertySourceTimes());
Assert.assertEquals(reply.getPropertySourceTimes().get("sn"), Long.valueOf(10086L));
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
System.out.println(reply);
}
@Test
public void testChildPropertiesReport() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/child/test/properties/report")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"properties\":{\"sn\":\"test\"}}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
ReportPropertyMessage reply = ((ReportPropertyMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getProperties(), Collections.singletonMap("sn", "test"));
System.out.println(reply);
}
@Test
public void testMetadataDerived() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage.builder()
.topic("/product1/device1/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof DerivedMetadataMessage);
DerivedMetadataMessage reply = ((DerivedMetadataMessage) message);
Assert.assertEquals(reply.getDeviceId(), "device1");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getMetadata(), "1");
System.out.println(reply);
}
@Test
public void testChildMetadataDerived() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage
.builder()
.topic("/product1/device1/child/test/metadata/derived")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\",\"metadata\":\"1\"}"
.getBytes()))
.build())).blockFirst();
Assert.assertTrue(message instanceof ChildDeviceMessage);
DerivedMetadataMessage reply = ((DerivedMetadataMessage) ((ChildDeviceMessage) message).getChildDeviceMessage());
Assert.assertEquals(reply.getDeviceId(), "test");
Assert.assertEquals(reply.getMessageId(), "test");
Assert.assertEquals(reply.getMetadata(), "1");
System.out.println(reply);
}
@Test
public void testTimeSync() {
Message message = codec.decode(createMessageContext(SimpleMqttMessage
.builder()
.topic("/product1/device1/time-sync")
.payload(Unpooled.wrappedBuffer("{\"messageId\":\"test\"}"
.getBytes()))
.build()))
.blockFirst();
Assert.assertNull(message);
Assert.assertNotNull(currentReply);
Assert.assertEquals(((MqttMessage) currentReply).getTopic(), "/product1/device1/time-sync/reply");
Assert.assertTrue(currentReply.payloadAsString().contains("timestamp"));
}
public void testTopic() {
}
public MessageEncodeContext createMessageContext(Message message) {
System.out.println(message.toString());
return new MessageEncodeContext() {
@Nonnull
@Override
public Message getMessage() {
return message;
}
@Override
public DeviceOperator getDevice() {
return registry.getDevice("device1").block();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
};
}
public MessageDecodeContext createMessageContext(EncodedMessage message) {
System.out.println(message.toString());
return new FromDeviceMessageContext() {
@Nonnull
@Override
public EncodedMessage getMessage() {
return message;
}
@Override
public DeviceSession getSession() {
return new MockSession();
}
@Override
public DeviceOperator getDevice() {
return registry.getDevice("device1").block();
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return registry.getDevice(deviceId);
}
};
}
class MockSession implements DeviceSession {
@Override
public String getId() {
return "device1";
}
@Override
public String getDeviceId() {
return "device1";
}
@Nullable
@Override
public DeviceOperator getOperator() {
return registry.getDevice("device1").block();
}
@Override
public long lastPingTime() {
return 0;
}
@Override
public long connectTime() {
return 0;
}
@Override
public Mono<Boolean> send(EncodedMessage encodedMessage) {
currentReply = encodedMessage;
return Mono.just(true);
}
@Override
public Transport getTransport() {
return null;
}
@Override
public void close() {
}
@Override
public void ping() {
}
@Override
public boolean isAlive() {
return false;
}
@Override
public void onClose(Runnable call) {
}
}
}
\ No newline at end of file
package org.jetlinks.protocol.official;
import org.jetlinks.core.ProtocolSupports;
import org.jetlinks.core.config.ConfigStorageManager;
import org.jetlinks.core.defaults.DefaultDeviceOperator;
import org.jetlinks.core.defaults.DefaultDeviceProductOperator;
import org.jetlinks.core.device.*;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import org.jetlinks.supports.config.InMemoryConfigStorageManager;
import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class TestDeviceRegistry implements DeviceRegistry {
private DeviceMessageSenderInterceptor interceptor = new CompositeDeviceMessageSenderInterceptor();
private ConfigStorageManager manager = new InMemoryConfigStorageManager();
private Map<String, DeviceOperator> operatorMap = new ConcurrentHashMap<>();
private Map<String, DeviceProductOperator> productOperatorMap = new ConcurrentHashMap<>();
private ProtocolSupports supports;
private DeviceOperationBroker handler;
public TestDeviceRegistry(ProtocolSupports supports, DeviceOperationBroker handler) {
this.supports = supports;
this.handler = handler;
}
@Override
public Mono<DeviceOperator> getDevice(String deviceId) {
return Mono.fromSupplier(() -> operatorMap.get(deviceId));
}
@Override
public Mono<DeviceProductOperator> getProduct(String productId) {
return Mono.fromSupplier(() -> productOperatorMap.get(productId));
}
@Override
public Mono<DeviceOperator> register(DeviceInfo deviceInfo) {
DefaultDeviceOperator operator = new DefaultDeviceOperator(
deviceInfo.getId(),
supports, manager, handler, this
);
operatorMap.put(operator.getDeviceId(), operator);
return operator.setConfigs(
DeviceConfigKey.productId.value(deviceInfo.getProductId()),
DeviceConfigKey.protocol.value(deviceInfo.getProtocol()))
.thenReturn(operator);
}
@Override
public Mono<DeviceProductOperator> register(ProductInfo productInfo) {
DefaultDeviceProductOperator operator = new DefaultDeviceProductOperator(productInfo.getId(), supports, manager);
productOperatorMap.put(operator.getId(), operator);
return operator.setConfigs(
DeviceConfigKey.productId.value(productInfo.getMetadata()),
DeviceConfigKey.protocol.value(productInfo.getProtocol()))
.thenReturn(operator);
}
@Override
public Mono<Void> unregisterDevice(String deviceId) {
return Mono.justOrEmpty(deviceId)
.map(operatorMap::remove)
.then();
}
@Override
public Mono<Void> unregisterProduct(String productId) {
return Mono.justOrEmpty(productId)
.map(productOperatorMap::remove)
.then();
}
}
package org.jetlinks.protocol.official;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jetlinks.core.codec.defaults.TopicPayloadCodec;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.route.Route;
import org.junit.Test;
import reactor.test.StepVerifier;
import static org.junit.Assert.*;
public class TopicMessageCodecTest {
public void testChild(ObjectMapper objectMapper) {
ChildDeviceMessage message = new ChildDeviceMessage();
message.setDeviceId("test");
ReportPropertyMessage msg = new ReportPropertyMessage();
msg.setDeviceId("childId");
message.setChildDeviceMessage(msg);
message.setTimestamp(msg.getTimestamp());
TopicPayload payload = TopicMessageCodec.child.doEncode(objectMapper, message);
System.out.println(payload.getPayload().length);
assertEquals("/test/child/childId/properties/report", payload.getTopic());
TopicMessageCodec
.decode(objectMapper, payload.getTopic(), payload.getPayload())
.as(StepVerifier::create)
.expectNextMatches(deviceMessage -> {
System.out.println(message);
System.out.println(deviceMessage);
return deviceMessage.toJson().equals(message.toJson());
})
.verifyComplete();
}
@Test
public void testRoute() {
for (TopicMessageCodec value : TopicMessageCodec.values()) {
Route route = value.getRoute();
if (null != route)
System.out.println(route.getAddress());
}
}
@Test
public void doTest() {
testChild(ObjectMappers.JSON_MAPPER);
testChild(ObjectMappers.CBOR_MAPPER);
}
@Test
public void testEvent() {
EventMessage eventMessage = new EventMessage();
eventMessage.setEvent("test");
eventMessage.setDeviceId("test-device");
eventMessage.setData("123");
TopicPayload payload = TopicMessageCodec.encode(ObjectMappers.JSON_MAPPER, eventMessage);
assertEquals(payload.getTopic(), "/test-device/event/test");
DeviceMessage msg = TopicMessageCodec
.decode(ObjectMappers.JSON_MAPPER, payload.getTopic(), payload.getPayload())
.blockLast();
assertEquals(msg.toJson(), eventMessage.toJson());
}
}
\ No newline at end of file
package org.jetlinks.protocol.official.binary;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessageReply;
import org.jetlinks.core.message.property.*;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
public class BinaryMessageTypeTest {
@Test
public void testOnline() {
DeviceOnlineMessage message = new DeviceOnlineMessage();
message.setDeviceId("1000");
message.addHeader(BinaryDeviceOnlineMessage.loginToken, "admin");
ByteBuf byteBuf = BinaryMessageType.write(message, Unpooled.buffer());
System.out.println(ByteBufUtil.prettyHexDump(byteBuf));
ByteBuf buf = Unpooled
.buffer()
.writeInt(byteBuf.readableBytes())
.writeBytes(byteBuf);
System.out.println(ByteBufUtil.prettyHexDump(buf));
//登录报文
System.out.println(ByteBufUtil.hexDump(buf));
}
@Test
public void testReport() {
ReportPropertyMessage message = new ReportPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonMap("temp", 32.88));
doTest(message);
}
@Test
public void testRead() {
ReadPropertyMessage message = new ReadPropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonList("temp"));
doTest(message);
ReadPropertyMessageReply reply = new ReadPropertyMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setProperties(Collections.singletonMap("temp", 32.88));
doTest(reply);
}
@Test
public void testWrite() {
WritePropertyMessage message = new WritePropertyMessage();
message.setDeviceId("test");
message.setMessageId("test123");
message.setProperties(Collections.singletonMap("temp", 32.88));
doTest(message);
WritePropertyMessageReply reply = new WritePropertyMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setProperties(Collections.singletonMap("temp", 32.88));
doTest(reply);
}
@Test
public void testFunction() {
FunctionInvokeMessage message = new FunctionInvokeMessage();
message.setFunctionId("123");
message.setDeviceId("test");
message.setMessageId("test123");
message.addInput("test", 1);
doTest(message);
FunctionInvokeMessageReply reply = new FunctionInvokeMessageReply();
reply.setDeviceId("test");
reply.setMessageId("test123");
reply.setOutput(123);
doTest(reply);
}
public void doTest(DeviceMessage message) {
ByteBuf data = BinaryMessageType.write(message, Unpooled.buffer());
// System.out.println(ByteBufUtil.prettyHexDump(data));
ByteBuf buf = Unpooled.buffer()
.writeInt(data.readableBytes())
.writeBytes(data);
System.out.println(ByteBufUtil.prettyHexDump(buf));
System.out.println(ByteBufUtil.hexDump(buf));
//将长度字节读取后,直接解析报文正文
buf.readInt();
DeviceMessage read = BinaryMessageType.read(buf);
if (null != read.getHeaders()) {
read.getHeaders().forEach(message::addHeader);
}
System.out.println(read);
Assert.assertEquals(read.toString(), message.toString());
}
}
\ No newline at end of file
<?xml version="1.0" encoding="utf-8" ?>
<configuration>
<logger name="io.netty" level="warn"/>
<logger name="org.apache" level="warn"/>
 
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%-4relative [%thread] %-5level %logger{35} - %msg %n</pattern>
</encoder>
</appender>
<root level="debug">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment