Skip to content

Adapters

Transfer adapters package.

This package contains adapters for converting between different data formats in the transfers subcomponent.

components.payment_gateway.subcomponents.transfers.adapters.adyen

helpers

associate_account_transfer_events

associate_account_transfer_events(
    session, /, data, account_transfer
)

Associate account transfer with its events, reordering them if needed. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def associate_account_transfer_events(
    session: Session,
    /,
    data: "TransferData",
    account_transfer: AccountTransferModel,
) -> None:
    """Associate account transfer with its events, reordering them if needed. Idempotent."""

    assert data.id
    assert data.events

    nb_associations = session.execute(
        select(func.count())
        .select_from(AccountTransferTransferEventAssociation)
        .filter(
            AccountTransferTransferEventAssociation.account_transfer_id
            == account_transfer.id
        )
    ).scalar_one()
    if len(data.events) < nb_associations:
        return

    # First temporarily reverse the sequence numbers of existing associations, this will avoid conflicts when reordering
    session.execute(
        update(AccountTransferTransferEventAssociation)
        .where(
            AccountTransferTransferEventAssociation.account_transfer_id
            == account_transfer.id
        )
        .values(sequence=-AccountTransferTransferEventAssociation.sequence - 1)
    )

    with session.no_autoflush:
        associations: list[AccountTransferTransferEventAssociation] = []
        for sequence, event_data in enumerate(data.events):
            assert event_data.id

            # Ignore non-accounting events
            if event_data.type != "accounting":
                continue

            transfer_event = TransferEventModelBroker.get_transfer_event_by_external_id(
                session,
                workspace_key=account_transfer.workspace_key,
                external_id=event_data.id,
            )
            association = (
                session.execute(
                    select(AccountTransferTransferEventAssociation).filter(  #
                        AccountTransferTransferEventAssociation.account_transfer_id
                        == account_transfer.id,
                        AccountTransferTransferEventAssociation.transfer_event_id
                        == transfer_event.id,
                    )
                )
                .scalars()
                .first()
            )
            if association:
                association.sequence = sequence
                associations.append(association)
                continue
            association = AccountTransferTransferEventAssociation(
                account_transfer=account_transfer,
                transfer_event=transfer_event,
                sequence=sequence,
            )
            associations.append(association)

        session.add_all(associations)

    # Now set all negative sequence numbers to 0; this should trigger conflicts in case some were missing from the new sequence
    with raise_if_missing_account_transfer_event(account_transfer.id):
        session.execute(
            update(AccountTransferTransferEventAssociation)
            .where(
                AccountTransferTransferEventAssociation.account_transfer_id
                == account_transfer.id
            )
            .where(AccountTransferTransferEventAssociation.sequence < 0)
            .values(sequence=0)
        )

associate_bank_transfer_events

associate_bank_transfer_events(
    session, /, data, bank_transfer
)

Associate bank transfer with its events, reordering them if needed. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def associate_bank_transfer_events(
    session: Session,
    /,
    data: "TransferData",
    bank_transfer: BankTransferModel,
) -> None:
    """Associate bank transfer with its events, reordering them if needed. Idempotent."""

    assert data.id
    assert data.events

    nb_associations = session.execute(
        select(func.count())
        .select_from(BankTransferTransferEventAssociation)
        .filter(
            BankTransferTransferEventAssociation.bank_transfer_id == bank_transfer.id
        )
    ).scalar_one()
    if len(data.events) < nb_associations:
        return

    # First temporarily reverse the sequence numbers of existing associations, this will avoid conflicts when reordering
    session.execute(
        update(BankTransferTransferEventAssociation)
        .where(
            BankTransferTransferEventAssociation.bank_transfer_id == bank_transfer.id
        )
        .values(sequence=-BankTransferTransferEventAssociation.sequence - 1)
    )

    with session.no_autoflush:
        associations: list[BankTransferTransferEventAssociation] = []
        for sequence, event_data in enumerate(data.events):
            assert event_data.id

            # Ignore non-accounting events
            if event_data.type != "accounting":
                continue

            transfer_event = TransferEventModelBroker.get_transfer_event_by_external_id(
                session,
                workspace_key=bank_transfer.workspace_key,
                external_id=event_data.id,
            )
            association = (
                session.execute(
                    select(BankTransferTransferEventAssociation).filter(
                        BankTransferTransferEventAssociation.bank_transfer_id
                        == bank_transfer.id,
                        BankTransferTransferEventAssociation.transfer_event_id
                        == transfer_event.id,
                    )
                )
                .scalars()
                .first()
            )
            if association:
                association.sequence = sequence
                associations.append(association)
                continue
            association = BankTransferTransferEventAssociation(
                bank_transfer=bank_transfer,
                transfer_event=transfer_event,
                sequence=sequence,
            )
            associations.append(association)

        session.add_all(associations)

    # Now set all negative sequence numbers to 0; this should trigger conflicts in case some were missing from the new sequence
    with raise_if_missing_bank_transfer_event(bank_transfer.id):
        session.execute(
            update(BankTransferTransferEventAssociation)
            .where(
                BankTransferTransferEventAssociation.bank_transfer_id
                == bank_transfer.id
            )
            .where(BankTransferTransferEventAssociation.sequence < 0)
            .values(sequence=0)
        )

associate_card_transfer_events

associate_card_transfer_events(
    session, /, data, card_transfer
)

