# SQS Job Processing
`shared.queuing.sqs` bridges AWS SQS with our internal RQ job system. It polls SQS queues, matches incoming messages to registered processors, and enqueues the work into RQ for async execution.
## Architecture
```mermaid
flowchart LR
    SES["AWS SES"] -->|publish| SNS[SNS Topic]
    SNS -->|deliver| SQS[SQS Queue]
    SQS -->|poll| Worker["SQS Worker<br/>(cli.py)"]
    Worker -->|dispatch| Processor[BaseJobsProcessor]
    Processor -->|enqueue| RQ[RQ Queue]
    RQ -->|execute| Job["Async Job<br/>(RQ Worker)"]
```
Currently the only registered processor is `SesEmailProcessor` (inbound email handling). Legacy processors (`CommandEventMessagesProcessor`, `PlatformEventMessagesProcessor`) were removed as part of the Beanstalk cleanup. The framework itself remains generic: any AWS service publishing to SNS/SQS can be consumed by adding a dedicated processor.
## Package structure
```
shared/queuing/sqs/
├── blueprint.py          # Flask-Smorest blueprint & processor registration
├── cli.py                # SQS worker command (long-poll + dispatch loop)
├── utils.py              # Header lookup helpers
└── processors/
    ├── base.py           # BaseJobsProcessor ABC
    └── inbound_emails.py # SES email processing (see dedicated doc)
```
## Core concepts
### BaseJobsProcessor
Abstract base class that every SQS processor must implement:
```python
from abc import ABC, abstractmethod
from typing import Any


class BaseJobsProcessor(ABC):
    @staticmethod
    @abstractmethod
    def can_handle(payload: dict[str, Any]) -> bool: ...

    @staticmethod
    @abstractmethod
    def process(payload: dict[str, Any], **kwargs) -> None: ...
```
- `can_handle`: inspects the raw SQS payload and returns `True` if this processor should handle it.
- `process`: performs the actual work; called inside an RQ job.
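The two methods run in different places: `can_handle` executes synchronously in the SQS worker, while `process` runs later inside an RQ worker. A minimal sketch of that hand-off, assuming standard RQ usage and that `SesEmailProcessor` is importable from `inbound_emails.py` as the package layout suggests (in practice the blueprint performs this wiring for you):

```python
from typing import Any

from redis import Redis
from rq import Queue

from shared.queuing.sqs.processors.inbound_emails import SesEmailProcessor


def hand_off(payload: dict[str, Any]) -> None:
    # can_handle runs synchronously, inside the SQS worker process...
    if SesEmailProcessor.can_handle(payload):
        # ...while process(payload) executes later, inside an RQ worker.
        queue = Queue("default", connection=Redis())  # illustrative wiring
        queue.enqueue(SesEmailProcessor.process, payload)
```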
### Blueprint registration
Processors are registered when the `sqs_blueprint` is mounted on a Flask app. The `jobs_processors` dict maps processor classes to RQ queue names:
```python
api.register_blueprint(
    sqs_blueprint,
    jobs_processors={
        SesEmailProcessor: "default",
        CustomProcessor: "high-priority",
    },
)
```
Order matters: the first processor whose `can_handle` returns `True` wins.
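First-match-wins means a processor with a broad `can_handle` registered early will shadow everything registered after it. An illustrative sketch of that lookup (the real logic is `_dispatch()` in `cli.py`):

```python
from typing import Any, Optional


def find_match(payload: dict[str, Any], jobs_processors: dict) -> Optional[tuple]:
    # jobs_processors preserves registration order (dicts are insertion-ordered),
    # so the first processor whose can_handle() accepts the payload wins.
    for processor, queue_name in jobs_processors.items():
        if processor.can_handle(payload):
            return processor, queue_name
    return None  # no match: the message is left in SQS to retry later
```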
### Worker loop
The `flask sqs worker --queue <URL>` command runs an infinite loop that:
- Long-polls SQS (`MaxNumberOfMessages=10`, `WaitTimeSeconds=20`)
- For each message, parses the JSON body and calls `_dispatch()`
- `_dispatch` iterates registered processors, finds a match, and enqueues `processor.process(payload)` into the mapped RQ queue
- Successfully dispatched messages are deleted from SQS in batch
- Failed messages are left in the queue (the SQS visibility timeout handles retry)
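For orientation, one iteration of that loop might look roughly like this with boto3 (the queue URL is a placeholder and `_dispatch` stands in for the matching logic described above):

```python
import json

import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/example"  # placeholder


def poll_once() -> None:
    # Long poll: returns as soon as messages arrive, waits up to 20s otherwise.
    resp = sqs.receive_message(
        QueueUrl=QUEUE_URL, MaxNumberOfMessages=10, WaitTimeSeconds=20
    )
    done = []
    for msg in resp.get("Messages", []):
        payload = json.loads(msg["Body"])
        if _dispatch(payload):  # assumed hook: enqueue into RQ on a match
            done.append({"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]})
    if done:
        # Batch-delete only the successfully dispatched messages; the rest
        # become visible again after the visibility timeout and retry.
        sqs.delete_message_batch(QueueUrl=QUEUE_URL, Entries=done)
```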
## Adding a new processor
- Create a class extending `BaseJobsProcessor`:
```python
from typing import Any

from shared.queuing.sqs.processors.base import BaseJobsProcessor


class MyProcessor(BaseJobsProcessor):
    @staticmethod
    def can_handle(payload: dict[str, Any]) -> bool:
        return payload.get("Type") == "MyCustomType"

    @staticmethod
    def process(payload: dict[str, Any], **kwargs) -> None:
        # Your processing logic here
        ...
```
- Register it in your app's blueprint setup:
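Following the `jobs_processors` pattern shown under Blueprint registration (the queue name here is illustrative):

```python
api.register_blueprint(
    sqs_blueprint,
    jobs_processors={
        MyProcessor: "default",  # illustrative RQ queue name
    },
)
```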
## Reference
## Further reading
- Inbound Email Processing — SES/SNS/SQS pipeline, validation, and processor registry