From 3bf4a89e8470abf09ca61db5a5e8e8e84f45455b Mon Sep 17 00:00:00 2001 From: kaiyu <404897439@qq.com> Date: Wed, 30 Sep 2020 14:12:56 +0800 Subject: [PATCH] 添加标识符 --- src/main/java/com/moral/webSocketServer/WebSocketServer.java | 79 ++++++++++++++++++++++++++++++--------- 1 files changed, 61 insertions(+), 18 deletions(-) diff --git a/src/main/java/com/moral/webSocketServer/WebSocketServer.java b/src/main/java/com/moral/webSocketServer/WebSocketServer.java index e019555..f886cd2 100644 --- a/src/main/java/com/moral/webSocketServer/WebSocketServer.java +++ b/src/main/java/com/moral/webSocketServer/WebSocketServer.java @@ -1,6 +1,9 @@ package com.moral.webSocketServer; import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; import javax.websocket.OnClose; @@ -11,13 +14,21 @@ import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; -import com.moral.service.MonitorPointService; +import com.moral.common.util.ParameterUtils; +import com.moral.entity.Device; +import com.moral.service.DeviceService; +import com.moral.util.RabbitMQUtils; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; import com.rabbitmq.client.QueueingConsumer; import lombok.extern.slf4j.Slf4j; + import org.springframework.stereotype.Component; /** @@ -29,14 +40,20 @@ @Component public class WebSocketServer { - public static MonitorPointService monitorPointService; + public static DeviceService deviceService; // ������������������������������������������������������������������������������ private Session session; private String orgId; + private String accountId; + private String regionCode; + + private Connection connection; + + private Channel channel; // ������session��������������������������� private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>(); @@ -51,27 +68,42 @@ this.session = session; //������������������������������������������������ webSocketSet.add(this); + int flag = param.indexOf("&"); + int regionCodeIndex = param.indexOf("_"); orgId = param.substring(0, flag); - regionCode = param.substring(flag + 1); - String QUEUE_NAME = "deviceInfo"; + accountId = param.substring(flag + 1, regionCodeIndex); + regionCode = param.substring(regionCodeIndex + 1); + String QUEUE_NAME = "deviceInfo_" + accountId; + + Map<String, Object> paramMap = new HashMap<String, Object>(); + paramMap.put("orgId", orgId); + paramMap.put("regionCode", regionCode); + ParameterUtils.getRegionType4RegionCode(paramMap); + List<Device> deviceList = deviceService.queryDevice(paramMap); + + try { //������������������������������������������������ - ConnectionFactory factory = new ConnectionFactory(); + /*ConnectionFactory factory = new ConnectionFactory(); //������MabbitMQ������������ip��������������� factory.setHost("47.96.15.25"); factory.setPort(5672); factory.setUsername("guest"); - factory.setPassword("guest_pass"); - String routingKey = orgId+".*"; - Connection connection = factory.newConnection(); - Channel channel = connection.createChannel(); + factory.setPassword("guest_pass");*/ + //Connection connection = RabbitMQUtils.getConnection(); + String routingKey; + this.connection = RabbitMQUtils.getConnection(); + channel = this.connection.createChannel(); //��������������������������������������������������������������������������������������������������������� channel.queueDeclare(QUEUE_NAME, false, false, true, null); - channel.queueBind(QUEUE_NAME,"screens_data",routingKey); - + for (Device d : deviceList) { + routingKey = orgId + "." + d.getMac(); + channel.queueBind(QUEUE_NAME, "screens_data", routingKey); + } //��������������������� - QueueingConsumer consumer = new QueueingConsumer(channel); + /*java������������*/ + /*QueueingConsumer consumer = new QueueingConsumer(channel); //������������������ channel.basicConsume(QUEUE_NAME, true, consumer); @@ -80,9 +112,16 @@ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); sendMessage(message); - } - }catch (Exception e){ - e.printStackTrace(); + }*/ + channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){ + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + String msg = new String(body, "utf-8"); + sendMessage(msg); + channel.basicAck(envelope.getDeliveryTag(), false); + } + }); + } catch (Exception e) { log.error(e.getMessage()); } } @@ -94,11 +133,15 @@ public void onClose() { /**���������Set��� ������������������������*/ webSocketSet.remove(this); + try { + connection.close(); + } catch (IOException e) { + log.error(e.getMessage()); + } } @OnMessage public void onMessage(String message) { - System.out.println(message); for (WebSocketServer webSocketServer : webSocketSet) { webSocketServer.sendMessage(message); } @@ -127,7 +170,7 @@ this.session.getBasicRemote().sendText(message); } } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage()); } } @@ -139,7 +182,7 @@ */ @OnError public void onError(Session session, Throwable error) { - error.printStackTrace(); + log.error(error.getMessage()); } } -- Gitblit v1.8.0