package com.moral.api.kafka.consumer;
 
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
 
import java.util.HashMap;
import java.util.Map;
 
import com.alibaba.fastjson.JSON;
import com.moral.api.service.DeviceService;
import com.moral.api.service.HistoryHourlyService;
import com.moral.api.service.HistoryMinutelyService;
import com.moral.constant.KafkaConstants;
import com.moral.constant.RedisConstants;
 
@Component
@Slf4j
public class KafkaConsumer {
 
    @Autowired
    private HistoryMinutelyService historyMinutelyService;
 
    @Autowired
    private HistoryHourlyService historyHourlyService;
 
    @Autowired
    private DeviceService deviceService;
 
    @Autowired
    private RedisTemplate redisTemplate;
 
    //Minute data
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_MINUTE, containerFactory = "kafkaListenerContainerFactory")
    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
            }
 
            //Data filtering
            data.remove("time");
            data.remove("entryTime");
            Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator();
            Map<String, Object> newMap = new HashMap<>();
            Map.Entry<String, Object> next;
            while (iterator.hasNext()) {
                next = iterator.next();
                String key = next.getKey();
                Object value = next.getValue();
                if (key.contains("-Avg")) {
                    newMap.put(key.replaceAll("-Avg", ""), value);
                } else {
                    newMap.put(key, value);
                }
                iterator.remove();
            }
            //Persist to the database
            historyMinutelyService.insertHistoryMinutely(newMap);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("param{}" + msg);
        }
    }
 
    //Hourly data
    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_HOUR, containerFactory = "kafkaListenerContainerFactory")
    public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
            }
 
            //Data filtering
            data.remove("time");
            data.remove("entryTime");
            Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator();
            Map<String, Object> newMap = new HashMap<>();
            Map.Entry<String, Object> next;
            while (iterator.hasNext()) {
                next = iterator.next();
                String key = next.getKey();
                Object value = next.getValue();
                if (key.contains("-Avg")) {
                    newMap.put(key.replaceAll("-Avg", ""), value);
                } else {
                    newMap.put(key, value);
                }
                iterator.remove();
            }
            //Persist to the database
            historyHourlyService.insertHistoryHourly(newMap);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("param{}" + msg);
        }
    }
 
    //Second data: update device state and cache the latest second data
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_STATE, containerFactory = "kafkaListenerContainerFactory")
    public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String msg = record.value();
        try {
            Map<String, Object> data = JSON.parseObject(msg, HashMap.class);
            Object mac = data.get("mac");
            Object time = data.get("DataTime");
            Object ver = data.get("ver");
            if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) {
                log.warn("some properties is null, param{}", msg);
                ack.acknowledge();
                return;
            }
            //Data calibration
            data = deviceService.adjustDeviceData(data);
            //Cache the latest second data in Redis
            redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data);
            //Judge and update the device state
            deviceService.judgeDeviceState(data);
            ack.acknowledge();
        } catch (Exception e) {
            log.error("param{}" + msg);
        }
    }
}
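
/*
 * A minimal sketch of the container factory this consumer assumes. The listeners above
 * call ack.acknowledge(), which only takes effect when the "kafkaListenerContainerFactory"
 * bean they reference runs with a MANUAL ack mode. That factory is defined elsewhere in
 * the project; the bean name, the MANUAL_IMMEDIATE mode and the ConsumerFactory wiring
 * below are assumptions, not the project's actual setup. @Configuration is deliberately
 * omitted so this sketch is not registered alongside the real factory, and types are
 * fully qualified so the file needs no extra imports.
 */
class KafkaListenerContainerFactorySketch {

    @org.springframework.context.annotation.Bean
    public org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            org.springframework.kafka.core.ConsumerFactory<String, String> consumerFactory) {
        org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        //With MANUAL_IMMEDIATE, the offset is committed as soon as the listener calls ack.acknowledge()
        factory.getContainerProperties().setAckMode(
                org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}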