Go 并发编程Mutex(二)--源码

it2024-11-14  5

们从 Mutex 的一个简单实现开始,看看它是怎样逐步提升性能和公平性的。在这个过程中,我们可以学习如何逐步设计一个完善的同步原语,并能对复杂度、性能、结构设计的权衡考量有新的认识。经过这样一个学习,我们不仅能通透掌握 Mutex,更好地使用这个工具,同时,对我们自己设计并发数据接口也非常有帮助。那具体怎么来讲呢?我把 Mutex 的架构演进分成了四个阶段,下面给你画了一张图来说明。

“初版”的 Mutex 使用一个 flag 来表示锁是否被持有,实现比较简单;后来照顾到新来的 goroutine,所以会让新的 goroutine 也尽可能地先获取到锁,这是第二个阶段,我把它叫作“给新人机会”;那么,接下来就是第三阶段“多给些机会”,照顾新来的和被唤醒的 goroutine;但是这样会带来饥饿问题,所以目前又加入了饥饿的解决方案,也就是第四阶段“解决饥饿”。

初版的互斥锁

你可能会想到,可以通过一个 flag 变量,标记当前的锁是否被某个 goroutine 持有。如果这个 flag 的值是 1,就代表锁已经被持有,那么,其它竞争的 goroutine 只能等待;如果这个 flag 的值是 0,就可以通过 CAS(compare-and-swap,或者 compare-and-set)将这个 flag 设置为 1,标识锁被当前的这个 goroutine 持有了。实际上,Russ Cox 在 2008 年提交的第一版 Mutex 就是这样实现的。

// CAS操作,当时还没有抽象出atomic包 func cas(val *int32, old, new int32) bool func semacquire(*int32) func semrelease(*int32) // 互斥锁的结构,包含两个字段 type Mutex struct { key int32 // 锁是否被持有的标识 sema int32 // 信号量专用,用以阻塞/唤醒goroutine } // 保证成功在val上增加delta的值 func xadd(val *int32, delta int32) (new int32) { for { v := *val if cas(val, v, v+delta) { return v + delta } } panic("unreached") } // 请求锁 func (m *Mutex) Lock() { if xadd(&m.key, 1) == 1 { //标识加1,如果等于1,成功获取到锁 return } semacquire(&m.sema) // 否则阻塞等待 } func (m *Mutex) Unlock() { if xadd(&m.key, -1) == 0 { // 将标识减去1,如果等于0,则没有其它等待者 return } semrelease(&m.sema) // 唤醒其它阻塞的goroutine }

这里呢,我先简单补充介绍下刚刚提到的 CAS。CAS 指令将给定的值和一个内存地址中的值进行比较,如果它们是同一个值,就使用新值替换内存地址中的值,这个操作是原子性的。那啥是原子性呢?如果你还不太理解这个概念,那么在这里只需要明确一点就行了,那就是原子性保证这个指令总是基于最新的值进行计算,如果同时有其它线程已经修改了这个值,那么,CAS 会返回失败。CAS 是实现互斥锁和同步原语的基础,我们很有必要掌握它。好了,我们继续来分析下刚才的这段代码。

虽然当时的 Go 语法和现在的稍微有些不同,而且标准库的布局、实现和现在的也有很大的差异,但是,这些差异不会影响我们对代码的理解,因为最核心的结构体(struct)和函数、方法的定义几乎是一样的。 Mutex 结构体包含两个字段:

字段 key:是一个 flag,用来标识这个排外锁是否被某个 goroutine 所持有,如果 key 大于等于1,说明这个排外锁已经被持有;字段 sema:是个信号量变量,用来控制等待 goroutine 的阻塞休眠和唤醒。 调用 Lock 请求锁的时候,通过 xadd 方法进行 CAS 操作(第 24 行),xadd 方法通过循环执行 CAS 操作直到成功,保证对 key 加 1 的操作成功完成。如果比较幸运,锁没有被别的 goroutine 持有,那么,Lock 方法成功地将 key 设置为 1,这个 goroutine 就持有了这个锁;如果锁已经被别的 goroutine 持有了,那么,当前的 goroutine 会把 key 加 1,而且还会调用 semacquire 方法(第 27 行),使用信号量将自己休眠,等锁释放的时候,信号量会将它唤醒。

