| package com.moral.api.config.kafka; | 
|   | 
| import org.apache.kafka.clients.consumer.ConsumerConfig; | 
| import org.apache.kafka.common.serialization.StringDeserializer; | 
| import org.springframework.beans.factory.annotation.Value; | 
| import org.springframework.context.annotation.Bean; | 
| import org.springframework.context.annotation.Configuration; | 
| import org.springframework.kafka.annotation.EnableKafka; | 
| import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; | 
| import org.springframework.kafka.config.KafkaListenerContainerFactory; | 
| import org.springframework.kafka.core.ConsumerFactory; | 
| import org.springframework.kafka.core.DefaultKafkaConsumerFactory; | 
| import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; | 
| import org.springframework.kafka.listener.ContainerProperties; | 
|   | 
| import java.util.HashMap; | 
| import java.util.Map; | 
|   | 
| @Configuration | 
| @EnableKafka | 
| public class KafkaConsumerConfig { | 
|     @Value("47.99.145.48:9092,47.111.116.38:9092,47.111.124.159:9092") | 
|     private String servers; | 
|     @Value("${kafka.consumer.enable.auto.commit}") | 
|     private boolean enableAutoCommit; | 
|     @Value("${kafka.consumer.session.timeout}") | 
|     private String sessionTimeout; | 
|     @Value("${kafka.consumer.auto.commit.interval}") | 
|     private String autoCommitInterval; | 
|     @Value("${kafka.consumer.auto.offset.reset}") | 
|     private String autoOffsetReset; | 
|     @Value("${kafka.consumer.concurrency}") | 
|     private int concurrency; | 
|     @Value("${kafka.groupId.insert}") | 
|     private String insertGroupId; | 
|     @Value("${kafka.groupId.state}") | 
|     private String stateGroupId; | 
|   | 
|     @Bean("insertListenerContainerFactory") | 
|     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> insertListenerContainerFactory() { | 
|         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); | 
|         factory.setConsumerFactory(insertConsumerFactory()); | 
|         factory.setConcurrency(concurrency); | 
|         factory.getContainerProperties().setPollTimeout(1500); | 
|         factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); | 
|         return factory; | 
|     } | 
|   | 
|     @Bean("stateListenerContainerFactory") | 
|     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> stateListenerContainerFactory() { | 
|         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); | 
|         factory.setConsumerFactory(stateConsumerFactory()); | 
|         factory.setConcurrency(concurrency); | 
|         factory.getContainerProperties().setPollTimeout(1500); | 
|         factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); | 
|         return factory; | 
|     } | 
|   | 
|     //kafka数据存入数据库消费组配置 | 
|     public ConsumerFactory<String, String> insertConsumerFactory() { | 
|         Map<String, Object> map = consumerConfigs(); | 
|         map.put(ConsumerConfig.GROUP_ID_CONFIG, insertGroupId); | 
|         return new DefaultKafkaConsumerFactory<>(map); | 
|     } | 
|   | 
|     //判断设备状态消费组配置 | 
|     public ConsumerFactory<String, String> stateConsumerFactory() { | 
|         Map<String, Object> map = consumerConfigs(); | 
|         map.put(ConsumerConfig.GROUP_ID_CONFIG, stateGroupId); | 
|         return new DefaultKafkaConsumerFactory<>(map); | 
|     } | 
|   | 
|     /* | 
|      * 通用配置 | 
|      * */ | 
|     public Map<String, Object> consumerConfigs() { | 
|         Map<String, Object> propsMap = new HashMap<>(); | 
|         propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); | 
|         propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); | 
|         propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); | 
|         propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); | 
|         propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | 
|         propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); | 
|         propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); | 
|         return propsMap; | 
|     } | 
| } |