From bb77c5e3089fc3e9ccb520625b7798d81c99bfa2 Mon Sep 17 00:00:00 2001
From: 于紫祥_1901 <email@yuzixiang_1910>
Date: Fri, 06 Nov 2020 13:55:15 +0800
Subject: [PATCH] update
---
src/main/java/com/moral/webSocketServer/WebSocketServer.java | 86 +++++++++++++++++++++++++++---------------
1 files changed, 55 insertions(+), 31 deletions(-)
diff --git a/src/main/java/com/moral/webSocketServer/WebSocketServer.java b/src/main/java/com/moral/webSocketServer/WebSocketServer.java
index e019555..a6654ed 100644
--- a/src/main/java/com/moral/webSocketServer/WebSocketServer.java
+++ b/src/main/java/com/moral/webSocketServer/WebSocketServer.java
@@ -1,6 +1,9 @@
package com.moral.webSocketServer;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnClose;
@@ -11,13 +14,21 @@
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
-import com.moral.service.MonitorPointService;
+import com.moral.common.util.ParameterUtils;
+import com.moral.entity.Device;
+import com.moral.service.DeviceService;
+import com.moral.util.RabbitMQUtils;
+import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import lombok.extern.slf4j.Slf4j;
+
import org.springframework.stereotype.Component;
/**
@@ -29,14 +40,20 @@
@Component
public class WebSocketServer {
- public static MonitorPointService monitorPointService;
+ public static DeviceService deviceService;
// ������������������������������������������������������������������������������
private Session session;
private String orgId;
+ private String accountId;
+
private String regionCode;
+
+ private Connection connection;
+
+ private Channel channel;
// ������session���������������������������
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
@@ -51,38 +68,41 @@
this.session = session;
//������������������������������������������������
webSocketSet.add(this);
+
int flag = param.indexOf("&");
+ int regionCodeIndex = param.indexOf("_");
orgId = param.substring(0, flag);
- regionCode = param.substring(flag + 1);
- String QUEUE_NAME = "deviceInfo";
+ accountId = param.substring(flag + 1, regionCodeIndex);
+ regionCode = param.substring(regionCodeIndex + 1);
+ String QUEUE_NAME = "deviceInfo_" + accountId;
+
+ Map<String, Object> paramMap = new HashMap<String, Object>();
+ paramMap.put("orgId", orgId);
+ paramMap.put("regionCode", regionCode);
+ ParameterUtils.getRegionType4RegionCode(paramMap);
+ List<Device> deviceList = deviceService.queryDevice(paramMap);
+
+
try {
- //������������������������������������������������
- ConnectionFactory factory = new ConnectionFactory();
- //������MabbitMQ������������ip���������������
- factory.setHost("47.96.15.25");
- factory.setPort(5672);
- factory.setUsername("guest");
- factory.setPassword("guest_pass");
- String routingKey = orgId+".*";
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
+ String routingKey;
+ this.connection = RabbitMQUtils.getConnection();
+ channel = this.connection.createChannel();
//���������������������������������������������������������������������������������������������������������
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
- channel.queueBind(QUEUE_NAME,"screens_data",routingKey);
-
- //���������������������
- QueueingConsumer consumer = new QueueingConsumer(channel);
- //������������������
- channel.basicConsume(QUEUE_NAME, true, consumer);
-
- while (true) {
- //nextDelivery������������������������������������������������������������take���������
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- sendMessage(message);
+ for (Device d : deviceList) {
+ routingKey = orgId + "." + d.getMac();
+ channel.queueBind(QUEUE_NAME, "screens_data", routingKey);
}
- }catch (Exception e){
- e.printStackTrace();
+ channel.basicQos(30);//���������������
+ channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ String msg = new String(body, "utf-8");
+ sendMessage(msg);
+ channel.basicAck(envelope.getDeliveryTag(), true);
+ }
+ });
+ } catch (Exception e) {
log.error(e.getMessage());
}
}
@@ -94,11 +114,15 @@
public void onClose() {
/**���������Set��� ������������������������*/
webSocketSet.remove(this);
+ try {
+ connection.close();
+ } catch (IOException e) {
+ log.error(e.getMessage());
+ }
}
@OnMessage
public void onMessage(String message) {
- System.out.println(message);
for (WebSocketServer webSocketServer : webSocketSet) {
webSocketServer.sendMessage(message);
}
@@ -127,7 +151,7 @@
this.session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
- e.printStackTrace();
+ log.error(e.getMessage());
}
}
@@ -139,7 +163,7 @@
*/
@OnError
public void onError(Session session, Throwable error) {
- error.printStackTrace();
+ log.error(error.getMessage());
}
}
--
Gitblit v1.8.0