前面几篇文章主要针对Tomcat的启动做了较为详细的分析。而对于Tomcat来说,还有一块复杂的业务,那便是Tomcat是如何处理请求的。所以,今天我们就走进Tomcat源码,看看http在Tomcat中经历了哪些弯弯道道。
在Tomcat源码分析--启动流程文章中,我们主要对Tomcat的启动做了较为详细的分析。然而为了更好的进入本次主题,还需要对于Connector的启动这一块做更深层次的分析。那么我们就从org.apache.catalina.connector.Connector#startInternal中的protocolHandler.start();开始吧。
我们一路跟进,来到代码org.apache.coyote.AbstractProtocol#start()。在这里,主要做了两件事,一件事是endpoint启动,另外一件事便是开启一个检测请求超时线程,这个是做什么的呢?接着往下看。
在org.apache.tomcat.util.net.AbstractEndpoint#start中,主要做了两件事,bind()和startInternal()。
在这个方法中,主要是做了对于endpoint的初始化。而具体又做了如下几件事。
1)创建ServerSocket通道
if (!getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort())); serverSock.socket().bind(addr,getAcceptCount()); } else { Channel ic = System.inheritedChannel(); if (ic instanceof ServerSocketChannel) { serverSock = (ServerSocketChannel) ic; } if (serverSock == null) { throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited")); } }在创建ServerSocket通道中,这里有两种方式。如果允许使用内置的通道,则执行else操作,否则将重新创建一个通道。如果是新创建一个ServerSocketChannel,他需要绑定ip端口,以及设置可接受的数量(getAcceptCount()),默认为100;如果使用内置的通道System.inheritedChannel(),程序并没有过多的设置和参与,直接拿过来就使用。这里简单介绍一下nheritedChannel。
返回从创建此 Java 虚拟机的实体中继承的信道,此方法返回通过调用系统级默认 selectorprovider 对象的 inheritedchannel 方法获得的信道。 除了 inheritedchannel 中描述的面向网络的信道之外,此方法以后还可能返回其他种类的信道。
2)设置Acceptor与Poller的数量
Acceptor与Poller都是做什么的?现在你只需只要Acceptor是http的门户,他将请求交给poller,然后由poller将socket交给工作线程处理。
if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount <= 0) { //minimum one poller thread pollerThreadCount = 1; }默认情况下,acceptorThreadCount 的数量为1,而poller的数量取决于可使用内核的数量。如果可用cpu内核数量≥2,则为2个,如果可用cpu内核数量=1,那poller个数为1个。
private int pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());3)打开selectorPool
这个和NIO编程相关,这里就不详细描述了,感兴趣的可以自行恶补一下。
在org.apache.tomcat.util.net.NioEndpoint#startInternal方法中,主要做了四件事。
1)创建三个栈变量
processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool());在这几行代码中,初始化了socketProcessor缓冲池、事件缓冲池和nio通道。这里并没有使用jdk自带的栈,而是自己写了一个线程安全的栈。我想它只处理入栈和出栈的业务逻辑,并不会处理复杂的逻辑,为了节省对象空间而单独设计一个栈结构吧。
2)创建执行器
// Create worker collection if ( getExecutor() == null ) { createExecutor(); }这里有一个前提条件,如果执行器为空的情况下,才去创建。那什么时候不为空呢?不知道大家想到server.xml文件没有,这里有一项配置(Executor元素):
<Service name="Catalina"> <Executor name="tomcatThreadPool" namePrefix="catalina-exec-" maxThreads="150" minSpareThreads="4"/> <Connector executor="tomcatThreadPool" port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" /> </Service>如果为空,则需要创建任务线程池。
public void createExecutor() { internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); }这里没什么说的,和正常创建线程池没什么区别,不过注意一下这个变量:internalExecutor,内置执行器。这里设置为true,证明以后的任务线程池全部使用内置的线程池。另外还要注意一下线程的名字,getName() + "-exec-",默认是TP-exce-,但是在初始化的过程中对于getName()做了重新设置,这里我们主要关注-exec-就可以了。
3)创建Poller线程
pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); }这一代没有那么复杂,通过上面分析,这里一般需要初始化两个poller。由于将poller加入到Thread中,而且还执行了start方法,所以我们有必要看一下Poller的run方法。官方给予的注释为:
The background thread that adds sockets to the Poller, checks the poller for triggered events and hands the associated socket off to an appropriate processor as events occur.
将套接字添加到轮询器的后台线程,检查轮询器是否触发了事件, 并在事件发生时将关联的套接字移交给适当的处理器。这里有一个while(ture){},而且在while中并没有阻塞的方法,所以一直通过while循环监听,知道通过代码监听到来自Acceptor发送的,才进行处理业务逻辑,那这段代码为selector.selectedKeys().iterator()。
4)创建并启动Acceptor线程
protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); //设置Acceptor线程的名字 String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); //设置是否守护线程 t.setDaemon(getDaemon()); t.start(); } }根据上面分析,这里默认创建一个Acceptor线程,注意关键字“-Acceptor-”。那Acceptor的run方法执行了哪些操作呢?注意,这次我们要找子类org.apache.tomcat.util.net.NioEndpoint.Acceptor。在这个类的run方法中,我们找到一句代码socket = serverSock.accept();,当执行到这段代码,线程阻塞。
感兴趣的可以看一下Nio2Endpoint.Acceptor,看看他是怎么实现了,为什么Nio2Endpoint称为异步非阻塞。
我们注意一下关键字AsyncTimeout,异步超时线程。这段代码的主要作用是当出现超时的连接,通过当前线程会将请求转发到任务线程。这是我的猜测,不知道是否正确。。
分析到这里,我们知道,acceptor类的run方法中的serversocket.accept()还一直阻塞呢,也就是他监听着请求的到来。
通过上面的代码分析我们知道,当http请求真的来了,我们应该从org.apache.tomcat.util.net.NioEndpoint.Acceptor#run方法中的socket = serverSock.accept();开始。沿着代码往下走。
在这个方法中,主要做了两件事:
1)获取一个channel,并将socket放入channel中
NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); }
不知大家是否还有印象,在上面的代码中我们一口气创建了三个栈,其中有一个便为nioChannel,我们弹出一个channel,并将socket放入channel中,当然,如果nioChannel为空,我们会创建一个channel。
2)将带有socket的channel注册给poller.
getPoller0().register(channel);上面提到,poller默认初始化两个。那么我们选取哪个poller进行处理呢?
public Poller getPoller0() { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; }这段代码很好理解,采用轮训的方式进行注册。
代码开始,创建一个NioSocketWrapper包装类(这里用到了装饰者的设计模式),接着从事件缓冲池eventCache弹出一个事件,然后设置当前socket为读模式,最终,将此socket加入到poller事件中。
我们快速来到org.apache.tomcat.util.net.NioEndpoint.Poller#run。这里执行步骤如下:
在这个方法中有这么一行代码hasEvents = events();
public boolean events() { boolean result = false; PollerEvent pe = null; for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) { result = true; try { pe.run(); pe.reset(); if (running && !paused) { eventCache.push(pe); } } catch ( Throwable x ) { log.error("",x); } } return result; }在这段代码中,首先弹出一个事件,pe = events.poll(),并执行run方法。在run方法中,主要有如下代码
socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);将读事件注册到select中。
当再次执行org.apache.tomcat.util.net.NioEndpoint.Poller#run的while循环中执行keyCount = selector.selectNow();发现有一个选择器。
Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); processKey(sk, attachment); } }我们获取到selector的所有SelectionKey,迭代循环执行。通过processKey(sk,attachment)发送到下一个方法org.apache. tomcat.util.net.NioEndpoint.Poller#processKey。
if (sk.isReadable()) { if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } }根据Socket的读事件或者写事件,执行processSocket(attachment, SocketEvent.OPEN_READ, true)方法。
在方法org.apache.tomcat.util.net.AbstractEndpoint#processSocket中,首先创建SocketProcessorBase对象sc,然后拿到工作线程池,Executor executor = getExecutor();,最终执行SocketProcessorBase。
所以此时我们需要看一下SocketProcessorBase的run方法。首先这是一个抽象类,具体实现类为org.apache.tomcat.util. net.NioEndpoint.SocketProcessor。其实SocketProcessorBase的run方法执行了SocketProcessor的doRun方法。
这里才是真正处理http请求的地方。
在下一篇,我们将重点分析一下tomcat是如何解析并处理http请求的。