Skip to content

Api reference

components.emailing.public.app_group

emailing module-attribute

emailing = AppGroup('emailing')

components.emailing.public.blueprint

EmailingBlueprint

EmailingBlueprint(*args, **kwargs)

Bases: Blueprint

Source code in components/emailing/public/blueprint.py
def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
    super().__init__(*args, **kwargs)
    self.templates: dict[str, EmailTemplate] = {}  # type: ignore[type-arg]
    self.segments: dict[str, Segment] = {}
    self.recurring_campaigns: dict[str, RecurringCampaign] = {}
    self.non_recurring_campaigns: dict[str, Campaign] = {}
    self.webhooks: dict[str, list] = defaultdict(list)  # type: ignore[type-arg]

get_campaign

get_campaign(campaign_name)
Source code in components/emailing/public/blueprint.py
def get_campaign(self, campaign_name: str) -> Campaign | None:  # noqa: D102
    if campaign_name in self.recurring_campaigns:
        return self.recurring_campaigns[campaign_name]
    if campaign_name in self.non_recurring_campaigns:
        return self.non_recurring_campaigns[campaign_name]
    return None

get_non_recurring_campaign

get_non_recurring_campaign(campaign_name)
Source code in components/emailing/public/blueprint.py
def get_non_recurring_campaign(self, campaign_name: str) -> Campaign:  # noqa: D102
    if campaign_name not in self.non_recurring_campaigns:
        raise ValueError(f"Non recurring campaign {campaign_name} not found")
    return self.non_recurring_campaigns[campaign_name]

get_recurring_campaign

get_recurring_campaign(campaign_name)
Source code in components/emailing/public/blueprint.py
def get_recurring_campaign(self, campaign_name: str) -> RecurringCampaign:  # noqa: D102
    if campaign_name not in self.recurring_campaigns:
        raise ValueError(f"Recurring campaign {campaign_name} not found")
    return self.recurring_campaigns[campaign_name]

get_segment

get_segment(segment_name)
Source code in components/emailing/public/blueprint.py
def get_segment(self, segment_name: str) -> Segment:  # noqa: D102
    if segment_name not in self.segments:
        raise ValueError(f"Segment {segment_name} not found")
    return self.segments[segment_name]

get_segments

get_segments()
Source code in components/emailing/public/blueprint.py
def get_segments(self) -> list[Segment]:  # noqa: D102
    return list(self.segments.values())

get_template

get_template(template_name)
Source code in components/emailing/public/blueprint.py
def get_template(self, template_name: str) -> EmailTemplate:  # type: ignore[type-arg]  # noqa: D102
    if template_name not in self.templates:
        raise ValueError(f"Template {template_name} not found")
    return self.templates[template_name]

get_templates

get_templates()
Source code in components/emailing/public/blueprint.py
def get_templates(self) -> list[EmailTemplate]:  # type: ignore[type-arg]  # noqa: D102
    return list(self.templates.values())

get_webhooks

get_webhooks(campaign_name)
Source code in components/emailing/public/blueprint.py
def get_webhooks(  # noqa: D102
    self, campaign_name: str
) -> list[Callable[[EmailLog, EventType, datetime], None]]:
    return self.webhooks[campaign_name]

list_campaigns

list_campaigns()
Source code in components/emailing/public/blueprint.py
def list_campaigns(self) -> list[Campaign | RecurringCampaign]:  # noqa: D102
    return list(self.list_non_recurring_campaigns()) + list(
        self.list_recurring_campaigns()
    )

list_non_recurring_campaigns

list_non_recurring_campaigns()
Source code in components/emailing/public/blueprint.py
def list_non_recurring_campaigns(self) -> list[Campaign]:  # noqa: D102
    return list(self.non_recurring_campaigns.values())

list_recurring_campaigns

list_recurring_campaigns()
Source code in components/emailing/public/blueprint.py
def list_recurring_campaigns(self) -> list[RecurringCampaign]:  # noqa: D102
    return list(self.recurring_campaigns.values())

non_recurring_campaigns instance-attribute

non_recurring_campaigns = {}

recurring_campaigns instance-attribute

recurring_campaigns = {}

register_non_recurring_campaign

register_non_recurring_campaign(non_recurring_campaign)
Source code in components/emailing/public/blueprint.py
def register_non_recurring_campaign(  # noqa: D102
    self, non_recurring_campaign: Campaign
) -> None:
    if non_recurring_campaign.campaign_name in self.non_recurring_campaigns:
        current_logger.warning(
            f"Non recurring campaign {non_recurring_campaign.campaign_name} already registered"
        )
    self.non_recurring_campaigns[non_recurring_campaign.campaign_name] = (
        non_recurring_campaign
    )
    register_event(
        EmailShootEvent.shoot_completed,
        non_recurring_campaign.on_shoot_completed,
    )
    register_event(
        EmailShootEvent.shoot_scheduled,
        non_recurring_campaign.on_shoot_scheduled,
    )
    register_event(
        EmailShootEvent.shoot_skipped,
        non_recurring_campaign.on_shoot_skipped,
    )

register_recurring_campaign

register_recurring_campaign(recurring_campaign)
Source code in components/emailing/public/blueprint.py
def register_recurring_campaign(  # noqa: D102
    self, recurring_campaign: RecurringCampaign
) -> None:
    if recurring_campaign.campaign_name in self.recurring_campaigns:
        current_logger.warning(
            f"Recurring campaign {recurring_campaign.campaign_name} already registered"
        )
    self.recurring_campaigns[recurring_campaign.campaign_name] = recurring_campaign
    register_event(
        EmailShootEvent.shoot_completed,
        recurring_campaign.on_shoot_completed,
    )
    register_event(
        EmailShootEvent.shoot_scheduled,
        recurring_campaign.on_shoot_scheduled,
    )
    register_event(
        EmailShootEvent.shoot_skipped,
        recurring_campaign.on_shoot_skipped,
    )

register_segment

register_segment(segment)
Source code in components/emailing/public/blueprint.py
def register_segment(self, segment: Segment) -> None:  # noqa: D102
    self.segments[segment.name] = segment

register_template

register_template(template)
Source code in components/emailing/public/blueprint.py
def register_template(self, template: EmailTemplate) -> None:  # type: ignore[type-arg]  # noqa: D102
    self.templates[template.name] = template

register_webhook

register_webhook(campaign_name, handler)

Register a webhook handler for a given campaign name and event type. The handler will be called with the following arguments: - email_log_id: the id of the email log - event_type: the event type - timestamp: the timestamp of the event

Source code in components/emailing/public/blueprint.py
def register_webhook(
    self,
    campaign_name: str,
    handler: Callable[[EmailLog, EventType, datetime], None],
) -> None:
    """
    Register a webhook handler for a given campaign name and event type.
    The handler will be called with the following arguments:
    - email_log_id: the id of the email log
    - event_type: the event type
    - timestamp: the timestamp of the event
    """
    self.webhooks[campaign_name].append(handler)

segments instance-attribute

segments = {}

templates instance-attribute

templates = {}

unregister_all_campaigns

unregister_all_campaigns()
Source code in components/emailing/public/blueprint.py
def unregister_all_campaigns(self) -> None:  # noqa: D102
    self.recurring_campaigns = {}
    self.non_recurring_campaigns = {}

unregister_recurring_campaign

unregister_recurring_campaign(campaign_name)
Source code in components/emailing/public/blueprint.py
def unregister_recurring_campaign(self, campaign_name: str) -> None:  # noqa: D102
    if campaign_name not in self.recurring_campaigns:
        raise ValueError(f"Recurring campaign {campaign_name} not found")
    campaign = self.recurring_campaigns[campaign_name]
    try:
        unregister_event(
            EmailShootEvent.shoot_completed, campaign.on_shoot_completed
        )
        unregister_event(
            EmailShootEvent.shoot_scheduled, campaign.on_shoot_scheduled
        )
        unregister_event(EmailShootEvent.shoot_skipped, campaign.on_shoot_skipped)
    except Exception:
        current_logger.warning(
            f"Failed to unregister events for recurring campaign {campaign_name}"
        )
    del self.recurring_campaigns[campaign_name]

webhooks instance-attribute

webhooks = defaultdict(list)

customerio_api_blueprint module-attribute

customerio_api_blueprint = Blueprint(
    "api/customerio", __name__
)

customerio_api_blueprint_record_once

customerio_api_blueprint_record_once(_)
Source code in components/emailing/public/blueprint.py
@customerio_api_blueprint.record_once
def customerio_api_blueprint_record_once(_: BlueprintSetupState) -> None:  # noqa: D103
    from components.emailing.internal.controllers.webhook import (
        customer_io_webhooks_endpoint,
    )

    customerio_api = CustomApi(customerio_api_blueprint)
    customerio_api.add_endpoint(customer_io_webhooks_endpoint)

emailing_api_blueprint module-attribute

emailing_api_blueprint = EmailingBlueprint(
    "api/emailing", __name__
)

emailing_api_blueprint_record_once

emailing_api_blueprint_record_once(_)
Source code in components/emailing/public/blueprint.py
@emailing_api_blueprint.record_once
def emailing_api_blueprint_record_once(_: BlueprintSetupState) -> None:  # noqa: D103
    from components.emailing.internal.controllers.cancel_shoot import (  # noqa: F401
        cancel_shoot_view,
    )
    from components.emailing.internal.controllers.get_campaigns import (
        emailing_campaign_endpoint,
    )
    from components.emailing.internal.controllers.get_email_logs import (  # noqa: F401
        emailing_email_log_endpoint,
        get_email_logs_view,
    )
    from components.emailing.internal.controllers.get_planning import (
        emailing_planning_endpoint,
    )
    from components.emailing.internal.controllers.get_shoot import (  # noqa: F401
        get_shoot_view,
    )
    from components.emailing.internal.controllers.patch_shoot import (  # noqa: F401
        patch_shoot_view,
    )
    from components.emailing.internal.controllers.patch_shoot_config import (
        emailing_shoot_config_endpoint,
    )
    from components.emailing.internal.controllers.preview_audience import (
        emailing_audience_endpoint,
    )
    from components.emailing.internal.controllers.preview_email import (
        emailing_preview_endpoint,
    )
    from components.emailing.internal.controllers.schedule_shoot import (  # noqa: F401
        schedule_shoot_view,
    )
    from components.emailing.internal.controllers.send_shoot import (
        emailing_shoot_endpoint,
    )
    from components.emailing.internal.controllers.upsert_shoot_custom_block import (
        emailing_shoot_custom_block_endpoint,
    )
    from components.emailing.internal.events.on_email_shoot_completed import (
        on_email_shoot_completed,
    )
    from components.emailing.public.events import (
        register_event as register_email_event,
    )

    emailing_api = CustomApi(emailing_api_blueprint)

    emailing_api.add_endpoint(emailing_campaign_endpoint)
    emailing_api.add_endpoint(emailing_email_log_endpoint)
    emailing_api.add_endpoint(emailing_planning_endpoint)
    emailing_api.add_endpoint(emailing_audience_endpoint)
    emailing_api.add_endpoint(emailing_preview_endpoint)
    emailing_api.add_endpoint(emailing_shoot_endpoint)
    emailing_api.add_endpoint(emailing_shoot_custom_block_endpoint)
    emailing_api.add_endpoint(emailing_shoot_config_endpoint)

    register_email_event(
        EmailShootEvent.shoot_completed,
        on_email_shoot_completed,
    )

record_once

record_once(_)
Source code in components/emailing/public/blueprint.py
@emailing_api_blueprint.record_once
def record_once(_: BlueprintSetupState) -> None:  # noqa: D103
    from components.emailing.external.api.events import (
        register_event,
    )
    from components.emailing.internal.webhook import dispatch_webhook

    register_event(EmailLogEvent.email_log_updated.value, dispatch_webhook)
    register_event(EmailLogEvent.email_log_created.value, dispatch_webhook)

components.emailing.public.campaign

get_campaigns

get_campaigns(
    sent_before=None, sent_after=None, with_shoot_only=False
)

Return list of - non-recurring campaigns that have been sent at least once between provided dates - recurring campaign that haven been registered

Uses the pre-aggregated TuringEmailStats table which contains daily stats for each campaign. The stats are already aggregated per day, per campaign, and per app.

Return a list of Campaign objects.

Source code in components/emailing/public/campaign.py
@tracer.wrap(service="emailing")
def get_campaigns(
    sent_before: datetime | None = None,
    sent_after: datetime | None = None,
    with_shoot_only: bool = False,
) -> list[Campaign]:
    """
    Return list of
     - non-recurring campaigns that have been sent at least once between provided dates
     - recurring campaign that haven been registered

    Uses the pre-aggregated TuringEmailStats table which contains daily stats for each campaign.
    The stats are already aggregated per day, per campaign, and per app.

    Return a list of Campaign objects.
    """
    registered_recurring_campaigns = list_recurring_campaigns()
    recurring_campaign_by_name = key_by(
        registered_recurring_campaigns, key_fn=lambda x: x.campaign_name
    )
    recurring_campaign_names = {c.campaign_name for c in registered_recurring_campaigns}

    app_name = get_current_app_name()

    # Since TuringEmailStats is already aggregated per day with distinct recipient_types,
    # we just need to sum sent_count and combine recipient_types arrays
    query = current_session.query(  # noqa: ALN085
        TuringEmailStats.campaign_name,
        func.sum(TuringEmailStats.sent_count).label("total_sent"),
    ).filter(TuringEmailStats.campaign_name.isnot(None))

    if app_name:
        query = query.filter(TuringEmailStats.app_name == app_name)

    if sent_before:
        query = query.filter(TuringEmailStats.on_date <= sent_before)
    if sent_after:
        query = query.filter(TuringEmailStats.on_date >= sent_after)

    query = query.group_by(TuringEmailStats.campaign_name)

    if with_shoot_only:
        from components.emailing.internal.models.email_shoot import EmailShootV2

        query = query.filter(
            TuringEmailStats.campaign_name.in_(
                current_session.query(EmailShootV2.campaign_name)  # noqa: ALN085
                .filter(EmailShootV2.campaign_name.isnot(None))
                .distinct()
            )
        )

    already_sent_campaigns = [
        Campaign(
            name=name,
            recipient_types=(
                [recurring_campaign_by_name[name].segment_cls.recipient_type]
                if name in recurring_campaign_names
                else []
            ),
            email_count=email_count or 0,
            is_recurring=name in recurring_campaign_names,
        )
        for name, email_count in query
    ]

    campaigns = [
        Campaign(
            name=c.campaign_name,
            recipient_types=[c.segment_cls.recipient_type],
            email_count=0,
            is_recurring=c.is_recurring,
        )
        for c in list_campaigns()
        if c.campaign_name not in [c.name for c in already_sent_campaigns]
    ]

    return already_sent_campaigns + [
        c
        for c in campaigns
        if (not with_shoot_only or c.name in recurring_campaign_names)
    ]

components.emailing.public.dependencies

COMPONENT_NAME module-attribute

COMPONENT_NAME = 'emailing'

EmailingDependency

Bases: ABC

Dependencies needed for the emailing component.

does_user_id_exist abstractmethod

does_user_id_exist(user_id)

Implement does_user_id_exist

Source code in components/emailing/public/dependencies.py
@abstractmethod
def does_user_id_exist(self, user_id: str) -> bool:
    """Implement does_user_id_exist"""

get_user_id_from_email abstractmethod

get_user_id_from_email(email)

Implement get_user_id_from_email

Source code in components/emailing/public/dependencies.py
@abstractmethod
def get_user_id_from_email(self, email: str) -> Optional[uuid.UUID | int]:
    """Implement get_user_id_from_email"""

unsubscribe_email_without_token abstractmethod

unsubscribe_email_without_token(email)

Implement unsubscribe_email_without_token

Source code in components/emailing/public/dependencies.py
@abstractmethod
def unsubscribe_email_without_token(self, email: str) -> None:
    """Implement unsubscribe_email_without_token"""

get_app_dependency

get_app_dependency()

Gets the dependency for the emailing component.

Source code in components/emailing/public/dependencies.py
def get_app_dependency() -> EmailingDependency:
    """Gets the dependency for the emailing component."""
    from flask import current_app

    return cast("CustomFlask", current_app).get_component_dependency(COMPONENT_NAME)  # type: ignore[no-any-return]

set_app_dependency

set_app_dependency(dependency)

Sets the dependency for the emailing component.

Source code in components/emailing/public/dependencies.py
def set_app_dependency(dependency: EmailingDependency) -> None:
    """Sets the dependency for the emailing component."""
    from flask import current_app

    cast("CustomFlask", current_app).add_component_dependency(
        COMPONENT_NAME, dependency
    )

components.emailing.public.email

Campaign dataclass

Campaign(
    name, recipient_types, email_count, is_recurring=False
)

email_count instance-attribute

email_count

is_recurring class-attribute instance-attribute

is_recurring = False

name instance-attribute

name

recipient_types instance-attribute

