服务器推送消息方法总结及实现(java)

服务器推送消息方法总结及实现(java)

最近在进行web开发时,有用到服务端推送消息这个功能,相信大家在平常开发时,也经常会有这种需求。本文对常用的几种服务器推送消息方法进行整理和总结,并实现使用流的方式推送消息(java)。

服务器推送消息主要有一下几种方法:

  • 轮询
  • http流
  • websocket
  • http2.0

下面对各个方法一一进行介绍。

轮询

轮询分为短轮询和长轮询。
短轮询即浏览器定时向服务器发送请求,以此来更新数据的方法。如下图所示

服务器推送消息方法总结及实现(java)

(图片来自javascript高级程序设计第三版)

浏览器每隔一段时间向服务器发送一次请求,请求浏览器想要的数据。严格意义上讲:短轮询不是服务器推送的消息,获取的数据也不是实时的

实现原理:

在浏览器使用定时器setTimeout或setInterval即可。不进行讲解了。

长轮询长轮询是短轮询的一个翻版,或者叫改进版。浏览器向服务器发送一个请求看有没有数据,有数据就响应,没数据就保持该请求,知道有数据再返回。浏览器在服务器返回数据时再发送一个请求。这样浏览器就可以一直获取到最新的数据。长轮询的时间线如下图所示

服务器推送消息方法总结及实现(java)

(图片来自javascript高级程序设计第三版)

实现原理:

在请求响应时,再次发送一个数据请求即可。

http流

流不同于上述两种轮询,因为它在页面的整个生命周期内只使用一个 HTTP 连接。具体来说,就是浏览器向服务器发送一个请求,而服务器一直保持连接打开,然后周期性地向浏览器发送数据。

实现:

本例以spring boot框架为基础,github地址如下:https://github.com/xubaodian/JAVA-SSE

下载该实例,并启动。该实例端口号为10000。

我们先对实例进行验证测试,然后再讲解代码。

测试页面地址为:http://localhost:10000/subscribe.html

测试步骤如下:

1、进入http://localhost:10000/subscribe.html,页面如下图所示:

服务器推送消息方法总结及实现(java)

左侧是订阅消息的操作和展示页面,右侧是发布内容的页面。
2、左侧输入订阅消息主题,点击订阅,订阅相关主题消息,例如:输入财经新闻主题FinancialNews,点击订阅,这样就订阅了财经新闻了。
3、在右侧发布内容页面输入主题和内容,点击发布。这样就可以发布内容了。

测试结果如下:

服务器推送消息方法总结及实现(java)

订阅了财经主题新闻,右侧发布了5条新闻,3条财经新闻,一条天气新闻,一条时政新闻,订阅者收到了3条财经新闻推送信息,证明我们工程已经跑起来了,实现了http流推送的最基本功能。

下面,对工程代码进行分析:

java代码

该工程使用spring boot框架,项目端口号为10000,接口代码如下:

package com.xbd.pushdata.controller;

import com.xbd.pushdata.Utils.ReqContextUtils;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@RestController
public class SubscribeController {

    @RequestMapping("/subscribe")
    public void subscribe(HttpServletRequest req, HttpServletResponse res, @RequestParam("topic") String topic) {
        ReqContextUtils.addSubscrib(topic, req, res);
    }

    @RequestMapping("/publish")
    public void publish(@RequestParam("topic") String topic, @RequestParam("content") String content) {
        ReqContextUtils.publishMessage(topic, content);
    }

}

有两个接口:

"/subscribe"接口:用于消息订阅,该接口有一个参数topic,即订阅的消息主题。

"/publish"接口:发布消息接口,有两个参数,topic是发布消息主题,content是发布消息内容。

订阅和发布消息的才做都封装在ReqContextUtils类中,ReqContextUtils的代码如下,代码中注释比较多,不再讲解了:

package com.xbd.pushdata.Utils;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;

public class ReqContextUtils {
    //超时时间
    private static int DEFAULT_TIME_OUT = 60*60*1000;
    //订阅列表,存储所有主题的订阅请求,每个topic对应一个ArrayList,ArrayList里该topic的所有订阅请求
    private static HashMap<String, ArrayList<AsyncContext>> subscribeArray = new LinkedHashMap<>();

    //添加订阅消息
    public static void addSubscrib(String topic, HttpServletRequest request, HttpServletResponse response) {
        if (null == topic || "".equals(topic)) {
            return;
        }
        //设置响应头ContentType
        response.setContentType("text/event-stream");
        //设置响应编码类型
        response.setCharacterEncoding("UTF-8");
        //request.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
        //支持异步响应
        //异步这个概念很多地方都有,就像处理文件时,不是一直等待文件读完,而是让它去读,cpu做其它事情,读完通知cpu来处理即可。
        AsyncContext actx = request.startAsync(request, response);
        actx.setTimeout(DEFAULT_TIME_OUT);
        //添加一些监听函数
        actx.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) throws IOException {
                System.out.println("推送结束");
            }

            @Override
            public void onTimeout(AsyncEvent event) throws IOException {
                System.out.println("推送超时");
            }

            @Override
            public void onError(AsyncEvent event) throws IOException {
                System.out.println("推送错误");
            }

            @Override
            public void onStartAsync(AsyncEvent event) throws IOException {
                System.out.println("推送开始");
            }
        });
        //将异步请求存入列表
        ArrayList<AsyncContext> actxList = subscribeArray.get(topic);
        if (null == actxList) {
            actxList = new ArrayList<AsyncContext>();
            subscribeArray.put(topic, actxList);
        }
        actxList.add(actx);
    }

    //获取订阅列表
    public static ArrayList<AsyncContext> getSubscribList(String topic) {
        return subscribeArray.get(topic);
    }

    //推送消息
    public static void publishMessage(String topic, String content) {
        //获取对应topic的订阅列表
        ArrayList<AsyncContext> actxList = subscribeArray.get(topic);
        if (null != actxList) {
            for(AsyncContext actx :actxList) {
                try {
                    PrintWriter out = actx.getResponse().getWriter();
                    out.print(content);
                    actx.getResponse().flushBuffer();
                    //out.flush();
                } catch (Exception e) {
                    e.printStackTrace();
                }

            }
        }
    }
}

前端代码

前端代码如下,主要就是2个请求,代码中有注释,不再讲解了。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>订阅消息</title>
    <style>
        .left-container {
            float: left;
            width: 350px;
            min-height: 300px;
            border-right: 3px solid #4b4b4b;
        }
        .left-container li{
            text-overflow: ellipsis;
            white-space: nowrap;
            overflow: hidden;
        }
        .right-container{
            padding-left: 30px;
            float: left;
            width: 350px;
        }
    </style>
</head>
<body>
<div class="left-container">
    <label>订阅主题</label>
    <input type="text" id="topic">
    <button onclick="subscribe()">订阅</button>
    <div>收到消息如下:</div>
    <ul id="message"></ul>
</div>
<div class="right-container">
    <div>
        <label>消息主题</label>
        <input type="text" id="pub_topic">
    </div>
    <div>
        <label>消息内容</label>
        <input type="text" id="pub_content">
    </div>
    <button onclick="publish()">发布</button>
    <div>发布消息和内容如下:</div>
    <ul id="pub_message"></ul>
</div>

<script>
    function subscribe() {
        let topic = document.getElementById('topic').value;
        let url = location.origin + '/subscribe?topic=' + topic;
        send(url, null, process);
    }

    //发送订阅消息
    function send(url, data, callback) {
        let xhr = new XMLHttpRequest();
        xhr.onreadystatechange = function(){
            //http流的响应时,xhr.readyState为3
            if (xhr.readyState == 3 || xhr.readyState == 4){
                if (callback) {
                    callback(xhr.responseText);
                }
            }
        };
        xhr.open('get', url, true);
        xhr.send(data);
    }

    let len = 0;
    //处理订阅消息
    function process(messsage) {
        let li = document.createElement('li');
        li.innerHTML = messsage.substr(len);
        len = messsage.length;
        let ul = document.getElementById('message');
        ul.appendChild(li);
    }

    //发布消息
    function publish() {
        let topic = document.getElementById('pub_topic').value;
        let content = document.getElementById('pub_content').value;
        let url = location.origin + '/publish?topic=' + topic + '&content=' + content;
        send(url, null, null);
        let li = document.createElement('li');
        li.innerHTML = `发布主题:${topic}; 发布内容:${content}`;
        let ul = document.getElementById('pub_message');
        ul.appendChild(li);
    }
</script>
</body>
</html>

webSocket推送消息

Web Sockets 的是在一个单独的持久连接上提供全双工、双向通信。在 JavaScript 中创建了 Web Socket 之后,会有一个 HTTP 请求发送到浏览器以发起连接。在取得服务器响应后,建立的连接会从 HTTP 协议升级为 Web Socket 协议。

使用spring框架可以很容易实现websocket,这是spring实现websocket的官方教程(非常详细)地址:https://spring.io/guides/gs/messaging-stomp-websocket/ ,需要的可以移步官方网页学习。

http2.0

http2.0的特点是首部压缩,多路复用,请求响应管线化,服务器推送等等,这些特点是建立在http2.0流的基础上的。

具体想要学习http2.0的同学可以上网找下资料,这里只是提一下,不做过多描述了。