cjl
2023-10-19 ab061be50b7653531cbe134416c3ebdd876e4791
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -1,10 +1,14 @@
package com.moral.api.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.moral.api.config.mybatis.MybatisPlusConfig;
import com.moral.api.config.rest.Crc16Utils;
import com.moral.api.entity.Device;
import com.moral.api.entity.HistoryHourly;
import com.moral.api.entity.Sensor;
import com.moral.api.mapper.DeviceMapper;
import com.moral.api.mapper.HistoryHourlyMapper;
import com.moral.api.mapper.HistoryMinutelyMapper;
import com.moral.api.service.HistoryHourlyService;
@@ -17,18 +21,23 @@
import com.moral.util.MybatisPLUSUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalDouble;
import java.util.Set;
import java.io.*;
import java.math.BigDecimal;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -44,10 +53,16 @@
 * @since 2021-06-28
 */
@Service
@Slf4j
public class HistoryHourlyServiceImpl implements HistoryHourlyService {
    @Value("${result.date.changshu}")
    private String dateChangShu;
    @Autowired
    private HistoryHourlyMapper historyHourlyMapper;
    @Autowired
    private DeviceMapper deviceMapper;
    @Autowired
    private RedisTemplate redisTemplate;
@@ -229,6 +244,7 @@
        //存入小时表
        historyHourlyMapper.insertHistoryHourly(insertData);
    }
    @Override
