Skip to content

Api reference

components.be.public.account

account_has_live_health_contract_since

account_has_live_health_contract_since(account_id, since)

Checks that all companies of an account have had a live health contract in the time between the "since" date and now

Source code in components/be/public/account.py
def account_has_live_health_contract_since(account_id: UUID, since: date) -> bool:
    """Checks that all companies of an account have had a live health contract in the time between the "since" date and now"""
    from components.be.internal.models.be_account import BeAccount
    from components.be.internal.models.be_company import BeCompany
    from components.be.internal.models.health_contract import BeHealthContract

    live_health_contracts = (
        current_session.query(BeAccount)  # noqa: ALN085
        .join(BeAccount.companies)
        .options(joinedload(BeCompany.health_contracts))
        .filter(
            BeAccount.id == account_id,
            BeHealthContract.is_ever_active_between(since, utctoday()),
        )
        .count()
    )

    return live_health_contracts > 0

create_account

create_account(name)

Creates a new account with the given name.

Source code in components/be/public/account.py
def create_account(name: str) -> UUID:
    """
    Creates a new account with the given name.
    """
    account = BeAccount(
        name=name,
    )
    current_session.add(account)
    current_session.flush()
    return account.id

get_account_name

get_account_name(account_id)
Source code in components/be/public/account.py
def get_account_name(account_id: UUID) -> str:  # noqa: D103
    from components.be.internal.models.be_account import BeAccount
    from shared.helpers.get_or_else import get_or_raise_missing_resource

    account = get_or_raise_missing_resource(BeAccount, account_id)
    return account.name

user_is_account_admin

user_is_account_admin(user_id, account_id)
Source code in components/be/public/account.py
def user_is_account_admin(user_id: UUID, account_id: UUID) -> bool:  # noqa: D103
    from components.be.internal.account.queries.account import is_admin

    if not user_id or not account_id:
        return False

    return is_admin(user_id=user_id, account_id=account_id)

components.be.public.affiliation

get_next_affiliation_date

get_next_affiliation_date(user_id, on_date)

Returns the next affiliation date for a user on a given date.

Source code in components/be/public/affiliation.py
def get_next_affiliation_date(user_id: uuid.UUID, on_date: date) -> date | None:
    """
    Returns the next affiliation date for a user on a given date.
    """
    user = get_or_raise_missing_resource(BeUser, user_id)
    if not user.insurance_profile:
        return None

    next_enrollment = get_next_enrollment_on(
        insurance_profile_id=user.insurance_profile.id, the_date=on_date
    )

    return next_enrollment.start_date if next_enrollment else None

components.be.public.auth

authorization

AuthorizationStrategies

alaner_admin class-attribute instance-attribute
alaner_admin = BeAlanerAdminStrategy
authenticated class-attribute instance-attribute
authenticated = BeAuthenticatedStrategy
open class-attribute instance-attribute
open = BeOpenStrategy
owner_only class-attribute instance-attribute
owner_only = BeOwnerOnlyStrategy

BeAlanerAdminStrategy

BeAlanerAdminStrategy(permitted_for=None)

Bases: AlanerAdminStrategy

Source code in components/be/public/auth/authorization.py
def __init__(self, permitted_for: set[EmployeePermission] | None = None) -> None:
    super().__init__(permitted_for=permitted_for)

BeAuthenticatedStrategy

BeAuthenticatedStrategy(allow_deep_link=False)

Bases: AuthenticatedStrategy

Source code in components/be/public/auth/authorization.py
def __init__(self, allow_deep_link: bool = False) -> None:
    super().__init__(allow_deep_link=allow_deep_link)

BeOpenStrategy

BeOpenStrategy(allow_deep_link=False)

Bases: OpenStrategy

Source code in components/be/public/auth/authorization.py
def __init__(self, allow_deep_link: bool = False) -> None:
    super().__init__(allow_deep_link=allow_deep_link)

BeOwnerOnlyStrategy

BeOwnerOnlyStrategy(
    owner_bypass_permitted_for=None, allow_deep_link=False
)

Bases: OwnerOnlyStrategy

Source code in components/be/public/auth/authorization.py
def __init__(
    self,
    owner_bypass_permitted_for: set[EmployeePermission] | None = None,
    allow_deep_link: bool = False,
) -> None:
    super().__init__(
        owner_bypass_permitted_for=owner_bypass_permitted_for,
        allow_deep_link=allow_deep_link,
    )

business_logic

mfa_push_notification

send_mfa_operation_pending_push_notification
send_mfa_operation_pending_push_notification(
    user_id, description, pending_operation_id, type
)

Sends a request to validate an MFA-protected operation

Source code in components/be/public/auth/business_logic/mfa_push_notification.py
@push_notification_sender_sync  # type: ignore[arg-type]
def send_mfa_operation_pending_push_notification(
    user_id: UserId,
    description: str,
    pending_operation_id: UUID,
    type: str | None,  # noqa: A002
) -> UserPushNotificationParams:
    """
    Sends a request to validate an MFA-protected operation
    """
    user = get_resource_or_none(BeUser, user_id)
    lang = user.lang if user is not None else Lang.english
    title = translate(
        language=lang, key_string="push_notification.mfa_operation_pending.title"
    )
    description = translate(
        language=lang, key_string="push_notification.mfa_operation_pending.description"
    )

    current_logger.debug(
        f"Sending MFA push notification: {pending_operation_id}",
        pending_operation_id=pending_operation_id,
        user_id=user_id,
    )
    return UserPushNotificationParams(
        user_id=cast("UUID", user_id),  # safe to cast as we know it's a UUID
        tokens=get_push_notification_tokens_for_user(
            user_app_id=str(user_id), app_name=AppName.ALAN_BE
        ),
        name=PushNotificationName.mfa_operation_pending_push_notification,
        title=title,
        body=description,
        extra_data={
            "operation_id": str(pending_operation_id),
            "description": description,
            "type": type,  # type: ignore[dict-item]
            # Repeated so that it's also available for the screen displayed when the notification is opened
        },
    )

components.be.public.blueprints

admin_api_blueprint

admin_api_blueprint module-attribute

admin_api_blueprint = CustomBlueprint("admin_api", __name__)

admin_tools

init_blueprint

init_blueprint(state)
Source code in components/be/public/blueprints/admin_tools.py
@admin_tools_blueprint.record_once
def init_blueprint(state: BlueprintSetupState) -> None:  # noqa: D103
    from components.be.internal.billing.business_logic.invoice import (
        BEInvoiceLogic,
    )
    from components.be.internal.company_onboarding.actions.add_duty_of_advice import (
        get_admin_tool_duty_of_advice_preview,
    )
    from components.be.internal.company_onboarding.actions.add_needs_analysis import (
        get_admin_tool_needs_analysis_preview,
    )
    from components.be.internal.core_insurance.coverage.user_attestation import (
        UserAttestationLogic,
    )
    from components.be.internal.health_contract.documents.actions.generate_legacy_contract_cancellation_letter import (
        get_admin_tool_legacy_contract_cancellation_letter_preview,
    )
    from components.be.internal.health_contract.documents.actions.generate_sepa_mandate import (
        get_admin_tool_sepa_mandate_preview,
    )

    with state.app.app_context():

        def cb(admin_tool: AdminToolsBlueprint) -> None:
            admin_tool.register_document(
                UserAttestationLogic.get_admin_tool_documents()
            )
            admin_tool.register_document(BEInvoiceLogic.get_admin_tool_documents())
            admin_tool.register_document(get_admin_tool_sepa_mandate_preview())
            admin_tool.register_document(
                get_admin_tool_legacy_contract_cancellation_letter_preview()
            )
            admin_tool.register_document(get_admin_tool_needs_analysis_preview())
            admin_tool.register_document(get_admin_tool_duty_of_advice_preview())

        admin_tools_blueprint.register_document_registration_callback(cb)

load_admin_tools_blueprint

load_admin_tools_blueprint()
Source code in components/be/public/blueprints/admin_tools.py
def load_admin_tools_blueprint() -> AdminToolsBlueprint:  # noqa: D103
    from components.be.internal.admin_tools.views.document_previews import (  # noqa: F401
        document_previews,
    )
    from components.be.internal.admin_tools.views.hospitalization_invoices import (  # noqa: F401
        show_parsed_hospitalization_invoice,
    )
    from components.be.internal.admin_tools.views.index import (  # noqa: F401
        index,
        list_api_endpoints,
        list_commands,
        list_email,
        show_command,
    )
    from components.be.internal.admin_tools.views.sepa_mandate import (  # noqa: F401
        sepa_mandate,
    )
    from components.be.internal.admin_tools.views.upload_file import (  # noqa: F401
        upload_file,
    )
    from components.be.internal.company.views import (  # noqa: F401
        create_account,
        create_company,
        create_iban,
        edit_account,
        edit_company,
        list_accounts,
        list_companies,
    )
    from components.be.internal.health_plan.views.list_plans import (  # noqa: F401
        list_health_plans,
    )
    from components.be.internal.health_plan.views.show_plan import (  # noqa: F401
        show_health_plan,
    )

    _load_mailers()

    return admin_tools_blueprint

assusoft_blueprint

assusoft_blueprint module-attribute

assusoft_blueprint = CustomBlueprint('assusoft', __name__)

register_blueprint

register_blueprint(state)
Source code in components/be/public/blueprints/assusoft_blueprint.py
@assusoft_blueprint.record_once
def register_blueprint(state) -> None:  # type: ignore[no-untyped-def]  # noqa: ARG001, D103
    load_models()

be_api_blueprint

be_api_blueprint module-attribute

be_api_blueprint = CustomBlueprint(
    "be_api_blueprint", __name__
)

be_core_blueprint

be_core_blueprint module-attribute

be_core_blueprint = CustomBlueprint(
    "be-core",
    __name__,
    cli_group=None,
    template_folder="templates",
    static_folder="static",
    static_url_path="/static",
)

register_blueprint

register_blueprint(state)
Source code in components/be/public/blueprints/be_core_blueprint.py
@be_core_blueprint.record_once
def register_blueprint(state) -> None:  # type: ignore[no-untyped-def]  # noqa: ARG001, D103
    # Loading models
    import components.be.internal.models  # noqa: F401

    # Claim management
    from components.be.internal.claim_management.services.stripe.client import (
        claims_stripe,  # noqa: F401
    )
    from components.be.internal.claim_management.services.stripe.webhook import (
        stripe_claims_process_webhook,  # noqa: F401
    )
    from components.be.internal.document_parsing.services.oxygen.webhook import (
        oxygen_claims_document_auto_parsing_webhook,  # noqa: F401
    )

    # Flexben partners callbacks
    from components.be.internal.flexben.checkout.vendors.payflip.webhook import (
        payflip_checkout_process_webhook,  # noqa: F401
    )
    from components.be.internal.flexben.checkout.vendors.sdworx.webhook import (
        process_sdworx_webhook,  # noqa: F401
    )

    # Billing
    from components.be.internal.services import billing_stripe  # noqa: F401

    # Intercom
    from components.be.internal.services.intercom.webhook_controllers.intercom import (  # noqa: F401
        handle_conversation_notification,
        intercom_new_conversation_webhook,
    )

register_commands

register_commands()
Source code in components/be/public/blueprints/be_core_blueprint.py
def register_commands() -> None:  # noqa: D103
    import components.be.internal.affiliation.commands.employee  # noqa: F401, RUF100
    import components.be.internal.affiliation.commands.waiting_period  # noqa: F401, RUF100
    import components.be.internal.billing.commands.invoices  # noqa: F401, RUF100
    import components.be.internal.billing.commands.premium_entry  # noqa: F401, RUF100
    import components.be.internal.billing.commands.stripe  # noqa: F401, RUF100
    import components.be.internal.claim_management.commands.travel_assistance_csv  # noqa: F401, RUF100
    import components.be.internal.company.commands.company  # noqa: F401, RUF100
    import components.be.internal.company_onboarding.commands.company_onboarding  # noqa: F401, RUF100
    import components.be.internal.customer_dashboard.customer_admins.commands.promote_to_account_admins  # noqa: F401, RUF100
    import components.be.internal.customer_dashboard.insights.commands.insights_dashboard  # noqa: F401, RUF100
    import components.be.internal.flexben.commands.flexben  # noqa: F401, RUF100
    import components.be.internal.flexben.commands.flexben_partners  # noqa: F401, RUF100
    import components.be.internal.health_contract.commands.health_contract  # noqa: F401, RUF100
    import components.be.internal.health_plan.commands.coverage_module  # noqa: F401, RUF100
    import components.be.internal.health_plan.commands.health_coverage  # noqa: F401, RUF100
    import components.be.internal.health_plan.commands.health_plan  # noqa: F401, RUF100
    import components.be.internal.member_lifecycle.dependents.commands.dependent  # noqa: F401, RUF100
    import components.be.internal.member_lifecycle.user_profile.commands.life_cycle  # noqa: F401, RUF100
    import components.be.internal.payroll.commands.pay_csv  # noqa: F401, RUF100
    import components.be.internal.security.commands.gdpr  # noqa: F401, RUF100
    import components.be.internal.services.push_notifications.commands.push_notifications  # noqa: F401, RUF100
    import components.be.internal.services.queues.commands.queues  # noqa: F401, RUF100
    import components.be.internal.services.tracking.commands.segment  # noqa: F401, RUF100

