Tomcat源码分析-http请求走过的那些路(一)

it2025-04-23  12

1.概述

前面几篇文章主要针对Tomcat的启动做了较为详细的分析。而对于Tomcat来说,还有一块复杂的业务,那便是Tomcat是如何处理请求的。所以,今天我们就走进Tomcat源码,看看http在Tomcat中经历了哪些弯弯道道。

2.Tomcat启动时的那些准备

在Tomcat源码分析--启动流程文章中,我们主要对Tomcat的启动做了较为详细的分析。然而为了更好的进入本次主题,还需要对于Connector的启动这一块做更深层次的分析。那么我们就从org.apache.catalina.connector.Connector#startInternal中的protocolHandler.start();开始吧。

我们一路跟进,来到代码org.apache.coyote.AbstractProtocol#start()。在这里,主要做了两件事,一件事是endpoint启动,另外一件事便是开启一个检测请求超时线程,这个是做什么的呢?接着往下看。

2.1 endpoint启动

在org.apache.tomcat.util.net.AbstractEndpoint#start中,主要做了两件事,bind()和startInternal()。

2.1.1 bind()

在这个方法中,主要是做了对于endpoint的初始化。而具体又做了如下几件事。

1)创建ServerSocket通道

if (!getUseInheritedChannel()) { serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = (getAddress()!=null?new InetSocketAddress(getAddress(),getPort()):new InetSocketAddress(getPort())); serverSock.socket().bind(addr,getAcceptCount()); } else { Channel ic = System.inheritedChannel(); if (ic instanceof ServerSocketChannel) { serverSock = (ServerSocketChannel) ic; } if (serverSock == null) { throw new IllegalArgumentException(sm.getString("endpoint.init.bind.inherited")); } }

在创建ServerSocket通道中,这里有两种方式。如果允许使用内置的通道,则执行else操作,否则将重新创建一个通道。如果是新创建一个ServerSocketChannel,他需要绑定ip端口,以及设置可接受的数量(getAcceptCount()),默认为100;如果使用内置的通道System.inheritedChannel(),程序并没有过多的设置和参与,直接拿过来就使用。这里简单介绍一下nheritedChannel。

返回从创建此 Java 虚拟机的实体中继承的信道,此方法返回通过调用系统级默认 selectorprovider 对象的 inheritedchannel 方法获得的信道。 除了 inheritedchannel 中描述的面向网络的信道之外,此方法以后还可能返回其他种类的信道。

2)设置Acceptor与Poller的数量

Acceptor与Poller都是做什么的?现在你只需只要Acceptor是http的门户,他将请求交给poller,然后由poller将socket交给工作线程处理。

if (acceptorThreadCount == 0) { // FIXME: Doesn't seem to work that well with multiple accept threads acceptorThreadCount = 1; } if (pollerThreadCount <= 0) { //minimum one poller thread pollerThreadCount = 1; }

默认情况下,acceptorThreadCount 的数量为1,而poller的数量取决于可使用内核的数量。如果可用cpu内核数量≥2,则为2个,如果可用cpu内核数量=1,那poller个数为1个。

private int pollerThreadCount = Math.min(2,Runtime.getRuntime().availableProcessors());

3)打开selectorPool

这个和NIO编程相关,这里就不详细描述了,感兴趣的可以自行恶补一下。

2.1.2 启动endpoint(startInternal())

在org.apache.tomcat.util.net.NioEndpoint#startInternal方法中,主要做了四件事。

1)创建三个栈变量

processorCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getProcessorCache()); eventCache = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getEventCache()); nioChannels = new SynchronizedStack<>(SynchronizedStack.DEFAULT_SIZE, socketProperties.getBufferPool());

在这几行代码中,初始化了socketProcessor缓冲池、事件缓冲池和nio通道。这里并没有使用jdk自带的栈,而是自己写了一个线程安全的栈。我想它只处理入栈和出栈的业务逻辑,并不会处理复杂的逻辑,为了节省对象空间而单独设计一个栈结构吧。

2)创建执行器

// Create worker collection if ( getExecutor() == null ) { createExecutor(); }

这里有一个前提条件,如果执行器为空的情况下,才去创建。那什么时候不为空呢?不知道大家想到server.xml文件没有,这里有一项配置(Executor元素):

<Service name="Catalina"> <Executor name="tomcatThreadPool" namePrefix="catalina-exec-" maxThreads="150" minSpareThreads="4"/> <Connector executor="tomcatThreadPool" port="8080" protocol="HTTP/1.1" connectionTimeout="20000" redirectPort="8443" /> </Service>

如果为空,则需要创建任务线程池。

public void createExecutor() { internalExecutor = true; TaskQueue taskqueue = new TaskQueue(); TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority()); executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf); taskqueue.setParent( (ThreadPoolExecutor) executor); }

这里没什么说的,和正常创建线程池没什么区别,不过注意一下这个变量:internalExecutor,内置执行器。这里设置为true,证明以后的任务线程池全部使用内置的线程池。另外还要注意一下线程的名字,getName() + "-exec-",默认是TP-exce-,但是在初始化的过程中对于getName()做了重新设置,这里我们主要关注-exec-就可以了。

3)创建Poller线程

pollers = new Poller[getPollerThreadCount()]; for (int i=0; i<pollers.length; i++) { pollers[i] = new Poller(); Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i); pollerThread.setPriority(threadPriority); pollerThread.setDaemon(true); pollerThread.start(); }

这一代没有那么复杂,通过上面分析,这里一般需要初始化两个poller。由于将poller加入到Thread中,而且还执行了start方法,所以我们有必要看一下Poller的run方法。官方给予的注释为:

The background thread that adds sockets to the Poller, checks the poller for triggered events and hands the associated socket off to an appropriate processor as events occur.

将套接字添加到轮询器的后台线程,检查轮询器是否触发了事件, 并在事件发生时将关联的套接字移交给适当的处理器。

这里有一个while(ture){},而且在while中并没有阻塞的方法,所以一直通过while循环监听,知道通过代码监听到来自Acceptor发送的,才进行处理业务逻辑,那这段代码为selector.selectedKeys().iterator()。

4)创建并启动Acceptor线程

protected final void startAcceptorThreads() { int count = getAcceptorThreadCount(); acceptors = new Acceptor[count]; for (int i = 0; i < count; i++) { acceptors[i] = createAcceptor(); //设置Acceptor线程的名字 String threadName = getName() + "-Acceptor-" + i; acceptors[i].setThreadName(threadName); Thread t = new Thread(acceptors[i], threadName); t.setPriority(getAcceptorThreadPriority()); //设置是否守护线程 t.setDaemon(getDaemon()); t.start(); } }

根据上面分析,这里默认创建一个Acceptor线程,注意关键字“-Acceptor-”。那Acceptor的run方法执行了哪些操作呢?注意,这次我们要找子类org.apache.tomcat.util.net.NioEndpoint.Acceptor。在这个类的run方法中,我们找到一句代码socket = serverSock.accept();,当执行到这段代码,线程阻塞。

感兴趣的可以看一下Nio2Endpoint.Acceptor,看看他是怎么实现了,为什么Nio2Endpoint称为异步非阻塞。

2.2 启动超时连接线程

asyncTimeout = new AsyncTimeout(); Thread timeoutThread = new Thread(asyncTimeout, getNameInternal() + "-AsyncTimeout"); int priority = endpoint.getThreadPriority(); if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) { priority = Thread.NORM_PRIORITY; } timeoutThread.setPriority(priority); timeoutThread.setDaemon(true); timeoutThread.start();

我们注意一下关键字AsyncTimeout,异步超时线程。这段代码的主要作用是当出现超时的连接,通过当前线程会将请求转发到任务线程。这是我的猜测,不知道是否正确。。

分析到这里,我们知道,acceptor类的run方法中的serversocket.accept()还一直阻塞呢,也就是他监听着请求的到来。

3. 请求真的来了

通过上面的代码分析我们知道,当http请求真的来了,我们应该从org.apache.tomcat.util.net.NioEndpoint.Acceptor#run方法中的socket = serverSock.accept();开始。沿着代码往下走。

3.1 setSocketOptions(socket)【将socket传递给poller】

在这个方法中,主要做了两件事:

1)获取一个channel,并将socket放入channel中

 

NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); }

不知大家是否还有印象,在上面的代码中我们一口气创建了三个栈,其中有一个便为nioChannel,我们弹出一个channel,并将socket放入channel中,当然,如果nioChannel为空,我们会创建一个channel。

2)将带有socket的channel注册给poller.

getPoller0().register(channel);

上面提到,poller默认初始化两个。那么我们选取哪个poller进行处理呢?

public Poller getPoller0() { int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; }

这段代码很好理解,采用轮训的方式进行注册。

3.2 socket注册到poller

public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getSocketProperties().getSoTimeout()); ka.setWriteTimeout(getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); PollerEvent r = eventCache.pop(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r); }

代码开始,创建一个NioSocketWrapper包装类(这里用到了装饰者的设计模式),接着从事件缓冲池eventCache弹出一个事件,然后设置当前socket为读模式,最终,将此socket加入到poller事件中。

3.3 poller到工作线程

我们快速来到org.apache.tomcat.util.net.NioEndpoint.Poller#run。这里执行步骤如下:

3.3.1 事件处理

在这个方法中有这么一行代码hasEvents = events();

public boolean events() { boolean result = false; PollerEvent pe = null; for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) { result = true; try { pe.run(); pe.reset(); if (running && !paused) { eventCache.push(pe); } } catch ( Throwable x ) { log.error("",x); } } return result; }

在这段代码中,首先弹出一个事件,pe = events.poll(),并执行run方法。在run方法中,主要有如下代码

socket.getIOChannel().register( socket.getPoller().getSelector(), SelectionKey.OP_READ, socketWrapper);

将读事件注册到select中。

3.3.2 读取Selector内容

当再次执行org.apache.tomcat.util.net.NioEndpoint.Poller#run的while循环中执行keyCount = selector.selectNow();发现有一个选择器。

Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { iterator.remove(); processKey(sk, attachment); } }

我们获取到selector的所有SelectionKey,迭代循环执行。通过processKey(sk,attachment)发送到下一个方法org.apache. tomcat.util.net.NioEndpoint.Poller#processKey。

if (sk.isReadable()) { if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } }

根据Socket的读事件或者写事件,执行processSocket(attachment, SocketEvent.OPEN_READ, true)方法。

3.3.3 移交工作线程池

SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); }

在方法org.apache.tomcat.util.net.AbstractEndpoint#processSocket中,首先创建SocketProcessorBase对象sc,然后拿到工作线程池,Executor executor = getExecutor();,最终执行SocketProcessorBase。

所以此时我们需要看一下SocketProcessorBase的run方法。首先这是一个抽象类,具体实现类为org.apache.tomcat.util. net.NioEndpoint.SocketProcessor。其实SocketProcessorBase的run方法执行了SocketProcessor的doRun方法。

这里才是真正处理http请求的地方。

在下一篇,我们将重点分析一下tomcat是如何解析并处理http请求的。

最新回复(0)