Skip to content

Commit

Permalink
dl-v2 handling graceful termination in license deployment (#471)
Browse files Browse the repository at this point in the history
* Handling gracefull termination in license dep.

* fic typo

* add terminationGracePeriod to license dep

* removed backoff limit

* version bump

* fixes for pod replacement

* increased terminationGracePeriodSeconds

* added doc string

* lint fixes

* minor nit

* server version bump
  • Loading branch information
aniketsinghrawat authored Sep 2, 2024
1 parent 9314b5e commit f0ead39
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 8 deletions.
2 changes: 1 addition & 1 deletion weather_dl_v2/fastapi-server/VERSION.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.6
1.0.7
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ kind: Job
metadata:
name: weather-dl-v2-license-dep
spec:
backoffLimit: 0
backoffLimit: 5
podReplacementPolicy: Failed
template:
spec:
restartPolicy: Never
nodeSelector:
cloud.google.com/gke-nodepool: default-pool
restartPolicy: OnFailure
containers:
- name: weather-dl-v2-license-dep
image: XXXXXXX
Expand All @@ -24,6 +23,7 @@ spec:
volumeMounts:
- name: config-volume
mountPath: ./config
terminationGracePeriodSeconds: 172800 # 48 hours
volumes:
- name: config-volume
configMap:
Expand Down
26 changes: 23 additions & 3 deletions weather_dl_v2/license_deployment/fetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from job_creator import create_download_job
from clients import CLIENTS
from manifest import FirestoreManifest
from util import exceptionit, ThreadSafeDict
from util import exceptionit, ThreadSafeDict, GracefulKiller

db_client = FirestoreClient()
log_client = google.cloud.logging.Client()
Expand Down Expand Up @@ -148,14 +148,20 @@ def fetch_request_from_db():
def main():
logger.info("Started looking at the request.")
error_map = ThreadSafeDict()
killer = GracefulKiller()
with ThreadPoolExecutor(concurrency_limit) as executor:
# Disclaimer: A license will pick always pick concurrency_limit + 1
# parition. One extra parition will be kept in threadpool task queue.

log_count = 0
while True:
# Fetch a request from the database
request = fetch_request_from_db()
# Check if SIGTERM was recived for graceful termination.
if not killer.kill_now:
# Fetch a request from the database.
request = fetch_request_from_db()
else:
logger.warning('SIGTERM recieved. Stopping further requets processing.')
break

if request is not None:
executor.submit(make_fetch_request, request, error_map)
Expand All @@ -176,6 +182,18 @@ def main():
log_count = 1 if log_count >= 3600 else log_count + 1
time.sleep(1)

logger.warning('Graceful Termination. Waiting for remaining requests to complete.')

# Making sure all pending requests are completed.
executor.shutdown(wait=True)

logger.warning('Graceful Termination. Completed all pending requests.')

# We want mark the pod as failed as we want to start a new pod which will
# continue to fetch requests.
raise RuntimeError('License Deployment was Graceful Terminated. ' \
'Raising Error to mark the pod as failed.')


def boot_up(license: str) -> None:
global license_id, client_name, concurrency_limit
Expand Down Expand Up @@ -206,3 +224,5 @@ def boot_up(license: str) -> None:
except Exception as e:
logger.info(f"License error: {e}.")
raise e

logger.info('License deployment shutting down.')
15 changes: 15 additions & 0 deletions weather_dl_v2/license_deployment/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import hashlib
import itertools
import os
import signal
import socket
import subprocess
import sys
Expand Down Expand Up @@ -63,6 +64,20 @@ def _retry_if_valid_input_but_server_or_socket_error_and_timeout_filter(
return True
return retry.retry_if_valid_input_but_server_error_and_timeout_filter(exception)

class GracefulKiller:
"""Class to check for SIGTERM signal.
Used to handle gracefull termination. If ever SIGTERM is recived by
the process GracefulKiller.kill_now will be `true`."""

kill_now = False
def __init__(self):
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)

def exit_gracefully(self, signum, frame):
logger.warning('SIGTERM recieved.')
self.kill_now = True


class _FakeClock:

Expand Down

0 comments on commit f0ead39

Please sign in to comment.