Skip to content

Api reference

components.employment.public.api

cancel

cancel(
    employment_id,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)

Sets the employment to cancelled.

Source code in components/employment/public/api.py
@deprecated("Use 'employment_session()' pattern instead of calling this directly")
@employment_component_context
def cancel(
    employment_id: UUID,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    """
    Sets the employment to cancelled.
    """
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )

        if latest_version_for_employment.is_cancelled:
            current_employment_logger.info(
                f"Didn't cancel Employment {employment_id} it is already cancelled",
                layer="imperative_input_validation",
            )
            return

        change = mandatory(
            create_employment_changes(
                employment_change_payloads=[Cancellation()],
                initial_core_employment_version=latest_version_for_employment,
                employment_source_data_id=employment_source_data_id,
                source_type=source_type,
            )
        )

        notify_consumers_without_blocked_movements(
            change=change,
            source_type=source_type,
            event_bus_orchestrator=event_bus,
        )
        commit_and_execute_side_effects()

employment_component_context

employment_component_context(func)

A decorator to set a context variable indicating that an employment component function is running. This is useful to highlight updates of the legacy Employment table outside of the usage of the Employment Component which will help us finish the migration to the Employment Component.

Source code in components/employment/public/api.py
def employment_component_context(func: Callable[_P, _T]) -> Callable[_P, _T]:
    """
    A decorator to set a context variable indicating that an employment component function is running.
    This is useful to highlight updates of the legacy Employment table outside of the usage of the Employment Component
    which will help us finish the migration to the Employment Component.
    """

    def wrapper(
        *args: _P.args,
        **kwargs: _P.kwargs,
    ) -> _T:
        is_running_employment_component.set(True)
        try:
            return func(*args, **kwargs)
        finally:
            is_running_employment_component.set(False)

    return wrapper

ingest_employment_declaration

ingest_employment_declaration(
    employment_declaration,
    source_information,
    commit,
    event_bus_orchestrator=None,
    retried_blocked_movement=None,
    upstream_retried_blocked_movement=None,
    source_rules_override=None,
)

Ingests the given employment declarations, notifies the consumers, and executes side effects.

Source code in components/employment/public/api.py
@employment_component_context
def ingest_employment_declaration(
    employment_declaration: EmploymentDeclaration[_TValues],
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
    retried_blocked_movement: CoreBlockedMovementModel | None = None,
    upstream_retried_blocked_movement: (UpstreamBlockedMovementModel | None) = None,
    source_rules_override: SourceRules | None = None,
) -> IngestionResult:
    """
    Ingests the given employment declarations, notifies the consumers, and executes side effects.
    """
    current_employment_logger.info(
        "Ingesting employment declaration",
        layer="ingest_employment_declaration",
        clear_attributes={
            "source_type": str(employment_declaration.source_type),
            "company_id": employment_declaration.company_id,
            "user_id": employment_declaration.user_id,
        },
    )

    # TODO @matthieu.stombellini temporary to retry blocked movements created by COM-3782
    if employment_declaration.external_employee_id == "":
        employment_declaration = replace(
            employment_declaration, external_employee_id=None
        )

    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        with (
            CoreBlockedMovementCreator(
                employment_declaration=employment_declaration,
                source_information=source_information,
                unexpected_error_class=UnexpectedEmploymentError,
                commit=commit,
                retried_blocked_movement=retried_blocked_movement,
                upstream_retried_blocked_movement=upstream_retried_blocked_movement,
                source_rules_override=source_rules_override,
            ) as blocked_movement_manager,
            bound_contextvars(
                user_id=employment_declaration.user_id,
                company_id=employment_declaration.company_id,
            ),
        ):
            employment_source_data_id = get_or_create_employment_source_data_model(
                employment_declaration.source_type.base_source_type,
                source_information,
            )

            change = create_employment_changes_for_declaration(
                employment_declaration,
                employment_source_data_id=employment_source_data_id,
                source_rules_override=source_rules_override,
            )

            current_session.flush()

            if change is not None:
                notify_consumers_with_blocked_movements(
                    change=change,
                    blocked_movement_manager=blocked_movement_manager,
                    event_bus_orchestrator=event_bus,
                    source_type=employment_declaration.source_type,
                )
        if not blocked_movement_manager.created_blocked_movement:
            commit_and_execute_side_effects()
            # note: in case a blocked movement was created, the BM creator would have already committed

    return IngestionResult(
        success=not blocked_movement_manager.created_blocked_movement,
        employment_source_data=blocked_movement_manager.employment_source_data
        or EmploymentSourceData.from_employment_source_data_model(
            get_or_raise_missing_resource(
                EmploymentSourceDataModel, employment_source_data_id
            )
        ),
        blocked_movement=blocked_movement_manager.blocked_movement,
    )

ingest_employment_declaration_without_blocked_movement

ingest_employment_declaration_without_blocked_movement(
    employment_declaration,
    source_information,
    commit,
    event_bus_orchestrator=None,
    source_rules_override=None,
)

Ingests the given employment declarations, notifies the consumers, and executes side effects.

Source code in components/employment/public/api.py
@deprecated("Use 'employment_session()' pattern instead of calling this directly")
@employment_component_context
def ingest_employment_declaration_without_blocked_movement(
    employment_declaration: EmploymentDeclaration[_TValues],
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
    source_rules_override: SourceRules | None = None,
) -> IngestionResult:
    """
    Ingests the given employment declarations, notifies the consumers, and executes side effects.
    """
    current_employment_logger.info(
        "Ingesting employment declaration without blocked movements",
        layer="ingest_employment_declaration",
        clear_attributes={
            "source_type": str(employment_declaration.source_type),
            "company_id": employment_declaration.company_id,
            "user_id": employment_declaration.user_id,
        },
    )

    with (
        bound_contextvars(
            user_id=employment_declaration.user_id,
            company_id=employment_declaration.company_id,
        ),
        use_orchestrator_or_create_multi_event_bus(
            event_bus_orchestrator, commit=commit
        ) as (event_bus, commit_and_execute_side_effects),
    ):
        employment_source_data_id = get_or_create_employment_source_data_model(
            employment_declaration.source_type.base_source_type,
            source_information,
        )

        change = create_employment_changes_for_declaration(
            employment_declaration,
            employment_source_data_id=employment_source_data_id,
            source_rules_override=source_rules_override,
        )

        current_session.flush()

        if change is not None:
            notify_consumers_without_blocked_movements(
                change=change,
                event_bus_orchestrator=event_bus,
                source_type=employment_declaration.source_type,
            )
        commit_and_execute_side_effects()

    return IngestionResult(
        success=True,
        employment_source_data=EmploymentSourceData.from_employment_source_data_model(
            get_or_raise_missing_resource(
                EmploymentSourceDataModel, employment_source_data_id
            )
        ),
        blocked_movement=None,
    )

is_running_employment_component module-attribute

is_running_employment_component = ContextVar(
    "is_running_employment_component", default=False
)

resume

resume(
    employment_id,
    source_type,
    source_information,
    commit,
    extended_information=None,
    event_bus_orchestrator=None,
)

Resumes an ended employment.

Source code in components/employment/public/api.py
@deprecated("Use 'employment_session()' pattern instead of calling this directly")
@employment_component_context
def resume(
    employment_id: UUID,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    extended_information: list[ExtendedInformation[_TValues]] | None = None,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    """
    Resumes an ended employment.
    """
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )

        if not latest_version_for_employment.end_date:
            current_employment_logger.info(
                f"Didn't resume Employment {employment_id} as it is not ended",
                layer="imperative_input_validation",
            )
            return

        payloads: list[EmploymentChangePayload] = [Resumption()]
        if extended_information:
            payloads.append(
                ExtendedEmploymentValueChange(extended_information=extended_information)
            )

        change = mandatory(
            create_employment_changes(
                employment_change_payloads=payloads,
                initial_core_employment_version=latest_version_for_employment,
                employment_source_data_id=employment_source_data_id,
                source_type=source_type,
            )
        )

        notify_consumers_without_blocked_movements(
            change=change,
            source_type=source_type,
            event_bus_orchestrator=event_bus,
        )
        commit_and_execute_side_effects()

set_extended_employment_values

set_extended_employment_values(
    employment_id,
    values,
    validity_period,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)

Create a new ExtendedEmploymentUpdate with the given values for the employment with the given id.

The update will contain only the values passed as argument, the other values will remain unchanged. Doc: https://www.notion.so/alaninsurance/Definitions-employments-sources-etc-1301426e8be780cc9796ee31f49559e5?pvs=4#1301426e8be78052977ff4fbb932c8f7 ⧉

Source code in components/employment/public/api.py
@deprecated("Use 'employment_session()' pattern instead of calling this directly")
@employment_component_context
def set_extended_employment_values(
    employment_id: UUID,
    values: _TValues,
    validity_period: ValidityPeriod | None,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    """
    Create a new ExtendedEmploymentUpdate with the given values for the employment with the given id.

    The update will contain only the values passed as argument, the other values will remain unchanged.
    Doc: https://www.notion.so/alaninsurance/Definitions-employments-sources-etc-1301426e8be780cc9796ee31f49559e5?pvs=4#1301426e8be78052977ff4fbb932c8f7
    """
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )

        # We don't 'mandatory(...)' the result here as it's possible that, if all the values were already registered,
        # nothing happens.
        change = create_employment_changes(
            employment_change_payloads=[
                ExtendedEmploymentValueChange(
                    extended_information=[
                        ExtendedInformation(
                            values=values, validity_period=validity_period
                        )
                    ]
                )
            ],
            initial_core_employment_version=latest_version_for_employment,
            employment_source_data_id=employment_source_data_id,
            source_type=source_type,
        )

        if change is not None:
            notify_consumers_without_blocked_movements(
                change=change,
                source_type=source_type,
                event_bus_orchestrator=event_bus,
            )
        commit_and_execute_side_effects()

set_external_employee_id

set_external_employee_id(
    employment_id,
    new_external_employee_id,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)

Updates the external employee id of the employment with the given id. WARNING: This function does not check if the external employee id is valid according to local country rules. Please do that check upstream.

Source code in components/employment/public/api.py
@deprecated("Use 'employment_session()' pattern instead of calling this directly")
@employment_component_context
def set_external_employee_id(
    employment_id: UUID,
    new_external_employee_id: str | None,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    """
    Updates the external employee id of the employment with the given id.
    WARNING: This function does not check if the external employee id is valid according to local country rules. Please do that check upstream.
    """
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )

        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )
        check_external_employee_id_does_not_collide_with_existing_overlapping_employment(
            external_employee_id=new_external_employee_id,
            user_id=latest_version_for_employment.user_id,
            company_id=latest_version_for_employment.company_id,
            start_date=latest_version_for_employment.start_date,
            end_date=latest_version_for_employment.end_date,
        )

        if (
            latest_version_for_employment.external_employee_id
            == new_external_employee_id
        ):
            current_employment_logger.info(
                f"Can't update external employee id, Employment {employment_id} already has external employee id {new_external_employee_id}",
                layer="imperative_input_validation",
            )
        else:
            change = mandatory(
                create_employment_changes(
                    employment_change_payloads=[
                        ExternalEmployeeIdChange(
                            external_employee_id=new_external_employee_id
                        )
                    ],
                    initial_core_employment_version=latest_version_for_employment,
                    employment_source_data_id=employment_source_data_id,
                    source_type=source_type,
                )
            )

            notify_consumers_without_blocked_movements(
                change=change,
                event_bus_orchestrator=event_bus,
                source_type=source_type,
            )

        commit_and_execute_side_effects()

terminate_or_update_termination

terminate_or_update_termination(
    employment_id,
    end_date,
    extended_information,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)
Source code in components/employment/public/api.py
@deprecated("Use 'employment_session()' pattern instead of calling this directly")
@employment_component_context
def terminate_or_update_termination(  # noqa: D103
    employment_id: UUID,
    end_date: date,
    extended_information: list[ExtendedInformation[_TValues]],
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )
        current_end_date = latest_version_for_employment.end_date
        payloads: list[EmploymentChangePayload] = [
            ExtendedEmploymentValueChange(extended_information=extended_information)
        ]
        if not current_end_date:
            payloads.append(Termination(end_date=end_date))
        elif current_end_date != end_date:
            payloads.append(EndDateChange(end_date=end_date))

        change = create_employment_changes(
            employment_change_payloads=payloads,
            initial_core_employment_version=latest_version_for_employment,
            employment_source_data_id=employment_source_data_id,
            source_type=source_type,
        )

        if not change:
            current_employment_logger.info(
                f"Can't update termination, Employment {employment_id} already has the requested end date {end_date}",
                layer="imperative_input_validation",
            )
            return

        notify_consumers_without_blocked_movements(
            change=change,
            source_type=source_type,
            event_bus_orchestrator=event_bus,
        )
        commit_and_execute_side_effects()

transfer

transfer(
    current_employment_id,
    new_employment_declaration,
    source_information,
    commit,
    event_bus_orchestrator=None,
)
Source code in components/employment/public/api.py
@deprecated("Use 'employment_session()' pattern instead of calling this directly")
@employment_component_context
def transfer(  # noqa: D103
    current_employment_id: UUID,
    new_employment_declaration: EmploymentDeclaration[_TValues],
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> UUID:
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            new_employment_declaration.source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            current_employment_id
        )

        if (
            latest_version_for_employment.company_id
            == new_employment_declaration.company_id
        ):
            raise CannotTransferToSameCompanyError(
                f"Cannot transfer: employment {latest_version_for_employment.employment_id} is already in company {new_employment_declaration.company_id}"
            )

        check_external_employee_id_does_not_collide_with_existing_overlapping_employment(
            external_employee_id=new_employment_declaration.external_employee_id,
            user_id=new_employment_declaration.user_id,
            company_id=new_employment_declaration.company_id,
            start_date=new_employment_declaration.start_date,
            end_date=new_employment_declaration.end_date,
        )

        change = mandatory(
            create_employment_changes(
                employment_change_payloads=[Transfer(new_employment_declaration)],
                initial_core_employment_version=latest_version_for_employment,
                employment_source_data_id=employment_source_data_id,
                source_type=new_employment_declaration.source_type,
            )
        )

        notify_consumers_without_blocked_movements(
            change=change,
            source_type=new_employment_declaration.source_type,
            event_bus_orchestrator=event_bus,
        )

        commit_and_execute_side_effects()

        return change.core_employment_version.employment_id

uncancel

uncancel(
    employment_id,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)

Undo the cancellation of the employment.

Source code in components/employment/public/api.py
@deprecated("Use 'employment_session()' pattern instead of calling this directly")
@employment_component_context
def uncancel(
    employment_id: UUID,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    """
    Undo the cancellation of the employment.
    """
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )

        if not latest_version_for_employment.is_cancelled:
            current_employment_logger.info(
                f"Didn't uncancel Employment {employment_id} it is not cancelled",
                layer="imperative_input_validation",
            )
            return

        change = mandatory(
            create_employment_changes(
                employment_change_payloads=[Uncancellation()],
                initial_core_employment_version=latest_version_for_employment,
                employment_source_data_id=employment_source_data_id,
                source_type=source_type,
            )
        )

        notify_consumers_without_blocked_movements(
            change=change,
            source_type=source_type,
            event_bus_orchestrator=event_bus,
        )
        commit_and_execute_side_effects()

update_start_date

update_start_date(
    employment_id,
    new_start_date,
    source_type,
    source_information,
    commit,
    event_bus_orchestrator=None,
)
Source code in components/employment/public/api.py
@deprecated("Use 'employment_session()' pattern instead of calling this directly")
@employment_component_context
def update_start_date(  # noqa: D103
    employment_id: UUID,
    new_start_date: date,
    source_type: LocalizedSourceType,
    source_information: SourceInformationLike,
    commit: bool,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> None:
    with use_orchestrator_or_create_multi_event_bus(
        event_bus_orchestrator, commit=commit
    ) as (event_bus, commit_and_execute_side_effects):
        employment_source_data_id = get_or_create_employment_source_data_model(
            source_type.base_source_type, source_information
        )
        latest_version_for_employment = get_latest_version_for_employment_or_raise(
            employment_id
        )

        if latest_version_for_employment.start_date == new_start_date:
            current_employment_logger.info(
                f"Can't update start date, Employment {employment_id} already has start date {new_start_date}",
                layer="imperative_input_validation",
            )
        else:
            change = mandatory(
                create_employment_changes(
                    employment_change_payloads=[
                        StartDateChange(start_date=new_start_date)
                    ],
                    initial_core_employment_version=latest_version_for_employment,
                    employment_source_data_id=employment_source_data_id,
                    source_type=source_type,
                )
            )

            notify_consumers_without_blocked_movements(
                change=change,
                event_bus_orchestrator=event_bus,
                source_type=source_type,
            )

        commit_and_execute_side_effects()

components.employment.public.api_v2

EmploymentApiSession

Bases: EventBusOrchestrator

This is the primary API facade to use the Employment Component's write API.

You can obtain an object of this type by using the employment_session utility.

Note that most of the functions available here are idempotent.

Implementation note
  • This is an abstract base class that implements the bridge between the employment_session and the "raw" API.
  • Implementations only need to implement the plumbing functions (such as EventBus management).

applied_events abstractmethod property

applied_events

You can use this property in order to retrieve all of the events that were applied (via the EventBus mechanism) as part of the Employment Session.

This value is only set AFTER the employment_session block is exited, and will error out if you try to call it beforehand.

cancel

cancel(employment_id)

Cancels the provided employment.

Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment, EsEmployment, etc.)

Source code in components/employment/public/api_v2.py
def cancel(self, employment_id: UUID) -> None:
    """
    Cancels the provided employment.

    Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment,
    EsEmployment, etc.)
    """
    cancel(employment_id, **self._generate_call_args())

defer_on_commit

defer_on_commit(func)

Defers a function call until after the transaction handled by this employment session is committed.

Note that the function is NOT executed if the employment session is in dry-run mode.

This is syntactic sugar over the EventBus mechanism.

Source code in components/employment/public/api_v2.py
def defer_on_commit(self, func: Callable[[], None]) -> None:
    """
    Defers a function call until after the transaction handled by this employment session is committed.

    Note that the function is NOT executed if the employment session is in dry-run mode.

    This is syntactic sugar over the EventBus mechanism.
    """
    event_bus = self.wrap(BasicEventBus())
    event_bus.publish(lambda commit: func() if commit else None)

derive_for_legacy_backfill abstractmethod

derive_for_legacy_backfill()

Copy this EmploymentApiSession, but set up the source_type for legacy backfill.

The main change is that, when using the returned EmploymentApiSession, (most) local consumers will not be notified when a change happens.

For example, if your EmploymentApiSession has the source_type fr_admin_dashboard, this would result in a GlobalSourceType.legacy_backfill.with_country(CountryCode.fr) source_type being used.

Source code in components/employment/public/api_v2.py
@abstractmethod
def derive_for_legacy_backfill(self) -> "EmploymentApiSession":
    """
    Copy this EmploymentApiSession, but set up the source_type for legacy backfill.

    The main change is that, when using the returned EmploymentApiSession, (most) local consumers will not be
    notified when a change happens.

    For example, if your EmploymentApiSession has the source_type `fr_admin_dashboard`, this would result in a
    `GlobalSourceType.legacy_backfill.with_country(CountryCode.fr)` source_type being used.
    """
    ...

find_similar_source_data_and_mark_as_duplicate

find_similar_source_data_and_mark_as_duplicate(
    user_identifier_field_name,
    company_identifier_field_name,
    start_date_field_name,
    field_names_to_compare,
)

Entrypoint for the Source Deduplication feature (docs ⧉) Looks at the latest EmploymentSourceData with the same source type and identifier fields.

If the values are the same as raw_data_to_compare for all fields in field_names_to_compare, it updates the source's metadata with last_received_at=utcnow() and returns its ID. Otherwise, it returns None.

Note: if an identifier field is not present in raw_data_to_compare, it will only consider EmploymentSourceData that do not have this field in their raw_data.

param field_names_to_compare: list of fields from raw_data that will need to be equal to be considered as similar param raw_data_to_compare: new raw data dict for which we want to look for existing similar data

Source code in components/employment/public/api_v2.py
def find_similar_source_data_and_mark_as_duplicate(
    self,
    user_identifier_field_name: str,
    company_identifier_field_name: str,
    start_date_field_name: str,
    field_names_to_compare: list[str],
) -> UUID | None:
    """
    Entrypoint for the Source Deduplication feature ([docs](https://www.notion.so/alaninsurance/Source-Deduplication-37060a0d1c884c1b970aa6b2a4600ea4?source=copy_link))
    Looks at the latest EmploymentSourceData with the same source type and identifier fields.

    If the values are the same as `raw_data_to_compare` for all fields in `field_names_to_compare`,
    it updates the source's metadata with `last_received_at=utcnow() and returns its ID.`
    Otherwise, it returns None.

    Note: if an identifier field is not present in `raw_data_to_compare`,
    it will only consider EmploymentSourceData that do not have this field in their raw_data.

    param field_names_to_compare: list of fields from `raw_data` that will need to be equal to be considered as similar
    param raw_data_to_compare: new raw data dict for which we want to look for existing similar data
    """
    call_args = self._generate_call_args()
    return find_similar_source_data_and_mark_as_duplicate(
        user_identifier_field_name=user_identifier_field_name,
        company_identifier_field_name=company_identifier_field_name,
        start_date_field_name=start_date_field_name,
        field_names_to_compare=field_names_to_compare,
        source_type=call_args["source_type"],
        raw_data_to_compare=get_raw_data_from_source_information_like(
            call_args["source_information"]
        ),
        commit=False,
    )

ingest_or_block

ingest_or_block(
    upstream, data, *, source_rules_override=None
)

Ingest the provided data using the provided upstream. Errors will result in a blocked movement that will appear in local tooling.

See the documentation for Upstream for more details.

This function returns an UpstreamResult which you can use to inspect what happened - see the docstrings for the various "URXxx" classes for details.

You can use the source_rules_override parameter to override the SourceRules that would apply to the provided source_type.

Source code in components/employment/public/api_v2.py
def ingest_or_block(
    self,
    upstream: "Upstream[_TInput, _TValues]",
    data: "_TInput",
    *,
    source_rules_override: SourceRules | None = None,
) -> "UpstreamResult":
    """
    Ingest the provided data using the provided upstream. Errors will result in a blocked movement that will appear
    in local tooling.

    See the documentation for `Upstream` for more details.

    This function returns an `UpstreamResult` which you can use to inspect what happened - see the docstrings
    for the various "URXxx" classes for details.

    You can use the `source_rules_override` parameter to override the `SourceRules` that would apply to the provided
    source_type.
    """
    return upstream.ingest_data(
        data,
        self,
        source_rules_override=source_rules_override,
    )

ingest_or_raise

ingest_or_raise(
    employment_declaration, source_rules_override=None
)

Ingest the provided EmploymentDeclaration, without any blocked movements management. Errors will be raised as regular Python exceptions.

Notes: - this function is fairly advanced - in most cases, you should primarily use the Upstream API and resort to this function if required. - the source_type of the EmploymentDeclaration must match that of this EmploymentApiSession.

Parameters:

Name Type Description Default
employment_declaration EmploymentDeclaration[_TValues]

the EmploymentDeclaration to ingest.

required
source_rules_override SourceRules | None

you can use this parameter to override the SourceRules defined in sources_configuration.py

None
Source code in components/employment/public/api_v2.py
def ingest_or_raise(
    self,
    employment_declaration: EmploymentDeclaration[_TValues],
    source_rules_override: SourceRules | None = None,
) -> None:
    """
    Ingest the provided EmploymentDeclaration, without any blocked movements management. Errors will be raised as
    regular Python exceptions.

    Notes:
    - this function is fairly advanced - in most cases, you should primarily use the Upstream API and resort to
    this function if required.
    - the source_type of the EmploymentDeclaration must match that of this EmploymentApiSession.

    Args:
        employment_declaration: the EmploymentDeclaration to ingest.
        source_rules_override: you can use this parameter to override the `SourceRules` defined in `sources_configuration.py`
    """
    call_args = self._generate_call_args()

    if employment_declaration.source_type != call_args["source_type"]:
        raise ValueError(
            f"EmploymentDeclaration object is of the wrong type (expected {call_args['source_type']}, got {employment_declaration.source_type})"
        )

    ingest_employment_declaration_without_blocked_movement(
        employment_declaration,
        # Cannot use **call_args here because it contains a source_type, but the ingest function uses the source
        # type of the EmploymentDeclaration
        event_bus_orchestrator=call_args["event_bus_orchestrator"],
        source_information=call_args["source_information"],
        commit=call_args["commit"],
        source_rules_override=source_rules_override,
    )

resume

resume(employment_id, extended_information=None)

Resume (aka "unterminate") the provided employment.

You may optionally provided extended information that will be set on the employment.

Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment, EsEmployment, etc.)

Source code in components/employment/public/api_v2.py
def resume(
    self,
    employment_id: UUID,
    extended_information: list[ExtendedInformation[_TValues]] | None = None,
) -> None:
    """
    Resume (aka "unterminate") the provided employment.

    You may optionally provided extended information that will be set on the employment.

    Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment,
    EsEmployment, etc.)
    """
    resume(
        employment_id=employment_id,
        extended_information=extended_information,
        **self._generate_call_args(),
    )

set_extended_employment_values

set_extended_employment_values(
    employment_id, values, validity_period
)

Set extended values on the provided employment.

To learn more about extended values, see here ⧉

Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment, EsEmployment, etc.)

Source code in components/employment/public/api_v2.py
def set_extended_employment_values(
    self,
    employment_id: UUID,
    values: _TValues,
    validity_period: ValidityPeriod | None,
) -> None:
    """
    Set extended values on the provided employment.

    To learn more about extended values, see [here](https://www.notion.so/alaninsurance/Definitions-employments-sources-etc-1301426e8be780cc9796ee31f49559e5?source=copy_link#1301426e8be78052977ff4fbb932c8f7)

    Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment,
    EsEmployment, etc.)
    """
    set_extended_employment_values(
        employment_id, values, validity_period, **self._generate_call_args()
    )

set_external_employee_id

set_external_employee_id(
    employment_id, new_external_employee_id
)

Set the external employee ID (aka matricule, payroll ID, etc.) of the provided employment to the value.

External employee IDs must be unique within a company for the provided user (employments for different users with the same external employee ID may not overlap).

Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment, EsEmployment, etc.)

Source code in components/employment/public/api_v2.py
def set_external_employee_id(
    self, employment_id: UUID, new_external_employee_id: str | None
) -> None:
    """
    Set the external employee ID (aka matricule, payroll ID, etc.) of the provided employment to the value.

    External employee IDs must be unique within a company for the provided user (employments for different users
    with the same external employee ID may not overlap).

    Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment,
    EsEmployment, etc.)
    """
    set_external_employee_id(
        employment_id=employment_id,
        new_external_employee_id=new_external_employee_id,
        **self._generate_call_args(),
    )

terminate_or_update_termination

terminate_or_update_termination(
    employment_id, end_date, extended_information=None
)

Terminate or update the end date of the provided employment.

  • If no end_date is currently set, terminates the employment
  • If an end_date is already set, update the end date

If you want to remove the existing end_date, you'll need to use the 'resume' function instead.

You can optionally provide extended values that will be stored in the employment.

Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment, EsEmployment, etc.)

Source code in components/employment/public/api_v2.py
def terminate_or_update_termination(
    self,
    employment_id: UUID,
    end_date: date,
    extended_information: list[ExtendedInformation[_TValues]] | None = None,
) -> None:
    """
    Terminate or update the end date of the provided employment.

    - If no end_date is currently set, terminates the employment
    - If an end_date is already set, update the end date

    If you want to *remove* the existing end_date, you'll need to use the 'resume' function instead.

    You can optionally provide extended values that will be stored in the employment.

    Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment,
    EsEmployment, etc.)
    """
    terminate_or_update_termination(
        employment_id,
        end_date,
        extended_information=extended_information or [],
        **self._generate_call_args(),
    )

transfer

transfer(current_employment_id, new_employment_declaration)

Transfer an existing employment to a new company.

  • The existing employment will be terminated (or its end date will be updated) if the new start date is after the existing employment's start date.
  • The existing employment will be cancelled if the new start date is on or before the existing employment's start date.

The new employment will be initialized using the data from the new_employment_declaration. The source_type of the new_employment_declaration must match that of this EmploymentApiSession.

Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment, EsEmployment, etc.)

Source code in components/employment/public/api_v2.py
def transfer(
    self,
    current_employment_id: UUID,
    new_employment_declaration: EmploymentDeclaration[_TValues],
) -> None:
    """
    Transfer an existing employment to a new company.

    - The existing employment will be terminated (or its end date will be updated) if the new start date is after
      the existing employment's start date.
    - The existing employment will be cancelled if the new start date is on or before the existing employment's
      start date.

    The new employment will be initialized using the data from the new_employment_declaration. The source_type of
    the new_employment_declaration must match that of this EmploymentApiSession.

    Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment,
    EsEmployment, etc.)
    """
    call_args = self._generate_call_args()

    if new_employment_declaration.source_type != call_args["source_type"]:
        raise ValueError(
            f"EmploymentDeclaration object is of the wrong type (expected {call_args['source_type']}, got {new_employment_declaration.source_type})"
        )

    transfer(
        current_employment_id=current_employment_id,
        new_employment_declaration=new_employment_declaration,
        # Cannot use **call_args here because it contains a source_type, but the ingest function uses the source
        # type of the EmploymentDeclaration
        event_bus_orchestrator=call_args["event_bus_orchestrator"],
        source_information=call_args["source_information"],
        commit=call_args["commit"],
    )

uncancel

uncancel(employment_id)

"Uncancel" the provided employment.

Use with care: "uncancellation" can be generally quite error-prone in local code, e.g. if multiple cancelled policies match a cancelled employment, you may not know which one to uncancel. You can use the metadata field of the employment_session to provide data that can be used to resolve situations like this.

Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment, EsEmployment, etc.)

Source code in components/employment/public/api_v2.py
def uncancel(self, employment_id: UUID) -> None:
    """
    "Uncancel" the provided employment.

    Use with care: "uncancellation" can be generally quite error-prone in local code, e.g. if multiple cancelled
    policies match a cancelled employment, you may not know which one to uncancel. You can use the `metadata` field
    of the `employment_session` to provide data that can be used to resolve situations like this.

    Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment,
    EsEmployment, etc.)
    """
    uncancel(employment_id=employment_id, **self._generate_call_args())

update_start_date

update_start_date(employment_id, new_start_date)

Update the start date of the provided employment.

Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment, EsEmployment, etc.)

Source code in components/employment/public/api_v2.py
def update_start_date(self, employment_id: UUID, new_start_date: date) -> None:
    """
    Update the start date of the provided employment.

    Note that the employment_id must be that of the Employment Component's model, NOT the local model (Employment,
    EsEmployment, etc.)
    """
    update_start_date(employment_id, new_start_date, **self._generate_call_args())

with_nested_orchestrator abstractmethod

with_nested_orchestrator()

Copy this EmploymentApiSession, in such a way that events published within this block are:

  • Kept if the block completes successfully
  • "Rolled back" if the block fails with an exception

For example:

