HttpClient源码解析系列:第三篇:发送-接收的底层过程
HttpConnection衍生出来的:
另外一个我们都知道的是 HttpClientConnection:
我们先把Connection相关的列出来,暂时不考虑Factory|Pool|Context。实际上,Factory|Pool|Context 列在这里是Intellj IDEA 的工具问题,以HttpConnectionFactory为例:
HttpConnectionFactory<T extends HttpConnection>
它只是泛型中继承了HttpConnection,HttpConnectionFactory本身并不是从HttpConnection中派生出来的。
以这两个接口派生出来的接口和类来往下延伸,由于比较多,所以去掉了已经声明为废弃的(每个接口派生出来的都看一遍才能确定)。发现4.3开始,都集中推荐使用DefaultBHttpClientConnection,而它又是继承自BHttpConnectionBase。
对于DefaultBHttpClientConnection,可以看到非常核心的元素,responseParser 以及 requestWriter。 以及上一节中提到的核心的方法:
this.requestWriter = (requestWriterFactory != null ? requestWriterFactory :
DefaultHttpRequestWriterFactory.INSTANCE).create(getSessionOutputBuffer());
this.responseParser = (responseParserFactory != null ? responseParserFactory :
DefaultHttpResponseParserFactory.INSTANCE).create(getSessionInputBuffer(), constraints);
构造方法里给出了 requestWriter 和 responseParser 的构造方式,跟踪到 Factory 中可以看到
实际上两者是基于 SessionOutputBuffer 以及 SessionInputBuffer 的。
看一下核心方法:
AbstractMessageWriter中的:
@Override
public void write(final T message) throws IOException, HttpException {
Args.notNull(message, "HTTP message");
writeHeadLine(message);
for (final HeaderIterator it = message.headerIterator(); it.hasNext(); ) {
final Header header = it.nextHeader();
this.sessionBuffer.writeLine
(lineFormatter.formatHeader(this.lineBuf, header));
}
this.lineBuf.clear();
this.sessionBuffer.writeLine(this.lineBuf);
}
AbstractMessageParser中的:
@Override
public T parse() throws IOException, HttpException {
final int st = this.state;
switch (st) {
case HEAD_LINE:
try {
this.message = parseHead(this.sessionBuffer);
} catch (final ParseException px) {
throw new ProtocolException(px.getMessage(), px);
}
this.state = HEADERS;
//$FALL-THROUGH$
case HEADERS:
final Header[] headers = AbstractMessageParser.parseHeaders(
this.sessionBuffer,
this.messageConstraints.getMaxHeaderCount(),
this.messageConstraints.getMaxLineLength(),
this.lineParser,
this.headerLines);
this.message.setHeaders(headers);
final T result = this.message;
this.message = null;
this.headerLines.clear();
this.state = HEAD_LINE;
return result;
default:
throw new IllegalStateException("Inconsistent parser state");
}
}
@Override
protected HttpRequest parseHead(
final SessionInputBuffer sessionBuffer)
throws IOException, HttpException, ParseException {
this.lineBuf.clear();
final int i = sessionBuffer.readLine(this.lineBuf);
if (i == -1) {
throw new ConnectionClosedException("Client closed connection");
}
final ParserCursor cursor = new ParserCursor(0, this.lineBuf.length());
final RequestLine requestline = this.lineParser.parseRequestLine(this.lineBuf, cursor);
return this.requestFactory.newHttpRequest(requestline);
}
|
@Override
public void sendRequestHeader(final HttpRequest request)
throws HttpException, IOException {
Args.notNull(request, "HTTP request");
ensureOpen();
this.requestWriter.write(request); //Header比较简单,直接写出即可
onRequestSubmitted(request);
incrementRequestCount(); //增加一次请求数量
}
@Override
public void sendRequestEntity(final HttpEntityEnclosingRequest request)
throws HttpException, IOException {
Args.notNull(request, "HTTP request");
ensureOpen();
final HttpEntity entity = request.getEntity();
if (entity == null) {
return;
}
final OutputStream outstream = prepareOutput(request);
entity.writeTo(outstream); // Entity写入流的方式由Entity本身来控制,只要将流给Entity就好了
outstream.close();
}
@Override
public HttpResponse receiveResponseHeader() throws HttpException, IOException {
ensureOpen();
final HttpResponse response = this.responseParser.parse(); // Header直接读出
onResponseReceived(response);
if (response.getStatusLine().getStatusCode() >= HttpStatus.SC_OK) {
incrementResponseCount(); // 增加一次响应数量,当返回状态码大于 200 的时候
}
return response;
}
@Override
public void receiveResponseEntity(
final HttpResponse response) throws HttpException, IOException {
Args.notNull(response, "HTTP response");
ensureOpen();
final HttpEntity entity = prepareInput(response); // Entity需要比较复杂的处理,所以不是由responseParse直接处理的
response.setEntity(entity);
}
由于 DefaultBHttpClientConnection 是从 BHttpConnectionBase 中继承而来的,故而上面标注的核心方法多数是在 BHttpConnectionBase 中定义的,只能放到下面再讲。
|
从逻辑上将,数据扔出去一定是通过输出流写到网络IO的,中间通过Socket,那么没有找到这个过程,就不算找到了真正发出请求的部分。 这几个方法已经封装的非常简单了,要了解详情,需要从基类中找实现,也就是BHttpConnectionBase。
从上面可以看出来,实际上请求的发出和接受都是基于SessionOutputBuffer 以及 SessionInputBuffer 的,而恰好 BHttpConnectionBase 中就包含了 这两个接口的实现。同时BHttpConnectionBase 中包括了 Socket,因此基本可以肯定最终请求是这里发出的了。
protected void ensureOpen() throws IOException {
final Socket socket = this.socketHolder.get();
if (socket == null) {
throw new ConnectionClosedException("Connection is closed");
}
if (!this.inbuffer.isBound()) {
this.inbuffer.bind(getSocketInputStream(socket));
}
if (!this.outbuffer.isBound()) {
this.outbuffer.bind(getSocketOutputStream(socket));
}
}
这个方法的核心就是,将 Socket 中的 InputStream 和 OutputStream 分别绑定到 SessionOutputBuffer 以及 SessionInputBuffer。再之后这两个SessionBuffer的读写都是针对 Stream进行操作了。
这也就是上面代码中标红的部分了。
|
现在可以接着讲 DefaultBHttpClientConnection 中的提到的方法了。
先给出简单的:
protected void onResponseReceived(final HttpResponse response) {
}
protected void onRequestSubmitted(final HttpRequest request) {
}
这是两个空方法,其实啥也没做。当然这为以后扩展留下了余地。
sendRequestHeader中的:this.requestWriter.write(request);
最终落实到了 SessionOutputBufferImpl 的 write方法。其实挺简单的,也就是将 byte[] 写入到 outputStream 而已。
@Override
public void write(final byte[] b, final int off, final int len) throws IOException {
if (b == null) {
return;
}
// Do not want to buffer large-ish chunks
// if the byte array is larger then MIN_CHUNK_LIMIT
// write it directly to the output stream
if (len > this.fragementSizeHint || len > this.buffer.capacity()) {
// flush the buffer
flushBuffer();
// write directly to the out stream
streamWrite(b, off, len);
this.metrics.incrementBytesTransferred(len);
} else {
// Do not let the buffer grow unnecessarily
final int freecapacity = this.buffer.capacity() - this.buffer.length();
if (len > freecapacity) {
// flush the buffer
flushBuffer();
}
// buffer
this.buffer.append(b, off, len);
}
}
private void streamWrite(final byte[] b, final int off, final int len) throws IOException {
Asserts.notNull(outstream, "Output stream");
this.outstream.write(b, off, len);
}
sendRequestEntity 中的 entity.writeTo(outstream);
最终落实到了 Entity 的writeTo(final OutputStream outstream),也是挺简单的。
@Override
public void writeTo(final OutputStream outstream) throws IOException {
Args.notNull(outstream, "Output stream");
final InputStream instream = getContent();
try {
int l;
final byte[] tmp = new byte[OUTPUT_BUFFER_SIZE];
while ((l = instream.read(tmp)) != -1) {
outstream.write(tmp, 0, l);
}
} finally {
instream.close();
}
}
receiveResponseHeader 中的 final HttpResponse response = this.responseParser.parse();
最终落实到了SessionInputBufferImpl 中的,
public int read(final byte[] b, final int off, final int len) throws IOException
也就是从 inputSteam 中读取字节流 this.instream.read(b, off, len);。跟上面差不多的,不再贴详细代码。
receiveResponseEntity中的 final HttpEntity entity = prepareInput(response);
最终就是BHttpConnectionBase中的方法。值得注意的是,HttpResponse 是 HttpMessage 的子类。从代码里也能看出,HttpEntity中的Content实际上是一个Stream,而不是一个固定的String。
protected HttpEntity prepareInput(final HttpMessage message) throws HttpException {
final BasicHttpEntity entity = new BasicHttpEntity();
final long len = this.incomingContentStrategy.determineLength(message);
final InputStream instream = createInputStream(len, this.inbuffer);
if (len == ContentLengthStrategy.CHUNKED) {
entity.setChunked(true);
entity.setContentLength(-1);
entity.setContent(instream);
} else if (len == ContentLengthStrategy.IDENTITY) {
entity.setChunked(false);
entity.setContentLength(-1);
entity.setContent(instream);
} else {
entity.setChunked(false);
entity.setContentLength(len);
entity.setContent(instream);
}
final Header contentTypeHeader = message.getFirstHeader(HTTP.CONTENT_TYPE);
if (contentTypeHeader != null) {
entity.setContentType(contentTypeHeader);
}
final Header contentEncodingHeader = message.getFirstHeader(HTTP.CONTENT_ENCODING);
if (contentEncodingHeader != null) {
entity.setContentEncoding(contentEncodingHeader);
}
return entity;
}
|
补充一下 entity.setChunked 的含义:
有时候服务生成HTTP回应是无法确定消息大小的,比如大文件的下载,或者后台需要复杂的逻辑才能全部处理页面的请求,这时用需要实时生成消息长度,服务器一般使用chunked编码。
在进行Chunked编码传输时,在回复消息的Headers有transfer-coding域值为chunked,表示将用chunked编码传输内容。
|
至此,所有关于请求发送的底层细节都已经很清楚了。
总结一下,得到下面的继承树:
本图中,SessionOutputBuffer 以及 SessionInputBuffer 没有列出来,这两个是 MessageWriter 和 MessageParser 的核心基础,同时是在 BHttpConnectionBase中包含并初始化的,知道这个关系就好了。画到图中会极大增加图的复杂性。
同理,再回头来看这个图,也是很简单的,核心接口被 DefaultClientConnection 实现,通过 一个Writer 和 一个 Parser 来发送和和接受并转化,从而完成整个网络交互。