package com.moral.monitor.listener;
|
|
import java.io.IOException;
|
import java.io.InputStreamReader;
|
import java.io.UnsupportedEncodingException;
|
import java.math.BigDecimal;
|
import java.text.SimpleDateFormat;
|
import java.util.ArrayList;
|
import java.util.Date;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
|
import javax.annotation.Resource;
|
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.core.MessageListener;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.core.io.ClassPathResource;
|
import org.springframework.data.mongodb.core.MongoTemplate;
|
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.util.StringUtils;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONReader;
|
import com.alibaba.fastjson.TypeReference;
|
import com.moral.monitor.dao.JobDao;
|
import com.moral.monitor.dao.TaskDao;
|
import com.moral.monitor.entity.History;
|
import com.moral.monitor.entity.Sensor;
|
import com.moral.monitor.service.RedisService;
|
import com.moral.monitor.util.RedisUtil;
|
|
public class TaskListener implements MessageListener {
|
|
@Resource
|
RabbitTemplate rabbitTemplate;
|
|
@Resource
|
JobDao jobDao;
|
|
@Resource
|
TaskDao taskDao;
|
|
@Resource
|
RedisTemplate<String, String> redisTemplate;
|
|
@Resource
|
protected MongoTemplate mongoTemplate;
|
|
private Logger logger = LoggerFactory.getLogger(TaskListener.class);
|
@Resource
|
RedisService redisService;
|
@Override
|
public void onMessage(Message msg) {
|
|
String message = null;
|
try {
|
message = new String(msg.getBody(), "utf-8");
|
} catch (UnsupportedEncodingException e) {
|
logger.warn(e.getMessage());
|
}
|
|
Map<String, String> msgData = JSON.parseObject(message, new TypeReference<Map<String, String>>() {});
|
String mac = msgData.get("mac");
|
String ver = msgData.get("ver");
|
|
if(StringUtils.isEmpty(ver) || StringUtils.isEmpty(mac)) {
|
return;
|
}
|
//获取缓存中的校准值map
|
Map<String, Float> adjustMap = redisService.getAdjustsByMac(mac);
|
if(!adjustMap.isEmpty()) {
|
for (Map.Entry<String, Float> entry : adjustMap.entrySet()) {
|
String key = entry.getKey();
|
if(msgData.containsKey(key)) {
|
Float value = entry.getValue();
|
Float dataValue = Float.valueOf(msgData.get(key)) ;
|
msgData.put(key, String.valueOf(dataValue + value));
|
}
|
}
|
}
|
|
// System.out.println(message);
|
// System.out.println(JSON.toJSONString(msgData));
|
|
String new_message = JSON.toJSONString(msgData);
|
rabbitTemplate.convertAndSend("monitors_data2", "", new_message.getBytes());
|
// rabbitTemplate.send("monitors_data2", "", new Message(JSON.toJSONString(msgData).getBytes(), new MessageProperties()));
|
|
// int state = (new Random()).nextInt(4) % 5; //TODO
|
int state = detEquState(msgData);
|
// rabbitTemplate.convertAndSend("monitors_alarm", "", "{\"mac\": \"" + mac + "\", \"state\": " + state + "}");
|
|
//更新设备状态
|
jobDao.updateStateByMac(mac, state);
|
|
// //保存原始数据
|
// taskDao.insertTologger(mac, message, getDate());
|
mongoTemplate.insert(message, "logger");
|
|
//保存历史记录
|
List<History> histories = new ArrayList<History>();
|
List<Sensor> sensorList = taskDao.selectFromsensorByver(ver);
|
for (Sensor sensor : sensorList) {
|
String key = sensor.getMac_key();
|
if(msgData.containsKey(key)) {
|
//taskDao.insertTohistory(mac, sensor.getSensor(), key, msgData.get(key));
|
History history = new History();
|
history.setMac(mac);
|
history.setMac_key(key);
|
history.setTime(new Date(Long.parseLong(msgData.get("time"))));
|
history.setSensor(sensor.getSensor());
|
history.setMac_value(Double.valueOf(msgData.get(key)));
|
histories.add(history);
|
}
|
}
|
jobDao.batchInsertHistory(histories);
|
|
//mongoTemplate.insert(JSON.toJSONString(histories), "data");
|
mongoTemplate.insert(new_message, "data");
|
}
|
//三级警报值阀值
|
private static Map<String,Double[]> alarmLevles =new HashMap<String, Double[]>();
|
/*
|
* 设置三级警报阀值,把map数据结构转换成数组结构
|
*/
|
@Value(value= "system/alarmLevels.json")
|
public void setAlarmLevles(String path) throws IOException {
|
org.springframework.core.io.Resource resource = new ClassPathResource(path);
|
InputStreamReader reader=new InputStreamReader(resource.getInputStream());
|
JSONReader jsonReader = new JSONReader(reader);
|
//三级警报值阀值
|
Map<String,Map<String,Double>> alarmLevleMap= jsonReader.readObject(new TypeReference<Map<String,Map<String,Double>>>(){});
|
//将map转换成数组
|
for(String key:alarmLevleMap.keySet()) {
|
Map<String,Double> levels = alarmLevleMap.get(key);
|
double level1 = levels.get("level1");
|
double level2 = levels.get("level2");
|
double level3 = levels.get("level3");
|
//当三个值都为零时,不加入阀值
|
if(levels.get("enable")!=null&&levels.get("enable")==1) {
|
//三级警报数组
|
Double arr[] = {0.0,level1,level2,level3};
|
alarmLevles.put(key, arr);
|
}
|
}
|
}
|
//根据三级警报阀值,确定设备状态
|
private int detEquState(Map<String,String> data) {
|
int state = 0;
|
Map<String, Object> equMap = new HashMap<String, Object>();
|
if(data!=null) {
|
for(String key:alarmLevles.keySet()) {
|
String value = data.get(key);
|
if(!StringUtils.isEmpty(value)) {
|
double val = Double.parseDouble(value);
|
//获取三级警报阀值
|
Double[] arr = alarmLevles.get(key);
|
for(int index = arr.length-1;index>0;index--) {
|
if(val>=arr[index]) {
|
//如果当前状态级别更高就取当前状态。否则保持之前状态
|
state = (index>state)?index:state;
|
String e_key = "level"+Integer.toString(index);
|
Object e_States = equMap.get(e_key);
|
if(e_States!=null&&e_States instanceof List) {
|
((List)e_States).add(key);
|
}else {
|
e_States = new ArrayList<String>();
|
equMap.put(e_key, e_States);
|
((List)e_States).add(key);
|
|
}
|
equMap.put(e_key, e_States);
|
break;
|
}
|
}
|
}
|
}
|
}
|
equMap.put("state", state);//设备状态
|
redisService.setEquState(data.get("mac"),equMap);
|
return state;
|
}
|
}
|