⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 https://juejin.im/post/5ab381a26fb9a028b410f9be 「预流」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

Tomcat 的 Connector 有三种运行模式 bio、nio、apr ,先了解一下这三种的区别。

  1. bio(blocking I/O),顾名思义即阻塞式 I/O 操作,表示 Tomcat 使用的是传统的 Java I/O 操作(即java.io包及其子包)。Tomcat 在默认情况下,就是以 bio 模式运行的。一般而言 bio 模式是三种运行模式中性能最低的一种。
  2. nio(new I/O),是 Java SE 1.4 及后续版本提供的一种新的 I/O 操作方式(即java.nio包及其子包)。Java nio 是一个基于缓冲区并能提供非阻塞 I/O 操作的 Java API ,因此 nio 也被看成是 non-blocking I/O 的缩写。它拥有比传统 I/O 操作( bio )更好的并发运行性能。要让 Tomcat 以 nio 模式来运行只需要在 Tomcat 安装目录/conf/server.xml文件中将 Connector 节点的 protocol 配置成org.apache.coyote.http11.Http11NioProtocol即可。
  3. apr( Apache Portable Runtime/Apache 可移植运行时),是 Apache HTTP 服务器的支持库。可以简单地理解为 Tomcat 将以 JNI 的形式调 用Apache HTTP 服务器的核心动态链接库来处理文件读取或网络传输操作,从而大大提高 Tomcat 对静态文件的处理性能。 Tomcat apr 也是在 Tomcat 上运行高并发应用的首选模式。

写个 BIO 的 Socket 服务器还是比较容易的,无非是每 accept 一个 socket 之后就扔到一个线程中处理请求生成响应,这种方式可以改进的点就是增加线程池的支持,本文主要分析一下 Tomcat 中 NIO 处理方式的相关代码逻辑。

关键代码都是在org.apache.tomcat.util.net.NioEndpoint这个类里面,它是 Http11NioProtocol 中负责接收处理 socket 的主要组件,别看代码很长,仔细阅读会发现有很多共通的地方,如:

  1. 都会对JDK中原有的API做一下扩展或者包装,如 ThreadPoolExecutor 是对java.util.concurrent.ThreadPoolExecutor的扩展,NioChannel 是对 ByteChannel 的扩展,KeyAttachment 则是对 NioChannel 的包装
  2. 很多类设计成非 GC 的,方便缓存和重复使用,实现方式都是通过 ConcurrentLinkedQueue 类构造一个队列。比如 NioEndpoint 类里面的 ConcurrentLinkedQueue processorCache、ConcurrentLinkedQueue keyCache、ConcurrentLinkedQueue eventCache、ConcurrentLinkedQueue nioChannels 。Poller 类里面的 ConcurrentLinkedQueue events

先看下整个 Connector 组件结构图:

看过之前 Tomcat 启动文章的应该都知道,Connector 的启动会调用 Connector 类的 startInternal 方法,里面调用了 protocolHandler 的 start() ,该方法中将调用抽象的 endpoint 的 start() 方法,这个方法会调用到具体 Endpoint 类的 startInternal() ,所以代码分析先从 NioEndpoint 类的 startInternal 看起。

1.NioEndpoint 类核心组件的初始化

/**
* Start the NIO endpoint, creating acceptor, poller threads.
*/
@Override
public void startInternal() throws Exception {

if (!running) {
running = true;
paused = false;

// Create worker collection
if ( getExecutor() == null ) {
// 构造线程池,用于后续执行SocketProcessor线程,这就是上图中的Worker。
createExecutor();
}

initializeConnectionLatch();

// Start poller threads
// 根据处理器数量构造一定数目的轮询器,即上图中的Poller
pollers = new Poller[getPollerThreadCount()];
for (int i=0; i<pollers.length; i++) {
pollers[i] = new Poller();
Thread pollerThread = new Thread(pollers[i], getName() + "-ClientPoller-"+i);
pollerThread.setPriority(threadPriority);
pollerThread.setDaemon(true);
pollerThread.start();
}

// 创建接收者线程,即上图中的Acceptor
startAcceptorThreads();
}
}

startAcceptorThreads 调用的是父类org.apache.tomcat.util.net.AbstractEndpoint中的实现:

protected final void startAcceptorThreads() {
int count = getAcceptorThreadCount();
acceptors = new Acceptor[count];

for (int i = 0; i < count; i++) {
// 调用子类的createAcceptor方法,本例中即NioEndpoint类的createAcceptor方法
acceptors[i] = createAcceptor();
String threadName = getName() + "-Acceptor-" + i;
acceptors[i].setThreadName(threadName);
Thread t = new Thread(acceptors[i], threadName);
t.setPriority(getAcceptorThreadPriority());
t.setDaemon(getDaemon());
t.start();
}
}

以上就是 Acceptor、Poller、Worker 等核心组件的初始化过程。

2.请求接收

核心组件初始化之后接着就是 Acceptor 线程接收 socket 连接,看下 Acceptor 的源码:

// --------------------------------------------------- Acceptor Inner Class
/**
* 后台线程,用于监听TCP/IP连接以及将它们分发给相应的调度器处理。
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
protected class Acceptor extends AbstractEndpoint.Acceptor {

@Override
public void run() {

int errorDelay = 0;

// 循环遍历直到接收到关闭命令
// Loop until we receive a shutdown command
while (running) {

// Loop if endpoint is paused
while (paused && running) {
state = AcceptorState.PAUSED;
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// Ignore
}
}

if (!running) {
break;
}
state = AcceptorState.RUNNING;

try {
// 如果已经达到最大连接数则让线程等待
//if we have reached max connections, wait
countUpOrAwaitConnection();

SocketChannel socket = null;
try {
// 接收连接,这里用的阻塞模式。
// Accept the next incoming connection from the server
// socket
socket = serverSock.accept();
} catch (IOException ioe) {
//we didn't get a socket
countDownConnection();
// Introduce delay if necessary
errorDelay = handleExceptionWithDelay(errorDelay);
// re-throw
throw ioe;
}
// Successful accept, reset the error delay
errorDelay = 0;

// 注意这个setSocketOptions方法
// 它将把上面接收到的socket添加到轮询器Poller中
// setSocketOptions() will add channel to the poller
// if successful
if (running && !paused) {
if (!setSocketOptions(socket)) {
countDownConnection();
closeSocket(socket);
}
} else {
countDownConnection();
closeSocket(socket);
}
} catch (SocketTimeoutException sx) {
// Ignore: Normal condition
} catch (IOException x) {
if (running) {
log.error(sm.getString("endpoint.accept.fail"), x);
}
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
releaseCaches();
log.error("", oom);
}catch ( Throwable oomt ) {
try {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error(sm.getString("endpoint.accept.fail"), t);
}
}
state = AcceptorState.ENDED;
}
}

3.Socket 参数设置

在 Acceptor 里接收到一个连接之后调用 setSocketOptions 方法设置 SocketChannel 的一些参数,然后将 SocketChannel 注册到 Poller 中。看下 setSocketOptions 的实现:

/**
* Process the specified connection.
*/
protected boolean setSocketOptions(SocketChannel socket) {
// Process the connection
try {
// 将SocketChannel配置为非阻塞模式
//disable blocking, APR style, we are gonna be polling it
socket.configureBlocking(false);
Socket sock = socket.socket();
// 设置Socket参数值(从server.xml的Connector节点上获取参数值)
// 比如Socket发送、接收的缓存大小、心跳检测等
socketProperties.setProperties(sock);

// 从NioChannel的缓存队列中取出一个NioChannel
// NioChannel是SocketChannel的一个的包装类
// 这里对上层屏蔽SSL和一般TCP连接的差异
NioChannel channel = nioChannels.poll();

// 缓存队列中没有则新建一个NioChannel
if ( channel == null ) {
// SSL setup
if (sslContext != null) {
SSLEngine engine = createSSLEngine();
int appbufsize = engine.getSession().getApplicationBufferSize();
NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),
Math.max(appbufsize,socketProperties.getAppWriteBufSize()),
socketProperties.getDirectBuffer());
channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);
} else {
// normal tcp setup
NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),
socketProperties.getAppWriteBufSize(),
socketProperties.getDirectBuffer());

channel = new NioChannel(socket, bufhandler);
}
} else {
// 将SocketChannel关联到从缓存队列中获取的NioChannel上来
channel.setIOChannel(socket);
if ( channel instanceof SecureNioChannel ) {
SSLEngine engine = createSSLEngine();
((SecureNioChannel)channel).reset(engine);
} else {
channel.reset();
}
}
// 将新接收到的SocketChannel注册到Poller中
getPoller0().register(channel);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
try {
log.error("",t);
} catch (Throwable tt) {
ExceptionUtils.handleThrowable(t);
}
// Tell to close the socket
return false;
}
return true;
}

核心调用是最后的getPoller0().register(channel);它将配置好的 SocketChannel 包装成一个 PollerEvent ,然后加入到 Poller 的 events 缓存队列中。

4.读取事件注册

getPoller0 方法将轮询当前的 Poller 数组,从中取出一个 Poller 返回。( Poller 的初始化参见上述第1步:NioEndpoint 类核心组件的初始化)

/**
* Return an available poller in true round robin fashion
*/
public Poller getPoller0() {
// 最简单的轮询调度算法,poller的计数器不断加1再对poller数组取余数
int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;
return pollers[idx];
}

之后调用 Poller 对象的 register 方法:

public void register(final NioChannel socket) {
// 设置socket的Poller引用,便于后续处理
socket.setPoller(this);
// 从NioEndpoint的keyCache缓存队列中取出一个KeyAttachment
KeyAttachment key = keyCache.poll();
// KeyAttachment实际是NioChannel的包装类
final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
// 重置KeyAttachment对象中Poller、NioChannel等成员变量的引用
ka.reset(this,socket,getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());

// 从Poller的事件对象缓存中取出一个PollerEvent,并用socket初始化事件对象
PollerEvent r = eventCache.poll();
// 设置读操作为感兴趣的操作
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);

// 加入到Poller对象里的事件队列
addEvent(r);
}

看下 Poller 类里 addEvent 的代码:

/**
* Only used in this class. Will be made private in Tomcat 8.0.x
* @deprecated
*/
@Deprecated
public void addEvent(Runnable event) {
events.offer(event);
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}

就两行,第一行从 event 对象添加到缓存队列中,第二行如果当前事件队列中没有事件,则唤醒处于阻塞状态的 selector 。

5.Poller 处理流程

上面讲的是从 Acceptor 中接收到的 Socket 以 PollerEvent 的形式包装并添加到 Poller 的事件缓存队列中,接下来看看另外一个核心组件 Poller 的处理过程:

/**
* Poller class.
*/
public class Poller implements Runnable {

// 这就是NIO中用到的选择器,可以看出每一个Poller都会关联一个Selector
protected Selector selector;
// 待处理的事件队列
protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();

// 唤醒多路复用器的条件阈值
protected AtomicLong wakeupCounter = new AtomicLong(0l);

public Poller() throws IOException {
// 对Selector的同步访问,通过调用Selector.open()方法创建一个Selector
synchronized (Selector.class) {
// Selector.open() isn't thread safe
// http://bugs.sun.com/view_bug.do?bug_id=6427854
// Affects 1.6.0_29, fixed in 1.7.0_01
this.selector = Selector.open();
}
}

// 通过addEvent方法将事件添加到Poller的事件队列中
/**
* Only used in this class. Will be made private in Tomcat 8.0.x
* @deprecated
*/
@Deprecated
public void addEvent(Runnable event) {
events.offer(event);
// 如果队列中没有待处理的事件则唤醒处于阻塞状态的selector
if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();
}

// 处理事件队列中的所有事件,如果事件队列是空的则返回false
/**
* Processes events in the event queue of the Poller.
*
* @return true if some events were processed,
* false if queue was empty
*/
public boolean events() {
boolean result = false;

Runnable r = null;
// 将Poller的事件队列中的事件逐个取出并执行相应的事件线程
while ( (r = events.poll()) != null ) {
result = true;
try {
// 执行事件处理逻辑
// 这里将事件设计成线程是将具体的事件处理逻辑和事件框架分开
r.run();
if ( r instanceof PollerEvent ) {
((PollerEvent)r).reset();
// 事件处理完之后,将事件对象返回NIOEndpoint的事件对象缓存中
eventCache.offer((PollerEvent)r);
}
} catch ( Throwable x ) {
log.error("",x);
}
}

return result;
}

// 将socket包装成统一的事件对象PollerEvent,加入到待处理事件队列中
public void register(final NioChannel socket) {
socket.setPoller(this);
KeyAttachment key = keyCache.poll();
final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);
ka.reset(this,socket,getSocketProperties().getSoTimeout());
ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());
ka.setSecure(isSSLEnabled());
// 从NIOEndpoint的事件对象缓存中取出一个事件对象
PollerEvent r = eventCache.poll();
ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.
if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);
else r.reset(socket,ka,OP_REGISTER);
// 将事件添加打Poller的事件队列中
addEvent(r);
}

