cjl
2023-10-13 4133d836c5aa4ebbf11e3749528a4e484ef7ca9e
fix:补偿提交
1 files modified
25 ■■■■■ changed files
screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java 25 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
@@ -140,44 +140,39 @@
    }
    // Second-level data: update the device state, cache the latest second-level data.
    //
    // Kafka listener for KafkaConstants.TOPIC_SECOND, wired through the
    // "stateListenerContainerFactory" container factory. It parses the record value as a
    // JSON map, reads the "mac" and "DataTime" entries, and hands the map to
    // deviceService.judgeDeviceState.
    //
    // NOTE(review): this span is a diff hunk shown without +/- markers, so lines of the
    // previous and the new revision are interleaved (the listener annotation, the
    // listDictDevice declaration and the closing brace each appear twice). The first
    // annotation line opens a block comment that is only closed at the first closing brace
    // of the method. Confirm the actual method body against the repository before relying
    // on the statement order shown here.
    /*@KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            ack.acknowledge();
            Map<String, Object> data = JSON.parseObject(msg, Map.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            // Records without a "mac" or "DataTime" value are logged, acknowledged and dropped.
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
            }
            // deviceList is defined outside this view; here it is split on commas into a list.
            List<String> listDictDevice = Arrays.asList(deviceList.split(","));
            // NOTE(review): this branch acknowledges but does not return, so the statements
            // below still run for a mac that is not in the list - confirm this is intended.
            if(!listDictDevice.contains(mac.toString())){
                ack.acknowledge();
            }
            // Data filtering
            data.remove("time");
            data.remove("entryTime");
            data.put("DataTime", time);
            List<String> listDictDevice = Arrays.asList(deviceList.split(","));
            if(listDictDevice.contains(mac.toString())){
                // Check and update the device state
                data.put("mac", mac);
                deviceService.judgeDeviceState(data);
            }
            // Data calibration
            //data = deviceService.adjustDeviceData(data,"0");
            // Store into redis
            data.put("DataTime", time);
            //redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data);
            // Check and update the device state
            data.put("mac", mac);
            ack.acknowledge();
            deviceService.judgeDeviceState(data);
        // NOTE(review): both handlers concatenate msg onto the "param{}" text instead of
        // passing it as a logger argument, and neither logs the caught exception.
        }catch (CommitFailedException e){
            log.error("param{}" + msg);
        } catch (Exception e) {
            log.error("param{}" + msg);
            ack.acknowledge();
        }
    }*/
    }
    //无人机秒数据
    @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory")