package com.moral.task;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import com.moral.entity.History;
|
import com.moral.mapper.HistoryMapper;
|
import com.moral.service.*;
|
import com.xxl.job.core.biz.model.ReturnT;
|
import com.xxl.job.core.handler.annotation.XxlJob;
|
import com.xxl.job.core.log.XxlJobLogger;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.amqp.core.TopicExchange;
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
import org.springframework.beans.factory.annotation.Qualifier;
|
import org.springframework.stereotype.Component;
|
import org.springframework.util.CollectionUtils;
|
import org.springframework.util.ObjectUtils;
|
|
import javax.annotation.Resource;
|
import java.text.SimpleDateFormat;
|
import java.time.LocalDateTime;
|
import java.time.format.DateTimeFormatter;
|
import java.time.temporal.ChronoUnit;
|
import java.util.*;
|
import java.util.stream.Collectors;
|
|
/**
|
* @program: screen_job
|
* @description: 千灯数据转发到江苏省平台
|
* @author: lizijie
|
* @create: 2022-05-13 08:58
|
**/
|
@Component
|
public class RabbitMQInsertQiandengPark {
|
private static transient Logger logger = LoggerFactory.getLogger(RabbitMQInsertQiandengPark.class);
|
|
@Resource
|
private DeviceService deviceService;
|
|
@Resource
|
private SensorService sensorService;
|
|
@Resource
|
private HistoryMinutelyService historyMinutelyService;
|
|
@Resource
|
private HistoryHourlyService historyHourlyService;
|
|
@Resource
|
private HistoryMapper historyMapper;
|
|
@Resource
|
private OrganizationRelationService organizationRelationService;
|
|
@Resource
|
private RabbitTemplate rabbitTemplate;
|
|
@Resource
|
@Qualifier("organization_data")
|
private TopicExchange organization_data;
|
|
//分钟数据,时间间隔一分钟
|
@XxlJob("RabbitMQQiandengParkMinutely")
|
public ReturnT insertRabbitMQMinutely(String params) {
|
LocalDateTime time = LocalDateTime.now();
|
int year = time.getYear();
|
int month = time.getMonthValue();
|
int day = time.getDayOfMonth();
|
int hour = time.getHour();
|
int minute = time.getMinute();
|
if (day == 1) {
|
if (hour == 0 && minute == 0) {
|
if (month == 1) {
|
month = 12;
|
year = year - 1;
|
} else {
|
month = month - 1;
|
}
|
}
|
}
|
String monthStr = month < 10 ? ("0" + month) : month + "";
|
String yearAndMonth = year + monthStr;
|
Map organizationIdMap = JSON.parseObject(params);
|
List<Integer> parentIdList = (List<Integer>) organizationIdMap.get("orgId");
|
LocalDateTime endTime = time.truncatedTo(ChronoUnit.MINUTES);
|
LocalDateTime startTime = endTime.minusMinutes(1);
|
List<Object> organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList);
|
try {
|
List<String> macList = deviceService.getMacByOrganizationid(organizationIdList);
|
List<String> sensorKeys = sensorService.getSensorKeyByMac(macList);
|
List<Map<String, Object>> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList);
|
Map<String, Object> kv = new LinkedHashMap<>();
|
for (Map<String, Object> map : macAndOrganizationIdMap) {
|
kv.put(map.get("mac").toString(), map.get("organizationId"));
|
}
|
Map<String, Object> devices = new HashMap<>();
|
devices.put("sensorKeys", sensorKeys);
|
devices.put("start", startTime);
|
devices.put("end", endTime);
|
devices.put("macs", macList);
|
devices.put("yearAndMonth", yearAndMonth);
|
List<Map<String, Object>> minutelyData = historyMinutelyService.getMinutelySensorData(devices);
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
|
String yearMonthDay = sdf.format(new Date());
|
History guodu = historyMapper.selectLastDataByMac(yearMonthDay, "jsxlqxpc000001");
|
History baihua = historyMapper.selectLastDataByMac(yearMonthDay, "jsxlqxpc000002");
|
if (!ObjectUtils.isEmpty(guodu)){
|
Map<String,Object> guoduValue = JSONObject.parseObject(guodu.getValue().toString());
|
guoduValue.put("mac","jsxlqxpc000001");
|
minutelyData.add(guoduValue);
|
}
|
if (!ObjectUtils.isEmpty(baihua)){
|
JSONObject baihuaValue = JSONObject.parseObject(baihua.getValue().toString());
|
baihuaValue.put("mac","jsxlqxpc000002");
|
minutelyData.add(baihuaValue);
|
}
|
XxlJobLogger.log("RabbitMQMinutelyData:" + minutelyData.size());
|
if (!CollectionUtils.isEmpty(minutelyData)) {
|
for (Map<String, Object> deviceData : minutelyData) {
|
if (!ObjectUtils.isEmpty(deviceData)) {
|
Iterator<String> iterator = deviceData.keySet().iterator();
|
while (iterator.hasNext()) {
|
String key = iterator.next();
|
if (key.startsWith("M")) {
|
iterator.remove();
|
}
|
}
|
deviceData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
|
deviceData.put("timeType", "minutely");
|
String mac = deviceData.get("mac").toString();
|
Object o1 = kv.get(mac);
|
List<Integer> parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1);
|
Map organizationIdMapNew = JSON.parseObject(params);
|
List<Integer> parentIdListNew = (List<Integer>) organizationIdMapNew.get("orgId");
|
List<Integer> intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList());
|
intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), "QiandengPark" + "." + mac, JSON.toJSONString(deviceData)));
|
}
|
}
|
ReturnT returnT = new ReturnT(200, "RabbitMQ分钟数据存入成功");
|
return returnT;
|
}
|
} catch (Exception e) {
|
XxlJobLogger.log("RabbitMQMinutelyException:" + e.getMessage());
|
logger.error(e.getMessage());
|
e.printStackTrace();
|
}
|
ReturnT returnT = new ReturnT(500, "RabbitMQ分钟数据存入失败");
|
return returnT;
|
}
|
|
//小时数据时间间隔一小时
|
@XxlJob("RabbitMQQiandengParkHourly")
|
public ReturnT insertRabbitMQHourly(String params) {
|
LocalDateTime time = LocalDateTime.now();
|
Map organizationIdMap = JSON.parseObject(params);
|
List<Integer> parentIdList = (List<Integer>) organizationIdMap.get("orgId");
|
LocalDateTime endTime = time.truncatedTo(ChronoUnit.HOURS);
|
LocalDateTime startTime = endTime.minusHours(1);
|
List<Object> organizationIdList = organizationRelationService.getChildIdByParentId(parentIdList);
|
try {
|
List<String> macList = deviceService.getMacByOrganizationid(organizationIdList);
|
List<String> sensorKeys = sensorService.getSensorKeyByMac(macList);
|
List<Map<String, Object>> macAndOrganizationIdMap = deviceService.macAndOrganizationIdMap(macList);
|
Map<String, Object> kv = new LinkedHashMap<>();
|
for (Map<String, Object> map : macAndOrganizationIdMap) {
|
kv.put(map.get("mac").toString(), map.get("organizationId"));
|
}
|
Map<String, Object> devices = new HashMap<>();
|
devices.put("sensorKeys", sensorKeys);
|
devices.put("start", startTime);
|
devices.put("end", endTime);
|
devices.put("macs", macList);
|
List<Map<String, Object>> minutelyData = historyHourlyService.getHourlySensorData(devices);
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
|
String yearMonthDay = sdf.format(new Date());
|
History guodu = historyMapper.selectLastDataByMac(yearMonthDay, "jsxlqxpc000001");
|
History baihua = historyMapper.selectLastDataByMac(yearMonthDay, "jsxlqxpc000002");
|
if (!ObjectUtils.isEmpty(guodu)){
|
Map<String,Object> guoduValue = JSONObject.parseObject(guodu.getValue().toString());
|
guoduValue.put("mac","jsxlqxpc000001");
|
minutelyData.add(guoduValue);
|
}
|
if (!ObjectUtils.isEmpty(baihua)){
|
JSONObject baihuaValue = JSONObject.parseObject(baihua.getValue().toString());
|
baihuaValue.put("mac","jsxlqxpc000002");
|
minutelyData.add(baihuaValue);
|
}
|
XxlJobLogger.log("RabbitMQHourlyData:" + minutelyData.size());
|
if (!CollectionUtils.isEmpty(minutelyData)) {
|
for (Map<String, Object> deviceData : minutelyData) {
|
if (!ObjectUtils.isEmpty(deviceData)) {
|
Iterator<String> iterator = deviceData.keySet().iterator();
|
while (iterator.hasNext()) {
|
String key = iterator.next();
|
if (key.startsWith("M")) {
|
iterator.remove();
|
}
|
}
|
deviceData.put("time", startTime.format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")));
|
deviceData.put("timeType", "hourly");
|
String mac = deviceData.get("mac").toString();
|
Object o1 = kv.get(mac);
|
List<Integer> parentIds = organizationRelationService.getParentIdListByChildId((Integer) o1);
|
Map organizationIdMapNew = JSON.parseObject(params);
|
List<Integer> parentIdListNew = (List<Integer>) organizationIdMapNew.get("orgId");
|
List<Integer> intersection = parentIdListNew.stream().filter(item -> parentIds.contains(item)).collect(Collectors.toList());
|
intersection.stream().forEach(parentId -> rabbitTemplate.convertAndSend(organization_data.getName(), "QiandengPark" + "." + mac, JSON.toJSONString(deviceData)));
|
}
|
}
|
ReturnT returnT = new ReturnT(200, "RabbitMQ小时数据存入成功");
|
return returnT;
|
}
|
} catch (Exception e) {
|
XxlJobLogger.log("RabbitMQHourlyException:" + e.getMessage());
|
logger.error(e.getMessage());
|
e.printStackTrace();
|
}
|
ReturnT returnT = new ReturnT(500, "RabbitMQ小时数据存入失败");
|
return returnT;
|
}
|
}
|