目录
1、为什么会发生死锁
2、怎样避免死锁
1)、破坏占有且等待
2)、破坏不可抢占
3)、破坏循环等待
3、线上项目应用场景
应该说死锁也算是面试的高频了,那么面试官需要知道什么,希望你回答什么。我觉得从下面几个点切入,怎么会发生死锁,怎样避免,在项目中的实际应用场景。欢迎指正、讨论。
当一组相互竞争资源的线程因相互等待,导致永久阻塞的现象,或者其他的解释非常多。
在理解完MESA管程模型之后,知道比如synchronized关键字底层的模型。需要先获取锁资源,加锁,处理任务(中间状态对外不可见),最后再解锁。那么对性能损耗非常的高,那么使用细粒度锁,也是提高并发程序的一种重要手段。那么多次使用synchronized对必须的地方加互斥锁,则会很容易照成死锁。
比如并发情况下需要进行转账的情况,当A给B转账:则需要先锁住A和B的账号,再给A账号减去金额,B账号加上相应金额。那么在获取到A账户的锁后, 比如其他线程(或进程)在给A和B两个账户转账时可能先获取到了B账户的锁,继续获取A账户的锁。那么相互拿到了别人需要的锁,而自己却拿不到自己想要的锁,则程序无法执行下去,就照成了死锁。死锁不仅发送在并发程序中,数据库等都有可能发生死锁。
代码如下:
class Account { private int balance; // 转账 void transfer(Account target, int amt){ // 锁定转出账户 synchronized(this) { // 锁定转入账户 synchronized(target) { if (this.balance > amt) { this.balance -= amt; target.balance += amt; } } } } }
好在有牛人Coffman总结了发送死锁的需要同时满足四个必要的条件:
1)、互斥,共享资源X和Y只能被一个线程占用
2)、占有且等待,线程T1已经取得共享资源锁X,在等待共享资源Y的时候,不释放共享资源X
3)、不可抢占,其他线程不能强行抢占线程T1抢占的资源
4)、循环等待,线程T1等待线程T2占有的资源,线程T2等待线程T1占有的资源,就是循环等待。
那么只需要破掉上面的四个条件之一,就没有死锁了。第一个条件基本上是不可破的,那么就只能破掉下面三个条件之一。
那就是一次性拿到所有的锁,再往下走,而不是像上面一下获取到就往里进,那么synchronized关键字则需要三方协调。如
class Allocator { private List<Object> als = new ArrayList<>(); // 一次性申请所有资源 synchronized boolean apply( Object from, Object to){ if(als.contains(from) || als.contains(to)){ return false; } else { als.add(from); als.add(to); } return true; } // 归还资源 synchronized void free( Object from, Object to){ als.remove(from); als.remove(to); } }那就是先获取锁,如果自己发现进一步申请不到其他的资源(锁),那么就释放自己已经获得的锁,那么也就不会继续死锁下去了。这个条件对应c封装的synchronized关键字同样是不行的,但是基于juc Lock写的程序还可以调用lockInterruptibly方法处理。
个人比较推崇使用该方式破掉或防止死锁,也在项目上有使用。破掉该方式有两个思路,一个是获取锁有超时机制,当然对于synchronized也是不可行的;第二种思路就是对锁进行排序,然后按照一定的顺序获取锁。
比如我们使用用户的id作为锁,那么在并发的情况下A需要获取 1,2,3的锁,B需要获取2,3,4的锁。那么重合的地方就是2,3 ,不管哪个线程都需要按照从小到大(或者从大到小)顺序获取,那么哪个线程先获取到了2才有机会获取3. 瞬间就不乱了。
1)、多层synchronized场景 2)、批量获取分布式锁
前段时间项目上使用redisson的分布式锁,但是需要支持批量获取锁的场景。在获取分布式锁时就很有可能发生死锁,我就封装了获取锁和执行的过程,代码大致如下:
/** * 在排序的分布式锁中,执行操作 * @param method 需要回调执行的方法体 * @param cacheConfig 可以为null, 则自己带前置传入 * @param lockKey 缓存key列表 * @param <V> 回调返回值 * @return 回调返回信息 */ public static <V> V invokeInLock(CallBack<V> method, @Nullable CacheConfig cacheConfig, String... lockKey) { Parameter parameter = PREFIX_PARAMETER.get(); if (lockKey == null || (cacheConfig == null && parameter == null)) { throw new RuntimeException("请求参数不能为空!"); } boolean hasCacheConfig = cacheConfig == null; final String prefix = hasCacheConfig ? parameter.getPrefix() : cacheConfig.cachePrefix; long waitTime = hasCacheConfig ? parameter.getWaitTime() : cacheConfig.waitTime; long leaseTime = hasCacheConfig ? parameter.getLeaseTime() : cacheConfig.leaseTime; // 获取批量的分布式锁 RLock[] rLocks = Arrays.stream(lockKey) // 进行排序,防止获取分布式锁时死锁 .sorted() // 组装获取锁 .map(key -> getRedissonClient().getLock(prefix + key)) .toArray(RLock[]::new); RLock lock = redissonClient.getMultiLock(rLocks); // 加锁 boolean isLock = false; try { isLock = waitTime == 0 ? lock.tryLock() : lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { log.error("获取分布式锁失败!"); } V result = null; if (isLock) { try { // 在加锁的情况下创建vso【查预售量,创建vso,修改剩余预售量】 result = method.callBack(); } finally { // 最终进行解锁 try { lock.unlock(); } catch (Exception e) { log.error("分布式锁超时【续命锁也超时】,已经自动解锁!"); } } } return result; }