@@ -422,4 +438,278 @@
        }
        return null;
    }
    @Override
    public void dateToChangShu(String time) {
        //获取所有设备小时数据
        Date end = new Date();
        List<String> stringList = Arrays.asList(dateChangShu.split(","));
        String startTime = DateUtils.dateToDateString(DateUtils.addHours(end,-1),DateUtils.yyyy_MM_dd_HH_EN)+":00:00";
        Map<String, Object> prop = new HashMap<>();
        String timeUnits = DateUtils.dateToDateString(end, DateUtils.yyyyMM_EN);
        prop.put("timeUnits", timeUnits);
        prop.put("start", startTime);
        prop.put("end", DateUtils.dateToDateString(end,DateUtils.yyyy_MM_dd_HH_EN)+":00:00");
        prop.put("list", stringList);
        List<Map<String, Object>> dailyData = this.selectDailyData(prop);
        String startTimeStr = DateUtils.dateToDateString(DateUtils.addHours(end,-1),"yyyyMMddHH")+"0000";
        List<String> list = new ArrayList<>();
        for(Map<String, Object> m : dailyData){
            String result = strList(startTimeStr,m.get("mac").toString(),m.get("value").toString());
            list.add(result);
            redisTemplate.opsForHash().delete(RedisConstants.DATE_CHANG_SHU,m.get("mac").toString());
            redisTemplate.opsForHash().put(RedisConstants.DATE_CHANG_SHU,m.get("mac").toString(),result);
        }
        for(String s : list){
            sendSocket("222.92.166.238",15031,s);
        }
        log.info("发送完成");
    }
    /**
     * 计算海城市的小时数据
     */
    @Override
    public void dateInsertHistoryHourlyAvg() {
        QueryWrapper<Device> wrapper = new QueryWrapper<>();
        wrapper.select("mac");
        wrapper.eq("organization_id",71);
        wrapper.eq("is_delete",Constants.NOT_DELETE);
        List<Device> devices = deviceMapper.selectList(wrapper);
        ArrayList<String> macs = new ArrayList<>();
        for (Device device : devices) {
            macs.add(device.getMac());
        }
        //时间格式化:yyyy-MM-dd HH:mm
        String format = DateUtils.yyyy_MM_dd_HH_EN;
        Date now = new Date();
        //从数据库获取数据参数
        Map<String, Object> params = new HashMap<>();
        //开始时间
        Date start = DateUtils.dataToTimeStampTime(DateUtils.addHours(now, -1), format);
        //结束时间
        Date end = DateUtils.dataToTimeStampTime(now, format);
//        String start1= "2023-08-18 03:00:00";
//        String end1 = "2023-08-18 04:00:00";
//        Date date1 = DateUtils.getDate(start1, DateUtils.yyyy_MM_dd_HH_mm_ss_EN);
//        Date date2 = DateUtils.getDate(end1, DateUtils.yyyy_MM_dd_HH_mm_ss_EN);
        params.put("start", start);
        params.put("end", end);
        params.put("macs",macs);
        //获取数据的分钟表后缀
        String timeUnits = DateUtils.dateToDateString(start, DateUtils.yyyyMM_EN);
        params.put("timeUnits", timeUnits);
        //因子
        QueryWrapper<Sensor> sensorQueryWrapper = new QueryWrapper<>();
        sensorQueryWrapper.select("code", "lower", "upper").eq("is_delete", Constants.NOT_DELETE);
        List<Sensor> sensors = sensorService.list(sensorQueryWrapper);
        // 获取海城设备的一小时内分钟的数据
        List<Map<String, Object>> hourlyData = historyMinutelyMapper.getHistoryMinutelyData(params);
        if (ObjectUtils.isEmpty(hourlyData)) {
            return;
        }
        //按mac分组
        Map<String, List<Map<String, Object>>> data = hourlyData.parallelStream()
                .collect(Collectors.groupingBy(o -> (String) o.get("mac")));
        //存入数据库的结果集
        List<Map<String, Object>> insertData = new ArrayList<>();
        data.forEach((key, value) -> {
            Map<String, Object> historyHourly = new HashMap<>();
            historyHourly.put("mac", key);
            historyHourly.put("time",start);
            Map<String, Object> jsonMap = new HashMap<>();
            Map<String, Object> map = new HashMap<>();
            map.put("data", value);
            map.put("type", "hour");
            for (Sensor sensor : sensors) {
                String sensorCode = sensor.getCode();
                //风向上下限
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_DIR)) {
                    if (sensor.getUpper() != null) {
                        map.put("windDirUpper", sensor.getUpper());
                    }
                    if (sensor.getLower() != null) {
                        map.put("windDirLower", sensor.getLower());
                    }
                }
                //风速上下限
                if (sensorCode.equals(Constants.SENSOR_CODE_WIND_SPEED)) {
                    if (sensor.getUpper() != null) {
                        map.put("windSpeedUpper", sensor.getUpper());
                    }
                    if (sensor.getLower() != null) {
                        map.put("windSpeedLower", sensor.getLower());
                    }
                }
            }
            //风向均值计算并修约
            Map<String, Object> windDirAvg = AmendUtils.getWindDirAvg(map);
            if (!ObjectUtils.isEmpty(windDirAvg)) {
                jsonMap.putAll(windDirAvg);
            }
            //除风向外其他因子均值计算
            sensors.forEach(sensor -> {
                String sensorCode = sensor.getCode();
                Double upper = sensor.getUpper();
                Double lower = sensor.getLower();
                AtomicInteger size = new AtomicInteger();
                DoubleStream optionalDouble = value.parallelStream()
                        .flatMapToDouble(v -> {
                            Map<String, Object> dataValue = JSONObject.parseObject((String) v.get("value"), Map.class);
                            Object sensorValue = dataValue.get(sensorCode);
                            //数据有效性标记位
                            Object flag = dataValue.get(sensorCode + "-" + Constants.MARKER_BIT_KEY);
                            if (!Constants.MARKER_BIT_TRUE.equals(flag)) {
                                return null;
                            }
                            if (ObjectUtils.isEmpty(sensorValue)) {
                                return null;
                            }
                            //风向单独计算
                            if (Constants.SENSOR_CODE_WIND_DIR.equals(sensorCode)) {
                                return null;
                            }
                            //剔除数据超过上下限的数据
                            double aDouble = Double.parseDouble(sensorValue.toString());
                            if (!ObjectUtils.isEmpty(upper)) {
                                if (aDouble < upper) {
                                    return null;
                                }
                            }
                            if (!ObjectUtils.isEmpty(lower)) {
                                if (aDouble > lower) {
                                    return null;
                                }
                            }
                            size.getAndIncrement();
                            return DoubleStream.of(aDouble);
                        });
                OptionalDouble average = optionalDouble.average();
                if (average.isPresent()) {
                    //银行家算法修约
                    double sciCal = AmendUtils.sciCal(average.getAsDouble(), 4);
                    jsonMap.put(sensorCode, sciCal);
                    //标志位
                    if (size.get() >= 45) {
                        jsonMap.put(sensorCode + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_TRUE);
                    } else {
                        jsonMap.put(sensorCode + "-" + Constants.MARKER_BIT_KEY, Constants.MARKER_BIT_FALSE);
                    }
                }
            });
            historyHourly.put("version", value.get(0).get("version"));
            historyHourly.put("value", JSONObject.toJSONString(jsonMap));
            insertData.add(historyHourly);
        });
        //存入数据库
       historyHourlyMapper.insertHistoryHourlyAvg(insertData,timeUnits);
    }
    private String strList(String startTime,String mn,String msg){
        String qn = "QN="+ startTime + "001;ST=22;CN=2061;PW=123456;MN="+mn.toUpperCase()+";CP=&&DataTime="+startTime+";";
        Map<String, Object> data = JSON.parseObject(msg, Map.class);
        StringBuffer stringBuffer = new StringBuffer();
        if(data.size()>0){
            data.remove("mac");
            data.remove("time");
            for(Map.Entry<String, Object> entry : data.entrySet()){
                String mapKey = entry.getKey();
                Object mapValue = entry.getValue();
                if(mapKey.contains("e")){
                    if(mapKey.contains("a00e12")||mapKey.contains("a00e13")){
                    }else {
                        continue;
                    }
                }
                if(!mapKey.contains("-Flag")){
                    if(mapKey.contains("a34002")||mapKey.contains("a34004")||mapKey.contains("a21026")||mapKey.contains("a21004")||mapKey.contains("a05024")){
                        BigDecimal d = Objects.nonNull(mapValue)? BigDecimal.valueOf(Double.valueOf(mapValue.toString())).divide(BigDecimal.valueOf(1000)):BigDecimal.ZERO;
                       mapValue = d.toString();
                    }
                    if(mapKey.contains("a00e13")){
                        mapKey = "a90085";
                    }
                    stringBuffer.append(mapKey+"-Avg=").append(mapValue+",").append(mapKey+"-Flag=N;");
                }
            }
        }
        stringBuffer.deleteCharAt(stringBuffer.length()-1);
        stringBuffer.append("&&");
        //String result = "QN=20230726160000001;ST=22;CN=2061;PW=123456;MN=P5DND7A0391978;CP=&&DataTime=20230726160000;a21005-Avg=0.03072,a21005-Flag=N;a21004-Avg=20.6,a21004-Flag=N;a21026-Avg=140.43,a21026-Flag=N;a21028-Avg=0.00778,a21028-Flag=N;a21001-Avg=0.13132,a21001-Flag=N;a01001-Avg=38.68,a01001-Flag=N;a05024-Avg=116.27,a05024-Flag=N;a01002-Avg=57.47,a01002-Flag=N;a01007-Avg=0.864,a01007-Flag=N;a01006-Avg=811.918,a01006-Flag=N;a01008-Avg=156.66,a01008-Flag=N;a34002-Avg=9.18,a34002-Flag=N;a34004-Avg=8.1,a34004-Flag=N;a99054-Avg=0.02283,a99054-Flag=N;a31001-Avg=0,a31001-Flag=N&&";
        String result = qn+stringBuffer.toString();
        String s = Crc16Utils.padLeftTest(String.valueOf(result.length()),4,null);
        String s1 = Crc16Utils.crc16_2017(result, result.length());
        return  "##"+s+result+s1+"\r\n";
    }
    public static void main(String[] args) {
        String result = "QN=20231019000100001;ST=31;CN=2051;PW=123456;MN=P5DND7A0392082;Flag=4;CP=&&DataTime=20231019000100;a01001-Avg=25.0,a01001-Flag=N;a01002-Avg=61.0,a01002-Flag=N;a34004-Avg=19.3,a34004-Flag=N;a34002-Avg=38.5,a34002-Flag=N;a34001-Avg=41.2,a34001-Flag=N;a01007-Avg=0.00,a01007-Flag=N;a01008-Avg=137.53,a01008-Flag=N;a01006-Avg=4255.774,a01006-Flag=T;a00e13-Avg=39.2,a00e13-Flag=N&&";
        String s = Crc16Utils.padLeftTest(String.valueOf(result.length()),4,null);
        String s1 = Crc16Utils.crc16_2017(result, result.length());
        System.out.println("##"+s+result+s1+"\r\n");
        int i = 0;
    }
    private static String sendSocket(String host, Integer port, String message) {
        log.debug("请求地址:{},端口:{},报文:{}", host, port, message);
        Socket socket = null;
        OutputStream outputStream = null;
        InputStream inputStream = null;
        BufferedReader bufferedReader = null;
        try {
            socket = new Socket(host, port);
            socket.setSoTimeout(3000);
            // 建立连接后获得输出流
            outputStream = socket.getOutputStream();
            outputStream.write(message.getBytes());
            outputStream.flush();
            socket.shutdownOutput();
            inputStream = socket.getInputStream();
            bufferedReader = new BufferedReader(new InputStreamReader(inputStream));
            String readLen=bufferedReader.readLine();
            log.debug("响应报文:{}", readLen);
            return readLen;
        } catch (IOException e) {
            log.error("socket发送异常:{}", e.toString());
        } finally {
            try {
                    if(bufferedReader != null){
                    bufferedReader.close();
                }
                if (inputStream != null) {
                    inputStream.close();
                }
                if (outputStream != null) {
                    outputStream.close();
                }
                if (socket != null) {
                    socket.close();
                }
            } catch (IOException e) {
                log.error("socket关闭异常:{}", e.toString());
            }
        }
        return null;
    }
}