删除7天前数据以及分区任务;AQI数据插入任务;创建history_minutely_年月分表任务
10 files added
8 files modified
594 ■■■■■ changed files
src/main/java/com/moral/mapper/AQIMapper.java 10 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/mapper/HistoryMapper.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/moral/mapper/HistoryMinutelyMapper.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/moral/service/AQIService.java 10 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/service/HistoryMinutelyService.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/service/HistoryService.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/moral/service/impl/AQIServiceImpl.java 23 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/service/impl/HistoryMinutelyServiceImpl.java 5 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/service/impl/HistoryServiceImpl.java 10 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/task/AQIDataInsertTask.java 83 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/task/HistoryMinutelySubTableTask.java 28 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/task/HistoryTableDeleteTask.java 39 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/util/DateUtil.java 16 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/util/HttpUtils.java 310 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/util/WxMappingJackson2HttpMessageConverter.java 16 ●●●●● patch | view | raw | blame | history
src/main/resources/mapper/AQIMapper.xml 15 ●●●●● patch | view | raw | blame | history
src/main/resources/mapper/HistoryMapper.xml 7 ●●●●● patch | view | raw | blame | history
src/main/resources/mapper/HistoryMinutelyMapper.xml 10 ●●●●● patch | view | raw | blame | history
src/main/java/com/moral/mapper/AQIMapper.java
New file
@@ -0,0 +1,10 @@
package com.moral.mapper;
import java.util.List;
import java.util.Map;
public interface AQIMapper {
    List<Map<String, Object>> getCityAqiConfig();
    int insertAQIData(Map<String, Object> parameters);
}
src/main/java/com/moral/mapper/HistoryMapper.java
@@ -4,6 +4,7 @@
import java.util.Map;
import com.moral.entity.History;
import org.apache.ibatis.annotations.Param;
public interface HistoryMapper {
    int insert(History record);
@@ -28,4 +29,7 @@
    int insertHistorySpecialTable(Map<String, Object> parameters);
    int deleteHistoryData(String oldTime);
    void deletePartition(@Param("p") String p);
}
src/main/java/com/moral/mapper/HistoryMinutelyMapper.java
@@ -11,4 +11,5 @@
    List<Map<String, Object>> getMinutelySensorData(Map<String, Object> parameters);
    void createHistoryMinutelyTable(@Param("yearAndMonth") String yearAndMonth);
}
src/main/java/com/moral/service/AQIService.java
New file
@@ -0,0 +1,10 @@
package com.moral.service;
import java.util.List;
import java.util.Map;
public interface AQIService {
    List<Map<String, Object>> getCityAqiConfig();
    int insertAQIData(Map<String, Object> parameters);
}
src/main/java/com/moral/service/HistoryMinutelyService.java
@@ -9,4 +9,7 @@
    List<Map<String, Object>> getMinutelySensorData(Map<String, Object> parameters);
    //创建分表任务
    void createHistoryMinutelyTable(String yearAndMonth);
}
src/main/java/com/moral/service/HistoryService.java
@@ -10,4 +10,8 @@
    List<History> selectByMacAndTime(Map<String, Object> parameters);
    int insertHistorySpecialTable(Map<String, Object> parameters);
    int deleteHistoryData(String oldTime);
    void deletePartition(String p);
}
src/main/java/com/moral/service/impl/AQIServiceImpl.java
New file
@@ -0,0 +1,23 @@
package com.moral.service.impl;
import com.moral.mapper.AQIMapper;
import com.moral.service.AQIService;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
@Service
public class AQIServiceImpl implements AQIService {
    @Resource
    private AQIMapper aqiMapper;
    @Override
    public List<Map<String, Object>> getCityAqiConfig() {
        return aqiMapper.getCityAqiConfig();
    }
    @Override
    public int insertAQIData(Map<String, Object> parameters) {
        return aqiMapper.insertAQIData(parameters);
    }
}
src/main/java/com/moral/service/impl/HistoryMinutelyServiceImpl.java
@@ -26,4 +26,9 @@
        return historyMinutelyMapper.getMinutelySensorData(parameters);
    }
    @Override
    public void createHistoryMinutelyTable(String yearAndMonth) {
        historyMinutelyMapper.createHistoryMinutelyTable(yearAndMonth);
    }
}
src/main/java/com/moral/service/impl/HistoryServiceImpl.java
@@ -26,4 +26,14 @@
    public int insertHistorySpecialTable(Map<String, Object> parameters) {
        return historyMapper.insertHistorySpecialTable(parameters);
    }
    @Override
    public int deleteHistoryData(String oldTime) {
        return historyMapper.deleteHistoryData(oldTime);
    }
    @Override
    public void deletePartition(String p) {
        historyMapper.deletePartition(p);
    }
}
src/main/java/com/moral/task/AQIDataInsertTask.java
New file
@@ -0,0 +1,83 @@
package com.moral.task;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.moral.service.AQIService;
import com.moral.util.HttpUtils;
import com.moral.util.WxMappingJackson2HttpMessageConverter;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.http.HttpResponse;
import org.apache.http.util.EntityUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.web.client.RestTemplate;
import redis.clients.jedis.Jedis;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.*;
@Component
public class AQIDataInsertTask {
    @Resource
        private AQIService aqiService;
        public void insertData(){
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            RestTemplate restTemplate = new RestTemplate();
            restTemplate.getMessageConverters().add(new WxMappingJackson2HttpMessageConverter());
            Date pubtime = DateUtils.truncate(new Date(),Calendar.HOUR);
            Jedis jedis = new Jedis("r-bp1672d21a422a14161.redis.rds.aliyuncs.com", 6379);
            jedis.auth("KtElFcI1sYm9NP3");
            jedis.select(1);
            List<Map<String,Object>> CityAqiConfigs =aqiService.getCityAqiConfig();
            for (Map<String, Object> cityAqiConfig : CityAqiConfigs) {
                String entity = null ;
                Collection<Object> values = null;
                Map<String, Object> data = null;
                try {
                    HttpResponse response = HttpUtils.doGet("https://api.epmap.org", "/api/v1/air/city", "GET",
                            new HashMap<String, String>() {{put("Authorization", "APPCODE " + "31b6ea8f804a4472be3b633cfee44849");}},
                            new HashMap<String, String>() {{put("city", cityAqiConfig.get("city_name").toString());}}
                    );
                    entity = EntityUtils.toString(response.getEntity());
                    JSONObject json = JSON.parseObject(entity);
                    data = (Map<String, Object>) json.get("data");
                    if (!ObjectUtils.isEmpty(data)) {
                        values = data.values();
                        pubtime = format.parse(data.get("pubtime").toString());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                Map<String, Object> parameters = new HashMap<>();
                parameters.put("time",pubtime);
                parameters.put("data",data.toString());
                parameters.put("code",cityAqiConfig.get("city_code"));
                if (!ObjectUtils.isEmpty(data)) {
                    try {
                        aqiService.insertAQIData(parameters);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if ("1".equals(cityAqiConfig.get("is_compensate"))) {
                        Map<String, String> map = new HashMap<String, String>();
                        map.put("e1", data.containsKey("PM25C") ? data.get("PM25C").toString() : data.get("PM2_5").toString());
                        map.put("e2", data.containsKey("PM10C") ? data.get("PM10C").toString() : data.get("PM10").toString());
                        map.put("e10", data.containsKey("COC") ? data.get("COC").toString() : data.get("CO").toString());
                        map.put("e11", data.containsKey("SO2C") ? data.get("SO2C").toString() : data.get("SO2").toString());
                        map.put("e15", data.containsKey("O3C") ? data.get("O3C").toString() : data.get("O3").toString());
                        map.put("e16", data.containsKey("NO2C") ? data.get("NO2C").toString() : data.get("NO2").toString());
                        jedis.hmset("aqi_" + cityAqiConfig.get("city_code"), map);
                    }
                }
            }
        }
}
src/main/java/com/moral/task/HistoryMinutelySubTableTask.java
New file
@@ -0,0 +1,28 @@
package com.moral.task;
import com.moral.service.HistoryMinutelyService;
import com.xxl.job.core.biz.model.ReturnT;
import org.springframework.format.datetime.joda.LocalDateTimeParser;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
@Component
public class HistoryMinutelySubTableTask {
    @Resource
    private HistoryMinutelyService historyMinutelyService;
    public ReturnT createHistoryMinutelyTb(){
        LocalDateTime time=LocalDateTime.now();
        LocalDateTime time1 = time.plusMonths(1);
        String year = String.valueOf(time1.getYear());
        String month = String.valueOf(time1.getMonthValue());
        if (month.length()<2){
            month="0"+month;
        }
        String yearAndMonth=year+month;
        historyMinutelyService.createHistoryMinutelyTable(yearAndMonth);
        ReturnT returnT = new ReturnT();
        return returnT;
    }
}
src/main/java/com/moral/task/HistoryTableDeleteTask.java
New file
@@ -0,0 +1,39 @@
package com.moral.task;
import com.moral.service.HistoryService;
import com.moral.util.DateUtil;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class HistoryTableDeleteTask {
    private static transient Logger logger = LoggerFactory.getLogger(HistoryTableInsertTask.class);
    @Resource
    private HistoryService historyService;
    @XxlJob("deleteOldDate")
    public ReturnT deleteOldData() {
        String p=DateUtil.getOldTime(8);
        String s=DateUtil.getOldTime(7);
        String[] ss=p.split("-");
        p="p"+ss[0]+ss[1]+ss[2];
        int i=historyService.deleteHistoryData(s);
        if (i>0){
            ReturnT returnT = new ReturnT(200, s+"之前数据删除成功");
            historyService.deletePartition(p);
            return returnT;
        }else{
            ReturnT returnT = new ReturnT(500, "删除数据失败");
            return returnT;
        }
    }
}
src/main/java/com/moral/util/DateUtil.java
New file
@@ -0,0 +1,16 @@
package com.moral.util;
import java.text.SimpleDateFormat;
import java.util.Date;
public class DateUtil {
    //获得过去i天的年月日
    public static String getOldTime(int i){
        Long time=System.currentTimeMillis();
        Long time1=time-(1000 * 60 * 60 * 24 * i);
        SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String sd = sdf.format(new Date(Long.parseLong(String.valueOf(time1))));
        String s=sd.substring(0,10);
        return s;
    }
}
src/main/java/com/moral/util/HttpUtils.java
New file
@@ -0,0 +1,310 @@
package com.moral.util;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicNameValuePair;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class HttpUtils {
    /**
     * get
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @return
     * @throws Exception
     */
    public static HttpResponse doGet(String host, String path, String method,
                                     Map<String, String> headers,
                                     Map<String, String> querys)
            throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpGet request = new HttpGet(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        return httpClient.execute(request);
    }
    /**
     * post form
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @param bodys
     * @return
     * @throws Exception
     */
    public static HttpResponse doPost(String host, String path, String method,
                                      Map<String, String> headers,
                                      Map<String, String> querys,
                                      Map<String, String> bodys)
            throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpPost request = new HttpPost(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        if (bodys != null) {
            List<NameValuePair> nameValuePairList = new ArrayList<NameValuePair>();
            for (String key : bodys.keySet()) {
                nameValuePairList.add(new BasicNameValuePair(key, bodys.get(key)));
            }
            UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(nameValuePairList, "utf-8");
            formEntity.setContentType("application/x-www-form-urlencoded; charset=UTF-8");
            request.setEntity(formEntity);
        }
        return httpClient.execute(request);
    }
    /**
     * Post String
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @param body
     * @return
     * @throws Exception
     */
    public static HttpResponse doPost(String host, String path, String method,
                                      Map<String, String> headers,
                                      Map<String, String> querys,
                                      String body)
            throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpPost request = new HttpPost(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        if (StringUtils.isNotBlank(body)) {
            request.setEntity(new StringEntity(body, "utf-8"));
        }
        return httpClient.execute(request);
    }
    /**
     * Post stream
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @param body
     * @return
     * @throws Exception
     */
    public static HttpResponse doPost(String host, String path, String method,
                                      Map<String, String> headers,
                                      Map<String, String> querys,
                                      byte[] body)
            throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpPost request = new HttpPost(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        if (body != null) {
            request.setEntity(new ByteArrayEntity(body));
        }
        return httpClient.execute(request);
    }
    /**
     * Put String
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @param body
     * @return
     * @throws Exception
     */
    public static HttpResponse doPut(String host, String path, String method,
                                     Map<String, String> headers,
                                     Map<String, String> querys,
                                     String body)
            throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpPut request = new HttpPut(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        if (StringUtils.isNotBlank(body)) {
            request.setEntity(new StringEntity(body, "utf-8"));
        }
        return httpClient.execute(request);
    }
    /**
     * Put stream
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @param body
     * @return
     * @throws Exception
     */
    public static HttpResponse doPut(String host, String path, String method,
                                     Map<String, String> headers,
                                     Map<String, String> querys,
                                     byte[] body)
            throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpPut request = new HttpPut(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        if (body != null) {
            request.setEntity(new ByteArrayEntity(body));
        }
        return httpClient.execute(request);
    }
    /**
     * Delete
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @return
     * @throws Exception
     */
    public static HttpResponse doDelete(String host, String path, String method,
                                        Map<String, String> headers,
                                        Map<String, String> querys)
            throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpDelete request = new HttpDelete(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        return httpClient.execute(request);
    }
    private static String buildUrl(String host, String path, Map<String, String> querys) throws UnsupportedEncodingException {
        StringBuilder sbUrl = new StringBuilder();
        sbUrl.append(host);
        if (!StringUtils.isBlank(path)) {
            sbUrl.append(path);
        }
        if (null != querys) {
            StringBuilder sbQuery = new StringBuilder();
            for (Map.Entry<String, String> query : querys.entrySet()) {
                if (0 < sbQuery.length()) {
                    sbQuery.append("&");
                }
                if (StringUtils.isBlank(query.getKey()) && !StringUtils.isBlank(query.getValue())) {
                    sbQuery.append(query.getValue());
                }
                if (!StringUtils.isBlank(query.getKey())) {
                    sbQuery.append(query.getKey());
                    if (!StringUtils.isBlank(query.getValue())) {
                        sbQuery.append("=");
                        sbQuery.append(URLEncoder.encode(query.getValue(), "utf-8"));
                    }
                }
            }
            if (0 < sbQuery.length()) {
                sbUrl.append("?").append(sbQuery);
            }
        }
        return sbUrl.toString();
    }
    private static HttpClient wrapClient(String host) {
        HttpClient httpClient = new DefaultHttpClient();
        if (host.startsWith("https://")) {
            sslClient(httpClient);
        }
        return httpClient;
    }
    private static void sslClient(HttpClient httpClient) {
        try {
            SSLContext ctx = SSLContext.getInstance("TLS");
            X509TrustManager tm = new X509TrustManager() {
                public X509Certificate[] getAcceptedIssuers() {
                    return null;
                }
                public void checkClientTrusted(X509Certificate[] xcs, String str) {
                }
                public void checkServerTrusted(X509Certificate[] xcs, String str) {
                }
            };
            ctx.init(null, new TrustManager[] { tm }, null);
            SSLSocketFactory ssf = new SSLSocketFactory(ctx);
            ssf.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
            ClientConnectionManager ccm = httpClient.getConnectionManager();
            SchemeRegistry registry = ccm.getSchemeRegistry();
            registry.register(new Scheme("https", 443, ssf));
        } catch (KeyManagementException ex) {
            throw new RuntimeException(ex);
        } catch (NoSuchAlgorithmException ex) {
            throw new RuntimeException(ex);
        }
    }
}
src/main/java/com/moral/util/WxMappingJackson2HttpMessageConverter.java
New file
@@ -0,0 +1,16 @@
package com.moral.util;
import org.springframework.http.MediaType;
import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter;
import java.util.ArrayList;
import java.util.List;
public class WxMappingJackson2HttpMessageConverter extends MappingJackson2HttpMessageConverter {
    public WxMappingJackson2HttpMessageConverter(){
        List<MediaType> mediaTypes = new ArrayList<>();
        mediaTypes.add(MediaType.TEXT_PLAIN);
        mediaTypes.add(MediaType.TEXT_HTML);  //加入text/html类型的支持
        setSupportedMediaTypes(mediaTypes);// tag6
    }
}
src/main/resources/mapper/AQIMapper.xml
New file
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.moral.mapper.AQIMapper">
    <select id="getCityAqiConfig" resultType="java.util.LinkedHashMap">
        select * from city_aqi_config;
    </select>
    <insert id="insertAQIData">
        INSERT INTO hangzhou_aqi (time, aqi_json,city_code)  values(
        #{time},
        #{data},
        #{code}
        )
    </insert>
</mapper>
src/main/resources/mapper/HistoryMapper.xml
@@ -176,4 +176,11 @@
        </foreach>
    </insert>
    <delete id="deleteHistoryData" parameterType="java.lang.String">
        delete from history3 where  time &lt; #{oldTime};
    </delete>
    <update id="deletePartition">
        ALTER table history3 drop PARTITION ${p};
    </update>
</mapper>
src/main/resources/mapper/HistoryMinutelyMapper.xml
@@ -10,6 +10,16 @@
            (#{map.mac},#{map.time},#{map.json})
        </foreach>
    </insert>
    <update id="createHistoryMinutelyTable">
    CREATE TABLE `history_minutely_${yearAndMonth}` (
        `mac` varchar(20) CHARACTER SET latin1 DEFAULT NULL,
        `time` datetime DEFAULT NULL,
        `json` json DEFAULT NULL,
        KEY `_idx_mac` (`mac`) USING BTREE,
        KEY `_idx_time` (`time`) USING BTREE,
        KEY `_idx_mac_time` (`mac`,`time`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    </update>
    <select id="getMinutelySensorData" resultType="java.util.LinkedHashMap">
        SELECT