kaiyu
2020-10-26 80ed4c1a5a0b8d5a87cf0e5a0885e9267c3d28a3
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package com.moral.webSocketServer;
 
import com.moral.common.util.ParameterUtils;
import com.moral.entity.Device;
import com.moral.service.DeviceService;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
 
/**
 * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
 * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
 */
@Slf4j
@ServerEndpoint("/screen/webSocketNew/{param}")
@Component
public class WebSocketServerNew {
    public static DeviceService deviceService;
 
    // 与某个客户端的连接会话,需要通过它来给客户端发送数据
    private Session session;
 
    private Connection connection;
 
    private Channel channel;
 
    // 存放session的集合,很重要!!
    private static CopyOnWriteArraySet<WebSocketServerNew> webSocketSet = new CopyOnWriteArraySet<WebSocketServerNew>();
 
    /**
     * 连接建立成功调用的方法
     *
     * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("param") String param) {
        this.session = session;
        //这个一定要写,第一次很容易忽略!
        webSocketSet.add(this);
        //取参
        String[] flag=param.split("&");
        String orgId = flag[0];
        String accountId = flag[1];
        String monitPointId = flag[2];
        String p=flag[3];
 
        String QUEUE_NAME = "deviceData_" + accountId;
        Map<String, Object> paramMap = new HashMap<String, Object>();
        List<Device> deviceList = deviceService.getDeviceById2(Integer.parseInt(monitPointId));
        try {
            //打开连接和创建频道,与发送端一样
            ConnectionFactory factory = new ConnectionFactory();
            //设置MabbitMQ所在主机ip或者主机名
            factory.setHost("47.96.15.25");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest_pass");
            String routingKey;
            connection = factory.newConnection();
            channel = connection.createChannel();
            //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            for (Device d : deviceList) {
                routingKey = orgId + "." + d.getMac();
                channel.queueBind(QUEUE_NAME, "screens_data", routingKey);
            }
            //创建队列消费者
            QueueingConsumer consumer = new QueueingConsumer(channel);
            //指定消费队列
            channel.basicConsume(QUEUE_NAME, true, consumer);
            int i=Integer.parseInt(p);
            while (true) {
                if (i<=12){
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    sendMessage(message);
                    i++;
                }else {
                    //Thread.sleep(1* 1000);
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    String message = new String(delivery.getBody());
                    sendMessage(message);
                    i++;
                }
                //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
 
            }
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
 
 
    /**
     * 连接关闭调用的方法
     */
 
    @OnClose
    public void onClose() {
//**从安全Set中 移除当前连接对象*//**//**//**//**//**//**//**//*
        webSocketSet.remove(this);
        try {
            connection.close();
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }
 
 
    @OnMessage
    public void onMessage(String message) {
        System.out.println(message);
        for (WebSocketServerNew webSocketServer : webSocketSet) {
            webSocketServer.sendMessage(message);
        }
    }
 
   /* @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "deviceInfo", durable = "false", autoDelete = "true"),
            exchange = @Exchange(value = "screens_data", durable = "true", type = "topic"),
            key = "99.*"
    ))
    @RabbitHandler //注解意思:如果有消息过来 需要消费的时候才会调用该方法
    public void receiveMessage(@Payload String message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
        //sendMessage(message.toString());
        onMessage(message);
       *//* Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //手动接受并告诉rabbitmq消息已经接受了  deliverTag记录接受消息 false不批量接受
        channel.basicAck(deliveryTag, true);*//*
    }*/
 
    /**
     * 服务器端推送消息
     */
    public void sendMessage(String message) {
        try {
            if (session.isOpen()) {
                this.session.getBasicRemote().sendText(message);
            }
        } catch (IOException e) {
            log.error(e.getMessage());
        }
    }
 
    /**
     * 发生错误时调用
     *
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error(error.getMessage());
    }
 
}