Skip to content

Commit

Permalink
extend opt. max-tf-per-file to raw-tf reader, move it to device
Browse files Browse the repository at this point in the history
  • Loading branch information
shahor02 committed Nov 18, 2024
1 parent 0c01d1b commit 9019955
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 20 deletions.
7 changes: 7 additions & 0 deletions Detectors/CTF/workflow/src/CTFReaderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ void CTFReaderSpec::init(InitContext& ic)
mUseLocalTFCounter = ic.options().get<bool>("local-tf-counter");
mImposeRunStartMS = ic.options().get<int64_t>("impose-run-start-timstamp");
mInput.checkTFLimitBeforeReading = ic.options().get<bool>("limit-tf-before-reading");
mInput.maxTFs = ic.options().get<int>("max-tf");
mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
mRunning = true;
mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
Expand Down Expand Up @@ -474,6 +478,9 @@ DataProcessorSpec getCTFReaderSpec(const CTFReaderInp& inp)
options.emplace_back(ConfigParamSpec{"local-tf-counter", VariantType::Bool, false, {"reassign header.tfCounter from local TF counter"}});
options.emplace_back(ConfigParamSpec{"fetch-failure-threshold", VariantType::Float, 0.f, {"Fail if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
options.emplace_back(ConfigParamSpec{"limit-tf-before-reading", VariantType::Bool, false, {"Check TF limiting before reading new TF, otherwhise before injecting it"}});
options.emplace_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max CTFs to process (<= 0 : infinite)"}});
options.emplace_back(ConfigParamSpec{"max-tf-per-file", VariantType::Int, -1, {"max TFs to process per ctf file (<= 0 : infinite)"}});

if (!inp.metricChannel.empty()) {
options.emplace_back(ConfigParamSpec{"channel-config", VariantType::String, inp.metricChannel, {"Out-of-band channel config for TF throttling"}});
}
Expand Down
7 changes: 0 additions & 7 deletions Detectors/CTF/workflow/src/ctf-reader-workflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ void customize(std::vector<o2::framework::ConfigParamSpec>& workflowOptions)
options.push_back(ConfigParamSpec{"ctf-input", VariantType::String, "none", {"comma-separated list CTF input files"}});
options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, std::string{DetID::ALL}, {"comma-separated list of detectors to accept. Overrides skipDet"}});
options.push_back(ConfigParamSpec{"skipDet", VariantType::String, std::string{DetID::NONE}, {"comma-separate list of detectors to skip"}});
options.push_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max CTFs to process (<= 0 : infinite)"}});
options.push_back(ConfigParamSpec{"max-tf-per-file", VariantType::Int, -1, {"max TFs to process per ctf file (<= 0 : infinite)"}});
options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (infinite for N<0)"}});
options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files or no-copy to avoid copying"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
Expand Down Expand Up @@ -117,11 +115,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
if (ctfInput.delay_us < 0) {
ctfInput.delay_us = 0;
}
int n = configcontext.options().get<int>("max-tf");
ctfInput.maxTFs = n > 0 ? n : 0x7fffffff;

n = configcontext.options().get<int>("max-tf-per-file");
ctfInput.maxTFsPerFile = n > 0 ? n : 0x7fffffff;

ctfInput.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));

