From e2bb1b2698bad87ff7fff9a92a4cb0b7b71c5483 Mon Sep 17 00:00:00 2001 From: cjl <276999030@qq.com> Date: Fri, 07 Jul 2023 15:29:19 +0800 Subject: [PATCH] revert: 错误提交 --- screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java | 398 ++++++++++++++++++++++++++++---------------------------- 1 files changed, 199 insertions(+), 199 deletions(-) diff --git a/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java index d6661e4..0548b82 100644 --- a/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java +++ b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java @@ -1,201 +1,201 @@ -package com.moral.api.kafka.consumer; - -import com.moral.api.service.*; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.redis.core.RedisTemplate; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.Acknowledgment; -import org.springframework.stereotype.Component; -import org.springframework.util.ObjectUtils; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -import com.alibaba.fastjson.JSON; -import com.moral.constant.KafkaConstants; -import com.moral.constant.RedisConstants; - - - -@Component -@Slf4j -public class DeviceConsumer { - - @Autowired - private HistoryMinutelyService historyMinutelyService; - - @Autowired - private HistoryHourlyService historyHourlyService; - - @Autowired - private DeviceService deviceService; - - @Autowired - private RedisTemplate redisTemplate; - - @Autowired - private HistorySecondCruiserService historySecondCruiserService; - - @Autowired - private HistorySecondUavService historySecondUavService; - - //������������ - @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") - public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { - String msg = record.value(); - try { - Map<String, Object> data = JSON.parseObject(msg, Map.class); - Object mac = data.get("mac"); - Object time = data.get("DataTime"); - if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { - log.warn("some properties is null, param{}", msg); - ack.acknowledge(); - return; - } - - //������������ +//package com.moral.api.kafka.consumer; +// +//import com.moral.api.service.*; +//import lombok.extern.slf4j.Slf4j; +//import org.apache.kafka.clients.consumer.ConsumerRecord; +//import org.springframework.beans.factory.annotation.Autowired; +//import org.springframework.data.redis.core.RedisTemplate; +//import org.springframework.kafka.annotation.KafkaListener; +//import org.springframework.kafka.support.Acknowledgment; +//import org.springframework.stereotype.Component; +//import org.springframework.util.ObjectUtils; +// +//import java.util.HashMap; +//import java.util.Iterator; +//import java.util.Map; +// +//import com.alibaba.fastjson.JSON; +//import com.moral.constant.KafkaConstants; +//import com.moral.constant.RedisConstants; +// +// +// +//@Component +//@Slf4j +//public class DeviceConsumer { +// +// @Autowired +// private HistoryMinutelyService historyMinutelyService; +// +// @Autowired +// private HistoryHourlyService historyHourlyService; +// +// @Autowired +// private DeviceService deviceService; +// +// @Autowired +// private RedisTemplate redisTemplate; +// +// @Autowired +// private HistorySecondCruiserService historySecondCruiserService; +// +// @Autowired +// private HistorySecondUavService historySecondUavService; +// +// //������������ +// @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") +// public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { +// String msg = record.value(); +// try { +// Map<String, Object> data = JSON.parseObject(msg, Map.class); +// Object mac = data.get("mac"); +// Object time = data.get("DataTime"); +// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { +// log.warn("some properties is null, param{}", msg); +// ack.acknowledge(); +// return; +// } +// +// //������������ +//// data.remove("time"); +// data.remove("entryTime"); +// Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); +// Map<String, Object> newMap = new HashMap<>(); +// Map.Entry<String, Object> next; +// while (iterator.hasNext()) { +// next = iterator.next(); +// String key = next.getKey(); +// Object value = next.getValue(); +// if (key.contains("-Avg")) { +// newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); +// } else { +// newMap.put(key, value); +// } +// iterator.remove(); +// } +// //��������������� +// historyMinutelyService.insertHistoryMinutely(newMap); +// ack.acknowledge(); +// } catch (Exception e) { +// log.error("param{}" + msg); +// } +// } +// +// //������������ +// @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") +// public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { +// String msg = record.value(); +// try { +// Map<String, Object> data = JSON.parseObject(msg, Map.class); +// Object mac = data.get("mac"); +// Object time = data.get("DataTime"); +// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { +// log.warn("some properties is null, param{}", msg); +// ack.acknowledge(); +// return; +// } +// +// //������������ +//// data.remove("time"); +// data.remove("entryTime"); +// Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); +// Map<String, Object> newMap = new HashMap<>(); +// Map.Entry<String, Object> next; +// while (iterator.hasNext()) { +// next = iterator.next(); +// String key = next.getKey(); +// Object value = next.getValue(); +// if (key.contains("-Avg")) { +// newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); +// } else { +// newMap.put(key, value); +// } +// iterator.remove(); +// } +// //��������������� +// historyHourlyService.insertHistoryHourly(newMap); +// ack.acknowledge(); +// } catch (Exception e) { +// log.error("param{}" + msg); +// } +// } +// +// //������������������������������������������������������ +// @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") +// public void listenSecond(ConsumerRecord<String, String> record) { +// String msg = record.value(); +// try { +// Map<String, Object> data = JSON.parseObject(msg, Map.class); +// Object mac = data.get("mac"); +// Object time = data.get("DataTime"); +// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { +// log.warn("some properties is null, param{}", msg); +// return; +// } +// //������������ // data.remove("time"); - data.remove("entryTime"); - Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); - Map<String, Object> newMap = new HashMap<>(); - Map.Entry<String, Object> next; - while (iterator.hasNext()) { - next = iterator.next(); - String key = next.getKey(); - Object value = next.getValue(); - if (key.contains("-Avg")) { - newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); - } else { - newMap.put(key, value); - } - iterator.remove(); - } - //��������������� - historyMinutelyService.insertHistoryMinutely(newMap); - ack.acknowledge(); - } catch (Exception e) { - log.error("param{}" + msg); - } - } - - //������������ - @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") - public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { - String msg = record.value(); - try { - Map<String, Object> data = JSON.parseObject(msg, Map.class); - Object mac = data.get("mac"); - Object time = data.get("DataTime"); - if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { - log.warn("some properties is null, param{}", msg); - ack.acknowledge(); - return; - } - - //������������ +// data.remove("entryTime"); +// +// //������������ +// data = deviceService.adjustDeviceData(data,"0"); +// //������redis +// data.put("DataTime", time); +// redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); +// //��������������������������� +// data.put("mac", mac); +// deviceService.judgeDeviceState(data); +// } catch (Exception e) { +// log.error("param{}" + msg); +// } +// } +// +// //������������������ +// @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") +// public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { +// String msg = record.value(); +// try { +// Map<String, Object> data = JSON.parseObject(msg, Map.class); +// Object mac = data.get("mac"); +// Object time = data.get("DataTime"); +// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { +// log.warn("some properties is null, param{}", msg); +// ack.acknowledge(); +// return; +// } +// +// //������������ // data.remove("time"); - data.remove("entryTime"); - Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); - Map<String, Object> newMap = new HashMap<>(); - Map.Entry<String, Object> next; - while (iterator.hasNext()) { - next = iterator.next(); - String key = next.getKey(); - Object value = next.getValue(); - if (key.contains("-Avg")) { - newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); - } else { - newMap.put(key, value); - } - iterator.remove(); - } - //��������������� - historyHourlyService.insertHistoryHourly(newMap); - ack.acknowledge(); - } catch (Exception e) { - log.error("param{}" + msg); - } - } - - //������������������������������������������������������ - @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") - public void listenSecond(ConsumerRecord<String, String> record) { - String msg = record.value(); - try { - Map<String, Object> data = JSON.parseObject(msg, Map.class); - Object mac = data.get("mac"); - Object time = data.get("DataTime"); - if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { - log.warn("some properties is null, param{}", msg); - return; - } - //������������ - data.remove("time"); - data.remove("entryTime"); - - //������������ - data = deviceService.adjustDeviceData(data,"0"); - //������redis - data.put("DataTime", time); - redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); - //��������������������������� - data.put("mac", mac); - deviceService.judgeDeviceState(data); - } catch (Exception e) { - log.error("param{}" + msg); - } - } - - //������������������ - @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") - public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { - String msg = record.value(); - try { - Map<String, Object> data = JSON.parseObject(msg, Map.class); - Object mac = data.get("mac"); - Object time = data.get("DataTime"); - if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { - log.warn("some properties is null, param{}", msg); - ack.acknowledge(); - return; - } - - //������������ - data.remove("time"); - data.remove("entryTime"); - - historySecondUavService.insertHistorySecond(data); - ack.acknowledge(); - } catch (Exception e) { - log.error("param{}" + msg); - } - } - - //������������������ - @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") - public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { - String msg = record.value(); - try { - Map<String, Object> data = JSON.parseObject(msg, Map.class); - Object mac = data.get("mac"); - Object time = data.get("DataTime"); - if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { - log.warn("some properties is null, param{}", msg); - ack.acknowledge(); - return; - } - - //������������ - data.remove("time"); - data.remove("entryTime"); - - historySecondCruiserService.insertHistorySecond(data); - ack.acknowledge(); - } catch (Exception e) { - log.error("param{}" + e); - } - } -} +// data.remove("entryTime"); +// +// historySecondUavService.insertHistorySecond(data); +// ack.acknowledge(); +// } catch (Exception e) { +// log.error("param{}" + msg); +// } +// } +// +// //������������������ +// @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") +// public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { +// String msg = record.value(); +// try { +// Map<String, Object> data = JSON.parseObject(msg, Map.class); +// Object mac = data.get("mac"); +// Object time = data.get("DataTime"); +// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { +// log.warn("some properties is null, param{}", msg); +// ack.acknowledge(); +// return; +// } +// +// //������������ +// data.remove("time"); +// data.remove("entryTime"); +// +// historySecondCruiserService.insertHistorySecond(data); +// ack.acknowledge(); +// } catch (Exception e) { +// log.error("param{}" + e); +// } +// } +//} -- Gitblit v1.8.0