// Poller是一个线程,该线程同Acceptor一样会监听TCP/IP连接并将它们交给合适的处理器处理
/**
* The background thread that listens for incoming TCP/IP connections and
* hands them off to an appropriate processor.
*/
@Override
public void run() {
// Loop until destroy() is called
while (true) {
try {
// Loop if endpoint is paused
while (paused && (!close) ) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// Ignore
}
}

boolean hasEvents = false;

// Time to terminate?
if (close) {
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail"), ioe);
}
break;
} else {
// 执行事件队列中的事件线程
hasEvents = events();
}
try {
if ( !close ) {
if (wakeupCounter.getAndSet(-1) > 0) {
// 把wakeupCounter设成-1,这是与addEvent里的代码呼应,这样会唤醒selector
//if we are here, means we have other stuff to do
//do a non blocking select
// 以非阻塞方式查看selector是否有事件发生
keyCount = selector.selectNow();
} else {
// 查看selector是否有事件发生,超过指定时间则立即返回
keyCount = selector.select(selectorTimeout);
}
wakeupCounter.set(0);
}
if (close) {
// 执行事件队列中的事件线程
events();
timeout(0, false);
try {
selector.close();
} catch (IOException ioe) {
log.error(sm.getString(
"endpoint.nio.selectorCloseFail"), ioe);
}
break;
}
} catch ( NullPointerException x ) {
//sun bug 5076772 on windows JDK 1.5
if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
if ( wakeupCounter == null || selector == null ) throw x;
continue;
} catch ( CancelledKeyException x ) {
//sun bug 5076772 on windows JDK 1.5
if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);
if ( wakeupCounter == null || selector == null ) throw x;
continue;
} catch (Throwable x) {
ExceptionUtils.handleThrowable(x);
log.error("",x);
continue;
}
//either we timed out or we woke up, process events first
if ( keyCount == 0 ) hasEvents = (hasEvents | events());

Iterator iterator =
keyCount > 0 ? selector.selectedKeys().iterator() : null;
// 根据向selector中注册的key遍历channel中已经就绪的keys,并处理这些key
// Walk through the collection of ready keys and dispatch
// any active event.
while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
// 这里的attachment方法返回的就是在register()方法中注册的
// 而KeyAttachment对象是对socket的包装
KeyAttachment attachment = (KeyAttachment)sk.attachment();
// Attachment may be null if another thread has called
// cancelledKey()
if (attachment == null) {
iterator.remove();
} else {
// 更新通道最近一次发生事件的时间
// 防止因超时没有事件发生而被剔除出selector
attachment.access();
iterator.remove();
// 具体处理通道的逻辑
processKey(sk, attachment);
}
}//while

//process timeouts
// 多路复用器每执行一遍完整的轮询便查看所有通道是否超时
// 对超时的通道将会被剔除出多路复用器
timeout(keyCount,hasEvents);
if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
releaseCaches();
log.error("", oom);
}catch ( Throwable oomt ) {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
}
}//while
synchronized (this) {
this.notifyAll();
}
stopLatch.countDown();

}

// 处理selector检测到的通道事件
protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {
boolean result = true;
try {
if ( close ) {
cancelledKey(sk, SocketStatus.STOP, attachment.comet);
} else if ( sk.isValid() && attachment != null ) {
// 确保通道不会因超时而被剔除
attachment.access();//make sure we don't time out valid sockets
sk.attach(attachment);//cant remember why this is here
NioChannel channel = attachment.getChannel();
// 处理通道发生的读写事件
if (sk.isReadable() || sk.isWritable() ) {
if ( attachment.getSendfileData() != null ) {
processSendfile(sk,attachment, false);
} else {
if ( isWorkerAvailable() ) {
// 在通道上注销对已经发生事件的关注
unreg(sk, attachment, sk.readyOps());
boolean closeSocket = false;
// Read goes before write
if (sk.isReadable()) {
// 具体的通道处理逻辑
if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {
closeSocket = true;
}
}
if (!closeSocket && sk.isWritable()) {
if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {
closeSocket = true;
}
}
if (closeSocket) {
// 解除无效通道
cancelledKey(sk,SocketStatus.DISCONNECT,false);
}
} else {
result = false;
}
}
}
} else {
//invalid key
cancelledKey(sk, SocketStatus.ERROR,false);
}
} catch ( CancelledKeyException ckx ) {
cancelledKey(sk, SocketStatus.ERROR,false);
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
log.error("",t);
}
return result;
}

