SQL Model Brokers

SQL Model Brokers are a data access pattern used to centralize all database queries and prevent common performance issues.

In the Payment Gateway component, Model Brokers are MANDATORY for all database access. This is enforced as a code quality standard for this component.

Purpose

Model Brokers serve as the single point of access for all database operations on a specific SQLAlchemy model. They solve several critical problems:

1. Prevents N+1 Queries

Without Model Brokers, developers might write code that looks simple but triggers hundreds of database queries:

# ❌ BAD: N+1 query problem
cards = session.query(Card).filter(Card.card_holder_id == holder_id).all()
for card in cards:
    print(card.current_status_log.status)  # Triggers a query for each card!

Model Brokers enforce explicit eager loading, making performance characteristics visible:

# ✅ GOOD: Explicit eager loading via Model Broker
cards = CardModelBroker.list_cards_for_card_holder(
    session,
    card_holder_id=holder_id,
)
# current_status_log is already loaded via autoload configuration
for card in cards:
    print(card.current_status_log.status)  # No additional queries!
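
The difference in query count can be reproduced without SQLAlchemy. The following is a minimal sketch using the standard-library sqlite3 module; the table layout, the `query` helper, and the `statuses_lazy` / `statuses_eager` functions are assumptions made for illustration and are not part of the Payment Gateway code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE cards (id INTEGER PRIMARY KEY, card_holder_id INTEGER);
    CREATE TABLE card_status_logs (card_id INTEGER, status TEXT);
    """
)
conn.executemany("INSERT INTO cards VALUES (?, 1)", [(i,) for i in range(1, 6)])
conn.executemany(
    "INSERT INTO card_status_logs VALUES (?, 'active')", [(i,) for i in range(1, 6)]
)

executed = []  # every SQL statement sent through query()


def query(sql, params=()):
    """Run one statement and record it, so round trips can be counted."""
    executed.append(sql)
    return conn.execute(sql, params).fetchall()


def statuses_lazy(card_holder_id):
    # One query for the cards, then one more per card: the N+1 pattern.
    cards = query("SELECT id FROM cards WHERE card_holder_id = ?", (card_holder_id,))
    return [
        query("SELECT status FROM card_status_logs WHERE card_id = ?", (card_id,))[0][0]
        for (card_id,) in cards
    ]


def statuses_eager(card_holder_id):
    # A single JOIN fetches the cards and their status together.
    rows = query(
        "SELECT s.status FROM cards c "
        "JOIN card_status_logs s ON s.card_id = c.id "
        "WHERE c.card_holder_id = ?",
        (card_holder_id,),
    )
    return [status for (status,) in rows]


executed.clear()
lazy_result = statuses_lazy(1)
lazy_query_count = len(executed)  # 1 query for the cards + 1 per card

executed.clear()
eager_result = statuses_eager(1)
eager_query_count = len(executed)  # a single query
```

With five cards the lazy variant issues six statements and the eager variant one; the gap grows linearly with the number of cards.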

2. Centralizes Query Logic

All queries for a model live in one place, making them:

  • Easy to find: Want to know how to fetch a card? Look in CardModelBroker
  • Easy to optimize: Manage autoload configuration in one place
  • Easy to test: All database behavior tested in one place, catching N+1 queries and edge cases
  • Consistent: Same patterns across all models

3. Enforces Best Practices

Model Brokers enforce:

  • SQLAlchemy 2.0 syntax: Using select() instead of legacy Query API
  • Explicit relationship loading: No hidden lazy loading
  • Type safety: Clear method signatures with type hints
  • Session management: Session passed explicitly, never implicit

4. Supports Testability

Model Brokers are tested using factories and the real database, without mocking:

@pytest.mark.usefixtures("db")
def test_get_card():
    # Arrange - Create test data using factories
    card = CardFactory.create(
        display_name="John Doe",
        expiration_date=date(2023, 11, 30),
        provider=PaymentServiceProvider.adyen,
    )
    current_session.commit()

    # Act - Test the actual broker method
    result = CardModelBroker.get_card(current_session, id=card.id)

    # Assert - Verify results
    assert result.display_name == "John Doe"
    assert result.expiration_date == date(2023, 11, 30)

Why no mocking?

  • Tests verify actual database behavior
  • Catches SQL issues and N+1 queries
  • Tests are more reliable and reflect production behavior
  • Factories provide realistic test data

Architecture

Base Model Broker

All Model Brokers inherit from BaseModelBroker and define:

  • model: The SQLAlchemy model class this broker operates on (required)
  • autoload: Dictionary of relationships to eagerly load with every query (optional)
  • lazy_loading_strategy: Protection against accidental lazy loading (optional)

Complete example:

from shared.model_brokers.base_model_broker import BaseModelBroker, LazyLoadingStrategy
from components.payment_gateway.subcomponents.cards.models.card import Card

class CardModelBroker(BaseModelBroker):
    """Centralized data access for Card model."""

    # Required: The SQLAlchemy model this broker operates on
    model = Card

    # Optional: Relationships to always eager load
    autoload = {
        "current_status_log": True,  # Always load the current status
    }

    # Optional: Raise exception on accidental lazy loading
    lazy_loading_strategy = LazyLoadingStrategy.RAISE

Important: There is exactly one broker per model. The broker centralizes all database access for that model.

Key Features

autoload Dictionary

Defines relationships that should always be eager loaded:

autoload = {
    "current_status_log": True,              # Simple relationship
    "status_history": True,                   # Collection
    "account": {                              # Nested eager loading
        "account_holder": True,
    }
}
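
One way to read a nested autoload dictionary is as a set of relationship paths. The sketch below flattens the dictionary above into dotted paths. How BaseModelBroker actually interprets the dictionary is not shown in this document, so the `autoload_paths` helper and the dotted-path notation are assumptions for illustration only.

```python
def autoload_paths(autoload, prefix=""):
    """Flatten a nested autoload dictionary into dotted relationship paths."""
    paths = []
    for name, value in autoload.items():
        path = f"{prefix}{name}"
        if value is True:
            paths.append(path)
        elif isinstance(value, dict):
            # A nested dict means: load this relationship, then its children.
            paths.append(path)
            paths.extend(autoload_paths(value, prefix=f"{path}."))
        # Any other value (for example False) is treated as "do not load".
    return paths


autoload = {
    "current_status_log": True,
    "status_history": True,
    "account": {
        "account_holder": True,
    },
}

paths = autoload_paths(autoload)
```

Here `paths` lists `current_status_log`, `status_history`, `account`, and `account.account_holder`, which makes it easy to see at a glance how deep the eager loading reaches.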

select() Method

Returns a SQLAlchemy 2.0 select statement with autoload options:

# Default: uses the broker's autoload configuration
query = cls.select()

# Custom autoload: REPLACES (not merges with) the default autoload
query = cls.select(custom_autoload={})  # No eager loading at all
query = cls.select(custom_autoload={"account": True})  # Only load account, ignore defaults

# Then use it
card = session.execute(query.filter(Card.id == card_id)).scalar_one()
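
Because custom_autoload replaces the defaults, extending them has to be done explicitly, for example with the dict union operator that get_card uses (`cls.autoload | {...}`). The sketch below works on plain dictionaries only; the `default_autoload` value is an assumed example, not a real broker configuration.

```python
default_autoload = {
    "current_status_log": True,
    "account": {"account_holder": True},
}

# Passing a fresh dict replaces the defaults: current_status_log is gone.
replaced = {"status_history": True}

# The union operator keeps the defaults and adds one relationship.
extended = default_autoload | {"status_history": True}

# The union is shallow: a key present on both sides takes the right-hand
# value, so the nested "account_holder" entry is dropped here.
overridden = default_autoload | {"account": True}
```

The shallow behaviour of `|` is worth keeping in mind when the default configuration contains nested entries.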

Lazy Loading Strategy

Optional protection against accidental lazy loading:

class CardModelBroker(BaseModelBroker):
    model = Card
    lazy_loading_strategy = LazyLoadingStrategy.RAISE  # Raises exception on lazy load

Implementation Guidelines

When implementing Model Brokers, follow these core rules:

  • Always accept session as first positional-only parameter: def get_card(cls, session: Session, /, ...)
  • Never commit: Use session.flush() only - let callers control transactions
  • Configure eager loading: Define commonly needed relationships in autoload dictionary
  • Use LazyLoadingStrategy.RAISE in tests: Catch accidental lazy loading during development
  • Never use current_session in broker methods: Always accept session parameter (callers can pass current_session)

Naming Conventions

Model Brokers follow strict naming conventions to ensure consistency and discoverability.

Broker Class Names

Pattern: {Model}ModelBroker

# Model: Card
class CardModelBroker(BaseModelBroker):
    ...

# Model: Account
class AccountModelBroker(BaseModelBroker):
    ...

# Model: LineOfCredit
class LineOfCreditModelBroker(BaseModelBroker):
    ...

Method Names

Method names follow semantic patterns that clearly communicate their behavior.

Quick Reference

Method Prefix   Returns                              Use When
get_*           Single model (raises if not found)   You need exactly one model
find_*          Optional model (None if not found)   Model might not exist
list_*          List of models                       Fetching multiple models by simple criteria
search_*        List of models                       Multi-criteria queries with optional, combinable filters
count_*         Integer count                        Count models without loading full data
create_*        New model                            Creating models
update_*        Updated model                        Modifying models
set_*           None or model                        State transitions
terminate_*     None or model                        Soft deletes
record_*        (model, bool)                        Atomic idempotent creation from external events

get_* - Single Model (Required)

Returns exactly one model or raises an exception.

Pattern: get_{model}[_{qualifier}]

When to use:

  • Model must exist
  • Caller should handle NoResultFound exception
  • Most common pattern for lookups by ID or unique fields
Examples
components.payment_gateway.subcomponents.cards.models.brokers.card.CardModelBroker.get_card classmethod
get_card(session, /, id, with_status_history=False)
Source code in components/payment_gateway/subcomponents/cards/models/brokers/card.py
@classmethod
def get_card(
    cls,
    session: Session,
    /,
    id: UUID,
    with_status_history: bool = False,
) -> Card:
    if with_status_history:
        query = cls.select(custom_autoload=cls.autoload | {"status_history": True})
    else:
        query = cls.select()
    card: Card = session.execute(query.filter(Card.id == id)).scalar_one()
    return card
components.payment_gateway.subcomponents.cards.models.brokers.card.CardModelBroker.get_card_by_external_id classmethod
get_card_by_external_id(session, /, provider, external_id)
Source code in components/payment_gateway/subcomponents/cards/models/brokers/card.py
@classmethod
def get_card_by_external_id(
    cls,
    session: Session,
    /,
    provider: PaymentServiceProvider,
    external_id: str,
) -> Card:
    card: Card = session.execute(
        cls.select().filter(
            Card.provider == provider, Card.external_id == external_id
        )
    ).scalar_one()
    return card
components.payment_gateway.subcomponents.cards.models.brokers.card.CardModelBroker.get_card_id_by_external_id classmethod
get_card_id_by_external_id(
    session, /, provider, external_id
)
Source code in components/payment_gateway/subcomponents/cards/models/brokers/card.py
@classmethod
def get_card_id_by_external_id(
    cls,
    session: Session,
    /,
    provider: PaymentServiceProvider,
    external_id: str,
) -> UUID:
    card: Card = session.execute(
        cls.select(custom_autoload={})
        .filter(Card.provider == provider, Card.external_id == external_id)
        .options(load_only(Card.id))
    ).scalar_one()
    return card.id

find_* - Optional Single Model

Returns Model or None if not found.

Pattern: find_{model}_by_{criterion}

When to use:

  • Looking up a single model by specific criteria
  • Model might not exist (valid scenario)
  • Simple lookups (e.g., by ID, external ID, unique field)
Examples
components.payment_gateway.subcomponents.transfers.models.brokers.bank_transfer.BankTransferModelBroker.find_bank_transfer_by_id classmethod
find_bank_transfer_by_id(session, /, id)
Source code in components/payment_gateway/subcomponents/transfers/models/brokers/bank_transfer.py
@classmethod
def find_bank_transfer_by_id(
    cls,
    session: Session,
    /,
    id: UUID,
) -> BankTransfer | None:
    return session.execute(
        cls.select().filter(BankTransfer.id == id)
    ).scalar_one_or_none()
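
The contrast between find_* and get_* comes down to how absence is reported. The sketch below shows both behaviours side by side using the standard-library sqlite3 module. The table layout, the integer IDs, the `get_bank_transfer` function, and the local `NoResultFound` class (a stand-in for sqlalchemy.exc.NoResultFound) are assumptions for illustration.

```python
import sqlite3


class NoResultFound(Exception):
    """Local stand-in for sqlalchemy.exc.NoResultFound."""


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE bank_transfers (id INTEGER PRIMARY KEY, external_id TEXT)")
conn.execute("INSERT INTO bank_transfers VALUES (1, 'ext-1')")


def find_bank_transfer_by_id(conn, /, id):
    # Absence is a valid outcome: return None instead of raising.
    return conn.execute(
        "SELECT id, external_id FROM bank_transfers WHERE id = ?", (id,)
    ).fetchone()


def get_bank_transfer(conn, /, id):
    # Absence is an error: the caller asked for a row that must exist.
    row = find_bank_transfer_by_id(conn, id)
    if row is None:
        raise NoResultFound(f"No bank transfer with id {id}")
    return row


found = find_bank_transfer_by_id(conn, 1)
missing = find_bank_transfer_by_id(conn, 42)

try:
    get_bank_transfer(conn, 42)
    raised = False
except NoResultFound:
    raised = True
```

Callers of find_* must handle None, while callers of get_* can rely on a model being returned and treat the exception as a genuine error.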

list_* - Multiple Models

Returns a list of models (possibly empty).

Pattern: list_{models}_for_{parent} or list_{models}_by_{criteria}

Optimization pattern: Create both list_{models} and list_{model}_ids methods:

  • Use list_{model}_ids when you only need IDs (avoids loading full objects)
  • Use load_only() and custom_autoload={} for ID-only queries

When to use:

  • Fetching multiple models
  • Results might be empty list (valid scenario)
  • Common for "all models for parent" queries
Examples
components.payment_gateway.subcomponents.cards.models.brokers.card.CardModelBroker.list_cards_for_card_holder classmethod
list_cards_for_card_holder(
    session, /, card_holder_id, with_terminated=False
)
Source code in components/payment_gateway/subcomponents/cards/models/brokers/card.py
@classmethod
def list_cards_for_card_holder(
    cls,
    session: Session,
    /,
    card_holder_id: UUID,
    with_terminated: bool = False,
) -> list[Card]:
    query = cls.select().filter(Card.card_holder_id == card_holder_id)
    if not with_terminated:
        query = query.filter(Card.terminated_at.is_(None))
    return list(session.execute(query).scalars().all())
components.payment_gateway.subcomponents.cards.models.brokers.card.CardModelBroker.list_card_ids_for_card_holder classmethod
list_card_ids_for_card_holder(
    session, /, card_holder_id, with_terminated=False
)
Source code in components/payment_gateway/subcomponents/cards/models/brokers/card.py
@classmethod
def list_card_ids_for_card_holder(
    cls,
    session: Session,
    /,
    card_holder_id: UUID,
    with_terminated: bool = False,
) -> list[UUID]:
    query = (
        cls.select(custom_autoload={})
        .filter(Card.card_holder_id == card_holder_id)
        .options(load_only(Card.id))
    )
    if not with_terminated:
        query = query.filter(Card.terminated_at.is_(None))
    cards = session.execute(query).scalars().all()
    return [card.id for card in cards]

search_* - Multi-Criteria Search

Returns a list of models using multiple, combinable filter criteria.

Pattern: search_{models} (optionally with _by_{criteria} or _for_{context} suffix)

When to use:

  • Queries need multiple optional filter parameters
  • Callers should be able to combine different criteria (account IDs, date ranges, statuses, etc.)
  • Queries require ordering, limits, or pagination
  • Flexibility in query construction is important

Key characteristics:

  • Most parameters are optional (use | None type hints)
  • Callers can combine any subset of filters
  • Often includes ordering options (order_by_* parameters)
  • Returns list (possibly empty)

Distinction from list_*:

  • list_* → Simple queries with required criteria (e.g., "all cards for this holder")
  • search_* → Multi-criteria queries with optional, combinable filters
Examples
components.payment_gateway.subcomponents.transfers.models.brokers.account_transfer.AccountTransferModelBroker.search_account_transfers classmethod
search_account_transfers(
    session,
    /,
    *,
    account_ids=None,
    transfer_history_ids=None,
    start_effective_date=None,
    end_effective_date=None,
    order_by_created_at=False,
)
Source code in components/payment_gateway/subcomponents/transfers/models/brokers/account_transfer.py
@classmethod
def search_account_transfers(
    cls,
    session: Session,
    /,
    *,
    account_ids: list["AccountId"] | None = None,
    transfer_history_ids: list["TransferHistoryId"] | None = None,
    start_effective_date: datetime | None = None,
    end_effective_date: datetime | None = None,
    order_by_created_at: bool = False,
) -> list[AccountTransfer]:
    """Find all the account transfers for the given time period.

    Note: this will include TransferEvents that are out of bounds of the given time period.
    If at some point we want a function that returns only the events in the given time period,
    we should create a new method based on [this](https://stackoverflow.com/questions/67578639/flask-sqlalchemy-filter-objects-in-relationship-for-each-object)
    and not forget to add the .outerjoin.

    """
    query = cls.select()
    if account_ids is not None:
        query = query.filter(AccountTransfer.account_id.in_(account_ids))
    if transfer_history_ids is not None:
        query = query.filter(
            AccountTransfer.transfer_history_id.in_(transfer_history_ids)
        )

    if start_effective_date:
        query = query.filter(AccountTransfer.effective_date >= start_effective_date)
    if end_effective_date:
        query = query.filter(AccountTransfer.effective_date <= end_effective_date)

    # Build ordering
    orderings = []
    if order_by_created_at:
        orderings.append(AccountTransfer.created_at.desc())
    orderings.append(AccountTransfer.effective_date.desc())

    query = query.order_by(*orderings)

    return list(session.execute(query).scalars().all())
components.payment_gateway.subcomponents.transfers.models.brokers.bank_transfer.BankTransferModelBroker.search_bank_transfers classmethod
search_bank_transfers(
    session,
    /,
    *,
    account_ids=None,
    transfer_history_ids=None,
    start_effective_date=None,
    end_effective_date=None,
    order_by_created_at=False,
)
Source code in components/payment_gateway/subcomponents/transfers/models/brokers/bank_transfer.py
@classmethod
def search_bank_transfers(
    cls,
    session: Session,
    /,
    *,
    account_ids: list["AccountId"] | None = None,
    transfer_history_ids: list["TransferHistoryId"] | None = None,
    start_effective_date: datetime | None = None,
    end_effective_date: datetime | None = None,
    order_by_created_at: bool = False,
) -> list[BankTransfer]:
    """Find all the bank transfers in a transfer history for the given time period.

    Note: this will include TransferEvents that are out of bounds of the given time period.
    If at some point we want a function that returns only the events in the given time period,
    we should create a new method based on [this](https://stackoverflow.com/questions/67578639/flask-sqlalchemy-filter-objects-in-relationship-for-each-object)
    and not forget to add the .outerjoin.

    """
    query = cls.select()
    if account_ids is not None:
        query = query.filter(BankTransfer.account_id.in_(account_ids))
    if transfer_history_ids is not None:
        query = query.filter(
            BankTransfer.transfer_history_id.in_(transfer_history_ids)
        )

    if start_effective_date:
        query = query.filter(BankTransfer.effective_date >= start_effective_date)
    if end_effective_date:
        query = query.filter(BankTransfer.effective_date <= end_effective_date)

    # Build ordering
    orderings = []
    if order_by_created_at:
        orderings.append(BankTransfer.created_at.desc())
    orderings.append(BankTransfer.effective_date.desc())

    query = query.order_by(*orderings)

    return list(session.execute(query).scalars().all())

count_* - Count Models

Returns the count of models matching criteria without loading full objects.

Pattern: count_{models}_for_{parent} or count_{models}_by_{criteria}

When to use:

  • Need to know quantity (or check existence) without loading data
  • Performance-critical operations (avoid loading full objects)
  • Pagination or reporting features
Examples

Info

This pattern is currently unused and is planned for future adoption.
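
Since no broker implements this pattern yet, the following is only a sketch of what a count_* method could look like, written against the standard-library sqlite3 module so that it runs on its own. The function name, table layout, and with_terminated flag mirror list_cards_for_card_holder but are assumptions; a SQLAlchemy 2.0 version would presumably build the statement with `select(func.count())` instead of raw SQL.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE cards ("
    "id INTEGER PRIMARY KEY, card_holder_id INTEGER, terminated_at TEXT)"
)
conn.executemany(
    "INSERT INTO cards (card_holder_id, terminated_at) VALUES (?, ?)",
    [(1, None), (1, None), (1, "2023-11-30T00:00:00"), (2, None)],
)


def count_cards_for_card_holder(conn, /, card_holder_id, with_terminated=False):
    # The database does the counting; no card rows are loaded into Python.
    sql = "SELECT COUNT(*) FROM cards WHERE card_holder_id = ?"
    if not with_terminated:
        sql += " AND terminated_at IS NULL"
    (count,) = conn.execute(sql, (card_holder_id,)).fetchone()
    return count


active_count = count_cards_for_card_holder(conn, 1)
total_count = count_cards_for_card_holder(conn, 1, with_terminated=True)
unknown_holder_count = count_cards_for_card_holder(conn, 99)
```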

create_* - Create Model

Creates a new model and returns it.

Pattern: create_{model}

When to use:

  • Creating new models
  • Use session.flush() to get ID without committing
  • Create related models together (e.g., Card + CardStatusLog)
Examples
components.payment_gateway.subcomponents.cards.models.brokers.card.CardModelBroker.create_card classmethod
create_card(
    session,
    /,
    *,
    provider,
    external_id,
    display_name,
    expiration_date,
    last_four_digits,
    is_virtual,
    description=None,
    reference=None,
    issued_at=None,
    issuance_reason=None,
    id=None,
    card_holder_id=None,
    account_id=None,
    status=CardStatus.inactive,
)
Source code in components/payment_gateway/subcomponents/cards/models/brokers/card.py
@classmethod
def create_card(
    cls,
    session: Session,
    /,
    *,
    provider: PaymentServiceProvider,
    external_id: str,
    display_name: str,
    expiration_date: date,
    last_four_digits: str,
    is_virtual: bool,
    description: str | None = None,
    reference: str | None = None,
    issued_at: datetime | None = None,
    issuance_reason: str | None = None,
    id: UUID | None = None,
    card_holder_id: UUID | None = None,
    account_id: UUID | None = None,
    status: CardStatus = CardStatus.inactive,
) -> Card:
    card = cls.model(
        id=id,
        provider=provider,
        external_id=external_id,
        display_name=display_name,
        expiration_date=expiration_date,
        last_four_digits=last_four_digits,
        is_virtual=is_virtual,
        description=description,
        reference=reference,
        issued_at=issued_at,
        issuance_reason=issuance_reason,
        card_holder_id=card_holder_id,
        account_id=account_id,
    )
    status_log = CardStatusLog(card=card, status=status)
    session.add(card)
    session.add(status_log)
    session.flush()

    return card
components.payment_gateway.subcomponents.accounts.models.brokers.account.AccountModelBroker.create_account classmethod
create_account(
    session,
    /,
    *,
    provider,
    external_id,
    description,
    reference=None,
    account_holder_id=None,
    status=AccountStatus.inactive,
)
Source code in components/payment_gateway/subcomponents/accounts/models/brokers/account.py
@classmethod
def create_account(
    cls,
    session: Session,
    /,
    *,
    provider: PaymentServiceProvider,
    external_id: str,
    description: str,
    reference: str | None = None,
    account_holder_id: UUID | None = None,
    status: AccountStatus = AccountStatus.inactive,
) -> Account:
    account = cls.model(
        provider=provider,
        external_id=external_id,
        description=description,
        reference=reference,
        account_holder_id=account_holder_id,
    )
    status_log = AccountStatusLog(account=account, status=status)
    session.add(account)
    session.add(status_log)
    session.flush()

    return account

update_* - Update Model

Updates an existing model and returns it.

Pattern: update_{model}

When to use:

  • Modifying existing models
  • Use model.assign(dict) helper for bulk updates
Examples
components.payment_gateway.subcomponents.accounts.models.brokers.account.AccountModelBroker.update_account classmethod
update_account(session, /, id, **data)
Source code in components/payment_gateway/subcomponents/accounts/models/brokers/account.py
@classmethod
def update_account(
    cls,
    session: Session,
    /,
    id: UUID,
    **data: Unpack[AccountUpdate],
) -> Account:
    account = cls.get_account(session, id)
    account.assign(data)

    return account

set_* - State Transition

Changes a specific aspect of a model (typically state/status).

Pattern: set_{model}_{aspect}

When to use:

  • State transitions (status changes)
  • Changing specific properties that need special handling
  • Often returns None since caller already has the model
Examples
components.payment_gateway.subcomponents.cards.models.brokers.card.CardModelBroker.set_card_status classmethod
set_card_status(
    session,
    /,
    id,
    status,
    reason=None,
    suspension_source=None,
)
Source code in components/payment_gateway/subcomponents/cards/models/brokers/card.py
@classmethod
def set_card_status(
    cls,
    session: Session,
    /,
    id: UUID,
    status: CardStatus,
    reason: str | None = None,
    suspension_source: CardSuspensionSource | None = None,
) -> None:
    card = cls.get_card(session, id)
    if card.status == status:
        return
    status_log = CardStatusLog(
        card=card, status=status, reason=reason, suspension_source=suspension_source
    )
    session.add(status_log)
    session.refresh(card)
components.payment_gateway.subcomponents.accounts.models.brokers.account.AccountModelBroker.set_account_status classmethod
set_account_status(session, /, id, status)
Source code in components/payment_gateway/subcomponents/accounts/models/brokers/account.py
@classmethod
def set_account_status(
    cls,
    session: Session,
    /,
    id: UUID,
    status: AccountStatus,
) -> None:
    account = cls.get_account(session, id)
    if account.status == status:
        return
    status_log = AccountStatusLog(account=account, status=status)
    session.add(status_log)
    session.refresh(account)

terminate_* - Soft Delete

Marks a model as terminated (soft delete).

Pattern: terminate_{model}

When to use:

  • Soft deletes (setting terminated_at timestamp)
  • Never hard delete models (for audit trail)
Examples
components.payment_gateway.subcomponents.cards.models.brokers.card.CardModelBroker.terminate_card classmethod
terminate_card(session, /, id)
Source code in components/payment_gateway/subcomponents/cards/models/brokers/card.py
@classmethod
def terminate_card(
    cls,
    session: Session,
    /,
    id: UUID,
) -> Card:
    card = cls.get_card(session, id)
    card.terminated_at = datetime.now()

    return card
components.payment_gateway.subcomponents.accounts.models.brokers.account.AccountModelBroker.terminate_account classmethod
terminate_account(session, /, id)
Source code in components/payment_gateway/subcomponents/accounts/models/brokers/account.py
@classmethod
def terminate_account(
    cls,
    session: Session,
    /,
    id: UUID,
) -> Account:
    account = cls.get_account(session, id)
    account.terminated_at = datetime.now()

    return account

record_* - Atomic Idempotent Create

Records a model using lock-free atomic creation via database UniqueConstraint.

Pattern: record_{model}

Key characteristics:

  • Atomic: Uses database UniqueConstraint for lock-free creation
  • Idempotent: Safe to call multiple times with same data
  • Returns existing unchanged: If model exists, new attribute values are ignored and existing model is returned as-is
  • No updates: This is NOT an upsert - existing models are never modified
  • Returns tuple of (model, was_created: bool)

When to use:

  • Recording external event data (webhooks, API responses)
  • Processing events that may be delivered multiple times
  • Ensuring exactly-once semantics for external data

Important: For true idempotency, always call with the same values for the same logical entity. If you need to update existing models, use update_* or set_* methods instead.

Implementation pattern

The model must define a UniqueConstraint on the fields that uniquely identify the logical entity:

class BankTransfer(BaseModel):
    __tablename__ = "bank_transfers"

    id = Column(UUID, primary_key=True, default=uuid4)
    external_id = Column(String, nullable=False)
    provider = Column(String, nullable=False)
    # ... other fields ...

    __table_args__ = (
        UniqueConstraint(
            "provider",
            "external_id",
            name="bank_transfer__unique_external_id_per_provider",
        ),
    )

The broker's record_* method relies on this constraint to ensure atomicity - if two concurrent calls try to create the same entity, the database will reject one and the broker will retrieve the existing entity instead.

@classmethod
def record_bank_transfer(
    cls,
    session: Session,
    /,
    external_id: str,
    provider: str,
    # ... other fields ...
) -> tuple[BankTransfer, bool]:
    # `insert` must be a dialect-specific construct that supports
    # on_conflict_do_nothing(), e.g. sqlalchemy.dialects.postgresql.insert.
    # The statement returns the ID of the new row, or None if it already exists.
    insert_stmt = (
        insert(BankTransfer)
        .values(
            provider=provider,
            external_id=external_id,
            # ... other fields ...
        )
        .on_conflict_do_nothing(
            constraint="bank_transfer__unique_external_id_per_provider"
        )
        .returning(cls.model.id)
    )
    created = session.scalar(insert_stmt)

    # Look the model up by its provider-specific IDs: on conflict the
    # insert returns nothing, so we don't have the ID
    return session.execute(
        cls.select().filter(
            cls.model.provider == provider,
            cls.model.external_id == external_id,
        )
    ).scalar_one(), bool(created)
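
The "insert, ignore conflicts, then read back" flow can be tried out with the standard-library sqlite3 module. This is a sketch under assumptions, not the broker's implementation: it uses SQLite's `INSERT OR IGNORE` in place of PostgreSQL's `ON CONFLICT ... DO NOTHING` (unlike the constraint-targeted form, `OR IGNORE` also skips other constraint violations), and the `amount` column is a hypothetical extra field.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE bank_transfers (
        id INTEGER PRIMARY KEY,
        provider TEXT NOT NULL,
        external_id TEXT NOT NULL,
        amount INTEGER NOT NULL,
        UNIQUE (provider, external_id)
    )
    """
)


def record_bank_transfer(conn, /, *, provider, external_id, amount):
    # The unique constraint decides atomically whether a row is inserted.
    cursor = conn.execute(
        "INSERT OR IGNORE INTO bank_transfers (provider, external_id, amount) "
        "VALUES (?, ?, ?)",
        (provider, external_id, amount),
    )
    was_created = cursor.rowcount == 1

    # Read the row back by its natural key, whether new or pre-existing.
    row = conn.execute(
        "SELECT id, provider, external_id, amount FROM bank_transfers "
        "WHERE provider = ? AND external_id = ?",
        (provider, external_id),
    ).fetchone()
    return row, was_created


first, first_created = record_bank_transfer(
    conn, provider="adyen", external_id="T-1", amount=100
)
# Same logical entity, different attribute value: the new value is ignored.
second, second_created = record_bank_transfer(
    conn, provider="adyen", external_id="T-1", amount=999
)
```

The second call returns the row stored by the first call unchanged, together with `False`, which is the "returns existing unchanged" behaviour listed under the key characteristics.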
Examples
components.payment_gateway.subcomponents.transfers.models.brokers.transfer_update.TransferUpdateModelBroker.record_transfer_update classmethod
record_transfer_update(
    session,
    /,
    *,
    transfer_id,
    transfer_type,
    direction,
    sequence_number,
    occurred_at,
    amount,
    currency,
    status,
    provider,
    external_transfer_id,
    raw=None,
    external_transaction_id=None,
)

Returns:

Name            Type            Description
TransferUpdate  TransferUpdate  The transfer update model instance.
bool            bool            True if the transfer update was created, False if it already existed.

Source code in components/payment_gateway/subcomponents/transfers/models/brokers/transfer_update.py
@classmethod
def record_transfer_update(
    cls,
    session: Session,
    /,
    *,
    transfer_id: UUID,
    transfer_type: TransferUpdateTransferType,
    direction: TransferDirection,
    sequence_number: int,
    occurred_at: datetime,
    amount: int,
    currency: str,
    status: str,
    provider: PaymentServiceProvider,
    external_transfer_id: str,
    raw: dict[str, Any] | None = None,
    external_transaction_id: str | None = None,
) -> tuple[TransferUpdate, bool]:
    """Create a transfer update if it doesn't exist, else do nothing (idempotent).

    It's OK to just ignore the updated fields if the row already exists, as
    the PSPs should send the same information on every occurrence of the
    event, and there's nothing we can do with inconsistent updates. So we
    just record the first one and ignore the rest.

    Returns:
        TransferUpdate: The transfer update model instance.
        bool: True if the transfer update was created, False if it already existed.
    """
    insert_stmt = (
        insert(cls.model)
        .values(
            transfer_id=transfer_id,
            transfer_type=transfer_type,
            direction=direction,
            sequence_number=sequence_number,
            occurred_at=occurred_at,
            amount=amount,
            currency=currency,
            status=status,
            provider=provider,
            raw=raw,
            external_transfer_id=external_transfer_id,
            external_transaction_id=external_transaction_id,
        )
        .on_conflict_do_nothing(
            constraint="transfer_update__unique_provider_specific_ids"
        )
        .returning(cls.model.id)
    )
    created = session.scalar(insert_stmt)

    # Return model from provider-specific IDs
    return session.execute(
        cls.select().filter(
            cls.model.provider == provider,
            cls.model.external_transfer_id == external_transfer_id,
            cls.model.sequence_number == sequence_number,
        )
    ).scalar_one(), bool(created)
components.payment_gateway.subcomponents.transfers.models.brokers.bank_transfer.BankTransferModelBroker.record_bank_transfer classmethod
record_bank_transfer(
    session,
    /,
    *,
    effective_date,
    direction,
    provider,
    external_id,
    account_id,
    transfer_history_id,
    sepa_mandate_id=None,
    sepa_beneficiary_id=None,
    raw=None,
)
Source code in components/payment_gateway/subcomponents/transfers/models/brokers/bank_transfer.py
@classmethod
def record_bank_transfer(
    cls,
    session: Session,
    /,
    *,
    effective_date: datetime,
    direction: TransferDirection,
    provider: PaymentServiceProvider,
    external_id: str,
    account_id: UUID,
    transfer_history_id: UUID,
    sepa_mandate_id: UUID | None = None,
    sepa_beneficiary_id: UUID | None = None,
    raw: dict[str, Any] | None = None,
) -> tuple[BankTransfer, bool]:
    insert_stmt = (
        insert(BankTransfer)
        .values(
            effective_date=effective_date,
            direction=direction,
            provider=provider,
            external_id=external_id,
            account_id=account_id,
            transfer_history_id=transfer_history_id,
            sepa_mandate_id=sepa_mandate_id,
            sepa_beneficiary_id=sepa_beneficiary_id,
            raw=raw,
        )
        .on_conflict_do_nothing(
            constraint="bank_transfer__unique_external_id_per_provider"
        )
        .returning(cls.model.id)
    )
    created = session.scalar(insert_stmt)

    # Return model from provider-specific IDs
    return session.execute(
        cls.select().filter(
            cls.model.provider == provider,
            cls.model.external_id == external_id,
        )
    ).scalar_one(), bool(created)

Non-Standard Methods

For operations that don't fit the standard patterns above, use descriptive, intention-revealing names.

Rules:

  • Use clear, descriptive names that explain what the method does
  • Don't reuse standard prefixes (get_*, find_*, search_*, list_*, count_*, create_*, update_*, set_*, terminate_*, record_*)
  • Document behavior with clear docstrings
  • Follow signature conventions (session first, positional-only, etc.)

When to use:

  • Complex operations combining multiple patterns
  • Domain-specific operations that don't map to CRUD
  • Temporary methods during refactoring (document migration plan)

Note

Non-standard methods are acceptable when standard patterns don't fit. However, if you find yourself creating many, consider whether:

  • The operations might be better suited to a different layer (business logic, queries)
  • The method belongs to a different model broker (e.g., computing aggregates by parent model ID might belong to the parent's broker)

Method Signatures

All Model Broker methods follow strict signature patterns:

Session Parameter

Always first parameter, positional-only:

@classmethod
def get_card(
    cls,
    session: Session,
    /,  # Everything before this is positional-only
    id: UUID,
    with_status_history: bool = False,
) -> Card:
    ...
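
The effect of the `/` marker can be checked in plain Python. In the sketch below the function bodies are placeholders and the string "db-session" stands in for a real Session; only the signatures follow the convention described above.

```python
def get_card(session, /, id, with_status_history=False):
    # Placeholder body: echo the arguments back.
    return {"session": session, "id": id, "with_status_history": with_status_history}


def update_account(session, /, id, **data):
    # Because `session` is positional-only, a field that happens to be
    # called "session" can still travel through **data without clashing.
    return data


positional_ok = get_card("db-session", id=1)

try:
    get_card(session="db-session", id=1)
    keyword_rejected = False
except TypeError:
    # Python refuses positional-only parameters passed by keyword.
    keyword_rejected = True

passthrough = update_account("db-session", 1, session="field-value")
```

Forcing the session to be positional keeps every call site uniform and frees the name `session` for use as a keyword in `**data` style signatures.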

Transaction Management

Model Brokers NEVER commit transactions - they only perform database operations within the provided session. Transaction management is the caller's responsibility.

Key principles:

  • Use session.flush(): Model Brokers use flush() to persist changes and get IDs without committing
  • Caller commits: The calling code (business logic, queries, actions) decides when to commit
  • Transaction boundaries: Callers define transaction boundaries based on business requirements
  • Rollback on error: Callers handle exceptions and rollback if needed

Example - Broker implementation:

@classmethod
def create_card(
    cls,
    session: Session,
    /,
    **data,
) -> Card:
    card = Card(**data)
    status_log = CardStatusLog(card=card, status=CardStatus.inactive)

    session.add(card)
    session.add(status_log)
    session.flush()  # ✅ Flush to get IDs, but DON'T commit

    return card

Example - Caller's responsibility:

# Business logic layer handles transactions
def create_user_card(user_id: UUID, card_data: dict) -> Card:
    try:
        # Multiple broker calls in one transaction
        account = AccountModelBroker.get_account_for_user(
            current_session,
            user_id=user_id,
        )
        card = CardModelBroker.create_card(
            current_session,
            account_id=account.id,
            **card_data,
        )

        # Caller decides when to commit
        current_session.commit()  # ✅ Business logic commits
        return card

    except Exception:
        current_session.rollback()  # ✅ Business logic rolls back
        raise

Why this pattern?

  • Flexible transactions: Callers can combine multiple broker operations in one transaction
  • Clear boundaries: Transaction scope is explicit in business logic
  • Error handling: Callers handle rollback based on business requirements
  • Testability: Tests can use transactions without committing to the database
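
The division of responsibilities can be checked without a database. The sketch below is illustrative only: RecordingSession is a hypothetical stand-in that records which session methods were called, the account_id validation inside create_card exists only to force a failure, and create_two_cards is an invented caller. It shows two broker calls sharing one transaction, with commit or rollback decided by the caller alone.

```python
class RecordingSession:
    """Hypothetical stand-in for a SQLAlchemy Session that records calls."""

    def __init__(self):
        self.calls = []

    def add(self, obj):
        self.calls.append("add")

    def flush(self):
        self.calls.append("flush")

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")


class CardModelBroker:
    @classmethod
    def create_card(cls, session, /, **data):
        # Validation added only so the sketch can trigger an error
        if "account_id" not in data:
            raise ValueError("account_id is required")
        card = dict(data)
        session.add(card)
        session.flush()  # Broker flushes, never commits
        return card


def create_two_cards(session, first, second):
    """Caller-side unit of work: both cards are committed together or not at all."""
    try:
        cards = [
            CardModelBroker.create_card(session, **first),
            CardModelBroker.create_card(session, **second),
        ]
        session.commit()  # Caller decides when to commit
        return cards
    except Exception:
        session.rollback()  # Caller rolls back the whole unit of work
        raise


# Success path: two broker calls, one commit at the end
ok_session = RecordingSession()
create_two_cards(ok_session, {"account_id": 1}, {"account_id": 2})

# Failure path: the second broker call raises, so nothing is committed
failed_session = RecordingSession()
try:
    create_two_cards(failed_session, {"account_id": 1}, {})
except ValueError:
    pass
```

On the failure path the first card was flushed but never committed, so the caller's rollback discards it along with the rest of the transaction.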

Type Hints

Full type hints on all parameters and returns:

@classmethod
def list_cards_for_account(
    cls,
    session: Session,
    /,
    account_id: UUID,
    with_terminated: bool = False,
) -> list[Card]:  # Explicit return type
    ...

Keyword-only Parameters

For methods with many parameters, make them keyword-only:

@classmethod
def record_bank_transfer(
    cls,
    session: Session,
    /,
    *,  # Everything after this is keyword-only
    effective_date: datetime,
    direction: TransferDirection,
    provider: PaymentServiceProvider,
    external_id: str,
    account_id: UUID,
    # ... more parameters
) -> tuple[BankTransfer, bool]:
    ...
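
How the / and * markers behave at the call site can be seen with a stub. This is a sketch, not the real broker: Session is an empty stand-in class, the parameter list is shortened, and the method only echoes its arguments. Python itself rejects both a session passed by keyword and data parameters passed positionally.

```python
from datetime import datetime, timezone
from uuid import UUID, uuid4


class Session:
    """Hypothetical stand-in for sqlalchemy.orm.Session."""


class BankTransferModelBroker:
    @classmethod
    def record_bank_transfer(
        cls,
        session: Session,
        /,  # session is positional-only
        *,  # everything below is keyword-only
        effective_date: datetime,
        external_id: str,
        account_id: UUID,
    ) -> tuple[dict, bool]:
        # Stub body: a real broker would insert or fetch the transfer
        transfer = {
            "effective_date": effective_date,
            "external_id": external_id,
            "account_id": account_id,
        }
        return transfer, True


session = Session()
now = datetime.now(timezone.utc)
account_id = uuid4()

# Accepted: session positional, everything else by keyword
transfer, created = BankTransferModelBroker.record_bank_transfer(
    session,
    effective_date=now,
    external_id="ext-1",
    account_id=account_id,
)

# Rejected: keyword-only parameters passed positionally
try:
    BankTransferModelBroker.record_bank_transfer(session, now, "ext-1", account_id)
    positional_rejected = False
except TypeError:
    positional_rejected = True

# Rejected: session passed by keyword
try:
    BankTransferModelBroker.record_bank_transfer(
        session=session,
        effective_date=now,
        external_id="ext-1",
        account_id=account_id,
    )
    keyword_session_rejected = False
except TypeError:
    keyword_session_rejected = True
```

Because every data parameter must be named, two arguments of the same type cannot be swapped silently at the call site.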

Optional Parameters

Use explicit defaults and type hints:

@classmethod
def get_card(
    cls,
    session: Session,
    /,
    id: UUID,
    with_status_history: bool = False,  # Optional, defaults to False
) -> Card:
    ...

The with_ Convention

Model Broker methods use the with_ prefix for optional boolean parameters that control eager loading and filtering behavior.

with_{relationship} - Eager Loading Control

Controls whether to eagerly load specific relationships.

When to use:

  • Relationships that are expensive to load
  • Relationships only needed in specific use cases
  • Allows caller to opt-in to additional data loading

Examples:

LedgerModelBroker.get_ledger (components/payment_gateway/subcomponents/ledgers/models/brokers/ledger.py):

@classmethod
def get_ledger(
    cls,
    session: Session,
    /,
    id: UUID,
    with_entries: bool = False,
) -> Ledger:
    if with_entries:
        query = cls.select(custom_autoload=cls.autoload | {"entries": True})
    else:
        query = cls.select()
    ledger: Ledger = session.execute(query.filter(Ledger.id == id)).scalar_one()
    return ledger

BankTransferModelBroker.get_bank_transfer (components/payment_gateway/subcomponents/transfers/models/brokers/bank_transfer.py):

@classmethod
def get_bank_transfer(
    cls,
    session: Session,
    /,
    id: UUID,
    with_sepa_mandate: bool = False,
) -> BankTransfer:
    if with_sepa_mandate:
        query = cls.select(custom_autoload=cls.autoload | {"sepa_mandate": True})
    else:
        query = cls.select()
    bank_transfer: BankTransfer = session.execute(
        query.filter(BankTransfer.id == id)
    ).scalar_one()
    return bank_transfer

CardModelBroker.get_card (components/payment_gateway/subcomponents/cards/models/brokers/card.py):

@classmethod
def get_card(
    cls,
    session: Session,
    /,
    id: UUID,
    with_status_history: bool = False,
) -> Card:
    if with_status_history:
        query = cls.select(custom_autoload=cls.autoload | {"status_history": True})
    else:
        query = cls.select()
    card: Card = session.execute(query.filter(Card.id == id)).scalar_one()
    return card

Usage example:

# Without eager loading - only the card is loaded
card = CardModelBroker.get_card(
    session,
    id=card_id,
)
# Accessing card.status_history here would trigger a lazy load, or raise under LazyLoadingStrategy.RAISE

# With eager loading - card + status_history are loaded
card = CardModelBroker.get_card(
    session,
    id=card_id,
    with_status_history=True,
)
# Can safely access card.status_history
for log in card.status_history:
    print(log.status, log.created_at)

with_terminated - Include Soft-Deleted Models

Controls whether to include terminated (soft-deleted) models in results.

When to use:

  • Listing/counting methods that might need terminated models
  • Default should be False (exclude terminated)
  • Common for audit/reporting features

Examples:

CardModelBroker.list_cards_for_card_holder (components/payment_gateway/subcomponents/cards/models/brokers/card.py):

@classmethod
def list_cards_for_card_holder(
    cls,
    session: Session,
    /,
    card_holder_id: UUID,
    with_terminated: bool = False,
) -> list[Card]:
    query = cls.select().filter(Card.card_holder_id == card_holder_id)
    if not with_terminated:
        query = query.filter(Card.terminated_at.is_(None))
    return list(session.execute(query).scalars().all())

Usage example:

# Only active cards
active_cards = CardModelBroker.list_cards_for_card_holder(
    session,
    card_holder_id=holder_id,
)

# All cards including terminated ones
all_cards = CardModelBroker.list_cards_for_card_holder(
    session,
    card_holder_id=holder_id,
    with_terminated=True,
)

Common Patterns

ID-Only Optimization

Provide separate methods for fetching only IDs:

@classmethod
def list_card_ids_for_card_holder(
    cls,
    session: Session,
    /,
    card_holder_id: UUID,
) -> list[UUID]:
    """Optimized: only loads IDs, not full objects."""
    query = (
        cls.select(custom_autoload={})  # No eager loading
        .filter(Card.card_holder_id == card_holder_id)
        .options(load_only(Card.id))  # Only select ID column
    )
    cards = session.execute(query).scalars().all()
    return [card.id for card in cards]

Create multiple related models in one transaction:

@classmethod
def create_card(
    cls,
    session: Session,
    /,
    status: CardStatus = CardStatus.inactive,
    **data,
) -> Card:
    # Create main model
    card = cls.model(**data)

    # Create related model
    status_log = CardStatusLog(card=card, status=status)

    # Add both
    session.add(card)
    session.add(status_log)
    session.flush()  # Ensure we get IDs

    return card

File Organization

subcomponent/
└── models/
    ├── {model}.py             # SQLAlchemy model
    ├── brokers/
    │   ├── __init__.py
    │   ├── {model}.py         # {Model}ModelBroker
    │   └── tests/
    │       └── test_{model}_model_broker.py
    └── tests/
        └── factories.py       # Test factories

Testing Model Brokers

Unit Tests

Test individual broker methods:

@pytest.mark.usefixtures("db")
def test_get_card():
    # Arrange
    card = CardFactory.create()
    current_session.commit()

    # Act
    result = CardModelBroker.get_card(current_session, id=card.id)

    # Assert
    assert result.id == card.id
    assert result.current_status_log is not None  # Autoloaded

@pytest.mark.usefixtures("db")
def test_list_cards_for_card_holder_excludes_terminated():
    # Arrange
    card_holder_id = uuid4()
    active_card = CardFactory.create(card_holder_id=card_holder_id)
    terminated_card = CardFactory.create(
        card_holder_id=card_holder_id,
        terminated_at=datetime.now(),
    )
    current_session.commit()

    # Act
    cards = CardModelBroker.list_cards_for_card_holder(
        current_session,
        card_holder_id=card_holder_id,
        with_terminated=False,
    )

    # Assert
    assert len(cards) == 1
    assert cards[0].id == active_card.id

Lazy Loading Prevention Tests

Test that N+1 queries are prevented using LazyLoadingStrategy.RAISE:

import pytest
import sqlalchemy.exc
from shared.model_brokers.base_model_broker import LazyLoadingStrategy

@pytest.fixture(autouse=True, scope="module")
def _prevent_lazy_loading():
    """Force all tests to use RAISE strategy to catch lazy loading."""
    old_strategy = CardModelBroker.lazy_loading_strategy
    CardModelBroker.lazy_loading_strategy = LazyLoadingStrategy.RAISE
    yield
    CardModelBroker.lazy_loading_strategy = old_strategy

@pytest.mark.usefixtures("db")
def test_list_cards_avoids_n_plus_1():
    # Arrange
    card_holder_id = uuid4()
    for _ in range(10):
        CardFactory.create(card_holder_id=card_holder_id)
    current_session.commit()

    # Act - Load cards with autoload
    cards = CardModelBroker.list_cards_for_card_holder(
        current_session,
        card_holder_id=card_holder_id,
    )

    # Assert - Can access autoloaded relationships without triggering lazy load
    for card in cards:
        assert card.current_status_log is not None  # ✅ Already loaded!

@pytest.mark.usefixtures("db")
def test_accessing_non_autoloaded_relationship_raises():
    # Arrange
    card = CardFactory.create()
    current_session.commit()

    # Act - Load card without optional relationships
    result = CardModelBroker.get_card(
        current_session,
        id=card.id,
        # Note: with_status_history=False (default)
    )

    # Assert - Accessing non-autoloaded relationship raises exception
    with pytest.raises(sqlalchemy.exc.InvalidRequestError):
        len(result.status_history)  # ❌ Lazy loading detected!

Why this is better than query counting:

  • Explicit failures: Lazy loading raises immediately, making issues obvious
  • No test infrastructure needed: Uses built-in LazyLoadingStrategy
  • Catches all lazy loads: Not just N+1 queries, but any accidental lazy loading
  • Standard pattern: Aligns with the Testing section's approach
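
For readers unfamiliar with the underlying mechanism, the sketch below shows how SQLAlchemy's raiseload option produces this kind of immediate failure. It is an assumption that LazyLoadingStrategy.RAISE builds on this option; the base broker's implementation is not shown in this document, and the two models here are simplified stand-ins.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, select
from sqlalchemy.exc import InvalidRequestError
from sqlalchemy.orm import (
    Session,
    declarative_base,
    raiseload,
    relationship,
    selectinload,
)

Base = declarative_base()


class Card(Base):
    """Simplified stand-in for the real Card model (assumption)."""

    __tablename__ = "card"
    id = Column(Integer, primary_key=True)
    status_history = relationship("CardStatusLog")


class CardStatusLog(Base):
    __tablename__ = "card_status_log"
    id = Column(Integer, primary_key=True)
    card_id = Column(ForeignKey("card.id"), nullable=False)
    status = Column(String, nullable=False)


# In-memory SQLite database, used only for this illustration
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Card(id=1, status_history=[CardStatusLog(status="inactive")]))
    session.commit()  # Test setup plays the caller's role here

# No eager loading: any relationship access raises instead of querying
with Session(engine) as session:
    card = session.execute(select(Card).options(raiseload("*"))).scalar_one()
    try:
        card.status_history
        lazy_load_blocked = False
    except InvalidRequestError:
        lazy_load_blocked = True

# Explicit eager loading: the relationship is populated up front
with Session(engine) as session:
    card = session.execute(
        select(Card).options(selectinload(Card.status_history), raiseload("*"))
    ).scalar_one()
    loaded_statuses = [log.status for log in card.status_history]
```

The failure happens on the first unplanned access, even with a single row, which is why this approach also catches lazy loads that a query-count threshold would miss.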

Best Practices

✅ DO

  • Use Model Brokers exclusively: Never query models directly in business logic
  • Define autoload: Specify commonly needed relationships
  • Use descriptive names: Follow naming conventions consistently
  • Optimize ID queries: Provide separate methods for fetching only IDs
  • Test performance: Verify no N+1 queries
  • Document complex queries: Add docstrings explaining behavior
  • Use positional-only session: session: Session, /,
  • Flush, don't commit: Use session.flush() in create methods

❌ DON'T

  • Don't query directly: Never session.query(Card).filter(...)
  • Don't lazy load: Configure autoload instead
  • Don't return None from get_*: Use find_* for optional results
  • Don't bypass the broker: Even in tests, use brokers
  • Don't commit in broker: Let caller control transactions
  • Don't use legacy Query API: Use SQLAlchemy 2.0 select()
  • Don't use current_session in broker methods: Always accept session as parameter (callers can pass current_session)

References