Skip to content

Adapters

Transfer adapters package.

This package contains adapters for converting between different data formats in the transfers subcomponent.

components.payment_gateway.subcomponents.transfers.adapters.adyen

helpers

associate_account_transfer_events_from_adyen_transfer_data

associate_account_transfer_events_from_adyen_transfer_data(
    session, /, data, account_transfer
)

Associate account transfer with its events, reordering them if needed. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def associate_account_transfer_events_from_adyen_transfer_data(
    session: Session,
    /,
    data: "AdyenTransferData",
    account_transfer: AccountTransfer,
) -> None:
    """Associate account transfer with its events, reordering them if needed. Idempotent."""

    assert data.id
    assert data.events

    nb_associations = session.execute(
        select(func.count())
        .select_from(AccountTransferTransferEventAssociation)
        .filter(
            AccountTransferTransferEventAssociation.account_transfer_id
            == account_transfer.id
        )
    ).scalar_one()
    if len(data.events) < nb_associations:
        return

    # First temporarily reverse the sequence numbers of existing associations, this will avoid conflicts when reordering
    session.execute(
        update(AccountTransferTransferEventAssociation)
        .where(
            AccountTransferTransferEventAssociation.account_transfer_id
            == account_transfer.id
        )
        .values(sequence=-AccountTransferTransferEventAssociation.sequence - 1)
    )

    with session.no_autoflush:
        associations: list[AccountTransferTransferEventAssociation] = []
        for sequence, event_data in enumerate(data.events):
            assert event_data.id

            # Ignore non-accounting events
            if event_data.type != "accounting":
                continue

            transfer_event = TransferEventModelBroker.get_transfer_event_by_external_id(
                session,
                workspace_key=account_transfer.workspace_key,
                external_id=event_data.id,
            )
            association = (
                session.execute(
                    select(AccountTransferTransferEventAssociation).filter(  #
                        AccountTransferTransferEventAssociation.account_transfer_id
                        == account_transfer.id,
                        AccountTransferTransferEventAssociation.transfer_event_id
                        == transfer_event.id,
                    )
                )
                .scalars()
                .first()
            )
            if association:
                association.sequence = sequence
                associations.append(association)
                continue
            association = AccountTransferTransferEventAssociation(
                account_transfer=account_transfer,
                transfer_event=transfer_event,
                sequence=sequence,
            )
            associations.append(association)

        session.add_all(associations)

    # Now set all negative sequence numbers to 0; this should trigger conflicts in case some were missing from the new sequence
    with raise_if_missing_account_transfer_event(account_transfer.id):
        session.execute(
            update(AccountTransferTransferEventAssociation)
            .where(
                AccountTransferTransferEventAssociation.account_transfer_id
                == account_transfer.id
            )
            .where(AccountTransferTransferEventAssociation.sequence < 0)
            .values(sequence=0)
        )

associate_bank_transfer_events_from_adyen_transfer_data

associate_bank_transfer_events_from_adyen_transfer_data(
    session, /, data, bank_transfer
)

Associate bank transfer with its events, reordering them if needed. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def associate_bank_transfer_events_from_adyen_transfer_data(
    session: Session,
    /,
    data: "AdyenTransferData",
    bank_transfer: BankTransfer,
) -> None:
    """Associate bank transfer with its events, reordering them if needed. Idempotent."""

    assert data.id
    assert data.events

    nb_associations = session.execute(
        select(func.count())
        .select_from(BankTransferTransferEventAssociation)
        .filter(
            BankTransferTransferEventAssociation.bank_transfer_id == bank_transfer.id
        )
    ).scalar_one()
    if len(data.events) < nb_associations:
        return

    # First temporarily reverse the sequence numbers of existing associations, this will avoid conflicts when reordering
    session.execute(
        update(BankTransferTransferEventAssociation)
        .where(
            BankTransferTransferEventAssociation.bank_transfer_id == bank_transfer.id
        )
        .values(sequence=-BankTransferTransferEventAssociation.sequence - 1)
    )

    with session.no_autoflush:
        associations: list[BankTransferTransferEventAssociation] = []
        for sequence, event_data in enumerate(data.events):
            assert event_data.id

            # Ignore non-accounting events
            if event_data.type != "accounting":
                continue

            transfer_event = TransferEventModelBroker.get_transfer_event_by_external_id(
                session,
                workspace_key=bank_transfer.workspace_key,
                external_id=event_data.id,
            )
            association = (
                session.execute(
                    select(BankTransferTransferEventAssociation).filter(
                        BankTransferTransferEventAssociation.bank_transfer_id
                        == bank_transfer.id,
                        BankTransferTransferEventAssociation.transfer_event_id
                        == transfer_event.id,
                    )
                )
                .scalars()
                .first()
            )
            if association:
                association.sequence = sequence
                associations.append(association)
                continue
            association = BankTransferTransferEventAssociation(
                bank_transfer=bank_transfer,
                transfer_event=transfer_event,
                sequence=sequence,
            )
            associations.append(association)

        session.add_all(associations)

    # Now set all negative sequence numbers to 0; this should trigger conflicts in case some were missing from the new sequence
    with raise_if_missing_bank_transfer_event(bank_transfer.id):
        session.execute(
            update(BankTransferTransferEventAssociation)
            .where(
                BankTransferTransferEventAssociation.bank_transfer_id
                == bank_transfer.id
            )
            .where(BankTransferTransferEventAssociation.sequence < 0)
            .values(sequence=0)
        )

associate_card_transfer_events_from_adyen_transfer_data

associate_card_transfer_events_from_adyen_transfer_data(
    session, /, data, card_transfer
)

Associate card transfer with its events, reordering them if needed. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def associate_card_transfer_events_from_adyen_transfer_data(
    session: Session,
    /,
    data: "AdyenTransferData",
    card_transfer: CardTransfer,
) -> None:
    """Associate card transfer with its events, reordering them if needed. Idempotent."""

    assert data.id
    assert data.events

    nb_associations = session.execute(
        select(func.count())
        .select_from(CardTransferTransferEventAssociation)
        .filter(
            CardTransferTransferEventAssociation.card_payment_id == card_transfer.id
        )
    ).scalar_one()
    if len(data.events) < nb_associations:
        return

    # First temporarily reverse the sequence numbers of existing associations, this will avoid conflicts when reordering
    session.execute(
        update(CardTransferTransferEventAssociation)
        .where(CardTransferTransferEventAssociation.card_payment_id == card_transfer.id)
        .values(sequence=-CardTransferTransferEventAssociation.sequence - 1)
    )

    with session.no_autoflush:
        associations: list[CardTransferTransferEventAssociation] = []
        for sequence, event_data in enumerate(data.events):
            assert event_data.id

            # Ignore non-accounting events
            if event_data.type != "accounting":
                continue

            transfer_event = TransferEventModelBroker.get_transfer_event_by_external_id(
                session,
                workspace_key=card_transfer.workspace_key,
                external_id=event_data.id,
            )
            association = (
                session.execute(
                    select(CardTransferTransferEventAssociation).filter(
                        CardTransferTransferEventAssociation.card_payment_id
                        == card_transfer.id,
                        CardTransferTransferEventAssociation.transfer_event_id
                        == transfer_event.id,
                    )
                )
                .scalars()
                .first()
            )
            if association:
                association.sequence = sequence
                associations.append(association)
                continue
            association = CardTransferTransferEventAssociation(
                card_payment=card_transfer,
                transfer_event=transfer_event,
                sequence=sequence,
            )
            associations.append(association)

        session.add_all(associations)

    # Now set all negative sequence numbers to 0; this should trigger conflicts in case some were missing from the new sequence
    with raise_if_missing_card_transfer_event(card_transfer.id):
        session.execute(
            update(CardTransferTransferEventAssociation)
            .where(
                CardTransferTransferEventAssociation.card_payment_id == card_transfer.id
            )
            .where(CardTransferTransferEventAssociation.sequence < 0)
            .values(sequence=0)
        )

compute_transfer_update_amount_from_adyen_account_transfer_data

compute_transfer_update_amount_from_adyen_account_transfer_data(
    data,
)

Compute the amount and currency for an account transfer from an Adyen TransferData webhook payload.

Warning

This function is critical for client code that relies on the amount and currency of transfers (for example payroll or billing). If you have to change it, you need to do the following, in this order:

  • Call it from transfer_update_model_to_dataclass on transfer_type=TransferUpdateTransferType.ACCOUNT to override the stored values of existing transfers; newly recorded transfers will always use the new values (note that this will induce a performance hit because of the need to rehydrate the original payload)
  • Backfill ALL the old account transfer updates
  • Remove it from transfer_update_model_to_dataclass once the backfill is done.
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def compute_transfer_update_amount_from_adyen_account_transfer_data(
    data: "AdyenTransferData",
) -> tuple[int, str]:
    """Compute the amount and currency for an account transfer from an Adyen TransferData webhook payload.

    Warning:
        This function is critical for client code that relies on the amount and
        currency of transfers (for example payroll or billing). If you have to
        change it, you need to do the following, in this order:

        - Call it from `transfer_update_model_to_dataclass` on
          `transfer_type`=`TransferUpdateTransferType.ACCOUNT` to override the
          stored values of existing transfers; newly recorded transfers will
          always use the new values (note that this will induce a performance
          hit because of the need to rehydrate the original payload)
        - Backfill ALL the old account transfer updates
        - Remove it from `transfer_update_model_to_dataclass` once the backfill
          is done.
    """

    assert data.events
    assert data.status
    assert data.direction
    assert data.amount
    assert data.amount.value
    assert data.amount.currency
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    currency = data.amount.currency
    amount = data.amount.value if data.direction == "incoming" else -data.amount.value

    return amount, currency

compute_transfer_update_amount_from_adyen_bank_transfer_data

compute_transfer_update_amount_from_adyen_bank_transfer_data(
    data,
)

Compute the amount and currency for a bank transfer from an Adyen TransferData webhook payload.

Warning

