
Event Schemas

Events are typed Pydantic classes (EventSchema subclasses) shared between producers and consumers. Classes live in stdlib so any component can import them. Stream names are a separate enum, decoupled from schemas — the same schema class can flow on any stream a producer chooses.

Where things live

stdlib/python/src/events_pipeline_common/
├── schemas/
│   ├── <domain>.py             # user schemas (one file per domain)
│   └── utils/                  # framework plumbing
│       ├── base.py             # EventSchema base class
│       ├── registry.py         # @events_pipeline_schema decorator
│       ├── check_compat.py     # CI backward-compat check
│       └── not_set.py          # NOT_SET sentinel
└── stream_names.py             # StreamName(StrEnum) — stream identifiers

Defining a schema

from events_pipeline_common.schemas.utils.base import EventSchema
from events_pipeline_common.schemas.utils.registry import events_pipeline_schema


@events_pipeline_schema
class MyEvent(EventSchema):
    entity_id: str
    status: str

  • EventSchema has extra="allow" (unknown fields are accepted — additive changes are non-breaking) and frozen=True (instances are immutable).
  • @events_pipeline_schema takes no arguments. Class names are globally unique across the whole events_pipeline_common package; duplicate names raise ValueError at import time.
  • Must live in events_pipeline_common/schemas/*.py (enforced at runtime by the decorator).

Stream names

All stream identifiers are members of the StreamName enum:

# stdlib/python/src/events_pipeline_common/stream_names.py
class StreamName(StrEnum):
    DEMO_EVENTS = "stream:demo:events"
    EMPLOYMENT_CHANGES = "stream:employment:employment_changes"
    # ...

Producers and consumers reference streams via enum members so typos are caught by mypy / the IDE:

from events_pipeline_common.stream_names import StreamName
from events_pipeline_common.schemas.employment_changes import EmploymentChanged

send_event(stream_name=StreamName.EMPLOYMENT_CHANGES, event=EmploymentChanged(...))

Adding a new stream = adding a member to the enum. Streams are not registered anywhere else; they are plain Redis keys.

Backward-compatibility rules

CI runs check_compat.py against origin/main on every PR that touches events_pipeline_common/schemas/. It rejects:

  1. Removed fields — a field present on main that no longer exists.
  2. New required fields — an added field without a default value.
  3. New optional fields that don't use the NOT_SET sentinel (see below).
  4. Type changes on existing fields — except top-level union widening (str → str | int is fine; str | int → str or str → int are not).
  5. Existing optional fields becoming required — if a field on main had a default, the current version must still have one.
  6. Non-primitive types — annotations may only reference primitives (str, int, float, bool, bytes), stdlib value types (UUID, datetime, date, time, timedelta, Decimal), built-in generic containers (list, dict, tuple, set, frozenset), typing special forms (Optional, Union, Literal, Any), the NotSet sentinel, or classes defined in the same schemas module. Anything else — e.g. a domain-layer Address imported from backend/components/ — is flagged.

Renames trip rule #1 (the old name disappears). If you want to rename, keep the old field around with Field(deprecated="reason") until all consumers migrate. For rules #4 and #5, the recommended fix is to add a new event type on the same stream (see "Breaking shape changes" below) rather than mutate the existing schema.

Annotation equality is evaluated after canonicalisation — Optional[X], X | None, and Union[X, None] are all treated as the same thing; union members are sorted; forward-ref quotes ("MyClass") are stripped.

New optional fields: the NOT_SET convention

A new optional field must:

  • Include NotSet in its type annotation.
  • Default to NOT_SET.

from events_pipeline_common.schemas.utils.not_set import NOT_SET, NotSet


@events_pipeline_schema
class MyEvent(EventSchema):
    existing_field: str
    new_field: str | NotSet = NOT_SET   # <- new field must look like this

Why: consumers need to distinguish "the producer didn't set this field" (e.g. an older producer that predates the new field) from "the producer explicitly set this field to an empty-but-valid value" (e.g. empty string, zero, empty list). A bare default like "", 0, [], or even None collides with real values the producer might legitimately send. NOT_SET is a dedicated sentinel that never occurs as a real value.

Consumer-side check:

if event.new_field is NOT_SET:
    # Producer did not set this field.
    ...
else:
    # `event.new_field` holds the producer-supplied value.
    ...

NotSet is a single-member enum, so identity comparison with is works, and Pydantic handles serialization natively (wire value: the string "__NOT_SET__").
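A plausible shape for the sentinel (assumed — see not_set.py for the real definition):

```python
from enum import Enum


class NotSet(Enum):
    """Single-member enum: supports `is` comparison, serialises as a string."""
    NOT_SET = "__NOT_SET__"


NOT_SET = NotSet.NOT_SET
```

Because NotSet is a plain Enum (not a str subclass), the sentinel compares unequal to its own wire string, so it can never collide with a producer-supplied value.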

Deprecating a field

Don't remove. Mark it deprecated:

from pydantic import Field


@events_pipeline_schema
class MyEvent(EventSchema):
    old_field: str | None = Field(default=None, deprecated="Use new_field instead")
    new_field: str | NotSet = NOT_SET

Pydantic emits a DeprecationWarning on access. The field stays in the schema (removing it would fail the compat check) until every consumer has dropped its reads.

Breaking shape changes: add a new event type on the same stream

Some changes can't be made additive: changing a field's type, renaming, upgrading an optional field to required, adding an enum value that in-flight consumers can't match. For those:

  1. Add a new event type (new EventSchema class) on the same stream. The old type keeps its schema, marked Field(deprecated=...) where relevant.
  2. Update every consumer of the stream to handle both the old and new types (typically via match).
  3. Switch producers to emit the new type.
  4. Once the stream has drained (no more events of the old type exist or may be replayed), remove the old schema and the old consumer branches.

This preserves the ordered, single-stream history that consumers rely on — no new stream, no parallel consumer pipeline. The old events stay consumable by historical-replay tooling until you decide to drop the schema.

New enum values are breaking — multi-PR migration

Adding a new member to an existing StrEnum used on the wire is a breaking change from the consumer's perspective (a consumer running old code may not have a match case for the new value). Treat it as a two-step rollout:

  1. PR 1: update all consumers to handle the new enum value (e.g. add a match case, even if only to log). Deploy and verify rollout in every environment.
  2. PR 2: start emitting the new value from the producer.

At no point is a live consumer seeing a value it can't match.

Blocking on unknown event types

If a consumer is registered without an event_types=[...] filter, the worker looks up every incoming event_type in the global schema registry. Unknown types cause the worker to raise UnknownEventTypeError and block on that message — no silent data loss.

The error is logged once per (stream_name, event_type) per worker process (at WARNING level); subsequent occurrences only bump the Datadog counter events_pipeline.consumer.unknown_event_type (tagged with stream_name, event_type, consumer). That counter and consumer-lag metrics are the operational signals, so logs don't spam.
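The once-per-key logging can be sketched as follows (an assumed mechanism with a hypothetical function name, not the worker's actual code; the metric emission is elided):

```python
import logging

logger = logging.getLogger(__name__)
_warned: set[tuple[str, str]] = set()   # (stream_name, event_type) already logged


def warn_unknown_event_type(stream_name: str, event_type: str) -> bool:
    """Log only the first occurrence per key; return True if a log was emitted."""
    key = (stream_name, event_type)
    if key in _warned:
        return False                    # later hits only increment the metric
    _warned.add(key)
    logger.warning("unknown event_type %r on stream %r", event_type, stream_name)
    return True
```

The set lives in process memory, which is why the dedup scope is per worker process: a restarted worker logs each unknown type once more.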

To unblock: deploy a schema for the unknown event type. Once registered, the next poll picks it up and processing resumes.