package com.moral.api.service.impl; 
 | 
  
 | 
import com.alibaba.fastjson.JSONObject; 
 | 
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; 
 | 
import com.moral.api.entity.CityAqi; 
 | 
import com.moral.api.entity.CityConfigAqi; 
 | 
import com.moral.api.mapper.CityAqiMapper; 
 | 
import com.moral.api.service.CityAqiService; 
 | 
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; 
 | 
import com.moral.api.service.CityConfigAqiService; 
 | 
import com.moral.constant.Constants; 
 | 
import com.moral.constant.RedisConstants; 
 | 
import com.moral.pojo.AQI; 
 | 
import com.moral.util.AQIUtils; 
 | 
import com.moral.util.AmendUtils; 
 | 
import com.moral.util.ComprehensiveIndexUtils; 
 | 
import com.moral.util.DateUtils; 
 | 
  
 | 
import org.springframework.beans.factory.annotation.Autowired; 
 | 
  
 | 
import org.springframework.data.redis.core.RedisTemplate; 
 | 
import org.springframework.http.HttpEntity; 
 | 
import org.springframework.http.HttpHeaders; 
 | 
import org.springframework.http.ResponseEntity; 
 | 
import org.springframework.stereotype.Service; 
 | 
import org.springframework.util.LinkedMultiValueMap; 
 | 
import org.springframework.util.MultiValueMap; 
 | 
import org.springframework.util.ObjectUtils; 
 | 
import org.springframework.web.client.RestTemplate; 
 | 
  
 | 
import java.util.ArrayList; 
 | 
import java.util.Date; 
 | 
import java.util.HashMap; 
 | 
import java.util.List; 
 | 
import java.util.Map; 
 | 
import java.util.OptionalDouble; 
 | 
import java.util.stream.Collectors; 
 | 
import java.util.stream.DoubleStream; 
 | 
  
 | 
/** 
 | 
 * <p> 
 | 
 * 城市aqi实测小时数据表 服务实现类 
 | 
 * </p> 
 | 
 * 
 | 
 * @author moral 
 | 
 * @since 2021-10-11 
 | 
 */ 
 | 
@Service 
 | 
public class CityAqiServiceImpl extends ServiceImpl<CityAqiMapper, CityAqi> implements CityAqiService { 
 | 
  
 | 
    @Autowired 
 | 
    private CityAqiMapper cityAqiMapper; 
 | 
  
 | 
    @Autowired 
 | 
    private CityConfigAqiService cityConfigAqiService; 
 | 
  
 | 
    @Autowired 
 | 
    private RestTemplate restTemplate; 
 | 
  
 | 
    @Autowired 
 | 
    private RedisTemplate redisTemplate; 
 | 
  
