jinpengyong
2022-02-23 15bba267b1992286f04be9db924af2786f3aac97
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package com.moral.api.kafka.consumer;
 
import com.alibaba.fastjson.JSON;
import com.moral.api.entity.Device;
import com.moral.api.entity.Sensor;
import com.moral.api.entity.UnitConversion;
import com.moral.api.utils.UnitConvertUtils;
import com.moral.api.websocket.SingleDeviceServer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArraySet;
 
/**
 * @ClassName SecondsDataConsumer1
 * @Description TODO
 * @Author 陈凯裕
 * @Date 2021/6/15 14:49
 * @Version TODO
 **/
@Component
@Slf4j
public class SecondDataConsumer implements ConsumerSeekAware {
 
    @KafkaListener(containerFactory = "secondDataListenerFactory", topics = "second_data")
    public void listen(ConsumerRecord<String, String> record, Consumer consumer) throws Exception {
        String messageStr = record.value();
        Map<String, Object> message = (Map<String, Object>) JSON.parse(messageStr);
        //long serverStartTime = System.currentTimeMillis();
        CopyOnWriteArraySet<SingleDeviceServer> sockets = SingleDeviceServer.sockets;
        for (SingleDeviceServer socket : sockets) {
            //判断消息是否数据该socket
            String messageMac = (String) message.get("mac");
            String mac = socket.getMac();
            if (!mac.equalsIgnoreCase(messageMac))
                continue;
            //取出基本信息
            Device device = socket.getDeviceAlarmInfo();
            //创建最终消息对象
            Map<String, Object> resultMessgae = new HashMap<>();
            //拼接单位
            List<Sensor> sensors = device.getVersion().getSensors();//获取型号所有因子信息
            for (Sensor sensor : sensors) {
                String code = sensor.getCode();
                String showUnit = sensor.getShowUnit();
                String showUnitKey = sensor.getShowUnitKey();
                String unitKey = sensor.getUnitKey();
                String unit = sensor.getUnit();
                //如果消息中没有该因子则退出循环
                Object value = message.get(code);
                //对数据保留两位小数,并且向下取整
                /*if(value==null)    源代码
                    continue;
                Double sourceDataD = Double.valueOf(String.valueOf(value));*/
                //测试代码使用,给臭气一个固定值  start
                Double sourceDataD = null;
                if (value != null) {
                     sourceDataD = Double.valueOf(String.valueOf(value));
                }else{
                    sourceDataD = 5.00d;
                }
                //测试代码使用,给臭气一个固定值  end
                BigDecimal bg = new BigDecimal(sourceDataD);
                bg = bg.setScale(2, BigDecimal.ROUND_FLOOR);
                String sourceData = bg.toString();
                //数据补偿
                //单位转换
                if (!unitKey.equals(showUnitKey)) {//如果源单位和显示单位不同,则进行单位转换
                    String formula = sensor.getFormula();
                    //如果sensor中的公式为空则从缓存中获取公式
                    if (ObjectUtils.isEmpty(formula)) {
                        List<UnitConversion> unitConversions = socket.getUnitConversions();
                        for (UnitConversion unitConversion : unitConversions) {
                            if (unitConversion.getOriginalUnitKey().equals(unitKey) && unitConversion.getTargetUnitKey().equals(showUnitKey))
                                formula = unitConversion.getFormula();
                        }
                    }
                    //单位转换
                    String resultData = UnitConvertUtils.calculate(sourceData, formula);
                    if (resultData != null) {
                        resultData += showUnit;
                    } else {//如果转换出的数据为null,则代表缓存中也没有公式,依然使用源单位。
                        resultData = sourceData + unit;
                    }
                    resultMessgae.put(sensor.getCode(), resultData);
                } else {
                    //拼接单位
                    if(!showUnit.equals("无单位"))
                        sourceData = sourceData + " " + showUnit;
                    resultMessgae.put(sensor.getCode(), sourceData);
                }
            }
            //测试时间延迟使用的属性
            //resultMessgae.put("DataTime", message.get("DataTime"));
            //resultMessgae.put("time", message.get("time"));
            //resultMessgae.put("time1", message.get("time1"));
            //resultMessgae.put("serverTime", System.currentTimeMillis());
            //resultMessgae.put("serverStartTime", serverStartTime);
            socket.sendMessage(JSON.toJSONString(resultMessgae));
        }
    }
 
    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
 
    }
 
    //consumer回调函数,设置从最新的offset开始消费
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
        map.forEach((t, o) -> consumerSeekCallback.seekToEnd(t.topic(), t.partition()));
    }
 
    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
 
    }
 
}