// 这个unreg()很巧妙,防止了通道对同一个事件不断select的问题
protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {
//this is a must, so that we don't have multiple threads messing with the socket
reg(sk,attachment,sk.interestOps()& (~readyOps));
}

// 向NioChannel注册感兴趣的事件,具体代码看下面的PollerEvent类的说明
protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {
sk.interestOps(intops);
attachment.interestOps(intops);
attachment.setCometOps(intops);
}

}

6.PollerEvent 处理流程

Poller 处理的核心是启动执行事件队列中的 PollerEvent,接着从 selector 中遍历已经就绪的 key ,一旦发生了感兴趣的事件,则交由 processSocket 方法处理。PollerEvent 的作用是向 socket 注册或更新感兴趣的事件:

   /**
*
* PollerEvent, cacheable object for poller events to avoid GC
*/
public static class PollerEvent implements Runnable {

// 每个PollerEvent都会保存NioChannel的引用
protected NioChannel socket;
protected int interestOps;
protected KeyAttachment key;
public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {
reset(ch, k, intOps);
}

public void reset(NioChannel ch, KeyAttachment k, int intOps) {
socket = ch;
interestOps = intOps;
key = k;
}

public void reset() {
reset(null, null, 0);
}

@Override
public void run() {
//socket第一次注册到selector中,完成对socket读事件的注册
if ( interestOps == OP_REGISTER ) {
try {
socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);
} catch (Exception x) {
log.error("", x);
}
} else {
// socket之前已经注册到了selector中,更新socket所感兴趣的事件
final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());
try {
boolean cancel = false;
if (key != null) {
final KeyAttachment att = (KeyAttachment) key.attachment();
if ( att!=null ) {
//handle callback flag
if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {
att.setCometNotify(true);
} else {
att.setCometNotify(false);
}
interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag
// 刷新事件的最后访问时间,防止事件超时
att.access();//to prevent timeout
//we are registering the key to start with, reset the fairness counter.
int ops = key.interestOps() | interestOps;
att.interestOps(ops);
key.interestOps(ops);
} else {
cancel = true;
}
} else {
cancel = true;
}
if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
}catch (CancelledKeyException ckx) {
try {
socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true);
}catch (Exception ignore) {}
}
}//end if
}//run

@Override
public String toString() {
return super.toString()+"[intOps="+this.interestOps+"]";
}
}

7.将 socket 交给 Worker 执行

在第5步的 Poller 处理流程的分析中看到它的 run 方法最后会调用 processKey() 处理 selector 检测到的通道事件,而在这个方法最后会调用 processSocket 来调用具体的通道处理逻辑,看下 processSocket 方法的实现:

public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {
try {
KeyAttachment attachment = (KeyAttachment)socket.getAttachment();
if (attachment == null) {
return false;
}
attachment.setCometNotify(false); //will get reset upon next reg
// 从SocketProcessor的缓存队列中取出一个来处理socket
SocketProcessor sc = processorCache.poll();
if ( sc == null ) sc = new SocketProcessor(socket,status);
else sc.reset(socket,status);
// 将有事件发生的socket交给Worker处理
if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);
else sc.run();
} catch (RejectedExecutionException rx) {
log.warn("Socket processing request was rejected for:"+socket,rx);
return false;
} catch (Throwable t) {
ExceptionUtils.handleThrowable(t);
// This means we got an OOM or similar creating a thread, or that
// the pool and its queue are full
log.error(sm.getString("endpoint.process.fail"), t);
return false;
}
return true;
}

Poller 通过 NioEndpoint 的协调,将发生事件的 socket 交给工作者线程 Worker 来进一步处理。整个事件框架的工作就到此结束,下面就是 Worker 的处理。

8.从 socket 中处理请求

在 Tomcat 6 版本的 NIO 处理实现中有一个 Worker 类,在 Tomcat 7 中把它去掉了,但工作者的职责还在,只是交由了上面看到的 SocketProcessor 这个类来担当,看下这个类的实现代码:

  // ---------------------------------------------- SocketProcessor Inner Class
