Loading... # 0x01 <div class="tip inlineBlock info"> 本篇源码分析基于 **Go 1.14.12** 注意信息时效 </div> Go 语言中的 Channel 能实现在不同的 Goroutine 中收发信息,算是一个并发安全的队列,有点类似 Python 中的线程安全`Queue`对象,但是结构比起`Queue`简单的加锁`list append popleft`有趣的多。 ```python # Python 中 Queue 忽略一堆加锁操作的实际代码 # Put a new item in the queue def _put(self, item): self.queue.append(item) # Get an item from the queue def _get(self): return self.queue.popleft() ``` # Channel 流程动画 <div class="hideContent">该部分仅登录用户可见</div> # Channel 的数据结构 按[上篇文章](https://gvoidy.cn/index.php/archives/161/)的 `go tool compile`方法编译找到 Channel 的创建函数 ```go [root@94a57e6a9e5b src]# go tool compile -S test_chan.go ... 0x0031 00049 (test_chan.go:6) CALL runtime.makechan(SB) ``` 然后再看`runtime.makechan()`这个函数的函数头 ```go func makechan(t *chantype, size int) *hchan { ... } ``` 就可以找到我们 Channel 的数据结构真身 `hchan`了。 > 这里先剧透一下,Channel 中的 buf 是一个环形队列,具体原因我们后面讲 ```go type hchan struct { qcount uint // 队列中当前元素个数 dataqsiz uint // 环形队列定义时的总长度, buf unsafe.Pointer // 指向一个长为 dataqsiz 数组的指针 elemsize uint16 // 单个元素大小 closed uint32 // channel 是否关闭 elemtype *_type // 元素类型 sendx uint // 当前 send 进这个 channel 的元素对应的 buf 下标, c <- i recvx uint // 当前 recv 出这个 channel 的元素对应的 buf 下标, <- c recvq waitq // 等待 recv 数据的 goroutine 链表 sendq waitq // 等待 send 数据的 goroutine 链表 // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex } ``` # Channel 的创建 凭我们对 Go 中 Channel 的使用经验可以得知,Channel 的创建分为有缓冲区和无缓冲区两种,即 ```go // 无缓冲 ch1 := make(chan int) // 有缓冲 ch2 := make(chan int, 3) ``` 根据 `go tool compile`工具我们可以得知两种创建方式均为 `runtime.makechan()`这个函数,然后创建规则最重要的就是这一段创建 `hchan`的`buf`结构,其他的就是些简单的判断和初始化。 ```go // mem = elem 类型长度 * buf 长度 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } switch { case mem == 0: // Queue or element size is zero. // 队列或元素size为0,不分配 buf,即 make(chan int) c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. // buf 指向自身,不额外分配内存 c.buf = c.raceaddr() case elem.ptrdata == 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. // 分配一整块内存,用于存储 hchan 和 buf c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // Elements contain pointers. // elem 是指针类型,正常分配 hcchan, buf 单独分配 c = new(hchan) c.buf = mallocgc(mem, elem, true) } ``` # Channel Send  我们都知道当 Channel 的缓冲区没满的情况可以一直往里面塞元素,直到他满了继续塞会阻塞当前 goroutine。 那么我们可以假设,缓冲区没有满的情况下,我们是一直往他的 buf 里塞数据,等他满了,我们就保留一下现场把当前 send 挂到 channel 的 sendq 下,然后挂起当前G,当前 M 继续执行下一个 G。 <div class="tip inlineBlock info"> 有关 GMP 的介绍请看 [GO 的调度组件与调度循环](https://gvoidy.cn/index.php/archives/142/) </div> ```go ch := make(chan int, 3) go func() { ch <- 0 ch <- 1 ch <- 2 // 阻塞 ch <- 3 }() ``` ## buf为空,且 recvq 不为空  ```go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ... } ``` ```go func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { ... if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) } ``` 直接把 send 的元素即 `ep` 传给挂在 `recvq`上的 sudog, 并让等待 recv 的 G ready。 ## buf 为空,且 recvq 为空  ```go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } ... } ``` 将 send 到 channel 的元素 `ep`存入buf中,且让 `sendx`buf 下标计数器和`qcount`总数计数器 +1,如果`sendx`和`dataqsiz`相等了,那么将`sendx`置零,由此实现了一个环形数组。目的就是为了当数组里的元素被 recv 了再 send 可以找到空位。即: 假设 `dataqsiz` 等于 3, 即缓冲区有三个位置,初始状态下 `sendx`为0,第一个 `ch <- 1`进来往 buf 第 0 个位置上插,也就是 `sendx`的位置。如果缓冲区满了,即 `sendx`等于 2,根据代码我们可以知道,后续如果还是 send 那么是会往 `sendq` 链表上插而不是 `buf`上了,而我们这个 `sendx` 是确定往 `buf` 上插入的下标。所以等我们 `sendq`全部清空且拿走了`buf`上`recvx`位置的元素(即位置 0 的元素),那么我们再后续一个 send 是不是就要往 0 号位上插了?所以这就是为什么有`if c.sendx == c.dataqsiz{ c.sendx = 0 }` 的原因。 ## buf 满  ```go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) ... } ``` 就是把当前 send 打包成 sudog 放到 `sendq` 链表里,然后调用 `gopark`把这个 G 挂起。 ## 无缓冲区 channel <div class="tip inlineBlock info"> 无缓冲区 channel 其实就是 `datasiz`为 0 的情况 </div> 通过上面的结论,我们可以得出,因为`if c.qcount < c.dataqsiz`永远不成立,所以无缓冲区的 send 只能往 sendq 链表上挂,或者走**buf为空,且 recvq 不为空**的情况 # Channel Recv  ## buf 满,且 sendq 不为空  ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } ... } ``` ```go func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { ... } else { // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) } // copy data from queue to receiver if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) } ``` 如果当前 channel sendq 链表不为空且有缓冲区,则将 buf 中的元素拷贝给 recv ,再把 sendq 链表中的第一个值拷贝到 buf 中,并修改 recvx 和 sendx。同样,recvx 也是环形数组的下标,用于确认下一个 recv 的元素在 buf 中的第几个位置。 ## buf 不为空,sendq 为空  ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } ... } ``` 将 buf 中的 recvx 位置的元素拷贝给 recv 然后将该位置的清空,并修改 recvx ## buf 为空 > make(chan int) 缓冲区为空也是这种情况  ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. atomic.Store8(&gp.parkingOnChan, 1) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) ... } ``` 当 dataqsiz 为零(即初始化一个无缓冲区的 channel)同时 sendq 链表是空的情况,或者 dataqsiz 不为零(即初始化一个有缓冲区的 channel)同时 qcount 为零 (即 channel 的buf里没有元素) 那么我们的 recv 什么也做不了,只能打包一下 sudog 然后挂载 recvq 上最后再调用 `gopark()`挂起这个 G。 ## 无缓冲区 channel,且 sendq 不为空 ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } ... } ``` ```go func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } ... } ``` 直接把 sendq 中的值拷贝给 recv # Channel 收发总结 - 发送流程(send)  - 接收流程(recv)  # Select Channel ```go package main import ( "fmt" "time" ) func main() { ch := make(chan int, 3) go func() { select { case x := <-ch: fmt.Printf("%d", x) default: fmt.Printf("Nothing\n") } }() fmt.Println("Done!") time.Sleep(time.Hour) } ``` 先随便写个 `select`,然后使用`go tool comile`看函数调用  看到这个函数名`selectnbrecv()`感觉非常有既视感,想去查查是不是有`selectnbsend()`  发现果然有,那么我们继续分开分析。 ## Select Send ```go func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) } ``` 看这个函数。。怎么感觉那么眼熟!好吧其实就是上面我们分析的几种 send 的情况,然后通过该函数返回的布尔值来选择 case。 ## Select Recv ```go func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) { selected, _ = chanrecv(c, elem, false) return } ``` 然后我们也能发现 recv 的情况也差不多。 # Close channel 理代码前先写下结论,关闭 Channel 有以下要注意的三个点: - 不能关闭已经关闭的 Channel 和不能关闭为 nil 的 Channel - 关闭 Channel 前要**释放所有正在等待 接受`recv`/ 发送`send`**的 G,即清理掉所有挂在`sendq`和`recvq`上的`sudog` - 在缓冲区 `buf` 中的内容不会随着 Channel 的 Close 而清除 老规矩,还是先找到`close(ch)`的函数位置  来看代码,首先是我们上面结论的第一点,对 Channel 是否能关闭做判断,如果能关闭,就将 Channel 的 `closed`状态位改为 1 ```go func closechan(c *hchan) { if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { ... } c.closed = 1 ... } ``` 然后是释放所有挂在`sendq`和`recvq`上的`sudog` ```go func closechan(c *hchan) { ... // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) for { // 释放 sendq 逻辑同上 ... } unlock(&c.lock) // Ready all Gs now that we've dropped the channel lock. for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } } ``` 先把所有挂在`sendq`和`recvq`的`sudog`内的`elem`置为nil,好让 GC 回收内存,然后将这些`sudog`全部打包到一个结构为 ```go // A gList is a list of Gs linked through g.schedlink. A G can only be // on one gQueue or gList at a time. type gList struct { head guintptr } ``` 的链表里,然后对这些`sudog`一个个执行`goready()`让他们从阻塞态恢复出来。 # Channel 的锁机制 很多教程里都教我们一句话:**考虑并发问题前,首先想到 Channel 然后再想锁**,那么难道 Channel 里就没有锁了吗?其实不是的,我们回过头去看 Channel 数据结构 `hchan`,会发现他有一把保护整个数据结构的大锁。 ```go type hchan struct { ... // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. //锁保护hchan中的所有字段 //sudog 上的字段会被阻塞再这个 Channel 上。 //保持这个锁时不要改变另一个G的状态 //(特别是,不要ready一个G),因为这可能会死锁 //使用堆栈收缩。 lock mutex } ``` 然后我们细究前面我们分析的所有方法,均会发现完成是否能执行操作,在执行具体操作发生前均会有一个 `lock(&c.lock)`然后完成操作后均会有`unlock(&c.lock)`。所以,Channel 并不是无锁的! 至于为什么那么多文章推崇用 Channel 实现并发而不是自己用锁去维护,个人认为他们的初衷是想表达 Channel 是一个优雅的工具,Go 语言内部帮我们优化了绝大多数并发的场景。所以才推崇使用 Channel 而不是自己原生的用锁去实现,因为及时我们没用 Channel 而是自己造轮子,最终可能也是做了一个和 Channel 类似的结构。 但是具体情况具体分析, Channel 也不是万能的。 # Reference [深入理解go-channel和select的原理](https://juejin.cn/post/6844903940190912519#heading-28) [go channel 使用及机制流程汇总](https://juejin.cn/post/6844904036609556494) [go channel 详解](https://colobu.com/2016/04/14/Golang-Channels/) 最后修改:2021 年 07 月 09 日 01 : 21 PM © 允许规范转载 赞赏 如果觉得我的文章对你有用,请随意赞赏 赞赏作者 支付宝微信
2 条评论
是大佬,我死了。