 | 
    //城市aqi数据来源于,阿里云市场:墨迹天气(基础版CityID)全国历史天气预报接口 
 | 
    @Override 
 | 
    public void insertCityAqi() { 
 | 
        //pubtime=08的数据,是07-08之间的数据,存入数据库时的时间为07点 
 | 
        Date now = new Date(); 
 | 
        Date time = DateUtils.dataToTimeStampTime(now, DateUtils.yyyy_MM_dd_HH_EN); 
 | 
        //存入数据库是time字段实际时间 
 | 
        Date dataTime = DateUtils.addHours(time, -1); 
 | 
        Date start = null; 
 | 
        if (DateUtils.getHour(time) >= 8 || DateUtils.getHour(time) == 0) { 
 | 
            start = DateUtils.addHours(time, -8); 
 | 
        } 
 | 
  
 | 
        //获取城市配置 
 | 
        QueryWrapper<CityConfigAqi> wrapper = new QueryWrapper<>(); 
 | 
        wrapper.select("city_code", "city_id").eq("is_delete", Constants.NOT_DELETE); 
 | 
        List<CityConfigAqi> list = cityConfigAqiService.list(wrapper); 
 | 
  
 | 
        //获取历史数据,用于计算臭氧滑动值 
 | 
        Map<Integer, List<Map<String, Object>>> cityData = null; 
 | 
        if (start != null) { 
 | 
            QueryWrapper<CityAqi> queryWrapper = new QueryWrapper<>(); 
 | 
            List<Integer> cityCodes = list.stream().map(CityConfigAqi::getCityCode) 
 | 
                    .collect(Collectors.toList()); 
 | 
  
 | 
            queryWrapper.select("city_code", "time", "value") 
 | 
                    .ge("time", start) 
 | 
                    .in("city_code", cityCodes); 
 | 
            List<Map<String, Object>> selectMaps = cityAqiMapper.selectMaps(queryWrapper); 
 | 
  
 | 
            //按city_code分组 
 | 
            cityData = selectMaps.stream() 
 | 
                    .collect(Collectors.groupingBy(o -> Integer.parseInt(o.get("city_code").toString()))); 
 | 
        } 
 | 
  
 | 
  
 | 
        List<CityAqi> cityAqiList = new ArrayList<>(); 
 | 
  
 | 
  
 | 
        String url = "http://aliv13.data.moji.com/whapi/json/alicityweather/aqi"; 
 | 
        String appcode = "31b6ea8f804a4472be3b633cfee44849"; 
 | 
        HttpHeaders httpHeaders = new HttpHeaders(); 
 | 
        httpHeaders.add("Authorization", "APPCODE " + appcode); 
 | 
        //请求参数 
 | 
        LinkedMultiValueMap<String, Object> requestParams = new LinkedMultiValueMap<>(); 
 | 
  
 | 
        for (CityConfigAqi cityConfigAqi : list) { 
 | 
            CityAqi cityAqi = new CityAqi(); 
 | 
            Integer cityCode = cityConfigAqi.getCityCode(); 
 | 
            Map<String, Object> data; 
 | 
            try { 
 | 
                //从第三方接口获取数据 
 | 
                requestParams.set("cityId", cityConfigAqi.getCityId()); 
 | 
                HttpEntity<MultiValueMap<String, Object>> httpEntity = new HttpEntity<>(requestParams, httpHeaders); 
 | 
                ResponseEntity<String> response = restTemplate.postForEntity(url, httpEntity, String.class); 
 | 
                data = JSONObject.parseObject(response.getBody(), Map.class); 
 | 
            } catch (Exception e) { 
 | 
                continue; 
 | 
            } 
 | 
            if (ObjectUtils.isEmpty(data)) { 
 | 
                continue; 
 | 
            } 
 | 
  
 | 
            Map<String, Object> map = (Map<String, Object>) data.get("data"); 
 | 
            Map<String, Object> aqi = (Map<String, Object>) map.get("aqi"); 
 | 
            aqi.remove("pm25"); 
 | 
            aqi.remove("pm10"); 
 | 
            aqi.remove("so2"); 
 | 
            aqi.remove("no2"); 
 | 
            aqi.remove("co"); 
 | 
            aqi.remove("o3"); 
 | 
  
 | 
            aqi.put("PM2_5", aqi.remove("pm25C")); 
 | 
            aqi.put("PM10", aqi.remove("pm10C")); 
 | 
            aqi.put("SO2", aqi.remove("so2C")); 
 | 
            aqi.put("NO2", aqi.remove("no2C")); 
 | 
            aqi.put("CO", aqi.remove("coC")); 
 | 
            aqi.put("O3", aqi.remove("o3C")); 
 | 
            aqi.put("AQI", aqi.remove("value")); 
 | 
  
 | 
            //o3滑动值计算 
 | 
            if (cityData != null) { 
 | 
                List<Map<String, Object>> cityAqis = cityData.get(cityCode); 
 | 
                if (cityAqis == null) { 
 | 
                    cityAqis = new ArrayList<>(); 
 | 
                } 
 | 
  
 | 
                Map<String, Object> params = new HashMap<>(); 
 | 
                params.put("time", DateUtils.dateToDateString(dataTime, DateUtils.yyyy_MM_dd_HH_mm_ss_S_EN)); 
 | 
                params.put("value", JSONObject.toJSONString(aqi)); 
 | 
                cityAqis.add(params); 
 | 
                if (cityAqis.size() >= 6) { 
 | 
                    OptionalDouble average = cityAqis.stream().flatMapToDouble(v -> { 
 | 
                        Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class); 
 | 
                        double o3 = Double.parseDouble(dataValue.get("O3").toString()); 
 | 
                        return DoubleStream.of(o3); 
 | 
                    }).average(); 
 | 
                    if (average.isPresent()) { 
 | 
                        double O3_8H = AmendUtils.sciCal(average.getAsDouble(), 0); 
 | 
                        aqi.put("O3_8H", O3_8H); 
 | 
                    } 
 | 
                } 
 | 
            } 
 | 
  
 | 
            //综合指数计算 
 | 
            Map<String, Object> temp = new HashMap<>(aqi); 
 | 
            temp.put("O3", temp.get("O3_8H")); 
 | 
            Double compositeIndex = ComprehensiveIndexUtils.dailyData(temp); 
 | 
            aqi.put("compositeIndex", compositeIndex); 
 | 
  
 | 
            //首要污染物计算 
 | 
            Map<String, Object> sixParamMap = new HashMap<>(); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_PM25, aqi.get("PM2_5")); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_PM10, aqi.get("PM10")); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_SO2, aqi.get("SO2")); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_NO2, aqi.get("NO2")); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_CO, aqi.get("CO")); 
 | 
            sixParamMap.put(Constants.SENSOR_CODE_O3, aqi.get("O3")); 
 | 
            AQI aqiAndPollutant = AQIUtils.hourlyAQI(sixParamMap); 
 | 
            aqi.put("primaryPollutant", aqiAndPollutant.getPrimaryPollutantNames()); 
 | 
  
 | 
  
 | 
            cityAqi.setCityCode(cityCode); 
 | 
            cityAqi.setTime(dataTime); 
 | 
            cityAqi.setValue(JSONObject.toJSONString(aqi)); 
 | 
            cityAqiList.add(cityAqi); 
 | 
            //存入redis 
 | 
            redisTemplate.opsForHash().put(RedisConstants.CITY_AQI, cityCode.toString(), aqi); 
 | 
        } 
 | 
        cityAqiMapper.insertCityAqi(cityAqiList); 
 | 
    } 
 | 
  
 | 
    @Override 
 | 
    public List<CityAqi> getCityAqi() { 
 | 
        String time = DateUtils.getDateStringOfHour(-1, DateUtils.yyyy_MM_dd_HH_EN) + ":00:00"; 
 | 
        QueryWrapper<CityAqi> queryWrapper = new QueryWrapper<>(); 
 | 
        queryWrapper.eq("time", time); 
 | 
        return cityAqiMapper.selectList(queryWrapper); 
 | 
    } 
 | 
} 
 |