jinpengyong
2021-09-01 7607bab6e868a51609164ce111c9d5e1046cd11f
走航车实时websocket,小时表分表
2 files renamed
3 files deleted
2 files added
15 files modified
520 ■■■■■ changed files
screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java 29 ●●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java 4 ●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/controller/CruiserController.java 10 ●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/entity/HistoryHourly.java 47 ●●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java 109 ●●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java 7 ●●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java 5 ●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java 20 ●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java 21 ●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java 86 ●●●●● patch | view | raw | blame | history
screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml 11 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java 50 ●●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java 4 ●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java 15 ●●●● patch | view | raw | blame | history
screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml 3 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java 8 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/entity/HistoryHourly.java 46 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java 7 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java 8 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java 5 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java 17 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml 8 ●●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -31,6 +31,8 @@
    private int concurrency;
    @Value("${kafka.groupId.second-data}")
    private String secondDataGroupId;
    @Value("${kafka.groupId.cruiser-data}")
    private String cruiserDataGroupId;
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
@@ -54,6 +56,15 @@
        return factory;
    }
    @Bean("cruiserDataListenerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> cruiserDataListenerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(cruiserDataConsumerFactory());//设置消费者工厂
        factory.setConcurrency(concurrency);//设置线程数
        factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间
        return factory;
    }
    /**
    * @Description: 秒级消费工厂
            * @Param: []
@@ -64,6 +75,16 @@
    public ConsumerFactory<String,String> secondDataConsumerFactory(){
        Map<String, Object> commonConfig = consumerConfigs();
        Map<String, Object> secondDataConfig = secondConsumerConfigs();
        secondDataConfig.putAll(commonConfig);
        return new DefaultKafkaConsumerFactory<>(secondDataConfig);
    }
    /*
    * 走航车数据消费工厂
    * */
    public ConsumerFactory<String,String> cruiserDataConsumerFactory(){
        Map<String, Object> commonConfig = consumerConfigs();
        Map<String, Object> secondDataConfig = cruiserConsumerConfigs();
        secondDataConfig.putAll(commonConfig);
        return new DefaultKafkaConsumerFactory<>(secondDataConfig);
    }
