文章目录
1线程池基础使用1.1 概述1.2 线程池的优点1.3 Exector继承图1.4 ExecutorService接口1.5 Executors工具类1.5.1 生成各种线程池的方法1.5.2 方法的使用示例1.5.3 各个方法的源码返回ThreadPoolExecutor对象的方法:返回ScheduledThreadPoolExecutor对象的方法:返回ForkJoinPool对象的方法
1.2.5 线程池的工作流程1.2.6 ThreadPoolExecutor参数1.2.7 自定义线程池
---------分割线------下面内容面试不太涉及---------2.ThreadPoolExecutor源码分析2.1、常用变量的解释2.2、构造方法2.3、提交执行task的过程2.4、addworker源码解析2.5、线程池worker任务单元2.6、核心线程执行逻辑-runworker
3. WorkStealingPool---ForkJoinPool3.1 ForkJoinPool与ThreadPoolExecutor的区别
3.2 可以添加到ForkJoinPool中的任务类型
1线程池基础使用
1.1 概述
线程的创建和销毁消耗的资源都非常大,我们提前创建好多个线程,放入线程池中,使用时直接获取,使用完毕后再归还到线程池中,这样就避免了创建和销毁,实现重复利用。在实际的开发中我们都使用这个方法。java通过Executor这个工厂类向我们提供各种的线程池。
1.2 线程池的优点
减少线程的创建时间,提高相应速度
重复利用线程池中的线程,降低资源消耗
便于线程的管理,比如可以控制 核心池的大小,最大线程数等。
1.3 Exector继承图
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-z8z0t9eZ-1603327309491)(/Users/luca/MarkText-img-Support/Executor继承图.png)]
1.4 ExecutorService接口
ExecutorService: 真正的线程池接口。常见子类ThreadPoolExecutor,ScheduledPoolExecutor, ForkJoinPool
其中定义的三个常用的方法:
Future submit(Callable task): 执行任务,有返回值,一般又来执行 Callable
void shutdown() : 关闭连接池
void execute(Runnable command) : 执行任务/命令,没有返回值,一般用来执行 Runnable
1.5 Executors工具类
1.5.1 生成各种线程池的方法
Executors: 工具类、线程池的工具类,用于创建并返回不同类型的线程池
Executors.newCachedThreadPool(): 创建一个可根据需要创建新线程的线程池
Executors.newFixedThreadPool(n); 创建一个可重用固定线程数的线程池
Executors.newSingleThreadExecutor() : 创建一个只有一个线程的线程池
Executors.newScheduledThreadPool(n): 创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
下面橙色方块中的是Executors中的方法,返回对应黄色方块中的对象,而黄色方块中的类都是ExecutorService的直接或间接子类.
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GKFdMXwV-1603327309500)(/Users/luca/MarkText-img-Support/2020-07-24-10-40-11-image.png)]
1.5.2 方法的使用示例
如何使用工具类提供的方法?以ThreadPoolExecutor为例
class NumThread implements Runnable{
@Override
public void run() {
for (int i
= 0; i
<100 ; i
++) {
System
.out
.println(Thread
.currentThread().getName()+": " + i
);
}
}
}
public class ThreadPoolTest {
public static void main(String
[] args
) {
ExecutorService executorService
= Executors
.newFixedThreadPool(10);
ThreadPoolExecutor service
= (ThreadPoolExecutor
) executorService
;
service
.setCorePoolSize(5);
service
.setMaximumPoolSize(20);
executorService
.execute(new NumThread());
executorService
.shutdown();
}
}
1.5.3 各个方法的源码
返回ThreadPoolExecutor对象的方法:
public static ExecutorService
newFixedThreadPool(int nThreads
, ThreadFactory threadFactory
) {
return new ThreadPoolExecutor(nThreads
, nThreads
,
0L
, TimeUnit
.MILLISECONDS
,
new LinkedBlockingQueue<Runnable>(),
threadFactory
);
}
public static ExecutorService
newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L
, TimeUnit
.MILLISECONDS
,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService
newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer
.MAX_VALUE
,
10L
, TimeUnit
.SECONDS
,
new SynchronousQueue<Runnable>());
}
返回ScheduledThreadPoolExecutor对象的方法:
public static ScheduledExecutorService
newScheduledThreadPool(
int corePoolSize
, ThreadFactory threadFactory
) {
return new ScheduledThreadPoolExecutor(corePoolSize
, threadFactory
);
}
public static ScheduledExecutorService
newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
返回ForkJoinPool对象的方法
public static ExecutorService
newWorkStealingPool(int parallelism
) {
return new ForkJoinPool
(parallelism
,
ForkJoinPool
.defaultForkJoinWorkerThreadFactory
,
null
, true);
}
1.2.5 线程池的工作流程
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i811OJea-1603327309574)(/Users/luca/MarkText-img-Support/2020-07-24-10-46-48-image.png)]
1.2.6 ThreadPoolExecutor参数
以ThreadPoolExecutor的构造器为例:
public ThreadPoolExecutor(int corePoolSize
,
int maximumPoolSize
,
long keepAliveTime
,
TimeUnit unit
,
BlockingQueue
<Runnable> workQueue
,
ThreadFactory threadFactory
,
RejectedExecutionHandler handler
) {
if (corePoolSize
< 0 ||
maximumPoolSize
<= 0 ||
maximumPoolSize
< corePoolSize
||
keepAliveTime
< 0)
throw new IllegalArgumentException();
if (workQueue
== null
|| threadFactory
== null
|| handler
== null
)
throw new NullPointerException();
this.corePoolSize
= corePoolSize
;
this.maximumPoolSize
= maximumPoolSize
;
this.workQueue
= workQueue
;
this.keepAliveTime
= unit
.toNanos(keepAliveTime
);
this.threadFactory
= threadFactory
;
this.handler
= handler
;
}
public static void SelfMakingThreadPoolExecutorTest(){
ThreadPoolExecutor threadPoolExecutor
= new ThreadPoolExecutor(
1,
2,
10,
TimeUnit
.SECONDS
,
new ArrayBlockingQueue<Runnable>(5),
Executors
.defaultThreadFactory();
);
}
参数:
corePoolSize - the number of threads to keep in the pool, even if they are idle, unless allowCoreThreadTimeOut is set
maximumPoolSize - the maximum number of threads to allow in the pool
keepAliveTime - when the number of threads is greater than the core, this is the maximum time that excess idle threads will wait for new tasks before terminating.
unit - the time unit for the keepAliveTime argument
workQueue - the queue to use for holding tasks before they are executed. This queue will hold only the Runnable tasks submitted by the execute method. 这里写的是阻塞队列,阻塞队列也有很多种,更具具体的需求来选择不同的队列。
threadFactory - the factory to use when the executor creates a new thread
这个参数要实现ThreadFactory接口;这个工厂主要指定如何创建一个线程,比如线程的名字是什么,线程的优先级是什么,线程是否为守护线程等
(我们一般不提供这个参数,使用默认的Executors.defaultThreadFactory() )
handler - the handler to use when execution is blocked because the thread bounds and queue capacities are reached
拒绝策略,但所有线程正在使用,已经到达最大线程数,阻塞队列也已经满时,执行拒绝策略。
JDK默认给我们提供了4种拒绝策略:
AbortPolicy:扔掉线程,并抛异常
DiscardPolicy:扔掉,但是不抛异常
DiscardOldestPolicy:扔掉排队时间最久的,把新来的这个线程放入阻塞队列中
CallerRunsPolicy:调用者处理任务,哪一个线程向线程池提交的任务,就把这个任务还给谁去处理
我们也可以自己定义。(我们一般不提供这个参数,使用默认的)
1.2.7 自定义线程池
(我们拿ThreadPoolExecutor为例,其他的也一样) 熟悉了这七个参数后,我们就可以自己创建线程池了。
public static void SelfMakingThreadPoolExecutorTest(){
ThreadPoolExecutor threadPoolExecutor
= new ThreadPoolExecutor(1,
2,
10,
TimeUnit
.SECONDS
,
new ArrayBlockingQueue(5));
threadPoolExecutor
.execute(new Runnable() {
@Override
public void run() {
System
.out
.println("自定义线程池");
}
});
}
---------分割线------下面内容面试不太涉及---------
2.ThreadPoolExecutor源码分析
2.1、常用变量的解释
private final AtomicInteger ctl
= new AtomicInteger(ctlOf(RUNNING
, 0));
private static final int COUNT_BITS
= Integer
.SIZE
- 3;
private static final int CAPACITY
= (1 << COUNT_BITS
) - 1;
private static final int RUNNING
= -1 << COUNT_BITS
;
private static final int SHUTDOWN
= 0 << COUNT_BITS
;
private static final int STOP
= 1 << COUNT_BITS
;
private static final int TIDYING
= 2 << COUNT_BITS
;
private static final int TERMINATED
= 3 << COUNT_BITS
;
private static int runStateOf(int c
) { return c
& ~CAPACITY
; }
private static int workerCountOf(int c
) { return c
& CAPACITY
; }
private static int ctlOf(int rs
, int wc
) { return rs
| wc
; }
private static boolean runStateLessThan(int c
, int s
) {
return c
< s
;
}
private static boolean runStateAtLeast(int c
, int s
) {
return c
>= s
;
}
2.2、构造方法
public ThreadPoolExecutor(int corePoolSize
,
int maximumPoolSize
,
long keepAliveTime
,
TimeUnit unit
,
BlockingQueue
<Runnable> workQueue
,
ThreadFactory threadFactory
,
RejectedExecutionHandler handler
) {
if (corePoolSize
< 0 ||
maximumPoolSize
<= 0 ||
maximumPoolSize
< corePoolSize
||
keepAliveTime
< 0)
throw new IllegalArgumentException();
if (workQueue
== null
|| threadFactory
== null
|| handler
== null
)
throw new NullPointerException();
this.corePoolSize
= corePoolSize
;
this.maximumPoolSize
= maximumPoolSize
;
this.workQueue
= workQueue
;
this.keepAliveTime
= unit
.toNanos(keepAliveTime
);
this.threadFactory
= threadFactory
;
this.handler
= handler
;
}
2.3、提交执行task的过程
public void execute(Runnable command
) {
if (command
== null
)
throw new NullPointerException();
int c
= ctl
.get();
if (workerCountOf(c
) < corePoolSize
) {
if (addWorker(command
, true))
return;
c
= ctl
.get();
}
if (isRunning(c
) && workQueue
.offer(command
)) {
int recheck
= ctl
.get();
if (! isRunning(recheck
) && remove(command
))
reject(command
);
else if (workerCountOf(recheck
) == 0)
addWorker(null
, false);
}
else if (!addWorker(command
, false))
reject(command
);
}
2.4、addworker源码解析
private boolean addWorker(Runnable firstTask
, boolean core
) {
retry
:
for (;;) {
int c
= ctl
.get();
int rs
= runStateOf(c
);
if (rs
>= SHUTDOWN
&&
! (rs
== SHUTDOWN
&&
firstTask
== null
&&
! workQueue
.isEmpty()))
return false;
for (;;) {
int wc
= workerCountOf(c
);
if (wc
>= CAPACITY
||
wc
>= (core
? corePoolSize
: maximumPoolSize
))
return false;
if (compareAndIncrementWorkerCount(c
))
break retry
;
c
= ctl
.get();
if (runStateOf(c
) != rs
)
continue retry
;
}
}
----------------------------------------------------------------------
boolean workerStarted
= false;
boolean workerAdded
= false;
Worker w
= null
;
try {
w
= new Worker(firstTask
);
final Thread t
= w
.thread
;
if (t
!= null
) {
final ReentrantLock mainLock
= this.mainLock
;
mainLock
.lock();
try {
int rs
= runStateOf(ctl
.get());
if (rs
< SHUTDOWN
||
(rs
== SHUTDOWN
&& firstTask
== null
)) {
if (t
.isAlive())
throw new IllegalThreadStateException();
workers
.add(w
);
int s
= workers
.size();
if (s
> largestPoolSize
)
largestPoolSize
= s
;
workerAdded
= true;
}
} finally {
mainLock
.unlock();
}
if (workerAdded
) {
t
.start();
workerStarted
= true;
}
}
} finally {
if (! workerStarted
)
addWorkerFailed(w
);
}
return workerStarted
;
}
2.5、线程池worker任务单元
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID
= 6138294804551838833L
;
final Thread thread
;
Runnable firstTask
;
volatile long completedTasks
;
Worker(Runnable firstTask
) {
setState(-1);
this.firstTask
= firstTask
;
this.thread
= getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
2.6、核心线程执行逻辑-runworker
final void runWorker(Worker w
) {
Thread wt
= Thread
.currentThread();
Runnable task
= w
.firstTask
;
w
.firstTask
= null
;
w
.unlock();
boolean completedAbruptly
= true;
try {
while (task
!= null
|| (task
= getTask()) != null
) {
w
.lock();
if ((runStateAtLeast(ctl
.get(), STOP
) ||
(Thread
.interrupted() &&
runStateAtLeast(ctl
.get(), STOP
))) &&
!wt
.isInterrupted())
wt
.interrupt();
try {
beforeExecute(wt
, task
);
Throwable thrown
= null
;
try {
task
.run();
} catch (RuntimeException x
) {
thrown
= x
; throw x
;
} catch (Error x
) {
thrown
= x
; throw x
;
} catch (Throwable x
) {
thrown
= x
; throw new Error(x
);
} finally {
afterExecute(task
, thrown
);
}
} finally {
task
= null
;
w
.completedTasks
++;
w
.unlock();
}
}
completedAbruptly
= false;
} finally {
processWorkerExit(w
, completedAbruptly
);
}
}
3. WorkStealingPool—ForkJoinPool
Executor.WorkStealingPool()返回的是ForkJoinPool对象,ForkJoinPool对象的特点:
Fork分叉,join汇总;这个池子就是用来将一个大的任务分解成小的任务,之后在汇总起来
它可以用很少的线程来执行多个子任务
cpu密集型
3.1 ForkJoinPool与ThreadPoolExecutor的区别
ThreadPoolExecutor是有一个线程的集合(存储在HashSet中)和一个任务队列(也就是我们的BlockingQueue),所有的线程从同一个任务队列中取出任务,而ForkJoinPool是每一个线程都有一个单独的队列,当一个线程执行完自己的任务之后,会去其他的线程“偷”任务
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RqNN5wrj-1603327309586)(/Users/luca/MarkText-img-Support/2020-09-02-22-31-32-image.png)]
3.2 可以添加到ForkJoinPool中的任务类型
因为ForkJoinPool是可以拆分任务的,所以我们要求这个任务是可拆分的,可汇总的。所以我们不能继承传统的Runnable或Callerable接口,我们要为他定义一种特殊的类型。这个类就是ForkJoinTask
public abstract class ForkJoinTask<V> implements Future<V>, Serializable
{
}
ForkJoinTask在实际开发中比较原始,我们可以使用RecursiveAction(不带返回值;它叫做“递归动作”,不停的切分不就是一个递归吗?)
public abstract class RecursiveAction extends ForkJoinTask<Void> {}
当然,如果我们需要返回值我们可以继承RecursiveTask类