This function is critical for client code that relies on the amount and currency of transfers (for example payroll or billing). If you have to change it, you need to do the following, in this order:

  • Call it from transfer_update_model_to_dataclass on transfer_type=TransferUpdateTransferType.BANK to override the stored values of existing transfers; newly recorded transfers will always use the new values (note that this will induce a performance hit because of the need to rehydrate the original payload)
  • Backfill ALL the old bank transfer updates
  • Remove it from transfer_update_model_to_dataclass once the backfill is done.
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def compute_transfer_update_amount_from_adyen_bank_transfer_data(
    data: "AdyenTransferData",
) -> tuple[int, str]:
    """Compute the amount and currency for a bank transfer from an Adyen TransferData webhook payload.

    Warning:
        This function is critical for client code that relies on the amount and
        currency of transfers (for example payroll or billing). If you have to
        change it, you need to do the following, in this order:

        - Call it from `transfer_update_model_to_dataclass` on
          `transfer_type`=`TransferUpdateTransferType.BANK` to override the
          stored values of existing transfers; newly recorded transfers will
          always use the new values (note that this will induce a performance
          hit because of the need to rehydrate the original payload)
        - Backfill ALL the old bank transfer updates
        - Remove it from `transfer_update_model_to_dataclass` once the backfill
          is done.
    """

    currency = data.amount.currency
    amount = data.amount.value if data.direction == "incoming" else -data.amount.value
    if data.status == "failed":
        # Need to invert the amount for failed transfers, for consistency with e.g. cancelled card transfers
        amount = -amount

    return amount, currency

compute_transfer_update_amount_from_adyen_card_transfer_data

compute_transfer_update_amount_from_adyen_card_transfer_data(
    data,
)

Compute the amount and currency for a card transfer from an Adyen TransferData webhook payload.

Warning

This function is critical for client code that relies on the amount and currency of transfers (for example payroll or billing). If you have to change it, you need to do the following, in this order:

  • Call it from transfer_update_model_to_dataclass on transfer_type=TransferUpdateTransferType.CARD to override the stored values of existing transfers; newly recorded transfers will always use the new values (note that this will induce a performance hit because of the need to rehydrate the original payload)
  • Backfill ALL the old card transfer updates
  • Remove it from transfer_update_model_to_dataclass once the backfill is done.
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def compute_transfer_update_amount_from_adyen_card_transfer_data(
    data: "AdyenTransferData",
) -> tuple[int, str]:
    """Compute the amount and currency for a card transfer from an Adyen TransferData webhook payload.

    Warning:
        This function is critical for client code that relies on the amount and
        currency of transfers (for example payroll or billing). If you have to
        change it, you need to do the following, in this order:

        - Call it from `transfer_update_model_to_dataclass` on
          `transfer_type`=`TransferUpdateTransferType.CARD` to override the
          stored values of existing transfers; newly recorded transfers will
          always use the new values (note that this will induce a performance
          hit because of the need to rehydrate the original payload)
        - Backfill ALL the old card transfer updates
        - Remove it from `transfer_update_model_to_dataclass` once the backfill
          is done.
    """

    assert data.events

    # Infer the right amount and currency
    last_event = data.events[-1]
    if last_event.amount:
        # Should always be the case but a handful bugs (see below)
        currency = last_event.amount.currency
        match data.status:
            case "received" | "authorised":
                # Amount is what we want here, there can be several mutations with different currencies and we only want the actual value in the main currency
                amount = last_event.amount.value
            case "refused":
                # Invert the amount for consistency with authAdjustmentRefused
                amount = -last_event.amount.value
            case "authAdjustmentRefused":
                # Mutations are usually empty so use the last event amount
                amount = last_event.amount.value
            case _:
                # Use last event mutation change in the general case
                assert last_event.mutations
                assert len(last_event.mutations) == 1
                last_mutation = last_event.mutations[0]
                amount = (
                    (last_mutation.balance or 0)
                    + (last_mutation.reserved or 0)
                    + (last_mutation.received or 0)
                )
    elif data.status == "expired" and data.direction == "incoming":
        # This occurred a couple times for incoming transfers according to the logs so let's assume this is the only case we need to handle
        # Certainly an Adyen bug.
        # Take the reserved amount from the last mutation and the currency from the transfer (data shows it's OK)
        assert last_event.mutations
        assert len(last_event.mutations) == 1
        assert last_event.mutations[0].reserved
        currency = data.amount.currency
        amount = last_event.mutations[0].reserved
    else:
        # Last card transfer event has no amount, just use the value from the transfer data
        currency = data.amount.currency
        amount = data.amount.value

    return amount, currency

get_account_from_adyen_transfer_data

get_account_from_adyen_transfer_data(
    session, /, *, workspace_key, data
)

Get the account from an Adyen TransferData webhook payload.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def get_account_from_adyen_transfer_data(
    session: Session,
    /,
    *,
    workspace_key: str,
    data: "AdyenTransferData",
) -> Account:
    """Get the account from an Adyen TransferData webhook payload."""

    assert data.balanceAccount
    assert data.balanceAccount.id

    with raise_if_account_not_found_for_external_id(data.balanceAccount.id):
        return AccountModelBroker.get_account_by_external_id(
            session,
            workspace_key=workspace_key,
            external_id=data.balanceAccount.id,
        )

get_card_from_adyen_card_transfer_data

get_card_from_adyen_card_transfer_data(
    session, /, *, workspace_key, data
)

Get the account from an Adyen TransferData webhook payload.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def get_card_from_adyen_card_transfer_data(
    session: Session,
    /,
    *,
    workspace_key: str,
    data: "AdyenTransferData",
) -> Card:
    """Get the account from an Adyen TransferData webhook payload."""

    assert data.paymentInstrument
    assert data.paymentInstrument.id

    with raise_if_card_not_found_for_external_id(data.paymentInstrument.id):
        return CardModelBroker.get_card_by_external_id(
            session,
            workspace_key=workspace_key,
            external_id=data.paymentInstrument.id,
        )

get_payment_status_from_adyen_account_transfer_update

get_payment_status_from_adyen_account_transfer_update(
    transfer_update,
)

Map Adyen account transfer statuses to terminal PaymentRequestStatus.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def get_payment_status_from_adyen_account_transfer_update(
    transfer_update: TransferUpdate,
) -> PaymentRequestStatus | None:
    """Map Adyen account transfer statuses to terminal PaymentRequestStatus."""
    match transfer_update.status:
        case "booked":
            return PaymentRequestStatus.succeeded
        case "failed":
            return PaymentRequestStatus.failed
        case _:
            return None

get_payment_status_from_adyen_bank_transfer_update

