Skip to content

Reference

shared.messaging.broker

AsyncMessageContext

AsyncMessageContext(func, predicate, queue_name)

Bases: SyncMessageContext[T]

Source code in shared/messaging/broker.py
def __init__(
    self,
    func: Callable[[T], None],
    predicate: Callable[[T], bool],
    queue_name: str,
) -> None:
    """
    Build the context of an asynchronous subscription.

    `func` and `predicate` are forwarded to the parent initializer;
    `queue_name` is stored on the instance.
    """
    self.queue_name = queue_name
    super().__init__(func=func, predicate=predicate)

queue_name instance-attribute

queue_name = queue_name

Broker

Bases: ABC

Base class for the brokers. The broker's responsibility is to receive messages and transmit them to the subscribers.

This class makes no assumption regarding serialization or transport layer.

publish abstractmethod

publish(message)
Source code in shared/messaging/broker.py
@abstractmethod
def publish(self, message: Message) -> None:
    """
    Publish a message through the broker.

    Abstract method: this base implementation only raises `NotImplementedError`.
    """
    raise NotImplementedError

subscribe abstractmethod

subscribe(message_cls, func, predicate=lambda _: True)

Subscribe to a message type with a handler function. The predicate is used to add a subscription filter depending on the message content.

Source code in shared/messaging/broker.py
@abstractmethod
def subscribe(
    self,
    message_cls: type[T],
    func: Callable[[T], None],
    predicate: Callable[[T], bool] = lambda _: True,
) -> None:
    """
    Subscribe to a message type with a handler function.
    The predicate is used to add a subscription filter depending on the message content.

    When no predicate is given, the default one accepts every message.
    Abstract method: this base implementation only raises `NotImplementedError`.
    """
    raise NotImplementedError

subscribe_async abstractmethod

subscribe_async(
    message_cls, func, queue_name, predicate=lambda _: True
)

Subscribe to a message type with a handler function and a queue name. The predicate is used to add a subscription filter depending on the message content.

Source code in shared/messaging/broker.py
@abstractmethod
def subscribe_async(
    self,
    message_cls: type[T],
    func: Callable[[T], None],
    queue_name: str,
    predicate: Callable[[T], bool] = lambda _: True,
) -> None:
    """
    Subscribe to a message type with a handler function and a queue name.
    The predicate is used to add a subscription filter depending on the message content.

    When no predicate is given, the default one accepts every message.
    Abstract method: this base implementation only raises `NotImplementedError`.
    """
    raise NotImplementedError

MemoryBroker

MemoryBroker()

Bases: Broker

A broker that stores the subscribers in memory and calls them synchronously or asynchronously.

It's the responsibility of the handlers to manage exceptions, the broker only guarantees delivery to all subscribers.

Source code in shared/messaging/broker.py
def __init__(self) -> None:
    """
    Start with no subscriber.

    Both registries are defaultdicts producing an empty list for a key that
    is not present yet.
    """
    self._async_handlers: dict[str, list[AsyncMessageContext]] = defaultdict(list)  # type: ignore[type-arg]
    self._sync_handlers: dict[str, list[SyncMessageContext]] = defaultdict(list)  # type: ignore[type-arg]

publish

publish(message)
Source code in shared/messaging/broker.py
@override
def publish(self, message: Message) -> None:
    """
    Deliver `message` to every handler subscribed to its class.

    Asynchronous subscriptions are processed first: the handler is enqueued on
    the subscription's queue. Synchronous handlers are then called in-process.
    A subscription whose predicate rejects the message is skipped (logged at
    debug level). An exception raised while handling one subscription is logged
    and does not prevent delivery to the remaining ones.
    """
    # NOTE(review): imported locally, presumably to avoid an import cycle - confirm
    from shared.queuing.flask_rq import current_rq

    message_key = get_unique_name_for_message(message.__class__)

    # Read with `.get` rather than indexing: the registries are defaultdicts, so
    # indexing would insert an empty list under this key in BOTH of them, making
    # the message type look like it is subscribed both ways.
    for async_context in self._async_handlers.get(message_key, ()):
        handler = async_context.func
        # Not every callable has a __name__ (e.g. functools.partial); never let
        # the error reporting itself raise.
        handler_name = getattr(handler, "__name__", repr(handler))
        queue_name = async_context.queue_name
        predicate = async_context.predicate
        try:
            if predicate(message):
                queue = current_rq.get_queue(queue_name)
                _ = queue.enqueue(handler, message)
            else:
                current_logger.debug(
                    f"Message {message} was filtered out by predicate, not enqueuing",
                )
        except Exception:
            # `Exception` rather than a bare except, so KeyboardInterrupt and
            # SystemExit still propagate.
            current_logger.exception(
                f"Exception occurred while enqueuing message {message} with handler {handler_name}",
            )

    for sync_context in self._sync_handlers.get(message_key, ()):
        handler = sync_context.func
        handler_name = getattr(handler, "__name__", repr(handler))
        predicate = sync_context.predicate
        try:
            if predicate(message):
                handler(message)
            else:
                current_logger.debug(
                    f"Message {message} was filtered out by predicate, not calling",
                )
        except Exception:
            current_logger.exception(
                f"Exception occurred while processing message {message} with handler {handler_name}",
            )

