From 2685db12198903ccee5b61e65911425776f73bd8 Mon Sep 17 00:00:00 2001
From: kaiyu <404897439@qq.com>
Date: Wed, 21 Oct 2020 16:01:26 +0800
Subject: [PATCH] 修改pom
---
src/main/java/com/moral/webSocketServer/WebSocketServer.java | 35 +++++++++++++++++++++++++++--------
1 files changed, 27 insertions(+), 8 deletions(-)
diff --git a/src/main/java/com/moral/webSocketServer/WebSocketServer.java b/src/main/java/com/moral/webSocketServer/WebSocketServer.java
index 4cc76b6..f886cd2 100644
--- a/src/main/java/com/moral/webSocketServer/WebSocketServer.java
+++ b/src/main/java/com/moral/webSocketServer/WebSocketServer.java
@@ -17,12 +17,18 @@
import com.moral.common.util.ParameterUtils;
import com.moral.entity.Device;
import com.moral.service.DeviceService;
+import com.moral.util.RabbitMQUtils;
+import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.Consumer;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import lombok.extern.slf4j.Slf4j;
+
import org.springframework.stereotype.Component;
/**
@@ -62,28 +68,33 @@
this.session = session;
//������������������������������������������������
webSocketSet.add(this);
+
int flag = param.indexOf("&");
int regionCodeIndex = param.indexOf("_");
orgId = param.substring(0, flag);
- accountId = param.substring(flag + 1,regionCodeIndex);
+ accountId = param.substring(flag + 1, regionCodeIndex);
regionCode = param.substring(regionCodeIndex + 1);
String QUEUE_NAME = "deviceInfo_" + accountId;
+
Map<String, Object> paramMap = new HashMap<String, Object>();
paramMap.put("orgId", orgId);
paramMap.put("regionCode", regionCode);
ParameterUtils.getRegionType4RegionCode(paramMap);
List<Device> deviceList = deviceService.queryDevice(paramMap);
+
+
try {
//������������������������������������������������
- ConnectionFactory factory = new ConnectionFactory();
+ /*ConnectionFactory factory = new ConnectionFactory();
//������MabbitMQ������������ip���������������
factory.setHost("47.96.15.25");
factory.setPort(5672);
factory.setUsername("guest");
- factory.setPassword("guest_pass");
+ factory.setPassword("guest_pass");*/
+ //Connection connection = RabbitMQUtils.getConnection();
String routingKey;
- connection = factory.newConnection();
- channel = connection.createChannel();
+ this.connection = RabbitMQUtils.getConnection();
+ channel = this.connection.createChannel();
//���������������������������������������������������������������������������������������������������������
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
for (Device d : deviceList) {
@@ -91,7 +102,8 @@
channel.queueBind(QUEUE_NAME, "screens_data", routingKey);
}
//���������������������
- QueueingConsumer consumer = new QueueingConsumer(channel);
+ /*java������������*/
+ /*QueueingConsumer consumer = new QueueingConsumer(channel);
//������������������
channel.basicConsume(QUEUE_NAME, true, consumer);
@@ -100,7 +112,15 @@
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
sendMessage(message);
- }
+ }*/
+ channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
+ @Override
+ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+ String msg = new String(body, "utf-8");
+ sendMessage(msg);
+ channel.basicAck(envelope.getDeliveryTag(), false);
+ }
+ });
} catch (Exception e) {
log.error(e.getMessage());
}
@@ -114,7 +134,6 @@
/**���������Set��� ������������������������*/
webSocketSet.remove(this);
try {
- //channel.queueDelete("deviceInfo_" + accountId);
connection.close();
} catch (IOException e) {
log.error(e.getMessage());
--
Gitblit v1.8.0