@@ -81,6 +102,14 @@
        return propsMap;
    }
    /*
    * 走航车消费组配置
    * */
    public Map<String,Object> cruiserConsumerConfigs(){
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, cruiserDataGroupId);
        return propsMap;
    }
    /**
    * @Description: 通用配置
screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
@@ -1,7 +1,7 @@
package com.moral.api.config.websocket;
import com.moral.api.websocket.CruiserWebSocketServer;
import com.moral.api.websocket.SingleDeviceServer;
import com.moral.constant.RedisConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -26,5 +26,7 @@
    @Autowired
    public void setMessageService(RedisTemplate redisTemplate){
        SingleDeviceServer.redisTemplate = redisTemplate;
        CruiserWebSocketServer.redisTemplate = redisTemplate;
    }
}
screen-api/src/main/java/com/moral/api/controller/CruiserController.java
File was renamed from screen-api/src/main/java/com/moral/api/controller/SpecialDeviceController.java
@@ -23,11 +23,11 @@
import com.moral.util.WebUtils;
@Slf4j
@Api(tags = {"特殊设备"})
@Api(tags = {"走航车"})
@RestController
@CrossOrigin(origins = "*", maxAge = 3600)
@RequestMapping("/specialDevice")
public class SpecialDeviceController {
@RequestMapping("/cruiser")
public class CruiserController {
    @Autowired
    private SpecialDeviceService specialDeviceService;
@@ -35,7 +35,7 @@
    /**
     * @return 返回请求成功后的对象信息
     */
    @GetMapping("getCarsByOrg")
    @GetMapping("selectCruisers")
    @ApiOperation(value = "获取当前组织下所有走航车列表", notes = "走航车轨迹")
    public ResultMessage getCarsInfo() {
        List<Map<String, Object>> response = specialDeviceService.getCarsInfo();
@@ -46,7 +46,7 @@
     * @param request 请求信息
     * @return 返回请求成功后的对象信息
     */
    @GetMapping("carTrajectory")
    @GetMapping("cruiserTrajectory")
    @ApiOperation(value = "走航车轨迹", notes = "走航车轨迹")
    @ApiImplicitParams(value = {
            @ApiImplicitParam(name = "token", value = "token", required = true, paramType = "header", dataType = "String"),
screen-api/src/main/java/com/moral/api/entity/HistoryHourly.java
File was deleted
screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java
New file
@@ -0,0 +1,109 @@
package com.moral.api.kafka.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.moral.api.entity.Sensor;
import com.moral.api.entity.SpecialDevice;
import com.moral.api.entity.UnitConversion;
import com.moral.api.websocket.CruiserWebSocketServer;
import com.moral.constant.KafkaConstants;
import com.moral.util.UnitConvertUtils;
/*
 * 走航车数据消费者
 * */
@Component
@Slf4j
public class CruiserDataConsumer implements ConsumerSeekAware {
    @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "cruiserDataListenerFactory")
    public void listenSecondSpecial(ConsumerRecord<String, String> record) throws Exception {
        String msg = record.value();
        Map<String, Object> data = JSONObject.parseObject(msg, Map.class);
        CopyOnWriteArraySet<CruiserWebSocketServer> sockets = CruiserWebSocketServer.sockets;
        for (CruiserWebSocketServer socket : sockets) {
            String messageMac = (String) data.get("mac");
            String mac = socket.getMac();
            if (!mac.equalsIgnoreCase(messageMac))
                continue;
            //取出基本信息
            SpecialDevice specialDevice = socket.getSpecialDevice();
            //创建最终消息对象
            Map<String, Object> result = new HashMap<>();
            result.put("time",data.get("time"));
            //拼接单位
            List<Sensor> sensors = specialDevice.getVersion().getSensors();//获取型号所有因子信息
            for (Sensor sensor : sensors) {
                String code = sensor.getCode();
                String showUnit = sensor.getShowUnit();
                String showUnitKey = sensor.getShowUnitKey();
                String unitKey = sensor.getUnitKey();
                String unit = sensor.getUnit();
                //对数据保留两位小数,并且向下取整
                if (data.get(code) == null) {
                    continue;
                }
                Double sourceDataD = Double.valueOf(String.valueOf(data.get(code)));
                /*BigDecimal bg = new BigDecimal(sourceDataD);
                bg = bg.setScale(2, BigDecimal.ROUND_FLOOR);*/
                String sourceData = String.valueOf(sourceDataD);
                //数据补偿
                //单位转换
                if (!unitKey.equals(showUnitKey)) {//如果源单位和显示单位不同,则进行单位转换
                    String formula = sensor.getFormula();
                    //如果sensor中的公式为空则从缓存中获取公式
                    if (ObjectUtils.isEmpty(formula)) {
                        List<UnitConversion> unitConversions = socket.getUnitConversions();
                        for (UnitConversion unitConversion : unitConversions) {
                            if (unitConversion.getOriginalUnitKey().equals(unitKey) && unitConversion.getTargetUnitKey().equals(showUnitKey))
                                formula = unitConversion.getFormula();
                        }
                    }
                    //单位转换
                    String resultData = UnitConvertUtils.calculate(sourceData, formula);
                    if (resultData != null) {
                        resultData += showUnit;
                    } else {//如果转换出的数据为null,则代表缓存中也没有公式,依然使用源单位。
                        resultData = sourceData + unit;
                    }
                    result.put(sensor.getCode(), resultData);
                } else {
                    //拼接单位
                    sourceData = sourceData + " " + showUnit;
                    result.put(sensor.getCode(), sourceData);
                }
            }
            socket.sendMessage(JSON.toJSONString(result));
        }
    }
    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
    }
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
        map.forEach((t, o) -> consumerSeekCallback.seekToEnd(t.topic(), t.partition()));
    }
    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
    }
}
screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -1,7 +1,6 @@
package com.moral.api.mapper;
import com.moral.api.entity.HistoryHourly;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import java.util.Map;
/**
 * <p>
@@ -11,6 +10,8 @@
 * @author moral
 * @since 2021-07-14
 */