subscribe

subscribe(message_cls, func, predicate=lambda _: True)
Source code in shared/messaging/broker.py
@override
def subscribe(
    self,
    message_cls: type[T],
    func: Callable[[T], None],
    predicate: Callable[[T], bool] = lambda _: True,
) -> None:
    """
    Register `func` as a synchronous handler for `message_cls`.

    Raises `ValueError` if the same message type already has asynchronous
    handlers: a message type is either synchronous or asynchronous.
    """
    message_key = get_unique_name_for_message(message_cls)
    # Check for actual handlers, not key presence: the registry is a defaultdict,
    # so a mere lookup elsewhere can leave an empty list behind under this key.
    if self._async_handlers.get(message_key):
        raise ValueError(
            f"Cannot subscribe to the same message {message_key} both synchronously and asynchronously"
        )
    self._sync_handlers[message_key].append(SyncMessageContext(func, predicate))

subscribe_async

subscribe_async(
    message_cls, func, queue_name, predicate=lambda _: True
)
Source code in shared/messaging/broker.py
@override
def subscribe_async(
    self,
    message_cls: type[T],
    func: Callable[[T], None],
    queue_name: str,
    predicate: Callable[[T], bool] = lambda _: True,
) -> None:
    """
    Register `func` as an asynchronous handler for `message_cls` on `queue_name`.

    Raises `ValueError` if the same message type already has synchronous
    handlers: a message type is either synchronous or asynchronous.
    """
    message_key = get_unique_name_for_message(message_cls)
    # Check for actual handlers, not key presence: the registry is a defaultdict,
    # so a mere lookup elsewhere can leave an empty list behind under this key.
    if self._sync_handlers.get(message_key):
        raise ValueError(
            f"Cannot subscribe to the same message {message_key} both synchronously and asynchronously"
        )
    self._async_handlers[message_key].append(
        AsyncMessageContext(func, predicate, queue_name)
    )

NullBroker

NullBroker()

Bases: Broker

A broker that does nothing but store calls.

This is useful for testing purposes.

Source code in shared/messaging/broker.py
def __init__(self) -> None:
    """
    Start with three empty, independent call logs: published messages,
    synchronous subscriptions and asynchronous subscriptions.
    """
    self.async_subscribed: list[
        tuple[type[Message], AsyncMessageContext[Message]]
    ] = []
    self.subscribed: list[tuple[type[Message], SyncMessageContext[Message]]] = []
    self.published: list[Message] = []

async_subscribed instance-attribute

async_subscribed = []

publish

publish(message)
Source code in shared/messaging/broker.py
@override
def publish(self, message: Message) -> None:
    """
    Record the message in `self.published`; nothing else is done with it.
    """
    self.published.append(message)

published instance-attribute

published = []

subscribe

subscribe(message_cls, func, predicate=lambda _: True)
Source code in shared/messaging/broker.py
@override
def subscribe(
    self,
    message_cls: type[T],
    func: Callable[[T], None],
    predicate: Callable[[T], bool] = lambda _: True,
) -> None:
    """
    Record the subscription in `self.subscribed` as a
    `(message_cls, SyncMessageContext)` pair.
    """
    context = SyncMessageContext(func, predicate)
    self.subscribed.append((message_cls, context))  # type: ignore[arg-type]

subscribe_async

subscribe_async(
    message_cls, func, queue_name, predicate=lambda _: True
)
Source code in shared/messaging/broker.py
@override
def subscribe_async(
    self,
    message_cls: type[T],
    func: Callable[[T], None],
    queue_name: str,
    predicate: Callable[[T], bool] = lambda _: True,
) -> None:
    """
    Record the subscription in `self.async_subscribed` as a
    `(message_cls, AsyncMessageContext)` pair.
    """
    context = AsyncMessageContext(func, predicate, queue_name)
    self.async_subscribed.append((message_cls, context))  # type: ignore[arg-type]

subscribed instance-attribute

subscribed = []

SyncMessageContext

SyncMessageContext(func, predicate)

Bases: Generic[T]

Source code in shared/messaging/broker.py
def __init__(
    self,
    func: Callable[[T], None],
    predicate: Callable[[T], bool],
) -> None:
    """
    Pair a handler with the predicate that filters the messages it receives.
    """
    self.predicate = predicate
    self.func = func

func instance-attribute

func = func

predicate instance-attribute