billing_blueprint

billing_blueprint module-attribute

billing_blueprint = CustomBlueprint('billing', __name__)

claim_management_blueprint

claim_management_blueprint module-attribute

claim_management_blueprint = CustomBlueprint(
    "claim-management", __name__
)

register_blueprint

register_blueprint(state)
Source code in components/be/public/blueprints/claim_management_blueprint.py
@claim_management_blueprint.record_once
def register_blueprint(state) -> None:  # type: ignore[no-untyped-def]  # noqa: ARG001, D103
    # Register controllers
    import components.be.internal.claim_management.controllers.revolut_callback  # noqa: F401

data_consistency_blueprint

data_consistency_blueprint module-attribute

data_consistency_blueprint = CustomBlueprint(
    "data_consistency", __name__
)

subscription_api_blueprint

subscription_api_blueprint module-attribute

subscription_api_blueprint = Blueprint(
    "api/contracting/subscription", __name__
)

subscription_api_blueprint_record_once

subscription_api_blueprint_record_once(_)
Source code in components/be/public/blueprints/subscription_api_blueprint.py
7
8
9
@subscription_api_blueprint.record_once
def subscription_api_blueprint_record_once(_: BlueprintSetupState) -> None:  # noqa: D103
    pass

components.be.public.clinic

adapter

BE_COMPANIES_WITH_FIVE_THERAPY_CREDITS module-attribute

BE_COMPANIES_WITH_FIVE_THERAPY_CREDITS = [
    UUID("ac9bb33f-9678-4cde-8962-599ec5f23aca")
]

BE_COMPANIES_WITH_SIX_THERAPY_CREDITS module-attribute

BE_COMPANIES_WITH_SIX_THERAPY_CREDITS = [
    UUID("92c93177-dd7e-45b0-95ba-705190423cad"),
    UUID("f69568ac-21bb-4b45-9174-6594c8490ff2"),
]

BeClinicAdapter

Bases: ClinicAdapter

clinic_consent_ai_publish_date = datetime(2024, 12, 11)
get_allowlist_of_dermatology_medical_admins_ids
get_allowlist_of_dermatology_medical_admins_ids()
Source code in components/be/public/clinic/adapter.py
def get_allowlist_of_dermatology_medical_admins_ids(self) -> list[str]:  # noqa: D102
    # No dermatology session in Be
    return []
get_app_base_user_data
get_app_base_user_data(app_user_id)
Source code in components/be/public/clinic/adapter.py
def get_app_base_user_data(self, app_user_id: str) -> BaseUserData:  # noqa: D102
    from components.be.public.user import get_user as get_be_user

    be_user = get_be_user(UUID(app_user_id))
    return BaseUserData(
        first_name=be_user.normalized_first_name,
        last_name=be_user.normalized_last_name,
    )
get_app_user_available_health_services
get_app_user_available_health_services(app_user_id)
Source code in components/be/public/clinic/adapter.py
def get_app_user_available_health_services(  # noqa: D102
    self, app_user_id: str
) -> list[AvailableHealthService]:
    from components.be.public.user import get_user_capabilities

    available_health_services = [
        AvailableHealthService(name=AvailableHealthServiceName.DATO_CONTENT),
        AvailableHealthService(name=AvailableHealthServiceName.HEALTH_PROGRAM),
    ]
    capabilities = get_user_capabilities(user_id=UUID(app_user_id))

    if capabilities.video_consultation:
        available_health_services.append(
            AvailableHealthService(
                name=AvailableHealthServiceName.VIDEO_CONSULTATION,
            ),
        )

    if capabilities.video_therapy:
        available_health_services.extend(
            [
                AvailableHealthService(
                    name=AvailableHealthServiceName.THERAPY_SESSION,
                ),
                AvailableHealthService(
                    name=AvailableHealthServiceName.ORIENTATION_CALL
                ),
            ]
        )

    return available_health_services
get_app_user_data
get_app_user_data(
    app_user_id, compute_key_account_info=False
)
Source code in components/be/public/clinic/adapter.py
def get_app_user_data(  # noqa: D102
    self,
    app_user_id: str,
    compute_key_account_info: bool = False,  # noqa: ARG002
) -> UserData:
    from components.be.public.user import (
        get_last_itsme_link_for_clinic,
        get_user_for_clinic as get_be_user,
        get_visible_dependents,
    )

    be_user = get_be_user(UUID(app_user_id))
    be_profile = be_user.be_profile
    user_itsme_link = get_last_itsme_link_for_clinic(UUID(app_user_id))

    ssn_string = None
    if be_profile and be_profile.formatted_eid and be_profile.formatted_rrn:
        ssn_string = (
            f"eID: {be_profile.formatted_eid} / NRN: {be_profile.formatted_rrn}"
        )
    elif user_itsme_link:
        ssn_string_parts = []
        if user_itsme_link.eid_card_number:
            ssn_string_parts.append(f"eID: {user_itsme_link.eid_card_number}")
        if user_itsme_link.national_registration_number:
            ssn_string_parts.append(
                f"NRN: {user_itsme_link.national_registration_number}"
            )

        if ssn_string_parts:
            ssn_string = " / ".join(ssn_string_parts)

        if len(ssn_string_parts) < 2:
            current_logger.warning(
                "Belgium clinic user has an ITSME user with missing data",
                user_id=app_user_id,
                user_itsme_link=user_itsme_link.id,
            )

    dependent_users = [
        enrollment.insurance_profile.user
        for enrollment in get_visible_dependents(be_user.id)
    ]

    dependents = [
        Dependent(
            app_user_id=str(user.id),
            first_name=user.normalized_first_name,
            last_name=user.normalized_last_name,
            age=int(user.age) if user.age else None,
            gender=user.gender,
            birth_date=user.birth_date,
            dependent_type=None,  # BE doesn't use dependent type
        )
        for user in dependent_users
        if user != be_user
    ]

    address = be_user.address

    return UserData(
        first_name=be_user.normalized_first_name,
        last_name=be_user.normalized_last_name,
        gender=be_user.gender,
        email=be_user.email,
        profile_id=be_user.profile_id,
        birth_date=be_user.birth_date,
        phone=be_user.phone,
        country=address.country if address else None,
        address=address.as_one_line if address else None,
        ssn=ssn_string,
        lang=be_user.lang,
        is_alaner=be_user.is_alaner,
        dependents=dependents,
    )
get_booking_session_package
get_booking_session_package(app_user_id, session_type)
Source code in components/be/public/clinic/adapter.py
def get_booking_session_package(  # noqa: D102
    self, app_user_id: str, session_type: TherapistBookingSessionType
) -> BookingSessionPackage | None:
    from components.be.public.company import (
        get_company_id_of_user,
    )

    if session_type == TherapistBookingSessionType.therapy:
        company_id = get_company_id_of_user(UUID(app_user_id))

        if not company_id:
            return BookingSessionPackage(
                price_in_cents=THERAPY_SESSION_TNS_PRICE_IN_CENTS,
                included=None,
            )

        return BookingSessionPackage(
            price_in_cents=THERAPY_SESSION_COMPANY_PRICE_IN_CENTS,
            included=BookingSessionPackageCount(
                count_limit=6
                if company_id in BE_COMPANIES_WITH_SIX_THERAPY_CREDITS
                else 5
                if company_id in BE_COMPANIES_WITH_FIVE_THERAPY_CREDITS
                else 2,
            ),
        )
    return None
get_coverage_status
get_coverage_status(app_user_id)
Source code in components/be/public/clinic/adapter.py
def get_coverage_status(self, app_user_id: str) -> CoverageStatus | None:  # noqa: D102
    from components.be.public.clinic.clinic_eligibility import get_coverage_status

    return get_coverage_status(user_id=UUID(app_user_id))
get_last_active_id_verification_request_for_user
get_last_active_id_verification_request_for_user(
    app_user_id,
)

BE implementation of getting the last active ID verification request for a user. Since Belgium doesn't support ID verification for the clinic, this will raise a NotImplementedError if it's called.

Source code in components/be/public/clinic/adapter.py
def get_last_active_id_verification_request_for_user(
    self,
    app_user_id: str,
) -> None:
    """
    BE implementation of getting the last active ID verification request for a user.
    Since Belgium doesn't support ID verification for the clinic, this will raise a NotImplementedError if it's called.
    """
    raise NotImplementedError(
        "BE doesn't support ID verification for clinic users."
    )
has_access_to_orientation_call
has_access_to_orientation_call(app_user_id)
Source code in components/be/public/clinic/adapter.py
def has_access_to_orientation_call(  # noqa: D102
    self, app_user_id: str
) -> bool:
    from components.be.public.user import get_user_capabilities

    capabilities = get_user_capabilities(user_id=UUID(app_user_id))
    return capabilities.video_therapy
has_app_user_permission
has_app_user_permission(app_user_id, permission)
Source code in components/be/public/clinic/adapter.py
def has_app_user_permission(  # noqa: D102
    self, app_user_id: str, permission: EmployeePermission
) -> bool:
    from components.be.public.user import get_user as get_be_user

    be_user = get_be_user(UUID(app_user_id))
    return has_permission(be_user, permission)
is_app_user_admin_of_company
is_app_user_admin_of_company(app_user_id, app_company_id)
Source code in components/be/public/clinic/adapter.py
def is_app_user_admin_of_company(  # noqa: D102
    self, app_user_id: str, app_company_id: str
) -> bool:
    from components.be.public.company import user_is_company_admin

    return user_is_company_admin(
        user_id=UUID(app_user_id), company_id=UUID(app_company_id)
    )
release_date_of_conversations_created_for_therapy_sessions class-attribute instance-attribute
release_date_of_conversations_created_for_therapy_sessions = datetime(
    2025, 2, 17
)
request_id_verification_request_for_user
request_id_verification_request_for_user(
    app_user_id, user_info, commit=True
)

BE implementation of getting or requesting ID verification request for a clinic user. Since BE doesn't support ID verification for clinic users, this will raise a NotImplementedError if it's called.

Source code in components/be/public/clinic/adapter.py
def request_id_verification_request_for_user(
    self,
    app_user_id: str,
    user_info: ClinicUserDataForIdVerification,
    commit: bool = True,
) -> None:
    """
    BE implementation of getting or requesting ID verification request for a clinic user.
    Since BE doesn't support ID verification for clinic users, this will raise a NotImplementedError if it's called.
    """
    raise NotImplementedError(
        "BE doesn't support ID verification for clinic users."
    )
should_request_id_verification_for_user
should_request_id_verification_for_user(app_user_id)

BE implementation of checking if ID verification should be requested for a clinic user. Since BE doesn't support ID verification for clinic users, this will always return False.

Source code in components/be/public/clinic/adapter.py
def should_request_id_verification_for_user(
    self,
    app_user_id: str,  # noqa: ARG002
) -> bool:
    """
    BE implementation of checking if ID verification should be requested for a clinic user.
    Since BE doesn't support ID verification for clinic users, this will always return False.
    """
    return False
update_app_user_phone
update_app_user_phone(app_user_id, phone)
Source code in components/be/public/clinic/adapter.py
def update_app_user_phone(self, app_user_id: str, phone: str | None) -> None:  # noqa: D102
    from components.be.public.user import update_user_phone as update_be_user_phone

    update_be_user_phone(user_id=UUID(app_user_id), phone=phone)
update_app_user_ssn
update_app_user_ssn(app_user_id, ssn, commit=False)
Source code in components/be/public/clinic/adapter.py
def update_app_user_ssn(  # noqa: D102
    self, app_user_id: str, ssn: str | None, commit: bool = False
) -> None:
    from components.be.public.clinic.user import update_user_nrn_and_eid

    if ssn:
        update_user_nrn_and_eid(
            user_id=UUID(app_user_id), rrn_and_eid=ssn, commit=commit
        )
