| | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.kafka.clients.consumer.ConsumerRecord; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Value; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | | import org.springframework.kafka.annotation.KafkaListener; |
| | | import org.springframework.kafka.support.Acknowledgment; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.ObjectUtils; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.Map; |
| | | |
| | | import java.util.*; |
| | | |
| | | |
| | | @Component |
| | | @Slf4j |
| | | public class DeviceConsumer { |
| | | |
| | | @Value("${result.device.list}") |
| | | private String deviceList; |
| | | @Autowired |
| | | private HistoryMinutelyService historyMinutelyService; |
| | | |
| | |
| | | public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) { |
| | | String msg = record.value(); |
| | | try { |
| | | |
| | | Map<String, Object> data = JSON.parseObject(msg, Map.class); |
| | | Object mac = data.get("mac"); |
| | | Object time = data.get("DataTime"); |
| | | if (ObjectUtils.isEmpty(time) || ObjectUtils.isEmpty(mac)) { |
| | | log.warn("some properties is null, param{}", msg); |
| | | ack.acknowledge(); |
| | | return; |
| | | } |
| | | List<String> listDictDevice = Arrays.asList(deviceList.split(",")); |
| | | if(!listDictDevice.contains(mac.toString())){ |
| | | ack.acknowledge(); |
| | | } |
| | | //数据过滤 |
| | | data.remove("time"); |
| | |
| | | ack.acknowledge(); |
| | | } catch (Exception e) { |
| | | log.error("param{}" + msg); |
| | | ack.acknowledge(); |
| | | } |
| | | } |
| | | |