screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java | ●●●●● patch | view | raw | blame | history | |
screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java | ●●●●● patch | view | raw | blame | history | |
screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java | ●●●●● patch | view | raw | blame | history | |
screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java | ●●●●● patch | view | raw | blame | history | |
screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java | ●●●●● patch | view | raw | blame | history |
screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -16,8 +16,8 @@ import java.util.HashMap; import java.util.Map; /*@Configuration @EnableKafka*/ @Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.consumer.servers}") private String servers; @@ -25,10 +25,6 @@ private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto.commit.interval}") private String autoCommitInterval; @Value("${kafka.consumer.groupMenu.id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") @@ -37,13 +33,9 @@ @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(1500); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); /*factory.setBatchListener(true);//@KafkaListener 批量消费 每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置提交偏移量的方式*/ factory.setConsumerFactory(consumerFactory());//设置消费者工厂 factory.setConcurrency(concurrency);//设置线程数 factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间 return factory; } @@ -56,11 +48,9 @@ Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); 
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return propsMap; } screen-api/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java
@@ -13,8 +13,8 @@ import java.util.HashMap; import java.util.Map; /*@Configuration @EnableKafka*/ @Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.producer.servers}") private String servers; screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
New file @@ -0,0 +1,21 @@
package com.moral.api.config.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @ClassName WebSocketConfig
 * @Description Spring configuration that contributes the WebSocket endpoint exporter bean.
 * @Author 陈凯裕
 * @Date 2021/6/15 13:53
 **/
@Configuration
public class WebSocketConfig {

    /**
     * Creates the {@link ServerEndpointExporter} bean for the application context.
     *
     * <p>NOTE(review): per the Spring documentation this exporter is what registers
     * {@code @ServerEndpoint}-annotated beans with the WebSocket runtime; it is normally
     * only wanted when running on an embedded container - confirm the deployment model.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }
}
screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java
New file @@ -0,0 +1,52 @@
package com.moral.api.kafka.consumer;

import com.alibaba.fastjson.JSON;
import com.moral.api.websocket.SingleDeviceServer;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @ClassName SecondsDataConsumer
 * @Description Kafka listener that forwards every record of the "test" topic to all
 *              currently connected {@link SingleDeviceServer} WebSocket endpoints.
 * @Author 陈凯裕
 * @Date 2021/6/15 14:49
 **/
@Component
public class SecondsDataConsumer implements ConsumerSeekAware {

    /**
     * Parses the record value as JSON and broadcasts its string form to every socket
     * registered in {@link SingleDeviceServer#sockets}.
     *
     * <p>Records whose value does not parse to a JSON object (including {@code null}
     * values) are reported on stderr and skipped. A failure while sending to one socket
     * is reported on stderr and does not stop delivery to the remaining sockets.
     *
     * @param record   the consumed Kafka record; its value is expected to be a JSON object
     * @param consumer the underlying Kafka consumer (not used by this method)
     * @throws Exception if the JSON parser rejects the record value
     */
    @KafkaListener(topics = "test", groupId = "SecondsDataGroup3")
    public void listen(ConsumerRecord<String, String> record, Consumer<?, ?> consumer) throws Exception {
        String messageStr = record.value();
        Object parsed = JSON.parse(messageStr);
        if (!(parsed instanceof Map)) {
            // Not a JSON object: nothing meaningful to forward.
            System.err.println("SecondsDataConsumer: skipping record that is not a JSON object: " + messageStr);
            return;
        }

        String payload = parsed.toString();
        for (SingleDeviceServer socket : SingleDeviceServer.sockets) {
            try {
                socket.sendMessage(payload);
            } catch (Exception e) {
                // Isolate per-socket failures so one broken client cannot block the others.
                System.err.println("SecondsDataConsumer: failed to push message to a socket: " + e);
            }
        }
        System.out.println(parsed);
    }

    /** No seek callback is retained; seeking is only performed on partition assignment. */
    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
    }

    /**
     * Consumer callback: on assignment, seek every assigned partition to its end so that
     * consumption starts from the latest offset.
     *
     * @param map                  assigned partitions mapped to their current offsets
     * @param consumerSeekCallback callback used to request the seek
     */
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
        map.forEach((t, o) -> consumerSeekCallback.seekToEnd(t.topic(), t.partition()));
    }

    /** Intentionally does nothing when the container is idle. */
    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
    }
}
screen-api/src/main/java/com/moral/api/websocket/SingleDeviceServer.java
New file @@ -0,0 +1,60 @@
package com.moral.api.websocket;

import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PathVariable;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 * @ClassName SingleDeviceServer
 * @Description WebSocket endpoint at /singleDevice/{mac}; each open connection registers
 *              itself in {@link #sockets} so other components can push text to it.
 * @Author 陈凯裕
 * @Date 2021/6/15 13:56
 **/
@ServerEndpoint("/singleDevice/{mac}")
@Component
public class SingleDeviceServer {

    // Thread-safe collection holding the endpoint instance of every open connection
    public static CopyOnWriteArraySet<SingleDeviceServer> sockets = new CopyOnWriteArraySet<>();

    private Session session;

    private String mac;

    /**
     * Stores the session and path parameter, then registers this endpoint in {@link #sockets}.
     *
     * @param session the newly opened WebSocket session
     * @param mac     value of the {mac} path segment
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("mac") String mac) throws Exception {
        this.session = session;
        this.mac = mac;
        sockets.add(this);
        System.out.println(mac);
    }

    /** Unregisters this endpoint from {@link #sockets} when the connection closes. */
    @OnClose
    public void onClose() {
        sockets.remove(this);
    }

    /**
     * Prints every text message received from the client.
     *
     * @param message the received text
     * @param session the session the message arrived on
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println(message);
    }

    /**
     * Reports connection errors on stderr instead of silently discarding them.
     *
     * @param session the session on which the error occurred
     * @param error   the reported failure
     */
    @OnError
    public void onError(Session session, Throwable error) {
        System.err.println("SingleDeviceServer: WebSocket error for mac " + mac + ": " + error);
    }

    /**
     * Sends a text message to this connection if its session is open.
     *
     * <p>The open check and the send happen under the same lock on the session, so
     * concurrent callers cannot interleave sends or act on a stale open check. Calls
     * made before a session has been assigned are ignored.
     *
     * @param message the text to send
     * @throws Exception if the underlying send fails
     */
    public void sendMessage(String message) throws Exception {
        Session current = this.session;
        if (current == null) {
            return;
        }
        synchronized (current) {
            if (current.isOpen()) {
                current.getBasicRemote().sendText(message);
            }
        }
    }
}