Skip to content

Api reference

components.async_exports.public.actions

create_export

create_export

create_export(user_id, export_data)

Create export request and queue processing.

Source code in components/async_exports/internal/business_logic/actions/create_export.py
def create_export(
    user_id: uuid.UUID | int, export_data: ExportRequestData
) -> uuid.UUID:
    """Create an export request and its result, then queue the processing job.

    The request and result rows are committed before the job is enqueued so
    the worker can access them. The ID of the enqueued job is then stored on
    the export result in a second commit.

    Args:
        user_id: ID of the requesting user; forwarded to the queued job.
        export_data: Parameters of the export to create.

    Returns:
        The ID of the newly created export result.
    """
    log = current_logger.bind()

    # 1. Create request
    log.info("Creating export request")
    export_request = ExportRequest(
        profile_id=export_data.profile_id,
        export_type=export_data.export_type,
        filters=export_data.filters,
        file_format=export_data.file_format,
        status=ExportResultProcessingStatus.processing,  # Initial status set to processing for the request
        context_account_id=export_data.context_account_id,
        company_ids=export_data.company_ids,
        operational_scope_ids=export_data.operational_scope_ids,
    )
    current_session.add(export_request)
    current_session.flush()  # Ensure the ID is generated before using it
    log = log.bind(export_request_id=export_request.id)

    # 2. Create result
    log.info(
        f"Created export request with ID {export_request.id}. Now creating export result."
    )
    export_result = ExportResult(
        export_request_id=export_request.id,
        processing_status=ExportResultProcessingStatus.pending,
        id=uuid.uuid4(),
    )
    current_session.add(export_result)
    log = log.bind(export_result_id=export_result.id)
    log.info(f"Created export result with ID {export_result.id}.")

    # 3. Commit before queuing job to ensure worker can access the records
    current_session.commit()

    # 4. Queue job after commit
    log.info(f"Queuing export job for export result ID {export_result.id}.")
    main_queue = current_rq.get_queue(ASYNC_EXPORTS_QUEUE)
    job = main_queue.enqueue(
        process_export_job,
        export_result.id,
        user_id,
        retry=Retry(max=MAX_RETRIES, interval=RETRY_INTERVAL_SECONDS),
    )

    # 5. Store job ID
    export_result.rq_job_id = job.id
    current_session.add(export_result)
    current_session.commit()

    # bind() returns a new logger, so the result must be reassigned for the
    # job_id to be attached to the log entry below.
    log = log.bind(job_id=job.id)
    log.info(
        f"Enqueued export job {job.id} for export result ID {export_result.id}.",
        job=job,
        export_result=export_result,
        export_request=export_request,
    )

    return export_result.id

increment_download_counts

increment_export_download_counts

increment_export_download_counts(
    user_id, export_result_ids
)

Increment download counts for export results that the user has permission to access.

This function will: 1. Get export results with proper permission filtering 2. Increment download_count for each result 3. Set first_downloaded_at if not set 4. Update last_downloaded_at 5. Commit the changes

Parameters:

Name Type Description Default
user_id int

The user ID for permission checking

required
export_result_ids list[UUID]

List of export result UUIDs that were downloaded

required

Returns:

Type Description
None

None

Source code in components/async_exports/public/actions/increment_download_counts.py
def increment_export_download_counts(
    user_id: int,
    export_result_ids: list[uuid.UUID],
) -> None:
    """
    Record a download on every requested export result the user may access.

    Steps:
    1. Resolve which of the requested results pass the permission filter
    2. Load the matching ExportResult instances
    3. Call increment_download_count() on each of them
    4. Commit the session once for all of them

    Nothing is loaded or committed when no requested result is permitted.

    Args:
        user_id: The user ID used for the permission check
        export_result_ids: UUIDs of the export results that were downloaded

    Returns:
        None
    """
    from components.async_exports.public.queries.get_export_results import (
        get_export_results_for_download,
    )

    # The public query applies the permission filtering for us.
    accessible = get_export_results_for_download(
        user_id=user_id,
        export_result_ids=set(export_result_ids),
    )
    allowed_ids = {entry.id for entry in accessible}
    if not allowed_ids:
        return

    # Reload the model instances so the counters can be updated on them.
    rows = ExportResultBroker.get_non_deleted_export_results_with_request_context(
        export_result_ids=allowed_ids
    )
    for row in rows:
        row.increment_download_count()

    # Single commit covering every updated result.
    current_session.commit()

