From 4995f5f09c10fee4909ca712b7f7d7359b41544d Mon Sep 17 00:00:00 2001 From: Jay Marcyes Date: Sun, 3 Feb 2019 15:58:24 -0700 Subject: [PATCH] first stab at issue #1. I've got Miss in but I want to use it for something before I consider it officially done and I update the readme and clean up the code, so Miss support is basically alpha right now --- README.md | 16 +-- mister.py | 326 +++++++++++++++++++++++++++++++++++++++---------- mister_test.py | 41 ++++++- 3 files changed, 308 insertions(+), 75 deletions(-) diff --git a/README.md b/README.md index cbea6b0..036c068 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,7 @@ Mister attempts to make running a map/reduce job approachable. When you've got data that isn't really big and so you're not quite ready to distribute the data across a gazillian machines and stuff but would still like an answer in a reasonable amount of time. + ## 5 minute getting started Mister needs you to define three methods: `prepare` (get the data ready to be run across multiple processes), `map` (actually do something with the chunks of data from `prepare`), and `reduce` (mash all the values returned from `map` together). @@ -37,15 +38,16 @@ reduce(self, output, value) The `output` is the global aggregation of all the `value` arguments the `reduce` method has seen. Basically, whatever you return from one `reduce` call will be passed back into the next `reduce` call as `output`. The `value` argument is whatever the recently finished `map` call returned. + ### Bringing it all together So let's bring it all together in our `MrHelloWorld` job, first let's get the skeleton in place: ```python -from mister import BaseMister +from mister import Mister -class MrHelloWorld(BaseMister): +class MrHelloWorld(Mister): def prepare(self, count, *args, **kwargs): pass def map(self, *args, **kwargs): pass def reduce(self, output, value): pass @@ -113,13 +115,13 @@ I think word counting is the traditional map/reduce example? So here it is: ```python import os import re -improt math +import math from collections import Counter -from mister import BaseMister +from mister import Mister -class MrWordCount(BaseMister): +class MrWordCount(Mister): def prepare(self, count, path): """prepare segments the data for the map() method""" size = os.path.getsize(path) @@ -151,7 +153,7 @@ class MrWordCount(BaseMister): output = Counter() output.update(count) return output - + # let's count the bible path = "./testdata/bible-kjv.txt" mr = MrWordCount(path) @@ -185,5 +187,5 @@ To install, use Pip: Or, to grab the latest and greatest: - $ pip install --upgrade git+https://github.com/Jaymon/mister#egg=mister + $ pip install --upgrade "git+https://github.com/Jaymon/mister#egg=mister" diff --git a/mister.py b/mister.py index 1b0d562..719c0b0 100644 --- a/mister.py +++ b/mister.py @@ -9,10 +9,78 @@ from multiprocessing import queues -__version__ = "0.0.2" +__version__ = "0.0.3" logger = logging.getLogger(__name__) +if not logger.handlers: + logger.addHandler(logging.NullHandler()) + + +class Process(multiprocessing.Process): + def log_start(self): + name = self.name + start = time.time() + logger.debug("{} Starting".format(name)) + return start + + def log_stop(self, start): + name = self.name + stop = time.time() + elapsed = round(abs(stop - start) * 1000.0, 1) + total = "{:.1f} ms".format(elapsed) + logger.debug("{} finished in {}".format(name, total)) + + +class Queue(object): + + timeout = 5.0 + + empty_count = 1 + + queue_class = multiprocessing.Queue # this is actually a function + + def __init__(self): + self.queue = self.queue_class() + + def enqueue(self, value): + enqueued = False + enqueue_count = 1 + while not enqueued: + try: + # queue size taps out at 32767, booooo + # http://stackoverflow.com/questions/5900985/multiprocessing-queue-maxsize-limit-is-32767 + #queue.put_nowait(val) + self.queue.put(value, True, self.timeout) + enqueued = True + if enqueue_count > 1: + logger.debug("Enqueued after {} tries".format(enqueue_count)) + + except queues.Full as e: + logger.debug("Queue full {}".format(enqueue_count)) + enqueue_count += 1 + #logger.exception(e) + #queue.close() + # If we ever hit a full queue you lose a ton of data but if you + # don't call this method then the process just hangs + #reduce_queue.cancel_join_thread() + + def dequeue(self): + count = 0 + while True: + try: + return self.queue.get(True, self.timeout) + + except queues.Empty as e: + count += 1 + if count >= self.empty_count: + raise BufferError() + + def task_done(self): + return self.queue.task_done() + + def empty(self): + return self.queue.empty() class Count(int): @@ -67,26 +135,107 @@ def bounds(self, n, *args, **kwargs): yield args, kw -class BaseMister(object): +class MisterMap(multiprocessing.Process): + """This is a package internal class that handles the actual threading of the + map method + + https://docs.python.org/3/library/multiprocessing.html + """ + def __init__(self, target, name, queue, args, kwargs): + """ + :param target: the map callback + :param name: the name assigned to this process + :param queue: multiprocessing.JoinableQueue, the queue used for interprocess + communication + :param args: the *args that will be passed to target + :param kwargs: the **kwargs that will be passed to target + """ + + def wrapper_target(target, queue, args, kwargs): + + is_logged = logger.isEnabledFor(logging.DEBUG) + + if is_logged: + logger.debug("{} Starting".format(name)) + start = time.time() + + val = target(*args, **kwargs) + if val is not None: + try: + # queue size taps out at 32767, booooo + # http://stackoverflow.com/questions/5900985/multiprocessing-queue-maxsize-limit-is-32767 + #queue.put_nowait(val) + queue.put(val, True, 1.0) + + except queues.Full as e: + logger.exception(e) + #queue.close() + # If we ever hit a full queue you lose a ton of data but if you + # don't call this method then the process just hangs + queue.cancel_join_thread() + + if is_logged: + stop = time.time() + elapsed = round(abs(stop - start) * 1000.0, 1) + total = "{:.1f} ms".format(elapsed) + logger.debug("{} finished in {}".format(name, total)) + + super(MisterMap, self).__init__(target=wrapper_target, name=name, kwargs={ + "target": target, + "queue": queue, + "args": args, + "kwargs": kwargs + }) + + def log_start(self, name): + is_logged = logger.isEnabledFor(logging.DEBUG) + start = time.time() + + if is_logged: + logger.debug("{} Starting".format(name)) + + return start, is_logged + + +class Mister(object): """If you want to subclass this is the class to use, anything you pass into __init__ will be passed to your child's prepare() method https://en.wikipedia.org/wiki/MapReduce """ + map_class = MisterMap + def __init__(self, *args, **kwargs): """create an instance :param *args: passed to prepare() :param **kwargs: passed to prepare() + :kwargs target_prepare: callback, see the .prepare method + :kwargs target_map: callback, see the .map method + :kwargs target_reduce: callback, see the .reduce method + :kwargs count: int, how many processes you want """ - if not getattr(self, "count", 0): + target_prepare = kwargs.pop("prepare", kwargs.pop("target_prepare", None)) + if target_prepare: + self.prepare = target_prepare + + target_map = kwargs.pop("map", kwargs.pop("target_map", None)) + if target_map: + self.map = target_map + + target_reduce = kwargs.pop("reduce", kwargs.pop("target_reduce", None)) + if target_reduce: + self.reduce = target_reduce + + count = kwargs.pop("count", 0) + if not count: count = multiprocessing.cpu_count() # we subtract one for the main process count = count - 1 if count > 1 else 1 - self.count = count + self.count = count - self.args = () if not args else args - self.kwargs = {} if not kwargs else kwargs + self.args = args + self.kwargs = kwargs def prepare(self, count, *args, **kwargs): """Handle chunking the data for the map() method @@ -135,11 +284,11 @@ def run(self): ident = 1 count = Count(self.count) for args, kwargs in self.prepare(count, *self.args, **self.kwargs): - name = "map-{}".format(ident) + name = "mister-map-{}".format(ident) logger.debug("{} = {}/{}".format(name, ident, count)) - t = Map( + t = self.map_class( target=self.map, name=name, queue=queue, @@ -170,79 +319,128 @@ def run(self): return output -class Mister(BaseMister): - """Similar to BaseMister but allows you to pass in prepare, map, and reduce as - callbacks and also set the process count via __init__ - """ - def __init__(self, target_prepare, target_map, target_reduce, count=0, args=None, kwargs=None): - """create an instance - - :param target_prepare: callback, see the .prepare method - :param target_map: callback, see the .map method - :param target_reduce: callback, see the .reduce method - :param count: int, how many processes you want - :param args: tuple, passed into target_prepare as *args - :param kwargs: dict, passed into target_prepare as **kwargs - """ - self.count = count - super(Mister, self).__init__(*args, **kwargs) - if target_prepare: - self.prepare = target_prepare - if target_map: - self.map = target_map - if target_reduce: - self.reduce = target_reduce - - -class Map(multiprocessing.Process): +class MissMap(Process): """This is a package internal class that handles the actual threading of the map method https://docs.python.org/3/library/multiprocessing.html """ - def __init__(self, target, name, queue, args, kwargs): + queue_timeout_count = 1 + + def __init__(self, target, name, map_queue, reduce_queue): """ :param target: the map callback :param name: the name assigned to this process - :param queue: multiprocessing.JoinableQueue, the queue used for interprocess - communication - :param args: the *args that will be passed to target - :param kwargs: the **kwargs that will be passed to target + :param map_queue: multiprocessing.JoinableQueue, the values yielded from the + prepare() callback will end up here and will be dequeued and sent to + the map() callback + :param reduce_queue: multiprocessing.JoinableQueue, whatever the map() callback + returns will be placed in this queue and passed to the reduce() callback """ + super(MissMap, self).__init__(target=self.target, name=name, kwargs={ + "target": target, + "map_queue": map_queue, + "reduce_queue": reduce_queue, + }) - def wrapper_target(target, queue, args, kwargs): + def target(self, target, map_queue, reduce_queue): + start = self.log_start() - is_logged = logger.isEnabledFor(logging.DEBUG) + while True: + try: + map_val = map_queue.dequeue() + reduce_val = target(map_val) - if is_logged: - logger.debug("{} Starting".format(name)) - start = time.time() + if reduce_val is not None: + reduce_queue.enqueue(reduce_val) - val = target(*args, **kwargs) - if val is not None: - try: - # queue size taps out at 32767, booooo - # http://stackoverflow.com/questions/5900985/multiprocessing-queue-maxsize-limit-is-32767 - #queue.put_nowait(val) - queue.put(val, True, 1.0) + except BufferError: + break - except queues.Full as e: - logger.exception(e) - #queue.close() - # If we ever hit a full queue you lose a ton of data but if you - # don't call this method then the process just hangs - queue.cancel_join_thread() + self.log_stop(start) - if is_logged: - stop = time.time() - elapsed = round(abs(stop - start) * 1000.0, 1) - total = "{:.1f} ms".format(elapsed) - logger.debug("{} finished in {}".format(name, total)) - super(Map, self).__init__(target=wrapper_target, name=name, kwargs={ +class MissPrepare(Process): + def __init__(self, target, name, count, map_queue, args, kwargs): + super(MissPrepare, self).__init__(target=self.target, name=name, kwargs={ "target": target, - "queue": queue, + "count": count, + "map_queue": map_queue, "args": args, - "kwargs": kwargs + "kwargs": kwargs, }) + def target(self, target, count, map_queue, args, kwargs): + start = self.log_start() + + # now we populate the queue all our map processes are going to read from + for map_val in target(count, *args, **kwargs): + map_queue.enqueue(map_val) + + self.log_stop(start) + + +class Miss(Mister): + + map_class = MissMap + + prepare_class = MissPrepare + + queue_class = Queue + + def run(self): + """run the map/reduce job, this is where all the magic happens + + :returns: mixed, the final output returned from the final call to reduce() + """ + ret = None + map_queue = self.queue_class() + reduce_queue = self.queue_class() + + processes = [] + map_count = Count(self.count - 1) + prepare_count = Count(1) + + # first we start all our mapping processes + for ident in range(1, map_count + 1): + name = "miss-map-{}".format(ident) + logger.debug("{} = {}/{}".format(name, ident, map_count)) + + t = self.map_class( + target=self.map, + name=name, + map_queue=map_queue, + reduce_queue=reduce_queue, + ) + t.start() + processes.append(t) + + for ident in range(1, prepare_count + 1): + name = "miss-prepare-{}".format(ident) + t = self.prepare_class( + target=self.prepare, + name=name, + count=map_count, + map_queue=map_queue, + args=self.args, + kwargs=self.kwargs, + ) + t.start() + + # now we can reduce everything if we need to + output = None + while processes or not reduce_queue.empty(): + try: + val = reduce_queue.dequeue() + ret = self.reduce(output, val) + if ret is not None: + output = ret + + except BufferError: + pass + + processes = [t for t in processes if t.is_alive()] + + return output + + diff --git a/mister_test.py b/mister_test.py index 3a8fa5a..8cde1ed 100644 --- a/mister_test.py +++ b/mister_test.py @@ -8,10 +8,13 @@ import testdata from testdata import TestCase -from mister import Mister, BaseMister +from mister import Mister, Miss -class MrHelloWorld(BaseMister): +testdata.basic_logging() + + +class MrHelloWorld(Mister): def prepare(self, count, name): # we're just going to return the number and the name we pass in for x in range(count): @@ -27,7 +30,7 @@ def reduce(self, output, value): return output -class MrWordCount(BaseMister): +class MrWordCount(Mister): def prepare(self, count, path): size = os.path.getsize(path) length = int(math.ceil(size / count)) @@ -57,7 +60,7 @@ def reduce(self, output, count): return output -class MrCount(BaseMister): +class MrCount(Mister): def prepare(self, count, numbers): chunk = int(math.ceil(len(numbers) / count)) # this splits the numbers into chunk size chunks @@ -109,3 +112,33 @@ def test_helloworld(self): output = mr.run() print(output) + +class MsCount(Miss): + def prepare(self, count, size): + for x in range(size): + yield x + + def map(self, n): + return 1 + + def reduce(self, output, incr): + #pout.v(output, incr) + ret = output + incr if output else incr + return ret + + def run(self): + self.queue_class.timeout = 0.1 + return super(MsCount, self).run() + + +class MsTest(TestCase): + def test_count(self): + size = 1000 + #size = 10 + #size = 100000 + ms = MsCount(size) + total = ms.run() + self.assertEqual(size, total) + + +