recipient_types

EmailLogMetrics dataclass

EmailLogMetrics(
    pending_count,
    sent_count,
    delivered_count,
    failed_count,
    bounced_count,
    dropped_count,
    skipped_count,
    total_count,
    opened_count,
    clicked_count,
)

bounced_count instance-attribute

bounced_count

clicked_count instance-attribute

clicked_count

delivered_count instance-attribute

delivered_count

dropped_count instance-attribute

dropped_count

failed_count instance-attribute

failed_count

opened_count instance-attribute

opened_count

pending_count instance-attribute

pending_count

sent_count instance-attribute

sent_count

skipped_count instance-attribute

skipped_count

total_count instance-attribute

total_count

batch_send_email

batch_send_email(
    recipients,
    template_name,
    template_settings,
    enqueue_as_draft=False,
    transactional_message_id=None,
    message_frequency=None,
    campaign_name=None,
    dry_run=False,
    defer_send=True,
    message_metadata=None,
    account_ref=None,
    sender_email_address=None,
    email_priority=EmailPriority.low,
    log_params=None,
)

Send email messages to a list of audience members using a registered email template. See send_email for more details.

Source code in components/emailing/public/email.py
@tracer.wrap(service="emailing")
def batch_send_email(
    recipients: list[EmailRecipient],
    template_name: str,
    template_settings: dict,  # type: ignore[type-arg]
    enqueue_as_draft: bool = False,
    transactional_message_id: str | None = None,
    message_frequency: MessageFrequency | None = None,
    campaign_name: str | None = None,
    dry_run: bool = False,
    defer_send: bool = True,
    message_metadata: dict | None = None,  # type: ignore[type-arg]
    account_ref: UUID | None = None,
    sender_email_address: str | None = None,
    email_priority: EmailPriority = EmailPriority.low,
    log_params: dict | None = None,  # type: ignore[type-arg]
) -> None:
    """
    Send email messages to a list of audience members using a registered email template. See send_email for more
    details.
    """
    for audience_member in recipients:
        send_email(
            recipient=audience_member,
            template_name=template_name,
            template_settings=template_settings,
            transactional_message_id=transactional_message_id,
            campaign_name=campaign_name,
            dry_run=dry_run,
            enqueue_as_draft=enqueue_as_draft,
            message_frequency=message_frequency,
            async_send=True,
            defer_send=defer_send,
            message_metadata=message_metadata,
            account_ref=account_ref,
            sender_email_address=sender_email_address,
            log_params=log_params,
            email_priority=email_priority,
        )
    current_logger.info(
        f"Email {template_name} sending triggered for {len(recipients)} recipients",
        template_name=template_name,
        audience_size=len(recipients),
        dry_run=dry_run,
        async_send=True,
        message_frequency=message_frequency,
        message_metadata=message_metadata,
        campaign_name=campaign_name,
        **(log_params or {}),
    )

get_email_log_metrics

get_email_log_metrics(
    is_dry_run=None,
    account_refs=None,
    sent_after=None,
    sent_before=None,
    campaign_names=None,
    message_metadata=None,
)
Source code in components/emailing/public/email.py
@tracer.wrap(service="emailing")
def get_email_log_metrics(  # noqa: D103
    is_dry_run: bool | None = None,
    account_refs: list[UUID] | None = None,
    sent_after: datetime | None = None,
    sent_before: datetime | None = None,
    campaign_names: list[str] | None = None,
    message_metadata: dict | None = None,  # type: ignore[type-arg]
) -> EmailLogMetrics:
    from components.emailing.internal.helpers import with_contains_params_query

    EmailLogSQLA = get_email_log_class()

    query = current_session.query(  # noqa: ALN085
        func.count(EmailLogSQLA.id).filter(EmailLogSQLA.state == EmailLogState.pending),
        func.count(EmailLogSQLA.id).filter(EmailLogSQLA.state == EmailLogState.sent),
        func.count(EmailLogSQLA.id).filter(
            EmailLogSQLA.state == EmailLogState.delivered
        ),
        func.count(EmailLogSQLA.id).filter(EmailLogSQLA.state == EmailLogState.skipped),
        func.count(EmailLogSQLA.opened_at),
        func.count(EmailLogSQLA.clicks),  # Count clicks
        func.count(EmailLogSQLA.id).filter(EmailLogSQLA.state == EmailLogState.bounced),
        func.count(EmailLogSQLA.id).filter(EmailLogSQLA.state == EmailLogState.failed),
        func.count(EmailLogSQLA.id).filter(EmailLogSQLA.state == EmailLogState.dropped),
        # All states
        func.count(EmailLogSQLA.id),
    )

    if sent_before:
        query = query.filter(EmailLogSQLA.created_at <= sent_before)
    if sent_after:
        query = query.filter(EmailLogSQLA.created_at >= sent_after)
    if campaign_names:
        query = query.filter(EmailLogSQLA.campaign_name.in_(campaign_names))
    if account_refs:
        query = query.filter(EmailLogSQLA.account_ref.in_(account_refs))
    if is_dry_run is not None:
        query = query.filter(EmailLogSQLA.dry_run == is_dry_run)

    if message_metadata:
        query = with_contains_params_query(
            column=EmailLogSQLA.params["message_metadata"],
            query=query,
            params=message_metadata,
        )

    (
        pending_count,
        sent_count,
        delivered_count,
        skipped_count,
        opened_count,
        clicked_count,
        bounced_count,
        failed_count,
        dropped_count,
        total_count,
    ) = query.one()

    return EmailLogMetrics(
        pending_count=pending_count,
        sent_count=sent_count,
        delivered_count=delivered_count,
        failed_count=failed_count,
        bounced_count=bounced_count,
        dropped_count=dropped_count,
        skipped_count=skipped_count,
        total_count=total_count,
        opened_count=opened_count,
        clicked_count=clicked_count,
    )

paginate_email_logs

paginate_email_logs(
    page,
    per_page=500,
    email_logs_ids=None,
    user_ids=None,
    account_refs=None,
    email_addresses=None,
    template_names=None,
    campaign_names=None,
    is_dry_run=False,
    delivery_ids=None,
    has_been_sent=None,
    has_been_delivered=None,
    has_been_opened=None,
    has_been_clicked=None,
    has_failed=None,
    state=None,
    message_metadata=None,
    sent_before=None,
    sent_after=None,
)

Returns a paginated list of email logs.

:param page: The page number to return. Starts at 1. :param per_page: The number of items per page. :param email_logs_ids: Filter by email log ids. :param account_refs: Filter by account refs. :param user_ids: Filter by recipient user ids. :param is_dry_run: Keep only dry run email logs if True. :param email_addresses: Filter by recipient email addresses. :param template_names: Filter by template names ⚠️ using this args can be slow as it's not indexed for now :param campaign_names: Filter by campaign names ⚠️ using this args can be slow as it's not indexed for now :param state : Filter by email log state. :param delivery_ids: Filter by delivery ids. :param has_been_sent: Keep only sent email logs if True. :param has_been_delivered: Keep only delivered email logs if True. :param has_been_opened: Keep only opened email logs if True. :param has_been_clicked: Keep only clicked email logs if True. :param has_failed: Keep only failed email logs if True. :param sent_before: Filter by sent date before. :param sent_after: Filter by sent date after. :param message_metadata: Filter by message metadata.

Source code in components/emailing/public/email.py
@tracer.wrap(service="emailing")
def paginate_email_logs(
    page: int,
    per_page: int = 500,
    email_logs_ids: list[str] | None = None,
    user_ids: list[str] | None = None,
    account_refs: list[UUID] | None = None,
    email_addresses: list[str] | None = None,
    template_names: list[str] | None = None,
    campaign_names: list[str] | None = None,
    is_dry_run: bool | None = False,
    delivery_ids: list[str] | None = None,
    has_been_sent: bool | None = None,
    has_been_delivered: bool | None = None,
    has_been_opened: bool | None = None,
    has_been_clicked: bool | None = None,
    has_failed: bool | None = None,
    state: EmailLogState | None = None,
    message_metadata: dict | None = None,  # type: ignore[type-arg]
    sent_before: datetime | None = None,
    sent_after: datetime | None = None,
) -> Paginate[EmailLog]:
    """
    Returns a paginated list of email logs.

    :param page: The page number to return. Starts at 1.
    :param per_page: The number of items per page.
    :param email_logs_ids: Filter by email log ids.
    :param account_refs: Filter by account refs.
    :param user_ids: Filter by recipient user ids.
    :param is_dry_run: Keep only dry run email logs if True.
    :param email_addresses: Filter by recipient email addresses.
    :param template_names: Filter by template names ⚠️ using this args can be slow as it's not indexed for now
    :param campaign_names: Filter by campaign names ⚠️ using this args can be slow as it's not indexed for now
    :param state : Filter by email log state.
    :param delivery_ids: Filter by delivery ids.
    :param has_been_sent: Keep only sent email logs if True.
    :param has_been_delivered: Keep only delivered email logs if True.
    :param has_been_opened: Keep only opened email logs if True.
    :param has_been_clicked: Keep only clicked email logs if True.
    :param has_failed: Keep only failed email logs if True.
    :param sent_before: Filter by sent date before.
    :param sent_after: Filter by sent date after.
    :param message_metadata: Filter by message metadata.
    """
    return external_log.paginate_email_logs(
        page=page,
        per_page=per_page,
        email_logs_ids=email_logs_ids,
        account_refs=account_refs,
        user_ids=user_ids,
        email_addresses=email_addresses,
        template_names=template_names,
        campaign_names=campaign_names,
        delivery_ids=delivery_ids,
        is_dry_run=is_dry_run,
        has_been_sent=has_been_sent,
        has_been_delivered=has_been_delivered,
        has_been_opened=has_been_opened,
        has_been_clicked=has_been_clicked,
        has_failed=has_failed,
        message_metadata=message_metadata,
        sent_before=sent_before,
        sent_after=sent_after,
        state=state,
    )

preview_email

preview_email(template_name, recipient, template_settings)

Preview an email using a registered email template.

Source code in components/emailing/public/email.py
@tracer.wrap(service="emailing")
def preview_email(
    template_name: str,
    recipient: EmailRecipient,
    template_settings: dict,  # type: ignore[type-arg]
) -> EmailContent:
    """
    Preview an email using a registered email template.
    """
    template: EmailTemplate = get_template(template_name)  # type: ignore[type-arg]
    email_content = template.generate_email_content(
        recipient, template_settings, preview=True
    )
    current_logger.info(
        f"Previewed email {template_name}",
        user_context=recipient.user_context,
        template_settings=template_settings,
    )
    return email_content

send_email

send_email(
    recipient,
    template_name=None,
    template_settings=None,
    campaign_name=None,
    transactional_message_id=None,
    message_metadata=None,
    message_frequency=None,
    account_ref=None,
    sender_email_address=None,
    enqueue_as_draft=False,
    async_send=False,
    defer_send=False,
    silent_failure=False,
    dry_run=False,
    log_params=None,
    email_priority=EmailPriority.low,
)

Send email to a recipient address using a registered email template.

:param recipient: The recipient of the email defined by its user_id (required for customerio), email address and user context (provided to the template to customise the email content).

:param template_name: Template name of the template to be retrieved from the template registry and use to render the email. It should not be used with template_uri.

:param template_settings: Settings that will be used to customise template rendering. Those settings will be provided to the template got from the template registry. Ignored if template_uri & template data are provided.

:param transactional_message_id: optional transactional message id that will be used to send the email through the email provider. If not provided, the default transactional message id will be used.

:param campaign_name: identifies a group of messages together. For instance, it's used to register webhooks that will be called when events are received for this campaign.

:param message_metadata: optional metadata that will be persisted in the email log

:param dry_run: if True, the email will not be sent and email log won't be created. If async_send is True, the email will be enqueued anyway.

:param enqueue_as_draft: if True, the email will be enqueued as draft in the CRM and won't be sent. The email log will be created anyway, flag as sent

:param account_ref: optional account reference to log this email in the context of an account.

:param sender_email_address: optional sender email address that will be used to send the email. Defaults to config["EMAIL_SENDER_NAME"] if None.

:param message_frequency: optional frequency limit that will be used to manage the number of emails sent to a user.

:param async_send: If True, the email will be enqueued in the queue and sent asynchronously. Queues depend on the email_priority. If False, the email will be sent synchronously.

:param defer_send: Defer the send email using side effect stack. This requires to be executed through a side_effect context manager that manage the side effect. It's True by default to force consumer to explicitly handle the side effect.

:param silent_failure: If True, any exception will be caught and logged (email log success=False is created). If False the exception will be raised.

:param log_params: Additional parameters to log (current_logger)

:param email_priority: The priority of the email. It can be high or low. High priority emails are sent in priority

This method acquires a lock on the recipient to prevent multiple emails being sent to the same recipient at the same time. Thus preventing race conditions that could lead to sending more emails than the message frequency allows.

:return: The delivery id of the email if it was sent synchronously, None otherwise.

Source code in components/emailing/public/email.py
@tracer.wrap(service="emailing")
def send_email(
    recipient: EmailRecipient,
    template_name: str | None = None,
    template_settings: dict | None = None,  # type: ignore[type-arg]
    campaign_name: str | None = None,
    transactional_message_id: str | None = None,
    message_metadata: dict | None = None,  # type: ignore[type-arg]
    message_frequency: MessageFrequency | None = None,
    account_ref: UUID | None = None,
    sender_email_address: str | None = None,
    enqueue_as_draft: bool = False,
    async_send: bool = False,
    defer_send: bool = False,
    silent_failure: bool = False,
    dry_run: bool = False,
    log_params: dict | None = None,  # type: ignore[type-arg]
    email_priority: EmailPriority = EmailPriority.low,
) -> None:
    """
    Send email to a recipient address using a registered email template.

    :param recipient: The recipient of the email defined by its user_id (required for customerio),
        email address and user context (provided to the template to customise the email content).

    :param template_name: Template name of the template to be retrieved from the template registry
        and use to render the email. It should not be used with template_uri.

    :param template_settings: Settings that will be used to customise template rendering.
        Those settings will be provided to the template got from the template registry.
        Ignored if template_uri & template data are provided.

    :param transactional_message_id: optional transactional message id that will be used to send the email
        through the email provider. If not provided, the default transactional message id will be used.

    :param campaign_name: identifies a group of messages together. For instance, it's used to register webhooks
        that will be called when events are received for this campaign.

    :param message_metadata: optional metadata that will be persisted in the email log

    :param dry_run: if True, the email will not be sent and email log won't be created.
        If async_send is True, the email will be enqueued anyway.

    :param enqueue_as_draft: if True, the email will be enqueued as draft in the CRM and won't be sent.
        The email log will be created anyway, flag as sent

    :param account_ref: optional account reference to log this email in the context of an account.

    :param sender_email_address: optional sender email address that will be used to send the email.
        Defaults to config["EMAIL_SENDER_NAME"] if None.

    :param message_frequency: optional frequency limit that will be used to manage the number of emails sent to a user.

    :param async_send: If True, the email will be enqueued in the queue and sent asynchronously. Queues depend on the
        email_priority. If False, the email will be sent synchronously.

    :param defer_send: Defer the send email using side effect stack.
        This requires to be executed through a side_effect context manager that manage the side effect.
        It's True by default to force consumer to explicitly handle the side effect.

    :param silent_failure: If True, any exception will be caught and logged (email log success=False is created).
        If False the exception will be raised.

    :param log_params: Additional parameters to log (current_logger)

    :param email_priority: The priority of the email. It can be high or low. High priority emails are sent in priority

    This method acquires a lock on the recipient to prevent multiple emails being sent to the same recipient at the same time.
    Thus preventing race conditions that could lead to sending more emails than the message frequency allows.

    :return: The delivery id of the email if it was sent synchronously, None otherwise.


    """
    from components.emailing.internal.send_email import (
        generate_and_send_email as bl_generate_and_send_email,
    )

    kwargs: dict[str, Any] = {
        "recipient": recipient,
        "template_name": template_name,
        "template_settings": template_settings,
        "transactional_message_id": transactional_message_id,
        "message_metadata": message_metadata,
        "campaign_name": campaign_name,
        "dry_run": dry_run,
        "account_ref": account_ref,
        "sender_email_address": sender_email_address,
        "message_frequency": message_frequency,
        "enqueue_as_draft": enqueue_as_draft,
        "silent_failure": silent_failure,
        "log_params": log_params,
    }

    match async_send, defer_send:
        # Defer async job
        case True, True:
            queue_name = get_emailing_queue_name(email_priority)
            defer_rq_job(queue_name, bl_generate_and_send_email, **kwargs)
        # Enqueue async job
        case True, False:
            queue_name = get_emailing_queue_name(email_priority)
            queue = current_rq.get_queue(queue_name)
            queue.enqueue(bl_generate_and_send_email, **kwargs)
        # Defer sync job
        case False, True:
            defer_call(bl_generate_and_send_email, **kwargs)
        # Send email sync
        case False, False:
            return bl_generate_and_send_email(**kwargs)
    return None

