diff --git a/CHANGES.md b/CHANGES.md index f7a366f69..05a4507f0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -9,6 +9,7 @@ * core.match_filter.template - new quick_group_templates function for 50x quicker template grouping. * utils.pre_processing + - `_prep_data_for_correlation`: 3x speedup for filling NaN-traces in templates - New function ``quick_trace_select` for a very efficient selection of trace by seed ID without wildcards (4x speedup). * utils.catalog_to_dd._prepare_stream diff --git a/eqcorrscan/core/match_filter/matched_filter.py b/eqcorrscan/core/match_filter/matched_filter.py index de8c5986a..e68615d0a 100644 --- a/eqcorrscan/core/match_filter/matched_filter.py +++ b/eqcorrscan/core/match_filter/matched_filter.py @@ -25,7 +25,7 @@ from eqcorrscan.utils.correlate import get_stream_xcorr from eqcorrscan.utils.findpeaks import multi_find_peaks from eqcorrscan.utils.pre_processing import ( - dayproc, shortproc, _prep_data_for_correlation) + dayproc, shortproc, _prep_data_for_correlation, _quick_copy_stream) Logger = logging.getLogger(__name__) @@ -339,8 +339,8 @@ def _group_process(template_group, parallel, cores, stream, daylong, kwargs.update({'endtime': _endtime}) else: _endtime = kwargs['starttime'] + 86400 - chunk_stream = stream.slice(starttime=kwargs['starttime'], - endtime=_endtime).copy() + chunk_stream = _quick_copy_stream( + stream.slice(starttime=kwargs['starttime'], endtime=_endtime)) Logger.debug(f"Processing chunk {i} between {kwargs['starttime']} " f"and {_endtime}") if len(chunk_stream) == 0: @@ -688,8 +688,8 @@ def match_filter(template_names, template_list, st, threshold, if copy_data: # Copy the stream here because we will muck about with it Logger.info("Copying data to keep your input safe") - stream = st.copy() - templates = [t.copy() for t in template_list] + stream = _quick_copy_stream(st) + templates = [_quick_copy_stream(t) for t in template_list] _template_names = template_names.copy() # This can be a shallow copy else: stream, templates, _template_names = st, template_list, template_names diff --git a/eqcorrscan/utils/pre_processing.py b/eqcorrscan/utils/pre_processing.py index 61658361c..45791f109 100644 --- a/eqcorrscan/utils/pre_processing.py +++ b/eqcorrscan/utils/pre_processing.py @@ -12,6 +12,7 @@ import numpy as np import logging import datetime as dt +import copy from collections import Counter, defaultdict from multiprocessing import Pool, cpu_count @@ -728,6 +729,77 @@ def _fill_gaps(tr): return gaps, tr +def _quick_copy_trace(trace, deepcopy_data=True): + """ + Function to quickly copy a trace. Sets values in the traces' and trace + header's dict directly, circumventing obspy's init functions. + Speedup: from 37 us to 12 us per trace - 3x faster + + :type trace: :class:`obspy.core.trace.Trace` + :param trace: Stream to quickly copy + :type deepcopy_data: bool + :param deepcopy_data: + Whether to deepcopy trace data (with `deepcopy_data=False` expect up to + 20 % speedup, but use only when you know that data trace contents will + not change or affect results). Warning: do not use this option to copy + traces with processing history or response information. + :rtype: :class:`obspy.core.trace.Trace` + return: trace + """ + new_trace = Trace() + for key, value in trace.__dict__.items(): + if key == 'stats': + new_stats = new_trace.stats + for key_2, value_2 in value.__dict__.items(): + if isinstance(value_2, UTCDateTime): + new_stats.__dict__[key_2] = UTCDateTime( + ns=value_2.__dict__['_UTCDateTime__ns']) + else: + new_stats.__dict__[key_2] = value_2 + elif deepcopy_data: + # data needs to be deepcopied (and anything else, to be safe) + new_trace.__dict__[key] = copy.deepcopy(value) + else: # No deepcopy, e.g. for NaN-traces with no effect on results + new_trace.__dict__[key] = value + return new_trace + + +def _quick_copy_stream(stream, deepcopy_data=True): + """ + Function to quickly copy a stream. + Speedup for simple trace: + from 112 us to 44 (35) us per 3-trace stream - 2.8x (3.2x) faster + + Warning: use `deepcopy_data=False` (saves extra ~20 % time) only when the + changing the data in the stream later does not change results + (e.g., for NaN-trace or when data array will not be changed). + + This is what takes longest (1 empty trace, total time to copy 27 us): + copy header: 18 us (vs create new empty header: 683 ns) + Two points that can speed up copying / creation: + 1. circumvent trace.__init__ and trace.__set_attr__ by setting value + directly in trace's __dict__ + 2. when setting trace header, circumvent that Stats(header) is called + when header is already a Stats instance + + :type stream: :class:`obspy.core.stream.Stream` + :param stream: Stream to quickly copy + :type deepcopy_data: bool + :param deepcopy_data: + Whether to deepcopy data (with `deepcopy_data=False` expect up to 20 % + speedup, but use only when you know that data trace contents will not + change or affect results). + + :rtype: :class:`obspy.core.stream.Stream` + return: stream + """ + new_traces = list() + for trace in stream: + new_traces.append( + _quick_copy_trace(trace, deepcopy_data=deepcopy_data)) + return Stream(new_traces) + + def _stream_quick_select(stream, seed_id): """ 4x quicker selection of traces in stream by full Seed-ID. Does not support @@ -849,12 +921,13 @@ def _prep_data_for_correlation(stream, templates, template_names=None, # Initialize nan template for speed. nan_channel = np.full(template_length, np.nan, dtype=np.float32) + nan_channel = np.require(nan_channel, requirements=['C_CONTIGUOUS']) nan_template = Stream() for _seed_id in seed_ids: net, sta, loc, chan = _seed_id[0].split('.') nan_template += Trace(header=Stats({ 'network': net, 'station': sta, 'location': loc, - 'channel': chan, 'starttime': UTCDateTime(), + 'channel': chan, 'starttime': UTCDateTime(ns=0), 'npts': template_length, 'sampling_rate': samp_rate})) # Remove templates with no matching channels @@ -889,8 +962,8 @@ def _prep_data_for_correlation(stream, templates, template_names=None, net, sta, loc, chan = earliest_templ_trace_id.split('.') nan_template += Trace(header=Stats({ 'network': net, 'station': sta, 'location': loc, - 'channel': chan, 'starttime': UTCDateTime(), - 'npts': template_length, 'sampling_rate': samp_rate})) + 'channel': chan, 'starttime': UTCDateTime(ns=0), + 'sampling_rate': samp_rate})) stream_nan_data = np.full( stream_length, np.nan, dtype=np.float32) out_stream += Trace( @@ -909,7 +982,7 @@ def _prep_data_for_correlation(stream, templates, template_names=None, for template_name in incomplete_templates: template = _out[template_name] template_starttime = min(tr.stats.starttime for tr in template) - out_template = nan_template.copy() + out_template = _quick_copy_stream(nan_template, deepcopy_data=False) # Select traces very quickly: assume that trace order does not change, # make dict of trace-ids and list of indices and use indices to select @@ -925,9 +998,17 @@ def _prep_data_for_correlation(stream, templates, template_names=None, template_channel = Stream([ template.traces[idx] for idx in stream_trace_id_dict[seed_id]]) if len(template_channel) <= channel_index: - out_template[channel_number].data = nan_channel - out_template[channel_number].stats.starttime = \ + # out_template[channel_number].data = nan_channel # quicker: + out_template[channel_number].__dict__['data'] = copy.deepcopy( + nan_channel) + out_template[channel_number].stats.__dict__['npts'] = \ + template_length + out_template[channel_number].stats.__dict__['starttime'] = \ template_starttime + out_template[channel_number].stats.__dict__['endtime'] = \ + UTCDateTime(ns=int( + round(template_starttime.ns + + (template_length * samp_rate) * 1e9))) else: out_template[channel_number] = template_channel[channel_index] # If a template-trace matches a NaN-trace in the stream , then set