| | |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | |
| | | /*@Configuration |
| | | @EnableKafka*/ |
| | | @Configuration |
| | | @EnableKafka |
| | | public class KafkaConsumerConfig { |
| | | @Value("${kafka.consumer.servers}") |
| | | private String servers; |
| | |
| | | private boolean enableAutoCommit; |
| | | @Value("${kafka.consumer.session.timeout}") |
| | | private String sessionTimeout; |
| | | @Value("${kafka.consumer.auto.commit.interval}") |
| | | private String autoCommitInterval; |
| | | @Value("${kafka.consumer.groupMenu.id}") |
| | | private String groupId; |
| | | @Value("${kafka.consumer.auto.offset.reset}") |
| | | private String autoOffsetReset; |
| | | @Value("${kafka.consumer.concurrency}") |
| | | private int concurrency; |
| | | @Value("${kafka.groupId.second-data}") |
| | | private String secondDataGroupId; |
| | | @Value("${kafka.groupId.cruiser-data}") |
| | | private String cruiserDataGroupId; |
| | | |
| | | @Bean |
| | | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { |
| | | ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); |
| | | factory.setConsumerFactory(consumerFactory()); |
| | | factory.setConcurrency(concurrency); |
| | | factory.getContainerProperties().setPollTimeout(1500); |
| | | factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); |
| | | |
| | | /*factory.setBatchListener(true);//@KafkaListener 批量消费 每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG |
| | | factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置提交偏移量的方式*/ |
| | | factory.setConsumerFactory(consumerFactory());//设置消费者工厂 |
| | | factory.setConcurrency(concurrency);//设置线程数 |
| | | factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间 |
| | | return factory; |
| | | } |
| | | |
| | |
| | | return new DefaultKafkaConsumerFactory<>(consumerConfigs()); |
| | | } |
| | | |
| | | @Bean("secondDataListenerFactory") |
| | | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> secondDataListenerFactory(){ |
| | | ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); |
| | | factory.setConsumerFactory(secondDataConsumerFactory());//设置消费者工厂 |
| | | factory.setConcurrency(concurrency);//设置线程数 |
| | | factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间 |
| | | return factory; |
| | | } |
| | | |
| | | @Bean("cruiserDataListenerFactory") |
| | | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> cruiserDataListenerFactory(){ |
| | | ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); |
| | | factory.setConsumerFactory(cruiserDataConsumerFactory());//设置消费者工厂 |
| | | factory.setConcurrency(concurrency);//设置线程数 |
| | | factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间 |
| | | return factory; |
| | | } |
| | | |
| | | /** |
| | | * @Description: 秒级消费工厂 |
| | | * @Param: [] |
| | | * @return: org.springframework.kafka.core.ConsumerFactory<java.lang.String,java.lang.String> |
| | | * @Author: 陈凯裕 |
| | | * @Date: 2021/7/19 |
| | | */ |
| | | public ConsumerFactory<String,String> secondDataConsumerFactory(){ |
| | | Map<String, Object> commonConfig = consumerConfigs(); |
| | | Map<String, Object> secondDataConfig = secondConsumerConfigs(); |
| | | secondDataConfig.putAll(commonConfig); |
| | | return new DefaultKafkaConsumerFactory<>(secondDataConfig); |
| | | } |
| | | |
| | | /* |
| | | * 走航车数据消费工厂 |
| | | * */ |
| | | public ConsumerFactory<String,String> cruiserDataConsumerFactory(){ |
| | | Map<String, Object> commonConfig = consumerConfigs(); |
| | | Map<String, Object> secondDataConfig = cruiserConsumerConfigs(); |
| | | secondDataConfig.putAll(commonConfig); |
| | | return new DefaultKafkaConsumerFactory<>(secondDataConfig); |
| | | } |
| | | |
| | | /** |
| | | * @Description: 秒级消费者配置 |
| | | * @Param: [] |
| | | * @return: java.util.Map<java.lang.String,java.lang.Object> |
| | | * @Author: 陈凯裕 |
| | | * @Date: 2021/7/19 |
| | | */ |
| | | public Map<String,Object> secondConsumerConfigs(){ |
| | | Map<String, Object> propsMap = new HashMap<>(); |
| | | propsMap.put(ConsumerConfig.GROUP_ID_CONFIG,secondDataGroupId); |
| | | return propsMap; |
| | | } |
| | | |
| | | /* |
| | | * 走航车消费组配置 |
| | | * */ |
| | | public Map<String,Object> cruiserConsumerConfigs(){ |
| | | Map<String, Object> propsMap = new HashMap<>(); |
| | | propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, cruiserDataGroupId); |
| | | return propsMap; |
| | | } |
| | | |
| | | /** |
| | | * @Description: 通用配置 |
| | | * @Param: [] |
| | | * @return: java.util.Map<java.lang.String,java.lang.Object> |
| | | * @Author: 陈凯裕 |
| | | * @Date: 2021/7/19 |
| | | */ |
| | | public Map<String, Object> consumerConfigs() { |
| | | Map<String, Object> propsMap = new HashMap<>(); |
| | | propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); |
| | | propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); |
| | | propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); |
| | | propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); |
| | | propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| | | propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); |
| | | propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); |
| | | propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); |
| | | return propsMap; |
| | | } |