jinpengyong
2021-06-24 ec95e52c6f84c0a6cbdcbaecc2465b00ede6696d
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
@@ -17,7 +17,6 @@
import com.moral.api.service.DeviceService;
import com.moral.api.service.HistoryHourlyService;
import com.moral.api.service.HistoryMinutelyService;
import com.moral.api.util.AdjustDataUtils;
import com.moral.constant.KafkaConstants;
import com.moral.constant.RedisConstants;
@@ -35,15 +34,13 @@
    private DeviceService deviceService;
    @Autowired
    private AdjustDataUtils adjustDataUtils;
    @Autowired
    private RedisTemplate redisTemplate;
    // Minute-level data
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory")
    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        System.out.println(msg);
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
@@ -104,6 +101,7 @@
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        //System.out.println(record.offset() + "===>" + msg);
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
@@ -115,9 +113,9 @@
                return;
            }
            // Calibrate the data
            data = adjustDataUtils.adjust(data);
            data = deviceService.adjustDeviceData(data);
            // Store in Redis
            redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + "_" + mac, data);
            redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + mac, data);
            // Determine and update the device state
            deviceService.judgeDeviceState(data);
            ack.acknowledge();