predicate = predicate

T module-attribute

T = TypeVar('T', bound=Message)

TestBroker

TestBroker()

Bases: Broker

Test broker that can trigger handlers manually.

Source code in shared/messaging/broker.py
def __init__(self) -> None:
    """
    Start with an empty message queue and no subscriber.

    Both registries are defaultdicts producing an empty list for a key that
    is not present yet.
    """
    self._async_handlers: dict[str, list[AsyncMessageContext]] = defaultdict(list)  # type: ignore[type-arg]
    self._sync_handlers: dict[str, list[SyncMessageContext]] = defaultdict(list)  # type: ignore[type-arg]

    self.queue: list[Message] = []

process_messages

process_messages()
Source code in shared/messaging/broker.py
def process_messages(self) -> None:
    """
    Drain the queue, delivering each message to its subscribers.

    Messages are taken from the front of the queue. For each one, synchronous
    handlers are called first, then asynchronous ones; both kinds are called
    directly, in-process. A handler whose predicate rejects the message is
    skipped (logged at debug level). An exception raised by one handler is
    logged and does not prevent the remaining handlers from running.
    """
    while self.queue:
        message = self.queue.pop(0)
        message_key = get_unique_name_for_message(message.__class__)
        # Read with `.get` rather than indexing: the registries are defaultdicts,
        # so indexing would insert an empty list under this key in BOTH of them,
        # making the message type look like it is subscribed both ways.
        contexts = [
            *self._sync_handlers.get(message_key, ()),
            *self._async_handlers.get(message_key, ()),
        ]

        for context in contexts:
            handler = context.func
            # Not every callable has a __name__ (e.g. functools.partial); never
            # let the error reporting itself raise.
            handler_name = getattr(handler, "__name__", repr(handler))
            predicate = context.predicate
            try:
                if predicate(message):
                    handler(message)
                else:
                    current_logger.debug(
                        f"Message {message} was filtered out by predicate, not calling",
                    )
            except Exception:
                # `Exception` rather than a bare except, so KeyboardInterrupt and
                # SystemExit still propagate. The message is an f-string, like
                # every other log call of the brokers.
                current_logger.exception(
                    f"Exception occurred while processing message {message} in handler {handler_name}",
                )

publish

publish(message)
Source code in shared/messaging/broker.py
@override
def publish(self, message: Message) -> None:
    """
    Append the message to `self.queue`; no handler is called here.
    """
    self.queue.append(message)

queue instance-attribute

queue = []

subscribe

subscribe(message_cls, func, predicate=lambda _: True)
Source code in shared/messaging/broker.py
@override
def subscribe(
    self,
    message_cls: type[T],
    func: Callable[[T], None],
    predicate: Callable[[T], bool] = lambda _: True,
) -> None:
    """
    Register `func` as a synchronous handler for `message_cls`.

    Raises `ValueError` if the same message type already has asynchronous
    handlers: a message type is either synchronous or asynchronous.
    """
    message_key = get_unique_name_for_message(message_cls)
    # Check for actual handlers, not key presence: the registry is a defaultdict,
    # so a mere lookup elsewhere can leave an empty list behind under this key.
    if self._async_handlers.get(message_key):
        raise ValueError(
            f"Cannot subscribe to the same message {message_key} both synchronously and asynchronously"
        )
    self._sync_handlers[message_key].append(SyncMessageContext(func, predicate))

subscribe_async

subscribe_async(
    message_cls, func, queue_name, predicate=lambda _: True
)
Source code in shared/messaging/broker.py
@override
def subscribe_async(
    self,
    message_cls: type[T],
    func: Callable[[T], None],
    queue_name: str,
    predicate: Callable[[T], bool] = lambda _: True,
) -> None:
    """
    Register `func` as an asynchronous handler for `message_cls` on `queue_name`.

    Raises `ValueError` if the same message type already has synchronous
    handlers: a message type is either synchronous or asynchronous.
    """
    message_key = get_unique_name_for_message(message_cls)
    # Check for actual handlers, not key presence: the registry is a defaultdict,
    # so a mere lookup elsewhere can leave an empty list behind under this key.
    if self._sync_handlers.get(message_key):
        raise ValueError(
            f"Cannot subscribe to the same message {message_key} both synchronously and asynchronously"
        )
    self._async_handlers[message_key].append(
        AsyncMessageContext(func, predicate, queue_name)
    )

get_message_broker

get_message_broker()

Return the process-local broker. If none has been set yet, a MemoryBroker is created, stored and returned; the function does not raise when the broker is uninitialized.

