| | |
| | | private String autoOffsetReset; |
| | | @Value("${kafka.consumer.concurrency}") |
| | | private int concurrency; /* concurrency passed to both listener container factories */ |
| | | @Value("${kafka.groupId.insert}") |
| | | private String insertGroupId; /* consumer group id applied by insertConsumerFactory() */ |
| | | @Value("${kafka.groupId.state}") |
| | | private String stateGroupId; /* consumer group id applied by stateConsumerFactory() */ |
| | | |
| | | @Bean |
| | | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { |
| | | // Listener container factory registered as the "insertListenerContainerFactory" bean. |
| | | // It consumes through insertConsumerFactory(), runs with the configured concurrency, |
| | | // and uses AckMode.MANUAL_IMMEDIATE with a poll timeout of 1500. |
| | | @Bean("insertListenerContainerFactory") |
| | | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> insertListenerContainerFactory() { |
| | | ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); |
| | | // The consumer factory is set exactly once, to the insert-group factory. |
| | | factory.setConsumerFactory(insertConsumerFactory()); |
| | | factory.setConcurrency(concurrency); |
| | | factory.getContainerProperties().setPollTimeout(1500); |
| | | factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); |
| | | return factory; |
| | | } |
| | | |
| | | public ConsumerFactory<String, String> consumerFactory() { |
| | | return new DefaultKafkaConsumerFactory<>(consumerConfigs()); |
| | | // Builds the listener container factory registered as the "stateListenerContainerFactory" bean. |
| | | // It consumes through stateConsumerFactory() and acknowledges in AckMode.MANUAL_IMMEDIATE. |
| | | @Bean("stateListenerContainerFactory") |
| | | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> stateListenerContainerFactory() { |
| | | ConcurrentKafkaListenerContainerFactory<String, String> stateFactory = new ConcurrentKafkaListenerContainerFactory<>(); |
| | | stateFactory.setConcurrency(concurrency); |
| | | stateFactory.setConsumerFactory(stateConsumerFactory()); |
| | | // Configure polling and acknowledgment on the factory's container properties. |
| | | ContainerProperties containerProps = stateFactory.getContainerProperties(); |
| | | containerProps.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); |
| | | containerProps.setPollTimeout(1500); |
| | | return stateFactory; |
| | | } |
| | | |
| | | // Consumer factory for the group that stores Kafka data in the database: |
| | | // the common consumer configuration with GROUP_ID_CONFIG set to insertGroupId. |
| | | public ConsumerFactory<String, String> insertConsumerFactory() { |
| | | Map<String, Object> insertProps = consumerConfigs(); |
| | | insertProps.put(ConsumerConfig.GROUP_ID_CONFIG, insertGroupId); |
| | | ConsumerFactory<String, String> insertFactory = new DefaultKafkaConsumerFactory<>(insertProps); |
| | | return insertFactory; |
| | | } |
| | | |
| | | // Consumer factory for the group that determines device state: |
| | | // the common consumer configuration with GROUP_ID_CONFIG set to stateGroupId. |
| | | public ConsumerFactory<String, String> stateConsumerFactory() { |
| | | Map<String, Object> stateProps = consumerConfigs(); |
| | | stateProps.put(ConsumerConfig.GROUP_ID_CONFIG, stateGroupId); |
| | | ConsumerFactory<String, String> stateFactory = new DefaultKafkaConsumerFactory<>(stateProps); |
| | | return stateFactory; |
| | | } |
| | | |
| | | /* |
| | | * Common consumer configuration used by both the insert and state consumer factories. |
| | | * */ |
| | | public Map<String, Object> consumerConfigs() { |
| | | Map<String, Object> propsMap = new HashMap<>(); |
| | | propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); |