From 786577d75f2221e95d5d53e60338cc20cc82df71 Mon Sep 17 00:00:00 2001 From: kaiyu <404897439@qq.com> Date: Wed, 30 Sep 2020 13:50:14 +0800 Subject: [PATCH] 添加新的测试websocket接口 --- src/main/java/com/moral/controller/WebController.java | 1 src/main/java/com/moral/webSocketServer/BSWebsocketServer.java | 1 src/main/java/com/moral/config/WebSocketConfig.java | 5 + src/main/java/com/moral/webSocketServer/BSTestWebsocketServer.java | 194 ++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 200 insertions(+), 1 deletions(-) diff --git a/src/main/java/com/moral/config/WebSocketConfig.java b/src/main/java/com/moral/config/WebSocketConfig.java index 0bfa07a..dd57a2a 100644 --- a/src/main/java/com/moral/config/WebSocketConfig.java +++ b/src/main/java/com/moral/config/WebSocketConfig.java @@ -2,6 +2,7 @@ import com.moral.service.DeviceService; import com.moral.service.SensorService; +import com.moral.webSocketServer.BSTestWebsocketServer; import com.moral.webSocketServer.BSWebsocketServer; import com.moral.webSocketServer.WebSocketServer; @@ -17,7 +18,7 @@ public class WebSocketConfig { //��������������������������������� -/* @Bean + /*@Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }*/ @@ -32,9 +33,11 @@ WebSocketServer.deviceService=deviceService; WebSocketServerNew.deviceService=deviceService; BSWebsocketServer.deviceService=deviceService; + BSTestWebsocketServer.deviceService=deviceService; } @Autowired public void setSensorService( SensorService sensorService){ + BSTestWebsocketServer.sensorService=sensorService; BSWebsocketServer.sensorService=sensorService; } diff --git a/src/main/java/com/moral/controller/WebController.java b/src/main/java/com/moral/controller/WebController.java index ccce14b..43a14aa 100644 --- a/src/main/java/com/moral/controller/WebController.java +++ b/src/main/java/com/moral/controller/WebController.java @@ -49,6 +49,7 @@ @Resource DeviceService deviceService; + @UserLoginToken @GetMapping("test") public String add() { diff --git a/src/main/java/com/moral/webSocketServer/BSTestWebsocketServer.java b/src/main/java/com/moral/webSocketServer/BSTestWebsocketServer.java new file mode 100644 index 0000000..570884f --- /dev/null +++ b/src/main/java/com/moral/webSocketServer/BSTestWebsocketServer.java @@ -0,0 +1,194 @@ +package com.moral.webSocketServer; + +import com.alibaba.fastjson.JSON; +import com.moral.common.util.ParameterUtils; +import com.moral.entity.Device; +import com.moral.entity.Sensor; +import com.moral.service.DeviceService; +import com.moral.service.SensorService; +import com.moral.util.RabbitMQUtils; +import com.rabbitmq.client.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import javax.annotation.PostConstruct; +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@ServerEndpoint("/web/testWebSocket/{param}") +@Component +public class BSTestWebsocketServer { + + public static DeviceService deviceService; + + public static SensorService sensorService; + + /** + * concurrent������������������Set���������������������������������������MyWebSocket��������� + */ + private static ConcurrentHashMap<String, BSTestWebsocketServer> webSocketMap = new ConcurrentHashMap<>(); + /** + * ������������������������������������������������������������������������������ + */ + private Session session; + + private String orgId; + + private String accountId; + + private String regionCode; + + private String mac; + + private final String exchange = "screens_data"; + + private static Map<String, Sensor> sensors; + + + @PostConstruct + public void init() { + sensors = new HashMap<>(); + List<Sensor> allSensors = sensorService.getAllSensors(); + for (Sensor sensor : allSensors) { + sensors.put(sensor.getSensorKey(), sensor); + } + } + + /** + * ��������������������������������� + */ + @OnOpen + public void onOpen(Session session, @PathParam("param") String param) { + this.session = session; + String[] params = param.split("&"); + this.accountId = params[0]; + this.orgId = params[1]; + this.regionCode = params[2]; + + if (webSocketMap.containsKey(accountId)) { + webSocketMap.remove(accountId); + webSocketMap.put(accountId, this); + } else { + webSocketMap.put(accountId, this); + } + + Map<String, Object> paramMap = new HashMap<String, Object>(); + paramMap.put("orgId", orgId); + paramMap.put("regionCode", regionCode); + ParameterUtils.getRegionType4RegionCode(paramMap); + List<Device> deviceList = deviceService.queryDevice(paramMap); + + try { + Connection connection = RabbitMQUtils.getConnection(); + final Channel channel = connection.createChannel(); + //������������������ + String queue = channel.queueDeclare().getQueue(); + + //������������������������routingKey������������ + String routingKey = ""; + for (Device d : deviceList) { + routingKey = orgId + "." + d.getMac(); + channel.queueBind(queue, exchange, routingKey); + } + + //������������,��������������������� + channel.basicQos(1);//��������������������������� + channel.basicConsume(queue, false, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + Map message = (Map) JSON.parse((String) JSON.parse(new String(body))); + sendWindInfo(message); + if(mac!=null&&(!mac.equals(0))) + sendDeviceInfo(message); + channel.basicAck(envelope.getDeliveryTag(), false); + //������socket������������������ + if (!webSocketMap.containsKey(accountId)) { + RabbitMQUtils.closeConnectionChannel(connection, channel); + } + } + }); + } catch (IOException e) { + e.printStackTrace(); + } + + } + + @OnClose + public void onClose() { + if (webSocketMap.containsKey(accountId)) { + webSocketMap.remove(accountId); + } + } + + //������������������������������ + @OnMessage + public void onMessage(String message, Session session) { + if (!ObjectUtils.isEmpty(message)) { + System.out.println(message); + Map<String, Object> map = JSON.parseObject(message); + this.mac = (String) map.get("mac"); + } + } + + @OnError + public void onError(Session session, Throwable error) { + log.error(error.getMessage()); + } + + public void sendMessage(String message) throws IOException { + try { + if (session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + + //log.error(e.getMessage()); + } + } + + //��������������������� + private void sendWindInfo(Map<String, Object> param) { + try { + Map<String, Object> map = new HashMap<>(); + if (param.get("e23") != null && param.get("mac") != null) { + map.put("������", param.get("e23")); + map.put("mac", param.get("mac")); + sendMessage(JSON.toJSONString(map)); + } + } catch (IOException e) { + log.error("���������������������������"); + } + } + + //������mac������device������ + private void sendDeviceInfo(Map<String, Object> param) { + String deviceMac = (String) param.get("mac"); + if (mac.equals(deviceMac)) { + try { + Map<String, Object> map = new HashMap<>(); + param.forEach((key, value) -> { + Sensor sensor = sensors.get(key); + if (!ObjectUtils.isEmpty(sensor)) { + String unit = ObjectUtils.isEmpty(sensor.getUnit()) ? "" : (String) sensor.getUnit(); + map.put(sensor.getName(), value + unit); + } + }); + String mac = (String) param.get("mac"); + map.put("mac", mac); + sendMessage(JSON.toJSONString(map)); + } catch (IOException e) { + log.error("������mac������������������������"); + } + } + } + + +} diff --git a/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java b/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java index 7329c34..f4901f2 100644 --- a/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java +++ b/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java @@ -142,6 +142,7 @@ this.session.getBasicRemote().sendText(message); } } catch (IOException e) { + //log.error(e.getMessage()); } } -- Gitblit v1.8.0