with employment_session(...) as employment_api:
    a = employment_api.wrap(BasicEventBus())
    a.publish(lambda commit: print("A")

    with employment_api.with_nested_orchestrator() as employment_api_nested:
        b = employment_api_nested.wrap(BasicEventBus())
        b.publish(lambda commit: print("B"))
    # Block completes normally, events are pushed to the employment_api normally

    try:
        with employment_api.with_nested_orchestrator() as employment_api_nested:
            c = employment_api_nested.wrap(BasicEventBus())
            c.publish(lambda commit: print("C"))
            raise ValueError("Oops")
            # Block completes with an exception, events are NOT pushed to employment_api
    except:
        pass

# Printed => A B, but not C

Source code in components/employment/public/api_v2.py
@abstractmethod
def with_nested_orchestrator(
    self,
) -> AbstractContextManager["EmploymentApiSession"]:
    """
    Copy this EmploymentApiSession, in such a way that events published within this block are:

    - Kept if the block completes successfully
    - "Rolled back" if the block fails with an exception


    For example:
    ```python
    with employment_session(...) as employment_api:
        a = employment_api.wrap(BasicEventBus())
        a.publish(lambda commit: print("A")

        with employment_api.with_nested_orchestrator() as employment_api_nested:
            b = employment_api_nested.wrap(BasicEventBus())
            b.publish(lambda commit: print("B"))
        # Block completes normally, events are pushed to the employment_api normally

        try:
            with employment_api.with_nested_orchestrator() as employment_api_nested:
                c = employment_api_nested.wrap(BasicEventBus())
                c.publish(lambda commit: print("C"))
                raise ValueError("Oops")
                # Block completes with an exception, events are NOT pushed to employment_api
        except:
            pass

    # Printed => A B, but not C
    ```
    """

SessionDerivedCallArgs

Bases: TypedDict

Internal implementation detail of EmploymentApiSession

commit instance-attribute

commit

event_bus_orchestrator instance-attribute

event_bus_orchestrator

source_information instance-attribute

source_information

source_type instance-attribute

source_type

employment_session

employment_session(
    source_type, *, dry_run, raw_data, metadata=None
)

This is the primary entrypoint to the Employment Component's Write API.

  • This context manager represents an "Employment Component transaction", and will handle things like commit, event buses, etc.
  • It yields an EmploymentApiSession object you can use to:
  • Use the write API
  • Add additional side-effects to be run after the commit (via the EventBus API)

Here's an example:

with employment_session(
    source_type=SourceType.fr_admin_dashboard,
    dry_run=False,
    raw_data=params,  # or whichever "raw data" arguments are being used
    metadata={"endpoint": "admin_dashboard.cancel_employment"}
) as employment_api:
    employment_api.cancel(employment_id=...)
Transaction management and side-effects

employment_session takes care of the entire SQLAlchemy transaction and side-effects (i.e. "post-commit-stuff") execution.

  • If dry_run is set to False, the transaction is committed at the end of the with employment_session block and side-effects are executed
  • If dry_run is set to True, the transaction will be flushed and side-effects will be run in dry run mode (= most of them will do nothing)
    • Note that you are expected to rollback the transaction at some point - DO NOT commit it, since side-effects, such as emails and triggering RQ jobs, will not be executed.

Side-effects can be registered by leveraging the EventBus API, since the employment_api object implements EventBusOrchestrator. For example, to trigger a callable:

with employment_session(dry_run=False, ...) as employment_api:
    employment_api.defer_on_commit(lambda: current_logger.info("I'm a side effect!"))
    current_logger.info("Foobar")
    # => Foobar
# (db commit is issued since dry_run=False)
# => I'm a side effect

Note that side-effects deferred in this manner will only be executed if dry_run=False. For more advanced use cases, and if you need to support dry-runnable side-effects, you can instead use the EventBusOrchestrator API:

with employment_session(dry_run=True, ...) as employment_api:
    simple_event_bus = employment_api.wrap(BasicEventBus())
    simple_event_bus.publish(lambda commit: current_logger.info("It's for real" if commit else "It's dry-run"))
    current_logger.info("Foobar")
    # => Foobar
# (db transaction is flushed but not committed since dry_run=True)
# => It's dry-run

The EventBus API also has a compatibility layer for shared.side_effects side-effects, see SharedSideEffectsEventBus for details.

Caveats
  • ⚠️ You can only either fully dry run the process OR fully commit and execute side effects. It is not possible to use this in a "do not commit yet, I'll commit later" scenario.
  • "raw_data"/"metadata"/"source_type" are persisted into an EmploymentSourceData row if and only if you call an Employment Component API.
Arguments
  • source_type: the source from which your action comes from.
  • dry_run: if False, transaction will be committed and side-effects will be executed at the end of the block. if True, the block is run in "dry run" mode (you MUST NOT commit what was created as part of the dry run mode). In dry-run mode:
    • Model changes are not persisted to the DB (but are still kept in the transaction if you need to inspect them).
  • raw_data: this is the "raw data" behind your action. This can correspond to an Excel row, a params dictionary - it depends on what your use case is. This data will end up logged in an EmploymentSourceData row for later use. This value is immutable once persisted.
  • metadata: this is the metadata attached to your action. This will generally contain identifiers, actor_ids, etc. This value is optional and will result in an empty dictionary if not specified. For example, if processing a file that is recorded in the DB, you would:
    • Use one employment_session per row
    • Write the file's database ID in metadata
    • And the row's content in raw_data
Source code in components/employment/public/api_v2.py
@contextmanager
def employment_session(
    source_type: SourceType
    | LocalizedSourceType,  # useless typing: SourceType implements LocalizedSourceType, but it's to make the SourceType enum discoverable
    *,
    dry_run: bool,
    raw_data: dict[str, Any],
    metadata: dict[str, Any] | None = None,
) -> Iterator["EmploymentApiSession"]:
    """
    This is the primary entrypoint to the Employment Component's Write API.

    - This context manager represents an "Employment Component transaction", and will handle things like commit, event buses, etc.
    - It yields an EmploymentApiSession object you can use to:
      - Use the write API
      - Add additional side-effects to be run after the commit (via the `EventBus` API)

    Here's an example:

    ```python
    with employment_session(
        source_type=SourceType.fr_admin_dashboard,
        dry_run=False,
        raw_data=params,  # or whichever "raw data" arguments are being used
        metadata={"endpoint": "admin_dashboard.cancel_employment"}
    ) as employment_api:
        employment_api.cancel(employment_id=...)
    ```

    ### Transaction management and side-effects

    `employment_session` takes care of the entire SQLAlchemy transaction and side-effects (i.e. "post-commit-stuff")
    execution.

    - If dry_run is set to `False`, the transaction is committed at the end of the `with employment_session` block and side-effects are executed
    - If dry_run is set to `True`, the transaction will be flushed and side-effects will be run *in dry run mode* (= most of them will do nothing)
        - Note that you are expected to **rollback** the transaction at some point - DO NOT commit it, since side-effects, such as emails and triggering RQ jobs, will not be executed.

    Side-effects can be registered by leveraging the EventBus API, since the `employment_api` object implements `EventBusOrchestrator`. For example, to trigger a callable:

    ```python
    with employment_session(dry_run=False, ...) as employment_api:
        employment_api.defer_on_commit(lambda: current_logger.info("I'm a side effect!"))
        current_logger.info("Foobar")
        # => Foobar
    # (db commit is issued since dry_run=False)
    # => I'm a side effect
    ```

    Note that side-effects deferred in this manner will only be executed if `dry_run=False`. For more advanced use cases, and if you need to support dry-runnable side-effects, you can instead use the `EventBusOrchestrator` API:

    ```python
    with employment_session(dry_run=True, ...) as employment_api:
        simple_event_bus = employment_api.wrap(BasicEventBus())
        simple_event_bus.publish(lambda commit: current_logger.info("It's for real" if commit else "It's dry-run"))
        current_logger.info("Foobar")
        # => Foobar
    # (db transaction is flushed but not committed since dry_run=True)
    # => It's dry-run
    ```

    The EventBus API also has a compatibility layer for `shared.side_effects` side-effects, see `SharedSideEffectsEventBus` for details.

    ### Caveats

    -  ⚠️ You can only either fully dry run the process OR fully commit and execute side effects. It is not possible to use this in a "do not commit yet, I'll commit later" scenario.
    - "raw_data"/"metadata"/"source_type" are persisted into an EmploymentSourceData row if and only if you call an Employment Component API.

    ### Arguments

    - source_type: the source from which your action comes from.
    - dry_run: if False, transaction will be committed and side-effects will be executed at the end of the block. if True, the block is run in "dry run" mode (you MUST NOT commit what was created as part of the dry run mode). In dry-run mode:
        - Model changes are not persisted to the DB (but are still kept in the transaction if you need to inspect them).
    - raw_data: this is the "raw data" behind your action. This can correspond to an Excel row, a `params` dictionary - it depends on what your use case is. This data will end up logged in an `EmploymentSourceData` row for later use. This value is immutable once persisted.
    - metadata: this is the metadata attached to your action. This will generally contain identifiers, `actor_id`s, etc. This value is optional and will result in an empty dictionary if not specified.
        For example, if processing a file that is recorded in the DB, you would:
        - Use one `employment_session` per row
        - Write the file's database ID in `metadata`
        - And the row's content in `raw_data`
    """
    from components.employment.internal.api_v2_impl import EmploymentApiSessionImpl

    source_information = SourceInformation(
        raw_data=raw_data.copy(),
        metadata=(metadata.copy() if metadata is not None else {}),
        source_type=source_type.base_source_type,
    )

    main_event_bus = MultiEventBus()

    api_session = EmploymentApiSessionImpl(
        source_information, source_type, main_event_bus
    )
    yield api_session

    if dry_run:
        current_session.flush()
    else:
        current_session.commit()

    events = main_event_bus.apply_and_get_named_applied_events(commit=not dry_run)
    api_session._record_applied_events(events)  # noqa: ALN027 # intentional to hide this bit from the public API

components.employment.public.boomerang_companies

Boomerang company logic - framing: https://github.com/alan-eu/Topics/discussions/32109?sort=old#discussioncomment-15825299 ⧉.

A "boomerang company" is one that churned (ended all contracts) and later re-signed after a gap: - step A: when contracts are terminated -> do nothing - step B: when a new contract is signed -> terminate "stale" employments - step C: when a boomerang companies re-invite employments for which we terminated employments -> we resume them, and consumers must properly implement this type of resumption

get_boomerang_end_date_for_company

get_boomerang_end_date_for_company(
    company_id, app_name, new_contract_id
)

Return the end date of the most recent contract if the company is a boomerang, else None.

A company is a boomerang if it has no active contracts (other than the new one) but has previous ended contracts. Returns None if the company has active contracts (no gap), has never had a contract, or the new contract is not yet visible in the subscriptions API.

Occupational Health (OH) subscriptions must be checked separately from health insurance subscriptions. OH contracts are fully unknown from that part of contracting so they are invisible to get_subscriptions_for_companies_by_app.

Parameters:

Name Type Description Default
company_id str

ID of the company to check.

required
app_name AppName

country app (FR, BE, ES, CA).

required
new_contract_id str

ID of the newly signed contract; used to find its start date (the reference date) and exclude it from the boomerang check.

required
Source code in components/employment/public/boomerang_companies.py
def get_boomerang_end_date_for_company(
    company_id: str,
    app_name: AppName,
    new_contract_id: str,
) -> date | None:
    """Return the end date of the most recent contract if the company is a boomerang, else None.

    A company is a boomerang if it has no active contracts (other than the new one) but
    has previous ended contracts.
    Returns None if the company has active contracts (no
    gap), has never had a contract, or the new contract is not yet visible in the
    subscriptions API.

    Occupational Health (OH) subscriptions must be checked separately from health
    insurance subscriptions.  OH contracts are fully unknown from that part of contracting
    so they are invisible to `get_subscriptions_for_companies_by_app`.

    Args:
        company_id: ID of the company to check.
        app_name: country app (FR, BE, ES, CA).
        new_contract_id: ID of the newly signed contract; used to find its start date
            (the reference date) and exclude it from the boomerang check.
    """
    all_subscriptions = get_country_gateway(
        CountryCode.from_app_name(app_name)
    ).get_subscriptions_for_company(company_id)

    new_subscription = next(
        (sub for sub in all_subscriptions if sub.id == new_contract_id), None
    )
    if new_subscription is None:
        return None

    reference_date = new_subscription.start_date
    # Exclude co-signed contracts (same start_date, e.g. cadres + non-cadres signed
    # simultaneously). This also excludes the new contract itself. Pre-existing contracts
    # starting on a different date — including future ones — are kept and can still block
    # boomerang detection.
    other_subscriptions = [
        sub for sub in all_subscriptions if sub.start_date != reference_date
    ]

    active_subscriptions = [
        sub
        for sub in other_subscriptions
        if not (sub.end_date and sub.end_date < reference_date)
    ]
    if active_subscriptions:
        return None

    ended_subscriptions = [
        sub
        for sub in other_subscriptions
        if sub.end_date and sub.end_date < reference_date
    ]
    if not ended_subscriptions:
        return None

    return max(mandatory(sub.end_date) for sub in ended_subscriptions)

terminate_all_open_employments_for_company

terminate_all_open_employments_for_company(
    company_id,
    end_date,
    country_code,
    dry_run,
    context=None,
)

Terminate all employments active on end_date for a company.

Handles two cases: - open employments (end_date IS NULL): terminated at end_date - employments terminated too late (end_date > end_date): updated to end_date

The boomerang_company source type allows Step C to later identify these terminations and bypass the usual resume=ignore rules on re-invitation.

Source code in components/employment/public/boomerang_companies.py
def terminate_all_open_employments_for_company(
    company_id: str,
    end_date: date,
    country_code: CountryCode,
    dry_run: bool,
    context: dict[str, str] | None = None,
) -> None:
    """Terminate all employments active on end_date for a company.

    Handles two cases:
    - open employments (end_date IS NULL): terminated at ``end_date``
    - employments terminated too late (end_date > ``end_date``): updated to ``end_date``

    The ``boomerang_company`` source type allows Step C to later identify these
    terminations and bypass the usual resume=ignore rules on re-invitation.
    """
    from components.employment.public.api_v2 import employment_session

    source_type = GlobalSourceType.boomerang_company.with_country(country_code)
    versions_to_terminate = (
        CoreEmploymentVersionModelBroker.query_latest_versions()
        .filter(
            CoreEmploymentVersionModel.company_id == company_id,
            CoreEmploymentVersionModel.is_cancelled.is_(False),
            or_(
                CoreEmploymentVersionModel.end_date.is_(None),
                CoreEmploymentVersionModel.end_date > end_date,
            ),
        )
        .all()
    )
    with employment_session(
        source_type=source_type,
        dry_run=dry_run,
        raw_data={"end_date": str(end_date)},
        metadata=context,
    ) as employment_api:
        for version in versions_to_terminate:
            employment_api.terminate_or_update_termination(
                employment_id=version.employment_id,
                end_date=end_date,
            )

components.employment.public.business_logic

actions

ai_analysis

launch_blocked_movement_ai_agent
launch_blocked_movement_ai_agent(
    blocked_movement_id,
    agent_branch=None,
    wait_for_completion=False,
    tool_mocks=None,
    bypass_tool_review=False,
)

Launch AI agent conversation for a blocked movement.

Source code in components/employment/public/business_logic/actions/ai_analysis.py
def launch_blocked_movement_ai_agent(
    blocked_movement_id: UUID,
    agent_branch: str | None = None,
    wait_for_completion: bool = False,
    tool_mocks: dict[str, dict[str, str | bool]] | None = None,
    bypass_tool_review: bool = False,
) -> ConversationResponse:
    """Launch AI agent conversation for a blocked movement."""
    from components.employment.internal.business_logic.actions.ai_analysis import (
        launch_blocked_movement_ai_agent as _launch_blocked_movement_ai_agent,
    )

    return _launch_blocked_movement_ai_agent(
        blocked_movement_id=blocked_movement_id,
        agent_branch=agent_branch,
        wait_for_completion=wait_for_completion,
        tool_mocks=tool_mocks,
        bypass_tool_review=bypass_tool_review,
    )

blocked_movement

amend_core_blocked_movement_note
amend_core_blocked_movement_note(
    core_blocked_movement_id, new_note, commit=True
)

Appends the provided note to the end of the core blocked movement's current note.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def amend_core_blocked_movement_note(
    core_blocked_movement_id: UUID, new_note: str, commit: bool = True
) -> None:
    """
    Appends the provided note to the end of the core blocked movement's current note.
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )
    core_blocked_movement.note = (
        f"{core_blocked_movement.note}\n{new_note}"
        if core_blocked_movement.note
        else new_note
    )
    if commit:
        current_session.commit()
amend_upstream_blocked_movement_note
amend_upstream_blocked_movement_note(
    upstream_blocked_movement_id, new_note, commit=True
)

Appends the provided note to the end of the upstream blocked movement's current note.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def amend_upstream_blocked_movement_note(
    upstream_blocked_movement_id: UUID, new_note: str, commit: bool = True
) -> None:
    """
    Appends the provided note to the end of the upstream blocked movement's current note.
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    upstream_blocked_movement.note = (
        f"{upstream_blocked_movement.note}\n{new_note}"
        if upstream_blocked_movement.note
        else new_note
    )
    if commit:
        current_session.commit()
automatically_cancel_pending_core_blocked_movement
automatically_cancel_pending_core_blocked_movement(
    core_blocked_movement_id, commit
)

Automatically cancel a core blocked movement.

This should only be used to automatically cancel a blocked movement immediately after its creation.

Otherwise: - If the cancellation is a result of a manual, human action, use manually_cancel_core_blocked_movement instead. - If the cancellation is part of an async/automated process, use mark_...as_self_healing as soon as possible, then cancel_self_healing..._blocked_movement

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def automatically_cancel_pending_core_blocked_movement(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Automatically cancel a core blocked movement.

    This should only be used to automatically cancel a blocked movement immediately after its creation.

    Otherwise:
    - If the cancellation is a result of a manual, human action, use manually_cancel_core_blocked_movement instead.
    - If the cancellation is part of an async/automated process, use mark_..._as_self_healing as soon as possible, then
      cancel_self_healing_..._blocked_movement
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )
    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={BlockedMovementStatus.pending},
        to_status=BlockedMovementStatus.automatically_cancelled,
        commit=commit,
    )
automatically_cancel_pending_upstream_blocked_movement
automatically_cancel_pending_upstream_blocked_movement(
    upstream_blocked_movement_id, commit
)

Automatically cancel an upstream blocked movement.

This should only be used to automatically cancel a blocked movement immediately after its creation.

Otherwise: - If the cancellation is a result of a manual, human action, use manually_cancel_upstream_blocked_movement instead. - If the cancellation is part of an async/automated process, use mark_...as_self_healing as soon as possible, then cancel_self_healing..._blocked_movement

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def automatically_cancel_pending_upstream_blocked_movement(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Automatically cancel an upstream blocked movement.

    This should only be used to automatically cancel a blocked movement immediately after its creation.

    Otherwise:
    - If the cancellation is a result of a manual, human action, use manually_cancel_upstream_blocked_movement instead.
    - If the cancellation is part of an async/automated process, use mark_..._as_self_healing as soon as possible, then
      cancel_self_healing_..._blocked_movement
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={BlockedMovementStatus.pending},
        to_status=BlockedMovementStatus.automatically_cancelled,
        commit=commit,
    )
cancel_pending_blocked_movement
cancel_pending_blocked_movement(
    blocked_movement_id, commit
)

Cancels a pending blocked movement by its ID. It'll manage indifferently if it's an upstream or core blocked movement.

If blocked movement is not found or is not in the 'pending' status, an error will be raised

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def cancel_pending_blocked_movement(blocked_movement_id: UUID, commit: bool) -> None:
    """
    Cancels a pending blocked movement by its ID.
    It'll manage indifferently if it's an upstream or core blocked movement.

    If blocked movement is not found or is not in the 'pending' status,
    an error will be raised
    """
    blocked_movement = get_upstream_or_core_blocked_movement_or_none(
        blocked_movement_id
    )
    if not blocked_movement:
        raise BaseErrorCode.missing_resource(
            f"No blocked movement found with ID {blocked_movement_id}"
        )

    if isinstance(blocked_movement, CoreBlockedMovement):
        manually_cancel_core_blocked_movement(blocked_movement_id, commit=commit)
    elif isinstance(blocked_movement, UpstreamBlockedMovement):
        manually_cancel_upstream_blocked_movement(blocked_movement_id, commit=commit)
    else:
        raise ValueError(f"Unsupported blocked movement type: {type(blocked_movement)}")
cancel_self_healing_core_blocked_movement
cancel_self_healing_core_blocked_movement(
    core_blocked_movement_id, commit
)

Cancels a core blocked movement which you previously took ownership of via mark_pending_core_blocked_movement_as_self_healing.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def cancel_self_healing_core_blocked_movement(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Cancels a core blocked movement which you previously took ownership of via
    `mark_pending_core_blocked_movement_as_self_healing`.
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )
    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={
            BlockedMovementStatus.pending_self_healing,
            BlockedMovementStatus.awaiting_user_response_self_healing,
        },
        to_status=BlockedMovementStatus.automatically_cancelled,
        commit=commit,
    )
cancel_self_healing_upstream_blocked_movement
cancel_self_healing_upstream_blocked_movement(
    upstream_blocked_movement_id, commit
)

Cancels an upstream blocked movement which you previously took ownership of via mark_pending_upstream_blocked_movement_as_self_healing.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def cancel_self_healing_upstream_blocked_movement(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Cancels an upstream blocked movement which you previously took ownership of via
    `mark_pending_upstream_blocked_movement_as_self_healing`.
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={BlockedMovementStatus.pending_self_healing},
        to_status=BlockedMovementStatus.automatically_cancelled,
        commit=commit,
    )
manually_cancel_core_blocked_movement
manually_cancel_core_blocked_movement(
    core_blocked_movement_id, commit
)

Cancel a core blocked movement as a result of a manual, human action. The blocked movement may be pending, awaiting_user_response, or awaiting_user_response_self_healing.

To automatically cancel a blocked movement as a result of automated action, mark the blocked movement as self healing as early as possible (via mark_pending_core_blocked_movement_as_self_healing), then cancel it using cancel_self_healing_core_blocked_movement when needed.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
@enqueueable
def manually_cancel_core_blocked_movement(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Cancel a core blocked movement as a result of a manual, human action. The blocked movement may be pending, awaiting_user_response, or awaiting_user_response_self_healing.

    To automatically cancel a blocked movement as a result of automated action, mark the blocked movement as self
    healing as early as possible (via `mark_pending_core_blocked_movement_as_self_healing`), then cancel it
    using `cancel_self_healing_core_blocked_movement` when needed.
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )
    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={
            BlockedMovementStatus.pending,
            BlockedMovementStatus.awaiting_user_response,
            BlockedMovementStatus.awaiting_user_response_self_healing,
        },
        to_status=BlockedMovementStatus.cancelled,
        commit=commit,
    )
manually_cancel_upstream_blocked_movement
manually_cancel_upstream_blocked_movement(
    upstream_blocked_movement_id, commit
)

Cancel an upstream blocked movement as a result of a manual, human action. The blocked movement may be pending, awaiting_user_response, or awaiting_user_response_self_healing.

To automatically cancel a blocked movement as a result of automated action, mark the blocked movement as self healing as early as possible (via mark_pending_upstream_blocked_movement_as_self_healing), then cancel it using cancel_self_healing_upstream_blocked_movement when needed.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def manually_cancel_upstream_blocked_movement(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Cancel an upstream blocked movement as a result of a manual, human action. The blocked movement may be pending, awaiting_user_response, or awaiting_user_response_self_healing.

    To automatically cancel a blocked movement as a result of automated action, mark the blocked movement as self
    healing as early as possible (via `mark_pending_upstream_blocked_movement_as_self_healing`), then cancel it
    using `cancel_self_healing_upstream_blocked_movement` when needed.
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={
            BlockedMovementStatus.pending,
            BlockedMovementStatus.awaiting_user_response,
            BlockedMovementStatus.awaiting_user_response_self_healing,
        },
        to_status=BlockedMovementStatus.cancelled,
        commit=commit,
    )
mark_pending_core_blocked_movement_as_self_healing
mark_pending_core_blocked_movement_as_self_healing(
    core_blocked_movement_id, commit
)

Mark a core blocked movement as self healing. By using this, you signal to the Employment Component that a custom local process will take care of the blocked movement from now on.

To "end" a blocked movement you take ownership of via this function, use:

  • cancel_self_healing_core_blocked_movement to cancel it (automatically_cancelled)
  • resolve_self_healing_core_blocked_movement to resolve it (automatically_resolved)
Source code in components/employment/public/business_logic/actions/blocked_movement.py
def mark_pending_core_blocked_movement_as_self_healing(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Mark a core blocked movement as self healing. By using this, you signal to the Employment Component that a
    custom local process will take care of the blocked movement from now on.

    To "end" a blocked movement you take ownership of via this function, use:

    - `cancel_self_healing_core_blocked_movement` to cancel it (`automatically_cancelled`)
    - `resolve_self_healing_core_blocked_movement` to resolve it (`automatically_resolved`)
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )

    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={BlockedMovementStatus.pending},
        to_status=BlockedMovementStatus.pending_self_healing,
        commit=commit,
    )
mark_pending_or_awaiting_user_response_core_blocked_movement_as_self_healing
mark_pending_or_awaiting_user_response_core_blocked_movement_as_self_healing(
    core_blocked_movement_id, commit
)

Mark a core blocked movement as self healing. This is a variant of mark_pending_core_blocked_movement_as_self_healing you can use in case the blocked movement may have been marked as "awaiting_user_response" by Ops. Typical use cases include:

  • Automatically moving the to a self-healing resolution process
  • An Ops taking a blocked movement (marking it as awaiting_user_response) then manually putting it back to a self-healing resolution process.

By using this, you signal to the Employment Component that a custom local process will take care of the blocked movement from now on.

To "end" a blocked movement you take ownership of via this function, use:

  • cancel_self_healing_core_blocked_movement to cancel it (automatically_cancelled)
  • resolve_self_healing_core_blocked_movement to resolve it (automatically_resolved)
Source code in components/employment/public/business_logic/actions/blocked_movement.py
def mark_pending_or_awaiting_user_response_core_blocked_movement_as_self_healing(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Mark a core blocked movement as self healing. This is a variant of
    mark_pending_core_blocked_movement_as_self_healing you can use in case the blocked movement may have been marked as
    "awaiting_user_response" by Ops. Typical use cases include:

    - Automatically moving the to a self-healing resolution process
    - An Ops taking a blocked movement (marking it as awaiting_user_response) then manually putting it back to a self-healing
      resolution process.

    By using this, you signal to the Employment Component that a custom local process will take care of the blocked
    movement from now on.

    To "end" a blocked movement you take ownership of via this function, use:

    - `cancel_self_healing_core_blocked_movement` to cancel it (`automatically_cancelled`)
    - `resolve_self_healing_core_blocked_movement` to resolve it (`automatically_resolved`)
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )

    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={
            BlockedMovementStatus.awaiting_user_response,
            BlockedMovementStatus.pending,
        },
        to_status=BlockedMovementStatus.pending_self_healing,
        commit=commit,
    )
mark_pending_or_awaiting_user_response_upstream_blocked_movement_as_self_healing
mark_pending_or_awaiting_user_response_upstream_blocked_movement_as_self_healing(
    upstream_blocked_movement_id, commit
)

Mark a upstream blocked movement as self healing. This is a variant of mark_pending_upstream_blocked_movement_as_self_healing you can use in case the blocked movement may have been marked as "awaiting_user_response" by Ops. Typical use cases include:

  • Automatically moving the to a self-healing resolution process
  • An Ops contacting admin or member about a blocked movement (marking it as awaiting_user_response) then manually putting it back to a self-healing resolution process.

By using this, you signal to the Employment Component that a custom local process will take care of the blocked movement from now on.

To "end" a blocked movement you take ownership of via this function, use:

  • cancel_self_healing_upstream_blocked_movement to cancel it (automatically_cancelled)
  • resolve_self_healing_upstream_blocked_movement to resolve it (automatically_resolved)
Source code in components/employment/public/business_logic/actions/blocked_movement.py
def mark_pending_or_awaiting_user_response_upstream_blocked_movement_as_self_healing(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Mark a upstream blocked movement as self healing. This is a variant of
    mark_pending_upstream_blocked_movement_as_self_healing you can use in case the blocked movement may have been marked as
    "awaiting_user_response" by Ops. Typical use cases include:

    - Automatically moving the to a self-healing resolution process
    - An Ops contacting admin or member about a blocked movement (marking it as awaiting_user_response) then manually putting it back to a self-healing
      resolution process.

    By using this, you signal to the Employment Component that a custom local process will take care of the blocked
    movement from now on.

    To "end" a blocked movement you take ownership of via this function, use:

    - `cancel_self_healing_upstream_blocked_movement` to cancel it (`automatically_cancelled`)
    - `resolve_self_healing_upstream_blocked_movement` to resolve it (`automatically_resolved`)
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )

    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={
            BlockedMovementStatus.awaiting_user_response,
            BlockedMovementStatus.pending,
        },
        to_status=BlockedMovementStatus.pending_self_healing,
        commit=commit,
    )
mark_pending_self_healing_core_blocked_movement_as_awaiting_user_response_self_healing
mark_pending_self_healing_core_blocked_movement_as_awaiting_user_response_self_healing(
    core_blocked_movement_id, commit
)

Mark a core blocked movement you previously took ownership of via mark_pending_core_blocked_movement_as_self_healing as awaiting user response, but still under the "self healing" process.

This is useful if you need to contact the user/admin about the blocked movement, but you still want to keep ownership of the blocked movement.

awaiting_user_response_self_healing is considered as a terminal status in SLA computation.

By using this, you signal to the Employment Component that a custom local process still takes care of the blocked movement. Typically, this blocked movement will not be automatically retried, and won't be displayed by default in the Ops tool.

To "end" a blocked movement you took ownership of, use: - cancel_self_healing_core_blocked_movement to cancel it (automatically_cancelled) - resolve_self_healing_core_blocked_movement to resolve it (automatically_resolved)

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def mark_pending_self_healing_core_blocked_movement_as_awaiting_user_response_self_healing(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Mark a core blocked movement you previously took ownership of via
    `mark_pending_core_blocked_movement_as_self_healing` as awaiting user response,
    but still under the "self healing" process.

    This is useful if you need to contact the user/admin about the blocked movement, but you still want to keep
    ownership of the blocked movement.

    `awaiting_user_response_self_healing` is considered as a terminal status in SLA computation.

    By using this, you signal to the Employment Component that a custom local process still takes care of the blocked movement.
    Typically, this blocked movement will not be automatically retried, and won't be displayed by default in the Ops tool.

    To "end" a blocked movement you took ownership of, use:
    - `cancel_self_healing_core_blocked_movement` to cancel it (`automatically_cancelled`)
    - `resolve_self_healing_core_blocked_movement` to resolve it (`automatically_resolved`)
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )

    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={
            BlockedMovementStatus.pending_self_healing,
        },
        to_status=BlockedMovementStatus.awaiting_user_response_self_healing,
        commit=commit,
    )
mark_pending_upstream_blocked_movement_as_self_healing
mark_pending_upstream_blocked_movement_as_self_healing(
    upstream_blocked_movement_id, commit
)

Mark an upstream blocked movement as self healing. By using this, you signal to the Employment Component that a custom local process will take care of the blocked movement from now on.

To "end" a blocked movement you take ownership of via this function, use:

  • cancel_self_healing_upstream_blocked_movement to cancel it (automatically_cancelled)
  • resolve_self_healing_upstream_blocked_movement to resolve it (automatically_resolved)
Source code in components/employment/public/business_logic/actions/blocked_movement.py
def mark_pending_upstream_blocked_movement_as_self_healing(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Mark an upstream blocked movement as self healing. By using this, you signal to the Employment Component that a
    custom local process will take care of the blocked movement from now on.

    To "end" a blocked movement you take ownership of via this function, use:

    - `cancel_self_healing_upstream_blocked_movement` to cancel it (`automatically_cancelled`)
    - `resolve_self_healing_upstream_blocked_movement` to resolve it (`automatically_resolved`)
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={BlockedMovementStatus.pending},
        to_status=BlockedMovementStatus.pending_self_healing,
        commit=commit,
    )
resolve_self_healing_core_blocked_movement
resolve_self_healing_core_blocked_movement(
    core_blocked_movement_id, commit
)

