线程池是在JDK 1.5之后出现的概念,它为我们提供了一个Executor接口用于创建线程池。
在JDK 1.5之前,如果我们在并发编程的过程中需要实现多个线程的话,采用的是多线程计数的方式,而之所以出现了线程池的概念,原因就是因为多线程计数在部分条件下无法满足程序的要求。
多线程计数能够最大限度发挥多核处理器的计算能力,但是对于多线程来说,并不是线程数量越多,程序运行的效率越高,那么如果随意的去创建线程的话,反而会对系统性能带来不利的影响,原因就是因为线程的数量与系统性能不是一个正比的关系,而是一个抛物线的关系。
线程池就是会事先创建若干个可执行的线程放入一个池(容器)当中,有任务需要执行时,从池子中获取一个线程去执行任务,不用自行创建,执行完后不需要销毁线程,而是将当前的线程归还到线程池当中,从而减少创建和销毁线程所带来的性能的开销。
因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场))。
4、线程池可以统一的调优和监控。5、提供更强大的功能,延时定时线程池。线程池的创建是用Executors工厂类调用静态方法实现的,而线程任务的提交除了submit()方法之外,还有一个execute()方法。
execute()和submit()方法的区别:
execute(Runnable command),执行一个任务,没有返回值。submit(Runnable command),提交一个线程任务,有返回值。submit(Callable<T> task)能获取到它的返回值,通过future.get()获取(阻塞直到任务执行完)。一般使用FutureTask+Callable配合使用。(IntentService中有体现)
submit(Runnable task, T result),能通过传入的载体 result 间接获得线程的返回值。submit(Runnable task),则是没有返回值的,就算获取它的返回值也是null。Future.get()方法会使取结果的线程进入阻塞状态,知道线程执行完成之后,唤醒取结果的线程,然后返回结果。创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
适用:一个任务一个任务执行的场景。 class CallableTask implements Callable<Integer> { @Override public Integer call() { System.out.println("任务执行时间:" + new Date().getSeconds()); System.out.println(Thread.currentThread().getName() + " :: hello world"); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " :: hello world, goodBye"); return (int) (Math.random() * 100); } } public class TestDemo { public static void main(String[] args) { //单一数量的线程池 ExecutorService pool = Executors.newSingleThreadExecutor(); //创建Callable任务 CallableTask task = new CallableTask(); //Future对象接收当前task的返回结果 for (int i = 0; i < 5; i++) { Future<Integer> result = pool.submit(task); try { System.out.println(result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } //关闭线程池 pool.shutdown(); } }创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
适用:执行长期的任务,性能好很多。 public class TestDemo { public static void main(String[] args) { //固定数量的线程池 ExecutorService pool = Executors.newFixedThreadPool(2); } }创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。
适用:周期性执行任务的场景。 public class TestDemo { public static void main(String[] args) { //周期性的线程池 ScheduledExecutorService pool = Executors.newScheduledThreadPool(5); //包含有5个核心线程的线程池 //创建任务 RunnableTask task = new RunnableTask(); //提交任务 System.out.println("任务提交时间:" + new Date().getSeconds()); pool.schedule(task,3,TimeUnit.SECONDS); //关闭线程池 pool.shutdown(); } }创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
适用:执行很多短期异步的小程序或者负载较轻的服务器。 class CallableTask implements Callable<Integer> { private CountDownLatch latch; public CallableTask(CountDownLatch latch) { this.latch = latch; } @Override public Integer call() { System.out.println("任务执行时间:" + new Date().getSeconds()); System.out.println(Thread.currentThread().getName() + " :: hello world"); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " :: hello world, goodBye"); latch.countDown(); return (int) (Math.random() * 100); } } public class TestDemo { public static void main(String[] args) { //可缓存的线程池 /** * 当有任务提交时,优先重复使用线程池中的空闲线程, * 但是如果线程池中没有空闲线程时,则会直接创建新的线程去执行提交的任务 */ ExecutorService pool = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(30); for (int i = 0; i < 30; i++) { pool.submit(new CallableTask(latch)); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main thread end"); //关闭线程池 pool.shutdown(); } }在JDK当中,为线程池提供了一个Executor框架,它相当于整个线程池继承关系的父接口,它的主要作用是把任务的提交和执行进行解耦,我们只需要定义好任务,然后提交给线程池,而不需要关心线程是如何执行任务,任务被哪个线程执行,以及什么时候执行。
在Executor框架下面,JDK还为我们提供了一个工厂类,即Executors,它是Java线程池的工厂类,我们通过它可以初始化一个线程池。
接下来,我们通过newFixedThreadPool这个线程池来看看线程池的底层究竟是如何来执行的。
//创建线程池对象 ExecutorService pool = Executors.newFixedThreadPool(2); //包含2个线程对象当我们需要使用一个线程池时,我们可以直接通过Executors工厂类调用相应的静态方法,创建线程池对象。
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }而工厂类中的静态方法实际上是给我们返回了一个ThreadPoolExecutor构造器。事实上底层创建线程池对象时都会需要这个构造器,它存在于ThreadPoolExecutor这个类当中。
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default thread factory and rejected execution handler. * It may be more convenient to use one of the {@link Executors} factory * methods instead of this general purpose constructor. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } int corePoolSize,核心线程数,当提交一个任务,线程池创建一个新线程去执行任务,直到当前线程数等于corePoolSize,继续提交的任务会被缓存到阻塞队列中,等待被执行。int maximumPoolSize,最大线程数,如果当前阻塞队列满了,继续提交任务,需要创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。long keepAliveTime,线程空闲时的存活时间。TimeUnit unit,keepAliveTime的时间单位。BlockingQueue<Runnable> workQueue,用于缓存任务的阻塞队列。ThreadFactory threadFactory,创建线程的工厂。RejectedExecutionHandler handler。拒绝策略。如果我们在使用线程池执行任务时,当阻塞队列满了,没有空闲的工作线程时,此时如果继续提交任务,那么线程池需要采取一种策略处理当前的任务,JDK为线程池提供了4种策略:
1、AbortPolicy :直接抛出异常。2、CallerRunsPolicy :用调用者所在的线程去执行任务。3、DiscardOldestPolicy :丢弃阻塞队列中最靠前的任务。4、DiscardPolicy :直接丢弃当前的任务。5、除了JDK提供的这四种拒绝策略之外,我们还可以自定义拒绝策略,需要实现RejectedExecutionHandler接口。拒绝策略的验证,以AbortPolicy为例:
class Task implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName()); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public class TestDemo { public static void main(String[] args) { //拒绝策略 - AbortPolicy 直接抛出异常 ExecutorService pool = new ThreadPoolExecutor(2, 3, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(5), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 10;i++){ pool.submit(new Task()); } pool.shutdown(); } }初始化一个指定数量的线程池,其中corePoolSize == maximumPoolSize == nThreads,使用LinkedBlockingDeque作为阻塞队列,如果当前没有可执行任务,也不会释放线程。
newCachedThreadPool :可缓存的线程池。 /** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }初始化一个可缓存线程的线程池,默认缓存60s,线程池的线程数最大可达到Integer.MAX_VALUE,使用SynchronousQueue作为阻塞队列,一定要注意控制并发的任务数,否则会导致严重系统性能问题。
newSingleThreadExecutor :单一数量的线程池。 /** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }初始化只有一个线程的线程池,如果该线程异常结束,会重新创建一个新的线程执行任务,使用LinkedBlockingQueue作为阻塞队列。
newScheduledThreadPool :周期性的线程池。 /** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }初始化线程池的时候使用new ScheduledThreadPoolExecutor创建一个线程池对象,可以指定的时间内周期性的执行所提交的执行。
线程池在提交任务的过程中有两种方法,分别是execute()方法和submit()方法。
execute(Runnable command)submit(Runnable task) / submit(Runnable task, T result) / submit(Callable<T> task)在execute()方法中,我们会发现对线程的状态进行了两次检查,即double check,那么为什么要对它进行两次检查呢?
原因是在多线程环境下,线程池的状态时刻都会发生改变,而ctl.get()是非原子操作,有可能获取到线程池状态后线程池状态就发生了改变。由此可知,在使用execute()方法提交任务时,线程池的工作线程通过worker实现,在ReentrantLock锁的保证下,会将worker工作线程加入到workers(HashSet),并启动Worker当中的线程。
/** * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }runWorker() - - 线程执行的过程。它的核心工作流程为:
1、线程启动之后, 通过unlock释放锁,为了允许当前线程可被中断。2、获取任务firstTask,执行任务的run方法,在执行run方法之前,还需要进行加锁操作,任务执行完需要释放锁。3、在执行任务的前后,可以根据业务场景自定义beforeExecute方法和afterExecute方法。