应用程序通过Executor框架控制上层的调度;而下层的调度由操作系统内核控制,下层的调度不受应用程序的控制。
Executor框架主要由以下3大部分组成: 1.任务。包括被执行任务需要实现的接口:Runnable接口或Callable接口。 2.任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)。 3.异步计算的结果。包括接口Future和实现Future接口的FutureTask类。
Executor框架最核心的类是ThreadPoolExecutor,它是线程池的实现类。
FixThreadPool被称为可重用固定线程数的线程池。
keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。 FixThreadPool把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。
FixedThreadPool适用于为了满足资源管理的需求,而 需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。适用于任务量已知,相对耗时的任务。
希望多个任务排队执行,任务数为1,当任务数多于1时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会释放。
跟自己创建单线程的区别:
使用单个Worker线程。 SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。SingleThreadExecutor使用无界队列作为工作队列对线程池带来的影响与FixedThreadPool相同。
SingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
CachedThreadPool是一个会根据需要创建新线程的线程池。 直白地讲,它没有容量,没有线程来取是放不进去的。它还可以无限创建。
corePoolSize设置为0,maximumPoolSize设置为Integer.MAX_VALUE,keepAliveTime设置为60L(意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止)。
CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列, 但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于 maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下, CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一 个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行。 CachedThreadPool中任务传递的示意图如图所示。
CachedThreadPool是大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
Timer对应的是单个后台进程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。
任务队列DelayQueue是一个无界队列。 执行主要分两大部分: 1.当调用scheduleAtFixedRate(以上一个任务开始的时间计时,到时间如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行)或scheduleWithFixedDelay(从上次任务结束时开始算)方法时,会向DelayQueue添加一个ScheduledFutureTask。 2.线程池的线程从DelayQueue中获取ScheduleFutureTask,然后执行任务。
默认情况下,即使第一个任务出现异常,第二个任务也会照常执行。 异常的处理方法: 1.用try catch捕获 2.Future接收结果,get()获取异常信息
DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的Scheduled- FutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个 ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就 是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
1.线程从DelayQueue中获取已到期的ScheduledFutureTask 2.线程执行这个ScheduledFutureTask 3.线程修改这个ScheduledFutureTask的time变量为下次要执行的时间 4.线程将这个ScheduledFutureTask放回到DelayQueue
获取任务分3大步骤: 1.获取Lock 2.获取周期任务 (1)如果PriorityQueue为空,当前线程到Condition中等待;否则执行下面的2.2 (2)如果PriorityQueue的头元素的time时间比当前时间大,到Condition中等待到Time时间,否则执行下面的2.3 (3)获取PriorityQueue的头元素;如果PriorityQueue不为空,则唤醒在Condition中等待的所有线程。 3.释放Lock
ScheduledThreadPoolExecutor在一个循环中执行步骤2,直到线程从PriorityQueue获取到一个元素之后,才会退出循环。
向DelayQueue添加任务的3大步骤: 1.获取Lock 2.添加任务 (1)向PriorityQueue添加任务 (2)如果添加的任务是PriorityQueue的头元素,则唤醒在Condition中等待所有线程 3.释放Lock
Future接口和实现FutureTask类,代表异步计算的结果。
FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。 FutureTask可以处于下面3种状态: 1.未启动。run之前 2.已启动。run被执行过程中 3.已完成。执行完后正常结束,或被取消(FutureTask.cancel(…)),或执行FutureTask.run()方法时抛出异常而异常结束,FutureTask处于已完成状态。
当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。
可以把FutureTask交给Executor执行;也可以通过ExecutorService.submit(…)方法返回一个 FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel(…)方法。除此以外,还可以单独 使用FutureTask。
FutureTask的实现基于AQS。 每一个基于AQS实现的同步器都会包含两种类型的操作: 1.至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(long timeout,TimeUnit unit)方法调用。 2.至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。
FutureTask的设计示意图: get方法 run方法 级联唤醒
