07.Golang 通道(channel )源码分析(一、定义与创建关闭)

Golang 通道(channel )源码分析(一、定义与创建关闭)

注意当前go版本代码为1.23

Channel 在运行时的内部表示是 runtime.hchan

定义

Go 语言的 Channel 在运行时使用 runtime.hchan 结构体表示,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type hchan struct {
qcount uint // 队列中已有的数据数量。
dataqsiz uint // 循环队列的容量大小。
buf unsafe.Pointer // 指向底层循环数组的指针。数组大小为 dataqsiz 个元素. 由于元素类型不确定,所以使用 unsafe.Pointer.
elemsize uint16 // 每个元素的大小(以字节为单位)。
closed uint32 // 通道是否已关闭的标志。0 表示未关闭,非 0 表示已关闭。
timer *timer // 计时器,用于带有超时发送/接收操作的通道。
elemtype *_type // 元素的类型信息。
sendx uint // 发送索引,指向下一个可发送数据的位置(循环队列中的索引)。
recvx uint // 接收索引,指向下一个可接收数据的位置(循环队列中的索引)。
recvq waitq // 等待接收数据的 goroutine 队列。
sendq waitq // 等待发送数据的 goroutine 队列。

// lock 保护 hchan 中的所有字段,以及阻塞在该通道上的 sudog 中的几个字段。
lock mutex
}

type waitq struct {
first *sudog // 队列中的第一个 sudog,代表第一个等待的 goroutine。
last *sudog // 队列中的最后一个 sudog,代表最后一个等待的 goroutine。
}

buf 指向底层循环数组,只有缓冲型的 channel 才有。

sendxrecvx 均指向底层循环数组,表示当前可以发送和接收的元素位置索引值(相对于底层数组)。

sendqrecvq 分别表示被阻塞的 goroutine,这些 goroutine 由于尝试读取 channel 或向 channel 发送数据而被阻塞。

waitqsudog 的一个双向链表,而 sudog 实际上是对 goroutine 的一个封装:runtime.sudog 表示一个在等待列表中的 Goroutine,该结构中存储了两个分别指向前后 runtime.sudog 的指针以构成链表。

例如,创建一个容量为 6 的,元素为 int 型的 channel 数据结构如下(采用自Go 程序员面试笔试宝典) :

chan data structure

创建

源码位置:runtime.makechan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// makechan 创建一个新的通道
func makechan(t *chantype, size int) *hchan {
// 获取通道的元素类型
elem := t.Elem

// 。。。。。。省略前置检查与对齐

// 计算所需内存大小,检查是否溢出或超出最大分配限制
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: 尺寸超出范围"))
}

// 根据元素是否包含指针,决定如何分配内存
var c *hchan
switch {
case mem == 0:
// 如果队列或元素大小为零,直接分配hchan大小
c = (*hchan)(mallocgc(hchanSize, nil, true))
// 为竞争检测器设置同步位置
c.buf = c.raceaddr()
case !elem.Pointers():
// 如果元素不包含指针,一次性分配hchan和缓冲区,为当前的 Channel 和底层的数组分配一块连续的内存空间;
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指针,需要分别分配hchan和缓冲区
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}

// 初始化 hchan 结构体的字段。
c.elemsize = uint16(elem.Size_) // 元素大小
c.elemtype = elem // 元素类型信息
c.dataqsiz = uint(size) // 缓冲区大小
lockInit(&c.lock, lockRankHchan) // 初始化 channel 的锁

// 如果开启了通道调试,打印创建通道的信息
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}

关闭

