diff --git a/src/lando/main/management/commands/__init__.py b/src/lando/main/management/commands/__init__.py index e69de29b..ff186433 100644 --- a/src/lando/main/management/commands/__init__.py +++ b/src/lando/main/management/commands/__init__.py @@ -0,0 +1,86 @@ + +import os +import re +import subprocess +from time import sleep + +from lando.main.models import Worker + + +class WorkerMixin: + @staticmethod + def _setup_ssh(ssh_private_key: str): + """Add a given private ssh key to ssh agent. + + SSH keys are needed in order to push to repositories that have an ssh + push path. + + The private key should be passed as it is in the key file, including all + new line characters and the new line character at the end. + + Args: + ssh_private_key (str): A string representing the private SSH key file. + """ + # Set all the correct environment variables + agent_process = subprocess.run( + ["ssh-agent", "-s"], capture_output=True, universal_newlines=True + ) + + # This pattern will match keys and values, and ignore everything after the + # semicolon. For example, the output of `agent_process` is of the form: + # SSH_AUTH_SOCK=/tmp/ssh-c850kLXXOS5e/agent.120801; export SSH_AUTH_SOCK; + # SSH_AGENT_PID=120802; export SSH_AGENT_PID; + # echo Agent pid 120802; + pattern = re.compile("(.+)=([^;]*)") + for key, value in pattern.findall(agent_process.stdout): + os.environ[key] = value + + # Add private SSH key to agent + # NOTE: ssh-add seems to output everything to stderr, including upon exit 0. + add_process = subprocess.run( + ["ssh-add", "-"], + input=ssh_private_key, + capture_output=True, + universal_newlines=True, + ) + if add_process.returncode != 0: + raise Exception(add_process.stderr) + + @property + def instance(self): + return Worker.objects.get(name=self.name) + + def _setup(self): + """Perform various setup actions.""" + if self.instance.ssh_private_key: + self._setup_ssh(self.instance.ssh_private_key) + + def _start(self, max_loops: int | None = None, *args, **kwargs): + """Run the main event loop.""" + # NOTE: The worker will exit when max_loops is reached, or when the stop + # variable is changed to True. + loops = 0 + while not self.instance.is_stopped: + if max_loops is not None and loops >= max_loops: + break + while self.instance.is_paused: + self.throttle(self.instance.sleep_seconds) + self.loop(*args, **kwargs) + loops += 1 + + self.stdout.write(f"{self} exited after {loops} loops.") + + def throttle(self, seconds: int | None = None): + """Sleep for a given number of seconds.""" + sleep(seconds if seconds is not None else self.instance.throttle_seconds) + + def start(self, max_loops: int | None = None): + """Run setup sequence and start the event loop.""" + if self.instance.is_stopped: + return + self._setup() + self._start(max_loops=max_loops) + + def loop(self, *args, **kwargs): + """The main event loop.""" + raise NotImplementedError() diff --git a/src/lando/main/management/commands/landing_worker.py b/src/lando/main/management/commands/landing_worker.py new file mode 100644 index 00000000..a29b251a --- /dev/null +++ b/src/lando/main/management/commands/landing_worker.py @@ -0,0 +1,87 @@ +from __future__ import annotations + +import logging +from contextlib import contextmanager +from datetime import datetime +from io import BytesIO + +from django.core.management.base import BaseCommand + +from lando.main.management.commands import WorkerMixin +from lando.main.models import LandingJob + +logger = logging.getLogger(__name__) + + +@contextmanager +def job_processing(worker: LandingWorker, job: LandingJob): + """Mutex-like context manager that manages job processing miscellany. + + This context manager facilitates graceful worker shutdown, tracks the duration of + the current job, and commits changes to the DB at the very end. + + Args: + worker: the landing worker that is processing jobs + job: the job currently being processed + db: active database session + """ + start_time = datetime.now() + try: + yield + finally: + job.duration_seconds = (datetime.now() - start_time).seconds + job.save() + + +class LandingWorker(BaseCommand, WorkerMixin): + help = "Start the landing worker." + + def add_arguments(self, parser): + pass + + def handle(self, *args, **options): + self.last_job_finished = None + self.start() + + def loop(self): + if self.last_job_finished is False: + logger.info("Last job did not complete, sleeping.") + self.throttle(self.handle.sleep_seconds) + + job = LandingJob.next_job( + repositories=self.handle.enabled_repos + ).first() + + if job is None: + self.throttle(self.handle.sleep_seconds) + return + + with job_processing(self, job): + job.status = LandingJob.LandingJobStatus.IN_PROGRESS + job.attempts += 1 + job.save() + + self.stdout.write("Starting landing job", extra={"id": job.id}) + self.last_job_finished = self.run_job(job) + self.stdout.write("Finished processing landing job", extra={"id": job.id}) + + def run_job(self, job: LandingJob) -> bool: + repo = job.repo + if not repo.is_initialized: + repo.initialize() + + repo.reset() + repo.update() + + for revision in job.revisions: + patch_buffer = BytesIO(revision.patch_bytes) + repo.apply_patch(patch_buffer) + + # TODO: need to account for reverts/backouts somehow in the futue. + revision.commit_id = repo._run("rev-parse", "head") + revision.save() + + repo.push() + + job.status = LandingJob.LandingJobStatus.LANDED + job.save() diff --git a/src/lando/main/management/commands/test_command.py b/src/lando/main/management/commands/test_command.py deleted file mode 100644 index 06a504a7..00000000 --- a/src/lando/main/management/commands/test_command.py +++ /dev/null @@ -1,12 +0,0 @@ -from django.core.management.base import BaseCommand - - -class Command(BaseCommand): - help = "Test command" - - def add_arguments(self, parser): - parser.add_argument("names", nargs="+") - - def handle(self, *args, **options): - for name in options["names"]: - self.stdout.write(self.style.SUCCESS(f"Hello {name}!")) diff --git a/src/lando/main/migrations/0003_remove_landingjob_repository_name_and_more.py b/src/lando/main/migrations/0003_remove_landingjob_repository_name_and_more.py new file mode 100644 index 00000000..96144602 --- /dev/null +++ b/src/lando/main/migrations/0003_remove_landingjob_repository_name_and_more.py @@ -0,0 +1,40 @@ +# Generated by Django 5.0b1 on 2023-11-23 14:22 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("main", "0002_repo_worker"), + ] + + operations = [ + migrations.RemoveField( + model_name="landingjob", + name="repository_name", + ), + migrations.RemoveField( + model_name="landingjob", + name="repository_url", + ), + migrations.AddField( + model_name="landingjob", + name="target_repo", + field=models.ForeignKey( + null=True, on_delete=django.db.models.deletion.SET_NULL, to="main.repo" + ), + ), + migrations.AddField( + model_name="revision", + name="commit_id", + field=models.CharField(blank=True, max_length=40, null=True), + ), + migrations.AlterField( + model_name="repo", + name="system_path", + field=models.FilePathField( + allow_folders=True, max_length=255, path="/mediafiles/repos" + ), + ), + ] diff --git a/src/lando/main/models.py b/src/lando/main/models.py index 4b5b2c79..3a76110a 100644 --- a/src/lando/main/models.py +++ b/src/lando/main/models.py @@ -1,5 +1,6 @@ from __future__ import annotations +import tempfile import datetime import logging import os @@ -15,7 +16,7 @@ from django.utils.translation import gettext_lazy from lando import settings -from lando.utils import build_patch_for_revision +from lando.utils import build_patch_for_revision, GitPatchHelper logger = logging.getLogger(__name__) @@ -50,6 +51,9 @@ class Revision(BaseModel): # A general purpose data field to store arbitrary information about this revision. data = models.JSONField(default=dict) + # The commit ID generated by the landing worker, before pushing to remote repo. + commit_id = models.CharField(max_length=40, null=True, blank=True) + def __repr__(self): """Return a human-readable representation of the instance.""" # Add an identifier for the Phabricator revision if it exists. @@ -105,12 +109,6 @@ class LandingJobStatus(models.TextChoices): # LDAP email of the user who requested transplant. requester_email = models.CharField(max_length=255) - # Lando's name for the repository. - repository_name = models.CharField(max_length=255) - - # URL of the repository revisions are to land to. - repository_url = models.TextField(default="") - # Identifier for the most descendent commit created by this landing. landed_commit_id = models.TextField(default="") @@ -127,21 +125,7 @@ class LandingJobStatus(models.TextChoices): target_commit_hash = models.TextField(default="") revisions = models.ManyToManyField(Revision) # TODO: order by index - - @property - def landed_revisions(self) -> dict: - """Return revision and diff ID mapping associated with the landing job.""" - return None # TODO: fix this up. - - @property - def serialized_landing_path(self): - """Return landing path based on associated revisions or legacy fields.""" - return None # TODO: fix this up. - - @property - def landing_job_identifier(self) -> str: - """Human-readable representation of the branch head.""" - return None # TODO: fix this up. + target_repo = models.ForeignKey("Repo", on_delete=models.SET_NULL, null=True) @classmethod def job_queue_query( @@ -216,7 +200,7 @@ class Repo(BaseModel): def _run(self, *args, cwd=None): cwd = cwd or self.system_path command = ["git"] + list(args) - result = subprocess.run(command, cwd=cwd) + result = subprocess.run(command, cwd=cwd, check=True, capture_output=True) return result def initialize(self): @@ -239,6 +223,36 @@ def reset(self, branch=None): self._run("reset", "--hard", branch or self.default_branch) self._run("clean", "--force") + def apply_patch(self, patch_buffer): + patch_helper = GitPatchHelper(patch_buffer) + self.patch_header = patch_helper.get_header + + # Import the diff to apply the changes then commit separately to + # ensure correct parsing of the commit message. + f_msg = tempfile.NamedTemporaryFile(encoding="utf-8", mode="w+") + f_diff = tempfile.NamedTemporaryFile(encoding="utf-8", mode="w+") + import_cmd = ["apply"] + with f_msg, f_diff: + patch_helper.write_commit_description(f_msg) + f_msg.flush() + patch_helper.write_diff(f_diff) + f_diff.flush() + + self._run(import_cmd + [f_diff.name]) + + # Commit using the extracted date, user, and commit desc. + # --landing_system is provided by the set_landing_system hgext. + date = patch_helper.get_header("Date") + user = patch_helper.get_header("User") + + self._run(["commit"] + ["--date", date] + ["--author", user]) + + def last_commit_id(self): + return self._run("rev-parse", "HEAD").stdout.strip() + + def push(self): + self._run("push") + class Worker(BaseModel): name = models.CharField(max_length=255, unique=True)