screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
@@ -359,8 +359,14 @@ Map<String,Object> historyHourlyMap = new HashMap<>(); historyHourlyMap.put("mac",mac); JSONObject value = JSONObject.parseObject(historyFiveMinutely.getValue()); Double sensorValue = Double.parseDouble(value.get(sensorCode).toString()); historyHourlyMap.put(sensorCode,sensorValue); if (value.get(sensorCode)==null){ historyHourlyMap.put(sensorCode,0.0); }else { Double sensorValue = Double.parseDouble(value.get(sensorCode).toString()); historyHourlyMap.put(sensorCode,sensorValue); } // Double sensorValue = Double.parseDouble(value.get(sensorCode).toString()); // historyHourlyMap.put(sensorCode,sensorValue); Date time = historyFiveMinutely.getTime(); String timeStr = DateUtils.dateToDateString(time, DateUtils.yyyy_MM_dd_HH_mm_ss_EN); historyHourlyMap.put("time",timeStr); screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -30,6 +30,9 @@ void insertHistoryHourlyComplete(@Param("list") List<Map<String, Object>> list, @Param("timeUnits") String timeUnits); void insertHistoryHourlyAvg(@Param("list") List<Map<String, Object>> list, @Param("timeUnits") String timeUnits); } screen-job/src/main/java/com/moral/api/service/HistoryHourlyService.java
@@ -43,5 +43,8 @@ void dateToChangShu(String time); //计算海城市的小数数据 void dateInsertHistoryHourlyAvg(); } screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -5,8 +5,10 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.moral.api.config.mybatis.MybatisPlusConfig; import com.moral.api.config.rest.Crc16Utils; import com.moral.api.entity.Device; import com.moral.api.entity.HistoryHourly; import com.moral.api.entity.Sensor; import com.moral.api.mapper.DeviceMapper; import com.moral.api.mapper.HistoryHourlyMapper; import com.moral.api.mapper.HistoryMinutelyMapper; import com.moral.api.service.HistoryHourlyService; @@ -58,6 +60,9 @@ private String dateChangShu; @Autowired private HistoryHourlyMapper historyHourlyMapper; @Autowired private DeviceMapper deviceMapper; @Autowired private RedisTemplate redisTemplate; @@ -239,6 +244,7 @@ //存入小时表 historyHourlyMapper.insertHistoryHourly(insertData); } @Override @@ -461,6 +467,158 @@ log.info("发送完成"); } /** * 计算海城市的小时数据 */ @Override public void dateInsertHistoryHourlyAvg() { QueryWrapper<Device> wrapper = new QueryWrapper<>(); wrapper.select("mac"); wrapper.eq("organization_id",71); wrapper.eq("is_delete",Constants.NOT_DELETE); List<Device> devices = deviceMapper.selectList(wrapper); ArrayList<String> macs = new ArrayList<>(); for (Device device : devices) { macs.add(device.getMac()); } //时间格式化:yyyy-MM-dd HH:mm String format = DateUtils.yyyy_MM_dd_HH_EN; Date now = new Date(); //从数据库获取数据参数 Map<String, Object> params = new HashMap<>(); //开始时间 Date start = DateUtils.dataToTimeStampTime(DateUtils.addHours(now, -1), format); //结束时间 Date end = DateUtils.dataToTimeStampTime(now, format); // String start1= "2023-08-18 03:00:00"; // String end1 = "2023-08-18 04:00:00"; // Date date1 = DateUtils.getDate(start1, DateUtils.yyyy_MM_dd_HH_mm_ss_EN); // Date date2 = DateUtils.getDate(end1, DateUtils.yyyy_MM_dd_HH_mm_ss_EN); params.put("start", start); params.put("end", end); params.put("macs",macs); //获取数据的分钟表后缀 String timeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN); params.put("timeUnits", timeUnits); //因子 QueryWrapper<Sensor> sensorQueryWrapper = new QueryWrapper<>(); sensorQueryWrapper.select("code", "lower", "upper").eq("is_delete", Constants.NOT_DELETE); List<Sensor> sensors = sensorService.list(sensorQueryWrapper); // 获取海城设备的一小时内分钟的数据 List<Map<String, Object>> hourlyData = historyMinutelyMapper.getHistoryMinutelyData(params); if (ObjectUtils.isEmpty(hourlyData)) { return; } //按mac分组 Map<String, List<Map<String, Object>>> data = hourlyData.parallelStream() .collect(Collectors.groupingBy(o -> (String) o.get("mac"))); //存入数据库的结果集 List<Map<String, Object>> insertData = new ArrayList<>(); data.forEach((key, value) -> { Map<String, Object> historyHourly = new HashMap<>(); historyHourly.put("mac", key); historyHourly.put("time",start); Map<String, Object> jsonMap = new HashMap<>(); Map<String, Object> map = new HashMap<>(); map.put("data", value); map.put("type", "hour"); for (Sensor sensor : sensors) { String sensorCode = sensor.getCode(); //风向上下限 if (sensorCode.equals(Constants.SENSOR_CODE_WIND_DIR)) { if (sensor.getUpper() != null) { map.put("windDirUpper", sensor.getUpper()); } if (sensor.getLower() != null) { map.put("windDirLower", sensor.getLower()); } } //风速上下限 if (sensorCode.equals(Constants.SENSOR_CODE_WIND_SPEED)) { if (sensor.getUpper() != null) { map.put("windSpeedUpper", sensor.getUpper()); } if (sensor.getLower() != null) { map.put("windSpeedLower", sensor.getLower()); } } } //风向均值计算并修约 Map<String, Object> windDirAvg = AmendUtils.getWindDirAvg(map); if (!ObjectUtils.isEmpty(windDirAvg)) { jsonMap.putAll(windDirAvg); } //除风向外其他因子均值计算 sensors.forEach(sensor -> { String sensorCode = sensor.getCode(); Double upper = sensor.getUpper(); Double lower = sensor.getLower(); AtomicInteger size = new AtomicInteger(); DoubleStream optionalDouble = value.parallelStream() .flatMapToDouble(v -> { Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class); Object sensorValue = dataValue.get(sensorCode); //数据有效性标记位 Object flag = dataValue.get(sensorCode + "-" + Constants.MARKER_BIT_KEY); if (!Constants.MARKER_BIT_TRUE.equals(flag)) { return null; } if (ObjectUtils.isEmpty(sensorValue)) { return null; } //风向单独计算 if (Constants.SENSOR_CODE_WIND_DIR.equals(sensorCode)) { return null; } //剔除数据超过上下限的数据 double aDouble = Double.parseDouble(sensorValue.toString()); if (!ObjectUtils.isEmpty(upper)) { if (aDouble < upper) { return null; } } if (!ObjectUtils.isEmpty(lower)) { if (aDouble > lower) { return null; } } size.getAndIncrement(); return DoubleStream.of(aDouble); }); OptionalDouble average = optionalDouble.average(); if (average.isPresent()) { //银行家算法修约 double sciCal = AmendUtils.sciCal(average.getAsDouble(), 4); jsonMap.put(sensorCode, sciCal); //标志位 if (size.get() >= 45) { jsonMap.put(sensorCode + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_TRUE); } else { jsonMap.put(sensorCode + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_FALSE); } } }); historyHourly.put("version", value.get(0).get("version")); historyHourly.put("value", JSONObject.toJSONString(jsonMap)); insertData.add(historyHourly); }); //存入数据库 historyHourlyMapper.insertHistoryHourlyAvg(insertData,timeUnits); } private String strList(String startTime,String mn,String msg){ String qn = "QN="+ startTime + "001;ST=22;CN=2061;PW=123456;MN="+mn.toUpperCase()+";CP=&&DataTime="+startTime+";"; Map<String, Object> data = JSON.parseObject(msg, Map.class); screen-job/src/main/java/com/moral/api/task/HistoryTableInsertTask.java
@@ -116,4 +116,18 @@ } return ReturnT.SUCCESS; } //海城小时数据统计 @XxlJob("dateInsertHistoryHourlyAvg") public ReturnT dateInsertHistoryHourlyAvg(){ try { historyHourlyService.dateInsertHistoryHourlyAvg(); } catch (Exception e) { e.printStackTrace(); return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); } return ReturnT.SUCCESS; } } screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -56,4 +56,14 @@ (#{item.mac}, #{item.time}, #{item.value}) </foreach> </insert> <insert id="insertHistoryHourlyAvg"> INSERT INTO history_hourly_${timeUnits} VALUES <foreach collection="list" item="item" separator=","> (#{item.mac}, #{item.time}, #{item.value}, #{item.version}) </foreach> </insert> </mapper> screen-manage/src/main/java/com/moral/api/controller/TestController.java
@@ -12,7 +12,9 @@ import com.moral.api.service.SysDictDataService; import com.moral.api.service.TestService; import com.moral.api.service.impl.SensorServiceImpl; import com.moral.api.util.AdjustDataUtils; import com.moral.api.util.CacheUtils; import com.moral.api.util.CompareFieldUtils; import com.moral.constant.Constants; import com.moral.constant.KafkaConstants; import com.moral.constant.RedisConstants; @@ -211,6 +213,8 @@ private SensorService sensorService; @Autowired SysDictDataMapper sysDictDataMapper; @Autowired private AdjustDataUtils adjustDataUtils; @ApiOperation(value = "因子测试", notes = "因子测试") @ApiImplicitParams({ @@ -218,6 +222,12 @@ }) @RequestMapping(value = "getSensor", method = RequestMethod.GET) public void getSensor() { HashMap<String, Object> map = new HashMap<>(); map.put("time","1692364996898"); map.put("mac","p5dnd7a0391986"); HashMap<String, Object> map1 = new HashMap<>(); HashMap<String, Object> map2 = new HashMap<>(); Map<String, Object> adjust = adjustDataUtils.adjust(map, map1, map2, "2"); QueryWrapper<Sensor> queryWrapper = new QueryWrapper<>(); queryWrapper.select("code").eq("is_delete", Constants.NOT_DELETE); List<Object> list = sensorService.listObjs(queryWrapper); screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
@@ -1,5 +1,6 @@ package com.moral.api.kafka.consumer; import com.alibaba.fastjson.JSONObject; import com.moral.api.service.*; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -10,6 +11,9 @@ import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import java.text.DecimalFormat; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -98,6 +102,24 @@ //数据过滤 // data.remove("time"); data.remove("entryTime"); Map<String, Object> deviceByMac = deviceService.getDeviceByMac(mac.toString()); HashMap<String, Object> map = (HashMap<String, Object>) deviceByMac.get("organization"); String id = map.get("id").toString(); if (id.equals("71")){ log.warn(id, msg); return; } // SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); // String format = dateFormat.format(new Date().getTime()); // //获取年份 // String substring = format.substring(0, 4); // // String substring1 = time.toString().substring(0, 4); // // String replace = time.toString().replace(substring1, substring); // // data.put("DataTime",replace); Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); Map<String, Object> newMap = new HashMap<>(); Map.Entry<String, Object> next; screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -57,7 +57,6 @@ String format1 = dateFormat.format(l); Date time = DateUtils.getDate(format1, DateUtils.yyyy_MM_dd_HH_mm_ss_EN); // Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN); String yearAndMonth = DateUtils.dateToDateString(DateUtils.addHours(time, -1), DateUtils.yyyyMM_EN); screen-manage/src/main/java/com/moral/api/util/AdjustDataUtils.java
@@ -126,35 +126,45 @@ if (ObjectUtils.isEmpty(map)){ return deviceData; } if (ObjectUtils.isEmpty(aqiMap.get("a21005"))){ JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); String a21005 = jsonObject.get("a21005").toString(); deviceData.put("a21005",Double.parseDouble(a21005)); } if (ObjectUtils.isEmpty(aqiMap.get("a21026"))){ JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); String a21026 = jsonObject.get("a21026").toString(); deviceData.put("a21026",Double.parseDouble(a21026)); } if (ObjectUtils.isEmpty(aqiMap.get("a21004"))){ JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); String a21004 = jsonObject.get("a21004").toString(); deviceData.put("a21004",Double.parseDouble(a21004)); } if (ObjectUtils.isEmpty(aqiMap.get("a34002"))){ JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); String a34002 = jsonObject.get("a34002").toString(); deviceData.put("a34002",Double.parseDouble(a34002)); } if (ObjectUtils.isEmpty(aqiMap.get("a34004"))){ JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); String a34004 = jsonObject.get("a34004").toString(); deviceData.put("a34004",Double.parseDouble(a34004)); } if (ObjectUtils.isEmpty(aqiMap.get("a05024"))){ JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); String a05024 = jsonObject.get("a05024").toString(); deviceData.put("a05024",Double.parseDouble(a05024)); JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); if (ObjectUtils.isEmpty(aqiMap)){ deviceData.put("a21005",Double.parseDouble(jsonObject.get("a21005").toString())); deviceData.put("a21026",Double.parseDouble(jsonObject.get("a21026").toString())); deviceData.put("a21004",Double.parseDouble(jsonObject.get("a21004").toString())); deviceData.put("a34002",Double.parseDouble(jsonObject.get("a34002").toString())); deviceData.put("a34004",Double.parseDouble(jsonObject.get("a34004").toString())); deviceData.put("a05024",Double.parseDouble(jsonObject.get("a05024").toString())); }else { if (ObjectUtils.isEmpty(aqiMap.get("a21005"))){ String a21005 = jsonObject.get("a21005").toString(); deviceData.put("a21005",Double.parseDouble(a21005)); } if (ObjectUtils.isEmpty(aqiMap.get("a21026"))){ // JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); String a21026 = jsonObject.get("a21026").toString(); deviceData.put("a21026",Double.parseDouble(a21026)); } if (ObjectUtils.isEmpty(aqiMap.get("a21004"))){ // JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); String a21004 = jsonObject.get("a21004").toString(); deviceData.put("a21004",Double.parseDouble(a21004)); } if (ObjectUtils.isEmpty(aqiMap.get("a34002"))){ // JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); String a34002 = jsonObject.get("a34002").toString(); deviceData.put("a34002",Double.parseDouble(a34002)); } if (ObjectUtils.isEmpty(aqiMap.get("a34004"))){ // JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); String a34004 = jsonObject.get("a34004").toString(); deviceData.put("a34004",Double.parseDouble(a34004)); } if (ObjectUtils.isEmpty(aqiMap.get("a05024"))){ // JSONObject jsonObject = JSONObject.parseObject(map.get("value").toString()); String a05024 = jsonObject.get("a05024").toString(); deviceData.put("a05024",Double.parseDouble(a05024)); } } }