package com.moral.api.kafka.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.util.HashMap; import java.util.Map; import java.util.stream.Collectors; import com.alibaba.fastjson.JSON; import com.moral.api.service.DeviceService; import com.moral.api.service.HistoryHourlyService; import com.moral.api.service.HistoryMinutelyService; import com.moral.constant.KafkaConstants; import com.moral.constant.RedisConstants; @Component @Slf4j public class KafkaConsumer { @Autowired private HistoryMinutelyService historyMinutelyService; @Autowired private HistoryHourlyService historyHourlyService; @Autowired private DeviceService deviceService; @Autowired private RedisTemplate redisTemplate; //分钟数据 @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_MINUTE, containerFactory = "kafkaListenerContainerFactory") public void listenMinute(ConsumerRecord record, Acknowledgment ack) { String msg = record.value(); try { Map data = JSON.parseObject(msg, HashMap.class); Object mac = data.get("mac"); Object time = data.get("DataTime"); Object ver = data.get("ver"); if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) { log.warn("some properties is null, param{}", msg); ack.acknowledge(); return; } //数据过滤 data = data.entrySet().stream() .filter(map -> { String key = map.getKey(); return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); data.remove("time"); data.remove("entryTime"); //存入数据库 historyMinutelyService.insertHistoryMinutely(data); ack.acknowledge(); } catch (Exception e) { //log.error("param{}" + msg); } } //小时数据 @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_HOUR, containerFactory = "kafkaListenerContainerFactory") public void listenHour(ConsumerRecord record, Acknowledgment ack) { String msg = record.value(); try { Map data = JSON.parseObject(msg, HashMap.class); Object mac = data.get("mac"); Object time = data.get("DataTime"); Object ver = data.get("ver"); if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) { log.warn("some properties is null, param{}", msg); ack.acknowledge(); return; } //数据过滤 data = data.entrySet().stream() .filter(map -> { String key = map.getKey(); return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); data.remove("time"); data.remove("entryTime"); //存入数据库 historyHourlyService.insertHistoryHourly(data); ack.acknowledge(); } catch (Exception e) { //log.error("param{}" + msg); } } //秒数据,修改设备状态,缓存最新秒数据 @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_STATE, containerFactory = "kafkaListenerContainerFactory") public void listenSecond(ConsumerRecord record, Acknowledgment ack) { String msg = record.value(); try { Map data = JSON.parseObject(msg, HashMap.class); Object mac = data.get("mac"); Object time = data.get("DataTime"); Object ver = data.get("ver"); if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) { log.warn("some properties is null, param{}", msg); ack.acknowledge(); return; } //数据校准 data = deviceService.adjustDeviceData(data); //存入redis redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + mac, data); //判断并修改设备状态 deviceService.judgeDeviceState(data); ack.acknowledge(); } catch (Exception e) { //log.error("param{}" + msg); } } }