Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions in tracing.ml #5601

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 54 additions & 38 deletions ocaml/libs/tracing/tracing.ml
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ let validate_attribute (key, value) =
&& Re.execp attribute_key_regex key
&& W3CBaggage.Key.is_valid_key key

let observe = ref true
let observe = Atomic.make true

let set_observe mode = observe := mode
let set_observe mode = Atomic.set observe mode

module SpanKind = struct
type t = Server | Consumer | Client | Producer | Internal [@@deriving rpcty]
Expand Down Expand Up @@ -296,34 +296,43 @@ module Spans = struct

let spans = Hashtbl.create 100

let span_count () = Hashtbl.length spans
let span_count () =
Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () ->
Hashtbl.length spans
)

let max_spans = ref 1000
let max_spans = Atomic.make 1000

let set_max_spans x = max_spans := x
let set_max_spans x = Atomic.set max_spans x

let max_traces = ref 1000
let max_traces = Atomic.make 1000

let set_max_traces x = max_traces := x
let set_max_traces x = Atomic.set max_traces x

let finished_spans = Hashtbl.create 100

let span_hashtbl_is_empty () = Hashtbl.length spans = 0
let span_hashtbl_is_empty () =
Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () ->
Hashtbl.length spans = 0
)

let finished_span_hashtbl_is_empty () = Hashtbl.length finished_spans = 0
let finished_span_hashtbl_is_empty () =
Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () ->
Hashtbl.length finished_spans = 0
)

let add_to_spans ~(span : Span.t) =
let key = span.context.trace_id in
Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () ->
match Hashtbl.find_opt spans key with
| None ->
if Hashtbl.length spans < !max_traces then
if Hashtbl.length spans < Atomic.get max_traces then
Hashtbl.add spans key [span]
else
debug "%s exceeded max traces when adding to span table"
__FUNCTION__
| Some span_list ->
if List.length span_list < !max_spans then
if List.length span_list < Atomic.get max_spans then
Hashtbl.replace spans key (span :: span_list)
else
debug "%s exceeded max traces when adding to span table"
Expand Down Expand Up @@ -354,13 +363,13 @@ module Spans = struct
Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () ->
match Hashtbl.find_opt finished_spans key with
| None ->
if Hashtbl.length finished_spans < !max_traces then
if Hashtbl.length finished_spans < Atomic.get max_traces then
Hashtbl.add finished_spans key [span]
else
debug "%s exceeded max traces when adding to finished span table"
__FUNCTION__
| Some span_list ->
if List.length span_list < !max_spans then
if List.length span_list < Atomic.get max_spans then
Hashtbl.replace finished_spans key (span :: span_list)
else
debug "%s exceeded max traces when adding to finished span table"
Expand All @@ -373,13 +382,14 @@ module Spans = struct
match x with
| None ->
false
| Some (span : Span.t) -> (
match Hashtbl.find_opt finished_spans span.context.trace_id with
| None ->
false
| Some span_list ->
List.exists (fun x -> x = span) span_list
)
| Some (span : Span.t) ->
Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () ->
match Hashtbl.find_opt finished_spans span.context.trace_id with
| None ->
false
| Some span_list ->
List.exists (fun x -> x = span) span_list
)

(** since copies the existing finished spans and then clears the existing spans as to only export them once *)
let since () =
Expand All @@ -389,12 +399,15 @@ module Spans = struct
copy
)

let dump () = Hashtbl.(copy spans, Hashtbl.copy finished_spans)
let dump () =
Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () ->
Hashtbl.(copy spans, Hashtbl.copy finished_spans)
)

module GC = struct
let lock = Mutex.create ()

let span_timeout = ref 86400.
let span_timeout = Atomic.make 86400.

let span_timeout_thread = ref None

Expand All @@ -408,7 +421,7 @@ module Spans = struct
let elapsed =
Unix.gettimeofday () -. span.Span.begin_time
in
if elapsed > !span_timeout *. 1000000. then (
if elapsed > Atomic.get span_timeout *. 1000000. then (
debug "Tracing: Span %s timed out, forcibly finishing now"
span.Span.context.span_id ;
let span =
Expand All @@ -431,14 +444,14 @@ module Spans = struct
)

let initialise_thread ~timeout =
span_timeout := timeout ;
Atomic.set span_timeout timeout ;
span_timeout_thread :=
Some
(Thread.create
(fun () ->
while true do
debug "Tracing: Span garbage collector" ;
Thread.delay !span_timeout ;
Thread.delay (Atomic.get span_timeout) ;
gc_inactive_spans ()
done
)
Expand Down Expand Up @@ -538,9 +551,12 @@ let lock = Mutex.create ()

let tracer_providers = Hashtbl.create 100

let get_tracer_providers () =
let get_tracer_providers_unlocked () =
Hashtbl.fold (fun _ provider acc -> provider :: acc) tracer_providers []

let get_tracer_providers () =
Xapi_stdext_threads.Threadext.Mutex.execute lock get_tracer_providers_unlocked

let set ?enabled ?attributes ?endpoints ~uuid () =
Xapi_stdext_threads.Threadext.Mutex.execute lock (fun () ->
let provider =
Expand All @@ -561,17 +577,17 @@ let set ?enabled ?attributes ?endpoints ~uuid () =
failwith
(Printf.sprintf "The TracerProvider : %s does not exist" uuid)
in
Hashtbl.replace tracer_providers uuid provider
) ;
if
List.for_all
(fun provider -> not provider.TracerProvider.enabled)
(get_tracer_providers ())
then
Xapi_stdext_threads.Threadext.Mutex.execute Spans.lock (fun () ->
Hashtbl.clear Spans.spans ;
Hashtbl.clear Spans.finished_spans
)
Hashtbl.replace tracer_providers uuid provider ;
if
List.for_all
(fun provider -> not provider.TracerProvider.enabled)
(get_tracer_providers_unlocked ())
then
Xapi_stdext_threads.Threadext.Mutex.execute Spans.lock (fun () ->
Hashtbl.clear Spans.spans ;
Hashtbl.clear Spans.finished_spans
)
)

let create ~enabled ~attributes ~endpoints ~name_label ~uuid =
let endpoints = List.map endpoint_of_string endpoints in
Expand Down Expand Up @@ -615,7 +631,7 @@ let enable_span_garbage_collector ?(timeout = 86400.) () =
Spans.GC.initialise_thread ~timeout

let with_tracing ?(attributes = []) ?(parent = None) ~name f =
if not !observe then
if not (Atomic.get observe) then
f None
else
let tracer = get_tracer ~name in
Expand Down
Loading