中间数据迁入正式表,kafka小时数据存入中间表,删除中间表数据
1 files added
13 files modified
| | |
| | | public static final String UN_ADJUST = "unadjust"; |
| | | |
| | | /* |
| | | * 中间表后准 |
| | | * */ |
| | | public static final String TRANSITION = "transition"; |
| | | |
| | | /* |
| | | * 离线设备状态码 |
| | | * */ |
| | | public static final String DEVICE_STATE_OFFLINE = "0"; |
| | |
| | | package com.moral.api.mapper; |
| | | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | import com.moral.api.entity.HistoryHourly; |
| | | import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
| | |
| | | */ |
| | | public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> { |
| | | |
| | | void insertHistoryHourly(List<HistoryHourly> list); |
| | | |
| | | List<HistoryHourly> getHistoryHourlyTransition(); |
| | | |
| | | void deleteHistoryHourlyTransition(); |
| | | |
| | | } |
| | |
| | | */ |
| | | public interface HistoryHourlyService extends IService<HistoryHourly> { |
| | | |
| | | //中间小时表数据迁入正式表 |
| | | void insertHistoryHourly(); |
| | | |
| | | //删除中间小时表数据 |
| | | void deleteHistoryHourlyTransition(); |
| | | |
| | | } |
| | |
| | | import com.moral.api.mapper.HistoryHourlyMapper; |
| | | import com.moral.api.service.HistoryHourlyService; |
| | | import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
| | | |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import java.util.List; |
| | | |
| | | /** |
| | | * <p> |
| | |
| | | @Service |
| | | public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService { |
| | | |
| | | @Autowired |
| | | private HistoryHourlyMapper historyHourlyMapper; |
| | | |
| | | @Override |
| | | public void insertHistoryHourly() { |
| | | //获取中间小时表数据 |
| | | List<HistoryHourly> list = historyHourlyMapper.getHistoryHourlyTransition(); |
| | | historyHourlyMapper.insertHistoryHourly(list); |
| | | } |
| | | |
| | | @Override |
| | | public void deleteHistoryHourlyTransition() { |
| | | historyHourlyMapper.deleteHistoryHourlyTransition(); |
| | | } |
| | | } |
New file |
| | |
| | | package com.moral.api.task; |
| | | |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import com.moral.api.service.HistoryHourlyService; |
| | | import com.xxl.job.core.biz.model.ReturnT; |
| | | import com.xxl.job.core.context.XxlJobHelper; |
| | | import com.xxl.job.core.handler.annotation.XxlJob; |
| | | |
| | | @Component |
| | | public class DeleteHistoryHourlyTransitionTask { |
| | | |
| | | @Autowired |
| | | private HistoryHourlyService historyHourlyService; |
| | | |
| | | //删除中间小时表数据 |
| | | @XxlJob("deleteHourlyTransition") |
| | | public ReturnT insertHistoryFiveMinutely() { |
| | | try { |
| | | historyHourlyService.deleteHistoryHourlyTransition(); |
| | | } catch (Exception e) { |
| | | XxlJobHelper.log(e.getMessage()); |
| | | return ReturnT.FAIL; |
| | | } |
| | | return ReturnT.SUCCESS; |
| | | } |
| | | |
| | | } |
| | |
| | | |
| | | import com.moral.api.service.HistoryDailyService; |
| | | import com.moral.api.service.HistoryFiveMinutelyService; |
| | | import com.moral.api.service.HistoryHourlyService; |
| | | import com.moral.api.service.HistoryMonthlyService; |
| | | import com.moral.api.service.HistoryWeeklyService; |
| | | import com.xxl.job.core.biz.model.ReturnT; |
| | |
| | | |
| | | @Autowired |
| | | private HistoryFiveMinutelyService historyFiveMinutelyService; |
| | | |
| | | @Autowired |
| | | private HistoryHourlyService historyHourlyService; |
| | | |
| | | @Autowired |
| | | private HistoryDailyService historyDailyService; |
| | |
| | | return ReturnT.SUCCESS; |
| | | } |
| | | |
| | | //小时数据从中间表迁入正式表 |
| | | @XxlJob("insertHistoryHourly") |
| | | public ReturnT insertHistoryHourly() { |
| | | try { |
| | | historyHourlyService.insertHistoryHourly(); |
| | | } catch (Exception e) { |
| | | XxlJobHelper.log(e.getMessage()); |
| | | return ReturnT.FAIL; |
| | | } |
| | | return ReturnT.SUCCESS; |
| | | } |
| | | |
| | | |
| | | //天数据统计 |
| | | @XxlJob("insertHistoryDaily") |
| | | public ReturnT insertHistoryDaily() { |
| | |
| | | <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> |
| | | <mapper namespace="com.moral.api.mapper.DeviceMapper"> |
| | | |
| | | <!-- 通用查询映射结果 --> |
| | | <resultMap id="BaseResultMap" type="com.moral.api.entity.Device"> |
| | | <id column="id" property="id" /> |
| | | <result column="name" property="name" /> |
| | | <result column="mac" property="mac" /> |
| | | <result column="address" property="address" /> |
| | | <result column="longitude" property="longitude" /> |
| | | <result column="latitude" property="latitude" /> |
| | | <result column="state" property="state" /> |
| | | <result column="operate_ids" property="operateIds" /> |
| | | <result column="monitor_point_id" property="monitorPointId" /> |
| | | <result column="organization_id" property="organizationId" /> |
| | | <result column="device_version_id" property="deviceVersionId" /> |
| | | <result column="profession" property="profession" /> |
| | | <result column="tech" property="tech" /> |
| | | <result column="detector" property="detector" /> |
| | | <result column="purchaser" property="purchaser" /> |
| | | <result column="create_time" property="createTime" /> |
| | | <result column="update_time" property="updateTime" /> |
| | | <result column="install_time" property="installTime" /> |
| | | <result column="is_delete" property="isDelete" /> |
| | | <result column="extend" property="extend" /> |
| | | </resultMap> |
| | | <!-- 通用查询映射结果 --> |
| | | <resultMap id="BaseResultMap" type="com.moral.api.entity.Device"> |
| | | <id column="id" property="id"/> |
| | | <result column="name" property="name"/> |
| | | <result column="mac" property="mac"/> |
| | | <result column="address" property="address"/> |
| | | <result column="longitude" property="longitude"/> |
| | | <result column="latitude" property="latitude"/> |
| | | <result column="state" property="state"/> |
| | | <result column="operate_ids" property="operateIds"/> |
| | | <result column="monitor_point_id" property="monitorPointId"/> |
| | | <result column="organization_id" property="organizationId"/> |
| | | <result column="device_version_id" property="deviceVersionId"/> |
| | | <result column="profession" property="profession"/> |
| | | <result column="tech" property="tech"/> |
| | | <result column="detector" property="detector"/> |
| | | <result column="purchaser" property="purchaser"/> |
| | | <result column="create_time" property="createTime"/> |
| | | <result column="update_time" property="updateTime"/> |
| | | <result column="install_time" property="installTime"/> |
| | | <result column="is_delete" property="isDelete"/> |
| | | <result column="extend" property="extend"/> |
| | | </resultMap> |
| | | |
| | | </mapper> |
| | |
| | | <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> |
| | | <mapper namespace="com.moral.api.mapper.HistoryAqiMapper"> |
| | | |
| | | <!-- 通用查询映射结果 --> |
| | | <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryAqi"> |
| | | <result column="city_code" property="cityCode" /> |
| | | <result column="time" property="time" /> |
| | | <result column="value" property="value" /> |
| | | </resultMap> |
| | | <!-- 通用查询映射结果 --> |
| | | <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryAqi"> |
| | | <result column="city_code" property="cityCode"/> |
| | | <result column="time" property="time"/> |
| | | <result column="value" property="value"/> |
| | | </resultMap> |
| | | |
| | | </mapper> |
| | |
| | | <result column="version" property="version"/> |
| | | </resultMap> |
| | | |
| | | <insert id="insertHistoryHourly"> |
| | | INSERT INTO history_hourly VALUES |
| | | <foreach collection="list" item="item" separator=","> |
| | | (#{item.mac}, #{item.time}, #{item.value}, #{item.version}) |
| | | </foreach> |
| | | </insert> |
| | | |
| | | <select id="getHistoryHourlyTransition" resultType="com.moral.api.entity.HistoryHourly"> |
| | | SELECT mac, |
| | | `time`, |
| | | `value`, |
| | | `version` |
| | | FROM history_hourly_transition |
| | | WHERE create_time = update_time |
| | | </select> |
| | | |
| | | <delete id="deleteHistoryHourlyTransition"> |
| | | DELETE FROM history_hourly_transition |
| | | </delete> |
| | | </mapper> |
| | |
| | | @ApiImplicitParam(name = "token", value = "token", required = true, paramType = "header", dataType = "String"), |
| | | @ApiImplicitParam(name = "name", value = "设备名称模糊查询", required = false, paramType = "query", dataType = "String"), |
| | | @ApiImplicitParam(name = "mac", value = "mac模糊查询", required = false, paramType = "query", dataType = "String"), |
| | | @ApiImplicitParam(name = "organizationId", value = "根据组织查询", required = false, paramType = "query", dataType = "String"), |
| | | @ApiImplicitParam(name = "monitorPointId", value = "站点id", required = false, paramType = "query", dataType = "String") |
| | | |
| | | }) |
| | | @RequestMapping(value = "select", method = RequestMethod.GET) |
| | | public ResultMessage select(HttpServletRequest request) { |
| | |
| | | package com.moral.api.mapper; |
| | | |
| | | import org.apache.ibatis.annotations.Param; |
| | | |
| | | import java.util.Map; |
| | | |
| | | import com.moral.api.entity.HistoryHourly; |
| | | import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
| | | |
| | | |
| | | /** |
| | | * <p> |
| | |
| | | |
| | | void insertHistoryHourlyUnAdjust(Map<String, Object> params); |
| | | |
| | | int getCountByMacAndTime(@Param("mac") String mac, @Param("time") String time); |
| | | |
| | | void insertHistoryHourlyTransition(Map<String, Object> params); |
| | | |
| | | void updateHistoryTransition(@Param("mac") String mac, @Param("time") String time); |
| | | } |
| | |
| | | public void insertHistoryHourly(Map<String, Object> data) { |
| | | Map<String, Object> dataAdjust = new HashMap<>(data); |
| | | String mac = data.remove("mac").toString(); |
| | | Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN); |
| | | |
| | | Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN); |
| | | Integer version = (Integer) data.remove("ver"); |
| | | Map<String, Object> result = new HashMap<>(data); |
| | | Map<String, Object> result = new HashMap<>(); |
| | | result.put("mac", mac); |
| | | result.put("time", time); |
| | | result.put("version", version); |
| | |
| | | dataAdjust.remove("mac"); |
| | | dataAdjust.remove("DataTime"); |
| | | dataAdjust.remove("ver"); |
| | | HistoryHourly historyHourly = new HistoryHourly(); |
| | | historyHourly.setMac(mac); |
| | | historyHourly.setTime(time); |
| | | historyHourly.setVersion(version); |
| | | historyHourly.setValue(JSONObject.toJSONString(dataAdjust)); |
| | | historyHourlyMapper.insert(historyHourly); |
| | | |
| | | int count = historyHourlyMapper.getCountByMacAndTime(mac, DateUtils.dateToDateString(time)); |
| | | |
| | | //判断中间表有没有该mac,该小时的数据,有就更新,没有就新增 |
| | | if (count == 0) { |
| | | //小时数据校准后存入小时中间表 |
| | | result.put("value", JSONObject.toJSONString(dataAdjust)); |
| | | //新增 |
| | | historyHourlyMapper.insertHistoryHourlyTransition(result); |
| | | } else { |
| | | //更新 |
| | | historyHourlyMapper.updateHistoryTransition(mac, DateUtils.dateToDateString(time)); |
| | | } |
| | | } |
| | | } |
| | |
| | | @Slf4j |
| | | @Component |
| | | public class AdjustDataUtils { |
| | | //数据,公式 |
| | | /** |
| | | * @param deviceData 设备数据 |
| | | * @param adjustFormula 校准公式 |
| | | * @param aqiMap 设备所在地区对应的墨迹aqi数据 |
| | | * @return Map<String, Object> 校准后数据 |
| | | * */ |
| | | public Map<String, Object> adjust(Map<String, Object> deviceData, Map<String, Object> adjustFormula, Map<String, Object> aqiMap) { |
| | | try { |
| | | Date time = DateUtils.getDate((String) deviceData.get("DataTime"), DateUtils.yyyyMMddHHmmss_EN); |
| | |
| | | INSERT INTO history_hourly_${timeUnits} VALUES (#{mac}, #{time}, #{value}, #{version}) |
| | | </insert> |
| | | |
| | | <select id="getCountByMacAndTime" resultType="java.lang.Integer"> |
| | | SELECT count(1) FROM history_hourly_transition WHERE mac = #{mac} AND `time` = #{time} |
| | | </select> |
| | | |
| | | <insert id="insertHistoryHourlyTransition"> |
| | | INSERT INTO history_hourly_transition (mac, `time`, `value`, version) VALUES (#{mac}, #{time}, #{value}, #{version}) |
| | | </insert> |
| | | |
| | | <update id="updateHistoryTransition"> |
| | | UPDATE history_hourly_transition SET update_time = now() WHERE mac = #{mac} AND `time` = {time} |
| | | </update> |
| | | </mapper> |