Skip to content

Api reference

components.employment.public.api

cancel

cancel(
    employment_id,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)

Sets the employment to cancelled.

Source code in components/employment/public/api.py
@employment_component_context
def cancel(
    employment_id: UUID,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    """
    Sets the employment to cancelled.
    """
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )

        if latest_version_for_employment.is_cancelled:
            current_employment_logger.info(
                f"Didn't cancel Employment {employment_id} it is already cancelled",
                layer="imperative_input_validation",
            )
            return

        change = mandatory(
            create_employment_changes(
                employment_change_payloads=[Cancellation()],
                initial_core_employment_version=latest_version_for_employment,
                employment_source_data_id=employment_source_data_id,
                source_type=source_type,
            )
        )

        notify_consumers_without_blocked_movements(
            change=change,
            source_type=source_type,
            event_bus_orchestrator=event_bus,
        )
        commit_and_execute_side_effects()

employment_component_context

employment_component_context(func)

A decorator to set a context variable indicating that an employment component function is running. This is useful to highlight updates of the legacy Employment table outside of the usage of the Employment Component which will help us finish the migration to the Employment Component.

Source code in components/employment/public/api.py
def employment_component_context(func: Callable[_P, _T]) -> Callable[_P, _T]:
    """
    A decorator to set a context variable indicating that an employment component function is running.
    This is useful to highlight updates of the legacy Employment table outside of the usage of the Employment Component
    which will help us finish the migration to the Employment Component.
    """

    def wrapper(
        *args: _P.args,
        **kwargs: _P.kwargs,
    ) -> _T:
        is_running_employment_component.set(True)
        try:
            return func(*args, **kwargs)
        finally:
            is_running_employment_component.set(False)

    return wrapper

ingest_employment_declaration

ingest_employment_declaration(
    employment_declaration,
    source_information,
    commit,
    event_bus_orchestrator=None,
    retried_blocked_movement=None,
    upstream_retried_blocked_movement=None,
    source_rules_override=None,
)

Ingests the given employment declarations, notifies the consumers, and executes side effects.

Source code in components/employment/public/api.py
@employment_component_context
def ingest_employment_declaration(
    employment_declaration: EmploymentDeclaration[_TValues],
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
    retried_blocked_movement: CoreBlockedMovementModel | None = None,
    upstream_retried_blocked_movement: (UpstreamBlockedMovementModel | None) = None,
    source_rules_override: SourceRules | None = None,
) -> IngestionResult:
    """
    Ingests the given employment declarations, notifies the consumers, and executes side effects.
    """
    current_employment_logger.info(
        "Ingesting employment declaration",
        layer="ingest_employment_declaration",
        clear_attributes={
            "source_type": str(employment_declaration.source_type),
            "company_id": employment_declaration.company_id,
            "user_id": employment_declaration.user_id,
        },
    )

    # TODO @matthieu.stombellini temporary to retry blocked movements created by COM-3782
    if employment_declaration.external_employee_id == "":
        employment_declaration = replace(
            employment_declaration, external_employee_id=None
        )

    with (
        BlockedMovementCreator(
            employment_declaration=employment_declaration,
            source_information=source_information,
            unexpected_error_class=UnexpectedEmploymentError,
            commit=commit,
            retried_blocked_movement=retried_blocked_movement,
            upstream_retried_blocked_movement=upstream_retried_blocked_movement,
            source_rules_override=source_rules_override,
        ) as blocked_movement_manager,
        bound_contextvars(
            user_id=employment_declaration.user_id,
            company_id=employment_declaration.company_id,
        ),
        use_orchestrator_or_create_multi_event_bus(
            event_bus_orchestrator, commit=commit
        ) as (event_bus, commit_and_execute_side_effects),
    ):
        employment_source_data_id = get_or_create_employment_source_data_model(
            employment_declaration.source_type.base_source_type,
            source_information,
        )

        change = create_employment_changes_for_declaration(
            employment_declaration,
            employment_source_data_id=employment_source_data_id,
            source_rules_override=source_rules_override,
        )

        current_session.flush()

        if change is not None:
            notify_consumers_with_blocked_movements(
                change=change,
                blocked_movement_manager=blocked_movement_manager,
                event_bus_orchestrator=event_bus,
                source_type=employment_declaration.source_type,
            )
        commit_and_execute_side_effects()

    return IngestionResult(
        success=not blocked_movement_manager.created_blocked_movement,
        employment_source_data=blocked_movement_manager.employment_source_data
        or EmploymentSourceData.from_employment_source_data_model(
            get_or_raise_missing_resource(
                EmploymentSourceDataModel, employment_source_data_id
            )
        ),
    )

ingest_employment_declaration_without_blocked_movement

ingest_employment_declaration_without_blocked_movement(
    employment_declaration,
    source_information,
    commit,
    event_bus_orchestrator=None,
    source_rules_override=None,
)

Ingests the given employment declarations, notifies the consumers, and executes side effects.

Source code in components/employment/public/api.py
@employment_component_context
def ingest_employment_declaration_without_blocked_movement(
    employment_declaration: EmploymentDeclaration[_TValues],
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
    source_rules_override: SourceRules | None = None,
) -> IngestionResult:
    """
    Ingests the given employment declarations, notifies the consumers, and executes side effects.
    """
    current_employment_logger.info(
        "Ingesting employment declaration without blocked movements",
        layer="ingest_employment_declaration",
        clear_attributes={
            "source_type": str(employment_declaration.source_type),
            "company_id": employment_declaration.company_id,
            "user_id": employment_declaration.user_id,
        },
    )

    with (
        bound_contextvars(
            user_id=employment_declaration.user_id,
            company_id=employment_declaration.company_id,
        ),
        use_orchestrator_or_create_multi_event_bus(
            event_bus_orchestrator, commit=commit
        ) as (event_bus, commit_and_execute_side_effects),
    ):
        employment_source_data_id = get_or_create_employment_source_data_model(
            employment_declaration.source_type.base_source_type,
            source_information,
        )

        change = create_employment_changes_for_declaration(
            employment_declaration,
            employment_source_data_id=employment_source_data_id,
            source_rules_override=source_rules_override,
        )

        current_session.flush()

        if change is not None:
            notify_consumers_without_blocked_movements(
                change=change,
                event_bus_orchestrator=event_bus,
                source_type=employment_declaration.source_type,
            )
        commit_and_execute_side_effects()

    return IngestionResult(
        success=True,
        employment_source_data=EmploymentSourceData.from_employment_source_data_model(
            get_or_raise_missing_resource(
                EmploymentSourceDataModel, employment_source_data_id
            )
        ),
    )

is_running_employment_component module-attribute

is_running_employment_component = ContextVar(
    "is_running_employment_component", default=False
)

resume

resume(
    employment_id,
    source_type,
    source_information,
    commit,
    extended_information=None,
    event_bus_orchestrator=None,
)

Resumes an ended employment.

Source code in components/employment/public/api.py
@employment_component_context
def resume(
    employment_id: UUID,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    extended_information: list[ExtendedInformation[_TValues]] | None = None,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    """
    Resumes an ended employment.
    """
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )

        if not latest_version_for_employment.end_date:
            current_employment_logger.info(
                f"Didn't resume Employment {employment_id} as it is not ended",
                layer="imperative_input_validation",
            )
            return

        payloads: list[EmploymentChangePayload] = [Resumption()]
        if extended_information:
            payloads.append(
                ExtendedEmploymentValueChange(extended_information=extended_information)
            )

        change = mandatory(
            create_employment_changes(
                employment_change_payloads=payloads,
                initial_core_employment_version=latest_version_for_employment,
                employment_source_data_id=employment_source_data_id,
                source_type=source_type,
            )
        )

        notify_consumers_without_blocked_movements(
            change=change,
            source_type=source_type,
            event_bus_orchestrator=event_bus,
        )
        commit_and_execute_side_effects()

set_extended_employment_values

set_extended_employment_values(
    employment_id,
    values,
    validity_period,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)

Create a new ExtendedEmploymentUpdate with the given values for the employment with the given id.

The update will contain only the values passed as argument, the other values will remain unchanged. Doc: https://www.notion.so/alaninsurance/Definitions-employments-sources-etc-1301426e8be780cc9796ee31f49559e5?pvs=4#1301426e8be78052977ff4fbb932c8f7 ⧉

Source code in components/employment/public/api.py
@employment_component_context
def set_extended_employment_values(
    employment_id: UUID,
    values: _TValues,
    validity_period: ValidityPeriod | None,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    """
    Create a new ExtendedEmploymentUpdate with the given values for the employment with the given id.

    The update will contain only the values passed as argument, the other values will remain unchanged.
    Doc: https://www.notion.so/alaninsurance/Definitions-employments-sources-etc-1301426e8be780cc9796ee31f49559e5?pvs=4#1301426e8be78052977ff4fbb932c8f7
    """
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )

        # We don't 'mandatory(...)' the result here as it's possible that, if all the values were already registered,
        # nothing happens.
        change = create_employment_changes(
            employment_change_payloads=[
                ExtendedEmploymentValueChange(
                    extended_information=[
                        ExtendedInformation(
                            values=values, validity_period=validity_period
                        )
                    ]
                )
            ],
            initial_core_employment_version=latest_version_for_employment,
            employment_source_data_id=employment_source_data_id,
            source_type=source_type,
        )

        if change is not None:
            notify_consumers_without_blocked_movements(
                change=change,
                source_type=source_type,
                event_bus_orchestrator=event_bus,
            )
        commit_and_execute_side_effects()

set_external_employee_id

set_external_employee_id(
    employment_id,
    new_external_employee_id,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)

Updates the external employee id of the employment with the given id. WARNING: This function does not check if the external employee id is valid according to local country rules. Please do that check upstream.

Source code in components/employment/public/api.py
@employment_component_context
def set_external_employee_id(
    employment_id: UUID,
    new_external_employee_id: str | None,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    """
    Updates the external employee id of the employment with the given id.
    WARNING: This function does not check if the external employee id is valid according to local country rules. Please do that check upstream.
    """
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )

        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )
        check_external_employee_id_does_not_collide_with_existing_overlapping_employment(
            external_employee_id=new_external_employee_id,
            user_id=latest_version_for_employment.user_id,
            company_id=latest_version_for_employment.company_id,
            start_date=latest_version_for_employment.start_date,
            end_date=latest_version_for_employment.end_date,
        )

        if (
            latest_version_for_employment.external_employee_id
            == new_external_employee_id
        ):
            current_employment_logger.info(
                f"Can't update external employee id, Employment {employment_id} already has external employee id {new_external_employee_id}",
                layer="imperative_input_validation",
            )
        else:
            change = mandatory(
                create_employment_changes(
                    employment_change_payloads=[
                        ExternalEmployeeIdChange(
                            external_employee_id=new_external_employee_id
                        )
                    ],
                    initial_core_employment_version=latest_version_for_employment,
                    employment_source_data_id=employment_source_data_id,
                    source_type=source_type,
                )
            )

            notify_consumers_without_blocked_movements(
                change=change,
                event_bus_orchestrator=event_bus,
                source_type=source_type,
            )

        commit_and_execute_side_effects()

terminate_or_update_termination

terminate_or_update_termination(
    employment_id,
    end_date,
    extended_information,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)
Source code in components/employment/public/api.py
@employment_component_context
def terminate_or_update_termination(  # noqa: D103
    employment_id: UUID,
    end_date: date,
    extended_information: list[ExtendedInformation[_TValues]],
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )
        current_end_date = latest_version_for_employment.end_date
        payloads: list[EmploymentChangePayload] = [
            ExtendedEmploymentValueChange(extended_information=extended_information)
        ]
        if not current_end_date:
            payloads.append(Termination(end_date=end_date))
        elif current_end_date != end_date:
            payloads.append(EndDateChange(end_date=end_date))

        change = create_employment_changes(
            employment_change_payloads=payloads,
            initial_core_employment_version=latest_version_for_employment,
            employment_source_data_id=employment_source_data_id,
            source_type=source_type,
        )

        if not change:
            current_employment_logger.info(
                f"Can't update termination, Employment {employment_id} already has the requested end date {end_date}",
                layer="imperative_input_validation",
            )
            return

        notify_consumers_without_blocked_movements(
            change=change,
            source_type=source_type,
            event_bus_orchestrator=event_bus,
        )
        commit_and_execute_side_effects()

transfer

transfer(
    current_employment_id,
    new_employment_declaration,
    source_information,
    commit,
    event_bus_orchestrator=None,
)
Source code in components/employment/public/api.py
@employment_component_context
def transfer(  # noqa: D103
    current_employment_id: UUID,
    new_employment_declaration: EmploymentDeclaration[_TValues],
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> UUID:
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            new_employment_declaration.source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            current_employment_id
        )

        if (
            latest_version_for_employment.company_id
            == new_employment_declaration.company_id
        ):
            raise CannotTransferToSameCompanyError(
                f"Cannot transfer: employment {latest_version_for_employment.employment_id} is already in company {new_employment_declaration.company_id}"
            )

        check_external_employee_id_does_not_collide_with_existing_overlapping_employment(
            external_employee_id=new_employment_declaration.external_employee_id,
            user_id=new_employment_declaration.user_id,
            company_id=new_employment_declaration.company_id,
            start_date=new_employment_declaration.start_date,
            end_date=new_employment_declaration.end_date,
        )

        change = mandatory(
            create_employment_changes(
                employment_change_payloads=[Transfer(new_employment_declaration)],
                initial_core_employment_version=latest_version_for_employment,
                employment_source_data_id=employment_source_data_id,
                source_type=new_employment_declaration.source_type,
            )
        )

        notify_consumers_without_blocked_movements(
            change=change,
            source_type=new_employment_declaration.source_type,
            event_bus_orchestrator=event_bus,
        )

        commit_and_execute_side_effects()

        return change.core_employment_version.employment_id

uncancel

uncancel(
    employment_id,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)

Undo the cancellation of the employment.

Source code in components/employment/public/api.py
@employment_component_context
def uncancel(
    employment_id: UUID,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    """
    Undo the cancellation of the employment.
    """
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )

        if not latest_version_for_employment.is_cancelled:
            current_employment_logger.info(
                f"Didn't uncancel Employment {employment_id} it is not cancelled",
                layer="imperative_input_validation",
            )
            return

        change = mandatory(
            create_employment_changes(
                employment_change_payloads=[Uncancellation()],
                initial_core_employment_version=latest_version_for_employment,
                employment_source_data_id=employment_source_data_id,
                source_type=source_type,
            )
        )

        notify_consumers_without_blocked_movements(
            change=change,
            source_type=source_type,
            event_bus_orchestrator=event_bus,
        )
        commit_and_execute_side_effects()

update_start_date

update_start_date(
    employment_id,
    new_start_date,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)
Source code in components/employment/public/api.py
@employment_component_context
def update_start_date(  # noqa: D103
    employment_id: UUID,
    new_start_date: date,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )

        if latest_version_for_employment.start_date == new_start_date:
            current_employment_logger.info(
                f"Can't update start date, Employment {employment_id} already has start date {new_start_date}",
                layer="imperative_input_validation",
            )
        else:
            change = mandatory(
                create_employment_changes(
                    employment_change_payloads=[
                        StartDateChange(start_date=new_start_date)
                    ],
                    initial_core_employment_version=latest_version_for_employment,
                    employment_source_data_id=employment_source_data_id,
                    source_type=source_type,
                )
            )

            notify_consumers_without_blocked_movements(
                change=change,
                event_bus_orchestrator=event_bus,
                source_type=source_type,
            )

        commit_and_execute_side_effects()

components.employment.public.business_logic

actions

blocked_movement

amend_core_blocked_movement_note
amend_core_blocked_movement_note(
    core_blocked_movement_id, new_note, commit=True
)

Appends the provided note to the end of the core blocked movement's current note.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def amend_core_blocked_movement_note(
    core_blocked_movement_id: UUID, new_note: str, commit: bool = True
) -> None:
    """
    Appends the provided note to the end of the core blocked movement's current note.
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )
    core_blocked_movement.note = (
        f"{core_blocked_movement.note}\n{new_note}"
        if core_blocked_movement.note
        else new_note
    )
    if commit:
        current_session.commit()
amend_upstream_blocked_movement_note
amend_upstream_blocked_movement_note(
    upstream_blocked_movement_id, new_note, commit=True
)

Appends the provided note to the end of the upstream blocked movement's current note.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def amend_upstream_blocked_movement_note(
    upstream_blocked_movement_id: UUID, new_note: str, commit: bool = True
) -> None:
    """
    Appends the provided note to the end of the upstream blocked movement's current note.
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    upstream_blocked_movement.note = (
        f"{upstream_blocked_movement.note}\n{new_note}"
        if upstream_blocked_movement.note
        else new_note
    )
    if commit:
        current_session.commit()
automatically_cancel_pending_core_blocked_movement
automatically_cancel_pending_core_blocked_movement(
    core_blocked_movement_id, commit
)

Automatically cancel a core blocked movement.

This should only be used to automatically cancel a blocked movement immediately after its creation.

Otherwise: - If the cancellation is a result of a manual, human action, use manually_cancel_core_blocked_movement instead. - If the cancellation is part of an async/automated process, use mark_...as_self_healing as soon as possible, then cancel_self_healing..._blocked_movement

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def automatically_cancel_pending_core_blocked_movement(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Automatically cancel a core blocked movement.

    This should only be used to automatically cancel a blocked movement immediately after its creation.

    Otherwise:
    - If the cancellation is a result of a manual, human action, use manually_cancel_core_blocked_movement instead.
    - If the cancellation is part of an async/automated process, use mark_..._as_self_healing as soon as possible, then
      cancel_self_healing_..._blocked_movement
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )
    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={BlockedMovementStatus.pending},
        to_status=BlockedMovementStatus.automatically_cancelled,
        commit=commit,
    )
automatically_cancel_pending_upstream_blocked_movement
automatically_cancel_pending_upstream_blocked_movement(
    upstream_blocked_movement_id, commit
)

Automatically cancel an upstream blocked movement.

This should only be used to automatically cancel a blocked movement immediately after its creation.

Otherwise: - If the cancellation is a result of a manual, human action, use manually_cancel_upstream_blocked_movement instead. - If the cancellation is part of an async/automated process, use mark_...as_self_healing as soon as possible, then cancel_self_healing..._blocked_movement

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def automatically_cancel_pending_upstream_blocked_movement(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Automatically cancel an upstream blocked movement.

    This should only be used to automatically cancel a blocked movement immediately after its creation.

    Otherwise:
    - If the cancellation is a result of a manual, human action, use manually_cancel_upstream_blocked_movement instead.
    - If the cancellation is part of an async/automated process, use mark_..._as_self_healing as soon as possible, then
      cancel_self_healing_..._blocked_movement
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={BlockedMovementStatus.pending},
        to_status=BlockedMovementStatus.automatically_cancelled,
        commit=commit,
    )
cancel_pending_blocked_movement
cancel_pending_blocked_movement(
    blocked_movement_id, commit
)

Cancels a pending blocked movement by its ID. It'll manage indifferently if it's an upstream or core blocked movement.

If blocked movement is not found or is not in the 'pending' status, an error will be raised

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def cancel_pending_blocked_movement(blocked_movement_id: UUID, commit: bool) -> None:
    """
    Cancels a pending blocked movement by its ID.
    It'll manage indifferently if it's an upstream or core blocked movement.

    If blocked movement is not found or is not in the 'pending' status,
    an error will be raised
    """
    blocked_movement = get_upstream_or_core_blocked_movement_or_none(
        blocked_movement_id
    )
    if not blocked_movement:
        raise BaseErrorCode.missing_resource(
            f"No blocked movement found with ID {blocked_movement_id}"
        )

    if isinstance(blocked_movement, CoreBlockedMovement):
        manually_cancel_core_blocked_movement(blocked_movement_id, commit=commit)
    elif isinstance(blocked_movement, UpstreamBlockedMovement):
        manually_cancel_upstream_blocked_movement(blocked_movement_id, commit=commit)
    else:
        raise ValueError(f"Unsupported blocked movement type: {type(blocked_movement)}")
cancel_self_healing_core_blocked_movement
cancel_self_healing_core_blocked_movement(
    core_blocked_movement_id, commit
)

Cancels a core blocked movement which you previously took ownership of via mark_pending_core_blocked_movement_as_self_healing.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def cancel_self_healing_core_blocked_movement(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Cancels a core blocked movement which you previously took ownership of via
    `mark_pending_core_blocked_movement_as_self_healing`.
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )
    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={BlockedMovementStatus.pending_self_healing},
        to_status=BlockedMovementStatus.automatically_cancelled,
        commit=commit,
    )
cancel_self_healing_upstream_blocked_movement
cancel_self_healing_upstream_blocked_movement(
    upstream_blocked_movement_id, commit
)

Cancels an upstream blocked movement which you previously took ownership of via mark_pending_upstream_blocked_movement_as_self_healing.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def cancel_self_healing_upstream_blocked_movement(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Cancels an upstream blocked movement which you previously took ownership of via
    `mark_pending_upstream_blocked_movement_as_self_healing`.
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={BlockedMovementStatus.pending_self_healing},
        to_status=BlockedMovementStatus.automatically_cancelled,
        commit=commit,
    )
manually_cancel_core_blocked_movement
manually_cancel_core_blocked_movement(
    core_blocked_movement_id, commit
)

Cancel a core blocked movement as a result of a manual, human action. The blocked movement may be pending or awaiting_user_response.

