pom.xml
@@ -42,6 +42,7 @@ <io.springfox.version>2.7.0</io.springfox.version> <swagger-bootstrap-ui.version>1.9.6</swagger-bootstrap-ui.version> <spring.kafka.version>2.2.2</spring.kafka.version> <xxl-job.version>2.3.0</xxl-job.version> </properties> @@ -116,7 +117,11 @@ <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>${xxl-job.version}</version> </dependency> </dependencies> screen-api/src/main/resources/mapper/MenuMapper.xml
@@ -28,7 +28,7 @@ AND gm.menu_id = m.id AND ug.is_delete = 0 AND gm.is_delete = 0 AND gm.channel_key = 1 AND gm.channel_key = 0 AND m.is_delete = 0 </select> @@ -38,7 +38,7 @@ FROM `menu` m, `organization_menu` om WHERE om.organization_id = #{orgId} AND m.id = om.menu_id AND om.channel_key = 1 AND om.channel_key = 0 AND om.menu_id = m.id AND om.is_delete = 0 AND m.is_delete = 0 screen-common/src/main/java/com/moral/constant/Constants.java
@@ -113,6 +113,12 @@ * 采购商字典类型 * */ public static final String SYSTEM_DICT_TYPE_PURCHASER = "purchaser"; /* * 未校准数据表后缀 * */ public static final String UN_ADJUST = "unadjust"; } screen-common/src/main/java/com/moral/constant/KafkaConstants.java
New file @@ -0,0 +1,19 @@
package com.moral.constant;

/**
 * Kafka topic names and consumer-group id used by the application.
 *
 * <p>NOTE(review): "test_topic" and "test" look like placeholder values; confirm the
 * real topic/group names before this ships.
 */
public final class KafkaConstants {

    /**
     * Topic carrying minute-level data.
     */
    public static final String TOPIC_MINUTE = "test_topic";

    /**
     * Topic carrying hourly data.
     */
    public static final String TOPIC_HOUR = "hour";

    /**
     * Consumer group id.
     */
    public static final String GROUP_ID = "test";

    /**
     * Constants holder; not meant to be instantiated.
     */
    private KafkaConstants() {
    }
}
screen-common/src/main/java/com/moral/constant/RedisConstants.java
@@ -28,4 +28,14 @@ * 使用Map<Integer,Sensor>接收 * */ public static final String SENSOR_KEY = "sensor"; /* * 设备信息前缀 * */ public static final String DEVICE = "device"; /* * 设备状态信息前缀 * */ public static final String STATE = "state"; } screen-job/pom.xml
@@ -14,12 +14,6 @@ <dependencies> <dependency> <groupId>org.moral</groupId> <artifactId>screen-common</artifactId> <version>1.0-SNAPSHOT</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> screen-job/src/main/java/com/moral/api/config/xxl/XxlJobConfig.java
New file @@ -0,0 +1,135 @@
package com.moral.api.config.xxl;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

import com.xxl.job.core.executor.impl.XxlJobSpringExecutor;

/**
 * Spring configuration that registers the XXL-JOB executor bean.
 *
 * <p>Every setting is injected from the {@code xxl.job.*} properties and copied onto a
 * {@link XxlJobSpringExecutor}. Components in {@code com.moral.api.jobHandler} are picked up
 * through the component scan declared here.
 */
@Configuration
@ComponentScan(basePackages = "com.moral.api.jobHandler")
@Slf4j
public class XxlJobConfig {

    /** Value of {@code xxl.job.admin.addresses}. */
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;

    /** Value of {@code xxl.job.executor.appname}. */
    @Value("${xxl.job.executor.appname}")
    private String appName;

    /** Value of {@code xxl.job.executor.ip}. */
    @Value("${xxl.job.executor.ip}")
    private String ip;

    /** Value of {@code xxl.job.executor.port}. */
    @Value("${xxl.job.executor.port}")
    private Integer port;

