jinpengyong
2021-06-23 69d00365e9f099b26c4fc7a298cabeb131956d8a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package com.moral.api.kafka.consumer;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
 
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
 
import com.alibaba.fastjson.JSON;
import com.moral.api.service.DeviceService;
import com.moral.api.service.HistoryHourlyService;
import com.moral.api.service.HistoryMinutelyService;
import com.moral.api.util.AdjustDataUtils;
import com.moral.constant.KafkaConstants;
import com.moral.constant.RedisConstants;
 
//@Component
@Slf4j
public class KafkaConsumer {
 
    @Autowired
    private HistoryMinutelyService historyMinutelyService;
 
    @Autowired
    private HistoryHourlyService historyHourlyService;
 
    @Autowired
    private DeviceService deviceService;
 
    @Autowired
    private AdjustDataUtils adjustDataUtils;
 
    @Autowired
    private RedisTemplate redisTemplate;
 
    //分钟数据
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory")
    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
            }
 
            //数据过滤
            data = data.entrySet().stream()
                    .filter(map -> {
                        String key = map.getKey();
                        return !(key.contains("Min") || key.contains("Max") || key.contains("Cou"));
                    }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue));
            data.remove("time");
            //存入数据库
            historyMinutelyService.insertHistoryMinutely(data);
            ack.acknowledge();
        } catch (Exception e) {
            //log.error("param{}" + msg);
        }
    }
 
    //小时数据
    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory")
    public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
            }
 
            //数据过滤
            data = data.entrySet().stream()
                    .filter(map -> {
                        String key = map.getKey();
                        return !(key.contains("Min") || key.contains("Max") || key.contains("Cou"));
                    }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue));
            data.remove("time");
            //存入数据库
            historyHourlyService.insertHistoryHourly(data);
            ack.acknowledge();
        } catch (Exception e) {
            //log.error("param{}" + msg);
        }
    }
 
    //秒数据,修改设备状态,缓存最新秒数据
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
            }
            //数据校准
            data = adjustDataUtils.adjust(data);
            //存入redis
            redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + "_" + mac, data);
            //判断并修改设备状态
            deviceService.judgeDeviceState(data);
            ack.acknowledge();
        } catch (Exception e) {
            //log.error("param{}" + msg);
        }
    }
}