jinpengyong
2021-10-28 db56a260709d59c872b15d8eb6734fcb99f6cf2b
城市aqi日数据统计定时任务
5 files added
10 files modified
278 ■■■■ changed files
screen-api/src/main/java/com/moral/api/entity/CityAqi.java 2 ●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/entity/Forecast.java 2 ●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/util/AmendUtils.java 24 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/entity/CityAqi.java 2 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/entity/CityAqiDaily.java 45 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/entity/CityConfig.java 2 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/entity/CityWeather.java 2 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/mapper/CityAqiDailyMapper.java 16 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/CityAqiDailyService.java 19 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/CityAqiService.java 3 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/CityAqiDailyServiceImpl.java 108 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/CityAqiServiceImpl.java 33 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/CityWeatherServiceImpl.java 2 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/task/AqiInsertTask.java 6 ●●●● patch | view | raw | blame | history
screen-job/src/main/resources/mapper/CityAqiDailyMapper.xml 12 ●●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/entity/CityAqi.java
@@ -24,7 +24,7 @@
    /**
     * 城市id
     */
    private String cityCode;
    private Integer cityCode;
    /**
     * 时间
screen-api/src/main/java/com/moral/api/entity/Forecast.java
@@ -24,7 +24,7 @@
    /**
     * 城市id
     */
    private String cityCode;
    private Integer cityCode;
    /**
     * 时间
screen-common/src/main/java/com/moral/util/AmendUtils.java
@@ -93,7 +93,6 @@
            avgs.add(average);
        }
        max = avgs.stream().mapToDouble(aDouble -> aDouble).summaryStatistics().getMax();
        System.out.println(avgs.size());
        if (avgs.size() < 14) {
            if (max < 160d) {
                return result;
@@ -103,8 +102,24 @@
        return result;
    }
    //城市aqi日均值
    public static Double o3OfDay(List<Map<String, Object>> value) {
        List<Map<String, Object>> o3_8H = getO3_8H(value);
        if (!ObjectUtils.isEmpty(o3_8H)) {
            double o3 = 0d;
            for (Map<String, Object> o : o3_8H) {
                double temp = (double) o.get("o3");
                if (temp > o3) {
                    o3 = temp;
                }
            }
            return sciCal(o3, 0);
        }
        return null;
    }
    /**
     * @param data 数据 time:Date类型
     * @param data 数据 time:Date类型,臭氧key:o3
     * @return 功能:臭氧8小时滑动值计算
     */
    public static List<Map<String, Object>> getO3_8H(List<Map<String, Object>> data) {
@@ -129,7 +144,8 @@
            }
            List<Double> value = new ArrayList<>();
            for (Map<String, Object> dataMap : data) {
                Double o3 = Double.parseDouble(dataMap.get(Constants.SENSOR_CODE_O3).toString());
                Map<String, Object> sensorValue = JSONObject.parseObject((String) dataMap.get("value"), Map.class);
                Double o3 = Double.parseDouble(sensorValue.get("o3").toString());
                Date time = (Date) dataMap.get("time");
                int hour = DateUtils.getHour(time);
                if (hour == 0) {
@@ -143,7 +159,7 @@
                continue;
            }
            double average = value.stream().mapToDouble(aDouble -> aDouble).summaryStatistics().getAverage();
            map.put(Constants.SENSOR_CODE_O3, average);
            map.put("o3", average);
            result.add(map);
        }
        return result;
screen-job/src/main/java/com/moral/api/entity/CityAqi.java
@@ -24,7 +24,7 @@
    /**
     * 城市id
     */
    private String cityCode;
    private Integer cityCode;
    /**
     * 时间
screen-job/src/main/java/com/moral/api/entity/CityAqiDaily.java
New file
@@ -0,0 +1,45 @@
package com.moral.api.entity;
import com.baomidou.mybatisplus.extension.activerecord.Model;
import java.io.Serializable;
import java.util.Date;
import lombok.Data;
import lombok.EqualsAndHashCode;
/**
 * <p>
 * City AQI daily data table.
 * </p>
 *
 * @author moral
 * @since 2021-10-28
 */
@Data
@EqualsAndHashCode(callSuper = false)
public class CityAqiDaily extends Model<CityAqiDaily> {
    private static final long serialVersionUID = 1L;
    /**
     * City id (city code).
     */
    private Integer cityCode;
    /**
     * Time of the record.
     */
    private Date time;
    /**
     * Data payload, stored as a string.
     */
    private String value;
    /**
     * Always returns null: this entity exposes no primary-key value.
     */
    @Override
    protected Serializable pkVal() {
        return null;
    }
}
screen-job/src/main/java/com/moral/api/entity/CityConfig.java
@@ -30,7 +30,7 @@
    /**
     * 城市code
     */
    private String cityCode;
    private Integer cityCode;
    /**
     * 城市名字
screen-job/src/main/java/com/moral/api/entity/CityWeather.java
@@ -24,7 +24,7 @@
    /**
     * 城市code
     */
    private String cityCode;
    private Integer cityCode;
    /**
     * 时间
screen-job/src/main/java/com/moral/api/mapper/CityAqiDailyMapper.java
New file
@@ -0,0 +1,16 @@
package com.moral.api.mapper;
import com.moral.api.entity.CityAqiDaily;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
 * <p>
 * Mapper interface for the city AQI daily data table.
 * </p>
 *
 * @author moral
 * @since 2021-10-28
 */
