src/main/java/com/moral/mapper/AQIMapper.java
New file @@ -0,0 +1,10 @@ package com.moral.mapper; import java.util.List; import java.util.Map; public interface AQIMapper { List<Map<String, Object>> getCityAqiConfig(); int insertAQIData(Map<String, Object> parameters); } src/main/java/com/moral/mapper/HistoryMapper.java
@@ -4,6 +4,7 @@ import java.util.Map; import com.moral.entity.History; import org.apache.ibatis.annotations.Param; public interface HistoryMapper { int insert(History record); @@ -28,4 +29,7 @@ int insertHistorySpecialTable(Map<String, Object> parameters); int deleteHistoryData(String oldTime); void deletePartition(@Param("p") String p); } src/main/java/com/moral/mapper/HistoryMinutelyMapper.java
@@ -11,4 +11,5 @@ List<Map<String, Object>> getMinutelySensorData(Map<String, Object> parameters); void createHistoryMinutelyTable(@Param("yearAndMonth") String yearAndMonth); } src/main/java/com/moral/service/AQIService.java
New file @@ -0,0 +1,10 @@ package com.moral.service; import java.util.List; import java.util.Map; public interface AQIService { List<Map<String, Object>> getCityAqiConfig(); int insertAQIData(Map<String, Object> parameters); } src/main/java/com/moral/service/HistoryMinutelyService.java
@@ -9,4 +9,7 @@ List<Map<String, Object>> getMinutelySensorData(Map<String, Object> parameters); //创建分表任务 void createHistoryMinutelyTable(String yearAndMonth); } src/main/java/com/moral/service/HistoryService.java
@@ -10,4 +10,8 @@ List<History> selectByMacAndTime(Map<String, Object> parameters); int insertHistorySpecialTable(Map<String, Object> parameters); int deleteHistoryData(String oldTime); void deletePartition(String p); } src/main/java/com/moral/service/impl/AQIServiceImpl.java
New file @@ -0,0 +1,23 @@ package com.moral.service.impl; import com.moral.mapper.AQIMapper; import com.moral.service.AQIService; import org.springframework.stereotype.Service; import javax.annotation.Resource; import java.util.List; import java.util.Map; @Service public class AQIServiceImpl implements AQIService { @Resource private AQIMapper aqiMapper; @Override public List<Map<String, Object>> getCityAqiConfig() { return aqiMapper.getCityAqiConfig(); } @Override public int insertAQIData(Map<String, Object> parameters) { return aqiMapper.insertAQIData(parameters); } } src/main/java/com/moral/service/impl/HistoryMinutelyServiceImpl.java
@@ -26,4 +26,9 @@ return historyMinutelyMapper.getMinutelySensorData(parameters); } /** Forwards the table-creation request for {@code yearAndMonth} unchanged to the mapper. */ @Override public void createHistoryMinutelyTable(String yearAndMonth) { historyMinutelyMapper.createHistoryMinutelyTable(yearAndMonth); } } src/main/java/com/moral/service/impl/HistoryServiceImpl.java
@@ -26,4 +26,14 @@ public int insertHistorySpecialTable(Map<String, Object> parameters) { return historyMapper.insertHistorySpecialTable(parameters); } /** Forwards {@code oldTime} to the mapper's delete statement and returns the mapper's int result. */ @Override public int deleteHistoryData(String oldTime) { return historyMapper.deleteHistoryData(oldTime); } /** Forwards the partition name {@code p} unchanged to the mapper; no validation happens here. */ @Override public void deletePartition(String p) { historyMapper.deletePartition(p); } } src/main/java/com/moral/task/AQIDataInsertTask.java
New file @@ -0,0 +1,83 @@ package com.moral.task; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.moral.service.AQIService; import com.moral.util.HttpUtils; import com.moral.util.WxMappingJackson2HttpMessageConverter; import org.apache.commons.lang3.time.DateUtils; import org.apache.http.HttpResponse; import org.apache.http.util.EntityUtils; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.web.client.RestTemplate; import redis.clients.jedis.Jedis; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.*; @Component public class AQIDataInsertTask { @Resource private AQIService aqiService; public void insertData(){ SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); RestTemplate restTemplate = new RestTemplate(); restTemplate.getMessageConverters().add(new WxMappingJackson2HttpMessageConverter()); Date pubtime = DateUtils.truncate(new Date(),Calendar.HOUR); Jedis jedis = new Jedis("r-bp1672d21a422a14161.redis.rds.aliyuncs.com", 6379); jedis.auth("KtElFcI1sYm9NP3"); jedis.select(1); List<Map<String,Object>> CityAqiConfigs =aqiService.getCityAqiConfig(); for (Map<String, Object> cityAqiConfig : CityAqiConfigs) { String entity = null ; Collection<Object> values = null; Map<String, Object> data = null; try { HttpResponse response = HttpUtils.doGet("https://api.epmap.org", "/api/v1/air/city", "GET", new HashMap<String, String>() {{put("Authorization", "APPCODE " + "31b6ea8f804a4472be3b633cfee44849");}}, new HashMap<String, String>() {{put("city", cityAqiConfig.get("city_name").toString());}} ); entity = EntityUtils.toString(response.getEntity()); JSONObject json = JSON.parseObject(entity); data = (Map<String, Object>) json.get("data"); if (!ObjectUtils.isEmpty(data)) { values = data.values(); pubtime = format.parse(data.get("pubtime").toString()); } } catch (Exception e) { e.printStackTrace(); } Map<String, Object> 
parameters = new HashMap<>(); parameters.put("time",pubtime); parameters.put("data",data.toString()); parameters.put("code",cityAqiConfig.get("city_code")); if (!ObjectUtils.isEmpty(data)) { try { aqiService.insertAQIData(parameters); } catch (Exception e) { e.printStackTrace(); } if ("1".equals(cityAqiConfig.get("is_compensate"))) { Map<String, String> map = new HashMap<String, String>(); map.put("e1", data.containsKey("PM25C") ? data.get("PM25C").toString() : data.get("PM2_5").toString()); map.put("e2", data.containsKey("PM10C") ? data.get("PM10C").toString() : data.get("PM10").toString()); map.put("e10", data.containsKey("COC") ? data.get("COC").toString() : data.get("CO").toString()); map.put("e11", data.containsKey("SO2C") ? data.get("SO2C").toString() : data.get("SO2").toString()); map.put("e15", data.containsKey("O3C") ? data.get("O3C").toString() : data.get("O3").toString()); map.put("e16", data.containsKey("NO2C") ? data.get("NO2C").toString() : data.get("NO2").toString()); jedis.hmset("aqi_" + cityAqiConfig.get("city_code"), map); } } } } } src/main/java/com/moral/task/HistoryMinutelySubTableTask.java
New file @@ -0,0 +1,28 @@ package com.moral.task; import com.moral.service.HistoryMinutelyService; import com.xxl.job.core.biz.model.ReturnT; import org.springframework.format.datetime.joda.LocalDateTimeParser; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.time.LocalDateTime; @Component public class HistoryMinutelySubTableTask { @Resource private HistoryMinutelyService historyMinutelyService; public ReturnT createHistoryMinutelyTb(){ LocalDateTime time=LocalDateTime.now(); LocalDateTime time1 = time.plusMonths(1); String year = String.valueOf(time1.getYear()); String month = String.valueOf(time1.getMonthValue()); if (month.length()<2){ month="0"+month; } String yearAndMonth=year+month; historyMinutelyService.createHistoryMinutelyTable(yearAndMonth); ReturnT returnT = new ReturnT(); return returnT; } } src/main/java/com/moral/task/HistoryTableDeleteTask.java
New file @@ -0,0 +1,39 @@ package com.moral.task; import com.moral.service.HistoryService; import com.moral.util.DateUtil; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.handler.annotation.XxlJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.Date; @Component public class HistoryTableDeleteTask { private static transient Logger logger = LoggerFactory.getLogger(HistoryTableInsertTask.class); @Resource private HistoryService historyService; @XxlJob("deleteOldDate") public ReturnT deleteOldData() { String p=DateUtil.getOldTime(8); String s=DateUtil.getOldTime(7); String[] ss=p.split("-"); p="p"+ss[0]+ss[1]+ss[2]; int i=historyService.deleteHistoryData(s); if (i>0){ ReturnT returnT = new ReturnT(200, s+"之前数据删除成功"); historyService.deletePartition(p); return returnT; }else{ ReturnT returnT = new ReturnT(500, "删除数据失败"); return returnT; } } } src/main/java/com/moral/util/DateUtil.java
New file @@ -0,0 +1,16 @@ package com.moral.util; import java.text.SimpleDateFormat; import java.util.Date; public class DateUtil { //获得过去i天的年月日 public static String getOldTime(int i){ Long time=System.currentTimeMillis(); Long time1=time-(1000 * 60 * 60 * 24 * i); SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String sd = sdf.format(new Date(Long.parseLong(String.valueOf(time1)))); String s=sd.substring(0,10); return s; } } src/main/java/com/moral/util/HttpUtils.java
New file @@ -0,0 +1,310 @@ package com.moral.util; import org.apache.commons.lang.StringUtils; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.client.HttpClient; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpDelete; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpPut; import org.apache.http.conn.ClientConnectionManager; import org.apache.http.conn.scheme.Scheme; import org.apache.http.conn.scheme.SchemeRegistry; import org.apache.http.conn.ssl.SSLSocketFactory; import org.apache.http.entity.ByteArrayEntity; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.message.BasicNameValuePair; import javax.net.ssl.SSLContext; import javax.net.ssl.TrustManager; import javax.net.ssl.X509TrustManager; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.security.cert.X509Certificate; import java.util.ArrayList; import java.util.List; import java.util.Map; public class HttpUtils { /** * get * * @param host * @param path * @param method * @param headers * @param querys * @return * @throws Exception */ public static HttpResponse doGet(String host, String path, String method, Map<String, String> headers, Map<String, String> querys) throws Exception { HttpClient httpClient = wrapClient(host); HttpGet request = new HttpGet(buildUrl(host, path, querys)); for (Map.Entry<String, String> e : headers.entrySet()) { request.addHeader(e.getKey(), e.getValue()); } return httpClient.execute(request); } /** * post form * * @param host * @param path * @param method * @param headers * @param querys * @param bodys * @return * @throws Exception */ public static HttpResponse doPost(String host, String path, String 
method, Map<String, String> headers, Map<String, String> querys, Map<String, String> bodys) throws Exception { HttpClient httpClient = wrapClient(host); HttpPost request = new HttpPost(buildUrl(host, path, querys)); for (Map.Entry<String, String> e : headers.entrySet()) { request.addHeader(e.getKey(), e.getValue()); } if (bodys != null) { List<NameValuePair> nameValuePairList = new ArrayList<NameValuePair>(); for (String key : bodys.keySet()) { nameValuePairList.add(new BasicNameValuePair(key, bodys.get(key))); } UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(nameValuePairList, "utf-8"); formEntity.setContentType("application/x-www-form-urlencoded; charset=UTF-8"); request.setEntity(formEntity); } return httpClient.execute(request); } /** * Post String * * @param host * @param path * @param method * @param headers * @param querys * @param body * @return * @throws Exception */ public static HttpResponse doPost(String host, String path, String method, Map<String, String> headers, Map<String, String> querys, String body) throws Exception { HttpClient httpClient = wrapClient(host); HttpPost request = new HttpPost(buildUrl(host, path, querys)); for (Map.Entry<String, String> e : headers.entrySet()) { request.addHeader(e.getKey(), e.getValue()); } if (StringUtils.isNotBlank(body)) { request.setEntity(new StringEntity(body, "utf-8")); } return httpClient.execute(request); } /** * Post stream * * @param host * @param path * @param method * @param headers * @param querys * @param body * @return * @throws Exception */ public static HttpResponse doPost(String host, String path, String method, Map<String, String> headers, Map<String, String> querys, byte[] body) throws Exception { HttpClient httpClient = wrapClient(host); HttpPost request = new HttpPost(buildUrl(host, path, querys)); for (Map.Entry<String, String> e : headers.entrySet()) { request.addHeader(e.getKey(), e.getValue()); } if (body != null) { request.setEntity(new ByteArrayEntity(body)); } return 
httpClient.execute(request); } /** * Put String * @param host * @param path * @param method * @param headers * @param querys * @param body * @return * @throws Exception */ public static HttpResponse doPut(String host, String path, String method, Map<String, String> headers, Map<String, String> querys, String body) throws Exception { HttpClient httpClient = wrapClient(host); HttpPut request = new HttpPut(buildUrl(host, path, querys)); for (Map.Entry<String, String> e : headers.entrySet()) { request.addHeader(e.getKey(), e.getValue()); } if (StringUtils.isNotBlank(body)) { request.setEntity(new StringEntity(body, "utf-8")); } return httpClient.execute(request); } /** * Put stream * @param host * @param path * @param method * @param headers * @param querys * @param body * @return * @throws Exception */ public static HttpResponse doPut(String host, String path, String method, Map<String, String> headers, Map<String, String> querys, byte[] body) throws Exception { HttpClient httpClient = wrapClient(host); HttpPut request = new HttpPut(buildUrl(host, path, querys)); for (Map.Entry<String, String> e : headers.entrySet()) { request.addHeader(e.getKey(), e.getValue()); } if (body != null) { request.setEntity(new ByteArrayEntity(body)); } return httpClient.execute(request); } /** * Delete * * @param host * @param path * @param method * @param headers * @param querys * @return * @throws Exception */ public static HttpResponse doDelete(String host, String path, String method, Map<String, String> headers, Map<String, String> querys) throws Exception { HttpClient httpClient = wrapClient(host); HttpDelete request = new HttpDelete(buildUrl(host, path, querys)); for (Map.Entry<String, String> e : headers.entrySet()) { request.addHeader(e.getKey(), e.getValue()); } return httpClient.execute(request); } private static String buildUrl(String host, String path, Map<String, String> querys) throws UnsupportedEncodingException { StringBuilder sbUrl = new StringBuilder(); 
sbUrl.append(host); if (!StringUtils.isBlank(path)) { sbUrl.append(path); } if (null != querys) { StringBuilder sbQuery = new StringBuilder(); for (Map.Entry<String, String> query : querys.entrySet()) { if (0 < sbQuery.length()) { sbQuery.append("&"); } if (StringUtils.isBlank(query.getKey()) && !StringUtils.isBlank(query.getValue())) { sbQuery.append(query.getValue()); } if (!StringUtils.isBlank(query.getKey())) { sbQuery.append(query.getKey()); if (!StringUtils.isBlank(query.getValue())) { sbQuery.append("="); sbQuery.append(URLEncoder.encode(query.getValue(), "utf-8")); } } } if (0 < sbQuery.length()) { sbUrl.append("?").append(sbQuery); } } return sbUrl.toString(); } private static HttpClient wrapClient(String host) { HttpClient httpClient = new DefaultHttpClient(); if (host.startsWith("https://")) { sslClient(httpClient); } return httpClient; } private static void sslClient(HttpClient httpClient) { try { SSLContext ctx = SSLContext.getInstance("TLS"); X509TrustManager tm = new X509TrustManager() { public X509Certificate[] getAcceptedIssuers() { return null; } public void checkClientTrusted(X509Certificate[] xcs, String str) { } public void checkServerTrusted(X509Certificate[] xcs, String str) { } }; ctx.init(null, new TrustManager[] { tm }, null); SSLSocketFactory ssf = new SSLSocketFactory(ctx); ssf.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER); ClientConnectionManager ccm = httpClient.getConnectionManager(); SchemeRegistry registry = ccm.getSchemeRegistry(); registry.register(new Scheme("https", 443, ssf)); } catch (KeyManagementException ex) { throw new RuntimeException(ex); } catch (NoSuchAlgorithmException ex) { throw new RuntimeException(ex); } } } src/main/java/com/moral/util/WxMappingJackson2HttpMessageConverter.java
New file @@ -0,0 +1,16 @@ package com.moral.util; import org.springframework.http.MediaType; import org.springframework.http.converter.json.MappingJackson2HttpMessageConverter; import java.util.ArrayList; import java.util.List; public class WxMappingJackson2HttpMessageConverter extends MappingJackson2HttpMessageConverter { public WxMappingJackson2HttpMessageConverter(){ List<MediaType> mediaTypes = new ArrayList<>(); mediaTypes.add(MediaType.TEXT_PLAIN); mediaTypes.add(MediaType.TEXT_HTML); //加入text/html类型的支持 setSupportedMediaTypes(mediaTypes);// tag6 } } src/main/resources/mapper/AQIMapper.xml
New file @@ -0,0 +1,15 @@ <?xml version="1.0" encoding="UTF-8"?> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.moral.mapper.AQIMapper"> <!-- Returns every column of every row in city_aqi_config, each row as a LinkedHashMap. --> <select id="getCityAqiConfig" resultType="java.util.LinkedHashMap"> select * from city_aqi_config; </select> <!-- Inserts one row built from the time, data and code parameters. NOTE(review): the target table is named hangzhou_aqi although a city_code is stored per row; confirm that all cities are meant to share this table. --> <insert id="insertAQIData"> INSERT INTO hangzhou_aqi (time, aqi_json,city_code) values( #{time}, #{data}, #{code} ) </insert> </mapper> src/main/resources/mapper/HistoryMapper.xml
@@ -176,4 +176,11 @@ </foreach> </insert> <!-- Deletes all history3 rows whose time is earlier than the oldTime parameter. NOTE(review): the less-than sign appears unescaped in this view; verify the real file escapes it or uses CDATA. --> <delete id="deleteHistoryData" parameterType="java.lang.String"> delete from history3 where time < #{oldTime}; </delete> <!-- Drops the history3 partition named by p. NOTE(review): p is inserted with ${} text substitution, not bound as a parameter, so callers must pass only trusted, generated names. --> <update id="deletePartition"> ALTER table history3 drop PARTITION ${p}; </update> </mapper> src/main/resources/mapper/HistoryMinutelyMapper.xml
@@ -10,6 +10,16 @@ (#{map.mac},#{map.time},#{map.json}) </foreach> </insert> <!-- Creates the table history_minutely_ followed by the yearAndMonth text, with columns mac, time and json and three BTREE indexes. yearAndMonth is inserted with ${} text substitution, so it must be a trusted, generated value. NOTE(review): the statement has no IF NOT EXISTS clause, so it presumably fails when the table already exists; confirm the job never runs twice for one month. --> <update id="createHistoryMinutelyTable"> CREATE TABLE `history_minutely_${yearAndMonth}` ( `mac` varchar(20) CHARACTER SET latin1 DEFAULT NULL, `time` datetime DEFAULT NULL, `json` json DEFAULT NULL, KEY `_idx_mac` (`mac`) USING BTREE, KEY `_idx_time` (`time`) USING BTREE, KEY `_idx_mac_time` (`mac`,`time`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8; </update> <select id="getMinutelySensorData" resultType="java.util.LinkedHashMap"> SELECT