Skip to content

Commit

Permalink
Fixed some reconnecting bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
igorcoding committed Feb 15, 2017
1 parent 499ba7a commit 302ae79
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 27 deletions.
2 changes: 1 addition & 1 deletion asynctnt/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .connection import Connection, connect
from .iproto.protocol import Iterator, Response

__version__ = '0.0.9'
__version__ = '0.0.10'
37 changes: 26 additions & 11 deletions asynctnt/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,18 +134,34 @@ def connection_lost(self, exc):

if self._reconnect_timeout > 0 \
and self._state != ConnectionState.DISCONNECTING:
if self._state == ConnectionState.RECONNECTING:
return
self._set_state(ConnectionState.DISCONNECTING)
self._set_state(ConnectionState.RECONNECTING)
logger.info('%s Started reconnecting', self.fingerprint)
self._reconnect_coro = \
asyncio.ensure_future(self._connect(return_exceptions=False),
loop=self._loop)
self._start_reconnect(return_exceptions=False)
else:
self._set_state(ConnectionState.DISCONNECTED)
if self._disconnect_waiter:
self._disconnect_waiter.set_result(True)
self._disconnect_waiter = None

def __create_reconnect_coro(self, return_exceptions=False):
if self._reconnect_coro:
self._reconnect_coro.cancel()
self._reconnect_coro = asyncio.ensure_future(
self._connect(return_exceptions=return_exceptions),
loop=self._loop
)
return self._reconnect_coro

def _start_reconnect(self, return_exceptions=False):
if self._state == ConnectionState.RECONNECTING:
logger.info('Already in reconnecting state')
return

logger.info('%s Started reconnecting', self.fingerprint)
self._set_state(ConnectionState.RECONNECTING)
self.__create_reconnect_coro(return_exceptions)

def protocol_factory(self, connected_fut, cls=protocol.Protocol):
return cls(host=self._host,
port=self._port,
Expand All @@ -168,7 +184,6 @@ async def _connect(self, return_exceptions=True):
ConnectionState.CONNECTING,
ConnectionState.CONNECTED,
ConnectionState.DISCONNECTING,
ConnectionState.DISCONNECTED,
}
if self._state in ignore_states:
return
Expand Down Expand Up @@ -222,27 +237,27 @@ async def _connect(self, return_exceptions=True):
if e.code in {ErrorCode.ER_LOADING}:
# If Tarantool is still loading then reconnect
if self._reconnect_timeout > 0:
await self._do_reconnect(e)
await self._wait_reconnect(e)
continue
if return_exceptions:
self._reconnect_coro = None
raise e
else:
logger.exception(e)
if self._reconnect_timeout > 0:
await self._do_reconnect(e)
await self._wait_reconnect(e)
continue
except Exception as e:
if self._reconnect_timeout > 0:
await self._do_reconnect(e)
await self._wait_reconnect(e)
continue
if return_exceptions:
self._reconnect_coro = None
raise e
else:
logger.exception(e)

async def _do_reconnect(self, exc=None):
async def _wait_reconnect(self, exc=None):
self._set_state(ConnectionState.RECONNECTING)
logger.warning('Connect to %s failed: %s. Retrying in %f seconds',
self.fingerprint,
Expand All @@ -256,7 +271,7 @@ def connect(self):
"""
Connect coroutine
"""
return self._connect(return_exceptions=True)
return self.__create_reconnect_coro(True)

async def disconnect(self):
"""
Expand Down
26 changes: 13 additions & 13 deletions temp/t.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async def main(loop):
# print(conn._protocol.schema)
# print(conn._protocol._con_state)
#
# res = await conn.call16('long', [3])
res = await conn.call16('long', [5])
# res = await conn.auth('tt', 'ttp')d

# await tnt.stop()
Expand All @@ -69,18 +69,18 @@ async def main(loop):
# res = await conn.select('tester')
# print(res.body)
# res = await conn.call('test', timeout=0)
class A():
def __init__(self, a):
super(A, self).__init__()
self.a = a

def __str__(self):
return "A({})".format(self.a)

res = await conn.call('func_param', [A(100)])
print(res)
print(res.body)
print(res.schema_id)
# class A():
# def __init__(self, a):
# super(A, self).__init__()
# self.a = a
#
# def __str__(self):
# return "A({})".format(self.a)
#
# res = await conn.call('func_param', [A(100)])
# print(res)
# print(res.body)
# print(res.schema_id)
return
# res = await conn.refetch_schema()
# res = await conn.insert('tester', (2, 'hello', 3))
Expand Down
61 changes: 59 additions & 2 deletions tests/test_connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
import logging

from asynctnt.connection import ConnectionState
from asynctnt.exceptions import TarantoolDatabaseError, ErrorCode
from tests import BaseTarantoolTestCase

import asynctnt


class ConnectTestCase(BaseTarantoolTestCase):
DO_CONNECT = False
LOGGING_LEVEL = logging.DEBUG

async def test__connect(self):
conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port,
Expand Down Expand Up @@ -122,13 +124,23 @@ async def test__connect_wait_tnt_started(self):
conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port,
username='t1', password='t1',
fetch_schema=True,
reconnect_timeout=0.000001,
reconnect_timeout=0.000000001,
loop=self.loop)
coro = self.ensure_future(conn.connect())
await self.sleep(0.3)
await self.tnt.start()
await self.sleep(1)
await coro
while True:
try:
await coro
break
except TarantoolDatabaseError as e:
if e.code == ErrorCode.ER_NO_SUCH_USER:
# Try again
coro = self.ensure_future(conn.connect())
continue
raise

self.assertEqual(conn.state, ConnectionState.CONNECTED)
await conn.disconnect()

Expand Down Expand Up @@ -169,3 +181,48 @@ async def test__close(self):
conn.close()
await self.sleep(0.1)
self.assertEqual(conn.state, ConnectionState.DISCONNECTED)

async def test_reconnect_from_idle(self):
conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port,
reconnect_timeout=0,
loop=self.loop)
await conn.reconnect()

self.assertEqual(conn.state, ConnectionState.CONNECTED)
await conn.disconnect()

async def test_reconnect_after_connect(self):
conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port,
reconnect_timeout=0,
loop=self.loop)
await conn.connect()
await conn.reconnect()

self.assertEqual(conn.state, ConnectionState.CONNECTED)
await conn.disconnect()

async def test_manual_reconnect(self):
conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port,
reconnect_timeout=0,
loop=self.loop)
await conn.connect()
await conn.disconnect()
await conn.connect()

self.assertEqual(conn.state, ConnectionState.CONNECTED)
await conn.disconnect()

async def test__connect_connection_lost(self):
conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port,
reconnect_timeout=1/3,
loop=self.loop)
await conn.connect()
await self.tnt.stop()
await self.sleep(0.5)
await self.tnt.start()
await self.sleep(0.5)

self.assertEqual(conn.state, ConnectionState.CONNECTED)
self.assertTrue(conn.is_connected)

await conn.disconnect()

0 comments on commit 302ae79

Please sign in to comment.