From 3bf4a89e8470abf09ca61db5a5e8e8e84f45455b Mon Sep 17 00:00:00 2001
From: kaiyu <404897439@qq.com>
Date: Wed, 30 Sep 2020 14:12:56 +0800
Subject: [PATCH] 添加标识符

---
 src/main/java/com/moral/webSocketServer/WebSocketServer.java |   35 +++++++++++++++++++++++++++--------
 1 files changed, 27 insertions(+), 8 deletions(-)

diff --git a/src/main/java/com/moral/webSocketServer/WebSocketServer.java b/src/main/java/com/moral/webSocketServer/WebSocketServer.java
index 46b26d4..f886cd2 100644
--- a/src/main/java/com/moral/webSocketServer/WebSocketServer.java
+++ b/src/main/java/com/moral/webSocketServer/WebSocketServer.java
@@ -17,12 +17,18 @@
 import com.moral.common.util.ParameterUtils;
 import com.moral.entity.Device;
 import com.moral.service.DeviceService;
+import com.moral.util.RabbitMQUtils;
+import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.QueueingConsumer;
 
 import lombok.extern.slf4j.Slf4j;
+
 import org.springframework.stereotype.Component;
 
 /**
@@ -62,28 +68,33 @@
         this.session = session;
         //������������������������������������������������
         webSocketSet.add(this);
+
         int flag = param.indexOf("&");
         int regionCodeIndex = param.indexOf("_");
         orgId = param.substring(0, flag);
-        accountId = param.substring(flag + 1,regionCodeIndex);
+        accountId = param.substring(flag + 1, regionCodeIndex);
         regionCode = param.substring(regionCodeIndex + 1);
         String QUEUE_NAME = "deviceInfo_" + accountId;
+
         Map<String, Object> paramMap = new HashMap<String, Object>();
         paramMap.put("orgId", orgId);
         paramMap.put("regionCode", regionCode);
         ParameterUtils.getRegionType4RegionCode(paramMap);
         List<Device> deviceList = deviceService.queryDevice(paramMap);
+
+
         try {
             //������������������������������������������������
-            ConnectionFactory factory = new ConnectionFactory();
+            /*ConnectionFactory factory = new ConnectionFactory();
             //������MabbitMQ������������ip���������������
             factory.setHost("47.96.15.25");
             factory.setPort(5672);
             factory.setUsername("guest");
-            factory.setPassword("guest_pass");
+            factory.setPassword("guest_pass");*/
+            //Connection connection = RabbitMQUtils.getConnection();
             String routingKey;
-            connection = factory.newConnection();
-            channel = connection.createChannel();
+            this.connection = RabbitMQUtils.getConnection();
+            channel = this.connection.createChannel();
             //���������������������������������������������������������������������������������������������������������
             channel.queueDeclare(QUEUE_NAME, false, false, true, null);
             for (Device d : deviceList) {
@@ -91,7 +102,8 @@
                 channel.queueBind(QUEUE_NAME, "screens_data", routingKey);
             }
             //���������������������
-            QueueingConsumer consumer = new QueueingConsumer(channel);
+            /*java������������*/
+            /*QueueingConsumer consumer = new QueueingConsumer(channel);
             //������������������
             channel.basicConsume(QUEUE_NAME, true, consumer);
 
@@ -100,7 +112,15 @@
                 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                 String message = new String(delivery.getBody());
                 sendMessage(message);
-            }
+            }*/
+            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+                    String msg = new String(body, "utf-8");
+                    sendMessage(msg);
+                    channel.basicAck(envelope.getDeliveryTag(), false);
+                }
+            });
         } catch (Exception e) {
             log.error(e.getMessage());
         }
@@ -122,7 +142,6 @@
 
     @OnMessage
     public void onMessage(String message) {
-        System.out.println(message);
         for (WebSocketServer webSocketServer : webSocketSet) {
             webSocketServer.sendMessage(message);
         }

--
Gitblit v1.8.0