From 9655ea9ce8cdd671a766f87a0885b0fdff7b89c9 Mon Sep 17 00:00:00 2001 From: jinpengyong <jpy123456> Date: Thu, 24 Aug 2023 17:12:28 +0800 Subject: [PATCH] Merge branch 'dev' of http://blit.7drlb.com:8888/r/moral into wb --- screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java | 415 ++++++++++++++++++++++++++++++---------------------------- 1 files changed, 215 insertions(+), 200 deletions(-) diff --git a/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java index 0548b82..30041ec 100644 --- a/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java +++ b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java @@ -1,201 +1,216 @@ -//package com.moral.api.kafka.consumer; -// -//import com.moral.api.service.*; -//import lombok.extern.slf4j.Slf4j; -//import org.apache.kafka.clients.consumer.ConsumerRecord; -//import org.springframework.beans.factory.annotation.Autowired; -//import org.springframework.data.redis.core.RedisTemplate; -//import org.springframework.kafka.annotation.KafkaListener; -//import org.springframework.kafka.support.Acknowledgment; -//import org.springframework.stereotype.Component; -//import org.springframework.util.ObjectUtils; -// -//import java.util.HashMap; -//import java.util.Iterator; -//import java.util.Map; -// -//import com.alibaba.fastjson.JSON; -//import com.moral.constant.KafkaConstants; -//import com.moral.constant.RedisConstants; -// -// -// -//@Component -//@Slf4j -//public class DeviceConsumer { -// -// @Autowired -// private HistoryMinutelyService historyMinutelyService; -// -// @Autowired -// private HistoryHourlyService historyHourlyService; -// -// @Autowired -// private DeviceService deviceService; -// -// @Autowired -// private RedisTemplate redisTemplate; -// -// @Autowired -// private HistorySecondCruiserService historySecondCruiserService; -// -// @Autowired -// private HistorySecondUavService historySecondUavService; -// -// //������������ -// @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") -// public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { -// String msg = record.value(); -// try { -// Map<String, Object> data = JSON.parseObject(msg, Map.class); -// Object mac = data.get("mac"); -// Object time = data.get("DataTime"); -// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { -// log.warn("some properties is null, param{}", msg); -// ack.acknowledge(); -// return; -// } -// -// //������������ -//// data.remove("time"); -// data.remove("entryTime"); -// Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); -// Map<String, Object> newMap = new HashMap<>(); -// Map.Entry<String, Object> next; -// while (iterator.hasNext()) { -// next = iterator.next(); -// String key = next.getKey(); -// Object value = next.getValue(); -// if (key.contains("-Avg")) { -// newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); -// } else { -// newMap.put(key, value); -// } -// iterator.remove(); -// } -// //��������������� -// historyMinutelyService.insertHistoryMinutely(newMap); -// ack.acknowledge(); -// } catch (Exception e) { -// log.error("param{}" + msg); -// } -// } -// -// //������������ -// @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") -// public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { -// String msg = record.value(); -// try { -// Map<String, Object> data = JSON.parseObject(msg, Map.class); -// Object mac = data.get("mac"); -// Object time = data.get("DataTime"); -// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { -// log.warn("some properties is null, param{}", msg); -// ack.acknowledge(); -// return; -// } -// -// //������������ -//// data.remove("time"); -// data.remove("entryTime"); -// Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); -// Map<String, Object> newMap = new HashMap<>(); -// Map.Entry<String, Object> next; -// while (iterator.hasNext()) { -// next = iterator.next(); -// String key = next.getKey(); -// Object value = next.getValue(); -// if (key.contains("-Avg")) { -// newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); -// } else { -// newMap.put(key, value); -// } -// iterator.remove(); -// } -// //��������������� -// historyHourlyService.insertHistoryHourly(newMap); -// ack.acknowledge(); -// } catch (Exception e) { -// log.error("param{}" + msg); -// } -// } -// -// //������������������������������������������������������ -// @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") -// public void listenSecond(ConsumerRecord<String, String> record) { -// String msg = record.value(); -// try { -// Map<String, Object> data = JSON.parseObject(msg, Map.class); -// Object mac = data.get("mac"); -// Object time = data.get("DataTime"); -// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { -// log.warn("some properties is null, param{}", msg); -// return; -// } -// //������������ +package com.moral.api.kafka.consumer; + +import com.moral.api.service.*; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.text.DecimalFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import com.alibaba.fastjson.JSON; +import com.moral.constant.KafkaConstants; +import com.moral.constant.RedisConstants; + + + +@Component +@Slf4j +public class DeviceConsumer { + + @Autowired + private HistoryMinutelyService historyMinutelyService; + + @Autowired + private HistoryHourlyService historyHourlyService; + + @Autowired + private DeviceService deviceService; + + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private HistorySecondCruiserService historySecondCruiserService; + + @Autowired + private HistorySecondUavService historySecondUavService; + + //������������ + @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") + public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { + String msg = record.value(); + try { + Map<String, Object> data = JSON.parseObject(msg, Map.class); + Object mac = data.get("mac"); + Object time = data.get("DataTime"); + if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { + log.warn("some properties is null, param{}", msg); + ack.acknowledge(); + return; + } + + //������������ // data.remove("time"); -// data.remove("entryTime"); -// -// //������������ -// data = deviceService.adjustDeviceData(data,"0"); -// //������redis -// data.put("DataTime", time); -// redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); -// //��������������������������� -// data.put("mac", mac); -// deviceService.judgeDeviceState(data); -// } catch (Exception e) { -// log.error("param{}" + msg); -// } -// } -// -// //������������������ -// @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") -// public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { -// String msg = record.value(); -// try { -// Map<String, Object> data = JSON.parseObject(msg, Map.class); -// Object mac = data.get("mac"); -// Object time = data.get("DataTime"); -// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { -// log.warn("some properties is null, param{}", msg); -// ack.acknowledge(); -// return; -// } -// -// //������������ -// data.remove("time"); -// data.remove("entryTime"); -// -// historySecondUavService.insertHistorySecond(data); -// ack.acknowledge(); -// } catch (Exception e) { -// log.error("param{}" + msg); -// } -// } -// -// //������������������ -// @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") -// public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { -// String msg = record.value(); -// try { -// Map<String, Object> data = JSON.parseObject(msg, Map.class); -// Object mac = data.get("mac"); -// Object time = data.get("DataTime"); -// if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { -// log.warn("some properties is null, param{}", msg); -// ack.acknowledge(); -// return; -// } -// -// //������������ -// data.remove("time"); -// data.remove("entryTime"); -// -// historySecondCruiserService.insertHistorySecond(data); -// ack.acknowledge(); -// } catch (Exception e) { -// log.error("param{}" + e); -// } -// } -//} + data.remove("entryTime"); + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); + } else { + newMap.put(key, value); + } + iterator.remove(); + } + //��������������� + historyMinutelyService.insertHistoryMinutely(newMap); + ack.acknowledge(); + } catch (Exception e) { + log.error("param{}" + msg); + } + } + + //������������ + @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") + public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { + String msg = record.value(); + try { + Map<String, Object> data = JSON.parseObject(msg, Map.class); + Object mac = data.get("mac"); + Object time = data.get("DataTime"); + if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { + log.warn("some properties is null, param{}", msg); + ack.acknowledge(); + return; + } + + //������������ + data.remove("time"); + data.remove("entryTime"); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String format = dateFormat.format(new Date().getTime()); + //������������ + String substring = format.substring(0, 4); + + String substring1 = time.toString().substring(0, 4); + + String replace = time.toString().replace(substring1, substring); + + data.put("DataTime",replace); + + Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); + Map<String, Object> newMap = new HashMap<>(); + Map.Entry<String, Object> next; + while (iterator.hasNext()) { + next = iterator.next(); + String key = next.getKey(); + Object value = next.getValue(); + if (key.contains("-Avg")) { + newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); + } else { + newMap.put(key, value); + } + iterator.remove(); + } + //��������������� + historyHourlyService.insertHistoryHourly(newMap); + ack.acknowledge(); + } catch (Exception e) { + log.error("param{}" + msg); + } + } + + //������������������������������������������������������ + @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") + public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) { + String msg = record.value(); + try { + Map<String, Object> data = JSON.parseObject(msg, Map.class); + Object mac = data.get("mac"); + Object time = data.get("DataTime"); + if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { + log.warn("some properties is null, param{}", msg); + return; + } + //������������ + data.remove("time"); + data.remove("entryTime"); + + //������������ + data = deviceService.adjustDeviceData(data,"0"); + //������redis + data.put("DataTime", time); + redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); + //��������������������������� + data.put("mac", mac); + deviceService.judgeDeviceState(data); + ack.acknowledge(); + } catch (Exception e) { + log.error("param{}" + msg); + } + } + + //������������������ + @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") + public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { + String msg = record.value(); + try { + Map<String, Object> data = JSON.parseObject(msg, Map.class); + Object mac = data.get("mac"); + Object time = data.get("DataTime"); + if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { + log.warn("some properties is null, param{}", msg); + ack.acknowledge(); + return; + } + + //������������ + data.remove("time"); + data.remove("entryTime"); + + historySecondUavService.insertHistorySecond(data); + ack.acknowledge(); + } catch (Exception e) { + log.error("param{}" + msg); + } + } + + //������������������ + @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") + public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { + String msg = record.value(); + try { + Map<String, Object> data = JSON.parseObject(msg, Map.class); + Object mac = data.get("mac"); + Object time = data.get("DataTime"); + if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { + log.warn("some properties is null, param{}", msg); + ack.acknowledge(); + return; + } + + //������������ + data.remove("time"); + data.remove("entryTime"); + + historySecondCruiserService.insertHistorySecond(data); + ack.acknowledge(); + } catch (Exception e) { + log.error("param{}" + e); + } + } +} -- Gitblit v1.8.0