    /** Value of {@code xxl.job.accessToken}; treated as a secret and never printed. */
    @Value("${xxl.job.accessToken}")
    private String accessToken;

    /** Value of {@code xxl.job.executor.logpath}. */
    @Value("${xxl.job.executor.logpath}")
    private String logPath;

    /** Value of {@code xxl.job.executor.logretentiondays}. */
    @Value("${xxl.job.executor.logretentiondays}")
    private Integer logRetentionDays;

    /**
     * Builds the executor bean from the injected properties.
     *
     * @return a {@link XxlJobSpringExecutor} configured with the values of this class
     */
    @Bean
    public XxlJobSpringExecutor xxlJobSpringExecutor() {
        // Message previously read "xxl jon config init" (typo).
        log.info("xxl job config init");
        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
        xxlJobSpringExecutor.setAppname(appName);
        xxlJobSpringExecutor.setIp(ip);
        xxlJobSpringExecutor.setPort(port);
        xxlJobSpringExecutor.setAccessToken(accessToken);
        xxlJobSpringExecutor.setLogPath(logPath);
        xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobSpringExecutor;
    }

    /**
     * Returns a debug representation of the configuration.
     *
     * <p>The access token is masked so that logging this object cannot leak the credential;
     * an unset or empty token is shown as an empty string.
     */
    @Override
    public String toString() {
        String maskedToken = (accessToken == null || accessToken.isEmpty()) ? "" : "******";
        return "XxlJobConfig{" +
                "adminAddresses='" + adminAddresses + '\'' +
                ", appName='" + appName + '\'' +
                ", ip='" + ip + '\'' +
                ", port=" + port +
                ", accessToken='" + maskedToken + '\'' +
                ", logPath='" + logPath + '\'' +
                ", logRetentionDays=" + logRetentionDays +
                '}';
    }

    /** Creates an empty configuration; fields are filled by property injection or the setters. */
    public XxlJobConfig() {
    }

    /** Creates a configuration with every field set explicitly. */
    public XxlJobConfig(String adminAddresses, String appName, String ip, Integer port, String accessToken, String logPath, Integer logRetentionDays) {
        this.adminAddresses = adminAddresses;
        this.appName = appName;
        this.ip = ip;
        this.port = port;
        this.accessToken = accessToken;
        this.logPath = logPath;
        this.logRetentionDays = logRetentionDays;
    }

    public String getAdminAddresses() {
        return adminAddresses;
    }

    public void setAdminAddresses(String adminAddresses) {
        this.adminAddresses = adminAddresses;
    }

    public String getAppName() {
        return appName;
    }

    public void setAppName(String appName) {
        this.appName = appName;
    }

    public String getIp() {
        return ip;
    }

    public void setIp(String ip) {
        this.ip = ip;
    }

    public Integer getPort() {
        return port;
    }

    public void setPort(Integer port) {
        this.port = port;
    }

    public String getAccessToken() {
        return accessToken;
    }

    public void setAccessToken(String accessToken) {
        this.accessToken = accessToken;
    }

    public String getLogPath() {
        return logPath;
    }

    public void setLogPath(String logPath) {
        this.logPath = logPath;
    }

    public Integer getLogRetentionDays() {
        return logRetentionDays;
    }

    public void setLogRetentionDays(Integer logRetentionDays) {
        this.logRetentionDays = logRetentionDays;
    }
}
screen-job/src/main/java/com/moral/api/jobHandler/TestHandler.java
New file @@ -0,0 +1,21 @@
package com.moral.api.jobHandler;

import org.springframework.stereotype.Component;

import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;

/**
 * Sample job handler registered under the name {@code testJobHandler}.
 */
@Component
public class TestHandler {