upload_invoice_as_insurance_document
upload_invoice_as_insurance_document(
    file, app_user_id, upload_invoice_data
)
Source code in components/be/public/clinic/adapter.py
def upload_invoice_as_insurance_document(  # noqa: D102
    self,
    file: IO,  # type: ignore[type-arg]  # noqa: ARG002
    app_user_id: str,  # noqa: ARG002
    upload_invoice_data: UploadInvoiceData,  # noqa: ARG002
) -> bool:
    # BE doesn't support insurance document uploads
    return False
user_has_24_hour_response_guarantee
user_has_24_hour_response_guarantee(app_user_id)
Source code in components/be/public/clinic/adapter.py
def user_has_24_hour_response_guarantee(  # noqa: D102
    self,
    app_user_id: str,  # noqa: ARG002
) -> bool:
    # BE doesn't support 24 hour response guarantee
    return False
validate_session_duration
validate_session_duration(session_duration)
Source code in components/be/public/clinic/adapter.py
def validate_session_duration(  # noqa: D102
    self,
    session_duration: int,
) -> None:
    # No banned session durations in BE
    pass

THERAPY_SESSION_COMPANY_PRICE_IN_CENTS module-attribute

THERAPY_SESSION_COMPANY_PRICE_IN_CENTS = 7500

THERAPY_SESSION_TNS_PRICE_IN_CENTS module-attribute

THERAPY_SESSION_TNS_PRICE_IN_CENTS = 8000

clinic_adapter module-attribute

clinic_adapter = BeClinicAdapter()

clinic_eligibility

This module contains the query to get the current or upcoming period of eligibility to the clinic restricted services.

NOTE: the logic could be reused by other services than the Clinic, provided the country-specific rules are the same. If yes, feel free to rename the file and query to a more generic name.

get_coverage_status

get_coverage_status(user_id)

Return the start and optionally the end date of the current or upcoming period of eligibility to the clinic restricted services.

Source code in components/be/public/clinic/clinic_eligibility.py
def get_coverage_status(user_id: UUID) -> CoverageStatus | None:
    """
    Return the start and optionally the end date of the current or upcoming period of eligibility to the clinic restricted services.
    """
    from components.be.internal.affiliation.queries.enrollment import (
        get_all_enrollments_active_or_not_yet_started_on,
    )

    insurance_profile = (
        current_session.query(BeInsuranceProfile.id)  # noqa: ALN085
        .filter(BeInsuranceProfile.user_id == user_id)
        .one_or_none()
    )

    if not insurance_profile:
        return None

    enrollments = sorted(
        get_all_enrollments_active_or_not_yet_started_on(
            insurance_profile_id=insurance_profile.id, the_date=date.today()
        ),
        key=lambda e: e.start_date,
    )

    if not enrollments:
        return None

    start_date = enrollments[0].start_date
    end_date = enrollments[0].end_date

    # Get the end date of the last enrollment if there is no break between the current and the next enrollment
    # If there is a break, we should wait the next enrollment to start the new period of eligibility
    for index, enrollment in enumerate(enrollments):
        if index == 0:
            continue
        if not end_date or enrollment.start_date - timedelta(days=1) > end_date:
            break
        end_date = enrollment.end_date

    return CoverageStatus(
        start_date=start_date,
        end_date=end_date,
    )

user

update_user_nrn_and_eid

update_user_nrn_and_eid(user_id, rrn_and_eid, commit=False)
Source code in components/be/public/clinic/user.py
def update_user_nrn_and_eid(  # noqa: D103
    user_id: uuid.UUID, rrn_and_eid: str, commit: bool = False
) -> None:
    from components.be.internal.customer_health_partner.clinic.actions.user import (
        update_user_nrn_and_eid as _update_user_nrn_and_eid,
    )

    _update_user_nrn_and_eid(user_id=user_id, rrn_and_eid=rrn_and_eid, commit=commit)

components.be.public.command_log

get_command_logs

get_command_logs(start_at, end_at)
Source code in components/be/public/command_log.py
def get_command_logs(start_at: datetime, end_at: datetime) -> list[CommandLog]:  # noqa: D103
    logs = current_session.scalars(
        select(BeCommandLog).filter(
            BeCommandLog.created_at >= start_at, BeCommandLog.created_at < end_at
        )
    )

    return [
        CommandLog(
            id=log.id,
            command=log.command,
            run_at=log.run_at,
            completed_at=log.completed_at,
            success=log.success,
            model_slug="becommandlog",
        )
        for log in logs
    ]

components.be.public.company

company_has_live_health_contract_since

company_has_live_health_contract_since(company_id, since)

Checks that a company has had a live health contract in the time between the "since" date and now

Source code in components/be/public/company.py
def company_has_live_health_contract_since(company_id: uuid.UUID, since: date) -> bool:
    """Checks that a company has had a live health contract in the time between the "since" date and now"""
    live_health_contracts = (
        current_session.query(BeCompany)  # noqa: ALN085
        .options(joinedload(BeCompany.health_contracts))
        .filter(
            BeCompany.id == company_id,
            BeHealthContract.is_ever_active_between(since, utctoday()),
        )
        .count()
    )

    return live_health_contracts > 0

get_account_ids_from_company_ids

get_account_ids_from_company_ids(company_ids)

Extract account IDs list from company IDs.

Source code in components/be/public/company.py
def get_account_ids_from_company_ids(company_ids: Iterable[str]) -> list[str]:
    """Extract account IDs list from company IDs."""
    company_to_account_mapping = get_company_id_to_account_id(company_ids)
    account_ids = set(company_to_account_mapping.values())
    return sorted(account_ids)

get_company_admins

get_company_admins(company_id, on_date)

Returns list of company admins active on on_date for a specific company.

Source code in components/be/public/company.py
def get_company_admins(company_id: uuid.UUID, on_date: date) -> list[CompanyAdmin]:
    """
    Returns list of company admins active on on_date for a specific company.
    """
    return _get_company_admins(company_id=company_id, on_date=on_date)

get_company_display_name

get_company_display_name(company_id)
Source code in components/be/public/company.py
def get_company_display_name(company_id: uuid.UUID) -> str:  # noqa: D103
    company = get_or_raise_missing_resource(BeCompany, company_id)
    return company.display_name

get_company_display_name_and_account_id

get_company_display_name_and_account_id(company_id)
Source code in components/be/public/company.py
def get_company_display_name_and_account_id(  # noqa: D103
    company_id: uuid.UUID,
) -> tuple[str, uuid.UUID]:
    company = get_or_raise_missing_resource(BeCompany, company_id)
    return company.display_name, company.account_id

get_company_id_of_user

get_company_id_of_user(user_id)
Source code in components/be/public/company.py
def get_company_id_of_user(user_id: uuid.UUID) -> uuid.UUID | None:  # noqa: D103
    try:
        contract = get_active_or_future_health_contract(user_id=user_id)
        return contract.company_id
    except Exception:
        return None

get_company_id_to_account_id

get_company_id_to_account_id(company_ids)
Source code in components/be/public/company.py
def get_company_id_to_account_id(  # noqa: D103
    company_ids: Iterable[str],
) -> dict[str, str]:
    companies = current_session.query(BeCompany).filter(BeCompany.id.in_(company_ids))  # noqa: ALN085
    return {str(company.id): str(company.account_id) for company in companies}

get_company_ids_for_account_id

get_company_ids_for_account_id(account_id)
Source code in components/be/public/company.py
def get_company_ids_for_account_id(account_id: uuid.UUID) -> list[uuid.UUID]:  # noqa: D103
    companies = (
        current_session.query(BeCompany)  # noqa: ALN085
        .filter(BeCompany.account_id == account_id)
        .all()
    )
    return [company.id for company in companies]

get_company_vat

get_company_vat(company_id)
Source code in components/be/public/company.py
def get_company_vat(company_id: uuid.UUID) -> str:  # noqa: D103
    return get_company_vat_business_logic(company_id=company_id)

get_employees_invite_emails

get_employees_invite_emails(company_id)
Source code in components/be/public/company.py
def get_employees_invite_emails(company_id: uuid.UUID) -> set[str]:  # noqa: D103
    active_employments = (
        current_session.query(BeEmployment)  # noqa: ALN085
        .filter(
            BeEmployment.company_id == company_id,
            BeEmployment.is_active,
            BeEmployment.invite_email.isnot(None),
        )
        .all()
    )
    return {employment.invite_email for employment in active_employments}  # type: ignore[misc]

get_user_admined_company_ids

get_user_admined_company_ids(user_id)
Source code in components/be/public/company.py
def get_user_admined_company_ids(user_id: uuid.UUID) -> list[uuid.UUID]:  # noqa: D103
    companies = company_logic.get_admined_companies_by_user(user_id=user_id)
    return [company.id for company in companies]

is_company

is_company(company_id)
Source code in components/be/public/company.py
def is_company(company_id: uuid.UUID) -> bool:  # noqa: D103
    return current_session.get(BeCompany, company_id) is not None

user_is_company_admin

user_is_company_admin(user_id, company_id)
Source code in components/be/public/company.py
def user_is_company_admin(user_id: uuid.UUID, company_id: uuid.UUID) -> bool:  # noqa: D103
    return company_logic.user_is_admin(user_id, company_id)

components.be.public.coverage_module

CoverageModuleMappingsByHealthPlan dataclass

CoverageModuleMappingsByHealthPlan(mappings)

Represents the mapping of current coverage modules names to new ones for each health plan.

extract_coverage_module_mappings classmethod

extract_coverage_module_mappings(coverage_mapping_data)

Creates instance from JSON string containing coverage mappings. Returns None if data is missing or invalid.

Source code in components/be/public/coverage_module.py
@classmethod
def extract_coverage_module_mappings(
    cls,
    coverage_mapping_data: str | None,
) -> Optional["CoverageModuleMappingsByHealthPlan"]:
    """
    Creates instance from JSON string containing coverage mappings.
    Returns None if data is missing or invalid.
    """
    if not coverage_mapping_data:
        return None

    instance = cls(mappings={})
    json_data = json.loads(coverage_mapping_data)

    if not json_data:
        return None

    for health_plan_id, mappings in json_data.items():
        instance.mappings[health_plan_id] = HealthPlanCoverageMappings.from_dict(
            mappings
        )

    return instance

from_dict classmethod

from_dict(data)

Creates instance from dictionary format.

Source code in components/be/public/coverage_module.py
@classmethod
def from_dict(
    cls, data: dict[str, list[tuple[str, str]]]
) -> "CoverageModuleMappingsByHealthPlan":
    """
    Creates instance from dictionary format.
    """
    instance = cls(mappings={})
    for health_plan_id, mappings in data.items():
        instance.mappings[health_plan_id] = HealthPlanCoverageMappings.from_dict(
            mappings
        )
    return instance

get_module_mappings_for_health_plan

get_module_mappings_for_health_plan(health_plan_id)

Returns the coverage mappings for a given health plan.

Source code in components/be/public/coverage_module.py
def get_module_mappings_for_health_plan(
    self, health_plan_id: UUID
) -> HealthPlanCoverageMappings | None:
    """
    Returns the coverage mappings for a given health plan.
    """
    return self.mappings.get(str(health_plan_id))

mappings instance-attribute

mappings

to_dict

to_dict()

Serializes the mappings to dictionary format.

Source code in components/be/public/coverage_module.py
def to_dict(self) -> dict[str, list[tuple[str, str]]]:
    """
    Serializes the mappings to dictionary format.
    """
    return {
        health_plan_id: mappings.to_dict()
        for health_plan_id, mappings in self.mappings.items()
    }

HealthPlanCoverageMappings dataclass

HealthPlanCoverageMappings(mappings)

Represents mappings from current to new coverage module names for a single health plan.

from_dict classmethod

from_dict(data)

Creates instance from a list of tuples format. Validates and transforms string values to CoverageModuleName.

Source code in components/be/public/coverage_module.py
@classmethod
def from_dict(cls, data: list[tuple[str, str]]) -> "HealthPlanCoverageMappings":
    """
    Creates instance from a list of tuples format.
    Validates and transforms string values to CoverageModuleName.
    """
    validated_mappings = {
        CoverageModuleName.validate(current): CoverageModuleName.validate(new)
        for current, new in data
    }
    return cls(mappings=validated_mappings)

get_mapped_module_name

get_mapped_module_name(current_module_name)

Returns the mapped coverage module name for a given current module name.

