New file |
| | |
| | | package com.moral.webSocketServer; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.moral.entity.Account; |
| | | import com.moral.entity.Device; |
| | | import com.moral.entity.Sensor; |
| | | import com.moral.service.AccountService; |
| | | import com.moral.service.DeviceService; |
| | | import com.moral.service.SensorService; |
| | | import com.moral.util.RabbitMQUtils; |
| | | import com.rabbitmq.client.*; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.ObjectUtils; |
| | | import org.springframework.web.bind.annotation.CrossOrigin; |
| | | |
| | | import javax.annotation.PostConstruct; |
| | | import javax.annotation.Resource; |
| | | import javax.websocket.*; |
| | | import javax.websocket.server.PathParam; |
| | | import javax.websocket.server.ServerEndpoint; |
| | | import java.io.IOException; |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.CopyOnWriteArraySet; |
| | | import java.util.concurrent.TimeoutException; |
| | | |
| | | @Slf4j |
| | | @ServerEndpoint("/web/ESWebSocket/{param}") |
| | | @Component |
| | | public class ElectronicSWebSocketServer { |
| | | |
| | | public static AccountService accountService; |
| | | |
| | | public static SensorService sensorService; |
| | | |
| | | /** |
| | | * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。 |
| | | */ |
| | | public static CopyOnWriteArraySet<ElectronicSWebSocketServer> webSocketSet = new CopyOnWriteArraySet<ElectronicSWebSocketServer>(); |
| | | |
| | | /** |
| | | * 与某个客户端的连接会话,需要通过它来给客户端发送数据 |
| | | */ |
| | | private Session session; |
| | | |
| | | private String orgId; |
| | | private String accountId; |
| | | private String mac; |
| | | |
| | | private final String exchange = "screens_data"; |
| | | Connection connection; |
| | | Channel channel; |
| | | |
| | | @OnOpen |
| | | public void onOpen(Session session, @PathParam("param") String param) { |
| | | |
| | | this.session = session; |
| | | webSocketSet.add(this); |
| | | String[] params = param.split("&"); |
| | | this.accountId = params[0]; |
| | | Map<String, Object> organizationIdByAccountId = accountService.getOrganizationIdByAccountId(this.accountId); |
| | | this.mac = params[1]; |
| | | this.orgId = organizationIdByAccountId.get("organization_id").toString(); |
| | | try { |
| | | connection = RabbitMQUtils.getConnection(); |
| | | channel = connection.createChannel(); |
| | | |
| | | //生成临时队列 |
| | | String queue = channel.queueDeclare().getQueue(); |
| | | //交换机与队列通过routingKey进行绑定 |
| | | String routingKey = ""; |
| | | routingKey = this.orgId + "." + this.mac; |
| | | channel.queueBind(queue, exchange, routingKey); |
| | | //消费消息,手动确认模式。 |
| | | channel.basicQos(30);//预先读取数 |
| | | channel.basicConsume(queue, false, new DefaultConsumer(channel) { |
| | | @Override |
| | | public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { |
| | | //对从MQ中取出的数据做转换,并且发送风速到客户端 |
| | | Map<String,Object> message = (Map) JSON.parse((String) JSON.parse(new String(body))); |
| | | Map<Integer,Object> sortMap = new HashMap<>(); |
| | | message.forEach((key,value)->{ |
| | | SensorSortEnum sensorSortEnum = SensorSortEnum.valueOf(key); |
| | | Map newMap = new HashMap(); |
| | | newMap.put(key,value); |
| | | sortMap.put(sensorSortEnum.getSensorSort(),newMap); |
| | | }); |
| | | |
| | | Set<Map.Entry<Integer, Object>> entries = sortMap.entrySet(); |
| | | List<Map.Entry<Integer, Object>> list = new ArrayList<>(); |
| | | list.addAll(entries); |
| | | Collections.sort(list, new Comparator<Map.Entry<Integer, Object>>() { |
| | | @Override |
| | | public int compare(Map.Entry<Integer, Object> o1, Map.Entry<Integer, Object> o2) { |
| | | return o1.getKey()-o2.getKey(); |
| | | } |
| | | }); |
| | | |
| | | Map<String,Object> resultMap = new LinkedHashMap<>(); |
| | | for (Map.Entry<Integer, Object> entry : list) { |
| | | Map<String,Object> linkedMap = (Map<String, Object>) entry.getValue(); |
| | | linkedMap.forEach((key,value)->{ |
| | | resultMap.put(SensorSortEnum.valueOf(key).getSensorName(),value); |
| | | }); |
| | | } |
| | | Map<String,Object> reMap = new LinkedHashMap<>(); |
| | | List<Sensor> allSensors = sensorService.getAllSensors(); |
| | | for (int i = 0; i <resultMap.keySet().size() ; i++) { |
| | | for (Sensor sensor : allSensors) { |
| | | if (Arrays.asList(resultMap.keySet().toArray(new String[resultMap.keySet().size()])).get(i).equals(sensor.getSensorKey())){ |
| | | String value = resultMap.get(Arrays.asList(resultMap.keySet().toArray(new String[resultMap.keySet().size()])).get(i)).toString(); |
| | | /* String e = Arrays.asList(resultMap.keySet().toArray(new String[resultMap.keySet().size()])).get(i); |
| | | resultMap.remove(e);*/ |
| | | reMap.put(sensor.getName(), value+sensor.getUnit()); |
| | | break; |
| | | }else { |
| | | continue; |
| | | } |
| | | } |
| | | } |
| | | reMap.put("time",resultMap.get("time")); |
| | | |
| | | sendMessage(JSON.toJSONString(reMap)); |
| | | //手动确认 |
| | | channel.basicAck(envelope.getDeliveryTag(), true); |
| | | //判断socket是否已经断开 |
| | | } |
| | | }); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | @OnClose |
| | | public void onClose() { |
| | | webSocketSet.remove(this); // 从set中删除 |
| | | try { |
| | | channel.close(); |
| | | connection.close(); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | } catch ( |
| | | TimeoutException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | //接收到客户端消息操作 |
| | | @OnMessage |
| | | public void onMessage(String message, Session session) { |
| | | |
| | | } |
| | | //枚举类,用于按特定规则排序 |
| | | |
| | | enum SensorSortEnum { |
| | | e1(1,"e1"), |
| | | e20(2,"e20"), |
| | | e7(3,"e7"), |
| | | e2(4,"e2"), |
| | | e17(5,"e17"), |
| | | e6(6,"e6"), |
| | | e15(7,"e15"), |
| | | e27(8,"e27"), |
| | | e18(9,"e18"), |
| | | e11(10,"e11"), |
| | | e21(11,"e21"), |
| | | e23(12,"e23"), |
| | | e10(13,"e10"), |
| | | e25(14,"e25"), |
| | | e28(15,"e28"), |
| | | e16(16,"e16"), |
| | | e26(17,"e26"), |
| | | e12(18,"e12"), |
| | | e5(19,"e5"), |
| | | e3(20,"e3"), |
| | | e13(21,"e13"), |
| | | e9(22,"e9"), |
| | | e4(23,"e4"), |
| | | e51(24,"e51"), |
| | | mac(100,"mac"), |
| | | ver(101,"ver"), |
| | | time(102,"time"), |
| | | e8(14,"e8"), |
| | | e94(94,"e94"), |
| | | e92(92,"e92"), |
| | | e40(40,"e40"), |
| | | e93(93,"e93"); |
| | | |
| | | private final Integer sensorSort; |
| | | private final String sensorName; |
| | | |
| | | SensorSortEnum(Integer sensorSort, String sensorName) { |
| | | this.sensorSort = sensorSort; |
| | | this.sensorName = sensorName; |
| | | } |
| | | |
| | | public Integer getSensorSort() { |
| | | return sensorSort; |
| | | } |
| | | |
| | | public String getSensorName() { |
| | | return sensorName; |
| | | } |
| | | } |
| | | |
| | | @OnError |
| | | public void onError(Session session, Throwable error) { |
| | | log.error(error.getMessage()); |
| | | } |
| | | |
| | | public void sendMessage(String message) throws IOException { |
| | | try { |
| | | if (session.isOpen()) { |
| | | this.session.getBasicRemote().sendText(message); |
| | | } |
| | | } catch (IOException e) { |
| | | |
| | | //log.error(e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | |
| | | } |