Skip to content

Event bus

shared.event_bus.delayed_event_bus

DelayedEventBus

DelayedEventBus(actual_event_bus)

Bases: EventBus[T]

A special EventBus that receives events, but does not execute them when 'apply' is called.

Instead, every time 'apply' is called on a DelayedEventBus, the underlying event bus is collected, and we add the would-be executed Callables to a separate event bus.

Once everything is ran, you can then use side_effects_bus.apply(commit=...) to actually run all of the collected side effects.

Usage:

event_bus = DelayedEventBus(MyEventBus())
process_one(event_bus)
process_two(event_bus)
# At this point the events have not actually been applied
# To apply them all at once:
event_bus.apply_without_delay(commit=...)

Make sure you use apply_without_delay(), not apply()!

Source code in shared/event_bus/delayed_event_bus.py
def __init__(self, actual_event_bus: AbstractEventBus[T]) -> None:
    self._immediate_event_bus = actual_event_bus
    self._collected_events_event_bus = _CollectedEventEventBus()

apply

apply(commit)
Source code in shared/event_bus/delayed_event_bus.py
@deprecated(  # (not actually deprecated, this is just to show a warning)
    "Calling 'apply' directly on a DelayedEventBus will not actually execute the side-effects. Did you mean to use .apply_without_delay(...) instead?",
    category=None,  # to disable printing a warning at run-time
)
def apply(self, commit: bool) -> list[T]:
    events: list[T] = []
    for collected_event, collected_handler in self._immediate_event_bus.collect(
        commit
    ):
        events.append(collected_event)
        self._collected_events_event_bus.publish(
            (collected_event, collected_handler)
        )
    return events

apply_without_delay

apply_without_delay(commit)

Actually apply the events that were collected when running apply

Source code in shared/event_bus/delayed_event_bus.py
def apply_without_delay(self, commit: bool) -> list[T]:
    """
    Actually apply the events that were collected when running `apply`
    """
    self.apply(commit=commit)  # Make sure any pending event is collected

    applied_events = self._collected_events_event_bus.apply(commit=commit)

    return [applied_event[0] for applied_event in applied_events]

publish

publish(event)
Source code in shared/event_bus/delayed_event_bus.py
def publish(self, event: T) -> None:
    self._immediate_event_bus.publish(event)

T module-attribute

T = TypeVar('T')

shared.event_bus.event_bus

AbstractEventBus

AbstractEventBus(emit_success_log=True)

Bases: EventBus[T], Generic[T]

Base abstract class for "regular" event bus implementations. Handles event storage and most of the logic.

To implement an EventBus, simply subclass this abstract class and implement the _apply_event method.

Source code in shared/event_bus/event_bus.py
def __init__(self, emit_success_log: bool = True) -> None:
    self._events = []
    self._emit_success_log = emit_success_log

apply

apply(commit)
Source code in shared/event_bus/event_bus.py
@override
@final
def apply(self, commit: bool) -> list[T]:
    applied_events: list[T] = []
    for collected_event, handler in self.collect(commit):
        if commit:
            handler(commit=commit)
            applied_events.append(collected_event)
        else:
            try:
                with no_commit_in_session(
                    commit_at_end=False, rollback_at_end=False
                ):
                    handler(commit=commit)
                applied_events.append(collected_event)
            except CommitInNoCommitBlockError as e:
                _logger.warning(
                    "[Event Bus] Commit detected while running in dry run",
                    exc_info=e,
                )
            except RollbackInNoCommitBlockError as e:
                _logger.warning(
                    "[Event Bus] Rollback detected while running in dry run",
                    exc_info=e,
                )
    return applied_events

collect

collect(commit)

⚠️ Advanced: Prefer using apply over this method. This method should only be used when you need to collect events to execute them later on.

Collect all of the events in this event bus (via _collect_event) into a list of events mapped to the callable to be executed at a later date.

Source code in shared/event_bus/event_bus.py
@final
def collect(self, commit: bool) -> list[tuple[T, CommittableFunction]]:
    """
    **⚠️ Advanced:** Prefer using `apply` over this method. This method should only be used when you need to
    collect events to execute them later on.

    Collect all of the events in this event bus (via `_collect_event`) into a list of events mapped to the callable
    to be executed at a later date.
    """
    collected_events: list[tuple[T, CommittableFunction]] = []
    while self._events:
        event = self._events.pop(0)
        collected_events.append((event, self._collect_event(event, commit)))
    return collected_events

pending_events property

pending_events

publish

publish(event)
Source code in shared/event_bus/event_bus.py
@override
def publish(self, event: T) -> None:
    self._events.append(event)

BasicEventBus

BasicEventBus(emit_success_log=True)

Bases: AbstractEventBus[CommittableFunction]

