Skip to content

Commit

Permalink
Merge pull request #3032 from E3SM-Project/aarondonahue/close_output_…
Browse files Browse the repository at this point in the history
…when_full
  • Loading branch information
bartgol authored Oct 17, 2024
2 parents 41f563d + 033d31e commit 68935b3
Show file tree
Hide file tree
Showing 15 changed files with 84 additions and 34 deletions.
1 change: 0 additions & 1 deletion components/eamxx/src/dynamics/homme/tests/dyn_grid_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ TEST_CASE("dyn_grid_io")
ekat::ParameterList out_params;
out_params.set<std::string>("Averaging Type","Instant");
out_params.set<std::string>("filename_prefix","dyn_grid_io");
out_params.set<bool>("MPI Ranks in Filename",true);
out_params.sublist("Fields").sublist("Dynamics").set<std::vector<std::string>>("Field Names",fnames);
out_params.sublist("Fields").sublist("Dynamics").set<std::string>("IO Grid Name","Physics GLL");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ create_om (const std::string& filename_prefix,
params.set<std::string>("Averaging Type","INSTANT");
params.set<std::string>("filename_prefix",filename_prefix);
params.set<std::string>("Floating Point Precision","real");
params.set("MPI Ranks in Filename", false);
params.set("Field Names",strvec_t{"p_mid","U","V"});
params.set("fill_value",fill_val);

Expand Down
21 changes: 18 additions & 3 deletions components/eamxx/src/share/io/scream_io_control.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,38 @@ struct IOControl {
util::TimeStamp next_write_ts;
util::TimeStamp last_write_ts;

// At run time, set dt in the struct, so we can compute next_write_ts correctly,
// even if freq_units is "nsteps"
// NOTE: this ASSUMES dt is constant throughout the run (i.e., no time adaptivity).
// An error will be thrown if dt changes, so developers can fix this if we ever support variable dt
double dt = 0;

bool output_enabled () const {
return frequency_units!="none" && frequency_units!="never";
}

bool is_write_step (const util::TimeStamp& ts) const {
if (not output_enabled()) return false;
return frequency_units=="nsteps" ? ts.get_num_steps()==next_write_ts.get_num_steps()
: ts==next_write_ts;
: (ts.get_date()==next_write_ts.get_date() and
ts.get_time()==next_write_ts.get_time());
}

void set_dt (const double dt_in) {
EKAT_REQUIRE_MSG (dt==0 or dt==dt_in,
"[IOControl::set_dt] Error! Cannot reset dt once it is set.\n");

dt = dt_in;
}

// Computes next_write_ts from frequency and last_write_ts
void compute_next_write_ts () {
EKAT_REQUIRE_MSG (last_write_ts.is_valid(),
"Error! Cannot compute next_write_ts, since last_write_ts was never set.\n");
if (frequency_units=="nsteps") {
// This avoids having an invalid date/time in the above check next time this fcn runs
next_write_ts = last_write_ts;
// This avoids having an invalid/wrong date/time in StorageSpecs::snapshot_fits
// if storage type is NumSnaps
next_write_ts = last_write_ts + dt*frequency;
next_write_ts.set_num_steps(last_write_ts.get_num_steps()+frequency);
} else if (frequency_units=="nsecs") {
next_write_ts = last_write_ts;
Expand Down
5 changes: 2 additions & 3 deletions components/eamxx/src/share/io/scream_io_file_specs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct StorageSpecs {
// - type=NumSnaps: the number of stored snaps is less than the max allowed per file.
// - otherwise: the snapshot month/year index match the one currently stored in the file
// or the file has no snapshot stored yet
bool snapshot_fits (const util::TimeStamp& t) {
bool snapshot_fits (const util::TimeStamp& t) const {
const auto& idx = type==Monthly ? t.get_month() : t.get_year();
switch (type) {
case Yearly:
Expand Down Expand Up @@ -86,9 +86,8 @@ struct IOFileSpecs {
// If positive, flush the output file every these many snapshots
int flush_frequency = std::numeric_limits<int>::max();

// bool file_is_full () const { return num_snapshots_in_file>=max_snapshots_in_file; }
bool file_needs_flush () const {
return storage.num_snapshots_in_file%flush_frequency==0;
return storage.num_snapshots_in_file>0 and storage.num_snapshots_in_file%flush_frequency==0;
}

// Whether it is a model output, model restart, or history restart file
Expand Down
59 changes: 43 additions & 16 deletions components/eamxx/src/share/io/scream_output_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,16 +231,17 @@ setup (const ekat::Comm& io_comm, const ekat::ParameterList& params,
m_resume_output_file = last_output_filename!="" and not restart_pl.get("force_new_file",false);
if (m_resume_output_file) {
int num_snaps = scorpio::get_attribute<int>(rhist_file,"GLOBAL","last_output_file_num_snaps");

m_output_file_specs.filename = last_output_filename;
m_output_file_specs.is_open = true;
m_output_file_specs.storage.num_snapshots_in_file = num_snaps;

if (m_output_file_specs.storage.snapshot_fits(m_output_control.next_write_ts)) {
// The setup_file call will not register any new variable (the file is in Append mode,
// so all dims/vars must already be in the file). However, it will register decompositions,
// since those are a property of the run, not of the file.
m_output_file_specs.filename = last_output_filename;
m_output_file_specs.is_open = true;
setup_file(m_output_file_specs,m_output_control);
} else {
m_output_file_specs.close();
}
}
scorpio::release_file(rhist_file);
Expand Down Expand Up @@ -287,6 +288,24 @@ void OutputManager::init_timestep (const util::TimeStamp& start_of_step, const R
return;
}

// Make sure dt is in the control
m_output_control.set_dt(dt);

if (start_of_step==m_case_t0 and m_avg_type==OutputAvgType::Instant and
m_output_file_specs.storage.type!=NumSnaps and m_output_control.frequency_units=="nsteps") {
// This is the 1st step of the whole run, and a very sneaky corner case. Bear with me.
// When we call run, we also compute next_write_ts. Then, we use next_write_ts to see if the
// next output step will fit in the currently open file, and, if not, close it right away.
// For a storage type!=NumSnaps, we need to have a valid timestamp for next_write_ts, which
// for freq=nsteps requires to know dt. But at t=case_t0, we did NOT have dt, which means we
// computed next_write_ts=last_write_ts (in terms of date:time, the num_steps is correct).
// This means that at that time we deemed that the next_write_ts definitely fit in the same
// file as last_write_ts (date/time are the same!), which may or may not be true for non NumSnaps
// storage. To fix this, we recompute next_write_ts here, and close the file if it doesn't.
m_output_control.compute_next_write_ts();
close_or_flush_if_needed (m_output_file_specs,m_output_control);
}

// Check if the end of this timestep will correspond to an output step. If not, there's nothing to do
const auto& end_of_step = start_of_step+dt;

Expand Down Expand Up @@ -328,10 +347,6 @@ void OutputManager::run(const util::TimeStamp& timestamp)
"The most likely cause is an output frequency that is faster than the atm timestep.\n"
"Try to increase 'Frequency' and/or 'frequency_units' in your output yaml file.\n");

// Update counters
++m_output_control.nsamples_since_last_write;
++m_checkpoint_control.nsamples_since_last_write;

if (m_atm_logger) {
m_atm_logger->debug("[OutputManager::run] filename_prefix: " + m_filename_prefix + "\n");
}
Expand Down Expand Up @@ -361,6 +376,14 @@ void OutputManager::run(const util::TimeStamp& timestamp)
const bool is_full_checkpoint_step = is_checkpoint_step && has_checkpoint_data && not is_output_step;
const bool is_write_step = is_output_step || is_checkpoint_step;

// Update counters
++m_output_control.nsamples_since_last_write;
if (not is_t0_output) {
// In case REST_OPT=nsteps, don't count t0 output as one of those steps
// NOTE: for m_output_control, it doesn't matter, since it'll be reset to 0 before we return
++m_checkpoint_control.nsamples_since_last_write;
}

// Create and setup output/checkpoint file(s), if necessary
start_timer(timer_root+"::get_new_file");
auto setup_output_file = [&](IOControl& control, IOFileSpecs& filespecs) {
Expand All @@ -381,10 +404,6 @@ void OutputManager::run(const util::TimeStamp& timestamp)
snapshot_start = m_case_t0;
snapshot_start += m_time_bnds[0];
}
if (filespecs.is_open and not filespecs.storage.snapshot_fits(snapshot_start)) {
release_file(filespecs.filename);
filespecs.close();
}

// Check if we need to open a new file
if (not filespecs.is_open) {
Expand Down Expand Up @@ -534,10 +553,7 @@ void OutputManager::run(const util::TimeStamp& timestamp)
scorpio::write_var(filespecs.filename, "time_bnds", m_time_bnds.data());
}

// Check if we need to flush the output file
if (filespecs.file_needs_flush()) {
flush_file (filespecs.filename);
}
close_or_flush_if_needed(filespecs,control);
};

start_timer(timer_root+"::update_snapshot_tally");
Expand Down Expand Up @@ -891,7 +907,18 @@ void OutputManager::set_file_header(const IOFileSpecs& file_specs)
set_str_att("Conventions","CF-1.8");
set_str_att("product",e2str(file_specs.ftype));
}
/*===============================================================================================*/
void OutputManager::
close_or_flush_if_needed ( IOFileSpecs& file_specs,
const IOControl& control) const
{
if (not file_specs.storage.snapshot_fits(control.next_write_ts)) {
scorpio::release_file(file_specs.filename);
file_specs.close();
} else if (file_specs.file_needs_flush()) {
scorpio::flush_file (file_specs.filename);
}
}

void OutputManager::
push_to_logger()
{
Expand Down
8 changes: 8 additions & 0 deletions components/eamxx/src/share/io/scream_output_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ class OutputManager
void finalize();

long long res_dep_memory_footprint () const;

// For debug and testing purposes
const IOControl& output_control () const { return m_output_control; }
const IOFileSpecs& output_file_specs () const { return m_output_file_specs; }
protected:

std::string compute_filename (const IOFileSpecs& file_specs,
Expand All @@ -132,6 +136,10 @@ class OutputManager
void setup_file ( IOFileSpecs& filespecs,
const IOControl& control);

// If a file can be closed (next snap won't fit) or needs flushing, do so
void close_or_flush_if_needed ( IOFileSpecs& file_specs,
const IOControl& control) const;

// Manage logging of info to atm.log
void push_to_logger();

Expand Down
14 changes: 13 additions & 1 deletion components/eamxx/src/share/io/tests/io_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ void write (const std::string& avg_type, const std::string& freq_units,

// Create output params
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix",std::string("io_basic"));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", avg_type);
Expand All @@ -154,6 +153,15 @@ void write (const std::string& avg_type, const std::string& freq_units,
ctrl_pl.set("Frequency",freq);
ctrl_pl.set("save_grid_data",false);

// While setting this is in practice irrelevant (we would close
// the file anyways at the end of the run), we can test that the OM closes
// the file AS SOON as it's full (before calling finalize)
int max_snaps = num_output_steps;
if (avg_type=="INSTANT") {
++max_snaps;
}
om_pl.set("Max Snapshots Per File", max_snaps);

// Create Output manager
OutputManager om;

Expand Down Expand Up @@ -181,6 +189,10 @@ void write (const std::string& avg_type, const std::string& freq_units,
om.run (t);
}

// Check that the file was closed, since we reached full capacity
const auto& file_specs = om.output_file_specs();
REQUIRE (not file_specs.is_open);

// Close file and cleanup
om.finalize();
}
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_diags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ void write (const int seed, const ekat::Comm& comm)

// Create output params
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix",std::string("io_diags"));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", std::string("INSTANT"));
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_filled.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ void write (const std::string& avg_type, const std::string& freq_units,

// Create output params
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix",std::string("io_filled"));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", avg_type);
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_monthly.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ void write (const int seed, const ekat::Comm& comm)

// Create output params
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix",std::string("io_monthly"));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", std::string("Instant"));
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_packed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ void write (const int freq, const int seed, const int ps, const ekat::Comm& comm

// Create output params
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix","io_packed_ps"+std::to_string(ps));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", std::string("INSTANT"));
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_remap_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,6 @@ ekat::ParameterList set_output_params(const std::string& name, const std::string
params.set<std::string>("Averaging Type","Instant");
params.set<int>("Max Snapshots Per File",1);
params.set<std::string>("Floating Point Precision","real");
params.set<bool>("MPI Ranks in Filename",true);
auto& oc = params.sublist("output_control");
oc.set<int>("Frequency",1);
oc.set<std::string>("frequency_units","nsteps");
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_se_grid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ TEST_CASE("se_grid_io")
params.set<int>("Max Snapshots Per File",1);
params.set<strvec_t>("Field Names",{"field_1","field_2","field_3","field_packed"});
params.set<std::string>("Floating Point Precision","real");
params.set("MPI Ranks in Filename",true);
auto& ctl_pl = params.sublist("output_control");
ctl_pl.set("Frequency",1);
ctl_pl.set<std::string>("frequency_units","nsteps");
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/output_restart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ TEST_CASE("output_restart","io")
output_params.set<std::string>("Floating Point Precision","real");
output_params.set<std::vector<std::string>>("Field Names",{"field_1", "field_2", "field_3", "field_4","field_5"});
output_params.set<double>("fill_value",FillValue);
output_params.set<bool>("MPI Ranks in Filename","true");
output_params.set<int>("flush_frequency",1);
output_params.sublist("output_control").set<std::string>("frequency_units","nsteps");
output_params.sublist("output_control").set<int>("Frequency",10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,15 +385,13 @@ std::vector<std::string> create_test_data_files(
}
// Create the output parameters
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix",std::string("source_data_for_time_interpolation"));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", std::string("INSTANT"));
om_pl.set("Max Snapshots Per File",snaps_per_file);
auto& ctrl_pl = om_pl.sublist("output_control");
ctrl_pl.set("frequency_units",std::string("nsteps"));
ctrl_pl.set("Frequency",snap_freq);
ctrl_pl.set("MPI Ranks in Filename",true);
ctrl_pl.set("save_grid_data",false);
// Create an output manager, note we use a subclass defined in this test so we can extract
// the list of files created by the output manager.
Expand Down

0 comments on commit 68935b3

Please sign in to comment.