package com.moral.task; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.*; import java.util.stream.Collectors; import javax.annotation.Resource; import com.moral.util.AlarmUtils_2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; import com.alibaba.fastjson.JSON; import com.moral.service.DeviceService; import com.moral.service.HistoryDailyService; import com.moral.service.HistoryHourlyService; import com.moral.service.HistoryMinutelyService; import com.moral.service.OrganizationRelationService; import com.moral.service.SensorService; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.log.XxlJobLogger; @Component public class RabbitMQInsertTask { private static transient Logger logger = LoggerFactory.getLogger(RabbitMQInsertTask.class); @Resource private DeviceService deviceService; @Resource private SensorService sensorService; @Resource private HistoryMinutelyService historyMinutelyService; @Resource private HistoryHourlyService historyHourlyService; @Resource private HistoryDailyService historyDailyService; @Resource private OrganizationRelationService organizationRelationService; @Resource private RabbitTemplate rabbitTemplate; @Resource @Qualifier("organization_data") private TopicExchange organization_data; @XxlJob("RabbitMQRealtimeMinutely") public ReturnT insertRabbitMQRealtimeMinutely(String params) { LocalDateTime time = LocalDateTime.now(); int year = time.getYear(); int month = time.getMonthValue(); int day = time.getDayOfMonth(); int hour = time.getHour(); int minute = time.getMinute(); if (day == 1) { if (hour == 0 && minute == 0) { if (month == 1) { month = 12; year = year - 1; } else { month = month - 1; } } } String monthStr = month < 10 ? ("0" + month) : month + ""; String yearAndMonth = year + monthStr; Map organizationIdMap = JSON.parseObject(params); List parentIdList = (List) organizationIdMap.get("orgId"); LocalDateTime endTime = time.truncatedTo(ChronoUnit.MINUTES); LocalDateTime startTime = endTime.minusMinutes(1); List organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList); try { List macList = deviceService.getMacByOrganizationid(organizationIdList); List sensorKeys = sensorService.getSensorKeyByMac(macList); List> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList); Map kv = new LinkedHashMap<>(); for (Map map : macAndOrganizationIdMap) { kv.put(map.get("mac").toString(), map.get("organizationId")); } Map devices = new HashMap<>(); devices.put("sensorKeys", sensorKeys); devices.put("start", startTime); devices.put("end", endTime); devices.put("macs", macList); devices.put("yearAndMonth", yearAndMonth); List> realtimeData = historyMinutelyService.getMinutelySensorData(devices); XxlJobLogger.log("RabbitMQRealtimeMinutelyData:" + realtimeData.size()); if (!CollectionUtils.isEmpty(realtimeData)) { for (Map deviceData : realtimeData) { if (!ObjectUtils.isEmpty(deviceData)) { Iterator iterator = deviceData.keySet().iterator(); while (iterator.hasNext()) { String key = iterator.next(); if (key.startsWith("M")) { iterator.remove(); } } deviceData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); deviceData.put("timeType", "realtime"); String mac = deviceData.get("mac").toString(); Object o1 = kv.get(mac); List parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1); Map organizationIdMapNew = JSON.parseObject(params); List parentIdListNew = (List) organizationIdMapNew.get("orgId"); List intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList()); intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), parentId + "." + mac, JSON.toJSONString(deviceData))); } } ReturnT returnT = new ReturnT(200, "RabbitMQ分钟实时数据存入成功"); return returnT; } } catch (Exception e) { XxlJobLogger.log("RabbitMQRealtimeMinutelyException:" + e.getMessage()); logger.error(e.getMessage()); e.printStackTrace(); } ReturnT returnT = new ReturnT(500, "RabbitMQ分钟实时数据存入失败"); return returnT; } @XxlJob("RabbitMQRealtimeEveryFiveMinutes") public ReturnT insertRabbitMQRealtimeEveryFiveMinutes(String params) { LocalDateTime time = LocalDateTime.now(); int year = time.getYear(); int month = time.getMonthValue(); int day = time.getDayOfMonth(); int hour = time.getHour(); int minute = time.getMinute(); if (day == 1) { if (hour == 0 && minute == 0) { if (month == 1) { month = 12; year = year - 1; } else { month = month - 1; } } } String monthStr = month < 10 ? ("0" + month) : month + ""; String yearAndMonth = year + monthStr; Map organizationIdMap = JSON.parseObject(params); List parentIdList = (List) organizationIdMap.get("orgId"); LocalDateTime endTime = time.truncatedTo(ChronoUnit.MINUTES); LocalDateTime startTime = endTime.minusMinutes(5); List organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList); try { List macList = deviceService.getMacByOrganizationid(organizationIdList); List sensorKeys = sensorService.getSensorKeyByMac(macList); List> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList); Map kv = new LinkedHashMap<>(); for (Map map : macAndOrganizationIdMap) { kv.put(map.get("mac").toString(), map.get("organizationId")); } Map devices = new HashMap<>(); devices.put("sensorKeys", sensorKeys); devices.put("start", startTime); devices.put("end", endTime); devices.put("macs", macList); devices.put("yearAndMonth", yearAndMonth); List> realtimeData = historyMinutelyService.getMinutelySensorData(devices); XxlJobLogger.log("RabbitMQRealtimeEveryFiveMinutesData:" + realtimeData.size()); if (!CollectionUtils.isEmpty(realtimeData)) { for (Map deviceData : realtimeData) { if (!ObjectUtils.isEmpty(deviceData)) { Iterator iterator = deviceData.keySet().iterator(); while (iterator.hasNext()) { String key = iterator.next(); if (key.startsWith("M")) { iterator.remove(); } } deviceData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); deviceData.put("timeType", "realtime"); String mac = deviceData.get("mac").toString(); Object o1 = kv.get(mac); List parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1); Map organizationIdMapNew = JSON.parseObject(params); List parentIdListNew = (List) organizationIdMapNew.get("orgId"); List intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList()); intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), parentId + "." + mac, JSON.toJSONString(deviceData))); } } ReturnT returnT = new ReturnT(200, "RabbitMQ每5分钟实时数据存入成功"); return returnT; } } catch (Exception e) { XxlJobLogger.log("RabbitMQRealtimeEveryFiveMinutesException:" + e.getMessage()); logger.error(e.getMessage()); e.printStackTrace(); } ReturnT returnT = new ReturnT(500, "RabbitMQ每5分钟实时数据存入失败"); return returnT; } @XxlJob("RabbitMQMinutely") public ReturnT insertRabbitMQMinutely(String params) { LocalDateTime time = LocalDateTime.now(); int year = time.getYear(); int month = time.getMonthValue(); int day = time.getDayOfMonth(); int hour = time.getHour(); int minute = time.getMinute(); if (day == 1) { if (hour == 0 && minute == 0) { if (month == 1) { month = 12; year = year - 1; } else { month = month - 1; } } } String monthStr = month < 10 ? ("0" + month) : month + ""; String yearAndMonth = year + monthStr; Map organizationIdMap = JSON.parseObject(params); List parentIdList = (List) organizationIdMap.get("orgId"); LocalDateTime endTime = time.truncatedTo(ChronoUnit.MINUTES); LocalDateTime startTime = endTime.minusMinutes(1); List organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList); try { List macList = deviceService.getMacByOrganizationid(organizationIdList); List sensorKeys = sensorService.getSensorKeyByMac(macList); List> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList); Map kv = new LinkedHashMap<>(); for (Map map : macAndOrganizationIdMap) { kv.put(map.get("mac").toString(), map.get("organizationId")); } Map devices = new HashMap<>(); devices.put("sensorKeys", sensorKeys); devices.put("start", startTime); devices.put("end", endTime); devices.put("macs", macList); devices.put("yearAndMonth", yearAndMonth); List> minutelyDataList = historyMinutelyService.getMinutelySensorData(devices); XxlJobLogger.log("RabbitMQMinutelyData:" + minutelyDataList.size()); if (!CollectionUtils.isEmpty(minutelyDataList)) { for (Map deviceData : minutelyDataList) { if (!ObjectUtils.isEmpty(deviceData)) { Map minutelyData = new LinkedHashMap<>(); minutelyData.put("mac", deviceData.get("mac")); minutelyData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); minutelyData.put("timeType", "minutely"); for (String key : deviceData.keySet()) { if (!key.equals("mac") && !key.startsWith("M")) { String date = deviceData.get(key).toString() + "," + deviceData.get("MIN" + key).toString() + "," + deviceData.get("MAX" + key).toString(); minutelyData.put(key, date); } } String mac = minutelyData.get("mac").toString(); Object o1 = kv.get(mac); List parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1); Map organizationIdMapNew = JSON.parseObject(params); List parentIdListNew = (List) organizationIdMapNew.get("orgId"); List intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList()); intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), parentId + "." + mac, JSON.toJSONString(minutelyData))); } } ReturnT returnT = new ReturnT(200, "RabbitMQ分钟数据存入成功"); return returnT; } } catch (Exception e) { XxlJobLogger.log("RabbitMQMinutelyException:" + e.getMessage()); logger.error(e.getMessage()); e.printStackTrace(); } ReturnT returnT = new ReturnT(500, "RabbitMQ分钟数据存入失败"); return returnT; } @XxlJob("RabbitMQHourly") public ReturnT insertRabbitMQHourly(String params) { LocalDateTime time = LocalDateTime.now(); Map organizationIdMap = JSON.parseObject(params); List parentIdList = (List) organizationIdMap.get("orgId"); LocalDateTime endTime = time.truncatedTo(ChronoUnit.HOURS); LocalDateTime startTime = endTime.minusHours(1); List organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList); try { List macList = deviceService.getMacByOrganizationid(organizationIdList); List sensorKeys = sensorService.getSensorKeyByMac(macList); List> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList); Map kv = new LinkedHashMap<>(); for (Map map : macAndOrganizationIdMap) { kv.put(map.get("mac").toString(), map.get("organizationId")); } Map devices = new HashMap<>(); devices.put("sensorKeys", sensorKeys); devices.put("start", startTime); devices.put("end", endTime); devices.put("macs", macList); List> hourlyDataList = historyHourlyService.getHourlySensorData(devices); XxlJobLogger.log("RabbitMQHourlyData:" + hourlyDataList.size()); if (!CollectionUtils.isEmpty(hourlyDataList)) { if(!(hourlyDataList.size()==macList.size())){ List macList_copy = macList; if (hourlyDataList.size() ignoreMacList = new ArrayList(); ignoreMacList.add("p5dnd7a0392130"); for (String ignoreMac:ignoreMacList) { for (int d=0;d> deviceList = deviceService.getAllByMacList(macList_copy); List de = new ArrayList<>(); for (Map deviceMap:deviceList) { de.add(deviceMap.get("name").toString()); } AlarmUtils_2.sendMail("chenxi18913261648@163.com,1vv_zkk6ji3kln@dingtalk.com,liumiao_love@126.com","离线警报",de.toString()+"设备或许离线!"); } } for (Map deviceData : hourlyDataList) { if (!ObjectUtils.isEmpty(deviceData)) { Map hourlyData = new LinkedHashMap<>(); hourlyData.put("mac", deviceData.get("mac")); hourlyData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); hourlyData.put("timeType", "hourly"); for (String key : deviceData.keySet()) { if (!key.equals("mac") && !key.startsWith("M")) { String date = deviceData.get(key).toString() + "," + deviceData.get("MIN" + key).toString() + "," + deviceData.get("MAX" + key).toString(); hourlyData.put(key, date); } } String mac = hourlyData.get("mac").toString(); Object o1 = kv.get(mac); List parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1); Map organizationIdMapNew = JSON.parseObject(params); List parentIdListNew = (List) organizationIdMapNew.get("orgId"); List intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList()); intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), parentId + "." + mac, JSON.toJSONString(hourlyData))); } } ReturnT returnT = new ReturnT(200, "RabbitMQ小时数据存入成功"); return returnT; } } catch (Exception e) { XxlJobLogger.log("RabbitMQHourlyException:" + e.getMessage()); logger.error(e.getMessage()); e.printStackTrace(); } ReturnT returnT = new ReturnT(500, "RabbitMQ小时数据存入失败"); return returnT; } @XxlJob("RabbitMQDaily") public ReturnT insertRabbitMQDaily(String params) { LocalDateTime time = LocalDateTime.now(); Map organizationIdMap = JSON.parseObject(params); List parentIdList = (List) organizationIdMap.get("orgId"); LocalDateTime endTime = time.truncatedTo(ChronoUnit.DAYS); LocalDateTime startTime = endTime.minusDays(1); List organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList); try { List macList = deviceService.getMacByOrganizationid(organizationIdList); List sensorKeys = sensorService.getSensorKeyByMac(macList); List> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList); Map kv = new LinkedHashMap<>(); for (Map map : macAndOrganizationIdMap) { kv.put(map.get("mac").toString(), map.get("organizationId")); } Map devices = new HashMap<>(); devices.put("sensorKeys", sensorKeys); devices.put("start", startTime); devices.put("end", endTime); devices.put("macs", macList); List> dailyDataList = historyDailyService.getDailySensorData(devices); XxlJobLogger.log("RabbitMQDailyData:" + dailyDataList.size()); if (!CollectionUtils.isEmpty(dailyDataList)) { for (Map deviceData : dailyDataList) { if (!ObjectUtils.isEmpty(deviceData)) { Map dailyData = new LinkedHashMap<>(); dailyData.put("mac", deviceData.get("mac")); dailyData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); dailyData.put("timeType", "daily"); for (String key : deviceData.keySet()) { if (!key.equals("mac") && !key.startsWith("M")) { String date = deviceData.get(key).toString() + "," + deviceData.get("MIN" + key).toString() + "," + deviceData.get("MAX" + key).toString(); dailyData.put(key, date); } } String mac = dailyData.get("mac").toString(); Object o1 = kv.get(mac); List parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1); Map organizationIdMapNew = JSON.parseObject(params); List parentIdListNew = (List) organizationIdMapNew.get("orgId"); List intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList()); intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), parentId + "." + mac, JSON.toJSONString(dailyData))); } } ReturnT returnT = new ReturnT(200, "RabbitMQ天数据存入成功"); return returnT; } } catch (Exception e) { XxlJobLogger.log("RabbitMQDailyException:" + e.getMessage()); logger.error(e.getMessage()); e.printStackTrace(); } ReturnT returnT = new ReturnT(500, "RabbitMQ天数据存入失败"); return returnT; } }