components.async_exports.public.dependencies

AsyncExportsDependency

Bases: ABC

AsyncExportsDependency defines the interface that apps using the async_exports component need to implement. This interface is used to generate exports asynchronously.

generate_export abstractmethod

generate_export(
    export_type,
    filters,
    file_format,
    company_ids,
    operational_scope_ids,
)

Generates an export based on the provided export type, filters, and file format. Returns a GeneratedExportData object containing the export file content and metadata (items count, mimetype, etc.).

Source code in components/async_exports/public/dependencies.py
@abstractmethod
def generate_export(
    self,
    export_type: ExportType,
    filters: dict[str, str | list[str]] | None,
    file_format: ExportExtension,
    company_ids: list[str],
    operational_scope_ids: list[uuid.UUID],
) -> GeneratedExportData:
    """
    Build the export for the given export type, filters and file format.

    Implementations return a GeneratedExportData holding the export file
    content together with its metadata (items count, mimetype).
    """
    # The base declaration only signals that an implementation is missing.
    message = "generate_export must be implemented by the dependency"
    raise NotImplementedError(message)

COMPONENT_NAME module-attribute

COMPONENT_NAME = 'async_exports'

get_app_dependency

get_app_dependency()

Retrieves at runtime the async_exports dependency set by set_app_dependency

Source code in components/async_exports/public/dependencies.py
def get_app_dependency() -> AsyncExportsDependency:
    """Return the async_exports dependency registered on the current Flask app."""
    from flask import current_app

    flask_app = cast("CustomFlask", current_app)
    registered = flask_app.get_component_dependency(COMPONENT_NAME)
    return cast("AsyncExportsDependency", registered)

set_app_dependency

set_app_dependency(dependency)

Sets the async_exports dependency to the app so it can be accessed within this component at runtime

Source code in components/async_exports/public/dependencies.py
def set_app_dependency(dependency: AsyncExportsDependency) -> None:
    """
    Register the async_exports dependency on the current Flask app so this
    component can retrieve it at runtime.
    """
    from flask import current_app

    flask_app = cast("CustomFlask", current_app)
    flask_app.add_component_dependency(COMPONENT_NAME, dependency)

components.async_exports.public.entities

export_info

ExportInfo dataclass

ExportInfo(
    *,
    export_request_id,
    export_type,
    file_format,
    filters,
    created_at,
    context_account_id,
    company_ids,
    operational_scope_ids,
    export_result_id,
    processing_status,
    outcome=None,
    items_count=None,
    processed_at=None,
    error_message=None
)

Information about an export request and its result.

company_ids instance-attribute
company_ids
context_account_id instance-attribute
context_account_id
created_at instance-attribute
created_at
error_message class-attribute instance-attribute
error_message = None
export_request_id instance-attribute
export_request_id
export_result_id instance-attribute
export_result_id
export_type instance-attribute
export_type
file_format instance-attribute
file_format
filters instance-attribute
filters
items_count class-attribute instance-attribute
items_count = None
operational_scope_ids instance-attribute
operational_scope_ids
outcome class-attribute instance-attribute
outcome = None
processed_at class-attribute instance-attribute
processed_at = None
processing_status instance-attribute
processing_status
to_dict
to_dict()

Convert ExportInfo to a dictionary.

