Skip to content

Commit

Permalink
Fix issue with catching httpclient errors in tornado_utils
Browse files Browse the repository at this point in the history
Our wrapper around httpclient futures wasn't correctly handling the
non-error response error case (like timeouts). The fix is to copy some
more of tornado's code into our own to act as a proxy.
  • Loading branch information
rhettg committed Apr 16, 2014
1 parent 7a24a30 commit 7a15257
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 11 deletions.
3 changes: 2 additions & 1 deletion CHANGES
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
blueox (0.8.2)

* Disable buffering for oxview stdin
* Fix issue in tornado_utils with catching timeouts in httpclient

-- Rhett Garber <[email protected]> Mon, 31 Mar 2014 16:46:00 -0700
-- Rhett Garber <[email protected]> Wed, 16 Apr 2014 11:08:00 -0700

blueox (0.8.1)

Expand Down
36 changes: 26 additions & 10 deletions blueox/tornado_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import traceback
import types
import sys
import time

log = logging.getLogger(__name__)

import tornado.web
import tornado.gen
import tornado.httpclient
import tornado.simple_httpclient
import tornado.stack_context

Expand Down Expand Up @@ -138,15 +140,16 @@ def __init__(self, *args, **kwargs):
return super(AsyncHTTPClient, self).__init__(*args, **kwargs)

def fetch(self, request, callback=None, **kwargs):
start_time = time.time()

if isinstance(request, basestring):
request = tornado.httpclient.HTTPRequest(url=request, **kwargs)

ctx = blueox.Context(self.blueox_name)
ctx.start()
if isinstance(request, basestring):
ctx.set('request.uri', request)
ctx.set('request.method', kwargs.get('method', 'GET'))
else:
ctx.set('request.uri', request.url)
ctx.set('request.method', request.method)
ctx.set('request.size', len(request.body) if request.body else 0)
ctx.set('request.uri', request.url)
ctx.set('request.method', request.method)
ctx.set('request.size', len(request.body) if request.body else 0)

ctx.stop()

Expand All @@ -165,15 +168,28 @@ def complete_context(response):

if callback is None:
def fetch_complete(future):
complete_context(future.result())
# This error handling is just copied from tornado.httpclient as
# we need to record a real HTTPError. httpclient might do the same thing
# again if needs to deal with the caller's callbacks.
exc = future.exception()
if isinstance(exc, tornado.httpclient.HTTPError) and exc.response is not None:
response = exc.response
elif exc is not None:
response = tornado.httpclient.HTTPResponse(
request, 599, error=exc,
request_time=time.time() - start_time)
else:
response = future.result()

complete_context(response)

future = super(AsyncHTTPClient, self).fetch(request, **kwargs)
future = super(AsyncHTTPClient, self).fetch(request)
future.add_done_callback(fetch_complete)
else:
def callback_wrapper(response):
complete_context(response)
callback(response)

future = super(AsyncHTTPClient, self).fetch(request, callback=callback_wrapper, **kwargs)
future = super(AsyncHTTPClient, self).fetch(request, callback=callback_wrapper)

return future
73 changes: 73 additions & 0 deletions tests/tornado_utils_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import tornado.gen
import tornado.web
import blueox.tornado_utils
from blueox.utils import get_deep

# vendor module. Tornado testing in Testify
import tornado_test
Expand Down Expand Up @@ -47,12 +48,38 @@ def write_error(self, status_code, **kwargs):
return super(AsyncErrorHandler, self).write_error(status_code, **kwargs)


class AsyncTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler):
@blueox.tornado_utils.coroutine
def get(self):
loop = self.request.connection.stream.io_loop

called = yield tornado.gen.Task(loop.add_timeout, time.time() + 1.0)


class AsyncRecurseTimeoutHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler):
@blueox.tornado_utils.coroutine
def post(self):
loop = self.request.connection.stream.io_loop
http_client = blueox.tornado_utils.AsyncHTTPClient(loop)

blueox.set("start", True)
try:
f = yield http_client.fetch(self.request.body, request_timeout=0.5)
except tornado.httpclient.HTTPError, e:
self.write("got it")
else:
self.write("nope")

blueox.set("end", True)


class MainHandler(blueox.tornado_utils.BlueOxRequestHandlerMixin, tornado.web.RequestHandler):
def get(self):
blueox.set('async', False)
self.write("Hello, world")



class SimpleTestCase(tornado_test.AsyncHTTPTestCase):
@setup
def setup_bluox(self):
Expand All @@ -74,6 +101,8 @@ def get_app(self):
(r"/", MainHandler),
(r"/async", AsyncHandler),
(r"/error", AsyncErrorHandler),
(r"/timeout", AsyncTimeoutHandler),
(r"/recurse_timeout", AsyncRecurseTimeoutHandler),
])

application.test_url = self.get_url("/")
Expand All @@ -98,6 +127,50 @@ def test_error(self):

assert found_exception

def test_timeout_error(self):
f = self.http_client.fetch(self.get_url("/timeout"), self.stop, request_timeout=0.5)
resp = self.wait()

#for ctx_id in self.log_ctx:
#print ctx_id
#for ctx in self.log_ctx[ctx_id]:
#pprint.pprint(ctx.to_dict())

assert_equal(len(self.log_ctx), 1)
ctx = self.log_ctx[self.log_ctx.keys()[0]][0]
assert_equal(get_deep(ctx.to_dict(), 'body.response.code'), 599)

def test_recurse_timeout_error(self):
url = self.get_url("/timeout")
f = self.http_client.fetch(self.get_url("/recurse_timeout"), self.stop,
body=url,
method="POST",
request_timeout=1.5)
resp = self.wait()

#for ctx_id in self.log_ctx:
#print ctx_id
#for ctx in self.log_ctx[ctx_id]:
#pprint.pprint(ctx.to_dict())

assert_equal(resp.code, 200)
assert_equal(resp.body, "got it")

found_timeout = False
found_request = False
for ctx_list in self.log_ctx.values():
for ctx in ctx_list:
c = ctx.to_dict()
if c['type'] == 'request.httpclient' and c['body']['response']['code'] == 599:
found_timeout = True

if c['type'] == 'request' and get_deep(c, 'body.start'):
assert get_deep(c, 'body.end')
found_request = True

assert found_timeout
assert found_request

def test_context(self):
self.http_client.fetch(self.get_url("/async"), self.stop)
resp = self.wait()
Expand Down

0 comments on commit 7a15257

Please sign in to comment.