race with multi-threaded wsgi server #23

Open

xtaje opened this issue May 19, 2020 · 33 comments

xtaje commented May 19, 2020

When substituting a threaded WSGI server for the Flask server, the messagebus.queue and uow.session can race between threads; there's a POC here.

From inspecting the logs, it looks like a couple of things can happen:

  • the messagebus.queue gets swapped out, and some events are dropped before they can be processed.
  • the uow.session gets swapped out, and when the uow tries to commit or roll back, SQLAlchemy throws stack traces with messages that look like:
sqlalchemy.orm.exc.DetachedInstanceError: Instance <Product at 0x7f55fb51abb0> is not bound to a Session; attribute refresh operation cannot proceed (Background on this error at: http://sqlalche.me/e/bhk3)
AttributeError: 'NoneType' object has no attribute '_iterate_self_and_parents'
sqlalchemy.exc.InvalidRequestError: This session is in 'inactive' state, due to the SQL transaction being rolled back; no further SQL can be emitted within this transaction.
xtaje commented May 19, 2020

Demo of a couple of different ideas for a fix:

  • Create queue and uow on a per request basis POC
  • Inject uow from entry point POC
  • Inject uow type during bootstrap POC

These options do complicate some of the dependency injection functions, and don't account for unit testing, ideal package structure, ...

Another option is to use thread-local storage.
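For illustration, a rough sketch of what the thread-local idea could look like (the helper name and the placeholder engine URL are mine, not the book's code):

import threading

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

SESSION_FACTORY = sessionmaker(bind=create_engine("sqlite://"))  # placeholder engine/URL

_thread_state = threading.local()


def current_session():
    # Lazily create one session per thread (the same trick works for the
    # messagebus queue), so concurrent request threads never share state.
    if not hasattr(_thread_state, "session"):
        _thread_state.session = SESSION_FACTORY()
    return _thread_state.session

SQLAlchemy's scoped_session is essentially this pattern packaged up for sessions, which is why it comes up later in this thread.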

hjwp commented May 19, 2020

thanks Ed!

i feel like that first PoC can't possibly pass the unit tests can it? the unit tests use the real message bus, and need a fake uow...

xtaje commented May 20, 2020

i feel like that first PoC can't possibly pass the unit tests can it? the unit tests use the real message bus, and need a fake uow...

Definitely, yes! These were just intended to demonstrate a few different options, as opposed to fully-realized solutions. The message bus also now has some awkward code that inspects the signature of each handler before dispatching, which I would want to fix IRL.

You would have to do the second or third option in order to inject a fake uow. Each PoC has some knock-on effects on the unit tests and package structure that would need to be sorted out.

@aleksarias

I was getting an AttributeError: 'NoneType' object has no attribute 'twophase' when I tried to run a uvicorn server that would spawn concurrent sessions with concurrent UoWs.
The reason was that the same session was being shared across threads, so transactions were being nested, and when one UoW closed its transaction it tore down the session while the other UoW was still using it. The solution was using a scoped_session, which lets each thread create its own session.

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# config, repository and AbstractUnitOfWork come from the project's own modules

DEFAULT_SESSION_FACTORY = sessionmaker(
    bind=create_engine(
        config.get_db_uri(),
        isolation_level="REPEATABLE READ",
        echo_pool=True,
        pool_pre_ping=True,
        # echo=True,
    ),
    autoflush=False,
    autocommit=False
)


class SqlAlchemyUnitOfWork(AbstractUnitOfWork):

    def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):
        self.session_factory = session_factory
        self.session = None

    def __enter__(self) -> AbstractUnitOfWork:
        self.session: scoped_session = scoped_session(self.session_factory)
        self.people = repository.SqlAlchemyPeopleRepository(self.session)
        return super().__enter__()

    def __exit__(self, *args):
        super().__exit__(*args)
        self.session.close()

    def _commit(self):
        self.session.commit()

    def rollback(self):
        self.session.rollback()

hjwp commented Jul 30, 2020

I need to write a blog post on this. At MADE we often split out a UnitOfWorkManager, which has all the once-per-application config stuff like the session factory, and a UnitOfWork class which is meant to be a fresh instance for each request / handler. It makes it much harder to leave any state lying around between requests, and easier to avoid this sort of concurrency bug.
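Roughly, that split might look something like this (just a sketch, not MADE's actual code; the class and method names are illustrative):

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker


class UnitOfWorkManager:
    """Holds the once-per-application config: engine and session factory."""

    def __init__(self, db_uri):
        self.session_factory = sessionmaker(bind=create_engine(db_uri))

    def start(self):
        # A brand-new UnitOfWork (and session) per request/handler,
        # so nothing is shared between threads.
        return UnitOfWork(self.session_factory())


class UnitOfWork:
    def __init__(self, session):
        self.session = session

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.session.rollback()
        self.session.close()

    def commit(self):
        self.session.commit()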

are you just experimenting with this stuff or is it in production? we'd love to hear more... you can also join us in the incipient #python community at the DDD-CQRS-ES slack: https://j.mp/ddd-es-cqrs

xtaje commented Jul 30, 2020

I think there may still be an issue with having a message queue per context, but I would want to double-check. This would be the cleanest way to stop the session crashing out, though.

@aleksarias

@hjwp, thanks for the slack invite! I'm using this in non-critical production systems.

hjwp commented Aug 5, 2020

that's great to hear! had you seen these patterns elsewhere before? how are you finding they work in practice? did you make any other modifications you'd like to tell us about, other than this concurrency issue?

@aleksarias

I have seen this type of pattern before. I've seen some developers use something similar in js at my previous employer. There's also a python package that implements this style as a framework: https://eventsourcing.readthedocs.io/

I didn't deviate much from the pattern presented here, but I did add logging of messages passed through the message bus (see below). If I had more time I would have used the eventsourcing library.

My program only processes one message at a time, but I would love to get it to process several in parallel via asyncio and let the database take the locks to prevent race conditions. I haven't had time to figure out how to do that yet.

import abc
import json
import logging
import traceback
from dataclasses import asdict, dataclass
from typing import Callable, Dict, List, Type, Union

from fastapi.encoders import jsonable_encoder
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

# commands, events, config and unit_of_work are the project's own modules
log = logging.getLogger(__name__)

Message = Union[commands.Command, events.Event]


@dataclass
class LogMessage(object):
    type: str
    name: str
    payload: dict


class AbstractMessageLogger(abc.ABC):

    def add(self, message: Message):
        self._add(message)

    @abc.abstractmethod
    def _add(self, message: Message):
        raise NotImplementedError


DEFAULT_SESSION_FACTORY = sessionmaker(
    bind=create_engine(
        config.get_db_uri(),
        json_serializer=lambda obj: json.dumps(jsonable_encoder(obj)) if obj else 'null',
    )
)


class SqlAlchemyMessageLogger(AbstractMessageLogger):

    def __init__(self, session_factory=DEFAULT_SESSION_FACTORY):
        super().__init__()
        self.session_factory = session_factory

    def _add(self, message: Message):
        session: Session = self.session_factory()
        event_type = message.__class__.__bases__[0].__name__
        event_name = message.__class__.__name__
        log_message = LogMessage(type=event_type, name=event_name, payload=asdict(message))
        session.add(log_message)
        session.commit()


class MessageBus:

    def __init__(
            self,
            uow: unit_of_work.AbstractUnitOfWork,
            message_logger: AbstractMessageLogger,
            event_handlers: Dict[Type[events.Event], List[Callable]],
            command_handlers: Dict[Type[commands.Command], Callable],
    ):
        self.uow = uow
        self.message_logger = message_logger
        self.event_handlers = event_handlers
        self.command_handlers = command_handlers
        self.queue = None

    def handle(self, message: Message):
        self.queue = [message]
        while self.queue:
            message = self.queue.pop(0)
            self.message_logger.add(message)
            if isinstance(message, events.Event):
                self.handle_event(message)
            elif isinstance(message, commands.Command):
                self.handle_command(message)
            else:
                raise Exception(f'{message} was not an Event or Command')

    def handle_event(self, event: events.Event):
        event_type = type(event)
        event_name = event_type.__name__
        event_handlers = self.event_handlers.get(event_type, [])
        for handler in event_handlers:
            try:
                log.info('handling event %s with handler %s', event_name, handler.__name__)
                handler(event)
                self.queue.extend(self.uow.collect_new_events())
            except Exception as e:
                log.exception(f'Exception handling event {event_name}: {e}')
                traceback_str = traceback.format_exc()
                exception_message = str(e)
                self.queue.append(events.ErrorHandlingEvent(message=exception_message, traceback=traceback_str))
                continue
        if not event_handlers:
            log.warning('no event handlers found for event %s', event_name)

    def handle_command(self, command: commands.Command):
        command_type = type(command)
        command_name = command_type.__name__
        log.info('handling command %s', command_name)
        try:
            handler = self.command_handlers[command_type]
            handler(command)
            self.queue.extend(self.uow.collect_new_events())
        except Exception as e:
            log.exception(f'Exception handling command {command_name}: {e}')
            traceback_str = traceback.format_exc()
            exception_message = str(e)
            self.queue.append(events.ErrorHandlingCommand(message=exception_message, traceback=traceback_str))
            raise
        log.info('done handling command %s', command_name)

xtaje commented Aug 16, 2020

I'd defer to @hjwp and Bob for a definitive take, but a couple of things in this snippet make me nervous:

  • To cover all your bases with threads, I think there needs to be a queue instance per UoW. So that could be a thread-local self.queue, or a more significant refactoring.
  • The message logger will commit the messages to the database before their effects are actually processed and committed. Maybe that will work, but it could also cause some very hard-to-track bugs, because a consumer might see the event before the state has changed. Or, if the message processor fails, a consumer might never see the intended effects. To really be safe, I would want to commit the messages along with the state changes in a single transaction, in the UoW (see the sketch after this list).
  • If there are a lot of events, committing each event separately might have a negative performance impact.
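As a rough sketch of the single-transaction idea from the second bullet (the MessageLog model, the add() signature and the declarative setup are all illustrative, not the code from the snippet above; the essential bit is that nothing commits except the UoW):

from dataclasses import asdict

from sqlalchemy import JSON, Column, Integer, String
from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4+ import path

Base = declarative_base()


class MessageLog(Base):
    # Illustrative log table standing in for the separately-committed LogMessage rows.
    __tablename__ = "message_log"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    payload = Column(JSON)


class TransactionalMessageLogger:
    """Writes log rows into the caller's session rather than opening its own,
    so the log commits (or rolls back) atomically with the state change."""

    def add(self, message, session):
        # assumes message is a dataclass, as in the snippet above
        session.add(MessageLog(name=type(message).__name__,
                               payload=asdict(message)))
        # Deliberately no commit here: the UnitOfWork commits once, at the end.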

hjwp commented Aug 17, 2020

I'd defer to @xtaje on pretty much any subject. but fwiw, if you have threads then definitely do something about self.queue. the transaction point is spot on too. maybe pass the uow into message_logger.add()? and the third one is of course correct but it's the kind of thing i would worry about "later" :-)

@aleksarias

Wow, thanks for all the feedback! This is very helpful. For now it's working, but I plan on addressing the points that were brought up sometime in the future.

xtaje commented Aug 17, 2020

I'd defer to @xtaje on pretty much any subject. but fwiw, if you have threads then definitely do something about self.queue. the transaction point is spot on too. maybe pass the uow into message_logger.add()? and the third one is of course correct but it's the kind of thing i would worry about "later" :-)

Yeah, the third point is not a problem until it becomes a problem so you could leave it be. But if this is something that you're putting in production I wouldn't leave fixing the other two things until later.

The possible effect of the queue issue is that you might lose some messages and your service will look like it's lost data or the logic is wrong.

The possible effect of the logger commit is hard to predict. In the best case, it might just lead to confusing logs and debugging sessions because things look like they're happening out of order, or are getting lost. In the worst case it might start causing errors in other services and corrupting data silently.

@daniel-butler

Here are my two cents; that and $5 will get you a cup of coffee.

I had been running into the same issue and solved it with the following unit of work:

from __future__ import annotations

import abc
from typing import Optional, Set
from uuid import uuid4

from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker

# settings and repository are the project's own modules


class AbstractUnitOfWork(abc.ABC):
    """The Abstract Base Class for the unit of work."""
    repo: repository.AbstractRepository

    def __enter__(self) -> AbstractUnitOfWork:
        """The base context manager enter."""
        return self

    def __exit__(self, *args):
        """The base context manager exit."""
        self.rollback()

    def commit(self) -> None:
        """Calls the abstract method `_commit`"""
        self._commit()

    def collect_new_events(self):
        """Collects all new events seen by the attached repository."""
        for import_ in self.repo.seen:
            while import_.events:
                yield import_.events.pop(0)

    @abc.abstractmethod
    def _commit(self):
        """Abstract method to be implemented. Handles saving the data once all items are completed"""
        raise NotImplementedError

    @abc.abstractmethod
    def rollback(self):
        """Abstract method to be implemented. Handles rolling back any unsaved data
        so it goes back to its original state.
        """
        raise NotImplementedError


DEFAULT_SESSION_FACTORY = sessionmaker(bind=create_engine(
    settings.database_uri,
    isolation_level="REPEATABLE READ",
))


class SqlAlchemyUnitOfWork(AbstractUnitOfWork):
    """Uses the implementation from
    [Cosmic Python](https://github.com/cosmicpython/code/blob/master/src/allocation/service_layer/unit_of_work.py)
    The main difference is this unit of work keeps track of the session and won't close it until
    all active context managers are exited. This helps with nested units of work, like this:
    ```python
    with uow:
        # Does some work with the session
        x = uow.repo.get()
        # calls another function with the context manager
        with uow:
            # Does some work then exits
            y = uow.repo.get(x.ids)
        # ...continues to work on the session. It won't be closed!
        uow.repo.add(y)
    ```
    """
    session: Optional[Session] = None
    active_uows: Set[str] = set()
    repo = None

    def __init__(self, session: sessionmaker = DEFAULT_SESSION_FACTORY) -> None:
        self.session_factory = session
        self.active_id = str(uuid4())

    def __enter__(self):
        self.active_uows.add(self.active_id)
        if not self.session:
            self.session = self.session_factory()
            self.repo = repository.SqlAlchemyRepository(self.session)
        return super().__enter__()

    def __exit__(self, *args):
        super().__exit__(*args)
        self.active_uows.remove(self.active_id)
        if not bool(self.active_uows):  # No active sessions
            self.session.close()
            self.session = None

    def _commit(self):
        self.session.commit()

    def rollback(self):
        self.session.rollback()

It helped because my handlers were like this, and I would get the two-phase commit error without it:

def do_action_one(cmd, uow):
    with uow:
        # Does some work with the session
        x = uow.repo.get()
        # calls another function with the context manager
        do_action_two(x, uow)
        uow.commit()

def do_action_two(x, uow):
    with uow:
        # Does some work then exits
        y = uow.repo.get(x.ids)
        uow.commit()

hjwp commented Dec 14, 2020

i did a bit of experimenting with nested UoWs in one project and in the end just backed it out cos it got too complicated. but i always regretted it!

i guess the important thing is to think thru exactly what behaviour/semantics you want from your context managers / rollbacks / exception handling? see this closely related post by @seddonym https://seddonym.me/2020/11/19/trouble-atomic/

xtaje commented Dec 15, 2020

Here's my 2 cents. First, I'm not sure how the nested UoW helps with the multi-threading? It seems that every request thread should get its own UoW anyway.

Second is that I think having nested contexts like that is probably an anti-pattern. The UoW is supposed to represent a logically indivisible task that completes before you start another. So when task1 calls task2, task1 has not actually finished. Yet task2 is committing task1's results.

If task1 needs to roll back for some reason, it now cannot. And I'm unsure what exactly will happen if some other process does a RW cycle in between the commits of task2 and task1, due to SQLAlchemy magic.

So it seems to me that you would want one function that establishes the scope for the UoW, and any other function called in that scope should just operate on domain objects, like this:

def do_action_one(x, uow):
    with uow:
        y = uow.repo.get(x.ids)
        do_action_two(x, y)
        uow.commit()

def do_action_two(x, y):
    ...

which is basically like the functional-core/imperative shell mentioned in the book.

daniel-butler commented Dec 18, 2020

@xtaje I agree with you, nested units of work don't have much to do with multithreading race conditions. I totally missed the multi-threading part, which led me here!

Quick backstory (if I should move this discussion somewhere else, please let me know!): the deployment environment is a Windows server hosting IIS and using MSSQL. Not the best situation. Because of the deployment environment, bringing in other systems like Redis and Celery is not a great option, but tasks still need to be run in the background. The application is a Flask web app as the backend API with a Vue.js frontend. Basically every HTTP entrypoint needs to return something to the frontend.

Attempt 1

Kicked off a background thread if the command has return_result set to True. A fatal flaw is the SQLAlchemy session, which runs into race conditions: it ends up in both threads because of the nested UoW above. (The nested UoW was my solution for writing tests that roll back all database transactions; maybe that should just be a test UoW implementation?)

commands.py

"""
User and System defined Actions for the system to use.
"""
from pathlib import Path
from uuid import uuid4, UUID
from typing import Optional, List

import attr


@attr.s
class Command:
    """Base Command class used by all Commands.

    Attributes:
        id: The unique UUID of the command
        correlation_id: The id that causes the command. By default it is the id.
        causation_id: The id that the command is related to. By default it is the id.
        return_result: Whether or not to return a value back through the bus. This is normally used to return a
            result back to the http api.
    """
    id: UUID = attr.ib(factory=uuid4)
    correlation_id: UUID = attr.ib(init=False)
    causation_id: UUID = attr.ib(init=False)
    return_result: bool = attr.ib(default=False)

    def __attrs_post_init__(self):
        """Creates a unique id for the Command.

        The id is set to the correlation and causation id because the command is the initiator.
        """
        self.correlation_id = self.causation_id = self.id

    def dict(self) -> dict:
        """Dictionary representation of the Command"""
        return attr.asdict(self)

messagebus.py

from __future__ import annotations
import logging
import threading
from typing import Callable, Dict, List, Union, Type, Any, Optional

from ..domain import commands, events
from . import unit_of_work

logger = logging.getLogger(__name__)

Message = Union[commands.Command, events.Event]


class MessageBus:
    """Uses the implementation from
    [Cosmic Python](https://github.com/cosmicpython/code/blob/master/src/allocation/service_layer/messagebus.py)

    The main difference is this message bus will return results, if specified, to work better with
    the web's request -> response cycle.

    """
    def __init__(
            self,
            uow: unit_of_work.AbstractUnitOfWork,
            command_handlers: Dict[Type[commands.Command], Callable],
            event_handlers: Dict[Type[events.Event], List[Callable]],
    ) -> None:
        self.uow = uow
        self.command_handlers = command_handlers
        self.event_handlers = event_handlers
        self.queue = []

    def handle(self, message: Message) -> Optional[Any]:
        if isinstance(message, commands.Command) and message.return_result:
            return self.handle_command_with_return(message)
        else:
            self.queue = [message]
            self.work_the_queue()

    def work_the_queue_in_the_background(self):
        task = threading.Thread(target=self.work_the_queue)
        task.start()

    def work_the_queue(self):
        while self.queue:
            msg = self.queue.pop(0)

            if isinstance(msg, events.Event):
                self.handle_event(msg)

            elif isinstance(msg, commands.Command):
                self.handle_command(msg)
            else:
                raise Exception(f"{msg} is not an Event or Command!")

    def handle_event(self, event: events.Event) -> None:
        for handler in self.event_handlers[type(event)]:
            try:
                logger.debug(f'{handler.__name__} handling event: {event}')
                handler(event)
                self.queue.extend(self.uow.collect_new_events())
            except Exception as e:
                logger.exception(f'Exception handling event {event} error: {e}', exc_info=True)
                continue

    def handle_command(self, command: commands.Command) -> None:
        logger.debug(f'Handling command {command} with no return')
        try:
            handler = self.command_handlers[type(command)]
            logger.debug(f'{handler.__name__} handling command: {command}')
            handler(command)
            self.queue.extend(self.uow.collect_new_events())
        except Exception as e:
            logger.exception(f'Exception handling command {command} error: {e}', exc_info=True)
            raise e

    def handle_command_with_return(self, command: commands.Command) -> Any:
        logger.debug(f'Handling command {command} with a return value')
        try:
            handler = self.command_handlers[type(command)]
            logger.debug(f'{handler.__name__} handling command: {command}')
            result = handler(command)

            # Because we want a result returned we need to kick off the event processing
            self.queue.extend(self.uow.collect_new_events())
            self.work_the_queue_in_the_background()

            # Return the result once completed
            return result
        except Exception as e:
            logger.exception(f'Exception handling command {command} error: {e}', exc_info=True)
            raise e

Attempt 2

After reviewing how the redis entrypoint works, the handlers now kick off a separate subprocess hooking into a CLI entrypoint. This solved the problem of sharing a SQLAlchemy session across threads and also stopped events being lost in the main process, but it did mean that tests which keep track of and roll back transactions through a single SQLAlchemy session broke.

messagebus.py

Kicking off tasks to the background is moved to the handlers like a redis event publisher.

from __future__ import annotations
import logging
import threading
from typing import Callable, Dict, List, Union, Type, Any, Optional

from ..domain import commands, events
from . import unit_of_work

logger = logging.getLogger(__name__)

Message = Union[commands.Command, events.Event]


class MessageBus:
    """Uses the implementation from
    [Cosmic Python](https://github.com/cosmicpython/code/blob/master/src/allocation/service_layer/messagebus.py)

    The main difference is this message bus will return results, if specified, to work better with
    the web's request -> response cycle.

    """
    def __init__(
            self,
            uow: unit_of_work.AbstractUnitOfWork,
            command_handlers: Dict[Type[commands.Command], Callable],
            event_handlers: Dict[Type[events.Event], List[Callable]],
    ) -> None:
        self.uow = uow
        self.command_handlers = command_handlers
        self.event_handlers = event_handlers
        self.queue = []

    def handle(self, message: Message) -> Optional[Any]:
        if isinstance(message, commands.Command) and message.return_result:
            return self.handle_command_with_return(message)
        self.queue = [message]
        self.work_the_queue()

    def work_the_queue(self):
        while self.queue:
            msg = self.queue.pop(0)

            if isinstance(msg, events.Event):
                self.handle_event(msg)

            elif isinstance(msg, commands.Command):
                self.handle_command(msg)
            else:
                raise Exception(f"{msg} is not an Event or Command!")

    def handle_event(self, event: events.Event) -> None:
        for handler in self.event_handlers[type(event)]:
            try:
                logger.debug(f'{handler.__name__} handling event: {event}')
                handler(event)
                self.queue.extend(self.uow.collect_new_events())
            except Exception as e:
                logger.exception(f'Exception handling event {event} error: {e}', exc_info=True)
                continue

    def handle_command(self, command: commands.Command) -> None:
        logger.debug(f'Handling command {command} with no return')
        try:
            handler = self.command_handlers[type(command)]
            logger.debug(f'{handler.__name__} handling command: {command}')
            handler(command)
            self.queue.extend(self.uow.collect_new_events())
        except Exception as e:
            logger.exception(f'Exception handling command {command} error: {e}', exc_info=True)
            raise e

    def handle_command_with_return(self, command: commands.Command) -> Any:
        logger.debug(f'Handling command {command} with a return value')
        try:
            handler = self.command_handlers[type(command)]
            logger.debug(f'{handler.__name__} handling command: {command}')
            result = handler(command)

            # Because we want a result returned we need to kick off the event processing
            self.queue.extend(self.uow.collect_new_events())
            self.work_the_queue()

            # Return the result once completed
            return result
        except Exception as e:
            logger.exception(f'Exception handling command {command} error: {e}', exc_info=True)
            raise e

handlers.py

The handler essentially uses the project's CLI entrypoints to "publish" the events to. As a side note, subprocess.Popen() was used because subprocess.run() blocks until the subprocess has completed.

import subprocess
from pathlib import Path
from ..domain import commands, events, model
from .unit_of_work import AbstractUnitOfWork


def import_payables(cmd: commands.GenerateData, uow: AbstractUnitOfWork) -> Path:
    with uow:
        important_data = model.ImportantData.generate(cmd)
        uow.repo.add(important_data)
        entries_folder = important_data.create_pass_through_entries(save_location=cmd.save_location)  # event is created here
        uow.commit()
        return entries_folder

def imports_received(event: events.DataReceived) -> None:
    subprocess.Popen([
        'project', 'add-info', str(event.reference), '--hot-folder-location', str(event.hot_folder_location),
        '--causation-id', str(event.causation_id), '--correlation-id', str(event.correlation_id),
    ])

Because this is related to race conditions, I was hoping to get feedback, or to give ideas to others if they are running into the same sort of issue.

hjwp commented Dec 29, 2020

that subprocess.Popen() makes me nervous. is it a daemon process? what happens if the web app goes down when it's half-way thru running? it does feel like what you really want is some sort of async task queue, like celery. you could roll your own by persisting the events.DataReceived somehow? and then have a separate process that picks jobs off the queue...

re: needing to return things from handlers, there was a related question recently at cosmicpython/book#316 -- finding ways to avoid doing this is usually worth it!

hjwp commented Dec 29, 2020

one alternative place to ask these questions is in the DDD/CQRS slack btw https://j.mp/ddd-es-cqrs

xtaje commented Dec 30, 2020

Ch9-Ch11 had different message bus implementations that returned a result, before the change to CQRS in Ch12. So if it were me, I would go back to copying elements of that implementation, but still push events off to a queue. Attempt #2 is logically equivalent, but much more complicated. The difference is that the Ch11 bus is letting the event loop "run-to-completion", so you know it's done processing the command and its synchronous events.

As to the Popen and IIS, I find that concerning also. If you are unable to deploy any additional servers, you can also deploy the consumer process as an additional Windows Service on the IIS machine. For the queue, you could also deploy the broker on the IIS machine. I think Rabbit is installable on Windows, otherwise I would see what options the Windows universe provides. WSL may also let you run Redis. Finally, it's possible to add a table or db to your existing MSSQL instance to act as a queue.

hjwp commented Dec 31, 2020

Finally, it's possible to add a table or db to your existing MSSQL instance to act as a queue.

I've certainly done this. it has the advantage of being able to use the same transaction to persist your state change and to record the outstanding task. then another service/process can pop items out of the task queue table, do them, and mark them as completed as and when. the two processes are independent, so the system is more fault tolerant. the task queue process can be restarted at any time and resume where it left off...
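A bare-bones sketch of that approach (the Task table, do_state_change() and cmd.to_json() are all made up for illustration; the point is only that the domain change and the outstanding task land in one commit):

from sqlalchemy import Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4+ import path

Base = declarative_base()


class Task(Base):
    __tablename__ = "task_queue"
    id = Column(Integer, primary_key=True)
    name = Column(String, nullable=False)
    payload = Column(String)            # e.g. the serialised DataReceived event
    completed_at = Column(DateTime)     # NULL means "still outstanding"


def handle(cmd, session):
    do_state_change(cmd, session)       # hypothetical: the normal domain work
    session.add(Task(name="DataReceived", payload=cmd.to_json()))
    # One commit covers both the state change and the queued task, or neither.
    session.commit()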

@daniel-butler

Sorry for the late reply all. I love the feedback! Process queues are a weak spot for me.

From what I gather, the issue with Popen is that the system could crash and there would be no way to pick up from where it left off? Is that right, and/or are there other things to be worried about?

Normally, when there is code to run in the background I set up a command line interface and then call it from the Task Scheduler at a given interval. In the case where Popen was used, the event is created once a month from a user's web submission. The event needs to be handled immediately. It generates about 50-100 PDFs, which can take about 2-3 minutes: too long for a browser to be waiting. I didn't want to create a scheduled task to run every minute of every day; that seemed like overkill. Popen was the only thing I could come up with that didn't require a lot of extra software dependencies, which for Windows are unfortunately mostly unsupported. And we are running on Server 2016, which when I looked into it didn't support WSL :(

I contemplated saving the event in the database. As far as I know it would have the same problem as calling a command line interface from a scheduled task once a minute. I've googled around looking for how to implement this and haven't hit on anything helpful. How do you have a separate process run constantly that pops things off a database queue?

Creating a service listening for events never occurred to me. I would think it would run into the same problem as Popen, which is that if the system crashes it will be hard to start back where it left off. I could be missing something.

Popen in the code above calls a command line interface into its own codebase, which I believe is like starting another Python process. It avoids the race condition I was running into when using threads with a non-thread-safe SQLAlchemy session.

@hjwp I wasn't able to use the link. It was expired when I tried to use it last month. Thank you for sending it though!

Again I appreciate the feedback!

hjwp commented Jan 22, 2021

How do you have a separate process run constantly that pops things off a database queue?

i guess it depends on where your code is deployed, and what tools the platform gives you? i've certainly used something as stupid as a cron job that runs every minute. but if you have some way of defining an "always on" service that will be restarted automatically if it crashes or if the system reboots, then you can just build a simple while True: loop to poll for new rows (maybe with a time.sleep() so it's not a busy loop lol).
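Something like this, reusing the Task table from the sketch a few comments up (dispatch() is hypothetical, and with more than one worker you would also want row locking, e.g. SELECT ... FOR UPDATE SKIP LOCKED where the database supports it):

import time
from datetime import datetime


def run_worker(session_factory, poll_interval=1.0):
    # Run this as an "always on" service so the platform restarts it on crash/reboot.
    while True:
        session = session_factory()
        try:
            task = (session.query(Task)
                    .filter(Task.completed_at.is_(None))
                    .order_by(Task.id)
                    .first())
            if task is None:
                time.sleep(poll_interval)      # not a busy loop
                continue
            dispatch(task)                     # hypothetical: run the actual handler
            task.completed_at = datetime.utcnow()
            session.commit()                   # mark done only after dispatch succeeded
        finally:
            session.close()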

@dmgolembiowski

Here's my 2 cents. First, I'm not sure how the nested UoW helps with the multi-threading? It seems that every request thread should get its own UoW anyway.

Second is that I think having nested contexts like that is probably an anti-pattern. The UoW is supposed to represent a logically indivisible task that completes before you start another. So when task1 calls task2, task1 has not actually finished. Yet task2 is committing task1's results.

If task1 needs to roll back for some reason, it now cannot. And I'm unsure what exactly will happen if some other process does a RW cycle in between the commits of task2 and task1, due to SQLAlchemy magic.

So it seems to me that you would want one function that establishes the scope for the UoW, and any other function called in that scope should just operate on domain objects, like this:

def do_action_one(x, uow):
    with uow:
        y = uow.repo.get(x.ids)
        do_action_two(x, y)
        uow.commit()

def do_action_two(x, y):
    ...

which is basically like the functional-core/imperative shell mentioned in the book.

Just a thought, but I suspect that nesting units of work gives the system a built-in idempotency shield of sorts. Suppose there was a constraint that says "you can spawn a Kind::A among all your various units of work, but there can only ever be one Kind::A at a time". Then, within the Kind::A UoW task, its job is to poll on the status of something that's slow (i.e. Kind::S) and bad to duplicate.

Maybe this is what they had in mind?

sqlalchemy-bot pushed a commit to sqlalchemy/sqlalchemy that referenced this issue Dec 27, 2021
the discussion at #7387 refers to a condition that seems
to happen in the wild also, such as [1] [2] [3], it's not
entirely clear why this specific spot is how this occurs,
however it's maybe that when the connection is being acquired
from the pool, under load there might be a wait on the connection
pool, leading to more time for another errant thread to be
calling .close(), just a theory.

in this patch we propose using decorators and context managers
along with declarative state declarations to block reentrant
or concurrent calls to methods that conflict with expected
state changes.

The :class:`_orm.Session` (and by extension :class:`.AsyncSession`) now has
new state-tracking functionality that will proactively trap any unexpected
state changes which occur as a particular transactional method proceeds.
This is to allow situations where the :class:`_orm.Session` is being used
in a thread-unsafe manner, where event hooks or similar may be calling
unexpected methods within operations, as well as potentially under other
concurrency situations such as asyncio or gevent to raise an informative
message when the illegal access first occurs, rather than passing silently
leading to secondary failures due to the :class:`_orm.Session` being in an
invalid state.

[1] https://stackoverflow.com/questions/25768428/sqlalchemy-connection-errors
[2] https://groups.google.com/g/sqlalchemy/c/n5oVX3v4WOw
[3] cosmicpython/code#23

Fixes: #7433
Change-Id: I699b935c0ec4e5a63f12cf878af6f7a92a30a3aa
@jalvespinto

Hi. Just wondering if there is any news on this subject. Thanks.

echoboomer commented Jun 20, 2022

I don't know if anyone is still looking into this, and my situation may be different from others', but I was able to resolve mine by simply using scoped_session. I'm building a single-page application using React, backed by a Flask-based API fronted by waitress.

I have a settings page that makes multiple concurrent calls on each load to several API endpoints which each perform a db read. I was constantly getting one of these errors:

'NoneType' object has no attribute 'twophase'
...object cannot be converted to 'persistent' state

I leveraged this advice and made two changes to my database code:

First, Session is now created thusly:

from sqlalchemy.orm import scoped_session, sessionmaker

# db is the Engine
session_factory = sessionmaker(db, autocommit=False, autoflush=False)
Session = scoped_session(session_factory)

After every database operation, I added Session.remove():

def something(name: str):
    try:
        ....
        Session.add(something)
        Session.commit()
    except Exception as error:
        logger.error(f"Create failed for {name}: {error}")
        Session.rollback()
    finally:
        Session.close()
        Session.remove()

It's entirely possible this is not a proper fit for everyone, but for my simple use case this made a world of difference and I don't encounter these errors anymore. I'm ignoring complexity of higher-level threading and different use cases, but this was the fix I needed.

nomhoi commented Apr 11, 2023

I have completed a refactoring: https://github.com/nomhoi/cosmicpython-fastapi
Try to reproduce this issue with this fully asynchronous variant.

cglace commented Jun 7, 2023

I solved this by wrapping the dependencies with a Depends class. We have been using this solution on a large project receiving ~100k messages daily and have yet to run into any issues.

def bootstrap(
    uow: unit_of_work.AbstractUnitOfWork = Depends(unit_of_work.SqlAlchemyUnitOfWork),
    ...
):
    ...


def Depends(dependency, *args, use_cache=False, initialize=True):
    return Dependency(dependency, *args, use_cache=use_cache, initialize=initialize)


class Dependency:
    def __init__(
        self, dependency: Optional[Callable[..., Any]] = None, *args, use_cache: bool = True, initialize: bool = True
    ):
        self.initialize = initialize
        self.args = args
        self.dependency = dependency
        self.use_cache = use_cache

    def load_dep(self, **kwargs):
        if not self.initialize:
            return self.dependency
        if callable(self.dependency):
            args = self.args
            return self.dependency(*args, **kwargs)
        else:
            return self.dependency

    def __repr__(self) -> str:
        attr = getattr(self.dependency, "__name__", type(self.dependency).__name__)
        cache = "" if self.use_cache else ", use_cache=False"
        return f"{self.__class__.__name__}({attr}{cache})"

Then I altered get_dependencies to check if the dep should be initialized.

class MessageBus:

  ...

  def get_dependencies(self, func, dependencies):
    params = inspect.signature(func).parameters
    deps = {
        name: dependency
        for name, dependency in dependencies.items()
        if name in params
    }
    
    initialized_deps = dict()
    for key, dep in deps.items():
        if isinstance(dep, Dependency):
            if dep.use_cache:
                if key in self.dep_cache:
                    initialized_deps[key] = self.dep_cache[key]
                else:
                    downstream_deps = dict()
                    if callable(dep.dependency):
                        downstream_deps = self.get_dependencies(dep.dependency, self.dependencies)
                    initialized_deps[key] = dep.load_dep(**downstream_deps)
            else:
                downstream_deps = dict()
                if callable(dep.dependency):
                    downstream_deps = self.get_dependencies(dep.dependency, self.dependencies)
                initialized_deps[key] = dep.load_dep(**downstream_deps)
        else:
            initialized_deps[key] = dep
    
    return initialized_deps

You can ignore the loading of downstream dependencies, but I find it useful for services that require their own dependencies.

dbaber commented May 12, 2024

This is only an issue if using, for example, gunicorn with a worker type that is NOT sync? How terrible is it to just use X number of sync workers to avoid this whole thing?
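For reference, sync is gunicorn's default worker class; a minimal gunicorn.conf.py for that setup might look like this (the worker count here is arbitrary):

# gunicorn.conf.py
workers = 4             # each sync worker is a separate process handling one request at a time
worker_class = "sync"   # the default: no threads, so no Session is shared between threads

The trade-off is throughput and memory: you scale by adding processes instead of threads, and a slow request ties up a whole worker.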

@nightblure

I also encountered the same error in a multi-process fastapi application. are there any ways to fix it? is scoped session suitable?

cglace commented Jun 21, 2024 via email

nightblure commented Jun 22, 2024

hi there

I solved this problem with dependency-injector, and so far everything is working fine.

We need only 3 things: a UOW, something like a database manager (or any wrapper around sessionmaker that is created only once), and a dependency injector.

The UOW implementation from the book is quite suitable, but you need to add the database manager as a dependency of the UOW. My implementation is as follows:

from typing import Iterator

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session, Session


class DbSessionManager:
    def __init__(
            self,
            *,
            db_url: str,
            echo: bool = False,
            scoped: bool = False,
            pool_size: int = 20,
            max_overflow: int = 0,
            pool_pre_ping: bool = False
    ):
        self.db_url = db_url
        self.echo = echo
        self.scoped = scoped
        self.pool_size = pool_size
        self.max_overflow = max_overflow
        self.pool_pre_ping = pool_pre_ping

        self.engine = create_engine(
            self.db_url,
            echo=self.echo,
            pool_size=self.pool_size,
            max_overflow=self.max_overflow,
            pool_pre_ping=self.pool_pre_ping
        )
        maker_args = dict(
            autocommit=False,
            autoflush=False,
            bind=self.engine
        )

        self.session_factory: sessionmaker | scoped_session = sessionmaker(**maker_args)

        if self.scoped:
            self.session_factory = scoped_session(self.session_factory)

    def get_db_session(self) -> Session:
        """ Direct access to session"""
        db_session = self.session_factory()
        return db_session

UOW:

from dataclasses import dataclass

from sqlalchemy.orm import Session


@dataclass(slots=True, kw_only=True)
class UnitOfWork:
    db_session_manager: DbSessionManager
    some_dao: SomeDAO | None = None

    _db_session: Session | None = None

    def _init(self):
        session = self.db_session_manager.session_factory()
        self.some_dao = SomeDAO(session)
        self._db_session = session

    @property
    def db_session(self) -> Session:
        if self._db_session is None:
            raise Exception('Unit of work should be used ONLY with a context manager!')
        return self._db_session

    def commit(self):
        self._db_session.commit()

    def __enter__(self):
        self._init()
        return self

    def rollback(self):
        self._db_session.rollback()

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            self.rollback()

        self.close()

        if self.db_session_manager.scoped:
            self.db_session_manager.session_factory.remove()

    def close(self):
        self._db_session.close()

next step: declare a container with the UOW and db manager (choose a provider that gives these objects the desired lifecycle):

from dependency_injector import containers, providers


class DIContainer(containers.DeclarativeContainer):
    """
    https://python-dependency-injector.ets-labs.org/providers/singleton.html
    https://python-dependency-injector.ets-labs.org/providers/callable.html

    https://python-dependency-injector.ets-labs.org/examples/fastapi.html
    https://python-dependency-injector.ets-labs.org/examples/fastapi-sqlalchemy.html
    https://python-dependency-injector.ets-labs.org/examples/application-single-container.html

    https://python-dependency-injector.ets-labs.org/wiring.html#
    """
    # BEFORE START APP WE NEED TO EXPLICIT CALL DIContainer.wire() METHOD!
    wiring_config = containers.WiringConfiguration(packages=wiring_packages, auto_wire=False)
    your_settings = providers.Singleton(YourSettingsClass)

    db_session_manager = providers.Singleton(
        DbSessionManager,
        db_url=your_settings.provided.database,
        echo=your_settings.provided.sqla_echo,
        pool_size=20,
        max_overflow=0,
        pool_pre_ping=True,
        scoped=False
    )

    uow = providers.Factory(UnitOfWork, db_session_manager=db_session_manager)

and now you can use this in your API layer like this (FastAPI example):

from dependency_injector.wiring import Provide, inject
from fastapi import APIRouter, Depends


router = APIRouter(prefix='/some_resource', tags=['some_resource'])

@router.get('', response_model=YourResponse)
@inject
def get_smth(uow=Depends(Provide['uow'])):
    with uow:
        query = uow.db_session.query(...)...
        # or uow.some_dao.do_something(...)
    return ...

I think this is good because the life cycle of the objects is controlled by the DI container: a unique UOW instance is created for each call to your API, while the DI container stores the sessionmaker as a single instance.
