Channel #
Don’t communicate by sharing memory; share memory by communicating.
不要通过共享内存来通信,通过通信来共享内存。
这是 Go 语言最重要的编程理念。goroutine 通过 channel 向另一个 goroutine 发送消息,channel 和 goroutine 结合,可以实现用通信代替共享内存的 CSP (Communicating Sequential Process)模型。
使用 #
创建 channel:
// 无缓冲 channel
ch := make(chan int)
// 带缓冲 channel,缓冲区为 3
ch = make(chan int, 3)
channel 的零值是
nil
。
无缓冲 channel #
无缓冲 channel 也叫做同步 channel:
- 一个 goroutine 基于一个无缓冲 channel 发送数据,那么就会阻塞,直到另一个 goroutine 在相同的 channel 上执行接收操作。
- 一个 goroutine 基于一个无缓冲 channel 先执行了接收操作,也会阻塞,直到另一个 goroutine 在相同的 channel 上执行发送操作
带缓冲 channel #
带缓冲的 channel 有一个缓冲区:
- 若缓冲区未满则不会阻塞,发送者可以不断的发送数据。当缓冲区满了后,发送者就会阻塞。
- 当缓冲区为空时,接受者就会阻塞,直至有新的数据
关闭 channel #
使用 close
函数关闭 channel:
- channel 关闭后不能再发送数据
- channel 关闭后可以接收已经发送成功的数据。
- channel 关闭后如果 channel 中没有数据,那么接收者会收到一个 channel 元素的零值。
close
表示这个 channel 不会再继续发送数据,所以要在发送者所在的 goroutine 去关闭 channel。
关闭一个
nil
的 channel 会导致 panic。
重复关闭 channel 会导致 panic。
向已关闭的 channel 发送值会导致 panic。
单向 channel #
当一个 channel 作为一个函数参数时,它一般总是被专门用于只发送或者只接收。
chan<- int
表示一个只发送int
的 channel。<-chan int
表示一个只接收int
的 channel。
cap 和 len #
cap
函数可以获取 channel 内部缓冲区的容量。len
函数可以获取 channel 内部缓冲区有效元素的个数。
使用 range 遍历 channel #
使用 range
循环可以遍历 channel,它依次从 channel 中接收数据,当 channel 被关闭并且没有值可接收时跳出循环:
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
// 关闭 channel
// 如果不关闭 channel,range 就会阻塞当前 goroutine, 直到 channel 关闭
close(ch)
for v := range ch {
fmt.Println(v)
}
使用 channel 实现互斥锁 #
我们可以使用容量只有 1
的 channel 来保证最多只有一个 goroutine 在同一时刻访问一个共享变量:
var (
sema = make(chan struct{}, 1) // a binary semaphore guarding balance
balance int
)
func Deposit(amount int) {
sema <- struct{}{} // acquire lock
balance = balance + amount
<-sema // release lock
}
func Balance() int {
sema <- struct{}{} // acquire lock
b := balance
<-sema // release lock
// return b
}
原理 #
channel 本质上就是一个有锁的环形队列,channel 的结构体 hchan
:
// src/runtime/chan.go
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
qcount
:channel 中的元素个数dataqsiz
:channel 中的循环队列的长度buf
:channel 的缓冲区数据指针,指向底层的循环数组,只针对有缓冲的 channel。elemsize
:channel 中元素大小elemtype
:channel 中元素类型closed
:channel 是否被关闭的标志位sendx
:表示当前可以发送的元素在底层循环数组中位置索引recvx
:表示当前可以发送的元素在底层循环数组中位置索引sendq
:向 channel 发送数据而被阻塞的 goroutine 队列recvq
:读取 channel 的数据而被阻塞的 goroutine 队列lock
:保护hchan
中所有字段
waitq
是一个双向链表,链表中所有的元素都是 sudog
:
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
// 指向当前的 goroutine
g *g
// 指向下一个 goroutine
next *sudog
// 指向上一个 goroutine
prev *sudog
// 指向元素数据
elem unsafe.Pointer
// ...
}
创建 channel #
创建 channel 要使用 make
,编译器会将 make
转换成 makechan
或者 makechan64
函数:
// src/runtime/chan.go#L72
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
// compiler checks this but be safe.
// ...
var c *hchan
switch {
case mem == 0:
// 无缓冲 channel
// 调用 mallocgc 方法分配一段连续的内存空间
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
// channel 存储的元素类型不是指针
// 分配一块连续的内存给 hchan 和底层数组
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 默认情况下,进行两次内存分配操作,分别为 hchan 和缓冲区分配内存
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 设置元素大小,元素类型,循环数组的长度
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
// ...
return c
}
使用 mallocgc 函数创建 channel,就意味着 channel 都是分配在堆上的。所以当一个 channel 没有被任何 goroutine 引用时,是会被 GC 回收的。
向 channel 发送数据 #
发送操作,也就是 ch <- i
语句,编译器最终会将该语句转换成 chansend
函数:
// src/runtime/chan.go
// block 为 true 时,表示当前操作是阻塞的
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// 不可以阻塞,直接返回 false,表示未发送成功
if !block {
return false
}
// 挂起当前 goroutine
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
// ...
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 执行发送数据的逻辑之前,先为当前 channel 加锁,防止多个线程并发修改数据
lock(&c.lock)
// 如果 channel 已经关闭,那么向该 channel 发送数据会导致 panic:send on closed channel
if c.closed != 0 {
// 解锁
unlock(&c.lock)
// panic
panic(plainError("send on closed channel"))
}
// 当前接收队列里存在 goroutine,通过 runtime.send 直接将数据发送给阻塞的接收者
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 走到这里,说明没有等待数据的接收者
// 对于有缓冲的 channel,并且还有缓冲空间
if c.qcount < c.dataqsiz {
// 计算出下一个可以存储数据的位置
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
// 将发送的数据拷贝到缓冲区中并增加 sendx 索引和 qcount 计数器
typedmemmove(c.elemtype, qp, ep)
// sendx 索引 +1
c.sendx++
// 由于 buf 是一个循环数组,所以当 sendx 等于 dataqsiz 时会重新回到数组开始的位置。
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
// 释放锁
unlock(&c.lock)
return true
}
// 走到这里,说明缓冲空间已满,或者是无缓冲 channel
// 如果不可以阻塞,直接返回 false,表示未发送成功
if !block {
unlock(&c.lock)
return false
}
// 缓冲空间已满或者是无缓冲 channel,发送方会被阻塞
// 获取当前发送数据的 goroutine 的指针
gp := getg()
// 构造一个 sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 设置这一次阻塞发送的相关信息
mysg.elem = ep // 待发送数据的内存地址
mysg.waitlink = nil
mysg.g = gp // 当前发送数据的 goroutine 的指针
mysg.isSelect = false // 是否在 select 中
mysg.c = c // 发送的 channel
gp.waiting = mysg
gp.param = nil
// 将 sudog 放入到发送等待队列
c.sendq.enqueue(mysg)
// 挂起当前 goroutine,等待唤醒
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
// goroutine 开始被唤醒了
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 移除 mysg 上绑定的 channel
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
// 被唤醒了,但是 channel 已经关闭了,panic
panic(plainError("send on closed channel"))
}
// 返回 true 表示已经成功向 channel 发送了数据
return true
}
send
发送数据:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// ...
// sg 是接收者的 sudog 结构
// sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
if sg.elem != nil {
// 直接拷贝内存到 val <- ch 表达式中变量 val 所在的内存地址(&val)上
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// 获取 sudog 上绑定的等待接收的 goroutine 的指针
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
// 唤醒等待接收的 goroutine
goready(gp, skip+1)
}
goready
是将 goroutine 的状态改成runnable
,然后需要等待调度器的调度。
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src 是当前 goroutine 发送的数据的内存地址
// dst 是接收者的
dst := sg.elem
// 写屏障
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
// 拷贝内存数据
memmove(dst, src, t.size)
}
从 channel 接收数据 #
Go 中可以使用两种不同的方式去接收 channel 中的数据:
i <- ch
i, ok <- ch
编译器的处理后分别会转换成 chanrecv1
,chanrecv2
:
// src/runtime/chan.go
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
两个方法最终还是调用了 chanrecv
函数:
// src/runtime/chan.go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// ...
// channel 是 nil
if c == nil {
// 不可以阻塞,直接返回
if !block {
return
}
// 挂起当前 goroutine
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 执行接收数据的逻辑之前,先为当前 channel 加锁
lock(&c.lock)
// channel 已关闭
if c.closed != 0 {
// 底层的循环数组 buf 中没有元素
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
// 释放锁
unlock(&c.lock)
if ep != nil {
// typedmemclr 根据类型清理相应地址的内存
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
// channel 未关闭,并且等待发送队列里存在 goroutine
// 发送的 goroutine 被阻塞,那有两种情况:
// 1. 这是一个非缓冲型的 channel
// 2. 缓冲型的 channel,但是 buf 满了
// recv 直接进行内存拷贝
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// channel 未关闭
// 缓冲型 channel 并且 buf 里有元素,可以正常接收
if c.qcount > 0 {
// 直接从循环数组里取出要接收的元素
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
// 这里表示,代码中没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
if ep != nil {
// 拷贝数据
typedmemmove(c.elemtype, ep, qp)
}
// 清理掉循环数组里相应位置的值
typedmemclr(c.elemtype, qp)
// recvx 索引 +1
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 元素个数 -1
c.qcount--
unlock(&c.lock)
return true, true
}
// 非阻塞接收,释放锁
// selected 返回 false,因为没有接收到值
if !block {
unlock(&c.lock)
return false, false
}
// 走到这里说明 buf 是空的
// 没有数据可接收,阻塞当前接收的 goroutine
// 获取当前接收的 goroutine
gp := getg()
// 构造一个 sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 设置这一次阻塞接收的相关信息
mysg.elem = ep // 待接收数据的地址
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp // 当前接收的 goroutine 指针
mysg.isSelect = false // 是否在 select 中
mysg.c = c // 接收的 channel
gp.param = nil
// 将 sudog 放入到接收等待队列
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
// 挂起当前接收 goroutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
// 被唤醒了
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
recv
接收数据:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 无缓冲的 channel
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
// 这里表示,代码中没有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
if ep != nil {
// 直接拷贝数据
recvDirect(c.elemtype, sg, ep)
}
} else {
// 缓冲型的 channel,但是 buf 已满
// 将底层的循环数组 buf 队首的元素拷贝到接收数据的地址
// 将发送者的数据放入 buf
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 将发送者数据拷贝到 buf
typedmemmove(c.elemtype, qp, sg.elem)
// 增加 recvx 索引
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
// 释放锁
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒发送的 goroutine
goready(gp, skip+1)
}
关闭 channel #
close
关闭 channel 会被编译器转换成 closechan
函数:
// src/runtime/chan.go#L357
func closechan(c *hchan) {
// 关闭一个 nil 的 channel,panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 先加锁
lock(&c.lock)
// 重复关闭,panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
// ...
// 设置 channel 关闭的标志位
c.closed = 1
var glist gList
// 将 channel 等待接收队列的里 sudog 释放
for {
// 从接收队列里取出一个 sudog
sg := c.recvq.dequeue()
// 接收队列空了,跳出循环
if sg == nil {
break
}
//
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 获取接收 goroutine 的指针
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 放入链表
glist.push(gp)
}
// 将 channel 等待发送队列的里 sudog 释放
// 如果存在,这些 goroutine 将会 panic
// 可以查看 chansend 函数中的逻辑:
// 对于发送者,如果被唤醒后 channel 已关闭,则会 panic
for {
// 从发送队列里取出一个 sudog
sg := c.sendq.dequeue()
// 发送队列空了,跳出循环
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 获取发送 goroutine 的指针
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 放入链表
glist.push(gp)
}
// 释放锁
unlock(&c.lock)
// 遍历链表,唤醒所有 goroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
recvq
和 sendq
中的所有 goroutine 被唤醒后,会分别去执行 chanrecv
和 chansend
中 gopark
后面的代码。