Skip to content

Event Consumer

New here? Start with the Tutorial.

The consumer subsystem provides exactly-once event processing from Redis Streams with automatic deduplication, checkpointing, and lease-based coordination.

Pipeline overview

Where the consumer fits in the overall event pipeline:

flowchart LR
    subgraph Producer
        A[Business Logic] -->|same tx| B[(outbox_event)]
    end

    subgraph Publisher
        C[Publisher Worker] -->|poll| B
        C -->|XADD| S
    end

    S[Redis Streams]

    subgraph Archiver
        H[Archiver Worker] -->|XREAD| S
        H -->|archive| J[S3]
    end

    subgraph Consumer
        F[Consumer Worker] -->|XREAD| S
        F -->|call| G[Handler fn]
    end

    style Consumer fill:#e1f5fe,stroke:#0288d1
Hold "Alt" / "Option" to enable pan & zoom

Consumer architecture

flowchart LR
    subgraph Redis
        S[Redis Streams]
    end

    subgraph Supervisor
        SUP[ConsumerSupervisor] -->|spawns| W1[Worker 1]
        SUP -->|spawns| W2[Worker N]
    end

    subgraph "Worker (per handler)"
        W1 -->|XREAD| S
        W1 -->|acquire lease| DB[(stream_lease)]
        W1 -->|dedup check| PE[(processed_event)]
        W1 -->|call| H[Handler fn]
    end
Hold "Alt" / "Option" to enable pan & zoom

The supervisor autodiscovers registered handlers and spawns one worker process per handler. Each worker acquires a lease on its stream, reads events via XREAD, deduplicates, calls the handler, and updates the checkpoint — all within a single transaction.

Registering a consumer

Place your handler in components/{component}/internal/events_pipeline_consumers.py — this is the only location autodiscover_consumers() scans.

from sqlalchemy.orm import Session

from components.events_pipeline.internal.consumer.event import StreamEvent
from components.events_pipeline.public.consumer import events_pipeline_consumer


@events_pipeline_consumer("stream:user:events", name="handle_user_event")
def handle_user_event(event: StreamEvent, session: Session) -> None:
    ...

Each consumer is registered for exactly one stream. Events are processed one at a time — each handler call gets a single StreamEvent and a Session.

The name is the stable identity of the consumer, stored in the DB for dedup (processed_event.consumer_name) and checkpointing (stream_lease). It must never change across renames or refactors — changing it would cause events to be reprocessed from the beginning.

StreamEvent

Each event passed to the handler is a frozen dataclass:

Field Type Description
stream_name str Redis stream the event came from
redis_id str Redis message ID
event_type str Logical event name (e.g. UserCreated)
outbox_id int Outbox table PK (for traceability)
event_uuid str Globally unique event ID (used for deduplication)
payload dict[str, Any] Business data
metadata dict[str, Any] \| None Trace IDs, schema version, etc.

Session and transaction semantics

This section describes the default Guarantee.EXACTLY_ONCE mode. For the other two modes (AT_LEAST_ONCE, AT_MOST_ONCE), see the Delivery guarantees page — the handler signature drops the session arg, the worker does not wrap the handler in a transaction, and the dedup row is written outside the handler call.

The handler's session shares a transaction with the dedup record insert and checkpoint update. This is what guarantees exactly-once delivery: if the handler succeeds but the process crashes before commit, the entire transaction (handler side-effects + dedup + checkpoint) rolls back and the event is reprocessed.

Calling session.commit() inside a handler raises CommitInTransactionError — the worker owns the transaction boundary. This also applies to current_session.commit(), since current_session resolves to the same session (the REQUIRES_NEW transaction pushes it onto the TransactionContext stack). The commit guard is enforced at the SQLAlchemy level via a before_commit listener, so there is no way to bypass it.

sequenceDiagram
    participant W as Worker
    participant DB as PostgreSQL
    participant H as Handler

    W->>DB: BEGIN (REQUIRES_NEW)
    W->>DB: Check processed_event (dedup)
    W->>DB: INSERT processed_event
    W->>H: handler(event, session)
    H->>DB: Business writes (same tx)
    W->>DB: UPDATE checkpoint
    W->>DB: COMMIT
Hold "Alt" / "Option" to enable pan & zoom

Error handling

If a handler raises an exception:

  1. The entire transaction rolls back — dedup record, handler DB writes, and checkpoint update are all discarded
  2. The worker logs the error and retries after ~5 seconds
  3. Since the checkpoint was not updated, XREAD re-delivers the same event

The event is retried indefinitely until the handler succeeds. There is currently no max-retry limit or dead-letter queue — a poison message (one that always throws) will cause an infinite retry loop. Handlers should catch and handle expected errors internally if they want to skip an event.

Exactly-once delivery

Exactly-once is achieved by three mechanisms working together:

  1. Lease coordination — only one worker per (stream, role) pair via stream_lease table
  2. Deduplicationprocessed_event table tracks consumed event_uuid per consumer (globally unique, safe across multiple databases)
  3. Atomic checkpoint — dedup insert + handler work + checkpoint update in one transaction