Associate card transfer with its events, reordering them if needed. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def associate_card_transfer_events(
    session: Session,
    /,
    data: "TransferData",
    card_transfer: CardTransferModel,
) -> None:
    """Associate card transfer with its events, reordering them if needed. Idempotent."""

    assert data.id
    assert data.events

    nb_associations = session.execute(
        select(func.count())
        .select_from(CardTransferTransferEventAssociation)
        .filter(
            CardTransferTransferEventAssociation.card_payment_id == card_transfer.id
        )
    ).scalar_one()
    if len(data.events) < nb_associations:
        return

    # First temporarily reverse the sequence numbers of existing associations, this will avoid conflicts when reordering
    session.execute(
        update(CardTransferTransferEventAssociation)
        .where(CardTransferTransferEventAssociation.card_payment_id == card_transfer.id)
        .values(sequence=-CardTransferTransferEventAssociation.sequence - 1)
    )

    with session.no_autoflush:
        associations: list[CardTransferTransferEventAssociation] = []
        for sequence, event_data in enumerate(data.events):
            assert event_data.id

            # Ignore non-accounting events
            if event_data.type != "accounting":
                continue

            transfer_event = TransferEventModelBroker.get_transfer_event_by_external_id(
                session,
                workspace_key=card_transfer.workspace_key,
                external_id=event_data.id,
            )
            association = (
                session.execute(
                    select(CardTransferTransferEventAssociation).filter(
                        CardTransferTransferEventAssociation.card_payment_id
                        == card_transfer.id,
                        CardTransferTransferEventAssociation.transfer_event_id
                        == transfer_event.id,
                    )
                )
                .scalars()
                .first()
            )
            if association:
                association.sequence = sequence
                associations.append(association)
                continue
            association = CardTransferTransferEventAssociation(
                card_payment=card_transfer,
                transfer_event=transfer_event,
                sequence=sequence,
            )
            associations.append(association)

        session.add_all(associations)

    # Now set all negative sequence numbers to 0; this should trigger conflicts in case some were missing from the new sequence
    with raise_if_missing_card_transfer_event(card_transfer.id):
        session.execute(
            update(CardTransferTransferEventAssociation)
            .where(
                CardTransferTransferEventAssociation.card_payment_id == card_transfer.id
            )
            .where(CardTransferTransferEventAssociation.sequence < 0)
            .values(sequence=0)
        )

compute_transfer_update_amount_from_account_transfer_data

compute_transfer_update_amount_from_account_transfer_data(
    data,
)

Compute the amount and currency for an account transfer from an Adyen TransferData webhook payload.

Warning

This function is critical for client code that relies on the amount and currency of transfers (for example payroll or billing). If you have to change it, you need to do the following, in this order:

  • Call it from transfer_update_model_to_dataclass on transfer_type=TransferUpdateTransferType.ACCOUNT to override the stored values of existing transfers; newly recorded transfers will always use the new values (note that this will induce a performance hit because of the need to rehydrate the original payload)
  • Backfill ALL the old account transfer updates
  • Remove it from transfer_update_model_to_dataclass once the backfill is done.
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def compute_transfer_update_amount_from_account_transfer_data(
    data: "TransferData",
) -> tuple[int, str]:
    """Compute the amount and currency for an account transfer from an Adyen TransferData webhook payload.

    Warning:
        This function is critical for client code that relies on the amount and
        currency of transfers (for example payroll or billing). If you have to
        change it, you need to do the following, in this order:

        - Call it from `transfer_update_model_to_dataclass` on
          `transfer_type`=`TransferUpdateTransferType.ACCOUNT` to override the
          stored values of existing transfers; newly recorded transfers will
          always use the new values (note that this will induce a performance
          hit because of the need to rehydrate the original payload)
        - Backfill ALL the old account transfer updates
        - Remove it from `transfer_update_model_to_dataclass` once the backfill
          is done.
    """

    assert data.events
    assert data.status
    assert data.direction
    assert data.amount
    assert data.amount.value
    assert data.amount.currency
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    currency = data.amount.currency
    amount = data.amount.value if data.direction == "incoming" else -data.amount.value

    return amount, currency

compute_transfer_update_amount_from_bank_transfer_data

compute_transfer_update_amount_from_bank_transfer_data(
    data,
)

Compute the amount and currency for a bank transfer from an Adyen TransferData webhook payload.

Warning

This function is critical for client code that relies on the amount and currency of transfers (for example payroll or billing). If you have to change it, you need to do the following, in this order:

  • Call it from transfer_update_model_to_dataclass on transfer_type=TransferUpdateTransferType.BANK to override the stored values of existing transfers; newly recorded transfers will always use the new values (note that this will induce a performance hit because of the need to rehydrate the original payload)
  • Backfill ALL the old bank transfer updates
  • Remove it from transfer_update_model_to_dataclass once the backfill is done.
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def compute_transfer_update_amount_from_bank_transfer_data(
    data: "TransferData",
) -> tuple[int, str]:
    """Compute the amount and currency for a bank transfer from an Adyen TransferData webhook payload.

    Warning:
        This function is critical for client code that relies on the amount and
        currency of transfers (for example payroll or billing). If you have to
        change it, you need to do the following, in this order:

        - Call it from `transfer_update_model_to_dataclass` on
          `transfer_type`=`TransferUpdateTransferType.BANK` to override the
          stored values of existing transfers; newly recorded transfers will
          always use the new values (note that this will induce a performance
          hit because of the need to rehydrate the original payload)
        - Backfill ALL the old bank transfer updates
        - Remove it from `transfer_update_model_to_dataclass` once the backfill
          is done.
    """

    currency = data.amount.currency
    amount = data.amount.value if data.direction == "incoming" else -data.amount.value
    if data.status == "failed":
        # Need to invert the amount for failed transfers, for consistency with e.g. cancelled card transfers
        amount = -amount

    return amount, currency

