Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scaleup healing #6018

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions terraform-aws-github-runner/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ module "runners" {
environment = var.environment
tags = local.tags

scale_config_org = var.scale_config_org
scale_config_repo = var.scale_config_repo
scale_config_repo_path = var.scale_config_repo_path

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,8 @@ module.exports = {
lines: 80,
statements: 80
}
}
},
moduleNameMapper: {
axios: 'axios/dist/node/axios.cjs', // Allow axios to work in tests
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"@types/uuid": "^9.0.1",
"async-mutex": "^0.4.0",
"aws-sdk": "^2.863.0",
"axios": "^1.7.7",
"cron-parser": "^3.3.0",
"generic-pool": "^3.9.0",
"lru-cache": "^6.0.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Config } from './scale-runners/config';
import { ScaleUpMetrics, sendMetricsAtTimeout, sendMetricsTimeoutVars } from './scale-runners/metrics';
import { getDelayWithJitterRetryCount, stochaticRunOvershoot } from './scale-runners/utils';
import { scaleDown as scaleDownR } from './scale-runners/scale-down';
import { scaleUpChron as scaleUpChronR } from './scale-runners/scale-up-chron';
import { sqsSendMessages, sqsDeleteMessageBatch } from './scale-runners/sqs';

async function sendRetryEvents(evtFailed: Array<[SQSRecord, boolean, number]>, metrics: ScaleUpMetrics) {
Expand Down Expand Up @@ -155,3 +156,17 @@ export async function scaleDown(event: ScheduledEvent, context: Context, callbac
return callback('Failed');
}
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
export async function scaleUpChron(event: ScheduledEvent, context: Context, callback: any) {
// we mantain open connections to redis, so the event pool is only cleaned when the SIGTERM is sent
context.callbackWaitsForEmptyEventLoop = false;

try {
await scaleUpChronR();
return callback(null);
} catch (e) {
console.error(e);
return callback('Failed');
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ export class Config {
readonly retryScaleUpRecordQueueUrl: string | undefined;
readonly runnerGroupName: string | undefined;
readonly runnersExtraLabels: undefined | string;
readonly scaleConfigOrg: string;
readonly scaleConfigRepo: string;
readonly scaleConfigRepoPath: string;
readonly scaleUpRecordQueueUrl: string | undefined;
readonly secretsManagerSecretsId: string | undefined;
readonly sSMParamCleanupAgeDays: number;
readonly sSMParamMaxCleanupAllowance: number;
Expand Down Expand Up @@ -94,8 +96,10 @@ export class Config {
/* istanbul ignore next */
this.retryScaleUpRecordJitterPct = Number(process.env.RETRY_SCALE_UP_RECORD_JITTER_PCT || '0');
this.retryScaleUpRecordQueueUrl = process.env.RETRY_SCALE_UP_RECORD_QUEUE_URL;
this.scaleUpRecordQueueUrl = process.env.SCALE_UP_RECORD_QUEUE_URL;
this.runnerGroupName = process.env.RUNNER_GROUP_NAME;
this.runnersExtraLabels = process.env.RUNNER_EXTRA_LABELS;
this.scaleConfigOrg = process.env.SCALE_CONFIG_ORG || '';
/* istanbul ignore next */
this.scaleConfigRepo = process.env.SCALE_CONFIG_REPO || '';
if (this.enableOrganizationRunners && !this.scaleConfigRepo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1424,6 +1424,12 @@ export class ScaleDownMetrics extends Metrics {
}
}

export class ScaleUpChronMetrics extends Metrics {
constructor() {
super('scaleUpChron');
}
}

export interface sendMetricsTimeoutVars {
metrics?: Metrics;
setTimeout?: ReturnType<typeof setTimeout>;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Import the required modules
import { getQueuedJobs } from './scale-up-chron';


test('getQueuedRunners should fetch queued runners', async () => {
const runners = await getQueuedJobs();
console.log('Queued Runners:', runners);
expect(runners).toBeDefined();
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import moment from 'moment';
import nock from 'nock';
import { mocked } from 'ts-jest/utils';
import { Config } from './config';

// Import the required modules
import { getQueuedJobs } from './scale-up-chron';


describe('scaleUp', () => {

it('getQueuedRunners should fetch queued runners', async () => {
const runners = await getQueuedJobs();
console.log('Queued Runners:', runners);
expect(runners).toBeDefined();
});
});

Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import axios from 'axios';

import { Config } from './config';
import { getRepo } from './utils';
import { ScaleUpChronMetrics } from './metrics';
import { getRunnerTypes } from './gh-runners';
import { sqsSendMessages } from './sqs';
import { ActionRequestMessage } from './scale-up';
import { randomUUID } from 'crypto';

export async function scaleUpChron(): Promise<void> {
// This function does the following:
// 1. Queries for queued runners via HUD
// 2. Polls scale-config to filter the list to ones that are self-hosted by this fleet and
// are ephemeral
// 3. Sends a SQS request to the scale-up lambda to provision more of those instances
let queuedJobs = await getQueuedJobs();

const scaleConfigRepo = getRepo(Config.Instance.scaleConfigOrg, Config.Instance.scaleConfigRepo);


const metrics = new ScaleUpChronMetrics();
const validRunnerTypes = await getRunnerTypes(scaleConfigRepo, metrics);

const minAutoScaleupDelayMinutes = 30;
// Only proactively scale up the jobs that have been queued for longer than normal
queuedJobs = queuedJobs.filter((runner) => {
return runner.min_queue_time_minutes >= minAutoScaleupDelayMinutes &&
runner.org === Config.Instance.scaleConfigOrg;
});

// Filter out the queued jobs that are do not correspond to a valid runner type
queuedJobs = queuedJobs.filter((requested_runner) => {
return Array.from(validRunnerTypes.keys()).some((available_runner_label) => {
return available_runner_label === requested_runner.runner_label;
});
});

// Send a message to the SQS queue to scale up the runners
let scaleUpRequests : Array<ActionRequestMessage> = queuedJobs.map((runner) => {
return {
"id": Math.floor(Math.random() * 100000000000000),
"eventType": "workflow_job",
"repositoryName": runner.repo,
"repositoryOwner": runner.org,
"runnerLabels": [runner.runner_label],
};
});

if (!Config.Instance.scaleUpRecordQueueUrl) {
throw new Error('scaleUpRecordQueueUrl is not set. Cannot send scale up requests');
}

await sqsSendMessages(metrics, scaleUpRequests, Config.Instance.scaleUpRecordQueueUrl);
}

class QueuedJobsForRunner {
runner_label: string;
org: string;
repo: string;
num_queued_jobs: number;
min_queue_time_minutes: number;
max_queue_time_minutes: number;

constructor(runner_label: string, org: string, repo: string, num_queued_jobs: number, min_queue_time_minutes: number, max_queue_time_minutes: number) {
this.runner_label = runner_label;
this.org = org;
this.repo = repo;
this.num_queued_jobs = num_queued_jobs;
this.min_queue_time_minutes = min_queue_time_minutes;
this.max_queue_time_minutes = max_queue_time_minutes;
}
}

export async function getQueuedJobs(): Promise<QueuedJobsForRunner[]> {
// This function queries the HUD for queued runners
// and returns a list of them

const url = 'https://hud.pytorch.org/api/clickhouse/queued_jobs_aggregate?parameters=%5B%5D';

try {
const response = await axios.get(url);

// Map the response to the class
const queued_runners = response.data.map((runner: any) => {
return new QueuedJobsForRunner(
runner.runner_label,
runner.org,
runner.repo,
runner.num_queued_jobs,
runner.min_queue_time_minutes,
runner.max_queue_time_minutes);
});
return queued_runners;
} catch (error) {
console.error('Error fetching queued runners:', error);
return [];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
AWSTemplateFormatVersion: '2010-09-09'
Transform: 'AWS::Serverless-2016-10-31'
Resources:
ScaleUpChronFunction:
Type: 'AWS::Serverless::Function'
Properties:
Handler: index.scaleUpChron
Runtime: nodejs20.x
Events:
ScheduledEvent:
Type: Schedule
Properties:
Schedule: 'rate(1 minute)'
Original file line number Diff line number Diff line change
Expand Up @@ -1381,6 +1381,15 @@ aws-sdk@^2.863.0:
uuid "8.0.0"
xml2js "0.6.2"

axios@^1.7.7:
version "1.7.7"
resolved "https://registry.yarnpkg.com/axios/-/axios-1.7.7.tgz#2f554296f9892a72ac8d8e4c5b79c14a91d0a47f"
integrity sha512-S4kL7XrjgBmvdGut0sN3yJxqYzrDOnivkBiN0OFs6hLiUam3UPvswUo0kqGyhqUZGEOytHyumEdXsAkgCOUf3Q==
dependencies:
follow-redirects "^1.15.6"
form-data "^4.0.0"
proxy-from-env "^1.1.0"

babel-jest@^26.6.3:
version "26.6.3"
resolved "https://registry.yarnpkg.com/babel-jest/-/babel-jest-26.6.3.tgz#d87d25cb0037577a0c89f82e5755c5d293c01056"
Expand Down Expand Up @@ -2326,6 +2335,11 @@ flatted@^3.2.9:
resolved "https://registry.yarnpkg.com/flatted/-/flatted-3.3.1.tgz#21db470729a6734d4997002f439cb308987f567a"
integrity sha512-X8cqMLLie7KsNUDSdzeN8FYK9rEt4Dt67OsG/DNGnYTSDBG4uFAJFBnUeiV+zCVAvwFy56IjM9sH51jVaEhNxw==

follow-redirects@^1.15.6:
version "1.15.9"
resolved "https://registry.yarnpkg.com/follow-redirects/-/follow-redirects-1.15.9.tgz#a604fa10e443bf98ca94228d9eebcc2e8a2c8ee1"
integrity sha512-gew4GsXizNgdoRyqmyfMHyAmXsZDk6mHkSxZFCzW9gwlbtOW44CDtYavM+y+72qD/Vq2l550kMF52DT8fOLJqQ==

for-each@^0.3.3:
version "0.3.3"
resolved "https://registry.yarnpkg.com/for-each/-/for-each-0.3.3.tgz#69b447e88a0a5d32c3e7084f3f1710034b21376e"
Expand All @@ -2347,6 +2361,15 @@ form-data@^3.0.0:
combined-stream "^1.0.8"
mime-types "^2.1.12"

form-data@^4.0.0:
version "4.0.1"
resolved "https://registry.yarnpkg.com/form-data/-/form-data-4.0.1.tgz#ba1076daaaa5bfd7e99c1a6cb02aa0a5cff90d48"
integrity sha512-tzN8e4TX8+kkxGPK8D5u0FNmjPUjw3lwC9lSLxxoB/+GtsJG91CO8bSWy73APlgAZzZbXEYZJuxjkHH2w+Ezhw==
dependencies:
asynckit "^0.4.0"
combined-stream "^1.0.8"
mime-types "^2.1.12"

fragment-cache@^0.2.1:
version "0.2.1"
resolved "https://registry.yarnpkg.com/fragment-cache/-/fragment-cache-0.2.1.tgz#4290fad27f13e89be7f33799c6bc5a0abfff0d19"
Expand Down Expand Up @@ -4028,6 +4051,11 @@ propagate@^2.0.0:
resolved "https://registry.yarnpkg.com/propagate/-/propagate-2.0.1.tgz#40cdedab18085c792334e64f0ac17256d38f9a45"
integrity sha512-vGrhOavPSTz4QVNuBNdcNXePNdNMaO1xj9yBeH1ScQPjk/rhg9sSlCXPhMkFuaNNW/syTvYqsnbIJxMBfRbbag==

proxy-from-env@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/proxy-from-env/-/proxy-from-env-1.1.0.tgz#e102f16ca355424865755d2c9e8ea4f24d58c3e2"
integrity sha512-D+zkORCbA9f1tdWRK0RaCR3GPv50cMxcrz4X8k5LTSUD1Dkw47mKJEZQNunItRTkWwgtaUSo1RVFRIG9ZXiFYg==

psl@^1.1.33:
version "1.9.0"
resolved "https://registry.yarnpkg.com/psl/-/psl-1.9.0.tgz#d0df2a137f00794565fcaf3b2c00cd09f8d5a5a7"
Expand Down
1 change: 1 addition & 0 deletions terraform-aws-github-runner/modules/runners/scale-down.tf
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ resource "aws_lambda_function" "scale_down" {
MINIMUM_RUNNING_TIME_IN_MINUTES = var.minimum_running_time_in_minutes
REDIS_ENDPOINT = var.redis_endpoint
REDIS_LOGIN = var.redis_login
SCALE_CONFIG_ORG = var.scale_config_org
SCALE_CONFIG_REPO = var.scale_config_repo
SCALE_CONFIG_REPO_PATH = var.scale_config_repo_path
SCALE_DOWN_CONFIG = jsonencode(var.idle_config)
Expand Down
Loading
Loading