Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize get_movie Calls #35

Merged
merged 4 commits into from
Apr 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions app/controllers/movie_preference_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from app import db
from app.models import MoviePreference
from app.util.tmdb_helpers import get_movie

from app.util.concurrency import call_one_func_parallel

class MoviePreferenceController():
def create(self, user):
Expand Down Expand Up @@ -36,9 +36,10 @@ def update(self, user, movie_preference_id):
def get(self, user):
user_movie_preferences = user.movies

results = call_one_func_parallel(user_movie_preferences, lambda mp: get_movie(mp.external_movie_id))

movies = []
for mp in user_movie_preferences:
external_movie = get_movie(mp.external_movie_id)
for mp, external_movie in results:
movie_preference_dict = mp.to_dict()
movie_preference_dict.update(external_movie)
movies.append(movie_preference_dict)
Expand All @@ -53,4 +54,4 @@ def delete(self, user, movie_preference_id):
db.session.delete(movie_preference)
db.session.commit()

return jsonify({"success": True})
return jsonify({"success": True})
84 changes: 84 additions & 0 deletions app/util/concurrency.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
from threading import Thread
from datetime import datetime

DEFAULT_TIMEOUT = 10

"""
Class which extends Thread to expose a result from the target (input) function.
"""
class ResultThread(Thread):
def __init__(self, **kwargs):
super(ResultThread, self).__init__()
self.target = kwargs.get('target')
def run(self):
self.result = self.target()

def get_now():
return datetime.now().timestamp()

def call_one_func_parallel(inputs, func, timeout=DEFAULT_TIMEOUT):
"""
Takes in a list of inputs and one function, which is run asynchronously across all inputs.
Tuples of (input, result) are returned.
Doctest below.

>>> from time import sleep

>>> def sleep_func(timeout):
... sleep(timeout)
... return timeout

>>> call_one_func_parallel([7, 1, 1, 2], sleep_func)
[(7, 7), (1, 1), (1, 1), (2, 2)]

>>> call_one_func_parallel([7, 1, 1, 2], sleep_func, timeout=3)
Traceback (most recent call last):
...
RuntimeError: ('Service call took longer than max timeout of :', 3, 'seconds')
"""
funcs = [func] * len(inputs)
return call_parallel(inputs, funcs, timeout)

def call_parallel(inputs, funcs, timeout=DEFAULT_TIMEOUT):
"""
Takes in lists of inputs and functions, which are then run asynchronously, and returns a list of their results.
Doctest below.

>>> from time import sleep

>>> def sleep_func(timeout):
... sleep(timeout)
... return timeout

>>> call_parallel([], [])
[]

>>> call_parallel([5, 1, 3, 5], [sleep_func] * 4)
[(5, 5), (1, 1), (3, 3), (5, 5)]

>>> call_parallel([5, 1, 3, 5], [sleep_func] * 4, timeout=2)
Traceback (most recent call last):
...
RuntimeError: ('Service call took longer than max timeout of :', 2, 'seconds')
"""
if len(inputs) != len(funcs):
raise RuntimeError("# of inputs (", len(inputs), ") do not match # of functions (", len(funcs), ")")
threads = []
results = []
start_time = get_now()
for input, func in zip(inputs, funcs):
new_thread = ResultThread(target=lambda: func(input), daemon=True)
threads.append(new_thread)
new_thread.start()
for thread in threads:
time_elapsed = get_now() - start_time
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How important is the extra few milliseconds here? I think there's a bit less complexity if we just pass in timeout, even if technically the thread gets to run for a few extra milliseconds. Then we don't need to worry about get_now(), recording start time, etc.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thread.join's first argument is the amount of time we're actually waiting for the thread to finish since calling thread.join on that thread. As an extreme example, if there are 20 threads, and each one exceeds our maximum timeout of 10 seconds, we will wait for 200 seconds. Keeping the 'start time' keeps this timeout relative to when we actually kicked them all off.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooooh, interesting. Makes sense.

thread.join(timeout - time_elapsed)
if thread.is_alive():
raise RuntimeError("Service call took longer than max timeout of :", timeout, "seconds")
else:
results.append(thread.result)
return list(zip(inputs, results))

if __name__ == "__main__":
import doctest
doctest.testmod()