Delivery guarantees¶
The producer picks its transaction scope by function name
(send_event_on_commit vs send_event_immediately); the consumer
(@events_pipeline_consumer) takes a guarantee keyword. Same vocabulary on
both sides — pick the contract that matches what you're actually doing.
Guarantee is a StrEnum with three members: EXACTLY_ONCE,
AT_LEAST_ONCE, AT_MOST_ONCE. Consumer default is
Guarantee.EXACTLY_ONCE.
Side-by-side¶
| Side | Mode | Behavior |
|---|---|---|
| Producer | send_event_on_commit (default choice) |
outbox row joins caller tx — commits/rolls back atomically with business state |
| Producer | send_event_immediately |
outbox row in own REQUIRES_NEW — commits regardless of caller (event may fire without business state) |
| Consumer | EXACTLY_ONCE (default) |
worker txn wraps handler + dedup row + checkpoint; handler is (event, context, session), commit-guarded |
| Consumer | AT_LEAST_ONCE |
handler runs outside any worker txn; dedup row written after success; retried on failure; handler is (event, context) |
| Consumer | AT_MOST_ONCE |
dedup row written before handler; handler runs outside any worker txn; not retried on failure; handler is (event, context) |
Producer¶
send_event_on_commit (default choice)¶
Joins the caller's transaction (Propagation.REQUIRED). The outbox row
commits or rolls back atomically with the caller's business state — the
textbook outbox guarantee. Use this 99% of the time.
def create_widget(owner_id: str) -> Widget:
widget = Widget(owner_id=owner_id)
current_session.add(widget)
current_session.flush()
send_event_on_commit(
stream_name=StreamName.WIDGETS,
event=WidgetCreated(widget_id=str(widget.id), owner_id=owner_id),
)
return widget
If the caller raises after send_event_on_commit returns, the outbox row
disappears with the rest of the transaction. Downstream consumers never see
an event that doesn't match real database state.
send_event_immediately¶
Opens a fresh REQUIRES_NEW and commits the outbox row even if the caller
later rolls back. Use only when the event is genuinely independent of
caller state.
send_event_immediately(
stream_name=StreamName.AUDIT_TRAIL,
event=AdminActionAttempted(actor_id=actor.id, action="purge_user"),
)
When to use: - audit log entries that should land regardless of request outcome - admin / CLI-triggered events with no co-tx state - re-emits from a worker that already committed its primary write
When NOT to use:
- normal domain events tied to business state (leave the default)
- as a way to "make the event arrive faster" — the publisher's pg_notify
trigger already wakes the publisher in milliseconds; latency is not the
reason this mode exists
AT_MOST_ONCE¶
Not exposed by the producer — there is no function for it. Durable
publication is the whole point of the outbox; dropping events on caller
failure is not a supported mode. (The internal _send_raw_event still
guards against it with a ValueError.)
Consumer¶
EXACTLY_ONCE (default)¶
The worker wraps handler invocation, dedup-row insert, and checkpoint update
in one REQUIRES_NEW transaction. Handler signature is
(event, context, session). Calling session.commit() raises
CommitInTransactionError — the worker owns the transaction boundary.
@events_pipeline_consumer(
StreamName.WIDGETS,
name="my_component:on_widget_created",
events_origin=EventOrigin.ALL_APPS,
event_types=[WidgetCreated],
)
def on_widget_created(
event: WidgetCreated, context: EventContext, session: Session
) -> None:
...
AT_LEAST_ONCE¶
Handler runs outside any worker transaction. The worker writes the dedup row only after the handler returns successfully. If the handler crashes before returning, the event is redelivered. Handlers MUST be idempotent.
@events_pipeline_consumer(
StreamName.AUDIT_TRAIL,
name="my_component:on_audit",
events_origin=EventOrigin.ALL_APPS,
event_types=[AuditAttempted],
guarantee=Guarantee.AT_LEAST_ONCE,
)
def on_audit(event: AuditAttempted, context: EventContext) -> None:
# uses current_session; may commit; must be idempotent
...
Note: handler signature drops the session arg.
AT_MOST_ONCE¶
Worker writes the dedup row first, then runs the handler outside any worker transaction. If the handler crashes, the dedup row is already committed and the event will not be redelivered — side effects from this run are orphaned by design. Reserve for fire-and-forget actions where re-running is more harmful than missing once.
@events_pipeline_consumer(
StreamName.IRREVERSIBLE_PAGER,
name="my_component:on_pager",
events_origin=EventOrigin.ALL_APPS,
event_types=[PagerTriggered],
guarantee=Guarantee.AT_MOST_ONCE,
)
def on_pager(event: PagerTriggered, context: EventContext) -> None:
# external irreversible action without idempotency key
pagerduty.send(event.summary)
Picking the right mode¶
| You are... | Mode |
|---|---|
| publishing a domain event tied to a created/updated entity | producer EXACTLY_ONCE (default) |
| publishing an audit log of an attempted action | producer AT_LEAST_ONCE |
| consuming with handler logic that must be atomic with the dedup row | consumer EXACTLY_ONCE (default) |
| consuming with helpers that already commit (renewal flows, email shoots, webhook handlers) | consumer AT_LEAST_ONCE (and verify idempotency) |
| consuming for an irreversible side effect that must NOT retry on failure | consumer AT_MOST_ONCE |