From 4b3f6ec8d4d2db64060ab26184225e3de3a933b0 Mon Sep 17 00:00:00 2001 From: andreymal Date: Mon, 25 Sep 2023 23:13:59 +0300 Subject: [PATCH] Implement reconnecting in amsphinxql (native MYSQL_OPT_RECONNECT is deprecated in MySQL 8.0.34+) --- mini_fiction/apis/amsphinxql.py | 90 +++++++++++++++++++++++---------- 1 file changed, 63 insertions(+), 27 deletions(-) diff --git a/mini_fiction/apis/amsphinxql.py b/mini_fiction/apis/amsphinxql.py index 24dd7181..70c9ea03 100644 --- a/mini_fiction/apis/amsphinxql.py +++ b/mini_fiction/apis/amsphinxql.py @@ -1,9 +1,9 @@ -import traceback from queue import Queue from threading import Lock, local from dataclasses import dataclass +from types import TracebackType import typing -from typing import Any, TypeVar, Union, Optional, Sequence, Tuple, List, Dict +from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, TypeVar, Union if typing.TYPE_CHECKING: from MySQLdb.cursors import Cursor @@ -52,26 +52,26 @@ def __enter__(self: T) -> T: self._with_level += 1 return self - def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None: + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: assert self._with_level > 0 self._with_level -= 1 if self._with_level == 0: - if exc: - from MySQLdb import Error - - # 2002 - Can't connect to local MySQL server - # 2006 - MySQL server has gone away - # 2013 - Lost connection to MySQL server during query - if not isinstance(exc, Error) or exc.args[0] not in (2002, 2006, 2013): + if exc is not None: + if not _should_reconnect(exc): self.rollback() else: self.commit() - def ping(self, reconnect: bool = True) -> None: - self.mysql_conn.ping(reconnect) + def ping(self) -> None: + self.mysql_conn.ping() - def tables(self): + def tables(self) -> Sequence[Tuple[str, str]]: return self._execute('show tables').fetchall() def _execute( @@ -403,35 +403,71 @@ def __init__( self.conn_queue = conn_queue or Queue() self._lock = Lock() - def __enter__(self) -> SphinxConnection: - if not hasattr(self.local, "level") or self.local.level == 0: + def _get_connection_from_pool(self) -> SphinxConnection: + while True: with self._lock: if self.conn_queue.empty() and self.count < self.max_conns: self.conn_queue.put(SphinxConnection(self.conn)) self.count += 1 - self.local.conn = self.conn_queue.get() - self.local.conn.__enter__() + + conn = self.conn_queue.get() + + try: + conn.ping() + except Exception as exc: + # drop broken connection + with self._lock: + self.count -= 1 + + if _should_reconnect(exc): + continue # connection errors are not fatal + raise + + return conn + + def __enter__(self) -> SphinxConnection: + if not hasattr(self.local, "level") or self.local.level == 0: + conn = self._get_connection_from_pool() + conn.__enter__() + self.local.conn = conn self.local.level = 1 - else: - self.local.level += 1 - conn: SphinxConnection = self.local.conn - conn.ping(reconnect=True) + return conn + + conn = self.local.conn + self.local.level += 1 return conn - def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None: + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> None: assert self.local.level > 0 assert self.local.conn is not None self.local.level -= 1 if self.local.level == 0: - c = self.local.conn + conn: SphinxConnection = self.local.conn self.local.conn = None + try: - c.__exit__(exc_type, exc, tb) + conn.__exit__(exc_type, exc, exc_tb) except Exception: # drop broken connection with self._lock: self.count -= 1 - traceback.print_exc() - else: - self.conn_queue.put(c) + # connection errors are fatal here due to possible data loss + raise + + self.conn_queue.put(conn) + + +def _should_reconnect(exc: BaseException) -> bool: + from MySQLdb import Error + + # 2002 - Can't connect to local MySQL server + # 2006 - MySQL server has gone away + # 2013 - Lost connection to MySQL server during query + # 4031 - The client was disconnected by the server because of inactivity + return isinstance(exc, Error) and exc.args[0] in (2002, 2006, 2013, 4031)