    /**
     * Reads the job parameter and writes it, plus a fixed marker line, to the job log.
     *
     * <p>Output goes through {@link XxlJobHelper#log(String, Object...)} instead of
     * {@code System.out}, so it is recorded in the per-run job log rather than only on
     * the executor's console.
     *
     * @return {@link ReturnT#SUCCESS}
     */
    @XxlJob("testJobHandler")
    public ReturnT<String> execute() {
        // Parameter configured for this job on the scheduler side
        String param = XxlJobHelper.getJobParam();
        XxlJobHelper.log("job param: {}", param);
        XxlJobHelper.log("定时任务测试");
        return ReturnT.SUCCESS;
    }
}
screen-job/src/main/resources/application-dev.yml
@@ -1,5 +1,5 @@ server: port: 8082 port: 8083 tomcat: uri-encoding: UTF-8 #最小线程数 @@ -79,7 +79,30 @@ call-setters-on-nulls: true log-impl: org.apache.ibatis.logging.stdout.StdOutImpl logging: config: classpath:logback.xml xxl: job: admin: # 调度中心部署地址:如调度中心集群部署存在多个地址则用逗号分隔。执行器将会使用该地址进行"执行器心跳注册"和"任务结果回调" addresses: http://172.16.44.70:8090/xxl-job-admin #执行器通讯TOKEN,无TOKEN则可以空白 accessToken: #执行器配置,可以配置多个 executor: # 执行器的名字和地址信息配置:是该执行器心跳注册分组依据; #地址信息用于"调度中心请求并触发任务"和"执行器注册"。 #执行器默认端口为9999,执行器IP默认为空表示自动获取IP,多网卡时可手动设置指定IP,该IP不会绑定Host仅作为通讯实用。 #单机部署多个执行器时,注意要配置不同执行器端口 appname: xxl-job-executor-sample-springboot ip: port: 9999 #执行器运行日志文件存储的磁盘位置,需要对该路径拥有读写权限 logpath: /data/applogs/xxl-job/jobhandler #执行器日志文件定期清理功能,指定日志保存天数,过期自动删除,最少保存3天否则不生效,-1则不启动。 logretentiondays: -1 screen-job/src/main/resources/logback.xml
New file @@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Logback configuration: every event at level info or above goes to both the console and a daily-rolled file. -->
<!-- NOTE(review): scan="true" with scanPeriod="1 seconds" asks logback to check this file for changes every second; confirm that such a short period is intended outside development. -->
<configuration debug="false" scan="true" scanPeriod="1 seconds">

    <contextName>logback</contextName>
    <!-- Path of the active log file; also used as the prefix of the rolled archives below. -->
    <property name="log.path" value="/data/applogs/xxl-job/xxl-job-executor-sample-springboot.log"/>

    <!-- Console output: time, context name, thread, level, logger and message. -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} %contextName [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>

    <!-- File output, rolled by date; the .zip suffix in the file name pattern selects zip compression for rolled files. -->
    <appender name="file" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${log.path}</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${log.path}.%d{yyyy-MM-dd}.zip</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%date %level [%thread] %logger{36} [%file : %line] %msg%n </pattern>
        </encoder>
    </appender>

    <!-- Root logger: level info, attached to both appenders. -->
    <root level="info">
        <appender-ref ref="console"/>
        <appender-ref ref="file"/>
    </root>

</configuration>
screen-manage/src/main/java/com/moral/api/controller/TestController.java
@@ -8,6 +8,7 @@ import com.moral.util.PageResult; import com.moral.util.TokenEncryptUtils; import com.moral.util.TokenUtils; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; @@ -18,6 +19,7 @@ import org.springframework.web.bind.annotation.*; import javax.annotation.Resource; import java.io.*; @@ -32,6 +34,7 @@ @Resource private TestService testService; /** * name 姓名 * email 郵箱 @@ -49,6 +52,7 @@ return ResultMessage.ok(); } /** * page 當前頁 * size 每頁大小 @@ -99,9 +103,12 @@ * kafka測試 */ @ApiOperation(value = "kafka測試", notes = "kafka測試") @ApiImplicitParams({ @ApiImplicitParam(name = "token", value = "token", required = true, paramType = "header", dataType = "String") }) @RequestMapping(value = "kafkaTest", method = RequestMethod.GET) public void kafkaTest() { kafkaTemplate.send("test_topic","test111111111111111"); kafkaTemplate.send("test_topic", "{'mac': 'p5dnd1234567','DataTime':1623058244104,'e1':10,'e2':20,'ver':2}"); } @GetMapping("testToken") screen-manage/src/main/java/com/moral/api/kafka/consumer/KafkaReceiver.java
New file @@ -0,0 +1,97 @@ package com.moral.api.kafka.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import java.util.HashMap; import java.util.Map; import com.alibaba.fastjson.JSON; import com.moral.api.service.DeviceService; import com.moral.api.service.HistoryHourlyService; import com.moral.api.service.HistoryMinutelyService; import com.moral.constant.KafkaConstants; /*@Slf4j @Component public class KafkaReceiver { @Autowired private HistoryMinutelyService historyMinutelyService; @Autowired private HistoryHourlyService historyHourlyService; @Autowired private DeviceService deviceService; //分钟数据 @KafkaListener(topics = KafkaConstants.TOPIC_MINUTE, groupId = KafkaConstants.GROUP_ID, containerFactory = "kafkaListenerContainerFactory") public void listenMinute(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { Map<String, Object> data = JSON.parseObject(msg, HashMap.class); System.out.println(data); Object mac = data.get("mac"); Object time = data.get("DataTime"); Object ver = data.get("ver"); if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) { log.warn("some properties is null, param[0] message:" + msg); return; } Map<String, Object> deviceInfo = deviceService.getDeviceByMac(mac.toString()); if (deviceInfo == null) { String deviceRealState = "null or deleted"; log.warn("device record is " + deviceRealState + ", param[0] message:" + msg); return; } //清除毫秒,四舍五入 data.put("DataTime", Math.round(new Double((Long) time) / 1000) * 1000); //存入数据库 historyMinutelyService.insertHistoryMinutely(data); ack.acknowledge(); } catch (Exception e) { log.error("param[0] message:" + 
msg); } } //小时数据 @KafkaListener(topics = KafkaConstants.TOPIC_HOUR, groupId = KafkaConstants.GROUP_ID, containerFactory = "kafkaListenerContainerFactory") public void listenHour(ConsumerRecord<String, String> record, Acknowledgment ack) { String msg = record.value(); try { Map<String, Object> data = JSON.parseObject(msg, HashMap.class); System.out.println(data); Object mac = data.get("mac"); Object time = data.get("DataTime"); Object ver = data.get("ver"); if (StringUtils.isEmpty(ver) || StringUtils.isEmpty(time) || StringUtils.isEmpty(mac)) { log.warn("some properties is null, param[0] message:" + msg); return; } Map<String, Object> deviceInfo = deviceService.getDeviceByMac(mac.toString()); if (deviceInfo == null) { String deviceRealState = "null or deleted"; log.warn("device record is " + deviceRealState + ", param[0] message:" + msg); return; } //清除毫秒,四舍五入 data.put("DataTime", Math.round(new Double((Long) time) / 1000) * 1000); //存入数据库 historyHourlyService.insertHistoryHourly(data); ack.acknowledge(); } catch (Exception e) { log.error("param[0] message:" + msg); } } }*/ screen-manage/src/main/java/com/moral/api/mapper/HistoryMinutelyMapper.java
New file @@ -0,0 +1,20 @@
package com.moral.api.mapper;

import java.util.Map;

import com.moral.api.entity.HistoryHourly;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
 * <p>
 * Mapper interface for the minutely history tables.
 * </p>
 *
 * @author moral
 * @since 2021-06-04
 */
public interface HistoryMinutelyMapper{

    /**
     * Inserts one row into a minutely history table.
     *
     * <p>The mapped statement in HistoryMinutelyMapper.xml reads these keys from the map:
     * {@code timeUnits} (appended to the table name {@code history_minutely_}), {@code mac},
     * {@code time}, {@code value} and {@code version}.
     *
     * @param params values for the insert, keyed as listed above
     * @return result of the insert statement (presumably the affected row count; confirm)
     */
    int insertHistoryMinutely(Map<String,Object> params);

}
screen-manage/src/main/java/com/moral/api/service/DeviceService.java
@@ -52,4 +52,13 @@ //根据设备id查询设备组织,站点,维护人,行业,工艺,设备检测器,采购商等信息 Map<String, Object> selectDeviceInfoById(Integer deviceId); //根据mac获取设备信息 Map<String, Object> getDeviceByMac(String mac); //设备数据校准 Map<String, Object> adjustDeviceData(Map<String, Object> deviceData, Map<String, Object> deviceInfo); //判断并缓存设备状态 Map<String, Object> judgeDeviceState(Map<String, Object> data, Map<String, Object> deviceInfo); } screen-manage/src/main/java/com/moral/api/service/HistoryMinutelyService.java
New file @@ -0,0 +1,20 @@
package com.moral.api.service;

import java.util.Map;

import com.baomidou.mybatisplus.extension.service.IService;
import com.moral.api.entity.HistoryHourly;

/**
 * <p>
 * Service for minutely history data.
 * </p>
 *
 * @author moral
 * @since 2021-06-04
 */
public interface HistoryMinutelyService {

    /**
     * Persists one minutely data message from a device.
     *
     * <p>HistoryMinutelyServiceImpl expects the keys {@code mac}, {@code ver} and
     * {@code DataTime} in the map and removes them from it; the remaining entries are
     * stored as the JSON value of the record. Callers that still need the original map
     * should pass a copy.
     *
     * @param deviceData decoded device message; modified by the implementation
     */
    void insertHistoryMinutely(Map<String, Object> deviceData);

}
screen-manage/src/main/java/com/moral/api/service/impl/DeviceServiceImpl.java
@@ -1,5 +1,6 @@ package com.moral.api.service.impl; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -22,11 +23,12 @@ import com.moral.api.util.LogUtils; import com.moral.constant.Constants; import com.moral.redis.RedisUtil; import com.moral.constant.RedisConstants; import com.moral.util.ConvertUtils; import com.moral.util.DateUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.context.request.RequestContextHolder; @@ -75,6 +77,49 @@ @Autowired private LogUtils logUtils; @Autowired private RedisTemplate redisTemplate; /* * 从redis获取设备信息 * */ private Map<String, Object> getDeviceInfoFromRedis(String mac) { Map<String, Object> deviceInfo = (Map<String, Object>) redisTemplate.opsForValue().get(RedisConstants.DEVICE + mac); return deviceInfo; } /* * 设备信息存入redis */ private void setDeviceInfoToRedis(String mac, Map<String, Object> deviceInfo) { redisTemplate.opsForValue().set(getDeviceKey(mac), deviceInfo); } /* * 从redis删除设备信息 */ private void delDeviceInfoFromRedis(String mac) { redisTemplate.delete(getDeviceKey(mac)); } /* * 获取设备信息在redis里的key */ private String getDeviceKey(String mac) { return keysConnect(RedisConstants.DEVICE, mac); } //redis key前缀 private String keysConnect(String... 
keys) { StringBuilder key = new StringBuilder(keys[0]); for (int i = 1; i < keys.length; i++) { key.append("_"); key.append(keys[i]); } return key.toString().toLowerCase(); } @Override @Transactional public void insert(Device device) { @@ -83,12 +128,15 @@ deviceMapper.insert(device); Map<String, Object> deviceInfo = selectDeviceInfoById(device.getId()); //新增设备信息存入redis RedisUtil.del("device_" + device.getMac()); RedisUtil.set("device_" + device.getMac(), deviceInfo); String mac = device.getMac(); //从redis中删除设备信息 delDeviceInfoFromRedis(mac); //设备信息存入redis setDeviceInfoToRedis(mac, deviceInfo); //操作日志记录 HttpServletRequest request = ((ServletRequestAttributes) Objects.requireNonNull(RequestContextHolder.getRequestAttributes())).getRequest(); StringBuilder content = new StringBuilder(); content.append("添加了设备:").append(device.getName()).append(";").append("mac:").append(device.getMac()); content.append("添加了设备:").append(device.getName()).append(";").append("mac:").append(mac); logUtils.saveOperationForManage(request, content.toString(), Constants.INSERT_OPERATE_TYPE); } @@ -100,8 +148,8 @@ deviceMapper.update(null, updateWrapper); Device device = deviceMapper.selectById(deviceId); String mac = device.getMac(); //清除redis RedisUtil.del("device_" + mac); //从redis中删除设备信息 delDeviceInfoFromRedis(mac); //操作日志记录 HttpServletRequest request = ((ServletRequestAttributes) Objects.requireNonNull(RequestContextHolder.getRequestAttributes())).getRequest(); StringBuilder content = new StringBuilder(); @@ -116,10 +164,11 @@ Device oldDevice = deviceMapper.selectById(deviceId); deviceMapper.updateById(device); String mac = deviceMapper.selectById(deviceId).getMac(); //更新redis RedisUtil.del("device_" + mac); //从redis中删除设备信息 delDeviceInfoFromRedis(mac); Map<String, Object> deviceInfo = selectDeviceInfoById(deviceId); RedisUtil.set("device_" + mac, deviceInfo); //设备信息存入redis setDeviceInfoToRedis(mac, deviceInfo); //操作日志记录 HttpServletRequest request = ((ServletRequestAttributes) 
Objects.requireNonNull(RequestContextHolder.getRequestAttributes())).getRequest(); StringBuilder content = new StringBuilder(); @@ -249,7 +298,7 @@ @Override public Map<String, Object> selectDeviceInfoById(Integer deviceId) { String mac = deviceMapper.selectById(deviceId).getMac(); Map<String, Object> deviceInfo = (Map<String, Object>) RedisUtil.get("device_" + mac); Map<String, Object> deviceInfo = getDeviceInfoFromRedis(mac); //先从redis中取 if (deviceInfo != null) { return deviceInfo; @@ -317,7 +366,7 @@ mpInfo.put("name", monitorPoint.getName()); deviceInfo.put("monitorPoint", mpInfo); RedisUtil.set("device_" + mac, deviceInfo); setDeviceInfoToRedis(mac, deviceInfo); return deviceInfo; } @@ -335,4 +384,30 @@ return monitorPointMapper.selectMaps(queryWrapper); } @Override public Map<String, Object> getDeviceByMac(String mac) { Map<String, Object> deviceInfo = getDeviceInfoFromRedis(mac); if (deviceInfo == null) { QueryWrapper<Device> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("mac", mac).eq("is_delete", Constants.NOT_DELETE); Device device = deviceMapper.selectOne(queryWrapper); if (device != null) { deviceInfo = selectDeviceInfoById(device.getId()); setDeviceInfoToRedis(mac, deviceInfo); } } return deviceInfo; } @Override public Map<String, Object> adjustDeviceData(Map<String, Object> deviceData, Map<String, Object> deviceInfo) { return null; } @Override public Map<String, Object> judgeDeviceState(Map<String, Object> deviceData, Map<String, Object> deviceInfo) { return null; } } screen-manage/src/main/java/com/moral/api/service/impl/HistoryMinutelyServiceImpl.java
New file @@ -0,0 +1,63 @@
package com.moral.api.service.impl;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.alibaba.fastjson.JSON;
import com.moral.api.mapper.HistoryMinutelyMapper;
import com.moral.api.service.DeviceService;
import com.moral.api.service.HistoryMinutelyService;
import com.moral.constant.Constants;
import com.moral.util.DateUtils;

/**
 * Stores minutely device data: first the raw values, then the calibrated values.
 */
@Service
public class HistoryMinutelyServiceImpl implements HistoryMinutelyService {

    @Autowired
    private HistoryMinutelyMapper historyMinutelyMapper;

    @Autowired
    private DeviceService deviceService;

    /**
     * Writes one device message to the minutely history tables.
     *
     * <p>Steps: (1) remove {@code mac}, {@code ver} and {@code DataTime} from the message and
     * insert the remaining entries, serialized as JSON, into the table whose suffix is the
     * formatted month plus {@code "_" + Constants.UN_ADJUST}; (2) look up the device by mac,
     * pass the remaining entries through {@code DeviceService.adjustDeviceData} and insert the
     * result into the table whose suffix is the formatted month only; (3) hand the calibrated
     * data to {@code DeviceService.judgeDeviceState}.
     *
     * @param deviceData decoded device message; {@code mac}, {@code ver} and {@code DataTime}
     *                   are removed from this map. {@code mac} and {@code DataTime} must be
     *                   present, and {@code DataTime} must be numeric (epoch milliseconds)
     */
    @Override
    public void insertHistoryMinutely(Map<String, Object> deviceData) {
        Map<String, Object> result = new HashMap<>();
        Object mac = deviceData.remove("mac");
        result.put("mac", mac);
        result.put("version", deviceData.remove("ver"));
        // Go through Number rather than casting to Long: a decoded JSON number may arrive
        // as Integer or Long, and a hard (Long) cast fails on the former.
        Date time = new Date(((Number) deviceData.remove("DataTime")).longValue());
        result.put("time", DateUtils.dateToDateString(time));
        result.put("value", JSON.toJSONString(deviceData));
        String timeUnits = DateUtils.dateToDateString(time, DateUtils.yyyyMM_EN);

        // Raw (not calibrated) data goes to the table with the extra suffix
        result.put("timeUnits", tableSuffix(timeUnits, Constants.UN_ADJUST));
        historyMinutelyMapper.insertHistoryMinutely(result);

        // Device info used for calibration and state evaluation
        Map<String, Object> deviceInfo = deviceService.getDeviceByMac(mac.toString());

        // Calibrate the data and store it in the table without the extra suffix.
        // NOTE(review): if adjustDeviceData returns null, the literal JSON text "null" is
        // stored as the value; confirm whether the insert should be skipped in that case.
        result.put("timeUnits", timeUnits);
        Map<String, Object> adjustedData = deviceService.adjustDeviceData(deviceData, deviceInfo);
        result.put("value", JSON.toJSONString(adjustedData));
        historyMinutelyMapper.insertHistoryMinutely(result);

        // Evaluate the device state; the returned map is not used here
        deviceService.judgeDeviceState(adjustedData, deviceInfo);
    }

    /**
     * Joins the given parts with {@code "_"} to form a table suffix.
     */
    private String tableSuffix(String... keys) {
        return String.join("_", keys);
    }
}
screen-manage/src/main/resources/mapper/HistoryMinutelyMapper.xml
New file @@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.moral.api.mapper.HistoryMinutelyMapper">

    <!--
        Inserts one row into the table history_minutely_ followed by the timeUnits value.
        timeUnits is written with ${}, so it is pasted into the SQL text as-is and is not a
        bound parameter; it must only ever come from server-side code, never from request input.
        mac, time, value and version are bound with #{}.
        The statement has no column list, so the VALUES order must match the column order of
        the target table (assumed to be mac, time, value, version; confirm against the schema).
    -->
    <insert id="insertHistoryMinutely">
        INSERT INTO history_minutely_${timeUnits} VALUES (#{mac}, #{time}, #{value}, #{version})
    </insert>
</mapper>