Skip to content

Adapters

Transfer adapters package.

This package contains adapters for converting between different data formats in the transfers subcomponent.

components.payment_gateway.subcomponents.transfers.adapters.adyen

helpers

associate_account_transfer_events

associate_account_transfer_events(
    session, /, data, account_transfer
)

Associate account transfer with its events, reordering them if needed. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def associate_account_transfer_events(
    session: Session,
    /,
    data: "TransferData",
    account_transfer: AccountTransferModel,
) -> None:
    """Associate account transfer with its events, reordering them if needed. Idempotent."""

    assert data.id
    assert data.events

    nb_associations = session.execute(
        select(func.count())
        .select_from(AccountTransferTransferEventAssociation)
        .filter(
            AccountTransferTransferEventAssociation.account_transfer_id
            == account_transfer.id
        )
    ).scalar_one()
    if len(data.events) < nb_associations:
        return

    with session.no_autoflush:
        associations: list[AccountTransferTransferEventAssociation] = []
        for sequence, event_data in enumerate(data.events):
            assert event_data.id

            # Ignore non-accounting events
            if event_data.type != "accounting":
                continue

            transfer_event = TransferEventModelBroker.get_transfer_event_by_external_id(
                session,
                provider=PaymentServiceProvider.adyen,
                external_id=event_data.id,
            )
            association = (
                session.execute(
                    select(AccountTransferTransferEventAssociation).filter(  #
                        AccountTransferTransferEventAssociation.account_transfer_id
                        == account_transfer.id,
                        AccountTransferTransferEventAssociation.transfer_event_id
                        == transfer_event.id,
                    )
                )
                .scalars()
                .first()
            )
            if association:
                association.sequence = sequence
                associations.append(association)
                continue
            association = AccountTransferTransferEventAssociation(
                account_transfer=account_transfer,
                transfer_event=transfer_event,
                sequence=sequence,
            )
            associations.append(association)

        session.add_all(associations)

associate_bank_transfer_events

associate_bank_transfer_events(
    session, /, data, bank_transfer
)

Associate bank transfer with its events, reordering them if needed. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def associate_bank_transfer_events(
    session: Session,
    /,
    data: "TransferData",
    bank_transfer: BankTransferModel,
) -> None:
    """Associate bank transfer with its events, reordering them if needed. Idempotent."""

    assert data.id
    assert data.events

    nb_associations = session.execute(
        select(func.count())
        .select_from(BankTransferTransferEventAssociation)
        .filter(
            BankTransferTransferEventAssociation.bank_transfer_id == bank_transfer.id
        )
    ).scalar_one()
    if len(data.events) < nb_associations:
        return

    with session.no_autoflush:
        associations: list[BankTransferTransferEventAssociation] = []
        for sequence, event_data in enumerate(data.events):
            assert event_data.id

            # Ignore non-accounting events
            if event_data.type != "accounting":
                continue

            transfer_event = TransferEventModelBroker.get_transfer_event_by_external_id(
                session,
                provider=PaymentServiceProvider.adyen,
                external_id=event_data.id,
            )
            association = (
                session.execute(
                    select(BankTransferTransferEventAssociation).filter(
                        BankTransferTransferEventAssociation.bank_transfer_id
                        == bank_transfer.id,
                        BankTransferTransferEventAssociation.transfer_event_id
                        == transfer_event.id,
                    )
                )
                .scalars()
                .first()
            )
            if association:
                association.sequence = sequence
                associations.append(association)
                continue
            association = BankTransferTransferEventAssociation(
                bank_transfer=bank_transfer,
                transfer_event=transfer_event,
                sequence=sequence,
            )
            associations.append(association)

        session.add_all(associations)

associate_card_transfer_events

associate_card_transfer_events(
    session, /, data, card_transfer
)

Associate card transfer with its events, reordering them if needed. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def associate_card_transfer_events(
    session: Session,
    /,
    data: "TransferData",
    card_transfer: CardTransferModel,
) -> None:
    """Associate card transfer with its events, reordering them if needed. Idempotent."""

    assert data.id
    assert data.events

    nb_associations = session.execute(
        select(func.count())
        .select_from(CardTransferTransferEventAssociation)
        .filter(
            CardTransferTransferEventAssociation.card_payment_id == card_transfer.id
        )
    ).scalar_one()
    if len(data.events) < nb_associations:
        return

    with session.no_autoflush:
        associations: list[CardTransferTransferEventAssociation] = []
        for sequence, event_data in enumerate(data.events):
            assert event_data.id

            # Ignore non-accounting events
            if event_data.type != "accounting":
                continue

            transfer_event = TransferEventModelBroker.get_transfer_event_by_external_id(
                session,
                provider=PaymentServiceProvider.adyen,
                external_id=event_data.id,
            )
            association = (
                session.execute(
                    select(CardTransferTransferEventAssociation).filter(
                        CardTransferTransferEventAssociation.card_payment_id
                        == card_transfer.id,
                        CardTransferTransferEventAssociation.transfer_event_id
                        == transfer_event.id,
                    )
                )
                .scalars()
                .first()
            )
            if association:
                association.sequence = sequence
                associations.append(association)
                continue
            association = CardTransferTransferEventAssociation(
                card_payment=card_transfer,
                transfer_event=transfer_event,
                sequence=sequence,
            )
            associations.append(association)

        session.add_all(associations)

