From e6463e3aad65706d4540235e30941717532f9f18 Mon Sep 17 00:00:00 2001 From: kaiyu <404897439@qq.com> Date: Tue, 13 Jul 2021 15:48:42 +0800 Subject: [PATCH] screen-manage 完成实时界面单位转换功能 --- screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java | 60 +++++++++++++++++++++++++++++++++++++++--------------------- 1 files changed, 39 insertions(+), 21 deletions(-) diff --git a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java b/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java index 94ec2f1..c257205 100644 --- a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java +++ b/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java @@ -10,8 +10,8 @@ import org.springframework.util.StringUtils; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; -import java.util.stream.Collectors; import com.alibaba.fastjson.JSON; import com.moral.api.service.DeviceService; @@ -20,7 +20,7 @@ import com.moral.constant.KafkaConstants; import com.moral.constant.RedisConstants; -@Component +//@Component @Slf4j public class KafkaConsumer { @@ -37,7 +37,7 @@ private RedisTemplate redisTemplate; //������������ - @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_MINUTE, containerFactory = "kafkaListenerContainerFactory") public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -52,23 +52,32 @@ } //������������ - data = data.entrySet().stream() - .filter(map -> { - String key = map.getKey(); - return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); - }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); data.remove("time"); data.remove("entryTime"); + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), value); + } else { + newMap.put(key, value); + } + iterator.remove(); + } //��������������� - historyMinutelyService.insertHistoryMinutely(data); + historyMinutelyService.insertHistoryMinutely(newMap); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } //������������ - @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_HOUR, containerFactory = "kafkaListenerContainerFactory") public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -83,23 +92,32 @@ } //������������ - data = data.entrySet().stream() - .filter(map -> { - String key = map.getKey(); - return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); - }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); data.remove("time"); data.remove("entryTime"); + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), value); + } else { + newMap.put(key, value); + } + iterator.remove(); + } //��������������� - historyHourlyService.insertHistoryHourly(data); + historyHourlyService.insertHistoryHourly(newMap); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } //������������������������������������������������������ - @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_STATE, containerFactory = "kafkaListenerContainerFactory") public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -115,12 +133,12 @@ //������������ data = deviceService.adjustDeviceData(data); //������redis - redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + mac, data); + redisTemplate.opsForHash().put(RedisConstants.DEVICE_DATA, mac, data); //��������������������������� deviceService.judgeDeviceState(data); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } } -- Gitblit v1.8.0