Events Pipeline Component
PostgreSQL outbox + Redis Streams event pipeline for durable, ordered event delivery.
Overview
The events pipeline provides infrastructure for publishing events from one service to another with at-least-once delivery guarantees and per-stream ordering.
flowchart LR
subgraph Producer
A[Business Logic] -->|same tx| B[(outbox_event)]
end
subgraph Publisher
C[Worker] -->|acquire| D[(stream_lease)]
C -->|poll| B
C -->|publish| E[Redis Streams]
end
subgraph Archiver
E --> H[Archiver Worker]
H -->|acquire| D
H -->|archive| J[S3]
end
subgraph Consumer
E --> F[Consumer Service]
end
G[(stream_policy)] -.->|governs| E
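At-least-once delivery means a consumer can receive the same event more than once, for example when a publisher retries after a failure. The sketch below shows one common way a consumer can tolerate that, by remembering which event IDs it has already handled. It assumes each message carries a unique `event_id` field (a hypothetical name, imagined here as the outbox row ID) and keeps the seen set in memory, which a real service would replace with durable storage.

```python
from typing import Callable, Dict, Iterable, Set


class IdempotentConsumer:
    """Applies each event at most once, keyed by a unique event ID."""

    def __init__(self, handler: Callable[[Dict], None]) -> None:
        self._handler = handler
        # Assumption: an in-memory set; a real consumer needs durable storage.
        self._seen: Set[int] = set()

    def consume(self, messages: Iterable[Dict]) -> int:
        """Process messages in order and return how many were applied."""
        applied = 0
        for msg in messages:
            event_id = msg["event_id"]  # hypothetical field name
            if event_id in self._seen:
                continue  # redelivery of an event that was already handled
            self._handler(msg)
            self._seen.add(event_id)
            applied += 1
        return applied
```

Feeding the consumer a batch that repeats an ID applies the handler once for that ID and silently drops the repeat.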
Data Model
erDiagram
outbox_event {
bigint id PK
text stream_name
text event_type
jsonb payload
jsonb metadata
timestamp created_at
timestamp published_at
}
stream_lease {
text stream_name PK
text role PK
text owner_id
timestamp lease_until
text checkpoint
timestamp updated_at
}
stream_policy {
text stream_name PK
text security_level
text encryption_alg
text kms_key_id
int dek_rotation_days
int active_key_version
timestamp created_at
timestamp updated_at
}
stream_lease ||--o{ outbox_event : "publishes/archives"
stream_policy ||--o{ outbox_event : "governs"
Database Tables
All tables live in the events PostgreSQL schema.
events.outbox_event
Durable queue of events to be published. Events are written here in the same transaction as domain data changes, then dispatched to Redis Streams.
| Column | Purpose |
|---|---|
| id | BIGSERIAL PK; guarantees ordering |
| stream_name | Target Redis stream |
| event_type | Logical event name (e.g. "UserCreated") |
| payload | Business data (JSONB) |
| metadata | Trace IDs, schema version, secondary indexes |
| created_at | When the event was created |
| published_at | When the event was published to Redis (NULL = unpublished) |
Indexes: idx_outbox_stream_id on (stream_name, id), and idx_outbox_unpublished_stream_id, a partial index on (stream_name, id) WHERE published_at IS NULL that keeps unpublished-event queries efficient.
NOTIFY trigger: On each insert, events.notify_outbox_insert() sends a PostgreSQL notification on the events_outbox_insert channel with the stream name and event ID, enabling real-time publisher wakeup without polling.
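To make the same-transaction write concrete, here is a minimal sketch of the outbox pattern. It uses SQLite from the Python standard library as a stand-in for PostgreSQL so that it runs anywhere. The `app_user` table, the `write_with_event` helper, and the simplified column types are assumptions for illustration, not the component's actual API.

```python
import json
import sqlite3

# Simplified stand-in schema (SQLite types; the real tables live in PostgreSQL).
SCHEMA = """
CREATE TABLE app_user (id INTEGER PRIMARY KEY, email TEXT NOT NULL UNIQUE);
CREATE TABLE outbox_event (
    id           INTEGER PRIMARY KEY AUTOINCREMENT,
    stream_name  TEXT NOT NULL,
    event_type   TEXT NOT NULL,
    payload      TEXT NOT NULL,
    metadata     TEXT NOT NULL DEFAULT '{}',
    created_at   TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    published_at TEXT
);
"""


def write_with_event(conn, domain_sql, domain_params,
                     stream_name, event_type, payload):
    """Run a domain write and its outbox insert in one transaction."""
    with conn:  # commits on success, rolls back both writes on any exception
        conn.execute(domain_sql, domain_params)
        conn.execute(
            "INSERT INTO outbox_event (stream_name, event_type, payload) "
            "VALUES (?, ?, ?)",
            (stream_name, event_type, json.dumps(payload)),
        )
```

Because both statements share one transaction, a failure in either leaves neither row behind, so an event never exists without its domain change or the other way around.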
Per-Event Published Tracking
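Events with published_at IS NULL are unpublished. After a successful publish, mark_events_published() sets published_at = now(). This avoids the BIGSERIAL commit-order problem: sequence values are assigned at insert time, not commit time, so a transaction holding a lower id can commit after one with a higher id, and a "last published id" watermark that has already advanced past it would skip that event.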
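The commit-order problem is easier to see in a small simulation. The sketch below models two events whose IDs were allocated in order but whose transactions commit in the opposite order, then compares a watermark-based poll with a per-event flag. The `Event` class and both poll functions are illustrative names, not part of the pipeline.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Event:
    id: int
    committed: bool = False            # visible to the poller only once committed
    published_at: Optional[float] = None


def poll_with_watermark(events: List[Event], watermark: int) -> Tuple[List[int], int]:
    """Return visible IDs above the watermark and the advanced watermark."""
    visible = [e.id for e in events if e.committed and e.id > watermark]
    return visible, max(visible, default=watermark)


def poll_unpublished(events: List[Event], now: float) -> List[int]:
    """Return visible IDs with no published_at, then mark them published."""
    visible = [e for e in events if e.committed and e.published_at is None]
    for e in visible:
        e.published_at = now
    return [e.id for e in visible]
```

If event 2 commits before event 1, the watermark poll publishes 2, advances to 2, and never sees 1. The flag-based poll picks up event 1 on the next pass, although it then goes out after event 2.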
events.stream_lease
Unified worker coordination table, used by both the publisher and the archiver. A lease ensures that at most one worker owns each (stream, role) pair at any time.
| Column | Purpose |
|---|---|
| stream_name | Stream being owned (composite PK) |
| role | Worker role: publisher or archiver (composite PK) |
| owner_id | Worker holding the lease |
| lease_until | TTL for automatic recovery |
| checkpoint | Role-specific progress (e.g. archiver XREAD position) |
| updated_at | Last lease renewal time |
Index: idx_stream_lease_until on (lease_until) for expired lease discovery.
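One way to get lease semantics from a table like this is a conditional upsert: insert the row if none exists, and otherwise take it over only when the lease has expired or the caller already owns it. The sketch below is an assumption about how such a statement could look, not the pipeline's actual query. It runs against SQLite as a stand-in for PostgreSQL, passes time in explicitly as a float, and uses a hypothetical 30-second TTL.

```python
import sqlite3

# Stand-in for events.stream_lease with only the columns the sketch needs.
SCHEMA = """
CREATE TABLE stream_lease (
    stream_name TEXT NOT NULL,
    role        TEXT NOT NULL,
    owner_id    TEXT NOT NULL,
    lease_until REAL NOT NULL,
    PRIMARY KEY (stream_name, role)
)
"""

# Insert a new lease, or overwrite an existing one only if it has expired
# or already belongs to the caller (a renewal).
ACQUIRE = """
INSERT INTO stream_lease (stream_name, role, owner_id, lease_until)
VALUES (:stream, :role, :owner, :until)
ON CONFLICT (stream_name, role) DO UPDATE
SET owner_id = excluded.owner_id, lease_until = excluded.lease_until
WHERE stream_lease.lease_until < :now
   OR stream_lease.owner_id = excluded.owner_id
"""


def try_acquire(conn, stream, role, owner, now, ttl=30.0):
    """Attempt to acquire or renew a lease; return True if `owner` holds it."""
    with conn:
        conn.execute(ACQUIRE, {"stream": stream, "role": role, "owner": owner,
                               "until": now + ttl, "now": now})
    row = conn.execute(
        "SELECT owner_id FROM stream_lease WHERE stream_name = ? AND role = ?",
        (stream, role),
    ).fetchone()
    return row[0] == owner
```

A second worker calling `try_acquire` while the first lease is still valid gets `False`. Once `lease_until` has passed without a renewal, the same call succeeds, which is the automatic recovery the TTL provides.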
Owner ID Format
The owner_id field identifies the process holding the lease. Worker format: {role}-{stream_name}-{hostname}-{pid}-{uuid8}. Supervisor IDs omit the stream name.
| Role | Format | Example |
|---|---|---|
| Publisher worker | publisher-{stream}-{host}-{pid}-{uuid8} | publisher-stream:user:events-web01-12345-a1b2c3d4 |
| Publisher supervisor | supervisor-{host}-{pid}-{uuid8} | supervisor-web01-12340-f9e8d7c6 |
| Archiver worker | archiver-{stream}-{host}-{pid}-{uuid8} | archiver-stream:user:events-web01-12346-b5a4c3d2 |
| Archiver supervisor | supervisor-{host}-{pid}-{uuid8} | supervisor-web01-12340-e1f2a3b4 |
Components: the short hostname (the part before the first .), the OS process ID, and 8 random hex characters that keep IDs unique across restarts. The owner_id is treated as an opaque string: it is never parsed, only compared for equality.
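An ID in this format can be assembled from the standard library alone. The helper below is a sketch with a hypothetical name, `make_owner_id`. It assumes the 8 hex characters come from a random UUID, which is one reasonable reading of "uuid8".

```python
import os
import socket
import uuid
from typing import Optional


def make_owner_id(role: str, stream_name: Optional[str] = None) -> str:
    """Build an owner ID; supervisors pass no stream name."""
    host = socket.gethostname().split(".", 1)[0]  # short hostname
    parts = [role]
    if stream_name is not None:
        parts.append(stream_name)
    # PID plus 8 random hex chars keeps IDs distinct across restarts.
    parts += [host, str(os.getpid()), uuid.uuid4().hex[:8]]
    return "-".join(parts)
```

Stream names and hostnames can themselves contain hyphens or colons, which is a practical reason to compare the result only for equality rather than splitting it back into fields.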
Lease Coordination
sequenceDiagram
participant W1 as Worker 1
participant W2 as Worker 2
participant DB as PostgreSQL
participant Redis
W1->>DB: Acquire lease (stream:user)
DB-->>W1: Granted
W2->>DB: Acquire lease (stream:user)
DB-->>W2: Denied (already held)
W1->>DB: Claim events WHERE published_at IS NULL
W1->>Redis: XADD events
W1->>DB: SET published_at = now()
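The last three steps of the sequence above (claim, XADD, mark) can be sketched as one batch function. This is an illustration under assumptions, not the worker's real code. SQLite stands in for PostgreSQL, `publish_batch` is a hypothetical name, and `xadd` is any callable taking `(stream, fields)`, the same positional shape as redis-py's `Redis.xadd`.

```python
import sqlite3
from typing import Callable, Dict

# Minimal stand-in for events.outbox_event.
SCHEMA = """
CREATE TABLE outbox_event (
    id           INTEGER PRIMARY KEY,
    stream_name  TEXT NOT NULL,
    event_type   TEXT NOT NULL,
    payload      TEXT NOT NULL,
    published_at TEXT
)
"""


def publish_batch(conn: sqlite3.Connection, stream: str,
                  xadd: Callable[[str, Dict[str, str]], object],
                  limit: int = 100) -> int:
    """Publish up to `limit` unpublished events for one stream, in id order."""
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox_event "
        "WHERE stream_name = ? AND published_at IS NULL "
        "ORDER BY id LIMIT ?",
        (stream, limit),
    ).fetchall()
    for event_id, event_type, payload in rows:
        # If this raises, nothing below runs and the whole batch stays unpublished.
        xadd(stream, {"event_id": str(event_id),
                      "event_type": event_type,
                      "payload": payload})
    with conn:  # mark only after every XADD in the batch succeeded
        conn.executemany(
            "UPDATE outbox_event SET published_at = CURRENT_TIMESTAMP WHERE id = ?",
            [(row[0],) for row in rows],
        )
    return len(rows)
```

Marking rows only after the publish step is what produces at-least-once behavior in this sketch: a crash between XADD and the UPDATE leaves the rows unpublished, so the next pass sends them again.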
events.stream_policy
Per-stream security configuration for access control and encryption.
| Column | Purpose |
|---|---|
| stream_name | Stream being configured |
| security_level | open / restricted / confidential |
| encryption_alg | Encryption algorithm (confidential streams only) |
| kms_key_id | KMS key for envelope encryption |
| dek_rotation_days | DEK rotation interval |
| active_key_version | Current key version |