【Java多线程】线程池

it2024-01-20  81

线程池

线程池是在JDK 1.5之后出现的概念,它为我们提供了一个Executor接口用于创建线程池。

在JDK 1.5之前,如果我们在并发编程的过程中需要实现多个线程的话,采用的是多线程计数的方式,而之所以出现了线程池的概念,原因就是因为多线程计数在部分条件下无法满足程序的要求。

多线程计数能够最大限度发挥多核处理器的计算能力,但是对于多线程来说,并不是线程数量越多,程序运行的效率越高,那么如果随意的去创建线程的话,反而会对系统性能带来不利的影响,原因就是因为线程的数量与系统性能不是一个正比的关系,而是一个抛物线的关系。

出现线程池的原因

1、创建和销毁线程也需要时间,导致系统性能低下。 T1线程的创建时间 T2线程的执行时间 T3线程的销毁时间 T1 + T3 >>> T2 2、线程也需要占用内存空间,大量的线程占用的内存资源会比较多,有可能导致OOM异常。3、大量的线程回收也会给GC带来很大的压力。4、大量的线程会抢占CPU的资源,CPU不停的进行各个线程的上下文切换。

线程池的概念

线程池就是会事先创建若干个可执行的线程放入一个池(容器)当中,有任务需要执行时,从池子中获取一个线程去执行任务,不用自行创建,执行完后不需要销毁线程,而是将当前的线程归还到线程池当中,从而减少创建和销毁线程所带来的性能的开销。

线程池的优势

1、降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗。2、提高系统响应速度,当有任务到达时,通过复用已存在的线程,无需等待新线程的创建便能立即执行。3、方便线程并发数的管控。

因为线程若是无限制的创建,可能会导致内存占用过多而产生OOM,并且会造成cpu过度切换(cpu切换线程是有时间成本的(需要保持当前执行线程的现场,并恢复要执行线程的现场))。

4、线程池可以统一的调优和监控。5、提供更强大的功能,延时定时线程池。

线程池的使用