If a worker crashes, its lease expires and another instance can take over from the last committed checkpoint.

This is the default mode (Guarantee.EXACTLY_ONCE). When a handler genuinely cannot fit inside a single worker-owned transaction (helpers that already commit, external API calls between writes, etc.), opt out with guarantee=Guarantee.AT_LEAST_ONCE or Guarantee.AT_MOST_ONCE. See Delivery guarantees for the full mapping table and trade-offs on both sides of the pipeline.

Telemetry

events_pipeline.consumer.lag_ms

Gauge emitted by every consumer worker on each loop iteration of _consume_loop (see internal/consumer/lag.py). Value is the age, in milliseconds, of the next unconsumed event in the stream past the worker's current checkpoint, measured from the event's creation time (when send_event ran on the producer side, sampled with Redis time and stamped into metadata under __creation_timestamp_ms__). 0 means the consumer is caught up.

Measuring from creation time rather than from the stream message id means publisher latency is visible in the lag — if events sit in the outbox for a while before being XADDed to Redis, that gap shows up here too.

For events emitted before the producer started stamping __creation_timestamp_ms__ (in-flight upgrade window), the metric falls back to the Redis stream message id, which understates lag by the publisher delay.

Tags:

Tag Value
stream_name Redis stream name
consumer_name Full consumer name ({app}.{registration_name})

How it works: one XRANGE stream "(checkpoint" + COUNT 1 per loop iteration (microseconds even on big streams), plus one XRANGE immediately after each drained batch so the gauge snaps back to 0 without waiting for the next loop. Each XRANGE returns the message fields, so the metadata is parsed for the creation timestamp without extra round-trips.

events_pipeline.consumer.lag_events_count

Companion gauge to lag_ms: number of unconsumed events past the consumer's checkpoint. Computed by the consumer supervisor (one process per backend service) every 10 s in SupervisorMetricsReporter._report_lag (internal/consumer/metrics/reporter.py), not in the per-worker loop — unbounded XRANGE runs once per stream per supervisor cycle, regardless of how many consumer workers exist.

The value is exact (no cap). When consumer_lag >= 100_000 the supervisor emits a WARN log so on-call can spot supervisor strain (a single XRANGE call streams ~10MB+ at that scale).

Use alongside lag_ms: - lag_ms high + lag_events_count low → a few sticky events (poison message?) blocking the consumer. - lag_ms high + lag_events_count very high → consumer is throughput-bound, not latency-bound on a single event.

Tags (same as lag_ms):

Tag Value
stream_name Redis stream name
consumer_name Full consumer name ({app}.{registration_name})
Condition Severity
events_pipeline.consumer.lag_ms > 60_000 for 5m warn
events_pipeline.consumer.lag_ms > 300_000 for 5m page
events_pipeline.consumer.lag_events_count > 10_000 for 5m warn (real backlog)
no_data on events_pipeline.consumer.lag_ms for 5m page (worker dead / not emitting)

Start at warn-only to baseline per-stream normal range before promoting to page. The producer-side counterpart is monitor 105658839 ("Events Pipeline Producer Backlog").

events_pipeline.consumer.unknown_event_type

Counter incremented every time a consumer worker encounters an event_type it has no schema for. The worker blocks (re-reading the same event) until a schema is deployed — a steady non-zero rate on this metric means a stuck worker, not silent data loss.

Crash-loop protection

ConsumerSupervisor forks one worker per consumer. A worker that dies fast on startup (broken import, DB unreachable, OOM during _setup) gets respawned by the supervisor on the next loop iteration. Without a brake, that becomes a tight fork loop on a ~2 GB Flask parent — copy-on-write of the whole heap on every fork — and the host OOMs in minutes.

BaseSupervisor (inherited by all three supervisors: publisher, archiver, consumer) ships with a crash-loop guard.

Mechanism

  • Rolling-window crash counter: when a child dies, the supervisor records the timestamp. Default: ≥ 5 crashes within 60 s ⇒ the consumer is quarantined. Tunable via the crash_window_seconds / max_crashes_in_window constructor args.
  • Sticky quarantine, no auto-recovery: quarantined consumers are skipped during the spawn pass for the lifetime of the supervisor process. There is no cooldown — auto-recovery would re-enter the fork loop on an unresolved bug. Release is an explicit operator action (see below).
  • Healthy-run reset: a worker that lived ≥ healthy_run_seconds (default 60 s) clears its crash history. A crash after a stable run is treated as a fresh incident, not part of the startup loop.
  • Clock-drift safety: uptime is measured from time.monotonic() (via ProcessInfo.started_monotonic), not wall clock.

Persistence across restarts (consumer-only)

ConsumerSupervisor overrides the _on_quarantine hook to persist quarantine events to a Redis hash, and reads the hash back at startup to seed the in-memory state. This makes quarantine survive supervisor restarts — a restart alone is not enough to release a quarantined consumer; the operator must clear the Redis entry.