compute_transfer_update_amount_from_account_transfer_data

compute_transfer_update_amount_from_account_transfer_data(
    data,
)

Compute the amount and currency for an account transfer from an Adyen TransferData webhook payload.

Warning

This function is critical for client code that relies on the amount and currency of transfers (for example payroll or billing). If you have to change it, you need to do the following, in this order:

  • Call it from transfer_update_model_to_dataclass on transfer_type=TransferUpdateTransferType.ACCOUNT to override the stored values of existing transfers; newly recorded transfers will always use the new values (note that this will induce a performance hit because of the need to rehydrate the original payload)
  • Backfill ALL the old account transfer updates
  • Remove it from transfer_update_model_to_dataclass once the backfill is done.
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def compute_transfer_update_amount_from_account_transfer_data(
    data: "TransferData",
) -> tuple[int, str]:
    """Compute the amount and currency for an account transfer from an Adyen TransferData webhook payload.

    Warning:
        This function is critical for client code that relies on the amount and
        currency of transfers (for example payroll or billing). If you have to
        change it, you need to do the following, in this order:

        - Call it from `transfer_update_model_to_dataclass` on
          `transfer_type`=`TransferUpdateTransferType.ACCOUNT` to override the
          stored values of existing transfers; newly recorded transfers will
          always use the new values (note that this will induce a performance
          hit because of the need to rehydrate the original payload)
        - Backfill ALL the old account transfer updates
        - Remove it from `transfer_update_model_to_dataclass` once the backfill
          is done.
    """

    assert data.events
    assert data.status
    assert data.direction
    assert data.amount
    assert data.amount.value
    assert data.amount.currency
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    currency = data.amount.currency
    amount = data.amount.value if data.direction == "incoming" else -data.amount.value

    return amount, currency

compute_transfer_update_amount_from_bank_transfer_data

compute_transfer_update_amount_from_bank_transfer_data(
    data,
)

Compute the amount and currency for a bank transfer from an Adyen TransferData webhook payload.

Warning

This function is critical for client code that relies on the amount and currency of transfers (for example payroll or billing). If you have to change it, you need to do the following, in this order:

  • Call it from transfer_update_model_to_dataclass on transfer_type=TransferUpdateTransferType.BANK to override the stored values of existing transfers; newly recorded transfers will always use the new values (note that this will induce a performance hit because of the need to rehydrate the original payload)
  • Backfill ALL the old bank transfer updates
  • Remove it from transfer_update_model_to_dataclass once the backfill is done.
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def compute_transfer_update_amount_from_bank_transfer_data(
    data: "TransferData",
) -> tuple[int, str]:
    """Compute the amount and currency for a bank transfer from an Adyen TransferData webhook payload.

    Warning:
        This function is critical for client code that relies on the amount and
        currency of transfers (for example payroll or billing). If you have to
        change it, you need to do the following, in this order:

        - Call it from `transfer_update_model_to_dataclass` on
          `transfer_type`=`TransferUpdateTransferType.BANK` to override the
          stored values of existing transfers; newly recorded transfers will
          always use the new values (note that this will induce a performance
          hit because of the need to rehydrate the original payload)
        - Backfill ALL the old bank transfer updates
        - Remove it from `transfer_update_model_to_dataclass` once the backfill
          is done.
    """

    currency = data.amount.currency
    amount = data.amount.value if data.direction == "incoming" else -data.amount.value
    if data.status == "failed":
        # Need to invert the amount for failed transfers, for consistency with e.g. cancelled card transfers
        amount = -amount

    return amount, currency

compute_transfer_update_amount_from_card_transfer_data

compute_transfer_update_amount_from_card_transfer_data(
    data,
)

Compute the amount and currency for a card transfer from an Adyen TransferData webhook payload.

Warning

