From f8c2609bca86f6e4a2acd2e92a7eae29b86c2070 Mon Sep 17 00:00:00 2001 From: kaiyu <404897439@qq.com> Date: Wed, 14 Jul 2021 14:05:42 +0800 Subject: [PATCH] screen-api 添加从kafka取出时间戳 --- screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java | 58 ++++++++++++++++++++++++++++++++++++++-------------------- 1 files changed, 38 insertions(+), 20 deletions(-) diff --git a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java b/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java index 94ec2f1..4d1f804 100644 --- a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java +++ b/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java @@ -10,8 +10,8 @@ import org.springframework.util.StringUtils; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; -import java.util.stream.Collectors; import com.alibaba.fastjson.JSON; import com.moral.api.service.DeviceService; @@ -37,7 +37,7 @@ private RedisTemplate redisTemplate; //������������ - @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_MINUTE, containerFactory = "kafkaListenerContainerFactory") public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -52,23 +52,32 @@ } //������������ - data = data.entrySet().stream() - .filter(map -> { - String key = map.getKey(); - return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); - }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); data.remove("time"); data.remove("entryTime"); + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), value); + } else { + newMap.put(key, value); + } + iterator.remove(); + } //��������������� - historyMinutelyService.insertHistoryMinutely(data); + historyMinutelyService.insertHistoryMinutely(newMap); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } //������������ - @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_HOUR, containerFactory = "kafkaListenerContainerFactory") public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -83,23 +92,32 @@ } //������������ - data = data.entrySet().stream() - .filter(map -> { - String key = map.getKey(); - return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); - }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); data.remove("time"); data.remove("entryTime"); + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), value); + } else { + newMap.put(key, value); + } + iterator.remove(); + } //��������������� - historyHourlyService.insertHistoryHourly(data); + historyHourlyService.insertHistoryHourly(newMap); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } //������������������������������������������������������ - @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_STATE, containerFactory = "kafkaListenerContainerFactory") public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -115,12 +133,12 @@ //������������ data = deviceService.adjustDeviceData(data); //������redis - redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + mac, data); + redisTemplate.opsForHash().put(RedisConstants.DEVICE_DATA, mac, data); //��������������������������� deviceService.judgeDeviceState(data); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } } -- Gitblit v1.8.0