get_payment_status_from_adyen_bank_transfer_update(
    transfer_update,
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def get_payment_status_from_adyen_bank_transfer_update(
    transfer_update: TransferUpdate,
) -> PaymentRequestStatus | None:
    match transfer_update.status:
        case "received":
            # Ignore
            return None
        case "authorised":
            return PaymentRequestStatus.pending
        case "booked":
            return PaymentRequestStatus.succeeded
        case "failed":
            return PaymentRequestStatus.failed
        case _:
            current_logger.warning(
                f"""Unknown Adyen bank transfer update status {transfer_update.status}"""
            )
            return None

get_payment_status_from_adyen_card_transfer_update

get_payment_status_from_adyen_card_transfer_update(
    transfer_update,
)

Map Adyen card transfer statuses to terminal PaymentRequestStatus.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def get_payment_status_from_adyen_card_transfer_update(
    transfer_update: TransferUpdate,
) -> PaymentRequestStatus | None:
    """Map Adyen card transfer statuses to terminal PaymentRequestStatus."""
    match transfer_update.status:
        case "captured":
            return PaymentRequestStatus.succeeded
        case "refused" | "expired":
            return PaymentRequestStatus.failed
        case _:
            # Non-terminal statuses (authorised, refunded, captureReversed, etc.)
            return None

record_account_transfer_from_adyen_account_transfer_data

record_account_transfer_from_adyen_account_transfer_data(
    session, /, *, data, account, transfer_history_id
)

Record an account transfer from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_account_transfer_from_adyen_account_transfer_data(
    session: Session,
    /,
    *,
    data: "AdyenTransferData",
    account: Account,
    transfer_history_id: UUID,
) -> tuple[AccountTransfer, bool]:
    """Record an account transfer from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.reference
    assert data.creationDate
    assert data.direction

    return AccountTransferModelBroker.record_account_transfer(
        session,
        workspace_key=account.workspace_key,
        external_id=data.id.strip(),
        effective_date=datetime.fromisoformat(data.creationDate),
        direction=TransferDirection(data.direction),
        reference=data.reference,
        account_id=account.id,
        transfer_history_id=transfer_history_id,
        raw={k: v for k, v in data.to_dict().items() if k != "events"},
    )

record_bank_transfer_from_adyen_bank_transfer_data

record_bank_transfer_from_adyen_bank_transfer_data(
    session, /, *, data, account, transfer_history_id
)

Record a bank transfer from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_bank_transfer_from_adyen_bank_transfer_data(
    session: Session,
    /,
    *,
    data: "AdyenTransferData",
    account: Account,
    transfer_history_id: UUID,
) -> tuple[BankTransfer, bool]:
    """Record a bank transfer from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.creationDate
    assert data.direction

    return BankTransferModelBroker.record_bank_transfer(
        session,
        workspace_key=account.workspace_key,
        external_id=data.id.strip(),
        effective_date=datetime.fromisoformat(data.creationDate),
        direction=TransferDirection(data.direction),
        account_id=account.id,
        transfer_history_id=transfer_history_id,
        raw={k: v for k, v in data.to_dict().items() if k != "events"},
    )

record_card_transfer_from_adyen_card_transfer_data

record_card_transfer_from_adyen_card_transfer_data(
    session,
    /,
    *,
    data,
    card,
    account_id,
    transfer_history_id,
    merchant_info,
)

Record a card transfer from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_card_transfer_from_adyen_card_transfer_data(
    session: Session,
    /,
    *,
    data: "AdyenTransferData",
    card: Card,
    account_id: UUID,
    transfer_history_id: UUID,
    merchant_info: MerchantInfo,
) -> tuple[CardTransfer, bool]:
    """Record a card transfer from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.creationDate

    return CardTransferModelBroker.record_card_transfer(
        session,
        workspace_key=card.workspace_key,
        external_id=data.id.strip(),
        effective_date=datetime.fromisoformat(data.creationDate),
        mcc=merchant_info.mcc or "",
        merchant_id=merchant_info.merchant_id,
        postal_code=merchant_info.postal_code,
        city=merchant_info.city or "",
        country=merchant_info.country or "",
        name=merchant_info.name or "",
        card_id=card.id,
        account_id=account_id,
        transfer_history_id=transfer_history_id,
        raw=data.to_dict(),
    )

record_transfer_event_from_adyen_transfer_event

record_transfer_event_from_adyen_transfer_event(
    session, /, *, data, transfer
)

Record a transfer event from an Adyen TransferEvent webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_event_from_adyen_transfer_event(
    session: Session,
    /,
    *,
    data: "AdyenTransferEvent",
    transfer: AccountTransfer | BankTransfer | CardTransfer,
) -> tuple[TransferEvent, bool]:
    """Record a transfer event from an Adyen TransferEvent webhook payload. Idempotent."""

    assert data.id
    assert data.type == "accounting"
    assert data.status
    assert data.bookingDate

    received = reserved = balance = 0
    if not (data.mutations):
        current_logger.info(
            f"Transfer event {data.id} with type {data.type} has no mutations, assuming no balance change"
        )
    else:
        received = sum(mutation.received or 0 for mutation in data.mutations)
        reserved = sum(mutation.reserved or 0 for mutation in data.mutations)
        balance = sum(mutation.balance or 0 for mutation in data.mutations)

        if len(data.mutations) > 1:
            current_logger.error(
                "Multiple mutations found in Adyen transfer event, using the sum of them all"
            )

    return TransferEventModelBroker.record_transfer_event(
        session,
        workspace_key=transfer.workspace_key,
        external_id=data.id,
        effective_date=datetime.fromisoformat(data.bookingDate),
        received=received,
        reserved=reserved,
        balance=balance,
        status=data.status,
        raw=data.to_dict(),
    )

record_transfer_update_from_adyen_account_transfer_data

record_transfer_update_from_adyen_account_transfer_data(
    session, /, *, data, account_transfer
)

Record a transfer update from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_update_from_adyen_account_transfer_data(
    session: Session,
    /,
    *,
    data: "AdyenTransferData",
    account_transfer: AccountTransfer,
) -> tuple[TransferUpdate, bool]:
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.sequenceNumber
    assert data.creationDate

    amount, currency = compute_transfer_update_amount_from_adyen_account_transfer_data(
        data
    )

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        workspace_key=account_transfer.workspace_key,
        external_transfer_id=data.id,
        sequence_number=data.sequenceNumber,
        transfer_id=account_transfer.id,
        transfer_type=TransferUpdateTransferType.ACCOUNT,
        direction=TransferDirection(data.direction),
        occurred_at=datetime.fromisoformat(data.creationDate),
        amount=amount,
        currency=currency,
        status=data.status,
        raw=data.to_dict(),
    )

record_transfer_update_from_adyen_bank_transfer_data

record_transfer_update_from_adyen_bank_transfer_data(
    session, /, *, data, bank_transfer
)

Record a transfer update from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_update_from_adyen_bank_transfer_data(
    session: Session,
    /,
    *,
    data: "AdyenTransferData",
    bank_transfer: BankTransfer,
) -> tuple[TransferUpdate, bool]:
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    assert data.id
    assert data.sequenceNumber
    assert data.creationDate

    amount, currency = compute_transfer_update_amount_from_adyen_bank_transfer_data(
        data
    )

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        workspace_key=bank_transfer.workspace_key,
        external_transfer_id=data.id,
        sequence_number=data.sequenceNumber,
        transfer_id=bank_transfer.id,
        transfer_type=TransferUpdateTransferType.BANK,
        direction=TransferDirection(data.direction),
        occurred_at=datetime.fromisoformat(data.creationDate),
        amount=amount,
        currency=currency,
        status=data.status,
        raw=data.to_dict(),
    )

record_transfer_update_from_adyen_card_transfer_data

record_transfer_update_from_adyen_card_transfer_data(
    session, /, *, data, card_transfer
)

Record a transfer update from an Adyen TransferData webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def record_transfer_update_from_adyen_card_transfer_data(
    session: Session,
    /,
    *,
    data: "AdyenTransferData",
    card_transfer: CardTransfer,
) -> tuple[TransferUpdate, bool]:
    """Record a transfer update from an Adyen TransferData webhook payload. Idempotent."""

    from shared.services.payment_providers.adyen.openapi.balance_platform_transfer_notification_v4 import (
        AdyenIssuedCard,
    )

    assert data.id
    assert data.sequenceNumber
    assert data.creationDate

    amount, currency = compute_transfer_update_amount_from_adyen_card_transfer_data(
        data
    )
    external_transaction_id = (
        data.categoryData.schemeUniqueTransactionId
        if isinstance(data.categoryData, AdyenIssuedCard)
        else None
    )

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        workspace_key=card_transfer.workspace_key,
        external_transfer_id=data.id,
        sequence_number=data.sequenceNumber,
        transfer_id=card_transfer.id,
        transfer_type=TransferUpdateTransferType.CARD,
        direction=TransferDirection(data.direction),
        occurred_at=datetime.fromisoformat(data.creationDate),
        amount=amount,
        currency=currency,
        status=data.status,
        external_transaction_id=external_transaction_id,
        raw=data.to_dict(),
    )

update_payment_request_from_adyen_bank_transfer_data

update_payment_request_from_adyen_bank_transfer_data(
    session, /, *, data, bank_transfer
)

Update the payment request matching the bank transfer if it exists

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/helpers.py
def update_payment_request_from_adyen_bank_transfer_data(
    session: Session,
    /,
    *,
    data: "AdyenTransferData",
    bank_transfer: BankTransfer,
) -> PaymentRequest | None:
    """Update the payment request matching the bank transfer if it exists"""
    from shared.services.payment_providers.adyen.openapi.balance_platform_transfer_notification_v4 import (
        AdyenPlatformPayment,
    )

    if not isinstance(data.categoryData, AdyenPlatformPayment):
        return None

    try:
        # We store the pspReference/pspPaymentReference in PaymentRequest.external_id for JPMAdyen payment requests.
        # See AdyenDirectDebitAdapter.initiate_direct_debit_request
        payment_request = PaymentRequestModelBroker.get_payment_request_by_external_id(
            session,
            workspace_key=bank_transfer.workspace_key,
            external_id=mandatory(data.categoryData.pspPaymentReference),
        )
        PaymentRequestModelBroker.set_bank_transfer(
            session,
            id=payment_request.id,
            bank_transfer_id=bank_transfer.id,
        )
        return payment_request
    except NoResultFound:
        return None

policies

account_transfer_processor

AccountTransferProcessingResult dataclass
AccountTransferProcessingResult(
    account_transfer,
    is_account_transfer_created,
    transfer_update,
    is_transfer_update_created,
    new_transfer_events,
)
account_transfer instance-attribute
account_transfer
is_account_transfer_created instance-attribute
is_account_transfer_created
is_transfer_update_created instance-attribute
is_transfer_update_created
new_transfer_events instance-attribute
new_transfer_events
transfer_update instance-attribute
transfer_update
AdyenAccountTransferProcessorPolicy
AdyenAccountTransferProcessorPolicy(
    *, workspace_key, account_transfer_router
)

This class is responsible for processing account transfer events (a.k.a internal transfer from Adyen).

It processes Adyen transfer notifications of type internalTransfer or invoiceDeduction.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/policies/account_transfer_processor.py
def __init__(
    self,
    *,
    workspace_key: str,
    account_transfer_router: AccountTransferRouter,
) -> None:
    raise_on_provider_not_supported(workspace_key, PaymentServiceProvider.adyen)
    self.workspace_key = workspace_key
    self.account_transfer_router = account_transfer_router
    self.ledger_actions = LedgerActions()
account_transfer_router instance-attribute
account_transfer_router = account_transfer_router
ledger_actions instance-attribute
ledger_actions = LedgerActions()
on_account_transfer_received
on_account_transfer_received(data)
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/policies/account_transfer_processor.py
@obs.event_subscriber()
def on_account_transfer_received(self, data: "AdyenTransferData") -> None:
    set_tag("workspace", self.workspace_key)
    logger = current_logger.bind(
        transfer_type=data.type,
        transfer_id=data.id,
        transfer_sequence_number=data.sequenceNumber,
    )

    # Sanity checks
    # These fields are supposed to be present in the data; we can't perform any operation without them anyway
    from shared.services.payment_providers.adyen.openapi.balance_platform_transfer_notification_v4 import (
        AdyenInternalCategoryData,
    )

    try:
        assert data.type == "internalTransfer" or data.type == "invoiceDeduction"
        assert data.category == "internal"
        assert data.categoryData
        if data.categoryData and not isinstance(
            data.categoryData, AdyenInternalCategoryData
        ):
            # This can happen when deserializing from a JSON string
            data.categoryData = AdyenInternalCategoryData.from_dict(
                data.categoryData  # type: ignore[arg-type]
            )
        assert data.id
        assert data.sequenceNumber
        assert data.balanceAccount
        assert data.balanceAccount.id
        assert data.creationDate
        assert data.direction
        assert data.reference
        assert data.events

    except AssertionError:
        logger.error("Missing required fields in account transfer data")
        alert_on_error_processing_adyen_transfer_data(
            data=data,
            message="Transfer data is missing required fields",
        )
        raise ValueError("Missing required fields in account transfer data")

    try:
        result = self._process_account_transfer(logger=logger, data=data)

        self._notify_changes(result)

    except Exception as ex:
        logger.exception(
            "Error processing account transfer",
            exception=ex,
            transfer_data=data,
        )
        alert_on_error_processing_adyen_transfer_data(
            data=data,
            message="An unexpected error occurred while upserting the account transfer",
        )
        raise
workspace_key instance-attribute
workspace_key = workspace_key

bank_transfer_processor

AdyenBankTransferProcessorPolicy
AdyenBankTransferProcessorPolicy(
    *, workspace_key, bank_transfer_router
)

This class is responsible for processing bank transfer events.

It processes Adyen transfer notifications of type bankTransfer or capture.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/policies/bank_transfer_processor.py
def __init__(
    self,
    *,
    workspace_key: str,
    bank_transfer_router: BankTransferRouter,
) -> None:
    raise_on_provider_not_supported(workspace_key, PaymentServiceProvider.adyen)
    self.workspace_key = workspace_key
    self.bank_transfer_router = bank_transfer_router
    self.ledger_actions = LedgerActions()
bank_transfer_router instance-attribute
bank_transfer_router = bank_transfer_router
ledger_actions instance-attribute
ledger_actions = LedgerActions()
on_bank_transfer_received
on_bank_transfer_received(data)
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/policies/bank_transfer_processor.py
@obs.event_subscriber()
def on_bank_transfer_received(self, data: "AdyenTransferData") -> None:
    set_tag("workspace", self.workspace_key)
    logger = current_logger.bind(
        transfer_type=data.type,
        transfer_id=data.id,
        transfer_sequence_number=data.sequenceNumber,
    )

    # Sanity checks
    # These fields are supposed to be present in the data; we can't perform any operation without them anyway
    from shared.services.payment_providers.adyen.openapi.balance_platform_transfer_notification_v4 import (
        AdyenBankCategoryData,
        AdyenPlatformPayment,
    )

    try:
        match data.type:
            case "bankTransfer":
                assert data.category == "bank"
                assert data.categoryData
                if data.categoryData and not isinstance(
                    data.categoryData, AdyenBankCategoryData
                ):
                    # This can happen when deserializing from a JSON string
                    data.categoryData = AdyenBankCategoryData.from_dict(
                        data.categoryData  # type: ignore[arg-type]
                    )
            case "capture":
                assert data.category == "topUp"
                if data.categoryData and not isinstance(
                    data.categoryData, AdyenPlatformPayment
                ):
                    # This can happen when deserializing from a JSON string
                    data.categoryData = AdyenPlatformPayment.from_dict(
                        data.categoryData  # type: ignore[arg-type]
                    )
            case _:
                logger.error(f"Webhook {data.type} unknown")
                raise ValueError(f"Webhook {data.type} unknown")

        assert data.id
        assert data.sequenceNumber
        assert data.balanceAccount
        assert data.balanceAccount.id
        assert data.creationDate
        assert data.direction
        assert data.events
    except AssertionError:
        logger.error(f"Missing required fields in {data.type} transfer data")
        alert_on_error_processing_adyen_transfer_data(
            data=data,
            message="Transfer data is missing required fields",
        )
        raise ValueError(f"Missing required fields in {data.type} transfer data")

    try:
        result = self._process_bank_transfer(logger=logger, data=data)

        self._notify_changes(result)

    except Exception as ex:
        logger.exception(
            f"Error processing Adyen {data.type} transfer",
            exception=ex,
            transfer_data=data,
        )
        alert_on_error_processing_adyen_transfer_data(
            data=data,
            message="An unexpected error occurred while processing the bank transfer",
        )
        raise
workspace_key instance-attribute
workspace_key = workspace_key
BankTransferProcessingResult dataclass
BankTransferProcessingResult(
    bank_transfer,
    is_bank_transfer_created,
    transfer_update,
    is_transfer_update_created,
    new_transfer_events,
    payment_request,
)
bank_transfer instance-attribute
bank_transfer
is_bank_transfer_created instance-attribute
is_bank_transfer_created
is_transfer_update_created instance-attribute
is_transfer_update_created
new_transfer_events instance-attribute
new_transfer_events
payment_request instance-attribute
payment_request
transfer_update instance-attribute
transfer_update

card_transfer_processor

AdyenCardTransferProcessorPolicy
AdyenCardTransferProcessorPolicy(
    *,
    workspace_key,
    card_transfer_router,
    merchant_registry_policy=None
)

This class is responsible for processing card transfer events.

It processes Adyen transfer notifications of type payment.

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/policies/card_transfer_processor.py
def __init__(
    self,
    *,
    workspace_key: str,
    card_transfer_router: CardTransferRouter,
    merchant_registry_policy: MerchantRegistryPolicy | None = None,
) -> None:
    raise_on_provider_not_supported(workspace_key, PaymentServiceProvider.adyen)
    self.workspace_key = workspace_key
    self.card_transfer_router = card_transfer_router
    self.merchant_registry_policy = (
        merchant_registry_policy or MerchantRegistryPolicy()
    )
    self.ledger_actions = LedgerActions()
card_transfer_router instance-attribute
card_transfer_router = card_transfer_router
ledger_actions instance-attribute
ledger_actions = LedgerActions()
merchant_registry_policy instance-attribute
merchant_registry_policy = (
    merchant_registry_policy or MerchantRegistryPolicy()
)
on_payment_received
on_payment_received(data)
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/policies/card_transfer_processor.py
@obs.event_subscriber()
def on_payment_received(self, data: "AdyenTransferData") -> None:
    set_tag("workspace", self.workspace_key)
    logger = current_logger.bind(
        transfer_type=data.type,
        transfer_id=data.id,
        transfer_sequence_number=data.sequenceNumber,
    )

    # Sanity checks
    # These fields are supposed to be present in the data; we can't perform any operation without them anyway
    from shared.services.payment_providers.adyen.openapi.balance_platform_transfer_notification_v4 import (
        AdyenIssuedCard,
    )

    try:
        assert data.type == "payment"
        assert data.category == "issuedCard"
        if data.categoryData and not isinstance(data.categoryData, AdyenIssuedCard):
            # This can happen when deserializing from a JSON string
            data.categoryData = AdyenIssuedCard.from_dict(data.categoryData)  # type: ignore[arg-type]
        assert data.id
        assert data.sequenceNumber
        assert data.paymentInstrument
        assert data.paymentInstrument.id
        assert data.balanceAccount
        assert data.balanceAccount.id
        assert data.counterparty
        assert data.counterparty.merchant
        assert data.creationDate
        assert data.events
    except AssertionError:
        logger.error("Missing required fields in card transfer data")
        alert_on_error_processing_adyen_transfer_data(
            data=data,
            message="Transfer data is missing required fields",
        )
        raise ValueError("Missing required fields in card transfer data")

    # Get or update merchant info
    # This will fill any gap in the payload with data from the merchant registry
    merchant_info = self.merchant_registry_policy.upsert_merchant_info(
        data.counterparty.merchant
    )
    if not merchant_info:
        logger.error(
            "Cannot route payment with no merchant info",
            merchant_data=data.counterparty.merchant,
        )
        alert_on_error_processing_adyen_transfer_data(
            data=data,
            message="Transfer data is missing merchant info and we couldn't retrieve it from the merchant registry",
        )
        raise ValueError("Cannot route payment with no merchant info")

    try:
        result = self._process_card_transfer(
            logger=logger, data=data, merchant_info=merchant_info
        )

        self._notify_changes(result)

    except Exception as ex:
        logger.exception(
            "Error processing card transfer",
            exception=ex,
            transfer_data=data,
        )
        alert_on_error_processing_adyen_transfer_data(
            data=data,
            message="An unexpected error occurred while upserting the card transfer",
        )
        raise
workspace_key instance-attribute
workspace_key = workspace_key
CardTransferProcessingResult dataclass
CardTransferProcessingResult(
    card_transfer,
    is_card_transfer_created,
    transfer_update,
    is_transfer_update_created,
    new_transfer_events,
)
card_transfer instance-attribute
card_transfer
is_card_transfer_created instance-attribute
is_card_transfer_created
is_transfer_update_created instance-attribute
is_transfer_update_created
new_transfer_events instance-attribute
new_transfer_events
transfer_update instance-attribute
transfer_update

topic_subscribers

TransferTopicSubscriber

TransferTopicSubscriber(
    *,
    card_transfer_processor_policy,
    bank_transfer_processor_policy,
    account_transfer_processor_policy
)

Bases: Subscriber

This class subscribes to the Adyen transfer notification topic messages and dispatch them to application-provided processors

Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/topic_subscribers.py
def __init__(
    self,
    *,
    card_transfer_processor_policy: AdyenCardTransferProcessorPolicy,
    bank_transfer_processor_policy: AdyenBankTransferProcessorPolicy,
    account_transfer_processor_policy: AdyenAccountTransferProcessorPolicy,
) -> None:
    self.card_transfer_processor_policy = card_transfer_processor_policy
    self.bank_transfer_processor_policy = bank_transfer_processor_policy
    self.account_transfer_processor_policy = account_transfer_processor_policy
account_transfer_processor_policy instance-attribute
account_transfer_processor_policy = (
    account_transfer_processor_policy
)
bank_transfer_processor_policy instance-attribute
bank_transfer_processor_policy = (
    bank_transfer_processor_policy
)
card_transfer_processor_policy instance-attribute
card_transfer_processor_policy = (
    card_transfer_processor_policy
)
receive
receive(message)
Source code in components/payment_gateway/subcomponents/transfers/adapters/adyen/topic_subscribers.py
@override
@obs.event_subscriber()
def receive(self, message: AdyenTransferNotificationRequest) -> None:
    logger = current_logger.bind(
        type=message.type,
        transfer_type=message.data.type,
        transfer_id=message.data.id,
        transfer_sequence_number=message.data.sequenceNumber,
    )
    try:
        match message.data.type, message.data.category:
            case "payment", "issuedCard":
                self.card_transfer_processor_policy.on_payment_received(
                    message.data
                )
            case ("bankTransfer", "bank") | ("capture", "topUp"):
                self.bank_transfer_processor_policy.on_bank_transfer_received(
                    message.data
                )
            case ("capture", "platformPayment") | ("fee", "internal"):
                logger.info(
                    f"Webhook with id: {message.data.id}, type: {message.data.type} and category: {message.data.category} ignored"
                )
                return
            case "internalTransfer" | "invoiceDeduction", "internal":
                self.account_transfer_processor_policy.on_account_transfer_received(
                    message.data
                )
            case _:
                logger.error(
                    f"Transfer of type {message.data.type} and category {message.data.category} is not supported"
                )
    except Exception as e:
        logger.exception(
            "Error processing Adyen transfer notification",
            exception=e,
        )
        alert_on_error_processing_adyen_transfer_notification_request(
            notification=message,
            message="An unexpected error occurred while processing Adyen transfer notification",
        )

components.payment_gateway.subcomponents.transfers.adapters.jpmorgan

helpers

get_bank_transfer_from_jpmorgan_callback

get_bank_transfer_from_jpmorgan_callback(
    session, /, *, workspace_key, data
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/helpers.py
def get_bank_transfer_from_jpmorgan_callback(
    session: Session,
    /,
    *,
    workspace_key: str,
    data: "JPMorganCallback",
) -> BankTransfer:
    assert data.firmRootId

    # Get the bank transfer that we created when we initiated the payment request
    # See JPMorganWireTransferAdapter.initiate_wire_transfer_request
    with raise_if_bank_transfer_not_found_for_external_id(data.firmRootId):
        return BankTransferModelBroker.get_bank_transfer_by_external_id(
            session,
            workspace_key=workspace_key,
            external_id=data.firmRootId,
        )

get_payment_request_from_jpmorgan_callback

get_payment_request_from_jpmorgan_callback(
    session, /, *, workspace_key, data
)

Get a Payment Request from a JPMorgan PaymentCallbackEvent webhook payload.

Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/helpers.py
def get_payment_request_from_jpmorgan_callback(
    session: Session,
    /,
    *,
    workspace_key: str,
    data: "JPMorganCallback",
) -> PaymentRequest | None:
    """Get a Payment Request from a JPMorgan PaymentCallbackEvent webhook payload."""

    try:
        # We store the firmRootId in PaymentRequest.external_id for JPMorgan payment requests.
        # See JPMorganWireTransferAdapter.initiate_wire_transfer_request
        return PaymentRequestModelBroker.get_payment_request_by_external_id(
            session,
            workspace_key=workspace_key,
            external_id=mandatory(data.firmRootId),
        )
    except NoResultFound:
        return None

get_payment_status_from_jpmorgan_bank_transfer_update

get_payment_status_from_jpmorgan_bank_transfer_update(
    transfer_update,
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/helpers.py
def get_payment_status_from_jpmorgan_bank_transfer_update(
    transfer_update: TransferUpdate,
) -> PaymentRequestStatus | None:
    from shared.services.payment_providers.jpmorgan.openapi.global_payments_api_1_1_33 import (
        JPMorganPaymentStatus,
    )

    match transfer_update.status:
        case (
            JPMorganPaymentStatus.PENDING
            | JPMorganPaymentStatus.PENDING_POSTING
            | JPMorganPaymentStatus.WAREHOUSED
        ):
            return PaymentRequestStatus.pending
        case JPMorganPaymentStatus.COMPLETED | JPMorganPaymentStatus.COMPLETED_CREDITED:
            return PaymentRequestStatus.succeeded
        case (
            JPMorganPaymentStatus.REJECTED
            | JPMorganPaymentStatus.RETURNED
            | JPMorganPaymentStatus.BLOCKED
        ):
            return PaymentRequestStatus.failed
        case _:
            current_logger.warning(
                f"""Unknown JPMorgan bank transfer update status {transfer_update.status}"""
            )
            return None

record_transfer_update_from_jpmorgan_callback

record_transfer_update_from_jpmorgan_callback(
    session, /, *, data, bank_transfer
)

Record a transfer update from a JPMorgan PaymentCallbackEvent webhook payload. Idempotent.

Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/helpers.py
def record_transfer_update_from_jpmorgan_callback(
    session: Session,
    /,
    *,
    data: "JPMorganCallback",
    bank_transfer: BankTransfer,
) -> tuple[TransferUpdate, bool]:
    """Record a transfer update from a JPMorgan PaymentCallbackEvent webhook payload. Idempotent."""
    from shared.services.payment_providers.jpmorgan.openapi.global_payments_api_1_1_33 import (
        JPMorganPaymentStatus,
    )

    assert data.paymentStatus

    # Get new transfer update data from last one (there must be at least one that we created along with the bank transfer)
    assert len(bank_transfer.updates) > 0
    last_transfer_update = bank_transfer.updates[0]
    sequence_number = last_transfer_update.sequence_number + 1  # TODO check this
    amount = last_transfer_update.amount

    if data.paymentStatus in [
        JPMorganPaymentStatus.REJECTED,
        JPMorganPaymentStatus.RETURNED,
        JPMorganPaymentStatus.BLOCKED,
    ]:
        # Need to invert the original amount for failed transfers for consistency with other providers
        amount = -amount

    transfer_update, is_transfer_update_created = (
        TransferUpdateModelBroker.record_transfer_update(
            session,
            workspace_key=last_transfer_update.workspace_key,
            external_transfer_id=last_transfer_update.external_transfer_id,
            sequence_number=sequence_number,
            transfer_id=last_transfer_update.transfer_id,
            transfer_type=last_transfer_update.transfer_type,
            direction=last_transfer_update.direction,
            occurred_at=datetime.now(),
            amount=amount,
            currency=last_transfer_update.currency,
            status=data.paymentStatus,
            raw=data.model_dump(),
            external_transaction_id=last_transfer_update.external_transaction_id,
        )
    )

    return (
        transfer_update,
        is_transfer_update_created,
    )

policies

bank_transfer_processor

JPMorganBankTransferProcessorPolicy
JPMorganBankTransferProcessorPolicy(workspace_key)

This class is responsible for processing JPMorgan bank transfer events.

It processes JPMorgan payment callback events.

Tags
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/policies/bank_transfer_processor.py
def __init__(self, workspace_key: str) -> None:
    raise_on_provider_not_supported(workspace_key, PaymentServiceProvider.jpmorgan)
    self.workspace_key = workspace_key
on_payment_callback_received
on_payment_callback_received(data)
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/policies/bank_transfer_processor.py
@obs.event_subscriber()
def on_payment_callback_received(self, data: "JPMorganCallback") -> None:
    set_tag("workspace", self.workspace_key)
    logger = current_logger.bind(
        callback_data=data,
    )

    # Sanity checks
    # These fields are supposed to be present in the data; we can't perform any operation without them anyway
    try:
        assert data.firmRootId
        assert data.endToEndId
        assert data.paymentStatus
    except AssertionError:
        logger.error("Missing required fields in payment callback data")
        alert_on_error_processing_jpmorgan_payment_callback(
            callback_data=data,
            message="Callback data is missing required fields",
        )
        raise ValueError("Missing required fields in payment callback data")

    try:
        result = self._process_payment_callback(data)

        self._notify_changes(result)

    except Exception as ex:
        logger.exception(
            "Error processing JPMorgan Callback",
            exception=ex,
            callback_data=data,
        )
        alert_on_error_processing_jpmorgan_payment_callback(
            callback_data=data,
            message="An unexpected error occurred while processing the payment callback",
        )
        raise
workspace_key instance-attribute
workspace_key = workspace_key
PaymentCallbackProcessingResult dataclass
PaymentCallbackProcessingResult(
    bank_transfer,
    transfer_update,
    is_transfer_update_created,
    payment_request,
)
bank_transfer instance-attribute
bank_transfer
is_transfer_update_created instance-attribute
is_transfer_update_created
payment_request instance-attribute
payment_request
transfer_update instance-attribute
transfer_update

topic_subscribers

JPMorganBankTransferTopicSubscriber

JPMorganBankTransferTopicSubscriber(
    *, bank_transfer_processor_policy
)

Bases: Subscriber

Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/topic_subscribers.py
def __init__(
    self,
    *,
    bank_transfer_processor_policy: JPMorganBankTransferProcessorPolicy,
) -> None:
    self.bank_transfer_processor_policy = bank_transfer_processor_policy
bank_transfer_processor_policy instance-attribute
bank_transfer_processor_policy = (
    bank_transfer_processor_policy
)
receive
receive(message)
Source code in components/payment_gateway/subcomponents/transfers/adapters/jpmorgan/topic_subscribers.py
@override
@obs.event_subscriber()
def receive(self, message: JPMorganCallback) -> None:
    logger = current_logger.bind(callback_data=message)
    try:
        self.bank_transfer_processor_policy.on_payment_callback_received(message)
    except Exception as e:
        logger.exception(
            "Error processing JPMorgan callback",
            exception=e,
        )
        alert_on_error_processing_jpmorgan_callback(
            callback_data=message,
            message="An unexpected error occurred while processing JPMorgan callback",
        )

components.payment_gateway.subcomponents.transfers.adapters.models

mappers

account_transfer_model_to_dataclass

account_transfer_model_to_dataclass(account_transfer)

Convert an AccountTransfer model to AccountTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def account_transfer_model_to_dataclass(
    account_transfer: "AccountTransferModel",
) -> "AccountTransferDataclass":
    """Convert an AccountTransfer model to AccountTransfer dataclass."""
    from components.payment_gateway.subcomponents.accounts.protected.entities import (
        AccountId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        AccountTransfer as AccountTransferDataclass,
        AccountTransferId,
        TransferHistoryId,
    )

    return AccountTransferDataclass(
        id=AccountTransferId(account_transfer.id),
        workspace_key=account_transfer.workspace_key,
        external_id=account_transfer.external_id,
        direction=account_transfer.direction,
        reference=account_transfer.reference,
        effective_date=account_transfer.effective_date,
        account_id=AccountId(account_transfer.account_id),  # type: ignore[arg-type]
        transfer_history_id=TransferHistoryId(account_transfer.transfer_history_id),
        updates=[
            transfer_update_model_to_dataclass(update)
            for update in account_transfer.updates
        ],
        events=[
            transfer_event_model_to_dataclass(event)
            for event in account_transfer.events
        ],
        raw=account_transfer.raw,
    )

bank_transfer_model_to_dataclass

bank_transfer_model_to_dataclass(bank_transfer)

Convert a BankTransfer model to BankTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def bank_transfer_model_to_dataclass(
    bank_transfer: "BankTransferModel",
) -> "BankTransferDataclass":
    """Convert a BankTransfer model to BankTransfer dataclass."""
    from components.payment_gateway.subcomponents.accounts.protected.entities import (
        AccountId,
        SepaBeneficiaryId,
    )
    from components.payment_gateway.subcomponents.banking_documents.protected.entities import (
        SepaMandateId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        BankTransfer as BankTransferDataclass,
        BankTransferId,
        TransferHistoryId,
    )

    return BankTransferDataclass(
        id=BankTransferId(bank_transfer.id),
        workspace_key=bank_transfer.workspace_key,
        external_id=bank_transfer.external_id,
        direction=bank_transfer.direction,
        effective_date=bank_transfer.effective_date,
        account_id=AccountId(bank_transfer.account_id),  # type: ignore[arg-type]
        transfer_history_id=(
            TransferHistoryId(bank_transfer.transfer_history_id)
            if bank_transfer.transfer_history_id is not None
            else None
        ),
        updates=[
            transfer_update_model_to_dataclass(update)
            for update in bank_transfer.updates
        ],
        events=[
            transfer_event_model_to_dataclass(event) for event in bank_transfer.events
        ],
        sepa_mandate_id=(
            SepaMandateId(bank_transfer.sepa_mandate_id)
            if bank_transfer.sepa_mandate_id is not None
            else None
        ),
        sepa_beneficiary_id=(
            SepaBeneficiaryId(bank_transfer.sepa_beneficiary_id)
            if bank_transfer.sepa_beneficiary_id is not None
            else None
        ),
        raw=bank_transfer.raw,
    )

card_transfer_model_to_dataclass

card_transfer_model_to_dataclass(
    card_transfer,
    start_event_date=None,
    end_event_date=None,
)

Convert a CardTransfer model to CardTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def card_transfer_model_to_dataclass(
    card_transfer: "CardTransferModel",
    start_event_date: datetime | None = None,
    end_event_date: datetime | None = None,
) -> "CardTransferDataclass":
    """Convert a CardTransfer model to CardTransfer dataclass."""
    from components.payment_gateway.subcomponents.accounts.protected.entities import (
        AccountId,
    )
    from components.payment_gateway.subcomponents.cards.protected.entities import (
        CardId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        CardPaymentMerchant,
        CardTransfer as CardTransferDataclass,
        CardTransferId,
        TransferHistoryId,
    )

    # TODO @frederic.bonnet 2025-09-01 Backfill the country code in the database.
    country_info = get_country_info_by_name(card_transfer.country)
    country_code = country_info.alpha_3 if country_info else "XXX"

    return CardTransferDataclass(
        id=CardTransferId(card_transfer.id),
        workspace_key=card_transfer.workspace_key,
        external_id=card_transfer.external_id,
        effective_date=card_transfer.effective_date,
        merchant=CardPaymentMerchant(
            merchant_id=card_transfer.merchant_id,
            name=card_transfer.name,
            mcc=card_transfer.mcc,
            postal_code=card_transfer.postal_code,
            city=card_transfer.city,
            country_code=country_code,
        ),
        card_id=CardId(card_transfer.card_id),  # type: ignore[arg-type]
        account_id=AccountId(card_transfer.account_id),  # type: ignore[arg-type]
        transfer_history_id=TransferHistoryId(card_transfer.transfer_history_id),
        updates=[
            transfer_update_model_to_dataclass(update)
            for update in card_transfer.updates
            if (
                start_event_date is None
                or sanitize_tz(update.occurred_at) >= start_event_date
            )
            and (
                end_event_date is None
                or sanitize_tz(update.occurred_at) <= end_event_date
            )
        ],
        events=[
            transfer_event_model_to_dataclass(event)
            for event in card_transfer.events
            if (
                start_event_date is None
                or sanitize_tz(event.effective_date) >= start_event_date
            )
            and (
                end_event_date is None
                or sanitize_tz(event.effective_date) <= end_event_date
            )
        ],
        raw=card_transfer.raw,
    )

internal_transfer_model_to_dataclass

internal_transfer_model_to_dataclass(internal_transfer)

Convert an InternalTransfer model to InternalTransfer dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def internal_transfer_model_to_dataclass(
    internal_transfer: "InternalTransferModel",
) -> "InternalTransferDataclass":
    """Convert an InternalTransfer model to InternalTransfer dataclass."""
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        InternalTransfer as InternalTransferDataclass,
        InternalTransferId,
        TransferHistoryId,
    )

    return InternalTransferDataclass(
        id=InternalTransferId(internal_transfer.id),
        effective_date=internal_transfer.effective_date,
        amount=internal_transfer.amount,
        description=internal_transfer.description,
        reference=internal_transfer.reference,
        transfer_history_id=TransferHistoryId(internal_transfer.transfer_history_id),
    )

transfer_event_model_to_dataclass

transfer_event_model_to_dataclass(transfer_event)

Convert a TransferEvent model to TransferEvent dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def transfer_event_model_to_dataclass(
    transfer_event: "TransferEventModel",
) -> "TransferEventDataclass":
    """Convert a TransferEvent model to TransferEvent dataclass."""
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        TransferEvent as TransferEventDataclass,
        TransferEventId,
    )

    return TransferEventDataclass(
        id=TransferEventId(transfer_event.id),
        workspace_key=transfer_event.workspace_key,
        external_id=transfer_event.external_id,
        effective_date=transfer_event.effective_date,
        received=transfer_event.received,
        reserved=transfer_event.reserved,
        balance=transfer_event.balance,
        status=transfer_event.status,
        raw=transfer_event.raw,
    )

transfer_update_model_to_dataclass

transfer_update_model_to_dataclass(transfer_update)

Convert a TransferUpdate model to TransferUpdate dataclass.

Source code in components/payment_gateway/subcomponents/transfers/adapters/models/mappers.py
def transfer_update_model_to_dataclass(
    transfer_update: "TransferUpdateModel",
) -> "TransferUpdateDataclass":
    """Convert a TransferUpdate model to TransferUpdate dataclass."""
    from components.payment_gateway.subcomponents.transfers.protected.entities import (
        AccountTransferId,
        BankTransferId,
        CardTransferId,
        TransferUpdate as TransferUpdateDataclass,
        TransferUpdateId,
    )
    from components.payment_gateway.subcomponents.transfers.protected.enums import (
        TransferUpdateTransferType,
    )

    transfer_id: CardTransferId | BankTransferId | AccountTransferId
    match transfer_update.transfer_type:
        case TransferUpdateTransferType.CARD:
            transfer_id = CardTransferId(transfer_update.transfer_id)
        case TransferUpdateTransferType.BANK:
            transfer_id = BankTransferId(transfer_update.transfer_id)
        case TransferUpdateTransferType.ACCOUNT:
            transfer_id = AccountTransferId(transfer_update.transfer_id)
        case _:
            assert_never(transfer_update.transfer_type)  # Exhaustiveness check

    return TransferUpdateDataclass(
        id=TransferUpdateId(transfer_update.id),
        workspace_key=transfer_update.workspace_key,
        external_transfer_id=transfer_update.external_transfer_id,
        sequence_number=transfer_update.sequence_number,
        transfer_id=transfer_id,
        transfer_type=transfer_update.transfer_type,
        direction=transfer_update.direction,
        occurred_at=transfer_update.occurred_at,
        amount=transfer_update.amount,
        currency=transfer_update.currency,
        status=transfer_update.status,
        external_transaction_id=transfer_update.external_transaction_id,
        raw=transfer_update.raw,
    )

components.payment_gateway.subcomponents.transfers.adapters.revolut

helpers

compute_bank_transfer_direction_from_revolut_transaction_created_event

compute_bank_transfer_direction_from_revolut_transaction_created_event(
    data,
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def compute_bank_transfer_direction_from_revolut_transaction_created_event(
    data: "RevolutTransactionCreatedEvent",
) -> TransferDirection:
    assert len(data.legs) >= 1, "Transaction must have at least one leg"
    return _get_direction_from_leg(data.legs[0])

compute_bank_transfer_direction_from_revolut_transaction_info

compute_bank_transfer_direction_from_revolut_transaction_info(
    transaction_info,
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def compute_bank_transfer_direction_from_revolut_transaction_info(
    transaction_info: "RevolutTransaction",
) -> TransferDirection:
    assert len(transaction_info.legs) >= 1, "Transaction must have at least one leg"
    return _get_direction_from_leg(transaction_info.legs[0])

compute_transfer_update_amount_from_out_of_order_revolut_transaction_state_changed_event

compute_transfer_update_amount_from_out_of_order_revolut_transaction_state_changed_event(
    data, transaction_info
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def compute_transfer_update_amount_from_out_of_order_revolut_transaction_state_changed_event(
    data: "RevolutTransactionStateChangedEvent",
    transaction_info: "RevolutTransaction",
) -> tuple[int, str]:
    from shared.services.payment_providers.revolut.openapi.business_1_0 import (
        RevolutTransactionState,
    )

    assert len(transaction_info.legs) >= 1, "Transaction must have at least one leg"
    amount, currency = _get_amount_from_leg(transaction_info.legs[0])

    if data.new_state in [
        RevolutTransactionState.declined,
        RevolutTransactionState.failed,
        RevolutTransactionState.reverted,
    ]:
        # Need to invert the original amount for failed transfers for consistency with other providers
        amount = -amount

    return (amount, currency)

compute_transfer_update_amount_from_revolut_transaction_created_event

compute_transfer_update_amount_from_revolut_transaction_created_event(
    data,
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def compute_transfer_update_amount_from_revolut_transaction_created_event(
    data: "RevolutTransactionCreatedEvent",
) -> tuple[int, str]:
    assert len(data.legs) >= 1, "Transaction must have at least one leg"
    return _get_amount_from_leg(data.legs[0])

compute_transfer_update_amount_from_revolut_transaction_state_changed_event

compute_transfer_update_amount_from_revolut_transaction_state_changed_event(
    data, transfer_update
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def compute_transfer_update_amount_from_revolut_transaction_state_changed_event(
    data: "RevolutTransactionStateChangedEvent",
    transfer_update: TransferUpdate,
) -> tuple[int, str]:
    from shared.services.payment_providers.revolut.openapi.business_1_0 import (
        RevolutTransactionState,
    )

    amount = transfer_update.amount
    currency = transfer_update.currency

    if data.new_state in [
        RevolutTransactionState.declined,
        RevolutTransactionState.failed,
        RevolutTransactionState.reverted,
    ]:
        # Need to invert the original amount for failed transfers for consistency with other providers
        amount = -amount

    return (amount, currency)

get_account_from_revolut_transaction_created_event

get_account_from_revolut_transaction_created_event(
    session, /, *, workspace_key, data
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def get_account_from_revolut_transaction_created_event(
    session: Session,
    /,
    *,
    workspace_key: str,
    data: "RevolutTransactionCreatedEvent",
) -> Account:
    assert len(data.legs) >= 1, "Transaction must have at least one leg"
    first_leg = data.legs[0]
    with raise_if_account_not_found_for_external_id(str(first_leg.account_id)):
        return AccountModelBroker.get_account_by_external_id(
            session,
            workspace_key=workspace_key,
            external_id=str(first_leg.account_id),
        )

get_account_from_revolut_transaction_info

get_account_from_revolut_transaction_info(
    session, /, *, workspace_key, transaction_info
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def get_account_from_revolut_transaction_info(
    session: Session,
    /,
    *,
    workspace_key: str,
    transaction_info: "RevolutTransaction",
) -> Account:
    assert len(transaction_info.legs) >= 1, "Transaction must have at least one leg"
    first_leg = transaction_info.legs[0]
    with raise_if_account_not_found_for_external_id(str(first_leg.account_id)):
        return AccountModelBroker.get_account_by_external_id(
            session,
            workspace_key=workspace_key,
            external_id=str(first_leg.account_id),
        )

get_bank_transfer_from_revolut_transaction_state_changed_event

get_bank_transfer_from_revolut_transaction_state_changed_event(
    session, /, *, workspace_key, data
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def get_bank_transfer_from_revolut_transaction_state_changed_event(
    session: Session,
    /,
    *,
    workspace_key: str,
    data: "RevolutTransactionStateChangedEvent",
) -> BankTransfer:
    with raise_if_bank_transfer_not_found_for_external_id(str(data.transaction_id)):
        return BankTransferModelBroker.get_bank_transfer_by_external_id(
            session,
            workspace_key=workspace_key,
            external_id=str(data.transaction_id),
        )

get_payment_reference_from_revolut_transaction_created_event

get_payment_reference_from_revolut_transaction_created_event(
    session, /, *, workspace_key, data
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def get_payment_reference_from_revolut_transaction_created_event(
    session: Session,
    /,
    *,
    workspace_key: str,
    data: "RevolutTransactionCreatedEvent",
) -> str | None:
    from shared.services.payment_providers.revolut.openapi.business_1_0 import (
        RevolutTransactionType,
    )

    if data.type != RevolutTransactionType.refund:
        direction = (
            compute_bank_transfer_direction_from_revolut_transaction_created_event(data)
        )
        if direction == TransferDirection.INCOMING:
            # Ignoring incoming transactions that are not refunds
            current_logger.info(
                "Ignoring incoming transfer (positive amount)",
                event_data=data,
            )
            return None

        # We pass the payment reference as request_id when creating the Revolut payment
        # See RevolutWireTransferAdapter.initiate_wire_transfer_request
        return data.request_id
    else:
        # For refunds, get reference from the original transaction
        if not data.related_transaction_id:
            current_logger.warning(
                "Refund transaction missing related_transaction_id, ignoring",
                event_data=data,
            )
            return None

        payment_reference = _get_payment_reference_from_original_transaction_id(
            session,
            workspace_key=workspace_key,
            related_transaction_id=data.related_transaction_id,
        )
        if not payment_reference:
            current_logger.warning(
                "Could not find original transaction for refund, ignoring",
                event_data=data,
            )
            return None

        return payment_reference

get_payment_reference_from_revolut_transaction_info

get_payment_reference_from_revolut_transaction_info(
    session, /, *, workspace_key, transaction_info
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def get_payment_reference_from_revolut_transaction_info(
    session: Session,
    /,
    *,
    workspace_key: str,
    transaction_info: "RevolutTransaction",
) -> str | None:
    from shared.services.payment_providers.revolut.openapi.business_1_0 import (
        RevolutTransactionType,
    )

    if transaction_info.type != RevolutTransactionType.refund:
        direction = compute_bank_transfer_direction_from_revolut_transaction_info(
            transaction_info
        )
        if direction == TransferDirection.INCOMING:
            # Ignoring incoming transactions that are not refunds
            current_logger.info(
                "Ignoring incoming transfer (positive amount)",
                transaction_info=transaction_info,
            )
            return None

        # We pass the payment reference as request_id when creating the Revolut payment
        # See RevolutWireTransferAdapter.initiate_wire_transfer_request
        return transaction_info.request_id
    else:
        # For refunds, get reference from the original transaction
        if not transaction_info.related_transaction_id:
            current_logger.warning(
                "Refund transaction missing related_transaction_id, ignoring",
                transaction_info=transaction_info,
            )
            return None

        payment_reference = _get_payment_reference_from_original_transaction_id(
            session,
            workspace_key=workspace_key,
            related_transaction_id=transaction_info.related_transaction_id,
        )
        if not payment_reference:
            current_logger.warning(
                "Could not find original transaction for refund, ignoring",
                transaction_info=transaction_info,
            )
            return None

        return payment_reference

get_payment_request_from_revolut_refund_transaction_event

get_payment_request_from_revolut_refund_transaction_event(
    session, /, *, data, workspace_key
)

Get original payment request from a Revolut refund transaction event payload.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def get_payment_request_from_revolut_refund_transaction_event(
    session: Session,
    /,
    *,
    data: "RevolutTransactionCreatedEvent",
    workspace_key: str,
) -> PaymentRequest | None:
    """Get original payment request from a Revolut refund transaction event payload."""
    if not data.related_transaction_id:
        return None

    try:
        # We store the transaction ID in PaymentRequest.external_id for Revolut payment requests.
        # See RevolutWireTransferAdapter.initiate_wire_transfer_request
        return PaymentRequestModelBroker.get_payment_request_by_external_id(
            session,
            workspace_key=workspace_key,
            external_id=str(data.related_transaction_id),
        )
    except NoResultFound:
        return None

get_payment_request_from_revolut_transaction_event

get_payment_request_from_revolut_transaction_event(
    session, /, *, data, workspace_key
)

Get payment request from a Revolut transaction event payload.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def get_payment_request_from_revolut_transaction_event(
    session: Session,
    /,
    *,
    data: "RevolutTransactionCreatedEvent|RevolutTransactionStateChangedEvent",
    workspace_key: str,
) -> PaymentRequest | None:
    """Get payment request from a Revolut transaction event payload."""

    try:
        # We store the transaction ID in PaymentRequest.external_id for Revolut payment requests.
        # See RevolutWireTransferAdapter.initiate_wire_transfer_request
        return PaymentRequestModelBroker.get_payment_request_by_external_id(
            session,
            workspace_key=workspace_key,
            external_id=str(data.transaction_id),
        )
    except NoResultFound:
        return None

get_payment_status_from_revolut_bank_transfer_update

get_payment_status_from_revolut_bank_transfer_update(
    transfer_update,
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def get_payment_status_from_revolut_bank_transfer_update(
    transfer_update: TransferUpdate,
) -> PaymentRequestStatus | None:
    from shared.services.payment_providers.revolut.openapi.business_1_0 import (
        RevolutTransactionState,
    )

    match transfer_update.status:
        case RevolutTransactionState.created:
            # Ignore
            return None
        case RevolutTransactionState.pending:
            return PaymentRequestStatus.pending
        case RevolutTransactionState.completed:
            return PaymentRequestStatus.succeeded
        case (
            RevolutTransactionState.declined
            | RevolutTransactionState.failed
            | RevolutTransactionState.reverted
        ):
            return PaymentRequestStatus.failed
        case _:
            current_logger.warning(
                f"""Unknown Revolut bank transfer update status {transfer_update.status}"""
            )
            return None

record_bank_transfer_from_out_of_order_revolut_transaction_state_changed_event

record_bank_transfer_from_out_of_order_revolut_transaction_state_changed_event(
    session, /, *, data, transaction_info, account
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def record_bank_transfer_from_out_of_order_revolut_transaction_state_changed_event(
    session: Session,
    /,
    *,
    data: "RevolutTransactionStateChangedEvent",
    transaction_info: "RevolutTransaction",
    account: Account,
) -> tuple[BankTransfer, bool]:
    direction = compute_bank_transfer_direction_from_revolut_transaction_info(
        transaction_info
    )

    # We use a shared transfer history per provider workspace for simplicity
    transfer_history, _ = TransferHistoryModelBroker.upsert_transfer_history(
        session,
        private_type=f"revolut:{account.workspace_key}",
        private_ref=account.workspace_key,
    )

    return BankTransferModelBroker.record_bank_transfer(
        session,
        workspace_key=account.workspace_key,
        external_id=transaction_info.id,
        effective_date=transaction_info.created_at,
        direction=direction,
        account_id=account.id,
        transfer_history_id=transfer_history.id,
        raw=data.raw_event,
    )

record_bank_transfer_from_revolut_transaction_created_event

record_bank_transfer_from_revolut_transaction_created_event(
    session, /, *, data, account
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def record_bank_transfer_from_revolut_transaction_created_event(
    session: Session,
    /,
    *,
    data: "RevolutTransactionCreatedEvent",
    account: Account,
) -> tuple[BankTransfer, bool]:
    direction = compute_bank_transfer_direction_from_revolut_transaction_created_event(
        data
    )

    # We use a shared transfer history per provider workspace for simplicity
    transfer_history, _ = TransferHistoryModelBroker.upsert_transfer_history(
        session,
        private_type=f"revolut:{account.workspace_key}",
        private_ref=account.workspace_key,
    )

    return BankTransferModelBroker.record_bank_transfer(
        session,
        workspace_key=account.workspace_key,
        external_id=str(data.transaction_id),
        effective_date=data.created_at,
        direction=direction,
        account_id=account.id,
        transfer_history_id=transfer_history.id,
        raw=data.raw_event,
    )

record_transfer_update_from_out_of_order_revolut_transaction_state_changed_event

record_transfer_update_from_out_of_order_revolut_transaction_state_changed_event(
    session,
    /,
    *,
    data,
    transaction_info,
    bank_transfer,
    payment_reference,
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def record_transfer_update_from_out_of_order_revolut_transaction_state_changed_event(
    session: Session,
    /,
    *,
    data: "RevolutTransactionStateChangedEvent",
    transaction_info: "RevolutTransaction",
    bank_transfer: BankTransfer,
    payment_reference: str,
) -> tuple[TransferUpdate, bool]:
    amount, currency = (
        compute_transfer_update_amount_from_out_of_order_revolut_transaction_state_changed_event(
            data, transaction_info
        )
    )

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        workspace_key=bank_transfer.workspace_key,
        external_transfer_id=transaction_info.id,
        sequence_number=2,  # Hardcoded as we know the state transition orders
        transfer_id=bank_transfer.id,
        transfer_type=TransferUpdateTransferType.BANK,
        direction=bank_transfer.direction,
        occurred_at=transaction_info.created_at,
        amount=amount,
        currency=currency,
        status=transaction_info.state,
        raw=data.raw_event,
        external_transaction_id=payment_reference,
    )

record_transfer_update_from_revolut_transaction_created_event

record_transfer_update_from_revolut_transaction_created_event(
    session, /, *, data, bank_transfer, payment_reference
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def record_transfer_update_from_revolut_transaction_created_event(
    session: Session,
    /,
    *,
    data: "RevolutTransactionCreatedEvent",
    bank_transfer: BankTransfer,
    payment_reference: str,
) -> tuple[TransferUpdate, bool]:
    amount, currency = (
        compute_transfer_update_amount_from_revolut_transaction_created_event(data)
    )

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        workspace_key=bank_transfer.workspace_key,
        external_transfer_id=str(data.transaction_id),
        sequence_number=1,  # Hardcoded as we know the state transition orders
        transfer_id=bank_transfer.id,
        transfer_type=TransferUpdateTransferType.BANK,
        direction=bank_transfer.direction,
        occurred_at=data.created_at,
        amount=amount,
        currency=currency,
        status=data.state,
        raw=data.raw_event,
        external_transaction_id=payment_reference,
    )

record_transfer_update_from_revolut_transaction_state_changed_event

record_transfer_update_from_revolut_transaction_state_changed_event(
    session, /, *, data, bank_transfer
)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def record_transfer_update_from_revolut_transaction_state_changed_event(
    session: Session,
    /,
    *,
    data: "RevolutTransactionStateChangedEvent",
    bank_transfer: BankTransfer,
) -> tuple[TransferUpdate, bool]:
    # Get new transfer update data from last one
    transfer_updates = (
        TransferUpdateModelBroker.get_transfer_updates_by_bank_transfer_id(
            session,
            BankTransferId(bank_transfer.id),
        )
    )
    assert len(transfer_updates) >= 1, (
        "Revolut bank transfer {bank_transfer.id} should have at least one transfer update"
    )
    last_transfer_update = transfer_updates[0]
    assert last_transfer_update.external_transaction_id

    amount, currency = (
        compute_transfer_update_amount_from_revolut_transaction_state_changed_event(
            data, last_transfer_update
        )
    )

    return TransferUpdateModelBroker.record_transfer_update(
        session,
        workspace_key=bank_transfer.workspace_key,
        external_transfer_id=str(data.transaction_id),
        sequence_number=2,  # Hardcoded as we know the state transition orders,
        transfer_id=bank_transfer.id,
        transfer_type=TransferUpdateTransferType.BANK,
        direction=bank_transfer.direction,
        occurred_at=data.updated_at,
        amount=amount,
        currency=currency,
        status=data.new_state,
        external_transaction_id=last_transfer_update.external_transaction_id,
        raw=data.raw_event,
    )

update_payment_request_from_revolut_transaction_event

update_payment_request_from_revolut_transaction_event(
    session, /, *, data, bank_transfer
)

Update the payment request matching the bank transfer if it exists

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/helpers.py
def update_payment_request_from_revolut_transaction_event(
    session: Session,
    /,
    *,
    data: "RevolutTransactionCreatedEvent|RevolutTransactionStateChangedEvent",
    bank_transfer: BankTransfer,
) -> PaymentRequest | None:
    """Update the payment request matching the bank transfer if it exists"""

    try:
        # We store the transaction ID in PaymentRequest.external_id for Revolut payment requests.
        # See RevolutWireTransferAdapter.initiate_wire_transfer_request
        payment_request = PaymentRequestModelBroker.get_payment_request_by_external_id(
            session,
            workspace_key=bank_transfer.workspace_key,
            external_id=str(data.transaction_id),
        )
        PaymentRequestModelBroker.set_bank_transfer(
            session,
            id=payment_request.id,
            bank_transfer_id=bank_transfer.id,
        )
        return payment_request
    except NoResultFound:
        return None

policies

bank_transfer_processor

RevolutBankTransferProcessorPolicy
RevolutBankTransferProcessorPolicy(
    workspace_key, revolut_client
)

This class is responsible for processing Revolut bank transfer events.

It processes Revolut TransactionCreated (creates bank transfer) and RevolutTransactionStateChanged (updates existing bank transfer) events.

Implements the following Nullable patterns: - Nullables: https://www.jamesshore.com/v2/projects/nullables/testing-without-mocks#nullables ⧉

Tags
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/policies/bank_transfer_processor.py
def __init__(
    self,
    workspace_key: str,
    revolut_client: RevolutBusinessApiClient,
) -> None:
    raise_on_provider_not_supported(workspace_key, PaymentServiceProvider.revolut)
    self.revolut_client = revolut_client
    self.workspace_key = workspace_key
create classmethod
create(*, workspace_key)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/policies/bank_transfer_processor.py
@classmethod
def create(
    cls,
    *,
    workspace_key: str,
) -> "RevolutBankTransferProcessorPolicy":
    if current_config["ACTUALLY_PAY_WITH_REVOLUT"]:
        revolut_client = RevolutBusinessApiClient.create(
            business_account_name=get_revolut_business_account_name(workspace_key)
        )
    else:
        current_logger.info(
            "Using fake Revolut client as ACTUALLY_PAY_WITH_REVOLUT isn't True, no payment will be triggered."
        )
        revolut_client = RevolutBusinessApiClient.create_null()

    return cls(
        workspace_key=workspace_key,
        revolut_client=revolut_client,
    )
create_null classmethod
create_null(*, workspace_key, revolut_responses=None)
Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/policies/bank_transfer_processor.py
@classmethod
def create_null(
    cls,
    *,
    workspace_key: str,
    revolut_responses: list[ConfiguredResponse] | None = None,
) -> "RevolutBankTransferProcessorPolicy":
    return cls(
        workspace_key=workspace_key,
        revolut_client=RevolutBusinessApiClient.create_null(
            responses=revolut_responses
        ),
    )
on_transaction_created
on_transaction_created(data)

Process a TransactionCreated event.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/policies/bank_transfer_processor.py
@obs.event_subscriber()
def on_transaction_created(self, data: "RevolutTransactionCreatedEvent") -> None:
    """Process a TransactionCreated event."""
    set_tag("workspace", self.workspace_key)
    logger = current_logger.bind(
        event_type="TransactionCreated",
        transaction_id=data.transaction_id,
        request_id=data.request_id,
    )

    if self._is_legacy_request_id(data.request_id):
        # This is the old Revolut way (outside of payment gateway)
        return

    try:
        result = self._process_transaction_created(logger=logger, data=data)
        if not result:
            # Event was ignored
            return

        if result.account_reference:
            set_tag("account_reference", result.account_reference)
        self._notify_changes(result)

    except Exception as ex:
        logger.exception(
            "Error processing Revolut transaction created event",
            exception=ex,
            event_data=data,
        )
        alert_on_error_processing_revolut_transaction_created_event(
            event_data=data,
            message="An unexpected error occurred while processing the transaction created event",
        )
        raise
on_transaction_state_changed
on_transaction_state_changed(data)

Process a TransactionStateChanged event.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/policies/bank_transfer_processor.py
@obs.event_subscriber()
def on_transaction_state_changed(
    self, data: "RevolutTransactionStateChangedEvent"
) -> None:
    """Process a TransactionStateChanged event."""
    set_tag("workspace", self.workspace_key)
    logger = current_logger.bind(
        event_type="TransactionStateChanged",
        transaction_id=data.transaction_id,
        request_id=data.request_id,
    )

    if self._is_legacy_request_id(data.request_id):
        # This is the old Revolut way (outside of payment gateway)
        return

    try:
        result = self._process_transaction_state_changed(logger=logger, data=data)

        if not result:
            # Event was ignored
            return

        if result.account_reference:
            set_tag("account_reference", result.account_reference)
        self._notify_changes(result)

    except Exception as ex:
        logger.exception(
            "Error processing Revolut transaction state changed event",
            exception=ex,
            event_data=data,
        )
        alert_on_error_processing_revolut_transaction_state_changed_event(
            event_data=data,
            message="An unexpected error occurred while processing the transaction state changed event",
        )
        raise
revolut_client instance-attribute
revolut_client = revolut_client
workspace_key instance-attribute
workspace_key = workspace_key
TransactionEventProcessingResult dataclass
TransactionEventProcessingResult(
    bank_transfer,
    transfer_update,
    is_transfer_update_created,
    is_payment_request_refunded,
    payment_request,
    account_reference,
    reason_code=None,
)
account_reference instance-attribute
account_reference
bank_transfer instance-attribute
bank_transfer
is_payment_request_refunded instance-attribute
is_payment_request_refunded
is_transfer_update_created instance-attribute
is_transfer_update_created
payment_request instance-attribute
payment_request
reason_code class-attribute instance-attribute
reason_code = None
transfer_update instance-attribute
transfer_update

topic_subscribers

TransactionCreatedSubscriber

TransactionCreatedSubscriber(
    *, bank_transfer_processor_policy
)

Bases: Subscriber

Subscribes to Revolut TransactionCreated events.

Creates bank transfer records when new transactions are created.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/topic_subscribers.py
def __init__(
    self,
    *,
    bank_transfer_processor_policy: RevolutBankTransferProcessorPolicy,
) -> None:
    self.bank_transfer_processor_policy = bank_transfer_processor_policy
bank_transfer_processor_policy instance-attribute
bank_transfer_processor_policy = (
    bank_transfer_processor_policy
)
receive
receive(message)

Receive and process a TransactionCreated event.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/topic_subscribers.py
@override
@obs.event_subscriber()
def receive(self, message: RevolutTransactionCreatedEvent) -> None:
    """Receive and process a TransactionCreated event."""
    logger = current_logger.bind(
        transaction_id=str(message.transaction_id),
        request_id=str(message.request_id),
        state=message.state,
    )
    try:
        self.bank_transfer_processor_policy.on_transaction_created(message)
    except Exception as e:
        logger.exception(
            "Error processing Revolut TransactionCreated event",
            exception=e,
        )
        alert_on_error_processing_revolut_transaction_event(
            event_data=message,
            message="An unexpected error occurred while processing Revolut transaction event",
        )

TransactionStateChangedSubscriber

TransactionStateChangedSubscriber(
    *, bank_transfer_processor_policy
)

Bases: Subscriber

Subscribes to Revolut TransactionStateChanged events.

Updates bank transfer status when transaction state changes.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/topic_subscribers.py
def __init__(
    self,
    *,
    bank_transfer_processor_policy: RevolutBankTransferProcessorPolicy,
) -> None:
    self.bank_transfer_processor_policy = bank_transfer_processor_policy
bank_transfer_processor_policy instance-attribute
bank_transfer_processor_policy = (
    bank_transfer_processor_policy
)
receive
receive(message)

Receive and process a TransactionStateChanged event.

Source code in components/payment_gateway/subcomponents/transfers/adapters/revolut/topic_subscribers.py
@override
@obs.event_subscriber()
def receive(self, message: RevolutTransactionStateChangedEvent) -> None:
    """Receive and process a TransactionStateChanged event."""
    logger = current_logger.bind(
        transaction_id=str(message.transaction_id),
        request_id=message.request_id,
        new_state=message.new_state,
        old_state=message.old_state,
    )
    try:
        self.bank_transfer_processor_policy.on_transaction_state_changed(message)
    except Exception as e:
        logger.exception(
            "Error processing Revolut TransactionStateChanged event",
            exception=e,
        )
        alert_on_error_processing_revolut_transaction_event(
            event_data=message,
            message="An unexpected error occurred while processing Revolut transaction event",
        )