| | |
| | | package com.moral.webSocketServer; |
| | | |
| | | import java.io.IOException; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.CopyOnWriteArraySet; |
| | | |
| | | import javax.websocket.OnClose; |
| | |
| | | import javax.websocket.server.PathParam; |
| | | import javax.websocket.server.ServerEndpoint; |
| | | |
| | | import com.moral.service.MonitorPointService; |
| | | import com.moral.common.util.ParameterUtils; |
| | | import com.moral.entity.Device; |
| | | import com.moral.service.DeviceService; |
| | | import com.moral.util.RabbitMQUtils; |
| | | import com.rabbitmq.client.AMQP; |
| | | import com.rabbitmq.client.Channel; |
| | | import com.rabbitmq.client.Connection; |
| | | import com.rabbitmq.client.ConnectionFactory; |
| | | import com.rabbitmq.client.Consumer; |
| | | import com.rabbitmq.client.DefaultConsumer; |
| | | import com.rabbitmq.client.Envelope; |
| | | import com.rabbitmq.client.QueueingConsumer; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | |
| | | import org.springframework.stereotype.Component; |
| | | |
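| | | /** WebSocket endpoint component; each instance holds one client session plus its RabbitMQ connection and channel. */ |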
| | |
| | | @Component |
| | | public class WebSocketServer { |
| | | |
| | | public static MonitorPointService monitorPointService; |
| | | public static DeviceService deviceService; |
| | | |
| | | // Session for the connection with one client; data is sent to that client through it |
| | | private Session session; |
| | | |
| | | private String orgId; |
| | | |
| | | private String accountId; |
| | | |
| | | private String regionCode; |
| | | |
| | | private Connection connection; |
| | | |
| | | private Channel channel; |
| | | |
| | | // Set holding the endpoint instances (one per session) - very important!! |
| | | private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>(); |
| | |
| | | this.session = session; |
| | | // This registration must not be left out; it is easy to forget the first time! |
| | | webSocketSet.add(this); |
| | | |
| | | int flag = param.indexOf("&"); |
| | | int regionCodeIndex = param.indexOf("_"); |
| | | orgId = param.substring(0, flag); |
| | | regionCode = param.substring(flag + 1); |
| | | String QUEUE_NAME = "deviceInfo"; |
| | | accountId = param.substring(flag + 1, regionCodeIndex); |
| | | regionCode = param.substring(regionCodeIndex + 1); |
| | | String QUEUE_NAME = "deviceInfo_" + accountId; |
| | | |
| | | Map<String, Object> paramMap = new HashMap<String, Object>(); |
| | | paramMap.put("orgId", orgId); |
| | | paramMap.put("regionCode", regionCode); |
| | | ParameterUtils.getRegionType4RegionCode(paramMap); |
| | | List<Device> deviceList = deviceService.queryDevice(paramMap); |
| | | |
| | | |
| | | try { |
| | | // Open the connection and create the channel, same as on the sender side. NOTE(review): this method appears to interleave two revisions (QUEUE_NAME and routingKey are each declared twice, and two catch clauses follow) - confirm against the real file |
| | | ConnectionFactory factory = new ConnectionFactory(); |
| | | // Set the IP or hostname of the host running RabbitMQ |
| | | factory.setHost("47.96.15.25"); |
| | | factory.setPort(5672); |
| | | factory.setUsername("guest"); |
| | | factory.setPassword("guest_pass"); |
| | | String routingKey = orgId+".*"; |
| | | Connection connection = factory.newConnection(); |
| | | Channel channel = connection.createChannel(); |
| | | String routingKey; |
| | | this.connection = RabbitMQUtils.getConnection(); |
| | | channel = this.connection.createChannel(); |
| | | // Declare the queue, mainly so that it gets created if this receiver runs before the queue exists. |
| | | channel.queueDeclare(QUEUE_NAME, false, false, true, null); |
| | | channel.queueBind(QUEUE_NAME,"screens_data",routingKey); |
| | | |
| | | // Create the queue consumer |
| | | QueueingConsumer consumer = new QueueingConsumer(channel); |
| | | // Specify the queue to consume |
| | | channel.basicConsume(QUEUE_NAME, true, consumer); |
| | | |
| | | while (true) { |
| | | // nextDelivery is a blocking method (internally it is the take method of a blocking queue) |
| | | QueueingConsumer.Delivery delivery = consumer.nextDelivery(); |
| | | String message = new String(delivery.getBody()); |
| | | sendMessage(message); |
| | | for (Device d : deviceList) { |
| | | routingKey = orgId + "." + d.getMac(); |
| | | channel.queueBind(QUEUE_NAME, "screens_data", routingKey); |
| | | } |
| | | }catch (Exception e){ |
| | | channel.basicQos(30);// prefetch count |
| | | channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel){ |
| | | @Override |
| | | public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { |
| | | String msg = new String(body, "utf-8"); |
| | | sendMessage(msg); |
| | | channel.basicAck(envelope.getDeliveryTag(), true); |
| | | } |
| | | }); |
| | | } catch (Exception e) { |
| | | log.error(e.getMessage()); |
| | | } |
| | | } |
| | |
| | | public void onClose() { |
| | | /**从安全Set中 移除当前连接对象*/ |
| | | webSocketSet.remove(this); |
| | | try { |
| | | connection.close(); |
| | | } catch (IOException e) { |
| | | log.error(e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | @OnMessage |
| | | public void onMessage(String message) { |
| | | System.out.println(message); |
| | | for (WebSocketServer webSocketServer : webSocketSet) { |
| | | webSocketServer.sendMessage(message); |
| | | } |