Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flush and close output files that are full #3032

Merged
merged 8 commits into from
Oct 17, 2024
14 changes: 13 additions & 1 deletion components/eamxx/cime_config/namelist_defaults_scream.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ be lost if SCREAM_HACK_XML is not enabled.
<sections>ctl_nl</sections>
</file>
<file name="data/scream_input.yaml" format="yaml">
<sections>driver_options,iop_options,atmosphere_processes,grids_manager,initial_conditions,Scorpio,e3sm_parameters</sections>
<sections>driver_debug_options,driver_options,iop_options,atmosphere_processes,grids_manager,initial_conditions,Scorpio,e3sm_parameters</sections>
</file>
</generated_files>

Expand Down Expand Up @@ -602,6 +602,18 @@ be lost if SCREAM_HACK_XML is not enabled.
</model_restart>
</Scorpio>

<!-- driver debug options for eamxx -->
<!--
These options are meant for power users to debug the code.
They are not meant to be used for typical eamxx simulations.
-->
<driver_debug_options>
<force_crash_nsteps type="integer"
doc="Force simulation to fail at a specific atmosphere timestep">
-9999
</force_crash_nsteps>
</driver_debug_options>

<!-- driver_options options for scream -->
<driver_options>
<atmosphere_dag_verbosity_level>0</atmosphere_dag_verbosity_level>
Expand Down
7 changes: 7 additions & 0 deletions components/eamxx/src/control/atmosphere_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,13 @@ initialize (const ekat::Comm& atm_comm,
void AtmosphereDriver::run (const int dt) {
start_timer("EAMxx::run");

// DEBUG option: Check if user has set the run to fail at a specific timestep.
if (m_atm_params.isSublist("driver_debug_options")) {
bool force_fail_nstep = m_atm_params.sublist("driver_debug_options").get<int>("force_crash_nsteps", -9999)==m_current_ts.get_num_steps();
if (force_fail_nstep) {
abort();
}
}
// Make sure the end of the time step is after the current start_time
EKAT_REQUIRE_MSG (dt>0, "Error! Input time step must be positive.\n");

Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/dynamics/homme/tests/dyn_grid_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,6 @@ TEST_CASE("dyn_grid_io")
ekat::ParameterList out_params;
out_params.set<std::string>("Averaging Type","Instant");
out_params.set<std::string>("filename_prefix","dyn_grid_io");
out_params.set<bool>("MPI Ranks in Filename",true);
out_params.sublist("Fields").sublist("Dynamics").set<std::vector<std::string>>("Field Names",fnames);
out_params.sublist("Fields").sublist("Dynamics").set<std::string>("IO Grid Name","Physics GLL");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ create_om (const std::string& filename_prefix,
params.set<std::string>("Averaging Type","INSTANT");
params.set<std::string>("filename_prefix",filename_prefix);
params.set<std::string>("Floating Point Precision","real");
params.set("MPI Ranks in Filename", false);
params.set("Field Names",strvec_t{"p_mid","U","V"});
params.set("fill_value",fill_val);

Expand Down
21 changes: 18 additions & 3 deletions components/eamxx/src/share/io/scream_io_control.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,38 @@ struct IOControl {
util::TimeStamp next_write_ts;
util::TimeStamp last_write_ts;

// At run time, set dt in the struct, so we can compute next_write_ts correctly,
// even if freq_units is "nsteps"
// NOTE: this ASSUMES dt is constant throughout the run (i.e., no time adaptivity).
// An error will be thrown if dt changes, so developers can fix this if we ever support variable dt
double dt = 0;

// Whether this control produces any output at all: output is considered
// disabled when frequency_units is one of the keywords "none" or "never".
bool output_enabled () const {
  const bool disabled = (frequency_units=="none") or (frequency_units=="never");
  return not disabled;
}

bool is_write_step (const util::TimeStamp& ts) const {
if (not output_enabled()) return false;
return frequency_units=="nsteps" ? ts.get_num_steps()==next_write_ts.get_num_steps()
: ts==next_write_ts;
: (ts.get_date()==next_write_ts.get_date() and
ts.get_time()==next_write_ts.get_time());
}

void set_dt (const double dt_in) {
EKAT_REQUIRE_MSG (dt==0 or dt==dt_in,
"[IOControl::set_dt] Error! Cannot reset dt once it is set.\n");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like above.


dt = dt_in;
}

// Computes next_write_ts from frequency and last_write_ts
void compute_next_write_ts () {
EKAT_REQUIRE_MSG (last_write_ts.is_valid(),
"Error! Cannot compute next_write_ts, since last_write_ts was never set.\n");
if (frequency_units=="nsteps") {
// This avoids having an invalid date/time in the above check next time this fcn runs
next_write_ts = last_write_ts;
// This avoids having an invalid/wrong date/time in StorageSpecs::snapshot_fits
// if storage type is NumSnaps
next_write_ts = last_write_ts + dt*frequency;
next_write_ts.set_num_steps(last_write_ts.get_num_steps()+frequency);
} else if (frequency_units=="nsecs") {
next_write_ts = last_write_ts;
Expand Down
5 changes: 2 additions & 3 deletions components/eamxx/src/share/io/scream_io_file_specs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ struct StorageSpecs {
// - type=NumSnaps: the number of stored snaps is less than the max allowed per file.
// - otherwise: the snapshot month/year index match the one currently stored in the file
// or the file has no snapshot stored yet
bool snapshot_fits (const util::TimeStamp& t) {
bool snapshot_fits (const util::TimeStamp& t) const {
const auto& idx = type==Monthly ? t.get_month() : t.get_year();
switch (type) {
case Yearly:
Expand Down Expand Up @@ -86,9 +86,8 @@ struct IOFileSpecs {
// If positive, flush the output file every these many snapshots
int flush_frequency = std::numeric_limits<int>::max();

// bool file_is_full () const { return num_snapshots_in_file>=max_snapshots_in_file; }
bool file_needs_flush () const {
return storage.num_snapshots_in_file%flush_frequency==0;
return storage.num_snapshots_in_file>0 and storage.num_snapshots_in_file%flush_frequency==0;
}

// Whether it is a model output, model restart, or history restart file
Expand Down
59 changes: 43 additions & 16 deletions components/eamxx/src/share/io/scream_output_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,16 +243,17 @@ setup (const ekat::Comm& io_comm, const ekat::ParameterList& params,
m_resume_output_file = last_output_filename!="" and not restart_pl.get("force_new_file",false);
if (m_resume_output_file) {
int num_snaps = scorpio::get_attribute<int>(rhist_file,"GLOBAL","last_output_file_num_snaps");

m_output_file_specs.filename = last_output_filename;
m_output_file_specs.is_open = true;
m_output_file_specs.storage.num_snapshots_in_file = num_snaps;

if (m_output_file_specs.storage.snapshot_fits(m_output_control.next_write_ts)) {
// The setup_file call will not register any new variable (the file is in Append mode,
// so all dims/vars must already be in the file). However, it will register decompositions,
// since those are a property of the run, not of the file.
m_output_file_specs.filename = last_output_filename;
m_output_file_specs.is_open = true;
setup_file(m_output_file_specs,m_output_control);
} else {
m_output_file_specs.close();
}
}
scorpio::release_file(rhist_file);
Expand Down Expand Up @@ -299,6 +300,24 @@ void OutputManager::init_timestep (const util::TimeStamp& start_of_step, const R
return;
}

// Make sure dt is in the control
m_output_control.set_dt(dt);

if (start_of_step==m_case_t0 and m_avg_type==OutputAvgType::Instant and
m_output_file_specs.storage.type!=NumSnaps and m_output_control.frequency_units=="nsteps") {
// This is the 1st step of the whole run, and a very sneaky corner case. Bear with me.
// When we call run, we also compute next_write_ts. Then, we use next_write_ts to see if the
// next output step will fit in the currently open file, and, if not, close it right away.
// For a storage type!=NumSnaps, we need to have a valid timestamp for next_write_ts, which
// for freq=nsteps requires to know dt. But at t=case_t0, we did NOT have dt, which means we
// computed next_write_ts=last_write_ts (in terms of date:time, the num_steps is correct).
// This means that at that time we deemed that the next_write_ts definitely fit in the same
// file as last_write_ts (date/time are the same!), which may or may not be true for non NumSnaps
// storage. To fix this, we recompute next_write_ts here, and close the file if it doesn't.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: doesn't ... what? fit?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, this seems a bit too complex? Some questions:

  • when did we open the "currently open file"?
  • why can't we have all the info we need to determine if we can close it after write? If we are deciding to flush and close full files (per title of PR), then can't we deduce that if the number of snaps in file == max number of snaps then close based on that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true for storage type "NumSnaps". But with type "one_month" (say), we can't say if the file is full unless we know the time stamp of the next write.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, at a given time step, we know (can calculate) its time stamp, right? Then, why can't we deduce if one_month or one_day is ending here? Do we not know dt?

I understand the logic can be too convoluted, but it is still doable, no?

We don't have to do it now, but trying to understand if it is doable at all (ignoring the fact that we may choose not to make the code super ugly for some corner case)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We cannot compute the next time stamp during t0 output. The driver does not have info about dt during the init sequence, which is when the OM is setup. When I originally designed the driver, I wanted to separate concerns as much as possible. In my mind, dt was a "run" time param, not an "init" time param (I didn't even know if dt could in principle change dynamically down the road).

If you want to compute next_write_ts during t0 output (which, again, happens during the init sequence), we need to pass dt to the driver init methods (from the f90 cpl interface). We can of course do that. And all in all, it may make the code simpler. It's a slightly deeper interface change though, so we could do it as a follow up PR.

m_output_control.compute_next_write_ts();
close_or_flush_if_needed (m_output_file_specs,m_output_control);
}

// Check if the end of this timestep will correspond to an output step. If not, there's nothing to do
const auto& end_of_step = start_of_step+dt;

Expand Down Expand Up @@ -340,10 +359,6 @@ void OutputManager::run(const util::TimeStamp& timestamp)
"The most likely cause is an output frequency that is faster than the atm timestep.\n"
"Try to increase 'Frequency' and/or 'frequency_units' in your output yaml file.\n");

// Update counters
++m_output_control.nsamples_since_last_write;
++m_checkpoint_control.nsamples_since_last_write;

if (m_atm_logger) {
m_atm_logger->debug("[OutputManager::run] filename_prefix: " + m_filename_prefix + "\n");
}
Expand Down Expand Up @@ -373,6 +388,14 @@ void OutputManager::run(const util::TimeStamp& timestamp)
const bool is_full_checkpoint_step = is_checkpoint_step && has_checkpoint_data && not is_output_step;
const bool is_write_step = is_output_step || is_checkpoint_step;

// Update counters
++m_output_control.nsamples_since_last_write;
if (not is_t0_output) {
// In case REST_OPT=nsteps, don't count t0 output as one of those steps
// NOTE: for m_output_control, it doesn't matter, since it'll be reset to 0 before we return
Comment on lines +394 to +395
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmmmm.... I see where things get weird! I wonder if shifting the indexing altogether can help?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tbc, this small issue is unrelated to closing the file at the right time. To be honest, I think we could flat out rm the line that updates nsamples_since_last_write for checkpoint control: it is not used anyways! And since, as the comment states, for output control it doesn't matter, we may as well remove this if block altogether...

++m_checkpoint_control.nsamples_since_last_write;
}

// Create and setup output/checkpoint file(s), if necessary
start_timer(timer_root+"::get_new_file");
auto setup_output_file = [&](IOControl& control, IOFileSpecs& filespecs) {
Expand All @@ -393,10 +416,6 @@ void OutputManager::run(const util::TimeStamp& timestamp)
snapshot_start = m_case_t0;
snapshot_start += m_time_bnds[0];
}
if (filespecs.is_open and not filespecs.storage.snapshot_fits(snapshot_start)) {
release_file(filespecs.filename);
filespecs.close();
}

// Check if we need to open a new file
if (not filespecs.is_open) {
Expand Down Expand Up @@ -546,10 +565,7 @@ void OutputManager::run(const util::TimeStamp& timestamp)
scorpio::write_var(filespecs.filename, "time_bnds", m_time_bnds.data());
}

// Check if we need to flush the output file
if (filespecs.file_needs_flush()) {
flush_file (filespecs.filename);
}
close_or_flush_if_needed(filespecs,control);
};

start_timer(timer_root+"::update_snapshot_tally");
Expand Down Expand Up @@ -908,7 +924,18 @@ void OutputManager::set_file_header(const IOFileSpecs& file_specs)
set_str_att("Conventions","CF-1.8");
set_str_att("product",e2str(file_specs.ftype));
}
/*===============================================================================================*/
// Decide what to do with an output file once a snapshot has been handled:
//  - if the snapshot at control.next_write_ts does not fit in the file's storage,
//    release the file through scorpio and mark the specs as closed;
//  - otherwise, flush the file if the specs report that a flush is due.
void OutputManager::
close_or_flush_if_needed (      IOFileSpecs& file_specs,
                          const IOControl&   control) const
{
  const bool next_snap_fits = file_specs.storage.snapshot_fits(control.next_write_ts);

  if (next_snap_fits) {
    // File stays open; only flush when requested by the file specs
    if (file_specs.file_needs_flush()) {
      scorpio::flush_file (file_specs.filename);
    }
    return;
  }

  // The next snapshot will not go in this file, so release it right away
  scorpio::release_file(file_specs.filename);
  file_specs.close();
}

void OutputManager::
push_to_logger()
{
Expand Down
8 changes: 8 additions & 0 deletions components/eamxx/src/share/io/scream_output_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,10 @@ class OutputManager
void finalize();

long long res_dep_memory_footprint () const;

// For debug and testing purposes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mind elaborating what debug and testing we mean here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I want to be able to verify the correctness of the control/filespecs structs during unit tests. The comment was meant to say "this is not really needed at runtime".

// Read-only access to the IOControl stored in m_output_control
const IOControl& output_control () const
{
  return m_output_control;
}
// Read-only access to the IOFileSpecs stored in m_output_file_specs
const IOFileSpecs& output_file_specs () const
{
  return m_output_file_specs;
}
protected:

std::string compute_filename (const IOControl& control,
Expand All @@ -133,6 +137,10 @@ class OutputManager
void setup_file ( IOFileSpecs& filespecs,
const IOControl& control);

// If a file can be closed (next snap won't fit) or needs flushing, do so
void close_or_flush_if_needed ( IOFileSpecs& file_specs,
const IOControl& control) const;

// Manage logging of info to atm.log
void push_to_logger();

Expand Down
14 changes: 13 additions & 1 deletion components/eamxx/src/share/io/tests/io_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,6 @@ void write (const std::string& avg_type, const std::string& freq_units,

// Create output params
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix",std::string("io_basic"));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", avg_type);
Expand All @@ -154,6 +153,15 @@ void write (const std::string& avg_type, const std::string& freq_units,
ctrl_pl.set("Frequency",freq);
ctrl_pl.set("save_grid_data",false);

// While setting this is in practice irrelevant (we would close
// the file anyways at the end of the run), we can test that the OM closes
// the file AS SOON as it's full (before calling finalize)
int max_snaps = num_output_steps;
if (avg_type=="INSTANT") {
++max_snaps;
}
om_pl.set("Max Snapshots Per File", max_snaps);

// Create Output manager
OutputManager om;

Expand Down Expand Up @@ -181,6 +189,10 @@ void write (const std::string& avg_type, const std::string& freq_units,
om.run (t);
}

// Check that the file was closed, since we reached full capacity
const auto& file_specs = om.output_file_specs();
REQUIRE (not file_specs.is_open);

// Close file and cleanup
om.finalize();
}
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_diags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ void write (const int seed, const ekat::Comm& comm)

// Create output params
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix",std::string("io_diags"));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", std::string("INSTANT"));
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_filled.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ void write (const std::string& avg_type, const std::string& freq_units,

// Create output params
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix",std::string("io_filled"));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", avg_type);
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_monthly.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ void write (const int seed, const ekat::Comm& comm)

// Create output params
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix",std::string("io_monthly"));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", std::string("Instant"));
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_packed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ void write (const int freq, const int seed, const int ps, const ekat::Comm& comm

// Create output params
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix","io_packed_ps"+std::to_string(ps));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", std::string("INSTANT"));
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_remap_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,6 @@ ekat::ParameterList set_output_params(const std::string& name, const std::string
params.set<std::string>("Averaging Type","Instant");
params.set<int>("Max Snapshots Per File",1);
params.set<std::string>("Floating Point Precision","real");
params.set<bool>("MPI Ranks in Filename",true);
auto& oc = params.sublist("output_control");
oc.set<int>("Frequency",1);
oc.set<std::string>("frequency_units","nsteps");
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/io_se_grid.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ TEST_CASE("se_grid_io")
params.set<int>("Max Snapshots Per File",1);
params.set<strvec_t>("Field Names",{"field_1","field_2","field_3","field_packed"});
params.set<std::string>("Floating Point Precision","real");
params.set("MPI Ranks in Filename",true);
auto& ctl_pl = params.sublist("output_control");
ctl_pl.set("Frequency",1);
ctl_pl.set<std::string>("frequency_units","nsteps");
Expand Down
1 change: 0 additions & 1 deletion components/eamxx/src/share/io/tests/output_restart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ TEST_CASE("output_restart","io")
output_params.set<std::string>("Floating Point Precision","real");
output_params.set<std::vector<std::string>>("Field Names",{"field_1", "field_2", "field_3", "field_4","field_5"});
output_params.set<double>("fill_value",FillValue);
output_params.set<bool>("MPI Ranks in Filename","true");
output_params.set<int>("flush_frequency",1);
output_params.sublist("output_control").set<std::string>("frequency_units","nsteps");
output_params.sublist("output_control").set<int>("Frequency",10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,15 +385,13 @@ std::vector<std::string> create_test_data_files(
}
// Create the output parameters
ekat::ParameterList om_pl;
om_pl.set("MPI Ranks in Filename",true);
om_pl.set("filename_prefix",std::string("source_data_for_time_interpolation"));
om_pl.set("Field Names",fnames);
om_pl.set("Averaging Type", std::string("INSTANT"));
om_pl.set("Max Snapshots Per File",snaps_per_file);
auto& ctrl_pl = om_pl.sublist("output_control");
ctrl_pl.set("frequency_units",std::string("nsteps"));
ctrl_pl.set("Frequency",snap_freq);
ctrl_pl.set("MPI Ranks in Filename",true);
mahf708 marked this conversation as resolved.
Show resolved Hide resolved
ctrl_pl.set("save_grid_data",false);
// Create an output manager, note we use a subclass defined in this test so we can extract
// the list of files created by the output manager.
Expand Down