lizijie
2021-09-16 b7f5fd4000d644d117a0936dbcc2bfc56bfab4f0
screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
@@ -1,5 +1,6 @@
package com.moral.api.kafka.consumer;
import com.moral.api.service.*;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
@@ -14,10 +15,6 @@
import java.util.Map;
import com.alibaba.fastjson.JSON;
import com.moral.api.service.DeviceService;
import com.moral.api.service.HistoryHourlyService;
import com.moral.api.service.HistoryMinutelyService;
import com.moral.api.service.HistorySecondSpecialService;
import com.moral.constant.KafkaConstants;
import com.moral.constant.RedisConstants;
@@ -42,6 +39,12 @@
    @Autowired
    private HistorySecondSpecialService historySecondSpecialService;
    @Autowired
    private HistorySecondCruiserService historySecondCruiserService;
    @Autowired
    private HistorySecondUavService historySecondUavService;
    //分钟数据
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory")
@@ -150,9 +153,35 @@
        }
    }
    //特殊设备秒数据
    //无人机秒数据
    @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory")
    public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        System.out.println(msg);
        try {
            Map<String, Object> data = JSON.parseObject(msg, Map.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
            }
            //数据过滤
            data.remove("time");
            data.remove("entryTime");
            historySecondUavService.insertHistorySecond(data);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("param{}" + msg);
        }
    }
    //走航车秒数据
    @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory")
    public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, Map.class);
@@ -168,7 +197,7 @@
            data.remove("time");
            data.remove("entryTime");
            historySecondSpecialService.insertHistorySecond(data);
            historySecondCruiserService.insertHistorySecond(data);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("param{}" + msg);