SpringBoot中使用websocket.md
最近有这样一个需求,网关厂家将物联设备接入我司云平台的时候,希望能看到上报设备数据的关键日志,以方便调试。
首先想到的就是使用websocket推送。浏览器发起websocket连接,发送订阅消息,然后往这个连接session中推送日志。
整个设计流程如下图:
1.实现
我们设计两个类,一个类命名为WebSocketServer 用来管理websocket连接以及发送消息;另一个类命名为WebSocketBus用来管理WebSocketServer 对象,以及接收设备日志后匹配对应的WebSocketServer 对象,将日志信息推送到浏览器。
Talk is cheap, show me the code!
@ServerEndpoint(value="/websocket/message")
@Component
public class WebSocketServer {
private Logger log = Logger.getLogger("WebSocket");
private WebSocketBus webSocketBus;
private Session session;//与某个客户端的连接会话,需要通过它来给客户端发送数据
private String key;//订阅日志的标识
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
//在线数加1
String success = "websocket连接成功!";
try {
sendMessage(success);
} catch (IOException e) {
log.error(e,e);
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
webSocketBus.removeServer(key,this); //从set中删除
log.info("============= 有一连接关闭!key=" + key+",sessionId="+session.getId());
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息*/
@OnMessage
public void onMessage(String message, Session session) {
if(StringUtil.isEmpty(message)) {
return;
}
HashMap<String,String> javaObject = JacksonUtil.toJavaObject(message, HashMap.class);
if(javaObject == null){
log.info("unknown message: " + message);
return ;
}
String operation = javaObject.get("operation");
String key = javaObject.get("key");
String uuid=(String)javaObject.get("uuid");
if(StringUtil.isEmpty(operation)){
log.info("unknown operation : " + message);
return ;
}
if("register".equals(operation)){
this.key = key;
getWebSocketBus().addServer(key, this);
log.info("[+] register key="+key+",account="+",uuid="+uuid);
}else if("unRegister".equals(operation)){
webSocketBus.removeServer(key, this);
log.info("[+] unregister key="+key+",account="+",uuid="+uuid);
}
}
private WebSocketBus getWebSocketBus() {
// TODO Auto-generated method stub
if(this.webSocketBus == null) {
webSocketBus = (WebSocketBus) SpringContextUtil.getBean("webSocketBus");
}
return webSocketBus;
}
/**
* 发生错误时调用
*/
@OnError
public void onError(Session session, Throwable error) {
log.error(error,error);
}
public void sendMessage(String message) throws IOException {
// this.session.getBasicRemote().sendText(message);
this.session.getAsyncRemote().sendText(message);
}
public String getkey() {
return key;
}
}
@Component
public class WebSocketBus {
private Logger log = Logger.getLogger(this.getClass());
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Value(value="${LogRegistTopic}")
private String logRegister;//订阅|取消订阅topic
private static AtomicInteger onlineCount = new AtomicInteger(0);
private Map<String,Set<WebSocketServer>> sessionCache = new ConcurrentHashMap<>();
private Gson gson = new GsonBuilder().create();//线程安全的,大胆用吧
@KafkaListener(id = "log",topics = {"${LogTopic}"})
public void listen(ConsumerRecord<String, ?> record) {
Optional kafkaMessage = Optional.ofNullable(record.value());
Optional<String> kafkaKey = Optional.ofNullable(record.key());
if (kafkaKey.isPresent()) {
Object value = kafkaMessage.get();
String key = kafkaKey.get();
GatewayFormatLog gatewayLog = gson.fromJson((String)value, GatewayFormatLog.class);
if(sessionCache.containsKey(key)) {
Set<WebSocketServer> set = sessionCache.get(key);
for(WebSocketServer server :set) {
try {
server.sendMessage(gatewayLog.getMessage().toString());
} catch (IOException e) {
log.error(e,e);
}
}
}
}
}
public void addServer(String key,WebSocketServer server) {
boolean notRegisted = true;
if(sessionCache.containsKey(key)) {
Set<WebSocketServer> set = sessionCache.get(key);
if(set.contains(server)){
notRegisted = false;
}else {
set.add(server);
}
}else {
Set<WebSocketServer> set = new CopyOnWriteArraySet<WebSocketServer>();
set.add(server);
sessionCache.put(key, set);
}
if(notRegisted) {
kafkaTemplate.send(logRegister, gson.toJson(new DataEvent("register",key)));
}
}
public void removeServer(String key,WebSocketServer server) {
if(sessionCache.containsKey(key)) {
Set<WebSocketServer> set = sessionCache.get(key);
set.remove(server);
kafkaTemplate.send(logRegister, gson.toJson(new DataEvent("unregister",key)));
}
}
public static int getOnlineCount() {
return onlineCount.get();
}
public static void addOnlineCount() {
onlineCount.incrementAndGet();
}
public static synchronized void subOnlineCount() {
onlineCount.decrementAndGet();
}
}
WebSocketServer负责建立连接,以及收发websocket消息。浏览器每发起一个连接,都对应一个WebSocketServer对象。
WebSocketBus管理多个WebSocketServer对象。执行过程如下:
1.WebSocketServer的onMessage接收订阅消息,并将订阅消息交给WebSocketBus处理
2.1 如果是订阅消息,执行WebSocketBus.addServer,同一个key对应一个或多个websocket连接(就是多个客户端订阅了同一个key)。我们用CopyOnWriteArraySet存放WebSocketServer,如果是同一个对象,则不会重复添加。如果这个key没有被订阅过,就往kafka中发一条订阅消息。设备服务消费
2.2 如果是取消订阅服务,过程类似,往kafka中发一条消息订阅消息。
3.设备服务会将订阅的日志发送到Kafka中。WebSocketBus的listen来消费,根据key匹配到多个WebSocketServer,向每一个WebSocketServer推送消息。
下面是前端的测试代码:
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>WebSocket/SockJS)</title>
<script src="http://cdn.sockjs.org/sockjs-0.3.min.js"></script>
<script type="text/javascript">
var websocket = null;
//判断当前浏览器是否支持WebSocket
if('WebSocket' in window){
websocket = new WebSocket("ws://localhost:9100/websocket/message");
}
else{
alert('Not support websocket')
}
//连接发生错误的回调方法
websocket.onerror = function(){
setMessageInnerHTML("error");
};
//连接成功建立的回调方法
websocket.onopen = function(event){
setMessageInnerHTML("open");
}
//接收到消息的回调方法
websocket.onmessage = function(){
setMessageInnerHTML(event.data);
}
//连接关闭的回调方法
websocket.onclose = function(){
setMessageInnerHTML("close");
}
//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
window.onbeforeunload = function(){
websocket.close();
}
//将消息显示在网页上
function setMessageInnerHTML(innerHTML){
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
//关闭连接
function closeWebSocket(){
websocket.close();
}
//发送消息
function send(){
var message = document.getElementById('text').value;
websocket.send(message);
}
</script>
</head>
<body>
Welcome<br/>
<input id="text" type="text" />
<button onclick="send()">Send</button>
<button onclick="closeWebSocket()">Close</button>
<div id="message"></div>
</body>
</html>
2. 使用@ServerEndpoint无法注入Bean
你可能注意到了,WebSocketServer中使用WebSocketBus时,并没有使用@Autowired,为什么呢?实际上使用@Autowired注入之后,没有注入成功,使用时webSocketBus还是为null。
我们在WebSocketServer类上使用了@Component注解。虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean。
查了一下源码:
public class DefaultServerEndpointConfigurator
extends ServerEndpointConfig.Configurator {
@Override
public <T> T getEndpointInstance(Class<T> clazz)
throws InstantiationException {
try {
return clazz.getConstructor().newInstance();
} catch (InstantiationException e) {
throw e;
} catch (ReflectiveOperationException e) {
InstantiationException ie = new InstantiationException();
ie.initCause(e);
throw ie;
}
}
}
使用@ServerEndpoint注解之后,无法自动注入Bean。每次创建一个新的连接之后,都是用反射创建一个对象,中间没有从Sprin容器中找相应的Bean。
所以我们要么自己获取Bean,要么将注入的Bean设置为static,让其注入到类上。
工具类获取Spring Bean,只需实现ApplicationContextAware 接口即可。
@Component
public class SpringContextUtil implements ApplicationContextAware {
private static ApplicationContext applicationContext; //Spring应用上下文环境
private static Properties properties=new Properties();
/**
* 实现ApplicationContextAware接口的回调方法,设置上下文环境
* @param applicationContext
* @throws BeansException
*/
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
SpringContextUtil.applicationContext = applicationContext;
}
/**
* @return ApplicationContext
*/
public static ApplicationContext getApplicationContext() {
return applicationContext;
}
* 获取对象
* @param name
* @return Object 一个以所给名字注册的bean的实例
* @throws BeansException
*/
public static Object getBean(String name) throws BeansException {
if(applicationContext==null) {
return null;
}
return applicationContext.getBean(name);
}
}