| | |
| | | import com.moral.api.service.DeviceService; |
| | | import com.moral.api.service.HistoryHourlyService; |
| | | import com.moral.api.service.HistoryMinutelyService; |
| | | import com.moral.api.util.AdjustDataUtils; |
| | | import com.moral.constant.KafkaConstants; |
| | | import com.moral.constant.RedisConstants; |
| | | |
| | |
| | | private DeviceService deviceService; |
| | | |
| | | @Autowired |
| | | private AdjustDataUtils adjustDataUtils; |
| | | |
| | | @Autowired |
| | | private RedisTemplate redisTemplate; |
| | | |
| | | //分钟数据 |
| | | @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID_INSERT, containerFactory = "kafkaListenerContainerFactory") |
| | | public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { |
| | | String msg = record.value(); |
| | | System.out.println(msg); |
| | | try { |
| | | Map<String, Object> data = JSON.parseObject(msg, HashMap.class); |
| | | Object mac = data.get("mac"); |
| | |
| | | @KafkaListener(topics = KafkaConstants.TOPIC_SECOND, groupId = KafkaConstants.GROUP_ID_STATE, containerFactory = "kafkaListenerContainerFactory") |
| | | public void listenSecond(ConsumerRecord<String, String> record, Acknowledgment ack) { |
| | | String msg = record.value(); |
| | | //System.out.println(record.offset() + "===>" + msg); |
| | | try { |
| | | Map<String, Object> data = JSON.parseObject(msg, HashMap.class); |
| | | Object mac = data.get("mac"); |
| | |
| | | return; |
| | | } |
| | | //数据校准 |
| | | data = adjustDataUtils.adjust(data); |
| | | data = deviceService.adjustDeviceData(data); |
| | | //存入redis |
| | | redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + "_" + mac, data); |
| | | redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + mac, data); |
| | | //判断并修改设备状态 |
| | | deviceService.judgeDeviceState(data); |
| | | ack.acknowledge(); |