Messaging¶
The messaging module provides a pub/sub system for decoupling application components through (a)synchronous message passing. It enables event-driven architecture where components can publish messages without knowing about their consumers.
Core Concepts¶
Messages¶
Messages are immutable data classes that represent events in your system. All messages must inherit from the Message base class:
from dataclasses import dataclass
from shared.messaging import Message
@dataclass(frozen=True)
class UserRegistered(Message):
user_id: str
email: str
@dataclass(frozen=True)
class OrderPlaced(Message):
order_id: str
customer_id: str
total_amount: float
Broker¶
The broker is responsible for routing messages from publishers to subscribers. It supports both synchronous and asynchronous message processing:
- Synchronous: Messages are processed immediately in the same thread
- Asynchronous: Messages are queued and processed by background workers using RQ (current implementation)
Publishers and Subscribers¶
- Publishers send messages without knowing who will receive them
- Subscribers register handlers for specific message types
Usage¶
Basic Publishing¶
from shared.messaging.broker import get_message_broker
# Get the global broker instance
message_broker = get_message_broker()
# Publish a message
user_registered = UserRegistered(user_id="123", email="john@example.com")
message_broker.publish(user_registered)
Synchronous Subscription¶
Synchronous handlers are executed immediately when a message is published:
def handle_user_registration(event: UserRegistered):
print(f"User {event.email} registered with ID {event.user_id}")
# Subscribe to the message type
message_broker.subscribe(UserRegistered, handle_user_registration)
Asynchronous Subscription¶
Asynchronous handlers are queued and processed by background workers:
from shared.queuing.queue_names import LOW_PRIORITY
def send_welcome_email(event: UserRegistered):
# This will run in a background job
email_service.send_welcome_email(event.email)
# Subscribe with a queue name
message_broker.subscribe_async(
UserRegistered,
send_welcome_email,
queue_name=LOW_PRIORITY
)
Message Filtering with Predicates¶
To avoid spawning RQ jobs that would actually discard events, you can filter messages using predicates:
def handle_premium_orders(event: OrderPlaced):
print(f"Processing premium order: {event.order_id}")
# Only process orders above $100
message_broker.subscribe(
OrderPlaced,
handle_premium_orders,
predicate=lambda order: order.total_amount > 100
)
Note that this is very implementation specific and support might be dropped in the future (RQ)
Integration with Transactions¶
The messaging system integrates seamlessly with the transaction module. Messages published within a transaction are only sent if the transaction commits successfully:
from shared.transaction import transactional, register_event
@transactional
def create_user(session: Session, email: str, name: str):
user = User(email=email, name=name)
session.add(user)
session.flush() # Get the user ID
# Event will only be sent if transaction commits
register_event(UserRegistered(user_id=str(user.id), email=email))
return user
Message Patterns¶
Domain Events¶
Use messages to represent significant business events:
@dataclass(frozen=True)
class PolicyActivated(Message):
policy_id: str
member_id: str
activation_date: datetime
@dataclass(frozen=True)
class ClaimSubmitted(Message):
claim_id: str
policy_id: str
amount: float
Command Messages¶
For triggering actions across components:
@dataclass(frozen=True)
class GenerateInvoice(Message):
subscription_id: str
billing_period: str
@dataclass(frozen=True)
class SendNotification(Message):
recipient_id: str
template: str
data: dict
Testing¶
Using NullBroker for Tests¶
The NullBroker stores published messages without processing them, perfect for testing:
from shared.messaging.broker import NullBroker, override_message_broker
def test_user_registration_publishes_event():
null_broker = NullBroker()
with override_message_broker(null_broker):
# Your code that publishes messages
register_user("john@example.com")
# Verify the message was published
assert len(null_broker.published) == 1
assert isinstance(null_broker.published[0], UserRegistered)
assert null_broker.published[0].email == "john@example.com"
Using TestBroker for Integration Tests¶
The TestBroker allows you to manually trigger message processing:
from shared.messaging.broker import TestBroker, override_message_broker
def test_full_message_flow():
test_broker = TestBroker()
# Set up subscriber
processed_messages = []
def handler(event: UserRegistered):
processed_messages.append(event)
test_broker.subscribe(UserRegistered, handler)
with override_message_broker(test_broker):
# Publish message
register_user("john@example.com")
# Manually process the queue
test_broker.process_messages()
# Verify handler was called
assert len(processed_messages) == 1
Error Handling¶
Message handlers should implement their own error handling. The broker guarantees delivery to all subscribers but doesn't handle exceptions:
def safe_email_handler(event: UserRegistered):
try:
email_service.send_welcome_email(event.email)
except EmailServiceError as e:
current_logger.error(f"Failed to send welcome email: {e}")
# Could retry, send to DLQ, etc.
Best Practices¶
Message Design¶
- Keep messages immutable: Use
@dataclass(frozen=True) - Include relevant context: Add enough information for handlers to process without additional queries
- Use clear naming: Message names should describe what happened, not what should happen
Subscription Management¶
- Handle errors gracefully: Don't let one handler failure affect others
- One subscriber per message type: Don't mix sync and async subscriptions for the same message (can be reconsidered)
- Use predicates for filtering: Keep handlers focused by filtering at subscription time (support might be reconsidered)
Performance Considerations¶
- Use async for non-critical operations: Email sending, analytics, logging
- Use sync for critical business logic: Data consistency, validation
- Batch operations when possible: Group related messages or use bulk operations
Common Patterns¶
Cross-Component Communication¶
# Component A publishes
@dataclass(frozen=True)
class UserProfileUpdated(Message):
user_id: str
changes: dict
# Component B subscribes
def update_recommendations(event: UserProfileUpdated):
recommendation_engine.refresh_user_data(event.user_id)
Event Sourcing¶
@dataclass(frozen=True)
class AccountCreated(Message):
account_id: str
initial_balance: float
@dataclass(frozen=True)
class MoneyDeposited(Message):
account_id: str
amount: float
@dataclass(frozen=True)
class MoneyWithdrawn(Message):
account_id: str
amount: float
Architecture Integration¶
The messaging module supports Alan's modular monolith architecture by:
- Decoupling components: Reduces direct dependencies between business domains
- Enabling async processing: Offloads non-critical work to background jobs
- Supporting event sourcing: Captures business events for audit and replay
- Facilitating testing: Provides test doubles for isolated unit testing
For more advanced patterns and examples, see the transaction module documentation where messaging is used for event publishing after successful database commits.
Implementation Details¶
Memory Broker¶
Maintains an in-memory map of subscribers for each event type with 2 subscribe policies:
- Synchronous: Direct call to the handlers
- Asynchronous: Enqueue each handler to a specific RQ queue
Reading Materials¶
- Modular Monolith with DDD - Modules Integration ⧉
- The Outbox Pattern in Python ⧉
- Outbox Streaming Implementation ⧉
- The Outbox Pattern ⧉
- Eventuate Tram Core ⧉