Hash: events_consumer:quarantined_consumers - Field (= consumer name, namespaced by app): {app_name}.{registration_name} — e.g. fr_api.handle_user_event. Cross-app collisions are impossible by construction (same hash is shared across all apps; each supervisor filters by its own {app_name}. prefix on startup). - Value (JSON): {"quarantined_at": "<ISO 8601 Z>", "reason": "crash_loop", "stream_name": "<stream>"}.

# Inspect quarantined consumers
redis-cli HGETALL events_consumer:quarantined_consumers

# Release one consumer (it will be respawned on the next supervisor poll)
redis-cli HDEL events_consumer:quarantined_consumers fr_api.handle_user_event

Note: HDEL is necessary but not sufficient — the running supervisor still holds the consumer in its in-memory _quarantined set. To actually release a consumer mid-run you must either (a) HDEL then restart the supervisor (it will not rehydrate that entry), or (b) restart the supervisor and HDEL — order is irrelevant. A future operator CLI may add a SIGNAL-driven in-process release.

Errors talking to Redis on the _on_quarantine write path and the startup-rehydration read path are swallowed and logged. A Redis hiccup never crashes the supervisor — the in-memory set remains the authority for the running process; only the persistence-across-restart property degrades.

Telemetry

Metric Type Tags Meaning
supervisor.worker.quarantined counter stream_name:<consumer_name> Fired once when a stream tips into quarantine.
supervisor.workers.quarantined gauge none Number of streams currently quarantined.

Log signals: - supervisor_worker_quarantined (error): emitted at the moment of quarantine, with crash_count + window_seconds. - restored_quarantine_from_redis (info): emitted at startup if at least one entry was rehydrated, with the count. - failed_to_record_quarantine_in_redis / failed_to_restore_quarantine_from_redis: emitted on Redis errors (commit / startup respectively).

Condition Severity
supervisor.workers.quarantined > 0 warn (a consumer is parked — investigate)
Rate of supervisor.worker.quarantined ≥ 1 per 5 min page (active crash-loop)

Processed event cleanup

Old processed_event rows are cleaned up automatically by each worker. Configurable via:

Parameter Default Description
cleanup_retention_seconds 7 days How long to keep dedup records
cleanup_interval_seconds 5 min How often to run cleanup
cleanup_batch_size 1000 Max rows deleted per batch

Consumer naming

The full consumer name stored in the DB is {app_name}.{name}:

fr_api.handle_user_events

app_name comes from the APP env var at runtime. name is the explicit string passed to @events_pipeline_consumer.

Running locally

Prerequisites

  • PostgreSQL running locally (default: localhost:5432/alan_backend)
  • Redis running locally (default: localhost:6379/0)
  • Database migrations applied
# Run migrations (if not already done)
direnv exec backend env APP=fr_api flask db upgrade

1. Start the publisher

The consumer reads from Redis Streams, so the publisher must be running to move events from the outbox into Redis:

cd events_pipeline/events_publisher
uv run python -m events_publisher start --log-level DEBUG

2. Start the consumer

The consumer runs as a Flask command, tied to a specific app (country):

# For FR
direnv exec backend env APP=fr_api flask events_pipeline consumer

# For BE
direnv exec backend env APP=be_api flask events_pipeline consumer

The APP env var determines which app context is loaded and which consumers are discovered.

Options

Option Default Description
--poll-interval 5 Seconds between stream discovery polls
--batch-size 100 Max events per XREAD
--heartbeat-dir /tmp/events_consumer/heartbeats Directory for heartbeat files
--heartbeat-timeout 30 Kill worker if no heartbeat for N seconds
--graceful-shutdown-timeout 30 Wait N seconds before SIGKILL
--lease-duration 30 Lease duration in seconds
--lease-renewal 25 Renew lease after N seconds
--cleanup-retention 7 days Keep processed_event records for N seconds
--cleanup-interval 5 min Seconds between cleanup runs
--cleanup-batch-size 1000 Max rows deleted per cleanup batch

3. Send test events

Use the demo commands to produce events for e2e verification:

# Single event
direnv exec backend env APP=fr_api flask events_pipeline_demo send-event

# Customize stream and payload
direnv exec backend env APP=fr_api flask events_pipeline_demo send-event \
  --stream stream:user:events \
  --event-type UserCreated \
  --payload '{"user_id": "123"}'

# Continuous events (random batches, Ctrl+C to stop)
direnv exec backend env APP=fr_api flask events_pipeline_demo send-events-continuous

4. Monitor

# Watch all Redis commands
redis-cli MONITOR

# Read a specific stream
redis-cli XREAD STREAMS stream:demo:events 0

# Check consumer leases
direnv exec backend env APP=fr_api flask events_pipeline consumer-status

Full local e2e flow

Producer (send-event) → outbox_event table → Publisher → Redis Stream → Consumer → handler fn
  1. send-event inserts into events.outbox_event + fires NOTIFY
  2. Publisher picks it up, XADDs to Redis Stream
  3. Consumer XREADs, deduplicates, calls handler, commits checkpoint