From 7607bab6e868a51609164ce111c9d5e1046cd11f Mon Sep 17 00:00:00 2001 From: jinpengyong <jpy123456> Date: Wed, 01 Sep 2021 14:44:00 +0800 Subject: [PATCH] 走航车实时websocket,小时表分表 --- screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java | 17 - screen-api/src/main/java/com/moral/api/controller/CruiserController.java | 10 screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java | 109 +++++++++++++ screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml | 3 screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java | 8 screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java | 7 screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java | 21 +- /dev/null | 46 ----- screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java | 20 +- screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java | 5 screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java | 29 +++ screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java | 4 screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml | 8 - screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java | 8 screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java | 7 screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java | 4 screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java | 86 ++++++++++ screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java | 5 screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java | 15 - screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml | 11 - 20 files changed, 287 insertions(+), 136 deletions(-) diff --git a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java index cc68807..0271ad4 100644 --- a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java +++ b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java @@ -31,6 +31,8 @@ private int concurrency; @Value("${kafka.groupId.second-data}") private String secondDataGroupId; + @Value("${kafka.groupId.cruiser-data}") + private String cruiserDataGroupId; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { @@ -54,6 +56,15 @@ return factory; } + @Bean("cruiserDataListenerFactory") + public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> cruiserDataListenerFactory(){ + ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(cruiserDataConsumerFactory());//��������������������� + factory.setConcurrency(concurrency);//��������������� + factory.getContainerProperties().setPollTimeout(1500);//������������������������������ + return factory; + } + /** * @Description: ������������������ * @Param: [] @@ -64,6 +75,16 @@ public ConsumerFactory<String,String> secondDataConsumerFactory(){ Map<String, Object> commonConfig = consumerConfigs(); Map<String, Object> secondDataConfig = secondConsumerConfigs(); + secondDataConfig.putAll(commonConfig); + return new DefaultKafkaConsumerFactory<>(secondDataConfig); + } + + /* + * ��������������������������� + * */ + public ConsumerFactory<String,String> cruiserDataConsumerFactory(){ + Map<String, Object> commonConfig = consumerConfigs(); + Map<String, Object> secondDataConfig = cruiserConsumerConfigs(); secondDataConfig.putAll(commonConfig); return new DefaultKafkaConsumerFactory<>(secondDataConfig); } @@ -81,6 +102,14 @@ return propsMap; } + /* + * ������������������������ + * */ + public Map<String,Object> cruiserConsumerConfigs(){ + Map<String, Object> propsMap = new HashMap<>(); + propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, cruiserDataGroupId); + return propsMap; + } /** * @Description: ������������ diff --git a/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java b/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java index 8e6393b..9189321 100644 --- a/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java +++ b/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java @@ -1,7 +1,7 @@ package com.moral.api.config.websocket; +import com.moral.api.websocket.CruiserWebSocketServer; import com.moral.api.websocket.SingleDeviceServer; -import com.moral.constant.RedisConstants; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -26,5 +26,7 @@ @Autowired public void setMessageService(RedisTemplate redisTemplate){ SingleDeviceServer.redisTemplate = redisTemplate; + CruiserWebSocketServer.redisTemplate = redisTemplate; } + } diff --git a/screen-api/src/main/java/com/moral/api/controller/SpecialDeviceController.java b/screen-api/src/main/java/com/moral/api/controller/CruiserController.java similarity index 93% rename from screen-api/src/main/java/com/moral/api/controller/SpecialDeviceController.java rename to screen-api/src/main/java/com/moral/api/controller/CruiserController.java index 784d432..ee0ddcd 100644 --- a/screen-api/src/main/java/com/moral/api/controller/SpecialDeviceController.java +++ b/screen-api/src/main/java/com/moral/api/controller/CruiserController.java @@ -23,11 +23,11 @@ import com.moral.util.WebUtils; @Slf4j -@Api(tags = {"������������"}) +@Api(tags = {"���������"}) @RestController @CrossOrigin(origins = "*", maxAge = 3600) -@RequestMapping("/specialDevice") -public class SpecialDeviceController { +@RequestMapping("/cruiser") +public class CruiserController { @Autowired private SpecialDeviceService specialDeviceService; @@ -35,7 +35,7 @@ /** * @return ������������������������������������ */ - @GetMapping("getCarsByOrg") + @GetMapping("selectCruisers") @ApiOperation(value = "������������������������������������������", notes = "���������������") public ResultMessage getCarsInfo() { List<Map<String, Object>> response = specialDeviceService.getCarsInfo(); @@ -46,7 +46,7 @@ * @param request ������������ * @return ������������������������������������ */ - @GetMapping("carTrajectory") + @GetMapping("cruiserTrajectory") @ApiOperation(value = "���������������", notes = "���������������") @ApiImplicitParams(value = { @ApiImplicitParam(name = "token", value = "token", required = true, paramType = "header", dataType = "String"), diff --git a/screen-api/src/main/java/com/moral/api/entity/HistoryHourly.java b/screen-api/src/main/java/com/moral/api/entity/HistoryHourly.java deleted file mode 100644 index 045fdd6..0000000 --- a/screen-api/src/main/java/com/moral/api/entity/HistoryHourly.java +++ /dev/null @@ -1,47 +0,0 @@ -package com.moral.api.entity; - -import com.baomidou.mybatisplus.extension.activerecord.Model; -import java.io.Serializable; -import java.util.Date; - -import lombok.Data; -import lombok.EqualsAndHashCode; - -/** - * <p> - * ������������������ - * </p> - * - * @author moral - * @since 2021-07-14 - */ -@Data -@EqualsAndHashCode(callSuper = false) -public class HistoryHourly extends Model<HistoryHourly> { - - private static final long serialVersionUID = 1L; - - /** - * ������mac - */ - private String mac; - - /** - * ������������ - */ - private Date time; - - /** - * ������ - */ - private String value; - - /** - * ������ - */ - private Integer version; - - - - -} diff --git a/screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java b/screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java new file mode 100644 index 0000000..609d2fb --- /dev/null +++ b/screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java @@ -0,0 +1,109 @@ +package com.moral.api.kafka.consumer; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.TopicPartition; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.listener.ConsumerSeekAware; +import org.springframework.kafka.support.Acknowledgment; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.math.BigDecimal; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.moral.api.entity.Sensor; +import com.moral.api.entity.SpecialDevice; +import com.moral.api.entity.UnitConversion; +import com.moral.api.websocket.CruiserWebSocketServer; +import com.moral.constant.KafkaConstants; +import com.moral.util.UnitConvertUtils; + +/* + * ������������������������ + * */ +@Component +@Slf4j +public class CruiserDataConsumer implements ConsumerSeekAware { + + @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "cruiserDataListenerFactory") + public void listenSecondSpecial(ConsumerRecord<String, String> record) throws Exception { + String msg = record.value(); + Map<String, Object> data = JSONObject.parseObject(msg, Map.class); + CopyOnWriteArraySet<CruiserWebSocketServer> sockets = CruiserWebSocketServer.sockets; + for (CruiserWebSocketServer socket : sockets) { + String messageMac = (String) data.get("mac"); + String mac = socket.getMac(); + if (!mac.equalsIgnoreCase(messageMac)) + continue; + //������������������ + SpecialDevice specialDevice = socket.getSpecialDevice(); + //������������������������ + Map<String, Object> result = new HashMap<>(); + result.put("time",data.get("time")); + //������������ + List<Sensor> sensors = specialDevice.getVersion().getSensors();//������������������������������ + for (Sensor sensor : sensors) { + String code = sensor.getCode(); + String showUnit = sensor.getShowUnit(); + String showUnitKey = sensor.getShowUnitKey(); + String unitKey = sensor.getUnitKey(); + String unit = sensor.getUnit(); + //������������������������������������������������ + if (data.get(code) == null) { + continue; + } + Double sourceDataD = Double.valueOf(String.valueOf(data.get(code))); + /*BigDecimal bg = new BigDecimal(sourceDataD); + bg = bg.setScale(2, BigDecimal.ROUND_FLOOR);*/ + String sourceData = String.valueOf(sourceDataD); + //������������ + //������������ + if (!unitKey.equals(showUnitKey)) {//������������������������������������������������������������ + String formula = sensor.getFormula(); + //������sensor��������������������������������������������� + if (ObjectUtils.isEmpty(formula)) { + List<UnitConversion> unitConversions = socket.getUnitConversions(); + for (UnitConversion unitConversion : unitConversions) { + if (unitConversion.getOriginalUnitKey().equals(unitKey) && unitConversion.getTargetUnitKey().equals(showUnitKey)) + formula = unitConversion.getFormula(); + } + } + //������������ + String resultData = UnitConvertUtils.calculate(sourceData, formula); + if (resultData != null) { + resultData += showUnit; + } else {//���������������������������null��������������������������������������������������������������� + resultData = sourceData + unit; + } + result.put(sensor.getCode(), resultData); + } else { + //������������ + sourceData = sourceData + " " + showUnit; + result.put(sensor.getCode(), sourceData); + } + } + socket.sendMessage(JSON.toJSONString(result)); + } + } + + @Override + public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) { + + } + + @Override + public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) { + map.forEach((t, o) -> consumerSeekCallback.seekToEnd(t.topic(), t.partition())); + } + + @Override + public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) { + + } +} diff --git a/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java b/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java index ec74607..819edd9 100644 --- a/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java +++ b/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java @@ -1,7 +1,6 @@ package com.moral.api.mapper; -import com.moral.api.entity.HistoryHourly; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import java.util.Map; /** * <p> @@ -11,6 +10,8 @@ * @author moral * @since 2021-07-14 */ -public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> { +public interface HistoryHourlyMapper{ + + String selectHourlyData(Map<String,Object> params); } diff --git a/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java b/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java index b437717..9d1b811 100644 --- a/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java +++ b/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java @@ -2,9 +2,6 @@ import java.util.Map; -import com.moral.api.entity.HistoryHourly; -import com.baomidou.mybatisplus.extension.service.IService; - /** * <p> * ������������������ ��������� @@ -13,7 +10,7 @@ * @author moral * @since 2021-07-14 */ -public interface HistoryHourlyService extends IService<HistoryHourly> { +public interface HistoryHourlyService{ //������mac������������AQI Map<String,Object> getHourlyAqiByMac(String mac); diff --git a/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java b/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java index 98bffb3..1c1b45b 100644 --- a/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java +++ b/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java @@ -86,7 +86,6 @@ queryWrapper.select("mac", "name").in("mac", macs); List<Device> devices = deviceMapper.selectList(queryWrapper); - //������������ List<String> times = (List<String>) params.remove("times"); //������code @@ -100,7 +99,8 @@ for (String start : times) { if ("hour".equals(type)) { end = DateUtils.getDateAddDay(start, 1); - timeUnits = "hourly"; + String yearAndMonth = DateUtils.dateToDateString(DateUtils.getDate(start, DateUtils.yyyy_MM_dd_EN), DateUtils.yyyyMM_EN); + timeUnits = "hourly_" + yearAndMonth; dateFormat = "%Y-%m-%d %H"; } else if ("day".equals(type)) { end = DateUtils.getDateAddMonth(start, 1); @@ -146,26 +146,26 @@ @Override public Device getDeviceByMac(String mac) { - Map<String,Object> deviceMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE,mac); + Map<String, Object> deviceMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE, mac); Device device = JSON.parseObject(JSON.toJSONString(deviceMap), Device.class); //���map������organizationId���monitorPointId������versionId - Map<String,Object> organizationMap = (Map<String,Object>)deviceMap.get("organization"); - Map<String,Object> monitorPointMap = (Map<String,Object>)deviceMap.get("monitorPoint"); - Map<String,Object> versionMap = (Map<String,Object>)deviceMap.get("version"); + Map<String, Object> organizationMap = (Map<String, Object>) deviceMap.get("organization"); + Map<String, Object> monitorPointMap = (Map<String, Object>) deviceMap.get("monitorPoint"); + Map<String, Object> versionMap = (Map<String, Object>) deviceMap.get("version"); device.setDeviceVersionId((Integer) versionMap.get("id")); device.setOrganizationId((Integer) organizationMap.get("id")); device.setMonitorPointId((Integer) monitorPointMap.get("id")); //������������������������������������ - if(ObjectUtils.isEmpty(device)){ + if (ObjectUtils.isEmpty(device)) { return getDeviceByMacFromDB(mac); } return device; } - private Device getDeviceByMacFromDB(String mac){ + private Device getDeviceByMacFromDB(String mac) { QueryWrapper<Device> wrapper = new QueryWrapper<>(); - wrapper.eq("mac",mac); - wrapper.eq("is_delete",Constants.NOT_DELETE); + wrapper.eq("mac", mac); + wrapper.eq("is_delete", Constants.NOT_DELETE); return deviceMapper.selectOne(wrapper); } diff --git a/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java b/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java index 59a1139..58b84a5 100644 --- a/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java +++ b/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java @@ -1,17 +1,15 @@ package com.moral.api.service.impl; import com.alibaba.fastjson.JSONObject; -import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.moral.api.entity.HistoryHourly; import com.moral.api.mapper.HistoryHourlyMapper; import com.moral.api.service.HistoryHourlyService; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.moral.constant.Constants; import com.moral.util.AQIUtils; import com.moral.util.DateUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; import java.util.Date; import java.util.HashMap; @@ -26,24 +24,27 @@ * @since 2021-07-14 */ @Service -public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService { +public class HistoryHourlyServiceImpl implements HistoryHourlyService { @Autowired private HistoryHourlyMapper historyHourlyMapper; @Override public Map<String, Object> getHourlyAqiByMac(String mac) { - QueryWrapper<HistoryHourly> queryWrapper = new QueryWrapper<>(); - String time = DateUtils.dateToDateString(new Date(), DateUtils.yyyy_MM_dd_HH_EN) + ":00:00"; - queryWrapper.eq("time", time).eq("mac", mac); + Date now = new Date(); + String time = DateUtils.dateToDateString(now, DateUtils.yyyy_MM_dd_HH_EN) + ":00:00"; //������������������ - HistoryHourly historyHourly = historyHourlyMapper.selectOne(queryWrapper); + Map<String, Object> params = new HashMap<>(); + params.put("timeUnits", DateUtils.dateToDateString(now, DateUtils.yyyyMM_EN)); + params.put("mac", mac); + params.put("time", time); + String value = historyHourlyMapper.selectHourlyData(params); Map<String, Object> result = new HashMap<>(); - if (historyHourly == null) { + if (ObjectUtils.isEmpty(value)) { result.put("AQI", Constants.NULL_VALUE); return result; } - Map<String, Object> data = JSONObject.parseObject(historyHourly.getValue(), Map.class); + Map<String, Object> data = JSONObject.parseObject(value, Map.class); result.put("AQI", AQIUtils.hourlyAqi(data)); return result; } diff --git a/screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java b/screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java new file mode 100644 index 0000000..fd31b01 --- /dev/null +++ b/screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java @@ -0,0 +1,86 @@ +package com.moral.api.websocket; + +import lombok.Data; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.stereotype.Component; +import org.springframework.util.ObjectUtils; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArraySet; + +import javax.websocket.OnClose; +import javax.websocket.OnError; +import javax.websocket.OnMessage; +import javax.websocket.OnOpen; +import javax.websocket.Session; +import javax.websocket.server.PathParam; +import javax.websocket.server.ServerEndpoint; + +import com.moral.api.entity.SpecialDevice; +import com.moral.api.entity.UnitConversion; +import com.moral.constant.RedisConstants; + +//���������������websocket +@ServerEndpoint("/cruiserWebsocket/{mac}") +@Component +@Data +public class CruiserWebSocketServer { + + //���������������������������������server������ + public static CopyOnWriteArraySet<CruiserWebSocketServer> sockets = new CopyOnWriteArraySet<>(); + + public static RedisTemplate redisTemplate; + + private Session session; + + private String mac; + + private SpecialDevice specialDevice; + + private Map<String, Object> regionAqi; + + private List<UnitConversion> unitConversions; + + @OnOpen + public void onOpen(Session session, @PathParam("mac") String mac) { + this.session = session; + this.mac = mac; + this.specialDevice = (SpecialDevice) redisTemplate.opsForHash().get(RedisConstants.SPECIAL_DEVICE_INFO, mac); + //���������������������������AQI������������������ + Map<String, Object> deviceInfo = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE, mac); + Map<String, Object> orgInfo = (Map<String, Object>) deviceInfo.get("organization"); + String areaCode = String.valueOf(orgInfo.get("areaCode")); + String cityCode = String.valueOf(orgInfo.get("cityCode")); + try { + this.regionAqi = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, areaCode); + if (ObjectUtils.isEmpty(this.regionAqi)) + this.regionAqi = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, cityCode); + } catch (Exception e) { + e.printStackTrace(); + } + sockets.add(this); + } + + @OnClose + public void onClose() { + sockets.remove(this); + } + + @OnMessage + public void onMessage(String message, Session session) { + System.out.println("websocket==" + message); + } + + @OnError + public void onError(Session session, Throwable error) { + } + + public void sendMessage(String message) throws Exception { + if (this.session.isOpen()) { + // synchronized (session) { + this.session.getBasicRemote().sendText(message); + // } + } + } +} diff --git a/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml b/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml index 90af3f8..f9b3ceb 100644 --- a/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml +++ b/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml @@ -2,12 +2,7 @@ <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.moral.api.mapper.HistoryHourlyMapper"> - <!-- ������������������������ --> - <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryHourly"> - <result column="mac" property="mac"/> - <result column="time" property="time"/> - <result column="value" property="value"/> - <result column="version" property="version"/> - </resultMap> - + <select id="selectHourlyData" resultType="java.lang.String"> + SELECT `value` FROM history_hourly_${timeUnits} WHERE mac = #{mac} AND `time` = #{time} + </select> </mapper> \ No newline at end of file diff --git a/screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java b/screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java deleted file mode 100644 index 9b42822..0000000 --- a/screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.moral.api.entity; - -import com.baomidou.mybatisplus.extension.activerecord.Model; -import java.io.Serializable; -import java.util.Date; - -import lombok.Data; -import lombok.EqualsAndHashCode; - -/** - * <p> - * ������������������ - * </p> - * - * @author moral - * @since 2021-06-28 - */ -@Data -@EqualsAndHashCode(callSuper = false) -public class HistoryHourly extends Model<HistoryHourly> { - - private static final long serialVersionUID = 1L; - - /** - * ������mac - */ - private String mac; - - /** - * ������������ - */ - private Date time; - - /** - * ������ - */ - private String value; - - /** - * ������ - */ - private Integer version; - - - @Override - protected Serializable pkVal() { - return null; - } - -} diff --git a/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java b/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java index 26ed4ca..f30a75d 100644 --- a/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java +++ b/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java @@ -3,8 +3,6 @@ import java.util.List; import java.util.Map; -import com.moral.api.entity.HistoryHourly; - /** * <p> * ������������������ Mapper ������ @@ -17,7 +15,7 @@ void createTable(String timeUnits); - void insertHistoryHourly(List<HistoryHourly> list); + void insertHistoryHourly(List<Map<String, Object>> list); Integer selectCountByTime(Map<String, Object> params); diff --git a/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java b/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java index 8f5f0b5..caa9bb8 100644 --- a/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java +++ b/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java @@ -2,7 +2,6 @@ import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; -import com.moral.api.entity.HistoryHourly; import com.moral.api.entity.Sensor; import com.moral.api.mapper.HistoryHourlyMapper; import com.moral.api.mapper.HistoryMinutelyMapper; @@ -116,14 +115,14 @@ .collect(Collectors.groupingBy(o -> (String) o.get("mac"))); //��������������������������� - List<HistoryHourly> insertData = new ArrayList<>(); + List<Map<String, Object>> insertData = new ArrayList<>(); data.forEach((key, value) -> { - HistoryHourly historyHourly = new HistoryHourly(); - historyHourly.setMac(key); - historyHourly.setTime(end); - Map<String, Object> jsonMap = new HashMap<>(); + Map<String, Object> historyHourly = new HashMap<>(); + historyHourly.put("mac", key); + historyHourly.put("time", end); + Map<String, Object> jsonMap = new HashMap<>(); Map<String, Object> map = new HashMap<>(); map.put("data", value); map.put("type", "hour"); @@ -211,8 +210,8 @@ } } }); - historyHourly.setValue(JSONObject.toJSONString(jsonMap)); - historyHourly.setVersion((Integer) value.get(0).get("version")); + historyHourly.put("version", value.get(0).get("version")); + historyHourly.put("value", JSONObject.toJSONString(jsonMap)); insertData.add(historyHourly); }); diff --git a/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml b/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml index bd1b0e3..6b4558a 100644 --- a/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml +++ b/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml @@ -7,12 +7,13 @@ `mac` VARCHAR (20) DEFAULT NULL COMMENT '������mac', `time` datetime DEFAULT NULL COMMENT '������������', `value` json DEFAULT NULL COMMENT '������', + `version` INT(11) DEFAULT NULL COMMENT '������', KEY `idx_mac_time` (`mac`,`time`) ) ENGINE=INNODB DEFAULT CHARSET=utf8 COMMENT '���������������' </update> <insert id="insertHistoryHourly"> - INSERT INTO history_hourly VALUES + INSERT INTO history_hourly_${timeUnits} VALUES <foreach collection="list" item="item" separator=","> (#{item.mac}, #{item.time}, #{item.value}, #{item.version}) </foreach> diff --git a/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java b/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java index 3ecc6e7..0a2af8e 100644 --- a/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java +++ b/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java @@ -36,7 +36,7 @@ @Value("${kafka.groupId.state}") private String stateGroupId; - @Bean + @Bean("insertListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> insertListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(insertConsumerFactory()); @@ -46,7 +46,7 @@ return factory; } - @Bean + @Bean("stateListenerContainerFactory") public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> stateListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(stateConsumerFactory()); @@ -71,8 +71,8 @@ } /* - * ������������ - * */ + * ������������ + * */ public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); diff --git a/screen-manage/src/main/java/com/moral/api/entity/HistoryHourly.java b/screen-manage/src/main/java/com/moral/api/entity/HistoryHourly.java deleted file mode 100644 index 6c713f3..0000000 --- a/screen-manage/src/main/java/com/moral/api/entity/HistoryHourly.java +++ /dev/null @@ -1,46 +0,0 @@ -package com.moral.api.entity; - -import com.baomidou.mybatisplus.extension.activerecord.Model; - -import java.io.Serializable; -import java.util.Date; - -import lombok.Data; -import lombok.EqualsAndHashCode; - -/** - * <p> - * ������������������ - * </p> - * - * @author moral - * @since 2021-06-04 - */ -@Data -@EqualsAndHashCode(callSuper = false) -public class HistoryHourly extends Model<HistoryHourly> { - - private static final long serialVersionUID = 1L; - - /** - * ������mac - */ - private String mac; - - /** - * ������������ - */ - private Date time; - - /** - * ������ - */ - private String value; - - /** - * ������ - */ - private Integer version; - - -} diff --git a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java similarity index 97% rename from screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java rename to screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java index 211b9f5..bc9db3d 100644 --- a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java +++ b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java @@ -22,10 +22,11 @@ import com.moral.constant.RedisConstants; /* - * ������������������ + * ��������������������� * */ +@Component @Slf4j -public class KafkaConsumer { +public class DeviceConsumer { @Autowired private HistoryMinutelyService historyMinutelyService; @@ -148,7 +149,7 @@ } //��������������������� - @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory") + @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory") public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { diff --git a/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java b/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java index 9a0cb22..b81df44 100644 --- a/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java +++ b/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java @@ -1,12 +1,6 @@ package com.moral.api.mapper; -import org.apache.ibatis.annotations.Param; - import java.util.Map; - -import com.moral.api.entity.HistoryHourly; -import com.baomidou.mybatisplus.core.mapper.BaseMapper; - /** * <p> @@ -16,7 +10,7 @@ * @author moral * @since 2021-06-04 */ -public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> { +public interface HistoryHourlyMapper { void insertHistoryHourly(Map<String, Object> params); diff --git a/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java b/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java index 8dc34b5..51dc990 100644 --- a/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java +++ b/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java @@ -2,9 +2,6 @@ import java.util.Map; -import com.moral.api.entity.HistoryHourly; -import com.baomidou.mybatisplus.extension.service.IService; - /** * <p> * ��������� ��������� @@ -13,7 +10,7 @@ * @author moral * @since 2021-06-04 */ -public interface HistoryHourlyService extends IService<HistoryHourly> { +public interface HistoryHourlyService { //������������insert void insertHistoryHourly(Map<String, Object> data); diff --git a/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java b/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java index 9d18e07..00c3443 100644 --- a/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java +++ b/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java @@ -1,11 +1,9 @@ package com.moral.api.service.impl; import com.alibaba.fastjson.JSONObject; -import com.moral.api.entity.HistoryHourly; import com.moral.api.mapper.HistoryHourlyMapper; import com.moral.api.service.DeviceService; import com.moral.api.service.HistoryHourlyService; -import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.moral.constant.Constants; import com.moral.constant.RedisConstants; import com.moral.util.DateUtils; @@ -28,7 +26,7 @@ * @since 2021-06-04 */ @Service -public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService { +public class HistoryHourlyServiceImpl implements HistoryHourlyService { @Autowired private HistoryHourlyMapper historyHourlyMapper; @@ -50,12 +48,13 @@ Map<String, Object> dataAdjust = new HashMap<>(data); Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN); + String yearAndMonth = DateUtils.dateToDateString(time, DateUtils.yyyyMM_EN); Map<String, Object> result = new HashMap<>(); result.put("mac", mac); result.put("time", time); result.put("version", version); - result.put("timeUnits", Constants.UN_ADJUST); + result.put("timeUnits", yearAndMonth + "_" + Constants.UN_ADJUST); result.put("value", JSONObject.toJSONString(data)); //������������������insert historyHourlyMapper.insertHistoryHourly(result); @@ -63,13 +62,9 @@ //������������ dataAdjust = deviceService.adjustDeviceData(dataAdjust); - HistoryHourly historyHourly = new HistoryHourly(); - historyHourly.setMac(mac); - historyHourly.setTime(time); - historyHourly.setVersion(version); - historyHourly.setValue(JSONObject.toJSONString(dataAdjust)); - //������������������insert - historyHourlyMapper.insert(historyHourly); + result.put("timeUnits", yearAndMonth); + result.put("value", JSONObject.toJSONString(dataAdjust)); + historyHourlyMapper.insertHistoryHourly(result); } } diff --git a/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml b/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml index 13bc1fe..701b196 100644 --- a/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml +++ b/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml @@ -2,14 +2,6 @@ <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <mapper namespace="com.moral.api.mapper.HistoryHourlyMapper"> - <!-- ������������������������ --> - <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryHourly"> - <result column="mac" property="mac"/> - <result column="time" property="time"/> - <result column="value" property="value"/> - <result column="version" property="version"/> - </resultMap> - <insert id="insertHistoryHourly"> INSERT INTO history_hourly_${timeUnits} VALUES (#{mac}, #{time}, #{value}, #{version}) -- Gitblit v1.8.0