jinpengyong
2021-08-26 2818433841637ac5aaa8823f2904aec417ef72b7
kafka消费组配置文件
4 files modified
55 ■■■■■ changed files
screen-common/src/main/java/com/moral/constant/KafkaConstants.java 9 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java 35 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java 8 ●●●● patch | view | raw | blame | history
screen-manage/src/main/resources/application-dev.yml 3 ●●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/constant/KafkaConstants.java
@@ -22,13 +22,4 @@
     */
    public static final String TOPIC_SECOND_SPECIAL = "second_data_special";
    /**
     * 存入数据库的消费组
     */
    public static final String GROUP_INSERT = "insert";
    /**
     * 用于判断设备状态消费组
     */
    public static final String GROUP_STATE = "state";
}
screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -31,21 +31,48 @@
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;
    @Value("${kafka.groupId.insert}")
    private String insertGroupId;
    @Value("${kafka.groupId.state}")
    private String stateGroupId;
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> insertListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConsumerFactory(insertConsumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> stateListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(stateConsumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
    //kafka数据存入数据库消费组配置
    public ConsumerFactory<String, String> insertConsumerFactory() {
        Map<String, Object> map = consumerConfigs();
        map.put(ConsumerConfig.GROUP_ID_CONFIG, insertGroupId);
        return new DefaultKafkaConsumerFactory<>(map);
    }
    //判断设备状态消费组配置
    public ConsumerFactory<String, String> stateConsumerFactory() {
        Map<String, Object> map = consumerConfigs();
        map.put(ConsumerConfig.GROUP_ID_CONFIG, stateGroupId);
        return new DefaultKafkaConsumerFactory<>(map);
    }
    /*
    * 通用配置
    * */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
@@ -44,7 +44,7 @@
    private HistorySecondSpecialService historySecondSpecialService;
    //分钟数据
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory")
    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
@@ -83,7 +83,7 @@
    }
    //小时数据
    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory")
    public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
@@ -122,7 +122,7 @@
    }
    //秒数据,修改设备状态,缓存最新秒数据
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_STATE, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record) {
        String msg = record.value();
        try {
@@ -149,7 +149,7 @@
    }
    //特殊设备秒数据
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND_SPECIAL, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND_SPECIAL, containerFactory = "insertListenerContainerFactory")
    public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
screen-manage/src/main/resources/application-dev.yml
@@ -105,6 +105,9 @@
    linger: 1
    retries: 0
    servers: 172.16.44.65:9092,172.16.44.67:9092,172.16.44.66:9092
  groupId:
    insert: insert
    state: state
mvc:
  interceptor:
    exclude: