jinpengyong
2021-06-30 e6c6e6225bdbaaa27bcde320a79acde8239416c2
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
@@ -20,7 +20,7 @@
import com.moral.constant.KafkaConstants;
import com.moral.constant.RedisConstants;
//@Component
@Component
@Slf4j
public class KafkaConsumer {
@@ -40,7 +40,6 @@
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory")
    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        System.out.println(msg);
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
@@ -59,6 +58,7 @@
                        return !(key.contains("Min") || key.contains("Max") || key.contains("Cou"));
                    }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue));
            data.remove("time");
            data.remove("entryTime");
            // Store the record in the database
            historyMinutelyService.insertHistoryMinutely(data);
            ack.acknowledge();
@@ -89,6 +89,7 @@
                        return !(key.contains("Min") || key.contains("Max") || key.contains("Cou"));
                    }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue));
            data.remove("time");
            data.remove("entryTime");
            // Store the record in the database
            historyHourlyService.insertHistoryHourly(data);
            ack.acknowledge();
@@ -101,7 +102,6 @@
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        //System.out.println(record.offset() + "===>" + msg);
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");