Netpoll of Network Program for Golang
深入剖析 Golang 网络编程之 Netpoll,主要涉及 Linux 环境下的 Epoll 初始化、 Go 网络编程基本流程(Listen、Accept、Read、Write)以及netpoll 执行流程
本文所涉及的源码版本:v1.22.3
I. 基础概念
网络编程,是允许不同计算机上的程序通过网络通信的开发过程,涉及多种协议(HTTP、TCP/IP等)以及不同编程语言的应用。
同步、异步、并发模型
IO 模型 | 读写操作和阻塞阶段 |
---|---|
阻塞 IO | 程序阻塞于读写函数 |
IO 复用 | 程序阻塞于 IO 复用系统调用,但可同时监听多个 IO 事件;对 IO 本身的读写操作是非阻塞的 |
SIGIO 信号 | 信号触发读写就绪事件,用户程序执行读写操作;程序本身没有阻塞阶段 |
异步 IO | 内核执行读写操作并触发读写完成事件;程序没有阻塞阶段 |
主要用于区分内核向应用程序通知的是何种 IO 事件(就绪事件 or 完成事件),以及由谁来完成 IO 读写(应用程序 or 内核)
IO模型中的同步
- 同步 IO 模型,指的是应用程序发起 IO 操作后,必须等待 IO 操作完成后才能继续执行后续的操作,即 IO 操作的结果需要立即返回给应用程序;在此期间,应用程序处于阻塞状态,无法做其他操作。
- 优点:编程模型简单
- 缺点:效率较低(应用程序的执行速度被 IO 操作所限制)
对于操作系统内核来说,同步 IO 操作是指在内核处理 IO 请求时需要等待
IO 模型中的异步
- 异步 IO 模型,指的是应用程序发起 IO 操作后,无须等待 IO 操作完成,可以立即进行后续的操作;在此期间,操作系统负责把 IO 操作的结果返回给应用程序;
- 优点:可以充分利用系统资源,提高 IO 操作的效率
- 缺点:编程模型相对复杂
对于操作系统内核来说,异步 IO 操作指的是,在内核处理 IO 请求时无需等待,立即返回
并发模式
并发模式,指的是 I/O 处理单元和多个逻辑单元之间协调完成任务的方法
Linux Epoll
-
epoll 在内核里使用红黑树(Red-black tree)来跟踪进程所有待检测的文件描述字
fd
,把需要监控的 socket 通过epoll_ctl()
函数加入内核中的红黑树里(红黑树是个高效的数据结构,增删改一般时间复杂度是O(logn)
) -
epoll 使用事件驱动的机制,在内核里维护了一个链表(List)来记录就绪事件。 当某个 socket 有事件发生时,内核通过回调函数将其加入到这个就绪事件列表中。 当用户调用
epoll_wait()
函数时,只会返回有事件发生的文件描述符的个数,不需要像 select/poll 那样轮询扫描整个 socket 集合,大大提高了检测的效率 -
两种触发模式
- Level trigger:服务器端不断地从 epoll_wait 中苏醒,直到内核缓冲区数据被 read 函数读完才结束
- Edge trigger:服务器端只会从 epoll_wait 中苏醒一次
-
事件宏
EPOLLIN
表示对应的文件描述符可读(包括对端 socket 正常关闭)EPOLLOUT
表示对应的文件描述符可写EPOLLPRI
表示对应的文件描述符有紧急的数据可读(带外数据)EPOLLERR
表示对应的文件描述符发生错误EPOLLHUP
表示对应的文件描述符被挂断EPOLLET
将 EPOLL 设为边缘触发模式(默认电平触发)EPOLLONESHOT
只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到内核中的事件注册表中
II. 应用示例
|
|
III. 相关数据结构
|
|
通过源码可以看到,Golang 网络编程涉及到的 netFD
, poll.FD
, Addr
, SysFile
以及 pollDesc
之间的关系如下:
fdmu
是为了保证对同一个文件的读、写操作能分别被序列化Sysfd
就是操作系统中syscall
返回的 fd 值pd
,pollDesc
I/O poller,是 Go 对 poll 过程的一个抽象,所有平台的抽象都是一样的csema
,当文件被关闭时会被触发isBlocking
表明 FD 是否为 blocking 模式IsStream
标志该 FD 是否是流式,与流式相反的是基于 packet 的,即 UDP socketZeroReadIsEOF
,当连接读到 0 长度时,用来区分是否代表 EOF. 如果是基于 packet 的 socket 连接,则始终是false
isFile
标志该 FD 是否代表文件,还是网络连接netFD
结构中包含一个poll.FD
类型的成员pfd
,以及Addr
接口类型的laddr
和raddr
poll.FD
结构含有SysFile
和pollDesc
类型的成员,以及fdMutex
类型的fdmu
IV. TCP 网络编程基本流程
本部分涉及众多函数调用,为了描述清晰,采用了图的形式,其中,每一块第一行表示该块所表示的函数名称,其他部分表示该函数中关键函数调用。
创建 TCP socket 并监听: net.Listen
NOTE
需要注意的是,在执行
net.(*netFD).listenStream
之前,由于maxListenerBacklog
函数调用了open("/proc/sys/net/core/somaxconn")
,则会导致 epoll 底层红黑树的提前创建:runtime.netpollinit
->syscall.EpollCreate1
->Syscall6(SYS_EPOLL_CREATE1, uintptr(flags), 0, 0, 0, 0, 0)
.另外,当启用 Timer 时,也存在提前初始化 netpoll 的可能,原因: Timers rely on the network poller
time.NewTimer
->runtime.startTimer
->runtime.addtimer
->runtime.doaddtimer
->netpollGenericInit()
1 2 3 4 5 6 7 8 9 10
// doaddtimer adds t to the current P's heap. // The caller must have locked the timers for pp. func doaddtimer(pp *p, t *timer) { // Timers rely on the network poller, so make sure the poller // has started. if netpollInited.Load() == 0 { netpollGenericInit() } ... }
获取 TCP 连接: net.(*TCPListener).Accept
TCP 连接读数据: net.(*TCPConn).Read
TCP 连接写数据: net.(*TCPConn).Write
V. netpoll 执行流程: netpoll
在调度和 GC 的关键点上都会检查一次 netpoll,确定是否存在 ready 状态的 FD:
-
startTheWorldWithSema
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
// reason is the same STW reason passed to stopTheWorld. start is the start // time returned by stopTheWorld. // // now is the current time; prefer to pass 0 to capture a fresh timestamp. // // stattTheWorldWithSema returns now. func startTheWorldWithSema(now int64, w worldStop) int64 { assertWorldStopped() mp := acquirem() // disable preemption because it can be holding p in a local var if netpollinited() { list, delta := netpoll(0) // non-blocking injectglist(&list) netpollAdjustWaiters(delta) } lock(&sched.lock) procs := gomaxprocs if newprocs != 0 { procs = newprocs newprocs = 0 } p1 := procresize(procs) sched.gcwaiting.Store(false) if sched.sysmonwait.Load() { sched.sysmonwait.Store(false) notewakeup(&sched.sysmonnote) } unlock(&sched.lock) worldStarted() ... }
-
findrunnable
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
// Finds a runnable goroutine to execute. // Tries to steal from other P's, get g from local or global queue, poll network. // tryWakeP indicates that the returned goroutine is not normal (GC worker, trace // reader) so the caller should try to wake a P. func findRunnable() (gp *g, inheritTime, tryWakeP bool) { ... // Poll network until next timer. if netpollinited() && (netpollAnyWaiters() || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 { sched.pollUntil.Store(pollUntil) if mp.p != 0 { throw("findrunnable: netpoll with p") } if mp.spinning { throw("findrunnable: netpoll with spinning") } delay := int64(-1) if pollUntil != 0 { if now == 0 { now = nanotime() } delay = pollUntil - now if delay < 0 { delay = 0 } } if faketime != 0 { // When using fake time, just poll. delay = 0 } list, delta := netpoll(delay) // block until new work is available ... } ... }
-
pollWork
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
// pollWork reports whether there is non-background work this P could // be doing. This is a fairly lightweight check to be used for // background work loops, like idle GC. It checks a subset of the // conditions checked by the actual scheduler. func pollWork() bool { if sched.runqsize != 0 { return true } p := getg().m.p.ptr() if !runqempty(p) { return true } if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 { if list, delta := netpoll(0); !list.empty() { injectglist(&list) netpollAdjustWaiters(delta) return true } } return false }
-
sysmon
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
// Always runs without a P, so write barriers are not allowed. // //go:nowritebarrierrec func sysmon() { ... lock(&sched.sysmonlock) // Update now in case we blocked on sysmonnote or spent a long time // blocked on schedlock or sysmonlock above. now = nanotime() // trigger libc interceptors if needed if *cgo_yield != nil { asmcgocall(*cgo_yield, nil) } // poll network if not polled for more than 10ms lastpoll := sched.lastpoll.Load() if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { sched.lastpoll.CompareAndSwap(lastpoll, now) list, delta := netpoll(0) // non-blocking - returns list of goroutines if !list.empty() { // Need to decrement number of idle locked M's // (pretending that one more is running) before injectglist. // Otherwise it can lead to the following situation: // injectglist grabs all P's but before it starts M's to run the P's, // another M returns from syscall, finishes running its G, // observes that there is no work to do and no other running M's // and reports deadlock. incidlelocked(-1) injectglist(&list) incidlelocked(1) netpollAdjustWaiters(delta) } } ... }
根据 ready 的事件时 Read 或 Write,分别从 poolDesc 的 rg、wg 上获取该唤醒的 goroutine.
然后将已经 ready 的 goroutine push 到 toRun 链表,并且 toRun 链表最终会从 netpoll()
返回,通过 injectglist
进入全局队列.
相当于每次调度循环都要执行 netpoll,检查频率还是比较高的
|
|
VI. 总结
-
Golang 通过对 Linux 内核提供的
epoll
实现进行封装,实现了同步编程异步执行的效果,其核心数据结构是netFD
,并将Sysfd
与pollDesc
结构绑定。 当某个netFD
产生EAGAIN
错误时,则当前 Goroutine 将会被存储到其对应的pollDesc
中,同时 Goroutine 会gopark()
,直至这个netFD
再次发生读写事件,会将此 Goroutine 设置为 ready 并放入toRun
队列等待重新运行,而底层事件通知机制就是 epoll. -
Golang 中 netpoll 的创建与初始化的可能来源:Timer、读文件、TCP Listen.
-
如下的调度和 GC 关键函数
startTheWorldWithSema
、findrunnable
、pollWork
、sysmon
都会进行netpoll
执行流程,检查是否存在 ready 状态的 FD.