send_email_raw

send_email_raw(request)

Send email to a recipient address providing a raw email request that you can build yourself.

:param request: The email request to send

Source code in components/emailing/public/email.py
@tracer.wrap(service="emailing")
def send_email_raw(request: EmailRequest) -> MailerResponse:
    """
    Send email to a recipient address providing a raw email request that you can build yourself.

    :param request: The email request to send
    """
    mailer = CustomerIOMailer()
    log_params = {
        "campaign_name": request.campaign_name,
        "recipient_address": request.recipient.email_address,
        "user_id": request.recipient.user_ref,
    }
    current_logger.info(
        f"Sending email {request.content.template.name} to {request.recipient.user_ref}",
        **log_params,
    )
    response = mailer.send_email(request)
    external_log.create_email_log_from(response=response, request=request)
    if response.success:
        current_logger.info(
            f"Email {request.content.template.name} sent to {request.recipient.email_address}",
            delivery_id=response.delivery_id,
            **log_params,
        )
    else:
        current_logger.error(
            f"Error while sending email {request.content.template.name} to {request.recipient.email_address}",
            error=response.error,
            **log_params,
        )
    return response

components.emailing.public.entities

audience

campaign

Campaign dataclass

Campaign(
    campaign_name,
    template_cls,
    segment_cls,
    app_name,
    tags=list(),
    sending_options=EmailSendingOptions(),
    legacy_mailer=None,
    is_recurring=False,
)

Bases: DataClassJsonMixin, ABC

This class defines a campaign which is a way to send email.

app_name instance-attribute
app_name
campaign_name instance-attribute
campaign_name
is_recurring class-attribute instance-attribute
is_recurring = False
legacy_mailer class-attribute instance-attribute
legacy_mailer = None
on_email_bounced
on_email_bounced(email_log, occurred_at)

This method should be called when an email is bounced.

Source code in components/emailing/public/entities/campaign.py
def on_email_bounced(self, email_log: EmailLog, occurred_at: datetime) -> None:
    """
    This method should be called when an email is bounced.
    """
    pass
on_email_clicked
on_email_clicked(email_log, occurred_at)

This method should be called when an email is clicked.

Source code in components/emailing/public/entities/campaign.py
def on_email_clicked(self, email_log: EmailLog, occurred_at: datetime) -> None:
    """
    This method should be called when an email is clicked.
    """
    pass
on_email_delivered
on_email_delivered(email_log, occurred_at)

This method should be called when an email is delivered.

Source code in components/emailing/public/entities/campaign.py
def on_email_delivered(self, email_log: EmailLog, occurred_at: datetime) -> None:
    """
    This method should be called when an email is delivered.
    """
    pass
on_email_dropped
on_email_dropped(email_log, occurred_at)

This method should be called when an email is dropped.

Source code in components/emailing/public/entities/campaign.py
def on_email_dropped(self, email_log: EmailLog, occurred_at: datetime) -> None:
    """
    This method should be called when an email is dropped.
    """
    pass
on_email_opened
on_email_opened(email_log, occurred_at)

This method should be called when an email is opened.

Source code in components/emailing/public/entities/campaign.py
def on_email_opened(self, email_log: EmailLog, occurred_at: datetime) -> None:
    """
    This method should be called when an email is opened.
    """
    pass
on_email_requested
on_email_requested(email_log, occurred_at)

This method should be called when an email is requested.

Source code in components/emailing/public/entities/campaign.py
def on_email_requested(self, email_log: EmailLog, occurred_at: datetime) -> None:
    """
    This method should be called when an email is requested.
    """
    pass
on_email_sent
on_email_sent(email_log, occurred_at)

This method should be called when an email is sent.

Source code in components/emailing/public/entities/campaign.py
def on_email_sent(self, email_log: EmailLog, occurred_at: datetime) -> None:
    """
    This method should be called when an email is sent.
    """
    pass
on_shoot_completed
on_shoot_completed(email_shoot, occurred_at)

This method should be called when a shoot is completed (all emails have been tried to be sent).

Source code in components/emailing/public/entities/campaign.py
def on_shoot_completed(
    self, email_shoot: EmailShoot, occurred_at: datetime
) -> None:
    """
    This method should be called when a shoot is completed (all emails have been tried to be sent).
    """
    pass
on_shoot_scheduled
on_shoot_scheduled(email_shoot, occurred_at)

This method should be called when a shoot is scheduled.

Source code in components/emailing/public/entities/campaign.py
def on_shoot_scheduled(
    self, email_shoot: EmailShoot, occurred_at: datetime
) -> None:
    """
    This method should be called when a shoot is scheduled.
    """
    pass
on_shoot_skipped
on_shoot_skipped(email_shoot, occurred_at)

This method should be called when a shoot is skipped.

Source code in components/emailing/public/entities/campaign.py
def on_shoot_skipped(self, email_shoot: EmailShoot, occurred_at: datetime) -> None:
    """
    This method should be called when a shoot is skipped.
    """
    pass
on_webhook_received
on_webhook_received(email_log, event_type, occurred_at)

This method should be called when an event is received for a shoot.

Source code in components/emailing/public/entities/campaign.py
def on_webhook_received(
    self, email_log: EmailLog, event_type: EventType, occurred_at: datetime
) -> None:
    """
    This method should be called when an event is received for a shoot.
    """
    match event_type:
        case EventType.requested:
            self.on_email_requested(email_log, occurred_at)
        case EventType.sent:
            self.on_email_sent(email_log, occurred_at)
        case EventType.delivered:
            self.on_email_delivered(email_log, occurred_at)
        case EventType.bounced:
            self.on_email_bounced(email_log, occurred_at)
        case EventType.dropped:
            self.on_email_dropped(email_log, occurred_at)
        case EventType.opened:
            self.on_email_opened(email_log, occurred_at)
        case EventType.clicked:
            self.on_email_clicked(email_log, occurred_at)
        case _:
            current_logger.info(
                f"Unsupported event type {event_type} for campaign {self.campaign_name}"
            )
segment_cls instance-attribute
segment_cls
segment_name property
segment_name

Returns the segment name.

sending_options class-attribute instance-attribute
sending_options = field(default_factory=EmailSendingOptions)
tags class-attribute instance-attribute
tags = field(default_factory=list)
template_cls instance-attribute
template_cls
template_name property
template_name

Returns the template name.

EmailShootConfig dataclass

EmailShootConfig(
    config_id,
    scheduled_at,
    audience,
    message_frequency,
    template_settings=dict(),
    metadata=dict(),
    state=EmailShootState.pending,
    custom_blocks_variation_id=None,
    sender_email_address=None,
    manual_send_only=False,
)

An EmailShootConfig is a non persisted object that defines how an email shoot should be sent. You can see it as a recipe or a promise of a shoot. This is used by the recurring campaign when generating future shoots through planning.

This class encapsulates all the necessary settings and parameters required to configure and execute it, including scheduling, audience targeting, message frequency, and template customization.

:param config_id: A unique identifier for this shoot configuration.

:param scheduled_at: The date and time when the email shoot should be executed

:param audience: The target audience for this email shoot, defining who should receive the emails

:param message_frequency: Controls how often messages can be sent to the same recipient

:param template_settings: A dictionary containing custom settings for the email template

:param metadata: Additional metadata associated with this shoot configuration. Can be used to store arbitrary key-value pairs for tracking or reference

:param state: The current state of the email shoot (pending by default but you can control it)

:param custom_blocks_variation_id: Optional identifier for custom block variations in the email template. Used to test different versions of email content

:param sender_email_address: The email address that will appear as the sender. If None, defaults to the value in config["EMAIL_SENDER_NAME"]

:param manual_send_only: If True, the automatic sending of the shoot will be disabled. The shoot will wait for a manual intervention to be sent.

audience instance-attribute
audience
config_id instance-attribute
config_id
custom_blocks_variation_id class-attribute instance-attribute
custom_blocks_variation_id = None
manual_send_only class-attribute instance-attribute
manual_send_only = False
message_frequency instance-attribute
message_frequency
metadata class-attribute instance-attribute
metadata = field(default_factory=dict)
scheduled_at instance-attribute
scheduled_at
sender_email_address class-attribute instance-attribute
sender_email_address = None
state class-attribute instance-attribute
state = field(default=pending)
template_settings class-attribute instance-attribute
template_settings = field(default_factory=dict)

Planning dataclass

Planning(before, after, shoots)
after instance-attribute
after
before instance-attribute
before
get_shoots
get_shoots(states=None, is_persisted=None)

This method returns the list of shoots that match the given states and is_persisted value.

Source code in components/emailing/public/entities/campaign.py
def get_shoots(
    self,
    states: list[EmailShootState] | None = None,
    is_persisted: bool | None = None,
) -> list[EmailShoot]:
    """
    This method returns the list of shoots that match the given states and is_persisted value.
    """
    return [
        shoot
        for shoot in self.shoots
        if (states is None or shoot.state in states)
        and (is_persisted is None or shoot.is_persisted == is_persisted)
    ]
shoots instance-attribute
shoots

RecurringCampaign dataclass

RecurringCampaign(
    campaign_name,
    template_cls,
    segment_cls,
    app_name,
    tags=list(),
    sending_options=EmailSendingOptions(),
    legacy_mailer=None,
    is_recurring=True,
    enabled=True,
)

Bases: Campaign

This class defines a recurring campaign which is a way to send email regularly based on a recurrence rule, an audience, a template and a shoot planning definition.

cancel_shoot_config
cancel_shoot_config(config_id, commit)
Source code in components/emailing/public/entities/campaign.py
def cancel_shoot_config(self, config_id: str, commit: bool):  # type: ignore[no-untyped-def]  # noqa: D102
    raise NotImplementedError("This is not implemented")
complete_shoot_config
complete_shoot_config(
    config_id, shoot_id, completed_at, commit
)
Source code in components/emailing/public/entities/campaign.py
def complete_shoot_config(  # noqa: D102
    self, config_id: str, shoot_id: UUID, completed_at: datetime, commit: bool
) -> None: ...
enabled class-attribute instance-attribute
enabled = True
get_shoot_configs abstractmethod
get_shoot_configs(
    after=None,
    before=None,
    account_refs=None,
    config_id=None,
    limit=None,
)

This method should return the list of shoot configurations to be sent between the given dates.

Source code in components/emailing/public/entities/campaign.py
@abstractmethod
def get_shoot_configs(
    self,
    after: datetime | None = None,
    before: datetime | None = None,
    account_refs: list[str] | None = None,
    config_id: str | None = None,
    limit: int | None = None,
) -> list["EmailShootConfig"]:
    """
    This method should return the list of shoot configurations to be sent between the given dates.
    """
    ...
is_recurring class-attribute instance-attribute
is_recurring = True
patch_shoot_config
patch_shoot_config(
    config_id,
    commit,
    scheduled_at=NOT_SET,
    custom_blocks_variation_id=NOT_SET,
)
Source code in components/emailing/public/entities/campaign.py
def patch_shoot_config(  # type: ignore[no-untyped-def]  # noqa: D102
    self,
    config_id: str,
    commit: bool,
    scheduled_at: NotSet[datetime] = NOT_SET,
    custom_blocks_variation_id: NotSet[str | None] = NOT_SET,
):
    raise NotImplementedError("This is not implemented")
resume_shoot_config
resume_shoot_config(config_id, commit)
Source code in components/emailing/public/entities/campaign.py
def resume_shoot_config(self, config_id: str, commit: bool):  # type: ignore[no-untyped-def]  # noqa: D102
    raise NotImplementedError("This is not implemented")

custom_block

CustomBlock dataclass

CustomBlock(block_type, title, content, variation_id)

Bases: DataClassJsonMixin

block_type instance-attribute
block_type
content instance-attribute
content
default classmethod
default(variation_id, block_type)
Source code in components/emailing/public/entities/custom_block.py
@classmethod
def default(cls, variation_id: str, block_type: str):  # type: ignore[no-untyped-def]  # noqa: D102
    return cls(
        block_type=block_type,
        title=None,
        content="",
        variation_id=variation_id,
    )
from_model classmethod
from_model(model)
Source code in components/emailing/public/entities/custom_block.py
@classmethod
def from_model(cls, model: CustomBlockSQLA):  # type: ignore[no-untyped-def]  # noqa: D102
    return cls(
        block_type=model.block_type,
        title=model.title,
        content=model.content,
        variation_id=model.variation_id,
    )
title instance-attribute
title
variation_id instance-attribute
variation_id

email

EmailContent dataclass

EmailContent(
    subject,
    body,
    pre_header,
    attachments,
    template_data,
    template,
    user_context,
    template_settings,
)

This class represents the content of an email. It also contains the data that has been used to render the HTML template.

An email content is created from an email template, a user_ref, a user_context and a template settings.

attachments instance-attribute
attachments
body instance-attribute
body
pre_header instance-attribute
pre_header
subject instance-attribute
subject
template instance-attribute
template
template_args property
template_args
template_data instance-attribute
template_data
template_settings instance-attribute
template_settings
user_context instance-attribute
user_context

EmailLog dataclass

EmailLog(
    email_log_ref,
    user_id,
    email_address,
    state,
    template_name,
    user_context,
    message_metadata,
    delivery_id,
    success,
    sent_at,
    delivered_at,
    bounced_at,
    dropped_at,
    clicked,
    opened,
    created_at,
    error_message,
    error_type,
    opened_at,
    campaign_name,
    attachment_filenames,
    recipient_type,
    is_dry_run,
    account_ref,
    template_settings,
)

This class represents an email log. It is used to provide information about an email that has been sent.

account_ref instance-attribute
account_ref
attachment_filenames instance-attribute
attachment_filenames
bounced_at instance-attribute
bounced_at
campaign_name instance-attribute
campaign_name
clicked instance-attribute
clicked
created_at instance-attribute
created_at
delivered_at instance-attribute
delivered_at
delivery_id instance-attribute
delivery_id
dropped_at instance-attribute
dropped_at
email_address instance-attribute
email_address
email_log_ref instance-attribute
email_log_ref
error_message instance-attribute
error_message
error_type instance-attribute
error_type
from_model classmethod
from_model(model)
Source code in components/emailing/public/entities/email.py
@classmethod
def from_model(cls, model: "EmailLogSQLA"):  # type: ignore[no-untyped-def]  # noqa: D102
    params = model.params or {}

    return cls(
        user_id=str(model.user_id),
        email_address=model.email_address or "",
        template_name=model.mailer,
        delivery_id=model.delivery_id,
        bounced_at=model.bounced_at,
        dropped_at=model.dropped_at,
        created_at=model.created_at,
        sent_at=model.sent_at,
        success=model.success,
        clicked=bool(model.clicks),
        delivered_at=model.delivered_at,
        opened_at=model.opened_at,
        opened=bool(model.opened_at),
        user_context=params.get("user_context", {}) or {},
        message_metadata=params.get("message_metadata", {}) or {},
        template_settings=params.get("template_settings", {}) or {},
        email_log_ref=str(model.id),
        campaign_name=model.campaign_name or model.mailer,
        is_dry_run=model.dry_run,
        account_ref=model.account_ref,
        attachment_filenames=model.attachment_filenames or [],
        recipient_type=(
            RecipientType(model.recipient_type) if model.recipient_type else None
        ),
        state=EmailLogState(model.state),
        error_message=(
            model.dropped_reason
            or model.bounced_reason
            or str(model.result.get("error_string"))
            if model.result
            else None
        ),
        error_type=EmailErrors(model.error) if model.error else None,
    )
is_dry_run instance-attribute
is_dry_run
message_metadata instance-attribute
message_metadata
opened instance-attribute
opened
opened_at instance-attribute
opened_at
recipient_type instance-attribute
recipient_type
sent_at instance-attribute
sent_at
state instance-attribute
state
success instance-attribute
success
template_name instance-attribute
template_name
template_settings instance-attribute
template_settings
user_context instance-attribute
user_context
user_id instance-attribute
user_id

EmailLogState

Bases: AlanBaseEnum

This class represents the state of an email log. States are exclusive and represent the status of an email.

:param delivered: We received a webhook from the email provider that the email was delivered :param sent: We successfully generated and sent the email within the email provider :param failed: We failed to generate and send the email within the email provider :param bounced: We received a webhook from the email provider that the email bounced :param dropped: We received a webhook from the email provider that the email was dropped :param skipped: We skipped sending the email because generator raised a SkippingEmailSendingException especially used when the user message limit is reached :param pending: The email is pending to be sent, either because it's in the queue or because it's in dry run mode

bounced class-attribute instance-attribute
bounced = 'bounced'
delivered class-attribute instance-attribute
delivered = 'delivered'
dropped class-attribute instance-attribute
dropped = 'dropped'
failed class-attribute instance-attribute
failed = 'failed'
pending class-attribute instance-attribute
pending = 'pending'
sent class-attribute instance-attribute
sent = 'sent'
skipped class-attribute instance-attribute
skipped = 'skipped'

