package com.moral.api.kafka.consumer;

import com.moral.api.constant.TopicConstants;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

import java.util.Random;

/**
 * Spring {@code @Component} located in the Kafka consumer package.
 *
 * <p>The class currently declares no active members. A sample listener that uses manual offset
 * acknowledgment is retained below as commented-out reference code only.
 */
@Component
@Slf4j
public class KafkaConsumer {

    // ---------------------------------------------------------------------------------------
    // Disabled sample: consuming with manual commit (acknowledgment).
    //
    // Parameters of the disabled method:
    //   record - the consumed Kafka record
    //   ack    - handle used to acknowledge the record manually
    // Declared to throw Exception.
    //
    // NOTE(review): as written, the sample takes a raw ConsumerRecord, so
    // "String msg = record.value()" would not compile without a cast or type arguments.
    // NOTE(review): the sample only acknowledges when new Random().nextInt(100) < 50,
    // i.e. roughly half of the records; this looks like test scaffolding - confirm before
    // re-enabling.
    //
    // @KafkaListener(topics = TopicConstants.TEST_TOPIC_MESSAGE,groupId = "test")
    // public void listenTest(ConsumerRecord record , Acknowledgment ack) throws Exception {
    //     String msg = record.value();
    //     System.out.println(msg);
    //     if (new Random().nextInt(100)<50){
    //         log.info(String.format("kafka 消费消息成功---------------- listen1 topic = %s, offset = %d, value = %s ", record.topic(), record.offset(), record.value()));
    //         ack.acknowledge();
    //     }
    // }
    // ---------------------------------------------------------------------------------------
}