Skip to content

Commit 356a3c1

Browse files
authored
Merge pull request #86 from julia-vscode/cancel-v2
Cancel v2
2 parents fed5412 + 356bd07 commit 356a3c1

File tree

5 files changed

+115
-37
lines changed

5 files changed

+115
-37
lines changed

Project.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,20 @@
11
name = "JSONRPC"
22
uuid = "b9b8584e-8fd3-41f9-ad0c-7255d428e418"
33
authors = ["David Anthoff <anthoff@berkeley.edu>"]
4-
version = "1.4.3-DEV"
4+
version = "2.0.0-DEV"
55

66
[deps]
77
JSON = "682c06a0-de6a-54ab-a142-c8b1cf79cde6"
88
UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4"
9+
CancellationTokens = "2e8d271d-f2e2-407b-a864-17eb2156783e"
910

1011
[extras]
1112
TestItemRunner = "f8b46487-2199-4994-9208-9a1283c18c0a"
1213
Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40"
1314
Sockets = "6462fe0b-24de-5631-8697-dd941f90decc"
1415

1516
[compat]
17+
CancellationTokens = "1"
1618
JSON = "0.20, 0.21"
1719
julia = "1"
1820

src/JSONRPC.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module JSONRPC
22

3-
import JSON, UUIDs
3+
import JSON, UUIDs, CancellationTokens
44

55
include("packagedef.jl")
66

src/core.jl

Lines changed: 74 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,23 @@ function Base.showerror(io::IO, ex::JSONRPCError)
105105
end
106106
end
107107

108+
struct Request
109+
method::String
110+
params::Union{Nothing,Dict{String,Any},Vector{Any}}
111+
id::Union{Nothing,String}
112+
token::Union{CancellationTokens.CancellationToken,Nothing}
113+
end
114+
108115
mutable struct JSONRPCEndpoint{IOIn <: IO,IOOut <: IO}
109116
pipe_in::IOIn
110117
pipe_out::IOOut
111118

112119
out_msg_queue::Channel{Any}
113-
in_msg_queue::Channel{Any}
120+
in_msg_queue::Channel{Request}
114121

115-
outstanding_requests::Dict{String,Channel{Any}}
122+
outstanding_requests::Dict{String,Channel{Any}} # These are requests sent where we are waiting for a response
123+
cancellation_sources::Dict{String,CancellationTokens.CancellationTokenSource} # These are the cancellation sources for requests that are not finished processing
124+
no_longer_needed_cancellation_sources::Channel{String}
116125

117126
err_handler::Union{Nothing,Function}
118127

@@ -123,7 +132,18 @@ mutable struct JSONRPCEndpoint{IOIn <: IO,IOOut <: IO}
123132
end
124133

125134
JSONRPCEndpoint(pipe_in, pipe_out, err_handler = nothing) =
126-
JSONRPCEndpoint(pipe_in, pipe_out, Channel{Any}(Inf), Channel{Any}(Inf), Dict{String,Channel{Any}}(), err_handler, :idle, nothing, nothing)
135+
JSONRPCEndpoint(
136+
pipe_in,
137+
pipe_out,
138+
Channel{Any}(Inf),
139+
Channel{Request}(Inf),
140+
Dict{String,Channel{Any}}(),
141+
Dict{String,CancellationTokens.CancellationTokenSource}(),
142+
Channel{String}(Inf),
143+
err_handler,
144+
:idle,
145+
nothing,
146+
nothing)
127147

128148
function write_transport_layer(stream, response)
129149
response_utf8 = transcode(UInt8, response)
@@ -187,6 +207,13 @@ function Base.run(x::JSONRPCEndpoint)
187207

188208
x.read_task = @async try
189209
while true
210+
# First we delete any cancellation sources that are no longer needed. We do it this way to avoid a lock
211+
while isready(x.no_longer_needed_cancellation_sources)
212+
no_longer_needed_cs_id = take!(x.no_longer_needed_cancellation_sources)
213+
delete!(x.cancellation_sources, no_longer_needed_cs_id)
214+
end
215+
216+
# Now handle new messages
190217
message = read_transport_layer(x.pipe_in)
191218

192219
if message === nothing || x.status == :closed
@@ -196,13 +223,38 @@ function Base.run(x::JSONRPCEndpoint)
196223
message_dict = JSON.parse(message)
197224

