Delivery guarantees¶
Both send_event (producer) and @events_pipeline_consumer (consumer)
accept a guarantee keyword. Same vocabulary on both sides — pick the
contract that matches what you're actually doing.
Guarantee is a StrEnum with three members: EXACTLY_ONCE,
AT_LEAST_ONCE, AT_MOST_ONCE. Default everywhere is
Guarantee.EXACTLY_ONCE.
Side-by-side¶
| Side | Mode | Behavior |
|---|---|---|
| Producer | EXACTLY_ONCE (default) |
outbox row joins caller tx — commits/rolls back atomically with business state |
| Producer | AT_LEAST_ONCE |
outbox row in own REQUIRES_NEW — commits regardless of caller (event may fire without business state) |
| Producer | AT_MOST_ONCE |
rejected — durable publication is the whole point of the outbox |
| Consumer | EXACTLY_ONCE (default) |
worker txn wraps handler + dedup row + checkpoint; handler is (event, context, session), commit-guarded |
| Consumer | AT_LEAST_ONCE |
handler runs outside any worker txn; dedup row written after success; retried on failure; handler is (event, context) |
| Consumer | AT_MOST_ONCE |
dedup row written before handler; handler runs outside any worker txn; not retried on failure; handler is (event, context) |
Producer¶
EXACTLY_ONCE (default)¶
Joins the caller's transaction (Propagation.REQUIRED). The outbox row
commits or rolls back atomically with the caller's business state — the
textbook outbox guarantee. Use this 99% of the time.
def create_widget(owner_id: str) -> Widget:
widget = Widget(owner_id=owner_id)
current_session.add(widget)
current_session.flush()
send_event(
stream_name=StreamName.WIDGETS,
event=WidgetCreated(widget_id=str(widget.id), owner_id=owner_id),
)
return widget
If the caller raises after send_event returns, the outbox row disappears
with the rest of the transaction. Downstream consumers never see an event
that doesn't match real database state.
AT_LEAST_ONCE¶
Opens a fresh REQUIRES_NEW and commits the outbox row even if the caller
later rolls back. Use only when the event is genuinely independent of
caller state.
send_event(
stream_name=StreamName.AUDIT_TRAIL,
event=AdminActionAttempted(actor_id=actor.id, action="purge_user"),
guarantee=Guarantee.AT_LEAST_ONCE,
)
When to use: - audit log entries that should land regardless of request outcome - admin / CLI-triggered events with no co-tx state - re-emits from a worker that already committed its primary write
When NOT to use:
- normal domain events tied to business state (leave the default)
- as a way to "make the event arrive faster" — the publisher's pg_notify
trigger already wakes the publisher in milliseconds; latency is not the
reason this mode exists
AT_MOST_ONCE¶
Rejected with ValueError. Durable publication is the whole point of
the outbox; dropping events on caller failure is not a supported mode.
Consumer¶
EXACTLY_ONCE (default)¶
The worker wraps handler invocation, dedup-row insert, and checkpoint update
in one REQUIRES_NEW transaction. Handler signature is
(event, context, session). Calling session.commit() raises
CommitInTransactionError — the worker owns the transaction boundary.
@events_pipeline_consumer(
StreamName.WIDGETS,
name="my_component:on_widget_created",
event_types=[WidgetCreated],
)
def on_widget_created(
event: WidgetCreated, context: EventContext, session: Session
) -> None:
...
AT_LEAST_ONCE¶
Handler runs outside any worker transaction. The worker writes the dedup row only after the handler returns successfully. If the handler crashes before returning, the event is redelivered. Handlers MUST be idempotent.
@events_pipeline_consumer(
StreamName.AUDIT_TRAIL,
name="my_component:on_audit",
event_types=[AuditAttempted],
guarantee=Guarantee.AT_LEAST_ONCE,
)
def on_audit(event: AuditAttempted, context: EventContext) -> None:
# uses current_session; may commit; must be idempotent
...
Note: handler signature drops the session arg.
AT_MOST_ONCE¶
Worker writes the dedup row first, then runs the handler outside any worker transaction. If the handler crashes, the dedup row is already committed and the event will not be redelivered — side effects from this run are orphaned by design. Reserve for fire-and-forget actions where re-running is more harmful than missing once.
@events_pipeline_consumer(
StreamName.IRREVERSIBLE_PAGER,
name="my_component:on_pager",
event_types=[PagerTriggered],
guarantee=Guarantee.AT_MOST_ONCE,
)
def on_pager(event: PagerTriggered, context: EventContext) -> None:
# external irreversible action without idempotency key
pagerduty.send(event.summary)
Picking the right mode¶
| You are... | Mode |
|---|---|
| publishing a domain event tied to a created/updated entity | producer EXACTLY_ONCE (default) |
| publishing an audit log of an attempted action | producer AT_LEAST_ONCE |
| consuming with handler logic that must be atomic with the dedup row | consumer EXACTLY_ONCE (default) |
| consuming with helpers that already commit (renewal flows, email shoots, webhook handlers) | consumer AT_LEAST_ONCE (and verify idempotency) |
| consuming for an irreversible side effect that must NOT retry on failure | consumer AT_MOST_ONCE |