Java中的锁(二)

it2025-04-22  19

文章目录

Java中的锁(二)实现自定义锁锁的语义自定义同步组件思路AQS的结构是一个FIFO的双向队列。为什么是双向队列?AQS支持重写的方法实现Lock接口AQS提供的可使用的模板方法实现代码 线程间的通知机制场景实现方式一存在的问题 实现方式二注意事项

Java中的锁(二)实现自定义锁

锁的语义

当线程释放锁时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存中当线程获取锁时,JMM会把该线程对应的本地内存置为无效。从而使得被监视器保护的 临界区代码必须从主内存中读取共享变量。

线程A释放一个锁,实质上是线程A向接下来将要获取这个锁的某个线程发出了(线程A 对共享变量所做修改的)消息。 ·线程B获取一个锁,实质上是线程B接收了之前某个线程发出的(在释放这个锁之前对共 享变量所做修改的)消息。 ·线程A释放锁,随后线程B获取这个锁,这个过程实质上是线程A通过主内存向线程B发 送消息。

自定义同步组件

思路

自定义同步组件 = 自定义同步器 + 实现Lock接口

核心操作获取锁、释放锁是通过AbstractQueuedSynchronizer-AQS-队列同步器来实现的。只需要继承AQS并根据实际场景重写某些方法,即可实现自定义同步器。如AQS的注释描述

然后再通过实现Lock接口,在实现的接口里,调用AQS的某些方法。即达到自定义同步组件的实现

AQS的结构是一个FIFO的双向队列。为什么是双向队列?

当独占锁被多个线程竞争时,未抢到锁的线程就会在上图的队列里等待,自旋。当某个线程释放锁的时候,其他线程会开始竞争锁。

假设只是一个单向队列,即head->next->tail,那么这时怎么判断获取到锁的线程的前驱是不是头结点?因为原则是Fisrt In First Out,这样就判断不了,也就保证不了FIFO原则。直接把锁按顺序给下一个线程不行吗?很可惜,确定不了最终是哪个线程抢到锁。如果改为双向队列,即head<->next/prev<->tail。那么当某个线程释放锁的时候,其他线程竞争锁之前,判断前驱是不是头结点,如果不是,那么也就不必去竞争锁了,接着自旋。如果是,才去竞争锁。而且也保证了FIFO这一原则,因为,多个线程入队列的顺序,我们肯定可以控制,即不断的在当前节点后面(后面是tail)拼接新的节点就可以了,这一步是要保证原子性的-用compareAndSetTail。这样构造出来的队列就是按照我们指定的顺序组成的。出队列的顺序使用prev来保证。

AQS支持重写的方法

protected boolean tryAcquire(int arg)-独占式(同一时刻只有一个线程可以操作)获取同步状态,重写该方法需要查询当前的state值是否符合要求,然后再进行CAS设置状态protected boolean tryRelease(int arg)-独占式释放同步状态protected int tryAcquireShared(int arg) -共享式获取同步状态,当返回结果>=0,则代表获取成功protected boolean tryReleaseShared(int arg)-共享式释放同步状态protected boolean isHeldExclusively()-基本用来判断当前线程和拿到锁的线程是不是同一个

AQS通过一个int变量state来表示同步状态,在上面几个方法中,设置状态、获取状态、修改状态都需要原子操作。直接使用AQS提供的setState()、getState()、compareAndSetState()方法即可

实现Lock接口

实现lock接口的方法,在具体方法里面调用AQS提供的模板方法,这些模板方法内部会调用上面那5个重写的方法。从而实现自定义组件的获取锁、释放锁的操作

AQS提供的可使用的模板方法

void acquire(int arg)会调用tryAcquire方法boolean tryAcquireNanos(int arg, long nanos)加入了超时限制void acquireShared(int arg)会调用tryAcquireShared方法boolean tryAcquireSharedNanos(int arg, long nanos)加入了超时限制boolean release(int arg)会调用tryRelease方法boolean releaseShared(int arg)会调用tryReleaseShared方法Collection getQueuedThreads()获取在等待队列上的线程集合

实现代码

