From 8d3e4f4b795f44e6d59facb11d7fc0918376f4f2 Mon Sep 17 00:00:00 2001 From: 于紫祥_1901 <email@yuzixiang_1910> Date: Wed, 11 Nov 2020 15:57:24 +0800 Subject: [PATCH] 点击沧州monitor显示整个沧州设备 --- src/main/java/com/moral/webSocketServer/WebSocketServer.java | 40 ++++++++++++++++++---------------------- 1 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/moral/webSocketServer/WebSocketServer.java b/src/main/java/com/moral/webSocketServer/WebSocketServer.java index 25ee014..a6654ed 100644 --- a/src/main/java/com/moral/webSocketServer/WebSocketServer.java +++ b/src/main/java/com/moral/webSocketServer/WebSocketServer.java @@ -17,12 +17,18 @@ import com.moral.common.util.ParameterUtils; import com.moral.entity.Device; import com.moral.service.DeviceService; +import com.moral.util.RabbitMQUtils; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Consumer; +import com.rabbitmq.client.DefaultConsumer; +import com.rabbitmq.client.Envelope; import com.rabbitmq.client.QueueingConsumer; import lombok.extern.slf4j.Slf4j; + import org.springframework.stereotype.Component; /** @@ -66,7 +72,7 @@ int flag = param.indexOf("&"); int regionCodeIndex = param.indexOf("_"); orgId = param.substring(0, flag); - accountId = param.substring(flag + 1,regionCodeIndex); + accountId = param.substring(flag + 1, regionCodeIndex); regionCode = param.substring(regionCodeIndex + 1); String QUEUE_NAME = "deviceInfo_" + accountId; @@ -78,33 +84,24 @@ try { - //������������������������������������������������ - ConnectionFactory factory = new ConnectionFactory(); - //������MabbitMQ������������ip��������������� - factory.setHost("47.96.15.25"); - factory.setPort(5672); - factory.setUsername("guest"); - factory.setPassword("guest_pass"); String routingKey; - connection = factory.newConnection(); - channel = connection.createChannel(); + this.connection = RabbitMQUtils.getConnection(); + channel = this.connection.createChannel(); //��������������������������������������������������������������������������������������������������������� channel.queueDeclare(QUEUE_NAME, false, false, true, null); for (Device d : deviceList) { routingKey = orgId + "." + d.getMac(); channel.queueBind(QUEUE_NAME, "screens_data", routingKey); } - //��������������������� - QueueingConsumer consumer = new QueueingConsumer(channel); - //������������������ - channel.basicConsume(QUEUE_NAME, true, consumer); - - while (true) { - //nextDelivery������������������������������������������������������������take��������� - QueueingConsumer.Delivery delivery = consumer.nextDelivery(); - String message = new String(delivery.getBody()); - sendMessage(message); - } + channel.basicQos(30);//��������������� + channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){ + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + String msg = new String(body, "utf-8"); + sendMessage(msg); + channel.basicAck(envelope.getDeliveryTag(), true); + } + }); } catch (Exception e) { log.error(e.getMessage()); } @@ -118,7 +115,6 @@ /**���������Set��� ������������������������*/ webSocketSet.remove(this); try { - //channel.queueDelete("deviceInfo_" + accountId); connection.close(); } catch (IOException e) { log.error(e.getMessage()); -- Gitblit v1.8.0