Skip to content

Delivery guarantees

Both send_event (producer) and @events_pipeline_consumer (consumer) accept a guarantee keyword. Same vocabulary on both sides — pick the contract that matches what you're actually doing.

from components.events_pipeline.public.guarantee import Guarantee

Guarantee is a StrEnum with three members: EXACTLY_ONCE, AT_LEAST_ONCE, AT_MOST_ONCE. Default everywhere is Guarantee.EXACTLY_ONCE.

Side-by-side

Side Mode Behavior
Producer EXACTLY_ONCE (default) outbox row joins caller tx — commits/rolls back atomically with business state
Producer AT_LEAST_ONCE outbox row in own REQUIRES_NEW — commits regardless of caller (event may fire without business state)
Producer AT_MOST_ONCE rejected — durable publication is the whole point of the outbox
Consumer EXACTLY_ONCE (default) worker txn wraps handler + dedup row + checkpoint; handler is (event, context, session), commit-guarded
Consumer AT_LEAST_ONCE handler runs outside any worker txn; dedup row written after success; retried on failure; handler is (event, context)
Consumer AT_MOST_ONCE dedup row written before handler; handler runs outside any worker txn; not retried on failure; handler is (event, context)

Producer

EXACTLY_ONCE (default)

Joins the caller's transaction (Propagation.REQUIRED). The outbox row commits or rolls back atomically with the caller's business state — the textbook outbox guarantee. Use this 99% of the time.

def create_widget(owner_id: str) -> Widget:
    widget = Widget(owner_id=owner_id)
    current_session.add(widget)
    current_session.flush()

    send_event(
        stream_name=StreamName.WIDGETS,
        event=WidgetCreated(widget_id=str(widget.id), owner_id=owner_id),
    )
    return widget

If the caller raises after send_event returns, the outbox row disappears with the rest of the transaction. Downstream consumers never see an event that doesn't match real database state.

AT_LEAST_ONCE

Opens a fresh REQUIRES_NEW and commits the outbox row even if the caller later rolls back. Use only when the event is genuinely independent of caller state.

send_event(
    stream_name=StreamName.AUDIT_TRAIL,
    event=AdminActionAttempted(actor_id=actor.id, action="purge_user"),
    guarantee=Guarantee.AT_LEAST_ONCE,
)

When to use: - audit log entries that should land regardless of request outcome - admin / CLI-triggered events with no co-tx state - re-emits from a worker that already committed its primary write

When NOT to use: - normal domain events tied to business state (leave the default) - as a way to "make the event arrive faster" — the publisher's pg_notify trigger already wakes the publisher in milliseconds; latency is not the reason this mode exists

AT_MOST_ONCE

Rejected with ValueError. Durable publication is the whole point of the outbox; dropping events on caller failure is not a supported mode.

Consumer

EXACTLY_ONCE (default)

The worker wraps handler invocation, dedup-row insert, and checkpoint update in one REQUIRES_NEW transaction. Handler signature is (event, context, session). Calling session.commit() raises CommitInTransactionError — the worker owns the transaction boundary.

@events_pipeline_consumer(
    StreamName.WIDGETS,
    name="my_component:on_widget_created",
    event_types=[WidgetCreated],
)
def on_widget_created(
    event: WidgetCreated, context: EventContext, session: Session
) -> None:
    ...

AT_LEAST_ONCE

Handler runs outside any worker transaction. The worker writes the dedup row only after the handler returns successfully. If the handler crashes before returning, the event is redelivered. Handlers MUST be idempotent.

@events_pipeline_consumer(
    StreamName.AUDIT_TRAIL,
    name="my_component:on_audit",
    event_types=[AuditAttempted],
    guarantee=Guarantee.AT_LEAST_ONCE,
)
def on_audit(event: AuditAttempted, context: EventContext) -> None:
    # uses current_session; may commit; must be idempotent
    ...

Note: handler signature drops the session arg.

AT_MOST_ONCE

Worker writes the dedup row first, then runs the handler outside any worker transaction. If the handler crashes, the dedup row is already committed and the event will not be redelivered — side effects from this run are orphaned by design. Reserve for fire-and-forget actions where re-running is more harmful than missing once.

@events_pipeline_consumer(
    StreamName.IRREVERSIBLE_PAGER,
    name="my_component:on_pager",
    event_types=[PagerTriggered],
    guarantee=Guarantee.AT_MOST_ONCE,
)
def on_pager(event: PagerTriggered, context: EventContext) -> None:
    # external irreversible action without idempotency key
    pagerduty.send(event.summary)

Picking the right mode

You are... Mode
publishing a domain event tied to a created/updated entity producer EXACTLY_ONCE (default)
publishing an audit log of an attempted action producer AT_LEAST_ONCE
consuming with handler logic that must be atomic with the dedup row consumer EXACTLY_ONCE (default)
consuming with helpers that already commit (renewal flows, email shoots, webhook handlers) consumer AT_LEAST_ONCE (and verify idempotency)
consuming for an irreversible side effect that must NOT retry on failure consumer AT_MOST_ONCE