package com.moral.api.kafka.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.util.HashMap; import java.util.Map; import com.alibaba.fastjson.JSON; import com.moral.api.service.DeviceService; import com.moral.api.service.HistoryHourlyService; import com.moral.api.service.HistoryMinutelyService; import com.moral.constant.KafkaConstants; /*@Slf4j @Component public class KafkaReceiver { @Autowired private HistoryMinutelyService historyMinutelyService; @Autowired private HistoryHourlyService historyHourlyService; @Autowired private DeviceService deviceService; //分钟数据 @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID, containerFactory = "kafkaListenerContainerFactory") public void listenMinute(ConsumerRecord record, Acknowledgment ack) { String msg = record.value(); try { Map data = JSON.parseObject(msg, HashMap.class); System.out.println(data); Object mac = data.get("mac"); Object time = data.get("DataTime"); Object ver = data.get("ver"); if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) { log.warn("some properties is null, param[0] message:" + msg); return; } Map deviceInfo = deviceService.getDeviceByMac(mac.toString()); if (deviceInfo == null) { String deviceRealState = "null or deleted"; log.warn("device record is " + deviceRealState + ", param[0] message:" + msg); return; } //清除毫秒,四舍五入 data.put("DataTime", Math.round(new Double((Long) time) / 1000) * 1000); //存入数据库 historyMinutelyService.insertHistoryMinutely(data); ack.acknowledge(); } catch (Exception e) { log.error("param[0] message:" + msg); } } //小时数据 @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_ID, containerFactory = "kafkaListenerContainerFactory") public void listenHour(ConsumerRecord record, Acknowledgment ack) { String msg = record.value(); try { Map data = JSON.parseObject(msg, HashMap.class); System.out.println(data); Object mac = data.get("mac"); Object time = data.get("DataTime"); Object ver = data.get("ver"); if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) { log.warn("some properties is null, param[0] message:" + msg); return; } Map deviceInfo = deviceService.getDeviceByMac(mac.toString()); if (deviceInfo == null) { String deviceRealState = "null or deleted"; log.warn("device record is " + deviceRealState + ", param[0] message:" + msg); return; } //清除毫秒,四舍五入 data.put("DataTime", Math.round(new Double((Long) time) / 1000) * 1000); //存入数据库 historyHourlyService.insertHistoryHourly(data); ack.acknowledge(); } catch (Exception e) { log.error("param[0] message:" + msg); } } }*/