package com.moral.api.kafka.consumer; 
 | 
  
 | 
import com.moral.api.service.*; 
 | 
import lombok.extern.slf4j.Slf4j; 
 | 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
 | 
import org.springframework.beans.factory.annotation.Autowired; 
 | 
import org.springframework.data.redis.core.RedisTemplate; 
 | 
import org.springframework.kafka.annotation.KafkaListener; 
 | 
import org.springframework.kafka.support.Acknowledgment; 
 | 
import org.springframework.stereotype.Component; 
 | 
import org.springframework.util.ObjectUtils; 
 | 
  
 | 
import java.util.HashMap; 
 | 
import java.util.Iterator; 
 | 
import java.util.Map; 
 | 
  
 | 
import com.alibaba.fastjson.JSON; 
 | 
import com.moral.constant.KafkaConstants; 
 | 
import com.moral.constant.RedisConstants; 
 | 
  
 | 
/* 
 | 
 * 普通设备消费者 
 | 
 * */ 
 | 
@Component 
 | 
@Slf4j 
 | 
public class DeviceConsumer { 
 | 
  
 | 
    @Autowired 
 | 
    private HistoryMinutelyService historyMinutelyService; 
 | 
  
 | 
    @Autowired 
 | 
    private HistoryHourlyService historyHourlyService; 
 | 
  
 | 
    @Autowired 
 | 
    private DeviceService deviceService; 
 | 
  
 | 
    @Autowired 
 | 
    private RedisTemplate redisTemplate; 
 | 
  
 | 
    @Autowired 
 | 
    private HistorySecondCruiserService historySecondCruiserService; 
 | 
  
 | 
    @Autowired 
 | 
    private HistorySecondUavService historySecondUavService; 
 | 
  
