Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Paralelize processing frames to avoid dead time #2

Merged
merged 37 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
654a6b1
benchmark
lobis Aug 29, 2024
81d0a25
read frame debug
lobis Aug 29, 2024
085f5bd
debug
lobis Aug 29, 2024
d773111
debug
lobis Aug 29, 2024
1d06ef4
debug
lobis Aug 29, 2024
d0098ae
frame q
lobis Aug 29, 2024
3ee719b
avoid print
lobis Aug 29, 2024
bfb72aa
thread to process frames
lobis Aug 29, 2024
eec7270
process frames
lobis Aug 29, 2024
e190529
debug event
lobis Aug 29, 2024
fd73964
end of built event
lobis Aug 29, 2024
fbbc301
do not overfill q
lobis Aug 29, 2024
0a6ebe9
do not allow too many frames in q
lobis Aug 29, 2024
cf9f57b
less wait for frames
lobis Aug 29, 2024
da448b6
frame queue usage
lobis Aug 29, 2024
3e1938b
print queue usage
lobis Aug 29, 2024
02601be
cout
lobis Aug 29, 2024
7804724
Merge remote-tracking branch 'origin/main' into speed
lobis Aug 29, 2024
fea3cae
cout
lobis Aug 29, 2024
985b935
danger emoji
lobis Aug 29, 2024
862011e
flag for fast compression
lobis Aug 29, 2024
4e67e85
text
lobis Aug 29, 2024
329f89e
increase frame q size
lobis Aug 29, 2024
388747c
set priority system call
lobis Aug 29, 2024
2a74390
correct auto-update
lobis Aug 29, 2024
28dfc0a
do not change process priority
lobis Aug 29, 2024
9f2a1c9
emoji
lobis Aug 29, 2024
60a83ef
error when opening file with less entry
lobis Aug 29, 2024
4545ba5
space
lobis Aug 29, 2024
c5788da
remove space
lobis Aug 29, 2024
8b9a65b
attempt to disable aqs
lobis Aug 29, 2024
d997ca7
attempt to disable aqs
lobis Aug 29, 2024
636bde8
attempt to disable aqs
lobis Aug 29, 2024
d7d0c7d
disable aqs
lobis Aug 29, 2024
1b23755
q level metric
lobis Aug 29, 2024
e637b36
typo
lobis Aug 29, 2024
7346668
names
lobis Aug 29, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ void CleanSharedMemory(int s) {
}

