Event Consumer¶
The consumer subsystem provides exactly-once event processing from Redis Streams with automatic deduplication, checkpointing, and lease-based coordination.
Pipeline overview¶
Where the consumer fits in the overall event pipeline:
```mermaid
flowchart LR
    subgraph Producer
        A[Business Logic] -->|same tx| B[(outbox_event)]
    end
    subgraph Publisher
        C[Publisher Worker] -->|poll| B
        C -->|XADD| S
    end
    S[Redis Streams]
    subgraph Archiver
        H[Archiver Worker] -->|XREAD| S
        H -->|archive| J[S3]
    end
    subgraph Consumer
        F[Consumer Worker] -->|XREAD| S
        F -->|call| G[Handler fn]
    end
    style Consumer fill:#e1f5fe,stroke:#0288d1
```
Consumer architecture¶
```mermaid
flowchart LR
    subgraph Redis
        S[Redis Streams]
    end
    subgraph Supervisor
        SUP[ConsumerSupervisor] -->|spawns| W1[Worker 1]
        SUP -->|spawns| W2[Worker N]
    end
    subgraph "Worker (per handler)"
        W1 -->|XREAD| S
        W1 -->|acquire lease| DB[(stream_lease)]
        W1 -->|dedup check| PE[(processed_event)]
        W1 -->|call| H[Handler fn]
    end
```
The supervisor autodiscovers registered handlers and spawns one worker process per handler. Each worker acquires a lease on its stream, reads events via XREAD, deduplicates, calls the handler, and updates the checkpoint — all within a single transaction.
Registering a consumer¶
Place your handler in `components/{component}/internal/event_consumers.py` — this is the only location `autodiscover_consumers()` scans.
```python
from sqlalchemy.orm import Session

from components.events_pipeline.internal.consumer.event import StreamEvent
from components.events_pipeline.public.consumer import stream_consumer


@stream_consumer("stream:user:events", name="handle_user_event")
def handle_user_event(event: StreamEvent, session: Session) -> None:
    ...
```
Each consumer is registered for exactly one stream. Events are processed one at a time — each handler call gets a single `StreamEvent` and a `Session`.
The `name` is the stable identity of the consumer, stored in the DB for dedup (`processed_event.consumer_name`) and checkpointing (`stream_lease`). It must never change across renames or refactors — changing it would cause events to be reprocessed from the beginning.
StreamEvent¶
Each event passed to the handler is a frozen dataclass:
| Field | Type | Description |
|---|---|---|
| `stream_name` | `str` | Redis stream the event came from |
| `redis_id` | `str` | Redis message ID |
| `event_type` | `str` | Logical event name (e.g. `UserCreated`) |
| `outbox_id` | `int` | Outbox table PK (for traceability) |
| `event_uuid` | `str` | Globally unique event ID (used for deduplication) |
| `payload` | `dict[str, Any]` | Business data |
| `metadata` | `dict[str, Any] \| None` | Trace IDs, schema version, etc. |
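For orientation, the table above corresponds to a dataclass of roughly this shape — a sketch only, since the real class lives in `components.events_pipeline.internal.consumer.event` and its exact field order and defaults are assumptions here:

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


# Illustrative sketch of StreamEvent based on the field table above;
# not the actual implementation.
@dataclass(frozen=True)
class StreamEvent:
    stream_name: str         # Redis stream the event came from
    redis_id: str            # Redis message ID, e.g. "1700000000000-0"
    event_type: str          # logical event name, e.g. "UserCreated"
    outbox_id: int           # outbox table PK, for traceability
    event_uuid: str          # globally unique ID, used for deduplication
    payload: dict[str, Any]  # business data
    metadata: dict[str, Any] | None = None  # trace IDs, schema version, etc.
```

Because the dataclass is frozen, handlers cannot mutate an event in place.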
Session and transaction semantics¶
The handler's session shares a transaction with the dedup record insert and checkpoint update. This is what guarantees exactly-once delivery: if the handler succeeds but the process crashes before commit, the entire transaction (handler side-effects + dedup + checkpoint) rolls back and the event is reprocessed.
Calling `session.commit()` inside a handler raises `CommitInTransactionError` — the worker owns the transaction boundary. This also applies to `current_session.commit()`, since `current_session` resolves to the same session (the REQUIRES_NEW transaction pushes it onto the `TransactionContext` stack). The commit guard is enforced at the SQLAlchemy level via a `before_commit` listener, so there is no way to bypass it.
```mermaid
sequenceDiagram
    participant W as Worker
    participant DB as PostgreSQL
    participant H as Handler
    W->>DB: BEGIN (REQUIRES_NEW)
    W->>DB: Check processed_event (dedup)
    W->>DB: INSERT processed_event
    W->>H: handler(event, session)
    H->>DB: Business writes (same tx)
    W->>DB: UPDATE checkpoint
    W->>DB: COMMIT
```
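The atomicity this sequence relies on can be illustrated with a toy in-memory model — names and structure here are illustrative, not the real worker, which does all of this in a single PostgreSQL REQUIRES_NEW transaction:

```python
# Toy model of the per-event transaction: dedup check, dedup insert,
# handler call, and checkpoint update either all take effect or none do.
def process_event(event_uuid: str, redis_id: str, handler,
                  processed: set, checkpoint: dict) -> bool:
    if event_uuid in processed:            # dedup check: already consumed
        return False
    saved_processed = set(processed)       # "BEGIN": snapshot for rollback
    saved_checkpoint = dict(checkpoint)
    try:
        processed.add(event_uuid)          # INSERT processed_event
        handler(event_uuid)                # business writes (same tx)
        checkpoint["last_id"] = redis_id   # UPDATE checkpoint
        return True                        # COMMIT
    except Exception:
        processed.clear()                  # ROLLBACK: restore the snapshot
        processed.update(saved_processed)
        checkpoint.clear()
        checkpoint.update(saved_checkpoint)
        raise
```

If the handler raises, both the dedup record and the checkpoint revert, so the event will be re-delivered and retried.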
Error handling¶
If a handler raises an exception:
- The entire transaction rolls back — dedup record, handler DB writes, and checkpoint update are all discarded
- The worker logs the error and retries after ~5 seconds
- Since the checkpoint was not updated, `XREAD` re-delivers the same event
The event is retried indefinitely until the handler succeeds. There is currently no max-retry limit or dead-letter queue — a poison message (one that always throws) will cause an infinite retry loop. Handlers should catch and handle expected errors internally if they want to skip an event.
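A sketch of that "catch expected errors internally" pattern, under assumed names (`MalformedPayload`, `extract_user_id` are hypothetical):

```python
import logging

logger = logging.getLogger(__name__)


class MalformedPayload(Exception):
    """Hypothetical 'expected' error: retrying this event can never succeed."""


def extract_user_id(payload: dict) -> str:
    # Illustrative validation step; raises when the payload is unusable.
    user_id = payload.get("user_id")
    if user_id is None:
        raise MalformedPayload("payload is missing user_id")
    return user_id


def handle_user_event(event, session) -> None:
    # Catch *expected* errors inside the handler so the transaction still
    # commits (dedup record + checkpoint) and the poison message is skipped.
    # Let unexpected errors propagate so the worker rolls back and retries.
    try:
        user_id = extract_user_id(event.payload)
        # ... business writes via `session` would go here ...
    except MalformedPayload:
        logger.warning("skipping malformed event %s", event.event_uuid)
```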
Exactly-once delivery¶
Exactly-once is achieved by three mechanisms working together:
- Lease coordination — only one worker per (stream, role) pair, via the `stream_lease` table
- Deduplication — the `processed_event` table tracks consumed `event_uuid` per consumer (globally unique, safe across multiple databases)
- Atomic checkpoint — dedup insert + handler work + checkpoint update in one transaction
If a worker crashes, its lease expires and another instance can take over from the last committed checkpoint.
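The lease semantics can be modeled in memory — a sketch only; the real state lives in the `stream_lease` table and the method names here are assumptions:

```python
from __future__ import annotations

import time


# Toy model of lease coordination: at most one worker holds the lease for a
# given (stream, consumer) key; a crashed worker's lease simply expires,
# letting another instance take over from the last committed checkpoint.
class LeaseTable:
    def __init__(self) -> None:
        self._leases: dict[tuple[str, str], tuple[str, float]] = {}

    def acquire(self, stream: str, consumer: str, worker_id: str,
                duration: float, now: float | None = None) -> bool:
        now = time.time() if now is None else now
        key = (stream, consumer)
        holder = self._leases.get(key)
        # Grant if the lease is free, expired, or already ours (renewal).
        if holder is None or holder[1] <= now or holder[0] == worker_id:
            self._leases[key] = (worker_id, now + duration)
            return True
        return False
```

Renewal is just re-acquiring before expiry, which is why the `--lease-renewal` interval must be shorter than `--lease-duration`.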
Processed event cleanup¶
Old `processed_event` rows are cleaned up automatically by each worker. Configurable via:
| Parameter | Default | Description |
|---|---|---|
| `cleanup_retention_seconds` | 7 days | How long to keep dedup records |
| `cleanup_interval_seconds` | 5 min | How often to run cleanup |
| `cleanup_batch_size` | 1000 | Max rows deleted per batch |
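A single cleanup pass can be sketched as a batched delete loop — illustrative only, since the real worker deletes rows from `processed_event` in SQL:

```python
# Toy model: row_timestamps stands in for processed_event creation times.
# Rows older than the retention window are deleted in LIMITed batches.
def cleanup_pass(row_timestamps: list, retention_seconds: float,
                 batch_size: int, now: float) -> int:
    cutoff = now - retention_seconds
    deleted = 0
    while True:
        # Select at most batch_size expired rows, mirroring a LIMITed DELETE.
        batch = [t for t in row_timestamps if t < cutoff][:batch_size]
        if not batch:
            return deleted
        for t in batch:
            row_timestamps.remove(t)
        deleted += len(batch)
```

Batching keeps each delete short so cleanup never holds long locks on the dedup table.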
Consumer naming¶
The full consumer name stored in the DB is `{app_name}.{name}`:
- `app_name` comes from the `APP` env var at runtime.
- `name` is the explicit string passed to `@stream_consumer`.
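In code, the derivation amounts to a one-liner — `full_consumer_name` is a hypothetical helper for illustration; only the `{app_name}.{name}` format comes from this page:

```python
from __future__ import annotations

import os


def full_consumer_name(name: str, app_name: str | None = None) -> str:
    # app_name defaults to the APP env var, matching runtime behavior.
    app = app_name or os.environ["APP"]
    return f"{app}.{name}"
```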
Running locally¶
Prerequisites¶
- PostgreSQL running locally (default: `localhost:5432/alan_backend`)
- Redis running locally (default: `localhost:6379/0`)
- Database migrations applied
1. Start the publisher¶
The consumer reads from Redis Streams, so the publisher must be running to move events from the outbox into Redis.
2. Start the consumer¶
The consumer runs as a Flask command, tied to a specific app (country):
```bash
# For FR
direnv exec backend env APP=fr_api flask events_pipeline consumer

# For BE
direnv exec backend env APP=be_api flask events_pipeline consumer
```
The `APP` env var determines which app context is loaded and which consumers are discovered.
Options¶
| Option | Default | Description |
|---|---|---|
| `--poll-interval` | 5 | Seconds between stream discovery polls |
| `--batch-size` | 100 | Max events per `XREAD` |
| `--heartbeat-dir` | `/tmp/events_consumer/heartbeats` | Directory for heartbeat files |
| `--heartbeat-timeout` | 30 | Kill worker if no heartbeat for N seconds |
| `--graceful-shutdown-timeout` | 30 | Wait N seconds before SIGKILL |
| `--lease-duration` | 30 | Lease duration in seconds |
| `--lease-renewal` | 25 | Renew lease after N seconds |
| `--cleanup-retention` | 7 days | Keep `processed_event` records for N seconds |
| `--cleanup-interval` | 5 min | Seconds between cleanup runs |
| `--cleanup-batch-size` | 1000 | Max rows deleted per cleanup batch |
3. Send test events¶
Use the demo commands to produce events for e2e verification:
```bash
# Single event
direnv exec backend env APP=fr_api flask events_pipeline_demo send-event

# Customize stream and payload
direnv exec backend env APP=fr_api flask events_pipeline_demo send-event \
    --stream stream:user:events \
    --event-type UserCreated \
    --payload '{"user_id": "123"}'

# Continuous events (random batches, Ctrl+C to stop)
direnv exec backend env APP=fr_api flask events_pipeline_demo send-events-continuous
```
4. Monitor¶
```bash
# Watch all Redis commands
redis-cli MONITOR

# Read a specific stream
redis-cli XREAD STREAMS stream:demo:events 0

# Check consumer leases
direnv exec backend env APP=fr_api flask events_pipeline consumer-status
```
Full local e2e flow¶
- `send-event` inserts into `events.outbox_event` + fires NOTIFY
- Publisher picks it up, XADDs to Redis Stream
- Consumer XREADs, deduplicates, calls handler, commits checkpoint