From f8c2609bca86f6e4a2acd2e92a7eae29b86c2070 Mon Sep 17 00:00:00 2001
From: kaiyu <404897439@qq.com>
Date: Wed, 14 Jul 2021 14:05:42 +0800
Subject: [PATCH] screen-api 添加从kafka取出时间戳
---
screen-job/src/main/java/com/moral/api/service/impl/HistoryDailyServiceImpl.java | 102 +++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 99 insertions(+), 3 deletions(-)
diff --git a/screen-job/src/main/java/com/moral/api/service/impl/HistoryDailyServiceImpl.java b/screen-job/src/main/java/com/moral/api/service/impl/HistoryDailyServiceImpl.java
index 07b3958..fb62517 100644
--- a/screen-job/src/main/java/com/moral/api/service/impl/HistoryDailyServiceImpl.java
+++ b/screen-job/src/main/java/com/moral/api/service/impl/HistoryDailyServiceImpl.java
@@ -1,15 +1,32 @@
package com.moral.api.service.impl;
+import com.alibaba.fastjson.JSONObject;
+import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.HistoryDaily;
+import com.moral.api.entity.HistoryHourly;
+import com.moral.api.entity.Sensor;
import com.moral.api.mapper.HistoryDailyMapper;
import com.moral.api.service.HistoryDailyService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.moral.api.service.HistoryHourlyService;
+import com.moral.api.service.SensorService;
+import com.moral.constant.Constants;
+import com.moral.util.AmendUtils;
+import com.moral.util.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.util.ObjectUtils;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.OptionalDouble;
+import java.util.stream.Collectors;
+import java.util.stream.DoubleStream;
/**
* <p>
@@ -25,9 +42,88 @@
@Autowired
private HistoryDailyMapper historyDailyMapper;
+ @Autowired
+ private SensorService sensorService;
+
+ @Autowired
+ private HistoryHourlyService historyHourlyService;
+
@Override
- public void insertHistoryDaily(List<HistoryDaily> list) {
- System.out.println(list);
- historyDailyMapper.insertHistoryDaily(list);
+ @Transactional
+ public void insertHistoryDaily() {
+ String format = DateUtils.yyyy_MM_dd_EN;
+ Date now = new Date();
+ //���������������������
+ Date start = DateUtils.dataToTimeStampTime(DateUtils.getDateOfDay(now, -1), format);
+ //���������������������
+ Date end = DateUtils.dataToTimeStampTime(now, format);
+ //������
+ QueryWrapper<Sensor> sensorQueryWrapper = new QueryWrapper<>();
+ sensorQueryWrapper.select("code").eq("is_delete", Constants.NOT_DELETE);
+ List<Object> sensorCodes = sensorService.listObjs(sensorQueryWrapper);
+
+ //������������������������������
+ QueryWrapper<HistoryHourly> historyHourlyQueryWrapper = new QueryWrapper<>();
+ historyHourlyQueryWrapper.ge("time", DateUtils.dateToDateString(start)).le("time", DateUtils.dateToDateString(end));
+ List<Map<String, Object>> dailyData = historyHourlyService.listMaps(historyHourlyQueryWrapper);
+ if (dailyData.size() == 0) {
+ return;
+ }
+
+ //���mac������
+ Map<String, List<Map<String, Object>>> data = dailyData.parallelStream().collect(Collectors.groupingBy(o -> (String) o.get("mac")));
+
+ //���������������������������
+ List<Map<String, Object>> insertData = new ArrayList<>();
+
+ data.forEach((key, value) -> {
+ Map<String, Object> dataMap = new HashMap<>();
+ Map<String, Object> jsonMap = new HashMap<>();
+ dataMap.put("mac", key);
+ dataMap.put("time", start);
+
+ //���������������������������������������������������
+ List<Map<String, Object>> tempValue = new ArrayList<>(value);
+
+ value.removeIf(map -> ((Date) map.get("time")).getTime() == start.getTime());
+ //������8������������������������������������
+ Object o3AvgOfDay = AmendUtils.getO3AvgOfDay(value);
+ if (o3AvgOfDay != null) {
+ jsonMap.put(Constants.SENSOR_CODE_O3, o3AvgOfDay);
+ }
+ //������������������������������������
+ tempValue.removeIf(map -> ((Date) map.get("time")).getTime() == end.getTime());
+
+ //���������������������������
+ Object windDirAvg = AmendUtils.getWindDirAvg(value);
+ if (windDirAvg != null) {
+ jsonMap.put(Constants.SENSOR_CODE_WIND_DIR, windDirAvg);
+ }
+
+ sensorCodes.forEach(sensorCode -> {
+ OptionalDouble optionalDouble = tempValue.parallelStream()
+ .flatMapToDouble(v -> {
+ Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
+ Object sensorValue = dataValue.get(sensorCode.toString());
+ if (ObjectUtils.isEmpty(sensorValue)) {
+ return null;
+ }
+ if (sensorCode.equals(Constants.SENSOR_CODE_O3)) {
+ return null;
+ }
+ return DoubleStream.of(Double.parseDouble(sensorValue.toString()));
+ }).average();
+ if (optionalDouble.isPresent()) {
+ //���������������������
+ double sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 4);
+ jsonMap.put(sensorCode.toString(), sciCal);
+ }
+ });
+ dataMap.put("value", JSONObject.toJSONString(jsonMap));
+ insertData.add(dataMap);
+ });
+
+ //���������������
+ historyDailyMapper.insertHistoryDaily(insertData);
}
}
--
Gitblit v1.8.0