From 084f21fcaf091c5665ad08b39962086ed7e0e1ec Mon Sep 17 00:00:00 2001
From: Ahmad Tourei
Date: Mon, 30 Dec 2024 23:03:15 -0700
Subject: [PATCH] Low-freq proc (#470)

---------

Co-authored-by: derrick chambers
---
 dascore/core/patch.py           |   1 +
 dascore/core/spool.py           |   2 +
 dascore/examples.py             | 103 +++++++++++++++++++-
 dascore/utils/chunk.py          |   2 +-
 docs/recipes/low_freq_proc.qmd  | 166 ++++++++++++++++++++++++++++++++
 docs/recipes/real_time_proc.qmd |  11 ++-
 scripts/_templates/_quarto.yml  |   1 +
 tests/conftest.py               |   2 +-
 tests/test_examples.py          |  90 +++++++++++++++++
 9 files changed, 370 insertions(+), 8 deletions(-)
 create mode 100644 docs/recipes/low_freq_proc.qmd

diff --git a/dascore/core/patch.py b/dascore/core/patch.py
index dd20e9cf..8573868a 100644
--- a/dascore/core/patch.py
+++ b/dascore/core/patch.py
@@ -260,6 +260,7 @@ def channel_count(self) -> int:
     coords_from_df = dascore.proc.coords_from_df
     make_broadcastable_to = dascore.proc.make_broadcastable_to
     apply_ufunc = dascore.proc.apply_ufunc
+    get_patch_names = get_patch_names
 
     def get_patch_name(self, *args, **kwargs) -> str:
         """
diff --git a/dascore/core/spool.py b/dascore/core/spool.py
index 13dc6177..796938a9 100644
--- a/dascore/core/spool.py
+++ b/dascore/core/spool.py
@@ -605,6 +605,8 @@ def get_contents(self) -> pd.DataFrame:
         """{doc}."""
         return self._df[filter_df(self._df, **self._select_kwargs)]
 
+    get_patch_names = get_patch_names
+
 
 class MemorySpool(DataFrameSpool):
     """A Spool for storing patches in memory."""
diff --git a/dascore/examples.py b/dascore/examples.py
index aa2a3f0f..61690aaf 100644
--- a/dascore/examples.py
+++ b/dascore/examples.py
@@ -17,7 +17,7 @@
 from dascore.exceptions import UnknownExampleError
 from dascore.utils.docs import compose_docstring
 from dascore.utils.downloader import fetch
-from dascore.utils.misc import register_func
+from dascore.utils.misc import iterate, register_func
 from dascore.utils.patch import get_patch_names
 from dascore.utils.time import to_timedelta64
 
@@ -415,6 +415,107 @@ def _ricker(time, delay):
     return dc.Patch(data=data, coords=coords, dims=dims)
 
 
+@register_func(EXAMPLE_PATCHES, key="delta_patch")
+def delta_patch(
+    dim="time",
+    shape=(10, 200),
+    time_min="2020-01-01",
+    time_step=1 / 250,
+    distance_min=0,
+    distance_step=1,
+    patch=None,
+):
+    """
+    Create a delta function patch (zeros everywhere except for
+    a unit value at the center) along the specified dimension.
+    The returned delta patch has a single sample along each of the
+    other dimensions.
+
+    Parameters
+    ----------
+    dim : str
+        The dimension at the center of which to place the unit value.
+        Typically ``"time"`` or ``"distance"``.
+    shape : tuple of int
+        The shape of the data as (distance, time). Defaults to (10, 200).
+        This is used only if no existing ``patch`` is provided.
+    time_min : str or datetime64
+        The start time of the patch.
+    time_step : float
+        The time step in seconds between samples.
+    distance_min : float
+        The minimum distance coordinate.
+    distance_step : float
+        The distance step in meters between samples.
+    patch : dascore.Patch
+        If provided, the delta patch is created based on this existing patch.
+        Default is None.
+    """
+    if patch is None:
+        if dim not in ["time", "distance"]:
+            raise ValueError(
+                "`dim` must be 'time' or 'distance' when no patch is "
+                "provided; the delta patch will then be "
+                "a 2D patch with 'time' and 'distance' dimensions."
+            )
+
+        dims = ("distance", "time")
+        dist_len, time_len = shape
+
+        # Create coordinates
+        time_step_td = to_timedelta64(time_step)
+        t0 = np.datetime64(time_min)
+        time_coord = dascore.core.get_coord(
+            data=t0 + np.arange(time_len) * time_step_td, step=time_step_td, units="s"
+        )
+        dist_coord = dascore.core.get_coord(
+            data=distance_min + np.arange(dist_len) * distance_step,
+            step=distance_step,
+            units="m",
+        )
+
+        coords = {"distance": dist_coord, "time": time_coord}
+        attrs = dict(
+            time_min=t0,
+            time_step=time_step_td,
+            distance_min=distance_min,
+            distance_step=distance_step,
+            category="DAS",
+            network="",
+            station="",
+            tag="delta",
+            time_units="s",
+            distance_units="m",
+        )
+
+        # Depending on the selected dimension, place a line of ones at the midpoint
+        used_dims = tuple(iterate(dim))
+        unused_dims = set(dims) - set(used_dims)
+
+        # Get data with ones centered on the selected dimensions.
+        index = tuple(
+            shape[dims.index(dimension)] // 2 if dimension in used_dims else 0
+            for dimension in dims
+        )
+        data = np.zeros((dist_len, time_len))
+        data[index] = 1
+        delta_patch = dc.Patch(data=data, coords=coords, dims=dims, attrs=attrs)
+        return delta_patch.select(**{x: 0 for x in unused_dims}, samples=True)
+    else:
+        used_dims = tuple(iterate(dim))
+        unused_dims = set(patch.dims) - set(used_dims)
+        patch = patch.select(**{x: 0 for x in unused_dims}, samples=True)
+
+        # Get data with ones centered on the selected dimensions.
+        shape = patch.shape
+        index = tuple(
+            shape[patch.dims.index(dimension)] // 2 if dimension in used_dims else 0
+            for dimension in patch.dims
+        )
+        data = np.zeros_like(patch.data)
+        data[index] = 1
+        return patch.update(data=data)
+
+
 @register_func(EXAMPLE_PATCHES, key="dispersion_event")
 def dispersion_event():
     """
diff --git a/dascore/utils/chunk.py b/dascore/utils/chunk.py
index ca7bb98e..6731e55c 100644
--- a/dascore/utils/chunk.py
+++ b/dascore/utils/chunk.py
@@ -253,7 +253,7 @@ def _create_df(self, df, name, start_stop, gnum):
         if len(vals) > 1:
             msg = (
                 f"Cannot merge on dim {self._name} because all values for "
-                f"{col} are not equal. Consider using the `attr_conflict` "
+                f"{col} are not equal. Consider using the `conflict` "
                 f"argument to loosen this restriction."
             )
             raise CoordMergeError(msg)
diff --git a/docs/recipes/low_freq_proc.qmd b/docs/recipes/low_freq_proc.qmd
new file mode 100644
index 00000000..a6e1f646
--- /dev/null
+++ b/docs/recipes/low_freq_proc.qmd
@@ -0,0 +1,166 @@
+---
+title: "Low-Frequency Processing"
+---
+
+This recipe demonstrates how DASCore can be used to apply low-frequency (LF) processing to a spool of DAS data. LF processing efficiently downsamples the entire spool.
+
+
+## Get a spool and define parameters
+```{python}
+import tempfile
+from pathlib import Path
+
+import numpy as np
+
+import dascore as dc
+
+# Define the path for saving results (swap out with your path)
+output_data_dir = Path(tempfile.mkdtemp())
+
+# Get a spool to work on
+sp = dc.get_example_spool().update()
+
+# Sort the spool
+sp = sp.sort("time")
+# You may also want to sub-select the spool for the desired distance or time samples before proceeding.
+
+# Get the first patch
+pa = sp[0]
+
+# Define the target sampling interval (in sec.)
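+# (With dt = 1 s, the output sampling rate is 1 Hz.)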
+dt = 1
+# The low-pass cutoff is the Nyquist frequency of the target sampling
+# interval: 1 / (2 * dt), i.e., 0.5 Hz for dt = 1 s
+cutoff_freq = 1 / (2 * dt)
+
+# Safety factor for the low-pass filter to avoid aliasing
+filter_safety_factor = 0.9
+
+# Enter the memory size to dedicate to processing (in MB)
+memory_limit_MB = 75
+
+# Define a tolerance for determining edge effects (used in the next step)
+tolerance = 1e-3
+```
+
+## Calculate chunk size and determine edge effects
+
+To chunk the spool, we first need to determine the chunk size based on the machine's available memory so we can load and process patches without memory issues. A larger chunk size (longer patches) increases computational efficiency.
+
+Notes:
+
+1. The `processing_factor` is required because certain processing routines make copies of the data during the processing steps. It should be determined by performing memory profiling on an example dataset for the specific processing routine. For instance, the combination of low-pass filtering and interpolation, discussed in the next section, requires a processing factor of approximately 5.
+2. The `memory_safety_factor` is optional and helps prevent getting too close to the memory limit.
+
+```{python}
+# Get the patch's number of bytes per second (based on the patch's data type)
+pa_bytes_per_second = pa.data.nbytes / pa.seconds
+# Define processing factor and safety factor
+processing_factor = 5
+memory_safety_factor = 1.2
+
+# Calculate the memory required to process each second of data
+memory_size_per_second = pa_bytes_per_second * processing_factor * memory_safety_factor
+memory_size_per_second_MB = memory_size_per_second / 1e6
+
+# Calculate the chunk size that can be loaded (in seconds)
+chunk_size = memory_limit_MB / memory_size_per_second_MB
+
+# Ensure `chunk_size` does not exceed the spool length
+merged_sp = sp.chunk(time=None)[0]
+if chunk_size > merged_sp.seconds:
+    print(
+        f"Warning: Specified `chunk_size` ({chunk_size:.2f} seconds) exceeds the spool length "
+        f"({merged_sp.seconds:.2f} seconds). Adjusting `chunk_size` to match spool length."
+    )
+    chunk_size = merged_sp.seconds
+```
+
+Next, we need to determine the extent of the artifacts introduced by low-pass filtering at the edges of each patch. To do so, we apply LF processing to a delta function patch, which contains a unit value at the center and zeros elsewhere, and then identify the distorted edges based on a defined threshold.
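+
+As a quick illustration of the idea (a minimal sketch using the `delta_patch` example patch with its default size), the delta patch contains exactly one unit sample:
+
+```{python}
+# Build a small default-sized delta patch; it should be zero everywhere
+# except for a single unit value at the center of the time dimension.
+delta_demo = dc.get_example_patch("delta_patch", dim="time")
+print(delta_demo.data.sum(), delta_demo.data.max())  # both should be 1.0
+```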
+
+```{python}
+# Retrieve a patch of the appropriate size for LF processing that fits into memory
+pa_chunked_sp = sp.chunk(time=chunk_size, keep_partial=True)[0]
+# Create a delta patch based on the new patch size
+delta_pa = dc.get_example_patch("delta_patch", dim="time", patch=pa_chunked_sp)
+
+# Apply the low-pass filter on the delta patch
+delta_pa_low_passed = delta_pa.pass_filter(time=(None, cutoff_freq * filter_safety_factor))
+# Resample the low-pass filtered patch
+new_time_ax = np.arange(delta_pa.attrs["time_min"], delta_pa.attrs["time_max"], np.timedelta64(dt, "s"))
+delta_pa_lfp = delta_pa_low_passed.interpolate(time=new_time_ax)
+
+# Identify the indices where the absolute value of the data exceeds the threshold
+data_abs = np.abs(delta_pa_lfp.data)
+threshold = np.max(data_abs) * tolerance
+ind = data_abs > threshold
+ind_1 = np.where(ind)[1][0]
+ind_2 = np.where(ind)[1][-1]
+
+# Get the total duration of the processed delta function patch in seconds
+time_coord = delta_pa_lfp.get_coord('time')
+delta_pa_lfp_seconds = dc.to_float((time_coord.max() - time_coord.min()))
+# Convert the new time axis to absolute seconds, relative to the first timestamp
+time_ax_abs = (new_time_ax - new_time_ax[0]) / np.timedelta64(1, "s")
+# Center the time axis
+time_ax_centered = time_ax_abs - delta_pa_lfp_seconds // 2
+
+# Calculate the maximum edge width on either side (in seconds) where artifacts are present
+edge = max(np.abs(time_ax_centered[ind_1]), np.abs(time_ax_centered[ind_2]))
+
+# Validate the `edge` value to ensure a sufficient processing patch size
+if np.ceil(edge) >= chunk_size / 2:
+    raise ValueError(
+        f"The calculated `edge` value ({edge:.2f} seconds) is at least half of the processing patch size "
+        f"({chunk_size:.2f} seconds). To resolve this and increase efficiency, consider one of the following:\n"
+        "- Increase `memory_limit_MB` to allow for a larger processing window.\n"
+        "- Increase `tolerance` to reduce the sensitivity of artifact detection."
+    )
+```
+
+
+## Perform low-frequency processing and save results on disk
+```{python}
+
+# First, we chunk the spool based on the `chunk_size` and `edge` calculated above.
+sp_chunked_overlap = sp.chunk(time=chunk_size, overlap=2*edge, keep_partial=True)
+
+# Process each patch in the spool and save the resulting patch
+for patch in sp_chunked_overlap:
+    # Apply any pre-processing you may need (such as velocity to strain rate transformation, detrending, etc.)
+    # ...
+
+    # Apply the low-pass filter on the patch
+    pa_low_passed = patch.pass_filter(time=(None, cutoff_freq * filter_safety_factor))
+    # Resample the low-pass filtered patch
+    new_time_ax = np.arange(pa_low_passed.attrs["time_min"], pa_low_passed.attrs["time_max"], np.timedelta64(dt, "s"))
+    pa_lfp = (
+        pa_low_passed.interpolate(time=new_time_ax)
+        .update_coords(time_step=dt)  # Update the sampling interval
+        # Remove edges from the data at both ends
+        .select(time=(edge, -edge), relative=True)
+    )
+
+    # Save the processed patch
+    pa_lf_name = pa_lfp.get_patch_name()
+    path = output_data_dir / f"{pa_lf_name}.h5"
+    pa_lfp.io.write(path, "dasdae")
+```
+
+## Visualize the results
+```{python}
+# Create a spool of the LF processed results
+sp_lf = dc.spool(output_data_dir)
+
+# Merge the spool and create a single patch. You may need to sub-select before
+# merging to prevent exceeding the memory limit.
+sp_lf_merged = sp_lf.chunk(time=None, conflict="keep_first")
+pa_lf_merged = sp_lf_merged[0]
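+
+# (Illustrative sanity check: after processing, the merged patch should have
+# a time step close to the target `dt` of 1 s.)
+print(pa_lf_merged.get_coord("time").step)
+
+# Visualize the results. Try different scale values for better visualization.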
+pa_lf_merged.viz.waterfall(scale=0.5) +``` + +#### For any questions, please contact [Ahmad Tourei](https://github.com/ahmadtourei). diff --git a/docs/recipes/real_time_proc.qmd b/docs/recipes/real_time_proc.qmd index 27b65463..a3748592 100644 --- a/docs/recipes/real_time_proc.qmd +++ b/docs/recipes/real_time_proc.qmd @@ -5,19 +5,17 @@ execute: --- -This recipe serves as an example to showcase the real-time processing capability of DASCore. Here, we demonstrate how to use DASCore to perform rolling mean processing on a spool in near real time for edge computing purposes. +This recipe serves as an example to showcase the real-time processing capability of DASCore. Here, we demonstrate how to use DASCore to perform rolling mean processing on a spool in "near" real time for edge computing purposes. ## Load libraries and get a spool ```{python} -# Import libraries +import numpy as np import os import time import dascore as dc -import numpy as np - from dascore.units import s @@ -25,7 +23,10 @@ from dascore.units import s output_data_dir = '/path/to/desired/output/directory' # Get a spool to work on -sp = dc.get_example_spool().update().sort("time") +sp = dc.get_example_spool().update() + +# Sort the spool +sp = sp.sort("time") ``` ## Set real-time processing parameters (if needed) diff --git a/scripts/_templates/_quarto.yml b/scripts/_templates/_quarto.yml index c63f3ea8..c2958714 100644 --- a/scripts/_templates/_quarto.yml +++ b/scripts/_templates/_quarto.yml @@ -139,6 +139,7 @@ website: - recipes/real_time_proc.qmd - recipes/smoothing.qmd - recipes/parallelization.qmd + - recipes/low_freq_proc.qmd - section: "IO" contents: diff --git a/tests/conftest.py b/tests/conftest.py index b8a650f7..fb7aad7f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -389,7 +389,7 @@ def one_file_dir(tmp_path_factory, random_patch): def random_directory_spool(tmp_path_factory): """A directory with a few patch files.""" path = Path(tmp_path_factory.mktemp("one_file_file_spool")) - return dc.examples.random_directory_spool(path) + return dc.examples.random_directory_spool(path=path) @pytest.fixture(scope="class") diff --git a/tests/test_examples.py b/tests/test_examples.py index d796bd65..a964007a 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -68,3 +68,93 @@ def test_moveout(self): distances = patch.get_coord("distance").values expected_moveout = distances / velocity assert np.allclose(moveout, expected_moveout) + + +class TestDeltaPatch: + """Tests for the delta_patch example.""" + + @pytest.mark.parametrize("invalid_dim", ["inv_dim", "", None, 123, 1.1]) + def test_delta_patch_invalid_dim(self, invalid_dim): + """ + Test that passing an invalid dimension value raises a ValueError. + """ + msg = "with 'time' and 'distance'" + with pytest.raises(ValueError, match=msg): + dc.get_example_patch("delta_patch", dim=invalid_dim) + + @pytest.mark.parametrize("dim", ["time", "distance"]) + def test_delta_patch_structure(self, dim): + """Test that the delta_patch returns a Patch with correct structure.""" + patch = dc.get_example_patch("delta_patch", dim=dim) + assert isinstance(patch, dc.Patch), "delta_patch should return a Patch instance" + + dims = patch.dims + assert ( + "time" in dims and "distance" in dims + ), "Patch must have 'time' and 'distance' dimensions" + + @pytest.mark.parametrize("dim", ["time", "distance"]) + def test_delta_patch_delta_location(self, dim): + """ + Ensures the delta is at the center of the chosen dimension and zeros elsewhere. 
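+        The patch is squeezed to 1D, so exactly one unit sample should sit
+        at the midpoint of the remaining dimension.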
+        """
+        # The default shape from the function signature: shape=(10, 200)
+        # If dim="time", we end up with a single (distance=0) trace => shape (200,)
+        # If dim="distance", we end up with a single (time=0) trace => shape (10,)
+        patch = dc.get_example_patch("delta_patch", dim=dim)
+        data = patch.squeeze().data
+
+        # Find the expected midpoint and verify a single delta at the center
+        mid_idx = len(data) // 2
+
+        assert data[mid_idx] == 1.0, "Expected a unit delta at the center"
+        # Check that all other samples are zero:
+        # replace the center value with zero and ensure only zeros remain
+        test_data = np.copy(data)
+        test_data[mid_idx] = 0
+        assert np.allclose(
+            test_data, 0
+        ), "All other samples should be zero except the center"
+
+    @pytest.mark.parametrize("dim", ["time", "distance"])
+    def test_delta_patch_with_patch(self, dim):
+        """Test passing an existing patch to delta_patch and ensure delta is applied."""
+        # Create a base patch
+        base_patch = dc.get_example_patch("random_das", shape=(5, 50))
+        # Apply the delta_patch function with the existing patch
+        delta_applied_patch = dc.get_example_patch(
+            "delta_patch", dim=dim, patch=base_patch
+        )
+
+        assert isinstance(delta_applied_patch, dc.Patch), "Should return a Patch"
+        data = delta_applied_patch.squeeze().data
+
+        # Check that only the center value is one and the others are zero
+        mid_idx = len(data) // 2
+        assert data[mid_idx] == 1.0, "Center sample should be 1.0"
+        test_data = np.copy(data)
+        test_data[mid_idx] = 0
+        assert np.allclose(
+            test_data, 0
+        ), "All other samples should be zero except the center"
+
+    @pytest.mark.parametrize("dim", ["lag_time", "distance"])
+    def test_delta_patch_with_3d_patch(self, dim):
+        """Test passing a 3D patch."""
+        # Create a base patch
+        base_patch = dc.get_example_patch("sin_wav")
+        base_patch_3d = base_patch.correlate(distance=[2], samples=True)
+        # Apply the delta_patch function with the existing 3D patch
+        delta_applied_patch = dc.get_example_patch(
+            "delta_patch", dim=dim, patch=base_patch_3d
+        )
+        assert isinstance(delta_applied_patch, dc.Patch), "Should return a Patch"
+        data = delta_applied_patch.squeeze().data
+        # Check that only the center value is one and the others are zero
+        mid_idx = len(data) // 2
+        assert data[mid_idx] == 1.0, "Center sample should be 1.0"
+        test_data = np.copy(data)
+        test_data[mid_idx] = 0
+        assert np.allclose(
+            test_data, 0
+        ), "All other samples should be zero except the center"