func(c *pollCache) alloc() *pollDesc { lock(&c.lock) if c.first == nil { const pdSize = unsafe.Sizeof(pollDesc{}) n := pollBlockSize / pdSize if n == 0 { n = 1 } // Must be in non-GC memory because can be referenced // only from epoll/kqueue internals. mem := persistentalloc(n*pdSize, 0, &memstats.other_sys) for i := uintptr(0); i < n; i++ { pd := (*pollDesc)(add(mem, i*pdSize)) pd.link = c.first c.first = pd } } pd := c.first c.first = pd.link lockInit(&pd.lock, lockRankPollDesc) unlock(&c.lock) return pd }
if err := fd.pd.prepareRead(fd.isFile); err != nil { return-1, nil, "", err } for { s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { return s, rsa, "", err } switch err { case syscall.EINTR: continue case syscall.EAGAIN: if fd.pd.pollable() { // poll_runtime_pollWait if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } case syscall.ECONNABORTED: // This means that a socket on the listen // queue was closed before we Accept()ed it; // it's a silly error, so try again. continue } return-1, nil, errcall, err } }
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait funcpoll_runtime_pollWait(pd *pollDesc, mode int)int { errcode := netpollcheckerr(pd, int32(mode)) if errcode != pollNoError { return errcode } // As for now only Solaris, illumos, and AIX use level-triggered IO. if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" { netpollarm(pd, mode) } // 阻塞 for !netpollblock(pd, int32(mode), false) { errcode = netpollcheckerr(pd, int32(mode)) if errcode != pollNoError { return errcode } // Can happen if timeout has fired and unblocked us, // but before we had a chance to run, timeout has been reset. // Pretend it has not happened and retry. } return pollNoError }
// ServeConn runs the server on a single connection. // ServeConn blocks, serving the connection until the client hangs up. // The caller typically invokes ServeConn in a go statement. // ServeConn uses the gob wire format (see package gob) on the // connection. To use an alternate codec, use ServeCodec. // See NewClient's comment for information about concurrent access. func(server *Server) ServeConn(conn io.ReadWriteCloser) { buf := bufio.NewWriter(conn) srv := &gobServerCodec{ rwc: conn, dec: gob.NewDecoder(conn), enc: gob.NewEncoder(buf), encBuf: buf, } server.ServeCodec(srv) }
// ServeCodec is like ServeConn but uses the specified codec to // decode requests and encode responses. func(server *Server) ServeCodec(codec ServerCodec) { sending := new(sync.Mutex) wg := new(sync.WaitGroup) for { service, mtype, req, argv, replyv, keepReading, err := server.readRequest(codec) if err != nil { if debugLog && err != io.EOF { log.Println("rpc:", err) } if !keepReading { break } // send a response if we actually managed to read a header. if req != nil { server.sendResponse(sending, req, invalidRequest, codec, err.Error()) server.freeRequest(req) } continue } wg.Add(1) go service.call(server, sending, wg, mtype, req, argv, replyv, codec) } // We've seen that there are no more requests. // Wait for responses to be sent before closing codec. wg.Wait() codec.Close() }
// NewClient returns a new Client to handle requests to the // set of services at the other end of the connection. // It adds a buffer to the write side of the connection so // the header and payload are sent as a unit. // // The read and write halves of the connection are serialized independently, // so no interlocking is required. However each half may be accessed // concurrently so the implementation of conn should protect against // concurrent reads or concurrent writes. funcNewClient(conn io.ReadWriteCloser) *Client { encBuf := bufio.NewWriter(conn) client := &gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf} return NewClientWithCodec(client) }
// NewClientWithCodec is like NewClient but uses the specified // codec to encode requests and decode responses. funcNewClientWithCodec(codec ClientCodec) *Client { client := &Client{ codec: codec, pending: make(map[uint64]*Call), } go client.input() return client }
func(client *Client) input() { var err error var response Response for err == nil { response = Response{} err = client.codec.ReadResponseHeader(&response) if err != nil { break } seq := response.Seq client.mutex.Lock() call := client.pending[seq] delete(client.pending, seq) client.mutex.Unlock()
switch { case call == nil: // We've got no pending call. That usually means that // WriteRequest partially failed, and call was already // removed; response is a server telling us about an // error reading request body. We should still attempt // to read error body, but there's no one to give it to. err = client.codec.ReadResponseBody(nil) if err != nil { err = errors.New("reading error body: " + err.Error()) } case response.Error != "": // We've got an error response. Give this to the request; // any subsequent requests will get the ReadResponseBody // error if there is one. call.Error = ServerError(response.Error) err = client.codec.ReadResponseBody(nil) if err != nil { err = errors.New("reading error body: " + err.Error()) } call.done() default: err = client.codec.ReadResponseBody(call.Reply) if err != nil { call.Error = errors.New("reading body " + err.Error()) } call.done() } } // Terminate pending calls. client.reqMutex.Lock() client.mutex.Lock() client.shutdown = true closing := client.closing if err == io.EOF { if closing { err = ErrShutdown } else { err = io.ErrUnexpectedEOF } } for _, call := range client.pending { call.Error = err call.done() } client.mutex.Unlock() client.reqMutex.Unlock() if debugLog && err != io.EOF && !closing { log.Println("rpc: client protocol error:", err) } }
// Go invokes the function asynchronously. It returns the Call structure representing // the invocation. The done channel will signal when the call is complete by returning // the same Call object. If done is nil, Go will allocate a new channel. // If non-nil, done must be buffered or Go will deliberately crash. func(client *Client) Go(serviceMethod string, args any, reply any, done chan *Call) *Call { call := new(Call) call.ServiceMethod = serviceMethod call.Args = args call.Reply = reply if done == nil { done = make(chan *Call, 10) // buffered. } else { // If caller passes done != nil, it must arrange that // done has enough buffer for the number of simultaneous // RPCs that will be using that channel. If the channel // is totally unbuffered, it's best not to run at all. ifcap(done) == 0 { log.Panic("rpc: done channel is unbuffered") } } call.Done = done client.send(call) return call }
// Call invokes the named function, waits for it to complete, and returns its error status. func(client *Client) Call(serviceMethod string, args any, reply any) error { call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done return call.Error }