Source code in components/be/public/coverage_module.py
def get_mapped_module_name(
    self, current_module_name: CoverageModuleName
) -> CoverageModuleName | None:
    """
    Returns the mapped coverage module name for a given current module name.
    """
    return self.mappings.get(current_module_name)

mappings instance-attribute

mappings

to_dict

to_dict()

Serializes the mappings to a list of tuples format. Converts CoverageModuleName enum values to strings.

Source code in components/be/public/coverage_module.py
def to_dict(self) -> list[tuple[str, str]]:
    """
    Serializes the mappings to a list of tuples format.
    Converts CoverageModuleName enum values to strings.
    """
    return [(str(current), str(new)) for current, new in self.mappings.items()]

components.be.public.customer_health_partner

employees_count

get_employees_count

get_employees_count(account_id)
Source code in components/be/public/customer_health_partner/employees_count.py
def get_employees_count(account_id: UUID) -> int:  # noqa: D103
    from components.be.internal.models.be_company import BeCompany
    from components.be.internal.models.be_employment import (
        BeEmployment,
    )

    all_company_ids_from_account = [
        str(company_id)
        for (company_id,) in current_session.query(BeCompany.id)  # noqa: ALN085
        .filter(BeCompany.account_id == account_id)
        .all()
    ]

    total_number_of_employees = (
        current_session.query(BeEmployment.id)  # noqa: ALN085
        .filter(
            BeEmployment.company_id.in_(all_company_ids_from_account),
            BeEmployment.is_cancelled is not True,  # type: ignore[arg-type,comparison-overlap]
            BeEmployment.start_date <= date.today(),
            BeEmployment.end_date.is_(None),
        )
        .join(BeEmployment.user)
        .distinct()
        .count()
    )

    return total_number_of_employees

get_admin_traits

get_admin_traits_to_notify

get_admin_traits_to_notify(admin_id, company_ids)

Return the list of AdminTraits for admins who should be notified about the well-being assessment report.

Source code in components/be/public/customer_health_partner/get_admin_traits.py
def get_admin_traits_to_notify(
    admin_id: str | None,
    company_ids: list[str],  # noqa: ARG001
) -> Sequence[AdminTraits]:
    """
    Return the list of AdminTraits for admins who should be notified about the well-being assessment report.
    """
    users_traits_of_users_to_notify: list[AdminTraits] = []

    # we notify only the admin who created the assessment
    if admin_id is not None:
        admin_user = get_or_raise_missing_resource(BeUser, admin_id)
        users_traits_of_users_to_notify.append(AdminTraits(admin_user))

    return users_traits_of_users_to_notify

get_company_ids_for_account_company_ids

get_company_ids_for_account_company_ids

get_company_ids_for_account_company_ids(
    company_ids_in_account,
)
Source code in components/be/public/customer_health_partner/get_company_ids_for_account_company_ids.py
def get_company_ids_for_account_company_ids(  # noqa: D103
    company_ids_in_account: list[str],
) -> set[str]:
    from components.be.internal.models.be_account import BeAccount
    from components.be.internal.models.be_company import (
        BeCompany,
    )

    account: BeAccount = mandatory(
        current_session.get(BeCompany, company_ids_in_account[0]).account  # type: ignore[union-attr]
    )
    account_company_ids = {str(company.id) for company in account.companies}
    return account_company_ids

components.be.public.events

subscription

subscribe_to_events

subscribe_to_events()

All event subscriptions for France should be done here.

Source code in components/be/public/events/subscription.py
1
2
3
4
5
6
7
8
9
def subscribe_to_events() -> None:
    """
    All event subscriptions for France should be done here.
    """
    from components.ca.public.events.subscription import subscribe_to_ca_global_events
    from components.es.public.events.subscription import subscribe_to_es_global_events

    subscribe_to_es_global_events()
    subscribe_to_ca_global_events()

components.be.public.feature

get_feature_metadata_value

get_feature_metadata_value(feature_name, key, default=None)
Source code in components/be/public/feature.py
def get_feature_metadata_value(  # noqa: D103
    feature_name: str,
    key: str,
    default: Any | None = None,
) -> Any | None:
    return feature_logic.get_feature_metadata_value(
        feature_name=feature_name, key=key, default=default
    )

is_feature_active

is_feature_active(feature_name)

Check if a feature is active :param feature_name: the name of the feature :return: true if the feature with this name is found and active

Source code in components/be/public/feature.py
def is_feature_active(
    feature_name: str,
) -> bool:
    """
    Check if a feature is active
    :param feature_name: the name of the feature
    :return: true if the feature with this name is found and active
    """
    return FeatureLogic.is_feature_active(feature_name=feature_name)

components.be.public.global_customer_dashboard

admin

admin_can_edit_rights_target_admin

admin_can_edit_rights_target_admin(
    target_admin_user_id, admin_user_company_id
)
Source code in components/be/public/global_customer_dashboard/admin.py
def admin_can_edit_rights_target_admin(  # noqa: D103
    target_admin_user_id: uuid.UUID, admin_user_company_id: str
) -> bool:
    company_targeted = get_or_raise_missing_resource(BeCompany, admin_user_company_id)

    # Reasoning: we already know that the current user is NOT an AccountAdmin
    # else the query would not have been triggered from the Frontend. So we only need to know if the targeted
    # admin is an AccountAdmin of the account that contains the company the current user is a CompanyAdmin of.
    target_user_is_account_admin = current_session.query(  # noqa: ALN085
        current_session.query(BeAccountAdmin)  # noqa: ALN085
        .filter(
            BeAccountAdmin.user_id == target_admin_user_id,
            BeAccountAdmin.account_id == company_targeted.account_id,
            BeAccountAdmin.is_not_ended,
        )
        .exists()
    ).scalar()

    return not target_user_is_account_admin

edit_admin_entities

edit_admin_entities(
    admin_user_id,
    added_account_ids,
    removed_account_ids,
    added_company_ids,
    removed_company_ids,
    save=True,
)
Source code in components/be/public/global_customer_dashboard/admin.py
def edit_admin_entities(  # noqa: D103
    admin_user_id: uuid.UUID,
    added_account_ids: set[uuid.UUID],
    removed_account_ids: set[uuid.UUID],
    added_company_ids: set[uuid.UUID],
    removed_company_ids: set[uuid.UUID],
    save: bool = True,
) -> None:
    check_resource_exists_or_raise(BeUser, admin_user_id)

    _remove_admin_from_accounts(
        admin_user_id=admin_user_id, removed_account_ids=removed_account_ids
    )
    _remove_admin_from_companies(
        admin_user_id=admin_user_id, removed_company_ids=removed_company_ids
    )

    for company_id in added_company_ids:
        check_resource_exists_or_raise(BeCompany, company_id)
        if not company_logic.user_is_admin(
            user_id=admin_user_id, company_id=company_id
        ):
            company_admin = BeCompanyAdmin(
                user_id=admin_user_id,
                company_id=company_id,
            )
            current_session.add(company_admin)

    for account_id in added_account_ids:
        check_resource_exists_or_raise(BeAccount, account_id)
        if not is_admin(user_id=admin_user_id, account_id=account_id):
            add_account_admin(
                user_id=admin_user_id, account_id=account_id, commit=False
            )

    if save:
        current_session.commit()

get_account_details

get_account_details(account_id)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_account_details(account_id: uuid.UUID) -> CustomerDashboardAccount:  # noqa: D103
    account = get_or_raise_missing_resource(BeAccount, account_id)
    return CustomerDashboardAccount(id=str(account.id), name=account.name)

get_admin

get_admin(admin_user_id, user_id)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_admin(  # noqa: D103
    admin_user_id: uuid.UUID, user_id: uuid.UUID
) -> CustomerDashboardCompanyAdmin | None:
    admin_user_companies = (
        select(BeCompanyAdmin.company_id)
        .filter(BeCompanyAdmin.user_id == admin_user_id, BeCompanyAdmin.is_not_ended)
        .cte("admin_user_companies")
    )
    admin_user_accounts = (
        select(BeAccountAdmin.account_id)
        .filter(BeAccountAdmin.user_id == admin_user_id, BeAccountAdmin.is_not_ended)
        .cte("admin_user_accounts")
    )
    admin_user_transitively_admined_companies = (
        select(BeCompany.id.label("company_id"))
        .select_from(BeAccountAdmin)
        .join(BeAccountAdmin.account)
        .join(BeAccount.companies)
        .filter(
            BeAccountAdmin.user_id == admin_user_id,
            BeAccountAdmin.is_not_ended,
        )
        .cte("admin_indirect_companies")
    )

    result = current_session.execute(
        select(BeUser, BeCompanyAdmin)
        .outerjoin(BeCompanyAdmin)
        .outerjoin(BeAccountAdmin)
        .filter(
            or_(
                and_(
                    BeCompanyAdmin.user_id == user_id,
                    BeCompanyAdmin.is_not_ended,
                    BeCompanyAdmin.company_id.in_(
                        select(admin_user_companies.c.company_id).union(
                            select(
                                admin_user_transitively_admined_companies.c.company_id
                            ),
                        ),
                    ),
                ),
                and_(
                    BeAccountAdmin.user_id == user_id,
                    BeAccountAdmin.is_not_ended,
                    BeAccountAdmin.account_id.in_(
                        select(admin_user_accounts.c.account_id)
                    ),
                ),
            ),
        )
        .limit(1)
    ).first()

    if result is None:
        return None

    user, company_admin = result
    profile_service = ProfileService.create()
    profile = profile_service.get_or_raise_profile(user.profile_id)

    email = (
        get_privacy_compliant_email_for_user(
            company_admin_id=company_admin.id, company_id=company_admin.company_id
        )
        if company_admin
        # Account admin invitations don't exist in BE. Directly going for the
        # fallback value that is used in get_privacy_compliant_email_for_user
        else profile.email
    )

    return CustomerDashboardCompanyAdmin(
        id=str(user.id),
        first_name=profile.first_name,
        last_name=profile.last_name or "",
        email=email,
        type=EntityType.company,
    )

get_admined_entities_for_entity_selctor_be

get_admined_entities_for_entity_selctor_be(user_id)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_admined_entities_for_entity_selctor_be(  # noqa: D103
    user_id: str,
) -> list[AdminedEntityForEntitySelector]:
    return get_admined_entities_for_entity_selector_global(
        user_id=user_id,
        operational_scope_model=None,
        company_model=BeCompany,
        account_model=BeAccount,
        admined_entities_query_api=get_admined_entities_query_api_be(),
    )

get_admined_entities_query_api_be

get_admined_entities_query_api_be()

Get Belgium concrete instance of AdminedEntitiesQueryApi, bound to BE models

Source code in components/be/public/global_customer_dashboard/admin.py
def get_admined_entities_query_api_be() -> AdminedEntitiesQueryApi:
    """
    Get Belgium concrete instance of AdminedEntitiesQueryApi, bound to BE models
    """
    return AdminedEntitiesQueryApi(
        company_admin_model=BeCompanyAdmin,
        account_admin_model=BeAccountAdmin,
        get_account_id_to_company_ids_fn=get_account_id_to_company_ids,
    )

get_admined_entity_from_id_be

get_admined_entity_from_id_be(*, user_id, entity_id)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_admined_entity_from_id_be(  # noqa: D103
    *,
    user_id: str,
    entity_id: str,
) -> AdminedEntityForEntitySelector:
    admin_entities = get_admined_entities_for_entity_selector_global(
        user_id=user_id,
        operational_scope_model=None,
        company_model=BeCompany,
        account_model=BeAccount,
        admined_entities_query_api=get_admined_entities_query_api_be(),
    )

    # We only care about the id property here, since if the entity is a company, that's the company id,
    # and if the entity is an account, we fill its id field with the account id
    admin_entity = list(
        filter(lambda entity: str(entity.id) == entity_id, admin_entities)
    )

    if len(admin_entity) != 1:
        raise BaseErrorCode.forbidden(
            message="The queried entity id should correspond to an entity the user administrates"
        )

    return admin_entity[0]

get_admins_for_entities

