jinpengyong
2021-07-01 fdc8e1e781851904b05f234523f372c8c1098b69
定时任务CO.O3算法
6 files added
7 files modified
567 ■■■■■ changed files
screen-common/src/main/java/com/moral/constant/Constants.java 13 ●●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/constant/KafkaConstants.java 11 ●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/util/AmendUtils.java 168 ●●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/util/DateUtils.java 8 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/entity/HistoryWeekly.java 44 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/mapper/HistoryWeeklyMapper.java 21 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/HistoryWeeklyService.java 18 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryDailyServiceImpl.java 57 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryFiveMinutelyServiceImpl.java 28 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryWeeklyServiceImpl.java 147 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/task/HistoryTableInsertTask.java 26 ●●●●● patch | view | raw | blame | history
screen-job/src/main/resources/mapper/HistoryWeeklyMapper.xml 20 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java 6 ●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/constant/Constants.java
@@ -139,5 +139,18 @@
     * */
    public static final String SENSOR_CODE_TEMP = "a01001";
    /*
     * 风向code
     * */
    public static final String SENSOR_CODE_WIND_DIR = "a01008";
    /*
     * 风速code
     * */
    public static final String SENSOR_CODE_WIND_SPEED = "a01008";
    /*
     * 一氧化碳code
     * */
    public static final String SENSOR_CODE_CO= "a21005";
}
screen-common/src/main/java/com/moral/constant/KafkaConstants.java
@@ -18,12 +18,17 @@
    public static final String TOPIC_SECOND = "second_data";
    /**
     * 用于将数据存入数据库的消费组
     * 小时主题消费组
     */
    public static final String GROUP_ID_INSERT = "insert";
    public static final String GROUP_HOUR = "hour";
    /**
     * 分钟主题消费组
     */
    public static final String GROUP_MINUTE = "minute";
    /**
     * 用于判断设备状态消费组
     */
    public static final String GROUP_ID_STATE = "state";
    public static final String GROUP_STATE = "state";
}
screen-common/src/main/java/com/moral/util/AmendUtils.java
New file
@@ -0,0 +1,168 @@
package com.moral.util;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSONObject;
import com.moral.constant.Constants;
public class AmendUtils {
    /**
     * @param value 需要科学计算的数据
     * @param digit 保留的小数位
     * @return 功能:四舍六入五成双计算法
     */
    public static double sciCal(double value, int digit) {
        String result;
        try {
            double ratio = Math.pow(10, digit);
            double _num = value * ratio;
            double mod = _num % 1;
            double integer = Math.floor(_num);
            double returnNum;
            if (mod > 0.5) {
                returnNum = (integer + 1) / ratio;
            } else if (mod < 0.5) {
                returnNum = integer / ratio;
            } else {
                returnNum = (integer % 2 == 0 ? integer : integer + 1) / ratio;
            }
            BigDecimal bg = new BigDecimal(returnNum);
            result = bg.setScale(digit, BigDecimal.ROUND_HALF_UP).toString();
        } catch (RuntimeException e) {
            throw e;
        }
        return Double.parseDouble(result);
    }
    /**
     * @param list 参数,[value={"O3":12},.....]
     * @return 功能:臭氧日均值计算
     */
    public static Object getO3AvgOfDay(List<Map<String, Object>> list) {
        double max;
        List<Double> avgs = new ArrayList<>();
        for (int i = 8; i <= 24; i++) {
            List<Double> data = new ArrayList<>();
            for (Map<String, Object> dataMap : list) {
                Map<String, Object> dataValue = JSONObject.parseObject((String) dataMap.get("value"), Map.class);
                Double o3 = Double.parseDouble(dataValue.get(Constants.SENSOR_CODE_O3).toString());
                int hour = DateUtils.getHour((Date) dataMap.get("time"));
                if (hour == 0) {
                    hour = 24;
                }
                if (hour <= i && hour >= i - 7) {
                    data.add(o3);
                }
            }
            if (data.size() < 6) {
                continue;
            }
            double average = data.stream().mapToDouble(aDouble -> aDouble).summaryStatistics().getAverage();
            avgs.add(average);
        }
        max = avgs.stream().mapToDouble(aDouble -> aDouble).summaryStatistics().getMax();
        if (avgs.size() < 14) {
            if (max < 160d) {
                return null;
            }
        }
        return sciCal(max, 4);
    }
    /**
     * @param list 参数,[value={"风速":12,”风向“:200},.....]
     * @return 功能:风向均值计算
     */
    public static Object getWindDirAvg(List<Map<String, Object>> list) {
        double avgDir;
        double sumSin = 0d;
        double sumCos = 0d;
        int size = 0;
        for (Map<String, Object> map : list) {
            Map<String, Object> dataValue = JSONObject.parseObject((String) map.get("value"), Map.class);
            Object wind = dataValue.get(Constants.SENSOR_CODE_WIND_DIR);
            Object speed = dataValue.get(Constants.SENSOR_CODE_WIND_SPEED);
            if (wind == null || speed == null) {
                continue;
            }
            size++;
            int windDir = Integer.parseInt(wind.toString());
            double windSpeed = Double.parseDouble(speed.toString());
            double sin = windSpeed * Math.sin(windDir / 180d) * Math.PI;
            double cos = windSpeed * Math.cos(windDir / 180d) * Math.PI;
            sumSin += sin;
            sumCos += cos;
        }
        if (size == 0) {
            return null;
        }
        double avgSin = sumSin / size;
        double avgCos = sumCos / size;
        if (avgSin > 0 && avgCos > 0) {
            avgDir = Math.atan(avgSin / avgCos) * 180 / Math.PI;
        } else if ((avgSin > 0 && avgCos < 0) || (avgSin < 0 && avgCos < 0)) {
            avgDir = Math.atan(avgSin / avgCos) * 180 / Math.PI + 180;
        } else {
            avgDir = Math.atan(avgSin / avgCos) * 180 / Math.PI + 360;
        }
        return sciCal(avgDir, 4);
    }
    /**
     * @param data 需要求百分位的数据
     * @param p    百分位,例:95百分位,p=95
     * @return 功能:百分位数计算
     */
    public static double percentile(List<Double> data, int p) {
        int n = data.size();
        Collections.sort(data);
        double v = n / (100 / p);
        System.out.println(n % (100 / p));
        if (n % (100 / p) == 0) {
            if (v == n) {
                return data.get(n - 1);
            }
            return (data.get((int) v - 1) + data.get((int) v)) / 2;
        }
        return sciCal(data.get((int) v), 4);
    }
    public static Object getCOAvgOfWeek(List<Map<String, Object>> list) {
        List<Double> data = new ArrayList<>();
        for (Map<String, Object> dataMap : list) {
            Map<String, Object> dataValue = JSONObject.parseObject((String) dataMap.get("value"), Map.class);
            Object o = dataValue.get(Constants.SENSOR_CODE_CO);
            if (o == null) {
                continue;
            }
            Double co = Double.parseDouble(o.toString());
            data.add(co);
        }
        if (data.size() == 0) {
            return null;
        }
        return percentile(data, 95);
    }
    public static Object getO3AvgOfWeek(List<Map<String, Object>> list) {
        List<Double> data = new ArrayList<>();
        for (Map<String, Object> dataMap : list) {
            Map<String, Object> dataValue = JSONObject.parseObject((String) dataMap.get("value"), Map.class);
            Object o = dataValue.get(Constants.SENSOR_CODE_O3);
            if (o == null) {
                continue;
            }
            Double o3 = Double.parseDouble(o.toString());
            data.add(o3);
        }
        if (data.size() == 0) {
            return null;
        }
        return percentile(data, 90);
    }
}
screen-common/src/main/java/com/moral/util/DateUtils.java
@@ -1310,4 +1310,12 @@
            throw new RuntimeException(e);
        }
    }
    //获取上周一
    public static Date geLastWeekMonday() {
        Calendar cal = Calendar.getInstance();
        cal.setTime(getDate(getMondayOfThisWeek(),yyyy_MM_dd_EN) );
        cal.add(Calendar.DATE, -7);
        return cal.getTime();
    }
}
screen-job/src/main/java/com/moral/api/entity/HistoryWeekly.java
New file
@@ -0,0 +1,44 @@
package com.moral.api.entity;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import java.time.LocalDateTime;
import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
 * <p>
 * 周数据
 * </p>
 *
 * @author moral
 * @since 2021-06-30
 */
