Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job_dir attribute to Response class to record where a job ran #570

Merged
merged 10 commits into from
Mar 29, 2024
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ default_language_version:
exclude: "^src/atomate2/vasp/schemas/calc_types/"
repos:
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.1.11
rev: v0.3.4
hooks:
- id: ruff
args: [--fix]
Expand Down Expand Up @@ -43,7 +43,7 @@ repos:
- id: rst-directive-colons
- id: rst-inline-touching-normal
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.8.0
rev: v1.9.0
hooks:
- id: mypy
files: ^src/
Expand Down
1 change: 1 addition & 0 deletions examples/encode_decode.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""A simple example to show message passing between jobs."""

from jobflow import Flow, job, run_locally


Expand Down
1 change: 1 addition & 0 deletions examples/fibonacci.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""A dynamic workflow that calculates the Fibonacci sequence."""

from jobflow import Response, job, run_locally


Expand Down
1 change: 0 additions & 1 deletion examples/replace.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""A demonstration of how to use dynamic workflows with replace."""


from jobflow import Flow, job, run_locally


Expand Down
11 changes: 7 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ docs = [
"sphinx-copybutton==0.5.2",
"sphinx==7.2.6",
]
dev = ["pre-commit>=2.12.1"]
dev = ["pre-commit>=2.12.1", "typing_extensions; python_version < '3.11'"]
tests = ["moto==4.2.13", "pytest-cov==5.0.0", "pytest==8.1.1"]
vis = ["matplotlib", "pydot"]
fireworks = ["FireWorks"]
Expand All @@ -62,8 +62,8 @@ strict = [
"pydantic==2.6.4",
"pydash==7.0.7",
"pydot==2.0.0",
"typing-extensions==4.10.0",
"python-ulid==2.2.0",
"typing-extensions==4.10.0",
]

[project.urls]
Expand Down Expand Up @@ -121,7 +121,8 @@ exclude_lines = [

[tool.ruff]
target-version = "py39"
ignore-init-module-imports = true

[tool.ruff.lint]
select = [
"B", # flake8-bugbear
"C4", # flake8-comprehensions
Expand Down Expand Up @@ -159,6 +160,7 @@ ignore = [
"DTZ005",
"FBT001",
"FBT002",
"ISC001",
"PLR0911", # too-many-return-statements
"PLR0912", # too-many-branches
"PLR0913", # too-many-arguments
Expand All @@ -169,8 +171,9 @@ ignore = [
]
pydocstyle.convention = "numpy"
isort.known-first-party = ["jobflow"]
ignore-init-module-imports = true

[tool.ruff.per-file-ignores]
[tool.ruff.lint.per-file-ignores]
# F401: unused import
"__init__.py" = ["F401"]
# D: pydocstyle
Expand Down
26 changes: 19 additions & 7 deletions src/jobflow/core/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@
import typing
import warnings
from dataclasses import dataclass, field
from typing import cast

from monty.json import MSONable, jsanitize
from typing_extensions import Self

from jobflow.core.reference import OnMissing, OutputReference
from jobflow.utils.uid import suid

if typing.TYPE_CHECKING:
from collections.abc import Hashable, Sequence
from pathlib import Path
from typing import Any, Callable

from networkx import DiGraph
Expand Down Expand Up @@ -526,7 +529,7 @@ def set_uuid(self, uuid: str) -> None:
self.uuid = uuid
self.output = self.output.set_uuid(uuid)

def run(self, store: jobflow.JobStore) -> Response:
def run(self, store: jobflow.JobStore, job_dir: Path = None) -> Response:
"""
Run the job.

Expand Down Expand Up @@ -581,7 +584,9 @@ def run(self, store: jobflow.JobStore) -> Response:
function = types.MethodType(function, bound)

response = function(*self.function_args, **self.function_kwargs)
response = Response.from_job_returns(response, self.output_schema)
response = Response.from_job_returns(
response, self.output_schema, job_dir=job_dir
)

