cjl
2023-10-13 2acb2c049b0c6708b14f28b0cdf1e62f486cb019
screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
@@ -5,6 +5,7 @@
import com.moral.constant.KafkaConstants;
import com.moral.constant.RedisConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@@ -139,7 +140,7 @@
    }
    // Per-second data: update the device state and cache the latest per-second reading
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory")
    /*@KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
@@ -161,19 +162,22 @@
            data.remove("entryTime");
            //数据校准
            data = deviceService.adjustDeviceData(data,"0");
            //data = deviceService.adjustDeviceData(data,"0");
            //存入redis
            data.put("DataTime", time);
            redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data);
            //redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data);
            //判断并修改设备状态
            data.put("mac", mac);
            deviceService.judgeDeviceState(data);
            ack.acknowledge();
            deviceService.judgeDeviceState(data);
        }catch (CommitFailedException e){
            log.error("param{}" + msg);
        } catch (Exception e) {
            log.error("param{}" + msg);
            ack.acknowledge();
        }
    }
    }*/
    // UAV per-second data
    @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory")