jinpengyong
2021-06-30 e6c6e6225bdbaaa27bcde320a79acde8239416c2
定时任务
11 files added
21 files modified
780 ■■■■ changed files
screen-common/src/main/java/com/moral/constant/Constants.java 16 ●●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/constant/KafkaConstants.java 6 ●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/util/DateUtils.java 10 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/entity/HistoryDaily.java 11 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java 50 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/entity/Sensor.java 84 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/mapper/HistoryDailyMapper.java 2 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java 19 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/mapper/SensorMapper.java 16 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/DeviceService.java 2 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/HistoryDailyService.java 2 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/HistoryFiveMinutelyService.java 2 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/HistoryHourlyService.java 16 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/SensorService.java 16 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryAqiServiceImpl.java 7 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryDailyServiceImpl.java 155 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryFiveMinutelyServiceImpl.java 135 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java 20 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/SensorServiceImpl.java 20 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/task/HistoryTableInsertTask.java 24 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/task/TestController.java 45 ●●●●● patch | view | raw | blame | history
screen-job/src/main/resources/mapper/HistoryDailyMapper.xml 2 ●●● patch | view | raw | blame | history
screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml 13 ●●●●● patch | view | raw | blame | history
screen-job/src/main/resources/mapper/HistoryMinutelyMapper.xml 30 ●●●● patch | view | raw | blame | history
screen-job/src/main/resources/mapper/SensorMapper.xml 19 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java 4 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/controller/TestController.java 24 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java 6 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java 4 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java 3 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/impl/HistoryMinutelyServiceImpl.java 4 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/util/AdjustDataUtils.java 13 ●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/constant/Constants.java
@@ -114,7 +114,6 @@
     * */
    public static final String SYSTEM_DICT_TYPE_PURCHASER = "purchaser";
    /*
     * 未校准数据表后缀
     * */
@@ -125,5 +124,20 @@
     * */
    public static final String DEVICE_STATE_OFFLINE = "4";
    /*
     * Sensor code for ozone (O3)
     * */
    public static final String SENSOR_CODE_O3 = "a05024";
    /*
     * Sensor code for VOCs
     * */
    public static final String SENSOR_CODE_VOCS = "a99054";
    /*
     * Sensor code for temperature
     * */
    public static final String SENSOR_CODE_TEMP = "a01001";
}
screen-common/src/main/java/com/moral/constant/KafkaConstants.java
@@ -5,17 +5,17 @@
    /**
     * 分钟数据主题
     */
    public static final String TOPIC_MINUTE = "minute";
    public static final String TOPIC_MINUTE = "minute_data";
    /**
     * 小时数据主题
     */
    public static final String TOPIC_HOUR = "hour";
    public static final String TOPIC_HOUR = "hour_data";
    /*
     * 秒数据主题
     * */
    public static final String TOPIC_SECOND = "test";
    public static final String TOPIC_SECOND = "second_data";
    /**
     * 用于将数据存入数据库的消费组
screen-common/src/main/java/com/moral/util/DateUtils.java
@@ -1301,12 +1301,6 @@
        return date;
    }
    //当前时间转换,只取到分钟
    public static Date convertDate(Date date) {
        String dateString = dateToDateString(date, yyyy_MM_dd_HH_mm_EN);
        return getDate(dateString, yyyy_MM_dd_HH_mm_EN);
    }
    //时间戳转换,只取时分秒
    public static Date dataToTimeStampTime(Date time, String dateFormat) {
        String dateString = dateToDateString(time, dateFormat);
@@ -1315,9 +1309,5 @@
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        System.out.println(new Date().getTime());
    }
}
screen-job/src/main/java/com/moral/api/entity/HistoryDaily.java
@@ -9,7 +9,7 @@
/**
 * <p>
 *
 * 日数据
 * </p>
 *
 * @author moral
@@ -21,10 +21,19 @@
    private static final long serialVersionUID = 1L;
    /*
    * Device MAC
    * */
    private String mac;
    /*
    * Time of the record
    * */
    private Date time;
    /*
    * Data payload
    * */
    private String value;
screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java
New file
@@ -0,0 +1,50 @@
package com.moral.api.entity;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
 * <p>
 * Entity for the calibrated hourly data table.
 * </p>
 * Accessors, equals/hashCode and toString are generated by Lombok.
 *
 * @author moral
 * @since 2021-06-28
 */
@Data
@EqualsAndHashCode(callSuper = false)
public class HistoryHourly extends Model<HistoryHourly> {
    private static final long serialVersionUID = 1L;
    /**
     * Device MAC
     */
    private String mac;
    /**
     * Time of the data
     */
    private Date time;
    /**
     * Data payload
     */
    private String value;
    /**
     * Device model
     */
    private Integer version;
    /**
     * Always returns {@code null}: this entity exposes no primary-key value.
     */
    @Override
    protected Serializable pkVal() {
        return null;
    }
}
screen-job/src/main/java/com/moral/api/entity/Sensor.java
New file
@@ -0,0 +1,84 @@
package com.moral.api.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import com.baomidou.mybatisplus.annotation.TableId;
import java.time.LocalDateTime;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
 * <p>
 * Entity describing a sensor (monitoring factor).
 * </p>
 * Accessors, equals/hashCode and toString are generated by Lombok.
 *
 * @author moral
 * @since 2021-06-25
 */
@Data
@EqualsAndHashCode(callSuper = false)
public class Sensor extends Model<Sensor> {
    private static final long serialVersionUID = 1L;
    /**
     * Sequence number (auto-increment primary key)
     */
    @TableId(value = "id", type = IdType.AUTO)
    private Integer id;
    /**
     * Factor name
     */
    private String name;
    /**
     * Company-defined factor number
     */
    private String desc;
    /**
     * National-standard factor code
     */
    private String code;
    /**
     * Upper limit
     */
    private Double upper;
    /**
     * Lower limit
     */
    private Double lower;
    /**
     * Default unit
     */
    private String defaultUnitKey;
    /**
     * Creation time
     */
    private Date createTime;
    /**
     * Last update time
     */
    private Date updateTime;
    /**
     * Deletion flag
     */
    private String isDelete;
    /**
     * Returns the primary-key value, i.e. {@link #id}.
     */
    @Override
    protected Serializable pkVal() {
        return this.id;
    }
}
screen-job/src/main/java/com/moral/api/mapper/HistoryDailyMapper.java
@@ -16,6 +16,6 @@
 */
public interface HistoryDailyMapper extends BaseMapper<HistoryDaily> {
    void insertHistoryDaily(List<HistoryDaily> list);
    void insertHistoryDaily(List<Map<String,Object>> list);
}
screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
New file
@@ -0,0 +1,19 @@
package com.moral.api.mapper;
import java.util.List;
import java.util.Map;
import com.moral.api.entity.HistoryHourly;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
 * <p>
 * Mapper interface for the calibrated hourly data table.
 * </p>
 * Declares no methods of its own; everything comes from {@code BaseMapper<HistoryHourly>}.
 *
 * @author moral
 * @since 2021-06-28
 */
public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> {
}
screen-job/src/main/java/com/moral/api/mapper/SensorMapper.java
New file
@@ -0,0 +1,16 @@
package com.moral.api.mapper;
import com.moral.api.entity.Sensor;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
 * <p>
 * Mapper interface for {@link Sensor}.
 * </p>
 * Declares no methods of its own; everything comes from {@code BaseMapper<Sensor>}.
 *
 * @author moral
 * @since 2021-06-25
 */
public interface SensorMapper extends BaseMapper<Sensor> {
}
screen-job/src/main/java/com/moral/api/service/DeviceService.java
@@ -13,8 +13,10 @@
 */
public interface DeviceService extends IService<Device> {
    // Determine which devices are offline and change their state
    void judgeOffLineDevice();
    // Change the state of the given device
    void updateDeviceState(Device device);
}
screen-job/src/main/java/com/moral/api/service/HistoryDailyService.java
@@ -16,6 +16,6 @@
 */
public interface HistoryDailyService extends IService<HistoryDaily> {
    void insertHistoryDaily(List<HistoryDaily> list);
    void insertHistoryDaily();
}
screen-job/src/main/java/com/moral/api/service/HistoryFiveMinutelyService.java
@@ -2,8 +2,10 @@
public interface HistoryFiveMinutelyService {
    // Create the five-minute table for the given time-unit suffix
    void createTable(String timeUnits);
    // Aggregate five-minute statistics and store them
    void insertHistoryFiveMinutely();
}
screen-job/src/main/java/com/moral/api/service/HistoryHourlyService.java
New file
@@ -0,0 +1,16 @@
package com.moral.api.service;
import com.moral.api.entity.HistoryHourly;
import com.baomidou.mybatisplus.extension.service.IService;
/**
 * <p>
 * Service interface for the calibrated hourly data table.
 * </p>
 * Declares no methods of its own; everything comes from {@code IService<HistoryHourly>}.
 *
 * @author moral
 * @since 2021-06-28
 */
public interface HistoryHourlyService extends IService<HistoryHourly> {
}
screen-job/src/main/java/com/moral/api/service/SensorService.java
New file
@@ -0,0 +1,16 @@
package com.moral.api.service;
import com.moral.api.entity.Sensor;
import com.baomidou.mybatisplus.extension.service.IService;
/**
 * <p>
 * Service interface for {@link Sensor}.
 * </p>
 * Declares no methods of its own; everything comes from {@code IService<Sensor>}.
 *
 * @author moral
 * @since 2021-06-25
 */