@Data
@EqualsAndHashCode(callSuper = false)
public class HistoryWeekly extends Model<HistoryWeekly> {
    private static final long serialVersionUID = 1L;
    /**
     * 设备mac
     */
    private String mac;
    /**
     * 时间
     */
    private LocalDateTime time;
    /**
     * 数据
     */
    private String value;
    @Override
    protected Serializable pkVal() {
        return null;
    }
}
screen-job/src/main/java/com/moral/api/mapper/HistoryWeeklyMapper.java
New file
@@ -0,0 +1,21 @@
package com.moral.api.mapper;
import java.util.List;
import java.util.Map;
import com.moral.api.entity.HistoryWeekly;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
 * <p>
 * 周数据 Mapper 接口
 * </p>
 *
 * @author moral
 * @since 2021-06-30
 */
public interface HistoryWeeklyMapper extends BaseMapper<HistoryWeekly> {
    void insertHistoryWeekly(List<Map<String,Object>> list);
}
screen-job/src/main/java/com/moral/api/service/HistoryWeeklyService.java
New file
@@ -0,0 +1,18 @@
package com.moral.api.service;
import com.moral.api.entity.HistoryWeekly;
import com.baomidou.mybatisplus.extension.service.IService;
/**
 * <p>
 * 周数据 服务类
 * </p>
 *
 * @author moral
 * @since 2021-06-30
 */
