08.Golang 通道(channel )源码分析(二、数据发送与接收)

Golang 通道(channel )源码分析(二、数据发送与接收)

注意当前go版本代码为1.23

Channel 在运行时的内部表示是 runtime.hchan

介绍

当向 Channel 发送数据时,就需要使用 ch <- i 语句,编译器会将它解析成 OSEND 节点并在 cmd/compile/internal/gc.walkexpr 中转换成

而接收 Channel数据时,可以使用两种不同的方式去接收 Channel 中的数据:

1
2
i <- ch
i, ok <- ch

最终会通过编译器转换成 runtime.chanrecv1runtime.chanrecv2 两种不同函数的调用,这两个函数最终会调用 runtime.chanrecv进行处理。

数据发送

runtime.chansend

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
// chansend 函数实现向 channel 发送数据的核心逻辑
// c: channel指针
// ep: 要发送的数据的指针
// block: 是否阻塞
// callerpc: 调用者的程序计数器
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 。。。。。忽略前置检查逻辑

// 情况1: 如果存在等待接收的 goroutine
if sg := c.recvq.dequeue(); sg != nil {
// 找到一个等待的接收者。我们将要发送的值直接传递给接收者,绕过通道缓冲区(如果有)。
// send 函数负责调用sendDirect方法将数据从发送者数据 ep 传递给接收者 sg,成功后唤醒接收者。
// 第三个参数是一个函数,它将在数据传递完成后被调用,这里是解锁。
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}

// 情况2: 如果通道缓冲区有空间
if c.qcount < c.dataqsiz {
// 空间在通道缓冲区中可用。将要发送的元素入队。
// 获取下一个可用的缓冲区位置
qp := chanbuf(c, c.sendx)
// 如果开启了 race 竞争检测
if raceenabled {
// 通知竞争检测器,将要向通道缓冲区写入数据
racenotify(c, c.sendx, nil)
}
// 将数据从 ep 复制到缓冲区
typedmemmove(c.elemtype, qp, ep)
// 发送索引加 1
c.sendx++
// 如果发送索引到达缓冲区末尾,则循环回到开头
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}

// 对于非阻塞发送,如果没有接收者且缓冲区已满,返回失败
if !block {
unlock(&c.lock)
return false
}

// 情况3:代码执行到这里,说明是阻塞模式 (block == true) 且不存在缓冲区或者缓冲区已满,需要阻塞当前 goroutine 等待接收者

// 获取当前 goroutine 的指针
gp := getg()
// 从 sudog 池中分配一个 sudog 结构体,用于表示当前 goroutine 在通道上的等待信息
mysg := acquireSudog()
// 初始化 sudog 的 releasetime 字段为 0,表示尚未记录阻塞时间
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}

// 在将要发送的数据指针分配给 mysg.elem 和将 mysg 加入到 gp.waiting 链表之间,不允许发生栈分裂。
// 这是为了防止 copystack 在进行栈复制时,无法正确找到 mysg.elem 指向的数据。
// 将要发送的数据的指针保存到 sudog 中
mysg.elem = ep
// 将 sudog 的 waitlink 指针置为 nil,表示当前 sudog 还没有加入到等待队列中
mysg.waitlink = nil
mysg.g = gp // 关联当前goroutine
// 将 sudog 的 isSelect 字段设置为 false,表示当前操作不是 select 语句的一部分
mysg.isSelect = false
mysg.c = c // 关联当前channel
gp.waiting = mysg // 设置goroutine的等待对象
// 将当前 goroutine 的 param 字段置为 nil,用于在唤醒时传递额外信息(这里不需要)
gp.param = nil
// 将 mysg 加入到通道的发送队列 c.sendq 中
c.sendq.enqueue(mysg)

// parkingOnChan 字段设置为 true,通知其他goroutine,当前goroutine 即将因channel操作而阻塞
// 这个时间窗口内不能进行栈收缩,因为G的状态正在改变
gp.parkingOnChan.Store(true)

// 调用 gopark 函数阻塞当前 goroutine
// chanparkcommit 函数用于在真正阻塞前做一些提交操作,例如解锁等
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)

// 确保待发送的值在接收者复制完成前不会被垃圾回收
// sudog虽然有指向栈对象的指针,但不会被栈追踪器视为根对象
KeepAlive(ep)

// 被唤醒后的处理逻辑

// 检查当前 goroutine 的 waiting 字段是否仍然指向 mysg,如果不一致,说明等待列表被破坏了
if mysg != gp.waiting {
throw("G waiting list is corrupted") // 等待列表损坏
}
// 将当前 goroutine 的 waiting 字段置为 nil,表示当前 goroutine 不再等待任何 sudog
gp.waiting = nil
// 将当前 goroutine 的 activeStackChans 字段设置为 false, 表示当前 goroutine 不在 channel 操作相关的栈上
gp.activeStackChans = false
// 检查 mysg 的 success 字段,如果为 false,则表示发送失败(例如通道被关闭)
closed := !mysg.success
gp.param = nil // 清空参数

// 记录阻塞事件用于性能分析
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}

// 清理sudog
mysg.c = nil
releaseSudog(mysg)

// 处理channel关闭的情况
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup") // 异常唤醒
}
panic(plainError("send on closed channel")) // channel已关闭则panic
}
return true // 发送成功
}

从上述代码可以知道,go通道发送主要流程总结如下:

发送流程:

  1. 空 Channel 检查 && 竞态检测 && (非阻塞)快速检测 && 加锁 && Channel 关闭检查:
  2. 三种发送情况:
    • 情况 1: 存在等待的接收者 (c.recvq.dequeue() != nil)
      • 从接收队列 c.recvq 中取出一个等待的接收者 sg
      • 调用 send(c, sg, ep, func() { unlock(&c.lock) }, 3) 直接将数据从发送者 ep 传递给接收者 sg,绕过缓冲区。
      • 发送成功后唤醒接收者 sg
      • 发送成功,返回 true
    • 情况 2: 缓冲区有空间 (c.qcount < c.dataqsiz)
      • 计算下一个可用缓冲区位置 qp
      • 如果启用了竞态检测,调用 racenotify 通知将要向缓冲区写入数据。
      • 使用 typedmemmove 将数据从 ep 复制到缓冲区 qp
      • 更新发送索引 c.sendx 和缓冲区元素计数 c.qcount
      • 释放锁 c.lock
      • 发送成功,返回 true
    • 情况 3: 不存在缓冲区或者缓冲区已满且为阻塞模式 (block == true)
      • 获取当前 goroutine 的指针 gp
      • sudog 池中分配一个 sudog 结构体 mysg
      • 初始化 mysg 的各个字段,包括要发送的数据指针 ep、当前 goroutine gp、当前 channel c 等。
      • mysg 加入到 channel 的发送队列 c.sendq 中。
      • 设置 gp.parkingOnChantrue,表示 goroutine 即将因 channel 操作阻塞。
      • 调用 gopark 阻塞当前 goroutine,等待被唤醒。
      • gopark 会释放锁 c.lock
      • 被唤醒后,检查 gp.waiting 是否仍然指向 mysg,确保等待列表的完整性。
      • 检查 mysg.success 字段,判断发送是否成功。
      • 清理 mysg 并释放。
      • 如果 closedtrue (发送失败),且 channel 未关闭,则抛出 “spurious wakeup” 异常
      • 返回 (true, success)

数据接收

runtime.chanrecv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// chanrecv 从通道 c 接收数据并将其写入 ep。
// ep 可以为 nil,在这种情况下,接收到的数据将被忽略。
// 如果 block == false 并且没有可用的元素,则返回 (false, false)。
// 否则,如果 c 已关闭,则将 *ep 清零并返回 (true, false)。
// 否则,用一个元素填充 *ep 并返回 (true, true)。
// 非 nil 的 ep 必须指向堆或调用者的栈。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 。。。。。忽略前置检查逻辑

// 加锁,保护通道状态
lock(&c.lock)