有锁的 goroutine 调用 Unlock 释放锁时,它会将 key 减 1(第 31 行)。如果当前没有其它等待这个锁的 goroutine,这个方法就返回了。但是,如果还有等待此锁的其它 goroutine,那么,它会调用 semrelease 方法(第 34 行),利用信号量唤醒等待锁的其它 goroutine 中的一个。

所以,到这里,我们就知道了,初版的 Mutex 利用 CAS 原子操作,对 key 这个标志量进行设置。key 不仅仅标识了锁是否被 goroutine 所持有,还记录了当前持有和等待获取锁的 goroutine 的数量。Mutex 的整体设计非常简洁,学习起来一点也没有障碍。但是,注意,我要划重点了。

Unlock 方法可以被任意的 goroutine 调用释放锁,即使是没持有这个互斥锁的 goroutine,也可以进行这个操作。这是因为,Mutex 本身并没有包含持有这把锁的 goroutine 的信息,所以,Unlock 也不会对此进行检查。Mutex 的这个设计一直保持至今。

这就带来了一个有趣而危险的功能。为什么这么说呢?你看,其它 goroutine 可以强制释放锁,这是一个非常危险的操作,因为在临界区的 goroutine 可能不知道锁已经被释放了,还会继续执行临界区的业务操作,这可能会带来意想不到的结果,因为这个 goroutine 还以为自己持有锁呢,有可能导致 data race 问题。所以,我们在使用 Mutex 的时候,必须要保证 goroutine 尽可能不去释放自己未持有的锁,一定要遵循“谁申请,谁释放”的原则。在真实的实践中,我们使用互斥锁的时候,很少在一个方法中单独申请锁,而在另外一个方法中单独释放锁,一般都会在同一个方法中获取锁和释放锁。如果你接触过其它语言(比如 Java 语言)的互斥锁的实现,就会发现这一点和其它语言的互斥锁不同,所以,如果是从其它语言转到 Go 语言开发的同学,一定要注意。以前,我们经常会基于性能的考虑,及时释放掉锁,所以在一些 if-else 分支中加上释放锁的代码,代码看起来很臃肿。而且,在重构的时候,也很容易因为误删或者是漏掉而出现死锁的现象。