Source code in components/async_exports/public/entities/export_info.py
def to_dict(self) -> dict[str, Any]:
    """Serialize this ExportInfo into a dictionary of plain values.

    IDs are rendered with str(), enum members through their .value and
    datetimes through isoformat(). A falsy operational_scope_ids, outcome
    or processed_at is emitted as None.
    """
    scope_ids = None
    if self.operational_scope_ids:
        scope_ids = [str(scope_id) for scope_id in self.operational_scope_ids]

    outcome_value = None
    if self.outcome:
        outcome_value = self.outcome.value

    processed_at_iso = None
    if self.processed_at:
        processed_at_iso = self.processed_at.isoformat()

    payload: dict[str, Any] = {
        "export_request_id": str(self.export_request_id),
        "export_type": self.export_type.value,
        "file_format": self.file_format.value,
        "filters": self.filters,
        "created_at": self.created_at.isoformat(),
        "context_account_id": str(self.context_account_id),
        "company_ids": self.company_ids,
        "operational_scope_ids": scope_ids,
        "export_result_id": str(self.export_result_id),
        "processing_status": self.processing_status.value,
        "outcome": outcome_value,
        "items_count": self.items_count,
        "processed_at": processed_at_iso,
        "error_message": self.error_message,
    }
    return payload

export_request_data

ExportRequestData dataclass

ExportRequestData(
    *,
    profile_id,
    export_type,
    filters,
    file_format,
    context_account_id,
    company_ids,
    operational_scope_ids
)

Bases: DataClassJsonMixin

Represents data required to create an export request.

company_ids instance-attribute
company_ids
context_account_id instance-attribute
context_account_id
export_type instance-attribute
export_type
file_format instance-attribute
file_format
filters instance-attribute
filters
operational_scope_ids instance-attribute
operational_scope_ids
profile_id instance-attribute
profile_id

export_result_data

ExportResultData dataclass

ExportResultData(
    *,
    id,
    export_request_id,
    export_type,
    file_format,
    outcome,
    uri,
    items_count,
    created_at,
    context_account_id,
    company_ids,
    operational_scope_ids
)

Represents export result data for file downloads.

company_ids instance-attribute
company_ids
context_account_id instance-attribute
context_account_id
created_at instance-attribute
created_at
export_request_id instance-attribute
export_request_id
export_type instance-attribute
export_type
file_format instance-attribute
file_format
id instance-attribute
id
items_count instance-attribute
items_count
operational_scope_ids instance-attribute
operational_scope_ids
outcome instance-attribute
outcome
uri instance-attribute
uri

generated_export_data

GeneratedExportData dataclass

GeneratedExportData(file_content, mimetype, items_count)

Bases: DataClassJsonMixin

Represents the generated export data including file content, MIME type, and item count.

file_content instance-attribute
file_content
items_count instance-attribute
items_count
mimetype instance-attribute
mimetype

components.async_exports.public.enums

enums

ExportExtension

Bases: AlanBaseEnum

File extensions for export files.

csv class-attribute instance-attribute
csv = 'csv'
xlsx class-attribute instance-attribute
xlsx = 'xlsx'

ExportResultOutcome

Bases: AlanBaseEnum

Final outcome of an export result: "success" means the export completed successfully with a file available; "failed" means the export failed and no file is available.

failed class-attribute instance-attribute
failed = 'failed'
success class-attribute instance-attribute
success = 'success'

ExportResultProcessingStatus

Bases: AlanBaseEnum

Status of the background job processing. It's mirrored in the ExportRequest model for denormalisation purposes.

cancelled class-attribute instance-attribute
cancelled = 'cancelled'
pending class-attribute instance-attribute
pending = 'pending'
processed class-attribute instance-attribute
processed = 'processed'
processing class-attribute instance-attribute
processing = 'processing'

export_type

ExportConfig

Single source of truth for export types and their scope configuration.

