Skip to content

Reference

shared.queuing.sqs.blueprint

configure_blueprint

configure_blueprint(state)

Register SQS processors with their target RQ queue names.

Order matters: first matching processor wins.

Source code in shared/queuing/sqs/blueprint.py
@sqs_blueprint.record
def configure_blueprint(state: BlueprintSetupState) -> None:
    """Register SQS processors with their target RQ queue names.

    Order matters: first matching processor wins.
    """
    import shared.queuing.sqs.cli  # noqa: F401, RUF100

    _PROCESSORS.update(state.options.get("jobs_processors", {}))

sqs_blueprint module-attribute

sqs_blueprint = CustomBlueprint('sqs', 'sqs')

shared.queuing.sqs.cli

start_worker

start_worker(queue)

Poll an SQS queue, triage messages to registered processors, and enqueue into RQ.

Source code in shared/queuing/sqs/cli.py
@sqs_blueprint.cli.command("worker")  # type: ignore[misc,no-untyped-call]
@system_command
@click.option(
    "--queue", type=str, required=True, help="The SQS queue we want to poll (HTTP URL)"
)
def start_worker(queue: str) -> None:
    """Poll an SQS queue, triage messages to registered processors, and enqueue into RQ."""

    client = boto3.client("sqs")

    while True:
        response = client.receive_message(
            QueueUrl=queue,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )
        if "Messages" in response:
            successfully_processed = []
            for message in response["Messages"]:
                payload = json.loads(message["Body"])
                try:
                    _dispatch(payload, _PROCESSORS)
                    successfully_processed.append(message)
                except Exception:
                    current_logger.exception(
                        f"Failed to process message {message['MessageId']}",
                        message_id=message["MessageId"],
                    )

            if successfully_processed:
                client.delete_message_batch(
                    QueueUrl=queue,
                    Entries=[
                        {
                            "Id": message["MessageId"],
                            "ReceiptHandle": message["ReceiptHandle"],
                        }
                        for message in successfully_processed
                    ],
                )
                current_logger.info(
                    f"Successfully processed {len(successfully_processed)} messages"
                )

shared.queuing.sqs.processors

base

BaseJobsProcessor

Bases: ABC

can_handle abstractmethod staticmethod
can_handle(payload)
Source code in shared/queuing/sqs/processors/base.py
6
7
8
@staticmethod
@abstractmethod
def can_handle(payload: dict[str, Any]) -> bool: ...
process abstractmethod staticmethod
process(payload, **kwargs)
Source code in shared/queuing/sqs/processors/base.py
@staticmethod
@abstractmethod
def process(payload: dict[str, Any], **kwargs) -> None: ...  # type: ignore[no-untyped-def]

inbound_emails

FLASK_INBOUND_EMAIL_PROCESSORS_REGISTRY module-attribute

FLASK_INBOUND_EMAIL_PROCESSORS_REGISTRY = (
    "inbound-email-processors-registry"
)

InboundEmailProcessor

Bases: ABC

process abstractmethod staticmethod
process(sender, subject, body, html, attachments, headers)
Source code in shared/queuing/sqs/processors/inbound_emails.py
@staticmethod
@abstractmethod
def process(  # type: ignore[no-untyped-def]
    sender: str,
    subject: str,
    body: str,
    html: str,
    attachments: list[BytesIO],
    headers: list[dict[str, str]],
):
    pass

InboundEmailProcessorsRegistry

InboundEmailProcessorsRegistry(app)
Source code in shared/queuing/sqs/processors/inbound_emails.py
def __init__(self, app: Flask | None) -> None:
    self._processors = dict()
    if app is not None:
        self.init_app(app)
configure_cli
configure_cli(app)
Source code in shared/queuing/sqs/processors/inbound_emails.py
def configure_cli(self, app) -> None:  # type: ignore[no-untyped-def]
    email_cli = AppGroup("email")

    @email_cli.command("reprocess")
    @click.argument("object_key", type=str, nargs=1)
    @click.option("--recipient", type=str, required=True)
    @click.option("--bucket", type=str, required=True)
    def reprocess(
        recipient: str,
        bucket: str,
        object_key: str,
    ) -> None:
        self._reprocess_email(recipient, bucket, object_key)

    app.cli.add_command(email_cli)