get_admins_for_entities(
    company_ids,
    sort_filter,
    sort_direction,
    cursor=None,
    limit=None,
)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_admins_for_entities(  # noqa: D103
    company_ids: Iterable[uuid.UUID],
    sort_filter: CustomerDashboardCompanyAdminListSortField,
    sort_direction: PaginationSortDirectionType,
    cursor: int | None = None,
    limit: int | None = None,
) -> PaginatedAdminsForAdminDashboard:
    all_account_ids = set()
    accounts = (
        get_customer_admin_dependency()
        .get_account_org_tree_query_api()
        .get_account_for_entities(
            [(company_id, EntityType.company) for company_id in company_ids]
        )
    )
    for account in accounts:
        all_account_ids.add(uuid.UUID(account.id))

    admins_query = (
        current_session.query(  # noqa: ALN085
            BeUser,
            func.array_remove(func.array_agg(BeCompanyAdmin.company_id), None),
            func.array_remove(
                func.array_agg(
                    case(
                        (BeAccountAdmin.is_not_ended, BeAccountAdmin.account_id),
                        else_=None,
                    )
                ),
                None,
            ),
        )
        .select_from(BeUser)
        .join(BeCompanyAdmin)
        .outerjoin(BeAccountAdmin)
        .filter(
            or_(
                and_(
                    BeCompanyAdmin.company_id.in_(company_ids),
                    BeCompanyAdmin.is_not_ended,
                ),
                and_(
                    BeAccountAdmin.account_id.in_(all_account_ids),
                    BeAccountAdmin.is_not_ended,
                ),
            )
        )
        .group_by(BeUser)
    )

    admins_query = paginate_query(
        query=admins_query,
        sort_filter=sort_filter,
        sort_direction=sort_direction,
        cursor=cursor,
        limit=limit,
        sort_model=BeUser,
    )

    profile_service = ProfileService.create()
    user_ids = [user.id for user, _, _ in admins_query]
    profiles_mapping = profile_service.user_compat.get_user_profile_mapping(user_ids)

    res = []

    user_id_to_invitation_email = get_invitation_email_for_users_and_entities(
        {
            str(user.id): {
                (str(company_id), EntityType.company)
                for company_id in admined_company_ids
            }
            for user, admined_company_ids, _ in admins_query
        }
    )

    for user, admined_company_ids, admined_account_ids in admins_query:
        profile = profiles_mapping.get_or_raise_profile_for_user(user.id)
        email = user_id_to_invitation_email[str(user.id)]
        res.append(
            CustomerDashboardAdminWithEntities(
                id=str(user.id),
                first_name=profile.first_name,
                last_name=profile.last_name or "",
                email=email,
                type=EntityType.account
                if len(admined_account_ids) > 0
                else EntityType.company,
                account_id=first_or_none(admined_account_ids),
                company_ids=admined_company_ids,
            )
        )
    return PaginatedAdminsForAdminDashboard(
        admins=res,
        meta=PageInfoMeta(
            cursor=cursor,
            has_next=len(res) == limit if limit is not None else None,
        ),
    )

get_common_companies_for_admins

get_common_companies_for_admins(
    admin_user_id, target_admin_user_id
)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_common_companies_for_admins(  # noqa: D103
    admin_user_id: uuid.UUID, target_admin_user_id: uuid.UUID
) -> set[CustomerDashboardCompany]:
    check_resource_exists_or_raise(BeUser, admin_user_id)

    admin_companies = (
        current_session.query(BeCompany)  # noqa: ALN085
        .join(BeCompanyAdmin)
        .filter(BeCompanyAdmin.user_id == admin_user_id, BeCompanyAdmin.is_not_ended)
    )

    admin_companies_ids = {
        _company_to_customer_dashboard_company(company=company)
        for company in admin_companies
    }

    if admin_user_id == target_admin_user_id:
        return admin_companies_ids

    # This logic is flawed, it does not handle the case of an account admin trying to check a single company admin
    check_resource_exists_or_raise(BeUser, target_admin_user_id)

    target_admin_companies = (
        current_session.query(BeCompany)  # noqa: ALN085
        .join(BeCompanyAdmin)
        .filter(
            BeCompanyAdmin.user_id == target_admin_user_id, BeCompanyAdmin.is_not_ended
        )
    )

    target_admin_companies_ids = {
        _company_to_customer_dashboard_company(company=company)
        for company in target_admin_companies
    }
    common_company_ids = admin_companies_ids.intersection(target_admin_companies_ids)

    return common_company_ids

get_company_details

get_company_details(company_id)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_company_details(company_id: uuid.UUID) -> CustomerDashboardCompany:  # noqa: D103
    company = get_or_raise_missing_resource(BeCompany, company_id)
    return _company_to_customer_dashboard_company(company=company)

get_invitation_details

get_invitation_details(invitation_id)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_invitation_details(  # noqa: D103
    invitation_id: uuid.UUID,
) -> CustomerDashboardCompanyAdminInvitationWithCompanies:
    invitations_query = current_session.query(BeCompanyAdminInvitation).filter(  # noqa: ALN085
        BeCompanyAdminInvitation.id == invitation_id,
        BeCompanyAdminInvitation.company_admin_id.is_(None),
    )
    invitation = invitations_query.first()
    if not invitation:
        raise BaseErrorCode.missing_resource(
            message=f"BeCompanyAdminInvitation '{invitation_id}' does not exist"
        )

    return CustomerDashboardCompanyAdminInvitationWithCompanies(
        id=str(invitation.id),
        invite_email=invitation.invite_email,  # type: ignore[arg-type]
        invitation_date=invitation.created_at.date(),
        responsibilities=[],
        companies=[_company_to_customer_dashboard_company(company=invitation.company)],
        type=AdminedEntityType.single_company,
    )

get_onboarded_employees

get_onboarded_employees(
    company_ids, cursor, limit, search=None
)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_onboarded_employees(  # noqa: D103
    company_ids: list[uuid.UUID],
    cursor: int,
    limit: int,
    search: str | None = None,
) -> list[Employee]:
    employees_data = company_logic.get_employees(
        company_ids=company_ids,
        filter_statuses=[EmployeeStatusFilter.onboarded],
        filter_keywords=[search] if search else None,
        cursor=cursor,
        limit=limit,
    )

    employees: list[Employee] = []
    for employment, _, _, _ in employees_data:
        if not employment:
            continue
        profile_service = ProfileService.create()
        profile = profile_service.user_compat.get_or_raise_profile_by_user_id(
            employment.user_id
        )
        employees.append(
            Employee(
                user_id=str(employment.user_id),
                # we do eager load employment.user so no impact on perf
                first_name=profile.first_name,
                last_name=profile.last_name,
                employment_id=employment.id,
                company_id=str(employment.company_id),
                invite_email=employment.invite_email,
                employee_type=EmployeeType.insured,
                external_employee_id=employment.payroll_id,
            )
        )

    return employees

get_onboarded_employees_for_account

get_onboarded_employees_for_account(
    account_id, cursor, limit, search=None
)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_onboarded_employees_for_account(  # noqa: D103
    account_id: uuid.UUID,
    cursor: int,
    limit: int,
    search: str | None = None,
) -> list[Employee]:
    company_ids = current_session.scalars(
        select(BeCompany.id).filter(BeCompany.account_id == account_id)
    ).all()

    return get_onboarded_employees(
        company_ids=[company_id for company_id in company_ids],
        search=search,
        cursor=cursor,
        limit=limit,
    )

get_pending_admin_invitations_for_companies

get_pending_admin_invitations_for_companies(
    company_ids,
    sort_filter,
    sort_direction,
    cursor=None,
    limit=None,
)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_pending_admin_invitations_for_companies(  # noqa: D103
    company_ids: Iterable[uuid.UUID],
    sort_filter: CustomerDashboardCompanyAdminInvitationsSortField,
    sort_direction: PaginationSortDirectionType,
    cursor: int | None = None,
    limit: int | None = None,
) -> "PaginatedCustomerDashboardAdminInvitation":
    invitations_query = current_session.query(BeCompanyAdminInvitation).filter(  # noqa: ALN085
        BeCompanyAdminInvitation.company_id.in_(company_ids),
        BeCompanyAdminInvitation.company_admin_id.is_(None),
        BeCompanyAdminInvitation.cancelled_at.is_(None),
    )
    invitations_query = paginate_query(
        query=invitations_query,
        sort_filter=sort_filter,
        sort_direction=sort_direction,
        cursor=cursor,
        limit=limit,
        sort_model=BeCompanyAdminInvitation,
    )
    invitations_result = [
        CustomerDashboardAdminInvitationWithEntityIds(
            id=str(invitation.id),
            invite_email=invitation.invite_email,  # type: ignore[arg-type]
            invitation_date=invitation.created_at,
            responsibilities=[],
            type=AdminedEntityType.single_company,
            account_id=None,
            company_ids=[invitation.company_id],
        )
        for invitation in invitations_query
    ]

    return PaginatedCustomerDashboardAdminInvitation(
        pending_invitations=invitations_result,
        meta=PageInfoMeta(
            cursor=cursor,
            has_next=len(invitations_result) == limit if limit is not None else None,
        ),
    )

get_pending_onboardings_for_admined_entity_selector_be

get_pending_onboardings_for_admined_entity_selector_be(
    user_id,
)
Source code in components/be/public/global_customer_dashboard/admin.py
def get_pending_onboardings_for_admined_entity_selector_be(  # noqa: D103
    user_id: str,
) -> list[PendingOnboarding]:
    profile_service = ProfileService.create()
    profile = profile_service.user_compat.get_or_raise_profile_by_user_id(user_id)
    if not profile.email:
        return []  # user is not onboarded

    pending_company_onboardings = current_session.query(BeCompanyOnboarding).filter(  # noqa: ALN085
        or_(
            BeCompanyOnboarding.admin_user_id == user_id,
            BeCompanyOnboarding.admin_auth_email == profile.email,
        ),
        BeCompanyOnboarding.onboarding_complete.is_(False),
    )

    pending_onboardings = []
    for company_onboarding in pending_company_onboardings:
        pending_onboardings.append(
            PendingOnboarding(
                id=company_onboarding.id,
                display_name=company_onboarding.company_name,
                onboarding_url=company_onboarding.company_signup_url,
                type=PendingOnboardingType.be_company_onboarding,
                company_id=company_onboarding.company_id,  # type: ignore[arg-type]
            )
        )

    return pending_onboardings

promote_user_to_admin

promote_user_to_admin(
    company_ids, account_id, invited_user_id
)
Source code in components/be/public/global_customer_dashboard/admin.py
def promote_user_to_admin(  # noqa: D103
    company_ids: list[uuid.UUID],
    account_id: uuid.UUID | None,
    invited_user_id: uuid.UUID,
) -> None:
    if account_id:
        company_ids = current_session.scalars(  # type: ignore[assignment]
            select(BeCompany.id).filter(BeCompany.account_id == account_id)
        ).all()

    for company_id in company_ids:
        check_access_control_or_raise(company_id=company_id)

    if (
        is_employee_in_one_of_companies(
            user_id=invited_user_id,
            company_ids=company_ids,
            not_ended_on_date=utctoday(),
        )
        is False
    ):
        raise BaseErrorCode.forbidden(message="User must an employee of the company")

    for company_id in company_ids:
        company_logic.promote_user_to_admin(
            company_id=company_id, invited_user_id=invited_user_id
        )
    if account_id:
        add_account_admin(user_id=invited_user_id, account_id=account_id)

remove_admin

remove_admin(
    admin_user_id, target_admin_user_id, save=True
)
Source code in components/be/public/global_customer_dashboard/admin.py
def remove_admin(  # noqa: D103
    admin_user_id: uuid.UUID, target_admin_user_id: uuid.UUID, save: bool = True
) -> None:
    check_resource_exists_or_raise(BeUser, admin_user_id)
    check_resource_exists_or_raise(BeUser, target_admin_user_id)

    admin_account_ids = set(
        c.account_id
        for c in current_session.query(BeAccountAdmin)  # noqa: ALN085
        .with_for_update()
        .filter(
            BeAccountAdmin.user_id == admin_user_id,
            BeAccountAdmin.account_id.isnot(None),
            BeAccountAdmin.is_not_ended,
        )
        .with_entities(BeAccountAdmin.account_id)
        .all()
    )
    target_admin_account_ids = set(
        c.account_id
        for c in current_session.query(BeAccountAdmin)  # noqa: ALN085
        .with_for_update()
        .filter(
            BeAccountAdmin.user_id == target_admin_user_id,
            BeAccountAdmin.account_id.isnot(None),
            BeAccountAdmin.is_not_ended,
        )
        .with_entities(BeAccountAdmin.account_id)
        .all()
    )
    common_account_ids = admin_account_ids.intersection(target_admin_account_ids)

    # remove admin rights on accounts the current admin has access to:
    _remove_admin_from_accounts(
        admin_user_id=target_admin_user_id, removed_account_ids=common_account_ids
    )

    # remove admin rights on companies the current admin has access to:
    common_companies = get_common_companies_for_admins(
        admin_user_id=admin_user_id, target_admin_user_id=target_admin_user_id
    )
    common_company_ids = {uuid.UUID(company.id) for company in common_companies}
    _remove_admin_from_companies(
        admin_user_id=target_admin_user_id, removed_company_ids=common_company_ids
    )

    if save:
        current_session.commit()

