Skip to content

Events Pipeline Tutorial

A guided introduction for engineers who have never touched the events pipeline. By the end you will know what it is, when (and when not) to use it, and how to publish + consume your first event end-to-end.

For deeper reference once you're past this page: README ⧉, schemas.md ⧉, consumer.md ⧉, publisher README, archiver README.

1. What is the events pipeline

The events pipeline lets one component emit a typed event in a transaction, and any number of other components react to it asynchronously, with exactly-once delivery and per-stream ordering.

It is a transactional outbox ⧉ on top of PostgreSQL, drained into Redis Streams by a dedicated publisher service. Consumers run inside backend apps and process events with exactly-once semantics (lease + dedup + atomic checkpoint).

flowchart LR
    subgraph App
        direction LR
        A[Business Logic]
    end

    subgraph Database
        direction LR
        B[(outbox_event)]
    end

    subgraph Publisher
        direction LR
        C[Publisher Worker]
    end

    subgraph Redis
        direction LR
        S[(Redis stream)]
    end

    subgraph App Consumer
        direction LR
        F[Consumer Worker] -->|call| G[Handler fn]
    end

    subgraph Archiver
        direction LR
        H[Archiver Worker] -->|archive| J[S3]
    end

    A -->|same tx| B
    B -->|change detected| C
    C -->|XADD| S
    S -->|XREAD| F
    S -->|XREAD| H
Hold "Alt" / "Option" to enable pan & zoom

Producer writes the event row in the same DB transaction as its domain mutation. Once that transaction commits, the publisher service eventually picks it up and XADDs it to Redis. Consumer workers XREAD and call your handler. The archiver flushes everything to S3 for replay / audit.

2. When to use it (and when NOT)

Use the events pipeline when these are true:
  • The producing component must not block on the consumer
  • A consumer must reliably react to a domain change (not okay to miss)
  • More than one consumer might want to react now or later
  • Cross-component / cross-app propagation matters
You need Use
Decoupled, durable, multi-consumer fan-out events pipeline
Background work owned by the same component (one producer, one consumer, no replay) RQ background job
Synchronous side-effect inside the request direct function call (TODO: other patterns)

Anti-patterns: - Putting business decisions inside the consumer that the producer's request depends on → use a sync call instead. - Sending an event then immediately reading the resulting state from the same request → use a sync call. - Using the pipeline for "fire and forget" tasks with no semantic meaning (cache busts, log forwarding) → RQ is cheaper.

3. Quick start: ship your first event end-to-end

We'll add a fake WidgetCreated event on a new widgets stream, produce it from a backend action, and consume it from another component.

3.1 Define an EventSchema

File: stdlib/python/src/events_pipeline_common/schemas/widgets.py

from events_pipeline_common.schemas.utils.base import EventSchema
from events_pipeline_common.schemas.utils.registry import events_pipeline_schema


@events_pipeline_schema
class WidgetCreated(EventSchema):
    widget_id: str
    owner_id: str

Class name is globally unique in events_pipeline_common. Use only primitive / stdlib types in fields (no backend imports). Full rules: schemas.md ⧉.

3.2 Add a stream name

File: stdlib/python/src/events_pipeline_common/stream_names.py

class StreamName(StrEnum):
    # ... existing members ...
    WIDGETS = "stream:widgets:events"

Single source of truth for stream identifiers — typos caught by mypy / IDE.

3.3 Produce the event

In the business logic that creates a widget:

from components.events_pipeline.public.producer import send_event
from events_pipeline_common.schemas.widgets import WidgetCreated
from events_pipeline_common.stream_names import StreamName


def create_widget(owner_id: str) -> Widget:
    widget = Widget(owner_id=owner_id)
    current_session.add(widget)
    current_session.flush()

    send_event(
        stream_name=StreamName.WIDGETS,
        event=WidgetCreated(widget_id=str(widget.id), owner_id=owner_id),
    )
    return widget

send_event writes to events.outbox_event inside the surrounding transaction. If the request rolls back, the event vanishes too — the outbox is consistent with your domain data.

For the rare flow where the event must fire regardless of caller outcome (audit log, admin/CLI signal, re-emit from a worker that already committed), pass guarantee=Guarantee.AT_LEAST_ONCE. Breaks outbox atomicity — see guarantees.md ⧉ for the full mode comparison.

3.4 Consume the event

This explains how to consume from the main app backend. If you want to conume from DoctorAI, or an other app, contact us.