To automatically cancel a blocked movement as a result of automated action, mark the blocked movement as self healing as early as possible (via mark_pending_core_blocked_movement_as_self_healing), then cancel it using cancel_self_healing_core_blocked_movement when needed.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def manually_cancel_core_blocked_movement(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Cancel a core blocked movement as a result of a manual, human action. The blocked movement may be pending or awaiting_user_response.

    To automatically cancel a blocked movement as a result of automated action, mark the blocked movement as self
    healing as early as possible (via `mark_pending_core_blocked_movement_as_self_healing`), then cancel it
    using `cancel_self_healing_core_blocked_movement` when needed.
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )
    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={
            BlockedMovementStatus.pending,
            BlockedMovementStatus.awaiting_user_response,
        },
        to_status=BlockedMovementStatus.cancelled,
        commit=commit,
    )
manually_cancel_upstream_blocked_movement
manually_cancel_upstream_blocked_movement(
    upstream_blocked_movement_id, commit
)

Cancel an upstream blocked movement as a result of a manual, human action. The blocked movement may be pending or awaiting_user_response.

To automatically cancel a blocked movement as a result of automated action, mark the blocked movement as self healing as early as possible (via mark_pending_upstream_blocked_movement_as_self_healing), then cancel it using cancel_self_healing_upstream_blocked_movement when needed.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def manually_cancel_upstream_blocked_movement(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Cancel an upstream blocked movement as a result of a manual, human action. The blocked movement may be pending or awaiting_user_response.

    To automatically cancel a blocked movement as a result of automated action, mark the blocked movement as self
    healing as early as possible (via `mark_pending_upstream_blocked_movement_as_self_healing`), then cancel it
    using `cancel_self_healing_upstream_blocked_movement` when needed.
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={
            BlockedMovementStatus.pending,
            BlockedMovementStatus.awaiting_user_response,
        },
        to_status=BlockedMovementStatus.cancelled,
        commit=commit,
    )
mark_pending_core_blocked_movement_as_self_healing
mark_pending_core_blocked_movement_as_self_healing(
    core_blocked_movement_id, commit
)

Mark a core blocked movement as self healing. By using this, you signal to the Employment Component that a custom local process will take care of the blocked movement from now on.

To "end" a blocked movement you take ownership of via this function, use:

  • cancel_self_healing_core_blocked_movement to cancel it (automatically_cancelled)
  • resolve_self_healing_core_blocked_movement to resolve it (automatically_resolved)
Source code in components/employment/public/business_logic/actions/blocked_movement.py
def mark_pending_core_blocked_movement_as_self_healing(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Mark a core blocked movement as self healing. By using this, you signal to the Employment Component that a
    custom local process will take care of the blocked movement from now on.

    To "end" a blocked movement you take ownership of via this function, use:

    - `cancel_self_healing_core_blocked_movement` to cancel it (`automatically_cancelled`)
    - `resolve_self_healing_core_blocked_movement` to resolve it (`automatically_resolved`)
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )

    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={BlockedMovementStatus.pending},
        to_status=BlockedMovementStatus.pending_self_healing,
        commit=commit,
    )
mark_pending_or_awaiting_user_response_core_blocked_movement_as_self_healing
mark_pending_or_awaiting_user_response_core_blocked_movement_as_self_healing(
    core_blocked_movement_id, commit
)

Mark a core blocked movement as self healing. This is a variant of mark_pending_core_blocked_movement_as_self_healing you can use in case the blocked movement may have been marked as "awaiting_user_response" by Ops. Typical use cases include:

  • Automatically moving the to a self-healing resolution process
  • An Ops taking a blocked movement (marking it as awaiting_user_response) then manually putting it back to a self-healing resolution process.

By using this, you signal to the Employment Component that a custom local process will take care of the blocked movement from now on.

To "end" a blocked movement you take ownership of via this function, use:

  • cancel_self_healing_core_blocked_movement to cancel it (automatically_cancelled)
  • resolve_self_healing_core_blocked_movement to resolve it (automatically_resolved)
Source code in components/employment/public/business_logic/actions/blocked_movement.py
def mark_pending_or_awaiting_user_response_core_blocked_movement_as_self_healing(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Mark a core blocked movement as self healing. This is a variant of
    mark_pending_core_blocked_movement_as_self_healing you can use in case the blocked movement may have been marked as
    "awaiting_user_response" by Ops. Typical use cases include:

    - Automatically moving the to a self-healing resolution process
    - An Ops taking a blocked movement (marking it as awaiting_user_response) then manually putting it back to a self-healing
      resolution process.

    By using this, you signal to the Employment Component that a custom local process will take care of the blocked
    movement from now on.

    To "end" a blocked movement you take ownership of via this function, use:

    - `cancel_self_healing_core_blocked_movement` to cancel it (`automatically_cancelled`)
    - `resolve_self_healing_core_blocked_movement` to resolve it (`automatically_resolved`)
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )

    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={
            BlockedMovementStatus.awaiting_user_response,
            BlockedMovementStatus.pending,
        },
        to_status=BlockedMovementStatus.pending_self_healing,
        commit=commit,
    )
mark_pending_or_awaiting_user_response_upstream_blocked_movement_as_self_healing
mark_pending_or_awaiting_user_response_upstream_blocked_movement_as_self_healing(
    upstream_blocked_movement_id, commit
)

Mark a upstream blocked movement as self healing. This is a variant of mark_pending_upstream_blocked_movement_as_self_healing you can use in case the blocked movement may have been marked as "awaiting_user_response" by Ops. Typical use cases include:

  • Automatically moving the to a self-healing resolution process
  • An Ops contacting admin or member about a blocked movement (marking it as awaiting_user_response) then manually putting it back to a self-healing resolution process.

By using this, you signal to the Employment Component that a custom local process will take care of the blocked movement from now on.

To "end" a blocked movement you take ownership of via this function, use:

  • cancel_self_healing_upstream_blocked_movement to cancel it (automatically_cancelled)
  • resolve_self_healing_upstream_blocked_movement to resolve it (automatically_resolved)
Source code in components/employment/public/business_logic/actions/blocked_movement.py
def mark_pending_or_awaiting_user_response_upstream_blocked_movement_as_self_healing(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Mark a upstream blocked movement as self healing. This is a variant of
    mark_pending_upstream_blocked_movement_as_self_healing you can use in case the blocked movement may have been marked as
    "awaiting_user_response" by Ops. Typical use cases include:

    - Automatically moving the to a self-healing resolution process
    - An Ops contacting admin or member about a blocked movement (marking it as awaiting_user_response) then manually putting it back to a self-healing
      resolution process.

    By using this, you signal to the Employment Component that a custom local process will take care of the blocked
    movement from now on.

    To "end" a blocked movement you take ownership of via this function, use:

    - `cancel_self_healing_upstream_blocked_movement` to cancel it (`automatically_cancelled`)
    - `resolve_self_healing_upstream_blocked_movement` to resolve it (`automatically_resolved`)
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )

    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={
            BlockedMovementStatus.awaiting_user_response,
            BlockedMovementStatus.pending,
        },
        to_status=BlockedMovementStatus.pending_self_healing,
        commit=commit,
    )
mark_pending_self_healing_core_blocked_movement_as_awaiting_user_response_self_healing
mark_pending_self_healing_core_blocked_movement_as_awaiting_user_response_self_healing(
    core_blocked_movement_id, commit
)

Mark a core blocked movement you previously took ownership of via mark_pending_core_blocked_movement_as_self_healing as awaiting user response, but still under the "self healing" process.

This is useful if you need to contact the user/admin about the blocked movement, but you still want to keep ownership of the blocked movement.

awaiting_user_response_self_healing is considered as a terminal status in SLA computation.

By using this, you signal to the Employment Component that a custom local process still takes care of the blocked movement. Typically, this blocked movement will not be automatically retried, and won't be displayed by default in the Ops tool.

To "end" a blocked movement you took ownership of, use: - cancel_self_healing_core_blocked_movement to cancel it (automatically_cancelled) - resolve_self_healing_core_blocked_movement to resolve it (automatically_resolved)

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def mark_pending_self_healing_core_blocked_movement_as_awaiting_user_response_self_healing(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Mark a core blocked movement you previously took ownership of via
    `mark_pending_core_blocked_movement_as_self_healing` as awaiting user response,
    but still under the "self healing" process.

    This is useful if you need to contact the user/admin about the blocked movement, but you still want to keep
    ownership of the blocked movement.

    `awaiting_user_response_self_healing` is considered as a terminal status in SLA computation.

    By using this, you signal to the Employment Component that a custom local process still takes care of the blocked movement.
    Typically, this blocked movement will not be automatically retried, and won't be displayed by default in the Ops tool.

    To "end" a blocked movement you took ownership of, use:
    - `cancel_self_healing_core_blocked_movement` to cancel it (`automatically_cancelled`)
    - `resolve_self_healing_core_blocked_movement` to resolve it (`automatically_resolved`)
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )

    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={
            BlockedMovementStatus.pending_self_healing,
        },
        to_status=BlockedMovementStatus.awaiting_user_response_self_healing,
        commit=commit,
    )
mark_pending_upstream_blocked_movement_as_self_healing
mark_pending_upstream_blocked_movement_as_self_healing(
    upstream_blocked_movement_id, commit
)

Mark an upstream blocked movement as self healing. By using this, you signal to the Employment Component that a custom local process will take care of the blocked movement from now on.

To "end" a blocked movement you take ownership of via this function, use:

  • cancel_self_healing_upstream_blocked_movement to cancel it (automatically_cancelled)
  • resolve_self_healing_upstream_blocked_movement to resolve it (automatically_resolved)
Source code in components/employment/public/business_logic/actions/blocked_movement.py
def mark_pending_upstream_blocked_movement_as_self_healing(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Mark an upstream blocked movement as self healing. By using this, you signal to the Employment Component that a
    custom local process will take care of the blocked movement from now on.

    To "end" a blocked movement you take ownership of via this function, use:

    - `cancel_self_healing_upstream_blocked_movement` to cancel it (`automatically_cancelled`)
    - `resolve_self_healing_upstream_blocked_movement` to resolve it (`automatically_resolved`)
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={BlockedMovementStatus.pending},
        to_status=BlockedMovementStatus.pending_self_healing,
        commit=commit,
    )
resolve_self_healing_core_blocked_movement
resolve_self_healing_core_blocked_movement(
    core_blocked_movement_id, commit
)

Resolves a core blocked movement which you previously took ownership of via mark_pending_core_blocked_movement_as_self_healing.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def resolve_self_healing_core_blocked_movement(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Resolves a core blocked movement which you previously took ownership of via
    `mark_pending_core_blocked_movement_as_self_healing`.
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )
    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={BlockedMovementStatus.pending_self_healing},
        to_status=BlockedMovementStatus.automatically_resolved,
        commit=commit,
    )
resolve_self_healing_upstream_blocked_movement
resolve_self_healing_upstream_blocked_movement(
    upstream_blocked_movement_id, commit
)

Resolves an upstream blocked movement which you previously took ownership of via mark_pending_upstream_blocked_movement_as_self_healing.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def resolve_self_healing_upstream_blocked_movement(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Resolves an upstream blocked movement which you previously took ownership of via
    `mark_pending_upstream_blocked_movement_as_self_healing`.
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={BlockedMovementStatus.pending_self_healing},
        to_status=BlockedMovementStatus.automatically_resolved,
        commit=commit,
    )
retry_core_blocked_movement
retry_core_blocked_movement(
    blocked_movement_id,
    *,
    employment_declaration_override=None,
    source_rules_override=None,
    commit=False
)

Retry a core blocked movement

Retries a core blocked movement, using the provided overrides.

Parameters:

Name Type Description Default
blocked_movement_id UUID

The ID of the core blocked movement to retry

required
employment_declaration_override EmploymentDeclaration | None

If set, will be used instead of the blocked movement's declaration. Defaults to None.

None
source_rules_override SourceRules | None

If set, will be used instead of the blocked movement's original source type's source rules. Defaults to None.

None
commit bool

True will commit (incl. executing side effects), False will dry-run. Defaults to False.

False
Source code in components/employment/public/business_logic/actions/blocked_movement.py
def retry_core_blocked_movement(
    blocked_movement_id: UUID,
    *,
    employment_declaration_override: EmploymentDeclaration[Any] | None = None,
    source_rules_override: SourceRules | None = None,
    commit: bool = False,
) -> bool:
    """Retry a core blocked movement

    Retries a core blocked movement, using the provided overrides.

    Args:
        blocked_movement_id (UUID): The ID of the core blocked movement to retry
        employment_declaration_override (EmploymentDeclaration | None, optional): If set, will be used instead of the blocked movement's declaration. Defaults to None.
        source_rules_override (SourceRules | None, optional): If set, will be used instead of the blocked movement's original source type's source rules. Defaults to None.
        commit (bool, optional): True will commit (incl. executing side effects), False will dry-run. Defaults to False.
    """
    blocked_movement: CoreBlockedMovementModel | None = current_session.get(
        BlockedMovementModelBroker.model, blocked_movement_id
    )
    if blocked_movement is None:
        current_employment_logger.error(
            f"Will skip retrying core blocked movement {blocked_movement_id}: blocked movement not found",
            layer="retry_blocked_movements",
            clear_attributes={"upstream_blocked_movement_id": str(blocked_movement_id)},
        )
        raise BaseErrorCode.missing_resource(
            description=f"No enrollment for User {blocked_movement_id}."
        )

    if employment_declaration_override:
        employment_declaration_to_retry = employment_declaration_override
        if not blocked_movement.input_override:
            blocked_movement.input_override = []
        blocked_movement.input_override.append(
            InputOverride(
                retried_at=datetime.utcnow(),
                employment_declaration=employment_declaration_to_retry,
            ).to_dict()
        )
    else:
        employment_declaration_to_retry = EmploymentDeclaration.from_dict(
            blocked_movement.context.get("employment_declaration_to_retry")
            or blocked_movement.context["employment_declaration"]
        )
    if source_rules_override:
        blocked_movement.source_rules_override = source_rules_override  # type: ignore[assignment]
    else:
        source_rules_override = (
            SourceRules.from_dict(blocked_movement.source_rules_override)
            if blocked_movement.source_rules_override
            else None
        )

    retry_function = get_retry_function(employment_declaration_to_retry)

    result = retry_function(
        employment_declaration_to_retry,
        source_information=blocked_movement.employment_source_data_id,
        commit=commit,
        retried_blocked_movement=blocked_movement,
        source_rules_override=source_rules_override,
    )

    return result.success
retry_upstream_blocked_movement
retry_upstream_blocked_movement(
    upstream_blocked_movement_id,
    *,
    upstream_data_override=None,
    commit=False
)

Retries an upstream blocked movement

Parameters:

Name Type Description Default
upstream_blocked_movement_id UUID

The ID of the blocked movement to retry

required
upstream_data_override Optional[dict[str, Any]]

If set, values present in this dict will override those present in the upstream blocked movement's raw_data. Defaults to None.

None
commit bool

True will commit (incl. executing side effects), False will dry-run. Defaults to False.

False
Source code in components/employment/public/business_logic/actions/blocked_movement.py
def retry_upstream_blocked_movement(
    upstream_blocked_movement_id: UUID,
    *,
    upstream_data_override: Optional[dict[str, Any]] = None,
    commit: bool = False,
) -> bool:
    """Retries an upstream blocked movement

    Args:
        upstream_blocked_movement_id (UUID): The ID of the blocked movement to retry
        upstream_data_override (Optional[dict[str, Any]], optional): If set, values present in this dict will override those present in the upstream blocked movement's `raw_data`. Defaults to None.
        commit (bool, optional): True will commit (incl. executing side effects), False will dry-run. Defaults to False.
    """
    from components.employment.public.business_logic.actions.upstream_blocked_movement_creator import (
        UpstreamBlockedMovementCreator,
    )

    blocked_movement: UpstreamBlockedMovementModel | None = current_session.get(
        UpstreamBlockedMovementModelBroker.model, upstream_blocked_movement_id
    )
    if blocked_movement is None:
        current_employment_logger.error(
            f"Will skip retrying upstream blocked movement {upstream_blocked_movement_id}: blocked movement not found",
            layer="retry_blocked_movements",
            clear_attributes={
                "upstream_blocked_movement_id": str(upstream_blocked_movement_id)
            },
        )
        return False

    data_to_retry = blocked_movement.upstream_data

    if upstream_data_override:
        data_to_retry = {
            **data_to_retry,
            **upstream_data_override,
        }
        blocked_movement.input_override = upstream_data_override  # type: ignore[assignment]

    source_type = LocalSourceType(blocked_movement.source_type)
    upstream_retry_handler = get_upstream_retry_handler(source_type)

    if not upstream_retry_handler:
        current_employment_logger.error(
            f"Will skip retrying upstream blocked movement {blocked_movement.id}: {blocked_movement.source_type} does not have an upstream retry handler",
            layer="retry_blocked_movements",
            clear_attributes={
                "upstream_blocked_movement_id": str(blocked_movement.id),
                "source": blocked_movement.source_type,
            },
        )
        return False
    else:
        employment_declaration: EmploymentDeclaration[Any] | None = None
        with UpstreamBlockedMovementCreator(
            commit=commit,
            source=SourceType(blocked_movement.source_type),
            upstream_data=data_to_retry,
            upstream_retried_blocked_movement=blocked_movement,
            source_information=blocked_movement.employment_source_data_id,
        ):
            employment_declaration = upstream_retry_handler(
                data_to_retry,
                blocked_movement.employment_source_data_id,
                commit,
            )
            if employment_declaration is None:
                # This case is OK: no EmploymentDeclaration needed to be made
                return True
        if employment_declaration:
            retry_function = get_retry_function(employment_declaration)

            result = retry_function(
                employment_declaration,
                source_information=blocked_movement.employment_source_data_id,
                commit=commit,
                upstream_retried_blocked_movement=blocked_movement,
            )
            return result.success
        else:
            return False

delete_user

delete_user_in_employment_component
delete_user_in_employment_component(user_id, deleter)

Support function for the "delete user" procedure. Delete all employments (core and extended) that target user and related data (blocked movements and source data).

Note that this will NOT be notified to consumers.

Source code in components/employment/public/business_logic/actions/delete_user.py
def delete_user_in_employment_component(
    user_id: UserId,
    deleter: "Deleter",
) -> None:
    """
    Support function for the "delete user" procedure.
    Delete all employments (core and extended) that target user and related data (blocked movements and source data).

    Note that this will NOT be notified to consumers.
    """
    employment_source_data = (
        current_session.query(EmploymentSourceDataModel)  # noqa: ALN085
        .outerjoin(
            EmploymentSourceDataModel.core_employment_versions,
        )
        .outerjoin(
            EmploymentSourceDataModel.core_blocked_movements,
        )
        .outerjoin(
            EmploymentSourceDataModel.upstream_blocked_movements,
        )
        .filter(
            or_(
                CoreEmploymentVersionModel.user_id == user_id,
                CoreBlockedMovementModel.user_id == user_id,
                UpstreamBlockedMovementModel.user_id == user_id,
            )
        )
        .all()
    )

    core_employments = list(
        flatten([source.core_employment_versions for source in employment_source_data])
    )
    latest_core_employment_versions = (
        current_session.query(LatestCoreEmploymentVersion)  # noqa: ALN085
        .filter(
            LatestCoreEmploymentVersion.employment_id.in_(
                {core_employment.employment_id for core_employment in core_employments}
            )
        )
        .all()
    )
    extended_employments = (
        current_session.query(ExtendedEmploymentUpdateModel)  # noqa: ALN085
        .filter(
            ExtendedEmploymentUpdateModel.employment_id.in_(
                {core_employment.employment_id for core_employment in core_employments}
            )
        )
        .all()
    )
    employment_source_data += [
        extended_employment.employment_source_data  # type: ignore[misc]
        for extended_employment in extended_employments
    ]
    core_blocked_movements = list(
        flatten([source.core_blocked_movements for source in employment_source_data])
    )
    upstream_blocked_movements = list(
        flatten(
            [source.upstream_blocked_movements for source in employment_source_data]
        )
    )
    deleter.delete_objects(
        latest_core_employment_versions
        + core_employments
        + extended_employments
        + core_blocked_movements
        + upstream_blocked_movements
        + employment_source_data  # type: ignore[operator]
    )

employment_source_data

find_similar_source_data_and_mark_as_duplicate
find_similar_source_data_and_mark_as_duplicate(
    source_type,
    user_identifier_field_name,
    company_identifier_field_name,
    start_date_field_name,
    field_names_to_compare,
    raw_data_to_compare,
    commit,
)

Looks at the latest EmploymentSourceData with the same source type and identifier fields. If the values are the same as raw_data_to_compare for all fields in field_names_to_compare, it updates the source's metadata with last_received_at=utcnow() and returns its ID. Otherwise, it returns None.

Note: if an identifier field is not present in raw_data_to_compare, it will only consider EmploymentSourceData that do not have this field in their raw_data.

param field_names_to_compare: list of fields from raw_data that will need to be equal to be considered as similar param raw_data_to_compare: new raw data dict for which we want to look for existing similar data

