|  |  |  | 
|---|
|  |  |  | RedisService redisService; | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void onMessage(Message msg) { | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | String message = null; | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | message = new String(msg.getBody(), "utf-8"); | 
|---|
|  |  |  | } catch (UnsupportedEncodingException e) { | 
|---|
|  |  |  | logger.warn(e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | String message = null; | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | message = new String(msg.getBody(), "utf-8"); | 
|---|
|  |  |  | } catch (UnsupportedEncodingException e) { | 
|---|
|  |  |  | logger.warn(e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | Map<String, String> msgData = JSON.parseObject(message, new TypeReference<Map<String, String>>() {}); | 
|---|
|  |  |  | String mac = msgData.get("mac"); | 
|---|
|  |  |  | String ver = msgData.get("ver"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | Map<String, String> msgData = JSON.parseObject(message, new TypeReference<Map<String, String>>() {}); | 
|---|
|  |  |  | String mac = msgData.get("mac"); | 
|---|
|  |  |  | String ver = msgData.get("ver"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | if(StringUtils.isEmpty(ver) || StringUtils.isEmpty(mac)) { | 
|---|
|  |  |  | return; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | //获取缓存中的校准值map | 
|---|
|  |  |  | Map<String, Float> adjustMap = redisService.getAdjustsByMac(mac); | 
|---|
|  |  |  | if(adjustMap!=null&&!adjustMap.isEmpty()) { | 
|---|
|  |  |  | for (Map.Entry<String, Float> entry : adjustMap.entrySet()) { | 
|---|
|  |  |  | String key = entry.getKey(); | 
|---|
|  |  |  | if(msgData.containsKey(key)) { | 
|---|
|  |  |  | Float value = entry.getValue(); | 
|---|
|  |  |  | Float dataValue = Float.valueOf(msgData.get(key)) ; | 
|---|
|  |  |  | msgData.put(key, String.valueOf(dataValue + value)); | 
|---|
|  |  |  | if(StringUtils.isEmpty(ver) || StringUtils.isEmpty(mac)) { | 
|---|
|  |  |  | return; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | //获取缓存中的校准值map | 
|---|
|  |  |  | Map<String, Float> adjustMap = redisService.getAdjustsByMac(mac); | 
|---|
|  |  |  | if(adjustMap!=null&&!adjustMap.isEmpty()) { | 
|---|
|  |  |  | for (Map.Entry<String, Float> entry : adjustMap.entrySet()) { | 
|---|
|  |  |  | String key = entry.getKey(); | 
|---|
|  |  |  | if(msgData.containsKey(key)) { | 
|---|
|  |  |  | Float value = entry.getValue(); | 
|---|
|  |  |  | Float dataValue = Float.valueOf(msgData.get(key)) ; | 
|---|
|  |  |  | msgData.put(key, String.valueOf(dataValue + value)); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | //        System.out.println(message); | 
|---|
|  |  |  | //        System.out.println(JSON.toJSONString(msgData)); | 
|---|
|  |  |  | //            System.out.println(message); | 
|---|
|  |  |  | //            System.out.println(JSON.toJSONString(msgData)); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | String new_message = JSON.toJSONString(msgData); | 
|---|
|  |  |  | rabbitTemplate.convertAndSend("monitors_data2", "", new_message.getBytes()); | 
|---|
|  |  |  | //        rabbitTemplate.send("monitors_data2", "", new Message(JSON.toJSONString(msgData).getBytes(), new MessageProperties())); | 
|---|
|  |  |  | String new_message = JSON.toJSONString(msgData); | 
|---|
|  |  |  | rabbitTemplate.convertAndSend("monitors_data2", "", new_message.getBytes()); | 
|---|
|  |  |  | //            rabbitTemplate.send("monitors_data2", "", new Message(JSON.toJSONString(msgData).getBytes(), new MessageProperties())); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //        int state = (new Random()).nextInt(4) % 5; //TODO | 
|---|
|  |  |  | int state = detEquState(msgData); | 
|---|
|  |  |  | //        rabbitTemplate.convertAndSend("monitors_alarm", "", "{\"mac\": \"" + mac + "\", \"state\": " + state + "}"); | 
|---|
|  |  |  | //            int state = (new Random()).nextInt(4) % 5; //TODO | 
|---|
|  |  |  | int state = detEquState(msgData); | 
|---|
|  |  |  | //            rabbitTemplate.convertAndSend("monitors_alarm", "", "{\"mac\": \"" + mac + "\", \"state\": " + state + "}"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //更新设备状态 | 
|---|
|  |  |  | jobDao.updateStateByMac(mac, state); | 
|---|
|  |  |  | //更新设备状态 | 
|---|
|  |  |  | jobDao.updateStateByMac(mac, state); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //        //保存原始数据 | 
|---|
|  |  |  | //        taskDao.insertTologger(mac, message, getDate()); | 
|---|
|  |  |  | mongoTemplate.insert(message, "logger"); | 
|---|
|  |  |  | //            //保存原始数据 | 
|---|
|  |  |  | //            taskDao.insertTologger(mac, message, getDate()); | 
|---|
|  |  |  | mongoTemplate.insert(message, "logger"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //保存历史记录 | 
|---|
|  |  |  | List<History> histories = new ArrayList<History>(); | 
|---|
|  |  |  | List<Sensor> sensorList = taskDao.selectFromsensorByver(ver); | 
|---|
|  |  |  | for (Sensor sensor : sensorList) { | 
|---|
|  |  |  | String key = sensor.getMac_key(); | 
|---|
|  |  |  | if(msgData.containsKey(key)) { | 
|---|
|  |  |  | //taskDao.insertTohistory(mac, sensor.getSensor(), key, msgData.get(key)); | 
|---|
|  |  |  | History history = new History(); | 
|---|
|  |  |  | history.setMac(mac); | 
|---|
|  |  |  | history.setMac_key(key); | 
|---|
|  |  |  | history.setTime(new Date(Long.parseLong(msgData.get("time")))); | 
|---|
|  |  |  | history.setSensor(sensor.getSensor()); | 
|---|
|  |  |  | history.setMac_value(Double.valueOf(msgData.get(key))); | 
|---|
|  |  |  | histories.add(history); | 
|---|
|  |  |  | //保存历史记录 | 
|---|
|  |  |  | List<History> histories = new ArrayList<History>(); | 
|---|
|  |  |  | List<Sensor> sensorList = taskDao.selectFromsensorByver(ver); | 
|---|
|  |  |  | for (Sensor sensor : sensorList) { | 
|---|
|  |  |  | String key = sensor.getMac_key(); | 
|---|
|  |  |  | if(msgData.containsKey(key)) { | 
|---|
|  |  |  | //taskDao.insertTohistory(mac, sensor.getSensor(), key, msgData.get(key)); | 
|---|
|  |  |  | History history = new History(); | 
|---|
|  |  |  | history.setMac(mac); | 
|---|
|  |  |  | history.setMac_key(key); | 
|---|
|  |  |  | history.setTime(new Date(Long.parseLong(msgData.get("time")))); | 
|---|
|  |  |  | history.setSensor(sensor.getSensor()); | 
|---|
|  |  |  | history.setMac_value(Double.valueOf(msgData.get(key))); | 
|---|
|  |  |  | histories.add(history); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | jobDao.batchInsertHistory(histories); | 
|---|
|  |  |  | jobDao.batchInsertHistory(histories); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //mongoTemplate.insert(JSON.toJSONString(histories), "data"); | 
|---|
|  |  |  | mongoTemplate.insert(new_message, "data"); | 
|---|
|  |  |  | //mongoTemplate.insert(JSON.toJSONString(histories), "data"); | 
|---|
|  |  |  | mongoTemplate.insert(new_message, "data"); | 
|---|
|  |  |  | } catch (Exception e) { | 
|---|
|  |  |  | //打印错误 | 
|---|
|  |  |  | logger.error(e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | //三级警报值阀值 | 
|---|
|  |  |  | private static Map<String,Double[]> alarmLevles =new HashMap<String, Double[]>(); | 
|---|
|  |  |  | 
|---|
|  |  |  | //根据三级警报阀值,确定设备状态 | 
|---|
|  |  |  | private int detEquState(Map<String,String> data) { | 
|---|
|  |  |  | int state = 0; | 
|---|
|  |  |  | data.remove("mac");//不需要存储mac地址 | 
|---|
|  |  |  | Map<String, Object> equMap = new HashMap<String, Object>(); | 
|---|
|  |  |  | String mac = data.get("mac"); | 
|---|
|  |  |  | data.remove("mac");//不需要存储mac地址 | 
|---|
|  |  |  | equMap.putAll(data); | 
|---|
|  |  |  | if(data!=null) { | 
|---|
|  |  |  | for(String key:alarmLevles.keySet()) { | 
|---|
|  |  |  | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | equMap.put("state", state);//设备状态 | 
|---|
|  |  |  | redisService.setEquState(data.get("mac"),equMap); | 
|---|
|  |  |  | redisService.setEquState(mac,equMap); | 
|---|
|  |  |  | return state; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|