Source code in shared/messaging/broker.py
def get_message_broker() -> Broker:
    """
    Return the process-local broker.
    If none has been set yet, a `MemoryBroker` is created, stored and returned:
    this function does not raise when the broker is uninitialized.
    """
    global _MESSAGE_BROKER_INSTANCE
    # Check and creation happen under the lock, so only one instance gets stored
    with _BROKER_LOCK:
        if _MESSAGE_BROKER_INSTANCE is None:
            _MESSAGE_BROKER_INSTANCE = MemoryBroker()
        return _MESSAGE_BROKER_INSTANCE

get_unique_name_for_message

get_unique_name_for_message(message_cls)
Source code in shared/messaging/broker.py
def get_unique_name_for_message(message_cls: type[Message]) -> str:
    """
    Return the dotted name of a message class: its module, a dot, then its
    qualified name. The brokers use it as the key of their handler registries.
    """
    name_parts = (message_cls.__module__, message_cls.__qualname__)
    return ".".join(name_parts)

override_message_broker

override_message_broker(temp)

Temporarily replace the process-local broker for the duration of the context. Restores the previous broker afterwards. Does not close either instance.

Source code in shared/messaging/broker.py
@contextmanager
def override_message_broker(temp: Broker) -> Generator[Broker, None, None]:
    """
    Install `temp` as the process-local broker for the duration of the context
    and yield it. The broker that was set before (possibly none) is put back on
    exit, even if the body raised. Neither instance is closed.
    """
    global _MESSAGE_BROKER_INSTANCE
    with _BROKER_LOCK:
        previous_broker, _MESSAGE_BROKER_INSTANCE = _MESSAGE_BROKER_INSTANCE, temp
    # The lock is only held for the two swaps, not while the caller's body runs
    try:
        yield temp
    finally:
        with _BROKER_LOCK:
            _MESSAGE_BROKER_INSTANCE = previous_broker

shared.messaging.consumer

Consumer

Consumer(message_cls, broker)

Bases: Generic[T]

Generic consumer for a given message type.

This implementation requires passing the type as a parameter because there is no supported API for getting the type from the generic information. See https://stackoverflow.com/questions/57706180/generict-base-class-how-to-get-type-of-t-from-within-instance

Source code in shared/messaging/consumer.py
def __init__(self, message_cls: type[T], broker: Broker) -> None:
    """
    Bind the consumer to a message class and a broker.
    No handler is registered yet.
    """
    self._registered = False
    self.broker = broker
    self.message_cls = message_cls

broker instance-attribute

broker = broker

message_cls instance-attribute

message_cls = message_cls

on_message

on_message(func)
Source code in shared/messaging/consumer.py
def on_message(self, func: Callable[[T], None]) -> Callable[[T], None]:
    """
    Subscribe `func` on the broker for this consumer's message class and
    return `func` unchanged.

    Raises `ConsumerAlreadyRegisteredError` when a handler was already
    registered through this consumer.
    """
    if not self._registered:
        handler = cast("Callable[[Message], None]", func)
        self.broker.subscribe(self.message_cls, handler)
        self._registered = True
        return func
    raise ConsumerAlreadyRegisteredError(
        "A consumer can only have a single `on_message` handler. Create a new consumer instead."
    )

T module-attribute

T = TypeVar('T', bound=Message)

shared.messaging.exceptions

ConsumerAlreadyRegisteredError

Bases: PubSubError

Raised when a consumer's on_message is used more than once.

PubSubError

Bases: Exception

Base class for all pubsub exceptions.

shared.messaging.message

Message dataclass

Message()

Serializable and hashable base class for all messages.

__post_init__

__post_init__()
Source code in shared/messaging/message.py
def __post_init__(self) -> None:
    """
    Stamp the message with its creation time, as a naive datetime in UTC.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Taking an aware UTC
    # "now" and dropping the tzinfo yields the same naive UTC value, so code
    # comparing `created_at` with other naive datetimes keeps working.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    # NOTE(review): object.__setattr__ bypasses the class's own __setattr__,
    # presumably because the dataclass is frozen - confirm
    object.__setattr__(self, "created_at", now_utc)

created_at class-attribute instance-attribute

created_at = field(init=False)

shared.messaging.message_types

dsn

DsnSalaryReceived dataclass

DsnSalaryReceived(
    user_ref,
    month,
    gross_monthly_income_in_cents,
    commit=True,
)

Bases: Message

commit class-attribute instance-attribute
commit = True
gross_monthly_income_in_cents instance-attribute
gross_monthly_income_in_cents
month instance-attribute
month
user_ref instance-attribute
user_ref

shared.messaging.producer

Producer

Producer(broker)

Producer of messages.

Source code in shared/messaging/producer.py
def __init__(self, broker: Broker) -> None:
    """
    Bind the producer to the broker its messages are published through.
    """
    self.broker = broker

broker instance-attribute

broker = broker

publish

publish(message)
Source code in shared/messaging/producer.py
def publish(self, message: Message) -> None:
    """
    Hand the message over to the producer's broker.
    """
    self.broker.publish(message)