package com.moral.task; import com.alibaba.fastjson.JSON; import com.moral.service.*; import com.moral.util.AlarmUtils_2; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import com.xxl.job.core.log.XxlJobLogger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import org.springframework.util.ObjectUtils; import javax.annotation.Resource; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; import java.util.*; import java.util.stream.Collectors; @Component public class RabbitMQInsertProvincialIndustrialPark { private static transient Logger logger = LoggerFactory.getLogger(RabbitMQInsertProvincialIndustrialPark.class); @Resource private DeviceService deviceService; @Resource private SensorService sensorService; @Resource private HistoryMinutelyService historyMinutelyService; @Resource private HistoryHourlyService historyHourlyService; @Resource private OrganizationRelationService organizationRelationService; @Resource private RabbitTemplate rabbitTemplate; @Resource @Qualifier("organization_data") private TopicExchange organization_data; //分钟数据,时间间隔一分钟 @XxlJob("RabbitMQProvincialIndustrialParkMinutely") public ReturnT insertRabbitMQMinutely(String params) { LocalDateTime time = LocalDateTime.now(); int year = time.getYear(); int month = time.getMonthValue(); int day = time.getDayOfMonth(); int hour = time.getHour(); int minute = time.getMinute(); if (day == 1) { if (hour == 0 && minute == 0) { if (month == 1) { month = 12; year = year - 1; } else { month = month - 1; } } } String monthStr = month < 10 ? ("0" + month) : month + ""; String yearAndMonth = year + monthStr; Map organizationIdMap = JSON.parseObject(params); List parentIdList = (List) organizationIdMap.get("orgId"); LocalDateTime endTime = time.truncatedTo(ChronoUnit.MINUTES); LocalDateTime startTime = endTime.minusMinutes(1); List organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList); try { //List macList = deviceService.getMacByOrganizationid(organizationIdList); List macList = new ArrayList<>(); macList.add("p5dnd7a0391989"); List sensorKeys = sensorService.getSensorKeyByMac(macList); List> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList); Map kv = new LinkedHashMap<>(); for (Map map : macAndOrganizationIdMap) { kv.put(map.get("mac").toString(), map.get("organizationId")); } Map devices = new HashMap<>(); devices.put("sensorKeys", sensorKeys); devices.put("start", startTime); devices.put("end", endTime); devices.put("macs", macList); devices.put("yearAndMonth", yearAndMonth); List> minutelyData = historyMinutelyService.getMinutelySensorData(devices); XxlJobLogger.log("RabbitMQMinutelyData:" + minutelyData.size()); if (!CollectionUtils.isEmpty(minutelyData)) { for (Map deviceData : minutelyData) { if (!ObjectUtils.isEmpty(deviceData)) { Iterator iterator = deviceData.keySet().iterator(); while (iterator.hasNext()) { String key = iterator.next(); if (key.startsWith("M")) { iterator.remove(); } } deviceData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); deviceData.put("timeType", "minutely"); String mac = deviceData.get("mac").toString(); Object o1 = kv.get(mac); List parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1); Map organizationIdMapNew = JSON.parseObject(params); List parentIdListNew = (List) organizationIdMapNew.get("orgId"); List intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList()); intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), "ProvincialIndustrialPark" + "." + mac, JSON.toJSONString(deviceData))); } } ReturnT returnT = new ReturnT(200, "RabbitMQ分钟数据存入成功"); return returnT; } } catch (Exception e) { XxlJobLogger.log("RabbitMQMinutelyException:" + e.getMessage()); logger.error(e.getMessage()); e.printStackTrace(); } ReturnT returnT = new ReturnT(500, "RabbitMQ分钟数据存入失败"); return returnT; } //小时数据时间间隔一小时 @XxlJob("RabbitMQProvincialIndustrialParkHourly") public ReturnT insertRabbitMQHourly(String params) { LocalDateTime time = LocalDateTime.now(); Map organizationIdMap = JSON.parseObject(params); List parentIdList = (List) organizationIdMap.get("orgId"); LocalDateTime endTime = time.truncatedTo(ChronoUnit.HOURS); LocalDateTime startTime = endTime.minusHours(1); List organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList); try { //List macList = deviceService.getMacByOrganizationid(organizationIdList); List macList = new ArrayList<>(); macList.add("p5dnd7a0391989"); List sensorKeys = sensorService.getSensorKeyByMac(macList); List> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList); Map kv = new LinkedHashMap<>(); for (Map map : macAndOrganizationIdMap) { kv.put(map.get("mac").toString(), map.get("organizationId")); } Map devices = new HashMap<>(); devices.put("sensorKeys", sensorKeys); devices.put("start", startTime); devices.put("end", endTime); devices.put("macs", macList); List> minutelyData = historyHourlyService.getHourlySensorData(devices); XxlJobLogger.log("RabbitMQHourlyData:" + minutelyData.size()); if (!CollectionUtils.isEmpty(minutelyData)) { for (Map deviceData : minutelyData) { if (!ObjectUtils.isEmpty(deviceData)) { Iterator iterator = deviceData.keySet().iterator(); while (iterator.hasNext()) { String key = iterator.next(); if (key.startsWith("M")) { iterator.remove(); } } deviceData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); deviceData.put("timeType", "hourly"); String mac = deviceData.get("mac").toString(); Object o1 = kv.get(mac); List parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1); Map organizationIdMapNew = JSON.parseObject(params); List parentIdListNew = (List) organizationIdMapNew.get("orgId"); List intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList()); intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), "ProvincialIndustrialPark" + "." + mac, JSON.toJSONString(deviceData))); } } ReturnT returnT = new ReturnT(200, "RabbitMQ小时数据存入成功"); return returnT; } } catch (Exception e) { XxlJobLogger.log("RabbitMQHourlyException:" + e.getMessage()); logger.error(e.getMessage()); e.printStackTrace(); } ReturnT returnT = new ReturnT(500, "RabbitMQ小时数据存入失败"); return returnT; } }