This function is critical for client code that relies on the amount and currency of transfers (for example payroll or billing). If you have to change it, you need to do the following, in this order:

  • Call it from transfer_update_model_to_dataclass on transfer_type=TransferUpdateTransferType.CARD to override the stored values of existing transfers; newly recorded transfers will always use the new values (note that this will induce a performance hit because of the need to rehydrate the original payload)
  • Backfill ALL the old card transfer updates
  • Remove it from transfer_update_model_to_dataclass once the backfill is done.
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def compute_transfer_update_amount_from_card_transfer_data(
    data: "TransferData",
) -> tuple[int, str]:
    """Compute the amount and currency for a card transfer from an Adyen TransferData webhook payload.

    Warning:
        This function is critical for client code that relies on the amount and
        currency of transfers (for example payroll or billing). If you have to
        change it, you need to do the following, in this order:

        - Call it from `transfer_update_model_to_dataclass` on
          `transfer_type`=`TransferUpdateTransferType.CARD` to override the
          stored values of existing transfers; newly recorded transfers will
          always use the new values (note that this will induce a performance
          hit because of the need to rehydrate the original payload)
        - Backfill ALL the old card transfer updates
        - Remove it from `transfer_update_model_to_dataclass` once the backfill
          is done.
    """

    assert data.events

    # Infer the right amount and currency
    last_event = data.events[-1]
    if last_event.amount:
        # Should always be the case but a handful bugs (see below)
        currency = last_event.amount.currency
        match data.status:
            case "captured" | "refunded" | "captureReversed":
                # Use mutation change as amount to handle differences between authorized and booked values
                assert last_event.mutations
                assert len(last_event.mutations) == 1
                last_mutation = last_event.mutations[0]
                amount = (
                    (last_mutation.balance or 0)
                    + (last_mutation.reserved or 0)
                    + (last_mutation.received or 0)
                )
            case "refused":
                # Invert the amount for consistency with authAdjustmentRefused
                amount = -last_event.amount.value
            case _:
                amount = last_event.amount.value
    elif data.status == "expired" and data.direction == "incoming":
        # This occurred a couple times for incoming transfers according to the logs so let's assume this is the only case we need to handle
        # Certainly an Adyen bug.
        # Take the reserved amount from the last mutation and the currency from the transfer (data shows it's OK)
        assert last_event.mutations
        assert len(last_event.mutations) == 1
        assert last_event.mutations[0].reserved
        currency = data.amount.currency
        amount = last_event.mutations[0].reserved
    else:
        # Last card transfer event has no amount, just use the value from the transfer data
        currency = data.amount.currency
        amount = data.amount.value

    return amount, currency

record_account_transfer_from_account_transfer_data

record_account_transfer_from_account_transfer_data(
    session, /, data, account_id, transfer_history_id
)

Record an account transfer from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_account_transfer_from_account_transfer_data(
    session: Session,
    /,
    data: "TransferData",
    account_id: UUID,
    transfer_history_id: UUID,
) -> tuple[AccountTransferModel, bool]:
    """Record an account transfer from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.reference
    assert data.creationDate
    assert data.direction

    return AccountTransferModelBroker.record_account_transfer(
        session,
        effective_date=datetime.fromisoformat(data.creationDate),
        direction=TransferDirection(data.direction),
        reference=data.reference,
        provider=PaymentServiceProvider.adyen,
        external_id=data.id.strip(),
        account_id=account_id,
        transfer_history_id=transfer_history_id,
        raw={k: v for k, v in data.to_dict().items() if k != "events"},
    )

record_bank_transfer_from_bank_transfer_data

record_bank_transfer_from_bank_transfer_data(
    session, /, data, account_id, transfer_history_id
)

Record a bank transfer from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_bank_transfer_from_bank_transfer_data(
    session: Session,
    /,
    data: "TransferData",
    account_id: UUID,
    transfer_history_id: UUID,
) -> tuple[BankTransferModel, bool]:
    """Record a bank transfer from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.creationDate
    assert data.direction

    return BankTransferModelBroker.record_bank_transfer(
        session,
        effective_date=datetime.fromisoformat(data.creationDate),
        direction=TransferDirection(data.direction),
        provider=PaymentServiceProvider.adyen,
        external_id=data.id.strip(),
        account_id=account_id,
        transfer_history_id=transfer_history_id,
        raw={k: v for k, v in data.to_dict().items() if k != "events"},
    )

record_card_transfer_from_card_transfer_data

record_card_transfer_from_card_transfer_data(
    session,
    /,
    data,
    card_id,
    account_id,
    transfer_history_id,
    merchant_info,
)

