From 327267727c36cef76ee554e1ad97099180cd4f13 Mon Sep 17 00:00:00 2001 From: kaiyu <404897439@qq.com> Date: Tue, 13 Jul 2021 09:08:37 +0800 Subject: [PATCH] screen-manage 单位转换信息添加缓存 --- screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java | 66 ++++++++++++++++++++------------ 1 files changed, 41 insertions(+), 25 deletions(-) diff --git a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java b/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java index cc98e98..c257205 100644 --- a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java +++ b/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java @@ -10,14 +10,13 @@ import org.springframework.util.StringUtils; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; -import java.util.stream.Collectors; import com.alibaba.fastjson.JSON; import com.moral.api.service.DeviceService; import com.moral.api.service.HistoryHourlyService; import com.moral.api.service.HistoryMinutelyService; -import com.moral.api.util.AdjustDataUtils; import com.moral.constant.KafkaConstants; import com.moral.constant.RedisConstants; @@ -35,13 +34,10 @@ private DeviceService deviceService; @Autowired - private AdjustDataUtils adjustDataUtils; - - @Autowired private RedisTemplate redisTemplate; //������������ - @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_MINUTE, containerFactory = "kafkaListenerContainerFactory") public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -56,22 +52,32 @@ } //������������ - data = data.entrySet().stream() - .filter(map -> { - String key = map.getKey(); - return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); - }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); data.remove("time"); + data.remove("entryTime"); + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), value); + } else { + newMap.put(key, value); + } + iterator.remove(); + } //��������������� - historyMinutelyService.insertHistoryMinutely(data); + historyMinutelyService.insertHistoryMinutely(newMap); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } //������������ - @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_HOUR, containerFactory = "kafkaListenerContainerFactory") public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -86,22 +92,32 @@ } //������������ - data = data.entrySet().stream() - .filter(map -> { - String key = map.getKey(); - return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); - }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); data.remove("time"); + data.remove("entryTime"); + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), value); + } else { + newMap.put(key, value); + } + iterator.remove(); + } //��������������� - historyHourlyService.insertHistoryHourly(data); + historyHourlyService.insertHistoryHourly(newMap); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } //������������������������������������������������������ - @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_STATE, containerFactory = "kafkaListenerContainerFactory") public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -115,14 +131,14 @@ return; } //������������ - data = adjustDataUtils.adjust(data); + data = deviceService.adjustDeviceData(data); //������redis - redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + "_" + mac, data); + redisTemplate.opsForHash().put(RedisConstants.DEVICE_DATA, mac, data); //��������������������������� deviceService.judgeDeviceState(data); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } } -- Gitblit v1.8.0