Skip to content

Commit

Permalink
landing_worker: add basic support (bug 1865882)
Browse files Browse the repository at this point in the history
- add basic landing worker command
- add worker mixin for common functionality
- update models to support landings
  • Loading branch information
zzzeid committed Nov 23, 2023
1 parent 1de0453 commit 3ab95a4
Show file tree
Hide file tree
Showing 5 changed files with 250 additions and 35 deletions.
86 changes: 86 additions & 0 deletions src/lando/main/management/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@

import os
import re
import subprocess
from time import sleep

from lando.main.models import Worker


class WorkerMixin:
    """Common behaviour for long-running worker commands.

    Classes using this mixin are expected to provide a ``name`` attribute (used
    to look up the matching ``Worker`` row), a ``stdout`` object with a
    ``write`` method, and an implementation of ``loop``.
    """

    @staticmethod
    def _setup_ssh(ssh_private_key: str):
        """Start an ssh agent and add the given private ssh key to it.

        SSH keys are needed in order to push to repositories that have an ssh
        push path.

        The private key should be passed as it is in the key file, including all
        new line characters and the new line character at the end.

        Args:
            ssh_private_key (str): A string representing the private SSH key file.

        Raises:
            Exception: If `ssh-agent` or `ssh-add` exits with a non-zero status.
        """
        agent_process = subprocess.run(
            ["ssh-agent", "-s"], capture_output=True, universal_newlines=True
        )
        if agent_process.returncode != 0:
            # Without a running agent there is nothing to add the key to.
            raise Exception(agent_process.stderr)

        # This pattern will match keys and values, and ignore everything after the
        # semicolon. For example, the output of `agent_process` is of the form:
        #     SSH_AUTH_SOCK=/tmp/ssh-c850kLXXOS5e/agent.120801; export SSH_AUTH_SOCK;
        #     SSH_AGENT_PID=120802; export SSH_AGENT_PID;
        #     echo Agent pid 120802;
        pattern = re.compile(r"(.+)=([^;]*)")
        # Export the agent's variables so that child processes can reach it.
        for key, value in pattern.findall(agent_process.stdout):
            os.environ[key] = value

        # Add private SSH key to agent
        # NOTE: ssh-add seems to output everything to stderr, including upon exit 0.
        add_process = subprocess.run(
            ["ssh-add", "-"],
            input=ssh_private_key,
            capture_output=True,
            universal_newlines=True,
        )
        if add_process.returncode != 0:
            raise Exception(add_process.stderr)

    @property
    def instance(self):
        """Return the `Worker` row for this worker, re-read on every access."""
        return Worker.objects.get(name=self.name)

    def _setup(self):
        """Perform various setup actions."""
        ssh_private_key = self.instance.ssh_private_key
        if ssh_private_key:
            self._setup_ssh(ssh_private_key)

    def _start(self, max_loops: int | None = None, *args, **kwargs):
        """Run the main event loop.

        The worker exits when `max_loops` iterations of `loop` have run, or when
        the worker record is marked as stopped. While the record is marked as
        paused the worker sleeps instead of calling `loop`; the stopped flag is
        re-checked after every sleep so that a paused worker can still be stopped.

        Args:
            max_loops: Maximum number of `loop` calls, or None for no limit.
            *args: Positional arguments forwarded to `loop`.
            **kwargs: Keyword arguments forwarded to `loop`.
        """
        loops = 0
        while max_loops is None or loops < max_loops:
            # Read the record once per iteration to pick up external changes.
            worker = self.instance
            if worker.is_stopped:
                break
            if worker.is_paused:
                self.throttle(worker.sleep_seconds)
                continue
            self.loop(*args, **kwargs)
            loops += 1

        self.stdout.write(f"{self} exited after {loops} loops.")

    def throttle(self, seconds: int | None = None):
        """Sleep for a given number of seconds.

        Args:
            seconds: How long to sleep. When None, the worker record's
                `throttle_seconds` value is used.
        """
        sleep(seconds if seconds is not None else self.instance.throttle_seconds)

    def start(self, max_loops: int | None = None):
        """Run setup sequence and start the event loop.

        Does nothing when the worker record is already marked as stopped.
        """
        if self.instance.is_stopped:
            return
        self._setup()
        self._start(max_loops=max_loops)

    def loop(self, *args, **kwargs):
        """The main event loop body; must be implemented by the using class."""
        raise NotImplementedError()
87 changes: 87 additions & 0 deletions src/lando/main/management/commands/landing_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
from __future__ import annotations

import logging
from contextlib import contextmanager
from datetime import datetime
from io import BytesIO

from django.core.management.base import BaseCommand

from lando.main.management.commands import WorkerMixin
from lando.main.models import LandingJob

logger = logging.getLogger(__name__)


