Skip to content

Commit

Permalink
Merge branch 'canonical:main' into 240305_fix-ceoem-bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
stanley31huang authored Mar 7, 2024
2 parents c03e85a + 990f6ff commit 2f0b009
Show file tree
Hide file tree
Showing 19 changed files with 837 additions and 268 deletions.
2 changes: 2 additions & 0 deletions checkbox-ng/checkbox_ng/launcher/checkbox_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class Context:
def __init__(self, args, sa):
self.args = args
self.sa = sa
def reset_sa(self):
self.sa = SessionAssistant()


def main():
Expand Down
8 changes: 7 additions & 1 deletion checkbox-ng/checkbox_ng/launcher/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,13 @@ def _resumed_session(self, session_id):
"""
try:
yield self.sa.resume_session(session_id)
except rpyc.core.vinegar.GenericException as e:
# cast back the (custom) remote exception for IncompatibleJobError
# (that is of type GenericException due to rpyc)
# so that it can be treated as a normal "local" exception"
if "plainbox.impl.session.resume.IncompatibleJobError" in str(e):
raise IncompatibleJobError(*e.args)
raise
finally:
self.sa.abandon_session()

Expand Down Expand Up @@ -405,7 +412,6 @@ def should_start_via_autoresume(self) -> bool:
app_blob = json.loads(metadata.app_blob.decode("UTF-8"))

if not app_blob.get("testplan_id"):
self.sa.abandon_session()
return False

self.sa.select_test_plan(app_blob["testplan_id"])
Expand Down
156 changes: 125 additions & 31 deletions checkbox-ng/checkbox_ng/launcher/subcommands.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from tempfile import TemporaryDirectory
import textwrap
import fnmatch
import contextlib
import gettext
import json
import logging
Expand All @@ -38,6 +39,7 @@

from plainbox.abc import IJobResult
from plainbox.impl.color import Colorizer
from plainbox.impl.session.resume import IncompatibleJobError
from plainbox.impl.execution import UnifiedRunner
from plainbox.impl.highlevel import Explorer
from plainbox.impl.result import MemoryJobResult
Expand Down Expand Up @@ -258,10 +260,8 @@ def invoked(self, ctx):
something_got_chosen = True
except ResumeInstead:
self.sa.finalize_session()
something_got_chosen = (
self._manually_resume_session(
self.resume_candidates
)
something_got_chosen = self._manually_resume_session(
self.resume_candidates
)

if not self.ctx.sa.get_static_todo_list():
Expand Down Expand Up @@ -340,6 +340,58 @@ def join_cmd(args):
lambda session_id: [join_cmd(respawn_cmd + [session_id])]
)

@contextlib.contextmanager
def _resumed_session(self, session_id):
"""
Used to temporarily resume a session to inspect it, abandoning it
before exiting the context
"""
try:
# reload the list of resumable_session in SA
yield self.sa.resume_session(session_id)
finally:
self.ctx.reset_sa()

def _should_autoresume_last_run(self, resume_candidates):
try:
last_abandoned_session = resume_candidates[0]
except IndexError:
return False
try:
with self._resumed_session(last_abandoned_session.id) as metadata:
app_blob = json.loads(metadata.app_blob.decode("UTF-8"))

if not app_blob.get("testplan_id"):
return False

self.sa.select_test_plan(app_blob["testplan_id"])
self.sa.bootstrap()

if not metadata.running_job_name:
return False

job_state = self.sa.get_job_state(metadata.running_job_name)
if job_state.job.plugin != "shell":
return False
return True
except IncompatibleJobError as ije:
# last resumable session is incompatible, produce a helpful log
_logger.error(
"Checkbox tried to resume last session (%s), but the "
"content of Checkbox Providers has changed.",
last_abandoned_session.id,
)
_logger.error(str(ije))
_logger.error(
"To resume it either revert the latest Checkbox snap refresh"
)
_logger.error(
"or roll back the relevant provider debian package first"
)

input("\nPress enter to start Checkbox.")
return False

def _auto_resume_session(self, resume_candidates):
"""
Check if there was a request to auto-resume a session.
Expand All @@ -358,12 +410,17 @@ def _auto_resume_session(self, resume_candidates):
]
if requested_sessions:
# session_ids are unique, so there should be only 1
self._resume_session(requested_sessions[0])
self._resume_session(
requested_sessions[0].id, IJobResult.OUTCOME_UNDECIDED
)
return True
else:
raise RuntimeError("Requested session is not resumable!")
else:
return False
elif self._should_autoresume_last_run(resume_candidates):
last_session = resume_candidates[0]
self._resume_session(last_session.id, None)
return True
return False

