Skip to content

Commit

Permalink
landing_worker: add basic support (bug 1865882)
Browse files Browse the repository at this point in the history
- add basic landing worker command
- add worker mixin for common functionality
- update models to support landings
  • Loading branch information
zzzeid committed Dec 1, 2023
1 parent 2c8605d commit 27acbdb
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 44 deletions.
9 changes: 8 additions & 1 deletion src/lando/main/admin.py
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
# Register your models here.
from django.contrib import admin

from lando.main.models import LandingJob, Repo, Revision, Worker

admin.site.register(LandingJob, admin.ModelAdmin)
admin.site.register(Revision, admin.ModelAdmin)
admin.site.register(Repo, admin.ModelAdmin)
admin.site.register(Worker, admin.ModelAdmin)
85 changes: 85 additions & 0 deletions src/lando/main/management/commands/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import os
import re
import subprocess
from time import sleep

from lando.main.models import Worker


class WorkerMixin:
@staticmethod
def _setup_ssh(ssh_private_key: str):
"""Add a given private ssh key to ssh agent.
SSH keys are needed in order to push to repositories that have an ssh
push path.
The private key should be passed as it is in the key file, including all
new line characters and the new line character at the end.
Args:
ssh_private_key (str): A string representing the private SSH key file.
"""
# Set all the correct environment variables
agent_process = subprocess.run(
["ssh-agent", "-s"], capture_output=True, universal_newlines=True
)

# This pattern will match keys and values, and ignore everything after the
# semicolon. For example, the output of `agent_process` is of the form:
# SSH_AUTH_SOCK=/tmp/ssh-c850kLXXOS5e/agent.120801; export SSH_AUTH_SOCK;
# SSH_AGENT_PID=120802; export SSH_AGENT_PID;
# echo Agent pid 120802;
pattern = re.compile("(.+)=([^;]*)")
for key, value in pattern.findall(agent_process.stdout):
os.environ[key] = value

# Add private SSH key to agent
# NOTE: ssh-add seems to output everything to stderr, including upon exit 0.
add_process = subprocess.run(
["ssh-add", "-"],
input=ssh_private_key,
capture_output=True,
universal_newlines=True,
)
if add_process.returncode != 0:
raise Exception(add_process.stderr)

@property
def _instance(self):
return Worker.objects.get(name=self.name)

def _setup(self):
"""Perform various setup actions."""
if self._instance.ssh_private_key:
self._setup_ssh(self._instance.ssh_private_key)

def _start(self, max_loops: int | None = None, *args, **kwargs):
"""Run the main event loop."""
# NOTE: The worker will exit when max_loops is reached, or when the stop
# variable is changed to True.
loops = 0
while not self._instance.is_stopped:
if max_loops is not None and loops >= max_loops:
break
while self._instance.is_paused:
self.throttle(self._instance.sleep_seconds)
self.loop(*args, **kwargs)
loops += 1

self.stdout.write(f"{self} exited after {loops} loops.")

def throttle(self, seconds: int | None = None):
"""Sleep for a given number of seconds."""
sleep(seconds if seconds is not None else self._instance.throttle_seconds)

def start(self, max_loops: int | None = None):
"""Run setup sequence and start the event loop."""
if self._instance.is_stopped:
return
self._setup()
self._start(max_loops=max_loops)

def loop(self, *args, **kwargs):
"""The main event loop."""
raise NotImplementedError()
86 changes: 86 additions & 0 deletions src/lando/main/management/commands/landing_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
from __future__ import annotations

import logging
from contextlib import contextmanager
from datetime import datetime
from io import StringIO

from django.core.management.base import BaseCommand
from django.db import transaction
from lando.main.management.commands import WorkerMixin
from lando.main.models import LandingJob, LandingJobStatus

logger = logging.getLogger(__name__)


@contextmanager
def job_processing(job: LandingJob):
"""Mutex-like context manager that manages job processing miscellany.
This context manager facilitates graceful worker shutdown, tracks the duration of
the current job, and commits changes to the DB at the very end.
Args:
job: the job currently being processed
db: active database session
"""
start_time = datetime.now()
try:
yield
finally:
job.duration_seconds = (datetime.now() - start_time).seconds


class Command(BaseCommand, WorkerMixin):
help = "Start the landing worker."
name = "landing-worker"

def add_arguments(self, parser):
pass

def handle(self, *args, **options):
self.last_job_finished = None
self.start()

def loop(self):
if self.last_job_finished is False:
logger.info("Last job did not complete, sleeping.")
self.throttle(self._instance.sleep_seconds)

for repo in self._instance.enabled_repos:
if not repo.is_initialized:
repo.initialize()

with transaction.atomic():
job = LandingJob.next_job(repositories=self._instance.enabled_repos).first()

if job is None:
self.throttle(self._instance.sleep_seconds)
return

with job_processing(job):
job.status = LandingJobStatus.IN_PROGRESS
job.attempts += 1
job.save()

self.stdout.write(f"Starting landing job {job}")
self.last_job_finished = self.run_job(job)
self.stdout.write("Finished processing landing job")

def run_job(self, job: LandingJob) -> bool:
repo = job.target_repo
repo.reset()
repo.pull()

for revision in job.revisions.all():
patch_buffer = StringIO(revision.patch)
repo.apply_patch(patch_buffer)

# TODO: need to account for reverts/backouts somehow in the futue.
revision.commit_id = repo._run("rev-parse", "HEAD").stdout.strip()
revision.save()

repo.push()

job.status = LandingJobStatus.LANDED
job.save()
12 changes: 0 additions & 12 deletions src/lando/main/management/commands/test_command.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# Generated by Django 5.0rc1 on 2023-12-01 16:16

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("main", "0002_repo_worker"),
]

operations = [
migrations.RemoveField(
model_name="landingjob",
name="repository_name",
),
migrations.RemoveField(
model_name="landingjob",
name="repository_url",
),
migrations.RemoveField(
model_name="revision",
name="patch_bytes",
),
migrations.AddField(
model_name="landingjob",
name="target_repo",
field=models.ForeignKey(
null=True, on_delete=django.db.models.deletion.SET_NULL, to="main.repo"
),
),
migrations.AddField(
model_name="revision",
name="commit_id",
field=models.CharField(blank=True, max_length=40, null=True),
),
migrations.AddField(
model_name="revision",
name="patch",
field=models.TextField(blank=True, default=""),
),
migrations.AlterField(
model_name="landingjob",
name="landed_commit_id",
field=models.TextField(blank=True, default=""),
),
migrations.AlterField(
model_name="landingjob",
name="requester_email",
field=models.CharField(blank=True, default="", max_length=255),
),
migrations.AlterField(
model_name="landingjob",
name="target_commit_hash",
field=models.TextField(blank=True, default=""),
),
migrations.AlterField(
model_name="repo",
name="system_path",
field=models.FilePathField(
allow_folders=True,
blank=True,
default="",
max_length=255,
path="/mediafiles/repos",
),
),
migrations.AlterField(
model_name="revision",
name="data",
field=models.JSONField(blank=True, default=dict),
),
migrations.AlterField(
model_name="revision",
name="diff_id",
field=models.IntegerField(blank=True, null=True),
),
migrations.AlterField(
model_name="revision",
name="patch_data",
field=models.JSONField(blank=True, default=dict),
),
migrations.AlterField(
model_name="revision",
name="revision_id",
field=models.IntegerField(blank=True, null=True, unique=True),
),
]
Loading

0 comments on commit 27acbdb

Please sign in to comment.