Skip to content

Policies

components.payment_gateway.subcomponents.transfers.business_logic.policies.account_transfer_processor

AccountTransferProcessorPolicy

AccountTransferProcessorPolicy(*, account_transfer_router)

This class is responsible for processing account transfer events (a.k.a internal transfer from Adyen).

It processes Adyen transfer notifications of type internalTransfer or invoiceDeduction.

Source code in components/payment_gateway/subcomponents/transfers/business_logic/policies/account_transfer_processor.py
def __init__(
    self,
    *,
    account_transfer_router: AccountTransferRouter,
) -> None:
    self.account_transfer_router = account_transfer_router
    self.ledger_logic = LedgerLogic()

account_transfer_router instance-attribute

account_transfer_router = account_transfer_router

ledger_logic instance-attribute

ledger_logic = LedgerLogic()

on_account_transfer_received

on_account_transfer_received(data)
Source code in components/payment_gateway/subcomponents/transfers/business_logic/policies/account_transfer_processor.py
@obs.event_subscriber()
def on_account_transfer_received(self, data: "TransferData") -> None:
    from shared.services.adyen.openapi.balance_platform_transfer_notification_v4 import (
        InternalCategoryData,
    )

    logger = current_logger.bind(
        transfer_type=data.type,
        transfer_id=data.id,
        transfer_sequence_number=data.sequenceNumber,
    )

    # Sanity checks
    # These fields are supposed to be present in the data; we can't perform any operation without them anyway
    try:
        assert data.type == "internalTransfer" or data.type == "invoiceDeduction"
        assert data.category == "internal"
        assert data.categoryData
        if data.categoryData and not isinstance(
            data.categoryData, InternalCategoryData
        ):
            # This can happen when deserializing from a JSON string
            data.categoryData = InternalCategoryData.from_dict(data.categoryData)  # type: ignore[arg-type]
        assert data.id
        assert data.sequenceNumber
        assert data.balanceAccount
        assert data.balanceAccount.id
        assert data.creationDate
        assert data.direction
        assert data.reference
        assert data.events

    except AssertionError:
        logger.error("Missing required fields in account transfer data")
        alert_on_error_processing_transfer(
            data=data,
            message="Transfer data is missing required fields",
        )
        raise ValueError("Missing required fields in account transfer data")

    try:
        self._upsert_account_transfer(logger=logger, data=data)
    except Exception as ex:
        logger.exception(
            "Error processing account transfer",
            exception=ex,
            transfer_data=data,
        )
        alert_on_error_processing_transfer(
            data=data,
            message="An unexpected error occurred while upserting the account transfer",
        )
        raise

components.payment_gateway.subcomponents.transfers.business_logic.policies.bank_transfer_processor

BankTransferProcessorPolicy

BankTransferProcessorPolicy(*, bank_transfer_router)

This class is responsible for processing bank transfer events.

It processes Adyen transfer notifications of type bankTransfer.

Source code in components/payment_gateway/subcomponents/transfers/business_logic/policies/bank_transfer_processor.py
def __init__(
    self,
    *,
    bank_transfer_router: BankTransferRouter,
) -> None:
    self.bank_transfer_router = bank_transfer_router
    self.ledger_logic = LedgerLogic()

bank_transfer_router instance-attribute

bank_transfer_router = bank_transfer_router

ledger_logic instance-attribute

ledger_logic = LedgerLogic()

on_bank_transfer_received

on_bank_transfer_received(data)
Source code in components/payment_gateway/subcomponents/transfers/business_logic/policies/bank_transfer_processor.py
@obs.event_subscriber()
def on_bank_transfer_received(self, data: "TransferData") -> None:
    from shared.services.adyen.openapi.balance_platform_transfer_notification_v4 import (
        BankCategoryData,
    )

    logger = current_logger.bind(
        transfer_type=data.type,
        transfer_id=data.id,
        transfer_sequence_number=data.sequenceNumber,
    )

    # Sanity checks
    # These fields are supposed to be present in the data; we can't perform any operation without them anyway
    try:
        assert data.type == "bankTransfer"
        assert data.category == "bank"
        assert data.categoryData
        if data.categoryData and not isinstance(
            data.categoryData, BankCategoryData
        ):
            # This can happen when deserializing from a JSON string
            data.categoryData = BankCategoryData.from_dict(data.categoryData)  # type: ignore[arg-type]
        assert data.id
        assert data.sequenceNumber
        assert data.balanceAccount
        assert data.balanceAccount.id
        assert data.creationDate
        assert data.direction
        assert data.events
    except AssertionError:
        logger.error("Missing required fields in bank transfer data")
        alert_on_error_processing_transfer(
            data=data,
            message="Transfer data is missing required fields",
        )
        raise ValueError("Missing required fields in bank transfer data")

    try:
        self._upsert_bank_transfer(logger=logger, data=data)
    except Exception as ex:
        logger.exception(
            "Error processing bank transfer",
            exception=ex,
            transfer_data=data,
        )
        alert_on_error_processing_transfer(
            data=data,
            message="An unexpected error occurred while upserting the bank transfer",
        )
        raise

components.payment_gateway.subcomponents.transfers.business_logic.policies.card_transfer_processor

CardTransferProcessorPolicy

CardTransferProcessorPolicy(
    *, card_transfer_router, merchant_registry_policy=None
)

This class is responsible for processing card transfer events.

It processes Adyen transfer notifications of type payment.

Source code in components/payment_gateway/subcomponents/transfers/business_logic/policies/card_transfer_processor.py
def __init__(
    self,
    *,
    card_transfer_router: CardTransferRouter,
    merchant_registry_policy: MerchantRegistryPolicy | None = None,
) -> None:
    self.card_transfer_router = card_transfer_router
    self.merchant_registry_policy = (
        merchant_registry_policy or MerchantRegistryPolicy()
    )
    self.ledger_logic = LedgerLogic()

