package com.moral.api.kafka.consumer; import com.moral.api.service.*; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import com.alibaba.fastjson.JSON; import com.moral.constant.KafkaConstants; import com.moral.constant.RedisConstants; /* * 普通设备消费者 * */ @Component @Slf4j public class DeviceConsumer { @Autowired private HistoryMinutelyService historyMinutelyService; @Autowired private HistoryHourlyService historyHourlyService; @Autowired private DeviceService deviceService; @Autowired private RedisTemplate redisTemplate; @Autowired private HistorySecondCruiserService historySecondCruiserService; @Autowired private HistorySecondUavService historySecondUavService; //分钟数据 @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") public void listenMinute(ConsumerRecord record, Acknowledgment ack) { String msg = record.value(); try { Map data = JSON.parseObject(msg, Map.class); Object mac = data.get("mac"); Object time = data.get("DataTime"); if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { log.warn("some properties is null, param{}", msg); ack.acknowledge(); return; } //数据过滤 data.remove("time"); data.remove("entryTime"); Iterator> iterator = data.entrySet().iterator(); Map newMap = new HashMap<>(); Map.Entry next; while (iterator.hasNext()) { next = iterator.next(); String key = next.getKey(); Object value = next.getValue(); if (key.contains("-Avg")) { newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); } else { newMap.put(key, value); } iterator.remove(); } //存入数据库 historyMinutelyService.insertHistoryMinutely(newMap); ack.acknowledge(); } catch (Exception e) { log.error("param{}" + msg); } } //小时数据 @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") public void listenHour(ConsumerRecord record, Acknowledgment ack) { String msg = record.value(); try { Map data = JSON.parseObject(msg, Map.class); Object mac = data.get("mac"); Object time = data.get("DataTime"); if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { log.warn("some properties is null, param{}", msg); ack.acknowledge(); return; } //数据过滤 data.remove("time"); data.remove("entryTime"); Iterator> iterator = data.entrySet().iterator(); Map newMap = new HashMap<>(); Map.Entry next; while (iterator.hasNext()) { next = iterator.next(); String key = next.getKey(); Object value = next.getValue(); if (key.contains("-Avg")) { newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); } else { newMap.put(key, value); } iterator.remove(); } //存入数据库 historyHourlyService.insertHistoryHourly(newMap); ack.acknowledge(); } catch (Exception e) { log.error("param{}" + msg); } } //秒数据,修改设备状态,缓存最新秒数据 @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") public void listenSecond(ConsumerRecord record) { String msg = record.value(); try { Map data = JSON.parseObject(msg, Map.class); Object mac = data.get("mac"); Object time = data.get("DataTime"); if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { log.warn("some properties is null, param{}", msg); return; } //数据过滤 data.remove("time"); data.remove("entryTime"); //数据校准 data = deviceService.adjustDeviceData(data); //存入redis data.put("DataTime", time); redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); //判断并修改设备状态 data.put("mac", mac); deviceService.judgeDeviceState(data); } catch (Exception e) { log.error("param{}" + msg); } } //无人机秒数据 @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") public void listenSecondSpecial(ConsumerRecord record, Acknowledgment ack) { String msg = record.value(); try { Map data = JSON.parseObject(msg, Map.class); Object mac = data.get("mac"); Object time = data.get("DataTime"); if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { log.warn("some properties is null, param{}", msg); ack.acknowledge(); return; } //数据过滤 data.remove("time"); data.remove("entryTime"); historySecondUavService.insertHistorySecond(data); ack.acknowledge(); } catch (Exception e) { log.error("param{}" + msg); } } //走航车秒数据 @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") public void listenSecondCruiser(ConsumerRecord record, Acknowledgment ack) { String msg = record.value(); try { Map data = JSON.parseObject(msg, Map.class); Object mac = data.get("mac"); Object time = data.get("DataTime"); if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { log.warn("some properties is null, param{}", msg); ack.acknowledge(); return; } //数据过滤 data.remove("time"); data.remove("entryTime"); historySecondCruiserService.insertHistorySecond(data); ack.acknowledge(); } catch (Exception e) { log.error("param{}" + msg); } } }