class RunnableTask implements Runnable { @Override public void run() { System.out.println("任务执行时间:" + new Date().getSeconds()); System.out.println(Thread.currentThread().getName() + " :: hello world"); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " :: hello world, goodBye"); } } public class TestDemo { public static void main(String[] args) { //创建线程池对象 ExecutorService pool = Executors.newFixedThreadPool(2); //包含2个线程对象 //创建任务 RunnableTask task = new RunnableTask(); /** * Thread t = new Thread(task); * t.start(); //启动当前的线程,调用RunnableTask中的run方法 */ //提交当前的任务到线程池 pool.submit(task); //线程池分配一个线程,执行当前的task pool.submit(task); pool.submit(task); //关闭线程池 pool.shutdown(); } }

一个完整的线程池应该具备的要素

1、线程数量管控功能。2、任务队列:用于缓存提交的任务。3、任务拒绝策略:线程数量到达上线并且任务队列已满。(任务队列是一个有界的队列,线程池中线程的数量也是有界的)4、线程工厂:用于加工线程。5、QueueSize:limit数量。

线程池的工作流程

线程池的处理流程

1、判断核心线程池是否已满,没满则创建一个新的工作线程来执行任务。已满则进行第二步。2、判断任务队列是否已满,没满则将新提交的任务添加在工作队列,已满则进行第三步。3、判断整个线程池是否已满,没满则创建一个新的工作线程来执行任务,已满则执行饱和策略。

JDK中所提供的线程池

单一数量的线程池。固定数量的线程池。周期性的线程池。可缓存的线程池。

线程池的创建是用Executors工厂类调用静态方法实现的,而线程任务的提交除了submit()方法之外,还有一个execute()方法。

execute()和submit()方法的区别:

execute(Runnable command),执行一个任务,没有返回值。submit(Runnable command),提交一个线程任务,有返回值。

submit(Callable<T> task)能获取到它的返回值,通过future.get()获取(阻塞直到任务执行完)。一般使用FutureTask+Callable配合使用。(IntentService中有体现)

submit(Runnable task, T result),能通过传入的载体 result 间接获得线程的返回值。submit(Runnable task),则是没有返回值的,就算获取它的返回值也是null。Future.get()方法会使取结果的线程进入阻塞状态,知道线程执行完成之后,唤醒取结果的线程,然后返回结果。

线程池终止的方法

shutdown() :之前提交的任务继续执行,但是不接受新的任务,无法保证之前的任务执行完毕后关闭。shutdownNow() :试图停止所有正在执行的任务,暂停处理正在等待的任务,会返回一个等待的任务列表,这个方法不会等待正在执行的任务终止。isShutdown() :如果已经被shutdown,返回true。isTerminate() :如果所有任务都已经被终止,返回true。awaitTermination(long timeout, TimeUnit unit) :如果当前任务执行完毕,或者任务等待时间超过超时时间后关闭。

单一数量的线程池

创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

适用:一个任务一个任务执行的场景。 class CallableTask implements Callable<Integer> { @Override public Integer call() { System.out.println("任务执行时间:" + new Date().getSeconds()); System.out.println(Thread.currentThread().getName() + " :: hello world"); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " :: hello world, goodBye"); return (int) (Math.random() * 100); } } public class TestDemo { public static void main(String[] args) { //单一数量的线程池 ExecutorService pool = Executors.newSingleThreadExecutor(); //创建Callable任务 CallableTask task = new CallableTask(); //Future对象接收当前task的返回结果 for (int i = 0; i < 5; i++) { Future<Integer> result = pool.submit(task); try { System.out.println(result.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } //关闭线程池 pool.shutdown(); } }

固定数量的线程池

创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

适用:执行长期的任务,性能好很多。 public class TestDemo { public static void main(String[] args) { //固定数量的线程池 ExecutorService pool = Executors.newFixedThreadPool(2); } }

周期性的线程池

创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。

适用:周期性执行任务的场景。 public class TestDemo { public static void main(String[] args) { //周期性的线程池 ScheduledExecutorService pool = Executors.newScheduledThreadPool(5); //包含有5个核心线程的线程池 //创建任务 RunnableTask task = new RunnableTask(); //提交任务 System.out.println("任务提交时间:" + new Date().getSeconds()); pool.schedule(task,3,TimeUnit.SECONDS); //关闭线程池 pool.shutdown(); } }

可缓存的线程池

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

适用:执行很多短期异步的小程序或者负载较轻的服务器。 class CallableTask implements Callable<Integer> { private CountDownLatch latch; public CallableTask(CountDownLatch latch) { this.latch = latch; } @Override public Integer call() { System.out.println("任务执行时间:" + new Date().getSeconds()); System.out.println(Thread.currentThread().getName() + " :: hello world"); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " :: hello world, goodBye"); latch.countDown(); return (int) (Math.random() * 100); } } public class TestDemo { public static void main(String[] args) { //可缓存的线程池 /** * 当有任务提交时,优先重复使用线程池中的空闲线程, * 但是如果线程池中没有空闲线程时,则会直接创建新的线程去执行提交的任务 */ ExecutorService pool = Executors.newCachedThreadPool(); CountDownLatch latch = new CountDownLatch(30); for (int i = 0; i < 30; i++) { pool.submit(new CallableTask(latch)); } try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("main thread end"); //关闭线程池 pool.shutdown(); } }

线程池原理分析

在JDK当中,为线程池提供了一个Executor框架,它相当于整个线程池继承关系的父接口,它的主要作用是把任务的提交和执行进行解耦,我们只需要定义好任务,然后提交给线程池,而不需要关心线程是如何执行任务,任务被哪个线程执行,以及什么时候执行。

在Executor框架下面,JDK还为我们提供了一个工厂类,即Executors,它是Java线程池的工厂类,我们通过它可以初始化一个线程池。

接下来,我们通过newFixedThreadPool这个线程池来看看线程池的底层究竟是如何来执行的。

//创建线程池对象 ExecutorService pool = Executors.newFixedThreadPool(2); //包含2个线程对象

当我们需要使用一个线程池时,我们可以直接通过Executors工厂类调用相应的静态方法,创建线程池对象。

/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

而工厂类中的静态方法实际上是给我们返回了一个ThreadPoolExecutor构造器。事实上底层创建线程池对象时都会需要这个构造器,它存在于ThreadPoolExecutor这个类当中。

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters and default thread factory and rejected execution handler. * It may be more convenient to use one of the {@link Executors} factory * methods instead of this general purpose constructor. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } int corePoolSize,核心线程数,当提交一个任务,线程池创建一个新线程去执行任务,直到当前线程数等于corePoolSize,继续提交的任务会被缓存到阻塞队列中,等待被执行。int maximumPoolSize,最大线程数,如果当前阻塞队列满了,继续提交任务,需要创建新的线程执行任务,前提是当前线程数小于maximumPoolSize。long keepAliveTime,线程空闲时的存活时间。TimeUnit unit,keepAliveTime的时间单位。BlockingQueue<Runnable> workQueue,用于缓存任务的阻塞队列。ThreadFactory threadFactory,创建线程的工厂。RejectedExecutionHandler handler。拒绝策略。

JDK中线程池的拒绝策略

如果我们在使用线程池执行任务时,当阻塞队列满了,没有空闲的工作线程时,此时如果继续提交任务,那么线程池需要采取一种策略处理当前的任务,JDK为线程池提供了4种策略:

1、AbortPolicy :直接抛出异常。2、CallerRunsPolicy :用调用者所在的线程去执行任务。3、DiscardOldestPolicy :丢弃阻塞队列中最靠前的任务。4、DiscardPolicy :直接丢弃当前的任务。5、除了JDK提供的这四种拒绝策略之外,我们还可以自定义拒绝策略,需要实现RejectedExecutionHandler接口。

拒绝策略的验证,以AbortPolicy为例:

class Task implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName()); try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } public class TestDemo { public static void main(String[] args) { //拒绝策略 - AbortPolicy 直接抛出异常 ExecutorService pool = new ThreadPoolExecutor(2, 3, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(5), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 10;i++){ pool.submit(new Task()); } pool.shutdown(); } }

四种线程池的传递参数

newFixedThreadPool :固定数量的线程池。 /** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

初始化一个指定数量的线程池,其中corePoolSize == maximumPoolSize == nThreads,使用LinkedBlockingDeque作为阻塞队列,如果当前没有可执行任务,也不会释放线程。

newCachedThreadPool :可缓存的线程池。 /** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

初始化一个可缓存线程的线程池,默认缓存60s,线程池的线程数最大可达到Integer.MAX_VALUE,使用SynchronousQueue作为阻塞队列,一定要注意控制并发的任务数,否则会导致严重系统性能问题。

newSingleThreadExecutor :单一数量的线程池。 /** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

初始化只有一个线程的线程池,如果该线程异常结束,会重新创建一个新的线程执行任务,使用LinkedBlockingQueue作为阻塞队列。

newScheduledThreadPool :周期性的线程池。 /** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

初始化线程池的时候使用new ScheduledThreadPoolExecutor创建一个线程池对象,可以指定的时间内周期性的执行所提交的执行。

ThreadPoolExecutor类中的属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //利用低位29位表示线程池中的线程数目,通过高位3位表示线程池的运行状态,默认是Running private static final int COUNT_BITS = Integer.SIZE - 3; //29 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //0010 0000 ... - 1 = 0001 1111 ... // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; //高位111 private static final int SHUTDOWN = 0 << COUNT_BITS; //高位000 //该状态的线程池不会接收新任务,但是会处理阻塞队列中的任务 private static final int STOP = 1 << COUNT_BITS; //高位001 //该状态的线程池不会接收新任务,但是不会处理阻塞队列中的任务,还会中断正在执行的任务 private static final int TIDYING = 2 << COUNT_BITS; //高位010 private static final int TERMINATED = 3 << COUNT_BITS; //高位011 // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; } // c & 1110 0000... 获取高三位(运行状态) private static int workerCountOf(int c) { return c & CAPACITY; } // c & 0001 1111... 获取低29位(线程个数) private static int ctlOf(int rs, int wc) { return rs | wc; } // -1 | 0 private final ReentrantLock mainLock = new ReentrantLock(); //控制新增工作线程的原子性 private final HashSet<Worker> workers = new HashSet<Worker>(); //工作线程的集合

线程池提交任务流程分析

线程池在提交任务的过程中有两种方法,分别是execute()方法和submit()方法。

execute(Runnable command)submit(Runnable task) / submit(Runnable task, T result) / submit(Callable<T> task)

execute底层源码

//execute()提交任务流程 execute() -> addWorker() -> runWorker() 线程池底层的工作线程是worker线程,需要由ReentrantLock保证。 /** * Executes the given task sometime in the future. The task * may execute in a new thread or in an existing pooled thread. * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}. * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */ public void execute(Runnable command) { //任务不允许为null值 if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); //获取ctl变量 //线程数量与corePoolSize,如果小于corePoolSize if (workerCountOf(c) < corePoolSize) { //addWorker创建线程 if (addWorker(command, true)) return; //没有成功创建线程,再一次获取ctl变量 c = ctl.get(); } //判断线程池状态是否是Running状态,把当前这个任务放入阻塞队列, //如果是Running状态,同时任务成功放入阻塞队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); //再次去检查线程池的状态,如果是非Running状态, //从阻塞队列中移除当前的任务 if (! isRunning(recheck) && remove(command)) //执行拒绝策略 reject(command); //如果线程池仍旧是Running状态,再次获取线程数量,如果没有线程 else if (workerCountOf(recheck) == 0) //创建线程 addWorker(null, false); } //判断线程池中创建线程是否成功,如果失败 else if (!addWorker(command, false)) //执行拒绝策略 reject(command); }

在execute()方法中,我们会发现对线程的状态进行了两次检查,即double check,那么为什么要对它进行两次检查呢?

原因是在多线程环境下,线程池的状态时刻都会发生改变,而ctl.get()是非原子操作,有可能获取到线程池状态后线程池状态就发生了改变。

由此可知,在使用execute()方法提交任务时,线程池的工作线程通过worker实现,在ReentrantLock锁的保证下,会将worker工作线程加入到workers(HashSet),并启动Worker当中的线程。

/** * Main worker run loop. Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * * 1. We may start out with an initial task, in which case we * don't need to get the first one. Otherwise, as long as pool is * running, we get tasks from getTask. If it returns null then the * worker exits due to changed pool state or configuration * parameters. Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread. * * 2. Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set. * * 3. Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task. * * 4. Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute. * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables. * Because we cannot rethrow Throwables within Runnable.run, we * wrap them within Errors on the way out (to the thread's * UncaughtExceptionHandler). Any thrown exception also * conservatively causes thread to die. * * 5. After task.run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die. According to JLS Sec 14.20, this exception is the one that * will be in effect even if task.run throws. * * The net effect of the exception mechanics is that afterExecute * and the thread's UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code. * * @param w the worker */ final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }

runWorker() - - 线程执行的过程。它的核心工作流程为:

1、线程启动之后, 通过unlock释放锁,为了允许当前线程可被中断。2、获取任务firstTask,执行任务的run方法,在执行run方法之前,还需要进行加锁操作,任务执行完需要释放锁。3、在执行任务的前后,可以根据业务场景自定义beforeExecute方法和afterExecute方法。

submit底层源码

/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } /** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } submit()方法存在于AbstractExecutorService抽象类中,通过源码我们可以发现,submit()方法在底层仍然调用的是execute()方法,因此,submit()方法提交任务的流程与execute()方法大体一致。
最新回复(0)