| | |
| | | import com.moral.api.service.DeviceService; |
| | | import com.moral.api.service.HistoryHourlyService; |
| | | import com.moral.api.service.HistoryMinutelyService; |
| | | import com.moral.api.util.AdjustDataUtils; |
| | | import com.moral.constant.KafkaConstants; |
| | | import com.moral.constant.RedisConstants; |
| | | |
| | | //@Component |
| | | @Component |
| | | @Slf4j |
| | | public class KafkaConsumer { |
| | | |
| | |
| | | |
| | | @Autowired |
| | | private DeviceService deviceService; |
| | | |
| | | @Autowired |
| | | private AdjustDataUtils adjustDataUtils; |
| | | |
| | | @Autowired |
| | | private RedisTemplate redisTemplate; |
| | |
| | | return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); |
| | | }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); |
| | | data.remove("time"); |
| | | data.remove("entryTime"); |
| | | //存入数据库 |
| | | historyMinutelyService.insertHistoryMinutely(data); |
| | | ack.acknowledge(); |
| | |
| | | return !(key.contains("Min") || key.contains("Max") || key.contains("Cou")); |
| | | }).collect(Collectors.toMap(m -> m.getKey().replaceAll("-Avg", ""), Map.Entry::getValue)); |
| | | data.remove("time"); |
| | | data.remove("entryTime"); |
| | | //存入数据库 |
| | | historyHourlyService.insertHistoryHourly(data); |
| | | ack.acknowledge(); |
| | |
| | | return; |
| | | } |
| | | //数据校准 |
| | | data = adjustDataUtils.adjust(data); |
| | | data = deviceService.adjustDeviceData(data); |
| | | //存入redis |
| | | redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + "_" + mac, data); |
| | | redisTemplate.opsForValue().set(RedisConstants.DEVICE_DATA + mac, data); |
| | | //判断并修改设备状态 |
| | | deviceService.judgeDeviceState(data); |
| | | ack.acknowledge(); |