jinpengyong
2021-06-24 ec95e52c6f84c0a6cbdcbaecc2465b00ede6696d
数据校准工具类
15 files modified
173 ■■■■■ changed files
screen-api/src/main/java/com/moral/api/controller/GroupController.java 5 ●●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java 2 ●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/service/GroupService.java 5 ●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/service/UserGroupService.java 3 ●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/service/impl/GroupServiceImpl.java 9 ●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/service/impl/UserGroupServiceImpl.java 11 ●●●●● patch | view | raw | blame | history
screen-api/src/main/resources/mapper/GroupMapper.xml 5 ●●●●● patch | view | raw | blame | history
screen-common/src/main/java/com/moral/constant/RedisConstants.java 17 ●●●● patch | view | raw | blame | history
screen-job/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java 2 ●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java 4 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java 4 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java 10 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java 42 ●●●● patch | view | raw | blame | history
screen-manage/src/main/java/com/moral/api/util/AdjustDataUtils.java 48 ●●●●● patch | view | raw | blame | history
screen-manage/src/main/resources/application-dev.yml 6 ●●●● patch | view | raw | blame | history
screen-api/src/main/java/com/moral/api/controller/GroupController.java
@@ -17,7 +17,6 @@
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.moral.api.entity.Group;
import com.moral.api.service.GroupService;
@@ -115,8 +114,8 @@
            return ResultMessage.fail(ResponseCodeEnum.PARAMETERS_IS_MISSING.getCode(),
                    ResponseCodeEnum.PARAMETERS_IS_MISSING.getMsg());
        }
        List<Integer> groupIds = userGroupService.getGroupIds(userId);
        return ResultMessage.ok(groupIds);
        List<Map<String, Object>> groups = groupService.getGroupIds(userId);
        return ResultMessage.ok(groups);
    }
    @ApiOperation(value = "用户分配组", notes = "用户分配组")
screen-api/src/main/java/com/moral/api/kafka/consumer/SecondsDataConsumer.java
@@ -19,7 +19,7 @@
 * @Date 2021/6/15 14:49
 * @Version TODO
 **/
@Component
//@Component
public class SecondsDataConsumer implements ConsumerSeekAware {
    @KafkaListener(topics = "test",groupId = "SecondsDataGroup3")
screen-api/src/main/java/com/moral/api/service/GroupService.java
@@ -27,7 +27,10 @@
    //修改组(角色)
    Map<String, Object> updateGroup(Group group);
    //分页组(菜单)列表
    //分页组(角色)列表
    Page<Group> selectGroups(Map<String, Object> parameters);
    //获取用户组(角色)
    List<Map<String, Object>> getGroupIds(Integer userId);
}
screen-api/src/main/java/com/moral/api/service/UserGroupService.java
@@ -1,6 +1,5 @@
package com.moral.api.service;
import java.util.List;
import java.util.Map;
import com.moral.api.entity.UserGroup;
@@ -16,7 +15,7 @@
 */
