package com.moral.webSocketServer; import com.alibaba.fastjson.JSON; import com.moral.common.util.ParameterUtils; import com.moral.entity.Device; import com.moral.entity.Sensor; import com.moral.service.DeviceService; import com.moral.service.SensorService; import com.moral.util.RabbitMQUtils; import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import javax.annotation.PostConstruct; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Slf4j @ServerEndpoint("/web/WebSocket/{param}") @Component public class BSWebsocketServer { public static DeviceService deviceService; public static SensorService sensorService; /** * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。 */ private static ConcurrentHashMap webSocketMap = new ConcurrentHashMap<>(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; private String orgId; private String accountId; private String regionCode; private String mac; private final String exchange = "screens_data"; private static Map sensors; @PostConstruct public void init() { sensors = new HashMap<>(); List allSensors = sensorService.getAllSensors(); for (Sensor sensor : allSensors) { sensors.put(sensor.getSensorKey(), sensor); } } /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("param") String param) { this.session = session; String[] params = param.split("&"); this.accountId = params[0]; this.orgId = params[1]; this.regionCode = params[2]; if (webSocketMap.containsKey(accountId)) { webSocketMap.remove(accountId); webSocketMap.put(accountId, this); } else { webSocketMap.put(accountId, this); } Map paramMap = new HashMap(); paramMap.put("orgId", orgId); paramMap.put("regionCode", regionCode); ParameterUtils.getRegionType4RegionCode(paramMap); List deviceList = deviceService.queryDevice(paramMap); Map deviceMap = new HashMap<>(); try { Connection connection = RabbitMQUtils.getConnection(); final Channel channel = connection.createChannel(); //生成临时队列 String queue = channel.queueDeclare().getQueue(); //交换机与队列通过routingKey进行绑定 String routingKey = ""; for (Device d : deviceList) { deviceMap.put(d.getMac(),d); routingKey = orgId + "." + d.getMac(); channel.queueBind(queue, exchange, routingKey); } //消费消息,手动确认模式。 channel.basicQos(30);//预先读取数 channel.basicConsume(queue, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try{ //对从MQ中取出的数据做转换,并且发送风速到客户端 Map message = (Map) JSON.parse((String) JSON.parse(new String(body))); sendWindInfo(message); //判断是否接收到客户端发送的mac,如果接收到则返回指定mac设备信息 if(mac!=null&&(!mac.equals(0))) sendDeviceInfo(message,deviceMap); //手动确认 channel.basicAck(envelope.getDeliveryTag(), true); //判断socket是否已经断开 if (!webSocketMap.containsKey(accountId)) { RabbitMQUtils.closeConnectionChannel(connection, channel); } }catch (Exception e){ log.error(e.getMessage()); RabbitMQUtils.closeConnectionChannel(connection, channel); } } }); } catch (IOException e) { e.printStackTrace(); } } @OnClose public void onClose() { if (webSocketMap.containsKey(accountId)) { webSocketMap.remove(accountId); } } //接收到客户端消息操作 @OnMessage public void onMessage(String message, Session session) { if (!ObjectUtils.isEmpty(message)) { Map map = JSON.parseObject(message); this.mac = (String) map.get("mac"); } } @OnError public void onError(Session session, Throwable error) { log.error(error.getMessage()); } public void sendMessage(String message) throws IOException { try { if (session.isOpen()) { this.session.getBasicRemote().sendText(message); } } catch (IOException e) { //log.error(e.getMessage()); } } /** * @Description: 从客户端连接到socket就一直开始发送风向数据 * @Param: [param] * @return: void * @Author: 陈凯裕 * @Date: 2020/9/30 */ private void sendWindInfo(Map param) { try { Map map = new HashMap<>(); if (param.get("e23") != null && param.get("mac") != null) { map.put("风向", param.get("e23")); map.put("mac", param.get("mac")); sendMessage(JSON.toJSONString(map)); } } catch (IOException e) { log.error("发送风向标数据异常"); } } /** * @Description: 接收前端mac以及accountid,传送前端指定mac号的设备信息 * @Param: [param] * @return: void * @Author: 陈凯裕 * @Date: 2020/9/30 */ private void sendDeviceInfo(Map param,Map deviceMap) { String deviceMac = (String) param.get("mac"); if (mac.equals(deviceMac)) { try { Map sortMap = new LinkedHashMap<>(); //先放名称地址,用于排序 Device device =deviceMap.get(mac); sortMap.put("名称",device.getName()); sortMap.put("地址",device.getAddress()); //将传感器代号转化为汉字 param.forEach((key, value) -> { Sensor sensor = sensors.get(key); if (!ObjectUtils.isEmpty(sensor)) { String unit = ObjectUtils.isEmpty(sensor.getUnit()) ? "" : (String) sensor.getUnit(); sortMap.put(sensor.getName(), value + unit); } }); sendMessage(JSON.toJSONString(sortMap)); } catch (IOException e) { log.error("根据mac发送设备数据异常"); } } } }