|
66 | 66 | -record(state, { |
67 | 67 | config, |
68 | 68 | pending_request_map = #{}, |
69 | | - ws_socket_map = #{} |
| 69 | + ws_socket_map = #{}, |
| 70 | + pending_buffer_map = #{} |
70 | 71 | }). |
71 | 72 |
|
72 | 73 | %% |
@@ -122,59 +123,78 @@ handle_receive(Socket, Packet, State) -> |
122 | 123 |
|
123 | 124 | %% @private |
124 | 125 | handle_http_request(Socket, Packet, State) -> |
125 | | - case maps:get(Socket, State#state.pending_request_map, undefined) of |
| 126 | + PendingRequestMap = State#state.pending_request_map, |
| 127 | + BufferMap = State#state.pending_buffer_map, |
| 128 | + PendingBuffer = maps:get(Socket, BufferMap, <<>>), |
| 129 | + AccumulatedPacket = <<PendingBuffer/binary, Packet/binary>>, |
| 130 | + case maps:get(Socket, PendingRequestMap, undefined) of |
126 | 131 | undefined -> |
127 | | - HttpRequest = parse_http_request(binary_to_list(Packet)), |
128 | | - % ?TRACE("HttpRequest: ~p~n", [HttpRequest]), |
129 | | - #{ |
130 | | - method := Method, |
131 | | - headers := Headers |
132 | | - } = HttpRequest, |
133 | | - case get_protocol(Method, Headers) of |
134 | | - http -> |
135 | | - case init_handler(HttpRequest, State) of |
136 | | - {ok, {Handler, HandlerState, PathSuffix, HandlerConfig}} -> |
137 | | - NewHttpRequest = HttpRequest#{ |
138 | | - handler => Handler, |
139 | | - handler_state => HandlerState, |
140 | | - path_suffix => PathSuffix, |
141 | | - handler_config => HandlerConfig, |
142 | | - socket => Socket |
143 | | - }, |
144 | | - handle_request_state(Socket, NewHttpRequest, State); |
145 | | - Error -> |
146 | | - {close, create_error(?INTERNAL_SERVER_ERROR, Error)} |
147 | | - end; |
148 | | - ws -> |
149 | | - ?TRACE("Protocol is ws", []), |
150 | | - Config = State#state.config, |
151 | | - Path = maps:get(path, HttpRequest), |
152 | | - case get_handler(Path, Config) of |
153 | | - {ok, PathSuffix, EntryConfig} -> |
154 | | - WsHandler = maps:get(handler, EntryConfig), |
155 | | - ?TRACE("Got handler ~p", [WsHandler]), |
156 | | - HandlerConfig = maps:get(handler_config, EntryConfig, #{}), |
157 | | - case WsHandler:start(Socket, PathSuffix, HandlerConfig) of |
158 | | - {ok, WebSocket} -> |
159 | | - ?TRACE("Started web socket handler: ~p", [WebSocket]), |
160 | | - NewWebSocketMap = maps:put(Socket, WebSocket, State#state.ws_socket_map), |
161 | | - NewState = State#state{ws_socket_map = NewWebSocketMap}, |
162 | | - ReplyToken = get_reply_token(maps:get(headers, HttpRequest)), |
163 | | - ReplyHeaders = #{"Upgrade" => "websocket", "Connection" => "Upgrade", "Sec-WebSocket-Accept" => ReplyToken}, |
164 | | - Reply = create_reply(?SWITCHING_PROTOCOLS, ReplyHeaders, <<"">>), |
165 | | - ?TRACE("Sending web socket upgrade reply: ~p", [Reply]), |
166 | | - {reply, Reply, NewState}; |
| 132 | + case maybe_parse_http_request(AccumulatedPacket) of |
| 133 | + {more, IncompletePacket} -> |
| 134 | + NewBufferMap = BufferMap#{Socket => IncompletePacket}, |
| 135 | + {noreply, State#state{pending_buffer_map = NewBufferMap}}; |
| 136 | + {ok, HttpRequest} -> |
| 137 | + CleanBufferMap = maps:remove(Socket, BufferMap), |
| 138 | + CleanState = State#state{pending_buffer_map = CleanBufferMap}, |
| 139 | + % ?TRACE("HttpRequest: ~p~n", [HttpRequest]), |
| 140 | + #{ |
| 141 | + method := Method, |
| 142 | + headers := Headers |
| 143 | + } = HttpRequest, |
| 144 | + case get_protocol(Method, Headers) of |
| 145 | + http -> |
| 146 | + case init_handler(HttpRequest, CleanState) of |
| 147 | + {ok, {Handler, HandlerState, PathSuffix, HandlerConfig}} -> |
| 148 | + NewHttpRequest = HttpRequest#{ |
| 149 | + handler => Handler, |
| 150 | + handler_state => HandlerState, |
| 151 | + path_suffix => PathSuffix, |
| 152 | + handler_config => HandlerConfig, |
| 153 | + socket => Socket |
| 154 | + }, |
| 155 | + handle_request_state(Socket, NewHttpRequest, CleanState); |
167 | 156 | Error -> |
168 | | - ?TRACE("Web socket error: ~p", [Error]), |
169 | | - {close, create_error(?INTERNAL_SERVER_ERROR, {web_socket_error, Error})} |
| 157 | + {close, create_error(?INTERNAL_SERVER_ERROR, Error)} |
170 | 158 | end; |
171 | | - Error -> |
172 | | - Error |
173 | | - end |
| 159 | + ws -> |
| 160 | + ?TRACE("Protocol is ws", []), |
| 161 | + Config = CleanState#state.config, |
| 162 | + Path = maps:get(path, HttpRequest), |
| 163 | + case get_handler(Path, Config) of |
| 164 | + {ok, PathSuffix, EntryConfig} -> |
| 165 | + WsHandler = maps:get(handler, EntryConfig), |
| 166 | + ?TRACE("Got handler ~p", [WsHandler]), |
| 167 | + HandlerConfig = maps:get(handler_config, EntryConfig, #{}), |
| 168 | + case WsHandler:start(Socket, PathSuffix, HandlerConfig) of |
| 169 | + {ok, WebSocket} -> |
| 170 | + ?TRACE("Started web socket handler: ~p", [WebSocket]), |
| 171 | + NewWebSocketMap = maps:put(Socket, WebSocket, CleanState#state.ws_socket_map), |
| 172 | + NewState = CleanState#state{ws_socket_map = NewWebSocketMap}, |
| 173 | + ReplyToken = get_reply_token(maps:get(headers, HttpRequest)), |
| 174 | + ReplyHeaders = #{"Upgrade" => "websocket", "Connection" => "Upgrade", "Sec-WebSocket-Accept" => ReplyToken}, |
| 175 | + Reply = create_reply(?SWITCHING_PROTOCOLS, ReplyHeaders, <<"">>), |
| 176 | + ?TRACE("Sending web socket upgrade reply: ~p", [Reply]), |
| 177 | + {reply, Reply, NewState}; |
| 178 | + Error -> |
| 179 | + ?TRACE("Web socket error: ~p", [Error]), |
| 180 | + {close, create_error(?INTERNAL_SERVER_ERROR, {web_socket_error, Error})} |
| 181 | + end; |
| 182 | + Error -> |
| 183 | + Error |
| 184 | + end |
| 185 | + end; |
| 186 | + {error, Reason} -> |
| 187 | + CleanBufferMap = maps:remove(Socket, BufferMap), |
| 188 | + _CleanState = State#state{pending_buffer_map = CleanBufferMap}, |
| 189 | + {close, create_error(?BAD_REQUEST, Reason)} |
174 | 190 | end; |
175 | 191 | PendingHttpRequest -> |
176 | 192 | ?TRACE("Packetlen: ~p", [erlang:byte_size(Packet)]), |
177 | | - handle_request_state(Socket, PendingHttpRequest#{body := Packet}, State) |
| 193 | + ExistingBody = maps:get(body, PendingHttpRequest, <<>>), |
| 194 | + NewBody = <<ExistingBody/binary, Packet/binary>>, |
| 195 | + CleanBufferMap = maps:remove(Socket, BufferMap), |
| 196 | + CleanState = State#state{pending_buffer_map = CleanBufferMap}, |
| 197 | + handle_request_state(Socket, PendingHttpRequest#{body := NewBody}, CleanState) |
178 | 198 | end. |
179 | 199 |
|
180 | 200 | %% @private |
@@ -213,7 +233,7 @@ handle_request_state(Socket, HttpRequest, State) -> |
213 | 233 | {reply, Reply, State#state{pending_request_map = NewPendingRequestMap}}; |
214 | 234 | wait_for_body -> |
215 | 235 | NewPendingRequestMap = PendingRequestMap#{Socket => HttpRequest}, |
216 | | - call_http_req_handler(Socket, HttpRequest, State#state{pending_request_map = NewPendingRequestMap}) |
| 236 | + {noreply, State#state{pending_request_map = NewPendingRequestMap}} |
217 | 237 | end. |
218 | 238 |
|
219 | 239 | %% @private |
@@ -290,13 +310,19 @@ update_state(Socket, HttpRequest, HandlerState, State) -> |
290 | 310 |
|
291 | 311 | %% @hidden |
292 | 312 | handle_tcp_closed(Socket, State) -> |
293 | | - case maps:get(Socket, State#state.ws_socket_map, undefined) of |
| 313 | + NewPendingRequestMap = maps:remove(Socket, State#state.pending_request_map), |
| 314 | + NewPendingBufferMap = maps:remove(Socket, State#state.pending_buffer_map), |
| 315 | + CleanState = State#state{ |
| 316 | + pending_request_map = NewPendingRequestMap, |
| 317 | + pending_buffer_map = NewPendingBufferMap |
| 318 | + }, |
| 319 | + case maps:get(Socket, CleanState#state.ws_socket_map, undefined) of |
294 | 320 | undefined -> |
295 | | - State; |
| 321 | + CleanState; |
296 | 322 | WebSocket -> |
297 | 323 | ok = httpd_ws_handler:stop(WebSocket), |
298 | | - NewWebSocketMap = maps:remove(Socket, State#state.ws_socket_map), |
299 | | - State#state{ws_socket_map = NewWebSocketMap} |
| 324 | + NewWebSocketMap = maps:remove(Socket, CleanState#state.ws_socket_map), |
| 325 | + CleanState#state{ws_socket_map = NewWebSocketMap} |
300 | 326 | end. |
301 | 327 |
|
302 | 328 | %% |
@@ -324,6 +350,29 @@ parse_http_request(Packet) -> |
324 | 350 | } |
325 | 351 | ). |
326 | 352 |
|
| 353 | +maybe_parse_http_request(Packet) when is_binary(Packet) -> |
| 354 | + case find_header_delimiter(Packet) of |
| 355 | + nomatch -> |
| 356 | + {more, Packet}; |
| 357 | + {_Pos, _Len} -> |
| 358 | + try |
| 359 | + {ok, parse_http_request(binary_to_list(Packet))} |
| 360 | + catch |
| 361 | + throw:Reason -> |
| 362 | + {error, Reason}; |
| 363 | + error:Reason -> |
| 364 | + {error, Reason} |
| 365 | + end |
| 366 | + end. |
| 367 | + |
| 368 | +find_header_delimiter(Packet) -> |
| 369 | + case binary:match(Packet, <<"\r\n\r\n">>) of |
| 370 | + nomatch -> |
| 371 | + binary:match(Packet, <<"\n\n">>); |
| 372 | + Match -> |
| 373 | + Match |
| 374 | + end. |
| 375 | + |
327 | 376 | %% @private |
328 | 377 | parse_heading([$\s|Rest], start, Tmp, Accum) -> |
329 | 378 | parse_heading(Rest, start, Tmp, Accum); |
|
0 commit comments