Commit 7ab531ac by 罗志长

async

parent 5fed9d5e
package com.makeit.config;
import com.alibaba.ttl.threadpool.TtlExecutors;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
public class AsyncConfig {
@Bean(name = "asyncTaskExecutor")
public Executor asyncTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("async-service-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setKeepAliveSeconds(60);
executor.setAllowCoreThreadTimeOut(false);
executor.initialize();
return TtlExecutors.getTtlExecutor(executor);
}
}
...@@ -54,10 +54,12 @@ public class MqttPushClient { ...@@ -54,10 +54,12 @@ public class MqttPushClient {
client.setCallback(pushCallback); client.setCallback(pushCallback);
client.connect(options); client.connect(options);
} catch (Exception e) { } catch (Exception e) {
logger.error("mqtt连接失败", e);
e.printStackTrace(); e.printStackTrace();
} }
} }
} catch (Exception e) { } catch (Exception e) {
logger.error("mqtt连接失败", e);
e.printStackTrace(); e.printStackTrace();
} }
} }
......
...@@ -104,6 +104,7 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -104,6 +104,7 @@ public class PushCallback implements MqttCallbackExtended {
} }
@Override @Override
@Async("asyncTaskExecutor")
public void messageArrived(String topic, MqttMessage message) { public void messageArrived(String topic, MqttMessage message) {
try { try {
// 收到消息并设置返回字符串格式 // 收到消息并设置返回字符串格式
...@@ -161,10 +162,8 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -161,10 +162,8 @@ public class PushCallback implements MqttCallbackExtended {
if (!"0".equals(success)) { if (!"0".equals(success)) {
return; return;
} }
CompletableFuture.runAsync(() -> { platDeviceService.syncIotProperties(device.getDeviceId(), properties);
platDeviceService.syncIotProperties(device.getDeviceId(), properties); RedisUtil.delete(redisKey);
RedisUtil.delete(redisKey);
}); // 要加线程池
} }
} }
...@@ -174,7 +173,6 @@ public class PushCallback implements MqttCallbackExtended { ...@@ -174,7 +173,6 @@ public class PushCallback implements MqttCallbackExtended {
* @param device * @param device
*/ */
@TenantIdIgnore @TenantIdIgnore
@Async
public void checkAlarm(DeviceInfo device) { public void checkAlarm(DeviceInfo device) {
HeaderInfo headers = device.getHeaders(); HeaderInfo headers = device.getHeaders();
List<HeaderInfo.Bind> bindings = headers.getBindings(); List<HeaderInfo.Bind> bindings = headers.getBindings();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment