package com.moral.webSocketServer; import com.alibaba.fastjson.JSON; import com.moral.entity.Account; import com.moral.entity.Device; import com.moral.entity.Sensor; import com.moral.entity.SensorUnit; import com.moral.service.AccountService; import com.moral.service.DeviceService; import com.moral.service.SensorService; import com.moral.service.SensorUnitService; import com.moral.util.RabbitMQUtils; import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j; import org.apache.commons.jexl3.JexlBuilder; import org.apache.commons.jexl3.JexlEngine; import org.apache.commons.jexl3.JexlExpression; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.CrossOrigin; import javax.annotation.PostConstruct; import javax.annotation.Resource; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.math.BigDecimal; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.TimeoutException; @Slf4j @ServerEndpoint("/web/ESWebSocket/{param}") @Component public class ElectronicSWebSocketServer { public static AccountService accountService; public static SensorService sensorService; public static SensorUnitService sensorUnitService; /** * concurrent包的线程安全Set,用来存放每个客户端对应的WebSocket对象。 */ public static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet(); /** * 与某个客户端的连接会话,需要通过它来给客户端发送数据 */ private Session session; private String orgId; private String accountId; private String mac; private final String exchange = "screens_data"; Connection connection; Channel channel; @OnOpen public void onOpen(Session session, @PathParam("param") String param) { this.session = session; webSocketSet.add(this); String[] params = param.split("&"); this.accountId = params[0]; Map organizationIdByAccountId = accountService.getOrganizationIdByAccountId(this.accountId); this.mac = params[1]; this.orgId = organizationIdByAccountId.get("organization_id").toString(); try { connection = RabbitMQUtils.getConnection(); channel = connection.createChannel(); //生成临时队列 String queue = channel.queueDeclare().getQueue(); //交换机与队列通过routingKey进行绑定 String routingKey = ""; routingKey = this.orgId + "." + this.mac; channel.queueBind(queue, exchange, routingKey); //消费消息,手动确认模式。 channel.basicQos(30);//预先读取数 channel.basicConsume(queue, false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //对从MQ中取出的数据做转换,并且发送风速到客户端 Map message = (Map) JSON.parse((String) JSON.parse(new String(body))); Map sortMap = new HashMap<>(); message.forEach((key,value)->{ SensorSortEnum sensorSortEnum = SensorSortEnum.valueOf(key); Map newMap = new HashMap(); newMap.put(key,value); sortMap.put(sensorSortEnum.getSensorSort(),newMap); }); Set> entries = sortMap.entrySet(); List> list = new ArrayList<>(); list.addAll(entries); Collections.sort(list, new Comparator>() { @Override public int compare(Map.Entry o1, Map.Entry o2) { return o1.getKey()-o2.getKey(); } }); Map resultMap = new LinkedHashMap<>(); for (Map.Entry entry : list) { Map linkedMap = (Map) entry.getValue(); linkedMap.forEach((key,value)->{ resultMap.put(SensorSortEnum.valueOf(key).getSensorName(),value); }); } Map reMap = new LinkedHashMap<>(); List allSensors = sensorService.getAllSensors(); for (int i = 0; i sensorUnits = sensorUnitService.queryListBySensorId(sensor.getId()); if (sensorUnits.size()>1){ String s = sensorUnits.get(1).getRules(); s = s.replace("{0}",value); JexlEngine jexlEngine = new JexlBuilder().create(); JexlExpression jexlExpression = jexlEngine.createExpression(s); Object evaluate = jexlExpression.evaluate(null); BigDecimal bg = new BigDecimal(evaluate.toString()); value = bg.setScale(3, BigDecimal.ROUND_HALF_UP).toString(); reMap.put(sensor.getName(), value+sensorUnits.get(0).getName()); }else { String s = sensorUnits.get(0).getRules(); s = s.replace("{0}",value); JexlEngine jexlEngine = new JexlBuilder().create(); JexlExpression jexlExpression = jexlEngine.createExpression(s); Object evaluate = jexlExpression.evaluate(null); BigDecimal bg = new BigDecimal(evaluate.toString()); value = bg.setScale(2, BigDecimal.ROUND_HALF_UP).toString(); reMap.put(sensor.getName(), value+sensorUnits.get(0).getName()); } }else { if (sensor.getName().equals("挥发性有机气体(TVOC)")){ BigDecimal bg = new BigDecimal(Double.parseDouble(value)*2.5); value = bg.setScale(2, BigDecimal.ROUND_HALF_UP).toString(); reMap.put(sensor.getName(), value+sensor.getUnit()); }else { reMap.put(sensor.getName(), value+sensor.getUnit()); } } break; }else { continue; } } } reMap.put("time",resultMap.get("time")); sendMessage(JSON.toJSONString(reMap)); //手动确认 channel.basicAck(envelope.getDeliveryTag(), true); //判断socket是否已经断开 } }); } catch (IOException e) { e.printStackTrace(); } } @OnClose public void onClose() { webSocketSet.remove(this); // 从set中删除 try { channel.close(); connection.close(); } catch (IOException e) { e.printStackTrace(); } catch ( TimeoutException e) { e.printStackTrace(); } } //接收到客户端消息操作 @OnMessage public void onMessage(String message, Session session) { } //枚举类,用于按特定规则排序 enum SensorSortEnum { e1(1,"e1"), e20(2,"e20"), e7(3,"e7"), e2(4,"e2"), e17(5,"e17"), e6(6,"e6"), e15(7,"e15"), e27(8,"e27"), e18(9,"e18"), e11(10,"e11"), e21(11,"e21"), e23(12,"e23"), e10(13,"e10"), e25(14,"e25"), e28(15,"e28"), e16(16,"e16"), e26(17,"e26"), e12(18,"e12"), e5(19,"e5"), e3(20,"e3"), e13(21,"e13"), e9(22,"e9"), e4(23,"e4"), e51(24,"e51"), mac(100,"mac"), ver(101,"ver"), time(102,"time"), e8(14,"e8"), e94(94,"e94"), e92(92,"e92"), e40(40,"e40"), e93(93,"e93"); private final Integer sensorSort; private final String sensorName; SensorSortEnum(Integer sensorSort, String sensorName) { this.sensorSort = sensorSort; this.sensorName = sensorName; } public Integer getSensorSort() { return sensorSort; } public String getSensorName() { return sensorName; } } @OnError public void onError(Session session, Throwable error) { log.error(error.getMessage()); } public void sendMessage(String message) throws IOException { try { if (session.isOpen()) { this.session.getBasicRemote().sendText(message); } } catch (IOException e) { //log.error(e.getMessage()); } } }