public interface UserGroupService extends IService<UserGroup> {
    // Assign groups (roles) to a user.
    // NOTE(review): the implementation reads the target user from parameters.get("userId");
    // the other expected keys are not visible here -- confirm against UserGroupServiceImpl.
    void allotGroups(Map<String, Object> parameters);
    // Return the ids of the groups (roles) assigned to the given user.
    List<Integer> getGroupIds(Integer userId);
}
screen-api/src/main/java/com/moral/api/service/impl/GroupServiceImpl.java
@@ -2,6 +2,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -10,7 +11,6 @@
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.moral.api.entity.Group;
import com.moral.api.entity.GroupMenu;
import com.moral.api.entity.User;
import com.moral.api.entity.UserGroup;
import com.moral.api.mapper.GroupMapper;
import com.moral.api.mapper.GroupMenuMapper;
@@ -164,4 +164,11 @@
        groupMapper.selectPage(pageData, queryWrapper);
        return pageData;
    }
    @Override
    public List<Map<String, Object>> getGroupIds(Integer userId) {
        Map<String, Object> params = new HashMap<>();
        params.put("userId", userId);
        return groupMapper.selectUserGroup(params);
    }
}
screen-api/src/main/java/com/moral/api/service/impl/UserGroupServiceImpl.java
@@ -5,7 +5,6 @@
import java.util.Map;
import java.util.Objects;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.moral.api.entity.UserGroup;
import com.moral.api.mapper.GroupMapper;
@@ -78,15 +77,5 @@
        String account = userMapper.selectById((Integer) parameters.get("userId")).getAccount();
        String content = "给用户:" + account + "分配了组:" + groups;
        operationLogUtils.insertLog(request, content, Constants.UPDATE_OPERATE_TYPE);
    }
    @Override
    public List<Integer> getGroupIds(Integer userId) {
        QueryWrapper<UserGroup> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("user_id", userId).eq("is_delete", Constants.NOT_DELETE);
        List<UserGroup> userGroups = userGroupMapper.selectList(queryWrapper);
        List<Integer> groupIds = new ArrayList<>();
        userGroups.forEach(userGroup -> groupIds.add(userGroup.getGroupId()));
        return groupIds;
    }
}
screen-api/src/main/resources/mapper/GroupMapper.xml
@@ -12,11 +12,6 @@
        <result column="is_delete" property="isDelete"/>
    </resultMap>
    <resultMap id="GroupResultMap" type="java.util.LinkedHashMap">
        <id column="id" property="id"/>
        <result column="group_name" property="groupName"/>
    </resultMap>
    <select id="selectUserGroup" resultMap="GroupResultMap">
        SELECT g.id,g.group_name
        FROM `group` g,`user_group` ug
screen-common/src/main/java/com/moral/constant/RedisConstants.java
@@ -32,12 +32,7 @@
    /*
     * 设备信息前缀
     * */
    public static final String DEVICE = "device";
    /*
     * 设备状态信息前缀
     * */
    public static final String STATE = "state";
    public static final String DEVICE = "device_";
    /*
    * 存储设备显示单位,报警等级以及单位转换公式
@@ -51,11 +46,17 @@
    /*
     * 设备校准公式前缀
     * */
    public static final String ADJUST = "adjust";
    public static final String ADJUST = "adjust_";
    /*
     * 设备实时数据
     * */
    public static final String DEVICE_DATA = "data";
    public static final String DEVICE_DATA = "data_";
    /*
     * AQI data prefix (redis hash keyed by area code or city code)
     * */
    public static final String AQI_DATA = "aqi_";
}
screen-job/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
@@ -64,6 +64,6 @@
    }
    private Map<String, Object> getDataFromRedis(String mac) {
        return (Map<String, Object>) redisTemplate.opsForValue().get(RedisConstants.DEVICE_DATA + "_" + mac);
        return (Map<String, Object>) redisTemplate.opsForValue().get(RedisConstants.DEVICE_DATA + mac);
    }
}
screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -16,8 +16,8 @@
import java.util.HashMap;
import java.util.Map;
/*@Configuration
@EnableKafka*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.servers}")
    private String servers;
screen-manage/src/main/java/com/moral/api/config/kafka/KafkaProducerConfig.java
@@ -13,8 +13,8 @@
import java.util.HashMap;
import java.util.Map;
/*@Configuration
@EnableKafka*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
@@ -17,7 +17,6 @@
import com.moral.api.service.DeviceService;
import com.moral.api.service.HistoryHourlyService;
import com.moral.api.service.HistoryMinutelyService;
import com.moral.api.util.AdjustDataUtils;
import com.moral.constant.KafkaConstants;
import com.moral.constant.RedisConstants;
@@ -35,15 +34,13 @@
    private DeviceService deviceService;
    @Autowired
    private AdjustDataUtils adjustDataUtils;
    @Autowired
    private RedisTemplate redisTemplate;
    //分钟数据
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory")
    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        System.out.println(msg);
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
@@ -104,6 +101,7 @@
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        //System.out.println(record.offset() + "===>" + msg);
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
@@ -115,9 +113,9 @@
                return;
            }
            //数据校准
            data = adjustDataUtils.adjust(data);
            data = deviceService.adjustDeviceData(data);
            //存入redis
            redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + "_" + mac, data);
            redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + mac, data);
            //判断并修改设备状态
            deviceService.judgeDeviceState(data);
            ack.acknowledge();
screen-manage/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
@@ -34,12 +34,10 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletRequest;
@@ -92,38 +90,21 @@
     * 从redis获取设备信息
     * */
    private Map<String, Object> getDeviceInfoFromRedis(String mac) {
        return (Map<String, Object>) redisTemplate.opsForValue().get(getDeviceKey(mac));
        return (Map<String, Object>) redisTemplate.opsForValue().get(RedisConstants.DEVICE + mac);
    }
    /*
     * 设备信息存入redis
     */
    private void setDeviceInfoToRedis(String mac, Map<String, Object> deviceInfo) {
        redisTemplate.opsForValue().set(getDeviceKey(mac), deviceInfo);
        redisTemplate.opsForValue().set(RedisConstants.DEVICE + mac, deviceInfo);
    }
    /*
     * 从redis删除设备信息
     */
    private void delDeviceInfoFromRedis(String mac) {
        redisTemplate.delete(getDeviceKey(mac));
    }
    /*
     * 获取设备信息在redis里的key
     */
    private String getDeviceKey(String mac) {
        return keysConnect(RedisConstants.DEVICE, mac);
    }
    //redis key前缀
    private String keysConnect(String... keys) {
        StringBuilder key = new StringBuilder(keys[0]);
        for (int i = 1; i < keys.length; i++) {
            key.append("_");
            key.append(keys[i]);
        }
        return key.toString().toLowerCase();
        redisTemplate.delete(RedisConstants.DEVICE + mac);
    }
    @Override
