Event Schemas¶
Events are typed Pydantic classes (EventSchema subclasses) shared between producers and consumers. Classes live in stdlib so any component can import them. Stream names are a separate enum, decoupled from schemas — the same schema class can flow on any stream a producer chooses.
Where things live¶
stdlib/python/src/events_pipeline_common/
├── schemas/
│   ├── <domain>.py          # user schemas (one file per domain)
│   └── utils/               # framework plumbing
│       ├── base.py          # EventSchema base class
│       ├── registry.py      # @events_pipeline_schema decorator
│       ├── check_compat.py  # CI backward-compat check
│       └── not_set.py       # NOT_SET sentinel
└── stream_names.py # StreamName(StrEnum) — stream identifiers
Defining a schema¶
from events_pipeline_common.schemas.utils.base import EventSchema
from events_pipeline_common.schemas.utils.registry import events_pipeline_schema
@events_pipeline_schema
class MyEvent(EventSchema):
    entity_id: str
    status: str
- EventSchema has extra="allow" (unknown fields are accepted — additive changes are non-breaking) and frozen=True (instances are immutable).
- @events_pipeline_schema takes no arguments.
- Class names are globally unique across the whole events_pipeline_common package; duplicate names raise ValueError at import time.
- Must live in events_pipeline_common/schemas/*.py (enforced at runtime by the decorator).
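The two config choices on the base class can be seen with a minimal stand-in — plain Pydantic here, assuming the real EventSchema in schemas/utils/base.py is configured as described above:

```python
from pydantic import BaseModel, ConfigDict, ValidationError

# Minimal stand-in for EventSchema (assumption: the real base class is
# configured with exactly these two options).
class EventSchema(BaseModel):
    model_config = ConfigDict(extra="allow", frozen=True)

class MyEvent(EventSchema):
    entity_id: str
    status: str

# extra="allow": unknown fields are accepted and preserved, so a consumer
# on an older schema version survives an additive producer change.
event = MyEvent.model_validate(
    {"entity_id": "e-1", "status": "active", "added_later": 42}
)
print(event.entity_id)    # "e-1"
print(event.added_later)  # 42 — the extra field is kept as an attribute

# frozen=True: attribute assignment raises a ValidationError.
try:
    event.status = "inactive"
except ValidationError as exc:
    print(type(exc).__name__)  # "ValidationError"
```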
Stream names¶
All stream identifiers are members of the StreamName enum:
# stdlib/python/src/events_pipeline_common/stream_names.py
from enum import StrEnum

class StreamName(StrEnum):
    DEMO_EVENTS = "stream:demo:events"
    EMPLOYMENT_CHANGES = "stream:employment:employment_changes"
    # ...
Producers and consumers reference streams via enum members so typos are caught by mypy / the IDE:
from events_pipeline_common.stream_names import StreamName
from events_pipeline_common.schemas.employment_changes import EmploymentChanged
send_event(stream_name=StreamName.EMPLOYMENT_CHANGES, event=EmploymentChanged(...))
Adding a new stream = adding a member to the enum. Streams are not registered anywhere else; they are plain Redis keys.
Backward-compatibility rules¶
CI runs check_compat.py against origin/main on every PR that touches events_pipeline_common/schemas/. It rejects:
- Removed fields — a field present on main that no longer exists.
- New required fields — an added field without a default value.
- New optional fields that don't use the NOT_SET sentinel (see below).
- Type changes on existing fields — except top-level union widening (str → str | int is fine; str | int → str or str → int are not).
- Existing optional fields becoming required — if a field on main had a default, the current version must still have one.
- Non-primitive types — annotations may only reference primitives (str, int, float, bool, bytes), stdlib value types (UUID, datetime, date, time, timedelta, Decimal), built-in generic containers (list, dict, tuple, set, frozenset), typing special forms (Optional, Union, Literal, Any), the NotSet sentinel, or classes defined in the same schemas module. Anything else — e.g. a domain-layer Address imported from backend/components/ — is flagged.
Renames trip rule #1 (the old name disappears). If you want to rename, keep the old field around with Field(deprecated="reason") until all consumers migrate. For rules #4 and #5, the recommended fix is to add a new event type on the same stream (see "Breaking shape changes" below) rather than mutate the existing schema.
Annotation equality is evaluated after canonicalisation — Optional[X], X | None, and Union[X, None] are all treated as the same thing; union members are sorted; forward-ref quotes ("MyClass") are stripped.
New optional fields: the NOT_SET convention¶
A new optional field must:
- Include NotSet in its type annotation.
- Default to NOT_SET.
from events_pipeline_common.schemas.utils.not_set import NOT_SET, NotSet
@events_pipeline_schema
class MyEvent(EventSchema):
    existing_field: str
    new_field: str | NotSet = NOT_SET  # <- new field must look like this
Why: consumers need to distinguish "the producer didn't set this field" (e.g. an older producer that predates the new field) from "the producer explicitly set this field to an empty-but-valid value" (e.g. empty string, zero, empty list). A bare default like "", 0, [], or even None collides with real values the producer might legitimately send. NOT_SET is a dedicated sentinel that never occurs as a real value.
Consumer-side check:
if event.new_field is NOT_SET:
    # Producer did not set this field.
    ...
else:
    # `event.new_field` holds the producer-supplied value.
    ...
NotSet is a single-member enum, so identity comparison with is works, and Pydantic handles serialization natively (wire value: the string "__NOT_SET__").
Deprecating a field¶
Don't remove. Mark it deprecated:
from pydantic import Field
@events_pipeline_schema
class MyEvent(EventSchema):
    old_field: str | None = Field(default=None, deprecated="Use new_field instead")
    new_field: str | NotSet = NOT_SET
Pydantic emits a DeprecationWarning on access. The field stays in the schema (removing it would fail the compat check) until every consumer has dropped its reads.
Breaking shape changes: add a new event type on the same stream¶
Some changes can't be made additive: changing a field's type, renaming, upgrading an optional field to required, adding an enum value that in-flight consumers can't match. For those:
- Add a new event type (new EventSchema class) on the same stream. The old type keeps its schema, marked Field(deprecated=...) where relevant.
- Update every consumer of the stream to handle both the old and new types (typically via match).
- Switch producers to emit the new type.
- Once the stream has drained (no more events of the old type exist or may be replayed), remove the old schema and the old consumer branches.
This preserves the ordered, single-stream history that consumers rely on — no new stream, no parallel consumer pipeline. The old events stay consumable by historical-replay tooling until you decide to drop the schema.
New enum values are breaking — multi-PR migration¶
Adding a new member to an existing StrEnum used on the wire is a breaking change from the consumer's perspective (a consumer running old code may not have a match case for the new value). Treat it as a two-step rollout:
- PR 1: update all consumers to handle the new enum value (e.g. add a match case, even if only to log). Deploy and verify rollout in every environment.
- PR 2: start emitting the new value from the producer.
At no point is a live consumer seeing a value it can't match.
Blocking on unknown event types¶
If a consumer is registered without an event_types=[...] filter, the worker looks up every incoming event_type in the global schema registry. Unknown types cause the worker to raise UnknownEventTypeError and block on that message — no silent data loss.
The error is logged once per (stream_name, event_type) per worker process (at WARNING level). Subsequent occurrences on the same poll cycle only bump the Datadog counter events_pipeline.consumer.unknown_event_type (tagged with stream_name, event_type, consumer). Consumer-lag metrics and that counter are the operational signals; logs don't spam.
To unblock: deploy a schema for the unknown event type. Once registered, the next poll picks it up and processing resumes.