kaiyu
2020-11-30 8bb0e02e8fd166f35782870983fd2140142df409
src/main/java/com/moral/webSocketServer/WebSocketServer.java
@@ -1,6 +1,9 @@
package com.moral.webSocketServer;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnClose;
@@ -11,13 +14,21 @@
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.moral.service.MonitorPointService;
import com.moral.common.util.ParameterUtils;
import com.moral.entity.Device;
import com.moral.service.DeviceService;
import com.moral.util.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
@@ -29,14 +40,20 @@
@Component
public class WebSocketServer {
    // Service references held in static fields; the open handler calls deviceService.queryDevice(...).
    // NOTE(review): where these are assigned is not visible in this part of the file — confirm they are set before the first connection.
    public static MonitorPointService monitorPointService;
    public static DeviceService deviceService;
    // Session for the connection to a particular client; needed in order to send data to that client.
    private Session session;
    // Pieces parsed out of the "param" string in the open handler.
    private String orgId;
    private String accountId;
    private String regionCode;
    // RabbitMQ connection obtained from RabbitMQUtils.getConnection() in the open handler and closed in onClose().
    private Connection connection;
    // RabbitMQ channel created from the connection in the open handler.
    private Channel channel;
    // Set holding the endpoint instances (one per session) — very important! Instances are added on open and removed in onClose().
    private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
@@ -51,37 +68,41 @@
        this.session = session;
        //这个一定要写,第一次很容易忽略!
        webSocketSet.add(this);
        int flag = param.indexOf("&");
        int regionCodeIndex = param.indexOf("_");
        orgId = param.substring(0, flag);
        regionCode = param.substring(flag + 1);
        String QUEUE_NAME = "deviceInfo";
        accountId = param.substring(flag + 1, regionCodeIndex);
        regionCode = param.substring(regionCodeIndex + 1);
        String QUEUE_NAME = "deviceInfo_" + accountId;
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("orgId", orgId);
        paramMap.put("regionCode", regionCode);
        ParameterUtils.getRegionType4RegionCode(paramMap);
        List<Device> deviceList = deviceService.queryDevice(paramMap);
        try {
            //打开连接和创建频道,与发送端一样
            ConnectionFactory factory = new ConnectionFactory();
            //设置MabbitMQ所在主机ip或者主机名
            factory.setHost("172.16.206.8");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest_pass");
            String routingKey = orgId+".*";
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            String routingKey;
            this.connection = RabbitMQUtils.getConnection();
            channel = this.connection.createChannel();
            //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            channel.queueBind(QUEUE_NAME,"screens_data",routingKey);
            //创建队列消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //指定消费队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                sendMessage(message);
            for (Device d : deviceList) {
                routingKey = orgId + "." + d.getMac();
                channel.queueBind(QUEUE_NAME, "screens_data", routingKey);
            }
        }catch (Exception e){
            channel.basicQos(30);//预先读取数
            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    sendMessage(msg);
                    channel.basicAck(envelope.getDeliveryTag(), true);
                }
            });
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
@@ -93,11 +114,15 @@
    public void onClose() {
        /**从安全Set中 移除当前连接对象*/
        webSocketSet.remove(this);
        try {
            connection.close();
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }
    @OnMessage
    public void onMessage(String message) {
        System.out.println(message);
        for (WebSocketServer webSocketServer : webSocketSet) {
            webSocketServer.sendMessage(message);
        }