Commit c5d2d356 by 罗志长

feat: 长者日报呼吸心率数据存es

parent 1d0412b7
package com.makeit.dto.platform.elder.es;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 长者每日呼吸数据
*/
@Data
public class PlatElderBreatheHeartRateDTO {
/**
* 主键ID
*/
private String id;
/**
* 租户ID
*/
private String tenantId;
/**
* 创建时间
*/
private LocalDateTime createDate;
/**
* 长者id
*/
private String elderId;
/**
* 上报时间
*/
private Long reportTime;
/**
* 心率
*/
private Integer heartRate;
/**
* 呼吸率
*/
private Integer respiratoryRate;
/**
* 体动值
*/
private Integer bodyMove;
/**
* 设备Id
*/
private String deviceId;
/**
* iot设备Id
*/
private String iotDeviceId;
}
package com.makeit.service.platform.elder.impl;
import cn.hutool.core.util.IdUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import com.makeit.dto.platform.elder.es.PlatElderBreatheHeartRateDTO;
import com.makeit.entity.platform.device.PlatDevice;
import com.makeit.entity.platform.elder.PlatElder;
import com.makeit.entity.platform.elder.PlatElderBreatheHeartRateRecord;
......@@ -17,9 +20,16 @@ import com.makeit.utils.data.convert.StreamUtil;
import com.makeit.utils.time.LocalDateTimeUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
......@@ -33,6 +43,8 @@ public class PlatElderBreatheHeartRateRecordServiceImpl extends ServiceImpl<Plat
private PlatElderRealTimeService platElderRealTimeService;
@Autowired
private IotProductDeviceService iotProductDeviceService;
@Autowired
private RestHighLevelClient restHighLevelClient;
@Override
public void breatheHeartRateRecordTask() {
......@@ -46,28 +58,38 @@ public class PlatElderBreatheHeartRateRecordServiceImpl extends ServiceImpl<Plat
if (CollectionUtils.isEmpty(elderList)) {
return;
}
List<PlatElderBreatheHeartRateRecord> list = Lists.newArrayList();
for (PlatElder e : elderList) {
PlatDevice platDevice = platElderRealTimeService.getBreathDevice(e.getId(), null);
if (platDevice == null) {
continue;
}
BulkRequest request = new BulkRequest();
request.timeout("600s");
List<DeviceInfoContentBreathe> breatheList = iotProductDeviceService.getDeviceLogByTimeRangeBreathe(platDevice.getOriDeviceId(), 2 * 24 * 3600, start, end);
List<PlatElderBreatheHeartRateRecord> data = StreamUtil.map(breatheList, b -> {
PlatElderBreatheHeartRateRecord record = new PlatElderBreatheHeartRateRecord();
for (DeviceInfoContentBreathe breathe : breatheList) {
PlatElderBreatheHeartRateDTO record = new PlatElderBreatheHeartRateDTO();
record.setElderId(e.getId());
record.setReportTime(b.getTimestamp());
record.setHeartRate(b.getProperties().getHr());
record.setRespiratoryRate(b.getProperties().getBr());
record.setBodyMove(b.getProperties().getBodymove());
record.setReportTime(breathe.getTimestamp());
record.setHeartRate(breathe.getProperties().getHr());
record.setRespiratoryRate(breathe.getProperties().getBr());
record.setBodyMove(breathe.getProperties().getBodymove());
record.setDeviceId(platDevice.getId());
record.setIotDeviceId(platDevice.getOriDeviceId());
record.setTenantId(e.getTenantId());
return record;
});
list.addAll(data);
record.setCreateDate(LocalDateTime.now());
request.add(new IndexRequest("plat_elder_breathe_heart_rate").id(IdUtil.fastUUID()).type("_doc").source(JSONObject.toJSONString(record), XContentType.JSON));
}
if (CollectionUtils.isNotEmpty(breatheList)) {
try {
BulkResponse response = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
if (response.hasFailures()) {
log.error("es保存长者每日呼吸数据失败");
}
} catch (IOException exception) {
log.error("es保存长者每日呼吸数据失败:{}", exception);
}
}
}
saveBatch(list);
}
@Override
......
......@@ -2,9 +2,7 @@ package com.makeit.service.platform.elder.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.google.common.collect.Lists;
......@@ -36,10 +34,22 @@ import com.makeit.utils.time.LocalDateTimeUtils;
import com.makeit.vo.platform.elder.realtime.PlatElderCoordinateVO;
import com.makeit.vo.platform.elder.realtime.PlatElderRealTimeHeartRespiratoryVO;
import com.makeit.vo.platform.elder.report.day.*;
import org.springframework.beans.BeanUtils;
import org.apache.commons.collections4.MapUtils;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDate;
......@@ -47,10 +57,7 @@ import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@Service
......@@ -86,6 +93,8 @@ public class PlatElderDayReportDayServiceImpl implements PlatElderDayReportDaySe
private PlatElderBreatheHeartRateRecordService platElderBreatheHeartRateRecordService;
@Autowired
private SaasSleepEvaluateStandardReportService saasSleepEvaluateStandardReportService;
@Autowired
private RestHighLevelClient restHighLevelClient;
private static LocalDateTime dayStartNow(LocalDate now) {
return LocalDateTimeUtils.getDayStart(now);
......@@ -359,18 +368,61 @@ public class PlatElderDayReportDayServiceImpl implements PlatElderDayReportDaySe
long reportTimeStart = LocalDateTimeUtils.getMilliSecond(start);
long reportTimeEnd = LocalDateTimeUtils.getMilliSecond(end);
List<PlatElderBreatheHeartRateRecord> list = platElderBreatheHeartRateRecordService
.list(platElderIdDTO.getElderId(), platElderIdDTO.getDeviceId(), platElderIdDTO.getTenantId(), reportTimeStart, reportTimeEnd);
return StreamUtil.map(list, e -> {
PlatElderRealTimeHeartRespiratoryVO vo = new PlatElderRealTimeHeartRespiratoryVO();
vo.setTime(LongTimestampUtil.toLocalDateTime(e.getReportTime()));
vo.setHeartRate(e.getHeartRate());
vo.setRespiratoryRate(e.getRespiratoryRate());
return vo;
});
int size = 1000;
long scrollTime = 1L;
SearchRequest request = new SearchRequest();
request.indices("plat_elder_breathe_heart_rate");
request.types("_doc");
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.size(size);
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
if (StringUtils.isNotBlank(platElderIdDTO.getElderId())) {
boolQueryBuilder.must(QueryBuilders.matchQuery("elderId", platElderIdDTO.getElderId()));
}
if (StringUtils.isNotBlank(platElderIdDTO.getTenantId())) {
boolQueryBuilder.must(QueryBuilders.matchQuery("tenantId", platElderIdDTO.getTenantId()));
}
if (StringUtils.isNotBlank(platElderIdDTO.getDeviceId())) {
boolQueryBuilder.must(QueryBuilders.matchQuery("iotDeviceId", platElderIdDTO.getDeviceId()));
}
boolQueryBuilder.must(QueryBuilders.rangeQuery("reportTime").gte(reportTimeStart).lte(reportTimeEnd));
builder.query(boolQueryBuilder);
request.scroll(TimeValue.timeValueMinutes(scrollTime));
request.source(builder);
SearchResponse response;
List<PlatElderRealTimeHeartRespiratoryVO> list = Lists.newArrayList();
try {
response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
SearchHit[] results = response.getHits().getHits();
String scrollId = response.getScrollId();
add(list, results);
while (results != null && results.length > 0) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
scrollRequest.scroll(TimeValue.timeValueMinutes(scrollTime));
response = restHighLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
scrollId = response.getScrollId();
results = response.getHits().getHits();
add(list, results);
}
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
e.printStackTrace();
}
return list;
}
private void add(List<PlatElderRealTimeHeartRespiratoryVO> list, SearchHit[] results) {
for (SearchHit result : results) {
Map<String, Object> sourceAsMap = result.getSourceAsMap();
PlatElderRealTimeHeartRespiratoryVO heartRespiratoryVO = new PlatElderRealTimeHeartRespiratoryVO();
heartRespiratoryVO.setTime(LongTimestampUtil.toLocalDateTime(MapUtils.getLong(sourceAsMap, "reportTime")));
heartRespiratoryVO.setHeartRate(MapUtils.getInteger(sourceAsMap, "heartRate"));
heartRespiratoryVO.setRespiratoryRate(MapUtils.getInteger(sourceAsMap, "respiratoryRate"));
list.add(heartRespiratoryVO);
}
}
@Override
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment