Java线程池如何保证线程池的核心线程存活不被销毁?execute()的执行逻辑

🎯 365bet官网网 📅 2025-10-26 06:16:49 👤 admin 👀 9829 ❤️ 191
Java线程池如何保证线程池的核心线程存活不被销毁?execute()的执行逻辑

线程池介绍

基本元素

线程池是一个创建使用线程并能保存使用过的线程以达到服用的对象,他使用workQueue当作阻塞队列,workers作为线程的封装操作对象。

/**

* 用于保留任务并移交给工作线程(指允许不超过maximumPoolSize大小的线程)的队列。

* 我们不要求workQueue.poll()返回null必然意味着workQueue.isEmpty(),因此仅依靠isEmpty来查看队列是否为空(例如,在决定是否从SHUTDOWN过渡到TIDYING时必须这样做)。

* 这可容纳特殊用途的队列,例如DelayQueues,允许poll()返回null,即使它在延迟到期后稍后可能返回non-null。

*/

private final BlockingQueue workQueue;

/**

* 包含池中所有工作线程的集合。只有在持有mainLock锁时才能访问。

*/

private final HashSet workers = new HashSet();

保证线程安全的元素

/**

* 主池控制状态变量ctl包含了两个概念字段,workerCount表示有效线程数,runState表示是否正在运行,正在关闭等

* 为了将它们打包为一个int,我们将workerCount限制为(2 ^ 29)-1(约5亿)个线程,而不是(2 ^ 31)-1(20亿)可以表示的线程。如果将来有问题,可以将该变量更改为AtomicLong,并调整以下移位掩码常量。但是在需要之前,使用int可以使此代码更快,更简单。

* workerCount是已被允许启动但不允许停止的工人数。该值可能与活动线程的实际数量暂时不同,例如,当ThreadFactory在被询问时创建线程失败,并且退出线程仍在终止之前执行簿记操作时,该值会有所不同。用户可见的线程池大小是工作集合的当前大小。

*/

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

/**

* 锁定时要锁定工人集合和相关簿记。

* 虽然我们可以使用某种并发集合,但事实证明,通常最好使用锁。 原因之一是,这可以序列化interruptIdleWorkers,从而避免了不必要的中断风暴,尤其是在关机期间。 否则,退出线程将同时中断那些尚未中断的线程。 它还简化了一些相关的统计数据,如largePoolSize等。我们还在shutdown和shutdownNow上保留mainLock,以确保在单独检查中断和实际中断的权限时,工人集合是稳定的。

*/

private final ReentrantLock mainLock = new ReentrantLock();

execute(Runnable)执行逻辑

execute()执行逻辑,来自下方参考链接

/*

* 1.如果正在运行的线程少于corePoolSize线程,会创建新线程。对addWorker方法的调用从原子上检查runState和workerCount,并通过返回false来表示添加工作线程失败了。

* 2.如果一个任务可以成功排队,那么我们仍然需要仔细检查是否应该添加一个线程(因为现有线程自上次检查后就死掉了)或自从进入该方法以来该池已关闭。因此,我们重新检查状态,并在必要时回滚排队(如果已停止),或者在没有线程的情况下启动新线程。

* 3.如果我们无法将任务排队,则尝试添加一个新线程。如果失败,线程池可能已关闭或已饱和,因此拒绝该任务。

*/

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

////获取线程池控制状态

int c = ctl.get();

//通过workerCountOf计算出实际线程数

if (workerCountOf(c) < corePoolSize) {

//未超过核心线程数,则新增 Worker 对象,true表示核心线程,false表示最大线程

if (addWorker(command, true))

return;

c = ctl.get();

}

//核心线程满了,如果线程池处于运行状态则往队列中添加任务,

if (isRunning(c) && workQueue.offer(command)) {

int recheck = ctl.get();

//如果不在运行状态则删除阻塞队列中的任务并执行拒绝策略

if (! isRunning(recheck) && remove(command))

reject(command);

//如果当前线程池没有任何工作线程在运行,再次尝试添加新的工作线程

else if (workerCountOf(recheck) == 0)

addWorker(null, false);

}

//尝试调用最非核心线程,失败则执行拒绝策略

else if (!addWorker(command, false))

reject(command);

}

其中主要方法是addWorker()。

