ThreadPoolExecutor源码解析
1、常用变量的解释
private final AtomicInteger ctl
= new AtomicInteger(ctlOf(RUNNING
, 0));
private static final int COUNT_BITS
= Integer
.SIZE
- 3;
private static final int CAPACITY
= (1 << COUNT_BITS
) - 1;
private static final int RUNNING
= -1 << COUNT_BITS
;
private static final int SHUTDOWN
= 0 << COUNT_BITS
;
private static final int STOP
= 1 << COUNT_BITS
;
private static final int TIDYING
= 2 << COUNT_BITS
;
private static final int TERMINATED
= 3 << COUNT_BITS
;
private static int runStateOf(int c
) { return c
& ~CAPACITY
; }
private static int workerCountOf(int c
) { return c
& CAPACITY
; }
private static int ctlOf(int rs
, int wc
) { return rs
| wc
; }
private static boolean runStateLessThan(int c
, int s
) {
return c
< s
;
}
private static boolean runStateAtLeast(int c
, int s
) {
return c
>= s
;
}
2、构造方法
public ThreadPoolExecutor(int corePoolSize
,
int maximumPoolSize
,
long keepAliveTime
,
TimeUnit unit
,
BlockingQueue
<Runnable> workQueue
,
ThreadFactory threadFactory
,
RejectedExecutionHandler handler
) {
if (corePoolSize
< 0 ||
maximumPoolSize
<= 0 ||
maximumPoolSize
< corePoolSize
||
keepAliveTime
< 0)
throw new IllegalArgumentException();
if (workQueue
== null
|| threadFactory
== null
|| handler
== null
)
throw new NullPointerException();
this.corePoolSize
= corePoolSize
;
this.maximumPoolSize
= maximumPoolSize
;
this.workQueue
= workQueue
;
this.keepAliveTime
= unit
.toNanos(keepAliveTime
);
this.threadFactory
= threadFactory
;
this.handler
= handler
;
}
3、提交执行task的过程
public void execute(Runnable command
) {
if (command
== null
)
throw new NullPointerException();
int c
= ctl
.get();
if (workerCountOf(c
) < corePoolSize
) {
if (addWorker(command
, true))
return;
c
= ctl
.get();
}
if (isRunning(c
) && workQueue
.offer(command
)) {
int recheck
= ctl
.get();
if (! isRunning(recheck
) && remove(command
))
reject(command
);
else if (workerCountOf(recheck
) == 0)
addWorker(null
, false);
}
else if (!addWorker(command
, false))
reject(command
);
}
4、addworker源码解析
private boolean addWorker(Runnable firstTask
, boolean core
) {
retry
:
for (;;) {
int c
= ctl
.get();
int rs
= runStateOf(c
);
if (rs
>= SHUTDOWN
&&
! (rs
== SHUTDOWN
&&
firstTask
== null
&&
! workQueue
.isEmpty()))
return false;
for (;;) {
int wc
= workerCountOf(c
);
if (wc
>= CAPACITY
||
wc
>= (core
? corePoolSize
: maximumPoolSize
))
return false;
if (compareAndIncrementWorkerCount(c
))
break retry
;
c
= ctl
.get();
if (runStateOf(c
) != rs
)
continue retry
;
}
}
----------------------------------------------------------------------
boolean workerStarted
= false;
boolean workerAdded
= false;
Worker w
= null
;
try {
w
= new Worker(firstTask
);
final Thread t
= w
.thread
;
if (t
!= null
) {
final ReentrantLock mainLock
= this.mainLock
;
mainLock
.lock();
try {
int rs
= runStateOf(ctl
.get());
if (rs
< SHUTDOWN
||
(rs
== SHUTDOWN
&& firstTask
== null
)) {
if (t
.isAlive())
throw new IllegalThreadStateException();
workers
.add(w
);
int s
= workers
.size();
if (s
> largestPoolSize
)
largestPoolSize
= s
;
workerAdded
= true;
}
} finally {
mainLock
.unlock();
}
if (workerAdded
) {
t
.start();
workerStarted
= true;
}
}
} finally {
if (! workerStarted
)
addWorkerFailed(w
);
}
return workerStarted
;
}
5、线程池worker任务单元
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID
= 6138294804551838833L
;
final Thread thread
;
Runnable firstTask
;
volatile long completedTasks
;
Worker(Runnable firstTask
) {
setState(-1);
this.firstTask
= firstTask
;
this.thread
= getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
6、核心线程执行逻辑-runworker
final void runWorker(Worker w
) {
Thread wt
= Thread
.currentThread();
Runnable task
= w
.firstTask
;
w
.firstTask
= null
;
w
.unlock();
boolean completedAbruptly
= true;
try {
while (task
!= null
|| (task
= getTask()) != null
) {
w
.lock();
if ((runStateAtLeast(ctl
.get(), STOP
) ||
(Thread
.interrupted() &&
runStateAtLeast(ctl
.get(), STOP
))) &&
!wt
.isInterrupted())
wt
.interrupt();
try {
beforeExecute(wt
, task
);
Throwable thrown
= null
;
try {
task
.run();
} catch (RuntimeException x
) {
thrown
= x
; throw x
;
} catch (Error x
) {
thrown
= x
; throw x
;
} catch (Throwable x
) {
thrown
= x
; throw new Error(x
);
} finally {
afterExecute(task
, thrown
);
}
} finally {
task
= null
;
w
.completedTasks
++;
w
.unlock();
}
}
completedAbruptly
= false;
} finally {
processWorkerExit(w
, completedAbruptly
);
}
}
转载请注明原文地址: https://lol.8miu.com/read-13934.html