A very simple EventBus where each event is a CommitableFunction that is called on apply.

Source code in shared/event_bus/event_bus.py
def __init__(self, emit_success_log: bool = True) -> None:
    self._events = []
    self._emit_success_log = emit_success_log

CommittableFunction

Bases: Protocol

A function that only takes a 'commit' boolean as a keyword-only argument.

The reason for this protocol's existence is to safely enable usage of partial functions: - This will break at runtime and with mypy:

```
def my_event_handler(event: Event, commit: bool):
    ...
func: Callable[[bool], None] = partial(my_event_handler, event=...)
func(True)  # will fail AT RUNTIME (saying the event param was defined multiple times)
func(commit=True)  # mypy error because commit isn't defined on Callable
```
  • However this will work just fine:

    def my_event_handler(event: Event, commit: bool):
        ...
    func: CommittableFunction = partial(my_event_handler, event=...)
    func(True) # mypy error (because of the '*', no positional arguments are allowed)
    func(commit=True) # Works!
    

__call__

__call__(*, commit)
Source code in shared/event_bus/event_bus.py
def __call__(self, *, commit: bool) -> None: ...

EventBus

Bases: ABC, Generic[T]

A class that allows you to progressively collect events over a period of time (via publish), then perform some action on them all at once (via apply).

Usage diagram:

apply abstractmethod

apply(commit)

Apply all of the events published so far, and clear this event bus. The list of events that were applied is returned.

NB: Some implementations will not immediately actually apply the events (e.g. DelayedEventBus). Do not make assumptions on this.

Source code in shared/event_bus/event_bus.py
@abstractmethod
def apply(self, commit: bool) -> list[T]:
    """
    Apply all of the events published so far, and clear this event bus.
    The list of events that were applied is returned.

    NB: Some implementations will not immediately *actually* apply the events (e.g. `DelayedEventBus`). Do not make
    assumptions on this.
    """
    ...

publish abstractmethod

publish(event)

Publish an event for later handling in this EventBus.

Source code in shared/event_bus/event_bus.py
@abstractmethod
def publish(self, event: T) -> None:
    """
    Publish an event for later handling in this EventBus.
    """
    ...

publish_all

publish_all(events)

Publish a list of events for later handling in this EventBus.

Source code in shared/event_bus/event_bus.py
def publish_all(self, events: list[T]) -> None:
    """
    Publish a list of events for later handling in this EventBus.
    """
    for event in events:
        self.publish(event)

T module-attribute

T = TypeVar('T')

shared.event_bus.multi_event_bus

EventBusOrchestrator

Bases: ABC

A class that coordinates several event buses.

This class allows you to create new event buses that will be registered under this orchestrator. For example:

def foobar(event_bus_orchestrator: EventBusOrchestrator):
    # Create and register your own event bus...
    my_event_bus = event_bus_orchestrator.wrap(MyEventBus())
    # Publish stuff to it...
    my_event_bus.publish(...)
    # And whoever called you will handle the rest!

This provides a write-only API to register event buses. The idea is that, in an API, when you want to have consumers that register their own EventBuses without the ability to apply() a whole MultiEventBus, you provide them with an EventBusOrchestrator instead. For example, instead of:

def some_handler(event_bus: MultiEventBus):
    # Potentially dangerous, you don't want the handler to call .apply() the whole event_bus!
    ...
# VS
def some_handler(event_bus: EventBusOrchestrator):
    # You can only register event buses and nothing else, all good!
    event_bus.wrap(...)

wrap abstractmethod

wrap(event_bus)

Returns a DelayedEventBus that wraps the provided event_bus, registering it in this orchestrator.

Usage:

multi_event_bus = MultiEventBus()
i_want_foo_eventbus(event_bus=multi_event_bus.wrap(FooEventBus(...)))
i_want_bar_eventbus(event_bus=multi_event_bus.wrap(BarEventBus(...)))
Source code in shared/event_bus/multi_event_bus.py
@abstractmethod
def wrap(self, event_bus: AbstractEventBus[T]) -> DelayedEventBus[T]:
    """
    Returns a DelayedEventBus that wraps the provided event_bus, registering it in this orchestrator.

    Usage:

    ```
    multi_event_bus = MultiEventBus()
    i_want_foo_eventbus(event_bus=multi_event_bus.wrap(FooEventBus(...)))
    i_want_bar_eventbus(event_bus=multi_event_bus.wrap(BarEventBus(...)))
    ```
    """
    ...

MultiEventBus

MultiEventBus()

Bases: AbstractEventBus[DelayedEventBus[Any]], EventBusOrchestrator

An EventBus that coordinates other event-bus.

To add an event-bus, use the wrap function, which will return a DelayedEventBus you can publish events to.

