From f67f027e481ceba2f13d8615babc7c1dbe2cf881 Mon Sep 17 00:00:00 2001 From: Janosh Riebesell Date: Mon, 25 Mar 2024 18:38:34 +0100 Subject: [PATCH 1/9] add job_dir attribute to Response class to record where a job ran --- .pre-commit-config.yaml | 4 ++-- src/jobflow/core/job.py | 22 +++++++++++++++++----- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index bb52dad0..b8a3a0c0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -3,7 +3,7 @@ default_language_version: exclude: "^src/atomate2/vasp/schemas/calc_types/" repos: - repo: https://github.com/charliermarsh/ruff-pre-commit - rev: v0.1.11 + rev: v0.3.4 hooks: - id: ruff args: [--fix] @@ -43,7 +43,7 @@ repos: - id: rst-directive-colons - id: rst-inline-touching-normal - repo: https://github.com/pre-commit/mirrors-mypy - rev: v1.8.0 + rev: v1.9.0 hooks: - id: mypy files: ^src/ diff --git a/src/jobflow/core/job.py b/src/jobflow/core/job.py index 6d036a2b..976c1212 100644 --- a/src/jobflow/core/job.py +++ b/src/jobflow/core/job.py @@ -6,14 +6,17 @@ import typing import warnings from dataclasses import dataclass, field +from typing import cast from monty.json import MSONable, jsanitize +from typing_extensions import Self from jobflow.core.reference import OnMissing, OutputReference from jobflow.utils.uid import suid if typing.TYPE_CHECKING: from collections.abc import Hashable, Sequence + from pathlib import Path from typing import Any, Callable from networkx import DiGraph @@ -526,7 +529,7 @@ def set_uuid(self, uuid: str) -> None: self.uuid = uuid self.output = self.output.set_uuid(uuid) - def run(self, store: jobflow.JobStore) -> Response: + def run(self, store: jobflow.JobStore, job_dir: Path = None) -> Response: """ Run the job. @@ -581,7 +584,9 @@ def run(self, store: jobflow.JobStore) -> Response: function = types.MethodType(function, bound) response = function(*self.function_args, **self.function_kwargs) - response = Response.from_job_returns(response, self.output_schema) + response = Response.from_job_returns( + response, self.output_schema, job_dir=job_dir + ) if response.replace is not None: response.replace = prepare_replace(response.replace, self) @@ -1170,6 +1175,8 @@ class Response(typing.Generic[T]): Stop any children of the current flow. stop_jobflow Stop executing all remaining jobs. + job_dir + The directory where the job was run. """ output: T = None @@ -1179,13 +1186,15 @@ class Response(typing.Generic[T]): stored_data: dict[Hashable, Any] = None stop_children: bool = False stop_jobflow: bool = False + job_dir: Path = None @classmethod def from_job_returns( cls, job_returns: Any | None, output_schema: type[BaseModel] = None, - ) -> Response: + job_dir: Path = None, + ) -> Self: """ Generate a :obj:`Response` from the outputs of a :obj:`Job`. @@ -1199,6 +1208,8 @@ def from_job_returns( output_schema A pydantic model associated with the job. Used to enforce a schema for the outputs. + job_dir + The directory where the job was run. Raises ------ @@ -1215,7 +1226,8 @@ def from_job_returns( # only apply output schema if there is no replace. job_returns.output = apply_schema(job_returns.output, output_schema) - return job_returns + job_returns.job_dir = job_dir + return cast(Self, job_returns) if isinstance(job_returns, (list, tuple)): # check that a Response object is not given as one of many outputs @@ -1225,7 +1237,7 @@ def from_job_returns( "Response cannot be returned in combination with other outputs." ) - return cls(output=apply_schema(job_returns, output_schema)) + return cls(output=apply_schema(job_returns, output_schema), job_dir=job_dir) def apply_schema(output: Any, schema: type[BaseModel] | None): From 463e4c39c5cc1ecc0bb4435f880912348541d80e Mon Sep 17 00:00:00 2001 From: Janosh Riebesell Date: Mon, 25 Mar 2024 18:39:29 +0100 Subject: [PATCH 2/9] add typing_extensions to dev deps for Python below 3.11 --- pyproject.toml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9d9d6c2c..8770ef6b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,7 +46,7 @@ docs = [ "sphinx-copybutton==0.5.2", "sphinx==7.2.6", ] -dev = ["pre-commit>=2.12.1"] +dev = ["pre-commit>=2.12.1", "typing_extensions; python_version < '3.11'"] tests = ["moto==4.2.13", "pytest-cov==4.1.0", "pytest==8.1.1"] vis = ["matplotlib", "pydot"] fireworks = ["FireWorks"] @@ -62,8 +62,8 @@ strict = [ "pydantic==2.6.4", "pydash==7.0.7", "pydot==2.0.0", - "typing-extensions==4.10.0", "python-ulid==2.2.0", + "typing-extensions==4.10.0", ] [project.urls] @@ -122,6 +122,8 @@ exclude_lines = [ [tool.ruff] target-version = "py39" ignore-init-module-imports = true + +[tool.ruff.lint] select = [ "B", # flake8-bugbear "C4", # flake8-comprehensions @@ -170,7 +172,7 @@ ignore = [ pydocstyle.convention = "numpy" isort.known-first-party = ["jobflow"] -[tool.ruff.per-file-ignores] +[tool.ruff.lint.per-file-ignores] # F401: unused import "__init__.py" = ["F401"] # D: pydocstyle From a43022759e560e1b53dadcf665ad6b1d8edd6eef Mon Sep 17 00:00:00 2001 From: Janosh Riebesell Date: Mon, 25 Mar 2024 18:40:01 +0100 Subject: [PATCH 3/9] return type Self on JobStore.from_... methods --- src/jobflow/core/store.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/jobflow/core/store.py b/src/jobflow/core/store.py index 0150201f..f9a26f4c 100644 --- a/src/jobflow/core/store.py +++ b/src/jobflow/core/store.py @@ -17,6 +17,7 @@ from typing import Any, Optional, Union from maggma.core import Sort + from typing_extensions import Self from jobflow.core.schemas import JobStoreDocument @@ -545,7 +546,7 @@ def get_output( ) @classmethod - def from_file(cls: type[T], db_file: str | Path, **kwargs) -> T: + def from_file(cls, db_file: str | Path, **kwargs) -> Self: """ Create a JobStore from a database file. @@ -605,7 +606,7 @@ def from_file(cls: type[T], db_file: str | Path, **kwargs) -> T: return cls.from_dict_spec(store_info, **kwargs) @classmethod - def from_dict_spec(cls: type[T], spec: dict, **kwargs) -> T: + def from_dict_spec(cls, spec: dict, **kwargs) -> Self: """ Create an JobStore from a dict specification. From 7ea331f90fb3f86bd9132cbf133709c50e75e2b2 Mon Sep 17 00:00:00 2001 From: Janosh Riebesell Date: Mon, 25 Mar 2024 18:40:32 +0100 Subject: [PATCH 4/9] set response.job_dir in run_locally --- src/jobflow/managers/local.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/jobflow/managers/local.py b/src/jobflow/managers/local.py index 002fd212..37c42961 100644 --- a/src/jobflow/managers/local.py +++ b/src/jobflow/managers/local.py @@ -165,6 +165,8 @@ def _run(root_flow): with cd(job_dir): response, jobflow_stopped = _run_job(job, parents) + if response is not None: + response.job_dir = job_dir encountered_bad_response = encountered_bad_response or response is None if jobflow_stopped: return False From 7a64979026f1cb579043ec41366f3ce58cf41589 Mon Sep 17 00:00:00 2001 From: Janosh Riebesell Date: Mon, 25 Mar 2024 18:41:31 +0100 Subject: [PATCH 5/9] assert response.job_dir is pathlib.Path or None and dir exists if not none --- tests/managers/test_local.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/tests/managers/test_local.py b/tests/managers/test_local.py index 66751e53..38137f2d 100644 --- a/tests/managers/test_local.py +++ b/tests/managers/test_local.py @@ -1,3 +1,5 @@ +from pathlib import Path + import pytest @@ -10,7 +12,11 @@ def test_simple_job(memory_jobstore, clean_dir, simple_job): responses = run_locally(job, store=memory_jobstore) # check responses has been filled - assert responses[uuid][1].output == "12345_end" + response1 = responses[uuid][1] + assert response1.output == "12345_end" + # check job_dir + assert isinstance(response1.job_dir, Path) + assert response1.job_dir.exists() # check store has the activity output result = memory_jobstore.query_one({"uuid": uuid}) @@ -63,7 +69,8 @@ def test_simple_flow(memory_jobstore, clean_dir, simple_flow, capsys): assert len(folders) == 1 # run with folders and root_dir - assert Path(root_dir := "test").exists() is False + root_dir = "test" + assert Path(root_dir).exists() is False responses = run_locally( flow, store=memory_jobstore, create_folders=True, root_dir=root_dir ) @@ -416,7 +423,9 @@ def test_detour_stop_flow(memory_jobstore, clean_dir, detour_stop_flow): assert len(responses) == 2 assert responses[uuid1][1].output == 11 assert responses[uuid1][1].detour is not None + assert responses[uuid1][1].job_dir is None assert responses[uuid2][1].output == "1234" + assert responses[uuid2][1].job_dir is None # check store has the activity output result1 = memory_jobstore.query_one({"uuid": uuid1}) From fb786f50c736d3ce8ac214853ae84e4a24a0659f Mon Sep 17 00:00:00 2001 From: Janosh Riebesell Date: Mon, 25 Mar 2024 19:04:01 +0100 Subject: [PATCH 6/9] fix ruff --- examples/encode_decode.py | 1 + examples/fibonacci.py | 1 + examples/replace.py | 1 - pyproject.toml | 3 ++- src/jobflow/utils/__init__.py | 1 + src/jobflow/utils/uid.py | 1 + src/jobflow/utils/uuid.py | 1 + 7 files changed, 7 insertions(+), 2 deletions(-) diff --git a/examples/encode_decode.py b/examples/encode_decode.py index 7e73dedf..23ab42a4 100644 --- a/examples/encode_decode.py +++ b/examples/encode_decode.py @@ -1,4 +1,5 @@ """A simple example to show message passing between jobs.""" + from jobflow import Flow, job, run_locally diff --git a/examples/fibonacci.py b/examples/fibonacci.py index 51befcfe..5125dbf0 100644 --- a/examples/fibonacci.py +++ b/examples/fibonacci.py @@ -1,4 +1,5 @@ """A dynamic workflow that calculates the Fibonacci sequence.""" + from jobflow import Response, job, run_locally diff --git a/examples/replace.py b/examples/replace.py index d5fd96bf..f1044163 100644 --- a/examples/replace.py +++ b/examples/replace.py @@ -1,6 +1,5 @@ """A demonstration of how to use dynamic workflows with replace.""" - from jobflow import Flow, job, run_locally diff --git a/pyproject.toml b/pyproject.toml index 8770ef6b..bb418919 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -121,7 +121,6 @@ exclude_lines = [ [tool.ruff] target-version = "py39" -ignore-init-module-imports = true [tool.ruff.lint] select = [ @@ -161,6 +160,7 @@ ignore = [ "DTZ005", "FBT001", "FBT002", + "ISC001", "PLR0911", # too-many-return-statements "PLR0912", # too-many-branches "PLR0913", # too-many-arguments @@ -171,6 +171,7 @@ ignore = [ ] pydocstyle.convention = "numpy" isort.known-first-party = ["jobflow"] +ignore-init-module-imports = true [tool.ruff.lint.per-file-ignores] # F401: unused import diff --git a/src/jobflow/utils/__init__.py b/src/jobflow/utils/__init__.py index 117e7f3e..ed6e1648 100644 --- a/src/jobflow/utils/__init__.py +++ b/src/jobflow/utils/__init__.py @@ -1,4 +1,5 @@ """Utility functions for logging, enumerations and manipulating dictionaries.""" + from jobflow.utils.enum import ValueEnum from jobflow.utils.find import ( contains_flow_or_job, diff --git a/src/jobflow/utils/uid.py b/src/jobflow/utils/uid.py index 36b453fd..86ddd89c 100644 --- a/src/jobflow/utils/uid.py +++ b/src/jobflow/utils/uid.py @@ -1,4 +1,5 @@ """Tools for generating UUIDs.""" + from __future__ import annotations from uuid import UUID diff --git a/src/jobflow/utils/uuid.py b/src/jobflow/utils/uuid.py index d5b5a54d..f822d932 100644 --- a/src/jobflow/utils/uuid.py +++ b/src/jobflow/utils/uuid.py @@ -1,4 +1,5 @@ """Tools for generating UUIDs.""" + from monty.dev import deprecated From 145fa7fcacf0ba16054d1252b85bc76693a06aab Mon Sep 17 00:00:00 2001 From: Janosh Riebesell Date: Fri, 29 Mar 2024 12:14:03 +0100 Subject: [PATCH 7/9] fix: remove deprecated datetime.utcnow -> datetime.now(tz=UTC) --- src/jobflow/core/job.py | 4 ++-- src/jobflow/managers/local.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/jobflow/core/job.py b/src/jobflow/core/job.py index 976c1212..60cf6a15 100644 --- a/src/jobflow/core/job.py +++ b/src/jobflow/core/job.py @@ -1231,8 +1231,8 @@ def from_job_returns( if isinstance(job_returns, (list, tuple)): # check that a Response object is not given as one of many outputs - for r in job_returns: - if isinstance(r, Response): + for resp in job_returns: + if isinstance(resp, Response): raise ValueError( "Response cannot be returned in combination with other outputs." ) diff --git a/src/jobflow/managers/local.py b/src/jobflow/managers/local.py index 37c42961..fdfcb911 100644 --- a/src/jobflow/managers/local.py +++ b/src/jobflow/managers/local.py @@ -58,7 +58,7 @@ def run_locally( The responses of the jobs, as a dict of ``{uuid: {index: response}}``. """ from collections import defaultdict - from datetime import datetime + from datetime import UTC, datetime from pathlib import Path from random import randint @@ -152,7 +152,7 @@ def _run_job(job: jobflow.Job, parents): def _get_job_dir(): if create_folders: - time_now = datetime.utcnow().strftime(SETTINGS.DIRECTORY_FORMAT) + time_now = datetime.now(tz=UTC).strftime(SETTINGS.DIRECTORY_FORMAT) job_dir = root_dir / f"job_{time_now}-{randint(10000, 99999)}" job_dir.mkdir() return job_dir From 3835d26c53cd011c0a51b08125c2ebf1f64d8ad9 Mon Sep 17 00:00:00 2001 From: Janosh Riebesell Date: Fri, 29 Mar 2024 12:27:28 +0100 Subject: [PATCH 8/9] allow str or Path for job_dir, test with assert os.path.isdir(response.job_dir) --- src/jobflow/core/job.py | 4 ++-- tests/managers/test_local.py | 12 ++++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/jobflow/core/job.py b/src/jobflow/core/job.py index 60cf6a15..678a1cbe 100644 --- a/src/jobflow/core/job.py +++ b/src/jobflow/core/job.py @@ -1186,14 +1186,14 @@ class Response(typing.Generic[T]): stored_data: dict[Hashable, Any] = None stop_children: bool = False stop_jobflow: bool = False - job_dir: Path = None + job_dir: str | Path = None @classmethod def from_job_returns( cls, job_returns: Any | None, output_schema: type[BaseModel] = None, - job_dir: Path = None, + job_dir: str | Path = None, ) -> Self: """ Generate a :obj:`Response` from the outputs of a :obj:`Job`. diff --git a/tests/managers/test_local.py b/tests/managers/test_local.py index 38137f2d..b26dce69 100644 --- a/tests/managers/test_local.py +++ b/tests/managers/test_local.py @@ -1,3 +1,4 @@ +import os from pathlib import Path import pytest @@ -16,7 +17,7 @@ def test_simple_job(memory_jobstore, clean_dir, simple_job): assert response1.output == "12345_end" # check job_dir assert isinstance(response1.job_dir, Path) - assert response1.job_dir.exists() + assert os.path.isdir(response1.job_dir) # check store has the activity output result = memory_jobstore.query_one({"uuid": uuid}) @@ -166,12 +167,14 @@ def test_detour_flow(memory_jobstore, clean_dir, detour_flow): # run with log responses = run_locally(flow, store=memory_jobstore) - uuid2 = next(u for u in responses if u not in {uuid1, uuid3}) + uuid2 = next(uuid for uuid in responses if uuid not in {uuid1, uuid3}) # check responses has been filled assert len(responses) == 3 assert responses[uuid1][1].output == 11 assert responses[uuid1][1].detour is not None + assert isinstance(responses[uuid1][1].job_dir, Path) + assert os.path.isdir(responses[uuid1][1].job_dir) assert responses[uuid2][1].output == "11_end" assert responses[uuid3][1].output == "12345_end" @@ -203,6 +206,8 @@ def test_replace_flow(memory_jobstore, clean_dir, replace_flow): assert len(responses[uuid1]) == 2 assert responses[uuid1][1].output == 11 assert responses[uuid1][1].replace is not None + assert isinstance(responses[uuid1][1].job_dir, Path) + assert os.path.isdir(responses[uuid1][1].job_dir) assert responses[uuid1][2].output == "11_end" assert responses[uuid2][1].output == "12345_end" @@ -425,6 +430,7 @@ def test_detour_stop_flow(memory_jobstore, clean_dir, detour_stop_flow): assert responses[uuid1][1].detour is not None assert responses[uuid1][1].job_dir is None assert responses[uuid2][1].output == "1234" + # TODO maybe find way to set artificial job_dir and test is not None assert responses[uuid2][1].job_dir is None # check store has the activity output @@ -453,3 +459,5 @@ def test_external_reference(memory_jobstore, clean_dir, simple_job): uuid2 = job2.uuid responses = run_locally(job2, store=memory_jobstore, allow_external_references=True) assert responses[uuid2][1].output == "12345_end_end" + assert isinstance(responses[uuid2][1].job_dir, Path) + assert os.path.isdir(responses[uuid2][1].job_dir) From 227071e0a0f29c95f3b7e068f18b5cd5b4e1c772 Mon Sep 17 00:00:00 2001 From: Janosh Riebesell Date: Fri, 29 Mar 2024 12:42:01 +0100 Subject: [PATCH 9/9] UTC -> timezone.utc since UTC is py311+ --- src/jobflow/managers/local.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/jobflow/managers/local.py b/src/jobflow/managers/local.py index fdfcb911..8487081d 100644 --- a/src/jobflow/managers/local.py +++ b/src/jobflow/managers/local.py @@ -58,7 +58,7 @@ def run_locally( The responses of the jobs, as a dict of ``{uuid: {index: response}}``. """ from collections import defaultdict - from datetime import UTC, datetime + from datetime import datetime, timezone from pathlib import Path from random import randint @@ -152,7 +152,7 @@ def _run_job(job: jobflow.Job, parents): def _get_job_dir(): if create_folders: - time_now = datetime.now(tz=UTC).strftime(SETTINGS.DIRECTORY_FORMAT) + time_now = datetime.now(tz=timezone.utc).strftime(SETTINGS.DIRECTORY_FORMAT) job_dir = root_dir / f"job_{time_now}-{randint(10000, 99999)}" job_dir.mkdir() return job_dir