Low-freq proc (#470)
---------

Co-authored-by: derrick chambers <[email protected]>
ahmadtourei and d-chambers authored Dec 31, 2024
1 parent d8e1ff5 commit 084f21f
Showing 9 changed files with 370 additions and 8 deletions.
1 change: 1 addition & 0 deletions dascore/core/patch.py
@@ -260,6 +260,7 @@ def channel_count(self) -> int:
coords_from_df = dascore.proc.coords_from_df
make_broadcastable_to = dascore.proc.make_broadcastable_to
apply_ufunc = dascore.proc.apply_ufunc
get_patch_names = get_patch_names

def get_patch_name(self, *args, **kwargs) -> str:
"""
2 changes: 2 additions & 0 deletions dascore/core/spool.py
@@ -605,6 +605,8 @@ def get_contents(self) -> pd.DataFrame:
"""{doc}."""
return self._df[filter_df(self._df, **self._select_kwargs)]

get_patch_names = get_patch_names


class MemorySpool(DataFrameSpool):
"""A Spool for storing patches in memory."""
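For context, a minimal usage sketch of the newly exposed methods, assuming the bundled example spool. The exact names depend on each patch's attributes, and the return type of `get_patch_names` is assumed here to be a pandas Series of recommended names.

```python
import dascore as dc

sp = dc.get_example_spool()

# Recommended file names for every patch in the spool (one entry per patch).
names = sp.get_patch_names()
print(list(names))

# A single recommended name for one patch.
print(sp[0].get_patch_name())
```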
103 changes: 102 additions & 1 deletion dascore/examples.py
@@ -17,7 +17,7 @@
from dascore.exceptions import UnknownExampleError
from dascore.utils.docs import compose_docstring
from dascore.utils.downloader import fetch
from dascore.utils.misc import register_func
from dascore.utils.misc import iterate, register_func
from dascore.utils.patch import get_patch_names
from dascore.utils.time import to_timedelta64

@@ -415,6 +415,107 @@ def _ricker(time, delay):
return dc.Patch(data=data, coords=coords, dims=dims)


@register_func(EXAMPLE_PATCHES, key="delta_patch")
def delta_patch(
    dim="time",
    shape=(10, 200),
    time_min="2020-01-01",
    time_step=1 / 250,
    distance_min=0,
    distance_step=1,
    patch=None,
):
    """
    Create a delta function patch: zeros everywhere except for a unit
    value at the center of the specified dimension. The returned patch
    has a single (length-one) coordinate along each of the other dimensions.

    Parameters
    ----------
    dim : str
        The dimension at the center of which to place the unit value.
        Typically ``"time"`` or ``"distance"``.
    shape : tuple of int
        The shape of the data as (distance, time). Defaults to (10, 200).
        This is used only if no existing ``patch`` is provided.
    time_min : str or datetime64
        The start time of the patch.
    time_step : float
        The time step in seconds between samples.
    distance_min : float
        The minimum distance coordinate.
    distance_step : float
        The distance step in meters between samples.
    patch : dascore.Patch
        If provided, creates the delta patch based on this existing patch.
        Default is None.
    """
    if patch is None:
        if dim not in ["time", "distance"]:
            raise ValueError(
                "When no patch is provided, the delta patch is a 2D patch "
                "with 'time' and 'distance' dimensions, so `dim` must be "
                "one of those."
            )

        dims = ("distance", "time")
        dist_len, time_len = shape

        # Create coordinates
        time_step_td = to_timedelta64(time_step)
        t0 = np.datetime64(time_min)
        time_coord = dascore.core.get_coord(
            data=t0 + np.arange(time_len) * time_step_td, step=time_step_td, units="s"
        )
        dist_coord = dascore.core.get_coord(
            data=distance_min + np.arange(dist_len) * distance_step,
            step=distance_step,
            units="m",
        )

        coords = {"distance": dist_coord, "time": time_coord}
        attrs = dict(
            time_min=t0,
            time_step=time_step_td,
            distance_min=distance_min,
            distance_step=distance_step,
            category="DAS",
            network="",
            station="",
            tag="delta",
            time_units="s",
            distance_units="m",
        )

        # Depending on the selected dimension, place a line of ones at the midpoint
        used_dims = tuple(iterate(dim))
        unused_dims = set(dims) - set(used_dims)

        # Get data with ones centered on selected dimensions.
        index = tuple(
            shape[dims.index(dimension)] // 2 if dimension in used_dims else 0
            for dimension in dims
        )
        data = np.zeros((dist_len, time_len))
        data[index] = 1
        delta_patch = dc.Patch(data=data, coords=coords, dims=dims, attrs=attrs)
        return delta_patch.select(**{x: 0 for x in unused_dims}, samples=True)
    else:
        used_dims = tuple(iterate(dim))
        unused_dims = set(patch.dims) - set(used_dims)
        patch = patch.select(**{x: 0 for x in unused_dims}, samples=True)

        # Get data with ones centered on selected dimensions.
        shape = patch.shape
        index = tuple(
            shape[patch.dims.index(dimension)] // 2 if dimension in used_dims else 0
            for dimension in patch.dims
        )
        data = np.zeros_like(patch.data)
        data[index] = 1
        return patch.update(data=data)


@register_func(EXAMPLE_PATCHES, key="dispersion_event")
def dispersion_event():
"""
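A short sketch of how the new `"delta_patch"` example might be requested, mirroring the usage in the recipe below; the keyword arguments are forwarded to the registered function.

```python
import dascore as dc

# A standalone 2D delta patch with the unit value at the center of the time axis.
delta = dc.get_example_patch("delta_patch", dim="time")

# A delta patch whose geometry matches an existing patch.
pa = dc.get_example_patch()
delta_like_pa = dc.get_example_patch("delta_patch", dim="time", patch=pa)
```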
2 changes: 1 addition & 1 deletion dascore/utils/chunk.py
@@ -253,7 +253,7 @@ def _create_df(self, df, name, start_stop, gnum):
if len(vals) > 1:
msg = (
f"Cannot merge on dim {self._name} because all values for "
f"{col} are not equal. Consider using the `attr_conflict` "
f"{col} are not equal. Consider using the `conflict` "
f"argument to loosen this restriction."
)
raise CoordMergeError(msg)
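The renamed `conflict` argument is the one used later in the recipe to merge patches whose attributes differ. A hedged sketch of its use, assuming the example spool and the `"keep_first"` option shown in the recipe:

```python
import dascore as dc

sp = dc.get_example_spool()

# Without `conflict`, differing attribute values raise a CoordMergeError when
# chunking/merging; `conflict="keep_first"` keeps the first patch's attributes.
merged = sp.chunk(time=None, conflict="keep_first")[0]
```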
166 changes: 166 additions & 0 deletions docs/recipes/low_freq_proc.qmd
@@ -0,0 +1,166 @@
---
title: "Low-Frequency Processing"
---

This recipe demonstrates how DASCore can be used to apply low-frequency (LF) processing to a spool of DAS data. LF processing efficiently downsamples the entire spool along the time dimension.


## Get a spool and define parameters
```{python}
import tempfile
from pathlib import Path

import numpy as np

import dascore as dc

# Define a path for saving results (swap this out with your own path)
output_data_dir = Path(tempfile.mkdtemp())
# Get a spool to work on
sp = dc.get_example_spool().update()
# Sort the spool
sp = sp.sort("time")
# You may also want to sub-select the spool for the desired distance or time samples before proceeding.
# Get the first patch
pa = sp[0]
# Define the target sampling interval (in seconds)
dt = 1
# The cutoff frequency is the Nyquist frequency of the target sampling rate: 1 / (2 * dt), i.e., 0.5 Hz for dt = 1
cutoff_freq = 1 / (2 * dt)
# Safety factor for the low-pass filter to avoid aliasing
filter_safety_factor = 0.9
# Memory size dedicated to processing (in MB)
memory_limit_MB = 75
# Define a tolerance for determining edge effects (used in the next step)
tolerance = 1e-3
```

## Calculate chunk size and determine edge effects

To chunk the spool, we first need to determine the chunk size based on the machine's available memory, so that each patch can be loaded and processed without memory issues. A longer chunk size (longer patches) increases computational efficiency.

Notes:

1. The `processing_factor` is required because certain processing routines make copies of the data during the processing steps. It should be determined by memory profiling an example dataset with the specific processing routine (a minimal profiling sketch follows this list). For instance, the combination of low-pass filtering and interpolation, discussed in the next section, requires a processing factor of approximately 5.
2. The `memory_safety_factor` is optional and helps prevent getting too close to the memory limit.
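As a rough illustration, the processing factor could be estimated with Python's built-in `tracemalloc` module, as in the sketch below. Note that `tracemalloc` only sees allocations made through Python and NumPy, and the exact peak depends on the routine and patch size, so treat the result as an approximate lower bound and round up generously.

```{python}
import tracemalloc

import numpy as np
import dascore as dc

# Profile the routine on a small example patch.
example_pa = dc.get_example_patch()
base_bytes = example_pa.data.nbytes

tracemalloc.start()
# The routine whose memory footprint we want to estimate:
# a low-pass filter followed by interpolation onto a coarser time axis.
new_ax = np.arange(
    example_pa.attrs["time_min"],
    example_pa.attrs["time_max"],
    np.timedelta64(1, "s"),
)
_ = example_pa.pass_filter(time=(None, 0.45)).interpolate(time=new_ax)
_, peak_bytes = tracemalloc.get_traced_memory()
tracemalloc.stop()

# Approximate processing factor: peak memory relative to the raw data size.
print(f"Estimated processing factor: {peak_bytes / base_bytes:.1f}")
```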

```{python}
# Get the patch's number of bytes per second (based on the patch's data type)
pa_bytes_per_second = pa.data.nbytes / pa.seconds
# Define the processing factor and memory safety factor
processing_factor = 5
memory_safety_factor = 1.2
# Calculate the memory required to process each second of data
memory_size_per_second = pa_bytes_per_second * processing_factor * memory_safety_factor
memory_size_per_second_MB = memory_size_per_second / 1e6
# Calculate the chunk size that can be loaded (in seconds)
chunk_size = memory_limit_MB / memory_size_per_second_MB
# Ensure `chunk_size` does not exceed the spool length
merged_sp = sp.chunk(time=None)[0]
if chunk_size > merged_sp.seconds:
    print(
        f"Warning: Specified `chunk_size` ({chunk_size:.2f} seconds) exceeds the spool length "
        f"({merged_sp.seconds:.2f} seconds). Adjusting `chunk_size` to match spool length."
    )
    chunk_size = merged_sp.seconds
```

Next, we need to determine the extent of artifacts introduced by low-pass filtering at the edges of each patch. To achieve this, we apply LF processing to a delta function patch, which contains a unit value at the center and zeros elsewhere. The distorted edges are then identified based on a defined threshold.

```{python}
# Retrieve a patch of appropriate size for LF processing that fits into memory
pa_chunked_sp = sp.chunk(time=chunk_size, keep_partial=True)[0]
# Create a delta patch based on new patch size
delta_pa = dc.get_example_patch("delta_patch", dim="time", patch=pa_chunked_sp)
# Apply the low-pass filter on the delta patch
delta_pa_low_passed = delta_pa.pass_filter(time=(None, cutoff_freq * filter_safety_factor))
# Resample the low-pass filtered patch
new_time_ax = np.arange(delta_pa.attrs["time_min"], delta_pa.attrs["time_max"], np.timedelta64(dt, "s"))
delta_pa_lfp = delta_pa_low_passed.interpolate(time=new_time_ax)
# Identify the indices where the absolute value of the data exceeds the threshold
data_abs = np.abs(delta_pa_lfp.data)
threshold = np.max(data_abs) * tolerance
ind = data_abs > threshold
ind_1 = np.where(ind)[1][0]
ind_2 = np.where(ind)[1][-1]
# Get the total duration of the processed delta function patch in seconds
time_coord = delta_pa_lfp.get_coord('time')
delta_pa_lfp_seconds = dc.to_float((time_coord.max() - time_coord.min()))
# Convert the new time axis to absolute seconds, relative to the first timestamp
time_ax_abs = (new_time_ax - new_time_ax[0]) / np.timedelta64(1, "s")
# Center the time axis
time_ax_centered = time_ax_abs - delta_pa_lfp_seconds // 2
# Calculate the maximum edge length on either side (in seconds) where artifacts are present
edge = max(np.abs(time_ax_centered[ind_1]), np.abs(time_ax_centered[ind_2]))
# Validate the `edge` value to ensure a sufficient processing patch size
if np.ceil(edge) >= chunk_size / 2:
    raise ValueError(
        f"The calculated `edge` value ({edge:.2f} seconds) is greater than half of the processing patch size "
        f"({chunk_size:.2f} seconds). To resolve this and increase efficiency, consider one of the following:\n"
        "- Increase `memory_limit_MB` to allow for a larger processing window.\n"
        "- Increase `tolerance` to reduce the sensitivity of artifact detection."
    )
```


## Perform low-frequency processing and save results on disk
```{python}
# First we chunk the spool based on the `chunk_size` and `edge` calculated above.
sp_chunked_overlap = sp.chunk(time=chunk_size, overlap=2*edge, keep_partial=True)
# Process each patch in the spool and save the resulting patch
for patch in sp_chunked_overlap:
    # Apply any pre-processing you may need (such as velocity to strain rate transformation, detrending, etc.)
    # ...
    # Apply the low-pass filter on the patch
    pa_low_passed = patch.pass_filter(time=(..., cutoff_freq * filter_safety_factor))
    # Resample the low-pass filtered patch
    new_time_ax = np.arange(pa_low_passed.attrs["time_min"], pa_low_passed.attrs["time_max"], np.timedelta64(dt, "s"))
    pa_lfp = (
        pa_low_passed.interpolate(time=new_time_ax)
        .update_coords(time_step=dt)  # Update the sampling interval
        # Remove edges from the data at both ends
        .select(time=(edge, -edge), relative=True)
    )
    # Save the processed patch
    pa_lf_name = pa_lfp.get_patch_name()
    path = output_data_dir / f"{pa_lf_name}.h5"
    pa_lfp.io.write(path, "dasdae")
```

## Visualize the results
```{python}
# Create a spool of LF processed results
sp_lf = dc.spool(output_data_dir)
# Merge the spool to create a single patch. You may need to sub-select before merging to avoid exceeding the memory limit.
sp_lf_merged = sp_lf.chunk(time=None, conflict="keep_first")
pa_lf_merged = sp_lf_merged[0]
# Visualize the results. Try different `scale` values for better visualization.
pa_lf_merged.viz.waterfall(scale=0.5)
```

#### For any questions, please contact [Ahmad Tourei](https://github.com/ahmadtourei).
11 changes: 6 additions & 5 deletions docs/recipes/real_time_proc.qmd
@@ -5,27 +5,28 @@ execute:
---


This recipe serves as an example to showcase the real-time processing capability of DASCore. Here, we demonstrate how to use DASCore to perform rolling mean processing on a spool in near real time for edge computing purposes.
This recipe serves as an example to showcase the real-time processing capability of DASCore. Here, we demonstrate how to use DASCore to perform rolling mean processing on a spool in "near" real time for edge computing purposes.


## Load libraries and get a spool

```{python}
# Import libraries
import numpy as np
import os
import time
import dascore as dc
import numpy as np
from dascore.units import s
# Define path for saving results
output_data_dir = '/path/to/desired/output/directory'
# Get a spool to work on
sp = dc.get_example_spool().update().sort("time")
sp = dc.get_example_spool().update()
# Sort the spool
sp = sp.sort("time")
```

## Set real-time processing parameters (if needed)
1 change: 1 addition & 0 deletions scripts/_templates/_quarto.yml
@@ -139,6 +139,7 @@ website:
- recipes/real_time_proc.qmd
- recipes/smoothing.qmd
- recipes/parallelization.qmd
- recipes/low_freq_proc.qmd

- section: "IO"
contents:
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -389,7 +389,7 @@ def one_file_dir(tmp_path_factory, random_patch):
def random_directory_spool(tmp_path_factory):
"""A directory with a few patch files."""
path = Path(tmp_path_factory.mktemp("one_file_file_spool"))
return dc.examples.random_directory_spool(path)
return dc.examples.random_directory_spool(path=path)


@pytest.fixture(scope="class")