Speedup 02: 3x speed up for prep_data_for_correlation with custom copy and trace-selection #525

Merged
Changes from 9 commits
4 changes: 4 additions & 0 deletions CHANGES.md
@@ -1,4 +1,8 @@
 ## Current
+* utils.pre_processing:
+  - _prep_data_for_correlation: 3x speedup for filling NaN-traces in templates
+  - new functions _quick_copy_trace and _quick_copy_stream for 3x quicker
+    trace / stream copy.
 * utils.cluster.decluster_distance_time
   - Bug-fix: fix segmentation fault when declustering more than 46340 detections
     with hypocentral_separation.
10 changes: 5 additions & 5 deletions eqcorrscan/core/match_filter/matched_filter.py
@@ -23,7 +23,7 @@
 from eqcorrscan.utils.correlate import get_stream_xcorr
 from eqcorrscan.utils.findpeaks import multi_find_peaks
 from eqcorrscan.utils.pre_processing import (
-    dayproc, shortproc, _prep_data_for_correlation)
+    dayproc, shortproc, _prep_data_for_correlation, _quick_copy_stream)

 Logger = logging.getLogger(__name__)

@@ -324,8 +324,8 @@ def _group_process(template_group, parallel, cores, stream, daylong,
             kwargs.update({'endtime': _endtime})
         else:
             _endtime = kwargs['starttime'] + 86400
-        chunk_stream = stream.slice(starttime=kwargs['starttime'],
-                                    endtime=_endtime).copy()
+        chunk_stream = _quick_copy_stream(
+            stream.slice(starttime=kwargs['starttime'], endtime=_endtime))
         Logger.debug(f"Processing chunk {i} between {kwargs['starttime']} "
                      f"and {_endtime}")
         if len(chunk_stream) == 0:
@@ -666,8 +666,8 @@ def match_filter(template_names, template_list, st, threshold,
     if copy_data:
         # Copy the stream here because we will muck about with it
         Logger.info("Copying data to keep your input safe")
-        stream = st.copy()
-        templates = [t.copy() for t in template_list]
+        stream = _quick_copy_stream(st)
+        templates = [_quick_copy_stream(t) for t in template_list]
        _template_names = template_names.copy()  # This can be a shallow copy
     else:
         stream, templates, _template_names = st, template_list, template_names
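For reviewers who want to reproduce the quoted timings, a minimal benchmark sketch along these lines compares `Stream.copy()` against the new `_quick_copy_stream` (assumes ObsPy and this branch are installed; the synthetic stream is illustrative, and absolute numbers will vary by machine):

```python
# Hedged micro-benchmark sketch: obspy's Stream.copy() vs the
# _quick_copy_stream added in this PR. Only the ratio is of interest.
import timeit

import numpy as np
from obspy import Stream, Trace

from eqcorrscan.utils.pre_processing import _quick_copy_stream

# 3-trace stream, matching the docstring's timing scenario
stream = Stream([
    Trace(data=np.random.randn(6000).astype(np.float32))
    for _ in range(3)])

n = 1000
t_copy = timeit.timeit(stream.copy, number=n)
t_quick = timeit.timeit(lambda: _quick_copy_stream(stream), number=n)
print(f"Stream.copy():      {1e6 * t_copy / n:.1f} us per call")
print(f"_quick_copy_stream: {1e6 * t_quick / n:.1f} us per call")
```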
104 changes: 96 additions & 8 deletions eqcorrscan/utils/pre_processing.py
@@ -12,8 +12,9 @@
 import numpy as np
 import logging
 import datetime as dt
+import copy

-from collections import Counter
+from collections import Counter, defaultdict
 from multiprocessing import Pool, cpu_count

 from obspy import Stream, Trace, UTCDateTime
@@ -728,6 +729,77 @@ def _fill_gaps(tr):
     return gaps, tr


+def _quick_copy_trace(trace, deepcopy_data=True):
+    """
+    Function to quickly copy a trace. Sets values in the trace's and the
+    trace header's dict directly, circumventing obspy's init functions.
+    Speedup: from 37 us to 12 us per trace - 3x faster.
+
+    :type trace: :class:`obspy.core.trace.Trace`
+    :param trace: Trace to quickly copy
+    :type deepcopy_data: bool
+    :param deepcopy_data:
+        Whether to deepcopy the trace data (with `deepcopy_data=False`
+        expect up to 20 % speedup, but use it only when you know that the
+        trace data will not change or affect results). Warning: do not use
+        this option to copy traces with processing history or response
+        information.
+    :rtype: :class:`obspy.core.trace.Trace`
+    :return: trace
+    """
+    new_trace = Trace()
+    for key, value in trace.__dict__.items():
+        if key == 'stats':
+            new_stats = new_trace.stats
+            for key_2, value_2 in value.__dict__.items():
+                if isinstance(value_2, UTCDateTime):
+                    new_stats.__dict__[key_2] = UTCDateTime(
+                        ns=value_2.__dict__['_UTCDateTime__ns'])
+                else:
+                    new_stats.__dict__[key_2] = value_2
+        elif deepcopy_data:
+            # data needs to be deepcopied (and anything else, to be safe)
+            new_trace.__dict__[key] = copy.deepcopy(value)
+        else:  # No deepcopy, e.g. for NaN-traces with no effect on results
+            new_trace.__dict__[key] = value
+    return new_trace
+
+
+def _quick_copy_stream(stream, deepcopy_data=True):
+    """
+    Function to quickly copy a stream.
+    Speedup: from 112 us to 44 us (35 us with `deepcopy_data=False`) per
+    3-trace stream - 2.8x (3.2x) faster.
+
+    Warning: use `deepcopy_data=False` (saves an extra ~20 % of time) only
+    when changing the data in the stream later does not change results
+    (e.g., for NaN-traces or when the data array will not be changed).
+
+    Copying the header is what takes longest (1 empty trace, total time to
+    copy 27 us): 18 us (vs. creating a new empty header: 683 ns).
+    Two points that speed up copying / creation:
+        1. circumvent trace.__init__ and trace.__setattr__ by setting
+           values directly in the trace's __dict__
+        2. when setting the trace header, avoid calling Stats(header) when
+           header is already a Stats instance
+
+    :type stream: :class:`obspy.core.stream.Stream`
+    :param stream: Stream to quickly copy
+    :type deepcopy_data: bool
+    :param deepcopy_data:
+        Whether to deepcopy the data (with `deepcopy_data=False` expect up
+        to 20 % speedup, but use it only when you know that the trace data
+        will not change or affect results).
+
+    :rtype: :class:`obspy.core.stream.Stream`
+    :return: stream
+    """
+    new_traces = list()
+    for trace in stream:
+        new_traces.append(
+            _quick_copy_trace(trace, deepcopy_data=deepcopy_data))
+    return Stream(new_traces)
+
+
 def _prep_data_for_correlation(stream, templates, template_names=None,
                                force_stream_epoch=True):
     """
@@ -834,12 +906,13 @@ def _prep_data_for_correlation(stream, templates, template_names=None,

     # Initialize nan template for speed.
     nan_channel = np.full(template_length, np.nan, dtype=np.float32)
+    nan_channel = np.require(nan_channel, requirements=['C_CONTIGUOUS'])
     nan_template = Stream()
     for _seed_id in seed_ids:
         net, sta, loc, chan = _seed_id[0].split('.')
         nan_template += Trace(header=Stats({
             'network': net, 'station': sta, 'location': loc,
-            'channel': chan, 'starttime': UTCDateTime(),
+            'channel': chan, 'starttime': UTCDateTime(ns=0),
             'npts': template_length, 'sampling_rate': samp_rate}))

     # Remove templates with no matching channels
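The switch from `UTCDateTime()` to `UTCDateTime(ns=0)` looks cosmetic but is deliberate: the no-argument constructor resolves "now" via the system clock, whereas the `ns` keyword sets the internal nanosecond counter directly, which is both cheaper and deterministic. A small sketch (assumes ObsPy):

```python
# UTCDateTime() asks the system clock for "now"; UTCDateTime(ns=0) just
# stores the nanosecond counter, giving the epoch deterministically.
from obspy import UTCDateTime

assert UTCDateTime(ns=0) == UTCDateTime(1970, 1, 1)  # epoch start
# Every nan-template run therefore gets an identical starttime, whereas
# UTCDateTime() would stamp each run with the wall-clock time.
```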
@@ -874,8 +947,8 @@ def _prep_data_for_correlation(stream, templates, template_names=None,
             net, sta, loc, chan = earliest_templ_trace_id.split('.')
             nan_template += Trace(header=Stats({
                 'network': net, 'station': sta, 'location': loc,
-                'channel': chan, 'starttime': UTCDateTime(),
-                'npts': template_length, 'sampling_rate': samp_rate}))
+                'channel': chan, 'starttime': UTCDateTime(ns=0),
+                'sampling_rate': samp_rate}))
             stream_nan_data = np.full(
                 stream_length, np.nan, dtype=np.float32)
             out_stream += Trace(
@@ -894,13 +967,28 @@ def _prep_data_for_correlation(stream, templates, template_names=None,
     for template_name in incomplete_templates:
         template = _out[template_name]
         template_starttime = min(tr.stats.starttime for tr in template)
-        out_template = nan_template.copy()
+        # out_template = nan_template.copy()  # quicker:
+        out_template = _quick_copy_stream(nan_template, deepcopy_data=False)
+
+        # Select traces very quickly: assume that the trace order does not
+        # change, build a dict mapping trace-ids to lists of indices, and
+        # use those indices to select traces.
+        stream_trace_id_dict = defaultdict(list)
+        for n, tr in enumerate(template.traces):
+            stream_trace_id_dict[tr.id].append(n)
+
         for channel_number, _seed_id in enumerate(seed_ids):
             seed_id, channel_index = _seed_id
-            template_channel = template.select(id=seed_id)
+            # Select all traces with the same seed_id, based on the indices
+            # stored in stream_trace_id_dict. Much quicker than:
+            # template_channel = template.select(id=seed_id)
+            template_channel = Stream([
+                template.traces[idx] for idx in stream_trace_id_dict[seed_id]])
             if len(template_channel) <= channel_index:
-                out_template[channel_number].data = nan_channel
-                out_template[channel_number].stats.starttime = \
+                # out_template[channel_number].data = nan_channel  # quicker:
+                out_template[channel_number].__dict__['data'] = copy.deepcopy(
+                    nan_channel)
+                out_template[channel_number].stats.__dict__['npts'] = \
+                    template_length
+                out_template[channel_number].stats.__dict__['starttime'] = \
                     template_starttime
             else:
                 out_template[channel_number] = template_channel[channel_index]
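The trace-selection pattern above generalizes to any repeated exact-id lookup on a stream. A standalone sketch of the same idea (assumes ObsPy; `build_id_index` is an illustrative name, not part of this PR):

```python
# Index-based trace selection, as used in _prep_data_for_correlation.
# Stream.select() re-scans every trace on each call and supports
# wildcards; when ids are exact and the stream is not mutated, one
# indexing pass turns each subsequent lookup into a dict access.
from collections import defaultdict

from obspy import Stream

def build_id_index(stream):
    """Map each trace id to the positions of its traces in the stream."""
    id_index = defaultdict(list)
    for n, tr in enumerate(stream):
        id_index[tr.id].append(n)
    return id_index

# Usage sketch:
# id_index = build_id_index(template)
# channel = Stream([template.traces[i] for i in id_index[seed_id]])
# Caveat: unlike Stream.select(), this does no wildcard matching and the
# index must be rebuilt if traces are added, removed, or reordered.
```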