Reusing engines from different threads #1110

Open
dhirschfeld opened this issue Oct 1, 2024 · 3 comments


My observation is that passing an engine connected to an in-memory duckdb database to a different thread doesn't work.

I'm wondering if that's expected or if it would be considered a bug / missing feature?

Example:

import anyio
import sqlalchemy as sa
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    Session,
    mapped_column,
)


class Base(DeclarativeBase):
    pass


seq = sa.Sequence("user_id")


class User(Base):
    __tablename__ = "Users"
    id: Mapped[int] = mapped_column(
        seq,
        server_default=seq.next_value(),
        primary_key=True,
    )
    name: Mapped[str] = mapped_column(sa.String(30))


engine = sa.create_engine("duckdb:///:memory:")
Base.metadata.create_all(bind=engine)

with Session(engine) as session:
    spongebob = User(name="spongebob")
    sandy = User(name="sandy")
    patrick = User(name="patrick")
    session.add_all([spongebob, sandy, patrick])
    session.commit()


def run_query(engine: sa.Engine):
    with engine.connect() as conn:
        return conn.execute(sa.text("select * from Users")).fetchall()

Running the run_query function in the main thread works as expected:

>>> run_query(engine)
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]

...but if I run it in a background thread I get a Catalog Error: Table with name Users does not exist! exception 😔

My assumption is that the engine loses its connection to the in-memory database in the main thread and creates a new in-memory database, where that table doesn't exist?

>>> await anyio.to_thread.run_sync(run_query, engine)
---------------------------------------------------------------------------
CatalogException                          Traceback (most recent call last)
File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1967, in Connection._exec_single_context(self, dialect, context, statement, parameters)
   1966     if not evt_handled:
-> 1967         self.dialect.do_execute(
   1968             cursor, str_statement, effective_parameters, context
   1969         )
   1971 if self._has_events or self.engine._has_events:

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/default.py:941, in DefaultDialect.do_execute(self, cursor, statement, parameters, context)
    940 def do_execute(self, cursor, statement, parameters, context=None):
--> 941     cursor.execute(statement, parameters)

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/duckdb_engine/__init__.py:140, in CursorWrapper.execute(self, statement, parameters, context)
    139     else:
--> 140         self.__c.execute(statement, parameters)
    141 except RuntimeError as e:

CatalogException: Catalog Error: Table with name Users does not exist!
Did you mean "sqlite_master"?
LINE 1: select * from Users
                      ^

The above exception was the direct cause of the following exception:

ProgrammingError                          Traceback (most recent call last)
Cell In[10], line 1
----> 1 await anyio.to_thread.run_sync(run_query, engine)

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/anyio/to_thread.py:56, in run_sync(func, abandon_on_cancel, cancellable, limiter, *args)
     48     abandon_on_cancel = cancellable
     49     warn(
     50         "The `cancellable=` keyword argument to `anyio.to_thread.run_sync` is "
     51         "deprecated since AnyIO 4.1.0; use `abandon_on_cancel=` instead",
     52         DeprecationWarning,
     53         stacklevel=2,
     54     )
---> 56 return await get_async_backend().run_sync_in_worker_thread(
     57     func, args, abandon_on_cancel=abandon_on_cancel, limiter=limiter
     58 )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/anyio/_backends/_trio.py:1060, in TrioBackend.run_sync_in_worker_thread(cls, func, args, abandon_on_cancel, limiter)
   1057         return func(*args)
   1059 token = TrioBackend.current_token()
-> 1060 return await run_sync(
   1061     wrapper,
   1062     abandon_on_cancel=abandon_on_cancel,
   1063     limiter=cast(trio.CapacityLimiter, limiter),
   1064 )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/trio/_threads.py:437, in to_thread_run_sync(sync_fn, thread_name, abandon_on_cancel, limiter, *args)
    433 msg_from_thread: outcome.Outcome[RetT] | Run[object] | RunSync[object] = (
    434     await trio.lowlevel.wait_task_rescheduled(abort)
    435 )
    436 if isinstance(msg_from_thread, outcome.Outcome):
--> 437     return msg_from_thread.unwrap()
    438 elif isinstance(msg_from_thread, Run):
    439     await msg_from_thread.run()

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/outcome/_impl.py:213, in Error.unwrap(***failed resolving arguments***)
    211 captured_error = self.error
    212 try:
--> 213     raise captured_error
    214 finally:
    215     # We want to avoid creating a reference cycle here. Python does
    216     # collect cycles just fine, so it wouldn't be the end of the world
   (...)
    225     # methods frame, we avoid the 'captured_error' object's
    226     # __traceback__ from indirectly referencing 'captured_error'.
    227     del captured_error, self

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/trio/_threads.py:363, in to_thread_run_sync.<locals>.report_back_in_trio_thread_fn.<locals>.do_release_then_return_result()
    357 def do_release_then_return_result() -> RetT:
    358     # release_on_behalf_of is an arbitrary user-defined method, so it
    359     # might raise an error. If it does, we want that error to
    360     # replace the regular return value, and if the regular return was
    361     # already an exception then we want them to chain.
    362     try:
--> 363         return result.unwrap()
    364     finally:
    365         limiter.release_on_behalf_of(placeholder)

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/outcome/_impl.py:213, in Error.unwrap(***failed resolving arguments***)
    211 captured_error = self.error
    212 try:
--> 213     raise captured_error
    214 finally:
    215     # We want to avoid creating a reference cycle here. Python does
    216     # collect cycles just fine, so it wouldn't be the end of the world
   (...)
    225     # methods frame, we avoid the 'captured_error' object's
    226     # __traceback__ from indirectly referencing 'captured_error'.
    227     del captured_error, self

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/trio/_threads.py:382, in to_thread_run_sync.<locals>.worker_fn()
    380 PARENT_TASK_DATA.task_register = task_register
    381 try:
--> 382     ret = context.run(sync_fn, *args)
    384     if inspect.iscoroutine(ret):
    385         # Manually close coroutine to avoid RuntimeWarnings
    386         ret.close()

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/anyio/_backends/_trio.py:1057, in TrioBackend.run_sync_in_worker_thread.<locals>.wrapper()
   1055 def wrapper() -> T_Retval:
   1056     with claim_worker_thread(TrioBackend, token):
-> 1057         return func(*args)

Cell In[8], line 3, in run_query(engine)
      1 def run_query(engine: sa.Engine):
      2     with engine.connect() as conn:
----> 3         return conn.execute(sa.text("select * from Users")).fetchall()

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1418, in Connection.execute(self, statement, parameters, execution_options)
   1416     raise exc.ObjectNotExecutableError(statement) from err
   1417 else:
-> 1418     return meth(
   1419         self,
   1420         distilled_parameters,
   1421         execution_options or NO_OPTIONS,
   1422     )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/sql/elements.py:515, in ClauseElement._execute_on_connection(self, connection, distilled_params, execution_options)
    513     if TYPE_CHECKING:
    514         assert isinstance(self, Executable)
--> 515     return connection._execute_clauseelement(
    516         self, distilled_params, execution_options
    517     )
    518 else:
    519     raise exc.ObjectNotExecutableError(self)

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1640, in Connection._execute_clauseelement(self, elem, distilled_parameters, execution_options)
   1628 compiled_cache: Optional[CompiledCacheType] = execution_options.get(
   1629     "compiled_cache", self.engine._compiled_cache
   1630 )
   1632 compiled_sql, extracted_params, cache_hit = elem._compile_w_cache(
   1633     dialect=dialect,
   1634     compiled_cache=compiled_cache,
   (...)
   1638     linting=self.dialect.compiler_linting | compiler.WARN_LINTING,
   1639 )
-> 1640 ret = self._execute_context(
   1641     dialect,
   1642     dialect.execution_ctx_cls._init_compiled,
   1643     compiled_sql,
   1644     distilled_parameters,
   1645     execution_options,
   1646     compiled_sql,
   1647     distilled_parameters,
   1648     elem,
   1649     extracted_params,
   1650     cache_hit=cache_hit,
   1651 )
   1652 if has_events:
   1653     self.dispatch.after_execute(
   1654         self,
   1655         elem,
   (...)
   1659         ret,
   1660     )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1846, in Connection._execute_context(self, dialect, constructor, statement, parameters, execution_options, *args, **kw)
   1844     return self._exec_insertmany_context(dialect, context)
   1845 else:
-> 1846     return self._exec_single_context(
   1847         dialect, context, statement, parameters
   1848     )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1986, in Connection._exec_single_context(self, dialect, context, statement, parameters)
   1983     result = context._setup_result_proxy()
   1985 except BaseException as e:
-> 1986     self._handle_dbapi_exception(
   1987         e, str_statement, effective_parameters, cursor, context
   1988     )
   1990 return result

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:2355, in Connection._handle_dbapi_exception(self, e, statement, parameters, cursor, context, is_sub_exec)
   2353 elif should_wrap:
   2354     assert sqlalchemy_exception is not None
-> 2355     raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
   2356 else:
   2357     assert exc_info[1] is not None

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/base.py:1967, in Connection._exec_single_context(self, dialect, context, statement, parameters)
   1965                 break
   1966     if not evt_handled:
-> 1967         self.dialect.do_execute(
   1968             cursor, str_statement, effective_parameters, context
   1969         )
   1971 if self._has_events or self.engine._has_events:
   1972     self.dispatch.after_cursor_execute(
   1973         self,
   1974         cursor,
   (...)
   1978         context.executemany,
   1979     )

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/sqlalchemy/engine/default.py:941, in DefaultDialect.do_execute(self, cursor, statement, parameters, context)
    940 def do_execute(self, cursor, statement, parameters, context=None):
--> 941     cursor.execute(statement, parameters)

File /opt/python/envs/dev310/.pixi/envs/default/lib/python3.10/site-packages/duckdb_engine/__init__.py:140, in CursorWrapper.execute(self, statement, parameters, context)
    138         self.__c.execute(statement)
    139     else:
--> 140         self.__c.execute(statement, parameters)
    141 except RuntimeError as e:
    142     if e.args[0].startswith("Not implemented Error"):

ProgrammingError: (duckdb.duckdb.CatalogException) Catalog Error: Table with name Users does not exist!
Did you mean "sqlite_master"?
LINE 1: select * from Users
                      ^
[SQL: select * from Users]
(Background on this error at: https://sqlalche.me/e/20/f405)
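That assumption matches how in-memory databases typically behave: each new connection to ":memory:" gets its own private, empty database. A minimal sketch of the effect using the stdlib sqlite3 module, which has the same per-connection ":memory:" semantics as duckdb here (illustrative only, not duckdb-engine itself):

```python
import sqlite3

# Two separate connections to ":memory:" each get their own private,
# empty database, so a table created on one is invisible on the other.
a = sqlite3.connect(":memory:")
a.execute("create table Users (id integer, name text)")

b = sqlite3.connect(":memory:")  # a brand-new, empty database
tables = b.execute(
    "select name from sqlite_master where type = 'table'"
).fetchall()
print(tables)  # -> []
```

If the engine's pool opens a fresh DBAPI connection for the worker thread, that connection would see an empty catalog, which is consistent with the CatalogException above.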
@dhirschfeld

dhirschfeld commented Oct 1, 2024

If I change the run_query function to instead accept a sa.Connection then the query works in a background thread:

>>> def run_query(conn: sa.Connection):
...     return conn.execute(sa.text("select * from Users")).fetchall()

>>> with engine.connect() as conn:
...     res = run_query(conn)

>>> res
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]

>>> with engine.connect() as conn:
...     res = await anyio.to_thread.run_sync(run_query, conn)

>>> res
[(1, 'spongebob'), (2, 'sandy'), (3, 'patrick')]
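The same "share one connection, not the engine" pattern can be sketched with stdlib pieces only; this is an illustration of the workaround, not the duckdb-engine API (the check_same_thread flag is sqlite3-specific):

```python
import sqlite3
from concurrent.futures import ThreadPoolExecutor

# Create the connection once in the main thread, then hand that
# connection (not a connection factory/engine) to the worker thread.
# check_same_thread=False is needed because sqlite3 otherwise refuses
# to let a connection be used from a different thread.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("create table Users (id integer, name text)")
conn.execute("insert into Users values (1, 'spongebob')")

def run_query(conn: sqlite3.Connection) -> list:
    return conn.execute("select * from Users").fetchall()

with ThreadPoolExecutor(max_workers=1) as pool:
    rows = pool.submit(run_query, conn).result()
print(rows)  # -> [(1, 'spongebob')]
```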

@dhirschfeld

It would be great if it were possible to pass an engine to a separate thread, so the same code could be used irrespective of whether you were connected to a Postgres database in production or a duckdb in-memory database in CI.

@dhirschfeld

Calling back into the main thread from the worker thread seems to work, but then the code only works from a worker-thread context, so it's not ideal.

def run_query(engine: sa.Engine):
    with anyio.from_thread.run_sync(engine.connect) as conn:
        return conn.execute(sa.text("select * from Users")).fetchall()
