2 files renamed
3 files deleted
2 files added
15 files modified
| | |
| | | private int concurrency; |
| | | @Value("${kafka.groupId.second-data}") |
| | | private String secondDataGroupId; |
| | | @Value("${kafka.groupId.cruiser-data}") |
| | | private String cruiserDataGroupId; |
| | | |
| | | @Bean |
| | | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { |
| | |
| | | return factory; |
| | | } |
| | | |
| | | @Bean("cruiserDataListenerFactory") |
| | | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> cruiserDataListenerFactory(){ |
| | | ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); |
| | | factory.setConsumerFactory(cruiserDataConsumerFactory());//设置消费者工厂 |
| | | factory.setConcurrency(concurrency);//设置线程数 |
| | | factory.getContainerProperties().setPollTimeout(1500);//设置拉取数据超时时间 |
| | | return factory; |
| | | } |
| | | |
| | | /** |
| | | * @Description: 秒级消费工厂 |
| | | * @Param: [] |
| | |
| | | public ConsumerFactory<String,String> secondDataConsumerFactory(){ |
| | | Map<String, Object> commonConfig = consumerConfigs(); |
| | | Map<String, Object> secondDataConfig = secondConsumerConfigs(); |
| | | secondDataConfig.putAll(commonConfig); |
| | | return new DefaultKafkaConsumerFactory<>(secondDataConfig); |
| | | } |
| | | |
| | | /* |
| | | * 走航车数据消费工厂 |
| | | * */ |
| | | public ConsumerFactory<String,String> cruiserDataConsumerFactory(){ |
| | | Map<String, Object> commonConfig = consumerConfigs(); |
| | | Map<String, Object> secondDataConfig = cruiserConsumerConfigs(); |
| | | secondDataConfig.putAll(commonConfig); |
| | | return new DefaultKafkaConsumerFactory<>(secondDataConfig); |
| | | } |
| | |
| | | return propsMap; |
| | | } |
| | | |
| | | /* |
| | | * 走航车消费组配置 |
| | | * */ |
| | | public Map<String,Object> cruiserConsumerConfigs(){ |
| | | Map<String, Object> propsMap = new HashMap<>(); |
| | | propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, cruiserDataGroupId); |
| | | return propsMap; |
| | | } |
| | | |
| | | /** |
| | | * @Description: 通用配置 |
| | |
| | | package com.moral.api.config.websocket; |
| | | |
| | | import com.moral.api.websocket.CruiserWebSocketServer; |
| | | import com.moral.api.websocket.SingleDeviceServer; |
| | | import com.moral.constant.RedisConstants; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.context.annotation.Configuration; |
| | |
| | | @Autowired |
| | | public void setMessageService(RedisTemplate redisTemplate){ |
| | | SingleDeviceServer.redisTemplate = redisTemplate; |
| | | CruiserWebSocketServer.redisTemplate = redisTemplate; |
| | | } |
| | | |
| | | } |
File was renamed from screen-api/src/main/java/com/moral/api/controller/SpecialDeviceController.java |
| | |
| | | import com.moral.util.WebUtils; |
| | | |
| | | @Slf4j |
| | | @Api(tags = {"特殊设备"}) |
| | | @Api(tags = {"走航车"}) |
| | | @RestController |
| | | @CrossOrigin(origins = "*", maxAge = 3600) |
| | | @RequestMapping("/specialDevice") |
| | | public class SpecialDeviceController { |
| | | @RequestMapping("/cruiser") |
| | | public class CruiserController { |
| | | |
| | | @Autowired |
| | | private SpecialDeviceService specialDeviceService; |
| | |
| | | /** |
| | | * @return 返回请求成功后的对象信息 |
| | | */ |
| | | @GetMapping("getCarsByOrg") |
| | | @GetMapping("selectCruisers") |
| | | @ApiOperation(value = "获取当前组织下所有走航车列表", notes = "走航车轨迹") |
| | | public ResultMessage getCarsInfo() { |
| | | List<Map<String, Object>> response = specialDeviceService.getCarsInfo(); |
| | |
| | | * @param request 请求信息 |
| | | * @return 返回请求成功后的对象信息 |
| | | */ |
| | | @GetMapping("carTrajectory") |
| | | @GetMapping("cruiserTrajectory") |
| | | @ApiOperation(value = "走航车轨迹", notes = "走航车轨迹") |
| | | @ApiImplicitParams(value = { |
| | | @ApiImplicitParam(name = "token", value = "token", required = true, paramType = "header", dataType = "String"), |
New file |
| | |
| | | package com.moral.api.kafka.consumer; |
| | | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.apache.kafka.clients.consumer.ConsumerRecord; |
| | | import org.apache.kafka.common.TopicPartition; |
| | | import org.springframework.kafka.annotation.KafkaListener; |
| | | import org.springframework.kafka.listener.ConsumerSeekAware; |
| | | import org.springframework.kafka.support.Acknowledgment; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.ObjectUtils; |
| | | |
| | | import java.math.BigDecimal; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.CopyOnWriteArraySet; |
| | | |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.moral.api.entity.Sensor; |
| | | import com.moral.api.entity.SpecialDevice; |
| | | import com.moral.api.entity.UnitConversion; |
| | | import com.moral.api.websocket.CruiserWebSocketServer; |
| | | import com.moral.constant.KafkaConstants; |
| | | import com.moral.util.UnitConvertUtils; |
| | | |
| | | /* |
| | | * 走航车数据消费者 |
| | | * */ |
| | | @Component |
| | | @Slf4j |
| | | public class CruiserDataConsumer implements ConsumerSeekAware { |
| | | |
| | | @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "cruiserDataListenerFactory") |
| | | public void listenSecondSpecial(ConsumerRecord<String, String> record) throws Exception { |
| | | String msg = record.value(); |
| | | Map<String, Object> data = JSONObject.parseObject(msg, Map.class); |
| | | CopyOnWriteArraySet<CruiserWebSocketServer> sockets = CruiserWebSocketServer.sockets; |
| | | for (CruiserWebSocketServer socket : sockets) { |
| | | String messageMac = (String) data.get("mac"); |
| | | String mac = socket.getMac(); |
| | | if (!mac.equalsIgnoreCase(messageMac)) |
| | | continue; |
| | | //取出基本信息 |
| | | SpecialDevice specialDevice = socket.getSpecialDevice(); |
| | | //创建最终消息对象 |
| | | Map<String, Object> result = new HashMap<>(); |
| | | result.put("time",data.get("time")); |
| | | //拼接单位 |
| | | List<Sensor> sensors = specialDevice.getVersion().getSensors();//获取型号所有因子信息 |
| | | for (Sensor sensor : sensors) { |
| | | String code = sensor.getCode(); |
| | | String showUnit = sensor.getShowUnit(); |
| | | String showUnitKey = sensor.getShowUnitKey(); |
| | | String unitKey = sensor.getUnitKey(); |
| | | String unit = sensor.getUnit(); |
| | | //对数据保留两位小数,并且向下取整 |
| | | if (data.get(code) == null) { |
| | | continue; |
| | | } |
| | | Double sourceDataD = Double.valueOf(String.valueOf(data.get(code))); |
| | | /*BigDecimal bg = new BigDecimal(sourceDataD); |
| | | bg = bg.setScale(2, BigDecimal.ROUND_FLOOR);*/ |
| | | String sourceData = String.valueOf(sourceDataD); |
| | | //数据补偿 |
| | | //单位转换 |
| | | if (!unitKey.equals(showUnitKey)) {//如果源单位和显示单位不同,则进行单位转换 |
| | | String formula = sensor.getFormula(); |
| | | //如果sensor中的公式为空则从缓存中获取公式 |
| | | if (ObjectUtils.isEmpty(formula)) { |
| | | List<UnitConversion> unitConversions = socket.getUnitConversions(); |
| | | for (UnitConversion unitConversion : unitConversions) { |
| | | if (unitConversion.getOriginalUnitKey().equals(unitKey) && unitConversion.getTargetUnitKey().equals(showUnitKey)) |
| | | formula = unitConversion.getFormula(); |
| | | } |
| | | } |
| | | //单位转换 |
| | | String resultData = UnitConvertUtils.calculate(sourceData, formula); |
| | | if (resultData != null) { |
| | | resultData += showUnit; |
| | | } else {//如果转换出的数据为null,则代表缓存中也没有公式,依然使用源单位。 |
| | | resultData = sourceData + unit; |
| | | } |
| | | result.put(sensor.getCode(), resultData); |
| | | } else { |
| | | //拼接单位 |
| | | sourceData = sourceData + " " + showUnit; |
| | | result.put(sensor.getCode(), sourceData); |
| | | } |
| | | } |
| | | socket.sendMessage(JSON.toJSONString(result)); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) { |
| | | |
| | | } |
| | | |
| | | @Override |
| | | public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) { |
| | | map.forEach((t, o) -> consumerSeekCallback.seekToEnd(t.topic(), t.partition())); |
| | | } |
| | | |
| | | @Override |
| | | public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) { |
| | | |
| | | } |
| | | } |
| | |
| | | package com.moral.api.mapper; |
| | | |
| | | import com.moral.api.entity.HistoryHourly; |
| | | import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
| | | import java.util.Map; |
| | | |
| | | /** |
| | | * <p> |
| | |
| | | * @author moral |
| | | * @since 2021-07-14 |
| | | */ |
| | | public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> { |
| | | public interface HistoryHourlyMapper{ |
| | | |
| | | String selectHourlyData(Map<String,Object> params); |
| | | |
| | | } |
| | |
| | | |
| | | import java.util.Map; |
| | | |
| | | import com.moral.api.entity.HistoryHourly; |
| | | import com.baomidou.mybatisplus.extension.service.IService; |
| | | |
| | | /** |
| | | * <p> |
| | | * 已校准小时表 服务类 |
| | |
| | | * @author moral |
| | | * @since 2021-07-14 |
| | | */ |
| | | public interface HistoryHourlyService extends IService<HistoryHourly> { |
| | | public interface HistoryHourlyService{ |
| | | |
| | | //根据mac获取小时AQI |
| | | Map<String,Object> getHourlyAqiByMac(String mac); |
| | |
| | | queryWrapper.select("mac", "name").in("mac", macs); |
| | | List<Device> devices = deviceMapper.selectList(queryWrapper); |
| | | |
| | | |
| | | //所选时间 |
| | | List<String> times = (List<String>) params.remove("times"); |
| | | //因子code |
| | |
| | | for (String start : times) { |
| | | if ("hour".equals(type)) { |
| | | end = DateUtils.getDateAddDay(start, 1); |
| | | timeUnits = "hourly"; |
| | | String yearAndMonth = DateUtils.dateToDateString(DateUtils.getDate(start, DateUtils.yyyy_MM_dd_EN), DateUtils.yyyyMM_EN); |
| | | timeUnits = "hourly_" + yearAndMonth; |
| | | dateFormat = "%Y-%m-%d %H"; |
| | | } else if ("day".equals(type)) { |
| | | end = DateUtils.getDateAddMonth(start, 1); |
| | |
| | | |
| | | @Override |
| | | public Device getDeviceByMac(String mac) { |
| | | Map<String,Object> deviceMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE,mac); |
| | | Map<String, Object> deviceMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE, mac); |
| | | Device device = JSON.parseObject(JSON.toJSONString(deviceMap), Device.class); |
| | | //从map获取organizationId和monitorPointId以及versionId |
| | | Map<String,Object> organizationMap = (Map<String,Object>)deviceMap.get("organization"); |
| | | Map<String,Object> monitorPointMap = (Map<String,Object>)deviceMap.get("monitorPoint"); |
| | | Map<String,Object> versionMap = (Map<String,Object>)deviceMap.get("version"); |
| | | Map<String, Object> organizationMap = (Map<String, Object>) deviceMap.get("organization"); |
| | | Map<String, Object> monitorPointMap = (Map<String, Object>) deviceMap.get("monitorPoint"); |
| | | Map<String, Object> versionMap = (Map<String, Object>) deviceMap.get("version"); |
| | | device.setDeviceVersionId((Integer) versionMap.get("id")); |
| | | device.setOrganizationId((Integer) organizationMap.get("id")); |
| | | device.setMonitorPointId((Integer) monitorPointMap.get("id")); |
| | | //如果缓存为空则查询数据库 |
| | | if(ObjectUtils.isEmpty(device)){ |
| | | if (ObjectUtils.isEmpty(device)) { |
| | | return getDeviceByMacFromDB(mac); |
| | | } |
| | | return device; |
| | | } |
| | | |
| | | private Device getDeviceByMacFromDB(String mac){ |
| | | private Device getDeviceByMacFromDB(String mac) { |
| | | QueryWrapper<Device> wrapper = new QueryWrapper<>(); |
| | | wrapper.eq("mac",mac); |
| | | wrapper.eq("is_delete",Constants.NOT_DELETE); |
| | | wrapper.eq("mac", mac); |
| | | wrapper.eq("is_delete", Constants.NOT_DELETE); |
| | | return deviceMapper.selectOne(wrapper); |
| | | } |
| | | |
| | |
| | | package com.moral.api.service.impl; |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
| | | import com.moral.api.entity.HistoryHourly; |
| | | import com.moral.api.mapper.HistoryHourlyMapper; |
| | | import com.moral.api.service.HistoryHourlyService; |
| | | import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
| | | import com.moral.constant.Constants; |
| | | import com.moral.util.AQIUtils; |
| | | import com.moral.util.DateUtils; |
| | | |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Service; |
| | | import org.springframework.util.ObjectUtils; |
| | | |
| | | import java.util.Date; |
| | | import java.util.HashMap; |
| | |
| | | * @since 2021-07-14 |
| | | */ |
| | | @Service |
| | | public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService { |
| | | public class HistoryHourlyServiceImpl implements HistoryHourlyService { |
| | | |
| | | @Autowired |
| | | private HistoryHourlyMapper historyHourlyMapper; |
| | | |
| | | @Override |
| | | public Map<String, Object> getHourlyAqiByMac(String mac) { |
| | | QueryWrapper<HistoryHourly> queryWrapper = new QueryWrapper<>(); |
| | | String time = DateUtils.dateToDateString(new Date(), DateUtils.yyyy_MM_dd_HH_EN) + ":00:00"; |
| | | queryWrapper.eq("time", time).eq("mac", mac); |
| | | Date now = new Date(); |
| | | String time = DateUtils.dateToDateString(now, DateUtils.yyyy_MM_dd_HH_EN) + ":00:00"; |
| | | //获取小时数据 |
| | | HistoryHourly historyHourly = historyHourlyMapper.selectOne(queryWrapper); |
| | | Map<String, Object> params = new HashMap<>(); |
| | | params.put("timeUnits", DateUtils.dateToDateString(now, DateUtils.yyyyMM_EN)); |
| | | params.put("mac", mac); |
| | | params.put("time", time); |
| | | String value = historyHourlyMapper.selectHourlyData(params); |
| | | Map<String, Object> result = new HashMap<>(); |
| | | if (historyHourly == null) { |
| | | if (ObjectUtils.isEmpty(value)) { |
| | | result.put("AQI", Constants.NULL_VALUE); |
| | | return result; |
| | | } |
| | | Map<String, Object> data = JSONObject.parseObject(historyHourly.getValue(), Map.class); |
| | | Map<String, Object> data = JSONObject.parseObject(value, Map.class); |
| | | result.put("AQI", AQIUtils.hourlyAqi(data)); |
| | | return result; |
| | | } |
New file |
| | |
| | | package com.moral.api.websocket; |
| | | |
| | | import lombok.Data; |
| | | import org.springframework.data.redis.core.RedisTemplate; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.util.ObjectUtils; |
| | | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.CopyOnWriteArraySet; |
| | | |
| | | import javax.websocket.OnClose; |
| | | import javax.websocket.OnError; |
| | | import javax.websocket.OnMessage; |
| | | import javax.websocket.OnOpen; |
| | | import javax.websocket.Session; |
| | | import javax.websocket.server.PathParam; |
| | | import javax.websocket.server.ServerEndpoint; |
| | | |
| | | import com.moral.api.entity.SpecialDevice; |
| | | import com.moral.api.entity.UnitConversion; |
| | | import com.moral.constant.RedisConstants; |
| | | |
| | | //走航车实时websocket |
| | | @ServerEndpoint("/cruiserWebsocket/{mac}") |
| | | @Component |
| | | @Data |
| | | public class CruiserWebSocketServer { |
| | | |
| | | //线程安全集合,用于存放server对象 |
| | | public static CopyOnWriteArraySet<CruiserWebSocketServer> sockets = new CopyOnWriteArraySet<>(); |
| | | |
| | | public static RedisTemplate redisTemplate; |
| | | |
| | | private Session session; |
| | | |
| | | private String mac; |
| | | |
| | | private SpecialDevice specialDevice; |
| | | |
| | | private Map<String, Object> regionAqi; |
| | | |
| | | private List<UnitConversion> unitConversions; |
| | | |
| | | @OnOpen |
| | | public void onOpen(Session session, @PathParam("mac") String mac) { |
| | | this.session = session; |
| | | this.mac = mac; |
| | | this.specialDevice = (SpecialDevice) redisTemplate.opsForHash().get(RedisConstants.SPECIAL_DEVICE_INFO, mac); |
| | | //获取设备地区对应的AQI用于补偿使用 |
| | | Map<String, Object> deviceInfo = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE, mac); |
| | | Map<String, Object> orgInfo = (Map<String, Object>) deviceInfo.get("organization"); |
| | | String areaCode = String.valueOf(orgInfo.get("areaCode")); |
| | | String cityCode = String.valueOf(orgInfo.get("cityCode")); |
| | | try { |
| | | this.regionAqi = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, areaCode); |
| | | if (ObjectUtils.isEmpty(this.regionAqi)) |
| | | this.regionAqi = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, cityCode); |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | } |
| | | sockets.add(this); |
| | | } |
| | | |
| | | @OnClose |
| | | public void onClose() { |
| | | sockets.remove(this); |
| | | } |
| | | |
| | | @OnMessage |
| | | public void onMessage(String message, Session session) { |
| | | System.out.println("websocket==" + message); |
| | | } |
| | | |
| | | @OnError |
| | | public void onError(Session session, Throwable error) { |
| | | } |
| | | |
| | | public void sendMessage(String message) throws Exception { |
| | | if (this.session.isOpen()) { |
| | | // synchronized (session) { |
| | | this.session.getBasicRemote().sendText(message); |
| | | // } |
| | | } |
| | | } |
| | | } |
| | |
| | | <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> |
| | | <mapper namespace="com.moral.api.mapper.HistoryHourlyMapper"> |
| | | |
| | | <!-- 通用查询映射结果 --> |
| | | <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryHourly"> |
| | | <result column="mac" property="mac"/> |
| | | <result column="time" property="time"/> |
| | | <result column="value" property="value"/> |
| | | <result column="version" property="version"/> |
| | | </resultMap> |
| | | |
| | | <select id="selectHourlyData" resultType="java.lang.String"> |
| | | SELECT `value` FROM history_hourly_${timeUnits} WHERE mac = #{mac} AND `time` = #{time} |
| | | </select> |
| | | </mapper> |
| | |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | |
| | | import com.moral.api.entity.HistoryHourly; |
| | | |
| | | /** |
| | | * <p> |
| | | * 已校准小时表 Mapper 接口 |
| | |
| | | |
| | | void createTable(String timeUnits); |
| | | |
| | | void insertHistoryHourly(List<HistoryHourly> list); |
| | | void insertHistoryHourly(List<Map<String, Object>> list); |
| | | |
| | | Integer selectCountByTime(Map<String, Object> params); |
| | | |
| | |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
| | | import com.moral.api.entity.HistoryHourly; |
| | | import com.moral.api.entity.Sensor; |
| | | import com.moral.api.mapper.HistoryHourlyMapper; |
| | | import com.moral.api.mapper.HistoryMinutelyMapper; |
| | |
| | | .collect(Collectors.groupingBy(o -> (String) o.get("mac"))); |
| | | |
| | | //存入数据库的结果集 |
| | | List<HistoryHourly> insertData = new ArrayList<>(); |
| | | List<Map<String, Object>> insertData = new ArrayList<>(); |
| | | |
| | | data.forEach((key, value) -> { |
| | | HistoryHourly historyHourly = new HistoryHourly(); |
| | | historyHourly.setMac(key); |
| | | historyHourly.setTime(end); |
| | | Map<String, Object> jsonMap = new HashMap<>(); |
| | | Map<String, Object> historyHourly = new HashMap<>(); |
| | | historyHourly.put("mac", key); |
| | | historyHourly.put("time", end); |
| | | |
| | | Map<String, Object> jsonMap = new HashMap<>(); |
| | | Map<String, Object> map = new HashMap<>(); |
| | | map.put("data", value); |
| | | map.put("type", "hour"); |
| | |
| | | } |
| | | } |
| | | }); |
| | | historyHourly.setValue(JSONObject.toJSONString(jsonMap)); |
| | | historyHourly.setVersion((Integer) value.get(0).get("version")); |
| | | historyHourly.put("version", value.get(0).get("version")); |
| | | historyHourly.put("value", JSONObject.toJSONString(jsonMap)); |
| | | insertData.add(historyHourly); |
| | | }); |
| | | |
| | |
| | | `mac` VARCHAR (20) DEFAULT NULL COMMENT '设备mac', |
| | | `time` datetime DEFAULT NULL COMMENT '数据时间', |
| | | `value` json DEFAULT NULL COMMENT '数据', |
| | | `version` INT(11) DEFAULT NULL COMMENT '型号', |
| | | KEY `idx_mac_time` (`mac`,`time`) |
| | | ) ENGINE=INNODB DEFAULT CHARSET=utf8 COMMENT '小时数据表' |
| | | </update> |
| | | |
| | | <insert id="insertHistoryHourly"> |
| | | INSERT INTO history_hourly VALUES |
| | | INSERT INTO history_hourly_${timeUnits} VALUES |
| | | <foreach collection="list" item="item" separator=","> |
| | | (#{item.mac}, #{item.time}, #{item.value}, #{item.version}) |
| | | </foreach> |
| | |
| | | @Value("${kafka.groupId.state}") |
| | | private String stateGroupId; |
| | | |
| | | @Bean |
| | | @Bean("insertListenerContainerFactory") |
| | | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> insertListenerContainerFactory() { |
| | | ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); |
| | | factory.setConsumerFactory(insertConsumerFactory()); |
| | |
| | | return factory; |
| | | } |
| | | |
| | | @Bean |
| | | @Bean("stateListenerContainerFactory") |
| | | public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> stateListenerContainerFactory() { |
| | | ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); |
| | | factory.setConsumerFactory(stateConsumerFactory()); |
| | |
| | | } |
| | | |
| | | /* |
| | | * 通用配置 |
| | | * */ |
| | | * 通用配置 |
| | | * */ |
| | | public Map<String, Object> consumerConfigs() { |
| | | Map<String, Object> propsMap = new HashMap<>(); |
| | | propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); |
File was renamed from screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java |
| | |
| | | import com.moral.constant.RedisConstants; |
| | | |
| | | /* |
| | | * 设备数据接入 |
| | | * 普通设备消费者 |
| | | * */ |
| | | @Component |
| | | @Slf4j |
| | | public class KafkaConsumer { |
| | | public class DeviceConsumer { |
| | | |
| | | @Autowired |
| | | private HistoryMinutelyService historyMinutelyService; |
| | |
| | | } |
| | | |
| | | //特殊设备秒数据 |
| | | @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory") |
| | | @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") |
| | | public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { |
| | | String msg = record.value(); |
| | | try { |
| | |
| | | package com.moral.api.mapper; |
| | | |
| | | import org.apache.ibatis.annotations.Param; |
| | | |
| | | import java.util.Map; |
| | | |
| | | import com.moral.api.entity.HistoryHourly; |
| | | import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
| | | |
| | | |
| | | /** |
| | | * <p> |
| | |
| | | * @author moral |
| | | * @since 2021-06-04 |
| | | */ |
| | | public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> { |
| | | public interface HistoryHourlyMapper { |
| | | |
| | | void insertHistoryHourly(Map<String, Object> params); |
| | | |
| | |
| | | |
| | | import java.util.Map; |
| | | |
| | | import com.moral.api.entity.HistoryHourly; |
| | | import com.baomidou.mybatisplus.extension.service.IService; |
| | | |
| | | /** |
| | | * <p> |
| | | * 小时表 服务类 |
| | |
| | | * @author moral |
| | | * @since 2021-06-04 |
| | | */ |
| | | public interface HistoryHourlyService extends IService<HistoryHourly> { |
| | | public interface HistoryHourlyService { |
| | | |
| | | //小时数据insert |
| | | void insertHistoryHourly(Map<String, Object> data); |
| | |
| | | package com.moral.api.service.impl; |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.moral.api.entity.HistoryHourly; |
| | | import com.moral.api.mapper.HistoryHourlyMapper; |
| | | import com.moral.api.service.DeviceService; |
| | | import com.moral.api.service.HistoryHourlyService; |
| | | import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; |
| | | import com.moral.constant.Constants; |
| | | import com.moral.constant.RedisConstants; |
| | | import com.moral.util.DateUtils; |
| | |
| | | * @since 2021-06-04 |
| | | */ |
| | | @Service |
| | | public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService { |
| | | public class HistoryHourlyServiceImpl implements HistoryHourlyService { |
| | | |
| | | @Autowired |
| | | private HistoryHourlyMapper historyHourlyMapper; |
| | |
| | | |
| | | Map<String, Object> dataAdjust = new HashMap<>(data); |
| | | Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN); |
| | | String yearAndMonth = DateUtils.dateToDateString(time, DateUtils.yyyyMM_EN); |
| | | |
| | | Map<String, Object> result = new HashMap<>(); |
| | | result.put("mac", mac); |
| | | result.put("time", time); |
| | | result.put("version", version); |
| | | result.put("timeUnits", Constants.UN_ADJUST); |
| | | result.put("timeUnits", yearAndMonth + "_" + Constants.UN_ADJUST); |
| | | result.put("value", JSONObject.toJSONString(data)); |
| | | //未校准小时表insert |
| | | historyHourlyMapper.insertHistoryHourly(result); |
| | |
| | | //数据校准 |
| | | dataAdjust = deviceService.adjustDeviceData(dataAdjust); |
| | | |
| | | HistoryHourly historyHourly = new HistoryHourly(); |
| | | historyHourly.setMac(mac); |
| | | historyHourly.setTime(time); |
| | | historyHourly.setVersion(version); |
| | | historyHourly.setValue(JSONObject.toJSONString(dataAdjust)); |
| | | |
| | | //校准后小时表insert |
| | | historyHourlyMapper.insert(historyHourly); |
| | | result.put("timeUnits", yearAndMonth); |
| | | result.put("value", JSONObject.toJSONString(dataAdjust)); |
| | | historyHourlyMapper.insertHistoryHourly(result); |
| | | } |
| | | } |
| | |
| | | <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> |
| | | <mapper namespace="com.moral.api.mapper.HistoryHourlyMapper"> |
| | | |
| | | <!-- 通用查询映射结果 --> |
| | | <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryHourly"> |
| | | <result column="mac" property="mac"/> |
| | | <result column="time" property="time"/> |
| | | <result column="value" property="value"/> |
| | | <result column="version" property="version"/> |
| | | </resultMap> |
| | | |
| | | <insert id="insertHistoryHourly"> |
| | | INSERT INTO history_hourly_${timeUnits} |
| | | VALUES (#{mac}, #{time}, #{value}, #{version}) |