@contextmanager
def job_processing(worker: LandingWorker, job: LandingJob):
    """Context manager that records how long a job took and saves the job.

    The elapsed wall-clock time of the wrapped block is stored, in whole
    seconds, on `job.duration_seconds`, and `job.save()` is called when the
    block exits, whether it completed normally or raised.

    Args:
        worker: the landing worker that is processing jobs (currently unused).
        job: the job currently being processed.
    """
    start_time = datetime.now()
    try:
        yield
    finally:
        elapsed = datetime.now() - start_time
        # `timedelta.seconds` is only the seconds *component* of the delta (it
        # excludes whole days), so use the total elapsed time instead.
        job.duration_seconds = int(elapsed.total_seconds())
        job.save()


class LandingWorker(BaseCommand, WorkerMixin):
    """Management command that processes queued landing jobs one at a time."""

    help = "Start the landing worker."

    # NOTE(review): `WorkerMixin.instance` looks the worker record up through
    # `self.name`, which is not defined in this class -- confirm where it is
    # meant to be set.

    def add_arguments(self, parser):
        """This command does not define any arguments."""
        pass

    def handle(self, *args, **options):
        """Entry point of the command: run setup and start the event loop."""
        # None: no job processed yet; otherwise the result of the last `run_job`.
        self.last_job_finished = None
        self.start()

    def loop(self):
        """Pick up the next landing job, if any, and process it."""
        # Worker settings live on the worker record, not on `self.handle`
        # (which is the bound `handle` method and has no such attributes).
        # NOTE(review): assumes the worker record exposes `enabled_repos` --
        # confirm against the `Worker` model.
        worker = self.instance

        if self.last_job_finished is False:
            logger.info("Last job did not complete, sleeping.")
            self.throttle(worker.sleep_seconds)

        job = LandingJob.next_job(repositories=worker.enabled_repos).first()

        if job is None:
            self.throttle(worker.sleep_seconds)
            return

        with job_processing(self, job):
            job.status = LandingJob.LandingJobStatus.IN_PROGRESS
            job.attempts += 1
            job.save()

            # `extra` is a logging keyword; `self.stdout.write` does not accept it.
            logger.info("Starting landing job", extra={"id": job.id})
            self.last_job_finished = self.run_job(job)
            logger.info("Finished processing landing job", extra={"id": job.id})

    def run_job(self, job: LandingJob) -> bool:
        """Apply every revision of `job` to its target repo and push the result.

        Args:
            job: The landing job to process.

        Returns:
            True when all revisions were applied and pushed, False when the job
            has no target repo to land to.
        """
        repo = job.target_repo
        if repo is None:
            # `target_repo` is nullable, so there may be nothing to land to.
            logger.error("Landing job has no target repo", extra={"id": job.id})
            return False

        if not repo.is_initialized:
            repo.initialize()

        repo.reset()
        repo.update()

        # `revisions` is a many-to-many manager; it must be queried to iterate.
        for revision in job.revisions.all():
            patch_buffer = BytesIO(revision.patch_bytes)
            repo.apply_patch(patch_buffer)

            # TODO: need to account for reverts/backouts somehow in the future.
            commit_id = repo.last_commit_id()
            if isinstance(commit_id, bytes):
                # The git output may be captured as bytes; store text.
                commit_id = commit_id.decode("utf-8")
            revision.commit_id = commit_id
            revision.save()

        repo.push()

        job.status = LandingJob.LandingJobStatus.LANDED
        job.save()
        return True


# Django loads a management command through a module-level class named
# `Command`; keep `LandingWorker` as the descriptive name and expose it under
# the name Django looks for.
Command = LandingWorker
12 changes: 0 additions & 12 deletions src/lando/main/management/commands/test_command.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Generated by Django 5.0b1 on 2023-11-23 14:22

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):
    # Schema changes in support of landings: LandingJob drops its repository
    # name/URL columns in favour of a foreign key to Repo, Revision gains a
    # commit_id column, and Repo.system_path is altered.

    # Must be applied after the previous migration of the "main" app.
    dependencies = [
        ("main", "0002_repo_worker"),
    ]

    operations = [
        # LandingJob: remove the free-form repository fields ...
        migrations.RemoveField(
            model_name="landingjob",
            name="repository_name",
        ),
        migrations.RemoveField(
            model_name="landingjob",
            name="repository_url",
        ),
        # ... and add a nullable reference to a Repo row. Deleting the Repo
        # sets the reference to NULL rather than deleting the job.
        migrations.AddField(
            model_name="landingjob",
            name="target_repo",
            field=models.ForeignKey(
                null=True, on_delete=django.db.models.deletion.SET_NULL, to="main.repo"
            ),
        ),
        # Revision: optional commit ID, at most 40 characters long.
        migrations.AddField(
            model_name="revision",
            name="commit_id",
            field=models.CharField(blank=True, max_length=40, null=True),
        ),
        # Repo: system_path may point at folders under /mediafiles/repos.
        migrations.AlterField(
            model_name="repo",
            name="system_path",
            field=models.FilePathField(
                allow_folders=True, max_length=255, path="/mediafiles/repos"
            ),
        ),
    ]
