From 3d60f1ba580a65442ca317bc3af4616d0e3cd692 Mon Sep 17 00:00:00 2001 From: Rory Sametz Date: Sun, 31 Mar 2019 11:40:00 -0700 Subject: [PATCH 1/4] Parallelize get_movie Calls --- .../movie_preference_controller.py | 12 ++-- app/util/concurrency.py | 64 +++++++++++++++++++ 2 files changed, 72 insertions(+), 4 deletions(-) create mode 100644 app/util/concurrency.py diff --git a/app/controllers/movie_preference_controller.py b/app/controllers/movie_preference_controller.py index 0b32d87..84851e7 100644 --- a/app/controllers/movie_preference_controller.py +++ b/app/controllers/movie_preference_controller.py @@ -3,7 +3,7 @@ from app import db from app.models import MoviePreference from app.util.tmdb_helpers import get_movie - +from app.util.concurrency import call_parallel class MoviePreferenceController(): def create(self, user): @@ -36,9 +36,13 @@ def update(self, user, movie_preference_id): def get(self, user): user_movie_preferences = user.movies + # Wrap get_movie calls inside lambdas, then call these lambdas in parallel. + get_movie_lambdas = map(lambda mp: (lambda: get_movie(mp.external_movie_id)), user_movie_preferences) + results = call_parallel(get_movie_lambdas) + movies = [] - for mp in user_movie_preferences: - external_movie = get_movie(mp.external_movie_id) + for zipped_result in zip(user_movie_preferences, results): + (mp, external_movie) = zipped_result movie_preference_dict = mp.to_dict() movie_preference_dict.update(external_movie) movies.append(movie_preference_dict) @@ -53,4 +57,4 @@ def delete(self, user, movie_preference_id): db.session.delete(movie_preference) db.session.commit() - return jsonify({"success": True}) \ No newline at end of file + return jsonify({"success": True}) diff --git a/app/util/concurrency.py b/app/util/concurrency.py new file mode 100644 index 0000000..64b1448 --- /dev/null +++ b/app/util/concurrency.py @@ -0,0 +1,64 @@ +from threading import Thread +from datetime import datetime + +DEFAULT_TIMEOUT = 10 + +""" +Class which extends Thread to expose a result from the target (input) function. +""" +class ResultThread(Thread): + def run(self): + self.result = self._target() + def get_result(self): + return self.result + +def get_now(): + return datetime.now().timestamp() + +def call_parallel(funcs, timeout=None): + """ + Takes in a list of functions, which are then run asynchronously, and returns a list of their results. + Doctest below. + + >>> from time import sleep + + >>> def sleep_func_gen(timeout): + ... def sleep_func(): + ... sleep(timeout) + ... return timeout + ... return sleep_func + + >>> calls = [sleep_func_gen(5), sleep_func_gen(1), sleep_func_gen(3), sleep_func_gen(5),] + + >>> call_parallel([]) + [] + + >>> call_parallel(calls) + [5, 1, 3, 5] + + >>> call_parallel(calls, timeout=2) + [None, 1, None, None] + """ + if timeout == None: + timeout = DEFAULT_TIMEOUT + threads = [] + results = [] + + start_time = get_now() + for func in funcs: + new_thread = ResultThread(None, target=func, daemon=True) + threads.append(new_thread) + new_thread.start() + for thread in threads: + time_elapsed = get_now() - start_time + thread.join(timeout - time_elapsed) + # If the thread is still alive, it's running past the timeout. + if thread.is_alive(): + results.append(None) + else: + results.append(thread.get_result()) + return results + +if __name__ == "__main__": + import doctest + doctest.testmod() From 65b50cef59516a2e1ce5dae37d5d517eb27a7569 Mon Sep 17 00:00:00 2001 From: Rory Sametz Date: Sun, 31 Mar 2019 12:05:02 -0700 Subject: [PATCH 2/4] Error Handling For Movie Preference Retrieval --- app/controllers/movie_preference_controller.py | 2 ++ src/pages/HomePage.tsx | 1 + 2 files changed, 3 insertions(+) diff --git a/app/controllers/movie_preference_controller.py b/app/controllers/movie_preference_controller.py index 84851e7..8190dc0 100644 --- a/app/controllers/movie_preference_controller.py +++ b/app/controllers/movie_preference_controller.py @@ -43,6 +43,8 @@ def get(self, user): movies = [] for zipped_result in zip(user_movie_preferences, results): (mp, external_movie) = zipped_result + if external_movie is None: + continue movie_preference_dict = mp.to_dict() movie_preference_dict.update(external_movie) movies.append(movie_preference_dict) diff --git a/src/pages/HomePage.tsx b/src/pages/HomePage.tsx index 55b5289..151d02e 100644 --- a/src/pages/HomePage.tsx +++ b/src/pages/HomePage.tsx @@ -8,6 +8,7 @@ import { fetchMovies } from "../network/requests"; const HomePage = (): JSX.Element => { const [user, setUser] = useState(null); + // TODO: Provide coherent UX when TMDB is down (when movies == []). const [movies, setMovies] = useState>([]); const fetchUserMovies = () => (user ? fetchMovies(user, setMovies) : null); From 5ebb1c57d0a0dd18cbbf5edb56b973c39795d06f Mon Sep 17 00:00:00 2001 From: Rory Sametz Date: Sun, 31 Mar 2019 18:10:38 -0700 Subject: [PATCH 3/4] Addressed PR Comments --- .../movie_preference_controller.py | 11 +-- app/util/concurrency.py | 70 ++++++++++++------- 2 files changed, 48 insertions(+), 33 deletions(-) diff --git a/app/controllers/movie_preference_controller.py b/app/controllers/movie_preference_controller.py index 8190dc0..37422bd 100644 --- a/app/controllers/movie_preference_controller.py +++ b/app/controllers/movie_preference_controller.py @@ -3,7 +3,7 @@ from app import db from app.models import MoviePreference from app.util.tmdb_helpers import get_movie -from app.util.concurrency import call_parallel +from app.util.concurrency import call_one_func_parallel class MoviePreferenceController(): def create(self, user): @@ -36,15 +36,10 @@ def update(self, user, movie_preference_id): def get(self, user): user_movie_preferences = user.movies - # Wrap get_movie calls inside lambdas, then call these lambdas in parallel. - get_movie_lambdas = map(lambda mp: (lambda: get_movie(mp.external_movie_id)), user_movie_preferences) - results = call_parallel(get_movie_lambdas) + results = call_one_func_parallel(user_movie_preferences, lambda mp: get_movie(mp.external_movie_id)) movies = [] - for zipped_result in zip(user_movie_preferences, results): - (mp, external_movie) = zipped_result - if external_movie is None: - continue + for mp, external_movie in results: movie_preference_dict = mp.to_dict() movie_preference_dict.update(external_movie) movies.append(movie_preference_dict) diff --git a/app/util/concurrency.py b/app/util/concurrency.py index 64b1448..eed0952 100644 --- a/app/util/concurrency.py +++ b/app/util/concurrency.py @@ -7,57 +7,77 @@ Class which extends Thread to expose a result from the target (input) function. """ class ResultThread(Thread): + def __init__(self, **kwargs): + super(ResultThread, self).__init__() + self.target = kwargs.get('target') def run(self): - self.result = self._target() - def get_result(self): - return self.result + self.result = self.target() def get_now(): return datetime.now().timestamp() -def call_parallel(funcs, timeout=None): +def call_one_func_parallel(inputs, func, timeout=DEFAULT_TIMEOUT): """ - Takes in a list of functions, which are then run asynchronously, and returns a list of their results. + Takes in a list of inputs and one function, which is run asynchronously across all inputs. + Tuples of (input, result) are returned. Doctest below. >>> from time import sleep - >>> def sleep_func_gen(timeout): - ... def sleep_func(): - ... sleep(timeout) - ... return timeout - ... return sleep_func + >>> def sleep_func(timeout): + ... sleep(timeout) + ... return timeout - >>> calls = [sleep_func_gen(5), sleep_func_gen(1), sleep_func_gen(3), sleep_func_gen(5),] + >>> call_one_func_parallel([7, 1, 1, 2], sleep_func) + [(7, 7), (1, 1), (1, 1), (2, 2)] - >>> call_parallel([]) + >>> call_one_func_parallel([7, 1, 1, 2], sleep_func, timeout=3) + Traceback (most recent call last): + ... + RuntimeError: ('Service call took longer than max timeout of :', 3, 'seconds') + """ + funcs = [func] * len(inputs) + return call_parallel(inputs, funcs, timeout) + +def call_parallel(inputs, funcs, timeout=DEFAULT_TIMEOUT): + """ + Takes in lists of inputs and functions, which are then run asynchronously, and returns a list of their results. + Doctest below. + + >>> from time import sleep + + >>> def sleep_func(timeout): + ... sleep(timeout) + ... return timeout + + >>> call_parallel([], []) [] - >>> call_parallel(calls) - [5, 1, 3, 5] + >>> call_parallel([5, 1, 3, 5], [sleep_func] * 4) + [(5, 5), (1, 1), (3, 3), (5, 5)] - >>> call_parallel(calls, timeout=2) - [None, 1, None, None] + >>> call_parallel([5, 1, 3, 5], [sleep_func] * 4, timeout=2) + Traceback (most recent call last): + ... + RuntimeError: ('Service call took longer than max timeout of :', 2, 'seconds') """ - if timeout == None: - timeout = DEFAULT_TIMEOUT + if len(inputs) != len(funcs): + raise RuntimeError("# of inputs (", len(inputs), ") do not match # of functions (", len(funcs), ")") threads = [] results = [] - start_time = get_now() - for func in funcs: - new_thread = ResultThread(None, target=func, daemon=True) + for input, func in zip(inputs, funcs): + new_thread = ResultThread(target=lambda: func(input), daemon=True) threads.append(new_thread) new_thread.start() for thread in threads: time_elapsed = get_now() - start_time thread.join(timeout - time_elapsed) - # If the thread is still alive, it's running past the timeout. if thread.is_alive(): - results.append(None) + raise RuntimeError("Service call took longer than max timeout of :", timeout, "seconds") else: - results.append(thread.get_result()) - return results + results.append(thread.result) + return list(zip(inputs, results)) if __name__ == "__main__": import doctest From 6b3b25d0816dcf570d479d655e8c6c133a1dae5b Mon Sep 17 00:00:00 2001 From: Rory Sametz Date: Sat, 6 Apr 2019 11:22:19 -0700 Subject: [PATCH 4/4] Removed TODO --- src/pages/HomePage.tsx | 1 - 1 file changed, 1 deletion(-) diff --git a/src/pages/HomePage.tsx b/src/pages/HomePage.tsx index 151d02e..55b5289 100644 --- a/src/pages/HomePage.tsx +++ b/src/pages/HomePage.tsx @@ -8,7 +8,6 @@ import { fetchMovies } from "../network/requests"; const HomePage = (): JSX.Element => { const [user, setUser] = useState(null); - // TODO: Provide coherent UX when TMDB is down (when movies == []). const [movies, setMovies] = useState>([]); const fetchUserMovies = () => (user ? fetchMovies(user, setMovies) : null);