package com.moral.task;
|
|
import com.alibaba.fastjson.JSON;
|
import com.moral.service.DeviceService;
|
import com.moral.service.HistoryMinutelyService;
|
import com.moral.service.OrganizationRelationService;
|
import com.moral.service.SensorService;
|
import com.xxl.job.core.biz.model.ReturnT;
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
import com.xxl.job.core.log.XxlJobLogger;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.amqp.core.TopicExchange;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.stereotype.Component;
|
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.ObjectUtils;
|
|
import javax.annotation.Resource;
|
import java.time.LocalDateTime;
|
import java.time.format.DateTimeFormatter;
|
import java.time.temporal.ChronoUnit;
|
import java.util.*;
|
import java.util.stream.Collectors;
|
|
@Component
|
public class RabbitMQInsertBaiMeiDiKangTask {
|
private static transient Logger logger = LoggerFactory.getLogger(RabbitMQInsertBaiMeiDiKangTask.class);
|
|
@Resource
|
private DeviceService deviceService;
|
|
@Resource
|
private SensorService sensorService;
|
|
@Resource
|
private HistoryMinutelyService historyMinutelyService;
|
|
@Resource
|
private OrganizationRelationService organizationRelationService;
|
|
@Resource
|
private RabbitTemplate rabbitTemplate;
|
|
@Resource
|
@Qualifier("organization_data")
|
private TopicExchange organization_data;
|
|
//分钟数据,时间间隔一分钟
|
@XxlJob("RabbitMQBaiMeiDiKangMinutely")
|
public ReturnT insertRabbitMQMinutely(String params) {
|
LocalDateTime time = LocalDateTime.now();
|
int year = time.getYear();
|
int month = time.getMonthValue();
|
int day = time.getDayOfMonth();
|
int hour = time.getHour();
|
int minute = time.getMinute();
|
if (day == 1) {
|
if (hour == 0 && minute == 0) {
|
if (month == 1) {
|
month = 12;
|
year = year - 1;
|
} else {
|
month = month - 1;
|
}
|
}
|
}
|
String monthStr = month < 10 ? ("0" + month) : month + "";
|
String yearAndMonth = year + monthStr;
|
Map organizationIdMap = JSON.parseObject(params);
|
List<Integer> parentIdList = (List<Integer>) organizationIdMap.get("orgId");
|
LocalDateTime endTime = time.truncatedTo(ChronoUnit.MINUTES);
|
LocalDateTime startTime = endTime.minusMinutes(1);
|
List<Object> organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList);
|
try {
|
List<String> macList = deviceService.getMacByOrganizationid(organizationIdList);
|
List<String> sensorKeys = sensorService.getSensorKeyByMac(macList);
|
List<Map<String, Object>> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList);
|
Map<String, Object> kv = new LinkedHashMap<>();
|
for (Map<String, Object> map : macAndOrganizationIdMap) {
|
kv.put(map.get("mac").toString(), map.get("organizationId"));
|
}
|
Map<String, Object> devices = new HashMap<>();
|
devices.put("sensorKeys", sensorKeys);
|
devices.put("start", startTime);
|
devices.put("end", endTime);
|
devices.put("macs", macList);
|
devices.put("yearAndMonth", yearAndMonth);
|
List<Map<String, Object>> minutelyData = historyMinutelyService.getMinutelySensorData(devices);
|
XxlJobLogger.log("RabbitMQMinutelyData:" + minutelyData.size());
|
if (!CollectionUtils.isEmpty(minutelyData)) {
|
for (Map<String, Object> deviceData : minutelyData) {
|
if (!ObjectUtils.isEmpty(deviceData)) {
|
Iterator<String> iterator = deviceData.keySet().iterator();
|
while (iterator.hasNext()) {
|
String key = iterator.next();
|
if (key.startsWith("M")) {
|
iterator.remove();
|
}
|
}
|
deviceData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
|
deviceData.put("timeType", "minutely");
|
String mac = deviceData.get("mac").toString();
|
Object o1 = kv.get(mac);
|
List<Integer> parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1);
|
Map organizationIdMapNew = JSON.parseObject(params);
|
List<Integer> parentIdListNew = (List<Integer>) organizationIdMapNew.get("orgId");
|
List<Integer> intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList());
|
intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), "baimeidikang" + "." + mac, JSON.toJSONString(deviceData)));
|
}
|
}
|
ReturnT returnT = new ReturnT(200, "RabbitMQ分钟实时数据存入成功");
|
return returnT;
|
}
|
} catch (Exception e) {
|
XxlJobLogger.log("RabbitMQRealtimeMinutelyException:" + e.getMessage());
|
logger.error(e.getMessage());
|
e.printStackTrace();
|
}
|
ReturnT returnT = new ReturnT(500, "RabbitMQ分钟实时数据存入失败");
|
return returnT;
|
}
|
|
}
|