remove_admin_invitation

remove_admin_invitation(invitation_id, save=True)
Source code in components/be/public/global_customer_dashboard/admin.py
def remove_admin_invitation(invitation_id: uuid.UUID, save: bool = True) -> None:  # noqa: D103
    company_logic.cancel_admin_invitation(invitation_id=invitation_id, commit=save)

send_admin_invitation_email

send_admin_invitation_email(invitation_id)
Source code in components/be/public/global_customer_dashboard/admin.py
def send_admin_invitation_email(invitation_id: uuid.UUID) -> None:  # noqa: D103
    company_logic.send_admin_invitation_email(invitation_id=invitation_id)

user_can_admin_entities

user_can_admin_entities(user_id, company_ids, account_ids)
Source code in components/be/public/global_customer_dashboard/admin.py
def user_can_admin_entities(  # noqa: D103
    user_id: uuid.UUID,
    company_ids: set[uuid.UUID],
    account_ids: set[uuid.UUID],
) -> bool:
    check_resource_exists_or_raise(BeUser, user_id)

    can_admin_companies = company_logic.user_can_admin_all(user_id, list(company_ids))
    can_admin_accounts = current_session.query(BeAccountAdmin).filter(  # noqa: ALN085
        BeAccountAdmin.user_id == user_id,
        BeAccountAdmin.account_id.in_(list(account_ids)),
        BeAccountAdmin.is_not_ended,
    ).count() == len(account_ids)

    return can_admin_companies and can_admin_accounts

customer_health_partner

get_company_ids_by_external_ids

get_company_ids_by_external_ids(external_ids)

Get company ids by external ids (VAT number)

:param external_ids: set[str] - The external ids to get the company ids for. :return: dict[str, str | None] - A dictionary mapping external ids to company ids.

Source code in components/be/public/global_customer_dashboard/customer_health_partner.py
def get_company_ids_by_external_ids(external_ids: set[str]) -> dict[str, str | None]:
    """
    Get company ids by external ids (VAT number)

    :param external_ids: set[str] - The external ids to get the company ids for.
    :return: dict[str, str | None] - A dictionary mapping external ids to company ids.
    """
    companies: list[BeCompany] = (
        current_session.query(BeCompany).filter(BeCompany.vat.in_(external_ids)).all()  # noqa: ALN085
    )
    vat_to_company_id: dict[str, str] = {
        company.vat: str(company.id) for company in companies
    }
    return {vat: vat_to_company_id.get(vat, None) for vat in external_ids}

get_company_name_from_companies

get_company_name_from_companies(company_ids)
Source code in components/be/public/global_customer_dashboard/customer_health_partner.py
def get_company_name_from_companies(company_ids: set[UUID]) -> str:  # noqa: D103
    companies = (
        current_session.query(BeCompany).filter(BeCompany.id.in_(company_ids)).all()  # noqa: ALN085
    )
    if len(companies) == 0:
        raise ValueError("No company found for company ids")
    for company in companies:
        if company.account:
            return company.account.name
    return companies[0].name

get_company_names_by_ids

get_company_names_by_ids(*, company_ids)
Source code in components/be/public/global_customer_dashboard/customer_health_partner.py
def get_company_names_by_ids(*, company_ids: set[str]) -> dict[str, str]:  # noqa: D103
    companies = (
        current_session.query(BeCompany).filter(BeCompany.id.in_(company_ids)).all()  # noqa: ALN085
    )
    return {str(company.id): company.display_name for company in companies}

get_user_email_from_id

get_user_email_from_id(*, user_id)
Source code in components/be/public/global_customer_dashboard/customer_health_partner.py
def get_user_email_from_id(*, user_id: str) -> str | None:  # noqa: D103
    profile_service = ProfileService.create()
    profile = profile_service.user_compat.get_profile_by_user_id(user_id)

    return profile.email if profile else None

get_user_info_from_id

get_user_info_from_id(*, user_id)
Source code in components/be/public/global_customer_dashboard/customer_health_partner.py
def get_user_info_from_id(  # noqa: D103
    *,
    user_id: str,
) -> WellbeingAssessmentDisplayUserInfo:
    profile_service = ProfileService.create()
    profile = profile_service.user_compat.get_or_raise_profile_by_user_id(user_id)

    if profile.email is None:
        raise ValueError(f"User {user_id} has no email")

    return WellbeingAssessmentDisplayUserInfo(
        id=user_id,
        full_name=profile.full_name,
        name=profile.full_name,
        email=profile.email,
        normalized_full_name=profile.normalized_full_name,
        pro_email=profile.email,
    )

get_wellbeing_assessment_feature_summary

get_wellbeing_assessment_feature_summary(account_id)
Source code in components/be/public/global_customer_dashboard/customer_health_partner.py
def get_wellbeing_assessment_feature_summary(account_id: str):  # type: ignore[no-untyped-def]  # noqa: D103
    account = get_or_raise_missing_resource(BeAccount, account_id)

    return get_wellbeing_assessment_feature_summary_logic(
        company_ids=[str(c.id) for c in account.companies], can_create_assessment=True
    )

validate_company_external_id

validate_company_external_id(external_id)

Validate a Belgian VAT number. The VAT number should start with 'BE' followed by 10 digits

:param external_id: str - The VAT number to validate. :return: bool - True if the VAT number is valid, False otherwise.

Source code in components/be/public/global_customer_dashboard/customer_health_partner.py
def validate_company_external_id(external_id: str) -> bool:
    """
    Validate a Belgian VAT number.
    The VAT number should start with 'BE' followed by 10 digits

    :param external_id: str - The VAT number to validate.
    :return: bool - True if the VAT number is valid, False otherwise.
    """
    vat_regex = r"^BE\d{10}$"
    return re2.search(vat_regex, external_id) is not None

components.be.public.health_contract

does_health_contract_have_affiliation_delay

does_health_contract_have_affiliation_delay(
    health_contract_id, on_date
)

Returns whether the health contract has an affiliation delay on a given date.

Source code in components/be/public/health_contract.py
def does_health_contract_have_affiliation_delay(
    health_contract_id: uuid.UUID,
    on_date: date,
) -> bool:
    """
    Returns whether the health contract has an affiliation delay on a given date.
    """
    return get_health_contract_version_on(
        health_contract_id=health_contract_id, on_date=on_date
    ).has_dependent_affiliation_delay

get_health_contract_current_or_next_or_last_health_plan_id

get_health_contract_current_or_next_or_last_health_plan_id(
    health_contract_id, on_date
)

Returns the health plan ID of a health contract on a given date.

Source code in components/be/public/health_contract.py
def get_health_contract_current_or_next_or_last_health_plan_id(
    health_contract_id: uuid.UUID, on_date: date
) -> uuid.UUID:
    """
    Returns the health plan ID of a health contract on a given date.
    """
    return get_current_or_next_or_last_health_contract_version_on(
        health_contract_id=health_contract_id, on_date=on_date
    ).health_plan_id

get_health_contract_participation_scheme

get_health_contract_participation_scheme(
    health_contract_id, on_date
)

Returns the participation scheme of a health contract on a given date.

Source code in components/be/public/health_contract.py
def get_health_contract_participation_scheme(
    health_contract_id: uuid.UUID, on_date: date
) -> ParticipationScheme:
    """
    Returns the participation scheme of a health contract on a given date.
    """
    return get_health_contract_version_on(
        health_contract_id=health_contract_id, on_date=on_date
    ).participation_json

get_health_contract_start_date

get_health_contract_start_date(health_contract_id)

Returns the contract start date.

Source code in components/be/public/health_contract.py
def get_health_contract_start_date(
    health_contract_id: uuid.UUID,
) -> date:
    """
    Returns the contract start date.
    """
    health_contract = get_or_raise_missing_resource(
        BeHealthContract, health_contract_id
    )
    return health_contract.start_date

get_primary_price_for_individual_contract_on_renewal

get_primary_price_for_individual_contract_on_renewal(
    health_contract_id, on_date, new_health_plan_id
)

Calculate the primary price for an individual health contract renewal.

This function retrieves the age of the primary and computes the price given the new health plan's price rules on the default coverage.

Source code in components/be/public/health_contract.py
def get_primary_price_for_individual_contract_on_renewal(
    health_contract_id: uuid.UUID,
    on_date: date,
    new_health_plan_id: uuid.UUID,
) -> int:
    """
    Calculate the primary price for an individual health contract renewal.

    This function retrieves the age of the primary and computes the price given the new health plan's price rules
    on the default coverage.
    """
    from components.be.internal.health_plan.queries.price_rule import get_price_rules
    from components.be.internal.models.be_user import BeUser

    health_contract = get_or_raise_missing_resource(
        BeHealthContract, health_contract_id
    )
    individual: BeUser = mandatory(health_contract.individual)
    member_age = age_on(mandatory(individual.birth_date), Month(on_date).last_day)
    coverage_module_name = get_default_coverage_module_name(
        health_plan_id=new_health_plan_id,
        enrollment_type=EnrollmentType.primary,
    )
    price_rules = get_price_rules(
        health_plan_id=new_health_plan_id,
        coverage_module_name=coverage_module_name,
        member_age=member_age,
    )
    if len(price_rules) != 1:
        raise ValueError("Unable to find one rule for member")

    return price_rules[0].total_premium_cents

get_primary_price_increase_percentage_on_renewal

get_primary_price_increase_percentage_on_renewal(
    health_contract_id, on_date, new_health_plan_id
)

Calculate the percentage increase in the primary price for a health contract renewal.

This function compares the current health plan's primary price with the new health plan's primary price on the default coverage. It determines if there's a price increase and, if so, calculates the percentage of that increase. Returns None if: - There is no price increase or there is a decrease - The contract is not active on the given date

Source code in components/be/public/health_contract.py
def get_primary_price_increase_percentage_on_renewal(
    health_contract_id: uuid.UUID, on_date: date, new_health_plan_id: uuid.UUID
) -> Decimal | None:
    """
    Calculate the percentage increase in the primary price for a health contract renewal.

    This function compares the current health plan's primary price with the new health plan's primary price
    on the default coverage. It determines if there's a price increase and, if so, calculates the percentage
    of that increase. Returns None if:
      - There is no price increase or there is a decrease
      - The contract is not active on the given date
    """
    return _get_primary_price_increase_percentage_on_renewal(
        health_contract_id=health_contract_id,
        on_date=on_date,
        new_health_plan_id=new_health_plan_id,
    )

components.be.public.health_plan

does_health_plan_default_coverage_have_daily_care

does_health_plan_default_coverage_have_daily_care(
    health_plan_id,
)
Source code in components/be/public/health_plan.py
def does_health_plan_default_coverage_have_daily_care(  # noqa: D103
    health_plan_id: UUID,
) -> bool:
    default_included_coverages = get_health_plan_default_included_coverages(
        health_plan_id=health_plan_id
    )

    return any(
        coverage_type == CoverageType.DAILY_CARE
        for coverage_type in default_included_coverages
    )

does_health_plan_default_coverage_have_hospi

does_health_plan_default_coverage_have_hospi(
    health_plan_id,
)
Source code in components/be/public/health_plan.py
def does_health_plan_default_coverage_have_hospi(  # noqa: D103
    health_plan_id: UUID,
) -> bool:
    default_included_coverages = get_health_plan_default_included_coverages(
        health_plan_id=health_plan_id
    )

    return any(
        coverage_type == CoverageType.HOSPITALIZATION
        for coverage_type in default_included_coverages
    )

get_default_coverage_module_name

get_default_coverage_module_name(
    health_plan_id, enrollment_type
)

Returns the name of the default coverage module for the given health plan and enrollment type.

Source code in components/be/public/health_plan.py
def get_default_coverage_module_name(
    health_plan_id: UUID,
    enrollment_type: EnrollmentType,
) -> CoverageModuleName:
    """
    Returns the name of the default coverage module for the given health plan and enrollment type.
    """
    return get_default_coverage_module(
        health_plan_id=health_plan_id, enrollment_type=enrollment_type
    ).name

get_health_plan_all_member_prices_in_cents

get_health_plan_all_member_prices_in_cents(
    health_plan_id, participation_scheme=None
)

