From bfead127efdf6312270cd4a9961e48e4cbe03cc4 Mon Sep 17 00:00:00 2001
From: Ian DesJardin
Date: Fri, 12 Aug 2016 14:37:31 -0400
Subject: [PATCH] Individual analytic/preproc profiling

---
 MANIFEST.in               |  1 +
 supremm/profile.py        | 48 ++++++++++++++++++++++
 supremm/statistics.py     | 64 +++++++++++++++++++++++++++++
 supremm/summarize.py      | 86 ++++++++++++++++++++++++++++++++++-----
 supremm/summarize_jobs.py | 52 +++++++++++++++++++----
 5 files changed, 234 insertions(+), 17 deletions(-)
 create mode 100644 supremm/profile.py

diff --git a/MANIFEST.in b/MANIFEST.in
index 2b19162e..56aa54b8 100644
--- a/MANIFEST.in
+++ b/MANIFEST.in
@@ -15,6 +15,7 @@ include supremm/outputter.py
 include supremm/pcparchive.py
 include supremm/plugin.py
 include supremm/processhelpers.py
+include supremm/profile.py
 include supremm/scripthelpers.py
 include supremm/statistics.py
 include supremm/subsample.py
diff --git a/supremm/profile.py b/supremm/profile.py
new file mode 100644
index 00000000..6a4553c0
--- /dev/null
+++ b/supremm/profile.py
@@ -0,0 +1,48 @@
+from supremm.statistics import RollingStats
+
+class Profile(object):
+    """
+    Profile keeps a running total of how much time each analytic
+    uses overall and tabulates the results
+    """
+
+    def __init__(self):
+        self.times = dict()
+
+    def merge(self, record):
+        """Adds data from another instance of Profile to this one"""
+        for k in record:
+            if k not in self.times:
+                self.times[k] = record[k]
+                continue
+            for category in record[k]:
+                if category not in self.times[k]:
+                    self.times[k][category] = RollingStats()
+                self.times[k][category] += record[k][category]
+
+    def add(self, analytic, category, val):
+        if analytic in self.times:
+            if category not in self.times[analytic]:
+                self.times[analytic][category] = RollingStats()
+            self.times[analytic][category].append(val)
+        else:
+            self.times[analytic] = dict()
+            self.times[analytic][category] = RollingStats()
+            self.times[analytic][category].append(val)
+
+    def calc_derived(self):
+        for k in self.times:
+            if 'process' in self.times[k]:
+                self.times[k]['extract'] = self.times[k]['process+extract'] - self.times[k]['process']
+                self.times[k]['total'] = self.times[k]['process'] + self.times[k]['extract'] + self.times[k]['results']
+            else:
+                self.times[k]['total'] = self.times[k]['process+extract']
+
+    def ranked(self):
+        self.calc_derived()
+        total_times = dict()
+        for k in self.times:
+            total_times[k] = self.times[k]['total'].sum()
+
+        for k in sorted(total_times, key=total_times.get, reverse=True):
+            yield (k, self.times[k])
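Example (not part of the patch): a minimal sketch of how the new Profile class is meant to be used by the code later in this patch. The analytic name and the timing values are made up for illustration, and the patched supremm package is assumed to be importable:

    from supremm.profile import Profile

    p = Profile()
    # the category names mirror the ones recorded by summarize.py below
    p.add("ExampleAnalytic", "process+extract", 0.0421)
    p.add("ExampleAnalytic", "process", 0.0137)
    p.add("ExampleAnalytic", "results", 0.0009)

    q = Profile()
    q.add("ExampleAnalytic", "process+extract", 0.0385)
    q.add("ExampleAnalytic", "process", 0.0122)
    q.add("ExampleAnalytic", "results", 0.0011)

    # merge() takes the .times dict of another Profile, as summarize_jobs.py does
    p.merge(q.times)

    for name, stats in p.ranked():
        print name, stats['total'].sum(), stats['total']._count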
diff --git a/supremm/statistics.py b/supremm/statistics.py
index c6753350..3ea304d2 100644
--- a/supremm/statistics.py
+++ b/supremm/statistics.py
@@ -39,6 +39,12 @@ class RollingStats(object):
 
     def __init__(self):
         self._count = 0
+        self.m = 0
+        self.last_m = 0
+        self.min = 0
+        self.max = 0
+        self.s = 0
+        self.last_s = 0
 
     def append(self, x):
         self._count += 1
@@ -59,6 +65,64 @@ def append(self, x):
         self.min = numpy.minimum(self.min, x)
         self.max = numpy.maximum(self.max, x)
 
+    def __add__(self, other):
+        """
+        RollingStats of the union of the data in self and other
+        self and other must not overlap
+        """
+        ret = RollingStats()
+        ret._count = self._count + other._count
+        ret.m = ((self.m * self._count) + (other.m * other._count)) / ret._count
+        ret.last_m = ret.m
+        n1 = self._count
+        n2 = other._count
+        m1 = self.m
+        m2 = other.m
+        # Based on http://cas.ee.ic.ac.uk/people/dt10/research/thomas-08-sample-mean-and-variance.pdf (page 4)
+        # Pairwise combination of the sums of squared differences of two disjoint samples
+        ret.s = self.s + other.s + ((float(n1) * n2) / (n1 + n2)) * ((m1 - m2) ** 2)
+        ret.last_s = ret.s
+        ret.min = numpy.minimum(self.min, other.min)
+        ret.max = numpy.maximum(self.max, other.max)
+        return ret
+
+    def __iadd__(self, other):
+        ret = self + other
+        return ret
+
+    def __sub__(self, other):
+        """
+        RollingStats of the data in self and not in other
+        assumes self is a superset of other
+        """
+        ret = RollingStats()
+        ret._count = self._count  # Keep self's count so that sum() of the result is self.sum() - other.sum()
+        ret.m = ((self.m * self._count) - (other.m * other._count)) / ret._count
+        ret.last_m = ret.m
+        n1 = self._count
+        n2 = other._count
+        m1 = self.m
+        m2 = other.m
+        # Based on the formula in http://cas.ee.ic.ac.uk/people/dt10/research/thomas-08-sample-mean-and-variance.pdf (page 4)
+        # Note ret.s is only an estimate here
+        ret.s = self.s + other.s + ((float(n2) / (n1 * (n1 + n2))) * ((n1 * m2 - n2 * m1) ** 2))
+        ret.last_s = ret.s
+        # Probably a better way to do this
+        minim = numpy.minimum(self.min, other.min)
+        if minim == other.min:  # Min might be in the set of data we're removing, so we don't know what the min should be
+            ret.min = None
+        else:
+            ret.min = numpy.minimum(self.min, other.min)
+        maxim = numpy.maximum(self.max, other.max)
+        if maxim == other.max:  # Same with max
+            ret.max = None
+        else:
+            ret.max = numpy.maximum(self.max, other.max)
+        return ret
+
+    def sum(self):
+        return self._count * self.mean()
+
     def get(self):
         """ return a dict with the various statistics """
         return {'avg': self.mean(), 'min': self.min, 'max': self.max, 'cnt': self._count, 'std': math.sqrt(self.variance())}
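Example (not part of the patch): for two disjoint samples the combination used in __add__ above is S = S1 + S2 + (n1*n2/(n1+n2)) * (m1 - m2)**2, so the merged statistics can be checked against numpy directly. The values below are arbitrary and the patched RollingStats is assumed to be importable:

    import numpy
    from supremm.statistics import RollingStats

    a_vals = [0.12, 0.40, 0.33, 0.25]
    b_vals = [0.51, 0.05, 0.19]

    a = RollingStats()
    for v in a_vals:
        a.append(v)
    b = RollingStats()
    for v in b_vals:
        b.append(v)

    combined = a + b
    print combined._count, combined.mean()   # 7 and the same value as numpy.mean(...)
    print numpy.mean(a_vals + b_vals)
    # combined.get()['std'] should be close to numpy.std(a_vals + b_vals); exact agreement
    # depends on whether RollingStats.variance() divides by n or by n - 1.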
diff --git a/supremm/summarize.py b/supremm/summarize.py
index f345055d..c73ee118 100644
--- a/supremm/summarize.py
+++ b/supremm/summarize.py
@@ -9,6 +9,7 @@
 import logging
 import traceback
 from supremm.plugin import NodeMetadata
+from supremm.profile import Profile
 import numpy
 import copy
@@ -43,9 +44,14 @@ def __init__(self, preprocessors, analytics, job):
         self.job = job
         self.start = time.time()
         self.archives_processed = 0
+        self._profile = False
 
         self.indomcache = None
 
+    def activate_profile(self):
+        self._profile = True
+        self.profile_dict = Profile()
+
     def adderror(self, category, errormsg):
         """ All errors reported with this function show up in the job summary """
         if category not in self.errors:
@@ -92,13 +98,32 @@ def get(self):
         for analytic in self.alltimestamps:
             if analytic.status != "uninitialized":
                 if analytic.mode == "all":
-                    output[analytic.name] = analytic.results()
+                    if self._profile:
+                        starttime = time.time()
+                        output[analytic.name] = analytic.results()
+                        delta_time = time.time() - starttime
+                        self.profile_dict.add(analytic.name, 'results', delta_time)
+                    else:
+                        output[analytic.name] = analytic.results()
                 if analytic.mode == "timeseries":
-                    timeseries[analytic.name] = analytic.results()
+                    if self._profile:
+                        starttime = time.time()
+                        timeseries[analytic.name] = analytic.results()
+                        delta_time = time.time() - starttime
+                        self.profile_dict.add(analytic.name, 'results', delta_time)
+                    else:
+                        timeseries[analytic.name] = analytic.results()
         for analytic in self.firstlast:
             if analytic.status != "uninitialized":
-                output[analytic.name] = analytic.results()
-
+                if self._profile:
+                    starttime = time.time()
+                    output[analytic.name] = analytic.results()
+                    delta_time = time.time() - starttime
+                    self.profile_dict.add(analytic.name, 'results', delta_time)
+                else:
+                    output[analytic.name] = analytic.results()
+
+
         output['summarization'] = {
             "version": VERSION,
             "elapsed": time.time() - self.start,
@@ -114,7 +139,15 @@
             output['timeseries'] = timeseries
 
         for preproc in self.preprocs:
-            result = preproc.results()
+
+            if self._profile:
+                starttime = time.time()
+                result = preproc.results()
+                delta_t = time.time() - starttime
+                self.profile_dict.add(preproc.name, 'results', delta_t)
+            else:
+                result = preproc.results()
+
             if result != None:
                 output.update(result)
@@ -248,7 +281,13 @@ def runcallback(self, analytic, result, mtypes, ctx, mdata, metric_id_array):
             description.append([tmpidx, tmpnames])
 
         try:
-            retval = analytic.process(mdata, float(result.contents.timestamp), data, description)
+            if self._profile:
+                starttime = time.time()
+                retval = analytic.process(mdata, float(result.contents.timestamp), data, description)
+                delta_t = time.time() - starttime
+                self.profile_dict.add(analytic.name, 'process', delta_t)
+            else:
+                retval = analytic.process(mdata, float(result.contents.timestamp), data, description)
             return retval
         except Exception as e:
             logging.exception("%s %s @ %s", self.job.job_id, analytic.name, float(result.contents.timestamp))
@@ -267,7 +306,14 @@ def runpreproccall(self, preproc, result, mtypes, ctx, mdata, metric_id_array):
             data.append(numpy.array([pcpfast.pcpfastExtractValues(result, i, j, mtypes[i]) for j in xrange(result.contents.get_numval(i))]))
 
-        return preproc.process(float(result.contents.timestamp), data, description)
+        if self._profile:
+            starttime = time.time()
+            ret = preproc.process(float(result.contents.timestamp), data, description)
+            delta_t = time.time() - starttime
+            self.profile_dict.add(preproc.name, 'process', delta_t)
+        else:
+            ret = preproc.process(float(result.contents.timestamp), data, description)
+        return ret
 
     @staticmethod
     def getindomdict(ctx, metric_id_array):
@@ -445,7 +491,14 @@ def processarchive(self, nodename, nodeidx, archive):
             newctx = context.pmDupContext()
             context._ctx = newctx
 
-            self.processforpreproc(context, mdata, preproc)
+            # Optionally keep track of how long each analytic/preproc takes
+            if self._profile:
+                starttime = time.time()
+                self.processforpreproc(context, mdata, preproc)
+                delta_time = time.time() - starttime
+                self.profile_dict.add(preproc.name, 'process+extract', delta_time)
+            else:
+                self.processforpreproc(context, mdata, preproc)
 
             context.__del__()
@@ -454,7 +507,13 @@
             newctx = context.pmDupContext()
             context._ctx = newctx
 
-            self.processforanalytic(context, mdata, analytic)
+            if self._profile:
+                starttime = time.time()
+                self.processforanalytic(context, mdata, analytic)
+                delta_time = time.time() - starttime
+                self.profile_dict.add(analytic.name, 'process+extract', delta_time)
+            else:
+                self.processforanalytic(context, mdata, analytic)
 
             context.__del__()
@@ -463,7 +522,14 @@
             newctx = context.pmDupContext()
            context._ctx = newctx
 
-            self.processfirstlast(context, mdata, analytic)
+            if self._profile:
+                starttime = time.time()
+                self.processfirstlast(context, mdata, analytic)
+                delta_time = time.time() - starttime
+                self.profile_dict.add(analytic.name, 'process+extract', delta_time)
+            else:
+                self.processfirstlast(context, mdata, analytic)
+
             context.__del__()
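Example (not part of the patch): every instrumented call above repeats the same "if self._profile:" timing block around process() and results(). A context manager is one compact way to express that pattern; this sketch only illustrates the pattern, it is not code from the patch:

    import time
    from contextlib import contextmanager

    @contextmanager
    def timed(summary, name, category):
        """ Record elapsed wall time under (name, category) when profiling is enabled """
        start = time.time()
        try:
            yield
        finally:
            if summary._profile:
                summary.profile_dict.add(name, category, time.time() - start)

    # e.g. the analytic.process() call in runcallback() could then read:
    #     with timed(self, analytic.name, 'process'):
    #         retval = analytic.process(mdata, float(result.contents.timestamp), data, description)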
diff --git a/supremm/summarize_jobs.py b/supremm/summarize_jobs.py
index cdd3d615..44be37df 100644
--- a/supremm/summarize_jobs.py
+++ b/supremm/summarize_jobs.py
@@ -12,6 +12,7 @@
 from supremm.summarize import Summarize
 from supremm.plugin import loadplugins, loadpreprocessors
 from supremm.scripthelpers import parsetime
+from supremm.profile import Profile
 import sys
 import os
@@ -46,6 +47,7 @@ def usage():
    print "                  subdirectories will be created. This option is ignored "
    print "                  if multiple jobs are to be processed."
    print "  -h --help       display this help message and exit."
+    print "  -p --profile    times and logs how long each analytic and preprocessor takes to run"
 
 
 def getoptions():
@@ -67,7 +69,7 @@
         "resource": None
     }
 
-    opts, _ = getopt(sys.argv[1:], "j:r:t:dqs:e:T:D:Eo:h",
+    opts, _ = getopt(sys.argv[1:], "j:r:t:dqs:e:T:D:Eo:hp",
                      ["localjobid=",
                       "resource=",
                       "threads=",
                       "delta=",
                       "quiet",
                       "start=",
                       "end=",
                       "timeout=",
                       "delete=",
                       "extract-only",
                       "output=",
-                      "help"])
+                      "help",
+                      "profile"])
 
     for opt in opts:
         if opt[0] in ("-j", "--jobid"):
@@ -107,6 +110,8 @@
         if opt[0] in ("-h", "--help"):
             usage()
             sys.exit(0)
+        if opt[0] in ("-p", "--profile"):
+            retdata['profile'] = True
 
     if retdata['extractonly']: # extract-only supresses archive delete
@@ -135,7 +140,7 @@
         sys.exit(1)
 
 
-def summarizejob(job, conf, resconf, plugins, preprocs, m, dblog, opts):
+def summarizejob(job, conf, resconf, plugins, preprocs, m, dblog, opts, profile):
     """ Main job processing, Called for every job to be processed """
 
     success = False
@@ -152,6 +157,9 @@ def summarizejob(job, conf, resconf, plugins, preprocs, m, dblog, opts):
         analytics = [x(job) for x in plugins]
         s = Summarize(preprocessors, analytics, job)
 
+        if 'profile' in opts and opts['profile'] == True:
+            s.activate_profile()
+
         if 0 == mergeresult:
             logging.info("Success for %s files in %s", job.job_id, job.jobdir)
             s.process()
@@ -168,6 +176,9 @@ def summarizejob(job, conf, resconf, plugins, preprocs, m, dblog, opts):
 
         dblog.markasdone(job, success or force_success, time.time() - mergestart)
 
+        if 'profile' in opts and opts['profile'] == True:
+            profile.merge(s.profile_dict.times)
+
     except Exception as e:
         logging.error("Failure for job %s %s. Error: %s %s", job.job_id, job.jobdir, str(e), traceback.format_exc())
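Example (not part of the patch): with the new option wired up above, a profiled run of a single job might be invoked as follows; the resource name and local job id are placeholders:

    python summarize_jobs.py -r resource1 -j 1234567 -p

The per-analytic timing table is then written to the log at the end of the run by the processjobs() changes that follow.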
@@ -188,12 +199,20 @@ def override_defaults(resconf, opts):
 
 def processjobs(config, opts, procid):
     """ main function that does the work.
     One run of this function per process """
 
+    starttime = None
+    if 'profile' in opts and opts['profile'] == True:
+        starttime = time.time()
+
     preprocs = loadpreprocessors()
     logging.debug("Loaded %s preprocessors", len(preprocs))
 
     plugins = loadplugins()
     logging.debug("Loaded %s plugins", len(plugins))
 
+    p = None
+    if 'profile' in opts and opts['profile'] == True:
+        p = Profile()
+
     for r, resconf in config.resourceconfigs():
         if opts['resource'] == None or opts['resource'] == r or opts['resource'] == str(resconf['resource_id']):
             logging.info("Processing resource %s", r)
 
             if opts['mode'] == "single":
                 for job in dbif.getbylocaljobid(opts['local_job_id']):
-                    summarizejob(job, config, resconf, plugins, preprocs, m, dbif, opts)
+                    summarizejob(job, config, resconf, plugins, preprocs, m, dbif, opts, p)
             elif opts['mode'] == "timerange":
                 for job in dbif.getbytimerange(opts['start'], opts['end']):
-                    summarizejob(job, config, resconf, plugins, preprocs, m, dbif, opts)
+                    summarizejob(job, config, resconf, plugins, preprocs, m, dbif, opts, p)
             else:
                 for job in dbif.get(None, None):
-                    summarizejob(job, config, resconf, plugins, preprocs, m, dbif, opts)
-
+                    summarizejob(job, config, resconf, plugins, preprocs, m, dbif, opts, p)
+
+    if 'profile' in opts and opts['profile'] == True:
+        del_t = time.time() - starttime
+        logging.info("Results of profiling")
+        title = "| {0:<23} {1:<16} {6:<4} {2:<5} {3:<16} {6:<4} {2:<5} {4:<17} {6:<4} {2:<5} {5:<17} {6:<4} {2:<5} |".format("analytic/preproc", 'total (sec)', '%', 'extract (sec)', 'process (sec)', 'results (sec)', 'freq')
+        logging.info('-'*len(title))
+        logging.info(title)
+        for k, val in p.ranked():
+            if 'total' in val:
+                if 'process' in val and 'results' in val:
+                    logging.info("| {0:<23} {1:<16} {2:<4} {3:05.2f} {4:<16} {5:<4} {10:05.2f} {6:<17} {7:<4} {11:05.2f} {8:<17} {9:<4} {12:05.2f} |".format(k, val['total'].sum(), val['total']._count, (val['total'].sum() / del_t)*100, val['extract'].sum(), val['extract']._count, val['process'].sum(), val['process']._count, val['results'].sum(), val['results']._count, (val['extract'].sum()/del_t)*100, (val['process'].sum()/del_t)*100, (val['results'].sum()/del_t)*100))
+                else:
+                    toprint = "| {0:<23} {1:<16} {2:<4} {3:05.2f}".format(k, val['total'].sum(), val['total']._count, (val['total'].sum() / del_t)*100)
+                    toprint = toprint + (" "*(len(title) - len(toprint)-1)) + '|'
+                    logging.info(toprint)
+            else:
+                toprint = "| {0:<23} {1:<17}".format(k, val)
+                toprint = toprint + (" "*(len(title) - len(toprint)-1)) + '|'
+                logging.info(toprint)
+        logging.info('-'*len(title))
 
 def main():
     """