package com.moral.api.websocket; import com.moral.api.entity.Device; import com.moral.api.entity.MonitorPoint; import com.moral.api.entity.Sensor; import com.moral.api.entity.UnitConversion; import com.moral.constant.RedisConstants; import lombok.Data; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.PathVariable; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; /** * @ClassName SingleDeviceServer * @Description TODO * @Author 陈凯裕 * @Date 2021/6/15 13:56 * @Version TODO **/ @ServerEndpoint("/singleDevice/{mac}") @Component @Data public class SingleDeviceServer { //线程安全集合,用于存放server对象 public static CopyOnWriteArraySet sockets = new CopyOnWriteArraySet<>(); public static RedisTemplate redisTemplate; private Session session; private String mac; private Device deviceAlarmInfo; private List unitConversions; private Map regionAqi; @OnOpen public void onOpen(Session session, @PathParam("mac") String mac) throws Exception { this.session = session; this.mac = mac; this.deviceAlarmInfo = (Device) redisTemplate.opsForHash().get(RedisConstants.DEVICE_INFO, mac); this.unitConversions = redisTemplate.opsForList().range(RedisConstants.UNIT_CONVERSION, 0, -1); //获取设备地区对应的AQI用于补偿使用 Map deviceInfo = (Map) redisTemplate.opsForHash().get(RedisConstants.DEVICE, mac); Map monitorPointMap = (Map) deviceInfo.get("monitorPoint"); String areaCode = String.valueOf(monitorPointMap.get("areaCode")); String cityCode = String.valueOf(monitorPointMap.get("cityCode")); try { this.regionAqi = (Map) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, areaCode); if (ObjectUtils.isEmpty(this.regionAqi)) this.regionAqi = (Map) redisTemplate.opsForHash().get(RedisConstants.AQI_DATA, cityCode); } catch (Exception e) { e.printStackTrace(); } sockets.add(this); } @OnClose public void onClose() { sockets.remove(this); } @OnMessage public void onMessage(String message, Session session) { System.out.println(message); } @OnError public void onError(Session session, Throwable error) { } public void sendMessage(String message) throws Exception { if (this.session.isOpen()) { // synchronized (session) { this.session.getBasicRemote().sendText(message); // } } } }