本篇目的是对go中的channel做一个总结。 主要参考https://www.jianshu.com/p/76acce09da09
主要用于goroutine之间通信
在总结之前只记得有这么几条:
channel不能重复关闭, 否则会panic已经关闭的channel, 再读取会到零值…关于close函数可阅读我之前的文章:golang中的close函数
总的来说, 关于channel有三种操作: 读(<-ch), 写(ch <- value), 关(close(ch)), 操作可能出现的情况总结如下:
channel类型 \ 操作读(<-ch)写(ch <- value)关(close(ch))nil(未make)阻塞阻塞panic: close of nil channel正常(已make且未close)成功或阻塞成功或阻塞成功已关闭读到零值panicpanic所以对于nil channel在select…case中的情况, 相应的case分支会一直进不去, 如下面的ch:
func nil_channel_in_select() { var ch chan int ch2 := make(chan int) rand.NewSource(time.Now().Unix()) go func() { for { ch2 <- rand.Int() time.Sleep(time.Second) } }() for { // 对于ch的读写的case都不会进 select { case <-ch: fmt.Println("read from nil channel") case ch <- 1: fmt.Println("write to nil channel") case v := <-ch2: // 如果没这个case, 也会死锁 // fatal error: all goroutines are asleep - deadlock! fmt.Println("read value from ch2:", v) } } }使用for-range读取channel,当channel关闭时,for循环会自动退出,无需主动监测channel是否关闭,可以防止读取已经关闭的channel,造成读到数据为通道所存储的数据类型的零值。如:
func read_channel_by_for_range() { ch := make(chan int) go func() { for i := 0; i < 5; i++ { ch <- i * i } close(ch) // 应该在发送者处适时关闭channel }() for x := range ch { fmt.Println(x) } }读已关闭的channel会得到零值,如果不确定channel是否已经关闭,需要使用ok进行检测。ok的结果和含义:
true:读到数据,并且通道没有关闭。false:通道关闭,没数据读到。如:
func judge_channel_closed_by_ok() { ch := make(chan int) go func() { for i := 0; i < 5; i++ { ch <- i * i } close(ch) // 应该在发送者处适时关闭channel }() for { x, ok := <-ch if !ok { // 检测到 channel已经关闭时, 退出循环 fmt.Println("channel closed") break } fmt.Println(x, ok) } }select可以同时监控多个通道的情况,只处理未阻塞的case(如果有不止一个case ready, 则随机选择一个。当通道为nil时,对应的case永远为阻塞,无论读写。如:
func select_case() { ch1 := make(chan int) ch2 := make(chan int) timer := time.NewTicker(time.Second) ctx, cancel := context.WithCancel(context.Background()) go func() { for i := 0; i < 5; i++ { ch1 <- i * i time.Sleep(time.Second) } close(ch1) }() go func() { for i := 0; i < 5; i++ { ch2 <- i + i time.Sleep(time.Second * 2) } close(ch2) }() go func() { // 5秒后cancel time.Sleep(time.Second * 5) cancel() }() for { select { case _, ok := <-ch1: if !ok { fmt.Println("ch1 closed") } else { fmt.Println("read from ch1, now is:", time.Now()) } case _, ok := <-ch2: if !ok { fmt.Println("ch2 closed") } else { fmt.Println("read from ch2, now is:", time.Now()) } case <-ctx.Done(): // 发现cancel, 做一些善后工作, 然后退出 fmt.Println("ctx.Done()") return case <-timer.C: fmt.Println("read from timer, now is:", time.Now()) } } }channel是可以定义为只读/只写的, 如:
// 定义只读/只写channel // read_only := make(<-chan int) // read_only <- 1 // Invalid operation: read_only <- 1 (send to receive-only type <-chan int) // write_only := make(chan<- int) // <-write_only // Invalid operation: <-write_only (receive from send-only type chan<- int)但是这样没啥实际意义, 数据只能一边进, 别人怎么用啊?
一般还是用于函数参数及返回值等,这样可以使得参数或者返回址的意义更明确, 如:
signal.Notify(c chan<- os.Signal, sig ...os.Signal)是一个例子。
不太恰当的例子:
// 往给定的ch中写入n以内的奇数 func writeonly_channel(ch chan<- int, n int) { go func() { defer close(ch) for i := 1; i <= n; i++ { if i&1 == 1 { // n以内的所有奇数 ch <- i } } }() } // 使用 ch := make(chan int) writeonly_channel(ch, 10) for odd := range ch { fmt.Println(odd) }channel是可以带缓冲的, 缓冲嘛, 一定程度上可以提高并发度。 写一个, 还能继续写,不用等读出去了再写。如:
// 生成n个数到channel中, 看看不同buffer_size下的区别 func gen_num(n int, buffer_size int) <-chan int { ch := make(chan int, buffer_size) go func() { defer close(ch) for i := 1; i <= n; i++ { ch <- i } }() return ch } func calc_time_for_gen_num() { for i := 0; i <= runtime.NumCPU(); i++ { start := time.Now() for range gen_num(1e7, i) { } fmt.Printf("buffer_size: %d, used time(ns): %d\n", i, time.Now().Sub(start).Nanoseconds()) } }最后跑出来的结果:
buffer_size: 0, used time(ns): 2126284502 buffer_size: 1, used time(ns): 1706649198 buffer_size: 2, used time(ns): 1302575739 buffer_size: 3, used time(ns): 1087461655 buffer_size: 4, used time(ns): 1248243529 buffer_size: 5, used time(ns): 1175520107 buffer_size: 6, used time(ns): 1022376191 buffer_size: 7, used time(ns): 878169585 buffer_size: 8, used time(ns): 789670245从结果可以看到, 缓冲越大, 在上面代码的情况下用时越少。
其实context包中的cancel方法就是使用的close方法来使得调用方的<-ctx.Done()可以调用, 进而知道上层已经cancel(). 看代码:
func elegantly_exit() { ctx, cancel := context.WithCancel(context.Background()) // 用来通知上游, 表示业务已经处理完了 // 一般是一些善后工作,如: 将缓冲的消息尽快发出去 closed := make(chan struct{}) // 一些业务代码 go func(ctx context.Context) { for { select { case x := <-ctx.Done(): fmt.Println("ctx.Done():", x) close(closed) return default: } fmt.Println("handle business") time.Sleep(time.Second) } }(ctx) exitCh := make(chan os.Signal) signal.Notify(exitCh, syscall.SIGINT, syscall.SIGTERM) // 收到SITINT, SIGTERM信号后, 系统会将相应的信号写到exitCh中 // 否则会一直卡在这 sig := <-exitCh fmt.Println("received signal:", sig) // 通知下游goroutine cancel() // 表示下游goroutine已经做完善后工作了 <-closed }(完)