get_first_processor_for
get_first_processor_for(recipients)
Source code in shared/queuing/sqs/processors/inbound_emails.py
def get_first_processor_for(
    self, recipients: list[str]
) -> InboundEmailProcessor | None:
    for recipient in recipients:
        if recipient in self._processors:
            return self._processors[recipient]
    return None
init_app
init_app(app)
Source code in shared/queuing/sqs/processors/inbound_emails.py
def init_app(self, app: Flask) -> None:
    app.extensions[FLASK_INBOUND_EMAIL_PROCESSORS_REGISTRY] = self
    if is_worker_environment() or is_development_mode():
        self.configure_cli(app)
register_email_processor
register_email_processor(recipient, processor)
Source code in shared/queuing/sqs/processors/inbound_emails.py
def register_email_processor(
    self,
    recipient: str,
    processor: InboundEmailProcessor,
) -> None:
    if recipient not in self._processors:
        self._processors[recipient] = processor
    else:
        current_logger.warning(
            f"Inbound email processor already declared for {recipient}"
        )
    return

PASS module-attribute

PASS = 'pass'

SesEmailProcessor

Bases: BaseJobsProcessor

can_handle staticmethod
can_handle(payload)
Source code in shared/queuing/sqs/processors/inbound_emails.py
@staticmethod
def can_handle(payload: dict[str, Any]) -> bool:
    return (
        payload.get("Type") == SQS_TYPE_NOTIFICATION
        and payload.get("Subject") == SQS_SUBJECT_SES_EMAIL_NOTIFICATION
    )
process staticmethod
process(payload, **kwargs)
Source code in shared/queuing/sqs/processors/inbound_emails.py
@staticmethod
def process(payload: dict[str, Any], **kwargs) -> None:  # type: ignore[no-untyped-def]  # noqa: ARG004
    process_ses_email(json.loads(payload["Message"]))

process_ses_email

process_ses_email(message)
Source code in shared/queuing/sqs/processors/inbound_emails.py
def process_ses_email(message: dict[str, Any]) -> None:
    current_logger.info("Processing SES email")

    receipt = message["receipt"]
    recipients = receipt["recipients"]

    action = receipt["action"]
    bucket = action["bucketName"]
    object_key = action["objectKey"]

    mail = message["mail"]
    headers = mail["headers"]
    sender = get_header_value(headers, "X-Original-Sender") or mail["source"]

    subject = mail["commonHeaders"].get("subject", "")

    logger = current_logger.bind(
        sender=sender,
        bucket=bucket,
        object_key=object_key,
        recipients=recipients,
    )

    if not (
        _validate_email_authenticity(receipt, logger)
        and _validate_email_safety(receipt, logger)
    ):
        return

    _validate_email_credibility(receipt, logger)

    registry = current_app.extensions.get(FLASK_INBOUND_EMAIL_PROCESSORS_REGISTRY)
    if registry is None:
        logger.error("No inbound email processors registry configured, skipping")
        return

    processor = registry.get_first_processor_for(recipients)

    if processor is None:
        logger.info("No processor found for inbound email, skipping")
        return

    try:
        email_message = _get_email_message(bucket, object_key)
        attachments = _get_attachments(email_message)

        plain_body = cast("EmailMessage", email_message.get_body("plain"))
        html_body = cast("EmailMessage", email_message.get_body("html"))

        logger.info("Processing inbound email")
        processor.process(
            sender=sender,
            subject=subject,
            body=plain_body.get_content() if plain_body else None,
            html=html_body.get_content() if html_body else None,
            attachments=attachments,
            headers=headers,
        )
    except Exception:
        logger.exception("Failed to process inbound email")

shared.queuing.sqs.utils

get_header_value

get_header_value(headers, name)
Source code in shared/queuing/sqs/utils.py
7
8
9
def get_header_value(headers: list[dict[str, str]], name: str) -> str | None:
    values = get_header_values(headers, name)
    return values[0] if values else None

get_header_values

get_header_values(headers, name)
Source code in shared/queuing/sqs/utils.py
1
2
3
4
def get_header_values(headers: list[dict[str, str]], name: str) -> list[str]:
    return [
        header["value"] for header in headers if header["name"].lower() == name.lower()
    ]