From f0ead39e143607966b1bbfc3ab273eb15037c1cb Mon Sep 17 00:00:00 2001 From: Aniket Singh Rawat <122869307+aniketsinghrawat@users.noreply.github.com> Date: Mon, 2 Sep 2024 23:30:47 +0530 Subject: [PATCH] `dl-v2` handling graceful termination in license deployment (#471) * Handling graceful termination in license dep. * fix typo * add terminationGracePeriod to license dep * removed backoff limit * version bump * fixes for pod replacement * increased terminationGracePeriodSeconds * added doc string * lint fixes * minor nit * server version bump --- weather_dl_v2/fastapi-server/VERSION.txt | 2 +- .../license_dep/license_deployment.yaml | 8 +++--- weather_dl_v2/license_deployment/fetch.py | 26 ++++++++++++++++--- weather_dl_v2/license_deployment/util.py | 15 +++++++++++ 4 files changed, 43 insertions(+), 8 deletions(-) diff --git a/weather_dl_v2/fastapi-server/VERSION.txt b/weather_dl_v2/fastapi-server/VERSION.txt index ece61c60..f9cbc01a 100644 --- a/weather_dl_v2/fastapi-server/VERSION.txt +++ b/weather_dl_v2/fastapi-server/VERSION.txt @@ -1 +1 @@ -1.0.6 \ No newline at end of file +1.0.7 \ No newline at end of file diff --git a/weather_dl_v2/fastapi-server/license_dep/license_deployment.yaml b/weather_dl_v2/fastapi-server/license_dep/license_deployment.yaml index a46b6f6f..f06735b2 100644 --- a/weather_dl_v2/fastapi-server/license_dep/license_deployment.yaml +++ b/weather_dl_v2/fastapi-server/license_dep/license_deployment.yaml @@ -5,12 +5,11 @@ kind: Job metadata: name: weather-dl-v2-license-dep spec: - backoffLimit: 0 + backoffLimit: 5 + podReplacementPolicy: Failed template: spec: - restartPolicy: Never - nodeSelector: - cloud.google.com/gke-nodepool: default-pool + restartPolicy: OnFailure containers: - name: weather-dl-v2-license-dep image: XXXXXXX @@ -24,6 +23,7 @@ spec: volumeMounts: - name: config-volume mountPath: ./config + terminationGracePeriodSeconds: 172800 # 48 hours volumes: - name: config-volume configMap: diff --git 
a/weather_dl_v2/license_deployment/fetch.py b/weather_dl_v2/license_deployment/fetch.py index 72ae50bf..f15dcc29 100644 --- a/weather_dl_v2/license_deployment/fetch.py +++ b/weather_dl_v2/license_deployment/fetch.py @@ -26,7 +26,7 @@ from job_creator import create_download_job from clients import CLIENTS from manifest import FirestoreManifest -from util import exceptionit, ThreadSafeDict +from util import exceptionit, ThreadSafeDict, GracefulKiller db_client = FirestoreClient() log_client = google.cloud.logging.Client() @@ -148,14 +148,20 @@ def fetch_request_from_db(): def main(): logger.info("Started looking at the request.") error_map = ThreadSafeDict() + killer = GracefulKiller() with ThreadPoolExecutor(concurrency_limit) as executor: # Disclaimer: A license will pick always pick concurrency_limit + 1 # parition. One extra parition will be kept in threadpool task queue. log_count = 0 while True: - # Fetch a request from the database - request = fetch_request_from_db() + # Check if SIGTERM was recived for graceful termination. + if not killer.kill_now: + # Fetch a request from the database. + request = fetch_request_from_db() + else: + logger.warning('SIGTERM recieved. Stopping further requets processing.') + break if request is not None: executor.submit(make_fetch_request, request, error_map) @@ -176,6 +182,18 @@ def main(): log_count = 1 if log_count >= 3600 else log_count + 1 time.sleep(1) + logger.warning('Graceful Termination. Waiting for remaining requests to complete.') + + # Making sure all pending requests are completed. + executor.shutdown(wait=True) + + logger.warning('Graceful Termination. Completed all pending requests.') + + # We want mark the pod as failed as we want to start a new pod which will + # continue to fetch requests. + raise RuntimeError('License Deployment was Graceful Terminated. 
' \ + 'Raising Error to mark the pod as failed.') + def boot_up(license: str) -> None: global license_id, client_name, concurrency_limit @@ -206,3 +224,5 @@ def boot_up(license: str) -> None: except Exception as e: logger.info(f"License error: {e}.") raise e + + logger.info('License deployment shutting down.') diff --git a/weather_dl_v2/license_deployment/util.py b/weather_dl_v2/license_deployment/util.py index d24a1405..0f1672e6 100644 --- a/weather_dl_v2/license_deployment/util.py +++ b/weather_dl_v2/license_deployment/util.py @@ -19,6 +19,7 @@ import hashlib import itertools import os +import signal import socket import subprocess import sys @@ -63,6 +64,20 @@ def _retry_if_valid_input_but_server_or_socket_error_and_timeout_filter( return True return retry.retry_if_valid_input_but_server_error_and_timeout_filter(exception) +class GracefulKiller: + """Class to check for SIGTERM signal. + Used to handle gracefull termination. If ever SIGTERM is recived by + the process GracefulKiller.kill_now will be `true`.""" + + kill_now = False + def __init__(self): + signal.signal(signal.SIGINT, self.exit_gracefully) + signal.signal(signal.SIGTERM, self.exit_gracefully) + + def exit_gracefully(self, signum, frame): + logger.warning('SIGTERM recieved.') + self.kill_now = True + class _FakeClock: