type hchan struct { qcount uint// total data in the queue dataqsiz uint// size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements // channel中元素大小 elemsize uint16 // 是否已关闭 closed uint32 // channel中元素类型 elemtype *_type // element type sendx uint// send index recvx uint// receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
// isSelect indicates g is participating in a select, so // g.selectDone must be CAS'd to win the wake-up race. isSelect bool next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock.
acquiretime int64 releasetime int64 ticket uint32 parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
// compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") }
// 计算缓冲区需要的总大小(缓冲区大小*元素大小),并判断是否超出最大可分配范围 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) }
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. var c *hchan switch { case mem == 0: // 缓冲区大小为0,或者channel中元素大小为0(struct{}{})时,只需分配channel必需的空间即可 // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.kind&kindNoPointers != 0: // 通过位运算知道channel中元素类型不是指针,分配一片连续内存空间,所需空间等于 缓冲区数组空间 + hchan必需的空间。 // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 元素中包含指针,为hchan和缓冲区分别分配空间 // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) }
var t0 int64 if blockprofilerate > 0 { t0 = cputicks() }
// 加锁 lock(&c.lock)
// 如果channel已关闭 if c.closed != 0 { // 解锁,直接panic unlock(&c.lock) panic(plainError("send on closed channel")) }
// 除了以上情况,当channel未关闭时,就有以下几种情况:
// 1、当存在等待接收的Goroutine if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any).
// 2、当缓冲区未满时 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. // 获取指向缓冲区数组中位于sendx位置的元素的指针 qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } // 将当前发送的值拷贝到缓冲区 typedmemmove(c.elemtype, qp, ep) // sendx索引加一 c.sendx++ // 因为是循环队列,sendx等于队列长度时置为0 if c.sendx == c.dataqsiz { c.sendx = 0 } // 队列中元素总数加一,并解锁,返回发送成功 c.qcount++ unlock(&c.lock) returntrue }
// 3、当既没有等待接收的Goroutine,缓冲区也没有剩余空间,如果是非阻塞的发送,那么直接解锁,返回发送失败 if !block { unlock(&c.lock) returnfalse }
// Block on the channel. Some receiver will complete our operation for us. // 4、如果是阻塞发送,那么就将当前的Goroutine打包成一个sudog结构体,并加入到channel的发送队列sendq里 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg)
// 调用goparkunlock将当前Goroutine设置为等待状态并解锁,进入休眠等待被唤醒 goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. KeepAlive(ep)
// someone woke us up. // 被唤醒之后执行清理工作并释放sudog结构体 if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) returntrue }
var t0 int64 if blockprofilerate > 0 { t0 = cputicks() }
// 加锁 lock(&c.lock) // 如果channel已关闭,并且缓冲区无元素 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) // 有等待接收的变量(即 v = <-ch中的v) if ep != nil { //根据channel元素的类型清理ep对应地址的内存,即ep接收了channel元素类型的零值 typedmemclr(c.elemtype, ep) } // 返回(true, false),即接收到值,但不是从channel中接收的有效值 returntrue, false }
// 除了以上非常规情况,还有有以下几种常见情况:
// 1、等待发送的队列sendq里存在Goroutine,那么有两种情况:当前channel无缓冲区,或者当前channel已满 if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). // 如果无缓冲区,那么直接从sender接收数据;否则,从buf队列的头部接收数据,并把sender的数据加到buf队列的尾部 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) // 接收成功 returntrue, true }
funcrecv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { // 无缓冲区 if raceenabled { racesync(c, sg) } if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { // 缓冲区已满 // Queue is full. Take the item at the // head of the queue. Make the sender enqueue // its item at the tail of the queue. Since the // queue is full, those are both the same slot. qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) } // copy data from queue to receiver if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 将等待发送数据的Goroutine的状态从_Gwaiting置为 _Grunnable,等待下一次调度。 goready(gp, skip+1) }
// release all readers // 遍历recvq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) }
// release all writers (they will panic) // 遍历sendq,清除sudog的数据,取出其中处于_Gwaiting状态的Goroutine加入到glist中 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock. 将glist中所有Goroutine的状态置为_Grunnable,等待调度器进行调度 for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }