|  |  | 
 |  |  | package com.moral.api.kafka.consumer; | 
 |  |  |  | 
 |  |  | import com.moral.api.service.*; | 
 |  |  | import lombok.extern.slf4j.Slf4j; | 
 |  |  | import org.apache.kafka.clients.consumer.ConsumerRecord; | 
 |  |  | import org.springframework.beans.factory.annotation.Autowired; | 
 |  |  | 
 |  |  | import java.util.Map; | 
 |  |  |  | 
 |  |  | import com.alibaba.fastjson.JSON; | 
 |  |  | import com.moral.api.service.DeviceService; | 
 |  |  | import com.moral.api.service.HistoryHourlyService; | 
 |  |  | import com.moral.api.service.HistoryMinutelyService; | 
 |  |  | import com.moral.api.service.HistorySecondSpecialService; | 
 |  |  | import com.moral.constant.KafkaConstants; | 
 |  |  | import com.moral.constant.RedisConstants; | 
 |  |  |  | 
 |  |  | 
 |  |  |     private RedisTemplate redisTemplate; | 
 |  |  |  | 
 |  |  |     @Autowired | 
 |  |  |     private HistorySecondSpecialService historySecondSpecialService; | 
 |  |  |     private HistorySecondCruiserService historySecondCruiserService; | 
 |  |  |  | 
 |  |  |     @Autowired | 
 |  |  |     private HistorySecondUavService historySecondUavService; | 
 |  |  |  | 
 |  |  |     //分钟数据 | 
 |  |  |     @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") | 
 |  |  | 
 |  |  |             //数据校准 | 
 |  |  |             data = deviceService.adjustDeviceData(data); | 
 |  |  |             //存入redis | 
 |  |  |             data.put("DataTime", time); | 
 |  |  |             redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); | 
 |  |  |             //判断并修改设备状态 | 
 |  |  |             data.put("mac", mac); | 
 |  |  |             deviceService.judgeDeviceState(data); | 
 |  |  |         } catch (Exception e) { | 
 |  |  |             log.error("param{}" + msg); | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     //特殊设备秒数据 | 
 |  |  |     //无人机秒数据 | 
 |  |  |     @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") | 
 |  |  |     public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
 |  |  |         String msg = record.value(); | 
 |  |  | 
 |  |  |             data.remove("time"); | 
 |  |  |             data.remove("entryTime"); | 
 |  |  |  | 
 |  |  |             historySecondSpecialService.insertHistorySecond(data); | 
 |  |  |             historySecondUavService.insertHistorySecond(data); | 
 |  |  |             ack.acknowledge(); | 
 |  |  |         } catch (Exception e) { | 
 |  |  |             log.error("param{}" + msg); | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     //走航车秒数据 | 
 |  |  |     @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") | 
 |  |  |     public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
 |  |  |         String msg = record.value(); | 
 |  |  |         try { | 
 |  |  |             Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
 |  |  |             Object mac = data.get("mac"); | 
 |  |  |             Object time = data.get("DataTime"); | 
 |  |  |             if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
 |  |  |                 log.warn("some properties is null, param{}", msg); | 
 |  |  |                 ack.acknowledge(); | 
 |  |  |                 return; | 
 |  |  |             } | 
 |  |  |  | 
 |  |  |             //数据过滤 | 
 |  |  |             data.remove("time"); | 
 |  |  |             data.remove("entryTime"); | 
 |  |  |  | 
 |  |  |             historySecondCruiserService.insertHistorySecond(data); | 
 |  |  |             ack.acknowledge(); | 
 |  |  |         } catch (Exception e) { | 
 |  |  |             log.error("param{}" + e); | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  | } |