From 36017b241b9a6349eaae9b06f7da155cb157272a Mon Sep 17 00:00:00 2001
From: flixha
Date: Tue, 22 Nov 2022 09:42:26 +0100
Subject: [PATCH 1/7] add option to keep streams in shared memory for hypodd
 differential time correlation workers

---
 eqcorrscan/utils/catalog_to_dd.py | 78 +++++++++++++++++++++++++++++--
 1 file changed, 73 insertions(+), 5 deletions(-)

diff --git a/eqcorrscan/utils/catalog_to_dd.py b/eqcorrscan/utils/catalog_to_dd.py
index 263a9d2ab..60efac05a 100644
--- a/eqcorrscan/utils/catalog_to_dd.py
+++ b/eqcorrscan/utils/catalog_to_dd.py
@@ -11,8 +11,8 @@
 import numpy as np
 import logging
 from collections import namedtuple, defaultdict, Counter
-from obspy.core import stream
-from multiprocessing import cpu_count, Pool
+from multiprocessing import cpu_count, Pool, shared_memory
+import uuid
 
 from obspy import UTCDateTime, Stream
 from obspy.core.event import (
@@ -221,14 +221,27 @@ def _prepare_stream(stream, event, extract_len, pre_pick, seed_pick_ids=None):
 
 def _compute_dt_correlations(catalog, master, min_link, event_id_mapper,
                              stream_dict, min_cc, extract_len, pre_pick,
-                             shift_len, interpolate, max_workers=1, **kwargs):
+                             shift_len, interpolate, max_workers=1,
+                             shm_data_shape=None, shm_dtype=None, **kwargs):
     """ Compute cross-correlation delay times. """
     max_workers = max_workers or 1
     Logger.info(
        f"Correlating {str(master.resource_id)} with {len(catalog)} events")
     differential_times_dict = dict()
+    # Assign trace data from shared memory
+    for (key, stream) in stream_dict.items():
+        for tr in stream:
+            if len(tr.data) == 0 and hasattr(tr, 'shared_memory_name'):
+                shm = shared_memory.SharedMemory(name=tr.shared_memory_name)
+                # Reconstructing numpy data array
+                sm_data = np.ndarray(
+                    shm_data_shape, dtype=shm_dtype, buffer=shm.buf)
+                tr.data = np.zeros_like(sm_data)
+                # Copy data into process memory
+                tr.data[:] = sm_data[:]
+
     master_stream = _prepare_stream(
-        stream=stream_dict[str(master.resource_id)], event=master,
+        stream=stream=stream_dict[str(master.resource_id)], event=master,
         extract_len=extract_len, pre_pick=pre_pick)
     available_seed_ids = {tr.id for st in master_stream.values() for tr in st}
     Logger.debug(f"The channels provided are: {available_seed_ids}")
@@ -465,12 +478,47 @@ def _prep_horiz_picks(catalog, stream_dict, event_id_mapper):
     return catalog
 
 
+def stream_dict_to_shared_mem(stream_dict):
+    """
+    """
+    shm_name_list = []
+    shm_data_shapes = []
+    shm_data_dtypes = []
+    for (key, stream) in stream_dict.items():
+        for tr in stream:
+            data_array = tr.data
+            # Create valid filename for shared memory from resource ID and trac
+            shm_name = str(key) + tr.id + str(tr.stats.starttime)
+            shm_name = shm_name.replace('/', '_').replace(':', '+')
+            # make the name filename unique
+            shm_name = shm_name + '_' + str(uuid.uuid4())
+            shm = shared_memory.SharedMemory(
+                name=shm_name, create=True, size=data_array.nbytes)
+            shm_name_list.append(shm_name)
+            # Now create a NumPy array backed by shared memory
+            shm_data_shape = data_array.shape
+            shm_data_dtype = data_array.dtype
+            shared_data_array = np.ndarray(
+                shm_data_shape, dtype=shm_data_dtype, buffer=shm.buf)
+            # Copy the original data into shared memory
+            shared_data_array[:] = data_array[:]
+            # tr.data = shared_data_array
+            tr.data = np.array([])
+            tr.shared_memory_name = shm_name
+            shm_data_shapes.append(shm_data_shape)
+            shm_data_dtypes.append(shm_data_dtype)
+    shm_data_shapes = list(set(shm_data_shapes))
+    shm_data_dtypes = list(set(shm_data_dtypes))
+    return stream_dict, shm_name_list, shm_data_shapes, shm_data_dtypes
+
+
 def compute_differential_times(catalog, correlation, stream_dict=None,
                                event_id_mapper=None, max_sep=8., min_link=8,
                                min_cc=None, extract_len=None, pre_pick=None,
                                shift_len=None, interpolate=False,
                                all_horiz=False, max_workers=None,
-                               max_trace_workers=1, *args, **kwargs):
+                               max_trace_workers=1, use_shared_memory=False,
+                               *args, **kwargs):
     """
     Generate groups of differential times for a catalog.
 
@@ -587,6 +635,18 @@ def compute_differential_times(catalog, correlation, stream_dict=None,
             sub_catalogs = ([ev for i, ev in enumerate(sparse_catalog)
                              if master_filter[i]]
                             for master_filter in distance_filter)
+            # Move trace data into shared memory
+            if use_shared_memory:
+                shm_stream_dict, shm_name_list, shm_data_shapes, shm_dtypes = (
+                    stream_dict_to_shared_mem(stream_dict))
+                if len(shm_data_shapes) == 1 and len(shm_dtypes) == 1:
+                    shm_data_shape = shm_data_shapes[0]
+                    shm_dtype = shm_dtypes[0]
+                    additional_args.update({'stream_dict': shm_stream_dict})
+                    additional_args.update({'shm_data_shape': shm_data_shape})
+                    additional_args.update({'shm_dtype': shm_dtype})
+                else:
+                    use_shared_memory = False
             with pool_boy(Pool, n, cores=max_workers) as pool:
                 # Parallelize over events instead of traces
                 additional_args.update(dict(max_workers=1))
@@ -598,11 +658,19 @@ def compute_differential_times(catalog, correlation, stream_dict=None,
                         sparse_catalog)
                     if str(master.resource_id) in additional_args[
                         "stream_dict"].keys()]
+                Logger.info('Submitted asynchronous jobs to workers.')
                 differential_times = {
                     master.resource_id: result.get()
                     for master, result in zip(sparse_catalog, results)
                     if str(master.resource_id) in additional_args[
                         "stream_dict"].keys()}
+                Logger.debug('Got results from workers.')
+                # Destroy shared memory
+                if use_shared_memory:
+                    for shm_name in shm_name_list:
+                        shm = shared_memory.SharedMemory(name=shm_name)
+                        shm.close()
+                        shm.unlink()
             else:
                 sub_catalogs = ([ev for i, ev in enumerate(sparse_catalog)
                                  if master_filter[i]]
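The round trip this patch implements can be reduced to a self-contained sketch (illustrative array and variable names, not EQcorrscan code). It mirrors what `stream_dict_to_shared_mem` does on the parent side and what `_compute_dt_correlations` does in each worker: back the array with a named block, re-attach by name, rebuild the ndarray view from a separately transmitted shape and dtype, and copy into process-local memory before closing the handle.

```python
import numpy as np
from multiprocessing import shared_memory

# Illustrative trace data; in the patch this is tr.data for each trace.
data = np.arange(3000, dtype=np.float64)

# Parent side (stream_dict_to_shared_mem): back a named block with the array.
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared_view = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared_view[:] = data[:]  # copy the trace data into shared memory

# Worker side (_compute_dt_correlations): re-attach by name, rebuild a view
# from the shape/dtype passed as extra arguments, copy into local memory.
worker_shm = shared_memory.SharedMemory(name=shm.name)
sm_data = np.ndarray(data.shape, dtype=data.dtype, buffer=worker_shm.buf)
local_data = np.zeros_like(sm_data)
local_data[:] = sm_data[:]
del sm_data         # drop the view before closing the underlying buffer
worker_shm.close()  # the worker only closes its own handle

# Parent releases the block once all workers are done, as the new
# "Destroy shared memory" section in compute_differential_times does.
del shared_view
shm.close()
shm.unlink()
```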
From 99fd05b986a138c08fe36f614482e27a31af4c37 Mon Sep 17 00:00:00 2001
From: flixha
Date: Tue, 22 Nov 2022 09:45:05 +0100
Subject: [PATCH 2/7] fix typo

---
 eqcorrscan/utils/catalog_to_dd.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/eqcorrscan/utils/catalog_to_dd.py b/eqcorrscan/utils/catalog_to_dd.py
index 60efac05a..ecd8fd196 100644
--- a/eqcorrscan/utils/catalog_to_dd.py
+++ b/eqcorrscan/utils/catalog_to_dd.py
@@ -241,7 +241,7 @@ def _compute_dt_correlations(catalog, master, min_link, event_id_mapper,
                 tr.data[:] = sm_data[:]
 
     master_stream = _prepare_stream(
-        stream=stream=stream_dict[str(master.resource_id)], event=master,
+        stream=stream_dict[str(master.resource_id)], event=master,
         extract_len=extract_len, pre_pick=pre_pick)
     available_seed_ids = {tr.id for st in master_stream.values() for tr in st}
     Logger.debug(f"The channels provided are: {available_seed_ids}")
From 233e53d6fc8689050edef8abdfb38d21b67754c7 Mon Sep 17 00:00:00 2001
From: flixha
Date: Mon, 12 Dec 2022 12:05:06 +0100
Subject: [PATCH 3/7] add more documentation for shared memory use

---
 eqcorrscan/utils/catalog_to_dd.py | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/eqcorrscan/utils/catalog_to_dd.py b/eqcorrscan/utils/catalog_to_dd.py
index ecd8fd196..36ebd5f95 100644
--- a/eqcorrscan/utils/catalog_to_dd.py
+++ b/eqcorrscan/utils/catalog_to_dd.py
@@ -480,6 +480,27 @@ def _prep_horiz_picks(catalog, stream_dict, event_id_mapper):
 
 def stream_dict_to_shared_mem(stream_dict):
     """
+    Move the data of streams from a dict of (key, obspy.stream) into shared
+    memory so that the data can be retrieved by multiple processes in parallel.
+    This can help speed up parallel execution because the initiation of each
+    worker process becomes cheaper (less data to transfer). For now this only
+    puts the numpy array in trace.data into shared memory (because it's easy).
+
+    :type stream_dict: dict of (key, `obspy.stream`)
+    :param stream_dict: dict of streams that should be moved to shared memory
+
+    :returns: stream_dict, shm_name_list, shm_data_shapes, shm_data_dtypes
+
+    :rtype: dict
+    :return: Dictionary of streams that were moved to shared memory.
+    :rtype: list
+    :return: List of names of the shared memory blocks, one for each trace.
+    :rtype: list
+    :return:
+        List of numpy-array shapes for each trace-data array in shared memory.
+    :rtype: list
+    :return: List of data types for each trace-data array in shared memory.
+
     """
     shm_name_list = []
     shm_data_shapes = []
@@ -565,6 +586,11 @@ def compute_differential_times(catalog, correlation, stream_dict=None,
         Maximum number of workers for parallel correlation of traces instead
         of events. If None then all threads will be used (but can only be
         used when max_workers = 1).
+    :type use_shared_memory: bool
+    :param use_shared_memory:
+        Whether to move trace data arrays into shared memory for computing
+        trace correlations. Can speed up total execution time by ~20 % for
+        hypodd-correlations with a lot of clustered seismicity.
 
     :rtype: dict
     :return: Dictionary of differential times keyed by event id.
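With the documentation in place, a call might look like the sketch below. The input files are hypothetical, the keyword values mirror the test added in the next patch, and the two-value unpacking assumes the event-id mapper is returned alongside the differential-time dictionary described in the docstring.

```python
from obspy import read, read_events
from eqcorrscan.utils.catalog_to_dd import compute_differential_times

# Hypothetical inputs: one waveform Stream per event in the catalog.
catalog = read_events("my_catalog.xml")              # illustrative path
streams = [read(f"event_{i}.ms") for i in range(len(catalog))]

stream_dict = {event.resource_id.id: stream
               for event, stream in zip(catalog, streams)}

diff_times, event_id_mapper = compute_differential_times(
    catalog=catalog, correlation=True, stream_dict=stream_dict,
    event_id_mapper=None, max_sep=8., min_link=0, min_cc=0.0,
    extract_len=2.0, pre_pick=0.5, shift_len=2, interpolate=False,
    max_workers=2, use_shared_memory=True)
```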
From 17cdb6bb1e89990b1319afa08aaaf16c645b2794 Mon Sep 17 00:00:00 2001
From: flixha
Date: Mon, 12 Dec 2022 12:21:24 +0100
Subject: [PATCH 4/7] add test for shared memory and changelog entry

---
 CHANGES.md                             |  3 +++
 eqcorrscan/tests/catalog_to_dd_test.py | 15 +++++++++++++++
 2 files changed, 18 insertions(+)

diff --git a/CHANGES.md b/CHANGES.md
index 024c576b4..3f6a4f5a1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,4 +1,7 @@
 ## Current
+* utils.catalog_to_dd.write_correlations
+  - New option `use_shared_memory` to speed up correlation of many events by
+    ca. 20 % by moving trace data into shared memory.
 
 ## 0.4.4
 * core.match_filter
diff --git a/eqcorrscan/tests/catalog_to_dd_test.py b/eqcorrscan/tests/catalog_to_dd_test.py
index bee613895..7169c7f7b 100644
--- a/eqcorrscan/tests/catalog_to_dd_test.py
+++ b/eqcorrscan/tests/catalog_to_dd_test.py
@@ -306,6 +306,21 @@ def test_write_correlations_parallel_process(self):
         self.assertTrue(os.path.isfile("dt.cc"))
         os.remove('dt.cc')
 
+    def test_write_correlations_parallel_shared_memory(self):
+        # Contents checked elsewhere
+        shift_len = 2
+        short_cat = self.catalog[0:10]
+        stream_dict = {event.resource_id.id: stream
+                       for event, stream in zip(short_cat, self.streams)}
+        write_correlations(
+            catalog=short_cat, event_id_mapper=None,
+            max_sep=8., min_link=0, min_cc=0.0, stream_dict=stream_dict,
+            extract_len=2.0, pre_pick=0.5, shift_len=shift_len,
+            interpolate=False, parallel_process=True, max_workers=2,
+            use_shared_memory=True)
+        self.assertTrue(os.path.isfile("dt.cc"))
+        os.remove('dt.cc')
+
     def test_write_correlations_parallel_trace_correlation(self):
         # Contents checked elsewhere
         shift_len = 2

From def7656af8182d04b90817e6f1f2f75208cd77f8 Mon Sep 17 00:00:00 2001
From: Felix
Date: Wed, 4 Jan 2023 09:28:13 +0100
Subject: [PATCH 5/7] let multiprocessing figure out shared memory filenames
 to work on MacOS

---
 eqcorrscan/utils/catalog_to_dd.py | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/eqcorrscan/utils/catalog_to_dd.py b/eqcorrscan/utils/catalog_to_dd.py
index 87e008097..cb23d639d 100644
--- a/eqcorrscan/utils/catalog_to_dd.py
+++ b/eqcorrscan/utils/catalog_to_dd.py
@@ -520,14 +520,10 @@ def stream_dict_to_shared_mem(stream_dict):
     for (key, stream) in stream_dict.items():
         for tr in stream:
             data_array = tr.data
-            # Create valid filename for shared memory from resource ID and trac
-            shm_name = str(key) + tr.id + str(tr.stats.starttime)
-            shm_name = shm_name.replace('/', '_').replace(':', '+')
-            # make the name filename unique
-            shm_name = shm_name + '_' + str(uuid.uuid4())
+            # Let SharedMemory create suitable filename itself:
             shm = shared_memory.SharedMemory(
-                name=shm_name, create=True, size=data_array.nbytes)
-            shm_name_list.append(shm_name)
+                create=True, size=data_array.nbytes)
+            shm_name_list.append(shm.name)
             # Now create a NumPy array backed by shared memory
             shm_data_shape = data_array.shape
             shm_data_dtype = data_array.dtype
@@ -537,7 +533,7 @@ def stream_dict_to_shared_mem(stream_dict):
             shared_data_array[:] = data_array[:]
             # tr.data = shared_data_array
             tr.data = np.array([])
-            tr.shared_memory_name = shm_name
+            tr.shared_memory_name = shm.name
             shm_data_shapes.append(shm_data_shape)
             shm_data_dtypes.append(shm_data_dtype)
     shm_data_shapes = list(set(shm_data_shapes))

From 694f0af113a0edd2b39edfa75d5055a01d6480c2 Mon Sep 17 00:00:00 2001
From: Felix
Date: Wed, 4 Jan 2023 12:58:07 +0100
Subject: [PATCH 6/7] remove unused import

---
 eqcorrscan/utils/catalog_to_dd.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/eqcorrscan/utils/catalog_to_dd.py b/eqcorrscan/utils/catalog_to_dd.py
index cb23d639d..9227f14f2 100644
--- a/eqcorrscan/utils/catalog_to_dd.py
+++ b/eqcorrscan/utils/catalog_to_dd.py
@@ -12,7 +12,6 @@
 import logging
 from collections import namedtuple, defaultdict, Counter
 from multiprocessing import cpu_count, Pool, shared_memory
-import uuid
 
 from obspy import UTCDateTime, Stream
 from obspy.core.event import (
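Patch 5 matters because POSIX shared-memory names are length-limited (macOS rejects names beyond roughly 31 characters), and a name assembled from resource ID, trace ID and start time easily exceeds that. Letting `SharedMemory` generate the name keeps it short and unique, and patch 6 then drops the `uuid` import the change left unused. A minimal illustration (the printed name is only an example; the format varies by platform):

```python
from multiprocessing import shared_memory

# Auto-generated names are short and unique on every platform, unlike the
# long hand-built names that can fail shm_open on macOS.
shm = shared_memory.SharedMemory(create=True, size=1024)
print(shm.name)  # e.g. 'psm_d38f5c68'
shm.close()
shm.unlink()
```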
From a26d792d934283724718f908d631faa3dd1e6690 Mon Sep 17 00:00:00 2001
From: Felix
Date: Thu, 16 Mar 2023 10:04:06 +0100
Subject: [PATCH 7/7] experimental commit: return references to shared memory
 arrays in list so Windows does not destroy them

---
 eqcorrscan/utils/catalog_to_dd.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/eqcorrscan/utils/catalog_to_dd.py b/eqcorrscan/utils/catalog_to_dd.py
index 9227f14f2..ef3d447af 100644
--- a/eqcorrscan/utils/catalog_to_dd.py
+++ b/eqcorrscan/utils/catalog_to_dd.py
@@ -516,6 +516,7 @@ def stream_dict_to_shared_mem(stream_dict):
     shm_name_list = []
     shm_data_shapes = []
     shm_data_dtypes = []
+    shm_references = []
     for (key, stream) in stream_dict.items():
         for tr in stream:
             data_array = tr.data
@@ -523,6 +524,7 @@ def stream_dict_to_shared_mem(stream_dict):
             shm = shared_memory.SharedMemory(
                 create=True, size=data_array.nbytes)
             shm_name_list.append(shm.name)
+            shm_references.append(shm)
             # Now create a NumPy array backed by shared memory
             shm_data_shape = data_array.shape
             shm_data_dtype = data_array.dtype
@@ -537,7 +539,8 @@ def stream_dict_to_shared_mem(stream_dict):
             shm_data_dtypes.append(shm_data_dtype)
     shm_data_shapes = list(set(shm_data_shapes))
     shm_data_dtypes = list(set(shm_data_dtypes))
-    return stream_dict, shm_name_list, shm_data_shapes, shm_data_dtypes
+    return (stream_dict, shm_name_list, shm_references, shm_data_shapes,
+            shm_data_dtypes)
 
 
 def compute_differential_times(catalog, correlation, stream_dict=None,
@@ -670,7 +673,8 @@ def compute_differential_times(catalog, correlation, stream_dict=None,
                             for master_filter in distance_filter)
             # Move trace data into shared memory
             if use_shared_memory:
-                shm_stream_dict, shm_name_list, shm_data_shapes, shm_dtypes = (
+                (shm_stream_dict, shm_name_list, shm_references,
+                 shm_data_shapes, shm_dtypes) = (
                     stream_dict_to_shared_mem(stream_dict))
                 if len(shm_data_shapes) == 1 and len(shm_dtypes) == 1:
                     shm_data_shape = shm_data_shapes[0]
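The Windows behaviour motivating this last patch: a shared-memory segment lives only as long as at least one handle to it is open, and Python's `SharedMemory` closes its handle when the object is garbage-collected. Keeping the objects referenced, as `shm_references` now does, stops the segments from vanishing between creation and use. A sketch of the failure mode (illustrative, not project code):

```python
from multiprocessing import shared_memory

def create_block(nbytes):
    shm = shared_memory.SharedMemory(create=True, size=nbytes)
    return shm.name  # the SharedMemory object is garbage-collected here

name = create_block(1024)
try:
    # POSIX: the named block persists until unlink(), so this succeeds.
    # Windows: the segment may already be gone, raising FileNotFoundError.
    block = shared_memory.SharedMemory(name=name)
    block.close()
    block.unlink()
except FileNotFoundError:
    pass

# The fix mirrored by the patch: keep creators referenced until cleanup.
shm_references = []
shm = shared_memory.SharedMemory(create=True, size=1024)
shm_references.append(shm)  # segment stays alive with this reference

# ... use the block in workers, then clean up explicitly:
for shm in shm_references:
    shm.close()
    shm.unlink()
```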