| New file | 
|  |  |  | 
|---|
|  |  |  | package com.moral.webSocketServer; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.alibaba.fastjson.JSON; | 
|---|
|  |  |  | import com.moral.entity.Device; | 
|---|
|  |  |  | import com.moral.entity.Sensor; | 
|---|
|  |  |  | import com.moral.service.DeviceService; | 
|---|
|  |  |  | import com.moral.service.SensorService; | 
|---|
|  |  |  | import com.moral.util.RabbitMQUtils; | 
|---|
|  |  |  | import com.rabbitmq.client.*; | 
|---|
|  |  |  | import lombok.extern.slf4j.Slf4j; | 
|---|
|  |  |  | import org.springframework.stereotype.Component; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import javax.annotation.PostConstruct; | 
|---|
|  |  |  | import javax.websocket.*; | 
|---|
|  |  |  | import javax.websocket.server.PathParam; | 
|---|
|  |  |  | import javax.websocket.server.ServerEndpoint; | 
|---|
|  |  |  | import java.io.IOException; | 
|---|
|  |  |  | import java.util.HashMap; | 
|---|
|  |  |  | import java.util.LinkedHashMap; | 
|---|
|  |  |  | import java.util.List; | 
|---|
|  |  |  | import java.util.Map; | 
|---|
|  |  |  | import java.util.concurrent.ConcurrentHashMap; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Slf4j | 
|---|
|  |  |  | @ServerEndpoint("/web/AQIWebSocketTest/{param}") | 
|---|
|  |  |  | @Component | 
|---|
|  |  |  | public class BSAQIWebSocketServerTest { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | public static SensorService sensorService; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | public static DeviceService deviceService; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | private static ConcurrentHashMap<String, BSAQIWebSocketServerTest> webSocketMap = new ConcurrentHashMap<>(); | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * 与某个客户端的连接会话,需要通过它来给客户端发送数据 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | private Session session; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private String orgId; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private String accountId; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private String mac; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private final String exchange = "screens_data"; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private static Map<String, Sensor> sensors; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @PostConstruct | 
|---|
|  |  |  | public void init() { | 
|---|
|  |  |  | sensors = new HashMap<>(); | 
|---|
|  |  |  | List<Sensor> allSensors = sensorService.getAllSensors(); | 
|---|
|  |  |  | for (Sensor sensor : allSensors) { | 
|---|
|  |  |  | sensors.put(sensor.getSensorKey(), sensor); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @OnOpen | 
|---|
|  |  |  | public void onOpen(Session session, @PathParam("param") String param) { | 
|---|
|  |  |  | this.session = session; | 
|---|
|  |  |  | String[] params = param.split("&"); | 
|---|
|  |  |  | this.accountId = params[0]; | 
|---|
|  |  |  | this.orgId = params[1]; | 
|---|
|  |  |  | this.mac = params[2]; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | if (webSocketMap.containsKey(accountId)) { | 
|---|
|  |  |  | webSocketMap.remove(accountId); | 
|---|
|  |  |  | webSocketMap.put(accountId, this); | 
|---|
|  |  |  | } else { | 
|---|
|  |  |  | webSocketMap.put(accountId, this); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | Connection connection = RabbitMQUtils.getConnection(); | 
|---|
|  |  |  | final Channel channel = connection.createChannel(); | 
|---|
|  |  |  | //生成临时队列 | 
|---|
|  |  |  | String queue = channel.queueDeclare().getQueue(); | 
|---|
|  |  |  | //交换机与队列通过routingKey进行绑定 | 
|---|
|  |  |  | String routingKey = ""; | 
|---|
|  |  |  | routingKey = this.orgId + "." + this.mac; | 
|---|
|  |  |  | channel.queueBind(queue, exchange, routingKey); | 
|---|
|  |  |  |  | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //消费消息,手动确认模式。 | 
|---|
|  |  |  | channel.basicQos(30);//预先读取数 | 
|---|
|  |  |  | channel.basicConsume(queue, false, new DefaultConsumer(channel) { | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { | 
|---|
|  |  |  | try{ | 
|---|
|  |  |  | //对从MQ中取出的数据做转换,并且发送风速到客户端 | 
|---|
|  |  |  | Map message = (Map) JSON.parse((String) JSON.parse(new String(body))); | 
|---|
|  |  |  | Device device = deviceService.getDeviceByMac(mac,false); | 
|---|
|  |  |  | sendDeviceInfo(message, device); | 
|---|
|  |  |  | //手动确认 | 
|---|
|  |  |  | channel.basicAck(envelope.getDeliveryTag(), true); | 
|---|
|  |  |  | //判断socket是否已经断开 | 
|---|
|  |  |  | if (!webSocketMap.containsKey(accountId)) { | 
|---|
|  |  |  | RabbitMQUtils.closeConnectionChannel(connection, channel); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | }catch (Exception e){ | 
|---|
|  |  |  | log.error(e.getMessage()); | 
|---|
|  |  |  | RabbitMQUtils.closeConnectionChannel(connection, channel); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | }); | 
|---|
|  |  |  | } catch (IOException e) { | 
|---|
|  |  |  | e.printStackTrace(); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @OnClose | 
|---|
|  |  |  | public void onClose() { | 
|---|
|  |  |  | if (webSocketMap.containsKey(accountId)) { | 
|---|
|  |  |  | webSocketMap.remove(accountId); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //接收到客户端消息操作 | 
|---|
|  |  |  | @OnMessage | 
|---|
|  |  |  | public void onMessage(String message, Session session) { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @OnError | 
|---|
|  |  |  | public void onError(Session session, Throwable error) { | 
|---|
|  |  |  | log.error(error.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | public void sendMessage(String message) throws IOException { | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | if (session.isOpen()) { | 
|---|
|  |  |  | this.session.getBasicRemote().sendText(message); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } catch (IOException e) { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //log.error(e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | /** | 
|---|
|  |  |  | * @Description: 接收前端mac以及accountid,传送前端指定mac号的设备信息 | 
|---|
|  |  |  | * @Param: [param] | 
|---|
|  |  |  | * @return: void | 
|---|
|  |  |  | * @Author: 陈凯裕 | 
|---|
|  |  |  | * @Date: 2020/9/30 | 
|---|
|  |  |  | */ | 
|---|
|  |  |  | private void sendDeviceInfo(Map<String, Object> param, Device device) { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | Map<String, Object> sortMap = new LinkedHashMap<>(); | 
|---|
|  |  |  | //先放名称地址,用于排序 | 
|---|
|  |  |  | sortMap.put("name", device.getName()); | 
|---|
|  |  |  | sortMap.put("address", device.getAddress()); | 
|---|
|  |  |  | //将传感器代号转化为汉字 | 
|---|
|  |  |  | sortMap.putAll(param); | 
|---|
|  |  |  | sendMessage(JSON.toJSONString(sortMap)); | 
|---|
|  |  |  | } catch (IOException e) { | 
|---|
|  |  |  | log.error("根据mac发送设备数据异常"); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | } | 
|---|