diff --git a/CHANGES.md b/CHANGES.md index e49cdf69e..6503ac785 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,4 +1,9 @@ ## Current +- core.match_filter.tribe + - Add option to set minimum number of stations required to use a template in detect + (`min_stations` kwarg) + +## 0.5.0 * core.match_filter.tribe - Significant re-write of detect logic to take advantage of parallel steps (see #544) - Significant re-structure of hidden functions. diff --git a/eqcorrscan/core/match_filter/helpers/processes.py b/eqcorrscan/core/match_filter/helpers/processes.py index 7a85945c0..7eb44e52b 100644 --- a/eqcorrscan/core/match_filter/helpers/processes.py +++ b/eqcorrscan/core/match_filter/helpers/processes.py @@ -388,14 +388,18 @@ def _pre_processor( break Logger.debug("Getting stream from queue") st = _get_and_check(input_stream_queue, poison_queue) + Logger.warning(st) if st is None: Logger.info("Ran out of streams, stopping processing") break elif isinstance(st, Poison): Logger.error("Killed") break - if len(st) == 0: - break + # if len(st) == 0: + # Logger.error("Empty stream provided") + # poison_queue.put_nowait(Poison(IndexError + # ("No matching channels between stream and "))) + # break Logger.info(f"Processing stream:\n{st}") # Process stream @@ -445,6 +449,7 @@ def _prepper( output_queue: Queue, poison_queue: Queue, xcorr_func: str = None, + min_stations: int = 0, ): """ Prepare templates and stream for correlation. @@ -472,6 +477,8 @@ def _prepper( Queue to check for poison, or put poison into if something goes awry :param xcorr_func: Name of correlation function backend to be used. + :param min_stations: + Minimum number of stations to run a template. 
""" if isinstance(templates, dict): # We have been passed a db of template files on disk @@ -525,16 +532,25 @@ def _prepper( f"Multiple channels in continuous data for " f"{', '.join(_duplicate_sids)}"))) break + template_sids = {tr.id for template in templates for tr in template.st} + if len(template_sids.intersection(st_sids)) == 0: + poison_queue.put_nowait(Poison( + IndexError("No matching channels between templates and data"))) # Do the grouping for this stream Logger.info(f"Grouping {len(templates)} templates into groups " f"of {group_size} templates") try: template_groups = _group(sids=st_sids, templates=templates, - group_size=group_size, groups=groups) + group_size=group_size, groups=groups, + min_stations=min_stations) except Exception as e: Logger.error(e) poison_queue.put_nowait(Poison(e)) break + if template_groups is None: + output_queue.put_nowait(None) + return + Logger.info(f"Grouped into {len(template_groups)} groups") for i, template_group in enumerate(template_groups): killed = _check_for_poison(poison_queue) diff --git a/eqcorrscan/core/match_filter/helpers/tribe.py b/eqcorrscan/core/match_filter/helpers/tribe.py index 9786a68fc..2b6ff2a1a 100644 --- a/eqcorrscan/core/match_filter/helpers/tribe.py +++ b/eqcorrscan/core/match_filter/helpers/tribe.py @@ -17,7 +17,7 @@ import numpy as np from collections import defaultdict -from typing import List, Set +from typing import List, Set, Union from timeit import default_timer from concurrent.futures import ThreadPoolExecutor @@ -261,8 +261,9 @@ def _group( sids: Set[str], templates: List[Template], group_size: int, - groups: List[List[str]] = None -) -> List[List[Template]]: + groups: List[List[str]] = None, + min_stations: int = 0, +) -> Union[List[List[Template]], None]: """ Group templates either by seed id, or using pre-computed groups @@ -270,27 +271,37 @@ def _group( :param templates: Templates to group :param group_size: Maximum group size :param groups: [Optional] List of List of template names 
in groups - :return: Groups of templates. + :param min_stations: Minimum number of stations to run a template. + + :return: Groups of templates. None if no templates meet criteria """ Logger.info(f"Grouping for {sids}") if groups: Logger.info("Using pre-computed groups") t_dict = {t.name: t for t in templates} + stations = {sid.split('.')[1] for sid in sids} template_groups = [] for grp in groups: template_group = [ t_dict.get(t_name) for t_name in grp - if t_name in t_dict.keys()] + if t_name in t_dict.keys() and + len({tr.stats.station for tr in + t_dict.get(t_name).st}.intersection(stations) + ) >= max(1, min_stations)] + Logger.info(f"Dropping {len(grp) - len(template_group)} templates " + f"due to fewer than {min_stations} matched stations") if len(template_group): template_groups.append(template_group) return template_groups template_groups = group_templates_by_seedid( templates=templates, st_seed_ids=sids, - group_size=group_size) + group_size=group_size, + min_stations=min_stations) if len(template_groups) == 1 and len(template_groups[0]) == 0: Logger.error("No matching ids between stream and templates") - raise IndexError("No matching ids between stream and templates") + return None + # raise IndexError("No matching ids between stream and templates") return template_groups diff --git a/eqcorrscan/core/match_filter/template.py b/eqcorrscan/core/match_filter/template.py index 599d27ed3..6bdf5c4e3 100644 --- a/eqcorrscan/core/match_filter/template.py +++ b/eqcorrscan/core/match_filter/template.py @@ -761,6 +761,7 @@ def group_templates_by_seedid( templates: List[Template], st_seed_ids: Set[str], group_size: int, + min_stations: int = 0, ) -> List[List[Template]]: """ Group templates to reduce dissimilar traces @@ -771,6 +772,9 @@ def group_templates_by_seedid( Seed ids in the stream to be matched with :param group_size: Maximum group size - will not exceed this size + :param min_stations: + Minimum number of overlapping stations between template + and stream 
to use the template for detection. :return: List of lists of templates grouped. @@ -788,9 +792,9 @@ def group_templates_by_seedid( # Don't use templates that don't have any overlap with the stream template_seed_ids = tuple( (t_name, t_chans) for t_name, t_chans in template_seed_ids - if len(t_chans)) + if len({sid.split('.')[1] for sid in t_chans}) >= max(1, min_stations)) Logger.info(f"Dropping {len(templates) - len(template_seed_ids)} " - f"templates due to no matched channels") + f"templates due to fewer than {min_stations} matched channels") # We will need this dictionary at the end for getting the templates by id template_dict = {t.name: t for t in templates} # group_size can be None, in which case we don't actually need to group diff --git a/eqcorrscan/core/match_filter/tribe.py b/eqcorrscan/core/match_filter/tribe.py index e415d1897..60d2e8b3e 100644 --- a/eqcorrscan/core/match_filter/tribe.py +++ b/eqcorrscan/core/match_filter/tribe.py @@ -657,7 +657,7 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False, concurrent_processing=False, ignore_length=False, ignore_bad_data=False, group_size=None, overlap="calculate", full_peaks=False, save_progress=False, process_cores=None, - pre_processed=False, check_processing=True, + pre_processed=False, check_processing=True, min_stations=0, **kwargs): """ Detect using a Tribe of templates within a continuous stream. @@ -749,6 +749,10 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False, :type check_processing: bool :param check_processing: Whether to check that all templates were processed the same. + :type min_stations: int + :param min_stations: + Minimum number of overlapping stations between template + and stream to use the template for detection. 
:return: :class:`eqcorrscan.core.match_filter.Party` of Families of @@ -871,7 +875,7 @@ def detect(self, stream, threshold, threshold_type, trig_int, plot=False, ignore_bad_data, group_size, groups, sampling_rate, threshold, threshold_type, save_progress, xcorr_func, concurrency, cores, export_cccsums, parallel, peak_cores, trig_int, full_peaks, - plot, plotdir, plot_format,) + plot, plotdir, plot_format, min_stations,) if concurrent_processing: party = self._detect_concurrent(*args, **inner_kwargs) @@ -899,7 +903,7 @@ def _detect_serial( group_size, groups, sampling_rate, threshold, threshold_type, save_progress, xcorr_func, concurrency, cores, export_cccsums, parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format, - **kwargs + min_stations, **kwargs ): """ Internal serial detect workflow. """ from eqcorrscan.core.match_filter.helpers.tribe import ( @@ -929,7 +933,10 @@ def _detect_serial( delta = st_chunk[0].stats.delta template_groups = _group( sids={tr.id for tr in st_chunk}, - templates=self.templates, group_size=group_size, groups=groups) + templates=self.templates, group_size=group_size, groups=groups, + min_stations=min_stations) + if template_groups is None: + continue for i, template_group in enumerate(template_groups): templates = [_quick_copy_stream(t.st) for t in template_group] template_names = [t.name for t in template_group] @@ -986,7 +993,7 @@ def _detect_concurrent( group_size, groups, sampling_rate, threshold, threshold_type, save_progress, xcorr_func, concurrency, cores, export_cccsums, parallel, peak_cores, trig_int, full_peaks, plot, plotdir, plot_format, - **kwargs + min_stations, **kwargs ): """ Internal concurrent detect workflow. 
""" from eqcorrscan.core.match_filter.helpers.processes import ( @@ -1062,6 +1069,7 @@ def _detect_concurrent( output_queue=prepped_queue, poison_queue=poison_queue, xcorr_func=xcorr_func, + min_stations=min_stations, ), name="PrepProcess" ) @@ -1224,7 +1232,7 @@ def client_detect(self, client, starttime, endtime, threshold, ignore_bad_data=False, group_size=None, return_stream=False, full_peaks=False, save_progress=False, process_cores=None, retries=3, - check_processing=True, **kwargs): + check_processing=True, min_stations=0, **kwargs): """ Detect using a Tribe of templates within a continuous stream. @@ -1316,6 +1324,10 @@ def client_detect(self, client, starttime, endtime, threshold, :param retries: Number of attempts allowed for downloading - allows for transient server issues. + :type min_stations: int + :param min_stations: + Minimum number of overlapping stations between template + and stream to use the template for detection. :return: :class:`eqcorrscan.core.match_filter.Party` of Families of @@ -1425,7 +1437,8 @@ def client_detect(self, client, starttime, endtime, threshold, process_cores=process_cores, save_progress=save_progress, return_stream=return_stream, check_processing=False, poison_queue=poison_queue, shutdown=False, - concurrent_processing=concurrent_processing, groups=groups) + concurrent_processing=concurrent_processing, groups=groups, + min_stations=min_stations) if not concurrent_processing: Logger.warning("Using concurrent_processing=True can be faster if" diff --git a/eqcorrscan/tests/matched_filter/match_filter_test.py b/eqcorrscan/tests/matched_filter/match_filter_test.py index ff121c242..76180f897 100644 --- a/eqcorrscan/tests/matched_filter/match_filter_test.py +++ b/eqcorrscan/tests/matched_filter/match_filter_test.py @@ -729,6 +729,52 @@ def test_tribe_detect(self): party=party, party_in=self.party, float_tol=0.05, check_event=True) + def test_min_stations(self): + """ + Check that minimum stations is respected and templates + 
without sufficient stations are not run. + """ + local_tribe = self.tribe.copy() + # Drop some channels + local_tribe[0].st = local_tribe[0].st[0:-2] + for conc_proc in [False, True]: + party = local_tribe.detect( + stream=self.unproc_st, threshold=8.0, threshold_type='MAD', + trig_int=6.0, daylong=False, plot=False, min_stations=5, + parallel_process=False, concurrent_processing=conc_proc) + self.assertEqual(len(party), 3) + + @pytest.mark.network + def test_min_stations_network(self): + """ + Check that minimum stations is respected and templates + without sufficient stations are not run. + """ + local_tribe = self.tribe.copy() + # Drop some channels + local_tribe[0].st = local_tribe[0].st[0:-2] + for conc_proc in [False, True]: + party = local_tribe.client_detect( + client=Client('NCEDC'), starttime=UTCDateTime(2004, 9, 28, 17), + endtime=UTCDateTime(2004, 9, 28, 18), + threshold=8.0, threshold_type='MAD', + trig_int=6.0, daylong=False, plot=False, min_stations=5, + parallel_process=False, concurrent_processing=conc_proc) + self.assertEqual(len(party), 3) + + def test_no_stations(self): + """ + Check that minimum stations is respected and templates + without sufficient stations are not run. + """ + local_tribe = self.tribe.copy() + for conc_proc in [False, True]: + party = local_tribe.detect( + stream=self.unproc_st, threshold=8.0, threshold_type='MAD', + trig_int=6.0, daylong=False, plot=False, min_stations=6, + parallel_process=False, concurrent_processing=conc_proc) + self.assertEqual(len(party), 0) + def test_tribe_detect_with_empty_streams(self): """ Compare the detect method for a tribe of one vs two templates and check