
Events Pipeline Component

PostgreSQL outbox + Redis Streams event pipeline for durable, ordered event delivery.

Overview

The events pipeline provides infrastructure for publishing events from one service to another with at-least-once delivery guarantees and per-stream ordering.

flowchart LR
    subgraph Producer
        A[Business Logic] -->|same tx| B[(outbox_event)]
    end

    subgraph Publisher
        C[Worker] -->|acquire| D[(stream_lease)]
        C -->|poll| B
        C -->|publish| E[Redis Streams]
    end

    subgraph Archiver
        E --> H[Archiver Worker]
        H -->|acquire| D
        H -->|archive| J[S3]
    end

    subgraph Consumer
        E --> F[Consumer Service]
    end

    G[(stream_policy)] -.->|governs| E

Data Model

erDiagram
    outbox_event {
        bigint id PK
        text stream_name
        text event_type
        jsonb payload
        jsonb metadata
        timestamp created_at
        timestamp published_at
    }

    stream_lease {
        text stream_name PK
        text role PK
        text owner_id
        timestamp lease_until
        text checkpoint
        timestamp updated_at
    }

    stream_policy {
        text stream_name PK
        text security_level
        text encryption_alg
        text kms_key_id
        int dek_rotation_days
        int active_key_version
        timestamp created_at
        timestamp updated_at
    }

    stream_lease ||--o{ outbox_event : "publishes/archives"
    stream_policy ||--o{ outbox_event : "governs"

Database Tables

All tables live in the events PostgreSQL schema.

events.outbox_event

Durable queue of events to be published. Events are written here in the same transaction as domain data changes, then dispatched to Redis Streams.

| Column | Purpose |
| --- | --- |
| id | BIGSERIAL primary key; defines event order within a stream |
| stream_name | Target Redis stream |
| event_type | Logical event name (e.g. "UserCreated") |
| payload | Business data (JSONB) |
| metadata | Trace IDs, schema version, secondary indexes |
| created_at | When the event was created |
| published_at | When the event was published to Redis (NULL = unpublished) |
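
The same-transaction write can be sketched as follows. This is a minimal illustration, not the component's actual code: it uses Python's built-in sqlite3 as a stand-in for PostgreSQL, and the app_user table, the make_db and create_user_with_event helpers, and the fail flag are all hypothetical names introduced for the example.

```python
import json
import sqlite3


def make_db():
    """Create an in-memory database with a domain table and an outbox table.

    Assumption: SQLite stands in for PostgreSQL; column types are simplified.
    """
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE app_user (id INTEGER PRIMARY KEY, email TEXT NOT NULL)")
    conn.execute(
        "CREATE TABLE outbox_event ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " stream_name TEXT NOT NULL,"
        " event_type TEXT NOT NULL,"
        " payload TEXT NOT NULL,"
        " metadata TEXT NOT NULL,"
        " created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,"
        " published_at TEXT)"  # NULL means unpublished
    )
    conn.commit()
    return conn


def create_user_with_event(conn, user_id, email, fail=False):
    """Insert the domain row and its outbox event in one transaction."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.execute("INSERT INTO app_user (id, email) VALUES (?, ?)", (user_id, email))
        conn.execute(
            "INSERT INTO outbox_event (stream_name, event_type, payload, metadata)"
            " VALUES (?, ?, ?, ?)",
            (
                "stream:user:events",
                "UserCreated",
                json.dumps({"user_id": user_id, "email": email}),
                json.dumps({"schema_version": 1}),
            ),
        )
        if fail:
            # Hypothetical failure hook used only to demonstrate the rollback.
            raise RuntimeError("simulated failure before commit")
```

If the transaction fails, neither the user row nor the event is stored, so the pipeline never publishes an event for a change that did not happen.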

Indexes: idx_outbox_stream_id on (stream_name, id), and the partial index idx_outbox_unpublished_stream_id on (stream_name, id) WHERE published_at IS NULL, which keeps queries for unpublished events efficient.

NOTIFY trigger: On each insert, events.notify_outbox_insert() sends a PostgreSQL notification on the events_outbox_insert channel with the stream name and event ID, enabling real-time publisher wakeup without polling.
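
The publisher side of this wakeup can be pictured with a small sketch. It assumes that some listener thread (not shown) receives notifications from the events_outbox_insert channel and puts the stream name on an in-process queue. The queue, the wait_for_work helper, and the fallback timeout are hypothetical and only illustrate the pattern.

```python
import queue


def wait_for_work(notifications, poll_interval):
    """Block until a notification arrives or poll_interval seconds elapse.

    Returns the notified stream name, or None when the timeout fired, in which
    case the caller may run a regular check of the outbox as a safety net.
    """
    try:
        return notifications.get(timeout=poll_interval)
    except queue.Empty:
        return None
```

A timeout fallback of this kind is a common safeguard because PostgreSQL notifications are only delivered to sessions that are listening when the notification is sent. Whether this component uses one is not stated here.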

Per-Event Published Tracking

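Events with published_at IS NULL are unpublished. After a successful publish, mark_events_published() sets published_at = now(). This avoids the BIGSERIAL commit-order problem: IDs are assigned at insert time, not commit time, so a transaction holding a lower ID can commit after one holding a higher ID. A publisher that tracks only a high-water mark would skip the late-committing row, whereas a per-event flag picks it up as soon as it becomes visible.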
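
A small simulation shows why a watermark can skip late-committing transactions while a per-event flag cannot. It is a toy model in plain Python, not the pipeline's code; Row, publish_with_watermark, publish_per_event, and simulate are names made up for the illustration.

```python
from dataclasses import dataclass


@dataclass
class Row:
    id: int
    committed: bool = False   # visible to the publisher only once committed
    published: bool = False   # stands in for published_at IS NOT NULL


def publish_with_watermark(rows, watermark):
    """Publish committed rows with id > watermark; return (ids, new_watermark)."""
    ids = [r.id for r in rows if r.committed and r.id > watermark]
    return ids, max(ids, default=watermark)


def publish_per_event(rows):
    """Publish committed rows that are not yet marked as published."""
    ids = []
    for r in rows:
        if r.committed and not r.published:
            r.published = True
            ids.append(r.id)
    return ids


def simulate():
    # Transaction A is assigned id 1, transaction B id 2, but B commits first.
    a, b = Row(1), Row(2)
    rows = [a, b]

    b.committed = True
    wm_first, watermark = publish_with_watermark(rows, 0)
    pe_first = publish_per_event(rows)

    a.committed = True  # the late commit
    wm_second, watermark = publish_with_watermark(rows, watermark)
    pe_second = publish_per_event(rows)

    return {"watermark": wm_first + wm_second, "per_event": pe_first + pe_second}
```

In this model the watermark strategy never publishes event 1, because the watermark has already advanced to 2 by the time event 1 becomes visible. The per-event strategy publishes it on the next pass.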

events.stream_lease

Unified worker coordination table. Ensures that at most one worker holds the lease for a given (stream, role) pair at any time. Used by both the publisher and the archiver.

| Column | Purpose |
| --- | --- |
| stream_name | Stream being owned (composite PK) |
| role | Worker role: publisher or archiver (composite PK) |
| owner_id | Worker holding the lease |
| lease_until | Lease expiry time; enables automatic recovery when a worker stops renewing |
| checkpoint | Role-specific progress (e.g. archiver XREAD position) |
| updated_at | Last lease renewal time |

Index: idx_stream_lease_until on (lease_until) for expired lease discovery.
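
The lease rule can be modeled in a few lines. The sketch below is an in-memory stand-in for events.stream_lease, written under the assumption that a lease may be taken when no row exists, when the existing lease has expired, or when the caller already owns it (a renewal). LeaseTable and try_acquire are hypothetical names. A real implementation would need to perform the check and update atomically in PostgreSQL, which this model does not attempt.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class Lease:
    owner_id: str
    lease_until: datetime


class LeaseTable:
    """In-memory model of stream_lease, keyed by (stream_name, role)."""

    def __init__(self):
        self._rows = {}

    def try_acquire(self, stream_name, role, owner_id, now, ttl):
        """Return True if owner_id holds the lease after this call."""
        key = (stream_name, role)
        row = self._rows.get(key)
        free = row is None or row.lease_until <= now   # missing or expired
        renewal = row is not None and row.owner_id == owner_id
        if free or renewal:
            self._rows[key] = Lease(owner_id, now + ttl)
            return True
        return False
```

Because the key includes the role, a publisher and an archiver can each hold a lease on the same stream without conflicting.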

Owner ID Format

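The owner_id field identifies the process holding the lease. Worker format: {role}-{stream_name}-{hostname}-{pid}-{uuid8}. Supervisors are not tied to a single stream, so their IDs omit the stream name: supervisor-{hostname}-{pid}-{uuid8}.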

| Role | Format | Example |
| --- | --- | --- |
| Publisher worker | publisher-{stream}-{host}-{pid}-{uuid8} | publisher-stream:user:events-web01-12345-a1b2c3d4 |
| Publisher supervisor | supervisor-{host}-{pid}-{uuid8} | supervisor-web01-12340-f9e8d7c6 |
| Archiver worker | archiver-{stream}-{host}-{pid}-{uuid8} | archiver-stream:user:events-web01-12346-b5a4c3d2 |
| Archiver supervisor | supervisor-{host}-{pid}-{uuid8} | supervisor-web01-12340-e1f2a3b4 |

Components: hostname (short form, the part before the first "."), PID (OS process ID), and 8 random hex characters (for uniqueness across restarts). The owner_id is treated as an opaque string: it is never parsed, only compared for equality.
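
One way to build an ID in this format is sketched below. The make_owner_id helper is hypothetical, and taking the random suffix from the first 8 hex characters of a UUID4 is an assumption based on the {uuid8} placeholder.

```python
import os
import socket
import uuid


def make_owner_id(role, stream_name=None):
    """Build an owner_id such as publisher-{stream}-{host}-{pid}-{uuid8}.

    Pass stream_name=None for supervisors, whose IDs carry no stream name.
    """
    host = socket.gethostname().split(".")[0]   # short hostname
    suffix = uuid.uuid4().hex[:8]               # 8 random hex characters
    parts = [role]
    if stream_name is not None:
        parts.append(stream_name)
    parts += [host, str(os.getpid()), suffix]
    return "-".join(parts)
```

Since stream names and hostnames may themselves contain "-" or ":", the result cannot be split back into its parts reliably, which is consistent with treating owner_id as opaque.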

Lease Coordination

sequenceDiagram
    participant W1 as Worker 1
    participant W2 as Worker 2
    participant DB as PostgreSQL
    participant Redis

    W1->>DB: Acquire lease (stream:user)
    DB-->>W1: Granted
    W2->>DB: Acquire lease (stream:user)
    DB-->>W2: Denied (already held)
    W1->>DB: Claim events WHERE published_at IS NULL
    W1->>Redis: XADD events
    W1->>DB: SET published_at = now()
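
The last three steps of the sequence (claim, XADD, mark published) can be sketched as a single publish pass. The code uses injected callables and an in-memory outbox so it runs without PostgreSQL or Redis. publish_cycle, InMemoryOutbox, and the field names passed to xadd are hypothetical, and mark_published stands in for mark_events_published().

```python
class InMemoryOutbox:
    """Toy outbox used only to exercise publish_cycle."""

    def __init__(self, events):
        self.events = sorted(events, key=lambda e: e["id"])
        self.published_ids = set()
        self.fail_next_mark = False  # simulate a crash between XADD and marking

    def fetch_unpublished(self, limit):
        pending = [e for e in self.events if e["id"] not in self.published_ids]
        return pending[:limit]

    def mark_published(self, ids):
        if self.fail_next_mark:
            self.fail_next_mark = False
            raise RuntimeError("simulated crash before marking")
        self.published_ids.update(ids)


def publish_cycle(fetch_unpublished, xadd, mark_published, batch_size=100):
    """Run one pass: read unpublished events in id order, publish, then mark.

    Returns the IDs that were published and marked in this pass.
    """
    done = []
    for event in fetch_unpublished(batch_size):
        xadd(
            event["stream_name"],
            {"event_id": event["id"], "event_type": event["event_type"]},
        )
        done.append(event["id"])
    if done:
        mark_published(done)
    return done
```

If the worker fails after XADD but before marking, the same events are published again on the next pass. That is the at-least-once behavior described in the overview, so consumers should tolerate duplicates.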

events.stream_policy

Per-stream security configuration for access control and encryption.

| Column | Purpose |
| --- | --- |
| stream_name | Stream being configured |
| security_level | open / restricted / confidential |
| encryption_alg | Encryption algorithm (confidential streams only) |
| kms_key_id | KMS key for envelope encryption |
| dek_rotation_days | DEK rotation interval |
| active_key_version | Current key version |
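
As an illustration of how a policy row might be checked before use, the sketch below models the table as a dataclass. The StreamPolicy class and its validate method are hypothetical. The rule that confidential streams require encryption_alg is an assumption inferred from the "confidential streams only" note, not a documented constraint.

```python
from dataclasses import dataclass
from typing import List, Optional

SECURITY_LEVELS = ("open", "restricted", "confidential")


@dataclass(frozen=True)
class StreamPolicy:
    stream_name: str
    security_level: str
    encryption_alg: Optional[str] = None
    kms_key_id: Optional[str] = None
    dek_rotation_days: Optional[int] = None
    active_key_version: Optional[int] = None

    def validate(self) -> List[str]:
        """Return a list of problems; an empty list means the policy looks sane."""
        errors = []
        if self.security_level not in SECURITY_LEVELS:
            errors.append(f"unknown security_level: {self.security_level!r}")
        if self.security_level == "confidential" and not self.encryption_alg:
            # Assumption: confidential streams must name an algorithm.
            errors.append("confidential streams need encryption_alg")
        if self.security_level != "confidential" and self.encryption_alg:
            errors.append("encryption_alg applies to confidential streams only")
        return errors
```

For example, `StreamPolicy("stream:user:events", "open").validate()` returns an empty list, while a confidential policy without an algorithm reports one problem. Any algorithm name used with this sketch is a placeholder, since the supported values are not listed here.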