From 7a152570be1c105542af6d1643408e2d4aead8e3 Mon Sep 17 00:00:00 2001 From: Rhett Garber Date: Wed, 16 Apr 2014 15:01:27 -0700 Subject: [PATCH] Fix issue with catching httpclient errors in tornado_utils Our wrapper around httpclient futures wasn't correctly handling the non-error response error case (like timeouts). The fix is to copy some more of tornado's code into our own to act as a proxy. --- CHANGES | 3 +- blueox/tornado_utils.py | 36 +++++++++++++----- tests/tornado_utils_test.py | 73 +++++++++++++++++++++++++++++++++++++ 3 files changed, 101 insertions(+), 11 deletions(-) diff --git a/CHANGES b/CHANGES index e93746f..edd8e26 100644 --- a/CHANGES +++ b/CHANGES @@ -1,8 +1,9 @@ blueox (0.8.2) * Disable buffering for oxview stdin + * Fix issue in tornado_utils with catching timeouts in httpclient --- Rhett Garber Mon, 31 Mar 2014 16:46:00 -0700 +-- Rhett Garber Wed, 16 Apr 2014 11:08:00 -0700 blueox (0.8.1) diff --git a/blueox/tornado_utils.py b/blueox/tornado_utils.py index 329e5da..99e0f3b 100644 --- a/blueox/tornado_utils.py +++ b/blueox/tornado_utils.py @@ -18,11 +18,13 @@ import traceback import types import sys +import time log = logging.getLogger(__name__) import tornado.web import tornado.gen +import tornado.httpclient import tornado.simple_httpclient import tornado.stack_context @@ -138,15 +140,16 @@ def __init__(self, *args, **kwargs): return super(AsyncHTTPClient, self).__init__(*args, **kwargs) def fetch(self, request, callback=None, **kwargs): + start_time = time.time() + + if isinstance(request, basestring): + request = tornado.httpclient.HTTPRequest(url=request, **kwargs) + ctx = blueox.Context(self.blueox_name) ctx.start() - if isinstance(request, basestring): - ctx.set('request.uri', request) - ctx.set('request.method', kwargs.get('method', 'GET')) - else: - ctx.set('request.uri', request.url) - ctx.set('request.method', request.method) - ctx.set('request.size', len(request.body) if request.body else 0) + ctx.set('request.uri', request.url) + ctx.set('request.method', request.method) + ctx.set('request.size', len(request.body) if request.body else 0) ctx.stop() @@ -165,15 +168,28 @@ def complete_context(response): if callback is None: def fetch_complete(future): - complete_context(future.result()) + # This error handling is just copied from tornado.httpclient as + # we need to record a real HTTPError. httpclient might do the same thing + # again if needs to deal with the caller's callbacks. + exc = future.exception() + if isinstance(exc, tornado.httpclient.HTTPError) and exc.response is not None: + response = exc.response + elif exc is not None: + response = tornado.httpclient.HTTPResponse( + request, 599, error=exc, + request_time=time.time() - start_time) + else: + response = future.result() + + complete_context(response) - future = super(AsyncHTTPClient, self).fetch(request, **kwargs) + future = super(AsyncHTTPClient, self).fetch(request) future.add_done_callback(fetch_complete) else: def callback_wrapper(response): complete_context(response) callback(response) - future = super(AsyncHTTPClient, self).fetch(request, callback=callback_wrapper, **kwargs) + future = super(AsyncHTTPClient, self).fetch(request, callback=callback_wrapper) return future diff --git a/tests/tornado_utils_test.py b/tests/tornado_utils_test.py index 8d006da..e6aedd4 100644 --- a/tests/tornado_utils_test.py +++ b/tests/tornado_utils_test.py @@ -9,6 +9,7 @@ import tornado.gen import tornado.web import blueox.tornado_utils +from blueox.utils import get_deep # vendor module. Tornado testing in Testify import tornado_test @@ -47,12 +48,38 @@ def write_error(self, status_code, **kwargs): return super(AsyncErrorHandler, self).write_error(status_code, **kwargs) +class AsyncTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): + @blueox.tornado_utils.coroutine + def get(self): + loop = self.request.connection.stream.io_loop + + called = yield tornado.gen.Task(loop.add_timeout, time.time() + 1.0) + + +class AsyncRecurseTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): + @blueox.tornado_utils.coroutine + def post(self): + loop = self.request.connection.stream.io_loop + http_client = blueox.tornado_utils.AsyncHTTPClient(loop) + + blueox.set("start", True) + try: + f = yield http_client.fetch(self.request.body, request_timeout=0.5) + except tornado.httpclient.HTTPError, e: + self.write("got it") + else: + self.write("nope") + + blueox.set("end", True) + + class MainHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler): def get(self): blueox.set('async', False) self.write("Hello, world") + class SimpleTestCase(tornado_test.AsyncHTTPTestCase): @setup def setup_bluox(self): @@ -74,6 +101,8 @@ def get_app(self): (r"/", MainHandler), (r"/async", AsyncHandler), (r"/error", AsyncErrorHandler), + (r"/timeout", AsyncTimeoutHandler), + (r"/recurse_timeout", AsyncRecurseTimeoutHandler), ]) application.test_url = self.get_url("/") @@ -98,6 +127,50 @@ def test_error(self): assert found_exception + def test_timeout_error(self): + f = self.http_client.fetch(self.get_url("/timeout"), self.stop, request_timeout=0.5) + resp = self.wait() + + #for ctx_id in self.log_ctx: + #print ctx_id + #for ctx in self.log_ctx[ctx_id]: + #pprint.pprint(ctx.to_dict()) + + assert_equal(len(self.log_ctx), 1) + ctx = self.log_ctx[self.log_ctx.keys()[0]][0] + assert_equal(get_deep(ctx.to_dict(), 'body.response.code'), 599) + + def test_recurse_timeout_error(self): + url = self.get_url("/timeout") + f = self.http_client.fetch(self.get_url("/recurse_timeout"), self.stop, + body=url, + method="POST", + request_timeout=1.5) + resp = self.wait() + + #for ctx_id in self.log_ctx: + #print ctx_id + #for ctx in self.log_ctx[ctx_id]: + #pprint.pprint(ctx.to_dict()) + + assert_equal(resp.code, 200) + assert_equal(resp.body, "got it") + + found_timeout = False + found_request = False + for ctx_list in self.log_ctx.values(): + for ctx in ctx_list: + c = ctx.to_dict() + if c['type'] == 'request.httpclient' and c['body']['response']['code'] == 599: + found_timeout = True + + if c['type'] == 'request' and get_deep(c, 'body.start'): + assert get_deep(c, 'body.end') + found_request = True + + assert found_timeout + assert found_request + def test_context(self): self.http_client.fetch(self.get_url("/async"), self.stop) resp = self.wait()