public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> {
public interface HistoryHourlyMapper{
    String selectHourlyData(Map<String,Object> params);
}
screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java
@@ -2,9 +2,6 @@
import java.util.Map;
import com.moral.api.entity.HistoryHourly;
import com.baomidou.mybatisplus.extension.service.IService;
/**
 * <p>
 * 已校准小时表 服务类
@@ -13,7 +10,7 @@
 * @author moral
 * @since 2021-07-14
 */
public interface HistoryHourlyService extends IService<HistoryHourly> {
public interface HistoryHourlyService{
    //根据mac获取小时AQI
    Map<String,Object> getHourlyAqiByMac(String mac);
screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
@@ -86,7 +86,6 @@
        queryWrapper.select("mac", "name").in("mac", macs);
        List<Device> devices = deviceMapper.selectList(queryWrapper);
        //所选时间
        List<String> times = (List<String>) params.remove("times");
        //因子code
@@ -100,7 +99,8 @@
        for (String start : times) {
            if ("hour".equals(type)) {
                end = DateUtils.getDateAddDay(start, 1);
                timeUnits = "hourly";
                String yearAndMonth = DateUtils.dateToDateString(DateUtils.getDate(start, DateUtils.yyyy_MM_dd_EN), DateUtils.yyyyMM_EN);
                timeUnits = "hourly_" + yearAndMonth;
                dateFormat = "%Y-%m-%d %H";
            } else if ("day".equals(type)) {
                end = DateUtils.getDateAddMonth(start, 1);
@@ -146,26 +146,26 @@
    @Override
    public Device getDeviceByMac(String mac) {
        Map<String,Object> deviceMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE,mac);
        Map<String, Object> deviceMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE, mac);
        Device device = JSON.parseObject(JSON.toJSONString(deviceMap), Device.class);
        //从map获取organizationId和monitorPointId以及versionId
        Map<String,Object> organizationMap = (Map<String,Object>)deviceMap.get("organization");
        Map<String,Object> monitorPointMap = (Map<String,Object>)deviceMap.get("monitorPoint");
        Map<String,Object> versionMap = (Map<String,Object>)deviceMap.get("version");
        Map<String, Object> organizationMap = (Map<String, Object>) deviceMap.get("organization");
        Map<String, Object> monitorPointMap = (Map<String, Object>) deviceMap.get("monitorPoint");
        Map<String, Object> versionMap = (Map<String, Object>) deviceMap.get("version");
        device.setDeviceVersionId((Integer) versionMap.get("id"));
        device.setOrganizationId((Integer) organizationMap.get("id"));
        device.setMonitorPointId((Integer) monitorPointMap.get("id"));
        //如果缓存为空则查询数据库
        if(ObjectUtils.isEmpty(device)){
        if (ObjectUtils.isEmpty(device)) {
            return getDeviceByMacFromDB(mac);
        }
        return device;
    }
    private Device getDeviceByMacFromDB(String mac){
    private Device getDeviceByMacFromDB(String mac) {
        QueryWrapper<Device> wrapper = new QueryWrapper<>();
        wrapper.eq("mac",mac);
        wrapper.eq("is_delete",Constants.NOT_DELETE);
        wrapper.eq("mac", mac);
        wrapper.eq("is_delete", Constants.NOT_DELETE);
        return deviceMapper.selectOne(wrapper);
    }
screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -1,17 +1,15 @@
package com.moral.api.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.HistoryHourly;
import com.moral.api.mapper.HistoryHourlyMapper;
import com.moral.api.service.HistoryHourlyService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.constant.Constants;
import com.moral.util.AQIUtils;
import com.moral.util.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.Date;
import java.util.HashMap;
@@ -26,24 +24,27 @@
 * @since 2021-07-14
 */
@Service
public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService {
public class HistoryHourlyServiceImpl implements HistoryHourlyService {
    @Autowired
    private HistoryHourlyMapper historyHourlyMapper;
    @Override
    public Map<String, Object> getHourlyAqiByMac(String mac) {
        QueryWrapper<HistoryHourly> queryWrapper = new QueryWrapper<>();
        String time = DateUtils.dateToDateString(new Date(), DateUtils.yyyy_MM_dd_HH_EN) + ":00:00";
        queryWrapper.eq("time", time).eq("mac", mac);
        Date now = new Date();
        String time = DateUtils.dateToDateString(now, DateUtils.yyyy_MM_dd_HH_EN) + ":00:00";
        //获取小时数据
        HistoryHourly historyHourly = historyHourlyMapper.selectOne(queryWrapper);
        Map<String, Object> params = new HashMap<>();
        params.put("timeUnits", DateUtils.dateToDateString(now, DateUtils.yyyyMM_EN));
        params.put("mac", mac);
        params.put("time", time);
        String value = historyHourlyMapper.selectHourlyData(params);
        Map<String, Object> result = new HashMap<>();
        if (historyHourly == null) {
        if (ObjectUtils.isEmpty(value)) {
            result.put("AQI", Constants.NULL_VALUE);
            return result;
        }
        Map<String, Object> data = JSONObject.parseObject(historyHourly.getValue(), Map.class);
        Map<String, Object> data = JSONObject.parseObject(value, Map.class);
        result.put("AQI", AQIUtils.hourlyAqi(data));
        return result;
    }
screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java
New file
@@ -0,0 +1,86 @@
package com.moral.api.websocket;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import com.moral.api.entity.SpecialDevice;
import com.moral.api.entity.UnitConversion;
import com.moral.constant.RedisConstants;
//走航车实时websocket
@ServerEndpoint("/cruiserWebsocket/{mac}")
@Component
@Data
public class CruiserWebSocketServer {
    //线程安全集合,用于存放server对象
    public static CopyOnWriteArraySet<CruiserWebSocketServer> sockets = new CopyOnWriteArraySet<>();
    public static RedisTemplate redisTemplate;
    private Session session;
    private String mac;
    private SpecialDevice specialDevice;
    private Map<String, Object> regionAqi;
    private List<UnitConversion> unitConversions;
    @OnOpen
    public void onOpen(Session session, @PathParam("mac") String mac) {
        this.session = session;
        this.mac = mac;
        this.specialDevice = (SpecialDevice) redisTemplate.opsForHash().get(RedisConstants.SPECIAL_DEVICE_INFO, mac);
        //获取设备地区对应的AQI用于补偿使用
        Map<String, Object> deviceInfo = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE, mac);
        Map<String, Object> orgInfo = (Map<String, Object>) deviceInfo.get("organization");
        String areaCode = String.valueOf(orgInfo.get("areaCode"));
        String cityCode = String.valueOf(orgInfo.get("cityCode"));
        try {
            this.regionAqi = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, areaCode);
            if (ObjectUtils.isEmpty(this.regionAqi))
                this.regionAqi = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, cityCode);
        } catch (Exception e) {
            e.printStackTrace();
        }
        sockets.add(this);
    }
    @OnClose
    public void onClose() {
        sockets.remove(this);
    }
    @OnMessage
    public void onMessage(String message, Session session) {
        System.out.println("websocket==" + message);
    }
    @OnError
    public void onError(Session session, Throwable error) {
    }
    public void sendMessage(String message) throws Exception {
        if (this.session.isOpen()) {
            //  synchronized (session) {
            this.session.getBasicRemote().sendText(message);
            // }
        }
    }
}
screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -2,12 +2,7 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.moral.api.mapper.HistoryHourlyMapper">
    <!-- 通用查询映射结果 -->
    <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryHourly">
        <result column="mac" property="mac"/>
        <result column="time" property="time"/>
        <result column="value" property="value"/>
        <result column="version" property="version"/>
    </resultMap>
    <select id="selectHourlyData" resultType="java.lang.String">
        SELECT `value` FROM history_hourly_${timeUnits} WHERE mac = #{mac} AND `time` = #{time}
    </select>
</mapper>
screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java
File was deleted
screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -3,8 +3,6 @@
import java.util.List;
import java.util.Map;
import com.moral.api.entity.HistoryHourly;
/**
 * <p>
 * 已校准小时表 Mapper 接口
@@ -17,7 +15,7 @@
    void createTable(String timeUnits);
    void insertHistoryHourly(List<HistoryHourly> list);
    void insertHistoryHourly(List<Map<String, Object>> list);
    Integer selectCountByTime(Map<String, Object> params);
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -2,7 +2,6 @@
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.entity.HistoryHourly;
import com.moral.api.entity.Sensor;
import com.moral.api.mapper.HistoryHourlyMapper;
import com.moral.api.mapper.HistoryMinutelyMapper;
@@ -116,14 +115,14 @@
                .collect(Collectors.groupingBy(o -> (String) o.get("mac")));
        //存入数据库的结果集
        List<HistoryHourly> insertData = new ArrayList<>();
        List<Map<String, Object>> insertData = new ArrayList<>();
        data.forEach((key, value) -> {
            HistoryHourly historyHourly = new HistoryHourly();
            historyHourly.setMac(key);
            historyHourly.setTime(end);
            Map<String, Object> jsonMap = new HashMap<>();
            Map<String, Object> historyHourly = new HashMap<>();
            historyHourly.put("mac", key);
            historyHourly.put("time", end);
            Map<String, Object> jsonMap = new HashMap<>();
            Map<String, Object> map = new HashMap<>();
            map.put("data", value);
            map.put("type", "hour");
@@ -211,8 +210,8 @@
                    }
                }
            });
            historyHourly.setValue(JSONObject.toJSONString(jsonMap));
            historyHourly.setVersion((Integer) value.get(0).get("version"));
            historyHourly.put("version", value.get(0).get("version"));
            historyHourly.put("value", JSONObject.toJSONString(jsonMap));
            insertData.add(historyHourly);
        });
screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -7,12 +7,13 @@
            `mac` VARCHAR (20) DEFAULT NULL COMMENT '设备mac',
            `time` datetime DEFAULT NULL COMMENT '数据时间',
            `value` json DEFAULT NULL COMMENT '数据',
            `version` INT(11) DEFAULT NULL COMMENT '型号',
            KEY `idx_mac_time` (`mac`,`time`)
            ) ENGINE=INNODB DEFAULT CHARSET=utf8 COMMENT '小时数据表'
    </update>
    <insert id="insertHistoryHourly">
        INSERT INTO history_hourly VALUES
        INSERT INTO history_hourly_${timeUnits} VALUES
        <foreach collection="list" item="item" separator=",">
            (#{item.mac}, #{item.time}, #{item.value}, #{item.version})
        </foreach>
screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -36,7 +36,7 @@
    @Value("${kafka.groupId.state}")
    private String stateGroupId;
    @Bean
    @Bean("insertListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> insertListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(insertConsumerFactory());
@@ -46,7 +46,7 @@
        return factory;
    }
    @Bean
    @Bean("stateListenerContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> stateListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(stateConsumerFactory());
@@ -71,8 +71,8 @@
    }
    /*
    * 通用配置
    * */
     * 通用配置
     * */
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
screen-manage/src/main/java/com/moral/api/entity/HistoryHourly.java
File was deleted
screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
File was renamed from screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
@@ -22,10 +22,11 @@
import com.moral.constant.RedisConstants;
/*
 * 设备数据接入
 * 普通设备消费者
 * */
@Component
@Slf4j
public class KafkaConsumer {
public class DeviceConsumer {
    @Autowired
    private HistoryMinutelyService historyMinutelyService;
@@ -148,7 +149,7 @@
    }
    //特殊设备秒数据
    @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
    @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory")
    public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -1,12 +1,6 @@
package com.moral.api.mapper;
import org.apache.ibatis.annotations.Param;
import java.util.Map;
import com.moral.api.entity.HistoryHourly;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
 * <p>
@@ -16,7 +10,7 @@
 * @author moral
 * @since 2021-06-04
 */
public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> {
public interface HistoryHourlyMapper {
    void insertHistoryHourly(Map<String, Object> params);
screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java
@@ -2,9 +2,6 @@
import java.util.Map;
import com.moral.api.entity.HistoryHourly;
import com.baomidou.mybatisplus.extension.service.IService;
/**
 * <p>
 * 小时表 服务类
@@ -13,7 +10,7 @@
 * @author moral
 * @since 2021-06-04
 */
public interface HistoryHourlyService extends IService<HistoryHourly> {
public interface HistoryHourlyService {
    //小时数据insert
    void insertHistoryHourly(Map<String, Object> data);
screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -1,11 +1,9 @@
package com.moral.api.service.impl;
import com.alibaba.fastjson.JSONObject;
import com.moral.api.entity.HistoryHourly;
import com.moral.api.mapper.HistoryHourlyMapper;
import com.moral.api.service.DeviceService;
import com.moral.api.service.HistoryHourlyService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.constant.Constants;
import com.moral.constant.RedisConstants;
import com.moral.util.DateUtils;
@@ -28,7 +26,7 @@
 * @since 2021-06-04
 */
@Service
public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService {
public class HistoryHourlyServiceImpl implements HistoryHourlyService {
    @Autowired
    private HistoryHourlyMapper historyHourlyMapper;
@@ -50,12 +48,13 @@
        Map<String, Object> dataAdjust = new HashMap<>(data);
        Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN);
        String yearAndMonth = DateUtils.dateToDateString(time, DateUtils.yyyyMM_EN);
        Map<String, Object> result = new HashMap<>();
        result.put("mac", mac);
        result.put("time", time);
        result.put("version", version);
        result.put("timeUnits", Constants.UN_ADJUST);
        result.put("timeUnits", yearAndMonth + "_" + Constants.UN_ADJUST);
        result.put("value", JSONObject.toJSONString(data));
        //未校准小时表insert
        historyHourlyMapper.insertHistoryHourly(result);
@@ -63,13 +62,9 @@
        //数据校准
        dataAdjust = deviceService.adjustDeviceData(dataAdjust);
        HistoryHourly historyHourly = new HistoryHourly();
        historyHourly.setMac(mac);
        historyHourly.setTime(time);
        historyHourly.setVersion(version);
        historyHourly.setValue(JSONObject.toJSONString(dataAdjust));
        //校准后小时表insert
        historyHourlyMapper.insert(historyHourly);
        result.put("timeUnits", yearAndMonth);
        result.put("value", JSONObject.toJSONString(dataAdjust));
        historyHourlyMapper.insertHistoryHourly(result);
    }
}
screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -2,14 +2,6 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.moral.api.mapper.HistoryHourlyMapper">
    <!-- 通用查询映射结果 -->
    <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryHourly">
        <result column="mac" property="mac"/>
        <result column="time" property="time"/>
        <result column="value" property="value"/>
        <result column="version" property="version"/>
    </resultMap>
    <insert id="insertHistoryHourly">
        INSERT INTO history_hourly_${timeUnits}
        VALUES (#{mac}, #{time}, #{value}, #{version})