Returns the default price for primary, partner, child and adult child enrollment types. If a participation scheme is provided, it calculates the price paid by the member.

Source code in components/be/public/health_plan.py
def get_health_plan_all_member_prices_in_cents(
    health_plan_id: UUID,
    participation_scheme: ParticipationScheme | None = None,
) -> HealthPlanMemberPrices:
    """
    Returns the default price for primary, partner, child and adult child enrollment types. If a participation scheme is
    provided, it calculates the price paid by the member.
    """
    return _get_health_plan_all_member_prices_in_cents(
        health_plan_id, participation_scheme
    )

get_health_plan_default_primary_prices

get_health_plan_default_primary_prices(health_plan_id)
Source code in components/be/public/health_plan.py
def get_health_plan_default_primary_prices(  # noqa: D103
    health_plan_id: UUID,
) -> list[PriceRuleDetails]:
    return get_health_plan_default_prices_in_cents(
        health_plan_id=health_plan_id, enrollment_type=EnrollmentType.primary
    )

get_health_plan_single_room_contribution_in_cents

get_health_plan_single_room_contribution_in_cents(
    health_plan_id,
)

Returns the personal contribution in cents for a single room hospitalization.

Source code in components/be/public/health_plan.py
def get_health_plan_single_room_contribution_in_cents(
    health_plan_id: UUID,
) -> int:
    """
    Returns the personal contribution in cents for a single room hospitalization.
    """
    health_plan = get_or_raise_missing_resource(BeHealthPlan, health_plan_id)
    return health_plan.hospitalization_single_room_personal_contribution_in_cents

get_health_plans_with_modules_by_ids

get_health_plans_with_modules_by_ids(health_plan_ids)

Returns the health plans with their coverage modules by their IDs.

Source code in components/be/public/health_plan.py
def get_health_plans_with_modules_by_ids(
    health_plan_ids: list[UUID],
) -> list[HealthPlanWithModules]:
    """
    Returns the health plans with their coverage modules by their IDs.
    """
    return _get_health_plans_with_modules_by_ids(health_plan_ids=health_plan_ids)

get_primary_default_coverage_price

get_primary_default_coverage_price(
    health_plan_id, min_age=CHILD_AGE_LIMIT_YEARS
)
Source code in components/be/public/health_plan.py
def get_primary_default_coverage_price(  # noqa: D103
    health_plan_id: UUID, min_age: int = CHILD_AGE_LIMIT_YEARS
) -> int:
    return get_health_plan_default_price_in_cents(
        health_plan_id=health_plan_id,
        enrollment_type=EnrollmentType.primary,
        min_age=min_age,
    )

components.be.public.helpers

front_end

FRONT_END_PATHS module-attribute

FRONT_END_PATHS = dict(
    APP_URL="/dashboard",
    DEPENDENT_INVITE_URL="/password_creation",
    LOGIN_URL="/login",
    MARMOT_URL="/marmot",
    MARMOT_CLAIM_MANAGEMENT="/marmot/claim_management",
    MARMOT_ACCOUNT_URL="/TODO-TODO-TODO",
    MARMOT_USER_URL="/marmot/user/",
    MARMOT_COMPANY_URL="/marmot/company/",
    PASSWORD_RESET_BASE_URL="/password_reset",
    UNSUBSCRIBE_URL="/unsubscribe",
    CUSTOMER_ADMIN_ONBOARDING_INVITE_URL="/onboarding/customer-admin-onboarding",
)

front_end_url module-attribute

front_end_url = FrontendURL(paths=FRONT_END_PATHS)

init_data_loader

init_data_loader

init_data_loader()

Create the base data to be populated in fresh local DBs.

Executed by "flask data init".

Please edit the README.md file in components/be when editing this file.

Source code in components/be/public/helpers/init_data_loader.py
def init_data_loader() -> None:
    """
    Create the base data to be populated in fresh local DBs.

    Executed by "flask data init".

    Please edit the README.md file in components/be when editing this file.
    """
    from datetime import date

    from components.be.internal.claim_management.business_logic.actions.create_claim_management_user import (
        create_users,
    )
    from components.be.internal.health_contract.actions.add_health_contract_versions import (
        add_health_contract_version_and_enrollments,
    )
    from components.be.internal.health_plan.actions.create_loterie_nationale_modular_plan import (
        create_loterie_nationale_panda_modular,
    )
    from components.be.internal.health_plan.actions.create_loterie_nationale_retiree_modular_plan import (
        create_loterie_nationale_retiree_panda_modular,
    )
    from components.be.internal.health_plan.actions.create_plans import (
        create_default_plans,
    )
    from components.be.internal.models.be_user import BeUser
    from components.be.internal.models.coverage_module import CoverageModuleName
    from components.be.internal.models.tests.factories import (
        BeCompanyFactory,
        BeUserFactory,
    )
    from components.be.internal.models.tests.use_case_factories.active_health_contract_factory import (
        create_active_health_contract,
    )
    from components.be.internal.models.tests.use_case_factories.enrolled_flexben_user_factory import (
        create_enrolled_flexben_employee,
    )
    from components.be.internal.models.tests.use_case_factories.enrolled_user_factory import (
        create_enrolled_employee,
        create_enrolled_individual,
    )
    from components.global_profile.public.tests.init_profile_for_users import (
        create_profile_for_users,
    )

    create_default_plans()
    create_users()

    # employee on standard employee contract
    company = BeCompanyFactory.create(
        name="Valmy Corp",
        display_name="Valmy",
    )
    enrollment = create_enrolled_employee(
        start_date=date(2021, 1, 1),
        user_id=BeUserFactory.create(
            email=get_prefixed_recipient_with_aws_user("+employee@alan.eu"),
        ).id,
        company_id=company.id,
    )

    # amend the employee contract
    add_health_contract_version_and_enrollments(
        health_contract=enrollment.policy.health_contract,
        new_start_date=date(2022, 1, 1),
        participation=80,
    )

    # employee on modular employee contract
    modular_company = BeCompanyFactory.create(
        name="Modular Valmy Corp",
        display_name="Modular Valmy",
    )
    modular_plan = create_loterie_nationale_panda_modular()
    create_enrolled_employee(
        start_date=date(2021, 2, 1),
        coverage_module_name=CoverageModuleName.primary_default,
        user_id=BeUserFactory.create(
            email=get_prefixed_recipient_with_aws_user("+employee-modular@alan.eu"),
        ).id,
        company_id=modular_company.id,
        health_plan_id=modular_plan.id,
    )

    # plan with new pricing rules
    create_plan_with_three_price_rules()

    # covered individual on individual contract
    create_enrolled_individual(
        start_date=date(2022, 2, 1),
        user_id=BeUserFactory.create(
            email=get_prefixed_recipient_with_aws_user("+indi@alan.eu"),
        ).id,
    )

    # add retiree contract to the company created above
    retiree_plan = create_loterie_nationale_retiree_panda_modular()
    retiree_health_contract, _ = create_active_health_contract(
        start_date=date(2021, 1, 1),
        health_plan=retiree_plan,
        company=company,
    )

    # add covered retiree on the retiree contract
    create_enrolled_individual(
        start_date=date(2022, 2, 1),
        user_id=BeUserFactory.create(
            email=get_prefixed_recipient_with_aws_user("+retiree@alan.eu"),
        ).id,
        parent_health_contract_id=retiree_health_contract.id,
        health_plan_id=retiree_plan.id,
    )

    # employees on a flexben plan
    create_enrolled_flexben_employee(
        email=get_prefixed_recipient_with_aws_user("+employee-flexben@alan.eu"),
        with_default_processed_flexben_choice=False,
    )
    create_enrolled_flexben_employee(
        email=get_prefixed_recipient_with_aws_user(
            "+employee-flexben-withchoice@alan.eu"
        ),
        company_name="Flexben Valmy",
        with_default_processed_flexben_choice=True,
    )

    create_profile_for_users(user_cls=BeUser, commit=True)

templating

configure_templating

configure_templating(jinja_env)

Inspired from https://github.com/alan-eu/alan-apps/blob/main/backend/components/fr/internal/helpers/templating.py#L46 ⧉

Source code in components/be/public/helpers/templating.py
def configure_templating(jinja_env: Environment) -> None:
    """
    Inspired from https://github.com/alan-eu/alan-apps/blob/main/backend/components/fr/internal/helpers/templating.py#L46
    """
    jinja_env.filters.update(
        {
            "french_date": french_date_format,
            "dutch_date": dutch_date_format,
            "amount_with_currency": amount_with_currency,
        }
    )

components.be.public.scim_api

adapter

BeScimAdapter

BeScimAdapter()

Bases: GenericScimAdapter

SCIM adapter for be_api

Source code in components/be/public/scim_api/adapter.py
def __init__(self) -> None:
    super().__init__()
    self.profile_service = ProfileService.create(app_name=AppName.ALAN_BE)
create_app_user
create_app_user(first_name, last_name, email)

Create a user with the given first and last name. and returns the user ID.

Source code in components/be/public/scim_api/adapter.py
@override
def create_app_user(
    self, first_name: str, last_name: str, email: str
) -> int | uuid.UUID:
    """
    Create a user with the given first and last name. and returns the user ID.
    """
    user = create_profile_with_user(first_name=first_name, last_name=last_name)
    return user.id
get_scim_users_data
get_scim_users_data(alan_employees)

Returns the first and last name of users from a list of AlanEmployee objects.

Source code in components/be/public/scim_api/adapter.py
@override
def get_scim_users_data(
    self,
    alan_employees: list[BeAlanEmployee],  # type: ignore[override]
) -> dict[int | uuid.UUID, AlanEmployeeIdentity]:
    """
    Returns the first and last name of users from a list of AlanEmployee objects.
    """
    user_profiles = self.profile_service.get_profiles(
        profile_ids={
            alan_employee.user.profile_id for alan_employee in alan_employees
        }
    )
    user_profiles_dict = {
        user_profile.id: user_profile for user_profile in user_profiles
    }

    return {
        alan_employee.user_id: AlanEmployeeIdentity(
            first_name=user_profiles_dict[alan_employee.user.profile_id].first_name,
            last_name=user_profiles_dict[alan_employee.user.profile_id].last_name,
        )
        for alan_employee in alan_employees
        if alan_employee.user.profile_id in user_profiles_dict
    }
get_user_data
get_user_data(user_id)

Returns user's first and last name by user_id.

Source code in components/be/public/scim_api/adapter.py
@override
def get_user_data(self, user_id: int | uuid.UUID) -> AlanEmployeeIdentity:
    """
    Returns user's first and last name by user_id.
    """
    if not isinstance(user_id, uuid.UUID):
        raise TypeError("User ID must be a UUID")

    user = current_session.query(BeUser).where(BeUser.id == user_id).one_or_none()  # noqa: ALN085
    if user is None:
        raise BaseErrorCode.missing_resource(f"User with ID {user_id} not found")
    user_profile = self.profile_service.get_or_raise_profile(
        profile_id=user.profile_id
    )

    return AlanEmployeeIdentity(
        first_name=user_profile.first_name, last_name=user_profile.last_name
    )
profile_service instance-attribute
profile_service = create(app_name=ALAN_BE)

test

test_adapter

adapter
adapter()

Fixture for the EsGenericScimAdapter instance.

Source code in components/be/public/scim_api/test/test_adapter.py
@pytest.fixture
def adapter() -> BeScimAdapter:
    """Fixture for the EsGenericScimAdapter instance."""
    return BeScimAdapter()
profile_service
profile_service()

Fixture for the profile service.

Source code in components/be/public/scim_api/test/test_adapter.py
@pytest.fixture
def profile_service() -> ProfileService:
    """Fixture for the profile service."""
    return ProfileService.create(app_name=AppName.ALAN_BE)
test_create_app_user
test_create_app_user(adapter, profile_service)

Test create_app_user creates a new user correctly.

Source code in components/be/public/scim_api/test/test_adapter.py
@pytest.mark.usefixtures("db")
def test_create_app_user(adapter, profile_service):
    """Test create_app_user creates a new user correctly."""
    user_id = adapter.create_app_user(
        first_name="John", last_name="Doe", email="john.doe@alan.eu"
    )

    created_user = (
        current_session.query(BeUser).filter(BeUser.id == user_id).one_or_none()  # noqa: ALN085
    )

    assert created_user is not None
    created_profile = profile_service.get_profile(profile_id=created_user.profile_id)
    assert created_profile is not None
    assert created_profile.first_name == "John"
    assert created_profile.last_name == "Doe"
