|  |  |  | 
|---|
|  |  |  | package com.moral.monitor.listener; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.alibaba.fastjson.JSON; | 
|---|
|  |  |  | import com.alibaba.fastjson.TypeReference; | 
|---|
|  |  |  | import com.moral.monitor.dao.JobDao; | 
|---|
|  |  |  | import com.moral.monitor.dao.TaskDao; | 
|---|
|  |  |  | import com.moral.monitor.entity.History; | 
|---|
|  |  |  | import com.moral.monitor.entity.Sensor; | 
|---|
|  |  |  | import com.moral.monitor.util.RedisUtil; | 
|---|
|  |  |  | import org.slf4j.Logger; | 
|---|
|  |  |  | import org.slf4j.LoggerFactory; | 
|---|
|  |  |  | import org.springframework.amqp.core.Message; | 
|---|
|  |  |  | import org.springframework.amqp.core.MessageListener; | 
|---|
|  |  |  | import org.springframework.amqp.core.MessageProperties; | 
|---|
|  |  |  | import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; | 
|---|
|  |  |  | import org.springframework.amqp.rabbit.connection.Connection; | 
|---|
|  |  |  | import org.springframework.amqp.rabbit.core.RabbitTemplate; | 
|---|
|  |  |  | import org.springframework.data.redis.core.RedisTemplate; | 
|---|
|  |  |  | import org.springframework.util.StringUtils; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import javax.annotation.Resource; | 
|---|
|  |  |  | import java.io.UnsupportedEncodingException; | 
|---|
|  |  |  | import java.math.BigDecimal; | 
|---|
|  |  |  | import java.text.SimpleDateFormat; | 
|---|
|  |  |  | import java.util.*; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | public class TaskListener implements MessageListener { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Resource | 
|---|
|  |  |  | RabbitTemplate rabbitTemplate; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Resource | 
|---|
|  |  |  | JobDao jobDao; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Resource | 
|---|
|  |  |  | TaskDao taskDao; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Resource | 
|---|
|  |  |  | RedisTemplate<String, String> redisTemplate; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private Logger logger = LoggerFactory.getLogger(TaskListener.class); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void onMessage(Message msg) { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | String message = null; | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | message = new String(msg.getBody(), "utf-8"); | 
|---|
|  |  |  | } catch (UnsupportedEncodingException e) { | 
|---|
|  |  |  | logger.warn(e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | Map<String, String> msgData = JSON.parseObject(message, new TypeReference<Map<String, String>>() {}); | 
|---|
|  |  |  | String mac = msgData.get("mac"); | 
|---|
|  |  |  | String ver = msgData.get("ver"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | if(StringUtils.isEmpty(ver) || StringUtils.isEmpty(mac)) { | 
|---|
|  |  |  | return; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | Map<String, Float> adjustMap; | 
|---|
|  |  |  | String adjust_key = "adjust_" + mac; | 
|---|
|  |  |  | if(RedisUtil.hasKey(redisTemplate, adjust_key)) { | 
|---|
|  |  |  | adjustMap = JSON.parseObject(RedisUtil.get(redisTemplate, adjust_key), new TypeReference<Map<String, Float>>() {}); | 
|---|
|  |  |  | } else { | 
|---|
|  |  |  | adjustMap = getAdjustData(mac); | 
|---|
|  |  |  | RedisUtil.set(redisTemplate, adjust_key, JSON.toJSONString(adjustMap)); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | if(!adjustMap.isEmpty()) { | 
|---|
|  |  |  | for (Map.Entry<String, Float> entry : adjustMap.entrySet()) { | 
|---|
|  |  |  | String key = entry.getKey(); | 
|---|
|  |  |  | if(msgData.containsKey(key)) { | 
|---|
|  |  |  | Float value = entry.getValue(); | 
|---|
|  |  |  | Float dataValue = Float.valueOf(msgData.get(key)) ; | 
|---|
|  |  |  | msgData.put(key, String.valueOf(dataValue + value)); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //        System.out.println(message); | 
|---|
|  |  |  | //        System.out.println(JSON.toJSONString(msgData)); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | rabbitTemplate.convertAndSend("monitors_data2", "", JSON.toJSONString(msgData).getBytes()); | 
|---|
|  |  |  | //        rabbitTemplate.send("monitors_data_3", "", new Message(JSON.toJSONString(msgData).getBytes(), new MessageProperties())); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | int state = (new Random()).nextInt(4) % 5; //TODO | 
|---|
|  |  |  | //        rabbitTemplate.convertAndSend("monitors_alarm", "", "{\"mac\": \"" + mac + "\", \"state\": " + state + "}"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //更新设备状态 | 
|---|
|  |  |  | jobDao.updateStateByMac(mac, state); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //保存原始数据 | 
|---|
|  |  |  | taskDao.insertTologger(mac, message, getDate()); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //保存历史记录 | 
|---|
|  |  |  | List<History> histories = new ArrayList<History>(); | 
|---|
|  |  |  | List<Sensor> sensorList = taskDao.selectFromsensorByver(ver); | 
|---|
|  |  |  | for (Sensor sensor : sensorList) { | 
|---|
|  |  |  | String key = sensor.getMac_key(); | 
|---|
|  |  |  | if(msgData.containsKey(key)) { | 
|---|
|  |  |  | //taskDao.insertTohistory(mac, sensor.getSensor(), key, msgData.get(key)); | 
|---|
|  |  |  | History history = new History(); | 
|---|
|  |  |  | history.setMac(mac); | 
|---|
|  |  |  | history.setMac_key(key); | 
|---|
|  |  |  | history.setSensor(sensor.getSensor()); | 
|---|
|  |  |  | history.setMac_value(Double.valueOf(msgData.get(key))); | 
|---|
|  |  |  | histories.add(history); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | jobDao.batchInsertHistory(histories); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private Map<String, Float> getAdjustData(String mac) { | 
|---|
|  |  |  | List<Map<String, String>> adjusts = jobDao.findAdjustByMac(mac); | 
|---|
|  |  |  | Map<String, Float> dataMap = new HashMap<String, Float>(); | 
|---|
|  |  |  | for (int i = 0; i < adjusts.size(); i++) { | 
|---|
|  |  |  | Map adjust = adjusts.get(i); | 
|---|
|  |  |  | if(adjust.get("value") != null) { | 
|---|
|  |  |  | String key = String.valueOf(adjust.get("key")); | 
|---|
|  |  |  | String data = String.valueOf(adjust.get("value")); | 
|---|
|  |  |  | BigDecimal value = new BigDecimal(data); | 
|---|
|  |  |  | value.setScale(3, BigDecimal.ROUND_HALF_UP); | 
|---|
|  |  |  | dataMap.put(key, value.floatValue()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | return dataMap; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private String getDate() { | 
|---|
|  |  |  | Date d = new Date(); | 
|---|
|  |  |  | SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); | 
|---|
|  |  |  | return formatter.format(d); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | package com.moral.monitor.listener; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import java.io.IOException; | 
|---|
|  |  |  | import java.io.InputStreamReader; | 
|---|
|  |  |  | import java.io.UnsupportedEncodingException; | 
|---|
|  |  |  | import java.math.BigDecimal; | 
|---|
|  |  |  | import java.text.SimpleDateFormat; | 
|---|
|  |  |  | import java.util.ArrayList; | 
|---|
|  |  |  | import java.util.Date; | 
|---|
|  |  |  | import java.util.HashMap; | 
|---|
|  |  |  | import java.util.List; | 
|---|
|  |  |  | import java.util.Map; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import javax.annotation.Resource; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import org.slf4j.Logger; | 
|---|
|  |  |  | import org.slf4j.LoggerFactory; | 
|---|
|  |  |  | import org.springframework.amqp.core.Message; | 
|---|
|  |  |  | import org.springframework.amqp.core.MessageListener; | 
|---|
|  |  |  | import org.springframework.amqp.rabbit.core.RabbitTemplate; | 
|---|
|  |  |  | import org.springframework.beans.factory.annotation.Value; | 
|---|
|  |  |  | import org.springframework.core.io.ClassPathResource; | 
|---|
|  |  |  | import org.springframework.data.redis.core.RedisTemplate; | 
|---|
|  |  |  | import org.springframework.util.StringUtils; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | import com.alibaba.fastjson.JSON; | 
|---|
|  |  |  | import com.alibaba.fastjson.JSONReader; | 
|---|
|  |  |  | import com.alibaba.fastjson.TypeReference; | 
|---|
|  |  |  | import com.moral.monitor.dao.JobDao; | 
|---|
|  |  |  | import com.moral.monitor.dao.TaskDao; | 
|---|
|  |  |  | import com.moral.monitor.entity.History; | 
|---|
|  |  |  | import com.moral.monitor.entity.Sensor; | 
|---|
|  |  |  | import com.moral.monitor.util.RedisUtil; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | public class TaskListener implements MessageListener { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Resource | 
|---|
|  |  |  | RabbitTemplate rabbitTemplate; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Resource | 
|---|
|  |  |  | JobDao jobDao; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Resource | 
|---|
|  |  |  | TaskDao taskDao; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Resource | 
|---|
|  |  |  | RedisTemplate<String, String> redisTemplate; | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private Logger logger = LoggerFactory.getLogger(TaskListener.class); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | @Override | 
|---|
|  |  |  | public void onMessage(Message msg) { | 
|---|
|  |  |  |  | 
|---|
|  |  |  | String message = null; | 
|---|
|  |  |  | try { | 
|---|
|  |  |  | message = new String(msg.getBody(), "utf-8"); | 
|---|
|  |  |  | } catch (UnsupportedEncodingException e) { | 
|---|
|  |  |  | logger.warn(e.getMessage()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | Map<String, String> msgData = JSON.parseObject(message, new TypeReference<Map<String, String>>() {}); | 
|---|
|  |  |  | String mac = msgData.get("mac"); | 
|---|
|  |  |  | String ver = msgData.get("ver"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | if(StringUtils.isEmpty(ver) || StringUtils.isEmpty(mac)) { | 
|---|
|  |  |  | return; | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | Map<String, Float> adjustMap; | 
|---|
|  |  |  | String adjust_key = "adjust_" + mac; | 
|---|
|  |  |  | if(RedisUtil.hasKey(redisTemplate, adjust_key)) { | 
|---|
|  |  |  | adjustMap = JSON.parseObject(RedisUtil.get(redisTemplate, adjust_key), new TypeReference<Map<String, Float>>() {}); | 
|---|
|  |  |  | } else { | 
|---|
|  |  |  | adjustMap = getAdjustData(mac); | 
|---|
|  |  |  | RedisUtil.set(redisTemplate, adjust_key, JSON.toJSONString(adjustMap)); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | if(!adjustMap.isEmpty()) { | 
|---|
|  |  |  | for (Map.Entry<String, Float> entry : adjustMap.entrySet()) { | 
|---|
|  |  |  | String key = entry.getKey(); | 
|---|
|  |  |  | if(msgData.containsKey(key)) { | 
|---|
|  |  |  | Float value = entry.getValue(); | 
|---|
|  |  |  | Float dataValue = Float.valueOf(msgData.get(key)) ; | 
|---|
|  |  |  | msgData.put(key, String.valueOf(dataValue + value)); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //        System.out.println(message); | 
|---|
|  |  |  | //        System.out.println(JSON.toJSONString(msgData)); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | rabbitTemplate.convertAndSend("monitors_data2", "", JSON.toJSONString(msgData).getBytes()); | 
|---|
|  |  |  | //        rabbitTemplate.send("monitors_data_3", "", new Message(JSON.toJSONString(msgData).getBytes(), new MessageProperties())); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //        int state = (new Random()).nextInt(4) % 5; //TODO | 
|---|
|  |  |  | int state = detEquState(msgData); | 
|---|
|  |  |  | //        rabbitTemplate.convertAndSend("monitors_alarm", "", "{\"mac\": \"" + mac + "\", \"state\": " + state + "}"); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //更新设备状态 | 
|---|
|  |  |  | jobDao.updateStateByMac(mac, state); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //保存原始数据 | 
|---|
|  |  |  | taskDao.insertTologger(mac, message, getDate()); | 
|---|
|  |  |  |  | 
|---|
|  |  |  | //保存历史记录 | 
|---|
|  |  |  | List<History> histories = new ArrayList<History>(); | 
|---|
|  |  |  | List<Sensor> sensorList = taskDao.selectFromsensorByver(ver); | 
|---|
|  |  |  | for (Sensor sensor : sensorList) { | 
|---|
|  |  |  | String key = sensor.getMac_key(); | 
|---|
|  |  |  | if(msgData.containsKey(key)) { | 
|---|
|  |  |  | //taskDao.insertTohistory(mac, sensor.getSensor(), key, msgData.get(key)); | 
|---|
|  |  |  | History history = new History(); | 
|---|
|  |  |  | history.setMac(mac); | 
|---|
|  |  |  | history.setMac_key(key); | 
|---|
|  |  |  | history.setSensor(sensor.getSensor()); | 
|---|
|  |  |  | history.setMac_value(Double.valueOf(msgData.get(key))); | 
|---|
|  |  |  | histories.add(history); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | jobDao.batchInsertHistory(histories); | 
|---|
|  |  |  | } | 
|---|
|  |  |  |  | 
|---|
|  |  |  | private Map<String, Float> getAdjustData(String mac) { | 
|---|
|  |  |  | List<Map<String, String>> adjusts = jobDao.findAdjustByMac(mac); | 
|---|
|  |  |  | Map<String, Float> dataMap = new HashMap<String, Float>(); | 
|---|
|  |  |  | for (int i = 0; i < adjusts.size(); i++) { | 
|---|
|  |  |  | Map adjust = adjusts.get(i); | 
|---|
|  |  |  | if(adjust.get("value") != null) { | 
|---|
|  |  |  | String key = String.valueOf(adjust.get("key")); | 
|---|
|  |  |  | String data = String.valueOf(adjust.get("value")); | 
|---|
|  |  |  | BigDecimal value = new BigDecimal(data); | 
|---|
|  |  |  | value.setScale(3, BigDecimal.ROUND_HALF_UP); | 
|---|
|  |  |  | dataMap.put(key, value.floatValue()); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | return dataMap; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | //三级警报值阀值 | 
|---|
|  |  |  | private static Map<String,Double[]> alarmLevles =new HashMap<String, Double[]>(); | 
|---|
|  |  |  | @Value(value="alarmLevles.json") | 
|---|
|  |  |  | public void setAlarmLevles(String path) throws IOException { | 
|---|
|  |  |  | org.springframework.core.io.Resource resource  = new ClassPathResource(path); | 
|---|
|  |  |  | InputStreamReader reader=new InputStreamReader(resource.getInputStream()); | 
|---|
|  |  |  | JSONReader jsonReader = new JSONReader(reader); | 
|---|
|  |  |  | //三级警报值阀值 | 
|---|
|  |  |  | Map<String,Map<String,Double>> alarmLevleMap= jsonReader.readObject(new TypeReference<Map<String,Map<String,Double>>>(){}); | 
|---|
|  |  |  | //将map转换成数组 | 
|---|
|  |  |  | for(String key:alarmLevleMap.keySet()) { | 
|---|
|  |  |  | Map<String,Double> levels = alarmLevleMap.get(key); | 
|---|
|  |  |  | double level1 = levels.get("level1"); | 
|---|
|  |  |  | double level2 = levels.get("level2"); | 
|---|
|  |  |  | double level3 = levels.get("level3"); | 
|---|
|  |  |  | //三级警报数组 | 
|---|
|  |  |  | Double arr[] = {0.0,level1,level2,level3}; | 
|---|
|  |  |  | alarmLevles.put(key, arr); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | //根据三级警报阀值,确定设备状态 | 
|---|
|  |  |  | private int detEquState(Map<String,String> data) { | 
|---|
|  |  |  | int state = 0; | 
|---|
|  |  |  | if(data!=null) { | 
|---|
|  |  |  | for(String key:alarmLevles.keySet()) { | 
|---|
|  |  |  | String value = data.get(key); | 
|---|
|  |  |  | if(!StringUtils.isEmpty(value)) { | 
|---|
|  |  |  | double val = Double.parseDouble(value); | 
|---|
|  |  |  | //获取三级警报阀值 | 
|---|
|  |  |  | Double[] arr = alarmLevles.get(key); | 
|---|
|  |  |  | for(int index = arr.length-1;index>0;index--) { | 
|---|
|  |  |  | if(val>=arr[index]) { | 
|---|
|  |  |  | //如果当前状态级别更高就取当前状态。否则保持之前状态 | 
|---|
|  |  |  | state = (index>state)?index:state; | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | //当前状态到达最大级别不再遍历,跳出循环 | 
|---|
|  |  |  | if(state == (arr.length-1)) { | 
|---|
|  |  |  | break; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|
|  |  |  | return state; | 
|---|
|  |  |  | } | 
|---|
|  |  |  | private String getDate() { | 
|---|
|  |  |  | Date d = new Date(); | 
|---|
|  |  |  | SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); | 
|---|
|  |  |  | return formatter.format(d); | 
|---|
|  |  |  | } | 
|---|
|  |  |  | } | 
|---|