EXPORT_DEFINITIONS class-attribute instance-attribute
# Maps each ExportType to its ExportConfiguration: the scope ("global" or
# "local") and the retention period in days. Every type is retained 30 days
# except billing_changes, which is retained 1 day.
EXPORT_DEFINITIONS = {
    ExportType("employees"): ExportConfiguration(
        scope="global", retention_period_days=30
    ),
    ExportType("employees_active"): ExportConfiguration(
        scope="global", retention_period_days=30
    ),
    ExportType("employees_suspended"): ExportConfiguration(
        scope="global", retention_period_days=30
    ),
    ExportType("exemptions"): ExportConfiguration(
        scope="local", retention_period_days=30
    ),
    ExportType(
        "premium_discrepancies"
    ): ExportConfiguration(
        scope="local", retention_period_days=30
    ),
    ExportType("billing_changes"): ExportConfiguration(
        scope="local", retention_period_days=1
    ),
    ExportType(
        "occupational_health_employees"
    ): ExportConfiguration(
        scope="global", retention_period_days=30
    ),
}
is_global_export classmethod
is_global_export(export_type)

Check if an export type uses global components.

Source code in components/async_exports/public/enums/export_type.py
@classmethod
def is_global_export(cls, export_type: ExportType) -> bool:
    """Return True when the export type is configured with the "global" scope.

    Export types missing from EXPORT_DEFINITIONS yield False.
    """
    definition = cls.EXPORT_DEFINITIONS.get(export_type)
    if definition is None:
        return False
    return definition.scope == "global"
is_local_export classmethod
is_local_export(export_type)

Check if an export type uses local components.

Source code in components/async_exports/public/enums/export_type.py
@classmethod
def is_local_export(cls, export_type: ExportType) -> bool:
    """Return True when the export type is configured with the "local" scope.

    Export types missing from EXPORT_DEFINITIONS yield False.
    """
    definition = cls.EXPORT_DEFINITIONS.get(export_type)
    if definition is None:
        return False
    return definition.scope == "local"

ExportConfiguration dataclass

ExportConfiguration(*, scope, retention_period_days=None)

Configuration of an export type, including its scope and optional retention period.

retention_period_days class-attribute instance-attribute
retention_period_days = None
scope instance-attribute
scope

ExportScope module-attribute

ExportScope = Literal['global', 'local']

ExportType

Bases: AlanBaseEnum

This enum defines the export types that can be generated. It serves as a single source of truth for export types and their scope configuration. Add new export types here as needed.

billing_changes class-attribute instance-attribute
billing_changes = 'billing_changes'
employees class-attribute instance-attribute
employees = 'employees'
employees_active class-attribute instance-attribute
employees_active = 'employees_active'
employees_suspended class-attribute instance-attribute
employees_suspended = 'employees_suspended'
exemptions class-attribute instance-attribute
exemptions = 'exemptions'
occupational_health_employees class-attribute instance-attribute
occupational_health_employees = (
    "occupational_health_employees"
)
premium_discrepancies class-attribute instance-attribute
premium_discrepancies = 'premium_discrepancies'

components.async_exports.public.queries

get_export

get_exports_query

get_exports_query(
    profile_id, user_id, export_type, limit=10
)

Get recent export requests and their results for a profile and an export type.

Parameters:

Name Type Description Default
profile_id UUID

The profile ID to get exports for

required
user_id UUID | int

The user ID for permission checks

required
export_type ExportType

Filter by export type

required
limit int

Maximum number of results (default: 10)

10

Returns:

Type Description
list[ExportInfo]

List of export information sorted by creation date (newest first)

