From f4426c3c0dd24192b83fe34ca5a7305a9a8a1c9a Mon Sep 17 00:00:00 2001 From: gondiaz Date: Tue, 16 Nov 2021 18:27:29 +0100 Subject: [PATCH] Document Event_Mixer class --- invisible_cities/evm/mixer.py | 151 +++++++++++++++++++++++++++------- 1 file changed, 122 insertions(+), 29 deletions(-) diff --git a/invisible_cities/evm/mixer.py b/invisible_cities/evm/mixer.py index d7c635e4a..a9753b9b0 100644 --- a/invisible_cities/evm/mixer.py +++ b/invisible_cities/evm/mixer.py @@ -13,21 +13,45 @@ get_file_number = lambda filename: int(filename.split("/")[-1].split(".h5")[0].split("_")[1]) class Event_Mixer(): + ''' + This class writes MC mixed isaura files. It reads the files separately for each + MC component pair (isotope, g4volume), selects the provided number of events + for each component, and writes them in mixed files (i.e. files that contain events for each component). + + Parameters: + ---------- + :inpath: (str) the input path of the MC files, which must explicitly depend on the simulated component + through "g4volume" and "isotope" variables. For example: "/somepath/{g4volume}/{isotope}" + + :outpath: (str) the output path where mixed files will be saved. + + :events_df: (pd.DataFrame) dataframe with 3 columns: G4Volume, Isotope, nevts + + :nevents_per_file: (int) the number of events to save in each mixed file. Each mixed file will + contain the fraction of events for each component based on :events_df: + ''' def __init__(self, inpath : str , outpath : str , events_df: pd.DataFrame , nevents_per_file: int): + ''' + Initializes the constant (fixed-value) class attributes, except for self.counter + which is modified by some of the methods. 
+ ''' self.inpath = os.path.expandvars(inpath) self.outpath = os.path.expandvars(outpath) self.nevents_per_file = nevents_per_file - self._init_counter(events_df) # defines self.counter + # defines self.counter + self._init_counter(events_df) + # number of output files self.nfiles_out = \ int(np.ceil(self.counter.nevts.sum() / self.nevents_per_file)) + # (group, node) pair of isaura file tables (ignoring Filters) self.tables = dict(( ( "events", ("Run", "events") ) , ( "runInfo", ("Run", "runInfo") ) , ( "eventMap", ("Run", "eventMap")) @@ -40,7 +64,20 @@ def __init__(self, inpath : str )) return + def _init_counter(self, df): + ''' + Defines the class counter, that keeps track of the files and events inside the file + that have already been written by the mixer for each component. + It contains 6 columns: (G4Volume, Isotope, nevts, file_idx, evt_idx, nevts_per_file) + + For each (G4Volume, Isotope) component, the columns represent: + :nevts: the number of events to write + :file_idx: file index in the ordered list of input files + :evt_idx : event index of the last event written + (referred to the last input file with written events) + :nevts_per_file: the number of events in each output file, as a fraction of the total events. + ''' self.counter = df.copy() @@ -50,13 +87,26 @@ def _init_counter(self, df): return + def run(self): + ''' + Runs the event mixer. + + For each output file, performs the following tasks: + - initialize empty dataframes for each isaura-table, to be filled with the corresponding data + for each (g4volume, isotope) component and finally saved + - loop on (g4volume, isotope) components to read and append the data to the dataframes + - write the dataframes to the output file + + Once all the output files are written, the dataframes used for the last file are disposed. 
+ ''' i = 0 # loop on output files while (i < self.nfiles_out): + # initialize dataframes to append data from all components self._init_dfs() # open output file to write @@ -68,50 +118,62 @@ def run(self): # loop on (g4volume, isotope) for cidx in self.counter.index: + # events already written if (self.counter.loc[cidx, "nevts"] == 0): continue + + # append component data to dataframes self._get_component_data(cidx) - # protection againts nevents_per_file < number of (g4volume, isotope) + # protection against nevents_per_file < number of components if (len(self.events) >= self.nevents_per_file): break + # write dataframes to output file self._write_data(h5out) h5out.flush() i += 1 + + # dispose the dataframes written in the last output file self._dispose_dfs() return + def _init_dfs(self): - '''self.name = pd.DataFrame()''' + ''' + Defines empty dataframes for each table: self.table_name = pd.DataFrame() + ''' for key, _ in self.tables.items(): setattr(self, key, pd.DataFrame()) return - def _dispose_dfs(self): - "del self.name & del self.name_" - for key, table in self.tables.items(): - delattr(self, key) - delattr(self, key + "_") def _get_component_data(self, cidx): - - g4volume = self.counter.loc[cidx, "G4Volume"] - isotope = self.counter.loc[cidx, "Isotope"] - file_idx = self.counter.loc[cidx, "file_idx"] + ''' + For the component (g4volume, isotope) with index "cidx" in self.counter dataframe, + selects the data for the desired number of events in file and appends it to the + self.table_name dataframes. 
+ ''' + # read component info for the given cidx + g4volume = self.counter.loc[cidx, "G4Volume"] + isotope = self.counter.loc[cidx, "Isotope"] + file_idx = self.counter.loc[cidx, "file_idx"] nevts_per_file = self.counter.loc[cidx, "nevts_per_file"] + # list of filenames, sorted by file-number filenames = sorted( glob.glob(self.inpath.format(g4volume = g4volume, isotope = isotope)) , key = get_file_number) for filename in filenames[file_idx:]: - # protection from nevts_per_file ceiling + # protection against nevts_per_file ceiling nevts_per_file_ = min(self.nevents_per_file-len(self.events), nevts_per_file) + # read data for this filename as dataframes, named self.table_name_ + # (notice the underscore to differentiate from self.table_name global dataframe) self._read_data(filename) evt_idx = self.counter.loc[cidx, "evt_idx"] nevts = self.counter.loc[cidx, "nevts"] - n_idle = (len(self.events_) - evt_idx) + n_idle = (len(self.events_) - evt_idx) # number of unselected events # enough events in file if (n_idle > nevts_per_file_): @@ -122,6 +184,7 @@ def _get_component_data(self, cidx): self._write_component_info(g4volume, isotope) self._concat_data() + # update counter self.counter.loc[cidx, "evt_idx"] = last_idx self.counter.loc[cidx, "nevts"] -= len(self.events_) break @@ -132,6 +195,7 @@ def _get_component_data(self, cidx): self._write_component_info(g4volume, isotope) self._concat_data() + # update counter self.counter.loc[cidx, "evt_idx"] = 0 self.counter.loc[cidx, "file_idx"] += 1 self.counter.loc[cidx, "nevts"] -= len(self.events_) @@ -139,8 +203,12 @@ def _get_component_data(self, cidx): if (self.counter.loc[cidx, "nevts"] == 0): break return + def _read_data(self, filename): - '''self.name_ = load_dst(filename, group, node)''' + ''' + Reads data for each table in input file: self.name_ = load_dst(filename, group, node) + Notice that the index is set to the event number, simplifying data selection at self._select_data + ''' for key, table in self.tables.items(): 
@@ -157,19 +225,12 @@ def _read_data(self, filename): setattr(self, key + "_", load_dst(filename, *table).set_index("event_id")) return - def _write_data(self, h5out): - '''df_writer(h5out, self.name, group, node, 'ZLIB4')''' - - for key, table in self.tables.items(): - if (key in ("events", "runInfo")): - exec(f"df_writer(h5out, self.{key}, *table, 'ZLIB4')") - else: - exec(f"df_writer(h5out, self.{key}.reset_index(), *table, 'ZLIB4')") - return def _select_data(self): - '''self.name_ = self.name_.loc[self.name_.index.intersection(self.events_.evt_number)]''' - + ''' + Selects the file data for the events in self.events_: + self.table_name_ = self.table_name_.loc[self.table_name_.index.intersection(self.events_.evt_number)] + ''' for key, table in self.tables.items(): if (key == "events"): continue @@ -184,21 +245,53 @@ def _select_data(self): exec(f"self.{key}_ = self.{key}_.loc[self.{key}_.index.intersection(self.eventMap_.nexus_evt)]") return - def _write_component_info(self, g4volume, isotope): - '''self.name_.loc[:, ('G4Volume', 'Isotope')] = (g4volume, isotope)''' + def _write_component_info(self, g4volume, isotope): + ''' + Add (G4Volume, Isotope) columns in dataframes: + self.name_.loc[:, ('G4Volume', 'Isotope')] = (g4volume, isotope) + ''' for key, table in self.tables.items(): if (key == "runInfo"): continue exec(f"self.{key}_.loc[:, ('G4Volume', 'Isotope')] = ('{g4volume}', '{isotope}')") return + def _concat_data(self): - '''self.name = pd.concat([self.name_, self.name])''' + ''' + Append self.table_name_ to self.table_name: + self.table_name = pd.concat([self.table_name_, self.table_name]) + ''' for key, table in self.tables.items(): setattr(self, key, pd.concat([getattr(self, key + "_"), getattr(self, key)])) return + def _write_data(self, h5out): + ''' + Write self.table_name dataframes to output file: + df_writer(h5out, self.table_name, group, node, 'ZLIB4') + ''' + for key, table in self.tables.items(): + if (key in ("events", "runInfo")): + 
exec(f"df_writer(h5out, self.{key}, *table, 'ZLIB4')") + else: + exec(f"df_writer(h5out, self.{key}.reset_index(), *table, 'ZLIB4')") + return + + + def _dispose_dfs(self): + ''' + Deletes dataframe attributes: + del self.table_name + del self.table_name_ + ''' + for key, table in self.tables.items(): + delattr(self, key) + delattr(self, key + "_") + return + + + def get_mixer_nevents(exposure : float, detector_db : str = "next100", isotopes : list = "all"): ''' This function computes the number of events of each component (isotope, volume) pairs