package com.moral.monitor.listener;

import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.StringUtils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONReader;
import com.alibaba.fastjson.TypeReference;
import com.moral.monitor.dao.JobDao;
import com.moral.monitor.dao.TaskDao;
import com.moral.monitor.entity.History;
import com.moral.monitor.entity.Sensor;
import com.moral.monitor.service.RedisService;
import com.moral.monitor.util.RedisUtil;

/**
 * AMQP listener that handles one JSON device reading per message.
 *
 * <p>For every message it: applies the per-device calibration offsets obtained from
 * {@link RedisService}, republishes the adjusted payload to the {@code monitors_data2}
 * exchange, derives the device alarm state from the configured thresholds, stores that
 * state through {@link JobDao} and {@link RedisService}, and persists the raw payload,
 * the per-sensor history rows and the adjusted payload.
 */
public class TaskListener implements MessageListener {

    /** Redis key under which the alarm threshold configuration (JSON) is looked up. */
    private static final String LEVEL_KEY = "alarm_level_config";

    /** Classpath location of the threshold configuration used when Redis has none. */
    private static final String DEFAULT_LEVELS_PATH = "system/alarmLevels.json";

    private static final Logger logger = LoggerFactory.getLogger(TaskListener.class);

    @Resource
    RabbitTemplate rabbitTemplate;

    @Resource
    JobDao jobDao;

    @Resource
    TaskDao taskDao;

    // NOTE(review): kept raw because the type arguments RedisUtil expects are not visible
    // from this file - confirm and parameterize.
    @Resource
    RedisTemplate redisTemplate;

    @Resource
    protected MongoTemplate mongoTemplate;

    @Resource
    RedisService redisService;

