From a66d53c3cbfb0024804045f4d795be06089d4f9d Mon Sep 17 00:00:00 2001
From: lizijie <lzjiiie@163.com>
Date: Thu, 03 Dec 2020 17:54:15 +0800
Subject: [PATCH] 五分钟表相关代码,平均风向、平局风速工具类

---
 src/main/java/com/moral/webSocketServer/BSWebsocketServer.java |  105 ++++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 85 insertions(+), 20 deletions(-)

diff --git a/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java b/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java
index acd222b..5e135c9 100644
--- a/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java
+++ b/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java
@@ -6,8 +6,8 @@
 import com.moral.entity.Sensor;
 import com.moral.service.DeviceService;
 import com.moral.service.SensorService;
-import com.rabbitmq.client.*;
 import com.moral.util.RabbitMQUtils;
+import com.rabbitmq.client.*;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 import org.springframework.util.ObjectUtils;
@@ -18,13 +18,15 @@
 import javax.websocket.server.ServerEndpoint;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 @Slf4j
-@ServerEndpoint("/web/webSocket/{param}")
+@ServerEndpoint("/web/WebSocket/{param}")
 @Component
+
 public class BSWebsocketServer {
 
     public static DeviceService deviceService;
@@ -32,7 +34,7 @@
     public static SensorService sensorService;
 
     /**
-     * concurrent������������������Set���������������������������������������MyWebSocket���������
+     * concurrent������������������Set���������������������������������������WebSocket���������
      */
     private static ConcurrentHashMap<String, BSWebsocketServer> webSocketMap = new ConcurrentHashMap<>();
     /**
@@ -46,10 +48,12 @@
 
     private String regionCode;
 
+    private String mac;
 
     private final String exchange = "screens_data";
 
     private static Map<String, Sensor> sensors;
+
 
 
     @PostConstruct
@@ -68,8 +72,8 @@
     public void onOpen(Session session, @PathParam("param") String param) {
         this.session = session;
         String[] params = param.split("&");
-        this.orgId = params[1];
         this.accountId = params[0];
+        this.orgId = params[1];
         this.regionCode = params[2];
 
         if (webSocketMap.containsKey(accountId)) {
@@ -84,28 +88,43 @@
         paramMap.put("regionCode", regionCode);
         ParameterUtils.getRegionType4RegionCode(paramMap);
         List<Device> deviceList = deviceService.queryDevice(paramMap);
+        Map<String,Device> deviceMap = new HashMap<>();
 
         try {
             Connection connection = RabbitMQUtils.getConnection();
             final Channel channel = connection.createChannel();
             //������������������
             String queue = channel.queueDeclare().getQueue();
-
             //������������������������routingKey������������
             String routingKey = "";
             for (Device d : deviceList) {
+                deviceMap.put(d.getMac(),d);
                 routingKey = orgId + "." + d.getMac();
                 channel.queueBind(queue, exchange, routingKey);
             }
 
+
+
             //������������,���������������������
+            channel.basicQos(30);//���������������
             channel.basicConsume(queue, false, new DefaultConsumer(channel) {
                 @Override
                 public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
+                    //������MQ������������������������������������������������������������
                     Map message = (Map) JSON.parse((String) JSON.parse(new String(body)));
-                    message = messageFormat(message);
-                    sendMessage(JSON.toJSONString(message));
-                    channel.basicAck(envelope.getDeliveryTag(), false);
+                    sendWindInfo(message);
+
+                    //���������������������������������������mac���������������������������������mac������������
+                    if(mac!=null&&(!mac.equals(0)))
+                        sendDeviceInfo(message,deviceMap);
+
+                    //������������
+                    channel.basicAck(envelope.getDeliveryTag(), true);
+
+                    //������socket������������������
+                    if (!webSocketMap.containsKey(accountId)) {
+                        RabbitMQUtils.closeConnectionChannel(connection, channel);
+                    }
                 }
             });
         } catch (IOException e) {
@@ -124,7 +143,10 @@
     //������������������������������
     @OnMessage
     public void onMessage(String message, Session session) {
-
+        if (!ObjectUtils.isEmpty(message)) {
+            Map<String, Object> map = JSON.parseObject(message);
+            this.mac = (String) map.get("mac");
+        }
     }
 
     @OnError
@@ -138,22 +160,65 @@
                 this.session.getBasicRemote().sendText(message);
             }
         } catch (IOException e) {
+
             //log.error(e.getMessage());
         }
     }
 
-    //���MQ������������������������������������
-    private Map<String,Object> messageFormat( Map<String,Object> param) {
-        Map<String, Object> map = new HashMap<>();
-        param.forEach((key, value) -> { ;
-            Sensor sensor = sensors.get(key);
-            if (!ObjectUtils.isEmpty(sensor)) {
-                String unit = ObjectUtils.isEmpty(sensor.getUnit())?"":(String)sensor.getUnit();
-                map.put(sensor.getName(),value+unit);
+    /**
+    * @Description: ���������������������socket���������������������������������
+            * @Param: [param]
+            * @return: void
+            * @Author: ���������
+            * @Date: 2020/9/30
+            */
+    private void sendWindInfo(Map<String, Object> param) {
+        try {
+            Map<String, Object> map = new HashMap<>();
+            if (param.get("e23") != null && param.get("mac") != null) {
+                map.put("������", param.get("e23"));
+                map.put("mac", param.get("mac"));
+                sendMessage(JSON.toJSONString(map));
             }
-        });
-        map.put("mac",param.get("mac"));
-        return map;
+        } catch (IOException e) {
+            log.error("���������������������������");
+        }
     }
 
+    /**
+    * @Description: ������������mac������accountid���������������������mac������������������
+            * @Param: [param]
+            * @return: void
+            * @Author: ���������
+            * @Date: 2020/9/30
+            */
+    private void sendDeviceInfo(Map<String, Object> param,Map<String,Device> deviceMap) {
+        String deviceMac = (String) param.get("mac");
+        if (mac.equals(deviceMac)) {
+            try {
+                Map<String,Object> sortMap = new LinkedHashMap<>();
+
+                //���������������������������������
+                    Device device =deviceMap.get(mac);
+                    sortMap.put("������",device.getName());
+                    sortMap.put("������",device.getAddress());
+
+
+                //���������������������������������
+                param.forEach((key, value) -> {
+                    Sensor sensor = sensors.get(key);
+                    if (!ObjectUtils.isEmpty(sensor)) {
+                        String unit = ObjectUtils.isEmpty(sensor.getUnit()) ? "" : (String) sensor.getUnit();
+                        sortMap.put(sensor.getName(), value + unit);
+                    }
+                });
+
+                sendMessage(JSON.toJSONString(sortMap));
+            } catch (IOException e) {
+                log.error("������mac������������������������");
+            }
+        }
+    }
+
+
 }

--
Gitblit v1.8.0