| | |
| | | package com.moral.api.kafka.consumer; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.moral.api.entity.HistorySecondRadar; |
| | | import com.moral.api.service.*; |
| | | import com.moral.api.util.Method; |
| | | import com.moral.constant.KafkaConstants; |
| | | import com.moral.constant.RedisConstants; |
| | | import lombok.extern.slf4j.Slf4j; |
| | |
| | | |
// Injected service for second-level UAV history; its usage is not visible in this part of the file.
| | | @Autowired |
| | | private HistorySecondUavService historySecondUavService; |
// Injected service whose insertHistorySecond(...) is called by listenSecondRadar with the parsed radar payload.
| | | @Autowired |
| | | private HistorySecondRadarService historySecondRadarService; |
| | | |
| | | // Minute-level data |
| | | @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") |
| | |
| | | log.error("param{}" + e); |
| | | } |
| | | } |
| | | |
| | | @KafkaListener(topics = KafkaConstants.RADAR_SECOND, containerFactory = "insertListenerContainerFactory") |
| | | public void listenSecondRadar(ConsumerRecord<String, String> record, Acknowledgment ack) { |
| | | String msg = record.value(); |
| | | try { |
| | | msg = msg.replaceAll(",", ";"); |
| | | Map<String, Object> data = Method.getDataStore(msg); |
| | | Object mac = data.get("QN"); |
| | | Object time = data.get("DataTime"); |
| | | if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { |
| | | log.warn("some properties is null, param{}", msg); |
| | | ack.acknowledge(); |
| | | return; |
| | | } |
| | | int i = 0; |
| | | historySecondRadarService.insertHistorySecond(data); |
| | | ack.acknowledge(); |
| | | } catch (Exception e) { |
| | | log.error("param{}" + e); |
| | | } |
| | | } |
| | | } |