public interface CityAqiDailyMapper extends BaseMapper<CityAqiDaily> {
}
screen-job/src/main/java/com/moral/api/service/CityAqiDailyService.java
New file
@@ -0,0 +1,19 @@
package com.moral.api.service;
import com.moral.api.entity.CityAqiDaily;
import com.baomidou.mybatisplus.extension.service.IService;
/**
 * <p>
 * Service interface for the city AQI daily data table.
 * </p>
 *
 * @author moral
 * @since 2021-10-28
 */
public interface CityAqiDailyService extends IService<CityAqiDaily> {
    // Compute the city AQI daily statistics and insert them.
    void insertCityAqiDaily();
}
screen-job/src/main/java/com/moral/api/service/CityAqiService.java
@@ -16,7 +16,4 @@
    //aqi数据按城市insert
    void insertCityAqi();
    //城市aqi日数据统计
    void insertCityAqiDaily();
}
screen-job/src/main/java/com/moral/api/service/impl/CityAqiDailyServiceImpl.java
New file
@@ -0,0 +1,108 @@
package com.moral.api.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.CityAqi;
import com.moral.api.entity.CityAqiDaily;
import com.moral.api.mapper.CityAqiDailyMapper;
import com.moral.api.service.CityAqiDailyService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.api.service.CityAqiService;
import com.moral.util.AmendUtils;
import com.moral.util.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.stream.Collectors;
import java.util.stream.DoubleStream;
/**
 * <p>
 * 城市aqi日数据表 服务实现类
 * </p>
 *
 * @author moral
 * @since 2021-10-28
 */
