一、线程池ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
构造函数的参数含义如下:
corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;
maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;
keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;
unit:keepAliveTime的单位
workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列:SynchronousQueue(BlockingQueue)、有界任务队列:ArrayBlockingQueue、无界任务队列:LinkedBlockingQueue、优先任务队列:PriorityBlockingQueue几种;
ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。PriorityBlockingQueue:一个具有优先级得无限阻塞队列。threadFactory:线程工厂,用于创建线程,一般用默认即可;
handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务,实现了RejectedExecutionHandler接口;
二、ThreadPoolExecutor扩展
ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的,
1、beforeExecute:线程池中任务运行前执行
2、afterExecute:线程池中任务运行完毕后执行
3、terminated:线程池退出后执行
三、举例用法
1)新建一个执行器Executor
public class Executor {
private ThreadPoolExecutor asynExecutor; public static Executor getInstance(){ return SingletonHolder.instance; } public static final class SingletonHolder{ private static final Executor instance=new Executor(); } public synchronized ThreadPoolExecutor getThreadPoolExecutorInstance(int corePoolSize, int maxnumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler rejectedExecutionHandler){ if( asynExecutor == null || asynExecutor.isShutdown()){ asynExecutor=new ThreadPoolExecutor(corePoolSize, maxnumPoolSize, keepAliveTime, unit, workQueue, rejectedExecutionHandler); } return asynExecutor; } public synchronized ThreadPoolExecutor getQueueThreadPoolExecutorInstance(int corePoolSize, int maxnumPoolSize, long keepAliveTime,TimeUnit unit, BlockingQueue<Runnable> workQueue,RejectedExecutionHandler rejectedExecutionHandler){ if( asynExecutor == null || asynExecutor.isShutdown()){ asynExecutor=new ThreadPoolExecutor(corePoolSize, maxnumPoolSize, keepAliveTime, unit, workQueue, rejectedExecutionHandler){ @Override public void execute(Runnable commad) { if(this.getQueue().size()>0){ try { this.getQueue().put(commad); } catch (InterruptedException e) { e.printStackTrace(); } }else{ super.execute(commad); } } }; } return asynExecutor; }
}
2)调用执行器的方法(新建一个拒绝策略)
private static final TaoBaoServiceRejectedExecutionHandler taoBaoServiceRejectedExecutionHandler=new TaoBaoServiceRejectedExecutionHandler(); private static final BlockingQueue<Runnable> WORK_QUEUE=new LinkedBlockingQueue<Runnable>() ;
/*****方法调用*****/
public void exceute(){
ThreadPoolExecutor executor= Executor.getInstance().getQueueThreadPoolExecutorInstance(3, 6,1, TimeUnit.SECONDS, WORK_QUEUE, taoBaoServiceRejectedExecutionHandler); executor.execute(new TaoBaoServiceThread(3,4));
}
/*****构造方法注入参数值*****/
public final class TaoBaoServiceThread implements Runnable{ int i,j; public TaoBaoServiceThread(int i, int j) { super(); this.i=i; this.j=j; } @Override public void run() { System.out.println(i); System.out.println(j); } }
/*****声明一下拒绝策略类*****/
public static final class TaoBaoServiceRejectedExecutionHandler implements RejectedExecutionHandler{ @Override public void rejectedExecution(Runnable taoBaoServiceThread, ThreadPoolExecutor executor) { try { executor.getQueue().put(taoBaoServiceThread); } catch (InterruptedException e) { e.printStackTrace(); } } }