@@ -70,20 +70,28 @@ function write_transport_layer(stream, response)
7070end
7171
7272function read_transport_layer (stream)
73- header_dict = Dict {String,String} ()
74- line = chomp (readline (stream))
75- # Check whether the socket was closed
76- if line == " "
77- return nothing
78- end
79- while length (line) > 0
80- h_parts = split (line, " :" )
81- header_dict[chomp (h_parts[1 ])] = chomp (h_parts[2 ])
73+ try
74+ header_dict = Dict {String,String} ()
8275 line = chomp (readline (stream))
76+ # Check whether the socket was closed
77+ if line == " "
78+ return nothing
79+ end
80+ while length (line) > 0
81+ h_parts = split (line, " :" )
82+ header_dict[chomp (h_parts[1 ])] = chomp (h_parts[2 ])
83+ line = chomp (readline (stream))
84+ end
85+ message_length = parse (Int, header_dict[" Content-Length" ])
86+ message_str = String (read (stream, message_length))
87+ return message_str
88+ catch err
89+ if err isa Base. IOError
90+ return nothing
91+ end
92+
93+ rethrow (err)
8394 end
84- message_length = parse (Int, header_dict[" Content-Length" ])
85- message_str = String (read (stream, message_length))
86- return message_str
8795end
8896
8997Base. isopen (x:: JSONRPCEndpoint ) = x. status != :closed && isopen (x. pipe_in) && isopen (x. pipe_out)
@@ -114,41 +122,41 @@ function Base.run(x::JSONRPCEndpoint)
114122 end
115123
116124 x. read_task = @async try
117- try
118- while true
119- message = read_transport_layer (x. pipe_in)
125+ while true
126+ message = read_transport_layer (x. pipe_in)
120127
121- if message === nothing || x. status == :closed
122- break
123- end
128+ if message === nothing || x. status == :closed
129+ break
130+ end
124131
125- message_dict = JSON. parse (message)
126-
127- if haskey (message_dict, " method" )
128- try
129- put! (x. in_msg_queue, message_dict)
130- catch err
131- if err isa InvalidStateException
132- break
133- else
134- rethrow (err)
135- end
136- end
137- else
138- # This must be a response
139- id_of_request = message_dict[" id" ]
132+ message_dict = JSON. parse (message)
140133
141- channel_for_response = x. outstanding_requests[id_of_request]
142- put! (channel_for_response, message_dict)
134+ if haskey (message_dict, " method" )
135+ try
136+ put! (x. in_msg_queue, message_dict)
137+ catch err
138+ if err isa InvalidStateException
139+ break
140+ else
141+ rethrow (err)
142+ end
143143 end
144+ else
145+ # This must be a response
146+ id_of_request = message_dict[" id" ]
147+
148+ channel_for_response = x. outstanding_requests[id_of_request]
149+ put! (channel_for_response, message_dict)
144150 end
145- finally
146- close (x. in_msg_queue)
147151 end
152+
153+ close (x. in_msg_queue)
148154
149155 for i in values (x. outstanding_requests)
150156 close (i)
151157 end
158+
159+ x. status = :closed
152160 catch err
153161 bt = catch_backtrace ()
154162 if x. err_handler != = nothing
@@ -243,6 +251,8 @@ function send_error_response(endpoint, original_request, code, message, data)
243251end
244252
245253function Base. close (endpoint:: JSONRPCEndpoint )
254+ endpoint. status == :closed && return
255+
246256 flush (endpoint)
247257
248258 endpoint. status = :closed
0 commit comments