def _manually_resume_session(self, resume_candidates):
"""
Expand Down Expand Up @@ -405,7 +462,7 @@ def _manually_resume_session(self, resume_candidates):
break

if resume_params.session_id:
self._resume_session(resume_params)
self._resume_session_via_resume_params(resume_params)
return True
return False

Expand All @@ -432,53 +489,89 @@ def _run_resume_ui_loop(self, resume_candidates):
self._delete_old_sessions(ids)
return False

def _resume_session(self, resume_params):
metadata = self.ctx.sa.resume_session(resume_params.session_id)
def _resume_session_via_resume_params(self, resume_params):
outcome = {
"pass": IJobResult.OUTCOME_PASS,
"fail": IJobResult.OUTCOME_FAIL,
"skip": IJobResult.OUTCOME_SKIP,
"rerun": IJobResult.OUTCOME_UNDECIDED,
}[resume_params.action]
return self._resume_session(
resume_params.session_id, outcome, resume_params.comments
)

def _get_autoresume_outcome_last_job(self, metadata):
"""
Calculates the result of the latest running job given its flags. This
is used to automatically resume a session and assign an outcome to the
job that interrupted the session. If the interruption is due to a
noreturn job (for example, reboot), the job will be marked as passed,
else, if the job made Checkbox crash, it will be marked as crash
"""
job_state = self.sa.get_job_state(metadata.running_job_name)
if "noreturn" in (job_state.job.flags or set()):
return IJobResult.OUTCOME_PASS
return IJobResult.OUTCOME_CRASH

def _resume_session(
self, session_id: str, outcome: "IJobResult|None", comments=[]
):
"""
Resumes the session with the given session_id assigning to the latest
running job the given outcome. If outcome is not provided it will be
calculated from the function _get_autoresume_outcome_last_job
"""
metadata = self.ctx.sa.resume_session(session_id)
if "testplanless" not in metadata.flags:
app_blob = json.loads(metadata.app_blob.decode("UTF-8"))
test_plan_id = app_blob["testplan_id"]
self.ctx.sa.select_test_plan(test_plan_id)
self.ctx.sa.bootstrap()
if outcome is None:
outcome = self._get_autoresume_outcome_last_job(metadata)

last_job = metadata.running_job_name
is_cert_blocker = (
self.ctx.sa.get_job_state(last_job).effective_certification_status
== "blocker"
)
# If we resumed maybe not rerun the same, probably broken job
result_dict = {
"comments": resume_params.comments,
}
if resume_params.action == "pass":
result_dict["outcome"] = IJobResult.OUTCOME_PASS
result_dict = {"comments": comments, "outcome": outcome}
if outcome == IJobResult.OUTCOME_PASS:
result_dict["comments"] = newline_join(
result_dict["comments"], "Passed after resuming execution"
)

elif resume_params.action == "fail":
if is_cert_blocker:
if not resume_params.comments:
result_dict["comments"] = request_comment("why it failed")
elif outcome == IJobResult.OUTCOME_FAIL:
if is_cert_blocker and not comments:
result_dict["comments"] = request_comment("why it failed")
else:
result_dict["comments"] = newline_join(
result_dict["comments"], "Failed after resuming execution"
)

result_dict["outcome"] = IJobResult.OUTCOME_FAIL
elif resume_params.action == "skip":
if is_cert_blocker:
if not resume_params.comments:
result_dict["comments"] = request_comment(
"why you want to skip it"
)
elif outcome == IJobResult.OUTCOME_SKIP:
if is_cert_blocker and not comments:
result_dict["comments"] = request_comment(
"why you want to skip it"
)
else:
result_dict["comments"] = newline_join(
result_dict["comments"], "Skipped after resuming execution"
)
result_dict["outcome"] = IJobResult.OUTCOME_SKIP

elif resume_params.action == "rerun":
elif outcome == IJobResult.OUTCOME_CRASH:
if is_cert_blocker and not comments:
result_dict["comments"] = request_comment("why it failed")
else:
result_dict["comments"] = newline_join(
result_dict["comments"], "Crashed after resuming execution"
)
elif outcome == IJobResult.OUTCOME_UNDECIDED:
# if we don't call use_job_result it means we'll rerun the job
return
else:
raise ValueError(
"Unsupported outcome for resume {}".format(outcome)
)
result = MemoryJobResult(result_dict)
self.ctx.sa.use_job_result(last_job, result)

Expand Down Expand Up @@ -1209,7 +1302,8 @@ def invoked(self, ctx):
attrs["full_id"] = job_unit.id
attrs["id"] = job_unit.partial_id
attrs["certification_status"] = self.ctx.sa.get_job_state(
job).effective_certification_status
job
).effective_certification_status
jobs.append(attrs)
if ctx.args.format == "?":
all_keys = set()
Expand Down
Loading

0 comments on commit 2f0b009

Please sign in to comment.