60 changes: 37 additions & 23 deletions src/lando/main/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import tempfile
import datetime
import logging
import os
Expand All @@ -15,7 +16,7 @@
from django.utils.translation import gettext_lazy

from lando import settings
from lando.utils import build_patch_for_revision
from lando.utils import build_patch_for_revision, GitPatchHelper

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,6 +51,9 @@ class Revision(BaseModel):
# A general purpose data field to store arbitrary information about this revision.
data = models.JSONField(default=dict)

# The commit ID generated by the landing worker, before pushing to remote repo.
commit_id = models.CharField(max_length=40, null=True, blank=True)

def __repr__(self):
"""Return a human-readable representation of the instance."""
# Add an identifier for the Phabricator revision if it exists.
Expand Down Expand Up @@ -105,12 +109,6 @@ class LandingJobStatus(models.TextChoices):
# LDAP email of the user who requested transplant.
requester_email = models.CharField(max_length=255)

# Lando's name for the repository.
repository_name = models.CharField(max_length=255)

# URL of the repository revisions are to land to.
repository_url = models.TextField(default="")

# Identifier for the most descendent commit created by this landing.
landed_commit_id = models.TextField(default="")

Expand All @@ -127,21 +125,7 @@ class LandingJobStatus(models.TextChoices):
target_commit_hash = models.TextField(default="")

revisions = models.ManyToManyField(Revision) # TODO: order by index

@property
def landed_revisions(self) -> dict | None:
    """Return revision and diff ID mapping associated with the landing job.

    Not implemented yet: currently always returns None.
    """
    return None  # TODO: fix this up.

@property
def serialized_landing_path(self) -> None:
    """Return landing path based on associated revisions or legacy fields.

    Not implemented yet: currently always returns None.
    """
    return None  # TODO: fix this up.

@property
def landing_job_identifier(self) -> str | None:
    """Human-readable representation of the branch head.

    Not implemented yet: currently always returns None.
    """
    return None  # TODO: fix this up.
target_repo = models.ForeignKey("Repo", on_delete=models.SET_NULL, null=True)

@classmethod
def job_queue_query(
Expand Down Expand Up @@ -216,7 +200,7 @@ class Repo(BaseModel):
def _run(self, *args, cwd=None):
    """Run `git` with the given arguments and return the completed process.

    Args:
        *args: Arguments passed to `git`, one per command line token.
        cwd: Directory to run the command in; when not given (or falsy), the
            repo's `system_path` is used.

    Returns:
        The `subprocess.CompletedProcess` of the command, with captured
        stdout and stderr.

    Raises:
        subprocess.CalledProcessError: If git exits with a non-zero status.
    """
    working_dir = cwd if cwd else self.system_path
    return subprocess.run(
        ["git", *args],
        cwd=working_dir,
        check=True,
        capture_output=True,
    )

def initialize(self):
Expand All @@ -239,6 +223,36 @@ def reset(self, branch=None):
self._run("reset", "--hard", branch or self.default_branch)
self._run("clean", "--force")

def apply_patch(self, patch_buffer):
    """Apply a patch to the repo and commit it with the patch's metadata.

    The diff and the commit description are extracted from the patch into
    temporary files. The diff is applied first and the commit is created
    separately, to ensure correct parsing of the commit message.

    Args:
        patch_buffer: Buffer holding the patch, as accepted by `GitPatchHelper`.

    Raises:
        subprocess.CalledProcessError: If applying the diff or committing fails.
    """
    patch_helper = GitPatchHelper(patch_buffer)
    self.patch_header = patch_helper.get_header

    with (
        tempfile.NamedTemporaryFile(encoding="utf-8", mode="w+") as f_msg,
        tempfile.NamedTemporaryFile(encoding="utf-8", mode="w+") as f_diff,
    ):
        patch_helper.write_commit_description(f_msg)
        f_msg.flush()
        patch_helper.write_diff(f_diff)
        f_diff.flush()

        # `--index` updates the index along with the working tree, so that
        # the commit below has staged changes to record.
        self._run("apply", "--index", f_diff.name)

        # Commit using the extracted date, user, and commit description.
        # NOTE(review): assumes both headers are returned as text -- confirm
        # against GitPatchHelper.get_header.
        date = patch_helper.get_header("Date")
        user = patch_helper.get_header("User")

        # The temporary files are removed when the `with` block exits, so
        # the commit has to happen while the message file still exists.
        self._run(
            "commit", "--date", date, "--author", user, "--file", f_msg.name
        )

def last_commit_id(self) -> str:
    """Return the commit ID that `HEAD` currently points to, as text."""
    output = self._run("rev-parse", "HEAD").stdout
    if isinstance(output, bytes):
        # The command output is captured as bytes; callers expect a string
        # (e.g. to store it in a character field).
        output = output.decode("utf-8")
    return output.strip()

def push(self):
    """Run `git push` for this repo."""
    git_args = ("push",)
    self._run(*git_args)


class Worker(BaseModel):
name = models.CharField(max_length=255, unique=True)
Expand Down

0 comments on commit 3ab95a4

Please sign in to comment.