private boolean addWorker(Runnable firstTask, boolean core) {

//这部分主要是对运行状态的操作,尝试通过原子操作增加工作线程数,如果成功则跳出循环,否则重新获取ctl的值并重新检查运行状,略过。。。

boolean workerStarted = false;//线程是否启动

boolean workerAdded = false;//线程是否添加

Worker w = null;

try {

w = new Worker(firstTask);

final Thread t = w.thread;

if (t != null) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

// 在锁的保护下重新检查运行状态rs。

int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||

(rs == SHUTDOWN && firstTask == null)) {

if (t.isAlive()) // 线程是否正在执行任务

throw new IllegalThreadStateException();

workers.add(w);

//更新最大线程池大小largestPoolSize。

int s = workers.size();

if (s > largestPoolSize)

largestPoolSize = s;

workerAdded = true;

}

} finally {

mainLock.unlock();

}

if (workerAdded) {

//线程启动!

t.start();

workerStarted = true;

}

}

} finally {

if (! workerStarted)

addWorkerFailed(w);

}

return workerStarted;

}

经过一系列逻辑运行后,终于 t.start() 了!,然后他会调用Worker类重写的run()方法,而里面只有runWorker()方法调用。

final void runWorker(Worker w) {

Thread wt = Thread.currentThread();

//获取要执行的任务

Runnable task = w.firstTask;

w.firstTask = null;

//释放锁以允许中断。

w.unlock();

//设置一个布尔变量completedAbruptly,用于标记任务是否突然完成。

boolean completedAbruptly = true;

try {

//完成初始任务后,继续等待获取新任务执行

while (task != null || (task = getTask()) != null) {

w.lock();

//检查线程池是否正在停止,如果是,则确保线程被中断;如果不是,则确保线程没有被中断

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);//方法里面是空的,需要自定义实现

Throwable thrown = null;

try {

//执行我们传入的代码

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);//方法里面是空的,需要自定义实现

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

//最后执行线程退出前的操作

processWorkerExit(w, completedAbruptly);

}

}

这个方法的主要逻辑是通过循环不停调用getTask()获取任务并执行,直到getTask()返回为空。

/**

* 根据当前配置设置执行阻塞或定时等待任务,或者线程由于以下任何原因而必须退出,则返回null:

* 1. 超过最大线程数。

* 2.线程池已停止。

* 3.线程池被关闭并且队列已经空了。

* 4.线程超时等待任务,并且在定时等待之前和之后都将终止线程(即{@code allowCoreThreadTimeOut || workerCount> corePoolSize}),并且如果队列为非空,此工作程序不是线程池中的最后一个线程。

*/

private Runnable getTask() {

//声明一个布尔变量timedOut,用于记录上一次poll()是否超时。

boolean timedOut = false;

for (;;) {

int c = ctl.get();

int rs = runStateOf(c);

// 检查线程池状态以及阻塞队列是否为空

if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

decrementWorkerCount();

return null;

}

int wc = workerCountOf(c);

// 允许核心线程超时或者实际线程数大于核心线程则为true,否则为false

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))

&& (wc > 1 || workQueue.isEmpty())) {

if (compareAndDecrementWorkerCount(c))

return null;

continue;

}

try {

//poll():使当前线程等待,直到发出信号或中断它,或者经过指定的等待时间。

//take():使当前线程等待,直到发出信号或被中断为止。

Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

workQueue.take();

if (r != null)

return r;

timedOut = true;

} catch (InterruptedException retry) {

timedOut = false;

}

}

}

核心线程和非核心线程的逻辑区分

特别说明下核心线程和非核心线程的区别,面试容易问到。

核心线程如果不设置属性 allowCoreThreadTimeOut 为 true,那么创建后永远不会被关闭中断,会在 getTask() 方法中的 workQueue.take() 处 阻塞等待任务;

如果核心线程设置属性 allowCoreThreadTimeOut 为 true或者是非核心线程,那么就会调用 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 方法,指定时间内获取不到任务,就会跳出 runWorker(Worker w) 方法中的while循环而关闭线程。

参考链接

Java线程池是如何保证核心线程不被销毁的

🎯 相关推荐

谢娜综艺节目
🎯 安徽365热线

谢娜综艺节目

📅 07-09 👀 7953
天谕手游pvp职业哪个强 天谕手游pvp职业推荐
🎯 安徽365热线

天谕手游pvp职业哪个强 天谕手游pvp职业推荐

📅 10-17 👀 9360
微信商家小店怎么关闭?微信小店和微店的区别
🎯 日博365邮箱

微信商家小店怎么关闭?微信小店和微店的区别

📅 10-26 👀 1167

🎁 合作伙伴