Resolves a core blocked movement which you previously took ownership of via mark_pending_core_blocked_movement_as_self_healing.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def resolve_self_healing_core_blocked_movement(
    core_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Resolves a core blocked movement which you previously took ownership of via
    `mark_pending_core_blocked_movement_as_self_healing`.
    """
    core_blocked_movement = get_or_raise_missing_resource(
        CoreBlockedMovementModel, core_blocked_movement_id
    )
    _transition_blocked_movement_status(
        core_blocked_movement,
        from_status={BlockedMovementStatus.pending_self_healing},
        to_status=BlockedMovementStatus.automatically_resolved,
        commit=commit,
    )
resolve_self_healing_upstream_blocked_movement
resolve_self_healing_upstream_blocked_movement(
    upstream_blocked_movement_id, commit
)

Resolves an upstream blocked movement which you previously took ownership of via mark_pending_upstream_blocked_movement_as_self_healing.

Source code in components/employment/public/business_logic/actions/blocked_movement.py
def resolve_self_healing_upstream_blocked_movement(
    upstream_blocked_movement_id: UUID, commit: bool
) -> None:
    """
    Resolves an upstream blocked movement which you previously took ownership of via
    `mark_pending_upstream_blocked_movement_as_self_healing`.
    """
    upstream_blocked_movement = get_or_raise_missing_resource(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )
    _transition_blocked_movement_status(
        upstream_blocked_movement,
        from_status={BlockedMovementStatus.pending_self_healing},
        to_status=BlockedMovementStatus.automatically_resolved,
        commit=commit,
    )
retry_core_blocked_movement
retry_core_blocked_movement(
    blocked_movement_id,
    *,
    employment_declaration_override=None,
    source_rules_override=None,
    commit=False,
    event_bus_orchestrator=None
)

Retry a core blocked movement

Retries a core blocked movement, using the provided overrides.

Parameters:

Name Type Description Default
blocked_movement_id UUID

The ID of the core blocked movement to retry

required
employment_declaration_override EmploymentDeclaration | None

If set, will be used instead of the blocked movement's declaration. Defaults to None.

None
source_rules_override SourceRules | None

If set, will be used instead of the blocked movement's original source type's source rules and persisted for future retries. Defaults to None.

None
commit bool

True will commit (incl. executing side effects), False will dry-run. Defaults to False.

False
event_bus_orchestrator EventBusOrchestrator

If set, events will be published to this orchestrator,

None
Source code in components/employment/public/business_logic/actions/blocked_movement.py
@enqueueable
def retry_core_blocked_movement(
    blocked_movement_id: UUID,
    *,
    employment_declaration_override: EmploymentDeclaration[Any] | None = None,
    source_rules_override: SourceRules | None = None,
    commit: bool = False,
    event_bus_orchestrator: EventBusOrchestrator | None = None,
) -> bool:
    """Retry a core blocked movement

    Retries a core blocked movement, using the provided overrides.

    Args:
        blocked_movement_id (UUID): The ID of the core blocked movement to retry
        employment_declaration_override (EmploymentDeclaration | None, optional): If set, will be used instead of the blocked movement's declaration. Defaults to None.
        source_rules_override (SourceRules | None, optional): If set, will be used instead of the blocked movement's original source type's source rules and persisted for future retries. Defaults to None.
        commit (bool, optional): True will commit (incl. executing side effects), False will dry-run. Defaults to False.
        event_bus_orchestrator (EventBusOrchestrator, optional): If set, events will be published to this orchestrator,
        otherwise they will be executed immediately (in dry run or not, depending on the commit parameter)
    """
    blocked_movement: CoreBlockedMovementModel | None = current_session.get(
        BlockedMovementModelBroker.model, blocked_movement_id
    )
    if blocked_movement is None:
        current_employment_logger.error(
            f"Will skip retrying core blocked movement {blocked_movement_id}: blocked movement not found",
            layer="retry_blocked_movements",
            clear_attributes={"upstream_blocked_movement_id": str(blocked_movement_id)},
        )
        raise BaseErrorCode.missing_resource(
            description=f"No enrollment for User {blocked_movement_id}."
        )

    if employment_declaration_override:
        employment_declaration_to_retry = employment_declaration_override
        if not blocked_movement.input_override:
            blocked_movement.input_override = []
        blocked_movement.input_override.append(
            InputOverride(
                retried_at=datetime.utcnow(),
                employment_declaration=employment_declaration_to_retry,
            ).to_dict()
        )
    else:
        employment_declaration_to_retry = EmploymentDeclaration.from_dict(
            blocked_movement.context.get("employment_declaration_to_retry")
            or blocked_movement.context["employment_declaration"]
        )
    if source_rules_override:
        blocked_movement.source_rules_override = source_rules_override  # type: ignore[assignment]
    else:
        source_rules_override = (
            SourceRules.from_dict(blocked_movement.source_rules_override)
            if blocked_movement.source_rules_override
            else None
        )

    retry_function = get_retry_function(employment_declaration_to_retry)

    result = retry_function(
        employment_declaration_to_retry,
        source_information=blocked_movement.employment_source_data_id,
        commit=commit,
        retried_blocked_movement=blocked_movement,
        source_rules_override=source_rules_override,
        event_bus_orchestrator=event_bus_orchestrator,
    )

    return result.success
retry_upstream_blocked_movement
retry_upstream_blocked_movement(
    upstream_blocked_movement_id,
    *,
    upstream_data_override=None,
    source_rules_override=None,
    commit=False
)

Retries an upstream blocked movement

Parameters:

Name Type Description Default
upstream_blocked_movement_id UUID

The ID of the blocked movement to retry

required
upstream_data_override Optional[dict[str, Any]]

If set, values present in this dict will override those present in the upstream blocked movement's raw_data. Defaults to None.

None
source_rules_override SourceRules | None

If set, will be used instead of the blocked movement's original source type's source rules and persisted for future retries. Defaults to None.

None
commit bool

True will commit (incl. executing side effects), False will dry-run. Defaults to False.

False
Source code in components/employment/public/business_logic/actions/blocked_movement.py
def retry_upstream_blocked_movement(
    upstream_blocked_movement_id: UUID,
    *,
    upstream_data_override: Optional[dict[str, Any]] = None,
    source_rules_override: SourceRules | None = None,
    commit: bool = False,
) -> bool:
    """Retries an upstream blocked movement

    Args:
        upstream_blocked_movement_id (UUID): The ID of the blocked movement to retry
        upstream_data_override (Optional[dict[str, Any]], optional): If set, values present in this dict will override those present in the upstream blocked movement's `raw_data`. Defaults to None.
        source_rules_override (SourceRules | None, optional): If set, will be used instead of the blocked movement's original source type's source rules and persisted for future retries. Defaults to None.
        commit (bool, optional): True will commit (incl. executing side effects), False will dry-run. Defaults to False.
    """
    from components.employment.public.business_logic.actions.upstream_blocked_movement_creator import (
        UpstreamBlockedMovementCreator,
    )

    blocked_movement: UpstreamBlockedMovementModel | None = current_session.get(
        UpstreamBlockedMovementModelBroker.model, upstream_blocked_movement_id
    )
    if blocked_movement is None:
        current_employment_logger.error(
            f"Will skip retrying upstream blocked movement {upstream_blocked_movement_id}: blocked movement not found",
            layer="retry_blocked_movements",
            clear_attributes={
                "upstream_blocked_movement_id": str(upstream_blocked_movement_id)
            },
        )
        return False

    data_to_retry = blocked_movement.upstream_data

    if upstream_data_override:
        data_to_retry = {
            **data_to_retry,
            **upstream_data_override,
        }
        blocked_movement.input_override = upstream_data_override  # type: ignore[assignment]

    if source_rules_override:
        blocked_movement.source_rules_override = source_rules_override  # type: ignore[assignment]
    else:
        source_rules_override = (
            SourceRules.from_dict(blocked_movement.source_rules_override)
            if blocked_movement.source_rules_override
            else None
        )

    source_type = LocalSourceType(blocked_movement.source_type)
    upstream_retry_handler = get_upstream_retry_handler(source_type)

    if not upstream_retry_handler:
        current_employment_logger.error(
            f"Will skip retrying upstream blocked movement {blocked_movement.id}: {blocked_movement.source_type} does not have an upstream retry handler",
            layer="retry_blocked_movements",
            clear_attributes={
                "upstream_blocked_movement_id": str(blocked_movement.id),
                "source": blocked_movement.source_type,
            },
        )
        return False
    else:
        main_event_bus = MultiEventBus()
        employment_declaration: EmploymentDeclaration[Any] | None = None
        no_ingestion_required = False
        with UpstreamBlockedMovementCreator(
            commit=commit,
            source=SourceType(blocked_movement.source_type),
            upstream_data=data_to_retry,
            upstream_retried_blocked_movement=blocked_movement,
            source_information=blocked_movement.employment_source_data_id,
            source_rules_override=source_rules_override,
        ):
            employment_declaration = upstream_retry_handler(
                upstream_data=data_to_retry,
                root_employment_source_data_id=blocked_movement.employment_source_data_id,
                event_bus_orchestrator=main_event_bus,
            )
            if employment_declaration is None:
                # This case is OK: no EmploymentDeclaration needed to be made
                no_ingestion_required = True
        if employment_declaration:
            retry_function = get_retry_function(employment_declaration)

            result = retry_function(
                employment_declaration,
                source_information=blocked_movement.employment_source_data_id,
                commit=commit,
                upstream_retried_blocked_movement=blocked_movement,
                event_bus_orchestrator=main_event_bus,
                source_rules_override=source_rules_override,
            )

            if result.success:
                main_event_bus.apply(commit=commit)

            return result.success
        elif no_ingestion_required:
            main_event_bus.apply(commit=commit)
            return True
        else:
            return False

delete_user

delete_user_in_employment_component
delete_user_in_employment_component(user_id, deleter)

Support function for the "delete user" procedure. Delete all employments (core and extended) that target user and related data (blocked movements and source data).

Note that this will NOT be notified to consumers.

Source code in components/employment/public/business_logic/actions/delete_user.py
def delete_user_in_employment_component(
    user_id: UserId,
    deleter: "Deleter",
) -> None:
    """
    Support function for the "delete user" procedure.
    Delete all employments (core and extended) that target user and related data (blocked movements and source data).

    Note that this will NOT be notified to consumers.
    """
    employment_source_data = (
        current_session.query(EmploymentSourceDataModel)  # noqa: ALN085
        .outerjoin(
            EmploymentSourceDataModel.core_employment_versions,
        )
        .outerjoin(
            EmploymentSourceDataModel.core_blocked_movements,
        )
        .outerjoin(
            EmploymentSourceDataModel.upstream_blocked_movements,
        )
        .filter(
            or_(
                CoreEmploymentVersionModel.user_id == user_id,
                CoreBlockedMovementModel.user_id == user_id,
                UpstreamBlockedMovementModel.user_id == user_id,
            )
        )
        .all()
    )

    core_employments = list(
        flatten([source.core_employment_versions for source in employment_source_data])
    )
    latest_core_employment_versions = (
        current_session.query(LatestCoreEmploymentVersion)  # noqa: ALN085
        .filter(
            LatestCoreEmploymentVersion.employment_id.in_(
                {core_employment.employment_id for core_employment in core_employments}
            )
        )
        .all()
    )
    extended_employments = (
        current_session.query(ExtendedEmploymentUpdateModel)  # noqa: ALN085
        .filter(
            ExtendedEmploymentUpdateModel.employment_id.in_(
                {core_employment.employment_id for core_employment in core_employments}
            )
        )
        .all()
    )
    employment_source_data += [
        extended_employment.employment_source_data  # type: ignore[misc]
        for extended_employment in extended_employments
    ]
    core_blocked_movements = list(
        flatten([source.core_blocked_movements for source in employment_source_data])
    )
    upstream_blocked_movements = list(
        flatten(
            [source.upstream_blocked_movements for source in employment_source_data]
        )
    )
    deleter.delete_objects(
        latest_core_employment_versions
        + core_employments
        + extended_employments
        + core_blocked_movements
        + upstream_blocked_movements
        + employment_source_data  # type: ignore[operator]
    )

employment_source_data

find_similar_source_data_and_mark_as_duplicate
find_similar_source_data_and_mark_as_duplicate(
    source_type,
    user_identifier_field_name,
    company_identifier_field_name,
    start_date_field_name,
    field_names_to_compare,
    raw_data_to_compare,
    commit,
)

Looks at the latest EmploymentSourceData with the same source type and identifier fields. If the values are the same as raw_data_to_compare for all fields in field_names_to_compare, it updates the source's metadata with last_received_at=utcnow() and returns its ID. Otherwise, it returns None.

Note: if an identifier field is not present in raw_data_to_compare, it will only consider EmploymentSourceData that do not have this field in their raw_data.

param field_names_to_compare: list of fields from raw_data that will need to be equal to be considered as similar param raw_data_to_compare: new raw data dict for which we want to look for existing similar data

Source code in components/employment/public/business_logic/actions/employment_source_data.py
def find_similar_source_data_and_mark_as_duplicate(
    source_type: LocalizedSourceType,
    user_identifier_field_name: str,
    company_identifier_field_name: str,
    start_date_field_name: str,
    field_names_to_compare: list[str],
    raw_data_to_compare: dict[str, Any],
    commit: bool,
) -> UUID | None:
    """
    Looks at the latest EmploymentSourceData with the same source type and identifier fields.
    If the values are the same as `raw_data_to_compare` for all fields in `field_names_to_compare`,
    it updates the source's metadata with `last_received_at=utcnow() and returns its ID.`
    Otherwise, it returns None.

    Note: if an identifier field is not present in `raw_data_to_compare`,
    it will only consider EmploymentSourceData that do not have this field in their raw_data.

    param field_names_to_compare: list of fields from `raw_data` that will need to be equal to be considered as similar
    param raw_data_to_compare: new raw data dict for which we want to look for existing similar data
    """
    base_filter = EmploymentSourceDataModel.source_type == source_type.base_source_type
    identifiers = {}
    if user_identifier_field_name in raw_data_to_compare:
        identifiers[user_identifier_field_name] = raw_data_to_compare[
            user_identifier_field_name
        ]
    else:
        base_filter = and_(
            base_filter,
            not_(
                EmploymentSourceDataModel.source_metadata.has_key(
                    user_identifier_field_name
                )
            ),
        )
    if company_identifier_field_name in raw_data_to_compare:
        identifiers[company_identifier_field_name] = raw_data_to_compare[
            company_identifier_field_name
        ]
    else:
        base_filter = and_(
            base_filter,
            not_(
                EmploymentSourceDataModel.source_metadata.has_key(
                    company_identifier_field_name
                )
            ),
        )
    if start_date_field_name in raw_data_to_compare:
        identifiers[start_date_field_name] = raw_data_to_compare[start_date_field_name]
    else:
        base_filter = and_(
            base_filter,
            not_(
                EmploymentSourceDataModel.source_metadata.has_key(start_date_field_name)
            ),
        )
    if identifiers:
        base_filter = and_(
            base_filter, EmploymentSourceDataModel.raw_data.contains(identifiers)
        )

    latest_data_for_user_company: EmploymentSourceDataModel | None = (
        current_session.query(EmploymentSourceDataModel)  # noqa: ALN085
        .filter(base_filter)
        .order_by(EmploymentSourceDataModel.created_at.desc())
        .first()
    )

    if latest_data_for_user_company is None:
        return None

    for field in field_names_to_compare:
        if str(
            latest_data_for_user_company.raw_data.get(field, "field_not_in_data")
        ) != str(raw_data_to_compare.get(field, "field_not_in_data")):
            return None

    update_employment_source_data_metadata_with(
        employment_source_data_id=latest_data_for_user_company.id,
        update_dict={"last_received_at": utcnow()},
        commit=commit,
    )

    return latest_data_for_user_company.id
run_rollout_initialisation
run_rollout_initialisation(
    source_type, source_information, commit
)

For some sources (most likely automated one), when activating it for a company, we don't want to process ALL past data (because of the high risk of data inconsistencies that can be too costly to fix).

Instead, we want to start processing data from the moment the source is activated.

To do so, we need to create a new EmploymentSourceDataModel with the source_information provided and rely on the "source deduplication" mechanism (see find_similar_source_data_and_mark_as_duplicate above and doc: https://www.notion.so/alaninsurance/Source-Deduplication-37060a0d1c884c1b970aa6b2a4600ea4?pvs=26&qid=1%3Ac0d72661-8c33-41aa-9b99-25d9fd8203ee%3A5 ⧉)

We use the rollout_initialisation metadata key to mark the source as "initialised" (to help excluded those when needed)

These sources are excluded from SLA computations.

Source code in components/employment/public/business_logic/actions/employment_source_data.py
def run_rollout_initialisation(
    source_type: SourceType, source_information: SourceInformation, commit: bool
) -> None:
    """
    For some sources (most likely automated one), when activating it for a company,
    we don't want to process ALL past data (because of the high risk of data inconsistencies that can be too costly to fix).

    Instead, we want to start processing data from the moment the source is activated.

    To do so, we need to create a new EmploymentSourceDataModel with the source_information provided and rely on the "source deduplication" mechanism
    (see find_similar_source_data_and_mark_as_duplicate above
    and doc: https://www.notion.so/alaninsurance/Source-Deduplication-37060a0d1c884c1b970aa6b2a4600ea4?pvs=26&qid=1%3Ac0d72661-8c33-41aa-9b99-25d9fd8203ee%3A5)

    We use the `rollout_initialisation` metadata key to mark the source as "initialised" (to help excluded those when needed)

    These sources are excluded from SLA computations.
    """
    source_information.metadata["rollout_initialisation"] = True
    get_or_create_employment_source_data_model(source_type, source_information)

    if commit:
        current_session.commit()
update_employment_source_data_metadata_with
update_employment_source_data_metadata_with(
    employment_source_data_id, update_dict, commit
)

Update the metadata stored in the DB using the provided update_dict. Each value in the update_dict will update the existing value for the given key. Keys not present in update_dict will not be changed.

Source code in components/employment/public/business_logic/actions/employment_source_data.py
def update_employment_source_data_metadata_with(
    employment_source_data_id: UUID, update_dict: dict[str, Any], commit: bool
) -> None:
    """
    Update the metadata stored in the DB using the provided update_dict. Each value in the update_dict will update
    the existing value for the given key. Keys not present in update_dict will not be changed.
    """
    employment_source_data_model = get_or_raise_missing_resource(
        EmploymentSourceDataModel, employment_source_data_id
    )
    for key, value in update_dict.items():
        employment_source_data_model.source_metadata[key] = value

    if commit:
        current_session.commit()

merge_users

merge_users_in_employment_component
merge_users_in_employment_component(
    merge_from, merge_into, source_information, logs, commit
)

Support function for the "merge user" procedure: change all employments and blocked movements that target user merge_from so that they target user merge_into instead.

This will change ALL employments for the user, including cancelled ones. This will change ALL blocked movements for the user (active and terminal).

Note that this will NOT be notified to consumers.

If logs is not None, strings describing the actions will be appended to it.

Source code in components/employment/public/business_logic/actions/merge_users.py
def merge_users_in_employment_component(
    merge_from: UserId,
    merge_into: UserId,
    source_information: SourceInformationLike,
    logs: list[str] | None,
    commit: bool,
) -> None:
    """
    Support function for the "merge user" procedure: change all employments and blocked movements that target user
    `merge_from` so that they target user `merge_into` instead.

    This will change ALL employments for the user, including cancelled ones.
    This will change ALL blocked movements for the user (active and terminal).

    Note that this will NOT be notified to consumers.

    If `logs` is not None, strings describing the actions will be appended to it.
    """
    employment_source_data_id = get_or_create_employment_source_data_model(
        source_type=GlobalSourceType.internal_change,
        source_information_like=source_information,
    )
    employments = get_core_employments_for_user(merge_from)
    for employment in employments:
        current_employment_logger.info(
            f"Updating Employment Component employment {employment.employment_id} from user {merge_from} to {merge_into}",
            layer="merge_users",
        )
        if logs is not None:
            logs.append(
                f"Updating Employment Component employment {employment.employment_id} from user {merge_from} to {merge_into}"
            )
        new_core_employment_version = replace(
            employment,
            user_id=merge_into,
            source_type=GlobalSourceType.internal_change,
            employment_source_data_id=employment_source_data_id,
            employment_change_types=set(),
        )
        create_new_core_employment_version_model(
            new_core_employment_version, allow_inconsistent_user_id=True
        )

    _retarget_blocked_movements(merge_from, merge_into, logs)

    if commit:
        current_session.commit()

source_information

update_source_information_metadata_with
update_source_information_metadata_with(
    source_information, update_dict
)

Update the provided source_information's metadata in place using the provided update_dict.

Never commits, even if the underlying source_information is a persisted DB model.

Source code in components/employment/public/business_logic/actions/source_information.py
def update_source_information_metadata_with(
    source_information: SourceInformationLike, update_dict: dict[str, Any]
) -> None:
    """
    Update the provided source_information's metadata in place using the provided update_dict.

    Never commits, even if the underlying source_information is a persisted DB model.
    """
    # Simplest case -> already exists in DB, can use the regular flow
    if isinstance(source_information, UUID):
        update_employment_source_data_metadata_with(
            source_information, update_dict, commit=False
        )
        return

    # More complex case -> may or may not exist in DB, directly update the underlying dict
    if isinstance(source_information, SourceInformation):
        if (model := source_information._employment_source_data) is not None:  # noqa: ALN027 # , no way to say "internal to component" in Python
            dict_to_update = model.source_metadata
        dict_to_update = source_information.metadata
    else:
        raise ValueError(f"Invalid SourceInformationLike object: {source_information}")

    for key, value in update_dict.items():
        dict_to_update[key] = value

upstream_blocked_movement_creator

UpstreamBlockedMovementCreator
UpstreamBlockedMovementCreator(
    commit,
    source,
    upstream_data,
    source_information,
    upstream_retried_blocked_movement=None,
    source_rules_override=None,
)

Context manager that catch exceptions and create an upstream blocked movement if needed

upstream_data: data used to decide if we should create a new blocked movement or not Basically, if a new movement is failing with the same error and the same upstream_data, we won't create a new blocked movement

Source code in components/employment/public/business_logic/actions/upstream_blocked_movement_creator.py
def __init__(
    self,
    commit: bool,
    source: SourceType,
    upstream_data: dict[str, Any],
    source_information: SourceInformationLike,
    upstream_retried_blocked_movement: Optional[
        UpstreamBlockedMovementModel
    ] = None,
    source_rules_override: SourceRules | None = None,
) -> None:
    self.commit = commit
    self.source = source
    self.upstream_data = upstream_data
    self.retried_blocked_movement: Optional[UpstreamBlockedMovementModel] = (
        upstream_retried_blocked_movement
    )
    self.created_blocked_movement = False
    self.blocked_movement: UpstreamBlockedMovement | None = None
    self.source_information = source_information
    self.source_rules_override = source_rules_override
    self.employment_source_data: EmploymentSourceData | None = None
    """
    If created_blocked_movement is True (i.e. an upstream blocked movement was created because the block inside
    this creator was unsuccessful), employment_source_data will be set to the corresponding EmploymentSourceData.

    If created_blocked_movement is False (i.e. the block inside this creator executed without raising),
    employment_source_data will be None.
    """
__enter__
__enter__()
Source code in components/employment/public/business_logic/actions/upstream_blocked_movement_creator.py
def __enter__(self) -> "UpstreamBlockedMovementCreator":  # noqa: D105
    enter_blocked_movement_creator_semaphore()
    self.block_transaction = transaction(propagation=Propagation.NESTED)
    self.block_transaction.__enter__()
    return self
__exit__
__exit__(exc_type, exc, traceback)
Source code in components/employment/public/business_logic/actions/upstream_blocked_movement_creator.py
def __exit__(  # noqa: D105
    self,
    exc_type: Optional[type[BaseException]],
    exc: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> bool:
    exit_blocked_movement_creator_semaphore()
    self.block_transaction.__exit__(exc_type, exc, traceback)

    if not exc:
        if self.retried_blocked_movement:
            self.retried_blocked_movement.status = BlockedMovementStatus.resolved

        if self.commit:
            current_session.commit()

        return False

    current_employment_logger.warning(
        f"Failed to make declaration with error: {exc}",
        exc_info=True,
        layer="blocked_movement_creator",
    )

    employment_source_data_id = get_or_create_employment_source_data_model(
        self.source, self.source_information
    )

    try:
        with transaction(propagation=Propagation.NESTED):
            error = exc
            if not isinstance(error, UpstreamError):
                if isinstance(error, IntegrityError):
                    # see https://github.com/alan-eu/alan-apps/pull/12736 for context
                    # /!\ this string is used to determine if we should create a new blocked movement
                    # if it is updated, it may duplicate some previous blocked movements
                    error_message = (
                        f"Constraint {error.orig.diag.constraint_name} of table {error.orig.diag.table_name} "  # type: ignore[union-attr]
                        f"in schema {error.orig.diag.schema_name} raised."  # type: ignore[union-attr]
                    )
                else:
                    error_message = str(error)
                error = UnexpectedUpstreamError(
                    "Unexpected error while ingesting employment declaration",
                    error_message=error_message,
                    context=UnexpectedUpstreamErrorContext(
                        traceback="".join(format_exception(error))
                    ),
                )

            blocked_movement = get_or_create_upstream_blocked_movement(
                error,
                source_type=self.source,
                upstream_data=self.upstream_data,
                employment_source_data_id=employment_source_data_id,
                source_rules_override=self.source_rules_override,
            )
    except Exception as error:
        current_employment_logger.warning(
            f"Failed to created blocked movement: {error}",
            exc_info=True,
            layer="blocked_movement_creator",
        )

        unexpected_error = UnexpectedUpstreamError(
            "Unexpected error while creating a blocked movement",
            error_message=str(error),
            context=UnexpectedUpstreamErrorContext(
                traceback="".join(format_exception(error))
            ),
        )

        blocked_movement = get_or_create_upstream_blocked_movement(
            unexpected_error,
            source_type=self.source,
            upstream_data=self.upstream_data,
            employment_source_data_id=employment_source_data_id,
            source_rules_override=self.source_rules_override,
        )

    if (
        self.retried_blocked_movement
        and blocked_movement != self.retried_blocked_movement
    ):
        self.retried_blocked_movement.status = BlockedMovementStatus.resolved
        blocked_movement.parent_blocked_movement_id = (
            self.retried_blocked_movement.id
        )

    self.created_blocked_movement = True
    self.blocked_movement = (
        UpstreamBlockedMovement.from_upstream_blocked_movement_model(
            blocked_movement
        )
    )
    employment_source_data_model = get_or_raise_missing_resource(
        EmploymentSourceDataModel, employment_source_data_id
    )
    # In some cases, we can end up with a stale EmploymentSourceDataModel that still contains old (pre-rollback)
    # data. Expire it to force a refresh
    current_session.expire(employment_source_data_model)
    self.employment_source_data = (
        EmploymentSourceData.from_employment_source_data_model(
            employment_source_data_model
        )
    )

    if self.commit:
        current_session.commit()
    # Stops the exception propagation
    return True
blocked_movement instance-attribute
blocked_movement = None
commit instance-attribute
commit = commit
created_blocked_movement instance-attribute
created_blocked_movement = False
employment_source_data instance-attribute
employment_source_data = None

If created_blocked_movement is True (i.e. an upstream blocked movement was created because the block inside this creator was unsuccessful), employment_source_data will be set to the corresponding EmploymentSourceData.

If created_blocked_movement is False (i.e. the block inside this creator executed without raising), employment_source_data will be None.

retried_blocked_movement instance-attribute
retried_blocked_movement = upstream_retried_blocked_movement
source instance-attribute
source = source
source_information instance-attribute
source_information = source_information
source_rules_override instance-attribute
source_rules_override = source_rules_override
upstream_data instance-attribute
upstream_data = upstream_data

admin_resolvers

Admin error resolvers for blocked movements.

This package provides the infrastructure for admin-resolvable blocked movements.

admin_error_resolver

AdminErrorResolver

Bases: ABC

Base class for admin error resolvers.

Each resolver handles a specific error code and provides
  • Ability to check if a blocked movement can be resolved
  • Data needed for the admin to make a resolution decision
  • Validation of resolution parameters
  • Application of the resolution chosen by the admin

Global resolver classes are defined in GLOBAL_RESOLVER_CLASSES dict. Country-specific resolver classes are provided via CountryGateway.get_admin_error_resolver_classes().

apply_resolution abstractmethod
apply_resolution(blocked_movement, action, params=None)

Apply the admin's resolution choice to the blocked movement. Implementations should call validate_params() before processing.

Parameters:

Name Type Description Default
blocked_movement CoreBlockedMovement | UpstreamBlockedMovement

The blocked movement to resolve

required
action AdminResolutionAction

The action chosen by the admin (cancel or apply)

required
params dict[str, Any] | None

Optional parameters for the action. For 'apply' action: - if provided, used as override data for retry. - if None, retries as-is without modifications.

None

Raises:

Type Description
ResolutionActionError

If the action is invalid or the resolution cannot be applied

Source code in components/employment/public/business_logic/admin_resolvers/admin_error_resolver.py
@abstractmethod
def apply_resolution(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
    action: AdminResolutionAction,
    params: dict[str, Any] | None = None,
) -> None:
    """
    Apply the admin's resolution choice to the blocked movement.
    Implementations should call validate_params() before processing.

    Args:
        blocked_movement: The blocked movement to resolve
        action: The action chosen by the admin (cancel or apply)
        params: Optional parameters for the action.
                For 'apply' action:
                - if provided, used as override data for retry.
                - if None, retries as-is without modifications.

    Raises:
        ResolutionActionError: If the action is invalid or the resolution cannot be applied
    """
can_resolve
can_resolve(blocked_movement)

Check if this resolver can handle the given blocked movement.

Default implementation checks: - The error_code matches - The status is awaiting_user_response_self_healing

Override this method if you need additional validation.

Source code in components/employment/public/business_logic/admin_resolvers/admin_error_resolver.py
def can_resolve(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
) -> bool:
    """
    Check if this resolver can handle the given blocked movement.

    Default implementation checks:
    - The error_code matches
    - The status is awaiting_user_response_self_healing

    Override this method if you need additional validation.
    """
    if blocked_movement.error_code != self.error_code:
        return False

    if (
        blocked_movement.status
        != BlockedMovementStatus.awaiting_user_response_self_healing
    ):
        return False

    return True
category abstractmethod property
category

Category for grouping in admin reporting alerts.

error_code abstractmethod property
error_code

The error code this resolver handles.

filter_resolvable_movements classmethod
filter_resolvable_movements(blocked_movements)

Pre-filter movements before building resolution contexts.

Override to apply per-resolver deduplication or filtering logic (e.g., keeping only one BM per employee). Default returns all movements unchanged.

Parameters:

Name Type Description Default
blocked_movements list[CoreBlockedMovement | UpstreamBlockedMovement]

Candidate blocked movements for this resolver.

required

Returns:

Type Description
list[CoreBlockedMovement | UpstreamBlockedMovement]

Filtered list of blocked movements.

Source code in components/employment/public/business_logic/admin_resolvers/admin_error_resolver.py
@classmethod
def filter_resolvable_movements(
    cls,
    blocked_movements: list[CoreBlockedMovement | UpstreamBlockedMovement],
) -> list[CoreBlockedMovement | UpstreamBlockedMovement]:
    """Pre-filter movements before building resolution contexts.

    Override to apply per-resolver deduplication or filtering logic
    (e.g., keeping only one BM per employee). Default returns all movements unchanged.

    Args:
        blocked_movements: Candidate blocked movements for this resolver.

    Returns:
        Filtered list of blocked movements.
    """
    return blocked_movements
get_resolution_context abstractmethod
get_resolution_context(blocked_movement)

Fetch and return the data needed for the admin to resolve this blocked movement.

Parameters:

Name Type Description Default
blocked_movement CoreBlockedMovement | UpstreamBlockedMovement

The blocked movement to resolve

required

Returns:

Type Description
AdminResolutionContext

AdminResolutionContext containing resolution options and context.

AdminResolutionContext

The context dict MUST include all fields from RequiredContextFields: - employee_full_name: str - employee_email: str (can be empty) - company_display_name: str - created_at: str (ISO timestamp)

Raises:

Type Description
ResolutionActionError

If the blocked movement is invalid or missing required data

Source code in components/employment/public/business_logic/admin_resolvers/admin_error_resolver.py
@abstractmethod
def get_resolution_context(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
) -> AdminResolutionContext:
    """
    Fetch and return the data needed for the admin to resolve this blocked movement.

    Args:
        blocked_movement: The blocked movement to resolve

    Returns:
        AdminResolutionContext containing resolution options and context.
        The context dict MUST include all fields from RequiredContextFields:
            - employee_full_name: str
            - employee_email: str (can be empty)
            - company_display_name: str
            - created_at: str (ISO timestamp)

    Raises:
        ResolutionActionError: If the blocked movement is invalid or missing required data
    """
validate_params
validate_params(_action, _params)

Validate the resolution parameters before applying.

Override this method to add resolver-specific validation. Default implementation does nothing.

Parameters:

Name Type Description Default
action

The action chosen by the admin

required
params

Optional parameters for the action

required

Raises:

Type Description
ResolutionActionError

If params are invalid

Source code in components/employment/public/business_logic/admin_resolvers/admin_error_resolver.py
def validate_params(
    self,
    _action: AdminResolutionAction,
    _params: dict[str, Any] | None,
) -> None:
    """
    Validate the resolution parameters before applying.

    Override this method to add resolver-specific validation.
    Default implementation does nothing.

    Args:
        action: The action chosen by the admin
        params: Optional parameters for the action

    Raises:
        ResolutionActionError: If params are invalid
    """
    # Default implementation does nothing - subclasses can override
    return
RequiredContextFields

Bases: TypedDict

Required context fields for admin resolution dashboard.

These fields are required by the frontend to display blocked movements in the admin dashboard. Resolvers MUST include these fields in their context.

company_display_name instance-attribute
company_display_name

Display name of the company.

created_at instance-attribute
created_at

ISO timestamp when the blocked movement was created.

employee_email instance-attribute
employee_email

Email of the affected employee (can be empty string).

employee_full_name instance-attribute
employee_full_name

Display name of the affected employee.

create_admin_resolution_context
create_admin_resolution_context(
    *,
    blocked_movement_id,
    error_code,
    source_type,
    employee_full_name,
    employee_email,
    company_display_name,
    created_at,
    **extra_context
)

Build an AdminResolutionContext with required fields enforced at call site.

This helper ensures all required fields are provided by making them explicit keyword arguments. Additional error-specific context can be passed as extra context

Parameters:

Name Type Description Default
blocked_movement_id UUID

ID of the blocked movement

required
error_code str

Error code for routing to resolver

required
source_type BaseSourceType

Source type of the blocked movement (e.g. "fr_dsn", "fr_admin_dashboard")

required
employee_full_name str

Display name of affected employee

required
employee_email str

Email of affected employee (can be empty)

required
company_display_name str

Display name of company

required
created_at str

ISO timestamp string

required
**extra_context Any

Additional error-specific context fields

{}

Returns:

Type Description
AdminResolutionContext

AdminResolutionContext with all required fields

Source code in components/employment/public/business_logic/admin_resolvers/admin_error_resolver.py
def create_admin_resolution_context(
    *,
    blocked_movement_id: UUID,
    error_code: str,
    source_type: BaseSourceType,
    employee_full_name: str,
    employee_email: str,
    company_display_name: str,
    created_at: str,
    **extra_context: Any,
) -> AdminResolutionContext:
    """
    Build an AdminResolutionContext with required fields enforced at call site.

    This helper ensures all required fields are provided by making them
    explicit keyword arguments. Additional error-specific context can be
    passed as extra context

    Args:
        blocked_movement_id: ID of the blocked movement
        error_code: Error code for routing to resolver
        source_type: Source type of the blocked movement (e.g. "fr_dsn", "fr_admin_dashboard")
        employee_full_name: Display name of affected employee
        employee_email: Email of affected employee (can be empty)
        company_display_name: Display name of company
        created_at: ISO timestamp string
        **extra_context: Additional error-specific context fields

    Returns:
        AdminResolutionContext with all required fields
    """
    context: dict[str, Any] = {
        "employee_full_name": employee_full_name,
        "employee_email": employee_email,
        "company_display_name": company_display_name,
        "created_at": created_at,
        **extra_context,
    }

    return AdminResolutionContext(
        blocked_movement_id=blocked_movement_id,
        error_code=error_code,
        source_type=source_type,
        context=context,
    )

registry

Registry for admin error resolvers.

Global resolvers are defined in the GLOBAL_RESOLVER_CLASSES dict. Country-specific resolvers are provided via CountryGateway.get_admin_error_resolver_classes().

GLOBAL_RESOLVER_CLASSES module-attribute
GLOBAL_RESOLVER_CLASSES = {
    error_code: ExternalEmployeeIdConflictResolver,
    value: MissingRequiredExternalEmployeeIdResolver,
    error_code: StaleInvitationResolver,
}
get_admin_error_resolver_classes
get_admin_error_resolver_classes(country_code)

Get all admin error resolver classes for a given country.

Combines global resolvers with country-specific resolvers. Country-specific resolvers take precedence if there's a conflict.

Parameters:

Name Type Description Default
country_code CountryCode

The country code to get resolvers for

required

Returns:

Type Description
dict[str, type[AdminErrorResolver]]

Dictionary mapping error codes to resolver classes

Source code in components/employment/public/business_logic/admin_resolvers/registry.py
def get_admin_error_resolver_classes(
    country_code: CountryCode,
) -> dict[str, type[AdminErrorResolver]]:
    """
    Get all admin error resolver classes for a given country.

    Combines global resolvers with country-specific resolvers.
    Country-specific resolvers take precedence if there's a conflict.

    Args:
        country_code: The country code to get resolvers for

    Returns:
        Dictionary mapping error codes to resolver classes
    """
    resolver_classes = GLOBAL_RESOLVER_CLASSES.copy()
    country_gateway = get_country_gateway(country_code)
    country_resolver_classes = country_gateway.get_admin_error_resolver_classes()
    resolver_classes.update(country_resolver_classes)
    return resolver_classes
get_all_resolvable_error_codes
get_all_resolvable_error_codes(country_code)

Get all error codes that have registered resolvers for a given country.

Parameters:

Name Type Description Default
country_code CountryCode

The country code to get resolvers for

required

Returns:

Type Description
list[str]

List of error codes that can be resolved by admins

Source code in components/employment/public/business_logic/admin_resolvers/registry.py
def get_all_resolvable_error_codes(country_code: CountryCode) -> list[str]:
    """
    Get all error codes that have registered resolvers for a given country.

    Args:
        country_code: The country code to get resolvers for

    Returns:
        List of error codes that can be resolved by admins
    """
    resolver_classes = get_admin_error_resolver_classes(country_code)
    return list(resolver_classes.keys())
get_resolver
get_resolver(error_code, country_code)

Get a resolver instance for a given error code in a specific country.

Parameters:

Name Type Description Default
error_code str

The error code to get a resolver for

required
country_code CountryCode

The country code to get resolvers for

required

Returns:

Type Description
AdminErrorResolver | None

A new resolver instance, or None if no resolver is registered for this error code

Source code in components/employment/public/business_logic/admin_resolvers/registry.py
def get_resolver(
    error_code: str, country_code: CountryCode
) -> AdminErrorResolver | None:
    """
    Get a resolver instance for a given error code in a specific country.

    Args:
        error_code: The error code to get a resolver for
        country_code: The country code to get resolvers for

    Returns:
        A new resolver instance, or None if no resolver is registered for this error code
    """
    resolver_classes = get_admin_error_resolver_classes(country_code)
    resolver_class = resolver_classes.get(error_code)
    if resolver_class is None:
        return None
    return resolver_class()

resolvers

external_employee_id_conflict_resolver
ExternalEmployeeIdConflictResolver

Bases: AdminErrorResolver

Resolver for external_employee_id_conflict blocked movements.

This error occurs when an employment declaration has an external employee ID that already belongs to another active employment in the same company.

apply_resolution
apply_resolution(blocked_movement, action, params=None)

Apply the resolution action to the blocked movement.

Parameters:

Name Type Description Default
blocked_movement CoreBlockedMovement | UpstreamBlockedMovement

The blocked movement to resolve

required
action AdminResolutionAction

The action chosen by the admin (cancel or apply)

required
params dict[str, Any] | None

Parameters for the action (required for apply): - new_external_employee_id (str): The new external employee ID

None

Raises:

Type Description
ResolutionAttemptError

If the action is invalid or the resolution cannot be applied

Source code in components/employment/public/business_logic/admin_resolvers/resolvers/external_employee_id_conflict_resolver.py
@override
def apply_resolution(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
    action: AdminResolutionAction,
    params: dict[str, Any] | None = None,
) -> None:
    """
    Apply the resolution action to the blocked movement.

    Args:
        blocked_movement: The blocked movement to resolve
        action: The action chosen by the admin (cancel or apply)
        params: Parameters for the action (required for apply):
            - new_external_employee_id (str): The new external employee ID

    Raises:
        ResolutionAttemptError: If the action is invalid or the resolution cannot be applied
    """
    core_blocked_movement = mandatory_type(
        expected_type=CoreBlockedMovement,
        value=blocked_movement,
    )

    if core_blocked_movement.error_code != self.error_code:
        raise ResolutionActionError(
            f"Blocked movement {core_blocked_movement.id} has error_code "
            f"{core_blocked_movement.error_code}, expected {self.error_code}"
        )

    if action == AdminResolutionAction.cancel:
        manually_cancel_core_blocked_movement(core_blocked_movement.id, commit=True)
    elif action == AdminResolutionAction.apply:
        country_code = get_country_code_from_blocked_movement(core_blocked_movement)
        country_gateway = get_country_gateway(country_code)

        employment_declaration = self._extract_employment_declaration(
            core_blocked_movement
        )

        new_external_employee_id, error_info = (
            country_gateway.extract_external_employee_id_from_data(
                mandatory(params),
                "new_external_employee_id",
                int(core_blocked_movement.company_id),
            )
        )

        if not new_external_employee_id:
            raise ResolutionAttemptError(
                error_info.message
                if error_info
                else "Missing new external employee id",
                error_code=error_info.error_code.value if error_info else "",
            )

        conflicting_employment = _get_conflicting_employment(
            external_employee_id=new_external_employee_id,
            company_id=core_blocked_movement.company_id,
            user_id=core_blocked_movement.user_id,
            start_date=employment_declaration.start_date,
            end_date=employment_declaration.end_date,
        )

        if conflicting_employment:
            raise ResolutionAttemptError(
                message=f"The external employee id '{new_external_employee_id}' is already used by another employee in this company",
                error_code=self.error_code,
            )

        retry_core_blocked_movement(
            blocked_movement_id=core_blocked_movement.id,
            employment_declaration_override=replace(
                employment_declaration,
                external_employee_id=new_external_employee_id,
            ),
            commit=True,
        )
    else:
        assert_never(action)
can_resolve
can_resolve(blocked_movement)

Check if this resolver can handle the given blocked movement. Only core blocked movements can be resolved.

Source code in components/employment/public/business_logic/admin_resolvers/resolvers/external_employee_id_conflict_resolver.py
@override
def can_resolve(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
) -> bool:
    """
    Check if this resolver can handle the given blocked movement.
    Only core blocked movements can be resolved.
    """
    if isinstance(blocked_movement, UpstreamBlockedMovement):
        return False

    return super().can_resolve(blocked_movement)
category property
category

Category for grouping in admin reporting alerts.

error_code property
error_code

The error code this resolver handles.

get_resolution_context
get_resolution_context(blocked_movement)

Get resolution data for the blocked movement.

Retrieves the blocked movement and its conflicting employment information to provide context for resolution.

Parameters:

Name Type Description Default
blocked_movement CoreBlockedMovement | UpstreamBlockedMovement

The blocked movement to build context for

required

Returns:

Type Description
AdminResolutionContext

AdminResolutionContext with inbound + conflicting employee info

Raises:

Type Description
ResolutionAttemptError

If the blocked movement is missing required data

Source code in components/employment/public/business_logic/admin_resolvers/resolvers/external_employee_id_conflict_resolver.py
@override
def get_resolution_context(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
) -> AdminResolutionContext:
    """
    Get resolution data for the blocked movement.

    Retrieves the blocked movement and its conflicting employment information
    to provide context for resolution.

    Args:
        blocked_movement: The blocked movement to build context for

    Returns:
        AdminResolutionContext with inbound + conflicting employee info

    Raises:
        ResolutionAttemptError: If the blocked movement is missing required data
    """
    core_blocked_movement = mandatory_type(
        expected_type=CoreBlockedMovement,
        value=blocked_movement,
    )

    country_code = get_country_code_from_blocked_movement(core_blocked_movement)
    country_gateway = get_country_gateway(country_code)

    employment_declaration = self._extract_employment_declaration(
        core_blocked_movement
    )

    if not employment_declaration.external_employee_id:
        raise ResolutionAttemptError(
            f"Blocked movement {core_blocked_movement.id} has no external employee id in context."
        )

    conflicting_employment = _get_conflicting_employment(
        external_employee_id=employment_declaration.external_employee_id,
        company_id=core_blocked_movement.company_id,
        user_id=core_blocked_movement.user_id,
        start_date=employment_declaration.start_date,
        end_date=employment_declaration.end_date,
    )

    inbound_employee_full_name = (
        country_gateway.get_user_full_name(core_blocked_movement.user_id)
        if core_blocked_movement.user_id
        else ""
    ) or ""

    company_info = country_gateway.get_company_information(
        core_blocked_movement.company_id
    )
    company_display_name = company_info.display_name if company_info else ""

    employee_email = (
        country_gateway.get_user_email(core_blocked_movement.user_id) or ""
        if core_blocked_movement.user_id
        else ""
    )

    infinitely_valid_extended_info = first_or_none(
        employment_declaration.extended_informations,
        predicate=lambda ei: ei.validity_period is None,
    )
    employee_identifier = (
        country_gateway.get_employee_identifier_for_country(
            infinitely_valid_extended_info.values
        )
        or ""
        if infinitely_valid_extended_info
        else ""
    )

    conflicting_employee_full_name = ""
    conflicting_external_employee_id = (
        employment_declaration.external_employee_id or ""
    )
    if conflicting_employment:
        conflicting_employee_full_name = (
            country_gateway.get_user_full_name(conflicting_employment.user_id)
            if conflicting_employment.user_id
            else ""
        ) or ""
        conflicting_external_employee_id = (
            conflicting_employment.external_employee_id or ""
        )

    return create_admin_resolution_context(
        blocked_movement_id=core_blocked_movement.id,
        error_code=self.error_code,
        source_type=core_blocked_movement.source_type,
        employee_full_name=inbound_employee_full_name,
        employee_email=employee_email,
        company_display_name=company_display_name,
        created_at=str(core_blocked_movement.created_at),
        conflicting_external_employee_id=conflicting_external_employee_id,
        conflicting_employee_full_name=conflicting_employee_full_name,
        employee_identifier=employee_identifier,
        no_longer_conflicting=conflicting_employment is None,
    )
missing_required_external_employee_id_resolver
MissingRequiredExternalEmployeeIdResolver

Bases: AdminErrorResolver

Resolver for missing_required_external_employee_id blocked movements.

apply_resolution
apply_resolution(blocked_movement, action, params=None)

Apply the resolution action to the blocked movement.

Supports 'cancel' and 'apply' actions. For 'apply', requires 'new_external_employee_id' in the params.

Source code in components/employment/public/business_logic/admin_resolvers/resolvers/missing_required_external_employee_id_resolver.py
@override
def apply_resolution(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
    action: AdminResolutionAction,
    params: dict[str, Any] | None = None,
) -> None:
    """
    Apply the resolution action to the blocked movement.

    Supports 'cancel' and 'apply' actions. For 'apply', requires
    'new_external_employee_id' in the params.
    """
    upstream_blocked_movement = mandatory_type(
        expected_type=UpstreamBlockedMovement,
        value=blocked_movement,
    )

    if not upstream_blocked_movement.company_id:
        raise ResolutionActionError(
            f"Blocked movement {upstream_blocked_movement.id} has no company_id"
        )

    if upstream_blocked_movement.error_code != self.error_code:
        raise ResolutionActionError(
            f"Blocked movement {upstream_blocked_movement.id} has error_code "
            f"{upstream_blocked_movement.error_code}, expected {self.error_code}"
        )

    if action == AdminResolutionAction.cancel:
        manually_cancel_upstream_blocked_movement(
            upstream_blocked_movement_id=upstream_blocked_movement.id,
            commit=True,
        )
    elif action == AdminResolutionAction.apply:
        upstream_data_record = upstream_blocked_movement.upstream_data["record"]

        country_code = get_country_code_from_blocked_movement(
            upstream_blocked_movement
        )
        country_gateway = get_country_gateway(country_code)

        new_external_employee_id, error_info = (
            country_gateway.extract_external_employee_id_from_data(
                mandatory(params),
                "new_external_employee_id",
                int(upstream_blocked_movement.company_id),
            )
        )

        if not new_external_employee_id:
            raise ResolutionActionError(
                "Missing new external employee id", error_info
            )

        start_date, error_info = extract_mandatory_start_date_from_data(
            data=upstream_data_record,
            field_name="start_date",
        )

        if not start_date:
            raise ResolutionActionError("Missing start date", error_info)

        conflicting_employments = (
            get_core_employments_for_external_employee_id_in_company_overlapping(
                external_employee_id=new_external_employee_id,
                company_id=upstream_blocked_movement.company_id,
                start_date=start_date,
                end_date=upstream_data_record.get("end_date"),
            )
        )

        if conflicting_employments:
            raise ExternalEmployeeIdConflictError(
                message=f"The external employee id '{new_external_employee_id}' is already used by another employee in this company",
                conflicting_employment_id=conflicting_employments[0].employment_id,
            )

        upstream_data_override = deepcopy(upstream_blocked_movement.upstream_data)
        upstream_data_override["record"] = {
            **upstream_data_override["record"],
            "external_id": new_external_employee_id,
        }
        upstream_data_override["record"]["external_id"] = new_external_employee_id

        retry_upstream_blocked_movement(
            upstream_blocked_movement_id=upstream_blocked_movement.id,
            upstream_data_override=upstream_data_override,
            commit=True,
        )
    else:
        assert_never(action)
can_resolve
can_resolve(blocked_movement)

Check if this resolver can handle the given blocked movement. In this case, we only handle upstream instances.

Source code in components/employment/public/business_logic/admin_resolvers/resolvers/missing_required_external_employee_id_resolver.py
@override
def can_resolve(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
) -> bool:
    """
    Check if this resolver can handle the given blocked movement.
    In this case, we only handle upstream instances.
    """
    if isinstance(blocked_movement, CoreBlockedMovement):
        return False

    return super().can_resolve(blocked_movement)
category property
category

Category for grouping in admin reporting alerts.

error_code property
error_code

The error code this resolver handles.

get_resolution_context
get_resolution_context(blocked_movement)

Get resolution data for the blocked movement.

Retrieves the blocked movement information to provide context for resolution.

Source code in components/employment/public/business_logic/admin_resolvers/resolvers/missing_required_external_employee_id_resolver.py
@override
def get_resolution_context(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
) -> AdminResolutionContext:
    """
    Get resolution data for the blocked movement.

    Retrieves the blocked movement information to provide context for resolution.
    """
    upstream_blocked_movement = mandatory_type(
        expected_type=UpstreamBlockedMovement,
        value=blocked_movement,
    )

    upstream_data_record = upstream_blocked_movement.upstream_data["record"]

    company_id = mandatory(
        upstream_blocked_movement.company_id, message="Company ID is required"
    )

    country_code = get_country_code_from_blocked_movement(upstream_blocked_movement)
    country_gateway = get_country_gateway(country_code)

    employee_full_name = _get_user_display_name_from_user_id_or_dict(
        upstream_data_record,
        upstream_blocked_movement.user_id,
        country_gateway,
    )

    company_info = country_gateway.get_company_information(company_id)
    company_display_name = company_info.display_name if company_info else ""

    employee_identifier = (
        country_gateway.get_employee_identifier_for_country(upstream_data_record)
        or ""
    )

    return create_admin_resolution_context(
        blocked_movement_id=upstream_blocked_movement.id,
        error_code=self.error_code,
        source_type=upstream_blocked_movement.source_type,
        employee_full_name=employee_full_name or "(Inconnu)",
        employee_email=upstream_data_record.get("email", ""),
        company_display_name=company_display_name,
        created_at=str(upstream_blocked_movement.created_at),
        employee_identifier=employee_identifier,
        company_id=company_id,
    )
stale_invitation_resolver
StaleInvitationResolutionParams

Bases: TypedDict

Parameters for resolving a stale invitation blocked movement.

All fields are optional. When params is None, the resolution will retry without any overrides.

Expected keys
  • start_date_override: str (ISO date string for new start date) or None
start_date_override instance-attribute
start_date_override
StaleInvitationResolver

Bases: AdminErrorResolver

Resolver for stale invitation blocked movements.

Stale invitations occur when an employee invitation has an effective start date that is more than 100 days in the past. Admins can resolve these by: - Canceling the invitation - Retrying with a corrected start date - Retrying with permission to skip the stale check

apply_resolution
apply_resolution(blocked_movement, action, params=None)

Apply the admin's resolution choice to the stale invitation.

Parameters:

Name Type Description Default
blocked_movement CoreBlockedMovement | UpstreamBlockedMovement

The blocked movement to resolve

required
action AdminResolutionAction

The action chosen by the admin (cancel or apply)

required
params dict[str, Any] | None

Optional parameters for the action: - start_date_override (str, optional): ISO date string for new start date

None

Raises:

Type Description
ResolutionActionError

If the action is invalid or the resolution cannot be applied

Source code in components/employment/public/business_logic/admin_resolvers/resolvers/stale_invitation_resolver.py
@override
def apply_resolution(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
    action: AdminResolutionAction,
    params: dict[str, Any] | None = None,
) -> None:
    """
    Apply the admin's resolution choice to the stale invitation.

    Args:
        blocked_movement: The blocked movement to resolve
        action: The action chosen by the admin (cancel or apply)
        params: Optional parameters for the action:
            - start_date_override (str, optional): ISO date string for new start date

    Raises:
        ResolutionActionError: If the action is invalid or the resolution cannot be applied
    """
    core_blocked_movement = mandatory_type(
        expected_type=CoreBlockedMovement,
        value=blocked_movement,
    )

    self.validate_params(action, params)

    if core_blocked_movement.error_code != self.error_code:
        raise ResolutionActionError(
            f"Blocked movement {core_blocked_movement.id} has error_code "
            f"{core_blocked_movement.error_code}, expected {self.error_code}"
        )

    if action == AdminResolutionAction.cancel:
        update_core_blocked_movement_status(
            core_blocked_movement_id=core_blocked_movement.id,
            new_status=BlockedMovementStatus.cancelled,
            commit=True,
        )
    elif action == AdminResolutionAction.apply:
        # Build resolution parameters
        # retry without overrides by default, if params provided, use them for overrides
        # Always skip stale check for future retries since admin explicitly resolved this
        start_date_override: str | None = None
        if params:
            typed_params = cast("StaleInvitationResolutionParams", params)
            start_date_override = typed_params.get("start_date_override")

        resolution_parameters = StaleInvitationResolutionParameters(
            blocked_movement_id=core_blocked_movement.id,
            cancel=False,
            can_have_target_start_date_more_than_100_days_in_the_past=True,
            start_date_override=start_date_override,
        )

        retry_stale_invitation(
            resolution_parameters=resolution_parameters,
            commit=True,
        )
    else:
        assert_never(action)
can_resolve
can_resolve(blocked_movement)

Check if this resolver can handle the given blocked movement.

Only core blocked movements can be resolved (stale invitations are always core).

Source code in components/employment/public/business_logic/admin_resolvers/resolvers/stale_invitation_resolver.py
@override
def can_resolve(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
) -> bool:
    """
    Check if this resolver can handle the given blocked movement.

    Only core blocked movements can be resolved (stale invitations are always core).
    """
    if isinstance(blocked_movement, UpstreamBlockedMovement):
        return False

    return super().can_resolve(blocked_movement)
category property
category

Category for grouping in admin reporting alerts.

error_code property
error_code

The error code this resolver handles.

get_resolution_context
get_resolution_context(blocked_movement)

Return resolution data for a stale invitation blocked movement.

Extracts employee information and defines display fields for the admin.

Source code in components/employment/public/business_logic/admin_resolvers/resolvers/stale_invitation_resolver.py
@override
def get_resolution_context(
    self,
    blocked_movement: CoreBlockedMovement | UpstreamBlockedMovement,
) -> AdminResolutionContext:
    """
    Return resolution data for a stale invitation blocked movement.

    Extracts employee information and defines display fields for the admin.
    """
    core_blocked_movement: CoreBlockedMovement = mandatory_type(
        expected_type=CoreBlockedMovement,
        value=blocked_movement,
    )

    core_blocked_movement_context = mandatory(core_blocked_movement.context)

    employment_declaration_dict = core_blocked_movement_context.get(
        "employment_declaration"
    )
    if not employment_declaration_dict:
        raise ResolutionActionError(
            f"Blocked movement {core_blocked_movement.id} missing employment_declaration in context"
        )

    employment_declaration: EmploymentDeclaration[Any] = (
        EmploymentDeclaration.from_dict(employment_declaration_dict)
    )

    effective_start_date, is_valid = cast_to_date_with_validation(
        employment_declaration.start_date
    )

    if not is_valid or effective_start_date is None:
        raise ResolutionActionError(
            f"Blocked movement {core_blocked_movement.id} has invalid start_date: "
            f"{employment_declaration.start_date}"
        )

    country_code = get_country_code_from_blocked_movement(core_blocked_movement)
    country_gateway = get_country_gateway(country_code)

    employee_full_name: str = (
        country_gateway.get_user_full_name(core_blocked_movement.user_id) or ""
    )

    externed_information = one(
        [
            update
            for update in employment_declaration.extended_informations
            if update.validity_period is None
        ]
    )

    employee_email = (
        country_gateway.get_employee_email_from_extended_values(
            externed_information.values
        )
        or ""
    )

    employee_identifier = (
        country_gateway.get_employee_identifier_for_country(
            externed_information.values
        )
        or ""
    )

    company_info = country_gateway.get_company_information(
        core_blocked_movement.company_id
    )

    company_display_name = company_info.display_name if company_info else ""

    return create_admin_resolution_context(
        blocked_movement_id=core_blocked_movement.id,
        error_code=self.error_code,
        source_type=core_blocked_movement.source_type,
        employee_full_name=employee_full_name,
        employee_email=employee_email,
        company_display_name=company_display_name,
        created_at=str(core_blocked_movement.created_at),
        error_message=core_blocked_movement.error_message,
        employee_identifier=employee_identifier,
        effective_start_date=effective_start_date.isoformat(),
        start_date=str(employment_declaration.start_date),
    )
validate_params
validate_params(action, params)

Validate resolution parameters for stale invitation.

Parameters:

Name Type Description Default
action AdminResolutionAction

The action chosen by the admin

required
params dict[str, Any] | None

Optional parameters for the action

required

Raises:

Type Description
ResolutionActionError

If params contain invalid keys or values

Source code in components/employment/public/business_logic/admin_resolvers/resolvers/stale_invitation_resolver.py
@override
def validate_params(
    self,
    action: AdminResolutionAction,
    params: dict[str, Any] | None,
) -> None:
    """
    Validate resolution parameters for stale invitation.

    Args:
        action: The action chosen by the admin
        params: Optional parameters for the action

    Raises:
        ResolutionActionError: If params contain invalid keys or values
    """
    if params is None:
        return

    # Validate start_date_override if provided
    start_date_override = params.get("start_date_override")
    if start_date_override is not None:
        if not isinstance(start_date_override, str):
            raise ResolutionActionError(
                f"start_date_override must be a string, got {type(start_date_override).__name__}"
            )
        # Validate ISO date format
        try:
            date.fromisoformat(start_date_override)
        except ValueError as e:
            raise ResolutionActionError(
                f"start_date_override must be a valid ISO date (YYYY-MM-DD): {e}"
            ) from e

queries

admin_blocked_movements

MAX_DISPLAYED_RESOLUTIONS module-attribute
MAX_DISPLAYED_RESOLUTIONS = 150
get_admin_resolvable_blocked_movements
get_admin_resolvable_blocked_movements(
    company_ids, country_code
)

Get blocked movements grouped by action type for admin resolution dashboard.

Parameters:

Name Type Description Default
company_ids list[str]

Company IDs to filter blocked movements for.

required
country_code CountryCode

The country from which the resolvable BM list is requested.

required

Returns:

Type Description
AdminResolutionsResponse

AdminResolutionsResponse with action_required (awaiting_user_response_self_healing)

AdminResolutionsResponse

and under_investigation (pending) lists.

Source code in components/employment/public/business_logic/queries/admin_blocked_movements.py
def get_admin_resolvable_blocked_movements(
    company_ids: list[str],
    country_code: CountryCode,
) -> AdminResolutionsResponse:
    """
    Get blocked movements grouped by action type for admin resolution dashboard.

    Args:
        company_ids: Company IDs to filter blocked movements for.
        country_code: The country from which the resolvable BM list is requested.

    Returns:
        AdminResolutionsResponse with action_required (awaiting_user_response_self_healing)
        and under_investigation (pending) lists.
    """
    company_ids_set = set(company_ids)
    if not company_ids_set:
        return AdminResolutionsResponse(action_required=[], under_investigation=[])

    gateway = get_country_gateway(country_code)
    resolvable_error_codes = set(get_all_resolvable_error_codes(country_code))
    under_investigation_error_codes = set(
        gateway.get_all_under_investigation_error_codes()
    )

    if not resolvable_error_codes and not under_investigation_error_codes:
        return AdminResolutionsResponse(action_required=[], under_investigation=[])

    core_action_required_query: Query[CoreBlockedMovementModel] | None = None
    upstream_action_required_query: Query[UpstreamBlockedMovementModel] | None = None
    if resolvable_error_codes:
        core_action_required_query, upstream_action_required_query = (
            build_admin_retrievable_blocked_movements_queries(
                company_ids=company_ids_set,
                error_codes=resolvable_error_codes,
                status=BlockedMovementStatus.awaiting_user_response_self_healing,
            )
        )

    core_under_investigation_query: Query[CoreBlockedMovementModel] | None = None
    upstream_under_investigation_query: Query[UpstreamBlockedMovementModel] | None = (
        None
    )
    if under_investigation_error_codes:
        core_under_investigation_query, upstream_under_investigation_query = (
            build_admin_retrievable_blocked_movements_queries(
                company_ids=company_ids_set,
                error_codes=under_investigation_error_codes,
                status=BlockedMovementStatus.pending,
            )
        )

    action_required_resolutions = _fetch_and_build_tab(
        core_query=core_action_required_query,
        upstream_query=upstream_action_required_query,
        builder=_build_resolvable_blocked_movement_context,
        tab_label="action_required",
        company_ids_set=company_ids_set,
    )

    under_investigation_resolutions = _fetch_and_build_tab(
        core_query=core_under_investigation_query,
        upstream_query=upstream_under_investigation_query,
        builder=_build_under_investigation_blocked_movement_context,
        tab_label="under_investigation",
        company_ids_set=company_ids_set,
    )

    return AdminResolutionsResponse(
        action_required=action_required_resolutions,
        under_investigation=under_investigation_resolutions,
    )
get_admin_resolvable_blocked_movements_stats
get_admin_resolvable_blocked_movements_stats(
    company_ids, country_code, error_codes=None
)

Get statistics about pending admin resolutions grouped by category.

Parameters:

Name Type Description Default
company_ids list[str]

Company IDs to filter blocked movements for

required
country_code CountryCode

The country from which the resolvable BM list is requested

required
error_codes list[str] | None

Optional list of error codes to filter by. If None, uses all resolvable error codes.

None

Returns:

Type Description
AdminResolutionsStats

AdminResolutionsStats, containing statistics grouped by category.

Source code in components/employment/public/business_logic/queries/admin_blocked_movements.py
def get_admin_resolvable_blocked_movements_stats(
    company_ids: list[str],
    country_code: CountryCode,
    error_codes: list[str] | None = None,
) -> AdminResolutionsStats:
    """
    Get statistics about pending admin resolutions grouped by category.

    Args:
        company_ids: Company IDs to filter blocked movements for
        country_code: The country from which the resolvable BM list is requested
        error_codes: Optional list of error codes to filter by. If None, uses all resolvable error codes.

    Returns:
        AdminResolutionsStats, containing statistics grouped by category.
    """
    from components.employment.internal.business_logic.queries.blocked_movements import (
        get_admin_resolvable_blocked_movements_stats,
    )

    return get_admin_resolvable_blocked_movements_stats(
        company_ids=company_ids,
        country_code=country_code,
        error_codes=error_codes,
    )
get_company_ids_with_resolvable_errors
get_company_ids_with_resolvable_errors(country_code)

Return company_ids that have at least one admin-resolvable blocked movement.

Parameters:

Name Type Description Default
country_code CountryCode

Country to get resolvable error codes for.

required

Returns:

Type Description
set[str]

Set of company_id strings with resolvable errors.

Source code in components/employment/public/business_logic/queries/admin_blocked_movements.py
def get_company_ids_with_resolvable_errors(
    country_code: CountryCode,
) -> set[str]:
    """Return company_ids that have at least one admin-resolvable blocked movement.

    Args:
        country_code: Country to get resolvable error codes for.

    Returns:
        Set of company_id strings with resolvable errors.
    """
    resolvable_codes = set(get_all_resolvable_error_codes(country_code))
    if not resolvable_codes:
        return set()

    core_query, upstream_query = build_admin_retrievable_blocked_movements_queries(
        error_codes=resolvable_codes,
        status=BlockedMovementStatus.awaiting_user_response_self_healing,
    )
    core_ids = core_query.with_entities(CoreBlockedMovementModel.company_id)
    upstream_ids = upstream_query.with_entities(UpstreamBlockedMovementModel.company_id)
    return {row[0] for row in core_ids.union(upstream_ids).all()}
get_resolvable_errors_count_by_category
get_resolvable_errors_count_by_category(
    company_ids, country_code
)

Return count of resolvable blocked movements grouped by reporting category.

Lightweight SQL COUNT alternative to get_admin_resolvable_blocked_movements_stats for cases where only category totals are needed (e.g., digest emails).

Parameters:

Name Type Description Default
company_ids list[str]

Company IDs to scope the query to.

required
country_code CountryCode

Country to get resolvable error codes for.

required

Returns:

Type Description
dict[AdminReportingCategory, int]

Mapping of reporting category to blocked movement count.

dict[AdminReportingCategory, int]

Categories with zero matches are omitted.

Source code in components/employment/public/business_logic/queries/admin_blocked_movements.py
def get_resolvable_errors_count_by_category(
    company_ids: list[str],
    country_code: CountryCode,
) -> dict[AdminReportingCategory, int]:
    """Return count of resolvable blocked movements grouped by reporting category.

    Lightweight SQL COUNT alternative to get_admin_resolvable_blocked_movements_stats
    for cases where only category totals are needed (e.g., digest emails).

    Args:
        company_ids: Company IDs to scope the query to.
        country_code: Country to get resolvable error codes for.

    Returns:
        Mapping of reporting category to blocked movement count.
        Categories with zero matches are omitted.
    """
    from sqlalchemy import func

    resolver_classes = get_admin_error_resolver_classes(country_code)
    if not resolver_classes:
        return {}

    error_codes = set(resolver_classes.keys())
    code_to_category = {code: cls().category for code, cls in resolver_classes.items()}

    core_query, upstream_query = build_admin_retrievable_blocked_movements_queries(
        error_codes=error_codes,
        company_ids=set(company_ids),
        status=BlockedMovementStatus.awaiting_user_response_self_healing,
    )

    counts: dict[AdminReportingCategory, int] = {}
    for query, model_cls in (
        (core_query, CoreBlockedMovementModel),
        (upstream_query, UpstreamBlockedMovementModel),
    ):
        for error_code, count in (
            query.with_entities(model_cls.error_code, func.count())
            .group_by(model_cls.error_code)
            .all()
        ):
            category = code_to_category.get(error_code)
            if category is not None:
                counts[category] = counts.get(category, 0) + count

    return counts

blocked_movement

get_blocked_movement_ids_by_upstream_data_path_value
get_blocked_movement_ids_by_upstream_data_path_value(
    path, value
)

Get the upstream blocked movement ids for which upstream_data matches a given value at the specified path.

Parameters:

Name Type Description Default
path str

Dot-separated path to the nested key (e.g., "invite_email" or "record.email")

required
value str

The value to match

required
Source code in components/employment/public/business_logic/queries/blocked_movement.py
def get_blocked_movement_ids_by_upstream_data_path_value(
    path: str, value: str
) -> list[UUID]:
    """
    Get the upstream blocked movement ids for which upstream_data matches a given value at the specified path.

    Args:
        path: Dot-separated path to the nested key (e.g., "invite_email" or "record.email")
        value: The value to match
    """
    path_parts = path.split(".")

    jsonb_field: Any = UpstreamBlockedMovementModel.upstream_data
    for part in path_parts:
        jsonb_field = jsonb_field[part]

    return [
        blocked_movement_id
        for (blocked_movement_id,) in UpstreamBlockedMovementModelBroker.query()
        .with_entities(UpstreamBlockedMovementModel.id)
        .where(jsonb_field.astext == value)
        .all()
    ]
get_core_blocked_movement_ids_by_context_path_value
get_core_blocked_movement_ids_by_context_path_value(
    path, value
)

Get the core blocked movement ids for which context matches a given value at the specified path.

Parameters:

Name Type Description Default
path str

Dot-separated path to the nested key (e.g., "employment_declaration.extended_informations.0.values.invite_email")

required
value str

The value to match

required
Source code in components/employment/public/business_logic/queries/blocked_movement.py
def get_core_blocked_movement_ids_by_context_path_value(
    path: str, value: str
) -> list[UUID]:
    """
    Get the core blocked movement ids for which context matches a given value at the specified path.

    Args:
        path: Dot-separated path to the nested key
              (e.g., "employment_declaration.extended_informations.0.values.invite_email")
        value: The value to match
    """
    path_parts = path.split(".")

    jsonb_field: Any = CoreBlockedMovementModel.context
    for part in path_parts:
        jsonb_field = jsonb_field[int(part) if part.isdigit() else part]

    return [
        blocked_movement_id
        for (blocked_movement_id,) in BlockedMovementModelBroker.query()
        .with_entities(CoreBlockedMovementModel.id)
        .where(jsonb_field.astext == value)
        .all()
    ]
get_core_blocked_movement_or_none
get_core_blocked_movement_or_none(core_blocked_movement_id)

Get a core blocked movement by its ID.

Source code in components/employment/public/business_logic/queries/blocked_movement.py
def get_core_blocked_movement_or_none(
    core_blocked_movement_id: UUID,
) -> CoreBlockedMovement | None:
    """
    Get a core blocked movement by its ID.
    """
    core_blocked_movement_model = get_resource_or_none(
        CoreBlockedMovementModel, core_blocked_movement_id
    )

    return (
        CoreBlockedMovement.from_core_blocked_movement_model(
            core_blocked_movement_model
        )
        if core_blocked_movement_model is not None
        else None
    )
get_pending_blocked_movements_for_company_id
get_pending_blocked_movements_for_company_id(
    company_id, from_sources
)

Gets all pending blocked movements for the provided company ID. The intended usage for this function is to retrieve blocked movements for company admins (both displaying them and acting upon them).

This function specifically returns only 'pending' blocked movements (i.e. with the 'pending' status), but not all active blocked movements. This is intentional: all other statuses indicate that the blocked movement is currently being handled, either manually (e.g. 'awaiting_user_response' status) or by some automated process (e.g. 'pending_self_healing'), as it is assumed that we do not want to provide actionability to admins on these blocked movements.

Note: upstream blocked movements must have a proper company_id value set in order to be returned by this function.

The blocked movements are returned in an arbitrary order.

Source code in components/employment/public/business_logic/queries/blocked_movement.py
def get_pending_blocked_movements_for_company_id(
    company_id: str, from_sources: set[SourceType]
) -> list[CoreBlockedMovement | UpstreamBlockedMovement]:
    """
    Gets all pending blocked movements for the provided company ID. The intended usage for this function is to retrieve
    blocked movements for company admins (both displaying them and acting upon them).

    This function specifically returns only 'pending' blocked movements (i.e. with the 'pending' status), but not all
    active blocked movements. This is intentional: all other statuses indicate that the blocked movement is currently
    being handled, either manually (e.g. 'awaiting_user_response' status) or by some automated process (e.g.
    'pending_self_healing'), as it is assumed that we do not want to provide actionability to admins on these blocked
    movements.

    Note: upstream blocked movements must have a proper `company_id` value set in order to be returned by this function.

    The blocked movements are returned in an arbitrary order.
    """
    upstream_blocked_movement = [
        UpstreamBlockedMovement.from_upstream_blocked_movement_model(
            upstream_blocked_movement_model
        )
        for upstream_blocked_movement_model in UpstreamBlockedMovementModelBroker.query().where(
            UpstreamBlockedMovementModel.company_id == company_id,
            UpstreamBlockedMovementModel.source_type.in_(from_sources),
            UpstreamBlockedMovementModel._status == BlockedMovementStatus.pending,  # noqa: ALN027 # OK, see comment on _status
        )
    ]

    core_blocked_movement = [
        CoreBlockedMovement.from_core_blocked_movement_model(
            core_blocked_movement_model
        )
        for core_blocked_movement_model in BlockedMovementModelBroker.query().where(
            CoreBlockedMovementModel.company_id == company_id,
            CoreBlockedMovementModel.source_type.in_(from_sources),
            CoreBlockedMovementModel._status == BlockedMovementStatus.pending,  # noqa: ALN027 # OK, see comment on _status
        )
    ]

    return core_blocked_movement + upstream_blocked_movement
get_upstream_blocked_movement_or_none
get_upstream_blocked_movement_or_none(
    upstream_blocked_movement_id,
)

Get an upstream blocked movement by its ID.

Source code in components/employment/public/business_logic/queries/blocked_movement.py
def get_upstream_blocked_movement_or_none(
    upstream_blocked_movement_id: UUID,
) -> UpstreamBlockedMovement | None:
    """
    Get an upstream blocked movement by its ID.
    """
    upstream_blocked_movement_model = get_resource_or_none(
        UpstreamBlockedMovementModel, upstream_blocked_movement_id
    )

    return (
        UpstreamBlockedMovement.from_upstream_blocked_movement_model(
            upstream_blocked_movement_model
        )
        if upstream_blocked_movement_model is not None
        else None
    )
get_upstream_or_core_blocked_movement_or_none
get_upstream_or_core_blocked_movement_or_none(
    blocked_movement_id,
)

Get an upstream or core blocked movement by its ID.

Source code in components/employment/public/business_logic/queries/blocked_movement.py
def get_upstream_or_core_blocked_movement_or_none(
    blocked_movement_id: UUID,
) -> UpstreamBlockedMovement | CoreBlockedMovement | None:
    """
    Get an upstream or core blocked movement by its ID.
    """
    # Upstreams have prio but there's no rationale behind it - UUID collisions are so rare that this doesn't matter.
    upstream_blocked_movement = get_upstream_blocked_movement_or_none(
        blocked_movement_id
    )
    return (
        upstream_blocked_movement
        if upstream_blocked_movement is not None
        else get_core_blocked_movement_or_none(blocked_movement_id)
    )

core_employment_version

get_all_latest_core_employment_versions_on
get_all_latest_core_employment_versions_on(
    user_ids, company_id, on_date
)

Like get_latest_core_employment_version_on, but supports multiple users.

Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_all_latest_core_employment_versions_on(
    user_ids: set[UserId], company_id: CompanyId | None, on_date: date
) -> list[CoreEmploymentVersion]:
    """
    Like get_latest_core_employment_version_on, but supports multiple users.
    """
    return get_all_latest_core_employment_versions_overlapping(
        user_ids, company_id, on_date, on_date
    )
get_all_latest_core_employment_versions_overlapping
get_all_latest_core_employment_versions_overlapping(
    user_ids, company_id, start_date, end_date
)

Like get_latest_core_employment_versions_overlapping, but for multiple users.

Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_all_latest_core_employment_versions_overlapping(
    user_ids: set[UserId],
    company_id: CompanyId | None,
    start_date: date,
    end_date: date | None,
) -> list[CoreEmploymentVersion]:
    """
    Like get_latest_core_employment_versions_overlapping, but for multiple users.
    """
    core_employment_versions_query = (
        CoreEmploymentVersionModelBroker.query_latest_versions()
        .filter(
            CoreEmploymentVersionModel.user_id.in_(user_ids),
            CoreEmploymentVersionModel.is_ever_active_between(start_date, end_date),
        )
        .order_by(CoreEmploymentVersionModel.start_date)
    )

    if company_id:
        core_employment_versions_query = core_employment_versions_query.filter(
            CoreEmploymentVersionModel.company_id == company_id,
        )

    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version
        )
        for core_employment_version in core_employment_versions_query
    ]
get_core_employments_for_company
get_core_employments_for_company(
    company_id,
    exclude_cancelled=False,
    exclude_terminated=False,
)

Return the core employments for all employees of a company, using their latest version.

Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_core_employments_for_company(
    company_id: CompanyId,
    exclude_cancelled: bool = False,
    exclude_terminated: bool = False,
) -> Iterable[CoreEmploymentVersion]:
    """
    Return the core employments for all employees of a company, using their latest version.
    """
    query = CoreEmploymentVersionModelBroker.query_latest_versions().filter(
        CoreEmploymentVersionModel.company_id == company_id
    )

    if exclude_cancelled:
        query = query.filter(CoreEmploymentVersionModel.is_cancelled.is_(False))

    if exclude_terminated:
        query = query.filter(CoreEmploymentVersionModel.is_ended.is_(False))

    query = query.order_by(CoreEmploymentVersionModel.start_date)

    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version_model
        )
        for core_employment_version_model in query
    ]
get_core_employments_for_external_employee_id_in_company_overlapping
get_core_employments_for_external_employee_id_in_company_overlapping(
    external_employee_id, company_id, start_date, end_date
)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_core_employments_for_external_employee_id_in_company_overlapping(  # noqa: D103
    external_employee_id: str,
    company_id: CompanyId,
    start_date: date,
    end_date: date | None,
) -> list[CoreEmploymentVersion]:
    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version_model
        )
        for core_employment_version_model in CoreEmploymentVersionModelBroker.query_latest_versions()
        .filter(
            CoreEmploymentVersionModel.external_employee_id == external_employee_id,
            CoreEmploymentVersionModel.company_id == company_id,
            CoreEmploymentVersionModel.is_ever_active_between(start_date, end_date),
        )
        .order_by(CoreEmploymentVersionModel.start_date)
    ]
get_core_employments_for_user
get_core_employments_for_user(user_id)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_core_employments_for_user(user_id: UserId) -> list[CoreEmploymentVersion]:  # noqa: D103
    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version_model
        )
        for core_employment_version_model in CoreEmploymentVersionModelBroker.query_latest_versions()
        .filter(
            CoreEmploymentVersionModel.user_id == user_id,
        )
        .order_by(CoreEmploymentVersionModel.start_date)
    ]
get_core_employments_for_user_company
get_core_employments_for_user_company(user_id, company_id)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_core_employments_for_user_company(  # noqa: D103
    user_id: UserId, company_id: CompanyId
) -> list[CoreEmploymentVersion]:
    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version_model
        )
        for core_employment_version_model in CoreEmploymentVersionModelBroker.query_latest_versions()
        .filter(
            CoreEmploymentVersionModel.user_id == user_id,
            CoreEmploymentVersionModel.company_id == company_id,
        )
        .order_by(CoreEmploymentVersionModel.start_date)
    ]
get_latest_cancelled_core_employment_versions_overlapping
get_latest_cancelled_core_employment_versions_overlapping(
    user_id, company_id, start_date, end_date
)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_latest_cancelled_core_employment_versions_overlapping(  # noqa: D103
    user_id: UserId,
    company_id: CompanyId | None,
    start_date: date,
    end_date: date | None,
) -> list[CoreEmploymentVersion]:
    core_employment_versions_query = (
        CoreEmploymentVersionModelBroker.query_latest_versions()
        .filter(
            CoreEmploymentVersionModel.user_id == user_id,
            CoreEmploymentVersionModel.is_cancelled.is_(True),
            or_(
                CoreEmploymentVersionModel.end_date.is_(None),
                CoreEmploymentVersionModel.end_date >= start_date,
            ),
        )
        .order_by(CoreEmploymentVersionModel.start_date)
    )

    if end_date:
        core_employment_versions_query = core_employment_versions_query.filter(
            CoreEmploymentVersionModel.start_date <= end_date,
        )

    if company_id:
        core_employment_versions_query = core_employment_versions_query.filter(
            CoreEmploymentVersionModel.company_id == company_id,
        )

    return [
        CoreEmploymentVersion.from_core_employment_version_model(
            core_employment_version
        )
        for core_employment_version in core_employment_versions_query
    ]
get_latest_core_employment_version_on
get_latest_core_employment_version_on(
    user_id, company_id, on_date
)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_latest_core_employment_version_on(  # noqa: D103
    user_id: UserId, company_id: CompanyId | None, on_date: date
) -> CoreEmploymentVersion | None:
    return one_or_none(
        get_latest_core_employment_versions_overlapping(
            user_id, company_id, on_date, on_date
        )
    )
get_latest_core_employment_versions_overlapping
get_latest_core_employment_versions_overlapping(
    user_id, company_id, start_date, end_date
)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_latest_core_employment_versions_overlapping(  # noqa: D103
    user_id: UserId,
    company_id: CompanyId | None,
    start_date: date,
    end_date: date | None,
) -> list[CoreEmploymentVersion]:
    return get_all_latest_core_employment_versions_overlapping(
        {user_id},
        company_id,
        start_date,
        end_date,
    )
get_latest_version_for_employment_or_raise
get_latest_version_for_employment_or_raise(employment_id)
Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_latest_version_for_employment_or_raise(  # noqa: D103
    employment_id: UUID,
) -> CoreEmploymentVersion:
    latest_version_for_employment = (
        CoreEmploymentVersionModelBroker.get_latest_version_for_employment(
            employment_id
        )
    )
    if latest_version_for_employment is None:
        raise EmploymentNotFound(
            f"No employment found for employment_id {employment_id}"
        )

    return latest_version_for_employment
get_or_raise_latest_core_employment_version_on
get_or_raise_latest_core_employment_version_on(
    user_id, company_id, on_date
)

Get the latest core employment version for a user and company on a given date.

Raises BaseErrorCode.missing_resource if no employment is found.

Source code in components/employment/public/business_logic/queries/core_employment_version.py
def get_or_raise_latest_core_employment_version_on(
    user_id: UUID,
    company_id: UUID,
    on_date: date,
) -> CoreEmploymentVersion:
    """
    Get the latest core employment version for a user and company on a given date.

    Raises BaseErrorCode.missing_resource if no employment is found.
    """
    from shared.errors.error_code import BaseErrorCode

    employment = get_latest_core_employment_version_on(
        user_id=str(user_id),
        company_id=str(company_id),
        on_date=on_date,
    )

    if employment is None:
        raise BaseErrorCode.missing_resource(
            message=f"Employment not found for user_id=[{user_id}] and company_id=[{company_id}] on date {on_date}"
        )

    return employment

country_helpers

fr
get_all_employment_source_data_for_bulk_action_report
get_all_employment_source_data_for_bulk_action_report(
    min_created_at,
)

FR-specific DB helper: retrieves all Employment Source Data objects that we need to send bulk action reports.

Source code in components/employment/public/business_logic/queries/country_helpers/fr.py
def get_all_employment_source_data_for_bulk_action_report(
    min_created_at: datetime,
) -> list[EmploymentSourceData]:
    """
    FR-specific DB helper: retrieves all Employment Source Data objects that we need to send bulk action reports.
    """
    query = (
        current_session.query(EmploymentSourceDataModel)  # noqa: ALN085
        .options(
            joinedload(EmploymentSourceDataModel.core_employment_versions),
            joinedload(EmploymentSourceDataModel.extended_employment_updates),
            joinedload(EmploymentSourceDataModel.core_blocked_movements),
            joinedload(EmploymentSourceDataModel.upstream_blocked_movements),
        )
        .filter(
            EmploymentSourceDataModel.source_type.in_(
                [
                    SourceType.fr_bulk_invite,
                    SourceType.fr_bulk_removal,
                    SourceType.fr_bulk_cancel,
                ]
            ),
            EmploymentSourceDataModel.created_at > min_created_at,
            EmploymentSourceDataModel.source_metadata.contains(
                {"reported_to_admin_at": None}
            ),
            EmploymentSourceDataModel.source_metadata.has_key("bulk_invite_file_id"),
            not_(
                EmploymentSourceDataModel.source_metadata.contains(
                    {"bulk_invite_file_id": None}
                )
            ),
        )
        .outerjoin(EmploymentSourceDataModel.upstream_blocked_movements)
        .order_by(EmploymentSourceDataModel.created_at)
    )
    return [
        EmploymentSourceData.from_employment_source_data_model(
            employment_source_data_model
        )
        for employment_source_data_model in query
    ]
get_blocked_movements_for_dsn_removal_suggestions
get_blocked_movements_for_dsn_removal_suggestions(
    company_ids=None,
    status=BlockedMovementStatus.pending_self_healing,
    created_before=None,
    keep_duplicates_for_same_company_user=False,
)

Returns blocked movements for DSN removal suggestions.

By default, returns one movement per (user_id, company_id) pair, the one with the latest end date. Set keep_duplicates_for_same_company_user=True to return all matching movements, which is useful for bulk cancellation where every stale movement must be processed.

Parameters:

Name Type Description Default
company_ids Optional[set[str]]

Filter by company IDs. If None, returns movements for all companies.

None
status BlockedMovementStatus

The status to filter by. Use pending_self_healing to get new suggestions to send, and awaiting_user_response_self_healing to get pending suggestions that have already been sent.

pending_self_healing
created_before datetime | None

If set, return only movements created before this datetime.

None
keep_duplicates_for_same_company_user bool

If False (default), return one movement per (user_id, company_id) pair, ordered by end date descending. If True, return all matching movements.

False
Source code in components/employment/public/business_logic/queries/country_helpers/fr.py
def get_blocked_movements_for_dsn_removal_suggestions(
    company_ids: Optional[set[str]] = None,
    status: BlockedMovementStatus = BlockedMovementStatus.pending_self_healing,
    created_before: datetime | None = None,
    keep_duplicates_for_same_company_user: bool = False,
) -> list[CoreBlockedMovement]:
    """
    Returns blocked movements for DSN removal suggestions.

    By default, returns one movement per (user_id, company_id) pair, the one with the
    latest end date. Set `keep_duplicates_for_same_company_user=True` to return all
    matching movements, which is useful for bulk cancellation where every stale movement
    must be processed.

    Args:
        company_ids: Filter by company IDs. If None, returns movements for all companies.
        status: The status to filter by. Use `pending_self_healing` to get new suggestions
            to send, and `awaiting_user_response_self_healing` to get pending suggestions
            that have already been sent.
        created_before: If set, return only movements created before this datetime.
        keep_duplicates_for_same_company_user: If False (default), return one movement per
            (user_id, company_id) pair, ordered by end date descending. If True, return all
            matching movements.
    """
    query = current_session.query(CoreBlockedMovementModel).filter(  # noqa: ALN085
        CoreBlockedMovementModel.source_type == SourceType.fr_dsn_removal_suggestion,
        CoreBlockedMovementModel.status  # hybrid property
        == status,
        CoreBlockedMovementModel.context["employment_declaration"]["end_date"].isnot(
            None
        ),
    )

    if keep_duplicates_for_same_company_user:
        query = query.order_by(CoreBlockedMovementModel.created_at.desc())
    else:
        query = query.distinct(
            CoreBlockedMovementModel.user_id, CoreBlockedMovementModel.company_id
        ).order_by(
            CoreBlockedMovementModel.user_id,
            CoreBlockedMovementModel.company_id,
            cast(
                CoreBlockedMovementModel.context["employment_declaration"][
                    "end_date"
                ].astext,
                Date,
            ).desc(),
            CoreBlockedMovementModel.created_at.desc(),
        )

    if company_ids:
        query = query.filter(CoreBlockedMovementModel.company_id.in_(company_ids))

    if created_before:
        query = query.filter(CoreBlockedMovementModel.created_at < created_before)

    return [CoreBlockedMovement.from_core_blocked_movement_model(bm) for bm in query]
get_dsn_removal_suggestion_blocked_movements_to_cancel
get_dsn_removal_suggestion_blocked_movements_to_cancel(
    user_id, company_id
)

When getting the blocked movements for DSN removal suggestions, we only want to keep the latest one. (This is done in the get_blocked_movements_for_dsn_removal_suggestions function above.) However, we need to cancel ALL the suggestion blocked movements, because we don't want to leave them in a pending_self_healing state.

Source code in components/employment/public/business_logic/queries/country_helpers/fr.py
def get_dsn_removal_suggestion_blocked_movements_to_cancel(
    user_id: str, company_id: str
) -> list[CoreBlockedMovement]:
    """
    When getting the blocked movements for DSN removal suggestions, we only want to keep the latest one.
    (This is done in the get_blocked_movements_for_dsn_removal_suggestions function above.)
    However, we need to cancel ALL the suggestion blocked movements,
    because we don't want to leave them in a pending_self_healing state.
    """
    return [
        CoreBlockedMovement.from_core_blocked_movement_model(bm)
        for bm in current_session.query(CoreBlockedMovementModel).filter(  # noqa: ALN085
            CoreBlockedMovementModel.user_id == user_id,
            CoreBlockedMovementModel.company_id == company_id,
            CoreBlockedMovementModel.source_type
            == SourceType.fr_dsn_removal_suggestion,
            CoreBlockedMovementModel.status  # hybrid property
            == BlockedMovementStatus.pending_self_healing,
        )
    ]
get_employment_source_data_extra_fields_for_company
get_employment_source_data_extra_fields_for_company(
    company_id,
    keys,
    on_date,
    raw_data_additional_filters=None,
)

Get values for the specified keys from EmploymentSourceData.raw_data["extra"] for all active employees in company.

This has been built for Finance integration "Flux Retour"

Parameters:

Name Type Description Default
company_id str

Company ID to filter by

required
keys list[str]

List of keys to extract from raw_data["extra"]

required
on_date date

Date to check for active employments

required
raw_data_additional_filters dict[str, Any] | None

Additional filters to apply on raw_data

None

Returns:

Type Description
dict[str, dict[str, Any]]

Dictionary mapping user_id to a dictionary of key-value pairs from raw_data["extra"].

dict[str, dict[str, Any]]

Only keys that exist in the data are included.

dict[str, dict[str, Any]]

If the keys are present in several EmploymentSourceData entries for the same user,

dict[str, dict[str, Any]]

the returned values are the initial ones (first received).

Source code in components/employment/public/business_logic/queries/country_helpers/fr.py
def get_employment_source_data_extra_fields_for_company(
    company_id: str,
    keys: list[str],
    on_date: date,
    raw_data_additional_filters: dict[str, Any] | None = None,
) -> dict[str, dict[str, Any]]:
    """
    Get values for the specified keys from EmploymentSourceData.raw_data["extra"]
    for all active employees in company.

    This has been built for Finance integration "Flux Retour"

    Args:
        company_id: Company ID to filter by
        keys: List of keys to extract from raw_data["extra"]
        on_date: Date to check for active employments
        raw_data_additional_filters: Additional filters to apply on raw_data

    Returns:
        Dictionary mapping user_id to a dictionary of key-value pairs from raw_data["extra"].
        Only keys that exist in the data are included.
        If the keys are present in several EmploymentSourceData entries for the same user,
        the returned values are the initial ones (first received).
    """
    from components.employment.internal.models.core_employment_version import (
        CoreEmploymentVersionModel,
    )
    from components.employment.internal.models.latest_core_employment_version import (
        LatestCoreEmploymentVersion,
    )

    # Create CTE for non-cancelled active employment IDs
    # Note: Start date condition removed as all test employments start in the future
    non_cancelled_active_employments_cte = (
        select(CoreEmploymentVersionModel.employment_id)
        .join(
            LatestCoreEmploymentVersion,
            LatestCoreEmploymentVersion.core_employment_version_id
            == CoreEmploymentVersionModel.id,
        )
        .filter(
            CoreEmploymentVersionModel.company_id == company_id,
            CoreEmploymentVersionModel.is_cancelled.is_(False),
            (CoreEmploymentVersionModel.end_date.is_(None))
            | (CoreEmploymentVersionModel.end_date >= on_date),
        )
    ).cte("non_cancelled_active_employments")

    raw_data_filter = EmploymentSourceDataModel.raw_data.has_key("extra")
    if raw_data_additional_filters:
        raw_data_filter = and_(
            raw_data_filter,
            *[
                EmploymentSourceDataModel.raw_data[key].astext == value
                for key, value in raw_data_additional_filters.items()
            ],
        )

    # Get EmploymentSourceData with associated user_id using CTE join
    employment_data_with_users = current_session.execute(
        select(
            CoreEmploymentVersionModel.user_id,
            EmploymentSourceDataModel.raw_data,
            EmploymentSourceDataModel.created_at,
        )
        .select_from(EmploymentSourceDataModel)
        .join(
            CoreEmploymentVersionModel,
            CoreEmploymentVersionModel.employment_source_data_id
            == EmploymentSourceDataModel.id,
        )
        .join(
            non_cancelled_active_employments_cte,
            non_cancelled_active_employments_cte.c.employment_id
            == CoreEmploymentVersionModel.employment_id,
        )
        .filter(raw_data_filter)
        .order_by(
            CoreEmploymentVersionModel.user_id,
            EmploymentSourceDataModel.created_at.desc(),
        )
    ).all()

    results = {}

    for user_id, raw_data, _ in employment_data_with_users:
        # Skip if we already have data for this user (we ordered by created_at desc, so first is the most recent one)
        if user_id in results:
            continue

        extra_data = raw_data["extra"]
        user_result = {}

        for key in keys:
            if key in extra_data:
                user_result[key] = extra_data[key]

        # Only include users that have at least one of the requested keys
        if user_result:
            results[user_id] = user_result

    return results

employment_source_data

get_employment_metadata_value
get_employment_metadata_value(
    employment_source_data_id, key
)

Get the value of the provided metadata key for the employment source data with the provided ID.

Source code in components/employment/public/business_logic/queries/employment_source_data.py
def get_employment_metadata_value(employment_source_data_id: UUID, key: str) -> Any:
    """
    Get the value of the provided metadata key for the employment source data with the provided ID.
    """
    employment_source_data = get_or_raise_missing_resource(
        EmploymentSourceDataModel, employment_source_data_id
    )
    return employment_source_data.source_metadata.get(key, None)
get_employment_source_data_from_id
get_employment_source_data_from_id(
    employment_source_data_id,
)

Get an EmploymentSourceData object from its ID.

Source code in components/employment/public/business_logic/queries/employment_source_data.py
def get_employment_source_data_from_id(
    employment_source_data_id: UUID,
) -> EmploymentSourceData:
    """
    Get an `EmploymentSourceData` object from its ID.
    """
    employment_source_data_model = get_or_raise_missing_resource(
        EmploymentSourceDataModel, employment_source_data_id
    )
    return EmploymentSourceData.from_employment_source_data_model(
        employment_source_data_model
    )
get_employment_source_data_from_workflow_id
get_employment_source_data_from_workflow_id(workflow_id)

Get all EmploymentSourceData objects from a workflow.

Source code in components/employment/public/business_logic/queries/employment_source_data.py
def get_employment_source_data_from_workflow_id(
    workflow_id: str,
) -> Iterator[EmploymentSourceData]:
    """
    Get all `EmploymentSourceData` objects from a workflow.
    """
    models = current_session.scalars(
        select(EmploymentSourceDataModel).filter(
            EmploymentSourceDataModel.source_metadata["workflow_id"].astext
            == workflow_id
        )
    )

    return (
        EmploymentSourceData.from_employment_source_data_model(model)
        for model in models
    )

extended_employment_update

batch_get_employment_value_timeline
batch_get_employment_value_timeline(
    employment_ids, key, overlap_period=None
)

Return the timeline of values for a given key, for multiple employments in a single query.

Source code in components/employment/public/business_logic/queries/extended_employment_update.py
def batch_get_employment_value_timeline(
    employment_ids: set[UUID],
    key: str,
    overlap_period: ValidityPeriod | None = None,
) -> dict[UUID, ValueTimeline]:
    """Return the timeline of values for a given key, for multiple employments in a single query."""
    updates_per_employment = batch_get_extended_employment_updates_for_key(
        employment_ids, key, overlap_period
    )
    return {
        employment_id: get_value_timeline(
            employment_id, updates, key, overlap_period=overlap_period
        )
        for employment_id, updates in updates_per_employment.items()
    }
get_employment_value_timeline
get_employment_value_timeline(
    employment_id, key, overlap_period=None
)

Return the full timeline of values for a given key on an employment.

Source code in components/employment/public/business_logic/queries/extended_employment_update.py
def get_employment_value_timeline(
    employment_id: UUID,
    key: str,
    overlap_period: ValidityPeriod | None = None,
) -> ValueTimeline:
    """Return the full timeline of values for a given key on an employment."""
    updates = get_extended_employment_updates_for_key(
        employment_id, key, overlap_period
    )
    return get_value_timeline(
        employment_id, updates, key, overlap_period=overlap_period
    )
get_employment_values_batch_on
get_employment_values_batch_on(on_date_per_employment_id)

Return the employment values for a batch of employment IDs, on a specific date per employment ID.

Return a dictionary mapping employment IDs to their corresponding extended values dictionary.

Source code in components/employment/public/business_logic/queries/extended_employment_update.py
def get_employment_values_batch_on(
    on_date_per_employment_id: dict[UUID, date],
) -> dict[UUID, _TValues]:
    """
    Return the employment values for a batch of employment IDs, on a specific date per employment ID.

    Return a dictionary mapping employment IDs to their corresponding extended values dictionary.
    """
    values_per_employment_id: dict[UUID, Any] = {
        employment_id: dict() for employment_id in on_date_per_employment_id.keys()
    }

    for (
        employment_id,
        extended_employment_values,
    ) in batch_get_extended_employments_values_on(
        on_date_per_employment_id=on_date_per_employment_id
    ).items():
        for extended_employment_value in extended_employment_values:
            values_per_employment_id[employment_id][extended_employment_value.key] = (
                extended_employment_value.value
            )

    return cast("dict[UUID, _TValues]", values_per_employment_id)
get_employment_values_on
get_employment_values_on(employment_id, on_date)
Source code in components/employment/public/business_logic/queries/extended_employment_update.py
def get_employment_values_on(  # noqa: D103
    employment_id: UUID,
    on_date: date,
) -> _TValues:  # type: ignore[type-var,misc]  # Technically this is not a required generic type, but it's here to force people to type their calls
    values = {}
    for extended_employment_value in get_extended_employment_values_on(
        employment_id, on_date
    ):
        values[extended_employment_value.key] = extended_employment_value.value
    return cast("_TValues", values)

source_information

get_metadata_from_source_information_like
get_metadata_from_source_information_like(
    source_information_like,
)

Retrieve the metadata dictionary from a SourceInformationLike object.

Must not be used for updating the metadata, should only be used for reading the metadata! If you want to update the metadata, use update_employment_source_data_metadata_with or update_source_information_metadata_with instead.

Source code in components/employment/public/business_logic/queries/source_information.py
def get_metadata_from_source_information_like(
    source_information_like: SourceInformationLike,
) -> dict[str, Any]:
    """
    Retrieve the metadata dictionary from a `SourceInformationLike` object.

    Must not be used for updating the metadata, should only be used for reading the metadata! If you want to update
    the metadata, use `update_employment_source_data_metadata_with` or `update_source_information_metadata_with`
    instead.
    """
    if isinstance(source_information_like, UUID):
        return get_or_raise_missing_resource(
            EmploymentSourceDataModel, source_information_like
        ).source_metadata
    elif isinstance(source_information_like, SourceInformation):
        if (model := source_information_like._employment_source_data) is not None:  # noqa: ALN027 # - no way to say "internal to component" in Python
            return model.source_metadata
        return source_information_like.metadata
    else:
        assert_never(source_information_like)
get_raw_data_from_source_information_like
get_raw_data_from_source_information_like(
    source_information_like,
)

Retrieve the raw_data dictionary from a SourceInformationLike object.

Can only be used for reading the raw_data - this value is immutable by design.

Source code in components/employment/public/business_logic/queries/source_information.py
def get_raw_data_from_source_information_like(
    source_information_like: SourceInformationLike,
) -> dict[str, Any]:
    """
    Retrieve the raw_data dictionary from a `SourceInformationLike` object.

    Can only be used for reading the raw_data - this value is immutable by design.
    """
    if isinstance(source_information_like, UUID):
        return get_or_raise_missing_resource(
            EmploymentSourceDataModel, source_information_like
        ).raw_data.copy()
    elif isinstance(source_information_like, SourceInformation):
        if (model := source_information_like._employment_source_data) is not None:  # noqa: ALN027 # - no way to say "internal to component" in Python
            return model.raw_data.copy()
        return source_information_like.raw_data.copy()
    else:
        assert_never(source_information_like)

source_rules

get_source_rules
get_source_rules(source_type)

Get the source rules related to a SourceType

Source code in components/employment/public/business_logic/queries/source_rules.py
def get_source_rules(source_type: SourceType) -> SourceRules:
    """
    Get the source rules related to a `SourceType`
    """
    return RULES_PER_SOURCE_TYPE[source_type]

stale_invitations

get_stale_invitations
get_stale_invitations(company_ids, created_before=None)

Retrieves stale invitations for the given company IDs, optionally filtering by creation date.

Source code in components/employment/public/business_logic/queries/stale_invitations.py
def get_stale_invitations(
    company_ids: list[str] | None, created_before: date | None = None
) -> list[StaleInvitationInformation]:
    """
    Retrieves stale invitations for the given company IDs, optionally filtering by creation date.
    """
    stale_invitations_query = (
        BlockedMovementModelBroker.get_blocked_movements_with_statuses(
            user_ids=[],
            company_ids=company_ids,
            statuses=[
                BlockedMovementStatus.pending_self_healing,
                BlockedMovementStatus.awaiting_user_response_self_healing,
            ],
        ).filter(CoreBlockedMovementModel.error_code == StaleInvitationError.error_code)
    )
    if created_before:
        stale_invitations_query = stale_invitations_query.filter(
            CoreBlockedMovementModel.created_at < created_before
        )

    stale_invitations = []
    for blocked_movement in stale_invitations_query:
        country_code = get_country_code_from_blocked_movement(blocked_movement)
        declaration: EmploymentDeclaration[Any] = EmploymentDeclaration.from_dict(
            blocked_movement.context["employment_declaration"]
        )
        effective_start_date = parse_date(
            blocked_movement.context["effective_start_date"]
        )
        infinitely_valid_information = one(
            [
                update
                for update in declaration.extended_informations
                if update.validity_period is None
            ]
        )
        country_gateway = get_country_gateway(country_code)
        stale_invitations.append(
            StaleInvitationInformation(
                blocked_movement_id=blocked_movement.id,
                company_id=blocked_movement.company_id,
                company_display_name=mandatory(
                    country_gateway.get_company_information(blocked_movement.company_id)
                ).display_name,
                employee_full_name=country_gateway.get_user_full_name(
                    blocked_movement.user_id
                ),
                employee_email=country_gateway.get_employee_email_from_extended_values(
                    infinitely_valid_information.values
                ),
                employee_identifier=country_gateway.get_employee_identifier_for_country(
                    infinitely_valid_information.values
                ),
                start_date=effective_start_date,
                created_at=to_datetime(blocked_movement.created_at),
                source_type=parse_base_source_type(blocked_movement.source_type),
                admin_was_notified=blocked_movement.status
                == BlockedMovementStatus.awaiting_user_response_self_healing,
                country_code=country_code,
            )
        )

    return stale_invitations

weekly_employment_stats

Query functions for weekly employment statistics.

WeeklyEmploymentStats dataclass
WeeklyEmploymentStats(
    new_employees_added,
    employees_transferred,
    employees_removed,
    source_types=frozenset(),
)

Weekly employment change counts per change type.

employees_removed instance-attribute
employees_removed
employees_transferred instance-attribute
employees_transferred
has_changes property
has_changes

Whether any employment changes occurred.

new_employees_added instance-attribute
new_employees_added
source_types class-attribute instance-attribute
source_types = field(default_factory=frozenset)
total_changes property
total_changes

Sum of all change counts.

get_company_ids_with_weekly_changes
get_company_ids_with_weekly_changes(
    start_datetime, end_datetime
)

Get company IDs that have at least one employment change in the period.

Parameters:

Name Type Description Default
start_datetime datetime

Start of reporting period (inclusive).

required
end_datetime datetime

End of reporting period (exclusive).

required

Returns:

Type Description
set[str]

Set of company_id strings with changes.

Source code in components/employment/public/business_logic/queries/weekly_employment_stats.py
def get_company_ids_with_weekly_changes(
    start_datetime: datetime,
    end_datetime: datetime,
) -> set[str]:
    """
    Get company IDs that have at least one employment change in the period.

    Args:
        start_datetime: Start of reporting period (inclusive).
        end_datetime: End of reporting period (exclusive).

    Returns:
        Set of company_id strings with changes.
    """
    stmt = (
        select(CoreEmploymentVersionModel.company_id)
        .where(
            CoreEmploymentVersionModel.created_at >= start_datetime,
            CoreEmploymentVersionModel.created_at < end_datetime,
            CoreEmploymentVersionModel.is_cancelled.is_(False),
            CoreEmploymentVersionModel.source_type.in_(ADMIN_RETRIEVABLE_SOURCE_TYPE),
        )
        .distinct()
    )
    return set(current_session.scalars(stmt))
get_weekly_employment_stats_for_companies
get_weekly_employment_stats_for_companies(
    company_ids, start_datetime, end_datetime
)

Get employment statistics for companies within a date range.

Queries CoreEmploymentVersionModel for changes created within the period, grouped by EmploymentChangeType.

Parameters:

Name Type Description Default
company_ids list[str]

Company IDs to filter by.

required
start_datetime datetime

Start of reporting period (inclusive).

required
end_datetime datetime

End of reporting period (exclusive).

required

Returns:

Type Description
WeeklyEmploymentStats

WeeklyEmploymentStats with counts per change type and distinct source types.

Source code in components/employment/public/business_logic/queries/weekly_employment_stats.py
def get_weekly_employment_stats_for_companies(
    company_ids: list[str],
    start_datetime: datetime,
    end_datetime: datetime,
) -> WeeklyEmploymentStats:
    """
    Get employment statistics for companies within a date range.

    Queries CoreEmploymentVersionModel for changes created within the period,
    grouped by EmploymentChangeType.

    Args:
        company_ids: Company IDs to filter by.
        start_datetime: Start of reporting period (inclusive).
        end_datetime: End of reporting period (exclusive).

    Returns:
        WeeklyEmploymentStats with counts per change type and distinct source types.
    """
    company_ids_set = set(company_ids)
    if not company_ids_set:
        return WeeklyEmploymentStats(
            new_employees_added=0,
            employees_transferred=0,
            employees_removed=0,
        )

    # Unnest change types so we can filter out terminations whose end_date
    # is after the reporting period.
    unnested = (
        select(
            func.unnest(CoreEmploymentVersionModel.employment_change_types).label(
                "change_type"
            ),
            CoreEmploymentVersionModel.end_date,
            CoreEmploymentVersionModel.source_type,
        )
        .where(
            CoreEmploymentVersionModel.company_id.in_(company_ids_set),
            CoreEmploymentVersionModel.created_at >= start_datetime,
            CoreEmploymentVersionModel.created_at < end_datetime,
            CoreEmploymentVersionModel.is_cancelled.is_(False),
            CoreEmploymentVersionModel.source_type.in_(ADMIN_RETRIEVABLE_SOURCE_TYPE),
        )
        .subquery()
    )

    period_filter = or_(
        unnested.c.change_type != EmploymentChangeType.termination.value,
        unnested.c.end_date <= end_datetime.date(),
    )

    counts_stmt = (
        select(unnested.c.change_type, func.count().label("cnt"))
        .where(period_filter)
        .group_by(unnested.c.change_type)
    )
    results = current_session.execute(counts_stmt).all()
    counts: dict[str, int] = {row.change_type: row.cnt for row in results}

    source_types_stmt = select(unnested.c.source_type).where(period_filter).distinct()

    return WeeklyEmploymentStats(
        new_employees_added=counts.get(EmploymentChangeType.invitation.value, 0),
        employees_transferred=counts.get(EmploymentChangeType.transfer.value, 0),
        employees_removed=counts.get(EmploymentChangeType.termination.value, 0),
        source_types=frozenset(current_session.scalars(source_types_stmt).all()),
    )

rules

blocked_movements

make_error_message_from_integrity_error
make_error_message_from_integrity_error(e)

Format an integrity error in a meaningful message to be used in blocked movements.

It doesn't contain the IDs of objects because the error message is used for deduplication of blocked movements.

Source code in components/employment/public/business_logic/rules/blocked_movements.py
def make_error_message_from_integrity_error(e: IntegrityError) -> str:
    """
    Format an integrity error in a meaningful message to be used in blocked movements.

    It doesn't contain the IDs of objects because the error message is used for deduplication of blocked movements.
    """
    return (
        f"Constraint {e.orig.diag.constraint_name} of table {e.orig.diag.table_name} "  # type: ignore[union-attr]
        f"in schema {e.orig.diag.schema_name} raised."  # type: ignore[union-attr]
    )

employment_input_error_info

EmploymentInputErrorCode

Bases: AlanBaseEnum

Generic and global error codes for Employment Component upstreams, especially for extractors

company_is_not_authorized class-attribute instance-attribute
company_is_not_authorized = 'company_is_not_authorized'
company_not_found class-attribute instance-attribute
company_not_found = 'company_not_found'
dsn_contract_without_company class-attribute instance-attribute
dsn_contract_without_company = (
    "dsn_contract_without_company"
)
dsn_invitation_without_email class-attribute instance-attribute
dsn_invitation_without_email = (
    "dsn_invitation_without_email"
)
duplicate_data class-attribute instance-attribute
duplicate_data = 'duplicate_data'
has_reimbursed_care_acts class-attribute instance-attribute
has_reimbursed_care_acts = 'has_reimbursed_care_acts'
invalid_birth_date class-attribute instance-attribute
invalid_birth_date = 'invalid_birth_date'
invalid_ccn_code class-attribute instance-attribute
invalid_ccn_code = 'invalid_ccn_code'
invalid_dni class-attribute instance-attribute
invalid_dni = 'invalid_dni'
invalid_email class-attribute instance-attribute
invalid_email = 'invalid_email'
invalid_end_date class-attribute instance-attribute
invalid_end_date = 'invalid_end_date'
invalid_external_employee_id class-attribute instance-attribute
invalid_external_employee_id = (
    "invalid_external_employee_id"
)
invalid_first_name class-attribute instance-attribute
invalid_first_name = 'invalid_first_name'
invalid_gender class-attribute instance-attribute
invalid_gender = 'invalid_gender'
invalid_hiring_date class-attribute instance-attribute
invalid_hiring_date = 'invalid_hiring_date'
invalid_identity_document_type class-attribute instance-attribute
invalid_identity_document_type = (
    "invalid_identity_document_type"
)
invalid_is_alsace_moselle class-attribute instance-attribute
invalid_is_alsace_moselle = 'invalid_is_alsace_moselle'
invalid_is_unpaid_leave class-attribute instance-attribute
invalid_is_unpaid_leave = 'invalid_is_unpaid_leave'
invalid_language class-attribute instance-attribute
invalid_language = 'invalid_language'
invalid_last_name class-attribute instance-attribute
invalid_last_name = 'invalid_last_name'
invalid_nie class-attribute instance-attribute
invalid_nie = 'invalid_nie'
invalid_ntt class-attribute instance-attribute
invalid_ntt = 'invalid_ntt'
invalid_passport_number class-attribute instance-attribute
invalid_passport_number = 'invalid_passport_number'
invalid_professional_category class-attribute instance-attribute
invalid_professional_category = (
    "invalid_professional_category"
)
invalid_ssn class-attribute instance-attribute
invalid_ssn = 'invalid_ssn'
invalid_start_date class-attribute instance-attribute
invalid_start_date = 'invalid_start_date'
invalid_tax_regime class-attribute instance-attribute
invalid_tax_regime = 'invalid_tax_regime'
invalid_termination_reason class-attribute instance-attribute
invalid_termination_reason = 'invalid_termination_reason'
invalid_unpaid_leave_end_date class-attribute instance-attribute
invalid_unpaid_leave_end_date = (
    "invalid_unpaid_leave_end_date"
)
mandatory_end_date class-attribute instance-attribute
mandatory_end_date = 'mandatory_end_date'
mandatory_termination_type class-attribute instance-attribute
mandatory_termination_type = 'mandatory_termination_type'
missing_hiring_date class-attribute instance-attribute
missing_hiring_date = 'missing_hiring_date'
missing_required_birth_date class-attribute instance-attribute
missing_required_birth_date = 'missing_required_birth_date'
missing_required_email class-attribute instance-attribute
missing_required_email = 'missing_required_email'
missing_required_external_employee_id class-attribute instance-attribute
missing_required_external_employee_id = (
    "missing_required_external_employee_id"
)
missing_required_gender class-attribute instance-attribute
missing_required_gender = 'missing_required_gender'
missing_required_identity_document_type class-attribute instance-attribute
missing_required_identity_document_type = (
    "missing_required_identity_document_type"
)
missing_required_nif class-attribute instance-attribute
missing_required_nif = 'missing_required_nif'
missing_required_ssn_ntt class-attribute instance-attribute
missing_required_ssn_ntt = 'missing_required_ssn_ntt'
missing_tax_regime class-attribute instance-attribute
missing_tax_regime = 'missing_tax_regime'
multiple_employments_found class-attribute instance-attribute
multiple_employments_found = 'multiple_employments_found'
no_active_employment_found class-attribute instance-attribute
no_active_employment_found = 'no_active_employment_found'
not_matching_external_employee_id class-attribute instance-attribute
not_matching_external_employee_id = (
    "not_matching_external_employee_id"
)
retry_failed_given_admin_overrides class-attribute instance-attribute
retry_failed_given_admin_overrides = (
    "retry_failed_given_admin_overrides"
)
start_date_too_far_back class-attribute instance-attribute
start_date_too_far_back = 'start_date_too_far_back'
user_not_found class-attribute instance-attribute
user_not_found = 'user_not_found'
EmploymentInputErrorInfo dataclass
EmploymentInputErrorInfo(
    error_code, message, additional_info=dict()
)

Bases: DataClassJsonMixin

Error returned by extractors in case of an error

additional_info class-attribute instance-attribute
additional_info = field(default_factory=dict)
error_code instance-attribute
error_code
message instance-attribute
message

extractors

This file contains "extractors", which are utility functions that aim to parse and validate items from a dict.

This file only contains global extractors for use in any country - more specialized extractors (e.g. French SSN extractors) should be put in an appropriate spot in each country's component.

Extractors ensure that we perform validation consistently across a range of input methods.

Typical usage of extractors will follow this pattern:

  • Provide a function that raises an exception for a specific error
  • Use partial on extract_or_raise to get your extraction function
  • Use the extraction function with the extractors you need

Here's a working example:

def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
    raise ExtractorInputException(
        company_id=...,  # or None if you don't have it yet
        user_id=...,  # or None if you don't have it yet
        error=error
    )

extract = partial(extract_or_raise, data=YOUR_DICT, raiser=raiser)

first_name = extract(extractor=extract_mandatory_first_name, field="first_name")
ExtractorInputException
ExtractorInputException(
    user_id, company_id, error, context=None
)

Bases: UpstreamError

A simple exception when you don't need to use a specific exception with except_or_raise.

Usage:

def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
    raise ExtractorInputException(
        company_id=...,  # or None if you don't have it yet
        user_id=...,  # or None if you don't have it yet
        error=error
    )

Optionally, you can also pass a context value, which will end up in the UpstreamBlockedMovement's context field.

Source code in components/employment/public/business_logic/rules/extractors.py
def __init__(
    self,
    user_id: str | None,
    company_id: str | None,
    error: EmploymentInputErrorInfo,
    context: UpstreamBlockedMovementContext | None = None,
) -> None:
    super().__init__(error.message, context or NoContext())
    self._user_id = user_id
    self._company_id = company_id
    self.error_code = error.error_code
error_code instance-attribute
error_code = error_code
to_blocked_movement_info
to_blocked_movement_info()
Source code in components/employment/public/business_logic/rules/extractors.py
@override
def to_blocked_movement_info(self) -> UpstreamBlockedMovementInfo:
    return UpstreamBlockedMovementInfo(
        user_id=self._user_id,
        company_id=self._company_id,
        error_code=self.error_code,
        error_message=str(self),
        context=self.context,
    )
ExtractorResult module-attribute
ExtractorResult = (
    tuple[T, None] | tuple[None, EmploymentInputErrorInfo]
)

Result of an extractor function. Always a tuple:

  • If a success, the first value is the parsed value (may be None depending on the extractor), the second value is None
  • If a failure, the first value is None, the second value is an EmploymentInputErrorInfo with details on the error.
P module-attribute
P = ParamSpec('P')
T module-attribute
T = TypeVar('T')
cast_to_date_with_validation
cast_to_date_with_validation(
    x, date_format=_DEFAULT_EXTRACTION_DATE_FORMAT
)

Cast the provided value to a datetime object.

By default, strings are parsed using the regular ISO format (YYYY-MM-DD).

Returns a tuple with both the parsed value (or none if the input value was falsy) and a boolean indicating whether the parsing was successful or not.

Note: this in itself is not an extractor, but can be used to build extractors with more useful error diagnostics.

Source code in components/employment/public/business_logic/rules/extractors.py
def cast_to_date_with_validation(
    x: datetime | date | str | None | Any,
    date_format: str = _DEFAULT_EXTRACTION_DATE_FORMAT,
) -> tuple[date | None, Literal[True]] | tuple[None, Literal[False]]:
    """
    Cast the provided value to a `datetime` object.

    By default, strings are parsed using the regular ISO format (YYYY-MM-DD).

    Returns a tuple with both the parsed value (or none if the input value was falsy) and a boolean indicating whether
    the parsing was successful or not.

    Note: this in itself is not an extractor, but can be used to build extractors with more useful error diagnostics.
    """
    if not x:
        return None, True
    if isinstance(x, datetime):
        return x.date(), True
    elif isinstance(x, date):
        return x, True
    elif x and isinstance(x, str):
        try:
            return datetime.strptime(x.split(" ")[0], date_format).date(), True
        except ValueError:
            return None, False
    return None, False
extract_birth_date_from_data
extract_birth_date_from_data(data, field_name)

Birth date extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_birth_date_from_data(
    data: dict[str, Any],
    field_name: str,
) -> ExtractorResult[date | None]:
    """
    Birth date extractor
    """
    date, is_valid = cast_to_date_with_validation(data.get(field_name))
    if not is_valid:
        return (
            None,
            EmploymentInputErrorInfo(
                error_code=EmploymentInputErrorCode.invalid_birth_date,
                message=f"Invalid birth date: {data.get(field_name)}",
            ),
        )
    return date, None
extract_boolean_from_data
extract_boolean_from_data(data, field_name)

Utility for parsing booleans written in plain language.

Note: this is not a real extract (as it doesn't return a proper ExtractorResult), this is intended to be used as a utility to build extractors.

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_boolean_from_data(
    data: dict[str, Any],
    field_name: str,
) -> tuple[bool | None, Literal[True]] | tuple[None, Literal[False]]:
    """
    Utility for parsing booleans written in plain language.

    Note: this is not a real extract (as it doesn't return a proper `ExtractorResult`), this is intended to be used
    as a utility to build extractors.
    """
    raw_value = data.get(field_name)
    if raw_value is None:
        return None, True

    sanitized_value = str(raw_value).lower().strip()
    if (
        sanitized_value == "vrai"
        or sanitized_value == "1"
        or sanitized_value == "true"
        or sanitized_value == "oui"
        or sanitized_value == "yes"
    ):
        # about the 1 -> in case the admin does not set the cell to TEXT, the value will be =TRUE() which is 1, or =FALSE() which is 0
        # about the true -> the cell may also be the boolean value True, which would end up as "true" here
        return True, True
    if (
        sanitized_value == "faux"
        or sanitized_value == "0"
        or sanitized_value == "false"
        or sanitized_value == "non"
        or sanitized_value == "no"
    ):
        return False, True

    return None, False
extract_email_from_data
extract_email_from_data(data, field_name)

Email extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_email_from_data(
    data: dict[str, Any],
    field_name: str,
) -> ExtractorResult[str | None]:
    """
    Email extractor
    """
    email = (data.get(field_name) or "").strip() or None
    email_error = None
    if email and not email_re.match(email):
        email_error = EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_email,
            message=f"Invalid email: {email}",
        )
        return None, email_error

    return email, None
extract_end_date_from_data
extract_end_date_from_data(data, field_name)

End date extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_end_date_from_data(
    data: dict[str, Any],
    field_name: str,
) -> ExtractorResult[date | None]:
    """
    End date extractor
    """
    end_date, is_valid = cast_to_date_with_validation(data.get(field_name))
    if not is_valid:
        return (
            None,
            EmploymentInputErrorInfo(
                error_code=EmploymentInputErrorCode.invalid_end_date,
                message=f"Invalid end date: {data.get(field_name)}",
            ),
        )
    return end_date, None
extract_external_employee_id_from_data_without_validation
extract_external_employee_id_from_data_without_validation(
    data, field_name
)

External employee ID (matricule, payroll ID, etc.) extractor.

Note: this extractor does NOT perform validation. For France, use the "extract_external_employee_id" function, which does.

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_external_employee_id_from_data_without_validation(
    data: dict[str, Any], field_name: str
) -> ExtractorResult[str | None]:
    """
    External employee ID (matricule, payroll ID, etc.) extractor.

    Note: this extractor does NOT perform validation. For France, use the "extract_external_employee_id" function, which
    does.
    """
    raw_value = data.get(field_name)
    if not raw_value:
        return None, None

    parsed = parse_integer_string(raw_value)
    return parsed, None
extract_gender_from_data
extract_gender_from_data(data, field_name)

Gender extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_gender_from_data(
    data: dict[str, Any], field_name: str
) -> ExtractorResult[UserGender | None]:
    """
    Gender extractor
    """
    raw_value = data.get(field_name)
    if not raw_value:
        return None, None

    try:
        return UserGender.validate(raw_value), None
    except EnumValidationError:
        return None, EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_gender,
            message=f"Gender value is not valid: {raw_value}",
        )
extract_mandatory_birth_date_from_data module-attribute
extract_mandatory_birth_date_from_data = mandatory_extractor(
    extractor=extract_birth_date_from_data,
    missing_value_error_code=missing_required_birth_date,
    value_name="birth date",
)
extract_mandatory_email_from_data module-attribute
extract_mandatory_email_from_data = mandatory_extractor(
    extractor=extract_email_from_data,
    missing_value_error_code=missing_required_email,
    value_name="e-mail",
)
extract_mandatory_first_name_from_data
extract_mandatory_first_name_from_data(data, field_name)

First name extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_mandatory_first_name_from_data(
    data: dict[str, Any], field_name: str
) -> ExtractorResult[str]:
    """
    First name extractor
    """
    raw_name = data.get(field_name)
    field_name_display = snake_to_regular(field_name)
    if not raw_name:
        return None, EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_first_name,
            message=f"{field_name_display} is missing",
        )
    cleaned_name = (
        strip_duplicate_spaces(raw_name.strip()).title() if raw_name else raw_name
    )
    try:
        name = mandatory(validates_name(key=field_name, name=cleaned_name))
    except ModelValidationError:
        return None, EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_first_name,
            message=f"Invalid {field_name_display}: {raw_name}. Please use only letters, numbers, and basic punctuation (no special characters or line breaks).",
        )

    return name, None
extract_mandatory_gender_from_data module-attribute
extract_mandatory_gender_from_data = mandatory_extractor(
    extractor=extract_gender_from_data,
    missing_value_error_code=missing_required_gender,
    value_name="gender",
)
extract_mandatory_last_name_from_data
extract_mandatory_last_name_from_data(data, field_name)

Last name extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_mandatory_last_name_from_data(
    data: dict[str, Any], field_name: str
) -> ExtractorResult[str]:
    """
    Last name extractor
    """
    raw_name = data.get(field_name)
    field_name_display = snake_to_regular(field_name)
    if not raw_name:
        return None, EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_last_name,
            message=f"{field_name_display} is missing",
        )
    cleaned_name = (
        strip_duplicate_spaces(raw_name.strip()).title() if raw_name else raw_name
    )
    try:
        name = mandatory(validates_name(key=field_name, name=cleaned_name))
    except ModelValidationError:
        return None, EmploymentInputErrorInfo(
            error_code=EmploymentInputErrorCode.invalid_last_name,
            message=f"Invalid {field_name_display}: {raw_name}. Please use only letters, numbers, and basic punctuation (no special characters or line breaks).",
        )

    return name, None
extract_mandatory_start_date_from_data
extract_mandatory_start_date_from_data(data, field_name)

Start date extractor

Source code in components/employment/public/business_logic/rules/extractors.py
def extract_mandatory_start_date_from_data(
    data: dict[str, Any],
    field_name: str,
) -> ExtractorResult[date]:
    """
    Start date extractor
    """
    raw_start_date = data.get(field_name)
    start_date, is_valid = cast_to_date_with_validation(raw_start_date)
    if not is_valid:
        return (
            None,
            EmploymentInputErrorInfo(
                error_code=EmploymentInputErrorCode.invalid_start_date,
                message=f"Invalid start date: {data.get(field_name)}",
            ),
        )
    if not start_date:
        return (
            None,
            EmploymentInputErrorInfo(
                error_code=EmploymentInputErrorCode.invalid_start_date,
                message="Missing start date",
            ),
        )
    return start_date, None
extract_or_raise
extract_or_raise(extractor, data, field_name, raiser)

Used to wrap an extractor to make its use easier in upstream scenarios.

Typical usage:

def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
    raise SomeException(...)

extract = partial(extract_or_raise, data=..., raiser=raiser)

first_name = extract(extractor=extract_mandatory_first_name_from_data, field_name="first_name")
# ...

Note: if you do not have or need a custom exception for your use case, you can use ExtractorInputException, like so:

def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
    raise ExtractorInputException(
        company_id=...,  # or None if you don't have it yet
        user_id=...,  # or None if you don't have it yet
        error=error
    )
Source code in components/employment/public/business_logic/rules/extractors.py
def extract_or_raise(
    extractor: _ExtractorFunc[T],
    data: dict[str, Any],
    field_name: str,
    raiser: _ExtractorRaiser,
) -> T:
    """
    Used to wrap an extractor to make its use easier in upstream scenarios.

    Typical usage:

    ```python
    def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
        raise SomeException(...)

    extract = partial(extract_or_raise, data=..., raiser=raiser)

    first_name = extract(extractor=extract_mandatory_first_name_from_data, field_name="first_name")
    # ...
    ```

    Note: if you do not have or need a custom exception for your use case, you can use `ExtractorInputException`, like
    so:

    ```python
    def raiser(error: EmploymentInputErrorInfo) -> NoReturn:
        raise ExtractorInputException(
            company_id=...,  # or None if you don't have it yet
            user_id=...,  # or None if you don't have it yet
            error=error
        )
    ```
    """
    result = extractor(data, field_name)
    if result[1] is not None:
        raiser(result[1])
    return result[0]
mandatory_extractor
mandatory_extractor(
    extractor, missing_value_error_code, value_name
)

Utility function to turn an extractor that returns a value or None if absent into a "mandatory" extractor.

Usage:

extract_mandatory_email_from_data = mandatory_extractor(
    extract_email_from_data,
    EmploymentInputErrorCode.missing_invite_email,
    "Invitation Email"
)

This function is typed properly: the signature of the initial extractor is retained (and you can safely use it with 'partial'), and the resulting function is guaranteed to return a non-None value.

Source code in components/employment/public/business_logic/rules/extractors.py
def mandatory_extractor(
    extractor: Callable[P, ExtractorResult[T | None]],
    missing_value_error_code: EmploymentInputErrorCode,
    value_name: str,
) -> Callable[P, ExtractorResult[T]]:
    """
    Utility function to turn an extractor that returns a value or None if absent into a "mandatory" extractor.

    Usage:

    ```python
    extract_mandatory_email_from_data = mandatory_extractor(
        extract_email_from_data,
        EmploymentInputErrorCode.missing_invite_email,
        "Invitation Email"
    )
    ```

    This function is typed properly: the signature of the initial extractor is retained (and you can safely use it with
    'partial'), and the resulting function is guaranteed to return a non-None value.
    """

    def extractor_wrapped(*args: P.args, **kwargs: P.kwargs) -> ExtractorResult[T]:
        result, error = extractor(*args, **kwargs)
        if error is not None:
            return None, error

        if result is None:
            return (
                None,
                EmploymentInputErrorInfo(
                    error_code=missing_value_error_code,
                    # technically we could use the field_name here but having a specific value here makes the message more useful
                    message=f"Missing value: {value_name}",
                ),
            )

        return mandatory(result), error

    return extractor_wrapped
parse_integer_string
parse_integer_string(x)

Used for SSNs, NTTs, SIRENs and external employee IDs ("matricules"). Sometimes these integer strings are floats in the excel spreadsheet, so it adds a decimal part when we stringify them (str(float(1)) -> '1.0')

Source code in components/employment/public/business_logic/rules/extractors.py
def parse_integer_string(x: Any) -> str | None:
    """
    Used for SSNs, NTTs, SIRENs and external employee IDs ("matricules").
    Sometimes these integer strings are floats in the excel spreadsheet, so it adds
    a decimal part when we stringify them (str(float(1)) -> '1.0')
    """
    if x is None:
        return None

    if isinstance(x, float):
        return str(int(x))
    elif isinstance(x, str):
        return re2.sub(r"\s+", "", x)  # type: ignore[no-any-return]

    return str(x)

stale_invitations

SKIP_STALE_INVITATION_CHECK_METADATA_KEY module-attribute
SKIP_STALE_INVITATION_CHECK_METADATA_KEY = (
    "skip_stale_invitation_check"
)
check_invitation_is_not_stale
check_invitation_is_not_stale(
    start_date, core_employment_version
)

Verifies that an invitation with the provided date does not fall under a "stale invitation" scenario.

Stale invitations are a special flow used to handle invitations that would incur more than 100 days of coverage upon invitation. These can be admin mistakes and might need to be checked - otherwise, admins would incur a lot of regularization.

Note that 'start_date' is not the start date of the employee in the company, but the effective start date of their coverage. For example, employees added with a start date in the 2000s but whose company just signed up for Alan do not count as "stale" invitations.

Source code in components/employment/public/business_logic/rules/stale_invitations.py
def check_invitation_is_not_stale(
    start_date: date, core_employment_version: CoreEmploymentVersion
) -> None:
    """
    Verifies that an invitation with the provided date does not fall under a "stale invitation" scenario.

    Stale invitations are a special flow used to handle invitations that would incur more than 100 days of coverage
    upon invitation. These can be admin mistakes and might need to be checked - otherwise, admins would incur a lot of
    regularization.

    Note that 'start_date' is not the start date of the employee in the company, but the effective **start date of
    their coverage**. For example, employees added with a start date in the 2000s *but* whose company just signed up
    for Alan do not count as "stale" invitations.
    """
    if _should_check_stale_invitation(
        core_employment_version
    ) and is_start_date_too_far_in_the_past(start_date):
        raise StaleInvitationError(
            "Effective start date is too far in the past",
            effective_start_date=start_date,
        )
    return None

components.employment.public.constants

EMPLOYMENT_SCHEMA_NAME module-attribute

EMPLOYMENT_SCHEMA_NAME = 'employment'

components.employment.public.country_gateway

AffiliationItem dataclass

AffiliationItem(
    title,
    id,
    is_cancelled,
    start_date=optional_isodate_field(),
    end_date=optional_isodate_field(),
)

Bases: DataClassJsonMixin

An affiliation item related to an employment (see CountryGateway.get_affiliation_items_related_to_employment for details).

This dataclass is only used as a DTO sent to the front-end for display purposes. All values within it should not be used in business logic.

end_date class-attribute instance-attribute

end_date = optional_isodate_field()

id instance-attribute

id

is_cancelled instance-attribute

is_cancelled

start_date class-attribute instance-attribute

start_date = optional_isodate_field()

title instance-attribute

title

CompanyInformation dataclass

CompanyInformation(
    display_name, account_id, is_opt_out_for_offshoring=None
)

Company information returned by the country gateway.

account_id instance-attribute

account_id

display_name instance-attribute

display_name

is_opt_out_for_offshoring class-attribute instance-attribute

is_opt_out_for_offshoring = None

CountryGateway

Bases: ABC, Generic[_TValues]

Class used by the Employment Component to retrieve local-specific data.

!!! ALL imports to local components MUST be done as local imports. !!!

There should be one implementation of CountryGateway per country that uses the Employment Component. You need to add your country's implementation in components.employment.external.country_gateways

app_name abstractmethod property

app_name

App name identifying the country this gateway operates for.

are_companies_in_same_account abstractmethod

are_companies_in_same_account(company_id_1, company_id_2)

Returns True if the two companies are in the same account - false otherwise.

This is used to determine whether a transfer is necessary or not - see here ⧉.

Source code in components/employment/public/country_gateway.py
@abstractmethod
def are_companies_in_same_account(
    self, company_id_1: str, company_id_2: str
) -> bool:
    """
    Returns True if the two companies are in the same account - false otherwise.

    This is used to determine whether a transfer is necessary or not - see [here](https://www.notion.so/alaninsurance/Definitions-employments-sources-etc-1301426e8be780cc9796ee31f49559e5?pvs=4#1341426e8be780e697f3d2c7fd33fe3b).
    """
    ...

extract_external_employee_id_from_data

extract_external_employee_id_from_data(
    data, field_name, company_id
)

Extracts and validates the external employee ID from the provided data.

Default implementation uses extract_external_employee_id_from_data_without_validation. Override this method if you need country-specific validation.

Parameters:

Name Type Description Default
data dict[str, Any]

dict containing the external employee ID

required
field_name str

key to read from the data dict

required
company_id int

used by some country implementations for validation

required
Source code in components/employment/public/country_gateway.py
def extract_external_employee_id_from_data(
    self,
    data: dict[str, Any],
    field_name: str,
    company_id: int,
) -> "ExtractorResult[str | None]":
    """
    Extracts and validates the external employee ID from the provided data.

    Default implementation uses extract_external_employee_id_from_data_without_validation.
    Override this method if you need country-specific validation.

    Args:
        data: dict containing the external employee ID
        field_name: key to read from the data dict
        company_id: used by some country implementations for validation
    """
    from components.employment.public.business_logic.rules.extractors import (
        extract_external_employee_id_from_data_without_validation,
    )

    # Default implementation doesn't use company_id, but it's part of the signature
    # for countries that need it (FR gateway for example)
    _ = company_id

    return extract_external_employee_id_from_data_without_validation(
        data, field_name
    )

get_account_name abstractmethod

get_account_name(account_id)

Retrieves the account's name

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_account_name(self, account_id: UUID) -> str:
    """
    Retrieves the account's name
    """
    ...

get_admin_error_resolver_classes

get_admin_error_resolver_classes()

Gets all admin error resolver classes contributed by this country.

Returns:

Type Description
dict[str, type[Any]]

Dictionary mapping error codes to AdminErrorResolver classes.

dict[str, type[Any]]

All imports to local components MUST be done as local imports (e.g, loading non-CA models in Canada is forbidden).

Note

If your country does not have any country-specific resolvers, you can leave this unimplemented. (it will return an empty dict by default).

Source code in components/employment/public/country_gateway.py
def get_admin_error_resolver_classes(self) -> dict[str, type[Any]]:
    """
    Gets all admin error resolver classes contributed by this country.

    Returns:
        Dictionary mapping error codes to AdminErrorResolver classes.
        All imports to local components MUST be done as local imports (e.g, loading non-CA models in Canada is forbidden).

    Note:
        If your country does not have any country-specific resolvers, you can leave this unimplemented.
        (it will return an empty dict by default).
    """
    return {}
get_affiliation_items_related_to_employment(employment_id)

Returns a list of affiliation items "related to" the provided employment, or None if this feature is not supported by this CountryGateway. The employment_id is the Employment Component ID, NOT the local ID.

There is no specific definition of what "related to" means, although this should roughly corresponds to affiliations that would be modified, impacted, clamped or otherwise checked in case of modifications for this employment. The exact semantics are up to the local logic.

Note: this function is used for display purposes only in the Scrappy Employment Component Tool.

Source code in components/employment/public/country_gateway.py
def get_affiliation_items_related_to_employment(
    self,
    employment_id: UUID,  # noqa: ARG002  # meant to be used by implementations
) -> list[AffiliationItem] | None:
    """
    Returns a list of affiliation items "related to" the provided employment, or None if this feature is not
    supported by this CountryGateway. The employment_id is the Employment Component ID, NOT the local ID.

    There is no specific definition of what "related to" means, although this should roughly corresponds to
    affiliations that would be modified, impacted, clamped or otherwise checked in case of modifications for this
    employment. The exact semantics are up to the local logic.

    Note: this function is used for display purposes only in the Scrappy Employment Component Tool.
    """
    return None

get_all_under_investigation_error_codes

get_all_under_investigation_error_codes()

All error codes for the under_investigation tab (global + country-specific).

Source code in components/employment/public/country_gateway.py
def get_all_under_investigation_error_codes(self) -> frozenset[str]:
    """
    All error codes for the under_investigation tab (global + country-specific).
    """
    from components.employment.internal.exceptions import (
        OverlappingSeveralEmployments,
    )

    global_error_codes = frozenset(
        {
            OverlappingSeveralEmployments.error_code,
        }
    )
    return global_error_codes | self.get_local_under_investigation_error_codes()

get_blocked_invitations_validation_broadcast_id

get_blocked_invitations_validation_broadcast_id()

(Advanced) Get the ID of the daily broadcast to send to admins with stale invitations that need to be validated.

This is used by the send_stale_invitations_validation_broadcasts command to send a daily broadcast to all admins with stale invitations that need to be validated.

If your country does not require Stale Invitations support, you can leave this unimplemented.

Returns:

Type Description
str | None

str | None: The ID of the daily broadcast, or None if no broadcast is required.

Source code in components/employment/public/country_gateway.py
def get_blocked_invitations_validation_broadcast_id(self) -> str | None:
    """
    (Advanced) Get the ID of the daily broadcast to send to admins with stale invitations
    that need to be validated.

    This is used by the `send_stale_invitations_validation_broadcasts` command
    to send a daily broadcast to all admins with stale invitations that need to be validated.

    If your country does not require Stale Invitations support, you can leave this unimplemented.

    Returns:
        str | None: The ID of the daily broadcast, or None if no broadcast is required.
    """
    raise NotImplementedError()

get_company_information abstractmethod

get_company_information(company_id)

Retrieves comprehensive company information including display name, account ID, and settings. Can return None if the company does not exist.

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_company_information(
    self,
    company_id: str,
) -> CompanyInformation | None:
    """
    Retrieves comprehensive company information including display name, account ID, and settings.
    Can return None if the company does not exist.
    """
    ...

get_consumers_to_notify_for_legacy_backfill

get_consumers_to_notify_for_legacy_backfill()

(Advanced) Gets all employment consumers to notify for legacy backfill.

Typically, consumers are not notified for legacy backfill, as it's data ingested from the country legacy tables so countries already have the data. Some consumers may not look at the "legacy tables" and hence need to be notified for legacy backfill though (e.g. Occupational Health in FR).

If you want the consumer to also be notified for other source types, you should add it to the get_employment_consumers method.

Source code in components/employment/public/country_gateway.py
def get_consumers_to_notify_for_legacy_backfill(
    self,
) -> set[EmploymentConsumer[_TValues]]:
    """
    (Advanced) Gets all employment consumers to notify for legacy backfill.

    Typically, consumers are not notified for legacy backfill,
    as it's data ingested from the country legacy tables so countries already have the data.
    Some consumers may not look at the "legacy tables"
    and hence need to be notified for legacy backfill though (e.g. Occupational Health in FR).

    If you want the consumer to also be notified for other source types,
    you should add it to the get_employment_consumers method.
    """
    return set()

get_employee_email_from_extended_values abstractmethod

get_employee_email_from_extended_values(extended_values)

Retrieves the employee email from the provided extended values.

Note: implementation is only required if your country requires Stale Invitations support.

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_employee_email_from_extended_values(
    self, extended_values: _TValues
) -> str | None:
    """
    Retrieves the employee email from the provided extended values.

    Note: implementation is only required if your country requires Stale Invitations support.
    """

get_employee_identifier_for_country abstractmethod

get_employee_identifier_for_country(extended_values)

Retrieves the Employee Identifier from the provided extended values.

Note: implementation is only required if your country requires Stale Invitations support.

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_employee_identifier_for_country(
    self, extended_values: _TValues
) -> str | None:
    """
    Retrieves the Employee Identifier from the provided extended values.

    Note: implementation is only required if your country requires Stale Invitations support.
    """

get_employment_consumers abstractmethod

get_employment_consumers()

Gets all employment consumers contributed by this country.

Notes: 1. ALL Employment Consumers will be called regardless of the country of origin. 2. The function that will be called must have all local code as LOCAL (in-function) imports - otherwise, this breaks Canada (where loading non-CA models is forbidden)

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_employment_consumers(self) -> set[EmploymentConsumer[_TValues]]:
    """
    Gets all employment consumers contributed by this country.

    Notes:
    1. ALL Employment Consumers will be called regardless of the country of origin.
    2. The function that will be called must have all local code as LOCAL (in-function) imports - otherwise, this breaks Canada
    (where loading non-CA models is forbidden)
    """
    ...

get_local_under_investigation_error_codes

get_local_under_investigation_error_codes()

Country-specific error codes to display in the admin under_investigation tab.

Returns:

Type Description
frozenset[str]

Error codes for which pending blocked movements should be shown to admins.

frozenset[str]

All imports to local components must be done as local imports.

Source code in components/employment/public/country_gateway.py
def get_local_under_investigation_error_codes(self) -> frozenset[str]:
    """
    Country-specific error codes to display in the admin under_investigation tab.

    Returns:
        Error codes for which pending blocked movements should be shown to admins.
        All imports to local components must be done as local imports.
    """
    return frozenset()

get_retry_function

get_retry_function()

(Advanced) Get the function used for retrying Core Blocked Movements.

You should generally not need to implement this.

Source code in components/employment/public/country_gateway.py
def get_retry_function(self) -> RetryFunction[_TValues]:
    """
    (Advanced) Get the function used for retrying Core Blocked Movements.

    You should generally not need to implement this.
    """
    from components.employment.public.api import ingest_employment_declaration

    return ingest_employment_declaration

get_source_detail_for_blocked_movement

get_source_detail_for_blocked_movement(
    _employment_source_data_id,
)

(Advanced) Get detailed information on the blocked movement source.

For example, in France, this is used to extract the source's external provider when the source type is an external API. If not null, it will then be displayed alongside the source type on the blocked movements ops tool.

Source code in components/employment/public/country_gateway.py
def get_source_detail_for_blocked_movement(
    self, _employment_source_data_id: UUID
) -> str | None:
    """
    (Advanced) Get detailed information on the blocked movement source.

    For example, in France, this is used to extract the source's external provider
    when the source type is an external API. If not null, it will then be displayed
    alongside the source type on the blocked movements ops tool.
    """
    return None

get_subscriptions_for_company

get_subscriptions_for_company(company_id)

Get all contract subscriptions for a company. It is used to compute whether a new contract is a boomerang or not. If get_subscriptions_for_companies_by_app doesn't return ALL contract types, it can be override to include them (e.g. Occupation Health in France that uses a different stack)

Source code in components/employment/public/country_gateway.py
def get_subscriptions_for_company(
    self, company_id: str
) -> list[SubscriptionVersionSummary]:
    """
    Get all contract subscriptions for a company.
    It is used to compute whether a new contract is a boomerang or not.
    If get_subscriptions_for_companies_by_app doesn't return ALL contract types,
    it can be override to include them
    (e.g. Occupation Health in France that uses a different stack)
    """
    from components.contracting.public.subscription.subscription import (
        get_subscriptions_for_companies_by_app,
    )

    subscriptions = get_subscriptions_for_companies_by_app(
        company_id,
        app_name=self.app_name,
        include_ended=True,
        # include_will_end=None avoids ValueError for BE/ES/CA (they don't support this
        # parameter) while still giving correct results for FR.
        include_will_end=None,
    )

    return [
        SubscriptionVersionSummary(
            id=subscription.id,
            start_date=subscription.start_date,
            end_date=subscription.end_date,
        )
        for subscription in subscriptions
    ]

get_upstream_retry_handler abstractmethod

get_upstream_retry_handler(source_type)

Retrieves the upstream blocked movement retry function that corresponds to the given source_type.

Parameters:

Name Type Description Default
source_type SourceType

The source type for which to retrieve the retry function - guaranteed to be of the CountryCode that corresponds to this CountryGateway.

required

Returns:

Type Description
UpstreamBlockedMovementRetryFunction[_TValues] | None

UpstreamBlockedMovementRetryFunction[_TValues] | None: The retry function, or None if no retry function is available for the given source type.

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_upstream_retry_handler(
    self, source_type: SourceType
) -> UpstreamBlockedMovementRetryFunction[_TValues] | None:
    """
    Retrieves the upstream blocked movement retry function that corresponds to the given source_type.

    Args:
        source_type (SourceType): The source type for which to retrieve the retry function - guaranteed to be of the CountryCode that corresponds to this CountryGateway.

    Returns:
        UpstreamBlockedMovementRetryFunction[_TValues] | None: The retry function, or None if no retry function is available for the given source type.
    """

get_user_admined_company_ids abstractmethod

get_user_admined_company_ids(user_id)

Retrieves the list of company IDs admined by this user.

Note: implementation is only required if your country requires Stale Invitations support.

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_user_admined_company_ids(self, user_id: str) -> list[str]:
    """
    Retrieves the list of company IDs admined by this user.

    Note: implementation is only required if your country requires Stale Invitations support.
    """
    ...

get_user_email

get_user_email(_user_id)

Retrieves a user's email address, or None if the user does not exist.

Override in country gateways that need email display for under_investigation BMs.

Source code in components/employment/public/country_gateway.py
def get_user_email(self, _user_id: str) -> str | None:
    """
    Retrieves a user's email address, or None if the user does not exist.

    Override in country gateways that need email display for under_investigation BMs.
    """
    return None

get_user_full_name abstractmethod

get_user_full_name(user_id)

Retrieves a user's full name (BaseUser.full_name), or None if the user does not exist

Source code in components/employment/public/country_gateway.py
@abstractmethod
def get_user_full_name(self, user_id: str) -> str | None:
    """
    Retrieves a user's full name (BaseUser.full_name), or None if the user does not exist
    """
    ...

last_stale_invite_notification_email_sent_to_admin_on

last_stale_invite_notification_email_sent_to_admin_on(
    company_id,
)

(Advanced) Get the timestamp of the last email sent to an admin of the given company regarding stale invitations.

This is used to determine whether an admin has already been notified of the stale invitations If they have, we will set the stale invitation's status to awaiting_user_response_pending_self_healing which will - count as a terminal status for the purpose of our affiliation speed computation - not notify admins again for this stale invitation.

If your country does not require Stale Invitations support, you can leave this unimplemented.

Parameters:

Name Type Description Default
company_id str

The company ID for which to retrieve the last notification timestamp.

required

Returns: datetime | None: The timestamp of the last email sent, or None if no email has been sent.

Source code in components/employment/public/country_gateway.py
def last_stale_invite_notification_email_sent_to_admin_on(
    self,
    company_id: str,
) -> datetime | None:
    """
    (Advanced) Get the timestamp of the last email sent to an admin of the given company
    regarding stale invitations.

    This is used to determine whether an admin has already been notified of the stale invitations
    If they have, we will set the stale invitation's status to `awaiting_user_response_pending_self_healing`
    which will
    - count as a terminal status for the purpose of our affiliation speed computation
    - not notify admins again for this stale invitation.

    If your country does not require Stale Invitations support, you can leave this unimplemented.

    Args:
        company_id (str): The company ID for which to retrieve the last notification timestamp.
    Returns:
        datetime | None: The timestamp of the last email sent, or None if no email has been sent.
    """
    raise NotImplementedError()

UpstreamBlockedMovementRetryFunction

Bases: Protocol, Generic[_TValues]

A function that retries an Upstream Blocked Movement:

__call__

__call__(
    *,
    upstream_data,
    root_employment_source_data_id,
    event_bus_orchestrator
)

Retry an Upstream Blocked Movement from the provided upstream_data.

Parameters:

Name Type Description Default
upstream_data dict

The upstream_data for the UpstreamBlockedMovement - this is the dict that was provided in the UpstreamBlockedMovementCreator

required
root_employment_source_data_id UUID

The Employment Source Data ID that corresponds to this upstream blocked movement - in order to correctly correlate the upstream blocked movement with the retry's results, you must set the EmploymentDeclaration's source_information field to this UUID.

required
event_bus_orchestrator EventBusOrchestrator

Should you need side-effects to be run, you can use this EventBusOrchestrator to do so.

required

Returns:

Type Description
EmploymentDeclaration[_TValues] | None

EmploymentDeclaration[_TValues] - if an EmploymentDeclaration needs to be processed as part of the retry

EmploymentDeclaration[_TValues] | None

None - if no EmploymentDeclaration needs to be processed as part of the retry.

Source code in components/employment/public/country_gateway.py
def __call__(
    self,
    *,  # so that implementation can use **kwargs to get rid of arguments they don't need
    # the data as reported in the upstream blocked movement via upstream_data, potentially with some values overridden
    upstream_data: dict[str, Any],
    # root_employment_source_data_id is the employment source data ID of the parent.
    # You must set your EmploymentDeclaration's source_information field with this UUID to correctly link the
    # consequences of the retried ingestion.
    root_employment_source_data_id: UUID,
    event_bus_orchestrator: EventBusOrchestrator,
) -> (
    EmploymentDeclaration[_TValues]
    | None  # Return None if nothing needs to be done in the end
):
    """
    Retry an Upstream Blocked Movement from the provided upstream_data.

    Args:
        upstream_data (dict): The `upstream_data` for the `UpstreamBlockedMovement` - this is the dict that was provided in the `UpstreamBlockedMovementCreator`
        root_employment_source_data_id (UUID): The Employment Source Data ID that corresponds to this upstream blocked movement - in order to correctly correlate the upstream blocked movement with the retry's results, you **must** set the EmploymentDeclaration's source_information field to this UUID.
        event_bus_orchestrator (EventBusOrchestrator): Should you need side-effects to be run, you can use this EventBusOrchestrator to do so.

    Returns:
        EmploymentDeclaration[_TValues] - if an EmploymentDeclaration needs to be processed as part of the retry
        None - if no EmploymentDeclaration needs to be processed as part of the retry.
    """
    ...

components.employment.public.entities

ExtendedEmploymentValue dataclass

ExtendedEmploymentValue(
    *, validity_period, key, value, source_type
)

Bases: WithValidityPeriod

WARNING: In this class, infinitely valid values are represented as validity_period = ValidityPeriod.infinity instead of None.

__repr__

__repr__()
Source code in components/employment/internal/entities/extended_employment_value.py
def __repr__(self) -> str:
    return f"ExtendedEmploymentValue(key={self.key}, validity_period={self.validity_period})"

do_overlap

do_overlap(other)
Source code in components/employment/internal/entities/extended_employment_value.py
def do_overlap(self, other: "WithValidityPeriod") -> bool:
    if not isinstance(other, ExtendedEmploymentValue):
        raise ValueError(
            f"Impossible comparison between a {self.__class__} and a {other.__class__}"
        )

    if self.key != other.key:
        raise ValueError(
            f"Impossible comparison between values of different keys, {self.key} and {other.key}"
        )

    return self.validity_period.do_overlap(other.validity_period)

key instance-attribute

key

source_type instance-attribute

source_type

validity_period instance-attribute

validity_period

WARNING: Infinitely valid values are represented as validity_period = ValidityPeriod.infinity instead of None.

value instance-attribute

value

ValueTimeline dataclass

ValueTimeline(*, employment_id, key)

Bases: Timeline[ExtendedEmploymentValue]

employment_id instance-attribute

employment_id

key instance-attribute

key

components.employment.public.enums

ACTIVE_BLOCKED_MOVEMENT_STATUSES module-attribute

ACTIVE_BLOCKED_MOVEMENT_STATUSES = frozenset(
    {
        pending,
        awaiting_user_response,
        pending_self_healing,
        awaiting_user_response_self_healing,
    }
)

An immutable set containing all of the BlockedMovementStatuses considered "active", i.e. actions (of whichever kind) are required or are being done.

AdminReportingCategory

Bases: AlanBaseEnum

Categories for grouping blocked movements in admin reporting alerts.

creation class-attribute instance-attribute

creation = 'creation'

Enrollment/creation failures requiring admin action.

non_actionable class-attribute instance-attribute

non_actionable = 'non_actionable'

Issues under investigation by Alan (informational only).

termination class-attribute instance-attribute

termination = 'termination'

Termination failures requiring admin action.

AdminResolutionAction

Bases: AlanBaseEnum

Possible resolution actions admins can take when resolving blocked movements.

These are basic actions similar to what Ops can do in the internal tool: - cancel: Cancel the blocked movement - apply: Retry the blocked movement. If params are provided, they are used as overrides. If no params are provided, retries as-is without modifications.

apply class-attribute instance-attribute

apply = 'apply'

Retry the blocked movement.

Behavior depends on whether params are provided: - If params are provided: retries with override data (params are used as overrides) - If params are None/not provided: retries as-is without modifications

The params dict structure depends on the specific blocked movement type and resolver.

cancel class-attribute instance-attribute

cancel = 'cancel'

Cancel the blocked movement without retrying.

AnyRuleCaseAction module-attribute

AnyRuleCaseAction = (
    RuleCaseAction | UseAlternativeRuleCaseAction
)

BLOCKED_MOVEMENT_SLA_TERMINAL_STATUSES module-attribute

BLOCKED_MOVEMENT_SLA_TERMINAL_STATUSES = frozenset(
    {
        cancelled,
        resolved,
        awaiting_user_response,
        awaiting_user_response_self_healing,
        automatically_resolved,
        automatically_ignored,
        automatically_cancelled,
    }
)

BlockedMovementStatus

Bases: AlanBaseEnum

The status of a blocked movement.

For more information, see https://www.notion.so/alaninsurance/Blocked-Movements-246a87a20f7a4cd388aee8a6327c4148?pvs=4 ⧉

automatically_cancelled class-attribute instance-attribute

automatically_cancelled = 'automatically_cancelled'

A self-healing blocked movement (whose status was pending_self_healing) that was cancelled as part of its automatic self-healing process.

This is equivalent to cancelled for regular pending blocked movements.

automatically_ignored class-attribute instance-attribute

automatically_ignored = 'automatically_ignored'

A blocked movement that was created, but immediately ignored.

When a source rule dictates that a movement shall be ignored, instead of silently ignoring, a blocked movement with this status is created instead.

Automatically ignored blocked movements are automatically retried daily for technical reasons (context ⧉).

automatically_resolved class-attribute instance-attribute

automatically_resolved = 'automatically_resolved'

A self-healing blocked movement (whose status was pending_self_healing) that was resolved as part of its automatic self-healing process.

This is equivalent to resolved for regular pending blocked movements.

awaiting_user_response class-attribute instance-attribute

awaiting_user_response = 'awaiting_user_response'

Ops has contacted the admin or member about this Blocked Movement. It is considered a terminal status in terms of SLA computation (our Ops' job is done), but is not the final status for a blocked movement (see https://www.notion.so/alaninsurance/Blocked-Movements-246a87a20f7a4cd388aee8a6327c4148?pvs=4#11c0a501e8b24cbb9603fa7dd273c321 ⧉).

awaiting_user_response blocked movements are automatically retried daily.

awaiting_user_response_self_healing class-attribute instance-attribute

awaiting_user_response_self_healing = (
    "awaiting_user_response_self_healing"
)

A blocked movement that is 'awaiting_user_response', but whose resolution is expected to happen automatically - this is a "self-healing" blocked movement.

Typical use cases include blocked movements that are created and automatically sent to the admin for action, and if the admin doesn't act after a certain time, the blocked movement is automatically resolved or cancelled by a daily command.

They are NOT automatically retried. This gives full control over their lifecycle to local code.

cancelled class-attribute instance-attribute

cancelled = 'cancelled'

A blocked movement that was cancelled - it was not resolved, usually because no action was required.

pending class-attribute instance-attribute

pending = 'pending'

A blocked movement that requires Ops attention. This is the default for newly created blocked movements.

Pending blocked movements are automatically retried daily.

pending_self_healing class-attribute instance-attribute

pending_self_healing = 'pending_self_healing'

A blocked movement that is 'pending', but whose resolution is expected to happen automatically - this is a "self-healing" blocked movement.

Typical use cases include blocked movements that are created, but then resolved or cancelled by a daily command. By setting a blocked movement to have pending_self_healing status, it will not appear in the Blocked Movements tool by default - meaning it will not create noise for Ops.

Note that blocked movements are never created with this state directly - you must manually mark pending blocked movements as self-healing via the mark_pending_(core|upstream)_blocked_movement_as_self_healing function.

Self-healing blocked movements, i.e. those with status pending_self_healing, are NOT automatically retried. This gives full control over their lifecycle to local code.

resolved class-attribute instance-attribute

resolved = 'resolved'

A blocked movement that was resolved, either automatically (by a daily retry, or a retry triggered by an Ops) or manually (status manually set to 'resolved').

Note that just because a blocked movement is resolved doesn't mean that the situation at hand is solved. A blocked movement may be resolved because another error happened - so this blocked movement is 'resolved' in the sense that we no longer get this specific error.

CANCELLED_BLOCKED_MOVEMENT_STATUSES module-attribute

CANCELLED_BLOCKED_MOVEMENT_STATUSES = frozenset(
    {cancelled, automatically_cancelled}
)

An immutable set containing all of the BlockedMovementStatuses considered "cancelled", i.e. the blocked movement is not valid and/or there's nothing to process.

CountryCode

Bases: AlanBaseEnum

The country associated with something in the Employment Component.

Usually deduced from the source_type of what you're looking at.

be class-attribute instance-attribute

be = 'be'

ca class-attribute instance-attribute

ca = 'ca'

es class-attribute instance-attribute

es = 'es'

fr class-attribute instance-attribute

fr = 'fr'

from_app_name staticmethod

from_app_name(app_name)

Turns an AppName into a CountryCode - not compatible with all AppNames.

Source code in components/employment/public/enums.py
@staticmethod
def from_app_name(app_name: AppName) -> "CountryCode":
    """
    Turns an AppName into a CountryCode - not compatible with all AppNames.
    """
    match app_name:
        case AppName.ALAN_FR:
            return CountryCode.fr
        case AppName.ALAN_BE:
            return CountryCode.be
        case AppName.ALAN_ES:
            return CountryCode.es
        case AppName.ALAN_CA:
            return CountryCode.ca

    raise ValueError(f"Unknown AppName: {app_name}")

test class-attribute instance-attribute

test = 'test'

EmploymentChangeType

Bases: AlanBaseEnum

The type of change that occurred.

cancellation class-attribute instance-attribute

cancellation = 'cancellation'

An employment is cancelled, and code should act as if this employment had never existed.

end_date_change class-attribute instance-attribute

end_date_change = 'end_date_change'

A change in the end date of an employment.

external_employee_id_change class-attribute instance-attribute

external_employee_id_change = 'external_employee_id_change'

The external employee ID of an employee is changed.

invitation class-attribute instance-attribute

invitation = 'invitation'

An invitation: an employee joins a company.

Mutually exclusive with all other change types.

NB: An invitation can also have an end_date in case of short-term work contracts. ⚠️ In this case, you will not receive a termination change type

resumption class-attribute instance-attribute

resumption = 'resumption'

A termination is cancelled, i.e. the end_date of an employment is cleared.

start_date_change class-attribute instance-attribute

start_date_change = 'start_date_change'

A change in the start date of an employment.

termination class-attribute instance-attribute

termination = 'termination'

A termination: an employee is terminated (an end_date is set).

NB: In case an employee is invited or transferred with an end_date, only invitation/transfer is sent.

transfer class-attribute instance-attribute

transfer = 'transfer'

An employee is transferred from one company to another. This is both a termination (in the previous company) and an invitation (in the new company). This movement is computed when we receive a new employment for an employee that already has an employment in another company of the same account. Notes: - An employee moving from one company to another company not part of the same account is not considered as a transfer, an invitation will be computed in the new company and a termination in the old company when we receive the information. This can potentially happen with a delay between the two actions - It is still possible to force a transfer between two companies that are not part of the same account by using the imperative function transfer

Mutually exclusive with all other change types.

NB: A transfer can also have an end_date in the new company in case it's already known. ⚠️ In this case, you will not receive a termination change type

uncancellation class-attribute instance-attribute

uncancellation = 'uncancellation'

An employment was cancelled by mistake and we want to undo the cancellation.

RuleCase

Bases: AlanBaseEnum

The different cases that can Source Rules rule over.

cancel class-attribute instance-attribute

cancel = 'cancel'

change_end_date class-attribute instance-attribute

change_end_date = 'change_end_date'

change_end_date_more_than_100_days_in_the_past class-attribute instance-attribute

change_end_date_more_than_100_days_in_the_past = (
    "change_end_date_more_than_100_days_in_the_past"
)

change_external_employee_id class-attribute instance-attribute

change_external_employee_id = 'change_external_employee_id'

change_external_employee_id_if_none class-attribute instance-attribute

change_external_employee_id_if_none = (
    "change_external_employee_id_if_none"
)

change_start_date class-attribute instance-attribute

change_start_date = 'change_start_date'

invite class-attribute instance-attribute

invite = 'invite'

resume class-attribute instance-attribute

resume = 'resume'

terminate class-attribute instance-attribute

terminate = 'terminate'

terminate_with_end_date_more_than_100_days_in_the_past class-attribute instance-attribute

terminate_with_end_date_more_than_100_days_in_the_past = (
    "terminate_with_end_date_more_than_100_days_in_the_past"
)

transfer class-attribute instance-attribute

transfer = 'transfer'

transfer_more_than_100_days_in_the_past class-attribute instance-attribute

transfer_more_than_100_days_in_the_past = (
    "transfer_more_than_100_days_in_the_past"
)

RuleCaseAction

Bases: AlanBaseEnum

An action to take when a rule is met.

apply class-attribute instance-attribute

apply = 'apply'

Apply as usual.

block class-attribute instance-attribute

block = 'block'

Block: the whole declaration is blocked (a core blocked movement is created).

ignore class-attribute instance-attribute

ignore = 'ignore'

Ignore: skip this rule.

  • If all movements are ignored, creates a core blocked movement with status automatically_ignored
  • If some movements are ignored, the ignored one are not processed while the other are processed as usual

TransferRuleCaseAction

Bases: UseAlternativeRuleCaseAction, AlanBaseEnum

Special RuleCaseAction that can only be used with transfers: instead of transferring, invite in the new company instead.

Note that, when a transfer is changed to an invitation like this, the RuleCase.invitation rule is NOT checked.

process_as_invitation class-attribute instance-attribute

process_as_invitation = 'process_as_invitation'

UseAlternativeRuleCaseAction

Simple marker parent class, used to identify RuleCaseActions which should result in using an "alternative payload". The exact meaning depends on the rule case.

components.employment.public.exceptions

ActionNotAllowed

ActionNotAllowed(not_allowed_rule_cases)

Bases: DeclarativeInputError

Raised when the source rules don't allow an action

Source code in components/employment/public/exceptions.py
def __init__(self, not_allowed_rule_cases: set[RuleCase]) -> None:
    message = (
        "Termination date is too far in the past."
        if not_allowed_rule_cases
        == {RuleCase.terminate_with_end_date_more_than_100_days_in_the_past}
        else (
            f"Employment change types {not_allowed_rule_cases} are not allowed by sources rules"
        )
    )
    super().__init__(message)
    self.not_allowed_rule_cases = not_allowed_rule_cases

create_blocked_movement_context

create_blocked_movement_context(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def create_blocked_movement_context(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> "ActionNotAllowedContext":
    return ActionNotAllowedContext(
        error_message=str(self),
        employment_declaration=employment_declaration,
        not_allowed_rule_cases=self.not_allowed_rule_cases,
    )

error_code class-attribute instance-attribute

error_code = 'action_not_allowed'

not_allowed_rule_cases instance-attribute

not_allowed_rule_cases = not_allowed_rule_cases

AdminResolverInternalError

AdminResolverInternalError(
    message,
    resolver_class,
    blocked_movement_id=None,
    **extra_context
)

Bases: Exception

Error raised when an admin resolver encounters an internal error indicating a bug or data inconsistency that should not happen under normal circumstances.

Use this for cases like
  • Missing required data that should always be present
  • Invalid state that indicates a programming error
  • Data corruption or inconsistency

Do NOT use this for expected business errors that admin can fix (use ResolutionAttemptError instead).

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    resolver_class: str,
    blocked_movement_id: str | UUID | None = None,
    **extra_context: Any,
) -> None:
    super().__init__(message)
    self.blocked_movement_id = blocked_movement_id
    self.resolver_class = resolver_class
    self.extra_context = extra_context

    current_logger.error(
        f"[AdminResolver] Internal error: {message}. "
        "CTA: Check blocked movement data integrity and resolver implementation.",
        layer="admin_resolver_internal_error",
        blocked_movement_id=str(self.blocked_movement_id)
        if self.blocked_movement_id
        else None,
        resolver_class=self.resolver_class,
        **self.extra_context,
    )

blocked_movement_id instance-attribute

blocked_movement_id = blocked_movement_id

extra_context instance-attribute

extra_context = extra_context

resolver_class instance-attribute

resolver_class = resolver_class

AutomaticallyIgnoreConsumerError

AutomaticallyIgnoreConsumerError(message, consumer)

Bases: ConsumerError

A consumer error that is automatically ignored when created as a blocked movement.

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    consumer: str,
) -> None:
    super().__init__(message)
    self.consumer = consumer

to_blocked_movement_info

to_blocked_movement_info(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def to_blocked_movement_info(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> DeclarativeBlockedMovementInfo:
    regular_blocked_movement_info = super().to_blocked_movement_info(
        employment_declaration
    )
    return replace(
        regular_blocked_movement_info,
        target_status=BlockedMovementStatus.automatically_ignored,
    )

AutomaticallyIgnoreUpstreamError

AutomaticallyIgnoreUpstreamError(message, context)

Bases: UpstreamError

An upstream error that is automatically ignored when created as a blocked movement.

Source code in components/employment/public/exceptions.py
def __init__(self, message: str, context: UpstreamBlockedMovementContext) -> None:
    super().__init__(message)
    self.context = context

ConsumerError

ConsumerError(message, consumer)

Bases: DeclarativeInputError

Root class for errors raised within employment consumers that should lead to core blocked movements.

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    consumer: str,
) -> None:
    super().__init__(message)
    self.consumer = consumer

consumer instance-attribute

consumer = consumer

create_blocked_movement_context

create_blocked_movement_context(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def create_blocked_movement_context(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> DeclarativeBlockedMovementContext:
    return DeclarativeBlockedMovementContext(
        consumer=self.consumer,
        error_message=str(self),
        employment_declaration=employment_declaration,
    )

error_code class-attribute instance-attribute

error_code = 'consumer_error'

ExternalEmployeeIdConflictError

ExternalEmployeeIdConflictError(
    message, conflicting_employment_id
)

Bases: DeclarativeInputError

Error raised by external employee ID changes that would lead to a conflict with an existing employment. External employee IDs must be unique within a company for overlapping employments.

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    conflicting_employment_id: UUID,
) -> None:
    super().__init__(message)
    self.conflicting_employment_id = conflicting_employment_id

conflicting_employment_id instance-attribute

conflicting_employment_id = conflicting_employment_id

create_blocked_movement_context

create_blocked_movement_context(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def create_blocked_movement_context(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> ConflictingExternalIdContext:
    return ConflictingExternalIdContext(
        error_message=str(self),
        employment_declaration=employment_declaration,
        conflicting_employment_id=self.conflicting_employment_id,
    )

error_code class-attribute instance-attribute

error_code = 'external_employee_id_conflict'

ResolutionActionError

Bases: Exception

Error raised when an admin's resolution action is invalid or cannot be applied.

ResolutionAttemptError

ResolutionAttemptError(message, error_code=None)

Bases: Exception

Error raised when
  • preflight validation on admin resolution parameters fails
  • an attempt to resolve a blocked movement fails following a retry with admin overrides
Source code in components/employment/public/exceptions.py
def __init__(self, message: str, error_code: str | None = None) -> None:
    super().__init__(message)
    self.error_code = error_code or ""

error_code instance-attribute

error_code = error_code or ''

TooManyResolutionsError

Bases: Exception

Error raised when there are too many pending resolutions for an admin to handle.

UnexpectedConsumerError

UnexpectedConsumerError(
    message, consumer, error_message, traceback
)

Bases: ConsumerError

Error class used for unexpected errors raised from employment consumers.

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    consumer: str,
    error_message: str,
    traceback: str,
) -> None:
    super().__init__(message, consumer)
    self.error_message = error_message
    self.traceback = traceback

create_blocked_movement_context

create_blocked_movement_context(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def create_blocked_movement_context(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> DeclarativeBlockedMovementContext:
    return DeclarativeBlockedMovementContext(
        consumer=self.consumer,
        error_message=str(self),
        error_type=self.__class__,
        traceback=self.traceback,
        employment_declaration=employment_declaration,
    )

error_code class-attribute instance-attribute

error_code = 'unexpected_consumer_error'

error_message instance-attribute

error_message = error_message

traceback instance-attribute

traceback = traceback

UnexpectedLegacyBackfillError

UnexpectedLegacyBackfillError(
    message, error_message, traceback
)

Bases: DeclarativeInputError

Error type for unexpected errors that occur during the legacy_backfill process.

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    error_message: str,
    traceback: str,
) -> None:
    super().__init__(message)
    self.error_message = error_message
    self.traceback = traceback

create_blocked_movement_context

create_blocked_movement_context(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def create_blocked_movement_context(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> DeclarativeBlockedMovementContext:
    return DeclarativeBlockedMovementContext(
        error_message=str(self),
        error_type=self.__class__,
        traceback=self.traceback,
        employment_declaration=employment_declaration,
    )

error_code class-attribute instance-attribute

error_code = 'unexpected_legacy_backfill_error'

error_message instance-attribute

error_message = error_message

to_blocked_movement_info

to_blocked_movement_info(employment_declaration)
Source code in components/employment/public/exceptions.py
@override
def to_blocked_movement_info(
    self, employment_declaration: EmploymentDeclaration[Any]
) -> DeclarativeBlockedMovementInfo:
    regular_blocked_movement_info = super().to_blocked_movement_info(
        employment_declaration
    )
    return replace(
        regular_blocked_movement_info,
        source_type=GlobalSourceType.legacy_backfill,
    )

traceback instance-attribute

traceback = traceback

UnexpectedUpstreamError

UnexpectedUpstreamError(message, error_message, context)

Bases: UpstreamError

Error type used when an unexpected error occurs within an upstream (i.e. while preparing an Employment Declaration)

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    error_message: str,
    context: UnexpectedUpstreamErrorContext,
) -> None:
    super().__init__(message, context)
    self.error_message = error_message

error_code class-attribute instance-attribute

error_code = 'unexpected_upstream_error'

error_message instance-attribute

error_message = error_message

to_blocked_movement_info

to_blocked_movement_info()
Source code in components/employment/public/exceptions.py
@override
def to_blocked_movement_info(self) -> UpstreamBlockedMovementInfo:
    blocked_movement = UpstreamBlockedMovementInfo(
        error_code=self.error_code,
        error_message=self.error_message,
        user_id=None,
        company_id=None,
        context=self.context,
    )

    return blocked_movement

UnexpectedUpstreamErrorContext dataclass

UnexpectedUpstreamErrorContext(traceback)

Bases: UpstreamBlockedMovementContext

Context type for UnexpectedUpstreamError

traceback instance-attribute

traceback

UpstreamError

UpstreamError(message, context)

Bases: ABC, Exception

An error that occurs within the upstream phase, i.e. when preparing an Employment Declaration.

Source code in components/employment/public/exceptions.py
def __init__(self, message: str, context: UpstreamBlockedMovementContext) -> None:
    super().__init__(message)
    self.context = context

context instance-attribute

context = context

error_code class-attribute instance-attribute

error_code = 'upstream_error'

to_blocked_movement_info abstractmethod

to_blocked_movement_info()

Generate an UpstreamBlockedMovementInfo object that will be used for creating the blocked movement.

Source code in components/employment/public/exceptions.py
@abstractmethod
def to_blocked_movement_info(self) -> UpstreamBlockedMovementInfo:
    """
    Generate an UpstreamBlockedMovementInfo object that will be used for creating the blocked movement.
    """
    raise NotImplementedError

UserNotFoundError

UserNotFoundError(message, consumer)

Bases: ConsumerError

Error raised when a user ID cannot be found, typically after a user merge event when the employment change still refers to the old user ID.

Source code in components/employment/public/exceptions.py
def __init__(
    self,
    message: str,
    consumer: str,
) -> None:
    super().__init__(
        message,
        consumer=consumer,
    )

error_code class-attribute instance-attribute

error_code = 'user_not_found'

components.employment.public.source_type

SourceType module-attribute

SourceType = LocalSourceType

components.employment.public.upstream_api

URBlockedMovement dataclass

URBlockedMovement(employment_source_data_id, success)

Bases: ABC

An UpstreamResult where a blocked movement happened.

Check the URCoreBlockedMovement and URUpstreamBlockedMovement classes for details.

blocked_movement abstractmethod property

blocked_movement

Returns the blocked movement represented by this UpstreamResult.

employment_source_data_id instance-attribute

employment_source_data_id

success instance-attribute

success

with_data

with_data()

Returns an UpstreamResultWithData, which includes both the data from this UpstreamResult along with the corresponding EmploymentSourceData object.

Source code in components/employment/public/upstream_api.py
def with_data(self) -> "UpstreamResultWithData":
    """
    Returns an `UpstreamResultWithData`, which includes both the data from this UpstreamResult along with the
    corresponding EmploymentSourceData object.
    """
    return UpstreamResultWithData(
        self, get_employment_source_data_from_id(self.employment_source_data_id)
    )

URCoreBlockedMovement dataclass

URCoreBlockedMovement(
    employment_source_data_id, success, blocked_movement
)

Bases: URBlockedMovement

The Upstream successfully built an EmploymentDeclaration, but the ingestion or a local consumer raised an exception, which created a blocked movement.

blocked_movement instance-attribute

blocked_movement

URSuccess dataclass

URSuccess(employment_source_data_id, success=True)

The Upstream successfully processed the input, resulting in changes in the Employment Component, which you can inspect via the with_data function.

employment_source_data_id instance-attribute

employment_source_data_id

success class-attribute instance-attribute

success = True

with_data

with_data()

Returns an UpstreamResultWithData, which includes both the data from this UpstreamResult along with the corresponding EmploymentSourceData object.

Source code in components/employment/public/upstream_api.py
def with_data(self) -> "UpstreamResultWithData":
    """
    Returns an `UpstreamResultWithData`, which includes both the data from this UpstreamResult along with the
    corresponding EmploymentSourceData object.
    """
    return UpstreamResultWithData(
        self, get_employment_source_data_from_id(self.employment_source_data_id)
    )

URSuccessNoOp dataclass

URSuccessNoOp(success=True)

The Upstream successfully processed the input, but no declaration was ingested. This can happen for two reasons:

  • A regular upstream returned None in its build_declaration function
  • An action upstream succeeded.

success class-attribute instance-attribute

success = True

URUpstreamBlockedMovement dataclass

URUpstreamBlockedMovement(
    employment_source_data_id, success, blocked_movement
)

Bases: URBlockedMovement

The Upstream's build_declaration (or process_input_data for action upstreams) raised an exception, which created a blocked movement.

blocked_movement instance-attribute

blocked_movement

Upstream

Bases: ABC, Generic[_TInput, _TValues]

Upstreams are a mechanism which allow you to:

  • Ingest data into the Employment Component.
  • Perform actions within the Employment Component via its API.
  • Affect other stacks.
  • In a way that is retriable: failures lead to a "blocked movement", which can be retried and acted upon via tooling ⧉

The type parameters of this class are:

  • _TInput: The type of the object passed to this upstream as input
  • _TValues: The type of local extended values (check ExtendedValuesDict for details)
Kinds of upstreams

Broadly speaking, there are two kinds of upstreams:

Regular upstreams

Upstreams can declaratively send data to the Employment Component. This is similar to the ingest_or_raise function, but with blocked movements support.

To implement this kind of upstream, subclass Upstream, DictUpstream or DataclassUpstream (available in upstream_utils).

For example:

class MySourceParams(TypedDict):
    foo: str
    bar: str
    # ...

# Xx = the country code, e.g. Fr, Es, Ca...
class _XxMySourceUpstream(DictUpstream[MySourceParams, XxExtendedValues]):
    source_type: ClassVar = SourceType.xx_my_source

    @override
    def build_declaration(self, input_data: BulkImportRowParams, source_information: SourceInformationLike) -> EmploymentDeclaration[XxExtendedValues]:
        # ...

XxMySourceUpstream: Final = _XxMySourceUpstream()

# ...
with employment_session(...) as employment_api:
    employment_api.ingest_or_block(MySourceUpstream, {"foo": ..., "bar": ...})
Action upstreams

Action upstreams do NOT send data to the Employment Component, and instead perform some other action, which may leverage the Employment API or not.

To implement this kind of upstream, subclass Upstream + ActionUpstreamMixin, or ActionDictUpstream, or ActionDataclassUpstream (available in upstream_utils).

class MySourceParams(TypedDict):
    foo: str
    bar: str
    # ...

# Xx = the country code, e.g. Fr, Es, Ca...
class _XxMySourceUpstream(DictActionUpstream[MySourceParams]):
    source_type: ClassVar = SourceType.xx_my_source

    @override
    def process_input_data(self, input_data: MySourceParams, employment_api: EmploymentApiSession) -> None:
        # ...

XxMySourceUpstream: Final = _XxMySourceUpstream()

# ...
with employment_session(...) as employment_api:
    employment_api.ingest_or_block(MySourceUpstream, {"foo": ..., "bar": ...})
Registering upstreams

For the retry mechanism to work, upstreams must be registered in the local country's CountryGateway.get_retry_handlers implementation like so:

@override
def get_upstream_retry_handler(
    self, source_type: SourceType
) -> UpstreamBlockedMovementRetryFunction[XxExtendedValues] | None:
    from components.xx...my_source import XxMySourceUpstream

    return {
        SourceType.xx_my_source: XxMySourceUpstream.retry_handler
    }

build_declaration abstractmethod

build_declaration(input_data, source_information)

This function builds an EmploymentDeclaration from the provided input_data. This EmploymentDecalration will then be ingested in the Employment Component.

Note: this function must not commit nor persist anything definitively.

You can optionally return None if you do not want/need an EmploymentDeclaration to be ingested.

Source code in components/employment/public/upstream_api.py
@abstractmethod
def build_declaration(
    self, input_data: _TInput, source_information: SourceInformationLike
) -> EmploymentDeclaration[_TValues] | None:
    """
    This function builds an EmploymentDeclaration from the provided `input_data`. This EmploymentDecalration will
    then be ingested in the Employment Component.

    Note: this function **must not commit** nor persist anything definitively.

    You can optionally return `None` if you do not want/need an EmploymentDeclaration to be ingested.
    """

ingest_data

ingest_data(
    input_data, employment_api, source_rules_override=None
)

Main function used to run the Upstream on some data.

Note: you should use employment_api.ingest_or_block instead directly calling this function.

Source code in components/employment/public/upstream_api.py
@final
def ingest_data(
    self,
    input_data: _TInput,
    employment_api: EmploymentApiSession,
    source_rules_override: SourceRules | None = None,
) -> UpstreamResult:
    """
    Main function used to run the Upstream on some data.

    Note: you should use `employment_api.ingest_or_block` instead directly calling this function.
    """
    call_args = employment_api._generate_call_args()  # noqa: ALN027
    if call_args["source_type"] != self.source_type:
        raise ValueError(
            f"source_type from employment_session ({call_args['source_type']}) does not match source_type from Upstream instance ({self.source_type})"
        )

    with (
        UpstreamBlockedMovementCreator(
            upstream_data=self._serialize_input_data(input_data),
            source=self.source_type,
            commit=False,
            source_information=call_args["source_information"],
            source_rules_override=source_rules_override,
        ) as blocked_movement_creator,
        employment_api.with_nested_orchestrator() as employment_api_nested,
    ):
        employment_declaration = self._process_upstream(
            input_data, employment_api_nested
        )

    if blocked_movement_creator.created_blocked_movement:
        return URUpstreamBlockedMovement(
            success=False,
            employment_source_data_id=mandatory(
                blocked_movement_creator.employment_source_data
            ).id,
            blocked_movement=mandatory(blocked_movement_creator.blocked_movement),
        )

    if not employment_declaration:
        return URSuccessNoOp()

    ingestion_result = self._ingest_declaration(
        employment_declaration=employment_declaration,
        source_information=call_args["source_information"],
        event_bus_orchestrator=call_args["event_bus_orchestrator"],
        source_rules_override=source_rules_override,
    )

    if ingestion_result.success:
        return URSuccess(ingestion_result.employment_source_data.id)
    else:
        return URCoreBlockedMovement(
            success=False,
            employment_source_data_id=ingestion_result.employment_source_data.id,
            blocked_movement=mandatory(ingestion_result.blocked_movement),
        )

retry_handler

retry_handler(
    *,
    upstream_data,
    root_employment_source_data_id,
    event_bus_orchestrator
)

Function that you must register to your CountryGateway to allow blocked movements to be retried (see Upstream docstring for details).

Source code in components/employment/public/upstream_api.py
def retry_handler(
    self,
    *,
    upstream_data: dict[str, Any],
    root_employment_source_data_id: UUID,
    event_bus_orchestrator: EventBusOrchestrator,
) -> EmploymentDeclaration[_TValues] | None:
    """
    Function that you must register to your CountryGateway to allow blocked movements to be retried (see Upstream
    docstring for details).
    """
    input_data = self._deserialize_input_data(upstream_data)
    return self._process_upstream(
        input_data=input_data,
        employment_api=EmploymentApiSessionImpl(
            source_information=root_employment_source_data_id,
            source_type=self.source_type,
            event_bus_orchestrator=event_bus_orchestrator,
        ),
    )

source_type abstractmethod property

source_type

Override this to indicate the SourceType associated with this Upstream subclass:

source_type: ClassVar = SourceType.xx_my_source

UpstreamResult module-attribute

UpstreamResult = (
    URUpstreamBlockedMovement
    | URCoreBlockedMovement
    | URSuccess
    | URSuccessNoOp
)

UpstreamResultWithData dataclass

UpstreamResultWithData(
    upstream_result, employment_source_data
)

Utility class returned by with_data, which combines an UpstreamResult with the corresponding EmploymentSourceData.

employment_source_data instance-attribute

employment_source_data

upstream_result instance-attribute

upstream_result

components.employment.public.upstream_utils

Convenience utility classes built on top of the Upstream API

ActionUpstreamMixin

Bases: Upstream[_TInput, Never]

A mixin that allows you to define action upstreams. Refer to the Upstream documentation for more information.

Instead of overriding build_declaration, override process_input_data.

build_declaration

build_declaration(input_data, source_information)
Source code in components/employment/public/upstream_utils.py
@override
@final
def build_declaration(
    self, input_data: _TInput, source_information: SourceInformation | UUID
) -> NoReturn:
    raise ValueError(
        "Programming error: build_declaration cannot be called on an ActionUpstreamMixin"
    )

process_input_data abstractmethod

process_input_data(input_data, employment_api)
Source code in components/employment/public/upstream_utils.py
@abstractmethod
def process_input_data(  # noqa: D102
    self, input_data: _TInput, employment_api: EmploymentApiSession
) -> None: ...

retry_handler

retry_handler(
    *,
    upstream_data,
    root_employment_source_data_id,
    event_bus_orchestrator
)
Source code in components/employment/public/upstream_utils.py
@override  # not a useful override, this is just for typing purposes
def retry_handler(
    self,
    *,
    upstream_data: dict[str, Any],
    root_employment_source_data_id: UUID,
    event_bus_orchestrator: EventBusOrchestrator,
) -> None:
    super().retry_handler(
        upstream_data=upstream_data,
        root_employment_source_data_id=root_employment_source_data_id,
        event_bus_orchestrator=event_bus_orchestrator,
    )

DataclassActionUpstream

DataclassActionUpstream(input_type)

Bases: DataclassUpstream[TInputDataclass, Never], ActionUpstreamMixin[TInputDict]

Convenience superclass that combines DataclassUpstream and ActionUpstreamMixin

Source code in components/employment/public/upstream_utils.py
def __init__(self, input_type: type[TInputDataclass]) -> None:
    self._input_type = input_type

DataclassUpstream

DataclassUpstream(input_type)

Bases: Upstream[TInputDataclass, _TValues]

Upstream superclass that provides default serialization and deserialization functions for dataclasses which implement DataClassJsonMixin

Source code in components/employment/public/upstream_utils.py
def __init__(self, input_type: type[TInputDataclass]) -> None:
    self._input_type = input_type

DictActionUpstream

Bases: DictUpstream[TInputDict, Never], ActionUpstreamMixin[TInputDict]

Convenience superclass that combines DictUpstream and ActionUpstreamMixin

DictUpstream

Bases: Upstream[TInputDict, _TValues]

Upstream superclass that provides default serialization and deserialization functions for dictionaries/TypedDicts.

TInputDataclass module-attribute

TInputDataclass = TypeVar(
    "TInputDataclass", bound=DataClassJsonMixin
)

TInputDict module-attribute

TInputDict = TypeVar('TInputDict', bound=Mapping[str, Any])

components.employment.public.utils

dump_results

ExtraDumpData dataclass

ExtraDumpData(help_text, data)

Extra information to output for upload_ingestion_results_as_csv

data instance-attribute
data
help_text instance-attribute
help_text

upload_ingestion_results_as_csv

upload_ingestion_results_as_csv(
    ingestion_results, extra_data=None
)

Utility function that uploads the ingestion provided ingestion results as CSVs to S3 and prints links to download the files in the logs.

The main use-case for this function is for outputting the results of dry-run processes.

Source code in components/employment/public/utils/dump_results.py
def upload_ingestion_results_as_csv(
    ingestion_results: list[UpstreamResultWithData] | list[IngestionResult],
    extra_data: list[ExtraDumpData] | None = None,
) -> None:
    """
    Utility function that uploads the ingestion provided ingestion results as CSVs to S3 and prints links to download
    the files in the logs.

    The main use-case for this function is for outputting the results of dry-run processes.
    """
    _upload_as_csv(
        "Employment Source Data created",
        [
            _FlatEmploymentSourceData(
                id=result.employment_source_data.id,
                source_type=result.employment_source_data.source_type,
                raw_data=result.employment_source_data.raw_data,
                metadata=result.employment_source_data.metadata,
                core_employment_versions_count=len(
                    result.employment_source_data.core_employment_versions
                ),
                extended_employment_updates_count=len(
                    result.employment_source_data.extended_employment_updates
                ),
                upstream_blocked_movements_count=len(
                    result.employment_source_data.upstream_blocked_movements
                ),
                core_blocked_movements_count=len(
                    result.employment_source_data.core_blocked_movements
                ),
            )
            for result in ingestion_results
        ],
    )
    _upload_as_csv(
        "Core Employment Versions created",
        list(
            flatten(
                result.employment_source_data.core_employment_versions
                for result in ingestion_results
            )
        ),
    )
    _upload_as_csv(
        "Extended Employment Updates created",
        list(
            flatten(
                result.employment_source_data.extended_employment_updates
                for result in ingestion_results
            )
        ),
    )
    _upload_as_csv(
        "Upstream Blocked Movements created",
        list(
            flatten(
                result.employment_source_data.upstream_blocked_movements
                for result in ingestion_results
            )
        ),
    )
    _upload_as_csv(
        "Core Blocked Movements created",
        list(
            flatten(
                result.employment_source_data.core_blocked_movements
                for result in ingestion_results
            )
        ),
    )

    if extra_data is not None:
        for extra_data_item in extra_data:
            _upload_as_csv(extra_data_item.help_text, extra_data_item.data)

override_employment_declaration

override_employment_declaration_start_date

override_employment_declaration_start_date(
    employment_declaration, new_start_date
)

Updates both the start date of the employment declaration and the validity period of any extended information linked to it.

When the start date is overridden, any extended information that had a validity period starting on the original start date will have its validity period updated to start on the new start date as well.

Source code in components/employment/public/utils/override_employment_declaration.py
def override_employment_declaration_start_date(
    employment_declaration: EmploymentDeclaration[Any], new_start_date: date
) -> EmploymentDeclaration[Any]:
    """
    Updates both the start date of the employment declaration
    and the validity period of any extended information linked to it.

    When the start date is overridden, any extended information that had a validity period
    starting on the original start date will have its validity period updated to start
    on the new start date as well.
    """
    updated_extended_informations = []
    for extended_info in employment_declaration.extended_informations:
        original_validity_period = extended_info.validity_period

        # Leave it untouched if it has no dates
        if original_validity_period is None:
            updated_extended_informations.append(extended_info)
            continue

        # If the extended info had the same date as the employment declaration, we should also update it
        if original_validity_period.start_date == employment_declaration.start_date:
            updated_validity_period = ValidityPeriod(
                start_date=new_start_date,
                end_date=original_validity_period.end_date,
            )
            updated_extended_info = replace(
                extended_info, validity_period=updated_validity_period
            )
            updated_extended_informations.append(updated_extended_info)
            continue

        # Leave it untouched otherwise
        updated_extended_informations.append(extended_info)
    return replace(
        employment_declaration,
        start_date=new_start_date,
        extended_informations=updated_extended_informations,
    )

user_matching

UserMatchingProcessorWithEmploymentException

Bases: UserMatchingProcessor[TSearchParams, TUser], Generic[TSearchParams, TUser]

A UserMatchingProcessor that raises exceptions that are compatible with the Employment Component's exception typing.

By inheriting from this class instead of the regular UserMatchingProcessor, your errors will show up as clean blocked movement instead of 'unexpected_upstream_error' blocked movements