From 7607bab6e868a51609164ce111c9d5e1046cd11f Mon Sep 17 00:00:00 2001
From: jinpengyong <jpy123456>
Date: Wed, 01 Sep 2021 14:44:00 +0800
Subject: [PATCH] 走航车实时websocket,小时表分表

---
 screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java |   17 -
 screen-api/src/main/java/com/moral/api/controller/CruiserController.java             |   10 
 screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java       |  109 +++++++++++++
 screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml                         |    3 
 screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java      |    8 
 screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java         |    7 
 screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java    |   21 +-
 /dev/null                                                                            |   46 -----
 screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java           |   20 +-
 screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java          |    5 
 screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java         |   29 +++
 screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java         |    4 
 screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml                      |    8 -
 screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java            |    8 
 screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java               |    7 
 screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java               |    4 
 screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java         |   86 ++++++++++
 screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java             |    5 
 screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java    |   15 -
 screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml                         |   11 -
 20 files changed, 287 insertions(+), 136 deletions(-)

diff --git a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
index cc68807..0271ad4 100644
--- a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
+++ b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -31,6 +31,8 @@
     private int concurrency;
     @Value("${kafka.groupId.second-data}")
     private String secondDataGroupId;
+    @Value("${kafka.groupId.cruiser-data}")
+    private String cruiserDataGroupId;
 
     @Bean
     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
@@ -54,6 +56,15 @@
         return factory;
     }
 
+    @Bean("cruiserDataListenerFactory")
+    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> cruiserDataListenerFactory(){
+        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(cruiserDataConsumerFactory());//���������������������
+        factory.setConcurrency(concurrency);//���������������
+        factory.getContainerProperties().setPollTimeout(1500);//������������������������������
+        return factory;
+    }
+
     /**
     * @Description: ������������������
             * @Param: []
@@ -64,6 +75,16 @@
     public ConsumerFactory<String,String> secondDataConsumerFactory(){
         Map<String, Object> commonConfig = consumerConfigs();
         Map<String, Object> secondDataConfig = secondConsumerConfigs();
+        secondDataConfig.putAll(commonConfig);
+        return new DefaultKafkaConsumerFactory<>(secondDataConfig);
+    }
+
+    /*
+    * ���������������������������
+    * */
+    public ConsumerFactory<String,String> cruiserDataConsumerFactory(){
+        Map<String, Object> commonConfig = consumerConfigs();
+        Map<String, Object> secondDataConfig = cruiserConsumerConfigs();
         secondDataConfig.putAll(commonConfig);
         return new DefaultKafkaConsumerFactory<>(secondDataConfig);
     }
@@ -81,6 +102,14 @@
         return propsMap;
     }
 
+    /*
+    * ������������������������
+    * */
+    public Map<String,Object> cruiserConsumerConfigs(){
+        Map<String, Object> propsMap = new HashMap<>();
+        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, cruiserDataGroupId);
+        return propsMap;
+    }
 
     /**
     * @Description: ������������
diff --git a/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java b/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
index 8e6393b..9189321 100644
--- a/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
+++ b/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
@@ -1,7 +1,7 @@
 package com.moral.api.config.websocket;
 
+import com.moral.api.websocket.CruiserWebSocketServer;
 import com.moral.api.websocket.SingleDeviceServer;
-import com.moral.constant.RedisConstants;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -26,5 +26,7 @@
     @Autowired
     public void setMessageService(RedisTemplate redisTemplate){
         SingleDeviceServer.redisTemplate = redisTemplate;
+        CruiserWebSocketServer.redisTemplate = redisTemplate;
     }
+
 }
diff --git a/screen-api/src/main/java/com/moral/api/controller/SpecialDeviceController.java b/screen-api/src/main/java/com/moral/api/controller/CruiserController.java
similarity index 93%
rename from screen-api/src/main/java/com/moral/api/controller/SpecialDeviceController.java
rename to screen-api/src/main/java/com/moral/api/controller/CruiserController.java
index 784d432..ee0ddcd 100644
--- a/screen-api/src/main/java/com/moral/api/controller/SpecialDeviceController.java
+++ b/screen-api/src/main/java/com/moral/api/controller/CruiserController.java
@@ -23,11 +23,11 @@
 import com.moral.util.WebUtils;
 
 @Slf4j
-@Api(tags = {"������������"})
+@Api(tags = {"���������"})
 @RestController
 @CrossOrigin(origins = "*", maxAge = 3600)
-@RequestMapping("/specialDevice")
-public class SpecialDeviceController {
+@RequestMapping("/cruiser")
+public class CruiserController {
 
     @Autowired
     private SpecialDeviceService specialDeviceService;
@@ -35,7 +35,7 @@
     /**
      * @return ������������������������������������
      */