type Foo struct { mu sync.Mutex count int } func (f *Foo) Bar() { f.mu.Lock() if f.count < 1000 { f.count += 3 f.mu.Unlock() // 此处释放锁 return } f.count++ f.mu.Unlock() // 此处释放锁 return }

从 1.14 版本起,Go 对 defer 做了优化,采用更有效的内联方式,取代之前的生成 defer 对象到 defer chain 中,defer 对耗时的影响微乎其微了,所以基本上修改成下面简洁的写法也没问题:

func (f *Foo) Bar() { f.mu.Lock() defer f.mu.Unlock() if f.count < 1000 { f.count += 3 return } f.count++ return }

从 1.14 版本起,Go 对 defer 做了优化,采用更有效的内联方式,取代之前的生成 defer 对象到 defer chain 中,defer 对耗时的影响微乎其微了,所以基本上修改成下面简洁的写法也没问题:

func (f *Foo) Bar() { f.mu.Lock() defer f.mu.Unlock() if f.count < 1000 { f.count += 3 return } f.count++ return }

这样做的好处就是 Lock/Unlock 总是成对紧凑出现,不会遗漏或者多调用,代码更少。但是,如果临界区只是方法中的一部分,为了尽快释放锁,还是应该第一时间调用 Unlock,而不是一直等到方法返回时才释放。

初版的 Mutex 实现之后,Go 开发组又对 Mutex 做了一些微调,比如把字段类型变成了 uint32 类型;调用 Unlock 方法会做检查;使用 atomic 包的同步原语执行原子操作等等,这些小的改动,都不是核心功能,你简单知道就行了,我就不详细介绍了。

但是,初版的 Mutex 实现有一个问题:请求锁的 goroutine 会排队等待获取互斥锁。虽然这貌似很公平,但是从性能上来看,却不是最优的。因为如果我们能够把锁交给正在占用 CPU 时间片的 goroutine 的话,那就不需要做上下文的切换,在高并发的情况下,可能会有更好的性能。

接下来,我们就继续探索 Go 开发者是怎么解决这个问题的。

给新人机会

Go 开发者在 2011 年 6 月 30 日的 commit 中对 Mutex 做了一次大的调整,调整后的 Mutex 实现如下:

type Mutex struct { state int32 sema uint32 } const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexWaiterShift = iota )

虽然 Mutex 结构体还是包含两个字段,但是第一个字段已经改成了 state,它的含义也不一样了。 state 是一个复合型的字段,一个字段包含多个意义,这样可以通过尽可能少的内存来实现互斥锁。这个字段的第一位(最小的一位)来表示这个锁是否被持有,第二位代表是否有唤醒的 goroutine,剩余的位数代表的是等待此锁的 goroutine 数。所以,state 这一个字段被分成了三部分,代表三个数据。

请求锁的方法 Lock 也变得复杂了。复杂之处不仅仅在于对字段 state 的操作难以理解,而且代码逻辑也变得相当复杂。

func (m *Mutex) Lock() { // Fast path: 幸运case,能够直接获取到锁 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } awoke := false for { old := m.state new := old | mutexLocked // 新状态加锁 if old&mutexLocked != 0 { new = old + 1<<mutexWaiterShift //等待者数量加一 } if awoke { // goroutine是被唤醒的, // 新状态清除唤醒标志 new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) {//设置新状态 if old&mutexLocked == 0 { // 锁原状态未加锁 break } runtime.Semacquire(&m.sema) // 请求信号量 awoke = true } } }

首先是通过 CAS 检测 state 字段中的标志(第 3 行),如果没有 goroutine 持有锁,也没有等待持有锁的 gorutine,那么,当前的 goroutine 就很幸运,可以直接获得锁,这也是注释中的 Fast path 的意思。

如果不够幸运,state 不是零值,那么就通过一个循环进行检查。接下来的第 7 行到第 26 行这段代码虽然只有几行,但是理解起来却要费一番功夫,因为涉及到对 state 不同标志位的操作。这里的位操作以及操作后的结果和数值比较,并没有明确的解释,有时候你需要根据后续的处理进行推断。所以说,如果你充分理解了这段代码,那么对最新版的 Mutex 也会比较容易掌握了,因为你已经清楚了这些位操作的含义。

我们先前知道,如果想要获取锁的 goroutine 没有机会获取到锁,就会进行休眠,但是在锁释放唤醒之后,它并不能像先前一样直接获取到锁,还是要和正在请求锁的 goroutine 进行竞争。这会给后来请求锁的 goroutine 一个机会,也让 CPU 中正在执行的 goroutine 有更多的机会获取到锁,在一定程度上提高了程序的性能。

for 循环是不断尝试获取锁,如果获取不到,就通过 runtime.Semacquire(&m.sema) 休眠,休眠醒来之后 awoke 置为 true,尝试争抢锁。代码中的第 10 行将当前的 flag 设置为加锁状态,如果能成功地通过 CAS 把这个新值赋予 state(第 19 行和第 20 行),就代表抢夺锁的操作成功了。

不过,需要注意的是,如果成功地设置了 state 的值,但是之前的 state 是有锁的状态,那么,state 只是清除 mutexWoken 标志或者增加一个 waiter 而已。

请求锁的 goroutine 有两类,一类是新来请求锁的 goroutine,另一类是被唤醒的等待请求锁的 goroutine。锁的状态也有两种:加锁和未加锁。我用一张表格,来说明一下 goroutine 不同来源不同状态下的处理逻辑。

刚刚说的都是获取锁,接下来,我们再来看看释放锁。释放锁的 Unlock 方法也有些复杂,我们来看一下。

func (m *Mutex) Unlock() { // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) //去掉锁标志 if (new+mutexLocked)&mutexLocked == 0 { //本来就没有加锁 panic("sync: unlock of unlocked mutex") } old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { // 没有等待者,或者有唤醒的waiter,或者锁原来已加锁 return } new = (old - 1<<mutexWaiterShift) | mutexWoken // 新状态,准备唤醒goroutine,并设置唤醒标志 if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime.Semrelease(&m.sema) return } old = m.state } }
最新回复(0)