@@ -438,7 +419,22 @@
    @Override
    public Map<String, Object> adjustDeviceData(Map<String, Object> deviceData) {
        return adjustDataUtils.adjust(deviceData);
        String mac = deviceData.get("mac").toString();
        //从redis获取校准公式
        Map<String, Object> adjustFormula = redisTemplate.opsForHash().entries(RedisConstants.ADJUST + mac);
        if (!ObjectUtils.isEmpty(adjustFormula)) {
            Map<String, Object> deviceInfo = getDeviceByMac(mac);
            Map<String, Object> monitorPoint = (Map<String, Object>) deviceInfo.get("monitorPoint");
            Object areaCode = monitorPoint.get("areaCode");
            Object cityCode = monitorPoint.get("cityCode");
            Map<String, Object> aqiMap = redisTemplate.opsForHash().entries(RedisConstants.AQI_DATA + areaCode);
            if (ObjectUtils.isEmpty(aqiMap)) {
                aqiMap = redisTemplate.opsForHash().entries(RedisConstants.AQI_DATA + cityCode);
            }
            return adjustDataUtils.adjust(deviceData, adjustFormula, ObjectUtils.isEmpty(aqiMap) ? null : aqiMap);
        }
        return deviceData;
    }
    @Override
screen-manage/src/main/java/com/moral/api/util/AdjustDataUtils.java
@@ -1,10 +1,9 @@
package com.moral.api.util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import java.util.Date;
import java.util.HashMap;
@@ -15,66 +14,53 @@
import com.googlecode.aviator.AviatorEvaluator;
import com.googlecode.aviator.Expression;
import com.moral.api.entity.DeviceAdjustValue;
import com.moral.api.service.DeviceService;
import com.moral.constant.RedisConstants;
import com.moral.util.DateUtils;
@Slf4j
@Component
public class AdjustDataUtils {
    @Autowired
    private DeviceService deviceService;
    @Autowired
    private RedisTemplate redisTemplate;
    public Map<String, Object> adjust(Map<String, Object> deviceData) {
    //数据,公式
    public Map<String, Object> adjust(Map<String, Object> deviceData, Map<String, Object> adjustFormula, Map<String, Object> aqiMap) {
        try {
            Object dataTime = deviceData.get("DataTime");
            String mac = deviceData.get("mac").toString();
            //清除毫秒,四舍五入
            long time = Math.round(new Double((String) dataTime) / 1000) * 1000L;
            long finalTime = DateUtils.dataToTimeStampTime(new Date(time), DateUtils.HH_mm_ss_EN).getTime();
            //设备信息
            Map<String, Object> deviceInfo = deviceService.getDeviceByMac(mac);
            Map<String, Object> monitorPoint = (Map<String, Object>) deviceInfo.get("monitorPoint");
            Object areaCode = monitorPoint.get("areaCode");
            Object cityCode = monitorPoint.get("cityCode");
            for (String key : deviceData.keySet()) {
                if (!key.equals("mac") && !key.equals("time") && !key.equals("DataTime") && !key.equals("ver") && !key.contains("Flag")) {
                    //测量值
                    Object measuredValue = deviceData.get(key);
                    List<DeviceAdjustValue> adjustValues = (List<DeviceAdjustValue>) redisTemplate.opsForHash().get(RedisConstants.ADJUST + "_" + mac, key);
                    if (ObjectUtils.isEmpty(adjustValues)) {
                    //单个因子校准公式
                    List<DeviceAdjustValue> sensorFormulas = (List<DeviceAdjustValue>) adjustFormula.get(key);
                    if (ObjectUtils.isEmpty(sensorFormulas)) {
                        deviceData.put(key, measuredValue);
                        continue;
                    }
                    //根据时间段筛选校准公式
                    DeviceAdjustValue deviceAdjustValue = adjustValues.stream()
                    DeviceAdjustValue deviceAdjustValue = sensorFormulas.stream()
                            .filter(o -> o.getStartTime().getTime() <= finalTime && o.getEndTime().getTime() > finalTime)
                            .findFirst().get();
                    String adjustValue = deviceAdjustValue.getValue();
                    if (ObjectUtils.isEmpty(adjustValue)) {
                    String formula = deviceAdjustValue.getValue();
                    if (StringUtils.isEmpty(formula)) {
                        deviceData.put(key, measuredValue);
                        continue;
                    }
                    Expression expression = AviatorEvaluator.compile(adjustValue);
                    Expression expression = AviatorEvaluator.compile(formula);
                    Map<String, Object> env = new HashMap<>();
                    if (adjustValue.contains("aqi")) {
                        Object aqiValue = redisTemplate.opsForHash().get("aqi_" + areaCode, key);
                        if (ObjectUtils.isEmpty(aqiValue)) {
                            aqiValue = redisTemplate.opsForHash().get("aqi_" + cityCode, key);
                    if (formula.contains("aqi")) {
                        Object aqiValue = null;
                        if (aqiMap != null) {
                            aqiValue = aqiMap.get(key);
                        }
                        env.put("aqi", ObjectUtils.isEmpty(aqiValue) ? 0F : Float.parseFloat((String) aqiValue));
                    }
                    if (adjustValue.contains("vocs")) {
                    if (formula.contains("vocs")) {
                        Object vocsValue = ObjectUtils.isEmpty(deviceData.get("a99054")) ? 0F : deviceData.get("a99054");
                        env.put("vocs", vocsValue);
                    }
                    if (adjustValue.contains("cel")) {
                    if (formula.contains("cel")) {
                        env.put("cel", Float.parseFloat((String) measuredValue));
                    }
                    //校准
@@ -84,7 +70,6 @@
                        measuredValue = 0F;
                    }
                    deviceData.put(key, Double.parseDouble(String.format("%.3f", measuredValue)));
                }
            }
        } catch (Exception e) {
@@ -93,5 +78,4 @@
        }
        return deviceData;
    }
}
screen-manage/src/main/resources/application-dev.yml
@@ -92,11 +92,11 @@
    enable:
      auto:
        commit: false
    servers: 192.168.0.16:9092,192.168.0.17:9092,192.168.0.18:9092
    servers: 172.16.44.65:9092,172.16.44.67:9092,172.16.44.66:9092
    session:
      timeout: 6000
    zookeeper:
      connect: 192.168.0.16:2181,192.168.0.17:2181,192.168.0.18:2181
      connect: 172.16.44.65:2181,172.16.44.67:2181,172.16.44.66:2181
  producer:
    batch:
      size: 4096
@@ -104,7 +104,7 @@
      memory: 40960
    linger: 1
    retries: 0
    servers: 192.168.0.16:9092,192.168.0.17:9092,192.168.0.18:9092
    servers: 172.16.44.65:9092,172.16.44.67:9092,172.16.44.66:9092
mvc:
  interceptor:
    exclude: