src/main/java/com/moral/common/bean/ResultBean.java | ●●●●● patch | view | raw | blame | history | |
src/main/java/com/moral/config/WebSocketConfig.java | ●●●●● patch | view | raw | blame | history | |
src/main/java/com/moral/controller/ScreenController.java | ●●●●● patch | view | raw | blame | history | |
src/main/java/com/moral/controller/WebController.java | ●●●●● patch | view | raw | blame | history | |
src/main/java/com/moral/util/RabbitMQUtils.java | ●●●●● patch | view | raw | blame | history | |
src/main/java/com/moral/webSocketServer/BSWebsocketServer.java | ●●●●● patch | view | raw | blame | history | |
src/main/java/com/moral/webSocketServer/WebSocketServer.java | ●●●●● patch | view | raw | blame | history | |
src/main/java/com/moral/webSocketServer/WebSocketServerNew.java | ●●●●● patch | view | raw | blame | history |
src/main/java/com/moral/common/bean/ResultBean.java
@@ -32,6 +32,7 @@ this.message = e.toString(); this.code = FAIL; this.errno = 1; } public ResultBean(T data) { src/main/java/com/moral/config/WebSocketConfig.java
@@ -1,6 +1,8 @@ package com.moral.config; import com.moral.service.DeviceService; import com.moral.service.SensorService; import com.moral.webSocketServer.BSWebsocketServer; import com.moral.webSocketServer.WebSocketServer; import com.moral.webSocketServer.WebSocketServerNew; @@ -15,10 +17,10 @@ public class WebSocketConfig { //本地测试需要将注释解开 /*@Bean @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); }*/ } @Bean public MessageConverter jsonMessageConverter() { @@ -29,6 +31,11 @@ public void setMessageService(DeviceService deviceService){ WebSocketServer.deviceService=deviceService; WebSocketServerNew.deviceService=deviceService; BSWebsocketServer.deviceService=deviceService; } @Autowired public void setSensorService( SensorService sensorService){ BSWebsocketServer.sensorService=sensorService; } } src/main/java/com/moral/controller/ScreenController.java
@@ -1614,6 +1614,7 @@ @RequestMapping(value = "/newMap-page", method = RequestMethod.GET) public ModelAndView newMap(ModelAndView model, @RequestParam("areaCode") long code, @RequestParam("accountId") int accountId) { Account account = accountService.getAccountById(accountId); List<MonitorPoint> monitorPointList = monitorPointService.getMonitorPointListByAccountId(accountId); String regionName = areaService.queryFullNameByCode(code); src/main/java/com/moral/controller/WebController.java
@@ -136,64 +136,6 @@ return resultMap; } //弃用 @UserLoginToken @RequestMapping("getAccountInfoTest") public Map<String, Object> getAccountInfoTest(@RequestBody Map<String, Object> parameters) { Map<String, Object> resultMap = new HashMap<String, Object>(); System.out.println(parameters); if (!(parameters.containsKey("account") && parameters.containsKey("password"))) { resultMap.put("msg", "用户名及密码不允许为空!"); resultMap.put("accountId", -1); } else { resultMap = accountService.screenLoginNew(parameters); // 添加返回行政区信息字符串操作 Object orgId = resultMap.get("orgId"); if (resultMap.get("orgId") != null && resultMap.get("orgId") instanceof Integer) { StringBuilder areaNamesBuilder = new StringBuilder("中国"); //判断是否为本公司开发者 if (!((Integer) orgId).equals(dictionaryDataService.querySupperOrgId())) { //不是本公司开发者则获取用户所属地区 Organization organization = organizationService.getOrganizationById((Integer) orgId); if (organization.getAreaNames() != null) { Map<String, String> areaNameMap = BeanUtils.beanToMap(organization.getAreaNames()); List<String> names = areaNameMap.entrySet().stream().filter(item -> { return item.getValue() != null; }).map(item -> { return item.getValue(); }).collect(Collectors.toList()); AreaNames areaNames = organization.getAreaNames(); areaNamesBuilder.append("/"); areaNamesBuilder.append(String.join("/", names)); } // 企业用户 if (organization.getRank() != null && organization.getRank() == 0) { resultMap.put("type", "enterprise"); } else { resultMap.put("type", "government"); } Number mapAreaCode = null; if (organization.getVillageCode() != null) { mapAreaCode = organization.getVillageCode(); } else if (organization.getTownCode() != null) { mapAreaCode = organization.getTownCode(); } else if (organization.getAreaCode() != null) { mapAreaCode = organization.getAreaCode(); } else if (organization.getCityCode() != null) { mapAreaCode = organization.getCityCode(); } else if (organization.getProvinceCode() != null) { mapAreaCode = organization.getProvinceCode(); } 
resultMap.put("mapAreaCode", mapAreaCode.toString()); } resultMap.put("mapPath", areaNamesBuilder.toString()); String accountId = String.valueOf(resultMap.get("accountId")); resultMap.put("token", webTokenService.getToken(accountId)); } } return resultMap; } @UserLoginToken @GetMapping("report_avg_datas") src/main/java/com/moral/util/RabbitMQUtils.java
New file @@ -0,0 +1,47 @@
package com.moral.util;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;

