New file |
| | |
| | | package com.moral.task; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.moral.service.*; |
| | | import com.xxl.job.core.biz.model.ReturnT; |
| | | import com.xxl.job.core.handler.annotation.XxlJob; |
| | | import com.xxl.job.core.log.XxlJobLogger; |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.amqp.core.TopicExchange; |
| | | import org.springframework.amqp.rabbit.core.RabbitTemplate; |
| | | import org.springframework.beans.factory.annotation.Qualifier; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.CollectionUtils; |
| | | import org.springframework.util.ObjectUtils; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.time.LocalDateTime; |
| | | import java.time.format.DateTimeFormatter; |
| | | import java.time.temporal.ChronoUnit; |
| | | import java.util.*; |
| | | import java.util.stream.Collectors; |
| | | |
| | | @Component |
| | | public class RabbitMQInsertChangshuHuansiTask { |
| | | private static transient Logger logger = LoggerFactory.getLogger(RabbitMQInsertChangshuHuansiTask.class); |
| | | |
| | | @Resource |
| | | private DeviceService deviceService; |
| | | |
| | | @Resource |
| | | private SensorService sensorService; |
| | | |
| | | @Resource |
| | | private HistoryMinutelyService historyMinutelyService; |
| | | |
| | | @Resource |
| | | private OrganizationRelationService organizationRelationService; |
| | | |
| | | @Resource |
| | | private RabbitTemplate rabbitTemplate; |
| | | |
| | | @Resource |
| | | @Qualifier("organization_data") |
| | | private TopicExchange organization_data; |
| | | |
| | | //分钟数据,时间间隔一分钟 |
| | | @XxlJob("RabbitMQChangshuHuansiMinutely") |
| | | public ReturnT insertRabbitMQMinutely(String params) { |
| | | LocalDateTime time = LocalDateTime.now(); |
| | | int year = time.getYear(); |
| | | int month = time.getMonthValue(); |
| | | int day = time.getDayOfMonth(); |
| | | int hour = time.getHour(); |
| | | int minute = time.getMinute(); |
| | | if (day == 1) { |
| | | if (hour == 0 && minute == 0) { |
| | | if (month == 1) { |
| | | month = 12; |
| | | year = year - 1; |
| | | } else { |
| | | month = month - 1; |
| | | } |
| | | } |
| | | } |
| | | String monthStr = month < 10 ? ("0" + month) : month + ""; |
| | | String yearAndMonth = year + monthStr; |
| | | Map organizationIdMap = JSON.parseObject(params); |
| | | List<Integer> parentIdList = (List<Integer>) organizationIdMap.get("orgId"); |
| | | LocalDateTime endTime = time.truncatedTo(ChronoUnit.MINUTES); |
| | | LocalDateTime startTime = endTime.minusMinutes(1); |
| | | List<Object> organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList); |
| | | try { |
| | | List<String> macList = deviceService.getMacByOrganizationid(organizationIdList); |
| | | List<String> sensorKeys = sensorService.getSensorKeyByMac(macList); |
| | | List<Map<String, Object>> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList); |
| | | Map<String, Object> kv = new LinkedHashMap<>(); |
| | | for (Map<String, Object> map : macAndOrganizationIdMap) { |
| | | kv.put(map.get("mac").toString(), map.get("organizationId")); |
| | | } |
| | | Map<String, Object> devices = new HashMap<>(); |
| | | devices.put("sensorKeys", sensorKeys); |
| | | devices.put("start", startTime); |
| | | devices.put("end", endTime); |
| | | devices.put("macs", macList); |
| | | devices.put("yearAndMonth", yearAndMonth); |
| | | List<Map<String, Object>> minutelyData = historyMinutelyService.getMinutelySensorData(devices); |
| | | XxlJobLogger.log("RabbitMQMinutelyData:" + minutelyData.size()); |
| | | if (!CollectionUtils.isEmpty(minutelyData)) { |
| | | for (Map<String, Object> deviceData : minutelyData) { |
| | | if (!ObjectUtils.isEmpty(deviceData)) { |
| | | Iterator<String> iterator = deviceData.keySet().iterator(); |
| | | while (iterator.hasNext()) { |
| | | String key = iterator.next(); |
| | | if (key.startsWith("M")) { |
| | | iterator.remove(); |
| | | } |
| | | } |
| | | deviceData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))); |
| | | deviceData.put("timeType", "minutely"); |
| | | String mac = deviceData.get("mac").toString(); |
| | | Object o1 = kv.get(mac); |
| | | List<Integer> parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1); |
| | | Map organizationIdMapNew = JSON.parseObject(params); |
| | | List<Integer> parentIdListNew = (List<Integer>) organizationIdMapNew.get("orgId"); |
| | | List<Integer> intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList()); |
| | | intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), "changshuhuansi" + "." + mac, JSON.toJSONString(deviceData))); |
| | | } |
| | | } |
| | | ReturnT returnT = new ReturnT(200, "RabbitMQ分钟实时数据存入成功"); |
| | | return returnT; |
| | | } |
| | | } catch (Exception e) { |
| | | XxlJobLogger.log("RabbitMQRealtimeMinutelyException:" + e.getMessage()); |
| | | logger.error(e.getMessage()); |
| | | e.printStackTrace(); |
| | | } |
| | | ReturnT returnT = new ReturnT(500, "RabbitMQ分钟实时数据存入失败"); |
| | | return returnT; |
| | | } |
| | | |
| | | } |
| | | |
| | | |