开篇留给新冠病毒和科比,不解释。
“不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存” 这句话是所有gopher都听过的一句话,Golang用channel来实现其一直吹捧的CSP模式。
结构
上代码:
1 | type hchan struct { |
字段含义如下:
- qcount 记录了channel中全部元素的个数。
- dataqsiz 记录了channel中循环队列的长度。
- buf 缓冲区数据指针。
- elemsize 能够收发的元素大小。
- closed
- elemtype 能够收发的元素类型。
- sendx channel的发送操作处理到的位置。
- recvx channel的接收操作处理到的位置。
- sendq 当前channel由于缓冲区空间不足而阻塞的发送Goroutine列表。
- recvq 当前channel由于缓冲区空间不足而阻塞的接收Goroutine列表。
sudog 表示一个在等待列表中的oroutine,该结构体中存储了阻塞的相关信息以及两个分别指向前后runtime.sudog的指针。
lock保护在hchan的所有字段,以及在这个通道上被阻塞的sudogs的几个字段。
创建channel
1 | func makechan(t *chantype, size int) *hchan { |
上述代码根据 Channel 中收发元素的类型和缓冲区的大小初始化 runtime.hchan 结构体和缓冲区:
- 不存在缓冲区(mem == 0),那么就只会为 runtime.hchan 分配一段内存空间
- 存储的类型不是指针类型(elem.ptrdata == 0),就会为当前的channel和底层的数组分配一块连续的内存空间
- 默认情况下会为hchan和缓冲区单独分配内存
最后更新dataqsiz(channel中循环队列的长度)、elemsize(能够收发的元素大小)、elemtype(能够收发的元素类型)。
向channel发送数据
1 | func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { |
发送过程会先检查当前chan的状态,是否为空、是否已经关闭等。完成检查会进入发送阶段。
1、先判断recvq中有没有阻塞的接收者,如果有则取出最先陷入等待的goroutine,直接发送给该goroutine并返回。
1 | func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { |
sendDirect会直接将数据拷贝到接收者的内存地址上,goready让接收者的goroutine处于待运行状态,在下一次调度的时候被唤醒。
2、如果创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满(c.qcount < c.dataqsiz),会将数据放入sendx中,并增加sendx索引和qcount计数器。如果sendx指向了队尾则回到开始位置(循环队列)。
3、当缓冲区已满时,当前goroutine会进入到阻塞状态。
- getg()获取发送数据使用的goroutine - gp。
- 执行acquireSudog()函数获sudog结构体并设置这一次阻塞发送的相关信息,例如发送的 Channel、是否在Select控制结构中和待发送数据的内存地址等。
- 将新建的sudog放入发送队列sendq中。
- goparkunlock()函数触发goroutine的调度让出处理器的使用权,将当前的goroutine陷入沉睡等待唤醒.
- 被唤醒后进行收尾工作并返回。
接收数据
1 | func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { |
检查当前channel是否为空,如果channel为空,那么就会直接调用gopark()挂起当前goroutine。
检查当前channel已经关闭并且缓冲区没有任何数据,直接返回。
1、先判断sendq中有没有阻塞的发送者,如果有则取出最先陷入等待的goroutine,直接从阻塞的发送者或者缓冲区中获取数据。
如果channel不存在缓冲区,recvDirect()会将channel发送队列中goroutine存储的elem数据拷贝到目标内存地址中.
如果channel存在缓冲区,将队列中的数据拷贝到接收方的内存地址,将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方。
1 |
|
2、当缓冲区存在数据时(c.qcount > 0),从channel的缓冲区中接收数据。会将数据放入recvx中,并增加recvx索引,减少qcount计数器。如果recvx指向了队尾则回到开始位置(循环队列)
3、当缓冲区中不存在数据时,等待其他goroutine向channel发送数据.
- getg()获取发送数据使用的goroutine - gp。
- 执行acquireSudog()函数获sudog结构体并设置这一次阻塞发送的相关信息.
- 将新建的sudog放入发送队列recvq中。
- goparkunlock()函数触发goroutine的调度让出处理器的使用权,将当前的goroutine陷入沉睡等待唤醒.
- 被唤醒后进行收尾工作并返回。
关闭channel
1 | func closechan(c *hchan) { |
当channel是一个空指针或者已经被关闭时,运行时都会直接panic并抛出异常。
将recvq和sendq两个队列中的数据加入到gList中,释放所有的reader和writer。
最后遍历gList中的goroutine,并触发调度。