EmailRequest dataclass

EmailRequest(
    recipient,
    content,
    sending_options,
    campaign_name=None,
    message_metadata=None,
    message_frequency=None,
    account_ref=None,
)
account_ref class-attribute instance-attribute
account_ref = None
campaign_name class-attribute instance-attribute
campaign_name = None
content instance-attribute
content
message_frequency class-attribute instance-attribute
message_frequency = None
message_metadata class-attribute instance-attribute
message_metadata = None
recipient instance-attribute
recipient
sending_options instance-attribute
sending_options

EmailSendingOptions dataclass

EmailSendingOptions(
    send_to_unsubscribed=True,
    tracked=True,
    disable_message_retention=False,
    queue_draft=False,
    message_id=None,
    sender_email_address=None,
    dry_run=False,
)
default_message_id staticmethod
default_message_id()
Source code in components/emailing/public/entities/email.py
@staticmethod
def default_message_id() -> str:  # noqa: D102
    if is_production_mode():
        return "2"
    else:
        return "1"
disable_message_retention class-attribute instance-attribute
disable_message_retention = False
dry_run class-attribute instance-attribute
dry_run = False
message_id class-attribute instance-attribute
message_id = None
queue_draft class-attribute instance-attribute
queue_draft = False
send_to_unsubscribed class-attribute instance-attribute
send_to_unsubscribed = True
sender_email_address class-attribute instance-attribute
sender_email_address = None
tracked class-attribute instance-attribute
tracked = True

MessageFrequency dataclass

MessageFrequency(
    idempotency_key=None,
    idempotency_params=None,
    message_limit=1,
)

Bases: DataClassJsonMixin

This class defines the message limit settings to manage the number of emails sent to a user.

:param idempotency_key: The idempotency key used to identify the message. If None, all user message will be considered for the limit. This key will always be used in addition to the user id. :param message_limit: The limit of messages that can be sent to a user. If None, no limit will be applied.

Example
Prevent multiple message for same template to be sent to a user

limit_settings = LimitSettings(idempotency_key="my_email_template_name", limit=1)

Prevent too many messages to be sent to a user (more than 3)

limit_settings = LimitSettings(limit=3)

__post_init__
__post_init__()
Source code in components/emailing/public/entities/email.py
def __post_init__(self):  # type: ignore[no-untyped-def]  # noqa: D105
    # You can provide either an idempotency_key or idempotency_params, but not both
    if self.idempotency_key and self.idempotency_params:
        raise ValueError(
            "You can provide either an idempotency_key or idempotency_params, but not both"
        )
idempotency_key class-attribute instance-attribute
idempotency_key = None
idempotency_params class-attribute instance-attribute
idempotency_params = None
message_limit class-attribute instance-attribute
message_limit = 1
scoped_to_campaign staticmethod
scoped_to_campaign(
    campaign_name, idempotency_params, message_limit=1
)

Use this method unless you want a limit that applies cross campaigns.

Source code in components/emailing/public/entities/email.py
@staticmethod
def scoped_to_campaign(
    campaign_name: str, idempotency_params: dict[str, Any], message_limit: int = 1
) -> "MessageFrequency":
    """
    Use this method unless you want a limit that applies cross campaigns.
    """
    return MessageFrequency(
        message_limit=message_limit,
        idempotency_params={"campaign_name": campaign_name, **idempotency_params},
    )

error

EmailingErrorCode

Bases: BaseErrorCode

Represents a business errors (and not technical ones) If raised - it returns 4xx if HTTP context (no sentry) - it still make jobs failed in async context (if not caught)

component class-attribute instance-attribute
component = 'emailing'
http_code class-attribute instance-attribute
http_code = 400
template_settings_not_found staticmethod
template_settings_not_found(missing_key)

Error raised when a required key is missing in template settings when rendering an email

Source code in components/emailing/public/entities/error.py
@staticmethod
def template_settings_not_found(missing_key: str) -> "BaseErrorCode":
    """
    Error raised when a required key is missing in template settings when rendering an email
    """
    return EmailingErrorCode(
        1600,
        "emailing_template_settings_not_found",
        f"Bad template settings '{missing_key}' is required",
    )

recipient

EmailRecipient dataclass

EmailRecipient(
    user_ref,
    email_address,
    user_context=dict(),
    recipient_type=RecipientType.member,
)

Bases: DataClassJsonMixin

This class represents a member of an audience. It is used to provide information to send email to a specific user.

:param user_ref: The user id of the audience member :param email_address: The email address of the audience member :param user_context: The user context of the audience member

email_address instance-attribute
email_address
recipient_type class-attribute instance-attribute
recipient_type = member
user_context class-attribute instance-attribute
user_context = field(default_factory=dict)
user_ref instance-attribute
user_ref

RecipientType

Bases: AlanBaseEnum

This class represents the type of recipient. It is used to provide information to send email to a specific user.

admin class-attribute instance-attribute
admin = 'admin'
member class-attribute instance-attribute
member = 'member'

segment

Audience dataclass

Audience(segment, settings, limit=None)

Audience is list the users that will receive an email. It is defined by a segment and settings to filter the segment.

__eq__
__eq__(other)
Source code in components/emailing/public/entities/segment.py
def __eq__(self, other: object):  # type: ignore[no-untyped-def]  # noqa: D105
    if isinstance(other, Audience):
        return (
            self.segment.name == other.segment.name
            and self.settings == other.settings
            and self.limit == other.limit
        )
    return False
limit class-attribute instance-attribute
limit = None
list_recipients
list_recipients()
Source code in components/emailing/public/entities/segment.py
def list_recipients(self) -> list[EmailRecipient]:  # noqa: D102
    audience = self.segment.list_recipients(self.settings)[: self.limit]
    current_logger.info(
        f"Found {len(audience)} recipients on segment {self.segment.name}",
        segment_name=self.segment.name,
        settings=self.settings,
        limit=self.limit,
    )

    return audience
paginate_recipients
paginate_recipients()
Source code in components/emailing/public/entities/segment.py
def paginate_recipients(self) -> Paginate[EmailRecipient]:  # noqa: D102
    pagination = self.segment.paginate_recipients(self.settings)
    current_logger.info(
        f"Found {pagination.items_count} audience members on segment {self.segment.name}",
        segment_name=self.segment.name,
        settings=self.settings,
    )
    return pagination
segment instance-attribute
segment
settings instance-attribute
settings

Segment

Bases: ABC

This class defines a segment. It is used to provide a list of audience members

Example

class MySegment(Segment): name = "my_segment"

def get_query(self, settings: dict, limit: int | None = None) -> Query:
    return User.query.filter(User.is_active == True)

def to_recipient(self, model: User) -> EmailRecipient:
    return EmailRecipient(
        user_ref=model.id,
        email_address=model.email,
        user_context={"account_ref": model.account_ref},
    )
get_query
get_query(settings, limit=None)
Source code in components/emailing/public/entities/segment.py
def get_query(self, settings: dict, limit: int | None = None) -> "Query":  # type: ignore[type-arg]  # noqa: D102
    raise NotImplementedError("get_query should be implemented")
list_recipients
list_recipients(settings, limit=None)
Source code in components/emailing/public/entities/segment.py
def list_recipients(  # noqa: D102
    self,
    settings: dict,  # type: ignore[type-arg]
    limit: int | None = None,
) -> list[EmailRecipient]:
    return [self.to_recipient(model) for model in self.get_query(settings, limit)]
name instance-attribute
name
paginate_recipients
paginate_recipients(settings)
Source code in components/emailing/public/entities/segment.py
def paginate_recipients(self, settings: dict) -> Paginate[EmailRecipient]:  # type: ignore[type-arg]  # noqa: D102
    return Paginate.from_query(
        query=self.get_query(settings),
        per_page=self._per_page,
        on_item=self.to_recipient,
    )
recipient_type class-attribute instance-attribute
recipient_type = member
to_recipient
to_recipient(model)
Source code in components/emailing/public/entities/segment.py
def to_recipient(self, model: Any) -> EmailRecipient:  # noqa: D102
    raise NotImplementedError("to_recipient should be implemented")

shoot

EmailShoot dataclass

EmailShoot(
    shoot_id,
    scheduled_at,
    state,
    started_at,
    cancelled_at,
    completed_at,
    updated_at,
    recipient_count,
    sent_count,
    delivered_count,
    opened_count,
    clicked_count,
    failed_count,
    audience,
    template,
    template_settings,
    campaign_name,
    sender_email_address,
    message_frequency,
    transactional_message_id,
    enqueue_as_draft,
    dry_run,
    metadata,
    is_persisted,
    app_name,
    tags=list(),
    custom_blocks_variation_id=None,
    manual_send_only=True,
)

This class represents an email shoot. It is used to schedule email sending.

A scheduled shoot is about sending messages (emails) defined by a template to a segment of users defined by an audience. The audience is defined by a segment and settings to filter the segment.

app_name instance-attribute
app_name
audience instance-attribute
audience
campaign_name instance-attribute
campaign_name
cancelled_at instance-attribute
cancelled_at
clicked_count instance-attribute
clicked_count
completed_at instance-attribute
completed_at
custom_blocks_variation_id class-attribute instance-attribute
custom_blocks_variation_id = None
delivered_count instance-attribute
delivered_count
dry_run instance-attribute
dry_run
enqueue_as_draft instance-attribute
enqueue_as_draft
failed_count instance-attribute
failed_count
from_model classmethod
from_model(model)
Source code in components/emailing/public/entities/shoot.py
@classmethod
def from_model(cls, model: "EmailShootSQLA"):  # type: ignore[no-untyped-def]  # noqa: D102
    from components.emailing.public.registry import (
        get_segment,
        get_template,
    )

    return cls(
        shoot_id=model.shoot_id,
        scheduled_at=model.scheduled_at,
        started_at=model.started_at,
        cancelled_at=model.cancelled_at,
        completed_at=model.completed_at,
        updated_at=model.updated_at,
        sent_count=model.sent_count,
        recipient_count=model.recipient_count,
        delivered_count=model.delivered_count,
        opened_count=model.opened_count,
        clicked_count=model.clicked_count,
        failed_count=model.failed_count,
        audience=Audience(
            segment=get_segment(name=model.segment_name),
            settings=model.audience_settings or {},
            limit=model.audience_limit,
        ),
        template=get_template(name=model.template_name),
        template_settings=model.template_settings or {},
        message_frequency=MessageFrequency(
            idempotency_key=model.idempotency_key,
            message_limit=model.max_message_per_key,
            idempotency_params=model.idempotency_params,
        ),
        state=EmailShootState(model.state),
        transactional_message_id=model.transactional_message_id,
        campaign_name=model.campaign_name,
        enqueue_as_draft=model.enqueue_as_draft,
        dry_run=model.dry_run,
        metadata=model._metadata or {},  # noqa: ALN027
        tags=model.tags,
        is_persisted=True,
        custom_blocks_variation_id=model.custom_blocks_variation_id,
        sender_email_address=model.sender_email_address,
        app_name=AppName(model.app_name),
    )
is_persisted instance-attribute
is_persisted
manual_send_only class-attribute instance-attribute
manual_send_only = True
message_frequency instance-attribute
message_frequency
metadata instance-attribute
metadata
opened_count instance-attribute
opened_count
recipient_count instance-attribute
recipient_count
scheduled_at instance-attribute
scheduled_at
sender_email_address instance-attribute
sender_email_address
sent_count instance-attribute
sent_count
shoot_id instance-attribute
shoot_id
started_at instance-attribute
started_at
state instance-attribute
state
tags class-attribute instance-attribute
tags = field(default_factory=list)
template instance-attribute
template
template_settings instance-attribute
template_settings
transactional_message_id instance-attribute
transactional_message_id
updated_at instance-attribute
updated_at

SkippingEmailSendingException

Bases: Exception

This exception should be raised when an email should not be sent.

template

CustomerioManagedEmailTemplate

Bases: EmailTemplate

This class defines a customerio-managed email template. It means that the HTML file is hosted in customerio

generate_email_content
generate_email_content(
    recipient, template_settings, preview=False
)
Source code in components/emailing/public/entities/template.py
def generate_email_content(  # noqa: D102
    self,
    recipient: EmailRecipient,
    template_settings: dict[str, Any],
    preview: bool = False,
) -> "EmailContent":
    settings = TemplateSettings(template_settings)
    template_data = self.generator.get_template_data(recipient, settings)
    return EmailContent(
        subject=None,
        body=None,
        pre_header=None,
        attachments=self.generator.get_attachments(recipient, settings),  # type: ignore[arg-type]
        template_data={**template_data.to_dict(), "IS_PREVIEW": preview},
        template=self,
        user_context=recipient.user_context,
        template_settings=template_settings,
    )
transaction_message_id instance-attribute
transaction_message_id

EmailDataGenerator

Bases: ABC, Generic[U]

This class defines a data generator. It is used to provide the data to render an email template. Data template should be provided by a class that inherits from TemplateData. It should provide data no matter the template is self-managed or cio-managed

get_attachments
get_attachments(recipient, settings)

Attachments should be provided as a dict with the filename as key and the content as value, or as a list of dict with the filename, content as keys.

Source code in components/emailing/public/entities/template.py
def get_attachments(
    self,
    recipient: EmailRecipient,  # noqa: ARG002
    settings: dict[str, Any],  # noqa: ARG002
) -> dict[str, str] | list[dict[str, str]] | None:
    """
    Attachments should be provided as a dict with the filename as key and the content as value,
    or as a list of dict with the `filename`, `content` as keys.
    """
    return None
get_template_data abstractmethod
get_template_data(recipient, template_settings)
Source code in components/emailing/public/entities/template.py
@abstractmethod
def get_template_data(  # noqa: D102
    self,
    recipient: EmailRecipient,
    template_settings: dict[str, Any] | TemplateSettings,
) -> U: ...

EmailTemplate

Bases: ABC, Generic[U]

This class defines an email template. It can be self-managed or cio-managed. We expect the subclasses to implement the methods to email_content.

Email template should be registered using the register_template function.

custom_block_types class-attribute instance-attribute
custom_block_types = None
generate_email_content abstractmethod
generate_email_content(
    recipient, template_settings, preview=False
)
Source code in components/emailing/public/entities/template.py
@abstractmethod
def generate_email_content(  # noqa: D102
    self,
    recipient: EmailRecipient,
    template_settings: dict,  # type: ignore[type-arg]
    preview: bool = False,
) -> "EmailContent": ...
generator instance-attribute
generator
name instance-attribute
name
title instance-attribute
title

SelfManagedEmailTemplate

Bases: EmailTemplate

This class defines a self-managed email template. It means that the HTML file is hosted in our backend stored in the template file uri. It relies on a generator to provide the data to render the template.

It should also implement a method to provide the subject of the email.

The subclasses of this class should be registered using the register_template function.

:param title: The title of the template :param name: The unique name of the template :param uri: The uri of the HTML template file :param generator: The generator that will provide the data to render the template

__eq__
__eq__(other)
Source code in components/emailing/public/entities/template.py
def __eq__(self, other: object):  # type: ignore[no-untyped-def]  # noqa: D105
    if isinstance(other, SelfManagedEmailTemplate):
        return self.name == other.name and self.uri == other.uri
    return False
generate_email_content
generate_email_content(
    recipient, template_settings, preview=False
)
Source code in components/emailing/public/entities/template.py
def generate_email_content(  # noqa: D102
    self,
    recipient: EmailRecipient,
    template_settings: dict[str, Any] | TemplateSettings,
    preview: bool = False,
) -> "EmailContent":
    from components.emailing.internal.custom_block import (
        list_custom_blocks,
    )

    current_logger.info(
        f"Generating email content for template {self.name}",
        template_name=self.name,
        user_context=recipient.user_context,
        template_settings=template_settings,
    )

    settings = TemplateSettings(template_settings)
    template_data = self.generator.get_template_data(recipient, settings)

    # TODO: Once all templates are migrated to TemplateData, we should stop using a dict here and instead add CUSTOM_BLOCKS & HAS_ATTACHMENTS to the TemplateData class
    additional_template_args: dict[str, Any] = {}
    if self.custom_block_types and "custom_blocks_variation_id" in settings:
        blocks = list_custom_blocks(
            variation_ids=[settings["custom_blocks_variation_id"]]
        )
        additional_template_args["CUSTOM_BLOCKS"] = {
            block.block_type: asdict(block) for block in blocks
        }
    attachments = self.generator.get_attachments(recipient, settings)

    additional_template_args["HAS_ATTACHMENTS"] = bool(attachments)
    template_data = {
        **(
            template_data
            if isinstance(template_data, dict)
            else template_data.to_dict()
        ),
        **additional_template_args,
    }
    return EmailContent(
        subject=self.get_subject(template_data),
        body=self.get_body(template_data, IS_PREVIEW=preview),
        pre_header=self.get_pre_header(template_data),
        attachments=(
            convert_attachments_to_dict(attachments)
            if isinstance(attachments, list)
            else attachments
        ),
        template_data=template_data,
        template=self,
        user_context=recipient.user_context,
        template_settings=template_settings,
    )
