screen-common/src/main/java/com/moral/constant/KafkaConstants.java
@@ -21,14 +21,5 @@ * 无人机,走航车等特殊设备秒数据主题 */ public static final String TOPIC_SECOND_SPECIAL = "second_data_special"; /** * 存入数据库的消费组 */ public static final String GROUP_INSERT = "insert"; /** * 用于判断设备状态消费组 */ public static final String GROUP_STATE = "state"; } screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -31,21 +31,48 @@ private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; @Value("${kafka.groupId.insert}") private String insertGroupId; @Value("${kafka.groupId.state}") private String stateGroupId; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> insertListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConsumerFactory(insertConsumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> stateListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(stateConsumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } //kafka数据存入数据库消费组配置 public ConsumerFactory<String, String> insertConsumerFactory() { Map<String, Object> map = consumerConfigs(); map.put(ConsumerConfig.GROUP_ID_CONFIG, insertGroupId); return new DefaultKafkaConsumerFactory<>(map); } //判断设备状态消费组配置 public ConsumerFactory<String, String> stateConsumerFactory() { Map<String, Object> map = consumerConfigs(); map.put(ConsumerConfig.GROUP_ID_CONFIG, stateGroupId); return new DefaultKafkaConsumerFactory<>(map); } /* * 通用配置 * */ public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
@@ -44,7 +44,7 @@ private HistorySecondSpecialService historySecondSpecialService; //分钟数据 @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory") @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -83,7 +83,7 @@ } //小时数据 @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory") @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { @@ -122,7 +122,7 @@ } //秒数据,修改设备状态,缓存最新秒数据 @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_STATE, containerFactory = "kafkaListenerContainerFactory") @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") public void listenSecond(ConsumerRecord<String, String> record) { String msg = record.value(); try { @@ -149,7 +149,7 @@ } //特殊设备秒数据 @KafkaListener(topics = KafkaConstants.TOPIC_SECOND_SPECIAL, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory") @KafkaListener(topics = KafkaConstants.TOPIC_SECOND_SPECIAL, containerFactory = "insertListenerContainerFactory") public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { screen-manage/src/main/resources/application-dev.yml
@@ -105,6 +105,9 @@ linger: 1 retries: 0 servers: 172.16.44.65:9092,172.16.44.67:9092,172.16.44.66:9092 groupId: insert: insert state: state mvc: interceptor: exclude: