Java中创建线程主要有三种方式:
继承Thread类创建线程类 定义Thread类的子类,并重写该类的run方法,该run方法的方法体就代表了线程要完成的任务。因此把run()方法称为执行体创建Thread子类的实例,即创建了线程对象调用线程对象的start()方法来启动该线程通过Runnable接口创建线程类 定义runnable接口的实现类,并重写该接口的run()方法,该run()方法的方法体同样是该线程的线程执行体创建 Runnable实现类的实例,并依此实例作为Thread的target来创建Thread对象,该Thread对象才是真正的线程对象调用线程对象的start()方法来启动该线程通过Callable接口和FutureTask类创建线程 创建Callable接口的实现类,并实现call()方法,该call()方法将作为线程执行体,并且有返回值创建Callable实现类的实例,使用FutureTask类来包装Callable对象,该FutureTask对象封装了该Callable对象的call()方法的返回值使用FutureTask对象作为Thread对象的target创建并启动新线程调用FutureTask对象的get()方法来获得子线程执行结束后的返回值其余详略,自查网络
synchronized关键字以内存中的一个对象作为锁(互斥锁),获取到这个对象的线程可以执行synchronized内部的代码,执行完毕才放弃锁
synchronized其实锁定的是可以是对象(在方法上修饰也属于锁定类),也可以是类 若锁定当前类,则当前类只能由一个线程进入执行,其他线程阻塞synchronized修饰非静态方法和使用synchronized(this)都是锁住了这个类的对象,如果多线程访问,对象不同,就锁不住,对象固定是一个,就可锁住使用synchronized(类名.class)和修饰静态方法,是锁住了代码块,不管多线程访问的时候对象是不是同一个(能缩小代码段的范围就尽量缩小,能在代码段上加同步就不要再整个方法上加同步,缩小锁的粒度)基本使用
public class T { private int count = 10; private Object o = new Object(); public void m() { synchronized(o) { //任何线程要执行下面的代码,必须先拿到o的锁 count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } } }专门创建一个无用的对象作为锁显得浪费,可以直接以当前对象作为锁
public class T { private int count = 10; public void m() { synchronized(this) { //任何线程要执行下面的代码,必须先拿到this的锁 count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } } }以当前对象作为锁,可以省略synchronized块的定义,直接在方法上修饰
public class T { private int count = 10; public synchronized void m() { //等同于在方法的代码执行时要synchronized(this) count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } }synchronized修饰静态方法,锁定的是字节码对象
下面代码m方法与mm方法的锁定效果一样
public class T { private static int count = 10; public synchronized static void m() { //这里等同于synchronized(yxxy.c_004.T.class) count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } public static void mm() { synchronized(T.class) { //考虑一下这里写synchronized(this)是否可以? // 不可以,静态方法不能访问非静态对象 count --; } } }下面的代码可以演示到多线程运行时出现的重复问题,若取消synchronized关键字的注释,可以避免这个问题
public class T implements Runnable { private int count = 10; public /*synchronized*/ void run() { count--; System.out.println(Thread.currentThread().getName() + " count = " + count); } public static void main(String[] args) { T t = new T(); for(int i=0; i<5; i++) { new Thread(t, "THREAD" + i).start(); } } }一个synchronized代码块是作为原子性操作的,整体不可分
同步方法与非同步方法是可以同时调用/执行的
原因是只有synchronized方法执行才需要申请锁,其他方法不需要申请锁,两者互不干扰(比喻:A上厕所锁了门与来洗手盆洗手的B没有关系)
public class T { public synchronized void m1() { System.out.println(Thread.currentThread().getName() + " m1 start..."); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m1 end"); } public void m2() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " m2 "); } public static void main(String[] args) { T t = new T(); /*new Thread(()->t.m1(), "t1").start(); new Thread(()->t.m2(), "t2").start();*/ new Thread(t::m1, "t1").start(); new Thread(t::m2, "t2").start(); /* new Thread(new Runnable() { @Override public void run() { t.m1(); } }); */ } }面试有关小知识点:银行代码当然需要加锁,写操作需要加锁那么读操作需要加锁吗?
答案是需要的:若不加锁,当的写操作(加锁)阻塞的同时进行读操作(不加锁,会产生脏读现象,如扣了钱但查出来的余额仍然不变除非业务允许脏读,读操作可以不加锁以提升性能一个同步方法可以调用另外一个同步方法(包括子类同步方法调用父类同步方法也是可以的)
一个线程已经拥有某个对象的锁,再次申请相同的锁的时候仍然会得到该对象的锁
即已取得锁的当前线程再申请获取同一个锁是可行的
也就是说synchronized获得的锁是可重入的
下面代码m1内调用m2,是可以执行的
import java.util.concurrent.TimeUnit; public class T { synchronized void m1() { System.out.println("m1 start"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } m2(); } synchronized void m2() { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("m2"); } }程序在执行过程中,如果出现异常,默认情况锁会被释放
所以,在并发处理的过程中,有异常要多加小心,可能会发生不一致的情况。
比如,在一个web app处理过程中,多个servlet线程共同访问同一个资源,这时如果异常处理不合适,在第一个线程中抛出异常,其他线程就会进入同步代码区,有可能会访问到异常产生时第一个线程未修改完的数据
因此要非常小心的处理同步业务逻辑中的异常
当可能抛出异常可以catch来避免上述错误(如回滚数据操作)
import java.util.concurrent.TimeUnit; public class T { int count = 0; synchronized void m() { System.out.println(Thread.currentThread().getName() + " start"); while(true) { count ++; System.out.println(Thread.currentThread().getName() + " count = " + count); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } if(count == 5) { int i = 1/0; //此处抛出异常,锁将被释放,要想不被释放,可以在这里进行catch,然后让循环继续 System.out.println(i); } } } public static void main(String[] args) { T t = new T(); Runnable r = new Runnable() { @Override public void run() { t.m(); } }; new Thread(r, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(r, "t2").start(); } }死锁
最基本原理:
线程1需要按顺序获取锁A和锁B来执行,线程2需要按顺序获取锁B和锁A来执行,当线程1获取了锁A未获取锁B时线程2并发,线程2执行获取了锁B,此时就发生死锁,线程1无法获取锁B,线程2也无法获取锁A来继续执行,程序卡死
下面程序可以模拟死锁
public class T { Object a = new Object(); Object b = new Object(); public void m1() { synchronized (a) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (b) { System.out.println("success1"); } } } public void m2() { synchronized (b) { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } synchronized (a) { System.out.println("success2"); } } } public static void main(String[] args) { T t = new T(); new Thread(t::m1, "t1").start(); new Thread(t::m2, "t2").start(); } }synchronized的优化
synchronized同步代码块中的代码越少越好
import java.util.concurrent.TimeUnit; public class T { int count = 0; synchronized void m1() { //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //业务逻辑中只有下面这句需要sync,这时不应该给整个方法上锁 count ++; //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } void m2() { //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //业务逻辑中只有下面这句需要sync,这时不应该给整个方法上锁 //采用细粒度的锁,可以使线程争用时间变短,从而提高效率 synchronized(this) { count ++; } //do sth need not sync try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } } }锁定对象状态对锁定的影响
锁定某对象o,如果o的属性发生改变,不影响锁的使用,但是如果o变成另外一个对象,则锁定的对象发生改变
应该避免将锁定对象的引用变成另外的对象
当锁定对象改变后,多个线程间获取的锁对象就有可能不一样了,使得同步代码块失效
下面代码说明了这个问题,锁定对象改变了,线程t2进入同步代码块执行,但若锁定对象不变,线程t2将不能进入同步代码块执行
import java.util.concurrent.TimeUnit; public class T { Object o = new Object(); void m() { synchronized(o) { while(true) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); } } } public static void main(String[] args) { T t = new T(); //启动第一个线程 new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } //创建第二个线程 Thread t2 = new Thread(t::m, "t2"); t.o = new Object(); //锁对象发生改变,所以t2线程得以执行,如果注释掉这句话,线程2将永远得不到执行机会 t2.start(); } }不要以String字符串类型作为锁对象
不要以字符串常量作为锁定对象,由于String类型的特殊性(常量池),表面上变量名不同的两个String对象可能指向的是同一个地址
在下面的程序中,m1和m2其实锁定的是同一个对象
这种情况还会发生比较诡异的现象,比如你用到了一个类库,在该类库中代码锁定了字符串“Hello”,但是你读不到源码,所以你在自己的代码中也锁定了"Hello",这时候就有可能发生非常诡异的死锁阻塞,因为你的程序和你用到的类库不经意间使用了同一把锁 public class T { String s1 = "Hello"; String s2 = "Hello"; void m1() { synchronized(s1) { } } void m2() { synchronized(s2) { } } }volatile关键字,可以使一个变量在多个线程间可见
如A、B线程都用到同一个变量,Java默认是A线程中保留一份copy,这样如果B线程修改了该变量,则A线程未必知道
在下面的代码中,running是存在于堆内存的t对象中,当线程t1开始运行的时候,会把running值从内存中读到(拷贝到)t1线程的工作区,在运行过程中直接使用这个copy,并不会每次都去读取堆内存,这样,当主线程修改running的值之后,t1线程感知不到,所以不会停止运行
使用volatile关键字,将会强制所有线程都去堆内存中读取变量running的值
可以阅读这篇文章进行更深入的理解 http://www.cnblogs.com/nexiyi/p/java_memory_model_and_thread.html
volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized
下面代码可以说明volatile关键字的效果,当running变量可以被其他线程改变时,while代码块才会结束并打印"m end"
import java.util.concurrent.TimeUnit; public class T { /*volatile*/ boolean running = true; //对比一下有无volatile的情况下,整个程序运行结果的区别 void m() { System.out.println("m start"); while(running) { /* try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }*/ // 若上面几行代码取消注释,有可能在cpu空闲时去读取堆内存中的running变量的值从而结束while代码块 } System.out.println("m end!"); } public static void main(String[] args) { T t = new T(); new Thread(t::m, "t1").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t.running = false; } }volatile不能替代synchronized
volatile并不能保证多个线程共同修改running变量时所带来的不一致问题,也就是说volatile不能替代synchronized
运行下面的程序输出count的值会出现不到100000的结果,原因是因为虽然volatile可以保证每个线程读取count的值是同步的,但不能保证/要求线程写入count的值时一定是根据此时的count值+1的操作
即读操作和写操作是复合操作,不构成原子性操作
解决这个问题可以使用synchronized关键字修饰m方法即可以保证可见性和原子性同步
import java.util.ArrayList; import java.util.List; public class T { volatile int count = 0; void m() { for (int i = 0; i < 10000; i++) count++; } public static void main(String[] args) { T t = new T(); List<Thread> threads = new ArrayList<Thread>(); for (int i = 0; i < 10; i++) { threads.add(new Thread(t::m, "thread-" + i)); } threads.forEach((o) -> o.start()); threads.forEach((o) -> { try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(t.count); } }若仅在数据进行简单的运算时需要保证原子性,可以使用AtomicXXX类
AtomicXXX类本身方法都是原子性的,但不能保证多个方法连续调用是原子性的,即多个AtomicXXX类的方法连续被调用的时候不能保证原子性
import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; public class T { /*volatile*/ //int count = 0; AtomicInteger count = new AtomicInteger(0); /*synchronized*/ void m() { for (int i = 0; i < 10000; i++) //if count.get() < 1000 count.incrementAndGet(); //count++ } public static void main(String[] args) { T t = new T(); List<Thread> threads = new ArrayList<Thread>(); for (int i = 0; i < 10; i++) { threads.add(new Thread(t::m, "thread-" + i)); } threads.forEach((o) -> o.start()); threads.forEach((o) -> { try { o.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); System.out.println(t.count); } }基本概念
线程安全有两个方面:执行控制和内存可见
执行控制的目的是控制代码执行(顺序)及是否可以并发执行内存可见控制的是线程执行结果在内存中对其它线程的可见性,根据Java内存模型的实现,线程在具体执行时,会先拷贝主存数据到线程本地(CPU缓存),操作完成后再把结果从线程本地刷到主存synchronized关键字解决的是执行控制的问题,它会阻止其它线程获取当前对象的监控锁,这样就使得当前对象中被synchronized关键字保护的代码块无法被其它线程访问,也就无法并发执行。更重要的是,synchronized还会创建一个内存屏障,内存屏障指令保证了所有CPU操作结果都会直接刷到主存中,从而保证了操作的内存可见性,同时也使得先获得这个锁的线程的所有操作,都happens-before于随后获得这个锁的线程的操作。
volatile关键字解决的是内存可见性的问题,会使得所有对volatile变量的读写都会直接刷到主存,即保证了变量的可见性。这样就能满足一些对变量可见性有要求而对读取顺序没有要求的需求。
使用volatile关键字仅能实现对原始变量(如boolen、 short 、int 、long等)操作的原子性,但需要特别注意, volatile不能保证复合操作的原子性,即使只是i++,实际上也是由多个原子操作组成:read i; inc; write i,假如多个线程同时执行i++,volatile只能保证他们操作的i是同一块内存,但依然可能出现写入脏数据的情况。
volatile与synchronized两者区别
volatile本质是在告诉jvm当前变量在寄存器(工作内存)中的值是不确定的,需要从主存中读取; synchronized则是锁定当前变量,只有当前线程可以访问该变量,其他线程被阻塞住volatile仅能使用在变量级别;synchronized则可以使用在变量、方法、和类级别的volatile仅能实现变量的修改可见性,不能保证原子性;而synchronized则可以保证变量的修改可见性和原子性volatile不会造成线程的阻塞;synchronized可能会造成线程的阻塞volatile标记的变量不会被编译器优化;synchronized标记的变量可以被编译器优化volatile的性能比synchronized的性能要高
可以用volatile的时候尽量避免使用synchronized
ReentrantLock可以用于替代synchronized,前者可以完成后者可完成的功能且更灵活(但性能没有明显区别)
但使用ReentrantLock必须手动释放锁,使用synchronized锁定的话如果遇到异常,JVM会自动释放锁,但是ReentrantLock必须手动释放锁,因此经常在finally块中进行锁的释放
ReentrantLock是Java并发包中互斥锁,它有公平锁和非公平锁两种实现方式
详细可见https://www.jianshu.com/p/155260c8af6c
基本使用
下面的代码可以演示使用重入锁完成使m1方法与m2方法互斥的功能,
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock2 { Lock lock = new ReentrantLock(); // 创建锁 void m1() { try { lock.lock(); //synchronized(this) 申请并锁定锁lock for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); // 解锁 } } void m2() { lock.lock(); // 申请并锁定锁lock System.out.println("m2 ..."); lock.unlock(); // 解锁 } public static void main(String[] args) { ReentrantLock2 rl = new ReentrantLock2(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }tryLock方法——“尝试锁定”
使用ReentrantLock可以调用tryLock方法尝试进行锁定
线程可以根据tryLock方法的返回值判断是否锁定并以此决定是否继续等待或执行其他操作
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock3 { Lock lock = new ReentrantLock(); void m1() { try { lock.lock(); for (int i = 0; i < 10; i++) { TimeUnit.SECONDS.sleep(1); System.out.println(i); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } /** * 使用tryLock进行尝试锁定,不管锁定与否,方法都将继续执行 * 可以根据tryLock的返回值来判定是否锁定 * 也可以指定tryLock的时间,由于tryLock(time)抛出异常,所以要注意unclock的处理,必须放到finally中 */ void m2() { /* boolean locked = lock.tryLock(); System.out.println("m2 ..." + locked); if(locked) lock.unlock(); */ boolean locked = false; try { locked = lock.tryLock(5, TimeUnit.SECONDS); System.out.println("m2 ..." + locked); } catch (InterruptedException e) { e.printStackTrace(); } finally { if(locked) lock.unlock(); } } public static void main(String[] args) { ReentrantLock3 rl = new ReentrantLock3(); new Thread(rl::m1).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(rl::m2).start(); } }lockInterruptibly方法
调用lockInterruptibly方法申请锁的线程可以对线程interrupt方法做出响应
即若主线程想调用interrupt方法打断某一个线程的执行,通常来讲通过lock方法申请锁却因为未申请到锁而阻塞的线程不能被打断,而通过lockInterruptibly方法申请锁的线程阻塞时可以对主线程调用interrupt方法作出响应而被打断
简言之:lockInterruptibly方法的作用是使在一个线程在等待锁的过程中,可以被打断
下面的代码可以演示当线程t1霸占锁使得线程t2一直等待获得锁而阻塞,t2使用lockInterruptibly方法代替lock方法来声明申请锁,主线程可以通过调用线程t2对象的interrupt方法打断线程t2的执行——“别等了,哥们”
线程t1在获取/申请锁的过程中不响应中断(lock方法),而t2在获取/申请锁的过程响应中断(lockInterruptibly方法)
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; public class ReentrantLock4 { public static void main(String[] args) { Lock lock = new ReentrantLock(); Thread t1 = new Thread(()->{ try { lock.lock(); System.out.println("t1 start"); TimeUnit.SECONDS.sleep(Integer.MAX_VALUE); System.out.println("t1 end"); } catch (InterruptedException e) { System.out.println("interrupted!"); } finally { lock.unlock(); } }); t1.start(); Thread t2 = new Thread(()->{ try { //lock.lock(); lock.lockInterruptibly(); //可以对interrupt()方法做出响应 System.out.println("t2 start"); TimeUnit.SECONDS.sleep(5); System.out.println("t2 end"); } catch (InterruptedException e) { System.out.println("interrupted!"); } finally { lock.unlock(); } }); t2.start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } t2.interrupt(); //打断线程2的等待 } }
ReentrantLock可以被指定为公平锁
ReentrantLock在创建时可以被指定为公平锁,而synchronized是非公平锁
公平锁
公平锁:某个对象的锁对所有线程都是公平的,先到先得。每次加锁前都会检查队列里面有没有排队等待的线程,有则排队等待,没有才会尝试获取锁。
非公平锁:当一个线程采用非公平锁这种方式获取锁时,该线程会首先去尝试获取锁而不是等待。如果没有后去成功,那么它才会去队列里面等待。
下面代码使用公平锁,结果应该是两个线程交替打印,若创建重入锁时没有传入true,则打印结果无法预测
import java.util.concurrent.locks.ReentrantLock; public class ReentrantLock5 extends Thread { private static ReentrantLock lock=new ReentrantLock(true); //参数为true表示为公平锁,请对比输出结果 public void run() { for(int i=0; i<100; i++) { lock.lock(); try{ System.out.println(Thread.currentThread().getName()+"获得锁"); }finally{ lock.unlock(); } } } public static void main(String[] args) { ReentrantLock5 rl=new ReentrantLock5(); Thread th1=new Thread(rl); Thread th2=new Thread(rl); th1.start(); th2.start(); } }基本使用
ThreadLocal是使用空间换时间,synchronized是使用时间换空间
简言之:ThreadLocal中存放的数据每个线程独立一份,各个线程之间的ThreadLocal互不影响
当单个线程可以独立维护一个变量,不需要或不应该被其他线程修改这个变量,则可以使用TreadLocal,比如在Hibernate中session就存在与ThreadLocal中,避免synchronized的使用
注意:ThreadLocal可能会导致内存泄漏
下面代码结果应该是线程t1通过get方法不能获取Person对象(获取了null)
import java.util.concurrent.TimeUnit; public class ThreadLocal2 { //volatile static Person p = new Person(); static ThreadLocal<Person> tl = new ThreadLocal<>(); public static void main(String[] args) { new Thread(()->{ try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(tl.get()); }).start(); new Thread(()->{ try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } tl.set(new Person()); }).start(); } static class Person { String name = "zhangsan"; } }详见https://blog.csdn.net/cselmu9/article/details/51366946
单例模式
两个特点
在任何情况下,单例类永远只有一个实例存在
单例需要有能力为整个系统提供这一唯一实例
下面的程序使用了静态内置类的方式来实现单例模式
可以实现懒加载而且线程安全
import java.util.Arrays; public class Singleton { private Singleton() { System.out.println("single"); } private static class Inner { private static Singleton s = new Singleton(); } public static Singleton getSingle() { return Inner.s; } public static void main(String[] args) { Thread[] ths = new Thread[200]; for(int i=0; i<ths.length; i++) { ths[i] = new Thread(()->{ System.out.println(Singleton.getSingle()); }); } Arrays.asList(ths).forEach(o->o.start()); } }使用早期的同步容器以及Collections.synchronized**方法的不足之处,本文省略不表,请阅读: http://blog.csdn.net/itm_hadf/article/details/7506529*
使用新的并发容器http://xuganggogo.iteye.com/blog/321630
有N张火车票,每张票都有一个编号,同时有10个窗口对外售票,写一个模拟程序
实现一
以下是最基本的实现,弊端为ArrayList的remove方法不是同步的,可能重复remove同一张票;判断剩余票数的代码也不是同步的,可能卖出超过10000张票
/** * 有N张火车票,每张票都有一个编号 * 同时有10个窗口对外售票 * 请写一个模拟程序 * * 分析下面的程序可能会产生哪些问题? * 重复销售?超量销售? * * @author 马士兵 */ import java.util.ArrayList; import java.util.List; public class TicketSeller1 { static List<String> tickets = new ArrayList<>(); static { for(int i=0; i<10000; i++) tickets.add("票编号:" + i); } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(tickets.size() > 0) { System.out.println("销售了--" + tickets.remove(0)); } }).start(); } } }实现二
使用Vector或者Collections.synchronizedXXX这种同步容器来实现同步
仍然存在弊端,因为判断条件的代码和操作容器的代码分离了,虽然Vector的remove方法是同步的,但判断是否还有票的代码不是同步的,可能售出超过10000张票(若打开while块内线程sleep的语句,可以模拟这种出错,原因是多个线程进入到了while块同时执行remove操作了)
即从判断到操作容器这两步中间可能会出现并发问题,因为两步操作不同步
解决方法呼之欲出了,使判断(size>0)和操作容器(remove方法)的代码同步,即实现原子性
import java.util.Vector; import java.util.concurrent.TimeUnit; public class TicketSeller2 { static Vector<String> tickets = new Vector<>(); static { for(int i=0; i<1000; i++) { tickets.add("票 编号:" + i); } } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(tickets.size() > 0) { /*try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }*/ System.out.println("销售了--" + tickets.remove(0)); } }).start(); } } }实现三
通过在判断和操作这两步外部加synchronized块实现原子性
但这种实现的效率并不特别高
import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; public class TicketSeller3 { static List<String> tickets = new LinkedList<>(); static { for(int i=0; i<1000; i++) { tickets.add("票 编号:" + i); } } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(true) { synchronized(tickets) { if(tickets.size() <= 0) { break; } try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("销售了--" + tickets.remove(0)); } } }).start(); } } }实现四
使用并发容器ConcurrentLinkedQueue(并发链表队列)提高并发性
其中poll方法是同步的,操作代码并没有加锁,但可以实现高效率线程安全操作
原理是队列的特性:队列内一般不允许有值为null的元素(即使容器允许null值也应该作判断避免传入null值),若没有元素则返回null,先取出后判断,并不是先判断后取出,所以完美解决了判断和操作两部分的同步问题
import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; public class TicketSeller4 { static Queue<String> tickets = new ConcurrentLinkedQueue<>(); static { for(int i=0; i<1000; i++) { tickets.add("票 编号:" + i); } } public static void main(String[] args) { for(int i=0; i<10; i++) { new Thread(()->{ while(true) { String s = tickets.poll(); if(s == null) { break; } else { System.out.println("销售了--" + s); } } }).start(); } } }可见并发容器的优势所在,下文将介绍各个常用的并发容器类
ConcurrentMap(另有ConcurrentSet大类,此处不表)
以下程序可以演示高并发下不同并发Map容器的添加效率
小关键:这里的CountDownLatch主要是为了限制主线程等待添加操作完成后再继续执行
import java.util.Arrays; import java.util.Hashtable; import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; public class T01_ConcurrentMap { public static void main(String[] args) { //Map<String, String> map = new ConcurrentHashMap<>(); Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发并且排序 //Map<String, String> map = new Hashtable<>(); //Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX //TreeMap Random r = new Random(); Thread[] ths = new Thread[100]; CountDownLatch latch = new CountDownLatch(ths.length); long start = System.currentTimeMillis(); for(int i=0; i<ths.length; i++) { ths[i] = new Thread(()->{ for(int j=0; j<10000; j++) { map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000)); } latch.countDown(); }); } Arrays.asList(ths).forEach(t->t.start()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(end - start); } }上文代码执行结果可以发现执行结果中在并发的情况下使用ConcurrentHashMap的效率比HashTable高,原因是HashTable在添加操作时会锁定整个容器,只响应一个线程的添加操作;而ConcurrentHashMap则是将容器分段(默认16段)(存疑,1.8之后底层改变,CAS替代分段锁,具体自查),并发操作时只锁定其中一段
在高并发且需要对元素排序的情况下,可以使用ConcurrentSkipListMap提高效率
关于SkipList(跳跃列表)可参阅http://blog.csdn.net/sunxianghuang/article/details/52221913
跳表可以理解为“多链链表”,是一种用空间换时间的数据结构,通过在每个节点中增加了向前的指针,从而提升查找的效率
写时复制容器(copy on write),当添加/删除等修改元素操作发生时,将逐一复制原列表值到新容器,修改操作(即写的操作)完成后再将原容器的引用调整至新容器,从而实现读取数据的线程安全
主要是读写分离的思想:在写的过程中引用并未指向新容器,所以读操作仍然在旧容器中读取,待写操作完成后才更新新容器的引用
CopyOnWriteArrayList的实现原理是,在一个线程开始遍历(创建Iterator对象)时,内部会创建一个“快照”数组,遍历基于这个快照Iterator进行,在遍历过程中这个快照数组不会改变,也就不会抛出ConcurrentModificationException。如果在遍历的过程中有其他线程尝试改变数组的内容,就会拷贝一份新的数据进行变更,而后面再来访问这个数组的线程,看到的就是变更过的数组。
其实CopyOnWirteArrayList主要就是解决了并发环境下修改操作和对容器遍历操作的冲突(修改时另一线程开始遍历容器会抛出ConcurrentModificationException)
可以查阅https://juejin.im/post/5aaa2ba8f265da239530b69e
有关ConcurrentModificationException的拓展资料查阅https://juejin.im/post/5a992a0d6fb9a028e46e17ef
多线程环境下,写时效率低,读时效率高,适合写少读多的环境
下面的程序可以演示CopyOnWirteArrayList的读/写的效率
import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Random; import java.util.Vector; import java.util.concurrent.CopyOnWriteArrayList; public class T02_CopyOnWriteList { public static void main(String[] args) { List<String> lists = //new ArrayList<>(); //这个会出并发问题! //new Vector(); new CopyOnWriteArrayList<>(); Random r = new Random(); Thread[] ths = new Thread[100]; for(int i=0; i<ths.length; i++) { Runnable task = new Runnable() { @Override public void run() { for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000)); } }; ths[i] = new Thread(task); } runAndComputeTime(ths); System.out.println(lists.size()); } static void runAndComputeTime(Thread[] ths) { long s1 = System.currentTimeMillis(); Arrays.asList(ths).forEach(t->t.start()); Arrays.asList(ths).forEach(t->{ try { t.join(); } catch (InterruptedException e) { e.printStackTrace(); } }); long s2 = System.currentTimeMillis(); System.out.println(s2 - s1); } }在并发环境下用得较多的容器
是无界队列,容量取决于内存
下面的代码演示基本使用ConcurrentLinkedQueue,poll和peek方法的区别是poll方法将返回并移除元素,peek方法是获取元素但不移除
另外有ConcurrentLinkedDeque并发双向链表队列
import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; public class T04_ConcurrentQueue { public static void main(String[] args) { Queue<String> strs = new ConcurrentLinkedQueue<>(); for(int i=0; i<10; i++) { strs.offer("a" + i); //add } System.out.println(strs); System.out.println(strs.size()); System.out.println(strs.poll()); System.out.println(strs.size()); System.out.println(strs.peek()); System.out.println(strs.size()); //双端队列Deque } }其中LinkedBlockingQueue是使用链表实现的阻塞式无界队列,put方法在容器已满时将等待,而take方法在容器为空时将等待(下文例题中有实现这种生产者/消费者模式的容器的程序)
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; public class T05_LinkedBlockingQueue { static BlockingQueue<String> strs = new LinkedBlockingQueue<>(); static Random r = new Random(); public static void main(String[] args) { new Thread(() -> { for (int i = 0; i < 100; i++) { try { strs.put("a" + i); //如果满了,就会等待 TimeUnit.MILLISECONDS.sleep(r.nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } } }, "p1").start(); for (int i = 0; i < 5; i++) { new Thread(() -> { for (;;) { try { System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待 } catch (InterruptedException e) { e.printStackTrace(); } } }, "c" + i).start(); } } }底层使用数组实现,容量有限制,offer方法可以向容器添加元素,并返回是否添加成功的布尔值(若容器已满则不添加元素并返回false,而使用add方法则会抛出异常)
且offer方法可以传入参数设置时间间隔,在此段时间间隔内不断添加,超时则放弃添加操作并返回false
而put方法在容器已满时将阻塞
import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class T06_ArrayBlockingQueue { static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); static Random r = new Random(); public static void main(String[] args) throws InterruptedException { for (int i = 0; i < 10; i++) { strs.put("a" + i); } strs.put("aaa"); //满了就会等待,程序阻塞 //strs.add("aaa"); //strs.offer("aaa"); //strs.offer("aaa", 1, TimeUnit.SECONDS); System.out.println(strs); } }TransferQueue有transfer方法(将元素放入容器),这个方法的作用是当多个消费者线程等待获取队列中的元素时,此时生产者再生产一个元素,不放入队列中,而是可以直接交给消费者线程,但使用了transfer方法,若没有消费者线程等待获取元素,使用transfer方法的线程将阻塞直至消费者线程出现
可以提高并发效率
import java.util.concurrent.LinkedTransferQueue; public class T08_TransferQueue { public static void main(String[] args) throws InterruptedException { LinkedTransferQueue<String> strs = new LinkedTransferQueue<>(); /*new Thread(() -> { try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start();*/ //strs.transfer("aaa"); strs.put("aaa"); new Thread(() -> { try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } }是一种特殊的TransferQueue,特殊在容量为0
不能调用add方法,只能调用put方法(将阻塞等待消费者线程)
若没有消费者线程等待获取容器中的值,则会抛出异常IllegalStateException:Queue full
import java.util.concurrent.BlockingQueue; import java.util.concurrent.SynchronousQueue; public class T09_SynchronusQueue { //容量为0 public static void main(String[] args) throws InterruptedException { BlockingQueue<String> strs = new SynchronousQueue<>(); new Thread(()->{ try { System.out.println(strs.take()); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); strs.put("aaa"); //阻塞等待消费者消费 //strs.add("aaa"); System.out.println(strs.size()); } }同样,DelayQueue也是一个线程安全的无界队列
特点是当队列中的元素到达延迟时间时才能被取出,队列元素会按照最终执行时间(阻塞结束后到被执行的时间)在队列中进行排序,头部为最终执行时间最长的元素
可以使用延迟队列来执行定时任务
DelayQueue是一个无界阻塞队列,该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。如果延迟都还没有期满,则队列没有头部,并且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即使无法使用 take 或 poll 移除未到期的元素,也不会将这些元素作为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不允许使用 null 元素。
下面的代码演示了使用DelayQueue,其中DelayQueue存放的元素需要实现Delayed接口以实现元素延迟计时等功能(如下MyTask类实现了Delay接口)
import java.util.Calendar; import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class T07_DelayQueue { static BlockingQueue<MyTask> tasks = new DelayQueue<>(); static Random r = new Random(); static class MyTask implements Delayed { long runningTime; MyTask(long rt) { this.runningTime = rt; } @Override public int compareTo(Delayed o) { if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) return -1; else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) return 1; else return 0; } @Override public long getDelay(TimeUnit unit) { return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public String toString() { return "" + runningTime; } } public static void main(String[] args) throws InterruptedException { long now = System.currentTimeMillis(); MyTask t1 = new MyTask(now + 1000); MyTask t2 = new MyTask(now + 2000); MyTask t3 = new MyTask(now + 1500); MyTask t4 = new MyTask(now + 2500); MyTask t5 = new MyTask(now + 500); tasks.put(t1); tasks.put(t2); tasks.put(t3); tasks.put(t4); tasks.put(t5); System.out.println(tasks); for(int i=0; i<5; i++) { System.out.println(tasks.take()); } } }最顶层定义
Executor是一个接口,内部定义了execute(Runnable command)方法,实现类需要实现这个方法编写需要实现的具体任务
简言之,Executor的实现类是用于执行某个任务的
import java.util.concurrent.Executor; public class T01_MyExecutor implements Executor{ public static void main(String[] args) { new T01_MyExecutor().execute(()->System.out.println("hello executor")); } @Override public void execute(Runnable command) { //new Thread(command).run(); command.run(); } }ExecutorService是一个继承Executor的接口,除继承execute方法外,还定义了一系列其他关于执行任务的方法(如submit方法)
其中submit方法可以传入Callable和Runnable接口类型的参数
与Runnable接口非常相似,Runnable接口内定义了run方法,而Callable接口内部定义了call方法
与Runnable接口的区别在于,Runnable接口的run方法没有返回值且不能抛出异常,而Callable接口定义的call方法可以有返回值且可以抛出异常
Executors是简化使用Executor接口常见实现类的工具类
其中定义了一些使用的方法比如创建线程池等
具体API自查
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间
使用线程池可以达到线程的重用,提高性能
下面的程序演示了创建一个固定线程数量的线程池并直接向线程池派发任务并执行(把任务扔进线程池中,线程池中的数个线程将抢着执行任务)
其中shutdown方法的作用是关闭线程池,若线程仍在执行任务则等待线程全部空闲再关闭,有shutdownNow方法可以强制关闭线程池
isTerminated方法作用是检测此时线程池内任务是否被执行完毕(全部线程空闲)
isShutdown方法注意是检测该线程池是否执行了shutdown方法
Future
在Java中,如果需要设定代码执行的最长时间,即超时,可以用Java线程池ExecutorService类配合Future接口来实现。 Future接口是Java标准API的一部分,在java.util.concurrent包中。Future接口是Java线程Future模式的实 现,可以来进行异步计算。
Future模式可以这样来描述:我有一个任务,提交给了Future,Future替我完成这个任务。期间我自己可以去做任何想做的事情。一段时 间之后,我就便可以从Future那儿取出结果。就相当于下了一张订货单,一段时间后可以拿着提订单来提货,这期间可以干别的任何事情。其中Future 接口就是订货单,真正处理订单的是Executor类,它根据Future接口的要求来生产产品。
Future接口提供方法来检测任务是否被执行完,等待任务执行完获得结果,也可以设置任务执行的超时时间
这个设置超时的方法就是实现Java程序执行超时的关键
在Future接口中声明了5个方法
cancel方法:用于取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false 参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回trueisCancelled方法:表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回trueisDone方法:表示任务是否已经完成,若任务完成,则返回trueget()方法:用来获取执行结果(Callable的返回值),这个方法会产生阻塞,会一直等到任务执行完毕才返回get(long timeout, TimeUnit unit)方法:用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回nullFutureTask实现了Future接口
不直接构造Future对象,也可以使用ExecutorService.submit方法来获得Future对象,submit方法即支持以 Callable接口类型,也支持Runnable接口作为参数,具有很大的灵活性
下面的程序演示了两种获取Future对象的方式并通过讲task对象传入线程构造函数开启线程使用,其中FutureTask的泛型表示获取值的类型
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; public class T06_Future { public static void main(String[] args) throws InterruptedException, ExecutionException { FutureTask<Integer> task = new FutureTask<>(()->{ TimeUnit.MILLISECONDS.sleep(500); return 1000; }); //new Callable () { Integer call();} new Thread(task).start(); System.out.println(task.get()); //阻塞 //******************************* ExecutorService service = Executors.newFixedThreadPool(5); Future<Integer> f = service.submit(()->{ TimeUnit.MILLISECONDS.sleep(500); return 1; }); System.out.println(f.get()); System.out.println(f.isDone()); } }下面程序使用FutureTask配合固定线程数的线程池实现了并行计算1-200000范围内得素数并对比了串行计算和并行计算的效率
其中不均分计算范围是因为数值越大计算量越大
import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class T07_ParallelComputing { public static void main(String[] args) throws InterruptedException, ExecutionException { long start = System.currentTimeMillis(); getPrime(1, 200000); long end = System.currentTimeMillis(); System.out.println(end - start); final int cpuCoreNum = 4; ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum); MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20 MyTask t2 = new MyTask(80001, 130000); MyTask t3 = new MyTask(130001, 170000); MyTask t4 = new MyTask(170001, 200000); Future<List<Integer>> f1 = service.submit(t1); Future<List<Integer>> f2 = service.submit(t2); Future<List<Integer>> f3 = service.submit(t3); Future<List<Integer>> f4 = service.submit(t4); start = System.currentTimeMillis(); f1.get(); f2.get(); f3.get(); f4.get(); end = System.currentTimeMillis(); System.out.println(end - start); } static class MyTask implements Callable<List<Integer>> { int startPos, endPos; MyTask(int s, int e) { this.startPos = s; this.endPos = e; } @Override public List<Integer> call() throws Exception { List<Integer> r = getPrime(startPos, endPos); return r; } } static boolean isPrime(int num) { for(int i=2; i<=num/2; i++) { if(num % i == 0) return false; } return true; } static List<Integer> getPrime(int start, int end) { List<Integer> results = new ArrayList<>(); for(int i=start; i<=end; i++) { if(isPrime(i)) results.add(i); } return results; } }最基本的线程池,线程数固定
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T05_ThreadPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newFixedThreadPool(5); //execute submit for (int i = 0; i < 6; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); service.shutdown(); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); TimeUnit.SECONDS.sleep(5); System.out.println(service.isTerminated()); System.out.println(service.isShutdown()); System.out.println(service); } }当任务需要时就自动创建新线程(不限制线程数量),线程默认超过60s空闲则销毁
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T08_CachedPool { public static void main(String[] args) throws InterruptedException { ExecutorService service = Executors.newCachedThreadPool(); System.out.println(service); for (int i = 0; i < 2; i++) { service.execute(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }); } System.out.println(service); TimeUnit.SECONDS.sleep(80); System.out.println(service); } }这种线程池的作用是保证多个任务顺序执行
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class T09_SingleThreadPool { public static void main(String[] args) { ExecutorService service = Executors.newSingleThreadExecutor(); for(int i=0; i<5; i++) { final int j = i; service.execute(()->{ System.out.println(j + " " + Thread.currentThread().getName()); }); } } }下面程序演示了使用固定频率执行任务
具体API自查
import java.util.Random; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class T10_ScheduledPool { public static void main(String[] args) { ScheduledExecutorService service = Executors.newScheduledThreadPool(4); service.scheduleAtFixedRate(()->{ try { TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()); }, 0, 500, TimeUnit.MILLISECONDS); } }一般来讲每个线程维护一个任务队列,每个线程只执行分配给自身的任务,而使用工作窃取线程池当有空闲线程时空闲线程将主动窃取另外线程的任务来执行
WorkStealingPool底层是由ForkJoinPool线程池实现的
注意产生的是守护线程
import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class T11_WorkStealingPool { public static void main(String[] args) throws IOException { ExecutorService service = Executors.newWorkStealingPool(); System.out.println(Runtime.getRuntime().availableProcessors()); service.execute(new R(1000)); service.execute(new R(2000)); service.execute(new R(2000)); service.execute(new R(2000)); //daemon service.execute(new R(2000)); //由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出 System.in.read(); } static class R implements Runnable { int time; R(int t) { this.time = t; } @Override public void run() { try { TimeUnit.MILLISECONDS.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(time + " " + Thread.currentThread().getName()); } } }任务的切分(切分子任务到多小)和合并可以由开发者指定,而ForkJoinPool将根据切分和合并的规则来创建线程并由ForkJoinPool维护线程
可以参阅:https://www.jianshu.com/p/8d7e3cc892cf
下面程序演示了计算长度为1000000的,内部存放随机数值的数组的和
其中使用static inner class是为了防止包可见导致命名冲突
import java.io.IOException; import java.util.Arrays; import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class T12_ForkJoinPool { static int[] nums = new int[1000000]; static final int MAX_NUM = 50000; static Random r = new Random(); static { for(int i=0; i<nums.length; i++) { nums[i] = r.nextInt(100); } System.out.println(Arrays.stream(nums).sum()); //stream api } /* static class AddTask extends RecursiveAction { int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected void compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; System.out.println("from:" + start + " to:" + end + " = " + sum); } else { int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); } } } */ static class AddTask extends RecursiveTask<Long> { private static final long serialVersionUID = 1L; int start, end; AddTask(int s, int e) { start = s; end = e; } @Override protected Long compute() { if(end-start <= MAX_NUM) { long sum = 0L; for(int i=start; i<end; i++) sum += nums[i]; return sum; } int middle = start + (end-start)/2; AddTask subTask1 = new AddTask(start, middle); AddTask subTask2 = new AddTask(middle, end); subTask1.fork(); subTask2.fork(); return subTask1.join() + subTask2.join(); } } public static void main(String[] args) throws IOException { ForkJoinPool fjp = new ForkJoinPool(); AddTask task = new AddTask(0, nums.length); fjp.execute(task); long result = task.join(); System.out.println(result); //System.in.read(); } }其实所有线程池的底层都是使用ThreadPoolExecutor作为支撑的,可以自己自定义线程池,指定不同的特定策略(最小/最大线程数、使用什么任务队列和执行策略等)
扩展-ParallelStream
默认使用多线程并行计算
具体自查
import java.util.ArrayList; import java.util.List; import java.util.Random; public class T14_ParallelStreamAPI { public static void main(String[] args) { List<Integer> nums = new ArrayList<>(); Random r = new Random(); for(int i=0; i<10000; i++) nums.add(1000000 + r.nextInt(1000000)); //System.out.println(nums); long start = System.currentTimeMillis(); nums.forEach(v->isPrime(v)); long end = System.currentTimeMillis(); System.out.println(end - start); //使用parallel stream api start = System.currentTimeMillis(); nums.parallelStream().forEach(T14_ParallelStreamAPI::isPrime); end = System.currentTimeMillis(); System.out.println(end - start); } static boolean isPrime(int num) { for(int i=2; i<=num/2; i++) { if(num % i == 0) return false; } return true; } }上述程序将不如我们所愿当size=5时break,而是不断添加到10次仍不打印"t2 结束",且程序在打印10次"add"后仍然无法退出
原因是size对线程t2不可见(一直为0),可以在lists变量声明volatile关键字解决
解法一
在lists变量声明volatile关键字
但这种解法有几个问题:
给lists添加volatile之后,t2能够接到通知,但是t2线程的死循环会浪费cpu即使lists对所有线程可见,但并不能保证break操作是size=5时执行,可能此时size已经为6了,即t2执行时机不准确使用wait和notify方法解决
锁对象调用wait和notify方法的作用
wait方法:会让当前线程进入等待,直到另一个线程调用同一个对象的notify()或notifyAll()方法 调用wait方法时将放弃锁/控制权notify方法:唤醒因调用这个对象wait()方法而阻塞的线程 调用notify方法时将不会放弃锁/控制权当执行notify方法时,会唤醒一个处于等待该对象锁的线程,然后继续往下执行,直到执行完退出对象锁锁住的区域(synchronized修饰的代码块)后再释放锁故应该尽量在线程调用notify/notifyAll()后,立即退出临界区,即notify方法后面避免出现更多耗时的代码注意:
运用这种方法,必须要保证线程t2先执行,也就是首先让t2监听才可以线程t1在调用notify方法“叫醒”线程t2后需要再调用wait方法放弃锁将控制权交给线程t2线程t2在执行完毕之前也应该调用notify方法“叫醒”并最终执行完毕将控制权交回线程t1继续执行 import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class MyContainer3 { //添加volatile,使t2能够得到通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer3 c = new MyContainer3(); final Object lock = new Object(); new Thread(() -> { synchronized(lock) { System.out.println("t2启动"); if(c.size() != 5) { try { lock.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 结束"); } }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1启动"); synchronized(lock) { for(int i=0; i<10; i++) { c.add(new Object()); System.out.println("add " + i); if(c.size() == 5) { lock.notify(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }, "t1").start(); } }使用Latch(门闩)替代wait和notify方法来进行通知,好处是通信方式简单,同时也可以指定等待时间
CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能
比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了
其中使用await和countdown方法替代wait和notify方法
CountDownLatch(int count) //构造一个用给定计数初始化的 CountDownLatch。void await() // 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断boolean await(long timeout, TimeUnit unit) // 使当前线程在锁存器倒计数至零之前一直等待,除非线程被中断或超出了指定的等待时间void countDown() // 递减锁存器的计数,如果计数到达零,则释放所有等待的线程long getCount() // 返回当前计数String toString() // 返回标识此锁存器及其状态的字符串CountDownLatch不涉及锁定,当count的值为零时当前线程继续运行
简言之,CountDownLatch不是锁,而是一个对所有线程可见的、有令线程等待的功能的计数器(就像是一位严格的母亲,要求儿子必须等5位大人全部动筷才能动筷,其中每位大人第一次动筷母亲心里的计数器就减一)
这种方式不需要加锁,性能比上面的解法要好,但个人疑问在是否会出现解法一中的t2执行时机不准确的弊端(经验证,在t2结束代码前使t2等待5秒时将出现这种弊端)
当不涉及同步,只涉及线程通信的时候,用synchronized + wait/notify(加锁)就显得太重了,应该考虑使用CountDownLatch/CyclicBarrier/Semaphore代替
可以查看https://www.cnblogs.com/dolphin0520/p/3920397.html
import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class MyContainer5 { // 添加volatile,使t2能够得到通知 volatile List lists = new ArrayList(); public void add(Object o) { lists.add(o); } public int size() { return lists.size(); } public static void main(String[] args) { MyContainer5 c = new MyContainer5(); CountDownLatch latch = new CountDownLatch(1); new Thread(() -> { System.out.println("t2启动"); if (c.size() != 5) { try { latch.await(); //也可以指定等待时间 //latch.await(5000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("t2 结束"); }, "t2").start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } new Thread(() -> { System.out.println("t1启动"); for (int i = 0; i < 10; i++) { c.add(new Object()); System.out.println("add " + i); if (c.size() == 5) { // 打开门闩,让t2得以执行 latch.countDown(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }, "t1").start(); } }解法一
其中使用while而不是if来执行wait方法的原因是:当使用if判断时只会在阻塞前判断一次,阻塞结束直接放行不作二次判断,但此时若实际条件被其他线程改变成应该再次阻塞,则该线程放行执行会出现错误(如容器本来已满,if判断为容器已满,阻塞put方法,若两个生产者线程均进入到put方法阻塞,当容器变为未满状态时唤醒两个阻塞线程直接放行,某一个生产者线程操作使容器已满,则put方法实际应该被阻塞,但if语句不再判断,后来执行的生产者线程继续生产,从而使容器溢出发生错误)
注意:同步代码块/synchronized块即使线程在代码块内被wait,唤醒后依然需要获取锁后才能继续执行,否则继续阻塞等待锁
而while循环判断这个条件,可以解决这个问题
Effective Java一书中说明了wait方法绝大部分都是配合while来使用的
另外,使用notifyAll方法而不是notify方法的原因是notify方法只能唤醒一个,可能唤醒的是同类线程(生产者唤醒生产者使得while判断后两个生产者均wait())使整个程序出现假死
Effective Java一书中也说明了应该永远使用notifyAll方法,不使用notify方法
/** * 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法, * 能够支持2个生产者线程以及10个消费者线程的阻塞调用 * * 使用wait和notify/notifyAll来实现 * * @author mashibing */ import java.util.LinkedList; import java.util.concurrent.TimeUnit; public class MyContainer1<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10个元素 private int count = 0; public synchronized void put(T t) { while(lists.size() == MAX) { //想想为什么用while而不是用if? try { this.wait(); // effective java 放弃锁,使得两个生产者线程均可进入同步代码块执行到这一行,先获取锁的先往下执行,未获取锁的暂时等待 } catch (InterruptedException e) { e.printStackTrace(); } } lists.add(t); ++count; this.notifyAll(); //通知消费者线程进行消费 } public synchronized T get() { T t = null; while(lists.size() == 0) { try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } t = lists.removeFirst(); count --; this.notifyAll(); //通知生产者进行生产 return t; } public static void main(String[] args) { MyContainer1<String> c = new MyContainer1<>(); //启动消费者线程 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++) System.out.println(c.get()); }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //启动生产者线程 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j); }, "p" + i).start(); } } }解法二
使用ReentrantLock作为锁且配合Condition对象使用可以精确唤醒/使等待具体的生产者/消费者线程
其中Condition是依靠“谁在这个方法执行到消费者.await方法来判断谁是消费者”的,并非直接指定哪个线程是生产者/消费者(如get方法内某线程执行到了consumer.await()则这个线程就被认为是消费者了)
线程的生产者/消费者之分是由线程内部执行什么方法来定义的,并非线程之间有所不一样,所有线程都是一样的,只是执行的方法不一样而区分为生产者和消费者
/** * 面试题:写一个固定容量同步容器,拥有put和get方法,以及getCount方法, * 能够支持2个生产者线程以及10个消费者线程的阻塞调用 * * 使用wait和notify/notifyAll来实现 * * 使用Lock和Condition来实现 * 对比两种方式,Condition的方式可以更加精确的指定哪些线程被唤醒 * * @author mashibing */ import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class MyContainer2<T> { final private LinkedList<T> lists = new LinkedList<>(); final private int MAX = 10; //最多10个元素 private int count = 0; private Lock lock = new ReentrantLock(); private Condition producer = lock.newCondition(); private Condition consumer = lock.newCondition(); public void put(T t) { try { lock.lock(); while(lists.size() == MAX) { //想想为什么用while而不是用if? producer.await(); } lists.add(t); ++count; consumer.signalAll(); //通知消费者线程进行消费 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public T get() { T t = null; try { lock.lock(); while(lists.size() == 0) { consumer.await(); } t = lists.removeFirst(); count --; producer.signalAll(); //通知生产者进行生产 } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return t; } public static void main(String[] args) { MyContainer2<String> c = new MyContainer2<>(); //启动消费者线程 for(int i=0; i<10; i++) { new Thread(()->{ for(int j=0; j<5; j++) System.out.println(c.get()); }, "c" + i).start(); } try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } //启动生产者线程 for(int i=0; i<2; i++) { new Thread(()->{ for(int j=0; j<25; j++) c.put(Thread.currentThread().getName() + " " + j); }, "p" + i).start(); } } }