File: backend/components/<your_component>/internal/events_pipeline_consumers.py (the only path scanned by the consumer autodiscovery)

from sqlalchemy.orm import Session

from components.events_pipeline.public.consumer import events_pipeline_consumer
from components.events_pipeline.public.event import EventContext
from events_pipeline_common.schemas.widgets import WidgetCreated
from events_pipeline_common.stream_names import StreamName


@events_pipeline_consumer(
    StreamName.WIDGETS,
    name="widgets:notify_owner",
    event_types=[WidgetCreated],
)
def notify_owner(
    event: WidgetCreated,
    context: EventContext,
    session: Session,
) -> None:
    # Use `session` for DB writes — the worker owns the transaction.
    # Do NOT call session.commit().
    ...

Key rules:

  • Handler signature is (event, context, session). The worker auto-parses the wire payload into your typed EventSchema before calling you.
  • name is the stable identity stored for dedup + checkpointing. Never rename it.
  • Don't call session.commit() — the worker owns the transaction boundary.
  • Raising re-delivers the event after ~5s. There is no DLQ today; catch + skip if you must drop.

Full reference: consumer.md ⧉.

3.5 Verify locally

Run publisher + consumer + send a test event:

# Migrations
direnv exec backend env APP=fr_api flask db upgrade

# Terminal 1 — publisher
cd events_pipeline/events_publisher
uv run python -m events_publisher start --log-level DEBUG

# Terminal 2 — consumer
direnv exec backend env APP=fr_api flask events_pipeline consumer

# Terminal 3 — produce a demo event (or trigger your real action)
direnv exec backend env APP=fr_api flask events_pipeline_demo send-event \
  --stream stream:widgets:events \
  --event-type WidgetCreated \
  --payload '{"widget_id":"abc","owner_id":"u1"}'

End-to-end flow: business logic → events.outbox_event (PG NOTIFY fires) → publisher XADDs to Redis Stream → consumer worker XREADs → your handler.

4. Where things live

stdlib/python/src/events_pipeline_common/      # shared with all components, NO backend imports
├── schemas/<domain>.py                        # @events_pipeline_schema EventSchema classes
└── stream_names.py                            # StreamName StrEnum

backend/components/{C}/internal/
└── events_pipeline_consumers.py               # @events_pipeline_consumer handlers (autodiscovered)

backend/components/events_pipeline/
├── public/                                    # send_event, decorator, EventContext
├── internal/                                  # outbox_event, stream_lease, worker code
└── docs/                                      # this directory

events_pipeline/                               # standalone services (separate Python projects)
├── events_publisher/                          # PostgreSQL outbox → Redis Streams
└── events_archiver/                           # Redis Streams → S3 JSONL

5. Operating it in prod

  • Dashboards (Datadog EU):
  • Infrastructure: https://app.datadoghq.eu/dashboard/t3e-aij-khn ⧉
  • Business / per-stream throughput + latency: https://app.datadoghq.eu/dashboard/zn9-nwn-zv6 ⧉
  • Alerts: producer backlog monitor 105658839 ("Payment Processing - Events Pipeline Producer Backlog"). Consumer-side alerts are still TODO at the time of writing.
  • Outbox cleanup: the publisher service runs an OutboxCleanup daemon (events_pipeline/events_publisher/src/events_publisher/db/cleanup.py) that deletes rows where published_at IS NOT NULL AND published_at < now() - 7 days, in 1000-row batches every 5 min. Unbounded outbox growth → publisher down OR rows stuck unpublished.
  • Processed-event cleanup: each consumer worker prunes its own processed_event dedup rows (7d default).
  • Schema compat: CI runs check_compat.py against origin/main and rejects breaking schema changes (removed fields, new required, type narrowing). New optional fields must use the NOT_SET sentinel — see schemas.md ⧉.

6. Further reading

Topic File
Component overview + data model + diagrams README.md ⧉
Delivery guarantees on producer + consumer (exactly_once / at_least_once / at_most_once) guarantees.md ⧉
Schema definition rules + NOT_SET + compat checks schemas.md ⧉
Consumer internals + dedup + lease + retries consumer.md ⧉
Publisher service: run / configure / generate test events events_publisher/README.md
Archiver service: Redis → S3 archival events_pipeline/events_archiver/README.md
Public mkdocs site https://mkdocs.alan.com/components/events_pipeline/ ⧉
Design doc / decisions Notion: Events Pipeline ⧉