Event bus
shared.event_bus.delayed_event_bus ¶
DelayedEventBus ¶
A special EventBus that receives events, but does not execute them when 'apply' is called.
Instead, every time 'apply' is called on a DelayedEventBus, the underlying event bus is collected, and we add the would-be executed Callables to a separate event bus.
Once everything is ran, you can then use side_effects_bus.apply(commit=...) to actually run all of the collected
side effects.
Usage:
event_bus = DelayedEventBus(MyEventBus())
process_one(event_bus)
process_two(event_bus)
# At this point the events have not actually been applied
# To apply them all at once:
event_bus.apply_without_delay(commit=...)
Make sure you use apply_without_delay(), not apply()!
Source code in shared/event_bus/delayed_event_bus.py
apply ¶
Source code in shared/event_bus/delayed_event_bus.py
apply_without_delay ¶
Actually apply the events that were collected when running apply
Source code in shared/event_bus/delayed_event_bus.py
shared.event_bus.event_bus ¶
AbstractEventBus ¶
Bases: EventBus[T], Generic[T]
Base abstract class for "regular" event bus implementations. Handles event storage and most of the logic.
To implement an EventBus, simply subclass this abstract class and implement the _apply_event method.
Source code in shared/event_bus/event_bus.py
apply ¶
Source code in shared/event_bus/event_bus.py
collect ¶
⚠️ Advanced: Prefer using apply over this method. This method should only be used when you need to
collect events to execute them later on.
Collect all of the events in this event bus (via _collect_event) into a list of events mapped to the callable
to be executed at a later date.
Source code in shared/event_bus/event_bus.py
BasicEventBus ¶
Bases: AbstractEventBus[CommittableFunction]
A very simple EventBus where each event is a CommitableFunction that is called on apply.
Source code in shared/event_bus/event_bus.py
CommittableFunction ¶
Bases: Protocol
A function that only takes a 'commit' boolean as a keyword-only argument.
The reason for this protocol's existence is to safely enable usage of partial functions: - This will break at runtime and with mypy:
```
def my_event_handler(event: Event, commit: bool):
...
func: Callable[[bool], None] = partial(my_event_handler, event=...)
func(True) # will fail AT RUNTIME (saying the event param was defined multiple times)
func(commit=True) # mypy error because commit isn't defined on Callable
```
-
However this will work just fine:
EventBus ¶
Bases: ABC, Generic[T]
A class that allows you to progressively collect events over a period of time (via publish), then perform some
action on them all at once (via apply).
Usage diagram:
apply
abstractmethod
¶
Apply all of the events published so far, and clear this event bus. The list of events that were applied is returned.
NB: Some implementations will not immediately actually apply the events (e.g. DelayedEventBus). Do not make
assumptions on this.
Source code in shared/event_bus/event_bus.py
publish
abstractmethod
¶
publish_all ¶
shared.event_bus.multi_event_bus ¶
EventBusOrchestrator ¶
Bases: ABC
A class that coordinates several event buses.
This class allows you to create new event buses that will be registered under this orchestrator. For example:
def foobar(event_bus_orchestrator: EventBusOrchestrator):
# Create and register your own event bus...
my_event_bus = event_bus_orchestrator.wrap(MyEventBus())
# Publish stuff to it...
my_event_bus.publish(...)
# And whoever called you will handle the rest!
This provides a write-only API to register event buses. The idea is that, in an API, when you want to have consumers that register their own EventBuses without the ability to apply() a whole MultiEventBus, you provide them with an EventBusOrchestrator instead. For example, instead of:
def some_handler(event_bus: MultiEventBus):
# Potentially dangerous, you don't want the handler to call .apply() the whole event_bus!
...
# VS
def some_handler(event_bus: EventBusOrchestrator):
# You can only register event buses and nothing else, all good!
event_bus.wrap(...)
wrap
abstractmethod
¶
Returns a DelayedEventBus that wraps the provided event_bus, registering it in this orchestrator.
Usage:
multi_event_bus = MultiEventBus()
i_want_foo_eventbus(event_bus=multi_event_bus.wrap(FooEventBus(...)))
i_want_bar_eventbus(event_bus=multi_event_bus.wrap(BarEventBus(...)))
Source code in shared/event_bus/multi_event_bus.py
MultiEventBus ¶
Bases: AbstractEventBus[DelayedEventBus[Any]], EventBusOrchestrator
An EventBus that coordinates other event-bus.
To add an event-bus, use the wrap function, which will return a DelayedEventBus you can publish events to.
Internally, a MultiEventBus is an EventBus whose events are DelayedEventBuses. You should consider using wrap
instead of publish for simplicity.
Source code in shared/event_bus/multi_event_bus.py
apply_and_get_applied_events ¶
wrap ¶
Returns a DelayedEventBus that wraps the provided event_bus, registering it in this MultiEventBus.
Usage:
multi_event_bus = MultiEventBus()
i_want_foo_eventbus(event_bus=multi_event_bus.wrap(FooEventBus(...)))
i_want_bar_eventbus(event_bus=multi_event_bus.wrap(BarEventBus(...)))
Source code in shared/event_bus/multi_event_bus.py
use_orchestrator_or_create_multi_event_bus ¶
A context manager that implements the pattern where:
- If you have an orchestrator, use it and don't apply side-effects immediately (your caller is responsible for managing its event bus)
- If you don't have an orchestrator, create a MultiEventBus, use it and apply side-effects immediately (you are responsible for managing the event bus)
This context manager transparently manages a MultiEventBus and applies side-effects as needed.
Usage example:
def my_complicated_process(
...,
# If None -> caller does not want to handle the event bus, we should manage it instead
# If not None -> caller wants to handle the event bus, we should just publish to the one they provide
event_bus_orchestrator: EventBusOrchestrator | None = None,
commit: bool = False,
):
with use_orchestrator_or_create_multi_event_bus(
event_bus_orchestrator,
commit=commit
) as (event_bus, commit_and_execute_side_effects):
event_bus.wrap(...)
event_bus.wrap(...)
# ...
commit_and_execute_side_effects()
Source code in shared/event_bus/multi_event_bus.py
shared.event_bus.shared_side_effects ¶
SharedSideEffectsEventBus ¶
Bases: AbstractEventBus[SideEffectCall]
An EventBus that wraps SideEffectCall objects from shared.side_effects.
Warning: the SharedSideEffectsEventBus will maintain a single SharedSideEffectContext for the whole event bus.