    /**
     * Processes a single device reading.
     *
     * <p>Messages without a usable {@code mac} or {@code ver} field are ignored. Any
     * failure while processing is logged with its stack trace and not rethrown.
     *
     * @param msg the AMQP message whose body is a UTF-8 encoded JSON object
     */
    @Override
    public void onMessage(Message msg) {
        try {
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            Map<String, Object> msgData =
                    JSON.parseObject(message, new TypeReference<Map<String, Object>>() {});
            if (msgData == null) {
                logger.warn("Ignoring message with an empty payload");
                return;
            }

            // Resolve both identifiers null-safely so that a missing field reaches the
            // emptiness check below instead of failing on toString().
            String mac = textOf(msgData.get("mac"));
            String ver = textOf(msgData.get("ver"));
            if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(mac)) {
                logger.warn("Ignoring message without mac or ver");
                return;
            }

            applyAdjustments(mac, msgData);

            // Publish with the same charset that was used to decode the inbound payload.
            String new_message = JSON.toJSONString(msgData);
            rabbitTemplate.convertAndSend(
                    "monitors_data2", "", new_message.getBytes(StandardCharsets.UTF_8));

            // Update the device state.
            int state = detEquState(msgData);
            jobDao.updateStateByMac(mac, state);

            // Keep the payload exactly as it was received.
            mongoTemplate.insert(message, "logger");

            // Save the per-sensor history rows, then the adjusted payload.
            jobDao.batchInsertHistory(buildHistories(mac, ver, msgData));
            mongoTemplate.insert(new_message, "data");
        } catch (Exception e) {
            // Listener boundary: report the failure with its stack trace.
            logger.error("Failed to process monitor message", e);
        }
    }

    /**
     * Returns the string form of a payload value.
     *
     * @param value the payload value, possibly {@code null}
     * @return {@code value.toString()}, or {@code null} when {@code value} is {@code null}
     */
    private static String textOf(Object value) {
        return value == null ? null : value.toString();
    }

    /**
     * Adds the calibration offsets stored for the device to the matching readings.
     *
     * <p>Readings that are absent or JSON {@code null}, and offsets that are
     * {@code null}, are left untouched.
     *
     * @param mac     the device identifier used to look up the offsets
     * @param msgData the decoded payload; adjusted values are written back into it
     */
    private void applyAdjustments(String mac, Map<String, Object> msgData) {
        Map<String, Float> adjustMap = redisService.getAdjustsByMac(mac);
        if (adjustMap == null || adjustMap.isEmpty()) {
            return;
        }
        for (Map.Entry<String, Float> entry : adjustMap.entrySet()) {
            String key = entry.getKey();
            Object reading = msgData.get(key);
            Float offset = entry.getValue();
            if (reading == null || offset == null) {
                continue;
            }
            Float dataValue = Float.valueOf(reading.toString());
            msgData.put(key, dataValue + offset);
        }
    }

    /**
     * Builds one history row for each sensor of the given version that has a reading
     * in the payload.
     *
     * @param mac     the device identifier
     * @param ver     the version used to look up the sensor definitions
     * @param msgData the (already adjusted) payload; its {@code time} field is parsed as
     *                a long and passed to {@link Date#Date(long)}
     * @return the history rows, possibly empty
     */
    private List<History> buildHistories(String mac, String ver, Map<String, Object> msgData) {
        List<History> histories = new ArrayList<>();
        List<Sensor> sensorList = taskDao.selectFromsensorByver(ver);
        for (Sensor sensor : sensorList) {
            String key = sensor.getMac_key();
            Object reading = msgData.get(key);
            if (reading == null) {
                // Absent or JSON-null reading: nothing to record for this sensor.
                continue;
            }
            History history = new History();
            history.setMac(mac);
            history.setMac_key(key);
            history.setTime(new Date(Long.parseLong(msgData.get("time").toString())));
            history.setSensor(sensor.getSensor());
            history.setMac_value(Double.valueOf(reading.toString()));
            histories.add(history);
        }
        return histories;
    }

    /**
     * Loads the alarm thresholds and converts each enabled entry to an array.
     *
     * <p>The configuration is read from Redis when the key exists and yields a value;
     * otherwise the bundled classpath file is used. Only entries whose {@code enable}
     * value equals 1 and that define {@code level1}, {@code level2} and {@code level3}
     * are returned.
     *
     * @return sensor key mapped to {@code {0.0, level1, level2, level3}}, so that the
     *         array index equals the alarm level
     * @throws IOException if the classpath configuration cannot be read
     */
    private Map<String, Double[]> getAlarmLevels() throws IOException {
        Map<String, Map<String, Double>> alarmLevleMap = null;
        if (RedisUtil.hasKey(redisTemplate, LEVEL_KEY)) {
            String levelConfigStr = RedisUtil.get(redisTemplate, LEVEL_KEY);
            alarmLevleMap = JSON.parseObject(
                    levelConfigStr, new TypeReference<Map<String, Map<String, Double>>>() {});
        }
        if (alarmLevleMap == null) {
            // No key, or the value was empty / gone by the time it was read.
            alarmLevleMap = readDefaultAlarmLevels();
        }

        Map<String, Double[]> alarmLevles = new HashMap<>();
        if (alarmLevleMap == null) {
            return alarmLevles;
        }
        for (Map.Entry<String, Map<String, Double>> entry : alarmLevleMap.entrySet()) {
            Map<String, Double> levels = entry.getValue();
            if (levels == null) {
                continue;
            }
            // Check the switch first so that disabled entries may omit their levels.
            Double enable = levels.get("enable");
            if (enable == null || enable != 1) {
                continue;
            }
            Double level1 = levels.get("level1");
            Double level2 = levels.get("level2");
            Double level3 = levels.get("level3");
            if (level1 == null || level2 == null || level3 == null) {
                continue;
            }
            alarmLevles.put(entry.getKey(), new Double[] {0.0, level1, level2, level3});
        }
        return alarmLevles;
    }

    /**
     * Reads the bundled threshold configuration from the classpath.
     *
     * @return the parsed configuration as returned by the JSON reader
     * @throws IOException if the resource cannot be opened or closed
     */
    private Map<String, Map<String, Double>> readDefaultAlarmLevels() throws IOException {
        org.springframework.core.io.Resource resource = new ClassPathResource(DEFAULT_LEVELS_PATH);
        // Closing the reader also closes the underlying resource stream.
        try (InputStreamReader reader =
                new InputStreamReader(resource.getInputStream(), StandardCharsets.UTF_8)) {
            JSONReader jsonReader = new JSONReader(reader);
            return jsonReader.readObject(
                    new TypeReference<Map<String, Map<String, Double>>>() {});
        }
    }

    /**
     * Determines the device state from the alarm thresholds and stores it in Redis.
     *
     * <p>For each configured sensor key present in {@code data}, the highest level whose
     * threshold the reading reaches is taken; the device state is the maximum over all
     * sensors (0 when none is reached). The map written through
     * {@link RedisService#setEquState} is a copy of {@code data} plus {@code state} and,
     * per reached level, a {@code levelN} list of the sensor keys at that level.
     *
     * @param data the decoded payload; must be non-null and contain {@code mac}
     * @return the device state, 0 to 3
     * @throws IOException if the threshold configuration cannot be loaded
     */
    private int detEquState(Map<String, Object> data) throws IOException {
        Map<String, Double[]> alarmLevles = getAlarmLevels();
        int state = 0;
        String mac = data.get("mac").toString();
        Map<String, Object> equMap = new HashMap<>(data);

        for (Map.Entry<String, Double[]> entry : alarmLevles.entrySet()) {
            String key = entry.getKey();
            // A device need not report every configured sensor.
            String value = textOf(data.get(key));
            if (StringUtils.isEmpty(value)) {
                continue;
            }
            double val = Double.parseDouble(value);
            Double[] arr = entry.getValue();
            // Walk from the most severe level down; index 0 is a placeholder.
            for (int index = arr.length - 1; index > 0; index--) {
                if (val >= arr[index]) {
                    // Keep the most severe level seen so far.
                    state = Math.max(state, index);
                    String e_key = "level" + Integer.toString(index);
                    Object e_States = equMap.get(e_key);
                    if (!(e_States instanceof List)) {
                        e_States = new ArrayList<String>();
                        equMap.put(e_key, e_States);
                    }
                    // Safe for lists created above; a list arriving in the payload under
                    // the same name is appended to as-is.
                    @SuppressWarnings("unchecked")
                    List<Object> keysAtLevel = (List<Object>) e_States;
                    keysAtLevel.add(key);
                    break;
                }
            }
        }

        equMap.put("state", state); // device state
        redisService.setEquState(mac, equMap);
        return state;
    }
}