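Tomcat's thread pool is designed differently from an ordinary thread pool. The rest of this post walks through how it is put together.
org.apache.tomcat.util.net.AbstractEndpoint#createExecutor
This is where Tomcat creates the thread pool: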
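public void createExecutor() {
    internalExecutor = true;
    // The task queue differs from an ordinary queue; it is analyzed later
    TaskQueue taskqueue = new TaskQueue();
    // Thread factory used to create the worker threads; in a local project the names look like http-nio-<port>-exec-<sequence number>
    TaskThreadFactory tf = new TaskThreadFactory(getName() + "-exec-", daemon, getThreadPriority());
    // Create the pool. Note that this ThreadPoolExecutor is Tomcat's own class, not the one in java.util.concurrent
    executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), 60, TimeUnit.SECONDS,taskqueue, tf);
    // Give the queue a reference to the pool so that, when a task arrives, it can decide whether a new thread should be created or the task should be added to the queue
    taskqueue.setParent( (ThreadPoolExecutor) executor);
}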
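To make the thread naming and the core-thread warm-up concrete, here is a minimal sketch that uses only java.util.concurrent. The class name PrestartSketch, the helper names, the prefix demo-exec- and the pool sizes are assumptions chosen for illustration; Tomcat's TaskThreadFactory and its own ThreadPoolExecutor are not used here.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PrestartSketch {
    // Hypothetical stand-in for TaskThreadFactory: names threads <prefix><sequence number>
    static ThreadFactory namedFactory(String prefix, boolean daemon) {
        AtomicInteger seq = new AtomicInteger(1);
        return r -> {
            Thread t = new Thread(r, prefix + seq.getAndIncrement());
            t.setDaemon(daemon);
            return t;
        };
    }

    // Builds a pool and starts all core threads before any task arrives
    static ThreadPoolExecutor newPrestartedPool(int core, int max, String prefix) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), namedFactory(prefix, true));
        pool.prestartAllCoreThreads();
        return pool;
    }

    // Runs one task and reports the name of the thread that executed it
    static String nameOfWorker(ThreadPoolExecutor pool) {
        Callable<String> whoAmI = () -> Thread.currentThread().getName();
        try {
            return pool.submit(whoAmI).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = newPrestartedPool(2, 4, "demo-exec-");
        System.out.println("threads before first task: " + pool.getPoolSize());
        System.out.println("task ran on: " + nameOfWorker(pool));
        pool.shutdown();
    }
}
```

Because prestartAllCoreThreads is called right after construction, the pool already holds its core threads before the first task is submitted, which is the same effect Tomcat gets by calling it in its constructors.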
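Tomcat's ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor and adds some custom behavior on top of it:
submittedCount: the number of tasks that have been submitted but have not finished yet. This includes tasks still waiting in the queue, not only the tasks that threads are currently running.
lastContextStoppedTime: the time at which a context was last stopped.
lastTimeThreadKilledItself: the last time a thread killed itself to avoid a potential memory leak. It is used to throttle how fast threads are renewed.
Constructors: each one calls the parent ThreadPoolExecutor constructor and then prestartAllCoreThreads, which warms up the core threads as soon as the pool has been created. Tasks that arrive later can run immediately instead of waiting for a thread to be created.
execute: first increments submittedCount, then calls the parent's execute method. If that throws RejectedExecutionException, it tries once more to put the task into the task queue through TaskQueue.force, and rejects the task only if that fails as well.
afterExecute: overrides the parent's hook method. After a task finishes it decrements submittedCount and, if the task did not throw, calls stopCurrentThreadIfNeeded to check whether the current thread should be stopped.
contextStopping: when a container context stops, lastContextStoppedTime is set to the current time and the core pool size is briefly set to 0 so that idle threads wake up (the saved core size is restored right afterwards). currentThreadShouldBeStopped checks whether the current TaskThread was created before lastContextStoppedTime. If so, the thread was created while the previous context was running, and stopCurrentThreadIfNeeded stops it by throwing StopPooledThreadException, with at most one thread stopped per threadRenewalDelay.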
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
    /**
     * The string manager for this package.
     */
    protected static final StringManager sm = StringManager
            .getManager("org.apache.tomcat.util.threads.res");
    /**
     * The number of tasks submitted but not yet finished. This includes tasks
     * in the queue and tasks that have been handed to a worker thread but the
     * latter did not start executing the task yet.
     * This number is always greater or equal to {@link #getActiveCount()}.
     */
    private final AtomicInteger submittedCount = new AtomicInteger(0);
    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
    /**
     * Most recent time in ms when a thread decided to kill itself to avoid
     * potential memory leaks. Useful to throttle the rate of renewals of
     * threads.
     */
    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
    /**
     * Delay in ms between 2 threads being renewed. If negative, do not renew threads.
     */
    private long threadRenewalDelay = Constants.DEFAULT_THREAD_RENEWAL_DELAY;
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        prestartAllCoreThreads();
    }
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
            RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        prestartAllCoreThreads();
    }
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new RejectHandler());
        prestartAllCoreThreads();
    }
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new RejectHandler());
        prestartAllCoreThreads();
    }
    public long getThreadRenewalDelay() {
        return threadRenewalDelay;
    }
    public void setThreadRenewalDelay(long threadRenewalDelay) {
        this.threadRenewalDelay = threadRenewalDelay;
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedCount.decrementAndGet();
        if (t == null) {
            stopCurrentThreadIfNeeded();
        }
    }
    /**
     * If the current thread was started before the last time when a context was
     * stopped, an exception is thrown so that the current thread is stopped.
     */
    protected void stopCurrentThreadIfNeeded() {
        if (currentThreadShouldBeStopped()) {
            long lastTime = lastTimeThreadKilledItself.longValue();
            if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
                if (lastTimeThreadKilledItself.compareAndSet(lastTime,
                        System.currentTimeMillis() + 1)) {
                    // OK, it's really time to dispose of this thread
                    final String msg = sm.getString(
                            "threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
                            Thread.currentThread().getName());
                    throw new StopPooledThreadException(msg);
                }
            }
        }
    }
    protected boolean currentThreadShouldBeStopped() {
        if (threadRenewalDelay >= 0
                && Thread.currentThread() instanceof TaskThread) {
            TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
            if (currentTaskThread.getCreationTime() <
                    this.lastContextStoppedTime.longValue()) {
                return true;
            }
        }
        return false;
    }
    public int getSubmittedCount() {
        return submittedCount.get();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void execute(Runnable command) {
        execute(command,0,TimeUnit.MILLISECONDS);
    }
    /**
     * Executes the given command at some time in the future. The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the <code>Executor</code> implementation.
     * If no threads are available, it will be added to the work queue.
     * If the work queue is full, the system will wait for the specified
     * time and it throw a RejectedExecutionException if the queue is still
     * full after that.
     *
     * @param command the runnable task
     * @param timeout A timeout for the completion of the task
     * @param unit The timeout time unit
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution - the queue is full
     * @throws NullPointerException if command or unit is null
     */
    public void execute(Runnable command, long timeout, TimeUnit unit) {
        submittedCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            if (super.getQueue() instanceof TaskQueue) {
                final TaskQueue queue = (TaskQueue)super.getQueue();
                try {
                    if (!queue.force(command, timeout, unit)) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException("Queue capacity is full.");
                    }
                } catch (InterruptedException x) {
                    submittedCount.decrementAndGet();
                    throw new RejectedExecutionException(x);
                }
            } else {
                submittedCount.decrementAndGet();
                throw rx;
            }
        }
    }
    public void contextStopping() {
        this.lastContextStoppedTime.set(System.currentTimeMillis());
        // save the current pool parameters to restore them later
        int savedCorePoolSize = this.getCorePoolSize();
        TaskQueue taskQueue =
                getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
        if (taskQueue != null) {
            // note by slaurent : quite oddly threadPoolExecutor.setCorePoolSize
            // checks that queue.remainingCapacity()==0. I did not understand
            // why, but to get the intended effect of waking up idle threads, I
            // temporarily fake this condition.
            taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
        }
        // setCorePoolSize(0) wakes idle threads
        this.setCorePoolSize(0);
        // TaskQueue.take() takes care of timing out, so that we are sure that
        // all threads of the pool are renewed in a limited time, something like
        // (threadKeepAlive + longest request time)
        if (taskQueue != null) {
            // ok, restore the state of the queue and pool
            taskQueue.setForcedRemainingCapacity(null);
        }
        this.setCorePoolSize(savedCorePoolSize);
    }
    private static class RejectHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r,
                java.util.concurrent.ThreadPoolExecutor executor) {
            throw new RejectedExecutionException();
        }
    }
}
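The thread-renewal check is easier to follow in isolation. The sketch below is a simplified model of currentThreadShouldBeStopped and stopCurrentThreadIfNeeded, not Tomcat's code: the class RenewalSketch and its method names are hypothetical, the current time is passed in as a parameter (Tomcat reads System.currentTimeMillis()) so the behavior is reproducible, and a boolean is returned where Tomcat throws StopPooledThreadException.

```java
import java.util.concurrent.atomic.AtomicLong;

class RenewalSketch {
    private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
    private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
    private final long threadRenewalDelay; // in ms; a negative value disables renewal

    RenewalSketch(long threadRenewalDelayMillis) {
        this.threadRenewalDelay = threadRenewalDelayMillis;
    }

    // Records the moment a context was stopped
    void contextStopping(long now) {
        lastContextStoppedTime.set(now);
    }

    // A thread is stale if renewal is enabled and it was created before the last context stop
    boolean shouldBeStopped(long threadCreationTime) {
        return threadRenewalDelay >= 0 && threadCreationTime < lastContextStoppedTime.get();
    }

    // Returns true only for the one stale thread that wins the compare-and-set
    // within the current renewal window; every other caller keeps running for now
    boolean tryStop(long threadCreationTime, long now) {
        if (!shouldBeStopped(threadCreationTime)) {
            return false;
        }
        long lastTime = lastTimeThreadKilledItself.get();
        return lastTime + threadRenewalDelay < now
                && lastTimeThreadKilledItself.compareAndSet(lastTime, now + 1);
    }

    public static void main(String[] args) {
        RenewalSketch pool = new RenewalSketch(1000L);
        pool.contextStopping(5000L);
        System.out.println(pool.tryStop(100L, 5001L)); // first stale thread may stop
        System.out.println(pool.tryStop(200L, 5500L)); // second one is throttled
    }
}
```

The compare-and-set is what keeps the renewal rate bounded: even if many stale threads finish a task at the same moment, only one of them updates the timestamp and stops, and the others have to wait for the delay to pass.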
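Now for TaskQueue. Roughly translated, its class comment says that it is a queue designed specifically to work with a thread pool executor. With this queue, the pool creates a new thread to run a task when no thread is idle and the maximum has not been reached, instead of putting the task into the queue.
It extends LinkedBlockingQueue. createExecutor uses the no-argument constructor, so the queue is effectively unbounded, with a capacity of Integer.MAX_VALUE.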
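For contrast, the following sketch shows what a plain java.util.concurrent.ThreadPoolExecutor does with an unbounded LinkedBlockingQueue: once the core threads are busy, every further task is queued and the pool never grows toward its maximum. The class name PlainQueueSketch, the method observe and the pool sizes are assumptions made for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PlainQueueSketch {
    // Submits blocking tasks and returns {pool size, queue size} observed while they are blocked
    static int[] observe(int core, int max, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(core, max, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await(); // keep the worker busy until the measurement is taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] seen = { pool.getPoolSize(), pool.getQueue().size() };
        release.countDown();
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        int[] seen = observe(1, 4, 4);
        System.out.println("pool size = " + seen[0] + ", queued = " + seen[1]);
    }
}
```

With a core size of 1, a maximum of 4 and 4 blocking tasks, the pool stays at 1 thread and 3 tasks wait in the queue, even though 3 more threads would be allowed. Avoiding this is the reason TaskQueue overrides offer.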
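In java.util.concurrent.ThreadPoolExecutor#execute, once the number of threads has reached the core size, the pool calls the queue's offer method. TaskQueue's offer method works as follows:
1. If parent is null, meaning no thread pool has been set on the queue, call the parent class's offer method and add the task to the queue.
2. If the current number of threads equals the maximum pool size, no more threads can be added, so call the parent class's offer method and add the task to the queue.
3. If submittedCount (tasks submitted but not yet finished) is less than or equal to the current number of threads, the pool has idle threads, so call the parent class's offer method and let an idle thread take the task from the queue.
4. If the current number of threads is less than the maximum, more threads can still be added, so return false. The pool then calls addWorker(command, false) to add a non-core thread that runs the task.
5. If none of the above applies, call the parent class's offer method and add the task to the queue.
This analysis shows how TaskQueue differs from an ordinary task queue. While every existing thread is busy and the pool is still below its maximum size, a task is not queued; a non-core thread is added to run it instead. Tasks go into the queue only when an idle thread is available to pick them up or when the pool has already reached its maximum size.
TaskQueue also has a force method that calls the parent class's offer method directly. Tomcat's execute uses it to put a task into the queue after the pool has rejected it.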
/**
 * As task queue specifically designed to run with a thread pool executor. The
 * task queue is optimised to properly utilize threads within a thread pool
 * executor. If you use a normal queue, the executor will spawn threads when
 * there are idle threads and you wont be able to force items onto the queue
 * itself.
 */
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
    private static final long serialVersionUID = 1L;
    private transient volatile ThreadPoolExecutor parent = null;
    // No need to be volatile. This is written and read in a single thread
    // (when stopping a context and firing the listeners)
    private Integer forcedRemainingCapacity = null;
    public TaskQueue() {
        super();
    }
    public TaskQueue(int capacity) {
        super(capacity);
    }
    public TaskQueue(Collection<? extends Runnable> c) {
        super(c);
    }
    public void setParent(ThreadPoolExecutor tp) {
        parent = tp;
    }
    public boolean force(Runnable o) {
        if (parent == null || parent.isShutdown()) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
    }
    public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (parent == null || parent.isShutdown()) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
        return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
    }
    @Override
    public boolean offer(Runnable o) {
        //we can't do any checks
        if (parent==null) return super.offer(o);
        //we are maxed out on threads, simply queue the object
        if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
        //we have idle threads, just add it to the queue
        if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
        //if we have less threads than maximum force creation of a new thread
        if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
        //if we reached here, we need to add it to the queue
        return super.offer(o);
    }
    @Override
    public Runnable poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        Runnable runnable = super.poll(timeout, unit);
        if (runnable == null && parent != null) {
            // the poll timed out, it gives an opportunity to stop the current
            // thread if needed to avoid memory leaks.
            parent.stopCurrentThreadIfNeeded();
        }
        return runnable;
    }
    @Override
    public Runnable take() throws InterruptedException {
        if (parent != null && parent.currentThreadShouldBeStopped()) {
            return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
                    TimeUnit.MILLISECONDS);
            // yes, this may return null (in case of timeout) which normally
            // does not occur with take()
            // but the ThreadPoolExecutor implementation allows this
        }
        return super.take();
    }
    @Override
    public int remainingCapacity() {
        if (forcedRemainingCapacity != null) {
            // ThreadPoolExecutor.setCorePoolSize checks that
            // remainingCapacity==0 to allow to interrupt idle threads
            // I don't see why, but this hack allows to conform to this
            // "requirement"
            return forcedRemainingCapacity.intValue();
        }
        return super.remainingCapacity();
    }
    public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
        this.forcedRemainingCapacity = forcedRemainingCapacity;
    }
}
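The interplay between the queue's offer method and the pool's submitted-task counter can be tried out with a stripped-down model built only on java.util.concurrent. Everything named here (EagerPoolSketch, EagerQueue, CountingPool, demo) is hypothetical and much simpler than Tomcat's classes: there is no timeout on force, no thread renewal and no prestarting of core threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class EagerPoolSketch {
    // Hypothetical, simplified counterpart of TaskQueue
    static class EagerQueue extends LinkedBlockingQueue<Runnable> {
        private static final long serialVersionUID = 1L;
        transient volatile CountingPool parent;

        // Bypasses the checks in offer and always enqueues
        boolean force(Runnable r) {
            return super.offer(r);
        }

        @Override
        public boolean offer(Runnable r) {
            if (parent == null) return super.offer(r);
            // pool is at its maximum: queue the task
            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(r);
            // some thread is idle: queue the task so the idle thread picks it up
            if (parent.submitted.get() <= parent.getPoolSize()) return super.offer(r);
            // all threads busy and below the maximum: refuse, so the pool adds a thread
            return false;
        }
    }

    // Hypothetical, simplified counterpart of Tomcat's ThreadPoolExecutor
    static class CountingPool extends ThreadPoolExecutor {
        final AtomicInteger submitted = new AtomicInteger();

        CountingPool(int core, int max, EagerQueue queue) {
            super(core, max, 60, TimeUnit.SECONDS, queue);
            queue.parent = this;
        }

        @Override
        public void execute(Runnable r) {
            submitted.incrementAndGet();
            try {
                super.execute(r);
            } catch (RejectedExecutionException e) {
                // offer returned false but no thread could be added: fall back to the queue
                if (isShutdown() || !((EagerQueue) getQueue()).force(r)) {
                    submitted.decrementAndGet();
                    throw e;
                }
            }
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            submitted.decrementAndGet();
        }
    }

    // Returns {pool size, queue size} while all tasks are blocked, then the final submitted count
    static int[] demo(int core, int max, int tasks) {
        CountingPool pool = new CountingPool(core, max, new EagerQueue());
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int poolSize = pool.getPoolSize();
        int queued = pool.getQueue().size();
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { poolSize, queued, pool.submitted.get() };
    }

    public static void main(String[] args) {
        int[] r = demo(1, 3, 5);
        System.out.println("pool size = " + r[0] + ", queued = " + r[1] + ", submitted after = " + r[2]);
    }
}
```

With a core size of 1, a maximum of 3 and 5 blocking tasks, this model grows to 3 threads and queues only the remaining 2 tasks. A plain LinkedBlockingQueue with the same settings would keep the pool at 1 thread and queue 4 tasks.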
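When a request arrives, the readable event is picked up from the socket and the socket is wrapped in a task (the task mainly parses the request and dispatches it to the servlet). The task is then handed to the thread pool's execute method: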
/**
 * Process the given SocketWrapper with the given status. Used to trigger
 * processing as if the Poller (for those endpoints that have one)
 * selected the socket.
 *
 * @param socketWrapper The socket wrapper to process
 * @param event The socket event to be processed
 * @param dispatch Should the processing be performed on a new
 * container thread
 *
 * @return if processing was triggered successfully
 */
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
        SocketEvent event, boolean dispatch) {
    try {
        if (socketWrapper == null) {
            return false;
        }
        SocketProcessorBase<S> sc = processorCache.pop();
        if (sc == null) {
            sc = createSocketProcessor(socketWrapper, event);
        } else {
            sc.reset(socketWrapper, event);
        }
        Executor executor = getExecutor();
        if (dispatch && executor != null) {
            executor.execute(sc);
        } else {
            sc.run();
        }
    } catch (RejectedExecutionException ree) {
        getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree);
        return false;
    } catch (Throwable t) {
        ExceptionUtils.handleThrowable(t);
        // This means we got an OOM or similar creating a thread, or that
        // the pool and its queue are full
        getLog().error(sm.getString("endpoint.process.fail"), t);
        return false;
    }
    return true;
}
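The control flow of processSocket reduces to a small pattern: run the task on the executor when dispatching is requested, otherwise run it on the calling thread, and report a rejection as false instead of letting the exception escape. The sketch below illustrates only that pattern. DispatchSketch and process are hypothetical names, and the processor cache, the logging and the catch-all Throwable branch of the original are left out.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

class DispatchSketch {
    // Hypothetical helper that mirrors the dispatch decision in processSocket
    static boolean process(Runnable task, Executor executor, boolean dispatch) {
        try {
            if (task == null) {
                return false;
            }
            if (dispatch && executor != null) {
                executor.execute(task); // hand the task over to a pool thread
            } else {
                task.run();             // run on the calling thread
            }
        } catch (RejectedExecutionException ree) {
            // the pool refused the task, for example because it is shut down or saturated
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        System.out.println(process(() -> System.out.println("handled"), pool, true));
        pool.shutdown();
        // after shutdown the executor rejects new tasks, so process reports false
        System.out.println(process(() -> System.out.println("never runs"), pool, true));
    }
}
```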