-    @GetMapping("getCarsByOrg")
+    @GetMapping("selectCruisers")
     @ApiOperation(value = "������������������������������������������", notes = "���������������")
     public ResultMessage getCarsInfo() {
         List<Map<String, Object>> response = specialDeviceService.getCarsInfo();
@@ -46,7 +46,7 @@
      * @param request ������������
      * @return ������������������������������������
      */
-    @GetMapping("carTrajectory")
+    @GetMapping("cruiserTrajectory")
     @ApiOperation(value = "���������������", notes = "���������������")
     @ApiImplicitParams(value = {
             @ApiImplicitParam(name = "token", value = "token", required = true, paramType = "header", dataType = "String"),
diff --git a/screen-api/src/main/java/com/moral/api/entity/HistoryHourly.java b/screen-api/src/main/java/com/moral/api/entity/HistoryHourly.java
deleted file mode 100644
index 045fdd6..0000000
--- a/screen-api/src/main/java/com/moral/api/entity/HistoryHourly.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.moral.api.entity;
-
-import com.baomidou.mybatisplus.extension.activerecord.Model;
-import java.io.Serializable;
-import java.util.Date;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-/**
- * <p>
- * ������������������
- * </p>
- *
- * @author moral
- * @since 2021-07-14
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-public class HistoryHourly extends Model<HistoryHourly> {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * ������mac
-     */
-    private String mac;
-
-    /**
-     * ������������
-     */
-    private Date time;
-
-    /**
-     * ������
-     */
-    private String value;
-
-    /**
-     * ������
-     */
-    private Integer version;
-
-
-
-
-}
diff --git a/screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java b/screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java
new file mode 100644
index 0000000..609d2fb
--- /dev/null
+++ b/screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java
@@ -0,0 +1,109 @@
+package com.moral.api.kafka.consumer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.ConsumerSeekAware;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.moral.api.entity.Sensor;
+import com.moral.api.entity.SpecialDevice;
+import com.moral.api.entity.UnitConversion;
+import com.moral.api.websocket.CruiserWebSocketServer;
+import com.moral.constant.KafkaConstants;
+import com.moral.util.UnitConvertUtils;
+
+/*
+ * ������������������������
+ * */
+@Component
+@Slf4j
+public class CruiserDataConsumer implements ConsumerSeekAware {
+
+    @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "cruiserDataListenerFactory")
+    public void listenSecondSpecial(ConsumerRecord<String, String> record) throws Exception {
+        String msg = record.value();
+        Map<String, Object> data = JSONObject.parseObject(msg, Map.class);
+        CopyOnWriteArraySet<CruiserWebSocketServer> sockets = CruiserWebSocketServer.sockets;
+        for (CruiserWebSocketServer socket : sockets) {
+            String messageMac = (String) data.get("mac");
+            String mac = socket.getMac();
+            if (!mac.equalsIgnoreCase(messageMac))
+                continue;
+            //������������������
+            SpecialDevice specialDevice = socket.getSpecialDevice();
+            //������������������������
+            Map<String, Object> result = new HashMap<>();
+            result.put("time",data.get("time"));
+            //������������
+            List<Sensor> sensors = specialDevice.getVersion().getSensors();//������������������������������
+            for (Sensor sensor : sensors) {
+                String code = sensor.getCode();
+                String showUnit = sensor.getShowUnit();
+                String showUnitKey = sensor.getShowUnitKey();
+                String unitKey = sensor.getUnitKey();
+                String unit = sensor.getUnit();
+                //������������������������������������������������
+                if (data.get(code) == null) {
+                    continue;
+                }
+                Double sourceDataD = Double.valueOf(String.valueOf(data.get(code)));
+                /*BigDecimal bg = new BigDecimal(sourceDataD);
+                bg = bg.setScale(2, BigDecimal.ROUND_FLOOR);*/
+                String sourceData = String.valueOf(sourceDataD);
+                //������������
+                //������������
+                if (!unitKey.equals(showUnitKey)) {//������������������������������������������������������������
+                    String formula = sensor.getFormula();
+                    //������sensor���������������������������������������������
+                    if (ObjectUtils.isEmpty(formula)) {
+                        List<UnitConversion> unitConversions = socket.getUnitConversions();
+                        for (UnitConversion unitConversion : unitConversions) {
+                            if (unitConversion.getOriginalUnitKey().equals(unitKey) && unitConversion.getTargetUnitKey().equals(showUnitKey))
+                                formula = unitConversion.getFormula();
+                        }
+                    }
+                    //������������
+                    String resultData = UnitConvertUtils.calculate(sourceData, formula);
+                    if (resultData != null) {
+                        resultData += showUnit;
+                    } else {//���������������������������null���������������������������������������������������������������
+                        resultData = sourceData + unit;
+                    }
+                    result.put(sensor.getCode(), resultData);
+                } else {
+                    //������������
+                    sourceData = sourceData + " " + showUnit;
+                    result.put(sensor.getCode(), sourceData);
+                }
+            }
+            socket.sendMessage(JSON.toJSONString(result));
+        }
+    }
+
+    @Override
+    public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
+
+    }
+
+    @Override
+    public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
+        map.forEach((t, o) -> consumerSeekCallback.seekToEnd(t.topic(), t.partition()));
+    }
+
+    @Override
+    public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
+
+    }
+}
diff --git a/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java b/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
index ec74607..819edd9 100644
--- a/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
+++ b/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -1,7 +1,6 @@
 package com.moral.api.mapper;
 
