作为常用的http
协议服务器,tomcat
应用非常广泛。tomcat也是遵循Servelt协议的,Servelt
协议可以让服务器与真实服务逻辑代码进行解耦。各自只需要关注Servlet
协议即可。
对于tomcat是如何作为一个高性能的服务器的呢?你是不是也会有这样的疑问?
tomcat是如何接收网络请求?
如何做到高性能的http
协议服务器?
tomcat从8.0往后开始使用了NIO非阻塞io模型,提高了吞吐量,本文的源码是tomcat 9.0.48版本
org.apache.tomcat.util.net.Acceptor
实现了Runnable
接口,在一个单独的线程中以死循环的方式一直进行socket的监听
线程的初始化及启动是在方法org.apache.tomcat.util.net.AbstractEndpoint#startAcceptorThread
有个很重要的属性org.apache.tomcat.util.net.AbstractEndpoint
;同时实现了run
方法,方法中主要有以下功能:
socketChannel
public void run() { int errorDelay = 0; try { // Loop until we receive a shutdown command while (!stopCalled) { ... if (stopCalled) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait // 如果连接超过了 8*1024,则线程阻塞等待; 是使用org.apache.tomcat.util.threads.LimitLatch类实现了分享锁(内部实现了AbstractQueuedSynchronizer) // 请你注意到达最大连接数后操作系统底层还是会接收客户端连接,但用户层已经不再接收。 endpoint.countUpOrAwaitConnection(); // Endpoint might have been paused while waiting for latch // If that is the case, don"t accept new connections if (endpoint.isPaused()) { continue; } U socket = null; try { // Accept the next incoming connection from the server // socket // 抽象方法,不同的endPoint有不同的实现方法。NioEndPoint为例,实现方法为serverSock.accept(),这个方法主要看serverSock实例化时如果为阻塞,accept方法为阻塞;反之为立即返回,如果没有socket链接,则为null socket = endpoint.serverSocketAccept(); } catch (Exception ioe) { // We didn"t get a socket endpoint.countDownConnection(); if (endpoint.isRunning()) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (!stopCalled && !endpoint.isPaused()) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful // endPoint类的抽象方法,不同的endPoint有不同的实现。处理获取到的socketChannel链接,如果该socket链接能正常处理,那么该方法会返回true,否则为false if (!endpoint.setSocketOptions(socket)) { endpoint.closeSocket(socket); } } else { endpoint.destroySocket(socket); } } catch (Throwable t) { ... } } } finally { stopLatch.countDown(); } state = AcceptorState.ENDED; }
再来看下org.apache.tomcat.util.net.NioEndpoint#setSocketOptions
方法的具体实现(NioEndpoint为例)
这个方法中主要做的事:
protected boolean setSocketOptions(SocketChannel socket) { NioSocketWrapper socketWrapper = null; try { // Allocate channel and wrapper // 优先使用已有的缓存nioChannel NioChannel channel = null; if (nioChannels != null) { channel = nioChannels.pop(); } if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(bufhandler, this); } else { channel = new NioChannel(bufhandler); } } // 将nioEndpoint与NioChannel进行包装 NioSocketWrapper newWrapper = new NioSocketWrapper(channel, this); channel.reset(socket, newWrapper); connections.put(socket, newWrapper); socketWrapper = newWrapper; // Set socket properties // Disable blocking, polling will be used // 设置当前链接的socket为非阻塞 socket.configureBlocking(false); if (getUnixDomainSocketPath() == null) { socketProperties.setProperties(socket.socket()); } socketWrapper.setReadTimeout(getConnectionTimeout()); socketWrapper.setWriteTimeout(getConnectionTimeout()); socketWrapper.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); // 将包装后的nioChannel与nioEndpoint进行注册,注册到Poller,将对应的socket包装类添加到Poller的队列中,同时唤醒selector poller.register(socketWrapper); return true; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error(sm.getString("endpoint.socketOptionsError"), t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } if (socketWrapper == null) { destroySocket(socket); } } // Tell to close the socket if needed return false; }
上一小节是接收到了socket请求,进行包装之后,将socket添加到了Poller
的队列上,并可能唤醒了Selector
,本小节就来看看,Poller是如何进行socket的轮询的。
首先org.apache.tomcat.util.net.NioEndpoint.Poller
也是实现了Runnable接口,是一个可以单独启动的线程
初始化及启动是在org.apache.tomcat.util.net.NioEndpoint#startInternal
重要的属性:
java.nio.channels.Selector
:在Poller对象初始化的时候,就会启动轮询器SynchronizedQueue<PollerEvent>
:同步的事件队列再来看下具体处理逻辑,run方法的源码
public void run() { // Loop until destroy() is called while (true) { boolean hasEvents = false; try { if (!close) { // 去SynchronizedQueue事件队列中拉去,看是否已经有了事件,如果有,则返回true // 如果从队列中拉取到了event(即上一步将NioSocketWrapper封装为PollerEvent添加到次队列中),将socketChannel注册到Selector上,标记为SelectionKey.OP_READ,添加处理函数attachment(为Accetpor添加到Poller时的 // NioSocketWrapper) hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { // If we are here, means we have other stuff to do // Do a non blocking select keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } // Either we timed out or we woke up, process events first if (keyCount == 0) { hasEvents = (hasEvents | events()); } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error(sm.getString("endpoint.nio.selectorLoopError"), x); continue; } Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. // selector轮询获取已经注册的事件,如果有事件准备好,此时通过selectKeys方法就能拿到对应的事件 while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); // 获取到事件后,从迭代器删除事件,防止事件重复轮询 iterator.remove(); // 获取事件的处理器,这个attachment是在event()方法中注册的,后续这个事件的处理,就交给这个wrapper去处理 NioSocketWrapper socketWrapper = (NioSocketWrapper) sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (socketWrapper != null) { processKey(sk, socketWrapper); } } // Process timeouts timeout(keyCount,hasEvents); } getStopLatch().countDown(); }
在这里,有一个很重要的方法,org.apache.tomcat.util.net.NioEndpoint.Poller#events()
,他是从Poller
的事件队列中获取Acceptor
接收到的可用socket,并将其注册到Selector
上
/** * Processes events in the event queue of the Poller. * * @return <code>true</code> if some events were processed, * <code>false</code> if queue was empty */ public boolean events() { boolean result = false; PollerEvent pe = null; // 如果Acceptor将socket添加到队列中,那么events.poll()方法就能拿到对应的事件,否则拿不到就返回false for (int i = 0, size = events.size(); i < size && (pe = events.poll()) != null; i++ ) { result = true; NioSocketWrapper socketWrapper = pe.getSocketWrapper(); SocketChannel sc = socketWrapper.getSocket().getIOChannel(); int interestOps = pe.getInterestOps(); if (sc == null) { log.warn(sm.getString("endpoint.nio.nullSocketChannel")); socketWrapper.close(); } else if (interestOps == OP_REGISTER) { // 如果是Acceptor刚添加到队列中的事件,那么此时的ops就是OP_REGISTER try {, // 将次socket注册到selector上,标记为OP_READ事件,添加事件触发时处理函数socketWrapper sc.register(getSelector(), SelectionKey.OP_READ, socketWrapper); } catch (Exception x) { log.error(sm.getString("endpoint.nio.registerFail"), x); } } else { // ??这里的逻辑,不清楚什么情况下会进入到这个分支里面 final SelectionKey key = sc.keyFor(getSelector()); if (key == null) { // The key was cancelled (e.g. due to socket closure) // and removed from the selector while it was being // processed. Count down the connections at this point // since it won"t have been counted down when the socket // closed. socketWrapper.close(); } else { final NioSocketWrapper attachment = (NioSocketWrapper) key.attachment(); if (attachment != null) { // We are registering the key to start with, reset the fairness counter. try { int ops = key.interestOps() | interestOps; attachment.interestOps(ops); key.interestOps(ops); } catch (CancelledKeyException ckx) { cancelledKey(key, socketWrapper); } } else { cancelledKey(key, socketWrapper); } } } if (running && !paused && eventCache != null) { pe.reset(); eventCache.push(pe); } } return result; }
还有一个重要方法就是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey
,上一个方法是获取event,并注册到selector,那这个方法就是通过Selector
获取到的数据准备好的event,并开始封装成对应的业务处理线程SocketProcessorBase
,扔到线程池里开始处理
protected void processKey(SelectionKey sk, NioSocketWrapper socketWrapper) { try { if (close) { cancelledKey(sk, socketWrapper); } else if (sk.isValid()) { if (sk.isReadable() || sk.isWritable()) { if (socketWrapper.getSendfileData() != null) { processSendfile(sk, socketWrapper, false); } else { unreg(sk, socketWrapper, sk.readyOps()); boolean closeSocket = false; // Read goes before write if (sk.isReadable()) { //这里如果是异步的操作,就会走这里 if (socketWrapper.readOperation != null) { if (!socketWrapper.readOperation.process()) { closeSocket = true; } } else if (socketWrapper.readBlocking) { // readBlocking默认为false synchronized (socketWrapper.readLock) { socketWrapper.readBlocking = false; socketWrapper.readLock.notify(); } } else if (!processSocket(socketWrapper, SocketEvent.OPEN_READ, true)) { // 处理正常的事件,这里的processSocket就要正式开始处理请求了。 // 将对应的事件封装成对应的线程,然后交给线程池去处理正式的请求业务 closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (socketWrapper.writeOperation != null) { if (!socketWrapper.writeOperation.process()) { closeSocket = true; } } else if (socketWrapper.writeBlocking) { synchronized (socketWrapper.writeLock) { socketWrapper.writeBlocking = false; socketWrapper.writeLock.notify(); } } else if (!processSocket(socketWrapper, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk, socketWrapper); } } } } else { // Invalid key cancelledKey(sk, socketWrapper); } } catch (CancelledKeyException ckx) { cancelledKey(sk, socketWrapper); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.nio.keyProcessingError"), t); } }
上一步,Selector获取到了就绪的请求socket,然后根据socket注册的触发处理函数等,将这些数据进行封装,扔到了线程池里,开始具体的业务逻辑处理。本节就是从工作线程封装开始,org.apache.tomcat.util.net.SocketProcessorBase
为工作线程类的抽象类,实现了Runnable接口,不同的Endpoint实现具体的处理逻辑,本节以NioEndpoint为例
以下为org.apache.tomcat.util.net.AbstractEndpoint#processSocket
方法源码
/** * Process the given SocketWrapper with the given status. Used to trigger * processing as if the Poller (for those endpoints that have one) * selected the socket. * * @param socketWrapper The socket wrapper to process * @param event The socket event to be processed * @param dispatch Should the processing be performed on a new * container thread * * @return if processing was triggered successfully */ public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } // 优先使用已经存在的线程 SocketProcessorBase<S> sc = null; if (processorCache != null) { sc = processorCache.pop(); } if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } // 获取线程池。线程池的初始化,是在Acceptor、Poller这两个单独线程启动之前创建 // tomcat使用了自定义的org.apache.tomcat.util.threads.TaskQueue,这块tomcat也进行了小的适配开发 // 核心线程为10个,最大200线程 Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; }
上面的方法是得到了处理业务逻辑的线程SocketProcessorBase,NioEndpoint内部类org.apache.tomcat.util.net.NioEndpoint.SocketProcessor
继承了这个抽象类,也就是具体的业务处理逻辑在org.apache.tomcat.util.net.NioEndpoint.SocketProcessor#doRun
方法中,最终调用到我们的Servlet
protected void doRun() { /* * Do not cache and re-use the value of socketWrapper.getSocket() in * this method. If the socket closes the value will be updated to * CLOSED_NIO_CHANNEL and the previous value potentially re-used for * a new connection. That can result in a stale cached value which * in turn can result in unintentionally closing currently active * connections. */ Poller poller = NioEndpoint.this.poller; if (poller == null) { socketWrapper.close(); return; } try { int handshake = -1; try { // 握手相关判断逻辑 ... } catch (IOException x) { ... } // 三次握手成功了 if (handshake == 0) { SocketState state = SocketState.OPEN; // Process the request from this socket // event为SocketEvent.OPEN_READ,这个变量是org.apache.tomcat.util.net.NioEndpoint.Poller#processKey方法赋值 if (event == null) { state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { // 这里就开始正式处理请求了 state = getHandler().process(socketWrapper, event); } if (state == SocketState.CLOSED) { poller.cancelledKey(getSelectionKey(), socketWrapper); } } else if (handshake == -1 ) { getHandler().process(socketWrapper, SocketEvent.CONNECT_FAIL); poller.cancelledKey(getSelectionKey(), socketWrapper); } else if (handshake == SelectionKey.OP_READ){ socketWrapper.registerReadInterest(); } else if (handshake == SelectionKey.OP_WRITE){ socketWrapper.registerWriteInterest(); } } catch (CancelledKeyException cx) { poller.cancelledKey(getSelectionKey(), socketWrapper); } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); } catch (Throwable t) { log.error(sm.getString("endpoint.processing.fail"), t); poller.cancelledKey(getSelectionKey(), socketWrapper); } finally { socketWrapper = null; event = null; //return to cache if (running && !paused && processorCache != null) { processorCache.push(this); } } }
Tomcat
是如何接收网络请求?
使用java nio的同步非阻塞去进行网络监听。
org.apache.tomcat.util.net.AbstractEndpoint#bindWithCleanup
中初始化网络监听、SSL
{ .... serverSock = ServerSocketChannel.open(); socketProperties.setProperties(serverSock.socket()); InetSocketAddress addr = new InetSocketAddress(getAddress(), getPortWithOffset()); // 当应用层面的连接数到达最大值时,操作系统可以继续接收连接,那么操作系统能继续接收的最大连接数就是这个队列长度,可以通过acceptCount 参数配置,默认是 100 serverSock.bind(addr, getAcceptCount()); } serverSock.configureBlocking(true); //mimic APR behavior
org.apache.tomcat.util.net.NioEndpoint#startInternal
中初始化业务处理的线程池、连接限制器、Poller线程、Acceptor线程
如何做到高性能的http
协议服务器?
Tomcat把接收连接、检测 I/O 事件以及处理请求进行了拆分,用不同规模的线程去做对应的事情,这也是tomcat能高并发处理请求的原因。不让线程阻塞,尽量让CPU忙起来
是怎么设计的呢?
通过接口、抽象类等,将不同的处理逻辑拆分,各司其职
org.apache.tomcat.util.net.AbstractEndpoint
:I/O事件的检测、处理逻辑都在这个类的实现类里面。使用模板方法,不同的协议有不同的实现方法。NioEndpoint/Nio2Endpoint/AprEndpoint
org.apache.tomcat.util.net.NioEndpoint.Poller
:引用了java.nio.channels.Selector
,内部有个事件队列,监听I/O事件具体就是在这里做的org.apache.tomcat.util.net.NioEndpoint.NioSocketWrapper
org.apache.tomcat.util.net.NioEndpoint.SocketProcessor
: 具体处理请求的线程类到此这篇关于Apache Tomcat如何高并发处理请求 的文章就介绍到这了,更多相关Apache Tomcat高并发请求 内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!