From 327267727c36cef76ee554e1ad97099180cd4f13 Mon Sep 17 00:00:00 2001 From: kaiyu <404897439@qq.com> Date: Tue, 13 Jul 2021 09:08:37 +0800 Subject: [PATCH] screen-manage 单位转换信息添加缓存 --- screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java | 62 ++++++++++++++++++++----------- 1 files changed, 40 insertions(+), 22 deletions(-) diff --git a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java b/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java index 54c9d35..c257205 100644 --- a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java +++ b/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java @@ -10,8 +10,8 @@ import org.springframework.util.StringUtils; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; -import java.util.stream.Collectors; import com.alibaba.fastjson.JSON; import com.moral.api.service.DeviceService; @@ -37,10 +37,9 @@ private RedisTemplate redisTemplate; //������������ - @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_MINUTE, containerFactory = "kafkaListenerContainerFactory") public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); - System.out.println(msg); try { Map<String, Object> data = JSON.parseObject(msg, HashMap.class); Object mac = data.get("mac"); @@ -53,22 +52,32 @@ } //������������ - data = data.entrySet().stream() - .filter(map -> { - String key = map.getKey(); - return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); - }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); data.remove("time"); + data.remove("entryTime"); + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), value); + } else { + newMap.put(key, value); + } + iterator.remove(); + } //��������������� - historyMinutelyService.insertHistoryMinutely(data); + historyMinutelyService.insertHistoryMinutely(newMap); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } //������������ - @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_HOUR, containerFactory = "kafkaListenerContainerFactory") public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -83,25 +92,34 @@ } //������������ - data = data.entrySet().stream() - .filter(map -> { - String key = map.getKey(); - return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); - }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); data.remove("time"); + data.remove("entryTime"); + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), value); + } else { + newMap.put(key, value); + } + iterator.remove(); + } //��������������� - historyHourlyService.insertHistoryHourly(data); + historyHourlyService.insertHistoryHourly(newMap); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } //������������������������������������������������������ - @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_STATE, containerFactory = "kafkaListenerContainerFactory") public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); - //System.out.println(record.offset() + "===>" + msg); try { Map<String, Object> data = JSON.parseObject(msg, HashMap.class); Object mac = data.get("mac"); @@ -115,12 +133,12 @@ //������������ data = deviceService.adjustDeviceData(data); //������redis - redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + mac, data); + redisTemplate.opsForHash().put(RedisConstants.DEVICE_DATA, mac, data); //��������������������������� deviceService.judgeDeviceState(data); ack.acknowledge(); } catch (Exception e) { - //log.error("param{}" + msg); + log.error("param{}" + msg); } } } -- Gitblit v1.8.0