package com.moral.api.kafka.consumer;
|
|
import com.moral.api.constant.TopicConstants;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
import org.springframework.kafka.annotation.KafkaListener;
|
import org.springframework.kafka.support.Acknowledgment;
|
import org.springframework.stereotype.Component;
|
|
import java.util.Random;
|
|
@Component
|
@Slf4j
|
public class KafkaConsumer {
|
|
/* *//**
|
* 这是手动提交的消费方式
|
* @param record
|
* @param ack
|
* @throws Exception
|
*//*
|
@KafkaListener(topics = TopicConstants.TEST_TOPIC_MESSAGE,groupId = "test")
|
public void listenTest(ConsumerRecord<String, String> record , Acknowledgment ack) throws Exception {
|
String msg = record.value();
|
System.out.println(msg);
|
if (new Random().nextInt(100)<50){
|
log.info(String.format("kafka 消费消息成功---------------- listen1 topic = %s, offset = %d, value = %s ", record.topic(), record.offset(), record.value()));
|
ack.acknowledge();
|
}
|
|
}*/
|
|
|
|
|
|
}
|