Event Consumer

The consumer subsystem provides exactly-once event processing from Redis Streams with automatic deduplication, checkpointing, and lease-based coordination.

Pipeline overview

Where the consumer fits in the overall event pipeline:

flowchart LR
    subgraph Producer
        A[Business Logic] -->|same tx| B[(outbox_event)]
    end

    subgraph Publisher
        C[Publisher Worker] -->|poll| B
        C -->|XADD| S
    end

    S[Redis Streams]

    subgraph Archiver
        H[Archiver Worker] -->|XREAD| S
        H -->|archive| J[S3]
    end

    subgraph Consumer
        F[Consumer Worker] -->|XREAD| S
        F -->|call| G[Handler fn]
    end

    style Consumer fill:#e1f5fe,stroke:#0288d1

Consumer architecture

flowchart LR
    subgraph Redis
        S[Redis Streams]
    end

    subgraph Supervisor
        SUP[ConsumerSupervisor] -->|spawns| W1[Worker 1]
        SUP -->|spawns| W2[Worker N]
    end

    subgraph "Worker (per handler)"
        W1 -->|XREAD| S
        W1 -->|acquire lease| DB[(stream_lease)]
        W1 -->|dedup check| PE[(processed_event)]
        W1 -->|call| H[Handler fn]
    end

The supervisor autodiscovers registered handlers and spawns one worker process per handler. Each worker acquires a lease on its stream, reads events via XREAD, deduplicates, calls the handler, and updates the checkpoint — all within a single transaction.
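The registry-and-spawn loop can be pictured with a small sketch. The names below (`REGISTRY`, `spawn_workers`) are illustrative stand-ins, not the real internals, and real workers are separate processes rather than threads:

```python
import threading
from typing import Any, Callable

# Hypothetical module-level registry that @stream_consumer would populate.
REGISTRY: dict[str, tuple[str, Callable[..., Any]]] = {}


def stream_consumer(stream: str, *, name: str):
    """Register a handler for one stream under a stable consumer name."""
    def decorator(fn):
        REGISTRY[name] = (stream, fn)
        return fn
    return decorator


def spawn_workers() -> list[threading.Thread]:
    """One worker per registered handler, as the supervisor does."""
    workers = []
    for name, (stream, fn) in REGISTRY.items():
        t = threading.Thread(target=fn, name=f"worker:{name}", daemon=True)
        workers.append(t)
    return workers


@stream_consumer("stream:user:events", name="handle_user_event")
def handle_user_event() -> None:
    pass
```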

Registering a consumer

Place your handler in components/{component}/internal/event_consumers.py — this is the only location autodiscover_consumers() scans.

from sqlalchemy.orm import Session

from components.events_pipeline.internal.consumer.event import StreamEvent
from components.events_pipeline.public.consumer import stream_consumer


@stream_consumer("stream:user:events", name="handle_user_event")
def handle_user_event(event: StreamEvent, session: Session) -> None:
    ...

Each consumer is registered for exactly one stream. Events are processed one at a time — each handler call gets a single StreamEvent and a Session.

The name is the stable identity of the consumer, stored in the DB for dedup (processed_event.consumer_name) and checkpointing (stream_lease). It must never change across renames or refactors — changing it would cause events to be reprocessed from the beginning.

StreamEvent

Each event passed to the handler is a frozen dataclass:

| Field | Type | Description |
| --- | --- | --- |
| `stream_name` | `str` | Redis stream the event came from |
| `redis_id` | `str` | Redis message ID |
| `event_type` | `str` | Logical event name (e.g. `UserCreated`) |
| `outbox_id` | `int` | Outbox table PK (for traceability) |
| `event_uuid` | `str` | Globally unique event ID (used for deduplication) |
| `payload` | `dict[str, Any]` | Business data |
| `metadata` | `dict[str, Any] \| None` | Trace IDs, schema version, etc. |

Session and transaction semantics

The handler's session shares a transaction with the dedup record insert and checkpoint update. This is what guarantees exactly-once delivery: if the handler succeeds but the process crashes before commit, the entire transaction (handler side-effects + dedup + checkpoint) rolls back and the event is reprocessed.

Calling session.commit() inside a handler raises CommitInTransactionError — the worker owns the transaction boundary. This also applies to current_session.commit(), since current_session resolves to the same session (the REQUIRES_NEW transaction pushes it onto the TransactionContext stack). The commit guard is enforced at the SQLAlchemy level via a before_commit listener, so there is no way to bypass it.
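The observable behaviour of the guard can be pictured with a small stand-in. The real guard is a SQLAlchemy `before_commit` listener on the session; `GuardedSession` below is a simplified illustration, not the actual implementation:

```python
class CommitInTransactionError(RuntimeError):
    """Raised when a handler tries to commit inside the worker's transaction."""


class GuardedSession:
    """Minimal stand-in for a session with the commit guard installed.

    The real guard hooks SQLAlchemy's ``before_commit`` event, so it also
    catches commits issued through current_session; this sketch only shows
    what a handler observes.
    """

    def __init__(self) -> None:
        self._guarded = False

    def begin_worker_transaction(self) -> None:
        # The worker owns the transaction boundary from here on.
        self._guarded = True

    def commit(self) -> None:
        if self._guarded:
            raise CommitInTransactionError(
                "handlers must not commit; the worker commits the transaction"
            )
```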

sequenceDiagram
    participant W as Worker
    participant DB as PostgreSQL
    participant H as Handler

    W->>DB: BEGIN (REQUIRES_NEW)
    W->>DB: Check processed_event (dedup)
    W->>DB: INSERT processed_event
    W->>H: handler(event, session)
    H->>DB: Business writes (same tx)
    W->>DB: UPDATE checkpoint
    W->>DB: COMMIT

Error handling

If a handler raises an exception:

  1. The entire transaction rolls back — dedup record, handler DB writes, and checkpoint update are all discarded
  2. The worker logs the error and retries after ~5 seconds
  3. Since the checkpoint was not updated, XREAD re-delivers the same event

The event is retried indefinitely until the handler succeeds. There is currently no max-retry limit or dead-letter queue — a poison message (one that always throws) will cause an infinite retry loop. Handlers should catch and handle expected errors internally if they want to skip an event.
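Until a dead-letter mechanism exists, skipping a malformed event means returning normally instead of raising. A sketch, where `ValidationError` and the logger are illustrative stand-ins:

```python
import logging

log = logging.getLogger(__name__)


class ValidationError(ValueError):
    """Illustrative stand-in for a domain validation error."""


def handle_user_event(event, session) -> None:
    try:
        user_id = event.payload["user_id"]
        if not user_id:
            raise ValidationError("empty user_id")
    except (KeyError, ValidationError):
        # Expected, non-retryable problem: log and return normally so the
        # worker commits the checkpoint and the event is skipped, instead
        # of raising and entering an infinite retry loop.
        log.warning("skipping malformed event %s", event.event_uuid)
        return
    # ... normal business writes using `session` ...
```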

Exactly-once delivery

Exactly-once is achieved by three mechanisms working together:

  1. Lease coordination — only one worker per (stream, role) pair via stream_lease table
  2. Deduplication — processed_event table tracks consumed event_uuid per consumer (globally unique, safe across multiple databases)
  3. Atomic checkpoint — dedup insert + handler work + checkpoint update in one transaction

If a worker crashes, its lease expires and another instance can take over from the last committed checkpoint.
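The dedup mechanism boils down to an insert guarded by a unique constraint. A sqlite sketch of the idea (the real table lives in PostgreSQL, where the insert would be `INSERT ... ON CONFLICT DO NOTHING` and would share the handler's transaction):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE processed_event (
           consumer_name TEXT NOT NULL,
           event_uuid    TEXT NOT NULL,
           PRIMARY KEY (consumer_name, event_uuid)
       )"""
)


def try_claim(consumer_name: str, event_uuid: str) -> bool:
    """Insert the dedup row; False means the event was already processed."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO processed_event VALUES (?, ?)",
        (consumer_name, event_uuid),
    )
    return cur.rowcount == 1
```

Because the claim commits atomically with the handler's writes and the checkpoint, a crashed worker leaves no dedup row behind and redelivery is safe.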

Processed event cleanup

Old processed_event rows are cleaned up automatically by each worker. Configurable via:

| Parameter | Default | Description |
| --- | --- | --- |
| `cleanup_retention_seconds` | 7 days (604800 s) | How long to keep dedup records |
| `cleanup_interval_seconds` | 5 min (300 s) | How often to run cleanup |
| `cleanup_batch_size` | 1000 | Max rows deleted per batch |
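A sqlite stand-in for the batched delete (the table layout and function are illustrative; the real cleanup runs against PostgreSQL):

```python
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE processed_event (event_uuid TEXT PRIMARY KEY, processed_at REAL)"
)


def cleanup(retention_seconds: float, batch_size: int) -> int:
    """Delete expired dedup rows in bounded batches; returns rows deleted."""
    cutoff = time.time() - retention_seconds
    deleted = 0
    while True:
        cur = conn.execute(
            """DELETE FROM processed_event WHERE rowid IN (
                   SELECT rowid FROM processed_event
                   WHERE processed_at < ? LIMIT ?)""",
            (cutoff, batch_size),
        )
        deleted += cur.rowcount
        if cur.rowcount < batch_size:
            return deleted
```

Bounding each batch keeps the delete from holding long locks on a large table.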

Consumer naming

The full consumer name stored in the DB is {app_name}.{name}:

fr_api.handle_user_event

app_name comes from the APP env var at runtime. name is the explicit string passed to @stream_consumer.
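A sketch of how the stored identity might be composed (the function name is illustrative):

```python
import os


def full_consumer_name(name: str) -> str:
    """Compose the DB-stored identity as "{app_name}.{name}" (sketch)."""
    app_name = os.environ["APP"]  # e.g. "fr_api" or "be_api"
    return f"{app_name}.{name}"
```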

Running locally

Prerequisites

  • PostgreSQL running locally (default: localhost:5432/alan_backend)
  • Redis running locally (default: localhost:6379/0)
  • Database migrations applied
# Run migrations (if not already done)
direnv exec backend env APP=fr_api flask db upgrade

1. Start the publisher

The consumer reads from Redis Streams, so the publisher must be running to move events from the outbox into Redis:

cd events_pipeline/events_publisher
uv run python -m events_publisher start --log-level DEBUG

2. Start the consumer

The consumer runs as a Flask command, tied to a specific app (country):

# For FR
direnv exec backend env APP=fr_api flask events_pipeline consumer

# For BE
direnv exec backend env APP=be_api flask events_pipeline consumer

The APP env var determines which app context is loaded and which consumers are discovered.

Options

| Option | Default | Description |
| --- | --- | --- |
| `--poll-interval` | 5 | Seconds between stream discovery polls |
| `--batch-size` | 100 | Max events per XREAD |
| `--heartbeat-dir` | `/tmp/events_consumer/heartbeats` | Directory for heartbeat files |
| `--heartbeat-timeout` | 30 | Kill worker if no heartbeat for N seconds |
| `--graceful-shutdown-timeout` | 30 | Wait N seconds before SIGKILL |
| `--lease-duration` | 30 | Lease duration in seconds |
| `--lease-renewal` | 25 | Renew lease after N seconds |
| `--cleanup-retention` | 7 days (604800 s) | Keep processed_event records for N seconds |
| `--cleanup-interval` | 5 min (300 s) | Seconds between cleanup runs |
| `--cleanup-batch-size` | 1000 | Max rows deleted per cleanup batch |

3. Send test events

Use the demo commands to produce events for e2e verification:

# Single event
direnv exec backend env APP=fr_api flask events_pipeline_demo send-event

# Customize stream and payload
direnv exec backend env APP=fr_api flask events_pipeline_demo send-event \
  --stream stream:user:events \
  --event-type UserCreated \
  --payload '{"user_id": "123"}'

# Continuous events (random batches, Ctrl+C to stop)
direnv exec backend env APP=fr_api flask events_pipeline_demo send-events-continuous

4. Monitor

# Watch all Redis commands
redis-cli MONITOR

# Read a specific stream
redis-cli XREAD STREAMS stream:demo:events 0

# Check consumer leases
direnv exec backend env APP=fr_api flask events_pipeline consumer-status

Full local e2e flow

Producer (send-event) → outbox_event table → Publisher → Redis Stream → Consumer → handler fn
  1. send-event inserts into events.outbox_event + fires NOTIFY
  2. Publisher picks it up, XADDs to Redis Stream
  3. Consumer XREADs, deduplicates, calls handler, commits checkpoint