198225
if haskey(message_dict, "method")
199-
try
200-
put!(x.in_msg_queue, message_dict)
201-
catch err
202-
if err isa InvalidStateException
203-
break
204-
else
205-
rethrow(err)
226+
method_name = message_dict["method"]
227+
params = get(message_dict, "params", nothing)
228+
id = get(message_dict, "id", nothing)
229+
cancel_source = id === nothing ? nothing : CancellationTokens.CancellationTokenSource()
230+
cancel_token = cancel_source === nothing ? nothing : CancellationTokens.get_token(cancel_source)
231+
232+
if method_name == "\$/cancelRequest"
233+
id_of_cancelled_request = params["id"]
234+
cs = get(x.cancellation_sources, id_of_cancelled_request, nothing) # We might have sent the response already
235+
if cs !== nothing
236+
CancellationTokens.cancel(cs)
237+
end
238+
else
239+
if id !== nothing
240+
x.cancellation_sources[id] = cancel_source
241+
end
242+
243+
request = Request(
244+
method_name,
245+
params,
246+
id,
247+
cancel_token
248+
)
249+
250+
try
251+
put!(x.in_msg_queue, request)
252+
catch err
253+
if err isa InvalidStateException
254+
break
255+
else
256+
rethrow(err)
257+
end
206258
end
207259
end
208260
else
@@ -294,20 +346,28 @@ function Base.iterate(endpoint::JSONRPCEndpoint, state = nothing)
294346
end
295347
end
296348

297-
function send_success_response(endpoint, original_request, result)
349+
function send_success_response(endpoint, original_request::Request, result)
298350
check_dead_endpoint!(endpoint)
299351

300-
response = Dict("jsonrpc" => "2.0", "id" => original_request["id"], "result" => result)
352+
original_request.id === nothing && error("Cannot send a response to a notification.")
353+
354+
put!(endpoint.no_longer_needed_cancellation_sources, original_request.id)
355+
356+
response = Dict("jsonrpc" => "2.0", "id" => original_request.id, "result" => result)
301357

302358
response_json = JSON.json(response)
303359

304360
put!(endpoint.out_msg_queue, response_json)
305361
end
306362

307-
function send_error_response(endpoint, original_request, code, message, data)
363+
function send_error_response(endpoint, original_request::Request, code, message, data)
308364
check_dead_endpoint!(endpoint)
309365

310-
response = Dict("jsonrpc" => "2.0", "id" => original_request["id"], "error" => Dict("code" => code, "message" => message, "data" => data))
366+
original_request.id === nothing && error("Cannot send a response to a notification.")
367+
368+
put!(endpoint.no_longer_needed_cancellation_sources, original_request.id)
369+
370+
response = Dict("jsonrpc" => "2.0", "id" => original_request.id, "error" => Dict("code" => code, "message" => message, "data" => data))
311371

312372
response_json = JSON.json(response)
313373

