java.util.concurrent.ExecutorService 表述了异步执行的机制,并且可以让任务在后台执行。ExecutorService是一个接口,继承了Executor接口。Executor接口只包含了一个方法:void execute(Runnable command);,该方法接收一个 Runable 实例,它用来执行一个任务,任务即一个实现了 Runnable 接口的类。jdk默认提供了四种线程池供我们使用。
大体的继承结构如下:
ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
状态名高三位接受新任务处理阻塞队列任务说明RUNNING111是是线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。SHUTDOWN000否是不会接收新任务,但会处理阻塞队列剩余任务(调用shutdown()方法)STOP001否否会中断正在执行的任务,并抛弃阻塞队列任务(调用shutdownNow()方法)TIDYING010--任务全执行完毕,活动线程为 0 即将进入终结TERMINATED011--终结状态从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING,因为最高位表示符号位,这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值。
状态转换图如下:
corePoolSize: 核心线程数目 (最多保留的线程数),线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会 被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。
maximumPoolSize :最大线程数目,一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接执行,如果没有则会缓存到工作队列中,如果工作队列满了,才会创建一个新线程(这就是救急线程),然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建救急线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize的数量减去corePoolSize的数量来确定,最多能达到maximunPoolSize即最大线程池线程数量。
keepAliveTime :生存时间 - 针对救急线程
unit: 时间单位 - 针对救急线程
workQueue :新任务被提交后,如果没有空闲的核心线程就会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中常见的任务队列:
①ArrayBlockingQueue
基于数组的有界阻塞队列,按FIFO排序。有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。
②LinkedBlockingQuene
基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。
③SynchronousQuene
一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。
④PriorityBlockingQueue
具有优先级的无界阻塞队列,优先级通过参数Comparator实现。
threadFactory :线程工厂 - 可以为线程创建时起个好名字
handler: 拒绝策略
如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现:
AbortPolicy策略:抛出 RejectedExecutionException 异常,这是默认策略
CallerRunsPolicy策略: 在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。
DiscardPolicy策略: 放弃本次任务
DiscardOldestPolicy策略: 放弃队列中最早的任务,本任务取而代之
Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
public static ExecutorService newFixedThreadPool(int nThreads) : 创建固定数目线程的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }特点:
核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间。阻塞队列是无界的,可以放任意数量的任务适用于任务量已知,相对耗时的任务public static ExecutorService newCachedThreadPool():创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }特点:
核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收)
队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
救急线程可以无限创建。
整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线
程。 适合任务数比较密集,但每个任务执行时间较短的情况
public static ExecutorService newSingleThreadExecutor(): 创建一个单线程化的Executor。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }特点:
自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
相较于其它三种线程池,newSingleThreadExecutor()创建的是FinalizableDelegatedExecutorService对象,其应用的是装饰者模式 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法。FinalizableDelegatedExecutorService继承自DelegatedExecutorService中的方法都是调用ExecutorService 接口的方法
适用于希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize): 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }ScheduledExecutorService接口的方法
/** *指定延迟时间后执行 */ public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); /** *scheduleAtFixedRate ,是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完* 毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。 */ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); /** *scheduleWithFixedDelay,是以上一个任务结束时开始计时,period时间过去后,立即执行。 */ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);