Internally, a MultiEventBus is an EventBus whose events are DelayedEventBuses. You should consider using wrap instead of publish for simplicity.

Source code in shared/event_bus/multi_event_bus.py
def __init__(self) -> None:
    super().__init__(emit_success_log=False)
    self._applied_events: list[Any] = []

apply_and_get_applied_events

apply_and_get_applied_events(commit)
Source code in shared/event_bus/multi_event_bus.py
def apply_and_get_applied_events(self, commit: bool) -> list[Any]:
    self._applied_events = []
    super().apply(commit)
    return self._applied_events

wrap

wrap(event_bus)

Returns a DelayedEventBus that wraps the provided event_bus, registering it in this MultiEventBus.

Usage:

multi_event_bus = MultiEventBus()
i_want_foo_eventbus(event_bus=multi_event_bus.wrap(FooEventBus(...)))
i_want_bar_eventbus(event_bus=multi_event_bus.wrap(BarEventBus(...)))
Source code in shared/event_bus/multi_event_bus.py
def wrap(self, event_bus: AbstractEventBus[T]) -> DelayedEventBus[T]:
    """
    Returns a DelayedEventBus that wraps the provided event_bus, registering it in this MultiEventBus.

    Usage:

    ```
    multi_event_bus = MultiEventBus()
    i_want_foo_eventbus(event_bus=multi_event_bus.wrap(FooEventBus(...)))
    i_want_bar_eventbus(event_bus=multi_event_bus.wrap(BarEventBus(...)))
    ```
    """
    delayed_event_bus = DelayedEventBus(event_bus)
    self.publish(delayed_event_bus)
    return delayed_event_bus

T module-attribute

T = TypeVar('T')

use_orchestrator_or_create_multi_event_bus

use_orchestrator_or_create_multi_event_bus(
    orchestrator, commit
)

A context manager that implements the pattern where:

  • If you have an orchestrator, use it and don't apply side-effects immediately (your caller is responsible for managing its event bus)
  • If you don't have an orchestrator, create a MultiEventBus, use it and apply side-effects immediately (you are responsible for managing the event bus)

This context manager transparently manages a MultiEventBus and applies side-effects as needed.

Usage example:

def my_complicated_process(
    ...,
    # If None -> caller does not want to handle the event bus, we should manage it instead
    # If not None -> caller wants to handle the event bus, we should just publish to the one they provide
    event_bus_orchestrator: EventBusOrchestrator | None = None,
    commit: bool = False,
):
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator,
        commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        event_bus.wrap(...)
        event_bus.wrap(...)
        # ...
        commit_and_execute_side_effects()
Source code in shared/event_bus/multi_event_bus.py
@contextmanager
def use_orchestrator_or_create_multi_event_bus(
    orchestrator: EventBusOrchestrator | None,
    commit: bool,
) -> Generator[tuple[EventBusOrchestrator, Callable[[], None]], None, None]:
    """
    A context manager that implements the pattern where:

    - If you have an orchestrator, use it and don't apply side-effects immediately (your caller is responsible for
      managing its event bus)
    - If you don't have an orchestrator, create a MultiEventBus, use it and apply side-effects immediately (you are
      responsible for managing the event bus)

    This context manager transparently manages a MultiEventBus and applies side-effects as needed.



    Usage example:

    ```
    def my_complicated_process(
        ...,
        # If None -> caller does not want to handle the event bus, we should manage it instead
        # If not None -> caller wants to handle the event bus, we should just publish to the one they provide
        event_bus_orchestrator: EventBusOrchestrator | None = None,
        commit: bool = False,
    ):
        with use_orchestrator_or_create_multi_event_bus(
            event_bus_orchestrator,
            commit=commit
        ) as (event_bus, commit_and_execute_side_effects):
            event_bus.wrap(...)
            event_bus.wrap(...)
            # ...
            commit_and_execute_side_effects()

    ```

    """
    if orchestrator is None:
        event_bus = MultiEventBus()
        orchestrator = event_bus
    else:
        event_bus = None

    def commit_and_execute_side_effects() -> None:
        if commit:
            current_session.commit()
        else:
            current_session.flush()

        if event_bus is not None:
            event_bus.apply(commit=commit)

    yield orchestrator, commit_and_execute_side_effects

shared.event_bus.shared_side_effects

SharedSideEffectsEventBus

SharedSideEffectsEventBus()

Bases: AbstractEventBus[SideEffectCall]

An EventBus that wraps SideEffectCall objects from shared.side_effects.

Warning: the SharedSideEffectsEventBus will maintain a single SharedSideEffectContext for the whole event bus.

Source code in shared/event_bus/shared_side_effects.py
def __init__(self) -> None:
    super().__init__()
    self._context = SharedSideEffectContext()