Source code in components/employment/public/business_logic/actions/employment_source_data.py
def find_similar_source_data_and_mark_as_duplicate(
    source_type: LocalizedSourceType,
    user_identifier_field_name: str,
    company_identifier_field_name: str,
    start_date_field_name: str,
    field_names_to_compare: list[str],
    raw_data_to_compare: dict[str, Any],
    commit: bool,
) -> UUID | None:
    """
    Looks at the latest EmploymentSourceData with the same source type and identifier fields.
    If the values are the same as `raw_data_to_compare` for all fields in `field_names_to_compare`,
    it updates the source's metadata with `last_received_at=utcnow() and returns its ID.`
    Otherwise, it returns None.

    Note: if an identifier field is not present in `raw_data_to_compare`,
    it will only consider EmploymentSourceData that do not have this field in their raw_data.

    param field_names_to_compare: list of fields from `raw_data` that will need to be equal to be considered as similar
    param raw_data_to_compare: new raw data dict for which we want to look for existing similar data
    """
    base_filter = EmploymentSourceDataModel.source_type == source_type.base_source_type
    identifiers = {}
    if user_identifier_field_name in raw_data_to_compare:
        identifiers[user_identifier_field_name] = raw_data_to_compare[
            user_identifier_field_name
        ]
    else:
        base_filter = and_(
            base_filter,
            not_(
                EmploymentSourceDataModel.source_metadata.has_key(
                    user_identifier_field_name
                )
            ),
        )
    if company_identifier_field_name in raw_data_to_compare:
        identifiers[company_identifier_field_name] = raw_data_to_compare[
            company_identifier_field_name
        ]
    else:
        base_filter = and_(
            base_filter,
            not_(
                EmploymentSourceDataModel.source_metadata.has_key(
                    company_identifier_field_name
                )
            ),
        )
    if start_date_field_name in raw_data_to_compare:
        identifiers[start_date_field_name] = raw_data_to_compare[start_date_field_name]
    else:
        base_filter = and_(
            base_filter,
            not_(
                EmploymentSourceDataModel.source_metadata.has_key(start_date_field_name)
            ),
        )
    if identifiers:
        base_filter = and_(
            base_filter, EmploymentSourceDataModel.raw_data.contains(identifiers)
        )

    latest_data_for_user_company: EmploymentSourceDataModel | None = (
        current_session.query(EmploymentSourceDataModel)  # noqa: ALN085
        .filter(base_filter)
        .order_by(EmploymentSourceDataModel.created_at.desc())
        .first()
    )

    if latest_data_for_user_company is None:
        return None

    for field in field_names_to_compare:
        if str(
            latest_data_for_user_company.raw_data.get(field, "field_not_in_data")
        ) != str(raw_data_to_compare.get(field, "field_not_in_data")):
            return None

    update_employment_source_data_metadata_with(
        employment_source_data_id=latest_data_for_user_company.id,
        update_dict={"last_received_at": utcnow()},
        commit=commit,
    )

    return latest_data_for_user_company.id
run_rollout_initialisation
run_rollout_initialisation(
    source_type, source_information, commit
)

For some sources (most likely automated one), when activating it for a company, we don't want to process ALL past data (because of the high risk of data inconsistencies that can be too costly to fix).

Instead, we want to start processing data from the moment the source is activated.

To do so, we need to create a new EmploymentSourceDataModel with the source_information provided and rely on the "source deduplication" mechanism (see find_similar_source_data_and_mark_as_duplicate above and doc: https://www.notion.so/alaninsurance/Source-Deduplication-37060a0d1c884c1b970aa6b2a4600ea4?pvs=26&qid=1%3Ac0d72661-8c33-41aa-9b99-25d9fd8203ee%3A5 ⧉)

We use the rollout_initialisation metadata key to mark the source as "initialised" (to help excluded those when needed)

These sources are excluded from SLA computations.

Source code in components/employment/public/business_logic/actions/employment_source_data.py
def run_rollout_initialisation(
    source_type: SourceType, source_information: SourceInformation, commit: bool
) -> None:
    """
    For some sources (most likely automated one), when activating it for a company,
    we don't want to process ALL past data (because of the high risk of data inconsistencies that can be too costly to fix).

    Instead, we want to start processing data from the moment the source is activated.

    To do so, we need to create a new EmploymentSourceDataModel with the source_information provided and rely on the "source deduplication" mechanism
    (see find_similar_source_data_and_mark_as_duplicate above
    and doc: https://www.notion.so/alaninsurance/Source-Deduplication-37060a0d1c884c1b970aa6b2a4600ea4?pvs=26&qid=1%3Ac0d72661-8c33-41aa-9b99-25d9fd8203ee%3A5)

    We use the `rollout_initialisation` metadata key to mark the source as "initialised" (to help excluded those when needed)

    These sources are excluded from SLA computations.
    """
    source_information.metadata["rollout_initialisation"] = True
    get_or_create_employment_source_data_model(source_type, source_information)

    if commit:
        current_session.commit()
update_employment_source_data_metadata_with
update_employment_source_data_metadata_with(
    employment_source_data_id, update_dict, commit
)

Update the metadata stored in the DB using the provided update_dict. Each value in the update_dict will update the existing value for the given key. Keys not present in update_dict will not be changed.

Source code in components/employment/public/business_logic/actions/employment_source_data.py
def update_employment_source_data_metadata_with(
    employment_source_data_id: UUID, update_dict: dict[str, Any], commit: bool
) -> None:
    """
    Update the metadata stored in the DB using the provided update_dict. Each value in the update_dict will update
    the existing value for the given key. Keys not present in update_dict will not be changed.
    """
    employment_source_data_model = get_or_raise_missing_resource(
        EmploymentSourceDataModel, employment_source_data_id
    )
    for key, value in update_dict.items():
        employment_source_data_model.source_metadata[key] = value

    if commit:
        current_session.commit()

merge_users

merge_users_in_employment_component
merge_users_in_employment_component(
    merge_from, merge_into, source_information, logs, commit
)

Support function for the "merge user" procedure: change all employments that target user merge_from so that they target user merge_into instead.

This will change ALL employments for the user, including cancelled ones.

Note that this will NOT be notified to consumers.

⚠️ Blocked movements will NOT be retargeted!

If logs is not None, strings describing the actions will be appended to it

Source code in components/employment/public/business_logic/actions/merge_users.py
def merge_users_in_employment_component(
    merge_from: UserId,
    merge_into: UserId,
    source_information: SourceInformationLike,
    logs: list[str] | None,
    commit: bool,
) -> None:
    """
    Support function for the "merge user" procedure: change all employments that target user `merge_from` so that they
    target user `merge_into` instead.

    This will change ALL employments for the user, including cancelled ones.

    Note that this will NOT be notified to consumers.

    ⚠️ Blocked movements will NOT be retargeted!

    If `logs` is not None, strings describing the actions will be appended to it
    """
    employment_source_data_id = get_or_create_employment_source_data_model(
        source_type=GlobalSourceType.internal_change,
        source_information_like=source_information,
    )
    employments = get_core_employments_for_user(merge_from)
    for employment in employments:
        current_employment_logger.info(
            f"Updating Employment Component employment {employment.employment_id} from user {merge_from} to {merge_into}",
            layer="merge_users",
        )
        if logs is not None:
            logs.append(
                f"Updating Employment Component employment {employment.employment_id} from user {merge_from} to {merge_into}"
            )
        new_core_employment_version = replace(
            employment,
            user_id=merge_into,
            source_type=GlobalSourceType.internal_change,
            employment_source_data_id=employment_source_data_id,
            employment_change_types=set(),
        )
        create_new_core_employment_version_model(
            new_core_employment_version, allow_inconsistent_user_id=True
        )

    if commit:
        current_session.commit()

source_information

update_source_information_metadata_with
update_source_information_metadata_with(
    source_information, update_dict
)

Update the provided source_information's metadata in place using the provided update_dict.

Never commits, even if the underlying source_information is a persisted DB model.

Source code in components/employment/public/business_logic/actions/source_information.py
def update_source_information_metadata_with(
    source_information: SourceInformationLike, update_dict: dict[str, Any]
) -> None:
    """
    Update the provided source_information's metadata in place using the provided update_dict.

    Never commits, even if the underlying source_information is a persisted DB model.
    """
    # Simplest case -> already exists in DB, can use the regular flow
    if isinstance(source_information, UUID):
        update_employment_source_data_metadata_with(
            source_information, update_dict, commit=False
        )
        return

    # More complex case -> may or may not exist in DB, directly update the underlying dict
    if isinstance(source_information, SourceInformation):
        if (model := source_information._employment_source_data) is not None:  # noqa: ALN027 # , no way to say "internal to component" in Python
            dict_to_update = model.source_metadata
        dict_to_update = source_information.metadata
    else:
        raise ValueError(f"Invalid SourceInformationLike object: {source_information}")

    for key, value in update_dict.items():
        dict_to_update[key] = value

upstream_blocked_movement_creator

UpstreamBlockedMovementCreator
UpstreamBlockedMovementCreator(
    commit,
    source,
    upstream_data,
    source_information,
    upstream_retried_blocked_movement=None,
    **unexpected_error_kwargs
)

Context manager that catch exceptions and create an upstream blocked movement if needed

upstream_data: data used to decide if we should create a new blocked movement or not Basically, if a new movement is failing with the same error and the same upstream_data, we won't create a new blocked movement

Source code in components/employment/public/business_logic/actions/upstream_blocked_movement_creator.py
def __init__(
    self,
    commit: bool,
    source: SourceType,
    upstream_data: dict[str, Any],
    source_information: SourceInformationLike,
    upstream_retried_blocked_movement: Optional[
        UpstreamBlockedMovementModel
    ] = None,
    **unexpected_error_kwargs: dict[str, Any],
) -> None:
    self.commit = commit
    self.source = source
    self.upstream_data = upstream_data
    self.retried_blocked_movement: Optional[UpstreamBlockedMovementModel] = (
        upstream_retried_blocked_movement
    )
    self.unexpected_error_kwargs = unexpected_error_kwargs
    self.created_blocked_movement = False
    self.source_information = source_information
    self.employment_source_data: EmploymentSourceData | None = None
    """
    If created_blocked_movement is True (i.e. an upstream blocked movement was created because the block inside
    this creator was unsuccessful), employment_source_data will be set to the corresponding EmploymentSourceData.

    If created_blocked_movement is False (i.e. the block inside this creator executed without raising),
    employment_source_data will be None.
    """
__enter__
__enter__()
Source code in components/employment/public/business_logic/actions/upstream_blocked_movement_creator.py
def __enter__(self) -> "UpstreamBlockedMovementCreator":  # noqa: D105
    enter_blocked_movement_creator_semaphore()
    return self
__exit__
__exit__(exc_type, exc, traceback)
Source code in components/employment/public/business_logic/actions/upstream_blocked_movement_creator.py
def __exit__(  # noqa: D105
    self,
    exc_type: Optional[type[BaseException]],
    exc: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> bool:
    exit_blocked_movement_creator_semaphore()

    if not exc:
        if self.retried_blocked_movement:
            self.retried_blocked_movement.status = BlockedMovementStatus.resolved
        if self.commit:
            current_session.commit()
        return False

    try:
        current_employment_logger.warning(
            f"Failed to make declaration with error: {exc}",
            exc_info=True,
            layer="blocked_movement_creator",
        )
        current_session.rollback()

        error = exc
        if not isinstance(error, UpstreamError):
            if isinstance(error, IntegrityError):
                # see https://github.com/alan-eu/alan-apps/pull/12736 for context
                # /!\ this string is used to determine if we should create a new blocked movement
                # if it is updated, it may duplicate some previous blocked movements
                error_message = (
                    f"Constraint {error.orig.diag.constraint_name} of table {error.orig.diag.table_name} "  # type: ignore[union-attr]
                    f"in schema {error.orig.diag.schema_name} raised."  # type: ignore[union-attr]
                )
            else:
                error_message = str(error)
            error = UnexpectedUpstreamError(
                "Unexpected error while ingesting employment declaration",
                error_message=error_message,
                context=UnexpectedUpstreamErrorContext(
                    traceback="".join(format_exception(error))
                ),
            )

        employment_source_data_id = get_or_create_employment_source_data_model(
            self.source, self.source_information
        )
        blocked_movement = get_or_create_upstream_blocked_movement(
            error,
            source_type=self.source,
            upstream_data=self.upstream_data,
            employment_source_data_id=employment_source_data_id,
        )

        if self.commit:
            current_session.commit()
        else:
            current_session.flush()
    except Exception as error:
        current_employment_logger.warning(
            f"Failed to created blocked movement: {error}",
            exc_info=True,
            layer="blocked_movement_creator",
        )

        current_session.rollback()

        unexpected_error = UnexpectedUpstreamError(
            "Unexpected error while creating a blocked movement",
            error_message=str(error),
            context=UnexpectedUpstreamErrorContext(
                traceback="".join(format_exception(error))
            ),
        )
        employment_source_data_id = get_or_create_employment_source_data_model(
            self.source, self.source_information
        )
        blocked_movement = get_or_create_upstream_blocked_movement(
            unexpected_error,
            source_type=self.source,
            upstream_data=self.upstream_data,
            employment_source_data_id=employment_source_data_id,
        )
        if self.commit:
            current_session.commit()
        else:
            current_session.flush()

    if (
        self.retried_blocked_movement
        and blocked_movement != self.retried_blocked_movement
    ):
        self.retried_blocked_movement.status = BlockedMovementStatus.resolved
        blocked_movement.parent_blocked_movement_id = (
            self.retried_blocked_movement.id
        )
        if self.commit:
            current_session.commit()
        else:
            current_session.flush()
    self.created_blocked_movement = True
    employment_source_data_model = get_or_raise_missing_resource(
        EmploymentSourceDataModel, employment_source_data_id
    )
    # In some cases, we can end up with a stale EmploymentSourceDataModel that still contains old (pre-rollback)
    # data. Expire it to force a refresh
    current_session.expire(employment_source_data_model)
    self.employment_source_data = (
        EmploymentSourceData.from_employment_source_data_model(
            employment_source_data_model
        )
    )

    # Stops the exception propagation
    return True
commit instance-attribute
commit = commit
created_blocked_movement instance-attribute
created_blocked_movement = False
employment_source_data instance-attribute
employment_source_data = None

If created_blocked_movement is True (i.e. an upstream blocked movement was created because the block inside this creator was unsuccessful), employment_source_data will be set to the corresponding EmploymentSourceData.

If created_blocked_movement is False (i.e. the block inside this creator executed without raising), employment_source_data will be None.

retried_blocked_movement instance-attribute
retried_blocked_movement = upstream_retried_blocked_movement
source instance-attribute
source = source
source_information instance-attribute
source_information = source_information
unexpected_error_kwargs instance-attribute
unexpected_error_kwargs = unexpected_error_kwargs
upstream_data instance-attribute
upstream_data = upstream_data

queries

blocked_movement

get_core_blocked_movement_or_none
get_core_blocked_movement_or_none(core_blocked_movement_id)

Get a core blocked movement by its ID.

Source code in components/employment/public/business_logic/queries/blocked_movement.py
def get_core_blocked_movement_or_none(
    core_blocked_movement_id: UUID,
) -> CoreBlockedMovement | None:
    """
    Get a core blocked movement by its ID.
    """
    core_blocked_movement_model = get_resource_or_none(
        CoreBlockedMovementModel, core_blocked_movement_id
    )

    return (
        CoreBlockedMovement.from_core_blocked_movement_model(
            core_blocked_movement_model
        )
        if core_blocked_movement_model is not None
        else None
    )
get_pending_blocked_movements_for_company_id
get_pending_blocked_movements_for_company_id(
    company_id, from_sources
)

Gets all pending blocked movements for the provided company ID. The intended usage for this function is to retrieve blocked movements for company admins (both displaying them and acting upon them).

This function specifically returns only 'pending' blocked movements (i.e. with the 'pending' status), but not all active blocked movements. This is intentional: all other statuses indicate that the blocked movement is currently being handled, either manually (e.g. 'awaiting_user_response' status) or by some automated process (e.g. 'pending_self_healing'), as it is assumed that we do not want to provide actionability to admins on these blocked movements.

Note: upstream blocked movements must have a proper company_id value set in order to be returned by this function.

The blocked movements are returned in an arbitrary order.

Source code in components/employment/public/business_logic/queries/blocked_movement.py
def get_pending_blocked_movements_for_company_id(
    company_id: str, from_sources: set[SourceType]
) -> list[CoreBlockedMovement | UpstreamBlockedMovement]:
    """
    Gets all pending blocked movements for the provided company ID. The intended usage for this function is to retrieve
    blocked movements for company admins (both displaying them and acting upon them).

    This function specifically returns only 'pending' blocked movements (i.e. with the 'pending' status), but not all
    active blocked movements. This is intentional: all other statuses indicate that the blocked movement is currently
    being handled, either manually (e.g. 'awaiting_user_response' status) or by some automated process (e.g.
    'pending_self_healing'), as it is assumed that we do not want to provide actionability to admins on these blocked
    movements.

    Note: upstream blocked movements must have a proper `company_id` value set in order to be returned by this function.

    The blocked movements are returned in an arbitrary order.
    """
    upstream_blocked_movement = [
        UpstreamBlockedMovement.from_upstream_blocked_movement_model(
            upstream_blocked_movement_model
        )
        for upstream_blocked_movement_model in UpstreamBlockedMovementModelBroker.query().where(
            UpstreamBlockedMovementModel.company_id == company_id,
            UpstreamBlockedMovementModel.source_type.in_(from_sources),
            UpstreamBlockedMovementModel._status == BlockedMovementStatus.pending,  # noqa: ALN027 # OK, see comment on _status
        )
    ]

    core_blocked_movement = [
        CoreBlockedMovement.from_core_blocked_movement_model(
            core_blocked_movement_model
        )
        for core_blocked_movement_model in BlockedMovementModelBroker.query().where(
            CoreBlockedMovementModel.company_id == company_id,
            CoreBlockedMovementModel.source_type.in_(from_sources),
            CoreBlockedMovementModel._status == BlockedMovementStatus.pending,  # noqa: ALN027 # OK, see comment on _status
        )
    ]

    return core_blocked_movement + upstream_blocked_movement
get_upstream_blocked_movement_or_none
get_upstream_blocked_movement_or_none(
    upstream_blocked_movement_id,
)

Get an upstream blocked movement by its ID.

Source code in components/employment/public/business_logic/queries/blocked_movement.py
def get_upstream_blocked_movement_or_none(
    upstream_blocked_movement_id: UUID,
) -> UpstreamBlockedMovement | None:
    """
    Get an upstream blocked movement by its ID.
    """
    upstream_blocked_movement_model = get_resource_or_none(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )

    return (
        UpstreamBlockedMovement.from_upstream_blocked_movement_model(
            upstream_blocked_movement_model
        )
        if upstream_blocked_movement_model is not None
        else None
    )
get_upstream_or_core_blocked_movement_or_none
get_upstream_or_core_blocked_movement_or_none(
    blocked_movement_id,
)

Get an upstream or core blocked movement by its ID.

Source code in components/employment/public/business_logic/queries/blocked_movement.py
def get_upstream_or_core_blocked_movement_or_none(
    blocked_movement_id: UUID,
) -> UpstreamBlockedMovement | CoreBlockedMovement | None:
    """
    Get an upstream or core blocked movement by its ID.
    """
    # Upstreams have prio but there's no rationale behind it - UUID collisions are so rare that this doesn't matter.
    upstream_blocked_movement = get_upstream_blocked_movement_or_none(
        blocked_movement_id
    )
    return (
        upstream_blocked_movement
        if upstream_blocked_movement is not None
        else get_core_blocked_movement_or_none(blocked_movement_id)
    )

core_employment_version

get_all_latest_core_employment_versions_on
get_all_latest_core_employment_versions_on(
    user_ids, company_id, on_date
)

Like get_latest_core_employment_version_on, but supports multiple users.

Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_all_latest_core_employment_versions_on(
    user_ids: set[UserId], company_id: CompanyId | None, on_date: date
) -> list[CoreEmploymentVersion]:
    """
    Like get_latest_core_employment_version_on, but supports multiple users.
    """
    return get_all_latest_core_employment_versions_overlapping(
        user_ids, company_id, on_date, on_date
    )
get_all_latest_core_employment_versions_overlapping
get_all_latest_core_employment_versions_overlapping(
    user_ids, company_id, start_date, end_date
)

Like get_latest_core_employment_versions_overlapping, but for multiple users.

Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_all_latest_core_employment_versions_overlapping(
    user_ids: set[UserId],
    company_id: CompanyId | None,
    start_date: date,
    end_date: date | None,
) -> list[CoreEmploymentVersion]:
    """
    Like get_latest_core_employment_versions_overlapping, but for multiple users.
    """
    core_employment_versions_query = (
        CoreEmploymentVersionModelBroker.query_latest_versions()
        .filter(
            CoreEmploymentVersionModel.user_id.in_(user_ids),
            CoreEmploymentVersionModel.is_ever_active_between(start_date, end_date),
        )
        .order_by(CoreEmploymentVersionModel.start_date)
    )

    if company_id:
        core_employment_versions_query = core_employment_versions_query.filter(
            CoreEmploymentVersionModel.company_id == company_id,
        )

    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version
        )
        for core_employment_version in core_employment_versions_query
    ]
get_core_employments_for_company
get_core_employments_for_company(
    company_id,
    exclude_cancelled=False,
    exclude_terminated=False,
)

