From bafbd7130529e2a2c1e4cb6461eb5649c53df027 Mon Sep 17 00:00:00 2001
From: kaiyu <404897439@qq.com>
Date: Tue, 24 Nov 2020 15:58:25 +0800
Subject: [PATCH] 添加单设备AQI参数websocket

---
 src/main/java/com/moral/webSocketServer/BSAQIWebSocketServer.java |  171 ++++++++++++++++++++++++++++++++++++++++++
 src/main/java/com/moral/webSocketServer/BSWebsocketServer.java    |   10 --
 src/main/java/com/moral/config/WebSocketConfig.java               |    4 +
 3 files changed, 177 insertions(+), 8 deletions(-)

diff --git a/src/main/java/com/moral/config/WebSocketConfig.java b/src/main/java/com/moral/config/WebSocketConfig.java
index 5f40f0d..df8b756 100644
--- a/src/main/java/com/moral/config/WebSocketConfig.java
+++ b/src/main/java/com/moral/config/WebSocketConfig.java
@@ -2,6 +2,7 @@
 
 import com.moral.service.DeviceService;
 import com.moral.service.SensorService;
+import com.moral.webSocketServer.BSAQIWebSocketServer;
 import com.moral.webSocketServer.BSWebsocketServer;
 import com.moral.webSocketServer.WebSocketServer;
 
@@ -11,6 +12,7 @@
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.web.socket.server.standard.ServerEndpointExporter;
 
 @Configuration
 public class WebSocketConfig {
@@ -31,10 +33,12 @@
         WebSocketServer.deviceService=deviceService;
         WebSocketServerNew.deviceService=deviceService;
         BSWebsocketServer.deviceService=deviceService;
+        BSAQIWebSocketServer.deviceService=deviceService;
     }
     @Autowired
     public void setSensorService( SensorService sensorService){
         BSWebsocketServer.sensorService=sensorService;
+        BSAQIWebSocketServer.sensorService=sensorService;
     }
 
 }