// 如果通道已关闭
if c.closed != 0 {
// 并且缓冲区中没有数据
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
// 解锁
unlock(&c.lock)
// 如果 ep 非空,则将 ep 指向的数据清零
if ep != nil {
typedmemclr(c.elemtype, ep)
}
// 返回 true, false,表示通道已关闭且没有接收到数据
return true, false
}
// 通道已关闭,但通道的缓冲区中有数据。
} else {
// 情况1: 存在等待的发送者且通道未关闭。
// 从发送队列中取出一个 sudog
if sg := c.sendq.dequeue(); sg != nil {
/*
调用recv函数接收数据
该函数会根据缓冲区的大小分别处理不同的情况:
1.如果 Channel 不存在缓冲区;
调用 runtime.recvDirect 将 Channel 发送队列中 Goroutine 存储的 elem 数据拷贝到目标内存地址中;
2.如果 Channel 存在缓冲区;
将队列中的数据拷贝到接收方的内存地址;
将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方;
*/
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// 情况2: 如果缓冲区中有数据
if c.qcount > 0 {
// 直接从缓冲区接收数据
qp := chanbuf(c, c.recvx) // 获取接收索引位置的指针
if raceenabled {
// 通知 race detector 发生了数据访问
racenotify(c, c.recvx, nil)
}
// 如果 ep 非空,则将数据从缓冲区复制到 ep 指向的内存
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 清空缓冲区中该位置的数据
typedmemclr(c.elemtype, qp)
// 接收索引加一
c.recvx++
// 如果接收索引到达缓冲区末尾,则将其重置为 0
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 缓冲区中数据数量减一
c.qcount--
unlock(&c.lock)
// 返回 true, true,表示成功接收到数据
return true, true
}

// 如果是非阻塞模式
if !block {
unlock(&c.lock)
return false, false // 返回 false, false,表示非阻塞模式下没有接收到数据
}

// 情况3: 不存在缓冲区或者缓冲区为空且为阻塞模式

// 获取当前 goroutine
gp := getg()
// 获取一个 sudog 结构体
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 在将 elem 赋值和将 mysg 入队到 gp.waiting 之间没有栈分割,
// 这样 copystack 就可以找到它。
mysg.elem = ep // 存储接收数据的地址
mysg.waitlink = nil // sudog 链表指针
gp.waiting = mysg // 将 sudog 设置为当前 goroutine 的 waiting sudog

mysg.g = gp // sudog 关联的 goroutine
mysg.isSelect = false // 表示不是 select 语句
mysg.c = c // sudog 关联的通道
gp.param = nil // 在被唤醒时,发送方会将数据地址存储在这里,或者将 nil 存储在这里表示通道已关闭
// 将 sudog 入队到接收队列
c.recvq.enqueue(mysg)
// 如果设置了通道定时器,则阻塞定时器
if c.timer != nil {
blockTimerChan(c)
}

// 向任何试图收缩该协程栈的人发出信号,表明即将停放在一个通道上。
// 在此 G 的状态更改和设置 gp.activeStackChans 之间的窗口对于栈收缩是不安全的。
gp.parkingOnChan.Store(true)
// 阻塞当前 goroutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

// 被唤醒后的处理逻辑

// 检查当前 goroutine 的 waiting 字段是否仍然指向 mysg,如果不一致,说明等待列表被破坏了
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 如果设置了通道定时器,则解除阻塞
if c.timer != nil {
unblockTimerChan(c)
}
gp.waiting = nil // 清空 waiting sudog
gp.activeStackChans = false // 允许栈收缩
// 如果记录了阻塞时间,则记录阻塞事件
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 获取操作是否成功的标志
success := mysg.success
gp.param = nil // 清空 param
mysg.c = nil // 清空 sudog 关联的通道
// 释放 sudog
releaseSudog(mysg)
// 返回 true 和 success 标志
return true, success
}

主要逻辑和情况处理:

  1. 空 Channel 检查 && 竞态检测 && (非阻塞)快速检测 && 加锁 && Channel 关闭检查:
  2. 通道未关闭:
    • 情况 1: 存在等待的发送者 (c.sendq 不为空):
      • 调用recv函数接收数据,如果 Channel 不存在缓冲区,调用 runtime.recvDirect 将 Channel 发送队列中 Goroutine 存储的 elem 数据拷贝到目标内存地址中;如果 Channel 存在缓冲区;将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方;
    • 情况 2: 缓冲区中有数据 (c.qcount > 0):
      • 直接从缓冲区 c.recvx 索引处读取数据。
      • 如果 ep 不为 nil,则将数据拷贝到 ep 指向的内存。
      • 更新 c.recvxc.qcount
      • 返回 (true, true)
    • 情况 3: 不存在缓冲区或者缓冲区为空且为阻塞模式:
      • 获取当前 goroutine 的指针 gp
      • sudog 池中分配一个 sudog 结构体 mysg
      • 初始化 mysg 的各个字段,包括要发送的数据指针 ep、当前 goroutine gp、当前 channel c 等。
      • mysg 加入到 channel 的发送队列 c.recvq 中。
      • 调用 gopark 函数阻塞当前 goroutine,等待被唤醒。
      • 被唤醒后,检查 gp.waiting 是否仍然指向 mysg,确保等待列表的完整性。
      • 检查 mysg.success 字段,判断发送是否成功。
      • 清理 mysg 并释放。
      • 返回 (true, success)

问题

操作 nil channel closed channel not nil, not closed channel
close panic panic 正常关闭
读 <- ch 阻塞 读到对应类型的零值 阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞
写 ch <- 阻塞 panic 阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞

1.Channel 发送和接收数据的本质是什么?

All transfer of value on the go channels happens with the copy of value.

就是说 channel 的发送和接收操作本质上都是 “值的拷贝”,无论是从 sender goroutine 的栈到 chan buf,还是从 chan buf 到 receiver goroutine,或者是直接从 sender goroutine 到 receiver goroutine。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
type user struct {
name string
age int8
}

var u = user{name: "Ankur", age: 25}
var g = &u

func modifyUser(pu *user) {
fmt.Println("modifyUser Received Vaule", pu)
pu.name = "Anand"
}

func printUser(u <-chan *user) {
time.Sleep(2 * time.Second)
fmt.Println("printUser goRoutine called", <-u)
}

func main() {
c := make(chan *user, 5)
c <- g
fmt.Println(g)
// modify g
g = &user{name: "Ankur Anand", age: 100}
go printUser(c)
go modifyUser(g)
time.Sleep(5 * time.Second)
fmt.Println(g)
}
1
2
3
4
5
输出结果:
&{Ankur 25}
modifyUser Received Vaule &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}

output

参考链接

1.Go 程序员面试笔试宝典

2.《Go学习笔记》

3.Go 语言设计与实现

4.《Diving Deep Into The Golang Channels.》


08.Golang 通道(channel )源码分析(二、数据发送与接收)
https://blog.longpi1.com/2024/12/14/08-Golang-通道(channel-)源码分析(二、数据发送与接收)/