compute_transfer_update_amount_from_card_transfer_data

compute_transfer_update_amount_from_card_transfer_data(
    data,
)

Compute the amount and currency for a card transfer from an Adyen TransferData webhook payload.

Warning

This function is critical for client code that relies on the amount and currency of transfers (for example payroll or billing). If you have to change it, you need to do the following, in this order:

  • Call it from transfer_update_model_to_dataclass on transfer_type=TransferUpdateTransferType.CARD to override the stored values of existing transfers; newly recorded transfers will always use the new values (note that this will induce a performance hit because of the need to rehydrate the original payload)
  • Backfill ALL the old card transfer updates
  • Remove it from transfer_update_model_to_dataclass once the backfill is done.
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def compute_transfer_update_amount_from_card_transfer_data(
    data: "TransferData",
) -> tuple[int, str]:
    """Compute the amount and currency for a card transfer from an Adyen TransferData webhook payload.

    Warning:
        This function is critical for client code that relies on the amount and
        currency of transfers (for example payroll or billing). If you have to
        change it, you need to do the following, in this order:

        - Call it from `transfer_update_model_to_dataclass` on
          `transfer_type`=`TransferUpdateTransferType.CARD` to override the
          stored values of existing transfers; newly recorded transfers will
          always use the new values (note that this will induce a performance
          hit because of the need to rehydrate the original payload)
        - Backfill ALL the old card transfer updates
        - Remove it from `transfer_update_model_to_dataclass` once the backfill
          is done.
    """

    assert data.events

    # Infer the right amount and currency
    last_event = data.events[-1]
    if last_event.amount:
        # Should always be the case but a handful bugs (see below)
        currency = last_event.amount.currency
        match data.status:
            case "captured" | "refunded" | "captureReversed":
                # Use mutation change as amount to handle differences between authorized and booked values
                assert last_event.mutations
                assert len(last_event.mutations) == 1
                last_mutation = last_event.mutations[0]
                amount = (
                    (last_mutation.balance or 0)
                    + (last_mutation.reserved or 0)
                    + (last_mutation.received or 0)
                )
            case "refused":
                # Invert the amount for consistency with authAdjustmentRefused
                amount = -last_event.amount.value
            case _:
                amount = last_event.amount.value
    elif data.status == "expired" and data.direction == "incoming":
        # This occurred a couple times for incoming transfers according to the logs so let's assume this is the only case we need to handle
        # Certainly an Adyen bug.
        # Take the reserved amount from the last mutation and the currency from the transfer (data shows it's OK)
        assert last_event.mutations
        assert len(last_event.mutations) == 1
        assert last_event.mutations[0].reserved
        currency = data.amount.currency
        amount = last_event.mutations[0].reserved
    else:
        # Last card transfer event has no amount, just use the value from the transfer data
        currency = data.amount.currency
        amount = data.amount.value

    return amount, currency

record_account_transfer_from_account_transfer_data

record_account_transfer_from_account_transfer_data(
    session, /, *, data, account, transfer_history_id
)

Record an account transfer from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_account_transfer_from_account_transfer_data(
    session: Session,
    /,
    *,
    data: "TransferData",
    account: Account,
    transfer_history_id: UUID,
) -> tuple[AccountTransferModel, bool]:
    """Record an account transfer from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.reference
    assert data.creationDate
    assert data.direction

    return AccountTransferModelBroker.record_account_transfer(
        session,
        workspace_key=account.workspace_key,
        external_id=data.id.strip(),
        effective_date=datetime.fromisoformat(data.creationDate),
        direction=TransferDirection(data.direction),
        reference=data.reference,
        account_id=account.id,
        transfer_history_id=transfer_history_id,
        raw={k: v for k, v in data.to_dict().items() if k != "events"},
    )

record_bank_transfer_from_bank_transfer_data

record_bank_transfer_from_bank_transfer_data(
    session, /, *, data, account, transfer_history_id
)

Record a bank transfer from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_bank_transfer_from_bank_transfer_data(
    session: Session,
    /,
    *,
    data: "TransferData",
    account: Account,
    transfer_history_id: UUID,
) -> tuple[BankTransferModel, bool]:
    """Record a bank transfer from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.creationDate
    assert data.direction

    return BankTransferModelBroker.record_bank_transfer(
        session,
        workspace_key=account.workspace_key,
        external_id=data.id.strip(),
        effective_date=datetime.fromisoformat(data.creationDate),
        direction=TransferDirection(data.direction),
        account_id=account.id,
        transfer_history_id=transfer_history_id,
        raw={k: v for k, v in data.to_dict().items() if k != "events"},
    )

record_card_transfer_from_card_transfer_data

record_card_transfer_from_card_transfer_data(
    session,
    /,
    *,
    data,
    card,
    account_id,
    transfer_history_id,
    merchant_info,
)

