diff --git a/src/main.cpp b/src/main.cpp index e9cf209..e2c5a25 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -115,7 +115,6 @@ void CleanSharedMemory(int s) { } int main(int argc, char** argv) { - CmdFetcher_Init(&cmdfetcher); FemArray_Clear(&femarray); EventBuilder_Clear(&eventbuilder); @@ -126,11 +125,13 @@ int main(int argc, char** argv) { std::string output_file; int verbose_level = -1; std::string output_directory; - std::string root_compression_algorithm = "LZMA"; bool version_flag = false; + bool fast_compression = false; + bool disable_aqs = false; CLI::App app{"feminos-daq"}; + app.add_flag("--version", version_flag, "Print the version"); app.add_option("-s,--server", server_ip, "Base IP address of remote server(s) in dotted decimal") ->group("Connection Options") ->check(CLI::ValidIPV4); @@ -155,10 +156,8 @@ int main(int argc, char** argv) { app.add_flag("--read-only", readOnly, "Read-only mode") ->group("General"); app.add_flag("--shared-buffer", sharedBuffer, "Store event data in a shared memory buffer")->group("General"); - app.add_option("--root-compression-algorithm", root_compression_algorithm, "Root compression algorithm. Use LZMA (default) for best compression or LZ4 for speed when the event rate very high") - ->group("File Options") - ->check(CLI::IsMember({"ZLIB", "LZMA", "LZ4"})); - app.add_flag("--version", version_flag, "Print the version"); + app.add_flag("--fast-compression", fast_compression, "Disable maximum compression in output file to improve performance. This should only be enabled when the event rate is so high that the default compression cannot keep up. This will increase output file size")->group("File Options"); + app.add_flag("--disable-aqs", disable_aqs, "Do not store data in aqs format. NOTE: aqs files may be created anyways but they will not have data")->group("File Options"); CLI11_PARSE(app, argc, argv); @@ -179,7 +178,8 @@ int main(int argc, char** argv) { auto& storage_manager = feminos_daq_storage::StorageManager::Instance(); storage_manager.SetOutputDirectory(output_directory); - storage_manager.compression_algorithm = root_compression_algorithm; + storage_manager.fast_compression = fast_compression; + storage_manager.disable_aqs = disable_aqs; stringIpToArray(server_ip, femarray.rem_ip_beg); stringIpToArray(local_ip, femarray.loc_ip); @@ -272,7 +272,6 @@ int main(int argc, char** argv) { printf("Thread_Create failed %d\n", err); goto cleanup; } - // printf("femarray Thread_Create done\n" ); // Create Event Builder thread eventbuilder.thread.routine = reinterpret_cast(EventBuilder_Loop); @@ -282,7 +281,6 @@ int main(int argc, char** argv) { printf("Thread_Create failed %d\n", err); goto cleanup; } - // printf("eventbuilder Thread_Create done\n" ); // Run the main loop of the command interpreter CmdFetcher_Main(&cmdfetcher); diff --git a/src/mclient/evbuilder.cpp b/src/mclient/evbuilder.cpp index f4142bc..c7d61c0 100644 --- a/src/mclient/evbuilder.cpp +++ b/src/mclient/evbuilder.cpp @@ -37,6 +37,7 @@ and timestamps depending on event builder mode #include #include #include +#include #include "prometheus.h" #include "storage.h" @@ -403,169 +404,6 @@ int EventBuilder_CheckBuffer(EventBuilder* eb, int src, return (err); } -void ReadFrame(void* fr, int fr_sz, feminos_daq_storage::Event& event) { - unsigned short* p; - int done = 0; - unsigned short r0, r1, r2; - unsigned short n0, n1; - unsigned short cardNumber, chipNumber, daqChannel; - unsigned int tmp; - int tmp_i[10]; - int si; - - p = (unsigned short*) fr; - - done = 0; - si = 0; - - unsigned int signal_id = 0; - std::array signal_data = {}; - - while (!done) { - // Is it a prefix for 14-bit content? - if ((*p & PFX_14_BIT_CONTENT_MASK) == PFX_CARD_CHIP_CHAN_HIT_IX) { - // if (sgnl.GetSignalID() >= 0 && sgnl.GetNumberOfPoints() >= fMinPoints) { fSignalEvent->AddSignal(sgnl); } - - if (si > 0) { - event.add_signal(signal_id, signal_data); - } - - cardNumber = GET_CARD_IX(*p); - chipNumber = GET_CHIP_IX(*p); - daqChannel = GET_CHAN_IX(*p); - - if (daqChannel >= 0) { - daqChannel += cardNumber * 4 * 72 + chipNumber * 72; - } - - p++; - si = 0; - - signal_id = daqChannel; - } - // Is it a prefix for 12-bit content? - else if ((*p & PFX_12_BIT_CONTENT_MASK) == PFX_ADC_SAMPLE) { - r0 = GET_ADC_DATA(*p); - - signal_data[si] = r0; - - p++; - si++; - } - // Is it a prefix for 4-bit content? - else if ((*p & PFX_4_BIT_CONTENT_MASK) == PFX_START_OF_EVENT) { - // cout << " + Start of event" << endl; - r0 = GET_EVENT_TYPE(*p); - p++; - - // Time Stamp lower 16-bit - r0 = *p; - p++; - - // Time Stamp middle 16-bit - r1 = *p; - p++; - - // Time Stamp upper 16-bit - r2 = *p; - p++; - - // Set timestamp and event ID - - // Event Count lower 16-bit - n0 = *p; - p++; - - // Event Count upper 16-bit - n1 = *p; - p++; - - tmp = (((unsigned int) n1) << 16) | ((unsigned int) n0); - - // auto time = 0 + (2147483648 * r2 + 32768 * r1 + r0) * 2e-8; - - // milliseconds unix time - - if (event.timestamp == 0) { - auto milliseconds = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); - event.timestamp = milliseconds; - } - - // cout << " - Time: " << time << endl; - // Some times the end of the frame contains the header of the next event. - // Then, in the attempt to read the header of next event, we must avoid - // that it overwrites the already assigned id. In that case (id != 0), we - // do nothing, and we store the values at fLastXX variables, that we will - // use that for next event. - - /* - if (fSignalEvent->GetID() == 0) { - if (fLastEventId == 0) { - fSignalEvent->SetID(tmp); - fSignalEvent->SetTime(tStart + (2147483648 * r2 + 32768 * r1 + r0) * 2e-8); - } else { - fSignalEvent->SetID(fLastEventId); - fSignalEvent->SetTime(fLastTimeStamp); - } - } - - fLastEventId = tmp; - fLastTimeStamp = tStart + (2147483648 * r2 + 32768 * r1 + r0) * 2e-8; - - // If it is the first event we use it to define the run start time - if (fCounter == 0) { - fRunInfo->SetStartTimeStamp(fLastTimeStamp); - fCounter++; - } else { - // and we keep updating the end run time - fRunInfo->SetEndTimeStamp(fLastTimeStamp); - } - */ - // fSignalEvent->SetRunOrigin(fRunOrigin); - // fSignalEvent->SetSubRunOrigin(fSubRunOrigin); - } else if ((*p & PFX_4_BIT_CONTENT_MASK) == PFX_END_OF_EVENT) { - tmp = ((unsigned int) GET_EOE_SIZE(*p)) << 16; - p++; - tmp = tmp + (unsigned int) *p; - p++; - - // if (fElectronicsType == "SingleFeminos") endOfEvent = true; - // cout << " - End of event" << endl; - } - - // Is it a prefix for 0-bit content? - else if ((*p & PFX_0_BIT_CONTENT_MASK) == PFX_END_OF_FRAME) { - // if (sgnl.GetSignalID() >= 0 && sgnl.GetNumberOfPoints() >= fMinPoints) { fSignalEvent->AddSignal(sgnl); } - if (si > 0) { - event.add_signal(signal_id, signal_data); - } - - p++; - done = 1; - } else if (*p == PFX_START_OF_BUILT_EVENT) { - p++; - } else if (*p == PFX_END_OF_BUILT_EVENT) { - p++; - } else if (*p == PFX_SOBE_SIZE) { - // Skip header - p++; - - // Built Event Size lower 16-bit - r0 = *p; - p++; - // Built Event Size upper 16-bit - r1 = *p; - p++; - tmp_i[0] = (int) ((r1 << 16) | (r0)); - } else { - p++; - } - } -} - -/******************************************************************************* - EventBuilder_ProcessBuffer -*******************************************************************************/ int EventBuilder_ProcessBuffer(EventBuilder* eb, void* bu) { int err = 0; unsigned short* bu_s; @@ -589,27 +427,15 @@ int EventBuilder_ProcessBuffer(EventBuilder* eb, void* bu) { if (sharedBuffer) { SemaphoreRed(SemaphoreId); - // printf( "Event time BEFORE : %lf\n", - // ShMem_DaqInfo->timeStamp ); Frame_ToSharedMemory((void*) stdout, (void*) bu_s, (int) sz, 0x0, ShMem_DaqInfo, ShMem_Buffer, timeStart, tcm); - // cout << " -- frame to shared memory" << endl; - - // printf( "TIME START : %d\n", timeStart ); - // printf( "Event time : %lf\n", - // ShMem_DaqInfo->timeStamp ); - // ShMem_DaqInfo->timeStamp = (double) - // timeStart + ShMem_DaqInfo->timeStamp; printf( - // "Event time added : %lf\n", - // ShMem_DaqInfo->timeStamp ); printf( "-----\n"); - SemaphoreGreen(SemaphoreId); } // Save data to file - if (!readOnly && eb->savedata) { + if (!readOnly && eb->savedata && !feminos_daq_storage::StorageManager::Instance().disable_aqs) { // Should we close the current file and open a new one? if ((eb->byte_wr + sz) > eb->file_max_size) { if ((err = EventBuilder_FileAction( @@ -638,14 +464,19 @@ int EventBuilder_ProcessBuffer(EventBuilder* eb, void* bu) { } eb->byte_wr += sz; - // printf("EventBuilder_ProcessBuffer: wrote %d - // bytes to file\n", sz); } auto& storage_manager = feminos_daq_storage::StorageManager::Instance(); if (storage_manager.IsInitialized()) { - ReadFrame((void*) bu_s, (int) sz, storage_manager.event); + // ReadFrame((void*) bu_s, (int) sz, storage_manager.event); + + // Insert frame + std::vector data; + data.reserve(sz); // Reserve space to avoid reallocations + std::copy(bu_s, bu_s + sz, std::back_inserter(data)); + + storage_manager.AddFrame(data); } return (err); @@ -902,27 +733,16 @@ int EventBuilder_Loop(EventBuilder* eb) { } else { eb->had_sobe = 0; - auto& prometheus_manager = feminos_daq_prometheus::PrometheusManager::Instance(); auto& storage_manager = feminos_daq_storage::StorageManager::Instance(); if (storage_manager.IsInitialized()) { + // Send a special frame signaling the end of a built event + storage_manager.AddFrame({0}); + if (storage_manager.GetNumberOfEntries() == 0) { storage_manager.millisSinceEpochForSpeedCalculation = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); } - - storage_manager.event.id = storage_manager.event_tree->GetEntries(); - - prometheus_manager.SetNumberOfSignalsInEvent(storage_manager.event.size()); - prometheus_manager.SetNumberOfEvents(storage_manager.event_tree->GetEntries()); - - storage_manager.event_tree->Fill(); - - storage_manager.Checkpoint(); - - prometheus_manager.UpdateOutputRootFileSize(); - - storage_manager.Clear(); } } @@ -1094,7 +914,7 @@ int EventBuilder_GetBufferToRecycle(EventBuilder* eb, int EventBuilder_FileAction(EventBuilder* eb, EBFileActions action, int format) { - if (readOnly) return 0; + if (readOnly) { return 0; } struct tm* now; char name[120]; @@ -1154,7 +974,7 @@ int EventBuilder_FileAction(EventBuilder* eb, } // Close the current file else if (action == EBFA_CloseCurrentOpenNext) { - if (eb->fout == 0) { + if (eb->fout == nullptr) { } else { fflush(eb->fout); fclose(eb->fout); diff --git a/src/mclient/femarray.cpp b/src/mclient/femarray.cpp index b6b5966..63ea80b 100644 --- a/src/mclient/femarray.cpp +++ b/src/mclient/femarray.cpp @@ -21,7 +21,7 @@ the sequence number is also cleared at the other end - New scheme: on the first daq command, the sequence number becomes 0 after it is incremented, and the first daq command does not contain the sequence - number so that it gets cleared at the other end. Sub-sequend daq requests + number so that it gets cleared at the other end. Subsequent daq requests have a sequence number starting from 0x00 and incrementing by one unit until wrap around. The problem with the initial scheme is that after the last daq request is @@ -313,17 +313,28 @@ int FemArray_SendDaq(FemArray* fa, unsigned int fem_beg, unsigned int fem_end, u const auto speed_events_per_second = feminos_daq_storage::StorageManager::Instance().GetSpeedEventsPerSecond(); const auto number_of_events = feminos_daq_storage::StorageManager::Instance().GetNumberOfEntries(); + auto& storageManager = feminos_daq_storage::StorageManager::Instance(); + const auto queueUsage = storageManager.GetQueueUsage(); + time_t now_time = time(nullptr); tm* now_tm = gmtime(&now_time); char time_str[80]; strftime(time_str, 80, "[%Y-%m-%dT%H:%M:%SZ]", now_tm); - cout << time_str << " | # Entries: " << number_of_events << " | 🏃 Speed: " << speed_events_per_second << " entries/s (" << daq_speed << " MB/s)" << endl; + string q_fill_string; + if (queueUsage > 0.05) { + std::stringstream ss; + ss << std::fixed << std::setprecision(1) << queueUsage * 100.0; + q_fill_string = " | ⚠\uFE0F Queue at " + ss.str() + "% Capacity ⚠\uFE0F"; + } + + cout << time_str << " | # Entries: " << number_of_events << " | 🏃 Speed: " << speed_events_per_second << " entry/s (" << daq_speed << " MB/s)" << q_fill_string << endl; - auto& manager = feminos_daq_prometheus::PrometheusManager::Instance(); + auto& prometheus_manager = feminos_daq_prometheus::PrometheusManager::Instance(); - manager.SetDaqSpeedMB(daq_speed); - manager.SetDaqSpeedEvents(speed_events_per_second); + prometheus_manager.SetDaqSpeedMB(daq_speed); + prometheus_manager.SetDaqSpeedEvents(speed_events_per_second); + prometheus_manager.SetFrameQueueFillLevel(queueUsage); // Update the new time and size of received data fa->daq_last_time = now; diff --git a/src/prometheus/prometheus.cpp b/src/prometheus/prometheus.cpp index 1bcd760..bc33a65 100644 --- a/src/prometheus/prometheus.cpp +++ b/src/prometheus/prometheus.cpp @@ -86,6 +86,26 @@ feminos_daq_prometheus::PrometheusManager::PrometheusManager() { {0.99, 0.02}, }); + daq_frames_queue_fill_level_now = &BuildGauge() + .Name("daq_frames_queue_fill_level_now") + .Help("DAQ frames queue fill level (0 - empty - good, 1 - full - bad)") + .Register(*registry) + .Add({}); + + daq_frames_queue_fill_level = &BuildSummary() + .Name("daq_frames_queue_fill_level") + .Help("DAQ frames queue fill level (0 - empty - good, 1 - full - bad)") + .Register(*registry) + .Add({}, Summary::Quantiles{ + {0.01, 0.02}, + {0.1, 0.02}, + {0.25, 0.02}, + {0.5, 0.02}, + {0.75, 0.02}, + {0.9, 0.02}, + {0.99, 0.02}, + }); + run_number = &BuildGauge() .Name("run_number") .Help("Run number") @@ -208,6 +228,16 @@ void feminos_daq_prometheus::PrometheusManager::UpdateOutputRootFileSize() { } } +void feminos_daq_prometheus::PrometheusManager::SetFrameQueueFillLevel(double fill_level) { + if (daq_frames_queue_fill_level_now) { + daq_frames_queue_fill_level_now->Set(fill_level); + } + + if (daq_frames_queue_fill_level) { + daq_frames_queue_fill_level->Observe(fill_level); + } +} + double feminos_daq_prometheus::GetFreeDiskSpaceGigabytes(const std::string& path) { std::error_code ec; std::filesystem::space_info space = std::filesystem::space(path, ec); diff --git a/src/prometheus/prometheus.h b/src/prometheus/prometheus.h index 6afc0d5..7dec5d4 100644 --- a/src/prometheus/prometheus.h +++ b/src/prometheus/prometheus.h @@ -37,6 +37,8 @@ class PrometheusManager { PrometheusManager& operator=(const PrometheusManager&) = delete; + void SetFrameQueueFillLevel(double fill_level); + void SetDaqSpeedMB(double speed); void SetDaqSpeedEvents(double speed); @@ -64,13 +66,20 @@ class PrometheusManager { Gauge* uptime_seconds = nullptr; Gauge* number_of_events = nullptr; + Gauge* run_number = nullptr; + Gauge* daq_speed_mb_per_s_now = nullptr; + Summary* daq_speed_mb_per_s = nullptr; + Gauge* daq_speed_events_per_s_now = nullptr; - Gauge* run_number = nullptr; + Summary* daq_speed_events_per_s = nullptr; + + Gauge* daq_frames_queue_fill_level_now = nullptr; + Summary* daq_frames_queue_fill_level = nullptr; + Gauge* number_of_signals_in_last_event = nullptr; Summary* number_of_signals_in_event = nullptr; - Summary* daq_speed_mb_per_s = nullptr; - Summary* daq_speed_events_per_s = nullptr; + Gauge* output_root_file_size = nullptr; }; } // namespace feminos_daq_prometheus diff --git a/src/root/storage.cpp b/src/root/storage.cpp index c54cb1a..fa6530d 100644 --- a/src/root/storage.cpp +++ b/src/root/storage.cpp @@ -1,7 +1,9 @@ #include "storage.h" +#include "frame.h" #include "prometheus.h" #include +#include using namespace std; using namespace feminos_daq_storage; @@ -28,6 +30,142 @@ double StorageManager::GetSpeedEventsPerSecond() const { return 1000.0 * GetNumberOfEntries() / millis; } +bool ReadFrame(const std::vector& frame_data, feminos_daq_storage::Event& event) { + unsigned short r0, r1, r2; + unsigned short n0, n1; + unsigned short cardNumber, chipNumber, daqChannel; + unsigned int tmp; + int tmp_i[10]; + int si = 0; + + auto p = const_cast(frame_data.data()); + auto start = p; + + bool end_of_event = false; + bool done = false; + + unsigned int signal_id = 0; + std::array signal_data = {}; + + while (!done) { + // Is it a prefix for 14-bit content? + if ((*p & PFX_14_BIT_CONTENT_MASK) == PFX_CARD_CHIP_CHAN_HIT_IX) { + // if (sgnl.GetSignalID() >= 0 && sgnl.GetNumberOfPoints() >= fMinPoints) { fSignalEvent->AddSignal(sgnl); } + + if (si > 0) { + event.add_signal(signal_id, signal_data); + } + + cardNumber = GET_CARD_IX(*p); + chipNumber = GET_CHIP_IX(*p); + daqChannel = GET_CHAN_IX(*p); + + if (daqChannel >= 0) { + daqChannel += cardNumber * 4 * 72 + chipNumber * 72; + } + + p++; + si = 0; + + signal_id = daqChannel; + } + // Is it a prefix for 12-bit content? + else if ((*p & PFX_12_BIT_CONTENT_MASK) == PFX_ADC_SAMPLE) { + r0 = GET_ADC_DATA(*p); + + signal_data[si] = r0; + + p++; + si++; + } + // Is it a prefix for 4-bit content? + else if ((*p & PFX_4_BIT_CONTENT_MASK) == PFX_START_OF_EVENT) { + // cout << " + Start of event" << endl; + r0 = GET_EVENT_TYPE(*p); + p++; + + // Time Stamp lower 16-bit + r0 = *p; + p++; + + // Time Stamp middle 16-bit + r1 = *p; + p++; + + // Time Stamp upper 16-bit + r2 = *p; + p++; + + // Set timestamp and event ID + + // Event Count lower 16-bit + n0 = *p; + p++; + + // Event Count upper 16-bit + n1 = *p; + p++; + + tmp = (((unsigned int) n1) << 16) | ((unsigned int) n0); + + // auto time = 0 + (2147483648 * r2 + 32768 * r1 + r0) * 2e-8; + + // milliseconds unix time + + if (event.timestamp == 0) { + auto milliseconds = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); + event.timestamp = milliseconds; + } + + } else if ((*p & PFX_4_BIT_CONTENT_MASK) == PFX_END_OF_EVENT) { + tmp = ((unsigned int) GET_EOE_SIZE(*p)) << 16; + p++; + tmp = tmp + (unsigned int) *p; + p++; + + // if (fElectronicsType == "SingleFeminos") endOfEvent = true; + } + + // Is it a prefix for 0-bit content? + else if ((*p & PFX_0_BIT_CONTENT_MASK) == PFX_END_OF_FRAME) { + // if (sgnl.GetSignalID() >= 0 && sgnl.GetNumberOfPoints() >= fMinPoints) { fSignalEvent->AddSignal(sgnl); } + if (si > 0) { + event.add_signal(signal_id, signal_data); + } + + done = true; + + if (end_of_event) { + cout << "End of frame fount at " << p - start << endl; + } + + p++; + + } else if (*p == PFX_START_OF_BUILT_EVENT) { + cout << "Start of built event fount at " << p - start << endl; + p++; + } else if (*p == PFX_END_OF_BUILT_EVENT) { + end_of_event = true; + cout << "End of event fount at " << p - start << endl; + p++; + } else if (*p == PFX_SOBE_SIZE) { + // Skip header + p++; + + // Built Event Size lower 16-bit + r0 = *p; + p++; + // Built Event Size upper 16-bit + r1 = *p; + p++; + tmp_i[0] = (int) ((r1 << 16) | (r0)); + } else { + p++; + } + } + return end_of_event; +} + void StorageManager::Initialize(const string& filename) { if (file != nullptr) { cerr << "StorageManager already initialized" << endl; @@ -36,14 +174,8 @@ void StorageManager::Initialize(const string& filename) { file = std::make_unique(filename.c_str(), "RECREATE"); - if (compression_algorithm == "ZLIB") { - file->SetCompressionAlgorithm(ROOT::kZLIB); // good compression ratio and fast (old root default) - } else if (compression_algorithm == "LZ4") { - file->SetCompressionAlgorithm(ROOT::kLZ4); // good compression ratio and fast (new root default) - } else if (compression_algorithm == "LZMA") { + if (!fast_compression) { file->SetCompressionAlgorithm(ROOT::kLZMA); // biggest compression ratio but slowest - } else { - throw std::runtime_error("Unknown compression algorithm: " + compression_algorithm); } // file->SetCompressionLevel(9); // max compression level, but it's very slow, probably not worth it @@ -76,6 +208,39 @@ void StorageManager::Initialize(const string& filename) { prometheus_manager.ExposeRootOutputFilename(filename); prometheus_manager.UpdateOutputRootFileSize(); + + thread([this]() { + while (true) { + const auto frame = PopFrame(); + + if (frame.empty()) { + // PopFrame does not block since it requires locking the mutex. If there are no frames in the queue, it should return an empty frame + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } else if (frame.size() == 1 && frame[0] == 0) { + // special frame signaling end of built event + auto& storage_manager = feminos_daq_storage::StorageManager::Instance(); + auto& prometheus_manager = feminos_daq_prometheus::PrometheusManager::Instance(); + + if (storage_manager.IsInitialized()) { + + storage_manager.event.id = storage_manager.event_tree->GetEntries(); + storage_manager.event_tree->Fill(); + + storage_manager.Checkpoint(); + + prometheus_manager.SetNumberOfSignalsInEvent(storage_manager.event.size()); + prometheus_manager.SetNumberOfEvents(storage_manager.event_tree->GetEntries()); + + prometheus_manager.UpdateOutputRootFileSize(); + } + + storage_manager.Clear(); + } else { + // read frame data into event + ReadFrame(frame, event); + } + } + }).detach(); } void StorageManager::SetOutputDirectory(const string& directory) { @@ -105,6 +270,39 @@ void StorageManager::SetOutputDirectory(const string& directory) { } } +void StorageManager::AddFrame(const vector& frame) { + lock_guard lock(frames_mutex); + frames.push(frame); + frames_count++; + + if (frames.size() >= max_frames) { + throw std::runtime_error("Too many frames in queue"); + } +} + +std::vector StorageManager::PopFrame() { + lock_guard lock(frames_mutex); + if (frames.empty()) { + return {}; + } + auto frame = frames.front(); + frames.pop(); + return std::move(frame); +} + +unsigned int StorageManager::GetNumberOfFramesInserted() const { + return frames_count; +} + +unsigned int StorageManager::GetNumberOfFramesInQueue() { + lock_guard lock(frames_mutex); + return frames.size(); +} + +double StorageManager::GetQueueUsage() { + return GetNumberOfFramesInQueue() / (double) max_frames; +} + std::pair> Event::get_signal_id_data_pair(size_t index) const { unsigned short channel = signal_ids[index]; std::array data{}; diff --git a/src/root/storage.h b/src/root/storage.h index df855af..d6de90d 100644 --- a/src/root/storage.h +++ b/src/root/storage.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include #include #include @@ -81,7 +83,8 @@ class StorageManager { std::unique_ptr run_tree; Event event; - std::string compression_algorithm; + bool fast_compression = false; + bool disable_aqs = false; unsigned long long run_number = 0; unsigned long long run_time_start = 0; @@ -109,11 +112,22 @@ class StorageManager { return output_directory; } + void AddFrame(const std::vector& frame); + std::vector PopFrame(); + unsigned int GetNumberOfFramesInQueue(); + double GetQueueUsage(); + unsigned int GetNumberOfFramesInserted() const; + private: // make it a point in the past to force a checkpoint on the first event const std::chrono::duration checkpoint_interval = std::chrono::seconds(10); std::chrono::time_point checkpoint_last = std::chrono::system_clock::now() - checkpoint_interval; std::string output_directory; + + std::queue> frames; + std::atomic frames_count = 0; + std::mutex frames_mutex; + const size_t max_frames = 1000000; // this should be about 2GB when full (depends on frame size) }; } // namespace feminos_daq_storage diff --git a/viewer/feminos-viewer.py b/viewer/feminos-viewer.py index 0ff8162..e8260d5 100755 --- a/viewer/feminos-viewer.py +++ b/viewer/feminos-viewer.py @@ -1938,6 +1938,8 @@ def attach(self): self.plot_graph() self.auto_update_button.select() + # just selecting the button does not trigger the command! + self.on_auto_update() def load_file(self): if self.filepath is None: @@ -1949,7 +1951,7 @@ def load_file(self): try: self.event_tree = self.file["events"] except KeyError: - self.current_entry = 0 + self.update_entry(0) messagebox.showerror( "Error", @@ -1961,7 +1963,7 @@ def load_file(self): try: self.run_tree = self.file["run"] except KeyError: - self.current_entry = 0 + self.update_entry(0) messagebox.showerror( "Error", @@ -1982,7 +1984,7 @@ def load_file(self): ) if self.current_entry >= self.event_tree.num_entries: - self.current_entry = 0 + self.update_entry(0) def open_local_file(self): self.filepath = filedialog.askopenfilename(filetypes=[("ROOT files", "*.root")])