get_body
get_body(template_data, **kwargs)

This method should return the body of the email

Source code in components/emailing/public/entities/template.py
def get_body(self, template_data: dict | TemplateData, **kwargs) -> str:  # type: ignore[type-arg, no-untyped-def]
    """
    This method should return the body of the email
    """
    if isinstance(template_data, TemplateData):
        template_data = template_data.to_dict()

    # add kwargs
    template_data.update(kwargs)

    return render_email_template(
        template_name=self.uri,
        template_args=template_data,
    )
get_pre_header
get_pre_header(template_data)
Source code in components/emailing/public/entities/template.py
def get_pre_header(self, template_data: TemplateData | dict) -> str | None:  # type: ignore[type-arg]  # noqa: ARG002, D102
    return None
get_subject
get_subject(template_data)

This method should return the subject of the email that should be defined as a jinja block in the HTML template file by default

Source code in components/emailing/public/entities/template.py
def get_subject(self, template_data: TemplateData | dict) -> str:  # type: ignore[type-arg]
    """
    This method should return the subject of the email that should be defined as a
    jinja block in the HTML template file by default
    """
    if isinstance(template_data, TemplateData):
        template_data = template_data.to_dict()

    return mandatory(
        render_email_subject(
            template_name=self.uri,
            template_args=template_data,
        )
    )
uri instance-attribute
uri

TemplateData

Bases: DataClassJsonMixin

This class should represent the data that will be used to render the HTML template.

TemplateSettings

TemplateSettings(settings)

Bases: dict[str, Any]

A wrapper around template settings dictionary to safely handle missing keys. Inherits from dict to maintain backward compatibility with existing code.

Initialize the TemplateSettings object with the provided settings.

Source code in components/emailing/public/entities/template.py
def __init__(self, settings: dict[str, Any]) -> None:
    """
    Initialize the TemplateSettings object with the provided settings.
    """
    super().__init__(settings)
__getitem__
__getitem__(key)

Get a required value from template settings, raising a business error if missing.

Source code in components/emailing/public/entities/template.py
def __getitem__(self, key: str) -> Any:
    """
    Get a required value from template settings, raising a business error if missing.
    """
    try:
        return super().__getitem__(key)
    except KeyError:
        raise EmailingErrorCode.template_settings_not_found(key)

U module-attribute

U = TypeVar('U', bound=Union[dict[str, Any], TemplateData])

components.emailing.public.enums

EmailShootEvent

Bases: AlanBaseEnum

Enumeration for the different events of an email shoot.

shoot_completed class-attribute instance-attribute

shoot_completed = 'shoot_completed'

shoot_scheduled class-attribute instance-attribute

shoot_scheduled = 'shoot_scheduled'

shoot_skipped class-attribute instance-attribute

shoot_skipped = 'shoot_skipped'

EmailShootState

Bases: AlanBaseEnum

Enumeration for the different states of an email shoot.

States
  • pending: Shoot not yet created but only configured defined through recurring campaign.
  • scheduled: Shoot is created and scheduled to be sent.
  • sending: Shoot is being sent (send_shoot has been called).
  • completed: All emails have been tried to be sent (completed does not mean all emails have been sent or delivered).
  • cancelled: Shoot has been cancelled and won't be sent.
  • skipped: Shoot is skipped when audience is empty.

cancelled class-attribute instance-attribute

cancelled = 'cancelled'

completed class-attribute instance-attribute

completed = 'completed'

failed class-attribute instance-attribute

failed = 'failed'

pending class-attribute instance-attribute

pending = 'pending'

scheduled class-attribute instance-attribute

scheduled = 'scheduled'

sending class-attribute instance-attribute

sending = 'sending'

skipped class-attribute instance-attribute

skipped = 'skipped'

components.emailing.public.events

EmailLogEvent

Bases: AlanBaseEnum

email_log_created class-attribute instance-attribute

email_log_created = 'email_log_created'

email_log_updated class-attribute instance-attribute

email_log_updated = 'email_log_updated'

event_handler module-attribute

event_handler = EventHandler(
    supported_events=[
        shoot_completed,
        shoot_skipped,
        shoot_scheduled,
    ]
)

register_email_log_event

register_email_log_event(event_name, handler)
Source code in components/emailing/public/events.py
def register_email_log_event(event_name: str, handler: Callable) -> None:  # type: ignore[type-arg]  # noqa: D103
    from components.emailing.external.api.events import (
        register_event as internal_register_event,
    )

    internal_register_event(event_name, handler)

register_event

register_event(event_name, handler)

Register an event handler for the given event name.

Source code in components/emailing/public/events.py
def register_event(
    event_name: str, handler: Callable[[EmailShoot, datetime], None]
) -> None:
    """
    Register an event handler for the given event name.
    """
    event_handler.register_events(**{event_name: handler})

unregister_event

unregister_event(event_name, handler)

Unregister an event handler for the given event name

Source code in components/emailing/public/events.py
def unregister_event(
    event_name: str, handler: Callable[[EmailShoot, datetime], None]
) -> None:
    """
    Unregister an event handler for the given event name
    """
    event_handler.unregister_events(**{event_name: handler})

components.emailing.public.recurring

RECURRING_CAMPAIGN_TAG module-attribute

RECURRING_CAMPAIGN_TAG = 'recurring'

cancel_shoot_config

cancel_shoot_config(campaign_name, config_id, commit=True)
Source code in components/emailing/public/recurring.py
@tracer.wrap(service="emailing")
def cancel_shoot_config(  # noqa: D103
    campaign_name: str, config_id: str, commit: bool = True
) -> None:
    from components.emailing.public.registry import get_recurring_campaign

    campaign = get_recurring_campaign(campaign_name)
    campaign.cancel_shoot_config(config_id, commit=commit)
    current_logger.info(
        "Shoot config cancelled", campaign_name=campaign_name, config_id=config_id
    )

get_pending_shoot

get_pending_shoot(campaign_name, config_id)
Source code in components/emailing/public/recurring.py
@tracer.wrap(service="emailing")
def get_pending_shoot(  # noqa: D103
    campaign_name: str,
    config_id: str,
) -> EmailShoot:
    from components.emailing.public.registry import get_recurring_campaign

    campaign = get_recurring_campaign(campaign_name)
    config = one(
        campaign.get_shoot_configs(
            config_id=config_id,
        )
    )
    return shoot_from_config(config, campaign)

get_planning

get_planning(
    campaign_name,
    before,
    after,
    account_refs=None,
    user_ids=None,
    config_id=None,
    states=None,
    limit=None,
)

Returns the planning of a recurring campaign

:param campaign_name: The name of the recurring campaign to get the planning from :param config_id: The config_id to filter the planning on :param account_refs: The account_refs to filter the planning on :param user_ids: The user_ids to filter the planning on :param before: Maximum date of the planning. Required. Avoid datetime.max() to avoid performance issues. :param after: Minimum date of the planning :param states: The shoot states to filter the planning on :param limit: Limit

Source code in components/emailing/public/recurring.py
@tracer.wrap(service="emailing")
def get_planning(
    campaign_name: str | None,
    before: datetime,
    after: datetime,
    account_refs: list[str] | None = None,
    user_ids: list[str] | None = None,
    config_id: str | None = None,
    states: list[EmailShootState] | None = None,
    limit: int | None = None,
) -> Planning:
    """
    Returns the planning of a recurring campaign

    :param campaign_name: The name of the recurring campaign to get the planning from
    :param config_id: The config_id to filter the planning on
    :param account_refs: The account_refs to filter the planning on
    :param user_ids: The user_ids to filter the planning on
    :param before: Maximum date of the planning. Required. Avoid datetime.max() to avoid performance issues.
    :param after: Minimum date of the planning
    :param states: The shoot states to filter the planning on
    :param limit: Limit
    """
    from components.emailing.public.registry import (
        get_campaign,
    )

    if campaign_name:
        campaigns: list[Campaign | RecurringCampaign] = [
            get_campaign(campaign_name=campaign_name)
        ]
    else:
        campaigns = list_recurring_campaigns()  # type: ignore[assignment]

    metadata = {}
    if config_id:
        metadata["config_id"] = config_id

    shoots = list_shoots(
        scheduled_before=before,
        scheduled_after=after,
        campaign_name=campaign_name,
        account_refs=account_refs,
        user_ids=user_ids,
        metadata=metadata if metadata else None,
        states=states,
        limit=limit,
    )

    pending_shoots = []
    if states is None or EmailShootState.pending in states:
        for campaign in campaigns:
            if not campaign.is_recurring:
                continue

            configs = [
                config
                for config in cast("RecurringCampaign", campaign).get_shoot_configs(
                    after=after,
                    before=before,
                    account_refs=account_refs,
                    config_id=config_id,
                    limit=limit,
                )
                if after
                <= config.scheduled_at
                <= before  # Filter out configs that are not in the planning range if any
            ]

            for config in configs:
                if (
                    account_refs
                    and config.metadata.get("account_id") not in account_refs
                ):
                    continue

                pending_shoots.append(
                    shoot_from_config(config, cast("RecurringCampaign", campaign))
                )

    # Remove configs that are already persisted (actual shoot with the same config_id)
    persisted_shoots = list_shoots(
        shoot_ids=[shoot.shoot_id for shoot in pending_shoots],
    )
    persisted_shoot_ids = {shoot.shoot_id for shoot in persisted_shoots}
    pending_shoots = [
        shoot for shoot in pending_shoots if shoot.shoot_id not in persisted_shoot_ids
    ]

    return Planning(
        before=before,
        after=after,
        shoots=pending_shoots + shoots,
    )

patch_shoot_config

patch_shoot_config(
    campaign_name,
    config_id,
    scheduled_at=NOT_SET,
    custom_blocks_variation_id=NOT_SET,
    commit=True,
)
Source code in components/emailing/public/recurring.py
@tracer.wrap(service="emailing")
def patch_shoot_config(  # noqa: D103
    campaign_name: str,
    config_id: str,
    scheduled_at: NotSet[datetime] = NOT_SET,
    custom_blocks_variation_id: NotSet[str | None] = NOT_SET,
    commit: bool = True,
) -> None:
    from components.emailing.public.registry import get_recurring_campaign

    campaign = get_recurring_campaign(campaign_name)
    campaign.patch_shoot_config(
        config_id,
        scheduled_at=scheduled_at,
        commit=commit,
        custom_blocks_variation_id=custom_blocks_variation_id,
    )
    current_logger.info(
        "Shoot config patched", campaign_name=campaign_name, config_id=config_id
    )

process_pending_shoots

process_pending_shoots(
    on_date,
    backfill_days=0,
    campaign_name=None,
    account_ids=None,
    user_ids=None,
    silent_failure=True,
    commit=True,
)

Schedule and send all shoots that need to be sent for recurring campaigns.

Source code in components/emailing/public/recurring.py
@tracer.wrap(service="emailing")
def process_pending_shoots(
    on_date: datetime,
    backfill_days: int = 0,
    campaign_name: str | None = None,
    account_ids: list[str] | None = None,
    user_ids: list[str] | None = None,
    silent_failure: bool = True,
    commit: bool = True,
) -> None:
    """Schedule and send all shoots that need to be sent for recurring campaigns."""
    shoots = schedule_pending_shoots(
        scheduled_before=on_date,
        backfill_days=backfill_days,
        campaign_name=campaign_name,
        account_ids=account_ids,
        user_ids=user_ids,
        silent_failure=silent_failure,
        commit=commit,
    )
    batch_send_shoot(
        shoot_ids=[shoot.shoot_id for shoot in shoots],
        async_shoot_send=True,
        commit=commit,
    )
    current_logger.info(
        f"Processed {len(shoots)} shoots for recurring campaign",
        campaign_name=campaign_name,
        shoot_count=len(shoots),
    )

resume_shoot_config

resume_shoot_config(campaign_name, config_id, commit=True)
Source code in components/emailing/public/recurring.py
@tracer.wrap(service="emailing")
def resume_shoot_config(  # noqa: D103
    campaign_name: str, config_id: str, commit: bool = True
) -> None:
    from components.emailing.public.registry import get_recurring_campaign

    campaign = get_recurring_campaign(campaign_name)
    campaign.resume_shoot_config(config_id, commit=commit)
    current_logger.info(
        "Shoot config resume", campaign_name=campaign_name, config_id=config_id
    )

schedule_pending_shoots

schedule_pending_shoots(
    scheduled_before,
    scheduled_after=None,
    backfill_days=None,
    campaign_name=None,
    account_ids=None,
    user_ids=None,
    silent_failure=True,
    commit=True,
)

Schedule all pending shoots for a recurring campaign at the given date.

Schedule only if the campaign is enabled.

:param campaign_name: The name of the recurring campaign to process :param scheduled_before: The date to process the recurring campaign on :param scheduled_after: The date to start the backfill from :param backfill_days: The number of days to backfill. If 0, only the shoots that are scheduled on the given date will be sent (if not already sent). :param silent_failure: If True, the function will not raise an exception if a shoot fails to be sent only logs :param account_ids: The account_refs to filter the planning on :param user_ids: The user_ids to filter the planning on :param commit: If True, the shoots will be scheduled and shoot.

Source code in components/emailing/public/recurring.py
@tracer.wrap(service="emailing")
def schedule_pending_shoots(
    scheduled_before: datetime,
    scheduled_after: datetime | None = None,
    backfill_days: int | None = None,
    campaign_name: str | None = None,
    account_ids: list[str] | None = None,
    user_ids: list[str] | None = None,
    silent_failure: bool = True,
    commit: bool = True,
) -> list[EmailShoot]:
    """
    Schedule all pending shoots for a recurring campaign at the given date.

    Schedule only if the campaign is enabled.

    :param campaign_name: The name of the recurring campaign to process
    :param scheduled_before: The date to process the recurring campaign on
    :param scheduled_after: The date to start the backfill from
    :param backfill_days: The number of days to backfill. If 0, only the shoots that are scheduled on the given date
        will be sent  (if not already sent).
    :param silent_failure: If True, the function will not raise an exception if a shoot fails to be sent only logs
    :param account_ids: The account_refs to filter the planning on
    :param user_ids: The user_ids to filter the planning on
    :param commit: If True, the shoots will be scheduled and shoot.
    """
    from components.emailing.internal.custom_block import list_custom_blocks

    if backfill_days is not None and scheduled_after is not None:
        raise ValueError("Cannot provide both backfill_days and scheduled_after")

    if backfill_days is not None:
        scheduled_after = scheduled_before - relativedelta(
            days=backfill_days, hour=0, minute=0, second=0
        )
    elif scheduled_after is None:
        scheduled_after = scheduled_before - relativedelta(
            days=0, hour=0, minute=0, second=0
        )

    current_logger.info(
        f"Triggering emails for recurring campaign between {scheduled_after} and {scheduled_before}",
        campaign_name=campaign_name,
        scheduled_after=scheduled_after,
        scheduled_before=scheduled_before,
    )

    planning = get_planning(
        campaign_name=campaign_name,
        after=scheduled_after,
        before=scheduled_before,
        account_refs=account_ids,
        user_ids=user_ids,
    )

    pending_shoots = planning.get_shoots(states=[EmailShootState.pending])

    current_logger.info(
        f"Found {len(pending_shoots)} shoots to send",
        campaign_name=campaign_name,
        shoot_count=len(pending_shoots),
    )

    all_custom_blocks = list_custom_blocks(
        variation_ids=uniquify(
            compact([shoot.custom_blocks_variation_id for shoot in pending_shoots])
        )
    )

    current_logger.info(
        f"Found {len(all_custom_blocks)} custom blocks",
        campaign_name=campaign_name,
        custom_block_count=len(all_custom_blocks),
    )

    custom_blocks_by_variation_id = group_by(
        all_custom_blocks, lambda custom_block: custom_block.variation_id
    )

    scheduled_shoots = []

    for shoot in pending_shoots:
        try:
            campaign = get_recurring_campaign(
                campaign_name=mandatory(shoot.campaign_name)
            )

            if not campaign.enabled:
                current_logger.info(
                    "Campaign is disabled - skipping",
                    campaign_name=campaign_name,
                    shoot_id=shoot.shoot_id,
                )
                continue

            if shoot.manual_send_only:
                current_logger.warning(
                    "Shoot is not set to be auto-sent - skipping",
                    campaign_name=campaign_name,
                    shoot_id=shoot.shoot_id,
                )
                continue

            custom_blocks = (
                custom_blocks_by_variation_id.get(shoot.custom_blocks_variation_id)
                if shoot.custom_blocks_variation_id
                else None
            )
            scheduled_shoot = schedule_shoot(
                shoot_id=shoot.shoot_id,
                scheduled_at=shoot.scheduled_at,
                audience=shoot.audience,
                template=campaign.template_cls,
                template_settings=shoot.template_settings,
                transactional_message_id=shoot.transactional_message_id,
                tags=[RECURRING_CAMPAIGN_TAG] + campaign.tags,
                message_frequency=shoot.message_frequency,
                campaign_name=campaign.campaign_name,
                metadata=shoot.metadata,
                sender_email_address=(
                    shoot.sender_email_address
                    if shoot.sender_email_address
                    else campaign.sending_options.sender_email_address
                ),
                enqueue_as_draft=campaign.sending_options.queue_draft,
                dry_run=campaign.sending_options.dry_run,
                commit=commit,
                custom_blocks=custom_blocks,
                send_if_scheduled_in_past=False,
            )
            scheduled_shoots.append(scheduled_shoot) if scheduled_shoot else None

        except Exception as e:
            current_session.rollback()
            if silent_failure:
                current_logger.exception(
                    "Failed to send shoot",
                    campaign_name=campaign_name,
                    shoot_id=shoot.shoot_id,
                )
            else:
                raise e

    return scheduled_shoots

shoot_from_config

shoot_from_config(config, campaign)
Source code in components/emailing/public/recurring.py
@tracer.wrap(service="emailing")
def shoot_from_config(  # noqa: D103
    config: EmailShootConfig, campaign: RecurringCampaign
) -> EmailShoot:
    # generate stable uuid from config.config_id (str)
    shoot_id = uuid.uuid5(uuid.NAMESPACE_DNS, config.config_id)
    return EmailShoot(
        shoot_id=shoot_id,
        scheduled_at=config.scheduled_at,
        state=config.state,
        started_at=None,
        cancelled_at=None,
        completed_at=None,
        updated_at=None,
        recipient_count=None,
        sent_count=0,
        delivered_count=0,
        opened_count=0,
        clicked_count=0,
        failed_count=0,
        audience=config.audience,
        template=campaign.template_cls(),
        template_settings=config.template_settings,
        campaign_name=campaign.campaign_name,
        message_frequency=config.message_frequency,
        transactional_message_id=campaign.sending_options.message_id
        or campaign.sending_options.default_message_id(),
        enqueue_as_draft=campaign.sending_options.queue_draft,
        metadata={
            **config.metadata,
            "config_id": config.config_id,
        },
        tags=campaign.tags + [RECURRING_CAMPAIGN_TAG],
        dry_run=campaign.sending_options.dry_run,
        is_persisted=False,
        custom_blocks_variation_id=config.custom_blocks_variation_id,
        sender_email_address=(
            config.sender_email_address
            if config.sender_email_address
            else campaign.sending_options.sender_email_address
        ),
        manual_send_only=config.manual_send_only,
        app_name=AppName.ALAN_FR,
    )

components.emailing.public.registry

get_campaign

get_campaign(campaign_name)
Source code in components/emailing/public/registry.py
def get_campaign(campaign_name: str) -> Campaign:  # noqa: D103
    if has_recurring_campaign(campaign_name):
        return get_recurring_campaign(campaign_name)

    return get_non_recurring_campaign(campaign_name)

get_non_recurring_campaign

get_non_recurring_campaign(campaign_name)
Source code in components/emailing/public/registry.py
def get_non_recurring_campaign(  # noqa: D103
    campaign_name: str,
) -> Campaign:
    from components.emailing.public.blueprint import emailing_api_blueprint

    return emailing_api_blueprint.get_non_recurring_campaign(
        campaign_name=campaign_name
    )

get_recurring_campaign

get_recurring_campaign(campaign_name)
Source code in components/emailing/public/registry.py
def get_recurring_campaign(  # noqa: D103
    campaign_name: str,
) -> RecurringCampaign:
    from components.emailing.public.blueprint import emailing_api_blueprint

    return emailing_api_blueprint.get_recurring_campaign(campaign_name=campaign_name)

get_segment

get_segment(name)
Source code in components/emailing/public/registry.py
def get_segment(name: str) -> Segment:  # noqa: D103
    from components.emailing.public.blueprint import emailing_api_blueprint

    return emailing_api_blueprint.get_segment(name)

get_template

get_template(name)
Source code in components/emailing/public/registry.py
def get_template(name: str) -> EmailTemplate:  # type: ignore[type-arg]  # noqa: D103
    from components.emailing.public.blueprint import emailing_api_blueprint

    return emailing_api_blueprint.get_template(name)

has_campaign

has_campaign(campaign_name)
Source code in components/emailing/public/registry.py
def has_campaign(campaign_name: str) -> bool:  # noqa: D103
    return has_recurring_campaign(campaign_name) or has_non_recurring_campaign(
        campaign_name
    )

has_non_recurring_campaign

has_non_recurring_campaign(campaign_name)
Source code in components/emailing/public/registry.py
def has_non_recurring_campaign(campaign_name: str) -> bool:  # noqa: D103
    try:
        get_non_recurring_campaign(campaign_name)
        return True
    except Exception:
        return False

has_recurring_campaign

has_recurring_campaign(campaign_name)
Source code in components/emailing/public/registry.py
def has_recurring_campaign(campaign_name: str) -> bool:  # noqa: D103
    try:
        get_recurring_campaign(campaign_name)
        return True
    except Exception:
        return False

has_segment

has_segment(name)
Source code in components/emailing/public/registry.py
def has_segment(name: str) -> bool:  # noqa: D103
    try:
        get_segment(name)
        return True
    except Exception:
        return False

has_template

has_template(name)
Source code in components/emailing/public/registry.py
def has_template(name: str) -> bool:  # noqa: D103
    try:
        get_template(name)
        return True
    except Exception:
        return False

list_campaigns

list_campaigns()
Source code in components/emailing/public/registry.py
def list_campaigns() -> list[Campaign | RecurringCampaign]:  # noqa: D103
    from components.emailing.public.blueprint import emailing_api_blueprint

    return emailing_api_blueprint.list_campaigns()

list_non_recurring_campaigns

list_non_recurring_campaigns()
Source code in components/emailing/public/registry.py
def list_non_recurring_campaigns() -> list[Campaign]:  # noqa: D103
    from components.emailing.public.blueprint import emailing_api_blueprint

    return emailing_api_blueprint.list_non_recurring_campaigns()

list_recurring_campaigns

list_recurring_campaigns()
Source code in components/emailing/public/registry.py
def list_recurring_campaigns() -> list[RecurringCampaign]:  # noqa: D103
    from components.emailing.public.blueprint import emailing_api_blueprint

    return emailing_api_blueprint.list_recurring_campaigns()

list_templates

list_templates()
Source code in components/emailing/public/registry.py
def list_templates() -> list[EmailTemplate]:  # type: ignore[type-arg]  # noqa: D103
    from components.emailing.public.blueprint import emailing_api_blueprint

    return emailing_api_blueprint.get_templates()

load_all_models

load_all_models()
Source code in components/emailing/public/registry.py
def load_all_models():  # type: ignore[no-untyped-def]  # noqa: D103
    from components.emailing.internal.models.custom_block import (
        CustomBlock,
    )
    from components.emailing.internal.models.email_shoot import EmailShootV2
    from components.emailing.internal.models.turing_email_stats import TuringEmailStats

    return [EmailShootV2, CustomBlock, TuringEmailStats]

register_non_recurring

register_non_recurring(campaign)

Register a non-recurring campaign to be used by the emailing service.

Source code in components/emailing/public/registry.py
def register_non_recurring(campaign: Campaign) -> None:
    """
    Register a non-recurring campaign to be used by the emailing service.
    """
    from components.emailing.public.blueprint import emailing_api_blueprint

    emailing_api_blueprint.register_non_recurring_campaign(campaign)

register_recurring

register_recurring(recurring_campaign)

Decorator to register a recurring campaign to be used by the emailing service.

Source code in components/emailing/public/registry.py
def register_recurring(recurring_campaign: RecurringCampaign) -> None:
    """
    Decorator to register a recurring campaign to be used by the emailing service.
    """
    from components.emailing.public.blueprint import emailing_api_blueprint

    emailing_api_blueprint.register_recurring_campaign(recurring_campaign)

register_segment

register_segment(_class=None)

Decorator to register a segment to be used by the emailing service.

Example: class MySegment(Segment): ...

Source code in components/emailing/public/registry.py
def register_segment(_class: type[Segment] | None = None):  # type: ignore[no-untyped-def]
    """
    Decorator to register a segment to be used by the emailing service.

    Example:
    class MySegment(Segment):
        ...
    """

    def decorator(segment: type[Segment]):  # type: ignore[no-untyped-def]
        from components.emailing.public.blueprint import emailing_api_blueprint

        emailing_api_blueprint.register_segment(segment())
        return segment

    if _class is None:
        return decorator
    else:
        return decorator(_class)

register_template

register_template(_class=None)

Decorator to register an email template to be used by the emailing service. Example: @register_template class MyTemplate(SelfManagedEmailTemplate): field = "value"

Source code in components/emailing/public/registry.py
def register_template(_class: type[EmailTemplate] | None = None):  # type: ignore[type-arg,no-untyped-def]
    """
    Decorator to register an email template to be used by the emailing service.
    Example:
    @register_template
    class MyTemplate(SelfManagedEmailTemplate):
        field = "value"
    """

    def decorator(template: type[EmailTemplate]):  # type: ignore[type-arg,no-untyped-def]
        from components.emailing.public.blueprint import emailing_api_blueprint

        @wraps(template)
        def wrapper(*args, **kwargs):  # type: ignore[no-untyped-def]  # noqa: ARG001
            return template

        emailing_api_blueprint.register_template(template())

        return wrapper

    if _class is None:
        return decorator
    else:
        return decorator(_class)

register_webhook

register_webhook(campaign_name)

Decorator to register a webhook to be called when an event is triggered.

Example

@register_webhook("my_campaign") def my_webhook(email_log: EmailLog, event_type: EventType, occurred_at: datetime]) -> None: if event_type == EventType.SENT: ... ...

Source code in components/emailing/public/registry.py
def register_webhook(campaign_name: str):  # type: ignore[no-untyped-def]
    """
    Decorator to register a webhook to be called when an event is triggered.

    Example:
        @register_webhook("my_campaign")
        def my_webhook(email_log: EmailLog, event_type: EventType, occurred_at: datetime]) -> None:
            if event_type == EventType.SENT:
                ...
            ...
    """

    def decorator(handler: Callable[[EmailLog, EventType, datetime], None]):  # type: ignore[no-untyped-def]
        from components.emailing.public.blueprint import emailing_api_blueprint

        @wraps(handler)
        def wrapper(*args, **kwargs):  # type: ignore[no-untyped-def]
            return handler(*args, **kwargs)

        emailing_api_blueprint.register_webhook(campaign_name, handler)

        return wrapper

    return decorator

unregister_all_campaigns

unregister_all_campaigns()
Source code in components/emailing/public/registry.py
def unregister_all_campaigns() -> None:  # noqa: D103
    from components.emailing.public.blueprint import emailing_api_blueprint

    emailing_api_blueprint.unregister_all_campaigns()

unregister_recurring

unregister_recurring(campaign_name)
Source code in components/emailing/public/registry.py
def unregister_recurring(campaign_name: str) -> None:  # noqa: D103
    from components.emailing.public.blueprint import emailing_api_blueprint

    emailing_api_blueprint.unregister_recurring_campaign(campaign_name)

components.emailing.public.shoot

batch_send_shoot

batch_send_shoot(
    shoot_ids=None,
    scheduled_before=None,
    scheduled_after=None,
    campaign_name=None,
    backfill_days=None,
    commit=True,
    async_shoot_send=True,
)

Batch send all scheduled shoots that match the given criteria.

:param shoot_ids: Send only these shoots :param scheduled_before: Send shoots scheduled before this date :param scheduled_after: Send shoots scheduled after this date. Cannot be used with backfill_days :param campaign_name: Only send shoots for this campaign :param backfill_days: Number of days to look back for shoots to send. Cannot be used with scheduled_after :param commit: Whether to commit the changes to the database :param async_shoot_send: Whether to process shoots asynchronously :raises ValueError: If both backfill_days and scheduled_after are provided

Source code in components/emailing/public/shoot.py
def batch_send_shoot(
    shoot_ids: list[UUID] | None = None,
    scheduled_before: datetime | None = None,
    scheduled_after: datetime | None = None,
    campaign_name: str | None = None,
    backfill_days: int | None = None,
    commit: bool = True,
    async_shoot_send: bool = True,
) -> None:
    """
    Batch send all scheduled shoots that match the given criteria.

    :param shoot_ids: Send only these shoots
    :param scheduled_before: Send shoots scheduled before this date
    :param scheduled_after: Send shoots scheduled after this date. Cannot be used with backfill_days
    :param campaign_name: Only send shoots for this campaign
    :param backfill_days: Number of days to look back for shoots to send. Cannot be used with scheduled_after
    :param commit: Whether to commit the changes to the database
    :param async_shoot_send: Whether to process shoots asynchronously
    :raises ValueError: If both backfill_days and scheduled_after are provided
    """
    if shoot_ids is None and scheduled_before is None:
        raise ValueError("Either shoot_ids or scheduled_before must be provided")
    elif backfill_days is not None and scheduled_after is not None:
        raise ValueError("Cannot provide both backfill_days and scheduled_after")

    if backfill_days is not None and scheduled_before is not None:
        scheduled_after = scheduled_before - relativedelta(
            days=backfill_days, hour=0, minute=0, second=0
        )
    elif scheduled_after is None and scheduled_before is not None:
        scheduled_after = scheduled_before - relativedelta(
            days=0, hour=0, minute=0, second=0
        )

    past_scheduled_shoots = list_shoots(
        shoot_ids=shoot_ids,
        scheduled_before=scheduled_before,
        scheduled_after=scheduled_after,
        states=[EmailShootState.scheduled],
        campaign_name=campaign_name,
    )

    current_logger.info(
        f"Found {len(past_scheduled_shoots)} shoots to send before {scheduled_before}",
        scheduled_before=scheduled_before,
        scheduled_after=scheduled_after,
        campaign_name=campaign_name,
        async_shoot_send=async_shoot_send,
    )

    # We use priority queue to process shoots quickly, emails are sent in low priority
    queue = current_rq.get_queue(EMAILING_QUEUE)
    for shoot in past_scheduled_shoots:
        current_logger.info(
            f"Trigger job to send {shoot.shoot_id}", shoot_id=shoot.shoot_id
        )
        if async_shoot_send:
            queue.enqueue(
                send_shoot,
                shoot.shoot_id,
                _commit=commit,
            )
        else:
            try:
                send_shoot(shoot.shoot_id, _commit=commit)
            except Exception as e:
                current_logger.error(
                    f"Error sending shoot {shoot.shoot_id}",
                    shoot_id=shoot.shoot_id,
                    error=str(e),
                )

cancel_shoot

cancel_shoot(shoot_id, commit=True)

Cancel an email shoot

Source code in components/emailing/public/shoot.py
@tracer.wrap(service="emailing")
def cancel_shoot(shoot_id: UUID, commit: bool = True) -> None:
    """
    Cancel an email shoot
    """
    set_email_shoot_state(shoot_id, EmailShootState.cancelled, commit=commit)
    current_logger.info(f"Email shoot {shoot_id} cancelled", shoot_id=shoot_id)

duplicate_shoot

duplicate_shoot(shoot_id, scheduled_at, commit=True)

Duplicate an email shoot

Source code in components/emailing/public/shoot.py
@tracer.wrap(service="emailing")
def duplicate_shoot(
    shoot_id: UUID,
    scheduled_at: datetime,
    commit: bool = True,
) -> EmailShoot:
    """
    Duplicate an email shoot
    """
    shoot = get_shoot(shoot_id)
    new_shoot = create_email_shoot(
        shoot_id=None,
        scheduled_at=scheduled_at,
        segment_name=shoot.audience.segment.name,
        audience_settings=shoot.audience.settings,
        template_name=shoot.template.name,
        template_settings=shoot.template_settings,
        audience_limit=shoot.audience.limit,
        transactional_message_id=shoot.transactional_message_id,
        message_frequency=shoot.message_frequency,
        campaign_name=shoot.campaign_name,
        enqueue_as_draft=shoot.enqueue_as_draft,
        metadata=shoot.metadata,
        tags=shoot.tags,
        custom_blocks_variation_id=shoot.custom_blocks_variation_id,
        sender_email_address=shoot.sender_email_address,
    )

    current_logger.info(
        f"Email shoot {shoot_id} duplicated to {new_shoot.shoot_id}",
        shoot_id=shoot_id,
        new_shoot_id=new_shoot.shoot_id,
    )

    if commit:
        current_session.commit()

    return new_shoot

get_email_logs