Expand Down
23 changes: 18 additions & 5 deletions Detectors/Raw/TFReaderDD/src/TFReaderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ TFReaderSpec::TFReaderSpec(const TFReaderInp& rinp) : mInput(rinp)
void TFReaderSpec::init(o2f::InitContext& ic)
{
mInput.tfIDs = o2::RangeTokenizer::tokenize<int>(ic.options().get<std::string>("select-tf-ids"));
mInput.maxTFs = ic.options().get<int>("max-tf");
mInput.maxTFs = mInput.maxTFs > 0 ? mInput.maxTFs : 0x7fffffff;
mInput.maxTFsPerFile = ic.options().get<int>("max-tf-per-file");
mInput.maxTFsPerFile = mInput.maxTFsPerFile > 0 ? mInput.maxTFsPerFile : 0x7fffffff;
mInput.maxTFCache = std::max(1, ic.options().get<int>("max-cached-tf"));
mInput.maxFileCache = std::max(1, ic.options().get<int>("max-cached-files"));
mFileFetcher = std::make_unique<o2::utils::FileFetcher>(mInput.inpdata, mInput.tffileRegex, mInput.remoteRegex, mInput.copyCmd);
mFileFetcher->setMaxFilesInQueue(mInput.maxFileCache);
mFileFetcher->setMaxLoops(mInput.maxLoops);
Expand Down Expand Up @@ -417,15 +423,17 @@ void TFReaderSpec::TFBuilder()
}
mTFBuilderCounter++;
}
if (!acceptTF) {
continue;
}
if (mRunning && tf) {
mWaitSendingLast = true;
mTFQueue.push(std::move(tf));
if (acceptTF) {
mWaitSendingLast = true;
mTFQueue.push(std::move(tf));
}
} else {
break;
}
if (mInput.maxTFsPerFile > 0 && mInput.maxTFsPerFile >= locID) { // go to next file
break;
}
}
// remove already processed file from the queue, unless they are needed for further looping
if (mFileFetcher) {
Expand Down Expand Up @@ -527,6 +535,11 @@ o2f::DataProcessorSpec o2::rawdd::getTFReaderSpec(o2::rawdd::TFReaderInp& rinp)
}
spec.options.emplace_back(o2f::ConfigParamSpec{"select-tf-ids", o2f::VariantType::String, "", {"comma-separated list TF IDs to inject (from cumulative counter of TFs seen)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"fetch-failure-threshold", o2f::VariantType::Float, 0.f, {"Fatil if too many failures( >0: fraction, <0: abs number, 0: no threshold)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf", o2f::VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"max-tf-per-file", o2f::VariantType::Int, -1, {"max TFs to process per raw-tf file (<= 0 : infinite)"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-tf", o2f::VariantType::Int, 3, {"max TFs to cache in memory"}});
spec.options.emplace_back(o2f::ConfigParamSpec{"max-cached-files", o2f::VariantType::Int, 3, {"max TF files queued (copied for remote source)"}});

spec.algorithm = o2f::adaptFromTask<TFReaderSpec>(rinp);

return spec;
Expand Down
1 change: 1 addition & 0 deletions Detectors/Raw/TFReaderDD/src/TFReaderSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct TFReaderInp {
int64_t delay_us = 0;
int maxLoops = 0;
int maxTFs = -1;
int maxTFsPerFile = -1;
bool sendDummyForMissing = true;
bool sup0xccdb = false;
std::vector<o2::header::DataHeader> hdVec;
Expand Down
8 changes: 0 additions & 8 deletions Detectors/Raw/TFReaderDD/src/tf-reader-workflow.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,16 @@ void customize(std::vector<ConfigParamSpec>& workflowOptions)
options.push_back(ConfigParamSpec{"onlyDet", VariantType::String, "all", {"list of dectors"}});
options.push_back(ConfigParamSpec{"raw-only-det", VariantType::String, "none", {"do not open non-raw channel for these detectors"}});
options.push_back(ConfigParamSpec{"non-raw-only-det", VariantType::String, "none", {"do not open raw channel for these detectors"}});
options.push_back(ConfigParamSpec{"max-tf", VariantType::Int, -1, {"max TF ID to process (<= 0 : infinite)"}});
options.push_back(ConfigParamSpec{"loop", VariantType::Int, 0, {"loop N times (-1 = infinite)"}});
options.push_back(ConfigParamSpec{"delay", VariantType::Float, 0.f, {"delay in seconds between consecutive TFs sending"}});
options.push_back(ConfigParamSpec{"copy-cmd", VariantType::String, "alien_cp ?src file://?dst", {"copy command for remote files"}}); // Use "XrdSecPROTOCOL=sss,unix xrdcp -N root://eosaliceo2.cern.ch/?src ?dst" for direct EOS access
options.push_back(ConfigParamSpec{"tf-file-regex", VariantType::String, ".+\\.tf$", {"regex string to identify TF files"}});
options.push_back(ConfigParamSpec{"remote-regex", VariantType::String, "^(alien://|)/alice/data/.+", {"regex string to identify remote files"}}); // Use "^/eos/aliceo2/.+" for direct EOS access
options.push_back(ConfigParamSpec{"max-cached-tf", VariantType::Int, 3, {"max TFs to cache in memory"}});
options.push_back(ConfigParamSpec{"max-cached-files", VariantType::Int, 3, {"max TF files queued (copied for remote source)"}});
options.push_back(ConfigParamSpec{"tf-reader-verbosity", VariantType::Int, 0, {"verbosity level (1 or 2: check RDH, print DH/DPH for 1st or all slices, >2 print RDH)"}});
options.push_back(ConfigParamSpec{"raw-channel-config", VariantType::String, "", {"optional raw FMQ channel for non-DPL output"}});
options.push_back(ConfigParamSpec{"send-diststf-0xccdb", VariantType::Bool, false, {"send explicit FLP/DISTSUBTIMEFRAME/0xccdb output"}});
options.push_back(ConfigParamSpec{"disable-dummy-output", VariantType::Bool, false, {"Disable sending empty output if corresponding data is not found in the data"}});
options.push_back(ConfigParamSpec{"configKeyValues", VariantType::String, "", {"semicolon separated key=value strings"}});

options.push_back(ConfigParamSpec{"timeframes-shm-limit", VariantType::String, "0", {"Minimum amount of SHM required in order to publish data"}});
options.push_back(ConfigParamSpec{"metric-feedback-channel-format", VariantType::String, "name=metric-feedback,type=pull,method=connect,address=ipc://{}metric-feedback-{},transport=shmem,rateLogging=0", {"format for the metric-feedback channel for TF rate limiting"}});

Expand All @@ -59,8 +55,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
o2::rawdd::TFReaderInp rinp;
rinp.inpdata = configcontext.options().get<std::string>("input-data");
rinp.maxLoops = configcontext.options().get<int>("loop");
int n = configcontext.options().get<int>("max-tf");
rinp.maxTFs = n > 0 ? n : 0x7fffffff;
auto detlistSelect = configcontext.options().get<std::string>("onlyDet");
if (detlistSelect == "all") {
// Exclude FOCAL from default detlist (must be selected on request)
Expand All @@ -74,8 +68,6 @@ WorkflowSpec defineDataProcessing(ConfigContext const& configcontext)
rinp.rawChannelConfig = configcontext.options().get<std::string>("raw-channel-config");
rinp.delay_us = uint64_t(1e6 * configcontext.options().get<float>("delay")); // delay in microseconds
rinp.verbosity = configcontext.options().get<int>("tf-reader-verbosity");
rinp.maxTFCache = std::max(1, configcontext.options().get<int>("max-cached-tf"));
rinp.maxFileCache = std::max(1, configcontext.options().get<int>("max-cached-files"));
rinp.copyCmd = configcontext.options().get<std::string>("copy-cmd");
rinp.tffileRegex = configcontext.options().get<std::string>("tf-file-regex");
rinp.remoteRegex = configcontext.options().get<std::string>("remote-regex");
Expand Down

0 comments on commit 9019955

Please sign in to comment.