Source code in components/async_exports/internal/business_logic/queries/get_exports.py
def get_exports_query(
    profile_id: uuid.UUID,
    user_id: uuid.UUID | int,
    export_type: ExportType,
    limit: int = 10,
) -> list[ExportInfo]:
    """
    Get recent export requests and their results for a profile and an export type.

    Requests the user is not allowed to administer, requests without any
    result, and requests whose latest result is deleted are left out. The
    limit is handed to the broker before this filtering, so fewer than
    `limit` entries may be returned.

    Args:
        profile_id: The profile ID to get exports for
        user_id: The user ID for permission checks
        export_type: Filter by export type
        limit: Maximum number of export requests fetched (default: 10)

    Returns:
        List of export information, in the order returned by the broker
        (expected to be newest first)
    """
    # Get recent export requests with results eager loaded (single optimized query)
    export_requests = ExportRequestBroker.get_exports_with_results_for_profile(
        profile_id=profile_id, export_type=export_type, limit=limit
    )

    # Occupational health exports skip the admin permission check. The export
    # type is the same for every request, so decide once outside the loop.
    requires_admin_check = export_type != ExportType.occupational_health_employees

    export_infos = []
    for request in export_requests:
        # Check if user can admin all entities for this export request
        if requires_admin_check and not user_can_admin_admined_entities(
            user_id=int(user_id), admined_entities=request.admined_entities
        ):
            continue

        # Get the latest result (results are ordered by created_at desc)
        latest_result = request.results[0] if request.results else None

        # Skip export requests that have no results or the latest result is deleted
        if latest_result is None or latest_result.deleted_at is not None:
            continue

        # latest_result is guaranteed non-None from here on, so its fields are
        # read directly without further None checks.
        export_infos.append(
            ExportInfo(
                # Export Request fields
                export_request_id=request.id,
                export_type=ExportType(request.export_type),
                file_format=ExportExtension(request.file_format),
                filters=request.filters,
                created_at=request.created_at,
                context_account_id=request.context_account_id,
                company_ids=request.company_ids,
                operational_scope_ids=request.operational_scope_ids,
                # Export Result fields
                export_result_id=latest_result.id,
                processing_status=latest_result.processing_status,
                outcome=latest_result.outcome,
                items_count=latest_result.items_count,
                processed_at=latest_result.processed_at,
                error_message=latest_result.error_message,
            )
        )

    return export_infos

get_export_results

get_export_results_for_download

get_export_results_for_download(user_id, export_result_ids)

Get export results for download with permission filtering.

Parameters:

Name Type Description Default
user_id int | UUID

The user ID for permission checking

required
export_result_ids set[UUID]

Set of export result IDs to retrieve

required

Returns:

Type Description
list[ExportResultData]

List of ExportResultData objects for permitted results

Source code in components/async_exports/public/queries/get_export_results.py
def get_export_results_for_download(
    user_id: int | uuid.UUID,
    export_result_ids: set[uuid.UUID],
) -> list[ExportResultData]:
    """
    Load the requested export results and keep those the user may download.

    Args:
        user_id: The user ID used for the permission check
        export_result_ids: Set of export result IDs to look up

    Returns:
        One ExportResultData per fetched result that passed the permission check
    """
    # The broker returns the results together with their export request.
    fetched_results = (
        ExportResultBroker.get_non_deleted_export_results_with_request_context(
            export_result_ids=export_result_ids
        )
    )

    permitted = []
    for fetched in fetched_results:
        parent_request = fetched.export_request
        kind = ExportType(parent_request.export_type)

        # Occupational health exports are returned without the admin check.
        needs_admin_check = kind != ExportType.occupational_health_employees
        if needs_admin_check and not user_can_admin_admined_entities(
            user_id=user_id, admined_entities=parent_request.admined_entities
        ):
            continue

        entry = ExportResultData(
            id=fetched.id,
            export_request_id=parent_request.id,
            export_type=kind,
            file_format=parent_request.file_format,
            outcome=fetched.outcome,
            uri=fetched.uri,
            items_count=fetched.items_count,
            created_at=parent_request.created_at,
            context_account_id=parent_request.context_account_id,
            company_ids=parent_request.company_ids,
            operational_scope_ids=parent_request.operational_scope_ids,
        )
        permitted.append(entry)

    return permitted