test_get_scim_users_data
test_get_scim_users_data(adapter)

Test get_scim_users_data returns correct mapping of user data.

Source code in components/be/public/scim_api/test/test_adapter.py
@pytest.mark.usefixtures("db")
def test_get_scim_users_data(adapter):
    """Test get_scim_users_data returns correct mapping of user data."""
    # Create test data
    user1, user2, employee1, employee2 = _provision_test_data()

    result = adapter.get_scim_users_data([employee1, employee2])
    assert len(result) == 2
    assert result[user1.id].first_name == "John"
    assert result[user1.id].last_name == "Doe"
    assert result[user2.id].first_name == "Jane"
    assert result[user2.id].last_name == "Smith"
test_get_user_data
test_get_user_data(adapter)

Test get_user_data returns correct user identity.

Source code in components/be/public/scim_api/test/test_adapter.py
@pytest.mark.usefixtures("db")
def test_get_user_data(adapter):
    """Test get_user_data returns correct user identity."""
    # Create test data
    user1, _, _, _ = _provision_test_data()

    # Test with UUID
    result = adapter.get_user_data(user_id=user1.id)
    assert result.first_name == "John"
    assert result.last_name == "Doe"

    # Test with non-UUID
    with pytest.raises(TypeError):
        adapter.get_user_data(user_id=123)

    # Test with non-existent user
    with pytest.raises(BaseErrorCode):
        adapter.get_user_data(user_id=uuid.uuid4())

components.be.public.services

push_notifications

P module-attribute

P = ParamSpec('P')

get_push_notification_logs_for_user

get_push_notification_logs_for_user(
    user_id, notification_names, created_at__gte=None
)

Return a list of all the push notification logs ever created for the given user and notification names.

Source code in components/be/public/services/push_notifications.py
def get_push_notification_logs_for_user(
    user_id: uuid.UUID,
    notification_names: list[BasePushNotificationName],
    created_at__gte: datetime | None = None,
) -> list[InMemoryPushNotificationLog]:
    """
    Return a list of all the push notification logs ever created for the given user and notification names.
    """
    return [
        InMemoryPushNotificationLog.from_model(notification_log=item)
        for item in current_session.query(BePushNotificationLog).filter(  # noqa: ALN085
            BePushNotificationLog.user_id == user_id,
            BePushNotificationLog.name.in_(notification_names),
            BePushNotificationLog.created_at >= created_at__gte  # type: ignore[arg-type]
            if created_at__gte
            else True,
        )
    ]

get_push_notification_tokens_for_user

get_push_notification_tokens_for_user(user_id)
Source code in components/be/public/services/push_notifications.py
def get_push_notification_tokens_for_user(user_id: uuid.UUID) -> list[str]:  # noqa: D103
    from sqlalchemy import or_

    from components.be.internal.models.be_push_notification_token import (
        BePushNotificationToken,
    )

    push_notification_tokens: list[BePushNotificationToken] = (
        current_session.query(BePushNotificationToken)  # noqa: ALN085
        .filter(
            or_(
                BePushNotificationToken.has_permission.is_(True),
                BePushNotificationToken.has_permission.is_(
                    None
                ),  # has_permission was added later and we could not backfill
            ),
            BePushNotificationToken.user_id == user_id,
        )
        .all()
    )

    return [
        push_notification_token.registration_token
        for push_notification_token in push_notification_tokens
    ]

push_notification_sender_async

push_notification_sender_async(sender)
Source code in components/be/public/services/push_notifications.py
def push_notification_sender_async(  # noqa: D103
    sender: Callable[P, PushNotificationParams | None],
) -> Callable[P, None]:
    @wraps(sender)
    def decorated_function(*args, **kwargs) -> None:  # type: ignore[no-untyped-def]
        pn_params: PushNotificationParams | None = sender(*args, **kwargs)

        if pn_params is None:
            return

        be_push_notification_logic.send_push_notification_async(
            notification_params=pn_params,
            commit=pn_params.commit,
        )

    return decorated_function

push_notification_sender_sync

push_notification_sender_sync(sender)
Source code in components/be/public/services/push_notifications.py
def push_notification_sender_sync(  # noqa: D103
    sender: Callable[P, PushNotificationParams | None],
) -> Callable[P, None]:
    @wraps(sender)
    def decorated_function(*args, **kwargs) -> None:  # type: ignore[no-untyped-def]
        pn_params: PushNotificationParams | None = sender(*args, **kwargs)

        if pn_params is None:
            return

        be_push_notification_logic.send_push_notification_sync(
            notification_params=pn_params,
            delete_token=delete_token,
            commit=pn_params.commit,
        )

    return decorated_function

components.be.public.test_data_generator

get_test_data_generation_config

get_test_data_generation_config()
Source code in components/be/internal/admin_tools/test_data_generator/test_data_generation_config.py
def get_test_data_generation_config() -> TestDataGeneratorConfig:
    return TestDataGeneratorConfig(
        patched_factories=patched_factories,
        handlers=get_fixture_handlers(),
        async_queue=LOW_PRIORITY_QUEUE,
    )

components.be.public.user

find_company_ids_wher_user_has_employment

find_company_ids_wher_user_has_employment(user_id)
Source code in components/be/public/user.py
def find_company_ids_wher_user_has_employment(user_id: uuid.UUID) -> list[uuid.UUID]:  # noqa: D103
    from components.be.internal.company.queries.employment import (
        get_user_employments,
    )

    employments = get_user_employments(user_id=user_id)

    return [employment.company_id for employment in employments]

find_company_ids_wher_user_is_admin

find_company_ids_wher_user_is_admin(user_id)
Source code in components/be/public/user.py
def find_company_ids_wher_user_is_admin(user_id: uuid.UUID) -> list[uuid.UUID]:  # noqa: D103
    from components.be.internal.company.queries.company_admin import (
        get_user_company_admins,
    )

    company_admins = get_user_company_admins(user_id=user_id)
    return [company_admin.company_id for company_admin in company_admins]  # type: ignore[misc]

find_user_id_by_email

find_user_id_by_email(email)
Source code in components/be/public/user.py
def find_user_id_by_email(email: str) -> uuid.UUID | None:  # noqa: D103
    from components.be.internal.user.queries.user import (
        get_user_by_email,
    )

    user = get_user_by_email(email=email)
    return user.id if user else None
get_last_itsme_link_for_clinic(user_id)

Get the last itsme user link for clinic context

Source code in components/be/public/user.py
def get_last_itsme_link_for_clinic(user_id: uuid.UUID) -> BeItsmeUserLink | None:
    """Get the last itsme user link for clinic context"""
    return (
        current_session.query(BeItsmeUserLink)  # noqa: ALN085
        .filter(
            BeItsmeUserLink.alan_user_id == user_id,
            BeItsmeUserLink.context
            == ItsmeAuthenticationContext.clinic_id_authentication,
        )
        .order_by(BeItsmeUserLink.created_at.desc())
        .first()
    )
get_last_itsme_link_for_self_serve_signatory(user_id)

Get the last itsme user link for self serve signatory context

Source code in components/be/public/user.py
def get_last_itsme_link_for_self_serve_signatory(
    user_id: uuid.UUID,
) -> BeItsmeUserLink | None:
    """Get the last itsme user link for self serve signatory context"""
    return (
        current_session.query(BeItsmeUserLink)  # noqa: ALN085
        .filter(
            BeItsmeUserLink.alan_user_id == user_id,
            BeItsmeUserLink.context
            == ItsmeAuthenticationContext.self_serve_signatory_id_authentication,
        )
        .order_by(BeItsmeUserLink.created_at.desc())
        .first()
    )

get_user

get_user(user_id, options=None)
Source code in components/be/public/user.py
def get_user(user_id: uuid.UUID, options: list | None = None) -> BeUser:  # type: ignore[type-arg]  # noqa: D103
    return get_or_raise_missing_resource(
        BeUser,
        user_id,
        options=options,
    )

get_user_active_health_contract_and_plan_ids

get_user_active_health_contract_and_plan_ids(
    user_id, on_date
)

Retrieves the IDs of the health contract and plan that the user is currently enrolled in. Returns: A tuple of (health_contract_id, health_plan_id) if user has an active enrollment, None otherwise

Source code in components/be/public/user.py
def get_user_active_health_contract_and_plan_ids(
    user_id: uuid.UUID, on_date: date
) -> tuple[uuid.UUID, uuid.UUID] | None:
    """
    Retrieves the IDs of the health contract and plan that the user is currently enrolled in.
    Returns:
        A tuple of (health_contract_id, health_plan_id) if user has an active enrollment, None otherwise
    """
    return _get_user_active_health_contract_and_plan_ids(
        user_id=user_id, on_date=on_date
    )

get_user_capabilities

get_user_capabilities(user_id)
Source code in components/be/public/user.py
def get_user_capabilities(  # noqa: D103
    user_id: uuid.UUID,
) -> Capabilities:
    return get_capabilities(user_id=user_id)

get_user_email

get_user_email(user_id)
Source code in components/be/public/user.py
def get_user_email(user_id: uuid.UUID) -> str | None:  # noqa: D103
    user = get_or_raise_missing_resource(BeUser, user_id)
    return user.email

get_user_for_clinic

get_user_for_clinic(user_id)

Get user data for clinic

Parameters:

Name Type Description Default
user_id UUID

user id

required

Returns:

Type Description
BeUser

Belgium user data

Source code in components/be/public/user.py
def get_user_for_clinic(user_id: uuid.UUID) -> BeUser:
    """Get user data for clinic

    Args:
        user_id: user id

    Returns:
        Belgium user data
    """
    return get_user(
        user_id,
        options=[
            selectinload(BeUser.consent),
            selectinload(BeUser.be_profile),
            selectinload(BeUser.address),
        ],
    )

get_user_from_email

get_user_from_email(email)
Source code in components/be/public/user.py
def get_user_from_email(email: str) -> BeUser | None:  # noqa: D103
    from components.be.internal.user.queries.user import (
        get_user_by_email,
    )

    return get_user_by_email(email=email)

get_user_full_name

get_user_full_name(user_id)
Source code in components/be/public/user.py
def get_user_full_name(user_id: uuid.UUID) -> str:  # noqa: D103
    user = get_or_raise_missing_resource(BeUser, user_id)
    return user.full_name

get_visible_dependents

get_visible_dependents(user_id)
Source code in components/be/public/user.py
def get_visible_dependents(user_id: uuid.UUID):  # type: ignore[no-untyped-def]  # noqa: D103
    from components.be.internal.core_insurance.enrollment.queries.enrollments import (
        get_active_enrollments_visible_to_user,
    )

    return get_active_enrollments_visible_to_user(user_id=user_id)

is_user

is_user(user_id)
Source code in components/be/public/user.py
def is_user(user_id: uuid.UUID) -> bool:  # noqa: D103
    return get_resource_or_none(BeUser, user_id) is not None

update_user_phone

update_user_phone(profile_service, user_id, phone)
Source code in components/be/public/user.py
@inject_profile_service
def update_user_phone(  # noqa: D103
    profile_service: ProfileService, user_id: uuid.UUID, phone: str | None
) -> None:
    user = get_or_raise_missing_resource(BeUser, user_id)

    profile_service.change_phone_number(profile_id=user.profile_id, phone_number=phone)
    current_session.commit()

user_can_write

user_can_write(user_id, target_user_id)
Source code in components/be/public/user.py
def user_can_write(user_id: uuid.UUID, target_user_id: uuid.UUID) -> bool:  # noqa: D103
    from components.be.internal.affiliation.dependent.dependent import (
        dependent_logic,
    )

    if user_id == target_user_id:
        return True

    return dependent_logic.is_user_able_to_modify_user(
        current_user_id=user_id, other_user_id=target_user_id
    )

user_has_live_enrollment_since

user_has_live_enrollment_since(user_id, since)

Checks that a user has had a live enrollment in the time between the "since" date and now

Source code in components/be/public/user.py
def user_has_live_enrollment_since(user_id: uuid.UUID, since: date) -> bool:
    """Checks that a user has had a live enrollment in the time between the "since" date and now"""
    user_with_live_enrollment_count = (
        current_session.query(BeUser)  # noqa: ALN085
        .options(
            joinedload(BeUser.insurance_profile).joinedload(
                BeInsuranceProfile.enrollments
            )
        )
        .filter(
            BeUser.id == user_id, BeEnrollment.is_ever_active_between(since, utctoday())
        )
        .count()
    )

    return user_with_live_enrollment_count > 0