WebSocket + kafka实时推送数据(springboot纯后台)
逻辑:消费者订阅 Kafka 主题 → 消费到消息后通过 WebSocket 推送到前端
kafka:http://kafka.apachecn.org/
webSocket:学习引用 https://blog.csdn.net/moshowgame/article/details/80275084
项目结构:
kafka消费者service:
package com.xx.demo.service; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.serializer.StringDecoder; import kafka.utils.VerifiableProperties; import org.springframework.stereotype.Service; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; /** * Created by csn on 2020/1/7. */ @Service public class KafkaComsumer { // 接收主题消费者 private static ConsumerConnector backconsumer; // 接收主题 public static String BACKTOPIC = "websocketTopic"; boolean starttype = false; public void initkafka() { if (starttype) { return; } try { initBackConsumer(); startBackConsumer(); starttype = true; } catch (Exception e) { starttype = false; backconsumer.shutdown(); } } /** * 初始化接收主题消费者 */ private static void initBackConsumer() { Properties props = new Properties(); // zookeeper 集群地址 props.put("zookeeper.connect", "xx.xx.xx.xx:2181,2xx.xx.xx.xx:2181,xx.xx.xx.xx:2181"); // group 代表一个消费组 props.put("group.id", "csnWebSocket-group"); //组名任意写 // zk连接超时 props.put("zookeeper.session.timeout.ms", "4000"); props.put("zookeeper.sync.time.ms", "200"); props.put("auto.commit.interval.ms", "5000"); props.put("auto.offset.reset", "largest"); props.put("enable.auto.commit", "false"); // 序列化类 props.put("serializer.class", "kafka.serializer.StringEncoder"); ConsumerConfig config = new ConsumerConfig(props); backconsumer = kafka.consumer.Consumer.createJavaConsumerConnector(config); } /** * 开启接收主题消费者线程 */ public static void startBackConsumer() { new Thread(new Runnable() { @Override public void run() { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(BACKTOPIC, new Integer(1)); StringDecoder keyDecoder = new StringDecoder( new VerifiableProperties()); StringDecoder valueDecoder = new 
StringDecoder( new VerifiableProperties()); Map<String, List<KafkaStream<String, String>>> consumerMap = backconsumer .createMessageStreams(topicCountMap, keyDecoder, valueDecoder); KafkaStream<String, String> stream = consumerMap.get(BACKTOPIC) .get(0); ConsumerIterator<String, String> it = stream.iterator(); String message = ""; JSONObject obj = new JSONObject(); while (true) { while (it.hasNext()) { message = it.next().message(); try { obj = JSON.parseObject(message); for (WebSocket socket: WebSocket.webSocketSet ) { try { //webSocket推送消息 socket.sendMessage(obj.toString()); } catch (Exception e) { e.printStackTrace(); } } } catch (JSONException e) { continue; } } } } }, "BackThread").start(); } }
webSocket服务端service:
package com.xx.demo.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.util.concurrent.CopyOnWriteArraySet; /** * Created by csn on 2020/1/8. */ @ServerEndpoint("/webSocket") @Component public class WebSocket { // 用来记录当前连接数的变量 private static volatile int onlineCount = 0; // concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象 public static CopyOnWriteArraySet<WebSocket> webSocketSet = new CopyOnWriteArraySet<WebSocket>(); //接收人 //private String userId = ""; // 与某个客户端的连接会话,需要通过它来与客户端进行数据收发 private Session session; private static final Logger LOGGER = LoggerFactory.getLogger(WebSocket.class); @OnOpen public void onOpen(Session session) throws Exception { this.session = session; webSocketSet.add(this); LOGGER.info("当前在线人数为:" + webSocketSet.size()); } @OnClose public void onClose() { webSocketSet.remove(this); LOGGER.info("Close a websocket. "); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 * */ @OnMessage public void onMessage(String message, Session session) { LOGGER.info("Receive a message from client: " + message); } @OnError public void onError(Session session, Throwable error) { LOGGER.error("Error while websocket. ", error); } /** * 实现服务器主动推送 */ public void sendMessage(String message) throws Exception { if (this.session.isOpen()) { this.session.getBasicRemote().sendText(message); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocket.onlineCount++; } public static synchronized void subOnlineCount() { WebSocket.onlineCount--; } }
webSocket配置类:
package com.xx.demo.config;

import org.springframework.boot.autoconfigure.condition.ConditionalOnWebApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * Spring configuration that contributes the WebSocket-related beans of this demo.
 * It is only active when the application runs as a web application.
 *
 * Created by csn on 2020/1/8.
 */
@Configuration
@ConditionalOnWebApplication
public class WebSocketConfig {

    /**
     * Exposes a {@link ServerEndpointExporter} as a Spring bean.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        final ServerEndpointExporter exporter = new ServerEndpointExporter();
        return exporter;
    }

    /**
     * Exposes a {@link MySpringConfigurator} as a Spring bean, which lets Spring hand it
     * the application context.
     *
     * @return a new configurator instance
     */
    @Bean
    public MySpringConfigurator mySpringConfigurator() {
        final MySpringConfigurator configurator = new MySpringConfigurator();
        return configurator;
    }
}
Spring配置ServerEndpointConfig类:
package com.xx.demo.config; import org.springframework.beans.BeansException; import org.springframework.beans.factory.BeanFactory; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import javax.websocket.server.ServerEndpointConfig; /** * Created by csn on 2020/1/8. */ public class MySpringConfigurator extends ServerEndpointConfig.Configurator implements ApplicationContextAware { private static volatile BeanFactory context; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { MySpringConfigurator.context = applicationContext; } @Override public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException { return context.getBean(clazz); } }
简单的前端页面(webSocket客户端):
<!DOCTYPE html>
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>WebSocket示例</title>
    <meta content='width=device-width, initial-scale=1, maximum-scale=1, user-scalable=no' name='viewport' />
</head>
<body>
    <input id="text" type="text"/>
    <button onclick="send()">发送消息</button>
    <hr/>
    <button onclick="closeWebSocket()">关闭WebSocket连接</button>
    <hr/>
    <div id="message"></div>

    <script type="text/javascript">
        // Simple WebSocket client: connects to /webSocket on the host that served this page
        // and appends every status change and every received message to the #message element.
        var websocket = null;

        // Check whether the browser supports WebSocket.
        if ('WebSocket' in window) {
            // Use the secure scheme when the page itself was loaded over HTTPS;
            // browsers block plain ws:// connections from secure pages.
            var scheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';

            // Form without parameters:
            //   new WebSocket("ws://127.0.0.1:18080/webSocket")
            // Passing parameters in the path (server side implemented with "@ServerEndpoint"):
            //   new WebSocket("ws://127.0.0.1:18080/webSocket/23/Lebron")
            // Passing parameters GET-style (server side implemented with "WebSocketHandler"):
            //   new WebSocket("ws://127.0.0.1:18080/webSocket?id=23&name=Lebron")
            websocket = new WebSocket(scheme + window.location.host + "/webSocket");

            // Callback for a connection error.
            websocket.onerror = function () {
                setMessageInnerHTML("WebSocket连接发生错误");
            };

            // Callback for a successfully established connection.
            websocket.onopen = function () {
                setMessageInnerHTML("WebSocket连接成功");
            };

            // Callback for a received message.
            websocket.onmessage = function (event) {
                setMessageInnerHTML(event.data);
            };

            // Callback for a closed connection.
            websocket.onclose = function () {
                setMessageInnerHTML("WebSocket连接关闭");
            };
        } else {
            // The handlers above are only attached when a socket exists, so an unsupported
            // browser shows this alert instead of failing with a TypeError.
            alert('当前浏览器 Not support websocket');
        }

        // Close the WebSocket when the window is closed, so the server does not see the
        // connection disappear without a proper close.
        window.onbeforeunload = function () {
            closeWebSocket();
        };

        // Appends one line to the message area. The value is inserted as plain text, not
        // parsed as HTML, so markup contained in a pushed message cannot inject elements
        // or scripts into the page.
        function setMessageInnerHTML(innerHTML) {
            var container = document.getElementById('message');
            container.appendChild(document.createTextNode(innerHTML));
            container.appendChild(document.createElement('br'));
        }

        // Closes the WebSocket connection, if there is one.
        function closeWebSocket() {
            if (websocket !== null) {
                websocket.close();
            }
        }

        // Sends the content of the text input to the server.
        function send() {
            // Sending on a socket that is not open throws, so report it to the user instead.
            if (websocket === null || websocket.readyState !== WebSocket.OPEN) {
                setMessageInnerHTML("WebSocket未连接,无法发送消息");
                return;
            }
            var message = document.getElementById('text').value;
            websocket.send(message);
        }
    </script>
</body>
</html>
END.
亲测可用...