Expose maxDataPoints to storage finders #159

Open · wants to merge 1 commit into base: master

53 changes: 53 additions & 0 deletions docs/finders.rst
@@ -118,6 +118,59 @@ in three steps:
``time_info`` is the same structure as the one returned by ``fetch()``.
``series`` is a dictionary with paths as keys and datapoints as values.

Computing aggregations in the storage system
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Many applications that query graphite-api use the ``maxDataPoints`` option
to limit the number of values in each resulting series. Normally, the
aggregation of series down to that number of data points is handled via
the ``consolidateBy`` function in graphite-api. However, some storage systems
may be able to compute the needed aggregations internally, reducing IO and
often improving performance.
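
For illustration, the work involved is roughly the following, whether it
happens in graphite-api or inside the storage backend. This is only a
sketch of bucket averaging, not the actual ``consolidateBy``
implementation::

    def consolidate(values, max_points):
        """Reduce ``values`` to at most ``max_points`` values by averaging buckets."""
        if max_points is None or len(values) <= max_points:
            return values
        bucket_size = -(-len(values) // max_points)  # ceiling division
        consolidated = []
        for i in range(0, len(values), bucket_size):
            # skip None entries (missing data) when averaging a bucket
            bucket = [v for v in values[i:i + bucket_size] if v is not None]
            consolidated.append(
                sum(bucket) / float(len(bucket)) if bucket else None)
        return consolidated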

To make use of this feature, graphite-api can pass the ``maxDataPoints``
parameter through to the storage layer.

To support this feature in your custom Finder/Reader, do the following (a
combined sketch of both pieces follows this list):

* Add the ``__aggregating__`` attribute to your reader class and accept a
third ``max_data_points`` parameter to ``fetch`` ::

    from graphite_api.intervals import IntervalSet, Interval

    class CustomReader(object):
        __aggregating__ = True
        __slots__ = ('path',)  # __slots__ is recommended to save memory on readers

        def __init__(self, path):
            self.path = path

        def fetch(self, start_time, end_time, max_data_points):
            # fetch data and roll it up to max_data_points by whatever means
            # your storage system allows
            time_info = _from_, _to_, _step_
            return time_info, series

        def get_intervals(self):
            return IntervalSet([Interval(start, end)])

  ``max_data_points`` will be either ``None`` or an integer.

* If your finder supports ``__fetch_multi__``, also add the ``__aggregating__``
  attribute to your finder class and accept a ``max_data_points`` parameter
  in ``fetch_multi``::

    class CustomFinder(object):
        __fetch_multi__ = 'custom'
        __aggregating__ = True

        def fetch_multi(self, nodes, start_time, end_time, max_data_points):
            paths = [node.path for node in nodes]
            # fetch paths, rolling each up to max_data_points
            return time_info, series

  ``max_data_points`` will be either ``None`` or an integer.
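
As a rough end-to-end sketch, a reader and finder pair for a storage
backend that can downsample server-side might translate ``max_data_points``
into a step size before querying. The ``client`` object and its
``read``/``read_many`` methods are hypothetical, a 10-second native
resolution is assumed, and ``find_nodes``/``get_intervals`` are omitted
(see the examples above)::

    class DownsamplingReader(object):
        __aggregating__ = True
        __slots__ = ('path', 'client')

        def __init__(self, path, client):
            self.path = path
            self.client = client  # hypothetical storage client

        def fetch(self, start_time, end_time, max_data_points):
            step = 10  # assumed native resolution, in seconds
            if max_data_points:
                # widen the step so at most max_data_points values come back
                step = max(step, (end_time - start_time) // max_data_points)
            values = self.client.read(self.path, start_time, end_time, step)
            return (start_time, end_time, step), values

    class DownsamplingFinder(object):
        __fetch_multi__ = 'downsampling'
        __aggregating__ = True

        def __init__(self, client):
            self.client = client  # same hypothetical client

        def fetch_multi(self, nodes, start_time, end_time, max_data_points):
            paths = [node.path for node in nodes]
            step = 10
            if max_data_points:
                step = max(step, (end_time - start_time) // max_data_points)
            # one round trip; every path comes back already rolled up to ``step``
            series = self.client.read_many(paths, start_time, end_time, step)
            return (start_time, end_time, step), series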

Installing custom finders
^^^^^^^^^^^^^^^^^^^^^^^^^

1 change: 1 addition & 0 deletions graphite_api/app.py
@@ -361,6 +361,7 @@ def render():
'startTime': request_options['startTime'],
'endTime': request_options['endTime'],
'tzinfo': request_options['tzinfo'],
'maxDataPoints': request_options.get('maxDataPoints'),
'data': [],
}

7 changes: 5 additions & 2 deletions graphite_api/node.py
@@ -24,8 +24,11 @@ def __init__(self, path, reader):
self.intervals = reader.get_intervals()
self.is_leaf = True

def fetch(self, startTime, endTime):
return self.reader.fetch(startTime, endTime)
def fetch(self, startTime, endTime, maxDataPoints=None):
if hasattr(self.reader, '__aggregating__'):
return self.reader.fetch(startTime, endTime, maxDataPoints)
else:
return self.reader.fetch(startTime, endTime)

Review comment (repository owner):

Can you use inspect.signature (or getargspec on Python 2) to "guess" the ability of .fetch to accept maxDataPoints instead of having to rely on a magic attribute?

Or do a try… except TypeError but it would be less elegant…
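
For reference, a sketch of what that suggestion could look like (an
illustration only, not part of this change), where the dispatch introspects
fetch instead of checking for the __aggregating__ marker:

    import inspect

    def fetch_accepts_max_data_points(reader):
        # Prefer inspect.signature (Python 3); fall back to getargspec on
        # Python 2, where inspect.signature does not exist.
        try:
            params = inspect.signature(reader.fetch).parameters
        except AttributeError:
            params = inspect.getargspec(reader.fetch).args
        return 'max_data_points' in params

LeafNode.fetch could then branch on this check instead of
hasattr(self.reader, '__aggregating__').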


def __repr__(self):
return '<LeafNode[%x]: %s (%s)>' % (id(self), self.path, self.reader)
15 changes: 13 additions & 2 deletions graphite_api/render/datalib.py
@@ -127,6 +127,7 @@ def fetchData(requestContext, pathExprs):
from ..app import app
startTime = int(epoch(requestContext['startTime']))
endTime = int(epoch(requestContext['endTime']))
maxDataPoints = requestContext.get('maxDataPoints')

# Convert to list if given single path
if not isinstance(pathExprs, list):
@@ -157,14 +158,24 @@ def fetchData(requestContext, pathExprs):
nodes = multi_nodes[finder.__fetch_multi__]
if not nodes:
continue
time_info, series = finder.fetch_multi(nodes, startTime, endTime)

if hasattr(finder, '__aggregating__'):
time_info, series = finder.fetch_multi(nodes,
startTime,
endTime,
maxDataPoints)
else:
time_info, series = finder.fetch_multi(nodes, startTime, endTime)

for path, values in series.items():
data_store.add_data(path, time_info, values,
path_to_exprs[path])

# Single fetches
fetches = [
(node, node.fetch(startTime, endTime)) for node in single_nodes]
(node, node.fetch(startTime, endTime, maxDataPoints))
for node in single_nodes
]
for node, results in fetches:
if not results:
logger.info("no results", node=node, start=startTime,
33 changes: 31 additions & 2 deletions tests/test_finders.py
@@ -10,7 +10,7 @@

class FinderTest(TestCase):
def test_custom_finder(self):
store = Store([DummyFinder()])
store = Store([DummyFinder(DummyReader)])
nodes = list(store.find("foo"))
self.assertEqual(len(nodes), 1)
self.assertEqual(nodes[0].path, 'foo')
@@ -24,6 +24,32 @@ def test_custom_finder(self):
self.assertEqual(time_info, (100, 200, 10))
self.assertEqual(len(series), 10)

def test_aggregating_reader(self):
store = Store([DummyFinder(DummyAggregatingReader)])
nodes = list(store.find('bar.*'))
node = nodes[0]

time_info, series = node.fetch(100, 200, 12)
self.assertEqual(time_info, (100, 200, 10))
self.assertEqual(len(series), 12)


class DummyAggregatingReader(object):
__aggregating__ = True

__slots__ = ('path',)

def __init__(self, path):
self.path = path

def fetch(self, start_time, end_time, max_data_points):
return (start_time, end_time, 10), [
random.choice([None, 1, 2, 3]) for i in range(max_data_points)
]

def get_intervals(self):
return IntervalSet([Interval(time.time() - 3600, time.time())])


class DummyReader(object):
__slots__ = ('path',)
@@ -42,11 +68,14 @@ def get_intervals(self):


class DummyFinder(object):
def __init__(self, ReaderClass):
self.ReaderClass = ReaderClass

def find_nodes(self, query):
if query.pattern == 'foo':
yield BranchNode('foo')

elif query.pattern == 'bar.*':
for i in range(10):
path = 'bar.{0}'.format(i)
yield LeafNode(path, DummyReader(path))
yield LeafNode(path, self.ReaderClass(path))