cjl
13 hours ago 256aa4c5431733e5d166f583f03724cf69ddfaa4
screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
@@ -1,7 +1,9 @@
package com.moral.api.kafka.consumer;
import com.alibaba.fastjson.JSON;
import com.moral.api.entity.HistorySecondRadar;
import com.moral.api.service.*;
import com.moral.api.util.Method;
import com.moral.constant.KafkaConstants;
import com.moral.constant.RedisConstants;
import lombok.extern.slf4j.Slf4j;
@@ -41,6 +43,8 @@
    // NOTE(review): presumably the service for per-second UAV history records;
    // its usage is outside the visible part of this file — confirm.
    @Autowired
    private HistorySecondUavService historySecondUavService;
    // Service that listenSecondRadar passes parsed per-second radar messages to
    // (via insertHistorySecond).
    @Autowired
    private HistorySecondRadarService historySecondRadarService;
    // Minute-level data
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory")
@@ -223,4 +227,25 @@
            log.error("param{}" + e);
        }
    }
    @KafkaListener(topics = KafkaConstants.RADAR_SECOND, containerFactory = "insertListenerContainerFactory")
    public void listenSecondRadar(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            msg = msg.replaceAll(",", ";");
            Map<String, Object> data = Method.getDataStore(msg);
            Object mac = data.get("QN");
            Object time = data.get("DataTime");
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
            }
            int i = 0;
            historySecondRadarService.insertHistorySecond(data);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("param{}" + e);
        }
    }
}