card_transfer_router instance-attribute

card_transfer_router = card_transfer_router

ledger_logic instance-attribute

ledger_logic = LedgerLogic()

merchant_registry_policy instance-attribute

merchant_registry_policy = (
    merchant_registry_policy or MerchantRegistryPolicy()
)

on_payment_received

on_payment_received(data)
Source code in components/payment_gateway/subcomponents/transfers/business_logic/policies/card_transfer_processor.py
@obs.event_subscriber()
def on_payment_received(self, data: "TransferData") -> None:
    from components.payment_gateway.subcomponents.transfers.adapters.models.mappers import (
        card_transfer_model_to_dataclass,
        transfer_event_model_to_dataclass,
        transfer_update_model_to_dataclass,
    )
    from shared.services.adyen.openapi.balance_platform_transfer_notification_v4 import (
        IssuedCard,
    )

    # TODO: implement optimistic locking with versioning using Adyen's sequenceNumber
    # see https://docs.sqlalchemy.org/en/latest/orm/versioning.html#configuring-a-version-counter
    logger = current_logger.bind(
        transfer_type=data.type,
        transfer_id=data.id,
        transfer_sequence_number=data.sequenceNumber,
    )

    # Sanity checks
    # These fields are supposed to be present in the data; we can't perform any operation without them anyway
    try:
        assert data.type == "payment"
        assert data.category == "issuedCard"
        if data.categoryData and not isinstance(data.categoryData, IssuedCard):
            # This can happen when deserializing from a JSON string
            data.categoryData = IssuedCard.from_dict(data.categoryData)  # type: ignore[arg-type]
        assert data.id
        assert data.sequenceNumber
        assert data.paymentInstrument
        assert data.paymentInstrument.id
        assert data.balanceAccount
        assert data.balanceAccount.id
        assert data.counterparty
        assert data.counterparty.merchant
        assert data.creationDate
        assert data.events
    except AssertionError:
        logger.error("Missing required fields in card transfer data")
        alert_on_error_processing_transfer(
            data=data,
            message="Transfer data is missing required fields",
        )
        raise ValueError("Missing required fields in card transfer data")

    # Get or update merchant info
    # This will fill any gap in the payload with data from the merchant registry
    merchant_info = self.merchant_registry_policy.upsert_merchant_info(
        data.counterparty.merchant
    )
    if not merchant_info:
        logger.error(
            "Cannot route payment with no merchant info",
            merchant_data=data.counterparty.merchant,
        )
        alert_on_error_processing_transfer(
            data=data,
            message="Transfer data is missing merchant info and we couldn't retrieve it from the merchant registry",
        )
        raise ValueError("Cannot route payment with no merchant info")

    try:
        (
            card_transfer,
            transfer_update,
            is_transfer_update_created,
            new_transfer_events,
        ) = self._upsert_card_transfer(
            logger=logger, data=data, merchant_info=merchant_info
        )
    except Exception as ex:
        logger.exception(
            "Error processing card transfer",
            exception=ex,
            transfer_data=data,
        )
        alert_on_error_processing_transfer(
            data=data,
            message="An unexpected error occurred while upserting the card transfer",
        )
        raise

    # Notify respective topics if needed
    if card_transfer:
        card_transfer_topic.publish(card_transfer_model_to_dataclass(card_transfer))
    if transfer_update and is_transfer_update_created:
        transfer_update_topic.publish(
            transfer_update_model_to_dataclass(transfer_update)
        )
    for event in new_transfer_events:
        transfer_event_topic.publish(transfer_event_model_to_dataclass(event))

components.payment_gateway.subcomponents.transfers.business_logic.policies.helpers

record_ledger_entry_for_transfer_event

record_ledger_entry_for_transfer_event(
    session, /, ledger_logic, ledger_id, transfer_event
)

Record a new ledger entry matching the balance change of the provided transfer event.

Does nothing if the balance change is zero.

Note

The function makes no assumption about multiple ledger entries sharing the same external_id, deduplication is the caller's responsibility to ensure there is no double accounting of the same event.

Parameters:

Name Type Description Default
ledger_logic LedgerLogic

The ledger logic instance to use.

required
ledger_id LedgerId

The ID of the ledger to record the entry in.

required
transfer_event TransferEvent

The transfer event to record the ledger entry for.

required
Source code in components/payment_gateway/subcomponents/transfers/business_logic/policies/helpers.py
def record_ledger_entry_for_transfer_event(
    session: Session,
    /,
    ledger_logic: LedgerLogic,
    ledger_id: LedgerId,
    transfer_event: TransferEvent,
) -> None:
    """Record a new ledger entry matching the balance change of the provided
    transfer event.

    Does nothing if the balance change is zero.

    Note:
        The function makes no assumption about multiple ledger entries sharing
        the same external_id, deduplication is the caller's responsibility to
        ensure there is no double accounting of the same event.

    Args:
        ledger_logic: The ledger logic instance to use.
        ledger_id: The ID of the ledger to record the entry in.
        transfer_event: The transfer event to record the ledger entry for.
    """
    balance_change = calculate_amount_from_events(
        [transfer_event_model_to_dataclass(transfer_event)],
        including_received=True,
    )
    if balance_change == 0:
        # Don't record a ledger entry if there is no balance change
        return

    ledger_logic.record_entry(
        session,
        id=ledger_id,
        amount=balance_change,
        occurred_at=transfer_event.effective_date,
        external_transaction_id=transfer_event.external_id,
    )