jinpengyong
2021-08-02 dce0021131e955bac968cb2a7e24ce3673eb1a3b
screen-api/src/main/java/com/moral/api/kafka/consumer/SecondDataConsumer.java
@@ -6,9 +6,12 @@
import com.moral.api.entity.UnitConversion;
import com.moral.api.websocket.SingleDeviceServer;
import com.moral.util.UnitConvertUtils;
import lombok.AllArgsConstructor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.stereotype.Component;
@@ -29,7 +32,7 @@
@Component
public class SecondDataConsumer implements ConsumerSeekAware {
    @KafkaListener(topics = "second_data", groupId = "SecondsDataGroup996")
    @KafkaListener(containerFactory = "secondDataListenerFactory",topics = "second_data")
    public void listen(ConsumerRecord<String, String> record, Consumer consumer) throws Exception {
        String messageStr = record.value();
        Map<String, Object> message = (Map<String, Object>) JSON.parse(messageStr);
@@ -39,7 +42,7 @@
            // Check whether the message belongs to this socket (match on MAC address)
            String messageMac = (String) message.get("mac");
            String mac = socket.getMac();
            if(!mac.equalsIgnoreCase(messageMac))
            if (!mac.equalsIgnoreCase(messageMac))
                continue;
            //取出基本信息
            Map<String, Device> devicesInfo = socket.getDevicesInfo();
@@ -70,22 +73,22 @@
                    //单位转换
                    String resultData = UnitConvertUtils.calculate(sourceData, formula);
                    if (resultData != null) {
                        resultData+=showUnit;
                    }else{//如果转换出的数据为null,则代表缓存中也没有公式,依然使用源单位。
                        resultData = sourceData+unit;
                        resultData += showUnit;
                    } else {//如果转换出的数据为null,则代表缓存中也没有公式,依然使用源单位。
                        resultData = sourceData + unit;
                    }
                    resultMessgae.put(sensor.getCode(), resultData);
                } else {
                    //拼接单位
                    sourceData += showUnit;
                    sourceData = sourceData + " " + showUnit;
                    resultMessgae.put(sensor.getCode(), sourceData);
                }
            }
            resultMessgae.put("DataTime",message.get("DataTime"));
            resultMessgae.put("time",message.get("time"));
            resultMessgae.put("time1",message.get("time1"));
            resultMessgae.put("serverTime",System.currentTimeMillis());
            resultMessgae.put("serverStartTime",serverStartTime);
            resultMessgae.put("DataTime", message.get("DataTime"));
            resultMessgae.put("time", message.get("time"));
            resultMessgae.put("time1", message.get("time1"));
            resultMessgae.put("serverTime", System.currentTimeMillis());
            resultMessgae.put("serverStartTime", serverStartTime);
            socket.sendMessage(JSON.toJSONString(resultMessgae));
        }
    }