Speedup 09: Use shared memory for hypo-dd write_correlations for 20 % speedup #529

9 changes: 6 additions & 3 deletions CHANGES.md
@@ -15,11 +15,14 @@
* utils.correlate
- 25 % speedup for `_get_array_dicts` with quicker access to properties.
* utils.catalog_to_dd
- ._prepare_stream
- _prepare_stream
- Now more consistently slices templates to length = extract_len * samp_rate
  so that the user receives fewer warnings about insufficient data.
- Add ability to weight correlations by raw correlation rather than just
correlation squared.
- write_correlations
- New option `use_shared_memory` that moves trace data into shared memory to
  speed up correlation of many events by ca. 20 % (a minimal sketch of the
  shared-memory pattern follows below).
- Add ability to weight correlations by raw correlation rather than just
correlation squared.
* utils.cluster.decluster_distance_time
- Bug-fix: fix segmentation fault when declustering more than 46340 detections
with hypocentral_separation.
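
For context, the new `use_shared_memory` option builds on Python's multiprocessing.shared_memory module. Below is a minimal, illustrative sketch of that pattern (not taken from this diff; the array and variable names are made up): the parent copies a numpy array into a named block, a worker re-attaches to the block by name and copies the samples out, and the parent releases the block once all workers are done.

import numpy as np
from multiprocessing import shared_memory

# Parent process: copy an array into a named shared-memory block
data = np.random.randn(1000).astype(np.float32)
shm = shared_memory.SharedMemory(create=True, size=data.nbytes)
shared_view = np.ndarray(data.shape, dtype=data.dtype, buffer=shm.buf)
shared_view[:] = data[:]

# Worker process: re-attach by name (only the name, shape and dtype need to
# be sent to the worker, not the samples) and copy the data into local memory
attached = shared_memory.SharedMemory(name=shm.name)
local_copy = np.ndarray(data.shape, dtype=data.dtype, buffer=attached.buf).copy()
attached.close()

# Parent process: release the block once all workers have finished
shm.close()
shm.unlink()
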
15 changes: 15 additions & 0 deletions eqcorrscan/tests/catalog_to_dd_test.py
@@ -308,6 +308,21 @@ def test_write_correlations_parallel_process(self):
self.assertTrue(os.path.isfile("dt.cc"))
os.remove('dt.cc')

def test_write_correlations_parallel_shared_memory(self):
# Contents checked elsewhere
shift_len = 2
short_cat = self.catalog[0:10]
stream_dict = {event.resource_id.id: stream
for event, stream in zip(short_cat, self.streams)}
write_correlations(
catalog=short_cat, event_id_mapper=None,
max_sep=8., min_link=0, min_cc=0.0, stream_dict=stream_dict,
extract_len=2.0, pre_pick=0.5, shift_len=shift_len,
interpolate=False, parallel_process=True, max_workers=2,
use_shared_memory=True)
self.assertTrue(os.path.isfile("dt.cc"))
os.remove('dt.cc')

def test_write_correlations_parallel_trace_correlation(self):
# Contents checked elsewhere
shift_len = 2
100 changes: 96 additions & 4 deletions eqcorrscan/utils/catalog_to_dd.py
@@ -11,8 +11,7 @@
import numpy as np
import logging
from collections import namedtuple, defaultdict, Counter
from obspy.core import stream
from multiprocessing import cpu_count, Pool
from multiprocessing import cpu_count, Pool, shared_memory

from obspy import UTCDateTime, Stream
from obspy.core.event import (
@@ -234,12 +233,25 @@ def _prepare_stream(stream, event, extract_len, pre_pick, seed_pick_ids=None):
def _compute_dt_correlations(catalog, master, min_link, event_id_mapper,
stream_dict, min_cc, extract_len, pre_pick,
shift_len, interpolate, max_workers=1,
shm_data_shape=None, shm_dtype=None,
weight_by_square=True, **kwargs):
""" Compute cross-correlation delay times. """
max_workers = max_workers or 1
Logger.info(
f"Correlating {str(master.resource_id)} with {len(catalog)} events")
differential_times_dict = dict()
# Assign trace data from shared memory
for (key, stream) in stream_dict.items():
for tr in stream:
if len(tr.data) == 0 and hasattr(tr, 'shared_memory_name'):
shm = shared_memory.SharedMemory(name=tr.shared_memory_name)
# Reconstructing numpy data array
sm_data = np.ndarray(
shm_data_shape, dtype=shm_dtype, buffer=shm.buf)
tr.data = np.zeros_like(sm_data)
                # Copy data into process memory so the trace data stays
                # valid independent of the shared-memory mapping's lifetime
tr.data[:] = sm_data[:]

master_stream = _prepare_stream(
stream=stream_dict[str(master.resource_id)], event=master,
extract_len=extract_len, pre_pick=pre_pick)
@@ -481,13 +493,67 @@ def _prep_horiz_picks(catalog, stream_dict, event_id_mapper):
return catalog


def stream_dict_to_shared_mem(stream_dict):
"""
Move the data of streams from a dict of (key, obspy.stream) into shared
memory so that the data can be retrieved by multiple processes in parallel.
This can help speed up parallel execution because the initiation of each
worker process becomes cheaper (less data to transfer). For now this only
puts the numpy array in trace.data into shared memory (because it's easy).

:type stream_dict: dict of (key, `obspy.stream`)
:param stream_dict: dict of streams that should be moved to shared memory

:returns: stream_dict, shm_name_list, shm_data_shapes, shm_data_dtypes

:rtype: dict
:return: Dictionary streams that were moved to shared memory
:rtype: list
:return: List of names to the shared memory address for each trace.
:rtype: list
:return:
List of numpy-array shaped for each trace-data array in shared memory.
:rtype: list
:return: List of data types for each trace-data-array in shared memory.

"""
shm_name_list = []
shm_data_shapes = []
shm_data_dtypes = []
shm_references = []
for (key, stream) in stream_dict.items():
for tr in stream:
data_array = tr.data
            # Let SharedMemory create a suitable unique name itself:
shm = shared_memory.SharedMemory(
create=True, size=data_array.nbytes)
shm_name_list.append(shm.name)
shm_references.append(shm)
# Now create a NumPy array backed by shared memory
shm_data_shape = data_array.shape
shm_data_dtype = data_array.dtype
shared_data_array = np.ndarray(
shm_data_shape, dtype=shm_data_dtype, buffer=shm.buf)
# Copy the original data into shared memory
shared_data_array[:] = data_array[:]
# tr.data = shared_data_array
tr.data = np.array([])
tr.shared_memory_name = shm.name
shm_data_shapes.append(shm_data_shape)
shm_data_dtypes.append(shm_data_dtype)
shm_data_shapes = list(set(shm_data_shapes))
shm_data_dtypes = list(set(shm_data_dtypes))
return (stream_dict, shm_name_list, shm_references, shm_data_shapes,
shm_data_dtypes)
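
# Illustrative usage sketch of the helper above (not part of this diff; it
# assumes every trace shares a single dtype and shape, which is what the
# shared-memory path in compute_differential_times requires):
#
#   (stream_dict, shm_names, shm_refs, shm_shapes, shm_dtypes) = (
#       stream_dict_to_shared_mem(stream_dict))
#   # Each trace now has empty .data and a .shared_memory_name attribute.
#   # A worker re-attaches by that name and copies the samples back (this is
#   # what _compute_dt_correlations does):
#   for st in stream_dict.values():
#       for tr in st:
#           shm = shared_memory.SharedMemory(name=tr.shared_memory_name)
#           tr.data = np.ndarray(
#               shm_shapes[0], dtype=shm_dtypes[0], buffer=shm.buf).copy()
#           shm.close()
#   # Once all workers are finished, the parent releases the blocks:
#   for shm in shm_refs:
#       shm.close()
#       shm.unlink()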


def compute_differential_times(catalog, correlation, stream_dict=None,
event_id_mapper=None, max_sep=8., min_link=8,
min_cc=None, extract_len=None, pre_pick=None,
shift_len=None, interpolate=False,
all_horiz=False, max_workers=None,
max_trace_workers=1, weight_by_square=True,
*args, **kwargs):
max_trace_workers=1, use_shared_memory=False,
weight_by_square=True, *args, **kwargs):
"""
Generate groups of differential times for a catalog.

@@ -534,6 +600,11 @@ def compute_differential_times(catalog, correlation, stream_dict=None,
        Maximum number of workers for parallel correlation of traces instead of
events. If None then all threads will be used (but can only be used
when max_workers = 1).
:type use_shared_memory: bool
:param use_shared_memory:
Whether to move trace data arrays into shared memory for computing
        trace correlations. Can reduce total execution time by ~20 % for
        hypoDD correlations with a lot of clustered seismicity.
:type weight_by_square: bool
:param weight_by_square:
Whether to compute correlation weights as the square of the maximum
@@ -612,6 +683,19 @@ def compute_differential_times(catalog, correlation, stream_dict=None,
sub_catalogs = ([ev for i, ev in enumerate(sparse_catalog)
if master_filter[i]]
for master_filter in distance_filter)
# Move trace data into shared memory
if use_shared_memory:
(shm_stream_dict, shm_name_list, shm_references,
shm_data_shapes, shm_dtypes) = (
stream_dict_to_shared_mem(stream_dict))
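        # Keep shm_references in scope so the shared-memory blocks cannot be
        # garbage-collected while the workers may still need the trace data.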
if len(shm_data_shapes) == 1 and len(shm_dtypes) == 1:
shm_data_shape = shm_data_shapes[0]
shm_dtype = shm_dtypes[0]
additional_args.update({'stream_dict': shm_stream_dict})
additional_args.update({'shm_data_shape': shm_data_shape})
additional_args.update({'shm_dtype': shm_dtype})
else:
use_shared_memory = False
with pool_boy(Pool, n, cores=max_workers) as pool:
# Parallelize over events instead of traces
additional_args.update(dict(max_workers=1))
@@ -623,11 +707,19 @@ def compute_differential_times(catalog, correlation, stream_dict=None,
sparse_catalog)
if str(master.resource_id) in additional_args[
"stream_dict"].keys()]
Logger.info('Submitted asynchronous jobs to workers.')
differential_times = {
master.resource_id: result.get()
for master, result in zip(sparse_catalog, results)
if str(master.resource_id) in additional_args[
"stream_dict"].keys()}
Logger.debug('Got results from workers.')
# Destroy shared memory
if use_shared_memory:
for shm_name in shm_name_list:
shm = shared_memory.SharedMemory(name=shm_name)
shm.close()
shm.unlink()
else:
sub_catalogs = ([ev for i, ev in enumerate(sparse_catalog)
if master_filter[i]]
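
Finally, a hedged end-to-end sketch of calling the updated compute_differential_times with the new option. The file names and parameter values are illustrative only, and it is assumed (as in the current API) that correlation=True requests cross-correlation-derived differential times:

from obspy import read, read_events
from eqcorrscan.utils.catalog_to_dd import compute_differential_times

# Hypothetical inputs: a picked catalog and one waveform stream per event
catalog = read_events("my_events.xml")
stream_dict = {ev.resource_id.id: read(f"event_{i}.ms")
               for i, ev in enumerate(catalog)}

result = compute_differential_times(
    catalog=catalog, correlation=True, stream_dict=stream_dict,
    event_id_mapper=None, max_sep=8., min_link=0, min_cc=0.0,
    extract_len=2.0, pre_pick=0.5, shift_len=2, interpolate=False,
    max_workers=2,
    use_shared_memory=True,    # new: move trace data into shared memory
    weight_by_square=False)    # weight by raw correlation, not correlation squared
# See the function's docstring for the structure of the returned
# differential times.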