package com.moral.api.exception.consumer;

import com.moral.api.utils.AdjustDataUtils;
import com.moral.api.utils.UnitConvertUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.moral.api.entity.Sensor;
import com.moral.api.entity.SpecialDevice;
import com.moral.api.entity.UnitConversion;
import com.moral.api.websocket.CruiserWebSocketServer;
import com.moral.constant.KafkaConstants;

/**
 * Kafka consumer for cruiser (mobile monitoring vehicle) data.
 *
 * <p>Each record consumed from {@link KafkaConstants#CRUISER_TOPIC_SECOND} is parsed as a JSON
 * object, matched by its {@code "mac"} field against the open {@link CruiserWebSocketServer}
 * sessions, formatted per sensor (value floored to two decimals plus a unit suffix) and pushed
 * to every matching session as a JSON string.
 *
 * <p>Registration as a Spring bean is currently disabled: the {@code @Component} annotation
 * below is commented out.
 */
//@Component
@Slf4j
public class CruiserDataConsumer implements ConsumerSeekAware {

    /** Number of fraction digits kept in values sent to clients. */
    private static final int DISPLAY_SCALE = 2;

    /**
     * Handles one cruiser record and forwards it to the WebSocket sessions watching that device.
     *
     * @param record the consumed Kafka record; its value is expected to be a JSON object that
     *               contains at least a {@code "mac"} entry
     * @throws Exception propagated from JSON parsing, data adjustment, unit conversion or the
     *                   WebSocket send
     */
    @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "cruiserDataListenerFactory")
    public void listenSecondSpecial(ConsumerRecord<String, String> record) throws Exception {
        String msg = record.value();
        Map<String, Object> data = JSONObject.parseObject(msg, Map.class);
        if (data == null) {
            // Nothing could be parsed from the payload, so there is nothing to forward.
            return;
        }

        // The device MAC is a property of the message, not of the socket: read it once.
        String messageMac = (String) data.get("mac");
        if (messageMac == null) {
            // Without a MAC the message cannot match any session.
            return;
        }

        CopyOnWriteArraySet<CruiserWebSocketServer> sockets = CruiserWebSocketServer.sockets;
        for (CruiserWebSocketServer socket : sockets) {
            String mac = socket.getMac();
            if (mac == null || !mac.equalsIgnoreCase(messageMac)) {
                continue;
            }

            // Basic information cached on the session.
            SpecialDevice specialDevice = socket.getSpecialDevice();
            // The type parameters of these two maps are not visible in this file; they are kept
            // raw and handed straight to AdjustDataUtils.
            Map adjustFormula = socket.getAdjustFormula();
            Map regionAqi = socket.getRegionAqi();

            // Adjust a private copy so one session's adjustment never leaks into the data used
            // for the next session subscribed to the same device.
            Map<String, Object> socketData = data;
            if (adjustFormula != null) {
                Map<String, Object> copy = new HashMap<>(data);
                socketData = AdjustDataUtils.adjust(copy, adjustFormula, regionAqi);
            }

            // Final message object.
            Map<String, Object> result = new HashMap<>();
            result.put("time", socketData.get("time"));

            // All sensors (factors) defined for the device's version.
            List<Sensor> sensors = specialDevice.getVersion().getSensors();
            for (Sensor sensor : sensors) {
                String code = sensor.getCode();
                Object rawValue = socketData.get(code);
                if (rawValue == null) {
                    continue;
                }

                // Keep two decimal places, rounding down.
                String sourceData = floorToDisplayScale(rawValue);

                String unitKey = sensor.getUnitKey();
                String showUnitKey = sensor.getShowUnitKey();
                String showUnit = sensor.getShowUnit();

                if (ObjectUtils.nullSafeEquals(unitKey, showUnitKey)) {
                    // Source unit and display unit are the same: just append the unit.
                    // NOTE(review): this branch separates value and unit with a space while the
                    // converted branches below do not; left as-is because clients may parse it.
                    result.put(code, sourceData + " " + showUnit);
                    continue;
                }

                // Units differ: convert. Prefer the formula on the sensor, otherwise look it up
                // in the conversions cached on the session (the last matching entry wins).
                String formula = sensor.getFormula();
                if (ObjectUtils.isEmpty(formula)) {
                    List<UnitConversion> unitConversions = socket.getUnitConversions();
                    if (unitConversions != null) {
                        for (UnitConversion unitConversion : unitConversions) {
                            if (ObjectUtils.nullSafeEquals(unitConversion.getOriginalUnitKey(), unitKey)
                                    && ObjectUtils.nullSafeEquals(unitConversion.getTargetUnitKey(), showUnitKey)) {
                                formula = unitConversion.getFormula();
                            }
                        }
                    }
                }

                String resultData = UnitConvertUtils.calculate(sourceData, formula);
                if (resultData != null) {
                    resultData += showUnit;
                } else {
                    // A null conversion result means no usable formula was available:
                    // fall back to the source value with the source unit.
                    resultData = sourceData + sensor.getUnit();
                }
                result.put(code, resultData);
            }

            socket.sendMessage(JSON.toJSONString(result));
        }
    }

    /**
     * Renders a raw sensor value with exactly {@value #DISPLAY_SCALE} fraction digits, rounding
     * towards negative infinity.
     *
     * <p>The value is parsed as a {@code double} first (as the original code did) and then
     * converted with {@link BigDecimal#valueOf(double)}, which uses the canonical decimal
     * representation of the double rather than its exact binary expansion.
     *
     * @param rawValue a non-null value whose string form is a valid {@code double}
     * @return the floored value in plain (non-scientific) notation
     * @throws NumberFormatException if the value is not numeric, or is NaN or infinite
     */
    private static String floorToDisplayScale(Object rawValue) {
        double parsed = Double.parseDouble(String.valueOf(rawValue));
        return BigDecimal.valueOf(parsed)
                .setScale(DISPLAY_SCALE, RoundingMode.FLOOR)
                .toPlainString();
    }

    /** No-op: the seek callback is not retained. */
    @Override
    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
    }

    /**
     * Seeks every newly assigned partition to its end.
     * NOTE(review): presumably so that only records produced after assignment are pushed to
     * clients; confirm against the listener container configuration.
     *
     * @param map                  the assigned partitions and their current offsets
     * @param consumerSeekCallback callback used to perform the seeks
     */
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
        map.forEach((t, o) -> consumerSeekCallback.seekToEnd(t.topic(), t.partition()));
    }

    /** No-op: nothing is done when the container is idle. */
    @Override
    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
    }
}