-import com.moral.api.entity.HistoryHourly;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import java.util.Map;
 
 /**
  * <p>
@@ -11,6 +10,8 @@
  * @author moral
  * @since 2021-07-14
  */
-public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> {
+public interface HistoryHourlyMapper{
+
+    String selectHourlyData(Map<String,Object> params);
 
 }
diff --git a/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java b/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java
index b437717..9d1b811 100644
--- a/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java
+++ b/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java
@@ -2,9 +2,6 @@
 
 import java.util.Map;
 
-import com.moral.api.entity.HistoryHourly;
-import com.baomidou.mybatisplus.extension.service.IService;
-
 /**
  * <p>
  * ������������������ ���������
@@ -13,7 +10,7 @@
  * @author moral
  * @since 2021-07-14
  */
-public interface HistoryHourlyService extends IService<HistoryHourly> {
+public interface HistoryHourlyService{
 
     //������mac������������AQI
     Map<String,Object> getHourlyAqiByMac(String mac);
diff --git a/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java b/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
index 98bffb3..1c1b45b 100644
--- a/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
+++ b/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
@@ -86,7 +86,6 @@
         queryWrapper.select("mac", "name").in("mac", macs);
         List<Device> devices = deviceMapper.selectList(queryWrapper);
 
-
         //������������
         List<String> times = (List<String>) params.remove("times");
         //������code
@@ -100,7 +99,8 @@
         for (String start : times) {
             if ("hour".equals(type)) {
                 end = DateUtils.getDateAddDay(start, 1);
-                timeUnits = "hourly";
+                String yearAndMonth = DateUtils.dateToDateString(DateUtils.getDate(start, DateUtils.yyyy_MM_dd_EN), DateUtils.yyyyMM_EN);
+                timeUnits = "hourly_" + yearAndMonth;
                 dateFormat = "%Y-%m-%d %H";
             } else if ("day".equals(type)) {
                 end = DateUtils.getDateAddMonth(start, 1);
@@ -146,26 +146,26 @@
 
     @Override
     public Device getDeviceByMac(String mac) {
-        Map<String,Object> deviceMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE,mac);
+        Map<String, Object> deviceMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE, mac);
         Device device = JSON.parseObject(JSON.toJSONString(deviceMap), Device.class);
         //���map������organizationId���monitorPointId������versionId
-        Map<String,Object> organizationMap = (Map<String,Object>)deviceMap.get("organization");
-        Map<String,Object> monitorPointMap = (Map<String,Object>)deviceMap.get("monitorPoint");
-        Map<String,Object> versionMap = (Map<String,Object>)deviceMap.get("version");
+        Map<String, Object> organizationMap = (Map<String, Object>) deviceMap.get("organization");
+        Map<String, Object> monitorPointMap = (Map<String, Object>) deviceMap.get("monitorPoint");
+        Map<String, Object> versionMap = (Map<String, Object>) deviceMap.get("version");
         device.setDeviceVersionId((Integer) versionMap.get("id"));
         device.setOrganizationId((Integer) organizationMap.get("id"));
         device.setMonitorPointId((Integer) monitorPointMap.get("id"));
         //������������������������������������
-        if(ObjectUtils.isEmpty(device)){
+        if (ObjectUtils.isEmpty(device)) {
             return getDeviceByMacFromDB(mac);
         }
         return device;
     }
 
-    private Device getDeviceByMacFromDB(String mac){
+    private Device getDeviceByMacFromDB(String mac) {
         QueryWrapper<Device> wrapper = new QueryWrapper<>();
-        wrapper.eq("mac",mac);
-        wrapper.eq("is_delete",Constants.NOT_DELETE);
+        wrapper.eq("mac", mac);
+        wrapper.eq("is_delete", Constants.NOT_DELETE);
         return deviceMapper.selectOne(wrapper);
     }
 
diff --git a/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java b/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
index 59a1139..58b84a5 100644
--- a/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
+++ b/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -1,17 +1,15 @@
 package com.moral.api.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.moral.api.entity.HistoryHourly;
 import com.moral.api.mapper.HistoryHourlyMapper;
 import com.moral.api.service.HistoryHourlyService;
-import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.moral.constant.Constants;
 import com.moral.util.AQIUtils;
 import com.moral.util.DateUtils;
 
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
+import org.springframework.util.ObjectUtils;
 
 import java.util.Date;
 import java.util.HashMap;
@@ -26,24 +24,27 @@
  * @since 2021-07-14
  */
 @Service
-public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService {
+public class HistoryHourlyServiceImpl implements HistoryHourlyService {
 
     @Autowired
     private HistoryHourlyMapper historyHourlyMapper;
 
     @Override
     public Map<String, Object> getHourlyAqiByMac(String mac) {
-        QueryWrapper<HistoryHourly> queryWrapper = new QueryWrapper<>();
-        String time = DateUtils.dateToDateString(new Date(), DateUtils.yyyy_MM_dd_HH_EN) + ":00:00";
-        queryWrapper.eq("time", time).eq("mac", mac);
+        Date now = new Date();
+        String time = DateUtils.dateToDateString(now, DateUtils.yyyy_MM_dd_HH_EN) + ":00:00";
         //������������������
-        HistoryHourly historyHourly = historyHourlyMapper.selectOne(queryWrapper);
+        Map<String, Object> params = new HashMap<>();
+        params.put("timeUnits", DateUtils.dateToDateString(now, DateUtils.yyyyMM_EN));
+        params.put("mac", mac);
+        params.put("time", time);
+        String value = historyHourlyMapper.selectHourlyData(params);
         Map<String, Object> result = new HashMap<>();
-        if (historyHourly == null) {
+        if (ObjectUtils.isEmpty(value)) {
             result.put("AQI", Constants.NULL_VALUE);
             return result;
         }
-        Map<String, Object> data = JSONObject.parseObject(historyHourly.getValue(), Map.class);
+        Map<String, Object> data = JSONObject.parseObject(value, Map.class);
         result.put("AQI", AQIUtils.hourlyAqi(data));
         return result;
     }
diff --git a/screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java b/screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java
new file mode 100644
index 0000000..fd31b01
--- /dev/null
+++ b/screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java
@@ -0,0 +1,86 @@
+package com.moral.api.websocket;
+
+import lombok.Data;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+
+import com.moral.api.entity.SpecialDevice;
+import com.moral.api.entity.UnitConversion;
+import com.moral.constant.RedisConstants;
+
+//���������������websocket
+@ServerEndpoint("/cruiserWebsocket/{mac}")
+@Component
+@Data
+public class CruiserWebSocketServer {
+
+    //���������������������������������server������
+    public static CopyOnWriteArraySet<CruiserWebSocketServer> sockets = new CopyOnWriteArraySet<>();
+
+    public static RedisTemplate redisTemplate;
+
+    private Session session;
+
+    private String mac;
+
+    private SpecialDevice specialDevice;
+
+    private Map<String, Object> regionAqi;
+
+    private List<UnitConversion> unitConversions;
+
+    @OnOpen
+    public void onOpen(Session session, @PathParam("mac") String mac) {
+        this.session = session;
+        this.mac = mac;
+        this.specialDevice = (SpecialDevice) redisTemplate.opsForHash().get(RedisConstants.SPECIAL_DEVICE_INFO, mac);
+        //���������������������������AQI������������������
+        Map<String, Object> deviceInfo = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE, mac);
+        Map<String, Object> orgInfo = (Map<String, Object>) deviceInfo.get("organization");
+        String areaCode = String.valueOf(orgInfo.get("areaCode"));
+        String cityCode = String.valueOf(orgInfo.get("cityCode"));
+        try {
+            this.regionAqi = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, areaCode);
+            if (ObjectUtils.isEmpty(this.regionAqi))
+                this.regionAqi = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, cityCode);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        sockets.add(this);
+    }
+
+    @OnClose
+    public void onClose() {
+        sockets.remove(this);
+    }
+
+    @OnMessage
+    public void onMessage(String message, Session session) {
+        System.out.println("websocket==" + message);
+    }
+
+    @OnError
+    public void onError(Session session, Throwable error) {
+    }
+
+    public void sendMessage(String message) throws Exception {
+        if (this.session.isOpen()) {
+            //  synchronized (session) {
+            this.session.getBasicRemote().sendText(message);
+            // }
+        }
+    }
+}
diff --git a/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml b/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml
index 90af3f8..f9b3ceb 100644
--- a/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml
+++ b/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -2,12 +2,7 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="com.moral.api.mapper.HistoryHourlyMapper">
 
-    <!-- ������������������������ -->
-    <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryHourly">
-        <result column="mac" property="mac"/>
-        <result column="time" property="time"/>
-        <result column="value" property="value"/>
-        <result column="version" property="version"/>
-    </resultMap>
-
+    <select id="selectHourlyData" resultType="java.lang.String">
+        SELECT `value` FROM history_hourly_${timeUnits} WHERE mac = #{mac} AND `time` = #{time}
+    </select>
 </mapper>
\ No newline at end of file
diff --git a/screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java b/screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java
deleted file mode 100644
index 9b42822..0000000
--- a/screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package com.moral.api.entity;
-
-import com.baomidou.mybatisplus.extension.activerecord.Model;
-import java.io.Serializable;
-import java.util.Date;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-/**
- * <p>
- * ������������������
- * </p>
- *
- * @author moral
- * @since 2021-06-28
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-public class HistoryHourly extends Model<HistoryHourly> {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * ������mac
-     */
-    private String mac;
-
-    /**
-     * ������������
-     */
-    private Date time;
-
-    /**
-     * ������
-     */
-    private String value;
-
-    /**
-     * ������
-     */
-    private Integer version;
-
-
-    @Override
-    protected Serializable pkVal() {
-        return null;
-    }
-
-}
diff --git a/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java b/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
index 26ed4ca..f30a75d 100644
--- a/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
+++ b/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -3,8 +3,6 @@
 import java.util.List;
 import java.util.Map;
 
-import com.moral.api.entity.HistoryHourly;
-
 /**
  * <p>
  * ������������������ Mapper ������
@@ -17,7 +15,7 @@
 
     void createTable(String timeUnits);
 
-    void insertHistoryHourly(List<HistoryHourly> list);
+    void insertHistoryHourly(List<Map<String, Object>> list);
 
     Integer selectCountByTime(Map<String, Object> params);
 
diff --git a/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java b/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
index 8f5f0b5..caa9bb8 100644
--- a/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
+++ b/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -2,7 +2,6 @@
 
 import com.alibaba.fastjson.JSONObject;
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.moral.api.entity.HistoryHourly;
 import com.moral.api.entity.Sensor;
 import com.moral.api.mapper.HistoryHourlyMapper;
 import com.moral.api.mapper.HistoryMinutelyMapper;
@@ -116,14 +115,14 @@
                 .collect(Collectors.groupingBy(o -> (String) o.get("mac")));
 
         //���������������������������
-        List<HistoryHourly> insertData = new ArrayList<>();
+        List<Map<String, Object>> insertData = new ArrayList<>();
 
         data.forEach((key, value) -> {
-            HistoryHourly historyHourly = new HistoryHourly();
-            historyHourly.setMac(key);
-            historyHourly.setTime(end);
-            Map<String, Object> jsonMap = new HashMap<>();
+            Map<String, Object> historyHourly = new HashMap<>();
+            historyHourly.put("mac", key);
+            historyHourly.put("time", end);
 
+            Map<String, Object> jsonMap = new HashMap<>();
             Map<String, Object> map = new HashMap<>();
             map.put("data", value);
             map.put("type", "hour");
@@ -211,8 +210,8 @@
                     }
                 }
             });
-            historyHourly.setValue(JSONObject.toJSONString(jsonMap));
-            historyHourly.setVersion((Integer) value.get(0).get("version"));
+            historyHourly.put("version", value.get(0).get("version"));
+            historyHourly.put("value", JSONObject.toJSONString(jsonMap));
             insertData.add(historyHourly);
         });
 
diff --git a/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml b/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml
index bd1b0e3..6b4558a 100644
--- a/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml
+++ b/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -7,12 +7,13 @@
             `mac` VARCHAR (20) DEFAULT NULL COMMENT '������mac',
             `time` datetime DEFAULT NULL COMMENT '������������',
             `value` json DEFAULT NULL COMMENT '������',
+            `version` INT(11) DEFAULT NULL COMMENT '������',
             KEY `idx_mac_time` (`mac`,`time`)
             ) ENGINE=INNODB DEFAULT CHARSET=utf8 COMMENT '���������������'
     </update>
 
     <insert id="insertHistoryHourly">
-        INSERT INTO history_hourly VALUES
+        INSERT INTO history_hourly_${timeUnits} VALUES
         <foreach collection="list" item="item" separator=",">
             (#{item.mac}, #{item.time}, #{item.value}, #{item.version})
         </foreach>
diff --git a/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java b/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
index 3ecc6e7..0a2af8e 100644
--- a/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
+++ b/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -36,7 +36,7 @@
     @Value("${kafka.groupId.state}")
     private String stateGroupId;
 
-    @Bean
+    @Bean("insertListenerContainerFactory")
     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> insertListenerContainerFactory() {
         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(insertConsumerFactory());
@@ -46,7 +46,7 @@
         return factory;
     }
 
-    @Bean
+    @Bean("stateListenerContainerFactory")
     public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> stateListenerContainerFactory() {
         ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
         factory.setConsumerFactory(stateConsumerFactory());
@@ -71,8 +71,8 @@
     }
 
     /*
-    * ������������
-    * */
+     * ������������
+     * */
     public Map<String, Object> consumerConfigs() {
         Map<String, Object> propsMap = new HashMap<>();
         propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
diff --git a/screen-manage/src/main/java/com/moral/api/entity/HistoryHourly.java b/screen-manage/src/main/java/com/moral/api/entity/HistoryHourly.java
deleted file mode 100644
index 6c713f3..0000000
--- a/screen-manage/src/main/java/com/moral/api/entity/HistoryHourly.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.moral.api.entity;
-
-import com.baomidou.mybatisplus.extension.activerecord.Model;
-
-import java.io.Serializable;
-import java.util.Date;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-/**
- * <p>
- * ������������������
- * </p>
- *
- * @author moral
- * @since 2021-06-04
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-public class HistoryHourly extends Model<HistoryHourly> {
-
-    private static final long serialVersionUID = 1L;
-
-    /**
-     * ������mac
-     */
-    private String mac;
-
-    /**
-     * ������������
-     */
-    private Date time;
-
-    /**
-     * ������
-     */
-    private String value;
-
-    /**
-     * ������
-     */
-    private Integer version;
-
-
-}
diff --git a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
similarity index 97%
rename from screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
rename to screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
index 211b9f5..bc9db3d 100644
--- a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
+++ b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
@@ -22,10 +22,11 @@
 import com.moral.constant.RedisConstants;
 
 /*
- * ������������������
+ * ���������������������
  * */
+@Component
 @Slf4j
-public class KafkaConsumer {
+public class DeviceConsumer {
 
     @Autowired
     private HistoryMinutelyService historyMinutelyService;
@@ -148,7 +149,7 @@
     }
 
     //���������������������
-    @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
+    @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory")
     public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) {
         String msg = record.value();
         try {
diff --git a/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java b/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
index 9a0cb22..b81df44 100644
--- a/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
+++ b/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -1,12 +1,6 @@
 package com.moral.api.mapper;
 
-import org.apache.ibatis.annotations.Param;
-
 import java.util.Map;
-
-import com.moral.api.entity.HistoryHourly;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-
 
 /**
  * <p>
@@ -16,7 +10,7 @@
  * @author moral
  * @since 2021-06-04
  */
-public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> {
+public interface HistoryHourlyMapper {
 
     void insertHistoryHourly(Map<String, Object> params);
 
diff --git a/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java b/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java
index 8dc34b5..51dc990 100644
--- a/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java
+++ b/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java
@@ -2,9 +2,6 @@
 
 import java.util.Map;
 
-import com.moral.api.entity.HistoryHourly;
-import com.baomidou.mybatisplus.extension.service.IService;
-
 /**
  * <p>
  * ��������� ���������
@@ -13,7 +10,7 @@
  * @author moral
  * @since 2021-06-04
  */
-public interface HistoryHourlyService extends IService<HistoryHourly> {
+public interface HistoryHourlyService {
 
     //������������insert
     void insertHistoryHourly(Map<String, Object> data);
diff --git a/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java b/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
index 9d18e07..00c3443 100644
--- a/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
+++ b/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -1,11 +1,9 @@
 package com.moral.api.service.impl;
 
 import com.alibaba.fastjson.JSONObject;
-import com.moral.api.entity.HistoryHourly;
 import com.moral.api.mapper.HistoryHourlyMapper;
 import com.moral.api.service.DeviceService;
 import com.moral.api.service.HistoryHourlyService;
-import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.moral.constant.Constants;
 import com.moral.constant.RedisConstants;
 import com.moral.util.DateUtils;
@@ -28,7 +26,7 @@
  * @since 2021-06-04
  */
 @Service
-public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService {
+public class HistoryHourlyServiceImpl implements HistoryHourlyService {
 
     @Autowired
     private HistoryHourlyMapper historyHourlyMapper;
@@ -50,12 +48,13 @@
 
         Map<String, Object> dataAdjust = new HashMap<>(data);
         Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN);
+        String yearAndMonth = DateUtils.dateToDateString(time, DateUtils.yyyyMM_EN);
 
         Map<String, Object> result = new HashMap<>();
         result.put("mac", mac);
         result.put("time", time);
         result.put("version", version);
-        result.put("timeUnits", Constants.UN_ADJUST);
+        result.put("timeUnits", yearAndMonth + "_" + Constants.UN_ADJUST);
         result.put("value", JSONObject.toJSONString(data));
         //������������������insert
         historyHourlyMapper.insertHistoryHourly(result);
@@ -63,13 +62,9 @@
         //������������
         dataAdjust = deviceService.adjustDeviceData(dataAdjust);
 
-        HistoryHourly historyHourly = new HistoryHourly();
-        historyHourly.setMac(mac);
-        historyHourly.setTime(time);
-        historyHourly.setVersion(version);
-        historyHourly.setValue(JSONObject.toJSONString(dataAdjust));
-
         //������������������insert
-        historyHourlyMapper.insert(historyHourly);
+        result.put("timeUnits", yearAndMonth);
+        result.put("value", JSONObject.toJSONString(dataAdjust));
+        historyHourlyMapper.insertHistoryHourly(result);
     }
 }
diff --git a/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml b/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml
index 13bc1fe..701b196 100644
--- a/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml
+++ b/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -2,14 +2,6 @@
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="com.moral.api.mapper.HistoryHourlyMapper">
 
-    <!-- ������������������������ -->
-    <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryHourly">
-        <result column="mac" property="mac"/>
-        <result column="time" property="time"/>
-        <result column="value" property="value"/>
-        <result column="version" property="version"/>
-    </resultMap>
-
     <insert id="insertHistoryHourly">
         INSERT INTO history_hourly_${timeUnits}
         VALUES (#{mac}, #{time}, #{value}, #{version})

--
Gitblit v1.8.0