| | |
| | | import com.moral.constant.KafkaConstants; |
| | | import com.moral.constant.RedisConstants; |
| | | |
| | | //@Component |
| | | @Component |
| | | @Slf4j |
| | | public class KafkaConsumer { |
| | | |
| | |
| | | @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory") |
| | | public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { |
| | | String msg = record.value(); |
| | | System.out.println(msg); |
| | | try { |
| | | Map<String, Object> data = JSON.parseObject(msg, HashMap.class); |
| | | Object mac = data.get("mac"); |
| | |
| | | return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); |
| | | }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); |
| | | data.remove("time"); |
| | | data.remove("entryTime"); |
| | | // Store the data in the database |
| | | historyMinutelyService.insertHistoryMinutely(data); |
| | | ack.acknowledge(); |
| | |
| | | return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); |
| | | }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); |
| | | data.remove("time"); |
| | | data.remove("entryTime"); |
| | | // Store the data in the database |
| | | historyHourlyService.insertHistoryHourly(data); |
| | | ack.acknowledge(); |
| | |
| | | @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory") |
| | | public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) { |
| | | String msg = record.value(); |
| | | //System.out.println(record.offset() + "===>" + msg); |
| | | try { |
| | | Map<String, Object> data = JSON.parseObject(msg, HashMap.class); |
| | | Object mac = data.get("mac"); |