@@ -41,9 +41,9 @@ function Base.showerror(io::IO, ex::JSONRPCError)
4141 end
4242end
4343
44- mutable struct JSONRPCEndpoint
45- pipe_in
46- pipe_out
44+ mutable struct JSONRPCEndpoint{IOIn <: IO ,IOOut <: IO }
45+ pipe_in:: IOIn
46+ pipe_out:: IOOut
4747
4848 out_msg_queue:: Channel{Any}
4949 in_msg_queue:: Channel{Any}
@@ -56,12 +56,11 @@ mutable struct JSONRPCEndpoint
5656
5757 read_task:: Union{Nothing,Task}
5858 write_task:: Union{Nothing,Task}
59-
60- function JSONRPCEndpoint (pipe_in, pipe_out, err_handler = nothing )
61- return new (pipe_in, pipe_out, Channel {Any} (Inf ), Channel {Any} (Inf ), Dict {String,Channel{Any}} (), err_handler, :idle , nothing , nothing )
62- end
6359end
6460
61+ JSONRPCEndpoint (pipe_in, pipe_out, err_handler = nothing ) =
62+ JSONRPCEndpoint (pipe_in, pipe_out, Channel {Any} (Inf ), Channel {Any} (Inf ), Dict {String,Channel{Any}} (), err_handler, :idle , nothing , nothing )
63+
6564function write_transport_layer (stream, response)
6665 response_utf8 = transcode (UInt8, response)
6766 n = length (response_utf8)
@@ -86,12 +85,19 @@ function read_transport_layer(stream)
8685 return message_str
8786end
8887
88+ Base. isopen (x:: JSONRPCEndpoint ) = x. status != :closed && isopen (x. pipe_in) && isopen (x. pipe_out)
89+
8990function Base. run (x:: JSONRPCEndpoint )
9091 x. status == :idle || error (" Endpoint is not idle." )
9192
9293 x. write_task = @async try
9394 for msg in x. out_msg_queue
94- write_transport_layer (x. pipe_out, msg)
95+ if isopen (x. pipe_out)
96+ write_transport_layer (x. pipe_out, msg)
97+ else
98+ # TODO Reconsider at some point whether this should be treated as an error.
99+ break
100+ end
95101 end
96102 catch err
97103 bt = catch_backtrace ()
@@ -143,7 +149,7 @@ function Base.run(x::JSONRPCEndpoint)
143149end
144150
145151function send_notification (x:: JSONRPCEndpoint , method:: AbstractString , params)
146- x . status == :running || error ( " Endpoint is not running. " )
152+ check_dead_endpoint! (x )
147153
148154 message = Dict (" jsonrpc" => " 2.0" , " method" => method, " params" => params)
149155
@@ -155,7 +161,7 @@ function send_notification(x::JSONRPCEndpoint, method::AbstractString, params)
155161end
156162
157163function send_request (x:: JSONRPCEndpoint , method:: AbstractString , params)
158- x . status == :running || error ( " Endpoint is not running. " )
164+ check_dead_endpoint! (x )
159165
160166 id = string (UUIDs. uuid4 ())
161167 message = Dict (" jsonrpc" => " 2.0" , " method" => method, " params" => params, " id" => id)
@@ -182,15 +188,15 @@ function send_request(x::JSONRPCEndpoint, method::AbstractString, params)
182188end
183189
184190function get_next_message (endpoint:: JSONRPCEndpoint )
185- endpoint . status == :running || error ( " Endpoint is not running. " )
191+ check_dead_endpoint! (endpoint )
186192
187193 msg = take! (endpoint. in_msg_queue)
188194
189195 return msg
190196end
191197
192198function Base. iterate (endpoint:: JSONRPCEndpoint , state = nothing )
193- endpoint . status == :running || error ( " Endpoint is not running. " )
199+ check_dead_endpoint! (endpoint )
194200
195201 try
196202 return take! (endpoint. in_msg_queue), nothing
@@ -204,7 +210,7 @@ function Base.iterate(endpoint::JSONRPCEndpoint, state = nothing)
204210end
205211
206212function send_success_response (endpoint, original_request, result)
207- endpoint . status == :running || error ( " Endpoint is not running. " )
213+ check_dead_endpoint! (endpoint )
208214
209215 response = Dict (" jsonrpc" => " 2.0" , " id" => original_request[" id" ], " result" => result)
210216
@@ -214,7 +220,7 @@ function send_success_response(endpoint, original_request, result)
214220end
215221
216222function send_error_response (endpoint, original_request, code, message, data)
217- endpoint . status == :running || error ( " Endpoint is not running. " )
223+ check_dead_endpoint! (endpoint )
218224
219225 response = Dict (" jsonrpc" => " 2.0" , " id" => original_request[" id" ], " error" => Dict (" code" => code, " message" => message, " data" => data))
220226
@@ -224,11 +230,7 @@ function send_error_response(endpoint, original_request, code, message, data)
224230end
225231
226232function Base. close (endpoint:: JSONRPCEndpoint )
227- endpoint. status == :running || error (" Endpoint is not running." )
228-
229- while isready (endpoint. out_msg_queue)
230- yield ()
231- end
233+ flush (endpoint)
232234
233235 endpoint. status = :closed
234236 close (endpoint. in_msg_queue)
@@ -240,3 +242,17 @@ function Base.close(endpoint::JSONRPCEndpoint)
240242 # the socket, which we don't want to do
241243 # fetch(endpoint.read_task)
242244end
245+
246+ function Base. flush (endpoint:: JSONRPCEndpoint )
247+ check_dead_endpoint! (endpoint)
248+
249+ while isready (endpoint. out_msg_queue)
250+ yield ()
251+ end
252+ end
253+
254+ function check_dead_endpoint! (endpoint)
255+ status = endpoint. status
256+ status === :running && return
257+ error (" Endpoint is not running, the current state is $(status) ." )
258+ end
0 commit comments