/**
 * Static helpers for opening and closing RabbitMQ connections and channels.
 *
 * <p>All connections are created from a single shared {@link ConnectionFactory}
 * that is configured once when the class is loaded.
 */
@Slf4j
public class RabbitMQUtils {

    // NOTE(review): the broker address and credentials are hard-coded here.
    // They should be moved to external configuration; left unchanged so that
    // existing deployments keep connecting to the same broker.
    private static final ConnectionFactory factory;

    static {
        ConnectionFactory configured = new ConnectionFactory();
        configured.setHost("47.96.15.25"); // connection address
        configured.setPort(5672); // port
        configured.setUsername("guest");
        configured.setPassword("guest_pass");
        configured.setVirtualHost("/"); // virtual host
        factory = configured;
    }

    /**
     * Opens a new connection to the broker.
     *
     * @return the new connection, or {@code null} when it could not be opened;
     *         callers must check for {@code null} before using the result
     */
    public static Connection getConnection() {
        try {
            return factory.newConnection();
        } catch (Exception e) {
            log.error("Failed to open RabbitMQ connection to {}:{}",
                    factory.getHost(), factory.getPort(), e);
            return null;
        }
    }

    /**
     * Closes the channel and then the connection, ignoring {@code null} arguments.
     *
     * <p>The two are closed independently so that a failure while closing the
     * channel can no longer prevent the connection from being closed.
     *
     * @param connection the connection to close, may be {@code null}
     * @param channel    the channel to close, may be {@code null}
     */
    public static void closeConnectionChannel(Connection connection, Channel channel) {
        closeChannel(channel);
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            // Broad catch on purpose: this is best-effort cleanup and must never
            // propagate (closing an already-closed connection throws unchecked).
            log.error("Failed to close RabbitMQ connection", e);
        }
    }

    /**
     * Closes the channel, ignoring a {@code null} argument.
     *
     * @param channel the channel to close, may be {@code null}
     */
    public static void closeChannel(Channel channel) {
        try {
            if (channel != null) {
                channel.close();
            }
        } catch (Exception e) {
            // Broad catch on purpose: best-effort cleanup, see closeConnectionChannel.
            log.error("Failed to close RabbitMQ channel", e);
        }
    }
}
src/main/java/com/moral/webSocketServer/BSWebsocketServer.java
New file @@ -0,0 +1,159 @@
package com.moral.webSocketServer;

