
Allow hooking job failure for generic error handling #205

Merged: 7 commits, Sep 23, 2024

Conversation

@NeoLegends (Contributor) commented Aug 19, 2024:

Closes #179
Closes #204

now testing this

@NeoLegends NeoLegends added the enhancement New feature or request label Aug 19, 2024
@NeoLegends NeoLegends self-assigned this Aug 19, 2024
def handle_job_failure(self, prev_jobs: Dict[str, List[Job]], cur_jobs: Dict[str, List[Job]]):
    prev_jobs = set(prev_jobs.get(gs.STATE_ERROR, []))
    for job in cur_jobs.get(gs.STATE_ERROR, []):
        if job not in prev_jobs:

Contributor:
Is it possible that this line should be if job not in prev_jobs.get(gs.STATE_ERROR, []):?

Contributor Author (NeoLegends):
I think overwriting that variable is just confusing and not what I intended. I've since changed the var names, so it should be clearer now.

Contributor:
I missed that, but yes it's better to use a different name.

@critias (Contributor) left a comment:

Add another docstring, but besides that it looks good to me.

About the proposed usage in the linked issues: it might be worth having a more general way to detect broken nodes and exclude them for a certain time, instead of adding this to each job individually 🤔
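
For illustration, such a temporary exclusion could be tracked by a small helper that remembers which nodes recently produced failures and skips them for a cooldown period. This is only a sketch; BrokenNodeTracker, its names, and its integration point are hypothetical and not an existing sisyphus API:

import time


class BrokenNodeTracker:
    """Hypothetical helper: remember nodes that recently produced failures
    and report whether they should be excluded from scheduling for a while."""

    def __init__(self, cooldown: float = 3600.0):
        self.cooldown = cooldown  # seconds a node stays excluded after a failure
        self._last_failure = {}  # node name -> time of last observed failure

    def report_failure(self, node: str):
        self._last_failure[node] = time.monotonic()

    def is_excluded(self, node: str) -> bool:
        last = self._last_failure.get(node)
        if last is None:
            return False
        if time.monotonic() - last > self.cooldown:
            del self._last_failure[node]  # cooldown expired, allow the node again
            return False
        return True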

@@ -572,6 +576,12 @@ def maybe_clear_state(state, always_clear, action):
        self.job_cleaner.start()
        return True

    def handle_job_failure(self, prev_jobs: Dict[str, List[Job]], cur_jobs: Dict[str, List[Job]]):

Contributor:
Please add a short doc string here as well.

logic.

Sisyphus will call this function w/ the job instance if the job enters the
failure state. The callback itself is then responsible for any retry logic,

Contributor:

Suggested change:
- failure state. The callback itself is then responsible for any retry logic,
+ error state. The callback itself is then responsible for any retry logic,

I think it is called "error" everywhere else. Cf. also "interrupted_resumable" and "interrupted_non_resumable", which I would also call failures, but they are not handled here.

Maybe the function should be renamed as well.

Comment on lines 607 to 608
prev_jobs = self.jobs
cur_jobs = self.update_jobs()

Contributor:
How will this interact with the block directly below? self.clear_states removes some errors, calls self.update_jobs(), and then cycles the outer loop again. This would mess up the logic to detect new jobs in the error state.

Contributor Author (NeoLegends):
This is OK: if jobs were cleared, the loop cycles around before any of this logic is run.

for job in cur_jobs.get(gs.STATE_ERROR, []):
    if job not in prev_errored_jobs:
        gs.on_job_failure(job)

Contributor:
Would you call self.update_jobs() after finishing the loop, to account for changes in the state of the jobs?

Contributor Author (NeoLegends):
Not needed, since this will loop around immediately when the processing is done.

def handle_job_failure(self, prev_jobs: Dict[str, List[Job]], cur_jobs: Dict[str, List[Job]]):
    prev_errored_jobs = set(prev_jobs.get(gs.STATE_ERROR, []))
    for job in cur_jobs.get(gs.STATE_ERROR, []):
        if job not in prev_errored_jobs:

Contributor:
Do we need to keep track of which jobs are newly added to the error state, or could this be a stateless function that we always apply to all errored jobs?

Contributor Author (NeoLegends):
It could just as well be stateless.
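
For illustration, a stateless variant would simply invoke the hook for every job currently in the error state on each manager cycle; the user callback then has to be idempotent (e.g. via the ignore_error_cache pattern shown later in this thread). A minimal sketch:

def handle_job_failure(self, cur_jobs: Dict[str, List[Job]]):
    # Stateless variant: no tracking of previously errored jobs.
    # gs.on_job_failure is then invoked repeatedly for the same job on every
    # cycle until its error is cleared, so the callback must be idempotent.
    for job in cur_jobs.get(gs.STATE_ERROR, []):
        gs.on_job_failure(job)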

@NeoLegends NeoLegends marked this pull request as ready for review September 23, 2024 14:12

@NeoLegends (Contributor Author):
I tested this and it works well. Do you think it's worthwhile adding default handlers that check for e.g. certain log file substrings and then automatically clear the error if they are present?
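
For illustration, such a default handler could be built as a small factory that scans the run log for given substrings and clears the same marker files as the settings.py snippet further down in this thread. restart_on_log_substrings is a hypothetical name, not an existing sisyphus API; the log path, job.work_path(), and the cleared marker files are taken from that snippet:

import os
from typing import Callable, Sequence


def restart_on_log_substrings(substrings: Sequence[str]) -> Callable:
    """Hypothetical default handler factory: clear a job's error if its run
    log contains any of the given substrings (case-insensitive)."""

    lowered = [s.lower() for s in substrings]

    def handler(job) -> None:
        log_file_path = os.path.join(job.work_path(), "../log.run.1")
        try:
            with open(log_file_path, "rt", errors="ignore") as log_file:
                if not any(s in line.lower() for line in log_file for s in lowered):
                    return
        except FileNotFoundError:
            return
        # Remove the error marker and submit log so the manager re-submits the
        # job, mirroring the manual snippet below.
        for f in [
            os.path.join(job.work_path(), "../error.run.1"),
            os.path.join(job.work_path(), "../submit_log.run"),
        ]:
            try:
                os.remove(f)
            except FileNotFoundError:
                pass

    return handler


# e.g. in settings.py:
# on_job_failure = restart_on_log_substrings(["cuda error"])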

@michelwi (Contributor) left a comment:
I guess some changes can now be removed again

(3 review threads on sisyphus/manager.py, marked outdated and resolved)

@NeoLegends (Contributor Author) commented Sep 23, 2024:

As a first approximation of a proper error-handling implementation for the CUDA errors I'm encountering at the moment, adding this snippet to settings.py works reasonably well:

import gzip
import logging
import os
import shutil

# job ids whose error we have decided to leave for manual inspection
ignore_error_cache = set()

def on_job_failure(job: "Job"):
    from i6_core.returnn import ReturnnTrainingJob

    if not isinstance(job, ReturnnTrainingJob):
        logging.debug(f"{job.job_id()}: error, but not a {ReturnnTrainingJob.__name__}, so not doing anything.")
        return
    elif job.job_id() in ignore_error_cache:
        return

    log_file_path = os.path.join(job.work_path(), "../log.run.1")
    with open(log_file_path, "rt") as log_file:
        is_cuda_err = any("cuda error" in line.lower() for line in log_file)

    if not is_cuda_err:
        logging.debug(f"{job.job_id()}: died but probably not due to a CUDA error, better go check by hand.")
        ignore_error_cache.add(job.job_id())
        return

    logging.info(f"{job.job_id()}: CUDA 💥, re-starting... 🔁")

    # archive log file
    i = 1
    cleared_log_path = None
    while cleared_log_path is None or os.path.exists(cleared_log_path):
        cleared_log_path = os.path.join(job.work_path(), f"../log.run.cuda-cleared.{i:04}.gz")
        i += 1
    with open(log_file_path, "rb") as log_in, gzip.open(cleared_log_path, "wb") as log_out:
        shutil.copyfileobj(log_in, log_out)
    os.remove(log_file_path)

    # re-schedule job
    for f in [
        os.path.join(job.work_path(), "../error.run.1"),
        os.path.join(job.work_path(), "../submit_log.run"),
    ]:
        try:
            os.remove(f)
        except FileNotFoundError:
            pass

@NeoLegends NeoLegends merged commit c7de85e into master Sep 23, 2024
3 checks passed
@NeoLegends NeoLegends deleted the moritz-job-failure-hook branch September 23, 2024 15:29

@albertz (Member) commented Oct 4, 2024:

I wonder: this callback is called all the time, not just once on job failure? This is a bit unexpected to me, and it also makes the logic much more complicated on the user side, e.g. you need to add this ignore_error_cache logic here. Which is also wrong, because once the user clears the error for this job and it continues to run, it might later run into a CUDA error, but then you would ignore it, because you never clear the ignore_error_cache here.

@albertz (Member) commented Oct 4, 2024:

Well, OK, checking the mtime of the error file would probably be better, if you want to keep the callback logic this way. Like:

import os

ignore_error_cache = {}  # job_id -> err_mtime


# https://github.com/rwth-i6/sisyphus/pull/205#issuecomment-2368527715
def on_job_failure(job: "Job"):
    import logging
    import gzip
    from i6_core.returnn import ReturnnTrainingJob

    if not isinstance(job, ReturnnTrainingJob):
        return

    try:
        err_mtime = os.path.getmtime(os.path.join(job.work_path(), "../error.run.1"))
    except FileNotFoundError:
        return  # maybe was already cleared
    if ignore_error_cache.get(job.job_id()) == err_mtime:
        return

    log_file_path = os.path.join(job.work_path(), "../log.run.1")
    with open(log_file_path, "rt") as log_file:
        is_cuda_err = any(("cuda error" in line.lower() or "cuFFT error" in line) for line in log_file)

    if not is_cuda_err:
        logging.debug(f"{job.job_id()}: died but probably not due to a CUDA error, better go check by hand.")
        ignore_error_cache[job.job_id()] = err_mtime
        return

    ...

Labels: enhancement (New feature or request)

Successfully merging this pull request may close these issues:
- Hooking job exit/failure
- Auto-restart jobs on user-specified error conditions

4 participants