From 031bce8847cbe7a097657570b46285b6bea8d97d Mon Sep 17 00:00:00 2001 From: Gabriel Buica Date: Wed, 10 Apr 2024 14:46:34 +0100 Subject: [PATCH] fixup! CP-48195: Split tracing library Signed-off-by: Gabriel Buica --- ocaml/libs/tracing/tracing_export.ml | 594 +++++++++++++------------- ocaml/libs/tracing/tracing_export.mli | 28 +- 2 files changed, 308 insertions(+), 314 deletions(-) diff --git a/ocaml/libs/tracing/tracing_export.ml b/ocaml/libs/tracing/tracing_export.ml index 952b99f3d29..e7dc34ffdc6 100644 --- a/ocaml/libs/tracing/tracing_export.ml +++ b/ocaml/libs/tracing/tracing_export.ml @@ -20,327 +20,323 @@ open Tracing let ( let@ ) f x = f x -module Export = struct - let export_interval = ref 30. - - let set_export_interval t = export_interval := t - - let host_id = ref "localhost" - - let set_host_id id = host_id := id - - let service_name = ref None - - let set_service_name name = service_name := Some name - - let get_service_name () = - match !service_name with - | None -> - warn "service name not yet set!" ; - "unknown" - | Some name -> - name - - module Content = struct - module Json = struct - module Zipkinv2 = struct - module ZipkinSpan = struct - type zipkinEndpoint = {serviceName: string} [@@deriving rpcty] - - type annotation = {timestamp: int; value: string} [@@deriving rpcty] - - type t = { - id: string - ; traceId: string - ; parentId: string option - ; name: string - ; timestamp: int - ; duration: int - ; kind: string option - ; localEndpoint: zipkinEndpoint - ; annotations: annotation list - ; tags: (string * string) list - } - [@@deriving rpcty] - - type t_list = t list [@@deriving rpcty] - - let kind_to_zipkin_kind = function - | SpanKind.Internal -> - None - | k -> - Some k - - let json_of_t_list s = - Rpcmarshal.marshal t_list.Rpc.Types.ty s |> Jsonrpc.to_string - end - - let zipkin_span_of_span (s : Span.t) : ZipkinSpan.t = - let serviceName = get_service_name () in - let annotations = +let export_interval = ref 30. + +let set_export_interval t = export_interval := t + +let host_id = ref "localhost" + +let set_host_id id = host_id := id + +let service_name = ref None + +let set_service_name name = service_name := Some name + +let get_service_name () = + match !service_name with + | None -> + warn "service name not yet set!" ; + "unknown" + | Some name -> + name + +module Content = struct + module Json = struct + module Zipkinv2 = struct + module ZipkinSpan = struct + type zipkinEndpoint = {serviceName: string} [@@deriving rpcty] + + type annotation = {timestamp: int; value: string} [@@deriving rpcty] + + type t = { + id: string + ; traceId: string + ; parentId: string option + ; name: string + ; timestamp: int + ; duration: int + ; kind: string option + ; localEndpoint: zipkinEndpoint + ; annotations: annotation list + ; tags: (string * string) list + } + [@@deriving rpcty] + + type t_list = t list [@@deriving rpcty] + + let kind_to_zipkin_kind = function + | SpanKind.Internal -> + None + | k -> + Some k + + let json_of_t_list s = + Rpcmarshal.marshal t_list.Rpc.Types.ty s |> Jsonrpc.to_string + end + + let zipkin_span_of_span (s : Span.t) : ZipkinSpan.t = + let serviceName = get_service_name () in + let annotations = + s + |> Span.get_events + |> List.map (fun event : ZipkinSpan.annotation -> + let timestamp = + int_of_float (event.SpanEvent.time *. 1000000.) + in + let value = event.SpanEvent.name in + {timestamp; value} + ) + in + { + id= s |> Span.get_context |> SpanContext.span_id_of_span_context + ; traceId= s |> Span.get_context |> SpanContext.trace_id_of_span_context + ; parentId= s - |> Span.get_events - |> List.map (fun event : ZipkinSpan.annotation -> - let timestamp = - int_of_float (event.SpanEvent.time *. 1000000.) - in - let value = event.SpanEvent.name in - {timestamp; value} + |> Span.get_parent + |> Option.map (fun x -> + x |> Span.get_context |> SpanContext.span_id_of_span_context ) - in - { - id= s |> Span.get_context |> SpanContext.span_id_of_span_context - ; traceId= - s |> Span.get_context |> SpanContext.trace_id_of_span_context - ; parentId= - s - |> Span.get_parent - |> Option.map (fun x -> - x - |> Span.get_context - |> SpanContext.span_id_of_span_context - ) - ; name= s |> Span.get_name - ; timestamp= int_of_float (Span.get_begin_time s *. 1000000.) - ; duration= - Option.value (Span.get_end_time s) - ~default:(Unix.gettimeofday () *. 1000000.) - -. Span.get_begin_time s - |> ( *. ) 1000000. - |> int_of_float - ; kind= - s - |> Span.get_span_kind - |> ZipkinSpan.kind_to_zipkin_kind - |> Option.map SpanKind.to_string - ; localEndpoint= {serviceName} - ; annotations - ; tags= - Attributes.fold - (fun k v tags -> (k, v) :: tags) - (Span.get_attributes s) [] - } - - let content_of (spans : Span.t list) = - List.map zipkin_span_of_span spans |> ZipkinSpan.json_of_t_list - end + ; name= s |> Span.get_name + ; timestamp= int_of_float (Span.get_begin_time s *. 1000000.) + ; duration= + Option.value (Span.get_end_time s) + ~default:(Unix.gettimeofday () *. 1000000.) + -. Span.get_begin_time s + |> ( *. ) 1000000. + |> int_of_float + ; kind= + s + |> Span.get_span_kind + |> ZipkinSpan.kind_to_zipkin_kind + |> Option.map SpanKind.to_string + ; localEndpoint= {serviceName} + ; annotations + ; tags= + Attributes.fold + (fun k v tags -> (k, v) :: tags) + (Span.get_attributes s) [] + } + + let content_of (spans : Span.t list) = + List.map zipkin_span_of_span spans |> ZipkinSpan.json_of_t_list end end +end - module Destination = struct - module File = struct - let trace_log_dir = ref "/var/log/dt/zipkinv2/json" +module Destination = struct + module File = struct + let trace_log_dir = ref "/var/log/dt/zipkinv2/json" - let max_file_size = ref (1 lsl 20) + let max_file_size = ref (1 lsl 20) - let compress_tracing_files = ref true + let compress_tracing_files = ref true - let set_trace_log_dir dir = trace_log_dir := dir + let set_trace_log_dir dir = trace_log_dir := dir - let get_trace_log_dir () = !trace_log_dir + let get_trace_log_dir () = !trace_log_dir - let set_max_file_size size = max_file_size := size + let set_max_file_size size = max_file_size := size - let set_compress_tracing_files enabled = compress_tracing_files := enabled + let set_compress_tracing_files enabled = compress_tracing_files := enabled - let file_name = ref None + let file_name = ref None - let lock = Mutex.create () + let lock = Mutex.create () - let new_file_name () = - let date = Ptime_clock.now () |> Ptime.to_rfc3339 ~frac_s:6 in - let ( // ) = Filename.concat in - let name = - !trace_log_dir - // String.concat "-" [get_service_name (); !host_id; date] - ^ ".ndjson" - in - file_name := Some name ; - name - - let with_fd file_name = - Xapi_stdext_unix.Unixext.with_file file_name - [O_WRONLY; O_CREAT; O_APPEND] - 0o700 - - let write fd str = - let content = str ^ "\n" in - ignore @@ Unix.write_substring fd content 0 (String.length content) - - let export json = - try - let file_name = - match !file_name with None -> new_file_name () | Some x -> x - in - Xapi_stdext_unix.Unixext.mkdir_rec (Filename.dirname file_name) 0o700 ; - let@ fd = file_name |> with_fd in - write fd json ; - if (Unix.fstat fd).st_size >= !max_file_size then ( - debug "Tracing: Rotating file %s > %d" file_name !max_file_size ; - if !compress_tracing_files then - Zstd.Fast.compress_file Zstd.Fast.compress ~file_path:file_name - ~file_ext:"zst" ; - ignore @@ new_file_name () - ) ; - Ok () - with e -> Error e + let new_file_name () = + let date = Ptime_clock.now () |> Ptime.to_rfc3339 ~frac_s:6 in + let ( // ) = Filename.concat in + let name = + !trace_log_dir + // String.concat "-" [get_service_name (); !host_id; date] + ^ ".ndjson" + in + file_name := Some name ; + name - let with_stream f = - Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> f export) - end + let with_fd file_name = + Xapi_stdext_unix.Unixext.with_file file_name + [O_WRONLY; O_CREAT; O_APPEND] + 0o700 - module Http = struct - module Request = Cohttp.Request.Make (Cohttp_posix_io.Buffered_IO) - module Response = Cohttp.Response.Make (Cohttp_posix_io.Buffered_IO) - - let export ~url json = - try - let body = json in - let headers = - Cohttp.Header.of_list - ([ - ("Content-Type", "application/json") - ; ("Content-Length", string_of_int (String.length body)) - ] - @ - match Uri.host url with - | None -> - [] - | Some h -> - let port = - match Uri.port url with - | Some p -> - ":" ^ string_of_int p - | None -> - "" - in - [("Host", h ^ port)] - ) - in - Open_uri.with_open_uri url (fun fd -> - let request = - Cohttp.Request.make ~meth:`POST ~version:`HTTP_1_1 ~headers url - in - let ic = Unix.in_channel_of_descr fd in - let oc = Unix.out_channel_of_descr fd in - Request.write - (fun writer -> Request.write_body writer body) - request oc ; - (* We flush instead of closing the sending stream as nginx responds to a TCP - half-shutdown with a full shutdown of both directions of the HTTP request *) - flush oc ; - match try Response.read ic with _ -> `Eof with - | `Eof -> - Ok () - | `Invalid x -> - Error (Failure ("invalid read: " ^ x)) - | `Ok response - when Cohttp.Code.(response.status |> code_of_status |> is_error) - -> - Error (Failure (Cohttp.Code.string_of_status response.status)) - | `Ok response -> - let body = Buffer.create 128 in - let reader = Response.make_body_reader response ic in - let rec loop () = - match Response.read_body_chunk reader with - | Cohttp.Transfer.Chunk x -> - Buffer.add_string body x ; loop () - | Cohttp.Transfer.Final_chunk x -> - Buffer.add_string body x - | Cohttp.Transfer.Done -> - () - in - loop () ; Ok () - ) - with e -> Error e - end + let write fd str = + let content = str ^ "\n" in + ignore @@ Unix.write_substring fd content 0 (String.length content) - let export_to_endpoint parent traces endpoint = - debug "Tracing: About to export" ; + let export json = try - File.with_stream (fun file_export -> - let export, name = - match endpoint with - | Url url -> - (Http.export ~url, "Tracing.Http.export") - | Bugtool -> - (file_export, "Tracing.File.export") - in - let all_spans = - Hashtbl.fold (fun _ spans acc -> spans @ acc) traces [] - in - let attributes = - [ - ("export.span.count", List.length all_spans |> string_of_int) - ; ("export.endpoint", endpoint_to_string endpoint) - ; ( "xs.tracing.spans_table.count" - , Spans.span_count () |> string_of_int - ) - ; ( "xs.tracing.finished_spans_table.count" - , Hashtbl.length traces |> string_of_int - ) - ] - in - let@ _ = with_tracing ~parent ~attributes ~name in - Content.Json.Zipkinv2.content_of all_spans - |> export - |> Result.iter_error raise - ) - with exn -> - debug "Tracing: unable to export span : %s" (Printexc.to_string exn) - - let flush_spans () = - let span_list = Spans.since () in - let attributes = - [("export.traces.count", Hashtbl.length span_list |> string_of_int)] - in - let@ parent = - with_tracing ~parent:None ~attributes ~name:"Tracing.flush_spans" - in - get_tracer_providers () - |> List.filter TracerProvider.get_enabled - |> List.concat_map (fun x -> TracerProvider.get_endpoints x) - |> List.iter (export_to_endpoint parent span_list) - - let delay = Delay.make () - - (* Note this signal will flush the spans and terminate the exporter thread *) - let signal () = Delay.signal delay - - let create_exporter () = - enable_span_garbage_collector () ; - Thread.create - (fun () -> - let signaled = ref false in - while not !signaled do - debug "Tracing: Waiting %d seconds before exporting spans" - (int_of_float !export_interval) ; - if not (Delay.wait delay !export_interval) then ( - debug "Tracing: we are signaled, export spans now and exit" ; - signaled := true - ) ; - flush_spans () - done - ) - () + let file_name = + match !file_name with None -> new_file_name () | Some x -> x + in + Xapi_stdext_unix.Unixext.mkdir_rec (Filename.dirname file_name) 0o700 ; + let@ fd = file_name |> with_fd in + write fd json ; + if (Unix.fstat fd).st_size >= !max_file_size then ( + debug "Tracing: Rotating file %s > %d" file_name !max_file_size ; + if !compress_tracing_files then + Zstd.Fast.compress_file Zstd.Fast.compress ~file_path:file_name + ~file_ext:"zst" ; + ignore @@ new_file_name () + ) ; + Ok () + with e -> Error e + + let with_stream f = + Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> f export) + end - let exporter = ref None + module Http = struct + module Request = Cohttp.Request.Make (Cohttp_posix_io.Buffered_IO) + module Response = Cohttp.Response.Make (Cohttp_posix_io.Buffered_IO) - let lock = Mutex.create () + let export ~url json = + try + let body = json in + let headers = + Cohttp.Header.of_list + ([ + ("Content-Type", "application/json") + ; ("Content-Length", string_of_int (String.length body)) + ] + @ + match Uri.host url with + | None -> + [] + | Some h -> + let port = + match Uri.port url with + | Some p -> + ":" ^ string_of_int p + | None -> + "" + in + [("Host", h ^ port)] + ) + in + Open_uri.with_open_uri url (fun fd -> + let request = + Cohttp.Request.make ~meth:`POST ~version:`HTTP_1_1 ~headers url + in + let ic = Unix.in_channel_of_descr fd in + let oc = Unix.out_channel_of_descr fd in + Request.write + (fun writer -> Request.write_body writer body) + request oc ; + (* We flush instead of closing the sending stream as nginx responds to a TCP + half-shutdown with a full shutdown of both directions of the HTTP request *) + flush oc ; + match try Response.read ic with _ -> `Eof with + | `Eof -> + Ok () + | `Invalid x -> + Error (Failure ("invalid read: " ^ x)) + | `Ok response + when Cohttp.Code.(response.status |> code_of_status |> is_error) + -> + Error (Failure (Cohttp.Code.string_of_status response.status)) + | `Ok response -> + let body = Buffer.create 128 in + let reader = Response.make_body_reader response ic in + let rec loop () = + match Response.read_body_chunk reader with + | Cohttp.Transfer.Chunk x -> + Buffer.add_string body x ; loop () + | Cohttp.Transfer.Final_chunk x -> + Buffer.add_string body x + | Cohttp.Transfer.Done -> + () + in + loop () ; Ok () + ) + with e -> Error e + end - let main () = - Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> - match !exporter with - | None -> - let tid = create_exporter () in - exporter := Some tid ; - tid - | Some tid -> - tid + let export_to_endpoint parent traces endpoint = + debug "Tracing: About to export" ; + try + File.with_stream (fun file_export -> + let export, name = + match endpoint with + | Url url -> + (Http.export ~url, "Tracing.Http.export") + | Bugtool -> + (file_export, "Tracing.File.export") + in + let all_spans = + Hashtbl.fold (fun _ spans acc -> spans @ acc) traces [] + in + let attributes = + [ + ("export.span.count", all_spans |> List.length |> string_of_int) + ; ("export.endpoint", endpoint_to_string endpoint) + ; ( "xs.tracing.spans_table.count" + , Spans.span_count () |> string_of_int + ) + ; ( "xs.tracing.finished_spans_table.count" + , traces |> Hashtbl.length |> string_of_int + ) + ] + in + let@ _ = with_tracing ~parent ~attributes ~name in + all_spans + |> Content.Json.Zipkinv2.content_of + |> export + |> Result.iter_error raise ) - end + with exn -> + debug "Tracing: unable to export span : %s" (Printexc.to_string exn) + + let flush_spans () = + let span_list = Spans.since () in + let attributes = + [("export.traces.count", Hashtbl.length span_list |> string_of_int)] + in + let@ parent = + with_tracing ~parent:None ~attributes ~name:"Tracing.flush_spans" + in + get_tracer_providers () + |> List.filter TracerProvider.get_enabled + |> List.concat_map TracerProvider.get_endpoints + |> List.iter (export_to_endpoint parent span_list) + + let delay = Delay.make () + + (* Note this signal will flush the spans and terminate the exporter thread *) + let signal () = Delay.signal delay + + let create_exporter () = + enable_span_garbage_collector () ; + Thread.create + (fun () -> + let signaled = ref false in + while not !signaled do + debug "Tracing: Waiting %d seconds before exporting spans" + (int_of_float !export_interval) ; + if not (Delay.wait delay !export_interval) then ( + debug "Tracing: we are signaled, export spans now and exit" ; + signaled := true + ) ; + flush_spans () + done + ) + () + + let exporter = ref None + + let lock = Mutex.create () + + let main () = + Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () -> + match !exporter with + | None -> + let tid = create_exporter () in + exporter := Some tid ; + tid + | Some tid -> + tid + ) end -let flush_and_exit = Export.Destination.signal +let flush_and_exit = Destination.signal -let main = Export.Destination.main +let main = Destination.main diff --git a/ocaml/libs/tracing/tracing_export.mli b/ocaml/libs/tracing/tracing_export.mli index ca282440a02..a9a86ff6cc3 100644 --- a/ocaml/libs/tracing/tracing_export.mli +++ b/ocaml/libs/tracing/tracing_export.mli @@ -12,29 +12,27 @@ * GNU Lesser General Public License for more details. *) -module Export : sig - val set_export_interval : float -> unit +val set_export_interval : float -> unit - val set_host_id : string -> unit +val set_host_id : string -> unit - val set_service_name : string -> unit +val set_service_name : string -> unit - module Destination : sig - module File : sig - val set_max_file_size : int -> unit +module Destination : sig + module File : sig + val set_max_file_size : int -> unit - val set_trace_log_dir : string -> unit + val set_trace_log_dir : string -> unit - val get_trace_log_dir : unit -> string + val get_trace_log_dir : unit -> string - val set_compress_tracing_files : bool -> unit - end + val set_compress_tracing_files : bool -> unit + end - val flush_spans : unit -> unit + val flush_spans : unit -> unit - module Http : sig - val export : url:Uri.t -> string -> (unit, exn) result - end + module Http : sig + val export : url:Uri.t -> string -> (unit, exn) result end end