From 0d6083ecd8eef26488cefd6ca768c47c68e5c56b Mon Sep 17 00:00:00 2001
From: Mark Bell <mbell697@gmail.com>
Date: Sat, 5 Mar 2016 12:59:34 -0500
Subject: [PATCH] Expose maxDataPoints to storage finders

---
 docs/finders.rst               | 53 ++++++++++++++++++++++++++++++++++
 graphite_api/app.py            |  1 +
 graphite_api/node.py           |  8 ++++++--
 graphite_api/render/datalib.py | 15 +++++++++++++--
 tests/test_finders.py          | 33 +++++++++++++++++++--
 5 files changed, 104 insertions(+), 6 deletions(-)

diff --git a/docs/finders.rst b/docs/finders.rst
index d1229ac..b6c5614 100644
--- a/docs/finders.rst
+++ b/docs/finders.rst
@@ -118,6 +118,59 @@ in three steps:
    ``time_info`` is the same structure as the one returned by ``fetch()``.
    ``series`` is a dictionnary with paths as keys and datapoints as values.
 
+Computing aggregations in the storage system
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Many applications built on graphite-api use the ``maxDataPoints`` option to
+limit the number of values in each resulting series. Normally the
+aggregation that reduces the number of data points is handled by the
+``consolidateBy`` function in graphite-api. However, some storage systems
+can compute the needed aggregations internally, reducing I/O and often
+improving performance.
+
+To take advantage of this, graphite-api can pass the ``maxDataPoints``
+parameter through to the storage layer.
+
+To support this feature in your custom finder/reader, do the following:
+
+* Add the ``__aggregating__`` attribute to your reader class and accept a
+  third ``max_data_points`` parameter in ``fetch``::
+
+    from graphite_api.intervals import IntervalSet, Interval
+
+    class CustomReader(object):
+        __aggregating__ = True
+        __slots__ = ('path',)  # recommended to save memory on readers
+
+        def __init__(self, path):
+            self.path = path
+
+        def fetch(self, start_time, end_time, max_data_points):
+            # Fetch the data, rolled up to max_data_points values by
+            # whatever means your storage system allows.
+            time_info = _from_, _to_, _step_
+            return time_info, series
+
+        def get_intervals(self):
+            return IntervalSet([Interval(start, end)])
+
+  ``max_data_points`` will be either ``None`` or an integer.
+
+* If your finder supports ``__fetch_multi__``, also add the ``__aggregating__``
+  attribute to your finder class and accept a ``max_data_points`` parameter
+  in ``fetch_multi``::
+
+    class CustomFinder(object):
+        __fetch_multi__ = 'custom'
+        __aggregating__ = True
+
+        def fetch_multi(self, nodes, start_time, end_time, max_data_points):
+            paths = [node.path for node in nodes]
+            # Fetch the paths, rolling each series up to max_data_points.
+            return time_info, series
+
+  ``max_data_points`` will be either ``None`` or an integer.
+
 Installing custom finders
 ^^^^^^^^^^^^^^^^^^^^^^^^^
 
diff --git a/graphite_api/app.py b/graphite_api/app.py
index 0dc5bd3..a35c635 100644
--- a/graphite_api/app.py
+++ b/graphite_api/app.py
@@ -361,6 +361,7 @@ def render():
         'startTime': request_options['startTime'],
         'endTime': request_options['endTime'],
         'tzinfo': request_options['tzinfo'],
+        'maxDataPoints': request_options.get('maxDataPoints'),
         'data': [],
     }
 
diff --git a/graphite_api/node.py b/graphite_api/node.py
index 1872c05..5c15e14 100644
--- a/graphite_api/node.py
+++ b/graphite_api/node.py
@@ -24,8 +24,12 @@ def __init__(self, path, reader):
         self.intervals = reader.get_intervals()
         self.is_leaf = True
 
-    def fetch(self, startTime, endTime):
-        return self.reader.fetch(startTime, endTime)
+    def fetch(self, startTime, endTime, maxDataPoints=None):
+        if hasattr(self.reader, '__aggregating__'):
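+            # The reader aggregates natively, so pass the point budget down.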
+            return self.reader.fetch(startTime, endTime, maxDataPoints)
+        else:
+            return self.reader.fetch(startTime, endTime)
 
     def __repr__(self):
         return '<LeafNode[%x]: %s (%s)>' % (id(self), self.path, self.reader)

diff --git a/graphite_api/render/datalib.py b/graphite_api/render/datalib.py
index 890d406..df163cd 100644
--- a/graphite_api/render/datalib.py
+++ b/graphite_api/render/datalib.py
@@ -127,6 +127,7 @@ def fetchData(requestContext, pathExprs):
     from ..app import app
     startTime = int(epoch(requestContext['startTime']))
     endTime = int(epoch(requestContext['endTime']))
+    maxDataPoints = requestContext.get('maxDataPoints')
 
     # Convert to list if given single path
     if not isinstance(pathExprs, list):
@@ -157,14 +158,24 @@ def fetchData(requestContext, pathExprs):
         nodes = multi_nodes[finder.__fetch_multi__]
         if not nodes:
             continue
-        time_info, series = finder.fetch_multi(nodes, startTime, endTime)
+
+        if hasattr(finder, '__aggregating__'):
+            time_info, series = finder.fetch_multi(nodes,
+                                                   startTime,
+                                                   endTime,
+                                                   maxDataPoints)
+        else:
+            time_info, series = finder.fetch_multi(nodes, startTime, endTime)
+
         for path, values in series.items():
             data_store.add_data(path, time_info, values,
                                 path_to_exprs[path])
 
     # Single fetches
     fetches = [
-        (node, node.fetch(startTime, endTime)) for node in single_nodes]
+        (node, node.fetch(startTime, endTime, maxDataPoints))
+        for node in single_nodes
+    ]
     for node, results in fetches:
         if not results:
             logger.info("no results", node=node, start=startTime,

diff --git a/tests/test_finders.py b/tests/test_finders.py
index 6956126..e6e8f98 100644
--- a/tests/test_finders.py
+++ b/tests/test_finders.py
@@ -10,7 +10,7 @@ class FinderTest(TestCase):
 
     def test_custom_finder(self):
-        store = Store([DummyFinder()])
+        store = Store([DummyFinder(DummyReader)])
         nodes = list(store.find("foo"))
         self.assertEqual(len(nodes), 1)
         self.assertEqual(nodes[0].path, 'foo')
 
@@ -24,6 +24,32 @@ def test_custom_finder(self):
         self.assertEqual(time_info, (100, 200, 10))
         self.assertEqual(len(series), 10)
 
+    def test_aggregating_reader(self):
+        store = Store([DummyFinder(DummyAggregatingReader)])
+        nodes = list(store.find('bar.*'))
+        node = nodes[0]
+
+        time_info, series = node.fetch(100, 200, 12)
+        self.assertEqual(time_info, (100, 200, 10))
+        self.assertEqual(len(series), 12)
+
+
+class DummyAggregatingReader(object):
+    __aggregating__ = True
+
+    __slots__ = ('path',)
+
+    def __init__(self, path):
+        self.path = path
+
+    def fetch(self, start_time, end_time, max_data_points):
+        return (start_time, end_time, 10), [
+            random.choice([None, 1, 2, 3]) for i in range(max_data_points)
+        ]
+
+    def get_intervals(self):
+        return IntervalSet([Interval(time.time() - 3600, time.time())])
+
 
 class DummyReader(object):
     __slots__ = ('path',)
@@ -42,6 +68,9 @@ def get_intervals(self):
 
 
 class DummyFinder(object):
+    def __init__(self, ReaderClass):
+        self.ReaderClass = ReaderClass
+
     def find_nodes(self, query):
         if query.pattern == 'foo':
             yield BranchNode('foo')
@@ -49,4 +78,4 @@ def find_nodes(self, query):
         elif query.pattern == 'bar.*':
             for i in range(10):
                 path = 'bar.{0}'.format(i)
-                yield LeafNode(path, DummyReader(path))
+                yield LeafNode(path, self.ReaderClass(path))
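
For a concrete picture of how a reader might honour the new parameter, here is
a minimal, self-contained sketch. It is illustrative only and not part of the
patch above: ``DownsamplingReader`` and the ``load_points`` stub are
hypothetical stand-ins for a real storage backend::

    from graphite_api.intervals import Interval, IntervalSet

    def load_points(path, start_time, end_time):
        # Stand-in for a real storage call: one synthetic point per second.
        return [float(i % 60) for i in range(end_time - start_time)]

    class DownsamplingReader(object):
        __aggregating__ = True
        __slots__ = ('path',)

        def __init__(self, path):
            self.path = path

        def fetch(self, start_time, end_time, max_data_points):
            values = load_points(self.path, start_time, end_time)
            step = 1
            if max_data_points and len(values) > max_data_points:
                # Average fixed-size buckets so that no more than
                # max_data_points values are returned.
                size = -(-len(values) // max_data_points)  # ceiling division
                values = [sum(values[i:i + size]) / len(values[i:i + size])
                          for i in range(0, len(values), size)]
                step = size
            return (start_time, end_time, step), values

        def get_intervals(self):
            return IntervalSet([Interval(0, 2 ** 31 - 1)])

Because the class carries the ``__aggregating__`` marker, the patched
``LeafNode.fetch`` passes ``maxDataPoints`` straight through to it; a reader
without the marker keeps receiving the original two-argument call.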