From d46fb3e5c727b2bb4bb651ebf42c902129c3e7f2 Mon Sep 17 00:00:00 2001
From: Gary Pavlis
Date: Mon, 4 Dec 2023 10:46:59 -0500
Subject: [PATCH] Fix spark setup problem and add section to clean up files
 created in data area

---
 python/tests/io/test_distributed.py | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)

diff --git a/python/tests/io/test_distributed.py b/python/tests/io/test_distributed.py
index 3bf53d937..0bf271332 100644
--- a/python/tests/io/test_distributed.py
+++ b/python/tests/io/test_distributed.py
@@ -36,7 +36,7 @@ wfdir = "python/tests/data/wf_filetestdata"
 
 @pytest.fixture
-def setup_module_environment(scope="module"):
+def setup_environment():
     if os.path.exists(wfdir):
         raise Exception("test_distributed setup: scratch directory={} exists. Must be empty to run this test".format(wfdir))
     else:
         os.makedirs(wfdir)
@@ -47,7 +47,7 @@
     yield
     if os.path.exists(wfdir):
         shutil.rmtree(wfdir)
-@pytest.fixture(scope="module")
+@pytest.fixture
 def spark():
     sc = SparkContext("local", "io_distributed_testing")
     yield sc
@@ -413,7 +413,7 @@ def set_dir_dfile_ensemble(d):
     ("spark","wf_TimeSeries"),
     ("spark","wf_Seismogram")
 ])
-def test_read_distributed_atomic(spark,atomic_time_series_generator,atomic_seismogram_generator,scheduler,collection):
+def test_read_distributed_atomic(setup_environment,spark,atomic_time_series_generator,atomic_seismogram_generator,scheduler,collection):
     """
     This function is run with multiple tests to test atomic read (mostly)
     and limited writes with the io.distributed module. That is, read_distributed_data
@@ -622,7 +622,8 @@ def test_read_distributed_atomic(spark,atomic_time_series_generator,atomic_seism
     ("spark","wf_TimeSeries"),
     ("spark","wf_Seismogram")
 ])
-def test_write_distributed_atomic(scheduler,
+def test_write_distributed_atomic(setup_environment,
+                                  scheduler,
                                   collection,
                                   spark,
                                   define_kill_one,
@@ -974,7 +975,7 @@ def get_srclist_by_tag(db,data_tag)->list:
     ("spark","wf_TimeSeries"),
     ("spark","wf_Seismogram")
 ])
-def test_read_distributed_ensemble(spark,TimeSeriesEnsemble_generator,SeismogramEnsemble_generator,scheduler,collection):
+def test_read_distributed_ensemble(setup_environment,spark,TimeSeriesEnsemble_generator,SeismogramEnsemble_generator,scheduler,collection):
     print("Starting test with scheduler=",scheduler, " and collection=",collection)
     if scheduler=="spark":
         context=spark
@@ -1122,7 +1123,8 @@ def test_read_distributed_ensemble(spark,TimeSeriesEnsemble_generator,Seismogram
     ("spark","wf_TimeSeries"),
     ("spark","wf_Seismogram"),
 ])
-def test_write_distributed_ensemble(scheduler,
+def test_write_distributed_ensemble(setup_environment,
+                                    scheduler,
                                     collection,
                                     spark,
                                     define_kill_one_member,
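
A note on the pytest mechanics behind the fixture change, for reviewers:
fixture scope is set through the decorator, as in @pytest.fixture(scope="module").
A scope="module" parameter in the fixture function's own signature, as in the
old setup_module_environment, is effectively ignored (pytest does not treat
parameters with default values as fixture requests), so the old fixture was
silently function-scoped under a misleading name. More importantly, no test
requested it, so the scratch-directory setup and cleanup never ran at all;
this patch renames the fixture and threads it through the four test functions.
The sketch below is a minimal, self-contained illustration of the corrected
pattern rather than code from this repository: wfdir mirrors the module
variable in test_distributed.py, while test_uses_scratch_dir is a hypothetical
stand-in for tests like test_read_distributed_atomic.

import os
import shutil

import pytest

# Mirrors the wfdir module variable in test_distributed.py.
wfdir = "python/tests/data/wf_filetestdata"


@pytest.fixture
def setup_environment():
    # Scope belongs in the decorator (e.g. @pytest.fixture(scope="module")).
    # The default used here is function scope, so every test that requests
    # this fixture gets a fresh scratch directory.
    if os.path.exists(wfdir):
        # Refuse to run rather than silently delete leftovers from a
        # previous aborted run.
        raise Exception(
            "setup: scratch directory={} exists. Must be empty to run this test".format(wfdir)
        )
    os.makedirs(wfdir)
    # Everything before the yield is setup; everything after it is teardown
    # and runs even when the test body fails.
    yield
    if os.path.exists(wfdir):
        shutil.rmtree(wfdir)


# Hypothetical test standing in for test_read_distributed_atomic and friends:
# naming the fixture as a parameter is all that is needed to activate the
# setup/teardown around this test.
def test_uses_scratch_dir(setup_environment):
    fname = os.path.join(wfdir, "junk.bin")
    with open(fname, "wb") as fp:
        fp.write(b"\x00" * 64)
    assert os.path.exists(fname)

The companion change drops scope="module" from the spark fixture so each test
builds and tears down its own SparkContext instead of sharing one across all
of the parametrized cases in the module; that sharing is the likely "spark
setup problem" named in the subject line.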