package com.moral.task;
|
|
import com.alibaba.fastjson.JSON;
|
import com.moral.service.*;
|
import com.moral.util.AlarmUtils_2;
|
import com.xxl.job.core.biz.model.ReturnT;
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
import com.xxl.job.core.log.XxlJobLogger;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.amqp.core.TopicExchange;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.stereotype.Component;
|
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.ObjectUtils;
|
|
import javax.annotation.Resource;
|
import java.time.LocalDateTime;
|
import java.time.format.DateTimeFormatter;
|
import java.time.temporal.ChronoUnit;
|
import java.util.*;
|
import java.util.stream.Collectors;
|
|
@Component
|
public class RabbitMQInsertJinTanTask {
|
private static transient Logger logger = LoggerFactory.getLogger(RabbitMQInsertJinTanTask.class);
|
|
@Resource
|
private DeviceService deviceService;
|
|
@Resource
|
private SensorService sensorService;
|
|
@Resource
|
private HistoryMinutelyService historyMinutelyService;
|
|
@Resource
|
private HistoryHourlyService historyHourlyService;
|
|
@Resource
|
private HistoryDailyService historyDailyService;
|
|
@Resource
|
private OrganizationRelationService organizationRelationService;
|
|
@Resource
|
private RabbitTemplate rabbitTemplate;
|
|
@Resource
|
@Qualifier("organization_data")
|
private TopicExchange organization_data;
|
|
//实时数据,时间间隔1分钟
|
@XxlJob("RabbitMQJintanRealtimeMinutely")
|
public ReturnT insertRabbitMQRealtimeMinutely(String params) {
|
LocalDateTime time = LocalDateTime.now();
|
int year = time.getYear();
|
int month = time.getMonthValue();
|
int day = time.getDayOfMonth();
|
int hour = time.getHour();
|
int minute = time.getMinute();
|
if (day == 1) {
|
if (hour == 0 && minute == 0) {
|
if (month == 1) {
|
month = 12;
|
year = year - 1;
|
} else {
|
month = month - 1;
|
}
|
}
|
}
|
String monthStr = month < 10 ? ("0" + month) : month + "";
|
String yearAndMonth = year + monthStr;
|
Map organizationIdMap = JSON.parseObject(params);
|
List<Integer> parentIdList = (List<Integer>) organizationIdMap.get("orgId");
|
LocalDateTime endTime = time.truncatedTo(ChronoUnit.MINUTES);
|
LocalDateTime startTime = endTime.minusMinutes(1);
|
List<Object> organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList);
|
try {
|
List<String> macList = deviceService.getMacByOrganizationid(organizationIdList);
|
List<String> sensorKeys = sensorService.getSensorKeyByMac(macList);
|
List<Map<String, Object>> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList);
|
Map<String, Object> kv = new LinkedHashMap<>();
|
for (Map<String, Object> map : macAndOrganizationIdMap) {
|
kv.put(map.get("mac").toString(), map.get("organizationId"));
|
}
|
Map<String, Object> devices = new HashMap<>();
|
devices.put("sensorKeys", sensorKeys);
|
devices.put("start", startTime);
|
devices.put("end", endTime);
|
devices.put("macs", macList);
|
devices.put("yearAndMonth", yearAndMonth);
|
List<Map<String, Object>> realtimeData = historyMinutelyService.getMinutelySensorData(devices);
|
XxlJobLogger.log("RabbitMQRealtimeMinutelyData:" + realtimeData.size());
|
if (!CollectionUtils.isEmpty(realtimeData)) {
|
for (Map<String, Object> deviceData : realtimeData) {
|
if (!ObjectUtils.isEmpty(deviceData)) {
|
Iterator<String> iterator = deviceData.keySet().iterator();
|
while (iterator.hasNext()) {
|
String key = iterator.next();
|
if (key.startsWith("M")) {
|
iterator.remove();
|
}
|
}
|
deviceData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
|
deviceData.put("timeType", "realtime");
|
String mac = deviceData.get("mac").toString();
|
Object o1 = kv.get(mac);
|
List<Integer> parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1);
|
Map organizationIdMapNew = JSON.parseObject(params);
|
List<Integer> parentIdListNew = (List<Integer>) organizationIdMapNew.get("orgId");
|
List<Integer> intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList());
|
intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), "jintan" + "." + mac, JSON.toJSONString(deviceData)));
|
}
|
}
|
ReturnT returnT = new ReturnT(200, "RabbitMQ分钟实时数据存入成功");
|
return returnT;
|
}
|
} catch (Exception e) {
|
XxlJobLogger.log("RabbitMQRealtimeMinutelyException:" + e.getMessage());
|
logger.error(e.getMessage());
|
e.printStackTrace();
|
}
|
ReturnT returnT = new ReturnT(500, "RabbitMQ分钟实时数据存入失败");
|
return returnT;
|
}
|
|
//分钟数据,时间间隔一分钟
|
@XxlJob("RabbitMQJintanMinutely")
|
public ReturnT insertRabbitMQMinutely(String params) {
|
LocalDateTime time = LocalDateTime.now();
|
int year = time.getYear();
|
int month = time.getMonthValue();
|
int day = time.getDayOfMonth();
|
int hour = time.getHour();
|
int minute = time.getMinute();
|
if (day == 1) {
|
if (hour == 0 && minute == 0) {
|
if (month == 1) {
|
month = 12;
|
year = year - 1;
|
} else {
|
month = month - 1;
|
}
|
}
|
}
|
String monthStr = month < 10 ? ("0" + month) : month + "";
|
String yearAndMonth = year + monthStr;
|
Map organizationIdMap = JSON.parseObject(params);
|
List<Integer> parentIdList = (List<Integer>) organizationIdMap.get("orgId");
|
LocalDateTime endTime = time.truncatedTo(ChronoUnit.MINUTES);
|
LocalDateTime startTime = endTime.minusMinutes(1);
|
List<Object> organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList);
|
try {
|
List<String> macList = deviceService.getMacByOrganizationid(organizationIdList);
|
List<String> sensorKeys = sensorService.getSensorKeyByMac(macList);
|
List<Map<String, Object>> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList);
|
Map<String, Object> kv = new LinkedHashMap<>();
|
for (Map<String, Object> map : macAndOrganizationIdMap) {
|
kv.put(map.get("mac").toString(), map.get("organizationId"));
|
}
|
Map<String, Object> devices = new HashMap<>();
|
devices.put("sensorKeys", sensorKeys);
|
devices.put("start", startTime);
|
devices.put("end", endTime);
|
devices.put("macs", macList);
|
devices.put("yearAndMonth", yearAndMonth);
|
List<Map<String, Object>> minutelyData = historyMinutelyService.getMinutelySensorData(devices);
|
XxlJobLogger.log("RabbitMQMinutelyData:" + minutelyData.size());
|
if (!CollectionUtils.isEmpty(minutelyData)) {
|
for (Map<String, Object> deviceData : minutelyData) {
|
if (!ObjectUtils.isEmpty(deviceData)) {
|
Iterator<String> iterator = deviceData.keySet().iterator();
|
while (iterator.hasNext()) {
|
String key = iterator.next();
|
if (key.startsWith("M")) {
|
iterator.remove();
|
}
|
}
|
deviceData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
|
deviceData.put("timeType", "minutely");
|
String mac = deviceData.get("mac").toString();
|
Object o1 = kv.get(mac);
|
List<Integer> parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1);
|
Map organizationIdMapNew = JSON.parseObject(params);
|
List<Integer> parentIdListNew = (List<Integer>) organizationIdMapNew.get("orgId");
|
List<Integer> intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList());
|
intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), "jintan" + "." + mac, JSON.toJSONString(deviceData)));
|
}
|
}
|
ReturnT returnT = new ReturnT(200, "RabbitMQ分钟实时数据存入成功");
|
return returnT;
|
}
|
} catch (Exception e) {
|
XxlJobLogger.log("RabbitMQRealtimeMinutelyException:" + e.getMessage());
|
logger.error(e.getMessage());
|
e.printStackTrace();
|
}
|
ReturnT returnT = new ReturnT(500, "RabbitMQ分钟实时数据存入失败");
|
return returnT;
|
}
|
|
@XxlJob("RabbitMQJintanHourly")
|
public ReturnT insertRabbitMQHourly(String params) {
|
LocalDateTime time = LocalDateTime.now();
|
Map organizationIdMap = JSON.parseObject(params);
|
List<Integer> parentIdList = (List<Integer>) organizationIdMap.get("orgId");
|
LocalDateTime endTime = time.truncatedTo(ChronoUnit.HOURS);
|
LocalDateTime startTime = endTime.minusHours(1);
|
List<Object> organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList);
|
try {
|
List<String> macList = deviceService.getMacByOrganizationid(organizationIdList);
|
List<String> sensorKeys = sensorService.getSensorKeyByMac(macList);
|
List<Map<String, Object>> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList);
|
Map<String, Object> kv = new LinkedHashMap<>();
|
for (Map<String, Object> map : macAndOrganizationIdMap) {
|
kv.put(map.get("mac").toString(), map.get("organizationId"));
|
}
|
Map<String, Object> devices = new HashMap<>();
|
devices.put("sensorKeys", sensorKeys);
|
devices.put("start", startTime);
|
devices.put("end", endTime);
|
devices.put("macs", macList);
|
List<Map<String, Object>> hourlyDataList = historyHourlyService.getHourlySensorData(devices);
|
XxlJobLogger.log("RabbitMQHourlyData:" + hourlyDataList.size());
|
if (!CollectionUtils.isEmpty(hourlyDataList)) {
|
if(!(hourlyDataList.size()==macList.size())){
|
List<String> macList_copy = macList;
|
if (hourlyDataList.size()<macList.size()){
|
for (Map hourData: hourlyDataList) {
|
for (int k=0;k<macList_copy.size();k++){
|
if (hourData.get("mac").equals(macList_copy.get(k))){
|
macList_copy.remove(k);
|
}
|
}
|
}
|
}
|
List<String> ignoreMacList = new ArrayList();
|
ignoreMacList.add("p5dnd7a0392130");
|
for (String ignoreMac:ignoreMacList) {
|
for (int d=0;d<macList_copy.size();d++){
|
if (macList_copy.get(d).equals(ignoreMac)){
|
macList_copy.remove(d);
|
}
|
}
|
}
|
if (macList_copy.size()!=0){
|
List<Map<String, Object>> deviceList = deviceService.getAllByMacList(macList_copy);
|
List<String> de = new ArrayList<>();
|
for (Map deviceMap:deviceList) {
|
de.add(deviceMap.get("name").toString());
|
}
|
AlarmUtils_2.sendMail("276999030@qq.com,1vv_zkk6ji3kln@dingtalk.com,liumiao_love@126.com","离线警报",de.toString()+"设备或许离线!");
|
}
|
}
|
for (Map<String, Object> deviceData : hourlyDataList) {
|
if (!ObjectUtils.isEmpty(deviceData)) {
|
Map<String, Object> hourlyData = new LinkedHashMap<>();
|
hourlyData.put("mac", deviceData.get("mac"));
|
hourlyData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
|
hourlyData.put("timeType", "hourly");
|
for (String key : deviceData.keySet()) {
|
if (!key.equals("mac") && !key.startsWith("M")) {
|
String date = deviceData.get(key).toString() + "," + deviceData.get("MIN" + key).toString() + "," + deviceData.get("MAX" + key).toString();
|
hourlyData.put(key, date);
|
}
|
}
|
String mac = hourlyData.get("mac").toString();
|
Object o1 = kv.get(mac);
|
List<Integer> parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1);
|
Map organizationIdMapNew = JSON.parseObject(params);
|
List<Integer> parentIdListNew = (List<Integer>) organizationIdMapNew.get("orgId");
|
List<Integer> intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList());
|
intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), "jintan" + "." + mac, JSON.toJSONString(hourlyData)));
|
}
|
}
|
ReturnT returnT = new ReturnT(200, "RabbitMQ小时数据存入成功");
|
return returnT;
|
}
|
} catch (Exception e) {
|
XxlJobLogger.log("RabbitMQHourlyException:" + e.getMessage());
|
logger.error(e.getMessage());
|
e.printStackTrace();
|
}
|
ReturnT returnT = new ReturnT(500, "RabbitMQ小时数据存入失败");
|
return returnT;
|
}
|
|
@XxlJob("RabbitMQJintanDaily")
|
public ReturnT insertRabbitMQDaily(String params) {
|
LocalDateTime time = LocalDateTime.now();
|
Map organizationIdMap = JSON.parseObject(params);
|
List<Integer> parentIdList = (List<Integer>) organizationIdMap.get("orgId");
|
LocalDateTime endTime = time.truncatedTo(ChronoUnit.DAYS);
|
LocalDateTime startTime = endTime.minusDays(1);
|
List<Object> organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList);
|
try {
|
List<String> macList = deviceService.getMacByOrganizationid(organizationIdList);
|
List<String> sensorKeys = sensorService.getSensorKeyByMac(macList);
|
List<Map<String, Object>> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList);
|
Map<String, Object> kv = new LinkedHashMap<>();
|
for (Map<String, Object> map : macAndOrganizationIdMap) {
|
kv.put(map.get("mac").toString(), map.get("organizationId"));
|
}
|
Map<String, Object> devices = new HashMap<>();
|
devices.put("sensorKeys", sensorKeys);
|
devices.put("start", startTime);
|
devices.put("end", endTime);
|
devices.put("macs", macList);
|
List<Map<String, Object>> dailyDataList = historyDailyService.getDailySensorData(devices);
|
XxlJobLogger.log("RabbitMQDailyData:" + dailyDataList.size());
|
if (!CollectionUtils.isEmpty(dailyDataList)) {
|
for (Map<String, Object> deviceData : dailyDataList) {
|
if (!ObjectUtils.isEmpty(deviceData)) {
|
Map<String, Object> dailyData = new LinkedHashMap<>();
|
dailyData.put("mac", deviceData.get("mac"));
|
dailyData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
|
dailyData.put("timeType", "daily");
|
for (String key : deviceData.keySet()) {
|
if (!key.equals("mac") && !key.startsWith("M")) {
|
String date = deviceData.get(key).toString() + "," + deviceData.get("MIN" + key).toString() + "," + deviceData.get("MAX" + key).toString();
|
dailyData.put(key, date);
|
}
|
}
|
String mac = dailyData.get("mac").toString();
|
Object o1 = kv.get(mac);
|
List<Integer> parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1);
|
Map organizationIdMapNew = JSON.parseObject(params);
|
List<Integer> parentIdListNew = (List<Integer>) organizationIdMapNew.get("orgId");
|
List<Integer> intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList());
|
intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), "jintan" + "." + mac, JSON.toJSONString(dailyData)));
|
}
|
}
|
ReturnT returnT = new ReturnT(200, "RabbitMQ天数据存入成功");
|
return returnT;
|
}
|
} catch (Exception e) {
|
XxlJobLogger.log("RabbitMQDailyException:" + e.getMessage());
|
logger.error(e.getMessage());
|
e.printStackTrace();
|
}
|
ReturnT returnT = new ReturnT(500, "RabbitMQ天数据存入失败");
|
return returnT;
|
}
|
|
}
|