//package com.moral.api.kafka.consumer; // //import com.moral.api.service.*; //import lombok.extern.slf4j.Slf4j; //import org.apache.kafka.clients.consumer.ConsumerRecord; //import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.data.redis.core.RedisTemplate; //import org.springframework.kafka.annotation.KafkaListener; //import org.springframework.kafka.support.Acknowledgment; //import org.springframework.stereotype.Component; //import org.springframework.util.ObjectUtils; // //import java.util.HashMap; //import java.util.Iterator; //import java.util.Map; // //import com.alibaba.fastjson.JSON; //import com.moral.constant.KafkaConstants; //import com.moral.constant.RedisConstants; // // // //@Component //@Slf4j //public class DeviceConsumer { // // @Autowired // private HistoryMinutelyService historyMinutelyService; // // @Autowired // private HistoryHourlyService historyHourlyService; // // @Autowired // private DeviceService deviceService; // // @Autowired // private RedisTemplate redisTemplate; // // @Autowired // private HistorySecondCruiserService historySecondCruiserService; // // @Autowired // private HistorySecondUavService historySecondUavService; // // //分钟数据 // @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") // public void listenMinute(ConsumerRecord record, Acknowledgment ack) { // String msg = record.value(); // try { // Map data = JSON.parseObject(msg, Map.class); // Object mac = data.get("mac"); // Object time = data.get("DataTime"); // if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { // log.warn("some properties is null, param{}", msg); // ack.acknowledge(); // return; // } // // //数据过滤 //// data.remove("time"); // data.remove("entryTime"); // Iterator> iterator = data.entrySet().iterator(); // Map newMap = new HashMap<>(); // Map.Entry next; // while (iterator.hasNext()) { // next = iterator.next(); // String key = next.getKey(); // Object value = next.getValue(); // if (key.contains("-Avg")) { // newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); // } else { // newMap.put(key, value); // } // iterator.remove(); // } // //存入数据库 // historyMinutelyService.insertHistoryMinutely(newMap); // ack.acknowledge(); // } catch (Exception e) { // log.error("param{}" + msg); // } // } // // //小时数据 // @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") // public void listenHour(ConsumerRecord record, Acknowledgment ack) { // String msg = record.value(); // try { // Map data = JSON.parseObject(msg, Map.class); // Object mac = data.get("mac"); // Object time = data.get("DataTime"); // if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { // log.warn("some properties is null, param{}", msg); // ack.acknowledge(); // return; // } // // //数据过滤 //// data.remove("time"); // data.remove("entryTime"); // Iterator> iterator = data.entrySet().iterator(); // Map newMap = new HashMap<>(); // Map.Entry next; // while (iterator.hasNext()) { // next = iterator.next(); // String key = next.getKey(); // Object value = next.getValue(); // if (key.contains("-Avg")) { // newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); // } else { // newMap.put(key, value); // } // iterator.remove(); // } // //存入数据库 // historyHourlyService.insertHistoryHourly(newMap); // ack.acknowledge(); // } catch (Exception e) { // log.error("param{}" + msg); // } // } // // //秒数据,修改设备状态,缓存最新秒数据 // @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") // public void listenSecond(ConsumerRecord record) { // String msg = record.value(); // try { // Map data = JSON.parseObject(msg, Map.class); // Object mac = data.get("mac"); // Object time = data.get("DataTime"); // if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { // log.warn("some properties is null, param{}", msg); // return; // } // //数据过滤 // data.remove("time"); // data.remove("entryTime"); // // //数据校准 // data = deviceService.adjustDeviceData(data,"0"); // //存入redis // data.put("DataTime", time); // redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); // //判断并修改设备状态 // data.put("mac", mac); // deviceService.judgeDeviceState(data); // } catch (Exception e) { // log.error("param{}" + msg); // } // } // // //无人机秒数据 // @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") // public void listenSecondSpecial(ConsumerRecord record, Acknowledgment ack) { // String msg = record.value(); // try { // Map data = JSON.parseObject(msg, Map.class); // Object mac = data.get("mac"); // Object time = data.get("DataTime"); // if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { // log.warn("some properties is null, param{}", msg); // ack.acknowledge(); // return; // } // // //数据过滤 // data.remove("time"); // data.remove("entryTime"); // // historySecondUavService.insertHistorySecond(data); // ack.acknowledge(); // } catch (Exception e) { // log.error("param{}" + msg); // } // } // // //走航车秒数据 // @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") // public void listenSecondCruiser(ConsumerRecord record, Acknowledgment ack) { // String msg = record.value(); // try { // Map data = JSON.parseObject(msg, Map.class); // Object mac = data.get("mac"); // Object time = data.get("DataTime"); // if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { // log.warn("some properties is null, param{}", msg); // ack.acknowledge(); // return; // } // // //数据过滤 // data.remove("time"); // data.remove("entryTime"); // // historySecondCruiserService.insertHistorySecond(data); // ack.acknowledge(); // } catch (Exception e) { // log.error("param{}" + e); // } // } //}