public interface HistoryWeeklyService extends IService<HistoryWeekly> {
    void insertHistoryWeekly();
}
screen-job/src/main/java/com/moral/api/service/impl/HistoryDailyServiceImpl.java
@@ -11,6 +11,7 @@
import com.moral.api.service.HistoryHourlyService;
import com.moral.api.service.SensorService;
import com.moral.constant.Constants;
import com.moral.util.AmendUtils;
import com.moral.util.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -46,10 +47,8 @@
    @Autowired
    private HistoryHourlyService historyHourlyService;
    @Override
    public void insertHistoryDaily() {
        String format = DateUtils.yyyy_MM_dd_EN;
        Date now = new Date();
        //开始时间
@@ -82,14 +81,20 @@
            List<Map<String, Object>> tempValue = new ArrayList<>(value);
            value.removeIf(map -> ((Date) map.get("time")).getTime() == start.getTime());
            //臭氧8小时滑动值
            double o3AvgOfDay = getO3AvgOfDay(value);
            jsonMap.put(Constants.SENSOR_CODE_O3, o3AvgOfDay);
            System.out.println("臭氧==" + value);
            //除臭氧外日均值计算
            //臭氧8小时滑动平均值计算并修约
            Object o3AvgOfDay = AmendUtils.getO3AvgOfDay(value);
            if (o3AvgOfDay != null) {
                jsonMap.put(Constants.SENSOR_CODE_O3, o3AvgOfDay);
            }
            //除臭氧外其他因子均值计算
            tempValue.removeIf(map -> ((Date) map.get("time")).getTime() == end.getTime());
            System.out.println("其他因子==" + tempValue);
            //风向均值计算并修约
            Object windDirAvg = AmendUtils.getWindDirAvg(value);
            if (windDirAvg != null) {
                jsonMap.put(Constants.SENSOR_CODE_WIND_DIR, windDirAvg);
            }
            sensorCodes.forEach(sensorCode -> {
                OptionalDouble optionalDouble = tempValue.parallelStream()
                        .flatMapToDouble(v -> {
@@ -104,41 +109,19 @@
                            return DoubleStream.of(Double.parseDouble(sensorValue.toString()));
                        }).average();
                if (optionalDouble.isPresent()) {
                    jsonMap.put(sensorCode.toString(), Double.parseDouble(String.format("%.4f", optionalDouble.getAsDouble())));
                    //银行家算法修约
                    double sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 4);
                    jsonMap.put(sensorCode.toString(), sciCal);
                }
            });
            dataMap.put("value", JSONObject.toJSONString(jsonMap));
            insertData.add(dataMap);
        });
        //臭氧8小时滑动值
        System.out.println(insertData);
        //存入数据库
        historyDailyMapper.insertHistoryDaily(insertData);
    }
    //臭氧8小时滑动值
    private double getO3AvgOfDay(List<Map<String, Object>> list) {
        double max = 0d;
        for (int i = 8; i <= 24; i++) {
            List<Double> data = new ArrayList<>();
            for (Map<String, Object> dataMap : list) {
                Map<String, Object> dataValue = JSONObject.parseObject((String) dataMap.get("value"), Map.class);
                Double o3 = Double.parseDouble(dataValue.get(Constants.SENSOR_CODE_O3).toString());
                int hour = DateUtils.getHour((Date) dataMap.get("time"));
                if (hour == 0) {
                    hour = 24;
                }
                if (hour <= i && hour >= i - 7) {
                    data.add(o3);
                }
            }
            double average = data.stream().mapToDouble(aDouble -> aDouble).summaryStatistics().getAverage();
            if (max < average) {
                max = average;
            }
        }
        return Double.parseDouble(String.format("%.4f", max));
    }
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
@@ -148,7 +131,7 @@
        list.add(18);//4
        list.add(16);//5
        list.add(22);//6
        list.add(4);//8
        list.add(4);//7
        list.add(12);
        list.add(28);
        list.add(26);
@@ -165,7 +148,7 @@
        list.add(12);
        list.add(10);
        list.add(5);
        list.add(88);
        list.add(88);//24
        Integer max = 0;
        for (int i = 7; i < list.size(); i++) {
            Integer sum = 0;
screen-job/src/main/java/com/moral/api/service/impl/HistoryFiveMinutelyServiceImpl.java
@@ -21,6 +21,7 @@
import com.moral.api.service.HistoryMinutelyService;
import com.moral.api.service.SensorService;
import com.moral.constant.Constants;
import com.moral.util.AmendUtils;
import com.moral.util.DateUtils;
@Service
@@ -72,9 +73,17 @@
        data.forEach((key, value) -> {
            Map<String, Object> dataMap = new HashMap<>();
            Map<String, Object> jsonMap = new HashMap<>();
            dataMap.put("mac", key);
            dataMap.put("time", end);
            Map<String, Object> jsonMap = new HashMap<>();
            //风向均值计算并修约
            Object windDirAvg = AmendUtils.getWindDirAvg(value);
            if (windDirAvg != null) {
                jsonMap.put(Constants.SENSOR_CODE_WIND_DIR, windDirAvg);
            }
            //除风向外其他因子均值计算
            sensorCodes.forEach(sensorCode -> {
                OptionalDouble optionalDouble = value.parallelStream()
                        .flatMapToDouble(v -> {
@@ -83,10 +92,16 @@
                            if (ObjectUtils.isEmpty(sensorValue)) {
                                return null;
                            }
                            //风向另外计算
                            if (Constants.SENSOR_CODE_WIND_DIR.equals(sensorCode)) {
                                return null;
                            }
                            return DoubleStream.of(Double.parseDouble(sensorValue.toString()));
                        }).average();
                if (optionalDouble.isPresent()) {
                    jsonMap.put(sensorCode.toString(), Double.parseDouble(String.format("%.4f", optionalDouble.getAsDouble())));
                    //银行家算法修约
                    double sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 4);
                    jsonMap.put(sensorCode.toString(), sciCal);
                }
            });
            dataMap.put("value", JSONObject.toJSONString(jsonMap));
@@ -95,11 +110,13 @@
        //5分钟表后缀
        String insertTimeUnits = DateUtils.dateToDateString(now, DateUtils.yyyyMM_EN);
        //存入数据库
        historyFiveMinutelyMapper.insertHistoryFiveMinutely(insertData, insertTimeUnits);
    }
    public static void main(String[] args) {
        //模拟数据
       /* //模拟数据
        List<Map<String, Object>> list = new ArrayList<>();
        Map<String, Object> map1 = new HashMap<>();
        map1.put("mac", "p5dnd1234567");
@@ -154,6 +171,9 @@
        });
        System.out.println("666==" + insertData);
        System.out.println("666==" + insertData);*/
        double a = 5.5d;
        System.out.println(Math.round(a));
    }
}
screen-job/src/main/java/com/moral/api/service/impl/HistoryWeeklyServiceImpl.java
New file
@@ -0,0 +1,147 @@
package com.moral.api.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.HistoryDaily;
import com.moral.api.entity.HistoryWeekly;
import com.moral.api.entity.Sensor;
import com.moral.api.mapper.HistoryWeeklyMapper;
import com.moral.api.service.HistoryDailyService;
import com.moral.api.service.HistoryWeeklyService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.api.service.SensorService;
import com.moral.constant.Constants;
import com.moral.util.AmendUtils;
import com.moral.util.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
/**
 * <p>
 * 周数据 服务实现类
 * </p>
 *
 * @author moral
 * @since 2021-06-30
 */
