SQS Job Processing

shared.queuing.sqs bridges AWS SQS with our internal RQ job system. It polls SQS queues, matches incoming messages to registered processors, and enqueues the work into RQ for async execution.

Architecture

flowchart LR
    SES["AWS SES"] -->|publish| SNS[SNS Topic]
    SNS -->|deliver| SQS[SQS Queue]
    SQS -->|poll| Worker["SQS Worker<br/>(cli.py)"]
    Worker -->|dispatch| Processor[BaseJobsProcessor]
    Processor -->|enqueue| RQ[RQ Queue]
    RQ -->|execute| Job["Async Job<br/>(RQ Worker)"]
Hold "Alt" / "Option" to enable pan & zoom

Currently the only registered processor is SesEmailProcessor (inbound email handling); the legacy processors (CommandEventMessagesProcessor, PlatformEventMessagesProcessor) were removed as part of the Beanstalk cleanup. The framework itself is generic: any AWS service that publishes to SNS/SQS can be consumed by adding a dedicated processor.

Package structure

shared/queuing/sqs/
├── blueprint.py         # Flask-Smorest blueprint & processor registration
├── cli.py               # SQS worker command (long-poll + dispatch loop)
├── utils.py             # Header lookup helpers
└── processors/
    ├── base.py           # BaseJobsProcessor ABC
    └── inbound_emails.py # SES email processing (see dedicated doc)

Core concepts

BaseJobsProcessor

Abstract base class that every SQS processor must implement:

from abc import ABC, abstractmethod
from typing import Any


class BaseJobsProcessor(ABC):
    @staticmethod
    @abstractmethod
    def can_handle(payload: dict[str, Any]) -> bool: ...

    @staticmethod
    @abstractmethod
    def process(payload: dict[str, Any], **kwargs) -> None: ...

  • can_handle — inspects the raw SQS payload and returns True if this processor should handle it.
  • process — performs the actual work. Called inside an RQ job.
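
As a concrete (illustrative) example of what can_handle sees: when SNS delivers to SQS, the parsed message body is the SNS envelope, with the originating service's event nested as a JSON string in the Message field. All values below are placeholders:

# Illustrative SNS envelope as parsed from an SQS message body.
# ARNs, IDs, and the inner Message are placeholders, not real data.
payload = {
    "Type": "Notification",
    "TopicArn": "arn:aws:sns:us-east-1:123456789012:example-topic",
    "Message": '{"notificationType": "Received", "mail": {...}}',  # service-specific JSON string
    "Timestamp": "2024-01-01T00:00:00.000Z",
}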

Blueprint registration

Processors are registered when the sqs_blueprint is mounted on a Flask app. The jobs_processors dict maps processor classes to RQ queue names:

api.register_blueprint(
    sqs_blueprint,
    jobs_processors={
        SesEmailProcessor: "default",
        CustomProcessor: "high-priority",
    },
)

Order matters: the first processor whose can_handle returns True wins.
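
To illustrate the first-match rule, here is a minimal sketch of the dispatch step (the real logic lives in _dispatch in cli.py; the dispatch name and redis_conn argument are hypothetical). Python dicts preserve insertion order, which is what makes registration order significant:

from rq import Queue


def dispatch(payload, jobs_processors, redis_conn) -> bool:
    # The first processor whose can_handle() accepts the payload wins.
    for processor, queue_name in jobs_processors.items():
        if processor.can_handle(payload):
            Queue(queue_name, connection=redis_conn).enqueue(processor.process, payload)
            return True
    return False  # unmatched: the message is left on the SQS queue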

Worker loop

The flask sqs worker --queue <URL> command runs an infinite loop that:

  1. Long-polls SQS (MaxNumberOfMessages=10, WaitTimeSeconds=20)
  2. For each message, parses the JSON body and calls _dispatch()
  3. _dispatch iterates registered processors, finds a match, and enqueues processor.process(payload) into the mapped RQ queue
  4. Successfully dispatched messages are deleted from SQS in batch
  5. Failed messages are left in the queue (SQS visibility timeout handles retry)
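
A condensed sketch of that loop, assuming boto3 and the dispatch helper sketched above (connection setup, error logging, and CLI wiring are elided):

import json

import boto3
from redis import Redis

sqs = boto3.client("sqs")
redis_conn = Redis()  # placeholder connection


def run_worker(queue_url: str, jobs_processors: dict) -> None:
    while True:
        # Long-poll SQS for up to 10 messages at a time.
        resp = sqs.receive_message(
            QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=20
        )
        handled = []
        for msg in resp.get("Messages", []):
            payload = json.loads(msg["Body"])
            if dispatch(payload, jobs_processors, redis_conn):
                handled.append(
                    {"Id": msg["MessageId"], "ReceiptHandle": msg["ReceiptHandle"]}
                )
        # Batch-delete only the successfully dispatched messages;
        # the rest become visible again after the visibility timeout.
        if handled:
            sqs.delete_message_batch(QueueUrl=queue_url, Entries=handled)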

Adding a new processor

  1. Create a class extending BaseJobsProcessor:
from typing import Any

from shared.queuing.sqs.processors.base import BaseJobsProcessor


class MyProcessor(BaseJobsProcessor):
    @staticmethod
    def can_handle(payload: dict[str, Any]) -> bool:
        return payload.get("Type") == "MyCustomType"

    @staticmethod
    def process(payload: dict[str, Any], **kwargs) -> None:
        # Your processing logic here
        ...
  2. Register it in your app's blueprint setup:
api.register_blueprint(
    sqs_blueprint,
    jobs_processors={
        MyProcessor: "default",
    },
)
