Skip to content

Commit

Permalink
Per technical note in Issue #278
Browse files Browse the repository at this point in the history
- Created make_fc_metadata_from_processing_config
  - there are still some things to iron out but it's working on the first synthetic test
- Tuned some logic in tf_kernel
  • Loading branch information
kkappler committed Sep 2, 2023
1 parent bc7a497 commit 969f6ba
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 33 deletions.
76 changes: 65 additions & 11 deletions aurora/pipelines/process_mth5.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,51 @@ def export_tf(
def enrich_row(row):
    """
    Placeholder for per-row enrichment logic.

    Currently a no-op.

    Parameters
    ----------
    row:
        Presumably a row of the TFK dataset dataframe -- TODO confirm
        against callers once this is implemented.
    """
    pass

def make_fc_metadata_from_processing_config(fc_decimation_level,
                                            dec_level_config,
                                            ignore_harmonic_indices=True):
    """
    Assigns values to an FCDecimationLevel object that come from an aurora
    processing DecimationLevel object.

    In future this should be a class method of FCDecimationLevel or
    DecimationLevel.

    Ignore these props for now:
    "channels_estimated" = []
    "hdf5_reference": "<HDF5 object reference>",
    "id": null,
    "mth5_type": "FCDecimation",
    "time_period.end": "1980-01-01T00:00:00+00:00",
    "time_period.start": "1980-01-01T00:00:00+00:00",

    Parameters
    ----------
    fc_decimation_level:
        Object whose ``metadata`` attributes get populated in place
        (presumably an mth5 FCDecimationGroup -- TODO confirm).
    dec_level_config:
        Aurora processing DecimationLevel supplying the values.
    ignore_harmonic_indices: bool
        If True (default), do not copy harmonic indices from the
        processing config onto the metadata.

    Returns
    -------
    fc_decimation_level:
        The same object passed in, with metadata updated.
    """
    fc_decimation_level.metadata.anti_alias_filter = dec_level_config.anti_alias_filter
    fc_decimation_level.metadata.decimation_factor = dec_level_config.decimation.factor
    fc_decimation_level.metadata.decimation_level = dec_level_config.decimation.level
    if not ignore_harmonic_indices:
        fc_decimation_level.metadata.harmonic_indices = dec_level_config.harmonic_indices()
    fc_decimation_level.metadata.id = f"{dec_level_config.decimation.level}"
    fc_decimation_level.metadata.method = dec_level_config.method
    fc_decimation_level.metadata.min_num_stft_windows = dec_level_config.min_num_stft_windows
    fc_decimation_level.metadata.pre_fft_detrend_type = dec_level_config.pre_fft_detrend_type
    fc_decimation_level.metadata.prewhitening_type = dec_level_config.prewhitening_type
    fc_decimation_level.metadata.recoloring = dec_level_config.recoloring
    fc_decimation_level.metadata.sample_rate_decimation = dec_level_config.sample_rate_decimation
    fc_decimation_level.metadata.window = dec_level_config.window
    # DUPLICATED PROPERTIES: also set directly on the group object, not only
    # on its metadata -- mirrors the original behavior until the metadata /
    # group attribute duplication is resolved upstream.
    fc_decimation_level.decimation_factor = dec_level_config.decimation.factor
    fc_decimation_level.decimation_level = dec_level_config.decimation.level
    return fc_decimation_level

def process_mth5(
config,
tfk_dataset=None,
Expand Down Expand Up @@ -241,17 +286,17 @@ def process_mth5(
tf_dict = {}

for i_dec_level, dec_level_config in enumerate(tfk.valid_decimations()):
# Apply STFT to all runs
local_stfts = []
remote_stfts = []

# NOT sure this is needed if FCs already exist ... although there is no harm in doing it
tfk.update_dataset_df(i_dec_level)

# TFK 1: get clock-zero from data if needed
if dec_level_config.window.clock_zero_type == "data start":
dec_level_config.window.clock_zero = str(tfk.dataset_df.start.min())

# Apply STFT to all runs
local_stfts = []
remote_stfts = []

# Check first if TS processing or accessing FC Levels
for i, row in tfk.dataset_df.iterrows():

Expand All @@ -263,7 +308,7 @@ def process_mth5(
stft_obj = make_stft_objects(
tfk.config, i_dec_level, run_obj, run_xrds, units, row.station_id
)
# ToDo: add proper FC packing into here
# FC packing goes here
if dec_level_config.save_fcs:
if dec_level_config.save_fcs_type == "csv":
print("WARNING: Unless you are debugging or running the tests, saving FCs to csv is unexpected")
Expand All @@ -282,12 +327,21 @@ def process_mth5(
if not row.mth5_obj.h5_is_write():
raise NotImplementedError("See Note #1 at top this method")
fc_group = station_obj.fourier_coefficients_group.add_fc_group(run_obj.metadata.id)
fc_decimation_level = fc_group.add_decimation_level(f"{i_dec_level}")
# print("fc_decimation_level MUST GET ITS METADATA FROM CONFIG THAT WAS USED TO MAKE STFT")
# dec_level_config.id = "0"
# fc_decimation_level = fc_group.add_decimation_level(f"{i_dec_level}",
# decimation_level_metadata=dec_level_config)
# THIS METHOD WILL BE SUPERSEDED BY ONE IN MTH5
# See Technical Note posted on Aurora Issue #278, Sept 1, 2023

# OLD
# fc_decimation_level = fc_group.add_decimation_level(f"{i_dec_level}")

# NEW
# Make a dummy FC decimation level for a skeleton
#from mt_metadata.transfer_functions.processing.fourier_coefficients import
#from mth5.groups.fourier_coefficients import FCDecimationGroup
dummy_fc_decimation_level = fc_group.add_decimation_level("-1")
fc_group.remove_decimation_level("-1")
updated_dummy_fcdl = make_fc_metadata_from_processing_config(dummy_fc_decimation_level, dec_level_config)
fcdl_meta = updated_dummy_fcdl.metadata
fc_decimation_level = fc_group.add_decimation_level(f"{i_dec_level}",
decimation_level_metadata=fcdl_meta)

fc_decimation_level.from_xarray(stft_obj)
fc_decimation_level.update_metadata()
Expand Down
48 changes: 26 additions & 22 deletions aurora/pipelines/transfer_function_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,17 @@ def check_if_fcdecimation_group_has_fcs(fcdec_group, decimation_level, remote):
return False

# anti_alias_filter (AAF)
cond1 = decimation_level.anti_alias_filter == "default"
cond2 = fcdec_group.metadata.anti_alias_filter is None
if cond1 & cond2:
pass
else:
msg = "Antialias Filters Not Compatible -- need to add handling for "
msg =f"{msg} fcdec {fcdec_group.metadata.anti_alias_filter} and processing config:{decimation_level.anti_alias_filter}"
raise NotImplementedError(msg)
try:
assert fcdec_group.metadata.anti_alias_filter == decimation_level.anti_alias_filter
except AssertionError:
cond1 = decimation_level.anti_alias_filter == "default"
cond2 = fcdec_group.metadata.anti_alias_filter is None
if cond1 & cond2:
pass
else:
msg = "Antialias Filters Not Compatible -- need to add handling for "
msg =f"{msg} fcdec {fcdec_group.metadata.anti_alias_filter} and processing config:{decimation_level.anti_alias_filter}"
raise NotImplementedError(msg)

# Sample rate
try:
Expand Down Expand Up @@ -122,23 +125,20 @@ def check_if_fcdecimation_group_has_fcs(fcdec_group, decimation_level, remote):
# rather than work with frequencies explicitly.
# note that if harmonic_indices is -1, it means keep all so we can skip this check.
if -1 in fcdec_group.metadata.harmonic_indices:
return True
pass
else:
harmonic_indices_requested = []
for band in decimation_level.bands:
print(band)
indices = list(np.arange(band.index_min, band.index_max+1))
harmonic_indices_requested += indices
harmonic_indices_requested = decimation_level.harmonic_indices()
print(f"determined that {harmonic_indices_requested} are the requested indices")
fcdec_group_set = set(fcdec_group.metadata.harmonic_indices)
processing_set = set(harmonic_indices_requested)
if processing_set.issubset(fcdec_group_set):
return True
pass
else:
msg = f"Processing FC indices {processing_set} is not contained in FC indices {fcdec_group_set}"
print(msg)
return False

#failed no checks if you get here
return True


def check_if_fcgroup_supports_processing_config(fc_group, processing_config, remote):
Expand All @@ -150,7 +150,9 @@ def check_if_fcgroup_supports_processing_config(fc_group, processing_config, rem
Currently using a nested for loop, but this can be made a bit quicker by checking if sample_rates
are in agreement (if they aren't we don't need to check any other parameters)
Also, Once an fc_decimation_id is found to match a dec_level, we don't need to keep checking that fc_decimation_id
Note #1: The idea is to
Parameters
----------
fc_group
Expand All @@ -160,8 +162,10 @@ def check_if_fcgroup_supports_processing_config(fc_group, processing_config, rem
-------
"""
fc_decimation_ids_to_check = fc_group.groups_list
levels_present = np.full(processing_config.num_decimation_levels, False)
for i, dec_level in enumerate(processing_config.decimations):
# See Note #1
#print(f"{i}")
#print(f"{dec_level}")

Expand All @@ -171,11 +175,14 @@ def check_if_fcgroup_supports_processing_config(fc_group, processing_config, rem
return False

# iterate over fc_group decimations
for fc_decimation_id in fc_group.groups_list:
# This can be done smarter ... once an fc_decimation_id is found to
for fc_decimation_id in fc_decimation_ids_to_check:
fc_dec_group = fc_group.get_decimation_level(fc_decimation_id)
levels_present[i] = check_if_fcdecimation_group_has_fcs(fc_dec_group, dec_level, remote)

if levels_present[i]:
continue #break inner loop
fc_decimation_ids_to_check.remove(fc_decimation_id) #no need to look at this one again
break #break inner loop


return levels_present.all()
Expand Down Expand Up @@ -256,16 +263,13 @@ def initialize_mth5s(self, mode="r"):
self._mth5_objs = mth5_objs
return

def update_dataset_df(self,i_dec_level):
def update_dataset_df(self,i_dec_level, fc_existence_info=None):
"""
This could and probably should be moved to TFK.update_dataset_df()
This function has two different modes. The first mode, initializes values in the
array, and could be placed into TFKDataset.initialize_time_series_data()
The second mode, decimates. The function is kept in pipelines because it calls
time series operations.
Notes:
1. When iterating over dataframe, (i)ndex must run from 0 to len(df), otherwise
get indexing errors. Maybe reset_index() before main loop? or push reindexing
Expand Down

0 comments on commit 969f6ba

Please sign in to comment.