Record a card transfer from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_card_transfer_from_card_transfer_data(
    session: Session,
    /,
    *,
    data: "TransferData",
    card: Card,
    account_id: UUID,
    transfer_history_id: UUID,
    merchant_info: MerchantInfo,
) -> tuple[CardTransferModel, bool]:
    """Record a card transfer from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.creationDate

    return CardTransferModelBroker.record_card_transfer(
        session,
        workspace_key=card.workspace_key,
        external_id=data.id.strip(),
        effective_date=datetime.fromisoformat(data.creationDate),
        mcc=merchant_info.mcc or "",
        merchant_id=merchant_info.merchant_id,
        postal_code=merchant_info.postal_code,
        city=merchant_info.city or "",
        country=merchant_info.country or "",
        name=merchant_info.name or "",
        card_id=card.id,
        account_id=account_id,
        transfer_history_id=transfer_history_id,
        raw=data.to_dict(),
    )

record_transfer_event_from_transfer_data

record_transfer_event_from_transfer_data(
    session, /, *, data, transfer
)

Record a transfer event from an Adyen TransferEvent webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_event_from_transfer_data(
    session: Session,
    /,
    *,
    data: "TransferEvent",
    transfer: AccountTransferModel | BankTransferModel | CardTransferModel,
) -> tuple[TransferEventModel, bool]:
    """Record a transfer event from an Adyen TransferEvent webhook payload. Idempotent."""

    assert data.id
    assert data.type == "accounting"
    assert data.status
    assert data.bookingDate

    received = reserved = balance = 0
    if not (data.mutations):
        current_logger.info(
            f"Transfer event {data.id} with type {data.type} has no mutations, assuming no balance change"
        )
    else:
        received = sum(mutation.received or 0 for mutation in data.mutations)
        reserved = sum(mutation.reserved or 0 for mutation in data.mutations)
        balance = sum(mutation.balance or 0 for mutation in data.mutations)

        if len(data.mutations) > 1:
            current_logger.error(
                "Multiple mutations found in Adyen transfer event, using the sum of them all"
            )

    return TransferEventModelBroker.record_transfer_event(
        session,
        workspace_key=transfer.workspace_key,
        external_id=data.id,
        effective_date=datetime.fromisoformat(data.bookingDate),
        received=received,
        reserved=reserved,
        balance=balance,
        status=data.status,
        raw=data.to_dict(),
    )

record_transfer_update_from_account_transfer_data

record_transfer_update_from_account_transfer_data(
    session, /, *, data, account_transfer
)

Record a transfer update from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_update_from_account_transfer_data(
    session: Session,
    /,
    *,
    data: "TransferData",
    account_transfer: AccountTransferModel,
) -> tuple[TransferUpdateModel, bool]:
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.sequenceNumber
    assert data.creationDate

    amount, currency = compute_transfer_update_amount_from_account_transfer_data(data)

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        workspace_key=account_transfer.workspace_key,
        external_transfer_id=data.id,
        sequence_number=data.sequenceNumber,
        transfer_id=account_transfer.id,
        transfer_type=TransferUpdateTransferType.ACCOUNT,
        direction=TransferDirection(data.direction),
        occurred_at=datetime.fromisoformat(data.creationDate),
        amount=amount,
        currency=currency,
        status=data.status,
        raw=data.to_dict(),
    )

record_transfer_update_from_bank_transfer_data

record_transfer_update_from_bank_transfer_data(
    session, /, *, data, bank_transfer
)

Record a transfer update from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_update_from_bank_transfer_data(
    session: Session,
    /,
    *,
    data: "TransferData",
    bank_transfer: BankTransferModel,
) -> tuple[TransferUpdateModel, bool]:
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.sequenceNumber
    assert data.creationDate

    amount, currency = compute_transfer_update_amount_from_bank_transfer_data(data)

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        workspace_key=bank_transfer.workspace_key,
        external_transfer_id=data.id,
        sequence_number=data.sequenceNumber,
        transfer_id=bank_transfer.id,
        transfer_type=TransferUpdateTransferType.BANK,
        direction=TransferDirection(data.direction),
        occurred_at=datetime.fromisoformat(data.creationDate),
        amount=amount,
        currency=currency,
        status=data.status,
        raw=data.to_dict(),
    )

record_transfer_update_from_card_transfer_data

record_transfer_update_from_card_transfer_data(
    session, /, *, data, card_transfer
)

Record a transfer update from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_update_from_card_transfer_data(
    session: Session,
    /,
    *,
    data: "TransferData",
    card_transfer: CardTransferModel,
) -> tuple[TransferUpdateModel, bool]:
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    from shared.services.payment_providers.adyen.openapi.balance_platform_transfer_notification_v4 import (
        IssuedCard,
    )

    assert data.id
    assert data.sequenceNumber
    assert data.creationDate

    amount, currency = compute_transfer_update_amount_from_card_transfer_data(data)
    external_transaction_id = (
        data.categoryData.schemeUniqueTransactionId
        if isinstance(data.categoryData, IssuedCard)
        else None
    )

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        workspace_key=card_transfer.workspace_key,
        external_transfer_id=data.id,
        sequence_number=data.sequenceNumber,
        transfer_id=card_transfer.id,
        transfer_type=TransferUpdateTransferType.CARD,
        direction=TransferDirection(data.direction),
        occurred_at=datetime.fromisoformat(data.creationDate),
        amount=amount,
        currency=currency,
        status=data.status,
        external_transaction_id=external_transaction_id,
        raw=data.to_dict(),
    )

topic_subscribers

TransferTopicSubscriber

TransferTopicSubscriber(
    *,
    card_transfer_processor_policy,
    bank_transfer_processor_policy,
    account_transfer_processor_policy
)

Bases: Subscriber

This class subscribes to the Adyen transfer notification topic messages and dispatch them to application-provided processors

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/topic_subscribers.py
def __init__(
    self,
    *,
    card_transfer_processor_policy: CardTransferProcessorPolicy,
    bank_transfer_processor_policy: BankTransferProcessorPolicy,
    account_transfer_processor_policy: AccountTransferProcessorPolicy,
) -> None:
    self.card_transfer_processor_policy = card_transfer_processor_policy
    self.bank_transfer_processor_policy = bank_transfer_processor_policy
    self.account_transfer_processor_policy = account_transfer_processor_policy
account_transfer_processor_policy instance-attribute
account_transfer_processor_policy = (
    account_transfer_processor_policy
)
bank_transfer_processor_policy instance-attribute
bank_transfer_processor_policy = (
    bank_transfer_processor_policy
)
card_transfer_processor_policy instance-attribute
card_transfer_processor_policy = (
    card_transfer_processor_policy
)
receive
receive(message)
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/topic_subscribers.py
@override
@obs.event_subscriber()
def receive(self, message: TransferNotificationRequest) -> None:
    logger = current_logger.bind(
        type=message.type,
        transfer_type=message.data.type,
        transfer_id=message.data.id,
        transfer_sequence_number=message.data.sequenceNumber,
    )
    try:
        match message.data.type, message.data.category:
            case "payment", "issuedCard":
                self.card_transfer_processor_policy.on_payment_received(
                    message.data
                )
            case ("bankTransfer", "bank") | ("capture", "topUp"):
                self.bank_transfer_processor_policy.on_bank_transfer_received(
                    message.data
                )
            case ("capture", "platformPayment") | ("fee", "internal"):
                logger.info(
                    f"Webhook with id: {message.data.id}, type: {message.data.type} and category: {message.data.category} ignored"
                )
                return
            case "internalTransfer" | "invoiceDeduction", "internal":
                self.account_transfer_processor_policy.on_account_transfer_received(
                    message.data
                )
            case _:
                logger.error(
                    f"Transfer of type {message.data.type} and category {message.data.category} is not supported"
                )
    except Exception as e:
        logger.exception(
            "Error processing transfer",
            exception=e,
        )
        alert_on_error_processing_transfer_notification(
            notification=message,
            message="An unexpected error occurred while processing the transfer notification",
        )

components.payment_gateway.subcomponents.transfers.adapters.jpmorgan

helpers

record_transfer_update_from_callback_data

record_transfer_update_from_callback_data(
    session, /, *, data, workspace_key
)

Record a transfer update from a JPMorgan PaymentCallbackEvent webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/helpers.py
def record_transfer_update_from_callback_data(
    session: Session,
    /,
    *,
    data: JPMorganPaymentCallbackEvent,
    workspace_key: str,
) -> tuple[TransferUpdateModel, bool]:
    """Record a transfer update from a JPMorgan PaymentCallbackEvent webhook payload. Idempotent."""

    external_transaction_id = data.firm_root_id
    external_transfer_id = data.end_to_end_id

    # Get the bank transfer that we created when we initiated the payment request
    # See JPMorganWireTransferAdapter.initiate_wire_transfer_request
    with raise_if_bank_transfer_not_found_for_external_id(external_transfer_id):
        bank_transfer = BankTransferModelBroker.get_bank_transfer_by_external_id(
            session,
            workspace_key=workspace_key,
            external_id=external_transfer_id,
        )

    # Get new transfer update data from last one (there must be at least one that we created along with the bank transfer)
    assert len(bank_transfer.updates) > 0
    last_transfer_update = bank_transfer.updates[0]
    sequence_number = last_transfer_update.sequence_number + 1
    amount = last_transfer_update.amount
    status = _map_jpmorgan_status_to_payout_bank_transfer_status(data.payment_status)
    if status == PayoutBankTransferStatus.failed:
        # Need to invert the original amount for failed transfers for consistency with other providers
        amount = -amount

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        workspace_key=last_transfer_update.workspace_key,
        external_transfer_id=external_transfer_id,
        sequence_number=sequence_number,
        transfer_id=last_transfer_update.transfer_id,
        transfer_type=last_transfer_update.transfer_type,
        direction=last_transfer_update.direction,
        occurred_at=datetime.now(),
        amount=amount,
        currency=last_transfer_update.currency,
        status=status,
        raw=data.raw,
        external_transaction_id=external_transaction_id,
    )

policies

bank_transfer_processor

JPMorganBankTransferProcessorPolicy
JPMorganBankTransferProcessorPolicy(workspace_key)

This class is responsible for processing JPMorgan bank transfer events.

It processes JPMorgan payment callback events.

Tags
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/policies/bank_transfer_processor.py
def __init__(self, workspace_key: str) -> None:
    raise_on_provider_not_supported(workspace_key, PaymentServiceProvider.jpmorgan)
    self.workspace_key = workspace_key
on_payment_callback_received
on_payment_callback_received(data)
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/policies/bank_transfer_processor.py
@obs.event_subscriber()
def on_payment_callback_received(self, data: JPMorganPaymentCallbackEvent) -> None:
    transfer_update = self._process_payment_callback(data)
    if transfer_update is not None:
        transfer_update_topic.publish(transfer_update)
workspace_key instance-attribute
workspace_key = workspace_key

topic_subscribers

JPMorganBankTransferTopicSubscriber

JPMorganBankTransferTopicSubscriber(
    *, bank_transfer_processor_policy
)

Bases: Subscriber

Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/topic_subscribers.py
def __init__(
    self,
    *,
    bank_transfer_processor_policy: JPMorganBankTransferProcessorPolicy,
) -> None:
    self.bank_transfer_processor_policy = bank_transfer_processor_policy
bank_transfer_processor_policy instance-attribute
bank_transfer_processor_policy = (
    bank_transfer_processor_policy
)
receive
receive(message)
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/topic_subscribers.py
@override
@obs.event_subscriber()
def receive(self, message: JPMorganPaymentCallbackEvent) -> None:
    logger = current_logger.bind(
        firm_root_id=message.firm_root_id,
        end_to_end_id=message.end_to_end_id,
    )
    try:
        self.bank_transfer_processor_policy.on_payment_callback_received(message)
    except Exception as e:
        logger.exception(
            "Error processing transfer",
            exception=e,
        )

components.payment_gateway.subcomponents.transfers.adapters.messaging

PayoutBankTransferTopicSubscriber

Bases: Subscriber

This class is an adapter between shared pub/sub (used only by payment gateway) and shared messaging (use for inter-component communication).

receive

receive(message)
Source code in components/payment_gateway/subcomponents/transfers/adapters/messaging.py
@override
def receive(self, message: TransferUpdate) -> None:
    if (
        message.direction == TransferDirection.OUTGOING
        and message.transfer_type == TransferUpdateTransferType.BANK
    ):
        event = PayoutBankTransferUpdated(
            event_date=message.occurred_at,
            bank_transfer_id=BankTransferId(message.transfer_id),
            workspace_key=message.workspace_key,
            provider=mandatory(get_provider_for_workspace(message.workspace_key)),
            external_transfer_id=mandatory(message.external_transfer_id),
            external_transaction_id=mandatory(message.external_transaction_id),
            status=PayoutBankTransferStatus(message.status),
        )
        get_message_broker().publish(event)

components.payment_gateway.subcomponents.transfers.adapters.models

mappers

account_transfer_model_to_dataclass

account_transfer_model_to_dataclass(account_transfer)

Convert an AccountTransfer model to AccountTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def account_transfer_model_to_dataclass(
    account_transfer: "AccountTransferModel",
) -> "AccountTransferDataclass":
    """Convert an AccountTransfer model to AccountTransfer dataclass."""
    from components.payment_gateway.subcomponents.accounts.protected.entities import (
        AccountId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        AccountTransfer as AccountTransferDataclass,
        AccountTransferId,
        TransferHistoryId,
    )

    return AccountTransferDataclass(
        id=AccountTransferId(account_transfer.id),
        workspace_key=account_transfer.workspace_key,
        external_id=account_transfer.external_id,
        direction=account_transfer.direction,
        reference=account_transfer.reference,
        effective_date=account_transfer.effective_date,
        account_id=AccountId(account_transfer.account_id),  # type: ignore[arg-type]
        transfer_history_id=TransferHistoryId(account_transfer.transfer_history_id),
        updates=[
            transfer_update_model_to_dataclass(update)
            for update in account_transfer.updates
        ],
        events=[
            transfer_event_model_to_dataclass(event)
            for event in account_transfer.events
        ],
        raw=account_transfer.raw,
    )

bank_transfer_model_to_dataclass

bank_transfer_model_to_dataclass(bank_transfer)

Convert a BankTransfer model to BankTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def bank_transfer_model_to_dataclass(
    bank_transfer: "BankTransferModel",
) -> "BankTransferDataclass":
    """Convert a BankTransfer model to BankTransfer dataclass."""
    from components.payment_gateway.subcomponents.accounts.protected.entities import (
        AccountId,
        SepaBeneficiaryId,
    )
    from components.payment_gateway.subcomponents.banking_documents.protected.entities import (
        SepaMandateId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        BankTransfer as BankTransferDataclass,
        BankTransferId,
        TransferHistoryId,
    )

    return BankTransferDataclass(
        id=BankTransferId(bank_transfer.id),
        workspace_key=bank_transfer.workspace_key,
        external_id=bank_transfer.external_id,
        direction=bank_transfer.direction,
        effective_date=bank_transfer.effective_date,
        account_id=AccountId(bank_transfer.account_id),  # type: ignore[arg-type]
        transfer_history_id=(
            TransferHistoryId(bank_transfer.transfer_history_id)
            if bank_transfer.transfer_history_id is not None
            else None
        ),
        updates=[
            transfer_update_model_to_dataclass(update)
            for update in bank_transfer.updates
        ],
        events=[
            transfer_event_model_to_dataclass(event) for event in bank_transfer.events
        ],
        sepa_mandate_id=(
            SepaMandateId(bank_transfer.sepa_mandate_id)
            if bank_transfer.sepa_mandate_id is not None
            else None
        ),
        sepa_beneficiary_id=(
            SepaBeneficiaryId(bank_transfer.sepa_beneficiary_id)
            if bank_transfer.sepa_beneficiary_id is not None
            else None
        ),
        raw=bank_transfer.raw,
    )

card_transfer_model_to_dataclass

card_transfer_model_to_dataclass(
    card_transfer,
    start_event_date=None,
    end_event_date=None,
)

Convert a CardTransfer model to CardTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def card_transfer_model_to_dataclass(
    card_transfer: "CardTransferModel",
    start_event_date: datetime | None = None,
    end_event_date: datetime | None = None,
) -> "CardTransferDataclass":
    """Convert a CardTransfer model to CardTransfer dataclass."""
    from components.payment_gateway.subcomponents.accounts.protected.entities import (
        AccountId,
    )
    from components.payment_gateway.subcomponents.cards.protected.entities import (
        CardId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        CardPaymentMerchant,
        CardTransfer as CardTransferDataclass,
        CardTransferId,
        TransferHistoryId,
    )

    # TODO @frederic.bonnet 2025-09-01 Backfill the country code in the database.
    country_info = get_country_info_by_name(card_transfer.country)
    country_code = country_info.alpha_3 if country_info else "XXX"

    return CardTransferDataclass(
        id=CardTransferId(card_transfer.id),
        workspace_key=card_transfer.workspace_key,
        external_id=card_transfer.external_id,
        effective_date=card_transfer.effective_date,
        merchant=CardPaymentMerchant(
            merchant_id=card_transfer.merchant_id,
            name=card_transfer.name,
            mcc=card_transfer.mcc,
            postal_code=card_transfer.postal_code,
            city=card_transfer.city,
            country_code=country_code,
        ),
        card_id=CardId(card_transfer.card_id),  # type: ignore[arg-type]
        account_id=AccountId(card_transfer.account_id),  # type: ignore[arg-type]
        transfer_history_id=TransferHistoryId(card_transfer.transfer_history_id),
        updates=[
            transfer_update_model_to_dataclass(update)
            for update in card_transfer.updates
            if (
                start_event_date is None
                or sanitize_tz(update.occurred_at) >= start_event_date
            )
            and (
                end_event_date is None
                or sanitize_tz(update.occurred_at) <= end_event_date
            )
        ],
        events=[
            transfer_event_model_to_dataclass(event)
            for event in card_transfer.events
            if (
                start_event_date is None
                or sanitize_tz(event.effective_date) >= start_event_date
            )
            and (
                end_event_date is None
                or sanitize_tz(event.effective_date) <= end_event_date
            )
        ],
        raw=card_transfer.raw,
    )

internal_transfer_model_to_dataclass

internal_transfer_model_to_dataclass(internal_transfer)

Convert an InternalTransfer model to InternalTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def internal_transfer_model_to_dataclass(
    internal_transfer: "InternalTransferModel",
) -> "InternalTransferDataclass":
    """Convert an InternalTransfer model to InternalTransfer dataclass."""
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        InternalTransfer as InternalTransferDataclass,
        InternalTransferId,
        TransferHistoryId,
    )

    return InternalTransferDataclass(
        id=InternalTransferId(internal_transfer.id),
        effective_date=internal_transfer.effective_date,
        amount=internal_transfer.amount,
        description=internal_transfer.description,
        reference=internal_transfer.reference,
        transfer_history_id=TransferHistoryId(internal_transfer.transfer_history_id),
    )

transfer_event_model_to_dataclass

transfer_event_model_to_dataclass(transfer_event)

Convert a TransferEvent model to TransferEvent dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def transfer_event_model_to_dataclass(
    transfer_event: "TransferEventModel",
) -> "TransferEventDataclass":
    """Convert a TransferEvent model to TransferEvent dataclass."""
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        TransferEvent as TransferEventDataclass,
        TransferEventId,
    )

    return TransferEventDataclass(
        id=TransferEventId(transfer_event.id),
        workspace_key=transfer_event.workspace_key,
        external_id=transfer_event.external_id,
        effective_date=transfer_event.effective_date,
        received=transfer_event.received,
        reserved=transfer_event.reserved,
        balance=transfer_event.balance,
        status=transfer_event.status,
        raw=transfer_event.raw,
    )

transfer_update_model_to_dataclass

transfer_update_model_to_dataclass(transfer_update)

Convert a TransferUpdate model to TransferUpdate dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def transfer_update_model_to_dataclass(
    transfer_update: "TransferUpdateModel",
) -> "TransferUpdateDataclass":
    """Convert a TransferUpdate model to TransferUpdate dataclass."""
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        AccountTransferId,
        BankTransferId,
        CardTransferId,
        TransferUpdate as TransferUpdateDataclass,
        TransferUpdateId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.enums import (
        TransferUpdateTransferType,
    )

    transfer_id: CardTransferId | BankTransferId | AccountTransferId
    match transfer_update.transfer_type:
        case TransferUpdateTransferType.CARD:
            transfer_id = CardTransferId(transfer_update.transfer_id)
        case TransferUpdateTransferType.BANK:
            transfer_id = BankTransferId(transfer_update.transfer_id)
        case TransferUpdateTransferType.ACCOUNT:
            transfer_id = AccountTransferId(transfer_update.transfer_id)
        case _:
            assert_never(transfer_update.transfer_type)  # Exhaustiveness check

    return TransferUpdateDataclass(
        id=TransferUpdateId(transfer_update.id),
        workspace_key=transfer_update.workspace_key,
        external_transfer_id=transfer_update.external_transfer_id,
        sequence_number=transfer_update.sequence_number,
        transfer_id=transfer_id,
        transfer_type=transfer_update.transfer_type,
        direction=transfer_update.direction,
        occurred_at=transfer_update.occurred_at,
        amount=transfer_update.amount,
        currency=transfer_update.currency,
        status=transfer_update.status,
        external_transaction_id=transfer_update.external_transaction_id,
        raw=transfer_update.raw,
    )

components.payment_gateway.subcomponents.transfers.adapters.revolut

policies

bank_transfer_processor

NormalizedRevolutData dataclass
NormalizedRevolutData(
    amount, currency, direction, payment_id
)

Result of translating Revolut data to internal format.

amount instance-attribute
amount

Amount in cents (always positive)

currency instance-attribute
currency

Currency code (bill_currency or currency)

direction instance-attribute
direction

Transfer direction (INCOMING or OUTGOING)

payment_id instance-attribute
payment_id

Payment ID extracted from request_id (None for refunds)

RevolutBankTransferProcessorPolicy
RevolutBankTransferProcessorPolicy(workspace_key)

This class is responsible for processing Revolut bank transfer events.

It processes Revolut TransactionCreated (creates bank transfer) and TransactionStateChanged (updates existing bank transfer) events.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/policies/bank_transfer_processor.py
def __init__(self, workspace_key: str) -> None:
    raise_on_provider_not_supported(workspace_key, PaymentServiceProvider.revolut)
    self.workspace_key = workspace_key
on_transaction_created
on_transaction_created(data)

Process a TransactionCreated event.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/policies/bank_transfer_processor.py
@obs.event_subscriber()
def on_transaction_created(self, data: TransactionCreatedEvent) -> None:
    """Process a TransactionCreated event."""
    from shared.feature_flags.client import bool_feature_flag

    if not bool_feature_flag(
        "payment-gateway-revolut",
        default_value=False,
    ):
        current_logger.info(
            "Revolut event processing disabled by feature flag",
            transaction_id=str(data.transaction_id),
        )
        return

    transfer_update = self._process_transaction_created(data)
    if transfer_update is not None:
        transfer_update_topic.publish(transfer_update)
on_transaction_state_changed
on_transaction_state_changed(data)

Process a TransactionStateChanged event.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/policies/bank_transfer_processor.py
@obs.event_subscriber()
def on_transaction_state_changed(self, data: TransactionStateChangedEvent) -> None:
    """Process a TransactionStateChanged event."""
    from shared.feature_flags.client import bool_feature_flag

    if not bool_feature_flag(
        "payment-gateway-revolut",
        default_value=False,
    ):
        current_logger.info(
            "Revolut event processing disabled by feature flag",
            transaction_id=str(data.transaction_id),
        )
        return

    transfer_update = self._process_transaction_state_changed(data)
    if transfer_update is not None:
        transfer_update_topic.publish(transfer_update)
workspace_key instance-attribute
workspace_key = workspace_key

topic_subscribers

TransactionCreatedSubscriber

TransactionCreatedSubscriber(
    *, bank_transfer_processor_policy
)

Bases: Subscriber

Subscribes to Revolut TransactionCreated events.

Creates bank transfer records when new transactions are created.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/topic_subscribers.py
def __init__(
    self,
    *,
    bank_transfer_processor_policy: RevolutBankTransferProcessorPolicy,
) -> None:
    self.bank_transfer_processor_policy = bank_transfer_processor_policy
bank_transfer_processor_policy instance-attribute
bank_transfer_processor_policy = (
    bank_transfer_processor_policy
)
receive
receive(message)

Receive and process a TransactionCreated event.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/topic_subscribers.py
@override
@obs.event_subscriber()
def receive(self, message: TransactionCreatedEvent) -> None:
    """Receive and process a TransactionCreated event."""
    logger = current_logger.bind(
        transaction_id=str(message.transaction_id),
        request_id=str(message.request_id),
        state=message.state,
    )
    try:
        self.bank_transfer_processor_policy.on_transaction_created(message)
    except Exception as e:
        logger.exception(
            "Error processing TransactionCreated",
            exception=e,
        )

TransactionStateChangedSubscriber

TransactionStateChangedSubscriber(
    *, bank_transfer_processor_policy
)

Bases: Subscriber

Subscribes to Revolut TransactionStateChanged events.

Updates bank transfer status when transaction state changes.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/topic_subscribers.py
def __init__(
    self,
    *,
    bank_transfer_processor_policy: RevolutBankTransferProcessorPolicy,
) -> None:
    self.bank_transfer_processor_policy = bank_transfer_processor_policy
bank_transfer_processor_policy instance-attribute
bank_transfer_processor_policy = (
    bank_transfer_processor_policy
)
receive
receive(message)

Receive and process a TransactionStateChanged event.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/topic_subscribers.py
@override
@obs.event_subscriber()
def receive(self, message: TransactionStateChangedEvent) -> None:
    """Receive and process a TransactionStateChanged event."""
    logger = current_logger.bind(
        transaction_id=str(message.transaction_id),
        payment_id=str(message.payment_id) if message.payment_id else None,
        state=message.state,
        old_state=message.old_state,
    )
    try:
        self.bank_transfer_processor_policy.on_transaction_state_changed(message)
    except Exception as e:
        logger.exception(
            "Error processing TransactionStateChanged",
            exception=e,
        )