From dc3e39dfbc7f99e2dd865c0f8274647c00bc5c70 Mon Sep 17 00:00:00 2001
From: kaiyu <404897439@qq.com>
Date: Wed, 16 Jun 2021 15:54:23 +0800
Subject: [PATCH] screen-api           增加kafka消费者以及websocket搭建

---
 screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java   |    4 
 screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java |   52 +++++++++++++++++
 screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java   |   20 +-----
 screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java   |   21 +++++++
 screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java       |   60 ++++++++++++++++++++
 5 files changed, 140 insertions(+), 17 deletions(-)

diff --git a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
index 9d983d0..a4b55c1 100644
--- a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
+++ b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -16,8 +16,8 @@
 import java.util.HashMap;
 import java.util.Map;
 
-/*@Configuration
-@EnableKafka*/
+@Configuration
+@EnableKafka
 public class KafkaConsumerConfig {
     @Value("${kafka.consumer.servers}")
     private String servers;
@@ -25,10 +25,6 @@
     private boolean enableAutoCommit;
     @Value("${kafka.consumer.session.timeout}")
     private String sessionTimeout;
-    @Value("${kafka.consumer.auto.commit.interval}")
-    private String autoCommitInterval;
-    @Value("${kafka.consumer.groupMenu.id}")
-    private String groupId;
     @Value("${kafka.consumer.auto.offset.reset}")
     private String autoOffsetReset;
     @Value("${kafka.consumer.concurrency}")
@@ -37,13 +33,9 @@
     @Bean
     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
-        factory.setConsumerFactory(consumerFactory());
-        factory.setConcurrency(concurrency);
-        factory.getContainerProperties().setPollTimeout(1500);
-        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
-
-        /*factory.setBatchListener(true);//@KafkaListener ������������  ���������������������Kafka���������������������ConsumerConfig.MAX_POLL_RECORDS_CONFIG
-        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//������������������������������*/
+        factory.setConsumerFactory(consumerFactory());//���������������������
+        factory.setConcurrency(concurrency);//���������������
+        factory.getContainerProperties().setPollTimeout(1500);//������������������������������
         return factory;
     }
 
@@ -56,11 +48,9 @@
         Map<String, Object> propsMap = new HashMap<>();
         propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
         propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
-        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
         propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
         propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
         return propsMap;
     }
diff --git a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java
index ef5b141..a5603f1 100644
--- a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java
+++ b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java
@@ -13,8 +13,8 @@
 import java.util.HashMap;
 import java.util.Map;
 
-/*@Configuration
-@EnableKafka*/
+@Configuration
+@EnableKafka
 public class KafkaProducerConfig {
     @Value("${kafka.producer.servers}")
     private String servers;
diff --git a/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java b/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
new file mode 100644
index 0000000..c4f6e0a
--- /dev/null
+++ b/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
@@ -0,0 +1,21 @@
+package com.moral.api.config.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+/**
+ * @ClassName WebSocketConfig
+ * @Description TODO
+ * @Author ���������
+ * @Date 2021/6/15 13:53
+ * @Version TODO
+ **/
+@Configuration
+public class WebSocketConfig {
+
+    @Bean
+    public ServerEndpointExporter serverEndpointExporter(){
+        return new ServerEndpointExporter();
+    }
+}
diff --git a/screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java b/screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java
new file mode 100644
index 0000000..8c31f54
--- /dev/null
+++ b/screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java
@@ -0,0 +1,52 @@
+package com.moral.api.kafka.consumer;
+
+import com.alibaba.fastjson.JSON;
+import com.moral.api.websocket.SingleDeviceServer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.ConsumerSeekAware;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * @ClassName SecondsDataConsumer1
+ * @Description TODO
+ * @Author ���������
+ * @Date 2021/6/15 14:49
+ * @Version TODO
+ **/
+@Component
+public class SecondsDataConsumer implements ConsumerSeekAware {
+
+    @KafkaListener(topics = "test",groupId = "SecondsDataGroup3")
+    public void listen(ConsumerRecord<String, String> record , Consumer consumer) throws Exception {
+        String messageStr = record.value();
+        Map<String,String> message = (Map<String,String>)JSON.parse(messageStr);
+        CopyOnWriteArraySet<SingleDeviceServer> sockets = SingleDeviceServer.sockets;
+        for (SingleDeviceServer socket : sockets) {
+            socket.sendMessage(message.toString());
+        }
+        System.out.println(message);
+    }
+
+    @Override
+    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
+
+    }
+
+    //consumer���������������������������������offset������������
+    @Override
+    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
+        map.forEach((t, o) -> consumerSeekCallback.seekToEnd(t.topic(), t.partition()));
+    }
+
+    @Override
+    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
+
+    }
+}
diff --git a/screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java b/screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java
new file mode 100644
index 0000000..fa1a3b3
--- /dev/null
+++ b/screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java
@@ -0,0 +1,60 @@
+package com.moral.api.websocket;
+
+import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.PathVariable;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * @ClassName SingleDeviceServer
+ * @Description TODO
+ * @Author ���������
+ * @Date 2021/6/15 13:56
+ * @Version TODO
+ **/
+@ServerEndpoint("/singleDevice/{mac}")
+@Component
+public class SingleDeviceServer {
+
+    //���������������������������������server������
+    public static CopyOnWriteArraySet<SingleDeviceServer> sockets = new CopyOnWriteArraySet<>();
+
+    private Session session;
+
+    private String mac;
+
+    @OnOpen
+    public void onOpen(Session session, @PathParam("mac") String mac) throws Exception {
+        this.session = session;
+        this.mac = mac;
+        sockets.add(this);
+        System.out.println(mac);
+    }
+
+    @OnClose
+    public void onClose() {
+        sockets.remove(this);
+    }
+
+    @OnMessage
+    public void onMessage(String message, Session session) {
+        System.out.println(message);
+    }
+
+    @OnError
+    public void onError(Session session, Throwable error) {
+    }
+
+    public void sendMessage(String message) throws Exception {
+        if (this.session.isOpen()) {
+            synchronized (session) {
+                this.session.getBasicRemote().sendText(message);
+            }
+        }
+    }
+
+
+}

--
Gitblit v1.8.0