src/typed.jl

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,20 @@ function Base.setindex!(dispatcher::MsgDispatcher, func::Function, message_type:
5555
dispatcher._handlers[message_type.method] = Handler(message_type, func)
5656
end
5757

58-
function dispatch_msg(x::JSONRPCEndpoint, dispatcher::MsgDispatcher, msg)
58+
function dispatch_msg(x::JSONRPCEndpoint, dispatcher::MsgDispatcher, msg::Request)
5959
dispatcher._currentlyHandlingMsg = true
6060
try
61-
method_name = msg["method"]
61+
method_name = msg.method
6262
handler = get(dispatcher._handlers, method_name, nothing)
6363
if handler !== nothing
6464
param_type = get_param_type(handler.message_type)
65-
params = param_type === Nothing ? nothing : param_type <: NamedTuple ? convert(param_type,(;(Symbol(i[1])=>i[2] for i in msg["params"])...)) : param_type(msg["params"])
65+
params = param_type === Nothing ? nothing : param_type <: NamedTuple ? convert(param_type,(;(Symbol(i[1])=>i[2] for i in msg.params)...)) : param_type(msg.params)
6666

67-
res = handler.func(x, params)
67+
if handler.message_type isa RequestType
68+
res = handler.func(x, params, msg.token)
69+
else
70+
res = handler.func(x, params)
71+
end
6872

6973
if handler.message_type isa RequestType
7074
if res isa JSONRPCError
@@ -89,20 +93,28 @@ is_currently_handling_msg(d::MsgDispatcher) = d._currentlyHandlingMsg
8993

9094
macro message_dispatcher(name, body)
9195
quote
92-
function $(esc(name))(x, msg::Dict{String,Any}, context=nothing)
93-
method_name = msg["method"]::String
96+
function $(esc(name))(x, msg::Request, context=nothing)
97+
method_name = msg.method
9498

9599
$(
96100
(
97101
:(
98102
if method_name == $(esc(i.args[2])).method
99103
param_type = get_param_type($(esc(i.args[2])))
100-
params = param_type === Nothing ? nothing : param_type <: NamedTuple ? convert(param_type,(;(Symbol(i[1])=>i[2] for i in msg["params"])...)) : param_type(msg["params"])
104+
params = param_type === Nothing ? nothing : param_type <: NamedTuple ? convert(param_type,(;(Symbol(i[1])=>i[2] for i in msg.params)...)) : param_type(msg.params)
101105

102106
if context===nothing
103-
res = $(esc(i.args[3]))(x, params)
107+
if $(esc(i.args[2])) isa RequestType
108+
res = $(esc(i.args[3]))(params, msg.token)
109+
else
110+
res = $(esc(i.args[3]))(params)
111+
end
104112
else
105-
res = $(esc(i.args[3]))(x, params, context)
113+
if $(esc(i.args[2])) isa RequestType
114+
res = $(esc(i.args[3]))(params, context, msg.token)
115+
else
116+
res = $(esc(i.args[3]))(params, context)
117+
end
106118
end
107119

108120
if $(esc(i.args[2])) isa RequestType

test/test_typed.jl

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
request1_type = JSONRPC.RequestType("request1", Foo, String)
88
request2_type = JSONRPC.RequestType("request2", Nothing, String)
9-
notify1_type = JSONRPC.NotificationType("notify1", String)
9+
notify1_type = JSONRPC.NotificationType("notify1", Vector{String})
1010

1111
global g_var = ""
1212

@@ -19,20 +19,23 @@
1919
global conn = JSONRPC.JSONRPCEndpoint(sock, sock)
2020
global msg_dispatcher = JSONRPC.MsgDispatcher()
2121

22-
msg_dispatcher[request1_type] = (conn, params) -> begin
22+
msg_dispatcher[request1_type] = (conn, params, token) -> begin
2323
@test JSONRPC.is_currently_handling_msg(msg_dispatcher)
2424
params.fieldA == 1 ? "YES" : "NO"
2525
end
26-
msg_dispatcher[request2_type] = (conn, params) -> JSONRPC.JSONRPCError(-32600, "Our message", nothing)
27-
msg_dispatcher[notify1_type] = (conn, params) -> global g_var = params
26+
msg_dispatcher[request2_type] = (conn, params, token) -> JSONRPC.JSONRPCError(-32600, "Our message", nothing)
27+
msg_dispatcher[notify1_type] = (conn, params) -> global g_var = params[1]
2828

2929
run(conn)
3030

3131
for msg in conn
32+
@info "Got a message, now dispatching" msg
3233
JSONRPC.dispatch_msg(conn, msg_dispatcher, msg)
34+
@info "Finished dispatching"
3335
end
3436
catch err
3537
Base.display_error(stderr, err, catch_backtrace())
38+
Base.flush(stderr)
3639
end
3740

3841
wait(server_is_up)
@@ -42,7 +45,7 @@
4245

4346
run(conn2)
4447

45-
JSONRPC.send(conn2, notify1_type, "TEST")
48+
JSONRPC.send(conn2, notify1_type, ["TEST"])
4649

4750
res = JSONRPC.send(conn2, request1_type, Foo(fieldA=1, fieldB="FOO"))
4851

@@ -70,7 +73,7 @@
7073
global conn = JSONRPC.JSONRPCEndpoint(sock, sock)
7174
global msg_dispatcher = JSONRPC.MsgDispatcher()
7275

73-
msg_dispatcher[request2_type] = (conn, params)->34 # The request type requires a `String` return, so this tests whether we get an error.
76+
msg_dispatcher[request2_type] = (conn, params, token)->34 # The request type requires a `String` return, so this tests whether we get an error.
7477

7578
run(conn)
7679

@@ -79,6 +82,7 @@
7982
end
8083
catch err
8184
Base.display_error(stderr, err, catch_backtrace())
85+
Base.flush(stderr)
8286
end
8387

8488
wait(server_is_up)
@@ -117,18 +121,18 @@ end
117121

118122
request1_type = JSONRPC.RequestType("request1", Foo, String)
119123
request2_type = JSONRPC.RequestType("request2", Nothing, String)
120-
notify1_type = JSONRPC.NotificationType("notify1", String)
124+
notify1_type = JSONRPC.NotificationType("notify1", Vector{String})
121125

122126
global g_var = ""
123127

124128
server_is_up = Base.Condition()
125129

126130
JSONRPC.@message_dispatcher my_dispatcher begin
127-
request1_type => (conn, params) -> begin
131+
request1_type => (params, token) -> begin
128132
params.fieldA == 1 ? "YES" : "NO"
129133
end
130-
request2_type => (conn, params) -> JSONRPC.JSONRPCError(-32600, "Our message", nothing)
131-
notify1_type => (conn, params) -> global g_var = params
134+
request2_type => (params, token) -> JSONRPC.JSONRPCError(-32600, "Our message", nothing)
135+
notify1_type => (params) -> global g_var = params[1]
132136
end
133137

134138
server_task = @async try
@@ -154,7 +158,7 @@ end
154158

155159
run(conn2)
156160

157-
JSONRPC.send(conn2, notify1_type, "TEST")
161+
JSONRPC.send(conn2, notify1_type, ["TEST"])
158162

159163
res = JSONRPC.send(conn2, request1_type, Foo(fieldA=1, fieldB="FOO"))
160164

@@ -176,7 +180,7 @@ end
176180
server_is_up = Base.Condition()
177181

178182
JSONRPC.@message_dispatcher my_dispatcher2 begin
179-
request2_type => (conn, params) -> 34 # The request type requires a `String` return, so this tests whether we get an error.
183+
request2_type => (params, token) -> 34 # The request type requires a `String` return, so this tests whether we get an error.
180184
end
181185

182186
server_task2 = @async try

0 commit comments

Comments
 (0)