import com.alibaba.fastjson.JSON;
import com.moral.common.util.ParameterUtils;
import com.moral.entity.Device;
import com.moral.entity.Sensor;
import com.moral.service.DeviceService;
import com.moral.service.SensorService;
import com.rabbitmq.client.*;
import com.moral.util.RabbitMQUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * WebSocket endpoint that forwards device data from RabbitMQ to a browser client.
 *
 * <p>The path parameter has the form {@code accountId&orgId&regionCode}. On open,
 * the endpoint looks up the devices for that organisation/region, binds a
 * server-named queue to the {@code screens_data} exchange with one routing key
 * per device ({@code orgId.mac}) and pushes every consumed message, reformatted
 * by {@link #messageFormat(Map)}, to the client.
 */
@Slf4j
@ServerEndpoint("/web/webSocket/{param}")
@Component
public class BSWebsocketServer {

    /** Assigned from outside this class (static so every endpoint instance shares it). */
    public static DeviceService deviceService;

    /** Assigned from outside this class (static so every endpoint instance shares it). */
    public static SensorService sensorService;

    /** Exchange the per-session queue is bound to. */
    private static final String EXCHANGE = "screens_data";

    /**
     * Thread-safe map from account id to the endpoint instance currently
     * serving that account.
     */
    private static final ConcurrentHashMap<String, BSWebsocketServer> webSocketMap = new ConcurrentHashMap<>();

    /**
     * Sensors indexed by sensor key. Never {@code null}; replaced as a whole by
     * {@link #loadSensors()} so readers always see a fully built map.
     */
    private static volatile Map<String, Sensor> sensors = new HashMap<>();

    /**
     * Session of the connected client; used to send data to it.
     */
    private Session session;

    private String orgId;

    private String accountId;

    private String regionCode;

    /** Broker resources owned by this session; released in {@link #onClose()}. */
    private Connection connection;

    private Channel channel;

    /**
     * Loads the sensor lookup table once the bean has been constructed.
     *
     * <p>If {@link #sensorService} has not been assigned yet the load is
     * deferred to the first {@link #onOpen} instead of failing with a
     * {@code NullPointerException}.
     */
    @PostConstruct
    public void init() {
        if (sensorService == null) {
            log.warn("sensorService not yet available; sensor table will be loaded on first connection");
            return;
        }
        loadSensors();
    }

    /** Rebuilds the sensor lookup table from {@link #sensorService}. */
    private static void loadSensors() {
        Map<String, Sensor> loaded = new HashMap<>();
        List<Sensor> allSensors = sensorService.getAllSensors();
        for (Sensor sensor : allSensors) {
            loaded.put(sensor.getSensorKey(), sensor);
        }
        sensors = loaded;
    }

    /**
     * Called when a connection has been established.
     *
     * @param session the new client session
     * @param param   path parameter of the form {@code accountId&orgId&regionCode}
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("param") String param) {
        this.session = session;
        String[] params = param.split("&");
        if (params.length < 3) {
            // Previously this surfaced as an ArrayIndexOutOfBoundsException.
            log.error("Rejecting websocket connection, malformed path parameter: {}", param);
            rejectSession("invalid path parameter");
            return;
        }
        this.accountId = params[0];
        this.orgId = params[1];
        this.regionCode = params[2];
        // put() already replaces any previous instance registered for the account.
        webSocketMap.put(accountId, this);

        if (sensors.isEmpty() && sensorService != null) {
            loadSensors();
        }

        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("orgId", orgId);
        paramMap.put("regionCode", regionCode);
        ParameterUtils.getRegionType4RegionCode(paramMap);
        List<Device> deviceList = deviceService.queryDevice(paramMap);

        connection = RabbitMQUtils.getConnection();
        if (connection == null) {
            // getConnection() returns null on failure and has already logged the cause.
            log.error("No RabbitMQ connection available; account {} will not receive device data", accountId);
            return;
        }
        try {
            channel = connection.createChannel();
            final Channel consumerChannel = channel;
            // Declare a temporary, server-named queue.
            String queue = consumerChannel.queueDeclare().getQueue();
            // Bind the queue to the exchange with one routing key per device.
            for (Device d : deviceList) {
                String routingKey = orgId + "." + d.getMac();
                consumerChannel.queueBind(queue, EXCHANGE, routingKey);
            }
            // Consume messages in manual-acknowledgement mode.
            consumerChannel.basicConsume(queue, false, new DefaultConsumer(consumerChannel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try {
                        Map<String, Object> message = messageFormat(decode(body));
                        sendMessage(JSON.toJSONString(message));
                    } catch (RuntimeException e) {
                        // One bad message must not stop delivery for this session.
                        log.warn("Failed to process device message for account {}", accountId, e);
                    }
                    // Acknowledge in every case: the queue belongs to this session
                    // only, so an unprocessable message has no other consumer.
                    consumerChannel.basicAck(envelope.getDeliveryTag(), false);
                }
            });
        } catch (IOException e) {
            log.error("Failed to subscribe account {} to exchange {}", accountId, EXCHANGE, e);
            releaseBrokerResources();
        }
    }

    /**
     * Called when the connection is closed: unregisters this instance and
     * releases the RabbitMQ connection and channel opened in {@link #onOpen}.
     */
    @OnClose
    public void onClose() {
        if (accountId != null) {
            // Two-argument remove: do not evict a newer session that has
            // replaced this one for the same account.
            webSocketMap.remove(accountId, this);
        }
        releaseBrokerResources();
    }

    /**
     * Called when a message is received from the client; intentionally a no-op.
     */
    @OnMessage
    public void onMessage(String message, Session session) {
    }

    /**
     * Called when an error occurs on the session; logs it with its stack trace.
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket error for account {}", accountId, error);
    }

    /**
     * Sends a text message to the client if the session is still open.
     *
     * <p>I/O failures are logged and not rethrown; the {@code throws} clause is
     * kept only so existing callers continue to compile.
     *
     * @param message the text to send
     */
    public void sendMessage(String message) throws IOException {
        try {
            if (session.isOpen()) {
                this.session.getBasicRemote().sendText(message);
            }
        } catch (IOException e) {
            log.error("Failed to send message to account {}", accountId, e);
        }
    }

    /**
     * Decodes a raw message body.
     *
     * <p>The body is parsed twice: the first pass must yield a {@code String},
     * which the second pass parses into a map.
     */
    @SuppressWarnings("unchecked")
    private static Map<String, Object> decode(byte[] body) {
        // Explicit charset: new String(byte[]) would use the platform default.
        String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        return (Map<String, Object>) JSON.parse((String) JSON.parse(text));
    }

    /**
     * Formats data taken from the queue: every entry whose key is a known
     * sensor key is emitted as {@code sensorName -> value + unit}; unknown keys
     * are dropped. The {@code mac} entry is always copied through.
     */
    private Map<String, Object> messageFormat(Map<String, Object> param) {
        Map<String, Object> map = new HashMap<>();
        Map<String, Sensor> known = sensors;
        param.forEach((key, value) -> {
            Sensor sensor = known.get(key);
            if (!ObjectUtils.isEmpty(sensor)) {
                String unit = ObjectUtils.isEmpty(sensor.getUnit()) ? "" : (String) sensor.getUnit();
                map.put(sensor.getName(), value + unit);
            }
        });
        map.put("mac", param.get("mac"));
        return map;
    }

    /** Closes the session after a rejected handshake. */
    private void rejectSession(String reason) {
        try {
            session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, reason));
        } catch (IOException e) {
            log.error("Failed to close rejected websocket session", e);
        }
    }

    /** Closes this session's channel and connection, if any, and forgets them. */
    private void releaseBrokerResources() {
        RabbitMQUtils.closeConnectionChannel(connection, channel);
        channel = null;
        connection = null;
    }
}
src/main/java/com/moral/webSocketServer/WebSocketServer.java
@@ -62,17 +62,21 @@ this.session = session; //这个一定要写,第一次很容易忽略! webSocketSet.add(this); int flag = param.indexOf("&"); int regionCodeIndex = param.indexOf("_"); orgId = param.substring(0, flag); accountId = param.substring(flag + 1,regionCodeIndex); regionCode = param.substring(regionCodeIndex + 1); String QUEUE_NAME = "deviceInfo_" + accountId; Map<String, Object> paramMap = new HashMap<String, Object>(); paramMap.put("orgId", orgId); paramMap.put("regionCode", regionCode); ParameterUtils.getRegionType4RegionCode(paramMap); List<Device> deviceList = deviceService.queryDevice(paramMap); try { //打开连接和创建频道,与发送端一样 ConnectionFactory factory = new ConnectionFactory(); src/main/java/com/moral/webSocketServer/WebSocketServerNew.java
@@ -52,12 +52,13 @@ this.session = session; //这个一定要写,第一次很容易忽略! webSocketSet.add(this); //取参 String[] flag=param.split("&"); String orgId = flag[0]; String accountId = flag[1]; String monitPointId = flag[2]; String p=flag[3]; String QUEUE_NAME = "deviceData_" + accountId; Map<String, Object> paramMap = new HashMap<String, Object>(); List<Device> deviceList = deviceService.getDeviceById2(Integer.parseInt(monitPointId));