
Pubsub

shared.pubsub.topics

T module-attribute

T = TypeVar('T')

Topic

Topic()

Bases: Generic[T]

Generic distribution of messages of type T, following the publish/subscribe pattern

This class is a simple device to decouple the publisher from the subscribers. It implements nothing beyond plain message distribution to a set of subscribers. In particular, there is no async job processing and no exception handling.

The contract is as follows:

- Publishers call the topic publish method to send a message to all subscribers by calling their receive method
- Subscribers call the subscribe method to register themselves as subscribers of a topic, and the unsubscribe method to unregister themselves
- Registration can be static or dynamic
- Publication is synchronous and blocking on the publisher side
- Messages are distributed to all subscribers in no particular order
- Subscribers' receive method must not perform any blocking or costly operation and must not raise any exception; they should be as fast as possible
- Subscribers are free to implement their own exception handling and async job processing as long as they respect the contract above
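The contract above can be illustrated with a short, self-contained sketch. The Topic class below is a stand-in that copies the method bodies listed on this page so the snippet runs without the shared package. Nesting Subscriber inside Topic is an assumption, and Recorder is a hypothetical subscriber invented for the example.

```python
from typing import Generic, TypeVar

T = TypeVar("T")


class Topic(Generic[T]):
    """Stand-in mirroring the listings on this page so the sketch runs alone."""

    class Subscriber:
        # Assumption: Subscriber is nested in Topic; the page does not say so explicitly.
        def receive(self, _message: T) -> None:
            pass

    def __init__(self) -> None:
        self.subscribers = set()

    def publish(self, message: T) -> None:
        for subscriber in self.subscribers:
            subscriber.receive(message)

    def subscribe(self, subscriber: Subscriber) -> None:
        self.subscribers.add(subscriber)

    def unsubscribe(self, subscriber: Subscriber) -> None:
        self.subscribers.remove(subscriber)


class Recorder(Topic.Subscriber):
    """Hypothetical subscriber: receive() only appends to a list (cheap, non-raising)."""

    def __init__(self) -> None:
        self.seen = []

    def receive(self, message: str) -> None:
        self.seen.append(message)


topic: Topic[str] = Topic()
first, second = Recorder(), Recorder()

# Dynamic registration: subscribers can be added and removed at any time.
topic.subscribe(first)
topic.subscribe(second)
topic.publish("hello")      # delivered synchronously to both recorders

topic.unsubscribe(second)
topic.publish("world")      # delivered to `first` only
```

Each recorder keeps its own list, so the example does not depend on the order in which subscribers are visited.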

Source code in shared/pubsub/topics.py
def __init__(self) -> None:
    self.subscribers = set()

Subscriber

Base interface for subscribers to a topic of T messages

receive
receive(_message)
Source code in shared/pubsub/topics.py
def receive(self, _message: T) -> None:
    pass
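One way a subscriber might keep receive fast, as the contract requires, is to only enqueue the message and do the real work later. The following is a minimal sketch under that assumption. QueueingSubscriber and its drain method are hypothetical names that are not part of shared.pubsub.topics, and the class relies on duck typing (publish only calls receive) rather than on the real base class.

```python
import queue


class QueueingSubscriber:
    """Hypothetical subscriber: receive() only enqueues; work happens in drain()."""

    def __init__(self) -> None:
        self._pending = queue.SimpleQueue()
        self.processed = []

    def receive(self, message: str) -> None:
        # SimpleQueue is unbounded, so put_nowait does not block the publisher.
        self._pending.put_nowait(message)

    def drain(self) -> int:
        """Process everything queued so far and return the number handled."""
        handled = 0
        while True:
            try:
                message = self._pending.get_nowait()
            except queue.Empty:
                return handled
            # Placeholder for costly work that must stay out of receive().
            self.processed.append(message.upper())
            handled += 1


sub = QueueingSubscriber()
sub.receive("a")
sub.receive("b")
pending_before_drain = list(sub.processed)  # nothing processed yet
count = sub.drain()
```

In a real application drain would typically run on a worker thread or a scheduled task; that choice is left to the subscriber.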

publish

publish(message)

Distribute message to all subscribers in no particular order

Source code in shared/pubsub/topics.py
def publish(self, message: T) -> None:
    """Distribute message to all subscribers in no particular order"""
    for subscriber in self.subscribers:
        subscriber.receive(message)
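Because the loop in publish has no try/except, an exception raised by a subscriber's receive propagates to the publisher and ends the loop. The sketch below illustrates this with a stand-in Topic that copies the publish and subscribe bodies from this page. Faulty and Guarded are hypothetical classes, and wrapping is only one possible way for a subscriber to honor the "must not raise" rule.

```python
class Topic:
    """Minimal stand-in with the same publish/subscribe bodies as the listings."""

    def __init__(self) -> None:
        self.subscribers = set()

    def subscribe(self, subscriber) -> None:
        self.subscribers.add(subscriber)

    def publish(self, message) -> None:
        for subscriber in self.subscribers:
            subscriber.receive(message)


class Faulty:
    """Hypothetical subscriber that violates the contract by raising."""

    def receive(self, message) -> None:
        raise ValueError(f"cannot handle {message!r}")


class Guarded:
    """Hypothetical wrapper that records failures instead of letting them escape."""

    def __init__(self, inner) -> None:
        self.inner = inner
        self.errors = []

    def receive(self, message) -> None:
        try:
            self.inner.receive(message)
        except Exception as exc:  # broad on purpose: nothing may reach the publisher
            self.errors.append(exc)


# Unguarded: the exception surfaces in the publisher's call to publish().
unguarded_topic = Topic()
unguarded_topic.subscribe(Faulty())
try:
    unguarded_topic.publish("ping")
    propagated = False
except ValueError:
    propagated = True

# Guarded: the subscriber absorbs its own failure and publish() returns normally.
guarded_topic = Topic()
guard = Guarded(Faulty())
guarded_topic.subscribe(guard)
guarded_topic.publish("ping")
```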

subscribe

subscribe(subscriber)

Subscribe to topic

Source code in shared/pubsub/topics.py
def subscribe(self, subscriber: Subscriber) -> None:
    """Subscribe to topic"""
    self.subscribers.add(subscriber)

subscribers instance-attribute

subscribers = set()

unsubscribe

unsubscribe(subscriber)

Unsubscribe from topic

Source code in shared/pubsub/topics.py
def unsubscribe(self, subscriber: Subscriber) -> None:
    """Unsubscribe from topic"""
    self.subscribers.remove(subscriber)
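Since subscribers is a plain set, subscribing the same object twice registers it once, and set.remove raises KeyError when the subscriber is not registered. The sketch below shows both behaviors with a stand-in Topic that copies the subscribe and unsubscribe bodies from this page. Listener is a hypothetical subscriber, and the membership check is only one possible guard on the caller's side.

```python
class Topic:
    """Minimal stand-in with the same subscribe/unsubscribe bodies as the listings."""

    def __init__(self) -> None:
        self.subscribers = set()

    def subscribe(self, subscriber) -> None:
        self.subscribers.add(subscriber)

    def unsubscribe(self, subscriber) -> None:
        self.subscribers.remove(subscriber)


class Listener:
    """Hypothetical subscriber that ignores every message."""

    def receive(self, _message) -> None:
        pass


topic = Topic()
listener = Listener()

# Adding the same object twice leaves a single registration.
topic.subscribe(listener)
topic.subscribe(listener)
registered = len(topic.subscribers)

# The first unsubscribe succeeds; the second raises KeyError.
topic.unsubscribe(listener)
try:
    topic.unsubscribe(listener)
    second_unsubscribe_failed = False
except KeyError:
    second_unsubscribe_failed = True

# Caller-side guard: only unsubscribe when currently registered.
if listener in topic.subscribers:
    topic.unsubscribe(listener)
```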