package com.moral.webSocketServer; import java.io.IOException; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeoutException; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import com.moral.service.MonitorPointService; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端, * 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端 */ @Slf4j @ServerEndpoint("/screen/webSocket/{param}") @Component public class WebSocketServer { public static MonitorPointService monitorPointService; // 与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; private String orgId; private String regionCode; private Connection connection; private Channel channel; // 存放session的集合,很重要!! private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet(); /** * 连接建立成功调用的方法 * * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据 */ @OnOpen public void onOpen(Session session, @PathParam("param") String param) { this.session = session; //这个一定要写,第一次很容易忽略! webSocketSet.add(this); int flag = param.indexOf("&"); orgId = param.substring(0, flag); regionCode = param.substring(flag + 1); String QUEUE_NAME = "deviceInfo"; try { //打开连接和创建频道,与发送端一样 ConnectionFactory factory = new ConnectionFactory(); //设置MabbitMQ所在主机ip或者主机名 factory.setHost("47.96.15.25"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest_pass"); String routingKey = orgId+".*"; connection = factory.newConnection(); channel = connection.createChannel(); //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。 channel.queueDeclare(QUEUE_NAME, false, false, true, null); channel.queueBind(QUEUE_NAME,"screens_data",routingKey); //创建队列消费者 QueueingConsumer consumer = new QueueingConsumer(channel); //指定消费队列 channel.basicConsume(QUEUE_NAME, true, consumer); while (true) { //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法) QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); sendMessage(message); } }catch (Exception e){ log.error(e.getMessage()); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { /**从安全Set中 移除当前连接对象*/ webSocketSet.remove(this); try { channel.close(); connection.close(); }catch (IOException | TimeoutException e){ log.error(e.getMessage()); } } @OnMessage public void onMessage(String message) { System.out.println(message); for (WebSocketServer webSocketServer : webSocketSet) { webSocketServer.sendMessage(message); } } /* @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "deviceInfo", durable = "false", autoDelete = "true"), exchange = @Exchange(value = "screens_data", durable = "true", type = "topic"), key = "99.*" )) @RabbitHandler //注解意思:如果有消息过来 需要消费的时候才会调用该方法 public void receiveMessage(@Payload String message, @Headers Map headers, Channel channel) throws IOException { //sendMessage(message.toString()); onMessage(message); *//* Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG); //手动接受并告诉rabbitmq消息已经接受了 deliverTag记录接受消息 false不批量接受 channel.basicAck(deliveryTag, true);*//* }*/ /** * 服务器端推送消息 */ public void sendMessage(String message) { try { if (session.isOpen()) { this.session.getBasicRemote().sendText(message); } } catch (IOException e) { log.error(e.getMessage()); } } /** * 发生错误时调用 * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error(error.getMessage()); } }