Skip to content

Api reference

components.fr.public.admins

queries

get_account_admins_profile_ids

get_account_admins_profile_ids(account_id)

Get the profile_ids of the admins of an account.

Parameters:

Name Type Description Default
account_id UUID

The id of the account.

required
Source code in components/fr/public/admins/queries.py
def get_account_admins_profile_ids(account_id: uuid.UUID) -> set[uuid.UUID]:
    """
    Get the profile_ids of the admins of an account.

    Args:
        account_id: The id of the account.
    """
    profile_ids = (
        current_session.execute(
            select(User.profile_id)
            .select_from(CompanyAdmin)
            .join(User, User.id == CompanyAdmin.user_id)
            .filter(CompanyAdmin.account_id == account_id)
        )
        .scalars()
        .all()
    )

    return set(profile_ids)

get_company_admins_profile_ids

get_company_admins_profile_ids(company_id)

Get the profile_ids of the admins of a company.

Parameters:

Name Type Description Default
company_id int

The id of the company.

required
Source code in components/fr/public/admins/queries.py
def get_company_admins_profile_ids(company_id: int) -> set[uuid.UUID]:
    """
    Get the profile_ids of the admins of a company.

    Args:
        company_id: The id of the company.
    """
    profile_ids = (
        current_session.execute(
            select(User.profile_id)
            .select_from(CompanyAdmin)
            .join(User, User.id == CompanyAdmin.user_id)
            .filter(CompanyAdmin.company_id == company_id)
        )
        .scalars()
        .all()
    )

    return set(profile_ids)

get_user_can_add_and_refer_company

get_user_can_add_and_refer_company(
    admined_entity_id, admined_entity_type
)
Source code in components/fr/public/admins/queries.py
def get_user_can_add_and_refer_company(  # noqa: D103
    admined_entity_id: str, admined_entity_type: AdminedEntityType
) -> bool:
    from components.fr.internal.product_settings.queries.account_setting import (
        has_account_setting,
    )

    if admined_entity_type == AdminedEntityType.account:
        account_id = admined_entity_id
    elif admined_entity_type == AdminedEntityType.single_company:
        account_id = get_company_id_to_account_id(company_ids=[admined_entity_id])[
            admined_entity_id
        ]
    elif (
        admined_entity_type == AdminedEntityType.operational_scope
        or admined_entity_type == AdminedEntityType.company_for_operational_scope
    ):
        # Operational scope admins can't refer companies
        return False
    else:
        raise NotImplementedError(
            f"admined_entity_type {admined_entity_type} is not supported"
        )

    is_civil_servant = has_account_setting(
        account_id=uuid.UUID(account_id),
        feature_name=AccountFeature.civil_servant,
    )

    return not is_civil_servant

components.fr.public.auth

authorization

AuthorizationStrategies

alaner_admin class-attribute instance-attribute
alaner_admin = FrAlanerAdminStrategy
authenticated class-attribute instance-attribute
authenticated = FrAuthenticatedStrategy
authenticated_with_custom_authorization class-attribute instance-attribute
authenticated_with_custom_authorization = (
    FrAuthenticatedWithCustomAuthorizationStrategy
)
authenticated_with_token_fallback class-attribute instance-attribute
authenticated_with_token_fallback = (
    FrAuthenticatedOrTokenStrategy
)
authenticated_with_token_fallback_with_custom_authorization class-attribute instance-attribute
authenticated_with_token_fallback_with_custom_authorization = (
    FrAuthenticatedOrTokenWithCustomAuthorizationStrategy
)
open class-attribute instance-attribute
open = FrOpenStrategy
owner_only class-attribute instance-attribute
owner_only = FrOwnerOnlyStrategy

FrAlanerAdminStrategy

FrAlanerAdminStrategy(permitted_for=None)

Bases: AlanerAdminStrategy

Source code in components/fr/public/auth/authorization.py
def __init__(self, permitted_for: set[EmployeePermission] | None = None) -> None:
    super().__init__(permitted_for=permitted_for)

FrAuthenticatedOrTokenStrategy

FrAuthenticatedOrTokenStrategy(
    token_extractor=None,
    token_param_name="token",
    schemes=None,
    allow_deep_link=False,
)

Bases: FrAuthenticatedStrategy

Support either one pair of param name and token extractor or a scheme of multiple pairs of param name and token extractor.

Source code in components/fr/public/auth/authorization.py
def __init__(
    self,
    token_extractor: TokenExtractor | None = None,
    token_param_name: str = "token",  # noqa: S107
    # Mapping of param name to TokenExtractor
    schemes: dict[str, TokenExtractor] | None = None,
    allow_deep_link: bool = False,
) -> None:
    """
    Support either one pair of param name and token extractor or a scheme of multiple
    pairs of param name and token extractor.
    """
    super().__init__(allow_deep_link=allow_deep_link)

    if schemes is not None:
        self.schemes = schemes
    elif token_extractor is not None:
        self.schemes = {token_param_name: token_extractor}
    else:
        raise ValueError("Either token_extractor or schemes must be provided")
authentication_required class-attribute instance-attribute
authentication_required = False
authenticator
authenticator()
Source code in components/fr/public/auth/authorization.py
def authenticator(self) -> Callable:  # type: ignore[type-arg] # noqa: D102
    default_authenticator_decorator = super().authenticator

    def endpoint_decorator(endpoint_fn):  # type: ignore[no-untyped-def]
        @wraps(endpoint_fn)
        @default_authenticator_decorator()
        def decorated_endpoint(*args, **kwargs):  # type: ignore[no-untyped-def]
            matching_token_param_name = next(
                (
                    token_param_name
                    for token_param_name in self.schemes.keys()
                    if token_param_name in request.values
                ),
                None,
            )

            if matching_token_param_name is None:
                return endpoint_fn(*args, **kwargs)

            try:
                token = request.values[matching_token_param_name]
                token_extractor = self.schemes[matching_token_param_name]
                token_content = token_extractor(token=token)
            except BadSignature:
                current_logger.warning("Failed to unsign a authentication token")
                raise BaseErrorCode.authorization_error(
                    message="Invalid token signature"
                )

            # injecting the token payload in controller function's arguments
            for key, value in token_content.items():
                if key in signature(endpoint_fn).parameters:
                    kwargs[key] = value
            return endpoint_fn(*args, **kwargs)

        return decorated_endpoint

    return endpoint_decorator
schemes instance-attribute
schemes = schemes

FrAuthenticatedOrTokenWithCustomAuthorizationStrategy

FrAuthenticatedOrTokenWithCustomAuthorizationStrategy(
    token_extractor=None,
    token_param_name="token",
    schemes=None,
    allow_deep_link=False,
)

Bases: FrAuthenticatedOrTokenStrategy

Source code in components/fr/public/auth/authorization.py
def __init__(
    self,
    token_extractor: TokenExtractor | None = None,
    token_param_name: str = "token",  # noqa: S107
    # Mapping of param name to TokenExtractor
    schemes: dict[str, TokenExtractor] | None = None,
    allow_deep_link: bool = False,
) -> None:
    """
    Support either one pair of param name and token extractor or a scheme of multiple
    pairs of param name and token extractor.
    """
    super().__init__(allow_deep_link=allow_deep_link)

    if schemes is not None:
        self.schemes = schemes
    elif token_extractor is not None:
        self.schemes = {token_param_name: token_extractor}
    else:
        raise ValueError("Either token_extractor or schemes must be provided")
ensure_custom_authorization class-attribute instance-attribute
ensure_custom_authorization = True

FrAuthenticatedStrategy

FrAuthenticatedStrategy(allow_deep_link=False)

Bases: AuthenticatedStrategy

Source code in components/fr/public/auth/authorization.py
def __init__(self, allow_deep_link: bool = False) -> None:
    super().__init__(allow_deep_link=allow_deep_link)

FrAuthenticatedWithCustomAuthorizationStrategy

FrAuthenticatedWithCustomAuthorizationStrategy(
    allow_deep_link=False,
)

Bases: AuthenticatedWithCustomAuthorizationStrategy

Source code in components/fr/public/auth/authorization.py
def __init__(self, allow_deep_link: bool = False) -> None:
    super().__init__(allow_deep_link=allow_deep_link)

FrOpenStrategy

FrOpenStrategy(allow_deep_link=False)

Bases: OpenStrategy

Source code in components/fr/public/auth/authorization.py
def __init__(self, allow_deep_link: bool = False) -> None:
    super().__init__(allow_deep_link=allow_deep_link)

FrOwnerOnlyStrategy

FrOwnerOnlyStrategy(
    owner_bypass_permitted_for=None, allow_deep_link=False
)

Bases: OwnerOnlyStrategy

Source code in components/fr/public/auth/authorization.py
def __init__(
    self,
    owner_bypass_permitted_for: set[EmployeePermission] | None = None,
    allow_deep_link: bool = False,
) -> None:
    super().__init__(
        owner_bypass_permitted_for=owner_bypass_permitted_for,
        allow_deep_link=allow_deep_link,
    )

TokenExtractor

Bases: Protocol

__call__
__call__(token)
Source code in components/fr/public/auth/authorization.py
def __call__(self, token: str) -> dict[str, Any]: ...  # noqa: D102

components.fr.public.blueprint

fr module-attribute

fr = create_blueprint(
    "fr", import_name=__name__, template_folder="templates"
)

components.fr.public.claim_management

api

schedule_update_insurance_profile_informations_cache

schedule_update_insurance_profile_informations_cache(
    insurance_profile_id=None,
    insurance_profile_ids=None,
    priority=None,
)

Schedule the update of the insurance profile information cache for the given insurance profile

Source code in components/fr/public/claim_management/api.py
def schedule_update_insurance_profile_informations_cache(
    insurance_profile_id: int | None = None,  # DEPRECATED, use insurance_profile_ids
    insurance_profile_ids: Iterable[int] | None = None,
    priority: JobPriority | None = None,
) -> None:
    """
    Schedule the update of the insurance profile information cache for the given insurance profile
    """
    from components.fr.internal.claim_management.public.insurance_profile_informations_cache.api import (
        schedule_update_insurance_profile_informations_caches,
    )

    # backward compatibility
    if insurance_profile_ids is None and insurance_profile_id is not None:
        insurance_profile_ids = [insurance_profile_id]

    return schedule_update_insurance_profile_informations_caches(
        insurance_profile_ids=mandatory(insurance_profile_ids),
        priority=priority,
    )

upload_alan_therapy_invoice_as_insurance_document

upload_alan_therapy_invoice_as_insurance_document(
    file,
    start_date,
    end_date,
    operator_comment,
    user_id,
    paid_amount,
    executant_number,
    health_professional,
    short_code,
    excluded_company_ids=None,
)

Upload internally an insurance document for Alan Therapy

Source code in components/fr/public/claim_management/api.py
def upload_alan_therapy_invoice_as_insurance_document(
    file: IO,  # type: ignore[type-arg]
    start_date: date,
    end_date: date,
    operator_comment: str,
    user_id: int,
    paid_amount: float | None,
    executant_number: str,
    health_professional: str,
    short_code: str,
    excluded_company_ids: set[int] | None = None,
) -> bool:
    """
    Upload internally an insurance document for Alan Therapy
    """
    from components.fr.internal.claim_management.claim_engine.steps.parsing.entities.in_memory_parsed_document_content import (  # noqa: ALN039
        InMemoryParsedDocumentContent,
        InvoiceCareAct,
        InvoiceContent,
        InvoiceOverallInformation,
    )
    from components.fr.internal.claim_management.internal.misc.business_logic.remaining_usage import (  # noqa: ALN039
        get_health_coverage_on_date,
        get_remaining_usages,
    )
    from components.fr.internal.models.enums.insurance_document_type import (
        ClaimInsuranceDocumentCategory,
    )
    from components.fr.internal.models.enums.practitioner import (
        Practitioner,
    )
    from components.fr.internal.models.user import User
    from shared.helpers.db import current_session

    user = current_session.get(User, user_id)
    if not user:
        raise Exception("User is unknown")

    insurance_profile = user.insurance_profile
    if not insurance_profile:
        raise Exception("Insurance Profile is unknown")

    policy = insurance_profile.policy_on(start_date)
    if not policy:
        raise Exception("Policy is unknown")

    company_id = policy.contract.company_id
    if excluded_company_ids and company_id in excluded_company_ids:
        raise Exception("Company is excluded")

    # Checking coverage and remaining usage

    health_coverage = get_health_coverage_on_date(
        insurance_profile=insurance_profile, reference_date=start_date
    )
    if not health_coverage:
        raise Exception("No health coverage")

    mapping = next(
        (
            m
            for m in health_coverage.internal_care_type_guarantee_mappings
            if m.internal_care_type.short_code == short_code
        ),
        None,
    )
    if not mapping:
        raise Exception("No internal care type mapping")

    remaining_usages = get_remaining_usages(
        insurance_profile=insurance_profile,
        reference_date=start_date,
        internal_health_guarantees_to_consider={mapping.internal_health_guarantee},
    )
    if len(remaining_usages) != 1:
        raise Exception("No remaining usages")

    count_remaining = remaining_usages[0].count_remaining or 0
    if count_remaining < 1:
        raise Exception("No remaining count")

    # Uploading invoice as insurance document
    pre_parsed_content = InMemoryParsedDocumentContent(
        content=InvoiceContent(
            care_acts=[
                InvoiceCareAct(
                    care_code=short_code,
                    start_date=start_date,
                    end_date=end_date,
                    total_spend=paid_amount or 0,
                    tp_amount=0,
                    brss=0,
                    secu_reimbursed_amount=0,
                    ss_coverage_rate=0,
                )
            ],
            overall_information=InvoiceOverallInformation(
                beneficiary=user.insurance_profile.id,  # type: ignore[arg-type,union-attr]
                executant_number=executant_number,
                health_professional=health_professional,
                practitioner=Practitioner.other,
            ),
        ),
        context=ParsingContext().with_no_internal_control(),
        document_category=ClaimInsuranceDocumentCategory.invoice,
    )
    upload_insurance_document_internally(
        file=file,
        user_id=user.id,
        policy_id=policy.id,
        operator_comment=operator_comment,
        pre_parsed_content=pre_parsed_content,
    )
    current_session.commit()
    return True

components.fr.public.clinic

adapter

FrClinicAdapter

Bases: ClinicAdapter

clinic_consent_ai_publish_date = datetime(2023, 12, 6)
get_allowlist_of_dermatology_medical_admins_ids
get_allowlist_of_dermatology_medical_admins_ids()
Source code in components/fr/public/clinic/adapter.py
def get_allowlist_of_dermatology_medical_admins_ids(self) -> list[str]:  # noqa: D102
    from components.fr.public.clinic.internal_teleconsultation import (
        ALLOWLIST_OF_DERMATOLOGY_MEDICAL_ADMINS_IDS,
    )

    return ALLOWLIST_OF_DERMATOLOGY_MEDICAL_ADMINS_IDS
get_app_base_user_data
get_app_base_user_data(app_user_id)
Source code in components/fr/public/clinic/adapter.py
def get_app_base_user_data(self, app_user_id: str) -> BaseUserData:  # noqa: D102
    from components.fr.internal.models.user import User
    from shared.helpers.string import normalize_name

    result = (
        current_session.query(User.first_name, User.last_name)  # type: ignore[call-overload] # noqa: ALN085
        .filter(User.id == app_user_id)
        .one_or_none()
    )

    if not result:
        raise BaseErrorCode.missing_resource(message="User not found")

    first_name, last_name = result

    return BaseUserData(
        first_name=normalize_name(first_name),
        last_name=normalize_name(last_name),
    )
get_app_user_available_health_services
get_app_user_available_health_services(
    profile_service, app_user_id
)
Source code in components/fr/public/clinic/adapter.py
@inject_profile_service
def get_app_user_available_health_services(  # noqa: D102
    self, profile_service: ProfileService, app_user_id: str
) -> list[AvailableHealthService]:
    from components.fr.internal.business_logic.user.queries.user import (
        is_user_eligible_for_telemedicine,
    )
    from components.fr.internal.models.user import User
    from components.fr.public.clinic.internal_teleconsultation import (
        app_user_has_access_to_internal_teleconsultation,
    )

    fr_user = get_or_raise_missing_resource(User, app_user_id)
    user_profile = profile_service.get_profile(fr_user.profile_id)

    user_id = int(app_user_id)
    available_health_services = [
        AvailableHealthService(name=AvailableHealthServiceName.DATO_CONTENT)
    ]

    if is_user_eligible_for_telemedicine(user_id=user_id):
        available_health_services.append(
            AvailableHealthService(
                name=AvailableHealthServiceName.VIDEO_CONSULTATION_LIVI,
            ),
        )

    if self.has_access_to_orientation_call(app_user_id=app_user_id):
        available_health_services.extend(
            [
                AvailableHealthService(
                    name=AvailableHealthServiceName.THERAPY_SESSION,
                ),
                AvailableHealthService(
                    name=AvailableHealthServiceName.ORIENTATION_CALL,
                ),
            ]
        )
    if app_user_has_access_to_internal_teleconsultation(app_user_id):
        available_health_services.append(
            AvailableHealthService(
                name=AvailableHealthServiceName.VIDEO_CONSULTATION,
                is_recommended=True,
                has_upcoming_availability=app_user_has_upcoming_availability_for_alan_consultations(
                    member_app_user=FeatureUser(
                        app_id=AppName.ALAN_FR,
                        app_user_id=app_user_id,
                    ),
                    availability_threshold_in_hours=24,
                    locale=user_profile.preferred_language
                    if user_profile
                    else "fr",
                ),
            )
        )

    # Add dermatology consultation service with allowlist restriction
    if app_user_has_access_to_dermatology_teleconsultation(app_user_id):
        available_health_services.append(
            AvailableHealthService(
                name=AvailableHealthServiceName.DERMATOLOGY_CONSULTATION,
                is_recommended=False,
                has_upcoming_availability=app_user_has_upcoming_availability_for_alan_consultations(
                    member_app_user=FeatureUser(
                        app_id=AppName.ALAN_FR,
                        app_user_id=app_user_id,
                    ),
                    availability_threshold_in_hours=48,
                    locale=user_profile.preferred_language
                    if user_profile
                    else "fr",
                    medical_admins_ids=self.get_allowlist_of_dermatology_medical_admins_ids(),
                ),
            )
        )

    available_health_services.append(
        AvailableHealthService(name=AvailableHealthServiceName.HEALTH_PROGRAM)
    )

    return available_health_services
get_app_user_data
get_app_user_data(
    app_user_id, compute_key_account_info=False
)
Source code in components/fr/public/clinic/adapter.py
def get_app_user_data(  # noqa: D102
    self, app_user_id: str, compute_key_account_info: bool = False
) -> UserData:
    from components.fr.public.user.user import get_user_for_clinic

    user = get_user_for_clinic(int(app_user_id))

    is_key_account_or_large_company_and_not_alaner = False
    if compute_key_account_info:
        is_key_account_or_large_company_and_not_alaner = not user.is_alaner and any(
            user_company.key_account or user_company.n_employees > 100
            for user_company in user.companies
        )

    dependents = (
        [
            Dependent(
                app_user_id=str(insurance_profile.user.id),
                first_name=insurance_profile.user.normalized_first_name,
                last_name=insurance_profile.user.normalized_last_name,
                age=int(insurance_profile.user.age)
                if insurance_profile.user.age
                else None,
                gender=insurance_profile.user.guess_gender,
                birth_date=insurance_profile.user.birth_date,
                dependent_type=(
                    DependentType.PARTNER
                    if insurance_profile.current_enrollment_type
                    == EnrollmentType.partner
                    else DependentType.CHILD
                    if insurance_profile.current_enrollment_type
                    == EnrollmentType.child
                    else None
                ),
            )
            for insurance_profile in user.insurance_profile.get_current_policy_insurance_profiles()
            if insurance_profile.user != user
        ]
        if user.insurance_profile is not None
        else []
    )

    return UserData(
        first_name=user.normalized_first_name,
        last_name=user.normalized_last_name,
        gender=user.guess_gender,
        email=user.email,
        profile_id=user.profile_id,
        birth_date=user.birth_date,
        phone=user.phone,
        country=user.address.country if user.address else None,
        address=user.address.as_one_line if user.address else None,
        ssn=user.insurance_profile.ssn if user.insurance_profile else None,
        lang=user.lang,
        is_key_account_or_large_company_and_not_alaner=is_key_account_or_large_company_and_not_alaner,
        is_alaner=user.is_alaner,
        dependents=dependents,
    )
