| | |
| | | @Slf4j |
| | | public class SecondDataConsumer implements ConsumerSeekAware { |
| | | |
| | | @KafkaListener(containerFactory = "secondDataListenerFactory",topics = "second_data") |
| | | @KafkaListener(containerFactory = "secondDataListenerFactory", topics = "second_data") |
| | | public void listen(ConsumerRecord<String, String> record, Consumer consumer) throws Exception { |
| | | String messageStr = record.value(); |
| | | Map<String, Object> message = (Map<String, Object>) JSON.parse(messageStr); |
| | |
| | | String unitKey = sensor.getUnitKey(); |
| | | String unit = sensor.getUnit(); |
| | | //如果消息中没有该因子则退出循环 |
| | | if(message.get(code)==null) |
| | | continue; |
| | | Object value = message.get(code); |
| | | //对数据保留两位小数,并且向下取整 |
| | | Double sourceDataD = Double.valueOf(String.valueOf(message.get(code))); |
| | | /*if(value==null) 源代码 |
| | | continue; |
| | | Double sourceDataD = Double.valueOf(String.valueOf(value));*/ |
| | | //测试代码使用,给臭气一个固定值 start |
| | | Double sourceDataD = null; |
| | | if (value != null) { |
| | | sourceDataD = Double.valueOf(String.valueOf(value)); |
| | | }else{ |
| | | sourceDataD = 5.00d; |
| | | } |
| | | //测试代码使用,给臭气一个固定值 end |
| | | BigDecimal bg = new BigDecimal(sourceDataD); |
| | | bg = bg.setScale(2,BigDecimal.ROUND_FLOOR); |
| | | bg = bg.setScale(2, BigDecimal.ROUND_FLOOR); |
| | | String sourceData = bg.toString(); |
| | | //数据补偿 |
| | | //单位转换 |
| | |
| | | resultMessgae.put(sensor.getCode(), resultData); |
| | | } else { |
| | | //拼接单位 |
| | | sourceData = sourceData + " " + showUnit; |
| | | if(!showUnit.equals("无单位")) |
| | | sourceData = sourceData + " " + showUnit; |
| | | resultMessgae.put(sensor.getCode(), sourceData); |
| | | } |
| | | } |