tomcat运行
上一篇文章说到了Tomcat的启动,但是Tomcat作为一个Server,就必须要接收请求,这次分享就着重说一下这部分。
说到Tomcat的接收请求,就要想到上节课提到的Connector,提到Connector,就要想到默认的NIO方式。
OK,接下来就开始正题,我们先来看一张图
这张图描述了一个请求和响应的过程,咱们这篇文章的重点就在过程3,4,5。
对于Tomcat,这个过程可以体现在接下来的一张图:
下面来具体解释
接收请求
NIO
在理解Tomcat接收请求之前,大家需要对NIO的知识大概的了解,起码要知道NIO的核心思想,NIO的核心模块(Channel, Selector, Buffer),这样理解Tomcat的接受请求部分,就会很容易上手。
关于性能的问题,在网上找资料的时候,看到了一个小统计,首先来理解下IO对程序性能的影响:
从上面的表中可以得出的结论是:处理时间与IO时间对吞吐率的影响:把单位处理时间减半,仅能提高吞吐率2.44%。而仅仅缩短I/O延迟10%,就可使吞吐率增加10.53%;把I/O时间减半,吞吐率几乎翻番。可见在操作系统层面,IO的性能是多么的重要。
那么IO是什么,在操作系统层面代表着什么,看下图:
好了,正式开始Tomcat的NIO部分
接收请求
还记得上次分享提到的NioEndpoint的startInternal方法吧,这个方法是在Tomcat启动时运行的,因为和我们的接收请求关系非常密切,所以再次重复提到。
/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal() throws Exception {
if (!running) {
running = true;
paused = false;
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getProcessorCache());
eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getEventCache());
nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE,
socketProperties.getBufferPool());
// 创建搬运工集合
if ( getExecutor() == null ) {
createExecutor();
}
initializeConnectionLatch();
// 开启轮训者线程
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}
// 开启接收者线程
startAcceptorThreads();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
在这里让我看看Tomcat都启动了什么
Acceptor(请求接收者)
默认只有一个线程、用于接收请求,并将请求信息封装为PollEvent对象放入PollerEvent待处理队列中,用的是生产者消费者模式,设计模式以后会专门列举,这里只是提一下
- 如果端点处于暂停状态,50s探测一次
- 连接数+1,并判断是否超最大连接数(LimitLantch实现),如果超了阻塞等待(Tomcat默认连接数是10000)
- 通过ServerSocketChannel.accept()等待新的请求
- 将SocketChannel请求信息封装为NioChannel(NioChannel从缓存中读取,没有就新生成)
- 将NioChannel封装为PollerEvent(PollerEvent从缓存中读取,没有就新生成)
/**
* Process the specified connection.
* @param socket The socket channel
* @return <code>true</code> if the socket was correctly configured
* and processing may continue, <code>false</code> if the socket needs to be
* close immediately
*/
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
socketProperties.setProperties(sock);
NioChannel channel = nioChannels.pop();
if (channel == null) {
SocketBufferHandler bufhandler = new SocketBufferHandler(
socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());
if (isSSLEnabled()) {
channel = new SecureNioChannel(socket, bufhandler, selectorPool, this);
} else {
channel = new NioChannel(socket, bufhandler);
}
} else {
channel.setIOChannel(socket);
channel.reset();
}
// 这里的注册方法,就是向poller中添加事件
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(tt);
}
// Tell to close the socket
return false;
}
return true;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
Poller(轮训者)
使用NIO进行Socket的读写,在Poller的register方法中,通过NioSocketWrapper对NioChannel的封装,最后将NioSocketWrapper封装在PollerEvent中,最后加到PollerEvent队列,为了保证在多线程同时访问时数据的一致性,这个队列是一个SynchronizedQueue,使用synchronized来保证对队列中数据的一致性。
/**
* Registers a newly created socket with the poller.
*
* @param socket The newly created socket
*/
public void register(final NioChannel socket) {
socket.setPoller(this);
NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this);
socket.setSocketWrapper(ka);
ka.setPoller(this);
ka.setReadTimeout(getSocketProperties().getSoTimeout());
ka.setWriteTimeout(getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
ka.setReadTimeout(getSoTimeout());
ka.setWriteTimeout(getSoTimeout());
// eventCache是PollerEvent的缓存栈
PollerEvent r = eventCache.pop();
// 因为不知道是否为新的Channel,默认将感兴趣的事件设置为read,在下面会有是否为new的判断
ka.interestOps(SelectionKey.OP_READ);
// 如果是新的PollerEvent,在run之后就通过识别OP_REGISTER属性,就会把接收到的channel注册到selector中
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
// 加入队列
addEvent(r);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
eventCache是PollerEvent事件的缓存,在Poller上注册的时候,从eventCache中取出PollerEvent对象,重置这个对象,然后再放入Poller的事件队列中。Poller在处理队列的过程中,每从队列中取出一个要处理的PollerEvent事件,处理完之后,把这个PollerEvent对象放回缓存中。 —- 避免频繁地创建PollerEvent对象和GC回收。
上面说了加入队列,下面来说Poller怎样读取队列
默认两个线程,扫描PollEvent队列,进行处理
/**
* Poller thread count.
*/
private int pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());
- 1
- 2
- 3
- 4
下面来看Poller的run方法
/**
* The background thread that adds sockets to the Poller, checks the
* poller for triggered events and hands the associated socket off to an
* appropriate processor as events occur.
*/
@Override
public void run() {
// Loop until destroy() is called
while (true) {
boolean hasEvents = false;
try {
if (!close) {
hasEvents = events();
if (wakeupCounter.getAndSet(-1) > 0) {
// 到这一步,说明我们有请求需要处理了
// 开启非阻塞select
keyCount = selector.selectNow();
} else {
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
......
}
} catch (Throwable x) {
......
}
......
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, attachment);
}
}//while
//process timeouts
timeout(keyCount,hasEvents);
}//while
stopLatch.countDown();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
针对这段代码,有几点进行着重分享
1、获取事件
从PollerEvent队列中拿到队列的头节点,如果有事件,就把事件缓存下来
/**
* Processes events in the event queue of the Poller.
*
* @return <code>true</code> if some events were processed,
* <code>false</code> if queue was empty
*/
public boolean events() {
boolean result = false;
PollerEvent pe = null;
while ( (pe = events.poll()) != null ) {
result = true;
try {
pe.run();
pe.reset();
if (running && !paused) {
eventCache.push(pe);
}
} catch ( Throwable x ) {
log.error("",x);
}
}
return result;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
在Event缓存之前,大家知道,NIO中channel必须注册到selector上,才能从selector中获得key,那么注册这部分就在pe.run()方法中,如下代码所示
@Override
public void run() {
if (interestOps == OP_REGISTER) {
try {
// 向selector中注册channel
socket.getIOChannel().register(
socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);
} catch (Exception x) {
log.error(sm.getString("endpoint.nio.registerFail"), x);
}
} else {
......
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
如果有事件,selector就及时获取,如果没有就阻塞1s,大家在这里注意两个变量keyCount和wakeupCounter
private AtomicLong wakeupCounter = new AtomicLong(0);
private volatile int keyCount = 0;
- 1
- 2
- 3
wakeupCounter的作用:
- 告诉Poller当前有多少个新连接,这样当Poller进行selector的操作时,可以选择是否需要阻塞来等待读写请求到达。
- 标识Poller在进行select选择时,是否有连接到达。如果有,就让当前的阻塞调用立即返回
keyCount的作用:
- 注册到Poller的channel中,I/O状态已经OK的的个数
2、获取NioSocketWrapper
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
iterator.remove();
processKey(sk, attachment);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
3、处理SelectionKey
protected void processKey(SelectionKey sk, NioSocketWrapper attachment) {
try {
if ( close ) {
cancelledKey(sk);
} else if ( sk.isValid() && attachment != null ) {
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// 读数据
if (sk.isReadable()) {
if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) {
closeSocket = true;
}
}
// 写数据
if (!closeSocket && sk.isWritable()) {
if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
cancelledKey(sk);
}
}
}
} else {
//invalid key
cancelledKey(sk);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
虽然这段代码需要看的内容不多,但是这是Tomcat读和写的脉络,下面我们再看processSocket()方法,这个方法起到以下作用:
- 将NioSockectWrapper对象封装成SocketProcessor(从缓存中读取,没有就新生成)
- 线程池启动SocketProcessor线程处理
/**
* Process the given SocketWrapper with the given status. Used to trigger
* processing as if the Poller (for those endpoints that have one)
* selected the socket.
*
* @param socketWrapper The socket wrapper to process
* @param event The socket event to be processed
* @param dispatch Should the processing be performed on a new
* container thread
*
* @return if processing was triggered successfully
*/
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
try {
if (socketWrapper == null) {
return false;
}
SocketProcessorBase<S> sc = processorCache.pop();
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
} catch (RejectedExecutionException ree) {
getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
getLog().error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
Executor(搬运工)
上面提到线程池,上面提到的线程池就是executor,看一下在NioEndpoint中创建的方法
public void createExecutor() {
internalExecutor = true;
TaskQueue taskqueue = new TaskQueue();
TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
taskqueue.setParent( (ThreadPoolExecutor) executor);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
好了现在开始正式的read部分,大家打起精神来了,好戏才刚刚开始
注意上面代码的rc.run(),这是SocketProcessorBase的run方法,方法中又调用了doRun()方法
ConnectionHandler(连接处理者)
从实现类来看,应该对应到我们今天分析的NioEndPoint中的SocketProcessor in NioEndpoint。SocketProcessor线程调用ConnectionHandler进行处理,ConnectionHandler获取一个Processor进行处理(Processor也存在一个对象堆缓存,Stack实现)
@Override
protected void doRun() {
NioChannel socket = socketWrapper.getSocket();
SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
int handshake = -1;
......
if (handshake == 0) {
SocketState state = SocketState.OPEN;
// 从socket中处理请求
if (event == null) {
state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ);
} else {
state = getHandler().process(socketWrapper, event);
}
if (state == SocketState.CLOSED) {
close(socket, key);
}
} else if (handshake == -1 ) {
close(socket, key);
} else if (handshake == SelectionKey.OP_READ){
socketWrapper.registerReadInterest();
} else if (handshake == SelectionKey.OP_WRITE){
socketWrapper.registerWriteInterest();
}
}
......
} finally {
socketWrapper = null;
event = null;
//return to cache
if (running && !paused) {
processorCache.push(this);
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
下面来看ConnectionHandler的process()方法,先试图从connections中获取当前Socket对应的Processor,如果没有找到的话从recycledProcessors中获取,也就是已经处理过连接但是没有被销毁的Processor,这样做的好处是避免频繁地创建和销毁对象。processor还是为空的话,那就使用createProcessor创建。
@Override
public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) {
}
if (wrapper == null) {
// Nothing to do. Socket has been closed.
return SocketState.CLOSED;
}
S socket = wrapper.getSocket();
Processor processor = connections.get(socket);
if ((status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR)
&& processor == null) {
// Nothing to do. Endpoint requested a close and there is no
// longer a processor associated with this socket.
return SocketState.CLOSED;
}
ContainerThreadMarker.set();
try {
if (processor == null) {
String negotiatedProtocol = wrapper.getNegotiatedProtocol();
if (negotiatedProtocol != null) {
UpgradeProtocol upgradeProtocol =
getProtocol().getNegotiatedProtocol(negotiatedProtocol);
if (upgradeProtocol != null) {
processor = upgradeProtocol.getProcessor(
wrapper, getProtocol().getAdapter());
} else if (negotiatedProtocol.equals("http/1.1")) {
// Explicitly negotiated the default protocol.
// Obtain a processor below.
} else {
// TODO:
// OpenSSL 1.0.2's ALPN callback doesn't support
// failing the handshake with an error if no
// protocol can be negotiated. Therefore, we need to
// fail the connection here. Once this is fixed,
// replace the code below with the commented out
// block.
if (getLog().isDebugEnabled()) {
getLog().debug(sm.getString(
"abstractConnectionHandler.negotiatedProcessor.fail",
negotiatedProtocol));
}
return SocketState.CLOSED;
}
}
}
if (processor == null) {
processor = recycledProcessors.pop();
}
if (processor == null) {
processor = getProtocol().createProcessor();
register(processor);
}
processor.setSslSupport(
wrapper.getSslSupport(getProtocol().getClientCertProvider()));
// Associate the processor with the connection
connections.put(socket, processor);
// Make sure an async timeout doesn't fire
getProtocol().removeWaitingProcessor(processor);
SocketState state = SocketState.CLOSED;
do {
state = processor.process(wrapper, status);
......
// Make sure socket/processor is removed from the list of current
// connections
connections.remove(socket);
release(processor);
return SocketState.CLOSED;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
在这里说明一下EndPoint和ConnectionHandler的关系
在Tomcat中Endpoint主要用来接收网络请求,处理则由ConnectionHandler来执行。
ConnectionHandler主要作用是调用对应协议的Processor来处理请求。而对应的processor经过简单的内容解析之后,则调用Adapter(连接适配器)的方法,将请求转发给容器,由Servlet容器来处理用户请求。
按照这个顺序,接下来先看processor(AbstractProcessorLight)的process方法
Http11Processor(Http请求处理者)
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {
// Do nothing here, just wait for it to get recycled
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
// 可能有管道数据读取。如果数据没有处理,
// 执行将退出这个循环和呼叫release()将回收处理器(输入缓冲区)删除任何管道数据。
// 为了避免这种情况,现在处理它。
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
// Extra write event likely after async, ignore
state = SocketState.LONG;
} else {
state = service(socketWrapper);
}
if (state != SocketState.CLOSED && isAsync()) {
state = asyncPostProcess();
}
if (getLog().isDebugEnabled()) {
getLog().debug("Socket: [" + socketWrapper +
"], Status in: [" + status +
"], State out: [" + state + "]");
}
if (dispatches == null || !dispatches.hasNext()) {
// Only returns non-null iterator if there are
// dispatches to process.
dispatches = getIteratorAndClearDispatches();
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);
return state;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
接下来进入service方法,需要提一下,service方法在AbstractProcessorLight中是一个抽象方法,其真正的实现根据协议不同,实现的方式也不同,咱们这儿是http,所以使用的是Http11Processor
这个方法中终于见到了我们熟悉的Request和Response,还有一个NIO很重要的Buffer模块,NIO中数据的读取和写入都要从经过Buffer。
大家注意getAdapter().service(request, response); 到了这一步,Processor正式把接力棒交到了Adapter手中,由Adapter继续处理Request和Response。
public SocketState service(SocketWrapperBase<?> socketWrapper)
throws IOException {
RequestInfo rp = request.getRequestProcessor();
rp.setStage(org.apache.coyote.Constants.STAGE_PARSE);
// Setting up the I/O
setSocketWrapper(socketWrapper);
inputBuffer.init(socketWrapper);
outputBuffer.init(socketWrapper);
// Flags
keepAlive = true;
openSocket = false;
readComplete = true;
boolean keptAlive = false;
while (!getErrorState().isError() && keepAlive && !isAsync() &&
upgradeToken == null && !endpoint.isPaused()) {
// 省略部分是设置request和response,因为只是介绍tomcat的调用流程
// 有兴趣的同学自己查阅相关内容
......
// Process the request in the adapter
if (!getErrorState().isError()) {
try {
rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE);
getAdapter().service(request, response);
if(keepAlive && !getErrorState().isError() && !isAsync() &&
statusDropsConnection(response.getStatus())) {
setErrorState(ErrorState.CLOSE_CLEAN, null);
}
......
}
// Finish the handling of the request
......
if (!isAsync() || getErrorState().isError()) {
request.updateCounters();
if (getErrorState().isIoAllowed()) {
inputBuffer.nextRequest();
outputBuffer.nextRequest();
}
}
......
if (breakKeepAliveLoop(socketWrapper)) {
break;
}
}
rp.setStage(org.apache.coyote.Constants.STAGE_ENDED);
......
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
CoyoteAdapter(Coyote适配器)
好了,我们来到Adapter,顾名思义,是个适配器,由此我们可以想到哪种设计模式,对-就是适配器模式(将一个类的接口转换成客户希望的另外一个接口。Adapter模式使得原本由于接口不兼容而不能一起工作的那些类可以一起工作),Adapter是一个接口,在org.apache.coyote路径下,等于说是一个规范用于调用service方法,并且唤起所有的监听器。视线转到CoyoteAdapter,说白了就是Coyote类型的实现。
public void service(org.apache.coyote.Request req,
org.apache.coyote.Response res)
throws Exception {
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
// 设置request和response
......
boolean async = false;
boolean postParseSuccess = false;
try {
// Parse and set Catalina and configuration specific
// request parameters
req.getRequestProcessor().setWorkerThreadName(THREAD_NAME.get());
postParseSuccess = postParseRequest(req, request, res, response);
if (postParseSuccess) {
//check valves if we support async
request.setAsyncSupported(connector.getService().getContainer().getPipeline().isAsyncSupported());
// Calling the container
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
}
if (request.isAsync()) {
async = true;
ReadListener readListener = req.getReadListener();
if (readListener != null && request.isFinished()) {
// Possible the all data may have been read during service()
// method so this needs to be checked here
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
if (req.sendAllDataReadEvent()) {
req.getReadListener().onAllDataRead();
}
} finally {
request.getContext().unbind(false, oldCL);
}
}
Throwable throwable =
(Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
// If an async request was started, is not going to end once
// this container thread finishes and an error occurred, trigger
// the async error process
if (!request.isAsyncCompleting() && throwable != null) {
request.getAsyncContextInternal().setErrorState(throwable, true);
}
} else {
request.finishRequest();
response.finishResponse();
}
}
......
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
Pipeline & Valve 管道阀门链
大家注意这行代码:
connector.getService().getContainer().getPipeline().getFirst().invoke(request, response);
Connector是什么大家还有印象吧,咱们一直在看请求流程,别忘了Connector就是接收请求的模块,没别的,就是提醒一下。
OK,接下来该重头戏了,另外一个Tomcat的设计优美之处,责任链模式调用。
大家可以结合这两图,来理解 从Connector –>Service –>Container的层层获取
大家还记得在第一节课提到的结构吧,Engine、Host、Context、Wrapper都属于Container,这几层Container之间的request传递是怎么进行的呢,这里就要说到Tomcat里面的PipeLine和Valve
Pipeline就像一个工厂中的生产线,负责调配工人(valve)的位置,valve则是生产线上负责不同操作的工人。
一个生产线的完成需要两步:
1、把原料运到工人边上
2、工人完成自己负责的部分
Valve的逻辑结构就是常用的单向链表
每一个Container对Valve都有一个标准实现,对于Engine来说,就有StandardEngineValve,在初始化StandardEngine的时候,就会把这个Valve加进StandardPipeline中,是first,也是basic,当addValve之后,First向前移动,效果图就如上图所示。
具体的每一层就不再一一列举,直接到达StandardWrapperValve的invoke方法
Servlet
// Allocate a servlet instance to process this request
try {
if (!unavailable) {
servlet = wrapper.allocate();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
可以看到由StandardWrapper分配servlet,接下来的一系列操作就直接代码给出,只列出关键代码
allocate方法中
try {
if (log.isDebugEnabled()) {
log.debug("Allocating non-STM instance");
}
// Note: We don't know if the Servlet implements
// SingleThreadModel until we have loaded it.
instance = loadServlet();
newInstance = true;
if (!singleThreadModel) {
// For non-STM, increment here to prevent a race
// condition with unload. Bug 43683, test case
// #3
countAllocated.incrementAndGet();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
loadServlet中就会进行servlet的初始化工作
接下来要说到filterChain,下面这段代码也是在StandardWrapperValve的invoke方法中
// Create the filter chain for this request
ApplicationFilterChain filterChain =
ApplicationFilterFactory.createFilterChain(request, wrapper, servlet);
try {
if ((servlet != null) && (filterChain != null)) {
// Swallow output if needed
if (context.getSwallowOutput()) {
try {
SystemLogHandler.startCapture();
if (request.isAsyncDispatching()) {
request.getAsyncContextInternal().doInternalDispatch();
} else {
filterChain.doFilter(request.getRequest(),
response.getResponse());
}
} finally {
String log = SystemLogHandler.stopCapture();
if (log != null && log.length() > 0) {
context.getLogger().info(log);
}
}
} else {
if (request.isAsyncDispatching()) {
request.getAsyncContextInternal().doInternalDispatch();
} else {
filterChain.doFilter
(request.getRequest(), response.getResponse());
}
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
filterChain.doFilter(request.getRequest(), response.getResponse());是进入我们熟悉的doGet、doPost等方法的关键,进入doFilter方法看,在ApplicationFilterChain的internalDoFilter方法中,有一行servlet.service(request, response);这就是进入到咱们servlet 服务的入口,此方法在Tomcat自带的HttpServlet中实现
/**
* Dispatches client requests to the protected
* <code>service</code> method. There's no need to
* override this method.
*/
public void service(ServletRequest req, ServletResponse res)
throws ServletException, IOException {
HttpServletRequest request;
HttpServletResponse response;
try {
request = (HttpServletRequest) req;
response = (HttpServletResponse) res;
} catch (ClassCastException e) {
throw new ServletException("non-HTTP request or response");
}
service(request, response);
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
大家看到,在上面这个service方法中request被转成HttpServletRequest,response被转成HttpServletResponse,
接着执行另一个service方法
protected void service(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
String method = req.getMethod();
if (method.equals(METHOD_GET)) {
long lastModified = getLastModified(req);
if (lastModified == -1) {
// servlet doesn't support if-modified-since, no reason
// to go through further expensive logic
doGet(req, resp);
} else {
long ifModifiedSince;
try {
ifModifiedSince = req.getDateHeader(HEADER_IFMODSINCE);
} catch (IllegalArgumentException iae) {
// Invalid date header - proceed as if none was set
ifModifiedSince = -1;
}
if (ifModifiedSince < (lastModified / 1000 * 1000)) {
// If the servlet mod time is later, call doGet()
// Round down to the nearest second for a proper compare
// A ifModifiedSince of -1 will always be less
maybeSetLastModified(resp, lastModified);
doGet(req, resp);
} else {
resp.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
}
}
} else if (method.equals(METHOD_HEAD)) {
long lastModified = getLastModified(req);
maybeSetLastModified(resp, lastModified);
doHead(req, resp);
} else if (method.equals(METHOD_POST)) {
doPost(req, resp);
} else if (method.equals(METHOD_PUT)) {
doPut(req, resp);
} else if (method.equals(METHOD_DELETE)) {
doDelete(req, resp);
} else if (method.equals(METHOD_OPTIONS)) {
doOptions(req,resp);
} else if (method.equals(METHOD_TRACE)) {
doTrace(req,resp);
} else {
//
// Note that this means NO servlet supports whatever
// method was requested, anywhere on this server.
//
String errMsg = lStrings.getString("http.method_not_implemented");
Object[] errArgs = new Object[1];
errArgs[0] = method;
errMsg = MessageFormat.format(errMsg, errArgs);
resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED, errMsg);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
这个方法在整个流程中非常重要,因为在这里才会开始咱们久久盼望的业务逻辑
大家可以看到上面的截图中高亮的部分,Spring的FrameworkServlet就是通过间接的继承HttpServlet,从而可以自己实现一套业务处理规范。
好了,业务具体是怎么处理的在这里就不多说了, 那么现在从接收请求到业务处理就告一段落了,别忘了,现在只是接收请求,还有返回response的部分,接下来说这一部分
返回数据
提到Tomcat的写数据,依然要想到selector和outputBuffer,在NioEndpoint中,NioSocketWrapper内部类封装了NIO的读和写,下面简单介绍一下
NioSocketWrapper doWrite()
protected void doWrite(boolean block) throws IOException {
socketBufferHandler.configureWriteBufferForRead();
long writeTimeout = getWriteTimeout();
Selector selector = null;
try {
selector = pool.get();
} catch (IOException x) {
// Ignore
}
try {
pool.write(socketBufferHandler.getWriteBuffer(), getSocket(),
selector, writeTimeout, block);
if (block) {
// Make sure we are flushed
do {
if (getSocket().flush(true, selector, writeTimeout)) break;
} while (true);
}
updateLastWrite();
} finally {
if (selector != null) {
pool.put(selector);
}
}
// If there is data left in the buffer the socket will be registered for
// write further up the stack. This is to ensure the socket is only
// registered for write once as both container and user code can trigger
// write registration.
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
NioSelectorPool
doWrite方法中有一行pool.write(socketBufferHandler.getWriteBuffer(), getSocket(),
selector, writeTimeout, block);
这行代码就是说要向NIO的selector里面注册写类型的事件
在NioBlockingSelector中有详细的write方法,有兴趣的同学可以进入源码查看,这里就不在说明。
好了,这次的分享就到此为止了,有问题的同学可以给我留言,下次会分享一下Tomcat中的设计模式,更有趣的还在后面。
上一篇文章说到了Tomcat的启动,但是Tomcat作为一个Server,就必须要接收请求,这次分享就着重说一下这部分。