From 11bb93e34b91a8ba4c5588dbe907988a1f279859 Mon Sep 17 00:00:00 2001 From: kaiyu <404897439@qq.com> Date: Tue, 15 Dec 2020 14:06:00 +0800 Subject: [PATCH] 增加新的websocket版本 --- src/main/java/com/moral/webSocketServer/BSAQIWebSocketServerTest.java | 166 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 166 insertions(+), 0 deletions(-) diff --git a/src/main/java/com/moral/webSocketServer/BSAQIWebSocketServerTest.java b/src/main/java/com/moral/webSocketServer/BSAQIWebSocketServerTest.java new file mode 100644 index 0000000..ac6ada1 --- /dev/null +++ b/src/main/java/com/moral/webSocketServer/BSAQIWebSocketServerTest.java @@ -0,0 +1,166 @@ +package com.moral.webSocketServer; + +import com.alibaba.fastjson.JSON; +import com.moral.entity.Device; +import com.moral.entity.Sensor; +import com.moral.service.DeviceService; +import com.moral.service.SensorService; +import com.moral.util.RabbitMQUtils; +import com.rabbitmq.client.*; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.websocket.*; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Slf4j +@ServerEndpoint("/web/AQIWebSocketTest/{param}") +@Component +public class BSAQIWebSocketServerTest { + + public static SensorService sensorService; + + public static DeviceService deviceService; + + /** + * concurrent������������������Set���������������������������������������WebSocket��������� + */ + private static ConcurrentHashMap<String, BSAQIWebSocketServerTest> webSocketMap = new ConcurrentHashMap<>(); + /** + * ������������������������������������������������������������������������������ + */ + private Session session; + + private String orgId; + + private String accountId; + + private String mac; + + private final String exchange = "screens_data"; + + private static Map<String, Sensor> sensors; + + @PostConstruct + public void init() { + sensors = new HashMap<>(); + List<Sensor> allSensors = sensorService.getAllSensors(); + for (Sensor sensor : allSensors) { + sensors.put(sensor.getSensorKey(), sensor); + } + } + + @OnOpen + public void onOpen(Session session, @PathParam("param") String param) { + this.session = session; + String[] params = param.split("&"); + this.accountId = params[0]; + this.orgId = params[1]; + this.mac = params[2]; + + if (webSocketMap.containsKey(accountId)) { + webSocketMap.remove(accountId); + webSocketMap.put(accountId, this); + } else { + webSocketMap.put(accountId, this); + } + + + try { + Connection connection = RabbitMQUtils.getConnection(); + final Channel channel = connection.createChannel(); + //������������������ + String queue = channel.queueDeclare().getQueue(); + //������������������������routingKey������������ + String routingKey = ""; + routingKey = this.orgId + "." + this.mac; + channel.queueBind(queue, exchange, routingKey); + + + //������������,��������������������� + channel.basicQos(30);//��������������� + channel.basicConsume(queue, false, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { + try{ + //������MQ������������������������������������������������������������ + Map message = (Map) JSON.parse((String) JSON.parse(new String(body))); + Device device = deviceService.getDeviceByMac(mac,false); + sendDeviceInfo(message, device); + //������������ + channel.basicAck(envelope.getDeliveryTag(), true); + //������socket������������������ + if (!webSocketMap.containsKey(accountId)) { + RabbitMQUtils.closeConnectionChannel(connection, channel); + } + }catch (Exception e){ + log.error(e.getMessage()); + RabbitMQUtils.closeConnectionChannel(connection, channel); + } + } + }); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @OnClose + public void onClose() { + if (webSocketMap.containsKey(accountId)) { + webSocketMap.remove(accountId); + } + } + + //������������������������������ + @OnMessage + public void onMessage(String message, Session session) { + + } + + @OnError + public void onError(Session session, Throwable error) { + log.error(error.getMessage()); + } + + public void sendMessage(String message) throws IOException { + try { + if (session.isOpen()) { + this.session.getBasicRemote().sendText(message); + } + } catch (IOException e) { + + //log.error(e.getMessage()); + } + } + + /** + * @Description: ������������mac������accountid���������������������mac������������������ + * @Param: [param] + * @return: void + * @Author: ��������� + * @Date: 2020/9/30 + */ + private void sendDeviceInfo(Map<String, Object> param, Device device) { + + try { + Map<String, Object> sortMap = new LinkedHashMap<>(); + //��������������������������������� + sortMap.put("name", device.getName()); + sortMap.put("address", device.getAddress()); + //��������������������������������� + sortMap.putAll(param); + sendMessage(JSON.toJSONString(sortMap)); + } catch (IOException e) { + log.error("������mac������������������������"); + } + } + +} -- Gitblit v1.8.0