深入浅出Tomcat网络通信的高并发处理机制


theme: healer-readable highlight: a11y-dark

深入浅出Tomcat网络通信的高并发处理机制

随着互联网应用的快速发展,Web服务器面临的访问压力日益增大,如何高效处理高并发的网络请求成为关键

Tomcat作为Java世界中最受欢迎的Web容器之一,可以灵活选择不同的IO模型来处理网络通信,确保面对高并发的网络请求时能够快速处理

上篇文章21张图解析Tomcat运行原理与架构全貌,我们说到Tomcat中通过Connector来处理网络通信,其中Connector的职责主要由组件EndPoint、Processor、Adapter来完成

EndPoint负责网络通信、Processor负责解析、Adapter负责将请求转换为Servlet的请求并交给容器处理

本篇文章就来重点聊聊AbstractEndpoint的多种实现类是如何处理网络通信的

AbstractEndpoint有三种实现类:NioEndPoint、Nio2EndPoint、AprEndPoint

其中默认使用NioEndPoint(多路复用模型),Nio2EndPoint使用异步IO模型,而AprEndPoint为早期提供高性能(Tomcat 10时被弃用)

NioEndPoint

NioEndPoint将处理网络通信分离为三个步骤,分别使用三个组件进行执行:接收连接、检测IO事件、处理请求

我们先大致对这些组件的作用进行描述,后续再通过源码分析~

Acceptor用于接收连接(循环执行):使用LimitLatch限制最大连接数量,等待客户端完成TCP三次握手连接后,将连接交给Poller

Poller用于检测IO事件是否就绪(循环执行):将连接注册到Selector上,使用Selector监听IO事件,当事件发生(读就绪)时交给Executor进行处理

Executor池化管理线程,使用线程执行后续流程(解析请求、封装适配、交给容器处理...)

Acceptor

Acceptor使用LimitLatch限制连接数量,收到连接后将其逐步封装为PollerEvent并放到Poller的队列中

NioEndpoint.initServerSocket 在启动组件时初始化socket

//服务端Channel
ServerSocketChannel serverSock = ServerSocketChannel.open();
socketProperties.setProperties(serverSock.socket());
InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset());
//服务端channel绑定端口,getAcceptCount为连接上限
serverSock.bind(addr, getAcceptCount());

ServerSocketChannel作为服务端Channel只监听一个端口(连接器中设置的端口)

无论Acceptor线程数量为多少,都共享该服务端channel

getAcceptCount() 为建立连接后最大积压的数量,acceptCount默认为100

TCP三次握手完成后会将客户端连接放到accept队列中等待服务端拿走,acceptCount就是accept队列最大存放的连接数量

Acceptor.run Acceptor接收连接

为了简化流程,只保留了较重要的流程:

  1. 使用LimitLatch限制连接数量,如果达到最大值则等待
  2. 等待获取客户端连接socket channel(TCP三次握手完成)
  3. 把socket channel交给Poller处理
public void run() {
//...
while (!stopCalled) {
//1.使用limit latch限制连接数量,如果达到最大值则等待
    endpoint.countUpOrAwaitConnection();

    U socket = null;
            
    //2.等待获取客户端socket channel
    socket = endpoint.serverSocketAccept();
            
    //3.交给Poller处理
    endpoint.setSocketOptions(socket);             
} 

}

对于实现限制连接数量,当达到最大值时则进行等待,实现的关键为计数、等待

熟悉并发包的同学会立马想到信号量Semaphore,但Tomcat没有直接使用信号量组件,而是基于AQS自己实现同步组件LimitLatch

LimitLatch直接使用原子类进行计数,利用AQS来实现等待

