kaiyu
2021-06-16 dc3e39dfbc7f99e2dd865c0f8274647c00bc5c70
screen-api
增加kafka消费者以及websocket搭建
3 files added
2 files modified
157 ■■■■ changed files
screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java 20 ●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java 4 ●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java 21 ●●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java 52 ●●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java 60 ●●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -16,8 +16,8 @@
import java.util.HashMap;
import java.util.Map;
/*@Configuration
@EnableKafka*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.servers}")
    private String servers;
@@ -25,10 +25,6 @@
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.groupMenu.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
@@ -37,13 +33,9 @@
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        /*factory.setBatchListener(true);//@KafkaListener 批量消费  每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置提交偏移量的方式*/
        factory.setConsumerFactory(consumerFactory());//设置消费者工厂
        factory.setConcurrency(concurrency);//设置线程数
        factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间
        return factory;
    }
@@ -56,11 +48,9 @@
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java
@@ -13,8 +13,8 @@
import java.util.HashMap;
import java.util.Map;
/*@Configuration
@EnableKafka*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
New file
@@ -0,0 +1,21 @@
package com.moral.api.config.websocket;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
/**
 * @ClassName WebSocketConfig
 * @Description TODO
 * @Author 陈凯裕
 * @Date 2021/6/15 13:53
 * @Version TODO
 **/
@Configuration
public class WebSocketConfig {
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }
}
screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java
New file
@@ -0,0 +1,52 @@
package com.moral.api.kafka.consumer;
import com.alibaba.fastjson.JSON;
import com.moral.api.websocket.SingleDeviceServer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
/**
 * @ClassName SecondsDataConsumer1
 * @Description TODO
 * @Author 陈凯裕
 * @Date 2021/6/15 14:49
 * @Version TODO
 **/
@Component
public class SecondsDataConsumer implements ConsumerSeekAware {
    @KafkaListener(topics = "test",groupId = "SecondsDataGroup3")
    public void listen(ConsumerRecord<String, String> record , Consumer consumer) throws Exception {
        String messageStr = record.value();
        Map<String,String> message = (Map<String,String>)JSON.parse(messageStr);
        CopyOnWriteArraySet<SingleDeviceServer> sockets = SingleDeviceServer.sockets;
        for (SingleDeviceServer socket : sockets) {
            socket.sendMessage(message.toString());
        }
        System.out.println(message);
    }
    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
    }
    //consumer回调函数,设置从最新的offset开始消费
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
        map.forEach((t, o) -> consumerSeekCallback.seekToEnd(t.topic(), t.partition()));
    }
    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
    }
}
screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java
New file
@@ -0,0 +1,60 @@
package com.moral.api.websocket;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PathVariable;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.CopyOnWriteArraySet;
/**
 * @ClassName SingleDeviceServer
 * @Description TODO
 * @Author 陈凯裕
 * @Date 2021/6/15 13:56
 * @Version TODO
 **/
@ServerEndpoint("/singleDevice/{mac}")
@Component
public class SingleDeviceServer {
    //线程安全集合,用于存放server对象
    public static CopyOnWriteArraySet<SingleDeviceServer> sockets = new CopyOnWriteArraySet<>();
    private Session session;
    private String mac;
    @OnOpen
    public void onOpen(Session session, @PathParam("mac") String mac) throws Exception {
        this.session = session;
        this.mac = mac;
        sockets.add(this);
        System.out.println(mac);
    }
    @OnClose
    public void onClose() {
        sockets.remove(this);
    }
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println(message);
    }
    @OnError
    public void onError(Session session, Throwable error) {
    }
    public void sendMessage(String message) throws Exception {
        if (this.session.isOpen()) {
            synchronized (session) {
                this.session.getBasicRemote().sendText(message);
            }
        }
    }
}