Fix pass manager config for dynamic jobs #502

Closed

FabiPi3 (Contributor) commented Dec 8, 2023

Description & Motivation

I am writing dynamic workflows, basically using the replace feature of jobflow. I have a single starting job that keeps adding a new job until some criterion is fulfilled (in principle following the Fibonacci example at https://github.com/materialsproject/jobflow/blob/main/examples/fibonacci.py). I now want to execute the workflow with a manager (specifically jobflow-remote), which requires specifying a manager_config. Every step in my workflow should use a different configuration, so I need some way to specify them per step.

I know there are ways to do that, for example setting pass_manager_config=True when specifying the config, or using response_manager_config to specify a new config for replacement jobs. But I don't see how to do it for, say, 5 steps in the workflow where every step uses a different config. I also find it very implicit and roundabout to specify all of this beforehand (and sometimes I don't even know the values before execution).

What I would like to do is set this in the job itself. I am not sure whether this breaks one of the basic design rules of jobflow. Here is a toy example to show this:

from jobflow import JobConfig, Response, job


@job
def replace_example(a, threshold):
    if a >= threshold:
        return "Finished!"

    # pass threshold along so the replacement job can check the stop criterion
    new_job = replace_example(a + 1, threshold)
    new_job.name = f"replace_job_{a+1}"
    # new config depends on input value 'a'
    new_config = JobConfig(manager_config={"resources": {"abc": a+1}})
    new_job.update_config(config=new_config)
    return Response("Keep going...", replace=new_job)

But this does not work. The reason, as far as I can tell, is the last function in src/jobflow/core/job.py, pass_manager_config(), which ends with the following lines:

# update manager config
for ajob in all_jobs:
    ajob.config.manager_config = deepcopy(manager_config)

This overwrites the manager_config of all new jobs with the old manager_config. My suggestion is to change this to:

# update manager config
for ajob in all_jobs:
    ajob.config.manager_config = manager_config | ajob.config.manager_config

This would instead merge the old config into the new one, with the new job's own entries taking precedence. With this change, my idea seems to work.
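To make the merge semantics concrete, here is a minimal sketch in plain Python (3.9+ for the dict | operator). The helper name merge_manager_config and the "worker" and "xyz" keys are hypothetical and only serve as illustration; they are not part of jobflow. The sketch shows that the right-hand operand wins on conflicting keys, and that the merge is shallow: a nested dict such as "resources" is replaced as a whole, not merged key by key.

```python
from copy import deepcopy


def merge_manager_config(parent: dict, child: dict) -> dict:
    """Hypothetical helper: shallow merge where the child's keys win."""
    # deepcopy keeps the merged result independent of the parent's nested values
    return deepcopy(parent) | child


parent = {"resources": {"abc": 1, "xyz": 5}, "worker": "local"}
child = {"resources": {"abc": 2}}

merged = merge_manager_config(parent, child)
# "resources" comes entirely from the child, so "xyz" is gone;
# "worker" is inherited from the parent.
print(merged)  # {'resources': {'abc': 2}, 'worker': 'local'}

# A job without its own settings simply inherits the parent's config.
inherited = merge_manager_config(parent, {})
print(inherited == parent)  # True
```

If per-key merging of nested sections were wanted, a recursive merge would be needed instead; the one-line change proposed above does not do that.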

Tests

While running the tests of jobflow for my new changes, I found an issue in the tests. Consider the following (minimal) example of tests:

from jobflow import Job, Response, JobConfig

def add(a, b=5):
    return a + b

def replace_flow():
    from jobflow import Flow

    job = Job(add, function_args=(1,))
    flow = Flow([job], output=job.output)
    return Response(replace=flow)

def test_1(memory_jobstore):
    manager_config = {"abc": 1}
    pass_config = JobConfig(manager_config=manager_config, pass_manager_config=True)

    test_job = Job(replace_flow, config=pass_config)
    response = test_job.run(memory_jobstore)
    for j in response.replace:
        assert j.config.manager_config == manager_config

def test_2(memory_jobstore):
    manager_config = {"abc": 1}
    manager_config2 = {"abc": 2}
    response_config = JobConfig(
        manager_config=manager_config, response_manager_config=manager_config2
    )

    test_job = Job(replace_flow, config=response_config)
    response = test_job.run(memory_jobstore)
    for j in response.replace:
        assert j.config.manager_config == manager_config2

With the old behavior of overwriting the manager_config, everything worked fine. With my changes, the following happens: running either of the two tests on its own passes, but if I run both tests in a single pytest run, the second test fails.

The reason seems to lie in the store_inputs job, which is a trick to make replacing with a flow work. For a reason I do not understand, this job has a non-empty manager_config in the second run (which was of course ignored by the first version of the code, but now leads to an error).

In more detail: looking at the function prepare_replace in src/jobflow/core/job.py (line 1319) and adding a print there:

store_output_job = store_inputs(replace.output)
print(store_output_job.config.manager_config)

should always print an empty dict. But if I run pytest tests/core/test_job.py -k test_job_config, I see a non-empty dictionary in the output:

{}
{}
{'abc': 1}

And I have no idea why. The fix I propose here, which worked locally, is to remove the @job decorator from the store_inputs function and convert it explicitly into a Job using the same JobConfig. See the changes for that. What do you think?
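One possible explanation for state leaking between tests, offered only as a hypothesis and not verified against the jobflow source, is a config object that is created once and then shared by every job built without an explicit config (for example through a mutable default argument). The sketch below reproduces that pattern in plain Python; the classes Config, Task, and SafeTask are hypothetical stand-ins, not jobflow classes.

```python
class Config:
    """Hypothetical stand-in for a job configuration object."""

    def __init__(self):
        self.manager_config = {}


class Task:
    # The default Config() is evaluated once, when __init__ is defined,
    # so every Task created without an explicit config shares that object.
    def __init__(self, config=Config()):
        self.config = config


class SafeTask:
    # Creating the Config inside __init__ gives each instance its own object.
    def __init__(self, config=None):
        self.config = config if config is not None else Config()


first = Task()
first.config.manager_config = {"abc": 1}  # mutates the shared default
second = Task()
print(second.config.manager_config)  # {'abc': 1}

safe_first = SafeTask()
safe_first.config.manager_config = {"abc": 1}
safe_second = SafeTask()
print(safe_second.config.manager_config)  # {}
```

With plain overwriting, such a stale value would be replaced on every run and go unnoticed; with merging, it would survive into the next test, which would match the behavior described above. Whether this is what actually happens for store_inputs still needs to be checked.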

In any case, please provide some feedback on my initial example, or suggest how I could better achieve my goal.

Checklist

  • Code is in the standard Python style.
    The easiest way to handle this is to run the following, in this order, on
    your local machine. Start by running black on your new code. This will
    automatically reformat your code to PEP8 conventions and remove most issues. Then run
    pycodestyle, followed by flake8.
  • Docstrings have been added in the Numpy docstring format.
    Run pydocstyle on your code.
  • Type annotations are highly encouraged. Run mypy to
    type check your code.
  • Tests have been added for any new functionality or bug fixes.
  • All linting and tests pass.

FabiPi3 (Contributor, Author) commented Dec 8, 2023

Maybe @utf or @gpetretto would be interested

codecov bot commented Dec 12, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 99.86%. Comparing base (eda2a65) to head (393e71d).
Report is 14 commits behind head on main.

❗ Current head 393e71d differs from pull request most recent head 72081f9. Consider uploading reports for the commit 72081f9 to get more accurate results

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #502      +/-   ##
==========================================
+ Coverage   99.42%   99.86%   +0.44%     
==========================================
  Files          21       20       -1     
  Lines        1564     1511      -53     
  Branches      425      414      -11     
==========================================
- Hits         1555     1509      -46     
+ Misses          9        2       -7     
Files Coverage Δ
src/jobflow/core/job.py 100.00% <100.00%> (ø)

... and 7 files with indirect coverage changes

@FabiPi3 FabiPi3 closed this Apr 3, 2024