Event Consumer¶
New here? Start with the Tutorial.
The consumer subsystem provides exactly-once event processing from Redis Streams with automatic deduplication, checkpointing, and lease-based coordination.
Pipeline overview¶
Where the consumer fits in the overall event pipeline:
flowchart LR
subgraph Producer
A[Business Logic] -->|same tx| B[(outbox_event)]
end
subgraph Publisher
C[Publisher Worker] -->|poll| B
C -->|XADD| S
end
S[Redis Streams]
subgraph Archiver
H[Archiver Worker] -->|XREAD| S
H -->|archive| J[S3]
end
subgraph Consumer
F[Consumer Worker] -->|XREAD| S
F -->|call| G[Handler fn]
end
style Consumer fill:#e1f5fe,stroke:#0288d1
Consumer architecture¶
flowchart LR
subgraph Redis
S[Redis Streams]
end
subgraph Supervisor
SUP[ConsumerSupervisor] -->|spawns| W1[Worker 1]
SUP -->|spawns| W2[Worker N]
end
subgraph "Worker (per handler)"
W1 -->|XREAD| S
W1 -->|acquire lease| DB[(stream_lease)]
W1 -->|dedup check| PE[(processed_event)]
W1 -->|call| H[Handler fn]
end
The supervisor autodiscovers registered handlers and spawns one worker process per handler. Each worker acquires a lease on its stream, reads events via XREAD, deduplicates, calls the handler, and updates the checkpoint. The dedup insert, the handler call, and the checkpoint update share a single transaction.
Registering a consumer¶
Place your handler in components/{component}/internal/events_pipeline_consumers.py — this is the only location autodiscover_consumers() scans.
from sqlalchemy.orm import Session
from components.events_pipeline.internal.consumer.event import StreamEvent
from components.events_pipeline.public.consumer import events_pipeline_consumer

@events_pipeline_consumer("stream:user:events", name="handle_user_event")
def handle_user_event(event: StreamEvent, session: Session) -> None:
    ...
Each consumer is registered for exactly one stream. Events are processed one at a time — each handler call gets a single StreamEvent and a Session.
The name is the stable identity of the consumer, stored in the DB for dedup (processed_event.consumer_name) and checkpointing (stream_lease). It must never change across renames or refactors — changing it would cause events to be reprocessed from the beginning.
StreamEvent¶
Each event passed to the handler is a frozen dataclass:
| Field | Type | Description |
|---|---|---|
| stream_name | str | Redis stream the event came from |
| redis_id | str | Redis message ID |
| event_type | str | Logical event name (e.g. UserCreated) |
| outbox_id | int | Outbox table PK (for traceability) |
| event_uuid | str | Globally unique event ID (used for deduplication) |
| payload | dict[str, Any] | Business data |
| metadata | dict[str, Any] \| None | Trace IDs, schema version, etc. |
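For orientation, the fields in the table above can be sketched as a frozen dataclass. This is an illustrative stand-in and not the real StreamEvent from components.events_pipeline.internal.consumer.event: the class name StreamEventSketch and the sample values are assumptions, and the real class may define defaults or helpers not shown here.

```python
from __future__ import annotations

from dataclasses import FrozenInstanceError, dataclass
from typing import Any


@dataclass(frozen=True)
class StreamEventSketch:
    """Hypothetical mirror of the documented StreamEvent fields."""

    stream_name: str
    redis_id: str
    event_type: str
    outbox_id: int
    event_uuid: str
    payload: dict[str, Any]
    metadata: dict[str, Any] | None


# Sample values are made up for illustration.
event = StreamEventSketch(
    stream_name="stream:user:events",
    redis_id="1700000000000-0",
    event_type="UserCreated",
    outbox_id=42,
    event_uuid="3f2b6c1e-0000-4000-8000-000000000001",
    payload={"user_id": "123"},
    metadata=None,
)

# A frozen dataclass rejects attribute assignment after construction.
try:
    event.event_type = "UserDeleted"  # type: ignore[misc]
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Because the instance is frozen, a handler cannot accidentally rewrite fields such as event_uuid that the worker relies on for deduplication. Note that freezing does not deep-freeze the payload dict itself.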
Session and transaction semantics¶
This section describes the default Guarantee.EXACTLY_ONCE mode. For
the other two modes (AT_LEAST_ONCE, AT_MOST_ONCE), see the Delivery
guarantees page — the handler signature drops the
session arg, the worker does not wrap the handler in a transaction, and
the dedup row is written outside the handler call.
The handler's session shares a transaction with the dedup record insert and checkpoint update. This is what guarantees exactly-once delivery: if the handler succeeds but the process crashes before commit, the entire transaction (handler side-effects + dedup + checkpoint) rolls back and the event is reprocessed.
Calling session.commit() inside a handler raises CommitInTransactionError — the worker owns the transaction boundary. This also applies to current_session.commit(), since current_session resolves to the same session (the REQUIRES_NEW transaction pushes it onto the TransactionContext stack). The commit guard is enforced at the SQLAlchemy level via a before_commit listener, so there is no way to bypass it.
sequenceDiagram
participant W as Worker
participant DB as PostgreSQL
participant H as Handler
W->>DB: BEGIN (REQUIRES_NEW)
W->>DB: Check processed_event (dedup)
W->>DB: INSERT processed_event
W->>H: handler(event, session)
H->>DB: Business writes (same tx)
W->>DB: UPDATE checkpoint
W->>DB: COMMIT
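The sequence above can be approximated with a minimal sketch that uses the standard-library sqlite3 module in place of PostgreSQL and SQLAlchemy. The table layouts, the process_event function, and the choice to advance the checkpoint for duplicates are assumptions made for illustration; only the ordering (dedup check, dedup insert, handler, checkpoint update, commit) comes from the diagram.

```python
import sqlite3

CONSUMER = "fr_api.handle_user_event"


def process_event(conn, event_uuid, redis_id, handler):
    """Run dedup insert, handler, and checkpoint update in one transaction.

    Returns True if the handler ran, False if the event was a duplicate.
    """
    try:
        seen = conn.execute(
            "SELECT 1 FROM processed_event WHERE consumer_name = ? AND event_uuid = ?",
            (CONSUMER, event_uuid),
        ).fetchone()
        if seen is None:
            conn.execute(
                "INSERT INTO processed_event (consumer_name, event_uuid) VALUES (?, ?)",
                (CONSUMER, event_uuid),
            )
            handler(conn)  # business writes join the same open transaction
        # Assumption: a duplicate still advances the checkpoint.
        conn.execute(
            "UPDATE stream_lease SET checkpoint = ? WHERE consumer_name = ?",
            (redis_id, CONSUMER),
        )
        conn.commit()
        return seen is None
    except Exception:
        conn.rollback()  # discards dedup row, handler writes, and checkpoint
        raise


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE processed_event (
        consumer_name TEXT, event_uuid TEXT,
        PRIMARY KEY (consumer_name, event_uuid)
    );
    CREATE TABLE stream_lease (consumer_name TEXT PRIMARY KEY, checkpoint TEXT);
    CREATE TABLE user_account (user_id TEXT PRIMARY KEY);
    INSERT INTO stream_lease VALUES ('fr_api.handle_user_event', '0-0');
    """
)
conn.commit()


def failing_handler(c):
    c.execute("INSERT INTO user_account VALUES ('123')")
    raise RuntimeError("simulated failure before commit")


def ok_handler(c):
    c.execute("INSERT INTO user_account VALUES ('123')")


try:
    process_event(conn, "uuid-1", "1-0", failing_handler)
except RuntimeError:
    pass
dedup_rows_after_failure = conn.execute(
    "SELECT COUNT(*) FROM processed_event"
).fetchone()[0]

first_run = process_event(conn, "uuid-1", "1-0", ok_handler)
second_run = process_event(conn, "uuid-1", "1-0", ok_handler)
users = conn.execute("SELECT COUNT(*) FROM user_account").fetchone()[0]
checkpoint = conn.execute("SELECT checkpoint FROM stream_lease").fetchone()[0]
```

In this sketch the failed attempt leaves no dedup row behind, so the retry runs the handler, while the redelivered copy is recognised as a duplicate and skipped.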
Error handling¶
If a handler raises an exception:
- The entire transaction rolls back — dedup record, handler DB writes, and checkpoint update are all discarded
- The worker logs the error and retries after ~5 seconds
- Since the checkpoint was not updated, XREAD re-delivers the same event
The event is retried indefinitely until the handler succeeds. There is currently no max-retry limit or dead-letter queue — a poison message (one that always throws) will cause an infinite retry loop. Handlers should catch and handle expected errors internally if they want to skip an event.
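One way to keep an expected bad input from turning into a poison message is to catch it inside the handler and return normally. The sketch below is a self-contained illustration: EventSketch, MalformedPayload, extract_user_id, and the processed_user_ids list are hypothetical stand-ins, and the session argument is unused here.

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger("events_pipeline_sketch")
processed_user_ids = []  # stands in for real business writes


@dataclass(frozen=True)
class EventSketch:
    """Minimal stand-in for StreamEvent (illustrative only)."""

    event_uuid: str
    event_type: str
    payload: dict


class MalformedPayload(Exception):
    """Hypothetical error for payloads that can never be processed."""


def extract_user_id(payload):
    user_id = payload.get("user_id")
    if not isinstance(user_id, str) or not user_id:
        raise MalformedPayload("payload has no usable user_id")
    return user_id


def handle_user_event(event, session):
    try:
        user_id = extract_user_id(event.payload)
    except MalformedPayload as exc:
        # Expected, unrecoverable input: log and return normally so the
        # worker can commit and move past this event instead of retrying.
        logger.warning("skipping event %s: %s", event.event_uuid, exc)
        return
    processed_user_ids.append(user_id)


handle_user_event(EventSketch("uuid-1", "UserCreated", {}), session=None)
handle_user_event(
    EventSketch("uuid-2", "UserCreated", {"user_id": "123"}), session=None
)
```

Only errors that will never succeed on retry should be swallowed this way. Transient failures (for example a database timeout) are better left to propagate so the worker's retry applies.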
Exactly-once delivery¶
Exactly-once is achieved by three mechanisms working together:
- Lease coordination: only one worker per (stream, role) pair via the stream_lease table
- Deduplication: the processed_event table tracks consumed event_uuid per consumer (globally unique, safe across multiple databases)
- Atomic checkpoint: dedup insert + handler work + checkpoint update in one transaction
If a worker crashes, its lease expires and another instance can take over from the last committed checkpoint.
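The lease takeover described above can be sketched with an in-memory stand-in for the stream_lease table. Everything here is an assumption made for illustration (the LeaseTableSketch class, the try_acquire method, the worker names, and the role value); the real implementation stores leases in the database, and the 30-second duration only mirrors the --lease-duration default.

```python
class LeaseTableSketch:
    """In-memory stand-in for the stream_lease table (illustrative only)."""

    def __init__(self):
        self._leases = {}  # (stream, role) -> (owner, expires_at)

    def try_acquire(self, stream, role, owner, now, duration=30.0):
        """Acquire or renew the lease; return False if another owner holds it."""
        key = (stream, role)
        current = self._leases.get(key)
        if current is not None:
            current_owner, expires_at = current
            if current_owner != owner and expires_at > now:
                return False  # live lease held by someone else
        self._leases[key] = (owner, now + duration)
        return True


leases = LeaseTableSketch()
stream, role = "stream:user:events", "fr_api.handle_user_event"
results = [
    leases.try_acquire(stream, role, "worker-a", now=0.0),   # lease is free
    leases.try_acquire(stream, role, "worker-b", now=10.0),  # held by worker-a
    leases.try_acquire(stream, role, "worker-a", now=25.0),  # renewal by owner
    leases.try_acquire(stream, role, "worker-b", now=40.0),  # valid until 55.0
    leases.try_acquire(stream, role, "worker-b", now=56.0),  # expired, takeover
]
```

If worker-a stops renewing (for example because it crashed), its lease simply runs out and worker-b's next attempt succeeds, which is the takeover path the paragraph above describes.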
This is the default mode (Guarantee.EXACTLY_ONCE). When a handler
genuinely cannot fit inside a single worker-owned transaction (helpers
that already commit, external API calls between writes, etc.), opt out
with guarantee=Guarantee.AT_LEAST_ONCE or Guarantee.AT_MOST_ONCE. See
Delivery guarantees for the full mapping table and
trade-offs on both sides of the pipeline.
Telemetry¶
events_pipeline.consumer.lag_ms¶
Gauge emitted by every consumer worker on each loop iteration of _consume_loop
(see internal/consumer/lag.py). Value is the age, in milliseconds, of the
next unconsumed event in the stream past the worker's current checkpoint,
measured from the event's creation time (when send_event ran on the
producer side, sampled with Redis time and stamped into metadata under
__creation_timestamp_ms__). 0 means the consumer is caught up.
Measuring from creation time rather than from the stream message id means publisher latency is visible in the lag — if events sit in the outbox for a while before being XADDed to Redis, that gap shows up here too.
For events emitted before the producer started stamping
__creation_timestamp_ms__ (in-flight upgrade window), the metric falls back
to the Redis stream message id, which understates lag by the publisher delay.
Tags:
| Tag | Value |
|---|---|
| stream_name | Redis stream name |
| consumer_name | Full consumer name ({app}.{registration_name}) |
How it works: one XRANGE stream "(checkpoint" + COUNT 1 per loop iteration
(microseconds even on big streams), plus one XRANGE immediately after each
drained batch so the gauge snaps back to 0 without waiting for the next loop.
Each XRANGE returns the message fields, so the metadata is parsed for the
creation timestamp without extra round-trips.
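The lag calculation and its fallback can be illustrated with a small pure function. This is a sketch rather than the code in internal/consumer/lag.py: the function names, the (redis_id, metadata) tuple shape, and the clamp at zero are assumptions. The fallback relies on the Redis stream ID format `<milliseconds>-<sequence>`.

```python
def event_lag_ms(now_ms, redis_id, metadata):
    """Age of one event in ms, preferring the producer-side creation stamp."""
    created = None
    if metadata:
        created = metadata.get("__creation_timestamp_ms__")
    if created is None:
        # Fallback: millisecond part of the stream ID "<ms>-<seq>".
        created = redis_id.split("-", 1)[0]
    # Assumption: clamp at zero to absorb small clock differences.
    return max(0, now_ms - int(created))


def consumer_lag_ms(now_ms, next_entry):
    """Lag for the next unconsumed entry, or 0 when the consumer is caught up."""
    if next_entry is None:
        return 0
    redis_id, metadata = next_entry
    return event_lag_ms(now_ms, redis_id, metadata)


now_ms = 1_700_000_010_000
stamped = ("1700000005000-0", {"__creation_timestamp_ms__": 1_700_000_002_000})
unstamped = ("1700000005000-0", None)

lag_with_stamp = consumer_lag_ms(now_ms, stamped)
lag_with_fallback = consumer_lag_ms(now_ms, unstamped)
lag_caught_up = consumer_lag_ms(now_ms, None)
```

In this made-up example the event spent 3 seconds in the outbox before XADD, so the fallback (5000 ms) understates the creation-based lag (8000 ms) by exactly that publisher delay.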
events_pipeline.consumer.lag_events_count¶
Companion gauge to lag_ms: number of unconsumed events past the consumer's
checkpoint. Computed by the consumer supervisor (one process per backend
service) every 10 s in SupervisorMetricsReporter._report_lag
(internal/consumer/metrics/reporter.py), not in the per-worker loop —
unbounded XRANGE runs once per stream per supervisor cycle, regardless of
how many consumer workers exist.
The value is exact (no cap). When consumer_lag >= 100_000 the
supervisor emits a WARN log so on-call can spot supervisor strain (a single
XRANGE call streams ~10MB+ at that scale).
Use alongside lag_ms:
- lag_ms high + lag_events_count low → a few sticky events (poison
message?) blocking the consumer.
- lag_ms high + lag_events_count very high → consumer is throughput-bound,
not latency-bound on a single event.
Tags (same as lag_ms):
| Tag | Value |
|---|---|
| stream_name | Redis stream name |
| consumer_name | Full consumer name ({app}.{registration_name}) |
Recommended Datadog alerts¶
| Condition | Severity |
|---|---|
| events_pipeline.consumer.lag_ms > 60_000 for 5m | warn |
| events_pipeline.consumer.lag_ms > 300_000 for 5m | page |
| events_pipeline.consumer.lag_events_count > 10_000 for 5m | warn (real backlog) |
| no_data on events_pipeline.consumer.lag_ms for 5m | page (worker dead / not emitting) |
Start at warn-only to baseline per-stream normal range before promoting to
page. The producer-side counterpart is monitor 105658839 ("Events Pipeline
Producer Backlog").
events_pipeline.consumer.unknown_event_type¶
Counter incremented every time a consumer worker encounters an event_type it
has no schema for. The worker blocks (re-reading the same event) until a
schema is deployed — a steady non-zero rate on this metric means a stuck
worker, not silent data loss.
Crash-loop protection¶
ConsumerSupervisor forks one worker per consumer. A worker that dies fast on startup (broken import, DB unreachable, OOM during _setup) gets respawned by the supervisor on the next loop iteration. Without a brake, that becomes a tight fork loop on a ~2 GB Flask parent — copy-on-write of the whole heap on every fork — and the host OOMs in minutes.
BaseSupervisor (inherited by all three supervisors: publisher, archiver, consumer) ships with a crash-loop guard.
Mechanism¶
- Rolling-window crash counter: when a child dies, the supervisor records the timestamp. Default: ≥ 5 crashes within 60 s ⇒ the consumer is quarantined. Tunable via the crash_window_seconds / max_crashes_in_window constructor args.
- Sticky quarantine, no auto-recovery: quarantined consumers are skipped during the spawn pass for the lifetime of the supervisor process. There is no cooldown, because auto-recovery would re-enter the fork loop on an unresolved bug. Release is an explicit operator action (see below).
- Healthy-run reset: a worker that lived ≥ healthy_run_seconds (default 60 s) clears its crash history. A crash after a stable run is treated as a fresh incident, not part of the startup loop.
- Clock-drift safety: uptime is measured from time.monotonic() (via ProcessInfo.started_monotonic), not wall clock.
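The four rules above can be sketched as a small class. This is not the BaseSupervisor code: the CrashLoopGuardSketch class and its record_crash and should_spawn methods are hypothetical, and only the three parameter names and their defaults come from the text. Timestamps are passed in explicitly so the example is deterministic; a real caller would presumably pass time.monotonic().

```python
from collections import deque


class CrashLoopGuardSketch:
    """Hypothetical rolling-window crash counter with sticky quarantine."""

    def __init__(
        self,
        crash_window_seconds=60.0,
        max_crashes_in_window=5,
        healthy_run_seconds=60.0,
    ):
        self.crash_window_seconds = crash_window_seconds
        self.max_crashes_in_window = max_crashes_in_window
        self.healthy_run_seconds = healthy_run_seconds
        self._crashes = {}  # consumer name -> deque of crash timestamps
        self.quarantined = set()

    def record_crash(self, name, started_monotonic, now):
        """Record a child death; return True if the consumer is quarantined."""
        history = self._crashes.setdefault(name, deque())
        if now - started_monotonic >= self.healthy_run_seconds:
            history.clear()  # stable run: this crash starts a fresh incident
        history.append(now)
        # Drop crashes that have fallen out of the rolling window.
        while history and now - history[0] > self.crash_window_seconds:
            history.popleft()
        if len(history) >= self.max_crashes_in_window:
            self.quarantined.add(name)  # sticky: never removed in-process
        return name in self.quarantined

    def should_spawn(self, name):
        return name not in self.quarantined


guard = CrashLoopGuardSketch()

# A worker that dies 0.5 s after every start: quarantined on the 5th crash.
flapping = [
    guard.record_crash("fr_api.broken", started_monotonic=t - 0.5, now=t)
    for t in (1.0, 2.0, 3.0, 4.0, 5.0)
]

# Four fast crashes, then a 65 s healthy run before the next crash.
for t in (1.0, 2.0, 3.0, 4.0):
    guard.record_crash("fr_api.stable", started_monotonic=t - 0.5, now=t)
after_healthy_run = guard.record_crash(
    "fr_api.stable", started_monotonic=5.0, now=70.0
)
```

The second consumer is not quarantined even though it has crashed five times in total, because the healthy run cleared its history before the fifth crash was counted.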
Persistence across restarts (consumer-only)¶
ConsumerSupervisor overrides the _on_quarantine hook to persist quarantine events to a Redis hash, and reads the hash back at startup to seed the in-memory state. This makes quarantine survive supervisor restarts — a restart alone is not enough to release a quarantined consumer; the operator must clear the Redis entry.
Hash: events_consumer:quarantined_consumers
- Field (= consumer name, namespaced by app): {app_name}.{registration_name} — e.g. fr_api.handle_user_event. Cross-app collisions are impossible by construction (same hash is shared across all apps; each supervisor filters by its own {app_name}. prefix on startup).
- Value (JSON): {"quarantined_at": "<ISO 8601 Z>", "reason": "crash_loop", "stream_name": "<stream>"}.
# Inspect quarantined consumers
redis-cli HGETALL events_consumer:quarantined_consumers
# Remove one consumer's persisted quarantine entry (release also requires a supervisor restart, see the note below)
redis-cli HDEL events_consumer:quarantined_consumers fr_api.handle_user_event
Note: HDEL is necessary but not sufficient. The running supervisor still holds the consumer in its in-memory _quarantined set, so to release a consumer you must HDEL the entry and then restart the supervisor, which will no longer rehydrate it. Order matters: a supervisor restarted before the HDEL reads the entry back at startup and keeps the consumer quarantined until it is restarted again. A future operator CLI may add a signal-driven in-process release.
Errors talking to Redis on the _on_quarantine write path and the startup-rehydration read path are swallowed and logged. A Redis hiccup never crashes the supervisor — the in-memory set remains the authority for the running process; only the persistence-across-restart property degrades.
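The hash field and value format described above, together with the per-app prefix filter, can be sketched without a Redis connection by using a plain dict in place of the hash. The function names are hypothetical, and the exact timestamp precision is an assumption, since the source only specifies an ISO 8601 string ending in Z.

```python
import json
from datetime import datetime, timezone

QUARANTINE_HASH = "events_consumer:quarantined_consumers"


def build_quarantine_entry(app_name, registration_name, stream_name, now):
    """Return the (field, value) pair that would be written to the hash."""
    field = f"{app_name}.{registration_name}"
    value = json.dumps(
        {
            # Assumption: second precision, UTC, trailing "Z".
            "quarantined_at": now.astimezone(timezone.utc).strftime(
                "%Y-%m-%dT%H:%M:%SZ"
            ),
            "reason": "crash_loop",
            "stream_name": stream_name,
        }
    )
    return field, value


def entries_for_app(app_name, hash_contents):
    """Keep only this app's entries, as a supervisor would at startup."""
    prefix = f"{app_name}."
    return {
        field: json.loads(value)
        for field, value in hash_contents.items()
        if field.startswith(prefix)
    }


now = datetime(2024, 1, 2, 3, 4, 5, tzinfo=timezone.utc)
fake_hash = dict(
    [
        build_quarantine_entry(
            "fr_api", "handle_user_event", "stream:user:events", now
        ),
        build_quarantine_entry(
            "be_api", "handle_user_event", "stream:user:events", now
        ),
    ]
)
fr_entries = entries_for_app("fr_api", fake_hash)
```

Including the trailing dot in the prefix keeps an app named fr_api from matching entries of a hypothetical app whose name merely starts with the same characters.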
Telemetry¶
| Metric | Type | Tags | Meaning |
|---|---|---|---|
| supervisor.worker.quarantined | counter | stream_name:<consumer_name> | Fired once when a stream tips into quarantine. |
| supervisor.workers.quarantined | gauge | none | Number of streams currently quarantined. |
Log signals:
- supervisor_worker_quarantined (error): emitted at the moment of quarantine, with crash_count + window_seconds.
- restored_quarantine_from_redis (info): emitted at startup if at least one entry was rehydrated, with the count.
- failed_to_record_quarantine_in_redis / failed_to_restore_quarantine_from_redis: emitted on Redis errors (on the _on_quarantine write path and the startup-rehydration read path, respectively).
Recommended Datadog alerts¶
| Condition | Severity |
|---|---|
| supervisor.workers.quarantined > 0 | warn (a consumer is parked; investigate) |
| Rate of supervisor.worker.quarantined ≥ 1 per 5 min | page (active crash-loop) |
Processed event cleanup¶
Old processed_event rows are cleaned up automatically by each worker. Configurable via:
| Parameter | Default | Description |
|---|---|---|
| cleanup_retention_seconds | 7 days | How long to keep dedup records |
| cleanup_interval_seconds | 5 min | How often to run cleanup |
| cleanup_batch_size | 1000 | Max rows deleted per batch |
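A batched cleanup along these lines can be sketched with sqlite3. The processed_at column, the cleanup_batch function, and the drain loop are assumptions made for illustration; the source only states the retention, interval, and batch-size parameters, not how the worker issues its deletes.

```python
import sqlite3


def cleanup_batch(conn, now, retention_seconds=7 * 24 * 3600, batch_size=1000):
    """Delete at most batch_size expired dedup rows; return the number deleted."""
    cutoff = now - retention_seconds
    cur = conn.execute(
        "DELETE FROM processed_event WHERE rowid IN ("
        " SELECT rowid FROM processed_event WHERE processed_at < ? LIMIT ?)",
        (cutoff, batch_size),
    )
    conn.commit()  # short transactions keep each batch's locks brief
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE processed_event ("
    "consumer_name TEXT, event_uuid TEXT, processed_at REAL)"
)
now = 1_700_000_000.0
name = "fr_api.handle_user_event"
old_rows = [(name, f"old-{i}", now - 8 * 24 * 3600) for i in range(2500)]
fresh_rows = [(name, f"new-{i}", now - 60) for i in range(10)]
conn.executemany(
    "INSERT INTO processed_event VALUES (?, ?, ?)", old_rows + fresh_rows
)
conn.commit()

# Assumption: keep deleting until a batch comes back short.
deleted_per_batch = []
while True:
    deleted = cleanup_batch(conn, now)
    deleted_per_batch.append(deleted)
    if deleted < 1000:
        break
remaining = conn.execute("SELECT COUNT(*) FROM processed_event").fetchone()[0]
```

Capping each delete keeps a large backlog of expired rows from turning into one long-running statement. Rows younger than the retention period are left alone, so recent events stay protected against redelivery.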
Consumer naming¶
The full consumer name stored in the DB is {app_name}.{name}:
- app_name comes from the APP env var at runtime.
- name is the explicit string passed to @events_pipeline_consumer.
Running locally¶
Prerequisites¶
- PostgreSQL running locally (default: localhost:5432/alan_backend)
- Redis running locally (default: localhost:6379/0)
- Database migrations applied
1. Start the publisher¶
The consumer reads from Redis Streams, so the publisher must be running to move events from the outbox into Redis:
2. Start the consumer¶
The consumer runs as a Flask command, tied to a specific app (country):
# For FR
direnv exec backend env APP=fr_api flask events_pipeline consumer
# For BE
direnv exec backend env APP=be_api flask events_pipeline consumer
The APP env var determines which app context is loaded and which consumers are discovered.
Options¶
| Option | Default | Description |
|---|---|---|
| --poll-interval | 5 | Seconds between stream discovery polls |
| --batch-size | 100 | Max events per XREAD |
| --heartbeat-dir | /tmp/events_consumer/heartbeats | Directory for heartbeat files |
| --heartbeat-timeout | 30 | Kill worker if no heartbeat for N seconds |
| --graceful-shutdown-timeout | 30 | Wait N seconds before SIGKILL |
| --lease-duration | 30 | Lease duration in seconds |
| --lease-renewal | 25 | Renew lease after N seconds |
| --cleanup-retention | 7 days | Keep processed_event records for N seconds |
| --cleanup-interval | 5 min | Seconds between cleanup runs |
| --cleanup-batch-size | 1000 | Max rows deleted per cleanup batch |
3. Send test events¶
Use the demo commands to produce events for e2e verification:
# Single event
direnv exec backend env APP=fr_api flask events_pipeline_demo send-event
# Customize stream and payload
direnv exec backend env APP=fr_api flask events_pipeline_demo send-event \
--stream stream:user:events \
--event-type UserCreated \
--payload '{"user_id": "123"}'
# Continuous events (random batches, Ctrl+C to stop)
direnv exec backend env APP=fr_api flask events_pipeline_demo send-events-continuous
4. Monitor¶
# Watch all Redis commands
redis-cli MONITOR
# Read a specific stream
redis-cli XREAD STREAMS stream:demo:events 0
# Check consumer leases
direnv exec backend env APP=fr_api flask events_pipeline consumer-status
Full local e2e flow¶
- send-event inserts into events.outbox_event + fires NOTIFY
- Publisher picks it up, XADDs to Redis Stream
- Consumer XREADs, deduplicates, calls handler, commits checkpoint