facebook/swift:构建thrift http server(2)--HttpServerCodec
接上一篇文章《facebook/swift:构建thrift http server(1)》
Netty http server
在为facelog选择XHR实现方案时,我反复看过facebook/swift,了解到它依赖的底层通讯框架是netty,说实话,我之前对netty并不太了解,藉于这次任务需要,我才花时间进一步了解了一下netty是什么。
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于NIO的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。
@百度百科
我好像明白facebook/swift为什么要基于netty设计了,netty本就是一个高效的异步通讯框架,你可以利用它实现你想要的任何应用协议(当然也包括thrift的通讯协议)。
更进一步了解,才知道netty就已经实现了HTTP协议的编解码:
参见 org.jboss.netty.handler.codec.http.HttpServerCodec类(facebook/swift是基于netty 3.7,所以这里的链接是3.7版本的源码)
而thrift本身是支持JSON格式的数据协议的,参见org.apache.thrift.protocol.TJSONProtocol,对于浏览器前端,JSON协议至关重要。
于是一个基本的想法在我的脑中浮现:
利用netty内置的HTTP协议编解码器(
HttpServerCodec
)来响应浏览器的XMLHttpRequest (XHR)请求,再将解码出JSON数据通过TJSONProtocol
反序列化成java数据对象就可以传递给thrift服务了,反方向HTTP Respone的也是差不多的反向流程。
哈,似乎很简单,调整一下Netty的协议栈就好了,不需要写很多代码。
简单说,就是实现一个基于netty的thrift http server,而这其中关键的几个部件都是现成的,似乎不需要重新开发新部件。
基于我过往的开发经验,虽然我知道这样想很天真,但这个方案对于我来说,就是完全满足我的要求的方案:不需要新的依赖库,不需要servlet容器。
我之前对netty并不太了解,小白一个,在这个方案中,我最大的成本是熟悉netty框架,熟悉它才知道怎么修改协议栈。(如果你也不了解netty建议你在网上找找这方面的文章,很多,本文不负责netty的入门)
推荐文章: 《Netty源码解读(三)Channel与Pipeline》
NOTE:
阅读下面的技术分析时,建议你最好用你熟悉的IDE(eclipse,IDEA)打开这几个相关的开源框架的源码以便对照查看源码
facebookarchive/nifty,
facebookarchive/swift,
apache/thrift(java)
NettyServerTransport
下面是com.facebook.nifty.core.NettyServerTransport的构造方法及服务启动方法(start),可以理解为facebook/swift用于启动thrift Server的前的准备工作
/* 构造方法 */
public NettyServerTransport(
final ThriftServerDef def,
final NettyServerConfig nettyServerConfig,
final ChannelGroup allChannels)
{
this.def = def;
this.nettyServerConfig = nettyServerConfig;
this.port = def.getServerPort();
this.allChannels = allChannels;
// connectionLimiter must be instantiated exactly once (and thus outside the pipeline factory)
final ConnectionLimiter connectionLimiter = new ConnectionLimiter(def.getMaxConnections());
this.channelStatistics = new ChannelStatistics(allChannels);
this.pipelineFactory = new ChannelPipelineFactory()
{
@Override
public ChannelPipeline getPipeline()
throws Exception
{
ChannelPipeline cp = Channels.pipeline();
TProtocolFactory inputProtocolFactory = def.getDuplexProtocolFactory().getInputProtocolFactory();
NiftySecurityHandlers securityHandlers = def.getSecurityFactory().getSecurityHandlers(def, nettyServerConfig);
cp.addLast("connectionContext", new ConnectionContextHandler());
cp.addLast("connectionLimiter", connectionLimiter);
cp.addLast(ChannelStatistics.NAME, channelStatistics);
cp.addLast("encryptionHandler", securityHandlers.getEncryptionHandler());
cp.addLast("frameCodec", def.getThriftFrameCodecFactory().create(def.getMaxFrameSize(),
inputProtocolFactory));
if (def.getClientIdleTimeout() != null) {
// Add handlers to detect idle client connections and disconnect them
cp.addLast("idleTimeoutHandler", new IdleStateHandler(nettyServerConfig.getTimer(),
def.getClientIdleTimeout().toMillis(),
NO_WRITER_IDLE_TIMEOUT,
NO_ALL_IDLE_TIMEOUT,
TimeUnit.MILLISECONDS));
cp.addLast("idleDisconnectHandler", new IdleDisconnectHandler());
}
cp.addLast("authHandler", securityHandlers.getAuthenticationHandler());
cp.addLast("dispatcher", new NiftyDispatcher(def, nettyServerConfig.getTimer()));
cp.addLast("exceptionLogger", new NiftyExceptionLogger());
return cp;
}
};
}
/* 启动netty网络服务 */
public void start(ServerChannelFactory serverChannelFactory)
{
if (!(InternalLoggerFactory.getDefaultFactory() instanceof Slf4JLoggerFactory)) {
log.warn("Nifty always logs to slf4j, but netty is currently configured to use a " +
"different logging implementation. To correct this call " +
"InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory()) " +
"during your server's startup");
}
bootstrap = new ServerBootstrap(serverChannelFactory);
bootstrap.setOptions(nettyServerConfig.getBootstrapOptions());
bootstrap.setPipelineFactory(pipelineFactory);
serverChannel = bootstrap.bind(new InetSocketAddress(port));
SocketAddress actualSocket = serverChannel.getLocalAddress();
if (actualSocket instanceof InetSocketAddress) {
int actualPort = ((InetSocketAddress) actualSocket).getPort();
log.info("started transport {}:{} (:{})", def.getName(), actualPort, port);
}
else {
log.info("started transport {}:{}", def.getName(), port);
}
}
从上面的代码中可以看到,在启动thrift server前,向协议管道(ChannelPipeline)添加了很多个ChannelHandler
.这其中我们最关注的只有两个分别名为frameCodec
和dispatcher
的ChannelHandler
即下面这两行
cp.addLast("frameCodec", def.getThriftFrameCodecFactory().create(def.getMaxFrameSize(),inputProtocolFactory));
cp.addLast("dispatcher", new NiftyDispatcher(def, nettyServerConfig.getTimer()));
frameCodec
是双向编码器,负责将client端通讯数据的帧数据编解码,而dispatcher
则是将frameCodec
解码的数据传递给封装为ThriftServiceProcessor
的thrift服务实例。只要dispatcher
能正确收到client的请求数据,就成功了一半了。
FrameDecoder
FrameDecoder是netty定义的一个帧数据解码器抽象类。主要的作用就是负责从数据通道(ChannelBuffer,such as TCP/IP)接收数据并解码成指定的格式。具体是什么格式,这要看子类如何实现抽象方法decode了。
DefaultThriftFrameCodec
def.getThriftFrameCodecFactory()
默认提供的是实现二进制传输的com.facebook.nifty.codec.DefaultThriftFrameCodec帧编解码器实例,负责最外层的数据编码解码工作(为什么默认是DefaultThriftFrameCodec
,说来话长,不如你自己看源码)DefaultThriftFrameCodec
通过DefaultThriftFrameDecoder
实现帧数据解码,用DefaultThriftFrameEncoder
实现数据编码,
DefaultThriftFrameDecoder就是FrameDecoder
的一个子类
package com.facebook.nifty.codec;
import org.apache.thrift.protocol.TProtocolFactory;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
public class DefaultThriftFrameCodec implements ThriftFrameCodec
{
private final ThriftFrameDecoder decoder;
private final ThriftFrameEncoder encoder;
public DefaultThriftFrameCodec(int maxFrameSize, TProtocolFactory inputProtocolFactory)
{
this.decoder = new DefaultThriftFrameDecoder(maxFrameSize, inputProtocolFactory);
this.encoder = new DefaultThriftFrameEncoder(maxFrameSize);
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception
{
encoder.handleDownstream(ctx, e);
}
@Override
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception
{
decoder.handleUpstream(ctx, e);
}
}
HttpServerCodec
根据之前看Netty相关的教程,我知道netty已经实现了HTTP的编解码,前面说过了,就是org.jboss.netty.handler.codec.http.HttpServerCodec
HttpServerCodec
通过HttpRequestDecoder
实例HTTP请求的解码,通过HttpResponseEncoder
实现将返回数据编码为HTTP Response
package org.jboss.netty.handler.codec.http;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelUpstreamHandler;
/**
* A combination of {@link HttpRequestDecoder} and {@link HttpResponseEncoder}
* which enables easier server side HTTP implementation.
* @see HttpClientCodec
*
* @apiviz.has org.jboss.netty.handler.codec.http.HttpRequestDecoder
* @apiviz.has org.jboss.netty.handler.codec.http.HttpResponseEncoder
*/
public class HttpServerCodec implements ChannelUpstreamHandler,
ChannelDownstreamHandler {
private final HttpRequestDecoder decoder;
private final HttpResponseEncoder encoder = new HttpResponseEncoder();
/**
* Creates a new instance with the default decoder options
* ({@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and
* {@code maxChunkSize (8192)}).
*/
public HttpServerCodec() {
this(4096, 8192, 8192);
}
/**
* Creates a new instance with the specified decoder options.
*/
public HttpServerCodec(
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
decoder = new HttpRequestDecoder(maxInitialLineLength, maxHeaderSize, maxChunkSize);
}
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
decoder.handleUpstream(ctx, e);
}
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
encoder.handleDownstream(ctx, e);
}
}
小结
通过上面的分析,可以得出结论:如果要thrift 服务实现对HTTP请求的响应,显然不能使用实现thrift二进制协议的DefaultThriftFrameCodec
,而应该实现HTTP协议的编解码。
而netty有提供HTTP协议的编解码器HttpServerCodec
,看来只要把NettyServerTransport
中默认的帧编解码器(frameCodec
)从DefaultThriftFrameCodec
换成HttpServerCodec
就可以响应浏览器的XHR请求了。
HttpServerCodec替换DefaultThriftFrameCodec
facebook/swift/swift-service的com.facebook.swift.service.ThriftServer类的作用是将thrift服务实例(封装为NiftyProcessor
接口实例,thrift实现类为ThriftServiceProcessor
)进一步封装为一个可以启动的Netty server。
如果希望在ThriftServer
启动时用HttpServerCodec
替换DefaultThriftFrameCodec
需要做如下的工作,
首先要创建一个类ThriftHttpCodec
,将HttpServerCodec
封装为ThriftFrameCodec
,代码如下:
/**
* {@link HttpClientCodec}本身已经实现了{@link ThriftFrameCodec}接口的所有方法,
* 只是因为{@link ThriftFrameCodec}是nifty定义的接口,所以{@link HttpClientCodec}不是{@link ThriftFrameCodec}实例,
* {@link ThriftHttpCodec}的作用就是将{@link HttpClientCodec}封装成{@link ThriftFrameCodec}实例
*
* @author guyadong
*
*/
public class ThriftHttpCodec extends HttpServerCodec implements ThriftFrameCodec {
}
再创建一个类ThriftHttpCodecFactory
实现ThriftFrameCodecFactory
接口
public class ThriftHttpCodecFactory implements ThriftFrameCodecFactory {
@Override
public ChannelHandler create(int maxFrameSize, TProtocolFactory defaultProtocolFactory) {
return new ThriftHttpCodec();
}
}
有了ThriftHttpCodec
和ThriftHttpCodecFactory
类就可以在ThriftServer
启动时用ThriftHttpCodec
替换DefaultThriftFrameCodec
。
示例如下:
/**
* 在{@link ThriftServer#DEFAULT_PROTOCOL_FACTORIES}基础上增加'json'支持
*/
ImmutableMap<String,TDuplexProtocolFactory> DEFAULT_PROTOCOL_FACTORIES =
ImmutableMap.<String, TDuplexProtocolFactory>builder()
.putAll(ThriftServer.DEFAULT_PROTOCOL_FACTORIES)
.put("json", TDuplexProtocolFactory.fromSingleFactory(new TJSONProtocol.Factory()))
.build();
/**
* 在{@link ThriftServer#DEFAULT_FRAME_CODEC_FACTORIES}基础上增加'http'支持
*/
ImmutableMap<String,ThriftFrameCodecFactory> DEFAULT_FRAME_CODEC_FACTORIES =
ImmutableMap.<String, ThriftFrameCodecFactory>builder()
.putAll(ThriftServer.DEFAULT_FRAME_CODEC_FACTORIES)
.put("http", (ThriftFrameCodecFactory) new ThriftHttpCodecFactory())
.build();
// 创建ThriftServerConfig实例
ThriftServerConfig thriftServerConfig = new ThriftServerConfig()
.setPort(26412) /* 指定服务端口号*/
.setTransportName("http") /* 指定transport的名字 */
.setProtocolName("json"); /* 指定协议的名字 */
this.processor = new ThriftServiceProcessorCustom(
new ThriftCodecManager(),
checkNotNull(eventHandlers,"eventHandlers is null"),
services);
this.thriftServer = new ThriftServer(processor,
thriftServerConfig,
new NiftyTimer("thrift"),
DEFAULT_FRAME_CODEC_FACTORIES, DEFAULT_PROTOCOL_FACTORIES,
ThriftServer.DEFAULT_WORKER_EXECUTORS,
ThriftServer.DEFAULT_SECURITY_FACTORY);
this.thriftServer.start();
上面这样处理后,ThriftServer
已经是一个支持HTTP响应的thrift http server了。
但这样就结束了么?启动的ThriftServer
就可以正常响应http request了么?NO,NO后面还有坑。。。
写了好长,休息一下,下节再说。
《facebook/swift:构建thrift http server(3)–CORS跨域》
《facebook/swift:构建thrift http server(4)–ThriftXHRDecoder,ThriftXHREncoder》