From c9109befb0175e77a6bb6ea6b3a41aaf7eb985e6 Mon Sep 17 00:00:00 2001 From: Nikodemas Tuckus Date: Tue, 5 Mar 2024 12:03:22 +0100 Subject: [PATCH] Don't choose null task --- cpueff-goweb/spark/cpueff_stepchain_goweb.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cpueff-goweb/spark/cpueff_stepchain_goweb.py b/cpueff-goweb/spark/cpueff_stepchain_goweb.py index 8775c407..70f4ccf6 100644 --- a/cpueff-goweb/spark/cpueff_stepchain_goweb.py +++ b/cpueff-goweb/spark/cpueff_stepchain_goweb.py @@ -70,9 +70,9 @@ def udf_step_extract(row): _fwjr_id = row['meta_data']['fwjr_id'] _jobtype = row['meta_data']['jobtype'] _ts = row['meta_data']['ts'] - if 'steps' in row and row['steps']: + if 'steps' in row and row['steps'] is not None: for step in filter(None, row['steps']): - if ('name' in step) and step['name'].lower().startswith('cmsrun'): + if 'name' in step and step['name'].lower().startswith('cmsrun'): step_res = {'Task': _task_name, 'ts': _ts, 'fwjr_id': _fwjr_id, 'JobType': _jobtype} count += 1 @@ -88,11 +88,11 @@ def udf_step_extract(row): step_res['threads_total_job_time'] = step_res['job_time'] * step_res['nthreads'] except Exception: step_res['threads_total_job_time'] = None - if step['output']: + if 'output' in step and step['output'] is not None: for outx in step['output']: if outx['acquisitionEra']: step_res['acquisition_era'].append(outx['acquisitionEra']) - if 'performance' in step: + if 'performance' in step and step['performance'] is not None: performance = step['performance'] if 'storage' in performance: if 'writeTotalMB' in performance['storage']: @@ -174,6 +174,7 @@ def main(start_date, end_date, hdfs_out_dir, last_n_days): .filter(f"""data.meta_data.jobstate='success' AND data.wmats >= {start_date.timestamp()} AND data.wmats < {end_date.timestamp()} + AND data.task IS NOT NULL """) .filter(col('data.meta_data.jobtype').isin(_PROD_CMS_JOB_TYPES_FILTER)) )