|  |  |  | 
|---|
|  |  |  | //package com.moral.api.kafka.consumer; | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //import com.moral.api.service.*; | 
|---|
|  |  |  | //import lombok.extern.slf4j.Slf4j; | 
|---|
|  |  |  | //import org.apache.kafka.clients.consumer.ConsumerRecord; | 
|---|
|  |  |  | //import org.springframework.beans.factory.annotation.Autowired; | 
|---|
|  |  |  | //import org.springframework.data.redis.core.RedisTemplate; | 
|---|
|  |  |  | //import org.springframework.kafka.annotation.KafkaListener; | 
|---|
|  |  |  | //import org.springframework.kafka.support.Acknowledgment; | 
|---|
|  |  |  | //import org.springframework.stereotype.Component; | 
|---|
|  |  |  | //import org.springframework.util.ObjectUtils; | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //import java.util.HashMap; | 
|---|
|  |  |  | //import java.util.Iterator; | 
|---|
|  |  |  | //import java.util.Map; | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //import com.alibaba.fastjson.JSON; | 
|---|
|  |  |  | //import com.moral.constant.KafkaConstants; | 
|---|
|  |  |  | //import com.moral.constant.RedisConstants; | 
|---|
|  |  |  | // | 
|---|
|  |  |  | // | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //@Component | 
|---|
|  |  |  | //@Slf4j | 
|---|
|  |  |  | //public class DeviceConsumer { | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //    @Autowired | 
|---|
|  |  |  | //    private HistoryMinutelyService historyMinutelyService; | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //    @Autowired | 
|---|
|  |  |  | //    private HistoryHourlyService historyHourlyService; | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //    @Autowired | 
|---|
|  |  |  | //    private DeviceService deviceService; | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //    @Autowired | 
|---|
|  |  |  | //    private RedisTemplate redisTemplate; | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //    @Autowired | 
|---|
|  |  |  | //    private HistorySecondCruiserService historySecondCruiserService; | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //    @Autowired | 
|---|
|  |  |  | //    private HistorySecondUavService historySecondUavService; | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //    //分钟数据 | 
|---|
|  |  |  | //    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") | 
|---|
|  |  |  | //    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|---|
|  |  |  | //        String msg = record.value(); | 
|---|
|  |  |  | //        try { | 
|---|
|  |  |  | //            Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|---|
|  |  |  | //            Object mac = data.get("mac"); | 
|---|
|  |  |  | //            Object time = data.get("DataTime"); | 
|---|
|  |  |  | //            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|---|
|  |  |  | //                log.warn("some properties is null, param{}", msg); | 
|---|
|  |  |  | //                ack.acknowledge(); | 
|---|
|  |  |  | //                return; | 
|---|
|  |  |  | //            } | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //            //数据过滤 | 
|---|
|  |  |  | ////            data.remove("time"); | 
|---|
|  |  |  | //            data.remove("entryTime"); | 
|---|
|  |  |  | //            Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); | 
|---|
|  |  |  | //            Map<String, Object> newMap = new HashMap<>(); | 
|---|
|  |  |  | //            Map.Entry<String, Object> next; | 
|---|
|  |  |  | //            while (iterator.hasNext()) { | 
|---|
|  |  |  | //                next = iterator.next(); | 
|---|
|  |  |  | //                String key = next.getKey(); | 
|---|
|  |  |  | //                Object value = next.getValue(); | 
|---|
|  |  |  | //                if (key.contains("-Avg")) { | 
|---|
|  |  |  | //                    newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); | 
|---|
|  |  |  | //                } else { | 
|---|
|  |  |  | //                    newMap.put(key, value); | 
|---|
|  |  |  | //                } | 
|---|
|  |  |  | //                iterator.remove(); | 
|---|
|  |  |  | //            } | 
|---|
|  |  |  | //            //存入数据库 | 
|---|
|  |  |  | //            historyMinutelyService.insertHistoryMinutely(newMap); | 
|---|
|  |  |  | //            ack.acknowledge(); | 
|---|
|  |  |  | //        } catch (Exception e) { | 
|---|
|  |  |  | //            log.error("param{}" + msg); | 
|---|
|  |  |  | //        } | 
|---|
|  |  |  | //    } | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //    //小时数据 | 
|---|
|  |  |  | //    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") | 
|---|
|  |  |  | //    public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|---|
|  |  |  | //        String msg = record.value(); | 
|---|
|  |  |  | //        try { | 
|---|
|  |  |  | //            Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|---|
|  |  |  | //            Object mac = data.get("mac"); | 
|---|
|  |  |  | //            Object time = data.get("DataTime"); | 
|---|
|  |  |  | //            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|---|
|  |  |  | //                log.warn("some properties is null, param{}", msg); | 
|---|
|  |  |  | //                ack.acknowledge(); | 
|---|
|  |  |  | //                return; | 
|---|
|  |  |  | //            } | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //            //数据过滤 | 
|---|
|  |  |  | ////            data.remove("time"); | 
|---|
|  |  |  | //            data.remove("entryTime"); | 
|---|
|  |  |  | //            Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); | 
|---|
|  |  |  | //            Map<String, Object> newMap = new HashMap<>(); | 
|---|
|  |  |  | //            Map.Entry<String, Object> next; | 
|---|
|  |  |  | //            while (iterator.hasNext()) { | 
|---|
|  |  |  | //                next = iterator.next(); | 
|---|
|  |  |  | //                String key = next.getKey(); | 
|---|
|  |  |  | //                Object value = next.getValue(); | 
|---|
|  |  |  | //                if (key.contains("-Avg")) { | 
|---|
|  |  |  | //                    newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); | 
|---|
|  |  |  | //                } else { | 
|---|
|  |  |  | //                    newMap.put(key, value); | 
|---|
|  |  |  | //                } | 
|---|
|  |  |  | //                iterator.remove(); | 
|---|
|  |  |  | //            } | 
|---|
|  |  |  | //            //存入数据库 | 
|---|
|  |  |  | //            historyHourlyService.insertHistoryHourly(newMap); | 
|---|
|  |  |  | //            ack.acknowledge(); | 
|---|
|  |  |  | //        } catch (Exception e) { | 
|---|
|  |  |  | //            log.error("param{}" + msg); | 
|---|
|  |  |  | //        } | 
|---|
|  |  |  | //    } | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //    //秒数据,修改设备状态,缓存最新秒数据 | 
|---|
|  |  |  | //    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") | 
|---|
|  |  |  | //    public void listenSecond(ConsumerRecord<String, String> record) { | 
|---|
|  |  |  | //        String msg = record.value(); | 
|---|
|  |  |  | //        try { | 
|---|
|  |  |  | //            Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|---|
|  |  |  | //            Object mac = data.get("mac"); | 
|---|
|  |  |  | //            Object time = data.get("DataTime"); | 
|---|
|  |  |  | //            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|---|
|  |  |  | //                log.warn("some properties is null, param{}", msg); | 
|---|
|  |  |  | //                return; | 
|---|
|  |  |  | //            } | 
|---|
|  |  |  | //            //数据过滤 | 
|---|
|  |  |  | /* | 
|---|
|  |  |  | package com.moral.api.kafka.consumer; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.moral.api.service.*; | 
|---|
|  |  |  | import lombok.extern.slf4j.Slf4j; | 
|---|
|  |  |  | import org.apache.kafka.clients.consumer.ConsumerRecord; | 
|---|
|  |  |  | import org.springframework.beans.factory.annotation.Autowired; | 
|---|
|  |  |  | import org.springframework.data.redis.core.RedisTemplate; | 
|---|
|  |  |  | import org.springframework.kafka.annotation.KafkaListener; | 
|---|
|  |  |  | import org.springframework.kafka.support.Acknowledgment; | 
|---|
|  |  |  | import org.springframework.stereotype.Component; | 
|---|
|  |  |  | import org.springframework.util.ObjectUtils; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import java.util.HashMap; | 
|---|
|  |  |  | import java.util.Iterator; | 
|---|
|  |  |  | import java.util.Map; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.alibaba.fastjson.JSON; | 
|---|
|  |  |  | import com.moral.constant.KafkaConstants; | 
|---|
|  |  |  | import com.moral.constant.RedisConstants; | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Component | 
|---|
|  |  |  | @Slf4j | 
|---|
|  |  |  | public class DeviceConsumer { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private HistoryMinutelyService historyMinutelyService; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private HistoryHourlyService historyHourlyService; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private DeviceService deviceService; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private RedisTemplate redisTemplate; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private HistorySecondCruiserService historySecondCruiserService; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Autowired | 
|---|
|  |  |  | private HistorySecondUavService historySecondUavService; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //分钟数据 | 
|---|
|  |  |  | @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") | 
|---|
|  |  |  | public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|---|
|  |  |  | String msg = record.value(); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|---|
|  |  |  | Object mac = data.get("mac"); | 
|---|
|  |  |  | Object time = data.get("DataTime"); | 
|---|
|  |  |  | if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|---|
|  |  |  | log.warn("some properties is null, param{}", msg); | 
|---|
|  |  |  | ack.acknowledge(); | 
|---|
|  |  |  | return; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //数据过滤 | 
|---|
|  |  |  | //            data.remove("time"); | 
|---|
|  |  |  | //            data.remove("entryTime"); | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //            //数据校准 | 
|---|
|  |  |  | //            data = deviceService.adjustDeviceData(data,"0"); | 
|---|
|  |  |  | //            //存入redis | 
|---|
|  |  |  | //            data.put("DataTime", time); | 
|---|
|  |  |  | //            redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); | 
|---|
|  |  |  | //            //判断并修改设备状态 | 
|---|
|  |  |  | //            data.put("mac", mac); | 
|---|
|  |  |  | //            deviceService.judgeDeviceState(data); | 
|---|
|  |  |  | //        } catch (Exception e) { | 
|---|
|  |  |  | //            log.error("param{}" + msg); | 
|---|
|  |  |  | //        } | 
|---|
|  |  |  | //    } | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //    //无人机秒数据 | 
|---|
|  |  |  | //    @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") | 
|---|
|  |  |  | //    public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|---|
|  |  |  | //        String msg = record.value(); | 
|---|
|  |  |  | //        try { | 
|---|
|  |  |  | //            Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|---|
|  |  |  | //            Object mac = data.get("mac"); | 
|---|
|  |  |  | //            Object time = data.get("DataTime"); | 
|---|
|  |  |  | //            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|---|
|  |  |  | //                log.warn("some properties is null, param{}", msg); | 
|---|
|  |  |  | //                ack.acknowledge(); | 
|---|
|  |  |  | //                return; | 
|---|
|  |  |  | //            } | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //            //数据过滤 | 
|---|
|  |  |  | data.remove("entryTime"); | 
|---|
|  |  |  | Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); | 
|---|
|  |  |  | Map<String, Object> newMap = new HashMap<>(); | 
|---|
|  |  |  | Map.Entry<String, Object> next; | 
|---|
|  |  |  | while (iterator.hasNext()) { | 
|---|
|  |  |  | next = iterator.next(); | 
|---|
|  |  |  | String key = next.getKey(); | 
|---|
|  |  |  | Object value = next.getValue(); | 
|---|
|  |  |  | if (key.contains("-Avg")) { | 
|---|
|  |  |  | newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); | 
|---|
|  |  |  | } else { | 
|---|
|  |  |  | newMap.put(key, value); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | iterator.remove(); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | //存入数据库 | 
|---|
|  |  |  | historyMinutelyService.insertHistoryMinutely(newMap); | 
|---|
|  |  |  | ack.acknowledge(); | 
|---|
|  |  |  | } catch (Exception e) { | 
|---|
|  |  |  | log.error("param{}" + msg); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //小时数据 | 
|---|
|  |  |  | @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") | 
|---|
|  |  |  | public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|---|
|  |  |  | String msg = record.value(); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|---|
|  |  |  | Object mac = data.get("mac"); | 
|---|
|  |  |  | Object time = data.get("DataTime"); | 
|---|
|  |  |  | if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|---|
|  |  |  | log.warn("some properties is null, param{}", msg); | 
|---|
|  |  |  | ack.acknowledge(); | 
|---|
|  |  |  | return; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //数据过滤 | 
|---|
|  |  |  | //            data.remove("time"); | 
|---|
|  |  |  | //            data.remove("entryTime"); | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //            historySecondUavService.insertHistorySecond(data); | 
|---|
|  |  |  | //            ack.acknowledge(); | 
|---|
|  |  |  | //        } catch (Exception e) { | 
|---|
|  |  |  | //            log.error("param{}" + msg); | 
|---|
|  |  |  | //        } | 
|---|
|  |  |  | //    } | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //    //走航车秒数据 | 
|---|
|  |  |  | //    @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") | 
|---|
|  |  |  | //    public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|---|
|  |  |  | //        String msg = record.value(); | 
|---|
|  |  |  | //        try { | 
|---|
|  |  |  | //            Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|---|
|  |  |  | //            Object mac = data.get("mac"); | 
|---|
|  |  |  | //            Object time = data.get("DataTime"); | 
|---|
|  |  |  | //            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|---|
|  |  |  | //                log.warn("some properties is null, param{}", msg); | 
|---|
|  |  |  | //                ack.acknowledge(); | 
|---|
|  |  |  | //                return; | 
|---|
|  |  |  | //            } | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //            //数据过滤 | 
|---|
|  |  |  | //            data.remove("time"); | 
|---|
|  |  |  | //            data.remove("entryTime"); | 
|---|
|  |  |  | // | 
|---|
|  |  |  | //            historySecondCruiserService.insertHistorySecond(data); | 
|---|
|  |  |  | //            ack.acknowledge(); | 
|---|
|  |  |  | //        } catch (Exception e) { | 
|---|
|  |  |  | //            log.error("param{}" + e); | 
|---|
|  |  |  | //        } | 
|---|
|  |  |  | //    } | 
|---|
|  |  |  | //} | 
|---|
|  |  |  | data.remove("entryTime"); | 
|---|
|  |  |  | Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); | 
|---|
|  |  |  | Map<String, Object> newMap = new HashMap<>(); | 
|---|
|  |  |  | Map.Entry<String, Object> next; | 
|---|
|  |  |  | while (iterator.hasNext()) { | 
|---|
|  |  |  | next = iterator.next(); | 
|---|
|  |  |  | String key = next.getKey(); | 
|---|
|  |  |  | Object value = next.getValue(); | 
|---|
|  |  |  | if (key.contains("-Avg")) { | 
|---|
|  |  |  | newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); | 
|---|
|  |  |  | } else { | 
|---|
|  |  |  | newMap.put(key, value); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | iterator.remove(); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | //存入数据库 | 
|---|
|  |  |  | historyHourlyService.insertHistoryHourly(newMap); | 
|---|
|  |  |  | ack.acknowledge(); | 
|---|
|  |  |  | } catch (Exception e) { | 
|---|
|  |  |  | log.error("param{}" + msg); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //秒数据,修改设备状态,缓存最新秒数据 | 
|---|
|  |  |  | @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") | 
|---|
|  |  |  | public void listenSecond(ConsumerRecord<String, String> record) { | 
|---|
|  |  |  | String msg = record.value(); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|---|
|  |  |  | Object mac = data.get("mac"); | 
|---|
|  |  |  | Object time = data.get("DataTime"); | 
|---|
|  |  |  | if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|---|
|  |  |  | log.warn("some properties is null, param{}", msg); | 
|---|
|  |  |  | return; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | //数据过滤 | 
|---|
|  |  |  | data.remove("time"); | 
|---|
|  |  |  | data.remove("entryTime"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //数据校准 | 
|---|
|  |  |  | data = deviceService.adjustDeviceData(data,"0"); | 
|---|
|  |  |  | //存入redis | 
|---|
|  |  |  | data.put("DataTime", time); | 
|---|
|  |  |  | redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); | 
|---|
|  |  |  | //判断并修改设备状态 | 
|---|
|  |  |  | data.put("mac", mac); | 
|---|
|  |  |  | deviceService.judgeDeviceState(data); | 
|---|
|  |  |  | } catch (Exception e) { | 
|---|
|  |  |  | log.error("param{}" + msg); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //无人机秒数据 | 
|---|
|  |  |  | @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") | 
|---|
|  |  |  | public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|---|
|  |  |  | String msg = record.value(); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|---|
|  |  |  | Object mac = data.get("mac"); | 
|---|
|  |  |  | Object time = data.get("DataTime"); | 
|---|
|  |  |  | if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|---|
|  |  |  | log.warn("some properties is null, param{}", msg); | 
|---|
|  |  |  | ack.acknowledge(); | 
|---|
|  |  |  | return; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //数据过滤 | 
|---|
|  |  |  | data.remove("time"); | 
|---|
|  |  |  | data.remove("entryTime"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | historySecondUavService.insertHistorySecond(data); | 
|---|
|  |  |  | ack.acknowledge(); | 
|---|
|  |  |  | } catch (Exception e) { | 
|---|
|  |  |  | log.error("param{}" + msg); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //走航车秒数据 | 
|---|
|  |  |  | @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") | 
|---|
|  |  |  | public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|---|
|  |  |  | String msg = record.value(); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | Map<String, Object> data = JSON.parseObject(msg, Map.class); | 
|---|
|  |  |  | Object mac = data.get("mac"); | 
|---|
|  |  |  | Object time = data.get("DataTime"); | 
|---|
|  |  |  | if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { | 
|---|
|  |  |  | log.warn("some properties is null, param{}", msg); | 
|---|
|  |  |  | ack.acknowledge(); | 
|---|
|  |  |  | return; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //数据过滤 | 
|---|
|  |  |  | data.remove("time"); | 
|---|
|  |  |  | data.remove("entryTime"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | historySecondCruiserService.insertHistorySecond(data); | 
|---|
|  |  |  | ack.acknowledge(); | 
|---|
|  |  |  | } catch (Exception e) { | 
|---|
|  |  |  | log.error("param{}" + e); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | */ | 
|---|