diff --git a/asyncpg/pool.py b/asyncpg/pool.py index 9c8d581e..d9230415 100644 --- a/asyncpg/pool.py +++ b/asyncpg/pool.py @@ -6,6 +6,7 @@ import asyncio +from concurrent.futures._base import TimeoutError import functools import inspect import time @@ -15,6 +16,15 @@ from . import exceptions +BAD_CONN_EXCEPTION = ( + exceptions._base.PostgresError, + exceptions._base.FatalPostgresError, + exceptions._base.UnknownPostgresError, + TimeoutError, + ConnectionRefusedError, +) + + class PoolConnectionProxyMeta(type): def __new__(mcls, name, bases, dct, *, wrap=False): @@ -96,10 +106,12 @@ class PoolConnectionHolder: '_connect_args', '_connect_kwargs', '_max_queries', '_setup', '_init', '_max_inactive_time', '_in_use', - '_inactive_callback', '_timeout') + '_inactive_callback', '_timeout', + '_max_consecutive_exceptions', '_consecutive_exceptions') def __init__(self, pool, *, connect_args, connect_kwargs, - max_queries, setup, init, max_inactive_time): + max_queries, setup, init, max_inactive_time, + max_consecutive_exceptions): self._pool = pool self._con = None @@ -108,6 +120,8 @@ def __init__(self, pool, *, connect_args, connect_kwargs, self._connect_kwargs = connect_kwargs self._max_queries = max_queries self._max_inactive_time = max_inactive_time + self._max_consecutive_exceptions = max_consecutive_exceptions + self._consecutive_exceptions = 0 self._setup = setup self._init = init self._inactive_callback = None @@ -259,6 +273,16 @@ def _deactivate_connection(self): self._con.terminate() self._con = None + async def maybe_close_bad_connection(self, exc_type): + if self._max_consecutive_exceptions > 0 and \ + isinstance(exc_type, BAD_CONN_EXCEPTION): + + self._consecutive_exceptions += 1 + + if self._consecutive_exceptions > self._max_consecutive_exceptions: + await self.close() + self._consecutive_exceptions = 0 + class Pool: """A connection pool. @@ -285,6 +309,7 @@ def __init__(self, *connect_args, init, loop, connection_class, + max_consecutive_exceptions, **connect_kwargs): if loop is None: @@ -331,6 +356,7 @@ def __init__(self, *connect_args, connect_kwargs=connect_kwargs, max_queries=max_queries, max_inactive_time=max_inactive_connection_lifetime, + max_consecutive_exceptions=max_consecutive_exceptions, setup=setup, init=init) @@ -459,7 +485,8 @@ async def _acquire_impl(): ch = await self._queue.get() # type: PoolConnectionHolder try: proxy = await ch.acquire() # type: PoolConnectionProxy - except Exception: + except Exception as e: + await ch.maybe_close_bad_connection(e) self._queue.put_nowait(ch) raise else: @@ -580,6 +607,11 @@ async def __aexit__(self, *exc): self.done = True con = self.connection self.connection = None + if not exc[0]: + con._holder._consecutive_exceptions = 0 + else: + # Pass exception type to ConnectionHolder + await con._holder.maybe_close_bad_connection(exc[0]) await self.pool.release(con) def __await__(self): @@ -592,6 +624,7 @@ def create_pool(dsn=None, *, max_size=10, max_queries=50000, max_inactive_connection_lifetime=300.0, + max_consecutive_exceptions=0, setup=None, init=None, loop=None, @@ -651,6 +684,12 @@ def create_pool(dsn=None, *, Number of seconds after which inactive connections in the pool will be closed. Pass ``0`` to disable this mechanism. + :param int max_consecutive_exceptions: + the maximum number of consecutive exceptions that may be raised by a + single connection before that connection is assumed corrupt (ex. + pointing to an old DB after a failover) and will therefore be closed. + Pass ``0`` to disable. + :param coroutine setup: A coroutine to prepare a connection right before it is returned from :meth:`Pool.acquire() `. An example use @@ -699,4 +738,5 @@ def create_pool(dsn=None, *, min_size=min_size, max_size=max_size, max_queries=max_queries, loop=loop, setup=setup, init=init, max_inactive_connection_lifetime=max_inactive_connection_lifetime, + max_consecutive_exceptions=max_consecutive_exceptions, **connect_kwargs)