public interface SensorService extends IService<Sensor> {
}
screen-job/src/main/java/com/moral/api/service/impl/HistoryAqiServiceImpl.java
@@ -1,12 +1,13 @@
package com.moral.api.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.moral.api.entity.CityAqiConfig;
import com.moral.api.entity.HistoryAqi;
import com.moral.api.mapper.HistoryAqiMapper;
import com.moral.api.service.CityAqiConfigService;
import com.moral.api.service.HistoryAqiService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.constant.RedisConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -67,11 +68,11 @@
            value.put("O3", data.get("o3C"));
            value.put("AQI", data.get("value"));
            //数据
            historyAqi.setValue(JSON.toJSONString(value));
            historyAqi.setValue(JSONObject.toJSONString(value));
            //数据存入数据库
            historyAqiMapper.insert(historyAqi);
            //存入redis
            redisTemplate.opsForHash().putAll("aqi_" + cityCode, value);
            redisTemplate.opsForHash().putAll(RedisConstants.AQI_DATA + cityCode, value);
        }
    }
}
screen-job/src/main/java/com/moral/api/service/impl/HistoryDailyServiceImpl.java
@@ -1,15 +1,30 @@
package com.moral.api.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.HistoryDaily;
import com.moral.api.entity.HistoryHourly;
import com.moral.api.entity.Sensor;
import com.moral.api.mapper.HistoryDailyMapper;
import com.moral.api.service.HistoryDailyService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.api.service.HistoryHourlyService;
import com.moral.api.service.SensorService;
import com.moral.constant.Constants;
import com.moral.util.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
/**
 * <p>
@@ -25,9 +40,143 @@
    @Autowired
    private HistoryDailyMapper historyDailyMapper;
    @Autowired
    private SensorService sensorService;
    @Autowired
    private HistoryHourlyService historyHourlyService;
    @Override
    public void insertHistoryDaily(List<HistoryDaily> list) {
        System.out.println(list);
        historyDailyMapper.insertHistoryDaily(list);
    public void insertHistoryDaily() {
        String format = DateUtils.yyyy_MM_dd_EN;
        Date now = new Date();
        //开始时间
        Date start = DateUtils.dataToTimeStampTime(DateUtils.getDateOfDay(now, -1), format);
        //结束时间
        Date end = DateUtils.dataToTimeStampTime(now, format);
        //因子
        QueryWrapper<Sensor> sensorQueryWrapper = new QueryWrapper<>();
        sensorQueryWrapper.select("code").eq("is_delete", Constants.NOT_DELETE);
        List<Object> sensorCodes = sensorService.listObjs(sensorQueryWrapper);
        //获取所有设备小时数据
        QueryWrapper<HistoryHourly> historyHourlyQueryWrapper = new QueryWrapper<>();
        historyHourlyQueryWrapper.ge("time", DateUtils.dateToDateString(start)).le("time", DateUtils.dateToDateString(end));
        List<Map<String, Object>> dailyData = historyHourlyService.listMaps(historyHourlyQueryWrapper);
        //按mac分组
        Map<String, List<Map<String, Object>>> data = dailyData.parallelStream().collect(Collectors.groupingBy(o -> (String) o.get("mac")));
        //存入数据库的结果集
        List<Map<String, Object>> insertData = new ArrayList<>();
        data.forEach((key, value) -> {
            Map<String, Object> dataMap = new HashMap<>();
            Map<String, Object> jsonMap = new HashMap<>();
            dataMap.put("mac", key);
            dataMap.put("time", start);
            //中间变量,用于计算除臭氧外其它因子
            List<Map<String, Object>> tempValue = new ArrayList<>(value);
            value.removeIf(map -> ((Date) map.get("time")).getTime() == start.getTime());
            //臭氧8小时滑动值
            double o3AvgOfDay = getO3AvgOfDay(value);
            jsonMap.put(Constants.SENSOR_CODE_O3, o3AvgOfDay);
            System.out.println("臭氧==" + value);
            //除臭氧外日均值计算
            tempValue.removeIf(map -> ((Date) map.get("time")).getTime() == end.getTime());
            System.out.println("其他因子==" + tempValue);
            sensorCodes.forEach(sensorCode -> {
                OptionalDouble optionalDouble = tempValue.parallelStream()
                        .flatMapToDouble(v -> {
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode.toString());
                            if (ObjectUtils.isEmpty(sensorValue)) {
                                return null;
                            }
                            if (sensorCode.equals(Constants.SENSOR_CODE_O3)) {
                                return null;
                            }
                            return DoubleStream.of(Double.parseDouble(sensorValue.toString()));
                        }).average();
                if (optionalDouble.isPresent()) {
                    jsonMap.put(sensorCode.toString(), Double.parseDouble(String.format("%.4f", optionalDouble.getAsDouble())));
                }
            });
            dataMap.put("value", JSONObject.toJSONString(jsonMap));
            insertData.add(dataMap);
        });
        //臭氧8小时滑动值
        System.out.println(insertData);
        historyDailyMapper.insertHistoryDaily(insertData);
    }
    //臭氧8小时滑动值
    private double getO3AvgOfDay(List<Map<String, Object>> list) {
        double max = 0d;
        for (int i = 8; i <= 24; i++) {
            List<Double> data = new ArrayList<>();
            for (Map<String, Object> dataMap : list) {
                Map<String, Object> dataValue = JSONObject.parseObject((String) dataMap.get("value"), Map.class);
                Double o3 = Double.parseDouble(dataValue.get(Constants.SENSOR_CODE_O3).toString());
                int hour = DateUtils.getHour((Date) dataMap.get("time"));
                if (hour == 0) {
                    hour = 24;
                }
                if (hour <= i && hour >= i - 7) {
                    data.add(o3);
                }
            }
            double average = data.stream().mapToDouble(aDouble -> aDouble).summaryStatistics().getAverage();
            if (max < average) {
                max = average;
            }
        }
        return Double.parseDouble(String.format("%.4f", max));
    }
    public static void main(String[] args) {
        List<Integer> list = new ArrayList<>();
        list.add(8);//1点
        list.add(12);//2
        list.add(10);//3
        list.add(18);//4
        list.add(16);//5
        list.add(22);//6
        list.add(4);//8
        list.add(12);
        list.add(28);
        list.add(26);
        list.add(25);
        list.add(21);
        list.add(6);
        list.add(18);
        list.add(28);
        list.add(18);
        list.add(16);
        list.add(15);
        list.add(12);
        list.add(14);
        list.add(12);
        list.add(10);
        list.add(5);
        list.add(88);
        Integer max = 0;
        for (int i = 7; i < list.size(); i++) {
            Integer sum = 0;
            for (int j = i - 7; j <= i; j++) {
                Integer b = list.get(j);
                sum = sum + b;
            }
            if (sum > max) {
                max = sum;
            }
        }
        System.out.println((max / 8F));
    }
}
screen-job/src/main/java/com/moral/api/service/impl/HistoryFiveMinutelyServiceImpl.java
@@ -2,18 +2,25 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.Sensor;
import com.moral.api.mapper.HistoryFiveMinutelyMapper;
import com.moral.api.service.HistoryFiveMinutelyService;
import com.moral.api.service.HistoryMinutelyService;
import com.moral.api.service.SensorService;
import com.moral.constant.Constants;
import com.moral.util.DateUtils;
@Service
@@ -25,6 +32,9 @@
    @Autowired
    private HistoryMinutelyService historyMinutelyService;
    @Autowired
    private SensorService sensorService;
    @Override
    public void createTable(String timeUnits) {
        historyFiveMinutelyMapper.createTable(timeUnits);
@@ -32,33 +42,118 @@
    @Override
    public void insertHistoryFiveMinutely() {
        //时间格式化:yyyy-MM-dd HH:mm
        String format = DateUtils.yyyy_MM_dd_HH_mm_EN;
        Date now = new Date();
        Map<String, Object> params = new HashMap<>();
        //开始时间(分钟)
        Date start = DateUtils.convertDate(DateUtils.getDateOfMin(-5));
        //结束时间(分钟)
        Date end = DateUtils.convertDate(new Date());
        //开始时间
        Date start = DateUtils.dataToTimeStampTime(DateUtils.getDateOfMin(now, -5), format);
        //结束时间
        Date end = DateUtils.dataToTimeStampTime(now, format);
        params.put("start", start);
        params.put("end", end);
        //分钟表后缀
        //获取数据的分钟表后缀
        String timeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN);
        params.put("timeUnits", timeUnits);
        //因子
        QueryWrapper<Sensor> queryWrapper = new QueryWrapper<>();
        queryWrapper.select("code").eq("is_delete", Constants.NOT_DELETE);
        List<Object> sensorCodes = sensorService.listObjs(queryWrapper);
        params.put("sensorKeys", null);
        //获取所有设备的5分钟数据
        List<Map<String, Object>> fiveMinutelyData = historyMinutelyService.getHistoryFiveMinutelyData(params);
        //按mac分组
        Map<String, List<Map<String, Object>>> data = fiveMinutelyData.parallelStream()
                .collect(Collectors.groupingBy(o -> (String) o.get("mac")));
        historyMinutelyService.getHistoryFiveMinutelyData(params);
        /*List<Map<String, Object>> list = new ArrayList<>();
        Map<String, Object> map = new HashMap<>();
        map.put("mac", "123456");
        Date date = new Date();
        map.put("time", date);
        Map<String, Object> value = new HashMap<>();
        value.put("e1", 10);
        value.put("e2", 20);
        map.put("value", JSON.toJSONString(value));
        list.add(map);*/
        //存入数据库的结果集
        List<Map<String, Object>> insertData = new ArrayList<>();
        historyFiveMinutelyMapper.insertHistoryFiveMinutely(null, null);
        data.forEach((key, value) -> {
            Map<String, Object> dataMap = new HashMap<>();
            Map<String, Object> jsonMap = new HashMap<>();
            dataMap.put("mac", key);
            dataMap.put("time", end);
            sensorCodes.forEach(sensorCode -> {
                OptionalDouble optionalDouble = value.parallelStream()
                        .flatMapToDouble(v -> {
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode.toString());
                            if (ObjectUtils.isEmpty(sensorValue)) {
                                return null;
                            }
                            return DoubleStream.of(Double.parseDouble(sensorValue.toString()));
                        }).average();
                if (optionalDouble.isPresent()) {
                    jsonMap.put(sensorCode.toString(), Double.parseDouble(String.format("%.4f", optionalDouble.getAsDouble())));
                }
            });
            dataMap.put("value", JSONObject.toJSONString(jsonMap));
            insertData.add(dataMap);
        });
        //5分钟表后缀
        String insertTimeUnits = DateUtils.dateToDateString(new Date());
        historyFiveMinutelyMapper.insertHistoryFiveMinutely(insertData, insertTimeUnits);
    }
    /**
     * Ad-hoc check of the per-device averaging logic using hard-coded rows: groups the rows
     * by mac, averages every listed sensor code per device and prints the result.
     */
    public static void main(String[] args) {
        // Mock data
        List<Map<String, Object>> list = new ArrayList<>();
        Map<String, Object> map1 = new HashMap<>();
        map1.put("mac", "p5dnd1234567");
        map1.put("value", "{\"a0001\": 10, \"a0002\": 8, \"a0001-Flag\": \"N\", \"a0002-Flag\": \"N\"}");
        Map<String, Object> map2 = new HashMap<>();
        map2.put("mac", "p5dnd123456789");
        map2.put("value", "{\"a0001\": 12, \"a0002\": 12, \"a0001-Flag\": \"N\", \"a0002-Flag\": \"N\"}");
        Map<String, Object> map3 = new HashMap<>();
        map3.put("mac", "p5dnd1234567");
        map3.put("value", "{\"a0001\": 6, \"a0002\": 20, \"a0001-Flag\": \"N\", \"a0002-Flag\": \"N\", \"a0003\": 14}");
        Map<String, Object> map4 = new HashMap<>();
        map4.put("mac", "p5dnd1234567");
        map4.put("value", "{\"a0001\": 4, \"a0002\": 16, \"a0001-Flag\": \"N\", \"a0002-Flag\": \"N\", \"a0003\": 16}");
        list.add(map1);
        list.add(map2);
        list.add(map3);
        list.add(map4);
        // Group the rows by mac
        Map<String, List<Map<String, Object>>> data = list.parallelStream().collect(Collectors.groupingBy(o -> (String) o.get("mac")));
        System.out.println(data);
        // All sensor codes (factors)
        List<Object> sensors = new ArrayList<>();
        sensors.add("a0001");
        sensors.add("a0002");
        sensors.add("a0003");
        // Result set that would be inserted into the database
        List<Map<String, Object>> insertData = new ArrayList<>();
        data.forEach((key, value) -> {
            Map<String, Object> dataMap = new HashMap<>();
            Map<String, Object> jsonMap = new HashMap<>();
            dataMap.put("mac", key);
            sensors.forEach(sensorCode -> {
                OptionalDouble optionalDouble = value.parallelStream()
                        .flatMapToDouble(v -> {
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode.toString());
                            if (ObjectUtils.isEmpty(sensorValue)) {
                                // a null mapped stream is treated as an empty stream, so rows
                                // without this sensor code do not contribute to the average
                                return null;
                            }
                            return DoubleStream.of(Double.parseDouble(sensorValue.toString()));
                        }).average();
                // a sensor code with no values at all is left out of the JSON
                if (optionalDouble.isPresent()) {
                    jsonMap.put(sensorCode.toString(), Double.parseDouble(String.format("%.4f", optionalDouble.getAsDouble())));
                }
            });
            dataMap.put("value", JSONObject.toJSONString(jsonMap));
            insertData.add(dataMap);
        });
        System.out.println("666==" + insertData);
    }
}
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
New file
@@ -0,0 +1,20 @@
package com.moral.api.service.impl;
import com.moral.api.entity.HistoryHourly;
import com.moral.api.mapper.HistoryHourlyMapper;
import com.moral.api.service.HistoryHourlyService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
/**
 * <p>
 * Service implementation for the calibrated hourly data table.
 * </p>
 * Adds no behaviour of its own; everything comes from
 * {@code ServiceImpl<HistoryHourlyMapper, HistoryHourly>}.
 *
 * @author moral
 * @since 2021-06-28
 */
@Service
public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService {
}
screen-job/src/main/java/com/moral/api/service/impl/SensorServiceImpl.java
New file
@@ -0,0 +1,20 @@
package com.moral.api.service.impl;
import com.moral.api.entity.Sensor;
import com.moral.api.mapper.SensorMapper;
import com.moral.api.service.SensorService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.stereotype.Service;
/**
 * <p>
 * Service implementation for {@link Sensor}.
 * </p>
 * Adds no behaviour of its own; everything comes from
 * {@code ServiceImpl<SensorMapper, Sensor>}.
 *
 * @author moral
 * @since 2021-06-25
 */
@Service
public class SensorServiceImpl extends ServiceImpl<SensorMapper, Sensor> implements SensorService {
}
screen-job/src/main/java/com/moral/api/task/HistoryTableInsertTask.java
@@ -30,17 +30,6 @@
    @XxlJob("insertHistoryFiveMinutely")
    public ReturnT insertHistoryFiveMinutely() {
        try {
           /* List<Map<String, Object>> list = new ArrayList<>();
            Map<String, Object> map = new HashMap<>();
            map.put("mac", "123456");
            Date date = new Date();
            map.put("time", date);
            Map<String, Object> value = new HashMap<>();
            value.put("e1", 10);            value.put("e2", 20);
            map.put("value", JSON.toJSONString(value));
            list.add(map);
            String timeUnits = DateUtils.dateToDateString(date, DateUtils.yyyyMM_EN);*/
            historyFiveMinutelyService.insertHistoryFiveMinutely();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
@@ -53,18 +42,7 @@
    @XxlJob("insertHistoryDaily")
    public ReturnT insertHistoryDaily() {
        try {
            List<HistoryDaily> list = new ArrayList<>();
            HistoryDaily historyDaily = new HistoryDaily();
            historyDaily.setMac("123456");
            historyDaily.setTime(new Date());
            Map<String, Object> value = new HashMap<>();
            value.put("e1", 1);
            value.put("e2", 2);
            historyDaily.setValue(JSON.toJSONString(value));
            for (int i = 0; i < 20000; i++) {
                list.add(historyDaily);
            }
            historyDailyService.insertHistoryDaily(list);
            historyDailyService.insertHistoryDaily();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return ReturnT.FAIL;
screen-job/src/main/java/com/moral/api/task/TestController.java
New file
@@ -0,0 +1,45 @@
package com.moral.api.task;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.HistoryHourly;
import com.moral.api.mapper.HistoryHourlyMapper;
import com.moral.api.service.HistoryDailyService;
import com.moral.api.service.HistoryHourlyService;
@Slf4j
@Api(tags = {"定时任务"})
@RestController
@RequestMapping("/job")
public class TestController {
    @Autowired
    private HistoryHourlyService historyHourlyService;
    @Autowired
    private HistoryHourlyMapper historyHourlyMapper;
    @Autowired
    private HistoryDailyService historyDailyService;
    @ApiOperation(value = "job测试", notes = "job测试")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "data", value = "data", required = true, paramType = "query", dataType = "String")
    })
    @RequestMapping(value = "jobTest", method = RequestMethod.GET)
    public void jobTest() {
        historyDailyService.insertHistoryDaily();
    }
}
screen-job/src/main/resources/mapper/HistoryDailyMapper.xml
@@ -9,7 +9,7 @@
        <result column="value" property="value"/>
    </resultMap>
    <insert id="insertHistoryDaily" parameterType="com.moral.api.entity.HistoryDaily">
    <insert id="insertHistoryDaily">
        INSERT INTO history_daily
        VALUES
        <foreach collection="list" item="item" separator=",">
screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml
New file
@@ -0,0 +1,13 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.moral.api.mapper.HistoryHourlyMapper">
    <!-- Generic result mapping for queries -->
    <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryHourly">
        <result column="mac" property="mac"/>
        <result column="time" property="time"/>
        <result column="value" property="value"/>
        <result column="version" property="version"/>
    </resultMap>
</mapper>
screen-job/src/main/resources/mapper/HistoryMinutelyMapper.xml
@@ -16,35 +16,11 @@
    <select id="getHistoryFiveMinutelyData" resultType="java.util.LinkedHashMap">
        SELECT
        mac
        <foreach collection="sensorKeys" open="," separator="," item="sensorKey">
            <choose>
                <when test="sensorKey=='e23[0]'">
                    ROUND((CASE WHEN AVG(value->'$.e18[0]'*SIN((value->'$.e23[0]'/180)*PI()))<![CDATA[>]]>0 AND
                    AVG(value->'$.e18[0]'*COS((value->'$.e23[0]'/180)*PI()))<![CDATA[>]]>0
                    THEN
                    ATAN(AVG(value->'$.e18[0]'*SIN((value->'$.e23[0]'/180)*PI()))/AVG(value->'$.e18[0]'*COS((value->'$.e23[0]'/180)*PI())))*180/PI()
                    WHEN AVG(value->'$.e18[0]'*SIN((value->'$.e23[0]'/180)*PI()))>0 AND
                    AVG(value->'$.e18[0]'*COS((value->'$.e23[0]'/180)*PI()))<![CDATA[<]]>0
                    THEN
                    (ATAN(AVG(value->'$.e18[0]'*SIN((value->'$.e23[0]'/180)*PI()))/AVG(value->'$.e18[0]'*COS((value->'$.e23[0]'/180)*PI())))*180/PI())+180
                    WHEN AVG(value->'$.e18[0]'*SIN((value->'$.e23[0]'/180)*PI()))<![CDATA[<]]>0 AND
                    AVG(value->'$.e18[0]'*COS((value->'$.e23[0]'/180)*PI()))<![CDATA[<]]>0
                    THEN
                    (ATAN(AVG(value->'$.e18[0]'*SIN((value->'$.e23[0]'/180)*PI()))/AVG(value->'$.e18[0]'*COS((value->'$.e23[0]'/180)*PI())))*180/PI())+180
                    ELSE
                    (ATAN(AVG(value->'$.e18[0]'*SIN((value->'$.e23[0]'/180)*PI()))/AVG(value->'$.e18[0]'*COS((value->'$.e23[0]'/180)*PI())))*180/PI())+360
                    END),3) AS '${sensorKey}'
                </when>
                <otherwise>
                    ROUND(AVG(value->'$.${sensorKey}[0]'),3) AS '${sensorKey}'
                </otherwise>
            </choose>
        </foreach>
        mac, value
        FROM
        history_minutely_${timeUnits}
        WHERE time <![CDATA[>=]]> #{start} AND time <![CDATA[<]]> #{end}
        GROUP BY mac
        WHERE time <![CDATA[>=]]> #{start}
        AND time <![CDATA[<]]> #{end}
    </select>
</mapper>
screen-job/src/main/resources/mapper/SensorMapper.xml
New file
@@ -0,0 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.moral.api.mapper.SensorMapper">
    <!-- Generic query result mapping: maps result columns onto the properties
         of the Sensor entity. Snake_case columns (default_unit_key, create_time,
         update_time, is_delete) are mapped to their camelCase properties. -->
    <resultMap id="BaseResultMap" type="com.moral.api.entity.Sensor">
        <id column="id" property="id"/>
        <result column="name" property="name"/>
        <result column="desc" property="desc"/>
        <result column="code" property="code"/>
        <result column="upper" property="upper"/>
        <result column="lower" property="lower"/>
        <result column="default_unit_key" property="defaultUnitKey"/>
        <result column="create_time" property="createTime"/>
        <result column="update_time" property="updateTime"/>
        <result column="is_delete" property="isDelete"/>
    </resultMap>
</mapper>
screen-manage/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java
@@ -13,8 +13,8 @@
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
/*@Configuration
@EnableKafka*/
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
screen-manage/src/main/java/com/moral/api/controller/TestController.java
@@ -1,9 +1,14 @@
package com.moral.api.controller;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.moral.api.entity.Sensor;
import com.moral.api.entity.Test;
import com.moral.api.service.SensorService;
import com.moral.api.service.TestService;
import com.moral.api.service.impl.SensorServiceImpl;
import com.moral.api.util.CacheUtils;
import com.moral.constant.Constants;
import com.moral.constant.KafkaConstants;
import com.moral.constant.ResultMessage;
import com.moral.redis.RedisUtil;
@@ -23,6 +28,7 @@
import javax.annotation.Resource;
import java.io.*;
import java.util.List;
@Slf4j
@@ -182,4 +188,22 @@
        writer.close();
        fis.close();
    }
    @Autowired
    private SensorService sensorService;
    @ApiOperation(value = "因子测试", notes = "因子测试")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "token", value = "token", required = true, paramType = "header", dataType = "String")
    })
    @RequestMapping(value = "getSensor", method = RequestMethod.GET)
    public void getSensor() {
        QueryWrapper<Sensor> queryWrapper = new QueryWrapper<>();
        queryWrapper.select("code").eq("is_delete", Constants.NOT_DELETE);
        List<Object> list = sensorService.listObjs(queryWrapper);
        for (Object o : list) {
            System.out.println(o);
        }
    }
}
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
@@ -20,7 +20,7 @@
import com.moral.constant.KafkaConstants;
import com.moral.constant.RedisConstants;
//@Component
@Component
@Slf4j
public class KafkaConsumer {
@@ -40,7 +40,6 @@
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory")
    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        System.out.println(msg);
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
@@ -59,6 +58,7 @@
                        return !(key.contains("Min") || key.contains("Max") || key.contains("Cou"));
                    }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue));
            data.remove("time");
            data.remove("entryTime");
            //存入数据库
            historyMinutelyService.insertHistoryMinutely(data);
            ack.acknowledge();
@@ -89,6 +89,7 @@
                        return !(key.contains("Min") || key.contains("Max") || key.contains("Cou"));
                    }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue));
            data.remove("time");
            data.remove("entryTime");
            //存入数据库
            historyHourlyService.insertHistoryHourly(data);
            ack.acknowledge();
@@ -101,7 +102,6 @@
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        //System.out.println(record.offset() + "===>" + msg);
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
screen-manage/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
@@ -457,13 +457,13 @@
            String formula = sensor.getFormula();
            //转换单位前因子值
            String sensorValue = (String) deviceData.get(sensorCode);
            double value = Double.parseDouble(String.format("%.3f", sensorValue));
            double value = Double.parseDouble(String.format("%.4f", sensorValue));
            //单位转换
            if (formula != null) {
                //转换后因子值
                sensorValue = formula.replace("{0}", sensorValue);
                expression = AviatorEvaluator.compile(sensorValue);
                value = Double.parseDouble(String.format("%.3f", expression.execute()));
                value = Double.parseDouble(String.format("%.4f", expression.execute()));
            }
            int sensorState = judgeState(list, value);
            if (sensorState > state) {
screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -37,7 +37,8 @@
    public void insertHistoryHourly(Map<String, Object> data) {
        Map<String, Object> dataAdjust = new HashMap<>(data);
        String mac = data.remove("mac").toString();
        Date time = DateUtils.dataToTimeStampTime(new Date(new Long((String) data.remove("DataTime"))), DateUtils.yyyy_MM_dd_HH_EN);
        Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyy_MM_dd_HH_EN);
        Integer version = (Integer) data.remove("ver");
        Map<String, Object> result = new HashMap<>(data);
        result.put("mac", mac);
screen-manage/src/main/java/com/moral/api/service/impl/HistoryMinutelyServiceImpl.java
@@ -30,8 +30,8 @@
        Object mac = data.remove("mac");
        result.put("mac", mac);
        result.put("version", data.remove("ver"));
        Date time = new Date(new Long((String) data.remove("DataTime")));
        result.put("time", DateUtils.dataToTimeStampTime(time,DateUtils.yyyy_MM_dd_HH_mm_EN));
        Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN);
        result.put("time", time);
        result.put("value", JSONObject.toJSONString(data));
        String timeUnits = DateUtils.dateToDateString(time, DateUtils.yyyyMM_EN);
        result.put("timeUnits", tableSuffix(timeUnits, Constants.UN_ADJUST));
