diff --git a/conn.go b/conn.go index 18466cb..35c6279 100644 --- a/conn.go +++ b/conn.go @@ -27,6 +27,7 @@ type Conn struct { sending sync.Mutex + cancelCtx context.CancelFunc disconnect chan struct{} logger Logger @@ -43,13 +44,19 @@ var _ JSONRPC2 = (*Conn)(nil) // JSON-RPC protocol is symmetric, so a Conn runs on both ends of a // client-server connection. // -// NewClient consumes conn, so you should call Close on the returned -// client not on the given conn. +// NewConn consumes stream, so you should call Close on the returned +// Conn not on the given stream or its underlying connection. +// +// Conn is closed when the given context's Done channel is closed. func NewConn(ctx context.Context, stream ObjectStream, h Handler, opts ...ConnOpt) *Conn { + + ctx, cancel := context.WithCancel(ctx) + c := &Conn{ stream: stream, h: h, pending: map[ID]*call{}, + cancelCtx: cancel, disconnect: make(chan struct{}), logger: log.New(os.Stderr, "", log.LstdFlags), } @@ -60,6 +67,12 @@ func NewConn(ctx context.Context, stream ObjectStream, h Handler, opts ...ConnOp opt(c) } go c.readMessages(ctx) + + go func() { + <-ctx.Done() + c.close(nil) + }() + return c } @@ -182,6 +195,7 @@ func (c *Conn) close(cause error) error { } close(c.disconnect) + c.cancelCtx() c.closed = true return c.stream.Close() } diff --git a/conn_test.go b/conn_test.go index aa32fd9..5d2a7e4 100644 --- a/conn_test.go +++ b/conn_test.go @@ -14,6 +14,58 @@ import ( "github.com/sourcegraph/jsonrpc2" ) +func TestConn(t *testing.T) { + + t.Run("closes when context is done", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + connA, connB := Pipe(ctx, noopHandler{}, noopHandler{}) + defer connA.Close() + defer connB.Close() + + cancel() + <-connA.DisconnectNotify() + + got := connA.Close() + want := jsonrpc2.ErrClosed + if got != want { + t.Fatalf("got %v, want %v", got, want) + } + }) + + t.Run("cancels context when closed", func(t *testing.T) { + ctxCanceled := make(chan struct{}) + + handler := handlerFunc(func(ctx context.Context, c *jsonrpc2.Conn, r *jsonrpc2.Request) { + // Block until the context is canceled. + <-ctx.Done() + close(ctxCanceled) + }) + + connA, connB := Pipe(context.Background(), noopHandler{}, jsonrpc2.AsyncHandler(handler)) + defer connA.Close() + defer connB.Close() + + // Send a notification from connA to connB to trigger connB's handler + // function. + if err := connA.Notify(context.Background(), "foo", nil, nil); err != nil { + t.Fatal(err) + } + + // Disconnect connA from connB. + if err := connA.Close(); err != nil { + t.Fatal(err) + } + + select { + case <-ctxCanceled: + // Test passed, the handler's context was canceled. + case <-time.After(time.Second): + t.Fatal("context not canceled") + } + }) +} + var paramsTests = []struct { sendParams interface{} wantParams *json.RawMessage @@ -198,12 +250,12 @@ func testParams(t *testing.T, want *json.RawMessage, fn func(c *jsonrpc2.Conn) e wg.Done() }) - client, server := newClientServer(handler) - defer client.Close() - defer server.Close() + connA, connB := Pipe(context.Background(), noopHandler{}, handler) + defer connA.Close() + defer connB.Close() wg.Add(1) - if err := fn(client); err != nil { + if err := fn(connA); err != nil { t.Error(err) } wg.Wait() @@ -242,18 +294,11 @@ func assertRawJSONMessage(t *testing.T, got *json.RawMessage, want *json.RawMess } } -func newClientServer(handler jsonrpc2.Handler) (client *jsonrpc2.Conn, server *jsonrpc2.Conn) { - ctx := context.Background() - connA, connB := net.Pipe() - client = jsonrpc2.NewConn( - ctx, - jsonrpc2.NewPlainObjectStream(connA), - noopHandler{}, - ) - server = jsonrpc2.NewConn( - ctx, - jsonrpc2.NewPlainObjectStream(connB), - handler, - ) - return client, server +// Pipe returns two jsonrpc2.Conn, connected via a synchronous, in-memory, full +// duplex network connection. +func Pipe(ctx context.Context, handlerA, handlerB jsonrpc2.Handler) (connA *jsonrpc2.Conn, connB *jsonrpc2.Conn) { + a, b := net.Pipe() + connA = jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(a), handlerA) + connB = jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(b), handlerB) + return connA, connB } diff --git a/jsonrpc2.go b/jsonrpc2.go index 97e26d7..7d3e132 100644 --- a/jsonrpc2.go +++ b/jsonrpc2.go @@ -59,10 +59,10 @@ const ( // Handler handles JSON-RPC requests and notifications. type Handler interface { - // Handle is called to handle a request. No other requests are handled - // until it returns. If you do not require strict ordering behavior - // of received RPCs, it is suggested to wrap your handler in - // AsyncHandler. + // Handle is called to handle a request. No other requests are handled until + // it returns. If you do not require strict ordering behavior of received + // RPCs, it is suggested to wrap your handler in AsyncHandler. The context + // is automatically canceled when the connection closes. Handle(context.Context, *Conn, *Request) }