第十章 Channel--第四天 完结
channel用于goroutine之间的通讯. 其内部实现了同步,确保并发安全,多个goroutine同时访问,不需要加锁. 对channel操作行为做如下总结: 1. ch <- : 写入channel 2. ch -> :读出channel 3. clouse : 关闭channel ? golang 中大部分类型都是值类型,只有 slice / channel / map 是引用类型 ? 一. channel的语法1. channel的定义方法var c chan int //定义了一个int类型的chan 此时c=nil 定义一个有初始值的channel. c := make(chan int)
2. channel是goroutine之间的通讯. 所有有一个goroutine发数据,就要有一个goroutine收数据package main import ( "fmt" time" ) func main() { c := make(chan int) go func() { for { 有一个goroutine发数据,必须有一个goroutine收数据,不然就不能放进去了,会报异常deadlock fatal error: all goroutines are asleep - deadlock!
? 3. 函数是一等公民,channel也是,channel也能作为参数,也能作为返回值a. channel作为参数传入 ) chan也可以作为参数传入. 也可以作为返回值 func getChan(c chan int) { { n := <- c fmt.Println(n) } } func chanDemo () { c := make(chan ) go getChan(c) c <- 1 c <- 2 b. channel用作数组 chan也可以作为参数传入. 也可以作为返回值 func worker(i int,c chan ) { c fmt.Printf(number: %d,worker,%c n,i,n) } } func chanDemo () { 开了10个协程,让10个协程分别取各自的数据 var channel [10]chan int for i := 0; i<10; i++ { channel[i] = make(chan ) go func(c chan ) { worker(00; i < { channel[i] <- 'a' + i } A time.Sleep(time.Second) } func main() { chanDemo() } 分配一个管道数组,里面有10个管道. 向每个管道里放两个数据. 打印.? ?c. channel作为返回值 chan也可以作为参数传入. 也可以作为返回值 func createWorker(i int) chan int { c := make(chan ) go func() { { n := <- c fmt.Printf(return c } func chanDemo () { var channel [10]chan int { channel[i] = createWorker(i) } time.Sleep(time.Second) } func main() { chanDemo() } ? 4 给channel定义方向.channel可以收数据也可以发数据. 那么我们返回的channel到底是收数据的还是发数据的呢? 我们可以告诉调用者.? chan<- int) go func() { // goroutine里,channel是发数据的. 那么我们定义返回的channel只能是收数据的 return c } func chanDemo () { var channel [10]chan<- int { channel[i] = createWorker(i) } time.Sleep(time.Second) } func main() { chanDemo() } ? 5. bufferedChannel 带有缓冲区的channel我们知道如果创建了一个channel,往里面放了数据,但是没有人接收,那么就会deadlock死锁. 也就是必须要有人立刻能够接收走. 这就是要求,要求立刻被收走. 如果没有,就报错. 我们可以给他一个缓冲,让他在几个范围内可以不被立刻收走,给收数据方一个缓冲的时间 func bufferedChannel() { c := make(chan int,3) c <- 23 c <- 4 } func main() { bufferedChannel() 上面的demo,定义了带有三个缓冲的channel. 里面3个以内数据不会报deadlock. 超过三个还没有被取走,才会包deadlock ) func worker(i int,c chan int) { for { n := <- c fmt.Printf("number: %d,%d n",n) } } func bufferedChannel() { c := make(chan 3) go worker(0 time.Sleep(time.Second) } func main() { bufferedChannel() } 6. channel Close()
) func worker(i goroutine里,channel是发数据的. 那么我们定义返回的channel只能是收数据的 { n,ok := <- c if !ok { // 如果没有数据了,就返回 break } fmt.Printf( channel发数据方,数据发完了,是可以close的 func channelClose() { c := make(chan ) go worker('bcd' 调用close 告诉接收方,数据已经发完了. close(c) time.Sleep(time.Second) } func main() { bufferedChannel() channelClose() } 另一种判断是否有数据的方法,使用range来判断. close(c) time.Sleep(time.Second) } func main() { channelClose() } ? ? 二. Channel的应用: 不要通过共享内存来通信,通过通信来共享内存通常,比如java是通过共享内存来通信. 比如定义一个公共的flag,这个flag就是共享的一块内存空间. 他的值变了,通知调用者. 这就是通过共享内存来通信.? ? 使用共享内存的话在多线程的场景下为了处理竞态,需要加锁,使用起来比较麻烦。另外使用过多的锁,容易使得程序的代码逻辑坚涩难懂,并且容易使程序死锁,死锁了以后排查问题相当困难,特别是很多锁同时存在的时候。 go语言的channel保证同一个时间只有一个goroutine能够访问里面的数据,为开发者提供了一种优雅简单的工具,所以go原生的做法就是使用channle来通信,而不是使用共享内存来通信。 ? 下面来感受一下go的通过通信来共享内存.? 三.? 使用channel等待任务结束我们在上面的demo中,都会有一句话 time.Sleep(time.Second) 让主程序休眠1秒钟.? 否则,协程还没执行完,主程序就退出了 再来分析,等待的目的是什么呢? 那就是等协程都执行完了,? 主程序再退出. 换个思路,我不用time.sleep,能不能协程执行完了,主动告诉主线程,我执行完了,退出吧呢? 可以的.? 1. 使用通信来共享内存,用channel实现.go的原则: 不要通过共享内存来通信,通信来共享内存. ? go中通信用什么呢? 使用channel 定义一个bool类型的chan,用来和外部通信. 事情做完了,给外面回复一个done func work(i for n := range w.c { fmt.Printf( w.done <- true } } type worker struct { c chan int // 传递字母的管道 done chan bool // 标记字母传递完成的管道 } w = worker{ c : make(chan int),done : make(chan bool),} go work(i,w) w } func chanDemo () { var wo [10]worker 分析: 1. 定义了一个worker类型,里面有两个管道. 第一个管道是用来传输字母的. 第二个管道用来标记传送行为是否完成2. 接下来看chanDemo,chanDemo是一个主goroutine. 在这里:第一步: 创建了10个工作者.?第二步: 开了10个goroutine 用来传输字母. 开goroutine调用createWorker,然后worker的工作是work第三步: for循环为每一个goroutine添加字母. 然后等待.......直到对应的done channel完成,取出结果. 继续往下执行.{ wo[i].c <- i 这里有一个通道,一直等着. 等着取数据. <- wo[i].done } 放数据:?wo[i].c <- 'a' + i . 然后就等待......等待到什么时候呢? wo[i].done中有数据可以取出. 这就让主goroutine保证了10个goroutine都执行完以后,在继续往后执行. ?这解释了为什么goroutine可以让主goroutine等待的原因输出结果 number: 4567899,J
这样打印的结果是按照顺序执行的. 一个goroutine执行完了,才能往另一个goroutine中放数据,这样效率太低了. 我们换一种方式,让他不停的打印. 最后一起等待执行完成. go func() { w.done <- true }() } } type worker struct { c chan int 传递字母的管道 done chan bool 标记字母传递完成的管道 } worker{ c : make(chan ),done : make(chan boolvar work []worker { work[i] = createWorker(i,work[i]) } for i,w := range work { w.c <- i } for _,w := range work { 放了两次,所以要等待两次都处理完,在执行后面的结果 <-w.done <-w.done } chanDemo() } 这里有两个变化, 1. 我在取结果done的时候,没有在线等执行完. 而是,你们去执行吧,我最后来收结果,收的顺序也是从1-9的顺序收的. 每个goroutine要有两个结果. 2. 在work具体执行完成的地方,要定义一个协程. 这样程序才能正常运行,否则会报deadline异常. 为什么会报异常呢? 因为,有两次放数据,第一次放完了,往管道done里写了一个数据,结果没有被收走,又放了一次..... 所以就发生死锁了. ? 新开一个goroutine,让他并行的发done. 就可以了 疑问:?为什么定义成goroutine,他就不会deadline了呢? goroutine还有什么其他的含义? 比如下面这段程序 func main() { c := make(chan ) c <- } 这么写汇报deadline,因为管道只有发送方,没有接收方. 要求必须既有发送方又有接收方 但是这么写就没问题: func main() { c := make(chan ) go func() { c <- c <- }() } 为啥呢? 经过一番研究过终于明白了,看下面这段程序 func main() { bufferedChannel() channelClose() c := make(chan ) c <- 0 go func() { log.Info("11") c <- 1 log.Info("22") c <- 2 log.Info("33") c <- 3 log.Info("44") c <- 4 }() log.Info("1,",<- c) log.Info("2,<- c ) log.Info("3,1)">log.Info("4,<- c ) time.Sleep(time.Second) } 你运行执行一下,看看结果,只打印出了11 [2020/02/21 07:09:58] channelDemo.go:83 [Info] 11 Process finished with exit code 0 原来goroutine中的代码一直在等待,知道有人要收数据了,他才会发 c <- 0 go func() { log.Info(11) c <- log.Info(223344 }() log.Info("1,<- c) time.Sleep(time.Second) } 这时的打印结果是 [12:08] channelDemo.go: [85 [Info] 2293 [Info] 1,1)">1 看到了吧,有人收,goroutine才会发,收几个,发几个. 其他的保留待发. ? 2. 使用WaitGroup等待任务的结束 sync. WaitGroup()是系统自带的一个等待任务全部完成的工具
我们第一步: 定义一个WaitGroup 开启了一个等待任务,我们通过waitGroup来等待任务的完成 var wg sync.WaitGroup 第二步: 添加20个任务,因为我们知道有20个任务就直接添加20就好了,如果不知道,可以在for循环里一个个添加 wg.Add(20)
第三步: 等待20个任务全部结束 wg.Wait() 第四步: 完成了一个任务,就标记他为done sync w.wg.Done() }() } } type worker int 传递字母的管道 wg *sync.WaitGroup // 标使用waitgroup来标记同步完成 sync.WaitGroup) worker { w :=]worker var wg sync.WaitGroup work[i] = createWorker(i,&wg) } wg.Add(20 i } wg.Wait() chanDemo() } ? 3. 在go里面函数是一等公民,其实等待任务结束的过程中,或者结束时要做很多事情. 我们要是定义成某一个参数,那就只能接收这个参数了,其他都参数不行.? 如何能够达到扩展的目的呢?定义成函数 w.wg() } } type worker wg func() func() { fmt.Println("事情1") wg.Done() fmt.Println("事情2") }, } go work(i,我们通过waitGroup来等待任务的完成 var wg sync.WaitGroup { work[i] = createWorker(i,&wg) } wg.Add( i } wg.Wait() chanDemo() } 四. 使用channel进行树的遍历这里使用channel进行树的遍历,是对channel的一个应用. 之前我们对树遍历后处理使用的是函数.我们也可以用channel 比如: 查找树中最大的值 第一步: 循环遍历获取树,然后将所有树节点放入到channel中. 返回一个管道 第二步: 从管道中取出树的节点,进行计算 第三步: 在第一步中,把所有节点都添加到管道中以后,一定要close 定一个管道,循环遍历,把遍历后的节点添加到管道中 func (n *TreeNode) TraveresForChannel() chan *TreeNode { out := make(chan *TreeNode) go func() { n.TraveresFunc(func(node *TreeNode) { out <- node }) close(out) }() return } 从管道中取出所有节点,取最大值 max := 0 for c := range root.TraveresForChannel() { if c.Value > max { max = c.Value } } fmt.Println(max) 这个逻辑比较清晰.? 五. 用select进行调度?1.?首先,我们来从官方文档看一下有关select的描述: A select" statement chooses which of a set of possible send or receive operations will proceed. 或者换一种说法,select就是用来监听和channel有关的IO操作,当 IO 操作发生时,触发相应的动作。 比如: 现在有两个channel? A 和B,我要从A 和 B中取值,谁先来,就取出谁. 怎么做呢?? package main import func main() { var A,B chan int select { case n := <- A: fmt.Println(select from A: case n := <- B: fmt.Println(select from A:default: fmt.Println(select from default) } } 这里定义了A和B两个channel,两个channel都是nil. 下面通过select来选择执行,会走一个默认的default. 输出结果: select from default 这里A和B都不会有输出,所以,就走默认的default,这是非阻塞的方式输出内容. channel是阻塞的,如果实现非阻塞的呢? 那就是使用select.....default实现 如果没有default又不断的从A和B中取值,就会deadlock B: fmt.Println( |