@Service
public class CityAqiDailyServiceImpl extends ServiceImpl<CityAqiDailyMapper, CityAqiDaily> implements CityAqiDailyService {
    @Autowired
    private CityAqiDailyMapper cityAqiDailyMapper;
    @Autowired
    private CityAqiService cityAqiService;
    @Override
    public void insertCityAqiDaily() {
        List<String> sensors = Arrays.asList("pm2_5", "pm10", "so2", "no2", "co", "o3", "aqi");
        String format = DateUtils.yyyy_MM_dd_EN;
        Date now = new Date();
        //开始时间,昨日
        Date start = DateUtils.dataToTimeStampTime(DateUtils.getDateOfDay(now, -1), format);
        //结束时间,今日
        Date end = DateUtils.dataToTimeStampTime(now, format);
        //获取所有城市aqi小时数据
        QueryWrapper<CityAqi> wrapper = new QueryWrapper<>();
        wrapper.select("city_code", "time", "value")
                .ge("time", DateUtils.dateToDateString(start))
                .lt("time", DateUtils.dateToDateString(end));
        List<Map<String, Object>> dailyData = cityAqiService.listMaps(wrapper);
        if (dailyData.size() == 0) {
            return;
        }
        //按city_code分组
        Map<String, List<Map<String, Object>>> data = dailyData.parallelStream().collect(Collectors.groupingBy(o -> o.get("city_code").toString()));
        data.forEach((cityCode, value) -> {
            Map<String, Object> jsonMap = new HashMap<>();
            CityAqiDaily cityAqiDaily = new CityAqiDaily();
            cityAqiDaily.setCityCode(Integer.parseInt(cityCode));
            cityAqiDaily.setTime(start);
            //臭氧采用滑动值算法
            Double o3OfDay = AmendUtils.o3OfDay(value);
            if (o3OfDay != null) {
                jsonMap.put("o3", o3OfDay);
            }
            sensors.forEach(sensor -> {
                OptionalDouble optionalDouble = value.parallelStream().flatMapToDouble(v -> {
                    Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                    //臭氧单独计算
                    if ("o3".equals(sensor)) {
                        return null;
                    }
                    Object sensorValue = dataValue.get(sensor);
                    if (ObjectUtils.isEmpty(sensorValue)) {
                        return null;
                    }
                    double aDouble = Double.parseDouble(sensorValue.toString());
                    return DoubleStream.of(aDouble);
                }).average();
                if (optionalDouble.isPresent()) {
                    //银行家算法修约
                    double sciCal;
                    if ("co".equals(sensor)) {
                        sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 1);
                    } else {
                        sciCal = AmendUtils.sciCal(optionalDouble.getAsDouble(), 0);
                    }
                    jsonMap.put(sensor, sciCal);
                }
            });
            cityAqiDaily.setValue(JSONObject.toJSONString(jsonMap));
            cityAqiDailyMapper.insert(cityAqiDaily);
        });
    }
}
screen-job/src/main/java/com/moral/api/service/impl/CityAqiServiceImpl.java
@@ -22,7 +22,6 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * <p>
@@ -47,6 +46,9 @@
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    @Override
    public void insertCityAqi() {
        //获取城市配置
@@ -55,7 +57,7 @@
        List<CityConfig> list = cityConfigService.list(wrapper);
        Date time = DateUtils.dataToTimeStampTime(new Date(), DateUtils.yyyy_MM_dd_HH_EN);
        for (CityConfig cityConfig : list) {
            String cityCode = cityConfig.getCityCode();
            Integer cityCode = cityConfig.getCityCode();
            Map<String, Object> data;
            try {
                //从第三方接口获取数据
@@ -83,32 +85,7 @@
            cityAqi.setValue(JSONObject.toJSONString(aqi));
            cityAqiMapper.insert(cityAqi);
            //存入redis
            redisTemplate.opsForHash().put(RedisConstants.CITY_AQI, cityCode, aqi);
            redisTemplate.opsForHash().put(RedisConstants.CITY_AQI, cityCode.toString(), aqi);
        }
    }
    /**
     * Loads yesterday's hourly city AQI rows and groups them by city_code.
     * The per-city callback is empty here, so nothing is computed or stored.
     */
    @Override
    public void insertCityAqiDaily() {
        String format = DateUtils.yyyy_MM_dd_EN;
        Date now = new Date();
        // Start time: yesterday
        Date start = DateUtils.dataToTimeStampTime(DateUtils.getDateOfDay(now, -1), format);
        // End time: today
        Date end = DateUtils.dataToTimeStampTime(now, format);
        // Fetch the hourly AQI data of all cities
        QueryWrapper<CityAqi> wrapper = new QueryWrapper<>();
        wrapper.select("city_code", "value")
                .ge("time", DateUtils.dateToDateString(start))
                .lt("time", DateUtils.dateToDateString(end));
        List<Map<String, Object>> dailyData = cityAqiMapper.selectMaps(wrapper);
        if (dailyData.size() == 0) {
            return;
        }
        // Group by city_code
        Map<String, List<Map<String, Object>>> data = dailyData.parallelStream().collect(Collectors.groupingBy(o -> (String) o.get("city_code")));
        // No per-city processing is implemented in this body.
        data.forEach((cityCode, value) -> {
        });
    }
}
screen-job/src/main/java/com/moral/api/service/impl/CityWeatherServiceImpl.java
@@ -51,7 +51,7 @@
        wrapper.select("city_code", "location_id").eq("is_delete", Constants.NOT_DELETE);
        List<CityConfig> list = cityConfigService.list(wrapper);
        for (CityConfig cityConfig : list) {
            String cityCode = cityConfig.getCityCode();
            Integer cityCode = cityConfig.getCityCode();
            Integer locationId = cityConfig.getLocationId();
            Map<String, Object> data = restTemplate.getForObject("https://api.qweather.com/v7/weather/now?key=da05c6c4852d4f7aa3364a9236ee9e26&gzip=n&location={1}", Map.class, locationId);
            Map<String, Object> now = (Map<String, Object>) data.get("now");
screen-job/src/main/java/com/moral/api/task/AqiInsertTask.java
@@ -3,6 +3,7 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.moral.api.service.CityAqiDailyService;
import com.moral.api.service.CityAqiService;
import com.moral.api.service.HistoryAqiService;
import com.xxl.job.core.biz.model.ReturnT;
@@ -14,6 +15,9 @@
    @Autowired
    private HistoryAqiService historyAqiService;
    @Autowired
    private CityAqiDailyService cityAqiDailyService;
    @Autowired
    private CityAqiService cityAqiService;
@@ -46,7 +50,7 @@
    @XxlJob("insertCityAqiDaily")
    public ReturnT insertCityAqiDaily(){
        try {
            cityAqiService.insertCityAqiDaily();
            cityAqiDailyService.insertCityAqiDaily();
        } catch (Exception e) {
            XxlJobHelper.log(e.getMessage());
            return new ReturnT(ReturnT.FAIL_CODE, e.getMessage());
screen-job/src/main/resources/mapper/CityAqiDailyMapper.xml
New file
@@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.moral.api.mapper.CityAqiDailyMapper">
    <!-- Generic result map: maps the city_code, time and value columns onto CityAqiDaily properties -->
    <resultMap id="BaseResultMap" type="com.moral.api.entity.CityAqiDaily">
        <result column="city_code" property="cityCode"/>
        <result column="time" property="time"/>
        <result column="value" property="value"/>
    </resultMap>
</mapper>