screen-manage/src/main/java/com/moral/api/util/AdjustDataUtils.java
@@ -14,6 +14,7 @@
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import com.moral.api.entity.DeviceAdjustValue;
import com.moral.constant.Constants;
import com.moral.util.DateUtils;
@Slf4j
@@ -22,10 +23,8 @@
    //数据,公式
    public Map<String, Object> adjust(Map<String, Object> deviceData, Map<String, Object> adjustFormula, Map<String, Object> aqiMap) {
        try {
            Object dataTime = deviceData.get("DataTime");
            //清除毫秒,四舍五入
            long time = Math.round(new Double((String) dataTime) / 1000) * 1000L;
            long finalTime = DateUtils.dataToTimeStampTime(new Date(time), DateUtils.HH_mm_ss_EN).getTime();
            Date time = DateUtils.getDate((String) deviceData.get("DataTime"), DateUtils.yyyyMMddHHmmss_EN);
            long finalTime = DateUtils.dataToTimeStampTime(time, DateUtils.HH_mm_ss_EN).getTime();
            for (String key : deviceData.keySet()) {
                if (!key.equals("mac") && !key.equals("time") && !key.equals("DataTime") && !key.equals("ver") && !key.contains("Flag")) {
                    //测量值
@@ -57,7 +56,7 @@
                        env.put("aqi", ObjectUtils.isEmpty(aqiValue) ? 0F : Float.parseFloat((String) aqiValue));
                    }
                    if (formula.contains("vocs")) {
                        Object vocsValue = ObjectUtils.isEmpty(deviceData.get("a99054")) ? 0F : deviceData.get("a99054");
                        Object vocsValue = ObjectUtils.isEmpty(deviceData.get(Constants.SENSOR_CODE_VOCS)) ? 0F : deviceData.get(Constants.SENSOR_CODE_VOCS);
                        env.put("vocs", vocsValue);
                    }
                    if (formula.contains("cel")) {
@@ -66,10 +65,10 @@
                    //校准
                    measuredValue = expression.execute(env);
                    //温度处理
                    if (Float.parseFloat(measuredValue.toString()) < 0 && !"a01001".equals(measuredValue)) {
                    if (!Constants.SENSOR_CODE_TEMP.equals(measuredValue) && Float.parseFloat(measuredValue.toString()) < 0) {
                        measuredValue = 0F;
                    }
                    deviceData.put(key, Double.parseDouble(String.format("%.3f", measuredValue)));
                    deviceData.put(key, Double.parseDouble(String.format("%.4f", measuredValue)));
                }
            }
        } catch (Exception e) {