 | 
    //分钟数据 
 | 
    @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, containerFactory = "insertListenerContainerFactory") 
 | 
    public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { 
 | 
        String msg = record.value(); 
 | 
        try { 
 | 
            Map<String, Object> data = JSON.parseObject(msg, Map.class); 
 | 
            Object mac = data.get("mac"); 
 | 
            Object time = data.get("DataTime"); 
 | 
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { 
 | 
                log.warn("some properties is null, param{}", msg); 
 | 
                ack.acknowledge(); 
 | 
                return; 
 | 
            } 
 | 
  
 | 
            //数据过滤 
 | 
            data.remove("time"); 
 | 
            data.remove("entryTime"); 
 | 
            Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); 
 | 
            Map<String, Object> newMap = new HashMap<>(); 
 | 
            Map.Entry<String, Object> next; 
 | 
            while (iterator.hasNext()) { 
 | 
                next = iterator.next(); 
 | 
                String key = next.getKey(); 
 | 
                Object value = next.getValue(); 
 | 
                if (key.contains("-Avg")) { 
 | 
                    newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); 
 | 
                } else { 
 | 
                    newMap.put(key, value); 
 | 
                } 
 | 
                iterator.remove(); 
 | 
            } 
 | 
            //存入数据库 
 | 
            historyMinutelyService.insertHistoryMinutely(newMap); 
 | 
            ack.acknowledge(); 
 | 
        } catch (Exception e) { 
 | 
            log.error("param{}" + msg); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    //小时数据 
 | 
    @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, containerFactory = "insertListenerContainerFactory") 
 | 
    public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { 
 | 
        String msg = record.value(); 
 | 
        try { 
 | 
            Map<String, Object> data = JSON.parseObject(msg, Map.class); 
 | 
            Object mac = data.get("mac"); 
 | 
            Object time = data.get("DataTime"); 
 | 
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { 
 | 
                log.warn("some properties is null, param{}", msg); 
 | 
                ack.acknowledge(); 
 | 
                return; 
 | 
            } 
 | 
  
 | 
            //数据过滤 
 | 
            data.remove("time"); 
 | 
            data.remove("entryTime"); 
 | 
            Iterator<Map.Entry<String, Object>> iterator = data.entrySet().iterator(); 
 | 
            Map<String, Object> newMap = new HashMap<>(); 
 | 
            Map.Entry<String, Object> next; 
 | 
            while (iterator.hasNext()) { 
 | 
                next = iterator.next(); 
 | 
                String key = next.getKey(); 
 | 
                Object value = next.getValue(); 
 | 
                if (key.contains("-Avg")) { 
 | 
                    newMap.put(key.replaceAll("-Avg", ""), Double.parseDouble(value.toString())); 
 | 
                } else { 
 | 
                    newMap.put(key, value); 
 | 
                } 
 | 
                iterator.remove(); 
 | 
            } 
 | 
            //存入数据库 
 | 
            historyHourlyService.insertHistoryHourly(newMap); 
 | 
            ack.acknowledge(); 
 | 
        } catch (Exception e) { 
 | 
            log.error("param{}" + msg); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    //秒数据,修改设备状态,缓存最新秒数据 
 | 
    @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, containerFactory = "stateListenerContainerFactory") 
 | 
    public void listenSecond(ConsumerRecord<String, String> record) { 
 | 
        String msg = record.value(); 
 | 
        try { 
 | 
            Map<String, Object> data = JSON.parseObject(msg, Map.class); 
 | 
            Object mac = data.get("mac"); 
 | 
            Object time = data.get("DataTime"); 
 | 
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { 
 | 
                log.warn("some properties is null, param{}", msg); 
 | 
                return; 
 | 
            } 
 | 
            //数据过滤 
 | 
            data.remove("time"); 
 | 
            data.remove("entryTime"); 
 | 
  
 | 
            //数据校准 
 | 
            data = deviceService.adjustDeviceData(data); 
 | 
            //存入redis 
 | 
            data.put("DataTime", time); 
 | 
            redisTemplate.opsForHash().put(RedisConstants.DATA_SECOND, mac, data); 
 | 
            //判断并修改设备状态 
 | 
            data.put("mac", mac); 
 | 
            deviceService.judgeDeviceState(data); 
 | 
        } catch (Exception e) { 
 | 
            log.error("param{}" + msg); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    //无人机秒数据 
 | 
    @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") 
 | 
    public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { 
 | 
        String msg = record.value(); 
 | 
        try { 
 | 
            Map<String, Object> data = JSON.parseObject(msg, Map.class); 
 | 
            Object mac = data.get("mac"); 
 | 
            Object time = data.get("DataTime"); 
 | 
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { 
 | 
                log.warn("some properties is null, param{}", msg); 
 | 
                ack.acknowledge(); 
 | 
                return; 
 | 
            } 
 | 
  
 | 
            //数据过滤 
 | 
            data.remove("time"); 
 | 
            data.remove("entryTime"); 
 | 
  
 | 
            historySecondUavService.insertHistorySecond(data); 
 | 
            ack.acknowledge(); 
 | 
        } catch (Exception e) { 
 | 
            log.error("param{}" + msg); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    //走航车秒数据 
 | 
    @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "insertListenerContainerFactory") 
 | 
    public void listenSecondCruiser(ConsumerRecord<String, String> record, Acknowledgment ack) { 
 | 
        String msg = record.value(); 
 | 
        try { 
 | 
            Map<String, Object> data = JSON.parseObject(msg, Map.class); 
 | 
            Object mac = data.get("mac"); 
 | 
            Object time = data.get("DataTime"); 
 | 
            if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { 
 | 
                log.warn("some properties is null, param{}", msg); 
 | 
                ack.acknowledge(); 
 | 
                return; 
 | 
            } 
 | 
  
 | 
            //数据过滤 
 | 
            data.remove("time"); 
 | 
            data.remove("entryTime"); 
 | 
  
 | 
            historySecondCruiserService.insertHistorySecond(data); 
 | 
            ack.acknowledge(); 
 | 
        } catch (Exception e) { 
 | 
            log.error("param{}" + e); 
 | 
        } 
 | 
    } 
 | 
} 
 |