package com.moral.api.config.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import java.util.HashMap;
import java.util.Map;

@Configuration 
 | 
@EnableKafka 
 | 
public class KafkaConsumerConfig { 
 | 
    @Value("${kafka.consumer.servers}") 
 | 
    private String servers; 
 | 
    @Value("${kafka.consumer.enable.auto.commit}") 
 | 
    private boolean enableAutoCommit; 
 | 
    @Value("${kafka.consumer.session.timeout}") 
 | 
    private String sessionTimeout; 
 | 
    @Value("${kafka.consumer.auto.offset.reset}") 
 | 
    private String autoOffsetReset; 
 | 
    @Value("${kafka.consumer.concurrency}") 
 | 
    private int concurrency; 
 | 
    @Value("${kafka.groupId.second-data}") 
 | 
    private String secondDataGroupId; 
 | 
    @Value("${kafka.groupId.cruiser-data}") 
 | 
    private String cruiserDataGroupId; 
 | 
  
 | 
    @Bean 
 | 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 
 | 
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
 | 
        factory.setConsumerFactory(consumerFactory());//设置消费者工厂 
 | 
        factory.setConcurrency(concurrency);//设置线程数 
 | 
        factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间 
 | 
        return factory; 
 | 
    } 
 | 
  
 | 
    public ConsumerFactory<String, String> consumerFactory() { 
 | 
        return new DefaultKafkaConsumerFactory<>(consumerConfigs()); 
 | 
    } 
 | 
  
 | 
    @Bean("secondDataListenerFactory") 
 | 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> secondDataListenerFactory(){ 
 | 
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
 | 
        factory.setConsumerFactory(secondDataConsumerFactory());//设置消费者工厂 
 | 
        factory.setConcurrency(concurrency);//设置线程数 
 | 
        factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间 
 | 
        return factory; 
 | 
    } 
 | 
  
 | 
    @Bean("cruiserDataListenerFactory") 
 | 
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> cruiserDataListenerFactory(){ 
 | 
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); 
 | 
        factory.setConsumerFactory(cruiserDataConsumerFactory());//设置消费者工厂 
 | 
        factory.setConcurrency(concurrency);//设置线程数 
 | 
        factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间 
 | 
        return factory; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
    * @Description: 秒级消费工厂 
 | 
            * @Param: [] 
 | 
            * @return: org.springframework.kafka.core.ConsumerFactory<java.lang.String,java.lang.String> 
 | 
            * @Author: 陈凯裕 
 | 
            * @Date: 2021/7/19 
 | 
            */ 
 | 
    public ConsumerFactory<String,String> secondDataConsumerFactory(){ 
 | 
        Map<String, Object> commonConfig = consumerConfigs(); 
 | 
        Map<String, Object> secondDataConfig = secondConsumerConfigs(); 
 | 
        secondDataConfig.putAll(commonConfig); 
 | 
        return new DefaultKafkaConsumerFactory<>(secondDataConfig); 
 | 
    } 
 | 
  
 | 
    /* 
 | 
    * 走航车数据消费工厂 
 | 
    * */ 
 | 
    public ConsumerFactory<String,String> cruiserDataConsumerFactory(){ 
 | 
        Map<String, Object> commonConfig = consumerConfigs(); 
 | 
        Map<String, Object> secondDataConfig = cruiserConsumerConfigs(); 
 | 
        secondDataConfig.putAll(commonConfig); 
 | 
        return new DefaultKafkaConsumerFactory<>(secondDataConfig); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
    * @Description: 秒级消费者配置 
 | 
            * @Param: [] 
 | 
            * @return: java.util.Map<java.lang.String,java.lang.Object> 
 | 
            * @Author: 陈凯裕 
 | 
            * @Date: 2021/7/19 
 | 
            */ 
 | 
    public Map<String,Object> secondConsumerConfigs(){ 
 | 
        Map<String, Object> propsMap = new HashMap<>(); 
 | 
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,secondDataGroupId); 
 | 
        return propsMap; 
 | 
    } 
 | 
  
 | 
    /* 
 | 
    * 走航车消费组配置 
 | 
    * */ 
 | 
    public Map<String,Object> cruiserConsumerConfigs(){ 
 | 
        Map<String, Object> propsMap = new HashMap<>(); 
 | 
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, cruiserDataGroupId); 
 | 
        return propsMap; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
    * @Description: 通用配置 
 | 
            * @Param: [] 
 | 
            * @return: java.util.Map<java.lang.String,java.lang.Object> 
 | 
            * @Author: 陈凯裕 
 | 
            * @Date: 2021/7/19 
 | 
            */ 
 | 
    public Map<String, Object> consumerConfigs() { 
 | 
        Map<String, Object> propsMap = new HashMap<>(); 
 | 
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); 
 | 
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); 
 | 
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); 
 | 
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
 | 
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
 | 
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); 
 | 
        return propsMap; 
 | 
    } 
 | 
} 
 |