
Commit

Speedup 02: 3x speed up for prep_data_for_correlation with custom copy and trace-selection (#525)

* speed up prep_data_for_correlation with custom copy and trace-selection

* add changelog, remove unused imports

* quicker to define empty starttimes with ns=0; need to copy nan-numpy array (see the sketch below)

* slightly more optimization for quick trace copy

* add deepcopy option and documentation to quick copy functions

* use quick stream copy function for continuous data and templates

* remove stream_quick_select from this PR

* 5 % more speedup with quicker .ns retrieval

* properly set stats-dict for nan-channels

Co-authored-by: Calum Chamberlain <[email protected]>
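
As an aside on the `ns=0` bullet above: a minimal timing sketch (illustration only, not part of this commit) of why `UTCDateTime(ns=0)` is cheaper than `UTCDateTime()`:

import timeit
from obspy import UTCDateTime

# UTCDateTime() reads and normalizes the current system clock;
# UTCDateTime(ns=0) just stores the given nanosecond integer.
print(timeit.timeit(lambda: UTCDateTime(), number=100_000))
print(timeit.timeit(lambda: UTCDateTime(ns=0), number=100_000))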
flixha and calum-chamberlain authored Jan 3, 2023
1 parent 8e5ca9d commit 0fc5ec7
Showing 3 changed files with 93 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -9,6 +9,7 @@
* core.match_filter.template
- new quick_group_templates function for 50x quicker template grouping.
* utils.pre_processing
- `_prep_data_for_correlation`: 3x speedup for filling NaN-traces in templates
- New function `quick_trace_select` for very efficient selection of traces
  by seed ID without wildcards (4x speedup).
* utils.catalog_to_dd._prepare_stream
10 changes: 5 additions & 5 deletions eqcorrscan/core/match_filter/matched_filter.py
@@ -25,7 +25,7 @@
from eqcorrscan.utils.correlate import get_stream_xcorr
from eqcorrscan.utils.findpeaks import multi_find_peaks
from eqcorrscan.utils.pre_processing import (
dayproc, shortproc, _prep_data_for_correlation)
dayproc, shortproc, _prep_data_for_correlation, _quick_copy_stream)

Logger = logging.getLogger(__name__)

@@ -339,8 +339,8 @@ def _group_process(template_group, parallel, cores, stream, daylong,
kwargs.update({'endtime': _endtime})
else:
_endtime = kwargs['starttime'] + 86400
chunk_stream = stream.slice(starttime=kwargs['starttime'],
endtime=_endtime).copy()
chunk_stream = _quick_copy_stream(
stream.slice(starttime=kwargs['starttime'], endtime=_endtime))
Logger.debug(f"Processing chunk {i} between {kwargs['starttime']} "
f"and {_endtime}")
if len(chunk_stream) == 0:
@@ -688,8 +688,8 @@ def match_filter(template_names, template_list, st, threshold,
if copy_data:
# Copy the stream here because we will muck about with it
Logger.info("Copying data to keep your input safe")
stream = st.copy()
templates = [t.copy() for t in template_list]
stream = _quick_copy_stream(st)
templates = [_quick_copy_stream(t) for t in template_list]
_template_names = template_names.copy() # This can be a shallow copy
else:
stream, templates, _template_names = st, template_list, template_names
93 changes: 87 additions & 6 deletions eqcorrscan/utils/pre_processing.py
@@ -12,6 +12,7 @@
import numpy as np
import logging
import datetime as dt
import copy

from collections import Counter, defaultdict
from multiprocessing import Pool, cpu_count
@@ -728,6 +729,77 @@ def _fill_gaps(tr):
return gaps, tr


def _quick_copy_trace(trace, deepcopy_data=True):
"""
    Function to quickly copy a trace. Sets values in the trace's and trace
    header's dict directly, circumventing obspy's init functions.
Speedup: from 37 us to 12 us per trace - 3x faster
:type trace: :class:`obspy.core.trace.Trace`
    :param trace: Trace to quickly copy
:type deepcopy_data: bool
:param deepcopy_data:
Whether to deepcopy trace data (with `deepcopy_data=False` expect up to
        20 % speedup, but use it only when you know that the trace's data contents
        will not change or affect results). Warning: do not use this option to copy
traces with processing history or response information.
:rtype: :class:`obspy.core.trace.Trace`
    :return: trace
"""
new_trace = Trace()
for key, value in trace.__dict__.items():
if key == 'stats':
new_stats = new_trace.stats
for key_2, value_2 in value.__dict__.items():
if isinstance(value_2, UTCDateTime):
new_stats.__dict__[key_2] = UTCDateTime(
ns=value_2.__dict__['_UTCDateTime__ns'])
else:
new_stats.__dict__[key_2] = value_2
elif deepcopy_data:
# data needs to be deepcopied (and anything else, to be safe)
new_trace.__dict__[key] = copy.deepcopy(value)
else: # No deepcopy, e.g. for NaN-traces with no effect on results
new_trace.__dict__[key] = value
return new_trace
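
A minimal usage sketch for `_quick_copy_trace` (illustration only, not part of the diff; assumes numpy and obspy are importable):

import numpy as np
from obspy import Trace

tr = Trace(data=np.zeros(1000, dtype=np.float32))
deep = _quick_copy_trace(tr)  # data array is deep-copied
shared = _quick_copy_trace(tr, deepcopy_data=False)  # data array is shared
assert deep.data is not tr.data
assert shared.data is tr.data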


def _quick_copy_stream(stream, deepcopy_data=True):
"""
Function to quickly copy a stream.
    Speedup: from 112 us to 44 (35) us per 3-trace stream - 2.8x (3.2x) faster
    Warning: use `deepcopy_data=False` (saves an extra ~20 % of time) only
    when changing the data in the stream later does not change results
    (e.g., for NaN-traces or when the data array will not be changed).
    Copying the header is what takes longest (for 1 empty trace, the total
    time to copy is 27 us, of which 18 us is the header copy, vs 683 ns to
    create a new empty header).
Two points that can speed up copying / creation:
    1. circumvent trace.__init__ and trace.__setattr__ by setting values
       directly in the trace's __dict__
    2. when setting the trace header, skip the Stats(header) call when the
       header is already a Stats instance
:type stream: :class:`obspy.core.stream.Stream`
:param stream: Stream to quickly copy
:type deepcopy_data: bool
:param deepcopy_data:
Whether to deepcopy data (with `deepcopy_data=False` expect up to 20 %
        speedup, but use it only when you know that the traces' data contents
        will not change or affect results).
:rtype: :class:`obspy.core.stream.Stream`
    :return: stream
"""
new_traces = list()
for trace in stream:
new_traces.append(
_quick_copy_trace(trace, deepcopy_data=deepcopy_data))
return Stream(new_traces)
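
And a rough timing comparison against `Stream.copy()` (a sketch only; absolute timings vary by machine):

import timeit
import numpy as np
from obspy import Stream, Trace

st = Stream([Trace(data=np.zeros(100, dtype=np.float32)) for _ in range(3)])
t_plain = timeit.timeit(lambda: st.copy(), number=10_000)
t_quick = timeit.timeit(lambda: _quick_copy_stream(st), number=10_000)
print(f"Stream.copy(): {t_plain:.3f} s, _quick_copy_stream: {t_quick:.3f} s")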


def _stream_quick_select(stream, seed_id):
"""
4x quicker selection of traces in stream by full Seed-ID. Does not support
@@ -849,12 +921,13 @@ def _prep_data_for_correlation(stream, templates, template_names=None,

# Initialize nan template for speed.
nan_channel = np.full(template_length, np.nan, dtype=np.float32)
nan_channel = np.require(nan_channel, requirements=['C_CONTIGUOUS'])
nan_template = Stream()
for _seed_id in seed_ids:
net, sta, loc, chan = _seed_id[0].split('.')
nan_template += Trace(header=Stats({
'network': net, 'station': sta, 'location': loc,
'channel': chan, 'starttime': UTCDateTime(),
'channel': chan, 'starttime': UTCDateTime(ns=0),
'npts': template_length, 'sampling_rate': samp_rate}))

# Remove templates with no matching channels
@@ -889,8 +962,8 @@
net, sta, loc, chan = earliest_templ_trace_id.split('.')
nan_template += Trace(header=Stats({
'network': net, 'station': sta, 'location': loc,
'channel': chan, 'starttime': UTCDateTime(),
'npts': template_length, 'sampling_rate': samp_rate}))
'channel': chan, 'starttime': UTCDateTime(ns=0),
'sampling_rate': samp_rate}))
stream_nan_data = np.full(
stream_length, np.nan, dtype=np.float32)
out_stream += Trace(
@@ -909,7 +982,7 @@
for template_name in incomplete_templates:
template = _out[template_name]
template_starttime = min(tr.stats.starttime for tr in template)
out_template = nan_template.copy()
out_template = _quick_copy_stream(nan_template, deepcopy_data=False)

# Select traces very quickly: assume that trace order does not change,
# make dict of trace-ids and list of indices and use indices to select
@@ -925,9 +998,17 @@
template_channel = Stream([
template.traces[idx] for idx in stream_trace_id_dict[seed_id]])
if len(template_channel) <= channel_index:
out_template[channel_number].data = nan_channel
out_template[channel_number].stats.starttime = \
                # Quicker than: out_template[channel_number].data = nan_channel
out_template[channel_number].__dict__['data'] = copy.deepcopy(
nan_channel)
out_template[channel_number].stats.__dict__['npts'] = \
template_length
out_template[channel_number].stats.__dict__['starttime'] = \
template_starttime
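                # obspy convention: endtime = starttime + (npts - 1) / sampling_rate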
out_template[channel_number].stats.__dict__['endtime'] = \
UTCDateTime(ns=int(
                        round(template_starttime.ns
                              + ((template_length - 1) / samp_rate) * 1e9)))
else:
out_template[channel_number] = template_channel[channel_index]
        # If a template-trace matches a NaN-trace in the stream, then set
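For reference, the trace-selection trick described in the comments above (a dict of trace IDs to index lists, instead of `Stream.select`) can be sketched standalone (hypothetical seed ID; illustration only, not part of the diff):

from collections import defaultdict
import numpy as np
from obspy import Stream, Trace

stream = Stream([Trace(data=np.zeros(10, dtype=np.float32),
                       header={'network': 'NZ', 'station': 'WEL',
                               'location': '10', 'channel': 'HHZ'})])
# Build the lookup once; selection then avoids wildcard parsing entirely.
trace_id_dict = defaultdict(list)
for idx, tr in enumerate(stream):
    trace_id_dict[tr.id].append(idx)
selected = Stream([stream.traces[i] for i in trace_id_dict['NZ.WEL.10.HHZ']])
assert len(selected) == 1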
