|  |  | 
 |  |  | import java.util.HashMap; | 
 |  |  | import java.util.Map; | 
 |  |  |  | 
 |  |  | /*@Configuration | 
 |  |  | @EnableKafka*/ | 
 |  |  | @Configuration | 
 |  |  | @EnableKafka | 
 |  |  | public class KafkaConsumerConfig { | 
 |  |  |     /** Broker address list used as the bootstrap servers setting; injected from kafka.consumer.servers. */ | 
 |  |  |     @Value("${kafka.consumer.servers}") | 
 |  |  |     private String servers; | 
 |  |  |  | 
 |  |  |     /** Concurrency value handed to every listener container factory; injected from kafka.consumer.concurrency. */ | 
 |  |  |     @Value("${kafka.consumer.concurrency}") | 
 |  |  |     private int concurrency; | 
 |  |  |  | 
 |  |  |     /** Consumer group id applied by insertConsumerFactory(); injected from kafka.groupId.insert. */ | 
 |  |  |     @Value("${kafka.groupId.insert}") | 
 |  |  |     private String insertGroupId; | 
 |  |  |  | 
 |  |  |     /** Consumer group id applied by stateConsumerFactory(); injected from kafka.groupId.state. */ | 
 |  |  |     @Value("${kafka.groupId.state}") | 
 |  |  |     private String stateGroupId; | 
 |  |  |  | 
 |  |  |     // NOTE(review): no @Value is visible on this field, so nothing shown here assigns it - confirm whether it is still needed. | 
 |  |  |     private String autoOffsetReset; | 
 |  |  |  | 
 |  |  |     @Bean | 
 |  |  |     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { | 
 |  |  |     @Bean("insertListenerContainerFactory") | 
 |  |  |     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> insertListenerContainerFactory() { | 
 |  |  |         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); | 
 |  |  |         factory.setConsumerFactory(consumerFactory()); | 
 |  |  |         factory.setConsumerFactory(insertConsumerFactory()); | 
 |  |  |         factory.setConcurrency(concurrency); | 
 |  |  |         factory.getContainerProperties().setPollTimeout(1500); | 
 |  |  |         factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); | 
 |  |  |         return factory; | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public ConsumerFactory<String, String> consumerFactory() { | 
 |  |  |         return new DefaultKafkaConsumerFactory<>(consumerConfigs()); | 
 |  |  |     @Bean("stateListenerContainerFactory") | 
 |  |  |     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> stateListenerContainerFactory() { | 
 |  |  |         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); | 
 |  |  |         factory.setConsumerFactory(stateConsumerFactory()); | 
 |  |  |         factory.setConcurrency(concurrency); | 
 |  |  |         factory.getContainerProperties().setPollTimeout(1500); | 
 |  |  |         factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); | 
 |  |  |         return factory; | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     //kafka数据存入数据库消费组配置 | 
 |  |  |     public ConsumerFactory<String, String> insertConsumerFactory() { | 
 |  |  |         Map<String, Object> map = consumerConfigs(); | 
 |  |  |         map.put(ConsumerConfig.GROUP_ID_CONFIG, insertGroupId); | 
 |  |  |         return new DefaultKafkaConsumerFactory<>(map); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     //判断设备状态消费组配置 | 
 |  |  |     public ConsumerFactory<String, String> stateConsumerFactory() { | 
 |  |  |         Map<String, Object> map = consumerConfigs(); | 
 |  |  |         map.put(ConsumerConfig.GROUP_ID_CONFIG, stateGroupId); | 
 |  |  |         return new DefaultKafkaConsumerFactory<>(map); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     /* | 
 |  |  |      * Common consumer configuration used as the base for every consumer factory. | 
 |  |  |      * */ | 
 |  |  |     public Map<String, Object> consumerConfigs() { | 
 |  |  |         Map<String, Object> propsMap = new HashMap<>(); | 
 |  |  |         propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); |