get_email_logs(shoot_id)
Source code in components/emailing/public/shoot.py
@tracer.wrap(service="emailing")
def get_email_logs(  # noqa: D103
    shoot_id: UUID,
) -> list[EmailLog]:
    shoot = get_shoot(shoot_id)
    if not shoot.started_at:
        return []
    pagination = paginate_email_logs(
        page=1,
        per_page=5000,
        campaign_names=[mandatory(shoot.campaign_name)],
        message_metadata={"shoot_id": str(shoot_id)},
        is_dry_run=None,
        # We add those filters for performance reasons
        sent_after=(shoot.completed_at or shoot.started_at) - timedelta(days=5),
        sent_before=(
            (shoot.completed_at or shoot.started_at) + timedelta(days=5)
            if shoot.completed_at
            else None
        ),
    )
    return pagination.items

get_preview_shoot_url

get_preview_shoot_url(
    template_name,
    segment_name,
    template_settings,
    audience_settings,
)

Generate a preview shoot url for a given template and audience It requires a flask app to be running Returns None if the url cannot be generated

Source code in components/emailing/public/shoot.py
@tracer.wrap(service="emailing")
def get_preview_shoot_url(
    template_name: str,
    segment_name: str,
    template_settings: dict,  # type: ignore[type-arg]
    audience_settings: dict,  # type: ignore[type-arg]
) -> str | None:
    """
    Generate a preview shoot url for a given template and audience
    It requires a flask app to be running
    Returns None if the url cannot be generated
    """
    try:
        base_url = (
            f"{current_config['FRONT_END_BASE_URL']}/sales-tools/emailing/segment"
        )
        from urllib.parse import quote

        return f"{base_url}?templateName={template_name}&jsonTemplateSettings={quote(json.dumps(template_settings))}&segmentName={segment_name}&jsonAudienceSettings={json.dumps(audience_settings)}"
    except Exception:
        current_logger.exception(
            "Failed to generate preview shoot url",
            template_name=template_name,
            template_settings=template_settings,
            audience_settings=audience_settings,
            segment_name=segment_name,
        )
        return None

get_shoot

get_shoot(shoot_id)
Source code in components/emailing/public/shoot.py
@tracer.wrap(service="emailing")
def get_shoot(shoot_id: UUID) -> EmailShoot:  # noqa: D103
    return get_email_shoot(shoot_id)

has_shoot

has_shoot(shoot_id)
Source code in components/emailing/public/shoot.py
@tracer.wrap(service="emailing")
def has_shoot(shoot_id: UUID) -> bool:  # noqa: D103
    try:
        get_shoot(shoot_id)
        return True
    except ValueError:
        return False

list_shoots

list_shoots(
    shoot_ids=None,
    scheduled_before=None,
    scheduled_after=None,
    campaign_name=None,
    limit=None,
    states=None,
    metadata=None,
    account_refs=None,
    user_ids=None,
    tags=None,
)
Source code in components/emailing/public/shoot.py
@tracer.wrap(service="emailing")
def list_shoots(  # noqa: D103
    shoot_ids: list[UUID] | None = None,
    scheduled_before: datetime | None = None,
    scheduled_after: datetime | None = None,
    campaign_name: str | None = None,
    limit: int | None = None,
    states: list[EmailShootState] | None = None,
    metadata: dict[str, Any] | None = None,
    account_refs: list[str] | None = None,
    user_ids: list[str] | None = None,
    tags: list[str] | None = None,
) -> list[EmailShoot]:
    return list_email_shoots(
        shoot_ids=shoot_ids,
        scheduled_before=scheduled_before,
        scheduled_after=scheduled_after,
        campaign_name=campaign_name,
        limit=limit,
        states=states,
        metadata=metadata,
        account_refs=account_refs,
        user_ids=user_ids,
        tags=tags,
    )

patch_shoot

patch_shoot(
    shoot_id,
    scheduled_at=NOT_SET,
    segment=NOT_SET,
    audience_settings=NOT_SET,
    template=NOT_SET,
    template_settings=NOT_SET,
    tags=NOT_SET,
    audience_limit=NOT_SET,
    campaign_name=NOT_SET,
    message_frequency=NOT_SET,
    transactional_message_id=NOT_SET,
    metadata=NOT_SET,
    enqueue_as_draft=NOT_SET,
    custom_blocks_variation_id=NOT_SET,
    commit=True,
)

Patch an email shoot. It only edits the shoot if the parameter is not None

Source code in components/emailing/public/shoot.py
@tracer.wrap(service="emailing")
def patch_shoot(
    shoot_id: UUID,
    scheduled_at: NotSet[datetime] = NOT_SET,
    segment: NotSet[type[Segment]] = NOT_SET,
    audience_settings: NotSet[dict | None] = NOT_SET,  # type: ignore[type-arg]
    template: NotSet[type[EmailTemplate]] = NOT_SET,  # type: ignore[type-arg]
    template_settings: NotSet[dict] = NOT_SET,  # type: ignore[type-arg]
    tags: NotSet[list[str]] = NOT_SET,
    audience_limit: NotSet[int | None] = NOT_SET,
    campaign_name: NotSet[str | None] = NOT_SET,
    message_frequency: NotSet[MessageFrequency | None] = NOT_SET,
    transactional_message_id: NotSet[str | None] = NOT_SET,
    metadata: NotSet[dict] = NOT_SET,  # type: ignore[type-arg]
    enqueue_as_draft: NotSet[bool] = NOT_SET,
    custom_blocks_variation_id: NotSet[str | None] = NOT_SET,
    commit: bool = True,
) -> EmailShoot:
    """
    Patch an email shoot. It only edits the shoot if the parameter is not None
    """
    if is_set(segment) and not has_segment(segment.name):
        raise ValueError(f"Unknown segment {segment.name}")
    if is_set(template) and not has_template(template.name):
        raise ValueError(f"Unknown template {template.name}")

    shoot = update_email_shoot(
        shoot_id=shoot_id,
        scheduled_at=scheduled_at,
        segment_name=(NOT_SET if segment is NOT_SET else segment.name),
        audience_settings=audience_settings,
        template_name=(NOT_SET if template is NOT_SET else template.name),
        template_settings=template_settings,
        audience_limit=audience_limit,
        transactional_message_id=transactional_message_id,
        message_frequency=message_frequency,
        campaign_name=campaign_name,
        enqueue_as_draft=enqueue_as_draft,
        metadata=metadata,
        tags=tags,
        custom_blocks_variation_id=custom_blocks_variation_id,
    )

    if commit:
        current_session.commit()

    current_logger.info(
        f"Email shoot {shoot_id} patched",
        shoot_id=shoot_id,
    )

    return shoot

schedule_shoot

schedule_shoot(
    scheduled_at,
    audience,
    template,
    template_settings,
    tags=None,
    campaign_name=None,
    message_frequency=None,
    transactional_message_id=None,
    custom_blocks=None,
    metadata=None,
    enqueue_as_draft=False,
    shoot_id=None,
    sender_email_address=None,
    dry_run=False,
    commit=True,
    send_if_scheduled_in_past=True,
    _async_send=True,
)

Schedule an email shoot at a given time

:param scheduled_at: The time at which the shoot should be sent. If in the past, the shoot will be sent immediately async. :param audience: The audience to use :param template: Template to use :param template_settings: The settings to use to render the template :param tags: The tags to add to the shoot :param message_frequency: The optional limit settings that will be used to manage the number of emails max sent to a user. :param transactional_message_id: The optional transactional message id that will be used to send the email :param campaign_name: Identifies a group of messages. This is used to trigger the right webhooks :param custom_blocks: The optional custom blocks to adapt the email content :param metadata: The optional metadata to add to the shoot :param shoot_id: The optional shoot id to use else a new one will be generated :param sender_email_address: optional sender email address that will be used to send the email. Defaults to config["EMAIL_SENDER_NAME"] if None. :param enqueue_as_draft: If True, the shoot will enqueue message as a draft when sending :param commit: If True, the shoot will be persisted in the database :param dry_run: If True, the shoot will not be sent but will be persisted in the database and email log will be created :param send_if_scheduled_in_past: If True, the shoot will be sent immediately if scheduled in the past :param _async_send: If True, the shoot will be sent async if it needs to be sent immediately

:return: The scheduled shoot

Source code in components/emailing/public/shoot.py
@tracer.wrap(service="emailing")
def schedule_shoot(
    scheduled_at: datetime,
    audience: Audience,
    template: type[EmailTemplate],  # type: ignore[type-arg]
    template_settings: dict,  # type: ignore[type-arg]
    tags: list[str] | None = None,
    campaign_name: str | None = None,
    message_frequency: MessageFrequency | None = None,
    transactional_message_id: str | None = None,
    custom_blocks: list[CustomBlock] | None = None,
    metadata: dict | None = None,  # type: ignore[type-arg]
    enqueue_as_draft: bool = False,
    shoot_id: UUID | None = None,
    sender_email_address: str | None = None,
    dry_run: bool = False,
    commit: bool = True,
    send_if_scheduled_in_past: bool = True,
    _async_send: bool = True,
) -> EmailShoot | None:
    """
    Schedule an email shoot at a given time

    :param scheduled_at: The time at which the shoot should be sent. If in the past, the shoot will be sent immediately
        async.
    :param audience: The audience to use
    :param template: Template to use
    :param template_settings: The settings to use to render the template
    :param tags: The tags to add to the shoot
    :param message_frequency: The optional limit settings that will be used to manage the number of emails max sent
        to a user.
    :param transactional_message_id: The optional transactional message id that will be used to send the email
    :param campaign_name: Identifies a group of messages. This is used to trigger the right webhooks
    :param custom_blocks: The optional custom blocks to adapt the email content
    :param metadata: The optional metadata to add to the shoot
    :param shoot_id: The optional shoot id to use else a new one will be generated
    :param sender_email_address: optional sender email address that will be used to send the email.
        Defaults to config["EMAIL_SENDER_NAME"] if None.
    :param enqueue_as_draft: If True, the shoot will enqueue message as a draft when sending
    :param commit: If True, the shoot will be persisted in the database
    :param dry_run: If True, the shoot will not be sent but will be persisted in the database and email log will be
        created
    :param send_if_scheduled_in_past: If True, the shoot will be sent immediately if scheduled in the past
    :param _async_send: If True, the shoot will be sent async if it needs to be sent immediately

    :return: The scheduled shoot
    """
    if not has_segment(audience.segment.name):
        raise ValueError(f"Unknown segment {audience.segment.name}")
    if not has_template(template.name):
        raise ValueError(f"Unknown template {template.name}")
    if not custom_blocks:
        custom_blocks = []

    with AdvisoryLock(
        f"shoot_scheduling_lock_{shoot_id or 'skipped'}", ignore_lock=shoot_id is None
    ):
        if shoot_id and has_shoot(shoot_id):
            current_logger.warning(
                "Shoot already scheduled because shoot_id already exists - skipping",
                campaign_name=campaign_name,
                shoot_id=shoot_id,
            )
            return None

        custom_blocks_variation_id = one_or_none(
            uniquify([cb.variation_id for cb in custom_blocks]),
            message="All custom blocks must have the same variation id",
        )

        shoot = create_email_shoot(
            shoot_id=shoot_id,
            scheduled_at=scheduled_at,
            segment_name=audience.segment.name,
            audience_settings=audience.settings,
            template_name=template.name,
            template_settings=template_settings,
            audience_limit=audience.limit,
            transactional_message_id=transactional_message_id,
            message_frequency=message_frequency,
            campaign_name=campaign_name,
            enqueue_as_draft=enqueue_as_draft,
            dry_run=dry_run,
            metadata=metadata,
            tags=tags,
            custom_blocks_variation_id=custom_blocks_variation_id,
            sender_email_address=sender_email_address,
        )

        for custom_block in custom_blocks:
            upsert_custom_block(
                variation_id=custom_block.variation_id,
                block_type=custom_block.block_type,
                content=custom_block.content,
                title=custom_block.title or "",
            )

        if commit:
            current_session.commit()
        else:
            current_session.flush()

        event_handler.trigger_event(
            event_name=EmailShootEvent.shoot_scheduled,
            email_shoot=shoot,
            occurred_at=datetime.utcnow(),
        )
        current_logger.info(
            f"Email shoot scheduled at {scheduled_at}", shoot_id=shoot.shoot_id
        )

        if scheduled_at <= datetime.utcnow() and commit and send_if_scheduled_in_past:
            current_logger.info(
                "Scheduling an email shoot in the past, sending it immediately",
                shoot_id=shoot.shoot_id,
                scheduled_at=scheduled_at,
                tags=tags,
            )
            send_shoot(shoot.shoot_id)

        return shoot

send_shoot

send_shoot(
    shoot_id,
    enqueue_as_draft=None,
    dry_run=None,
    is_retry=False,
    _commit=True,
)

Send an email shoot to its audience.

:param shoot_id id of the shoot to send :param dry_run If not dry run, the shoot will be marked as sending then completed when all the emails have been processed. :param enqueue_as_draft If True, the shoot will enqueue message as a draft when sending. If False it will send If None we let customer.io decides. :param is_retry If True, the shoot will be rescheduled (state set back to scheduled) before being sent

Source code in components/emailing/public/shoot.py
@tracer.wrap(service="emailing")
def send_shoot(
    shoot_id: UUID,
    enqueue_as_draft: bool | None = None,
    dry_run: bool | None = None,
    is_retry: bool = False,
    _commit: bool = True,
) -> None:
    """
    Send an email shoot to its audience.

    :param shoot_id id of the shoot to send
    :param dry_run If not dry run, the shoot will be marked as sending then completed
        when all the emails have been processed.
    :param enqueue_as_draft If True, the shoot will enqueue message as a draft when sending. If False it will send
        If None we let customer.io decides.
    :param is_retry If True, the shoot will be rescheduled (state set back to scheduled) before being sent
    """
    shoot = get_shoot(shoot_id)
    email_priority = EmailPriority.low
    emailing_queue = get_emailing_queue_name(email_priority)

    if dry_run is None:
        dry_run = shoot.dry_run
    elif dry_run != shoot.dry_run:
        current_logger.warning(
            f"Overriding shoot dry_run with {dry_run}", shoot_id=shoot_id
        )

    if enqueue_as_draft is None:
        enqueue_as_draft = shoot.enqueue_as_draft
    elif enqueue_as_draft != shoot.enqueue_as_draft:
        current_logger.warning(
            f"Overriding shoot enqueue_as_draft with {enqueue_as_draft}",
            shoot_id=shoot_id,
        )

    if is_retry:
        current_logger.info(
            f"Retrying shoot {shoot_id}, rescheduling it",
            shoot_id=shoot_id,
        )
        set_email_shoot_state(shoot_id, EmailShootState.scheduled, commit=_commit)
        shoot = get_shoot(shoot_id)

    if shoot.state != EmailShootState.scheduled:
        raise ValueError(f"Only scheduled shoots can be sent, got {shoot.state}")

    lock_key = f"emailing/send_shoot/{str(shoot_id)}"
    with AdvisoryLock(lock_key):
        try:
            set_email_shoot_state(shoot_id, EmailShootState.sending, commit=_commit)
            shoot = get_shoot(shoot_id)
            recipients = shoot.audience.list_recipients()
            message_metadata = shoot.metadata.copy()
            message_metadata["shoot_id"] = shoot_id

            if not recipients:
                current_logger.warning(
                    f"Recipients for shoot {shoot_id}, skipping",
                    shoot_id=shoot_id,
                )
                set_email_shoot_state(shoot_id, EmailShootState.skipped, commit=_commit)
                return

            set_metrics(recipient_count=len(recipients), shoot_id=shoot_id)

            if _commit:
                current_session.commit()
            else:
                current_session.flush()

            @tracer.wrap(service="emailing")
            def mark_shoot_as_completed(
                shared_context: SharedSideEffectContext,
            ) -> None:
                """
                This callback enqueue a job that will be executed only when all the jobs
                triggered.
                """
                kwargs = {
                    "shoot_id": shoot_id,
                    "state": EmailShootState.completed,
                    "commit": _commit,
                }

                if shared_context.job_ids:
                    current_rq.get_queue(emailing_queue).enqueue(
                        set_email_shoot_state,
                        depends_on=shared_context.job_ids,
                        **kwargs,  # type: ignore[arg-type]
                    )
                    current_logger.info(
                        f"Enqueued job to mark shoot {shoot_id} as completed",
                        shoot_id=shoot_id,
                    )
                else:
                    set_email_shoot_state(**kwargs)  # type: ignore[arg-type]

            with side_effects(
                trigger_on_exit=_commit,
                ignore_leftovers=not _commit,
                on_all_executed=mark_shoot_as_completed,
            ):
                template_settings = shoot.template_settings

                if shoot.custom_blocks_variation_id:
                    template_settings["custom_blocks_variation_id"] = (
                        shoot.custom_blocks_variation_id
                    )
                else:
                    template_settings["custom_blocks_variation_id"] = str(shoot_id)

                batch_send_email(
                    recipients=recipients,
                    template_name=shoot.template.name,
                    template_settings=template_settings,
                    transactional_message_id=shoot.transactional_message_id,
                    message_frequency=shoot.message_frequency,
                    dry_run=dry_run,
                    enqueue_as_draft=enqueue_as_draft,
                    message_metadata=message_metadata,
                    campaign_name=shoot.campaign_name,
                    account_ref=shoot.metadata.get("account_id")
                    or shoot.metadata.get("account_ref"),
                    sender_email_address=shoot.sender_email_address,
                    # do not change this as we never want to flood the high priority queue with shoot emails
                    email_priority=email_priority,
                    defer_send=True,
                    log_params={"shoot_id": shoot_id},
                )

            current_logger.info(
                f"Email shoot {shoot_id} triggered for {len(recipients)} recipients",
                shoot_id=shoot_id,
                audience_size=len(recipients),
            )
        except Exception as e:
            set_email_shoot_state(shoot_id, EmailShootState.failed, commit=_commit)
            current_logger.error(
                "Failed to send email shoot put status to failed", shoot_id=shoot_id
            )
            raise e

components.emailing.public.testing

factories

CustomBlockFactory

Bases: AlanBaseFactory['CustomBlock']

Meta
model class-attribute instance-attribute
model = CustomBlock
block_type class-attribute instance-attribute
block_type = 'intro'
content class-attribute instance-attribute
content = 'Content of the block'
title class-attribute instance-attribute
title = 'Title of the block'
variation_id class-attribute instance-attribute
variation_id = Faker('uuid4')

EmailLogFactory

Bases: Factory

Meta
model class-attribute instance-attribute
model = EmailLog
account_ref class-attribute instance-attribute
account_ref = UUID(
    "1234-1234-1234-1234-1234-1234-1234-1234"
)
attachment_filenames class-attribute instance-attribute
attachment_filenames = []
bounced_at class-attribute instance-attribute
bounced_at = None
campaign_name class-attribute instance-attribute
campaign_name = 'campaign_name'
clicked class-attribute instance-attribute
clicked = False
created_at class-attribute instance-attribute
created_at = datetime(2021, 1, 1)
delivered_at class-attribute instance-attribute
delivered_at = datetime(2021, 1, 1)
delivery_id class-attribute instance-attribute
delivery_id = 'delivery_id'
dropped_at class-attribute instance-attribute
dropped_at = None
email_address class-attribute instance-attribute
email_address = 'foo@bar.com'
email_log_ref class-attribute instance-attribute
email_log_ref = '1234'
error_message class-attribute instance-attribute
error_message = None
error_type class-attribute instance-attribute
error_type = None
is_dry_run class-attribute instance-attribute
is_dry_run = False
message_metadata class-attribute instance-attribute
message_metadata = {}
opened class-attribute instance-attribute
opened = False
opened_at class-attribute instance-attribute
opened_at = None
recipient_type class-attribute instance-attribute
recipient_type = member
sent_at class-attribute instance-attribute
sent_at = datetime(2021, 1, 1)
state class-attribute instance-attribute
state = sent
success class-attribute instance-attribute
success = True
template_name class-attribute instance-attribute
template_name = 'template_name'
template_settings class-attribute instance-attribute
template_settings = {}
user_context class-attribute instance-attribute
user_context = {}
user_id class-attribute instance-attribute
user_id = 'user_id'

EmailShootConfigFactory

Bases: Factory

Meta
model class-attribute instance-attribute
model = EmailShootConfig
audience class-attribute instance-attribute
audience = Audience(
    segment=DummySegment(),
    settings={"company_id": "1234", "quz": 2},
)
config_id class-attribute instance-attribute
config_id = Faker('uuid4')
manual_send_only class-attribute instance-attribute
manual_send_only = False
message_frequency class-attribute instance-attribute
message_frequency = MessageFrequency(
    message_limit=1,
    idempotency_params={
        "campaign_name": "recurring_campaign_name",
        "company_id": "1234",
    },
)
metadata class-attribute instance-attribute
metadata = {}
scheduled_at class-attribute instance-attribute
scheduled_at = Faker('date_time')
state class-attribute instance-attribute
state = pending
template_settings class-attribute instance-attribute
template_settings = {'dummy': 'template_settings'}

EmailShootFactory

Bases: Factory

Meta
model class-attribute instance-attribute
model = EmailShoot
app_name class-attribute instance-attribute
app_name = ALAN_FR
audience class-attribute instance-attribute
audience = Audience(
    segment=DummySegment(),
    settings={"foo": "bar"},
    limit=None,
)
campaign_name class-attribute instance-attribute
campaign_name = 'campaign_name'
cancelled_at class-attribute instance-attribute
cancelled_at = None
clicked_count class-attribute instance-attribute
clicked_count = 0
completed_at class-attribute instance-attribute
completed_at = None
delivered_count class-attribute instance-attribute
delivered_count = 0
dry_run class-attribute instance-attribute
dry_run = False
enqueue_as_draft class-attribute instance-attribute
enqueue_as_draft = False
failed_count class-attribute instance-attribute
failed_count = 0
is_persisted class-attribute instance-attribute
is_persisted = True
message_frequency class-attribute instance-attribute
message_frequency = None
metadata class-attribute instance-attribute
metadata = {'foo': 'bar'}
opened_count class-attribute instance-attribute
opened_count = 0
recipient_count class-attribute instance-attribute
recipient_count = None
scheduled_at class-attribute instance-attribute
scheduled_at = Faker('date_time')
sender_email_address class-attribute instance-attribute
sender_email_address = None
sent_count class-attribute instance-attribute
sent_count = 0
shoot_id class-attribute instance-attribute
shoot_id = Faker('uuid4', cast_to=None)
started_at class-attribute instance-attribute
started_at = None
state class-attribute instance-attribute
state = scheduled
tags class-attribute instance-attribute
tags = ['tag1', 'tag2']
template class-attribute instance-attribute
template = DummyTemplate()
template_settings class-attribute instance-attribute
template_settings = {}
transactional_message_id class-attribute instance-attribute
transactional_message_id = '1234'
updated_at class-attribute instance-attribute
updated_at = None

EmailShootSQLAFactory

Bases: AlanBaseFactory['EmailShoot']

Meta
model class-attribute instance-attribute
model = EmailShootV2
app_name class-attribute instance-attribute
app_name = ALAN_FR
audience_limit class-attribute instance-attribute
audience_limit = None
audience_settings class-attribute instance-attribute
audience_settings = None
campaign_name class-attribute instance-attribute
campaign_name = 'campaign_name'
cancelled_at class-attribute instance-attribute
cancelled_at = None
clicked_count class-attribute instance-attribute
clicked_count = 0
completed_at class-attribute instance-attribute
completed_at = None
delivered_count class-attribute instance-attribute
delivered_count = 0
enqueue_as_draft class-attribute instance-attribute
enqueue_as_draft = False
failed_count class-attribute instance-attribute
failed_count = 0
idempotency_key class-attribute instance-attribute
idempotency_key = None
max_message_per_key class-attribute instance-attribute
max_message_per_key = None
opened_count class-attribute instance-attribute
opened_count = 0
scheduled_at class-attribute instance-attribute
scheduled_at = datetime(2021, 1, 1)
segment_name class-attribute instance-attribute
segment_name = 'dummy_segment_1'
sent_count class-attribute instance-attribute
sent_count = 0
shoot_id class-attribute instance-attribute
shoot_id = Faker('uuid4', cast_to=None)
started_at class-attribute instance-attribute
started_at = None
state class-attribute instance-attribute
state = scheduled
tags class-attribute instance-attribute
tags = []
template_name class-attribute instance-attribute
template_name = 'dummy_template_1'
template_settings class-attribute instance-attribute
template_settings = None
transactional_message_id class-attribute instance-attribute
transactional_message_id = None

RecurringCampaignFactory

Bases: Factory

Meta
model class-attribute instance-attribute
model = DummyRecurringCampaign
campaign_name class-attribute instance-attribute
campaign_name = 'campaign_name'
recurrence class-attribute instance-attribute
recurrence = rrule(DAILY, dtstart=datetime(2021, 1, 1))
template_cls class-attribute instance-attribute
template_cls = DummyTemplate
template_settings class-attribute instance-attribute
template_settings = {}

TuringEmailStatsFactory

Bases: AlanBaseFactory['TuringEmailStats']

Meta
model class-attribute instance-attribute
model = TuringEmailStats
app_name class-attribute instance-attribute
app_name = ALAN_FR
campaign_name class-attribute instance-attribute
campaign_name = 'campaign_name'
delivered_count class-attribute instance-attribute
delivered_count = 9
on_date class-attribute instance-attribute
on_date = date(2021, 1, 1)
recipient_types class-attribute instance-attribute
recipient_types = [member]
sent_count class-attribute instance-attribute
sent_count = 10

mock

EmailLogMock

EmailLogMock(mocker)
Source code in components/emailing/public/testing/mock.py
def __init__(self, mocker) -> None:  # type: ignore[no-untyped-def]
    def create_email_log(*args, **kwargs):  # type: ignore[no-untyped-def]  # noqa: ARG001
        return EmailLogFactory.create(delivery_id=FAKE_DELIVERY_ID)

    # mock return value of create_email_log implemented in `components.emailing.external.api.legacy_email_log.create_email_log`
    self.mocked_create_email_log = mocker.patch(
        "components.emailing.external.api.legacy_email_log.create_email_log",
        side_effect=create_email_log,
    )
assert_created_with_args
assert_created_with_args(*args, **kwargs)
Source code in components/emailing/public/testing/mock.py
def assert_created_with_args(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]  # noqa: D102
    self.mocked_create_email_log.assert_called_once_with(*args, **kwargs)
has_been_created
has_been_created()
Source code in components/emailing/public/testing/mock.py
def has_been_created(self):  # type: ignore[no-untyped-def]  # noqa: D102
    return self.mocked_create_email_log.call_count > 0
mocked_create_email_log instance-attribute
mocked_create_email_log = patch(
    "components.emailing.external.api.legacy_email_log.create_email_log",
    side_effect=create_email_log,
)
reset_mock
reset_mock()
Source code in components/emailing/public/testing/mock.py
def reset_mock(self) -> None:  # noqa: D102
    self.mocked_create_email_log.reset_mock(return_value=True, side_effect=True)

FAKE_DELIVERY_ID module-attribute

FAKE_DELIVERY_ID = 'f089d3da4533470e82cb0540c4954b43'

MailerMock

MailerMock(mocker)
Source code in components/emailing/public/testing/mock.py
def __init__(self, mocker) -> None:  # type: ignore[no-untyped-def]
    self.mocker = mocker
    self.set_response(FAKE_DELIVERY_ID, True)
assert_any_email_sent
assert_any_email_sent(
    recipient_address=None,
    body=None,
    subject=None,
    attachments_count=None,
)

Assert that at least an email has been sent with the given properties

Source code in components/emailing/public/testing/mock.py
def assert_any_email_sent(
    self,
    recipient_address: Optional[str] = None,
    body: Optional[str] = None,
    subject: Optional[str] = None,
    attachments_count: Optional[int] = None,
) -> None:
    """
    Assert that at least an email has been sent with the given properties
    """
    requests = self.get_last_email_requests()
    if recipient_address:
        self._assert_almost_in(
            recipient_address, [req.recipient.email_address for req in requests]
        )
    if body:
        self._assert_almost_in(body, [req.content.body for req in requests])
    if subject:
        self._assert_almost_in(subject, [req.content.subject for req in requests])
    if attachments_count:
        assert attachments_count in [
            len((req.content.attachments or {}).keys()) for req in requests
        ]
assert_email_not_sent
assert_email_not_sent(email=None, body=None, subject=None)

Assert that no email has been sent with the given properties

Source code in components/emailing/public/testing/mock.py
def assert_email_not_sent(
    self,
    email: str | None = None,
    body: Union[str, list[str]] | None = None,
    subject: str | None = None,
) -> None:
    """
    Assert that no email has been sent with the given properties
    """
    requests = self.get_last_email_requests()

    # If not arguments just assert that no email has been sent
    if not email and not body and not subject:
        assert len(requests) == 0
    if email:
        self._assert_not_almost_in(
            email, [req.recipient.email_address for req in requests]
        )
    if body and isinstance(body, str):
        self._assert_not_almost_in(body, [req.content.body for req in requests])
    if body and isinstance(body, list):
        for x in body:
            self._assert_not_almost_in(x, [req.content.body for req in requests])
    if subject:
        self._assert_not_almost_in(
            subject, [req.content.subject for req in requests]
        )
assert_email_sent
assert_email_sent(
    user_id=None,
    email=None,
    body=None,
    not_body=None,
    subject=None,
    index=None,
    transactional_message_id=None,
    attachments_count=None,
)

Assert the nth email sent defined by index args has been set with the given property

Source code in components/emailing/public/testing/mock.py
def assert_email_sent(
    self,
    user_id: Optional[str] = None,
    email: Union[Optional[str], list[str]] | None = None,
    body: Union[Optional[str], list[str]] | None = None,
    not_body: Union[Optional[str], list[str]] | None = None,
    subject: Optional[str] = None,
    index: Optional[int] = None,
    transactional_message_id: Optional[str] = None,
    attachments_count: Optional[int] = None,
) -> None:
    """
    Assert the nth email sent defined by index args has been set with the
    given property
    """
    if index is None:
        if email is None or email and isinstance(email, str):
            index = self._find_request_index(
                email=email, transactional_message_id=transactional_message_id
            )
        else:
            index = 0
    request = self.get_last_email_requests()[index]
    if user_id:
        assert user_id == request.recipient.user_ref
    if email and isinstance(email, str):
        assert email in request.recipient.email_address
    if email and isinstance(email, list):
        for x in email:
            assert x in request.recipient.email_address, (
                f"{x} not found in {request.recipient.email_address}"
            )
    if body and isinstance(body, str) and request.content.body:
        assert request.content.subject is not None, "Body should not be None"
        assert body in request.content.body
    if body and isinstance(body, list) and request.content.body:
        for x in body:
            assert x in request.content.body, (
                f"{x} not found in {request.content.body}"
            )
    if not_body and isinstance(not_body, str) and request.content.body:
        assert not_body not in request.content.body
    if not_body and isinstance(not_body, list) and request.content.body:
        for x in not_body:
            assert x not in request.content.body, (
                f"{x} found in {request.content.body}"
            )
    if subject:
        assert request.content.subject is not None, "Subject should not be None"
        assert subject in request.content.subject
    if attachments_count is not None:
        if attachments_count > 0:
            assert request.content.attachments is not None, (
                "Email attachments is None while attachment count is > 0"
            )
        assert attachments_count == (
            len(request.content.attachments) if request.content.attachments else 0
        )

    if transactional_message_id:
        assert transactional_message_id == request.sending_options.message_id
assert_email_sent_count
assert_email_sent_count(count)
Source code in components/emailing/public/testing/mock.py
def assert_email_sent_count(self, count: int) -> None:  # noqa: D102
    assert count == self.mocked_send_email.call_count
assert_single_email_sent
assert_single_email_sent(
    user_id=None,
    email=None,
    body=None,
    not_body=None,
    subject=None,
    transactional_message_id=None,
    attachments_count=None,
)
Source code in components/emailing/public/testing/mock.py
def assert_single_email_sent(  # noqa: D102
    self,
    user_id: Optional[str] = None,
    email: Union[Optional[str], list[str]] | None = None,
    body: Union[Optional[str], list[str]] | None = None,
    not_body: Union[Optional[str], list[str]] | None = None,
    subject: Optional[str] = None,
    transactional_message_id: Optional[str] = None,
    attachments_count: Optional[int] = None,
) -> None:
    self.assert_email_sent_count(1)
    self.assert_email_sent(
        user_id,
        email,
        body,
        not_body,
        subject,
        0,
        transactional_message_id,
        attachments_count,
    )
get_last_email_requests
get_last_email_requests()
Source code in components/emailing/public/testing/mock.py
def get_last_email_requests(self) -> list["EmailRequest"]:  # noqa: D102
    return [x[0][0] for x in self.mocked_send_email.call_args_list]
last_email_request property
last_email_request
mocker instance-attribute
mocker = mocker
reset_mock
reset_mock()
Source code in components/emailing/public/testing/mock.py
def reset_mock(self) -> None:  # noqa: D102
    if self.mocked_send_email:
        self.mocked_send_email.reset_mock(return_value=True, side_effect=True)
set_response
set_response(delivery_id, success, error=None)
Source code in components/emailing/public/testing/mock.py
def set_response(  # noqa: D102
    self, delivery_id: str | None, success: bool, error: str | None = None
) -> None:
    def send_email(*args, **kwargs):  # type: ignore[no-untyped-def]  # noqa: ARG001
        return MailerResponse(delivery_id=delivery_id, success=success, error=error)

    self.mocked_send_email = self.mocker.patch.object(
        CustomerIOMailer, "send_email", side_effect=send_email
    )