Skip to content

Reference

components.authentication.bootstrap

blueprint

get_all_blueprints

get_all_blueprints()
Source code in components/authentication/bootstrap/blueprint.py
def get_all_blueprints() -> list[BlueprintBootstrap]:
    """Return the blueprint registrations of the authentication component.

    The list holds a single entry: the auth API blueprint, registered under
    the ``/api/auth`` URL prefix. The blueprint is imported inside the
    function body rather than at module level.
    """
    # for now auth blueprints are still generated by shared/helpers/country_custom_flask.py
    from components.authentication.internal.controllers.blueprint import (
        auth_api_blueprint,
    )

    auth_api_registration = BlueprintBootstrap(
        blueprint=auth_api_blueprint,
        url_prefix="/api/auth",
    )
    return [auth_api_registration]

bootstrap

authentication_bootstrap module-attribute

# Bootstrap descriptor of the "authentication" component: it bundles the
# component's model loader, blueprints, command groups, schemas, Flask Admin
# configuration and event subscriptions into one ComponentBootstrap.
authentication_bootstrap = ComponentBootstrap(
    name="authentication",
    load_all_models=load_all_models,
    get_all_blueprints=get_all_blueprints,
    get_all_command_groups=get_all_command_groups,
    get_all_schemas=get_all_schemas,
    get_flask_admin_configuration=get_flask_admin_configuration,
    subscribe_to_events=subscribe_to_events,
)

command

get_all_command_groups

get_all_command_groups()
Source code in components/authentication/bootstrap/command.py
def get_all_command_groups() -> list[AppGroup]:
    """Import the authentication command modules and return the command group.

    Some command modules are imported unconditionally, others only when the
    current app is one of a given set. The returned list contains only
    ``authentication_commands``.
    """
    # presumably each command module registers its commands on
    # `authentication_commands` when imported — TODO confirm
    import components.authentication.internal.commands.compromised_accounts
    from components.authentication.internal.commands.app_group import (
        authentication_commands,
    )

    # commands above are not available in apps that do not load all models from all components
    # NOTE(review): "above" most likely refers to the app-specific imports
    # that follow this comment — confirm and reword.
    if get_current_app_name() in (AppName.ALAN_FR, AppName.ALAN_BE, AppName.ALAN_ES):
        import components.authentication.internal.commands.fix_email_consistency

    # Only imported for the FR app.
    if get_current_app_name() == AppName.ALAN_FR:
        import components.authentication.internal.commands.backfill_authentication_identity

    import components.authentication.internal.commands.fix_inconsistencies  # noqa: F401

    return [authentication_commands]

flask_admin_configuration

AUTHENTICATION_CATEGORY module-attribute

AUTHENTICATION_CATEGORY = 'Authentication'

get_flask_admin_configuration

get_flask_admin_configuration()
Source code in components/authentication/bootstrap/flask_admin_configuration.py
def get_flask_admin_configuration() -> AlanAdminConfiguration:
    """Build the Flask Admin configuration of the authentication component.

    Only ``AuthenticationIdentityModel`` is mounted, under the
    ``AUTHENTICATION_CATEGORY`` category. No read-only models and no enum
    specs are declared.
    """
    from components.authentication.internal.models.authentication_identity import (
        AuthenticationIdentityModel,
    )

    # Add your models here to make them appear in Flask Admin
    editable_models = OrderedDict()
    editable_models[AuthenticationIdentityModel] = {
        "category": AUTHENTICATION_CATEGORY,
    }

    # If you have read-only models, add them here
    read_only_models = OrderedDict()

    return AlanAdminConfiguration(
        mounted_models_specs=editable_models,
        ro_mounted_models_specs=read_only_models,
        # If some fields of your models are enums, you can specify them here to have a nice display (selectbox)
        enum_specs={},
    )

load_all_models

load_all_models

load_all_models(init_versioning_manager=False)
Source code in components/authentication/bootstrap/load_all_models.py
4
5
6
7
8
9
def load_all_models(init_versioning_manager: bool = False) -> list[type[DbModel]]:  # noqa: ARG001
    """Import and return the model classes of the authentication component.

    Args:
        init_versioning_manager: Accepted but not used by this loader.

    Returns:
        A list containing ``AuthenticationIdentityModel`` only.
    """
    from components.authentication.internal.models.authentication_identity import (
        AuthenticationIdentityModel,
    )

    authentication_models = [AuthenticationIdentityModel]
    return authentication_models

schemas

get_all_schemas

get_all_schemas()
Source code in components/authentication/bootstrap/schemas.py
4
5
6
7
8
9
def get_all_schemas() -> list[SchemaBootstrap]:
    """Return the database schemas declared by the authentication component.

    A single schema is declared, named ``AUTHENTICATION_SCHEMA_NAME`` and
    flagged as historized.
    """
    from components.authentication.internal.models.helpers import (
        AUTHENTICATION_SCHEMA_NAME,
    )

    authentication_schema = SchemaBootstrap(
        schema_name=AUTHENTICATION_SCHEMA_NAME,
        is_historized=True,
    )
    return [authentication_schema]

components.authentication.conftest

app

app(flask_app)
Source code in components/authentication/conftest.py
6
7
8
@pytest.fixture(scope="module")
def app(flask_app) -> "CustomFlask":
    """Module-scoped fixture that returns the ``flask_app`` fixture value unchanged."""
    return flask_app

flask_app

flask_app()
Source code in components/authentication/conftest.py
@pytest.fixture(scope="module")
def flask_app() -> "CustomFlask":
    """Module-scoped fixture yielding a test app with its application context active.

    The app comes from ``global_create_test_app``. Inside its app context the
    authentication and global-profile model loaders are called before the app
    is yielded, so the context stays open while the fixture is in use.

    NOTE(review): the function yields, so the return annotation describes the
    yielded value rather than the generator — confirm this is intentional.
    """
    from shared.helpers.testing.eu_testing import global_create_test_app

    test_app = global_create_test_app()
    with test_app.app_context():
        # Loaders are imported only once the app context is active.
        from components.authentication.bootstrap.load_all_models import (
            load_all_models as load_all_authentication_models,
        )
        from components.global_profile.bootstrap.load_all_models import (  # noqa: ALN039, ALN043
            load_all_models as load_all_profile_models,
        )

        load_all_authentication_models()
        load_all_profile_models(init_versioning_manager=False)

        yield test_app

components.authentication.internal

application

command_handlers

change_identity_first_name
change_identity_first_name(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def change_identity_first_name(
    command: ChangeIdentityFirstNameCommand, unit_of_work: UnitOfWork
) -> None:
    """Change the first name of an identity and record the change as an event.

    The requested name goes through ``validates_name`` first. That validated
    value is the one compared with the stored name, written to the identity
    and published in the event.

    When the identity cannot be found a warning is logged and nothing else
    happens. When the stored first name already equals the validated name the
    handler returns without touching the identity or emitting an event.

    Args:
        command: Carries ``identity_id`` and the requested ``first_name``.
        unit_of_work: Provides the identity provider and collects events.
    """
    sanitized_first_name = validates_name("first_name", command.first_name)

    identity = unit_of_work.identity_provider.get_identity(command.identity_id)
    if identity is None:
        current_logger.warning(
            f"Failed to set first name for identity {command.identity_id} to {sanitized_first_name}. Identity not found.",
        )
        return
    if identity.first_name == sanitized_first_name:
        return

    old_first_name = identity.first_name
    # Store the validated value (not the raw command value) so that what is
    # persisted matches both the no-op check above and the event below.
    identity.set_first_name(sanitized_first_name)

    unit_of_work.events.append(
        IdentityFirstNameChanged(
            identity_id=command.identity_id,
            new_first_name=sanitized_first_name,
            old_first_name=old_first_name,
        )
    )
change_identity_language
change_identity_language(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def change_identity_language(
    command: ChangeIdentityLanguageCommand, unit_of_work: UnitOfWork
) -> None:
    """Change the language of an identity and record the change as an event.

    A missing identity only produces a warning log. An identity that already
    has the requested language is left untouched and no event is emitted.

    Args:
        command: Carries ``identity_id`` and the requested ``language``.
        unit_of_work: Provides the identity provider and collects events.
    """
    target_identity = unit_of_work.identity_provider.get_identity(
        command.identity_id
    )
    if target_identity is None:
        current_logger.warning(
            f"Failed to set language for identity {command.identity_id} to {command.language}. Identity not found.",
        )
        return

    previous_language = target_identity.language
    if previous_language == command.language:
        # Nothing to change.
        return

    target_identity.set_language(command.language)

    language_changed = IdentityLanguageChanged(
        identity_id=command.identity_id,
        old_language=previous_language,
        new_language=command.language,
    )
    unit_of_work.events.append(language_changed)
change_identity_last_name
change_identity_last_name(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def change_identity_last_name(
    command: ChangeIdentityLastNameCommand, unit_of_work: UnitOfWork
) -> None:
    """Change the last name of an identity and record the change as an event.

    The requested name goes through ``validates_name`` first. That validated
    value is the one compared with the stored name, written to the identity
    and published in the event.

    When the identity cannot be found a warning is logged and nothing else
    happens. When the stored last name already equals the validated name the
    handler returns without touching the identity or emitting an event.

    Args:
        command: Carries ``identity_id`` and the requested ``last_name``.
        unit_of_work: Provides the identity provider and collects events.
    """
    sanitized_last_name = validates_name("last_name", command.last_name)

    identity = unit_of_work.identity_provider.get_identity(command.identity_id)
    if identity is None:
        current_logger.warning(
            f"Failed to set last name for identity {command.identity_id} to {sanitized_last_name}. Identity not found.",
        )
        return
    if identity.last_name == sanitized_last_name:
        return

    old_last_name = identity.last_name
    # Store the validated value (not the raw command value) so that what is
    # persisted matches both the no-op check above and the event below.
    identity.set_last_name(sanitized_last_name)

    unit_of_work.events.append(
        IdentityLastNameChanged(
            identity_id=command.identity_id,
            old_last_name=old_last_name,
            new_last_name=sanitized_last_name,
        )
    )
change_mfa_status
change_mfa_status(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def change_mfa_status(
    command: ChangeIdentityMfaStatusCommand, unit_of_work: UnitOfWork
) -> None:
    """Update the MFA flags of an identity.

    Each of ``mfa_enabled`` and ``mfa_required`` is applied only when the
    command provides a value other than ``None``.

    Raises:
        The error built by ``BaseErrorCode.missing_resource`` when no identity
        matches ``command.identity_id``.
    """
    target_identity = unit_of_work.identity_provider.get_identity(
        command.identity_id
    )
    if target_identity is None:
        raise BaseErrorCode.missing_resource(
            message=f"Identity with id {command.identity_id} not found"
        )

    enabled = command.mfa_enabled
    if enabled is not None:
        target_identity.set_mfa_enabled(enabled)

    required = command.mfa_required
    if required is not None:
        target_identity.set_mfa_required(required)
clear_identity_email
clear_identity_email(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def clear_identity_email(command: ClearIdentityEmail, unit_of_work: UnitOfWork) -> None:
    """Replace an identity's email with the invalidated one and unlink it.

    The identity's email is set to ``command.invalidated_email`` (flagged as
    verified). The authentication identity linked to it through its keycloak
    id, if any, is deleted. An ``IdentityEmailCleared`` event is then recorded.

    Raises:
        The error built by ``BaseErrorCode.missing_resource`` when no identity
        matches ``command.identity_id``.
    """
    target_identity = unit_of_work.identity_provider.get_identity(
        command.identity_id
    )
    if target_identity is None:
        raise BaseErrorCode.missing_resource(
            message=f"Identity with id {command.identity_id} not found"
        )

    target_identity.set_email(
        email=command.invalidated_email, is_email_verified=True
    )

    linked_authentication_identity = (
        unit_of_work.authentication_repository.get_by_keycloak_id(target_identity.id)
    )
    if linked_authentication_identity:
        unit_of_work.authentication_repository.delete(linked_authentication_identity)

    email_cleared = IdentityEmailCleared(
        identity_id=command.identity_id,
        invalidated_email=command.invalidated_email,
    )
    unit_of_work.events.append(email_cleared)
create_identity
create_identity(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def create_identity(
    command: IdentityCreationCommand, unit_of_work: SQLAlchemyUnitOfWork
) -> None:
    """Create an identity for an email, or reuse an existing unused one.

    The email and both names are validated first. If the identity provider
    already knows the email:

    * when an authentication identity is linked to it, the creation is refused;
    * otherwise the existing identity is recycled: its password is cleared and
      its names, language and MFA flags are overwritten.

    If the email is unknown, a new identity is created. When the command has
    a ``profile_id``, an ``AuthenticationIdentity`` linking the profile to the
    identity is saved. An ``IdentityCreatedEvent`` is recorded in every
    successful case, with ``is_email_verified=False``.

    Raises:
        ModelValidationError: In production mode, when the email matches
            ``generic_alan_email_re``.
        The error built by ``BaseErrorCode.resource_already_exists`` when the
        email belongs to an identity that is already linked.
    """
    sanitized_email = normalize_and_check_email_address_format("email", command.email)
    sanitized_first_name = validates_name("first_name", command.first_name)
    sanitized_last_name = validates_name("last_name", command.last_name)

    # Only raise in production - no need to control for other environments
    # (this way, no need to fix about 400 tests and will keep acceptance tests easy)
    restricted_email_in_production = (
        is_production_mode()
        and generic_alan_email_re.match(sanitized_email) is not None
    )
    if restricted_email_in_production:
        raise ModelValidationError("Forbidden usage of restricted email address")

    # We set the same value in both MFA properties as it does not make sense
    # to have different values here; a missing value means "off".
    mfa_flag = False if command.mfa_required is None else command.mfa_required

    identity = unit_of_work.identity_provider.find_identity(sanitized_email)
    if identity is None:
        identity = unit_of_work.identity_provider.create_new_identity(
            email=sanitized_email,
            first_name=sanitized_first_name,
            last_name=sanitized_last_name,
            language=command.language,
            mfa_enabled=mfa_flag,
            mfa_required=mfa_flag,
        )
    else:
        current_logger.warning(
            f"Identity provider {sanitized_email} is already registered"
        )
        already_linked = (
            unit_of_work.authentication_repository.get_by_keycloak_id(identity.id)
            is not None
        )
        if already_linked:
            raise BaseErrorCode.resource_already_exists(
                message=f"Identity with email {sanitized_email} already exists"
            )

        # The identity exists but is not used, we can clear the credentials and reuse it
        identity.clear_password()
        identity.set_first_and_last_names(
            first_name=sanitized_first_name, last_name=sanitized_last_name
        )
        identity.set_language(language=command.language)
        identity.set_mfa_enabled(mfa_flag)
        identity.set_mfa_required(mfa_flag)

    if command.profile_id is not None:
        profile_link = AuthenticationIdentity(
            profile_id=command.profile_id,
            keycloak_id=identity.id,
        )
        unit_of_work.authentication_repository.save(profile_link)

    identity_created = IdentityCreatedEvent(
        identity_id=identity.id,
        email=sanitized_email,
        first_name=sanitized_first_name,
        last_name=sanitized_last_name,
        language=command.language,
        profile_id=command.profile_id,
        is_email_verified=False,
    )
    unit_of_work.events.append(identity_created)
create_or_change_keycloak_id
create_or_change_keycloak_id(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def create_or_change_keycloak_id(
    command: UpdateKeycloakIdCommand, unit_of_work: UnitOfWork
) -> None:
    """Synchronise the keycloak id attached to a profile.

    Depending on what is stored for ``command.profile_id`` and on
    ``command.keycloak_id``:

    * nothing stored, no keycloak id given: nothing happens;
    * nothing stored, keycloak id given: a new link is created and saved;
    * link stored, no keycloak id given: the link is deleted;
    * link stored with a different keycloak id: the link is updated and saved;
    * link stored with the same keycloak id: nothing happens.
    """
    repository = unit_of_work.authentication_repository
    existing_link = repository.get_by_profile_id(command.profile_id)
    requested_keycloak_id = command.keycloak_id

    if existing_link is None:
        if requested_keycloak_id is not None:
            # create
            new_link = AuthenticationIdentity(
                profile_id=command.profile_id,
                keycloak_id=requested_keycloak_id,
            )
            repository.save(new_link)
        return

    if requested_keycloak_id is None:
        # dissociate
        repository.delete(existing_link)
        return

    if existing_link.keycloak_id != requested_keycloak_id:
        # update
        existing_link.keycloak_id = requested_keycloak_id
        repository.save(existing_link)
delete_identity
delete_identity(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def delete_identity(command: DeleteIdentityCommand, unit_of_work: UnitOfWork) -> None:
    """Delete an identity together with its linked authentication identity.

    The authentication identity is removed from the repository first, then
    ``delete()`` is called on the identity itself.

    Raises:
        The error built by ``BaseErrorCode.missing_resource`` when either the
        identity or its linked authentication identity cannot be found.
    """
    target_identity = unit_of_work.identity_provider.get_identity(
        command.identity_id
    )
    if target_identity is None:
        raise BaseErrorCode.missing_resource(
            message=f"Identity with id {command.identity_id} not found"
        )

    repository = unit_of_work.authentication_repository
    linked_authentication_identity = repository.get_by_keycloak_id(target_identity.id)
    if linked_authentication_identity is None:
        raise BaseErrorCode.missing_resource(
            message=f"Authentication identity with keycloak id {target_identity.id} not found"
        )

    repository.delete(linked_authentication_identity)
    # TODO: @thibaut.caillierez: make the wizardry happening here more explicit
    target_identity.delete()
delete_identity_credentials
delete_identity_credentials(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def delete_identity_credentials(
    command: DeleteIdentityCredentialsCommand, unit_of_work: UnitOfWork
) -> None:
    """Clear the password of an identity; silently do nothing if it is not found."""
    target_identity = unit_of_work.identity_provider.get_identity(
        command.identity_id
    )
    if not target_identity:
        return
    target_identity.clear_password()
log_out_identity_from_all_sessions
log_out_identity_from_all_sessions(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def log_out_identity_from_all_sessions(
    command: LogOutIdentityFromAllSessionsCommand, unit_of_work: UnitOfWork
) -> None:
    """Call ``logout_all_sessions()`` on the identity designated by the command.

    Raises:
        The error built by ``BaseErrorCode.missing_resource`` when no identity
        matches ``command.identity_id``.
    """
    target_identity = unit_of_work.identity_provider.get_identity(
        command.identity_id
    )
    if target_identity is not None:
        target_identity.logout_all_sessions()
        return

    raise BaseErrorCode.missing_resource(
        message=f"Identity with id {command.identity_id} not found"
    )
request_change_identity_email
request_change_identity_email(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def request_change_identity_email(
    command: RequestChangeIdentityEmailCommand, unit_of_work: SQLAlchemyUnitOfWork
) -> None:
    """Request an email change for an identity by emitting the matching event.

    Nothing happens when the identity cannot be found, or when its current
    email already equals the requested one (either as given in the command or
    after normalisation).

    Otherwise one event is recorded:

    * ``MergeIdentityEvent`` (with password reset) when another identity
      already owns the new email;
    * ``ChangeIdentityEmailApprovedEvent`` in all other cases.

    Raises:
        ModelValidationError: In production mode, when the new email matches
            ``generic_alan_email_re``.
    """
    sanitized_email = normalize_and_check_email_address_format("email", command.email)

    identity = unit_of_work.identity_provider.get_identity(command.identity_id)
    if identity is None:
        return
    # Also compare with the normalised email: a request that differs from the
    # stored email only by normalisation would otherwise emit an approval
    # event whose old and new emails are identical.
    if identity.email == command.email or identity.email == sanitized_email:
        return

    if (
        is_production_mode()
        and generic_alan_email_re.match(sanitized_email) is not None
    ):
        # Only raise in production - no need to control for other environments
        # (this way, no need to fix about 400 tests and will keep acceptance tests easy)
        raise ModelValidationError("Forbidden usage of restricted email address")

    # Check if we already have a known identity for the new email. This can happen if the user is already
    # insured in another country.
    identity_with_same_email = unit_of_work.identity_provider.find_identity(
        sanitized_email
    )

    event: DomainEvent
    if (
        identity_with_same_email is not None
        and identity_with_same_email.id != command.identity_id
    ):
        event = MergeIdentityEvent(
            from_identity_id=command.identity_id,
            to_identity_id=identity_with_same_email.id,
            with_password_reset=True,
        )
    else:
        event = ChangeIdentityEmailApprovedEvent(
            identity_id=command.identity_id,
            old_email=identity.email,
            new_email=sanitized_email,
        )

    unit_of_work.events.append(event)
request_set_identity_credentials
request_set_identity_credentials(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def request_set_identity_credentials(
    command: ChangeIdentityCredentialsCommand, unit_of_work: UnitOfWork
) -> None:
    """Set the email and password of an identity, merging on email conflict.

    When the email is free, or already belongs to this same identity, the
    email (stored as verified) and the password are set directly, along with
    both MFA flags if ``command.mfa_required`` is provided.

    When the email belongs to a different identity, the given password is
    checked against that identity. A match records a ``MergeIdentityEvent``
    (without password reset); a mismatch is an error.

    Note: ``command.is_email_verified`` is not read by this handler.

    Raises:
        The error built by ``BaseErrorCode.missing_resource`` when no identity
        matches ``command.identity_id``.
        The error built by ``BaseErrorCode.login_error`` when the email is
        owned by another identity and the password does not match it.
    """
    provider = unit_of_work.identity_provider

    identity = provider.get_identity(command.identity_id)
    if not identity:
        raise BaseErrorCode.missing_resource(
            message=f"Identity with id {command.identity_id} not found"
        )

    sanitized_email = normalize_and_check_email_address_format("email", command.email)
    email_owner = provider.find_identity(sanitized_email)

    no_conflict = not email_owner or email_owner.id == command.identity_id
    if no_conflict:
        # Ideal case, there is no conflict, or we are dealing with the same identity
        identity.set_email(email=sanitized_email, is_email_verified=True)
        identity.set_password(prehashed_password=command.prehashed_password)

        if command.mfa_required is not None:
            # We set the same value in both properties as it does not make sense to have different values here
            identity.set_mfa_enabled(enabled=command.mfa_required)
            identity.set_mfa_required(required=command.mfa_required)
        return

    if not is_test_mode():
        current_logger.info(
            f"Trying to update identity {command.identity_id} to existing identity {email_owner.id} for email {sanitized_email}."
        )

    password_matches_owner = email_owner.check_password(
        prehashed_password=command.prehashed_password
    )
    if not password_matches_owner:
        raise BaseErrorCode.login_error(
            attribute="password",
            desc="Existing login credentials found: password is incorrect",
        )

    # There is already an identity using this email but the entered credentials match this identity so we can merge the two
    merge_requested = MergeIdentityEvent(
        from_identity_id=command.identity_id,
        to_identity_id=email_owner.id,
        with_password_reset=False,
    )
    unit_of_work.events.append(merge_requested)
send_password_reset_email
send_password_reset_email(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def send_password_reset_email(
    command: SendPasswordResetEmailCommand, unit_of_work: UnitOfWork
) -> None:
    """Ask the identity provider to generate a password reset email.

    A ``PasswordResetEmailSent`` event is recorded only when the provider
    returns a truthy identity id.
    """
    identity_id = unit_of_work.identity_provider.generate_password_reset_email(
        command.email, command.client_id, command.redirect_uri
    )
    if not identity_id:
        return

    reset_email_sent = PasswordResetEmailSent(
        identity_id=identity_id,
        email=command.email,
        client_id=command.client_id,
        redirect_uri=command.redirect_uri,
    )
    unit_of_work.events.append(reset_email_sent)
send_verification_email
send_verification_email(command, unit_of_work)
Source code in components/authentication/internal/application/command_handlers.py
def send_verification_email(
    command: SendVerificationEmailCommand, unit_of_work: UnitOfWork
) -> None:
    """Ask the identity provider to generate a verification email.

    The identity is looked up by ``command.identity_id`` when provided, and by
    ``command.email`` otherwise.

    Raises:
        The error built by ``BaseErrorCode.invalid_arguments`` when the command
        has neither an identity id nor an email.
        The error built by ``BaseErrorCode.missing_resource`` when no identity
        can be found.
    """
    provider = unit_of_work.identity_provider
    requested_id = command.identity_id

    if not requested_id and not command.email:
        raise BaseErrorCode.invalid_arguments(
            message="No identity_id or email provided"
        )

    identity: AuthIdentity | None = None
    if requested_id:
        identity = provider.get_identity(requested_id)
        if identity is None:
            raise BaseErrorCode.missing_resource(
                message=f"Identity with id {command.identity_id} not found"
            )

    # Fall back to the email lookup when the id lookup did not yield an identity.
    if not (requested_id and identity):
        identity = provider.find_identity(command.email)

    if not identity:
        raise BaseErrorCode.missing_resource(
            message=f"Identity with {' or '.join([f'identity id {command.identity_id}', f'email {command.email}'])} not found"
        )

    provider.generate_verification_email(
        identity_id=identity.id,
        client_id=command.client_id,
        redirect_uri=command.redirect_uri,
    )

commands

ChangeIdentityCredentialsCommand dataclass
ChangeIdentityCredentialsCommand(
    identity_id,
    email,
    is_email_verified,
    prehashed_password,
    mfa_required,
)

Bases: Command

email instance-attribute
email
identity_id instance-attribute
identity_id
is_email_verified instance-attribute
is_email_verified
mfa_required instance-attribute
mfa_required
prehashed_password instance-attribute
prehashed_password
ChangeIdentityFirstNameCommand dataclass
ChangeIdentityFirstNameCommand(identity_id, first_name)

Bases: Command

first_name instance-attribute
first_name
identity_id instance-attribute
identity_id
ChangeIdentityLanguageCommand dataclass
ChangeIdentityLanguageCommand(identity_id, language)

Bases: Command

identity_id instance-attribute
identity_id
language instance-attribute
language
ChangeIdentityLastNameCommand dataclass
ChangeIdentityLastNameCommand(identity_id, last_name)

Bases: Command

identity_id instance-attribute
identity_id
last_name instance-attribute
last_name
ChangeIdentityMfaStatusCommand dataclass
ChangeIdentityMfaStatusCommand(
    identity_id, mfa_enabled, mfa_required
)

Bases: Command

identity_id instance-attribute
identity_id
mfa_enabled instance-attribute
mfa_enabled
mfa_required instance-attribute
mfa_required
ClearIdentityEmail dataclass
ClearIdentityEmail(identity_id, invalidated_email)

Bases: Command

identity_id instance-attribute
identity_id
invalidated_email instance-attribute
invalidated_email
Command dataclass
Command()

Bases: ABC

DeleteIdentityCommand dataclass
DeleteIdentityCommand(identity_id)

Bases: Command

identity_id instance-attribute
identity_id
DeleteIdentityCredentialsCommand dataclass
DeleteIdentityCredentialsCommand(identity_id)

Bases: Command

identity_id instance-attribute
identity_id
IdentityCreationCommand dataclass
IdentityCreationCommand(
    profile_id,
    email,
    first_name,
    last_name,
    language,
    mfa_required,
)

Bases: Command

email instance-attribute
email
first_name instance-attribute
first_name
language instance-attribute
language
last_name instance-attribute
last_name
mfa_required instance-attribute
mfa_required
profile_id instance-attribute
profile_id
LogOutIdentityFromAllSessionsCommand dataclass
LogOutIdentityFromAllSessionsCommand(identity_id)

Bases: Command

identity_id instance-attribute
identity_id
RequestChangeIdentityEmailCommand dataclass
RequestChangeIdentityEmailCommand(identity_id, email)

Bases: Command

email instance-attribute
email
identity_id instance-attribute
identity_id
SendPasswordResetEmailCommand dataclass
SendPasswordResetEmailCommand(
    identity_id, email, client_id, redirect_uri
)

Bases: Command

client_id instance-attribute
client_id
email instance-attribute
email
identity_id instance-attribute
identity_id
redirect_uri instance-attribute
redirect_uri
SendVerificationEmailCommand dataclass
SendVerificationEmailCommand(
    identity_id, email, client_id, redirect_uri
)

Bases: Command

client_id instance-attribute
client_id
email instance-attribute
email
identity_id instance-attribute
identity_id
redirect_uri instance-attribute
redirect_uri
UpdateKeycloakIdCommand dataclass
UpdateKeycloakIdCommand(profile_id, keycloak_id)

Bases: Command

keycloak_id instance-attribute
keycloak_id
profile_id instance-attribute
profile_id

event_handlers

change_identity_email
change_identity_email(event, unit_of_work)
Source code in components/authentication/internal/application/event_handlers.py
def change_identity_email(
    event: ChangeIdentityEmailApprovedEvent, unit_of_work: UnitOfWork
) -> None:
    """Apply an approved email change and record ``IdentityEmailChangedEvent``.

    The new email is normalised and must be present (``mandatory``). It is
    stored on the identity flagged as verified.

    Raises:
        The error built by ``BaseErrorCode.missing_resource`` when the identity
        cannot be found, or when it already has ``event.new_email``. The two
        situations carry different messages.
    """
    sanitized_email = mandatory(
        normalize_and_check_email_address_format("email", event.new_email)
    )

    identity = unit_of_work.identity_provider.get_identity(event.identity_id)
    if identity is None:
        raise BaseErrorCode.missing_resource(
            message=f"Identity {event.identity_id} not found"
        )
    if identity.email == event.new_email:
        # Same error kind as the "not found" case so callers are unaffected,
        # but the message no longer claims that an existing identity is missing.
        raise BaseErrorCode.missing_resource(
            message=f"Identity {event.identity_id} already uses the requested email"
        )

    identity.set_email(sanitized_email, is_email_verified=True)

    unit_of_work.events.append(
        IdentityEmailChangedEvent(
            identity_id=identity.id,
            old_email=event.old_email,
            new_email=sanitized_email,
        )
    )
merge_identities
merge_identities(event, unit_of_work)
Source code in components/authentication/internal/application/event_handlers.py
def merge_identities(event: MergeIdentityEvent, unit_of_work: UnitOfWork) -> None:
    """Merge one identity into another and record ``IdentityMergedEvent``.

    The source identity is flagged as pending deletion. If an authentication
    identity is linked to it, that link is re-pointed to the target identity.
    When the event asks for a password reset, the target's password is cleared.

    Raises:
        The error built by ``BaseErrorCode.missing_resource`` when either the
        source or the target identity cannot be found.
    """
    provider = unit_of_work.identity_provider
    # Both identities are fetched before either is checked.
    source_identity = provider.get_identity(event.from_identity_id)
    target_identity = provider.get_identity(event.to_identity_id)

    if source_identity is None:
        raise BaseErrorCode.missing_resource(
            message=f"Identity {event.from_identity_id} not found, cannot merge it into {event.to_identity_id}"
        )
    if target_identity is None:
        raise BaseErrorCode.missing_resource(
            message=f"Identity {event.to_identity_id} not found, cannot merge {event.from_identity_id} into it"
        )

    repository = unit_of_work.authentication_repository
    source_link = repository.get_by_keycloak_id(source_identity.id)

    source_identity.update_pending_deletion(True)
    if source_link:
        source_link.keycloak_id = target_identity.id
        repository.save(source_link)

    if event.with_password_reset:
        target_identity.clear_password()

    identities_merged = IdentityMergedEvent(
        from_identity_id=source_identity.id,
        to_identity_id=target_identity.id,
        with_password_reset=event.with_password_reset,
    )
    unit_of_work.events.append(identities_merged)

message_bus

Message module-attribute
Message = DomainEvent | Command
MessageBus
MessageBus(unit_of_work, event_handlers, command_handlers)

MessageBus that dispatches messages to their respective handlers.

Note: this is a glorified for loop.

The order in which handlers will be called is NOT guaranteed.

Message inheritance isn't supported at the moment and handlers need to be explicitly attached to their exact types.

Taking a fictional example:

    class CompanyUpdated(DomainEvent): ...
    class VatNumberUpdated(CompanyUpdated): ...
    class SiretNumberUpdated(CompanyUpdated): ...

    event_handlers = {
        CompanyUpdated: [print]
    }

If VatNumberUpdated is triggered, print will NOT be called.

Source code in components/authentication/internal/application/message_bus.py
def __init__(
    self,
    unit_of_work: UnitOfWork,
    event_handlers: dict[type[DomainEvent], list[Callable[[DomainEvent], None]]],
    command_handlers: dict[type[Command], list[Callable[[Command], None]]],
):
    """
    Build a bus from per-type handler registries.

    The order in which handlers will be called is NOT guaranteed: each
    handler list is copied in random order.

    Message inheritance isn't supported at the moment and handlers need to be
    explicitly attached to their exact types.

    Taking a fictional example:

    class CompanyUpdated(DomainEvent): ...
    class VatNumberUpdated(CompanyUpdated): ...
    class SiretNumberUpdated(CompanyUpdated): ...

    event_handlers = {
        CompanyUpdated: [print]
    }

    If VatNumberUpdated is triggered, print will NOT be called.
    """
    registered_event_types = frozenset(event_handlers)
    registered_command_types = frozenset(command_handlers)
    self._event_handlers_types = registered_event_types
    self._command_handlers_types = registered_command_types
    MessageBus._validate_no_subtypes(registered_event_types)  # noqa: ALN027
    MessageBus._validate_no_subtypes(registered_command_types)  # noqa: ALN027

    self.unit_of_work = unit_of_work

    def _in_random_order(handlers):
        # shuffle handlers to avoid code that relies on events being processed in a deterministic order
        # uses random.sample instead of random.shuffle, as sample returns a new collection
        return random.sample(handlers, len(handlers))

    self.event_handlers = {
        event_type: _in_random_order(handlers)
        for event_type, handlers in event_handlers.items()
    }
    self.command_handlers = {
        command_type: _in_random_order(handlers)
        for command_type, handlers in command_handlers.items()
    }
    self.processed: list[Message] = []
collect_processed
collect_processed()
Source code in components/authentication/internal/application/message_bus.py
def collect_processed(self) -> list[Message]:
    """Return the messages processed so far and empty the internal list.

    The returned list is a copy; ``self.processed`` is cleared in place.
    """
    drained = list(self.processed)
    self.processed.clear()
    return drained
command_handlers instance-attribute
command_handlers = {
    type_: random.sample(handlers, len(handlers))
    for type_, handlers in command_handlers.items()
}
event_handlers instance-attribute
event_handlers = {
    type_: random.sample(handlers, len(handlers))
    for type_, handlers in event_handlers.items()
}
handle
handle(message)
Source code in components/authentication/internal/application/message_bus.py
def handle(self, message: Message) -> None:
    """Dispatch a message, then every message flushed from the unit of work.

    Messages are processed first-in first-out. After each message has been
    handled it is appended to ``self.processed`` and the events flushed from
    the unit of work are queued behind the pending ones.

    Handlers are looked up by the exact type of the message; a message with
    no registered handler is still recorded as processed.
    """
    pending = deque([message])
    while pending:
        current = pending.popleft()
        current_type = type(current)

        if isinstance(current, DomainEvent):
            MessageBus._validate_is_not_subtype(  # noqa: ALN027
                current_type,
                self._event_handlers_types,
            )
            for handle_event in self.event_handlers.get(current_type, []):
                handle_event(current)
        elif isinstance(current, Command):
            MessageBus._validate_is_not_subtype(  # noqa: ALN027
                current_type,
                self._command_handlers_types,
            )
            for handle_command in self.command_handlers.get(current_type, []):
                handle_command(current)
        else:
            # NOTE(review): called without an argument; if this is
            # `typing.assert_never`, it expects the unhandled value — confirm.
            assert_never()

        self.processed.append(current)
        pending.extend(self.unit_of_work.flush_events())
processed instance-attribute
processed = []
unit_of_work instance-attribute
unit_of_work = unit_of_work
T module-attribute
T = TypeVar('T', bound=Message)

subscribers

update_keycloak_identity_information
update_keycloak_identity_information(event)

Update a keycloak identity first and last name when changed in the global profile.

Source code in components/authentication/internal/application/subscribers.py
def update_keycloak_identity_information(event: IdentityInformationChanged) -> None:
    """
    Propagate a first/last name change from the global profile to Keycloak.

    Nothing happens (and nothing is committed) when the profile has no
    Keycloak identity. Otherwise each name that differs from the event is
    updated through the authentication service and the current session is
    committed.
    """
    from shared.helpers.db import current_session
    from shared.helpers.logging.logger import current_logger

    service = AuthenticationService.create()
    identity = service.get_keycloak_identity_by_profile_id(event.profile_id)
    if identity is None:
        return

    # First and last name are compared and updated independently.
    if identity.first_name != event.first_name:
        current_logger.info(
            f"Updating Keycloak identity first name keycloak id {identity.id}"
        )
        service.change_identity_first_name(identity.id, first_name=event.first_name)

    if identity.last_name != event.last_name:
        current_logger.info(
            f"Updating Keycloak identity last name keycloak id {identity.id}"
        )
        service.change_identity_last_name(identity.id, last_name=event.last_name)

    # The commit runs even when neither name needed an update.
    current_session.commit()
update_keycloak_identity_language
update_keycloak_identity_language(event)

Update a keycloak identity preferred language when changed in the global profile.

Source code in components/authentication/internal/application/subscribers.py
def update_keycloak_identity_language(event: PreferredLanguageChanged) -> None:
    """
    Propagate a preferred-language change from the global profile to Keycloak.

    Nothing happens (and nothing is committed) when the profile has no
    Keycloak identity. Otherwise the identity language is updated if it
    differs from the mapped event language, and the current session is
    committed.
    """

    from shared.helpers.db import current_session
    from shared.helpers.logging.logger import current_logger

    service = AuthenticationService.create()
    identity = service.get_keycloak_identity_by_profile_id(event.profile_id)
    if identity is None:
        return

    # Map once and reuse for both the comparison and the update
    # (assumes LanguageMapper.to_model is a pure conversion).
    target_language = LanguageMapper.to_model(event.preferred_language)
    if identity.language != target_language:
        current_logger.info(
            f"Updating Keycloak identity language keycloak id {identity.id}"
        )
        service.change_identity_language(identity.id, language=target_language)

    # The commit runs even when the language was already up to date.
    current_session.commit()

business_logic

actions

lockdown_compromised_account
CompromisedAccountStatus dataclass
CompromisedAccountStatus(
    keycloak_id,
    app_name=optional_to_empty_str_field(),
    user_id=optional_to_empty_str_field(),
    credentials_reset=False,
    email_sent=False,
    email_changed=False,
    old_email=optional_to_empty_str_field(),
    latest_fraud_change=optional_to_empty_str_field(),
)

Bases: DataClassJsonMixin

Dataclass tracking all operations done for a compromised account.

app_name class-attribute instance-attribute
app_name = optional_to_empty_str_field()
credentials_reset class-attribute instance-attribute
credentials_reset = False
email_changed class-attribute instance-attribute
email_changed = False
email_sent class-attribute instance-attribute
email_sent = False
get_headers classmethod
get_headers()
Source code in components/authentication/internal/business_logic/actions/lockdown_compromised_account.py
@classmethod
def get_headers(cls) -> list[str]:
    """Return the dataclass field names in declaration order, then ``marmot_url``."""
    column_names = [spec.name for spec in fields(cls)]
    # marmot_url is not a dataclass field, so it is appended explicitly.
    column_names.append("marmot_url")
    return column_names
keycloak_id class-attribute instance-attribute
keycloak_id = field(
    metadata=config(encoder=str, decoder=UUID)
)
latest_fraud_change class-attribute instance-attribute
latest_fraud_change = optional_to_empty_str_field()
marmot_url property
marmot_url

Compute Marmot URL from app_name and user_id

old_email class-attribute instance-attribute
old_email = optional_to_empty_str_field()
to_row
to_row()

Convert to spreadsheet row with proper encoding, preserving field order

Source code in components/authentication/internal/business_logic/actions/lockdown_compromised_account.py
def to_row(self) -> list[Any]:
    """Build a spreadsheet row whose cells follow the order of ``get_headers()``.

    Values come from ``to_dict()`` plus the ``marmot_url`` property; a header
    with no matching value yields an empty string.
    """
    column_names = self.get_headers()
    values_by_column = self.to_dict()
    # marmot_url is a property, so it has to be added to the dict by hand.
    values_by_column["marmot_url"] = self.marmot_url
    return [values_by_column.get(name, "") for name in column_names]
user_id class-attribute instance-attribute
user_id = optional_to_empty_str_field()
NotificationStrategy

Bases: AlanBaseEnum

Strategy for sending compromise notifications.

force class-attribute instance-attribute
force = 'force'
no class-attribute instance-attribute
no = 'no'
yes class-attribute instance-attribute
yes = 'yes'
build_marmot_url
build_marmot_url(app_name, user_id)
Source code in components/authentication/internal/business_logic/actions/lockdown_compromised_account.py
def build_marmot_url(app_name: AppName | None, user_id: str | None) -> str:
    """Return the Marmot user page URL for ``user_id`` in the given app.

    An empty string is returned when either argument is ``None`` or when the
    app is not one of ALAN_FR, ALAN_ES or ALAN_BE.
    """
    if app_name is None or user_id is None:
        return ""

    # Each supported app has its own host/path layout.
    if app_name == AppName.ALAN_FR:
        return f"https://alan.com/marmot/fr/user/{user_id}"
    if app_name == AppName.ALAN_ES:
        return f"https://es.alan.com/marmot/user/{user_id}"
    if app_name == AppName.ALAN_BE:
        return f"https://be.alan.com/marmot/user/{user_id}"
    return ""
lockdown_compromised_accounts
lockdown_compromised_accounts(
    compromised_accounts,
    notify_users,
    check_changes_since_days=7,
    dry_run=True,
)
Source code in components/authentication/internal/business_logic/actions/lockdown_compromised_account.py
def lockdown_compromised_accounts(
    compromised_accounts: list[CompromisedAccountStatus],
    notify_users: NotificationStrategy,
    check_changes_since_days: int = 7,
    dry_run: bool = True,
) -> list[CompromisedAccountStatus]:
    """
    Lock down the given accounts and annotate them with recent changes.

    Three phases run in order:
    1. flag accounts whose email changed within ``check_changes_since_days``;
    2. reset credentials and (depending on ``notify_users``) send the email;
    3. attach the latest fraud-relevant change found for FR users.

    Args:
        compromised_accounts: Accounts to process; they are updated in place
            during phases 1 and 3.
        notify_users: Notification strategy forwarded to the reset step.
        check_changes_since_days: Look-back window, in days, used for both
            the email-change and the fraud-change lookups.
        dry_run: Forwarded to the reset step.

    Returns:
        The list of account statuses produced by the reset step, enriched
        with fraud-change information.
    """
    # Phase 1: Check if any recent email change happened
    email_changes = get_email_changes_by_keycloak_ids(
        [account.keycloak_id for account in compromised_accounts],
        days_since=check_changes_since_days,
    )
    for account in compromised_accounts:
        account.email_changed = account.keycloak_id in email_changes
        account.old_email = email_changes.get(account.keycloak_id, {}).get("old_email")
    current_logger.info(
        f"Found {len(email_changes)} compromised accounts with email changes in the past {check_changes_since_days} days"
    )

    # Phase 2: Reset credentials & send email
    compromised_accounts = [
        _reset_credentials_and_send_email(
            compromised_account,
            notify_users=notify_users,
            dry_run=dry_run,
        )
        for compromised_account in compromised_accounts
    ]

    # Phase 3: For FR only users, get additional fraud changes
    fraud_changes = get_fr_users_fraud_relevant_changes(
        [
            account.user_id
            for account in compromised_accounts
            if account.user_id is not None and account.app_name == AppName.ALAN_FR
        ],
        # Use the caller's window; without it the lookup silently falls back
        # to its 7-day default while the log below reports the caller's value.
        days_since=check_changes_since_days,
    )
    for account in compromised_accounts:
        if account.user_id is not None:
            account.latest_fraud_change = fraud_changes.get(account.user_id)
    current_logger.info(
        f"Found {len(fraud_changes)} compromised accounts with fraud changes in the past {check_changes_since_days} days"
    )

    return compromised_accounts
optional_to_empty_str_field
optional_to_empty_str_field()

Helper for optional string fields that converts None ↔ empty string

Source code in components/authentication/internal/business_logic/actions/lockdown_compromised_account.py
def optional_to_empty_str_field() -> Any:
    """Build a dataclass field defaulting to ``None`` that encodes ``None`` as ``""``.

    The decoder performs the reverse conversion: an empty string becomes
    ``None``; any other value passes through unchanged in both directions.
    """

    def _none_to_empty(value: Any) -> Any:
        return "" if value is None else value

    def _empty_to_none(value: Any) -> Any:
        return None if value == "" else value

    codec = config(encoder=_none_to_empty, decoder=_empty_to_none)
    return field(default=None, metadata=codec)
reset_user_credentials
reset_user_credentials
reset_user_credentials(keycloak_id, dry_run=True)

Reset user credentials by removing password and revoking all sessions.

Parameters:

Name Type Description Default
keycloak_id UUID

The Keycloak ID of the user

required
dry_run bool

Whether to perform a dry run (default True)

True

Returns:

Type Description
UUID | None

User ID when both the user and its identity are found (including on a dry run), None otherwise

Source code in components/authentication/internal/business_logic/actions/reset_user_credentials.py
def reset_user_credentials(keycloak_id: UUID, dry_run: bool = True) -> UUID | None:
    """
    Remove a user's credentials and revoke all of their tokens and sessions.

    Args:
        keycloak_id: The Keycloak ID of the user
        dry_run: When True (the default), only log what would be done

    Returns:
        The user ID when both the user and its Keycloak identity are found
        (this includes dry runs), None when either of them is missing
    """
    service = AuthenticationService.create()

    # Both lookups run before any check, so a missing user is reported first.
    account_user = get_user_with_keycloak_id(keycloak_id)
    keycloak_identity = service.get_keycloak_identity(keycloak_id=keycloak_id)

    if not account_user:
        current_logger.info(f"No user found with keycloak_id: {keycloak_id}")
        return None
    if not keycloak_identity:
        current_logger.warning(
            f"No identity found for user {account_user.id} with keycloak_id {keycloak_id}"
        )
        return None

    current_logger.info(f"Found user {account_user.id} with keycloak_id {keycloak_id}")

    if dry_run:
        # Nothing is modified on a dry run, but the user id is still returned.
        current_logger.info(
            f"DRY RUN: Would remove password and revoke all tokens for user {account_user.id}"
        )
        return account_user.id

    service.delete_identity_credentials(identity_id=keycloak_identity.id)
    account_user.revoke_all_tokens()
    # Clear sessions in Keycloak
    service.logout_identity_from_all_sessions(identity_id=keycloak_identity.id)

    current_logger.info(
        f"Successfully reset credentials for user {account_user.id} with keycloak_id {keycloak_id}"
    )

    return account_user.id

queries

get_user_changes
get_email_changes_by_keycloak_ids
get_email_changes_by_keycloak_ids(
    keycloak_ids, days_since=7
)

Get email changes for users identified by their Keycloak IDs.

Flow: keycloak_id → authentication_identity → profile_id → Activity

Source code in components/authentication/internal/business_logic/queries/get_user_changes.py
def get_email_changes_by_keycloak_ids(
    keycloak_ids: list[UUID], days_since: int = 7
) -> dict[UUID, dict[str, Any]]:
    """
    Get email changes for users identified by their Keycloak IDs.

    Flow: keycloak_id → authentication_identity → profile_id → Activity

    Args:
        keycloak_ids: Keycloak IDs to look up. An empty list returns an
            empty dict without querying the database.
        days_since: Only activity issued within this many days is considered.

    Returns:
        A mapping from keycloak_id to the most recent matching email-change
        row (the query keeps one row per keycloak_id, newest first).
    """
    # An empty tuple would be rendered as `IN ()`, which is invalid SQL, so
    # short-circuit instead of sending the query.
    if not keycloak_ids:
        return {}

    query = text("""
         SELECT DISTINCT ON (ai.keycloak_id)
             ai.keycloak_id,
             p.id AS profile_id,
             p.email AS current_email,
             a.id AS activity_id,
             a.old_data->>'email' AS old_email,
             a.changed_data->>'email' AS new_email,
             a.issued_at AS changed_at,
             t.actor_id
         FROM authentication.authentication_identity ai
             INNER JOIN global_profile.profile p
         ON ai.profile_id = p.id
             INNER JOIN activity a
             ON (a.old_data->>'id')::uuid = p.id
             LEFT JOIN transaction t
             ON a.transaction_id = t.id
         WHERE
             ai.keycloak_id IN :keycloak_ids
           AND a.schema_name = 'global_profile'
           AND a.table_name = 'profile'
           AND a.verb = 'update'
           AND a.changed_data ? 'email'
           AND a.issued_at >= :start_date
         ORDER BY
             ai.keycloak_id,
             a.issued_at DESC
                 """)

    result = current_session.execute(
        query,
        {
            "keycloak_ids": tuple(keycloak_ids),
            "start_date": datetime.now() - timedelta(days=days_since),
        },
    ).mappings()

    email_changes = cast("list[dict[str, Any]]", result.fetchall())
    return {change["keycloak_id"]: change for change in email_changes}
get_fr_users_fraud_relevant_changes
get_fr_users_fraud_relevant_changes(user_ids, days_since=7)
Source code in components/authentication/internal/business_logic/queries/get_user_changes.py
def get_fr_users_fraud_relevant_changes(
    user_ids: list[str], days_since: int = 7
) -> dict[str, str]:
    """Return a stringified summary of the fraud-relevant change for each user.

    Only runs in the ALAN_FR app (an empty dict is returned elsewhere) and
    only looks at ``SettlementIbanUpdated`` changes from the last
    ``days_since`` days. Changes whose actor is not the user are ignored.
    Keys are user ids as strings; values are ``str()`` of a summary dict.
    """
    if get_current_app_name() != AppName.ALAN_FR:
        return {}

    # Imported lazily: these FR modules are only needed once we know we are
    # running in the FR app.
    from components.fr.public.fraud_detection.enums import (
        FraudRelevantUserChangeType,
    )
    from components.fr.public.fraud_detection.queries import (
        find_most_recent_fraud_relevant_user_changes,
    )

    recent_changes = find_most_recent_fraud_relevant_user_changes(
        [int(user_id) for user_id in user_ids],
        change_type=FraudRelevantUserChangeType.SettlementIbanUpdated,
        since=datetime.now() - timedelta(days=days_since),
    )

    summaries = {}
    for change in recent_changes:
        # We're only interested in changes done from the user account
        if change.actor_id == change.user_id:
            summary = {
                "occurred_at": str(change.occurred_at),
                "type": str(change.type),
                "old_values": change.old_values.get(),
                "new_values": change.new_values.get(),
            }
            summaries[str(change.user_id)] = str(summary)

    return summaries
user
get_user_with_keycloak_id
get_user_with_keycloak_id(keycloak_id)
Source code in components/authentication/internal/business_logic/queries/user.py
def get_user_with_keycloak_id(keycloak_id: UUID) -> BaseUser | None:
    """Return the user of the current user class with this Keycloak ID, or None."""
    user_model = get_current_class(BaseUser)
    matches_keycloak_id = user_model._keycloak_id == keycloak_id  # noqa: ALN027 issue with keycloak_id being a hybrid property
    statement = select(user_model).where(matches_keycloak_id)
    return current_session.scalars(statement).one_or_none()

commands

app_group

authentication_commands module-attribute
authentication_commands = AppGroup('authentication')

backfill_authentication_identity

backfill_users_identity
backfill_users_identity(dry_run=True)
Source code in components/authentication/internal/commands/backfill_authentication_identity.py
@authentication_commands.command()
@command_with_dry_run
def backfill_users_identity(dry_run: bool = True) -> None:
    # Thin CLI entry point: all the work happens in _backfill_users_identity,
    # which receives the dry_run flag unchanged.
    _backfill_users_identity(dry_run=dry_run)
check_all_user_have_identity
check_all_user_have_identity()

Check if all the users with a keycloak_id have an identity in the authentication_identity table

Source code in components/authentication/internal/commands/backfill_authentication_identity.py
@authentication_commands.command()
def check_all_user_have_identity() -> None:
    """
    Check if all the users with a keycloak_id have an identity in the authentication_identity table
    """
    from components.be.internal.models.be_user import BeUser  # noqa: ALN069
    from components.ca.internal.tech.models.ca_user import CaUser  # noqa: ALN069
    from components.es.internal.models.es_user import EsUser  # noqa: ALN069
    from components.fr.internal.models.user import User  # noqa: ALN069

    for user_model in (BeUser, CaUser, EsUser, User):
        # Outer join + "identity id IS NULL" keeps only users whose profile
        # has no matching authentication identity row.
        statement = (
            select(user_model)
            .outerjoin(
                AuthenticationIdentityModel,
                user_model.profile_id == AuthenticationIdentityModel.profile_id,
            )
            .where(
                user_model.keycloak_id.is_not(None),  # type: ignore[attr-defined]
                AuthenticationIdentityModel.id.is_(None),
            )
        )
        orphan_users = current_session.scalars(statement).all()
        if not orphan_users:
            # Nothing is logged for a user class without problems.
            continue

        current_logger.error(
            f"Found {len(orphan_users)} users without identity in {user_model.__name__}"
        )
        for orphan in orphan_users:
            current_logger.error(
                f"User {orphan.id} with email {orphan.email} has no identity"
            )
find_identity_conlicts
find_identity_conlicts()

Find any keycloak_id conflicts between users in different components.

Source code in components/authentication/internal/commands/backfill_authentication_identity.py
@authentication_commands.command()
def find_identity_conlicts() -> None:
    """
    Find any keycloak_id conflicts between users in different components.
    """
    from components.be.internal.models.be_user import BeUser  # noqa: ALN069
    from components.ca.internal.tech.models.ca_user import CaUser  # noqa: ALN069
    from components.es.internal.models.es_user import EsUser  # noqa: ALN069
    from components.fr.internal.models.user import User  # noqa: ALN069

    user_aliases = [aliased(cls) for cls in [BeUser, CaUser, EsUser, User]]

    # Visit every unordered pair of user classes exactly once.
    for position, user_a in enumerate(user_aliases):
        for user_b in user_aliases[position + 1 :]:
            # A conflict is the same non-null keycloak_id attached to two
            # different profiles.
            statement = (
                select(
                    user_a.keycloak_id,
                    user_a.id,
                    user_a.profile_id,
                    user_b.id,
                    user_b.profile_id,
                )  # type: ignore[call-overload]
                .join(user_b, user_a.keycloak_id == user_b.keycloak_id)
                .where(
                    and_(
                        user_a.keycloak_id.is_not(None),  # type: ignore[attr-defined]
                        user_b.keycloak_id.is_not(None),  # type: ignore[attr-defined]
                        user_a.profile_id != user_b.profile_id,
                    )
                )
            )
            conflicts = current_session.execute(statement).all()

            if not conflicts:
                current_logger.info(
                    f"No keycloak_id conflicts between {user_a.__name__} and {user_b.__name__}"
                )
                continue

            current_logger.error(
                f"Found {len(conflicts)} keycloak_id conflicts between {user_a.__name__} and {user_b.__name__}"
            )
            for conflict in conflicts:
                # Columns come back in the order they were selected above.
                (
                    keycloak_id,
                    user_a_id,
                    user_a_profile_id,
                    user_b_id,
                    user_b_profile_id,
                ) = conflict
                current_logger.error(
                    f"Conflict on keycloak_id {keycloak_id}: {user_a.__name__} (id: {user_a_id}, profile_id: {user_a_profile_id}) and {user_b.__name__} (id: {user_b_id}, profile_id: {user_b_profile_id})"
                )

compromised_accounts

lockdown_compromised_user_accounts
lockdown_compromised_user_accounts(
    gsheet_key,
    keycloak_ids,
    gsheet_tab,
    notify_users,
    check_changes_since_days=7,
    dry_run=True,
)

Lockdown compromised user accounts by removing password and revoking all sessions. How To: https://www.notion.so/alaninsurance/Compromised-member-accounts-18f1426e8be780e783a3e250de980c42 ⧉

This command can work in two modes: 1. --gsheet-key: Read keycloak_ids from a spreadsheet and update it with results 2. --keycloak-ids: Process a list of IDs directly without spreadsheet persistence

Additionally, we check for any email change (all countries) or relevant fraud change (FR only)

Source code in components/authentication/internal/commands/compromised_accounts.py
@authentication_commands.command()
@click.argument(
    "keycloak-ids",
    nargs=-1,
    type=UUID,
)
@click.option(
    "--gsheet-key",
    help="Google spreadsheet key containing keycloak_ids to process",
)
@click.option(
    "--notify-users",
    type=click.Choice(NotificationStrategy.get_values(), case_sensitive=False),
    default="no",
    help="Send account compromise notification email after resetting credentials. 'force' sends even if email was changed.",
)
@click.option(
    "--gsheet-tab",
    help="Google spreadsheet tab name (only used with --gsheet-key)",
    required=False,
    default="Sheet1",
)
@click.option(
    "--check-changes-since-days",
    type=int,
    default=7,
    help="Number of days to look back for email & fraud changes (default: 7)",
)
@command_with_dry_run
def lockdown_compromised_user_accounts(
    gsheet_key: str | None,
    keycloak_ids: tuple[UUID],
    gsheet_tab: str,
    notify_users: str,
    check_changes_since_days: int = 7,
    dry_run: bool = True,
) -> None:
    """
    Lockdown compromised user accounts by removing password and revoking all sessions.
    How To: https://www.notion.so/alaninsurance/Compromised-member-accounts-18f1426e8be780e783a3e250de980c42

    This command can work in two modes:
    1. --gsheet-key: Read keycloak_ids from a spreadsheet and update it with results
    2. --keycloak-ids: Process a list of IDs directly without spreadsheet persistence

    Additionally, we check for any email change (all countries) or relevant fraud change (FR only)
    """

    # The two input sources are mutually exclusive and one of them is
    # mandatory, i.e. exactly one of them must be truthy.
    if bool(gsheet_key) == bool(keycloak_ids):
        raise ValueError(
            "Provide exactly one of --gsheet-key option or keycloak-ids args"
        )

    if gsheet_key:
        # Mode 1: accounts come from (and results go back to) a spreadsheet
        current_logger.info(f"Processing accounts from spreadsheet: {gsheet_key}")  # type: ignore[unreachable]
        accounts = _load_accounts_from_spreadsheet(gsheet_key, gsheet_tab)
    else:
        # Mode 2: accounts are built straight from the positional IDs
        current_logger.info("Processing accounts from keycloak ids")
        accounts = [CompromisedAccountStatus(keycloak_id=kc_id) for kc_id in keycloak_ids]

    current_logger.info(f"Found {len(accounts)} compromised accounts")
    if not accounts:
        current_logger.warning("No compromised accounts loaded")
        return

    # Turn the raw CLI string into a NotificationStrategy before processing.
    strategy = NotificationStrategy.validate(notify_users)
    accounts = lockdown_compromised_accounts(
        accounts,
        notify_users=strategy,
        check_changes_since_days=check_changes_since_days,
        dry_run=dry_run,
    )

    if gsheet_key:
        # Results are only persisted in spreadsheet mode.
        _update_spreadsheet_with_results(gsheet_key, gsheet_tab, accounts)  # type: ignore[unreachable]

        current_logger.info(
            f"Updated spreadsheet {gsheet_key} with {len(accounts)} rows",
            spreadsheet_id=gsheet_key,
            rows_count=len(accounts),
        )
notify_compromised_accounts
notify_compromised_accounts(keycloak_ids, dry_run=True)
Source code in components/authentication/internal/commands/compromised_accounts.py
@authentication_commands.command()
@click.argument(
    "keycloak-ids",
    nargs=-1,
    type=UUID,
)
@command_with_dry_run
def notify_compromised_accounts(
    keycloak_ids: tuple[UUID],
    dry_run: bool = True,
) -> None:
    # Sends the "account compromised" email to each user found for the given
    # Keycloak IDs. A failure on one ID is logged and does not stop the rest.
    for keycloak_id in keycloak_ids:
        try:
            recipient = get_user_with_keycloak_id(keycloak_id)
            if not recipient:
                current_logger.info(
                    f"No user found for keycloak_id {keycloak_id}",
                    keycloak_id=keycloak_id,
                )
                continue

            if dry_run:
                # Dry run: only report what would have been sent.
                current_logger.info(
                    f"Would have sent email to user {recipient.id}",
                    user_id=recipient.id,
                )
            else:
                send_account_compromised_after_credential_stuffing_email(recipient.id)
        except Exception as exc:
            current_logger.error(
                f"Error while sending email for keycloak_id {keycloak_id}",
                exc=exc,
                keycloak_id=keycloak_id,
            )
reset_users_credentials
reset_users_credentials(keycloak_ids, dry_run=True)
Source code in components/authentication/internal/commands/compromised_accounts.py
@authentication_commands.command()
@click.argument(
    "keycloak-ids",
    nargs=-1,
    type=UUID,
)
@command_with_dry_run
def reset_users_credentials(
    keycloak_ids: tuple[UUID],
    dry_run: bool = True,
) -> None:
    # Calls reset_user_credentials for every Keycloak ID given on the command
    # line, forwarding dry_run unchanged.
    for keycloak_id in keycloak_ids:
        try:
            reset_user_credentials(keycloak_id, dry_run=dry_run)
        except Exception as exc:
            # Best effort: log the failure and carry on with the next ID.
            current_logger.error(
                f"Error while resetting users credentials for {keycloak_id}", exc=exc
            )

fix_email_consistency

fix_be_ca_email_inconsistency
fix_be_ca_email_inconsistency(dry_run=True)
Source code in components/authentication/internal/commands/fix_email_consistency.py
@authentication_commands.command()
@command_with_dry_run
def fix_be_ca_email_inconsistency(dry_run: bool = True) -> None:
    # Models are imported lazily inside the command rather than at module level.
    from components.be.internal.models.be_user import BeUser  # noqa: ALN069
    from components.ca.internal.tech.models.ca_user import CaUser  # noqa: ALN069

    # Find inconsistencies between the BE and CA user models, then pass them
    # to the shared fixer together with the dry_run flag.
    inconsistencies = _find_inconsistencies(BeUser, CaUser)
    _fix_inconsistencies(inconsistencies, BeUser, CaUser, dry_run=dry_run)
fix_be_es_email_inconsistency
fix_be_es_email_inconsistency(dry_run=True)
Source code in components/authentication/internal/commands/fix_email_consistency.py
@authentication_commands.command()
@command_with_dry_run
def fix_be_es_email_inconsistency(dry_run: bool = True) -> None:
    # Models are imported lazily inside the command rather than at module level.
    from components.be.internal.models.be_user import BeUser  # noqa: ALN069
    from components.es.internal.models.es_user import EsUser  # noqa: ALN069

    # Find inconsistencies between the BE and ES user models, then pass them
    # to the shared fixer together with the dry_run flag.
    inconsistencies = _find_inconsistencies(BeUser, EsUser)
    _fix_inconsistencies(inconsistencies, BeUser, EsUser, dry_run=dry_run)
fix_be_fr_email_inconsistency
fix_be_fr_email_inconsistency(dry_run=True)
Source code in components/authentication/internal/commands/fix_email_consistency.py
@authentication_commands.command()
@command_with_dry_run
def fix_be_fr_email_inconsistency(dry_run: bool = True) -> None:
    # Models are imported lazily inside the command rather than at module level.
    from components.be.internal.models.be_user import BeUser  # noqa: ALN069
    from components.fr.internal.models.user import User as FrUser  # noqa: ALN069

    # Find inconsistencies between the BE and FR user models, then pass them
    # to the shared fixer together with the dry_run flag.
    inconsistencies = _find_inconsistencies(BeUser, FrUser)
    _fix_inconsistencies(inconsistencies, BeUser, FrUser, dry_run=dry_run)
fix_ca_es_email_inconsistency
fix_ca_es_email_inconsistency(dry_run=True)
Source code in components/authentication/internal/commands/fix_email_consistency.py
@authentication_commands.command()
@command_with_dry_run
def fix_ca_es_email_inconsistency(dry_run: bool = True) -> None:
    # Models are imported lazily inside the command rather than at module level.
    from components.ca.internal.tech.models.ca_user import CaUser  # noqa: ALN069
    from components.es.internal.models.es_user import EsUser  # noqa: ALN069

    # Find inconsistencies between the CA and ES user models, then pass them
    # to the shared fixer together with the dry_run flag.
    inconsistencies = _find_inconsistencies(CaUser, EsUser)
    _fix_inconsistencies(inconsistencies, CaUser, EsUser, dry_run=dry_run)
fix_ca_fr_email_inconsistency
fix_ca_fr_email_inconsistency(dry_run=True)
Source code in components/authentication/internal/commands/fix_email_consistency.py
@authentication_commands.command()
@command_with_dry_run
def fix_ca_fr_email_inconsistency(dry_run: bool = True) -> None:
    # Models are imported lazily inside the command rather than at module level.
    from components.ca.internal.tech.models.ca_user import CaUser  # noqa: ALN069
    from components.fr.internal.models.user import User as FrUser  # noqa: ALN069

    # Find inconsistencies between the CA and FR user models, then pass them
    # to the shared fixer together with the dry_run flag.
    inconsistencies = _find_inconsistencies(CaUser, FrUser)
    _fix_inconsistencies(inconsistencies, CaUser, FrUser, dry_run=dry_run)
fix_es_fr_email_inconsistency
fix_es_fr_email_inconsistency(dry_run=True)
Source code in components/authentication/internal/commands/fix_email_consistency.py
@authentication_commands.command()
@command_with_dry_run
def fix_es_fr_email_inconsistency(dry_run: bool = True) -> None:
    # Models are imported lazily inside the command rather than at module level.
    from components.es.internal.models.es_user import EsUser  # noqa: ALN069
    from components.fr.internal.models.user import User as FrUser  # noqa: ALN069

    # Find inconsistencies between the ES and FR user models, then pass them
    # to the shared fixer together with the dry_run flag.
    inconsistencies = _find_inconsistencies(EsUser, FrUser)
    _fix_inconsistencies(inconsistencies, EsUser, FrUser, dry_run=dry_run)

fix_inconsistencies

fix_identity_inconsistencies
fix_identity_inconsistencies(
    profile_id,
    dry_run=False,
    first_name=None,
    last_name=None,
    language=None,
)

Fix inconsistent keycloak identity for a given profile ID if possible. We only consider inconsistencies on the first name, last name, and language. If one of those fields is inconsistent, the command will: - if the field is given as an argument of the command, replace this field in the profile table and keycloak with the provided value - if the field is not given as an argument of the command, try to fix it automatically

Two names can be automatically fixed if they are the same once all capitalization, accents are removed and the string has been stripped.

Source code in components/authentication/internal/commands/fix_inconsistencies.py
@authentication_commands.command()
@command_with_dry_run
@click.argument("profile_id", type=click.UUID, nargs=1)
@click.option(
    "--first-name", type=str, help="First name to override the already set ones"
)
@click.option(
    "--last-name", type=str, help="Last name to override the already set ones"
)
@click.option(
    "--language",
    type=str,
    help="Preferred language to override the already set ones",
)
def fix_identity_inconsistencies(
    profile_id: uuid.UUID,
    dry_run: bool = False,
    first_name: str | None = None,
    last_name: str | None = None,
    language: str | None = None,
) -> None:
    """
    Fix inconsistent keycloak identity for a given profile ID if possible.
    We only consider inconsistencies on the first name, last name, and language.
    If one of those field is inconsistent, the command will:
    - if the field is given as argument of the command replace this field in the profile table and keycloak by the provided value
    - if the field is not given as argument of the command, it will try to fix it automatically

    Two names can be automatically fixed if they are the same once all capitalization, accents are removed and the string has been stripped.
    """
    from shared.helpers.db import current_session

    # The helper does the actual reconciliation; this command only decides
    # whether the session is committed or rolled back afterwards.
    _fix_inconsistent_identity(
        profile_id=profile_id,
        first_name=first_name,
        last_name=last_name,
        language=language,
        dry_run=dry_run,
    )

    if not dry_run:
        click.echo("Changes have been applied to the database and Keycloak.")
        current_session.commit()
        return

    click.echo(
        "This is a dry run. No changes have been made to the database or Keycloak."
    )
    current_session.rollback()
fix_many_identities_inconsistencies
fix_many_identities_inconsistencies(
    profile_ids, dry_run=False
)

Fix inconsistent keycloak identity for a given profile ID if possible. We only consider inconsistencies on the first name, last name, and language. If one of those fields is inconsistent, the command will: - if the field is given as an argument of the command, replace this field in the profile table and keycloak with the provided value - if the field is not given as an argument of the command, try to fix it automatically

Two names can be automatically fixed if they are the same once all capitalization, accents are removed and the string has been stripped.

Source code in components/authentication/internal/commands/fix_inconsistencies.py
@authentication_commands.command()
@click.argument("profile_ids", type=click.UUID, nargs=-1)
@command_with_dry_run
def fix_many_identities_inconsistencies(
    profile_ids: tuple[uuid.UUID],
    dry_run: bool = False,
) -> None:
    """
    Fix inconsistent keycloak identity for a given profile ID if possible.
    We only consider inconsistencies on the first name, last name, and language.
    If one of those field is inconsistent, the command will:
    - if the field is given as argument of the command replace this field in the profile table and keycloak by the provided value
    - if the field is not given as argument of the command, it will try to fix it automatically

    Two names can be automatically fixed if they are the same once all capitalization, accents are removed and the string has been stripped.
    """
    from shared.helpers.db import current_session

    # Profiles for which the helper raised ValueError are collected and
    # reported instead of aborting the whole batch.
    unresolved = set()
    for profile_id in profile_ids:
        try:
            _fix_inconsistent_identity(profile_id=profile_id, dry_run=dry_run)
        except ValueError:
            unresolved.add(profile_id)

    # The report is printed even when the set is empty.
    click.echo(
        "The following list couldn't be automatically merged and should be manually fixed:"
    )
    click.echo(unresolved)

    if not dry_run:
        click.echo("Changes have been applied to the database and Keycloak.")
        current_session.commit()
        return

    click.echo(
        "This is a dry run. No changes have been made to the database or Keycloak."
    )
    current_session.rollback()

controllers

api

create_api
create_api(app_or_blueprint)
Source code in components/authentication/internal/controllers/api.py
def create_api(app_or_blueprint: CustomBlueprint) -> None:
    """
    Build the authentication API on top of the given blueprint.

    Wraps `app_or_blueprint` in a `CustomApi`, registers a representation for
    "application/json" responses, and adds the token endpoint to the API.
    """
    # NOTE(review): imported inside the function rather than at module level,
    # presumably to avoid an import cycle; confirm before moving it.
    from shared.api.custom_api import CustomApi

    api = CustomApi(app_or_blueprint)

    @api.representation("application/json")
    def output_json(data, code, headers=None):  # type: ignore[no-untyped-def]
        """Tells flask-restful to use the flask json serializer instead of json."""
        return make_response(jsonify(data), code, headers)

    # Imported only after the API object exists; keep this ordering unless the
    # token module is confirmed to be free of import-time dependencies on it.
    from components.authentication.internal.controllers.token import (
        token_endpoint,
    )

    api.add_endpoint(token_endpoint)

blueprint

auth_api_blueprint module-attribute
auth_api_blueprint = CustomBlueprint(
    "auth_api_blueprint", __name__
)

token

TokenController

Bases: BaseController

token_endpoint module-attribute
token_endpoint = Endpoint('tokens')

domain

entities

AuthenticationIdentity dataclass
AuthenticationIdentity(
    *, id=uuid.uuid4(), profile_id, keycloak_id
)

Links a profile to an authentication identity (e.g. keycloak_id).

id class-attribute instance-attribute
id = field(default_factory=uuid4)
keycloak_id instance-attribute
keycloak_id
profile_id instance-attribute
profile_id

events

ChangeIdentityEmailApprovedEvent dataclass
ChangeIdentityEmailApprovedEvent(
    identity_id, old_email, new_email
)

Bases: DomainEvent

identity_id instance-attribute
identity_id
new_email instance-attribute
new_email
old_email instance-attribute
old_email
DomainEvent dataclass
DomainEvent()

Bases: ABC

IdentityCreatedEvent dataclass
IdentityCreatedEvent(
    identity_id,
    profile_id,
    email,
    first_name,
    last_name,
    language,
    is_email_verified,
)

Bases: DomainEvent

email instance-attribute
email
first_name instance-attribute
first_name
identity_id instance-attribute
identity_id
is_email_verified instance-attribute
is_email_verified
language instance-attribute
language
last_name instance-attribute
last_name
profile_id instance-attribute
profile_id
IdentityEmailChangedEvent dataclass
IdentityEmailChangedEvent(
    identity_id, old_email, new_email
)

Bases: DomainEvent

identity_id instance-attribute
identity_id
new_email instance-attribute
new_email
old_email instance-attribute
old_email
IdentityEmailCleared dataclass
IdentityEmailCleared(identity_id, invalidated_email)

Bases: DomainEvent

identity_id instance-attribute
identity_id
invalidated_email instance-attribute
invalidated_email
IdentityFirstNameChanged dataclass
IdentityFirstNameChanged(
    identity_id, old_first_name, new_first_name
)

Bases: DomainEvent

identity_id instance-attribute
identity_id
new_first_name instance-attribute
new_first_name
old_first_name instance-attribute
old_first_name
IdentityLanguageChanged dataclass
IdentityLanguageChanged(
    identity_id, old_language, new_language
)

Bases: DomainEvent

identity_id instance-attribute
identity_id
new_language instance-attribute
new_language
old_language instance-attribute
old_language
IdentityLastNameChanged dataclass
IdentityLastNameChanged(
    identity_id, old_last_name, new_last_name
)

Bases: DomainEvent

identity_id instance-attribute
identity_id
new_last_name instance-attribute
new_last_name
old_last_name instance-attribute
old_last_name
IdentityMergedEvent dataclass
IdentityMergedEvent(
    from_identity_id, to_identity_id, with_password_reset
)

Bases: DomainEvent

from_identity_id instance-attribute
from_identity_id
to_identity_id instance-attribute
to_identity_id
with_password_reset instance-attribute
with_password_reset
MergeIdentityEvent dataclass
MergeIdentityEvent(
    from_identity_id, to_identity_id, with_password_reset
)

Bases: DomainEvent

from_identity_id instance-attribute
from_identity_id
to_identity_id instance-attribute
to_identity_id
with_password_reset instance-attribute
with_password_reset
PasswordResetEmailSent dataclass
PasswordResetEmailSent(
    identity_id, email, client_id, redirect_uri
)

Bases: DomainEvent

client_id instance-attribute
client_id
email instance-attribute
email
identity_id instance-attribute
identity_id
redirect_uri instance-attribute
redirect_uri

infrastructure

double_write_repository

DoubleWriteAuthenticationRepository
DoubleWriteAuthenticationRepository(
    session=None, app_name=None
)

Bases: BaseAuthenticationRepository

Source code in components/authentication/internal/infrastructure/double_write_repository.py
def __init__(
    self,
    session: Session | None = None,
    app_name: AppName | None = None,
) -> None:
    """
    Set up the two underlying repositories on one shared session.

    Args:
        session: Session to use; falls back to `current_session` when falsy.
        app_name: Forwarded to the retro-compatibility repository only.
    """
    shared_session: Session = session or current_session
    self.session: Session = shared_session
    # Both repositories are bound to the same session object.
    self.retro_compatibility_repository = RetroCompatibilityAuthenticationRepository(
        session=shared_session,
        app_name=app_name,
    )
    self.authentication_repository = AuthenticationRepository(
        session=shared_session
    )
authentication_repository instance-attribute
authentication_repository = AuthenticationRepository(
    session=session
)
delete
delete(authentication_identity)
Source code in components/authentication/internal/infrastructure/double_write_repository.py
def delete(self, authentication_identity: AuthenticationIdentity) -> None:
    """Delete the identity from the authentication repository, then from the retro-compatibility repository."""
    self.authentication_repository.delete(authentication_identity)
    self.retro_compatibility_repository.delete(authentication_identity)
get_by_id
get_by_id(authentication_identity_id)
Source code in components/authentication/internal/infrastructure/double_write_repository.py
def get_by_id(
    self, authentication_identity_id: uuid.UUID
) -> AuthenticationIdentity | None:
    """Look the identity up by its id in the authentication repository only; the retro-compatibility repository is not consulted."""
    return self.authentication_repository.get_by_id(authentication_identity_id)
get_by_keycloak_id
get_by_keycloak_id(keycloak_id)
Source code in components/authentication/internal/infrastructure/double_write_repository.py
def get_by_keycloak_id(
    self, keycloak_id: uuid.UUID
) -> AuthenticationIdentity | None:
    """
    Find the authentication identity attached to a keycloak id.

    When `use_authentication_table_for_keycloak_id()` is truthy, the
    authentication repository answers directly. Otherwise the
    retro-compatibility repository provides `profile_id` and `keycloak_id`,
    and the authentication repository is only consulted for the entity id.

    Returns:
        The matching identity, or None when the repository used as the source
        has no match.
    """
    if use_authentication_table_for_keycloak_id():
        return self.authentication_repository.get_by_keycloak_id(
            keycloak_id=keycloak_id
        )

    legacy_identity = self.retro_compatibility_repository.get_by_keycloak_id(
        keycloak_id=keycloak_id
    )
    if legacy_identity is None:
        return None

    stored_identity = self.authentication_repository.get_by_keycloak_id(
        keycloak_id=keycloak_id
    )
    # if we already have a real value, we should use its id to avoid collisions
    if stored_identity:
        identity_id = stored_identity.id
    else:
        identity_id = uuid.uuid4()

    return AuthenticationIdentity(
        id=identity_id,
        profile_id=legacy_identity.profile_id,
        keycloak_id=legacy_identity.keycloak_id,
    )
get_by_profile_id
get_by_profile_id(profile_id)
Source code in components/authentication/internal/infrastructure/double_write_repository.py
def get_by_profile_id(self, profile_id: uuid.UUID) -> AuthenticationIdentity | None:
    """
    Find the authentication identity attached to a profile id.

    When `use_authentication_table_for_keycloak_id()` is truthy, the
    authentication repository answers directly. Otherwise the
    retro-compatibility repository provides `profile_id` and `keycloak_id`,
    and the authentication repository is only consulted for the entity id.

    Returns:
        The matching identity, or None when the repository used as the source
        has no match.
    """
    if use_authentication_table_for_keycloak_id():
        return self.authentication_repository.get_by_profile_id(
            profile_id=profile_id
        )

    legacy_identity = self.retro_compatibility_repository.get_by_profile_id(
        profile_id=profile_id
    )
    if legacy_identity is None:
        return None

    stored_identity = self.authentication_repository.get_by_profile_id(
        profile_id=profile_id
    )
    # if we already have a real value, we should use its id to avoid collisions
    if stored_identity:
        identity_id = stored_identity.id
    else:
        identity_id = uuid.uuid4()

    return AuthenticationIdentity(
        id=identity_id,
        profile_id=legacy_identity.profile_id,
        keycloak_id=legacy_identity.keycloak_id,
    )
retro_compatibility_repository instance-attribute
retro_compatibility_repository = (
    RetroCompatibilityAuthenticationRepository(
        session=session, app_name=app_name
    )
)
save
save(authentication_identity)
Source code in components/authentication/internal/infrastructure/double_write_repository.py
def save(self, authentication_identity: AuthenticationIdentity) -> None:
    """Save the identity to the authentication repository, then to the retro-compatibility repository."""
    self.authentication_repository.save(authentication_identity)
    self.retro_compatibility_repository.save(authentication_identity)
session instance-attribute
session = session or current_session

event_dispatcher

AlanMessagingEventDispatcher
AlanMessagingEventDispatcher(
    producer=None, mappers=MAPPERS
)

Bases: EventDispatcher

EventDispatcher implementation using shared.messaging.

Source code in components/authentication/internal/infrastructure/event_dispatcher.py
def __init__(
    self,
    producer: Producer | None = None,
    mappers: dict[type[DomainEvent], Callable[..., Message]] = MAPPERS,
) -> None:
    """
    Args:
        producer: Producer used to publish messages. When falsy, one is built
            from `get_message_broker()`.
        mappers: Mapping from a domain event type to the callable that turns
            such an event into a message. Defaults to the module-level MAPPERS.
    """
    if not producer:
        # Only look up the broker when no producer was injected.
        producer = Producer(get_message_broker())
    self.producer: Producer = producer
    self.mappers: dict[type[DomainEvent], Callable[..., Message]] = mappers
dispatch
dispatch(domain_event)
Source code in components/authentication/internal/infrastructure/event_dispatcher.py
@override
def dispatch(self, domain_event: DomainEvent) -> None:
    """
    Publish the message mapped from `domain_event`.

    Events whose exact type has no entry in `self.mappers` are ignored.
    """
    event_type = type(domain_event)
    if event_type in self.mappers:
        to_message = self.mappers[event_type]
        # publish is already guaranteed to not raise exceptions, no need to catch them
        self.producer.publish(to_message(domain_event))
mappers instance-attribute
mappers = mappers
producer instance-attribute
producer = producer or Producer(get_message_broker())
EventDispatcher

Bases: ABC

Interface for dispatching domain events as integration events.

dispatch abstractmethod
dispatch(domain_event)

Dispatch a domain event as an integration event.

WARNING: this method should not raise exceptions.

Source code in components/authentication/internal/infrastructure/event_dispatcher.py
@abstractmethod
def dispatch(self, domain_event: DomainEvent) -> None:
    """
    Dispatch a domain event as an integration event.

    WARNING: implementations of this method should not raise exceptions.
    """
MAPPERS module-attribute
MAPPERS = {
    IdentityCreatedEvent: identity_created_mapper,
    IdentityEmailChangedEvent: email_changed_mapper,
    PasswordResetEmailSent: password_reset_email_sent_mapper,
    IdentityMergedEvent: identity_merged_mapper,
    IdentityEmailCleared: cleared_identity_email_mapper,
}
TestEventDispatcher
TestEventDispatcher()

Bases: EventDispatcher

Event dispatcher for testing purposes. It will store all the dispatched events in a list.

Source code in components/authentication/internal/infrastructure/event_dispatcher.py
def __init__(self) -> None:
    """Start with an empty list of dispatched events."""
    # Filled by dispatch(), in call order.
    self.dispatched_events: list[DomainEvent] = []
dispatch
dispatch(domain_event)
Source code in components/authentication/internal/infrastructure/event_dispatcher.py
@override
def dispatch(self, domain_event: DomainEvent) -> None:
    """Record the event in `dispatched_events` instead of publishing it."""
    self.dispatched_events.append(domain_event)
dispatched_events instance-attribute
dispatched_events = []
cleared_identity_email_mapper
cleared_identity_email_mapper(event)
Source code in components/authentication/internal/infrastructure/event_dispatcher.py
def cleared_identity_email_mapper(
    event: IdentityEmailClearedDomainEvent,
) -> IdentityEmailCleared:
    """Build an `IdentityEmailCleared` from the domain event, copying `identity_id` and `invalidated_email`."""
    return IdentityEmailCleared(
        identity_id=event.identity_id,
        invalidated_email=event.invalidated_email,
    )
email_changed_mapper
email_changed_mapper(event)
Source code in components/authentication/internal/infrastructure/event_dispatcher.py
def email_changed_mapper(
    event: IdentityEmailChangedDomainEvent,
) -> IdentityEmailChangedEvent:
    """Build an `IdentityEmailChangedEvent` from the domain event, copying `identity_id`, `old_email` and `new_email`."""
    return IdentityEmailChangedEvent(
        identity_id=event.identity_id,
        old_email=event.old_email,
        new_email=event.new_email,
    )
identity_created_mapper
identity_created_mapper(event)
Source code in components/authentication/internal/infrastructure/event_dispatcher.py
def identity_created_mapper(event: IdentityCreatedDomainEvent) -> IdentityCreatedEvent:
    """
    Build an `IdentityCreatedEvent` from the domain event.

    Copies `identity_id`, `email`, `first_name`, `last_name`, `language` and
    `profile_id`; no other attribute of the domain event is forwarded.
    """
    return IdentityCreatedEvent(
        identity_id=event.identity_id,
        email=event.email,
        first_name=event.first_name,
        last_name=event.last_name,
        language=event.language,
        profile_id=event.profile_id,
    )
identity_merged_mapper
identity_merged_mapper(event)
Source code in components/authentication/internal/infrastructure/event_dispatcher.py
def identity_merged_mapper(
    event: IdentityMergedDomainEvent,
) -> IdentityMergedEvent:
    """
    Build an `IdentityMergedEvent` from the domain event.

    Copies `from_identity_id` and `to_identity_id`; no other attribute of the
    domain event is forwarded.
    """
    return IdentityMergedEvent(
        from_identity_id=event.from_identity_id,
        to_identity_id=event.to_identity_id,
    )
password_reset_email_sent_mapper
password_reset_email_sent_mapper(event)
Source code in components/authentication/internal/infrastructure/event_dispatcher.py
def password_reset_email_sent_mapper(
    event: PasswordResetEmailSentDomainEvent,
) -> PasswordResetEmailSentEvent:
    """Build a `PasswordResetEmailSentEvent` from the domain event, copying `identity_id`, `email`, `client_id` and `redirect_uri`."""
    return PasswordResetEmailSentEvent(
        identity_id=event.identity_id,
        email=event.email,
        client_id=event.client_id,
        redirect_uri=event.redirect_uri,
    )

flask_auth

create_mfa
create_mfa()
Source code in components/authentication/internal/infrastructure/flask_auth.py
def create_mfa() -> MFA:
    """
    Assemble the MFA object with its storage and notifiers.

    A `PrintNotifier` and an `EmailNotifier` are always registered; an
    `IosSimulatorJsonNotifier` is added when `is_development_mode()` is truthy.
    """
    email_notifier = EmailNotifier(get_user_cls=get_user_cls)
    channels: list[Notifier] = [PrintNotifier(), email_notifier]

    if is_development_mode():
        channels.append(IosSimulatorJsonNotifier())

    mfa_storage = create_mfa_storage()  # type: ignore[no-untyped-call]
    return MFA(
        storage=mfa_storage,
        notifiers=channels,
        _get_user_cls=get_user_cls,
    )
data_auth module-attribute
data_auth = LoginFormAuth(
    verify_password_callback=_verify_password
)
refresh_auth module-attribute
refresh_auth = CookieTokenAuth(
    verify_token_callback=_verify_refresh_token,
    cookie_name="refresh_token",
)
refresh_mobile_auth module-attribute
refresh_mobile_auth = HTTPTokenAuth(
    verify_token_callback=_verify_refresh_token,
    scheme="Refresh",
)

identity_provider

AuthIdentity
__eq__
__eq__(other)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def __eq__(self, other):  # type: ignore[no-untyped-def]
    """Equal when `other` is an instance of this object's type and both return the same `__key()`."""
    return isinstance(other, type(self)) and self.__key() == other.__key()  # type: ignore[no-untyped-call]
__hash__
__hash__()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def __hash__(self):  # type: ignore[no-untyped-def]
    """Hash of `__key()`, the same value `__eq__` compares."""
    return hash(self.__key())  # type: ignore[no-untyped-call]
check_password
check_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def check_password(self, prehashed_password: str) -> bool:
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
clear_password
clear_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def clear_password(self) -> None:
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
delete
delete()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def delete(self) -> None:
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
email property
email
email_verified property
email_verified
first_name property
first_name
has_password
has_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def has_password(self) -> bool:
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
id property
id
language property
language
last_name property
last_name
logout_all_sessions
logout_all_sessions()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def logout_all_sessions(self) -> None:
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
mfa_enabled property
mfa_enabled
mfa_required property
mfa_required
set_email
set_email(email, is_email_verified)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_email(  # type: ignore[no-untyped-def]
    self, email: str | None, is_email_verified: bool
):  # TODO: Remove Optional and don't allow None values
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
set_first_and_last_names
set_first_and_last_names(
    first_name, last_name, refresh_keycloak_user=True
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_and_last_names(  # type: ignore[no-untyped-def]
    self,
    first_name: str | None,
    last_name: str | None,
    refresh_keycloak_user: bool = True,
):
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
set_first_name
set_first_name(first_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_name(self, first_name: str | None):  # type: ignore[no-untyped-def]
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
set_language
set_language(language)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_language(self, language: Lang):  # type: ignore[no-untyped-def]
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
set_last_name
set_last_name(last_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_last_name(self, last_name: str | None):  # type: ignore[no-untyped-def]
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
set_mfa_enabled
set_mfa_enabled(enabled)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_enabled(self, enabled: bool) -> None:
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
set_mfa_required
set_mfa_required(required)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_required(self, required: bool) -> None:
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
set_password
set_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_password(self, prehashed_password: str):  # type: ignore[no-untyped-def]
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
update_pending_deletion
update_pending_deletion(pending_deletion)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def update_pending_deletion(self, pending_deletion: bool) -> None:
    """Not implemented on this base class: always raises NotImplementedError."""
    raise NotImplementedError
DevIdentity
DevIdentity(
    id,
    email,
    language,
    first_name,
    last_name,
    email_verified=True,
    mfa_enabled=False,
    mfa_required=False,
)

Bases: AuthIdentity

Source code in components/authentication/internal/infrastructure/identity_provider.py
def __init__(
    self,
    id: UUID,
    email: str,
    language: Lang,
    first_name: str | None,
    last_name: str | None,
    email_verified: bool = True,
    mfa_enabled: bool = False,
    mfa_required: bool = False,
) -> None:
    """
    Store the given values on private attributes of the same name.

    `_pending_deletion` is not a constructor argument and always starts as False.
    """
    self._id = id
    self._email: str = email
    self._email_verified = email_verified
    self._first_name: str | None = first_name
    self._last_name: str | None = last_name
    self._mfa_enabled = mfa_enabled
    self._mfa_required = mfa_required
    self._language = language
    self._pending_deletion = False
check_password
check_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def check_password(self, prehashed_password: str) -> bool:
    """Return True only when the value equals the prehash of the fixed password "azerty"; nothing stored on the identity is consulted."""
    return prehashed_password == PasswordMixin.prehash_password("azerty")
clear_password
clear_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def clear_password(self) -> None:
    """Change no state; only calls `warn_because_not_available` with this method's name."""
    warn_because_not_available("DevIdentity.clear_password")
delete
delete()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def delete(self) -> None:
    """Change no state; only calls `warn_because_not_available` with this method's name."""
    warn_because_not_available("DevIdentity.delete")
email property
email
email_verified property
email_verified
first_name property
first_name
has_password
has_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def has_password(self) -> bool:
    """Always report that a password is set."""
    return True
id property
id
language property
language
last_name property
last_name
logout_all_sessions
logout_all_sessions()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def logout_all_sessions(self) -> None:
    """Do nothing."""
    pass
mfa_enabled property
mfa_enabled
mfa_required property
mfa_required
set_email
set_email(email, is_email_verified=True)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_email(self, email: str | None, is_email_verified: bool = True) -> None:
    if email:
        self._email = email
        self._email_verified = is_email_verified
set_first_and_last_names
set_first_and_last_names(
    first_name, last_name, refresh_keycloak_user=True
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_and_last_names(
    self,
    first_name: str | None,
    last_name: str | None,
    refresh_keycloak_user: bool = True,  # noqa: ARG002
) -> None:
    self._first_name = first_name or ""
    self._last_name = last_name or ""
set_first_name
set_first_name(first_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_name(self, first_name: str | None) -> None:
    if first_name:
        self._first_name = first_name
set_language
set_language(language)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_language(self, language: Lang):  # type: ignore[no-untyped-def]
    """Call `warn_because_not_available` with this method's name, then still store the new language on the instance."""
    warn_because_not_available("DevIdentity.set_language")
    self._language = language
set_last_name
set_last_name(last_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_last_name(self, last_name: str | None) -> None:
    if last_name:
        self._last_name = last_name
set_mfa_enabled
set_mfa_enabled(enabled)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_enabled(self, enabled) -> None:  # type: ignore[no-untyped-def]
    """Call `warn_because_not_available` with this method's name, then still store the flag on the instance."""
    warn_because_not_available("DevIdentity.set_mfa_enabled")
    self._mfa_enabled = enabled
set_mfa_required
set_mfa_required(required)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_required(self, required: bool) -> None:
    """Call `warn_because_not_available` with this method's name, then still store the flag on the instance."""
    warn_because_not_available("DevIdentity.set_mfa_required")
    self._mfa_required = required
set_password
set_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_password(self, prehashed_password: str) -> None:  # noqa: ARG002
    """Ignore the given password; only calls `warn_because_not_available` with this method's name."""
    warn_because_not_available("DevIdentity.set_password")
update_pending_deletion
update_pending_deletion(pending_deletion)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def update_pending_deletion(self, pending_deletion: bool) -> None:
    """Store the pending-deletion flag on the instance."""
    self._pending_deletion = pending_deletion
DevIdentityProvider
DevIdentityProvider()

Bases: IdentityProvider

A stubbed identity provider for the dev environment. Cases to test: - Standard login with email: http://localhost:4001/login ⧉ - Company creation: http://localhost:4001/fr-company-discovery/share?contractCoverOption=coverChildren&ccnCode=1486&participation=50&healthProduct=green&choosePrevoyance=true&hasLegacyHealthContract=true&hasLegacyPrevoyanceContract=true ⧉ - Fixture: http://localhost:8001/admin_tools/test_data_generator/new?fixture=LSBjb21wYW55Og%3D%3D ⧉ - User auth login: http://localhost:8002/auth/login?next=%2Foauth2%2Fauthorize%3Fresponse_type%3Dcode%26client_id%3Dmind_dev%26redirect_uri%3Djourapp%3A%252F%252Fauthcallback%252Falan%26scope%3Dopenid%2520email%26state%3D9dc9c12d-0d3b-44f1-af8f-c3b8e829b1eb ⧉ - Freelancer signup: http://localhost:4001/freelancer-signup ⧉

Source code in components/authentication/internal/infrastructure/identity_provider.py
def __init__(self) -> None:
    """Create the provider with an empty in-memory identity cache."""
    # Identities created or looked up through this provider, keyed by identity id.
    self._cache: dict[UUID, DevIdentity] = {}
    super().__init__()
create_new_identity
create_new_identity(
    email,
    language,
    first_name,
    last_name,
    is_email_verified=True,
    mfa_enabled=False,
    mfa_required=False,
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def create_new_identity(
    self,
    email: str,
    language: Lang,
    first_name: str | None,
    last_name: str | None,
    is_email_verified: bool = True,
    mfa_enabled: bool = False,
    mfa_required: bool = False,
) -> DevIdentity:
    """
    Create a `DevIdentity` with a freshly generated id and keep it in the cache.

    Returns:
        The new identity, also stored in `self._cache` under its id.
    """
    identity = DevIdentity(
        id=uuid4(),
        email=email,
        language=language,
        first_name=first_name,
        last_name=last_name,
        email_verified=is_email_verified,
        mfa_enabled=mfa_enabled,
        mfa_required=mfa_required,
    )
    self._cache[identity.id] = identity
    return identity
exchange_token_for_user
exchange_token_for_user(target_client_id, email)

This provides a fake exchanged token for the scenario where the backend issues Keycloak tokens on behalf of the user.

Attributes:

Name Type Description
target_client_id str

The target client ID to exchange token with

email str

Email of the user to exchange token for

Returns:

Type Description
dict

Dictionary containing access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def exchange_token_for_user(
    self,
    target_client_id: str,  # noqa: ARG002
    email: str,
) -> dict:  # type: ignore[type-arg]
    """
    Provide fake exchanged tokens for the scenario where the backend issues Keycloak tokens on behalf of the user.

    Args:
        target_client_id (str): The target client ID to exchange token with (unused here)
        email (str): Email of the user to exchange token for

    Returns:
        Dictionary containing access and refresh tokens
    """
    dev_password = self._get_dev_prehashed_password()
    return self._generate_fake_tokens(email, dev_password)
find_identity
find_identity(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity(self, email: str | None) -> DevIdentity | None:
    """
    Find the single dev identity matching an email.

    Candidates are the identity returned by `_find_dev_identity(email=...)`,
    which is added to the cache when found, plus every cached identity whose
    `email` equals the given one.

    Args:
        email: Email to look for. None short-circuits to None.

    Returns:
        The matching identity, or None when there is no candidate.

    Raises:
        MultipleResultsFound: When the candidates have more than one distinct id.
    """
    if email is None:
        return None

    stored_identity = _find_dev_identity(email=email)
    matching_ids = set()
    if stored_identity is not None:
        self._cache[stored_identity.id] = stored_identity
        matching_ids.add(stored_identity.id)

    # The stored identity is seeded above because its email is not guaranteed
    # here to compare equal to the requested one.
    matching_ids.update(
        identity.id for identity in self._cache.values() if identity.email == email
    )

    if len(matching_ids) > 1:
        raise MultipleResultsFound(
            "Found multiple user matching authentication id or email"
        )
    if matching_ids:
        return self._cache[matching_ids.pop()]
    return None
find_identity_id_from_token
find_identity_id_from_token(token)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity_id_from_token(self, token: str) -> UUID | None:
    """
    Return the id of the identity whose email is carried by the token.

    The token signature is not verified: the token is decoded only to read
    its "email" claim, which is then resolved through `find_identity`.

    Returns:
        The identity id, or None when no identity matches that email.
    """
    # Sent by a local mocked front end
    # frontend/shared/service-worker-mocks/handlers/keycloak-handlers.ts
    claims = jwt.decode(
        token,
        options={"verify_signature": False},
        algorithms=[],
    )

    matched_identity = self.find_identity(email=claims["email"])
    if not matched_identity:
        return None
    return matched_identity.id
generate_password_reset_email
generate_password_reset_email(
    email, client_id, redirect_uri
)

Generate a password reset email in our identity provider

Attributes:

Name Type Description
email str

The email address of the user to trigger a password reset

client_id str

Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")

redirect_uri str | None

The URI to redirect to after performing password reset

Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_password_reset_email(
    self,
    email: str,  # noqa: ARG002
    client_id: str,  # noqa: ARG002
    redirect_uri: str | None,  # noqa: ARG002
) -> None:
    """
    Generate a password reset email in our identity provider.

    This implementation uses none of its arguments: it only calls
    `warn_because_not_available` with this method's name.

    Args:
        email (str): The email address of the user to trigger a password reset
        client_id (str): Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")
        redirect_uri (str | None): The URI to redirect to after performing password reset
    """
    warn_because_not_available("DevIdentityProvider.generate_password_reset_email")
    return
generate_verification_email
generate_verification_email(
    identity_id, client_id, redirect_uri
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_verification_email(
    self,
    identity_id: UUID,  # noqa: ARG002
    client_id: str,  # noqa: ARG002
    redirect_uri: str | None,  # noqa: ARG002
) -> None:
    """Use none of the arguments; only calls `warn_because_not_available` with this method's name."""
    warn_because_not_available("DevIdentityProvider.generate_verification_email")
    return
get_identity
get_identity(authentication_id)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity(self, authentication_id: UUID | None) -> DevIdentity | None:
    """
    Return the dev identity with the given id.

    The cache is checked first. On a miss, `_find_dev_identity(id=...)` is
    asked, and a truthy result is cached before being returned.

    Returns:
        The identity, or None when the id is None or nothing is found.
    """
    if authentication_id is None:
        return None

    try:
        return self._cache[authentication_id]
    except KeyError:
        pass

    looked_up = _find_dev_identity(id=authentication_id)
    if not looked_up:
        return None
    self._cache[authentication_id] = looked_up
    return looked_up
get_identity_id
get_identity_id(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity_id(self, email: str) -> UUID | None:
    identity = self.find_identity(email)
    return identity.id if identity else None
healthcheck
healthcheck()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def healthcheck(self) -> bool:
    """Always report healthy; no check is performed."""
    return True
refresh_exchanged_token
refresh_exchanged_token(target_client_id, refresh_token)

This provides fake refreshed tokens for the scenario where the backend issues Keycloak tokens on behalf of the user.

Attributes:

Name Type Description
target_client_id str

The target client ID to refresh token with

refresh_token str

The refresh token to use

Returns:

Type Description
dict

Dictionary containing refreshed access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def refresh_exchanged_token(
    self,
    target_client_id: str,  # noqa: ARG002
    refresh_token: str,
) -> dict:  # type: ignore[type-arg]
    """
    Provide fake refreshed tokens for the scenario where the backend issues Keycloak tokens on behalf of the user.

    Attributes:
        target_client_id (str): The target client ID to refresh token with (not used here)
        refresh_token (str): The refresh token to use

    Returns:
        Dictionary containing refreshed access and refresh tokens
    """
    # The refresh token is known to be a fake one, so its claims are read
    # without verifying the signature.
    claims = jwt.decode(
        refresh_token, algorithms=[], options={"verify_signature": False}
    )
    user_email = claims["email"]
    return self._generate_fake_tokens(user_email, self._get_dev_prehashed_password())
IdentityProvider
create_new_identity
create_new_identity(
    email,
    language,
    first_name,
    last_name,
    is_email_verified=True,
    mfa_enabled=False,
    mfa_required=False,
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def create_new_identity(
    self,
    email: str,
    language: Lang,
    first_name: str | None,
    last_name: str | None,
    is_email_verified: bool = True,
    mfa_enabled: bool = False,
    mfa_required: bool = False,
) -> AuthIdentity:
    """
    Create a new identity; this base implementation is a placeholder.

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError
exchange_token_for_user
exchange_token_for_user(target_client_id, email)

Exchange token for a given user using service account

Attributes:

Name Type Description
target_client_id str

The target client ID to exchange token with

email str

Email of the user to exchange token for

Returns:

Type Description
dict

Dictionary containing access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def exchange_token_for_user(self, target_client_id: str, email: str) -> dict:  # type: ignore[type-arg]
    """
    Exchange token for a given user using service account

    Attributes:
        target_client_id (str): The target client ID to exchange token with
        email (str): Email of the user to exchange token for

    Returns:
        Dictionary containing access and refresh tokens

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError
find_identity
find_identity(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity(self, email: str | None) -> AuthIdentity | None:
    """
    Look up an identity by email; this base implementation is a placeholder.

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError
find_identity_id_from_token
find_identity_id_from_token(token)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity_id_from_token(self, token: str) -> UUID | None:
    """
    Resolve an identity id from a token; this base implementation is a placeholder.

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError
generate_password_reset_email
generate_password_reset_email(
    email, client_id, redirect_uri
)

Generate a password reset email in our identity provider

Attributes:

Name Type Description
email str

The email address of the user to trigger a password reset

client_id str

Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")

redirect_uri str | None

The URI to redirect to after performing password reset

Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_password_reset_email(
    self, email: str, client_id: str, redirect_uri: str | None
) -> UUID | None:
    """
    Generate a password reset email in our identity provider

    Attributes:
        email (str): The email address of the user to trigger a password reset
        client_id (str): Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")
        redirect_uri (str | None): The URI to redirect to after performing password reset

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError
generate_verification_email
generate_verification_email(
    identity_id, client_id, redirect_uri
)

Generate a verification email in our identity provider

Attributes:

Name Type Description
identity_id UUID

The ID of the identity to trigger a verification for

client_id str

Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")

redirect_uri str | None

The URI to redirect to after performing verification

Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_verification_email(
    self, identity_id: UUID, client_id: str, redirect_uri: str | None
) -> None:
    """
    Generate a verification email in our identity provider

    Attributes:
        identity_id (UUID): The ID of the identity to trigger a verification for
        client_id (str): Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")
        redirect_uri (str | None): The URI to redirect to after performing verification

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError
get_identity
get_identity(authentication_id)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity(self, authentication_id: UUID | None) -> AuthIdentity | None:
    """
    Fetch an identity by its authentication id; this base implementation is a placeholder.

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError
get_identity_id
get_identity_id(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity_id(self, email: str) -> UUID | None:
    """
    Resolve the identity id for an email; this base implementation is a placeholder.

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError
healthcheck
healthcheck()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def healthcheck(self) -> bool:
    """
    Report whether the identity provider is healthy; this base implementation is a placeholder.

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError
refresh_exchanged_token
refresh_exchanged_token(target_client_id, refresh_token)

Refresh an exchanged token using service account

Attributes:

Name Type Description
target_client_id str

The target client ID to refresh token with

refresh_token str

The refresh token to use

Returns:

Type Description
dict

Dictionary containing refreshed access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def refresh_exchanged_token(
    self, target_client_id: str, refresh_token: str
) -> dict:  # type: ignore[type-arg]
    """
    Refresh an exchanged token using service account

    Attributes:
        target_client_id (str): The target client ID to refresh token with
        refresh_token (str): The refresh token to use

    Returns:
        Dictionary containing refreshed access and refresh tokens

    Raises:
        NotImplementedError: Always, in this implementation.
    """
    raise NotImplementedError
IdentityProviderType

Bases: Enum

keycloak class-attribute instance-attribute
keycloak = 1
stubbed class-attribute instance-attribute
stubbed = 0
KeycloakIdentity
KeycloakIdentity(id, keycloak_user, admin_client)

Bases: AuthIdentity

Source code in components/authentication/internal/infrastructure/identity_provider.py
def __init__(  # type: ignore[no-untyped-def]
    self,
    id: str,
    keycloak_user,
    admin_client: KeycloakAdmin,
) -> None:
    """
    Store the Keycloak user payload and admin client, and prepare an OAuth2
    session pointing at the token endpoint of the configured host and realm.
    """
    self._id = id
    self._keycloak_user = keycloak_user
    self.admin_client = admin_client

    # The token endpoint is derived from the KEYCLOAK_HOST / KEYCLOAK_REALM settings.
    keycloak_host = current_config.get("KEYCLOAK_HOST")
    keycloak_realm = current_config.get("KEYCLOAK_REALM")
    token_endpoint = (
        f"{keycloak_host}/realms/{keycloak_realm}/protocol/openid-connect/token"
    )

    self.client = OAuth2Session(
        client_id=current_config.get("KEYCLOAK_CLIENT_ID"),
        token_endpoint=token_endpoint,
    )
admin_client instance-attribute
admin_client = admin_client
check_password
check_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def check_password(self, prehashed_password: str) -> bool:
    """
    Check the prehashed password through ``self._check_password``.

    When the check fails and the identity has no password at all, a
    ``BaseErrorCode.prehashed_password_not_set`` error is raised instead of
    returning the failed result.
    """
    password_matches = self._check_password(prehashed_password)

    # has_password() is only consulted when the check failed.
    if password_matches or self.has_password():  # type: ignore[no-untyped-call]
        return password_matches

    current_logger.info(
        f"Prehashed password is not set for keycloak identity {self.id}"
    )
    raise BaseErrorCode.prehashed_password_not_set(str(self.id))
clear_password
clear_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def clear_password(self) -> None:
    """Delete every credential the admin client returns for this user, then log it."""
    user_id = str(self.id)

    for credential in self.admin_client.get_credentials(user_id):
        self.admin_client.delete_credential(
            user_id=user_id, credential_id=credential["id"]
        )

    current_logger.info(f"Password of user {self.id} cleared in Keycloak")
client instance-attribute
client = OAuth2Session(
    client_id=get("KEYCLOAK_CLIENT_ID"),
    token_endpoint=f"{keycloak_host}/realms/{keycloak_realm}/protocol/openid-connect/token",
)
delete
delete()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def delete(self) -> None:
    """
    Flag the user as pending deletion and register the follow-up hooks.

    The actual deletion is registered as an on-commit hook of the current
    session; the on-rollback hook resets the pending-deletion flag.
    """

    def _delete_on_commit():  # type: ignore[no-untyped-def]
        return self._delete_user()

    def _unflag_on_rollback():  # type: ignore[no-untyped-def]
        return self.update_pending_deletion(False)

    # Commit or rollback changes depending on the current transaction result.
    _on_commit(current_session, _delete_on_commit)
    _on_rollback(current_session, _unflag_on_rollback)

    self.update_pending_deletion(True)
    current_logger.info(
        f"User identity {self.id} will be deleted in Keycloak after commit"
    )
email property
email
email_verified property
email_verified
first_name property
first_name
has_password
has_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def has_password(self):  # type: ignore[no-untyped-def]
    """Return True when one of the user's credential metadata entries has type "password"."""
    credentials_metadata = _get_user_credential_metadata_cached(
        self.admin_client, str(self.id)
    )
    for credential in credentials_metadata:
        if credential["type"] == "password":
            return True
    return False
id property
id
language property
language
last_name property
last_name
logout_all_sessions
logout_all_sessions()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def logout_all_sessions(self) -> None:
    """Log the user out through the admin client and log how many sessions existed."""
    user_id = str(self.id)

    # Count the sessions before logging out so the log line can report them.
    active_sessions = self.admin_client.get_sessions(user_id)
    sessions_count = len(active_sessions)

    self.admin_client.user_logout(user_id)
    current_logger.info(
        f"User logout for user {self.id} - cleared {sessions_count} sessions"
    )
mfa_enabled property
mfa_enabled
mfa_required property
mfa_required
set_email
set_email(email, is_email_verified)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_email(self, email: str | None, is_email_verified: bool):  # type: ignore[no-untyped-def]
    if email is None:
        raise ValueError("Email cannot be None in Keycloak")

    self._update_user(
        payload={
            "email": email,
            "username": email,
            "emailVerified": is_email_verified,
        },
    )

    current_logger.info(f"Email of user {self.id} updated in Keycloak")
set_first_and_last_names
set_first_and_last_names(
    first_name, last_name, refresh_keycloak_user=True
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_and_last_names(
    self,
    first_name: str | None,
    last_name: str | None,
    refresh_keycloak_user: bool = True,
) -> None:
    """
    Update both names in a single call, sending "" for any missing name.

    ``refresh_keycloak_user`` is forwarded unchanged to ``self._update_user``.
    """
    # Falsy names (None or "") are sent as empty strings.
    names_payload = {
        "firstName": first_name or "",
        "lastName": last_name or "",
    }
    self._update_user(
        payload=names_payload,
        refresh_keycloak_user=refresh_keycloak_user,
    )

    current_logger.info(
        f"First name and last name updated in Keycloak for user {self.id}"
    )
set_first_name
set_first_name(first_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_name(self, first_name: str | None) -> None:
    """Update the user's first name, sending "" when none is given."""
    # A falsy value (None or "") is sent as an empty string.
    name_payload = {"firstName": first_name or ""}
    self._update_user(payload=name_payload)

    current_logger.info(f"First name of user {self.id} updated in Keycloak")
set_language
set_language(language)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_language(self, language: Lang) -> None:
    """Store ``language`` in the user's "locale" attribute and log the change."""
    self._update_user_attribute(name="locale", value=language)

    current_logger.info(
        f"Language of user {self.id} updated to {language} in Keycloak"
    )
set_last_name
set_last_name(last_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_last_name(self, last_name: str | None) -> None:
    """Update the user's last name, sending "" when none is given."""
    # A falsy value (None or "") is sent as an empty string.
    name_payload = {"lastName": last_name or ""}
    self._update_user(payload=name_payload)

    current_logger.info(f"Last name of user {self.id} updated in Keycloak")
set_mfa_enabled
set_mfa_enabled(enabled)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_enabled(self, enabled: bool) -> None:
    """Write the "mfaEnabled" user attribute as "yes" or "no" and log the change."""
    # The flag is stored as a "yes"/"no" string rather than a boolean.
    if enabled:
        attribute_value = "yes"
    else:
        attribute_value = "no"
    self._update_user_attribute(name="mfaEnabled", value=attribute_value)

    current_logger.info(
        f"MFA enabled flag of user {self.id} updated to {enabled} in Keycloak"
    )
set_mfa_required
set_mfa_required(required)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_required(self, required: bool) -> None:
    """Write the "mfaRequired" user attribute as "yes" or "no" and log the change."""
    # The flag is stored as a "yes"/"no" string rather than a boolean.
    if required:
        attribute_value = "yes"
    else:
        attribute_value = "no"
    self._update_user_attribute(name="mfaRequired", value=attribute_value)

    current_logger.info(
        f"MFA required flag of user {self.id} updated to {required} in Keycloak"
    )
set_password
set_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_password(self, prehashed_password: str) -> None:
    """Set the prehashed password as the user's non-temporary password and log it."""
    user_id = str(self.id)
    self.admin_client.set_user_password(
        user_id=user_id,
        password=prehashed_password,
        temporary=False,
    )

    current_logger.info(f"Password of user {self.id} updated in Keycloak")
update_pending_deletion
update_pending_deletion(pending_deletion)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def update_pending_deletion(self, pending_deletion: bool) -> None:
    """Write the "pendingDeletion" user attribute as "yes" or "no"."""
    # The flag is stored as a "yes"/"no" string rather than a boolean.
    if pending_deletion:
        attribute_value = "yes"
    else:
        attribute_value = "no"
    self._update_user_attribute(name="pendingDeletion", value=attribute_value)
KeycloakIdentityProvider
KeycloakIdentityProvider()

Bases: IdentityProvider

Source code in components/authentication/internal/infrastructure/identity_provider.py
def __init__(self) -> None:
    """Obtain the Keycloak admin client via ``get_admin_client()``, then run the base initialiser."""
    self.keycloak_admin_client = get_admin_client()
    super().__init__()
create_new_identity
create_new_identity(
    email,
    language,
    first_name,
    last_name,
    is_email_verified=True,
    mfa_enabled=False,
    mfa_required=False,
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def create_new_identity(
    self,
    email: str,
    language: Lang,
    first_name: str | None,
    last_name: str | None,
    is_email_verified: bool = True,
    mfa_enabled: bool = False,
    mfa_required: bool = False,
) -> KeycloakIdentity:
    """
    Create a user in Keycloak and return it wrapped in a ``KeycloakIdentity``.

    The user is created through ``self._create_keycloak_user`` and then read
    back through ``_get_user_cached`` to build the returned identity.
    """
    keycloak_user_id: str = self._create_keycloak_user(
        email=email,
        first_name=first_name,
        last_name=last_name,
        language=language,
        email_verified=is_email_verified,
        mfa_enabled=mfa_enabled,
        mfa_required=mfa_required,
    )
    current_logger.info(
        f"Created new user in Keycloak with email {email} and id {keycloak_user_id}"
    )

    # Read the freshly created user back to get its full payload.
    created_user = _get_user_cached(self.keycloak_admin_client, keycloak_user_id)
    identity = KeycloakIdentity(
        id=created_user["id"],
        keycloak_user=created_user,
        admin_client=self.keycloak_admin_client,
    )
    return identity
exchange_token_for_user
exchange_token_for_user(target_client_id, email)

Exchange token for a user using Keycloak service account

Attributes:

Name Type Description
target_client_id str

The target client ID to exchange token with

email str

Email of the user to exchange token for

Returns:

Type Description
dict

Dictionary containing access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def exchange_token_for_user(self, target_client_id: str, email: str) -> dict:  # type: ignore[type-arg]
    """
    Exchange token for a user using Keycloak service account

    Attributes:
        target_client_id (str): The target client ID to exchange token with
        email (str): Email of the user to exchange token for

    Returns:
        Dictionary containing access and refresh tokens

    Raises:
        The error built by ``BaseErrorCode.token_error`` whenever any step of
        the exchange fails; the underlying exception is attached as its cause.
    """
    keycloak_openid = self.keycloak_admin_client.connection.keycloak_openid
    try:
        # Get service account token
        service_token = keycloak_openid.token(grant_type=["client_credentials"])

        # Perform token exchange
        exchanged_token = keycloak_openid.token(
            subject_token=service_token["access_token"],
            grant_type="urn:ietf:params:oauth:grant-type:token-exchange",
            audience=target_client_id,
            requested_subject=email,
        )

        # Kept inside the try block so that a missing key in the response is
        # reported as a token error as well.
        return {
            "accessToken": exchanged_token["access_token"],
            "refreshToken": exchanged_token["refresh_token"],
        }

    except KeycloakAuthenticationError as e:
        current_logger.error(
            f"Keycloak token exchange authentication error: {str(e)}"
        )
        # Chain explicitly so the original error is recorded as the cause.
        raise BaseErrorCode.token_error(error="Failed to exchange token") from e
    except KeycloakOperationError as e:
        current_logger.error(f"Keycloak token exchange operation error: {str(e)}")
        raise BaseErrorCode.token_error(
            error="Failed to perform token exchange operation"
        ) from e
    except Exception as e:
        current_logger.error(f"Unexpected error during token exchange: {str(e)}")
        raise BaseErrorCode.token_error(
            error="Unexpected error during token exchange"
        ) from e
find_identity
find_identity(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity(self, email: str | None) -> KeycloakIdentity | None:
    """
    Return the Keycloak identity matching ``email``.

    None is returned when ``email`` is empty or when the lookup finds no user.
    """
    # An empty or missing email short-circuits the lookup entirely.
    keycloak_user = (
        self._find_keycloak_user_by_email(email=email) if email else None
    )
    if keycloak_user is None:
        return None

    return KeycloakIdentity(
        id=keycloak_user["id"],
        keycloak_user=keycloak_user,
        admin_client=self.keycloak_admin_client,
    )
find_identity_id_from_token
find_identity_id_from_token(token)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity_id_from_token(self, token: str) -> UUID | None:
    """Return the id of the user resolved from the access token, or None when there is none."""
    keycloak_user = get_user_from_access_token(token)
    if not keycloak_user:
        return None
    return keycloak_user.id
generate_password_reset_email
generate_password_reset_email(
    email, client_id, redirect_uri
)

Generate a password reset email in our identity provider

Attributes:

Name Type Description
email str

The email address of the user to trigger a password reset

client_id str

Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")

redirect_uri str | None

The URI to redirect to after performing password reset

Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_password_reset_email(
    self, email: str, client_id: str, redirect_uri: str | None
) -> UUID | None:
    """
    Generate a password reset email in our identity provider

    Attributes:
        email (str): The email address of the user to trigger a password reset
        client_id (str): Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")
        redirect_uri (str | None): The URI to redirect to after performing password reset
    """
    user_id = self.get_identity_id(email=email)
    if user_id:
        self.keycloak_admin_client.send_update_account(
            user_id=user_id,
            payload=["UPDATE_PASSWORD"],
            client_id=client_id,
            redirect_uri=redirect_uri,
            lifespan=1800,  # 30 min
        )
        current_logger.info(
            f"Password reset email generated for {email} with client {client_id} & redirect URI {redirect_uri}"
        )

    return user_id
generate_verification_email
generate_verification_email(
    identity_id, client_id, redirect_uri
)

Generate a verification email in our identity provider

Attributes:

Name Type Description
identity_id UUID

The ID of the identity to trigger a verification for

client_id str

Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")

redirect_uri str | None

The URI to redirect to after performing verification

Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_verification_email(
    self, identity_id: UUID, client_id: str, redirect_uri: str | None
) -> None:
    """
    Generate a verification email in our identity provider

    Attributes:
        identity_id (UUID): The ID of the identity to trigger a verification for
        client_id (str): Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")
        redirect_uri (str | None): The URI to redirect to after performing verification
    """
    # The identity ID is passed as the user ID of the "VERIFY_EMAIL" action.
    self.keycloak_admin_client.send_update_account(
        user_id=identity_id,
        payload=["VERIFY_EMAIL"],
        client_id=client_id,
        redirect_uri=redirect_uri,
        lifespan=1800,  # 30 min
    )
    current_logger.info(
        f"Verification email generated for identity id {identity_id} with client {client_id} & redirect URI {redirect_uri}"
    )
get_identity
get_identity(authentication_id)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity(self, authentication_id: UUID | None) -> KeycloakIdentity | None:
    """
    Return the Keycloak identity for ``authentication_id``.

    None is returned when no id is given, when the lookup fails with a
    NOT_FOUND business error, or when the user is flagged as pending deletion.
    Any other ``KeycloakBusinessError`` is re-raised.
    """
    if not authentication_id:
        return None

    try:
        keycloak_user = _get_user_cached(
            self.keycloak_admin_client, str(authentication_id)
        )
    except KeycloakBusinessError as e:
        if e.alancode != HTTPStatus.NOT_FOUND:
            raise
        return None

    # Check if user is not marked as "deleted".
    if self._is_pending_deletion(keycloak_user):
        return None

    identity = KeycloakIdentity(
        id=keycloak_user["id"],
        keycloak_user=keycloak_user,
        admin_client=self.keycloak_admin_client,
    )
    return identity
get_identity_id
get_identity_id(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity_id(self, email) -> UUID | None:  # type: ignore[no-untyped-def]
    if email is None:
        return None

    keycloak_user = self._find_by_email(
        email=email,
    )

    if not keycloak_user:
        current_logger.debug(f"No user found in Keycloak with email {email}")
        return None

    return UUID(keycloak_user["id"])
healthcheck
healthcheck()

This healthcheck verifies that the Keycloak service account has the right permissions, which in turn makes sure Keycloak is working as expected

Source code in components/authentication/internal/infrastructure/identity_provider.py
def healthcheck(self) -> bool:
    """
    This healthcheck is checking Keycloak service account has the right permissions
    so eventually making sure Keycloak is working as expected

    Returns:
        True when the check is skipped (eu-tools) or when the service account
        holds every required role, False otherwise
    """
    if get_current_app_name() == AppName.EU_TOOLS:
        # We skip eu-tools as we don't need to check the service account permissions on this one
        # Cf. https://alanhealth.slack.com/archives/C06P9PMR011/p1732783502638769?thread_ts=1732774294.726989&cid=C06P9PMR011
        current_logger.info(
            "No healthcheck for eu-tools app for now, skipping and assuming auth healthcheck is ok"
        )
        return True

    keycloak_admin_client_id = current_config.get("KEYCLOAK_ADMIN_CLIENT_ID")
    if not keycloak_admin_client_id:
        current_logger.critical(
            "KEYCLOAK_ADMIN_CLIENT_ID env var is not defined or is empty but required for Keycloak interactions from the backend"
        )
        return False

    # Keycloak service accounts are always under the form of "service-account-" + client ID
    # Cf. https://www.keycloak.org/docs/latest/server_admin/#adding-or-removing-roles-for-clients-service-account
    keycloak_service_account_username = (
        "service-account-" + keycloak_admin_client_id
    )
    keycloak_service_account_id = self.keycloak_admin_client.get_user_id(
        keycloak_service_account_username
    )
    if not keycloak_service_account_id:
        return False

    all_roles = self.keycloak_admin_client.get_all_roles_of_user(
        keycloak_service_account_id
    )
    required_roles = {"manage-users", "view-users"}

    # Flatten the role names across every client mapping.
    role_names = []
    for client_mapping in all_roles.get("clientMappings", {}).values():
        for mapping in client_mapping.get("mappings", []):
            role_names.append(mapping["name"])

    if not required_roles.issubset(role_names):
        current_logger.critical(
            f"One of the required roles {required_roles} might be missing for Keycloak client service account: {keycloak_service_account_username}"
        )
        return False

    return True
keycloak_admin_client instance-attribute
keycloak_admin_client = get_admin_client()
refresh_exchanged_token
refresh_exchanged_token(target_client_id, refresh_token)

Refresh an exchanged token using Keycloak service account

Attributes:

Name Type Description
target_client_id str

The target client ID to refresh token with

refresh_token str

The refresh token to use

Returns:

Type Description
dict

Dictionary containing refreshed access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def refresh_exchanged_token(
    self, target_client_id: str, refresh_token: str
) -> dict:  # type: ignore[type-arg]
    """
    Refresh an exchanged token using Keycloak service account

    Attributes:
        target_client_id (str): The target client ID to refresh token with
        refresh_token (str): The refresh token to use

    Returns:
        Dictionary containing refreshed access and refresh tokens

    Raises:
        The error built by ``BaseErrorCode.token_error`` whenever the refresh
        fails; the underlying exception is attached as its cause.
    """
    keycloak_openid = self.keycloak_admin_client.connection.keycloak_openid
    try:
        refreshed_token = keycloak_openid.token(
            grant_type="refresh_token",
            refresh_token=refresh_token,
            audience=target_client_id,
        )

        # Kept inside the try block so that a missing key in the response is
        # reported as a token error as well.
        return {
            "accessToken": refreshed_token["access_token"],
            "refreshToken": refreshed_token["refresh_token"],
        }

    except KeycloakAuthenticationError as e:
        current_logger.error(
            f"Keycloak token refresh authentication error: {str(e)}"
        )
        # Chain explicitly so the original error is recorded as the cause.
        raise BaseErrorCode.token_error(error="Failed to refresh token") from e
    except KeycloakOperationError as e:
        current_logger.error(f"Keycloak token refresh operation error: {str(e)}")
        raise BaseErrorCode.token_error(
            error="Failed to perform token refresh operation"
        ) from e
    except Exception as e:
        current_logger.error(f"Unexpected error during token refresh: {str(e)}")
        raise BaseErrorCode.token_error(
            error="Unexpected error during token refresh"
        ) from e
ON_COMMIT_HOOKS module-attribute
ON_COMMIT_HOOKS = '_keycloak_idp_on_commit_hooks'
ON_ROLLBACK_HOOKS module-attribute
ON_ROLLBACK_HOOKS = '_keycloak_idp_on_rollback_hooks'
get_identity_provider
get_identity_provider()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity_provider() -> IdentityProvider:
    """Return the identity provider obtained from ``_get_identity_provider()``."""
    return _get_identity_provider()
get_identity_provider_type
get_identity_provider_type()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity_provider_type() -> IdentityProviderType:
    """
    Select the identity provider type for the current environment.

    Outside development mode this is always Keycloak. In development mode the
    ``DEV_IDENTITY_PROVIDER_TYPE`` env var decides, defaulting to "stubbed".
    """
    if is_development_mode():
        # By default we use the stubbed identity provider in dev environment.
        return env.enum(  # type: ignore[no-any-return]
            "DEV_IDENTITY_PROVIDER_TYPE",
            "stubbed",
            type=IdentityProviderType,
            ignore_case=True,
        )

    # We don't allow another identity provider in other environments than dev.
    return IdentityProviderType.keycloak
should_use_keycloak
should_use_keycloak()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def should_use_keycloak() -> bool:
    """Tell whether the selected identity provider type is Keycloak."""
    return get_identity_provider_type() == IdentityProviderType.keycloak
warn_because_not_available
warn_because_not_available(method_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def warn_because_not_available(method_name) -> None:  # type: ignore[no-untyped-def]
    """Log a warning that ``method_name`` is stubbed and explain how to enable Keycloak."""
    current_logger.warning(
        f"⚠️ The {method_name} method is stubbed and not fully supported, so you may run into errors. You can "
        "ping @auth_maintainers if you encounter any, and set `DEV_IDENTITY_PROVIDER_TYPE` env var to 'keycloak' to "
        "enable the full login experience for both backend and frontend."
    )

mfa_auth

BadNonce
BadNonce()

Bases: MFAError

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self) -> None:
    """Initialise the base error with the fixed "BAD_NONCE" value."""
    super().__init__("BAD_NONCE")
DictMFAStorage
DictMFAStorage()

Bases: MFAStorage

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self) -> None:
    """Start with an empty in-memory store keyed by pending operation id."""
    self._store: dict[UUID, DictMFAStorageEntry] = {}
get
get(pending_operation_id)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def get(self, pending_operation_id: UUID) -> PendingOperation | None:
    """
    Return the stored pending operation, or None when it is unknown or expired.

    An expired entry is removed from the store as a side effect.
    """
    entry = self._store.get(pending_operation_id)
    if entry is None:
        return None

    still_valid = entry.expired_at >= datetime.datetime.utcnow()
    if still_valid:
        return entry.pending_operation  # type: ignore[no-any-return]

    # The entry has expired: drop it so it is not served again.
    del self._store[pending_operation_id]
    return None
put
put(pending_operation)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def put(self, pending_operation: PendingOperation) -> None:
    """Store the pending operation under its id with a fresh expiry computed from now."""
    expires_at = self._get_expired_at_from_now()  # type: ignore[no-untyped-call]
    new_entry = DictMFAStorageEntry(
        pending_operation=pending_operation,
        expired_at=expires_at,
    )
    self._store[pending_operation.id] = new_entry
update
update(pending_operation)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def update(self, pending_operation: PendingOperation) -> None:
    """
    Replace the stored pending operation while keeping its existing expiry.

    When nothing is stored yet for this id, a fresh expiry is computed from now.
    """
    existing = self._store.get(pending_operation.id)
    if existing:
        expired_at = existing.expired_at
    else:
        expired_at = self._get_expired_at_from_now()  # type: ignore[no-untyped-call]

    self._store[pending_operation.id] = DictMFAStorageEntry(
        pending_operation=pending_operation,
        expired_at=expired_at,
    )
DictMFAStorageEntry module-attribute
# Record pairing a pending operation with its expiry value.
DictMFAStorageEntry = namedtuple(
    "DictMFAStorageEntry", "pending_operation expired_at"
)
DictNotifier
DictNotifier()

Bases: Notifier

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self) -> None:
    """Start with an empty in-memory store of notifications."""
    self.store = {}  # type: ignore[var-annotated]
get
get(operation_id)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def get(self, operation_id: UUID) -> int | None:
    """Return the value stored for ``operation_id``, or None when nothing is stored."""
    return self.store.get(operation_id)
name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self):  # type: ignore[no-untyped-def]
    """Return the ``NotifierType`` member identifying this notifier."""
    return NotifierType.dict
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(
    self,
    pending_operation: PendingOperation,
    origin: MfaRequestOrigin,  # noqa: ARG002
) -> None:
    """Record the operation's validation code in ``self.store`` under the operation id."""
    # Nothing is sent anywhere; the origin is ignored.
    self.store[pending_operation.id] = pending_operation.validation_code
store instance-attribute
store = {}
EmailNotifier
EmailNotifier(get_user_cls)

Bases: Notifier

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self, get_user_cls: Callable[[], type[Authenticatable]]) -> None:
    """Keep the callable that returns the user class to load when notifying."""
    self._get_user_cls = get_user_cls
name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self):  # type: ignore[no-untyped-def]
    """Return the ``NotifierType`` member identifying this notifier."""
    return NotifierType.email
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(
    self, pending_operation: PendingOperation, origin: MfaRequestOrigin
) -> None:
    """Email the MFA validation code to the user owning the operation.

    Nothing is sent when the request originates from the web login.
    """
    if origin == MfaRequestOrigin.login_web:
        current_logger.debug(
            "Skipping email notification as MFA request is coming from web app"
        )
        return

    # Load the user through the class supplied at construction time.
    recipient: BaseUser = current_session.get(  # type: ignore[assignment]
        self._get_user_cls(),  # type: ignore[arg-type]
        pending_operation.user_id,
    )
    # Sent synchronously on purpose: the member needs the code right away,
    # and if workers had an incident the email might never arrive, leaving
    # the member blocked on authentication.
    send_mfa_code_email(
        first_name=recipient.first_name,  # type: ignore[arg-type]
        email=recipient.email,  # type: ignore[arg-type]
        validation_code=pending_operation.validation_code,
        validation_code_description=pending_operation.description,
        validation_code_type=pending_operation.type,
        lang=getattr(recipient, "lang", None),
        is_async=False,
    )
InvalidUser
InvalidUser()

Bases: MFAError

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self) -> None:
    """Initialise the error with the ``INVALID_USER`` code."""
    error_code = "INVALID_USER"
    super().__init__(error_code)
IosSimulatorJsonNotifier

Bases: Notifier

name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self):  # type: ignore[no-untyped-def]
    """Return the notifier type under which this notifier is registered."""
    notifier_type = NotifierType.ios_simulator
    return notifier_type
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(
    self, pending_operation: PendingOperation, origin: MfaRequestOrigin
) -> None:
    """Dump the MFA push payload to a temporary ``.apns`` file.

    The file is kept on disk (``delete=False``) and its path is logged.
    Nothing is written when the request comes from the mobile login.
    """
    if origin == MfaRequestOrigin.login_mobile:
        # Sending a push notification makes no sense.
        return

    with NamedTemporaryFile(
        prefix="mfa-notif-", suffix=".apns", delete=False
    ) as dump_file:
        alert = {
            "title": "Opération en attente de validation",
            "body": pending_operation.description,
        }
        payload = {
            "Simulator Target Bundle": "alan.health.ios",
            "google.c.a.e": 1,
            "gcm.message_id": "0:1538488916770554%a88db343a88db34",
            "aps": {"alert": alert},
            "operation_id": str(pending_operation.id),
            "validation_code": str(pending_operation.validation_code),
            "description": pending_operation.description,
            "type": pending_operation.type,
            "name": "mfa_operation_pending_push_notification",
        }
        serialized = json.dumps(payload)
        dump_file.write(serialized.encode("UTF-8"))
        current_logger.info(f"Notification dumped to {dump_file.name}")
MFA
MFA(storage, notifiers, _get_user_cls)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(
    self,
    storage: MFAStorage,
    notifiers: list[Notifier],
    _get_user_cls: Callable[[], type[Authenticatable]],
) -> None:
    """Wire the MFA service to its storage, notifiers and user-class getter."""
    self.storage = storage
    # Index notifiers by the type each one reports; a later notifier with
    # the same type replaces an earlier one.
    self.notifiers = {each.name(): each for each in notifiers}

    self._get_user_cls = _get_user_cls
get_operation_status
get_operation_status(operation_id, nonce)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def get_operation_status(self, operation_id: UUID, nonce: UUID) -> ValidationStatus:
    """Return the status of a pending operation for the client holding its nonce.

    Raises:
        BadNonce: when ``nonce`` differs from the one stored on the operation.
    """
    operation = self._get_pending_operation(operation_id)
    if operation.nonce != nonce:
        raise BadNonce

    current_status = operation.status
    return current_status
mfa_required
mfa_required(op_description, op_type)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def mfa_required(  # type: ignore[no-untyped-def]
    self,
    op_description: str,
    op_type: str,
):
    """Build a decorator that gates the wrapped callable behind MFA validation.

    Args:
        op_description: Description stored on the pending operation that is
            created when the request carries no ``operation_id``.
        op_type: Type stored on that pending operation.

    The wrapper calls the wrapped callable directly when the current user's
    identity does not have ``mfa_enabled`` set, or when the pending operation
    is neither ``PENDING`` nor ``FAILURE``. Otherwise it raises
    ``BaseErrorCode.mfa_verification_pending`` (status ``PENDING``) or
    ``BaseErrorCode.authorization_error`` (status ``FAILURE``).
    """

    def decorator(f):  # type: ignore[no-untyped-def]
        @wraps(f)
        def wrapper(*args, **kwargs):  # type: ignore[no-untyped-def]
            # Resolve the identity of the current user from its profile id.
            authentication_service = AuthenticationService.create()
            assert g.current_user is not None
            identity = authentication_service.get_keycloak_identity_by_profile_id(
                g.current_user.profile_id
            )
            assert identity is not None

            # No MFA configured on this identity: run the wrapped callable as is.
            if not identity.mfa_enabled:
                return f(*args, **kwargs)

            parser = reqparse.RequestParser()
            parser.add_argument(
                "operation_id",
                type=UUID,
                help="Pending operation ID",
                required=False,
            )
            parser.add_argument(
                "nonce",
                type=UUID,
                help="Pending operation nonce (only known by original client)",
                required=False,
            )
            parser.add_argument(
                "refresh_token_type",
                type=str,
                help="type of the refresh token: web or mobile",
                required=True,
            )

            params = parser.parse_args(strict=False)
            pending_operation_id = params.get("operation_id")

            if pending_operation_id is None:
                # First call: no operation yet, so create one for this user.
                refresh_token_type = params.get("refresh_token_type")
                # Name-based enum lookup ("login_web" / "login_mobile").
                # NOTE(review): any other refresh_token_type presumably raises
                # KeyError here — confirm it is handled upstream.
                origin = MfaRequestOrigin[f"login_{refresh_token_type}"]

                pending_operation = self._create_pending_operation(
                    user_id=g.current_user.id,
                    description=op_description,
                    type=op_type,
                    origin=origin,
                )

                current_logger.info(
                    f"MFA operation {pending_operation.id} required on {request.path} for user {g.current_user.id}",
                    pending_operation_id=pending_operation.id,
                    origin=origin,
                )
            else:
                # Follow-up call: load the existing operation and check the nonce.
                pending_operation = self._get_pending_operation(
                    pending_operation_id
                )

                if pending_operation.nonce != params.get("nonce"):
                    # NOTE(review): the expected nonce is written to the logs
                    # here — confirm this is acceptable.
                    current_logger.warning(
                        f"bad MFA nonce: expected {pending_operation.nonce}, got {params.get('nonce')}"
                    )
                    # A wrong nonce marks the operation as failed.
                    self.update_status(pending_operation, ValidationStatus.FAILURE)

            # Dispatch on the operation status.
            if pending_operation.status == ValidationStatus.PENDING:
                raise BaseErrorCode.mfa_verification_pending(
                    operation_id=str(pending_operation.id),
                    nonce=str(pending_operation.nonce),
                )

            elif pending_operation.status == ValidationStatus.FAILURE:
                raise BaseErrorCode.authorization_error()

            else:
                return f(*args, **kwargs)

        return wrapper

    return decorator
notifiers instance-attribute
notifiers = {notifier.name(): notifier for notifier in notifiers}
register_notifier
register_notifier(notifier)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def register_notifier(self, notifier: Notifier) -> None:
    """Add ``notifier`` under its own type, replacing any existing one."""
    notifier_type = notifier.name()
    self.notifiers[notifier_type] = notifier
send_email
send_email(operation_id, nonce)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def send_email(self, operation_id: UUID, nonce: UUID):  # type: ignore[no-untyped-def]
    """Send the pending operation through the registered email notifier.

    Raises:
        BadNonce: when ``nonce`` differs from the one stored on the operation.
        NotImplementedError: when no email notifier is registered.
    """
    operation = self._get_pending_operation(operation_id)
    if operation.nonce != nonce:
        raise BadNonce

    notifier = self.notifiers.get(NotifierType.email)
    if notifier:
        notifier.notify(
            pending_operation=operation,
            origin=MfaRequestOrigin.email,
        )
        return

    raise NotImplementedError(
        "Cannot send email notification as email notifier is not register"
    )
storage instance-attribute
storage = storage
update_status
update_status(pending_operation, new_status)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def update_status(
    self, pending_operation: PendingOperation, new_status: ValidationStatus
) -> None:
    """Set and persist a new status, unless the operation already failed."""
    already_rejected = pending_operation.status == ValidationStatus.FAILURE
    if already_rejected:
        # Rejection status is permanent
        return

    pending_operation.status = new_status
    self.storage.update(pending_operation)
validate_or_reject
validate_or_reject(
    operation_id,
    validation_code=None,
    authenticated=False,
    authenticated_user=None,
    reject=False,
)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def validate_or_reject(
    self,
    operation_id: UUID,
    validation_code: int | None = None,
    authenticated: bool = False,
    authenticated_user: UserId | None = None,
    reject: bool = False,
) -> bool:
    """Validate or reject a pending MFA operation.

    Args:
        operation_id: Id of the pending operation to act on.
        validation_code: Code compared with the one stored on the operation
            when the caller is not ``authenticated``.
        authenticated: When true, the operation is validated without checking
            the code, provided ``authenticated_user`` owns the operation.
        authenticated_user: Id compared (as a string) with the operation's
            ``user_id`` when ``authenticated`` is true.
        reject: When true, the operation is marked as failed.

    Returns:
        ``True`` when the status was updated, ``False`` when the caller is
        neither rejecting nor authenticated and the code does not match.

    Raises:
        OperationValidated: the operation status is already ``SUCCESS``.
        OperationRejected: the operation status is already ``FAILURE``.
        InvalidUser: ``authenticated`` is true but the user ids differ.
    """
    pending_operation = self._get_pending_operation(operation_id)

    # Operations that already reached SUCCESS or FAILURE cannot be changed.
    if pending_operation.status == ValidationStatus.SUCCESS:
        current_logger.warning(
            f"MFA operation already validated: {operation_id}",
            operation_id=operation_id,
        )
        raise OperationValidated
    elif pending_operation.status == ValidationStatus.FAILURE:
        current_logger.warning(
            f"MFA operation already rejected: {operation_id}",
            operation_id=operation_id,
        )
        raise OperationRejected

    # Check for authenticated user id
    # The authenticated user id might be an UUID (international app or during tests)
    # or an int (fr-app).
    # When the authenticated user id is an UUID, it is ultimately stored as string.
    # To be sure equality works in all cases, cast both values to a string.
    if authenticated and str(pending_operation.user_id) != str(authenticated_user):
        current_logger.warning(
            f"MFA operation validation related to a different user: {operation_id}",
            operation_id=operation_id,
            operation_user_id=pending_operation.user_id,
            authenticated_user_id=authenticated_user,
        )
        raise InvalidUser

    # Rejection takes precedence over validation.
    if reject:
        current_logger.debug(
            f"Rejecting MFA operation: {operation_id}", operation_id=operation_id
        )
        new_status = ValidationStatus.FAILURE
    elif authenticated or pending_operation.validation_code == validation_code:
        current_logger.debug(
            f"Validating MFA operation: {operation_id}", operation_id=operation_id
        )
        new_status = ValidationStatus.SUCCESS
    else:
        # Wrong code: the operation is left untouched.
        return False

    self.update_status(pending_operation, new_status)
    return True
MFAError
MFAError(error)

Bases: Exception

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self, error: str) -> None:
    """Store ``error`` on the exception as its ``error`` attribute."""
    # Subclasses in this module pass a fixed upper-case code here.
    self.error = error
error instance-attribute
error = error
MFAStorage
get
get(pending_operation_id)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def get(self, pending_operation_id: UUID) -> PendingOperation | None:
    """Storage hook for reading an operation; this base version always raises."""
    raise NotImplementedError
put
put(pending_operation)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def put(self, pending_operation: PendingOperation):  # type: ignore[no-untyped-def]
    """Storage hook for saving a new operation; this base version always raises."""
    raise NotImplementedError
update
update(pending_operation)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def update(self, pending_operation: PendingOperation):  # type: ignore[no-untyped-def]
    """Storage hook for rewriting an operation; this base version always raises."""
    raise NotImplementedError
MfaRequestOrigin

Bases: AlanBaseEnum

email class-attribute instance-attribute
email = 'email'
login_mobile class-attribute instance-attribute
login_mobile = 'login_mobile'
login_web class-attribute instance-attribute
login_web = 'login_web'
Notifier
name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self) -> NotifierType:
    """Notifier hook returning its type; this base version always raises."""
    raise NotImplementedError
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(self, pending_operation: PendingOperation, origin: MfaRequestOrigin):  # type: ignore[no-untyped-def]
    """Notifier hook for delivering an operation; this base version always raises."""
    raise NotImplementedError
NotifierType

Bases: AlanBaseEnum

dict class-attribute instance-attribute
dict = 'dict'
email class-attribute instance-attribute
email = 'email'
ios_simulator class-attribute instance-attribute
ios_simulator = 'ios_simulator'
print class-attribute instance-attribute
print = 'print'
push class-attribute instance-attribute
push = 'push'
OperationExpired
OperationExpired()

Bases: MFAError

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self) -> None:
    """Initialise the error with the ``OPERATION_EXPIRED`` code."""
    error_code = "OPERATION_EXPIRED"
    super().__init__(error_code)
OperationRejected
OperationRejected()

Bases: MFAError

Operation has reached terminal status (either validated or rejected)

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self) -> None:
    """Initialise the error with the ``OPERATION_REJECTED`` code."""
    error_code = "OPERATION_REJECTED"
    super().__init__(error_code)
OperationValidated
OperationValidated()

Bases: MFAError

Operation has reached terminal status (either validated or rejected)

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self) -> None:
    """Initialise the error with the ``OPERATION_VALIDATED`` code."""
    error_code = "OPERATION_VALIDATED"
    super().__init__(error_code)
PENDING_OPERATION_TTL module-attribute
# Lifetime given to a pending operation in Redis (passed as SETEX `time`).
# Presumably seconds, i.e. 15 minutes — confirm against the Redis client.
PENDING_OPERATION_TTL = 15 * 60
PendingOperation dataclass
PendingOperation(
    id,
    nonce,
    user_id,
    status,
    validation_code,
    description,
    type,
)

Bases: DataClassJsonMixin

description instance-attribute
description
id instance-attribute
id
nonce instance-attribute
nonce
status instance-attribute
status
type instance-attribute
type
user_id instance-attribute
user_id
validation_code instance-attribute
validation_code
PrintNotifier

Bases: Notifier

name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self):  # type: ignore[no-untyped-def]
    """Return the notifier type under which this notifier is registered."""
    notifier_type = NotifierType.print
    return notifier_type
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(
    self, pending_operation: PendingOperation, origin: MfaRequestOrigin
) -> None:
    """Log the whole pending operation (as a dict) together with its origin."""
    current_logger.info(
        "MFA pending",
        pending_operation=asdict(pending_operation),
        origin=origin,
    )
PushNotificationSender module-attribute
# Callable used by PushNotifier; it is invoked positionally with the
# operation's user id, description, id and type, and returns nothing.
PushNotificationSender = Callable[[UserId, str, UUID, str | None], None]
PushNotifier
PushNotifier(send_mfa_operation_pending_notification)

Bases: Notifier

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(
    self, send_mfa_operation_pending_notification: PushNotificationSender
) -> None:
    """Keep the callable that actually delivers the push notification."""
    sender = send_mfa_operation_pending_notification
    self._send_push_notification = sender
name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self):  # type: ignore[no-untyped-def]
    """Return the notifier type under which this notifier is registered."""
    notifier_type = NotifierType.push
    return notifier_type
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(
    self, pending_operation: PendingOperation, origin: MfaRequestOrigin
) -> None:
    """Send a push notification for the operation, except for mobile logins."""
    if origin == MfaRequestOrigin.login_mobile:
        # Sending a push notification makes no sense.
        current_logger.debug(
            "Skipping push notification as MFA request is coming from mobile app"
        )
        return

    # Positional order expected by the sender: user id, description, id, type.
    sender_args = (
        pending_operation.user_id,
        pending_operation.description,
        pending_operation.id,
        pending_operation.type,
    )
    self._send_push_notification(*sender_args)  # type: ignore[misc]
RedisMFAStorage
RedisMFAStorage(redis_connection)

Bases: MFAStorage

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self, redis_connection: redis.Redis | FakeStrictRedis) -> None:
    """Keep the Redis (or fake Redis) connection backing this storage."""
    # Exposed as a public attribute.
    self.redis_connection = redis_connection
get
get(pending_operation_id)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def get(self, pending_operation_id: UUID) -> PendingOperation | None:
    """Load a pending operation from Redis, or ``None`` when nothing is stored."""
    redis_key = self.pending_operation_redis_id(pending_operation_id)
    raw_payload = self._redis.get(redis_key)

    if not raw_payload:
        return None

    # Use infer_missing=True to handle missing type attribute.
    return PendingOperation.from_json(
        raw_payload,
        infer_missing=True,
    )
pending_operation_redis_id staticmethod
pending_operation_redis_id(pending_operation_id)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
@staticmethod
def pending_operation_redis_id(pending_operation_id: UUID) -> str:
    return f"mfa_{pending_operation_id}"
put
put(pending_operation)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def put(self, pending_operation: PendingOperation) -> None:
    """Store the operation as JSON with the default pending-operation TTL."""
    redis_key = self.pending_operation_redis_id(pending_operation.id)
    self._redis.setex(
        name=redis_key,
        time=PENDING_OPERATION_TTL,
        value=pending_operation.to_json(),
    )
redis_connection instance-attribute
redis_connection = redis_connection
update
update(pending_operation)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def update(self, pending_operation: PendingOperation) -> None:
    """Rewrite the stored operation while keeping its remaining lifetime.

    The remaining TTL of the existing key is reused. When there is no usable
    remaining TTL, the default ``PENDING_OPERATION_TTL`` is applied instead.
    """
    key = self.pending_operation_redis_id(pending_operation.id)
    remaining_ttl = self._redis.ttl(key)
    # Redis TTL reports -2 for a missing key and -1 for a key without expiry.
    # Both are truthy, so a plain truthiness check would hand a negative
    # expiry to SETEX, which Redis rejects. Only reuse a positive TTL.
    has_remaining_ttl = remaining_ttl is not None and remaining_ttl > 0
    self._redis.setex(
        name=key,
        time=remaining_ttl if has_remaining_ttl else PENDING_OPERATION_TTL,
        value=pending_operation.to_json(),
    )
ValidationStatus

Bases: AlanBaseEnum

FAILURE class-attribute instance-attribute
FAILURE = 'FAILURE'
PENDING class-attribute instance-attribute
PENDING = 'PENDING'
SUCCESS class-attribute instance-attribute
SUCCESS = 'SUCCESS'
create_mfa_storage
create_mfa_storage()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def create_mfa_storage():  # type: ignore[no-untyped-def]
    """Return a Redis-backed MFA storage, or a dict-backed one on failure.

    Any error raised while obtaining the Redis connection or building the
    Redis storage is logged and a ``DictMFAStorage`` is returned instead.
    """
    try:
        redis_connection = get_redis_main_connection()
        # Only the first element of the returned value is used as connection.
        return RedisMFAStorage(redis_connection[0])
    except Exception:
        # Deliberately broad best-effort fallback, but narrower than a bare
        # `except:` so KeyboardInterrupt/SystemExit still propagate.
        current_logger.exception("Error when initializing Redis MFA storage")
        return DictMFAStorage()

repository

AuthenticationIdentityMapper

Mapper between AuthenticationIdentity (domain dataclass) and AuthenticationIdentityModel (SQLAlchemy model).

to_entity staticmethod
to_entity(model)
Source code in components/authentication/internal/infrastructure/repository.py
@staticmethod
def to_entity(model: AuthenticationIdentityModel) -> AuthenticationIdentity:
    """Copy id, profile_id and keycloak_id from the model into a domain entity."""
    fields = {
        "id": model.id,
        "profile_id": model.profile_id,
        "keycloak_id": model.keycloak_id,
    }
    return AuthenticationIdentity(**fields)
to_model staticmethod
to_model(entity)
Source code in components/authentication/internal/infrastructure/repository.py
@staticmethod
def to_model(entity: AuthenticationIdentity) -> AuthenticationIdentityModel:
    """Copy id, profile_id and keycloak_id from the entity into a new model."""
    fields = {
        "id": entity.id,
        "profile_id": entity.profile_id,
        "keycloak_id": entity.keycloak_id,
    }
    return AuthenticationIdentityModel(**fields)
AuthenticationRepository
AuthenticationRepository(session=None)

Bases: BaseAuthenticationRepository

Source code in components/authentication/internal/infrastructure/repository.py
def __init__(self, session: Session | None = None) -> None:
    """Use the given session, or ``current_session`` when none is provided."""
    # A falsy session also falls back to the current session.
    self.session: Session = session or current_session
delete
delete(authentication_identity)
Source code in components/authentication/internal/infrastructure/repository.py
def delete(self, authentication_identity: AuthenticationIdentity) -> None:
    """Delete the stored identity matching the given keycloak id, if any."""
    # TODO: @thibaut.Caillierez: use the id instead of the keycloak_id as soon as the source of truth in the double write repo is the authentication repo
    keycloak_filter = (
        AuthenticationIdentityModel.keycloak_id
        == authentication_identity.keycloak_id
    )
    query = select(AuthenticationIdentityModel).where(keycloak_filter)
    found = self.session.scalar(query)
    if found is None:
        return
    self.session.delete(found)
get_by_id
get_by_id(authentication_identity_id)
Source code in components/authentication/internal/infrastructure/repository.py
def get_by_id(
    self, authentication_identity_id: uuid.UUID
) -> AuthenticationIdentity | None:
    """Return the identity with this id as an entity, or ``None`` if absent."""
    query = select(AuthenticationIdentityModel).where(
        AuthenticationIdentityModel.id == authentication_identity_id
    )
    found = self.session.scalar(query)
    return None if found is None else self.mapper.to_entity(found)
get_by_keycloak_id
get_by_keycloak_id(keycloak_id)
Source code in components/authentication/internal/infrastructure/repository.py
def get_by_keycloak_id(
    self, keycloak_id: uuid.UUID
) -> AuthenticationIdentity | None:
    """Return the identity with this keycloak id as an entity, or ``None``."""
    query = select(AuthenticationIdentityModel).where(
        AuthenticationIdentityModel.keycloak_id == keycloak_id
    )
    found = self.session.scalar(query)
    return None if found is None else self.mapper.to_entity(found)
get_by_profile_id
get_by_profile_id(profile_id)
Source code in components/authentication/internal/infrastructure/repository.py
def get_by_profile_id(self, profile_id: uuid.UUID) -> AuthenticationIdentity | None:
    """Return the identity with this profile id as an entity, or ``None``."""
    query = select(AuthenticationIdentityModel).where(
        AuthenticationIdentityModel.profile_id == profile_id
    )
    found = self.session.scalar(query)
    return None if found is None else self.mapper.to_entity(found)
mapper class-attribute instance-attribute
mapper = AuthenticationIdentityMapper
save
save(authentication_identity)
Source code in components/authentication/internal/infrastructure/repository.py
def save(self, authentication_identity: AuthenticationIdentity) -> None:
    """Convert the entity to a model and merge it into the session."""
    self.session.merge(self.mapper.to_model(authentication_identity))
session instance-attribute
session = session or current_session
BaseAuthenticationRepository

Bases: ABC

delete abstractmethod
delete(authentication_identity)
Source code in components/authentication/internal/infrastructure/repository.py
@abstractmethod
def delete(self, authentication_identity: AuthenticationIdentity) -> None:
    """Abstract hook: remove the given identity. Must be overridden."""
    raise NotImplementedError
get_by_id abstractmethod
get_by_id(authentication_identity_id)
Source code in components/authentication/internal/infrastructure/repository.py
@abstractmethod
def get_by_id(
    self, authentication_identity_id: uuid.UUID
) -> AuthenticationIdentity | None:
    """Abstract hook: look an identity up by its id. Must be overridden."""
    raise NotImplementedError
get_by_keycloak_id abstractmethod
get_by_keycloak_id(keycloak_id)
Source code in components/authentication/internal/infrastructure/repository.py
@abstractmethod
def get_by_keycloak_id(
    self, keycloak_id: uuid.UUID
) -> AuthenticationIdentity | None:
    """Abstract hook: look an identity up by keycloak id. Must be overridden."""
    raise NotImplementedError
get_by_profile_id abstractmethod
get_by_profile_id(profile_id)
Source code in components/authentication/internal/infrastructure/repository.py
@abstractmethod
def get_by_profile_id(self, profile_id: uuid.UUID) -> AuthenticationIdentity | None:
    """Abstract hook: look an identity up by profile id. Must be overridden."""
    raise NotImplementedError
get_or_raise_by_id
get_or_raise_by_id(authentication_identity_id)
Source code in components/authentication/internal/infrastructure/repository.py
def get_or_raise_by_id(
    self, authentication_identity_id: uuid.UUID
) -> AuthenticationIdentity:
    """Return the identity with this id, raising a missing-resource error if absent."""
    identity = self.get_by_id(authentication_identity_id)
    if identity is not None:
        return identity
    raise BaseErrorCode.missing_resource(
        message=f"Identity with id {authentication_identity_id} not found"
    )
get_or_raise_by_keycloak_id
get_or_raise_by_keycloak_id(keycloak_id)
Source code in components/authentication/internal/infrastructure/repository.py
def get_or_raise_by_keycloak_id(
    self, keycloak_id: uuid.UUID
) -> AuthenticationIdentity:
    """Return the identity with this keycloak id, raising a missing-resource error if absent."""
    identity = self.get_by_keycloak_id(keycloak_id)
    if identity is not None:
        return identity
    raise BaseErrorCode.missing_resource(
        message=f"Profile with keycloak_id {keycloak_id} not found"
    )
get_or_raise_by_profile_id
get_or_raise_by_profile_id(profile_id)
Source code in components/authentication/internal/infrastructure/repository.py
def get_or_raise_by_profile_id(
    self, profile_id: uuid.UUID
) -> AuthenticationIdentity:
    """Return the identity with this profile id, raising a missing-resource error if absent."""
    identity = self.get_by_profile_id(profile_id)
    if identity is not None:
        return identity
    raise BaseErrorCode.missing_resource(
        message=f"Profile with profile_id {profile_id} not found"
    )
save abstractmethod
save(authentication_identity)
Source code in components/authentication/internal/infrastructure/repository.py
@abstractmethod
def save(self, authentication_identity: AuthenticationIdentity) -> None:
    """Abstract hook: persist the given identity. Must be overridden."""
    raise NotImplementedError
InMemoryAuthenticationRepository
InMemoryAuthenticationRepository(memory=None)

Bases: BaseAuthenticationRepository

AuthenticationRepository using an in-memory dictionary, there are no transactions to manage.

This is useful for testing purposes.

Source code in components/authentication/internal/infrastructure/repository.py
def __init__(
    self, memory: dict[uuid.UUID, AuthenticationIdentity] | None = None
) -> None:
    """Use the given dict as backing store, or a new empty one when ``None``."""
    if memory is None:
        memory = {}
    # The caller's dict is used as is (not copied), so writes are shared.
    self.memory = memory
delete
delete(authentication_identity)
Source code in components/authentication/internal/infrastructure/repository.py
def delete(self, authentication_identity: AuthenticationIdentity) -> None:
    """Remove the identity stored under its id; no error if it is absent."""
    identity_id = authentication_identity.id
    self.memory.pop(identity_id, None)
get_by_id
get_by_id(authentication_identity_id)
Source code in components/authentication/internal/infrastructure/repository.py
def get_by_id(
    self, authentication_identity_id: uuid.UUID
) -> AuthenticationIdentity | None:
    """Return the identity stored under this id, or ``None``."""
    identity = self.memory.get(authentication_identity_id)
    return identity
get_by_keycloak_id
get_by_keycloak_id(keycloak_id)
Source code in components/authentication/internal/infrastructure/repository.py
def get_by_keycloak_id(
    self, keycloak_id: uuid.UUID
) -> AuthenticationIdentity | None:
    """Return the first stored identity with this keycloak id, or ``None``."""
    matches = (
        identity
        for identity in self.memory.values()
        if identity.keycloak_id == keycloak_id
    )
    return next(matches, None)
get_by_profile_id
get_by_profile_id(profile_id)
Source code in components/authentication/internal/infrastructure/repository.py
def get_by_profile_id(self, profile_id: uuid.UUID) -> AuthenticationIdentity | None:
    """Return the first stored identity with this profile id, or ``None``."""
    matches = (
        identity
        for identity in self.memory.values()
        if identity.profile_id == profile_id
    )
    return next(matches, None)
memory instance-attribute
memory = memory if memory is not None else {}
save
save(authentication_identity)
Source code in components/authentication/internal/infrastructure/repository.py
def save(self, authentication_identity: AuthenticationIdentity) -> None:
    """Store the identity under its id, replacing any previous entry."""
    identity_id = authentication_identity.id
    self.memory[identity_id] = authentication_identity

retro_compatibility_repository

RetroCompatibilityAuthenticationRepository
RetroCompatibilityAuthenticationRepository(
    session=None, app_name=None
)

Bases: BaseAuthenticationRepository

This class is a retro-compatibility layer for the authentication repository. It uses the country users to link a profile id to an authentication identity. ⚠️ Cannot work in 🇨🇦 that doesn't import all models from all components.

Source code in components/authentication/internal/infrastructure/retro_compatibility_repository.py
def __init__(
    self,
    session: Session | None = None,
    app_name: AppName | None = None,
) -> None:
    """Pick the session and the ordered list of user models to search.

    Args:
        session: Session to query with; falls back to ``current_session``.
        app_name: App to configure for; falls back to the current app name.

    ``models_lookup_table`` lists the user models in lookup order, with the
    app's own user model first for the FR, BE and ES apps.
    """
    app_name = app_name or get_current_app_name()
    self.session: Session = session or current_session
    # Models are imported inside each branch rather than at module level.
    # NOTE(review): there is no default case, so an unmatched app name leaves
    # models_lookup_table unset — confirm whether that can happen.
    match app_name:
        case AppName.ALAN_FR:
            from components.be.internal.models.be_user import BeUser  # noqa: ALN069
            from components.ca.internal.tech.models.ca_user import (  # noqa: ALN069
                CaUser,
            )
            from components.es.internal.models.es_user import EsUser  # noqa: ALN069
            from components.fr.internal.models.user import (  # noqa: ALN069
                User as FrUser,
            )

            self.models_lookup_table = [FrUser, BeUser, EsUser, CaUser]
        case AppName.ALAN_BE:
            from components.be.internal.models.be_user import BeUser  # noqa: ALN069
            from components.ca.internal.tech.models.ca_user import (  # noqa: ALN069
                CaUser,
            )
            from components.es.internal.models.es_user import EsUser  # noqa: ALN069
            from components.fr.internal.models.user import (  # noqa: ALN069
                User as FrUser,
            )

            self.models_lookup_table = [BeUser, FrUser, EsUser, CaUser]
        case AppName.ALAN_ES:
            from components.be.internal.models.be_user import BeUser  # noqa: ALN069
            from components.ca.internal.tech.models.ca_user import (  # noqa: ALN069
                CaUser,
            )
            from components.es.internal.models.es_user import EsUser  # noqa: ALN069
            from components.fr.internal.models.user import (  # noqa: ALN069
                User as FrUser,
            )

            self.models_lookup_table = [EsUser, FrUser, BeUser, CaUser]
        case AppName.ALAN_CA:
            # The CA app only looks at its own user model.
            from components.ca.internal.tech.models.ca_user import (  # noqa: ALN069
                CaUser,
            )

            self.models_lookup_table = [
                CaUser,
            ]
        case AppName.SHARED_TESTING:
            from shared.models.testing.versioning import TestUser

            self.models_lookup_table = [TestUser]
delete
delete(authentication_identity)
Source code in components/authentication/internal/infrastructure/retro_compatibility_repository.py
def delete(self, authentication_identity: AuthenticationIdentity) -> None:
    """Clear ``keycloak_id`` on every user matching the identity's profile id.

    All models of the lookup table are visited; the loop does not stop at
    the first match.
    """
    target_profile_id = authentication_identity.profile_id
    for user_model in self.models_lookup_table:
        query = self.session.query(user_model)
        match = query.filter(
            user_model.profile_id == target_profile_id
        ).one_or_none()
        if match:
            match.keycloak_id = None
get_by_id
get_by_id(authentication_identity_id)
Source code in components/authentication/internal/infrastructure/retro_compatibility_repository.py
def get_by_id(
    self, authentication_identity_id: uuid.UUID
) -> AuthenticationIdentity | None:
    """Lookup by identity id is not implemented here; always raises."""
    raise NotImplementedError()
get_by_keycloak_id
get_by_keycloak_id(keycloak_id)
Source code in components/authentication/internal/infrastructure/retro_compatibility_repository.py
def get_by_keycloak_id(
    self, keycloak_id: uuid.UUID
) -> AuthenticationIdentity | None:
    """Build an identity from the first user found with this keycloak id.

    Models are searched in lookup-table order; ``None`` when no user matches.
    """
    for user_model in self.models_lookup_table:
        query = self.session.query(user_model)
        match = query.filter(
            user_model.keycloak_id == keycloak_id  # type: ignore[arg-type]
        ).one_or_none()
        if not match:
            continue
        # The identity id is a freshly generated UUID, not a stored value.
        # NOTE(review): get_by_profile_id uses the user's id instead —
        # confirm which is intended.
        return AuthenticationIdentity(
            id=uuid.uuid4(),
            profile_id=match.profile_id,
            keycloak_id=match.keycloak_id,  # type: ignore[arg-type]
        )
    return None
get_by_profile_id
get_by_profile_id(profile_id)
Source code in components/authentication/internal/infrastructure/retro_compatibility_repository.py
def get_by_profile_id(self, profile_id: uuid.UUID) -> AuthenticationIdentity | None:
    """Build an identity from the first user found with this profile id.

    Models are searched in lookup-table order; ``None`` when no user matches.
    """
    for user_model in self.models_lookup_table:
        query = self.session.query(user_model)
        match = query.filter(user_model.profile_id == profile_id).one_or_none()
        if not match:
            continue
        # The identity id is taken from the user row itself.
        return AuthenticationIdentity(
            id=match.id,
            profile_id=match.profile_id,
            keycloak_id=match.keycloak_id,  # type: ignore[arg-type]
        )
    return None
models_lookup_table instance-attribute
models_lookup_table = [TestUser]
save
save(authentication_identity)
Source code in components/authentication/internal/infrastructure/retro_compatibility_repository.py
def save(self, authentication_identity: AuthenticationIdentity) -> None:
    """
    Copy the identity's Keycloak id onto the matching row(s).

    Each model of ``models_lookup_table`` is searched by ``profile_id``; every
    row found gets its ``keycloak_id`` overwritten. Nothing happens for models
    that hold no matching row.

    Args:
        authentication_identity: identity providing the ``profile_id`` to look
            up and the ``keycloak_id`` to store
    """
    for candidate_model in self.models_lookup_table:
        query = self.session.query(candidate_model)
        matching_row = (
            query
            .filter(candidate_model.profile_id == authentication_identity.profile_id)
            .one_or_none()
        )
        if not matching_row:
            continue
        matching_row.keycloak_id = authentication_identity.keycloak_id
session instance-attribute
session = session or current_session

unit_of_work

InMemoryUnitOfWork
InMemoryUnitOfWork(event_dispatcher=None)

Bases: UnitOfWork

UnitOfWork using an in-memory dictionary, there are no transactions to manage.

This is useful for testing purposes.

Source code in components/authentication/internal/infrastructure/unit_of_work.py
def __init__(
    self,
    event_dispatcher: EventDispatcher | None = None,
) -> None:
    """
    Initialise the unit of work and its in-memory store.

    Args:
        event_dispatcher: forwarded as-is to the parent initializer
    """
    super().__init__(event_dispatcher=event_dispatcher)

    # keep the same memory for the lifetime of the unit of work
    self.memory: dict[uuid.UUID, AuthenticationIdentity] = {}
__enter__
__enter__()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
@override
def __enter__(self) -> Self:
    """
    Prepare the repository and identity provider, then enter the context.

    A new ``InMemoryAuthenticationRepository`` wrapping ``self.memory`` is
    built on each entry, and the identity provider is fetched again.

    Returns:
        Whatever the parent ``__enter__`` returns
    """
    in_memory_repository = InMemoryAuthenticationRepository(self.memory)
    self.authentication_repository = in_memory_repository

    provider = get_identity_provider()
    self.identity_provider: IdentityProvider = provider

    return super().__enter__()
close
close()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
@override
def close(self) -> None:
    pass
commit
commit()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
@override
def commit(self) -> None:
    pass
memory instance-attribute
memory = {}
rollback
rollback()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
@override
def rollback(self) -> None:
    pass
SQLAlchemyUnitOfWork
SQLAlchemyUnitOfWork(
    session=None,
    commit_at_end=False,
    dispatch_at_end=True,
    event_dispatcher=None,
    authentication_repository_factory=None,
)

Bases: UnitOfWork

Source code in components/authentication/internal/infrastructure/unit_of_work.py
def __init__(
    self,
    session: Session | None = None,
    commit_at_end: bool = False,
    dispatch_at_end: bool = True,
    event_dispatcher: EventDispatcher | None = None,
    authentication_repository_factory: Callable[
        [Session],
        BaseAuthenticationRepository,
    ]
    | None = None,
) -> None:
    """
    Configure a unit of work backed by a SQLAlchemy session.

    Args:
        session: session to work with; ``current_session`` is used when omitted
        commit_at_end: stored on the instance; ``commit()`` only flushes the
            session when this is False
        dispatch_at_end: stored on the instance as ``dispatch_at_end``
        event_dispatcher: forwarded as-is to the parent initializer
        authentication_repository_factory: callable building the repository
            from a session; ``AuthenticationRepository`` is used when omitted
    """
    super().__init__(event_dispatcher=event_dispatcher)

    # behaviour flags
    self.commit_at_end: bool = commit_at_end
    self.dispatch_at_end: bool = dispatch_at_end

    # collaborators, each falling back to its default when not provided
    self.session: Session = session or current_session
    self.authentication_repository_factory = (
        authentication_repository_factory or AuthenticationRepository
    )
__enter__
__enter__()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
@override
def __enter__(self) -> Self:
    """
    Prepare the identity provider and repository, then enter the context.

    The identity provider is fetched first, then the repository is built by
    calling the configured factory with the unit of work's session.

    Returns:
        Whatever the parent ``__enter__`` returns
    """
    self.identity_provider: IdentityProvider = get_identity_provider()

    build_repository = self.authentication_repository_factory
    self.authentication_repository = build_repository(  # type: ignore[call-arg]
        session=self.session
    )

    return super().__enter__()
authentication_repository_factory instance-attribute
authentication_repository_factory = (
    authentication_repository_factory
    or AuthenticationRepository
)
close
close()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
@override
def close(self) -> None:
    return
commit
commit()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
@override
def commit(self) -> None:
    if not self.commit_at_end:
        self.session.flush()
        return

    self.session.commit()
commit_at_end instance-attribute
commit_at_end = commit_at_end
dispatch_at_end instance-attribute
dispatch_at_end = dispatch_at_end
rollback
rollback()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
@override
def rollback(self) -> None:
    self.session.rollback()
session instance-attribute
session = session or current_session
UnitOfWork
UnitOfWork(event_dispatcher=None)

Bases: ABC

Source code in components/authentication/internal/infrastructure/unit_of_work.py
def __init__(self, event_dispatcher: EventDispatcher | None = None) -> None:
    """
    Set up the event bookkeeping shared by every unit of work.

    Args:
        event_dispatcher: dispatcher receiving events at the end of the work;
            an ``AlanMessagingEventDispatcher`` is created when omitted
    """
    # events to be handled by the message bus
    self.events: list[DomainEvent] = []

    # events to be sent at the end of the work to the event dispatcher
    self._event_outbox: list[DomainEvent] = []

    if event_dispatcher:
        dispatcher = event_dispatcher
    else:
        dispatcher = AlanMessagingEventDispatcher()
    self.event_dispatcher: EventDispatcher = dispatcher
__enter__
__enter__()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
def __enter__(self) -> Self:
    """
    Enter the unit of work context.

    Returns:
        This unit of work instance
    """
    return self
__exit__
__exit__(exc_type, exc_value, traceback)
Source code in components/authentication/internal/infrastructure/unit_of_work.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_value: BaseException | None,
    traceback: TracebackType | None,
) -> None:
    # Not returning True: exc_type (if it exists) will be propagated
    if exc_type is not None:
        current_logger.error(f"{exc_type} occurred within the context, rollbacking")
        self.rollback()
        self.close()
        raise

    try:
        self.commit()
    except Exception:
        self.rollback()
        self.close()
        raise
    else:
        # We don't want to rollback if there is a dispatch issue
        self.dispatch()
    finally:
        self.close()
authentication_repository instance-attribute
authentication_repository
close abstractmethod
close()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
@abstractmethod
def close(self) -> None:
    """
    Release whatever the unit of work holds; concrete classes must override.

    Raises:
        NotImplementedError: when the base implementation is called
    """
    raise NotImplementedError()
commit abstractmethod
commit()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
@abstractmethod
def commit(self) -> None:
    """
    Persist the work done so far; concrete classes must override.

    Raises:
        NotImplementedError: when the base implementation is called
    """
    raise NotImplementedError()
dispatch
dispatch()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
def dispatch(self) -> None:
    """
    Send every event of the outbox to the event dispatcher.

    Events still waiting in ``self.events`` are flushed into the outbox
    first; finding any at this point is logged as an error, but they are
    dispatched along with the others. The outbox is emptied before the first
    event is sent.
    """
    unflushed_events = self.flush_events()
    if unflushed_events:
        current_logger.error(
            "%d events were not flushed prior to dispatching, something is wrong with your code",
            len(unflushed_events),
            events=unflushed_events,
        )

    # work on a snapshot so the outbox is already empty while dispatching
    pending_events = list(self._event_outbox)
    self._event_outbox.clear()

    for pending_event in pending_events:
        self.event_dispatcher.dispatch(pending_event)
event_dispatcher instance-attribute
event_dispatcher = (
    event_dispatcher or AlanMessagingEventDispatcher()
)
events instance-attribute
events = []
flush_events
flush_events()

Flush events from unit of work and return them.

Additionally, it stores them internally to be dispatched later.

Source code in components/authentication/internal/infrastructure/unit_of_work.py
def flush_events(self) -> list[DomainEvent]:
    """
    Drain the pending events of the unit of work and hand them back.

    The drained events are also appended to the internal outbox so they can
    be dispatched later.

    Returns:
        A new list holding the events that were pending, in their original
        order
    """
    drained_events: list[DomainEvent] = list(self.events)
    self._event_outbox.extend(drained_events)
    self.events.clear()

    return drained_events
identity_provider instance-attribute
identity_provider
rollback abstractmethod
rollback()
Source code in components/authentication/internal/infrastructure/unit_of_work.py
@abstractmethod
def rollback(self) -> None:
    """
    Undo the work done so far; concrete classes must override.

    Raises:
        NotImplementedError: when the base implementation is called
    """
    raise NotImplementedError()

mailers

account_compromise

send_account_compromised_after_credential_stuffing_email
send_account_compromised_after_credential_stuffing_email(
    user_id,
)

Send notification email to user about account compromise after credential stuffing attack.

Parameters:

Name Type Description Default
user_id UUID

UUID of the user to notify

required

Returns:

Type Description
LangTemplateMailerParams

LangTemplateMailerParams for the email to be sent

Source code in components/authentication/internal/mailers/account_compromise.py
@async_mailer(
    category=EmailCategory.transactional,
    priority=EmailPriority.high,
    email_queue_name=EMAIL_QUEUE,
)
def send_account_compromised_after_credential_stuffing_email(
    user_id: UUID,
) -> LangTemplateMailerParams:
    """
    Send notification email to user about account compromise after credential stuffing attack.

    The recipient address is the user's ``email`` attribute, or its
    ``last_employment_invite_email`` attribute when the former is missing or
    empty. The greeting uses the user's ``first_name`` and falls back to
    "User" when that attribute is missing, None or empty.

    Args:
        user_id: UUID of the user to notify

    Returns:
        LangTemplateMailerParams for the email to be sent

    Raises:
        ValueError: If no email address can be found for the user
    """
    profile_service = ProfileService.create()

    UserModel = get_current_class(BaseUser)
    user = get_or_raise_missing_resource(UserModel, user_id)

    profile = profile_service.get_or_raise_profile(user.profile_id)
    lang = ProfileCompat(profile).lang_compat if profile else None

    # Log security event
    current_logger.info(
        "Sending account compromise notification email",
        user_id=str(user_id),
        has_profile=profile is not None,
        user_lang=str(lang) if lang else None,
        event_type="account_compromise_credential_stuffing",
    )

    # Create personalization based on user's email (BaseUser-compatible)
    email = getattr(user, "email", None) or getattr(
        user, "last_employment_invite_email", None
    )
    if not email:
        raise ValueError(f"No email found for user {user_id}")

    personalization = MailPersonalization(email)

    # Prepare template arguments
    # getattr's default only covers a missing attribute, so `or` is needed to
    # also cover a first name that is present but None or empty
    template_args = {
        "FIRST_NAME": getattr(user, "first_name", None) or "User",
    }

    return LangTemplateMailerParams(
        personalization=personalization,
        lang=lang,
        template="account_compromised_after_credential_stuffing",
        template_args=template_args,
        recipient_user_id=str(user_id),
        transactional_message_id="441" if is_production_mode() else "215",
        plain_text=False,
    )

async_mailer

async_mailer
async_mailer(
    category,
    priority,
    email_queue_name,
    delivery=DEFAULT_DELIVERY,
    job_timeout=None,
    enqueue=True,
    redacted_args=None,
)
Source code in components/authentication/internal/mailers/async_mailer.py
def async_mailer(
    category: EmailCategory,
    priority: EmailPriority,
    email_queue_name: Optional[str],
    delivery: Union[CustomerIODelivery, SendgridDelivery] = DEFAULT_DELIVERY,
    job_timeout: Optional[int] = None,
    enqueue: bool = True,
    redacted_args: set[str] | None = None,
) -> Callable[[_MailerCallable], _MailerCallable]:
    """
    Build a mailer decorator by delegating to ``base_async_mailer``.

    Every argument is forwarded unchanged, by keyword, to
    ``base_async_mailer``; its result is returned as-is.
    """
    mailer_decorator = base_async_mailer(
        category=category,
        priority=priority,
        email_queue_name=email_queue_name,
        delivery=delivery,
        job_timeout=job_timeout,
        enqueue=enqueue,
        redacted_args=redacted_args,
    )
    return mailer_decorator

models

authentication_identity

AuthenticationIdentityModel

Bases: BaseModel

__table_args__ class-attribute instance-attribute
__table_args__ = {'schema': AUTHENTICATION_SCHEMA_NAME}
__tablename__ class-attribute instance-attribute
__tablename__ = 'authentication_identity'
id class-attribute instance-attribute
id = mapped_column(UUID(as_uuid=True), primary_key=True)
keycloak_id class-attribute instance-attribute
keycloak_id = mapped_column(
    UUID(as_uuid=True),
    unique=True,
    nullable=False,
    index=True,
)
profile_id class-attribute instance-attribute
profile_id = mapped_column(
    UUID(as_uuid=True),
    unique=True,
    nullable=False,
    index=True,
)

helpers

AUTHENTICATION_SCHEMA_NAME module-attribute
AUTHENTICATION_SCHEMA_NAME = 'authentication'

components.authentication.observability

obs module-attribute

obs = ServiceObservability('authentication')

components.authentication.public

api

AuthenticationService

AuthenticationService(unit_of_work=None, message_bus=None)

Entry point to interact with the identity provider

This service is a CRUD api to interact with the identity provider.

Instantiates the AuthenticationService

Default values are provided for target production use, you may need to override them.

Source code in components/authentication/public/api.py
def __init__(
    self,
    unit_of_work: UnitOfWork | None = None,
    message_bus: MessageBus | None = None,
):
    """
    Instantiates the AuthenticationService

    Default values are provided for target production use, you may need to override them.

    Args:
        unit_of_work: unit of work used by the service and injected into every
            handler; a SQLAlchemyUnitOfWork is created when omitted
        message_bus: bus used to handle commands; when omitted, a MessageBus is
            built from the unit of work and the registered handlers
    """
    self.unit_of_work: UnitOfWork = unit_of_work or SQLAlchemyUnitOfWork()

    # bind the unit of work to every registered event handler
    injected_event_handlers = {
        event_type: [
            # technically that cast is incorrect since handlers don't accept any random events
            cast(
                "Callable[[DomainEvent], None]",
                functools.partial(handler, unit_of_work=self.unit_of_work),
            )
            for handler in handlers
        ]
        for event_type, handlers in _EVENT_HANDLERS.items()
    }
    # bind the unit of work to every registered command handler
    injected_command_handlers = {
        command_type: [
            # technically that cast is incorrect since handlers don't accept any random commands
            cast(
                "Callable[[Command], None]",
                functools.partial(handler, unit_of_work=self.unit_of_work),
            )
            for handler in handlers
        ]
        for command_type, handlers in _COMMAND_HANDLERS.items()
    }

    self.message_bus: MessageBus = message_bus or MessageBus(
        self.unit_of_work, injected_event_handlers, injected_command_handlers
    )
change_identity_email
change_identity_email(identity_id, *, email)

Request an email change for an identity.

This method handles complex email change scenarios: (1) Approved: the email is available and gets updated successfully; (2) Denied: the email conflicts with an existing identity (raises an error); (3) Merged: the email belongs to another identity, which triggers an identity merge.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity to change email for

required
email str

New email address (will be normalized)

required

Raises:

Type Description
BaseErrorCode

If the email change is denied due to conflicts

Events Emitted

IdentityEmailChangedEvent: When the email is successfully changed. IdentityMergedEvent: When identities are merged due to an email conflict.

Examples:

>>> try:
...     authentication_service.change_identity_email(
...         identity_id=identity_id,
...         email="new-email@example.com"
...     )
...     print("Email changed successfully")
... except BaseErrorCode:
...     print("Email change denied - conflict with existing identity")
Note

Email merging happens when the target email already belongs to another identity. In this case, the two identities are merged and the old identity is marked for deletion.

Source code in components/authentication/public/api.py
def change_identity_email(self, identity_id: UUID, *, email: str) -> None:
    """
    Ask for the email address of an identity to be changed.

    Three outcomes are possible:
    1. **Approved**: the email is available and the identity is updated
    2. **Denied**: the email conflicts with an existing identity (an error is raised)
    3. **Merged**: the email belongs to another identity, which triggers an identity merge

    Args:
        identity_id: UUID of the identity whose email should change
        email: The new email address (will be normalized)

    Raises:
        BaseErrorCode: When the change is denied because of a conflict

    Events Emitted:
        IdentityEmailChangedEvent: When email is successfully changed
        IdentityMergedEvent: When identities are merged due to email conflict

    Examples:
        >>> try:
        ...     authentication_service.change_identity_email(
        ...         identity_id=identity_id,
        ...         email="new-email@example.com"
        ...     )
        ...     print("Email changed successfully")
        ... except BaseErrorCode:
        ...     print("Email change denied - conflict with existing identity")

    Note:
        A merge happens when the target email already belongs to another
        identity: both identities are merged and the old one is marked for
        deletion.
    """
    change_email_command = RequestChangeIdentityEmailCommand(
        identity_id=identity_id, email=email
    )
    with self.unit_of_work:
        self.message_bus.handle(change_email_command)
change_identity_first_name
change_identity_first_name(identity_id, *, first_name)

Update the first name of an identity.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity to update

required
first_name str | None

New first name, or None to clear the existing name

required
Events Emitted

IdentityFirstNameChanged: When the first name is updated

Examples:

Set a first name:

>>> authentication_service.change_identity_first_name(
...     identity_id=identity_id,
...     first_name="Jane"
... )

Clear the first name:

>>> authentication_service.change_identity_first_name(
...     identity_id=identity_id,
...     first_name=None
... )
Note

This method will eventually be replaced by a subscription to profile events (IdentityInformationChanged) to maintain data consistency.

Source code in components/authentication/public/api.py
def change_identity_first_name(
    self, identity_id: UUID, *, first_name: str | None
) -> None:
    """
    Change the first name stored for an identity.

    Args:
        identity_id: UUID of the identity to modify
        first_name: The new first name, or None to remove the current one

    Events Emitted:
        IdentityFirstNameChanged: When the first name is updated

    Examples:
        Set a first name:

        >>> authentication_service.change_identity_first_name(
        ...     identity_id=identity_id,
        ...     first_name="Jane"
        ... )

        Clear the first name:

        >>> authentication_service.change_identity_first_name(
        ...     identity_id=identity_id,
        ...     first_name=None
        ... )

    Note:
        This method is meant to be replaced, eventually, by a subscription to
        profile events (IdentityInformationChanged) to maintain data
        consistency.
    """
    change_first_name_command = ChangeIdentityFirstNameCommand(
        identity_id=identity_id, first_name=first_name
    )
    with self.unit_of_work:
        self.message_bus.handle(change_first_name_command)
change_identity_language
change_identity_language(identity_id, *, language)

Update the preferred language of an identity.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity to update

required
language Lang

New preferred language (ISO639 language code)

required
Events Emitted

IdentityLanguageChanged: When the language preference is updated

Examples:

>>> authentication_service.change_identity_language(
...     identity_id=identity_id,
...     language=Lang.FRENCH
... )
Note

This method will eventually be replaced by a subscription to profile events (PreferredLanguageChanged) to maintain data consistency.

Source code in components/authentication/public/api.py
def change_identity_language(self, identity_id: UUID, *, language: Lang) -> None:
    """
    Change the preferred language stored for an identity.

    Args:
        identity_id: UUID of the identity to modify
        language: The new preferred language (ISO639 language code)

    Events Emitted:
        IdentityLanguageChanged: When the language preference is updated

    Examples:
        >>> authentication_service.change_identity_language(
        ...     identity_id=identity_id,
        ...     language=Lang.FRENCH
        ... )

    Note:
        This method is meant to be replaced, eventually, by a subscription to
        profile events (PreferredLanguageChanged) to maintain data
        consistency.
    """
    change_language_command = ChangeIdentityLanguageCommand(
        identity_id=identity_id, language=language
    )
    with self.unit_of_work:
        self.message_bus.handle(change_language_command)
change_identity_last_name
change_identity_last_name(identity_id, *, last_name)

Update the last name of an identity.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity to update

required
last_name str | None

New last name, or None to clear the existing name

required
Events Emitted

IdentityLastNameChanged: When the last name is updated

Examples:

Set a last name:

>>> authentication_service.change_identity_last_name(
...     identity_id=identity_id,
...     last_name="Smith"
... )

Clear the last name:

>>> authentication_service.change_identity_last_name(
...     identity_id=identity_id,
...     last_name=None
... )
Note

This method will eventually be replaced by a subscription to profile events (IdentityInformationChanged) to maintain data consistency.

Source code in components/authentication/public/api.py
def change_identity_last_name(
    self, identity_id: UUID, *, last_name: str | None
) -> None:
    """
    Change the last name stored for an identity.

    Args:
        identity_id: UUID of the identity to modify
        last_name: The new last name, or None to remove the current one

    Events Emitted:
        IdentityLastNameChanged: When the last name is updated

    Examples:
        Set a last name:

        >>> authentication_service.change_identity_last_name(
        ...     identity_id=identity_id,
        ...     last_name="Smith"
        ... )

        Clear the last name:

        >>> authentication_service.change_identity_last_name(
        ...     identity_id=identity_id,
        ...     last_name=None
        ... )

    Note:
        This method is meant to be replaced, eventually, by a subscription to
        profile events (IdentityInformationChanged) to maintain data
        consistency.
    """
    change_last_name_command = ChangeIdentityLastNameCommand(
        identity_id=identity_id, last_name=last_name
    )
    with self.unit_of_work:
        self.message_bus.handle(change_last_name_command)
change_keycloak_id
change_keycloak_id(*, profile_id, new_keycloak_id)

Update the Keycloak ID associated with a profile.

This method updates the link between a profile and its Keycloak identity. It's primarily used during migration scenarios or when reassigning identities between Keycloak instances.

Parameters:

Name Type Description Default
profile_id UUID

UUID of the profile to update the Keycloak link for

required
new_keycloak_id UUID | None

New Keycloak identity ID, or None to unlink

required

Examples:

Link profile to new Keycloak identity:

>>> authentication_service.change_keycloak_id(
...     profile_id=profile_id,
...     new_keycloak_id=new_identity_id
... )

Unlink profile from Keycloak (temporary state):

>>> authentication_service.change_keycloak_id(
...     profile_id=profile_id,
...     new_keycloak_id=None
... )
Use Cases
  • Keycloak instance migrations
  • Identity consolidation after mergers
  • Fixing broken identity links
  • Temporary unlinking during maintenance
Source code in components/authentication/public/api.py
def change_keycloak_id(
    self, *, profile_id: UUID, new_keycloak_id: UUID | None
) -> None:
    """
    Change the Keycloak ID linked to a profile.

    The link between a profile and its Keycloak identity is replaced by the
    given value. This is mainly intended for migrations or for reassigning
    identities between Keycloak instances.

    Args:
        profile_id: UUID of the profile whose Keycloak link is updated
        new_keycloak_id: The new Keycloak identity ID, or None to unlink

    Examples:
        Link profile to new Keycloak identity:

        >>> authentication_service.change_keycloak_id(
        ...     profile_id=profile_id,
        ...     new_keycloak_id=new_identity_id
        ... )

        Unlink profile from Keycloak (temporary state):

        >>> authentication_service.change_keycloak_id(
        ...     profile_id=profile_id,
        ...     new_keycloak_id=None
        ... )

    Use Cases:
        - Keycloak instance migrations
        - Identity consolidation after mergers
        - Fixing broken identity links
        - Temporary unlinking during maintenance
    """
    update_keycloak_id_command = UpdateKeycloakIdCommand(
        profile_id=profile_id, keycloak_id=new_keycloak_id
    )

    with self.unit_of_work:
        self.message_bus.handle(update_keycloak_id_command)
change_mfa_status
change_mfa_status(
    identity_id, *, mfa_enabled=None, mfa_required=None
)

Update the Multi-Factor Authentication settings for an identity.

This method allows independent control of two MFA settings: mfa_enabled (whether the user has MFA configured/activated) and mfa_required (whether MFA is mandatory for this identity).

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity to update

required
mfa_enabled bool | None

Whether MFA is enabled/configured (None = no change)

None
mfa_required bool | None

Whether MFA is required for login (None = no change)

None

Examples:

Require MFA for a high-privilege user:

>>> authentication_service.change_mfa_status(
...     identity_id=admin_identity_id,
...     mfa_required=True
... )

User successfully configured MFA:

>>> authentication_service.change_mfa_status(
...     identity_id=identity_id,
...     mfa_enabled=True
... )

Disable MFA requirement (but keep it configured):

>>> authentication_service.change_mfa_status(
...     identity_id=identity_id,
...     mfa_required=False  # mfa_enabled unchanged
... )
MFA States
  • Required + Enabled: User must use MFA and has it configured ✅
  • Required + Not Enabled: User must configure MFA before login ⚠️
  • Not Required + Enabled: User can optionally use MFA
  • Not Required + Not Enabled: Standard password-only login
Source code in components/authentication/public/api.py
def change_mfa_status(
    self,
    identity_id: UUID,
    *,
    mfa_enabled: bool | None = None,
    mfa_required: bool | None = None,
) -> None:
    """
    Change the Multi-Factor Authentication settings of an identity.

    Two settings can be controlled independently:
    - **mfa_enabled**: Whether the user has MFA configured/activated
    - **mfa_required**: Whether MFA is mandatory for this identity

    Args:
        identity_id: UUID of the identity to modify
        mfa_enabled: Whether MFA is enabled/configured (None = no change)
        mfa_required: Whether MFA is required for login (None = no change)

    Examples:
        Require MFA for a high-privilege user:

        >>> authentication_service.change_mfa_status(
        ...     identity_id=admin_identity_id,
        ...     mfa_required=True
        ... )

        User successfully configured MFA:

        >>> authentication_service.change_mfa_status(
        ...     identity_id=identity_id,
        ...     mfa_enabled=True
        ... )

        Disable MFA requirement (but keep it configured):

        >>> authentication_service.change_mfa_status(
        ...     identity_id=identity_id,
        ...     mfa_required=False  # mfa_enabled unchanged
        ... )

    MFA States:
        - **Required + Enabled**: User must use MFA and has it configured ✅
        - **Required + Not Enabled**: User must configure MFA before login ⚠️
        - **Not Required + Enabled**: User can optionally use MFA
        - **Not Required + Not Enabled**: Standard password-only login
    """
    change_mfa_command = ChangeIdentityMfaStatusCommand(
        identity_id=identity_id,
        mfa_enabled=mfa_enabled,
        mfa_required=mfa_required,
    )

    with self.unit_of_work:
        self.message_bus.handle(change_mfa_command)
check_identity_has_password
check_identity_has_password(identity_id)

Check if an identity has a password configured.

This is useful for determining if an identity can use password-based authentication or if it relies solely on external identity providers.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity to check

required

Returns:

Name Type Description
bool bool

True if the identity has a password set, False otherwise (also returns False if identity doesn't exist)

Examples:

>>> if authentication_service.check_identity_has_password(identity_id):
...     print("Password authentication available")
... else:
...     print("No password set - external auth required")
Source code in components/authentication/public/api.py
def check_identity_has_password(self, identity_id: UUID) -> bool:
    """
    Tell whether a password is configured for an identity.

    Useful to know whether an identity can authenticate with a password or
    relies solely on external identity providers.

    Args:
        identity_id: UUID of the identity to inspect

    Returns:
        bool: True if the identity has a password set, False otherwise
             (False is also returned, with a warning logged, when the
             identity doesn't exist)

    Examples:
        >>> if authentication_service.check_identity_has_password(identity_id):
        ...     print("Password authentication available")
        ... else:
        ...     print("No password set - external auth required")
    """
    with self.unit_of_work:
        identity = self.unit_of_work.identity_provider.get_identity(identity_id)
        if identity is not None:
            return identity.has_password()

        current_logger.warning(
            f"No auth identity found for identity_id {identity_id}"
        )
        return False
check_identity_password
check_identity_password(identity_id, *, prehashed_password)

Verify if the provided password is correct for an identity.

This method performs password verification against the identity's stored credentials in Keycloak. The password should be pre-hashed according to the system's hashing requirements.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity to authenticate

required
prehashed_password str

The password after client-side hashing

required

Returns:

Name Type Description
bool bool

True if the password is correct, False otherwise (also returns False if identity doesn't exist or has no password)

Examples:

Typical login flow:

>>> if not authentication_service.check_identity_has_password(identity_id):
...     return "No password set for this identity"
>>> if authentication_service.check_identity_password(
...     identity_id=identity_id,
...     prehashed_password=hashed_password
... ):
...     return "Authentication successful"
... else:
...     return "Invalid credentials"
Security Note

Failed password attempts are logged with warnings for security monitoring.

Source code in components/authentication/public/api.py
def check_identity_password(
    self, identity_id: UUID, *, prehashed_password: str
) -> bool:
    """
    Check a password against the credentials of an identity.

    The verification is performed against the credentials stored for the
    identity in Keycloak. The password is expected to be pre-hashed
    according to the system's hashing requirements.

    Args:
        identity_id: UUID of the identity to authenticate
        prehashed_password: The password after client-side hashing

    Returns:
        bool: True if the password is correct, False otherwise
             (also returns False if identity doesn't exist or has no password)

    Examples:
        Typical login flow:

        >>> if not authentication_service.check_identity_has_password(identity_id):
        ...     return "No password set for this identity"
        >>> if authentication_service.check_identity_password(
        ...     identity_id=identity_id,
        ...     prehashed_password=hashed_password
        ... ):
        ...     return "Authentication successful"
        ... else:
        ...     return "Invalid credentials"

    Security Note:
        A warning is logged when the identity is unknown or the password is
        wrong, for security monitoring.
    """
    with self.unit_of_work:
        identity = self.unit_of_work.identity_provider.get_identity(identity_id)
        if identity is None:
            current_logger.warning(
                f"No auth identity found for identity_id {identity_id}"
            )
            return False

        password_matches = identity.check_password(
            prehashed_password=prehashed_password
        )
        if not password_matches:
            current_logger.warning(f"Wrong password for authentication {identity_id}")
        else:
            return password_matches
    return False
clear_identity_email
clear_identity_email(identity_id, *, invalidated_email)

Replace an identity's email with an invalidated placeholder email.

This method is used to effectively "clear" an email while maintaining database constraints that require email uniqueness. The original email is replaced with an invalidated version that won't conflict with future registrations.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity to clear the email for

required
invalidated_email str

Invalidated email format to replace the original (e.g., "invalidated-{timestamp}@domain.com")

required
Events Emitted

IdentityEmailCleared: When the email is successfully invalidated

Examples:

>>> import time
>>> timestamp = int(time.time())
>>> invalidated = f"invalidated-{timestamp}@example.com"
>>> authentication_service.clear_identity_email(
...     identity_id=identity_id,
...     invalidated_email=invalidated
... )
Use Cases
  • GDPR compliance (email anonymization)
  • Freeing up email for re-registration
  • Deactivating accounts while preserving audit trails
  • Handling bounced/invalid email addresses
Note

The invalidated email should follow a consistent pattern to avoid conflicts and maintain traceability for audit purposes.

Source code in components/authentication/public/api.py
def clear_identity_email(
    self, identity_id: UUID, *, invalidated_email: str
) -> None:
    """
    Swap an identity's email for an invalidated placeholder address.

    Email uniqueness is enforced by database constraints, so an email cannot
    simply be blanked out. The current address is instead overwritten with
    the supplied placeholder, which will not clash with future registrations.

    Args:
        identity_id: UUID of the identity whose email must be cleared
        invalidated_email: Placeholder address written in place of the
                         original (e.g., "invalidated-{timestamp}@domain.com")

    Events Emitted:
        IdentityEmailCleared: Once the email has been invalidated

    Examples:
        >>> import time
        >>> placeholder = f"invalidated-{int(time.time())}@example.com"
        >>> authentication_service.clear_identity_email(
        ...     identity_id=identity_id,
        ...     invalidated_email=placeholder
        ... )

    Use Cases:
        - Email anonymization for GDPR compliance
        - Releasing an email so it can be registered again
        - Deactivating an account while keeping its audit trail
        - Dealing with bounced or invalid addresses

    Note:
        Keep the placeholder format consistent: it avoids conflicts between
        placeholders and keeps invalidated addresses traceable in audits.
    """
    clear_email_command = ClearIdentityEmail(
        invalidated_email=invalidated_email, identity_id=identity_id
    )

    # The command is built first, then dispatched on the message bus while
    # the unit of work context is active.
    with self.unit_of_work:
        self.message_bus.handle(clear_email_command)
create classmethod
create(app_name=None)

Create a new instance of the AuthenticationService with the default parameters.

Source code in components/authentication/public/api.py
@classmethod
def create(cls, app_name: AppName | None = None) -> "AuthenticationService":
    """
    Create a new service instance wired with the default dependencies.

    The instance is built through ``cls`` so that calling ``create`` on a
    subclass yields an instance of that subclass rather than always the
    base ``AuthenticationService``.

    Args:
        app_name: Application the authentication repository is built for.
            When omitted (or falsy), the result of ``get_current_app_name()``
            is used instead.

    Returns:
        AuthenticationService: An instance whose unit of work is a
            ``SQLAlchemyUnitOfWork`` producing a
            ``DoubleWriteAuthenticationRepository`` per session.
    """
    app_name = app_name or get_current_app_name()
    # NOTE(review): imported at call time rather than module level —
    # presumably to avoid an import cycle; confirm before moving it.
    from components.authentication.internal.infrastructure.double_write_repository import (
        DoubleWriteAuthenticationRepository,
    )

    # Use ``cls`` instead of the hard-coded class name so subclasses are honoured.
    return cls(
        unit_of_work=SQLAlchemyUnitOfWork(
            authentication_repository_factory=lambda session: DoubleWriteAuthenticationRepository(
                session=session, app_name=app_name
            )
        )
    )
create_identity
create_identity(
    *,
    email,
    language,
    first_name=None,
    last_name=None,
    profile_id=None,
    mfa_required=None
)

Create a new identity in the identity provider (Keycloak).

This method creates a new authentication identity and links it to a profile if provided. The identity is created in Keycloak and a corresponding record is stored in the authentication_identity table to maintain the link with the profile.

Parameters:

Name Type Description Default
email str

Valid email address for the identity (will be normalized)

required
language Lang

ISO639 language code for the identity's preferred language

required
first_name str | None

Optional first name of the user (used in emails sent by Keycloak)

None
last_name str | None

Optional last name of the user (used in emails sent by Keycloak)

None
profile_id UUID | None

Optional UUID of the associated profile (can be linked later)

None
mfa_required bool | None

Whether MFA is required for this identity (None = use default)

None

Returns:

Name Type Description
UUID UUID

The Keycloak identity ID of the newly created identity

Raises:

Type Description
conflict

If an identity with this email already exists

missing_resource

If identity creation fails

Events Emitted

IdentityCreatedEvent: When the identity is successfully created

Examples:

Create a new identity with all details:

>>> authentication_service = AuthenticationService.create()
>>> identity_id = authentication_service.create_identity(
...     email="user@example.com",
...     language=Lang.ENGLISH,
...     first_name="John",
...     last_name="Doe",
...     profile_id=profile_id,
...     mfa_required=False
... )

Create minimal identity without profile:

>>> identity_id = authentication_service.create_identity(
...     email="minimal@example.com",
...     language=Lang.FRENCH
... )
Source code in components/authentication/public/api.py
def create_identity(
    self,
    *,
    email: str,
    language: Lang,
    first_name: str | None = None,
    last_name: str | None = None,
    profile_id: UUID
    | None = None,  # an identity may be created before it has a profile, hence optional
    mfa_required: bool | None = None,
) -> UUID:
    """
    Register a new identity with the identity provider (Keycloak).

    The identity is created in Keycloak and, when a profile is given, linked
    to it: a matching record is stored in the authentication_identity table
    so the profile/identity association is kept.

    Args:
        email: Valid email address of the identity (will be normalized)
        language: ISO639 code of the identity's preferred language
        first_name: Optional first name, used in emails sent by Keycloak
        last_name: Optional last name, used in emails sent by Keycloak
        profile_id: Optional UUID of the profile to attach (can be linked later)
        mfa_required: Whether MFA is mandatory for the identity (None = use default)

    Returns:
        UUID: Keycloak ID of the identity that was just created

    Raises:
        BaseErrorCode.conflict: If the email already belongs to an identity
        BaseErrorCode.missing_resource: If the identity cannot be found once
            the creation command has been handled

    Events Emitted:
        IdentityCreatedEvent: Once the identity has been created

    Examples:
        Fully specified identity:

        >>> authentication_service = AuthenticationService.create()
        >>> identity_id = authentication_service.create_identity(
        ...     email="user@example.com",
        ...     language=Lang.ENGLISH,
        ...     first_name="John",
        ...     last_name="Doe",
        ...     profile_id=profile_id,
        ...     mfa_required=False
        ... )

        Bare-minimum identity, no profile attached:

        >>> identity_id = authentication_service.create_identity(
        ...     email="minimal@example.com",
        ...     language=Lang.FRENCH
        ... )
    """
    creation_command = IdentityCreationCommand(
        email=email,
        language=language,
        first_name=first_name,
        last_name=last_name,
        profile_id=profile_id,
        mfa_required=mfa_required,
    )

    with self.unit_of_work:
        self.message_bus.handle(creation_command)
        # The result of handle() is not used; the new ID is fetched back from
        # the identity provider by email instead.
        # NOTE(review): the lookup uses the email exactly as passed in, while
        # the contract says the email is normalized — confirm that
        # get_identity_id tolerates non-normalized input.
        new_identity_id = self.unit_of_work.identity_provider.get_identity_id(
            email=email
        )
        if new_identity_id is None:
            # Raised inside the unit of work so its context sees the failure.
            raise BaseErrorCode.missing_resource(
                message=f"Identity with email {email} not found after creation"
            )

    return new_identity_id
delete_identity
delete_identity(identity_id)

Mark an identity for deletion.

This method prepares an identity for complete removal from both Keycloak and the local authentication_identity table. The actual deletion occurs when the database transaction commits.

⚠️ IMPORTANT: This operation is transactional - if the session is rolled back, the identity will not be deleted.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity to delete

required

Examples:

>>> try:
...     authentication_service.delete_identity(identity_id)
...     # Other operations...
...     # If everything succeeds, identity will be deleted on commit
... except Exception:
...     print("Identity deletion prevented by rollback")
Use Cases
  • Account deactivation/closure
  • GDPR compliance (right to be forgotten)
  • Cleaning up test/duplicate identities
Note

Consider clearing or anonymizing related data before deletion to maintain referential integrity across the system.

Source code in components/authentication/public/api.py
def delete_identity(self, identity_id: UUID) -> None:
    """
    Flag an identity for deletion.

    The identity is scheduled for full removal, both from Keycloak and from
    the local authentication_identity table. Nothing is actually removed
    until the database transaction commits.

    **⚠️ IMPORTANT**: The deletion is transactional. Rolling back the session
    means the identity is kept.

    Args:
        identity_id: UUID of the identity to remove

    Examples:
        >>> try:
        ...     authentication_service.delete_identity(identity_id)
        ...     # Other operations...
        ...     # On success, the identity disappears when the transaction commits
        ... except Exception:
        ...     print("Identity deletion prevented by rollback")

    Use Cases:
        - Closing or deactivating an account
        - Right to be forgotten (GDPR)
        - Removing test or duplicate identities

    Note:
        To keep referential integrity across the system, consider clearing
        or anonymizing related data before deleting the identity.
    """
    deletion_command = DeleteIdentityCommand(identity_id=identity_id)

    # Dispatch on the message bus while the unit of work context is active.
    with self.unit_of_work:
        self.message_bus.handle(deletion_command)
delete_identity_credentials
delete_identity_credentials(identity_id)

Remove the credentials (email and password) from an identity.

This effectively disables password-based authentication for the identity, forcing it to rely on external identity providers for authentication.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity to remove credentials from

required

Examples:

>>> authentication_service.delete_identity_credentials(identity_id)
>>> has_password = authentication_service.check_identity_has_password(identity_id)
>>> print(f"Has password: {has_password}")  # False
Use Cases
  • Migrating from password to external authentication
  • Security incident response (disable compromised credentials)
  • Enforcing external authentication policies
Source code in components/authentication/public/api.py
def delete_identity_credentials(self, identity_id: UUID) -> None:
    """
    Strip the credentials (email and password) from an identity.

    Once removed, password-based authentication is no longer possible for
    the identity, which then has to authenticate through external identity
    providers.

    Args:
        identity_id: UUID of the identity whose credentials are removed

    Examples:
        >>> authentication_service.delete_identity_credentials(identity_id)
        >>> has_password = authentication_service.check_identity_has_password(identity_id)
        >>> print(f"Has password: {has_password}")  # False

    Use Cases:
        - Moving an identity from password to external authentication
        - Disabling compromised credentials during a security incident
        - Enforcing an external-authentication-only policy
    """
    credentials_removal_command = DeleteIdentityCredentialsCommand(
        identity_id=identity_id,
    )

    # Dispatch on the message bus while the unit of work context is active.
    with self.unit_of_work:
        self.message_bus.handle(credentials_removal_command)
exchange_token_for_user
exchange_token_for_user(target_client_id, email)

Exchange a service account token for a user token.

This method uses the service account's privileges to generate access and refresh tokens on behalf of a user. This is typically used for server-to-server authentication scenarios where you need to act as a specific user.

Parameters:

Name Type Description Default
target_client_id str

The Keycloak client ID to generate tokens for

required
email str

Email address of the user to impersonate

required

Returns:

Name Type Description
dict dict

Dictionary containing 'access_token' and 'refresh_token' keys

Raises:

Type Description
missing_resource

If no identity exists for the email

Examples:

>>> tokens = authentication_service.exchange_token_for_user(
...     target_client_id="mobile-app",
...     email="user@example.com"
... )
>>> access_token = tokens['access_token']
>>> refresh_token = tokens['refresh_token']
>>> headers = {'Authorization': f'Bearer {access_token}'}
Security Note

This is a privileged operation that should only be used in trusted server-side contexts. Never expose this functionality to client-side code.

Source code in components/authentication/public/api.py
def exchange_token_for_user(self, target_client_id: str, email: str) -> dict:  # type: ignore[type-arg]
    """
    Trade a service account token for a token issued for a given user.

    The service account's privileges are used to obtain access and refresh
    tokens on behalf of the user. Typical usage is server-to-server
    authentication where the caller has to act as a specific user.

    Args:
        target_client_id: Keycloak client ID the tokens are generated for
        email: Email address of the user to impersonate

    Returns:
        dict: Dictionary with the 'access_token' and 'refresh_token' keys

    Raises:
        BaseErrorCode.missing_resource: If the email matches no identity

    Examples:
        >>> tokens = authentication_service.exchange_token_for_user(
        ...     target_client_id="mobile-app",
        ...     email="user@example.com"
        ... )
        >>> access_token = tokens['access_token']
        >>> refresh_token = tokens['refresh_token']
        >>> headers = {'Authorization': f'Bearer {access_token}'}

    Security Note:
        Privileged operation: restrict it to trusted server-side contexts
        and never make it reachable from client-side code.
    """
    with self.unit_of_work:
        # The identity is only looked up to check that it exists; the
        # exchange itself is keyed on the email.
        known_identity = self.unit_of_work.identity_provider.find_identity(email)

        if known_identity is not None:
            return self.unit_of_work.identity_provider.exchange_token_for_user(
                email=email, target_client_id=target_client_id
            )

        raise BaseErrorCode.missing_resource(
            message=f"Identity with email {email} not found"
        )
get_identity_by_email
get_identity_by_email(email)

Retrieve an identity by its email address.

The email is automatically normalized before lookup to ensure consistent formatting across the system.

Parameters:

Name Type Description Default
email str

Email address to search for (will be normalized)

required

Returns:

Type Description
AuthIdentity | None

AuthIdentity | None: The identity with this email if found, None otherwise

Examples:

Email is automatically normalized:

>>> identity = authentication_service.get_identity_by_email("User@Example.COM")
>>> if identity:
...     print(f"Normalized email: {identity.email}")  # user@example.com
...     print(f"Has password: {identity.has_password}")
Source code in components/authentication/public/api.py
def get_identity_by_email(self, email: str) -> AuthIdentity | None:
    """
    Look up an identity from its email address.

    The address goes through ``normalize_email_address_format`` before the
    lookup, so callers do not have to normalize it themselves.

    Args:
        email: Email address to look for (normalized before the lookup)

    Returns:
        AuthIdentity | None: The matching identity, or None when no identity
            has this email

    Examples:
        The email is normalized on the caller's behalf:

        >>> identity = authentication_service.get_identity_by_email("User@Example.COM")
        >>> if identity:
        ...     print(f"Normalized email: {identity.email}")  # user@example.com
        ...     print(f"Has password: {identity.has_password}")
    """
    with self.unit_of_work:
        found_entity = self.unit_of_work.identity_provider.find_identity(
            normalize_email_address_format(email)
        )

    # Conversion to the public AuthIdentity type happens after the unit of
    # work context has been exited.
    return None if found_entity is None else AuthIdentity.from_domain(found_entity)
get_keycloak_identity
get_keycloak_identity(keycloak_id)

Retrieve an identity directly by its Keycloak ID.

This method queries Keycloak directly using the identity ID to retrieve the full identity information.

Parameters:

Name Type Description Default
keycloak_id UUID

UUID of the identity in Keycloak

required

Returns:

Type Description
AuthIdentity | None

AuthIdentity | None: The identity data if found in Keycloak, None otherwise

Examples:

>>> identity = authentication_service.get_keycloak_identity(identity_id)
>>> if identity:
...     print(f"Identity email: {identity.email}")
...     print(f"Language: {identity.language}")
Source code in components/authentication/public/api.py
def get_keycloak_identity(self, keycloak_id: UUID) -> AuthIdentity | None:
    """
    Fetch an identity straight from its Keycloak ID.

    The identity provider is queried with the given ID and the result, if
    any, is converted to the public ``AuthIdentity`` representation.

    Args:
        keycloak_id: UUID of the identity in Keycloak

    Returns:
        AuthIdentity | None: The identity data when Keycloak knows the ID,
            None otherwise

    Examples:
        >>> identity = authentication_service.get_keycloak_identity(identity_id)
        >>> if identity:
        ...     print(f"Identity email: {identity.email}")
        ...     print(f"Language: {identity.language}")
    """
    with self.unit_of_work:
        found_entity = self.unit_of_work.identity_provider.get_identity(
            keycloak_id
        )

    # Conversion to the public AuthIdentity type happens after the unit of
    # work context has been exited.
    return None if found_entity is None else AuthIdentity.from_domain(found_entity)
get_keycloak_identity_by_profile_id
get_keycloak_identity_by_profile_id(profile_id)

Retrieve an identity by its associated profile_id.

This method looks up the authentication_identity table to find the Keycloak ID associated with the given profile_id, then retrieves the full identity data from Keycloak.

Parameters:

Name Type Description Default
profile_id UUID

UUID of the profile to find the associated identity for

required

Returns:

Type Description
AuthIdentity | None

AuthIdentity | None: The identity data if found, None if no identity is linked to this profile or if the identity doesn't exist in Keycloak

Examples:

Retrieve identity for a profile:

>>> identity = authentication_service.get_keycloak_identity_by_profile_id(profile_id)
>>> if identity:
...     print(f"Found identity: {identity.email}")
...     print(f"MFA required: {identity.mfa_required}")
... else:
...     print("No identity found for this profile")
Source code in components/authentication/public/api.py
def get_keycloak_identity_by_profile_id(
    self, profile_id: UUID
) -> AuthIdentity | None:
    """
    Fetch the identity attached to a profile.

    Resolution happens in two steps: the authentication repository is asked
    for the record matching ``profile_id``, then the Keycloak ID stored on
    that record is used to load the full identity from the identity provider.

    Args:
        profile_id: UUID of the profile whose identity is wanted

    Returns:
        AuthIdentity | None: The identity data, or None when the profile has
                            no linked identity, the link carries no Keycloak
                            ID, or Keycloak does not know the identity

    Examples:
        Look up the identity of a profile:

        >>> identity = authentication_service.get_keycloak_identity_by_profile_id(profile_id)
        >>> if identity:
        ...     print(f"Found identity: {identity.email}")
        ...     print(f"MFA required: {identity.mfa_required}")
        ... else:
        ...     print("No identity found for this profile")
    """
    with self.unit_of_work:
        identity_link = self.unit_of_work.authentication_repository.get_by_profile_id(
            profile_id
        )
        # Step 1: the profile must be linked to a record holding a Keycloak ID.
        has_keycloak_id = (
            identity_link is not None and identity_link.keycloak_id is not None
        )
        if not has_keycloak_id:
            return None

        # Step 2: the identity must still exist on the identity provider side.
        found_entity = self.unit_of_work.identity_provider.get_identity(
            identity_link.keycloak_id
        )
        if found_entity is None:
            return None

    return AuthIdentity.from_domain(found_entity)
get_or_raise_keycloak_identity
get_or_raise_keycloak_identity(identity_id)

Retrieve an identity by its Keycloak ID or raise an error if not found.

This is a convenience method for cases where the identity must exist.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity in Keycloak

required

Returns:

Name Type Description
AuthIdentity AuthIdentity

The identity data

Raises:

Type Description
login_error

If the identity is not found

Examples:

>>> try:
...     identity = authentication_service.get_or_raise_keycloak_identity(identity_id)
...     print(f"Found identity: {identity.email}")
... except BaseErrorCode:
...     print("Identity not found - login error")
Source code in components/authentication/public/api.py
def get_or_raise_keycloak_identity(self, identity_id: UUID) -> AuthIdentity:
    """
    Fetch an identity by Keycloak ID, failing when it does not exist.

    Strict variant of ``get_keycloak_identity`` for callers that cannot
    proceed without the identity.

    Args:
        identity_id: UUID of the identity in Keycloak

    Returns:
        AuthIdentity: The identity data

    Raises:
        BaseErrorCode.login_error: If no identity matches the ID

    Examples:
        >>> try:
        ...     identity = authentication_service.get_or_raise_keycloak_identity(identity_id)
        ...     print(f"Found identity: {identity.email}")
        ... except BaseErrorCode:
        ...     print("Identity not found - login error")
    """
    found_identity = self.get_keycloak_identity(identity_id)
    if found_identity is not None:
        return found_identity

    # A missing identity is reported as a login error, not a missing resource.
    raise BaseErrorCode.login_error()
logout_identity_from_all_sessions
logout_identity_from_all_sessions(identity_id)

Force logout of an identity from all active sessions.

This method invalidates all active sessions for the specified identity across all clients and devices. The user will need to re-authenticate for any further requests.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity to logout from all sessions

required

Examples:

>>> authentication_service.logout_identity_from_all_sessions(identity_id)
>>> print("User logged out from all sessions")
Use Cases
  • Security incident response (compromised account)
  • Password changes requiring re-authentication
  • Account suspension/deactivation
  • Administrative logout for policy enforcement
  • User-requested "logout everywhere" feature
Note

This affects all active sessions including web browsers, mobile apps, and API tokens. The logout is immediate and cannot be undone.

Source code in components/authentication/public/api.py
def logout_identity_from_all_sessions(self, identity_id: UUID) -> None:
    """
    Terminate every active session of an identity.

    All sessions of the identity are invalidated, whatever the client or
    device. Any further request requires the user to authenticate again.

    Args:
        identity_id: UUID of the identity to log out everywhere

    Examples:
        >>> authentication_service.logout_identity_from_all_sessions(identity_id)
        >>> print("User logged out from all sessions")

    Use Cases:
        - Reacting to a compromised account
        - Forcing re-authentication after a password change
        - Suspending or deactivating an account
        - Administrative logout to enforce a policy
        - "Logout everywhere" requested by the user

    Note:
        Web browsers, mobile apps and API tokens are all affected. The
        logout takes effect immediately and cannot be undone.
    """
    logout_command = LogOutIdentityFromAllSessionsCommand(
        identity_id=identity_id,
    )

    # Dispatch on the message bus while the unit of work context is active.
    with self.unit_of_work:
        self.message_bus.handle(logout_command)
message_bus instance-attribute
message_bus = message_bus or MessageBus(
    unit_of_work,
    injected_event_handlers,
    injected_command_handlers,
)
refresh_exchanged_token
refresh_exchanged_token(refresh_token, target_client_id)

Refresh an exchanged token to get a new access token.

This method uses a refresh token (obtained from exchange_token_for_user) to generate a new access token and refresh token pair. This extends the user session without requiring re-authentication.

Parameters:

Name Type Description Default
refresh_token str

The refresh token from a previous token exchange

required
target_client_id str

The Keycloak client ID the tokens are for

required

Returns:

Name Type Description
dict dict

Dictionary containing new 'access_token' and 'refresh_token'

Examples:

>>> initial_tokens = authentication_service.exchange_token_for_user(
...     target_client_id="mobile-app",
...     email="user@example.com"
... )
>>> # Later, when access token expires
>>> refreshed_tokens = authentication_service.refresh_exchanged_token(
...     refresh_token=initial_tokens['refresh_token'],
...     target_client_id="mobile-app"
... )
>>> new_access_token = refreshed_tokens['access_token']
Token Lifecycle
  1. exchange_token_for_user() → initial tokens
  2. Use access_token for API calls
  3. When access_token expires → refresh_exchanged_token()
  4. Repeat step 3 until refresh_token expires
Source code in components/authentication/public/api.py
def refresh_exchanged_token(
    self, refresh_token: str, target_client_id: str
) -> dict:  # type: ignore[type-arg]
    """
    Obtain a fresh access token from a previously exchanged token.

    A refresh token (as returned by exchange_token_for_user) is traded for a
    new access token / refresh token pair, which prolongs the user session
    without another authentication.

    Args:
        refresh_token: Refresh token obtained from an earlier token exchange
        target_client_id: Keycloak client ID the tokens belong to

    Returns:
        dict: Dictionary holding the new 'access_token' and 'refresh_token'

    Examples:
        >>> initial_tokens = authentication_service.exchange_token_for_user(
        ...     target_client_id="mobile-app",
        ...     email="user@example.com"
        ... )
        >>> # Later on, once the access token has expired
        >>> refreshed_tokens = authentication_service.refresh_exchanged_token(
        ...     refresh_token=initial_tokens['refresh_token'],
        ...     target_client_id="mobile-app"
        ... )
        >>> new_access_token = refreshed_tokens['access_token']

    Token Lifecycle:
        1. exchange_token_for_user() → initial tokens
        2. API calls are made with access_token
        3. access_token expires → refresh_exchanged_token()
        4. Step 3 is repeated until refresh_token itself expires
    """
    with self.unit_of_work:
        # Pure delegation: the identity provider performs the refresh.
        provider = self.unit_of_work.identity_provider
        return provider.refresh_exchanged_token(
            target_client_id=target_client_id,
            refresh_token=refresh_token,
        )
send_password_reset_email
send_password_reset_email(
    identity_id, *, email, client_id, redirect_uri
)

Send a password reset email to an identity.

This triggers Keycloak to send a password reset email to the specified identity. The email parameter is used for verification to ensure the request is legitimate.

Parameters:

Name Type Description Default
identity_id UUID

UUID of the identity requesting password reset

required
email str

Email address of the identity (for verification)

required
client_id str

The client ID for the reset flow

required
redirect_uri str | None

URI to redirect to after reset completion (may be None, but must be passed explicitly)

required
Events Emitted

PasswordResetEmailSentEvent: When the reset email is successfully sent

Examples:

>>> authentication_service.send_password_reset_email(
...     identity_id=identity_id,
...     email="user@example.com",
...     client_id="web-app",
...     redirect_uri="https://app.example.com/reset-complete"
... )
Security Note

The email parameter serves as a verification step - the identity must own the email address to receive the reset link.

Source code in components/authentication/public/api.py
def send_password_reset_email(
    self, identity_id: UUID, *, email: str, client_id: str, redirect_uri: str | None
) -> None:
    """
    Ask Keycloak to email a password reset link to an identity.

    The email argument acts as a verification of the request's legitimacy:
    it is passed along with the identity ID in the reset command.

    Args:
        identity_id: UUID of the identity asking for the reset
        email: Email address of the identity (used for verification)
        client_id: Client ID the reset flow runs for
        redirect_uri: URI to redirect to once the reset is complete; may be
            None but has to be passed explicitly

    Events Emitted:
        PasswordResetEmailSentEvent: Once the reset email has been sent

    Examples:
        >>> authentication_service.send_password_reset_email(
        ...     identity_id=identity_id,
        ...     email="user@example.com",
        ...     client_id="web-app",
        ...     redirect_uri="https://app.example.com/reset-complete"
        ... )

    Security Note:
        The email works as a verification step: only the owner of the
        address receives the reset link.
    """
    reset_email_command = SendPasswordResetEmailCommand(
        identity_id=identity_id,
        email=email,
        client_id=client_id,
        redirect_uri=redirect_uri,
    )

    # Dispatch on the message bus while the unit of work context is active.
    with self.unit_of_work:
        self.message_bus.handle(reset_email_command)
send_verification_email
send_verification_email(
    *, client_id, redirect_uri, identity_id=None, email=None
)

Send an email verification message to an identity.

This triggers Keycloak to send a verification email to the user. Either identity_id or email must be provided. If both are given, identity_id takes precedence.

Parameters:

Name Type Description Default
client_id str

The client ID for the verification flow

required
redirect_uri str | None

URI to redirect to after verification (may be None, but must be passed explicitly)

required
identity_id UUID | None

UUID of the identity to send verification to (preferred)

None
email str | None

Email address to send verification to (alternative)

None

Raises:

Type Description
missing_resource

If neither identity_id nor email is provided, or if the specified identity/email is not found

Examples:

Send verification by identity ID (preferred):

>>> authentication_service.send_verification_email(
...     identity_id=identity_id,
...     client_id="web-app",
...     redirect_uri="https://app.example.com/verify-success"
... )

Send verification by email (fallback):

>>> authentication_service.send_verification_email(
...     email="user@example.com",
...     client_id="web-app",
...     redirect_uri=None  # Use default redirect
... )
Note

The actual email sending is handled by Keycloak's email verification system.

Source code in components/authentication/public/api.py
def send_verification_email(
    self,
    *,
    client_id: str,
    redirect_uri: str | None,
    identity_id: UUID | None = None,
    email: str | None = None,
) -> None:
    """
    Request an email-verification message for an identity.

    Keycloak is the system that actually sends the email. The target identity
    is designated by ``identity_id`` or by ``email``: one of the two must be
    supplied, and ``identity_id`` is the one used when both are present.

    Args:
        client_id: The client ID for the verification flow
        redirect_uri: URI to redirect to once verification is done, or None
        identity_id: UUID of the identity to verify (preferred selector)
        email: Email address of the identity to verify (alternative selector)

    Raises:
        BaseErrorCode.missing_resource: If both identity_id and email are missing,
                                      or if no identity matches the given selector

    Examples:
        Target the identity by its ID (preferred):

        >>> authentication_service.send_verification_email(
        ...     identity_id=identity_id,
        ...     client_id="web-app",
        ...     redirect_uri="https://app.example.com/verify-success"
        ... )

        Target the identity by its email (fallback):

        >>> authentication_service.send_verification_email(
        ...     email="user@example.com",
        ...     client_id="web-app",
        ...     redirect_uri=None  # Use default redirect
        ... )

    Note:
        This method only dispatches a command; delivery of the email is
        handled by Keycloak's email verification system.
    """
    verification_request = SendVerificationEmailCommand(
        client_id=client_id,
        redirect_uri=redirect_uri,
        identity_id=identity_id,
        email=email,
    )

    # The command is handled by the message bus inside the unit of work.
    with self.unit_of_work:
        self.message_bus.handle(verification_request)
set_identity_credentials
set_identity_credentials(
    identity_id,
    *,
    email,
    prehashed_password,
    is_email_verified,
    mfa_required=None
)

Set or update the credentials (email and password) for an existing identity.

⚠️ IMPORTANT: The identity must already exist. Use create_identity() first if you need to create a new identity.

This method handles several scenarios:

1. Same email: Updates password for existing identity
2. New available email: Updates both email and password
3. Email conflict: Raises error if email belongs to different identity

Parameters:

Name Type Description Default
identity_id UUID

UUID of the existing identity to update

required
email str

Email address to set (will be normalized)

required
prehashed_password str

Pre-hashed password to set

required
is_email_verified bool

Whether the email has been verified

required
mfa_required bool | None

MFA requirement setting (None = no change)

None

Raises:

Type Description
missing_resource

If the identity doesn't exist

conflict

If email is already used by different identity

Examples:

Set initial credentials for new identity:

>>> authentication_service.set_identity_credentials(
...     identity_id=identity_id,
...     email="user@example.com",
...     prehashed_password=hashed_password,
...     is_email_verified=True,
...     mfa_required=False
... )

Update password for existing identity:

>>> authentication_service.set_identity_credentials(
...     identity_id=identity_id,
...     email="user@example.com",  # same email
...     prehashed_password=new_hashed_password,
...     is_email_verified=True
... )
Source code in components/authentication/public/api.py
def set_identity_credentials(
    self,
    identity_id: UUID,
    *,
    email: str,
    prehashed_password: str,
    is_email_verified: bool,
    mfa_required: bool | None = None,
) -> None:
    """
    Assign or replace the email/password credentials of an existing identity.

    **⚠️ IMPORTANT**: This never creates an identity. Call create_identity()
    beforehand when the identity does not exist yet.

    Outcome depending on the email passed in:
    1. **Same email**: only the password of the identity is updated
    2. **New available email**: both email and password are updated
    3. **Email conflict**: an error is raised because the email belongs to
       a different identity

    Args:
        identity_id: UUID of the existing identity to update
        email: Email address to set (will be normalized)
        prehashed_password: Pre-hashed password to set
        is_email_verified: Whether the email has been verified
        mfa_required: MFA requirement setting (None = no change)

    Raises:
        BaseErrorCode.missing_resource: If the identity doesn't exist
        BaseErrorCode.conflict: If email is already used by different identity

    Examples:
        First credentials for a freshly created identity:

        >>> authentication_service.set_identity_credentials(
        ...     identity_id=identity_id,
        ...     email="user@example.com",
        ...     prehashed_password=hashed_password,
        ...     is_email_verified=True,
        ...     mfa_required=False
        ... )

        Password update, email unchanged:

        >>> authentication_service.set_identity_credentials(
        ...     identity_id=identity_id,
        ...     email="user@example.com",  # same email
        ...     prehashed_password=new_hashed_password,
        ...     is_email_verified=True
        ... )
    """
    credentials_update = ChangeIdentityCredentialsCommand(
        identity_id=identity_id,
        email=email,
        prehashed_password=prehashed_password,
        is_email_verified=is_email_verified,
        mfa_required=mfa_required,
    )

    # The command is handled by the message bus inside the unit of work.
    with self.unit_of_work:
        self.message_bus.handle(credentials_update)
unit_of_work instance-attribute
unit_of_work = unit_of_work or SQLAlchemyUnitOfWork()

blueprints

blueprint

AuthBlueprint
AuthBlueprint(mfa, *args, **kwargs)

Bases: CustomBlueprint

Base blueprint for authentication.

Initialize the blueprint.

Source code in components/authentication/public/blueprints/blueprint.py
def __init__(self, mfa: MFA | None, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
    """
    Initialize the blueprint.

    Args:
        mfa: MFA handler stored on the blueprint, or None when there is none.
        *args: Positional arguments forwarded unchanged to the parent initializer.
        **kwargs: Keyword arguments forwarded unchanged to the parent initializer.
    """
    super().__init__(*args, **kwargs)
    # Kept so register_mfa_notifier() can forward notifiers to it later.
    self._mfa = mfa
log_api_call classmethod
log_api_call(method, kwargs)

Log the API call.

Source code in components/authentication/public/blueprints/blueprint.py
@classmethod
def log_api_call(cls, method, kwargs) -> None:  # type: ignore[no-untyped-def]
    """
    Record an API call, tagged with this class's name as the controller.

    Args:
        method: Name of the handler being invoked.
        kwargs: Keyword arguments the handler was called with.
    """
    controller_name = cls.__name__
    # Delegates to the module-level log_api_call helper (not this classmethod).
    log_api_call(controller_name=controller_name, method=method, kwargs=kwargs)
register_mfa_notifier
register_mfa_notifier(notifier)

Register a notifier for MFA.

Source code in components/authentication/public/blueprints/blueprint.py
def register_mfa_notifier(self, notifier: Notifier) -> None:
    """
    Attach a notifier to the blueprint's MFA handler.

    Does nothing when the blueprint has no MFA handler.
    """
    if not self._mfa:
        return

    self._mfa.register_notifier(notifier)
route
route(rule, **options)

Route decorator that logs the API call.

Source code in components/authentication/public/blueprints/blueprint.py
def route(self, rule: str, **options: Any) -> Callable[[_VT], _VT]:
    """
    Register a view like the parent ``route`` does, logging every invocation.

    The view is wrapped so that ``log_api_call`` receives the view's name and
    the keyword arguments of the call before the view itself runs.
    """

    def decorator(view):  # type: ignore[no-untyped-def]
        @wraps(view)
        def logged_view(*args, **kwargs):  # type: ignore[no-untyped-def]
            self.log_api_call(method=view.__name__, kwargs=kwargs)
            return view(*args, **kwargs)

        # Explicit two-argument super(): zero-argument super() is not used
        # inside this nested function.
        register_on_parent = super(AuthBlueprint, self).route(rule, **options)
        return register_on_parent(logged_view)

    return decorator
create_auth_blueprint
create_auth_blueprint(
    token_auth, user_password_changed=None, enable_mfa=False
)

Create an auth blueprint.

Source code in components/authentication/public/blueprints/blueprint.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
def create_auth_blueprint(
    token_auth: TokenAuth,
    user_password_changed: Callable[[BaseUser], None] | None = None,
    enable_mfa: bool = False,
) -> AuthBlueprint:
    """
    Create an auth blueprint.
    """
    mfa: MFA | None = None
    non_blocking_auth = TokenAuth(
        cookie_name="token",
        optional=True,
    )

    if enable_mfa:
        mfa = create_mfa()
        mfa_required_if_enabled = mfa.mfa_required
    else:
        mfa_required_if_enabled = _mfa_disabled_decorator

    auth_blueprint = AuthBlueprint(
        name="auth",
        import_name=__name__,
        mfa=mfa,
    )

    def handle_user_password_changed(user) -> None:  # type: ignore[no-untyped-def]
        """
        Revoke all tokens of ``user``, then run the optional
        ``user_password_changed`` callback given to ``create_auth_blueprint``.
        """
        user.revoke_all_tokens()
        if not user_password_changed:
            return

        user_password_changed(user)

    def login_required_via_idp(f):  # type: ignore[no-untyped-def]
        """
        Decorator authenticating the request from the ``access_token`` of its JSON body.

        We create a decorator, so this code can be executed before the mfa decorator.
        Indeed, the mfa decorator throws if the current user is not set in globals.
        In dev, this call is stubbed. See find_identity_provider_user_id_from_token() below

        Flow of the wrapper:
        1. verify the access token through ``verify_keycloak_access_token`` of the
           current user class; on error, answer 401 with the error fields;
        2. set the auth globals for the verified user;
        3. call the wrapped function; a ``Response`` it returns is passed through
           when its status is >= 300, and is a programming error otherwise;
        4. when the wrapped function returns anything else, create session tokens
           for ``g.current_user`` and return that response.

        Raises:
            RuntimeError: If the wrapped function returns a ``Response`` whose
                status code is below 300.
        """

        @wraps(f)
        def decorated_function(*args, **kwargs):  # type: ignore[no-untyped-def]
            from shared.models.mixins.user import BaseUser

            # Reset the auth globals before authenticating this request.
            reset_auth_globals()

            user_class = get_current_class(BaseUser)
            user, error = user_class.verify_keycloak_access_token(
                request.json.get("access_token")  # type: ignore[union-attr]
            )
            if error:
                current_logger.warning("Login error", error=error)
                # We don't expose details about login error to the client to avoid leaking sensitive information
                # NOTE(review): the payload below still returns alancode, alandesc and
                # type of the error - confirm these fields are safe to expose.
                return make_json_response(
                    dict(
                        alancode=error.alancode,
                        alandesc=error.alandesc,
                        type=error.type,
                    ),
                    401,
                )

            set_auth_globals_for_regular_user(user)

            # Read before calling f; defaults to the mobile token type when absent.
            refresh_token_type = request.json.get(  # type: ignore[union-attr]
                "refresh_token_type", RefreshTokenType.mobile
            )

            res = f(*args, **kwargs)
            if isinstance(res, Response):
                # The wrapped callback may only short-circuit with an error response.
                if res.status_code < 300:
                    raise RuntimeError(
                        "DataAuth.login_required_via_idp callbacks should not return a non-error response."
                    )
                return res

            return g.current_user.create_session_tokens_and_make_response(
                refresh_token_type=refresh_token_type
            )

        return decorated_function

    @auth_blueprint.route("/login", methods=["POST"])
    @exponential_backoff(
        ignore_alan_code=[
            # Ignore multi-factor authentication error (do not impact
            # rate limiting).
            # Why ignoring this error: In this specific case, credentials are
            # valid (said differently: credential authentication is a success),
            # but the operation is rejected because of a missing MFA
            # operation id.
            # It makes sense to ignore rate limiting in this specific case as
            # the MFA validation is already rate limited, so brute forcing the
            # MFA validation is already handled.
            PENDING_MFA_ALAN_CODE,
        ],
    )
    @handle_with_api_error_handler
    @data_auth.login_required  # type: ignore[misc]
    @mfa_required_if_enabled(  # type: ignore[misc]
        op_description="Connexion à votre compte Alan",
        op_type="CONNECTION",
    )
    @obs.api_call()
    def login() -> None:
        """
        Login route.

        Requires a valid login payload, sends back auth and refresh tokens. See `data_auth.login_required`
        for more details.

        The body is intentionally empty: all the work is done by the decorators
        stacked on this route (rate limiting, error handling, credential check,
        MFA check when MFA is enabled, observability).
        """
        return

    # Find in datadog using @request.path:/auth/login_idp
    @auth_blueprint.route("/login_idp", methods=["POST"])
    @exponential_backoff(
        ignore_alan_code=[
            # Ignore multi-factor authentication error (do not impact
            # rate limiting).
            # Why ignoring this error: In this specific case, credentials are
            # valid (said differently: credential authentication is a success),
            # but the operation is rejected because of a missing MFA
            # operation id.
            # It makes sense to ignore rate limiting in this specific case as
            # the MFA validation is already rate limited, so brute forcing the
            # MFA validation is already handled.
            PENDING_MFA_ALAN_CODE,
        ],
    )
    @handle_with_api_error_handler
    @login_required_via_idp  # type: ignore[misc]
    @mfa_required_if_enabled(  # type: ignore[misc]
        op_description="Connexion à votre compte Alan",
        op_type="CONNECTION",
    )
    @obs.api_call()
    def login_idp() -> None:
        """
        Login route for requests authenticated by an identity-provider access token.

        Same decorator stack as the `/login` route, except that authentication
        is performed by `login_required_via_idp` instead of
        `data_auth.login_required`. The body is intentionally empty: the
        decorators do all the work.
        """
        return

    # Find in datadog using @request.path:/auth/deduce_countries
    @auth_blueprint.route("/deduce_countries", methods=["POST"])
    @handle_with_api_error_handler
    @obs.api_call()
    def _deduce_countries():  # type: ignore[no-untyped-def]
        """
        Return the country codes whose user table holds the given email.

        The email comes from the required ``email`` argument and is lowercased
        and stripped. The ``access_token`` of the JSON body must resolve to an
        identity; otherwise an empty JSON object is returned with a 401.
        """
        parser = reqparse.RequestParser()
        parser.add_argument("email", type=str, help="Email", required=True)
        parsed_args = parser.parse_args(strict=False)
        email = parsed_args.get("email").lower().strip()

        access_token = request.json.get("access_token")  # type: ignore[union-attr]
        if not get_identity_provider().find_identity_id_from_token(access_token):
            return make_json_response({}, 401)

        # One SELECT per country schema; the email is passed as a bound parameter.
        country_query = text(
                """
                SELECT 'fr' FROM public.user where email=:email
                UNION SELECT 'be' FROM be.user where email=:email
                UNION SELECT 'es' FROM es.user where email=:email
                UNION SELECT 'ca' FROM ca.user where email=:email
            """
        )
        rows = current_session.execute(
            country_query,
            params={"email": email},
        ).fetchall()

        # Each row has a single column: the country code.
        country_codes = [row[0] for row in rows]
        return make_json_response(country_codes)

    @auth_blueprint.route("/refresh", methods=["POST"])
    @handle_with_api_error_handler
    @exponential_backoff()
    @obs.api_call()
    def refresh():  # type: ignore[no-untyped-def]
        """
        Token refresh route, dispatching on ``refresh_token_type`` of the JSON body.

        The web type goes to ``_refresh_web``; anything else goes to
        ``_refresh_mobile``.
        """
        requested_type = request.get_json().get("refresh_token_type")  # type: ignore[union-attr]
        if requested_type == RefreshTokenType.web:
            return _refresh_web()

        if "Authorization" not in request.headers:
            current_logger.warning("No Authorization header in request")
            # _refresh_mobile() will throw a 401

        return _refresh_mobile()

    @auth_blueprint.route("/check_mfa", methods=["POST"])
    @handle_with_api_error_handler
    @exponential_backoff()
    @obs.api_call()
    def check_mfa():  # type: ignore[no-untyped-def]
        """
        Return the status of a pending MFA operation.

        Expects ``operation_id`` and ``nonce`` (both UUIDs, both required).
        Aborts with 404 when MFA is not enabled on this blueprint. On success
        the JSON body is ``{"status": <status as string>}``; on ``MFAError`` it
        is ``{"error": <error>}``.
        """
        if not mfa:
            abort(404)

        parser = reqparse.RequestParser()
        parser.add_argument(
            "operation_id",
            type=UUID,
            help="Pending operation ID",
            required=True,
        )
        parser.add_argument(
            "nonce",
            type=UUID,
            help="Pending operation nonce",
            required=True,
        )

        params = parser.parse_args(strict=False)
        try:
            pending_operation_status = mfa.get_operation_status(
                operation_id=params.pop("operation_id"),
                nonce=params.pop("nonce"),
            )
            return make_json_response({"status": str(pending_operation_status)})

        except MFAError as mfa_error:
            # NOTE(review): no explicit status code here, whereas the other MFA
            # routes of this blueprint answer MFAError with 400 - confirm intended.
            return make_json_response({"error": mfa_error.error})

    @auth_blueprint.route("/send_mfa_email", methods=["POST"])
    @handle_with_api_error_handler
    @exponential_backoff()
    @obs.api_call()
    def send_mfa_validation_code_email():  # type: ignore[no-untyped-def]
        """
        Ask the MFA handler to send the email for a pending operation.

        Expects ``operation_id`` and ``nonce`` (both UUIDs, both required).
        Aborts with 404 when MFA is not enabled on this blueprint, answers
        ``{"error": ...}`` with a 400 on ``MFAError`` and an empty response on
        success.
        """
        if not mfa:
            abort(404)

        parser = reqparse.RequestParser()
        required_uuid_args = (
            ("operation_id", "Pending operation ID"),
            ("nonce", "Pending operation nonce"),
        )
        for arg_name, arg_help in required_uuid_args:
            parser.add_argument(arg_name, type=UUID, help=arg_help, required=True)

        params = parser.parse_args(strict=False)
        operation_id = params.pop("operation_id")
        nonce = params.pop("nonce")

        try:
            mfa.send_email(operation_id=operation_id, nonce=nonce)
        except MFAError as mfa_error:
            return make_json_response({"error": mfa_error.error}, 400)

        return make_empty_response()

    @auth_blueprint.route("/validate_mfa", methods=["POST"])
    @handle_with_api_error_handler
    @non_blocking_auth.login_required
    @exponential_backoff()
    @obs.api_call()
    def validate_mfa():  # type: ignore[no-untyped-def]
        """
        Validate a pending MFA operation.

        Expects ``operation_id`` (UUID, required) and an optional integer
        ``validation_code``. Authentication is optional on this route
        (``non_blocking_auth`` is created with ``optional=True``): the call is
        flagged as authenticated only when a current user and a session id are
        set and the refresh token of that session has the "mobile" type.

        Aborts with 404 when MFA is not enabled on this blueprint. Returns an
        empty response when validation succeeds, ``{"error": "WRONG_CODE"}``
        with a 400 when it does not, and ``{"error": ...}`` with a 400 on
        ``MFAError``.
        """
        if not mfa:
            abort(404)

        RefreshToken = get_current_class(BaseRefreshToken)
        parser = reqparse.RequestParser()
        parser.add_argument(
            "operation_id",
            type=UUID,
            help="Pending operation ID",
            required=True,
        )
        parser.add_argument(
            "validation_code",
            type=int,
            help="Validation code",
            required=False,
        )

        params = parser.parse_args(strict=False)
        # Short-circuit chain: the query only runs when both a user and a session
        # id are present. `.one()` expects exactly one refresh token for the session.
        is_mobile_authenticated = (
            g.current_user is not None
            and g.session_id is not None
            and current_session.query(RefreshToken)  # noqa: ALN085
            .filter(RefreshToken.session_id == g.session_id)
            .one()
            .token_type
            == "mobile"
        )
        try:
            if mfa.validate_or_reject(
                operation_id=params.pop("operation_id"),
                validation_code=params.pop("validation_code", None),
                authenticated=is_mobile_authenticated,
                # The user id is only passed along for a mobile-authenticated call.
                authenticated_user=(
                    g.current_user.id if is_mobile_authenticated else None
                ),
            ):
                return make_empty_response()
            else:
                return make_json_response({"error": "WRONG_CODE"}, 400)

        except MFAError as mfa_error:
            return make_json_response({"error": mfa_error.error}, 400)

    @auth_blueprint.route("/reject_mfa", methods=["POST"])
    @handle_with_api_error_handler
    @token_auth.login_required
    @exponential_backoff()
    @obs.api_call()
    def reject_mfa():  # type: ignore[no-untyped-def]
        """
        Reject a pending MFA operation on behalf of the logged-in user.

        Expects ``operation_id`` (UUID, required). Aborts with 404 when MFA is
        not enabled on this blueprint, answers ``{"error": ...}`` with a 400 on
        ``MFAError`` and an empty response otherwise.
        """
        if not mfa:
            abort(404)

        parser = reqparse.RequestParser()
        parser.add_argument(
            "operation_id", type=UUID, help="Pending operation ID", required=True
        )
        operation_id = parser.parse_args(strict=False).pop("operation_id")

        try:
            mfa.validate_or_reject(
                operation_id=operation_id,
                reject=True,
                authenticated=True,
                authenticated_user=g.current_user.id,
            )
            return make_empty_response()
        except MFAError as mfa_error:
            error_payload = {"error": mfa_error.error}
            return make_json_response(error_payload, 400)

    @refresh_auth.login_required
    def _refresh_web():  # type: ignore[no-untyped-def]
        """
        Token refresh for web clients.

        Reads the refresh token from the ``refresh_token`` cookie, refreshes
        the session tokens of the current user, and returns a response that
        sets the new refresh token as a cookie and carries the updated auth
        token. The old refresh token is invalidated.
        """
        presented_refresh_token = request.cookies.get("refresh_token")
        session_tokens = g.current_user.refresh_session_tokens(presented_refresh_token)
        payload = session_tokens.to_dict()

        # The new refresh token is removed from the payload and handed over
        # separately to be set as a cookie.
        rotated_refresh_token = payload.pop("refresh_token")
        return g.current_user.set_cookie_and_make_response(
            refresh_token=rotated_refresh_token,
            token=payload["token"],
            response_dict=payload,
        )

    @refresh_mobile_auth.login_required
    def _refresh_mobile():  # type: ignore[no-untyped-def]
        """
        Token refresh for mobile clients.

        Takes the refresh token from the ``Authorization`` header and returns,
        as JSON, a new refresh token together with an updated auth token. The
        old refresh token is invalidated.
        """
        # By now the login_required decorator has validated the presented refresh
        # token and set the current user in globals.
        authorization_header = request.headers["Authorization"]
        # Header is "<scheme> <token>": drop the scheme, keep the token.
        _, refresh_token = authorization_header.split(None, 1)
        refreshed_tokens = g.current_user.refresh_session_tokens(refresh_token)
        return make_json_response(refreshed_tokens.to_dict())

    @auth_blueprint.route("/logout", methods=["DELETE"])
    @handle_with_api_error_handler
    @exponential_backoff()
    @obs.api_call()
    def logout():  # type: ignore[no-untyped-def]
        """
        Logout route.

        Revokes the current tokens (auth and refresh). A ``refresh_token_type``
        of "mobile" goes through the mobile logout with the given
        ``refresh_token``; any other value goes through the web logout.
        """
        parser = reqparse.RequestParser()
        parser.add_argument(
            "refresh_token", type=str, help="the refresh token to invalidate"
        )
        parser.add_argument(
            "refresh_token_type",
            type=str,
            help="type of the refresh token: web or mobile",
        )
        args = parser.parse_args(strict=False)

        is_mobile_logout = args.get("refresh_token_type") == "mobile"
        if not is_mobile_logout:
            return _logout_web()

        return _logout_mobile(args.get("refresh_token"))

    def _logout_mobile(refresh_token_string: str | None) -> Response:
        """
        Delete the refresh token matching ``refresh_token_string``, when one is
        given, and return an empty response either way.
        """
        if not refresh_token_string:
            return make_empty_response()

        _delete_refresh_token(filter_by={"token": refresh_token_string})
        return make_empty_response()

    def _logout_web() -> Response:
        """
        Web logout: delete the refresh token of the session named by the
        ``token`` cookie, when it can be parsed, then clear both auth cookies
        on an empty response.
        """
        refresh_token_cls = get_current_class(BaseRefreshToken)
        empty_response = make_empty_response()

        auth_cookie = request.cookies.get("token")
        if auth_cookie:
            token_user, token_session_id, _ = refresh_token_cls.user_cls().parse_token(
                auth_cookie
            )
            if token_user and token_session_id:
                session_filter = {
                    "user_id": token_user.id,
                    "session_id": token_session_id,
                }
                _delete_refresh_token(filter_by=session_filter)

        # Cookies are cleared whether or not a session was found.
        for cookie_name in ("refresh_token", "token"):
            delete_cookie(name=cookie_name, response=empty_response)

        return empty_response

    def _delete_refresh_token(filter_by: dict) -> None:  # type: ignore[type-arg]
        """
        Delete the refresh token selected by ``filter_by``, if there is one.

        When a token is found: log the revocation, delete the row, publish the
        session-ended event, then commit. Nothing happens when no token matches.
        """
        token_cls = get_current_class(BaseRefreshToken)
        token_query = current_session.query(token_cls).filter_by(**filter_by)  # noqa: ALN085
        refresh_token = token_query.one_or_none()
        if refresh_token is None:
            return

        current_logger.info(
            f"revoking session for user {refresh_token.user.id}, session {refresh_token.session_id} ({refresh_token.token_type})",
            user_id=refresh_token.user.id,
            session_id=refresh_token.session_id,
            session_type=refresh_token.token_type,
        )

        current_session.delete(refresh_token)

        # The event is published before the commit, as in the delete above.
        publish_user_session_ended(
            user=refresh_token.user,
            session_id=refresh_token.session_id,
            session_type=refresh_token.token_type,
        )

        current_session.commit()

    # TODO: @thibaut.caillierez: deprecate this once the password reset flow is completely done in keycloak
    @auth_blueprint.route("/password_reset", methods=["POST"])
    @handle_with_api_error_handler
    @exponential_backoff()
    @obs.api_call()
    def password_reset():  # type: ignore[no-untyped-def]
        """
        Reset a password from the reset token sent by email.

        Expects ``token``, ``prehashed_password`` and ``refresh_token_type``
        (all required); ``password`` is accepted but not read here. The token
        and the pre-hashed password are handed to
        ``verify_token_and_reset_password``; the identity it returns is then
        matched to a profile by email, and the profile to a user.

        On success all tokens of the user are revoked, the optional password
        changed callback runs, a Datadog custom event is tracked, and an empty
        response is returned. No session is created.

        Raises:
            BaseErrorCode.token_error: If no identity is returned for the token.
            BaseErrorCode.missing_resource: If no profile matches the identity
                email, or no user matches the profile.
        """
        from ddtrace.appsec import track_user_sdk

        parser = reqparse.RequestParser()
        parser.add_argument(
            "token",
            type=str,
            help="Password reset token sent in email",
            required=True,
        )
        parser.add_argument(
            "password",
            type=str,
            help="Password of the user",
            required=False,
        )
        parser.add_argument(
            "prehashed_password",
            type=str,
            help="Password of the user, pre-hashed on client side",
            required=True,
        )
        parser.add_argument(
            "refresh_token_type",
            type=str,
            help="type of the refresh token: web or mobile",
            required=True,
        )
        params = parser.parse_args(strict=True)

        token = params.pop("token", None)

        profile_service = ProfileService.create()

        auth_identity = verify_token_and_reset_password(
            token, params.get("prehashed_password")
        )

        # Must be checked before auth_identity.email is read below: with the
        # check placed after the lookup, a bad token failed with an
        # AttributeError instead of the intended token error.
        if auth_identity is None:
            raise BaseErrorCode.token_error(error="bad reset token")

        profile = profile_service.get_profile_by_email(email=auth_identity.email)

        if profile is None:
            raise BaseErrorCode.missing_resource(
                f"No profile match the keycloak identity {auth_identity.id}"
            )

        user_cls: type[BaseUser] = get_current_class(BaseRefreshToken).user_cls()  # type: ignore[assignment]
        user = current_session.scalars(
            select(user_cls).where(user_cls.profile_id == profile.id)
        ).one_or_none()

        if user is None:
            raise BaseErrorCode.missing_resource(
                f"No user match the keycloak identity {auth_identity.id} and profile {profile.id}"
            )

        handle_user_password_changed(user)

        # We need to set user even if the user is not logged in for user attribution in Datadog
        set_member_user(
            user_id=str(user.id),
            email=profile.email,
        )
        track_user_sdk.track_custom_event(
            event_name=DatadogEventName.PASSWORD_CHANGE_SUCCESS.value,
            metadata={
                "usr.id": user.id,
                "usr.login": auth_identity.email,
                "usr.email": profile.email,
                "exists": True,
            },
        )

        # When password reset request, we don't create a session to force the user to login
        # So as someone who manages to steal this link, won't be able to log in (as they don't know the email)
        return make_empty_response()

    @auth_blueprint.route("/password_change", methods=["POST"])
    @handle_with_api_error_handler
    @token_auth.login_required
    @obs.api_call()
    def password_change():  # type: ignore[no-untyped-def]
        """
        Change the password of the logged-in user.

        Required arguments: ``prehashed_password`` (the new password),
        ``current_prehashed_password`` and ``refresh_token_type``. ``password``
        and ``current_password`` are accepted but not read by this handler.
        ``password_change_reason`` is optional and defaults to "weak_password".

        The current pre-hashed password is checked against the Keycloak identity
        of the user's profile; a mismatch answers 400 with
        ``{"error": "wrong current password"}``. On a match the new credentials
        are set, the session is committed, all tokens of the user are revoked
        (through ``handle_user_password_changed``), a Datadog custom event is
        tracked, and new session tokens are issued for the calling session.

        Raises:
            ValueError: If no identity is found for the user's profile.
        """
        from ddtrace.appsec import track_user_sdk

        parser = reqparse.RequestParser()
        parser.add_argument(
            "password",
            type=str,
            help="New password of the user",
            required=False,
        )
        parser.add_argument(
            "prehashed_password",
            type=str,
            help="New password of the user, pre-hashed on client side",
            required=True,
        )
        parser.add_argument(
            "current_password",
            type=str,
            help="Current password of the user",
            required=False,
        )
        parser.add_argument(
            "current_prehashed_password",
            type=str,
            help="Current password of the user, pre-hashed on client side",
            required=True,
        )
        parser.add_argument(
            "refresh_token_type",
            type=str,
            help="type of the refresh token: web or mobile",
            required=True,
        )
        # This endpoint was originally used only for weak password changes.
        # The 'default' option below should be removed after unifying endpoints.
        parser.add_argument(
            "password_change_reason",
            type=str,
            help="Reason why the password needs to be changed",
            required=False,
            default="weak_password",
            choices=["weak_password", "requested_by_user"],
        )

        params = parser.parse_args(strict=True)

        user = g.current_user

        # The current pre-hashed password is checked against the identity below;
        # a mismatch returns an error.
        current_prehashed_password = params.get("current_prehashed_password")
        new_prehashed_password = params.get("prehashed_password")

        # Kept as an explicit guard even though the argument is declared required.
        if new_prehashed_password is None:
            return make_json_response({"error": "new password is required"}, 400)

        authentication_service = AuthenticationService.create()
        identity = authentication_service.get_keycloak_identity_by_profile_id(
            profile_id=user.profile_id
        )
        if identity is None:
            raise ValueError(
                f"No identity found for user {user.id}, profile_id={user.profile_id}"
            )
        if not authentication_service.check_identity_password(
            identity_id=identity.id, prehashed_password=current_prehashed_password
        ):
            return make_json_response({"error": "wrong current password"}, 400)
        else:
            # Same email as the identity already has: only the password changes.
            authentication_service.set_identity_credentials(
                identity_id=identity.id,
                prehashed_password=new_prehashed_password,
                email=identity.email,
                is_email_verified=True,
            )

        current_session.commit()

        current_logger.info(f"Password updated: {params['password_change_reason']}")

        # Revokes every token of the user, including the one used for this call;
        # a fresh pair is issued at the end of this handler.
        handle_user_password_changed(user)

        track_user_sdk.track_custom_event(
            event_name=DatadogEventName.PASSWORD_CHANGE_SUCCESS.value,
            metadata={
                "usr.id": user.id,
                "usr.login": user.email,
                "usr.email": user.email,
                "exists": True,
            },
        )

        # Issue new tokens just for the calling session
        return user.create_session_tokens_and_make_response(
            refresh_token_type=params["refresh_token_type"]
        )

    @auth_blueprint.route("/mfa", methods=["GET"])
    @handle_with_api_error_handler
    @token_auth.login_required
    @obs.api_call()
    def get_mfa_status():  # type: ignore[no-untyped-def]
        """
        Return the MFA flags of the authenticated user's Keycloak identity.

        Aborts with 404 when `mfa` is falsy and with 500 when no identity is
        found for the user's profile. The JSON payload carries the identity's
        `mfa_enabled` and `mfa_required` values.
        """
        auth_service = AuthenticationService.create()
        if not mfa:
            abort(404)

        profile_id = g.current_user.profile_id
        identity = auth_service.get_keycloak_identity_by_profile_id(profile_id)
        if not identity:
            abort(500)

        mfa_status = {
            "enabled": identity.mfa_enabled,
            "required": identity.mfa_required,
        }
        return make_json_response(mfa_status)

    @auth_blueprint.route("/mfa", methods=["PATCH"])
    @handle_with_api_error_handler
    @token_auth.login_required
    @obs.api_call()
    def enable_disable_mfa():  # type: ignore[no-untyped-def]
        """
        Enable or disable MFA on the authenticated user's Keycloak identity.

        Aborts with 404 when `mfa` is falsy, with 500 when no identity is found
        for the user's profile, and with 403 when the caller asks to disable MFA
        while the identity has `mfa_required` set. Otherwise the new status is
        applied and committed, a Datadog custom event is tracked and an empty
        response is returned.
        """
        from ddtrace.appsec import track_user_sdk

        authentication_service = AuthenticationService.create()

        if not mfa:
            abort(404)

        parser = reqparse.RequestParser()
        # NOTE(review): `type=bool` converts with `bool(value)`, so a non-empty
        # string such as "false" would be parsed as True. Presumably clients
        # always send a JSON boolean — confirm, or consider a dedicated boolean
        # parser (e.g. flask_restful's `inputs.boolean` if this is its reqparse).
        parser.add_argument(
            "enabled",
            type=bool,
            help="MFA enabled",
            required=True,
        )

        params = parser.parse_args(strict=True)

        user = g.current_user
        identity = authentication_service.get_keycloak_identity_by_profile_id(
            user.profile_id
        )

        if not identity:
            abort(500)

        # Check if MFA is required and the user is trying to disable it
        if not params.enabled and identity.mfa_required:
            abort(403)

        authentication_service.change_mfa_status(
            identity_id=identity.id, mfa_enabled=params.enabled
        )
        # Open question: when MFA is enabled, should disabling it be a protected
        # operation (e.g. require re-authentication)?

        # Persist the status change before reporting it to Datadog
        current_session.commit()

        datadog_event_name = (
            DatadogEventName.MFA_ENABLED.value
            if params.enabled
            else DatadogEventName.MFA_DISABLED.value
        )
        track_user_sdk.track_custom_event(
            event_name=datadog_event_name,
            metadata={
                "usr.id": user.id,
                # login comes from the Keycloak identity, email from the user model
                "usr.login": identity.email,
                "usr.email": user.email,
            },
        )
        return make_empty_response()

    @auth_blueprint.route("/exchange_token", methods=["POST"])
    @handle_with_api_error_handler
    @token_auth.login_required
    @exponential_backoff()
    @deprecated(
        "This endpoint is deprecated and will be removed in a future version. Please use the /api/auth/tokens/exchange_token instead.",
        category=AlanDeprecationWarning,
    )
    @obs.api_call()
    def exchange_token():  # type: ignore[no-untyped-def]
        """
        Exchange the caller's backend token for Keycloak tokens (deprecated).

        The exchange is delegated to `exchange_token_for_user`, which uses the
        service account to authenticate with Keycloak, with the current user's
        email as the subject and the requested `target_client_id` as the target.
        """
        # (name, help text, required) for each accepted string argument
        argument_specs = (
            ("target_client_id", "Target client ID for the token exchange", True),
            # Optional and not really used: accepted only to stay compliant
            # with our frontend auth API
            ("refresh_token_type", "type of the refresh token: web or mobile", False),
        )
        arg_parser = reqparse.RequestParser()
        for arg_name, help_text, is_required in argument_specs:
            arg_parser.add_argument(
                arg_name, type=str, help=help_text, required=is_required
            )
        args = arg_parser.parse_args(strict=True)

        current_user = g.current_user

        # Use the IdentityProvider to exchange token for the user
        tokens = exchange_token_for_user(
            target_client_id=args.get("target_client_id"),
            email=current_user.email,
        )
        return make_json_response(tokens)

    @auth_blueprint.route("/refresh_token_exchange", methods=["POST"])
    @handle_with_api_error_handler
    @token_auth.login_required
    @exponential_backoff()
    @deprecated(
        "This endpoint is deprecated and will be removed in a future version. Please use the /api/auth/tokens/refresh_token_exchange instead.",
        category=AlanDeprecationWarning,
    )
    @obs.api_call()
    def refresh_token_exchange():  # type: ignore[no-untyped-def]
        """
        Refresh tokens previously obtained through a token exchange (deprecated).

        The refresh is delegated to `refresh_exchanged_token`, which uses the
        service account, with the `target_client_id` and `refresh_token` sent
        by the caller.
        """
        # (name, help text, required) for each accepted string argument
        argument_specs = (
            ("target_client_id", "Target client ID for the token exchange", True),
            ("refresh_token", "Refresh token obtained from token exchange", True),
            # Optional and not really used: accepted only to stay compliant
            # with our frontend auth API
            ("refresh_token_type", "type of the refresh token: web or mobile", False),
        )
        arg_parser = reqparse.RequestParser()
        for arg_name, help_text, is_required in argument_specs:
            arg_parser.add_argument(
                arg_name, type=str, help=help_text, required=is_required
            )
        args = arg_parser.parse_args(strict=True)

        # Use the IdentityProvider to refresh the token
        refreshed_tokens = refresh_exchanged_token(
            target_client_id=args.get("target_client_id"),
            refresh_token=args.get("refresh_token"),
        )
        return make_json_response(refreshed_tokens)

    return auth_blueprint

blueprint_with_password_reset_request

create_auth_blueprint_with_password_reset_request
create_auth_blueprint_with_password_reset_request(
    token_auth, user_password_changed=None, enable_mfa=False
)

Create an auth blueprint from AuthBlueprint with a password reset request endpoint.

Source code in components/authentication/public/blueprints/blueprint_with_password_reset_request.py
def create_auth_blueprint_with_password_reset_request(  # type: ignore[no-untyped-def]
    token_auth: TokenAuth,
    user_password_changed: Callable | None = None,  # type: ignore[type-arg]
    enable_mfa: bool = False,
):
    """
    Create an auth blueprint from AuthBlueprint with a password reset request endpoint.

    All three arguments are forwarded unchanged to `create_auth_blueprint`; the
    returned blueprint additionally serves `POST /password_reset_request`.
    """
    blueprint = create_auth_blueprint(
        token_auth=token_auth,
        user_password_changed=user_password_changed,
        enable_mfa=enable_mfa,
    )

    # Register the password_reset_request route on the blueprint.
    # The user class needs to have a lang field (see LangMixin).
    @blueprint.route("/password_reset_request", methods=["POST"])
    @handle_with_api_error_handler
    @limit_by_param(
        param_name="email",
        request_limit=5,
        time_window=3600,  # 1 hour
        error_message="TOO_MANY_PASSWORD_RESET_REQUESTS_FOR_THIS_EMAIL",
    )
    @exponential_backoff()
    @obs.api_call()
    def password_reset_request():  # type: ignore[no-untyped-def]
        """
        Trigger a password reset email for the requested address.

        Uses the global (Keycloak) flow when the feature flag is enabled for the
        email and a `clientId` is supplied; otherwise falls back to the legacy
        token-based flow, which raises `password_reset_bad_email` when no user
        matches.
        """
        arg_parser = reqparse.RequestParser()
        arg_parser.add_argument(
            "email", type=str, help="Email of the user", required=True
        )
        arg_parser.add_argument("clientId", type=str, help="Keycloak Client ID")
        arg_parser.add_argument(
            "refresh_token_type",
            type=str,
            help="type of the refresh token: web or mobile",
            choices=["web", "mobile"],
        )
        args = arg_parser.parse_args(strict=True)
        email = args["email"]

        if is_global_password_reset_enabled(email) and args["clientId"]:
            # Global password reset is behind a feature flag to control its rollout
            from shared.iam.keycloak import build_login_url

            client_id = args["clientId"]
            redirect_uri = build_login_url(client_id=client_id, email=email)
            user_cls = get_current_class(BaseRefreshToken).user_cls()
            user_cls.generate_global_password_reset_email(
                email, client_id=client_id, redirect_uri=redirect_uri
            )
        else:
            current_logger.warning(
                "clientId not set in request parameters or global password reset flag (killswitch-global-backend-global-password-reset) is disabled, fallback to old password reset flow"
            )

            user_cls = get_current_class(BaseRefreshToken).user_cls()
            user, token = user_cls.generate_password_reset_token(email)
            if user is None:
                raise BaseErrorCode.password_reset_bad_email()

            reset_url = current_app.front_end_url.build_url(  # type: ignore[attr-defined]
                "PASSWORD_RESET_BASE_URL", query_args={"token": token}
            )

            send_password_reset_email(  # type: ignore[call-arg]
                email=mandatory(user.email),
                reset_url=reset_url,
                lang=user.lang,
                email_queue_name=current_config["AUTH_MAIL_QUEUE"],
            )

        return make_json_response({"result": "password reset email sent"}, 200)

    return blueprint
is_global_password_reset_enabled
is_global_password_reset_enabled(email)

Check if global password reset is enabled for the given email.

Source code in components/authentication/public/blueprints/blueprint_with_password_reset_request.py
def is_global_password_reset_enabled(email: str) -> bool:
    """
    Tell whether the global password reset flow is enabled for the given email.

    The `killswitch-global-backend-global-password-reset` flag is evaluated with
    a "user" context keyed by the email, and defaults to False.
    """
    # DO NOT REPLICATE THIS CONTEXT, it's specific to this use-case, take a look at the [context documentation](https://github.com/alan-eu/alan-apps/tree/main/backend/shared/feature_flags) for more details
    return bool_feature_flag(
        feature_flag_key="killswitch-global-backend-global-password-reset",
        context_data=UserContextData(key=email, kind="user"),
        default_value=False,
    )

conftest

authentication_service
authentication_service()

Standard authentication service

Source code in components/authentication/public/blueprints/conftest.py
@pytest.fixture
def authentication_service() -> AuthenticationService:
    """
    Standard authentication service

    Created through `AuthenticationService.create` for the
    `AppName.SHARED_TESTING` app; function-scoped (pytest's default).
    """
    return AuthenticationService.create(app_name=AppName.SHARED_TESTING)
test_client
test_client(flask_app)
Source code in components/authentication/public/blueprints/conftest.py
@pytest.fixture(scope="module")
def test_client(flask_app: "CustomFlask"):
    """
    Test client obtained from the `flask_app` fixture, shared per test module.
    """
    return flask_app.test_client()

entities

AuthIdentity dataclass

AuthIdentity(
    id,
    first_name,
    last_name,
    email,
    language,
    email_verified,
    mfa_enabled,
    mfa_required=False,
)

Dataclass to isolate the component internal identity and its logic from the public one.

__eq__
__eq__(other)
Source code in components/authentication/public/entities.py
def __eq__(self, other):  # type: ignore[no-untyped-def]  # noqa: D105
    # Equal only when `other` is an instance of this object's type and both
    # `__key()` values compare equal; kept consistent with `__hash__`.
    return isinstance(other, type(self)) and self.__key() == other.__key()  # type: ignore[no-untyped-call]
__hash__
__hash__()
Source code in components/authentication/public/entities.py
def __hash__(self):  # type: ignore[no-untyped-def]  # noqa: D105
    # Hash the same `__key()` value that `__eq__` compares, so equal
    # identities hash alike.
    return hash(self.__key())  # type: ignore[no-untyped-call]
email instance-attribute
email
email_verified instance-attribute
email_verified
first_name instance-attribute
first_name
from_domain classmethod
from_domain(domain_identity)

Mapper from the domain identity to the public one.

Source code in components/authentication/public/entities.py
@classmethod
def from_domain(cls, domain_identity: DomainAuthIdentity) -> Self:
    """
    Build the public identity from the component's domain identity.

    Every public field is copied one-to-one from the attribute of the same
    name on `domain_identity`.
    """
    source = domain_identity
    return cls(
        id=source.id,
        first_name=source.first_name,
        last_name=source.last_name,
        email=source.email,
        language=source.language,
        email_verified=source.email_verified,
        mfa_enabled=source.mfa_enabled,
        mfa_required=source.mfa_required,
    )
id instance-attribute
id
language instance-attribute
language
last_name instance-attribute
last_name
mfa_enabled instance-attribute
mfa_enabled
mfa_required class-attribute instance-attribute
mfa_required = False

events

component_events

IdentityCreatedEvent dataclass
IdentityCreatedEvent(
    identity_id,
    email,
    first_name,
    last_name,
    language,
    profile_id,
)

Bases: Message

Identity successfully created in Keycloak.

Emitted by: AuthenticationService.create_identity()

email instance-attribute
email
first_name instance-attribute
first_name
identity_id instance-attribute
identity_id
language instance-attribute
language
last_name instance-attribute
last_name
profile_id instance-attribute
profile_id
IdentityEmailChangedEvent dataclass
IdentityEmailChangedEvent(
    identity_id, old_email, new_email
)

Bases: Message

Identity email address successfully changed.

Emitted by: AuthenticationService.change_identity_email() when email change is approved

identity_id instance-attribute
identity_id
new_email instance-attribute
new_email
old_email instance-attribute
old_email
IdentityEmailCleared dataclass
IdentityEmailCleared(identity_id, invalidated_email)

Bases: Message

Identity email replaced with invalidated placeholder.

Emitted by: AuthenticationService.clear_identity_email()

identity_id instance-attribute
identity_id
invalidated_email instance-attribute
invalidated_email
IdentityMergedEvent dataclass
IdentityMergedEvent(from_identity_id, to_identity_id)

Bases: Message

Two identities merged due to email conflict.

Emitted by: AuthenticationService.change_identity_email() when target email already belongs to another identity, triggering automatic merge

from_identity_id instance-attribute
from_identity_id
to_identity_id instance-attribute
to_identity_id
PasswordResetEmailSentEvent dataclass
PasswordResetEmailSentEvent(
    identity_id, email, client_id, redirect_uri
)

Bases: Message

Password reset email sent to identity.

Emitted by: AuthenticationService.send_password_reset_email()

client_id instance-attribute
client_id
email instance-attribute
email
identity_id instance-attribute
identity_id
redirect_uri instance-attribute
redirect_uri

subscription

subscribe_to_events
subscribe_to_events()

All event subscriptions for the authentication component are here

Source code in components/authentication/public/events/subscription.py
def subscribe_to_events() -> None:
    """
    Register every event subscription of the authentication component.

    Both subscriptions are asynchronous and run on `LOW_PRIORITY_QUEUE`.
    """
    from components.authentication.internal.application.subscribers import (
        update_keycloak_identity_information,
        update_keycloak_identity_language,
    )
    from components.global_profile.public.events import (
        IdentityInformationChanged,
        PreferredLanguageChanged,
    )
    from shared.messaging.broker import get_message_broker

    broker = get_message_broker()

    # Global Profile first/last name changes are propagated to Keycloak
    broker.subscribe_async(
        IdentityInformationChanged,
        update_keycloak_identity_information,
        queue_name=LOW_PRIORITY_QUEUE,
    )

    # Global Profile preferred language changes are propagated to Keycloak
    broker.subscribe_async(
        PreferredLanguageChanged,
        update_keycloak_identity_language,
        queue_name=LOW_PRIORITY_QUEUE,
    )

feature_flags

use_authentication_table_for_keycloak_id

use_authentication_table_for_keycloak_id()

Determine whether to use the authentication table or the user model to get a person's keycloak_id.

Source code in components/authentication/public/feature_flags.py
def use_authentication_table_for_keycloak_id() -> bool:
    """
    Tell whether a person's keycloak_id should be read from the authentication table.

    When the flag is off (its default), the user model is used instead.
    """
    flag_key = "killswitch-backend-global-use-authentication-table-for-keycloak-id"
    return bool_feature_flag(feature_flag_key=flag_key, default_value=False)

identity_provider

AuthIdentity

__eq__
__eq__(other)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def __eq__(self, other):  # type: ignore[no-untyped-def]
    # Equal only when `other` is an instance of this object's type and both
    # `__key()` values compare equal; kept consistent with `__hash__`.
    return isinstance(other, type(self)) and self.__key() == other.__key()  # type: ignore[no-untyped-call]
__hash__
__hash__()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def __hash__(self):  # type: ignore[no-untyped-def]
    # Hash the same `__key()` value that `__eq__` compares, so equal
    # identities hash alike.
    return hash(self.__key())  # type: ignore[no-untyped-call]
check_password
check_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def check_password(self, prehashed_password: str) -> bool:
    """Check a client-side prehashed password; not implemented on the base class."""
    raise NotImplementedError
clear_password
clear_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def clear_password(self) -> None:
    """Clear the identity's password; not implemented on the base class."""
    raise NotImplementedError
delete
delete()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def delete(self) -> None:
    """Delete the identity; not implemented on the base class."""
    raise NotImplementedError
email property
email
email_verified property
email_verified
first_name property
first_name
has_password
has_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def has_password(self) -> bool:
    """Tell whether the identity has a password; not implemented on the base class."""
    raise NotImplementedError
id property
id
language property
language
last_name property
last_name
logout_all_sessions
logout_all_sessions()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def logout_all_sessions(self) -> None:
    """Log the identity out of all its sessions; not implemented on the base class."""
    raise NotImplementedError
mfa_enabled property
mfa_enabled
mfa_required property
mfa_required
set_email
set_email(email, is_email_verified)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_email(  # type: ignore[no-untyped-def]
    self, email: str | None, is_email_verified: bool
):  # TODO: Remove Optional and don't allow None values
    """Set the email and its verified flag; not implemented on the base class."""
    raise NotImplementedError
set_first_and_last_names
set_first_and_last_names(
    first_name, last_name, refresh_keycloak_user=True
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_and_last_names(  # type: ignore[no-untyped-def]
    self,
    first_name: str | None,
    last_name: str | None,
    refresh_keycloak_user: bool = True,
):
    """Set first and last names in one call; not implemented on the base class."""
    raise NotImplementedError
set_first_name
set_first_name(first_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_name(self, first_name: str | None):  # type: ignore[no-untyped-def]
    """Set the first name; not implemented on the base class."""
    raise NotImplementedError
set_language
set_language(language)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_language(self, language: Lang):  # type: ignore[no-untyped-def]
    """Set the identity's language; not implemented on the base class."""
    raise NotImplementedError
set_last_name
set_last_name(last_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_last_name(self, last_name: str | None):  # type: ignore[no-untyped-def]
    """Set the last name; not implemented on the base class."""
    raise NotImplementedError
set_mfa_enabled
set_mfa_enabled(enabled)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_enabled(self, enabled: bool) -> None:
    """Turn MFA on or off for the identity; not implemented on the base class."""
    raise NotImplementedError
set_mfa_required
set_mfa_required(required)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_required(self, required: bool) -> None:
    """Mark MFA as required or not for the identity; not implemented on the base class."""
    raise NotImplementedError
set_password
set_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_password(self, prehashed_password: str):  # type: ignore[no-untyped-def]
    """Set the password from its client-side prehash; not implemented on the base class."""
    raise NotImplementedError
update_pending_deletion
update_pending_deletion(pending_deletion)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def update_pending_deletion(self, pending_deletion: bool) -> None:
    """Update the pending-deletion flag; not implemented on the base class."""
    raise NotImplementedError

DevIdentity

DevIdentity(
    id,
    email,
    language,
    first_name,
    last_name,
    email_verified=True,
    mfa_enabled=False,
    mfa_required=False,
)

Bases: AuthIdentity

Source code in components/authentication/internal/infrastructure/identity_provider.py
def __init__(
    self,
    id: UUID,
    email: str,
    language: Lang,
    first_name: str | None,
    last_name: str | None,
    email_verified: bool = True,
    mfa_enabled: bool = False,
    mfa_required: bool = False,
) -> None:
    """
    Store the identity attributes in memory.

    By default the email is verified and MFA is neither enabled nor required.
    """
    self._id = id
    self._email: str = email
    self._email_verified = email_verified
    self._first_name: str | None = first_name
    self._last_name: str | None = last_name
    self._mfa_enabled = mfa_enabled
    self._mfa_required = mfa_required
    self._language = language
    # A new dev identity is never pending deletion; see update_pending_deletion
    self._pending_deletion = False
check_password
check_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def check_password(self, prehashed_password: str) -> bool:
    """
    Accept only the prehash of the fixed dev password "azerty".
    """
    expected_prehash = PasswordMixin.prehash_password("azerty")
    return prehashed_password == expected_prehash
clear_password
clear_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def clear_password(self) -> None:
    """Dev stub: only calls `warn_because_not_available`; no state is changed."""
    warn_because_not_available("DevIdentity.clear_password")
delete
delete()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def delete(self) -> None:
    """Dev stub: only calls `warn_because_not_available`; no state is changed."""
    warn_because_not_available("DevIdentity.delete")
email property
email
email_verified property
email_verified
first_name property
first_name
has_password
has_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def has_password(self) -> bool:
    """Dev identities always report having a password."""
    return True
id property
id
language property
language
last_name property
last_name
logout_all_sessions
logout_all_sessions()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def logout_all_sessions(self) -> None:
    """Intentional no-op for the dev identity."""
    pass
mfa_enabled property
mfa_enabled
mfa_required property
mfa_required
set_email
set_email(email, is_email_verified=True)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_email(self, email: str | None, is_email_verified: bool = True) -> None:
    if email:
        self._email = email
        self._email_verified = is_email_verified
set_first_and_last_names
set_first_and_last_names(
    first_name, last_name, refresh_keycloak_user=True
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_and_last_names(
    self,
    first_name: str | None,
    last_name: str | None,
    refresh_keycloak_user: bool = True,  # noqa: ARG002
) -> None:
    self._first_name = first_name or ""
    self._last_name = last_name or ""
set_first_name
set_first_name(first_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_name(self, first_name: str | None) -> None:
    if first_name:
        self._first_name = first_name
set_language
set_language(language)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_language(self, language: Lang) -> None:
    """Store the language in memory, after calling `warn_because_not_available`."""
    warn_because_not_available("DevIdentity.set_language")
    self._language = language
set_last_name
set_last_name(last_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_last_name(self, last_name: str | None) -> None:
    if last_name:
        self._last_name = last_name
set_mfa_enabled
set_mfa_enabled(enabled)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_enabled(self, enabled: bool) -> None:
    """Store the MFA-enabled flag in memory, after calling `warn_because_not_available`."""
    warn_because_not_available("DevIdentity.set_mfa_enabled")
    self._mfa_enabled = enabled
set_mfa_required
set_mfa_required(required)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_required(self, required: bool) -> None:
    """Store the MFA-required flag in memory, after calling `warn_because_not_available`."""
    warn_because_not_available("DevIdentity.set_mfa_required")
    self._mfa_required = required
set_password
set_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_password(self, prehashed_password: str) -> None:  # noqa: ARG002
    """Dev stub: only calls `warn_because_not_available`; the given password is not stored."""
    warn_because_not_available("DevIdentity.set_password")
update_pending_deletion
update_pending_deletion(pending_deletion)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def update_pending_deletion(self, pending_deletion: bool) -> None:
    """Store the pending-deletion flag in memory."""
    self._pending_deletion = pending_deletion

DevIdentityProvider

DevIdentityProvider()

Bases: IdentityProvider

A stubbed identity provider for the dev environment.

Cases to test:

- Standard login with email: http://localhost:4001/login
- Company creation: http://localhost:4001/fr-company-discovery/share?contractCoverOption=coverChildren&ccnCode=1486&participation=50&healthProduct=green&choosePrevoyance=true&hasLegacyHealthContract=true&hasLegacyPrevoyanceContract=true
- Fixture: http://localhost:8001/admin_tools/test_data_generator/new?fixture=LSBjb21wYW55Og%3D%3D
- User auth login: http://localhost:8002/auth/login?next=%2Foauth2%2Fauthorize%3Fresponse_type%3Dcode%26client_id%3Dmind_dev%26redirect_uri%3Djourapp%3A%252F%252Fauthcallback%252Falan%26scope%3Dopenid%2520email%26state%3D9dc9c12d-0d3b-44f1-af8f-c3b8e829b1eb
- Freelancer signup: http://localhost:4001/freelancer-signup

Source code in components/authentication/internal/infrastructure/identity_provider.py
def __init__(self) -> None:
    """Start with an empty in-memory cache of dev identities, keyed by identity id."""
    self._cache: dict[UUID, DevIdentity] = {}
    super().__init__()
create_new_identity
create_new_identity(
    email,
    language,
    first_name,
    last_name,
    is_email_verified=True,
    mfa_enabled=False,
    mfa_required=False,
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def create_new_identity(
    self,
    email: str,
    language: Lang,
    first_name: str | None,
    last_name: str | None,
    is_email_verified: bool = True,
    mfa_enabled: bool = False,
    mfa_required: bool = False,
) -> DevIdentity:
    """
    Create a dev identity with a fresh random UUID and remember it in the cache.

    The identity only lives in this provider's in-memory cache.
    """
    new_identity = DevIdentity(
        id=uuid4(),
        email=email,
        language=language,
        first_name=first_name,
        last_name=last_name,
        email_verified=is_email_verified,
        mfa_enabled=mfa_enabled,
        mfa_required=mfa_required,
    )
    # Keyed by id so get_identity can serve it without another lookup
    self._cache[new_identity.id] = new_identity
    return new_identity
exchange_token_for_user
exchange_token_for_user(target_client_id, email)

This provides a fake exchanged token for the scenario where the backend issues Keycloak tokens on behalf of the user.

Attributes:

Name Type Description
target_client_id str

The target client ID to exchange token with

email str

Email of the user to exchange token for

Returns:

Type Description
dict

Dictionary containing access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def exchange_token_for_user(
    self,
    target_client_id: str,  # noqa: ARG002
    email: str,
) -> dict:  # type: ignore[type-arg]
    """
    Return fake exchanged tokens, for the scenario where the backend issues Keycloak tokens on behalf of the user.

    Attributes:
        target_client_id (str): The target client ID to exchange token with (ignored by the dev provider)
        email (str): Email of the user to exchange token for

    Returns:
        Dictionary containing access and refresh tokens
    """
    dev_prehashed_password = self._get_dev_prehashed_password()
    return self._generate_fake_tokens(email, dev_prehashed_password)
find_identity
find_identity(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity(self, email: str | None) -> DevIdentity | None:
    """
    Find the dev identity with the given email, in storage and in the cache.

    The stored identity (if any) is written into the cache first, then every
    cached identity with that email is collected. Returns None for a None
    email or when nothing matches, and raises `MultipleResultsFound` when
    more than one identity id matches.
    """
    if email is None:
        return None

    stored_identity = _find_dev_identity(email=email)
    if stored_identity is not None:
        self._cache[stored_identity.id] = stored_identity

    matching_ids = {stored_identity.id} if stored_identity else set()
    matching_ids.update(
        cached.id for cached in self._cache.values() if cached.email == email
    )

    if len(matching_ids) > 1:
        raise MultipleResultsFound(
            "Found multiple user matching authentication id or email"
        )
    if len(matching_ids) == 1:
        return self._cache[matching_ids.pop()]
    return None
find_identity_id_from_token
find_identity_id_from_token(token)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity_id_from_token(self, token: str) -> UUID | None:
    """
    Return the id of the dev identity whose email is in the token's claims.

    The token signature is NOT verified. Returns None when no identity
    matches the "email" claim.
    """
    # Sent by a local mocked front end
    # frontend/shared/service-worker-mocks/handlers/keycloak-handlers.ts
    claims = jwt.decode(
        token,
        options={"verify_signature": False},
        algorithms=[],
    )

    matching_identity = self.find_identity(email=claims["email"])
    if matching_identity:
        return matching_identity.id
    return None
generate_password_reset_email
generate_password_reset_email(
    email, client_id, redirect_uri
)

Generate a password reset email in our identity provider

Attributes:

Name Type Description
email str

The email address of the user to trigger a password reset

client_id str

Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")

redirect_uri str | None

The URI to redirect to after performing password reset

Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_password_reset_email(
    self,
    email: str,  # noqa: ARG002
    client_id: str,  # noqa: ARG002
    redirect_uri: str | None,  # noqa: ARG002
) -> None:
    """
    Generate a password reset email in our identity provider

    The dev provider ignores all arguments and only calls
    `warn_because_not_available`.

    Attributes:
        email (str): The email address of the user to trigger a password reset
        client_id (str): Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")
        redirect_uri (str | None): The URI to redirect to after performing password reset
    """
    warn_because_not_available("DevIdentityProvider.generate_password_reset_email")
    return
generate_verification_email
generate_verification_email(
    identity_id, client_id, redirect_uri
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_verification_email(
    self,
    identity_id: UUID,  # noqa: ARG002
    client_id: str,  # noqa: ARG002
    redirect_uri: str | None,  # noqa: ARG002
) -> None:
    """
    Dev stub for the verification email.

    All arguments are ignored; the method only calls
    `warn_because_not_available`.
    """
    warn_because_not_available("DevIdentityProvider.generate_verification_email")
    return
get_identity
get_identity(authentication_id)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity(self, authentication_id: UUID | None) -> DevIdentity | None:
    """
    Return the dev identity matching `authentication_id`, or None.

    The instance cache (`self._cache`) is consulted first; on a miss the
    identity is looked up with `_find_dev_identity` and, when found, stored in
    the cache before being returned.
    """
    if authentication_id is None:
        return None

    cache = self._cache
    if authentication_id in cache:
        return cache[authentication_id]

    identity = _find_dev_identity(id=authentication_id)
    if not identity:
        return None

    cache[authentication_id] = identity
    return identity
get_identity_id
get_identity_id(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity_id(self, email: str) -> UUID | None:
    identity = self.find_identity(email)
    return identity.id if identity else None
healthcheck
healthcheck()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def healthcheck(self) -> bool:
    """Always report a healthy state; no check is performed."""
    return True
refresh_exchanged_token
refresh_exchanged_token(target_client_id, refresh_token)

This provides fake exchanged refresh tokens for the scenario where the backend issues Keycloak tokens on behalf of the user.

Attributes:

Name Type Description
target_client_id str

The target client ID to refresh token with

refresh_token str

The refresh token to use

Returns:

Type Description
dict

Dictionary containing refreshed access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def refresh_exchanged_token(
    self,
    target_client_id: str,  # noqa: ARG002
    refresh_token: str,
) -> dict:  # type: ignore[type-arg]
    """
    Provide fake refreshed tokens for the scenario where the backend issues
    Keycloak tokens on behalf of the user.

    Attributes:
        target_client_id (str): The target client ID to refresh token with (unused here)
        refresh_token (str): The refresh token to use

    Returns:
        Dictionary containing refreshed access and refresh tokens
    """
    # The refresh token is known to be a fake one, so its claims are read
    # without verifying the signature.
    claims = jwt.decode(
        refresh_token,
        options={"verify_signature": False},
        algorithms=[],
    )
    email = claims["email"]
    return self._generate_fake_tokens(email, self._get_dev_prehashed_password())

IdentityProvider

create_new_identity
create_new_identity(
    email,
    language,
    first_name,
    last_name,
    is_email_verified=True,
    mfa_enabled=False,
    mfa_required=False,
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def create_new_identity(
    self,
    email: str,
    language: Lang,
    first_name: str | None,
    last_name: str | None,
    is_email_verified: bool = True,
    mfa_enabled: bool = False,
    mfa_required: bool = False,
) -> AuthIdentity:
    """
    Create a new identity in the identity provider.

    This base implementation always raises NotImplementedError; concrete
    providers are expected to override it.
    """
    raise NotImplementedError
exchange_token_for_user
exchange_token_for_user(target_client_id, email)

Exchange token for a given user using service account

Attributes:

Name Type Description
target_client_id str

The target client ID to exchange token with

email str

Email of the user to exchange token for

Returns:

Type Description
dict

Dictionary containing access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def exchange_token_for_user(self, target_client_id: str, email: str) -> dict:  # type: ignore[type-arg]
    """
    Exchange a token for the given user through the service account.

    This base implementation always raises NotImplementedError; concrete
    providers are expected to override it.

    Attributes:
        target_client_id (str): The target client ID to exchange token with
        email (str): Email of the user to exchange token for

    Returns:
        Dictionary containing access and refresh tokens
    """
    raise NotImplementedError
find_identity
find_identity(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity(self, email: str | None) -> AuthIdentity | None:
    """
    Find the identity registered under `email`.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError
find_identity_id_from_token
find_identity_id_from_token(token)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity_id_from_token(self, token: str) -> UUID | None:
    raise NotImplementedError
generate_password_reset_email
generate_password_reset_email(
    email, client_id, redirect_uri
)

Generate a password reset email in our identity provider

Attributes:

Name Type Description
email str

The email address of the user to trigger a password reset

client_id str

Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")

redirect_uri str | None

The URI to redirect to after performing password reset

Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_password_reset_email(
    self, email: str, client_id: str, redirect_uri: str | None
) -> UUID | None:
    """
    Generate a password reset email in our identity provider

    Attributes:
        email (str): The email address of the user to trigger a password reset
        client_id (str): Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")
        redirect_uri (str | None): The URI to redirect to after performing password reset
    """
    raise NotImplementedError
generate_verification_email
generate_verification_email(
    identity_id, client_id, redirect_uri
)

Generate a verification email in our identity provider

Attributes:

Name Type Description
identity_id UUID

The ID of the identity to trigger a verification for

client_id str

Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")

redirect_uri str | None

The URI to redirect to after performing verification

Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_verification_email(
    self, identity_id: UUID, client_id: str, redirect_uri: str | None
) -> None:
    """
    Generate a verification email in our identity provider

    Attributes:
        email (str): The email address of the user to trigger a verification
        client_id (str): Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")
        redirect_uri (str | None): The URI to redirect to after performing verification
    """
    raise NotImplementedError
get_identity
get_identity(authentication_id)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity(self, authentication_id: UUID | None) -> AuthIdentity | None:
    """
    Fetch the identity whose ID is `authentication_id`.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError
get_identity_id
get_identity_id(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity_id(self, email: str) -> UUID | None:
    raise NotImplementedError
healthcheck
healthcheck()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def healthcheck(self) -> bool:
    """
    Report whether the identity provider is healthy.

    This base implementation always raises NotImplementedError.
    """
    raise NotImplementedError
refresh_exchanged_token
refresh_exchanged_token(target_client_id, refresh_token)

Refresh an exchanged token using service account

Attributes:

Name Type Description
target_client_id str

The target client ID to refresh token with

refresh_token str

The refresh token to use

Returns:

Type Description
dict

Dictionary containing refreshed access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def refresh_exchanged_token(
    self, target_client_id: str, refresh_token: str
) -> dict:  # type: ignore[type-arg]
    """
    Refresh a previously exchanged token through the service account.

    This base implementation always raises NotImplementedError; concrete
    providers are expected to override it.

    Attributes:
        target_client_id (str): The target client ID to refresh token with
        refresh_token (str): The refresh token to use

    Returns:
        Dictionary containing refreshed access and refresh tokens
    """
    raise NotImplementedError

IdentityProviderType

Bases: Enum

keycloak class-attribute instance-attribute
keycloak = 1
stubbed class-attribute instance-attribute
stubbed = 0

KeycloakIdentity

KeycloakIdentity(id, keycloak_user, admin_client)

Bases: AuthIdentity

Source code in components/authentication/internal/infrastructure/identity_provider.py
def __init__(  # type: ignore[no-untyped-def]
    self,
    id: str,
    keycloak_user,
    admin_client: KeycloakAdmin,
) -> None:
    """
    Wrap a Keycloak user together with the admin client used to manage it.

    Also builds an OAuth2 session pointing at the realm's OpenID Connect token
    endpoint, using the KEYCLOAK_HOST, KEYCLOAK_REALM and KEYCLOAK_CLIENT_ID
    configuration values.
    """
    self._id = id
    self._keycloak_user = keycloak_user
    self.admin_client = admin_client

    host = current_config.get("KEYCLOAK_HOST")
    realm = current_config.get("KEYCLOAK_REALM")
    oauth_client_id = current_config.get("KEYCLOAK_CLIENT_ID")
    token_endpoint = f"{host}/realms/{realm}/protocol/openid-connect/token"
    self.client = OAuth2Session(
        client_id=oauth_client_id,
        token_endpoint=token_endpoint,
    )
admin_client instance-attribute
admin_client = admin_client
check_password
check_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def check_password(self, prehashed_password: str) -> bool:
    """
    Check `prehashed_password` against this identity.

    Returns the outcome of `_check_password`. When the check fails and the
    identity has no password at all, `BaseErrorCode.prehashed_password_not_set`
    is raised instead of returning False.
    """
    result = self._check_password(prehashed_password)

    # Either the password matched, or a password exists and simply did not match.
    if result or self.has_password():  # type: ignore[no-untyped-call]
        return result

    current_logger.info(
        f"Prehashed password is not set for keycloak identity {self.id}"
    )
    raise BaseErrorCode.prehashed_password_not_set(str(self.id))
clear_password
clear_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def clear_password(self) -> None:
    """Delete every credential the admin client returns for this user, then log it."""
    user_id = str(self.id)

    for credential in self.admin_client.get_credentials(user_id):
        self.admin_client.delete_credential(
            user_id=user_id,
            credential_id=credential["id"],
        )

    current_logger.info(f"Password of user {self.id} cleared in Keycloak")
client instance-attribute
client = OAuth2Session(
    client_id=get("KEYCLOAK_CLIENT_ID"),
    token_endpoint=f"{keycloak_host}/realms/{keycloak_realm}/protocol/openid-connect/token",
)
delete
delete()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def delete(self) -> None:
    """
    Flag this identity as pending deletion and defer the actual removal.

    Two callbacks are registered on the current session before the flag is
    set: one calling `_delete_user`, one resetting the pending-deletion flag
    to False.
    NOTE(review): presumably `_on_commit` / `_on_rollback` run their callback
    when the surrounding transaction commits / rolls back — confirm against
    those helpers.
    """
    # Commit or rollback changes depending on the current transaction result.
    _on_commit(current_session, lambda: self._delete_user())
    _on_rollback(current_session, lambda: self.update_pending_deletion(False))

    # Mark the user right away; the hooks above finalize or revert this later.
    self.update_pending_deletion(True)
    current_logger.info(
        f"User identity {self.id} will be deleted in Keycloak after commit"
    )
email property
email
email_verified property
email_verified
first_name property
first_name
has_password
has_password()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def has_password(self):  # type: ignore[no-untyped-def]
    """Return True when one of the user's credential metadata entries has type "password"."""
    credentials_metadata = _get_user_credential_metadata_cached(
        self.admin_client, str(self.id)
    )
    for credential in credentials_metadata:
        if credential["type"] == "password":
            return True
    return False
id property
id
language property
language
last_name property
last_name
logout_all_sessions
logout_all_sessions()
Source code in components/authentication/internal/infrastructure/identity_provider.py
def logout_all_sessions(self) -> None:
    """Log the user out through the admin client and log how many sessions existed beforehand."""
    # Count the sessions before logging out, for the log message below.
    active_sessions = self.admin_client.get_sessions(str(self.id))
    sessions_count = len(active_sessions)

    self.admin_client.user_logout(str(self.id))

    current_logger.info(
        f"User logout for user {self.id} - cleared {sessions_count} sessions"
    )
mfa_enabled property
mfa_enabled
mfa_required property
mfa_required
set_email
set_email(email, is_email_verified)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_email(self, email: str | None, is_email_verified: bool):  # type: ignore[no-untyped-def]
    if email is None:
        raise ValueError("Email cannot be None in Keycloak")

    self._update_user(
        payload={
            "email": email,
            "username": email,
            "emailVerified": is_email_verified,
        },
    )

    current_logger.info(f"Email of user {self.id} updated in Keycloak")
set_first_and_last_names
set_first_and_last_names(
    first_name, last_name, refresh_keycloak_user=True
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_and_last_names(
    self,
    first_name: str | None,
    last_name: str | None,
    refresh_keycloak_user: bool = True,
) -> None:
    """
    Update both names of the user in a single `_update_user` call.

    Falsy names (None or empty) are sent as empty strings.
    `refresh_keycloak_user` is forwarded unchanged to `_update_user`.
    """
    payload = {
        "firstName": first_name or "",
        "lastName": last_name or "",
    }
    self._update_user(
        payload=payload,
        refresh_keycloak_user=refresh_keycloak_user,
    )

    current_logger.info(
        f"First name and last name updated in Keycloak for user {self.id}"
    )
set_first_name
set_first_name(first_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_first_name(self, first_name: str | None) -> None:
    """Update the user's first name; a falsy value is sent as an empty string."""
    payload = {"firstName": first_name or ""}
    self._update_user(payload=payload)

    current_logger.info(f"First name of user {self.id} updated in Keycloak")
set_language
set_language(language)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_language(self, language: Lang) -> None:
    """Store `language` in the user's "locale" attribute and log the change."""
    self._update_user_attribute(
        name="locale",
        value=language,
    )

    current_logger.info(
        f"Language of user {self.id} updated to {language} in Keycloak"
    )
set_last_name
set_last_name(last_name)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_last_name(self, last_name: str | None) -> None:
    """Update the user's last name; a falsy value is sent as an empty string."""
    payload = {"lastName": last_name or ""}
    self._update_user(payload=payload)

    current_logger.info(f"Last name of user {self.id} updated in Keycloak")
set_mfa_enabled
set_mfa_enabled(enabled)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_enabled(self, enabled: bool) -> None:
    """Write the "mfaEnabled" user attribute as "yes" or "no" and log the change."""
    if enabled:
        flag = "yes"
    else:
        flag = "no"
    self._update_user_attribute(name="mfaEnabled", value=flag)

    current_logger.info(
        f"MFA enabled flag of user {self.id} updated to {enabled} in Keycloak"
    )
set_mfa_required
set_mfa_required(required)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_mfa_required(self, required: bool) -> None:
    """Write the "mfaRequired" user attribute as "yes" or "no" and log the change."""
    if required:
        flag = "yes"
    else:
        flag = "no"
    self._update_user_attribute(name="mfaRequired", value=flag)

    current_logger.info(
        f"MFA required flag of user {self.id} updated to {required} in Keycloak"
    )
set_password
set_password(prehashed_password)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def set_password(self, prehashed_password: str) -> None:
    """Set `prehashed_password` as the user's non-temporary password via the admin client."""
    self.admin_client.set_user_password(
        user_id=str(self.id),
        password=prehashed_password,
        temporary=False,
    )

    current_logger.info(f"Password of user {self.id} updated in Keycloak")
update_pending_deletion
update_pending_deletion(pending_deletion)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def update_pending_deletion(self, pending_deletion: bool) -> None:
    """Write the "pendingDeletion" user attribute as "yes" or "no"."""
    if pending_deletion:
        flag = "yes"
    else:
        flag = "no"
    self._update_user_attribute(name="pendingDeletion", value=flag)

KeycloakIdentityProvider

KeycloakIdentityProvider()

Bases: IdentityProvider

Source code in components/authentication/internal/infrastructure/identity_provider.py
def __init__(self) -> None:
    """Obtain the Keycloak admin client from `get_admin_client`, then initialise the base class."""
    self.keycloak_admin_client = get_admin_client()
    super().__init__()
create_new_identity
create_new_identity(
    email,
    language,
    first_name,
    last_name,
    is_email_verified=True,
    mfa_enabled=False,
    mfa_required=False,
)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def create_new_identity(
    self,
    email: str,
    language: Lang,
    first_name: str | None,
    last_name: str | None,
    is_email_verified: bool = True,
    mfa_enabled: bool = False,
    mfa_required: bool = False,
) -> KeycloakIdentity:
    """
    Create a user in Keycloak and return it wrapped as a KeycloakIdentity.

    The user is created through `_create_keycloak_user`, then read back with
    `_get_user_cached` so the returned identity carries the stored user.
    """
    keycloak_user_id: str = self._create_keycloak_user(
        email=email,
        language=language,
        first_name=first_name,
        last_name=last_name,
        email_verified=is_email_verified,
        mfa_enabled=mfa_enabled,
        mfa_required=mfa_required,
    )
    current_logger.info(
        f"Created new user in Keycloak with email {email} and id {keycloak_user_id}"
    )

    admin_client = self.keycloak_admin_client
    created_user = _get_user_cached(admin_client, keycloak_user_id)
    identity = KeycloakIdentity(
        id=created_user["id"],
        keycloak_user=created_user,
        admin_client=self.keycloak_admin_client,
    )
    return identity
exchange_token_for_user
exchange_token_for_user(target_client_id, email)

Exchange token for a user using Keycloak service account

Attributes:

Name Type Description
target_client_id str

The target client ID to exchange token with

email str

Email of the user to exchange token for

Returns:

Type Description
dict

Dictionary containing access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def exchange_token_for_user(self, target_client_id: str, email: str) -> dict:  # type: ignore[type-arg]
    """
    Exchange token for a user using Keycloak service account

    A service account token is first obtained with the client credentials
    grant, then exchanged for a token issued for `email` with
    `target_client_id` as audience.

    Attributes:
        target_client_id (str): The target client ID to exchange token with
        email (str): Email of the user to exchange token for

    Returns:
        Dictionary containing access and refresh tokens, under the
        "accessToken" and "refreshToken" keys

    Raises:
        BaseErrorCode.token_error: on any failure; the original exception is
            attached as the explicit cause.
    """
    keycloak_openid = self.keycloak_admin_client.connection.keycloak_openid
    try:
        # Get service account token
        service_token = keycloak_openid.token(grant_type=["client_credentials"])

        # Perform token exchange
        exchanged_token = keycloak_openid.token(
            subject_token=service_token["access_token"],
            grant_type="urn:ietf:params:oauth:grant-type:token-exchange",
            audience=target_client_id,
            requested_subject=email,
        )

        return {
            "accessToken": exchanged_token["access_token"],
            "refreshToken": exchanged_token["refresh_token"],
        }

    # Each handler chains the original exception (`from e`) so the root cause
    # stays attached to the generic token error raised to callers.
    except KeycloakAuthenticationError as e:
        current_logger.error(
            f"Keycloak token exchange authentication error: {str(e)}"
        )
        raise BaseErrorCode.token_error(error="Failed to exchange token") from e
    except KeycloakOperationError as e:
        current_logger.error(f"Keycloak token exchange operation error: {str(e)}")
        raise BaseErrorCode.token_error(
            error="Failed to perform token exchange operation"
        ) from e
    except Exception as e:
        current_logger.error(f"Unexpected error during token exchange: {str(e)}")
        raise BaseErrorCode.token_error(
            error="Unexpected error during token exchange"
        ) from e
find_identity
find_identity(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity(self, email: str | None) -> KeycloakIdentity | None:
    """
    Find the Keycloak user registered under `email` and wrap it as a KeycloakIdentity.

    Returns None when `email` is falsy or when no user is found.
    """
    # A falsy email (None or empty) is never looked up.
    keycloak_user = (
        self._find_keycloak_user_by_email(email=email) if email else None
    )
    if keycloak_user is None:
        return None

    return KeycloakIdentity(
        id=keycloak_user["id"],
        keycloak_user=keycloak_user,
        admin_client=self.keycloak_admin_client,
    )
find_identity_id_from_token
find_identity_id_from_token(token)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def find_identity_id_from_token(self, token: str) -> UUID | None:
    """Return the ID of the user resolved by `get_user_from_access_token`, or None when there is none."""
    keycloak_user = get_user_from_access_token(token)
    if not keycloak_user:
        return None
    return keycloak_user.id
generate_password_reset_email
generate_password_reset_email(
    email, client_id, redirect_uri
)

Generate a password reset email in our identity provider

Attributes:

Name Type Description
email str

The email address of the user to trigger a password reset

client_id str

Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")

redirect_uri str | None

The URI to redirect to after performing password reset

Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_password_reset_email(
    self, email: str, client_id: str, redirect_uri: str | None
) -> UUID | None:
    """
    Generate a password reset email in our identity provider

    Attributes:
        email (str): The email address of the user to trigger a password reset
        client_id (str): Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")
        redirect_uri (str | None): The URI to redirect to after performing password reset
    """
    user_id = self.get_identity_id(email=email)
    if user_id:
        self.keycloak_admin_client.send_update_account(
            user_id=user_id,
            payload=["UPDATE_PASSWORD"],
            client_id=client_id,
            redirect_uri=redirect_uri,
            lifespan=1800,  # 30 min
        )
        current_logger.info(
            f"Password reset email generated for {email} with client {client_id} & redirect URI {redirect_uri}"
        )

    return user_id
generate_verification_email
generate_verification_email(
    identity_id, client_id, redirect_uri
)

Generate a verification email in our identity provider

Attributes:

Name Type Description
identity_id UUID

The ID of the identity to trigger a verification for

client_id str

Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")

redirect_uri str | None

The URI to redirect to after performing verification

Source code in components/authentication/internal/infrastructure/identity_provider.py
def generate_verification_email(
    self, identity_id: UUID, client_id: str, redirect_uri: str | None
) -> None:
    """
    Generate a verification email in our identity provider

    Attributes:
        email (str): The email address of the user to trigger a verification
        client_id (str): Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")
        redirect_uri (str | None): The URI to redirect to after performing verification
    """
    self.keycloak_admin_client.send_update_account(
        user_id=identity_id,
        payload=["VERIFY_EMAIL"],
        client_id=client_id,
        redirect_uri=redirect_uri,
        lifespan=1800,  # 30 min
    )
    current_logger.info(
        f"Verification email generated for identity id {identity_id} with client {client_id} & redirect URI {redirect_uri}"
    )
get_identity
get_identity(authentication_id)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity(self, authentication_id: UUID | None) -> KeycloakIdentity | None:
    """
    Fetch the Keycloak user with the given ID and wrap it as a KeycloakIdentity.

    Returns None when no ID is given, when the lookup fails with a
    KeycloakBusinessError whose `alancode` is NOT_FOUND, or when the user is
    flagged as pending deletion. Any other KeycloakBusinessError is re-raised.
    """
    if not authentication_id:
        return None

    try:
        keycloak_user = _get_user_cached(
            self.keycloak_admin_client, str(authentication_id)
        )
    except KeycloakBusinessError as error:
        if error.alancode != HTTPStatus.NOT_FOUND:
            raise
        return None

    # Users marked as "deleted" are treated as absent.
    if self._is_pending_deletion(keycloak_user):
        return None

    identity = KeycloakIdentity(
        id=keycloak_user["id"],
        keycloak_user=keycloak_user,
        admin_client=self.keycloak_admin_client,
    )
    return identity
get_identity_id
get_identity_id(email)
Source code in components/authentication/internal/infrastructure/identity_provider.py
def get_identity_id(self, email) -> UUID | None:  # type: ignore[no-untyped-def]
    if email is None:
        return None

    keycloak_user = self._find_by_email(
        email=email,
    )

    if not keycloak_user:
        current_logger.debug(f"No user found in Keycloak with email {email}")
        return None

    return UUID(keycloak_user["id"])
healthcheck
healthcheck()

This healthcheck verifies that the Keycloak service account has the right permissions, which in turn confirms that Keycloak is working as expected

Source code in components/authentication/internal/infrastructure/identity_provider.py
def healthcheck(self) -> bool:
    """
    Verify that the Keycloak service account holds the required client roles,
    which in turn confirms that Keycloak is working as expected.

    Returns:
        True when the app is eu-tools (check skipped) or when the service
        account has both "manage-users" and "view-users"; False when the admin
        client ID is not configured, the service account cannot be found, or a
        required role is missing.
    """
    if get_current_app_name() == AppName.EU_TOOLS:
        # We skip eu-tools as we don't need to check the service account permissions on this one
        # Cf. https://alanhealth.slack.com/archives/C06P9PMR011/p1732783502638769?thread_ts=1732774294.726989&cid=C06P9PMR011
        current_logger.info(
            "No healthcheck for eu-tools app for now, skipping and assuming auth healthcheck is ok"
        )
        return True

    admin_client_id = current_config.get("KEYCLOAK_ADMIN_CLIENT_ID")
    if not admin_client_id:
        current_logger.critical(
            "KEYCLOAK_ADMIN_CLIENT_ID env var is not defined or is empty but required for Keycloak interactions from the backend"
        )
        return False

    # Keycloak service accounts are always under the form of "service-account-" + client ID
    # Cf. https://www.keycloak.org/docs/latest/server_admin/#adding-or-removing-roles-for-clients-service-account
    keycloak_service_account_username = "service-account-" + admin_client_id
    service_account_id = self.keycloak_admin_client.get_user_id(
        keycloak_service_account_username
    )
    if not service_account_id:
        return False

    all_roles = self.keycloak_admin_client.get_all_roles_of_user(service_account_id)
    required_roles = {"manage-users", "view-users"}

    # Flatten the role names granted through every client mapping.
    granted_role_names = []
    for client_mapping in all_roles.get("clientMappings", {}).values():
        for mapping in client_mapping.get("mappings", []):
            granted_role_names.append(mapping["name"])

    if not required_roles.issubset(granted_role_names):
        current_logger.critical(
            f"One of the required roles {required_roles} might be missing for Keycloak client service account: {keycloak_service_account_username}"
        )
        return False

    return True
keycloak_admin_client instance-attribute
keycloak_admin_client = get_admin_client()
refresh_exchanged_token
refresh_exchanged_token(target_client_id, refresh_token)

Refresh an exchanged token using Keycloak service account

Attributes:

Name Type Description
target_client_id str

The target client ID to refresh token with

refresh_token str

The refresh token to use

Returns:

Type Description
dict

Dictionary containing refreshed access and refresh tokens

Source code in components/authentication/internal/infrastructure/identity_provider.py
def refresh_exchanged_token(
    self, target_client_id: str, refresh_token: str
) -> dict:  # type: ignore[type-arg]
    """
    Refresh an exchanged token using Keycloak service account

    Attributes:
        target_client_id (str): The target client ID to refresh token with
        refresh_token (str): The refresh token to use

    Returns:
        Dictionary containing refreshed access and refresh tokens, under the
        "accessToken" and "refreshToken" keys

    Raises:
        BaseErrorCode.token_error: on any failure; the original exception is
            attached as the explicit cause.
    """
    keycloak_openid = self.keycloak_admin_client.connection.keycloak_openid
    try:
        refreshed_token = keycloak_openid.token(
            grant_type="refresh_token",
            refresh_token=refresh_token,
            audience=target_client_id,
        )

        return {
            "accessToken": refreshed_token["access_token"],
            "refreshToken": refreshed_token["refresh_token"],
        }

    # Each handler chains the original exception (`from e`) so the root cause
    # stays attached to the generic token error raised to callers.
    except KeycloakAuthenticationError as e:
        current_logger.error(
            f"Keycloak token refresh authentication error: {str(e)}"
        )
        raise BaseErrorCode.token_error(error="Failed to refresh token") from e
    except KeycloakOperationError as e:
        current_logger.error(f"Keycloak token refresh operation error: {str(e)}")
        raise BaseErrorCode.token_error(
            error="Failed to perform token refresh operation"
        ) from e
    except Exception as e:
        current_logger.error(f"Unexpected error during token refresh: {str(e)}")
        raise BaseErrorCode.token_error(
            error="Unexpected error during token refresh"
        ) from e

inject_authentication_service

T module-attribute

T = TypeVar('T')

inject_authentication_service

inject_authentication_service(f)

Dependency injection decorator for AuthenticationService.

Automatically injects an AuthenticationService instance into functions that have an 'authentication_service' parameter. If the function doesn't have this parameter, the decorator becomes a no-op.

Parameters:

Name Type Description Default
f Callable[..., T]

The function to decorate

required

Returns:

Type Description
Callable[..., T]

The original function or a wrapped function with automatic service injection

Examples:

Basic usage:

>>> @inject_authentication_service
... def create_user_session(user_id: int, authentication_service: AuthenticationService) -> str:
...     return authentication_service.create_session_token(user_id)
>>> token = create_user_session(user_id=123)  # Service injected automatically

Function without injection (no-op):

>>> @inject_authentication_service
... def simple_function(value: str) -> str:
...     return value.upper()
>>> result = simple_function("hello")  # Returns "HELLO", no injection needed
Source code in components/authentication/public/inject_authentication_service.py
def inject_authentication_service(f: Callable[..., T]) -> Callable[..., T]:
    """
    Decorator supplying an AuthenticationService to the decorated function.

    When the function declares an 'authentication_service' parameter, each call
    goes through a wrapper that sets that keyword argument to a freshly created
    service (`AuthenticationService.create()`). Functions without such a
    parameter are returned untouched.

    Args:
        f: The function to decorate

    Returns:
        The original function or a wrapped function with automatic service injection

    Examples:
        Basic usage:

        >>> @inject_authentication_service
        ... def create_user_session(user_id: int, authentication_service: AuthenticationService) -> str:
        ...     return authentication_service.create_session_token(user_id)
        >>> token = create_user_session(user_id=123)  # Service injected automatically

        Function without injection (no-op):

        >>> @inject_authentication_service
        ... def simple_function(value: str) -> str:
        ...     return value.upper()
        >>> result = simple_function("hello")  # Returns "HELLO", no injection needed
    """
    from components.authentication.public.api import AuthenticationService

    accepts_service = "authentication_service" in signature(f).parameters
    if not accepts_service:
        return f

    @wraps(f)
    def decorated_function(*args, **kwargs):  # type: ignore[no-untyped-def]
        kwargs["authentication_service"] = AuthenticationService.create()
        return f(*args, **kwargs)

    return decorated_function

mfa_auth

DictNotifier

DictNotifier()

Bases: Notifier

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self) -> None:
    """Start with an empty in-memory store of validation codes."""
    self.store = {}  # type: ignore[var-annotated]
get
get(operation_id)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def get(self, operation_id: UUID) -> int | None:
    return self.store.get(operation_id)
name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self):  # type: ignore[no-untyped-def]
    """Return the notifier type of this notifier: `NotifierType.dict`."""
    return NotifierType.dict
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(
    self,
    pending_operation: PendingOperation,
    origin: MfaRequestOrigin,  # noqa: ARG002
) -> None:
    """Record the operation's validation code in the store, keyed by the operation ID; `origin` is unused."""
    validation_code = pending_operation.validation_code
    self.store[pending_operation.id] = validation_code
store instance-attribute
store = {}

EmailNotifier

EmailNotifier(get_user_cls)

Bases: Notifier

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(self, get_user_cls: Callable[[], type[Authenticatable]]) -> None:
    """Keep the callable that returns the user class, to be called when a notification is sent."""
    self._get_user_cls = get_user_cls
name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self):  # type: ignore[no-untyped-def]
    """Return ``NotifierType.email``, the type identifying this notifier."""
    return NotifierType.email
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(
    self, pending_operation: PendingOperation, origin: MfaRequestOrigin
) -> None:
    """Email the operation's validation code to the user who owns the operation.

    Nothing is sent when the request originates from the web login
    (``MfaRequestOrigin.login_web``).

    Args:
        pending_operation: Operation whose validation code must be sent.
        origin: Where the MFA request comes from.
    """
    if origin == MfaRequestOrigin.login_web:
        current_logger.debug(
            "Skipping email notification as MFA request is coming from web app"
        )
        return

    recipient: BaseUser = current_session.get(  # type: ignore[assignment]
        self._get_user_cls(),  # type: ignore[arg-type]
        pending_operation.user_id,
    )
    recipient_first_name = recipient.first_name
    recipient_email = recipient.email
    # Not every user class is guaranteed to expose a ``lang`` attribute.
    recipient_lang = getattr(recipient, "lang", None)

    send_mfa_code_email(
        first_name=recipient_first_name,  # type: ignore[arg-type]
        email=recipient_email,  # type: ignore[arg-type]
        validation_code=pending_operation.validation_code,
        validation_code_description=pending_operation.description,
        validation_code_type=pending_operation.type,
        lang=recipient_lang,
        # This email needs to be sent synchronously as the member needs it
        # as soon as possible.
        # Also, in case of an incident with workers, the member might never
        # receive the email, and would be blocked on authentication...
        is_async=False,
    )

IosSimulatorJsonNotifier

Bases: Notifier

name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self):  # type: ignore[no-untyped-def]
    """Return ``NotifierType.ios_simulator``, the type identifying this notifier."""
    return NotifierType.ios_simulator
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(
    self, pending_operation: PendingOperation, origin: MfaRequestOrigin
) -> None:
    """Dump a push-notification JSON payload for the operation to a temporary file.

    The payload is written to a ``mfa-notif-*.apns`` temporary file that is
    kept on disk (``delete=False``) and whose path is logged. Nothing is
    written when the request originates from the mobile login.

    Args:
        pending_operation: Operation described by the dumped notification.
        origin: Where the MFA request comes from.
    """
    if origin == MfaRequestOrigin.login_mobile:
        # Sending a push notification makes no sense.
        return

    with NamedTemporaryFile(
        prefix="mfa-notif-", suffix=".apns", delete=False
    ) as dump_file:
        alert = {
            "title": "Opération en attente de validation",
            "body": pending_operation.description,
        }
        payload = {
            "Simulator Target Bundle": "alan.health.ios",
            "google.c.a.e": 1,
            "gcm.message_id": "0:1538488916770554%a88db343a88db34",
            "aps": {
                "alert": alert,
            },
            "operation_id": str(pending_operation.id),
            "validation_code": str(pending_operation.validation_code),
            "description": pending_operation.description,
            "type": pending_operation.type,
            "name": "mfa_operation_pending_push_notification",
        }
        serialized_payload = json.dumps(payload)
        dump_file.write(serialized_payload.encode("UTF-8"))
        current_logger.info(f"Notification dumped to {dump_file.name}")

MFA

MFA(storage, notifiers, _get_user_cls)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(
    self,
    storage: MFAStorage,
    notifiers: list[Notifier],
    _get_user_cls: Callable[[], type[Authenticatable]],
) -> None:
    """Set up the MFA service.

    Args:
        storage: Backend through which pending operations are updated.
        notifiers: Notifiers to register; each one is indexed by the value
            returned by its ``name()``.
        _get_user_cls: Zero-argument callable returning the user class.
    """
    self.storage = storage

    notifiers_by_type = {notifier.name(): notifier for notifier in notifiers}
    self.notifiers = notifiers_by_type

    self._get_user_cls = _get_user_cls
get_operation_status
get_operation_status(operation_id, nonce)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def get_operation_status(self, operation_id: UUID, nonce: UUID) -> ValidationStatus:
    """Return the status of a pending operation after checking the caller's nonce.

    Args:
        operation_id: Identifier of the pending operation.
        nonce: Nonce presented by the caller.

    Raises:
        BadNonce: If ``nonce`` differs from the nonce of the operation.
    """
    operation = self._get_pending_operation(operation_id)
    nonce_mismatch = operation.nonce != nonce
    if nonce_mismatch:
        raise BadNonce

    return operation.status
mfa_required
mfa_required(op_description, op_type)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def mfa_required(  # type: ignore[no-untyped-def]
    self,
    op_description: str,
    op_type: str,
):
    """Build a decorator that requires MFA validation before running a function.

    On each call, the wrapper looks up the Keycloak identity of
    ``g.current_user``. If MFA is not enabled for that identity, the wrapped
    function runs directly. Otherwise the request arguments are parsed:

    * without ``operation_id``, a new pending operation is created, with an
      origin derived from ``refresh_token_type``;
    * with ``operation_id``, the matching pending operation is loaded and
      marked as failed when the supplied ``nonce`` does not match.

    The wrapped function only runs when the operation is neither pending nor
    failed.

    Args:
        op_description: Description stored on newly created pending operations.
        op_type: Type stored on newly created pending operations.

    Returns:
        A decorator to apply to the function to protect.
    """
    # (name, type, help, required) of every request argument read by the wrapper.
    argument_specs = (
        ("operation_id", UUID, "Pending operation ID", False),
        (
            "nonce",
            UUID,
            "Pending operation nonce (only known by original client)",
            False,
        ),
        ("refresh_token_type", str, "type of the refresh token: web or mobile", True),
    )

    def decorator(f):  # type: ignore[no-untyped-def]
        @wraps(f)
        def wrapper(*args, **kwargs):  # type: ignore[no-untyped-def]
            authentication_service = AuthenticationService.create()
            assert g.current_user is not None
            identity = authentication_service.get_keycloak_identity_by_profile_id(
                g.current_user.profile_id
            )
            assert identity is not None

            if not identity.mfa_enabled:
                return f(*args, **kwargs)

            parser = reqparse.RequestParser()
            for arg_name, arg_type, arg_help, arg_required in argument_specs:
                parser.add_argument(
                    arg_name,
                    type=arg_type,
                    help=arg_help,
                    required=arg_required,
                )

            params = parser.parse_args(strict=False)
            pending_operation_id = params.get("operation_id")

            if pending_operation_id is not None:
                # Follow-up request: the client refers to an existing operation.
                pending_operation = self._get_pending_operation(
                    pending_operation_id
                )

                if pending_operation.nonce != params.get("nonce"):
                    current_logger.warning(
                        f"bad MFA nonce: expected {pending_operation.nonce}, got {params.get('nonce')}"
                    )
                    self.update_status(pending_operation, ValidationStatus.FAILURE)
            else:
                # First request: open a new pending operation.
                refresh_token_type = params.get("refresh_token_type")
                origin = MfaRequestOrigin[f"login_{refresh_token_type}"]

                pending_operation = self._create_pending_operation(
                    user_id=g.current_user.id,
                    description=op_description,
                    type=op_type,
                    origin=origin,
                )

                current_logger.info(
                    f"MFA operation {pending_operation.id} required on {request.path} for user {g.current_user.id}",
                    pending_operation_id=pending_operation.id,
                    origin=origin,
                )

            operation_status = pending_operation.status
            if operation_status == ValidationStatus.PENDING:
                raise BaseErrorCode.mfa_verification_pending(
                    operation_id=str(pending_operation.id),
                    nonce=str(pending_operation.nonce),
                )

            if operation_status == ValidationStatus.FAILURE:
                raise BaseErrorCode.authorization_error()

            return f(*args, **kwargs)

        return wrapper

    return decorator
notifiers instance-attribute
notifiers = {notifier.name(): notifier for notifier in notifiers}
register_notifier
register_notifier(notifier)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def register_notifier(self, notifier: Notifier) -> None:
    """Register ``notifier`` under the type returned by its ``name()``.

    A notifier already registered under the same type is replaced.
    """
    self.notifiers[notifier.name()] = notifier
send_email
send_email(operation_id, nonce)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def send_email(self, operation_id: UUID, nonce: UUID):  # type: ignore[no-untyped-def]
    """Send the pending operation through the registered email notifier.

    Args:
        operation_id: Identifier of the pending operation.
        nonce: Nonce presented by the caller.

    Raises:
        BadNonce: If ``nonce`` differs from the nonce of the operation.
        NotImplementedError: If no notifier is registered for
            ``NotifierType.email``.
    """
    operation = self._get_pending_operation(operation_id)
    if operation.nonce != nonce:
        raise BadNonce

    notifier = self.notifiers.get(NotifierType.email)
    if not notifier:
        raise NotImplementedError(
            "Cannot send email notification as email notifier is not register"
        )

    notifier.notify(pending_operation=operation, origin=MfaRequestOrigin.email)
storage instance-attribute
storage = storage
update_status
update_status(pending_operation, new_status)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def update_status(
    self, pending_operation: PendingOperation, new_status: ValidationStatus
) -> None:
    """Set a new status on the operation and persist it through the storage.

    An operation whose status is already ``ValidationStatus.FAILURE`` is left
    untouched.

    Args:
        pending_operation: Operation to update.
        new_status: Status to assign.
    """
    # Rejection status is permanent
    already_rejected = pending_operation.status == ValidationStatus.FAILURE
    if already_rejected:
        return

    pending_operation.status = new_status
    self.storage.update(pending_operation)
validate_or_reject
validate_or_reject(
    operation_id,
    validation_code=None,
    authenticated=False,
    authenticated_user=None,
    reject=False,
)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def validate_or_reject(
    self,
    operation_id: UUID,
    validation_code: int | None = None,
    authenticated: bool = False,
    authenticated_user: UserId | None = None,
    reject: bool = False,
) -> bool:
    """Validate or reject a pending MFA operation.

    Args:
        operation_id: Identifier of the pending operation.
        validation_code: Code compared with the operation's validation code
            when the caller is not authenticated.
        authenticated: Whether the caller is authenticated; when true the
            operation must belong to ``authenticated_user`` and no code is
            needed to validate it.
        authenticated_user: Identifier of the authenticated user.
        reject: Reject the operation instead of validating it.

    Returns:
        ``True`` when the status update was applied, ``False`` when the
        operation was neither rejected nor validated (wrong code).

    Raises:
        OperationValidated: If the operation is already validated.
        OperationRejected: If the operation is already rejected.
        InvalidUser: If the caller is authenticated as another user than the
            operation's one.
    """
    pending_operation = self._get_pending_operation(operation_id)

    current_status = pending_operation.status
    if current_status == ValidationStatus.SUCCESS:
        current_logger.warning(
            f"MFA operation already validated: {operation_id}",
            operation_id=operation_id,
        )
        raise OperationValidated
    if current_status == ValidationStatus.FAILURE:
        current_logger.warning(
            f"MFA operation already rejected: {operation_id}",
            operation_id=operation_id,
        )
        raise OperationRejected

    # Check for authenticated user id
    # The authenticated user id might be an UUID (international app or during tests)
    # or an int (fr-app).
    # When the authenticated user id is an UUID, it is ultimately stored as string.
    # To be sure equality works in all cases, cast both values to a string.
    if authenticated:
        if str(pending_operation.user_id) != str(authenticated_user):
            current_logger.warning(
                f"MFA operation validation related to a different user: {operation_id}",
                operation_id=operation_id,
                operation_user_id=pending_operation.user_id,
                authenticated_user_id=authenticated_user,
            )
            raise InvalidUser

    if reject:
        current_logger.debug(
            f"Rejecting MFA operation: {operation_id}", operation_id=operation_id
        )
        self.update_status(pending_operation, ValidationStatus.FAILURE)
        return True

    # An authenticated caller does not need to provide the validation code.
    can_validate = (
        authenticated or pending_operation.validation_code == validation_code
    )
    if not can_validate:
        return False

    current_logger.debug(
        f"Validating MFA operation: {operation_id}", operation_id=operation_id
    )
    self.update_status(pending_operation, ValidationStatus.SUCCESS)
    return True

Notifier

name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self) -> NotifierType:
    """Return the type identifying this notifier; not implemented on the base class."""
    raise NotImplementedError
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(self, pending_operation: PendingOperation, origin: MfaRequestOrigin):  # type: ignore[no-untyped-def]
    """Notify about ``pending_operation``; not implemented on the base class.

    Args:
        pending_operation: Operation to notify about.
        origin: Where the MFA request comes from.
    """
    raise NotImplementedError

PrintNotifier

Bases: Notifier

name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self):  # type: ignore[no-untyped-def]
    """Return ``NotifierType.print``, the type identifying this notifier."""
    return NotifierType.print
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(
    self, pending_operation: PendingOperation, origin: MfaRequestOrigin
) -> None:
    """Log the pending operation (as a dict) together with the request origin.

    Args:
        pending_operation: Operation to log.
        origin: Where the MFA request comes from.
    """
    operation_fields = asdict(pending_operation)
    current_logger.info(
        "MFA pending", pending_operation=operation_fields, origin=origin
    )

PushNotificationSender module-attribute

# Signature of the callable that delivers the push notification for a pending
# MFA operation. PushNotifier.notify calls it positionally with the pending
# operation's user id, description, id and type, in that order.
PushNotificationSender = Callable[
    [UserId, str, UUID, str | None], None
]

PushNotifier

PushNotifier(send_mfa_operation_pending_notification)

Bases: Notifier

Source code in components/authentication/internal/infrastructure/mfa_auth.py
def __init__(
    self, send_mfa_operation_pending_notification: PushNotificationSender
) -> None:
    """Keep the callable that ``notify`` invokes to send the push notification.

    Args:
        send_mfa_operation_pending_notification: Callable performing the
            actual push notification delivery.
    """
    self._send_push_notification = send_mfa_operation_pending_notification
name
name()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def name(self):  # type: ignore[no-untyped-def]
    """Return ``NotifierType.push``, the type identifying this notifier."""
    return NotifierType.push
notify
notify(pending_operation, origin)
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def notify(
    self, pending_operation: PendingOperation, origin: MfaRequestOrigin
) -> None:
    """Send a push notification for the pending operation.

    Nothing is sent when the request originates from the mobile login
    (``MfaRequestOrigin.login_mobile``).

    Args:
        pending_operation: Operation to notify about.
        origin: Where the MFA request comes from.
    """
    if origin == MfaRequestOrigin.login_mobile:
        # Sending a push notification makes no sense.
        current_logger.debug(
            "Skipping push notification as MFA request is coming from mobile app"
        )
        return

    # Positional order expected by the sender: user id, description, id, type.
    sender_args = (
        pending_operation.user_id,
        pending_operation.description,
        pending_operation.id,
        pending_operation.type,
    )
    self._send_push_notification(*sender_args)  # type: ignore[misc]

create_mfa_storage

create_mfa_storage()
Source code in components/authentication/internal/infrastructure/mfa_auth.py
def create_mfa_storage():  # type: ignore[no-untyped-def]
    """Create the storage backend for MFA pending operations.

    Returns:
        A ``RedisMFAStorage`` built on the first element returned by
        ``get_redis_main_connection()``, or a ``DictMFAStorage`` when setting
        up the Redis-backed storage raises an error.
    """
    try:
        redis_connection = get_redis_main_connection()
        return RedisMFAStorage(redis_connection[0])
    except Exception:
        # Deliberate best-effort fallback: any initialization error is logged
        # and a dict-based storage is used instead. Only ``Exception`` is
        # caught so that KeyboardInterrupt and SystemExit still propagate.
        current_logger.exception("Error when initializing Redis MFA storage")
        return DictMFAStorage()

password_reset

PASSWORD_RESET_STORAGE module-attribute

PASSWORD_RESET_STORAGE = 'password_reset_storage'

PasswordResetStorage

PasswordResetStorage(app=None, redis_connection=None)

Storage backend for password reset data using Redis.

Handles storing and retrieving password reset tokens and associated data.

Source code in components/authentication/public/password_reset.py
def __init__(self, app=None, redis_connection=None) -> None:  # type: ignore[no-untyped-def]
    """Create the storage, initializing it right away when ``app`` is given.

    Args:
        app: Optional Flask application instance; when omitted, nothing is
            initialized here.
        redis_connection: Redis connection instance passed on to ``init_app``.
    """
    if app is None:
        return

    self.init_app(app, redis_connection)
delete
delete(storage_id)

Delete a password reset item from storage.

Parameters:

Name Type Description Default
storage_id str

The storage identifier to delete

required
Source code in components/authentication/public/password_reset.py
def delete(self, storage_id: str) -> None:
    """Remove the password reset item stored under ``storage_id``.

    Args:
        storage_id: The storage identifier to delete
    """
    redis_key = self._redis_key(storage_id)
    redis_client = self._redis
    redis_client.delete(redis_key)  # type: ignore[union-attr]
get
get(storage_id)

Retrieve a password reset item from storage.

Parameters:

Name Type Description Default
storage_id str

The storage identifier

required

Returns:

Type Description
PasswordResetStorageItem | None

PasswordResetStorageItem | None: The stored reset item if found, None otherwise

Source code in components/authentication/public/password_reset.py
def get(self, storage_id: str) -> PasswordResetStorageItem | None:
    """Look up the password reset item stored under ``storage_id``.

    Args:
        storage_id (str): The storage identifier

    Returns:
        PasswordResetStorageItem | None: The item decoded from the stored JSON,
        or None when nothing is stored under that identifier
    """
    redis_key = self._redis_key(storage_id)
    raw_item = self._redis.get(redis_key)  # type: ignore[union-attr]

    if raw_item is not None:
        return PasswordResetStorageItem.from_json(raw_item)  # type: ignore[arg-type]
    return None
init_app
init_app(app, redis_connection)

Initialize the storage with a Flask app and Redis connection.

Parameters:

Name Type Description Default
app

Flask application instance

required
redis_connection

Redis connection instance

required
Source code in components/authentication/public/password_reset.py
def init_app(self, app, redis_connection) -> None:  # type: ignore[no-untyped-def]
    """Initialize the storage with a Flask app and Redis connection.

    Keeps the Redis connection on the instance and registers the instance in
    ``app.extensions`` under the ``PASSWORD_RESET_STORAGE`` key.

    Args:
        app: Flask application instance
        redis_connection: Redis connection instance
    """
    self._redis = redis_connection
    app.extensions[PASSWORD_RESET_STORAGE] = self
set
set(storage_id, item)

Store a password reset item with expiration.

Parameters:

Name Type Description Default
storage_id str

The storage identifier

required
item PasswordResetStorageItem

The reset item to store

required
Source code in components/authentication/public/password_reset.py
def set(self, storage_id: str, item: PasswordResetStorageItem) -> None:
    """Save a password reset item as JSON, with an expiration time.

    The expiration time is read from the ``PASSWORD_RESET_EXPIRATION_TIME``
    configuration entry.

    Args:
        storage_id (str): The storage identifier
        item (PasswordResetStorageItem): The reset item to store
    """
    redis_key = self._redis_key(storage_id)
    redis_client = self._redis
    expiration = current_config["PASSWORD_RESET_EXPIRATION_TIME"]
    serialized_item = item.to_json()
    redis_client.setex(  # type: ignore[union-attr]
        name=redis_key,
        time=expiration,
        value=serialized_item,
    )

PasswordResetStorageItem dataclass

PasswordResetStorageItem(nonce, email)

Bases: DataClassJsonMixin

Storage item for password reset data.

Attributes:

Name Type Description
nonce str

A unique identifier for this password reset request

email str

The email address associated with the reset request

email instance-attribute
email
nonce instance-attribute
nonce

generate_password_reset_token

generate_password_reset_token(auth_identity)

Generate a password reset token for the given auth identity.

Parameters:

Name Type Description Default
auth_identity AuthIdentity

The authentication identity for which to generate the reset token.

required

Returns:

Type Description
str | bytes

str | bytes: A signed token containing the email and a nonce for password reset verification.

Source code in components/authentication/public/password_reset.py
def generate_password_reset_token(
    auth_identity: AuthIdentity | DomainAuthIdentity,
) -> str | bytes:
    """Create a signed password reset token for an auth identity.

    A fresh nonce is stored, together with the identity's email, under the
    identity's id in the application's password reset storage. The same email
    and nonce are then serialized into the returned token.

    Args:
        auth_identity (AuthIdentity): The authentication identity for which to generate the reset token.

    Returns:
        str | bytes: A signed token containing the email and a nonce for password reset verification.
    """
    token_serializer = URLSafeTimedSerializer(current_config["SECRET_KEY"])
    reset_storage = current_app.password_reset_storage  # type: ignore[attr-defined]

    fresh_nonce = str(uuid.uuid4())
    stored_item = PasswordResetStorageItem(
        nonce=fresh_nonce, email=auth_identity.email
    )
    reset_storage.set(storage_id=auth_identity.id, item=stored_item)

    token_payload = {"email": auth_identity.email, "nonce": fresh_nonce}
    return token_serializer.dumps(token_payload, salt=_PASSWORD_RESET_SALT)

verify_token_and_reset_password

verify_token_and_reset_password(
    token, prehashed_password_string
)

Verify a password reset token and update the user's password if valid.

Parameters:

Name Type Description Default
token str

The password reset token to verify

required
prehashed_password_string str

The new pre-hashed password to set

required

Returns:

Name Type Description
DomainAuthIdentity DomainAuthIdentity | AuthIdentity

The authentication identity whose password was reset

Raises:

Type Description
BaseErrorCode

If the token is expired or invalid

Source code in components/authentication/public/password_reset.py
def verify_token_and_reset_password(
    token: str, prehashed_password_string: str
) -> DomainAuthIdentity | AuthIdentity:
    """Verify a password reset token and update the user's password if valid.

    The token must carry both an email and a nonce, and both must match the
    item stored for the auth identity found by that email. The stored item is
    deleted before the new credentials are set, so presenting the same token
    again is rejected as stale.

    Args:
        token (str): The password reset token to verify
        prehashed_password_string (str): The new pre-hashed password to set

    Returns:
        DomainAuthIdentity: The authentication identity whose password was reset

    Raises:
        BaseErrorCode: If the token is expired or invalid
    """
    from components.authentication.public.api import AuthenticationService

    expiration = current_config["PASSWORD_RESET_EXPIRATION_TIME"]
    serializer = URLSafeTimedSerializer(current_config["SECRET_KEY"])
    authentication_service = AuthenticationService.create()
    # Kept outside the ``try`` so the error handlers can report it (``None``
    # when the failure happens before the email could be read).
    email = None
    try:
        data = serializer.loads(token, salt=_PASSWORD_RESET_SALT, max_age=expiration)

        # Both fields are required: reject the token as soon as either one is
        # missing instead of failing later with an unhandled KeyError.
        if "email" not in data or "nonce" not in data:
            raise ValueError("invalid token, missing data")

        email = data["email"]
        auth_identity = authentication_service.get_identity_by_email(email)
        if auth_identity is None:
            raise ValueError(f"Can't find auth identity with email {email}")

        storage = current_app.password_reset_storage  # type: ignore[attr-defined]
        stored_data = storage.get(auth_identity.id)
        if stored_data is None:
            raise ValueError(
                f"Stale password reset token for auth identity {auth_identity.id}"
            )

        if email != stored_data.email:
            raise ValueError("invalid token, wrong email")

        if data["nonce"] != stored_data.nonce:
            raise ValueError("invalid token, wrong nonce")

        # Consume the stored item before changing the credentials.
        storage.delete(auth_identity.id)

        current_logger.info(f"Resetting password for auth identity {auth_identity.id}")
        authentication_service.set_identity_credentials(
            identity_id=auth_identity.id,
            email=auth_identity.email,
            prehashed_password=prehashed_password_string,
            is_email_verified=True,
        )

        return auth_identity
    except SignatureExpired as error:
        # The signature is valid but too old: load the payload again without
        # ``max_age`` only to report which email the expired token was for.
        data = serializer.loads(token, salt=_PASSWORD_RESET_SALT)
        email = data["email"] if "email" in data else None
        raise BaseErrorCode.token_error(
            error=f"expired reset token for email {email}"
        ) from error
    except (BadSignature, ValueError) as error:
        raise BaseErrorCode.token_error(
            error=f"bad reset token for email {email}"
        ) from error