(LimitLatch基于AQS实现比较简单,这里就不进行分析,不熟悉AQS的同学可以查看这篇文章:10分钟从源码级别搞懂AQS

如果没有超过限制则会获取下一个完成连接(TCP三次握手)的客户端连接SocketChannel result = serverSock.accept();

并将Channel包装成SocketWrapper,再包装为PollerEvent,加入Poller的队列中

public void register(final NioSocketWrapper socketWrapper) {
    //监听读事件(读就绪时poller能够继续处理)
    socketWrapper.interestOps(SelectionKey.OP_READ);
    //包装为PollerEvent 指定关心注册事件OP_REGISTER(后续poller将该通道注册到select上)
    PollerEvent pollerEvent = createPollerEvent(socketWrapper, OP_REGISTER);
    //放入poller的队列
    addEvent(pollerEvent);
}

Poller的队列SynchronizedQueue也是Tomcat自己实现的

Acceptor与Poller之间通过队列通信,SynchronizedQueue使用synchronized保证并发操作下的原子性

Poller

Poller循环处理队列中的PollerEvent事件,当Selector上监听的连接发生IO事件时迭代处理,将事件封装为SocketProcessor交给线程池处理

Poller.run

Poller主要循环检测是否有IO事件发生,主要流程为:

  1. 轮询处理队列中的事件PollerEvent,比如将通道注册到Selector上
  2. Selector阻塞到事件发生或超时
  3. 迭代遍历处理事件,交给线程池处理
public void run() {
    while (true) {
        //...
    //1.轮询处理队列中的事件
events();
    
//2.select 阻塞直到事件发生
    selector.select(selectorTimeout);
    
    //3.迭代遍历处理事件
    Iterator<selectionkey> iterator = keyCount &gt; 0 ? selector.selectedKeys().iterator() : null;

while (iterator != null && iterator.hasNext()) {
SelectionKey sk = iterator.next();
iterator.remove();
//从附件中拿到连接的包装
NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment();
if (socketWrapper != null) {
//交给线程池处理
processKey(sk, socketWrapper);
}
}

}   

}

交给线程池处理前会将其封装为SocketProcessor

AbstractEndpoint.processSocket

public boolean processSocket(SocketWrapperBase socketWrapper,
        SocketEvent event, boolean dispatch) {
    SocketProcessorBase sc = null;
    if (processorCache != null) {
        sc = processorCache.pop();
    }
    //封装SocketProcessor
    if (sc == null) {
        sc = createSocketProcessor(socketWrapper, event);
    } else {
        sc.reset(socketWrapper, event);
    }
Executor executor = getExecutor();
if (dispatch &amp;&amp; executor != null) {
    //交给线程池执行
    executor.execute(sc);
} else {
    //当前线程执行(使用AIO时走这里)
    sc.run();
}

}

该方法用于封装SocketProcessor和调用后续执行,放于父类中,所有实现类通用该方法

Executor

线程池中的线程执行SocketProcessor时会去交给ProtocolHandler处理 getHandler().process(socketWrapper, event)

SocketProcessor中存在连接等包装组件,通过它能够处理请求(读取数据)和响应(写回数据)

需要注意的是Tomcat中的线程池是自己实现的,而不是并发包下的ThreadPoolExecutor

(对于Tomcat的线程池,我们后续文章再进行分析)

NioEndPoint大致的运行流程如下图:

连接的包装类(NioSocketWrapper)为核心贯穿全文,封装流程如下:

Acceptor接收连接:NioChannel(拿到客户端连接) -> NioSocketWrapper -> PollerEvent(放入Poller的SynchronizedQueue)

Poller处理PollerEvent注册到Selector,并阻塞监听IO事件:

PollerEvent(从SynchronizedQueue中取出) -> NioSocketWrapper(注册到Selector) -> SocketProcessor(事件发生时交给Executor)

Nio2EndPoint

Nio2EndPoint使用异步IO模型(AIO)来处理网络通信

AIO的特点就是异步,使用回调函数,当数据就绪时使用异步线程调用回调函数

无需再像NIO中使用Selector阻塞,让应用线程来触发读取数据,阻塞到数据拷贝到应用缓冲区

Nio2实际上指的就是AIO,NIO2表明这是对原有NIO的一个升级版本

Nio2EndPoint处理网络通信时不再需要检测IO事件,把这件事交给内核去做,当事件发生(数据就绪)时使用异步线程调用回调函数即可

相比于NioEndPoint,Nio2EndPoint在处理网络通信时,不需要再用Poller检测IO事件

Nio2Acceptor

Nio2EndPoint中使用Nio2Acceptor接收连接,Nio2Acceptor继承Acceptor并实现回调接口CompletionHandler

class Nio2Acceptor 
extends Acceptor 
implements CompletionHandler

回调接口第一个参数为IO操作的结果,第二个泛型为操作使用的附件,其中有两个方法分别代表着成功/失败后执行的回调

public interface CompletionHandler {
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

Nio2Acceptor.run

由于使用AIO,Nio2Acceptor在执行任务时不再需要循环,只需要携带回调函数,当客户端连接完成时触发回调

在执行时主要做两件事:

  1. 使用LimitLatch限制连接数
  2. 接收连接
public void run() {
    if (!isPaused()) {
        try {
            //1.使用LimitLatch限制连接数
            countUpOrAwaitConnection();
        } catch (InterruptedException e) {
        }
        if (!isPaused()) {
            //2.接收连接
            serverSock.accept(null, this);
        } else {
            state = AcceptorState.PAUSED;
        }
    } else {
        state = AcceptorState.PAUSED;
    }
}

serverSock.accept(null, this); 在接收连接时,使用的服务端channel为AsynchronousServerSocketChannel,并把当前对象作为回调传入

这样在下次收到连接后的回调又可以调用该方法,以此来达到不需要循环调用

Nio2Acceptor.completed

回调成功的方法中主要做几件事:

  1. 是否限制连接数量
  2. 调用accept,方便接收下次连接
  3. 调用后续处理
public void completed(AsynchronousSocketChannel socket,Void attachment) {
    errorDelay = 0;
    if (isRunning() && !isPaused()) {
        //1.是否限制连接数量
        if (getMaxConnections() == -1) {
            //不限制连接数量,方便接收下一次连接
            serverSock.accept(null, this);
        } else if (getConnectionCount() < getMaxConnections()) {
            try {
                //当前连接数小于最大限制连接数,不阻塞,主要是去自增计数
                countUpOrAwaitConnection();
            } catch (InterruptedException e) {
                // Ignore
            }
            //方便接收下次连接
            serverSock.accept(null, this);
        } else {
            //当前连接数大于等于最大限制连接数,再调用limitlatch会阻塞,为了避免阻塞使用线程池去执行(排队)
            getExecutor().execute(this);
        }
        //setSocketOptions后续处理
        if (!setSocketOptions(socket)) {
            closeSocket(socket);
        }
    } else {
        if (isRunning()) {
            state = AcceptorState.PAUSED;
        }
        destroySocket(socket);
    }
}

在Acceptor中也会调用setSocketOptions方法,那时会将连接包装NioSocketWrapper,然后封装为PollerEvent放入poller队列

在AIO中由于不再存在poller,该方法会将连接包装为Nio2SocketWrapper,然后调用父类AbstractEndpoint.processSocket方法去执行

AbstractEndpoint.processSocket

在此方法中会先将包装类封装为SocketProcessor再去执行

public boolean processSocket(SocketWrapperBase socketWrapper,SocketEvent event, boolean dispatch) {
        //...
    //封装为SocketProcessor
        SocketProcessorBase<s> sc = null;
        if (processorCache != null) {
            sc = processorCache.pop();
        }
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if (dispatch &amp;&amp; executor != null) {
            executor.execute(sc);
        } else {
            //当前线程执行
            sc.run();
        }
    
    return true;

}

需要注意的是,当前线程本来就是异步回调线程,参数dispatch会为false

也就是这里不会使用线程池去执行,而是由当前异步回调的线程去执行SocketProcessor

Nio2SocketWrapper

当前线程是连接完成执行异步回调的线程,去执行SocketProcessor也就是会使用Processor解析数据,但此时数据可能还未准备好

为了不让Processor阻塞等待,这里会失败,直到数据就绪时触发的异步回调来执行时才能够读到数据

Nio2SocketWrapper中包含一些读写事件的回调

比如读回调中:当数据就绪时,会去执行processSocket,也就是封装SocketProcessor进行后续调用(此时会第二次使用Processor进行读数据,这样确保数据已就绪)

this.readCompletionHandler = new CompletionHandler() {
    @Override
    public void completed(Integer nBytes, ByteBuffer attachment) {
        //...
        getEndpoint().processSocket(Nio2SocketWrapper.this, SocketEvent.OPEN_READ, false);
    }
}    

Http11Processor

在Processor处理HTTP协议的实现类Http11Processor中,执行service解析请求时,会先解析请求头parseRequestLine

public SocketState service(SocketWrapperBase socketWrapper)  throws IOException {
    //...
    inputBuffer.parseRequestLine(keptAlive, protocol.getConnectionTimeout(),protocol.getKeepAliveTimeout())
    //...    
}

当parseRequestLine返回false时,说明数据未就绪,不会执行后续操作,因此第一次读数据时由于数据未就绪不会再往后执行

NioEndPoint大致的运行流程如下图:

在Nio2EndPoint中,使用异步回调的方式,避免poller中的操作,能够提升效率,但是大量异步线程的引入又会带来线程上下文切换的开销

连接的包装类(Nio2SocketWrapper)为核心贯穿全文,封装流程如下:

Nio2Channel(回调拿到客户端连接) -> Nio2SocketWrapper -> SocketProcessor(事件发生时)

连接完成的回调:Nio2Channel -> Nio2SocketWrapper -> SocketProcessor -> Http11Processor(解析失败,数据未就绪)

读事件就绪的回调:Nio2SocketWrapper -> SocketProcessor -> Http11Processor

AprEndPoint

APR(Apache Portable Runtime)是Apache提供的可移植运行库,是为了早期的Tomcat的提供高性能的

早期NIO还不成熟,使用APR通过JNI调用本地C语言实现的库,能够使用操作系统的epoll来实现多路复用模型,旨在提高高性能

AprEndPoint 流程与NioEndPoint相同,只是其中调用的方法不同,NioEndPoint调用JDK NIO的API,而AprEndPoint调用APR库

AprEndPoint在通道上使用的缓冲区是基于直接内存的(DirectByteBuffer),而NioEndPoint与Nio2EndPoint都是使用堆内存的(HeapByteBuffer)

使用直接内存的好处是能够减少数据拷贝带来的开销,但无法使用JVM来进行管理内存

并且AprEndPoint还能使用零拷贝sendfile,将数据从磁盘读到网卡发送时减少各种拷贝开销

但在后来NIO、AIO逐渐成熟,AprEndPoint带来的好处逐渐被追平,在Tomcat 10时被遗弃

总结

NioEndPoint将处理网络通信分为接收连接、监听事件、处理请求三个步骤

其中Acceptor负责接收连接,使用LimitLatch限制连接数量(若超过上限则等待),获取客户端连接NioChannel,包装为NioSocketWrapper,再封装为PollerEvent放入Poller的队列中

Poller会轮询处理PollerEvent,通过PollerEvent拿到NioSocketWrapper将连接注册到Selector上,使用Selector监听事件,有事件触发时,从附件中获取连接的包装NioSocketWrapper,将其封装为SocketProcessor交给线程池处理

线程池的线程处理SocketProcessor时,则会使用Processor解析协议,后续再封装请求/响应调用容器处理

Nio2EndPoint 使用AIO,由内核监听事件(数据就绪)后使用异步线程执行回调

其中Nio2Acceptor继承Acceptor,接收连接不再循环处理,而是使用异步回调:当连接完成后再使用LimitLatch判断是否限制连接,调用非阻塞accept便于接收下次连接(回调),然后将客户端连接Nio2Channel封装为Nio2SocketWrapper再封装为SocketProcessor处理(后续调用processor无法解析,因为当前是连接完成的回调线程,数据还未就绪)

当数据就绪时,通过Nio2SocketWrapper的回调继续封装为SocketProcessor向后处理(后续调用processor可以解析,因为当前为读数据就绪的回调线程,第二次读)

早期的APR通过本地库、直接内存、零拷贝等多种方式进行性能优化

🌠最后(不要白嫖,一键三连求求拉~)

本篇文章被收入专栏 Tomcat全解析:架构设计与核心组件实现,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜


这是一个从 https://juejin.cn/post/7368477662375313435 下的原始话题分离的讨论话题