@Service
public class HistoryWeeklyServiceImpl extends ServiceImpl<HistoryWeeklyMapper, HistoryWeekly> implements HistoryWeeklyService {
    @Autowired
    private HistoryWeeklyMapper historyWeeklyMapper;
    @Autowired
    private SensorService sensorService;
    @Autowired
    private HistoryDailyService historyDailyService;
    @Override
    public void insertHistoryWeekly() {
        Date now = new Date();
        //开始时间,上周一
        Date start = DateUtils.geLastWeekMonday();
        //因子
        QueryWrapper<Sensor> sensorQueryWrapper = new QueryWrapper<>();
        sensorQueryWrapper.select("code").eq("is_delete", Constants.NOT_DELETE);
        List<Object> sensorCodes = sensorService.listObjs(sensorQueryWrapper);
        //获取所有设备日数据
        QueryWrapper<HistoryDaily> historyDailyQueryWrapper = new QueryWrapper<>();
        historyDailyQueryWrapper.ge("time", DateUtils.dateToDateString(start)).lt("time", DateUtils.dateToDateString(now));
        List<Map<String, Object>> weeklyData = historyDailyService.listMaps(historyDailyQueryWrapper);
        //按mac分组
        Map<String, List<Map<String, Object>>> data = weeklyData.parallelStream().collect(Collectors.groupingBy(o -> (String) o.get("mac")));
        //存入数据库的结果集
        List<Map<String, Object>> insertData = new ArrayList<>();
        data.forEach((key, value) -> {
            Map<String, Object> dataMap = new HashMap<>();
            Map<String, Object> jsonMap = new HashMap<>();
            dataMap.put("mac", key);
            dataMap.put("time", start);
            //风向均值计算并修约
            Object windDirAvg = AmendUtils.getWindDirAvg(value);
            if (windDirAvg != null) {
                jsonMap.put(Constants.SENSOR_CODE_WIND_DIR, windDirAvg);
            }
            //CO 95百分位计算并修约
            Object coAvg = AmendUtils.getCOAvgOfWeek(value);
            if (coAvg != null) {
                jsonMap.put(Constants.SENSOR_CODE_CO, coAvg);
            }
            //CO 90百分位计算并修约
            Object o3Avg = AmendUtils.getO3AvgOfWeek(value);
            if (o3Avg != null) {
                jsonMap.put(Constants.SENSOR_CODE_O3, o3Avg);
            }
            sensorCodes.forEach(sensorCode -> {
                OptionalDouble optionalDouble = value.parallelStream()
                        .flatMapToDouble(v -> {
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode.toString());
                            if (ObjectUtils.isEmpty(sensorValue)) {
                                return null;
                            }
                            //风向另外计算
                            if (Constants.SENSOR_CODE_WIND_DIR.equals(sensorCode)) {
                                return null;
                            }
                            //CO另外计算
                            if (Constants.SENSOR_CODE_CO.equals(sensorCode)) {
                                return null;
                            }
                            //O3另外计算
                            if (Constants.SENSOR_CODE_O3.equals(sensorCode)) {
                                return null;
                            }
                            return DoubleStream.of(Double.parseDouble(sensorValue.toString()));
                        }).average();
                if (optionalDouble.isPresent()) {
                    //银行家算法修约
                    double sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 4);
                    jsonMap.put(sensorCode.toString(), sciCal);
                }
            });
            dataMap.put("value", JSONObject.toJSONString(jsonMap));
            insertData.add(dataMap);
        });
        //存入数据库
        historyWeeklyMapper.insertHistoryWeekly(insertData);
    }
    public static void main(String[] args) {
        List<Double> data = new ArrayList<>();
        data.add(7d);
        data.add(15d);
        data.add(36d);
        data.add(39d);
        data.add(40d);
        data.add(41d);
        data.add(20d);
        data.add(18d);
        System.out.println(data);
        System.out.println(AmendUtils.percentile(data, 95));
    }
}
screen-job/src/main/java/com/moral/api/task/HistoryTableInsertTask.java
@@ -3,16 +3,9 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.fastjson.JSON;
import com.moral.api.entity.HistoryDaily;
import com.moral.api.service.HistoryDailyService;
import com.moral.api.service.HistoryFiveMinutelyService;
import com.moral.api.service.HistoryWeeklyService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
@@ -21,10 +14,13 @@
public class HistoryTableInsertTask {
    @Autowired
    private HistoryFiveMinutelyService historyFiveMinutelyService;
    @Autowired
    private HistoryDailyService historyDailyService;
    @Autowired
    private HistoryFiveMinutelyService historyFiveMinutelyService;
    private HistoryWeeklyService historyWeeklyService;
    //5分钟数据统计
    @XxlJob("insertHistoryFiveMinutely")
@@ -49,4 +45,16 @@
        }
        return ReturnT.SUCCESS;
    }
    //周数据统计
    @XxlJob("insertHistoryWeekly")
    public ReturnT insertHistoryWeekly() {
        try {
            historyWeeklyService.insertHistoryWeekly();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
        }
        return ReturnT.SUCCESS;
    }
}
screen-job/src/main/resources/mapper/HistoryWeeklyMapper.xml
New file
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.moral.api.mapper.HistoryWeeklyMapper">
    <!-- 通用查询映射结果 -->
    <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryWeekly">
        <result column="mac" property="mac"/>
        <result column="time" property="time"/>
        <result column="value" property="value"/>
    </resultMap>
    <insert id="insertHistoryWeekly">
        INSERT INTO history_weekly
        VALUES
        <foreach collection="list" item="item" separator=",">
            (#{item.mac},#{item.time},#{item.value})
        </foreach>
    </insert>
</mapper>
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
@@ -37,7 +37,7 @@
    private RedisTemplate redisTemplate;
    //分钟数据
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_MINUTE, containerFactory = "kafkaListenerContainerFactory")
    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
@@ -68,7 +68,7 @@
    }
    //小时数据
    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_HOUR, containerFactory = "kafkaListenerContainerFactory")
    public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
@@ -99,7 +99,7 @@
    }
    //秒数据,修改设备状态,缓存最新秒数据
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_STATE, containerFactory = "kafkaListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {