New file |
| | |
| | | package com.moral.webSocketServer; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.moral.common.util.ParameterUtils; |
| | | import com.moral.entity.Device; |
| | | import com.moral.entity.Sensor; |
| | | import com.moral.service.DeviceService; |
| | | import com.moral.service.SensorService; |
| | | import com.moral.util.RabbitMQUtils; |
| | | import com.rabbitmq.client.*; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.ObjectUtils; |
| | | |
| | | import javax.annotation.PostConstruct; |
| | | import javax.websocket.*; |
| | | import javax.websocket.server.PathParam; |
| | | import javax.websocket.server.ServerEndpoint; |
| | | import java.io.IOException; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | |
| | | @Slf4j |
| | | @ServerEndpoint("/web/testWebSocket/{param}") |
| | | @Component |
| | | public class BSTestWebsocketServer { |
| | | |
| | | public static DeviceService deviceService; |
| | | |
| | | public static SensorService sensorService; |
| | | |
| | | /** |
| | | * concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。 |
| | | */ |
| | | private static ConcurrentHashMap<String, BSTestWebsocketServer> webSocketMap = new ConcurrentHashMap<>(); |
| | | /** |
| | | * 与某个客户端的连接会话,需要通过它来给客户端发送数据 |
| | | */ |
| | | private Session session; |
| | | |
| | | private String orgId; |
| | | |
| | | private String accountId; |
| | | |
| | | private String regionCode; |
| | | |
| | | private String mac; |
| | | |
| | | private final String exchange = "screens_data"; |
| | | |
| | | private static Map<String, Sensor> sensors; |
| | | |
| | | |
| | | @PostConstruct |
| | | public void init() { |
| | | sensors = new HashMap<>(); |
| | | List<Sensor> allSensors = sensorService.getAllSensors(); |
| | | for (Sensor sensor : allSensors) { |
| | | sensors.put(sensor.getSensorKey(), sensor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 连接建立成功调用的方法 |
| | | */ |
| | | @OnOpen |
| | | public void onOpen(Session session, @PathParam("param") String param) { |
| | | this.session = session; |
| | | String[] params = param.split("&"); |
| | | this.accountId = params[0]; |
| | | this.orgId = params[1]; |
| | | this.regionCode = params[2]; |
| | | |
| | | if (webSocketMap.containsKey(accountId)) { |
| | | webSocketMap.remove(accountId); |
| | | webSocketMap.put(accountId, this); |
| | | } else { |
| | | webSocketMap.put(accountId, this); |
| | | } |
| | | |
| | | Map<String, Object> paramMap = new HashMap<String, Object>(); |
| | | paramMap.put("orgId", orgId); |
| | | paramMap.put("regionCode", regionCode); |
| | | ParameterUtils.getRegionType4RegionCode(paramMap); |
| | | List<Device> deviceList = deviceService.queryDevice(paramMap); |
| | | |
| | | try { |
| | | Connection connection = RabbitMQUtils.getConnection(); |
| | | final Channel channel = connection.createChannel(); |
| | | //生成临时队列 |
| | | String queue = channel.queueDeclare().getQueue(); |
| | | |
| | | //交换机与队列通过routingKey进行绑定 |
| | | String routingKey = ""; |
| | | for (Device d : deviceList) { |
| | | routingKey = orgId + "." + d.getMac(); |
| | | channel.queueBind(queue, exchange, routingKey); |
| | | } |
| | | |
| | | //消费消息,手动确认模式。 |
| | | channel.basicQos(1);//每次只消费一条数据 |
| | | channel.basicConsume(queue, false, new DefaultConsumer(channel) { |
| | | @Override |
| | | public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { |
| | | Map message = (Map) JSON.parse((String) JSON.parse(new String(body))); |
| | | sendWindInfo(message); |
| | | if(mac!=null&&(!mac.equals(0))) |
| | | sendDeviceInfo(message); |
| | | channel.basicAck(envelope.getDeliveryTag(), false); |
| | | //判断socket是否已经断开 |
| | | if (!webSocketMap.containsKey(accountId)) { |
| | | RabbitMQUtils.closeConnectionChannel(connection, channel); |
| | | } |
| | | } |
| | | }); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | |
| | | } |
| | | |
| | | @OnClose |
| | | public void onClose() { |
| | | if (webSocketMap.containsKey(accountId)) { |
| | | webSocketMap.remove(accountId); |
| | | } |
| | | } |
| | | |
| | | //接收到客户端消息操作 |
| | | @OnMessage |
| | | public void onMessage(String message, Session session) { |
| | | if (!ObjectUtils.isEmpty(message)) { |
| | | System.out.println(message); |
| | | Map<String, Object> map = JSON.parseObject(message); |
| | | this.mac = (String) map.get("mac"); |
| | | } |
| | | } |
| | | |
| | | @OnError |
| | | public void onError(Session session, Throwable error) { |
| | | log.error(error.getMessage()); |
| | | } |
| | | |
| | | public void sendMessage(String message) throws IOException { |
| | | try { |
| | | if (session.isOpen()) { |
| | | this.session.getBasicRemote().sendText(message); |
| | | } |
| | | } catch (IOException e) { |
| | | |
| | | //log.error(e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | //发送风向标数据 |
| | | private void sendWindInfo(Map<String, Object> param) { |
| | | try { |
| | | Map<String, Object> map = new HashMap<>(); |
| | | if (param.get("e23") != null && param.get("mac") != null) { |
| | | map.put("风向", param.get("e23")); |
| | | map.put("mac", param.get("mac")); |
| | | sendMessage(JSON.toJSONString(map)); |
| | | } |
| | | } catch (IOException e) { |
| | | log.error("发送风向标数据异常"); |
| | | } |
| | | } |
| | | |
| | | //根据mac发送device数据 |
| | | private void sendDeviceInfo(Map<String, Object> param) { |
| | | String deviceMac = (String) param.get("mac"); |
| | | if (mac.equals(deviceMac)) { |
| | | try { |
| | | Map<String, Object> map = new HashMap<>(); |
| | | param.forEach((key, value) -> { |
| | | Sensor sensor = sensors.get(key); |
| | | if (!ObjectUtils.isEmpty(sensor)) { |
| | | String unit = ObjectUtils.isEmpty(sensor.getUnit()) ? "" : (String) sensor.getUnit(); |
| | | map.put(sensor.getName(), value + unit); |
| | | } |
| | | }); |
| | | String mac = (String) param.get("mac"); |
| | | map.put("mac", mac); |
| | | sendMessage(JSON.toJSONString(map)); |
| | | } catch (IOException e) { |
| | | log.error("根据mac发送设备数据异常"); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | } |