Netty多语言(Java、Android 、C#、WebSocket)通信实例Demo (二)Java端简介【附源码】

转载请注明出处,原文地址:https://blog.csdn.net/lucherr/article/details/88378235

Netty多语言(Java、Android 、C#、WebSocket)通信实例Demo (一)概述 【附源码】
Netty多语言(Java、Android 、C#、WebSocket)通信实例Demo (二)Java端简介【附源码】
Netty多语言(Java、Android 、C#、WebSocket)通信实例Demo (三)Android端简介【附源码】
Netty多语言(Java、Android 、C#、WebSocket)通信实例Demo (四)C#端简介【附源码】
 

该项目源码中,包含了Java服务端、Java客户端、WebSocket示例源码,对应的源码目录为NettyServer:

Netty多语言(Java、Android 、C#、WebSocket)通信实例Demo (二)Java端简介【附源码】

 源码结构如下图:

Netty多语言(Java、Android 、C#、WebSocket)通信实例Demo (二)Java端简介【附源码】

 服务端:

        Netty的强大之处在于他的API使用非常简单、功能强大、扩展性很强,对于简单的一个Demo来说,几句代码就可以搞定,在本Demo中,由于同时支持了TCP和WebSocket协议,而且使用了自定义的编解码器,TCP使用MessagePack,WebSocket使用了Json,传递的事件对象也是加入了类的继承关系以及枚举,所以增加了Demo的复杂度,所以这个Demo并不是单纯的想要展示这几个端是怎么通信的,而是更加符合实际开发而加入了一些相对复杂的设定。

        使用Netty开发我们主要关心的就是编解码类、消息处理类的实现,对于编解码Netty提供了一些简单的编解码器,也可以很方便的定制自己的编解码器,对于TCP粘包和拆包问题解决也是So easy,简单的说明一下我对编解码的理解,首先说说概念:编码可以理解为序列化:是指将对象转换为字节数组的过程。 
解码可以理解为反序列化:是指将字节数组还原成原始对象的过程。

为什么需要编码?需要将变量或对象从内存中取出来进行存储或传输。

        我觉得可以将Netty中的编解码用大家都熟悉的快递业务来对比说明,这样比较好理解,因为本例中使用了2种协议,所以举例的时候会将协议一起说明。

收发快递例子说明Netty编解码:
Netty就像是快递柜,你可以通过它收发快递,它支持N家快递公司,它的服务很专业
协议就像是这里的快递公司,用不同的快递公司将会用不同的包装将你的包裹包装起来
编码就像是要邮寄物品的时候,把物品打包成包裹的过程
解码就像是领取包裹后,拆开包裹的过程
        来一个案例:现在你是一个小服装厂的老板(刚开始干),有个客户在网上购买了一件衣服需要试穿,你高兴的不得了,现在你要把衣服邮寄给她,你家楼下有个快递柜(也就是Netty,特别省事特别方便),客户那边说了只方便收顺丰的快递(也就是指定了协议,ps:顺丰没有给我广告费),于是你在邮寄的时候选择了顺丰,当然衣服你不能直接就塞快递柜里了,你还得给它细心的包装起来,你可以随便找个袋或者纸壳箱子把衣服塞进去用透明胶带给粘上,也可能你有专业人士给你设计的高大上包装盒,里面还放了一张五星好评返红包的卡片,再加个小礼物(把衣服用包装盒抱起来,里面加入小卡片和小礼物的这个过程就是编码本码没错了,好比Demo中的MessagePack打包方式),用包装盒打包完成包裹后,放进快递柜,顺丰的工作人员拿出包裹后,贴上了顺丰的快递单(相当于Demo中的TCP协议),然后把这个快递发出去了(你的数据包发出去了),第二天,客户收到快递包裹了,上面贴着顺丰的快递单,然后她开始拆包裹(也就是解码过程了,之前你怎么给它包起来的,她就怎么拆开,Demo中就是收到消息也得用MessagePack解包),然后取出了衣服,她看到这个包装盒真的很漂亮还看到了小礼物,觉得你太贴心了,感动的眼泪都差点出来了,于是赶紧把衣服拿回家试穿(这里就相当于是Handler做的事了),试完后发现稍微有点小,然后她决定换一件大点的,然后又把衣服打包发给你,依然选了TCP协议,进行编码,再发给你,你又解码,看看衣服有没有损坏......编不下去了哈哈,表达能力有待提升,就这个小例子都编了半天,相信你的理解能力。

        下面来看看编码类,使用了MessagePack进行打包,也就是你把衣服装到包装盒里的过程,你得把你要发的消息打包起来,这样才能给快递公司寄出去,MessagePack的API可以查看官网:https://msgpack.org/

public class MessagePackEncoder extends MessageToByteEncoder<Object> {

	@Override
	protected void encode(ChannelHandlerContext ctx, Object obj, ByteBuf buf) throws Exception {
		MessagePack msgPack = new MessagePack();
		//序列化操作
		byte[] bytes = msgPack.write(obj);
		//netty操作,将对象序列化数组传入ByteBuf
		buf.writeBytes(bytes);
	}
}

         解码类,与编码对应,也使用MessagePack进行解包,就是收到包裹后,拆开包裹取出衣服的过程

public class MessagePackDecoder extends MessageToMessageDecoder<ByteBuf> {
	
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> objs) throws Exception {
		final byte[] bytes;
		final int length = buf.readableBytes();
		bytes = new byte[length];
		// 从数据包buf中获取要操作的byte数组
		buf.getBytes(buf.readerIndex(), bytes, 0, length);
		// 将bytes反序列化成对象,并添加到解码列表中
		MessagePack msgpack = new MessagePack();
		
		objs.add(msgpack.read(bytes));

	}
}

         Demo中比较麻烦的一个问题是同时支持TCP和WebSocket协议,做之前也调研过,最简单的办法就是监听2个端口,分别做不同的编解码,但是我还是想要在一个端口同时实现TCP和WebSocket协议的支持,后来也是借鉴了有个网友的方法,根据协议动态修改编解码器,判断的方法是获取协议签名的几个字符进行判断,如果是WebSocket的协议会有一个固定的协议信息,根据这个信息来判断是WebSocket还是TCP,由于目前就支持这两种协议,所以可以按照目前ProtocolDecoder中的判断方式,具体办法就是先将TCP和WebSocket使用的编解码器都加入,然后在ProtocolDecoder中进行判断,具体实现逻辑请查看Demo中的代码

public class NettyServer extends Thread {
......
@Override
protected void initChannel(Channel ch) throws Exception {
	ChannelPipeline pipeline = ch.pipeline();
	// 协议解码处理器,判断是什么协议(WebSocket还是TcpSocket),然后动态修改编解码器
	pipeline.addLast("protocolHandler", new ProtocolDecoder());

	/** TcpSocket协议需要使用的编解码器 */
	// Tcp粘包处理,添加一个LengthFieldBasedFrameDecoder解码器,它会在解码时按照消息头的长度来进行解码。
	pipeline.addLast("tcpFrameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
	// MessagePack解码器,消息进来后先由frameDecoder处理,再给msgPackDecoder处理
	pipeline.addLast("tcpMsgPackDecoder", new MessagePackDecoder());
	// Tcp粘包处理,添加一个
	// LengthFieldPrepender编码器,它会在ByteBuf之前增加4个字节的字段,用于记录消息长度。
	pipeline.addLast("tcpFrameEncoder", new LengthFieldPrepender(4));
	// MessagePack编码器,消息发出之前先由frameEncoder处理,再给msgPackEncoder处理
	pipeline.addLast("tcpMsgPackEncoder", new MessagePackEncoder());

	/** WebSocket协议需要使用的编解码器 */
	// websocket协议本身是基于http协议的,所以这边也要使用http解编码器
	pipeline.addLast("httpCodec", new HttpServerCodec());
	// netty是基于分段请求的,HttpObjectAggregator的作用是将请求分段再聚合,参数是聚合字节的最大长度
	pipeline.addLast("httpAggregator", new HttpObjectAggregator(65536));
	// 用于向客户端发送Html5文件,主要用于支持浏览器和服务端进行WebSocket通信
	pipeline.addLast("httpChunked", new ChunkedWriteHandler());

	// 管道消息处理
	pipeline.addLast("channelHandler", new ServerChannelHandler());
}
......
public class ProtocolDecoder extends ByteToMessageDecoder {

	/**
	 * 请求行信息的长度,ws为:GET /ws HTTP/1.1, Http为:GET / HTTP/1.1
	 */
	private static final int PROTOCOL_LENGTH = 16;
	/**
	 * WebSocket握手协议的前缀, 本例限定为:GET /ws ,在访问ws的时候,请求地址需要为如下格式 ws://ip:port/ws
	 */
	private static final String WEBSOCKET_PREFIX = "GET /ws";

	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		String protocol = getBufStart(in);
//		System.out.println("ProtocolHandler protocol:" + protocol);
		if (protocol.startsWith(WEBSOCKET_PREFIX)) {// WebSocket协议处理,移除TcpSocket相关编解码器
			ctx.pipeline().remove("tcpFrameDecoder");
			ctx.pipeline().remove("tcpMsgPackDecoder");
			ctx.pipeline().remove("tcpFrameEncoder");
			ctx.pipeline().remove("tcpMsgPackEncoder");
			// 将对应的管道标记为ws协议
			ChannelWraper channelWraper = NettyServer.CLIENTS.get(ctx.channel().id().asLongText());
			if (channelWraper != null) {
				channelWraper.setProtocol(ChannelWraper.PROTOCOL_WS);
			}
		} else {// TcpSocket协议处理,移除WebSocket相关编解码器
			ctx.pipeline().remove("httpCodec");
			ctx.pipeline().remove("httpAggregator");
			ctx.pipeline().remove("httpChunked");
			// 将对应的管道标记为tcp协议
			ChannelWraper channelWraper = NettyServer.CLIENTS.get(ctx.channel().id().asLongText());
			if (channelWraper != null) {
				channelWraper.setProtocol(ChannelWraper.PROTOCOL_TCP);
			}
		}
		// 重置index标记位
		in.resetReaderIndex();
		// 移除该协议处理器,该channel后续的处理由对应协议安排好的编解码器处理
		ctx.pipeline().remove(this.getClass());
	}

	/**
	 * 获取buffer中指定长度的信息
	 * 
	 * @param in
	 * @return
	 */
	private String getBufStart(ByteBuf in) {
		int length = in.readableBytes();
		if (length > PROTOCOL_LENGTH) {
			length = PROTOCOL_LENGTH;
		}
		// 标记读取位置
		in.markReaderIndex();
		byte[] content = new byte[length];
		in.readBytes(content);
		return new String(content);
	}
}

         还值得一提的就是Handler的处理,由于支持2中协议,TCP协议采用了MessagePack编解码,WebSocket使用JSON,所以对接受到的消息做了object类型的判断,根据不同类型使用不同的方法处理:

public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
......
// 接收到消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, final Object object) throws Exception {
	// 管道读取到消息,先判断消息对象是什么类型,然后做不同处理
	try {
		if (object instanceof ArrayValue) {// 这是经过MessagePack解码完成后的对象
			handleValue(ctx, (ArrayValue) object);
		} else if (object instanceof FullHttpRequest) {// HTTP请求对象
			handleHttpRequest(ctx, (FullHttpRequest) object);
		} else if (object instanceof WebSocketFrame) {// WebSocket消息对象
			handleWebSocketFrame(ctx, (WebSocketFrame) object);
		}
	} catch (Exception e) {
		e.printStackTrace();
	}
}
......

        这里具体就不讲那么细了,要不然篇幅会很长,代码里有详细注释,大家稍微看看就明白。经过上面的处理,最后封装了一个方法来处理不同协议的处理逻辑,这个方法的目的就是通过2种协议传来的数据还原成对象后,交给统一的方法处理逻辑

public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
......
/**
 * 统一处理Event,为了让TCP和Websocket的处理逻辑统一,封装了该方法,两种协议的对象采用不同序列化方案
 * 
 * @param eventType
 * @param value
 * @param text
 * @throws IOException
 */
private void handleEvent(EventType eventType, Value value, String text) throws IOException {
	switch (eventType) {// 对于测试类事件,转发给所有终端
	case TEST_EVENT:
		TestEvent testEvent = null;
		if (value != null) {// 如果是tcp协议,采用messagepack序列化
			testEvent = MessageConverter.converter(value, TestEvent.class);
		} else {// ws使用json序列化
			testEvent = JSON.parseObject(text, TestEvent.class);
		}
		System.out.println("收到新消息:" + testEvent);
		// 将该消息转发给所有终端
		sendEventToAll(testEvent);

		break;
	case OTHER_EVENT:// 其他事件,暂未处理
		System.out.println("未处理,主要用于测试判断不同类型事件");
		break;

	default:
		break;
	}
}
......

客户端:

        讲完服务端后,感觉客户端已经没什么好说的,需要注意的就是使用的编解码器与服务器端对应,就是你发的那个衣服的快递,怎么包装的,买家那边也会怎么拆开包装,你包了两层,她就需要拆两层

public class NettyClient extends Thread {
......
@Override
protected void initChannel(SocketChannel ch) throws Exception {
	ChannelPipeline pipeline = ch.pipeline();
	// Tcp粘包处理,添加一个LengthFieldBasedFrameDecoder解码器,它会在解码时按照消息头的长度来进行解码。
	pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 4, 0, 4));
	// MessagePack解码器,消息进来后先由frameDecoder处理,再给msgPackDecoder处理
	pipeline.addLast("msgPackDecoder", new MessagePackDecoder());
	// Tcp粘包处理,添加一个
	// LengthFieldPrepender编码器,它会在ByteBuf之前增加4个字节的字段,用于记录消息长度。
	pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
	// MessagePack编码器,消息发出之前先由frameEncoder处理,再给msgPackEncoder处理
	pipeline.addLast("msgPackEncoder", new MessagePackEncoder());
	// 消息处理handler
	pipeline.addLast("handler", new NettyClientHandler());
}
......

         当然,你的IP和端口号需要与服务端对应:

public class RunClient {

	// Server端IP地址,根据实际情况进行修改
	static final String HOST = System.getProperty("host", "127.0.0.1");
	// Netty服务端监听端口号
	static final int PORT = Integer.parseInt(System.getProperty("port", "8888"));
......

WebSocket端:

        代码没什么难度,就一个html文件,直接用支持WebSocket的浏览器打开即可,里面也加入了注释,直接在之前基于WebSocket实现的Android和H5聊天通讯实例【附效果图附所有源码】这篇文章Demo的基础上修改的,代码就不粘这里占空间了,需要注意的仍然是IP和端口号要与服务端对应:

//参数就是与服务器连接的地址
socket = new WebSocket("ws://127.0.0.1:8888/ws");

//客户端收到服务器消息的时候就会执行这个回调方法
socket.onmessage = function(event) {
	console.log("onmessage:"+event.data);
	var ta = document.getElementById("responseText");
	//解析json
	var testEvent = JSON.parse(event.data);
	//将内容加入到文本框中
	ta.value = "【" +testEvent.time +","+testEvent.content +"】\n"+ta.value;
}

源码地址:

CSDN下载 积分不受我的控制

Github地址

PS:如果发现代码中有写的不对、有更好的实现方法或者文章中有误的地方,还望各位指出,我及时修改