Events Pipeline Tutorial¶
A guided introduction for engineers who have never touched the events pipeline. By the end you will know what it is, when (and when not) to use it, and how to publish + consume your first event end-to-end.
For deeper reference once you're past this page: README ⧉, schemas.md ⧉, consumer.md ⧉, publisher README, archiver README.
1. What is the events pipeline¶
The events pipeline lets one component emit a typed event in a transaction, and any number of other components react to it asynchronously, with exactly-once delivery and per-stream ordering.
It is a transactional outbox ⧉ on top of PostgreSQL, drained into Redis Streams by a dedicated publisher service. Consumers run inside backend apps and process events with exactly-once semantics (lease + dedup + atomic checkpoint).
```mermaid
flowchart LR
    subgraph App
        direction LR
        A[Business Logic]
    end
    subgraph Database
        direction LR
        B[(outbox_event)]
    end
    subgraph Publisher
        direction LR
        C[Publisher Worker]
    end
    subgraph Redis
        direction LR
        S[(Redis stream)]
    end
    subgraph App Consumer
        direction LR
        F[Consumer Worker] -->|call| G[Handler fn]
    end
    subgraph Archiver
        direction LR
        H[Archiver Worker] -->|archive| J[S3]
    end
    A -->|same tx| B
    B -->|change detected| C
    C -->|XADD| S
    S -->|XREAD| F
    S -->|XREAD| H
```
The producer writes the event row in the same DB transaction as its domain mutation. Once that transaction commits, the publisher service eventually picks the row up and XADDs it to Redis. Consumer workers XREAD and call your handler. The archiver flushes everything to S3 for replay / audit.
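The transactional-outbox idea above can be sketched in a few lines. This is a conceptual illustration using `sqlite3` (the real pipeline uses PostgreSQL, and the table names here are simplified stand-ins, not the real schema): the domain write and the event write share one transaction, so a rolled-back request leaves no orphaned event behind.

```python
import sqlite3

# Minimal sketch of the transactional outbox, using sqlite3 for brevity.
# Table and column names are illustrative, not the real schema.
db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE widget (id INTEGER PRIMARY KEY, owner_id TEXT);
    CREATE TABLE outbox_event (id INTEGER PRIMARY KEY, payload TEXT);
""")

def create_widget(conn: sqlite3.Connection, owner_id: str) -> None:
    # Domain write and event write share one transaction:
    # either both commit or both roll back.
    with conn:
        conn.execute("INSERT INTO widget (owner_id) VALUES (?)", (owner_id,))
        conn.execute(
            "INSERT INTO outbox_event (payload) VALUES (?)",
            ('{"event": "WidgetCreated", "owner_id": "%s"}' % owner_id,),
        )

create_widget(db, "u1")

# A failed request rolls back both writes, so no event escapes.
try:
    with db:
        db.execute("INSERT INTO widget (owner_id) VALUES (?)", ("u2",))
        raise RuntimeError("simulated request failure")
except RuntimeError:
    pass

print(db.execute("SELECT COUNT(*) FROM widget").fetchone()[0])        # 1
print(db.execute("SELECT COUNT(*) FROM outbox_event").fetchone()[0])  # 1
```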
2. When to use it (and when NOT)¶
Use the events pipeline when these are true:
- The producing component must not block on the consumer
- A consumer must reliably react to a domain change (not okay to miss)
- More than one consumer might want to react now or later
- Cross-component / cross-app propagation matters
| You need | Use |
|---|---|
| Decoupled, durable, multi-consumer fan-out | events pipeline |
| Background work owned by the same component (one producer, one consumer, no replay) | RQ background job |
| Synchronous side-effect inside the request | direct function call (TODO: other patterns) |
Anti-patterns:

- Putting business decisions inside the consumer that the producer's request depends on → use a sync call instead.
- Sending an event then immediately reading the resulting state from the same request → use a sync call.
- Using the pipeline for "fire and forget" tasks with no semantic meaning (cache busts, log forwarding) → RQ is cheaper.
3. Quick start: ship your first event end-to-end¶
We'll add a fake WidgetCreated event on a new widgets stream, produce it from a backend action, and consume it from another component.
3.1 Define an EventSchema¶
File: stdlib/python/src/events_pipeline_common/schemas/widgets.py
```python
from events_pipeline_common.schemas.utils.base import EventSchema
from events_pipeline_common.schemas.utils.registry import events_pipeline_schema


@events_pipeline_schema
class WidgetCreated(EventSchema):
    widget_id: str
    owner_id: str
```
The class name must be globally unique within events_pipeline_common. Use only primitive / stdlib types for fields (no backend imports). Full rules: schemas.md ⧉.
3.2 Add a stream name¶
File: stdlib/python/src/events_pipeline_common/stream_names.py
Single source of truth for stream identifiers — typos caught by mypy / IDE.
3.3 Produce the event¶
In the business logic that creates a widget:
```python
from components.events_pipeline.public.producer import send_event
from events_pipeline_common.schemas.widgets import WidgetCreated
from events_pipeline_common.stream_names import StreamName


def create_widget(owner_id: str) -> Widget:
    widget = Widget(owner_id=owner_id)
    current_session.add(widget)
    current_session.flush()
    send_event(
        stream_name=StreamName.WIDGETS,
        event=WidgetCreated(widget_id=str(widget.id), owner_id=owner_id),
    )
    return widget
```
send_event writes to events.outbox_event inside the surrounding transaction. If the request rolls back, the event vanishes too — the outbox is consistent with your domain data.
For the rare flow where the event must fire regardless of caller outcome (audit log, admin/CLI signal, re-emit from a worker that already committed), pass guarantee=Guarantee.AT_LEAST_ONCE. Breaks outbox atomicity — see guarantees.md ⧉ for the full mode comparison.
3.4 Consume the event¶
This section explains how to consume from the main app backend. If you want to consume from DoctorAI, or another app, contact us.
File: backend/components/<your_component>/internal/events_pipeline_consumers.py (the only path scanned by the consumer autodiscovery)
```python
from sqlalchemy.orm import Session

from components.events_pipeline.public.consumer import events_pipeline_consumer
from components.events_pipeline.public.event import EventContext
from events_pipeline_common.schemas.widgets import WidgetCreated
from events_pipeline_common.stream_names import StreamName


@events_pipeline_consumer(
    StreamName.WIDGETS,
    name="widgets:notify_owner",
    event_types=[WidgetCreated],
)
def notify_owner(
    event: WidgetCreated,
    context: EventContext,
    session: Session,
) -> None:
    # Use `session` for DB writes — the worker owns the transaction.
    # Do NOT call session.commit().
    ...
```
Key rules:

- Handler signature is `(event, context, session)`. The worker auto-parses the wire payload into your typed `EventSchema` before calling you.
- `name` is the stable identity stored for dedup + checkpointing. Never rename it.
- Don't call `session.commit()` — the worker owns the transaction boundary.
- Raising re-delivers the event after ~5s. There is no DLQ today; catch + skip if you must drop.
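Since there is no DLQ, a handler that must drop unprocessable events needs an explicit catch-and-skip. A minimal sketch of the pattern, with an illustrative handler body and a plain `dict` payload standing in for the typed event:

```python
import logging

logger = logging.getLogger("widgets.consumer")

def notify_owner_safely(event: dict) -> None:
    # Illustrative catch-and-skip: an exception that escapes the handler
    # triggers redelivery (~5s) forever, so log and drop poison events.
    try:
        if "owner_id" not in event:
            raise ValueError("malformed event payload")
        ...  # the real side effect would go here
    except ValueError:
        # Log with enough context to replay from the S3 archive later.
        logger.exception("skipping unprocessable event: %r", event)

notify_owner_safely({"widget_id": "abc"})  # logs and returns, no re-raise
```

Scope the `except` clause to the specific failure you are willing to drop; a bare `except Exception` would also silently swallow transient errors that redelivery could have fixed.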
Full reference: consumer.md ⧉.
3.5 Verify locally¶
Run publisher + consumer + send a test event:
```bash
# Migrations
direnv exec backend env APP=fr_api flask db upgrade

# Terminal 1 — publisher
cd events_pipeline/events_publisher
uv run python -m events_publisher start --log-level DEBUG

# Terminal 2 — consumer
direnv exec backend env APP=fr_api flask events_pipeline consumer

# Terminal 3 — produce a demo event (or trigger your real action)
direnv exec backend env APP=fr_api flask events_pipeline_demo send-event \
    --stream stream:widgets:events \
    --event-type WidgetCreated \
    --payload '{"widget_id":"abc","owner_id":"u1"}'
```
End-to-end flow: business logic → events.outbox_event (PG NOTIFY fires) → publisher XADDs to Redis Stream → consumer worker XREADs → your handler.
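On the consumer side, the exactly-once semantics mentioned in section 1 (dedup + atomic checkpoint) boil down to committing the dedup record in the same transaction as the handler's writes. A conceptual sketch with `sqlite3` — the table names are simplified stand-ins, not the real worker schema:

```python
import sqlite3

# Conceptual exactly-once consume: the dedup insert and the handler's
# writes commit atomically, so a redelivered event becomes a no-op.
db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE processed_event (event_id TEXT PRIMARY KEY);
    CREATE TABLE side_effect (event_id TEXT);
""")

def handle_once(conn: sqlite3.Connection, event_id: str) -> bool:
    """Return True if processed, False if the event was a duplicate."""
    with conn:
        try:
            # The primary key makes the dedup insert fail on replays.
            conn.execute("INSERT INTO processed_event VALUES (?)", (event_id,))
        except sqlite3.IntegrityError:
            return False
        # The "handler" write lands in the same transaction.
        conn.execute("INSERT INTO side_effect VALUES (?)", (event_id,))
        return True

print(handle_once(db, "evt-1"))  # True  (first delivery)
print(handle_once(db, "evt-1"))  # False (redelivery is deduplicated)
```

If the process crashes between the stream read and the commit, nothing was recorded, so the lease expires and the event is redelivered — at-least-once delivery plus atomic dedup yields exactly-once processing.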
4. Where things live¶
```
stdlib/python/src/events_pipeline_common/   # shared with all components, NO backend imports
├── schemas/<domain>.py                     # @events_pipeline_schema EventSchema classes
└── stream_names.py                         # StreamName StrEnum

backend/components/{C}/internal/
└── events_pipeline_consumers.py            # @events_pipeline_consumer handlers (autodiscovered)

backend/components/events_pipeline/
├── public/                                 # send_event, decorator, EventContext
├── internal/                               # outbox_event, stream_lease, worker code
└── docs/                                   # this directory

events_pipeline/                            # standalone services (separate Python projects)
├── events_publisher/                       # PostgreSQL outbox → Redis Streams
└── events_archiver/                        # Redis Streams → S3 JSONL
```
5. Operating it in prod¶
- Dashboards (Datadog EU):
    - Infrastructure: https://app.datadoghq.eu/dashboard/t3e-aij-khn ⧉
    - Business / per-stream throughput + latency: https://app.datadoghq.eu/dashboard/zn9-nwn-zv6 ⧉
- Alerts: producer backlog monitor `105658839` ("Payment Processing - Events Pipeline Producer Backlog"). Consumer-side alerts are still TODO at the time of writing.
- Outbox cleanup: the publisher service runs an `OutboxCleanup` daemon (events_pipeline/events_publisher/src/events_publisher/db/cleanup.py) that deletes rows where `published_at IS NOT NULL AND published_at < now() - 7 days`, in 1000-row batches every 5 min. Unbounded outbox growth means either the publisher is down or rows are stuck unpublished.
- Processed-event cleanup: each consumer worker prunes its own `processed_event` dedup rows (7-day default).
- Schema compat: CI runs `check_compat.py` against `origin/main` and rejects breaking schema changes (removed fields, new required fields, type narrowing). New optional fields must use the `NOT_SET` sentinel — see schemas.md ⧉.
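The real `NOT_SET` sentinel lives in events_pipeline_common (see schemas.md ⧉); the self-contained sketch below only illustrates why a sentinel is needed for new optional fields. The `color` field is a hypothetical addition, not part of the real `WidgetCreated` schema: events serialized before the field existed must be distinguishable from events that explicitly set it to `None` or an empty value.

```python
# Self-contained sketch of the NOT_SET idea; the real sentinel and schema
# machinery live in events_pipeline_common.
class _NotSet:
    def __repr__(self) -> str:
        return "NOT_SET"

NOT_SET = _NotSet()

class WidgetCreated:
    # `color` is a hypothetical new optional field added after launch.
    def __init__(self, widget_id: str, color=NOT_SET):
        self.widget_id = widget_id
        # Old payloads that predate `color` deserialize with the sentinel,
        # which is distinguishable from an explicit None or "".
        self.color = color

old_event = WidgetCreated(widget_id="abc")               # pre-migration payload
new_event = WidgetCreated(widget_id="def", color="red")  # post-migration payload
print(old_event.color is NOT_SET)  # True
print(new_event.color)             # red
```

A consumer can then branch on `event.color is NOT_SET` to apply a default for historical events while honoring explicit values on new ones.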
6. Further reading¶
| Topic | File |
|---|---|
| Component overview + data model + diagrams | README.md ⧉ |
| Delivery guarantees on producer + consumer (exactly_once / at_least_once / at_most_once) | guarantees.md ⧉ |
| Schema definition rules + NOT_SET + compat checks | schemas.md ⧉ |
| Consumer internals + dedup + lease + retries | consumer.md ⧉ |
| Publisher service: run / configure / generate test events | events_publisher/README.md |
| Archiver service: Redis → S3 archival | events_pipeline/events_archiver/README.md |
| Public mkdocs site | https://mkdocs.alan.com/components/events_pipeline/ ⧉ |
| Design doc / decisions | Notion: Events Pipeline ⧉ |