From bafbd7130529e2a2c1e4cb6461eb5649c53df027 Mon Sep 17 00:00:00 2001 From: kaiyu <404897439@qq.com> Date: Tue, 24 Nov 2020 15:58:25 +0800 Subject: [PATCH] 添加单设备AQI参数websocket --- src/main/java/com/moral/webSocketServer/BSWebsocketServer.java | 108 ++++++++++++++++++++++++++++++++++++++++-------------- 1 files changed, 80 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java b/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java index 7329c34..5e135c9 100644 --- a/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java +++ b/src/main/java/com/moral/webSocketServer/BSWebsocketServer.java @@ -6,8 +6,8 @@ import com.moral.entity.Sensor; import com.moral.service.DeviceService; import com.moral.service.SensorService; -import com.rabbitmq.client.*; import com.moral.util.RabbitMQUtils; +import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; @@ -18,13 +18,15 @@ import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Slf4j -@ServerEndpoint("/web/webSocket/{param}") +@ServerEndpoint("/web/WebSocket/{param}") @Component + public class BSWebsocketServer { public static DeviceService deviceService; @@ -32,7 +34,7 @@ public static SensorService sensorService; /** - * concurrent������������������Set���������������������������������������MyWebSocket��������� + * concurrent������������������Set���������������������������������������WebSocket��������� */ private static ConcurrentHashMap<String, BSWebsocketServer> webSocketMap = new ConcurrentHashMap<>(); /** @@ -46,9 +48,12 @@ private String regionCode; + private String mac; + private final String exchange = "screens_data"; private static Map<String, Sensor> sensors; + @PostConstruct @@ -83,32 +88,42 @@ paramMap.put("regionCode", regionCode); ParameterUtils.getRegionType4RegionCode(paramMap); List<Device> deviceList = deviceService.queryDevice(paramMap); + Map<String,Device> deviceMap = new HashMap<>(); try { Connection connection = RabbitMQUtils.getConnection(); final Channel channel = connection.createChannel(); //������������������ String queue = channel.queueDeclare().getQueue(); - //������������������������routingKey������������ String routingKey = ""; for (Device d : deviceList) { + deviceMap.put(d.getMac(),d); routingKey = orgId + "." + d.getMac(); channel.queueBind(queue, exchange, routingKey); } + + //������������,��������������������� - channel.basicQos(1);//��������������������������� + channel.basicQos(30);//��������������� channel.basicConsume(queue, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + //������MQ������������������������������������������������������������ Map message = (Map) JSON.parse((String) JSON.parse(new String(body))); - message = messageFormat(message,deviceList); - sendMessage(JSON.toJSONString(message)); - channel.basicAck(envelope.getDeliveryTag(), false); + sendWindInfo(message); + + //���������������������������������������mac���������������������������������mac������������ + if(mac!=null&&(!mac.equals(0))) + sendDeviceInfo(message,deviceMap); + + //������������ + channel.basicAck(envelope.getDeliveryTag(), true); + //������socket������������������ if (!webSocketMap.containsKey(accountId)) { - RabbitMQUtils.closeConnectionChannel(connection,channel); + RabbitMQUtils.closeConnectionChannel(connection, channel); } } }); @@ -128,7 +143,10 @@ //������������������������������ @OnMessage public void onMessage(String message, Session session) { - + if (!ObjectUtils.isEmpty(message)) { + Map<String, Object> map = JSON.parseObject(message); + this.mac = (String) map.get("mac"); + } } @OnError @@ -142,31 +160,65 @@ this.session.getBasicRemote().sendText(message); } } catch (IOException e) { + //log.error(e.getMessage()); } } - //���MQ��������������������������������������������������������������� - private Map<String,Object> messageFormat( Map<String,Object> param,List<Device> deviceList) { - Map<String, Object> map = new HashMap<>(); - param.forEach((key, value) -> { - Sensor sensor = sensors.get(key); - if (!ObjectUtils.isEmpty(sensor)) { - String unit = ObjectUtils.isEmpty(sensor.getUnit())?"":(String)sensor.getUnit(); - map.put(sensor.getName(),value+unit); + /** + * @Description: ���������������������socket��������������������������������� + * @Param: [param] + * @return: void + * @Author: ��������� + * @Date: 2020/9/30 + */ + private void sendWindInfo(Map<String, Object> param) { + try { + Map<String, Object> map = new HashMap<>(); + if (param.get("e23") != null && param.get("mac") != null) { + map.put("������", param.get("e23")); + map.put("mac", param.get("mac")); + sendMessage(JSON.toJSONString(map)); } - }); - String mac = (String) param.get("mac"); - for (Device device : deviceList) { - if(mac.equals(device.getMac())){ - map.put("������",device.getLatitude()); - map.put("������",device.getLongitude()); - map.put("������",device.getState()); - break; + } catch (IOException e) { + log.error("���������������������������"); + } + } + + /** + * @Description: ������������mac������accountid���������������������mac������������������ + * @Param: [param] + * @return: void + * @Author: ��������� + * @Date: 2020/9/30 + */ + private void sendDeviceInfo(Map<String, Object> param,Map<String,Device> deviceMap) { + String deviceMac = (String) param.get("mac"); + if (mac.equals(deviceMac)) { + try { + Map<String,Object> sortMap = new LinkedHashMap<>(); + + //��������������������������������� + Device device =deviceMap.get(mac); + sortMap.put("������",device.getName()); + sortMap.put("������",device.getAddress()); + + + //��������������������������������� + param.forEach((key, value) -> { + Sensor sensor = sensors.get(key); + if (!ObjectUtils.isEmpty(sensor)) { + String unit = ObjectUtils.isEmpty(sensor.getUnit()) ? "" : (String) sensor.getUnit(); + sortMap.put(sensor.getName(), value + unit); + } + }); + + sendMessage(JSON.toJSONString(sortMap)); + } catch (IOException e) { + log.error("������mac������������������������"); } } - map.put("mac",mac); - return map; } + } -- Gitblit v1.8.0