From 46bc5e71d1b571b1d1c8ad204adc691f73be5b5a Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 15 May 2024 16:20:31 +0100 Subject: [PATCH 1/2] [WIP] Add test for EOF leak --- cohttp-lwt-unix/test/dune | 5 ++++ cohttp-lwt-unix/test/test_leak.ml | 47 +++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 cohttp-lwt-unix/test/test_leak.ml diff --git a/cohttp-lwt-unix/test/dune b/cohttp-lwt-unix/test/dune index 163c21cdcf..7600be88b6 100644 --- a/cohttp-lwt-unix/test/dune +++ b/cohttp-lwt-unix/test/dune @@ -36,6 +36,11 @@ (name test_body) (libraries cohttp_lwt_unix_test cohttp-lwt-unix)) +(executable + (modules test_leak) + (name test_leak) + (libraries cohttp-lwt-unix)) + (rule (alias runtest) (package cohttp-lwt-unix) diff --git a/cohttp-lwt-unix/test/test_leak.ml b/cohttp-lwt-unix/test/test_leak.ml new file mode 100644 index 0000000000..afff3ba2f5 --- /dev/null +++ b/cohttp-lwt-unix/test/test_leak.ml @@ -0,0 +1,47 @@ +(* + This test is meant to be used in the following way: + - `make` the whole repo + - `dune exec ./_build/default/cohttp-lwt-unix/test/test_leak.exe` + - `curl -s 'localhost:8080/sleep'` in a different console + - observe the first console having a stream of "sleep messages" + - when stopping (CTRL+C) the `curl` request, the first console + should show a closing connection message; if it does, then the + test is successful, otherwise (and the server keeps sleeping), + the test failed. +*) + +open Lwt +open Cohttp_lwt_unix + +let port = 8080 + +let callback (_, con) req _body = + (* Record connection established *) + let con_string = Cohttp.Connection.to_string con in + Format.printf "Cohttp connection on %s@." con_string; + (* Match given endpoint *) + let uri = req |> Request.uri |> Uri.path in + match uri with + | "/sleep" -> + (* Continuous sleep *) + let rec get_busy () = + Lwt_unix.sleep 1.0 >>= fun () -> + Format.printf "I slept @."; + get_busy () + in + get_busy () + (* Unknown call *) + | _ -> Server.respond_string ~status:`Not_found ~body:"Not found" () + +let start_server () = + let server = + Server.create + ~mode:(`TCP (`Port port)) + (Server.make + ~conn_closed:(fun _ -> Format.printf "Cohttp connection closed\n%!") + ~callback ()) + in + Printf.printf "Server running on port %d\n%!" port; + server + +let () = Lwt_main.run (start_server ()) From c29f936c9132cbe09b1dec85f61847bbada53f31 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Thu, 6 Jun 2024 11:26:50 +0100 Subject: [PATCH 2/2] Fix Cohttp EOF read with Lwt_unix recv MSG_PEEK --- cohttp-lwt-unix/src/io.ml | 29 +++++++++++++++ cohttp-lwt-unix/test/test_leak.ml | 1 + cohttp-lwt/src/s.ml | 30 ++++++++++++++++ cohttp-lwt/src/server.ml | 60 ++++++++++++++++++++----------- cohttp-mirage/src/io.ml | 2 ++ 5 files changed, 101 insertions(+), 21 deletions(-) diff --git a/cohttp-lwt-unix/src/io.ml b/cohttp-lwt-unix/src/io.ml index 70389dd90b..6656178da8 100644 --- a/cohttp-lwt-unix/src/io.ml +++ b/cohttp-lwt-unix/src/io.ml @@ -80,3 +80,32 @@ let catch f = | ex -> Lwt.fail ex) let pp_error = Fmt.exn + +let wait_eof_or_closed conn ic sleep_fn = + let wait_for_cancel () = fst (Lwt.task ()) in + match (conn : Conduit_lwt_unix.flow) with + | Vchan _ -> wait_for_cancel () + | TCP { fd; _ } | Domain_socket { fd; _ } -> + let peek_buffer = Bytes.create 1 in + let has_recv_eof fd = + (* MSG_PEEK does not consume data from the stream and does not + impact normal read operations *) + Lwt_unix.recv fd peek_buffer 0 1 Unix.[ MSG_PEEK ] >>= fun n -> + Lwt.return (n = 0) + in + let rec loop fd = + (* Calls [sleep_fn] to allow yielding control to the request handler *) + sleep_fn () >>= fun () -> + if Lwt_io.is_closed ic then + (* The connection was closed locally. Stop waiting for EOF. + The client has closed the connection and now possibly is doing + some clean up. We should not interrupt this. Let's wait + till the promise for the request handling is resolved and then this + promise will be cancelled. *) + wait_for_cancel () + else + has_recv_eof fd >>= function + | true -> Lwt.return_unit + | false -> loop fd + in + loop fd diff --git a/cohttp-lwt-unix/test/test_leak.ml b/cohttp-lwt-unix/test/test_leak.ml index afff3ba2f5..9939bd6320 100644 --- a/cohttp-lwt-unix/test/test_leak.ml +++ b/cohttp-lwt-unix/test/test_leak.ml @@ -39,6 +39,7 @@ let start_server () = ~mode:(`TCP (`Port port)) (Server.make ~conn_closed:(fun _ -> Format.printf "Cohttp connection closed\n%!") + ~sleep_fn:(fun () -> Lwt_unix.sleep 1.0) ~callback ()) in Printf.printf "Server running on port %d\n%!" port; diff --git a/cohttp-lwt/src/s.ml b/cohttp-lwt/src/s.ml index fe5b33d2cb..d5be96ffe7 100644 --- a/cohttp-lwt/src/s.ml +++ b/cohttp-lwt/src/s.ml @@ -13,6 +13,20 @@ module type IO = sig which case it returns the error. *) val pp_error : Format.formatter -> error -> unit + + val wait_eof_or_closed : conn -> ic -> (unit -> unit t) -> unit t + (** [wait_eof_or_closed conn ic sleep_fn] waits for an EOF or a Closed status + on the input channel [ic]. This function is designed to be used in + [Lwt.pick] to run concurrently with the request handling from the input + channel. The function checks for EOF using [MSG_PEEK] on the input channel + without consuming data, thereby not disturbing the request handling. If + the connection is closed locally, Cohttp will stop waiting for EOF and + will wait the promise to be cancelled. This function ensures that the + monitoring does not spin too quickly and uses CPU efficiently when the + input channel has read activity but the client is not reading it. + + [sleep_fn] is a parameter function used to yield control periodically, + keeping Cohttp platform-independent. *) end (** The [Net] module type defines how to connect to a remote node and close the @@ -155,12 +169,27 @@ module type Server = sig val make_response_action : ?conn_closed:(conn -> unit) -> + ?sleep_fn:(unit -> unit Lwt.t) -> callback:(conn -> Cohttp.Request.t -> Body.t -> response_action Lwt.t) -> unit -> t + (** [make_response_action] creates a set of callbacks used by Cohttp Server. + + - [callback] is called when a new connection is accepted by the server + socket. + - [conn_closed] if provided, will be called when the connection is closed, + e.g. when an EOF is received. + - [sleep_fn] if provided, will be used for periodic checks for EOF from + the client. If this callback is not provided, Cohttp will not detect and + notify the client about EOF received from the peer while the client is + handling the new connection. This can lead to a resource leak if the + [callback] is designed to never resolve. If the connection is closed + locally, Cohttp will stop waiting for EOF and will wait the promise to + be cancelled. *) val make_expert : ?conn_closed:(conn -> unit) -> + ?sleep_fn:(unit -> unit Lwt.t) -> callback: (conn -> Cohttp.Request.t -> @@ -171,6 +200,7 @@ module type Server = sig val make : ?conn_closed:(conn -> unit) -> + ?sleep_fn:(unit -> unit Lwt.t) -> callback: (conn -> Cohttp.Request.t -> Body.t -> (Cohttp.Response.t * Body.t) Lwt.t) -> unit -> diff --git a/cohttp-lwt/src/server.ml b/cohttp-lwt/src/server.ml index 922ee8851f..e7c00f1138 100644 --- a/cohttp-lwt/src/server.ml +++ b/cohttp-lwt/src/server.ml @@ -19,22 +19,23 @@ module Make (IO : S.IO) = struct type t = { callback : conn -> Cohttp.Request.t -> Body.t -> response_action Lwt.t; conn_closed : conn -> unit; + sleep_fn : (unit -> unit Lwt.t) option; } - let make_response_action ?(conn_closed = ignore) ~callback () = - { conn_closed; callback } + let make_response_action ?(conn_closed = ignore) ?sleep_fn ~callback () = + { conn_closed; callback; sleep_fn } - let make ?conn_closed ~callback () = + let make ?conn_closed ?sleep_fn ~callback () = let callback conn req body = callback conn req body >|= fun rsp -> `Response rsp in - make_response_action ?conn_closed ~callback () + make_response_action ?conn_closed ?sleep_fn ~callback () - let make_expert ?conn_closed ~callback () = + let make_expert ?conn_closed ?sleep_fn ~callback () = let callback conn req body = callback conn req body >|= fun rsp -> `Expert rsp in - make_response_action ?conn_closed ~callback () + make_response_action ?conn_closed ?sleep_fn ~callback () module Transfer_IO = Cohttp__Transfer_io.Make (IO) @@ -111,7 +112,9 @@ module Make (IO : S.IO) = struct `Response rsp)) (fun () -> Body.drain_body body) - let handle_response ~keep_alive oc res body conn_closed handle_client = + type conn_action = Call_conn_closed | Call_conn_closed_and_drain of Body.t + + let handle_response ~keep_alive oc res body handle_client = IO.catch (fun () -> let flush = Response.flush res in Response.write ~flush @@ -119,32 +122,28 @@ module Make (IO : S.IO) = struct res oc) >>= function | Ok () -> - if keep_alive then handle_client oc - else - let () = conn_closed () in - Lwt.return_unit + if keep_alive then handle_client oc else Lwt.return Call_conn_closed | Error e -> Log.info (fun m -> m "IO error while writing body: %a" IO.pp_error e); - conn_closed (); - Body.drain_body body + Lwt.return (Call_conn_closed_and_drain body) let rec handle_client ic oc conn spec = Request.read ic >>= function | `Eof -> - spec.conn_closed conn; - Lwt.return_unit + Log.debug (fun m -> + m "Got EOF while handling client: %s" + (Cohttp.Connection.to_string (snd conn))); + Lwt.return Call_conn_closed | `Invalid data -> Log.err (fun m -> m "invalid input %s while handling client" data); - spec.conn_closed conn; - Lwt.return_unit + Lwt.return Call_conn_closed | `Ok req -> ( let body = read_body ic req in handle_request spec.callback conn req body >>= function | `Response (res, body) -> let keep_alive = Request.is_keep_alive req in - handle_response ~keep_alive oc res body - (fun () -> spec.conn_closed conn) - (fun oc -> handle_client ic oc conn spec) + handle_response ~keep_alive oc res body (fun oc -> + handle_client ic oc conn spec) | `Expert (res, io_handler) -> Response.write_header res oc >>= fun () -> io_handler ic oc >>= fun () -> handle_client ic oc conn spec) @@ -152,9 +151,28 @@ module Make (IO : S.IO) = struct let callback spec io_id ic oc = let conn_id = Cohttp.Connection.create () in let conn_closed () = spec.conn_closed (io_id, conn_id) in + let handle () = handle_client ic oc (io_id, conn_id) spec in + let is_conn_closed () = + (* Without a sleep function we cannot safely loop waiting for EOF *) + match spec.sleep_fn with + | None -> fst (Lwt.task ()) (* wait to be cancelled *) + | Some sleep_fn -> + IO.wait_eof_or_closed io_id ic sleep_fn >>= fun () -> + Log.debug (fun m -> + m "Client closed the connection, got EOF for %s" + (Cohttp.Connection.to_string conn_id)); + Lwt.return Call_conn_closed + in Lwt.catch (fun () -> - IO.catch (fun () -> handle_client ic oc (io_id, conn_id) spec) + IO.catch (fun () -> + Lwt.pick [ handle (); is_conn_closed () ] >>= function + | Call_conn_closed -> + conn_closed (); + Lwt.return_unit + | Call_conn_closed_and_drain body -> + conn_closed (); + Body.drain_body body) >>= function | Ok () -> Lwt.return_unit | Error e -> diff --git a/cohttp-mirage/src/io.ml b/cohttp-mirage/src/io.ml index fe2f9d7668..80ceb1511d 100644 --- a/cohttp-mirage/src/io.ml +++ b/cohttp-mirage/src/io.ml @@ -76,4 +76,6 @@ module Make (Channel : Mirage_channel.S) = struct | Read_exn e -> Lwt.return_error (Read_error e) | Write_exn e -> Lwt.return_error (Write_error e) | ex -> Lwt.fail ex) + + let wait_eof_or_closed _conn _ic _sleep_fn = assert false end