diff --git a/CHANGELOG.md b/CHANGELOG.md
index 77af2d694..f8d01fc57 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Added
 
 - Added `DataCube.load_stac()` to also support creating a `load_stac` based cube without a connection ([#638](https://github.com/Open-EO/openeo-python-client/issues/638))
+- `MultiBackendJobManager`: Added `initialize_from_df(df)` (to `CsvJobDatabase` and `ParquetJobDatabase`) to initialize (and persist) the job database from a given DataFrame.
+  Also added `create_job_db()` factory to easily create a job database from a given dataframe, with the database type guessed from the filename extension.
+  ([#635](https://github.com/Open-EO/openeo-python-client/issues/635))
+
+
 ### Changed
 
diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py
index 4668a8d9e..eb6dd86db 100644
--- a/openeo/extra/job_management.py
+++ b/openeo/extra/job_management.py
@@ -258,14 +258,15 @@ def _make_resilient(connection):
         connection.session.mount("https://", HTTPAdapter(max_retries=retries))
         connection.session.mount("http://", HTTPAdapter(max_retries=retries))
 
-    def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
-        """Ensure we have the required columns and the expected type for the geometry column.
+    @staticmethod
+    def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
+        """
+        Normalize given pandas dataframe (creating a new one):
+        ensure we have the required columns.
 
         :param df: The dataframe to normalize.
         :return: a new dataframe that is normalized.
         """
-        # TODO: this was originally an internal helper, but we need a clean public API for the user
-
         # check for some required columns.
         required_with_default = [
             ("status", "not_started"),
@@ -440,13 +441,7 @@ def run_jobs(
         assert not kwargs, f"Unexpected keyword arguments: {kwargs!r}"
 
         if isinstance(job_db, (str, Path)):
-            job_db_path = Path(job_db)
-            if job_db_path.suffix.lower() == ".csv":
-                job_db = CsvJobDatabase(path=job_db_path)
-            elif job_db_path.suffix.lower() == ".parquet":
-                job_db = ParquetJobDatabase(path=job_db_path)
-            else:
-                raise ValueError(f"Unsupported job database file type {job_db_path!r}")
+            job_db = get_job_db(path=job_db)
 
         if not isinstance(job_db, JobDatabaseInterface):
             raise ValueError(f"Unsupported job_db {job_db!r}")
@@ -456,8 +451,7 @@ def run_jobs(
             _log.info(f"Resuming `run_jobs` from existing {job_db}")
         elif df is not None:
             # TODO: start showing deprecation warnings for this usage pattern?
-            df = self._normalize_df(df)
-            job_db.persist(df)
+            job_db.initialize_from_df(df)
 
         while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
             self._job_update_loop(job_db=job_db, start_job=start_job)
@@ -697,6 +691,35 @@ def __init__(self):
         super().__init__()
         self._df = None
 
+    def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
+        """
+        Initialize the job database from a given dataframe,
+        which will be first normalized to be compatible
+        with :py:class:`MultiBackendJobManager` usage.
+
+        :param df: dataframe with some columns your ``start_job`` callable expects
+        :param on_exists: What to do when the job database already exists (persisted on disk):
+            - "error": (default) raise an exception
+            - "skip": work with existing database, ignore given dataframe and skip any initialization
+
+        :return: initialized job database.
+
+        .. versionadded:: 0.33.0
+        """
+        # TODO: option to provide custom MultiBackendJobManager subclass with custom normalize?
+        if self.exists():
+            if on_exists == "skip":
+                return self
+            elif on_exists == "error":
+                raise FileExistsError(f"Job database {self!r} already exists.")
+            else:
+                # TODO handle other on_exists modes: e.g. overwrite, merge, ...
+                raise ValueError(f"Invalid on_exists={on_exists!r}")
+        df = MultiBackendJobManager._normalize_df(df)
+        self.persist(df)
+        # Return self to allow chaining with constructor.
+        return self
+
     @property
     def df(self) -> pd.DataFrame:
         if self._df is None:
@@ -822,3 +845,44 @@ def persist(self, df: pd.DataFrame):
             self._merge_into_df(df)
         self.path.parent.mkdir(parents=True, exist_ok=True)
         self.df.to_parquet(self.path, index=False)
+
+
+def get_job_db(path: Union[str, Path]) -> JobDatabaseInterface:
+    """
+    Factory to get a job database at a given path,
+    guessing the database type from the filename extension.
+
+    :param path: Path to the job database file.
+
+    .. versionadded:: 0.33.0
+    """
+    path = Path(path)
+    if path.suffix.lower() in {".csv"}:
+        job_db = CsvJobDatabase(path=path)
+    elif path.suffix.lower() in {".parquet", ".geoparquet"}:
+        job_db = ParquetJobDatabase(path=path)
+    else:
+        raise ValueError(f"Could not guess job database type from {path!r}")
+    return job_db
+
+
+def create_job_db(path: Union[str, Path], df: pd.DataFrame, *, on_exists: str = "error") -> JobDatabaseInterface:
+    """
+    Factory to create a job database at a given path,
+    initialized from a given dataframe,
+    with the database type guessed from the filename extension.
+
+    :param path: Path to the job database file.
+    :param df: DataFrame to store in the job database.
+    :param on_exists: What to do when the job database already exists:
+        - "error": (default) raise an exception
+        - "skip": work with existing database, ignore given dataframe and skip any initialization
+
+    .. versionadded:: 0.33.0
+    """
+    job_db = get_job_db(path)
+    if isinstance(job_db, FullDataFrameJobDatabase):
+        job_db.initialize_from_df(df=df, on_exists=on_exists)
+    else:
+        raise NotImplementedError(f"Initialization of {type(job_db)} is not supported.")
+    return job_db
diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py
index f1c6eaff6..87a483652 100644
--- a/tests/extra/test_job_management.py
+++ b/tests/extra/test_job_management.py
@@ -29,6 +29,8 @@
     CsvJobDatabase,
     MultiBackendJobManager,
     ParquetJobDatabase,
+    create_job_db,
+    get_job_db,
 )
 from openeo.util import rfc3339
 
@@ -131,9 +133,7 @@ def start_job(row, connection, **kwargs):
             year = int(row["year"])
             return BatchJob(job_id=f"job-{year}", connection=connection)
 
-        job_db = CsvJobDatabase(output_file)
-        # TODO #636 avoid this cumbersome pattern using private _normalize_df API
-        job_db.persist(manager._normalize_df(df))
+        job_db = CsvJobDatabase(output_file).initialize_from_df(df)
 
         manager.run_jobs(job_db=job_db, start_job=start_job)
         assert sleep_mock.call_count > 10
@@ -148,6 +148,58 @@ def start_job(row, connection, **kwargs):
         metadata_path = manager.get_job_metadata_path(job_id="job-2022")
         assert metadata_path.exists()
 
+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_db_class(self, tmp_path, requests_mock, sleep_mock, db_class):
+        """
+        Basic run parameterized on database class
+        """
+        manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
+
+        def start_job(row, connection, **kwargs):
+            year = int(row["year"])
+            return BatchJob(job_id=f"job-{year}", connection=connection)
+
+        df = pd.DataFrame({"year": [2018, 2019, 2020, 2021, 2022]})
+        output_file = tmp_path / "jobs.db"
+        job_db = db_class(output_file).initialize_from_df(df)
+
+        manager.run_jobs(job_db=job_db, start_job=start_job)
+        assert sleep_mock.call_count > 10
+
+        result = job_db.read()
+        assert len(result) == 5
+        assert set(result.status) == {"finished"}
+        assert set(result.backend_name) == {"foo", "bar"}
+
+    @pytest.mark.parametrize(
+        ["filename", "expected_db_class"],
+        [
+            ("jobz.csv", CsvJobDatabase),
+            ("jobz.parquet", ParquetJobDatabase),
+        ],
+    )
+    def test_create_job_db(self, tmp_path, requests_mock, sleep_mock, filename, expected_db_class):
+        """
+        Basic run with `create_job_db()` usage
+        """
+        manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
+
+        def start_job(row, connection, **kwargs):
+            year = int(row["year"])
+            return BatchJob(job_id=f"job-{year}", connection=connection)
+
+        df = pd.DataFrame({"year": [2018, 2019, 2020, 2021, 2022]})
+        output_file = tmp_path / filename
+        job_db = create_job_db(path=output_file, df=df)
+
+        manager.run_jobs(job_db=job_db, start_job=start_job)
+        assert sleep_mock.call_count > 10
+
+        result = job_db.read()
+        assert len(result) == 5
+        assert set(result.status) == {"finished"}
+        assert set(result.backend_name) == {"foo", "bar"}
+
     def test_basic_threading(self, tmp_path, requests_mock, sleep_mock):
         manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
 
@@ -164,9 +216,7 @@ def start_job(row, connection, **kwargs):
             year = int(row["year"])
             return BatchJob(job_id=f"job-{year}", connection=connection)
 
-        job_db = CsvJobDatabase(output_file)
-        # TODO #636 avoid this cumbersome pattern using private _normalize_df API
-        job_db.persist(manager._normalize_df(df))
+        job_db = CsvJobDatabase(output_file).initialize_from_df(df)
 
         manager.start_job_thread(start_job=start_job, job_db=job_db)
         # Trigger context switch to job thread
@@ -244,14 +294,8 @@ def mock_job_status(job_id, queued=1, running=2):
         return manager
 
     def test_normalize_df(self):
-        df = pd.DataFrame(
-            {
-                "some_number": [3, 2, 1],
-            }
-        )
-
-        df_normalized = MultiBackendJobManager()._normalize_df(df)
-
+        df = pd.DataFrame({"some_number": [3, 2, 1]})
+        df_normalized = MultiBackendJobManager._normalize_df(df)
         assert set(df_normalized.columns) == set(
             [
                 "some_number",
@@ -636,6 +680,63 @@ def test_automatic_cancel_of_too_long_running_jobs(
         )
 
 
+class TestFullDataFrameJobDatabase:
+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_initialize_from_df(self, tmp_path, db_class):
+        orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
+        path = tmp_path / "jobs.db"
+
+        db = db_class(path)
+        assert not path.exists()
+        db.initialize_from_df(orig_df)
+        assert path.exists()
+
+        # Check persisted job database (through a fresh instance)
+        assert db_class(path).exists()
+        expected_columns = {
+            "some_number",
+            "status",
+            "id",
+            "start_time",
+            "running_start_time",
+            "cpu",
+            "memory",
+            "duration",
+            "backend_name",
+        }
+
+        actual_columns = set(db_class(path).read().columns)
+        assert actual_columns == expected_columns
+
+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_initialize_from_df_on_exists_error(self, tmp_path, db_class):
+        df = pd.DataFrame({"some_number": [3, 2, 1]})
+        path = tmp_path / "jobs.csv"
+        _ = db_class(path).initialize_from_df(df, on_exists="error")
+        assert path.exists()
+
+        with pytest.raises(FileExistsError, match="Job database.* already exists"):
+            _ = db_class(path).initialize_from_df(df, on_exists="error")
+
+        assert set(db_class(path).read()["some_number"]) == {1, 2, 3}
+
+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_initialize_from_df_on_exists_skip(self, tmp_path, db_class):
+        path = tmp_path / "jobs.csv"
+
+        db = db_class(path).initialize_from_df(
+            pd.DataFrame({"some_number": [3, 2, 1]}),
+            on_exists="skip",
+        )
+        assert set(db.read()["some_number"]) == {1, 2, 3}
+
+        db = db_class(path).initialize_from_df(
+            pd.DataFrame({"some_number": [444, 555, 666]}),
+            on_exists="skip",
+        )
+        assert set(db.read()["some_number"]) == {1, 2, 3}
+
+
 class TestCsvJobDatabase:
 
     def test_repr(self, tmp_path):
@@ -695,7 +796,7 @@ def test_persist_and_read(self, tmp_path, orig: pandas.DataFrame):
         ],
     )
     def test_partial_read_write(self, tmp_path, orig: pandas.DataFrame):
-        path = tmp_path / "jobs.parquet"
+        path = tmp_path / "jobs.csv"
 
         required_with_default = [
             ("status", "not_started"),
@@ -726,6 +827,56 @@ def test_partial_read_write(self, tmp_path, orig: pandas.DataFrame):
         assert all.loc[2,"status"] == "not_started"
         print(loaded.index)
 
+    def test_initialize_from_df(self, tmp_path):
+        orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
+        path = tmp_path / "jobs.csv"
+
+        # Initialize the CSV from the dataframe
+        _ = CsvJobDatabase(path).initialize_from_df(orig_df)
+
+        # Check persisted CSV
+        assert path.exists()
+        expected_columns = {
+            "some_number",
+            "status",
+            "id",
+            "start_time",
+            "running_start_time",
+            "cpu",
+            "memory",
+            "duration",
+            "backend_name",
+        }
+
+        # Raw file content check
+        raw_columns = set(path.read_text().split("\n")[0].split(","))
+        # Higher level read
+        read_columns = set(CsvJobDatabase(path).read().columns)
+
+        assert raw_columns == expected_columns
+        assert read_columns == expected_columns
+
+    def test_initialize_from_df_on_exists_error(self, tmp_path):
+        orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
+        path = tmp_path / "jobs.csv"
+        _ = CsvJobDatabase(path).initialize_from_df(orig_df, on_exists="error")
+        with pytest.raises(FileExistsError, match="Job database.* already exists"):
+            _ = CsvJobDatabase(path).initialize_from_df(orig_df, on_exists="error")
+
+    def test_initialize_from_df_on_exists_skip(self, tmp_path):
+        path = tmp_path / "jobs.csv"
+
+        db = CsvJobDatabase(path).initialize_from_df(
+            pd.DataFrame({"some_number": [3, 2, 1]}),
+            on_exists="skip",
+        )
+        assert set(db.read()["some_number"]) == {1, 2, 3}
+
+        db = CsvJobDatabase(path).initialize_from_df(
+            pd.DataFrame({"some_number": [444, 555, 666]}),
+            on_exists="skip",
+        )
+        assert set(db.read()["some_number"]) == {1, 2, 3}
 
 
 class TestParquetJobDatabase:
@@ -753,3 +904,56 @@ def test_persist_and_read(self, tmp_path, orig: pandas.DataFrame):
         assert loaded.dtypes.to_dict() == orig.dtypes.to_dict()
         assert loaded.equals(orig)
         assert type(orig) is type(loaded)
+
+    def test_initialize_from_df(self, tmp_path):
+        orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
+        path = tmp_path / "jobs.parquet"
+
+        # Initialize the Parquet job database from the dataframe
+        _ = ParquetJobDatabase(path).initialize_from_df(orig_df)
+
+        # Check persisted Parquet file
+        assert path.exists()
+        expected_columns = {
+            "some_number",
+            "status",
+            "id",
+            "start_time",
+            "running_start_time",
+            "cpu",
+            "memory",
+            "duration",
+            "backend_name",
+        }
+
+        df_from_disk = ParquetJobDatabase(path).read()
+        assert set(df_from_disk.columns) == expected_columns
+
+
+@pytest.mark.parametrize(
+    ["filename", "expected"],
+    [
+        ("jobz.csv", CsvJobDatabase),
+        ("jobz.parquet", ParquetJobDatabase),
+    ],
+)
+def test_get_job_db(tmp_path, filename, expected):
+    path = tmp_path / filename
+    db = get_job_db(path)
+    assert isinstance(db, expected)
+    assert not path.exists()
+
+
+@pytest.mark.parametrize( + ["filename", "expected"], + [ + ("jobz.csv", CsvJobDatabase), + ("jobz.parquet", ParquetJobDatabase), + ], +) +def test_create_job_db(tmp_path, filename, expected): + df = pd.DataFrame({"year": [2023, 2024]}) + path = tmp_path / filename + db = create_job_db(path=path, df=df) + assert isinstance(db, expected) + assert path.exists()
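
Reviewer note: a minimal end-to-end sketch of how the pieces added in this diff fit together. Only `create_job_db()`, `get_job_db()` and `initialize_from_df()` are the APIs introduced here; the backend URL, collection id and the body of `start_job` are illustrative assumptions.

```python
import openeo
import pandas as pd

from openeo.extra.job_management import MultiBackendJobManager, create_job_db

# One row per job to run; extra columns are available to the `start_job` callable.
df = pd.DataFrame({"year": [2021, 2022, 2023]})

# Database type is guessed from the extension (.csv -> CsvJobDatabase,
# .parquet/.geoparquet -> ParquetJobDatabase). The dataframe is normalized
# (status/id/start_time/... columns added) and persisted on creation;
# on_exists="skip" reuses an existing file instead of raising FileExistsError.
job_db = create_job_db("jobs.csv", df=df, on_exists="skip")


def start_job(row, connection, **kwargs):
    # Illustrative job factory: turn a dataframe row into a batch job.
    cube = connection.load_collection("SENTINEL2_L2A")  # hypothetical collection
    return cube.create_job(title=f"job {row['year']}")


manager = MultiBackendJobManager()
manager.add_backend("foo", connection=openeo.connect("https://openeo.example"))  # hypothetical URL
manager.run_jobs(job_db=job_db, start_job=start_job)
```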