源码位置:runtime.closechan

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
// closechan 函数用于关闭一个 channel。
// c 参数是指向 hchan 结构体的指针,表示要关闭的 channel。
func closechan(c *hchan) {
// 。。。。。忽略前置检查

// 将 channel 的 closed 字段设置为 1,表示 channel 已关闭。
c.closed = 1

// 创建一个空的 gList,用于存储需要被唤醒的 goroutine。
var glist gList

// 释放所有正在等待接收数据的 goroutine (readers)。
for {
// 从 recvq (接收队列) 中取出一个等待的 sudog。
sg := c.recvq.dequeue()
// 如果 recvq 为空,则跳出循环。
if sg == nil {
break
}
// 如果 sudog 的 elem 字段不为 nil (表示它正在等待接收数据)。
if sg.elem != nil {
// 清空 sudog 的 elem 字段指向的内存区域 (将接收到的值设置为零值)。
typedmemclr(c.elemtype, sg.elem)
// 将 sudog 的 elem 字段设置为 nil。
sg.elem = nil
}
// 如果 sudog 的 releasetime 字段不为 0 (表示它被阻塞了)。
if sg.releasetime != 0 {
// 更新 sudog 的 releasetime 字段为当前时间。
sg.releasetime = cputicks()
}
// 获取 sudog 关联的 goroutine。
gp := sg.g
// 将 sudog 的指针存储在 goroutine 的 param 字段中。
gp.param = unsafe.Pointer(sg)
// 将 sudog 的 success 字段设置为 false,表示接收操作失败。
sg.success = false
// 如果启用了竞态检测,则获取 goroutine 的竞态资源。
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 将 goroutine 添加到 glist 中。
glist.push(gp)
}

// 释放所有正在等待发送数据的 goroutine (writers)。
// 这些 goroutine 将会 panic。
for {
// 从 sendq (发送队列) 中取出一个等待的 sudog。
sg := c.sendq.dequeue()
// 如果 sendq 为空,则跳出循环。
if sg == nil {
break
}
// 将 sudog 的 elem 字段设置为 nil (表示它发送的数据不会被接收)。
sg.elem = nil
// 如果 sudog 的 releasetime 字段不为 0 (表示它被阻塞了)。
if sg.releasetime != 0 {
// 更新 sudog 的 releasetime 字段为当前时间。
sg.releasetime = cputicks()
}
// 获取 sudog 关联的 goroutine。
gp := sg.g
// 将 sudog 的指针存储在 goroutine 的 param 字段中。
gp.param = unsafe.Pointer(sg)
// 将 sudog 的 success 字段设置为 false,表示发送操作失败。
sg.success = false
// 如果启用了竞态检测,则获取 goroutine 的竞态资源。
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 将 goroutine 添加到 glist 中。
glist.push(gp)
}
unlock(&c.lock)

// 现在已经释放了 channel 的锁,可以安全地唤醒所有在 glist 中的 goroutine。
// 唤醒这些 goroutine 的操作必须在释放锁之后进行,以避免死锁。
for !glist.empty() {
// 从 glist 中取出一个 goroutine。
gp := glist.pop()
// 将 goroutine 的 schedlink 字段设置为 0。schedlink 通常用于将 goroutine 链接到调度器的运行队列中。
// 设置为 0 表示该 goroutine 不在任何运行队列中。
gp.schedlink = 0
// 使用 goready 函数将 goroutine 标记为就绪状态,并将其放入全局运行队列或 P 的本地运行队列。
// 第三个参数 3 表示唤醒的优先级,通常用于表示该 goroutine 是由于 channel 操作被唤醒的。
goready(gp, 3)
}
}

问题

1.go 可以从一个关闭的channel里面读取数据吗?

可以,Go 语言可以从一个已经关闭的 channel 中读取数据。

具体情况如下:

  1. 读取已存在的数据: 当一个 channel 被关闭后,你仍然可以从中读取之前发送到 channel 中但尚未被接收的数据。一旦 channel 中所有已存在的数据都被读取完毕,后续的读取操作将会立即返回该 channel 类型的零值,并且不会阻塞。

  2. 判断 channel 是否关闭: 从 channel 接收数据时,可以使用两个返回值的形式来判断 channel 是否已经关闭:

    1
    value, ok := <-ch
    • value:接收到的值。如果 channel 已关闭且没有数据,则为该类型的零值。
    • ok: 布尔值。
      • 如果 oktrue,表示成功从 channel 接收到了一个值。
      • 如果 okfalse,表示 channel 已经被关闭且没有更多数据可读取。

示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package main

import (
"fmt"
"time"
)

func main() {
ch := make(chan int, 3)

// 发送一些数据到 channel
ch <- 1
ch <- 2
ch <- 3

// 关闭 channel
close(ch)

// 从 channel 中读取数据
for i := 0; i < 5; i++ {
value, ok := <-ch
fmt.Printf("Value: %d, OK: %t\n", value, ok)
time.Sleep(time.Millisecond * 100) // 为了观察输出
}

// 使用 range 循环读取,直到 channel 关闭
ch2 := make(chan string, 2)
ch2 <- "hello"
ch2 <- "world"
close(ch2)
for v := range ch2 {
fmt.Println("Range:", v)
}
}

输出:

1
2
3
4
5
6
7
Value: 1, OK: true
Value: 2, OK: true
Value: 3, OK: true
Value: 0, OK: false
Value: 0, OK: false
Range: hello
Range: world

总结:

  • 关闭 channel 后,仍然可以读取其中已有的数据。
  • 使用 value, ok := <-ch 可以判断 channel 是否关闭以及是否还有数据。
  • 使用 range 循环可以方便地读取 channel 中的数据,直到 channel 关闭。

需要注意:

  • 向一个已经关闭的 channel 发送数据会导致 panic。
  • 关闭一个未初始化的 channel (nil channel) 也会导致 panic。
  • 重复关闭一个 channel 也会导致 panic。
操作 nil channel closed channel not nil, not closed channel
close panic panic 正常关闭
读 <- ch 阻塞 读到对应类型的零值 阻塞或正常读取数据。缓冲型 channel 为空或非缓冲型 channel 没有等待发送者时会阻塞
写 ch <- 阻塞 panic 阻塞或正常写入数据。非缓冲型 channel 没有等待接收者或缓冲型 channel buf 满时会被阻塞

因此,在操作 channel 时,需要谨慎处理关闭操作,确保不会出现上述 panic 情况。 通常,由发送方负责关闭 channel。

2.关于如何优雅关闭通道:如何优雅地关闭通道

参考链接

1.Go 程序员面试笔试宝典

2.《Go学习笔记》

3.Go 语言设计与实现

4.《如何优雅地关闭通道》


07.Golang 通道(channel )源码分析(一、定义与创建关闭)
https://blog.longpi1.com/2024/12/06/07-Golang-通道(channel-)源码分析(一、定义与数据结构)/