3 minutes
Week1031_tip
ARTS - Tip 补 2019.2.6
线程池的getTask()方法
线程池持有线程不退出的思路
学线程池,都知道它的好处就是减少了频繁创建、销毁线程的开销,从而提高效率的,那么到底是怎么实现这个机制的呢?
我们自己设想下,线程的声明周期就是创建、就绪、运行、销毁。如果线程一直在运行状态并且可以接收其他任务,那么其实就相当于是避开了频繁创建、销毁的开销。那么,线程池也是这么个思路。
先看代码 runWorker, 我们知道runWorker其实就是相当于线程的run方法, 同理addWorker 可以看做是创建线程。
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
/** 注意这个while循环, task为空时候就调用了getTask()方法,
* 这个getTask()方法其实就充当了获取待执行任务的功能
* 还有个重要点是看这个while循环里的最后,重置 task = null,
* 这时候就相当于调用getTask()获取任务了,接下来我们看getTask()
*/
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
// 这里重置了task 为 null,循环继续走task = getTask()的条件
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask()方法
我们看源码:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
// 如果设置了核心线程超时退出 或者线程数大于核心线程大小,timed 就是true
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/* 最重要的部分看这里,这做了一个判断就是
* 1. timed == true, 走 workQueue.pool() 方法,
* 2. timed == false, 走 workQueue.task() 方法
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
以上代码我们得到结果:
- timed== true 其实就是非核心线程处理和设置了核心线程超时的核心线程处理, 走poll()方法
- timed == false 其实就是核心线程处理, 走 take() 方法
所以实际上核心线程和非核心线程的区别(设置了超时时间的其实就是当做非核心线程处理了) 就是获取阻塞队列中方式不一样的区别了。
pool 取得队列中元素,没有就返回null, take没有就阻塞,不返回,根据这个特性,我们就发现了在runWorker里的getTask() 方法在队列为空时候就会被一直被阻塞,也就是实现了runWorker卡在那不退出的情况,也就是线程一直处在 running 的状态,等到取得新的任务,线程继续执行。
简单总结
线程池实现方式:
Execute 方法提交任务
addWorker 相当于线程创建,并调用了start方法,这相当于创建了一大堆线程,
每个线程执行自己的 runWorker() 方法,runWorker方法本质就是死循环从队列中取任务,当都取不到任务时候,就相当于队列为空,非核心线程获取到null 任务,核心线程获取不到任务一直被阻塞,
非核心线程继续走完runWorker() 方法,线程结束退出。
poll 和take
这里说下poll和take的不同,这里看下LinkedBlockingQueue的实现:
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 当取不到元素时候,就延迟 返回null
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 当取不到元素时候,就执行了await()方法,阻塞
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
那么,take方法 notEmpty.await() 又是什么呢?跟上去,代码如下
private final Condition notEmpty = takeLock.newCondition();
// 那么,什么是Condition呢?我们继续看newCondition()
// 这个是在ReentrantLock类里的
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
至于这一大段 await 到底是什么意思,留个坑,以后再填吧,(我也不懂)。