package com.moral.api.config.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka consumer configuration.
 *
 * <p>Exposes two listener container factories: the default
 * {@code kafkaListenerContainerFactory}, built from the common consumer settings only, and
 * {@code secondDataListenerFactory}, which uses the same settings plus a dedicated consumer
 * group id for second-level data.
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    /** Poll timeout, in milliseconds, applied to every listener container built here. */
    private static final long POLL_TIMEOUT_MS = 1500L;

    /** Value of {@code kafka.consumer.servers}; used as the bootstrap server list. */
    @Value("${kafka.consumer.servers}")
    private String servers;

    /** Value of {@code kafka.consumer.enable.auto.commit}. */
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;

    /** Value of {@code kafka.consumer.session.timeout}; handed to the client as a string. */
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;

    /** Value of {@code kafka.consumer.auto.offset.reset}. */
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;

    /** Value of {@code kafka.consumer.concurrency}; concurrency of each listener factory. */
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    /** Value of {@code kafka.groupId.second-data}; group id of the second-level data consumers. */
    @Value("${kafka.groupId.second-data}")
    private String secondDataGroupId;

    /**
     * Default listener container factory, backed by {@link #consumerFactory()}.
     *
     * @return a container factory configured with the common consumer settings
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        return buildContainerFactory(consumerFactory());
    }

    /**
     * Consumer factory built from the common settings only. Those settings contain no
     * {@code group.id}.
     *
     * @return a new consumer factory for String keys and String values
     */
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Listener container factory for second-level data, backed by
     * {@link #secondDataConsumerFactory()}.
     *
     * @return a container factory whose consumers use the second-data group id
     */
    @Bean("secondDataListenerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> secondDataListenerFactory() {
        return buildContainerFactory(secondDataConsumerFactory());
    }

    /**
     * Consumer factory for second-level data.
     *
     * <p>Originally authored by Chen Kaiyu, 2021/7/19.
     *
     * @return a new consumer factory using the common settings plus the second-data settings
     */
    public ConsumerFactory<String, String> secondDataConsumerFactory() {
        // Start from the common settings and layer the specific ones on top, so that a
        // second-data setting can never be silently overwritten by a common one.
        Map<String, Object> merged = new HashMap<>(consumerConfigs());
        merged.putAll(secondConsumerConfigs());
        return new DefaultKafkaConsumerFactory<>(merged);
    }

    /**
     * Consumer settings specific to second-level data.
     *
     * <p>Originally authored by Chen Kaiyu, 2021/7/19.
     *
     * @return a new mutable map holding only the second-data consumer group id
     */
    public Map<String, Object> secondConsumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, secondDataGroupId);
        return propsMap;
    }

    /**
     * Consumer settings shared by every consumer factory in this class.
     *
     * <p>Originally authored by Chen Kaiyu, 2021/7/19.
     *
     * @return a new mutable map with bootstrap servers, auto-commit flag, session timeout,
     *         String key/value deserializers and the offset reset policy
     */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    /** Builds a listener container factory with the shared concurrency and poll timeout. */
    private ConcurrentKafkaListenerContainerFactory<String, String> buildContainerFactory(
            ConsumerFactory<String, String> source) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(source);
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(POLL_TIMEOUT_MS);
        return factory;
    }
}