Skip to content

Commit

Permalink
Implement reconnecting in amsphinxql (native MYSQL_OPT_RECONNECT is d…
Browse files Browse the repository at this point in the history
…eprecated in MySQL 8.0.34+)
  • Loading branch information
andreymal committed Sep 25, 2023
1 parent 5b651ca commit 4b3f6ec
Showing 1 changed file with 63 additions and 27 deletions.
90 changes: 63 additions & 27 deletions mini_fiction/apis/amsphinxql.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import traceback
from queue import Queue
from threading import Lock, local
from dataclasses import dataclass
from types import TracebackType
import typing
from typing import Any, TypeVar, Union, Optional, Sequence, Tuple, List, Dict
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, TypeVar, Union

if typing.TYPE_CHECKING:
from MySQLdb.cursors import Cursor
Expand Down Expand Up @@ -52,26 +52,26 @@ def __enter__(self: T) -> T:
self._with_level += 1
return self

def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
assert self._with_level > 0

self._with_level -= 1
if self._with_level == 0:
if exc:
from MySQLdb import Error

# 2002 - Can't connect to local MySQL server
# 2006 - MySQL server has gone away
# 2013 - Lost connection to MySQL server during query
if not isinstance(exc, Error) or exc.args[0] not in (2002, 2006, 2013):
if exc is not None:
if not _should_reconnect(exc):
self.rollback()
else:
self.commit()

def ping(self, reconnect: bool = True) -> None:
self.mysql_conn.ping(reconnect)
def ping(self) -> None:
self.mysql_conn.ping()

def tables(self):
def tables(self) -> Sequence[Tuple[str, str]]:
return self._execute('show tables').fetchall()

def _execute(
Expand Down Expand Up @@ -403,35 +403,71 @@ def __init__(
self.conn_queue = conn_queue or Queue()
self._lock = Lock()

def __enter__(self) -> SphinxConnection:
if not hasattr(self.local, "level") or self.local.level == 0:
def _get_connection_from_pool(self) -> SphinxConnection:
while True:
with self._lock:
if self.conn_queue.empty() and self.count < self.max_conns:
self.conn_queue.put(SphinxConnection(self.conn))
self.count += 1
self.local.conn = self.conn_queue.get()
self.local.conn.__enter__()

conn = self.conn_queue.get()

try:
conn.ping()
except Exception as exc:
# drop broken connection
with self._lock:
self.count -= 1

if _should_reconnect(exc):
continue # connection errors are not fatal
raise

return conn

def __enter__(self) -> SphinxConnection:
if not hasattr(self.local, "level") or self.local.level == 0:
conn = self._get_connection_from_pool()
conn.__enter__()
self.local.conn = conn
self.local.level = 1
else:
self.local.level += 1
conn: SphinxConnection = self.local.conn
conn.ping(reconnect=True)
return conn

conn = self.local.conn
self.local.level += 1
return conn

def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
exc_tb: Optional[TracebackType],
) -> None:
assert self.local.level > 0
assert self.local.conn is not None

self.local.level -= 1
if self.local.level == 0:
c = self.local.conn
conn: SphinxConnection = self.local.conn
self.local.conn = None

try:
c.__exit__(exc_type, exc, tb)
conn.__exit__(exc_type, exc, exc_tb)
except Exception:
# drop broken connection
with self._lock:
self.count -= 1
traceback.print_exc()
else:
self.conn_queue.put(c)
# connection errors are fatal here due to possible data loss
raise

self.conn_queue.put(conn)


def _should_reconnect(exc: BaseException) -> bool:
from MySQLdb import Error

# 2002 - Can't connect to local MySQL server
# 2006 - MySQL server has gone away
# 2013 - Lost connection to MySQL server during query
# 4031 - The client was disconnected by the server because of inactivity
return isinstance(exc, Error) and exc.args[0] in (2002, 2006, 2013, 4031)

0 comments on commit 4b3f6ec

Please sign in to comment.