|  |  |  | 
|---|
|  |  |  | import java.util.HashMap; | 
|---|
|  |  |  | import java.util.Map; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /*@Configuration | 
|---|
|  |  |  | @EnableKafka*/ | 
|---|
|  |  |  | @Configuration | 
|---|
|  |  |  | @EnableKafka | 
|---|
|  |  |  | public class KafkaConsumerConfig { | 
|---|
|  |  |  | @Value("${kafka.consumer.servers}") | 
|---|
|  |  |  | private String servers; | 
|---|
|  |  |  | 
|---|
|  |  |  | private boolean enableAutoCommit; | 
|---|
|  |  |  | @Value("${kafka.consumer.session.timeout}") | 
|---|
|  |  |  | private String sessionTimeout; | 
|---|
|  |  |  | @Value("${kafka.consumer.auto.commit.interval}") | 
|---|
|  |  |  | private String autoCommitInterval; | 
|---|
|  |  |  | @Value("${kafka.consumer.group.id}") | 
|---|
|  |  |  | private String groupId; | 
|---|
|  |  |  | @Value("${kafka.consumer.auto.offset.reset}") | 
|---|
|  |  |  | private String autoOffsetReset; | 
|---|
|  |  |  | @Value("${kafka.consumer.concurrency}") | 
|---|
|  |  |  | private int concurrency; | 
|---|
|  |  |  | @Value("${kafka.groupId.second-data}") | 
|---|
|  |  |  | private String secondDataGroupId; | 
|---|
|  |  |  | @Value("${kafka.groupId.cruiser-data}") | 
|---|
|  |  |  | private String cruiserDataGroupId; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Bean | 
|---|
|  |  |  | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { | 
|---|
|  |  |  | ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); | 
|---|
|  |  |  | factory.setConsumerFactory(consumerFactory()); | 
|---|
|  |  |  | factory.setConcurrency(concurrency); | 
|---|
|  |  |  | factory.getContainerProperties().setPollTimeout(1500); | 
|---|
|  |  |  | factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /*factory.setBatchListener(true);//@KafkaListener 批量消费  每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG | 
|---|
|  |  |  | factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置提交偏移量的方式*/ | 
|---|
|  |  |  | factory.setConsumerFactory(consumerFactory());//设置消费者工厂 | 
|---|
|  |  |  | factory.setConcurrency(concurrency);//设置线程数 | 
|---|
|  |  |  | factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间 | 
|---|
|  |  |  | return factory; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | 
|---|
|  |  |  | return new DefaultKafkaConsumerFactory<>(consumerConfigs()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Bean("secondDataListenerFactory") | 
|---|
|  |  |  | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> secondDataListenerFactory(){ | 
|---|
|  |  |  | ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); | 
|---|
|  |  |  | factory.setConsumerFactory(secondDataConsumerFactory());//设置消费者工厂 | 
|---|
|  |  |  | factory.setConcurrency(concurrency);//设置线程数 | 
|---|
|  |  |  | factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间 | 
|---|
|  |  |  | return factory; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Bean("cruiserDataListenerFactory") | 
|---|
|  |  |  | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> cruiserDataListenerFactory(){ | 
|---|
|  |  |  | ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); | 
|---|
|  |  |  | factory.setConsumerFactory(cruiserDataConsumerFactory());//设置消费者工厂 | 
|---|
|  |  |  | factory.setConcurrency(concurrency);//设置线程数 | 
|---|
|  |  |  | factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间 | 
|---|
|  |  |  | return factory; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * @Description: 秒级消费工厂 | 
|---|
|  |  |  | * @Param: [] | 
|---|
|  |  |  | * @return: org.springframework.kafka.core.ConsumerFactory<java.lang.String,java.lang.String> | 
|---|
|  |  |  | * @Author: 陈凯裕 | 
|---|
|  |  |  | * @Date: 2021/7/19 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public ConsumerFactory<String,String> secondDataConsumerFactory(){ | 
|---|
|  |  |  | Map<String, Object> commonConfig = consumerConfigs(); | 
|---|
|  |  |  | Map<String, Object> secondDataConfig = secondConsumerConfigs(); | 
|---|
|  |  |  | secondDataConfig.putAll(commonConfig); | 
|---|
|  |  |  | return new DefaultKafkaConsumerFactory<>(secondDataConfig); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /* | 
|---|
|  |  |  | * 走航车数据消费工厂 | 
|---|
|  |  |  | * */ | 
|---|
|  |  |  | public ConsumerFactory<String,String> cruiserDataConsumerFactory(){ | 
|---|
|  |  |  | Map<String, Object> commonConfig = consumerConfigs(); | 
|---|
|  |  |  | Map<String, Object> secondDataConfig = cruiserConsumerConfigs(); | 
|---|
|  |  |  | secondDataConfig.putAll(commonConfig); | 
|---|
|  |  |  | return new DefaultKafkaConsumerFactory<>(secondDataConfig); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * @Description: 秒级消费者配置 | 
|---|
|  |  |  | * @Param: [] | 
|---|
|  |  |  | * @return: java.util.Map<java.lang.String,java.lang.Object> | 
|---|
|  |  |  | * @Author: 陈凯裕 | 
|---|
|  |  |  | * @Date: 2021/7/19 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public Map<String,Object> secondConsumerConfigs(){ | 
|---|
|  |  |  | Map<String, Object> propsMap = new HashMap<>(); | 
|---|
|  |  |  | propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,secondDataGroupId); | 
|---|
|  |  |  | return propsMap; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /* | 
|---|
|  |  |  | * 走航车消费组配置 | 
|---|
|  |  |  | * */ | 
|---|
|  |  |  | public Map<String,Object> cruiserConsumerConfigs(){ | 
|---|
|  |  |  | Map<String, Object> propsMap = new HashMap<>(); | 
|---|
|  |  |  | propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, cruiserDataGroupId); | 
|---|
|  |  |  | return propsMap; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * @Description: 通用配置 | 
|---|
|  |  |  | * @Param: [] | 
|---|
|  |  |  | * @return: java.util.Map<java.lang.String,java.lang.Object> | 
|---|
|  |  |  | * @Author: 陈凯裕 | 
|---|
|  |  |  | * @Date: 2021/7/19 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | public Map<String, Object> consumerConfigs() { | 
|---|
|  |  |  | Map<String, Object> propsMap = new HashMap<>(); | 
|---|
|  |  |  | propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); | 
|---|
|  |  |  | propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); | 
|---|
|  |  |  | propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); | 
|---|
|  |  |  | propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); | 
|---|
|  |  |  | propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | 
|---|
|  |  |  | propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | 
|---|
|  |  |  | propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); | 
|---|
|  |  |  | propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); | 
|---|
|  |  |  | return propsMap; | 
|---|
|  |  |  | } | 
|---|