package com.moral.webSocketServer;
|
|
import java.io.IOException;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.CopyOnWriteArraySet;
|
|
import javax.websocket.OnClose;
|
import javax.websocket.OnError;
|
import javax.websocket.OnMessage;
|
import javax.websocket.OnOpen;
|
import javax.websocket.Session;
|
import javax.websocket.server.PathParam;
|
import javax.websocket.server.ServerEndpoint;
|
|
import com.moral.common.util.ParameterUtils;
|
import com.moral.entity.Device;
|
import com.moral.service.DeviceService;
|
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Connection;
|
import com.rabbitmq.client.ConnectionFactory;
|
import com.rabbitmq.client.QueueingConsumer;
|
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Component;
|
|
/**
|
* @ServerEndpoint 注解是一个类层次的注解,它的功能主要是将目前的类定义成一个websocket服务器端,
|
* 注解的值将被用于监听用户连接的终端访问URL地址,客户端可以通过这个URL来连接到WebSocket服务器端
|
*/
|
@Slf4j
|
@ServerEndpoint("/screen/webSocket/{param}")
|
@Component
|
public class WebSocketServer {
|
|
public static DeviceService deviceService;
|
|
// 与某个客户端的连接会话,需要通过它来给客户端发送数据
|
private Session session;
|
|
private String orgId;
|
|
private String accountId;
|
|
private String regionCode;
|
|
private Connection connection;
|
|
private Channel channel;
|
|
// 存放session的集合,很重要!!
|
private static CopyOnWriteArraySet<WebSocketServer> webSocketSet = new CopyOnWriteArraySet<WebSocketServer>();
|
|
/**
|
* 连接建立成功调用的方法
|
*
|
* @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据
|
*/
|
@OnOpen
|
public void onOpen(Session session, @PathParam("param") String param) {
|
this.session = session;
|
//这个一定要写,第一次很容易忽略!
|
webSocketSet.add(this);
|
int flag = param.indexOf("&");
|
int regionCodeIndex = param.indexOf("_");
|
orgId = param.substring(0, flag);
|
accountId = param.substring(flag + 1,regionCodeIndex);
|
regionCode = param.substring(regionCodeIndex + 1);
|
String QUEUE_NAME = "deviceInfo_" + accountId;
|
Map<String, Object> paramMap = new HashMap<String, Object>();
|
paramMap.put("orgId", orgId);
|
paramMap.put("regionCode", regionCode);
|
ParameterUtils.getRegionType4RegionCode(paramMap);
|
List<Device> deviceList = deviceService.queryDevice(paramMap);
|
try {
|
//打开连接和创建频道,与发送端一样
|
ConnectionFactory factory = new ConnectionFactory();
|
//设置MabbitMQ所在主机ip或者主机名
|
factory.setHost("47.96.15.25");
|
factory.setPort(5672);
|
factory.setUsername("guest");
|
factory.setPassword("guest_pass");
|
String routingKey;
|
connection = factory.newConnection();
|
channel = connection.createChannel();
|
//声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
|
channel.queueDeclare(QUEUE_NAME, false, false, true, null);
|
for (Device d : deviceList) {
|
routingKey = orgId + "." + d.getMac();
|
channel.queueBind(QUEUE_NAME, "screens_data", routingKey);
|
}
|
//创建队列消费者
|
QueueingConsumer consumer = new QueueingConsumer(channel);
|
//指定消费队列
|
channel.basicConsume(QUEUE_NAME, true, consumer);
|
|
while (true) {
|
//nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
|
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
|
String message = new String(delivery.getBody());
|
sendMessage(message);
|
}
|
} catch (Exception e) {
|
log.error(e.getMessage());
|
}
|
}
|
|
/**
|
* 连接关闭调用的方法
|
*/
|
@OnClose
|
public void onClose() {
|
/**从安全Set中 移除当前连接对象*/
|
webSocketSet.remove(this);
|
try {
|
connection.close();
|
} catch (IOException e) {
|
log.error(e.getMessage());
|
}
|
}
|
|
@OnMessage
|
public void onMessage(String message) {
|
for (WebSocketServer webSocketServer : webSocketSet) {
|
webSocketServer.sendMessage(message);
|
}
|
}
|
|
/* @RabbitListener(bindings = @QueueBinding(
|
value = @Queue(value = "deviceInfo", durable = "false", autoDelete = "true"),
|
exchange = @Exchange(value = "screens_data", durable = "true", type = "topic"),
|
key = "99.*"
|
))
|
@RabbitHandler //注解意思:如果有消息过来 需要消费的时候才会调用该方法
|
public void receiveMessage(@Payload String message, @Headers Map<String, Object> headers, Channel channel) throws IOException {
|
//sendMessage(message.toString());
|
onMessage(message);
|
*//* Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
|
//手动接受并告诉rabbitmq消息已经接受了 deliverTag记录接受消息 false不批量接受
|
channel.basicAck(deliveryTag, true);*//*
|
}*/
|
|
/**
|
* 服务器端推送消息
|
*/
|
public void sendMessage(String message) {
|
try {
|
if (session.isOpen()) {
|
this.session.getBasicRemote().sendText(message);
|
}
|
} catch (IOException e) {
|
log.error(e.getMessage());
|
}
|
}
|
|
/**
|
* 发生错误时调用
|
*
|
* @param session
|
* @param error
|
*/
|
@OnError
|
public void onError(Session session, Throwable error) {
|
log.error(error.getMessage());
|
}
|
|
}
|