Record a card transfer from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_card_transfer_from_card_transfer_data(
    session: Session,
    /,
    data: "TransferData",
    card_id: UUID,
    account_id: UUID,
    transfer_history_id: UUID,
    merchant_info: MerchantInfo,
) -> tuple[CardTransferModel, bool]:
    """Record a card transfer from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.creationDate

    return CardTransferModelBroker.record_card_transfer(
        session,
        effective_date=datetime.fromisoformat(data.creationDate),
        mcc=merchant_info.mcc or "",
        merchant_id=merchant_info.merchant_id,
        postal_code=merchant_info.postal_code,
        city=merchant_info.city or "",
        country=merchant_info.country or "",
        name=merchant_info.name or "",
        provider=PaymentServiceProvider.adyen,
        external_id=data.id.strip(),
        card_id=card_id,
        account_id=account_id,
        transfer_history_id=transfer_history_id,
        raw=data.to_dict(),
    )

record_transfer_event_from_transfer_data

record_transfer_event_from_transfer_data(session, /, data)

Record a transfer event from an Adyen TransferEvent webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_event_from_transfer_data(
    session: Session,
    /,
    data: "TransferEvent",
) -> tuple[TransferEventModel, bool]:
    """Record a transfer event from an Adyen TransferEvent webhook payload. Idempotent."""

    assert data.id
    assert data.type == "accounting"
    assert data.status
    assert data.bookingDate

    received = reserved = balance = 0
    if not (data.mutations):
        current_logger.info(
            f"Transfer event {data.id} with type {data.type} has no mutations, assuming no balance change"
        )
    else:
        received = sum(mutation.received or 0 for mutation in data.mutations)
        reserved = sum(mutation.reserved or 0 for mutation in data.mutations)
        balance = sum(mutation.balance or 0 for mutation in data.mutations)

        if len(data.mutations) > 1:
            current_logger.error(
                "Multiple mutations found in Adyen transfer event, using the sum of them all"
            )

    return TransferEventModelBroker.record_transfer_event(
        session,
        effective_date=datetime.fromisoformat(data.bookingDate),
        received=received,
        reserved=reserved,
        balance=balance,
        status=data.status,
        provider=PaymentServiceProvider.adyen,
        external_id=data.id,
        raw=data.to_dict(),
    )

record_transfer_update_from_account_transfer_data

record_transfer_update_from_account_transfer_data(
    session, /, data, transfer_id
)

Record a transfer update from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_update_from_account_transfer_data(
    session: Session,
    /,
    data: "TransferData",
    transfer_id: UUID,
) -> tuple[TransferUpdateModel, bool]:
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.sequenceNumber
    assert data.creationDate

    amount, currency = compute_transfer_update_amount_from_account_transfer_data(data)

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        transfer_id=transfer_id,
        transfer_type=TransferUpdateTransferType.ACCOUNT,
        direction=TransferDirection(data.direction),
        sequence_number=data.sequenceNumber,
        occurred_at=datetime.fromisoformat(data.creationDate),
        amount=amount,
        currency=currency,
        status=data.status,
        provider=PaymentServiceProvider.adyen,
        external_transfer_id=data.id,
        raw=data.to_dict(),
    )

record_transfer_update_from_bank_transfer_data

record_transfer_update_from_bank_transfer_data(
    session, /, data, transfer_id
)

Record a transfer update from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_update_from_bank_transfer_data(
    session: Session,
    /,
    data: "TransferData",
    transfer_id: UUID,
) -> tuple[TransferUpdateModel, bool]:
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.sequenceNumber
    assert data.creationDate

    amount, currency = compute_transfer_update_amount_from_bank_transfer_data(data)

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        transfer_id=transfer_id,
        transfer_type=TransferUpdateTransferType.BANK,
        direction=TransferDirection(data.direction),
        sequence_number=data.sequenceNumber,
        occurred_at=datetime.fromisoformat(data.creationDate),
        amount=amount,
        currency=currency,
        status=data.status,
        provider=PaymentServiceProvider.adyen,
        external_transfer_id=data.id,
        raw=data.to_dict(),
    )

record_transfer_update_from_card_transfer_data

record_transfer_update_from_card_transfer_data(
    session, /, data, transfer_id
)

Record a transfer update from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_update_from_card_transfer_data(
    session: Session,
    /,
    data: "TransferData",
    transfer_id: UUID,
) -> tuple[TransferUpdateModel, bool]:
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    from shared.services.adyen.openapi.balance_platform_transfer_notification_v4 import (
        IssuedCard,
    )

    assert data.id
    assert data.sequenceNumber
    assert data.creationDate

    amount, currency = compute_transfer_update_amount_from_card_transfer_data(data)
    external_transaction_id = (
        data.categoryData.schemeUniqueTransactionId
        if isinstance(data.categoryData, IssuedCard)
        else None
    )

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        transfer_id=transfer_id,
        transfer_type=TransferUpdateTransferType.CARD,
        direction=TransferDirection(data.direction),
        sequence_number=data.sequenceNumber,
        occurred_at=datetime.fromisoformat(data.creationDate),
        amount=amount,
        currency=currency,
        status=data.status,
        provider=PaymentServiceProvider.adyen,
        external_transfer_id=data.id,
        external_transaction_id=external_transaction_id,
        raw=data.to_dict(),
    )

topic_subscribers

TransferTopicSubscriber

TransferTopicSubscriber(
    *,
    card_transfer_processor_policy,
    bank_transfer_processor_policy,
    account_transfer_processor_policy
)

Bases: Subscriber

This class subscribes to the Adyen transfer notification topic messages and dispatch them to application-provided processors

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/topic_subscribers.py
def __init__(
    self,
    *,
    card_transfer_processor_policy: CardTransferProcessorPolicy,
    bank_transfer_processor_policy: BankTransferProcessorPolicy,
    account_transfer_processor_policy: AccountTransferProcessorPolicy,
) -> None:
    self.card_transfer_processor_policy = card_transfer_processor_policy
    self.bank_transfer_processor_policy = bank_transfer_processor_policy
    self.account_transfer_processor_policy = account_transfer_processor_policy
account_transfer_processor_policy instance-attribute
account_transfer_processor_policy = (
    account_transfer_processor_policy
)
bank_transfer_processor_policy instance-attribute
bank_transfer_processor_policy = (
    bank_transfer_processor_policy
)
card_transfer_processor_policy instance-attribute
card_transfer_processor_policy = (
    card_transfer_processor_policy
)
receive
receive(message)
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/topic_subscribers.py
@override
@obs.event_subscriber()
def receive(self, message: TransferNotificationRequest) -> None:
    logger = current_logger.bind(
        type=message.type,
        transfer_type=message.data.type,
        transfer_id=message.data.id,
        transfer_sequence_number=message.data.sequenceNumber,
    )
    try:
        match message.data.type, message.data.category:
            case "payment", "issuedCard":
                self.card_transfer_processor_policy.on_payment_received(
                    message.data
                )
            case "bankTransfer", "bank":
                self.bank_transfer_processor_policy.on_bank_transfer_received(
                    message.data
                )
            case "internalTransfer" | "invoiceDeduction", "internal":
                self.account_transfer_processor_policy.on_account_transfer_received(
                    message.data
                )
            case _:
                logger.error(
                    f"Transfer of type {message.data.type} is not supported"
                )
    except Exception as e:
        logger.exception(
            "Error processing transfer",
            exception=e,
        )
        alert_on_error_processing_transfer_notification(
            notification=message,
            message="An unexpected error occurred while processing the transfer notification",
        )

components.payment_gateway.subcomponents.transfers.adapters.jpmorgan

jpmorgan_payout_service_provider

JPMorganPayoutServiceProvider

JPMorganPayoutServiceProvider(jpmorgan_client)

Bases: PayoutServiceProvider

Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/jpmorgan_payout_service_provider.py
def __init__(self, jpmorgan_client: NoopJPMorganClient | JPMorganClient) -> None:
    self.jpmorgan_client = jpmorgan_client
create classmethod
create()
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/jpmorgan_payout_service_provider.py
@classmethod
def create(cls) -> "JPMorganPayoutServiceProvider":
    return JPMorganPayoutServiceProvider(
        JPMorganClient(JPMorganBusinessAccountName.alan_insurance)
    )
create_null classmethod
create_null()
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/jpmorgan_payout_service_provider.py
@classmethod
def create_null(cls) -> "JPMorganPayoutServiceProvider":
    return JPMorganPayoutServiceProvider(NoopJPMorganClient())
jpmorgan_client instance-attribute
jpmorgan_client = jpmorgan_client
pay
pay(
    request_id,
    amount_in_cents,
    currency,
    recipient_bank_account,
    recipient,
    description=None,
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/jpmorgan_payout_service_provider.py
@override
def pay(
    self,
    request_id: str,
    amount_in_cents: int,
    currency: str,
    recipient_bank_account: BankTransferAccount,
    recipient: RecipientUser,
    description: str | None = None,
) -> tuple[TransactionId, RawResponse | None]:
    account_type = (
        JPMorganAccountType.INTERAC
        if recipient_bank_account.bank_transfer_account_type == "ca_account"
        else JPMorganAccountType.IBAN
    )
    supported_currency = self.jpmorgan_client.get_supported_currency()
    if supported_currency != currency:
        raise ValueError(
            f"Currency {currency} not supported by JPMorgan client, only {supported_currency} is supported"
        )
    payment_info = self.jpmorgan_client.pay(
        request_id=request_id,
        amount_in_cents=amount_in_cents,
        ibancode=recipient_bank_account.bank_transfer_account_info,
        account_type=account_type,
        recipient_first_name=recipient.first_name,
        recipient_last_name=recipient.last_name,
        address=(
            BillingAddress(
                street=recipient.address.street,
                postal_code=recipient.address.postal_code,
                city=recipient.address.city,
                country=recipient.address.country,
            )
            if recipient.address is not None
            else None
        ),
    )
    return payment_info.transaction_id, payment_info.payment_payload

policies

bank_transfer_processor

JPMorganBankTransferProcessorPolicy
on_bank_transfer_received
on_bank_transfer_received(data)
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/policies/bank_transfer_processor.py
@obs.event_subscriber()
def on_bank_transfer_received(self, data: JPMorganPaymentCallbackEvent) -> None:
    external_transaction_id = data.firm_root_id
    external_transfer_id = data.end_to_end_id

    new_status = self._map_jpmorgan_status_to_payout_bank_transfer_status(
        data.payment_status
    )
    if new_status is None:
        raise ValueError(f"Unknown JPMorgan payment status: {data.payment_status}")

    with transaction(propagation=Propagation.REQUIRED) as session:
        bank_transfer = BankTransferModelBroker.get_bank_transfer_by_external_id(
            session,
            provider=PaymentServiceProvider.jpmorgan,
            external_id=external_transfer_id,
        )
        bank_transfer_id = bank_transfer.id

        transfer_updates = (
            TransferUpdateModelBroker.get_transfer_updates_by_bank_transfer_id(
                session,
                BankTransferId(bank_transfer_id),
            )
        )
        if len(transfer_updates) == 0:
            # This should never happen, as we should have created an initial transfer update
            # when we created the bank transfer.
            raise ValueError(
                f"No existing updates found for bank transfer {bank_transfer_id}, cannot process JPMorgan callback"
            )
        last_transfer_update = transfer_updates[0]
        new_sequence_number = last_transfer_update.sequence_number + 1
        amount = last_transfer_update.amount
        currency = last_transfer_update.currency

    # Dedicate a new transaction for recording the transfer update
    transfer_update_created = None
    with transaction(propagation=Propagation.REQUIRES_NEW) as session:
        transfer_update, created = TransferUpdateModelBroker.record_transfer_update(
            session,
            transfer_id=bank_transfer_id,
            transfer_type=TransferUpdateTransferType.BANK,
            direction=TransferDirection.OUTGOING,
            sequence_number=new_sequence_number,
            occurred_at=datetime.now(),
            amount=amount,
            currency=currency,
            status=new_status,
            provider=PaymentServiceProvider.jpmorgan,
            raw=data.raw,
            external_transfer_id=external_transfer_id,
            external_transaction_id=external_transaction_id,
        )
        if created:
            transfer_update_created = transfer_update_model_to_dataclass(
                transfer_update
            )

    # Publish outside transaction to avoid session closure issues
    if transfer_update_created is not None:
        transfer_update_topic.publish(transfer_update_created)

topic_subscribers

JPMorganBankTransferTopicSubscriber

JPMorganBankTransferTopicSubscriber(
    *, bank_transfer_processor_policy
)

Bases: Subscriber

Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/topic_subscribers.py
def __init__(
    self,
    *,
    bank_transfer_processor_policy: JPMorganBankTransferProcessorPolicy,
) -> None:
    self.bank_transfer_processor_policy = bank_transfer_processor_policy
bank_transfer_processor_policy instance-attribute
bank_transfer_processor_policy = (
    bank_transfer_processor_policy
)
receive
receive(message)
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/topic_subscribers.py
@override
@obs.event_subscriber()
def receive(self, message: JPMorganPaymentCallbackEvent) -> None:
    logger = current_logger.bind(
        firm_root_id=message.firm_root_id,
        end_to_end_id=message.end_to_end_id,
    )
    try:
        self.bank_transfer_processor_policy.on_bank_transfer_received(message)
    except Exception as e:
        logger.exception(
            "Error processing transfer",
            exception=e,
        )

components.payment_gateway.subcomponents.transfers.adapters.messaging

PayoutBankTransferTopicSubscriber

Bases: Subscriber

This class is an adapter between shared pub/sub (used only by payment gateway) and shared messaging (use for inter-component communication).

receive

receive(message)
Source code in components/payment_gateway/subcomponents/transfers/adapters/messaging.py
@override
def receive(self, message: TransferUpdate) -> None:
    if (
        message.direction == TransferDirection.OUTGOING
        and message.transfer_type == TransferUpdateTransferType.BANK
    ):
        event = PayoutBankTransferUpdated(
            event_date=message.occurred_at,
            bank_transfer_id=BankTransferId(message.transfer_id),
            provider=message.provider,
            external_transfer_id=mandatory(message.external_transfer_id),
            external_transaction_id=mandatory(message.external_transaction_id),
            status=PayoutBankTransferStatus(message.status),
        )
        get_message_broker().publish(event)

components.payment_gateway.subcomponents.transfers.adapters.models

mappers

account_transfer_model_to_dataclass

account_transfer_model_to_dataclass(account_transfer)

Convert an AccountTransfer model to AccountTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def account_transfer_model_to_dataclass(
    account_transfer: "AccountTransferModel",
) -> "AccountTransferDataclass":
    """Convert an AccountTransfer model to AccountTransfer dataclass."""
    from components.payment_gateway.subcomponents.accounts.protected.entities import (
        AccountId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        AccountTransfer as AccountTransferDataclass,
        AccountTransferId,
        TransferHistoryId,
    )

    return AccountTransferDataclass(
        id=AccountTransferId(account_transfer.id),
        direction=account_transfer.direction,
        reference=account_transfer.reference,
        effective_date=account_transfer.effective_date,
        account_id=AccountId(account_transfer.account_id),  # type: ignore[arg-type]
        transfer_history_id=TransferHistoryId(account_transfer.transfer_history_id),
        updates=[
            transfer_update_model_to_dataclass(update)
            for update in account_transfer.updates
        ],
        events=[
            transfer_event_model_to_dataclass(event)
            for event in account_transfer.events
        ],
        raw=account_transfer.raw,
    )

bank_transfer_model_to_dataclass

bank_transfer_model_to_dataclass(bank_transfer)

Convert a BankTransfer model to BankTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def bank_transfer_model_to_dataclass(
    bank_transfer: "BankTransferModel",
) -> "BankTransferDataclass":
    """Convert a BankTransfer model to BankTransfer dataclass."""
    from components.payment_gateway.subcomponents.accounts.protected.entities import (
        AccountId,
        SepaBeneficiaryId,
    )
    from components.payment_gateway.subcomponents.banking_documents.protected.entities import (
        SepaMandateId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        BankTransfer as BankTransferDataclass,
        BankTransferId,
        TransferHistoryId,
    )

    return BankTransferDataclass(
        id=BankTransferId(bank_transfer.id),
        external_id=bank_transfer.external_id,
        direction=bank_transfer.direction,
        effective_date=bank_transfer.effective_date,
        account_id=AccountId(bank_transfer.account_id),  # type: ignore[arg-type]
        transfer_history_id=(
            TransferHistoryId(bank_transfer.transfer_history_id)
            if bank_transfer.transfer_history_id is not None
            else None
        ),
        updates=[
            transfer_update_model_to_dataclass(update)
            for update in bank_transfer.updates
        ],
        events=[
            transfer_event_model_to_dataclass(event) for event in bank_transfer.events
        ],
        sepa_mandate_id=(
            SepaMandateId(bank_transfer.sepa_mandate_id)
            if bank_transfer.sepa_mandate_id is not None
            else None
        ),
        sepa_beneficiary_id=(
            SepaBeneficiaryId(bank_transfer.sepa_beneficiary_id)
            if bank_transfer.sepa_beneficiary_id is not None
            else None
        ),
        raw=bank_transfer.raw,
    )

card_transfer_model_to_dataclass

card_transfer_model_to_dataclass(
    card_transfer,
    start_event_date=None,
    end_event_date=None,
)

Convert a CardTransfer model to CardTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def card_transfer_model_to_dataclass(
    card_transfer: "CardTransferModel",
    start_event_date: datetime | None = None,
    end_event_date: datetime | None = None,
) -> "CardTransferDataclass":
    """Convert a CardTransfer model to CardTransfer dataclass."""
    from components.payment_gateway.subcomponents.accounts.protected.entities import (
        AccountId,
    )
    from components.payment_gateway.subcomponents.cards.protected.entities import (
        CardId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        CardPaymentMerchant,
        CardTransfer as CardTransferDataclass,
        CardTransferId,
        TransferHistoryId,
    )

    # TODO @frederic.bonnet 2025-09-01 Backfill the country code in the database.
    country_info = get_country_info_by_name(card_transfer.country)
    country_code = country_info.alpha_3 if country_info else "XXX"

    return CardTransferDataclass(
        id=CardTransferId(card_transfer.id),
        effective_date=card_transfer.effective_date,
        merchant=CardPaymentMerchant(
            merchant_id=card_transfer.merchant_id,
            name=card_transfer.name,
            mcc=card_transfer.mcc,
            postal_code=card_transfer.postal_code,
            city=card_transfer.city,
            country_code=country_code,
        ),
        card_id=CardId(card_transfer.card_id),  # type: ignore[arg-type]
        account_id=AccountId(card_transfer.account_id),  # type: ignore[arg-type]
        transfer_history_id=TransferHistoryId(card_transfer.transfer_history_id),
        updates=[
            transfer_update_model_to_dataclass(update)
            for update in card_transfer.updates
            if (
                start_event_date is None
                or sanitize_tz(update.occurred_at) >= start_event_date
            )
            and (
                end_event_date is None
                or sanitize_tz(update.occurred_at) <= end_event_date
            )
        ],
        events=[
            transfer_event_model_to_dataclass(event)
            for event in card_transfer.events
            if (
                start_event_date is None
                or sanitize_tz(event.effective_date) >= start_event_date
            )
            and (
                end_event_date is None
                or sanitize_tz(event.effective_date) <= end_event_date
            )
        ],
        raw=card_transfer.raw,
    )

internal_transfer_model_to_dataclass

internal_transfer_model_to_dataclass(internal_transfer)

Convert an InternalTransfer model to InternalTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def internal_transfer_model_to_dataclass(
    internal_transfer: "InternalTransferModel",
) -> "InternalTransferDataclass":
    """Convert an InternalTransfer model to InternalTransfer dataclass."""
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        InternalTransfer as InternalTransferDataclass,
        InternalTransferId,
        TransferHistoryId,
    )

    return InternalTransferDataclass(
        id=InternalTransferId(internal_transfer.id),
        effective_date=internal_transfer.effective_date,
        amount=internal_transfer.amount,
        description=internal_transfer.description,
        reference=internal_transfer.reference,
        transfer_history_id=TransferHistoryId(internal_transfer.transfer_history_id),
    )

transfer_event_model_to_dataclass

transfer_event_model_to_dataclass(transfer_event)

Convert a TransferEvent model to TransferEvent dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def transfer_event_model_to_dataclass(
    transfer_event: "TransferEventModel",
) -> "TransferEventDataclass":
    """Convert a TransferEvent model to TransferEvent dataclass."""
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        TransferEvent as TransferEventDataclass,
        TransferEventId,
    )

    return TransferEventDataclass(
        id=TransferEventId(transfer_event.id),
        effective_date=transfer_event.effective_date,
        received=transfer_event.received,
        reserved=transfer_event.reserved,
        balance=transfer_event.balance,
        status=transfer_event.status,
        raw=transfer_event.raw,
    )

transfer_update_model_to_dataclass

transfer_update_model_to_dataclass(transfer_update)

Convert a TransferUpdate model to TransferUpdate dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def transfer_update_model_to_dataclass(
    transfer_update: "TransferUpdateModel",
) -> "TransferUpdateDataclass":
    """Convert a TransferUpdate model to TransferUpdate dataclass."""
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        AccountTransferId,
        BankTransferId,
        CardTransferId,
        TransferUpdate as TransferUpdateDataclass,
        TransferUpdateId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.enums import (
        TransferUpdateTransferType,
    )

    transfer_id: CardTransferId | BankTransferId | AccountTransferId
    match transfer_update.transfer_type:
        case TransferUpdateTransferType.CARD:
            transfer_id = CardTransferId(transfer_update.transfer_id)
        case TransferUpdateTransferType.BANK:
            transfer_id = BankTransferId(transfer_update.transfer_id)
        case TransferUpdateTransferType.ACCOUNT:
            transfer_id = AccountTransferId(transfer_update.transfer_id)
        case _:
            assert_never(transfer_update.transfer_type)  # Exhaustiveness check

    return TransferUpdateDataclass(
        id=TransferUpdateId(transfer_update.id),
        transfer_id=transfer_id,
        transfer_type=transfer_update.transfer_type,
        direction=transfer_update.direction,
        sequence_number=transfer_update.sequence_number,
        occurred_at=transfer_update.occurred_at,
        amount=transfer_update.amount,
        currency=transfer_update.currency,
        status=transfer_update.status,
        provider=transfer_update.provider,
        external_transfer_id=transfer_update.external_transfer_id,
        external_transaction_id=transfer_update.external_transaction_id,
        raw=transfer_update.raw,
    )

components.payment_gateway.subcomponents.transfers.adapters.payout_service_provider

PayoutServiceProvider

Bases: ABC

pay abstractmethod

pay(
    request_id,
    amount_in_cents,
    currency,
    recipient_bank_account,
    recipient,
    description=None,
)

Initiate a payout to a bank account. :param request_id: Unique identifier for the payout request. :param amount_in_cents: Amount to be paid in cents. :param currency: Currency code (e.g., 'USD', 'EUR'). :param recipient_bank_account: Bank account details of the recipient. :param recipient: Recipient details, either a user or a company. :param description: Optional description for the payment. :return: TransactionId - Unique identifier of the transaction generated by the payment provider.

Source code in components/payment_gateway/subcomponents/transfers/adapters/payout_service_provider.py
@abstractmethod
def pay(
    self,
    request_id: str,
    amount_in_cents: int,
    currency: str,
    recipient_bank_account: BankTransferAccount,
    recipient: RecipientUser,
    description: str | None = None,
) -> tuple[TransactionId, RawResponse | None]:
    """
    Initiate a payout to a bank account.
     :param request_id: Unique identifier for the payout request.
     :param amount_in_cents: Amount to be paid in cents.
     :param currency: Currency code (e.g., 'USD', 'EUR').
     :param recipient_bank_account: Bank account details of the recipient.
     :param recipient: Recipient details, either a user or a company.
     :param description: Optional description for the payment.
     :return: TransactionId - Unique identifier of the transaction generated by the payment provider.
    """
    ...

RawResponse module-attribute

RawResponse = dict[str, Any]

TransactionId module-attribute

TransactionId = str

components.payment_gateway.subcomponents.transfers.adapters.payout_service_provider_registry

PayoutServiceProviderRegistry

PayoutServiceProviderRegistry(providers)
Source code in components/payment_gateway/subcomponents/transfers/adapters/payout_service_provider_registry.py
def __init__(
    self, providers: dict[PaymentServiceProvider, PayoutServiceProvider]
) -> None:
    self.providers = providers

create classmethod

create()
Source code in components/payment_gateway/subcomponents/transfers/adapters/payout_service_provider_registry.py
@classmethod
def create(cls) -> "PayoutServiceProviderRegistry":
    return PayoutServiceProviderRegistry(
        {
            PaymentServiceProvider.jpmorgan: JPMorganPayoutServiceProvider.create(),
        }
    )

create_null classmethod

create_null()
Source code in components/payment_gateway/subcomponents/transfers/adapters/payout_service_provider_registry.py
@classmethod
def create_null(cls) -> "PayoutServiceProviderRegistry":
    return PayoutServiceProviderRegistry(
        {
            PaymentServiceProvider.jpmorgan: JPMorganPayoutServiceProvider.create_null(),
        }
    )

get

get(provider_name)
Source code in components/payment_gateway/subcomponents/transfers/adapters/payout_service_provider_registry.py
def get(self, provider_name: PaymentServiceProvider) -> PayoutServiceProvider:
    provider = self.providers.get(provider_name)
    if provider is None:
        raise NotImplementedError(
            f"Payout is not supported by this provider {PayoutServiceProvider}"
        )
    return provider

providers class-attribute instance-attribute

providers = providers