Skip to content

Commit

Permalink
Revert "feat(job-runner): track job status (#6332)"
Browse files Browse the repository at this point in the history
This reverts commit a94f00f.

Co-authored-by: onewland <[email protected]>
  • Loading branch information
getsentry-bot and onewland committed Oct 1, 2024
1 parent a94f00f commit bb814d3
Show file tree
Hide file tree
Showing 9 changed files with 16 additions and 237 deletions.
34 changes: 7 additions & 27 deletions snuba/cli/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,8 @@
import click

from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.runner import (
MANIFEST_FILENAME,
get_job_status,
list_job_specs,
run_job,
)
from snuba.manual_jobs.job_loader import JobLoader
from snuba.manual_jobs.runner import list_job_specs

JOB_SPECIFICATION_ERROR_MSG = "Missing job type and/or job id"

Expand All @@ -19,19 +15,7 @@ def jobs() -> None:


@jobs.command()
@click.option("--json_manifest", default=MANIFEST_FILENAME)
def list(*, json_manifest: str) -> None:
job_specs = list_job_specs(json_manifest)
click.echo(job_specs)


def _run_job_and_echo_status(job_spec: JobSpec, dry_run: bool) -> None:
status = run_job(job_spec, dry_run)
click.echo(f"resulting job status = {status}")


@jobs.command()
@click.option("--json_manifest", default=MANIFEST_FILENAME)
@click.option("--json_manifest", required=True)
@click.option("--job_id")
@click.option(
"--dry_run",
Expand All @@ -42,7 +26,8 @@ def run_from_manifest(*, json_manifest: str, job_id: str, dry_run: bool) -> None
if job_id not in job_specs.keys():
raise click.ClickException("Provide a valid job id")

_run_job_and_echo_status(job_specs[job_id], dry_run)
job_to_run = JobLoader.get_job_instance(job_specs[job_id], dry_run)
job_to_run.execute()


def _parse_params(pairs: Tuple[str, ...]) -> MutableMapping[Any, Any]:
Expand All @@ -62,10 +47,5 @@ def run(*, job_type: str, job_id: str, dry_run: bool, pairs: Tuple[str, ...]) ->
raise click.ClickException(JOB_SPECIFICATION_ERROR_MSG)
job_spec = JobSpec(job_id=job_id, job_type=job_type, params=_parse_params(pairs))

_run_job_and_echo_status(job_spec, dry_run)


@jobs.command()
@click.option("--job_id")
def status(*, job_id: str) -> None:
click.echo(get_job_status(job_id))
job_to_run = JobLoader.get_job_instance(job_spec, dry_run)
job_to_run.execute()
6 changes: 3 additions & 3 deletions snuba/manual_jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@

from snuba.utils.registered_class import RegisteredClass, import_submodules_in_directory

logger = logging.getLogger("snuba.manual_jobs")
logger = logging.getLogger("snuba_init")


@dataclass
class JobSpec:
job_id: str
job_type: str
params: Optional[MutableMapping[Any, Any]] = None
params: Optional[MutableMapping[Any, Any]]


class Job(ABC, metaclass=RegisteredClass):
Expand All @@ -26,7 +26,7 @@ def __init__(self, job_spec: JobSpec, dry_run: bool) -> None:

@abstractmethod
def execute(self) -> None:
raise NotImplementedError
pass

@classmethod
def config_key(cls) -> str:
Expand Down
2 changes: 1 addition & 1 deletion snuba/manual_jobs/job_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def __init__(self, job_type: str):
super().__init__(f"Job does not exist. Did you make a file {job_type}.py yet?")


class _JobLoader:
class JobLoader:
@staticmethod
def get_job_instance(job_spec: JobSpec, dry_run: bool) -> "Job":
job_type_class = Job.class_from_name(job_spec.job_type)
Expand Down
97 changes: 4 additions & 93 deletions snuba/manual_jobs/runner.py
Original file line number Diff line number Diff line change
@@ -1,41 +1,12 @@
import os
from enum import StrEnum
from typing import Any, Mapping, Optional
from typing import Any, Mapping

import simplejson
from sentry_sdk import capture_exception

from snuba.manual_jobs import JobSpec
from snuba.manual_jobs.job_loader import _JobLoader
from snuba.redis import RedisClientKey, get_redis_client
from snuba.utils.serializable_exception import SerializableException

_redis_client = get_redis_client(RedisClientKey.MANUAL_JOBS)


class JobStatus(StrEnum):
RUNNING = "running"
FINISHED = "finished"
NOT_STARTED = "not_started"
FAILED = "failed"


class JobLockedException(SerializableException):
def __init__(self, job_id: str):
super().__init__(f"Job {job_id} lock exists, not available to run")


class JobStatusException(SerializableException):
def __init__(self, job_id: str, status: JobStatus):
super().__init__(
f"Job {job_id} has run before, status = {status}, not available to run"
)


MANIFEST_FILENAME = "job_manifest.json"


def _read_manifest_from_path(filename: str) -> Mapping[str, JobSpec]:
def _read_from_path(filename: str) -> Mapping[str, JobSpec]:
local_root = os.path.dirname(__file__)

with open(os.path.join(local_root, filename)) as stream:
Expand Down Expand Up @@ -63,67 +34,7 @@ def _build_job_spec_from_entry(content: Any) -> JobSpec:
return job_spec


def _build_job_lock_key(job_id: str) -> str:
return f"snuba:manual_jobs:{job_id}:lock"


def _build_job_status_key(job_id: str) -> str:
return f"snuba:manual_jobs:{job_id}:execution_status"


def _acquire_job_lock(job_id: str) -> bool:
return bool(
_redis_client.set(
name=_build_job_lock_key(job_id), value=1, nx=True, ex=(24 * 60 * 60)
)
)


def _release_job_lock(job_id: str) -> None:
_redis_client.delete(_build_job_lock_key(job_id))


def _set_job_status(job_id: str, status: JobStatus) -> JobStatus:
if not _redis_client.set(name=_build_job_status_key(job_id), value=status.value):
raise SerializableException(f"Failed to set job status {status} on {job_id}")
return status


def get_job_status(job_id: str) -> Optional[JobStatus]:
redis_status = _redis_client.get(name=_build_job_status_key(job_id))
return JobStatus(redis_status.decode("utf-8")) if redis_status else redis_status


def list_job_specs(
manifest_filename: str = MANIFEST_FILENAME,
jobs_filename: str = "job_manifest.json",
) -> Mapping[str, JobSpec]:
return _read_manifest_from_path(manifest_filename)


def run_job(job_spec: JobSpec, dry_run: bool) -> JobStatus:
current_job_status = get_job_status(job_spec.job_id)
if current_job_status is not None and current_job_status != JobStatus.NOT_STARTED:
raise JobStatusException(job_id=job_spec.job_id, status=current_job_status)

have_lock = _acquire_job_lock(job_spec.job_id)
if not have_lock:
raise JobLockedException(job_spec.job_id)

current_job_status = _set_job_status(job_spec.job_id, JobStatus.NOT_STARTED)

job_to_run = _JobLoader.get_job_instance(job_spec, dry_run)

try:
if not dry_run:
current_job_status = _set_job_status(job_spec.job_id, JobStatus.RUNNING)
job_to_run.execute()
if not dry_run:
current_job_status = _set_job_status(job_spec.job_id, JobStatus.FINISHED)
except BaseException:
if not dry_run:
current_job_status = _set_job_status(job_spec.job_id, JobStatus.FAILED)
capture_exception()
finally:
_release_job_lock(job_spec.job_id)

return current_job_status
return _read_from_path(jobs_filename)
4 changes: 0 additions & 4 deletions snuba/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ class RedisClientKey(Enum):
DLQ = "dlq"
OPTIMIZE = "optimize"
ADMIN_AUTH = "admin_auth"
MANUAL_JOBS = "manual_jobs"


_redis_clients: Mapping[RedisClientKey, RedisClientType] = {
Expand Down Expand Up @@ -142,9 +141,6 @@ class RedisClientKey(Enum):
RedisClientKey.ADMIN_AUTH: _initialize_specialized_redis_cluster(
settings.REDIS_CLUSTERS["admin_auth"]
),
RedisClientKey.MANUAL_JOBS: _initialize_specialized_redis_cluster(
settings.REDIS_CLUSTERS["manual_jobs"]
),
}


Expand Down
2 changes: 0 additions & 2 deletions snuba/settings/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ class RedisClusters(TypedDict):
dlq: RedisClusterConfig | None
optimize: RedisClusterConfig | None
admin_auth: RedisClusterConfig | None
manual_jobs: RedisClusterConfig | None


REDIS_CLUSTERS: RedisClusters = {
Expand All @@ -187,7 +186,6 @@ class RedisClusters(TypedDict):
"dlq": None,
"optimize": None,
"admin_auth": None,
"manual_jobs": None,
}

# Query Recording Options
Expand Down
1 change: 0 additions & 1 deletion snuba/settings/settings_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
(7, "dlq"),
(8, "optimize"),
(9, "admin_auth"),
(10, "manual_jobs"),
]
}
VALIDATE_DATASET_YAMLS_ON_STARTUP = True
25 changes: 1 addition & 24 deletions tests/cli/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import pytest
from click.testing import CliRunner

