Skip to content

Business logic

components.payment_gateway.internal.business_logic.queries

webhook_log_extended_queries

get_extended_webhook_logs_in_period

get_extended_webhook_logs_in_period(
    session,
    /,
    from_date,
    to_date,
    account_id=None,
    card_holder_id=None,
    status_filter=None,
)
Source code in components/payment_gateway/internal/business_logic/queries/webhook_log_extended_queries.py
def get_extended_webhook_logs_in_period(
    session: Session,
    /,
    from_date: date,
    to_date: date,
    account_id: UUID | None = None,
    card_holder_id: UUID | None = None,
    status_filter: WebhookLogTransferStatus | None = None,
) -> list[WebhookLogExtended]:
    """
    List the extended webhook logs of a period, converted to dataclasses

    Args:
    - from_date: Start of the period, forwarded to the broker as `start_date`
    - to_date: End of the period, forwarded to the broker as `end_date`
    - account_id: Optional filter, forwarded unchanged to the broker
    - card_holder_id: Optional filter, forwarded unchanged to the broker
    - status_filter: Optional filter, forwarded unchanged to the broker

    Returns:
    - One WebhookLogExtended per row returned by the broker, in the broker's order

    NOTE(review): whether the period bounds are inclusive is decided by the
    broker query, not by this function - confirm there before relying on it.
    """
    rows = WebhookLogModelBroker.list_extended_webhook_logs_in_period(
        session,
        start_date=from_date,
        end_date=to_date,
        account_id=account_id,
        card_holder_id=card_holder_id,
        status_filter=status_filter,
    )

    # Each broker row is converted independently; the order is preserved.
    return list(map(_webhook_log_extended_to_dataclass, rows))

webhook_log_queries

get_latest_webhook_log_by_external_id

get_latest_webhook_log_by_external_id(
    session, /, external_id
)

Get the latest webhook log recorded for an external id, a webhook log is a record of a webhook payload sent to the payment gateway from Adyen

Args: - external_id: The external id of the webhook logs to search

Returns:
  • WebhookLog: The latest webhook log record, or None when no webhook log matches the external id
  • id: The internal id of the webhook log
  • payload: The raw content of the webhook payload from Adyen
Source code in components/payment_gateway/internal/business_logic/queries/webhook_log_queries.py
def get_latest_webhook_log_by_external_id(
    session: Session,
    /,
    external_id: str,
) -> WebhookLogDataclass | None:
    """
    Get the latest webhook log recorded for an external id, a webhook log is a record of a webhook payload sent to the payment gateway from Adyen

    The latest log is the one with the highest sequence number; logs without a
    sequence number are ranked as -1, and ties are broken by the most recent
    `created_at`.

    Args:
    - external_id: The external id of the webhook logs to search

    Returns:
    - WebhookLog: The latest webhook log record, or None when no webhook log matches
      - id: The internal id of the webhook log
      - payload: The raw content of the webhook payload from Adyen
    """
    from components.payment_gateway.internal.models.brokers.webhook_log import (
        WebhookLogModelBroker,
    )

    webhook_logs = WebhookLogModelBroker.list_webhook_logs_by_external_id(
        session, external_id
    )

    def _recency_key(webhook_log):  # type: ignore[no-untyped-def]
        sequence_number = webhook_log.sequence_number
        # Explicit None check: `sequence_number or -1` would also demote a
        # legitimate sequence number of 0 to the "missing" rank.
        rank = -1 if sequence_number is None else sequence_number
        return (rank, webhook_log.created_at)

    # max() returns the first maximal element, which is the same element a
    # stable descending sort would put first - without sorting the whole list.
    latest_webhook_log = max(webhook_logs, key=_recency_key, default=None)
    if latest_webhook_log is None:
        return None

    return _to_dataclass(latest_webhook_log)

get_webhook_log

get_webhook_log(session, /, webhook_id)

Get webhook log by id, a webhook log is a record of a webhook payload sent to the payment gateway from Adyen

Args: - webhook_id: The internal id of the webhook log to fetch

  • WebhookLog: The webhook log record
  • id: The internal id of the webhook log
  • payload: The raw content of the webhook payload from Adyen
Source code in components/payment_gateway/internal/business_logic/queries/webhook_log_queries.py
def get_webhook_log(
    session: Session,
    /,
    webhook_id: UUID,
) -> WebhookLogDataclass:
    """
    Fetch a single webhook log by its internal id, a webhook log is a record of a webhook payload sent to the payment gateway from Adyen

    Args:
    - webhook_id: The internal id of the webhook log to fetch

    Returns:
    - WebhookLog: The webhook log record, converted to its dataclass form
      - id: The internal id of the webhook log
      - payload: The raw content of the webhook payload from Adyen
    """
    from components.payment_gateway.internal.models.brokers.webhook_log import (
        WebhookLogModelBroker,
    )

    # The broker performs the lookup; this function only converts the result.
    return _to_dataclass(WebhookLogModelBroker.get_webhook_log(session, webhook_id))