diff --git a/src/main/java/com/moral/webSocketServer/BSAQIWebSocketServer.java b/src/main/java/com/moral/webSocketServer/BSAQIWebSocketServer.java
new file mode 100644
index 0000000..3ae2aa1
--- /dev/null
+++ b/src/main/java/com/moral/webSocketServer/BSAQIWebSocketServer.java
@@ -0,0 +1,171 @@
+package com.moral.webSocketServer;
+
+import com.alibaba.fastjson.JSON;
+import com.moral.entity.Device;
+import com.moral.entity.Sensor;
+import com.moral.service.DeviceService;
+import com.moral.service.SensorService;
+import com.moral.util.RabbitMQUtils;
+import com.rabbitmq.client.*;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import javax.annotation.PostConstruct;
+import javax.websocket.*;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+@Slf4j
+@ServerEndpoint("/web/AQIWebSocket/{param}")
+@Component
+
+public class BSAQIWebSocketServer {
+
+    public static SensorService sensorService;
+
+    public static DeviceService deviceService;
+
+    /**
+     * concurrent������������������Set���������������������������������������WebSocket���������
+     */
+    private static ConcurrentHashMap<String, BSAQIWebSocketServer> webSocketMap = new ConcurrentHashMap<>();
+    /**
+     * ������������������������������������������������������������������������������
+     */
+    private Session session;
+
+    private String orgId;
+
+    private String accountId;
+
+    private String mac;
+
+    private final String exchange = "screens_data";
+
+    private static Map<String, Sensor> sensors;
+
+    @PostConstruct
+    public void init() {
+        sensors = new HashMap<>();
+        List<Sensor> allSensors = sensorService.getAllSensors();
+        for (Sensor sensor : allSensors) {
+            sensors.put(sensor.getSensorKey(), sensor);
+        }
+    }
+
+    @OnOpen
+    public void onOpen(Session session, @PathParam("param") String param) {
+        this.session = session;
+        String[] params = param.split("&");
+        this.accountId = params[0];
+        this.orgId = params[1];
+        this.mac = params[2];
+
+        if (webSocketMap.containsKey(accountId)) {
+            webSocketMap.remove(accountId);
+            webSocketMap.put(accountId, this);
+        } else {
+            webSocketMap.put(accountId, this);
+        }
+
+
+        try {
+            Connection connection = RabbitMQUtils.getConnection();
+            final Channel channel = connection.createChannel();
+            //������������������
+            String queue = channel.queueDeclare().getQueue();
+            //������������������������routingKey������������
+            String routingKey = "";
+            routingKey = this.orgId + "." + this.mac;
+            channel.queueBind(queue, exchange, routingKey);
+
+
+            //������������,���������������������
+            channel.basicQos(30);//���������������
+            channel.basicConsume(queue, false, new DefaultConsumer(channel) {
+                @Override
+                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+                    //������MQ������������������������������������������������������������
+                    Map message = (Map) JSON.parse((String) JSON.parse(new String(body)));
+                    Device device = deviceService.getDeviceByMac(mac,false);
+                    sendDeviceInfo(message, device);
+                    //������������
+                    channel.basicAck(envelope.getDeliveryTag(), true);
+                    //������socket������������������
+                    if (!webSocketMap.containsKey(accountId)) {
+                        RabbitMQUtils.closeConnectionChannel(connection, channel);
+                    }
+                }
+            });
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @OnClose
+    public void onClose() {
+        if (webSocketMap.containsKey(accountId)) {
+            webSocketMap.remove(accountId);
+        }
+    }
+
+    //������������������������������
+    @OnMessage
+    public void onMessage(String message, Session session) {
+
+    }
+
+    @OnError
+    public void onError(Session session, Throwable error) {
+        log.error(error.getMessage());
+    }
+
+    public void sendMessage(String message) throws IOException {
+        try {
+            if (session.isOpen()) {
+                this.session.getBasicRemote().sendText(message);
+            }
+        } catch (IOException e) {
+
+            //log.error(e.getMessage());
+        }
+    }
+
+    /**
+     * @Description: ������������mac������accountid���������������������mac������������������
+     * @Param: [param]
+     * @return: void
+     * @Author: ���������
+     * @Date: 2020/9/30
+     */
+    private void sendDeviceInfo(Map<String, Object> param, Device device) {
+
+        try {
+            Map<String, Object> sortMap = new LinkedHashMap<>();
+            //���������������������������������
+            sortMap.put("������", device.getName());
+            sortMap.put("������", device.getAddress());
+            //���������������������������������
+            param.forEach((key, value) -> {
+                Sensor sensor = sensors.get(key);
+                if (!ObjectUtils.isEmpty(sensor)) {
+                    String unit = ObjectUtils.isEmpty(sensor.getUnit()) ? "" : (String) sensor.getUnit();
+                    sortMap.put(sensor.getName(), value + unit);
+                }
+            });
+
+            sendMessage(JSON.toJSONString(sortMap));
+        } catch (IOException e) {
+            log.error("������mac������������������������");
+        }
+    }
+
+
+}
diff --git a/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java b/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java
index a46be92..5e135c9 100644
--- a/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java
+++ b/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java
@@ -26,13 +26,7 @@
 @Slf4j
 @ServerEndpoint("/web/WebSocket/{param}")
 @Component
-/**
-* @Description: ���������������websocket
-        * @Param:
-        * @return:
-        * @Author: ������������
-        * @Date: 2020/9/30
-        */
+
 public class BSWebsocketServer {
 
     public static DeviceService deviceService;
@@ -40,7 +34,7 @@
     public static SensorService sensorService;
 
     /**
-     * concurrent������������������Set���������������������������������������MyWebSocket���������
+     * concurrent������������������Set���������������������������������������WebSocket���������
      */
     private static ConcurrentHashMap<String, BSWebsocketServer> webSocketMap = new ConcurrentHashMap<>();
     /**

--
Gitblit v1.8.0