|  |  |  | 
|---|
|  |  |  | /* | 
|---|
|  |  |  | package com.moral.api.kafka.consumer; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.moral.api.service.*; | 
|---|
|  |  |  | 
|---|
|  |  |  | import com.moral.constant.KafkaConstants; | 
|---|
|  |  |  | import com.moral.constant.RedisConstants; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /* | 
|---|
|  |  |  | * 普通设备消费者 | 
|---|
|  |  |  | * */ | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Component | 
|---|
|  |  |  | @Slf4j | 
|---|
|  |  |  | public class DeviceConsumer { | 
|---|
|  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //数据过滤 | 
|---|
|  |  |  | data.remove("time"); | 
|---|
|  |  |  | //            data.remove("time"); | 
|---|
|  |  |  | data.remove("entryTime"); | 
|---|
|  |  |  | Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); | 
|---|
|  |  |  | Map<String, Object> newMap = new HashMap<>(); | 
|---|
|  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //数据过滤 | 
|---|
|  |  |  | data.remove("time"); | 
|---|
|  |  |  | //            data.remove("time"); | 
|---|
|  |  |  | data.remove("entryTime"); | 
|---|
|  |  |  | Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); | 
|---|
|  |  |  | Map<String, Object> newMap = new HashMap<>(); | 
|---|
|  |  |  | 
|---|
|  |  |  | data.remove("entryTime"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //数据校准 | 
|---|
|  |  |  | data = deviceService.adjustDeviceData(data); | 
|---|
|  |  |  | data = deviceService.adjustDeviceData(data,"0"); | 
|---|
|  |  |  | //存入redis | 
|---|
|  |  |  | data.put("DataTime", time); | 
|---|
|  |  |  | redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); | 
|---|
|  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //走航车秒数据 | 
|---|
|  |  |  | @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") | 
|---|
|  |  |  | @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") | 
|---|
|  |  |  | public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { | 
|---|
|  |  |  | String msg = record.value(); | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | 
|---|
|  |  |  | historySecondCruiserService.insertHistorySecond(data); | 
|---|
|  |  |  | ack.acknowledge(); | 
|---|
|  |  |  | } catch (Exception e) { | 
|---|
|  |  |  | log.error("param{}" + msg); | 
|---|
|  |  |  | log.error("param{}" + e); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | */ | 
|---|