日常使用的使用看这两种创建方式差别不大,实际上稍微有点区别:主要在于多线程访问同一资源的情况下,Running方式创建的线程可以操作同一共享资源,继承Thread方式创建的线程是独立各自有自己的资源,互不干扰(当然我们可以给Thread类中通过各种方式传递参数等)。
只有一个两个线程时手动创建也没什么问题,但是在高并发的情况下,大量创建线程是有致命缺陷的: 1、线程的创建和销毁是需要一定时间的,大量创建销毁线程的时候,所消耗的时间就会体现出来,这样性能就会比较差 2、过多过频繁的线程创建、运行与销毁会给cpu吞吐量带来极大的压力,导致性能急剧下降,而且占用内存很大的情况下,有可能oom 3、过多的线程创建与销毁会使GC频繁回收导致内存抖动,移动端会发生卡顿现象,这是很致命的
综上,在高并发的情况下,java推荐我们应当使用线程池来管理众多线程
实际上线程池就是将很多线程通过一定的逻辑架构缓存起来,而不是一个线程创建用完就将其销毁掉,这样自然就能避免高并发下大量创建线程的性能缺陷。
1、线程的创建和销毁由线程池维护,一个线程在完成任务后并不会立即销毁,而是由后续的任务复用这个线程,从而减少线程的创建和销毁,节约系统的开销 2、线程池旨在线程的复用,这就可以节约我们用以往的方式创建线程和销毁所消耗的时间,减少线程频繁调度的开销,从而节约系统资源,提高系统吞吐量 3、在执行大量异步任务时提高了性能 4、Java内置的一套ExecutorService线程池相关的api,可以更方便的控制线程的最大并发数、线程的定时任务、单线程的顺序执行等
官方推荐使用这种方法来创建线程池,而是推荐使用Executors的工厂方法来创建线程池,Executors类是官方提供的一个工厂类,它里面封装好了众多功能不一样的线程池,从而使得我们创建线程池非常的简便,主要提供了如下五种功能不一样的线程池:
1、固定线程池:Executors.newFixedThreadPool(coreThreadSize)
适用场景:核心线程池数固定,适合执行长期的任务,可重用(非核心线程无限存活,不会被默认回收) 队列:LinkedBlockingQueue
int availableRrocessorsNum = RunTime.getRunTime().availableProcessors(); ExecutorService fixedThreadPool = Executors.newFixedThreadPool(availableRrocessorsNum ); for (int i = 1; i <= 10; i++) { final int index = i; fixedThreadPool.execute(new Runnable() { @Override public void run() { String threadName = Thread.currentThread().getName(); Log.v("zxy", "线程:"+threadName+",正在执行第" + index + "个任务"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); }2、单一线程池:Executors.newSingleThreadExecutor()
适用场景:核心线程数为1,适合一个一个任务执行,可重用 队列:LinkedBlockingQueue
ExecutorService singleThreadPool = Executors.newSingleThreadExecutor(); for (int i = 1; i <= 10; i++) { final int index = i; singleThreadPool.execute(new Runnable() { @Override public void run() { String threadName = Thread.currentThread().getName(); Log.v("zxy", "线程:"+threadName+",正在执行第" + index + "个任务"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); }3、按需分配线程池:Executors.newCacheThreadPool()
适用场景:按需分配需要多少线程池就创建多少,无上限,可重用但是会受到keepaliveTime参数影响,线程存活时间到了就会被回收掉,且不是提交一个任务立马就创建新线程,有可用的就会复用 队列:SynchronousQueue
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 1; i <= 10; i++) { final int index = i; try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } cachedThreadPool.execute(new Runnable() { @Override public void run() { String threadName = Thread.currentThread().getName(); Log.v("zxy", "线程:" + threadName + ",正在执行第" + index + "个任务"); try { long time = index * 500; Thread.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } }); }4、周期性执行线程池:Executors.scheduleThreadPool(coreThreadSize)
适用场景:适合延迟任务或者周期性任务的执行 队列:DelayedWorkQueue
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3); //延迟2秒后执行该任务 scheduledThreadPool.schedule(new Runnable() { @Override public void run() { } }, 2, TimeUnit.SECONDS); //延迟1秒后,每隔2秒执行一次该任务 scheduledThreadPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { } }, 1, 2, TimeUnit.SECONDS);5、单一周期性执行线程池:Executors.newSingleThreadExecutor()
适用场景:这个和4没什么区别,只是核心线程数为1,而上面4的线程池可以指定线程数 队列:DelayedWorkQueue
ScheduledExecutorService singleThreadScheduledPool = Executors.newSingleThreadScheduledExecutor(); //延迟1秒后,每隔2秒执行一次该任务 singleThreadScheduledPool.scheduleAtFixedRate(new Runnable() { @Override public void run() { String threadName = Thread.currentThread().getName(); Log.v("zxy", "线程:" + threadName + ",正在执行"); } },1,2,TimeUnit.SECONDS);我们从源码来看,其实上述几种线程池创建方式底层都是通过ThreadPoolExecutor来实现的,只是因为它的构造参数比较多,代码会繁琐一点,学习源码的过程中实质掌握了这一种核心实现即可达到举一反三的效果,
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(int coreThreadSize, int maximumPoolSize, long keepAliveTime, TimeUnit Unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectExecutionHandler handler);这里有一个重点,手机的线程数并不是越大越好,而是要根据硬件性能来决定,我们可以在代码中获取CPU核心线程数,来动态给线程池基于不同硬件设备设置coreThreadSize
int availableRrocessorsNum = RunTime.getRunTime().availableProcessors();下面来看ThreadPoolExecutor构造参数涵义: 1、int coreThreadSize 核心线程数 2、int maximumPoolSize 最大线程数 3、long keepAliveTime 非核心线程存活时长 4、TimeUnit Unit 3中时间的单元,TimeUnit是一个枚举类型,TimeUnit.SECONDS代表秒, 5、BlockingQueue workQueue 阻塞队列,就是说提交的任务是以怎样的逻辑运行执行任务,是靠这个阻塞队列去调度的 6、ThreadFactory threadFactory 线程工厂,用来创建线程池中的线程,通常用默认的即可 7、RejectExecutionHandler handler 拒绝策略 abortPolicy,就是说提交的任务把核心线程和最大线程都用完了,后续提交到队列应该以怎样的逻辑去运行,是直接抛异常,还是丢弃等等
这里前四个参数是一目了然的,重点是阻塞队列和拒绝策略,我们来着重分析一下: BlockingQueue阻塞队列 阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出; 常用的队列主要有以下两种:(当然通过不同的实现方式,还可以延伸出很多不同类型的队列,DelayQueue就是其中的一种)
先进先出FIFO队列:先插入的队列的元素也最先出队列,类似于排队的功能。从某种程度上来说这种队列也体现了一种公平性。后进先出LOFO队列:后插入队列的元素最先出队列,这种队列优先处理最近发生的事件。 下面两幅图演示了BlockingQueue的两个常见阻塞场景: 如上图所示:当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。 如上图所示:当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。 这也是我们在多线程环境下,为什么需要BlockingQueue的原因。作为BlockingQueue的使用者,我们再也不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,因为这一切BlockingQueue都给你一手包办了。既然BlockingQueue如此神通广大,让我们一起来见识下它的常用方法:BlockingQueue的核心方法: 放入数据: offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.(本方法不阻塞当前执行方法的线程) offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。 put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续. 获取数据: poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null; poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。 take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入; drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数), 通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
常见BlockingQueue 在了解了BlockingQueue的基本功能后,让我们来看看BlockingQueue大致有哪些? BlockingQueue成员详细介绍
1、ArrayBlockingQueue实现原理:
ArrayBlockingQueue基于数组的阻塞队列实现,内部维护了一个定长数组用于存储队列的数据对象,还有两个整形变量,用于记录数组的头和尾在数组中的位置。 ArrayBlockingQueue在生产者放入数据和消费者获取数据时,都是用同一个锁底下,因此意味着两者无法真正实现并行运行,这是ArrayBlockingQueue与LinkedBlockingQueue最大的区别
2、LinkedBlockingQueue实现原理:
LinkedBlockingQueue基于链表的阻塞队列,内部维护了一个链表用于存储数据对象,当生产者往队列中放入数据,队列会从生产者手中获取,并缓存在队列中,而生产者立即返回;当队列缓冲区达到最大缓存容量(LinkedBlockingQueue可以通过构造函数指定该值),就会阻塞生产者队列,直到消费者从队列中取出数据,生产者线程即被唤醒,对于消费者亦是同理,LinkedBlockingQueue可以高效处理并发任务是因为其对于生产者消费者两端使用分离锁来控制数据同步,需要特别注意的是,构造LinkedBlockingQueue对象时,如果不传入最大缓存容量,其默认值是Integer.MAX_VALUE,这样生产者数据太多可能会发生oom,
3、DelayQueue实现原理:
类似Handle延时处理,DelayQueue容器无上限队列,生产者插入数据永远不会阻塞,消费者取数据时根据传入的延时参数决定是否阻塞。
4、PriorityBlockingQueue实现原理:
PriorityBlockingQueue基于优先级执行阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),不会阻塞生产者队列,只会在没有可消费的数据时,阻塞取数据的消费者,所以和3一样需要控制生产者速度,不能比消费者快,否则大量的任务会耗尽系统内存。PriorityBlockingQueue内部控制线程同步采用同步锁
5、SynchronousQueue实现原理:
无缓存的等待队列,生产者提交的任务必须直接找到消费者,无法像有缓存的BlockingQueue集中批量处理,这个缓存队列就像中间商一样,将生产者与消费者解耦了。声明一个SynchronousQueue有两种方式,分为公平模式与非公平模式: 公平模式SynchronousQueue队列使用公平锁,配合FIFO队列来阻塞多余的生产者与消费者,从而体现公平策略 非公平模式是SynchronousQueue是默认模式,采用非公平锁,配合LIFO队列管理多余生产者与消费者,
使用上述五种线程池基本可以满足我们的日常需求,当然也可以自定义线程池,就会用到五种BlockingQueue和abortPolicy,这里用一个可以暂停线程池Demo来展示(阻塞队列使用PriorityBlockingQueue) 1、首先我们可以创建这样一个线程池
ExecutorService priorityThreadPool = new ThreadPoolExecutor(coreThreadSize,maxThreadSize,keepAlivetime,TimeUnit.SECONDES,new PriorityBlockingQueue<Runnable>())2、然后创建一个实现Runnable接口的类,并向外提供一个抽象方法供我们实现自定义功能,并实现Comparable接口,实现这个接口主要就是进行优先级的比较:
public abstract class PriorityRunnable implements Runnable, Comparable<PriorityRunnable> { private int priority; public PriorityRunnable(int priority) { if (priority < 0) throw new IllegalArgumentException(); this.priority = priority; } @Override public int compareTo(PriorityRunnable another) { int my = this.getPriority(); int other = another.getPriority(); return my < other ? 1 : my > other ? -1 : 0; } @Override public void run() { perform(); } public abstract void perform(); public int getPriority() { return priority; }3、使用上面定义的PriorityRunnable提交任务
ExecutorService priorityThreadPool = new ThreadPoolExecutor(coreThreadSize,maxThreadSize,keepAlivetime,TimeUnit.SECONDES,new PriorityBlockingQueue<Runnable>()); for (int i = 1; i <= 10; i++) { final int priority = i; priorityThreadPool.execute(new PriorityRunnable(priority) { @Override public void perform() { String threadName = Thread.currentThread().getName(); Log.v("zxy", "线程:" + threadName + ",正在执行优先级为:" + priority + "的任务"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); }这是展示具有优先级的线程池,我们再将暂停的功能加上,ThreadPoolExecutor提供了三个函数供开发者扩展需求,分别是
1、beforeExecute() 任务执行前调用 2、afterExecute()任务执行后调用 3、terminated()线程池关闭后调用
1、2两个函数在ThreadPoolExecutor类中runWork函数中可以看到,runWorker()是ThreadPoolExecutor的内部类Worker实现的方法,Worker它实现了Runnable接口,也正是线程池内处理任务的工作线程,而Worker.runWorker()方法则是处理我们所提交的任务的方法,它会同时被多个线程访问,所以我们看runWorker()方法的实现,由于涉及到多个线程的异步调用,必然是需要使用锁来处理,而这里使用的是Lock来实现的,我们来看看runWorker()方法内主要实现,step1与step2分别对应,源码如下:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //step 1 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //step 2 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }第三个函数terminated()是在关闭线程池的时候调用,关闭线程池有两种方法,源码中很容易找到,就不贴代码了
1、shutdown()在执行时如果当前正在执行任务,这些执行的任务不会被终止掉 2、shutdownNow()会试着终止掉当前正在执行的任务,并返回未执行的任务列表
因此扩展线程池只需重写这三个函数,
public class MyThreadPoolExecutor extends ThreadPoolExecutor { private boolean pauseStatus; private ReentrantLock pauseLock = new ReentrantLock(); private Condition unPauseed = pauseLock.newCondition(); public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); pauseLock.lock(); try{ whild(pauseStatus){ unPauseed.await(); } }catch(InterruptedException e){ e.interrupt(); }finally{ pauseLock.unlock(); } Log.v("ccc", "线程:" + threadName + "准备执行任务!"); } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); String threadName = Thread.currentThread().getName(); Log.v("ccc", "线程:" + threadName + "任务执行结束!"); } @Override protected void terminated() { super.terminated(); Log.v("ccc", "线程池结束!"); } }然后结合上面的优先级线程池的实现,创建具有暂停功能的优先级线程池:
MyThreadPoolExecutor pausableThreadPoolExecutor = new MyThreadPoolExecutor (1, 1, 0L, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>()); for (int i = 1; i <= 100; i++) { final int priority = i; pausableThreadPoolExecutor.execute(new PriorityRunnable(priority) { @Override public void perform() { runOnUiThread(new Runnable() { @Override public void run() { textView.setText(priority + ""); } }); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); } //这里可以用一个button控制切换 if (isPause) { pausableThreadPoolExecutor.resume(); isPause = false; } else { pausableThreadPoolExecutor.pause(); isPause = true; }插一句题外话,AsyncTask (handle+Thread实现),实际上Thread是概括,最终是用线程池实现的,两个线程池,一个是串行线程池,一个是固定线程数量的线程池
