Skip to content

Reference

shared.transaction.CommitInTransactionError

Bases: TransactionError

Exception raised when an explicit commit is detected in a transaction block

shared.transaction.Propagation

Bases: Enum

Transaction propagation behaviors

NESTED class-attribute instance-attribute

NESTED = auto()

Create a new nested transaction within the current one

READ_ONLY class-attribute instance-attribute

READ_ONLY = auto()

Create a read-only transaction using the read-only engine

REQUIRED class-attribute instance-attribute

REQUIRED = auto()

Use existing transaction if available, create a new one if not

REQUIRES_NEW class-attribute instance-attribute

REQUIRES_NEW = auto()

Always create a new transaction, suspend current if it exists

shared.transaction.ReadOnlySessionFactory

ReadOnlySessionFactory()

Bases: ISessionFactory

Factory to build read-only sessions

Source code in shared/transaction/session.py
def __init__(self) -> None:
    """Initialise the factory with no read-only session maker set."""
    # NOTE(review): the `__call__` shown for this class builds a fresh
    # `sessionmaker` on every call and never reads or assigns this attribute;
    # confirm whether caching the maker here was intended.
    self.read_only_session_maker: Callable[..., ReadOnlySession] | None = None

__call__

__call__(**kwargs)
Source code in shared/transaction/session.py
@override
def __call__(self, **kwargs) -> ReadOnlySession:  # type: ignore[no-untyped-def]
    """Build and return a new read-only session.

    A `sessionmaker` bound to the read-only engine is assembled on every call,
    using the options returned by `_build_session_options` for
    `{"autoflush": False}`. `kwargs` are forwarded to that maker.
    """
    # function-scope import, kept local as in the original implementation
    from shared.models.orm.sqlalchemy import _build_session_options

    engine = get_engine(EngineType.READ_ONLY)
    options = _build_session_options({"autoflush": False})
    make_session = sessionmaker(bind=engine, class_=ReadOnlySession, **options)

    return make_session(**kwargs)  # type: ignore[no-any-return]

read_only_session_maker instance-attribute

read_only_session_maker = None

shared.transaction.SessionFactory

SessionFactory()

Bases: ISessionFactory

Factory to build sessions

Source code in shared/transaction/session.py
def __init__(self) -> None:
    """Initialise the factory with no session maker set."""
    # NOTE(review): the `__call__` shown for this class builds a fresh
    # `sessionmaker` on every call and never reads or assigns this attribute;
    # confirm whether caching the maker here was intended.
    self.session_maker: Callable[..., Session] | None = None

__call__

__call__(**kwargs)
Source code in shared/transaction/session.py
@override
def __call__(self, **kwargs) -> Session:  # type: ignore[no-untyped-def]
    """Build and return a new session bound to the primary engine.

    A `sessionmaker` is assembled on every call, using the options returned by
    `_build_session_options` for an empty override dict. `kwargs` are
    forwarded to that maker.
    """
    # function-scope import, kept local as in the original implementation
    from shared.models.orm.sqlalchemy import _build_session_options

    engine = get_engine(EngineType.PRIMARY)
    options = _build_session_options({})
    make_session = sessionmaker(bind=engine, **options)

    # TODO: disable autobegin by default
    return make_session(**kwargs)  # type: ignore[no-any-return]

session_maker instance-attribute

session_maker = None

shared.transaction.TransactionError

Bases: Exception

Base class for transaction exceptions

shared.transaction.USE_GLOBAL_SESSION module-attribute

USE_GLOBAL_SESSION = True

shared.transaction.UnitOfWork

UnitOfWork()

Building block to provide transaction context in a unit of work fashion

Source code in shared/transaction/unit_of_work.py
def __init__(self) -> None:
    """Declare the attributes populated by `__enter__`; nothing is assigned here."""
    # Both lines are annotation-only declarations: the attributes hold no value
    # until `__enter__` runs, so reading them earlier raises AttributeError.
    self.transaction_ctx: AbstractContextManager[Session]
    self.session: Session

__enter__

__enter__()
Source code in shared/transaction/unit_of_work.py
def __enter__(self) -> Self:
    """Open a transaction context and expose its session as `self.session`."""
    # `transaction()` is called with its default arguments; the context manager
    # is stored so that `__exit__` can close the very same context.
    self.transaction_ctx = transaction()
    # entered manually (not via `with`) so it stays open until `__exit__`
    self.session = self.transaction_ctx.__enter__()
    return self

__exit__