import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; /** * 同一时刻内只允许两个线程同时访问,超过两个将被阻塞。 * * 实现自定义的锁,要实现Lock接口 * * 使用者需要继承同步器并重写指定的 * 方法,随后将同步器Sync组合在自定义同步组件TwinsLock的实现中, * 并调用同步器提供的模板方法,而这些 * 模板方法将会调用使用者重写的方法。 * @author jacksparrow414 * @date 2020/10/19 */ public class TwinsLock implements Lock { /** * 所有自定义的同步器都要继承AQS. * * 根据自定义同步器要实现的功能,选择性的重写以下5个方法 * * tryAcquire-独占式,只有一个线程可以获得锁 * * tryAcquireShared-共享式,多个线程可以获得锁 * * 以上两个方法:返回值大于、等于0,代表锁获得成功;反之则代表获取锁失败 * * tryRelease-独占式,线程释放锁 * * tryReleaseShared-共享式,共享式释放同步状态 * * 以上两个方法:返回true 代表锁释放成功;反之则代表锁释放失败 * * isHeldExclusively 判断当前线程和同步器中的线程是否一致 */ private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { if (count <= 0) { throw new IllegalArgumentException("count must large then zero"); } // 修改、设置、获取同步状态直接使用AQS提供的方法即可 setState(count); } /** * 获取锁. * @param reduceCount * @return */ @Override protected int tryAcquireShared(int reduceCount) { for (;;) { int current = getState(); int newCount = current - reduceCount; if (newCount < 0 || compareAndSetState(current, newCount)) { return newCount; } } } /** * 释放锁. * @param returnCount * @return */ @Override protected boolean tryReleaseShared(int returnCount) { for (;;) { int current = getState(); int newCount = current + returnCount; // 使用AQS提供的方法CAS设置值,可以保证原子性 if (compareAndSetState(current, newCount)) { return true; } } } } private final Sync sync = new Sync(2); /** * 实现Lock的lock、unlock接口. * * 在这些接口里调用AQS的模板方法acquire、acquireShared、release、releaseShared * * tryAcquireNanos、tryAcquireSharedNanos 这两个是有超时时间的 * * 这里的方法会最终调用上面重写的方法 */ @Override public void lock() { sync.acquireShared(1); } @Override public void unlock() { sync.releaseShared(1); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { return false; } @Override public boolean tryLock(final long time, final TimeUnit unit) throws InterruptedException { return false; } @Override public Condition newCondition() { return null; } }

线程间的通知机制

场景

当两个线程之间需要通信,这里的通信可以理解为线程之间互相协调完成一项工作。如某个类的对象A,在某些情况会被多线程访问。当线程M修改了一个状态state,那么线程N就要进行打印操作。这种情况怎么实现?

利用线程等待/通知机制。每个Java对象都具有wait() ,wait(long timeout)notify(),notifyAll()方法-为什么?因为每个类都默认继承了Object类,这些方法都是在Object里面的

当线程N获得当前对象A的锁之后,发现此时flag不满足条件,那么应该释放这把锁,让给其他线程执行。如果不释放,则锁一直被N占有,有可能flag一直不满足条件,则其他线程都拿不到对象A的这把锁。一旦N释放对象锁之后,此时线程M获取到了锁,M开始修改flag,修改完释放。如果之后N再次抢到了这把对象锁,则发现flag已经满足条件了,则开始执行后面的逻辑.

实现方式可以是

线程N一直监控state的状态,当flag满足时,才去争夺这把锁。当flag不满足,可以简单的休眠一段时间这里线程N应该等待线程M的通知,得到通知后,再去争夺这把锁

实现方式一

import java.util.concurrent.TimeUnit; import lombok.SneakyThrows; import org.junit.Test; /** * 不是用等待同步/机制,仅仅靠死循环检测,条件不满足则休眠一段时间. * * @author jacksparrow414 * @date 2020/10/20 */ public final class ThreadWaitTest { static boolean flag = true; static Object lock = new Object(); @Test @SneakyThrows public void assertThreadWait() { Thread threadN = new Thread(new ThreadN(), "threadN"); Thread threadM = new Thread(new ThreadM(), "threadM"); threadN.start(); threadM.start(); // 为了测试用例能够打印完全,特意休眠10秒 TimeUnit.SECONDS.sleep(10); } static class ThreadN implements Runnable { @Override public void run() { while (flag) { try { System.out.println(Thread.currentThread().getName() + "条件不满足,开始休眠,循环获取"); Thread.sleep(1000); } catch (InterruptedException exception) { exception.printStackTrace(); } } synchronized (lock) { System.out.println(Thread.currentThread().getName() + "acquire lock"); } } } static class ThreadM implements Runnable { @Override public void run() { while (flag) { synchronized (lock) { try { System.out.println(Thread.currentThread().getName() + "acquire lcok"); // 第二种休眠方法 TimeUnit.SECONDS.sleep(5); } catch (InterruptedException exception) { exception.printStackTrace(); } flag = false; } } synchronized (lock) { System.out.println("acquire again"); } } } }

测试结果如下:

threadN条件不满足,开始休眠,循环获取 threadM acquire lcok threadN条件不满足,开始休眠,循环获取 threadN条件不满足,开始休眠,循环获取 threadN条件不满足,开始休眠,循环获取 threadN条件不满足,开始休眠,循环获取 threadM acquire again threadN acquire lock

可以看到,一看是线程N判断flag不满足条件,则不去竞争锁,而是休眠一段时间。休眠期过了再重复检测。而一开始线程M是满足条件的,所以线程M执行,并修改了flag状态。执行完毕释放对象锁。由于此时N还处于线程休眠期内,则锁又被M获取了一遍,M再次执行完毕,释放锁,此时已经没有线程竞争锁,线程N休眠期一过,尝试获取锁,获得成功,执行代码

存在的问题

多个线程去竞争锁,应该是同时去竞争的,由于我们上面代码N加了线程休眠,所以在M第一次释放锁之后,N还处于休眠期内,则不会去竞争锁。这就是个问题了,本来应该在释放锁之后,线程N立刻和其他线程去竞争锁的,由于代码问题,导致线程N没去竞争。这样做的严重后果是,当竞争锁的线程很多,而线程N每次都会休眠1秒,假设,每次释放锁都在N的休眠期内,那么N将要等待所有竞争这把锁的线程使用完毕,才有机会获得锁。这明显是不符合多线程编程的初衷的。初衷是什么?初衷就是我们只管提交线程任务,谁抢到谁就执行,但绝不是这种最后才执行由于线程休眠不消耗CPU资源,而上面的问题有时休眠期引起的,那么尝试将休眠期降低,比如降低到50毫秒、1毫秒。但是如果线程执行的任务是耗时的,那么这么频繁的休眠、检测,势必会消耗大量的CPU资源。事实是,我们永远无法确定一个线程精确的休眠期应该是多少。因为每个线程执行任务的时间是不确定的

实现方式二

为了解决实现方式一中的问题,使用线程的等待/同步机制即可。首先要获得对象锁,此时发现当前线程执行条件不满足,那么当前线程应该等待,并释放锁。接下啦,领完一个已经获得锁的线程,在即将执行完毕同步体时,使用notifyAll或者notify方法,通知等待在这把锁上的所有线程或者某一个线程。接收到通知的线程去竞争锁,谁获得了锁,谁结束wait状态

import java.util.concurrent.TimeUnit; import lombok.SneakyThrows; import org.junit.Test; /** * @author jacksparrow414 * @date 2020/10/20 */ public final class ThreadWaitNotifyTest { static boolean flag = true; static Object lock = new Object(); @Test @SneakyThrows public void assertThreadWaitNotify() { Thread threadN = new Thread(new threadN(), "threadN"); Thread threadM = new Thread(new threadM(), "threadM"); threadN.start(); TimeUnit.SECONDS.sleep(1); threadM.start(); TimeUnit.SECONDS.sleep(10); } static class threadN implements Runnable { @Override public void run() { synchronized (lock) { while (flag) { System.out.println(Thread.currentThread().getName() + " flag is true wait"); try { lock.wait(); } catch (InterruptedException exception) { exception.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + "falg is false running"); } } } static class threadM implements Runnable { @Override public void run() { synchronized (lock) { System.out.println(Thread.currentThread().getName() + " acquire lock"); flag = false; // 唤醒所有等待的线程 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException exception) { exception.printStackTrace(); } } lock.notifyAll(); synchronized (lock) { System.out.println(Thread.currentThread().getName() + " acquire again"); } } } }

注意事项

使用wait、notifyAll、notify等方法时,都要在同步体里执行,也就是保证在当前线程已经获得锁的前提下执行。如果此时此时线程没有获得对象的锁,就执行,则会报错

Exception in thread “threadM” java.lang.IllegalMonitorStateException at java.lang.Object.notifyAll(Native Method) at com.example.mybatis.demomybatis.thread.ThreadWaitNotifyTest$threadM.run(ThreadWaitNotifyTest.java:60) at java.lang.Thread.run(Thread.java:748)

当一个获得锁的线程执行notify、notifyAll之后,等待在这把锁上的线程不会从wait方法返回,返回的前提一定是获得了锁之后才会结束wait状态当线程N收到通知、竞争再次获取到锁之后,依旧要检查while条件,满足才会执行条件。其实很好理解,拿到锁,肯定要重新执行这个线程,而不是从该线程上一次地方执行 示例代码位置
最新回复(0)