Individual analytic/preproc profiling #49

Open · wants to merge 1 commit into base: master
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -15,6 +15,7 @@ include supremm/outputter.py
include supremm/pcparchive.py
include supremm/plugin.py
include supremm/processhelpers.py
include supremm/profile.py
include supremm/scripthelpers.py
include supremm/statistics.py
include supremm/subsample.py
48 changes: 48 additions & 0 deletions supremm/profile.py
@@ -0,0 +1,48 @@
from supremm.statistics import RollingStats

class Profile(object):
    """
    Profile keeps a running total of how much time each analytic and
    preprocessor uses overall, broken down by category, and tabulates
    the results
    """

    def __init__(self):
        self.times = dict()

    def merge(self, other):
        """Adds the timing data from another instance of Profile to this one"""
        for name, categories in other.times.items():
            if name not in self.times:
                self.times[name] = categories
                continue
            for category, stats in categories.items():
                if category not in self.times[name]:
                    # Nothing to combine with on this side; adopt the other
                    # side's statistics directly
                    self.times[name][category] = stats
                else:
                    self.times[name][category] += stats

    def add(self, analytic, category, val):
        """Record a single timing sample for the given analytic and category"""
        categories = self.times.setdefault(analytic, dict())
        if category not in categories:
            categories[category] = RollingStats()
        categories[category].append(val)

    def calc_derived(self):
        """Compute the derived 'extract' and 'total' categories"""
        for name in self.times:
            if 'process' in self.times[name]:
                # Extraction time is the per-archive wall time minus the time
                # spent inside the plugin's process() calls
                self.times[name]['extract'] = self.times[name]['process+extract'] - self.times[name]['process']
                self.times[name]['total'] = self.times[name]['process'] + self.times[name]['extract'] + self.times[name]['results']
            else:
                self.times[name]['total'] = self.times[name]['process+extract']

    def ranked(self):
        """Yield (name, timings) tuples ordered by total time, largest first"""
        self.calc_derived()
        total_times = dict()
        for name in self.times:
            total_times[name] = self.times[name]['total'].sum()

        for name in sorted(total_times, key=total_times.get, reverse=True):
            yield (name, self.times[name])
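
For reviewers, a minimal usage sketch of the Profile class (not part of this PR; the analytic names and timings are made up, and Python 2 print syntax is used to match the codebase):

from supremm.profile import Profile

# One Profile per worker process, merged afterwards
worker_a = Profile()
worker_a.add('cpuusage', 'process+extract', 2.0)
worker_a.add('memused', 'process+extract', 0.4)

worker_b = Profile()
worker_b.add('cpuusage', 'process+extract', 1.5)

combined = Profile()
combined.merge(worker_a)
combined.merge(worker_b)

# ranked() yields the most expensive analytics first: 'cpuusage' with a
# total of 3.5s across both workers, then 'memused' with 0.4s
for name, categories in combined.ranked():
    print name, categories['total'].get()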
64 changes: 64 additions & 0 deletions supremm/statistics.py
@@ -39,6 +39,12 @@ class RollingStats(object):

    def __init__(self):
        self._count = 0
        # Initialise all fields so that the arithmetic operators below work
        # even on instances that have not had any samples appended yet
        self.m = 0
        self.last_m = 0
        self.min = 0
        self.max = 0
        self.s = 0
        self.last_s = 0

    def append(self, x):
        self._count += 1
@@ -59,6 +65,64 @@ def append(self, x):
        self.min = numpy.minimum(self.min, x)
        self.max = numpy.maximum(self.max, x)

    def __add__(self, other):
        """
        RollingStats of the union of the data in self and other.
        Assumes the two samples are disjoint and both non-empty.
        """
        ret = RollingStats()
        ret._count = self._count + other._count
        n1 = float(self._count)
        n2 = float(other._count)
        m1 = self.m
        m2 = other.m
        ret.m = ((m1 * n1) + (m2 * n2)) / ret._count
        ret.last_m = ret.m
        # Pairwise combination of the sums of squared deviations, per
        # http://cas.ee.ic.ac.uk/people/dt10/research/thomas-08-sample-mean-and-variance.pdf (page 4):
        # s = s1 + s2 + n1*n2/(n1+n2) * (m1 - m2)^2, exact for disjoint samples.
        # (Floats throughout to avoid Python 2 integer division.)
        ret.s = self.s + other.s + ((n1 * n2) / (n1 + n2)) * ((m1 - m2) ** 2)
        ret.last_s = ret.s
        ret.min = numpy.minimum(self.min, other.min)
        ret.max = numpy.maximum(self.max, other.max)
        return ret

    def __iadd__(self, other):
        return self + other

    def __sub__(self, other):
        """
        RollingStats of the data in self that is not in other.
        Assumes the samples in other are a subset of the samples in self.
        """
        ret = RollingStats()
        # Keep self's count so that ret.sum() equals self.sum() - other.sum(),
        # which is what Profile.calc_derived() relies on; the per-sample mean
        # and variance are therefore only estimates.
        ret._count = self._count
        n1 = float(self._count)
        n2 = float(other._count)
        m1 = self.m
        m2 = other.m
        ret.m = ((m1 * n1) - (m2 * n2)) / ret._count
        ret.last_m = ret.m
        # Rough estimate based on the pairwise formula from
        # http://cas.ee.ic.ac.uk/people/dt10/research/thomas-08-sample-mean-and-variance.pdf (page 4)
        ret.s = self.s + other.s + ((n1 * n2) / (n1 + n2)) * ((m1 - m2) ** 2)
        ret.last_s = ret.s
        # The removed samples may have contained the extreme values, in which
        # case the true min/max of the remaining data is unknown
        ret.min = self.min if self.min < other.min else None
        ret.max = self.max if self.max > other.max else None
        return ret

    def sum(self):
        """ return the total of all appended values """
        return self._count * self.mean()

    def get(self):
        """ return a dict with the various statistics """
        return {'avg': self.mean(), 'min': self.min, 'max': self.max, 'cnt': self._count, 'std': math.sqrt(self.variance())}
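
As a sanity check, the union operator can be compared against numpy on two small disjoint samples (a sketch, not part of the PR; it assumes supremm is importable and that append() maintains s as the running sum of squared deviations, as the existing Welford-style update does):

import numpy
from supremm.statistics import RollingStats

a = RollingStats()
for x in [1.0, 2.0, 3.0]:
    a.append(x)

b = RollingStats()
for x in [10.0, 20.0]:
    b.append(x)

c = a + b
both = numpy.array([1.0, 2.0, 3.0, 10.0, 20.0])

# Count, mean, sum and extrema of the union are exact
assert c._count == 5
assert abs(c.mean() - both.mean()) < 1e-9
assert abs(c.sum() - both.sum()) < 1e-9
assert c.min == 1.0 and c.max == 20.0
# s holds the combined sum of squared deviations
assert abs(c.s - ((both - both.mean()) ** 2).sum()) < 1e-9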
86 changes: 76 additions & 10 deletions supremm/summarize.py
@@ -9,6 +9,7 @@
import logging
import traceback
from supremm.plugin import NodeMetadata
from supremm.profile import Profile

import numpy
import copy
@@ -43,9 +44,14 @@ def __init__(self, preprocessors, analytics, job):
        self.job = job
        self.start = time.time()
        self.archives_processed = 0
        self._profile = False
        self.profile_dict = None

        self.indomcache = None

    def activate_profile(self):
        """ Enable recording of per-analytic/preprocessor timing data """
        self._profile = True
        self.profile_dict = Profile()

    def adderror(self, category, errormsg):
        """ All errors reported with this function show up in the job summary """
        if category not in self.errors:
@@ -92,13 +98,32 @@ def get(self):
        for analytic in self.alltimestamps:
            if analytic.status != "uninitialized":
                if analytic.mode == "all":
                    if self._profile:
                        starttime = time.time()
                        output[analytic.name] = analytic.results()
                        delta_t = time.time() - starttime
                        self.profile_dict.add(analytic.name, 'results', delta_t)
                    else:
                        output[analytic.name] = analytic.results()
                if analytic.mode == "timeseries":
                    if self._profile:
                        starttime = time.time()
                        timeseries[analytic.name] = analytic.results()
                        delta_t = time.time() - starttime
                        self.profile_dict.add(analytic.name, 'results', delta_t)
                    else:
                        timeseries[analytic.name] = analytic.results()
        for analytic in self.firstlast:
            if analytic.status != "uninitialized":
                if self._profile:
                    starttime = time.time()
                    output[analytic.name] = analytic.results()
                    delta_t = time.time() - starttime
                    self.profile_dict.add(analytic.name, 'results', delta_t)
                else:
                    output[analytic.name] = analytic.results()

        output['summarization'] = {
            "version": VERSION,
            "elapsed": time.time() - self.start,
@@ -114,7 +139,15 @@ def get(self):
        output['timeseries'] = timeseries

        for preproc in self.preprocs:
            if self._profile:
                starttime = time.time()
                result = preproc.results()
                delta_t = time.time() - starttime
                self.profile_dict.add(preproc.name, 'results', delta_t)
            else:
                result = preproc.results()

            if result is not None:
                output.update(result)

@@ -248,7 +281,13 @@ def runcallback(self, analytic, result, mtypes, ctx, mdata, metric_id_array):
            description.append([tmpidx, tmpnames])

        try:
            if self._profile:
                starttime = time.time()
                retval = analytic.process(mdata, float(result.contents.timestamp), data, description)
                delta_t = time.time() - starttime
                self.profile_dict.add(analytic.name, 'process', delta_t)
            else:
                retval = analytic.process(mdata, float(result.contents.timestamp), data, description)
            return retval
        except Exception as e:
            logging.exception("%s %s @ %s", self.job.job_id, analytic.name, float(result.contents.timestamp))
@@ -267,7 +306,14 @@ def runpreproccall(self, preproc, result, mtypes, ctx, mdata, metric_id_array):
            data.append(numpy.array([pcpfast.pcpfastExtractValues(result, i, j, mtypes[i])
                                     for j in xrange(result.contents.get_numval(i))]))

        if self._profile:
            starttime = time.time()
            ret = preproc.process(float(result.contents.timestamp), data, description)
            delta_t = time.time() - starttime
            self.profile_dict.add(preproc.name, 'process', delta_t)
        else:
            ret = preproc.process(float(result.contents.timestamp), data, description)
        return ret

    @staticmethod
    def getindomdict(ctx, metric_id_array):
@@ -445,7 +491,14 @@ def processarchive(self, nodename, nodeidx, archive):
            newctx = context.pmDupContext()
            context._ctx = newctx

            # Optionally keep track of how long each preprocessor/analytic takes
            if self._profile:
                starttime = time.time()
                self.processforpreproc(context, mdata, preproc)
                delta_t = time.time() - starttime
                self.profile_dict.add(preproc.name, 'process+extract', delta_t)
            else:
                self.processforpreproc(context, mdata, preproc)

            context.__del__()

@@ -454,7 +507,13 @@ def processarchive(self, nodename, nodeidx, archive):
            newctx = context.pmDupContext()
            context._ctx = newctx

            if self._profile:
                starttime = time.time()
                self.processforanalytic(context, mdata, analytic)
                delta_t = time.time() - starttime
                self.profile_dict.add(analytic.name, 'process+extract', delta_t)
            else:
                self.processforanalytic(context, mdata, analytic)

            context.__del__()

@@ -463,7 +522,14 @@ def processarchive(self, nodename, nodeidx, archive):
            newctx = context.pmDupContext()
            context._ctx = newctx

            if self._profile:
                starttime = time.time()
                self.processfirstlast(context, mdata, analytic)
                delta_t = time.time() - starttime
                self.profile_dict.add(analytic.name, 'process+extract', delta_t)
            else:
                self.processfirstlast(context, mdata, analytic)

            context.__del__()
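
A possible follow-up (a sketch only, not part of this PR): the nine nearly identical if/else timing blocks in summarize.py share one shape and could be collapsed into a small helper, for example:

import time

def timed_call(profile_dict, name, category, enabled, func, *args):
    """ Run func(*args), recording its wall-clock time under (name, category)
        when profiling is enabled; the time is recorded even if func raises """
    if not enabled:
        return func(*args)
    starttime = time.time()
    try:
        return func(*args)
    finally:
        profile_dict.add(name, category, time.time() - starttime)

Each call site would then reduce to a single line, e.g. retval = timed_call(self.profile_dict, analytic.name, 'process', self._profile, analytic.process, mdata, float(result.contents.timestamp), data, description).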