if response.replace is not None:
response.replace = prepare_replace(response.replace, self)
Expand Down Expand Up @@ -1170,6 +1175,8 @@ class Response(typing.Generic[T]):
Stop any children of the current flow.
stop_jobflow
Stop executing all remaining jobs.
job_dir
The directory where the job was run.
"""

output: T = None
Expand All @@ -1179,13 +1186,15 @@ class Response(typing.Generic[T]):
stored_data: dict[Hashable, Any] = None
stop_children: bool = False
stop_jobflow: bool = False
job_dir: str | Path = None

@classmethod
def from_job_returns(
cls,
job_returns: Any | None,
output_schema: type[BaseModel] = None,
) -> Response:
job_dir: str | Path = None,
) -> Self:
"""
Generate a :obj:`Response` from the outputs of a :obj:`Job`.

Expand All @@ -1199,6 +1208,8 @@ def from_job_returns(
output_schema
A pydantic model associated with the job. Used to enforce a schema for the
outputs.
job_dir
The directory where the job was run.

Raises
------
Expand All @@ -1215,17 +1226,18 @@ def from_job_returns(
# only apply output schema if there is no replace.
job_returns.output = apply_schema(job_returns.output, output_schema)

return job_returns
job_returns.job_dir = job_dir
return cast(Self, job_returns)

if isinstance(job_returns, (list, tuple)):
# check that a Response object is not given as one of many outputs
for r in job_returns:
if isinstance(r, Response):
for resp in job_returns:
if isinstance(resp, Response):
raise ValueError(
"Response cannot be returned in combination with other outputs."
)

return cls(output=apply_schema(job_returns, output_schema))
return cls(output=apply_schema(job_returns, output_schema), job_dir=job_dir)


def apply_schema(output: Any, schema: type[BaseModel] | None):
Expand Down
5 changes: 3 additions & 2 deletions src/jobflow/core/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from typing import Any, Optional, Union

from maggma.core import Sort
from typing_extensions import Self

from jobflow.core.schemas import JobStoreDocument

Expand Down Expand Up @@ -545,7 +546,7 @@ def get_output(
)

@classmethod
def from_file(cls: type[T], db_file: str | Path, **kwargs) -> T:
def from_file(cls, db_file: str | Path, **kwargs) -> Self:
"""
Create a JobStore from a database file.

Expand Down Expand Up @@ -605,7 +606,7 @@ def from_file(cls: type[T], db_file: str | Path, **kwargs) -> T:
return cls.from_dict_spec(store_info, **kwargs)

@classmethod
def from_dict_spec(cls: type[T], spec: dict, **kwargs) -> T:
def from_dict_spec(cls, spec: dict, **kwargs) -> Self:
"""
Create a JobStore from a dict specification.

Expand Down
6 changes: 4 additions & 2 deletions src/jobflow/managers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def run_locally(
The responses of the jobs, as a dict of ``{uuid: {index: response}}``.
"""
from collections import defaultdict
from datetime import datetime
from datetime import datetime, timezone
from pathlib import Path
from random import randint

Expand Down Expand Up @@ -152,7 +152,7 @@ def _run_job(job: jobflow.Job, parents):

def _get_job_dir():
if create_folders:
time_now = datetime.utcnow().strftime(SETTINGS.DIRECTORY_FORMAT)
time_now = datetime.now(tz=timezone.utc).strftime(SETTINGS.DIRECTORY_FORMAT)
job_dir = root_dir / f"job_{time_now}-{randint(10000, 99999)}"
job_dir.mkdir()
return job_dir
Expand All @@ -165,6 +165,8 @@ def _run(root_flow):
with cd(job_dir):
response, jobflow_stopped = _run_job(job, parents)

if response is not None:
response.job_dir = job_dir
encountered_bad_response = encountered_bad_response or response is None
if jobflow_stopped:
return False
Expand Down
1 change: 1 addition & 0 deletions src/jobflow/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Utility functions for logging, enumerations and manipulating dictionaries."""

from jobflow.utils.enum import ValueEnum
from jobflow.utils.find import (
contains_flow_or_job,
Expand Down
1 change: 1 addition & 0 deletions src/jobflow/utils/uid.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Tools for generating UUIDs."""

from __future__ import annotations

from uuid import UUID
Expand Down
1 change: 1 addition & 0 deletions src/jobflow/utils/uuid.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Tools for generating UUIDs."""

from monty.dev import deprecated


Expand Down
23 changes: 20 additions & 3 deletions tests/managers/test_local.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import os
from pathlib import Path

import pytest


Expand All @@ -10,7 +13,11 @@ def test_simple_job(memory_jobstore, clean_dir, simple_job):
responses = run_locally(job, store=memory_jobstore)

# check responses has been filled
assert responses[uuid][1].output == "12345_end"
response1 = responses[uuid][1]
assert response1.output == "12345_end"
# check job_dir
assert isinstance(response1.job_dir, Path)
assert os.path.isdir(response1.job_dir)

# check store has the activity output
result = memory_jobstore.query_one({"uuid": uuid})
Expand Down Expand Up @@ -63,7 +70,8 @@ def test_simple_flow(memory_jobstore, clean_dir, simple_flow, capsys):
assert len(folders) == 1

# run with folders and root_dir
assert Path(root_dir := "test").exists() is False
root_dir = "test"
assert Path(root_dir).exists() is False
responses = run_locally(
flow, store=memory_jobstore, create_folders=True, root_dir=root_dir
)
Expand Down Expand Up @@ -159,12 +167,14 @@ def test_detour_flow(memory_jobstore, clean_dir, detour_flow):

# run with log
responses = run_locally(flow, store=memory_jobstore)
uuid2 = next(u for u in responses if u not in {uuid1, uuid3})
uuid2 = next(uuid for uuid in responses if uuid not in {uuid1, uuid3})

# check responses has been filled
assert len(responses) == 3
assert responses[uuid1][1].output == 11
assert responses[uuid1][1].detour is not None
assert isinstance(responses[uuid1][1].job_dir, Path)
assert os.path.isdir(responses[uuid1][1].job_dir)
assert responses[uuid2][1].output == "11_end"
assert responses[uuid3][1].output == "12345_end"

Expand Down Expand Up @@ -196,6 +206,8 @@ def test_replace_flow(memory_jobstore, clean_dir, replace_flow):
assert len(responses[uuid1]) == 2
assert responses[uuid1][1].output == 11
assert responses[uuid1][1].replace is not None
assert isinstance(responses[uuid1][1].job_dir, Path)
assert os.path.isdir(responses[uuid1][1].job_dir)
assert responses[uuid1][2].output == "11_end"
assert responses[uuid2][1].output == "12345_end"

Expand Down Expand Up @@ -416,7 +428,10 @@ def test_detour_stop_flow(memory_jobstore, clean_dir, detour_stop_flow):
assert len(responses) == 2
assert responses[uuid1][1].output == 11
assert responses[uuid1][1].detour is not None
assert responses[uuid1][1].job_dir is None
assert responses[uuid2][1].output == "1234"
# TODO: maybe find a way to set an artificial job_dir and test that it is not None
assert responses[uuid2][1].job_dir is None

# check store has the activity output
result1 = memory_jobstore.query_one({"uuid": uuid1})
Expand Down Expand Up @@ -444,3 +459,5 @@ def test_external_reference(memory_jobstore, clean_dir, simple_job):
uuid2 = job2.uuid
responses = run_locally(job2, store=memory_jobstore, allow_external_references=True)
assert responses[uuid2][1].output == "12345_end_end"
assert isinstance(responses[uuid2][1].job_dir, Path)
assert os.path.isdir(responses[uuid2][1].job_dir)
Loading