Skip to content

Actions

components.payment_gateway.subcomponents.payments.protected.business_logic.actions.payment_actions

PaymentActions

PaymentActions(
    payment_method_adapter_registry,
    payment_request_propagation,
)

Actions for payment operations, such as SEPA Direct Debit, SEPA Credit Transfer, and Canada Electronic Funds Transfer.

Tags
Source code in components/payment_gateway/subcomponents/payments/protected/business_logic/actions/payment_actions.py
def __init__(
    self,
    payment_method_adapter_registry: PaymentMethodAdapterRegistry,
    payment_request_propagation: Propagation,
) -> None:
    """Store the collaborators used by every payment action.

    Args:
        payment_method_adapter_registry: Registry of adapters for the
            different payment methods.
        payment_request_propagation: Transaction propagation behavior to use
            for the adapter calls and payment request persistence.
    """
    # The two assignments are independent; both are plain attribute stores.
    self.payment_request_propagation = payment_request_propagation
    self.payment_method_adapter_registry = payment_method_adapter_registry

create classmethod

create()

Normal factory

Source code in components/payment_gateway/subcomponents/payments/protected/business_logic/actions/payment_actions.py
@classmethod
def create(cls) -> "PaymentActions":
    """Normal factory.

    Builds the adapter registry with `PaymentMethodAdapterRegistry.create()`
    and uses `Propagation.REQUIRES_NEW` for payment request persistence.
    """
    registry = PaymentMethodAdapterRegistry.create()
    return cls(
        payment_method_adapter_registry=registry,
        payment_request_propagation=Propagation.REQUIRES_NEW,
    )

create_null classmethod

create_null(
    *,
    track_adyen_requests=None,
    track_jpmorgan_requests=None,
    jpmorgan_responses=None,
    jpmorgan_simulator=None,
    revolut_simulator=None
)

Null factory

Source code in components/payment_gateway/subcomponents/payments/protected/business_logic/actions/payment_actions.py
@classmethod
def create_null(
    cls,
    *,
    track_adyen_requests: list[tuple[str, dict, dict]] | None = None,  # type: ignore[type-arg]
    track_jpmorgan_requests: list[TrackedRequest] | None = None,
    jpmorgan_responses: list[ConfiguredResponse] | None = None,
    jpmorgan_simulator: JPMorganPaymentsSimulator | None = None,
    revolut_simulator: RevolutBusinessSimulator | None = None,
) -> "PaymentActions":
    """Null factory.

    Every keyword argument is forwarded unchanged to
    `PaymentMethodAdapterRegistry.create_null`; the resulting registry is
    paired with `Propagation.REQUIRED`.
    """
    null_registry = PaymentMethodAdapterRegistry.create_null(
        track_adyen_requests=track_adyen_requests,
        track_jpmorgan_requests=track_jpmorgan_requests,
        jpmorgan_responses=jpmorgan_responses,
        jpmorgan_simulator=jpmorgan_simulator,
        revolut_simulator=revolut_simulator,
    )
    return cls(
        payment_method_adapter_registry=null_registry,
        payment_request_propagation=Propagation.REQUIRED,
    )

find_or_recover_payment_request_by_reference

find_or_recover_payment_request_by_reference(
    session,
    /,
    *,
    workspace_key,
    reference,
    payment_created_at,
    financial_instrument_id=None,
)

Find a PaymentRequest by (workspace_key, reference), falling back to the PSP if missing.

Used by consumers (BE/FR claim management) to verify whether a payment was actually processed before generating a new reference for a retry.

Lookup order:

1. DB: if a PaymentRequest exists for (workspace_key, reference), return its id.
2. PSP fallback (Revolut only today): ask the PSP whether a transaction with this reference exists in the window around payment_created_at. If found, backfill a minimal PaymentRequest from the PSP response (orphan recovery) and return its id.
3. Otherwise return None. This covers the case where the PSP has no transaction for the reference, and also the cases where recovery is not possible (non-Revolut workspace, transaction without legs, missing financial_instrument_id, or no matching PG Account).

After backfilling, the existing StalePaymentsPolicy is invoked (with the fetched transaction passed in to avoid a second PSP call) to create the matching BankTransfer / TransferUpdate and emit the PaymentRequestUpdated event the consumer expects.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session to use for DB operations.

required
workspace_key str

Workspace whose PSP credentials should be used for the fallback lookup.

required
reference str

The reference passed to the PSP at submission (= the payment's payment_reference).

required
payment_created_at datetime

Approximate creation date of the original payment, used to scope the PSP lookup window.

required
financial_instrument_id FinancialInstrumentId | None

FI of the original payment. Required only if the PSP fallback hits and we need to backfill — the PaymentRequest column is NOT NULL. If absent in that branch, the action logs and returns None so the caller can fall through (e.g. legacy path).

None

Returns:

Type Description
PaymentRequestId | None

The id of the found or recovered PaymentRequest, or None.

Tags
Source code in components/payment_gateway/subcomponents/payments/protected/business_logic/actions/payment_actions.py
@obs.api_call()
def find_or_recover_payment_request_by_reference(
    self,
    session: Session,
    /,
    *,
    workspace_key: str,
    reference: str,
    payment_created_at: datetime,
    financial_instrument_id: FinancialInstrumentId | None = None,
) -> PaymentRequestId | None:
    """Find a PaymentRequest by (workspace_key, reference), falling back to the PSP if missing.

    Used by consumers (BE/FR claim management) to verify whether a payment
    was actually processed before generating a new reference for a retry.

    Lookup order:
    1. **DB**: if a PaymentRequest exists for (workspace_key, reference),
       return its id.
    2. **PSP fallback** (Revolut only today): ask the PSP whether a
       transaction with this reference exists in the window around
       `payment_created_at`. If found, backfill a minimal PaymentRequest
       from the PSP response (orphan recovery) and return its id.
    3. **Otherwise** return None. Besides "the PSP has no transaction for
       this reference", None is also returned when recovery is not
       possible: the workspace's provider is not Revolut, the PSP
       transaction has no legs, `financial_instrument_id` was not
       supplied, or no PG Account matches the debited Revolut account.

    After backfilling, the existing `StalePaymentsPolicy` is invoked
    (with the fetched transaction passed in to avoid a second PSP call)
    to create the matching BankTransfer / TransferUpdate and emit the
    `PaymentRequestUpdated` event the consumer expects.

    Args:
        session: SQLAlchemy session to use for DB operations.
        workspace_key: Workspace whose PSP credentials should be used for
            the fallback lookup.
        reference: The reference passed to the PSP at submission (= the
            payment's `payment_reference`).
        payment_created_at: Approximate creation date of the original
            payment, used to scope the PSP lookup window. Only its date
            part (`.date()`) is forwarded to the adapter.
        financial_instrument_id: FI of the original payment. Required only
            if the PSP fallback hits and we need to backfill — the
            PaymentRequest column is NOT NULL. If absent in that branch,
            the action logs and returns None so the caller can fall
            through (e.g. legacy path).

    Returns:
        The id of the found or recovered PaymentRequest, or None.

    Raises:
        WorkspaceNotRegisteredException: If no payment service provider is
            registered for `workspace_key`.
        AssertionError: If the registry returns a wire transfer adapter
            that is not a `RevolutWireTransferAdapter` for a Revolut
            workspace.

    Tags:
        - @payment_method: Wire Transfer
        - @revolut: business_api
    """
    set_tag("workspace", workspace_key)

    # 1. DB lookup.
    existing = PaymentRequestModelBroker.get_payment_request_by_reference(
        session, workspace_key=workspace_key, reference=reference
    )
    if existing is not None:
        return PaymentRequestId(existing.id)

    # 2. PSP fallback (Revolut only today).
    payment_service_provider = get_provider_for_workspace(workspace_key)
    if payment_service_provider is None:
        raise WorkspaceNotRegisteredException(workspace_key)

    if payment_service_provider != PaymentServiceProvider.revolut:
        # Orphan recovery is only implemented for Revolut today. When a
        # second PSP needs this capability, extract `find_transaction_by_reference`
        # to an abstract method on `WireTransferAdapter` and drop this branch.
        return None

    adapter = self.payment_method_adapter_registry.get_wire_transfer_adapter(
        payment_service_provider
    )
    # Type narrowing: `find_transaction_by_reference` is called on the
    # Revolut adapter below. Note that `assert` statements are skipped when
    # Python runs with `-O`.
    assert isinstance(adapter, RevolutWireTransferAdapter)

    psp_transaction = adapter.find_transaction_by_reference(
        workspace_key=workspace_key,
        reference=reference,
        around_date=payment_created_at.date(),
    )
    if psp_transaction is None:
        return None

    # 3. Extract the debit leg.
    # Wire transfer payouts to external counterparties (the only PG-via-
    # Revolut wire-transfer use case today) carry a single leg: our
    # account debited with a negative amount. abs() on the leg amount
    # gives positive minor units. If we ever support Revolut-to-Revolut
    # internal transfers via this path, leg selection needs to filter on
    # the external counterparty.
    if not psp_transaction.legs:
        current_logger.warning(
            f"Revolut transaction {psp_transaction.id} for reference {reference!r} has no legs",
            transaction_id=str(psp_transaction.id),
            reference=reference,
        )
        return None
    leg = psp_transaction.legs[0]
    debtor_account_external_id = str(leg.account_id)
    # NOTE(review): the fixed factor 100 assumes a currency with two
    # minor-unit decimals and a leg amount expressed in major units —
    # confirm before enabling currencies with 0 or 3 decimals.
    amount = int(round(abs(leg.amount) * 100))
    # NOTE(review): `.root` presumably unwraps a root-model wrapper around
    # the currency code string — confirm against the Revolut client models.
    currency = leg.currency.root

    # 4. FI is required to backfill (PaymentRequest.financial_instrument_id is NOT NULL).
    # If the caller didn't supply one, we can't recover — log + return None so they
    # fall through (legacy path or fresh submit).
    if financial_instrument_id is None:
        current_logger.warning(
            f"Cannot backfill PaymentRequest for reference {reference!r}: no financial_instrument_id provided",
            workspace_key=workspace_key,
            reference=reference,
            external_id=str(psp_transaction.id),
        )
        return None

    # 5. Resolve the PG Account that maps to the Revolut debit account.
    try:
        debtor_account = AccountModelBroker.get_account_by_external_id(
            session,
            workspace_key=workspace_key,
            external_id=debtor_account_external_id,
        )
    except NoResultFound:
        # Can't backfill without a PG Account. Should never happen in
        # practice — if Revolut returned a transaction debited from this
        # workspace, PG should have the matching Account row.
        current_logger.error(
            f"Cannot backfill PaymentRequest for reference {reference!r}: no PG Account for "
            f"workspace_key={workspace_key!r} external_id={debtor_account_external_id!r}",
            workspace_key=workspace_key,
            reference=reference,
            debtor_account_external_id=debtor_account_external_id,
        )
        return None

    # 6. Backfill the PaymentRequest and convert it to the entity the
    # reconciliation policy consumes (mirrors how the stale-payments job
    # supplies it from PaymentQueries.get_stale_payment_requests).
    # The `as session` target rebinds the `session` parameter to whatever
    # `transaction(...)` yields for the configured propagation; the
    # caller-supplied session is not used past this point.
    with transaction(propagation=self.payment_request_propagation) as session:
        payment_request_model = PaymentRequestModelBroker.create_payment_request(
            session,
            workspace_key=workspace_key,
            external_id=str(psp_transaction.id),
            account_id=debtor_account.id,
            payment_type=PaymentRequestType.wire_transfer,
            reference=reference,
            amount=amount,
            currency=CurrencyCode(currency),
            raw=psp_transaction.model_dump(mode="json"),
            financial_instrument_id=financial_instrument_id,
        )
        payment_request_id = PaymentRequestId(payment_request_model.id)
        payment_request = PaymentQueries().get_payment_request(
            session, payment_request_id
        )

    # 7. Reconcile through the existing webhook code path: create the
    # BankTransfer + TransferUpdate and emit `PaymentRequestUpdated` so
    # the consumer (BE/FR claim management) sees the current state.
    # `transaction_info` is passed to skip a second Revolut roundtrip.
    # If this call fails after the PaymentRequest is committed, the
    # stale-payments reconciliation job will pick the row up on its
    # next run.
    # Nothing here catches an exception from the reconciliation call, so
    # a failure propagates to the caller instead of returning the id.
    StalePaymentsPolicy.create(
        workspace_key=workspace_key
    ).reconcile_stale_payment_request(
        payment_request,
        transaction_info=psp_transaction,
    )

    current_logger.info(
        f"Recovered orphan PaymentRequest {payment_request_id} for reference {reference!r} from PSP",
        workspace_key=workspace_key,
        reference=reference,
        payment_request_id=str(payment_request_id),
        external_id=str(psp_transaction.id),
    )

    return payment_request_id

initiate_direct_debit_request

initiate_direct_debit_request(
    session,
    /,
    *,
    debtor_financial_instrument_id,
    creditor_account_id,
    amount,
    currency,
    reference,
    description,
)

Initiates a Direct Debit payment request.

  • Debtor Financial Instrument: External Bank Account we're transferring funds from.
  • Creditor Account: Provider Account we're transferring funds to, managed by Alan on a PSP Workspace.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session to use for DB operations.

required
debtor_financial_instrument_id FinancialInstrumentId

ID of the financial instrument owning the account that will be debited.

required
creditor_account_id AccountId

ID of the account that will receive the funds.

required
amount int

Payment amount in minor currency units (e.g., cents for EUR). Must be positive.

required
currency CurrencyCode

ISO 4217 currency code (e.g., 'USD', 'EUR').

required
reference str

Unique reference for the payment request. Used for tracking and reconciliation. Must be short (max 35 characters) to be safely used across all payment providers.

required
description str | None

Human-readable description for the payment, or None to omit it. The argument itself must always be passed (it has no default value).

required

Returns:

Type Description
PaymentRequestId

The ID of the newly created payment request.

Note

This operation is currently only supported for Adyen.

Tags
Source code in components/payment_gateway/subcomponents/payments/protected/business_logic/actions/payment_actions.py
@obs.api_call()
def initiate_direct_debit_request(
    self,
    session: Session,
    /,
    *,
    debtor_financial_instrument_id: FinancialInstrumentId,
    creditor_account_id: AccountId,
    amount: int,
    currency: CurrencyCode,
    reference: str,
    description: str | None,
) -> PaymentRequestId:
    """Initiates a Direct Debit payment request.

    Funds move from the debtor's external bank account to an account held
    with the payment service provider:

    - Debtor Financial Instrument: External Bank Account we're transferring funds from.
    - Creditor Account: Provider Account we're transferring funds to, managed by Alan on a PSP Workspace.

    The debit is submitted through the direct debit adapter of the creditor
    account's workspace, the resulting PaymentRequest is persisted, and a
    "payment request created" event is published with status `submitted`.

    Args:
        session: SQLAlchemy session to use for DB operations.
        debtor_financial_instrument_id: ID of the financial instrument owning the account that will be debited.
        creditor_account_id: ID of the account that will receive the funds.
        amount: Payment amount in minor currency units (e.g., cents for EUR). Must be positive.
        currency: ISO 4217 currency code (e.g., 'USD', 'EUR').
        reference: Unique reference for the payment request. Used for tracking and reconciliation. Must be short (max 35 characters) to be safely used across all payment providers.
        description: Human-readable description for the payment, or None to omit it. The argument has no default and must always be passed.

    Returns:
        The ID of the newly created payment request.

    Raises:
        WorkspaceNotRegisteredException: If no payment service provider is
            registered for the creditor account's workspace.

    Note:
        This operation is currently only supported for Adyen.

    Tags:
        - @payment_flow: Flex Top-Up
        - @payment_method: Direct Debit
        - @adyen: payments_api
    """
    # Load the debtor side first; the guard helpers raise for a missing or
    # terminated instrument / legal entity.
    with raise_if_financial_instrument_not_found(debtor_financial_instrument_id):
        debtor_fi = FinancialInstrumentModelBroker.get_financial_instrument(
            session,
            debtor_financial_instrument_id,
            with_legal_entity=True,
        )
    raise_on_terminated_financial_instrument(debtor_fi)
    raise_on_terminated_legal_entity(debtor_fi.legal_entity)

    # 1. The creditor account tells us which workspace (and therefore which
    # payment service provider) handles the request.
    with raise_if_account_not_found(creditor_account_id):
        target_account = AccountModelBroker.get_account(session, creditor_account_id)
    raise_on_terminated_account(target_account)
    set_tag("workspace", target_account.workspace_key)

    provider = get_provider_for_workspace(target_account.workspace_key)
    if provider is None:
        raise WorkspaceNotRegisteredException(target_account.workspace_key)
    registry = self.payment_method_adapter_registry
    direct_debit_adapter = registry.get_direct_debit_adapter(provider)

    with transaction(propagation=self.payment_request_propagation) as tx_session:
        # 2. Submit the debit through the provider adapter.
        psp_external_id, psp_raw_response = (
            direct_debit_adapter.initiate_direct_debit_request(
                debtor_financial_instrument=debtor_fi,
                creditor_account=target_account,
                amount=amount,
                currency=currency,
                reference=reference,
                description=description,
            )
        )

        # 3. Persist the PaymentRequest together with the provider response.
        # TODO @frederic.bonnet 2026-01-15: Should we record the payment request on failure? => keep consistent with initiate_wire_transfer_request
        stored_request = PaymentRequestModelBroker.create_payment_request(
            tx_session,
            external_id=psp_external_id,
            workspace_key=target_account.workspace_key,
            account_id=creditor_account_id,
            payment_type=PaymentRequestType.direct_debit,
            reference=reference,
            amount=amount,
            currency=currency,
            raw=psp_raw_response,
            financial_instrument_id=debtor_financial_instrument_id,
        )
        new_request_id = PaymentRequestId(stored_request.id)

    # Announce the request only once the transaction block has exited.
    publish_payment_request_created(
        occurred_at=datetime.now(UTC),
        workspace_key=target_account.workspace_key,
        payment_request_id=new_request_id,
        account_id=creditor_account_id,
        bank_transfer_id=None,
        transfer_update_id=None,
        reference=reference,
        status=PaymentRequestStatus.submitted,
    )

    return new_request_id

initiate_wire_transfer_request

initiate_wire_transfer_request(
    session,
    /,
    *,
    debtor_account_id,
    creditor_financial_instrument_id,
    amount,
    currency,
    reference,
    description,
)

Initiates a Wire Transfer payment request.

  • Debtor Account: Provider Account we're transferring funds from, managed by Alan on a PSP Workspace.
  • Creditor Financial Instrument: External Bank Account we're transferring funds to, belonging to the counterparty.

Parameters:

Name Type Description Default
session Session

SQLAlchemy session to use for DB operations.

required
debtor_account_id AccountId

ID of the account that will be debited.

required
creditor_financial_instrument_id FinancialInstrumentId

ID of the financial instrument that will receive the funds.

required
amount int

Payment amount in minor currency units (e.g., cents for EUR). Must be positive.

required
currency CurrencyCode

ISO 4217 currency code (e.g., 'USD', 'EUR').

required
reference str

Unique reference for the payment request. Used for tracking and reconciliation. Must be short (max 35 characters) to be safely used across all payment providers.

required
description str | None

Human-readable description for the payment, or None to omit it. The argument itself must always be passed (it has no default value).

required

Returns:

Type Description
PaymentRequestId

The ID of the newly created payment request.

Note

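This operation is documented as currently only supported for JPMorgan. However, the implementation is also tagged `@revolut: business_api` and contains Revolut-specific handling (account reference tagging and duplicate-request recovery), so this note may be out of date.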

Tags
Source code in components/payment_gateway/subcomponents/payments/protected/business_logic/actions/payment_actions.py
@obs.api_call()
def initiate_wire_transfer_request(
    self,
    session: Session,
    /,
    *,
    debtor_account_id: AccountId,
    creditor_financial_instrument_id: FinancialInstrumentId,
    amount: int,
    currency: CurrencyCode,
    reference: str,
    description: str | None,
) -> PaymentRequestId:
    """
    Initiates a Wire Transfer payment request.

    Funds move from an account held with the payment service provider to
    the counterparty's external bank account:

    - Debtor Account: Provider Account we're transferring funds from, managed by Alan on a PSP Workspace.
    - Creditor Financial Instrument: External Bank Account we're transferring funds to, belonging to the counterparty.

    The transfer is submitted through the wire transfer adapter of the
    debtor account's workspace, the resulting PaymentRequest is persisted,
    and a "payment request created" event is published with status
    `submitted`.

    If the adapter raises `PaymentRequestDuplicateAtProvider`, the existing
    PaymentRequest is looked up (or recovered from the PSP) via
    `find_or_recover_payment_request_by_reference` and its id is returned
    directly; no "created" event is published on that path.

    Args:
        session: SQLAlchemy session to use for DB operations.
        debtor_account_id: ID of the account that will be debited.
        creditor_financial_instrument_id: ID of the financial instrument that will receive the funds.
        amount: Payment amount in minor currency units (e.g., cents for EUR). Must be positive.
        currency: ISO 4217 currency code (e.g., 'USD', 'EUR').
        reference: Unique reference for the payment request. Used for tracking and reconciliation. Must be short (max 35 characters) to be safely used across all payment providers.
        description: Human-readable description for the payment, or None to omit it. The argument has no default and must always be passed.

    Returns:
        The ID of the newly created (or, for a duplicate, the existing or
        recovered) payment request.

    Raises:
        WorkspaceNotRegisteredException: If no payment service provider is
            registered for the debtor account's workspace.
        PaymentRequestDuplicateAtProvider: Re-raised when the provider
            reports a duplicate but no PaymentRequest can be found or
            recovered for the reference.

    Note:
        This operation is currently only supported for JPMorgan.
        NOTE(review): the tags below and the Revolut-specific branches in
        the body suggest Revolut is supported as well — confirm and update.

    Tags:
        - @payment_flow: FR Claims Payout
        - @payment_flow: CA Claims Payout
        - @payment_method: Wire Transfer
        - @revolut: business_api
        - @jpmorgan: global_payments_api
    """
    # Load the creditor side first; the guard helpers raise for a missing or
    # terminated instrument / legal entity.
    with raise_if_financial_instrument_not_found(creditor_financial_instrument_id):
        creditor_fi = FinancialInstrumentModelBroker.get_financial_instrument(
            session,
            creditor_financial_instrument_id,
            with_legal_entity=True,
        )
    raise_on_terminated_financial_instrument(creditor_fi)
    raise_on_terminated_legal_entity(creditor_fi.legal_entity)

    # 1. The debtor account tells us which workspace (and therefore which
    # payment service provider) handles the request.
    with raise_if_account_not_found(debtor_account_id):
        source_account = AccountModelBroker.get_account(session, debtor_account_id)
    raise_on_terminated_account(source_account)
    set_tag("workspace", source_account.workspace_key)

    provider = get_provider_for_workspace(source_account.workspace_key)
    if provider is None:
        raise WorkspaceNotRegisteredException(source_account.workspace_key)
    if provider == PaymentServiceProvider.revolut and source_account.reference:
        set_tag("account_reference", source_account.reference)
    registry = self.payment_method_adapter_registry
    wire_adapter = registry.get_wire_transfer_adapter(provider)

    with transaction(propagation=self.payment_request_propagation) as tx_session:
        # 2. Submit the transfer through the provider adapter.
        try:
            (
                psp_external_id,
                psp_raw_response,
                bank_transfer_id,
                transfer_update_id,
            ) = wire_adapter.initiate_wire_transfer_request(
                debtor_account=source_account,
                creditor_financial_instrument=creditor_fi,
                amount=amount,
                currency=currency,
                reference=reference,
                description=description,
            )
        except PaymentRequestDuplicateAtProvider:
            # Reconcile from the existing PaymentRequest so the action stays
            # idempotent for callers. Re-raise if the local row is missing —
            # that's a real data inconsistency worth paging on.
            current_logger.info(
                "Reconciling duplicate-request response",
                reference=reference,
                workspace_key=source_account.workspace_key,
            )
            recovered_id = self.find_or_recover_payment_request_by_reference(
                tx_session,
                workspace_key=source_account.workspace_key,
                reference=reference,
                payment_created_at=datetime.now(UTC),
                financial_instrument_id=creditor_financial_instrument_id,
            )
            if recovered_id is not None:
                return recovered_id
            raise

        # 3. Persist the PaymentRequest together with the provider response.
        # TODO @frederic.bonnet 2026-01-15: Should we record the payment request on failure? => keep consistent with initiate_direct_debit_request
        stored_request = PaymentRequestModelBroker.create_payment_request(
            tx_session,
            external_id=psp_external_id,
            workspace_key=source_account.workspace_key,
            account_id=debtor_account_id,
            payment_type=PaymentRequestType.wire_transfer,
            reference=reference,
            amount=amount,
            currency=currency,
            raw=psp_raw_response,
            bank_transfer_id=bank_transfer_id,
            financial_instrument_id=creditor_financial_instrument_id,
        )
        new_request_id = PaymentRequestId(stored_request.id)

    # Announce the request only once the transaction block has exited.
    publish_payment_request_created(
        occurred_at=datetime.now(UTC),
        workspace_key=source_account.workspace_key,
        payment_request_id=new_request_id,
        account_id=debtor_account_id,
        bank_transfer_id=bank_transfer_id,
        transfer_update_id=transfer_update_id,
        reference=reference,
        status=PaymentRequestStatus.submitted,
    )

    return new_request_id

payment_method_adapter_registry instance-attribute

payment_method_adapter_registry = (
    payment_method_adapter_registry
)

Registry of adapters for the different payment methods.

payment_request_propagation instance-attribute

payment_request_propagation = payment_request_propagation

Transaction propagation behavior to use for the adapter calls and payment request persistence.