From dc3e39dfbc7f99e2dd865c0f8274647c00bc5c70 Mon Sep 17 00:00:00 2001
From: kaiyu <404897439@qq.com>
Date: Wed, 16 Jun 2021 15:54:23 +0800
Subject: [PATCH] screen-api 增加kafka消费者以及websocket搭建
---
screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java | 4
screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java | 52 +++++++++++++++++
screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java | 20 +-----
screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java | 21 +++++++
screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java | 60 ++++++++++++++++++++
5 files changed, 140 insertions(+), 17 deletions(-)
diff --git a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
index 9d983d0..a4b55c1 100644
--- a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
+++ b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -16,8 +16,8 @@
import java.util.HashMap;
import java.util.Map;
-/*@Configuration
-@EnableKafka*/
+@Configuration
+@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.servers}")
private String servers;
@@ -25,10 +25,6 @@
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
- @Value("${kafka.consumer.auto.commit.interval}")
- private String autoCommitInterval;
- @Value("${kafka.consumer.groupMenu.id}")
- private String groupId;
@Value("${kafka.consumer.auto.offset.reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.concurrency}")
@@ -37,13 +33,9 @@
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
- factory.setConsumerFactory(consumerFactory());
- factory.setConcurrency(concurrency);
- factory.getContainerProperties().setPollTimeout(1500);
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
-
- /*factory.setBatchListener(true);//@KafkaListener ������������ ���������������������Kafka���������������������ConsumerConfig.MAX_POLL_RECORDS_CONFIG
- factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//������������������������������*/
+ factory.setConsumerFactory(consumerFactory());//���������������������
+ factory.setConcurrency(concurrency);//���������������
+ factory.getContainerProperties().setPollTimeout(1500);//������������������������������
return factory;
}
@@ -56,11 +48,9 @@
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
- propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
diff --git a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java
index ef5b141..a5603f1 100644
--- a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java
+++ b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java
@@ -13,8 +13,8 @@
import java.util.HashMap;
import java.util.Map;
-/*@Configuration
-@EnableKafka*/
+@Configuration
+@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
diff --git a/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java b/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
new file mode 100644
index 0000000..c4f6e0a
--- /dev/null
+++ b/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
@@ -0,0 +1,21 @@
+package com.moral.api.config.websocket;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
+
+/**
+ * @ClassName WebSocketConfig
+ * @Description TODO
+ * @Author ���������
+ * @Date 2021/6/15 13:53
+ * @Version TODO
+ **/
+@Configuration
+public class WebSocketConfig {
+
+ @Bean
+ public ServerEndpointExporter serverEndpointExporter(){
+ return new ServerEndpointExporter();
+ }
+}
diff --git a/screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java b/screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java
new file mode 100644
index 0000000..8c31f54
--- /dev/null
+++ b/screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java
@@ -0,0 +1,52 @@
+package com.moral.api.kafka.consumer;
+
+import com.alibaba.fastjson.JSON;
+import com.moral.api.websocket.SingleDeviceServer;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.ConsumerSeekAware;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * @ClassName SecondsDataConsumer1
+ * @Description TODO
+ * @Author ���������
+ * @Date 2021/6/15 14:49
+ * @Version TODO
+ **/
+@Component
+public class SecondsDataConsumer implements ConsumerSeekAware {
+
+ @KafkaListener(topics = "test",groupId = "SecondsDataGroup3")
+ public void listen(ConsumerRecord<String, String> record , Consumer consumer) throws Exception {
+ String messageStr = record.value();
+ Map<String,String> message = (Map<String,String>)JSON.parse(messageStr);
+ CopyOnWriteArraySet<SingleDeviceServer> sockets = SingleDeviceServer.sockets;
+ for (SingleDeviceServer socket : sockets) {
+ socket.sendMessage(message.toString());
+ }
+ System.out.println(message);
+ }
+
+ @Override
+ public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
+
+ }
+
+ //consumer���������������������������������offset������������
+ @Override
+ public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
+ map.forEach((t, o) -> consumerSeekCallback.seekToEnd(t.topic(), t.partition()));
+ }
+
+ @Override
+ public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
+
+ }
+}
diff --git a/screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java b/screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java
new file mode 100644
index 0000000..fa1a3b3
--- /dev/null
+++ b/screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java
@@ -0,0 +1,60 @@
+package com.moral.api.websocket;
+
+import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.PathVariable;
+
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * @ClassName SingleDeviceServer
+ * @Description TODO
+ * @Author ���������
+ * @Date 2021/6/15 13:56
+ * @Version TODO
+ **/
+@ServerEndpoint("/singleDevice/{mac}")
+@Component
+public class SingleDeviceServer {
+
+ //���������������������������������server������
+ public static CopyOnWriteArraySet<SingleDeviceServer> sockets = new CopyOnWriteArraySet<>();
+
+ private Session session;
+
+ private String mac;
+
+ @OnOpen
+ public void onOpen(Session session, @PathParam("mac") String mac) throws Exception {
+ this.session = session;
+ this.mac = mac;
+ sockets.add(this);
+ System.out.println(mac);
+ }
+
+ @OnClose
+ public void onClose() {
+ sockets.remove(this);
+ }
+
+ @OnMessage
+ public void onMessage(String message, Session session) {
+ System.out.println(message);
+ }
+
+ @OnError
+ public void onError(Session session, Throwable error) {
+ }
+
+ public void sendMessage(String message) throws Exception {
+ if (this.session.isOpen()) {
+ synchronized (session) {
+ this.session.getBasicRemote().sendText(message);
+ }
+ }
+ }
+
+
+}
--
Gitblit v1.8.0