Go语言的线程

[TOC]

线程与锁

现代CPU一般含有多个核,并且一个核可能支持多线程。换句话说,现代CPU可以同时执行多条指令流水线。 为了将CPU的能力发挥到极致,我们常常需要使我们的程序支持并发(concurrent)计算。并发计算是指若干计算可能在某些时间片段内同时运行的情形。 在并行计算中,多个计算在任何时间点都在同时运行。并行计算属于特殊的并发计算。

而并发编程会存在数据竞争(data race)的情况,在不同线程同时修改统一内存控制时。并发编程的一大任务就是要调度不同计算,控制它们对资源的访问时段,以使数据竞争的情况不会发生。 此任务常称为并发同步(或者数据同步)。

锁是解决数据竞争的一种方法,在go语言中提供了sync包。

1
2
3
4
5
sync.Mutex: 互斥锁
sync.RWMutex: 读写分离锁
sync.WaitGroup: 等待一组goroutine 返回
sync.Once: 保证某段代码只执行一次
sync.Cond: 让一组goroutine 在满足特定条件时被唤醒

sync.Mutex

Lock()加锁,Unlock()解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
var m sync.Mutex

func f1() {
m.Lock()
defer m.Unlock()
doSomething()
}

func f2() {
m.Lock()
doSomething()
m.Unlock()
}

sync.RWMutex

简单来说:不限制并发读,只限制并发写和并发读写

详细来说:

1
2
3
4
5
6
7
8
9
10
11
12
一个RWMutex值常称为一个读写互斥锁,它的内部包含两个锁:一个写锁和一个读锁。 对于一个可寻址的RWMutex值rwm,数据写入者可以通过方法调用rwm.Lock()对rwm加写锁,或者通过rwm.RLock()方法调用对rwm加读锁。 方法调用rwm.Unlock()和rwm.RUnlock()用来解开rwm的写锁和读锁。 rwm的读锁维护着一个计数。当rwm.RLock()调用成功时,此计数增1;当rwm.Unlock()调用成功时,此计数减1; 一个零计数表示rwm的读锁处于未加锁状态;反之,一个非零计数(肯定大于零)表示rwm的读锁处于加锁状态。

对于一个可寻址的RWMutex值rwm,下列规则存在:
rwm的写锁只有在它的写锁和读锁都处于未加锁状态时才能被成功加锁。 换句话说,rwm的写锁在任何时刻最多只能被一个数据写入者成功加锁,并且rwm的写锁和读锁不能同时处于加锁状态。
当rwm的写锁正处于加锁状态的时候,任何新的对之加写锁或者加读锁的操作试图都将导致当前协程进入阻塞状态,直到此写锁被解锁,这样的操作试图才有机会成功。
当rwm的读锁正处于加锁状态的时候,新的加写锁的操作试图将导致当前协程进入阻塞状态。 但是,一个新的加读锁的操作试图将成功,只要此操作试图发生在任何被阻塞的加写锁的操作试图之前(见下一条规则)。 换句话说,一个读写互斥锁的读锁可以同时被多个数据读取者同时加锁而持有。 当rwm的读锁维护的计数清零时,读锁将返回未加锁状态。
假设rwm的读锁正处于加锁状态的时候,为了防止后续数据写入者没有机会成功加写锁,后续发生在某个被阻塞的加写锁操作试图之后的所有加读锁的试图都将被阻塞。
假设rwm的写锁正处于加锁状态的时候,(至少对于标准编译器来说,)为了防止后续数据读取者没有机会成功加读锁,发生在此写锁下一次被解锁之前的所有加读锁的试图都将在此写锁下一次被解锁之后肯定取得成功,即使所有这些加读锁的试图发生在一些仍被阻塞的加写锁的试图之后。
后两条规则是为了确保数据读取者和写入者都有机会执行它们的操作。

请注意:一个锁并不会绑定到一个协程上,即一个锁并不记录哪个协程成功地加锁了它。

example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package main

import (
"fmt"
"runtime"
"sync"
)

type Counter struct {
m sync.RWMutex
n uint64
}

func (c *Counter) Value() uint64 {
c.m.RLock()
defer c.m.RUnlock()
return c.n
}

func (c *Counter) Increase(delta uint64) {
c.m.Lock()
c.n += delta
c.m.Unlock()
}

func main() {
var c Counter
for i := 0; i < 100; i++ {
go func() {
for k := 0; k < 100; k++ {
c.Increase(1)
}
}()
}


for c.Value() < 10000 {
runtime.Gosched()
}
fmt.Println(c.Value()) // 10000
}

sync.WatiGroup

每个sync.WaitGroup值在内部维护着一个计数,此计数的初始默认值为零。

*sync.WaitGroup类型有三个方法:Add(delta int)、Done()和Wait()。

