SpringBoot中使用websocket.md

最近有这样一个需求,网关厂家将物联设备接入我司云平台的时候,希望能看到上报设备数据的关键日志,以方便调试。
首先想到的就是使用websocket推送。浏览器发起websocket连接,发送订阅消息,然后往这个连接session中推送日志。

整个设计流程如下图:

SpringBoot中使用websocket.md

1.实现

我们设计两个类,一个类命名为WebSocketServer 用来管理websocket连接以及发送消息;另一个类命名为WebSocketBus用来管理WebSocketServer 对象,以及接收设备日志后匹配对应的WebSocketServer 对象,将日志信息推送到浏览器。

Talk is cheap, show me the code!

@ServerEndpoint(value="/websocket/message")
@Component
public class WebSocketServer {
    
    private Logger log = Logger.getLogger("WebSocket");
    
    private WebSocketBus webSocketBus;

    
    private Session session;//与某个客户端的连接会话,需要通过它来给客户端发送数据
    
    private String key;//订阅日志的标识
    
    
    

    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(Session session) {
        this.session = session;  
         //在线数加1
   
        String success = "websocket连接成功!";
        try {
            sendMessage(success);
        } catch (IOException e) {
            log.error(e,e);
        }
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        webSocketBus.removeServer(key,this);  //从set中删除
        log.info("============= 有一连接关闭!key=" + key+",sessionId="+session.getId());
    }

    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息*/
    @OnMessage
    public void onMessage(String message, Session session) {
        if(StringUtil.isEmpty(message)) {
            return;
        }
        HashMap<String,String> javaObject = JacksonUtil.toJavaObject(message, HashMap.class);
        if(javaObject == null){
             log.info("unknown message: " + message);
             return ;
         }
        String operation = javaObject.get("operation");
        String key = javaObject.get("key");
        String uuid=(String)javaObject.get("uuid");
        if(StringUtil.isEmpty(operation)){
            log.info("unknown operation : " + message);
            return ;
        }
        
        if("register".equals(operation)){
            this.key = key;
            getWebSocketBus().addServer(key, this);
    
            log.info("[+]     register key="+key+",account="+",uuid="+uuid);
        }else if("unRegister".equals(operation)){
            webSocketBus.removeServer(key, this); 
            log.info("[+]     unregister key="+key+",account="+",uuid="+uuid);
        }
             
        
    }

    private WebSocketBus getWebSocketBus() {
        // TODO Auto-generated method stub
        if(this.webSocketBus == null) {
            webSocketBus =  (WebSocketBus) SpringContextUtil.getBean("webSocketBus");
        }
        return webSocketBus;
        
    }

    /**
     * 发生错误时调用
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error(error,error);
    }


    public void sendMessage(String message) throws IOException {
//        this.session.getBasicRemote().sendText(message);
        this.session.getAsyncRemote().sendText(message);
    }


    public String getkey() {
        return key;
    } 
}

@Component
public class WebSocketBus {
    private Logger log = Logger.getLogger(this.getClass());
    
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    
        
    @Value(value="${LogRegistTopic}")
    private String logRegister;//订阅|取消订阅topic
    
    private static AtomicInteger onlineCount = new AtomicInteger(0);
    
    private Map<String,Set<WebSocketServer>> sessionCache = new ConcurrentHashMap<>();
    private Gson gson = new GsonBuilder().create();//线程安全的,大胆用吧
    
    
    @KafkaListener(id = "log",topics = {"${LogTopic}"})
    public void listen(ConsumerRecord<String, ?> record) {
        Optional kafkaMessage = Optional.ofNullable(record.value());
        Optional<String> kafkaKey = Optional.ofNullable(record.key());
        if (kafkaKey.isPresent()) {
            Object value = kafkaMessage.get();
            String key = kafkaKey.get();
            GatewayFormatLog gatewayLog = gson.fromJson((String)value, GatewayFormatLog.class);
            if(sessionCache.containsKey(key)) {
                Set<WebSocketServer> set = sessionCache.get(key);
                for(WebSocketServer server :set) {
                    try {
                            server.sendMessage(gatewayLog.getMessage().toString());
                    } catch (IOException e) {
                        log.error(e,e);
                    }
                }
            }
        }
    }
    
    
    
    public void addServer(String key,WebSocketServer server) {
        boolean notRegisted = true;
        if(sessionCache.containsKey(key)) {
            Set<WebSocketServer> set = sessionCache.get(key);
            if(set.contains(server)){
                notRegisted = false;
            }else {
                set.add(server);
            }
            
        }else {
            Set<WebSocketServer> set = new  CopyOnWriteArraySet<WebSocketServer>();
            set.add(server);
            sessionCache.put(key, set);
        }
        if(notRegisted) {
            kafkaTemplate.send(logRegister, gson.toJson(new DataEvent("register",key)));
        }
        
    }
    
    public void removeServer(String key,WebSocketServer server) {
        if(sessionCache.containsKey(key)) {
            Set<WebSocketServer> set = sessionCache.get(key);
            set.remove(server);
            kafkaTemplate.send(logRegister, gson.toJson(new DataEvent("unregister",key)));
        }
    }
    
    public static int getOnlineCount() {
        return onlineCount.get();
    }

    public static void addOnlineCount() {
        onlineCount.incrementAndGet();
    }

    public static synchronized void subOnlineCount() {
        onlineCount.decrementAndGet();
    }

}

WebSocketServer负责建立连接,以及收发websocket消息。浏览器每发起一个连接,都对应一个WebSocketServer对象。
WebSocketBus管理多个WebSocketServer对象。执行过程如下:
1.WebSocketServer的onMessage接收订阅消息,并将订阅消息交给WebSocketBus处理
2.1 如果是订阅消息,执行WebSocketBus.addServer,同一个key对应一个或多个websocket连接(就是多个客户端订阅了同一个key)。我们用CopyOnWriteArraySet存放WebSocketServer,如果是同一个对象,则不会重复添加。如果这个key没有被订阅过,就往kafka中发一条订阅消息。设备服务消费
2.2 如果是取消订阅服务,过程类似,往kafka中发一条消息订阅消息。

3.设备服务会将订阅的日志发送到Kafka中。WebSocketBus的listen来消费,根据key匹配到多个WebSocketServer,向每一个WebSocketServer推送消息。

下面是前端的测试代码:


<!DOCTYPE html>
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <title>WebSocket/SockJS)</title>

    <script src="http://cdn.sockjs.org/sockjs-0.3.min.js"></script>

    <script type="text/javascript">
        var websocket = null;

        //判断当前浏览器是否支持WebSocket
        if('WebSocket' in window){
            websocket = new WebSocket("ws://localhost:9100/websocket/message");
        }
        else{
            alert('Not support websocket')
        }

        //连接发生错误的回调方法
        websocket.onerror = function(){
            setMessageInnerHTML("error");
        };

        //连接成功建立的回调方法
        websocket.onopen = function(event){
            setMessageInnerHTML("open");
        }

        //接收到消息的回调方法
        websocket.onmessage = function(){
            setMessageInnerHTML(event.data);
        }

        //连接关闭的回调方法
        websocket.onclose = function(){
            setMessageInnerHTML("close");
        }

        //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。
        window.onbeforeunload = function(){
            websocket.close();
        }

        //将消息显示在网页上
        function setMessageInnerHTML(innerHTML){
            document.getElementById('message').innerHTML += innerHTML + '<br/>';
        }

        //关闭连接
        function closeWebSocket(){
            websocket.close();
        }

        //发送消息
        function send(){
            var message = document.getElementById('text').value;
            websocket.send(message);
        }

    </script>
</head>
<body>
Welcome<br/>
<input id="text" type="text" />
<button onclick="send()">Send</button>
<button onclick="closeWebSocket()">Close</button>
<div id="message"></div>
</body>
</html>

2. 使用@ServerEndpoint无法注入Bean

你可能注意到了,WebSocketServer中使用WebSocketBus时,并没有使用@Autowired,为什么呢?实际上使用@Autowired注入之后,没有注入成功,使用时webSocketBus还是为null。
我们在WebSocketServer类上使用了@Component注解。虽然@Component默认是单例模式的,但springboot还是会为每个websocket连接初始化一个bean。
查了一下源码:

public class DefaultServerEndpointConfigurator
        extends ServerEndpointConfig.Configurator {

    @Override
    public <T> T getEndpointInstance(Class<T> clazz)
            throws InstantiationException {
        try {
            return clazz.getConstructor().newInstance();
        } catch (InstantiationException e) {
            throw e;
        } catch (ReflectiveOperationException e) {
            InstantiationException ie = new InstantiationException();
            ie.initCause(e);
            throw ie;
        }
    }
}

使用@ServerEndpoint注解之后,无法自动注入Bean。每次创建一个新的连接之后,都是用反射创建一个对象,中间没有从Sprin容器中找相应的Bean。
所以我们要么自己获取Bean,要么将注入的Bean设置为static,让其注入到类上。
工具类获取Spring Bean,只需实现ApplicationContextAware 接口即可。

@Component
public class SpringContextUtil implements ApplicationContextAware {
     private static ApplicationContext applicationContext;     //Spring应用上下文环境
     private static Properties properties=new Properties();
      /**
      * 实现ApplicationContextAware接口的回调方法,设置上下文环境   
      * @param applicationContext
      * @throws BeansException
      */
      public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
          SpringContextUtil.applicationContext = applicationContext;
      }
     
      /**
      * @return ApplicationContext
      */
      public static ApplicationContext getApplicationContext() {
        return applicationContext;
      }
      
      * 获取对象   
      * @param name
      * @return Object 一个以所给名字注册的bean的实例
      * @throws BeansException
      */
      public static Object getBean(String name) throws BeansException {
          if(applicationContext==null) {
              return null;
          }
        return applicationContext.getBean(name);
      }
}