From 86cf4b697fd999632684205a7bca667204e5a0e0 Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Tue, 23 Jul 2024 22:56:34 -0400 Subject: [PATCH] Significantly refactored trace aggregator Previously we were leaking `ThreadTracer` objects in the `TraceAggregator`: every newly created thread pushes its `ThreadTracer` into the `TraceAggregator`, but it is never removed. This causes a memory leak and also makes the `TraceAggregator` slower. This refactors the tracing code to fix that. Some of the highlights are: - Removed the feature of dynamically adding in trace `Sink`s. `Sink` can now only be specified when the trace session is started. Since we don't really have any use case of dynamically hooking in sinks, this feature is a lot of complexity for no reason. - This also removed the sticky packet feature within the `TraceAggregator` which saved additional complexity. - Removed the feature of the `TraceAggregator` mirroring data to multiple `Sink`s. This is also unnecessary. Could create a `MultiSink` feature if necessary to emulate this. So now, when you start a trace session, you must give a single sink for the data to be pushed to. - `TraceAggregator` is now a permanent object (as a `shared_ptr`) on the `App` instead of being dynamically created and deleted when the trace session is started and stopped. This removes the need for `App` to cache the list of `ThreadTracer`s and pass it to the `TraceAggregator` during its construction when the trace session starts. - Instead, the `TraceAggregator` internally has a `SessionState` object (`session_`). This object is created when the trace session starts and deleted when it stops. - Since the `TraceAggregator` is permanent now, each `Thread` directly registers its `ThreadTracer` with the `TraceAggregator` during its startup procedure. This replaces registering through the `App`. Each `Thread` also now holds a `weak_ptr` to the `TraceAggregator`. - When a thread shuts down, the `ThreadTracer` is marked as "done".
The `TraceAggregator` will check for this "done" status if no more events are available from the queue. If it is done, the `ThreadTracer` will be removed from the `TraceAggregator`. - `TraceAggregator` no longer supports `RequestStop` and `Join`. This is replaced with a single `Stop` call which will wait for the `TraceAggregator` to fully stop and reset the state of the TraceAggregator (and any registered `ThreadTracer`'s string interner). - Right now, there is a potential data race during the `TraceAggregator.Stop`, as we access `session_` without a lock. This is most likely OK as we don't expect `TraceAggregator.Stop` to be called from multiple threads or rapidly recreated for now. **Should probably fix it in the future.** - String interner states are now reset when a trace session stops. This means if another trace session is started, the strings it remembers are reset. The id starting positions are also reset. - Since we no longer have sticky packets, we also no longer emit a trace event packet that contains interned data from a previous trace session. 
--- .clang-tidy | 3 +- docs/imgs/trace-architecture.svg | 2 +- docs/imgs/tracing-ownership-structure.svg | 4 + docs/tracing.md | 49 ++-- examples/tracing_example_no_rt/main.cc | 8 +- include/cactus_rt/app.h | 27 +-- include/cactus_rt/thread.h | 28 ++- .../tracing/thread_tracer.disabled.h | 4 + include/cactus_rt/tracing/thread_tracer.h | 23 ++ .../tracing/trace_aggregator.disabled.h | 8 +- include/cactus_rt/tracing/trace_aggregator.h | 121 ++++------ .../cactus_rt/tracing/utils/string_interner.h | 2 + src/cactus_rt/app.cc | 72 +----- src/cactus_rt/thread.cc | 12 +- src/cactus_rt/tracing/trace_aggregator.cc | 214 ++++++++---------- .../tracing/utils/string_interner.cc | 6 + tests/tracing/single_threaded_test.cc | 95 +++----- 17 files changed, 264 insertions(+), 414 deletions(-) create mode 100644 docs/imgs/tracing-ownership-structure.svg diff --git a/.clang-tidy b/.clang-tidy index e3953db..3ab6ba8 100644 --- a/.clang-tidy +++ b/.clang-tidy @@ -21,7 +21,8 @@ cert-*, -readability-identifier-length, -readability-isolate-declaration, -readability-magic-numbers, --readability-redundant-inline-specifier' +-readability-redundant-inline-specifier, +-readability-use-anyofallof' # TODO: Re-enable bugprone-exception-escape when no longer throwing # https://github.com/isocpp/CppCoreGuidelines/issues/1589 WarningsAsErrors: '*' diff --git a/docs/imgs/trace-architecture.svg b/docs/imgs/trace-architecture.svg index bf565d1..af98ddc 100644 --- a/docs/imgs/trace-architecture.svg +++ b/docs/imgs/trace-architecture.svg @@ -1,4 +1,4 @@ -
Thread 1
Thread 1
ThreadTracer
ThreadTracer
TrackEventInternal
TrackEventInternal
Thread 2
Thread 2
ThreadTracer
ThreadTracer
TrackEventInternal
TrackEventInternal
Thread 3
Thread 3
ThreadTracer
ThreadTracer
TrackEventInternal
TrackEventInternal
FileSink
(write data to file)
FileSink...
Other Sink
(write data to custom sinks)
Other Sink...
TraceAggregator
TraceAggregator
Text is not SVG - cannot display
\ No newline at end of file +
Thread 1
Thread 1
ThreadTracer
ThreadTracer
TrackEventInternal
TrackEventInternal
Thread 2
Thread 2
ThreadTracer
ThreadTracer
TrackEventInternal
TrackEventInternal
Thread 3
Thread 3
ThreadTracer
ThreadTracer
TrackEventInternal
TrackEventInternal
FileSink
(write data to file)
FileSink...
TraceAggregator
TraceAggregator
Text is not SVG - cannot display
\ No newline at end of file diff --git a/docs/imgs/tracing-ownership-structure.svg b/docs/imgs/tracing-ownership-structure.svg new file mode 100644 index 0000000..9a015e2 --- /dev/null +++ b/docs/imgs/tracing-ownership-structure.svg @@ -0,0 +1,4 @@ + + + +
shared_ptr
shared_ptr
shared_ptr
shared_ptr
App
App
shared_ptr
shared_ptr
weak_ptr
weak_ptr
Thread
Thread
ThreadTracer
ThreadTracer
shared_ptr
shared_ptr
unique_ptr
unique_ptr
TraceAggregator
TraceAggregator
TraceAggregator::
SessionState
TraceAggregator:...
stack/global var
stack/global var
shared_ptr
shared_ptr
main()
main()
User-accessible API
User-accessible...
cactus_rt internals
cactus_rt inter...
Legend
Legend
Text is not SVG - cannot display
\ No newline at end of file diff --git a/docs/tracing.md b/docs/tracing.md index 78f55a6..0314b7e 100644 --- a/docs/tracing.md +++ b/docs/tracing.md @@ -191,45 +191,45 @@ reading a single global atomic boolean variable. This variable controls all traces from all threads within the process. Upon enabling tracing via `App::StartTraceSession`, `cactus_rt` also creates and -starts the `TraceAggregator` threads and registers the appropriate sinks. The -`App` object caches a list of known `ThreadTracers` from all the threads that -currently exists and this is passed to the newly created `TraceAggregator`. -Perfetto's file format specification indicates that the track descriptor packets -must be first written before the actual trace event packets. Thus, after the -creation of the `TraceAggregator` and `Sink` registration, a +starts the `TraceAggregator` thread. Perfetto's file format specification +indicates that the track descriptor packets must be first written before the +actual trace event packets. Thus, after starting the `TraceAggregator`, a [`ProcessDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ProcessDescriptor) -packet is first written. Upon the registration of each of the cached -`ThreadTracers` as passed through by `App`, a +packet is first written. A [`ThreadDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ThreadDescriptor) -packet is emitted for each thread. Then, the main loop of the `TraceAggregator` -can run which will write track event packets to the sinks. +packet is emitted for each thread that is known to the `TraceAggregator`. Then, +the main loop of the `TraceAggregator` can run which will write track event +packets to the sinks. When tracing is disabled via `App::StopTraceSession`, the tracing enabled atomic bool will be set to false. The system will request the `TraceAggregator` thread to drain all data from the existing `ThreadTracers` and stop. 
Once this is done, -the file is closed and the `TraceAggregator` is destroyed to save resources. +the file is closed and the `TraceAggregator` state (interned data, sequence +state) is reset so that a new trace session can be started. #### Dynamic thread creation Each `Thread` owns a `ThreadTracer`. However, when a thread starts, it must -notify the `App` and `TraceAggregator` (if tracing is enabled and it exists) of -its existence and thread id so a +notify the `TraceAggregator` (if tracing is enabled and it exists) of its +existence and thread id so a [`ThreadDescriptor`](https://perfetto.dev/docs/reference/trace-packet-proto#ThreadDescriptor) packet can be written to the data stream before any trace event data is written. -If tracing is not enabled and thus `TraceAggregator` is not present, the `App` -will cache the `ThreadTracers` and will pass it onto the `TraceAggregator` -if/when tracing is enabled. +If tracing is not enabled right now, the `TraceAggregator` will cache the +`ThreadTracers` so that once tracing is enabled, the `ThreadDescriptor` packet +is written out. -The `Thread` is able to communicate with the `App` by storing a non-owning -pointer to the `App`. This pointer is setup during `App::RegisterThread` so -there's no explicit dependency between `Thread` and `App` during construction. -This decision may be revisited in the future. +The `Thread` is able to communicate with the `TraceAggregator` by storing a +`weak_ptr` to the `TraceAggregator`. This pointer is set up during +`App::RegisterThread` so there's no explicit dependency between `Thread` and +`App` during construction. This decision may be revisited in the future. -#### Cleanup after thread shutdown +#### Ownership structure -TODO... +The structure is not ideal and has some problems, but works for the most part. -#### Dynamic sink registration +![Tracing ownership structure](imgs/tracing-ownership-structure.svg) + +#### Cleanup after thread shutdown TODO... @@ -255,6 +255,9 @@ noting: (i.e.
thus `(trusted_packet_sequence_id, iid)` is sufficient to identify an interned string). This, along with (1), implies we have to intern strings on a per-thread interner. +3. When a tracing session stop, the string interner states for all known thread + tracers are reset. This means a subsequent session will not have the same + string iids. ### Other notes diff --git a/examples/tracing_example_no_rt/main.cc b/examples/tracing_example_no_rt/main.cc index b6533ce..86bc64f 100644 --- a/examples/tracing_example_no_rt/main.cc +++ b/examples/tracing_example_no_rt/main.cc @@ -1,6 +1,5 @@ #include -#include #include #include @@ -25,18 +24,15 @@ void StartTracing(const char* app_name, const char* filename) { // Create the file sink so the data aggregated by the TraceAggregator will be written to somewhere. auto file_sink = std::make_shared(filename); - trace_aggregator->RegisterSink(file_sink); quill::start(); - trace_aggregator->Start(); + trace_aggregator->Start(file_sink); } void StopTracing() { cactus_rt::tracing::DisableTracing(); - trace_aggregator->RequestStop(); - trace_aggregator->Join(); - trace_aggregator = nullptr; // Destroy the trace aggregator and free all resources. + trace_aggregator->Stop(); } int main() { diff --git a/include/cactus_rt/app.h b/include/cactus_rt/app.h index 5b47f4b..345ebad 100644 --- a/include/cactus_rt/app.h +++ b/include/cactus_rt/app.h @@ -3,7 +3,6 @@ #include -#include #include #include #include @@ -36,14 +35,7 @@ class App { std::vector> threads_; - // We need to cache the list thread tracers here because the trace_aggregator - // can be dynamically created and stopped. When a new trace aggregator is - // created, it needs to know about all the thread tracers. - // - // TODO: investigate into a weak pointer. 
- std::list> thread_tracers_; - std::unique_ptr trace_aggregator_ = nullptr; - std::mutex aggregator_mutex_; + std::shared_ptr trace_aggregator_; void SetDefaultLogFormat(quill::Config& cfg) { // Create a handler of stdout @@ -117,11 +109,6 @@ class App { */ bool StartTraceSession(std::shared_ptr sink) noexcept; - /** - * @brief Register a custom trace sink after starting the trace session - */ - void RegisterTraceSink(std::shared_ptr sink) noexcept; - /** * @brief Stops the tracing session for the process. Will be no-op if tracing * is not enabled. This function is not real-time safe. @@ -148,18 +135,6 @@ class App { void StartQuill(); private: - /** - * @brief Register a thread tracer. Should only be called from Thread::RunThread. - */ - void RegisterThreadTracer(std::shared_ptr thread_tracer) noexcept; - - /** - * @brief Remove a thread tracer. Should only be called from Thread::~Thread(). - */ - void DeregisterThreadTracer(const std::shared_ptr& thread_tracer) noexcept; - - void CreateAndStartTraceAggregator(std::shared_ptr sink) noexcept; - void StopTraceAggregator() noexcept; }; } // namespace cactus_rt diff --git a/include/cactus_rt/thread.h b/include/cactus_rt/thread.h index c796870..ef25e62 100644 --- a/include/cactus_rt/thread.h +++ b/include/cactus_rt/thread.h @@ -7,20 +7,17 @@ #include #include #include -#include #include "config.h" #include "quill/Quill.h" #include "tracing/thread_tracer.h" +#include "tracing/trace_aggregator.h" namespace cactus_rt { /// @private constexpr size_t kDefaultStackSize = 8 * 1024 * 1024; // 8MB default stack space should be plenty -// Necessary forward declaration -class App; - class Thread { ThreadConfig config_; std::string name_; @@ -28,7 +25,7 @@ class Thread { size_t stack_size_; quill::Logger* logger_; - std::shared_ptr tracer_; + std::shared_ptr tracer_ = nullptr; std::atomic_bool stop_requested_ = false; @@ -41,12 +38,10 @@ class Thread { */ static void* RunThread(void* data); - friend class App; - - // Non-owning 
App pointer. Used only for notifying that the thread has - // started/stopped for tracing purposes. Set by Thread::Start and read at + // Non-owning TraceAggregator pointer. Used only for notifying that the thread + // has started/stopped for tracing purposes. Set by Thread::Start and read at // the beginning of Thread::RunThread. - App* app_ = nullptr; + std::weak_ptr trace_aggregator_; public: /** @@ -60,8 +55,7 @@ class Thread { name_(name), cpu_affinity_(config_.cpu_affinity), stack_size_(static_cast(PTHREAD_STACK_MIN) + config_.stack_size), - logger_(quill::create_logger(name_)), - tracer_(std::make_shared(name, config_.tracer_config.queue_size)) { + logger_(quill::create_logger(name_)) { if (!config.scheduler) { throw std::runtime_error("ThreadConfig::scheduler cannot be nullptr"); } @@ -123,12 +117,16 @@ class Thread { * * @private */ - inline void SetApp(App* app) { - app_ = app; + inline void SetTraceAggregator(std::weak_ptr trace_aggregator) { + trace_aggregator_ = trace_aggregator; } protected: - inline quill::Logger* Logger() const { return logger_; } + inline quill::Logger* Logger() const { return logger_; } + + /** + * Gets the current tracer object. Should only ever be called from within the thread itself. + */ inline tracing::ThreadTracer& Tracer() { return *tracer_; } inline int64_t StartMonotonicTimeNs() const { return start_monotonic_time_ns_; } inline const ThreadConfig& Config() const noexcept { return config_; } diff --git a/include/cactus_rt/tracing/thread_tracer.disabled.h b/include/cactus_rt/tracing/thread_tracer.disabled.h index c6f185a..e7a6e8e 100644 --- a/include/cactus_rt/tracing/thread_tracer.disabled.h +++ b/include/cactus_rt/tracing/thread_tracer.disabled.h @@ -77,6 +77,10 @@ class ThreadTracer { void SetTid() noexcept {} + void MarkDone() noexcept {} + + void IsDone() noexcept {} + private: template bool Emit(Args&&... 
/* args */) noexcept { diff --git a/include/cactus_rt/tracing/thread_tracer.h b/include/cactus_rt/tracing/thread_tracer.h index c5e3615..9d43e3d 100644 --- a/include/cactus_rt/tracing/thread_tracer.h +++ b/include/cactus_rt/tracing/thread_tracer.h @@ -1,6 +1,7 @@ #ifndef CACTUS_TRACING_THREAD_TRACER_H_ #define CACTUS_TRACING_THREAD_TRACER_H_ +#include #ifndef CACTUS_RT_TRACING_ENABLED #include "thread_tracer.disabled.h" #else @@ -36,6 +37,8 @@ class ThreadTracer { moodycamel::ReaderWriterQueue queue_; + std::atomic_bool thread_done_; + // The event name interning must be done per thread (per sequence). Thus it is // stored here. However, this class must NEVER call functions here (other // than maybe .Size), as the memory allocation can occur. This variable is @@ -84,6 +87,26 @@ class ThreadTracer { */ void SetTid() noexcept { tid_ = gettid(); } + /** + * @brief This marks this thread tracer as "done" and thus the trace + * aggregator will try to remove it after flushing the data. + * + * @private + */ + void MarkDone() noexcept { + thread_done_.store(true, std::memory_order_release); + } + + /** + * @brief Checks if this thread tracer is done. Should only be called from + * TraceAggregator. + * + * @private + */ + bool IsDone() noexcept { + return thread_done_.load(std::memory_order_acquire); + } + private: template bool Emit(Args&&... 
args) noexcept; diff --git a/include/cactus_rt/tracing/trace_aggregator.disabled.h b/include/cactus_rt/tracing/trace_aggregator.disabled.h index 76a4928..bb94b63 100644 --- a/include/cactus_rt/tracing/trace_aggregator.disabled.h +++ b/include/cactus_rt/tracing/trace_aggregator.disabled.h @@ -11,7 +11,7 @@ namespace cactus_rt::tracing { class TraceAggregator { public: - explicit TraceAggregator(std::string /* name */, std::vector /* cpu_affinity */) {} + explicit TraceAggregator(std::string /* name */) {} TraceAggregator(const TraceAggregator&) = delete; TraceAggregator& operator=(const TraceAggregator&) = delete; @@ -24,11 +24,9 @@ class TraceAggregator { void DeregisterThreadTracer(const std::shared_ptr& /* tracer */) {} - void Start() {}; + void Start(std::shared_ptr /* sink */, std::vector cpu_affinity = {}) {}; - void RequestStop() noexcept {} - - void Join() noexcept {} + void Stop() noexcept {} }; } // namespace cactus_rt::tracing #endif diff --git a/include/cactus_rt/tracing/trace_aggregator.h b/include/cactus_rt/tracing/trace_aggregator.h index 4e18dcb..3b9965b 100644 --- a/include/cactus_rt/tracing/trace_aggregator.h +++ b/include/cactus_rt/tracing/trace_aggregator.h @@ -12,32 +12,41 @@ #include #include #include -#include #include #include "sink.h" #include "thread_tracer.h" -#include "utils/string_interner.h" namespace cactus_rt::tracing { class TraceAggregator { using Trace = cactus_tracing::vendor::perfetto::protos::Trace; - using InternedData = cactus_tracing::vendor::perfetto::protos::InternedData; - std::string process_name_; - std::vector cpu_affinity_; - uint64_t process_track_uuid_; - quill::Logger* logger_; + struct SessionState { + const std::vector cpu_affinity; + const std::shared_ptr sink; - // We use a std::thread and not a cactus_rt::Thread as cactus_rt::Thread has - // dependency on this class, so we cannot have a circular dependency. 
- std::thread thread_; - std::atomic_bool stop_requested_ = false; - std::mutex mutex_; + // We use a std::thread and not a cactus_rt::Thread as cactus_rt::Thread has + // dependency on this class, so we cannot have a circular dependency. + std::thread thread; + std::atomic_bool stop_requested = false; - // A list of sinks the output should be written to. - std::list> sinks_; + // This is a set of sequence ids where the first packet has already been emitted. + // If a sequence is not in here, the first packet emitted with have + // first_packet_on_sequence = true, previous_packet_dropped = true, and + // sequence_flags = SEQ_INCREMENTAL_STATE_CLEARED + std::unordered_set sequences_with_first_packet_emitted; + + explicit SessionState(std::shared_ptr s, std::vector affinity = {}) : cpu_affinity(affinity), sink(s) {} + }; + + const std::string process_name_; + + const uint64_t process_track_uuid_; + quill::Logger* logger_; + + // This mutex protects tracers_ and session_ + std::mutex mutex_; // This is a list of all known thread tracers. The background processing // thread will loop through this and pop all data from the queues. @@ -46,40 +55,11 @@ class TraceAggregator { // no problem. std::list> tracers_; - // This is a vector of sticky trace packets that should always be emitted - // when a new sink connects to the tracer. When a new sink connects to the - // tracer, these packet will be sent first, before any events are sent. - // - // Packets that should be included here are things like the process/thread - // track descriptor packets, or any other packets that affect the trace - // globally and must be emitted before events are emitted. - // - // The list of packets only grow here (although shouldn't grow that much). - std::list sticky_trace_packets_; - - // These are the interners for the event name and event categories to save - // space on the output. 
- utils::StringInterner event_name_interner_; - utils::StringInterner event_category_interner_; - - // This is a map of trusted_sequence_id to InternedData. - // - // The InternedData is allocated directly here and kept for the duration of - // the program. This is necessary in case we detect a packet loss, and we - // would like to re-emit the interned data for that sequence so it can - // continue. - // - // TODO: cap the amount of interned data to a maximum amount. - std::unordered_map> sequence_interned_data_; - - // This is a set of sequence ids where the first packet has already been emitted. - // If a sequence is not in here, the first packet emitted with have - // first_packet_on_sequence = true, previous_packet_dropped = true, and - // sequence_flags = SEQ_INCREMENTAL_STATE_CLEARED - std::unordered_set sequences_with_first_packet_emitted_; + // This includes a single trace session state. It is recreated every time we create a new thread. + std::unique_ptr session_ = nullptr; public: - explicit TraceAggregator(std::string name, std::vector cpu_affinity = {}); + explicit TraceAggregator(std::string name); // No copy no move TraceAggregator(const TraceAggregator&) = delete; @@ -87,11 +67,6 @@ class TraceAggregator { TraceAggregator(TraceAggregator&&) = delete; TraceAggregator& operator=(TraceAggregator&&) = delete; - /** - * @brief Adds a sink. Not real-time safe. - */ - void RegisterSink(std::shared_ptr sink); - /** * @brief Adds a thread tracer. Not real-time safe. * @@ -109,17 +84,14 @@ class TraceAggregator { /** * @brief Starts the trace aggregator background thread */ - void Start(); + void Start(std::shared_ptr sink, std::vector cpu_affinity = {}); /** - * @brief Requests the trace aggregator to stop - */ - void RequestStop() noexcept; - - /** - * @brief Joins the thread + * @brief Stops the trace aggregator and reset it (this waits until the thread is shutdown) + * + * Calling this from multiple threads is likely undefined behavior. 
*/ - void Join() noexcept; + void Stop() noexcept; private: quill::Logger* Logger() noexcept; @@ -127,40 +99,31 @@ class TraceAggregator { void Run(); bool StopRequested() const noexcept; - void SetupCPUAffinityIfNecessary() const; - size_t TryDequeueOnceFromAllTracers(Trace& trace) noexcept; - void WriteTrace(const Trace& trace) noexcept; /** - * Creates the initial process descriptor packet + * Writes a trace into the sink. + * + * Must be called while session is active. */ - Trace CreateProcessDescriptorPacket() const; + void WriteTrace(const Trace& trace) noexcept; /** - * Creates a thread descriptor packet given a thread tracer. + * Adds the track event packet to an existing trace. * - * Must be called while a lock is held. + * Must be called while session is active. Requires locks to be held as it accesses session_. */ - Trace CreateThreadDescriptorPacket(const ThreadTracer& thread_tracer) const; + void AddTrackEventPacketToTraceInternal(Trace& trace, ThreadTracer& thread_tracer, const TrackEventInternal& track_event_internal); /** - * Adds the track event packet to an existing trace. - * - * Must be called while a lock is held. + * Creates the initial process descriptor packet */ - void AddTrackEventPacketToTrace(Trace& trace, ThreadTracer& thread_tracer, const TrackEventInternal& track_event_internal); + Trace CreateProcessDescriptorPacket() const; /** - * Create the initial interned data packet if a new sink joins. - * - * Must be called while a lock is held. - * - * @param initial_timestamp The initial timestamp of the track, must be before - * all other packets about to be written. Commonly this should be the - * timestamp of the sticky packets. + * Creates a thread descriptor packet given a thread tracer. 
*/ - std::optional CreateInitialInternedDataPacket() const; + Trace CreateThreadDescriptorPacket(const ThreadTracer& thread_tracer) const; }; } // namespace cactus_rt::tracing diff --git a/include/cactus_rt/tracing/utils/string_interner.h b/include/cactus_rt/tracing/utils/string_interner.h index e628d9f..9d11fbb 100644 --- a/include/cactus_rt/tracing/utils/string_interner.h +++ b/include/cactus_rt/tracing/utils/string_interner.h @@ -41,6 +41,8 @@ class StringInterner { std::pair GetId(const std::string_view& s); std::pair GetId(const char* s); + void Reset(); + size_t Size() const { return strings_.size(); }; diff --git a/src/cactus_rt/app.cc b/src/cactus_rt/app.cc index 794630e..954d032 100644 --- a/src/cactus_rt/app.cc +++ b/src/cactus_rt/app.cc @@ -4,8 +4,10 @@ #include #include +#include #include +#include "cactus_rt/tracing/trace_aggregator.h" #include "cactus_rt/tracing/tracing_enabled.h" #include "cactus_rt/utils.h" #include "quill/Quill.h" @@ -15,7 +17,7 @@ using FileSink = cactus_rt::tracing::FileSink; namespace cactus_rt { void App::RegisterThread(std::shared_ptr thread) { - thread->SetApp(this); + thread->SetTraceAggregator(trace_aggregator_); threads_.push_back(thread); } @@ -23,7 +25,8 @@ App::App(std::string name, AppConfig config) : name_(name), heap_size_(config.heap_size), logger_config_(config.logger_config), - tracer_config_(config.tracer_config) { + tracer_config_(config.tracer_config), + trace_aggregator_(std::make_shared(name)) { if (logger_config_.default_handlers.empty()) { SetDefaultLogFormat(logger_config_); } @@ -65,7 +68,7 @@ bool App::StartTraceSession(const char* output_filename) noexcept { return false; } - CreateAndStartTraceAggregator(std::make_shared(output_filename)); + trace_aggregator_->Start(std::make_shared(output_filename)); cactus_rt::tracing::EnableTracing(); return true; @@ -76,20 +79,12 @@ bool App::StartTraceSession(std::shared_ptr sink) noexcept { return false; } - CreateAndStartTraceAggregator(sink); + 
trace_aggregator_->Start(sink); cactus_rt::tracing::EnableTracing(); return true; } -void App::RegisterTraceSink(std::shared_ptr sink) noexcept { - const std::scoped_lock lock(aggregator_mutex_); - - if (trace_aggregator_ != nullptr) { - trace_aggregator_->RegisterSink(sink); - } -} - bool App::StopTraceSession() noexcept { if (!cactus_rt::tracing::IsTracingEnabled()) { return false; @@ -101,28 +96,6 @@ bool App::StopTraceSession() noexcept { return true; } -void App::RegisterThreadTracer(std::shared_ptr thread_tracer) noexcept { - const std::scoped_lock lock(aggregator_mutex_); - - thread_tracers_.push_back(thread_tracer); - - if (trace_aggregator_ != nullptr) { - trace_aggregator_->RegisterThreadTracer(thread_tracer); - } -} - -void App::DeregisterThreadTracer(const std::shared_ptr& thread_tracer) noexcept { - const std::scoped_lock lock(aggregator_mutex_); - - thread_tracers_.remove_if([thread_tracer](const std::shared_ptr& t) { - return t == thread_tracer; - }); - - if (trace_aggregator_ != nullptr) { - trace_aggregator_->DeregisterThreadTracer(thread_tracer); - } -} - void App::LockMemory() const { // See https://lwn.net/Articles/837019/ @@ -189,36 +162,7 @@ void App::StartQuill() { quill::start(); } -void App::CreateAndStartTraceAggregator(std::shared_ptr sink) noexcept { - const std::scoped_lock lock(aggregator_mutex_); - - if (trace_aggregator_ != nullptr) { - // TODO: error here - return; - } - - trace_aggregator_ = std::make_unique(name_, tracer_config_.trace_aggregator_cpu_affinity); - for (auto tracer : thread_tracers_) { - trace_aggregator_->RegisterThreadTracer(tracer); - } - - if (sink != nullptr) { - trace_aggregator_->RegisterSink(sink); - } - - trace_aggregator_->Start(); -} - void App::StopTraceAggregator() noexcept { - const std::scoped_lock lock(aggregator_mutex_); - - if (trace_aggregator_ == nullptr) { - // TODO: error here - return; - } - - trace_aggregator_->RequestStop(); - trace_aggregator_->Join(); - trace_aggregator_ = nullptr; + 
trace_aggregator_->Stop(); } } // namespace cactus_rt diff --git a/src/cactus_rt/thread.cc b/src/cactus_rt/thread.cc index 48f7482..91af9d7 100644 --- a/src/cactus_rt/thread.cc +++ b/src/cactus_rt/thread.cc @@ -5,10 +5,11 @@ #include #include #include +#include #include -#include "cactus_rt/app.h" #include "cactus_rt/config.h" +#include "cactus_rt/tracing/thread_tracer.h" namespace cactus_rt { @@ -16,9 +17,11 @@ void* Thread::RunThread(void* data) { auto* thread = static_cast(data); thread->config_.scheduler->SetSchedAttr(); + thread->tracer_ = std::make_shared(thread->name_, thread->config_.tracer_config.queue_size); thread->tracer_->SetTid(); - if (thread->app_ != nullptr) { - thread->app_->RegisterThreadTracer(thread->tracer_); + + if (auto trace_aggregator = thread->trace_aggregator_.lock()) { + trace_aggregator->RegisterThreadTracer(thread->tracer_); } else { LOG_WARNING(thread->Logger(), "thread {} does not have app_ and tracing is disabled for this thread. Did you call App::RegisterThread?", thread->name_); } @@ -29,6 +32,9 @@ void* Thread::RunThread(void* data) { thread->Run(); thread->AfterRun(); + thread->tracer_->MarkDone(); + thread->tracer_ = nullptr; + return nullptr; } diff --git a/src/cactus_rt/tracing/trace_aggregator.cc b/src/cactus_rt/tracing/trace_aggregator.cc index 28c9d54..b27b80d 100644 --- a/src/cactus_rt/tracing/trace_aggregator.cc +++ b/src/cactus_rt/tracing/trace_aggregator.cc @@ -1,5 +1,11 @@ #include "cactus_rt/tracing/trace_aggregator.h" +#include + +#include +#include +#include + #ifndef _GNU_SOURCE #define _GNU_SOURCE #endif @@ -11,10 +17,10 @@ #include #include +using cactus_tracing::vendor::perfetto::protos::InternedData; using cactus_tracing::vendor::perfetto::protos::ProcessDescriptor; using cactus_tracing::vendor::perfetto::protos::ThreadDescriptor; using cactus_tracing::vendor::perfetto::protos::Trace; -using cactus_tracing::vendor::perfetto::protos::TracePacket; using 
cactus_tracing::vendor::perfetto::protos::TracePacket_SequenceFlags_SEQ_INCREMENTAL_STATE_CLEARED; using cactus_tracing::vendor::perfetto::protos::TracePacket_SequenceFlags_SEQ_NEEDS_INCREMENTAL_STATE; using cactus_tracing::vendor::perfetto::protos::TrackDescriptor; @@ -24,36 +30,34 @@ using namespace std::chrono_literals; namespace { constexpr size_t kMaxInternedStrings = 10000; -} -namespace cactus_rt::tracing { -TraceAggregator::TraceAggregator(std::string process_name, std::vector cpu_affinity) - : process_name_(process_name), - cpu_affinity_(cpu_affinity), - process_track_uuid_(static_cast(getpid())), - logger_(quill::create_logger("__trace_aggregator__")) { - this->sticky_trace_packets_.push_back(CreateProcessDescriptorPacket()); -} +// TODO: refactor this elsewhere so it is usable everywhere. +void SetupCPUAffinityIfNecessary(const std::vector& cpu_affinity) { + if (cpu_affinity.empty()) { + return; + } -void TraceAggregator::RegisterSink(std::shared_ptr sink) { - // RegisterSink is mutually exclusive with RegisterThreadTracer and writing - // metric data from queues to the sinks. This is because we need to ensure the - // first trace packets on any new sink must be the track descriptors for all - // known tracks. We also need to ensure that writing to sinks only happen on - // one thread with mutual exclusion to avoid data race. 
- const std::scoped_lock lock(mutex_); - for (const auto& trace : this->sticky_trace_packets_) { - // TODO: deal with errors - sink->Write(trace); + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + for (auto cpu : cpu_affinity) { + CPU_SET(cpu, &cpuset); } - auto interned_data_trace = CreateInitialInternedDataPacket(); - if (interned_data_trace) { - // TODO: deal with errors - sink->Write(*interned_data_trace); + const int ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + if (ret == 0) { + return; } - sinks_.push_back(sink); + throw std::runtime_error{std::string("cannot set affinity for trace aggregator: ") + std::strerror(errno)}; +} + +} // namespace + +namespace cactus_rt::tracing { +TraceAggregator::TraceAggregator(std::string process_name) + : process_name_(process_name), + process_track_uuid_(static_cast(getpid())), + logger_(quill::create_logger("__trace_aggregator__")) { } void TraceAggregator::RegisterThreadTracer(std::shared_ptr tracer) { @@ -69,18 +73,12 @@ void TraceAggregator::RegisterThreadTracer(std::shared_ptr tracer) // writing to sinks here, only a single thread may do it at a time thus also // requiring the lock. const std::scoped_lock lock(mutex_); - tracers_.push_back(tracer); - auto trace = CreateThreadDescriptorPacket(*tracer); - for (auto& sink : sinks_) { - // TODO: deal with errors - sink->Write(trace); + if (session_ != nullptr) { + auto trace = CreateThreadDescriptorPacket(*tracer); + session_->sink->Write(trace); } - - // Move the trace packet to the sticky packets so newly registered sinks gets - // this packet when they get registered. - this->sticky_trace_packets_.push_back(std::move(trace)); } void TraceAggregator::DeregisterThreadTracer(const std::shared_ptr& tracer) { @@ -91,18 +89,44 @@ void TraceAggregator::DeregisterThreadTracer(const std::shared_ptr }); } -void TraceAggregator::Start() { - // TODO: CPU affinity! 
- std::thread thread{&TraceAggregator::Run, this}; - thread_.swap(thread); -} +void TraceAggregator::Start(std::shared_ptr sink, std::vector cpu_affinity) { + const std::scoped_lock lock(mutex_); + + if (session_ == nullptr) { + session_ = std::make_unique(sink, cpu_affinity); + sink->Write(CreateProcessDescriptorPacket()); + + for (const auto& tracer : tracers_) { + sink->Write(CreateThreadDescriptorPacket(*tracer)); + } -void TraceAggregator::RequestStop() noexcept { - stop_requested_.store(true, std::memory_order_relaxed); + std::thread thread{&TraceAggregator::Run, this}; + session_->thread.swap(thread); + } } -void TraceAggregator::Join() noexcept { - thread_.join(); +void TraceAggregator::Stop() noexcept { + // We need to manually lock/unlock as we cannot hold the lock while joining... + mutex_.lock(); + + if (session_ != nullptr) { + session_->stop_requested.store(true, std::memory_order_relaxed); + mutex_.unlock(); + // Technically there's a data race on session_. But if we hold the lock while joining it can lead to dead lock. + // TODO: fix this issue somehow? + session_->thread.join(); + mutex_.lock(); + session_ = nullptr; // Delete it to reset the session state! + + // Technically, the TraceAggregator also owns the event_name_interner_ for all tracers. + // TODO: maybe move the interner into the TraceAggregator instead of having it on the ThreadTracer for simplicity. 
+ for (const auto& tracer : tracers_) { + tracer->event_name_interner_.Reset(); + tracer->event_category_interner_.Reset(); + } + } + + mutex_.unlock(); } quill::Logger* TraceAggregator::Logger() noexcept { @@ -110,9 +134,9 @@ quill::Logger* TraceAggregator::Logger() noexcept { } void TraceAggregator::Run() { - SetupCPUAffinityIfNecessary(); + ::SetupCPUAffinityIfNecessary(session_->cpu_affinity); - while (!StopRequested()) { + while (!session_->stop_requested.load(std::memory_order_relaxed)) { Trace trace; auto num_events = TryDequeueOnceFromAllTracers(trace); @@ -156,29 +180,6 @@ void TraceAggregator::Run() { } } -bool TraceAggregator::StopRequested() const noexcept { - return stop_requested_.load(std::memory_order_relaxed); -} - -void TraceAggregator::SetupCPUAffinityIfNecessary() const { - if (cpu_affinity_.empty()) { - return; - } - - cpu_set_t cpuset; - CPU_ZERO(&cpuset); - for (auto cpu : cpu_affinity_) { - CPU_SET(cpu, &cpuset); - } - - const int ret = pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); - if (ret == 0) { - return; - } - - throw std::runtime_error{std::string("cannot set affinity for trace aggregator: ") + std::strerror(errno)}; -} - size_t TraceAggregator::TryDequeueOnceFromAllTracers(Trace& trace) noexcept { // This lock is needed because we are accessing the tracer_, which can race // with other threads that are registering a thread tracer. This is NOT for @@ -186,10 +187,15 @@ size_t TraceAggregator::TryDequeueOnceFromAllTracers(Trace& trace) noexcept { const std::scoped_lock lock(mutex_); size_t num_events = 0; + std::vector done_tracers; + for (auto& tracer : tracers_) { TrackEventInternal event; if (!tracer->queue_.try_dequeue(event)) { // No event in this queue. + if (tracer->IsDone()) { + done_tracers.push_back(tracer.get()); + } continue; } @@ -197,7 +203,19 @@ size_t TraceAggregator::TryDequeueOnceFromAllTracers(Trace& trace) noexcept { // errors/full queue problems. 
num_events++; - AddTrackEventPacketToTrace(trace, *tracer, event); + AddTrackEventPacketToTraceInternal(trace, *tracer, event); + } + + if (!done_tracers.empty()) { + tracers_.remove_if([&done_tracers](const std::shared_ptr& tracer) { + for (const auto* done_tracer : done_tracers) { + if (tracer.get() == done_tracer) { + return true; + } + } + + return false; + }); } return num_events; @@ -210,10 +228,8 @@ void TraceAggregator::WriteTrace(const Trace& trace) noexcept { const std::scoped_lock lock(mutex_); // TODO: better handle error by maybe emitting an error signal and calling an error callback? - for (auto& sink : sinks_) { - if (!sink->Write(trace)) { - LOG_WARNING_LIMIT(std::chrono::milliseconds(5000), Logger(), "failed to write trace data to sink, data may be corrupted"); - } + if (!session_->sink->Write(trace)) { + LOG_WARNING_LIMIT(std::chrono::milliseconds(5000), Logger(), "failed to write trace data to sink, data may be corrupted"); } } @@ -258,7 +274,7 @@ Trace TraceAggregator::CreateThreadDescriptorPacket(const ThreadTracer& thread_t return trace; } -void TraceAggregator::AddTrackEventPacketToTrace( +void TraceAggregator::AddTrackEventPacketToTraceInternal( Trace& trace, ThreadTracer& thread_tracer, const TrackEventInternal& track_event_internal @@ -344,8 +360,8 @@ void TraceAggregator::AddTrackEventPacketToTrace( packet->set_trusted_packet_sequence_id(thread_tracer.trusted_packet_sequence_id_); // Deal with "first packet" - if (sequences_with_first_packet_emitted_.count(thread_tracer.trusted_packet_sequence_id_) == 0) { - sequences_with_first_packet_emitted_.insert(thread_tracer.trusted_packet_sequence_id_); + if (session_->sequences_with_first_packet_emitted.count(thread_tracer.trusted_packet_sequence_id_) == 0) { + session_->sequences_with_first_packet_emitted.insert(thread_tracer.trusted_packet_sequence_id_); packet->set_first_packet_on_sequence(true); packet->set_previous_packet_dropped(true); @@ -364,50 +380,4 @@ void 
TraceAggregator::AddTrackEventPacketToTrace( // NOLINTEND(clang-analyzer-cplusplus.NewDeleteLeaks) } -std::optional TraceAggregator::CreateInitialInternedDataPacket() const { - Trace trace; - - bool wrote_interned_data = false; - - for (const auto& tracer : this->tracers_) { - InternedData* interned_data = nullptr; - for (const auto& [name, name_iid] : tracer->event_name_interner_.Ids()) { - if (interned_data == nullptr) { - interned_data = new InternedData(); - } - - auto* event_name = interned_data->add_event_names(); - event_name->set_name(name.data()); - event_name->set_iid(name_iid); - } - - for (const auto& [category, category_iid] : tracer->event_category_interner_.Ids()) { - if (interned_data == nullptr) { - interned_data = new InternedData(); - } - - auto* event_category = interned_data->add_event_categories(); - event_category->set_name(category.data()); - event_category->set_iid(category_iid); - } - - if (interned_data != nullptr) { - wrote_interned_data = true; - TracePacket* packet = trace.add_packet(); - - // TODO: is it okay to not have a timestamp? 
- // packet->set_timestamp(initial_timestamp); - - packet->set_allocated_interned_data(interned_data); - packet->set_trusted_packet_sequence_id(tracer->trusted_packet_sequence_id_); - } - } - - if (wrote_interned_data) { - return trace; - } - - return std::nullopt; -} - } // namespace cactus_rt::tracing diff --git a/src/cactus_rt/tracing/utils/string_interner.cc b/src/cactus_rt/tracing/utils/string_interner.cc index 7411bf1..c74bdfd 100644 --- a/src/cactus_rt/tracing/utils/string_interner.cc +++ b/src/cactus_rt/tracing/utils/string_interner.cc @@ -16,4 +16,10 @@ std::pair StringInterner::GetId(const std::string_view& s) { std::pair StringInterner::GetId(const char* s) { return GetId(std::string_view{s}); } + +void StringInterner::Reset() { + current_id_ = 0; + ids_ = std::unordered_map(); + strings_ = std::list(); +} } // namespace cactus_rt::tracing::utils diff --git a/tests/tracing/single_threaded_test.cc b/tests/tracing/single_threaded_test.cc index c9976b0..baf1f4a 100644 --- a/tests/tracing/single_threaded_test.cc +++ b/tests/tracing/single_threaded_test.cc @@ -302,15 +302,22 @@ TEST_F(SingleThreadTracingTest, RestartTracingStartsNewSession) { app_.StartTraceSession(sink_); regular_thread_->RunOneIteration([](MockRegularThread* self) { - auto span = self->TracerForTest().WithSpan("Event3"); - WasteTime(std::chrono::microseconds(1000)); + { + auto span = self->TracerForTest().WithSpan("Event3"); + WasteTime(std::chrono::microseconds(1000)); + } + + { + auto span = self->TracerForTest().WithSpan("Event1"); + WasteTime(std::chrono::microseconds(1000)); + } }); app_.StopTraceSession(); auto traces2 = sink_->LoggedTraces(); auto packets2 = GetPacketsFromTraces(traces2); - ASSERT_EQ(packets2.size(), 5); + ASSERT_EQ(packets2.size(), 6); AssertIsProcessTrackDescriptor(*packets2[0], kAppName); const auto process_track_uuid = packets2[0]->track_descriptor().uuid(); @@ -318,91 +325,41 @@ TEST_F(SingleThreadTracingTest, RestartTracingStartsNewSession) { 
AssertIsThreadTrackDescriptor(*packets2[1], kRegularThreadName, process_track_uuid); auto thread_track_uuid = packets2[1]->track_descriptor().uuid(); - // Event1 is emitted as interned data because that thread is still active and the event name got interned previously. - auto event_names = GetInternedEventNames(*packets2[2]); - ASSERT_EQ(event_names.size(), 1); - - auto event1_name_iid = event_names.at("Event1"); - ASSERT_GT(event1_name_iid, 0); - - auto event1_thread_sequence_id2 = packets2[2]->trusted_packet_sequence_id(); - - ASSERT_EQ(event1_thread_sequence_id1, event1_thread_sequence_id2); + AssertIsTrackEventSliceBegin(*packets2[2], thread_track_uuid); + // Since we started a new trace session, the interned data is reset. So the first event is only expected to have Event3 on it. // Note Event2 is lost as designed - AssertIsTrackEventSliceBegin(*packets2[3], thread_track_uuid); - auto sequence_id = packets2[3]->trusted_packet_sequence_id(); - - ASSERT_EQ(sequence_id, event1_thread_sequence_id2); - - event_names = GetInternedEventNames(*packets2[3]); + auto event_names = GetInternedEventNames(*packets2[2]); ASSERT_EQ(event_names.size(), 1); - const auto event3_name_iid = event_names.at("Event3"); + auto event3_name_iid = event_names.at("Event3"); ASSERT_GT(event3_name_iid, 0); - AssertTrackEventHasIid(*packets2[3], event3_name_iid, 0); - - AssertIsTrackEventSliceEnd(*packets2[4], thread_track_uuid, sequence_id); - - AssertTrackEventDuration(*packets2[3], *packets2[4], 1000000, 10000000); -} - -TEST_F(SingleThreadTracingTest, DynamicallyAddingSinkWillWork) { - regular_thread_->RunOneIteration([](MockRegularThread* self) { - const auto span = self->TracerForTest().WithSpan("Event1"); - WasteTime(std::chrono::microseconds(1000)); - }); + auto sequence_id = packets2[2]->trusted_packet_sequence_id(); - // This is kind of a hack to ensure the data from the previous only made it to - // sink_. 
If we don't wait for a bit, there's a race condition where sink2 - // could get these data. - // Unfortunately this has to be implemented via a sleep. This is not idea but - // it is the best option for now. - std::this_thread::sleep_for(std::chrono::milliseconds(200)); + ASSERT_EQ(event1_thread_sequence_id1, sequence_id); - auto sink2 = std::make_shared(); - app_.RegisterTraceSink(sink2); + AssertTrackEventHasIid(*packets2[2], event3_name_iid, 0); - regular_thread_->RunOneIteration([](MockRegularThread* self) { - self->TracerForTest().InstantEvent("Event2"); - }); + AssertIsTrackEventSliceEnd(*packets2[3], thread_track_uuid, sequence_id); - app_.StopTraceSession(); - auto traces2 = sink2->LoggedTraces(); - auto packets2 = GetPacketsFromTraces(traces2); + AssertTrackEventDuration(*packets2[2], *packets2[3], 1000000, 10000000); - ASSERT_EQ(packets2.size(), 4); + // Now let's check if Event1 will be properly written out again. - AssertIsProcessTrackDescriptor(*packets2[0], kAppName); - const auto process_track_uuid = packets2[0]->track_descriptor().uuid(); - - AssertIsThreadTrackDescriptor(*packets2[1], kRegularThreadName, process_track_uuid); - auto thread_track_uuid = packets2[1]->track_descriptor().uuid(); + AssertIsTrackEventSliceBegin(*packets2[4], thread_track_uuid); - auto event_names = GetInternedEventNames(*packets2[2]); + event_names = GetInternedEventNames(*packets2[4]); ASSERT_EQ(event_names.size(), 1); - const auto event1_name_iid = event_names.at("Event1"); + auto event1_name_iid = event_names.at("Event1"); ASSERT_GT(event1_name_iid, 0); - auto sequence_id = packets2[2]->trusted_packet_sequence_id(); - - AssertIsTrackEventInstant(*packets2[3], thread_track_uuid, sequence_id); - - event_names = GetInternedEventNames(*packets2[3]); - ASSERT_EQ(event_names.size(), 1); - - const auto event2_name_iid = event_names.at("Event2"); - ASSERT_GT(event2_name_iid, 0); - ASSERT_NE(event2_name_iid, event1_name_iid); - - AssertTrackEventHasIid(*packets2[3], 
event2_name_iid, 0); + AssertTrackEventHasIid(*packets2[4], event1_name_iid, 0); - auto traces = sink_->LoggedTraces(); - auto packets = GetPacketsFromTraces(traces); + AssertIsTrackEventSliceEnd(*packets2[5], thread_track_uuid, sequence_id); - ASSERT_EQ(packets.size(), 5); + AssertTrackEventDuration(*packets2[4], *packets2[5], 1000000, 10000000); } TEST_F(SingleThreadTracingTest, QueueOverflowWillNotBlock) {