from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest, status
from snuba.cli.jobs import JOB_SPECIFICATION_ERROR_MSG, run, run_from_manifest


@pytest.mark.redis_db
def test_cmd_line_valid() -> None:
runner = CliRunner()
result = runner.invoke(
Expand All @@ -15,7 +13,6 @@ def test_cmd_line_valid() -> None:
assert result.exit_code == 0


@pytest.mark.redis_db
def test_invalid_job_errors() -> None:
runner = CliRunner()
result = runner.invoke(
Expand All @@ -35,15 +32,13 @@ def test_invalid_job_errors() -> None:
assert result.exit_code == 1


@pytest.mark.redis_db
def test_cmd_line_no_job_specification_errors() -> None:
runner = CliRunner()
result = runner.invoke(run, ["--dry_run", "True", "k1=v1", "k2=v2"])
assert result.exit_code == 1
assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n"


@pytest.mark.redis_db
def test_cmd_line_no_job_id_errors() -> None:
runner = CliRunner()
result = runner.invoke(
Expand All @@ -53,7 +48,6 @@ def test_cmd_line_no_job_id_errors() -> None:
assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n"


@pytest.mark.redis_db
def test_cmd_line_no_job_type_errors() -> None:
runner = CliRunner()
result = runner.invoke(
Expand All @@ -63,7 +57,6 @@ def test_cmd_line_no_job_type_errors() -> None:
assert result.output == "Error: " + JOB_SPECIFICATION_ERROR_MSG + "\n"


@pytest.mark.redis_db
def test_json_valid() -> None:
runner = CliRunner()
result = runner.invoke(
Expand All @@ -76,19 +69,3 @@ def test_json_valid() -> None:
],
)
assert result.exit_code == 0


@pytest.mark.redis_db
def test_jobs_status() -> None:
runner = CliRunner()
runner.invoke(
run_from_manifest,
[
"--json_manifest",
"job_manifest.json",
"--job_id",
"abc1234",
],
)
result = runner.invoke(status, ["--job_id", "abc1234"])
assert result.exit_code == 0
Loading

0 comments on commit bb814d3

Please sign in to comment.