A Brief Analysis of the RPC Code in Go's net Package

Note up front: all referenced material is listed at the end.

Why analyze the net package in Go? I recently finished the book's chapter on netpoll. I had read it before, but I had never really sorted out the relationship between the net package and netpoll. I then came across ByteDance's assessment of the Go net package in their open-source netpoll project. Putting the two together raised the questions: why is the net package fast, and what role does netpoll actually play?

This article only covers the creation flow of the RPC server and client built on the net package, when netpoll is invoked along the way, and how the two relate to each other.

TODO: some details in this article are not analyzed thoroughly; they will be supplemented later.

go version: 1.19.13

RPC server and client

Creation and usage

Creating an RPC server and client in Go essentially involves encoding/decoding the RPC protocol, establishing the connection between server and client, and handling requests. Let's start with a code example and the basic flow.

[Figure: basic flow of the RPC client/server example]

package main

import (
    "fmt"
    "log"
    "net/rpc"
)

func main() {

    client, err := rpc.Dial("tcp", "localhost:8080")
    if err != nil {
        log.Fatal("dialing:", err)
    }

    var reply string
    err = client.Call("HelloService.Hello", "world", &reply)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(reply)
}
package main

import (
    "log"
    "net"
    "net/rpc"
)

type HelloService struct {
}

func (p *HelloService) Hello(request string, reply *string) error {
    *reply = "hello " + request
    return nil
}

func main() {
    srv := rpc.NewServer()
    srv.RegisterName("HelloService", new(HelloService))

    listener, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Fatal("Accept error:", err)
        }

        go srv.ServeConn(conn)
    }
}

In the code above we create a basic RPC server, send a request from the RPC client, handle the request data in HelloService.Hello, return the combined string, and print hello world. (A more compact variant using the package-level helpers is sketched right below.)
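
For comparison, net/rpc also provides package-level helpers that serve on the DefaultServer; a minimal sketch of the same server written that way (error handling trimmed to the essentials):

package main

import (
    "log"
    "net"
    "net/rpc"
)

type HelloService struct{}

func (p *HelloService) Hello(request string, reply *string) error {
    *reply = "hello " + request
    return nil
}

func main() {
    // Register HelloService on the package-level DefaultServer.
    if err := rpc.RegisterName("HelloService", new(HelloService)); err != nil {
        log.Fatal("register error:", err)
    }

    listener, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    // rpc.Accept loops over listener.Accept and serves each incoming
    // connection on the DefaultServer in a new goroutine.
    rpc.Accept(listener)
}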


Detailed analysis of the RPC server flow

For the server creation flow above: what does the concrete flow look like, what role does netpoll play, and why is the RPC built on the net package said to follow a blocking-I/O (BIO) model?

Below we analyze the concrete flow of the Listen, Accept, and ServeConn functions.

The Listen function

First, let's look at the main fields of the data structures underlying the Listener returned by Listen.

As the figure below (and the code sketch after it) shows, the Listener struct ultimately leads down to a pollDesc, and pollDesc is the central data structure of netpoll: it wraps the operating system's file descriptor so that network I/O events on it can be monitored. Inside netpoll, this is implemented mainly with the epoll model on Linux.

[Figure: field chain from Listener down to pollDesc]
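
Since the original diagram survives only as a placeholder, here is the same chain in code form; this is a simplified paraphrase of the Go 1.19 source with most fields omitted:

// Rough shape of the structures behind a TCP listener (simplified).
type TCPListener struct {
    fd *netFD // net.netFD
    // ...
}

type netFD struct {
    pfd poll.FD // internal/poll.FD
    // family, sotype, net, laddr, raddr, ...
}

// internal/poll.FD
type FD struct {
    Sysfd int      // the OS file descriptor
    pd    pollDesc // wraps a pointer to runtime.pollDesc, the core netpoll structure
    // ...
}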

Next, let's look at the overall function call chain.

[Figure: overall call chain of net.Listen]

Now let's match this against the actual code. Combining the figure above with the code below: inside the socket function, sysSocket creates the OS-level file descriptor, and newFD then wraps it into a netFD. Finally, based on sotype, execution goes into listenStream, which calls the kernel's bind and listen to finish the preparation that precedes the TCP three-way handshake.

The fd.init flow: it calls pollDesc's init function, and this is where functions from the netpoll component first come into play (a simplified sketch of this path follows the figure below):

  1. Via the global serverInit (a sync.Once), runtime_pollServerInit is executed once, creating the epoll fd and its wakeup event.
  2. runtime_pollOpen is then called with the OS fd, and a pollDesc is obtained from the global pollcache free list.
  3. epollctl binds the fd to the pd, determines which event types to monitor, and adds it to the epoll interest list.

pollCache maintains a single global instance, pollcache. When its free list has no pd available, a batch of new ones is created based on the pollDesc size. These structures are guaranteed to be initialized in memory that never triggers garbage collection, because they are referenced only from the epoll/kqueue internals (quoted from 《Go语言设计与实现》).

[Figure: image-20240506180954123]
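
A simplified sketch of this fd.init / pollDesc.init path, paraphrased from internal/poll in Go 1.19 (error-message setup and other details omitted):

// pollDesc here is internal/poll.pollDesc; runtimeCtx holds the pointer to the
// runtime-side pollDesc returned by runtime_pollOpen.
func (pd *pollDesc) init(fd *FD) error {
    // Step 1: one-time creation of the epoll instance.
    serverInit.Do(runtime_pollServerInit)
    // Steps 2 and 3: allocate a runtime pollDesc from pollcache and register
    // the fd with epoll (netpollopen -> epollctl).
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}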

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }

    if laddr != nil && raddr == nil {
        switch sotype {
        case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
            if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        case syscall.SOCK_DGRAM:
            if err := fd.listenDatagram(laddr, ctrlFn); err != nil {
                fd.Close()
                return nil, err
            }
            return fd, nil
        }
    }
    /*
    ****
    */
    return fd, nil
}

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    var err error
    if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
        return err
    }
    var lsa syscall.Sockaddr
    if lsa, err = laddr.sockaddr(fd.family); err != nil {
        return err
    }
    if ctrlFn != nil {
        c, err := newRawConn(fd)
        if err != nil {
            return err
        }
        if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil {
            return err
        }
    }
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    // calls the system listen (sysListen)
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    pd := pollcache.alloc()
    lock(&pd.lock)
    wg := pd.wg.Load()
    if wg != 0 && wg != pdReady {
        throw("runtime: blocked write on free polldesc")
    }
    rg := pd.rg.Load()
    if rg != 0 && rg != pdReady {
        throw("runtime: blocked read on free polldesc")
    }
    pd.fd = fd
    pd.closing = false
    pd.setEventErr(false)
    pd.rseq++
    pd.rg.Store(0)
    pd.rd = 0
    pd.wseq++
    pd.wg.Store(0)
    pd.wd = 0
    pd.self = pd
    pd.publishInfo()
    unlock(&pd.lock)

    errno := netpollopen(fd, pd)
    if errno != 0 {
        pollcache.free(pd)
        return nil, int(errno)
    }
    return pd, 0
}
func (c *pollCache) alloc() *pollDesc {
    lock(&c.lock)
    if c.first == nil {
        const pdSize = unsafe.Sizeof(pollDesc{})
        n := pollBlockSize / pdSize
        if n == 0 {
            n = 1
        }
        // Must be in non-GC memory because can be referenced
        // only from epoll/kqueue internals.
        mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        for i := uintptr(0); i < n; i++ {
            pd := (*pollDesc)(add(mem, i*pdSize))
            pd.link = c.first
            c.first = pd
        }
    }
    pd := c.first
    c.first = pd.link
    lockInit(&pd.lock, lockRankPollDesc)
    unlock(&c.lock)
    return pd
}

The Accept function

Before analyzing Accept, let's look at the call-flow diagram.

[Figure: call flow of the Accept path]

Following the Listen analysis above, the corresponding OS fd and its pd (pollDesc) have already been created. Accept calls the OS-level accept on the listening fd; when that returns syscall.EAGAIN, the code blocks in netpoll's pollWait (netpollblock parks the goroutine). Once epoll later reports the fd as ready, the goroutine is woken, the loop retries, accept returns the new connection's fd (socket), and the result is wrapped up into a TCPConn and handed to the ServeConn function below.

func (fd *netFD) accept() (netfd *netFD, err error) {
    d, rsa, errcall, err := fd.pfd.Accept()
    if err != nil {
        if errcall != "" {
            err = wrapSyscallError(errcall, err)
        }
        return nil, err
    }

    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    if err := fd.readLock(); err != nil {
        return -1, nil, "", err
    }
    defer fd.readUnlock()

    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }
    for {
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EINTR:
            continue
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // poll_runtime_pollWait
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            // This means that a socket on the listen
            // queue was closed before we Accept()ed it;
            // it's a silly error, so try again.
            continue
        }
        return -1, nil, errcall, err
    }
}
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    errcode := netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
        return errcode
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
    // block here until the fd becomes ready
    for !netpollblock(pd, int32(mode), false) {
        errcode = netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
            return errcode
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return pollNoError
}

The ServeConn function

In the ServeConn flow, a gobServerCodec is built first and then handed to ServeCodec, which processes calls one after another. ServeCodec parses each request, then assigns a goroutine to the matching service method, which is invoked via reflection (service.call) and whose result is written back as the response. An example of plugging in an alternate codec follows the ServeCodec listing.

// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
// The caller typically invokes ServeConn in a go statement.
// ServeConn uses the gob wire format (see package gob) on the
// connection. To use an alternate codec, use ServeCodec.
// See NewClient's comment for information about concurrent access.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
    buf := bufio.NewWriter(conn)
    srv := &gobServerCodec{
        rwc:    conn,
        dec:    gob.NewDecoder(conn),
        enc:    gob.NewEncoder(buf),
        encBuf: buf,
    }
    server.ServeCodec(srv)
}

// ServeCodec is like ServeConn but uses the specified codec to
// decode requests and encode responses.
func (server *Server) ServeCodec(codec ServerCodec) {
    sending := new(sync.Mutex)
    wg := new(sync.WaitGroup)
    for {
        service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec)
        if err != nil {
            if debugLog && err != io.EOF {
                log.Println("rpc:", err)
            }
            if !keepReading {
                break
            }
            // send a response if we actually managed to read a header.
            if req != nil {
                server.sendResponse(sending, req, invalidRequest, codec, err.Error())
                server.freeRequest(req)
            }
            continue
        }
        wg.Add(1)
        go service.call(server, sending, wg, mtype, req, argv, replyv, codec)
    }
    // We've seen that there are no more requests.
    // Wait for responses to be sent before closing codec.
    wg.Wait()
    codec.Close()
}
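
As the ServeConn doc comment says, an alternate codec can be plugged in via ServeCodec. A minimal sketch using the standard library's JSON-RPC server codec (same HelloService as in the earlier server example; only the serving line changes):

package main

import (
    "log"
    "net"
    "net/rpc"
    "net/rpc/jsonrpc"
)

type HelloService struct{}

func (p *HelloService) Hello(request string, reply *string) error {
    *reply = "hello " + request
    return nil
}

func main() {
    srv := rpc.NewServer()
    srv.RegisterName("HelloService", new(HelloService))

    listener, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Fatal("Accept error:", err)
        }
        // Same accept loop as before, but each connection is served with
        // the JSON codec instead of the default gob codec.
        go srv.ServeCodec(jsonrpc.NewServerCodec(conn))
    }
}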

Detailed analysis of the RPC client

The Dial function

In Dial, we pass the network ("tcp") and the address, establish a connection to the server, and get back a Conn, which is then wrapped into a new Client used to issue calls. During client construction, a buffered writer and a gobClientCodec are created first; NewClientWithCodec then builds the Client, starts a goroutine running the client's input loop, and returns the Client. A variant of Dial with a connect timeout is sketched after the Dial listing.

// Dial connects to an RPC server at the specified network address.
func Dial(network, address string) (*Client, error) {
    conn, err := net.Dial(network, address)
    if err != nil {
        return nil, err
    }
    return NewClient(conn), nil
}
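
Dial itself just calls net.Dial, so there is no connect timeout. A common pattern (a sketch, not part of net/rpc) is to dial with net.DialTimeout and hand the connection to rpc.NewClient yourself; the 3-second timeout below is an arbitrary choice:

package main

import (
    "fmt"
    "log"
    "net"
    "net/rpc"
    "time"
)

func main() {
    // Dial with an explicit connect timeout, then wrap the raw connection
    // with rpc.NewClient, which is what rpc.Dial does internally
    // (minus the timeout).
    conn, err := net.DialTimeout("tcp", "localhost:8080", 3*time.Second)
    if err != nil {
        log.Fatal("dialing:", err)
    }
    client := rpc.NewClient(conn)
    defer client.Close()

    var reply string
    if err := client.Call("HelloService.Hello", "world", &reply); err != nil {
        log.Fatal(err)
    }
    fmt.Println(reply)
}
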
// NewClient returns a new Client to handle requests to the
// set of services at the other end of the connection.
// It adds a buffer to the write side of the connection so
// the header and payload are sent as a unit.
//
// The read and write halves of the connection are serialized independently,
// so no interlocking is required. However each half may be accessed
// concurrently so the implementation of conn should protect against
// concurrent reads or concurrent writes.
func NewClient(conn io.ReadWriteCloser) *Client {
    encBuf := bufio.NewWriter(conn)
    client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}
    return NewClientWithCodec(client)
}

// NewClientWithCodec is like NewClient but uses the specified
// codec to encode requests and decode responses.
func NewClientWithCodec(codec ClientCodec) *Client {
    client := &Client{
        codec:   codec,
        pending: make(map[uint64]*Call),
    }
    go client.input()
    return client
}
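
Mirroring the server-side codec example, NewClientWithCodec lets the client swap the wire format as well; for instance, the standard net/rpc/jsonrpc package builds its client this way. A minimal sketch (the server must use the matching JSON codec):

package main

import (
    "fmt"
    "log"
    "net"
    "net/rpc/jsonrpc"
)

func main() {
    conn, err := net.Dial("tcp", "localhost:8080")
    if err != nil {
        log.Fatal("dialing:", err)
    }
    // jsonrpc.NewClient wraps the connection in a JSON client codec and
    // calls rpc.NewClientWithCodec under the hood.
    client := jsonrpc.NewClient(conn)
    defer client.Close()

    var reply string
    if err := client.Call("HelloService.Hello", "world", &reply); err != nil {
        log.Fatal(err)
    }
    fmt.Println(reply)
}
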
func (client *Client) input() {
    var err error
    var response Response
    for err == nil {
        response = Response{}
        err = client.codec.ReadResponseHeader(&response)
        if err != nil {
            break
        }
        seq := response.Seq
        client.mutex.Lock()
        call := client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()

        switch {
        case call == nil:
            // We've got no pending call. That usually means that
            // WriteRequest partially failed, and call was already
            // removed; response is a server telling us about an
            // error reading request body. We should still attempt
            // to read error body, but there's no one to give it to.
            err = client.codec.ReadResponseBody(nil)
            if err != nil {
                err = errors.New("reading error body: " + err.Error())
            }
        case response.Error != "":
            // We've got an error response. Give this to the request;
            // any subsequent requests will get the ReadResponseBody
            // error if there is one.
            call.Error = ServerError(response.Error)
            err = client.codec.ReadResponseBody(nil)
            if err != nil {
                err = errors.New("reading error body: " + err.Error())
            }
            call.done()
        default:
            err = client.codec.ReadResponseBody(call.Reply)
            if err != nil {
                call.Error = errors.New("reading body " + err.Error())
            }
            call.done()
        }
    }
    // Terminate pending calls.
    client.reqMutex.Lock()
    client.mutex.Lock()
    client.shutdown = true
    closing := client.closing
    if err == io.EOF {
        if closing {
            err = ErrShutdown
        } else {
            err = io.ErrUnexpectedEOF
        }
    }
    for _, call := range client.pending {
        call.Error = err
        call.done()
    }
    client.mutex.Unlock()
    client.reqMutex.Unlock()
    if debugLog && err != io.EOF && !closing {
        log.Println("rpc: client protocol error:", err)
    }
}

The Call function

In a Call invocation, the Done field (a channel) of the Call struct is used to signal completion and to carry back any error. Internally, Call simply invokes Go, which builds the Call struct and then sends the request via the send function. An asynchronous usage sketch follows the listing.

// Go invokes the function asynchronously. It returns the Call structure representing
// the invocation. The done channel will signal when the call is complete by returning
// the same Call object. If done is nil, Go will allocate a new channel.
// If non-nil, done must be buffered or Go will deliberately crash.
func (client *Client) Go(serviceMethod string, args any, reply any, done chan *Call) *Call {
    call := new(Call)
    call.ServiceMethod = serviceMethod
    call.Args = args
    call.Reply = reply
    if done == nil {
        done = make(chan *Call, 10) // buffered.
    } else {
        // If caller passes done != nil, it must arrange that
        // done has enough buffer for the number of simultaneous
        // RPCs that will be using that channel. If the channel
        // is totally unbuffered, it's best not to run at all.
        if cap(done) == 0 {
            log.Panic("rpc: done channel is unbuffered")
        }
    }
    call.Done = done
    client.send(call)
    return call
}

// Call invokes the named function, waits for it to complete, and returns its error status.
func (client *Client) Call(serviceMethod string, args any, reply any) error {
    call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
    return call.Error
}
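
Since Call is only a synchronous wrapper around Go, the same call can also be issued asynchronously. A small sketch (assuming the HelloService server from earlier is running on localhost:8080):

package main

import (
    "fmt"
    "log"
    "net/rpc"
)

func main() {
    client, err := rpc.Dial("tcp", "localhost:8080")
    if err != nil {
        log.Fatal("dialing:", err)
    }
    defer client.Close()

    // Go returns immediately; the same *Call is delivered on Done once the
    // client's input goroutine has read the response.
    var reply string
    call := client.Go("HelloService.Hello", "world", &reply, make(chan *rpc.Call, 1))

    // ... other work could happen here ...

    finished := <-call.Done
    if finished.Error != nil {
        log.Fatal(finished.Error)
    }
    fmt.Println(reply)
}
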
func (client *Client) send(call *Call) {
    client.reqMutex.Lock()
    defer client.reqMutex.Unlock()

    // Register this call.
    client.mutex.Lock()
    if client.shutdown || client.closing {
        client.mutex.Unlock()
        call.Error = ErrShutdown
        call.done()
        return
    }
    seq := client.seq
    client.seq++
    client.pending[seq] = call
    client.mutex.Unlock()

    // Encode and send the request.
    client.request.Seq = seq
    client.request.ServiceMethod = call.ServiceMethod
    err := client.codec.WriteRequest(&client.request, call.Args)
    if err != nil {
        client.mutex.Lock()
        call = client.pending[seq]
        delete(client.pending, seq)
        client.mutex.Unlock()
        if call != nil {
            call.Error = err
            call.done()
        }
    }
}

Summary

Server side:

When the netpoll component handles network I/O, the net package uses the corresponding epoll model on Linux underneath, which makes registration and readiness handling efficient. When handling connections, however, the main goroutine accepts each connection and the server instance spawns a dedicated goroutine per connection via ServeConn. This already runs quite efficiently, but one problem remains: the number of goroutines is not bounded. Frameworks such as gnet solve this effectively; a simple way to cap it in the example server is sketched below.

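Purely as an illustration (this is not how net/rpc or gnet does it), a buffered channel used as a counting semaphore can cap the number of connection-serving goroutines; maxConns below is an arbitrary value chosen for this sketch:

package main

import (
    "log"
    "net"
    "net/rpc"
)

type HelloService struct{}

func (p *HelloService) Hello(request string, reply *string) error {
    *reply = "hello " + request
    return nil
}

func main() {
    srv := rpc.NewServer()
    srv.RegisterName("HelloService", new(HelloService))

    listener, err := net.Listen("tcp", ":8080")
    if err != nil {
        log.Fatal("ListenTCP error:", err)
    }

    // Counting semaphore: at most maxConns connections are served at once.
    const maxConns = 1024 // hypothetical limit chosen for this sketch
    sem := make(chan struct{}, maxConns)

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Fatal("Accept error:", err)
        }

        sem <- struct{}{} // blocks the accept loop when the limit is reached
        go func(c net.Conn) {
            defer func() { <-sem }()
            srv.ServeConn(c)
        }(conn)
    }
}
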
Client side:

My understanding here is still not very complete; it will be supplemented later.

TODO

When sending requests, the main goroutine relies on a separately created goroutine (the client's input loop) to process the returned response data.


References

《Go语言设计与实现》