int main(int argc, char** argv) {

CmdFetcher_Init(&cmdfetcher);
FemArray_Clear(&femarray);
EventBuilder_Clear(&eventbuilder);
Expand All @@ -126,11 +125,13 @@ int main(int argc, char** argv) {
std::string output_file;
int verbose_level = -1;
std::string output_directory;
std::string root_compression_algorithm = "LZMA";
bool version_flag = false;
bool fast_compression = false;
bool disable_aqs = false;

CLI::App app{"feminos-daq"};

app.add_flag("--version", version_flag, "Print the version");
app.add_option("-s,--server", server_ip, "Base IP address of remote server(s) in dotted decimal")
->group("Connection Options")
->check(CLI::ValidIPV4);
Expand All @@ -155,10 +156,8 @@ int main(int argc, char** argv) {
app.add_flag("--read-only", readOnly, "Read-only mode")
->group("General");
app.add_flag("--shared-buffer", sharedBuffer, "Store event data in a shared memory buffer")->group("General");
app.add_option("--root-compression-algorithm", root_compression_algorithm, "Root compression algorithm. Use LZMA (default) for best compression or LZ4 for speed when the event rate very high")
->group("File Options")
->check(CLI::IsMember({"ZLIB", "LZMA", "LZ4"}));
app.add_flag("--version", version_flag, "Print the version");
app.add_flag("--fast-compression", fast_compression, "Disable maximum compression in output file to improve performance. This should only be enabled when the event rate is so high that the default compression cannot keep up. This will increase output file size")->group("File Options");
app.add_flag("--disable-aqs", disable_aqs, "Do not store data in aqs format. NOTE: aqs files may be created anyways but they will not have data")->group("File Options");

CLI11_PARSE(app, argc, argv);

Expand All @@ -179,7 +178,8 @@ int main(int argc, char** argv) {
auto& storage_manager = feminos_daq_storage::StorageManager::Instance();

storage_manager.SetOutputDirectory(output_directory);
storage_manager.compression_algorithm = root_compression_algorithm;
storage_manager.fast_compression = fast_compression;
storage_manager.disable_aqs = disable_aqs;

stringIpToArray(server_ip, femarray.rem_ip_beg);
stringIpToArray(local_ip, femarray.loc_ip);
Expand Down Expand Up @@ -272,7 +272,6 @@ int main(int argc, char** argv) {
printf("Thread_Create failed %d\n", err);
goto cleanup;
}
// printf("femarray Thread_Create done\n" );

// Create Event Builder thread
eventbuilder.thread.routine = reinterpret_cast<void (*)()>(EventBuilder_Loop);
Expand All @@ -282,7 +281,6 @@ int main(int argc, char** argv) {
printf("Thread_Create failed %d\n", err);
goto cleanup;
}
// printf("eventbuilder Thread_Create done\n" );

// Run the main loop of the command interpreter
CmdFetcher_Main(&cmdfetcher);
Expand Down
210 changes: 15 additions & 195 deletions src/mclient/evbuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ and timestamps depending on event builder mode
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
#include <thread>

#include "prometheus.h"
#include "storage.h"
Expand Down Expand Up @@ -403,169 +404,6 @@ int EventBuilder_CheckBuffer(EventBuilder* eb, int src,
return (err);
}

void ReadFrame(void* fr, int fr_sz, feminos_daq_storage::Event& event) {
unsigned short* p;
int done = 0;
unsigned short r0, r1, r2;
unsigned short n0, n1;
unsigned short cardNumber, chipNumber, daqChannel;
unsigned int tmp;
int tmp_i[10];
int si;

p = (unsigned short*) fr;

done = 0;
si = 0;

unsigned int signal_id = 0;
std::array<unsigned short, 512> signal_data = {};

while (!done) {
// Is it a prefix for 14-bit content?
if ((*p & PFX_14_BIT_CONTENT_MASK) == PFX_CARD_CHIP_CHAN_HIT_IX) {
// if (sgnl.GetSignalID() >= 0 && sgnl.GetNumberOfPoints() >= fMinPoints) { fSignalEvent->AddSignal(sgnl); }

if (si > 0) {
event.add_signal(signal_id, signal_data);
}

cardNumber = GET_CARD_IX(*p);
chipNumber = GET_CHIP_IX(*p);
daqChannel = GET_CHAN_IX(*p);

if (daqChannel >= 0) {
daqChannel += cardNumber * 4 * 72 + chipNumber * 72;
}

p++;
si = 0;

signal_id = daqChannel;
}
// Is it a prefix for 12-bit content?
else if ((*p & PFX_12_BIT_CONTENT_MASK) == PFX_ADC_SAMPLE) {
r0 = GET_ADC_DATA(*p);

signal_data[si] = r0;

p++;
si++;
}
// Is it a prefix for 4-bit content?
else if ((*p & PFX_4_BIT_CONTENT_MASK) == PFX_START_OF_EVENT) {
// cout << " + Start of event" << endl;
r0 = GET_EVENT_TYPE(*p);
p++;

// Time Stamp lower 16-bit
r0 = *p;
p++;

// Time Stamp middle 16-bit
r1 = *p;
p++;

// Time Stamp upper 16-bit
r2 = *p;
p++;

// Set timestamp and event ID

// Event Count lower 16-bit
n0 = *p;
p++;

// Event Count upper 16-bit
n1 = *p;
p++;

tmp = (((unsigned int) n1) << 16) | ((unsigned int) n0);

// auto time = 0 + (2147483648 * r2 + 32768 * r1 + r0) * 2e-8;

// milliseconds unix time

if (event.timestamp == 0) {
auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
event.timestamp = milliseconds;
}

// cout << " - Time: " << time << endl;
// Some times the end of the frame contains the header of the next event.
// Then, in the attempt to read the header of next event, we must avoid
// that it overwrites the already assigned id. In that case (id != 0), we
// do nothing, and we store the values at fLastXX variables, that we will
// use that for next event.

/*
if (fSignalEvent->GetID() == 0) {
if (fLastEventId == 0) {
fSignalEvent->SetID(tmp);
fSignalEvent->SetTime(tStart + (2147483648 * r2 + 32768 * r1 + r0) * 2e-8);
} else {
fSignalEvent->SetID(fLastEventId);
fSignalEvent->SetTime(fLastTimeStamp);
}
}

fLastEventId = tmp;
fLastTimeStamp = tStart + (2147483648 * r2 + 32768 * r1 + r0) * 2e-8;

// If it is the first event we use it to define the run start time
if (fCounter == 0) {
fRunInfo->SetStartTimeStamp(fLastTimeStamp);
fCounter++;
} else {
// and we keep updating the end run time
fRunInfo->SetEndTimeStamp(fLastTimeStamp);
}
*/
// fSignalEvent->SetRunOrigin(fRunOrigin);
// fSignalEvent->SetSubRunOrigin(fSubRunOrigin);
} else if ((*p & PFX_4_BIT_CONTENT_MASK) == PFX_END_OF_EVENT) {
tmp = ((unsigned int) GET_EOE_SIZE(*p)) << 16;
p++;
tmp = tmp + (unsigned int) *p;
p++;

// if (fElectronicsType == "SingleFeminos") endOfEvent = true;
// cout << " - End of event" << endl;
}

// Is it a prefix for 0-bit content?
else if ((*p & PFX_0_BIT_CONTENT_MASK) == PFX_END_OF_FRAME) {
// if (sgnl.GetSignalID() >= 0 && sgnl.GetNumberOfPoints() >= fMinPoints) { fSignalEvent->AddSignal(sgnl); }
if (si > 0) {
event.add_signal(signal_id, signal_data);
}

p++;
done = 1;
} else if (*p == PFX_START_OF_BUILT_EVENT) {
p++;
} else if (*p == PFX_END_OF_BUILT_EVENT) {
p++;
} else if (*p == PFX_SOBE_SIZE) {
// Skip header
p++;

// Built Event Size lower 16-bit
r0 = *p;
p++;
// Built Event Size upper 16-bit
r1 = *p;
p++;
tmp_i[0] = (int) ((r1 << 16) | (r0));
} else {
p++;
}
}
}

/*******************************************************************************
EventBuilder_ProcessBuffer
*******************************************************************************/
int EventBuilder_ProcessBuffer(EventBuilder* eb, void* bu) {
int err = 0;
unsigned short* bu_s;
Expand All @@ -589,27 +427,15 @@ int EventBuilder_ProcessBuffer(EventBuilder* eb, void* bu) {
if (sharedBuffer) {
SemaphoreRed(SemaphoreId);

// printf( "Event time BEFORE : %lf\n",
// ShMem_DaqInfo->timeStamp );
Frame_ToSharedMemory((void*) stdout, (void*) bu_s,
(int) sz, 0x0, ShMem_DaqInfo,
ShMem_Buffer, timeStart, tcm);

// cout << " -- frame to shared memory" << endl;

// printf( "TIME START : %d\n", timeStart );
// printf( "Event time : %lf\n",
// ShMem_DaqInfo->timeStamp );
// ShMem_DaqInfo->timeStamp = (double)
// timeStart + ShMem_DaqInfo->timeStamp; printf(
// "Event time added : %lf\n",
// ShMem_DaqInfo->timeStamp ); printf( "-----\n");

SemaphoreGreen(SemaphoreId);
}

// Save data to file
if (!readOnly && eb->savedata) {
if (!readOnly && eb->savedata && !feminos_daq_storage::StorageManager::Instance().disable_aqs) {
// Should we close the current file and open a new one?
if ((eb->byte_wr + sz) > eb->file_max_size) {
if ((err = EventBuilder_FileAction(
Expand Down Expand Up @@ -638,14 +464,19 @@ int EventBuilder_ProcessBuffer(EventBuilder* eb, void* bu) {
}

eb->byte_wr += sz;
// printf("EventBuilder_ProcessBuffer: wrote %d
// bytes to file\n", sz);
}

auto& storage_manager = feminos_daq_storage::StorageManager::Instance();

if (storage_manager.IsInitialized()) {
ReadFrame((void*) bu_s, (int) sz, storage_manager.event);
// ReadFrame((void*) bu_s, (int) sz, storage_manager.event);

// Insert frame
std::vector<unsigned short> data;
data.reserve(sz); // Reserve space to avoid reallocations
std::copy(bu_s, bu_s + sz, std::back_inserter(data));

storage_manager.AddFrame(data);
}

return (err);
Expand Down Expand Up @@ -902,27 +733,16 @@ int EventBuilder_Loop(EventBuilder* eb) {
} else {
eb->had_sobe = 0;

auto& prometheus_manager = feminos_daq_prometheus::PrometheusManager::Instance();
auto& storage_manager = feminos_daq_storage::StorageManager::Instance();

if (storage_manager.IsInitialized()) {

// Send a special frame signaling the end of a built event
storage_manager.AddFrame({0});

if (storage_manager.GetNumberOfEntries() == 0) {
storage_manager.millisSinceEpochForSpeedCalculation = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}

storage_manager.event.id = storage_manager.event_tree->GetEntries();

prometheus_manager.SetNumberOfSignalsInEvent(storage_manager.event.size());
prometheus_manager.SetNumberOfEvents(storage_manager.event_tree->GetEntries());

storage_manager.event_tree->Fill();

storage_manager.Checkpoint();

prometheus_manager.UpdateOutputRootFileSize();

storage_manager.Clear();
}
}

Expand Down Expand Up @@ -1094,7 +914,7 @@ int EventBuilder_GetBufferToRecycle(EventBuilder* eb,
int EventBuilder_FileAction(EventBuilder* eb,
EBFileActions action,
int format) {
if (readOnly) return 0;
if (readOnly) { return 0; }

struct tm* now;
char name[120];
Expand Down Expand Up @@ -1154,7 +974,7 @@ int EventBuilder_FileAction(EventBuilder* eb,
}
// Close the current file
else if (action == EBFA_CloseCurrentOpenNext) {
if (eb->fout == 0) {
if (eb->fout == nullptr) {
} else {
fflush(eb->fout);
fclose(eb->fout);
Expand Down
21 changes: 16 additions & 5 deletions src/mclient/femarray.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
the sequence number is also cleared at the other end
- New scheme: on the first daq command, the sequence number becomes 0 after
it is incremented, and the first daq command does not contain the sequence
number so that it gets cleared at the other end. Sub-sequend daq requests
number so that it gets cleared at the other end. Subsequent daq requests
have a sequence number starting from 0x00 and incrementing by one unit until
wrap around.
The problem with the initial scheme is that after the last daq request is
Expand Down Expand Up @@ -313,17 +313,28 @@ int FemArray_SendDaq(FemArray* fa, unsigned int fem_beg, unsigned int fem_end, u
const auto speed_events_per_second = feminos_daq_storage::StorageManager::Instance().GetSpeedEventsPerSecond();
const auto number_of_events = feminos_daq_storage::StorageManager::Instance().GetNumberOfEntries();

auto& storageManager = feminos_daq_storage::StorageManager::Instance();
const auto queueUsage = storageManager.GetQueueUsage();

time_t now_time = time(nullptr);
tm* now_tm = gmtime(&now_time);
char time_str[80];
strftime(time_str, 80, "[%Y-%m-%dT%H:%M:%SZ]", now_tm);

cout << time_str << " | # Entries: " << number_of_events << " | 🏃 Speed: " << speed_events_per_second << " entries/s (" << daq_speed << " MB/s)" << endl;
string q_fill_string;
if (queueUsage > 0.05) {
std::stringstream ss;
ss << std::fixed << std::setprecision(1) << queueUsage * 100.0;
q_fill_string = " | ⚠\uFE0F Queue at " + ss.str() + "% Capacity ⚠\uFE0F";
}

cout << time_str << " | # Entries: " << number_of_events << " | 🏃 Speed: " << speed_events_per_second << " entry/s (" << daq_speed << " MB/s)" << q_fill_string << endl;

auto& manager = feminos_daq_prometheus::PrometheusManager::Instance();
auto& prometheus_manager = feminos_daq_prometheus::PrometheusManager::Instance();

manager.SetDaqSpeedMB(daq_speed);
manager.SetDaqSpeedEvents(speed_events_per_second);
prometheus_manager.SetDaqSpeedMB(daq_speed);
prometheus_manager.SetDaqSpeedEvents(speed_events_per_second);
prometheus_manager.SetFrameQueueFillLevel(queueUsage);

// Update the new time and size of received data
fa->daq_last_time = now;
Expand Down
Loading
Loading