Return the core employments for all employees of a company, using their latest version.

Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_core_employments_for_company(
    company_id: CompanyId,
    exclude_cancelled: bool = False,
    exclude_terminated: bool = False,
) -> Iterable[CoreEmploymentVersion]:
    """
    Return the core employments for all employees of a company, using their latest version.
    """
    query = CoreEmploymentVersionModelBroker.query_latest_versions().filter(
        CoreEmploymentVersionModel.company_id == company_id
    )

    if exclude_cancelled:
        query = query.filter(CoreEmploymentVersionModel.is_cancelled.is_(False))

    if exclude_terminated:
        query = query.filter(CoreEmploymentVersionModel.is_ended.is_(False))

    query = query.order_by(CoreEmploymentVersionModel.start_date)

    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version_model
        )
        for core_employment_version_model in query
    ]
get_core_employments_for_external_employee_id_in_company_overlapping
get_core_employments_for_external_employee_id_in_company_overlapping(
    external_employee_id, company_id, start_date, end_date
)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_core_employments_for_external_employee_id_in_company_overlapping(  # noqa: D103
    external_employee_id: str,
    company_id: CompanyId,
    start_date: date,
    end_date: date | None,
) -> list[CoreEmploymentVersion]:
    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version_model
        )
        for core_employment_version_model in CoreEmploymentVersionModelBroker.query_latest_versions()
        .filter(
            CoreEmploymentVersionModel.external_employee_id == external_employee_id,
            CoreEmploymentVersionModel.company_id == company_id,
            CoreEmploymentVersionModel.is_ever_active_between(start_date, end_date),
        )
        .order_by(CoreEmploymentVersionModel.start_date)
    ]
get_core_employments_for_user
get_core_employments_for_user(user_id)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_core_employments_for_user(user_id: UserId) -> list[CoreEmploymentVersion]:  # noqa: D103
    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version_model
        )
        for core_employment_version_model in CoreEmploymentVersionModelBroker.query_latest_versions()
        .filter(
            CoreEmploymentVersionModel.user_id == user_id,
        )
        .order_by(CoreEmploymentVersionModel.start_date)
    ]
get_core_employments_for_user_company
get_core_employments_for_user_company(user_id, company_id)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_core_employments_for_user_company(  # noqa: D103
    user_id: UserId, company_id: CompanyId
) -> list[CoreEmploymentVersion]:
    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version_model
        )
        for core_employment_version_model in CoreEmploymentVersionModelBroker.query_latest_versions()
        .filter(
            CoreEmploymentVersionModel.user_id == user_id,
            CoreEmploymentVersionModel.company_id == company_id,
        )
        .order_by(CoreEmploymentVersionModel.start_date)
    ]
get_latest_cancelled_core_employment_versions_overlapping
get_latest_cancelled_core_employment_versions_overlapping(
    user_id, company_id, start_date, end_date
)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_latest_cancelled_core_employment_versions_overlapping(  # noqa: D103
    user_id: UserId,
    company_id: CompanyId | None,
    start_date: date,
    end_date: date | None,
) -> list[CoreEmploymentVersion]:
    core_employment_versions_query = (
        CoreEmploymentVersionModelBroker.query_latest_versions()
        .filter(
            CoreEmploymentVersionModel.user_id == user_id,
            CoreEmploymentVersionModel.is_cancelled.is_(True),
            or_(
                CoreEmploymentVersionModel.end_date.is_(None),
                CoreEmploymentVersionModel.end_date >= start_date,
            ),
        )
        .order_by(CoreEmploymentVersionModel.start_date)
    )

    if end_date:
        core_employment_versions_query = core_employment_versions_query.filter(
            CoreEmploymentVersionModel.start_date <= end_date,
        )

    if company_id:
        core_employment_versions_query = core_employment_versions_query.filter(
            CoreEmploymentVersionModel.company_id == company_id,
        )

    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version
        )
        for core_employment_version in core_employment_versions_query
    ]
get_latest_core_employment_version_on
get_latest_core_employment_version_on(
    user_id, company_id, on_date
)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_latest_core_employment_version_on(  # noqa: D103
    user_id: UserId, company_id: CompanyId | None, on_date: date
) -> CoreEmploymentVersion | None:
    return one_or_none(
        get_latest_core_employment_versions_overlapping(
            user_id, company_id, on_date, on_date
        )
    )
get_latest_core_employment_versions_overlapping
get_latest_core_employment_versions_overlapping(
    user_id, company_id, start_date, end_date
)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_latest_core_employment_versions_overlapping(  # noqa: D103
    user_id: UserId,
    company_id: CompanyId | None,
    start_date: date,
    end_date: date | None,
) -> list[CoreEmploymentVersion]:
    return get_all_latest_core_employment_versions_overlapping(
        {user_id},
        company_id,
        start_date,
        end_date,
    )
get_latest_version_for_employment_or_raise
get_latest_version_for_employment_or_raise(employment_id)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_latest_version_for_employment_or_raise(  # noqa: D103
    employment_id: UUID,
) -> CoreEmploymentVersion:
    latest_version_for_employment = (
        CoreEmploymentVersionModelBroker.get_latest_version_for_employment(
            employment_id
        )
    )
    if latest_version_for_employment is None:
        raise EmploymentNotFound(
            f"No employment found for employment_id {employment_id}"
        )

    return latest_version_for_employment

country_helpers

fr
get_all_employment_source_data_for_bulk_action_report
get_all_employment_source_data_for_bulk_action_report(
    min_created_at,
)

FR-specific DB helper: retrieves all Employment Source Data objects that we need to send bulk action reports.

Source code in components/employment/public/business_logic/queries/country_helpers/fr.py
def get_all_employment_source_data_for_bulk_action_report(
    min_created_at: datetime,
) -> list[EmploymentSourceData]:
    """
    FR-specific DB helper: retrieves all Employment Source Data objects that we need to send bulk action reports.
    """
    query = (
        current_session.query(EmploymentSourceDataModel)  # noqa: ALN085
        .options(
            joinedload(EmploymentSourceDataModel.core_employment_versions),
            joinedload(EmploymentSourceDataModel.extended_employment_updates),
            joinedload(EmploymentSourceDataModel.core_blocked_movements),
            joinedload(EmploymentSourceDataModel.upstream_blocked_movements),
        )
        .filter(
            EmploymentSourceDataModel.source_type.in_(
                [
                    SourceType.fr_bulk_invite,
                    SourceType.fr_bulk_removal,
                    SourceType.fr_bulk_cancel,
                ]
            ),
            EmploymentSourceDataModel.created_at > min_created_at,
            EmploymentSourceDataModel.source_metadata.contains(
                {"reported_to_admin_at": None}
            ),
            EmploymentSourceDataModel.source_metadata.has_key("bulk_invite_file_id"),
            not_(
                EmploymentSourceDataModel.source_metadata.contains(
                    {"bulk_invite_file_id": None}
                )
            ),
        )
        .outerjoin(EmploymentSourceDataModel.upstream_blocked_movements)
        .order_by(EmploymentSourceDataModel.created_at)
    )
    return [
        EmploymentSourceData.from_employment_source_data_model(
            employment_source_data_model
        )
        for employment_source_data_model in query
    ]
get_blocked_movements_for_dsn_removal_suggestions
get_blocked_movements_for_dsn_removal_suggestions()

Returns a list of blocked movements that should be considered for DSN removal suggestions. If multiple blocked movements exist for the same user and company, only the one with the latest end date is returned. This is to avoid a weird experience for the admin where they'd see multiple suggestions for the same user.

Source code in components/employment/public/business_logic/queries/country_helpers/fr.py
def get_blocked_movements_for_dsn_removal_suggestions() -> list[CoreBlockedMovement]:
    """
    Returns a list of blocked movements that should be considered for DSN removal suggestions.
    If multiple blocked movements exist for the same user and company, only the one with the latest end date is returned.
    This is to avoid a weird experience for the admin where they'd see multiple suggestions for the same user.
    """
    query = (
        current_session.query(CoreBlockedMovementModel)  # noqa: ALN085
        .filter(
            CoreBlockedMovementModel.source_type
            == SourceType.fr_dsn_removal_suggestion,
            CoreBlockedMovementModel.status  # hybrid property
            == BlockedMovementStatus.pending_self_healing,
            CoreBlockedMovementModel.context["employment_declaration"][
                "end_date"
            ].isnot(None),
        )
        .distinct(CoreBlockedMovementModel.user_id, CoreBlockedMovementModel.company_id)
        .order_by(
            CoreBlockedMovementModel.user_id,
            CoreBlockedMovementModel.company_id,
            cast(
                CoreBlockedMovementModel.context["employment_declaration"][
                    "end_date"
                ].astext,
                Date,
            ).desc(),
            CoreBlockedMovementModel.created_at.desc(),
        )
    )

    return [CoreBlockedMovement.from_core_blocked_movement_model(bm) for bm in query]
get_dsn_removal_suggestion_blocked_movements_to_cancel
get_dsn_removal_suggestion_blocked_movements_to_cancel(
    user_id, company_id
)

When getting the blocked movements for DSN removal suggestions, we only want to keep the latest one. (This is done in the get_blocked_movements_for_dsn_removal_suggestions function above.) However, we need to cancel ALL the suggestion blocked movements, because we don't want to leave them in a pending_self_healing state.

Source code in components/employment/public/business_logic/queries/country_helpers/fr.py
def get_dsn_removal_suggestion_blocked_movements_to_cancel(
    user_id: str, company_id: str
) -> list[CoreBlockedMovement]:
    """
    When getting the blocked movements for DSN removal suggestions, we only want to keep the latest one.
    (This is done in the get_blocked_movements_for_dsn_removal_suggestions function above.)
    However, we need to cancel ALL the suggestion blocked movements,
    because we don't want to leave them in a pending_self_healing state.
    """
    return [
        CoreBlockedMovement.from_core_blocked_movement_model(bm)
        for bm in current_session.query(CoreBlockedMovementModel).filter(  # noqa: ALN085
            CoreBlockedMovementModel.user_id == user_id,
            CoreBlockedMovementModel.company_id == company_id,
            CoreBlockedMovementModel.source_type
            == SourceType.fr_dsn_removal_suggestion,
            CoreBlockedMovementModel.status  # hybrid property
            == BlockedMovementStatus.pending_self_healing,
        )
    ]
get_employment_source_data_extra_fields_for_company
get_employment_source_data_extra_fields_for_company(
    company_id,
    keys,
    on_date,
    raw_data_additional_filters=None,
)

Get values for the specified keys from EmploymentSourceData.raw_data["extra"] for all active employees in company.

This has been built for Finance integration "Flux Retour"

Parameters:

Name Type Description Default
company_id str

Company ID to filter by

required
keys list[str]

List of keys to extract from raw_data["extra"]

required
on_date date

Date to check for active employments

required
raw_data_additional_filters dict[str, Any] | None

Additional filters to apply on raw_data

None

Returns:

Type Description
dict[str, dict[str, Any]]

Dictionary mapping user_id to a dictionary of key-value pairs from raw_data["extra"].

dict[str, dict[str, Any]]

Only keys that exist in the data are included.

dict[str, dict[str, Any]]

If the keys are present in several EmploymentSourceData entries for the same user,

dict[str, dict[str, Any]]

the returned values are the initial ones (first received).