对于一个可寻址的sync.WaitGroup值wg:

  • 我们可以使用方法调用wg.Add(delta)来改变值wg维护的计数。
  • 方法调用wg.Done()和wg.Add(-1)是完全等价的。
  • 如果一个wg.Add(delta)或者wg.Done()调用将wg维护的计数更改成一个负数,一个恐慌将产生。
  • 当一个协程调用了wg.Wait()时,
    • 如果此时wg维护的计数为零,则此wg.Wait()此操作为一个空操作(no-op);
    • 否则(计数为一个正整数),此协程将进入阻塞状态。 当以后其它某个协程将此计数更改至0时(一般通过调用wg.Done()),此协程将重新进入运行状态(即wg.Wait()将返回)。

一般,一个sync.WaitGroup值用来让某个协程等待其它若干协程都先完成它们各自的任务。 一个例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

func main() {
rand.Seed(time.Now().UnixNano())

const N = 5
var values [N]int32

var wg sync.WaitGroup
wg.Add(N)
for i := 0; i < N; i++ {
i := i
go func() {
values[i] = 50 + rand.Int31n(50)
fmt.Println("Done:", i)
wg.Done() // <=> wg.Add(-1)
}()
}

wg.Wait()
// 所有的元素都保证被初始化了。
fmt.Println("values:", values)
}

sync.Once

对一个可寻址的sync.Once值o,o.Do()方法调用可以在多个协程中被多次并发地执行,但有且只有一个调用的实参函数(值)将得到调用。 此被调用的实参函数保证在任何o.Do()方法调用返回之前退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package main

import (
"log"
"sync"
)

func main() {
log.SetFlags(0)

x := 0
doSomething := func() {
x++
log.Println("Hello")
}

var wg sync.WaitGroup
var once sync.Once
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
once.Do(doSomething)
log.Println("world!")
}()
}

wg.Wait()
log.Println("x =", x) // x = 1
}

在此例中,Hello将仅被输出一次,而world!将被输出5次,并且Hello肯定在所有的5个world!之前输出。

sync.Cond

sync.Cond类型提供了一种有效的方式来实现多个协程间的通知。
每个sync.Cond值拥有一个sync.Locker类型的名为L的字段。 此字段的具体值常常为一个sync.Mutex值或者sync.RWMutex值。
*sync.Cond类型有三个方法:Wait()、Signal()和Broadcast()。

每个Cond值维护着一个先进先出等待协程队列。 对于一个可寻址的Cond值c,

  • c.Wait()必须在c.L字段值的锁处于加锁状态的时候调用;否则,c.Wait()调用将造成一个恐慌。 一个c.Wait()调用将
    • 首先将当前协程推入到c所维护的等待协程队列;
    • 然后调用c.L.Unlock()对c.L的锁解锁;
    • 然后使当前协程进入阻塞状态;(当前协程将被另一个协程通过c.Signal()或c.Broadcast()调用唤醒而重新进入运行状态。)一旦当前协程重新进入运行状态,c.L.Lock()将被调用以试图重新对c.L字段值的锁加锁。 此c.Wait()调用将在此试图成功之后退出。
  • 一个c.Signal()调用将唤醒并移除c所维护的等待协程队列中的第一个协程(如果此队列不为空的话)。
  • 一个c.Broadcast()调用将唤醒并移除c所维护的等待协程队列中的所有协程(如果此队列不为空的话)。

c.Signal()c.Broadcast()调用常用来通知某个条件的状态发生了变化。 一般说来,c.Wait()应该在一个检查某个条件是否已经得到满足的循环中调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"fmt"
"math/rand"
"sync"
"time"
)

func main() {
rand.Seed(time.Now().UnixNano())

const N = 10
var values [N]string

cond := sync.NewCond(&sync.Mutex{})

for i := 0; i < N; i++ {
d := time.Second * time.Duration(rand.Intn(10)) / 10
go func(i int) {
time.Sleep(d) // 模拟一个工作负载
cond.L.Lock()
// 下面的修改必须在cond.L被锁定的时候执行
values[i] = string('a' + i)
cond.Broadcast() // 可以在cond.L被解锁后发出通知
cond.L.Unlock()
// 上面的通知也可以在cond.L未锁定的时候发出。
//cond.Broadcast() // 上面的调用也可以放在这里
}(i)
}

// 此函数必须在cond.L被锁定的时候调用。
checkCondition := func() bool {
fmt.Println(values)
for i := 0; i < N; i++ {
if values[i] == "" {
return false
}
}
return true
}

cond.L.Lock()
defer cond.L.Unlock()
for !checkCondition() {
cond.Wait() // 必须在cond.L被锁定的时候调用
}
}

参考: