博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Go channel实现源码分析
阅读量:6181 次
发布时间:2019-06-21

本文共 4708 字,大约阅读时间需要 15 分钟。

go通道基于go的并发调度实现,本身并不复杂,go并发调度请看我的这篇文章:

1.channel数据结构
type hchan struct {   qcount   uint               // 缓冲区中已有元素个数   dataqsiz uint               //循环队列容量大小   buf      unsafe.Pointer     // 缓冲区指针   elemsize uint16             //元素大小   closed   uint32             //关闭标记,0没关闭,1关闭   elemtype *_type             //数据项类型   sendx    uint               //发送索引   recvx    uint               //接收索引   recvq    waitq              //等待接收排队链表   sendq    waitq              //等待发送排队链表   lock mutex                  //锁}type waitq struct {   first *sudog   last  *sudog}
 
2.创建channel实现
创建channel实例:
ch := make(chan int, 4)
实现函数:
func makechan(t *chantype, size int64) *hchan
大致实现:
执行上面这行代码会new一个hchan结构,同时创建一个dataqsiz=4的int类型的循环队列,其实就是一个容纳4个元素的数组,就是按顺序往里面写数据,写满之后又从0开始写,这个顺序索引就是hchan.sendx
 
3.发送数据
发送数据实例:
ch <- 100
发送数据实现函数:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
ep指向要发送数据的首地址
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {   lock(&c.lock)   if c.closed != 0 {      unlock(&c.lock)      panic(plainError("send on closed channel"))   }    if sg := c.recvq.dequeue(); sg != nil {      //缓冲区就是一个固定长度的循环列表      //发送队列是一个双向链表,接在缓冲区的后面,整体是一个队列,保证先进先出      //有接收者,并不是将当前要发送的数据直接发出,而是将缓冲区的第一个元素发送给接收者,同时将发送队列的第一个元素加入缓冲区刚出队列的位置      send(c, sg, ep, func() { unlock(&c.lock) }, 3)      return true   }    if c.qcount < c.dataqsiz {      //缓冲区没有满,直接将要发送的数据复制到缓冲区,直接返回,      qp := chanbuf(c, c.sendx)      typedmemmove(c.elemtype, qp, ep)      c.sendx++      if c.sendx == c.dataqsiz {         c.sendx = 0      }      c.qcount++      unlock(&c.lock)      return true   }    if !block {      unlock(&c.lock)      return false   }   //以上都是同步非阻塞的,ch <- 100直接返回      //以下是同步阻塞   //缓冲区满了,也没有接收者,通道将被阻塞,其实就是不执行当前G了,将状态改成等待状态   gp := getg()   mysg := acquireSudog()   c.sendq.enqueue(mysg)   goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)    //当G被唤醒,状态改成可执行状态,从这里开始继续执行   releaseSudog(mysg)   return true}
大致实现:
1:接收队列不为空,从接收队列中取出第一个接收者*sudog,将数据复制到sudog.elem,复制函数为memmove用汇编实现,通知接收方数据给你了,将接收方协程由等待状态改成可运行状态,将当前协程加入协程队列,等待被调度。
2:没有接收者,有缓冲区且没有满,直接将数据复制到缓冲中,写入缓冲区的位置为hchan.buf[sendx++],如果缓冲区已满sendx=0,就是循环队列的实现,往sendx指定的位置写数据,hchan.qcount++
3:没有接收者,没有缓冲区或是满了,则从当前协程对应的P的sudog队列中取一个struct sudog,将数据复制到sudog.elem,将sudog加入sendq队列中,通知接收方,当前流程阻塞,等待被唤醒,接收方收到通知后(被唤醒),继续往下执行,接收数据完成后会通知发送方,即将发送方协程状态由等待状态改成可运行状态,加入协程可运行队列,等着被执行不会阻塞的情况:
1:通道缓冲区没有满之前,因为只是将要发送的数据复制到缓冲区就返回了
2:有接收者的情况,有数据复制到接收方的数据结构中(不是最终接收数据的变量,在执行接收函数的时候会拷贝到最终接收数据的变量),唤醒接收协程会阻塞的情况:自然就是缓冲区满了,也没有接收方,这个时候会将数据打包放到发送队列,当前协程被设置成等待状态,这个状态不会被调度,当有接收方收到数据后,才会被唤醒
 
4.接收数据
接收数据实例:
val := <- ch
接收数据实现函数:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {   lock(&c.lock)   if sg := c.sendq.dequeue(); sg != nil {      // Found a waiting sender. If buffer is size 0, receive value      // directly from sender. Otherwise, receive from head of queue      // and add sender's value to the tail of the queue (both map to      // the same buffer slot because the queue is full).      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)      return true, true   }    if c.qcount > 0 {      // Receive directly from queue      qp := chanbuf(c, c.recvx)      if ep != nil {         typedmemmove(c.elemtype, ep, qp)      }      typedmemclr(c.elemtype, qp)      c.recvx++      if c.recvx == c.dataqsiz {         c.recvx = 0      }      c.qcount--      unlock(&c.lock)      return true, true   }    if !block {      unlock(&c.lock)      return false, false   }   //以上同步非阻塞    //以下同步阻塞   gp := getg()   mysg := acquireSudog()   c.recvq.enqueue(mysg)   //将当前G状态改成等待状态,停止调度   goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)    //当前G被唤醒从这里继续执行    mysg.c = nil   releaseSudog(mysg)   return true, !closed}
大致实现:
1.发送队列不为空(说明缓冲区已满),从发送队列中取出第一个发送者*sudog
1.1.没有缓冲区,直接将发送队列中的数据sudog.elem复制出来,存到接收数据的变量val中,通知发送方我处理完了,你可以继续执行
1.2.有缓冲区,复制出缓冲区hchan.buf[recvx]对应的元素到val,在将发送方sudog.elem复制到hchan.buf[recvx],发送方按顺序写,接收方按顺序读,典型的FIFO,为了保证是先进先出,所以先复制出,再将队列首元素复制到对应的缓冲区中,其实就是发送队列连接在缓冲区后面,缓冲区满了,就写队列,接收的时候先从缓冲区中拿数据,拿掉之后空出来的位置从发送队列中取第一个填满,并唤醒对应的G,只要发送队列不为空,缓冲区肯定会被填满
2.发送队列为空,缓冲区不为空,复制出缓冲区hchan.buf[recvx]对应的元素到val,hchan.qcount--
3.发送队列为空,缓冲区也为空,那就是没有任何待接收的数据,接收流程就只能等了,将接收信息打包成sudog,加入接收队列recvq,当前执行流程阻塞,等有发送数据后会被唤醒继续
 
5.channel FIFO在解释一次
5.1:缓冲区没满,发送数据就是进缓冲队列,接收数据就是出缓冲队列,比较好理解
5.2:缓冲区已满,发送数据就是进等待队列,接收数据先出缓冲队列,即为要接收的数据,等待队列出列,将数据存在缓冲队列刚出列的位置,刚出列的位置相当于缓冲队列的末尾,也就是说等待队列的列头连在缓冲队列的末尾,将等待队列的列头加入缓存队列的列尾,保证了缓冲队列是满的,减少的是缓冲队列中的数据,保证先进先出
5.3:接收数据,缓冲队列或等待队列有数据,拿走第一个,保证等待队列是接在缓冲区末尾,即缓冲区末尾有空缺,就让等待队列出列,并填充至缓冲区末尾,否则将自己打包加入接收队列,当前G进入等待状态,有数据发送自然会通知你
 
总结:Go channel基于go的并发调度实现阻塞和非阻塞两种通讯方式
 

转载于:https://www.cnblogs.com/hlxs/p/10275303.html

你可能感兴趣的文章
用EJB进行事务管理
查看>>
Linux Shell脚本系列之一
查看>>
数据可视化,个人经验总结(Echarts相关)
查看>>
Mysql MAC installation
查看>>
一款基于Vue和Go的桌面端管理star项目应用
查看>>
使用shell创建一个简单的菜单bash select用法
查看>>
Nuxt之默认模版和默认布局
查看>>
Vue模板、JS、CSS分离实现
查看>>
Hexo -- 快速、简洁且高效的博客框架 入门
查看>>
JVM
查看>>
高并发面试总结
查看>>
Pycharm--Python文件开头自动添加utf-8编码
查看>>
Leetcode PHP题解--D60 824. Goat Latin
查看>>
2019年一线大厂春招:Spring面试题和答案合集(上篇)
查看>>
尚未弄懂的JS系列(未完待续)
查看>>
浅析Java NIO
查看>>
企业级 SpringBoot 教程 (一)构建第一个SpringBoot工程
查看>>
学习云计算技术前景在哪里?云计算技术发展趋势
查看>>
干货|比特币如何产生与交易
查看>>
前端处理后端接口传递过来的图片文件
查看>>