From 626b1c85571181031f62cdf24a9392f4dea68c82 Mon Sep 17 00:00:00 2001 From: cjl <276999030@qq.com> Date: Mon, 10 Jul 2023 15:20:36 +0800 Subject: [PATCH] chore: 提交测试 --- screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java | 398 ++++++++++++++++++++++++++++---------------------------- 1 files changed, 199 insertions(+), 199 deletions(-) diff --git a/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java index 0548b82..d6661e4 100644 --- a/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java +++ b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java @@ -1,201 +1,201 @@ -//package com.moral.api.kafka.consumer; -// -//import com.moral.api.service.*; -//import lombok.extern.slf4j.Slf4j; -//import org.apache.kafka.clients.consumer.ConsumerRecord; -//import org.springframework.beans.factory.annotation.Autowired; -//import org.springframework.data.redis.core.RedisTemplate; -//import org.springframework.kafka.annotation.KafkaListener; -//import org.springframework.kafka.support.Acknowledgment; -//import org.springframework.stereotype.Component; -//import org.springframework.util.ObjectUtils; -// -//import java.util.HashMap; -//import java.util.Iterator; -//import java.util.Map; -// -//import com.alibaba.fastjson.JSON; -//import com.moral.constant.KafkaConstants; -//import com.moral.constant.RedisConstants; -// -// -// -//@Component -//@Slf4j -//public class DeviceConsumer { -// -// @Autowired -// private HistoryMinutelyService historyMinutelyService; -// -// @Autowired -// private HistoryHourlyService historyHourlyService; -// -// @Autowired -// private DeviceService deviceService; -// -// @Autowired -// private RedisTemplate redisTemplate; -// -// @Autowired -// private HistorySecondCruiserService historySecondCruiserService; -// -// @Autowired -// private HistorySecondUavService historySecondUavService; -// -// //������������ -// @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") -// public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { -// String msg = record.value(); -// try { -// Map<String, Object> data = JSON.parseObject(msg, Map.class); -// Object mac = data.get("mac"); -// Object time = data.get("DataTime"); -// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { -// log.warn("some properties is null, param{}", msg); -// ack.acknowledge(); -// return; -// } -// -// //������������ -//// data.remove("time"); -// data.remove("entryTime"); -// Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); -// Map<String, Object> newMap = new HashMap<>(); -// Map.Entry<String, Object> next; -// while (iterator.hasNext()) { -// next = iterator.next(); -// String key = next.getKey(); -// Object value = next.getValue(); -// if (key.contains("-Avg")) { -// newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); -// } else { -// newMap.put(key, value); -// } -// iterator.remove(); -// } -// //��������������� -// historyMinutelyService.insertHistoryMinutely(newMap); -// ack.acknowledge(); -// } catch (Exception e) { -// log.error("param{}" + msg); -// } -// } -// -// //������������ -// @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") -// public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { -// String msg = record.value(); -// try { -// Map<String, Object> data = JSON.parseObject(msg, Map.class); -// Object mac = data.get("mac"); -// Object time = data.get("DataTime"); -// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { -// log.warn("some properties is null, param{}", msg); -// ack.acknowledge(); -// return; -// } -// -// //������������ -//// data.remove("time"); -// data.remove("entryTime"); -// Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); -// Map<String, Object> newMap = new HashMap<>(); -// Map.Entry<String, Object> next; -// while (iterator.hasNext()) { -// next = iterator.next(); -// String key = next.getKey(); -// Object value = next.getValue(); -// if (key.contains("-Avg")) { -// newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); -// } else { -// newMap.put(key, value); -// } -// iterator.remove(); -// } -// //��������������� -// historyHourlyService.insertHistoryHourly(newMap); -// ack.acknowledge(); -// } catch (Exception e) { -// log.error("param{}" + msg); -// } -// } -// -// //������������������������������������������������������ -// @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") -// public void listenSecond(ConsumerRecord<String, String> record) { -// String msg = record.value(); -// try { -// Map<String, Object> data = JSON.parseObject(msg, Map.class); -// Object mac = data.get("mac"); -// Object time = data.get("DataTime"); -// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { -// log.warn("some properties is null, param{}", msg); -// return; -// } -// //������������ +package com.moral.api.kafka.consumer; + +import com.moral.api.service.*; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import com.alibaba.fastjson.JSON; +import com.moral.constant.KafkaConstants; +import com.moral.constant.RedisConstants; + + + +@Component +@Slf4j +public class DeviceConsumer { + + @Autowired + private HistoryMinutelyService historyMinutelyService; + + @Autowired + private HistoryHourlyService historyHourlyService; + + @Autowired + private DeviceService deviceService; + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private HistorySecondCruiserService historySecondCruiserService; + + @Autowired + private HistorySecondUavService historySecondUavService; + + //������������ + @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") + public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { + String msg = record.value(); + try { + Map<String, Object> data = JSON.parseObject(msg, Map.class); + Object mac = data.get("mac"); + Object time = data.get("DataTime"); + if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { + log.warn("some properties is null, param{}", msg); + ack.acknowledge(); + return; + } + + //������������ // data.remove("time"); -// data.remove("entryTime"); -// -// //������������ -// data = deviceService.adjustDeviceData(data,"0"); -// //������redis -// data.put("DataTime", time); -// redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); -// //��������������������������� -// data.put("mac", mac); -// deviceService.judgeDeviceState(data); -// } catch (Exception e) { -// log.error("param{}" + msg); -// } -// } -// -// //������������������ -// @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") -// public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { -// String msg = record.value(); -// try { -// Map<String, Object> data = JSON.parseObject(msg, Map.class); -// Object mac = data.get("mac"); -// Object time = data.get("DataTime"); -// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { -// log.warn("some properties is null, param{}", msg); -// ack.acknowledge(); -// return; -// } -// -// //������������ + data.remove("entryTime"); + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); + } else { + newMap.put(key, value); + } + iterator.remove(); + } + //��������������� + historyMinutelyService.insertHistoryMinutely(newMap); + ack.acknowledge(); + } catch (Exception e) { + log.error("param{}" + msg); + } + } + + //������������ + @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") + public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { + String msg = record.value(); + try { + Map<String, Object> data = JSON.parseObject(msg, Map.class); + Object mac = data.get("mac"); + Object time = data.get("DataTime"); + if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { + log.warn("some properties is null, param{}", msg); + ack.acknowledge(); + return; + } + + //������������ // data.remove("time"); -// data.remove("entryTime"); -// -// historySecondUavService.insertHistorySecond(data); -// ack.acknowledge(); -// } catch (Exception e) { -// log.error("param{}" + msg); -// } -// } -// -// //������������������ -// @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") -// public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { -// String msg = record.value(); -// try { -// Map<String, Object> data = JSON.parseObject(msg, Map.class); -// Object mac = data.get("mac"); -// Object time = data.get("DataTime"); -// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { -// log.warn("some properties is null, param{}", msg); -// ack.acknowledge(); -// return; -// } -// -// //������������ -// data.remove("time"); -// data.remove("entryTime"); -// -// historySecondCruiserService.insertHistorySecond(data); -// ack.acknowledge(); -// } catch (Exception e) { -// log.error("param{}" + e); -// } -// } -//} + data.remove("entryTime"); + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); + } else { + newMap.put(key, value); + } + iterator.remove(); + } + //��������������� + historyHourlyService.insertHistoryHourly(newMap); + ack.acknowledge(); + } catch (Exception e) { + log.error("param{}" + msg); + } + } + + //������������������������������������������������������ + @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") + public void listenSecond(ConsumerRecord<String, String> record) { + String msg = record.value(); + try { + Map<String, Object> data = JSON.parseObject(msg, Map.class); + Object mac = data.get("mac"); + Object time = data.get("DataTime"); + if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { + log.warn("some properties is null, param{}", msg); + return; + } + //������������ + data.remove("time"); + data.remove("entryTime"); + + //������������ + data = deviceService.adjustDeviceData(data,"0"); + //������redis + data.put("DataTime", time); + redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); + //��������������������������� + data.put("mac", mac); + deviceService.judgeDeviceState(data); + } catch (Exception e) { + log.error("param{}" + msg); + } + } + + //������������������ + @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") + public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { + String msg = record.value(); + try { + Map<String, Object> data = JSON.parseObject(msg, Map.class); + Object mac = data.get("mac"); + Object time = data.get("DataTime"); + if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { + log.warn("some properties is null, param{}", msg); + ack.acknowledge(); + return; + } + + //������������ + data.remove("time"); + data.remove("entryTime"); + + historySecondUavService.insertHistorySecond(data); + ack.acknowledge(); + } catch (Exception e) { + log.error("param{}" + msg); + } + } + + //������������������ + @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") + public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { + String msg = record.value(); + try { + Map<String, Object> data = JSON.parseObject(msg, Map.class); + Object mac = data.get("mac"); + Object time = data.get("DataTime"); + if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { + log.warn("some properties is null, param{}", msg); + ack.acknowledge(); + return; + } + + //������������ + data.remove("time"); + data.remove("entryTime"); + + historySecondCruiserService.insertHistorySecond(data); + ack.acknowledge(); + } catch (Exception e) { + log.error("param{}" + e); + } + } +} -- Gitblit v1.8.0