diff --git a/history_backup_spider_cms.py b/history_backup_spider_cms.py
new file mode 100755
index 0000000..eee2930
--- /dev/null
+++ b/history_backup_spider_cms.py
@@ -0,0 +1,292 @@
+#!/usr/bin/env python
+"""
+Script to fill gaps in the schedds' history results after any failure.
+
+How to run:
+ - Please create a new directory and copy etc/es.conf, password and username from the
+   /home/cmsjobmon/cms-htcondor-es directory on vocms0240.
+ - Please activate the venv in /home/cmsjobmon/cms-htcondor-es; it includes the required
+   htcondor/classad versions.
+ - Please test the script in read_only mode first; it will just print the converted ClassAd results:
+   > python history_backup_spider_cms.py --read_only --backup_start 1662268653 --backup_end 1662322692 --log_dir /$TEST_DIR/cms-htcondor-es/log_history/ --log_level INFO --collectors_file /home/cmsjobmon/cms-htcondor-es/etc/collectors.json
+ - Then check only ES with a test index pattern; in the example below, data is sent to the "cms-test-backup" index on es-cms.cern.ch:
+   > python history_backup_spider_cms.py --feed_es --es_index_template "cms-test-backup" --backup_start 1662268653 --backup_end 1662269653 --log_dir /$TEST_DIR/cms-htcondor-es/log_history/ --log_level INFO --collectors_file /home/cmsjobmon/cms-htcondor-es/etc/collectors.json --es_hostname "es-cms.cern.ch"
+ - If everything is fine, determine the gap time period in UTC unix epoch seconds and pass it
+   via the "--backup_start" and "--backup_end" parameters:
+   > python history_backup_spider_cms.py --backup_start 1662268653 --backup_end 1662322692 --feed_amq --feed_es --es_bunch_size 2000 --email_alerts 'cms-comp-monit-alerts@cern.ch' --log_dir /$TEST_DIR/cms-htcondor-es/log_history/ --log_level WARNING --collectors_file /home/cmsjobmon/cms-htcondor-es/etc/collectors.json --es_hostname "es-cms.cern.ch"
+
+ - checkpoint_test.json is only written to record the last checkpoints; it is never read back.
+ - [ATTENTION] To be able to get affiliations from the "/home/cmsjobmon/.affiliation_dir.json" file,
+   you need to run this script on vocms0240 as the cmsjobmon user.
+   You don't need to pass this file name, because it is read from
+   "AffiliationManager._AffiliationManager__DEFAULT_DIR_PATH" in convert_to_json.py.
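+
+ - One convenient way to compute the UTC epoch bounds (date and value below are illustrative):
+   > python -c "import datetime as dt; print(int(dt.datetime(2022, 9, 4, 5, 0, tzinfo=dt.timezone.utc).timestamp()))"
+   1662267600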
+
+"""
+
+import os
+import sys
+import time
+import signal
+import logging
+import argparse
+import multiprocessing
+
+try:
+    import htcondor_es
+except ImportError:
+    if os.path.exists("src/htcondor_es/__init__.py") and "src" not in sys.path:
+        sys.path.append("src")
+
+import htcondor_es.history_backup
+from htcondor_es.utils import (
+    get_schedds,
+    get_schedds_from_file,
+    set_up_logging,
+    send_email_alert,
+)
+from htcondor_es.utils import collect_metadata, TIMEOUT_MINS
+
+
+def main_driver(args):
+    """
+    Driver method for the spider script.
+    """
+    starttime = time.time()
+
+    signal.alarm(TIMEOUT_MINS * 60 + 60)
+
+    # Get all the schedd ads
+    schedd_ads = []
+    if args.collectors_file:
+        schedd_ads = get_schedds_from_file(args, collectors_file=args.collectors_file)
+        # sending a file through postprocessing will cause problems
+        del args.collectors_file
+    else:
+        schedd_ads = get_schedds(args, collectors=args.collectors)
+    logging.warning("&&& There are %d schedds to query.", len(schedd_ads))
+
+    pool = multiprocessing.Pool(processes=args.query_pool_size)
+
+    metadata = collect_metadata()
+
+    if not args.skip_history:
+        htcondor_es.history_backup.process_histories(
+            schedd_ads=schedd_ads,
+            starttime=starttime,
+            pool=pool,
+            args=args,
+            metadata=metadata,
+        )
+
+    pool.close()
+    pool.join()
+
+    logging.warning(
+        "@@@ Total processing time: %.2f mins", ((time.time() - starttime) / 60.0)
+    )
+
+    return 0
+
+
+def main():
+    """
+    Main method for the spider_cms script.
+
+    Parses arguments and invokes main_driver.
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument(
+        "--process_queue",
+        action="store_true",
+        dest="process_queue",
+        help="Also process the schedd queue (Running/Idle/Pending jobs)",
+    )
+    parser.add_argument(
+        "--feed_es", action="store_true", dest="feed_es", help="Feed to Elasticsearch"
+    )
+    parser.add_argument(
+        "--feed_es_for_queues",
+        action="store_true",
+        dest="feed_es_for_queues",
+        help="Feed queue data also to Elasticsearch",
+    )
+    parser.add_argument(
+        "--feed_amq", action="store_true", dest="feed_amq", help="Feed to CERN AMQ"
+    )
+
+    parser.add_argument(
+        "--schedd_filter",
+        default="",
+        type=str,
+        dest="schedd_filter",
+        help=(
+            "Comma separated list of schedd names to process "
+            "[default is to process all]"
+        ),
+    )
+    parser.add_argument(
+        "--skip_history",
+        action="store_true",
+        dest="skip_history",
+        help="Skip processing the history. (Only do queues.)",
+    )
+    parser.add_argument(
+        "--read_only",
+        action="store_true",
+        dest="read_only",
+        help="Only read the info, don't submit it.",
+    )
+    parser.add_argument(
+        "--dry_run",
+        action="store_true",
+        dest="dry_run",
+        help=(
+            "Don't even read info, just pretend to. (Still "
+            "query the collector for the schedds, though.)"
+        ),
+    )
+    parser.add_argument(
+        "--max_documents_to_process",
+        default=0,
+        type=int,
+        dest="max_documents_to_process",
+        help=(
+            "Abort after this many documents (per schedd). "
+            "[default: %(default)d (process all)]"
+        ),
+    )
+    parser.add_argument(
+        "--keep_full_queue_data",
+        action="store_true",
+        dest="keep_full_queue_data",
+        help="Keep the full queue data instead of dropping all but a few fields for running jobs.",
+    )
+    parser.add_argument(
+        "--amq_bunch_size",
+        default=5000,
+        type=int,
+        dest="amq_bunch_size",
+        help="Send docs to AMQ in bunches of this number [default: %(default)d]",
+    )
+    parser.add_argument(
+        "--es_bunch_size",
+        default=250,
+        type=int,
+        dest="es_bunch_size",
+        help="Send docs to ES in bunches of this number [default: %(default)d]",
+    )
+    parser.add_argument(
+        "--query_queue_batch_size",
+        default=50,
+        type=int,
+        dest="query_queue_batch_size",
+        help="Send docs to listener in batches of this number [default: %(default)d]",
+    )
+    parser.add_argument(
+        "--upload_pool_size",
+        default=8,
+        type=int,
+        dest="upload_pool_size",
+        help="Number of parallel processes for uploading [default: %(default)d]",
+    )
+    parser.add_argument(
+        "--query_pool_size",
+        default=8,
+        type=int,
+        dest="query_pool_size",
+        help="Number of parallel processes for querying [default: %(default)d]",
+    )
+
+    parser.add_argument(
+        "--es_hostname",
+        default="es-cms.cern.ch",
+        type=str,
+        dest="es_hostname",
+        help="Hostname of the elasticsearch instance to be used [default: %(default)s]",
+    )
+    parser.add_argument(
+        "--es_port",
+        default=9203,
+        type=int,
+        dest="es_port",
+        help="Port of the elasticsearch instance to be used [default: %(default)d]",
+    )
+    parser.add_argument(
+        "--es_index_template",
+        default="cms",
+        type=str,
+        dest="es_index_template",
+        help=(
+            "Trunk of the index pattern. "
+            "Needs to start with 'cms' "
+            "[default: %(default)s]"
+        ),
+    )
+    parser.add_argument(
+        "--log_dir",
+        default="log/",
+        type=str,
+        dest="log_dir",
+        help="Directory for logging information [default: %(default)s]",
+    )
+    parser.add_argument(
+        "--log_level",
+        default="WARNING",
+        type=str,
+        dest="log_level",
+        help="Log level (CRITICAL/ERROR/WARNING/INFO/DEBUG) [default: %(default)s]",
+    )
+    parser.add_argument(
+        "--email_alerts",
+        default=[],
+        action="append",
+        dest="email_alerts",
+        help="Email addresses for alerts [default: none]",
+    )
+    # Note: with action="append", user-supplied collectors are added to this
+    # default list rather than replacing it.
+    parser.add_argument(
+        "--collectors",
+        default=[
+            "cmssrv623.fnal.gov:9620",
+            "cmsgwms-collector-tier0.cern.ch:9620",
+            "cmssrv276.fnal.gov",
+            "cmsgwms-collector-itb.cern.ch",
+            "vocms0840.cern.ch",
+        ],
+        action="append",
+        dest="collectors",
+        help="Collectors' addresses",
+    )
+    parser.add_argument(
+        "--collectors_file",
+        default=None,
+        action="store",
+        type=argparse.FileType("r"),
+        dest="collectors_file",
+        help="File defining the pools and collectors",
+    )
+    parser.add_argument(
+        "--backup_start",
+        default=None,
+        action="store",
+        type=int,
+        dest="backup_start",
+        help="Start of the backup window (unix epoch seconds, UTC)",
+    )
+    parser.add_argument(
+        "--backup_end",
+        default=None,
+        action="store",
+        type=int,
+        dest="backup_end",
+        help="End of the backup window (unix epoch seconds, UTC)",
+    )
+    args = parser.parse_args()
+    set_up_logging(args)
+
+    # --dry_run implies read_only
+    args.read_only = args.read_only or args.dry_run
+
+    main_driver(args)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/htcondor_es/history.py b/src/htcondor_es/history.py
index b48e34f..c666daf 100755
--- a/src/htcondor_es/history.py
+++ b/src/htcondor_es/history.py
@@ -213,7 +213,8 @@ def update_checkpoint(name, completion_date):
     try:
         with open("checkpoint.json", "r") as fd:
             checkpoint = json.load(fd)
-    except IOError as ValueError:
+    except Exception as e:
+        logging.warning("ERROR - checkpoint.json is not readable as json. " + str(e))
         checkpoint = {}
 
     checkpoint[name] = completion_date
@@ -229,7 +230,9 @@ def process_histories(schedd_ads, starttime, pool, args, metadata=None):
     """
     try:
         checkpoint = json.load(open("checkpoint.json"))
-    except IOError as ValueError:
+    except Exception as e:
+        # Broad catch: the file may be missing (IOError) or malformed (ValueError)
+        logging.warning("ERROR - checkpoint.json is not readable as json. " + str(e))
         checkpoint = {}
 
     futures = []
diff --git a/src/htcondor_es/history_backup.py b/src/htcondor_es/history_backup.py
new file mode 100644
index 0000000..e721751
--- /dev/null
+++ b/src/htcondor_es/history_backup.py
@@ -0,0 +1,320 @@
+"""
+Methods for processing the job history of the schedds.
+
+Created for the backup script "history_backup_spider_cms.py". Instead of the
+checkpoint.json file, it uses the backup_start and backup_end parameters.
+"""
+
+import json
+import time
+import logging
+import datetime
+import traceback
+import multiprocessing
+
+import classad
+import htcondor
+import elasticsearch
+
+import htcondor_es.es
+import htcondor_es.amq
+from htcondor_es.utils import send_email_alert, time_remaining, TIMEOUT_MINS
+from htcondor_es.convert_to_json import convert_to_json
+from htcondor_es.convert_to_json import convert_dates_to_millisecs
+from htcondor_es.convert_to_json import unique_doc_id
+
+
+def unix_ts_to_str(ts):
+    """Convert a UTC timestamp in seconds to a date string"""
+    return datetime.datetime.utcfromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")
+
+
+def process_schedd(
+    starttime, last_completion, checkpoint_queue, schedd_ad, args, metadata=None
+):
+    """
+    Given a schedd, process its history between backup_start and backup_end.
+    """
+    my_start = time.time()
+    pool_name = schedd_ad.get("CMS_Pool", "Unknown")
+    if time_remaining(starttime) < 0:
+        message = (
+            "No time remaining to process %s history; exiting." % schedd_ad["Name"]
+        )
+        logging.error(message)
+        send_email_alert(
+            args.email_alerts, "spider_cms history timeout warning", message
+        )
+        return last_completion
+
+    metadata = metadata or {}
+    schedd = htcondor.Schedd(schedd_ad)
+    _q = """
+        (
+            ( EnteredCurrentStatus >= %(backup_start)d && EnteredCurrentStatus < %(backup_end)d )
+            ||
+            ( CRAB_PostJobLastUpdate >= %(backup_start)d && CRAB_PostJobLastUpdate < %(backup_end)d )
+        )
+        && (CMS_Type != "DONOTMONIT")
+        """
+    history_query = classad.ExprTree(
+        _q % {"backup_start": args.backup_start, "backup_end": args.backup_end}
+    )
+    logging.info(
+        "Querying %s for history: %s between %s",
+        schedd_ad["Name"],
+        history_query,
+        unix_ts_to_str(args.backup_start) + " - " + unix_ts_to_str(args.backup_end),
+    )
+    buffered_ads = {}
+    count = 0
+    total_upload = 0
+    sent_warnings = False
+    timed_out = False
+    error = False
+    if not args.read_only:
+        if args.feed_es:
+            es = htcondor_es.es.get_server_handle(args)
+    try:
+        if not args.dry_run:
+            history_iter = schedd.history(history_query, [], match=-1)
+        else:
+            history_iter = []
+
+        for job_ad in history_iter:
+            dict_ad = None
+            try:
+                dict_ad = convert_to_json(job_ad, return_dict=True, pool_name=pool_name)
+            except Exception as e:
+                message = "Failure when converting document on %s history: %s" % (
+                    schedd_ad["Name"],
+                    str(e),
+                )
+                exc = traceback.format_exc()
+                message += "\n{}".format(exc)
+                logging.warning(message)
+                if not sent_warnings:
+                    send_email_alert(
+                        args.email_alerts,
+                        "spider_cms history document conversion error",
+                        message,
+                    )
+                    sent_warnings = True
+
+            if not dict_ad:
+                continue
+
+            idx = htcondor_es.es.get_index(
+                job_ad["QDate"],
+                template=args.es_index_template,
+                update_es=(args.feed_es and not args.read_only),
+            )
+            ad_list = buffered_ads.setdefault(idx, [])
+            ad_list.append((unique_doc_id(dict_ad), dict_ad))
+
+            if len(ad_list) == args.es_bunch_size:
+                st = time.time()
+                if not args.read_only:
+                    if args.feed_es:
+                        htcondor_es.es.post_ads(
+                            es.handle, idx, ad_list, metadata=metadata
+                        )
+                    if args.feed_amq:
+                        data_for_amq = [
+                            (id_, convert_dates_to_millisecs(dict_ad))
+                            for id_, dict_ad in ad_list
+                        ]
+                        htcondor_es.amq.post_ads(data_for_amq, metadata=metadata)
+
+                logging.debug(
+                    "...posting %d ads from %s (process_schedd)",
+                    len(ad_list),
+                    schedd_ad["Name"],
+                )
+                total_upload += time.time() - st
+                buffered_ads[idx] = []
+
+            count += 1
+
+            # Find the most recent job and use that date as the new
+            # last_completion date
+            job_completion = job_ad.get("EnteredCurrentStatus", 0)
+            if job_completion > last_completion:
+                last_completion = job_completion
+
+            if time_remaining(starttime) < 0:
+                message = (
+                    "History crawler on %s has been running for "
+                    "more than %d minutes; exiting." % (schedd_ad["Name"], TIMEOUT_MINS)
+                )
+                logging.error(message)
+                send_email_alert(
+                    args.email_alerts, "spider_cms history timeout warning", message
+                )
+                timed_out = True
+                break
+
+            if args.max_documents_to_process and count > args.max_documents_to_process:
+                logging.warning(
+                    "Aborting after %d documents (--max_documents_to_process option)",
+                    args.max_documents_to_process,
+                )
+                break
+
+        # Post the remaining ads
+        for idx, ad_list in list(buffered_ads.items()):
+            if ad_list:
+                logging.debug(
+                    "...posting remaining %d ads from %s (process_schedd)",
+                    len(ad_list),
+                    schedd_ad["Name"],
+                )
+                if not args.read_only:
+                    if args.feed_es:
+                        htcondor_es.es.post_ads(
+                            es.handle, idx, ad_list, metadata=metadata
+                        )
+                    if args.feed_amq:
+                        data_for_amq = [
+                            (id_, convert_dates_to_millisecs(dict_ad))
+                            for id_, dict_ad in ad_list
+                        ]
+                        htcondor_es.amq.post_ads(data_for_amq, metadata=metadata)
+    except RuntimeError:
+        message = "Failed to query schedd for job history: %s" % schedd_ad["Name"]
+        exc = traceback.format_exc()
+        message += "\n{}".format(exc)
+        logging.error(message)
+        error = True
+
+    except Exception as exn:
+        message = "Failure when processing schedd history query on %s: %s" % (
+            schedd_ad["Name"],
+            str(exn),
+        )
+        exc = traceback.format_exc()
+        message += "\n{}".format(exc)
+        logging.exception(message)
+        send_email_alert(
+            args.email_alerts, "spider_cms schedd history query error", message
+        )
+        error = True
+
+    total_time = (time.time() - my_start) / 60.0
+    total_upload /= 60.0
+    last_formatted = datetime.datetime.fromtimestamp(last_completion).strftime(
+        "%Y-%m-%d %H:%M:%S"
+    )
+    logging.warning(
+        "Schedd %-25s history: response count: %5d; last completion %s; "
+        "query time %.2f min; upload time %.2f min",
+        schedd_ad["Name"],
+        count,
+        last_formatted,
+        total_time - total_upload,
+        total_upload,
+    )
+
+    # If we got to this point without a timeout or error, all these jobs have
+    # been processed and uploaded, so we can update the checkpoint
+    if not timed_out and not error:
+        checkpoint_queue.put((schedd_ad["Name"], last_completion))
+
+    return last_completion
+
+
+def update_checkpoint(name, completion_date):
+    # checkpoint_test.json maps each schedd name to the last processed
+    # completion time in epoch seconds, e.g. {"<schedd name>": 1662322692}
+    try:
+        with open("checkpoint_test.json", "r") as fd:
+            checkpoint = json.load(fd)
+    except Exception as e:
+        logging.warning(
+            "ERROR - checkpoint_test.json is not readable as json. " + str(e)
+        )
+        checkpoint = {}
+
+    checkpoint[name] = completion_date
+
+    with open("checkpoint_test.json", "w") as fd:
+        json.dump(checkpoint, fd)
+
+
+def process_histories(schedd_ads, starttime, pool, args, metadata=None):
+    """
+    Process the history of each schedd in the given multiprocessing pool.
+    """
+    futures = []
+    metadata = metadata or {}
+    metadata["spider_source"] = "condor_history"
+
+    manager = multiprocessing.Manager()
+    checkpoint_queue = manager.Queue()
+
+    for schedd_ad in schedd_ads:
+        name = schedd_ad["Name"]
+
+        # No per-schedd checkpoint here: the backup window start is used as
+        # the last completion time for every schedd.
+        last_completion = args.backup_start
+
+        future = pool.apply_async(
+            process_schedd,
+            (starttime, last_completion, checkpoint_queue, schedd_ad, args, metadata),
+        )
+        futures.append((name, future))
+
+    def _chkp_updater():
+        while True:
+            try:
+                job = checkpoint_queue.get()
+                if job is None:  # Swallow the poison pill
+                    break
+            except EOFError as error:
+                logging.warning(
+                    "EOFError - Nothing left to consume in the queue %s", error
+                )
+                break
+            update_checkpoint(*job)
+
+    chkp_updater = multiprocessing.Process(target=_chkp_updater)
+    chkp_updater.start()
+
+    # Check whether one of the processes timed out; if so, its progress is
+    # ignored and its checkpoint is left untouched
+    timed_out = False
+    for name, future in futures:
+        if time_remaining(starttime) > -10:
+            try:
+                future.get(time_remaining(starttime) + 10)
+            except multiprocessing.TimeoutError:
+                # This implies that the checkpoint hasn't been updated
+                message = "Schedd %s history timed out; ignoring progress." % name
+                exc = traceback.format_exc()
+                message += "\n{}".format(exc)
+                logging.error(message)
+                send_email_alert(
+                    args.email_alerts, "spider_cms history timeout warning", message
+                )
+            except elasticsearch.exceptions.TransportError:
+                message = (
+                    "Transport error while sending history data of %s; ignoring progress."
+                    % name
+                )
+                exc = traceback.format_exc()
+                message += "\n{}".format(exc)
+                logging.error(message)
+                send_email_alert(
+                    args.email_alerts,
+                    "spider_cms history transport error warning",
+                    message,
+                )
+        else:
+            timed_out = True
+            break
+    if timed_out:
+        pool.terminate()
+
+    checkpoint_queue.put(None)  # Send a poison pill
+    chkp_updater.join()
+
+    logging.warning(
+        "Processing time for history: %.2f mins", ((time.time() - starttime) / 60.0)
+    )
diff --git a/src/htcondor_es/utils.py b/src/htcondor_es/utils.py
index 0cfd62b..5825030 100644
--- a/src/htcondor_es/utils.py
+++ b/src/htcondor_es/utils.py
@@ -79,6 +79,10 @@ def send_email_alert(recipients, subject, message):
     """
     Send a simple email alert (typically of failure).
     """
+    # TMP: somehow send_email_alert is still sending alerts;
+    # let's disable this feature for now
+    return
+
     if not recipients:
         return
     msg = email.mime.text.MIMEText(message)