From 90cc3493e1ae58b9c5949bdaa665cb1c41862a78 Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Fri, 19 Nov 2021 16:40:01 -0500 Subject: [PATCH 1/5] Bugfix. Don't block when max concurrent RPC calls is exceeded. --- go.mod | 2 +- go.sum | 1 + server/server.go | 36 ++++++++---------------------- server/server_test.go | 51 +++++++++++++++++-------------------------- 4 files changed, 31 insertions(+), 59 deletions(-) diff --git a/go.mod b/go.mod index f9ea1f6a..1e9aaf47 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,6 @@ go 1.16 require ( github.com/kylelemons/godebug v1.1.0 - github.com/stretchr/testify v1.7.0 // indirect + github.com/stretchr/testify v1.7.0 github.com/tinylib/msgp v1.1.5 ) diff --git a/go.sum b/go.sum index aaab4d62..7d854b18 100644 --- a/go.sum +++ b/go.sum @@ -33,6 +33,7 @@ golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4f golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/server/server.go b/server/server.go index 79b2dd98..8ee2601e 100644 --- a/server/server.go +++ b/server/server.go @@ -103,10 +103,6 @@ type Server struct { // call. It is closed when the acknowledgement is received. starting <-chan struct{} - // full is non-nil if a start() is waiting for a space in ongoing to - // free up. It is closed and set to nil when the next call returns. - full chan<- struct{} - // drain is non-nil when Shutdown starts and is closed by the last // call to return. drain chan struct{} @@ -225,29 +221,17 @@ func (srv *Server) start(ctx context.Context, m *Method, r capnp.Recv) capnp.Pip // Acquire an ID (semaphore). id := srv.nextID() if id == -1 { - full := make(chan struct{}) - srv.full = full - srv.mu.Unlock() - select { - case <-full: - case <-ctx.Done(): - srv.mu.Lock() - srv.starting = nil - close(starting) - srv.full = nil // full could be nil or non-nil, ensure it is nil. - srv.mu.Unlock() - r.Reject(ctx.Err()) - return nil - } - srv.mu.Lock() - id = srv.nextID() + defer srv.mu.Unlock() + + err := errors.New(errors.Overloaded, "capnp server", "max concurrent calls exceeded") if srv.drain != nil { srv.starting = nil close(starting) - srv.mu.Unlock() - r.Reject(errors.New(errors.Failed, "capnp server", "call after shutdown")) - return nil + err = errors.New(errors.Failed, "capnp server", "call after shutdown") } + + r.Reject(err) + return nil } // Bookkeeping: set starting to indicate we're waiting for an ack and @@ -270,17 +254,15 @@ func (srv *Server) start(ctx context.Context, m *Method, r capnp.Recv) capnp.Pip aq.reject(err) r.Returner.Return(err) } + srv.mu.Lock() srv.ongoing[id].cancel() srv.ongoing[id] = cstate{} if srv.drain != nil && !srv.hasOngoing() { close(srv.drain) } - if srv.full != nil { - close(srv.full) - srv.full = nil - } srv.mu.Unlock() + close(done) }() var pcall capnp.PipelineCaller diff --git a/server/server_test.go b/server/server_test.go index 235b9902..3acd2849 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -6,10 +6,12 @@ import ( "sync" "testing" + "context" + "capnproto.org/go/capnp/v3" air "capnproto.org/go/capnp/v3/internal/aircraftlib" "capnproto.org/go/capnp/v3/server" - "context" + "github.com/stretchr/testify/assert" ) type echoImpl struct{} @@ -103,25 +105,6 @@ func (seq *callSeq) GetNumber(ctx context.Context, call air.CallSequence_getNumb return nil } -type lockCallSeq struct { - n uint32 - mu sync.Mutex -} - -func (seq *lockCallSeq) GetNumber(ctx context.Context, call air.CallSequence_getNumber) error { - seq.mu.Lock() - defer seq.mu.Unlock() - call.Ack() - - r, err := call.AllocResults() - if err != nil { - return err - } - r.SetN(seq.n) - seq.n++ - return nil -} - func TestServerCallOrder(t *testing.T) { tests := []struct { name string @@ -167,30 +150,36 @@ func TestServerCallOrder(t *testing.T) { } func TestServerMaxConcurrentCalls(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + wait := make(chan struct{}) echo := air.Echo_ServerToClient(blockingEchoImpl{wait}, &server.Policy{ MaxConcurrentCalls: 2, }) defer echo.Client.Release() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + call1, finish := echo.Echo(ctx, nil) defer finish() + call2, finish := echo.Echo(ctx, nil) defer finish() + go close(wait) + call3, finish := echo.Echo(ctx, nil) defer finish() + <-wait - if _, err := call1.Struct(); err != nil { - t.Error("Echo #1:", err) - } - if _, err := call2.Struct(); err != nil { - t.Error("Echo #2:", err) - } - if _, err := call3.Struct(); err != nil { - t.Error("Echo #3:", err) - } + + _, err := call1.Struct() + assert.NoError(t, err, "call1 should succeed") + + _, err = call2.Struct() + assert.NoError(t, err, "call2 should succeed") + + _, err = call3.Struct() + assert.Error(t, err, "call3 should fail") } func TestServerShutdown(t *testing.T) { From 785aa8dc362294dad959a429c6ebc0a593c207e6 Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Sun, 5 Dec 2021 17:11:01 -0500 Subject: [PATCH 2/5] Implement unbounded, asynchronous send buffer. Fix server.SendPolicy docstring. --- go.mod | 3 +- go.sum | 2 +- internal/errors/errors.go | 2 +- internal/errors/errors_test.go | 12 ++ internal/mpsc/mpsc_test.go | 3 +- rpc/answer.go | 12 +- rpc/import.go | 27 ++- rpc/question.go | 55 +++--- rpc/rpc.go | 340 ++++++++++++++++++--------------- rpc/send.go | 108 +++++++++++ server/server.go | 41 ++-- server/server_test.go | 51 +++-- 12 files changed, 419 insertions(+), 237 deletions(-) create mode 100644 rpc/send.go diff --git a/go.mod b/go.mod index 1e9aaf47..7904e9b4 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/kylelemons/godebug v1.1.0 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.7.0 // indirect github.com/tinylib/msgp v1.1.5 + golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 ) diff --git a/go.sum b/go.sum index 7d854b18..f6d8c1d6 100644 --- a/go.sum +++ b/go.sum @@ -21,6 +21,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -33,7 +34,6 @@ golang.org/x/tools v0.0.0-20201022035929-9cf592e881e9/go.mod h1:emZCQorbCU4vsT4f golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/errors/errors.go b/internal/errors/errors.go index c9dd9914..ef1beb01 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -32,7 +32,7 @@ func (e *capnpError) GoString() string { // The returned error's type will match err's type. func Annotate(prefix, msg string, err error) error { if err == nil { - panic("Annotate on nil error") + return nil } ce, ok := err.(*capnpError) if !ok { diff --git a/internal/errors/errors_test.go b/internal/errors/errors_test.go index 37ad08b9..ec8c3b2e 100644 --- a/internal/errors/errors_test.go +++ b/internal/errors/errors_test.go @@ -3,6 +3,8 @@ package errors import ( "errors" "testing" + + "github.com/stretchr/testify/assert" ) func TestErrorString(t *testing.T) { @@ -52,6 +54,11 @@ func TestAnnotate(t *testing.T) { want string wantType Type }{ + { + prefix: "prefix", + msg: "context", + err: nil, + }, { msg: "context", err: errors.New("goofed"), @@ -113,6 +120,11 @@ func TestAnnotate(t *testing.T) { } for _, test := range tests { got := Annotate(test.prefix, test.msg, test.err) + if test.err == nil { + assert.Nil(t, got) + continue + } + if got.Error() != test.want { t.Errorf("Annotate(%q, %q, %#v).Error() = %q; %q", test.prefix, test.msg, test.err, got.Error(), test.want) } diff --git a/internal/mpsc/mpsc_test.go b/internal/mpsc/mpsc_test.go index e8a021c9..d902ae8d 100644 --- a/internal/mpsc/mpsc_test.go +++ b/internal/mpsc/mpsc_test.go @@ -22,7 +22,8 @@ func TestRecvEmpty(t *testing.T) { q := New() // Recv() on an empty queue should block until the context is canceled. - ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*10) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() v, err := q.Recv(ctx) assert.Equal(t, ctx.Err(), err, "Returned error is not ctx.Err()") diff --git a/rpc/answer.go b/rpc/answer.go index f6a99ea3..3d96d5db 100644 --- a/rpc/answer.go +++ b/rpc/answer.go @@ -181,7 +181,7 @@ func (ans *answer) Return(e error) { default: ans.c.tasks.Done() // added by handleCall if err := ans.c.shutdown(err); err != nil { - ans.c.report(err) + ans.c.er.ReportError(err) } // shutdown released c.mu rl.release() @@ -210,7 +210,7 @@ func (ans *answer) sendReturn(cstates []capnp.ClientState) (releaseList, error) var err error ans.exportRefs, err = ans.c.fillPayloadCapTable(ans.results, ans.resultCapTable, cstates) if err != nil { - ans.c.report(annotate(err).errorf("send return")) + ans.c.er.annotatef(err, "send return") // Continue. Don't fail to send return if cap table isn't fully filled. } @@ -220,7 +220,7 @@ func (ans *answer) sendReturn(cstates []capnp.ClientState) (releaseList, error) fin := ans.flags&finishReceived != 0 ans.c.mu.Unlock() if err := ans.sendMsg(); err != nil { - ans.c.reportf("send return: %v", err) + ans.c.er.reportf("send return: %v", err) } if fin { ans.releaseMsg() @@ -258,13 +258,13 @@ func (ans *answer) sendException(e error) releaseList { fin := ans.flags&finishReceived != 0 ans.c.mu.Unlock() if exc, err := ans.ret.NewException(); err != nil { - ans.c.reportf("send exception: %v", err) + ans.c.er.reportf("send exception: %v", err) } else { exc.SetType(rpccp.Exception_Type(errors.TypeOf(e))) if err := exc.SetReason(e.Error()); err != nil { - ans.c.reportf("send exception: %v", err) + ans.c.er.reportf("send exception: %v", err) } else if err := ans.sendMsg(); err != nil { - ans.c.reportf("send return: %v", err) + ans.c.er.reportf("send return: %v", err) } } if fin { diff --git a/rpc/import.go b/rpc/import.go index b25e1d25..7416ddf3 100644 --- a/rpc/import.go +++ b/rpc/import.go @@ -255,30 +255,29 @@ func (ic *importClient) Brand() capnp.Brand { func (ic *importClient) Shutdown() { ic.c.mu.Lock() + defer ic.c.mu.Unlock() + if !ic.c.startTask() { - ic.c.mu.Unlock() return } defer ic.c.tasks.Done() + ent := ic.c.imports[ic.id] if ic.generation != ent.generation { // A new reference was added concurrently with the Shutdown. See // impent.generation documentation for an explanation. - ic.c.mu.Unlock() return } + delete(ic.c.imports, ic.id) - err := ic.c.sendMessage(context.Background(), func(msg rpccp.Message) error { - rel, err := msg.NewRelease() - if err != nil { + + ic.c.sendq.SendAsync(context.Background(), + prepFunc(func(m rpccp.Message) error { + rel, err := m.NewRelease() + if err == nil { + rel.SetId(uint32(ic.id)) + rel.SetReferenceCount(uint32(ent.wireRefs)) + } return err - } - rel.SetId(uint32(ic.id)) - rel.SetReferenceCount(uint32(ent.wireRefs)) - return nil - }) - ic.c.mu.Unlock() - if err != nil { - ic.c.report(annotate(err).errorf("send release")) - } + }), ic.c.er.annotater("send releases")) } diff --git a/rpc/question.go b/rpc/question.go index 7dfc3738..27456412 100644 --- a/rpc/question.go +++ b/rpc/question.go @@ -75,39 +75,44 @@ func (q *question) handleCancel(ctx context.Context) { } q.c.mu.Lock() + defer q.c.mu.Unlock() + + // Promise already fulfilled? if q.flags&finished != 0 { - // Promise already fulfilled. - q.c.mu.Unlock() return } + q.flags |= finished q.release = func() {} - err := q.c.sendMessage(q.c.bgctx, func(msg rpccp.Message) error { - fin, err := msg.NewFinish() - if err != nil { - return err + q.c.sendq.SendAsync(q.c.bgctx, prepFunc(func(m rpccp.Message) error { + fin, err := m.NewFinish() + if err == nil { + fin.SetQuestionId(uint32(q.id)) + fin.SetReleaseResultCaps(true) } - fin.SetQuestionId(uint32(q.id)) - fin.SetReleaseResultCaps(true) - return nil - }) - if err == nil { - q.flags |= finishSent - } else { - select { - case <-q.c.bgctx.Done(): - default: - q.c.report(annotate(err).errorf("send finish")) + return err + }), errorReporterFunc(func(err error) { + q.c.mu.Lock() + defer q.c.mu.Unlock() + + if err == nil { + q.flags |= finishSent + } else { + select { + case <-q.c.bgctx.Done(): + default: + q.c.er.annotatef(err, "send finish") + } } - } - close(q.finishMsgSend) - q.c.mu.Unlock() + close(q.finishMsgSend) + q.c.mu.Unlock() - q.p.Reject(rejectErr) - if q.bootstrapPromise != nil { - q.bootstrapPromise.Fulfill(q.p.Answer().Client()) - q.p.ReleaseClients() - } + q.p.Reject(rejectErr) + if q.bootstrapPromise != nil { + q.bootstrapPromise.Fulfill(q.p.Answer().Client()) + q.p.ReleaseClients() + } + })) } func (q *question) PipelineSend(ctx context.Context, transform []capnp.PipelineOp, s capnp.Send) (*capnp.Answer, capnp.ReleaseFunc) { diff --git a/rpc/rpc.go b/rpc/rpc.go index 6d864ae9..8e91a50f 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -10,6 +10,7 @@ import ( "capnproto.org/go/capnp/v3" "capnproto.org/go/capnp/v3/internal/errors" rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc" + "golang.org/x/sync/errgroup" ) /* @@ -67,7 +68,7 @@ is a common source of errors and/or inefficiencies. // It is safe to use from multiple goroutines. type Conn struct { bootstrap *capnp.Client - reporter ErrorReporter + er annotatingErrReporter abortTimeout time.Duration // bgctx is a Context that is canceled when shutdown starts. @@ -94,6 +95,8 @@ type Conn struct { // See the above comment for a longer explanation. sendCond chan struct{} + sendq *sendQueue + // Tables questions []*question questionID idgen @@ -137,6 +140,11 @@ type ErrorReporter interface { // requests from the transport. func NewConn(t Transport, opts *Options) *Conn { bgctx, bgcancel := context.WithCancel(context.Background()) + + // We use an errgroup to link the lifetime of background tasks + // to each other. + g, bgctx := errgroup.WithContext(bgctx) + c := &Conn{ transport: t, shut: make(chan struct{}), @@ -144,37 +152,48 @@ func NewConn(t Transport, opts *Options) *Conn { bgcancel: bgcancel, answers: make(map[answerID]*answer), imports: make(map[importID]*impent), + sendq: newSendQueue(), } if opts != nil { c.bootstrap = opts.BootstrapClient - c.reporter = opts.ErrorReporter + c.er.ErrorReporter = opts.ErrorReporter c.abortTimeout = opts.AbortTimeout } if c.abortTimeout == 0 { c.abortTimeout = 100 * time.Millisecond } - c.tasks.Add(1) + + // start background tasks + for _, f := range []func(context.Context) error{ + c.send, + c.receive, + } { + c.tasks.Add(1) + g.Go(c.newTask(f)) + } + + // monitor background tasks go func() { - abortErr := c.receive(c.bgctx) - c.tasks.Done() + if err := g.Wait(); err != nil { + c.mu.Lock() // shutdown unlocks c.mu. - c.mu.Lock() - select { - case <-c.bgctx.Done(): - c.mu.Unlock() - default: - if abortErr != nil { - c.report(abortErr) - } - // shutdown unlocks c.mu. - if err := c.shutdown(abortErr); err != nil { - c.report(err) + c.er.ReportError(err) + if err = c.shutdown(err); err != nil { + c.er.ReportError(err) } } }() + return c } +func (c *Conn) newTask(f func(context.Context) error) func() error { + return func() error { + defer c.tasks.Done() + return f(c.bgctx) + } +} + // Bootstrap returns the remote vat's bootstrap interface. This creates // a new client that the caller is responsible for releasing. func (c *Conn) Bootstrap(ctx context.Context) *capnp.Client { @@ -184,6 +203,7 @@ func (c *Conn) Bootstrap(ctx context.Context) *capnp.Client { return capnp.ErrorClient(disconnected("connection closed")) } defer c.tasks.Done() + q := c.newQuestion(capnp.Method{}) bootCtx, cancel := context.WithCancel(ctx) bc, cp := capnp.NewPromisedClient(bootstrapClient{ @@ -191,27 +211,26 @@ func (c *Conn) Bootstrap(ctx context.Context) *capnp.Client { cancel: cancel, }) q.bootstrapPromise = cp // safe to write because we're still holding c.mu + c.mu.Unlock() - err := c.sendMessage(ctx, func(msg rpccp.Message) error { - boot, err := msg.NewBootstrap() - if err != nil { - return err + if err := c.sendq.Send(ctx, prepFunc(func(m rpccp.Message) error { + boot, err := m.NewBootstrap() + if err == nil { + boot.SetQuestionId(uint32(q.id)) } - boot.SetQuestionId(uint32(q.id)) - return nil - }) - if err != nil { + return err + })); err != nil { c.questions[q.id] = nil c.questionID.remove(uint32(q.id)) - c.mu.Unlock() return capnp.ErrorClient(annotate(err).errorf("bootstrap")) } + c.tasks.Add(1) go func() { defer c.tasks.Done() q.handleCancel(bootCtx) }() - c.mu.Unlock() + return bc } @@ -345,6 +364,41 @@ closeTransport: return nil } +// send transmits messages to the remote vat. It runs in a background goroutine. +func (c *Conn) send(ctx context.Context) error { + for { + if err := c.sendMessage(ctx, c.sendq.Recv(ctx)); err != nil { + return err + } + } +} + +func (c *Conn) sendMessage(ctx context.Context, p preparer) error { + c.mu.Lock() + defer c.mu.Unlock() + + if err := c.tryLockSender(ctx); err != nil { + return err + } + defer c.unlockSender() + + msg, send, release, err := c.transport.NewMessage(ctx) + if err != nil { + return errorf("create message: %v", err) + } + defer release() + + if err := p.Prepare(msg); err != nil { + return errorf("build message: %v", err) + } + + if err = send(); err != nil { + return errorf("send message: %v", err) + } + + return nil +} + // receive receives and dispatches messages coming from c.transport. receive // runs in a background goroutine. // @@ -363,24 +417,24 @@ func (c *Conn) receive(ctx context.Context) error { exc, err := recv.Abort() if err != nil { releaseRecv() - c.reportf("read abort: %v", err) + c.er.reportf("read abort: %v", err) return nil } reason, err := exc.Reason() if err != nil { releaseRecv() - c.reportf("read abort reason: %v", err) + c.er.reportf("read abort reason: %v", err) return nil } ty := exc.Type() releaseRecv() - c.report(errors.New(errors.Type(ty), "rpc", "remote abort: "+reason)) + c.er.ReportError(errors.New(errors.Type(ty), "rpc", "remote abort: "+reason)) return nil case rpccp.Message_Which_bootstrap: bootstrap, err := recv.Bootstrap() if err != nil { releaseRecv() - c.reportf("read bootstrap: %v", err) + c.er.reportf("read bootstrap: %v", err) continue } qid := answerID(bootstrap.QuestionId()) @@ -392,7 +446,7 @@ func (c *Conn) receive(ctx context.Context) error { call, err := recv.Call() if err != nil { releaseRecv() - c.reportf("read call: %v", err) + c.er.reportf("read call: %v", err) continue } if err := c.handleCall(ctx, call, releaseRecv); err != nil { @@ -402,7 +456,7 @@ func (c *Conn) receive(ctx context.Context) error { ret, err := recv.Return() if err != nil { releaseRecv() - c.reportf("read return: %v", err) + c.er.reportf("read return: %v", err) continue } if err := c.handleReturn(ctx, ret, releaseRecv); err != nil { @@ -412,7 +466,7 @@ func (c *Conn) receive(ctx context.Context) error { fin, err := recv.Finish() if err != nil { releaseRecv() - c.reportf("read finish: %v", err) + c.er.reportf("read finish: %v", err) continue } qid := answerID(fin.QuestionId()) @@ -425,7 +479,7 @@ func (c *Conn) receive(ctx context.Context) error { rel, err := recv.Release() if err != nil { releaseRecv() - c.reportf("read release: %v", err) + c.er.reportf("read release: %v", err) continue } id := exportID(rel.Id()) @@ -438,20 +492,15 @@ func (c *Conn) receive(ctx context.Context) error { d, err := recv.Disembargo() if err != nil { releaseRecv() - c.reportf("read disembargo: %v", err) + c.er.reportf("read disembargo: %v", err) continue } - err = c.handleDisembargo(ctx, d) - releaseRecv() + err = c.handleDisembargo(ctx, d, releaseRecv) if err != nil { return err } default: - err := c.handleUnknownMessage(ctx, recv) - releaseRecv() - if err != nil { - return err - } + c.handleUnknownMessage(ctx, recv, releaseRecv) } } } @@ -475,7 +524,7 @@ func (c *Conn) handleBootstrap(ctx context.Context, id answerID) error { c.answers[id] = errorAnswer(c, id, err) c.unlockSender() c.mu.Unlock() - c.report(err) + c.er.ReportError(err) return nil } ret.SetAnswerId(uint32(id)) @@ -520,23 +569,19 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn id := answerID(call.QuestionId()) if call.SendResultsTo().Which() != rpccp.Call_sendResultsTo_Which_caller { // TODO(someday): handle SendResultsTo.yourself - c.reportf("incoming call: results destination is not caller") - c.mu.Lock() - err := c.sendMessage(ctx, func(m rpccp.Message) error { + c.er.reportf("incoming call: results destination is not caller") + + c.sendq.SendAsync(ctx, prepFunc(func(m rpccp.Message) error { + defer releaseCall() + mm, err := m.NewUnimplemented() if err != nil { return err } - if err := mm.SetCall(call); err != nil { - return err - } - return nil - }) - c.mu.Unlock() - releaseCall() - if err != nil { - c.report(annotate(err).errorf("incoming call: send unimplemented")) - } + + return mm.SetCall(call) + }), c.er.annotater("incoming call: send unimplemented")) + return nil } @@ -564,7 +609,7 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn c.answers[id] = errorAnswer(c, id, err) c.unlockSender() c.mu.Unlock() - c.report(err) + c.er.ReportError(err) clearCapTable(call.Message()) releaseCall() return nil @@ -587,7 +632,7 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn rl := ans.sendException(parseErr) c.unlockSender() c.mu.Unlock() - c.report(parseErr) + c.er.ReportError(parseErr) rl.release() clearCapTable(call.Message()) releaseCall() @@ -673,7 +718,7 @@ func (c *Conn) handleCall(ctx context.Context, call rpccp.Call, releaseCall capn rl.release() clearCapTable(call.Message()) releaseCall() - c.report(err) + c.er.ReportError(err) return nil } sub, err := capnp.Transform(content, p.target.transform) @@ -855,7 +900,7 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, releaseRet ca } pr := c.parseReturn(ret, q.called) // fills in CapTable if pr.parseFailed { - c.report(annotate(pr.err).errorf("incoming return")) + c.er.annotatef(pr.err, "incoming return") } switch { case q.bootstrapPromise != nil && pr.err == nil: @@ -898,7 +943,7 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, releaseRet ca if err := c.tryLockSender(ctx); err != nil { close(q.finishMsgSend) c.mu.Unlock() - c.report(annotate(err).errorf("incoming return: send finish")) + c.er.annotatef(err, "incoming return: send finish") return nil } c.mu.Unlock() @@ -910,18 +955,18 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, releaseRet ca for i := range pr.disembargoes { msg, send, release, err := c.transport.NewMessage(ctx) if err != nil { - c.report(errorf("incoming return: send disembargo: create message: %v", err)) + c.er.reportf("incoming return: send disembargo: create message: %v", err) continue } if err := pr.disembargoes[i].buildDisembargo(msg); err != nil { release() - c.report(annotate(err).errorf("incoming return")) + c.er.annotatef(err, "incoming return") continue } err = send() release() if err != nil { - c.report(errorf("incoming return: send disembargo: %v", err)) + c.er.reportf("incoming return: send disembargo: %v", err) continue } } @@ -934,7 +979,7 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, releaseRet ca c.unlockSender() close(q.finishMsgSend) c.mu.Unlock() - c.report(annotate(err).errorf("incoming return: send finish")) + c.er.annotatef(err, "incoming return: send finish") return nil } fin, err := msg.NewFinish() @@ -944,7 +989,7 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, releaseRet ca c.unlockSender() close(q.finishMsgSend) c.mu.Unlock() - c.report(errorf("incoming return: send finish: build message: %v", err)) + c.er.reportf("incoming return: send finish: build message: %v", err) return nil } fin.SetQuestionId(uint32(qid)) @@ -956,7 +1001,7 @@ func (c *Conn) handleReturn(ctx context.Context, ret rpccp.Return, releaseRet ca c.unlockSender() close(q.finishMsgSend) c.mu.Unlock() - c.report(errorf("incoming return: send finish: build message: %v", err)) + c.er.reportf("incoming return: send finish: build message: %v", err) return nil } } @@ -1132,7 +1177,7 @@ func (c *Conn) recvPayload(payload rpccp.Payload) (_ capnp.Ptr, locals uintSet, if err != nil { // Don't allow unreadable capability table to stop other results, // just present an empty capability table. - c.reportf("read payload: capability table: %v", err) + c.er.reportf("read payload: capability table: %v", err) return p, nil, nil } mtab := make([]*capnp.Client, ptab.Len()) @@ -1163,18 +1208,22 @@ func (c *Conn) handleRelease(ctx context.Context, id exportID, count uint32) err return nil } -func (c *Conn) handleDisembargo(ctx context.Context, d rpccp.Disembargo) error { +func (c *Conn) handleDisembargo(ctx context.Context, d rpccp.Disembargo, releaseCall capnp.ReleaseFunc) error { dtarget, err := d.Target() if err != nil { + releaseCall() return errorf("incoming disembargo: read target: %v", err) } var tgt parsedMessageTarget if err := parseMessageTarget(&tgt, dtarget); err != nil { - return annotate(err).errorf("incoming disembargo") + releaseCall() + return annotatef(err, "incoming disembargo") } switch d.Context().Which() { case rpccp.Disembargo_context_Which_receiverLoopback: + defer releaseCall() + id := embargoID(d.Context().ReceiverLoopback()) c.mu.Lock() e := c.findEmbargo(id) @@ -1187,38 +1236,46 @@ func (c *Conn) handleDisembargo(ctx context.Context, d rpccp.Disembargo) error { c.embargoID.remove(uint32(id)) c.mu.Unlock() e.lift() + case rpccp.Disembargo_context_Which_senderLoopback: c.mu.Lock() if tgt.which != rpccp.MessageTarget_Which_promisedAnswer { c.mu.Unlock() + releaseCall() return fail("incoming disembargo: sender loopback: target is not a promised answer") } ans := c.answers[tgt.promisedAnswer] if ans == nil { c.mu.Unlock() + releaseCall() return errorf("incoming disembargo: unknown answer ID %d", tgt.promisedAnswer) } if ans.flags&returnSent == 0 { c.mu.Unlock() + releaseCall() return errorf("incoming disembargo: answer ID %d has not sent return", tgt.promisedAnswer) } if ans.err != nil { c.mu.Unlock() + releaseCall() return errorf("incoming disembargo: answer ID %d returned exception", tgt.promisedAnswer) } content, err := ans.results.Content() if err != nil { c.mu.Unlock() + releaseCall() return errorf("incoming disembargo: read answer ID %d: %v", tgt.promisedAnswer, err) } ptr, err := capnp.Transform(content, tgt.transform) if err != nil { c.mu.Unlock() + releaseCall() return errorf("incoming disembargo: read answer ID %d: %v", tgt.promisedAnswer, err) } iface := ptr.Interface() if !iface.IsValid() || int64(iface.Capability()) >= int64(len(ans.resultCapTable)) { c.mu.Unlock() + releaseCall() return fail("incoming disembargo: sender loopback requested on a capability that is not an import") } client := ans.resultCapTable[iface.Capability()].AddRef() @@ -1228,63 +1285,55 @@ func (c *Conn) handleDisembargo(ctx context.Context, d rpccp.Disembargo) error { if !ok || imp.c != c { c.mu.Unlock() client.Release() + releaseCall() return fail("incoming disembargo: sender loopback requested on a capability that is not an import") } + c.mu.Unlock() // TODO(maybe): check generation? // Since this Cap'n Proto RPC implementation does not send imports // unless they are fully dequeued, we can just immediately loop back. id := d.Context().SenderLoopback() - err = c.sendMessage(ctx, func(msg rpccp.Message) error { - d, err := msg.NewDisembargo() + c.sendq.SendAsync(ctx, prepFunc(func(m rpccp.Message) error { + defer releaseCall() + defer client.Release() + + d, err := m.NewDisembargo() if err != nil { return err } + tgt, err := d.NewTarget() - if err != nil { - return err + if err == nil { + tgt.SetImportedCap(uint32(imp.id)) + d.Context().SetReceiverLoopback(id) } - tgt.SetImportedCap(uint32(imp.id)) - d.Context().SetReceiverLoopback(id) - return nil - }) - c.mu.Unlock() - client.Release() - if err != nil { - c.report(annotate(err).errorf("incoming disembargo: send receiver loopback")) - } + + return err + }), c.er.annotater("incoming disembargo: send receiver loopback")) + default: - c.reportf("incoming disembargo: context %v not implemented", d.Context().Which()) - c.mu.Lock() - err := c.sendMessage(ctx, func(msg rpccp.Message) error { - mm, err := msg.NewUnimplemented() + c.er.reportf("incoming disembargo: context %v not implemented", d.Context().Which()) + c.sendq.SendAsync(ctx, prepFunc(func(m rpccp.Message) error { + defer releaseCall() + um, err := m.NewUnimplemented() if err != nil { return err } - if err := mm.SetDisembargo(d); err != nil { - return err - } - return nil - }) - c.mu.Unlock() - if err != nil { - c.report(annotate(err).errorf("incoming disembargo: send unimplemented")) - } + return um.SetDisembargo(d) + }), c.er.annotater("incoming disembargo: send unimplemented")) } + return nil } -func (c *Conn) handleUnknownMessage(ctx context.Context, recv rpccp.Message) error { - c.reportf("unknown message type %v from remote", recv.Which()) - c.mu.Lock() - err := c.sendMessage(ctx, func(msg rpccp.Message) error { - return msg.SetUnimplemented(recv) - }) - c.mu.Unlock() - if err != nil { - c.report(annotate(err).errorf("send unimplemented")) - } - return nil +func (c *Conn) handleUnknownMessage(ctx context.Context, recv rpccp.Message, releaseRecv capnp.ReleaseFunc) { + c.er.reportf("unknown message type %v from remote", recv.Which()) + c.sendq.SendAsync(ctx, + prepFunc(func(m rpccp.Message) error { + return m.SetUnimplemented(recv) + }), + c.er.annotater("send unimplemented")) } // startTask increments c.tasks if c is not shutting down. @@ -1301,38 +1350,6 @@ func (c *Conn) startTask() bool { } } -// sendMessage creates a new message on the Sender, calls f to build it, -// and sends it if f does not return an error. When f returns, the -// message must have a nil CapTable. The caller must be holding onto -// c.mu. While f is being called, it will be holding onto the sender -// lock, but not c.mu. -func (c *Conn) sendMessage(ctx context.Context, f func(msg rpccp.Message) error) error { - if err := c.tryLockSender(ctx); err != nil { - return err - } - c.mu.Unlock() - msg, send, release, err := c.transport.NewMessage(ctx) - if err != nil { - c.mu.Lock() - c.unlockSender() - return errorf("create message: %v", err) - } - if err := f(msg); err != nil { - release() - c.mu.Lock() - c.unlockSender() - return errorf("build message: %v", err) - } - err = send() - release() - c.mu.Lock() - c.unlockSender() - if err != nil { - return errorf("send message: %v", err) - } - return nil -} - // tryLockSender attempts to acquire the sender lock, returning an error // if either the Context is Done or c starts shutdown before the lock // can be acquired. The caller must be holding c.mu. @@ -1384,23 +1401,6 @@ func (c *Conn) unlockSender() { c.sendCond = nil } -// report sends an error to c's reporter. The caller does not have to -// be holding c.mu. -func (c *Conn) report(err error) { - if c.reporter == nil { - return - } - c.reporter.ReportError(err) -} - -// reportf formats an error and sends it to c's reporter. -func (c *Conn) reportf(format string, args ...interface{}) { - if c.reporter == nil { - return - } - c.reporter.ReportError(errorf(format, args...)) -} - func clearCapTable(msg *capnp.Message) { releaseList(msg.CapTable).release() msg.CapTable = nil @@ -1422,6 +1422,34 @@ func unimplementedf(format string, args ...interface{}) error { return errors.New(errors.Unimplemented, "rpc", fmt.Sprintf(format, args...)) } +func annotatef(err error, format string, args ...interface{}) error { + return errors.Annotate("rpc", fmt.Sprintf(format, args...), err) +} + +type annotatingErrReporter struct { + ErrorReporter +} + +func (er annotatingErrReporter) ReportError(err error) { + if er.ErrorReporter != nil && err != nil { + er.ErrorReporter.ReportError(err) + } +} + +func (er annotatingErrReporter) reportf(format string, args ...interface{}) { + er.ReportError(errorf(format, args...)) +} + +func (er annotatingErrReporter) annotatef(err error, format string, args ...interface{}) { + er.ReportError(annotatef(err, format, args...)) +} + +func (er annotatingErrReporter) annotater(format string, args ...interface{}) errorReporterFunc { + return func(err error) { + er.ReportError(annotatef(err, format, args...)) + } +} + type annotater struct { err error } diff --git a/rpc/send.go b/rpc/send.go new file mode 100644 index 00000000..bedb462c --- /dev/null +++ b/rpc/send.go @@ -0,0 +1,108 @@ +package rpc + +import ( + "context" + "unsafe" + + "capnproto.org/go/capnp/v3/internal/mpsc" + rpccp "capnproto.org/go/capnp/v3/std/capnp/rpc" +) + +type preparer interface { + Prepare(rpccp.Message) error +} + +type sendQueue mpsc.Queue + +func newSendQueue() *sendQueue { return (*sendQueue)(mpsc.New()) } + +func (sq *sendQueue) Send(ctx context.Context, p preparer) (err error) { + cherr := cherrPool.Get() + defer cherrPool.Put(cherr) + + sq.SendAsync(ctx, p, cherr) + + select { + case err = <-cherr: + case <-ctx.Done(): + err = ctx.Err() + } + + return +} + +func (sq *sendQueue) SendAsync(ctx context.Context, p preparer, r ErrorReporter) { + (*mpsc.Queue)(sq).Send(sendReq{ + er: r, + preparer: p, + }) +} + +func (sq *sendQueue) Recv(ctx context.Context) preparer { + v, err := (*mpsc.Queue)(sq).Recv(ctx) + if err != nil { + return sendFailure(err) + } + + return *(*preparer)(unsafe.Pointer(&v)) +} + +type prepFunc func(rpccp.Message) error + +func (prepare prepFunc) Prepare(m rpccp.Message) error { return prepare(m) } + +func sendFailure(err error) prepFunc { + return func(rpccp.Message) error { return err } +} + +type sendReq struct { + er ErrorReporter + preparer +} + +func (r sendReq) Prepare(msg rpccp.Message) (err error) { + if err = r.preparer.Prepare(msg); r.er != nil { + r.er.ReportError(err) + } + + return +} + +type errorReporterFunc func(error) + +func (report errorReporterFunc) ReportError(err error) { report(err) } + +type errChan chan error + +func (cherr errChan) ReportError(err error) { + cherr <- err // buffered +} + +var cherrPool = make(errChanPool, 64) + +type errChanPool chan errChan + +func (pool errChanPool) Get() (cherr errChan) { + select { + case cherr = <-pool: + default: + cherr = make(errChan, 1) + } + + return +} + +func (pool errChanPool) Put(cherr errChan) { + select { + case _, ok := <-cherr: + if !ok { + return + } + default: + } + + select { + case pool <- cherr: + default: + } +} diff --git a/server/server.go b/server/server.go index 8ee2601e..1722ada3 100644 --- a/server/server.go +++ b/server/server.go @@ -103,6 +103,10 @@ type Server struct { // call. It is closed when the acknowledgement is received. starting <-chan struct{} + // full is non-nil if a start() is waiting for a space in ongoing to + // free up. It is closed and set to nil when the next call returns. + full chan<- struct{} + // drain is non-nil when Shutdown starts and is closed by the last // call to return. drain chan struct{} @@ -119,15 +123,14 @@ type cstate struct { type Policy struct { // MaxConcurrentCalls is the maximum number of methods allowed to be // executing on a single Server simultaneously. Attempts to make more - // calls than this limit will result in immediate error answers. + // calls than this limit will block. // // If this is zero, then a reasonably small default is used. MaxConcurrentCalls int // AnswerQueueSize is the maximum number of methods allowed to be // enqueued on a single returned Answer while it is unresolved. - // Attempts to enqueue more calls than this limit will result in - // immediate error answers. + // Attempts to enqueue more calls than this limit will block. // // If this is zero, then a reasonably small default is used. AnswerQueueSize int @@ -221,17 +224,29 @@ func (srv *Server) start(ctx context.Context, m *Method, r capnp.Recv) capnp.Pip // Acquire an ID (semaphore). id := srv.nextID() if id == -1 { - defer srv.mu.Unlock() - - err := errors.New(errors.Overloaded, "capnp server", "max concurrent calls exceeded") + full := make(chan struct{}) + srv.full = full + srv.mu.Unlock() + select { + case <-full: + case <-ctx.Done(): + srv.mu.Lock() + srv.starting = nil + close(starting) + srv.full = nil // full could be nil or non-nil, ensure it is nil. + srv.mu.Unlock() + r.Reject(ctx.Err()) + return nil + } + srv.mu.Lock() + id = srv.nextID() if srv.drain != nil { srv.starting = nil close(starting) - err = errors.New(errors.Failed, "capnp server", "call after shutdown") + srv.mu.Unlock() + r.Reject(errors.New(errors.Failed, "capnp server", "call after shutdown")) + return nil } - - r.Reject(err) - return nil } // Bookkeeping: set starting to indicate we're waiting for an ack and @@ -254,15 +269,17 @@ func (srv *Server) start(ctx context.Context, m *Method, r capnp.Recv) capnp.Pip aq.reject(err) r.Returner.Return(err) } - srv.mu.Lock() srv.ongoing[id].cancel() srv.ongoing[id] = cstate{} if srv.drain != nil && !srv.hasOngoing() { close(srv.drain) } + if srv.full != nil { + close(srv.full) + srv.full = nil + } srv.mu.Unlock() - close(done) }() var pcall capnp.PipelineCaller diff --git a/server/server_test.go b/server/server_test.go index 3acd2849..235b9902 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -6,12 +6,10 @@ import ( "sync" "testing" - "context" - "capnproto.org/go/capnp/v3" air "capnproto.org/go/capnp/v3/internal/aircraftlib" "capnproto.org/go/capnp/v3/server" - "github.com/stretchr/testify/assert" + "context" ) type echoImpl struct{} @@ -105,6 +103,25 @@ func (seq *callSeq) GetNumber(ctx context.Context, call air.CallSequence_getNumb return nil } +type lockCallSeq struct { + n uint32 + mu sync.Mutex +} + +func (seq *lockCallSeq) GetNumber(ctx context.Context, call air.CallSequence_getNumber) error { + seq.mu.Lock() + defer seq.mu.Unlock() + call.Ack() + + r, err := call.AllocResults() + if err != nil { + return err + } + r.SetN(seq.n) + seq.n++ + return nil +} + func TestServerCallOrder(t *testing.T) { tests := []struct { name string @@ -150,36 +167,30 @@ func TestServerCallOrder(t *testing.T) { } func TestServerMaxConcurrentCalls(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - wait := make(chan struct{}) echo := air.Echo_ServerToClient(blockingEchoImpl{wait}, &server.Policy{ MaxConcurrentCalls: 2, }) defer echo.Client.Release() - + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() call1, finish := echo.Echo(ctx, nil) defer finish() - call2, finish := echo.Echo(ctx, nil) defer finish() - go close(wait) - call3, finish := echo.Echo(ctx, nil) defer finish() - <-wait - - _, err := call1.Struct() - assert.NoError(t, err, "call1 should succeed") - - _, err = call2.Struct() - assert.NoError(t, err, "call2 should succeed") - - _, err = call3.Struct() - assert.Error(t, err, "call3 should fail") + if _, err := call1.Struct(); err != nil { + t.Error("Echo #1:", err) + } + if _, err := call2.Struct(); err != nil { + t.Error("Echo #2:", err) + } + if _, err := call3.Struct(); err != nil { + t.Error("Echo #3:", err) + } } func TestServerShutdown(t *testing.T) { From 66f6c3f9e309425892a918797043afe1715df1bb Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Sun, 5 Dec 2021 20:59:16 -0500 Subject: [PATCH 3/5] Fix deadlock in question.handleCancel. --- rpc/question.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/rpc/question.go b/rpc/question.go index 27456412..b12fa96f 100644 --- a/rpc/question.go +++ b/rpc/question.go @@ -92,9 +92,6 @@ func (q *question) handleCancel(ctx context.Context) { } return err }), errorReporterFunc(func(err error) { - q.c.mu.Lock() - defer q.c.mu.Unlock() - if err == nil { q.flags |= finishSent } else { From 2ae55934c637705258e1b2d07e3c75c7d16fca30 Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Sun, 5 Dec 2021 21:00:22 -0500 Subject: [PATCH 4/5] Add comment. --- rpc/question.go | 1 + 1 file changed, 1 insertion(+) diff --git a/rpc/question.go b/rpc/question.go index b12fa96f..1b4934bd 100644 --- a/rpc/question.go +++ b/rpc/question.go @@ -92,6 +92,7 @@ func (q *question) handleCancel(ctx context.Context) { } return err }), errorReporterFunc(func(err error) { + // q.c.mu will be held by c.sendMessage when callback is invoked if err == nil { q.flags |= finishSent } else { From 771641d7e86cf3c5f0994cfa36694965e9d6ad73 Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Sat, 19 Feb 2022 17:08:55 -0500 Subject: [PATCH 5/5] Address https://github.com/capnproto/go-capnproto2/pull/196#discussion_r762659432 --- rpc/rpc.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/rpc/rpc.go b/rpc/rpc.go index 8e91a50f..afe694cb 100644 --- a/rpc/rpc.go +++ b/rpc/rpc.go @@ -164,13 +164,8 @@ func NewConn(t Transport, opts *Options) *Conn { } // start background tasks - for _, f := range []func(context.Context) error{ - c.send, - c.receive, - } { - c.tasks.Add(1) - g.Go(c.newTask(f)) - } + g.Go(c.newTask(c.send)) + g.Go(c.newTask(c.receive)) // monitor background tasks go func() {