get_booking_session_package
get_booking_session_package(app_user_id, session_type)
Source code in components/fr/public/clinic/adapter.py
def get_booking_session_package(  # noqa: D102
    self,
    app_user_id: str,
    session_type: TherapistBookingSessionType,
) -> BookingSessionPackage | None:
    from components.fr.public.clinic.guarantee import (
        get_therapy_sessions_guarantee_package,
    )

    if session_type == TherapistBookingSessionType.therapy:
        return get_therapy_sessions_guarantee_package(app_user_id=int(app_user_id))
    return None
get_coverage_status
get_coverage_status(app_user_id)
Source code in components/fr/public/clinic/adapter.py
def get_coverage_status(self, app_user_id: str) -> CoverageStatus | None:  # noqa: D102
    from components.fr.public.clinic.clinic_eligibility import (
        get_coverage_status,
    )

    return get_coverage_status(user_id=int(app_user_id))
get_last_active_id_verification_request_for_user
get_last_active_id_verification_request_for_user(
    app_user_id,
)

FR implementation of getting the last active ID verification request for a user.

Source code in components/fr/public/clinic/adapter.py
def get_last_active_id_verification_request_for_user(
    self, app_user_id: str
) -> IDVerificationRequest | None:
    """
    FR implementation of getting the last active ID verification request for a user.
    """
    from components.id_verification.public.business_logic.queries.id_verification import (
        get_last_active_id_verification_request_for_user,
    )

    return get_last_active_id_verification_request_for_user(
        user_id=int(app_user_id)
    )
has_access_to_orientation_call
has_access_to_orientation_call(app_user_id)
Source code in components/fr/public/clinic/adapter.py
def has_access_to_orientation_call(  # noqa: D102
    self, app_user_id: str
) -> bool:
    from components.fr.internal.business_logic.user.queries.user import (
        has_access_to_therapy_session,
    )

    user_id = int(app_user_id)
    return has_access_to_therapy_session(user_id=user_id)
has_app_user_permission
has_app_user_permission(app_user_id, permission)
Source code in components/fr/public/clinic/adapter.py
def has_app_user_permission(  # noqa: D102
    self, app_user_id: str, permission: EmployeePermission
) -> bool:
    from components.fr.internal.models.user import User

    user: User = get_or_raise_missing_resource(User, app_user_id)
    return has_permission(user, permission)
is_app_user_admin_of_company
is_app_user_admin_of_company(app_user_id, app_company_id)
Source code in components/fr/public/clinic/adapter.py
def is_app_user_admin_of_company(  # noqa: D102
    self, app_user_id: str, app_company_id: str
) -> bool:
    from components.fr.internal.models.user import User

    user: User = get_or_raise_missing_resource(User, app_user_id)
    return app_company_id in [
        str(admined_company.id) for admined_company in user.admined_companies
    ]
release_date_of_conversations_created_for_therapy_sessions class-attribute instance-attribute
release_date_of_conversations_created_for_therapy_sessions = datetime(
    2025, 2, 17
)
request_id_verification_request_for_user
request_id_verification_request_for_user(
    app_user_id, user_info, commit=True
)

FR implementation of getting or requesting ID verification for a user.

Source code in components/fr/public/clinic/adapter.py
def request_id_verification_request_for_user(
    self,
    app_user_id: str,
    user_info: ClinicUserDataForIdVerification,
    commit: bool = True,
) -> IDVerificationRequest:
    """
    FR implementation of getting or requesting ID verification for a user.
    """
    from components.id_verification.public.business_logic.queries.id_verification import (
        get_or_request_id_verification,
    )
    from components.id_verification.public.entities.id_verification_request import (
        IDVerificationReason,
    )

    return get_or_request_id_verification(
        user_id=int(app_user_id),
        company_id=None,
        reason=IDVerificationReason.clinic_teleconsultation,
        user_info=convert_user_data_id_verification_request_user_info(user_info),
        commit=commit,
    )
should_request_id_verification_for_user
should_request_id_verification_for_user(app_user_id)

FR implementation of checking if ID verification should be requested for a user. We request ID verification for internal consultation if: - The feature flag "request_id_verification_for_internal_consultation" is enabled for the user - The user does not have an active ID verification request - The user's identity is not validated

Source code in components/fr/public/clinic/adapter.py
def should_request_id_verification_for_user(
    self,
    app_user_id: str,
) -> bool:
    """
    FR implementation of checking if ID verification should be requested for a user.
    We request ID verification for internal consultation if:
    - The feature flag "request_id_verification_for_internal_consultation" is enabled for the user
    - The user does not have an active ID verification request
    - The user's identity is not validated
    """
    from components.clinic.public.business_logic.insi_identity import (
        is_identity_verified_for_user,
    )
    from components.fr.internal.business_logic.feature import (
        is_feature_enabled_for_user_id,
    )
    from components.id_verification.public.business_logic.queries.id_verification import (
        get_last_active_id_verification_request_for_user,
    )

    id_verification_request = get_last_active_id_verification_request_for_user(
        user_id=int(app_user_id)
    )

    # we can safely cast to int because in the FR space the user_id is always an integer
    return (
        is_feature_enabled_for_user_id(
            "request_id_verification_for_internal_consultation", int(app_user_id)
        )
        and id_verification_request is None
        and not is_identity_verified_for_user(
            app_user=FeatureUser(app_user_id=app_user_id, app_id=AppName.ALAN_FR)
        )
    )
update_app_user_phone
update_app_user_phone(profile_service, app_user_id, phone)
Source code in components/fr/public/clinic/adapter.py
@inject_profile_service
def update_app_user_phone(  # noqa: D102
    self, profile_service: ProfileService, app_user_id: str, phone: str | None
) -> None:
    from components.fr.internal.models.user import User

    fr_user = get_or_raise_missing_resource(User, app_user_id)
    profile_service.change_phone_number(fr_user.profile_id, phone_number=phone)
    current_session.commit()
update_app_user_ssn
update_app_user_ssn(app_user_id, ssn, commit=False)
Source code in components/fr/public/clinic/adapter.py
def update_app_user_ssn(  # noqa: D102
    self, app_user_id: str, ssn: str | None, commit: bool = False
) -> None:
    # No SSN update implementation for FR
    pass
upload_invoice_as_insurance_document
upload_invoice_as_insurance_document(
    file, app_user_id, upload_invoice_data
)
Source code in components/fr/public/clinic/adapter.py
def upload_invoice_as_insurance_document(  # noqa: D102
    self,
    file: IO,  # type: ignore[type-arg]
    app_user_id: str,
    upload_invoice_data: UploadInvoiceData,
) -> bool:
    from components.fr.public.claim_management.api import (
        upload_alan_therapy_invoice_as_insurance_document,
    )

    try:
        current_logger.info(
            "Checking if session invoice can be registered as insurance document",
            session_id=upload_invoice_data.session_id,
        )

        return upload_alan_therapy_invoice_as_insurance_document(
            file=file,
            operator_comment=f"This is an Alan Therapist Booking Session invoice ({upload_invoice_data.session_id}). It is not supposed to be reparsed by Tessi.",
            start_date=upload_invoice_data.starts_at,
            end_date=upload_invoice_data.ends_at,
            user_id=int(app_user_id),
            paid_amount=upload_invoice_data.paid_amount,
            short_code="MED_DOUCE_PSYCH",
            executant_number=(
                upload_invoice_data.medical_admin_identifier.replace(" ", "")
                if upload_invoice_data.medical_admin_identifier
                else ""
            ),
            health_professional=upload_invoice_data.medical_admin_name,
        )
    except Exception as e:
        current_logger.info(
            f"Cannot register therapist session invoice as insurance document: {e}",
            session_id=upload_invoice_data.session_id,
        )
        return False
user_has_24_hour_response_guarantee
user_has_24_hour_response_guarantee(app_user_id)
Source code in components/fr/public/clinic/adapter.py
def user_has_24_hour_response_guarantee(  # noqa: D102
    self,
    app_user_id: str,
) -> bool:
    from components.fr.internal.models.queries.user import is_user_civil_servant

    return is_user_civil_servant(user_id=int(app_user_id))
validate_session_duration
validate_session_duration(session_duration)
Source code in components/fr/public/clinic/adapter.py
def validate_session_duration(  # noqa: D102
    self,
    session_duration: int,
) -> None:
    # No banned session durations in FR
    pass

clinic_adapter module-attribute

clinic_adapter = FrClinicAdapter()

clinic_eligibility

This module contains the query to get the current or upcoming period of eligibility to the clinic restricted services.

NOTE: the logic could be reused by other services than the Clinic, provided the country-specific rules are the same. If yes, feel free to rename the file and query to a more generic name.

get_coverage_status

get_coverage_status(user_id)

Return the start and optionally the end date of the current or upcoming period of eligibility to the clinic restricted services.

Source code in components/fr/public/clinic/clinic_eligibility.py
def get_coverage_status(user_id: int) -> CoverageStatus | None:
    """
    Return the start and optionally the end date of the current or upcoming period of eligibility to the clinic restricted services.
    """
    from components.fr.internal.business_logic.user.queries.user_states import (
        get_user_states_ever_active_between,
    )

    user_states = get_user_states_ever_active_between(
        user_id=user_id,
        period_start=utctoday(),
        period_end=None,
    )

    # Sort by earliest start date first
    user_states.sort(
        key=lambda state: state.start_date if state.start_date else date.max
    )

    current_logger.debug(
        f"Clinic restricted service eligibility: {len(user_states)} state(s) found.",
        user_id=user_id,
        state=[state.to_dict() for state in user_states],
    )

    for state in user_states:
        from components.fr.internal.business_logic.user.data.user_state import (
            UserStateInsured,
        )

        # This de facto excludes states like UserStateInvited/UserStateInvitedRetiree, UserStateNotInsuredUnpaidLeave or UserStateExempted
        if isinstance(state, UserStateInsured):
            current_logger.debug(
                f"Clinic restricted service eligibility: {state} is eligible.",
                user_id=user_id,
                state=state.to_dict(),
            )

            return CoverageStatus(
                # Assumption: UserStateInsured always has a start date
                start_date=mandatory(
                    state.start_date,
                    "Found an unexpected user state with no start date.",
                ),
                end_date=state.end_date,
            )

    return None

guarantee

THERAPY_SESSION_COMPANY_PRICE_IN_CENTS module-attribute

THERAPY_SESSION_COMPANY_PRICE_IN_CENTS = 7000

THERAPY_SESSION_TNS_PRICE_IN_CENTS module-attribute

THERAPY_SESSION_TNS_PRICE_IN_CENTS = 8000

get_therapy_sessions_guarantee_package

get_therapy_sessions_guarantee_package(app_user_id)

Get the therapy sessions guarantee data for a user

Parameters:

Name Type Description Default
app_user_id int

The user ID

required

Returns:

Type Description
BookingSessionPackage

The therapy sessions package

Source code in components/fr/public/clinic/guarantee.py
def get_therapy_sessions_guarantee_package(
    app_user_id: int,
) -> BookingSessionPackage:
    """Get the therapy sessions guarantee data for a user

    Arguments:
        app_user_id: The user ID

    Returns:
        The therapy sessions package
    """
    from components.fr.public.company.queries import (
        get_default_company_id_for_user,
    )

    company_id = get_default_company_id_for_user(user_id=app_user_id)

    user_coverage = None
    insurance_profile_id = None

    user_coverage_payload = get_user_coverage_payload(app_user_id)

    if user_coverage_payload is not None:
        user_coverage, insurance_profile_id = user_coverage_payload

    session_count = (
        _get_therapy_session_count(user_coverage.guarantees) if user_coverage else None
    )

    price_in_cents = (
        THERAPY_SESSION_TNS_PRICE_IN_CENTS
        if company_id is None
        else THERAPY_SESSION_COMPANY_PRICE_IN_CENTS
    )

    package_session = None

    if user_coverage and insurance_profile_id:
        reimbursed_package = get_therapy_sessions_reimbursed_data(
            guarantees=user_coverage.guarantees if user_coverage else [],
            insurance_profile_id=insurance_profile_id,
            app_user_id=str(app_user_id),
        )

        if reimbursed_package:
            package_session, max_reimbursement_per_care = reimbursed_package

            # Dynamic pricing is only applied if users do not have legacy forfait
            if (
                not session_count
                and max_reimbursement_per_care is not None
                and max_reimbursement_per_care < price_in_cents
                and package_session.count_remaining
                and package_session.count_remaining > 0
            ):
                price_in_cents = max_reimbursement_per_care

    return BookingSessionPackage(
        price_in_cents=price_in_cents,
        included=(
            BookingSessionPackageCount(count_limit=session_count)
            if session_count
            else None
        ),
        reimbursed=package_session,
    )

get_therapy_sessions_reimbursed_data

get_therapy_sessions_reimbursed_data(
    guarantees, insurance_profile_id, app_user_id
)

Get the therapy sessions reimbursed for a user

Parameters:

Name Type Description Default
guarantees list[CoverageGuarantee]

The user guarantees

required
insurance_profile_id int

The insurance profile ID

required
app_user_id str

The user ID

required
Source code in components/fr/public/clinic/guarantee.py
def get_therapy_sessions_reimbursed_data(
    guarantees: list[CoverageGuarantee],
    insurance_profile_id: int,
    app_user_id: str,
) -> tuple[BookingSessionPackageCount, int | None] | None:
    """
    Get the therapy sessions reimbursed for a user

    Arguments:
        guarantees: The user guarantees
        insurance_profile_id: The insurance profile ID
        app_user_id: The user ID
    """
    from components.clinic.public.business_logic.therapist_booking_session import (
        get_past_sessions_that_need_invoice_generation_count_for_user,
        get_upcoming_reimbursed_therapist_booking_session_count_for_user,
    )
    from components.fr.internal.claim_management.enums.count_and_amount_usage import (
        RemainingUsageCode,
    )
    from components.fr.internal.claim_management.internal.misc.business_logic.remaining_usage import (
        get_remaining_usages_for_insurance_profile,
    )

    psycho_guarantees_with_remaining_code: list[RemainingUsageCode] = [
        RemainingUsageCode.ALTERNATIVE_MEDICINE,
        RemainingUsageCode.PSYCHOLOGY_NON_REIMBURSED,
    ]

    for guarantees_with_remaining_code in psycho_guarantees_with_remaining_code:
        guarantee_data = _find_guarantee_by_name(
            guarantees, guarantees_with_remaining_code.value
        )
        if not guarantee_data:
            continue

        care_type_mappings = guarantee_data.care_type_mappings
        if not care_type_mappings:
            continue

        selector = care_type_mappings[0].selector
        if "MED_DOUCE_PSYCH" not in selector.include_care_types:
            continue

        remaining_usages = get_remaining_usages_for_insurance_profile(
            insurance_profile_id=insurance_profile_id,
            remaining_usage_codes=[guarantees_with_remaining_code],
        )
        if not remaining_usages:
            continue

        remaining_usage: RemainingUsage = remaining_usages[0]
        count_limit = remaining_usage.count_limit

        if count_limit is not None and remaining_usage.count_remaining is not None:
            feature_user = FeatureUser(app_user_id=app_user_id, app_id=AppName.ALAN_FR)
            # We want to deduce the number of upcoming sessions that are going to be reimbursed
            # (the claims engine does not count them as the care acts do not exist yet)
            count_upcoming_reimbursed_sessions = (
                get_upcoming_reimbursed_therapist_booking_session_count_for_user(
                    feature_user=feature_user,
                    session_type=TherapistBookingSessionType.therapy,
                )
            )
            count_past_reimbursed_sessions_that_needs_invoice_generation = (
                get_past_sessions_that_need_invoice_generation_count_for_user(
                    feature_user=feature_user,
                    session_type=TherapistBookingSessionType.therapy,
                )
            )
            # If the member reached the limit of their forfait, we return 0
            count_remaining = max(
                remaining_usage.count_remaining
                - count_upcoming_reimbursed_sessions
                - count_past_reimbursed_sessions_that_needs_invoice_generation,
                0,
            )
            return (
                BookingSessionPackageCount(
                    count_limit=count_limit,
                    count_remaining=count_remaining,
                ),
                remaining_usage.max_reimbursement_per_care,
            )

    return None

get_user_coverage_payload

get_user_coverage_payload(app_user_id)
Source code in components/fr/public/clinic/guarantee.py
def get_user_coverage_payload(  # noqa: D103
    app_user_id: int,
) -> tuple[CoveragePayload, int] | None:
    from components.fr.internal.claim_management.public.coverage.api import (
        get_latest_coverage_payload,
    )

    user = get_or_raise_missing_resource(
        User,
        app_user_id,
        options=[
            selectinload(User.address),
            selectinload(User.companies),
            selectinload(User.insurance_profile).options(
                selectinload(InsuranceProfile.policies),  # type: ignore[arg-type]
            ),
        ],
    )

    insurance_profile = user.insurance_profile

    # Exempted?
    if insurance_profile is None:
        return None

    insurance_policy = insurance_profile.current_policy

    # Exempted?
    if insurance_policy is None:
        return None

    health_coverage = insurance_policy.current_health_coverage
    latest_coverage_payload = get_latest_coverage_payload(health_coverage.name)

    if latest_coverage_payload is None:
        return None

    return latest_coverage_payload, insurance_profile.id

id_verification

convert_user_data_id_verification_request_user_info

convert_user_data_id_verification_request_user_info(
    user_data,
)

Converts UserData to IDVerificationRequestUserInfo for FR.

Source code in components/fr/public/clinic/id_verification.py
def convert_user_data_id_verification_request_user_info(
    user_data: ClinicUserDataForIdVerification,
) -> IDVerificationRequestUserInfo:
    """
    Converts UserData to IDVerificationRequestUserInfo for FR.
    """
    return IDVerificationRequestUserInfo(
        first_name=create_first_name_with_additional_first_names(
            first_name=user_data.first_name,
            additional_first_names=user_data.additional_first_names,
        ),
        last_name=user_data.last_name,
        email=user_data.email,
        additional_properties=IDVerificationRequestUserInfoAdditionalProperties(
            date_of_birth=user_data.date_of_birth,
            place_of_birth=user_data.place_of_birth,
            birth_last_name=user_data.birth_last_name,
            gender=user_data.gender,
            session_id=user_data.session_id,
        ),
    )

create_first_name_with_additional_first_names

create_first_name_with_additional_first_names(
    first_name, additional_first_names
)

Creates a first name string that includes additional first names if provided.

Source code in components/fr/public/clinic/id_verification.py
def create_first_name_with_additional_first_names(
    first_name: str, additional_first_names: list[str] | None
) -> str:
    """
    Creates a first name string that includes additional first names if provided.
    """
    if not additional_first_names:
        return first_name

    all_names = [first_name] + additional_first_names
    return " ".join(all_names)

internal_teleconsultation

ALLOWLIST_OF_DERMATOLOGY_MEDICAL_ADMINS_IDS module-attribute

ALLOWLIST_OF_DERMATOLOGY_MEDICAL_ADMINS_IDS = [
    "3c7c88e9-0794-4d23-9a32-002e75ca1f60"
]

DERMATOLOGY_TELECONSULTATION_FEATURE_NAME module-attribute

DERMATOLOGY_TELECONSULTATION_FEATURE_NAME = (
    "clinic_dermatology_teleconsultation"
)

INTERNAL_TELECONSULTATION_FEATURE_NAME module-attribute

INTERNAL_TELECONSULTATION_FEATURE_NAME = (
    "clinic_internal_teleconsultation"
)

app_user_has_access_to_dermatology_teleconsultation

app_user_has_access_to_dermatology_teleconsultation(
    app_user_id,
)

Check if the app user has access to dermatology teleconsultation

Source code in components/fr/public/clinic/internal_teleconsultation.py
def app_user_has_access_to_dermatology_teleconsultation(app_user_id: str) -> bool:
    """
    Check if the app user has access to dermatology teleconsultation
    """
    try:
        return app_user_has_access_to_feature_flag(
            app_user_id,
            feature_name=DERMATOLOGY_TELECONSULTATION_FEATURE_NAME,
        )
    except Exception:
        current_logger.exception(
            "Error checking if app user has access to dermatology teleconsultation",
            app_user_id=app_user_id,
        )
        return False

app_user_has_access_to_internal_teleconsultation

app_user_has_access_to_internal_teleconsultation(
    app_user_id,
)

Check if the app user has access to internal teleconsultation

Source code in components/fr/public/clinic/internal_teleconsultation.py
def app_user_has_access_to_internal_teleconsultation(app_user_id: str) -> bool:
    """
    Check if the app user has access to internal teleconsultation
    """
    try:
        return app_user_has_access_to_feature_flag(
            app_user_id,
            feature_name=INTERNAL_TELECONSULTATION_FEATURE_NAME,
        )
    except Exception:
        current_logger.exception(
            "Error checking if app user has access to internal teleconsultation",
            app_user_id=app_user_id,
        )
        return False

components.fr.public.company

queries

get_all_franchises

get_all_franchises()
Source code in components/fr/internal/business_logic/company/queries/franchise.py
def get_all_franchises() -> list[FranchiseEntity]:
    franchises = current_session.query(Franchise).all()  # noqa: ALN085
    return [_convert_model_to_entity(franchise) for franchise in franchises]

get_company_ids_from_account

get_company_ids_from_account(account_id)
Source code in components/fr/internal/business_logic/account/queries/account.py
def get_company_ids_from_account(account_id: uuid.UUID) -> list[int]:
    company_ids = (
        current_session.query(Company.id).filter(Company.account_id == account_id).all()  # noqa: ALN085
    )
    return [company_id for (company_id,) in company_ids]

get_default_company_id_for_user

get_default_company_id_for_user(user_id)
Source code in components/fr/internal/business_logic/company/queries/company.py
@tracer.wrap()
def get_default_company_id_for_user(user_id: int) -> int | None:
    from components.fr.internal.models.queries.user import (
        get_active_or_last_employment,
    )

    # Pre-load relationships.
    # NOTE: We are using `joinedload` even for one-to-many relationships, because
    # we expect the cardinality to be low (only a few employments/policies per user).
    # This saves some round-trips to the database.
    user = (
        current_session.query(User)  # noqa: ALN085
        .filter(User.id == user_id)
        .options(
            joinedload(User.alan_employee),
            joinedload(User.employments),
            joinedload(User.insurance_profile).options(
                joinedload(InsuranceProfile.enrollments)
                .joinedload(Enrollment.policy)
                .joinedload(Policy.contract),
                joinedload(InsuranceProfile.policies).options(  # type: ignore[arg-type]
                    joinedload(Policy.contract),
                    joinedload(Policy.enrollments),
                ),
            ),
        )
        .one_or_none()
    )
    if not user:
        raise ErrorCode.missing_resource()

    # User is currently on a policy from an individual contract
    if (
        user.insurance_profile
        and user.insurance_profile.current_policy
        and user.insurance_profile.current_policy.contract.contractee_type
        == ContracteeType.individual
    ):
        return None

    # User is a beneficiary
    if (
        user.insurance_profile
        and user.insurance_profile.current_policy
        and user.insurance_profile
        != user.insurance_profile.current_policy.primary_profile
        and user.insurance_profile.current_policy.contract.contractee_type
        == ContracteeType.company
    ):
        return user.insurance_profile.current_policy.contract.company_id

    # User is currently insured
    if (
        user.insurance_profile
        and user.insurance_profile.active_enrollment
        and user.insurance_profile.active_enrollment.policy.contract.contractee_type
        == ContracteeType.company
    ):
        return user.insurance_profile.active_enrollment.policy.contract.company_id

    # User is invited or exempted or non-currently insured
    active_or_last_employment = get_active_or_last_employment(user.id)
    if active_or_last_employment:
        return active_or_last_employment.company_id

    return None

get_franchise

get_franchise(id)
Source code in components/fr/internal/business_logic/company/queries/franchise.py
def get_franchise(id: uuid.UUID) -> FranchiseEntity:
    franchise = get_or_raise_missing_resource(Franchise, id)
    return _convert_model_to_entity(franchise)

components.fr.public.contract

queries

health_contracts

get_health_contract_information
get_health_contract_information(contract_id, on_date)
Source code in components/fr/internal/business_logic/contract/queries/health_contracts.py
def get_health_contract_information(
    contract_id: int, on_date: date
) -> HealthContractInfo:
    contract = get_or_raise_missing_resource(Contract, contract_id)
    is_company_contractee = contract.company_contractee is not None
    contract_version = get_ongoing_contract_version(
        contract_id=contract_id, on_date=on_date
    )
    company_name = (
        contract.company_contractee.display_name
        if contract.company_contractee
        else None
    )

    return HealthContractInfo(
        contractee_type=(
            ContracteeType.company
            if is_company_contractee
            else ContracteeType.individual
        ),
        company_name=company_name,
        company_participation_unit="cents",
        company_participation_partner=(
            int(contract_version.participation_partner)
            if contract_version and contract_version.participation_partner is not None
            else None
        ),
        company_participation_children=(
            int(contract_version.participation_children)
            if contract_version and contract_version.participation_children is not None
            else None
        ),
        is_direct_billing=(
            contract_version.is_direct_billing
            if contract_version and contract_version.is_direct_billing
            else False
        ),
    )

income

get_dependent_income_brackets_for_subscriptions
get_dependent_income_brackets_for_subscriptions(
    subscription_ids, on_date
)

Get dependent income brackets for multiple subscriptions from contract discount rules.

This function extracts income brackets from the discount rules associated with each contract. It filters for discounts that are valid on the given date and converts discount rule income brackets to IncomeBracketEntity objects.

Parameters:

Name Type Description Default
subscription_ids list[int]

List of subscription IDs to get dependent brackets for

required
on_date date

The date to check the income brackets against

required

Returns: dict[int, list[IncomeBracketEntity]]: Mapping of subscription_id to dependent brackets

Source code in components/fr/internal/contract/queries/income.py
def get_dependent_income_brackets_for_subscriptions(
    subscription_ids: list[int],
    on_date: date,
) -> dict[int, list[IncomeBracketEntity]]:
    """
    Get dependent income brackets for multiple subscriptions from contract discount rules.

    This function extracts income brackets from the discount rules associated with each contract.
    It filters for discounts that are valid on the given date and converts discount rule
    income brackets to IncomeBracketEntity objects.

    Args:
        subscription_ids: List of subscription IDs to get dependent brackets for
        on_date: The date to check the income brackets against
    Returns:
        dict[int, list[IncomeBracketEntity]]: Mapping of subscription_id to dependent brackets
    """
    if not subscription_ids:
        return {}

    contracts = _get_contracts_by_ids(subscription_ids)

    result = {}
    for contract in contracts:
        # Only return brackets for contracts linked to a company (employee plans)
        # and not individual contracts (retiree)
        if contract.company_contractee is None:
            continue

        brackets = []

        # Find discounts that are valid on the given date
        for discount in contract.discounts:
            if discount.start_month <= on_date <= discount.end_month:
                # Extract income brackets from discount rules
                for discount_rule in discount.discount_rules:
                    if discount_rule.primary_min_income is not None:
                        bracket = IncomeBracketEntity(
                            min=discount_rule.primary_min_income,
                            max=discount_rule.primary_max_income,
                        )
                        brackets.append(bracket)

        # Sort brackets by min income for consistent ordering
        brackets.sort(key=lambda b: b.min)

        if brackets:
            result[contract.id] = brackets

    return result
get_income_brackets_for_subscriptions
get_income_brackets_for_subscriptions(
    subscription_ids, on_date, for_target=None
)

Returns income brackets for multiple subscriptions on a specific date. This function is optimized to avoid N+1 queries by batching database operations.

Parameters:

Name Type Description Default
subscription_ids list[int]

List of subscription IDs to get income brackets for

required
on_date date

The date to check the income brackets against

required
for_target Optional[IncomeBracketTarget]

Optional target for which the income brackets are requested

None

Returns: dict[int, list[IncomeBracketEntity]]: Mapping of subscription_id to income brackets Only includes subscriptions that have income brackets

Source code in components/fr/internal/contract/queries/income.py
def get_income_brackets_for_subscriptions(
    subscription_ids: list[int],
    on_date: date,
    for_target: Optional[IncomeBracketTarget] = None,
) -> dict[int, list[IncomeBracketEntity]]:
    """
    Returns income brackets for multiple subscriptions on a specific date.
    This function is optimized to avoid N+1 queries by batching database operations.

    Args:
        subscription_ids: List of subscription IDs to get income brackets for
        on_date: The date to check the income brackets against
        for_target: Optional target for which the income brackets are requested
    Returns:
        dict[int, list[IncomeBracketEntity]]: Mapping of subscription_id to income brackets
        Only includes subscriptions that have income brackets
    """
    if not subscription_ids:
        return {}

    option_brackets = get_option_income_brackets_for_subscriptions(
        subscription_ids, on_date
    )

    dependent_brackets = get_dependent_income_brackets_for_subscriptions(
        subscription_ids, on_date
    )

    result = {}
    for subscription_id in subscription_ids:
        brackets_for_option = option_brackets.get(subscription_id, [])
        brackets_for_dependent = dependent_brackets.get(subscription_id, [])

        if for_target == IncomeBracketTarget.option and not brackets_for_option:
            continue
        if for_target == IncomeBracketTarget.dependent and not brackets_for_dependent:
            continue

        merged_brackets = _merge_income_brackets(
            brackets_for_option, brackets_for_dependent
        )

        if merged_brackets:
            result[subscription_id] = merged_brackets

    return result
get_member_type_for_user_on
get_member_type_for_user_on(user_id, on_date)
Source code in components/fr/internal/contract/queries/income.py
def get_member_type_for_user_on(user_id: str, on_date: date) -> Optional[MemberType]:
    subscription_id = get_subscription_id_for_user_on(
        user_id=int(user_id), on_date=on_date
    )
    if subscription_id is None:
        return None

    contract = get_or_raise_missing_resource(Contract, subscription_id)
    plan = contract.plan_on(on_date)

    if (
        plan.is_collective_retiree_plan
    ):  # For our use case, the retirees are attached to a collective plan
        return MemberType.retiree

    if plan.is_company_plan:
        return MemberType.employee

    return None
get_option_income_brackets_for_subscriptions
get_option_income_brackets_for_subscriptions(
    subscription_ids, on_date
)

Get option income brackets for multiple subscriptions.

Parameters:

Name Type Description Default
subscription_ids list[int]

List of subscription IDs to get option brackets for

required
on_date date

The date to check the income brackets against

required

Returns: dict[int, list[IncomeBracketEntity]]: Mapping of subscription_id to option brackets

Source code in components/fr/internal/contract/queries/income.py
def get_option_income_brackets_for_subscriptions(
    subscription_ids: list[int], on_date: date
) -> dict[int, list[IncomeBracketEntity]]:
    """
    Get option income brackets for multiple subscriptions.

    Args:
        subscription_ids: List of subscription IDs to get option brackets for
        on_date: The date to check the income brackets against
    Returns:
        dict[int, list[IncomeBracketEntity]]: Mapping of subscription_id to option brackets
    """
    if not subscription_ids:
        return {}

    contracts = _get_contracts_by_ids(subscription_ids)

    return {
        contract.id: [
            IncomeBracketEntity(
                min=option_price_rule.primary_income_bracket[0],
                max=option_price_rule.primary_income_bracket[1],
            )
            for option_price_rule in contract.plan_on(on_date).option_price_rules
            if option_price_rule.primary_income_bracket is not None
        ]
        for contract in contracts
    }

components.fr.public.customer_health_partner

get_admin_traits

get_admin_traits_to_notify

get_admin_traits_to_notify(admin_id, company_ids)

Return the list of AdminTraits for admins who should be notified about the well-being assessment report.

Source code in components/fr/public/customer_health_partner/get_admin_traits.py
def get_admin_traits_to_notify(
    admin_id: str | None, company_ids: list[str]
) -> Sequence[AdminTraits]:
    """
    Return the list of AdminTraits for admins who should be notified about the well-being assessment report.
    """
    users_traits_of_users_to_notify: list[AdminTraits] = []
    # we notify the admin who created the assessment
    if admin_id is not None:
        admin_user = get_or_raise_missing_resource(User, admin_id)
        users_traits_of_users_to_notify.append(AdminTraits(admin_user))

    # we notify all wellbeing referents
    for company_id in company_ids:
        for wellbeing_referent_user_trait in [
            AdminTraits(user)
            for user in get_active_wellbeing_referent_for_company(
                company_id=int(company_id)
            )
        ]:
            if wellbeing_referent_user_trait not in users_traits_of_users_to_notify:
                users_traits_of_users_to_notify.append(wellbeing_referent_user_trait)

    return users_traits_of_users_to_notify

components.fr.public.demo_account

helpers

is_demo_mode_enabled

is_demo_mode_enabled()

:return: a boolean telling if the app runs in demo mode (i.e. either in the demo environment, or in dev environment with demo mode enabled)

Source code in components/fr/public/demo_account/helpers.py
def is_demo_mode_enabled() -> bool:
    """
    :return: a boolean telling if the app runs in demo mode (i.e. either in the demo environment, or in dev environment with demo mode enabled)
    """
    return (
        is_demo_mode()
        or is_development_mode()
        and current_config.get("LOCAL_DEMO_MODE", False)
    )

components.fr.public.document_parsing

entities

document_category

DocumentCategoryDisplayInfo dataclass
DocumentCategoryDisplayInfo(category, lang, label, icon)

Bases: DataClassJsonMixin

This is the info to display a document category (label, icon)

category instance-attribute
category
icon instance-attribute
icon
label instance-attribute
label
lang instance-attribute
lang

rejection_reason

RejectionReason dataclass
RejectionReason(reason, defaultMessage)

Bases: DataClassJsonMixin

A rejection reason is made of 2 things: - a reason, it's like a key - a default Message we want to display to the member. It will be used in email/member dashboard/default of a CareEvent

defaultMessage instance-attribute
defaultMessage
reason instance-attribute
reason

queries

document_category

get_document_categories
get_document_categories(lang=Lang.english)

Get the list of document categories with display info for document in France For now we only return categories that are handled by the new parsing tool return: list[DocumentCategoryDisplayInfo]

Source code in components/fr/public/document_parsing/queries/document_category.py
def get_document_categories(
    lang: Lang = Lang.english,
) -> list[DocumentCategoryDisplayInfo]:
    """
    Get the list of document categories with display info for document in France
    For now we only return categories that are handled by the new parsing tool
    return: list[DocumentCategoryDisplayInfo]
    """
    document_categories_with_icon = [
        (ClaimInsuranceDocumentCategory.quote, "IconNotes"),
        (ClaimInsuranceDocumentCategory.ss_attestation, "IconAddressBook"),
        (
            ClaimInsuranceDocumentCategory.birth_or_adoption_certificate,
            "IconBabyCarriage",
        ),
        (ClaimInsuranceDocumentCategory.ss_decompte, "IconAbacus"),
        (ClaimInsuranceDocumentCategory.invoice, "IconFileInvoice"),
        (ClaimInsuranceDocumentCategory.mutuelle_decompte, "IconScale"),
        (ClaimInsuranceDocumentCategory.prescription, "IconFilePencil"),
        (
            ClaimInsuranceDocumentCategory.mutuelle_no_coverage_attestation,
            "IconUmbrella",
        ),
        (ClaimInsuranceDocumentCategory.unsupported, "IconFileUnknown"),
        (
            ClaimInsuranceDocumentCategory.cancel_teletransmission_request,
            "IconCircleXFilled",
        ),
        (ClaimInsuranceDocumentCategory.medical_results, "IconReportMedical"),
        (ClaimInsuranceDocumentCategory.medical_imaging, "IconDeviceCameraPhone"),
        (ClaimInsuranceDocumentCategory.medical_certificate, "IconFileCertificate"),
        (ClaimInsuranceDocumentCategory.non_claims_prescription, "IconFileDescription"),
        (ClaimInsuranceDocumentCategory.other_health, "IconQuestionMark"),
        (ClaimInsuranceDocumentCategory.unknown, "IconQuestionMark"),
    ]
    return [
        DocumentCategoryDisplayInfo(
            category=category,
            lang=lang,
            label=translate(
                language=lang, key_string=f"parsing_tool.document_categories.{category}"
            ),
            icon=icon,
        )
        for category, icon in document_categories_with_icon
    ]
get_document_category_label
get_document_category_label(
    document_category, lang=Lang.english
)

Retrieve the label for the given document category return: string

Source code in components/fr/public/document_parsing/queries/document_category.py
def get_document_category_label(
    document_category: ClaimInsuranceDocumentCategory,
    lang: Lang = Lang.english,
) -> str:
    """
    Retrieve the label for the given document category
    return: string
    """
    return translate(
        language=lang,
        key_string=f"parsing_tool.document_categories.{document_category}",
    )

helpers