__exit__(exc_type, exc_value, traceback)
Source code in shared/transaction/unit_of_work.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Close the transaction context opened by `__enter__`.

    The exception triple is forwarded unchanged to the stored context manager.
    The value it returns is discarded and `None` is returned, so this method
    never asks the interpreter to suppress an in-flight exception.
    """
    self.transaction_ctx.__exit__(exc_type, exc_value, traceback)

session instance-attribute

session

transaction_ctx instance-attribute

transaction_ctx

shared.transaction.context

TransactionContext

TransactionContext(
    use_global_session=USE_GLOBAL_SESSION,
    session_factory=None,
    read_only_session_factory=None,
    producer=None,
)

State management for the transaction context

Session management

Maintain a session stack, keeping track of nested transaction contexts.

Commit guard

Protect against commit inside a transaction context while allowing explicit bypass.

Event management

Maintain the events associated with a particular transaction context, and allow them to be dispatched once the associated transaction is committed. The event state machine is as follows:

stateDiagram-v2
    [*] --> Pending: Register
    Pending --> Approved: Approve
    Pending --> [*]: Clear
    Approved --> [*]: Dispatch
Hold "Alt" / "Option" to enable pan & zoom
  • Register an event to be dispatched on success of the current transaction (public)
  • Approve when the context exits without exceptions (internal)
  • Clear when the context exits with exception (internal)
  • Dispatch when the associated transaction is committed (internal)
Source code in shared/transaction/context.py
def __init__(
    self,
    use_global_session: bool = USE_GLOBAL_SESSION,
    session_factory: SessionFactory | None = None,
    read_only_session_factory: ReadOnlySessionFactory | None = None,
    producer: Producer | None = None,
):
    """Set up the state tracked for a transaction context.

    Args:
        use_global_session: stored as-is on the context and consulted by
            `remove_global_session` and by `transaction`.
        session_factory: factory used by `create_session` for regular
            sessions; a `SessionFactory()` is created when falsy.
        read_only_session_factory: factory used by `create_session` for
            read-only sessions; a `ReadOnlySessionFactory()` is created when
            falsy.
        producer: used by `publish_events` to publish approved events; when
            falsy, a `Producer` is built on top of `get_message_broker()`.
    """
    self.producer: Producer = producer or Producer(broker=get_message_broker())
    self.use_global_session: bool = use_global_session
    # nesting level of `transaction` blocks: incremented on entry, decremented on exit
    self.depth: int = 0
    # set by `create_session(read_only=True)` and reset by `pop_session`
    self.is_in_read_only: bool = False
    self.session_factory: ISessionFactory = session_factory or SessionFactory()  # noqa: ALN079
    # closed and reset to None by `remove_global_session`
    self.global_session: Session | None = None
    self.read_only_session_factory: ISessionFactory = (
        read_only_session_factory or ReadOnlySessionFactory()  # noqa: ALN079
    )
    # sessions pushed by `create_session` and removed by `pop_session`
    self._session_stack: deque[Session] = deque()
    # number of active `commit_guard` contexts per session
    self._session_commit_guards: defaultdict[Session, int] = defaultdict(int)
    # True only while inside `commit_guard_bypass`
    self._commit_guard_bypass: bool = False
    # events registered but not yet approved, keyed by (session, depth)
    self._pending_session_events: defaultdict[
        tuple[Session, int], list[Message]
    ] = defaultdict(list)
    # approved events per session, waiting for `publish_events`
    self._session_events: defaultdict[Session, list[Message]] = defaultdict(list)

approve_pending_events

approve_pending_events()

Approve all pending events

Source code in shared/transaction/context.py
def approve_pending_events(self) -> None:
    """Approve all pending events

    Moves the events registered for the current session at the current depth
    into the session's list of approved events, then forgets the pending entry.

    Raises:
        ValueError: if there is no current session
    """
    session = self.current_session
    if not session:
        raise ValueError("No current session")

    logger.debug(
        f"Approving all pending events for session {id(session)} at depth {self.depth}"
    )

    # taking the entry out in one step both reads and deletes it
    pending = self._pending_session_events.pop((session, self.depth), None)
    if pending:
        self._session_events[session].extend(pending)

clear_pending_events

clear_pending_events()

Clear all pending events

Source code in shared/transaction/context.py
def clear_pending_events(self) -> None:
    """Clear all pending events

    Drops the events registered for the current session at the current depth
    without approving them.

    Raises:
        ValueError: if there is no current session
    """
    session = self.current_session
    if not session:
        raise ValueError("No current session")

    logger.debug(
        f"Clearing all pending events for session {id(session)} at depth {self.depth}"
    )

    # a missing entry is fine: there was simply nothing to clear
    self._pending_session_events.pop((session, self.depth), None)

commit_guard

commit_guard(session)

Context manager setting a commit guard

Source code in shared/transaction/context.py
@contextmanager
def commit_guard(self, session: Session) -> Iterator[None]:
    """Context manager setting a commit guard

    Guards are counted per session: the `before_commit` listener is attached
    when the first guard for a session is entered and detached when the last
    one exits.
    """
    guards = self._session_commit_guards

    if guards[session] == 0:
        # first guard for this session: the listener must not be there yet
        already_listening = sqlalchemy_event.contains(
            session, "before_commit", self._raise_on_commit
        )
        assert not already_listening, "Listener already setup for session"
        sqlalchemy_event.listen(session, "before_commit", self._raise_on_commit)

    guards[session] += 1
    try:
        yield
    finally:
        remaining = guards[session] - 1
        guards[session] = remaining
        if remaining == 0:
            # last guard released: detach the listener and drop the counter
            sqlalchemy_event.remove(session, "before_commit", self._raise_on_commit)
            del guards[session]

commit_guard_bypass

commit_guard_bypass()

Context manager to allow bypassing the commit guard

Source code in shared/transaction/context.py
@contextmanager
def commit_guard_bypass(self) -> Iterator[None]:
    """Context manager to allow bypassing the commit guard

    The bypass flag is raised for the duration of the block and always lowered
    afterwards. Bypasses cannot be nested.

    Raises:
        ValueError: if the guard is already bypassed
    """
    already_bypassed = self._commit_guard_bypass
    if already_bypassed:
        raise ValueError("Guard already bypassed")

    self._commit_guard_bypass = True
    try:
        yield
    finally:
        # re-arm the guard even when the wrapped block raises
        self._commit_guard_bypass = False

create_session

create_session(
    read_only=False, disable_expire_on_commit=False
)

Add a new session on the stack

Source code in shared/transaction/context.py
def create_session(
    self, read_only: bool = False, disable_expire_on_commit: bool = False
) -> Session:
    """Add a new session on the stack

    Args:
        read_only: build the session with the read-only factory and flag the
            context as read-only; takes precedence over
            `disable_expire_on_commit`.
        disable_expire_on_commit: pass `expire_on_commit=False` to the regular
            session factory.

    Returns:
        The newly created session, already pushed on the stack.

    Raises:
        ValueError: if the context is currently in a read-only session
    """
    if self.is_in_read_only:
        raise ValueError("Cannot create a session on top of a read-only one")

    created: Session
    if read_only:
        created = self.read_only_session_factory()
        self.is_in_read_only = True
    else:
        factory_kwargs = {"expire_on_commit": False} if disable_expire_on_commit else {}
        created = self.session_factory(**factory_kwargs)

    self._session_stack.append(created)
    return created

current_session property

current_session

Session that is on top of the stack

depth instance-attribute

depth = 0

global_session instance-attribute

global_session = None

is_in_read_only instance-attribute

is_in_read_only = False

pop_session

pop_session()

Pop a session from the stack

Source code in shared/transaction/context.py
def pop_session(self) -> Session:
    """Pop a session from the stack

    The session is removed from the stack first and then checked: a session
    that still has an active commit guard or approved-but-unpublished events
    is rejected.

    Raises:
        ValueError: if the stack is empty, or if the popped session still has
            an active commit guard or pending events
    """
    if not self._session_stack:
        raise ValueError("No session to pop from empty stack")

    popped = self._session_stack.pop()

    leftovers = (
        (self._session_commit_guards, "Cannot pop a session with active commit guard"),
        (self._session_events, "Cannot pop a session with pending events"),
    )
    for registry, reason in leftovers:
        if popped in registry:
            raise ValueError(reason)

    # no check needed before resetting the flag: nesting on top of a read-only
    # session is prohibited, so a read-only session is always the top of the stack
    self.is_in_read_only = False

    return popped

producer instance-attribute

producer = producer or Producer(broker=get_message_broker())

publish_events

publish_events()

Dispatch all approved events

Source code in shared/transaction/context.py
def publish_events(self) -> None:
    """Dispatch all approved events

    Takes the approved events of the current session out of the context and
    hands them, in order, to the producer.

    Raises:
        ValueError: if there is no current session
    """
    session = self.current_session
    if not session:
        raise ValueError("No current session")

    assert (session, self.depth) not in self._pending_session_events, (
        "Some events are pending"
    )

    # remove the approved events from the context before publishing them
    approved = self._session_events.pop(session, [])

    logger.debug(f"Publishing approved events for session {id(session)}")
    for message in approved:
        logger.debug(f"Publishing approved event {message}")
        # no try/except here: per the original author's note, `publish`
        # already handles its own failures
        self.producer.publish(message)

read_only_session_factory instance-attribute

read_only_session_factory = (
    read_only_session_factory or ReadOnlySessionFactory()
)

register_event

register_event(event)

Register an event

Source code in shared/transaction/context.py
def register_event(self, event: Message) -> None:
    """Register an event

    The event is stored as pending for the current session at the current
    depth. It stays pending until `approve_pending_events` or
    `clear_pending_events` is called for that same session and depth.

    Args:
        event: the message to register

    Raises:
        ValueError: if there is no current session
    """
    # read the property once so the check, the log line and the key all agree
    session = self.current_session
    if not session:
        raise ValueError("No current session")

    logger.debug(
        f"Registering event {event} as pending for session {id(session)} at depth {self.depth}"
    )
    self._pending_session_events[(session, self.depth)].append(event)

remove_global_session

remove_global_session()

Close the global session and make it ready to be created again

Source code in shared/transaction/context.py
def remove_global_session(self) -> None:
    """Close the global session and make it ready to be created again

    Does nothing, apart from logging, when the context does not use a global
    session or when no global session is currently set.
    """
    if not self.use_global_session:
        logger.warning("Not removing a global session that we're not using!")
        return

    current = self.global_session
    if current is None:
        logger.info("Attempted to remove a global session that was not present")
        return

    current.close()
    self.global_session = None

session_factory instance-attribute

session_factory = session_factory or SessionFactory()

use_global_session instance-attribute

use_global_session = use_global_session

USE_GLOBAL_SESSION module-attribute

USE_GLOBAL_SESSION = True

get_transaction_context

get_transaction_context()

Get the transaction context, creating it if needed

Source code in shared/transaction/context.py
def get_transaction_context() -> TransactionContext:
    """Get the transaction context, creating it if needed

    The context is looked up in `_TRANSACTION_CONTEXT`; when none is stored
    yet, a default `TransactionContext` is created, stored and returned.
    """
    existing = _TRANSACTION_CONTEXT.get()
    if existing is not None:
        return existing

    created = TransactionContext()
    _TRANSACTION_CONTEXT.set(created)
    return created

mock_transaction_context

mock_transaction_context(
    broker=None, read_only_session_factory=None
)

Utility to temporarily change the behavior of the context

This is useful for testing purposes.

Source code in shared/transaction/context.py
@contextmanager
def mock_transaction_context(
    broker: Broker | None = None,
    read_only_session_factory: ISessionFactory | None = None,
) -> Generator[None, None, None]:
    """
    Utility to temporarily change the behavior of the context

    This is useful for testing purposes.

    Args:
        broker: when given, the context's producer is replaced by a `Producer`
            built on this broker for the duration of the block.
        read_only_session_factory: when given, replaces the context's
            read-only session factory for the duration of the block.

    The previous producer and factory are restored when the block exits,
    including when it exits with an exception.
    """
    ctx = get_transaction_context()
    previous_producer = ctx.producer
    previous_read_only_session_factory = ctx.read_only_session_factory

    # compare against None rather than relying on truthiness, so that a
    # falsy-but-valid replacement object is not silently ignored
    if broker is not None:
        ctx.producer = Producer(broker=broker)

    if read_only_session_factory is not None:
        ctx.read_only_session_factory = read_only_session_factory

    try:
        yield
    finally:
        # restore in `finally` so a failing block (e.g. a failing test) cannot
        # leak the mocked objects into whatever uses the context next
        if broker is not None:
            ctx.producer = previous_producer

        if read_only_session_factory is not None:
            ctx.read_only_session_factory = previous_read_only_session_factory

shared.transaction.core

Propagation

Bases: Enum

Transaction propagation behaviors

NESTED class-attribute instance-attribute

NESTED = auto()

Create a new nested transaction within the current one

READ_ONLY class-attribute instance-attribute

READ_ONLY = auto()

Create a read-only transaction using the read-only engine

REQUIRED class-attribute instance-attribute

REQUIRED = auto()

Use existing transaction if available, create a new one if not

REQUIRES_NEW class-attribute instance-attribute

REQUIRES_NEW = auto()

Always create a new transaction, suspend current if it exists

register_event

register_event(event)

Register an event within the current transaction context

Source code in shared/transaction/core.py
def register_event(event: Message) -> None:
    """Register an event within the current transaction context

    Thin module-level shortcut: fetches the transaction context and delegates
    to its `register_event` method.
    """
    get_transaction_context().register_event(event)

transaction

transaction(
    propagation=Propagation.REQUIRED, force_manage=False
)

Provide a transactional context manager for database operations.

This function creates and manages database transactions with various propagation behaviors. It integrates with the existing current_session for backward compatibility while providing explicit transaction management.

Parameters:

Name Type Description Default
propagation Propagation

The transaction propagation behavior. Defaults to REQUIRED.
  • REQUIRED: Use existing transaction if available, create new if not
  • REQUIRES_NEW: Always create a new transaction, suspend current if exists
  • NESTED: Create a nested transaction within the current one
  • READ_ONLY: Create a read-only transaction using read-only engine

REQUIRED
force_manage bool

Whether to take control of existing transactions. Defaults to False. When True, will commit/rollback existing transactions that weren't started by this context manager. Use with caution during migration from legacy code.

False

Returns:

Type Description
Iterator[Session]

Iterator[Session]: A SQLAlchemy session within the transaction context

Raises:

Type Description
InReadOnlyTransactionError

When trying to nest transactions in read-only context

NotImplementedError

When using an unsupported propagation type

Examples:

Basic usage with context manager:

>>> with transaction() as session:
...     session.add(User(name="Marcel"))

Using different propagation behaviors:

>>> with transaction(propagation=Propagation.REQUIRES_NEW) as session:
...     session.add(User(name="Alice"))  # always in its own transaction
>>> with transaction(propagation=Propagation.READ_ONLY) as session:
...     users = session.scalars(select(User)).all()  # read-only operations

Force managing existing transactions (migration helper):

>>> with transaction(force_manage=True) as session:
...     session.add(User(name="Bob"))  # will commit existing transaction
Note
  • Transactions are automatically committed on successful context exit
  • Transactions are automatically rolled back on exceptions
  • Nested contexts share the same transaction unless REQUIRES_NEW is used
  • Events registered via register_event() are dispatched on successful commit
  • In dry-run mode, all transactions are rolled back regardless of success
  • Read-only transactions are always rolled back to prevent any data changes
Source code in shared/transaction/core.py
@contextmanager
def transaction(
    propagation: Propagation = Propagation.REQUIRED, force_manage: bool = False
) -> Iterator[Session]:
    """
    Provide a transactional context manager for database operations.

    This function creates and manages database transactions with various propagation
    behaviors. It integrates with the existing `current_session` for backward
    compatibility while providing explicit transaction management.

    Args:
        propagation: The transaction propagation behavior. Defaults to REQUIRED.
            - REQUIRED: Use existing transaction if available, create new if not
            - REQUIRES_NEW: Always create a new transaction, suspend current if exists
            - NESTED: Create a nested transaction within the current one
            - READ_ONLY: Create a read-only transaction using read-only engine
        force_manage: Whether to take control of existing transactions. Defaults to False.
            When True, will commit/rollback existing transactions that weren't started
            by this context manager. Use with caution during migration from legacy code.
            Only consulted for REQUIRED propagation, on the outermost context of a
            global session that is already in a transaction.

    Yields:
        Session: A SQLAlchemy session within the transaction context

    Raises:
        InReadOnlyTransactionError: When trying to nest transactions in read-only context
        NotImplementedError: When using an unsupported propagation type
        AssertionError: When re-using a current session that is not in a transaction
            (REQUIRED and NESTED propagations)

    Examples:
        Basic usage with context manager:

        >>> with transaction() as session:
        ...     session.add(User(name="Marcel"))

        Using different propagation behaviors:

        >>> with transaction(propagation=Propagation.REQUIRES_NEW) as session:
        ...     session.add(User(name="Alice"))  # always in its own transaction

        >>> with transaction(propagation=Propagation.READ_ONLY) as session:
        ...     users = session.scalars(select(User)).all()  # read-only operations

        Force managing existing transactions (migration helper):

        >>> with transaction(force_manage=True) as session:
        ...     session.add(User(name="Bob"))  # will commit existing transaction

    Note:
        - Transactions are committed on successful context exit by the context
          that started them; contexts that merely join a transaction only flush
        - Transactions are rolled back on exceptions by the context that started them
        - Nested contexts share the same transaction unless REQUIRES_NEW is used
        - Events registered via `register_event()` are dispatched once the
          outermost transaction (the one without a parent) completes successfully
        - In dry-run mode, all transactions are rolled back regardless of success
        - Read-only transactions are never committed by this function; their
          session is closed on exit
    """
    ctx: TransactionContext = get_transaction_context()

    # session must always be set at some point below
    session: Session

    # transaction is only set by the context manager that starts it
    # (this local deliberately-or-not shadows the function's own name inside the body)
    transaction: SessionTransaction | None = None

    # track if we created (and pushed) a new session, so that we pop it on exit
    is_new_session: bool = False

    # prevent nesting in read-only context
    if ctx.is_in_read_only:
        raise InReadOnlyTransactionError()

    match propagation:
        case Propagation.REQUIRED:
            # use the current session if we have one, create one if needed
            if ctx.current_session is not None:
                session = ctx.current_session

                # special handling of a non-clean outermost context
                if (
                    ctx.use_global_session
                    and ctx.depth == 0
                    and session.in_transaction()
                ):
                    if force_manage:
                        # take control of the transaction by faking that we started it
                        transaction = session.get_transaction()
                        logger.info("Forcing management of existing outer transaction")
                    else:
                        # check if we altered the session (did more than querying)
                        if session.new or session.deleted or session.dirty:
                            logger.warning(
                                "`global_session` is altered when entering transaction block!"
                                " ⚠️ As it is, this transaction block will not commit/rollback, you will have to handle that!"
                                " Set `force_manage` to True for the transaction block to handle the commit/rollback"
                            )
                        else:
                            logger.debug(
                                "`global_session` is already in a transaction!"
                                " ⚠️ As it is, this transaction block will not commit/rollback, you will have to handle that!"
                                " Set `force_manage` to True for the transaction block to handle the commit/rollback"
                            )

                # start transaction if we are not in one
                if ctx.use_global_session and not session.in_transaction():
                    transaction = session.begin()

                # we should already be in a transaction
                # FIXME: we can be in this situation legitimately if
                # - we don't use a global session but have a current session (e.g. REQUIRED)
                # - and we're currently publishing events for this transaction context
                # - the sync subscriber opens a transaction with REQUIRED propagation
                # IMO we should forbid sync subscribers to use anything else than REQUIRES_NEW or READ_ONLY propagations
                # to avoid such situations and even when using a global session actually
                # NOTE: This is the same assert below for NESTED propagation
                assert session.in_transaction(), (
                    "Re-using a session without a transaction"
                )
            else:
                # create a new session and transaction
                session = ctx.create_session()
                transaction = session.begin()
                is_new_session = True
        case Propagation.REQUIRES_NEW:
            # create a new session and transaction
            # (the session is created with `expire_on_commit=False`)
            session = ctx.create_session(disable_expire_on_commit=True)
            transaction = session.begin()
            is_new_session = True
        case Propagation.NESTED:
            if ctx.current_session is not None:
                session = ctx.current_session

                # start transaction if we are not in one
                if ctx.use_global_session and not session.in_transaction():
                    transaction = session.begin()
                else:
                    # we should already be in a transaction
                    assert session.in_transaction(), (
                        "Re-using a session without a transaction"
                    )

                    # start a new nested transaction
                    transaction = session.begin_nested()
            else:
                # create a new session and transaction
                session = ctx.create_session()
                transaction = session.begin()
                is_new_session = True
        case Propagation.READ_ONLY:
            # create a new read-only session
            # (this also sets `ctx.is_in_read_only` until the session is popped)
            session = ctx.create_session(read_only=True)
            transaction = session.begin()
            is_new_session = True
        case _:
            raise NotImplementedError("Propagation not implemented yet")

    # from here on, pending events are keyed by this (incremented) depth
    ctx.depth += 1

    logger.debug(f"Entering transaction context at depth {ctx.depth}")
    try:
        # any commit attempted on the session inside the block goes through the guard
        with ctx.commit_guard(session):
            yield session
    except Exception:
        # NOTE(review): only `Exception` is handled here; a `BaseException` such as
        # KeyboardInterrupt skips both the event clearing and the explicit rollback
        # below and goes straight to the `finally` block. Confirm this is intended.
        logger.debug("Exception detected within context")
        ctx.clear_pending_events()

        # only rollback if we started the transaction
        if transaction is not None:
            logger.debug(f"Rolling back transaction at depth {ctx.depth}")
            transaction.rollback()
        raise
    else:
        ctx.approve_pending_events()

        # only commit (or rollback in read-only/dry-run) if we started the transaction
        if transaction is not None:
            # the flag was False at the guard near the top of this function, but
            # `create_session(read_only=True)` sets it afterwards, hence the
            # `unreachable` ignore for the type checker
            if ctx.is_in_read_only:
                logger.debug(  # type: ignore[unreachable]
                    f"Skipping commit/rollback for read-only transaction at depth {ctx.depth}"
                )
            elif is_dry_run():
                logger.debug(
                    f"Rolling back for dry-run transaction at depth {ctx.depth}"
                )
                transaction.rollback()
            else:
                logger.debug(f"Committing transaction at depth {ctx.depth}")
                # the commit guard is already released here; the bypass flag is
                # raised around our own commit
                with ctx.commit_guard_bypass():
                    transaction.commit()

            # dispatch events either way (read-only, dry-run or committed), but only
            # when this is the outermost transaction, i.e. it has no parent
            if transaction.parent is None:
                ctx.publish_events()
        else:
            # we should flush for changes to be visible in the session's subsequent queries
            logger.debug(f"Flushing session at depth {ctx.depth}")
            session.flush()
    finally:
        logger.debug(f"Exiting transaction context at depth {ctx.depth}")

        # remove from stack if we added it
        if is_new_session:
            popped = ctx.pop_session()
            logger.debug(f"Popping session at depth {ctx.depth}")

            assert popped is session, "Session stack corrupted"

            # ensure we cleaned up properly: read-only sessions are always closed,
            # other sessions only when they are no longer in a transaction and are
            # either not the global-session case or not the outermost context
            # (note that `ctx.depth` has not been decremented yet at this point)
            if propagation == Propagation.READ_ONLY or (
                (not ctx.use_global_session or ctx.depth > 0)
                and not popped.in_transaction()
            ):
                logger.debug(f"Closing session at depth {ctx.depth}")
                popped.close()

        ctx.depth -= 1

transactional

transactional(
    func: Callable[Concatenate[Session, P], T],
) -> Callable[P, T]
transactional(
    *,
    propagation: Propagation = Propagation.REQUIRED,
    force_manage: bool = False
) -> Callable[
    [Callable[Concatenate[Session, P], T]], Callable[P, T]
]
transactional(
    func=None,
    propagation=Propagation.REQUIRED,
    force_manage=False,
)

Decorator that provides transactional context around a function

The decorated function must accept the session as its first positional parameter; the decorator injects it, so callers pass only the remaining arguments.

>>> @transactional
... def my_function(session: Session, arg1, arg2):
...     # do something with the session

Source code in shared/transaction/core.py
def transactional(
    func: Callable[Concatenate[Session, P], T] | None = None,
    propagation: Propagation = Propagation.REQUIRED,
    force_manage: bool = False,
) -> Callable[P, T] | Callable[[Callable[Concatenate[Session, P], T]], Callable[P, T]]:
    """
    Decorator that provides transactional context around a function

    The decorated function must accept the session as its first positional
    parameter. The decorator opens a `transaction` block and injects that
    session, so callers of the decorated function pass only the remaining
    arguments.

    Args:
        func: the function to decorate when used as a bare `@transactional`;
            left as None when used as `@transactional(...)`.
        propagation: forwarded to `transaction`.
        force_manage: forwarded to `transaction`.

    Returns:
        The wrapped function when `func` is given, otherwise a decorator
        waiting for the function.

    >>> @transactional
    ... def my_function(session: Session, arg1, arg2):
    ...     ...  # do something with the session
    """

    def decorator(func: Callable[Concatenate[Session, P], T]) -> Callable[P, T]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            # exceptions raised by `func` propagate through the `with` block,
            # which lets `transaction` roll back; no extra handling is needed
            with transaction(
                propagation=propagation, force_manage=force_manage
            ) as session:
                return func(session, *args, **kwargs)

        return wrapper

    # handle both @transactional and @transactional(...) syntax
    if func is None:
        return decorator

    return decorator(func)

shared.transaction.exception

CommitInTransactionError

Bases: TransactionError

Exception raised when an explicit commit is detected in a transaction block

InReadOnlyTransactionError

Bases: TransactionError

Exception raised when trying to acquire a non read-only transaction context within a read-only one

TransactionError

Bases: Exception

Base class for transaction exceptions

shared.transaction.mock_transaction_context

mock_transaction_context(
    broker=None, read_only_session_factory=None
)

Utility to temporarily change the behavior of the context

This is useful for testing purposes.

Source code in shared/transaction/context.py
@contextmanager
def mock_transaction_context(
    broker: Broker | None = None,
    read_only_session_factory: ISessionFactory | None = None,
) -> Generator[None, None, None]:
    """
    Utility to temporarily change the behavior of the context

    This is useful for testing purposes.

    Args:
        broker: when given, the context's producer is replaced by a `Producer`
            built on this broker for the duration of the block.
        read_only_session_factory: when given, replaces the context's
            read-only session factory for the duration of the block.

    The previous producer and factory are restored when the block exits,
    including when it exits with an exception.
    """
    ctx = get_transaction_context()
    previous_producer = ctx.producer
    previous_read_only_session_factory = ctx.read_only_session_factory

    # compare against None rather than relying on truthiness, so that a
    # falsy-but-valid replacement object is not silently ignored
    if broker is not None:
        ctx.producer = Producer(broker=broker)

    if read_only_session_factory is not None:
        ctx.read_only_session_factory = read_only_session_factory

    try:
        yield
    finally:
        # restore in `finally` so a failing block (e.g. a failing test) cannot
        # leak the mocked objects into whatever uses the context next
        if broker is not None:
            ctx.producer = previous_producer

        if read_only_session_factory is not None:
            ctx.read_only_session_factory = previous_read_only_session_factory

shared.transaction.read_only_session_factory module-attribute

read_only_session_factory = ReadOnlySessionFactory()

shared.transaction.register_event

register_event(event)

Register an event within the current transaction context

Source code in shared/transaction/core.py
def register_event(event: Message) -> None:
    """Register an event within the current transaction context

    Thin module-level shortcut: fetches the transaction context and delegates
    to its `register_event` method.
    """
    get_transaction_context().register_event(event)

shared.transaction.session

ReadOnlySessionFactory

ReadOnlySessionFactory()

Bases: ISessionFactory

Factory to build read-only sessions

Source code in shared/transaction/session.py
def __init__(self) -> None:
    """Initialise the factory with no read-only session maker set."""
    # NOTE(review): the `__call__` shown for this class builds a fresh
    # `sessionmaker` on every call and never reads or assigns this attribute;
    # confirm whether caching the maker here was intended.
    self.read_only_session_maker: Callable[..., ReadOnlySession] | None = None

__call__

__call__(**kwargs)
Source code in shared/transaction/session.py
@override
def __call__(self, **kwargs) -> ReadOnlySession:  # type: ignore[no-untyped-def]
    """Build and return a new read-only session.

    A `sessionmaker` bound to the read-only engine is assembled on every call,
    using the options returned by `_build_session_options` for
    `{"autoflush": False}`. `kwargs` are forwarded to that maker.
    """
    # function-scope import, kept local as in the original implementation
    from shared.models.orm.sqlalchemy import _build_session_options

    engine = get_engine(EngineType.READ_ONLY)
    options = _build_session_options({"autoflush": False})
    make_session = sessionmaker(bind=engine, class_=ReadOnlySession, **options)

    return make_session(**kwargs)  # type: ignore[no-any-return]

read_only_session_maker instance-attribute

read_only_session_maker = None

SessionFactory

SessionFactory()

Bases: ISessionFactory

Factory to build sessions

Source code in shared/transaction/session.py
def __init__(self) -> None:
    """Initialise the factory with no session maker set."""
    # NOTE(review): the `__call__` shown for this class builds a fresh
    # `sessionmaker` on every call and never reads or assigns this attribute;
    # confirm whether caching the maker here was intended.
    self.session_maker: Callable[..., Session] | None = None

__call__

__call__(**kwargs)
Source code in shared/transaction/session.py
@override
def __call__(self, **kwargs) -> Session:  # type: ignore[no-untyped-def]
    """Build and return a new session bound to the primary engine.

    A `sessionmaker` is assembled on every call, using the options returned by
    `_build_session_options` for an empty override dict. `kwargs` are
    forwarded to that maker.
    """
    # function-scope import, kept local as in the original implementation
    from shared.models.orm.sqlalchemy import _build_session_options

    engine = get_engine(EngineType.PRIMARY)
    options = _build_session_options({})
    make_session = sessionmaker(bind=engine, **options)

    # TODO: disable autobegin by default
    return make_session(**kwargs)  # type: ignore[no-any-return]

session_maker instance-attribute

session_maker = None

read_only_session_factory module-attribute

read_only_session_factory = ReadOnlySessionFactory()

session_factory module-attribute

session_factory = SessionFactory()

session_registry module-attribute

session_registry = scoped_session(session_factory)

shared.transaction.session_factory module-attribute

session_factory = SessionFactory()

shared.transaction.session_registry module-attribute

session_registry = scoped_session(session_factory)

shared.transaction.transaction

transaction(
    propagation=Propagation.REQUIRED, force_manage=False
)

Provide a transactional context manager for database operations.

This function creates and manages database transactions with various propagation behaviors. It integrates with the existing current_session for backward compatibility while providing explicit transaction management.

Parameters:

Name Type Description Default
propagation Propagation

The transaction propagation behavior. Defaults to REQUIRED. - REQUIRED: Use existing transaction if available, create new if not - REQUIRES_NEW: Always create a new transaction, suspend current if exists - NESTED: Create a nested transaction within the current one - READ_ONLY: Create a read-only transaction using read-only engine

REQUIRED
force_manage bool

Whether to take control of existing transactions. Defaults to False. When True, will commit/rollback existing transactions that weren't started by this context manager. Use with caution during migration from legacy code.

False

Returns:

Type Description
Iterator[Session]

Iterator[Session]: A SQLAlchemy session within the transaction context

Raises:

Type Description
InReadOnlyTransactionError

When trying to nest transactions in read-only context

NotImplementedError

When using an unsupported propagation type

Examples:

Basic usage with context manager:

>>> with transaction() as session:
...     session.add(User(name="Marcel"))

Using different propagation behaviors:

>>> with transaction(propagation=Propagation.REQUIRES_NEW) as session:
...     session.add(User(name="Alice"))  # always in its own transaction
>>> with transaction(propagation=Propagation.READ_ONLY) as session:
...     users = session.scalars(select(User)).all()  # read-only operations

Force managing existing transactions (migration helper):

>>> with transaction(force_manage=True) as session:
...     session.add(User(name="Bob"))  # will commit existing transaction
Note
  • Transactions are automatically committed on successful context exit
  • Transactions are automatically rolled back on exceptions
  • Nested contexts share the same transaction unless REQUIRES_NEW is used
  • Events registered via register_event() are dispatched on successful commit
  • In dry-run mode, all transactions are rolled back regardless of success
  • Read-only transactions are always rolled back to prevent any data changes
Source code in shared/transaction/core.py
@contextmanager
def transaction(
    propagation: Propagation = Propagation.REQUIRED, force_manage: bool = False
) -> Iterator[Session]:
    """
    Provide a transactional context manager for database operations.

    This function creates and manages database transactions with various propagation
    behaviors. It integrates with the existing `current_session` for backward
    compatibility while providing explicit transaction management.

    Args:
        propagation: The transaction propagation behavior. Defaults to REQUIRED.
            - REQUIRED: Use existing transaction if available, create new if not
            - REQUIRES_NEW: Always create a new transaction, suspend current if exists
            - NESTED: Create a nested transaction within the current one
            - READ_ONLY: Create a read-only transaction using read-only engine
        force_manage: Whether to take control of existing transactions. Defaults to False.
            When True, will commit/rollback existing transactions that weren't started
            by this context manager. Use with caution during migration from legacy code.

    Returns:
        Iterator[Session]: A SQLAlchemy session within the transaction context

    Raises:
        InReadOnlyTransactionError: When trying to nest transactions in read-only context
        NotImplementedError: When using an unsupported propagation type

    Examples:
        Basic usage with context manager:

        >>> with transaction() as session:
        ...     session.add(User(name="Marcel"))

        Using different propagation behaviors:

        >>> with transaction(propagation=Propagation.REQUIRES_NEW) as session:
        ...     session.add(User(name="Alice"))  # always in its own transaction

        >>> with transaction(propagation=Propagation.READ_ONLY) as session:
        ...     users = session.scalars(select(User)).all()  # read-only operations

        Force managing existing transactions (migration helper):

        >>> with transaction(force_manage=True) as session:
        ...     session.add(User(name="Bob"))  # will commit existing transaction

    Note:
        - Transactions are automatically committed on successful context exit
        - Transactions are automatically rolled back on exceptions
        - Nested contexts share the same transaction unless REQUIRES_NEW is used
        - Events registered via `register_event()` are dispatched on successful commit
        - In dry-run mode, all transactions are rolled back regardless of success
        - Read-only transactions are always rolled back to prevent any data changes
    """
    ctx: TransactionContext = get_transaction_context()

    # session must always be set at some point below
    session: Session

    # transaction is only set by the context manager that starts it
    # (or that takes it over via `force_manage`); when it stays None this block
    # neither commits nor rolls back, it only flushes on success.
    # NOTE: this local shadows the function's own name inside the body.
    transaction: SessionTransaction | None = None

    # track if we created a new session (it must then be popped in `finally`)
    is_new_session: bool = False

    # prevent nesting in read-only context
    if ctx.is_in_read_only:
        raise InReadOnlyTransactionError()

    match propagation:
        case Propagation.REQUIRED:
            # use the current session if we have one, create one if needed
            if ctx.current_session is not None:
                session = ctx.current_session

                # special handling of a non-clean outermost context
                if (
                    ctx.use_global_session
                    and ctx.depth == 0
                    and session.in_transaction()
                ):
                    if force_manage:
                        # take control of the transaction by faking that we started it
                        transaction = session.get_transaction()
                        logger.info("Forcing management of existing outer transaction")
                    else:
                        # check if we altered the session (did more than querying)
                        if session.new or session.deleted or session.dirty:
                            logger.warning(
                                "`global_session` is altered when entering transaction block!"
                                " ⚠️ As it is, this transaction block will not commit/rollback, you will have to handle that!"
                                " Set `force_manage` to True for the transaction block to handle the commit/rollback"
                            )
                        else:
                            logger.debug(
                                "`global_session` is already in a transaction!"
                                " ⚠️ As it is, this transaction block will not commit/rollback, you will have to handle that!"
                                " Set `force_manage` to True for the transaction block to handle the commit/rollback"
                            )

                # start transaction if we are not in one
                if ctx.use_global_session and not session.in_transaction():
                    transaction = session.begin()

                # we should already be in a transaction
                # FIXME: we can be in this situation legitimately if
                # - we don't use a global session but have a current session (e.g. REQUIRED)
                # - and we're currently publishing events for this transaction context
                # - the sync subscriber opens a transaction with REQUIRED propagation
                # IMO we should forbid sync subscribers to use anything else than REQUIRES_NEW or READ_ONLY propagations
                # to avoid such situations and even when using a global session actually
                # NOTE: This is the same assert below for NESTED propagation
                assert session.in_transaction(), (
                    "Re-using a session without a transaction"
                )
            else:
                # create a new session and transaction
                session = ctx.create_session()
                transaction = session.begin()
                is_new_session = True
        case Propagation.REQUIRES_NEW:
            # create a new session and transaction
            session = ctx.create_session(disable_expire_on_commit=True)
            transaction = session.begin()
            is_new_session = True
        case Propagation.NESTED:
            if ctx.current_session is not None:
                session = ctx.current_session

                # start transaction if we are not in one
                if ctx.use_global_session and not session.in_transaction():
                    transaction = session.begin()
                else:
                    # we should already be in a transaction
                    assert session.in_transaction(), (
                        "Re-using a session without a transaction"
                    )

                    # start a new nested transaction
                    transaction = session.begin_nested()
            else:
                # create a new session and transaction
                session = ctx.create_session()
                transaction = session.begin()
                is_new_session = True
        case Propagation.READ_ONLY:
            # create a new read-only session
            session = ctx.create_session(read_only=True)
            transaction = session.begin()
            is_new_session = True
        case _:
            raise NotImplementedError("Propagation not implemented yet")

    # one more open transaction() block on this context; undone in `finally`
    ctx.depth += 1

    logger.debug(f"Entering transaction context at depth {ctx.depth}")
    try:
        with ctx.commit_guard(session):
            yield session
    except Exception:
        # NOTE(review): only `Exception` subclasses are handled here; a
        # BaseException such as KeyboardInterrupt skips the event clearing and
        # the explicit rollback and goes straight to `finally`.
        logger.debug("Exception detected within context")
        ctx.clear_pending_events()

        # only rollback if we started the transaction
        if transaction is not None:
            logger.debug(f"Rolling back transaction at depth {ctx.depth}")
            transaction.rollback()
        raise
    else:
        ctx.approve_pending_events()

        # only commit (or rollback in read-only/dry-run) if we started the transaction
        if transaction is not None:
            # `ctx.is_in_read_only` was False on entry (we raise otherwise), which
            # is why the type checker flags this branch as unreachable; presumably
            # `create_session(read_only=True)` flips it - TODO confirm.
            # No explicit commit/rollback happens here: for READ_ONLY propagation
            # the session is closed in the `finally` block below.
            if ctx.is_in_read_only:
                logger.debug(  # type: ignore[unreachable]
                    f"Skipping commit/rollback for read-only transaction at depth {ctx.depth}"
                )
            elif is_dry_run():
                logger.debug(
                    f"Rolling back for dry-run transaction at depth {ctx.depth}"
                )
                transaction.rollback()
            else:
                logger.debug(f"Committing transaction at depth {ctx.depth}")
                with ctx.commit_guard_bypass():
                    transaction.commit()

            # dispatch events either way
            # (`parent` is None only for a root transaction, so nested
            # transactions/savepoints do not trigger publication)
            if transaction.parent is None:
                ctx.publish_events()
        else:
            # we should flush for changes to be visible in the session's subsequent queries
            logger.debug(f"Flushing session at depth {ctx.depth}")
            session.flush()
    finally:
        logger.debug(f"Exiting transaction context at depth {ctx.depth}")

        # remove from stack if we added it
        # NOTE(review): if `pop_session()` or the assert below raises, the
        # `ctx.depth -= 1` at the end of this block is skipped.
        if is_new_session:
            popped = ctx.pop_session()
            logger.debug(f"Popping session at depth {ctx.depth}")

            assert popped is session, "Session stack corrupted"

            # ensure we cleaned up properly
            # (READ_ONLY sessions are always closed; other sessions created here
            # are closed only once they are no longer in a transaction, and not
            # for the outermost block of a global session)
            if propagation == Propagation.READ_ONLY or (
                (not ctx.use_global_session or ctx.depth > 0)
                and not popped.in_transaction()
            ):
                logger.debug(f"Closing session at depth {ctx.depth}")
                popped.close()

        ctx.depth -= 1

shared.transaction.transactional

transactional(
    func: Callable[Concatenate[Session, P], T],
) -> Callable[P, T]
transactional(
    *,
    propagation: Propagation = Propagation.REQUIRED,
    force_manage: bool = False
) -> Callable[
    [Callable[Concatenate[Session, P], T]], Callable[P, T]
]
transactional(
    func=None,
    propagation=Propagation.REQUIRED,
    force_manage=False,
)

Decorator that provides transactional context around a function

The decorated function must accept the session as its first positional argument; the decorator injects it on every call.

>>> @transactional
... def my_function(session: Session, arg1, arg2):
...     # do something with the session

Source code in shared/transaction/core.py
def transactional(
    func: Callable[Concatenate[Session, P], T] | None = None,
    propagation: Propagation = Propagation.REQUIRED,
    force_manage: bool = False,
) -> Callable[P, T] | Callable[[Callable[Concatenate[Session, P], T]], Callable[P, T]]:
    """
    Decorator that provides transactional context around a function

    The decorated function must accept the session as its first positional
    argument; the decorator opens a `transaction()` block on every call and
    injects that block's session, followed by the caller's own arguments.

    Can be used bare (`@transactional`) or with options
    (`@transactional(propagation=..., force_manage=...)`).

    Args:
        func: The function to decorate when used bare; None when the decorator
            is called with options first.
        propagation: Forwarded to `transaction()`. Defaults to REQUIRED.
        force_manage: Forwarded to `transaction()`. Defaults to False.

    Returns:
        The wrapped function when `func` is given, otherwise a decorator that
        wraps the function it receives.

    >>> @transactional
    ... def my_function(session: Session, arg1, arg2):
    ...     # do something with the session
    """

    def decorator(func: Callable[Concatenate[Session, P], T]) -> Callable[P, T]:
        @functools.wraps(func)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            # exceptions propagate out of the `with` block on their own, which
            # lets `transaction()` see them and roll back; no re-raise needed
            with transaction(
                propagation=propagation, force_manage=force_manage
            ) as session:
                return func(session, *args, **kwargs)

        return wrapper

    # handle both @transactional and @transactional(...) syntax
    if func is None:
        return decorator

    return decorator(func)

shared.transaction.unit_of_work

UnitOfWork

UnitOfWork()

Building block to provide transaction context in a unit of work fashion

Source code in shared/transaction/unit_of_work.py
def __init__(self) -> None:
    # Annotation-only declarations: nothing is assigned here, so neither
    # attribute exists on the instance until `__enter__` sets them.
    self.transaction_ctx: AbstractContextManager[Session]
    self.session: Session

__enter__

__enter__()
Source code in shared/transaction/unit_of_work.py
def __enter__(self) -> Self:
    """Open a default `transaction()` block and expose its session on `self`."""
    opened_ctx = transaction()
    self.transaction_ctx = opened_ctx
    # keep the session yielded by the transaction block for callers to use
    self.session = opened_ctx.__enter__()
    return self

__exit__

__exit__(exc_type, exc_value, traceback)
Source code in shared/transaction/unit_of_work.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    """Forward the exit to the transaction context opened in `__enter__`.

    The delegate's return value is discarded and this method returns None,
    so an exception raised inside the `with` block is never suppressed here.
    """
    self.transaction_ctx.__exit__(exc_type, exc_value, traceback)

session instance-attribute

session

transaction_ctx instance-attribute

transaction_ctx