| package com.moral.api.kafka.consumer; | 
|   | 
| import com.moral.api.service.*; | 
| import lombok.extern.slf4j.Slf4j; | 
| import org.apache.kafka.clients.consumer.ConsumerRecord; | 
| import org.springframework.beans.factory.annotation.Autowired; | 
| import org.springframework.data.redis.core.RedisTemplate; | 
| import org.springframework.kafka.annotation.KafkaListener; | 
| import org.springframework.kafka.support.Acknowledgment; | 
| import org.springframework.stereotype.Component; | 
| import org.springframework.util.ObjectUtils; | 
|   | 
| import java.util.HashMap; | 
| import java.util.Iterator; | 
| import java.util.Map; | 
|   | 
| import com.alibaba.fastjson.JSON; | 
| import com.moral.constant.KafkaConstants; | 
| import com.moral.constant.RedisConstants; | 
|   | 
| /* | 
|  * 普通设备消费者 | 
|  * */ | 
| @Component | 
| @Slf4j | 
| public class DeviceConsumer { | 
|   | 
|     @Autowired | 
|     private HistoryMinutelyService historyMinutelyService; | 
|   | 
|     @Autowired | 
|     private HistoryHourlyService historyHourlyService; | 
|   | 
|     @Autowired | 
|     private DeviceService deviceService; | 
|   | 
|     @Autowired | 
|     private RedisTemplate redisTemplate; | 
|   | 
|     @Autowired | 
|     private HistorySecondSpecialService historySecondSpecialService; | 
|   | 
|     @Autowired | 
|     private HistorySecondCruiserService historySecondCruiserService; | 
|   | 
|     @Autowired | 
|     private HistorySecondUavService historySecondUavService; | 
|   | 
|     //分钟数据 | 
|     //@KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") | 
|     public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|         String msg = record.value(); | 
|         try { | 
|             Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|             Object mac = data.get("mac"); | 
|             Object time = data.get("DataTime"); | 
|             if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|                 log.warn("some properties is null, param{}", msg); | 
|                 ack.acknowledge(); | 
|                 return; | 
|             } | 
|   | 
|             //数据过滤 | 
|             data.remove("time"); | 
|             data.remove("entryTime"); | 
|             Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); | 
|             Map<String, Object> newMap = new HashMap<>(); | 
|             Map.Entry<String, Object> next; | 
|             while (iterator.hasNext()) { | 
|                 next = iterator.next(); | 
|                 String key = next.getKey(); | 
|                 Object value = next.getValue(); | 
|                 if (key.contains("-Avg")) { | 
|                     newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); | 
|                 } else { | 
|                     newMap.put(key, value); | 
|                 } | 
|                 iterator.remove(); | 
|             } | 
|             //存入数据库 | 
|             historyMinutelyService.insertHistoryMinutely(newMap); | 
|             ack.acknowledge(); | 
|         } catch (Exception e) { | 
|             log.error("param{}" + msg); | 
|         } | 
|     } | 
|   | 
|     //小时数据 | 
|     //@KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") | 
|     public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|         String msg = record.value(); | 
|         try { | 
|             Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|             Object mac = data.get("mac"); | 
|             Object time = data.get("DataTime"); | 
|             if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|                 log.warn("some properties is null, param{}", msg); | 
|                 ack.acknowledge(); | 
|                 return; | 
|             } | 
|   | 
|             //数据过滤 | 
|             data.remove("time"); | 
|             data.remove("entryTime"); | 
|             Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); | 
|             Map<String, Object> newMap = new HashMap<>(); | 
|             Map.Entry<String, Object> next; | 
|             while (iterator.hasNext()) { | 
|                 next = iterator.next(); | 
|                 String key = next.getKey(); | 
|                 Object value = next.getValue(); | 
|                 if (key.contains("-Avg")) { | 
|                     newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); | 
|                 } else { | 
|                     newMap.put(key, value); | 
|                 } | 
|                 iterator.remove(); | 
|             } | 
|             //存入数据库 | 
|             historyHourlyService.insertHistoryHourly(newMap); | 
|             ack.acknowledge(); | 
|         } catch (Exception e) { | 
|             log.error("param{}" + msg); | 
|         } | 
|     } | 
|   | 
|     //秒数据,修改设备状态,缓存最新秒数据 | 
|     //@KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") | 
|     public void listenSecond(ConsumerRecord<String, String> record) { | 
|         String msg = record.value(); | 
|         try { | 
|             Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|             Object mac = data.get("mac"); | 
|             Object time = data.get("DataTime"); | 
|             if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|                 log.warn("some properties is null, param{}", msg); | 
|                 return; | 
|             } | 
|             //数据过滤 | 
|             data.remove("time"); | 
|             data.remove("entryTime"); | 
|   | 
|             //数据校准 | 
|             data = deviceService.adjustDeviceData(data); | 
|             //存入redis | 
|             data.put("DataTime", time); | 
|             redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); | 
|             //判断并修改设备状态 | 
|             data.put("mac", mac); | 
|             deviceService.judgeDeviceState(data); | 
|         } catch (Exception e) { | 
|             log.error("param{}" + msg); | 
|         } | 
|     } | 
|   | 
|     //无人机秒数据 | 
|     @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") | 
|     public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|         String msg = record.value(); | 
|         System.out.println(msg); | 
|         try { | 
|             Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|             Object mac = data.get("mac"); | 
|             Object time = data.get("DataTime"); | 
|             if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|                 log.warn("some properties is null, param{}", msg); | 
|                 ack.acknowledge(); | 
|                 return; | 
|             } | 
|   | 
|             //数据过滤 | 
|             data.remove("time"); | 
|             data.remove("entryTime"); | 
|   | 
|             historySecondUavService.insertHistorySecond(data); | 
|             ack.acknowledge(); | 
|         } catch (Exception e) { | 
|             log.error("param{}" + msg); | 
|         } | 
|     } | 
|   | 
|     //走航车秒数据 | 
|     @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") | 
|     public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|         String msg = record.value(); | 
|         try { | 
|             Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|             Object mac = data.get("mac"); | 
|             Object time = data.get("DataTime"); | 
|             if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|                 log.warn("some properties is null, param{}", msg); | 
|                 ack.acknowledge(); | 
|                 return; | 
|             } | 
|   | 
|             //数据过滤 | 
|             data.remove("time"); | 
|             data.remove("entryTime"); | 
|   | 
|             historySecondCruiserService.insertHistorySecond(data); | 
|             ack.acknowledge(); | 
|         } catch (Exception e) { | 
|             log.error("param{}" + msg); | 
|         } | 
|     } | 
| } |