From 9ca8fdc65028752165e4e4cdd04cea2bfadd0040 Mon Sep 17 00:00:00 2001 From: Sam Herrmann Date: Wed, 13 Aug 2025 10:54:12 -0400 Subject: [PATCH 1/2] Cancel Handler context when connection closes --- conn.go | 6 +++++ conn_test.go | 62 ++++++++++++++++++++++++++++++++++++++-------------- jsonrpc2.go | 8 +++---- 3 files changed, 55 insertions(+), 21 deletions(-) diff --git a/conn.go b/conn.go index 18466cb..33629a8 100644 --- a/conn.go +++ b/conn.go @@ -27,6 +27,7 @@ type Conn struct { sending sync.Mutex + cancelCtx context.CancelFunc disconnect chan struct{} logger Logger @@ -46,10 +47,14 @@ var _ JSONRPC2 = (*Conn)(nil) // NewClient consumes conn, so you should call Close on the returned // client not on the given conn. func NewConn(ctx context.Context, stream ObjectStream, h Handler, opts ...ConnOpt) *Conn { + + ctx, cancel := context.WithCancel(ctx) + c := &Conn{ stream: stream, h: h, pending: map[ID]*call{}, + cancelCtx: cancel, disconnect: make(chan struct{}), logger: log.New(os.Stderr, "", log.LstdFlags), } @@ -182,6 +187,7 @@ func (c *Conn) close(cause error) error { } close(c.disconnect) + c.cancelCtx() c.closed = true return c.stream.Close() } diff --git a/conn_test.go b/conn_test.go index aa32fd9..7313465 100644 --- a/conn_test.go +++ b/conn_test.go @@ -14,6 +14,40 @@ import ( "github.com/sourcegraph/jsonrpc2" ) +func TestConn(t *testing.T) { + t.Run("cancels context when closed", func(t *testing.T) { + ctxCanceled := make(chan struct{}) + + handler := handlerFunc(func(ctx context.Context, c *jsonrpc2.Conn, r *jsonrpc2.Request) { + // Block until the context is canceled. + <-ctx.Done() + close(ctxCanceled) + }) + + connA, connB := Pipe(noopHandler{}, jsonrpc2.AsyncHandler(handler)) + defer connA.Close() + defer connB.Close() + + // Send a notification from connA to connB to trigger connB's handler + // function. + if err := connA.Notify(context.Background(), "foo", nil, nil); err != nil { + t.Fatal(err) + } + + // Disconnect connA from connB. + if err := connA.Close(); err != nil { + t.Fatal(err) + } + + select { + case <-ctxCanceled: + // Test passed, the handler's context was canceled. + case <-time.After(time.Second): + t.Fatal("context not canceled") + } + }) +} + var paramsTests = []struct { sendParams interface{} wantParams *json.RawMessage @@ -198,12 +232,12 @@ func testParams(t *testing.T, want *json.RawMessage, fn func(c *jsonrpc2.Conn) e wg.Done() }) - client, server := newClientServer(handler) - defer client.Close() - defer server.Close() + connA, connB := Pipe(noopHandler{}, handler) + defer connA.Close() + defer connB.Close() wg.Add(1) - if err := fn(client); err != nil { + if err := fn(connA); err != nil { t.Error(err) } wg.Wait() @@ -242,18 +276,12 @@ func assertRawJSONMessage(t *testing.T, got *json.RawMessage, want *json.RawMess } } -func newClientServer(handler jsonrpc2.Handler) (client *jsonrpc2.Conn, server *jsonrpc2.Conn) { +// Pipe returns two jsonrpc2.Conn, connected via a synchronous, in-memory, full +// duplex network connection. +func Pipe(handlerA, handlerB jsonrpc2.Handler) (connA *jsonrpc2.Conn, connB *jsonrpc2.Conn) { ctx := context.Background() - connA, connB := net.Pipe() - client = jsonrpc2.NewConn( - ctx, - jsonrpc2.NewPlainObjectStream(connA), - noopHandler{}, - ) - server = jsonrpc2.NewConn( - ctx, - jsonrpc2.NewPlainObjectStream(connB), - handler, - ) - return client, server + a, b := net.Pipe() + connA = jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(a), handlerA) + connB = jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(b), handlerB) + return connA, connB } diff --git a/jsonrpc2.go b/jsonrpc2.go index 97e26d7..7d3e132 100644 --- a/jsonrpc2.go +++ b/jsonrpc2.go @@ -59,10 +59,10 @@ const ( // Handler handles JSON-RPC requests and notifications. type Handler interface { - // Handle is called to handle a request. No other requests are handled - // until it returns. If you do not require strict ordering behavior - // of received RPCs, it is suggested to wrap your handler in - // AsyncHandler. + // Handle is called to handle a request. No other requests are handled until + // it returns. If you do not require strict ordering behavior of received + // RPCs, it is suggested to wrap your handler in AsyncHandler. The context + // is automatically canceled when the connection closes. Handle(context.Context, *Conn, *Request) } From bb87722ac29db0ab48ec5575e3ddc194a3b6355b Mon Sep 17 00:00:00 2001 From: Sam Herrmann Date: Thu, 14 Aug 2025 11:35:57 -0400 Subject: [PATCH 2/2] Close Conn when given context is done --- conn.go | 12 ++++++++++-- conn_test.go | 25 +++++++++++++++++++++---- 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/conn.go b/conn.go index 33629a8..35c6279 100644 --- a/conn.go +++ b/conn.go @@ -44,8 +44,10 @@ var _ JSONRPC2 = (*Conn)(nil) // JSON-RPC protocol is symmetric, so a Conn runs on both ends of a // client-server connection. // -// NewClient consumes conn, so you should call Close on the returned -// client not on the given conn. +// NewConn consumes stream, so you should call Close on the returned +// Conn not on the given stream or its underlying connection. +// +// Conn is closed when the given context's Done channel is closed. func NewConn(ctx context.Context, stream ObjectStream, h Handler, opts ...ConnOpt) *Conn { ctx, cancel := context.WithCancel(ctx) @@ -65,6 +67,12 @@ func NewConn(ctx context.Context, stream ObjectStream, h Handler, opts ...ConnOp opt(c) } go c.readMessages(ctx) + + go func() { + <-ctx.Done() + c.close(nil) + }() + return c } diff --git a/conn_test.go b/conn_test.go index 7313465..5d2a7e4 100644 --- a/conn_test.go +++ b/conn_test.go @@ -15,6 +15,24 @@ import ( ) func TestConn(t *testing.T) { + + t.Run("closes when context is done", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + connA, connB := Pipe(ctx, noopHandler{}, noopHandler{}) + defer connA.Close() + defer connB.Close() + + cancel() + <-connA.DisconnectNotify() + + got := connA.Close() + want := jsonrpc2.ErrClosed + if got != want { + t.Fatalf("got %v, want %v", got, want) + } + }) + t.Run("cancels context when closed", func(t *testing.T) { ctxCanceled := make(chan struct{}) @@ -24,7 +42,7 @@ func TestConn(t *testing.T) { close(ctxCanceled) }) - connA, connB := Pipe(noopHandler{}, jsonrpc2.AsyncHandler(handler)) + connA, connB := Pipe(context.Background(), noopHandler{}, jsonrpc2.AsyncHandler(handler)) defer connA.Close() defer connB.Close() @@ -232,7 +250,7 @@ func testParams(t *testing.T, want *json.RawMessage, fn func(c *jsonrpc2.Conn) e wg.Done() }) - connA, connB := Pipe(noopHandler{}, handler) + connA, connB := Pipe(context.Background(), noopHandler{}, handler) defer connA.Close() defer connB.Close() @@ -278,8 +296,7 @@ func assertRawJSONMessage(t *testing.T, got *json.RawMessage, want *json.RawMess // Pipe returns two jsonrpc2.Conn, connected via a synchronous, in-memory, full // duplex network connection. -func Pipe(handlerA, handlerB jsonrpc2.Handler) (connA *jsonrpc2.Conn, connB *jsonrpc2.Conn) { - ctx := context.Background() +func Pipe(ctx context.Context, handlerA, handlerB jsonrpc2.Handler) (connA *jsonrpc2.Conn, connB *jsonrpc2.Conn) { a, b := net.Pipe() connA = jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(a), handlerA) connB = jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(b), handlerB)