Messaging

The messaging module provides a pub/sub system for decoupling application components through synchronous or asynchronous message passing. It enables an event-driven architecture in which components publish messages without knowing about their consumers.

Core Concepts

Messages

Messages are immutable data classes that represent events in your system. All messages must inherit from the Message base class:

from dataclasses import dataclass
from shared.messaging import Message

@dataclass(frozen=True)
class UserRegistered(Message):
    user_id: str
    email: str

@dataclass(frozen=True)
class OrderPlaced(Message):
    order_id: str
    customer_id: str
    total_amount: float
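
To illustrate what frozen=True provides, here is a minimal, self-contained sketch. The Message class below is a stand-in (an assumption; the real base class lives in shared.messaging), and the example only shows standard dataclass behavior: assigning to a field raises FrozenInstanceError, and a "changed" message is built as a new instance.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


class Message:
    """Stand-in for shared.messaging.Message (assumed to be a plain base class)."""


@dataclass(frozen=True)
class UserRegistered(Message):
    user_id: str
    email: str


event = UserRegistered(user_id="123", email="john@example.com")

try:
    event.email = "other@example.com"  # mutation attempt on a frozen dataclass
    mutated = True
except FrozenInstanceError:
    mutated = False

# To "change" a message, build a new instance instead of mutating the old one.
corrected = replace(event, email="other@example.com")
```

Because handlers may run later in a background worker, an immutable message ensures that every subscriber sees the same data the publisher sent.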

Broker

The broker is responsible for routing messages from publishers to subscribers. It supports both synchronous and asynchronous message processing:

  • Synchronous: Messages are processed immediately in the same thread
  • Asynchronous: Messages are queued and processed by background workers using RQ (current implementation)

Publishers and Subscribers

  • Publishers send messages without knowing who will receive them
  • Subscribers register handlers for specific message types

Usage

Basic Publishing

from shared.messaging.broker import get_message_broker

# Get the global broker instance
message_broker = get_message_broker()

# Publish a message
user_registered = UserRegistered(user_id="123", email="john@example.com")
message_broker.publish(user_registered)

Synchronous Subscription

Synchronous handlers are executed immediately when a message is published:

def handle_user_registration(event: UserRegistered):
    print(f"User {event.email} registered with ID {event.user_id}")

# Subscribe to the message type
message_broker.subscribe(UserRegistered, handle_user_registration)

Asynchronous Subscription

Asynchronous handlers are queued and processed by background workers:

from shared.queuing.queue_names import LOW_PRIORITY

def send_welcome_email(event: UserRegistered):
    # This will run in a background job
    email_service.send_welcome_email(event.email)

# Subscribe with a queue name
message_broker.subscribe_async(
    UserRegistered,
    send_welcome_email,
    queue_name=LOW_PRIORITY
)

Message Filtering with Predicates

Predicates filter messages before a handler is invoked. This avoids spawning RQ jobs for events that the handler would discard anyway:

def handle_premium_orders(event: OrderPlaced):
    print(f"Processing premium order: {event.order_id}")

# Only process orders above $100
message_broker.subscribe(
    OrderPlaced,
    handle_premium_orders,
    predicate=lambda order: order.total_amount > 100
)

Note: predicate support is tied to the current RQ-based implementation and might be dropped in the future.
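
How a predicate might be evaluated at publish time can be sketched as follows. This is not the broker's actual code: Subscription and dispatch are hypothetical names, and OrderPlaced is redefined without the Message base class so the snippet runs on its own.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class OrderPlaced:
    order_id: str
    total_amount: float


@dataclass
class Subscription:
    """Hypothetical record pairing a handler with an optional predicate."""
    handler: Callable[[OrderPlaced], None]
    predicate: Optional[Callable[[OrderPlaced], bool]] = None


def dispatch(event: OrderPlaced, subscriptions: list[Subscription]) -> int:
    """Call matching handlers and return how many were invoked."""
    invoked = 0
    for sub in subscriptions:
        # The predicate runs before any handler call (or job) is scheduled.
        if sub.predicate is not None and not sub.predicate(event):
            continue
        sub.handler(event)
        invoked += 1
    return invoked


premium_orders: list[str] = []
subs = [
    Subscription(
        handler=lambda e: premium_orders.append(e.order_id),
        predicate=lambda e: e.total_amount > 100,
    )
]

dispatch(OrderPlaced(order_id="A", total_amount=50.0), subs)   # filtered out
dispatch(OrderPlaced(order_id="B", total_amount=250.0), subs)  # handled
```

The point of the design is that the cheap check happens on the publishing side, so no work is scheduled for events that would be ignored.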

Integration with Transactions

The messaging system integrates seamlessly with the transaction module. Messages published within a transaction are only sent if the transaction commits successfully:

from shared.transaction import transactional, register_event

@transactional
def create_user(session: Session, email: str, name: str):
    user = User(email=email, name=name)
    session.add(user)
    session.flush()  # Get the user ID

    # Event will only be sent if transaction commits
    register_event(UserRegistered(user_id=str(user.id), email=email))

    return user
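
The "publish only on commit" behavior can be pictured with a small sketch. It does not reproduce the transaction module; the transaction context manager below is hypothetical and simply buffers events, flushing them on success and dropping them on failure.

```python
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def transaction(publish: Callable[[object], None]) -> Iterator[list]:
    """Hypothetical unit of work: buffer events, publish them only on success."""
    pending: list = []
    try:
        yield pending  # callers append events instead of publishing directly
    except Exception:
        pending.clear()  # rollback: buffered events are dropped
        raise
    else:
        for event in pending:  # commit: flush buffered events in order
            publish(event)


published: list = []

# Successful transaction: the event is published after the block exits.
with transaction(published.append) as events:
    events.append("UserRegistered(123)")

# Failing transaction: the event is registered but never published.
try:
    with transaction(published.append) as events:
        events.append("UserRegistered(456)")
        raise RuntimeError("simulated failure before commit")
except RuntimeError:
    pass
```

Deferring publication this way keeps subscribers from reacting to data that was rolled back.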

Message Patterns

Domain Events

Use messages to represent significant business events:

from datetime import datetime

@dataclass(frozen=True)
class PolicyActivated(Message):
    policy_id: str
    member_id: str
    activation_date: datetime

@dataclass(frozen=True)
class ClaimSubmitted(Message):
    claim_id: str
    policy_id: str
    amount: float

Command Messages

Use command messages to trigger actions across components:

@dataclass(frozen=True)
class GenerateInvoice(Message):
    subscription_id: str
    billing_period: str

@dataclass(frozen=True)
class SendNotification(Message):
    recipient_id: str
    template: str
    data: dict

Testing

Using NullBroker for Tests

The NullBroker stores published messages without processing them, which makes it well suited to unit tests:

from shared.messaging.broker import NullBroker, override_message_broker

def test_user_registration_publishes_event():
    null_broker = NullBroker()

    with override_message_broker(null_broker):
        # Your code that publishes messages
        register_user("john@example.com")

        # Verify the message was published
        assert len(null_broker.published) == 1
        assert isinstance(null_broker.published[0], UserRegistered)
        assert null_broker.published[0].email == "john@example.com"

Using TestBroker for Integration Tests

The TestBroker allows you to manually trigger message processing:

from shared.messaging.broker import TestBroker, override_message_broker

def test_full_message_flow():
    test_broker = TestBroker()

    # Set up subscriber
    processed_messages = []

    def handler(event: UserRegistered):
        processed_messages.append(event)

    test_broker.subscribe(UserRegistered, handler)

    with override_message_broker(test_broker):
        # Publish message
        register_user("john@example.com")

        # Manually process the queue
        test_broker.process_messages()

        # Verify handler was called
        assert len(processed_messages) == 1

Error Handling

Message handlers should implement their own error handling. The broker guarantees delivery to all subscribers, but it does not handle exceptions raised inside a handler:

def safe_email_handler(event: UserRegistered):
    try:
        email_service.send_welcome_email(event.email)
    except EmailServiceError as e:
        current_logger.error(f"Failed to send welcome email: {e}")
        # Could retry, send to DLQ, etc.
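
The "retry, send to DLQ" idea mentioned in the comment above could look like the following sketch. The with_retries decorator and the dead_letters list are hypothetical helpers, not part of the messaging module; a real dead-letter queue would be a persistent store rather than a Python list.

```python
import logging
from functools import wraps
from typing import Callable

logger = logging.getLogger("messaging.handlers")


def with_retries(max_attempts: int, dead_letters: list) -> Callable:
    """Hypothetical decorator: retry a handler, then park the event."""

    def decorator(handler: Callable[[object], None]) -> Callable[[object], None]:
        @wraps(handler)
        def wrapper(event: object) -> None:
            for attempt in range(1, max_attempts + 1):
                try:
                    handler(event)
                    return  # success: stop retrying
                except Exception as exc:  # narrow to expected errors in real code
                    logger.warning("attempt %d/%d failed: %s", attempt, max_attempts, exc)
            dead_letters.append(event)  # every attempt failed; do not re-raise

        return wrapper

    return decorator


dead_letters: list = []
calls = {"flaky": 0}


@with_retries(max_attempts=3, dead_letters=dead_letters)
def flaky_handler(event: object) -> None:
    calls["flaky"] += 1
    if calls["flaky"] < 2:
        raise ConnectionError("temporary outage")  # fails on the first call only


@with_retries(max_attempts=2, dead_letters=dead_letters)
def broken_handler(event: object) -> None:
    raise ValueError("always fails")


flaky_handler("event-1")   # succeeds on the second attempt
broken_handler("event-2")  # exhausts its attempts and is parked
```

Wrapping handlers this way keeps the failure policy in one place, so individual handlers contain only business logic.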

Best Practices

Message Design

  • Keep messages immutable: Use @dataclass(frozen=True)
  • Include relevant context: Add enough information for handlers to process without additional queries
  • Use clear naming: Event names should describe what happened (UserRegistered); command names should describe the requested action (GenerateInvoice)

Subscription Management

  • Handle errors gracefully: Don't let one handler failure affect others
  • One subscription mode per message type: Don't mix sync and async subscriptions for the same message (this rule may be reconsidered)
  • Use predicates for filtering: Keep handlers focused by filtering at subscription time (predicate support may be reconsidered)

Performance Considerations

  • Use async for non-critical operations: Email sending, analytics, logging
  • Use sync for critical business logic: Data consistency, validation
  • Batch operations when possible: Group related messages or use bulk operations

Common Patterns

Cross-Component Communication

# Component A publishes
@dataclass(frozen=True)
class UserProfileUpdated(Message):
    user_id: str
    changes: dict

# Component B defines a handler and subscribes it
def update_recommendations(event: UserProfileUpdated):
    recommendation_engine.refresh_user_data(event.user_id)

message_broker.subscribe(UserProfileUpdated, update_recommendations)

Event Sourcing

@dataclass(frozen=True)
class AccountCreated(Message):
    account_id: str
    initial_balance: float

@dataclass(frozen=True)
class MoneyDeposited(Message):
    account_id: str
    amount: float

@dataclass(frozen=True)
class MoneyWithdrawn(Message):
    account_id: str
    amount: float
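
With events like these, current state can be rebuilt by replaying the history. The sketch below assumes the events are already available as an ordered list (how they are stored is outside the scope of this page); the apply function is a hypothetical helper, and the classes are redefined without the Message base class so the snippet runs on its own.

```python
from dataclasses import dataclass
from functools import reduce


@dataclass(frozen=True)
class AccountCreated:
    account_id: str
    initial_balance: float


@dataclass(frozen=True)
class MoneyDeposited:
    account_id: str
    amount: float


@dataclass(frozen=True)
class MoneyWithdrawn:
    account_id: str
    amount: float


def apply(balance: float, event: object) -> float:
    """Fold one event into the running balance."""
    if isinstance(event, AccountCreated):
        return event.initial_balance
    if isinstance(event, MoneyDeposited):
        return balance + event.amount
    if isinstance(event, MoneyWithdrawn):
        return balance - event.amount
    return balance  # unknown events leave the state untouched


history = [
    AccountCreated("acc-1", 100.0),
    MoneyDeposited("acc-1", 50.0),
    MoneyWithdrawn("acc-1", 30.0),
]

# Replaying the full history yields the current balance.
balance = reduce(apply, history, 0.0)
```

Because the state is derived purely from the event list, replaying a prefix of the history gives the balance at any earlier point in time.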

Architecture Integration

The messaging module supports Alan's modular monolith architecture by:

  • Decoupling components: Reduces direct dependencies between business domains
  • Enabling async processing: Offloads non-critical work to background jobs
  • Supporting event sourcing: Captures business events for audit and replay
  • Facilitating testing: Provides test doubles for isolated unit testing

For more advanced patterns and examples, see the transaction module documentation where messaging is used for event publishing after successful database commits.

Implementation Details

Memory Broker

The memory broker maintains an in-memory map of subscribers for each event type and supports two subscription policies:

  • Synchronous: Direct call to the handlers
  • Asynchronous: Enqueue each handler to a specific RQ queue
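
The two policies above can be sketched with a toy broker. This is an illustration only, not the real memory broker: SketchBroker and run_queue are hypothetical, and a plain dictionary of lists stands in for RQ queues.

```python
from collections import defaultdict
from typing import Callable, Optional


class SketchBroker:
    """Illustrative in-memory broker; not the real memory broker."""

    def __init__(self) -> None:
        # event type -> list of (handler, queue_name); queue_name None means sync
        self._subscribers: dict[type, list[tuple[Callable, Optional[str]]]] = defaultdict(list)
        # stand-in for RQ: queue name -> list of pending (handler, event) jobs
        self.queues: dict[str, list[tuple[Callable, object]]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable) -> None:
        self._subscribers[event_type].append((handler, None))

    def subscribe_async(self, event_type: type, handler: Callable, queue_name: str) -> None:
        self._subscribers[event_type].append((handler, queue_name))

    def publish(self, event: object) -> None:
        for handler, queue_name in self._subscribers.get(type(event), []):
            if queue_name is None:
                handler(event)  # synchronous: direct call
            else:
                self.queues[queue_name].append((handler, event))  # asynchronous: enqueue

    def run_queue(self, queue_name: str) -> None:
        """Simulate a background worker draining one queue."""
        jobs, self.queues[queue_name] = self.queues[queue_name], []
        for handler, event in jobs:
            handler(event)


log: list = []
broker = SketchBroker()
broker.subscribe(str, lambda e: log.append(("sync", e)))
broker.subscribe_async(str, lambda e: log.append(("async", e)), queue_name="low")

broker.publish("hello")
after_publish = list(log)  # only the sync handler has run so far

broker.run_queue("low")    # the "worker" now executes the queued handler
```

Each asynchronous handler is enqueued as its own job, so a slow or failing handler does not delay the others on the same event.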
