kaiyu
2020-11-30 8bb0e02e8fd166f35782870983fd2140142df409
src/main/java/com/moral/webSocketServer/WebSocketServer.java
@@ -17,12 +17,18 @@
import com.moral.common.util.ParameterUtils;
import com.moral.entity.Device;
import com.moral.service.DeviceService;
import com.moral.util.RabbitMQUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.QueueingConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
@@ -62,45 +68,40 @@
        this.session = session;
        //这个一定要写,第一次很容易忽略!
        webSocketSet.add(this);
        int flag = param.indexOf("&");
        int regionCodeIndex = param.indexOf("_");
        orgId = param.substring(0, flag);
        accountId = param.substring(flag + 1,regionCodeIndex);
        accountId = param.substring(flag + 1, regionCodeIndex);
        regionCode = param.substring(regionCodeIndex + 1);
        String QUEUE_NAME = "deviceInfo_" + accountId;
        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("orgId", orgId);
        paramMap.put("regionCode", regionCode);
        ParameterUtils.getRegionType4RegionCode(paramMap);
        List<Device> deviceList = deviceService.queryDevice(paramMap);
        try {
            //打开连接和创建频道,与发送端一样
            ConnectionFactory factory = new ConnectionFactory();
            // Set the IP address or hostname of the RabbitMQ host
            factory.setHost("47.96.15.25");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest_pass");
            String routingKey;
            connection = factory.newConnection();
            channel = connection.createChannel();
            this.connection = RabbitMQUtils.getConnection();
            channel = this.connection.createChannel();
            //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            for (Device d : deviceList) {
                routingKey = orgId + "." + d.getMac();
                channel.queueBind(QUEUE_NAME, "screens_data", routingKey);
            }
            //创建队列消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //指定消费队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
            while (true) {
                //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                String message = new String(delivery.getBody());
                sendMessage(message);
            }
            channel.basicQos(30);//预先读取数
            channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, "utf-8");
                    sendMessage(msg);
                    channel.basicAck(envelope.getDeliveryTag(), true);
                }
            });
        } catch (Exception e) {
            log.error(e.getMessage());
        }
@@ -122,7 +123,6 @@
    @OnMessage
    public void onMessage(String message) {
        System.out.println(message);
        for (WebSocketServer webSocketServer : webSocketSet) {
            webSocketServer.sendMessage(message);
        }