Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Conn struct {

sending sync.Mutex

cancelCtx context.CancelFunc
disconnect chan struct{}

logger Logger
Expand All @@ -43,13 +44,19 @@ var _ JSONRPC2 = (*Conn)(nil)
// JSON-RPC protocol is symmetric, so a Conn runs on both ends of a
// client-server connection.
//
// NewClient consumes conn, so you should call Close on the returned
// client not on the given conn.
// NewConn consumes stream, so you should call Close on the returned
// Conn not on the given stream or its underlying connection.
//
// Conn is closed when the given context's Done channel is closed.
func NewConn(ctx context.Context, stream ObjectStream, h Handler, opts ...ConnOpt) *Conn {

ctx, cancel := context.WithCancel(ctx)

c := &Conn{
stream: stream,
h: h,
pending: map[ID]*call{},
cancelCtx: cancel,
disconnect: make(chan struct{}),
logger: log.New(os.Stderr, "", log.LstdFlags),
}
Expand All @@ -60,6 +67,12 @@ func NewConn(ctx context.Context, stream ObjectStream, h Handler, opts ...ConnOp
opt(c)
}
go c.readMessages(ctx)

go func() {
<-ctx.Done()
c.close(nil)
}()
Comment on lines +71 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right so this is safe since we will always call cancel when done. lgtm


return c
}

Expand Down Expand Up @@ -182,6 +195,7 @@ func (c *Conn) close(cause error) error {
}

close(c.disconnect)
c.cancelCtx()
c.closed = true
return c.stream.Close()
}
Expand Down
81 changes: 63 additions & 18 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,58 @@ import (
"github.com/sourcegraph/jsonrpc2"
)

func TestConn(t *testing.T) {

t.Run("closes when context is done", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())

connA, connB := Pipe(ctx, noopHandler{}, noopHandler{})
defer connA.Close()
defer connB.Close()

cancel()
<-connA.DisconnectNotify()

got := connA.Close()
want := jsonrpc2.ErrClosed
if got != want {
t.Fatalf("got %v, want %v", got, want)
}
})

t.Run("cancels context when closed", func(t *testing.T) {
ctxCanceled := make(chan struct{})

handler := handlerFunc(func(ctx context.Context, c *jsonrpc2.Conn, r *jsonrpc2.Request) {
// Block until the context is canceled.
<-ctx.Done()
close(ctxCanceled)
})

connA, connB := Pipe(context.Background(), noopHandler{}, jsonrpc2.AsyncHandler(handler))
defer connA.Close()
defer connB.Close()

// Send a notification from connA to connB to trigger connB's handler
// function.
if err := connA.Notify(context.Background(), "foo", nil, nil); err != nil {
t.Fatal(err)
}

// Disconnect connA from connB.
if err := connA.Close(); err != nil {
t.Fatal(err)
}

select {
case <-ctxCanceled:
// Test passed, the handler's context was canceled.
case <-time.After(time.Second):
t.Fatal("context not canceled")
}
})
}

var paramsTests = []struct {
sendParams interface{}
wantParams *json.RawMessage
Expand Down Expand Up @@ -198,12 +250,12 @@ func testParams(t *testing.T, want *json.RawMessage, fn func(c *jsonrpc2.Conn) e
wg.Done()
})

client, server := newClientServer(handler)
defer client.Close()
defer server.Close()
connA, connB := Pipe(context.Background(), noopHandler{}, handler)
defer connA.Close()
defer connB.Close()

wg.Add(1)
if err := fn(client); err != nil {
if err := fn(connA); err != nil {
t.Error(err)
}
wg.Wait()
Expand Down Expand Up @@ -242,18 +294,11 @@ func assertRawJSONMessage(t *testing.T, got *json.RawMessage, want *json.RawMess
}
}

func newClientServer(handler jsonrpc2.Handler) (client *jsonrpc2.Conn, server *jsonrpc2.Conn) {
ctx := context.Background()
connA, connB := net.Pipe()
client = jsonrpc2.NewConn(
ctx,
jsonrpc2.NewPlainObjectStream(connA),
noopHandler{},
)
server = jsonrpc2.NewConn(
ctx,
jsonrpc2.NewPlainObjectStream(connB),
handler,
)
return client, server
// Pipe returns two jsonrpc2.Conn, connected via a synchronous, in-memory, full
// duplex network connection.
func Pipe(ctx context.Context, handlerA, handlerB jsonrpc2.Handler) (connA *jsonrpc2.Conn, connB *jsonrpc2.Conn) {
a, b := net.Pipe()
connA = jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(a), handlerA)
connB = jsonrpc2.NewConn(ctx, jsonrpc2.NewPlainObjectStream(b), handlerB)
return connA, connB
}
8 changes: 4 additions & 4 deletions jsonrpc2.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,10 @@ const (

// Handler handles JSON-RPC requests and notifications.
type Handler interface {
// Handle is called to handle a request. No other requests are handled
// until it returns. If you do not require strict ordering behavior
// of received RPCs, it is suggested to wrap your handler in
// AsyncHandler.
// Handle is called to handle a request. No other requests are handled until
// it returns. If you do not require strict ordering behavior of received
// RPCs, it is suggested to wrap your handler in AsyncHandler. The context
// is automatically canceled when the connection closes.
Handle(context.Context, *Conn, *Request)
}

Expand Down