Source code in components/employment/public/business_logic/queries/country_helpers/fr.py
def get_employment_source_data_extra_fields_for_company(
    company_id: str,
    keys: list[str],
    on_date: date,
    raw_data_additional_filters: dict[str, Any] | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Get values for the specified keys from EmploymentSourceData.raw_data["extra"]
    for all active employees in company.

    This has been built for Finance integration "Flux Retour"

    Args:
        company_id: Company ID to filter by
        keys: List of keys to extract from raw_data["extra"]
        on_date: Date to check for active employments
        raw_data_additional_filters: Additional filters to apply on raw_data

    Returns:
        Dictionary mapping user_id to a dictionary of key-value pairs from raw_data["extra"].
        Only keys that exist in the data are included.
        If the keys are present in several EmploymentSourceData entries for the same user,
        the returned values are the initial ones (first received).
    """
    from components.employment.internal.models.core_employment_version import (
        CoreEmploymentVersionModel,
    )
    from components.employment.internal.models.latest_core_employment_version import (
        LatestCoreEmploymentVersion,
    )

    # Create CTE for non-cancelled active employment IDs
    # Note: Start date condition removed as all test employments start in the future
    non_cancelled_active_employments_cte = (
        select(CoreEmploymentVersionModel.employment_id)
        .join(
            LatestCoreEmploymentVersion,
            LatestCoreEmploymentVersion.core_employment_version_id
            == CoreEmploymentVersionModel.id,
        )
        .filter(
            CoreEmploymentVersionModel.company_id == company_id,
            CoreEmploymentVersionModel.is_cancelled.is_(False),
            (CoreEmploymentVersionModel.end_date.is_(None))
            | (CoreEmploymentVersionModel.end_date >= on_date),
        )
    ).cte("non_cancelled_active_employments")

    raw_data_filter = EmploymentSourceDataModel.raw_data.has_key("extra")
    if raw_data_additional_filters:
        raw_data_filter = and_(
            raw_data_filter,
            *[
                EmploymentSourceDataModel.raw_data[key].astext == value
                for key, value in raw_data_additional_filters.items()
            ],
        )

    # Get EmploymentSourceData with associated user_id using CTE join
    employment_data_with_users = current_session.execute(
        select(
            CoreEmploymentVersionModel.user_id,
            EmploymentSourceDataModel.raw_data,
            EmploymentSourceDataModel.created_at,
        )
        .select_from(EmploymentSourceDataModel)
        .join(
            CoreEmploymentVersionModel,
            CoreEmploymentVersionModel.employment_source_data_id
            == EmploymentSourceDataModel.id,
        )
        .join(
            non_cancelled_active_employments_cte,
            non_cancelled_active_employments_cte.c.employment_id
            == CoreEmploymentVersionModel.employment_id,
        )
        .filter(raw_data_filter)
        .order_by(
            CoreEmploymentVersionModel.user_id,
            EmploymentSourceDataModel.created_at.desc(),
        )
    ).all()

    results = {}

    for user_id, raw_data, _ in employment_data_with_users:
        # Skip if we already have data for this user (we ordered by created_at desc, so first is the most recent one)
        if user_id in results:
            continue

        extra_data = raw_data["extra"]
        user_result = {}

        for key in keys:
            if key in extra_data:
                user_result[key] = extra_data[key]

        # Only include users that have at least one of the requested keys
        if user_result:
            results[user_id] = user_result

    return results

employment_source_data

get_employment_metadata_value
get_employment_metadata_value(
    employment_source_data_id, key
)

Get the value of the provided metadata key for the employment source data with the provided ID.

Source code in components/employment/public/business_logic/queries/employment_source_data.py
def get_employment_metadata_value(employment_source_data_id: UUID, key: str) -> Any:
    """
    Get the value of the provided metadata key for the employment source data with the provided ID.
    """
    employment_source_data = get_or_raise_missing_resource(
        EmploymentSourceDataModel, employment_source_data_id
    )
    return employment_source_data.source_metadata.get(key, None)
get_employment_source_data_from_id
get_employment_source_data_from_id(
    employment_source_data_id,
)

Get an EmploymentSourceData object from its ID.

Source code in components/employment/public/business_logic/queries/employment_source_data.py
def get_employment_source_data_from_id(
    employment_source_data_id: UUID,
) -> EmploymentSourceData:
    """
    Get an `EmploymentSourceData` object from its ID.
    """
    employment_source_data_model = get_or_raise_missing_resource(
        EmploymentSourceDataModel, employment_source_data_id
    )
    return EmploymentSourceData.from_employment_source_data_model(
        employment_source_data_model
    )
get_employment_source_data_from_workflow_id
get_employment_source_data_from_workflow_id(workflow_id)

Get all EmploymentSourceData objects from a workflow.

Source code in components/employment/public/business_logic/queries/employment_source_data.py
def get_employment_source_data_from_workflow_id(
    workflow_id: str,
) -> Iterator[EmploymentSourceData]:
    """
    Get all `EmploymentSourceData` objects from a workflow.
    """
    models = current_session.scalars(
        select(EmploymentSourceDataModel).filter(
            EmploymentSourceDataModel.source_metadata["workflow_id"].astext
            == workflow_id
        )
    )

    return (
        EmploymentSourceData.from_employment_source_data_model(model)
        for model in models
    )

extended_employment_update

get_employment_values_on
get_employment_values_on(employment_id, on_date)
Source code in components/employment/public/business_logic/queries/extended_employment_update.py
def get_employment_values_on(  # noqa: D103
    employment_id: UUID,
    on_date: date,
) -> _TValues:  # type: ignore[type-var,misc]  # Technically this is not a required generic type, but it's here to force people to type their calls
    values = {}
    for extended_employment_value in get_extended_employment_values_on(
        employment_id, on_date
    ):
        values[extended_employment_value.key] = extended_employment_value.value
    return cast("_TValues", values)

source_information

get_metadata_from_source_information_like
get_metadata_from_source_information_like(
    source_information_like,
)

Retrieve the metadata dictionary from a SourceInformationLike object.

Must not be used for updating the metadata, should only be used for reading the metadata! If you want to update the metadata, use update_employment_source_data_metadata_with or update_source_information_metadata_with instead.

Source code in components/employment/public/business_logic/queries/source_information.py
def get_metadata_from_source_information_like(
    source_information_like: SourceInformationLike,
) -> dict[str, Any]:
    """
    Retrieve the metadata dictionary from a `SourceInformationLike` object.

    Must not be used for updating the metadata, should only be used for reading the metadata! If you want to update
    the metadata, use `update_employment_source_data_metadata_with` or `update_source_information_metadata_with`
    instead.
    """
    if isinstance(source_information_like, UUID):
        return get_or_raise_missing_resource(
            EmploymentSourceDataModel, source_information_like
        ).source_metadata
    elif isinstance(source_information_like, SourceInformation):
        if (model := source_information_like._employment_source_data) is not None:  # noqa: ALN027 # - no way to say "internal to component" in Python
            return model.source_metadata
        return source_information_like.metadata
    else:
        raise ValueError(
            f"Invalid SourceInformationLike object: {source_information_like}"
        )

source_rules

get_source_rules
get_source_rules(source_type)

Get the source rules related to a SourceType

Source code in components/employment/public/business_logic/queries/source_rules.py
def get_source_rules(source_type: SourceType) -> SourceRules:
    """
    Get the source rules related to a `SourceType`
    """
    return RULES_PER_SOURCE_TYPE[source_type]

stale_invitations

get_stale_invitations
get_stale_invitations(company_ids, created_before=None)

Retrieves stale invitations for the given company IDs, optionally filtering by creation date.

Source code in components/employment/public/business_logic/queries/stale_invitations.py
def get_stale_invitations(
    company_ids: list[str] | None, created_before: date | None = None
) -> list[StaleInvitationInformation]:
    """
    Retrieves stale invitations for the given company IDs, optionally filtering by creation date.
    """
    stale_invitations_query = (
        BlockedMovementModelBroker.get_blocked_movements_with_statuses(
            user_ids=[],
            company_ids=company_ids,
            statuses=[
                BlockedMovementStatus.pending_self_healing,
                BlockedMovementStatus.awaiting_user_response_self_healing,
            ],
        ).filter(CoreBlockedMovementModel.error_code == StaleInvitationError.error_code)
    )
    if created_before:
        stale_invitations_query = stale_invitations_query.filter(
            CoreBlockedMovementModel.created_at < created_before
        )

    stale_invitations = []
    for blocked_movement in stale_invitations_query:
        country_code = get_country_code_from_blocked_movement(blocked_movement)
        declaration: EmploymentDeclaration[Any] = EmploymentDeclaration.from_dict(
            blocked_movement.context["employment_declaration"]
        )
        effective_start_date = parse_date(
            blocked_movement.context["effective_start_date"]
        )
        infinitely_valid_information = one(
            [
                update
                for update in declaration.extended_informations
                if update.validity_period is None
            ]
        )
        country_gateway = get_country_gateway(country_code)
        # TODO @matthieu.stombellini SIM-706 this is using some French naming (specifically invite_email)
        stale_invitations.append(
            StaleInvitationInformation(
                blocked_movement_id=blocked_movement.id,
                company_id=blocked_movement.company_id,
                company_display_name=country_gateway.get_company_information(
                    blocked_movement.company_id
                ).display_name,
                employee_full_name=country_gateway.get_user_full_name(
                    blocked_movement.user_id
                ),
                employee_email=infinitely_valid_information.values.get("invite_email"),
                employee_identifier=country_gateway.get_employee_identifier_for_country(
                    infinitely_valid_information.values
                ),
                start_date=effective_start_date,
                created_at=to_datetime(blocked_movement.created_at),
                source_type=parse_base_source_type(blocked_movement.source_type),
                admin_was_notified=blocked_movement.status
                == BlockedMovementStatus.awaiting_user_response_self_healing,
                country_code=country_code,
            )
        )

    return stale_invitations

rules

blocked_movements

make_error_message_from_integrity_error
make_error_message_from_integrity_error(e)

Format an integrity error in a meaningful message to be used in blocked movements.

It doesn't contain the IDs of objects because the error message is used for deduplication of blocked movements.

Source code in components/employment/public/business_logic/rules/blocked_movements.py
def make_error_message_from_integrity_error(e: IntegrityError) -> str:
    """
    Format an integrity error in a meaningful message to be used in blocked movements.

    It doesn't contain the IDs of objects because the error message is used for deduplication of blocked movements.
    """
    return (
        f"Constraint {e.orig.diag.constraint_name} of table {e.orig.diag.table_name} "  # type: ignore[union-attr]
        f"in schema {e.orig.diag.schema_name} raised."  # type: ignore[union-attr]
    )

employment_input_error_info

EmploymentInputErrorCode

Bases: AlanBaseEnum

Generic and global error codes for Employment Component upstreams, especially for extractors

company_is_not_authorized class-attribute instance-attribute
company_is_not_authorized = 'company_is_not_authorized'
company_not_found class-attribute instance-attribute
company_not_found = 'company_not_found'
duplicate_data class-attribute instance-attribute
duplicate_data = 'duplicate_data'
has_reimbursed_care_acts class-attribute instance-attribute
has_reimbursed_care_acts = 'has_reimbursed_care_acts'
invalid_birth_date class-attribute instance-attribute
invalid_birth_date = 'invalid_birth_date'
invalid_ccn_code class-attribute instance-attribute
invalid_ccn_code = 'invalid_ccn_code'
invalid_dni class-attribute instance-attribute
invalid_dni = 'invalid_dni'
invalid_email class-attribute instance-attribute
invalid_email = 'invalid_email'
invalid_end_date class-attribute instance-attribute
invalid_end_date = 'invalid_end_date'
invalid_external_employee_id class-attribute instance-attribute
invalid_external_employee_id = (
    "invalid_external_employee_id"
)
invalid_first_name class-attribute instance-attribute
invalid_first_name = 'invalid_first_name'
invalid_gender class-attribute instance-attribute
invalid_gender = 'invalid_gender'
invalid_hiring_date class-attribute instance-attribute
invalid_hiring_date = 'invalid_hiring_date'
invalid_identity_document_type class-attribute instance-attribute
invalid_identity_document_type = (
    "invalid_identity_document_type"
)
invalid_is_alsace_moselle class-attribute instance-attribute
invalid_is_alsace_moselle = 'invalid_is_alsace_moselle'
invalid_is_unpaid_leave class-attribute instance-attribute
invalid_is_unpaid_leave = 'invalid_is_unpaid_leave'
invalid_language class-attribute instance-attribute
invalid_language = 'invalid_language'
invalid_last_name class-attribute instance-attribute
invalid_last_name = 'invalid_last_name'
invalid_nie class-attribute instance-attribute
invalid_nie = 'invalid_nie'
invalid_ntt class-attribute instance-attribute
invalid_ntt = 'invalid_ntt'
invalid_passport_number class-attribute instance-attribute
invalid_passport_number = 'invalid_passport_number'
invalid_professional_category class-attribute instance-attribute
invalid_professional_category = (
    "invalid_professional_category"
)
invalid_ssn class-attribute instance-attribute
invalid_ssn = 'invalid_ssn'
invalid_start_date class-attribute instance-attribute
invalid_start_date = 'invalid_start_date'
invalid_tax_regime class-attribute instance-attribute
invalid_tax_regime = 'invalid_tax_regime'
invalid_termination_reason class-attribute instance-attribute
invalid_termination_reason = 'invalid_termination_reason'
invalid_unpaid_leave_end_date class-attribute instance-attribute
invalid_unpaid_leave_end_date = (
    "invalid_unpaid_leave_end_date"
)
mandatory_end_date class-attribute instance-attribute
mandatory_end_date = 'mandatory_end_date'
mandatory_termination_type class-attribute instance-attribute
mandatory_termination_type = 'mandatory_termination_type'
missing_hiring_date class-attribute instance-attribute
missing_hiring_date = 'missing_hiring_date'
missing_required_birth_date class-attribute instance-attribute
missing_required_birth_date = 'missing_required_birth_date'
missing_required_email class-attribute instance-attribute
missing_required_email = 'missing_required_email'
missing_required_external_employee_id class-attribute instance-attribute
missing_required_external_employee_id = (
    "missing_required_external_employee_id"
)
missing_required_gender class-attribute instance-attribute
missing_required_gender = 'missing_required_gender'
missing_required_identity_document_type class-attribute instance-attribute
missing_required_identity_document_type = (
    "missing_required_identity_document_type"
)
missing_required_nif class-attribute instance-attribute
missing_required_nif = 'missing_required_nif'
missing_required_ssn_ntt class-attribute instance-attribute
missing_required_ssn_ntt = 'missing_required_ssn_ntt'
missing_tax_regime class-attribute instance-attribute
missing_tax_regime = 'missing_tax_regime'
multiple_employments_found class-attribute instance-attribute
multiple_employments_found = 'multiple_employments_found'
no_active_employment_found class-attribute instance-attribute
no_active_employment_found = 'no_active_employment_found'
not_matching_external_employee_id class-attribute instance-attribute
not_matching_external_employee_id = (
    "not_matching_external_employee_id"
)
start_date_too_far_back class-attribute instance-attribute
start_date_too_far_back = 'start_date_too_far_back'
user_not_found class-attribute instance-attribute
user_not_found = 'user_not_found'
EmploymentInputErrorInfo dataclass
EmploymentInputErrorInfo(
    error_code, message, additional_info=dict()
)

Bases: DataClassJsonMixin

Error returned by extractors in case of an error

additional_info class-attribute instance-attribute
additional_info = field(default_factory=dict)
error_code instance-attribute
error_code
message instance-attribute
message

extractors

This file contains "extractors", which are utility functions that aim to parse and validate items from a dict.

This file only contains global extractors for use in any country - more specialized extractors (e.g. French SSN extractors) should be put in an appropriate spot in each country's component.

Extractors ensure that we perform validation consistently across a range of input methods.

Typical usage of extractors will follow this pattern:

  • Provide a function that raises an exception for a specific error
  • Use partial on extract_or_raise to get your extraction function
  • Use the extraction function with the extractors you need

Here's a working example:

def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
    raise ExtractorInputException(
        company_id=...,  # or None if you don't have it yet
        user_id=...,  # or None if you don't have it yet
        error=error
    )

extract = partial(extract_or_raise, data=YOUR_DICT, raiser=raiser)

first_name = extract(extractor=extract_mandatory_first_name, field="first_name")
ExtractorInputException
ExtractorInputException(
    user_id, company_id, error, context=None
)

Bases: UpstreamError

A simple exception when you don't need to use a specific exception with except_or_raise.

Usage:

def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
    raise ExtractorInputException(
        company_id=...,  # or None if you don't have it yet
        user_id=...,  # or None if you don't have it yet
        error=error
    )

Optionally, you can also pass a context value, which will end up in the UpstreamBlockedMovement's context field.

Source code in components/employment/public/business_logic/rules/extractors.py
def __init__(
    self,
    user_id: str | None,
    company_id: str | None,
    error: EmploymentInputErrorInfo,
    context: UpstreamBlockedMovementContext | None = None,
) -> None:
    super().__init__(error.message, context or NoContext())
    self._user_id = user_id
    self._company_id = company_id
    self.error_code = error.error_code
error_code instance-attribute
error_code = error_code
to_blocked_movement_info
to_blocked_movement_info()
Source code in components/employment/public/business_logic/rules/extractors.py
@override
def to_blocked_movement_info(self) -> UpstreamBlockedMovementInfo:
    return UpstreamBlockedMovementInfo(
        user_id=self._user_id,
        company_id=self._company_id,
        error_code=self.error_code,
        error_message=str(self),
        context=self.context,
    )
ExtractorResult module-attribute
ExtractorResult = (
    tuple[T, None] | tuple[None, EmploymentInputErrorInfo]
)

Result of an extractor function. Always a tuple:

  • If a success, the first value is the parsed value (may be None depending on the extractor), the second value is None
  • If a failure, the first value is None, the second value is an EmploymentInputErrorInfo with details on the error.
P module-attribute
P = ParamSpec('P')
T module-attribute
T = TypeVar('T')
cast_to_date_with_validation
cast_to_date_with_validation(
    x, date_format=_DEFAULT_EXTRACTION_DATE_FORMAT
)

Cast the provided value to a datetime object.

By default, strings are parsed using the regular ISO format (YYYY-MM-DD).

Returns a tuple with both the parsed value (or none if the input value was falsy) and a boolean indicating whether the parsing was successful or not.

Note: this in itself is not an extractor, but can be used to build extractors with more useful error diagnostics.

Source code in components/employment/public/business_logic/rules/extractors.py
def cast_to_date_with_validation(
    x: datetime | date | str | None | Any,
    date_format: str = _DEFAULT_EXTRACTION_DATE_FORMAT,
) -> tuple[date | None, Literal[True]] | tuple[None, Literal[False]]:
    """
    Cast the provided value to a `datetime` object.

    By default, strings are parsed using the regular ISO format (YYYY-MM-DD).

    Returns a tuple with both the parsed value (or none if the input value was falsy) and a boolean indicating whether
    the parsing was successful or not.

    Note: this in itself is not an extractor, but can be used to build extractors with more useful error diagnostics.
    """
    if not x:
        return None, True
    if isinstance(x, datetime):
        return x.date(), True
    elif isinstance(x, date):
        return x, True
    elif x and isinstance(x, str):
        try:
            return datetime.strptime(x.split(" ")[0], date_format).date(), True
        except ValueError:
            return None, False
    return None, False
extract_birth_date_from_data
extract_birth_date_from_data(data, field_name)

Birth date extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_birth_date_from_data(
    data: dict[str, Any],
    field_name: str,
) -> ExtractorResult[date | None]:
    """
    Birth date extractor
    """
    date, is_valid = cast_to_date_with_validation(data.get(field_name))
    if not is_valid:
        return (
            None,
            EmploymentInputErrorInfo(
                error_code=EmploymentInputErrorCode.invalid_birth_date,
                message=f"Invalid birth date: {data.get(field_name)}",
            ),
        )
    return date, None
extract_boolean_from_data
extract_boolean_from_data(data, field_name)

Utility for parsing booleans written in plain language.

Note: this is not a real extract (as it doesn't return a proper ExtractorResult), this is intended to be used as a utility to build extractors.

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_boolean_from_data(
    data: dict[str, Any],
    field_name: str,
) -> tuple[bool | None, Literal[True]] | tuple[None, Literal[False]]:
    """
    Utility for parsing booleans written in plain language.

    Note: this is not a real extract (as it doesn't return a proper `ExtractorResult`), this is intended to be used
    as a utility to build extractors.
    """
    raw_value = data.get(field_name)
    if raw_value is None:
        return None, True

    sanitized_value = str(raw_value).lower().strip()
    if (
        sanitized_value == "vrai"
        or sanitized_value == "1"
        or sanitized_value == "true"
        or sanitized_value == "oui"
        or sanitized_value == "yes"
    ):
        # about the 1 -> in case the admin does not set the cell to TEXT, the value will be =TRUE() which is 1, or =FALSE() which is 0
        # about the true -> the cell may also be the boolean value True, which would end up as "true" here
        return True, True
    if (
        sanitized_value == "faux"
        or sanitized_value == "0"
        or sanitized_value == "false"
        or sanitized_value == "non"
        or sanitized_value == "no"
    ):
        return False, True

    return None, False
extract_email_from_data
extract_email_from_data(data, field_name)

Email extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_email_from_data(
    data: dict[str, Any],
    field_name: str,
) -> ExtractorResult[str | None]:
    """
    Email extractor
    """
    email = (data.get(field_name) or "").strip() or None
    email_error = None
    if email and not email_re.match(email):
        email_error = EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_email,
            message=f"Invalid email: {email}",
        )
        return None, email_error

    return email, None
extract_end_date_from_data
extract_end_date_from_data(data, field_name)

End date extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_end_date_from_data(
    data: dict[str, Any],
    field_name: str,
) -> ExtractorResult[date | None]:
    """
    End date extractor
    """
    end_date, is_valid = cast_to_date_with_validation(data.get(field_name))
    if not is_valid:
        return (
            None,
            EmploymentInputErrorInfo(
                error_code=EmploymentInputErrorCode.invalid_end_date,
                message=f"Invalid end date: {data.get(field_name)}",
            ),
        )
    return end_date, None
extract_external_employee_id_from_data_without_validation
extract_external_employee_id_from_data_without_validation(
    data, field_name
)

External employee ID (matricule, payroll ID, etc.) extractor.

Note: this extractor does NOT perform validation. For France, use the "extract_external_employee_id" function, which does.

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_external_employee_id_from_data_without_validation(
    data: dict[str, Any], field_name: str
) -> ExtractorResult[str | None]:
    """
    External employee ID (matricule, payroll ID, etc.) extractor.

    Note: this extractor does NOT perform validation. For France, use the "extract_external_employee_id" function, which
    does.
    """
    raw_value = data.get(field_name)
    if not raw_value:
        return None, None

    parsed = parse_integer_string(raw_value)
    return parsed, None
extract_gender_from_data
extract_gender_from_data(data, field_name)

Gender extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_gender_from_data(
    data: dict[str, Any], field_name: str
) -> ExtractorResult[UserGender | None]:
    """
    Gender extractor
    """
    raw_value = data.get(field_name)
    if not raw_value:
        return None, None

    try:
        return UserGender.validate(raw_value), None
    except EnumValidationError:
        return None, EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_gender,
            message=f"Gender value is not valid: {raw_value}",
        )
extract_mandatory_birth_date_from_data module-attribute
extract_mandatory_birth_date_from_data = mandatory_extractor(
    extractor=extract_birth_date_from_data,
    missing_value_error_code=missing_required_birth_date,
    value_name="birth date",
)
extract_mandatory_email_from_data module-attribute
extract_mandatory_email_from_data = mandatory_extractor(
    extractor=extract_email_from_data,
    missing_value_error_code=missing_required_email,
    value_name="e-mail",
)
extract_mandatory_first_name_from_data
extract_mandatory_first_name_from_data(data, field_name)

First name extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_mandatory_first_name_from_data(
    data: dict[str, Any], field_name: str
) -> ExtractorResult[str]:
    """
    First name extractor
    """
    raw_name = data.get(field_name)
    field_name_display = snake_to_regular(field_name)
    if not raw_name:
        return None, EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_first_name,
            message=f"{field_name_display} is missing",
        )
    cleaned_name = (
        strip_duplicate_spaces(raw_name.strip()).title() if raw_name else raw_name
    )
    try:
        name = mandatory(validates_name(key=field_name, name=cleaned_name))
    except ModelValidationError:
        return None, EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_first_name,
            message=f"Invalid {field_name_display}: {raw_name}. Please use only letters, numbers, and basic punctuation (no special characters or line breaks).",
        )

    return name, None
extract_mandatory_gender_from_data module-attribute
extract_mandatory_gender_from_data = mandatory_extractor(
    extractor=extract_gender_from_data,
    missing_value_error_code=missing_required_gender,
    value_name="gender",
)
extract_mandatory_last_name_from_data
extract_mandatory_last_name_from_data(data, field_name)

Last name extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_mandatory_last_name_from_data(
    data: dict[str, Any], field_name: str
) -> ExtractorResult[str]:
    """
    Last name extractor
    """
    raw_name = data.get(field_name)
    field_name_display = snake_to_regular(field_name)
    if not raw_name:
        return None, EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_last_name,
            message=f"{field_name_display} is missing",
        )
    cleaned_name = (
        strip_duplicate_spaces(raw_name.strip()).title() if raw_name else raw_name
    )
    try:
        name = mandatory(validates_name(key=field_name, name=cleaned_name))
    except ModelValidationError:
        return None, EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_last_name,
            message=f"Invalid {field_name_display}: {raw_name}. Please use only letters, numbers, and basic punctuation (no special characters or line breaks).",
        )

    return name, None
extract_mandatory_start_date_from_data
extract_mandatory_start_date_from_data(data, field_name)

Start date extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_mandatory_start_date_from_data(
    data: dict[str, Any],
    field_name: str,
) -> ExtractorResult[date]:
    """
    Start date extractor
    """
    raw_start_date = data.get(field_name)
    start_date, is_valid = cast_to_date_with_validation(raw_start_date)
    if not is_valid:
        return (
            None,
            EmploymentInputErrorInfo(
                error_code=EmploymentInputErrorCode.invalid_start_date,
                message=f"Invalid start date: {data.get(field_name)}",
            ),
        )
    if not start_date:
        return (
            None,
            EmploymentInputErrorInfo(
                error_code=EmploymentInputErrorCode.invalid_start_date,
                message="Missing start date",
            ),
        )
    return start_date, None
extract_or_raise
extract_or_raise(extractor, data, field_name, raiser)

Used to wrap an extractor to make its use easier in upstream scenarios.

Typical usage:

def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
    raise SomeException(...)

extract = partial(extract_or_raise, data=..., raiser=raiser)

first_name = extract(extractor=extract_mandatory_first_name_from_data, field_name="first_name")
# ...

Note: if you do not have or need a custom exception for your use case, you can use ExtractorInputException, like so:

def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
    raise ExtractorInputException(
        company_id=...,  # or None if you don't have it yet
        user_id=...,  # or None if you don't have it yet
        error=error
    )
Source code in components/employment/public/business_logic/rules/extractors.py
def extract_or_raise(
    extractor: _ExtractorFunc[T],
    data: dict[str, Any],
    field_name: str,
    raiser: _ExtractorRaiser,
) -> T:
    """
    Used to wrap an extractor to make its use easier in upstream scenarios.

    Typical usage:

    ```python
    def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
        raise SomeException(...)

    extract = partial(extract_or_raise, data=..., raiser=raiser)

    first_name = extract(extractor=extract_mandatory_first_name_from_data, field_name="first_name")
    # ...
    ```

    Note: if you do not have or need a custom exception for your use case, you can use `ExtractorInputException`, like
    so:

    ```python
    def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
        raise ExtractorInputException(
            company_id=...,  # or None if you don't have it yet
            user_id=...,  # or None if you don't have it yet
            error=error
        )
    ```
    """
    result = extractor(data, field_name)
    if result[1] is not None:
        raiser(result[1])
    return result[0]
mandatory_extractor
mandatory_extractor(
    extractor, missing_value_error_code, value_name
)

Utility function to turn an extractor that returns a value or None if absent into a "mandatory" extractor.

Usage:

extract_mandatory_email_from_data = mandatory_extractor(
    extract_email_from_data,
    EmploymentInputErrorCode.missing_invite_email,
    "Invitation Email"
)

This function is typed properly: the signature of the initial extractor is retained (and you can safely use it with 'partial'), and the resulting function is guaranteed to return a non-None value.

Source code in components/employment/public/business_logic/rules/extractors.py
def mandatory_extractor(
    extractor: Callable[P, ExtractorResult[T | None]],
    missing_value_error_code: EmploymentInputErrorCode,
    value_name: str,
) -> Callable[P, ExtractorResult[T]]:
    """
    Utility function to turn an extractor that returns a value or None if absent into a "mandatory" extractor.

    Usage:

    ```python
    extract_mandatory_email_from_data = mandatory_extractor(
        extract_email_from_data,
        EmploymentInputErrorCode.missing_invite_email,
        "Invitation Email"
    )
    ```

    This function is typed properly: the signature of the initial extractor is retained (and you can safely use it with
    'partial'), and the resulting function is guaranteed to return a non-None value.
    """

    def extractor_wrapped(*args: P.args, **kwargs: P.kwargs) -> ExtractorResult[T]:
        result, error = extractor(*args, **kwargs)
        if error is not None:
            return None, error

        if result is None:
            return (
                None,
                EmploymentInputErrorInfo(
                    error_code=missing_value_error_code,
                    # technically we could use the field_name here but having a specific value here makes the message more useful
                    message=f"Missing value: {value_name}",
                ),
            )

        return mandatory(result), error

    return extractor_wrapped
parse_integer_string
parse_integer_string(x)

Used for SSNs, NTTs, SIRENs and external employee IDs ("matricules"). Sometimes these integer strings are floats in the excel spreadsheet, so it adds a decimal part when we stringify them (str(float(1)) -> '1.0')

Source code in components/employment/public/business_logic/rules/extractors.py
def parse_integer_string(x: Any) -> str | None:
    """
    Used for SSNs, NTTs, SIRENs and external employee IDs ("matricules").
    Sometimes these integer strings are floats in the excel spreadsheet, so it adds
    a decimal part when we stringify them (str(float(1)) -> '1.0')
    """
    if x is None:
        return None

    if isinstance(x, float):
        return str(int(x))
    elif isinstance(x, str):
        return re2.sub(r"\s+", "", x)  # type: ignore[no-any-return]

    return str(x)

stale_invitations

SKIP_STALE_INVITATION_CHECK_METADATA_KEY module-attribute
SKIP_STALE_INVITATION_CHECK_METADATA_KEY = (
    "skip_stale_invitation_check"
)
check_invitation_is_not_stale
check_invitation_is_not_stale(
    start_date, core_employment_version
)

Verifies that an invitation with the provided date does not fall under a "stale invitation" scenario.

Stale invitations are a special flow used to handle invitations that would incur more than 100 days of coverage upon invitation. These can be admin mistakes and might need to be checked - otherwise, admins would incur a lot of regularization.

Note that 'start_date' is not the start date of the employee in the company, but the effective start date of their coverage. For example, employees added with a start date in the 2000s but whose company just signed up for Alan do not count as "stale" invitations.

Source code in components/employment/public/business_logic/rules/stale_invitations.py
def check_invitation_is_not_stale(
    start_date: date, core_employment_version: CoreEmploymentVersion
) -> None:
    """
    Verifies that an invitation with the provided date does not fall under a "stale invitation" scenario.

    Stale invitations are a special flow used to handle invitations that would incur more than 100 days of coverage
    upon invitation. These can be admin mistakes and might need to be checked - otherwise, admins would incur a lot of
    regularization.

    Note that 'start_date' is not the start date of the employee in the company, but the effective **start date of
    their coverage**. For example, employees added with a start date in the 2000s *but* whose company just signed up
    for Alan do not count as "stale" invitations.
    """
    if _should_check_stale_invitation(
        core_employment_version
    ) and is_start_date_too_far_in_the_past(start_date):
        raise StaleInvitationError(
            "Effective start date is too far in the past",
            effective_start_date=start_date,
        )
    return None

components.employment.public.constants

EMPLOYMENT_SCHEMA_NAME module-attribute

EMPLOYMENT_SCHEMA_NAME = 'employment'

components.employment.public.country_gateway

CompanyInformation dataclass

CompanyInformation(
    display_name, account_id, is_opt_out_for_offshoring=None
)

Company information returned by the country gateway.

account_id instance-attribute

account_id

display_name instance-attribute

display_name

is_opt_out_for_offshoring class-attribute instance-attribute

is_opt_out_for_offshoring = None

CountryGateway

Bases: ABC, Generic[_TValues]

Class used by the Employment Component to retrieve local-specific data.

!!! ALL imports to local components MUST be done as local imports. !!!

There should be one implementation of CountryGateway per country that uses the Employment Component. You need to add your country's implementation in components.employment.external.country_gateways

are_companies_in_same_account abstractmethod

are_companies_in_same_account(company_id_1, company_id_2)

Returns True if the two companies are in the same account - false otherwise.

This is used to determine whether a transfer is necessary or not - see here ⧉.

Source code in components/employment/public/country_gateway.py
@abstractmethod
def are_companies_in_same_account(
    self, company_id_1: str, company_id_2: str
) -> bool:
    """
    Returns True if the two companies are in the same account - false otherwise.

    This is used to determine whether a transfer is necessary or not - see [here](https://www.notion.so/alaninsurance/Definitions-employments-sources-etc-1301426e8be780cc9796ee31f49559e5?pvs=4#1341426e8be780e697f3d2c7fd33fe3b).
    """
    ...

get_account_name abstractmethod

get_account_name(account_id)

Retrieves the account's name

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_account_name(self, account_id: UUID) -> str:
    """
    Retrieves the account's name
    """
    ...

get_blocked_invitations_validation_broadcast_id

get_blocked_invitations_validation_broadcast_id()

(Advanced) Get the ID of the daily broadcast to send to admins with stale invitations that need to be validated.

This is used by the send_stale_invitations_validation_broadcasts command to send a daily broadcast to all admins with stale invitations that need to be validated.

If your country does not require Stale Invitations support, you can leave this unimplemented.

Returns:

Type Description
str | None

str | None: The ID of the daily broadcast, or None if no broadcast is required.

Source code in components/employment/public/country_gateway.py
def get_blocked_invitations_validation_broadcast_id(self) -> str | None:
    """
    (Advanced) Get the ID of the daily broadcast to send to admins with stale invitations
    that need to be validated.

    This is used by the `send_stale_invitations_validation_broadcasts` command
    to send a daily broadcast to all admins with stale invitations that need to be validated.

    If your country does not require Stale Invitations support, you can leave this unimplemented.

    Returns:
        str | None: The ID of the daily broadcast, or None if no broadcast is required.
    """
    raise NotImplementedError()

get_company_information abstractmethod

get_company_information(company_id)

Retrieves comprehensive company information including display name, account ID, and settings.

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_company_information(
    self,
    company_id: str,
) -> CompanyInformation:
    """
    Retrieves comprehensive company information including display name, account ID, and settings.
    """
    ...

get_consumers_to_notify_for_legacy_backfill

get_consumers_to_notify_for_legacy_backfill()

(Advanced) Gets all employment consumers to notify for legacy backfill.

Typically, consumers are not notified for legacy backfill, as it's data ingested from the country legacy tables so countries already have the data. Some consumers may not look at the "legacy tables" and hence need to be notified for legacy backfill though (e.g. Occupational Health in FR).

If you want the consumer to also be notified for other source types, you should add it to the get_employment_consumers method.

Source code in components/employment/public/country_gateway.py
def get_consumers_to_notify_for_legacy_backfill(
    self,
) -> set[EmploymentConsumer[_TValues]]:
    """
    (Advanced) Gets all employment consumers to notify for legacy backfill.

    Typically, consumers are not notified for legacy backfill,
    as it's data ingested from the country legacy tables so countries already have the data.
    Some consumers may not look at the "legacy tables"
    and hence need to be notified for legacy backfill though (e.g. Occupational Health in FR).

    If you want the consumer to also be notified for other source types,
    you should add it to the get_employment_consumers method.
    """
    return set()

get_employee_identifier_for_country abstractmethod

get_employee_identifier_for_country(extended_values)

Retrieves the Employee Identifier from the provided extended values.

Note: implementation is only required if your country requires Stale Invitations support.

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_employee_identifier_for_country(
    self, extended_values: _TValues
) -> str | None:
    """
    Retrieves the Employee Identifier from the provided extended values.

    Note: implementation is only required if your country requires Stale Invitations support.
    """

get_employment_consumers abstractmethod

get_employment_consumers()

Gets all employment consumers contributed by this country.

Notes: 1. ALL Employment Consumers will be called regardless of the country of origin. 2. The function that will be called must have all local code as LOCAL (in-function) imports - otherwise, this breaks Canada (where loading non-CA models is forbidden)

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_employment_consumers(self) -> set[EmploymentConsumer[_TValues]]:
    """
    Gets all employment consumers contributed by this country.

    Notes:
    1. ALL Employment Consumers will be called regardless of the country of origin.
    2. The function that will be called must have all local code as LOCAL (in-function) imports - otherwise, this breaks Canada
    (where loading non-CA models is forbidden)
    """
    ...

get_retry_function

get_retry_function()

(Advanced) Get the function used for retrying Core Blocked Movements.

You should generally not need to implement this.

Source code in components/employment/public/country_gateway.py
def get_retry_function(self) -> RetryFunction[_TValues]:
    """
    (Advanced) Get the function used for retrying Core Blocked Movements.

    You should generally not need to implement this.
    """
    from components.employment.public.api import ingest_employment_declaration

    return ingest_employment_declaration

get_source_detail_for_blocked_movement

get_source_detail_for_blocked_movement(
    _employment_source_data_id,
)

(Advanced) Get detailed information on the blocked movement source.

For example, in France, this is used to extract the source's external provider when the source type is an external API. If not null, it will then be displayed alongside the source type on the blocked movements ops tool.

Source code in components/employment/public/country_gateway.py
def get_source_detail_for_blocked_movement(
    self, _employment_source_data_id: UUID
) -> str | None:
    """
    (Advanced) Get detailed information on the blocked movement source.

    For example, in France, this is used to extract the source's external provider
    when the source type is an external API. If not null, it will then be displayed
    alongside the source type on the blocked movements ops tool.
    """
    return None

get_upstream_retry_handler abstractmethod

get_upstream_retry_handler(source_type)

Retrieves the upstream blocked movement retry function that corresponds to the given source_type.

Parameters:

Name Type Description Default
source_type SourceType

The source type for which to retrieve the retry function - guaranteed to be of the CountryCode that corresponds to this CountryGateway.

required

Returns:

Type Description
UpstreamBlockedMovementRetryFunction[_TValues] | None

UpstreamBlockedMovementRetryFunction[_TValues] | None: The retry function, or None if no retry function is available for the given source type.

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_upstream_retry_handler(
    self, source_type: SourceType
) -> UpstreamBlockedMovementRetryFunction[_TValues] | None:
    """
    Retrieves the upstream blocked movement retry function that corresponds to the given source_type.

    Args:
        source_type (SourceType): The source type for which to retrieve the retry function - guaranteed to be of the CountryCode that corresponds to this CountryGateway.

    Returns:
        UpstreamBlockedMovementRetryFunction[_TValues] | None: The retry function, or None if no retry function is available for the given source type.
    """

get_user_admined_company_ids abstractmethod

get_user_admined_company_ids(user_id)

Retrieves the list of company IDs admined by this user.

Note: implementation is only required if your country requires Stale Invitations support.

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_user_admined_company_ids(self, user_id: str) -> list[str]:
    """
    Retrieves the list of company IDs admined by this user.

    Note: implementation is only required if your country requires Stale Invitations support.
    """
    ...

get_user_full_name abstractmethod

get_user_full_name(user_id)

Retrieves a user's full name (BaseUser.full_name), or None if the user does not exist

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_user_full_name(self, user_id: str) -> str | None:
    """
    Retrieves a user's full name (BaseUser.full_name), or None if the user does not exist
    """
    ...

last_stale_invite_notification_email_sent_to_admin_on

last_stale_invite_notification_email_sent_to_admin_on(
    company_id,
)

(Advanced) Get the timestamp of the last email sent to an admin of the given company regarding stale invitations.

This is used to determine whether an admin has already been notified of the stale invitations If they have, we will set the stale invitation's status to awaiting_user_response_pending_self_healing which will - count as a terminal status for the purpose of our affiliation speed computation - not notify admins again for this stale invitation.

If your country does not require Stale Invitations support, you can leave this unimplemented.

Parameters:

Name Type Description Default
company_id str

The company ID for which to retrieve the last notification timestamp.

required

Returns: datetime | None: The timestamp of the last email sent, or None if no email has been sent.

Source code in components/employment/public/country_gateway.py
def last_stale_invite_notification_email_sent_to_admin_on(
    self,
    company_id: str,
) -> datetime | None:
    """
    (Advanced) Get the timestamp of the last email sent to an admin of the given company
    regarding stale invitations.

    This is used to determine whether an admin has already been notified of the stale invitations
    If they have, we will set the stale invitation's status to `awaiting_user_response_pending_self_healing`
    which will
    - count as a terminal status for the purpose of our affiliation speed computation
    - not notify admins again for this stale invitation.

    If your country does not require Stale Invitations support, you can leave this unimplemented.

    Args:
        company_id (str): The company ID for which to retrieve the last notification timestamp.
    Returns:
        datetime | None: The timestamp of the last email sent, or None if no email has been sent.
    """
    raise NotImplementedError()

UpstreamBlockedMovementRetryFunction

Bases: Protocol, Generic[_TValues]

A function that retries an Upstream Blocked Movement:

__call__

__call__(
    upstream_data, root_employment_source_data_id, commit
)

Retry an Upstream Blocked Movement from the provided upstream_data.

Parameters:

Name Type Description Default
upstream_data dict

The upstream_data for the UpstreamBlockedMovement - this is the dict that was provided in the UpstreamBlockedMovementCreator

required
root_employment_source_data_id UUID

The Employment Source Data ID that corresponds to this upstream blocked movement - in order to correctly correlate the upstream blocked movement with the retry's results, you must set the EmploymentDeclaration's source_information field to this UUID.

required
commit bool

As a general rule, the Upstream Retry Handler should NOT commit - disregard this parameter.

required

Returns:

Type Description
EmploymentDeclaration[_TValues] | None

EmploymentDeclaration[_TValues] - if an EmploymentDeclaration needs to be processed as part of the retry

EmploymentDeclaration[_TValues] | None

None - if no EmploymentDeclaration needs to be processed as part of the retry.

Source code in components/employment/public/country_gateway.py
def __call__(
    self,
    # the data as reported in the upstream blocked movement via upstream_data, potentially with some values overridden
    upstream_data: dict[str, Any],
    # root_employment_source_data_id is the employment source data ID of the parent.
    # You must set your EmploymentDeclaration's source_information field with this UUID to correctly link the
    # consequences of the retried ingestion.
    root_employment_source_data_id: UUID,
    commit: bool,
) -> (
    EmploymentDeclaration[_TValues]
    | None  # Return None if nothing needs to be done in the end
):
    """
    Retry an Upstream Blocked Movement from the provided upstream_data.

    Args:
        upstream_data (dict): The `upstream_data` for the `UpstreamBlockedMovement` - this is the dict that was provided in the `UpstreamBlockedMovementCreator`
        root_employment_source_data_id (UUID): The Employment Source Data ID that corresponds to this upstream blocked movement - in order to correctly correlate the upstream blocked movement with the retry's results, you **must** set the EmploymentDeclaration's source_information field to this UUID.
        commit (bool): As a general rule, the Upstream Retry Handler should NOT commit - disregard this parameter.

    Returns:
        EmploymentDeclaration[_TValues] - if an EmploymentDeclaration needs to be processed as part of the retry
        None - if no EmploymentDeclaration needs to be processed as part of the retry.
    """
    ...

components.employment.public.entities

CompanyId module-attribute

CompanyId = str

CoreBlockedMovement dataclass

CoreBlockedMovement(
    *,
    id,
    user_id,
    company_id,
    source_type,
    error_code,
    error_message,
    context,
    employment_id,
    status,
    note,
    input_override,
    parent_blocked_movement_id,
    new_blocked_movements_after_retry,
    upstream_parent_blocked_movement_id,
    employment_source_data_id
)

Bases: DataClassJsonMixin

A blocked movement representing an error that occurred either:

  • Within the Employment Component itself
  • In one of the consumers of the Employment Component

company_id instance-attribute

company_id

context instance-attribute

context

employment_id instance-attribute

employment_id

employment_source_data_id instance-attribute

employment_source_data_id

error_code instance-attribute

error_code

error_message instance-attribute

error_message

from_core_blocked_movement_model classmethod

from_core_blocked_movement_model(
    core_blocked_movement_model,
)

Internal conversion from the DB model to this dataclass

Source code in components/employment/public/entities.py
@classmethod
def from_core_blocked_movement_model(
    cls, core_blocked_movement_model: "CoreBlockedMovementModel"
) -> "CoreBlockedMovement":
    """
    Internal conversion from the DB model to this dataclass
    """
    return cls(
        id=core_blocked_movement_model.id,
        user_id=core_blocked_movement_model.user_id,
        company_id=core_blocked_movement_model.company_id,
        source_type=parse_base_source_type(core_blocked_movement_model.source_type),
        error_code=core_blocked_movement_model.error_code,
        error_message=core_blocked_movement_model.error_message,
        context=core_blocked_movement_model.context,
        employment_id=core_blocked_movement_model.employment_id,
        status=core_blocked_movement_model.status,
        note=core_blocked_movement_model.note,
        input_override=core_blocked_movement_model.input_override,
        parent_blocked_movement_id=core_blocked_movement_model.parent_blocked_movement_id,
        new_blocked_movements_after_retry=[
            new_blocked_movement.id
            for new_blocked_movement in core_blocked_movement_model.new_blocked_movements_after_retry
        ]
        if core_blocked_movement_model.new_blocked_movements_after_retry
        else None,
        upstream_parent_blocked_movement_id=core_blocked_movement_model.upstream_parent_blocked_movement_id,
        employment_source_data_id=core_blocked_movement_model.employment_source_data_id,
    )

id instance-attribute

id

input_override instance-attribute

input_override

is_active

is_active()

Returns True if this blocked movement is considered to be active, i.e. something will happen and its status is expected to evolve (either by itself or via Ops action).

Source code in components/employment/public/entities.py
def is_active(self) -> bool:
    """
    Returns True if this blocked movement is considered to be active, i.e. something will happen and its status is
    expected to evolve (either by itself or via Ops action).
    """
    return self.status in ACTIVE_BLOCKED_MOVEMENT_STATUSES

is_cancelled

is_cancelled()

Returns True if this blocked movement is considered to be cancelled, i.e. it was not and will not be processed by the Employment Component.

Source code in components/employment/public/entities.py
def is_cancelled(self) -> bool:
    """
    Returns True if this blocked movement is considered to be cancelled, i.e. it was not and will not be processed
    by the Employment Component.
    """
    return self.status in CANCELLED_BLOCKED_MOVEMENT_STATUSES

new_blocked_movements_after_retry instance-attribute

new_blocked_movements_after_retry

note instance-attribute

note

parent_blocked_movement_id instance-attribute

parent_blocked_movement_id

source_type instance-attribute

source_type

status instance-attribute

status

upstream_parent_blocked_movement_id instance-attribute

upstream_parent_blocked_movement_id

user_id instance-attribute

user_id

CoreEmploymentVersion dataclass

CoreEmploymentVersion(
    *,
    employment_id,
    user_id,
    company_id,
    start_date=isodate_field(),
    end_date=optional_isodate_field(),
    is_cancelled,
    source_type,
    employment_change_types,
    employment_source_data_id,
    external_employee_id=None
)

Bases: DataClassJsonMixin

A version of core information of an employment in the Employment Component.

This may or may not be the latest version of the employment depending on how you got this object.

company_id instance-attribute

company_id

The ID of the employment's company

employment_change_types instance-attribute

employment_change_types

The change types that happened for this employment

employment_id instance-attribute

employment_id

The ID of the employment within the Employment Component

employment_source_data_id instance-attribute

employment_source_data_id

The Employment Source Data ID which this Core Employment Version comes from

end_date class-attribute instance-attribute

end_date = optional_isodate_field()

End date of the employment. As much as possible, this should correspond to the real work contract's end date.

external_employee_id class-attribute instance-attribute

external_employee_id = None

An ID for the employee on the customer's HRIS.

In France, this corresponds to the "Matricule RH". Completely optional, can be None.

from_core_employment_version_model classmethod

from_core_employment_version_model(
    core_employment_version_model,
)

Internal conversion from the DB model to this dataclass

Source code in components/employment/public/entities.py
@classmethod
def from_core_employment_version_model(
    cls, core_employment_version_model: "CoreEmploymentVersionModel"
) -> "CoreEmploymentVersion":
    """
    Internal conversion from the DB model to this dataclass
    """
    return cls(
        employment_id=core_employment_version_model.employment_id,
        user_id=core_employment_version_model.user_id,
        company_id=core_employment_version_model.company_id,
        start_date=core_employment_version_model.start_date,
        end_date=core_employment_version_model.end_date,
        is_cancelled=core_employment_version_model.is_cancelled,
        source_type=parse_base_source_type(
            core_employment_version_model.source_type
        ),
        employment_change_types=set(
            core_employment_version_model.employment_change_types
        ),
        employment_source_data_id=core_employment_version_model.employment_source_data_id,  # type: ignore[arg-type] # will be non nullable in the next PR
        external_employee_id=core_employment_version_model.external_employee_id,
    )

is_cancelled instance-attribute

is_cancelled

If True, the employment is cancelled and "no longer counts", as if it was deleted

source_type instance-attribute

source_type

The source from which this employment version was created

start_date class-attribute instance-attribute

start_date = isodate_field()

Start date of the employment. As much as possible, this should correspond to the real work contract's start date.

user_id instance-attribute

user_id

The ID of the employment's user.

EmploymentChange dataclass

EmploymentChange(
    core_employment_version,
    extended_employment_updates,
    previous_core_employment_version,
    country_code,
)

Bases: DataClassJsonMixin, Generic[_TValues]

A change that happened as a result of an action in the Employment Component.

To know what changes happened: - Use core_employment_version.employment_change_types to know if the employment's state changed (e.g. invitation, termination, etc., see EmploymentChangeType for the full list). This can be empty if the only changes were updates to extended values. - Use extended_employment_updates to know which extended values changed, got added, got removed, etc.

core_employment_version instance-attribute

core_employment_version

The Core Employment Version related to this change.

country_code instance-attribute

country_code

The country from/for which this EmploymentChange emanates.

This will generally be hinted at by core_employment_version.source_type, but not all source types are "local": some are global and will provide no indication of their origin. You should thus rely on this field, which is guaranteed to be set.

extended_employment_updates instance-attribute

extended_employment_updates

Extended employment value updates related to this change.

NB: Will only contain brand new values updated as part of this change, will NOT contain all of the known extended values for the employment.

previous_core_employment_version instance-attribute

previous_core_employment_version

The last known core_employment_version for the employment, which is replaced (either literally or logically, see below) by the EmploymentChange's core_employment_version.

Note that this does not necessarily refer to an employment with an identical employment_id (e.g. transfers use previous_core_employment_version to indicate which CoreEmploymentVersion we are transfering from, but it is in another company, hence we have a previous_core_employment_version with a different employment_id)

EmploymentConsumer

Bases: Protocol, Generic[_TValues]

A consumer of Employment changes.

Employment Consumers are contributed via the CountryGateway.get_employment_consumers function.

__call__

__call__(employment_change, event_bus_orchestrator)

React to an employment changing. This function has a few rules:

  • It must not commit
  • All side-effects, such as sending an email, must be published to the EventBusOrchestrator, e.g.
Source code in components/employment/public/entities.py
def __call__(
    self,
    employment_change: EmploymentChange[_TValues],
    event_bus_orchestrator: EventBusOrchestrator,
) -> None:
    """
    React to an employment changing. This function has a few rules:

    - It **must not** commit
    - All side-effects, such as sending an email, must be published to the EventBusOrchestrator, e.g.
    """
    ...

__name__ instance-attribute

__name__

EmploymentDeclaration dataclass

EmploymentDeclaration(
    *,
    user_id,
    company_id,
    start_date=isodate_field(),
    end_date=optional_isodate_field(),
    source_type,
    extended_informations,
    external_employee_id=None
)

Bases: DataClassJsonMixin, Generic[_TValues]

An employment declaration that can be ingested by the Employment Component via the ingest_employment_declaration.

Part of the Declarative API, see https://www.notion.so/alaninsurance/Employment-Component-API-entrypoints-1261426e8be780b0ab95d8a8ee57c5cd?pvs=4 ⧉

company_id instance-attribute

company_id

ID of the company where there is an employment

end_date class-attribute instance-attribute

end_date = optional_isodate_field()

End date of the employment. As much as possible, this should correspond to the real work contract's end date.

extended_informations instance-attribute

extended_informations

Extended information about the employment.

Upon ingestion, new information provided in this array is stored and sent to consumers.

external_employee_id class-attribute instance-attribute

external_employee_id = None

An ID for the employee on the customer's HRIS.

In France, this corresponds to the "Matricule RH". Completely optional, can be None.

source_type class-attribute instance-attribute

source_type = field(
    metadata=config(decoder=_localized_source_type_decoder)
)

The source type from which this employment comes from. Either:

  • A local source type, e.g. SourceType.fr_bulk_invite
  • A global source type associated with a country code, e.g. GlobalSourceType.legacy_backfill.with_country(CountryCode.fr)

start_date class-attribute instance-attribute

start_date = isodate_field()

Start date of the employment. As much as possible, this should correspond to the real work contract's start date.

to_core_employment_version

to_core_employment_version(
    employment_change_types,
    employment_id,
    employment_source_data_id,
)

Converts this EmploymentDeclaration into a CoreEmploymentVersion.

Intended for internal use by the Employment Component.

Source code in components/employment/public/entities.py
def to_core_employment_version(
    self,
    employment_change_types: set[EmploymentChangeType],
    employment_id: UUID | None,
    employment_source_data_id: UUID,
) -> "CoreEmploymentVersion":
    """
    Converts this EmploymentDeclaration into a CoreEmploymentVersion.

    Intended for internal use by the Employment Component.
    """
    return CoreEmploymentVersion(
        employment_id=employment_id or uuid4(),
        user_id=self.user_id,
        company_id=self.company_id,
        start_date=self.start_date,
        end_date=self.end_date,
        is_cancelled=False,
        source_type=self.source_type.base_source_type,
        employment_change_types=employment_change_types,
        external_employee_id=self.external_employee_id,
        employment_source_data_id=employment_source_data_id,
    )

user_id instance-attribute

user_id

ID of the user who has an employment

EmploymentSourceData dataclass

EmploymentSourceData(
    *,
    id,
    source_type,
    raw_data,
    metadata,
    core_employment_versions,
    extended_employment_updates,
    upstream_blocked_movements,
    core_blocked_movements
)

Bases: DataClassJsonMixin

Represents data attached to an ingestion in the Employment Component.

This corresponds to a SourceInformation that was attached to some action(s) done in the Employment Component.

Concretely, this may correspond to a UCE action on Marmot, a row in an Excel file, an admin action on their dashboard, etc.

core_blocked_movements instance-attribute

core_blocked_movements

core_employment_versions instance-attribute

core_employment_versions

extended_employment_updates instance-attribute

extended_employment_updates

from_employment_source_data_model staticmethod

from_employment_source_data_model(
    employment_source_data_model,
)

Internal conversion from the DB model to this dataclass

Source code in components/employment/public/entities.py
@staticmethod
def from_employment_source_data_model(
    employment_source_data_model: "EmploymentSourceDataModel",
) -> "EmploymentSourceData":
    """
    Internal conversion from the DB model to this dataclass
    """
    return EmploymentSourceData(
        id=employment_source_data_model.id,
        source_type=parse_base_source_type(
            employment_source_data_model.source_type
        ),
        raw_data=employment_source_data_model.raw_data,
        metadata=employment_source_data_model.source_metadata,
        core_employment_versions=[
            CoreEmploymentVersion.from_core_employment_version_model(
                core_employment_version_model
            )
            for core_employment_version_model in employment_source_data_model.core_employment_versions
        ],
        extended_employment_updates=[
            ExtendedEmploymentUpdate.from_extended_employment_update_model(
                extended_employment_update_model
            )
            for extended_employment_update_model in employment_source_data_model.extended_employment_updates
        ],
        core_blocked_movements=[
            CoreBlockedMovement.from_core_blocked_movement_model(
                core_blocked_movement_model
            )
            for core_blocked_movement_model in employment_source_data_model.core_blocked_movements
        ],
        upstream_blocked_movements=[
            UpstreamBlockedMovement.from_upstream_blocked_movement_model(
                upstream_blocked_movement_model
            )
            for upstream_blocked_movement_model in employment_source_data_model.upstream_blocked_movements
        ],
    )

id instance-attribute

id

ID of this Employment Source Data

metadata instance-attribute

metadata

Mutable metadata associated with this Employment Source Data. No specific format is expected (other than this has to be serializable to JSON). Can be used to store simple logging data, or to implement advanced custom flows on top of the Employment Component.

raw_data instance-attribute

raw_data

The raw data, as it was received by the upstream. Taken from SourceInformation.raw_data

source_type instance-attribute

source_type

The source which created this Employment Source Data

upstream_blocked_movements instance-attribute

upstream_blocked_movements

ExtendedEmploymentUpdate dataclass

ExtendedEmploymentUpdate(
    employment_id,
    validity_period,
    source_type,
    values,
    employment_source_data_id,
)

Bases: DataClassJsonMixin, Generic[_TValues]

An update to the extended values of an employment.

NB: Do not directly use this class for getting all of the latest extended values of an employment, use get_employment_values_on instead.

employment_id instance-attribute

employment_id

The ID of the employment within the Employment Component

employment_source_data_id instance-attribute

employment_source_data_id

The Employment Source Data ID which this Core Employment Version comes from

from_extended_employment_update_model classmethod

from_extended_employment_update_model(
    extended_employment_update_model,
)

Internal conversion from the DB model to this dataclass

Source code in components/employment/public/entities.py
@classmethod
def from_extended_employment_update_model(
    cls, extended_employment_update_model: "ExtendedEmploymentUpdateModel"
) -> "ExtendedEmploymentUpdate[_TValues]":
    """
    Internal conversion from the DB model to this dataclass
    """
    return cls(
        employment_id=extended_employment_update_model.employment_id,
        validity_period=extended_employment_update_model.validity_period,
        source_type=parse_base_source_type(
            extended_employment_update_model.source_type
        ),
        values=cast("_TValues", extended_employment_update_model.values),
        employment_source_data_id=extended_employment_update_model.employment_source_data_id,  # type: ignore[arg-type] # will be non nullable in the next PR
    )

source_type instance-attribute

source_type

Source which created this ExtendedEmploymentUpdate

validity_period instance-attribute

validity_period

Validity period for the values in this update

values instance-attribute

values

Values associated to this ExtendedEmploymentUpdate

ExtendedInformation dataclass

ExtendedInformation(validity_period, values)

Bases: DataClassJsonMixin, Generic[_TValues]

Extended information related to an employment, see also EmploymentDeclaration.extended_informations.

validity_period instance-attribute

validity_period

A validity period for the extended information. There are several possibilities:

  • validity_period=None: Information is valid for the entire duration of the employment
  • validity_period=ValidityPeriod(...): Information is valid for a specific duration within the employment.

values instance-attribute

values

The values in question, as a dict. Keys must always be strings, while values must be anything that can easily be translated to JSON by SQLAlchemy. We recommend only using simple types here (numbers, booleans, strings, etc.) to avoid serialization/deserialization issues.

Refer to ExtendedValuesDict for more information.

ExtendedValuesDict

Bases: TypedDict

Subclass this class to define your extended values like so (the total=False is mandatory):

class CcExtendedValues(ExtendedValuesDict, total=False):
    local_identifier: str
    something_optional: str | None

CcEmploymentDeclaration = EmploymentDeclaration[CcExtendedValues]

# ...

employment_declaration = CcEmploymentDeclaration(
    user_id=...
    ...
)

Cc = Country Code. For example, in France, you would use the FrExtendedValues name.

Typing rules

Because this dictionary is serialized and deserialized by SQLAlchemy, you must only use simple types, such as:

  • Strings (str)
  • Numbers (int/float)
  • Booleans (bool)
  • None
  • Lists of the aforementioned types (list[...])
  • Dicts with string keys and values of the aforementioned types (dict[str, ...])

Using anything else may lead to errors at runtime. We unfortunately cannot check this right via mypy now, see PEP 728.

Having None as a valid type is significant:

  • Not specifying None in your type (e.g. foo: str) means that it can be absent or present, and if present, it will not be None (*equivalent to T | undefined in TypeScript)
  • Specifying None in your type (e.g. bar: str | None) means that it can be absent or present, and if present, it can be None (*equivalent to T | null | undefined in TypeScript)
Sensitive values (Kay anonymization)

The JSON is encrypted on Turing, but is passed through normally on Kay. In order to anonmyize extended values...

  • ... with an AlwaysNone strategy, use the SensitiveNone type alias:
  • ... with a custom strategoy, use Annotated[..., SensitiveValue(kay.SomeStrategy())]:
class CcExtendedValues(ExtendedValuesDict, total=False):
    something_sensitive: SensitiveNone[str | None]
    some_email: Annotated[str | None, SensitiveValue(kay.Something())]

The resulting Kay strategy is built by the Employment Component, from all known ExtendedValuesDict subclasses.

GenericBlockedMovementInfo dataclass

GenericBlockedMovementInfo(error_code)

Bases: DataClassJsonMixin

This dataclass is meant to represent both BlockedMovementInfo and UpstreamBlockedMovementInfo for the public API

error_code instance-attribute

error_code

IngestionResult dataclass

IngestionResult(success, employment_source_data)

Bases: DataClassJsonMixin

The result of a call to the ingest_employment_declaration function.

employment_source_data instance-attribute

employment_source_data

The EmploymentSourceData object associated with this ingestion, created from the provided SourceInformation..

success instance-attribute

success

If True, the declaration was processed successfully. If False, the declaration failed to be processed, and a blocked movement was created.

Inspect employment_source_data to find out more.

NoContext dataclass

NoContext()

Bases: UpstreamBlockedMovementContext

A simple context that contains nothing for Upstream Blocked Movements.

RULE_CASE_GETTER_BY_EMPLOYMENT_CHANGE_TYPE module-attribute

RULE_CASE_GETTER_BY_EMPLOYMENT_CHANGE_TYPE = {
    invitation: lambda **_: invite,
    termination: lambda end_date, **_: (
        terminate
        if not is_end_date_too_far_in_the_past(end_date)
        else terminate_with_end_date_more_than_100_days_in_the_past
    ),
    start_date_change: lambda **_: change_start_date,
    end_date_change: lambda end_date, **_: (
        change_end_date
        if not is_end_date_too_far_in_the_past(end_date)
        else change_end_date_more_than_100_days_in_the_past
    ),
    resumption: lambda **_: resume,
    cancellation: lambda **_: cancel,
    transfer: lambda start_date, **_: (
        transfer
        if not is_transfer_date_too_far_in_the_past(
            start_date
        )
        else transfer_more_than_100_days_in_the_past
    ),
    external_employee_id_change: lambda external_employee_id, **_: (
        change_external_employee_id
        if external_employee_id is not None
        else change_external_employee_id_if_none
    ),
}

RetryFunction

Bases: Protocol, Generic[_TValues]

Function used for retrying an EmploymentDeclaration (as part of both Upstream and Core Blocked Movements).

This is a technical detail that should only be used in cases that need to do something, such as a legacy backfill, before sending an EmploymentDeclaration.

__call__

__call__(
    employment_declaration,
    source_information,
    commit,
    event_bus_orchestrator=None,
    retried_blocked_movement=None,
    upstream_retried_blocked_movement=None,
    source_rules_override=None,
)

Process an EmploymentDeclaration - signature is the exact same as the ingest_employment_declaration function.

Source code in components/employment/public/entities.py
def __call__(
    self,
    employment_declaration: EmploymentDeclaration[_TValues],
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
    retried_blocked_movement: "CoreBlockedMovementModel | None" = None,
    upstream_retried_blocked_movement: "UpstreamBlockedMovementModel | None" = None,
    source_rules_override: SourceRules | None = None,
) -> IngestionResult:
    """
    Process an EmploymentDeclaration - signature is the exact same as the `ingest_employment_declaration` function.
    """
    ...

SensitiveNone module-attribute

SensitiveNone = Annotated[_T, SensitiveValue(AlwaysNone())]

SensitiveValue dataclass

SensitiveValue(kay_strategy)

Identifies a value as sensitive - the provided Kay strategy will be applied on this value (as part of kay.StructedJsonStrategy()).

Must be used as an annotation on ExtendedValuesDict fields, like this:

invite_email: Annotated[str | None, SensitiveValue(kay.AnonymizedEmail("invite"))]

For the AlwaysNone strategy, use the SensitiveNone alias instead:

something_sensitive: SensitiveNone[str | None]

kay_strategy instance-attribute

kay_strategy

SourceInformation

Bases: DataClassJsonMixin

Provides additional information on the source of an EmploymentDeclaration or any action within the Employment Component.

This item allows accurately tracking the results of an action that impacts the Employment Component. Once processed, the SourceInformation object becomes a row in the employment.employment_source_data table. All models created as a result of the ingestion or action are correlated to this EmploymentSourceData row via the employment_source_data_id.

There are several use cases for the SourceInformation object: - Keep a log of the raw data, as it is received by you. This could be an Excel row, the body of a POST API call, etc. - Keeping queryable and mutable metadata about an ingestion in the Employment Component.

Re-using SourceInformation objects

If your use case requires doing multiple things in the Employment Component, you can reuse the SourceInformation object.

For example, if your use case requires cancelling an employment, then ingesting an EmploymentDeclaration, you can use the same SourceInformation object to correlate these actions together:

source_information = SourceInformation(raw_data={...}, metadata={...})
cancel(employment_id=..., source_information=source_information)

declaration = EmploymentDeclaration(...)
result = ingest_employment_declaration(employment_declaration=declaration, source_information=source_information)

employment_source_data = result.employment_source_data
employment_source_data.core_employment_versions  # will contain the cancelled employment + the new ingestion's result

metadata class-attribute instance-attribute

metadata = field(on_setattr=frozen)

Additional mutable metadata. For example, this could contain, IDs to the various files used to build a declaration, a flag, etc.

This metadata is mutable, see update_employment_source_data_metadata_with

raw_data class-attribute instance-attribute

raw_data = field(on_setattr=frozen)

The raw data your upstream used to build the EmploymentDeclaration.

For example, if creating an EmploymentDeclaration because of an API called from a third party, you could put the body of this API call here. Philosophically, your upstream would be able to fully reconstruct the EmploymentDeclaration with the information in raw_data.

Used mainly for logging purposes. Once the employment declaration is ingested, this value is read-only.

source_type class-attribute instance-attribute

source_type = field(default=None, on_setattr=frozen)

The source type for this SourceInformation.

You do not need to set this value: by default, the SourceInformation will take on the source_type it is first used with. For example, here, it will automatically get the fr_admin_dashboard source type:

source_information = SourceInformation(...)
cancel(..., source_type=SourceType.fr_admin_dashboard, source_information=source_information)
# -> Resulting EmploymentSourceData entry has source type `fr_admin_dashboard`

Setting this value forces a specific source type to be used. This is useful in case the first of this SourceInformation does something unrelated, like a Legacy Backfill in France.

source_information = SourceInformation(..., source_type=SourceType.fr_admin_dashbaord)
cancel(..., source_type=GlobalSourceType.legacy_backfill, source_information=source_information)
# -> Resulting EmploymentSourceData entry has source type `fr_admin_dashboard`

SourceInformationLike module-attribute

SourceInformationLike = SourceInformation | UUID

A type that represents anything that can be converted into an EmploymentSourceDataModel, and that is usable from the outside (as things outside of the Employment Component don't have access to the raw model).

UUID is a more advanced option to correctly support edge cases. You should use SourceInformation in almost all cases.

SourceRules dataclass

SourceRules(
    invite,
    terminate,
    change_start_date,
    change_end_date,
    resume,
    cancel,
    transfer,
    change_external_employee_id,
    terminate_with_end_date_more_than_100_days_in_the_past,
    change_end_date_more_than_100_days_in_the_past,
    change_external_employee_id_if_none,
    transfer_more_than_100_days_in_the_past,
)

Bases: DataClassJsonMixin

This dataclass allows to define what action to take for each "rule case". The rules are defined for each source type in sources_configuration.py.

Rule cases are changes that can happen to an employee. The main rule cases map 1-1 to EmploymentChangeTypes. There are also more specific rule cases to handle edge cases.

__post_init__

__post_init__()

In order to be able to retry old blocked movements (which contain JSONified objects of this class), we need to provide a fallback for new cases. This is the way to do it with Python dataclasses whose default value depends on other fields.

Source code in components/employment/public/entities.py
def __post_init__(self) -> None:
    """
    In order to be able to retry old blocked movements (which contain JSONified objects of this class), we need to
    provide a fallback for new cases. This is the way to do it with Python dataclasses whose default value depends
    on other fields.
    """
    if (
        cast("RuleCaseAction | None", self.transfer_more_than_100_days_in_the_past)
        is None
    ):
        self.transfer_more_than_100_days_in_the_past = self.transfer

cancel instance-attribute

cancel

change_end_date instance-attribute

change_end_date

change_end_date_more_than_100_days_in_the_past instance-attribute

change_end_date_more_than_100_days_in_the_past

change_external_employee_id instance-attribute

change_external_employee_id

change_external_employee_id_if_none instance-attribute

change_external_employee_id_if_none

The employment's external employee ID is changed (EmploymentChangeType.external_employee_id_change), but the new external employee ID is None, which can mean that it is not known.

This allows you to decide if None values you receive should clear the external employee ID value of the employment (= apply) or None values should be ignored (= ignore)

change_start_date instance-attribute

change_start_date

get_rule_case_action_for_employment_change_type

get_rule_case_action_for_employment_change_type(
    employment_change_type,
    start_date,
    end_date,
    external_employee_id,
)

Define how to compute the rule case for a given employment change type and apply it to the employment_change_type parameter.

Source code in components/employment/public/entities.py
def get_rule_case_action_for_employment_change_type(
    self,
    employment_change_type: EmploymentChangeType,
    start_date: date | None,
    end_date: date | None,
    external_employee_id: str | None,
) -> tuple[RuleCaseAction, RuleCase]:
    """
    Define how to compute the rule case for a given employment change type
    and apply it to the employment_change_type parameter.
    """
    rule_case = RULE_CASE_GETTER_BY_EMPLOYMENT_CHANGE_TYPE[employment_change_type](
        start_date=start_date,
        end_date=end_date,
        external_employee_id=external_employee_id,
    )
    rule_case_name = RuleCase(rule_case)
    rule_case_action = getattr(self, rule_case)
    current_employment_logger.info(
        f"Will {rule_case_action} rule case {rule_case_name} for change type {employment_change_type}",
        layer="ingest_employment_declaration",
    )

    return rule_case_action, rule_case_name

invite instance-attribute

invite

resume instance-attribute

resume

terminate instance-attribute

terminate

terminate_with_end_date_more_than_100_days_in_the_past instance-attribute

terminate_with_end_date_more_than_100_days_in_the_past

transfer instance-attribute

transfer

transfer_more_than_100_days_in_the_past instance-attribute

transfer_more_than_100_days_in_the_past

StaleInvitationInformation dataclass

StaleInvitationInformation(
    blocked_movement_id,
    company_id,
    company_display_name,
    employee_full_name,
    employee_email,
    employee_identifier,
    start_date,
    admin_was_notified,
    created_at,
    source_type,
    country_code,
)

Bases: DataClassJsonMixin

admin_was_notified instance-attribute

admin_was_notified

Whether the admin was notified of the stale invitation - through CIO campaign cio-broadcast-221

blocked_movement_id instance-attribute

blocked_movement_id

company_display_name instance-attribute

company_display_name

company_id instance-attribute

company_id

country_code instance-attribute

country_code

created_at instance-attribute

created_at

employee_email instance-attribute

employee_email

employee_full_name instance-attribute

employee_full_name

employee_identifier instance-attribute

employee_identifier

source_type instance-attribute

source_type

start_date instance-attribute

start_date

StaleInvitationResolutionParameters dataclass

StaleInvitationResolutionParameters(
    cancel,
    can_have_target_start_date_more_than_100_days_in_the_past,
    blocked_movement_id,
    start_date_override=None,
)

Bases: DataClassJsonMixin

__post_init__

__post_init__()
Source code in components/employment/public/entities.py
def __post_init__(self) -> None:  # noqa: D105
    if (
        self.cancel
        and self.can_have_target_start_date_more_than_100_days_in_the_past
    ):
        raise ValueError(
            "Only one of cancel or can_have_target_start_date_more_than_100_days_in_the_past must be provided"
        )

blocked_movement_id instance-attribute

blocked_movement_id

can_have_target_start_date_more_than_100_days_in_the_past instance-attribute

can_have_target_start_date_more_than_100_days_in_the_past

cancel instance-attribute

cancel

start_date_override class-attribute instance-attribute

start_date_override = None

UpstreamBlockedMovement dataclass

UpstreamBlockedMovement(
    *,
    id,
    user_id,
    company_id,
    source_type,
    error_code,
    error_message,
    context,
    status,
    note,
    input_override,
    parent_blocked_movement_id,
    employment_source_data_id
)

Bases: DataClassJsonMixin

An upstream blocked movement, i.e. an error that happened within an Upstream (e.g. while building an Employment Declaration)

company_id instance-attribute

company_id

context instance-attribute

context

employment_source_data_id instance-attribute

employment_source_data_id

error_code instance-attribute

error_code

error_message instance-attribute

error_message

from_upstream_blocked_movement_model classmethod

from_upstream_blocked_movement_model(
    upstream_blocked_movement_model,
)

Internal conversion from the DB model to this dataclass

Source code in components/employment/public/entities.py
@classmethod
def from_upstream_blocked_movement_model(
    cls,
    upstream_blocked_movement_model: "UpstreamBlockedMovementModel",
) -> "UpstreamBlockedMovement":
    """
    Internal conversion from the DB model to this dataclass
    """
    return cls(
        id=upstream_blocked_movement_model.id,
        user_id=upstream_blocked_movement_model.user_id,
        company_id=upstream_blocked_movement_model.company_id,
        source_type=parse_base_source_type(
            upstream_blocked_movement_model.source_type
        ),
        error_code=upstream_blocked_movement_model.error_code,
        error_message=upstream_blocked_movement_model.error_message,
        context=upstream_blocked_movement_model.context,
        status=upstream_blocked_movement_model.status,
        note=upstream_blocked_movement_model.note,
        input_override=upstream_blocked_movement_model.input_override,
        parent_blocked_movement_id=upstream_blocked_movement_model.parent_blocked_movement_id,
        employment_source_data_id=upstream_blocked_movement_model.employment_source_data_id,
    )

id instance-attribute

id

input_override instance-attribute

input_override

is_active

is_active()

Returns True if this blocked movement is considered to be active, i.e. something will happen and its status is expected to evolve (either by itself or via Ops action).

Source code in components/employment/public/entities.py
def is_active(self) -> bool:
    """
    Returns True if this blocked movement is considered to be active, i.e. something will happen and its status is
    expected to evolve (either by itself or via Ops action).
    """
    return self.status in ACTIVE_BLOCKED_MOVEMENT_STATUSES

is_cancelled

is_cancelled()

Returns True if this blocked movement is considered to be cancelled, i.e. it was not and will not be processed by the Employment Component.

Source code in components/employment/public/entities.py
def is_cancelled(self) -> bool:
    """
    Returns True if this blocked movement is considered to be cancelled, i.e. it was not and will not be processed
    by the Employment Component.
    """
    return self.status in CANCELLED_BLOCKED_MOVEMENT_STATUSES

note instance-attribute

note

parent_blocked_movement_id instance-attribute

parent_blocked_movement_id

source_type instance-attribute

source_type

status instance-attribute

status

user_id instance-attribute

user_id

UpstreamBlockedMovementContext dataclass

UpstreamBlockedMovementContext()

Bases: DataClassJsonMixin

Root class for the context field of Upstream Blocked Movements

UpstreamBlockedMovementInfo dataclass

UpstreamBlockedMovementInfo(
    user_id, company_id, error_code, error_message, context
)

Bases: DataClassJsonMixin

Information to be used by UpstreamError implementations that are used to create an Upstream Blocked Movement

company_id instance-attribute

company_id

context instance-attribute

context

error_code instance-attribute

error_code

error_message instance-attribute

error_message

user_id instance-attribute

user_id

UserId module-attribute

UserId = str

components.employment.public.enums

ACTIVE_BLOCKED_MOVEMENT_STATUSES module-attribute

ACTIVE_BLOCKED_MOVEMENT_STATUSES = frozenset(
    {
        pending,
        awaiting_user_response,
        pending_self_healing,
        awaiting_user_response_self_healing,
    }
)

An immutable set containing all of the BlockedMovementStatuses considered "active", i.e. actions (of whichever kind) are required or are being done.

BLOCKED_MOVEMENT_SLA_TERMINAL_STATUSES module-attribute

BLOCKED_MOVEMENT_SLA_TERMINAL_STATUSES = frozenset(
    {
        cancelled,
        resolved,
        awaiting_user_response,
        awaiting_user_response_self_healing,
        automatically_resolved,
        automatically_ignored,
        automatically_cancelled,
    }
)

BlockedMovementStatus

Bases: AlanBaseEnum

The status of a blocked movement.

For more information, see https://www.notion.so/alaninsurance/Blocked-Movements-246a87a20f7a4cd388aee8a6327c4148?pvs=4 ⧉

automatically_cancelled class-attribute instance-attribute

automatically_cancelled = 'automatically_cancelled'

A self-healing blocked movement (whose status was pending_self_healing) that was cancelled as part of its automatic self-healing process.

This is equivalent to cancelled for regular pending blocked movements.

automatically_ignored class-attribute instance-attribute

automatically_ignored = 'automatically_ignored'

A blocked movement that was created, but immediately ignored.

When a source rule dictates that a movement shall be ignored, instead of silently ignoring, a blocked movement with this status is created instead.

Automatically ignored blocked movements are automatically retried daily for technical reasons (context ⧉).

automatically_resolved class-attribute instance-attribute

automatically_resolved = 'automatically_resolved'

A self-healing blocked movement (whose status was pending_self_healing) that was resolved as part of its automatic self-healing process.

This is equivalent to resolved for regular pending blocked movements.

awaiting_user_response class-attribute instance-attribute

awaiting_user_response = 'awaiting_user_response'

Ops has contacted the admin or member about this Blocked Movement. It is considered a terminal status in terms of SLA computation (our Ops' job is done), but is not the final status for a blocked movement (see https://www.notion.so/alaninsurance/Blocked-Movements-246a87a20f7a4cd388aee8a6327c4148?pvs=4#11c0a501e8b24cbb9603fa7dd273c321 ⧉).

awaiting_user_response blocked movements are automatically retried daily.

awaiting_user_response_self_healing class-attribute instance-attribute

awaiting_user_response_self_healing = (
    "awaiting_user_response_self_healing"
)

A blocked movement that is 'awaiting_user_response', but whose resolution is expected to happen automatically - this is a "self-healing" blocked movement.

Typical use cases include blocked movements that are created and automatically sent to the admin for action, and if the admin doesn't act after a certain time, the blocked movement is automatically resolved or cancelled by a daily command.

They are NOT automatically retried. This gives full control over their lifecycle to local code.

cancelled class-attribute instance-attribute

cancelled = 'cancelled'

A blocked movement that was cancelled - it was not resolved, usually because no action was required.

pending class-attribute instance-attribute

pending = 'pending'

A blocked movement that requires Ops attention. This is the default for newly created blocked movements.

Pending blocked movements are automatically retried daily.

pending_self_healing class-attribute instance-attribute

pending_self_healing = 'pending_self_healing'

A blocked movement that is 'pending', but whose resolution is expected to happen automatically - this is a "self-healing" blocked movement.

Typical use cases include blocked movements that are created, but then resolved or cancelled by a daily command. By setting a blocked movement to have pending_self_healing status, it will not appear in the Blocked Movements tool by default - meaning it will not create noise for Ops.

Note that blocked movements are never created with this state directly - you must manually mark pending blocked movements as self-healing via the mark_pending_(core|upstream)_blocked_movement_as_self_healing function.

Self-healing blocked movements, i.e. those with status pending_self_healing, are NOT automatically retried. This gives full control over their lifecycle to local code.

resolved class-attribute instance-attribute

resolved = 'resolved'

A blocked movement that was resolved, either automatically (by a daily retry, or a retry triggered by an Ops) or manually (status manually set to 'resolved').

Note that just because a blocked movement is resolved doesn't mean that the situation at hand is solved. A blocked movement may be resolved because another error happened - so this blocked movement is 'resolved' in the sense that we no longer get this specific error.

CANCELLED_BLOCKED_MOVEMENT_STATUSES module-attribute

CANCELLED_BLOCKED_MOVEMENT_STATUSES = frozenset(
    {cancelled, automatically_cancelled}
)

An immutable set containing all of the BlockedMovementStatuses considered "cancelled", i.e. the blocked movement is not valid and/or there's nothing to process.

CountryCode

Bases: AlanBaseEnum

The country associated with something in the Employment Component.

Usually deduced from the source_type of what you're looking at.

be class-attribute instance-attribute

be = 'be'

ca class-attribute instance-attribute

ca = 'ca'

es class-attribute instance-attribute

es = 'es'

fr class-attribute instance-attribute

fr = 'fr'

from_app_name staticmethod

from_app_name(app_name)

Turns an AppName into a CountryCode - not compatible with all AppNames.

Source code in components/employment/public/enums.py
@staticmethod
def from_app_name(app_name: AppName) -> "CountryCode":
    """
    Turns an AppName into a CountryCode - not compatible with all AppNames.
    """
    match app_name:
        case AppName.ALAN_FR:
            return CountryCode.fr
        case AppName.ALAN_BE:
            return CountryCode.be
        case AppName.ALAN_ES:
            return CountryCode.es
        case AppName.ALAN_CA:
            return CountryCode.ca

    raise ValueError(f"Unknown AppName: {app_name}")

test class-attribute instance-attribute

test = 'test'

EmploymentChangeType

Bases: AlanBaseEnum

The type of change that occurred.

cancellation class-attribute instance-attribute

cancellation = 'cancellation'

An employment is cancelled, and code should act as if this employment had never existed.

end_date_change class-attribute instance-attribute

end_date_change = 'end_date_change'

A change in the end date of an employment.

external_employee_id_change class-attribute instance-attribute

external_employee_id_change = 'external_employee_id_change'

The external employee ID of an employee is changed.

invitation class-attribute instance-attribute

invitation = 'invitation'

An invitation: an employee joins a company.

Mutually exclusive with all other change types.

NB: An invitation can also have an end_date in case of short-term work contracts. ⚠️ In this case, you will not receive a termination change type

resumption class-attribute instance-attribute

resumption = 'resumption'

A termination is cancelled, i.e. the end_date of an employment is cleared.

start_date_change class-attribute instance-attribute

start_date_change = 'start_date_change'

A change in the start date of an employment.

termination class-attribute instance-attribute

termination = 'termination'

A termination: an employee is terminated (an end_date is set).

NB: In case an employee is invited with an end_date, only invitation is set.

transfer class-attribute instance-attribute

transfer = 'transfer'

An employee is transferred from one company to another. This is both a termination (in the previous company) and an invitation (in the new company). This movement is computed when we receive a new employment for an employee that already has an employment in another company of the same account. Notes: - An employee moving from one company to another company not part of the same account is not considered as a transfer, an invitation will be computed in the new company and a termination in the old company when we receive the information. This can potentially happen with a delay between the two actions - It is still possible to force a transfer between two companies that are not part of the same account by using the imperative function transfer

Mutually exclusive with all other change types.

uncancellation class-attribute instance-attribute

uncancellation = 'uncancellation'

An employment was cancelled by mistake and we want to undo the cancellation.

RuleCase

Bases: AlanBaseEnum

The different cases that can Source Rules rule over.

cancel class-attribute instance-attribute

cancel = 'cancel'

change_end_date class-attribute instance-attribute

change_end_date = 'change_end_date'

change_end_date_more_than_100_days_in_the_past class-attribute instance-attribute

change_end_date_more_than_100_days_in_the_past = (
    "change_end_date_more_than_100_days_in_the_past"
)

change_external_employee_id class-attribute instance-attribute

change_external_employee_id = 'change_external_employee_id'

change_external_employee_id_if_none class-attribute instance-attribute

change_external_employee_id_if_none = (
    "change_external_employee_id_if_none"
)

change_start_date class-attribute instance-attribute

change_start_date = 'change_start_date'

invite class-attribute instance-attribute

invite = 'invite'

resume class-attribute instance-attribute

resume = 'resume'

terminate class-attribute instance-attribute

terminate = 'terminate'

terminate_with_end_date_more_than_100_days_in_the_past class-attribute instance-attribute

terminate_with_end_date_more_than_100_days_in_the_past = (
    "terminate_with_end_date_more_than_100_days_in_the_past"
)

transfer class-attribute instance-attribute

transfer = 'transfer'

transfer_more_than_100_days_in_the_past class-attribute instance-attribute

transfer_more_than_100_days_in_the_past = (
    "transfer_more_than_100_days_in_the_past"
)

RuleCaseAction

Bases: AlanBaseEnum

An action to take when a rule is met.

apply class-attribute instance-attribute

apply = 'apply'

Apply as usual.

block class-attribute instance-attribute

block = 'block'

Block: the whole declaration is blocked (a core blocked movement is created).

ignore class-attribute instance-attribute

ignore = 'ignore'

Ignore: skip this rule.

  • If all movements are ignored, creates a core blocked movement with status automatically_ignored
  • If some movements are ignored, the ignored one are not processed while the other are processed as usual

components.employment.public.exceptions

ActionNotAllowed

ActionNotAllowed(not_allowed_rule_cases)

Bases: DeclarativeInputError

Raised when the source rules don't allow an action

Source code in components/employment/public/exceptions.py
def __init__(self, not_allowed_rule_cases: set[RuleCase]) -> None:
    message = (
        "Termination date is too far in the past."
        if not_allowed_rule_cases
        == {RuleCase.terminate_with_end_date_more_than_100_days_in_the_past}
        else (
            f"Employment change types {not_allowed_rule_cases} are not allowed by sources rules"
        )
    )
    super().__init__(message)
    self.not_allowed_rule_cases = not_allowed_rule_cases

create_blocked_movement_context

create_blocked_movement_context(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def create_blocked_movement_context(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> "ActionNotAllowedContext":
    return ActionNotAllowedContext(
        error_message=str(self),
        employment_declaration=employment_declaration,
        not_allowed_rule_cases=self.not_allowed_rule_cases,
    )

error_code class-attribute instance-attribute

error_code = 'action_not_allowed'

not_allowed_rule_cases instance-attribute

not_allowed_rule_cases = not_allowed_rule_cases

AutomaticallyIgnoreConsumerError

AutomaticallyIgnoreConsumerError(message, consumer)

Bases: ConsumerError

A consumer error that is automatically ignored when created as a blocked movement.

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    consumer: str,
) -> None:
    super().__init__(message)
    self.consumer = consumer

to_blocked_movement_info

to_blocked_movement_info(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def to_blocked_movement_info(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> DeclarativeBlockedMovementInfo:
    regular_blocked_movement_info = super().to_blocked_movement_info(
        employment_declaration
    )
    return replace(
        regular_blocked_movement_info,
        target_status=BlockedMovementStatus.automatically_ignored,
    )

ConsumerError

ConsumerError(message, consumer)

Bases: DeclarativeInputError

Root class for errors raised within employment consumers that should lead to core blocked movements.

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    consumer: str,
) -> None:
    super().__init__(message)
    self.consumer = consumer

consumer instance-attribute

consumer = consumer

create_blocked_movement_context

create_blocked_movement_context(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def create_blocked_movement_context(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> DeclarativeBlockedMovementContext:
    return DeclarativeBlockedMovementContext(
        consumer=self.consumer,
        error_message=str(self),
        employment_declaration=employment_declaration,
    )

error_code class-attribute instance-attribute

error_code = 'consumer_error'

UnexpectedConsumerError

UnexpectedConsumerError(
    message, consumer, error_message, traceback
)

Bases: ConsumerError

Error class used for unexpected errors raised from employment consumers.

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    consumer: str,
    error_message: str,
    traceback: str,
) -> None:
    super().__init__(message, consumer)
    self.error_message = error_message
    self.traceback = traceback

create_blocked_movement_context

create_blocked_movement_context(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def create_blocked_movement_context(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> DeclarativeBlockedMovementContext:
    return DeclarativeBlockedMovementContext(
        consumer=self.consumer,
        error_message=str(self),
        error_type=self.__class__,
        traceback=self.traceback,
        employment_declaration=employment_declaration,
    )

error_code class-attribute instance-attribute

error_code = 'unexpected_consumer_error'

error_message instance-attribute

error_message = error_message

traceback instance-attribute

traceback = traceback

UnexpectedLegacyBackfillError

UnexpectedLegacyBackfillError(
    message, error_message, traceback
)

Bases: DeclarativeInputError

Error type for unexpected errors that occur during the legacy_backfill process.

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    error_message: str,
    traceback: str,
) -> None:
    super().__init__(message)
    self.error_message = error_message
    self.traceback = traceback

create_blocked_movement_context

create_blocked_movement_context(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def create_blocked_movement_context(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> DeclarativeBlockedMovementContext:
    return DeclarativeBlockedMovementContext(
        error_message=str(self),
        error_type=self.__class__,
        traceback=self.traceback,
        employment_declaration=employment_declaration,
    )

error_code class-attribute instance-attribute

error_code = 'unexpected_legacy_backfill_error'

error_message instance-attribute

error_message = error_message

to_blocked_movement_info

to_blocked_movement_info(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def to_blocked_movement_info(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> DeclarativeBlockedMovementInfo:
    regular_blocked_movement_info = super().to_blocked_movement_info(
        employment_declaration
    )
    return replace(
        regular_blocked_movement_info,
        source_type=GlobalSourceType.legacy_backfill,
    )

traceback instance-attribute

traceback = traceback

UnexpectedUpstreamError

UnexpectedUpstreamError(message, error_message, context)

Bases: UpstreamError

Error type used when an unexpected error occurs within an upstream (i.e. while preparing an Employment Declaration)

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    error_message: str,
    context: UnexpectedUpstreamErrorContext,
) -> None:
    super().__init__(message, context)
    self.error_message = error_message

error_code class-attribute instance-attribute

error_code = 'unexpected_upstream_error'

error_message instance-attribute

error_message = error_message

to_blocked_movement_info

to_blocked_movement_info()
Source code in components/employment/public/exceptions.py
@override
def to_blocked_movement_info(self) -> UpstreamBlockedMovementInfo:
    blocked_movement = UpstreamBlockedMovementInfo(
        error_code=self.error_code,
        error_message=self.error_message,
        user_id=None,
        company_id=None,
        context=self.context,
    )

    return blocked_movement

UnexpectedUpstreamErrorContext dataclass

UnexpectedUpstreamErrorContext(traceback)

Bases: UpstreamBlockedMovementContext

Context type for UnexpectedUpstreamError

traceback instance-attribute

traceback

UpstreamError

UpstreamError(message, context)

Bases: ABC, Exception

An error that occurs within the upstream phase, i.e. when preparing an Employment Declaration.

Source code in components/employment/public/exceptions.py
def __init__(self, message: str, context: UpstreamBlockedMovementContext) -> None:
    super().__init__(message)
    self.context = context

context instance-attribute

context = context

error_code class-attribute instance-attribute

error_code = 'upstream_error'

to_blocked_movement_info abstractmethod

to_blocked_movement_info()

Generate an UpstreamBlockedMovementInfo object that will be used for creating the blocked movement.

Source code in components/employment/public/exceptions.py
@abstractmethod
def to_blocked_movement_info(self) -> UpstreamBlockedMovementInfo:
    """
    Generate an UpstreamBlockedMovementInfo object that will be used for creating the blocked movement.
    """
    raise NotImplementedError

components.employment.public.source_type

SourceType module-attribute

SourceType = LocalSourceType

components.employment.public.utils

dump_results

ExtraDumpData dataclass

ExtraDumpData(help_text, data)

Extra information to output for upload_ingestion_results_as_csv

data instance-attribute
data
help_text instance-attribute
help_text

upload_ingestion_results_as_csv

upload_ingestion_results_as_csv(
    ingestion_results, extra_data=None
)

Utility function that uploads the ingestion provided ingestion results as CSVs to S3 and prints links to download the files in the logs.

The main use-case for this function is for outputting the results of dry-run processes.

Source code in components/employment/public/utils/dump_results.py
def upload_ingestion_results_as_csv(
    ingestion_results: list[IngestionResult],
    extra_data: list[ExtraDumpData] | None = None,
) -> None:
    """
    Utility function that uploads the ingestion provided ingestion results as CSVs to S3 and prints links to download
    the files in the logs.

    The main use-case for this function is for outputting the results of dry-run processes.
    """
    _upload_as_csv(
        "Employment Source Data created",
        [
            _FlatEmploymentSourceData(
                id=result.employment_source_data.id,
                source_type=result.employment_source_data.source_type,
                raw_data=result.employment_source_data.raw_data,
                metadata=result.employment_source_data.metadata,
                core_employment_versions_count=len(
                    result.employment_source_data.core_employment_versions
                ),
                extended_employment_updates_count=len(
                    result.employment_source_data.extended_employment_updates
                ),
                upstream_blocked_movements_count=len(
                    result.employment_source_data.upstream_blocked_movements
                ),
                core_blocked_movements_count=len(
                    result.employment_source_data.core_blocked_movements
                ),
            )
            for result in ingestion_results
        ],
    )
    _upload_as_csv(
        "Core Employment Versions created",
        list(
            flatten(
                result.employment_source_data.core_employment_versions
                for result in ingestion_results
            )
        ),
    )
    _upload_as_csv(
        "Extended Employment Updates created",
        list(
            flatten(
                result.employment_source_data.extended_employment_updates
                for result in ingestion_results
            )
        ),
    )
    _upload_as_csv(
        "Upstream Blocked Movements created",
        list(
            flatten(
                result.employment_source_data.upstream_blocked_movements
                for result in ingestion_results
            )
        ),
    )
    _upload_as_csv(
        "Core Blocked Movements created",
        list(
            flatten(
                result.employment_source_data.core_blocked_movements
                for result in ingestion_results
            )
        ),
    )

    if extra_data is not None:
        for extra_data_item in extra_data:
            _upload_as_csv(extra_data_item.help_text, extra_data_item.data)

user_matching

UserMatchingProcessorWithEmploymentException

Bases: UserMatchingProcessor[TSearchParams, TUser], Generic[TSearchParams, TUser]

A UserMatchingProcessor that raises exceptions that are compatible with the Employment Component's exception typing.

By inheriting from this class instead of the regular UserMatchingProcessor, your errors will show up as clean blocked movement instead of 'unexpected_upstream_error' blocked movements