JUC学习

it2023-11-01  64

JUC是什么

java.util.concurrent在并发编程中使用的工具类

java.util.concurrent:java并发包java.util.atomic:java并发原子包java.util.locks:java并发锁包

进程/线程回顾

1、进程/线程是什么

进程:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本分配单元,也是基本的执行单元。

线程:通常在一个进程中可以包含若干个线程,当然一个进程中至少有一个线程,不然没有存在的意义。线程可以利用进程所拥有的资源,在引入线程的操作系统中,通常都是把进程作为分配资源的基本单位,而把线程作为独立运行和独立调度的基本单位,由于线程比进程更小,基本上不拥有系统资源,故对它的调度所付出的开销就会小得多,能更高效的提高系统多个程序间并发执行的程度。

2、线程状态

NEW:新建状态

RUNNABLE:可运行状态

BLOCKED:阻塞状态

WAITING:等待,没有超时时间

TIMED_WAITING:超时等待,超时后自动返回

TERMINATION:线程结束

3、wait和sleep区别

wait会释放锁,sleep不会释放锁

Lock接口

Lock是什么

lock是java.util.concurrent.locks下的一个接口,lock锁实现提供了比使用同步方法和语句可以获得的更广泛的锁操作,它们允许更灵活的结构,可能具有非常不同的属性,并且可能支持多个关联的条件对对象。

Lock接口的实现ReentrantLock可重入锁

使用方法

class X { private final ReentrantLock lock = new ReentrantLock(); // ... public void m() { lock.lock(); // block until condition holds try { // ... method body } finally { lock.unlock() } } }

synchronized和Lock的区别

1、首先synchronized是java内置关键字,在jvm层面,Lock是个Java类;

2、synchronized无法判断是否获取锁的状态,Lock可以判断是否获取到锁;

3、synchronized会自动释放锁(一个线程执行完同步方法或者发生异常就会释放锁),Lock需在finally中手工释放锁(unlock方法释放锁),否则容易造成线程死锁;

4、用synchronized关键字的两个线程1和线程2,如果当前线程1获得锁,线程2等,如果线程1阻塞,线程2则会一直等待下去;而Lock锁就不会一直等待下去,如果尝试获取不到锁,线程可以不用一直等待就结束了;

5、synchronized的锁可重入、不可中断、非公平1,而Lock锁可重入、可判断、可公平

6、Lock锁适合大量同步的代码同步问题,synchronized锁适合代码少量的同步问题。

线程间通信

1、生产者+消费者

2、通知等待唤醒机制

synchronized实现

代码:

/** * 现在两个线程可以操作初始值为0的一个变量 * 实现一个线程对该变量加1,一个线程对该变量减1 * 实现交替,来10轮,变量初始值为0 * * 1、高内聚低耦合前提下,线程操作资源类 * 2、判断/干活/通知 */ public class ThreadWaitNotifyDemo { public static void main(String[] args) { AirConditioner airConditioner = new AirConditioner(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(200); airConditioner.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(300); airConditioner.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); } } //资源类 class AirConditioner{ int number = 0; public synchronized void increment() throws InterruptedException { //判断 if(number != 0){ this.wait(); } //干活 number++; System.out.println(Thread.currentThread().getName()+"\t"+number); //通知 this.notifyAll(); } public synchronized void decrement() throws InterruptedException { //判断 if(number == 0){ this.wait(); } //干活 number--; System.out.println(Thread.currentThread().getName()+"\t"+number); //通知 this.notifyAll(); } }

换成4个线程

换成4个线程会导致错误,虚假唤醒

原因:在java多线程判断时,不能用if,程序出事出在了判断上面,突然有一添加的线程进到if了,突然中了交出控制权,没有进行验证,而是直接走下去了,加了两次,甚至多次

解决办法

把if判断换成while判断

/** * 现在两个线程可以操作初始值为0的一个变量 * 实现一个线程对该变量加1,一个线程对该变量减1 * 实现交替,来10轮,变量初始值为0 * * 1、高内聚低耦合前提下,线程操作资源类 * 2、判断/干活/通知 * 3、多线程交互中,必须要防止多线程的虚假唤醒,也即判断只用while,不能用if */ public class ThreadWaitNotifyDemo { public static void main(String[] args) { AirConditioner airConditioner = new AirConditioner(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(200); airConditioner.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(300); airConditioner.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(400); airConditioner.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(500); airConditioner.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); } } //资源类 class AirConditioner{ int number = 0; public synchronized void increment() throws InterruptedException { //判断 while(number != 0){ this.wait(); } //干活 number++; System.out.println(Thread.currentThread().getName()+"\t"+number); //通知 this.notifyAll(); } public synchronized void decrement() throws InterruptedException { //判断 while(number == 0){ this.wait(); } //干活 number--; System.out.println(Thread.currentThread().getName()+"\t"+number); //通知 this.notifyAll(); } }

Lock实现

/** * 现在两个线程可以操作初始值为0的一个变量 * 实现一个线程对该变量加1,一个线程对该变量减1 * 实现交替,来10轮,变量初始值为0 * * 1、高内聚低耦合前提下,线程操作资源类 * 2、判断/干活/通知 * 3、多线程交互中,必须要防止多线程的虚假唤醒,也即判断只用while,不能用if */ public class ThreadWaitNotifyDemo { public static void main(String[] args) { AirConditioner airConditioner = new AirConditioner(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(200); airConditioner.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(300); airConditioner.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(400); airConditioner.increment(); } catch (InterruptedException e) { e.printStackTrace(); } } },"C").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { try { Thread.sleep(500); airConditioner.decrement(); } catch (InterruptedException e) { e.printStackTrace(); } } },"D").start(); } } //资源类 class AirConditioner{ private int number = 0; private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); public void increment(){ lock.lock(); try { //判断 while(number != 0){ condition.await(); } //干活 number++; System.out.println(Thread.currentThread().getName()+"\t"+number); //通知 condition.signalAll(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public void decrement(){ lock.lock(); try { //判断 while(number == 0){ condition.await(); } //操作 number--; System.out.println(Thread.currentThread().getName()+"\t"+number); //通知 condition.signalAll(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }

线程间定制化调用通信

精确通知顺序访问

代码:

/** * 多线程之间按顺序调用,实现A->B->C * 三个线程启动,要求如下: * * AA打印5次,BB打印10次,CC打印15次 * 接着 * AA打印5次,BB打印10次,CC打印15次 * ......来10轮 * * 1、线程操作资源类 * 2、判断/干活/通知 * 3、多线程交互中,必须要防止多线程的虚假唤醒,也即判断只用while,不能用if * 4、标志位 */ public class ThreadOtherAccess { public static void main(String[] args) { ShareResource shareResource = new ShareResource(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print5(); } },"A").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print10(); } },"B").start(); new Thread(() -> { for (int i = 0; i < 10; i++) { shareResource.print15(); } },"C").start(); } } class ShareResource{ private int number = 1; private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); public void print5(){ lock.lock(); try { //判断 while(number != 1){ condition1.await(); } for (int i = 1; i <= 5; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number = 2; condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print10(){ lock.lock(); try { while(number != 2){ condition2.await(); } for (int i = 1; i <= 10; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number = 3; condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void print15(){ lock.lock(); try { while(number != 3){ condition3.await(); } for (int i = 1; i <= 15; i++) { System.out.println(Thread.currentThread().getName()+"\t"+i); } number = 1; condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }

NotSafeDemo

证明集合类是不安全的

例子:

public class NotSafe { public static void main(String[] args) { List<String> list = new ArrayList<>(); for (int i = 1; i <= 30; i++) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0,8)); System.out.println(list); },String.valueOf(i)).start(); } } }

线程不安全错误:

java.util.ConcurrentModificationException

ArrayList在迭代的时候如果同时对其进行修改就会抛出java.util.ConcurrentModificationException异常 并发修改异常

为什么会发生这个异常?

因为ArrayList的add方法没有添加synchronized,线程不安全

解决方案

1、使用Vector,Vector的add方法加了synchronized,线程安全

List<String> list = new Vector<>();

2、使用Collections的synchronizedList方法

List<String> list = Collections.synchronizedList(new ArrayList<>());

3、写时复制

CopyOnWrite容器即写时复制的容器。往一个容器添加元素的时候,不直接往当前容器Object[]添加,而是先将当前容器Object[]进行Copy,复制出一个新的容器Object[] newElements,然后向新的容器Object[]newElements里添加元素。添加元素后,再将原容器的引用指向新的容器setArray(newElements)。这样做的好处是可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-5QfGgE51-1603203656701)(D:\学习笔记\image\image-20201019135143930.png)]

