From bb814d35fdde0f45c380721857d1c2896c0cdfe7 Mon Sep 17 00:00:00 2001 From: getsentry-bot Date: Tue, 1 Oct 2024 18:30:39 +0000 Subject: [PATCH] Revert "feat(job-runner): track job status (#6332)" This reverts commit a94f00f7b159898c18d0432635b8941c8a254bbc. Co-authored-by: onewland <130124+onewland@users.noreply.github.com> --- snuba/cli/jobs.py | 34 +++--------- snuba/manual_jobs/__init__.py | 6 +- snuba/manual_jobs/job_loader.py | 2 +- snuba/manual_jobs/runner.py | 97 ++------------------------------- snuba/redis.py | 4 -- snuba/settings/__init__.py | 2 - snuba/settings/settings_test.py | 1 - tests/cli/test_jobs.py | 25 +-------- tests/manual_jobs/test_job.py | 82 ---------------------------- 9 files changed, 16 insertions(+), 237 deletions(-) delete mode 100644 tests/manual_jobs/test_job.py diff --git a/snuba/cli/jobs.py b/snuba/cli/jobs.py index a5f996074e..4e736e6577 100644 --- a/snuba/cli/jobs.py +++ b/snuba/cli/jobs.py @@ -3,12 +3,8 @@ import click from snuba.manual_jobs import JobSpec -from snuba.manual_jobs.runner import ( - MANIFEST_FILENAME, - get_job_status, - list_job_specs, - run_job, -) +from snuba.manual_jobs.job_loader import JobLoader +from snuba.manual_jobs.runner import list_job_specs JOB_SPECIFICATION_ERROR_MSG = "Missing job type and/or job id" @@ -19,19 +15,7 @@ def jobs() -> None: @jobs.command() -@click.option("--json_manifest", default=MANIFEST_FILENAME) -def list(*, json_manifest: str) -> None: - job_specs = list_job_specs(json_manifest) - click.echo(job_specs) - - -def _run_job_and_echo_status(job_spec: JobSpec, dry_run: bool) -> None: - status = run_job(job_spec, dry_run) - click.echo(f"resulting job status = {status}") - - -@jobs.command() -@click.option("--json_manifest", default=MANIFEST_FILENAME) +@click.option("--json_manifest", required=True) @click.option("--job_id") @click.option( "--dry_run", @@ -42,7 +26,8 @@ def run_from_manifest(*, json_manifest: str, job_id: str, dry_run: bool) -> None if job_id not in job_specs.keys(): raise click.ClickException("Provide a valid job id") - _run_job_and_echo_status(job_specs[job_id], dry_run) + job_to_run = JobLoader.get_job_instance(job_specs[job_id], dry_run) + job_to_run.execute() def _parse_params(pairs: Tuple[str, ...]) -> MutableMapping[Any, Any]: @@ -62,10 +47,5 @@ def run(*, job_type: str, job_id: str, dry_run: bool, pairs: Tuple[str, ...]) -> raise click.ClickException(JOB_SPECIFICATION_ERROR_MSG) job_spec = JobSpec(job_id=job_id, job_type=job_type, params=_parse_params(pairs)) - _run_job_and_echo_status(job_spec, dry_run) - - -@jobs.command() -@click.option("--job_id") -def status(*, job_id: str) -> None: - click.echo(get_job_status(job_id)) + job_to_run = JobLoader.get_job_instance(job_spec, dry_run) + job_to_run.execute() diff --git a/snuba/manual_jobs/__init__.py b/snuba/manual_jobs/__init__.py index 270e8281c3..a4d52e3e89 100644 --- a/snuba/manual_jobs/__init__.py +++ b/snuba/manual_jobs/__init__.py @@ -6,14 +6,14 @@ from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory -logger = logging.getLogger("snuba.manual_jobs") +logger = logging.getLogger("snuba_init") @dataclass class JobSpec: job_id: str job_type: str - params: Optional[MutableMapping[Any, Any]] = None + params: Optional[MutableMapping[Any, Any]] class Job(ABC, metaclass=RegisteredClass): @@ -26,7 +26,7 @@ def __init__(self, job_spec: JobSpec, dry_run: bool) -> None: @abstractmethod def execute(self) -> None: - raise NotImplementedError + pass @classmethod def config_key(cls) -> str: diff --git a/snuba/manual_jobs/job_loader.py b/snuba/manual_jobs/job_loader.py index 6bf72155aa..45e4080d68 100644 --- a/snuba/manual_jobs/job_loader.py +++ b/snuba/manual_jobs/job_loader.py @@ -9,7 +9,7 @@ def __init__(self, job_type: str): super().__init__(f"Job does not exist. Did you make a file {job_type}.py yet?") -class _JobLoader: +class JobLoader: @staticmethod def get_job_instance(job_spec: JobSpec, dry_run: bool) -> "Job": job_type_class = Job.class_from_name(job_spec.job_type) diff --git a/snuba/manual_jobs/runner.py b/snuba/manual_jobs/runner.py index b0705c8706..f8cb70044d 100644 --- a/snuba/manual_jobs/runner.py +++ b/snuba/manual_jobs/runner.py @@ -1,41 +1,12 @@ import os -from enum import StrEnum -from typing import Any, Mapping, Optional +from typing import Any, Mapping import simplejson -from sentry_sdk import capture_exception from snuba.manual_jobs import JobSpec -from snuba.manual_jobs.job_loader import _JobLoader -from snuba.redis import RedisClientKey, get_redis_client -from snuba.utils.serializable_exception import SerializableException -_redis_client = get_redis_client(RedisClientKey.MANUAL_JOBS) - -class JobStatus(StrEnum): - RUNNING = "running" - FINISHED = "finished" - NOT_STARTED = "not_started" - FAILED = "failed" - - -class JobLockedException(SerializableException): - def __init__(self, job_id: str): - super().__init__(f"Job {job_id} lock exists, not available to run") - - -class JobStatusException(SerializableException): - def __init__(self, job_id: str, status: JobStatus): - super().__init__( - f"Job {job_id} has run before, status = {status}, not available to run" - ) - - -MANIFEST_FILENAME = "job_manifest.json" - - -def _read_manifest_from_path(filename: str) -> Mapping[str, JobSpec]: +def _read_from_path(filename: str) -> Mapping[str, JobSpec]: local_root = os.path.dirname(__file__) with open(os.path.join(local_root, filename)) as stream: @@ -63,67 +34,7 @@ def _build_job_spec_from_entry(content: Any) -> JobSpec: return job_spec -def _build_job_lock_key(job_id: str) -> str: - return f"snuba:manual_jobs:{job_id}:lock" - - -def _build_job_status_key(job_id: str) -> str: - return f"snuba:manual_jobs:{job_id}:execution_status" - - -def _acquire_job_lock(job_id: str) -> bool: - return bool( - _redis_client.set( - name=_build_job_lock_key(job_id), value=1, nx=True, ex=(24 * 60 * 60) - ) - ) - - -def _release_job_lock(job_id: str) -> None: - _redis_client.delete(_build_job_lock_key(job_id)) - - -def _set_job_status(job_id: str, status: JobStatus) -> JobStatus: - if not _redis_client.set(name=_build_job_status_key(job_id), value=status.value): - raise SerializableException(f"Failed to set job status {status} on {job_id}") - return status - - -def get_job_status(job_id: str) -> Optional[JobStatus]: - redis_status = _redis_client.get(name=_build_job_status_key(job_id)) - return JobStatus(redis_status.decode("utf-8")) if redis_status else redis_status - - def list_job_specs( - manifest_filename: str = MANIFEST_FILENAME, + jobs_filename: str = "job_manifest.json", ) -> Mapping[str, JobSpec]: - return _read_manifest_from_path(manifest_filename) - - -def run_job(job_spec: JobSpec, dry_run: bool) -> JobStatus: - current_job_status = get_job_status(job_spec.job_id) - if current_job_status is not None and current_job_status != JobStatus.NOT_STARTED: - raise JobStatusException(job_id=job_spec.job_id, status=current_job_status) - - have_lock = _acquire_job_lock(job_spec.job_id) - if not have_lock: - raise JobLockedException(job_spec.job_id) - - current_job_status = _set_job_status(job_spec.job_id, JobStatus.NOT_STARTED) - - job_to_run = _JobLoader.get_job_instance(job_spec, dry_run) - - try: - if not dry_run: - current_job_status = _set_job_status(job_spec.job_id, JobStatus.RUNNING) - job_to_run.execute() - if not dry_run: - current_job_status = _set_job_status(job_spec.job_id, JobStatus.FINISHED) - except BaseException: - if not dry_run: - current_job_status = _set_job_status(job_spec.job_id, JobStatus.FAILED) - capture_exception() - finally: - _release_job_lock(job_spec.job_id) - - return current_job_status + return _read_from_path(jobs_filename) diff --git a/snuba/redis.py b/snuba/redis.py index df0c21e98f..575948018e 100644 --- a/snuba/redis.py +++ b/snuba/redis.py @@ -114,7 +114,6 @@ class RedisClientKey(Enum): DLQ = "dlq" OPTIMIZE = "optimize" ADMIN_AUTH = "admin_auth" - MANUAL_JOBS = "manual_jobs" _redis_clients: Mapping[RedisClientKey, RedisClientType] = { @@ -142,9 +141,6 @@ class RedisClientKey(Enum): RedisClientKey.ADMIN_AUTH: _initialize_specialized_redis_cluster( settings.REDIS_CLUSTERS["admin_auth"] ), - RedisClientKey.MANUAL_JOBS: _initialize_specialized_redis_cluster( - settings.REDIS_CLUSTERS["manual_jobs"] - ), } diff --git a/snuba/settings/__init__.py b/snuba/settings/__init__.py index 310587f5d7..f681b4dd99 100644 --- a/snuba/settings/__init__.py +++ b/snuba/settings/__init__.py @@ -175,7 +175,6 @@ class RedisClusters(TypedDict): dlq: RedisClusterConfig | None optimize: RedisClusterConfig | None admin_auth: RedisClusterConfig | None - manual_jobs: RedisClusterConfig | None REDIS_CLUSTERS: RedisClusters = { @@ -187,7 +186,6 @@ class RedisClusters(TypedDict): "dlq": None, "optimize": None, "admin_auth": None, - "manual_jobs": None, } # Query Recording Options diff --git a/snuba/settings/settings_test.py b/snuba/settings/settings_test.py index 70ca081e43..c39978f00e 100644 --- a/snuba/settings/settings_test.py +++ b/snuba/settings/settings_test.py @@ -60,7 +60,6 @@ (7, "dlq"), (8, "optimize"), (9, "admin_auth"), - (10, "manual_jobs"), ] } VALIDATE_DATASET_YAMLS_ON_STARTUP = True diff --git a/tests/cli/test_jobs.py b/tests/cli/test_jobs.py index e37affa10d..841fe1f7c5 100644 --- a/tests/cli/test_jobs.py +++ b/tests/cli/test_jobs.py @@ -1,10 +1,8 @@ -import pytest from click.testing import CliRunner -from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest, status +from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest -@pytest.mark.redis_db def test_cmd_line_valid() -> None: runner = CliRunner() result = runner.invoke( @@ -15,7 +13,6 @@ def test_cmd_line_valid() -> None: assert result.exit_code == 0 -@pytest.mark.redis_db def test_invalid_job_errors() -> None: runner = CliRunner() result = runner.invoke( @@ -35,7 +32,6 @@ def test_invalid_job_errors() -> None: assert result.exit_code == 1 -@pytest.mark.redis_db def test_cmd_line_no_job_specification_errors() -> None: runner = CliRunner() result = runner.invoke(run, ["--dry_run", "True", "k1=v1", "k2=v2"]) @@ -43,7 +39,6 @@ def test_cmd_line_no_job_specification_errors() -> None: assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n" -@pytest.mark.redis_db def test_cmd_line_no_job_id_errors() -> None: runner = CliRunner() result = runner.invoke( @@ -53,7 +48,6 @@ def test_cmd_line_no_job_id_errors() -> None: assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n" -@pytest.mark.redis_db def test_cmd_line_no_job_type_errors() -> None: runner = CliRunner() result = runner.invoke( @@ -63,7 +57,6 @@ def test_cmd_line_no_job_type_errors() -> None: assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n" -@pytest.mark.redis_db def test_json_valid() -> None: runner = CliRunner() result = runner.invoke( @@ -76,19 +69,3 @@ def test_json_valid() -> None: ], ) assert result.exit_code == 0 - - -@pytest.mark.redis_db -def test_jobs_status() -> None: - runner = CliRunner() - runner.invoke( - run_from_manifest, - [ - "--json_manifest", - "job_manifest.json", - "--job_id", - "abc1234", - ], - ) - result = runner.invoke(status, ["--job_id", "abc1234"]) - assert result.exit_code == 0 diff --git a/tests/manual_jobs/test_job.py b/tests/manual_jobs/test_job.py deleted file mode 100644 index e011eb590c..0000000000 --- a/tests/manual_jobs/test_job.py +++ /dev/null @@ -1,82 +0,0 @@ -from threading import Thread -from time import sleep -from unittest.mock import patch - -import pytest - -from snuba.manual_jobs import Job, JobSpec -from snuba.manual_jobs.job_loader import _JobLoader -from snuba.manual_jobs.runner import ( - JobLockedException, - JobStatus, - JobStatusException, - _acquire_job_lock, - get_job_status, - run_job, -) -from snuba.utils.serializable_exception import SerializableException - -JOB_ID = "abc1234" -test_job_spec = JobSpec(job_id=JOB_ID, job_type="ToyJob") - - -class FailJob(Job): - def __init__(self) -> None: - pass - - def execute(self) -> None: - raise SerializableException("Intended failure") - - -class SlowJob(Job): - def __init__(self) -> None: - self.stop = False - - def execute(self) -> None: - while not self.stop: - sleep(0.005) - - -@pytest.mark.redis_db -def test_job_status_changes_to_finished() -> None: - assert get_job_status(JOB_ID) is None - run_job(test_job_spec, False) - assert get_job_status(JOB_ID) == JobStatus.FINISHED - with pytest.raises(JobStatusException): - run_job(test_job_spec, False) - - -@pytest.mark.redis_db -def test_job_with_exception_causes_failure() -> None: - with patch.object(_JobLoader, "get_job_instance") as MockGetInstance: - MockGetInstance.return_value = FailJob() - assert get_job_status(JOB_ID) is None - run_job(test_job_spec, False) - assert get_job_status(JOB_ID) == JobStatus.FAILED - - -@pytest.mark.redis_db -def test_slow_job_stay_running() -> None: - with patch.object(_JobLoader, "get_job_instance") as MockGetInstance: - job = SlowJob() - MockGetInstance.return_value = job - assert get_job_status(JOB_ID) is None - t = Thread( - target=run_job, name="slow-background-job", args=[test_job_spec, False] - ) - t.start() - sleep(0.1) - assert get_job_status(JOB_ID) == JobStatus.RUNNING - job.stop = True - - -@pytest.mark.redis_db -def test_job_status_with_invalid_job_id() -> None: - assert get_job_status("invalid_job_id") is None - - -@pytest.mark.redis_db -def test_job_lock() -> None: - _acquire_job_lock(JOB_ID) - with pytest.raises(JobLockedException): - run_job(test_job_spec, False)