HEALTH_DOCUMENT_UPLOAD_URL module-attribute
HEALTH_DOCUMENT_UPLOAD_URL = (
    "https://alan.com/app/dashboard?documentUploader=claim"
)
REJECTION_BODY_FOR_USER module-attribute
REJECTION_BODY_FOR_USER = {'default_message': {unreadable: f'🧐 Nous ne parvenons pas à lire les informations sur ce document.

👉 Nous vous invitons à nous le transférer à nouveau :
  - dans un format standard type (.JPEG, .PNG ou .PDF),
  - En vous assurant que l'image est nette et que la qualité est suffisante.

  Transférer une nouvelle version du document :{HEALTH_DOCUMENT_UPLOAD_URL}
', unusable_partial_document: f'🤔 Le document envoyé n'est pas complet. Des informations sont donc manquantes pour pouvoir estimer votre remboursement.

👉 Nous vous invitons à nous le renvoyer dans son intégralité (toutes les pages, chaque page entièrement visible).

💁 Dans le cas d'un décompte de soins, vous pouvez le télécharger depuis le site de la Sécurité Sociale, rubrique "Mes paiements" en cliquant sur "Télécharger un décompte" (la petite flèche située à droite des frais de santé concernés).

Transférer une nouvelle version du document : f{HEALTH_DOCUMENT_UPLOAD_URL}', duplicate: "🤔 On dirait que ce document est déjà en notre possession.\n\n👉 Si c'est une erreur, vous pouvez nous renvoyer le bon document, nous le traiterons rapidement.", beneficiary_not_covered: "🤔 On dirait que ce document concerne un bénéficiaire qui n'est pas couvert par Alan.\n\n👉 Nous ne pouvons donc pas procéder au remboursement.", insufficent_info_for_parsing: "🤔 Le document ne contient pas toutes les informations pour estimer votre remboursement. Il y manque :\n\n- Nom du bénéficiaire\n- Nom du professionnel de santé\n- Numéro d'inscription au registre ADELI du professionnel de santé\n- Type de soin / consultation\n- Prix de l'acte (pour chacun des actes séparément)\n- Base de remboursement de la Sécurité sociale ou BRSS (pour chacun des actes séparément)\n- Répartition du dépassement d'honoraires (sur chacun des actes)\n- Code de regroupement pour chacun des actes mentionnés\n- Code de regroupement pour chacun des éléments de l'équipement optique afin que nous puissions identifier la complexité de vos verres et appliquer la garantie correspondante\n- Date de chaque soin / consultation- Décompte complet et détaillé correspondant\n- Facture acquittée à la réception du colis\n- Devis initial avec les bases des actes envisagés\n- Type de soins ou facture détaillée correspondante\n- Si l'intervention porte sur les deux yeux ou un seul oeil (si oui, lequel ?)\n- Décompte complet et détaillé des soins remboursés, accompagné de ce document\n- Certains codes de regroupements ou codes actes apparaissant dans le document sont périmés et ne doivent plus être utilisés\n\n😀 Cependant sachez qu'en général, nous n'avons pas besoin de facture ! Si votre télétransmission est activée, et que votre soin est couvert par la Sécurité sociale, nous devrions recevoir les infos directement de leur part. Toutefois, si vous trouvez le délai trop long (> 15 jours), vous gardez bien sûr la possibilité :\n- de vérifier sur votre compte Améli si le remboursement a déjà été fait par la Sécurité sociale : si oui, vous pouvez nous transmettre le décompte à la place,\n- et, à défaut, de contacter le professionnel de santé pour savoir s'il a bien fait la déclaration de son côté", non_EU_prescription: "🤔 On dirait que ce document est une ordonnance obtenue en dehors de l'Union Européenne.\n\n👉 Nous acceptons uniquement les ordonnances venant des pays de l'Union Européenne.\n\nTransférer une nouvelle version du document : https://alan.com/app/dashboard?documentUploader=claim", other_insurer_decompte_100: '🧐 Nous ne pouvons pas effectuer de remboursement complémentaire car les soins ont déjà été intégralement pris en charge par votre complémentaire principale. Merci de nous transmettre uniquement des documents pour lesquels il reste des frais à votre charge.', cares_received_abroad: '😎 Alan couvre les soins des séjours à létranger sur la base de remboursement de la Sécurité Sociale française.\n\n👉 Pensez donc à conserver lensemble des factures acquittées et des justificatifs de paiement lors des soins effectués hors de France.\nEnsuite, adressez ces factures accompagnées du formulaire S3125 « Soins reçus à létranger », à votre caisse dassurance maladie.\n\nVous trouverez ce document sur le site Ameli : https://www.ameli.fr/sites/default/files/formualires/221/s3125.pdf\n\nNous récupérerons ensuite les informations de la Sécurité Sociale (télétransmission ou envoi du décompte de votre part)', secu_decompte_100: '🧐 Nous ne pouvons pas effectuer de remboursement complémentaire car les soins ont déjà été intégralement pris en charge par la sécurité sociale. Merci de nous transmettre uniquement des documents pour lesquels il reste des frais à votre charge.', ask_paid_invoice: f'🤔 Le document indique une demande de règlement de votre part.

👉 Nous vous invitons à nous transférer la facture acquittée (parfois appelée quittance ou bordereau).

Vous avez également la possibilité de nous transférer un décompte de Sécu (si possible au format .PDF pour un traitement automatique ultra-rapide !).

Transférer une nouvelle version du document : {HEALTH_DOCUMENT_UPLOAD_URL}', missing_mt_or_dmt: '🤔 Le document ne contient pas toutes les informations pour estimer votre remboursement.\n\n👉 Nous vous invitons à renseigner le DMT/MT en cliquant sur le bouton ci-dessous ou à nous renvoyer une version contenant les informations suivantes :\n- Code MT ou "Mode de traitement". Ce mode est un code utilisé par l\'Assurance Maladie pour identifier la nature du soin;\n- Code DMT ou "Discipline Médico-tarifaire". Cette discipline est utilisée dans le contexte hospitalier, et permet de caractériser l\'activité de l\'unité médicale ou du service qui vous a fourni le soin.\n\n☎️ Ces informations vous seront fournies en général par l\'hôpital, nous vous invitons à les contacter directement.', etiopathe: "😔 Nous ne pouvons pas couvrir ces soins.\n\n👉 En effet nous ne remboursons les séances d'étiopathie que lorsqu’elles sont dispensées par un professionnel enregistré à l'Institut Français d’Étiopathie.", kine_invoice_without_mezieres: "🤗 Nous avons bien reçu votre facture de kinésithérapie. Elle n'est pas nécessaire pour vous rembourser ! En effet, nous remboursons les actes de kinésithérapie à réception des informations de la Sécurité Sociale.\n\n👉 Si la télétransmission est active, nous devrions recevoir automatiquement ces informations, et nous pourrons calculer votre remboursement dans la foulée.\nSi la sécurité sociale vous a déjà remboursé et que la télétransmission tarde à arriver ou qu’elle est inactive, transférez-nous votre décompte Améli reprenant ces soins depuis votre application Alan.", hospital_receipt: '🤗 Nous avons bien le reçu de paiement pour vos frais hospitaliers.\n\n👉 Ce seul document ne nous permet pas de traiter votre dossier. Si vos soins ont eu lieu en :\nCLINIQUE PRIVEE :\n- Nous avons besoin du bordereau de facturation détaillé acquitté : celui-ci reprend l\'intégralité de vos frais hospitaliers et est à demander au secrétariat de l\'établissement dans lequel vous avez reçu les soins. Sans ce document, nous ne pourrons malheureusement pas traiter votre dossier.\nHOPITAL PUBLIC :\n- Nous avons besoin de l\'avis des sommes à payer détaillé (recto et verso s\'il y en a un).\n- Cependant, dans le cas où vous voyez déjà des soins hospitaliers en attente de traitement depuis votre compte Alan (frais de séjour, consultation anesthésiste à l\'hôpital, consultation chirurgien à l\'hôpital, etc.), n\'hésitez pas à nous contacter afin que nous puissions manuellement rapprocher les informations entre-elles.\n\nN\'hésitez pas à consulter <a href=\'https://alan.com/app/helpV2/a_Mon-remboursement-na-pas-ete-effectue-pour-mes-frais-dhopital\' target="_blank">cet article de notre centre d\'aide</a> pour y voir plus clair', laboratory_invoice: "🤗 Nous avons bien reçu votre facture d'analyses en laboratoire. Elle n'est pas nécessaire pour vous rembourser ! En effet, nous remboursons les actes de laboratoire à réception des informations de la Sécurité Sociale.\n\n👉 Si la télétransmission est active, nous devrions recevoir automatiquement ces informations, et nous pourrons calculer votre remboursement dans la foulée.\nSi la sécurité sociale vous a déjà remboursé et que la télétransmission tarde à arriver ou qu’elle est inactive, transférez-nous votre décompte Améli reprenant ces soins depuis votre application Alan.", titre_de_recette: "🤔 Le document indique que vous avez payé l'intégralité des soins.\n\n👉 Nous vous invitons à envoyer ce document à votre Caisse de Sécurité sociale afin de vous faire rembourser la part qu'ils vous doivent.\n\nLes informations nécessaires à votre remboursement par Alan seront alors disponibles.", acupuncture: '😔 Nous ne pouvons pas couvrir ces soins.\n\n👉 En effet nous ne remboursons les séances d’acupuncture que lorsqu’elles sont dispensées par un médecin diplômé, une sage-femme ou un dentiste (et seulement dans le cas où votre couverture Alan inclut ce type de soin).\n\nTransférer une nouvelle version du document : https://alan.com/app/dashboard?documentUploader=claim', alternative_medicine_quantity_limit_reached: '🤔 Le document ne contient pas toutes les informations pour estimer votre remboursement.\n\n👉 Nous vous invitons à nous renvoyer une version contenant les informations suivantes :\n\n- Date à laquelle chaque soin a été réalisé\n\nTransférer une nouvelle version du document : https://alan.com/app/dashboard?documentUploader=claim', quittance_100: '🤔 Nous avons bien pris connaissance de votre document, et il semblerait que vous ayez réglé l’intégralité de la somme. Vous devez donc l’envoyer à la Sécurité sociale, elle en a besoin pour vous rembourser.\n\n👉 Une fois que nous aurons reçu les informations de remboursements de la part de la Sécu, nous serons en mesure de vous rembourser à notre tour.', hospi_addressed_alan: "🧐 Nous ne pouvons pas vous rembourser cette facture car celle-ci est adressée à Alan.\n\nCela signifie donc que vous n'avez rien réglé. L'établissement doit envoyer la facture à Almérys (notre partenaire de tiers-payant).\n\nNous vous invitons donc à contacter l'établissement afin de leur faire part de cette information.", estimation_already_sent: f'🤭 On dirait que notre estimation vous à déjà été transmis.
  👉 Si cest une erreur, vous pouvez nous renvoyer le bon document. On sen occupera rapidement!
  Transférer une nouvelle version du document : {HEALTH_DOCUMENT_UPLOAD_URL}
        ', almerys_PEC_rejected: "🤔 Ce document indique qu'une prise en charge a été effectuée auprès d'Almérys et que celle-ci a été rejetée.\n\n  👉 En effet, conformément au refus délivré par Almerys, votre forfait est épuisé et nous ne pouvons donc intervenir.\n\n  Si vous avez des questions au sujet de vos forfait, n'hésitez pas à nous contacter directement depuis votre espace Alan.", almerys_PEC_request: f'🤔 Ce document indique qu'une demande de prise en charge a été effectuée auprès d'Almérys. Ce document ne nous est pas destiné et ne nous permet pas de vous rembourser.

  👉 Si vous avez réglé des frais, nous vous invitons à nous adresser la facture acquittée.

  Transférer une nouvelle version du document : {HEALTH_DOCUMENT_UPLOAD_URL}', arret_de_travail: '🤭 Ce document ne semble pas être lié à un remboursement de soins.\n\n  👉 Si vous souhaitez demander le versement d\'indemnités complémentaires, vous pouvez ajouter ce document depuis la page principale de votre compte Alan sur ordinateur, en sélectionnant l\'onglet "Prévoyance" au centre de l\'écran.\n  Si vous ne voyez pas d’onglet “Prévoyance” sur votre espace, c’est peut-être que votre employeur n’a pas souscrit de contrat de Prévoyance avec nous, ou que le dossier d’arrêt de travail n’est pas encore ouvert : dans les deux cas, nous vous inviter à vérifier tout cela avec lui !\n  Si vous êtes en portabilité, nous vous invitons à nous contacter pour ouvrir votre dossier.', feuille_de_soins: '😑 Nous avons bien pris connaissance de votre feuille de soin. Il est important de l’envoyer uniquement à la Sécurité sociale, elle en a besoin pour vous rembourser.\n  👉 Une fois votre soin effectué et le remboursement de la Sécu fait, toutes les informations nécessaires seront disponibles pour qu’Alan vous rembourse. Plus besoin de nous envoyer de feuille de soin.', french_state_payment_attestation: " Nous avons bien reçu votre attestation de paiement. Elle n'est pas nécessaire pour vous rembourser ! Si ce n'est pas déjà fait, envoyez-nous le bordereau de facturation acquitté pour ces soins.\n\n  👉 Si vous avez reçu une demande de notre part pour valider que vous avez avancé les soins, merci de valider cela directement sur la page du soin correspondant dans l'application. Si vous n'avez reçu aucune demande de notre part, on est bons ! 👌", hospi_PEC_request: '🤔 Ce document est une demande de prise en charge pour des frais d’hospitalisation.\n\n  👉 Vous pouvez à tout moment et en toute autonomie effectuer votre demande depuis votre compte.\n  Depuis un ordinateur :\n  - Rendez-vous sur votre espace perso et cliquez en bas à gauche sur “Séjour à l’hôpital”.\n  - Une petite explication sur les conditions d’une prise en charge s’affiche, avec le bouton “Demander une prise en charge”.\n  - Cliquez dessus et remplissez le formulaire !\n  Depuis l’application mobile Alan :\n  - Depuis l’onglet “Demandes” (en bas de l’écran), cliquez sur “Séjour à l’hôpital”.', indemnite_journalieres: '🤭 Ce document ne semble pas être lié à un remboursement de soins.\n\n  👉 S\'il s’agit pour vous de justifier de l\'indemnisation de votre arrêt de travail par la Sécu, dans le but de demander le versement de vos indemnités complémentaires, vous pouvez ajouter ce document depuis la page principale de votre compte Alan sur ordinateur, en sélectionnant l\'onglet "Prévoyance" au centre de l\'écran.\n  Si vous ne voyez pas d’onglet “Prévoyance” sur votre espace depuis un ordinateur, c’est peut-être que votre employeur n’a pas souscrit de contrat de Prévoyance avec nous, ou qu\'il n\'a pas encore ouvert le dossier d’arrêt de travail : dans les deux cas, nous vous inviter à vérifier tout cela avec lui !\n\n  👉 S\'il s’agit de justifier du maintien de votre portabilité, vous pouvez ajouter ce document, toujours depuis la page principale de votre compte Alan sur ordinateur, en cliquant sur "Couverture - Ex-salarié".', payment_card_ticket: "🤗 Nous avons bien reçu votre ticket de carte bleue. Il n'est pas nécessaire pour vous rembourser ! Si ce n'est pas déjà fait et que le soin effectué n'est pas couvert par la Sécurité sociale, envoyez-nous la facture détaillée et acquittée.\n\n  👉 Si vous avez reçu une demande de notre part pour valider que vous avez avancé les soins, merci de valider cela directement sur la page du soin correspondant dans l'application. Si vous n'avez reçu aucune demande de notre part, on est bons ! 👌", RIB: ' On dirait que ce document n’a pas de rapport avec un remboursement de santé.\n\n  👉 S’il s’agit de mettre à jour votre RIB, vous pouvez le faire directement depuis votre compte:\n  - Cliquez sur votre nom tout en haut à droite de l’écran puis sur Mon compte\n  - Cliquez sur Coordonnées bancaires au centre de l’écran\n  - Cliquez ensuite sur Modifier pour modifier votre RIB.', PEC_request: " Ce document est une demande de prise en charge qui doit être transmise à notre partenaire de tiers-payant.\n\n  👉 Merci d'indiquer à votre professionnel de santé de s’adresser directement à Almerys.", generic_not_usable: f'🤭 On dirait que ce document n’a pas de rapport avec un remboursement de santé.

  👉 Si cest une erreur, vous pouvez nous renvoyer le bon document. On sen occupera rapidement !

  Transférer une nouvelle version du document : {HEALTH_DOCUMENT_UPLOAD_URL}', attestation_perimee: f'👉 Nous vous invitons à nous renvoyer une attestation en cours de validité (datée de moins de 3 mois).

  Transférer une nouvelle version du document : {HEALTH_DOCUMENT_UPLOAD_URL}', ss_attestation_incomplete: f'🤭 Le document ne contient pas toutes les informations pour que nous puissions le traiter. Il nous manque des informations concernant le parent de référence.

  👉 Nous vous invitons à nous renvoyer une version mentionnant le parent de référence ainsi que les bénéficiaires qui lui sont rattachés.

  Transférer une nouvelle version du document : {HEALTH_DOCUMENT_UPLOAD_URL}'}}
SUB_REJECTION_LABEL module-attribute
SUB_REJECTION_LABEL = {
    beneficiary_name_missing: "Le nom du bénéficiaire n'est pas spécifié",
    beneficiary_name_not_in_policy: "Le nom du bénéficiaire n'est pas couvert par le contrat",
    beneficiary_name_not_matching: "Le nom du bénéficiaire ne correspond pas à celui enregistré sur le contrat",
    health_professional_name_missing: "Le nom du professionnel de santé n'est pas spécifié",
    health_professional_name_exact_name_unknown: "Le nom exact du professionnel de santé n'est pas spécifié",
    health_professional_adeli_missing: "Le Nº ADELI/RPPS n'est pas spécifié",
    health_professional_adeli_not_readable: "Le Nº ADELI/RPPS n'est pas complet/lisible",
    health_professional_signature_missing: "La signature n'est pas présente",
    health_professional_signature_stamp_missing: "Le tampon n'est pas présent",
    care_type_unknown: "Le type de soin n'est pas spécifié",
    care_act_price_not_separated: "Le prix de chaque soin n'est pas indiqué séparément",
    care_act_missing: "Le code de regroupement pour chacun des actes mentionnés n'est pas spécifié",
    care_act_optic_equipment_missing: "Le code de regroupement pour chacun des éléments de l’équipement optique n’est pas spécifié",
    care_act_expired_code: "Certains codes de regroupements ou codes actes apparaissant dans le document sont périmés et ne doivent plus être utilisés",
    care_act_not_specified: "Le détail des actes envisagés n'est pas spécifié",
    care_act_eyes_missing: "Oeil(s) non spécifié(s)",
    quote_date_missing: "La date n'est pas spécifiée",
    quote_date_invalid: "La date n'est pas valide",
    quote_date_expired: "Le devis a expiré",
    brss_missing: "La BRSS de chaque soin n'est pas spécifiée",
    brss_not_separated: "La BRSS de chaque soin n'est pas indiquée séparément",
    exceeding_fees_not_separated: "Les dépassements d'honoraires de chaque soin ne sont pas indiqués séparément",
    surco_quote_missing: "Le devis initial avec les bases des actes envisagés est manquant",
}

rejection

CATEGORY_REJECTION_REASONS module-attribute
CATEGORY_REJECTION_REASONS = {
    prescription: [
        unreadable,
        unusable_partial_document,
        duplicate,
        beneficiary_not_covered,
        insufficent_info_for_parsing,
        non_EU_prescription,
    ],
    mutuelle_decompte: [
        unreadable,
        unusable_partial_document,
        duplicate,
        beneficiary_not_covered,
        insufficent_info_for_parsing,
        other_insurer_decompte_100,
    ],
    ss_decompte: [
        unreadable,
        unusable_partial_document,
        duplicate,
        beneficiary_not_covered,
        cares_received_abroad,
        insufficent_info_for_parsing,
        secu_decompte_100,
    ],
    invoice: [
        unreadable,
        unusable_partial_document,
        duplicate,
        ask_paid_invoice,
        beneficiary_not_covered,
        cares_received_abroad,
        insufficent_info_for_parsing,
        missing_mt_or_dmt,
        etiopathe,
        kine_invoice_without_mezieres,
        hospital_receipt,
        laboratory_invoice,
        titre_de_recette,
        acupuncture,
        alternative_medicine_quantity_limit_reached,
        quittance_100,
        hospi_addressed_alan,
    ],
    mutuelle_no_coverage_attestation: [
        unreadable,
        unusable_partial_document,
        duplicate,
        beneficiary_not_covered,
        insufficent_info_for_parsing,
    ],
    birth_or_adoption_certificate: [
        unreadable,
        unusable_partial_document,
        duplicate,
        beneficiary_not_covered,
        insufficent_info_for_parsing,
    ],
    quote: [
        unreadable,
        unusable_partial_document,
        duplicate,
        beneficiary_not_covered,
        cares_received_abroad,
        insufficent_info_for_parsing,
        missing_mt_or_dmt,
        etiopathe,
        acupuncture,
        estimation_already_sent,
    ],
    unsupported: [
        unreadable,
        duplicate,
        beneficiary_not_covered,
        insufficent_info_for_parsing,
        almerys_PEC_rejected,
        almerys_PEC_request,
        arret_de_travail,
        feuille_de_soins,
        french_state_payment_attestation,
        hospi_PEC_request,
        indemnite_journalieres,
        payment_card_ticket,
        RIB,
        PEC_request,
        generic_not_usable,
    ],
    non_claims_prescription: DEFAULT_REJECTION_REASONS,
    medical_certificate: DEFAULT_REJECTION_REASONS,
    medical_imaging: DEFAULT_REJECTION_REASONS,
    medical_results: DEFAULT_REJECTION_REASONS,
    cancel_teletransmission_request: DEFAULT_REJECTION_REASONS,
    other_health: [
        unreadable,
        duplicate,
        beneficiary_not_covered,
        insufficent_info_for_parsing,
        almerys_PEC_rejected,
        almerys_PEC_request,
        arret_de_travail,
        feuille_de_soins,
        french_state_payment_attestation,
        hospi_PEC_request,
        indemnite_journalieres,
        payment_card_ticket,
        RIB,
        PEC_request,
        generic_not_usable,
    ],
    ss_attestation: [
        unreadable,
        unusable_partial_document,
        duplicate,
        beneficiary_not_covered,
        insufficent_info_for_parsing,
        attestation_perimee,
        ss_attestation_incomplete,
    ],
}
DEFAULT_REJECTION_REASONS module-attribute
DEFAULT_REJECTION_REASONS = [
    unreadable,
    unusable_partial_document,
    duplicate,
    beneficiary_not_covered,
    insufficent_info_for_parsing,
    non_EU_prescription,
]
get_rejection_reasons_per_category
get_rejection_reasons_per_category(document_category)

Get the list of rejection reason for a document category with the default message we want to send to the user return: list[RejectionReason]

Source code in components/fr/public/document_parsing/queries/rejection.py
def get_rejection_reasons_per_category(
    document_category: ClaimInsuranceDocumentCategory,
) -> list[RejectionReason]:
    """
    Get the list of rejection reason for a document category with the default message we want to send to the user
    return: list[RejectionReason]
    """
    reason_keys = CATEGORY_REJECTION_REASONS.get(document_category, [])
    return [
        RejectionReason(
            reason=key, defaultMessage=REJECTION_BODY_FOR_USER["default_message"][key]
        )
        for key in reason_keys
    ]
get_sub_rejection_reasons_per_category
get_sub_rejection_reasons_per_category(document_category)

Get the list of sub rejection reason for a document category For now we only return for quote return: dict[str, list[RejectionReason]] | None

Source code in components/fr/public/document_parsing/queries/rejection.py
def get_sub_rejection_reasons_per_category(
    document_category: ClaimInsuranceDocumentCategory,
) -> dict[str, list[RejectionReason]] | None:
    """
    Get the list of sub rejection reason for a document category
    For now we only return for quote
    return: dict[str, list[RejectionReason]] | None
    """
    if document_category == ClaimInsuranceDocumentCategory.quote:
        return {
            DocumentRejectionReason.insufficent_info_for_parsing: [
                RejectionReason(reason=key, defaultMessage=SUB_REJECTION_LABEL[key])
                for key in DocumentSubRejectionReason
            ]
        }
    return None

components.fr.public.dsn

company

Set of functions to query information about companies present in the DSN.

The DSN data model is composed of DSN "companies" and "establishments". A "company" in the context of the module does NOT refer specifically to a DSN "company" but to any entity (usually an "establishment").

get_company_from_siret

get_company_from_siret(siret)

Fetch the company based on its SIRET.

Source code in components/fr/public/dsn/company.py
def get_company_from_siret(siret: str) -> DSNCompanyData | None:
    """
    Fetch the company based on its SIRET.
    """
    from components.fr.internal.dsn.models.dsn_company import DsnCompany
    from components.fr.internal.dsn.models.dsn_establishment import DsnEstablishment

    if len(siret) != 14:
        raise ValueError("SIRET must be 14 characters long")
    siren = siret[:9]
    nic = siret[9:]

    result = (
        current_session.query(DsnCompany, DsnEstablishment)  # noqa: ALN085
        .select_from(DsnEstablishment)
        .join(DsnCompany)
        .filter(
            DsnCompany.siren == siren,
            DsnEstablishment.nic == nic,
        )
        # Assumption: the SIRET refers to a unique DSN Establishment
        .one_or_none()
    )

    if not result:
        return None

    dsn_company, dsn_establishment = result

    return DSNCompanyData(
        siren=siren,
        nic=nic,
        name=dsn_establishment.name,
        ape=dsn_establishment.apet,
        postal_code=dsn_establishment.postal_code,
        city=dsn_establishment.city,
    )

contract_download

get_contracts_download_zip_for_companies

get_contracts_download_zip_for_companies(company_ids)

This function is used to download the contracts (both PDF and XML, in ZIP) for a given companies.

Parameters:

Name Type Description Default
company_ids list[int] | set[int]

List of company IDs to download contracts for

required

Returns:

Type Description
tuple[dict[str, str] | None, bool]

tuple[dict[str, str] | None, bool]: Zip attachment and boolean indicating if any FDP was found

Source code in components/fr/public/dsn/contract_download.py
def get_contracts_download_zip_for_companies(
    company_ids: list[int] | set[int],
) -> tuple[dict[str, str] | None, bool]:
    """
    This function is used to download the contracts (both PDF and XML, in ZIP) for a given companies.

    Args:
        company_ids: List of company IDs to download contracts for

    Returns:
        tuple[dict[str, str] | None, bool]: Zip attachment and boolean indicating if any FDP was found
    """
    attachments: dict[str, IO[Any]] = {}
    has_fdp = False

    companies = current_session.scalars(
        select(Company).filter(Company.id.in_(company_ids))
    ).all()

    for company in companies:
        company_folder = f"{company.display_name}"

        # Fdp covers all health/prev contracts of the company, in case of multiple contracts which contract we use to get the fdp is irrelevant
        health_contracts = [
            contract
            for contract in company.contracts
            if contract.status == ContractStatus.active
        ]
        if not health_contracts:
            continue

        latest_health_fdp = None
        for contract in health_contracts:
            if contract.latest_fiche_de_parametrage:
                latest_health_fdp = contract.latest_fiche_de_parametrage
                break

        prevoyance_contracts = [
            contract
            for contract in company.prevoyance_contracts
            if contract.status == ContractStatus.active
        ]

        latest_prevoyance_fdp = None
        for contract in prevoyance_contracts:  # type: ignore[assignment]
            if contract.latest_fiche_de_parametrage:
                latest_prevoyance_fdp = contract.latest_fiche_de_parametrage
                break

        if latest_health_fdp:
            has_fdp = True
            _add_attachments(attachments, company_folder, latest_health_fdp.id)
        if latest_prevoyance_fdp:
            has_fdp = True
            _add_attachments(attachments, company_folder, latest_prevoyance_fdp.id)

    zip_attachment: dict[str, str] | None = (
        zip_as_attachment(
            attachments,
            "Alan - Instructions et fiches de paramétrage DSN.zip"
            if has_fdp
            else "Alan - Instructions.zip",
        )
        if attachments
        else None
    )

    return zip_attachment, has_fdp

entities

DSNCompanyData dataclass

DSNCompanyData(siren, nic, name, postal_code, city, ape)

Bases: DataClassJsonMixin

The data about a company, from the DSN establishments data.

ape instance-attribute
ape

The APE code of the company.

city instance-attribute
city
name instance-attribute
name

The entity's "enseigne" from the DSN.

nic instance-attribute
nic
postal_code instance-attribute
postal_code
siren instance-attribute
siren
siret property
siret

The SIRET of the company.

components.fr.public.employees

employees

get_employees_for_admin_dashboard

get_employees_for_admin_dashboard(
    company_ids,
    scope_ids,
    cursor,
    limit,
    search,
    employee_types,
    professional_categories,
    ccn_ids,
    status_details,
)

Get employees for the admin dashboard. It won't make any admin rights checks, please make sure the user has the rights to see the given companies & scopes. :param company_ids: The companies to fetch the employees from, it will fetch all employees from the companies, except if scopes for those companies are also given :param scope_ids: The scopes to fetch the employees from, if some scopes belong to given companies ids, only employees from those scopes will be fetched :param cursor: The cursor to handle pagination :param limit: The limit of employees to fetch :param search: The search string to filter employees :param employee_types: The employee types to filter :param professional_categories: The professional categories to filter :param ccn_ids: The CCN ids to filter :return: The paginated employees, filtered by given filters

Source code in components/fr/public/employees/employees.py
def get_employees_for_admin_dashboard(
    company_ids: Iterable[int],
    scope_ids: Iterable[uuid.UUID],
    cursor: int,
    limit: int,
    search: str | None,
    employee_types: set[EmployeeQueryBuilderEmployeeType] | None,
    professional_categories: set[ProfessionalCategory | None] | None,
    ccn_ids: set[int | None] | None,
    status_details: set[StatusDetail] | None,
) -> PaginatedEmployeesForAdminDashboard:
    """
    Get employees for the admin dashboard.
    It won't make any admin rights checks, please make sure the user has the rights to see the given companies & scopes.
    :param company_ids: The companies to fetch the employees from, it will fetch all employees from the companies, except if scopes for those companies are also given
    :param scope_ids: The scopes to fetch the employees from, if some scopes belong to given companies ids, only employees from those scopes will be fetched
    :param cursor: The cursor to handle pagination
    :param limit: The limit of employees to fetch
    :param search: The search string to filter employees
    :param employee_types: The employee types to filter
    :param professional_categories: The professional categories to filter
    :param ccn_ids: The CCN ids to filter
    :return: The paginated employees, filtered by given filters
    """
    nics_by_company_id, entity_codes_by_company_id = get_scope_filtering_values(
        scope_ids=scope_ids
    )
    all_company_ids = (
        set(company_ids)
        | set(nics_by_company_id.keys())
        | set(entity_codes_by_company_id.keys())
    )

    return internal_get_employees_for_admin_dashboard(
        all_company_ids,
        cursor,
        limit,
        nics_by_company_id,
        entity_codes_by_company_id,
        search,
        employee_types,
        professional_categories,
        ccn_ids,
        status_details,
    )

terminated_employees

get_terminated_employee_details

get_terminated_employee_details(user_id, company_id)
Source code in components/fr/public/employees/terminated_employees.py
def get_terminated_employee_details(  # noqa: D103
    user_id: str, company_id: int
) -> TerminatedEmployeeDetails:
    return internal_get_terminated_employee_details(user_id, company_id)

get_terminated_employees

get_terminated_employees(
    company_ids,
    operational_scope_ids,
    cursor,
    limit,
    search,
    terminated_employee_types,
)
Source code in components/fr/public/employees/terminated_employees.py
def get_terminated_employees(  # noqa: D103
    company_ids: Iterable[int],
    operational_scope_ids: Iterable[UUID],
    cursor: int,
    limit: int,
    search: str | None,
    terminated_employee_types: set[TerminatedEmployeeType] | None,
) -> PaginatedTerminatedEmployeesForAdminDashboard:
    from components.fr.internal.operational_scopes.business_logic.queries import (
        get_scope_filtering_values,
        get_scope_filtering_values_for_user,
    )

    nics_by_company_id, entity_codes_by_company_id = (
        get_scope_filtering_values(operational_scope_ids)
        if operational_scope_ids
        else get_scope_filtering_values_for_user(g.current_user.id, set(company_ids))
    )
    all_company_ids = (
        set(company_ids)
        | set(nics_by_company_id.keys())
        | set(entity_codes_by_company_id.keys())
    )

    return internal_get_terminated_employees(
        all_company_ids,
        cursor,
        limit,
        search,
        terminated_employee_types=terminated_employee_types,
        nics_by_company_id=nics_by_company_id,
        entity_codes_by_company_id=entity_codes_by_company_id,
    )

get_terminated_employees_counts

get_terminated_employees_counts(
    company_ids, operational_scope_ids
)
Source code in components/fr/public/employees/terminated_employees.py
def get_terminated_employees_counts(  # noqa: D103
    company_ids: Iterable[int],
    operational_scope_ids: Iterable[UUID],
) -> TerminatedEmployeesByTypeCounts:
    from components.fr.internal.operational_scopes.business_logic.queries import (
        get_scope_filtering_values,
        get_scope_filtering_values_for_user,
    )

    nics_by_company_id, entity_codes_by_company_id = (
        get_scope_filtering_values(operational_scope_ids)
        if operational_scope_ids
        else get_scope_filtering_values_for_user(g.current_user.id, set(company_ids))
    )
    all_company_ids = (
        set(company_ids)
        | set(nics_by_company_id.keys())
        | set(entity_codes_by_company_id.keys())
    )

    return internal_get_terminated_employees_counts(
        company_ids=all_company_ids,
        nics_by_company_id=nics_by_company_id,
        entity_codes_by_company_id=entity_codes_by_company_id,
    )

components.fr.public.employment

fr_country_gateway

FrCountryGateway

Bases: CountryGateway[FrExtendedValues]

Implementation of the Employment Component's CountryGateway for France.

are_companies_in_same_account
are_companies_in_same_account(company_id_1, company_id_2)
Source code in components/fr/public/employment/fr_country_gateway.py
@override
def are_companies_in_same_account(
    self, company_id_1: str, company_id_2: str
) -> bool:
    from components.fr.internal.models.company import Company

    company_1 = get_or_raise_missing_resource(Company, int(company_id_1))
    company_2 = get_or_raise_missing_resource(Company, int(company_id_2))
    return company_1.account_id == company_2.account_id
get_account_name
get_account_name(account_id)
Source code in components/fr/public/employment/fr_country_gateway.py
@override
def get_account_name(self, account_id: UUID) -> str:
    from components.fr.internal.models.account import Account

    return get_or_raise_missing_resource(Account, account_id).name
get_blocked_invitations_validation_broadcast_id
get_blocked_invitations_validation_broadcast_id()
Source code in components/fr/public/employment/fr_country_gateway.py
@override
def get_blocked_invitations_validation_broadcast_id(self) -> str | None:
    return "221"
get_company_information
get_company_information(company_id)
Source code in components/fr/public/employment/fr_country_gateway.py
@override
def get_company_information(
    self,
    company_id: str,
) -> CompanyInformation:
    from components.fr.internal.models.company import Company

    company = get_or_raise_missing_resource(Company, int(company_id))
    return CompanyInformation(
        display_name=company.display_name,
        account_id=company.account_id,
        is_opt_out_for_offshoring=company.is_opt_out_for_offshoring,
    )
get_consumers_to_notify_for_legacy_backfill
get_consumers_to_notify_for_legacy_backfill()

Returns the consumers to notify for the legacy backfill source type.

Source code in components/fr/public/employment/fr_country_gateway.py
@override
def get_consumers_to_notify_for_legacy_backfill(
    self,
) -> set[EmploymentConsumer[FrExtendedValues]]:
    """
    Returns the consumers to notify for the legacy backfill source type.
    """
    from components.occupational_health.public.employment.employment_consumer import (
        occupational_health_employment_change_consumer,
    )

    return {
        occupational_health_employment_change_consumer,
    }
get_employee_identifier_for_country
get_employee_identifier_for_country(extended_values)
Source code in components/fr/public/employment/fr_country_gateway.py
@override
def get_employee_identifier_for_country(
    self, extended_values: FrExtendedValues
) -> str | None:
    return extended_values.get("ssn") or extended_values.get("ntt")
get_employment_consumers
get_employment_consumers()

Gets all employment consumers contributed by this country.

Notes: 1. ALL Employment Consumers will be called regardless of the country of origin. 2. The function that will be called must have all local code as LOCAL imports - otherwise, this breaks Canada (where loading non-CA models is forbidden)

Source code in components/fr/public/employment/fr_country_gateway.py
@override
def get_employment_consumers(self) -> set[EmploymentConsumer[FrExtendedValues]]:
    """
    Gets all employment consumers contributed by this country.

    Notes:
    1. ALL Employment Consumers will be called regardless of the country of origin.
    2. The function that will be called must have all local code as LOCAL imports - otherwise, this breaks Canada
    (where loading non-CA models is forbidden)
    """
    from components.fr.public.operational_scopes.employment_consumer import (
        operational_scopes_employment_change_consumer,
    )
    from components.fr_health_insurance_affiliation.public.employment_consumer import (
        fr_health_affiliation_employment_change_consumer,
    )
    from components.occupational_health.public.employment.employment_consumer import (
        occupational_health_employment_change_consumer,
    )

    return {
        operational_scopes_employment_change_consumer,
        fr_health_affiliation_employment_change_consumer,
        occupational_health_employment_change_consumer,
    }
get_retry_function
get_retry_function()

(Advanced) Get the function used for retrying Core Blocked Movements.

You should generally not need to implement this.

Source code in components/fr/public/employment/fr_country_gateway.py
@override
def get_retry_function(self) -> RetryFunction[FrExtendedValues]:
    """
    (Advanced) Get the function used for retrying Core Blocked Movements.

    You should generally not need to implement this.
    """
    from components.fr.internal.fr_employment_data_sources.business_logic.global_affiliation_transition.ingest_employment_declaration import (
        ingest_employment_declaration_with_fr_legacy_update,
    )

    return ingest_employment_declaration_with_fr_legacy_update
get_source_detail_for_blocked_movement
get_source_detail_for_blocked_movement(
    employment_source_data_id,
)
Source code in components/fr/public/employment/fr_country_gateway.py
@override
def get_source_detail_for_blocked_movement(
    self, employment_source_data_id: UUID
) -> str | None:
    from components.employment.public.business_logic.queries.employment_source_data import (
        get_employment_source_data_from_id,
    )

    employment_source_data = get_employment_source_data_from_id(
        employment_source_data_id
    )

    if employment_source_data.source_type == SourceType.fr_external_api:
        if "payfit" in employment_source_data.metadata.get("endpoint", ""):
            return "payfit"
    return None
get_upstream_retry_handler
get_upstream_retry_handler(source_type)
Source code in components/fr/public/employment/fr_country_gateway.py
@override
def get_upstream_retry_handler(
    self, source_type: SourceType
) -> UpstreamBlockedMovementRetryFunction[FrExtendedValues] | None:
    return _fr_upstream_retry_handlers.get(source_type)
get_user_admined_company_ids
get_user_admined_company_ids(user_id)
Source code in components/fr/public/employment/fr_country_gateway.py
@override
def get_user_admined_company_ids(self, user_id: str) -> list[str]:
    from components.fr.internal.business_logic.company.queries.company import (
        get_user_admined_company_ids,
    )

    return [str(id) for id in get_user_admined_company_ids(int(user_id))]
get_user_full_name
get_user_full_name(user_id)
Source code in components/fr/public/employment/fr_country_gateway.py
@override
def get_user_full_name(self, user_id: str) -> str | None:
    from components.fr.internal.models.user import User

    user = get_resource_or_none(User, int(user_id))
    return user.full_name if user else None
last_stale_invite_notification_email_sent_to_admin_on
last_stale_invite_notification_email_sent_to_admin_on(
    company_id,
)
Source code in components/fr/public/employment/fr_country_gateway.py
@override
def last_stale_invite_notification_email_sent_to_admin_on(
    self, company_id: str
) -> datetime | None:
    from components.fr.internal.models.company import Company
    from components.fr.internal.models.email_log import EmailLog

    company = get_or_raise_missing_resource(Company, int(company_id))

    new_blocked_invitation_broadcast = "cio-broadcast-221"

    admins = [
        company_admin.user_id
        for company_admin in company.company_admins  # company_admins include account and operation scope admins
    ]

    stale_invite_notification_email = (
        current_session.scalars(
            select(EmailLog)
            .filter(
                EmailLog.mailer == new_blocked_invitation_broadcast,
                EmailLog.sent.is_(
                    True
                ),  # mypy thinks is_ doesn't exist but it does
                EmailLog.user_id.in_(admins),
            )
            .order_by(EmailLog.sent_at.desc())
        )
        .unique()
        .first()
    )
    if not stale_invite_notification_email:
        return None
    return stale_invite_notification_email.sent_at

fr_extended_values

EmployeeTerminationType

Bases: AlanBaseEnum

ani_member_declared_new_job class-attribute instance-attribute
ani_member_declared_new_job = 'ani_member_declared_new_job'
deceased class-attribute instance-attribute
deceased = 'deceased'
departure_end_apprentice class-attribute instance-attribute
departure_end_apprentice = 'departure_end_apprentice'
departure_end_cdd class-attribute instance-attribute
departure_end_cdd = 'departure_end_cdd'
departure_negociated_termination class-attribute instance-attribute
departure_negociated_termination = (
    "departure_negociated_termination"
)
departure_resignation class-attribute instance-attribute
departure_resignation = 'departure_resignation'
departure_resignation_with_ani class-attribute instance-attribute
departure_resignation_with_ani = (
    "departure_resignation_with_ani"
)
departure_termination class-attribute instance-attribute
departure_termination = 'departure_termination'
departure_termination_fault class-attribute instance-attribute
departure_termination_fault = 'departure_termination_fault'
dispensed class-attribute instance-attribute
dispensed = 'dispensed'
fraud_confirmed class-attribute instance-attribute
fraud_confirmed = 'fraud_confirmed'
invited_in_ani class-attribute instance-attribute
invited_in_ani = 'invited_in_ani'
invited_in_retirement class-attribute instance-attribute
invited_in_retirement = 'invited_in_retirement'
migrated class-attribute instance-attribute
migrated = 'migrated'
other class-attribute instance-attribute
other = 'other'
retirement class-attribute instance-attribute
retirement = 'retirement'
retirement_with_ani class-attribute instance-attribute
retirement_with_ani = 'retirement_with_ani'
unpaid_invoices class-attribute instance-attribute
unpaid_invoices = 'unpaid_invoices'
unpaid_leave class-attribute instance-attribute
unpaid_leave = 'unpaid_leave'
unpaid_leave_exit class-attribute instance-attribute
unpaid_leave_exit = 'unpaid_leave_exit'

ExemptionType

Bases: AlanBaseEnum

acs_or_cmu class-attribute instance-attribute
acs_or_cmu = 'acs_or_cmu'
already_covered_indiv class-attribute instance-attribute
already_covered_indiv = 'already_covered_indiv'
already_covered_military class-attribute instance-attribute
already_covered_military = 'already_covered_military'
already_covered_other_employer class-attribute instance-attribute
already_covered_other_employer = (
    "already_covered_other_employer"
)
already_covered_partner class-attribute instance-attribute
already_covered_partner = 'already_covered_partner'
already_covered_regime_local class-attribute instance-attribute
already_covered_regime_local = (
    "already_covered_regime_local"
)
already_covered_state_agent class-attribute instance-attribute
already_covered_state_agent = 'already_covered_state_agent'
cdd_already_covered class-attribute instance-attribute
cdd_already_covered = 'cdd_already_covered'
cdd_less_than_a_year class-attribute instance-attribute
cdd_less_than_a_year = 'cdd_less_than_a_year'
cdd_less_than_three_months class-attribute instance-attribute
cdd_less_than_three_months = 'cdd_less_than_three_months'
continue_to_refuse_coverage class-attribute instance-attribute
continue_to_refuse_coverage = 'continue_to_refuse_coverage'
health_payment_beneficiary class-attribute instance-attribute
health_payment_beneficiary = 'health_payment_beneficiary'
low_salary class-attribute instance-attribute
low_salary = 'low_salary'
madelin class-attribute instance-attribute
madelin = 'madelin'
refuse_coverage class-attribute instance-attribute
refuse_coverage = 'refuse_coverage'

FrEmploymentDeclaration module-attribute

FrEmploymentDeclaration = EmploymentDeclaration[
    FrExtendedValues
]

FrExtendedValues

Bases: ExtendedValuesDict

Local values for France employments in the Employment Component

ccn_code instance-attribute
ccn_code
entity_code instance-attribute
entity_code
invite_address instance-attribute
invite_address
invite_as_exempted instance-attribute
invite_as_exempted
invite_birth_date instance-attribute
invite_birth_date
invite_email instance-attribute
invite_email
invite_exemption_type instance-attribute
invite_exemption_type
invite_iban instance-attribute
invite_iban
invite_in_unpaid_leave instance-attribute
invite_in_unpaid_leave
invite_unpaid_leave_ends_on instance-attribute
invite_unpaid_leave_ends_on
invite_unpaid_leave_mandatory_coverage instance-attribute
invite_unpaid_leave_mandatory_coverage
is_alsace_moselle instance-attribute
is_alsace_moselle
nic instance-attribute
nic
ntt instance-attribute
ntt
professional_category instance-attribute
professional_category
ssn instance-attribute
ssn
termination_type instance-attribute
termination_type

ProfessionalCategory

Bases: AlanBaseEnum

This enum is used to model both the professional category of persons and abstract objects, such as contracts: - persons can be "cadres", "non_cadres", or NULL which models we don't know their professional category; - abstract entities can be "cadres", "non_cadres", or NULL which models we don't have to apply any constraint on the object regarding the professional category. Either way, we want to get rid of the "all" value. It was more targeted at the second use case but it's more coherent to model objects the same way for both kind of objects.

all class-attribute instance-attribute
all = 'all'
cadres class-attribute instance-attribute
cadres = 'cadres'
display_name staticmethod
display_name(population)
Source code in components/fr/internal/models/enums/professional_category.py
@staticmethod
def display_name(population: Optional["ProfessionalCategory"]) -> str | None:
    return (
        {
            ProfessionalCategory.cadres: "cadres",
            ProfessionalCategory.non_cadres: "non cadres",
        }.get(population)
        if population
        else None
    )
non_cadres class-attribute instance-attribute
non_cadres = 'non_cadres'
suffix staticmethod
suffix(population)
Source code in components/fr/internal/models/enums/professional_category.py
@staticmethod
def suffix(population: Optional["ProfessionalCategory"]) -> str:
    return (
        {
            ProfessionalCategory.cadres: PROFESSIONAL_CATEGORY_CADRE_SUFFIX,
            ProfessionalCategory.non_cadres: PROFESSIONAL_CATEGORY_NON_CADRE_SUFFIX,
        }.get(population, "")
        if population
        else ""
    )

components.fr.public.entities

components.fr.public.enums

components.fr.public.events

subscription

subscribe_to_events

subscribe_to_events()

All event subscriptions for France should be done here.

Source code in components/fr/public/events/subscription.py
def subscribe_to_events() -> None:
    """
    All event subscriptions for France should be done here.
    """
    from components.ca.public.events.subscription import subscribe_to_ca_global_events
    from components.es.public.events.subscription import subscribe_to_es_global_events
    from components.fr.internal.events.subscribers import (
        notify_tracking_when_address_changed,
        unblock_non_verified_accounts_by_verification_request_id,
    )
    from components.global_profile.public.events import (
        ProfileAddressChanged,
    )
    from components.id_verification.public.events import (
        IdVerificationValidatedForOnboarding,
    )
    from shared.messaging.broker import get_message_broker
    from shared.queuing.config import LOW_PRIORITY_QUEUE

    message_broker = get_message_broker()

    subscribe_to_es_global_events()
    subscribe_to_ca_global_events()

    # Subscriptions to global profile events

    message_broker.subscribe_async(
        ProfileAddressChanged,
        notify_tracking_when_address_changed,
        queue_name=LOW_PRIORITY_QUEUE,
    )

    # Subscriptions to ID verification events
    message_broker.subscribe_async(
        IdVerificationValidatedForOnboarding,
        unblock_non_verified_accounts_by_verification_request_id,
        queue_name=LOW_PRIORITY_QUEUE,
    )

components.fr.public.feature

app_user_has_access_to_feature_flag

app_user_has_access_to_feature_flag(
    app_user_id,
    *,
    feature_name,
    eligible_user_ids_metadata_key="eligible_user_ids",
    eligible_company_ids_metadata_key="eligible_company_ids"
)

Generic helper to check whether an app user has access to a feature flag in FR.

The access rules are: - If the feature is enabled for the user directly -> True - Else, if the user ID is present in the feature metadata under eligible_user_ids_metadata_key -> True - Else, if the user's default company is present in the feature metadata under eligible_company_ids_metadata_key -> True - Otherwise -> False

All user IDs here are FR-side integers, but we receive app_user_id as a str.

Source code in components/fr/public/feature.py
def app_user_has_access_to_feature_flag(
    app_user_id: str,
    *,
    feature_name: str,
    eligible_user_ids_metadata_key: str = "eligible_user_ids",
    eligible_company_ids_metadata_key: str = "eligible_company_ids",
) -> bool:
    """
    Generic helper to check whether an app user has access to a feature flag in FR.

    The access rules are:
    - If the feature is enabled for the user directly -> True
    - Else, if the user ID is present in the feature metadata under eligible_user_ids_metadata_key -> True
    - Else, if the user's default company is present in the feature metadata under eligible_company_ids_metadata_key -> True
    - Otherwise -> False

    All user IDs here are FR-side integers, but we receive app_user_id as a str.
    """
    try:
        from components.fr.internal.business_logic.feature import (
            get_feature_metadata_value,
            is_feature_enabled_for_user_id,
        )

        user_id_int = int(app_user_id)

        # Direct feature flag enabled for this user
        if is_feature_enabled_for_user_id(feature_name, user_id_int):
            return True

        # Explicit user allowlist from metadata
        eligible_user_ids: list[int] = cast(
            "list[int]",
            get_feature_metadata_value(
                feature_name,
                eligible_user_ids_metadata_key,
                [],
            ),
        )
        if user_id_int in eligible_user_ids:
            return True

        # Company-level allowlist from metadata
        eligible_company_ids: list[int] = cast(
            "list[int]",
            get_feature_metadata_value(
                feature_name,
                eligible_company_ids_metadata_key,
                [],
            ),
        )
        user_company_id = get_default_company_id_for_user(user_id_int)
        return user_company_id in eligible_company_ids

    except Exception:
        current_logger.exception(
            "Error checking if app user has access to feature flag",
            app_user_id=app_user_id,
            feature_name=feature_name,
        )
        return False

is_feature_active

is_feature_active(feature_name)

Check if a feature is active :param feature_name: the name of the feature :return: true if the feature with this name is found and active

Source code in components/fr/public/feature.py
def is_feature_active(
    feature_name: str,
) -> bool:
    """
    Check if a feature is active
    :param feature_name: the name of the feature
    :return: true if the feature with this name is found and active
    """
    return Feature.is_enabled(name=feature_name)

components.fr.public.fraud_detection

enums

FraudRelevantUserChangeType

Bases: AlanBaseEnum

EmailUpdated class-attribute instance-attribute
EmailUpdated = 'email_updated'
SettlementIbanUpdated class-attribute instance-attribute
SettlementIbanUpdated = 'settlement_iban_updated'

queries

find_most_recent_fraud_relevant_user_changes

find_most_recent_fraud_relevant_user_changes(
    user_ids,
    change_type=None,
    limit=None,
    since=None,
    until=None,
)
Source code in components/fr/internal/fraud_detection/business_logic/queries/fraud_relevant_user_change.py
def find_most_recent_fraud_relevant_user_changes(
    user_ids: Iterable[int],
    change_type: FraudRelevantUserChangeType | None = None,
    limit: int | None = None,
    since: datetime | None = None,
    until: datetime | None = None,
) -> list[FraudRelevantUserChangeEntity]:
    query = current_session.query(FraudRelevantUserChange).filter(  # noqa: ALN085
        FraudRelevantUserChange.user_id.in_(user_ids)
    )
    if change_type:
        query = query.filter(FraudRelevantUserChange.type == change_type)
    if since:
        query = query.filter(FraudRelevantUserChange.created_at >= since)
    if until:
        query = query.filter(FraudRelevantUserChange.created_at <= until)
    if limit:
        query = query.limit(limit)
    query = query.order_by(FraudRelevantUserChange.created_at.asc())

    return [
        FraudRelevantUserChangeEntity(
            user_id=change.user_id,
            actor_id=change.actor_id,
            type=change.type,
            source=change.source,
            new_values=change.new_values,
            old_values=change.old_values,
            occurred_at=change.created_at,
        )
        for change in query.all()
    ]

components.fr.public.global_customer_dashboard

admin

get_admined_entities_for_entity_selector_fr

get_admined_entities_for_entity_selector_fr(user_id)
Source code in components/fr/public/global_customer_dashboard/admin.py
def get_admined_entities_for_entity_selector_fr(  # noqa: D103
    user_id: str,
) -> list[AdminedEntityForEntitySelector]:
    from components.fr.internal.models.account import Account
    from components.fr.internal.models.company import Company
    from components.fr.internal.operational_scopes.models.operational_scope import (
        OperationalScope,
    )

    return get_admined_entities_for_entity_selector_global(
        user_id=user_id,
        operational_scope_model=OperationalScope,
        company_model=Company,
        account_model=Account,
        admined_entities_query_api=admined_entities_query_api_fr(),
    )

get_pending_onboardings_for_admined_entity_selector_fr

get_pending_onboardings_for_admined_entity_selector_fr(
    user_id,
)
Source code in components/fr/public/global_customer_dashboard/admin.py
def get_pending_onboardings_for_admined_entity_selector_fr(  # noqa: D103
    user_id: str,
) -> list[PendingOnboarding]:
    from components.fr.internal.business_logic.company.queries.company import (
        get_companies_onboarding_status,
    )
    from components.fr.internal.models.company import Company
    from components.fr.internal.models.enums.company_onboarding_status import (
        CompanyOnboardingStatus,
    )

    def companies_having_pending_onboardings(company_ids: list[str]) -> dict[str, bool]:
        company_id_to_onboarding_status = get_companies_onboarding_status(
            [int(company_id) for company_id in company_ids]
        )

        return {
            company_id: company_id_to_onboarding_status[int(company_id)]
            not in {
                CompanyOnboardingStatus.completed,
                CompanyOnboardingStatus.missing_signature_callback,
            }
            for company_id in company_ids
        }

    return get_pending_onboardings_from_admined_entities(
        user_id=user_id,
        company_model=Company,
        companies_having_pending_onboardings=companies_having_pending_onboardings,
        admined_entities_query_api=admined_entities_query_api_fr(),
    )

product_setting

get_account_settings_for_account

get_account_settings_for_account(account_id)

Returns the list of activated AccountSetting for the given account :param account_id: The ID of the account we want AccountSetting for :return: The list of activated AccountSetting for this account

Source code in components/fr/public/global_customer_dashboard/product_setting.py
def get_account_settings_for_account(account_id: uuid.UUID) -> list[ProductSetting]:
    """
    Returns the list of activated AccountSetting for the given account
    :param account_id: The ID of the account we want AccountSetting for
    :return: The list of activated AccountSetting for this account
    """
    return get_account_settings(account_id=account_id)

get_account_settings_for_company

get_account_settings_for_company(company_id)

Returns the list of activated AccountSetting for the given company :param company_id: The ID of the company we want AccountSetting for :return: The list of activated AccountSetting for this company

Source code in components/fr/public/global_customer_dashboard/product_setting.py
def get_account_settings_for_company(company_id: int) -> list[ProductSetting]:
    """
    Returns the list of activated AccountSetting for the given company
    :param company_id: The ID of the company we want AccountSetting for
    :return: The list of activated AccountSetting for this company
    """
    return get_account_settings_for_company_internal(company_id=company_id)

components.fr.public.health_pricing

pricing_type

PricingType

Bases: AlanBaseEnum

Enum for pricing types that are used in the health pricing component.

adult_children class-attribute instance-attribute
adult_children = 'adult_children'
children class-attribute instance-attribute
children = 'children'
family class-attribute instance-attribute
family = 'family'
partner class-attribute instance-attribute
partner = 'partner'
primary class-attribute instance-attribute
primary = 'primary'

components.fr.public.operational_scopes

employment_consumer

operational_scopes_employment_change_consumer

operational_scopes_employment_change_consumer(
    employment_change, event_bus_orchestrator
)
Source code in components/fr/public/operational_scopes/employment_consumer.py
def operational_scopes_employment_change_consumer(  # noqa: D103
    employment_change: EmploymentChange["FrExtendedValues"],
    event_bus_orchestrator: EventBusOrchestrator,
) -> None:
    if employment_change.country_code != CountryCode.fr:
        return

    from components.fr.internal.operational_scopes.business_logic.actions import (
        process_employment_change,
    )

    process_employment_change(employment_change, event_bus_orchestrator)

factories

OperationalScopeFactory

Bases: AlanBaseFactory['OperationalScope']

Meta
model class-attribute instance-attribute
model = OperationalScope
Params
is_group class-attribute instance-attribute
is_group = Trait(
    type=group, value=lazy_attribute(lambda o: name)
)
company class-attribute instance-attribute
company = SubFactory(CompanyFullFactory)
name class-attribute instance-attribute
name = LazyFunction(word)
type class-attribute instance-attribute
type = siret
value class-attribute instance-attribute
value = sequence(lambda n: f'{n}')

components.fr.public.policy

policy

get_n_base_filtered_children_on

get_n_base_filtered_children_on(
    policy_id, date, adult_children
)
Source code in components/fr/public/policy/policy.py
@deprecated(
    "Avoid using beneficiary count methods, use rather the unified pricing abstraction to get costs."
)
def get_n_base_filtered_children_on(  # noqa: D103
    policy_id: int, date: datetime.date, adult_children: bool
) -> int:
    return internal_get_n_base_filtered_children_on(policy_id, date, adult_children)

get_n_base_paid_filtered_children_on

get_n_base_paid_filtered_children_on(
    policy_id, on_date, adult_children
)
Source code in components/fr/public/policy/policy.py
@deprecated(
    "Avoid using beneficiary count methods, use rather the unified pricing abstraction to get costs."
)
def get_n_base_paid_filtered_children_on(  # noqa: D103
    policy_id: int, on_date: datetime.date, adult_children: bool
) -> int:
    return internal_get_n_base_paid_children_on(policy_id, on_date, adult_children)

is_policy_change_significant

is_policy_change_significant(
    old_policy_id,
    new_policy_id,
    new_start_date,
    old_policy_precomputed_coverage=None,
)

Returns True if the change between two policies is significant, and member notification and action is required.

Returns False if the change can be considered "transparent" and does not require member action.

This function should be used primarily for employee transfer cases.

Source code in components/fr/public/policy/policy.py
def is_policy_change_significant(
    old_policy_id: int,
    new_policy_id: int,
    new_start_date: datetime.date,
    old_policy_precomputed_coverage: _PrecomputedPolicyCoverage | None = None,
) -> bool:
    """
    Returns True if the change between two policies is significant, and member notification and action is required.

    Returns False if the change can be considered "transparent" and does not require member action.

    This function should be used primarily for employee transfer cases.
    """
    last_day_date = new_start_date - datetime.timedelta(days=1)

    if is_policy_coverage_different(
        old_policy_id=old_policy_id,
        new_policy_id=new_policy_id,
        last_day_date=last_day_date,
        new_start_date=new_start_date,
        old_policy_precomputed_coverage=old_policy_precomputed_coverage,
    ):
        return True

    (no_option_available_anymore, new_option, new_price_for_same_option) = (
        change_in_option_coverage(old_policy_id, new_policy_id, new_start_date)
    )
    if no_option_available_anymore or new_option or new_price_for_same_option:
        return True

    old_policy = get_or_raise_missing_resource(Policy, old_policy_id)
    was_ani = old_policy.is_ani_on(new_start_date - datetime.timedelta(days=1))
    if was_ani:
        return True

    was_individual = old_policy.contract.contractee_type == ContracteeType.individual
    if was_individual:
        return True

    return False

is_policy_coverage_different

is_policy_coverage_different(
    old_policy_id,
    new_policy_id,
    last_day_date,
    new_start_date,
    old_policy_precomputed_coverage=None,
)

Returns True if the coverage differs between the two policies (either in pricing or in actual coverage).

Source code in components/fr/public/policy/policy.py
def is_policy_coverage_different(
    old_policy_id: int,
    new_policy_id: int,
    last_day_date: datetime.date,
    new_start_date: datetime.date,
    old_policy_precomputed_coverage: _PrecomputedPolicyCoverage | None = None,
) -> bool:
    """
    Returns True if the coverage differs between the two policies (either in pricing or in actual coverage).
    """
    from components.fr.internal.business_logic.health_pricing.policy import (
        get_policy_cost_split_by_primary_and_dependants,
        is_different_salary_rate_pricing_between_policies,
    )

    old_policy = get_or_raise_missing_resource(Policy, old_policy_id)
    new_policy = get_or_raise_missing_resource(Policy, new_policy_id)

    old_coverage_key = (
        old_policy_precomputed_coverage.coverage_key
        if old_policy_precomputed_coverage is not None
        else old_policy.contract.health_coverage_key(last_day_date)
    )
    new_coverage_key = new_policy.contract.health_coverage_key(new_start_date)

    old_policy_price_split = (
        old_policy_precomputed_coverage.policy_price_split
        if old_policy_precomputed_coverage is not None
        else get_policy_cost_split_by_primary_and_dependants(
            old_policy.id, last_day_date
        )
    )
    new_policy_price_split = get_policy_cost_split_by_primary_and_dependants(
        new_policy.id, new_start_date
    )

    old_coverage_price = old_policy_price_split.total_cost
    new_coverage_price = new_policy_price_split.total_cost

    is_coverage_different = (
        old_coverage_key != new_coverage_key
        or old_coverage_price != new_coverage_price
        or is_different_salary_rate_pricing_between_policies(
            old_policy=old_policy,
            new_policy=new_policy,
            last_day_date=last_day_date,
            new_start_date=new_start_date,
        )
    )
    return is_coverage_different

precompute_policy_coverage_for_policy_comparison

precompute_policy_coverage_for_policy_comparison(
    policy_id, new_start_date
)

For use with is_policy_change_significant in case you need to cancel the policy afterwards (otherwise, after cancellation, the cost becomes 0)

Source code in components/fr/public/policy/policy.py
def precompute_policy_coverage_for_policy_comparison(
    policy_id: int, new_start_date: datetime.date
) -> _PrecomputedPolicyCoverage:
    """
    For use with is_policy_change_significant in case you need to cancel the policy afterwards (otherwise, after cancellation, the cost becomes 0)
    """
    from components.fr.internal.business_logic.health_pricing.policy import (
        get_policy_cost_split_by_primary_and_dependants,
    )

    policy = get_or_raise_missing_resource(Policy, policy_id)
    return _PrecomputedPolicyCoverage(
        policy.contract.health_coverage_key(new_start_date),
        get_policy_cost_split_by_primary_and_dependants(policy.id, new_start_date),
    )

components.fr.public.prevoyance

entities

PrevoyanceGuaranteeSpecBeneficiaryEligibilityCriteria dataclass

PrevoyanceGuaranteeSpecBeneficiaryEligibilityCriteria(
    beneficiary_relationship_type,
    max_age_at_disability_start_date=None,
    min_covered_age=None,
    max_covered_age=None,
    disabled_max_covered_age=None,
    student_max_covered_age=None,
    unemployed_without_benefits_max_covered_age=None,
    apprentice_max_covered_age=None,
    is_dependent=None,
    is_under_supervision_or_in_psychiatric_hospital=None,
    paid_for_the_funeral=None,
)

Bases: DataClassJsonMixin

This dataclass describes the beneficiary criteria to be eligible to a prevoyance guarantee spec.

Only specific beneficiary could benefit from the coverage of certain guarantees (eg rente education only for children at charge, member in case of work stoppage)

apprentice_max_covered_age class-attribute instance-attribute
apprentice_max_covered_age = None
beneficiary_relationship_type instance-attribute
beneficiary_relationship_type
disabled_max_covered_age class-attribute instance-attribute
disabled_max_covered_age = None
is_dependent class-attribute instance-attribute
is_dependent = None
is_under_supervision_or_in_psychiatric_hospital class-attribute instance-attribute
is_under_supervision_or_in_psychiatric_hospital = None
max_age_at_disability_start_date class-attribute instance-attribute
max_age_at_disability_start_date = None
max_covered_age class-attribute instance-attribute
max_covered_age = None
min_covered_age class-attribute instance-attribute
min_covered_age = None
paid_for_the_funeral class-attribute instance-attribute
paid_for_the_funeral = None
student_max_covered_age class-attribute instance-attribute
student_max_covered_age = None
unemployed_without_benefits_max_covered_age class-attribute instance-attribute
unemployed_without_benefits_max_covered_age = None

PrevoyanceGuaranteeSpecEligibilityCriteria dataclass

PrevoyanceGuaranteeSpecEligibilityCriteria(
    event_criteria, member_criteria, beneficiary_criteria
)

Bases: DataClassJsonMixin

These criteria determine in which circumstances guarantees are eligible and could be covered.

When in such circumstances, we would only create / suggest claims with the eligible guarantees (eg for a death: death lump sum, rente education, funerals, etc).

Criteria are of different nature: - Event: guarantees only apply to specific events, in specific situation (eg. accidental death) - Member: guarantees eligibility could depend on the member tenure, on the member dependents, on their disability condition etc - Beneficiary: some guarantees only apply to specific beneficiaries (eg rente education only for children at charge)

Exploration can be found here https://docs.google.com/spreadsheets/d/1yyLaVasQnwSVhASZ3c41d7hFkp56RxiRmSwlMBHj2Mg/edit?usp=sharing ⧉ https://docs.google.com/document/d/1qIsYQgD6Iy9dLN61iKAbAD0S-LLHRCcmvE_owSyvFbk/edit?usp=sharing ⧉

beneficiary_criteria instance-attribute
beneficiary_criteria
event_criteria instance-attribute
event_criteria
member_criteria instance-attribute
member_criteria

PrevoyanceGuaranteeSpecEventEligibilityCriteria dataclass

PrevoyanceGuaranteeSpecEventEligibilityCriteria(
    event_type,
    event_user_relationship_type,
    death_event_user_min_age=None,
    event_is_not_covered_by_secu=None,
    event_is_covered_in_parental_leave=None,
    event_is_accidental=None,
    event_is_accidental_or_after_professional_disease=None,
    event_required_assistance_from_third_party=None,
    event_is_long_term_condition=None,
    work_stoppage_valid_reasons=None,
    work_stoppage_min_duration_days=None,
    death_is_before_member_death=None,
    death_is_simultaneous_partner_death=None,
    simultaneous_partner_death_max_delay_months=None,
    disability_is_atmp=None,
    disability_rate_min=None,
    disability_rate_max=None,
    disability_categories=None,
)

Bases: DataClassJsonMixin

This dataclass describes the prevoyance event criteria to be eligible to a prevoyance guarantee spec

Guarantee would only cover specific events based on these criteria (eg. type of event, if death is accidental or on the disability rate)

death_event_user_min_age class-attribute instance-attribute
death_event_user_min_age = None
death_is_before_member_death class-attribute instance-attribute
death_is_before_member_death = None
death_is_simultaneous_partner_death class-attribute instance-attribute
death_is_simultaneous_partner_death = None
disability_categories class-attribute instance-attribute
disability_categories = None
disability_is_atmp class-attribute instance-attribute
disability_is_atmp = None
disability_rate_max class-attribute instance-attribute
disability_rate_max = None
disability_rate_min class-attribute instance-attribute
disability_rate_min = None
event_is_accidental class-attribute instance-attribute
event_is_accidental = None
event_is_accidental_or_after_professional_disease class-attribute instance-attribute
event_is_accidental_or_after_professional_disease = None
event_is_covered_in_parental_leave class-attribute instance-attribute
event_is_covered_in_parental_leave = None
event_is_long_term_condition class-attribute instance-attribute
event_is_long_term_condition = None
event_is_not_covered_by_secu class-attribute instance-attribute
event_is_not_covered_by_secu = None
event_required_assistance_from_third_party class-attribute instance-attribute
event_required_assistance_from_third_party = None
event_type instance-attribute
event_type
event_user_relationship_type instance-attribute
event_user_relationship_type
simultaneous_partner_death_max_delay_months class-attribute instance-attribute
simultaneous_partner_death_max_delay_months = None
work_stoppage_min_duration_days class-attribute instance-attribute
work_stoppage_min_duration_days = None
work_stoppage_valid_reasons class-attribute instance-attribute
work_stoppage_valid_reasons = None

PrevoyanceGuaranteeSpecMemberEligibilityCriteria dataclass

PrevoyanceGuaranteeSpecMemberEligibilityCriteria(
    min_tenure_months=None,
    max_tenure_months=None,
    max_tenure_or_less_than_200_hours_per_trimester=None,
    min_hours_worked_in_trimester=None,
    max_hours_worked_in_trimester=None,
    min_covered_presence_days=None,
    has_dependent_children=None,
    has_dependent=None,
    is_married_or_pacsed=None,
)

Bases: DataClassJsonMixin

This dataclass describes the member criteria to be eligible to a prevoyance guarantee spec

It could be based on the member tenure in the company at the event date or on their dependents at charge

has_dependent class-attribute instance-attribute
has_dependent = None
has_dependent_children class-attribute instance-attribute
has_dependent_children = None
is_married_or_pacsed class-attribute instance-attribute
is_married_or_pacsed = None
max_hours_worked_in_trimester class-attribute instance-attribute
max_hours_worked_in_trimester = None
max_tenure_months class-attribute instance-attribute
max_tenure_months = None
max_tenure_or_less_than_200_hours_per_trimester class-attribute instance-attribute
max_tenure_or_less_than_200_hours_per_trimester = None
min_covered_presence_days class-attribute instance-attribute
min_covered_presence_days = None
min_hours_worked_in_trimester class-attribute instance-attribute
min_hours_worked_in_trimester = None
min_tenure_months class-attribute instance-attribute
min_tenure_months = None

queries

PrevoyanceCCNContstraintEntity dataclass

PrevoyanceCCNContstraintEntity(
    id, ccn_id, non_cadre_coverage_required
)

Bases: DataClassJsonMixin

Represents a constraint for a prevoyance contract CCN.

Attributes:

Name Type Description
id int

The unique identifier of the constraint.

ccn_id int

The ID of the CCN.

non_cadre_coverage_required bool

Indicates if non-cadre coverage for all contracts on the ccn.

ccn_id instance-attribute
ccn_id
id instance-attribute
id
non_cadre_coverage_required instance-attribute
non_cadre_coverage_required

PrevoyanceParticipationCCNContstraintEntity dataclass

PrevoyanceParticipationCCNContstraintEntity(
    id, ccn_id, professional_category, minimum_participation
)

Bases: DataClassJsonMixin

Represents a participation constraint for a prevoyance contract CCN.

Attributes:

Name Type Description
id int

The unique identifier of the constraint.

ccn_id int

The ID of the CCN.

professional_category Optional[ProfessionalCategory]

The professional category.

minimum_participation float

The minimum participation required for the professional category on the ccn.

ccn_id instance-attribute
ccn_id
id instance-attribute
id
minimum_participation instance-attribute
minimum_participation
professional_category instance-attribute
professional_category

get_prevoyance_ccn_constraints

get_prevoyance_ccn_constraints(ccn_ids)

Return the list of PrevoyanceCCNCoinstraints for a list of ccn_ids.

Source code in components/fr/public/prevoyance/queries.py
def get_prevoyance_ccn_constraints(
    ccn_ids: list[int],
) -> list[PrevoyanceCCNContstraintEntity] | None:
    """
    Return the list of PrevoyanceCCNCoinstraints for a list of ccn_ids.
    """
    prevoyance_ccn_constraints = (
        current_session.query(PrevoyanceCCNConstraint)  # noqa: ALN085
        .filter(PrevoyanceCCNConstraint.ccn_id.in_(ccn_ids))
        .all()
    )
    if len(prevoyance_ccn_constraints) == 0:
        return None

    return [
        PrevoyanceCCNContstraintEntity(
            id=prevoyance_ccn_constraint.id,  # type: ignore[arg-type]
            ccn_id=prevoyance_ccn_constraint.ccn_id,
            non_cadre_coverage_required=prevoyance_ccn_constraint.non_cadre_coverage_required,
        )
        for prevoyance_ccn_constraint in prevoyance_ccn_constraints
    ]

get_prevoyance_participation_ccn_constraints

get_prevoyance_participation_ccn_constraints(
    ccn_id, professional_category=None
)

Return the list of CCN constraints that apply to the company's prevoyance contract.

Source code in components/fr/public/prevoyance/queries.py
def get_prevoyance_participation_ccn_constraints(
    ccn_id: int,
    professional_category: ProfessionalCategory | None = None,
) -> PrevoyanceParticipationCCNContstraintEntity | None:
    """
    Return the list of CCN constraints that apply to the company's prevoyance contract.
    """
    ccn = current_session.get(CCN, ccn_id)
    if not ccn:
        return None

    prevoyance_participation_ccn_constraints = (
        current_session.query(PrevoyanceParticipationCCNConstraint)  # noqa: ALN085
        .filter(
            PrevoyanceParticipationCCNConstraint.ccn_id == ccn.id,
            # We always include all or none entries so that we can apply them to `cadres/non-cadres` as well.
            or_(
                PrevoyanceParticipationCCNConstraint.professional_category
                == ProfessionalCategory.all,
                PrevoyanceParticipationCCNConstraint.professional_category == None,
                PrevoyanceParticipationCCNConstraint.professional_category
                == professional_category,
            ),
        )
        .all()
    )

    if len(prevoyance_participation_ccn_constraints) == 0:
        return None

    # With the DB constraints we can only ever have one entry for a given ccn_id and professional_category, but we check just in case and for typing.
    if len(prevoyance_participation_ccn_constraints) > 1:
        current_logger.error(
            f"Multiple participation constraints found for ccn_id={ccn_id}, professional_category={professional_category}, returning None and not handling ccn compliance in this case."
        )

        return None

    prevoyance_participation_ccn_constraint = prevoyance_participation_ccn_constraints[
        0
    ]

    return PrevoyanceParticipationCCNContstraintEntity(
        id=prevoyance_participation_ccn_constraint.id,  # type: ignore[arg-type]
        ccn_id=prevoyance_participation_ccn_constraint.ccn_id,
        professional_category=prevoyance_participation_ccn_constraint.professional_category,
        minimum_participation=prevoyance_participation_ccn_constraint.minimum_participation,  # type: ignore[arg-type]
    )

get_work_stoppages_for_users

get_work_stoppages_for_users(
    user_ids, ever_active_during_period=None
)

Get the list of work stoppages for a list of users.

Source code in components/fr/public/prevoyance/queries.py
def get_work_stoppages_for_users(
    user_ids: Iterable[int],
    ever_active_during_period: tuple[date, date] | None = None,
) -> dict[int, list[WorkStoppage]]:
    """
    Get the list of work stoppages for a list of users.
    """
    from components.fr.internal.models.employment import Employment
    from components.fr.internal.prevoyance_claim_management.models.internalized_work_stoppage import (
        InternalizedWorkStoppage,
    )
    from components.fr.internal.prevoyance_claim_management.models.internalized_work_stoppage_info import (
        InternalizedWorkStoppageInfo,
    )

    query = (
        current_session.query(  # noqa: ALN085
            InternalizedWorkStoppage.id,
            InternalizedWorkStoppageInfo.start_date,
            InternalizedWorkStoppageInfo.end_date,
            InternalizedWorkStoppageInfo.reason,
            Employment.company_id,
            Employment.user_id,
        )
        .select_from(InternalizedWorkStoppage)
        .join(InternalizedWorkStoppage.employment)
        .join(InternalizedWorkStoppage.internalized_work_stoppage_infos)  # type: ignore[arg-type]
        .filter(
            Employment.user_id.in_(user_ids),
            InternalizedWorkStoppageInfo.is_cancelled.is_(False),
            InternalizedWorkStoppageInfo.is_obsolete.is_(False),
        )
    )

    if ever_active_during_period is not None:
        period_start, period_end = ever_active_during_period
        query = query.filter(
            InternalizedWorkStoppageInfo.start_date <= period_end,
            InternalizedWorkStoppageInfo.end_date >= period_start,
        )

    work_stoppage_by_user_id_mapping: dict[int, list[WorkStoppage]] = defaultdict(list)

    for row in query.all():
        work_stoppage_by_user_id_mapping[row.user_id].append(
            WorkStoppage(
                user_id=row.user_id,
                company_id=row.company_id,
                reason=row.reason,
                start_date=row.start_date,
                end_date=row.end_date,
            )
        )

    return dict(work_stoppage_by_user_id_mapping)

components.fr.public.queries

command_logs

get_command_logs

get_command_logs(start_at, end_at)
Source code in components/fr/public/queries/command_logs.py
def get_command_logs(start_at: datetime, end_at: datetime) -> list[CommandLogEntity]:  # noqa: D103
    logs = current_session.scalars(
        select(CommandLog).filter(
            CommandLog.created_at >= start_at, CommandLog.created_at < end_at
        )
    )

    return [
        CommandLogEntity(
            id=log.id,
            command=log.command,
            run_at=log.run_at,
            completed_at=log.completed_at,
            success=log.success,
            model_slug="commandlog",
        )
        for log in logs
    ]

policy

get_current_or_future_subscription_info_for_user_on

get_current_or_future_subscription_info_for_user_on(
    user_id, on_date
)
Source code in components/fr/internal/policy/queries.py
def get_current_or_future_subscription_info_for_user_on(
    user_id: int,
    on_date: date,
) -> UserSubscriptionInfo | None:
    user = get_or_raise_missing_resource(User, user_id)

    user_profile = user.insurance_profile
    if user_profile is None:
        return None

    last_or_current_or_future_policy = user_profile.current_policy

    if (
        last_or_current_or_future_policy is None
        or last_or_current_or_future_policy.end_date is not None
        and last_or_current_or_future_policy.is_ended_on(on_date)
    ):  # We don't want to consider ended policies
        return None

    return (
        UserSubscriptionInfo(
            policy_id=last_or_current_or_future_policy.id,
            contract_id=last_or_current_or_future_policy.contract_id,
            coverage_start_date=last_or_current_or_future_policy.start_date,
        )
        if user_profile.current_policy
        else None
    )

get_subscription_id_for_user_on

get_subscription_id_for_user_on(user_id, on_date)

Returns user subscription id linked to the user on a given on_date

user_id: The user ID to get affiliation periods for on_date: The date to check the affiliation periods against

Returns:

Type Description
int | None

policy contract ID

Source code in components/fr/internal/policy/queries.py
def get_subscription_id_for_user_on(
    user_id: int,
    on_date: date,
) -> int | None:
    """
    Returns user subscription id linked to the user on a given on_date

    user_id: The user ID to get affiliation periods for
    on_date: The date to check the affiliation periods against

    Returns:
      policy contract ID
    """
    policy_id = get_policy_id_for_user_on(user_id, on_date)

    policy = get_or_raise_missing_resource(Policy, policy_id)

    return policy.contract_id

get_subscription_ids_for_users_on

get_subscription_ids_for_users_on(user_ids, on_date)

Returns user subscription ids linked to users on a given on_date

Parameters:

Name Type Description Default
user_ids list[int]

List of user IDs to get subscription IDs for

required
on_date date

The date to check the affiliation periods against

required

Returns:

Type Description
dict[int, int]

dict[int, int]: Mapping of user_id to subscription_id (contract_id). Only includes users with active subscriptions.

Source code in components/fr/internal/policy/queries.py
def get_subscription_ids_for_users_on(
    user_ids: list[int],
    on_date: date,
) -> dict[int, int]:
    """
    Returns user subscription ids linked to users on a given on_date

    Args:
        user_ids: List of user IDs to get subscription IDs for
        on_date: The date to check the affiliation periods against

    Returns:
        dict[int, int]: Mapping of user_id to subscription_id (contract_id).
            Only includes users with active subscriptions.
    """
    from sqlalchemy import select

    from components.fr.internal.models.enrollment import Enrollment as FrEnrollment
    from components.fr.internal.models.insurance_profile import InsuranceProfile
    from components.fr.internal.models.policy import Policy
    from shared.helpers.db import current_session

    if not user_ids:
        return {}

    # Query enrollments active on the given date with joins to get user_id and contract_id
    results = current_session.execute(
        select(FrEnrollment, InsuranceProfile.user_id, Policy.contract_id)
        .join(
            InsuranceProfile,
            FrEnrollment.insurance_profile_id == InsuranceProfile.id,
        )
        .join(Policy, FrEnrollment.policy_id == Policy.id)
        .where(
            InsuranceProfile.user_id.in_(user_ids),
            FrEnrollment.is_active_on(on_date),
        )
    ).all()

    return {user_id: contract_id for enrollment, user_id, contract_id in results}

has_active_option_on

has_active_option_on(user_id, on_date)

Returns whether the user has an active or future option on a given on_date

user_id: The user ID to get affiliation periods for on_date: The date to check the affiliation periods against

Returns:

Type Description
bool

boolean

Source code in components/fr/internal/policy/queries.py
def has_active_option_on(user_id: int, on_date: date) -> bool:
    """
    Returns whether the user has an active or future option on a given on_date

    user_id: The user ID to get affiliation periods for
    on_date: The date to check the affiliation periods against

    Returns:
      boolean
    """
    policy_id = get_policy_id_for_user_on(user_id, on_date)

    policy = get_or_raise_missing_resource(Policy, policy_id)

    return bool(policy.option_contract_on(on_date)) or bool(
        policy.upcoming_option_contract
    )

has_active_or_future_children_beneficiary_on

has_active_or_future_children_beneficiary_on(
    user_id, on_date
)

Returns whether the user has an active or future child beneficiary on a given date

user_id: The user ID to get affiliation periods for on_date: The date to check the affiliation periods against

Returns:

Type Description
bool

boolean

Source code in components/fr/internal/policy/queries.py
def has_active_or_future_children_beneficiary_on(user_id: int, on_date: date) -> bool:
    """
    Returns whether the user has an active or future child beneficiary on a given date

    user_id: The user ID to get affiliation periods for
    on_date: The date to check the affiliation periods against

    Returns:
      boolean
    """
    children_enrollments_for_user_on_date = get_user_and_dependent_enrollments_on(
        user_id, on_date, EnrollmentType.child
    )

    return (
        len(children_enrollments_for_user_on_date) > 0
        if children_enrollments_for_user_on_date
        else False
    )

components.fr.public.scim_api

adapter

FrScimAdapter

FrScimAdapter()

Bases: GenericScimAdapter

SCIM adapter for fr_api.

Source code in components/fr/public/scim_api/adapter.py
def __init__(self) -> None:
    super().__init__()
    self.profile_service = ProfileService.create(app_name=AppName.ALAN_FR)
create_app_user
create_app_user(first_name, last_name, email)

Create a user with the given first and last name. and returns the user ID.

Source code in components/fr/public/scim_api/adapter.py
@override
def create_app_user(
    self, first_name: str, last_name: str, email: str
) -> int | uuid.UUID:
    """
    Create a user with the given first and last name. and returns the user ID.
    """
    user = create_profile_with_user(first_name=first_name, last_name=last_name)
    return user.id
get_scim_users_data
get_scim_users_data(alan_employees)

Returns the first and last name of users from a list of AlanEmployee objects.

Source code in components/fr/public/scim_api/adapter.py
@override
def get_scim_users_data(
    self,
    alan_employees: list[AlanEmployee],  # type: ignore[override]
) -> dict[int | uuid.UUID, AlanEmployeeIdentity]:
    """
    Returns the first and last name of users from a list of AlanEmployee objects.
    """
    user_profiles = self.profile_service.get_profiles(
        profile_ids={
            alan_employee.user.profile_id for alan_employee in alan_employees
        }
    )
    user_profiles_dict = {
        user_profile.id: user_profile for user_profile in user_profiles
    }

    return {
        alan_employee.user_id: AlanEmployeeIdentity(
            first_name=user_profiles_dict[alan_employee.user.profile_id].first_name,
            last_name=user_profiles_dict[alan_employee.user.profile_id].last_name,
        )
        for alan_employee in alan_employees
        if alan_employee.user.profile_id in user_profiles_dict
    }
get_user_data
get_user_data(user_id)

Returns user's first and last name by user_id.

Source code in components/fr/public/scim_api/adapter.py
@override
def get_user_data(self, user_id: int | uuid.UUID) -> AlanEmployeeIdentity:
    """
    Returns user's first and last name by user_id.
    """
    if not isinstance(user_id, int):
        raise TypeError("User ID must be a int")

    user = current_session.query(User).where(User.id == user_id).one_or_none()  # noqa: ALN085
    if user is None:
        raise BaseErrorCode.missing_resource(f"User id {user_id} not found")
    user_profile = self.profile_service.get_or_raise_profile(
        profile_id=user.profile_id
    )

    return AlanEmployeeIdentity(
        first_name=user_profile.first_name, last_name=user_profile.last_name
    )
profile_service instance-attribute
profile_service = create(app_name=ALAN_FR)

test

test_adapter

adapter
adapter()

Fixture for the EsGenericScimAdapter instance.

Source code in components/fr/public/scim_api/test/test_adapter.py
@pytest.fixture
def adapter() -> FrScimAdapter:
    """Fixture for the EsGenericScimAdapter instance."""
    return FrScimAdapter()
profile_service
profile_service()

Fixture for the profile service.

Source code in components/fr/public/scim_api/test/test_adapter.py
@pytest.fixture
def profile_service() -> ProfileService:
    """Fixture for the profile service."""
    return ProfileService.create(app_name=AppName.ALAN_FR)
test_create_app_user
test_create_app_user(adapter, profile_service)

Test create_app_user creates a new user correctly.

Source code in components/fr/public/scim_api/test/test_adapter.py
@pytest.mark.usefixtures("db")
def test_create_app_user(adapter, profile_service):
    """Test create_app_user creates a new user correctly."""
    user_id = adapter.create_app_user(
        first_name="John", last_name="Doe", email="john.doe@alan.eu"
    )

    created_user = current_session.query(User).filter(User.id == user_id).one_or_none()  # noqa: ALN085

    assert created_user is not None
    created_profile = profile_service.get_profile(profile_id=created_user.profile_id)
    assert created_profile is not None
    assert created_profile.first_name == "John"
    assert created_profile.last_name == "Doe"
test_get_scim_users_data
test_get_scim_users_data(adapter)

Test get_scim_users_data returns correct mapping of user data.

Source code in components/fr/public/scim_api/test/test_adapter.py
@pytest.mark.usefixtures("db")
def test_get_scim_users_data(adapter):
    """Test get_scim_users_data returns correct mapping of user data."""
    # Create test data
    user1, user2, employee1, employee2 = _provision_test_data()

    result = adapter.get_scim_users_data([employee1, employee2])
    assert len(result) == 2
    assert result[user1.id].first_name == "John"
    assert result[user1.id].last_name == "Doe"
    assert result[user2.id].first_name == "Jane"
    assert result[user2.id].last_name == "Smith"
test_get_user_data
test_get_user_data(adapter)

Test get_user_data returns correct user identity.

Source code in components/fr/public/scim_api/test/test_adapter.py
@pytest.mark.usefixtures("db")
def test_get_user_data(adapter):
    """Test get_user_data returns correct user identity."""
    # Create test data
    user1, _, _, _ = _provision_test_data()

    # Test with int
    result = adapter.get_user_data(user_id=user1.id)
    assert result.first_name == "John"
    assert result.last_name == "Doe"

    # Test with non-int
    with pytest.raises(TypeError):
        adapter.get_user_data(user_id=uuid.uuid4())

    # Test with non-existent user
    with pytest.raises(BaseErrorCode):
        adapter.get_user_data(user_id=999999)

components.fr.public.services

push_notifications

get_push_notification_logs_for_user

get_push_notification_logs_for_user(
    app_name,
    app_user_id,
    notification_names,
    created_at__gte=None,
)

Return a list of all the push notification logs ever created for the given user and notification names.

Source code in components/fr/public/services/push_notifications.py
def get_push_notification_logs_for_user(
    app_name: AppName,
    app_user_id: str,
    notification_names: list[BasePushNotificationName],
    created_at__gte: datetime | None = None,
) -> list[InMemoryPushNotificationLog]:
    """
    Return a list of all the push notification logs ever created for the given user and notification names.
    """
    return [
        InMemoryPushNotificationLog.from_model(notification_log=item)
        for item in current_session.query(PushNotificationLog).filter(  # noqa: ALN085
            PushNotificationLog.app_id == app_name,
            PushNotificationLog.app_user_id == app_user_id,
            PushNotificationLog.name.in_(notification_names),
            PushNotificationLog.created_at >= created_at__gte  # type: ignore[arg-type]
            if created_at__gte
            else True,
        )
    ]

push_notification_sender_async

push_notification_sender_async(sender)
Source code in components/fr/internal/push_notifications/push_notification_sender.py
def push_notification_sender_async(
    sender: Callable[P, PushNotificationParams | None],
) -> Callable[P, None]:
    @wraps(sender)
    def decorated_function(*args, **kwargs) -> None:  # type: ignore[no-untyped-def]
        pn_params: PushNotificationParams | None = sender(*args, **kwargs)

        if pn_params is None:
            return

        push_notification_logic.send_push_notification_async(
            notification_params=pn_params,
            commit=pn_params.commit,
        )

    return decorated_function

push_notification_sender_sync

push_notification_sender_sync(sender)
Source code in components/fr/internal/push_notifications/push_notification_sender.py
def push_notification_sender_sync(
    sender: Callable[P, PushNotificationParams | None],
) -> Callable[P, None]:
    @wraps(sender)
    def decorated_function(*args, **kwargs) -> None:  # type: ignore[no-untyped-def]
        pn_params: PushNotificationParams | None = sender(*args, **kwargs)

        if pn_params is None:
            return

        push_notification_logic.send_push_notification_sync(
            notification_params=pn_params,
            delete_token=delete_token,
            commit=pn_params.commit,
        )

    return decorated_function

components.fr.public.test_data_generator

get_test_data_generation_config

get_test_data_generation_config()
Source code in components/fr/internal/admin_tools/fixtures/test_data_generation_config.py
def get_test_data_generation_config() -> TestDataGeneratorConfig:
    return TestDataGeneratorConfig(
        patched_factories=patched_factories,
        handlers=get_fixture_handlers(),
        async_queue=MAIN_QUEUE,
        result_view_builder=result_view_builder,
    )

components.fr.public.user

create_profile_with_user

create_or_assign_profile_with_authenticatable_user

create_or_assign_profile_with_authenticatable_user(
    profile_service,
    authentication_service,
    email,
    prehashed_password,
    mfa_required=None,
    first_name=None,
    last_name=None,
    birth_date=None,
    language=None,
    phone_number=None,
    gender=None,
    empty_user=None,
)

Helper to create an authenticatable user (with credentials) and helps manage cross-country identities In this method we create a profile without identity elements, then get (or create) the corresponding user and try to set the credentials of this user. Set credentials handles the cases of conflict with existing identities. Finally, we set the profile email address if all steps before didn't raise

Source code in components/fr/public/user/create_profile_with_user.py
@inject_profile_service
@inject_authentication_service
def create_or_assign_profile_with_authenticatable_user(
    profile_service: ProfileService,
    authentication_service: AuthenticationService,
    email: str,
    prehashed_password: str,
    mfa_required: bool | None = None,
    first_name: str | None = None,
    last_name: str | None = None,
    birth_date: date | None = None,
    language: Lang | None = None,
    phone_number: str | None = None,
    gender: UserGender | None = None,
    empty_user: User | None = None,
) -> User:
    """
    Helper to create an authenticatable user (with credentials) and helps manage cross-country identities
    In this method we create a profile without identity elements, then get (or create) the corresponding user and try
    to set the credentials of this user. Set credentials handles the cases of conflict with existing identities.
    Finally, we set the profile email address if all steps before didn't raise
    """
    selected_language = language or Lang.french
    profile_id = None
    keycloak_id: uuid.UUID | None = None

    existing_user_in_same_country_with_email: User | None = (
        current_session.query(User).filter_by(email=email).one_or_none()  # noqa: ALN085
    )
    if existing_user_in_same_country_with_email is not None:
        current_logger.warning(
            f"can't set credentials: user with email {email} already exists"
        )
        raise BaseErrorCode.user_with_email_already_exists(
            message="User with this email already exists"
        )

    # Check if a profile with the same email already exists (aka a user from another country)
    existing_profile_with_email = profile_service.get_profile_by_email(email)

    if existing_profile_with_email:
        if existing_identity_with_email := authentication_service.get_identity_by_email(
            mandatory(existing_profile_with_email.email)
        ):
            if authentication_service.check_identity_password(
                identity_id=existing_identity_with_email.id,
                prehashed_password=prehashed_password,
            ):
                profile_id = existing_profile_with_email.id
                keycloak_id = existing_identity_with_email.id
                if empty_user is not None:
                    profile_service.merge_profile_into_another(
                        source_profile_id=empty_user.profile_id,
                        target_profile_id=profile_id,
                    )
                    empty_user.profile_id = profile_id
                    current_session.flush()
                _set_profile_data(
                    profile_service=profile_service,
                    profile_id=existing_profile_with_email.id,
                    first_name=first_name,
                    last_name=last_name,
                    birth_date=birth_date,
                    gender=gender,
                    selected_language=selected_language,
                    email=email,
                )

            else:
                raise BaseErrorCode.login_error()
        else:
            # this shouldn't happen but as email between keycloak and our backend is not always consistent it may
            raise ValueError(
                f"Profile with email {email} already exist bu no identity has this email address"
            )

    if empty_user is not None:
        profile_id = empty_user.profile_id
        _set_profile_data(
            profile_service=profile_service,
            profile_id=profile_id,
            first_name=first_name,
            last_name=last_name,
            birth_date=birth_date,
            gender=gender,
            selected_language=selected_language,
            email=email,
        )

    if not profile_id:
        profile_id = empty_user.profile_id if empty_user is not None else uuid.uuid4()
        # if there is an email this will also create the identity
        profile_id = profile_service.create_profile(
            profile_id=profile_id,
            email=email,
            first_name=first_name,
            last_name=last_name,
            birth_date=birth_date,
            preferred_language=LanguageMapper.to_entity(selected_language),
        )
        profile_service.change_phone_number(profile_id, phone_number=phone_number)

    user: User | None = (
        current_session.query(User).filter_by(profile_id=profile_id).one_or_none()  # noqa: ALN085
    )
    if not user:
        user = User(
            first_name=first_name,
            last_name=last_name,
            profile_id=profile_id,
            birth_date=birth_date,
            lang=selected_language,
            phone=phone_number,
            gender=gender,
        )

        current_session.add(user)
        current_session.flush()

    if keycloak_id is not None:
        # we found a matching identity,
        # TODO: @thibaut.caillierez: remove this branch once profile is directly linked to the identity without the user
        user.keycloak_id = keycloak_id
        user.email = email
    else:
        identity = mandatory(
            authentication_service.get_keycloak_identity_by_profile_id(profile_id)
        )
        authentication_service.set_identity_credentials(
            identity_id=identity.id,
            prehashed_password=prehashed_password,
            email=email,
            is_email_verified=True,
            mfa_required=mfa_required,
        )
    return user

create_profile_with_user

create_profile_with_user(
    *,
    profile_service,
    email=None,
    first_name=None,
    last_name=None,
    birth_date=None,
    language=None,
    phone_number=None,
    gender=None
)

Create a new user (and profile) with the given parameters.

Source code in components/fr/public/user/create_profile_with_user.py
@inject_profile_service
def create_profile_with_user(
    *,
    profile_service: ProfileService,
    email: str | None = None,
    first_name: str | None = None,
    last_name: str | None = None,
    birth_date: date | None = None,
    language: Lang | None = None,
    phone_number: str | None = None,
    gender: UserGender | None = None,
) -> User:
    """
    Create a new user (and profile) with the given parameters.
    """
    selected_language = language or Lang.french

    if email is not None:
        existing_profile_with_email = profile_service.get_profile_by_email(email)

        if existing_profile_with_email:
            raise ValueError(f"Profile with email {email} already exists")

    profile_id = profile_service.create_profile(
        email=email,
        first_name=first_name,
        last_name=last_name,
        birth_date=birth_date,
        preferred_language=LanguageMapper.to_entity(selected_language),
        gender=GenderMapper.to_entity(gender),
    )
    profile_service.change_phone_number(profile_id, phone_number=phone_number)

    # Profile is configured to work with France user table for now
    # Therefore, creating a profile will create a user under the hood
    # TODO @freddy.guitton won't be necessary when Profile is configured to use its own table
    user: User | None = (
        current_session.query(User).filter_by(profile_id=profile_id).one_or_none()  # noqa: ALN085
    )
    if not user:
        user = User(
            email=email,
            first_name=first_name,
            last_name=last_name,
            profile_id=profile_id,
            birth_date=birth_date,
            lang=selected_language,
            phone=phone_number,
            gender=gender,
        )

        current_session.add(user)
        current_session.flush()

    return user

insurance_profile

set_ssn_ntt_on_user

set_ssn_ntt_on_user(user_id, ssn=None, ntt=None)

Set the SSN or NTT for an existing user.

Do NOT commit the session.

Source code in components/fr/public/user/insurance_profile.py
def set_ssn_ntt_on_user(
    user_id: int,
    ssn: str | None = None,
    ntt: str | None = None,
) -> None:
    """
    Set the SSN or NTT for an existing user.

    Do NOT commit the session.
    """
    if not ssn and not ntt:
        raise ValueError("Please provide at least one of SSN or NTT.")

    insurance_profile = (
        current_session.query(InsuranceProfile)  # noqa: ALN085
        .filter(
            InsuranceProfile.user_id == user_id,
        )
        .one_or_none()
    )

    if not insurance_profile:
        insurance_profile = InsuranceProfile(user_id=user_id)
        current_session.add(insurance_profile)

    if ssn and insurance_profile.ssn != ssn:
        current_logger.info(f"Updating SSN for user {user_id}.")
        insurance_profile.ssn = ssn

    if ntt and insurance_profile.ntt != ntt:
        current_logger.info(f"Updating NTT for user {user_id}.")
        insurance_profile.ntt = ntt

    current_session.flush()

user

get_user

get_user(user_id)

Get user data

Parameters:

Name Type Description Default
user_id int

user id

required

Returns:

Type Description
User

French user data

Source code in components/fr/public/user/user.py
def get_user(user_id: int) -> User:
    """Get user data

    Args:
        user_id: user id

    Returns:
        French user data
    """
    return get_or_raise_missing_resource(User, user_id)

get_user_for_clinic

get_user_for_clinic(user_id)

Get user data for clinic with linked data

Parameters:

Name Type Description Default
user_id int

user id

required

Returns:

Type Description
User

French user data

Source code in components/fr/public/user/user.py
def get_user_for_clinic(user_id: int) -> User:
    """Get user data for clinic with linked data

    Args:
        user_id: user id

    Returns:
        French user data
    """
    return get_or_raise_missing_resource(
        User,
        user_id,
        options=[
            selectinload(User.address),
            selectinload(User.companies),
            selectinload(User.insurance_profile),
        ],
    )

get_user_from_email

get_user_from_email(email_address)
Source code in components/fr/public/user/user.py
def get_user_from_email(email_address: str) -> User | None:  # noqa: D103
    from components.fr.internal.business_logic.user.queries.user import (
        get_user_from_email as internal_get_user_from_email,
    )

    return internal_get_user_from_email(email_address=email_address)

get_user_pro_email_with_perso_fallback

get_user_pro_email_with_perso_fallback(user_id)

Get the pro email of the user, falling back to personal email if pro email is not set.

Parameters:

Name Type Description Default
user_id int

The user ID

required

Returns:

Type Description
str | None

The pro email if available, otherwise the personal email.

Source code in components/fr/public/user/user.py
def get_user_pro_email_with_perso_fallback(user_id: int) -> str | None:
    """Get the pro email of the user, falling back to personal email if pro email is not set.

    Args:
        user_id: The user ID

    Returns:
        The pro email if available, otherwise the personal email.
    """
    user = get_user(user_id)
    return user.pro_email_with_perso_fallback

is_user

is_user(user_id)
Source code in components/fr/public/user/user.py
def is_user(user_id: int) -> bool:  # noqa: D103
    return get_resource_or_none(User, user_id) is not None

components.fr.public.validators

ape_validator

ApeValidationError

Bases: Exception

wrong_format staticmethod
wrong_format(code)
Source code in components/fr/public/validators/ape_validator.py
2
3
4
5
6
@staticmethod
def wrong_format(code):  # type: ignore[no-untyped-def]  # noqa: D102
    return ApeValidationError(
        f"The ape code '{code}' must be composed of 4 digits + 1 letter"
    )

ApeValidator

  • Allows to check if a APE code (or NAF) is valid
validate_ape_code staticmethod
validate_ape_code(raw_code)
Source code in components/fr/public/validators/ape_validator.py
@staticmethod
def validate_ape_code(raw_code: str | None) -> str | None:  # noqa: D102
    if raw_code is None:
        return None

    raw_code = raw_code.replace(".", "").strip()

    if (
        len(raw_code) != 5
        or not raw_code[0:4].isnumeric()
        or not raw_code[-1].isalpha()
    ):
        raise ApeValidationError.wrong_format(raw_code)  # type: ignore[no-untyped-call]

    return raw_code