From 7607bab6e868a51609164ce111c9d5e1046cd11f Mon Sep 17 00:00:00 2001
From: jinpengyong <jpy123456>
Date: Wed, 01 Sep 2021 14:44:00 +0800
Subject: [PATCH] 走航车实时websocket,小时表分表
---
screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java | 17 -
screen-api/src/main/java/com/moral/api/controller/CruiserController.java | 10
screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java | 109 +++++++++++++
screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml | 3
screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java | 8
screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java | 7
screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java | 21 +-
/dev/null | 46 -----
screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java | 20 +-
screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java | 5
screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java | 29 +++
screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java | 4
screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml | 8 -
screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java | 8
screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java | 7
screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java | 4
screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java | 86 ++++++++++
screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java | 5
screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java | 15 -
screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml | 11 -
20 files changed, 287 insertions(+), 136 deletions(-)
diff --git a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
index cc68807..0271ad4 100644
--- a/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
+++ b/screen-api/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -31,6 +31,8 @@
private int concurrency;
@Value("${kafka.groupId.second-data}")
private String secondDataGroupId;
+ @Value("${kafka.groupId.cruiser-data}")
+ private String cruiserDataGroupId;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
@@ -54,6 +56,15 @@
return factory;
}
+ @Bean("cruiserDataListenerFactory")
+ public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> cruiserDataListenerFactory(){
+ ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(cruiserDataConsumerFactory());//���������������������
+ factory.setConcurrency(concurrency);//���������������
+ factory.getContainerProperties().setPollTimeout(1500);//������������������������������
+ return factory;
+ }
+
/**
* @Description: ������������������
* @Param: []
@@ -64,6 +75,16 @@
public ConsumerFactory<String,String> secondDataConsumerFactory(){
Map<String, Object> commonConfig = consumerConfigs();
Map<String, Object> secondDataConfig = secondConsumerConfigs();
+ secondDataConfig.putAll(commonConfig);
+ return new DefaultKafkaConsumerFactory<>(secondDataConfig);
+ }
+
+ /*
+ * ���������������������������
+ * */
+ public ConsumerFactory<String,String> cruiserDataConsumerFactory(){
+ Map<String, Object> commonConfig = consumerConfigs();
+ Map<String, Object> secondDataConfig = cruiserConsumerConfigs();
secondDataConfig.putAll(commonConfig);
return new DefaultKafkaConsumerFactory<>(secondDataConfig);
}
@@ -81,6 +102,14 @@
return propsMap;
}
+ /*
+ * ������������������������
+ * */
+ public Map<String,Object> cruiserConsumerConfigs(){
+ Map<String, Object> propsMap = new HashMap<>();
+ propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, cruiserDataGroupId);
+ return propsMap;
+ }
/**
* @Description: ������������
diff --git a/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java b/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
index 8e6393b..9189321 100644
--- a/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
+++ b/screen-api/src/main/java/com/moral/api/config/websocket/WebSocketConfig.java
@@ -1,7 +1,7 @@
package com.moral.api.config.websocket;
+import com.moral.api.websocket.CruiserWebSocketServer;
import com.moral.api.websocket.SingleDeviceServer;
-import com.moral.constant.RedisConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -26,5 +26,7 @@
@Autowired
public void setMessageService(RedisTemplate redisTemplate){
SingleDeviceServer.redisTemplate = redisTemplate;
+ CruiserWebSocketServer.redisTemplate = redisTemplate;
}
+
}
diff --git a/screen-api/src/main/java/com/moral/api/controller/SpecialDeviceController.java b/screen-api/src/main/java/com/moral/api/controller/CruiserController.java
similarity index 93%
rename from screen-api/src/main/java/com/moral/api/controller/SpecialDeviceController.java
rename to screen-api/src/main/java/com/moral/api/controller/CruiserController.java
index 784d432..ee0ddcd 100644
--- a/screen-api/src/main/java/com/moral/api/controller/SpecialDeviceController.java
+++ b/screen-api/src/main/java/com/moral/api/controller/CruiserController.java
@@ -23,11 +23,11 @@
import com.moral.util.WebUtils;
@Slf4j
-@Api(tags = {"������������"})
+@Api(tags = {"���������"})
@RestController
@CrossOrigin(origins = "*", maxAge = 3600)
-@RequestMapping("/specialDevice")
-public class SpecialDeviceController {
+@RequestMapping("/cruiser")
+public class CruiserController {
@Autowired
private SpecialDeviceService specialDeviceService;
@@ -35,7 +35,7 @@
/**
* @return ������������������������������������
*/
- @GetMapping("getCarsByOrg")
+ @GetMapping("selectCruisers")
@ApiOperation(value = "������������������������������������������", notes = "���������������")
public ResultMessage getCarsInfo() {
List<Map<String, Object>> response = specialDeviceService.getCarsInfo();
@@ -46,7 +46,7 @@
* @param request ������������
* @return ������������������������������������
*/
- @GetMapping("carTrajectory")
+ @GetMapping("cruiserTrajectory")
@ApiOperation(value = "���������������", notes = "���������������")
@ApiImplicitParams(value = {
@ApiImplicitParam(name = "token", value = "token", required = true, paramType = "header", dataType = "String"),
diff --git a/screen-api/src/main/java/com/moral/api/entity/HistoryHourly.java b/screen-api/src/main/java/com/moral/api/entity/HistoryHourly.java
deleted file mode 100644
index 045fdd6..0000000
--- a/screen-api/src/main/java/com/moral/api/entity/HistoryHourly.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package com.moral.api.entity;
-
-import com.baomidou.mybatisplus.extension.activerecord.Model;
-import java.io.Serializable;
-import java.util.Date;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-/**
- * <p>
- * ������������������
- * </p>
- *
- * @author moral
- * @since 2021-07-14
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-public class HistoryHourly extends Model<HistoryHourly> {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * ������mac
- */
- private String mac;
-
- /**
- * ������������
- */
- private Date time;
-
- /**
- * ������
- */
- private String value;
-
- /**
- * ������
- */
- private Integer version;
-
-
-
-
-}
diff --git a/screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java b/screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java
new file mode 100644
index 0000000..609d2fb
--- /dev/null
+++ b/screen-api/src/main/java/com/moral/api/kafka/consumer/CruiserDataConsumer.java
@@ -0,0 +1,109 @@
+package com.moral.api.kafka.consumer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.ConsumerSeekAware;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.moral.api.entity.Sensor;
+import com.moral.api.entity.SpecialDevice;
+import com.moral.api.entity.UnitConversion;
+import com.moral.api.websocket.CruiserWebSocketServer;
+import com.moral.constant.KafkaConstants;
+import com.moral.util.UnitConvertUtils;
+
+/*
+ * ������������������������
+ * */
+@Component
+@Slf4j
+public class CruiserDataConsumer implements ConsumerSeekAware {
+
+ @KafkaListener(topics = KafkaConstants.CRUISER_TOPIC_SECOND, containerFactory = "cruiserDataListenerFactory")
+ public void listenSecondSpecial(ConsumerRecord<String, String> record) throws Exception {
+ String msg = record.value();
+ Map<String, Object> data = JSONObject.parseObject(msg, Map.class);
+ CopyOnWriteArraySet<CruiserWebSocketServer> sockets = CruiserWebSocketServer.sockets;
+ for (CruiserWebSocketServer socket : sockets) {
+ String messageMac = (String) data.get("mac");
+ String mac = socket.getMac();
+ if (!mac.equalsIgnoreCase(messageMac))
+ continue;
+ //������������������
+ SpecialDevice specialDevice = socket.getSpecialDevice();
+ //������������������������
+ Map<String, Object> result = new HashMap<>();
+ result.put("time",data.get("time"));
+ //������������
+ List<Sensor> sensors = specialDevice.getVersion().getSensors();//������������������������������
+ for (Sensor sensor : sensors) {
+ String code = sensor.getCode();
+ String showUnit = sensor.getShowUnit();
+ String showUnitKey = sensor.getShowUnitKey();
+ String unitKey = sensor.getUnitKey();
+ String unit = sensor.getUnit();
+ //������������������������������������������������
+ if (data.get(code) == null) {
+ continue;
+ }
+ Double sourceDataD = Double.valueOf(String.valueOf(data.get(code)));
+ /*BigDecimal bg = new BigDecimal(sourceDataD);
+ bg = bg.setScale(2, BigDecimal.ROUND_FLOOR);*/
+ String sourceData = String.valueOf(sourceDataD);
+ //������������
+ //������������
+ if (!unitKey.equals(showUnitKey)) {//������������������������������������������������������������
+ String formula = sensor.getFormula();
+ //������sensor���������������������������������������������
+ if (ObjectUtils.isEmpty(formula)) {
+ List<UnitConversion> unitConversions = socket.getUnitConversions();
+ for (UnitConversion unitConversion : unitConversions) {
+ if (unitConversion.getOriginalUnitKey().equals(unitKey) && unitConversion.getTargetUnitKey().equals(showUnitKey))
+ formula = unitConversion.getFormula();
+ }
+ }
+ //������������
+ String resultData = UnitConvertUtils.calculate(sourceData, formula);
+ if (resultData != null) {
+ resultData += showUnit;
+ } else {//���������������������������null���������������������������������������������������������������
+ resultData = sourceData + unit;
+ }
+ result.put(sensor.getCode(), resultData);
+ } else {
+ //������������
+ sourceData = sourceData + " " + showUnit;
+ result.put(sensor.getCode(), sourceData);
+ }
+ }
+ socket.sendMessage(JSON.toJSONString(result));
+ }
+ }
+
+ @Override
+ public void registerSeekCallback(ConsumerSeekCallback consumerSeekCallback) {
+
+ }
+
+ @Override
+ public void onPartitionsAssigned(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
+ map.forEach((t, o) -> consumerSeekCallback.seekToEnd(t.topic(), t.partition()));
+ }
+
+ @Override
+ public void onIdleContainer(Map<TopicPartition, Long> map, ConsumerSeekCallback consumerSeekCallback) {
+
+ }
+}
diff --git a/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java b/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
index ec74607..819edd9 100644
--- a/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
+++ b/screen-api/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -1,7 +1,6 @@
package com.moral.api.mapper;
-import com.moral.api.entity.HistoryHourly;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import java.util.Map;
/**
* <p>
@@ -11,6 +10,8 @@
* @author moral
* @since 2021-07-14
*/
-public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> {
+public interface HistoryHourlyMapper{
+
+ String selectHourlyData(Map<String,Object> params);
}
diff --git a/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java b/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java
index b437717..9d1b811 100644
--- a/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java
+++ b/screen-api/src/main/java/com/moral/api/service/HistoryHourlyService.java
@@ -2,9 +2,6 @@
import java.util.Map;
-import com.moral.api.entity.HistoryHourly;
-import com.baomidou.mybatisplus.extension.service.IService;
-
/**
* <p>
* ������������������ ���������
@@ -13,7 +10,7 @@
* @author moral
* @since 2021-07-14
*/
-public interface HistoryHourlyService extends IService<HistoryHourly> {
+public interface HistoryHourlyService{
//������mac������������AQI
Map<String,Object> getHourlyAqiByMac(String mac);
diff --git a/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java b/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
index 98bffb3..1c1b45b 100644
--- a/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
+++ b/screen-api/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
@@ -86,7 +86,6 @@
queryWrapper.select("mac", "name").in("mac", macs);
List<Device> devices = deviceMapper.selectList(queryWrapper);
-
//������������
List<String> times = (List<String>) params.remove("times");
//������code
@@ -100,7 +99,8 @@
for (String start : times) {
if ("hour".equals(type)) {
end = DateUtils.getDateAddDay(start, 1);
- timeUnits = "hourly";
+ String yearAndMonth = DateUtils.dateToDateString(DateUtils.getDate(start, DateUtils.yyyy_MM_dd_EN), DateUtils.yyyyMM_EN);
+ timeUnits = "hourly_" + yearAndMonth;
dateFormat = "%Y-%m-%d %H";
} else if ("day".equals(type)) {
end = DateUtils.getDateAddMonth(start, 1);
@@ -146,26 +146,26 @@
@Override
public Device getDeviceByMac(String mac) {
- Map<String,Object> deviceMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE,mac);
+ Map<String, Object> deviceMap = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE, mac);
Device device = JSON.parseObject(JSON.toJSONString(deviceMap), Device.class);
//���map������organizationId���monitorPointId������versionId
- Map<String,Object> organizationMap = (Map<String,Object>)deviceMap.get("organization");
- Map<String,Object> monitorPointMap = (Map<String,Object>)deviceMap.get("monitorPoint");
- Map<String,Object> versionMap = (Map<String,Object>)deviceMap.get("version");
+ Map<String, Object> organizationMap = (Map<String, Object>) deviceMap.get("organization");
+ Map<String, Object> monitorPointMap = (Map<String, Object>) deviceMap.get("monitorPoint");
+ Map<String, Object> versionMap = (Map<String, Object>) deviceMap.get("version");
device.setDeviceVersionId((Integer) versionMap.get("id"));
device.setOrganizationId((Integer) organizationMap.get("id"));
device.setMonitorPointId((Integer) monitorPointMap.get("id"));
//������������������������������������
- if(ObjectUtils.isEmpty(device)){
+ if (ObjectUtils.isEmpty(device)) {
return getDeviceByMacFromDB(mac);
}
return device;
}
- private Device getDeviceByMacFromDB(String mac){
+ private Device getDeviceByMacFromDB(String mac) {
QueryWrapper<Device> wrapper = new QueryWrapper<>();
- wrapper.eq("mac",mac);
- wrapper.eq("is_delete",Constants.NOT_DELETE);
+ wrapper.eq("mac", mac);
+ wrapper.eq("is_delete", Constants.NOT_DELETE);
return deviceMapper.selectOne(wrapper);
}
diff --git a/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java b/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
index 59a1139..58b84a5 100644
--- a/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
+++ b/screen-api/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -1,17 +1,15 @@
package com.moral.api.service.impl;
import com.alibaba.fastjson.JSONObject;
-import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.moral.api.entity.HistoryHourly;
import com.moral.api.mapper.HistoryHourlyMapper;
import com.moral.api.service.HistoryHourlyService;
-import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.constant.Constants;
import com.moral.util.AQIUtils;
import com.moral.util.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import org.springframework.util.ObjectUtils;
import java.util.Date;
import java.util.HashMap;
@@ -26,24 +24,27 @@
* @since 2021-07-14
*/
@Service
-public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService {
+public class HistoryHourlyServiceImpl implements HistoryHourlyService {
@Autowired
private HistoryHourlyMapper historyHourlyMapper;
@Override
public Map<String, Object> getHourlyAqiByMac(String mac) {
- QueryWrapper<HistoryHourly> queryWrapper = new QueryWrapper<>();
- String time = DateUtils.dateToDateString(new Date(), DateUtils.yyyy_MM_dd_HH_EN) + ":00:00";
- queryWrapper.eq("time", time).eq("mac", mac);
+ Date now = new Date();
+ String time = DateUtils.dateToDateString(now, DateUtils.yyyy_MM_dd_HH_EN) + ":00:00";
//������������������
- HistoryHourly historyHourly = historyHourlyMapper.selectOne(queryWrapper);
+ Map<String, Object> params = new HashMap<>();
+ params.put("timeUnits", DateUtils.dateToDateString(now, DateUtils.yyyyMM_EN));
+ params.put("mac", mac);
+ params.put("time", time);
+ String value = historyHourlyMapper.selectHourlyData(params);
Map<String, Object> result = new HashMap<>();
- if (historyHourly == null) {
+ if (ObjectUtils.isEmpty(value)) {
result.put("AQI", Constants.NULL_VALUE);
return result;
}
- Map<String, Object> data = JSONObject.parseObject(historyHourly.getValue(), Map.class);
+ Map<String, Object> data = JSONObject.parseObject(value, Map.class);
result.put("AQI", AQIUtils.hourlyAqi(data));
return result;
}
diff --git a/screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java b/screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java
new file mode 100644
index 0000000..fd31b01
--- /dev/null
+++ b/screen-api/src/main/java/com/moral/api/websocket/CruiserWebSocketServer.java
@@ -0,0 +1,86 @@
+package com.moral.api.websocket;
+
+import lombok.Data;
+import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.util.ObjectUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import javax.websocket.OnClose;
+import javax.websocket.OnError;
+import javax.websocket.OnMessage;
+import javax.websocket.OnOpen;
+import javax.websocket.Session;
+import javax.websocket.server.PathParam;
+import javax.websocket.server.ServerEndpoint;
+
+import com.moral.api.entity.SpecialDevice;
+import com.moral.api.entity.UnitConversion;
+import com.moral.constant.RedisConstants;
+
+//���������������websocket
+@ServerEndpoint("/cruiserWebsocket/{mac}")
+@Component
+@Data
+public class CruiserWebSocketServer {
+
+ //���������������������������������server������
+ public static CopyOnWriteArraySet<CruiserWebSocketServer> sockets = new CopyOnWriteArraySet<>();
+
+ public static RedisTemplate redisTemplate;
+
+ private Session session;
+
+ private String mac;
+
+ private SpecialDevice specialDevice;
+
+ private Map<String, Object> regionAqi;
+
+ private List<UnitConversion> unitConversions;
+
+ @OnOpen
+ public void onOpen(Session session, @PathParam("mac") String mac) {
+ this.session = session;
+ this.mac = mac;
+ this.specialDevice = (SpecialDevice) redisTemplate.opsForHash().get(RedisConstants.SPECIAL_DEVICE_INFO, mac);
+ //���������������������������AQI������������������
+ Map<String, Object> deviceInfo = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.DEVICE, mac);
+ Map<String, Object> orgInfo = (Map<String, Object>) deviceInfo.get("organization");
+ String areaCode = String.valueOf(orgInfo.get("areaCode"));
+ String cityCode = String.valueOf(orgInfo.get("cityCode"));
+ try {
+ this.regionAqi = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, areaCode);
+ if (ObjectUtils.isEmpty(this.regionAqi))
+ this.regionAqi = (Map<String, Object>) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, cityCode);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ sockets.add(this);
+ }
+
+ @OnClose
+ public void onClose() {
+ sockets.remove(this);
+ }
+
+ @OnMessage
+ public void onMessage(String message, Session session) {
+ System.out.println("websocket==" + message);
+ }
+
+ @OnError
+ public void onError(Session session, Throwable error) {
+ }
+
+ public void sendMessage(String message) throws Exception {
+ if (this.session.isOpen()) {
+ // synchronized (session) {
+ this.session.getBasicRemote().sendText(message);
+ // }
+ }
+ }
+}
diff --git a/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml b/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml
index 90af3f8..f9b3ceb 100644
--- a/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml
+++ b/screen-api/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -2,12 +2,7 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.moral.api.mapper.HistoryHourlyMapper">
- <!-- ������������������������ -->
- <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryHourly">
- <result column="mac" property="mac"/>
- <result column="time" property="time"/>
- <result column="value" property="value"/>
- <result column="version" property="version"/>
- </resultMap>
-
+ <select id="selectHourlyData" resultType="java.lang.String">
+ SELECT `value` FROM history_hourly_${timeUnits} WHERE mac = #{mac} AND `time` = #{time}
+ </select>
</mapper>
\ No newline at end of file
diff --git a/screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java b/screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java
deleted file mode 100644
index 9b42822..0000000
--- a/screen-job/src/main/java/com/moral/api/entity/HistoryHourly.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package com.moral.api.entity;
-
-import com.baomidou.mybatisplus.extension.activerecord.Model;
-import java.io.Serializable;
-import java.util.Date;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-/**
- * <p>
- * ������������������
- * </p>
- *
- * @author moral
- * @since 2021-06-28
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-public class HistoryHourly extends Model<HistoryHourly> {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * ������mac
- */
- private String mac;
-
- /**
- * ������������
- */
- private Date time;
-
- /**
- * ������
- */
- private String value;
-
- /**
- * ������
- */
- private Integer version;
-
-
- @Override
- protected Serializable pkVal() {
- return null;
- }
-
-}
diff --git a/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java b/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
index 26ed4ca..f30a75d 100644
--- a/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
+++ b/screen-job/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -3,8 +3,6 @@
import java.util.List;
import java.util.Map;
-import com.moral.api.entity.HistoryHourly;
-
/**
* <p>
* ������������������ Mapper ������
@@ -17,7 +15,7 @@
void createTable(String timeUnits);
- void insertHistoryHourly(List<HistoryHourly> list);
+ void insertHistoryHourly(List<Map<String, Object>> list);
Integer selectCountByTime(Map<String, Object> params);
diff --git a/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java b/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
index 8f5f0b5..caa9bb8 100644
--- a/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
+++ b/screen-job/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -2,7 +2,6 @@
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
-import com.moral.api.entity.HistoryHourly;
import com.moral.api.entity.Sensor;
import com.moral.api.mapper.HistoryHourlyMapper;
import com.moral.api.mapper.HistoryMinutelyMapper;
@@ -116,14 +115,14 @@
.collect(Collectors.groupingBy(o -> (String) o.get("mac")));
//���������������������������
- List<HistoryHourly> insertData = new ArrayList<>();
+ List<Map<String, Object>> insertData = new ArrayList<>();
data.forEach((key, value) -> {
- HistoryHourly historyHourly = new HistoryHourly();
- historyHourly.setMac(key);
- historyHourly.setTime(end);
- Map<String, Object> jsonMap = new HashMap<>();
+ Map<String, Object> historyHourly = new HashMap<>();
+ historyHourly.put("mac", key);
+ historyHourly.put("time", end);
+ Map<String, Object> jsonMap = new HashMap<>();
Map<String, Object> map = new HashMap<>();
map.put("data", value);
map.put("type", "hour");
@@ -211,8 +210,8 @@
}
}
});
- historyHourly.setValue(JSONObject.toJSONString(jsonMap));
- historyHourly.setVersion((Integer) value.get(0).get("version"));
+ historyHourly.put("version", value.get(0).get("version"));
+ historyHourly.put("value", JSONObject.toJSONString(jsonMap));
insertData.add(historyHourly);
});
diff --git a/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml b/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml
index bd1b0e3..6b4558a 100644
--- a/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml
+++ b/screen-job/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -7,12 +7,13 @@
`mac` VARCHAR (20) DEFAULT NULL COMMENT '������mac',
`time` datetime DEFAULT NULL COMMENT '������������',
`value` json DEFAULT NULL COMMENT '������',
+ `version` INT(11) DEFAULT NULL COMMENT '������',
KEY `idx_mac_time` (`mac`,`time`)
) ENGINE=INNODB DEFAULT CHARSET=utf8 COMMENT '���������������'
</update>
<insert id="insertHistoryHourly">
- INSERT INTO history_hourly VALUES
+ INSERT INTO history_hourly_${timeUnits} VALUES
<foreach collection="list" item="item" separator=",">
(#{item.mac}, #{item.time}, #{item.value}, #{item.version})
</foreach>
diff --git a/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java b/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
index 3ecc6e7..0a2af8e 100644
--- a/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
+++ b/screen-manage/src/main/java/com/moral/api/config/kafka/KafkaConsumerConfig.java
@@ -36,7 +36,7 @@
@Value("${kafka.groupId.state}")
private String stateGroupId;
- @Bean
+ @Bean("insertListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> insertListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(insertConsumerFactory());
@@ -46,7 +46,7 @@
return factory;
}
- @Bean
+ @Bean("stateListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> stateListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(stateConsumerFactory());
@@ -71,8 +71,8 @@
}
/*
- * ������������
- * */
+ * ������������
+ * */
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
diff --git a/screen-manage/src/main/java/com/moral/api/entity/HistoryHourly.java b/screen-manage/src/main/java/com/moral/api/entity/HistoryHourly.java
deleted file mode 100644
index 6c713f3..0000000
--- a/screen-manage/src/main/java/com/moral/api/entity/HistoryHourly.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package com.moral.api.entity;
-
-import com.baomidou.mybatisplus.extension.activerecord.Model;
-
-import java.io.Serializable;
-import java.util.Date;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-/**
- * <p>
- * ������������������
- * </p>
- *
- * @author moral
- * @since 2021-06-04
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-public class HistoryHourly extends Model<HistoryHourly> {
-
- private static final long serialVersionUID = 1L;
-
- /**
- * ������mac
- */
- private String mac;
-
- /**
- * ������������
- */
- private Date time;
-
- /**
- * ������
- */
- private String value;
-
- /**
- * ������
- */
- private Integer version;
-
-
-}
diff --git a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
similarity index 97%
rename from screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
rename to screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
index 211b9f5..bc9db3d 100644
--- a/screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaConsumer.java
+++ b/screen-manage/src/main/java/com/moral/api/kafka/consumer/DeviceConsumer.java
@@ -22,10 +22,11 @@
import com.moral.constant.RedisConstants;
/*
- * ������������������
+ * ���������������������
* */
+@Component
@Slf4j
-public class KafkaConsumer {
+public class DeviceConsumer {
@Autowired
private HistoryMinutelyService historyMinutelyService;
@@ -148,7 +149,7 @@
}
//���������������������
- @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "kafkaListenerContainerFactory")
+ @KafkaListener(topics = KafkaConstants.UAV_TOPIC_SECOND, groupId = KafkaConstants.GROUP_INSERT, containerFactory = "insertListenerContainerFactory")
public void listenSecondSpecial(ConsumerRecord<String, String> record, Acknowledgment ack) {
String msg = record.value();
try {
diff --git a/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java b/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
index 9a0cb22..b81df44 100644
--- a/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
+++ b/screen-manage/src/main/java/com/moral/api/mapper/HistoryHourlyMapper.java
@@ -1,12 +1,6 @@
package com.moral.api.mapper;
-import org.apache.ibatis.annotations.Param;
-
import java.util.Map;
-
-import com.moral.api.entity.HistoryHourly;
-import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-
/**
* <p>
@@ -16,7 +10,7 @@
* @author moral
* @since 2021-06-04
*/
-public interface HistoryHourlyMapper extends BaseMapper<HistoryHourly> {
+public interface HistoryHourlyMapper {
void insertHistoryHourly(Map<String, Object> params);
diff --git a/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java b/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java
index 8dc34b5..51dc990 100644
--- a/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java
+++ b/screen-manage/src/main/java/com/moral/api/service/HistoryHourlyService.java
@@ -2,9 +2,6 @@
import java.util.Map;
-import com.moral.api.entity.HistoryHourly;
-import com.baomidou.mybatisplus.extension.service.IService;
-
/**
* <p>
* ��������� ���������
@@ -13,7 +10,7 @@
* @author moral
* @since 2021-06-04
*/
-public interface HistoryHourlyService extends IService<HistoryHourly> {
+public interface HistoryHourlyService {
//������������insert
void insertHistoryHourly(Map<String, Object> data);
diff --git a/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java b/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
index 9d18e07..00c3443 100644
--- a/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
+++ b/screen-manage/src/main/java/com/moral/api/service/impl/HistoryHourlyServiceImpl.java
@@ -1,11 +1,9 @@
package com.moral.api.service.impl;
import com.alibaba.fastjson.JSONObject;
-import com.moral.api.entity.HistoryHourly;
import com.moral.api.mapper.HistoryHourlyMapper;
import com.moral.api.service.DeviceService;
import com.moral.api.service.HistoryHourlyService;
-import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.moral.constant.Constants;
import com.moral.constant.RedisConstants;
import com.moral.util.DateUtils;
@@ -28,7 +26,7 @@
* @since 2021-06-04
*/
@Service
-public class HistoryHourlyServiceImpl extends ServiceImpl<HistoryHourlyMapper, HistoryHourly> implements HistoryHourlyService {
+public class HistoryHourlyServiceImpl implements HistoryHourlyService {
@Autowired
private HistoryHourlyMapper historyHourlyMapper;
@@ -50,12 +48,13 @@
Map<String, Object> dataAdjust = new HashMap<>(data);
Date time = DateUtils.getDate((String) data.remove("DataTime"), DateUtils.yyyyMMddHHmmss_EN);
+ String yearAndMonth = DateUtils.dateToDateString(time, DateUtils.yyyyMM_EN);
Map<String, Object> result = new HashMap<>();
result.put("mac", mac);
result.put("time", time);
result.put("version", version);
- result.put("timeUnits", Constants.UN_ADJUST);
+ result.put("timeUnits", yearAndMonth + "_" + Constants.UN_ADJUST);
result.put("value", JSONObject.toJSONString(data));
//������������������insert
historyHourlyMapper.insertHistoryHourly(result);
@@ -63,13 +62,9 @@
//������������
dataAdjust = deviceService.adjustDeviceData(dataAdjust);
- HistoryHourly historyHourly = new HistoryHourly();
- historyHourly.setMac(mac);
- historyHourly.setTime(time);
- historyHourly.setVersion(version);
- historyHourly.setValue(JSONObject.toJSONString(dataAdjust));
-
//������������������insert
- historyHourlyMapper.insert(historyHourly);
+ result.put("timeUnits", yearAndMonth);
+ result.put("value", JSONObject.toJSONString(dataAdjust));
+ historyHourlyMapper.insertHistoryHourly(result);
}
}
diff --git a/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml b/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml
index 13bc1fe..701b196 100644
--- a/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml
+++ b/screen-manage/src/main/resources/mapper/HistoryHourlyMapper.xml
@@ -2,14 +2,6 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.moral.api.mapper.HistoryHourlyMapper">
- <!-- ������������������������ -->
- <resultMap id="BaseResultMap" type="com.moral.api.entity.HistoryHourly">
- <result column="mac" property="mac"/>
- <result column="time" property="time"/>
- <result column="value" property="value"/>
- <result column="version" property="version"/>
- </resultMap>
-
<insert id="insertHistoryHourly">
INSERT INTO history_hourly_${timeUnits}
VALUES (#{mac}, #{time}, #{value}, #{version})
--
Gitblit v1.8.0