List<String> list = new CopyOnWriteArrayList<>();

Callable接口

第三种获得多线程的方式

怎么用

直接替换Runnable是否可行?

不可行,因为:thread类的构造方法根本没有Callable

利用java多态,一个类可以实现多个接口!!

public class CallableDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<Integer>(new MyThread()); new Thread(futureTask,"A").start(); //FutureTask的get方法可以获得返回值 System.out.println(futureTask.get()); } } class MyThread implements Callable<Integer>{ @Override public Integer call() throws Exception { System.out.println("****MyThread call"); return 1024; } }

FutureTask

FutureTask的继承关系

原理

在主线程中需要执行比较耗时的操作时,但又不想阻塞主线程时,可以把这些作业交给Future对象在后台完成,当主线程将来需要时,就可以通过Future对象获得后台作业的计算结果或者执行状态。

一般FutureTask多用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。

仅在计算完成时才能检索结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。get方法而获取结果只有在计算完成时获取,否则会一直阻塞直到任务转入完成状态,然后会返回结果或者抛出异常。

只计算一次

get方法放到最后,不放到最后的话,get方法会一直阻塞主线程,直到获取到结果。

JUC强大的辅助类

CountDownLatch:减少计数

原理

CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,这些线程会阻塞。

其它线程调用countDown方法将会将计数器减1(调用countDown方法的线程不会阻塞)。

当计数器的值变为0时,因await方法阻塞的线程会被唤醒,继续执行。

代码

/** * 让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒。 * * main主线程必须要等前面6个线程完成全部工作后,自己才能开干 */ public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6); for (int i = 0; i < 6; i++) { new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t离开"); countDownLatch.countDown(); },String.valueOf(i)).start(); } countDownLatch.await(); System.out.println(Thread.currentThread().getName()+"\t关门"); } }

CyclicBarrier:循环栅栏

原理

CyclicBarrier的字面意思是可循环(Cyclic)使用的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。线程进入屏障通过CyclicBarrier的await()方法。

代码

/** * 执行完一定数量的线程之后再执行该线程 */ public class CyclicBarrierDemo { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(7,() -> {System.out.println("召唤神龙");}); for (int i = 1; i <= 7; i++) { int finalI = i; new Thread(() -> { System.out.println(Thread.currentThread().getName()+"\t收集到第"+ finalI +"课龙珠"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } },String.valueOf(i)).start(); } } }

Semaphore:信号量

原理

在信号量上我们定义两种操作:

acquire(获取) 当一个线程调用acquire操作时,它要么通过成功获取信号量(信号量减1),要么一直等下去,直到有线程释放信号量,或超时。release(释放)实际上会将信号量的值加1,然后唤醒等待的线程。

信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

代码

public class SemaphoreDemo { public static void main(String[] args) { //模拟资源类 Semaphore semaphore = new Semaphore(3); //7个线程争抢3个资源,先抢到先用,资源全部被抢完,没抢到的线程需要等待资源的释放再去争抢 for (int i = 1; i <= 7; i++) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName()+"\t抢占到资源"); TimeUnit.SECONDS.sleep(4); System.out.println(Thread.currentThread().getName()+"\t释放资源"); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); } },String.valueOf(i)).start(); } } }

ReentrantReadWriteLock-读写锁

问题例子:

class MyCache{ private volatile Map<String,Object> map = new HashMap<>(); public void put(String key,Object value){ System.out.println(Thread.currentThread().getName()+"\t 正在写"+key); //暂停一会儿线程 try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace(); } map.put(key,value); System.out.println(Thread.currentThread().getName()+"\t 写完了"+key); } public Object get(String key){ Object result = null; System.out.println(Thread.currentThread().getName()+"\t 正在读"+key); try {TimeUnit.MILLISECONDS.sleep(300);} catch (InterruptedException e) {e.printStackTrace(); } result = map.get(key); System.out.println(Thread.currentThread().getName()+"\t 读完了"+result); return result; } } public class ReadWriteLockDemo { public static void main(String[] args) { MyCache myCache = new MyCache(); for (int i = 1; i <= 5; i++) { final int num = i; new Thread(()->{ myCache.put(num+"",num+""); },String.valueOf(i)).start(); } for (int i = 1; i <= 5; i++) { final int num = i; new Thread(()->{ myCache.get(num+""); },String.valueOf(i)).start(); } } }
最新回复(0)