// 这个类相当于一个工作者,但只会在一个外部线程池中简单使用。
/**
* This class is the equivalent of the Worker, but will simply use in an
* external Executor thread pool.
*/
protected class SocketProcessor implements Runnable {

// 每个SocketProcessor保存一个NioChannel的引用
protected NioChannel socket = null;
protected SocketStatus status = null;

public SocketProcessor(NioChannel socket, SocketStatus status) {
reset(socket,status);
}

public void reset(NioChannel socket, SocketStatus status) {
this.socket = socket;
this.status = status;
}

@Override
public void run() {
// 从socket中获取SelectionKey
SelectionKey key = socket.getIOChannel().keyFor(
socket.getPoller().getSelector());
KeyAttachment ka = null;

if (key != null) {
ka = (KeyAttachment)key.attachment();
}

// Upgraded connections need to allow multiple threads to access the
// connection at the same time to enable blocking IO to be used when
// NIO has been configured
if (ka != null && ka.isUpgraded() &&
SocketStatus.OPEN_WRITE == status) {
synchronized (ka.getWriteThreadLock()) {
doRun(key, ka);
}
} else {
synchronized (socket) {
doRun(key, ka);
}
}
}

private void doRun(SelectionKey key, KeyAttachment ka) {
try {
int handshake = -1;

try {
if (key != null) {
// For STOP there is no point trying to handshake as the
// Poller has been stopped.
if (socket.isHandshakeComplete() ||
status == SocketStatus.STOP) {
handshake = 0;
} else {
handshake = socket.handshake(
key.isReadable(), key.isWritable());
// The handshake process reads/writes from/to the
// socket. status may therefore be OPEN_WRITE once
// the handshake completes. However, the handshake
// happens when the socket is opened so the status
// must always be OPEN_READ after it completes. It
// is OK to always set this as it is only used if
// the handshake completes.
status = SocketStatus.OPEN_READ;
}
}
}catch ( IOException x ) {
handshake = -1;
if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);
}catch ( CancelledKeyException ckx ) {
handshake = -1;
}
if ( handshake == 0 ) {
SocketState state = SocketState.OPEN;
// Process the request from this socket
if (status == null) {
// 最关键的代码,这里将KeyAttachment(实际就是socket)交给Handler处理请求
state = handler.process(ka, SocketStatus.OPEN_READ);
} else {
state = handler.process(ka, status);
}
if (state == SocketState.CLOSED) {
// Close socket and pool
try {
close(ka, socket, key, SocketStatus.ERROR);
} catch ( Exception x ) {
log.error("",x);
}
}
} else if (handshake == -1 ) {
close(ka, socket, key, SocketStatus.DISCONNECT);
} else {
ka.getPoller().add(socket, handshake);
}
} catch (CancelledKeyException cx) {
socket.getPoller().cancelledKey(key, null, false);
} catch (OutOfMemoryError oom) {
try {
oomParachuteData = null;
log.error("", oom);
if (socket != null) {
socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);
}
releaseCaches();
}catch ( Throwable oomt ) {
try {
System.err.println(oomParachuteMsg);
oomt.printStackTrace();
}catch (Throwable letsHopeWeDontGetHere){
ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);
}
}
} catch (VirtualMachineError vme) {
ExceptionUtils.handleThrowable(vme);
}catch ( Throwable t ) {
log.error("",t);
if (socket != null) {
socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);
}
} finally {
socket = null;
status = null;
//return to cache
if (running && !paused) {
processorCache.offer(this);
}
}
}

private void close(KeyAttachment ka, NioChannel socket, SelectionKey key,
SocketStatus socketStatus) {
...
}
}

可以看到由 SocketProcessor 寻找合适的 Handler 处理器做最终 socket 转换处理。

可以用下面这幅图总结一下 NioEndpoint 的主要流程:

Acceptor 和 Poller 是线程数组,Worker 是一个线程池( Executor )

文章目录
  1. 1. 1.NioEndpoint 类核心组件的初始化
  2. 2. 2.请求接收
  3. 3. 3.Socket 参数设置
  4. 4. 4.读取事件注册
  5. 5. 5.Poller 处理流程
  6. 6. 6.PollerEvent 处理流程
  7. 7. 7.将 socket 交给 Worker 执行
  8. 8. 8.从 socket 中处理请求