JAVA线程池用法

it2024-10-19  38

一、线程池ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,                               int maximumPoolSize,                               long keepAliveTime,                               TimeUnit unit,                               BlockingQueue<Runnable> workQueue,                               ThreadFactory threadFactory,                               RejectedExecutionHandler handler) {         if (corePoolSize < 0 ||             maximumPoolSize <= 0 ||             maximumPoolSize < corePoolSize ||             keepAliveTime < 0)             throw new IllegalArgumentException();         if (workQueue == null || threadFactory == null || handler == null)             throw new NullPointerException();         this.acc = System.getSecurityManager() == null ?                 null :                 AccessController.getContext();         this.corePoolSize = corePoolSize;         this.maximumPoolSize = maximumPoolSize;         this.workQueue = workQueue;         this.keepAliveTime = unit.toNanos(keepAliveTime);         this.threadFactory = threadFactory;         this.handler = handler;     }

构造函数的参数含义如下:

corePoolSize:指定了线程池中的线程数量,它的数量决定了添加的任务是开辟新的线程去执行,还是放到workQueue任务队列中去;

maximumPoolSize:指定了线程池中的最大线程数量,这个参数会根据你使用的workQueue任务队列的类型,决定线程池会开辟的最大线程数量;

keepAliveTime:当线程池中空闲线程数量超过corePoolSize时,多余的线程会在多长时间内被销毁;

unit:keepAliveTime的单位

workQueue:任务队列,被添加到线程池中,但尚未被执行的任务;它一般分为直接提交队列:SynchronousQueue(BlockingQueue)、有界任务队列:ArrayBlockingQueue、无界任务队列:LinkedBlockingQueue、优先任务队列:PriorityBlockingQueue几种;

ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。PriorityBlockingQueue:一个具有优先级得无限阻塞队列。

threadFactory:线程工厂,用于创建线程,一般用默认即可;

handler:拒绝策略;当任务太多来不及处理时,如何拒绝任务,实现了RejectedExecutionHandler接口;

二、ThreadPoolExecutor扩展

ThreadPoolExecutor扩展主要是围绕beforeExecute()、afterExecute()和terminated()三个接口实现的,

1、beforeExecute:线程池中任务运行前执行

2、afterExecute:线程池中任务运行完毕后执行

3、terminated:线程池退出后执行

三、举例用法

1)新建一个执行器Executor

public class Executor {

    private ThreadPoolExecutor asynExecutor;     public static Executor getInstance(){         return SingletonHolder.instance;     }     public static final class SingletonHolder{         private static final  Executor instance=new Executor();     }     public synchronized ThreadPoolExecutor getThreadPoolExecutorInstance(int corePoolSize, int maxnumPoolSize,         long keepAliveTime,TimeUnit unit,         BlockingQueue<Runnable> workQueue,RejectedExecutionHandler rejectedExecutionHandler){         if( asynExecutor == null ||  asynExecutor.isShutdown()){             asynExecutor=new ThreadPoolExecutor(corePoolSize,  maxnumPoolSize, keepAliveTime, unit,                      workQueue, rejectedExecutionHandler);         }         return asynExecutor;     }     public  synchronized ThreadPoolExecutor getQueueThreadPoolExecutorInstance(int corePoolSize, int maxnumPoolSize,             long keepAliveTime,TimeUnit unit,             BlockingQueue<Runnable> workQueue,RejectedExecutionHandler rejectedExecutionHandler){             if( asynExecutor == null ||  asynExecutor.isShutdown()){                 asynExecutor=new ThreadPoolExecutor(corePoolSize,  maxnumPoolSize, keepAliveTime, unit, workQueue, rejectedExecutionHandler){                     @Override                     public void execute(Runnable commad) {                      if(this.getQueue().size()>0){                          try {                             this.getQueue().put(commad);                         } catch (InterruptedException e) {                           e.printStackTrace();                         }                        }else{                          super.execute(commad);                         }                     }                 };             }          return asynExecutor;     }

}

2)调用执行器的方法(新建一个拒绝策略)

     private static  final TaoBaoServiceRejectedExecutionHandler taoBaoServiceRejectedExecutionHandler=new TaoBaoServiceRejectedExecutionHandler();      private static  final BlockingQueue<Runnable>  WORK_QUEUE=new  LinkedBlockingQueue<Runnable>() ;

/*****方法调用*****/

public void exceute(){

    ThreadPoolExecutor executor=    Executor.getInstance().getQueueThreadPoolExecutorInstance(3, 6,1, TimeUnit.SECONDS, WORK_QUEUE, taoBaoServiceRejectedExecutionHandler);         executor.execute(new TaoBaoServiceThread(3,4));

}

/*****构造方法注入参数值*****/

public final class TaoBaoServiceThread implements Runnable{         int i,j;         public TaoBaoServiceThread(int i, int j) {             super();             this.i=i;             this.j=j;         }         @Override         public void run() {               System.out.println(i);               System.out.println(j);         }     }

/*****声明一下拒绝策略类*****/

    public static final class TaoBaoServiceRejectedExecutionHandler implements RejectedExecutionHandler{         @Override         public void rejectedExecution(Runnable taoBaoServiceThread, ThreadPoolExecutor executor) {             try {                  executor.getQueue().put(taoBaoServiceThread);             } catch (InterruptedException e) {                 e.printStackTrace();             }         }         }

最新回复(0)