kaiyu
2021-08-31 23e43d94105c6f3676e879d2cc271d0bec63f081
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
@@ -24,7 +24,6 @@
/*
 * 设备数据接入
 * */
@Component
@Slf4j
public class KafkaConsumer {
@@ -44,15 +43,14 @@
    private HistorySecondSpecialService historySecondSpecialService;
    //分钟数据
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory")
    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, Map.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (ObjectUtils.isEmpty(ver) || ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
@@ -84,15 +82,14 @@
    }
    //小时数据
    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory")
    public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, Map.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (ObjectUtils.isEmpty(ver) || ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
@@ -124,22 +121,20 @@
    }
    //秒数据,修改设备状态,缓存最新秒数据
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_STATE, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, Map.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (ObjectUtils.isEmpty(ver) || ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                return;
            }
            //数据过滤
            data.remove("time");
            data.remove("entryTime");
            data.remove("ver");
            //数据校准
            data = deviceService.adjustDeviceData(data);
@@ -153,15 +148,14 @@
    }
    //特殊设备秒数据
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND_SPECIAL, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
    public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, Map.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (ObjectUtils.isEmpty(ver) || ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
@@ -170,7 +164,6 @@
            //数据过滤
            data.remove("time");
            data.remove("entryTime");
            data.remove("ver");
            historySecondSpecialService.insertHistorySecond(data);
            ack.acknowledge();