Skip to content

Reference

shared.iam.abac

access_policy

AccessPolicy

Bases: ABC

Define an Access Based Access Control (ABAC) policy

__init_subclass__
__init_subclass__()
Source code in shared/iam/abac/access_policy.py
def __init_subclass__(cls) -> None:
    super().__init_subclass__()

    # Skip validation for abstract classes - they may not have descriptions yet
    if inspect.isabstract(cls):
        return

    # Check if description was explicitly set on THIS class
    has_own_description = "description" in cls.__dict__ and isinstance(
        cls.__dict__["description"], str
    )

    if has_own_description:
        # Description is set: leave it as-is
        pass
    elif cls.__doc__ is not None:
        # No description but has docstring: use docstring
        cls.description = cls.__doc__.strip()
    else:
        # Neither description nor docstring: raise error
        raise TypeError(
            f"{cls.__name__} must define either a 'description' class attribute "
            f"or a docstring"
        )
description instance-attribute
description
do_evaluate classmethod
do_evaluate(*args, **kwargs)
Source code in shared/iam/abac/access_policy.py
@classmethod
@final
def do_evaluate(cls, *args: Any, **kwargs: Any) -> bool:
    start = time.perf_counter()
    result = cls.evaluate(*args, **kwargs)
    duration_ms = (time.perf_counter() - start) * 1000

    result_tag = "success" if result else "failure"
    metrics.distribution(
        "iam.access_policy.evaluation_duration_ms",
        duration_ms,
        tags=[f"policy_id:{cls.policy_id}", f"result:{result_tag}"],
    )

    if result:
        current_logger.debug(
            "Access policy evaluation succeeded", access_policy_id=cls.policy_id
        )
    else:
        current_logger.info(
            "Access policy evaluation failed", access_policy_id=cls.policy_id
        )
    return result
enforce classmethod
enforce(*args, **kwargs)
Source code in shared/iam/abac/access_policy.py
@classmethod
@final
def enforce(cls, *args: Any, **kwargs: Any) -> None:
    if not cls.do_evaluate(*args, **kwargs):
        raise AccessPolicyError(f"Access denied for {cls.policy_id}")
evaluate abstractmethod classmethod
evaluate(*args, **kwargs)
Source code in shared/iam/abac/access_policy.py
@classmethod
@abstractmethod
def evaluate(cls, *args: Any, **kwargs: Any) -> bool:
    raise NotImplementedError
policy_id instance-attribute
policy_id

AccessPolicyError

Bases: IamException, Forbidden

level class-attribute instance-attribute
level = 'warning'

P module-attribute

P = ParamSpec('P')

PolicyEnforcedCallable

Bases: Protocol[P, T_co]

A callable with access policies attached

__call__
__call__(*args, **kwargs)
Source code in shared/iam/abac/access_policy.py
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> T_co: ...

T module-attribute

T = TypeVar('T')

T_co module-attribute

T_co = TypeVar('T_co', covariant=True)

enforce_policy

enforce_policy(*policies)

Evaluate one or more policies and raise PermissionError if any of them fails

Source code in shared/iam/abac/access_policy.py
def enforce_policy(
    *policies: type[AccessPolicy],
) -> Callable[[Callable[P, T]], PolicyEnforcedCallable[P, T]]:
    """Evaluate one or more policies and raise PermissionError if any of them fails"""

    if not policies:
        raise ValueError("At least one policy must be provided")

    # Cache inspection results for all policies
    policy_info = {pol: _get_policy_parameters(pol) for pol in policies}

    def decorator(f: Callable[P, T]) -> PolicyEnforcedCallable[P, T]:
        # Inspect function signature at decoration time for validation
        func_sig = inspect.signature(f)
        func_param_names = set(func_sig.parameters.keys())

        # Validate that all required policy parameters are available in the function
        for pol in policies:
            _validate_policy_params(pol, policy_info, func_param_names, f.__name__)

        @wraps(f)
        def decorated(*args: P.args, **kwargs: P.kwargs) -> T:
            # Use bind_partial - don't crash if some params are missing
            bound_args = func_sig.bind_partial(*args, **kwargs)
            bound_args.apply_defaults()

            # Policy enforcement will fail if REQUIRED params are missing
            for pol in policies:
                _enforce_policy_with_bound_args(pol, policy_info[pol], bound_args)

            return f(*args, **kwargs)

        decorated._access_policies = policies  # type: ignore[attr-defined]  # noqa: ALN027
        decorated._policy_info = policy_info  # type: ignore[attr-defined]  # noqa: ALN027

        return cast("PolicyEnforcedCallable[P, T]", decorated)

    return decorator

enforce_policy_before_request

enforce_policy_before_request(*policies)
Source code in shared/iam/abac/access_policy.py
def enforce_policy_before_request(
    *policies: type[AccessPolicy],
) -> Callable[[], None]:
    @enforce_policy(*policies)
    def before_request() -> None:
        pass

    return before_request

or_

or_(*policies)

Create a disjunction (OR) of multiple policies

Source code in shared/iam/abac/access_policy.py
def or_(*policies: type[AccessPolicy]) -> type[AccessPolicy]:
    """Create a disjunction (OR) of multiple policies"""
    if not policies:
        raise ValueError("At least one policy must be provided to or_()")

    # Cache policy info at creation time
    policies_info = {pol: _get_policy_parameters(pol) for pol in policies}

    class DisjunctionPolicy(AccessPolicy):
        policy_id = f"or_({', '.join(p.policy_id for p in policies)})"
        description = f"Disjunction of: {', '.join(p.policy_id for p in policies)}"

        # Store constituent policies for validation
        _constituent_policies: tuple[type[AccessPolicy], ...] = policies

        @classmethod
        def evaluate(cls, *args: Any, **kwargs: Any) -> bool:
            """Evaluate with OR logic - return True if ANY policy passes"""
            # Try each policy with filtered parameters using cached info
            return any(
                _evaluate_policy_with_args(
                    policy,
                    policies_info[policy],
                    *args,
                    **kwargs,
                )
                for policy in policies
            )

    return DisjunctionPolicy

backoffice

BackofficePermissionAccessPolicy

permitted_for classmethod
permitted_for(permissions, stack_level=2)
Source code in shared/iam/abac/backoffice.py
@classmethod
def permitted_for(
    cls,
    permissions: EmployeePermission | set[EmployeePermission],
    stack_level: int = 2,
) -> type[AccessPolicy]:
    class _BackofficePermissionAccessPolicy(AccessPolicy):
        policy_id = f"backoffice-permission-policy({permissions.name if isinstance(permissions, EmployeePermission) else ', '.join([permission.name for permission in permissions])})"
        description = (
            "Access policy for backoffice (EmployeePermission) permissions"
        )

        @classmethod
        def evaluate(cls) -> bool:
            return current_auth_context.has_backoffice_permissions(
                permissions, stack_level
            )

    return _BackofficePermissionAccessPolicy

shared.iam.auth_context

model

AuthContext dataclass

AuthContext(
    id=uuid.uuid4(),
    _real_principal=None,
    _effective_principal=None,
    _delegate_principal=None,
    session_id=None,
    session_scopes=set(),
    impersonation_mode=None,
    cloudflare_zerotrust_identity=None,
    cloudflare_ray_id=None,
    cloudflare_ip_country=None,
)

Keeps track of all the authentication context i.e. who is logged in, on whose behalf, with which scope(s), etc. Parts of it are persisted in TransactionAuthContext for our audit trail.

NEVER EVER create an AuthContext on your own - only access it via current_auth_context

__post_init__
__post_init__()
Source code in shared/iam/auth_context/model.py
def __post_init__(self) -> None:
    if self._real_principal is not None:
        object.__setattr__(self, "_real_principal_id", self._real_principal.id)
    if self._effective_principal is not None:
        object.__setattr__(
            self, "_effective_principal_id", self._effective_principal.id
        )
    if self._delegate_principal is not None:
        object.__setattr__(
            self, "_delegate_principal_id", self._delegate_principal.id
        )

    if self._real_principal is None != self._effective_principal is None:
        raise ValueError(
            "Effective principal must be set if and only if the real principal is set"
        )
    if self.is_anonymous and self._delegate_principal is not None:
        raise ValueError(
            "Delegate principal can only be set if the real principal is set"
        )
    if self.is_impersonated and self.impersonation_mode is None:
        raise ValueError(
            "Impersonation mode must be set when effective and real principals are different"
        )
cloudflare_ip_country class-attribute instance-attribute
cloudflare_ip_country = None

The 2-letter ISO country code from Cloudflare's geo IP (CF-IPCountry header)

cloudflare_ray_id class-attribute instance-attribute
cloudflare_ray_id = None

The Cloudflare Ray ID - unique identifier for the request

cloudflare_zerotrust_identity class-attribute instance-attribute
cloudflare_zerotrust_identity = None
delegate_principal property
delegate_principal

Get the (optional) principal who is performing the action on behalf of the real principal

delegate_principal_as
delegate_principal_as(principal_class)

Get the (optional) principal who is performing the action on behalf of the real principal, cast as a specific class This will raise an exception if the actual type is not the one expected

Source code in shared/iam/auth_context/model.py
def delegate_principal_as(self, principal_class: type[_T]) -> _T | None:
    """Get the (optional) principal who is performing the action on behalf of the real principal, cast as a specific class
    This will raise an exception if the actual type is not the one expected"""
    if self._delegate_principal is None:
        return None

    if not isinstance(self.delegate_principal, principal_class):
        raise ValueError(
            f"Unexpected principal type: {self.delegate_principal.__class__.__name__} (expected {principal_class.__name__})"
        )

    return self.delegate_principal
effective_principal property
effective_principal

Get the principal who is the subject of the action, i.e. the actor or an impersonated user This will raise an exception if it's not set You need to call is_authenticated or is_anonymous to guard against this

effective_principal_as
effective_principal_as(principal_class)

Get the principal who is the subject of the action, i.e. the actor or an impersonated user, cast as a specific class This will raise an exception if the principal is not set or if the actual type is not the one expected You need to call is_authenticated or is_anonymous to guard against this

Source code in shared/iam/auth_context/model.py
def effective_principal_as(self, principal_class: type[_T]) -> _T:
    """Get the principal who is the subject of the action, i.e. the actor or an impersonated user, cast as a specific class
    This will raise an exception if the principal is not set or if the actual type is not the one expected
    You need to call is_authenticated or is_anonymous to guard against this"""
    if not isinstance(self.effective_principal, principal_class):
        raise ValueError(
            f"Unexpected principal type: {self.effective_principal.__class__.__name__} (expected {principal_class.__name__})"
        )

    return self.effective_principal
from_dict classmethod
from_dict(serialized)
Source code in shared/iam/auth_context/model.py
@classmethod
def from_dict(cls, serialized: dict[str, Any] | None) -> "AuthContext":
    if serialized is None:
        return cls()

    model_registry = DbModel.registry._class_registry
    try:
        real_principal = (
            current_session.get(
                model_registry[serialized["real_principal"]["class"]],  # type: ignore[arg-type]
                serialized["real_principal"]["id"],
            )
            if serialized.get("real_principal")
            else None
        )
    except KeyError:
        raise ValueError(
            f"Invalid serialized AuthContext: real principal class {get_from_path(serialized, ['real_principal', 'class'])} not found in registry"
        )
    try:
        effective_principal = (
            current_session.get(
                model_registry[serialized["effective_principal"]["class"]],  # type: ignore[arg-type]
                serialized["effective_principal"]["id"],
            )
            if serialized.get("effective_principal")
            else None
        )
    except KeyError:
        raise ValueError(
            f"Invalid serialized AuthContext: effective principal class {get_from_path(serialized, ['effective_principal', 'class'])} not found in registry"
        )

    try:
        delegate_principal = (
            current_session.get(
                model_registry[serialized["delegate_principal"]["class"]],  # type: ignore[arg-type]
                serialized["delegate_principal"]["id"],
            )
            if serialized.get("delegate_principal")
            else None
        )
    except KeyError:
        raise ValueError(
            f"Invalid serialized AuthContext: effective principal class {get_from_path(serialized, ['effective_principal', 'class'])} not found in registry"
        )

    return cls(
        UUID(serialized["id"]),
        _real_principal=real_principal,
        _effective_principal=effective_principal,
        _delegate_principal=delegate_principal,
        impersonation_mode=serialized.get("impersonation_mode"),
        session_id=serialized.get("session_id"),
        session_scopes=set(serialized["session_scopes"])
        if "session_scopes" in serialized
        else set(),
        cloudflare_zerotrust_identity=serialized.get(
            "cloudflare_zerotrust_identity"
        ),
        cloudflare_ray_id=serialized.get("cloudflare_ray_id"),
        cloudflare_ip_country=serialized.get("cloudflare_ip_country"),
    )
has_backoffice_permissions
has_backoffice_permissions(permissions, stack_level=2)

For now we only check the permissions of the real principal, i.e. the actor - as there is no impersonation / delegation on backoffice

Source code in shared/iam/auth_context/model.py
def has_backoffice_permissions(
    self,
    permissions: EmployeePermission | set[EmployeePermission],
    stack_level: int = 2,
) -> bool:
    """For now we only check the permissions of the real principal, i.e. the actor - as there is no impersonation / delegation on backoffice"""
    from shared.iam.permissions import permissions_intersects

    if self.is_anonymous:
        return False

    principal = self.real_principal
    actor_permissions = (
        principal.permissions
        if principal.auth_principal_type == AuthPrincipalType.back_office
        and hasattr(principal, "permissions")
        else principal.alan_employee.permissions
        if hasattr(principal, "alan_employee")
        else set()
    )

    return permissions_intersects(
        permissions,
        actor_permissions,
        principal,
        stack_level + 1,
    )
id class-attribute instance-attribute
id = field(default_factory=uuid4)
impersonation_mode class-attribute instance-attribute
impersonation_mode = None
is_anonymous property
is_anonymous
is_authenticated property
is_authenticated
is_delegated property
is_delegated
is_impersonated property
is_impersonated
real_principal property
real_principal

Get the actual principal or 'actor', i.e. who is performing the action This will raise an exception if it's not set You need to call is_authenticated or is_anonymous to guard against this

real_principal_as
real_principal_as(principal_class)

Get the actual principal or 'actor', i.e. who is performing the action, cast as a specific class This will raise an exception if the principal is not set or if the actual type is not the one expected This will raise an exception if it's not set You need to call is_authenticated or is_anonymous to guard against this

Source code in shared/iam/auth_context/model.py
def real_principal_as(self, principal_class: type[_T]) -> _T:
    """Get the actual principal or 'actor', i.e. who is performing the action, cast as a specific class
    This will raise an exception if the principal is not set or if the actual type is not the one expected
    This will raise an exception if it's not set
    You need to call is_authenticated or is_anonymous to guard against this"""
    if not isinstance(self.real_principal, principal_class):
        raise ValueError(
            f"Unexpected principal type: {self.real_principal.__class__.__name__} (expected {principal_class.__name__})"
        )
    return self.real_principal
session_id class-attribute instance-attribute
session_id = None
session_scopes class-attribute instance-attribute
session_scopes = field(default_factory=set)
to_dict
to_dict()
Source code in shared/iam/auth_context/model.py
def to_dict(self) -> dict[str, Any]:
    serialized: dict[str, Any] = {
        "id": str(self.id),
    }
    if self.is_authenticated:
        serialized["real_principal"] = {
            "class": self.real_principal.__class__.__name__,
            "id": str(self._real_principal_id),  # noqa:ALN027
        }
        serialized["effective_principal"] = {
            "class": self.effective_principal.__class__.__name__,
            "id": str(self._effective_principal_id),  # noqa:ALN027
        }
    if self.impersonation_mode:
        serialized["impersonation_mode"] = str(self.impersonation_mode)

    if self.delegate_principal:
        serialized["delegate_principal"] = {
            "class": self.delegate_principal.__class__.__name__,
            "id": str(self._delegate_principal_id),  # noqa: ALN027
        }

    if self.session_id:
        serialized["session_id"] = str(self.session_id)

    if self.session_scopes:
        serialized["session_scopes"] = list(self.session_scopes)
    if self.cloudflare_zerotrust_identity:
        serialized["cloudflare_zerotrust_identity"] = (
            self.cloudflare_zerotrust_identity
        )
    if self.cloudflare_ray_id:
        serialized["cloudflare_ray_id"] = self.cloudflare_ray_id
    if self.cloudflare_ip_country:
        serialized["cloudflare_ip_country"] = self.cloudflare_ip_country

    return serialized

providers

anonymous

AnonymousAuthContextProvider

Bases: AuthContextProvider

AuthContextProvider for anonymous users.

If not explicitly added to the auth context providers chain, anonymous requests will be denied. WARNING: this currently cannot be used in a chain with other providers because it ALWAYS handles the request whereas one and only one provider in a chain is allowed to do it (they must be mutually exclusive) This is because some "anonymous" endpoints actually check e.g. authorization headers so we can't exclude them here This should be solved once all "anonymous" endpoints that actually perform authentication are moved to a proper auth provider context

set_auth_context_from_request
set_auth_context_from_request()
Source code in shared/iam/auth_context/providers/anonymous.py
@override
def set_auth_context_from_request(self) -> None:
    # current_auth_context is anonymous by default
    # In the future we could still store additional context like geo IP, etc.
    return
will_handle_request
will_handle_request()
Source code in shared/iam/auth_context/providers/anonymous.py
@override
def will_handle_request(self) -> bool:
    return True

base

AuthContextProvider

Bases: ABC

Base class for auth context providers

An auth context provider is responsible for extracting the authentication context from the request

get_auth_principal_from_email staticmethod
get_auth_principal_from_email(email, *auth_principal_types)
Source code in shared/iam/auth_context/providers/base.py
@staticmethod
def get_auth_principal_from_email(
    email: str, *auth_principal_types: type[AuthPrincipal]
) -> AuthPrincipal | None:
    user: AuthPrincipal | None = None
    for auth_principal_type in auth_principal_types:
        query_filter: dict[str, str] = dict()
        query_filter[auth_principal_type.__identifier_column__] = email
        user = current_session.scalar(
            select(auth_principal_type).filter_by(**query_filter)
        )
        if user is not None and user.is_active:
            break
    return user
set_auth_context_from_request abstractmethod
set_auth_context_from_request()
Source code in shared/iam/auth_context/providers/base.py
@abstractmethod
def set_auth_context_from_request(self) -> None:
    raise NotImplementedError
will_handle_request abstractmethod
will_handle_request()
Source code in shared/iam/auth_context/providers/base.py
@abstractmethod
def will_handle_request(self) -> bool:
    raise NotImplementedError

dev

DevAuthContextProvider
DevAuthContextProvider(*auth_principal_types)

Bases: AuthContextProvider

Source code in shared/iam/auth_context/providers/dev.py
def __init__(self, *auth_principal_types: type[AuthPrincipal]):
    self.auth_principal_types = auth_principal_types
auth_principal_types instance-attribute
auth_principal_types = auth_principal_types
set_auth_context_from_request
set_auth_context_from_request()
Source code in shared/iam/auth_context/providers/dev.py
@override
def set_auth_context_from_request(self) -> None:
    email: str = f"{get_current_user()}@alan.eu"
    principal = self.get_auth_principal_from_email(
        email, *self.auth_principal_types
    )
    if principal is None:
        abort(403, message=f"Could not find principal with email {email}")
    set_auth_context(real_principal=principal)
    return
will_handle_request
will_handle_request()
Source code in shared/iam/auth_context/providers/dev.py
@override
def will_handle_request(self) -> bool:
    if "authorization" in request.headers:
        return False
    return True

helpers

P module-attribute
P = ParamSpec('P')
T module-attribute
T = TypeVar('T')
set_auth_context_from_provider_chain
set_auth_context_from_provider_chain(
    *auth_context_providers,
)
Source code in shared/iam/auth_context/providers/helpers.py
def set_auth_context_from_provider_chain(
    *auth_context_providers: AuthContextProvider,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    def decorator(f: Callable[P, T]) -> Callable[P, T]:
        @wraps(f)
        def decorated(*args: P.args, **kwargs: P.kwargs) -> T:
            _do_set_auth_context_from_provider_chain(*auth_context_providers)

            return f(*args, **kwargs)

        return decorated

    return decorator
set_auth_context_from_provider_chain_before_request
set_auth_context_from_provider_chain_before_request(
    *auth_context_providers,
)
Source code in shared/iam/auth_context/providers/helpers.py
def set_auth_context_from_provider_chain_before_request(
    *auth_context_providers: AuthContextProvider,
) -> Callable[[], None]:
    @set_auth_context_from_provider_chain(*auth_context_providers)
    def before_request() -> None:
        pass

    return before_request

service_account

ServiceAccountAuthContextProvider
ServiceAccountAuthContextProvider(*auth_principal_types)

Bases: AuthContextProvider

Source code in shared/iam/auth_context/providers/service_account.py
def __init__(self, *auth_principal_types: type[AuthPrincipal]):
    self.auth_principal_types = auth_principal_types
auth_principal_types instance-attribute
auth_principal_types = auth_principal_types
get_email_from_access_token
get_email_from_access_token(access_token)
Source code in shared/iam/auth_context/providers/service_account.py
@cached_for(hours=1)
def get_email_from_access_token(self, access_token: str) -> str:
    from google.oauth2.credentials import Credentials
    from googleapiclient.discovery import build

    credentials = Credentials(access_token)  # type: ignore[no-untyped-call]
    user_info_service = build("oauth2", "v2", credentials=credentials)
    user_info = user_info_service.userinfo().get().execute()

    email = cast("str", user_info["email"])

    return email
set_auth_context_from_request
set_auth_context_from_request()
Source code in shared/iam/auth_context/providers/service_account.py
@override
def set_auth_context_from_request(self) -> None:
    access_token = get_access_token()

    email = self.get_email_from_access_token(access_token)
    if not email.endswith(".gserviceaccount.com"):
        abort(403, message="Token not from service account")

    service_account = self.get_auth_principal_from_email(
        email, *self.auth_principal_types
    )
    if not service_account:
        abort(403, message="Invalid email")

    set_auth_context(
        real_principal=service_account,
        cloudflare_zerotrust_identity=get_zerotrust_identity(),
    )
will_handle_request
will_handle_request()
Source code in shared/iam/auth_context/providers/service_account.py
@override
def will_handle_request(self) -> bool:
    # We'll probably have to add a differentiator e.g. a custom header value
    # to allow multiple auth context providers using the Authorization header
    if "authorization" not in request.headers:
        return False

    return True

webhook

WebhookAuthContextProvider
WebhookAuthContextProvider(
    auth_type,
    header_name,
    secret_name_config_key,
    auth_principal_type,
    auth_principal_email,
)

Bases: AuthContextProvider

Validate a webhook payload and set a static principal value in the authentication context

Typically the principal would be a specific service account corresponding to the 3rd party sending the payload

Source code in shared/iam/auth_context/providers/webhook.py
def __init__(
    self,
    auth_type: WebhookAuthType,
    header_name: str,
    secret_name_config_key: str,
    auth_principal_type: type[AuthPrincipal],
    auth_principal_email: str,
):
    self.auth_type = auth_type
    self.secret_name_config_key = secret_name_config_key
    self.header_name = header_name
    self.auth_principal_type = auth_principal_type
    self.auth_principal_email = auth_principal_email
auth_principal_email instance-attribute
auth_principal_email = auth_principal_email
auth_principal_type instance-attribute
auth_principal_type = auth_principal_type
auth_type instance-attribute
auth_type = auth_type
header_name instance-attribute
header_name = header_name
secret_name_config_key instance-attribute
secret_name_config_key = secret_name_config_key
set_auth_context_from_request
set_auth_context_from_request()
Source code in shared/iam/auth_context/providers/webhook.py
@override
def set_auth_context_from_request(self) -> None:
    header_value = request.headers[self.header_name]

    if self.auth_type == WebhookAuthType.secret:
        secret_value = raw_secret_from_config(self.secret_name_config_key)
        if secret_value is None:
            abort(403, message="Missing secret in config")

        if header_value != secret_value:
            abort(403, message="Invalid webhook secret")

    elif self.auth_type == WebhookAuthType.sha256:
        signing_key = raw_secret_from_config(self.secret_name_config_key)
        if signing_key is None:
            abort(403, message="Missing signing key")

        computed_signature = f"sha256={hmac.new(signing_key.encode('utf-8'), request.data, hashlib.sha256).hexdigest()}"
        payload_signature = header_value
        if computed_signature != payload_signature:
            if is_development_mode():
                current_logger.warning(
                    "Invalid webhook signature (dev mode)",
                    computed_signature=computed_signature,
                    payload_signature=payload_signature,
                )
            else:
                abort(403, message="Invalid webhook signature")

    principal = self.get_auth_principal_from_email(
        self.auth_principal_email, self.auth_principal_type
    )
    if principal is None:
        current_logger.error(
            "Misconfigured service account for WebhookAuthContextProvider",
            service_account_email=self.auth_principal_email,
        )
        # TODO: @olivier.sambourg 2026-06-30 abort webhook incoming requests when service account is not found
        return

    set_auth_context(real_principal=principal)
    return
will_handle_request
will_handle_request()
Source code in shared/iam/auth_context/providers/webhook.py
@override
def will_handle_request(self) -> bool:
    header_value = request.headers.get(self.header_name)
    if header_value is None:
        return False

    return True
WebhookAuthType

Bases: AlanBaseEnum

secret class-attribute instance-attribute
secret = 'secret'
sha256 class-attribute instance-attribute
sha256 = 'sha256'

zero_trust

ZeroTrustAuthContextProvider
ZeroTrustAuthContextProvider(
    *auth_principal_types, aud_config_key=None
)

Bases: AuthContextProvider

Source code in shared/iam/auth_context/providers/zero_trust.py
def __init__(
    self,
    *auth_principal_types: type[AuthPrincipal],
    aud_config_key: str | None = None,
):
    self.aud_config_key = aud_config_key or "ZEROTRUST_JWT_AUDIENCE"
    self.auth_principal_types = auth_principal_types
aud_config_key instance-attribute
aud_config_key = aud_config_key or 'ZEROTRUST_JWT_AUDIENCE'
auth_principal_types instance-attribute
auth_principal_types = auth_principal_types
set_auth_context_from_request
set_auth_context_from_request()
Source code in shared/iam/auth_context/providers/zero_trust.py
@override
def set_auth_context_from_request(self) -> None:
    token = get_zerotrust_authorization_token_payload(
        current_config[self.aud_config_key]
    )
    if token is None:
        abort(403, message="Invalid Zerotrust token")

    email = token.get("email")
    if email is None:
        abort(
            403, message="Zerotrust service token was used without a bearer token"
        )

    user = self.get_auth_principal_from_email(email, *self.auth_principal_types)
    if not user:
        abort(403, message="Invalid email")

    set_auth_context(
        real_principal=user,
        cloudflare_zerotrust_identity=get_zerotrust_identity(),
    )
    return
will_handle_request
will_handle_request()
Source code in shared/iam/auth_context/providers/zero_trust.py
@override
def will_handle_request(self) -> bool:
    if "authorization" in request.headers:
        return False
    if get_zerotrust_authorization_token() is None:
        return False

    return True

shared.iam.auth_principal

AuthPrincipal

Base mixin for all user models which are allowed to authenticate on our apps, i.e. both users (frontoffice) and Alan admins (backoffice) Principals can be individual people, computers, or services - any entity that can be authenticated.

__auth_principal_type__ instance-attribute

__auth_principal_type__

__identifier_column__ class-attribute instance-attribute

__identifier_column__ = 'email'

auth_principal_type property

auth_principal_type

id instance-attribute

id

is_active abstractmethod property

is_active

principal_name abstractmethod property

principal_name

A name that uniquely identifies this principal

AuthPrincipalType

Bases: AlanBaseEnum

back_office class-attribute instance-attribute

back_office = 'back_office'

front_office class-attribute instance-attribute

front_office = 'front_office'

shared.iam.authenticatable

Authenticatable

Bases: AlanModelWithPrivacyMode, LangMixin

alan_employee instance-attribute

alan_employee

auth_token

auth_token(expiration=None, session_id=None)
Source code in shared/iam/authenticatable.py
def auth_token(
    self,
    expiration: int | None = None,
    session_id: uuid.UUID | None = None,
) -> tuple[str, TokenPayload]:
    # Note: session_id is permitted to be None here only for the purposes of
    # the testsuite.

    if session_id is None and not is_test_mode():
        raise ValueError(f"missing session_id in {get_env_name()} environment")

    issued_at = int(time.time())
    expiration = expiration or _AUTH_TOKEN_EXPIRATION_TIME
    token_payload = TokenPayload(
        id=str(self.id),
        session_id=str(session_id or TESTSUITE_SESSION_ID),
        expires_at=str(issued_at + expiration),
        # Note: redundant with the automatically generated exp claim!
    )
    return (
        make_token(
            claims=token_payload.to_dict(),
            issued_at=issued_at,
            expires_in=expiration,
        ),
        token_payload,
    )

authentication_id

authentication_id(identity_id)
Source code in shared/iam/authenticatable.py
@authentication_id.setter
@deprecated("use authentication component instead", category=AlanDeprecationWarning)
def authentication_id(self, identity_id: uuid.UUID | None) -> None:
    log_user_attribute_access(self, "authentication_id")
    self.keycloak_id = identity_id

create_session_tokens_and_make_response

create_session_tokens_and_make_response(
    refresh_token_type,
    code=None,
    initial_response_dict=None,
)
Source code in shared/iam/authenticatable.py
def create_session_tokens_and_make_response(
    self,
    refresh_token_type: RefreshTokenType,
    code: int | None = None,
    initial_response_dict: dict[str, Any] | None = None,
) -> Response:
    set_auth_globals_for_regular_user(self)

    RefreshToken = get_current_class(BaseRefreshToken)
    refresh_token = RefreshToken(user=self, token_type=refresh_token_type)
    current_session.add(refresh_token)
    current_session.commit()

    refresh_token.update_last_activity()
    session_tokens = self._create_session_token(refresh_token)
    response_dict = session_tokens.to_dict()

    if refresh_token.token_type == RefreshTokenType.web:
        # For web clients, we provide the tokens as cookies.
        # For mobile, we return them in the response, and expect them to be used as bearer tokens.
        del response_dict["token"]
        del response_dict["refresh_token"]

    if initial_response_dict:
        response_dict.update(initial_response_dict)
    return self.set_cookie_and_make_response(
        refresh_token=refresh_token.token,
        token=session_tokens.token,
        response_dict=response_dict,
        code=code,
    )

created_at instance-attribute

created_at

email

email(email)
Source code in shared/iam/authenticatable.py
@email.setter
@deprecated(
    "email must be updated by using ProfileService methods",
    category=AlanDeprecationWarning,
)
def email(self, email: str | None) -> None:
    new_email = normalize_and_check_email_address_format("_email", email)

    if new_email == self._email:
        return  # No change

    if (
        new_email is not None
        and is_production_mode()
        and generic_alan_email_re.match(new_email) is not None
    ):
        # Only raise in production - no need to control for other environments
        # (this way, no need to fix about 400 tests and will keep acceptance tests easy)
        raise ModelValidationError("Forbidden usage of restricted email address")

    self._email = new_email

email_address_conflict_filter classmethod

email_address_conflict_filter(email)
Source code in shared/iam/authenticatable.py
@classmethod
def email_address_conflict_filter(cls, email: str) -> "Query":  # type: ignore[type-arg]
    return current_session.query(cls).filter_by(_email=email)  # noqa: ALN085

first_name

first_name(first_name)
Source code in shared/iam/authenticatable.py
@first_name.setter
@deprecated(
    "first_name must be updated by using ProfileService methods",
    category=AlanDeprecationWarning,
)
def first_name(self, first_name: str | None) -> None:
    if first_name == self._first_name:
        return
    first_name = validates_name("first_name", first_name)
    self._first_name = first_name

generate_global_password_reset_email classmethod

generate_global_password_reset_email(
    email, client_id, redirect_uri
)

Generate a password reset email through our identity provider

Attributes:

Name Type Description
email str

The email address of the user to trigger a password reset

client_id str

Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")

redirect_uri str | None

The URI to redirect to after performing password reset

Source code in shared/iam/authenticatable.py
@classmethod
def generate_global_password_reset_email(
    cls, email: str, client_id: str, redirect_uri: str | None
) -> None:
    """
    Generate a password reset email through our identity provider

    Attributes:
        email (str): The email address of the user to trigger a password reset
        client_id (str): Identity provider client ID (e.g. "alan-mobile-prod", "fr-web-prod")
        redirect_uri (str | None): The URI to redirect to after performing password reset
    """
    # TODO: @thibaut.caillierez: migrate to use the authentication component

    identity_provider = get_identity_provider()
    identity_provider.generate_password_reset_email(
        email=email, client_id=client_id, redirect_uri=redirect_uri
    )

generate_password_reset_token classmethod

generate_password_reset_token(email)

Generate a password reset token (legacy). This option is deprecated as we are progressively using Keycloak for this. Cf. See Authenticatable.generate_global_password_reset_email

Note: If a user doesn't have an auth identity, it gets created before generating the password reset token.

Attributes:

Name Type Description
email str

The email address of the user to generate a password reset token for

Source code in shared/iam/authenticatable.py
@classmethod
@deprecated(
    "Password reset is now done via Keycloak, use generate_global_password_reset_email instead",
    category=AlanDeprecationWarning,
)
def generate_password_reset_token(
    cls, email: str
) -> tuple["Authenticatable | None", str | bytes | None]:
    """
    Generate a password reset token (legacy).
    This option is deprecated as we are progressively using Keycloak for this.
    Cf. See Authenticatable.generate_global_password_reset_email

    Note: If a user doesn't have an auth identity, it gets created before generating the password reset token.

    Attributes:
        email (str): The email address of the user to generate a password reset token for
    """
    from components.authentication.public.api import (  # noqa: ALN002
        AuthenticationService,
    )

    authentication_service = AuthenticationService.create()

    parsed_email = normalize_email_address_format(email)

    user = current_session.query(cls).filter_by(email=parsed_email).one_or_none()  # noqa: ALN085
    if user is None:
        return None, None

    auth_identity = authentication_service.get_keycloak_identity_by_profile_id(
        user.profile_id  # type: ignore[attr-defined]
    )
    if auth_identity is None:
        current_logger.warning(
            "Generating a password reset token for a user without an identity"
        )
        auth_identity_id = authentication_service.create_identity(
            email=email,
            profile_id=user.profile_id,  # type: ignore[attr-defined]
            language=user.lang,
            first_name=user.first_name,
            last_name=user.last_name,
        )
        current_session.commit()
        auth_identity = authentication_service.get_or_raise_keycloak_identity(
            auth_identity_id
        )

    token = generate_password_reset_token(auth_identity)

    return user, token

get_by_token classmethod

get_by_token(token)
Source code in shared/iam/authenticatable.py
@classmethod
def get_by_token(cls, token: str) -> Optional["Authenticatable"]:
    user, _, _ = cls.parse_token(token)
    return user

id instance-attribute

id

keycloak_id

keycloak_id(identity_id)
Source code in shared/iam/authenticatable.py
@keycloak_id.setter
@deprecated("use authentication component instead", category=AlanDeprecationWarning)
def keycloak_id(self, identity_id: uuid.UUID | None) -> None:
    log_user_attribute_access(self, "keycloak_id")
    self._keycloak_id = identity_id

last_name

last_name(last_name)
Source code in shared/iam/authenticatable.py
@last_name.setter
@deprecated(
    "last_name must be updated by using ProfileService methods",
    category=AlanDeprecationWarning,
)
def last_name(self, last_name: str | None) -> None:
    if last_name == self._last_name:
        return

    last_name = validates_name("last_name", last_name)
    self._last_name = last_name

parse_token classmethod

parse_token(token)

Parse the access token and return the user, session-id and issued-at timestamp if valid (or all None if invalid)

Source code in shared/iam/authenticatable.py
@classmethod
def parse_token(
    cls, token: str
) -> tuple[Optional["Authenticatable"], str | None, int | None]:
    """Parse the access token and return the user, session-id and issued-at timestamp if valid (or all None if invalid)"""
    if not token:
        return None, None, None

    try:
        claims = jwt.decode(token, current_config["SECRET_KEY"])
        claims.validate()
    except JoseError:
        current_logger.warning("invalid Alan token", exc_info=True)
        return None, None, None

    ctx_parse_token = UserContextData(
        key=str(claims["id"]),  # NANI!
        kind="user",
    )
    use_cache_user_id = bool_feature_flag(
        feature_flag_key="killswitch-global-backend-cache-user-id-parse-token",
        context_data=ctx_parse_token,
        default_value=False,
    )
    if use_cache_user_id:
        user = get_and_index_object_in_session(current_session, cls, claims["id"])  # type: ignore[type-var]
    else:
        user = current_session.get(cls, claims["id"])

    return user, claims.get("session_id", None), claims.get("iat", None)

refresh_session_tokens

refresh_session_tokens(token)
Source code in shared/iam/authenticatable.py
def refresh_session_tokens(
    self,
    token: str,
) -> SessionTokens:
    # Refreshing session: mint a new access token, preserving the existing session id
    # retrieved through the refresh token (or assigning a new one if previously not present).

    try:
        refresh_token = get_current_class(
            BaseRefreshToken
        ).find_refresh_token_by_token(token)
    except Exception:
        current_logger.error(
            "could not find session to refresh",
            session_id=_get_session_id_from_headers(),
            exc_info=True,
        )
        raise

    if not refresh_token.is_active:
        current_logger.debug(
            "attempt to refresh inactive session",
            session_id=refresh_token.session_id,
        )
        raise BaseErrorCode.user_session_is_expired()
    current_logger.info("refreshing tokens", session_id=refresh_token.session_id)
    refresh_token.mint_new_token(token)
    refresh_token.update_last_activity()
    token, token_payload = self.auth_token(session_id=refresh_token.session_id)

    publish_user_session_refreshed(
        user=self,
        session_id=refresh_token.session_id,
        session_type=refresh_token.token_type,
    )

    return SessionTokens(
        token=token,
        refresh_token=refresh_token.next_token or refresh_token.token,
        token_payload=token_payload,
    )

revoke_all_tokens

revoke_all_tokens()
Source code in shared/iam/authenticatable.py
def revoke_all_tokens(self) -> None:
    RefreshTokenModel = get_current_class(BaseRefreshToken)
    try:
        deleted = (
            current_session.query(RefreshTokenModel)  # noqa: ALN085
            .filter_by(user_id=self.id)
            .delete()
        )
        current_session.commit()
        current_logger.info(
            f"Revoked {deleted} refresh tokens for user {self.id}",
            user_id=self.id,
        )

    except Exception as e:
        current_session.rollback()
        raise e
set_cookie_and_make_response(
    refresh_token, token, code=None, response_dict=None
)
Source code in shared/iam/authenticatable.py
def set_cookie_and_make_response(
    self,
    refresh_token: str,
    token: str,
    code: int | None = None,
    response_dict: dict[str, Any] | None = None,
) -> Response:
    response = make_json_response(response_dict, code=code)
    self.set_session_cookie(
        refresh_token=refresh_token, token=token, response=response
    )
    return response
set_session_cookie(refresh_token, token, response)
Source code in shared/iam/authenticatable.py
def set_session_cookie(
    self, refresh_token: str, token: str, response: Response
) -> None:
    use_secure_cookie = not is_development_mode()
    # We have some preview deployments on `.fly.dev` domain, needing a Lax policy
    samesite = "Lax" if is_stage_mode() else "Strict"
    response.set_cookie(
        "refresh_token",
        value=refresh_token,
        samesite=samesite,
        secure=use_secure_cookie,
        httponly=True,
        path="/auth/refresh",
    )
    response.set_cookie(
        "token",
        value=token,
        samesite=samesite,
        secure=use_secure_cookie,
        httponly=True,
    )

updated_at instance-attribute

updated_at

validate_email_address

validate_email_address(key, address)
Source code in shared/iam/authenticatable.py
@validates("_email")
def validate_email_address(self, key, address) -> str | None:  # type: ignore[no-untyped-def]
    normalized_address = normalize_and_check_email_address_format(key, address)

    # Check for conflict early in validator even though there is a unique constraint on the column, otherwise
    # the check is done at flush time, and we already have propagated the change to the identity provider.

    self._check_email_address_conflict(normalized_address)
    return normalized_address  # type: ignore[no-any-return]

validate_names

validate_names(_, string)
Source code in shared/iam/authenticatable.py
@validates("_first_name", "_last_name")
def validate_names(self, _: str, string: str | None) -> str | None:
    return validates_trim_spaces(string)

validate_refresh_tokens

validate_refresh_tokens(key, refresh_token)
Source code in shared/iam/authenticatable.py
@validates("refresh_tokens")
def validate_refresh_tokens(
    self,
    key: str,  # noqa: ARG002
    refresh_token: "BaseRefreshToken",
) -> "BaseRefreshToken":
    RefreshTokenModel = get_current_class(BaseRefreshToken)
    for old_token in (
        current_session.query(RefreshTokenModel)  # noqa: ALN085
        .with_parent(self)
        .order_by(RefreshTokenModel.updated_at.desc())
        .offset(MAX_REFRESH_TOKENS_PER_USER - 1)
    ):
        current_session.delete(old_token)
    current_session.commit()
    return refresh_token

verification_error staticmethod

verification_error(attribute, desc, user_cls_name=None)
Source code in shared/iam/authenticatable.py
@staticmethod
def verification_error(
    attribute: str, desc: str | None, user_cls_name: str | None = None
) -> BaseErrorCode:
    return BaseErrorCode.login_error(
        attribute=attribute, desc=desc, user_cls_name=user_cls_name
    )

verify_keycloak_access_token classmethod

verify_keycloak_access_token(token)
Source code in shared/iam/authenticatable.py
@classmethod
def verify_keycloak_access_token(
    cls, token: str
) -> tuple[Optional["Authenticatable"], Optional["BaseErrorCode"]]:
    from ddtrace.appsec import track_user_sdk

    identity_provider = get_identity_provider()
    identity_provider_user_id = identity_provider.find_identity_id_from_token(token)

    if identity_provider_user_id is None:
        # TODO: send email in the request params to track it
        track_user_sdk.track_login_failure("", exists=False)
        return None, Authenticatable.verification_error(
            "access_token", "Error while verifying the identity provider token"
        )

    user = (
        current_session.query(cls)  # noqa: ALN085
        .filter_by(authentication_id=identity_provider_user_id)
        .one_or_none()
    )

    if user is None:
        track_user_sdk.track_login_failure(
            "",
            exists=False,
            metadata={"keycloak_id": identity_provider_user_id},
        )
        return None, Authenticatable.verification_error(
            "keycloak_id", "No user found with this Keycloak ID"
        )

    email = mandatory(user.email)
    track_user_sdk.track_login_success(
        email, user_id=user.id, metadata={"usr.email": email}
    )
    return user, None

verify_refresh_token staticmethod

verify_refresh_token(token)
Source code in shared/iam/authenticatable.py
@staticmethod
def verify_refresh_token(token: str) -> Optional["Authenticatable"]:
    if not token:
        current_logger.warning(
            "missing refresh token",
            session_id=_get_session_id_from_headers(),
        )
        return None

    RefreshTokenModel = get_current_class(BaseRefreshToken)
    try:
        refresh_token = RefreshTokenModel.find_refresh_token_by_token(token)
        return refresh_token.user

    except Exception:
        current_logger.warning(
            "invalid refresh token",
            session_id=_get_session_id_from_headers(),
        )
        return None

TESTSUITE_SESSION_ID module-attribute

TESTSUITE_SESSION_ID = 'no-session-id'

UserId module-attribute

UserId = Union[int, UUID]

jwt module-attribute

jwt = JsonWebToken([JWT_ALGORITHM])

shared.iam.authorization

AlanerAdminStrategy

AlanerAdminStrategy(permitted_for=None)

Bases: BaseAuthorizationStrategy

Used for authorization on resources only accessible by Alaners (like marmot, flask-admin, etc.)

Source code in shared/iam/authorization.py
def __init__(
    self,
    # Any permission in this set will allow the actor to access the resource (not _all_ of these permissions are required)
    permitted_for: set[EmployeePermission] | None = None,
) -> None:
    super().__init__()
    if not permitted_for:
        raise Exception(
            "You must specify at least one permission that is required to access an endpoint with AlanerAdminStrategy"
            " (Action: add EmployeePermission.xxx to permitted_for arguments list)"
        )

    self.permitted_for = permitted_for

authentication_required class-attribute instance-attribute

authentication_required = True

authorize

authorize(Controller, method_name, call_args, call_kwargs)
Source code in shared/iam/authorization.py
@contextmanager
def authorize(self, Controller, method_name, call_args, call_kwargs):  # type: ignore[no-untyped-def]  # noqa: ARG002
    with self._ensure_custom_authorization():
        if not is_alaner_admin_with_permissions_or_raise(
            self.permitted_for,
            operation=f"{Controller.name if hasattr(Controller, 'name') else Controller.__name__}.{method_name}",
        ):
            raise RestrictToPermissionsExceptionNotFound()
        yield

ensure_custom_authorization class-attribute instance-attribute

ensure_custom_authorization = False

permitted_for instance-attribute

permitted_for = permitted_for

AuthenticatedStrategy

AuthenticatedStrategy(allow_deep_link=False)

Bases: BaseAuthorizationStrategy

Source code in shared/iam/authorization.py
def __init__(self, allow_deep_link: bool = False) -> None:
    self.allow_deep_link = allow_deep_link

authentication_required class-attribute instance-attribute

authentication_required = True

authorize

authorize(Controller, method_name, call_args, call_kwargs)
Source code in shared/iam/authorization.py
@contextmanager
def authorize(self, Controller, method_name, call_args, call_kwargs):  # type: ignore[no-untyped-def]  # noqa: ARG002
    with self._ensure_custom_authorization():
        yield

ensure_custom_authorization class-attribute instance-attribute

ensure_custom_authorization = False

AuthenticatedWithCustomAuthorizationStrategy

AuthenticatedWithCustomAuthorizationStrategy(
    allow_deep_link=False,
)

Bases: AuthenticatedStrategy

Source code in shared/iam/authorization.py
def __init__(self, allow_deep_link: bool = False) -> None:
    self.allow_deep_link = allow_deep_link

authentication_required class-attribute instance-attribute

authentication_required = True

ensure_custom_authorization class-attribute instance-attribute

ensure_custom_authorization = True

BaseAuthorizationStrategy

BaseAuthorizationStrategy(allow_deep_link=False)

Bases: ABC

Source code in shared/iam/authorization.py
def __init__(self, allow_deep_link: bool = False) -> None:
    self.allow_deep_link = allow_deep_link
allow_deep_link = allow_deep_link

authentication_required class-attribute instance-attribute

authentication_required = True

authenticator

authenticator()
Source code in shared/iam/authorization.py
def authenticator(self) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]:
    return TokenAuth(
        optional=not self.authentication_required,
        allow_deep_link=self.allow_deep_link,
        cookie_name="token",
    ).login_required

authorize abstractmethod

authorize(Controller, method_name, call_args, call_kwargs)
Source code in shared/iam/authorization.py
@abstractmethod
@contextmanager
def authorize(self, Controller, method_name, call_args, call_kwargs):  # type: ignore[no-untyped-def]
    # NOTE: Wrap your code into the self._ensure_custom_authorization context manager to support ensure_custom_authorization
    raise NotImplementedError(
        "abstract method must be overridden, supporting ensure_custom_authorization"
    )

ensure_custom_authorization class-attribute instance-attribute

ensure_custom_authorization = False

OpenStrategy

OpenStrategy(allow_deep_link=False)

Bases: AuthenticatedStrategy

Source code in shared/iam/authorization.py
def __init__(self, allow_deep_link: bool = False) -> None:
    self.allow_deep_link = allow_deep_link

authentication_required class-attribute instance-attribute

authentication_required = False

ensure_custom_authorization class-attribute instance-attribute

ensure_custom_authorization = False

OwnerOnlyStrategy

OwnerOnlyStrategy(
    owner_bypass_permitted_for=None, allow_deep_link=False
)

Bases: BaseAuthorizationStrategy

Strategy that delegates authorization to the owner_only strategy provided by the controller it applies to.

Source code in shared/iam/authorization.py
def __init__(
    self,
    owner_bypass_permitted_for: set[EmployeePermission] | None = None,
    allow_deep_link: bool = False,
) -> None:
    super().__init__(allow_deep_link=allow_deep_link)
    self.owner_bypass_permitted_for = owner_bypass_permitted_for

authentication_required class-attribute instance-attribute

authentication_required = True

authorize

authorize(Controller, method_name, call_args, call_kwargs)
Source code in shared/iam/authorization.py
@contextmanager
def authorize(self, Controller, method_name, call_args, call_kwargs):  # type: ignore[no-untyped-def]
    with self._ensure_custom_authorization():
        if not hasattr(Controller, "owner_only"):
            raise BaseErrorCode.authorization_error(
                message=f"Unable to check owner for Controller {Controller}"
            )
        # we're calling the method here, after checking it exists
        owner_only_strategy: BaseAuthorizationStrategy = Controller.owner_only(  # noqa: ALN046
            param_name="id",
            owner_bypass_permitted_for=self.owner_bypass_permitted_for,
            allow_deep_link=self.allow_deep_link,
        )
        with owner_only_strategy.authorize(
            Controller=Controller,
            method_name=method_name,
            call_args=call_args,
            call_kwargs=call_kwargs,
        ):
            yield

ensure_custom_authorization class-attribute instance-attribute

ensure_custom_authorization = False

owner_bypass_permitted_for instance-attribute

owner_bypass_permitted_for = owner_bypass_permitted_for

admin_can_access_sensitive_company

admin_can_access_sensitive_company()
Source code in shared/iam/authorization.py
def admin_can_access_sensitive_company():  # type: ignore[no-untyped-def]
    try:
        can_access = is_alaner_admin_with_permissions_or_raise(
            {EmployeePermission.access_sensitive_company},
            "checking company access rights",
        )
    except RestrictToPermissionsExceptionForbidden:
        can_access = False

    return can_access

admin_can_access_sensitive_user

admin_can_access_sensitive_user()
Source code in shared/iam/authorization.py
def admin_can_access_sensitive_user():  # type: ignore[no-untyped-def]
    try:
        can_access = is_alaner_admin_with_permissions_or_raise(
            {EmployeePermission.access_sensitive_user},
            "checking user access rights",
        )

    except RestrictToPermissionsExceptionForbidden:
        can_access = False

    return can_access

check_is_owner

check_is_owner(
    controller,
    object_id,
    operation,
    owner_bypass_permitted_for=None,
)
Source code in shared/iam/authorization.py
def check_is_owner(  # type: ignore[no-untyped-def]
    controller,
    object_id,
    operation: str,
    owner_bypass_permitted_for: set[EmployeePermission] | None = None,
):
    owner_bypass_permitted_for = owner_bypass_permitted_for or set()
    check_access = (
        controller.can_read if request.method in ["GET"] else controller.can_write
    )

    if check_access(g.current_user, object_id) or (
        is_alaner_admin_with_permissions_or_raise(
            owner_bypass_permitted_for, operation=operation
        )
    ):
        return

    raise RestrictToPermissionsExceptionNotFound()

custom_authorization

custom_authorization(authorization_function)

This decorator is used to wrap custom authorization functions that are used in the context of AuthenticatedWithCustomAuthorizationStrategy.

Source code in shared/iam/authorization.py
def custom_authorization(
    authorization_function: Callable[_P, bool],
) -> Callable[_P, None]:
    """
    This decorator is used to wrap custom authorization functions that are used in
    the context of AuthenticatedWithCustomAuthorizationStrategy.
    """

    @functools.wraps(authorization_function)
    def wrapper(*args, **kwargs):  # type: ignore[no-untyped-def]
        is_authorized = authorization_function(*args, **kwargs)

        if is_authorized is not True:
            raise RestrictToPermissionsExceptionNotFound()
        else:
            _authenticated_with_custom_authorization_strategy_has_run.set(True)

    return cast("Callable[_P, None]", wrapper)

shared.iam.bearer_token_auth

BearerTokenAuth

BearerTokenAuth(
    optional,
    allow_deep_link=False,
    restrict_location_for_alaner_admin=True,
)

Bases: HTTPTokenAuth

Bearer token authentication

This class authenticates a request based on the presence of a valid Bearer token.

Normally, a request without a bearer token is simply rejected as unauthenticated. This happens in particular when an endpoint is browsed directly from a browser (through a deep link to the endpoint).

If an endpoint is marked as allowing deep links, instead of rejecting requests without a token, we redirect them to a special route in the front-end, which will allow the user to authenticate, and then make the same request again, this time with a proper token, and present the result to the user.

Source code in shared/iam/bearer_token_auth.py
def __init__(
    self,
    optional: bool,
    allow_deep_link: bool = False,
    restrict_location_for_alaner_admin: bool = True,
) -> None:
    super().__init__(verify_token_callback=self.verify_token)
    self.optional = optional
    self.allow_deep_link = allow_deep_link
    self.restrict_location_for_alaner_admin = restrict_location_for_alaner_admin
allow_deep_link = allow_deep_link

optional instance-attribute

optional = optional

restrict_location_for_alaner_admin instance-attribute

restrict_location_for_alaner_admin = (
    restrict_location_for_alaner_admin
)

verify_token

verify_token(token)
Source code in shared/iam/bearer_token_auth.py
def verify_token(self, token: str):  # type: ignore[no-untyped-def]
    reset_auth_globals()

    self._token = token
    # Save token value so that error handler can test it

    # Reject attempt to access a backend endpoint through the front-end route
    # for deep links (identified by the special BACKEND_DEEPLINK_HEADER) if this
    # endpoint is not deep-linkable (allow_deep_link flag).

    if BACKEND_DEEPLINK_HEADER in request.headers and not self.allow_deep_link:
        return False

    if claims_to_be_alaner_admin():  # type: ignore[no-untyped-call]
        return self._verify_auth_token_for_alaner_admin_user(
            token,
            restrict_location=self.restrict_location_for_alaner_admin,
            non_blocking=self.optional,
        )
    else:
        return self._verify_auth_token_for_regular_user(
            token, non_blocking=self.optional
        )

shared.iam.cookie_token_auth

CookieTokenAuth

CookieTokenAuth(cookie_name, verify_token_callback)
Source code in shared/iam/cookie_token_auth.py
def __init__(self, cookie_name, verify_token_callback) -> None:  # type: ignore[no-untyped-def]
    self.cookie_name = cookie_name
    self.verify_token_callback = verify_token_callback

cookie_name instance-attribute

cookie_name = cookie_name

login_required

login_required(f)
Source code in shared/iam/cookie_token_auth.py
def login_required(self, f):  # type: ignore[no-untyped-def]
    @wraps(f)
    def decorated(*args, **kwargs):  # type: ignore[no-untyped-def]
        with tracer.trace(
            "shared.iam.cookie_token_auth.CookieTokenAuth.login_required check"
        ):
            cookie = request.cookies.get(self.cookie_name, "")
            try:
                token_is_verified = self.verify_token_callback(cookie)
            except BaseErrorCode as exc:
                return make_json_response(
                    data=dict(exc),
                    code=401,
                )
            if not token_is_verified:
                return make_json_response(
                    data=dict(BaseErrorCode.cookie_token_error()),
                    code=401,
                )

        return f(*args, **kwargs)

    return decorated

verify_token_callback instance-attribute

verify_token_callback = verify_token_callback

shared.iam.data

tokens

SessionTokens dataclass

SessionTokens(token, refresh_token, token_payload)

Bases: DataClassJsonMixin

refresh_token instance-attribute
refresh_token
to_dict
to_dict(encode_json=False)
Source code in shared/iam/data/tokens.py
def to_dict(self, encode_json: bool = False):  # type: ignore[no-untyped-def]  # noqa: ARG002
    return asdict(self)
token instance-attribute
token
token_payload instance-attribute
token_payload

TokenPayload dataclass

TokenPayload(id, session_id, expires_at)

Bases: DataClassJsonMixin

expires_at instance-attribute
expires_at
id instance-attribute
id
session_id instance-attribute
session_id
to_dict
to_dict(encode_json=False)
Source code in shared/iam/data/tokens.py
def to_dict(self, encode_json: bool = False):  # type: ignore[no-untyped-def]  # noqa: ARG002
    return asdict(self)

TokenResponse dataclass

TokenResponse(
    access_token,
    expires_in,
    refresh_expires_in,
    refresh_token,
    token_type,
    session_state,
    scope,
)

Bases: DataClassJsonMixin

access_token instance-attribute
access_token
expires_in instance-attribute
expires_in
refresh_expires_in instance-attribute
refresh_expires_in
refresh_token instance-attribute
refresh_token
scope instance-attribute
scope
session_state instance-attribute
session_state
token_type instance-attribute
token_type

shared.iam.enum

authentication_type

AuthenticationType

Bases: AlanBaseEnum

bearer class-attribute instance-attribute
bearer = 'bearer'
cookie class-attribute instance-attribute
cookie = 'cookie'

shared.iam.exceptions

IamException

Base class for IAM exceptions

RestrictAlanEmployeeInvalid

RestrictAlanEmployeeInvalid(*args, **kwargs)

Bases: Forbidden, IamException

Error: Token validated, but invalid user (cannot recover, show Forbidden 403)

Source code in shared/iam/exceptions.py
def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
    Forbidden.__init__(self, *args, **kwargs)
    self.level = "error"

level instance-attribute

level = 'error'

RestrictAlanerAdminExceptionNotFound

RestrictAlanerAdminExceptionNotFound(*args, **kwargs)

Bases: NotFound, IamException

Source code in shared/iam/exceptions.py
def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
    NotFound.__init__(self, *args, **kwargs)
    self.level = "warning"

level instance-attribute

level = 'warning'

RestrictInvalidToken

RestrictInvalidToken(*args, **kwargs)

Bases: Forbidden, IamException

Warning: Invalid token which is expired or malformed (can recover, restart authentication cinematic)

Source code in shared/iam/exceptions.py
def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
    Forbidden.__init__(self, *args, **kwargs)
    self.level = "warning"

level instance-attribute

level = 'warning'

RestrictToAlmerysExceptionNotFound

RestrictToAlmerysExceptionNotFound(*args, **kwargs)

Bases: NotFound, IamException

Source code in shared/iam/exceptions.py
def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
    NotFound.__init__(self, *args, **kwargs)
    self.level = "warning"

level instance-attribute

level = 'warning'

RestrictToPermissionsExceptionForbidden

RestrictToPermissionsExceptionForbidden(*args, **kwargs)

Bases: Forbidden, IamException

Source code in shared/iam/exceptions.py
def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
    NotFound.__init__(self, *args, **kwargs)
    self.level = "warning"

level instance-attribute

level = 'warning'

RestrictToPermissionsExceptionNotFound

RestrictToPermissionsExceptionNotFound(*args, **kwargs)

Bases: NotFound, IamException

Source code in shared/iam/exceptions.py
def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
    NotFound.__init__(self, *args, **kwargs)
    self.level = "warning"

level instance-attribute

level = 'warning'

RestrictToWhitelistedIpsNotFound

RestrictToWhitelistedIpsNotFound(*args, **kwargs)

Bases: NotFound, IamException

Source code in shared/iam/exceptions.py
def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
    NotFound.__init__(self, *args, **kwargs)
    self.level = "warning"

level instance-attribute

level = 'warning'

RestrictToZerotrustAuthenticatedUserNotFound

RestrictToZerotrustAuthenticatedUserNotFound(
    *args, **kwargs
)

Bases: NotFound, IamException

Source code in shared/iam/exceptions.py
def __init__(self, *args, **kwargs) -> None:  # type: ignore[no-untyped-def]
    NotFound.__init__(self, *args, **kwargs)
    self.level = "warning"

level instance-attribute

level = 'warning'

shared.iam.gdpr

AMPLITUDE_ENDPOINT module-attribute

AMPLITUDE_ENDPOINT = (
    "https://amplitude.com/api/2/deletions/users"
)

AMPLITUDE_HEADERS module-attribute

AMPLITUDE_HEADERS = {
    "Content-Type": "application/json",
    "Accept": "application/json",
}

gdpr_erasure

gdpr_erasure(user, profile_service, authentication_service)
Source code in shared/iam/gdpr.py
def gdpr_erasure(
    user: Authenticatable,
    profile_service: "ProfileService",
    authentication_service: "AuthenticationService",
) -> None:
    if hasattr(user, "profile_id"):
        profile = profile_service.get_profile(user.profile_id)

        emails = [getattr(user, "pro_email", None)]
        if profile is not None and profile.email is not None:
            emails.append(profile.email)
    else:
        emails = [user.email, getattr(user, "pro_email", None)]

    if hasattr(user, "has_opted_in_to_commercial_emails"):
        user.has_opted_in_to_commercial_emails = False
        current_session.commit()
        current_logger.info("User unsubscribed from commercial emails")

    redacted_email = invalidate_email(user.id, reason="deleted-user")
    if profile_id := getattr(user, "profile_id", None):
        profile_service.delete_email(
            profile_id=profile_id, redacted_value=redacted_email
        )
        current_logger.info(f"Redacted _email to {redacted_email}")

        user_identity = authentication_service.get_keycloak_identity_by_profile_id(
            profile_id=profile_id
        )
        if user_identity:
            authentication_service.delete_identity(user_identity.id)
            current_logger.info(
                f"Deleted identity {user_identity.id} for user {user.id}"
            )
        current_session.commit()

    user.revoke_all_tokens()
    remove_tracking(str(user.id), emails)

remove_tracking

remove_tracking(user_id, emails)
Source code in shared/iam/gdpr.py
def remove_tracking(user_id: str, emails: list[str | None]) -> None:
    # CustomerIO
    customer_io_client = BaseCustomerIoClient()

    customer_io_client.delete(customer_id=user_id)
    for email in emails:
        if email:
            customer_io_client.delete(customer_id=email)

    current_logger.info("Member deleted in CustomerIO")

    # Amplitude
    auth = _get_amplitude_auth_from_env()

    if alan_employee := g.current_user.alan_employee:
        requester = alan_employee.alan_email
    else:
        raise Exception("Only Alan employees can delete users in Amplitude")

    r = requests.post(
        url=AMPLITUDE_ENDPOINT,
        auth=auth,
        data=json.dumps({"user_ids": [user_id], "requester": requester}),
        headers=AMPLITUDE_HEADERS,
        timeout=60,
    )
    try:
        current_logger.info(f"Member deleted in Amplitude: {r.json()}")
    except JSONDecodeError:
        current_logger.info(f"Response from Amplitude: {r.status_code} - {r.text}")

    # Segment
    SegmentPublicAPIClient().delete_user(user_id)
    current_logger.info("Member deletion request submitted to Segment")

shared.iam.global_authorization

GlobalAuthorizationStrategies

GlobalAuthorizationStrategies()

Bases: IsomorphicAuthorizationStrategies

Source code in shared/iam/global_authorization.py
def __init__(self) -> None:
    app_name = get_current_app_name()
    self.refresh_token_cls = get_current_class(BaseRefreshToken)
    match app_name:
        case AppName.ALAN_FR:
            from components.fr.public.auth.authorization import (  # noqa: ALN002
                AuthorizationStrategies as FrAuthorizationStrategies,
            )

            self.open = FrAuthorizationStrategies.open
            self.authenticated = FrAuthorizationStrategies.authenticated
            self.owner_only = FrAuthorizationStrategies.owner_only  # noqa: ALN046
            self.alaner_admin = FrAuthorizationStrategies.alaner_admin

        case AppName.ALAN_BE:
            from components.be.public.auth.authorization import (  # noqa: ALN002
                AuthorizationStrategies as BeAuthorizationStrategies,
            )

            self.open = BeAuthorizationStrategies.open
            self.authenticated = BeAuthorizationStrategies.authenticated
            self.owner_only = BeAuthorizationStrategies.owner_only  # noqa: ALN046
            self.alaner_admin = BeAuthorizationStrategies.alaner_admin
        case AppName.ALAN_ES:
            from components.es.public.auth.authorization import (  # noqa: ALN002
                AuthorizationStrategies as EsAuthorizationStrategies,
            )

            self.open = EsAuthorizationStrategies.open
            self.authenticated = EsAuthorizationStrategies.authenticated
            self.owner_only = EsAuthorizationStrategies.owner_only  # noqa: ALN046
            self.alaner_admin = EsAuthorizationStrategies.alaner_admin
        case AppName.ALAN_CA:
            from components.ca.public.auth.authorization import (  # noqa: ALN002
                AuthorizationStrategies as CaAuthorizationStrategies,
            )

            self.open = CaAuthorizationStrategies.open
            self.authenticated = CaAuthorizationStrategies.authenticated
            self.owner_only = CaAuthorizationStrategies.owner_only  # noqa: ALN046
            self.alaner_admin = CaAuthorizationStrategies.alaner_admin
        case AppName.SHARED_TESTING:
            from shared.iam.tests.authorisation_strategies import (
                AuthorizationStrategies as TestAuthorizationStrategies,
            )

            self.open = TestAuthorizationStrategies.open
            self.authenticated = TestAuthorizationStrategies.authenticated
            self.owner_only = TestAuthorizationStrategies.owner_only  # noqa: ALN046
            self.alaner_admin = TestAuthorizationStrategies.alaner_admin

        case _:
            raise NotImplementedError(f"Unknown app: {app_name}")

alaner_admin instance-attribute

alaner_admin = alaner_admin

authenticated instance-attribute

authenticated = authenticated

open instance-attribute

open = open

owner_only instance-attribute

owner_only = owner_only

refresh_token_cls instance-attribute

refresh_token_cls = get_current_class(BaseRefreshToken)

IsomorphicAuthorizationStrategies

alaner_admin instance-attribute

alaner_admin

authenticated instance-attribute

authenticated

open instance-attribute

open

owner_only instance-attribute

owner_only

refresh_token_cls instance-attribute

refresh_token_cls

not_implemented_strategy

not_implemented_strategy()
Source code in shared/iam/global_authorization.py
def not_implemented_strategy():  # type: ignore[no-untyped-def]
    raise NotImplementedError(
        "Please don't use the GlobalAuthorizationStrategies class directly, but instantiate it first: GlobalAuthorizationStrategies()"
    )

shared.iam.google_auth

GOOGLE_OAUTH_USER_INFO_URL module-attribute

GOOGLE_OAUTH_USER_INFO_URL = (
    "https://www.googleapis.com/oauth2/v2/userinfo"
)

P module-attribute

P = ParamSpec('P')

T module-attribute

T = TypeVar('T')

get_client_config

get_client_config(client_id, client_secret)

Returns the client config for the OAuth call.

The values nees to be taken from config (current_config.get("ADMIN_OAUTH_CLIENT_ID")). For all environments (acceptance, prod, etc.) from AWS secrets This also needs to match the Google Console configuration.

Source code in shared/iam/google_auth.py
def get_client_config(client_id: str, client_secret: str) -> dict:  # type: ignore[type-arg]
    """
    Returns the client config for the OAuth call.

    The values nees to be taken from config (`current_config.get("ADMIN_OAUTH_CLIENT_ID")`).
    For all environments (acceptance, prod, etc.) from AWS secrets
    This also needs to match the Google Console configuration.
    """
    return {
        "web": {
            "client_id": client_id,
            "client_secret": client_secret,
            "project_id": "airflow-197413",
            "auth_uri": "https://accounts.google.com/o/oauth2/auth",
            "token_uri": "https://oauth2.googleapis.com/token",
            "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
            "redirect_uris": [
                f"{current_config['BASE_URL']}/oauth/authenticated",
                "http://localhost:8001/oauth/authenticated",
                "http://localhost:8003/oauth/authenticated",
                "http://localhost:8004/oauth/authenticated",
                "https://api-stage.alan.com/oauth/authenticated",
                "https://api-acceptance.alan.com/oauth/authenticated",
                "https://admin-api-acceptance.alan.com/oauth/authenticated",
                "https://api.alan.com/oauth/authenticated",
                "https://admin-api.alan.com/oauth/authenticated",
            ],
        }
    }

get_response_unauthorized_access_employee

get_response_unauthorized_access_employee()

Returns a 403 response for invalid alan employee

Source code in shared/iam/google_auth.py
def get_response_unauthorized_access_employee():  # type: ignore[no-untyped-def]
    """Returns a 403 response for invalid alan employee"""
    return make_response(
        "Unauthorized Access. Make sure you are listed as an (active) AlanEmployee in the backend DB.",
        403,
    )

get_response_unauthorized_service_account

get_response_unauthorized_service_account()

Returns a 403 response for invalid service account

Source code in shared/iam/google_auth.py
def get_response_unauthorized_service_account():  # type: ignore[no-untyped-def]
    """Returns a 403 response for invalid service account"""
    return make_response(
        "Unauthorized Access. Make sure there is an active entry for the service account in the AlanEmployee table.",
        403,
    )

google_auth_login_required

google_auth_login_required(f)

Use as annotation to require google auth login for this endpoint.

If the user isn't logged in (no token in session), this method returns a redirect to the google auth URL. The given permitted_for will also validate the user permission. This is used for backend admin URLs (not frontend admin URLs).

If auth_only is True, only require authentication. If auth_only is False, user must have at least one of the permissions from permitted_for. (I.e. this will always fail if permitted_for is empty and auth_only is False).

Source code in shared/iam/google_auth.py
def google_auth_login_required(f: Callable[P, T]) -> Callable[P, T]:
    """
    Use as annotation to require google auth login for this endpoint.

    If the user isn't logged in (no token in session), this method returns a redirect
    to the google auth URL. The given `permitted_for` will also validate the user
    permission. This is used for backend admin URLs (not frontend admin URLs).

    If auth_only is True, only require authentication. If auth_only is False, user must
    have at least one of the permissions from permitted_for. (I.e. this will always fail
    if permitted_for is empty and auth_only is False).
    """

    @wraps(f)
    @restrict_to_privileged_request  # type: ignore[misc]
    def decorated(*args: P.args, **kwargs: P.kwargs) -> T:
        with tracer.trace("shared.iam.google_auth.google_auth_login_required check"):
            if is_oauth_enabled():
                reset_auth_globals()

                if SESSION_KEY_TOKEN_CLAIMS not in session:
                    # The token is not in session, we redirect the authorization URL
                    # and store the current URL to return later
                    return redirect_to_authorize(request.url)  # type: ignore[no-any-return]
                try:
                    user = get_alaner_admin_user_from_claims(
                        session[SESSION_KEY_TOKEN_CLAIMS],
                        restrict_location=True,
                    )
                except RestrictInvalidToken:
                    return redirect_to_authorize(request.url)  # type: ignore[no-any-return]
                except RestrictAlanEmployeeInvalid:
                    # This user is logged in using google auth, but not present in our DB
                    session.clear()
                    return get_response_unauthorized_access_employee()  # type: ignore[no-any-return,no-untyped-call]
            else:
                # Used in dev. We want `set_auth_globals_for_alaner_admin_user` to be filled with a user.
                # So let's take the current aws user
                user = get_authenticated_user_from_email(
                    f"{get_current_user()}@alan.eu",
                )

            set_auth_globals_for_alaner_admin_user(user)

        return f(*args, **kwargs)

    return decorated

make_token_for_credentials

make_token_for_credentials(
    credentials, restrict_location=True
)

Returns a JWT token from the given google credentials, returns None if the employee is not found

Uses the credentials token to query user info url (needs the userinfo.email scope), then loads the alan employee and user for return.

Raises HTTPError: 401 Client Error: Unauthorized for url if an invalid credential token is provided. This error is recoverable (restart the oauth cinematic).

Raises RestrictAlanEmployeeInvalid: if the alan employee is not found or invalid. This error is NOT recoverable (show 403).

Source code in shared/iam/google_auth.py
def make_token_for_credentials(
    credentials: "Credentials", restrict_location: bool = True
) -> str:
    """
    Returns a JWT token from the given google credentials, returns None if the employee is not found

    Uses the credentials token to query user info url (needs the userinfo.email scope),
    then loads the alan employee and user for return.

    Raises HTTPError: `401 Client Error: Unauthorized for url` if an invalid credential token is provided. This
    error is recoverable (restart the oauth cinematic).

    Raises RestrictAlanEmployeeInvalid: if the alan employee is not found or invalid. This error is NOT
    recoverable (show 403).
    """
    import requests

    response = requests.get(
        GOOGLE_OAUTH_USER_INFO_URL,
        headers={"Authorization": f"Bearer {credentials.token}"},
        timeout=60,
    )
    # Can raise HTTPError (the google credentials are invalid or expired)
    response.raise_for_status()
    response_json = response.json()
    email = response_json["email"]
    # Can raise RestrictAlanEmployeeInvalid
    user = get_authenticated_user_from_email(email, restrict_location=restrict_location)
    return make_token_for_id(user.id)

redirect_to_authorize

redirect_to_authorize(url)

Returns the redirect to the authorize URL and add the next location to the session. The user will be redirected back to the given url at the end of the authentication cinematic. Most of the time, request.url is sufficient for the url value.

Source code in shared/iam/google_auth.py
def redirect_to_authorize(url: str):  # type: ignore[no-untyped-def]
    """
    Returns the redirect to the authorize URL and add the next location to the session. The user
    will be redirected back to the given `url` at the end of the authentication cinematic. Most of
    the time, `request.url` is sufficient for the `url` value.
    """
    if SESSION_KEY_STATE in session:
        del session[SESSION_KEY_STATE]
    session[SESSION_KEY_NEXT_LOCATION] = url
    return redirect("/oauth/authorize")

shared.iam.headers

ALANER_ADMIN_HEADER module-attribute

ALANER_ADMIN_HEADER = 'X-ALAN-SUPERADMIN'

AUTHENTICATION_METHOD_HEADER module-attribute

AUTHENTICATION_METHOD_HEADER = 'X-APP-AUTH'
BACKEND_DEEPLINK_HEADER = 'X-ALAN-BACKEND-DEEPLINK'

IMPERSONATED_USER_HEADER module-attribute

IMPERSONATED_USER_HEADER = 'X-ALAN-IMPERSONATED-USER'

IMPERSONATION_MODE_HEADER module-attribute

IMPERSONATION_MODE_HEADER = 'X-ALAN-IMPERSONATION-MODE'

InvalidAccessToken

Bases: Exception

SESSION_KEY_GOOGLE_TOKEN module-attribute

SESSION_KEY_GOOGLE_TOKEN = 'googleToken'

SESSION_KEY_NEXT_LOCATION module-attribute

SESSION_KEY_NEXT_LOCATION = 'adminNextLocation'

SESSION_KEY_STATE module-attribute

SESSION_KEY_STATE = 'state'

SESSION_KEY_TOKEN_CLAIMS module-attribute

SESSION_KEY_TOKEN_CLAIMS = 'adminToken'

claims_to_be_alaner_admin

claims_to_be_alaner_admin()
Source code in shared/iam/headers.py
def claims_to_be_alaner_admin():  # type: ignore[no-untyped-def]
    return (
        ALANER_ADMIN_HEADER in request.headers
        or IMPERSONATED_USER_HEADER in request.headers
        or ("admin_token" in request.args and request.method == "GET")
    )

get_access_token

get_access_token()
Source code in shared/iam/headers.py
def get_access_token() -> str:
    auth_header = flask.request.headers.get("Authorization")
    if not auth_header:
        raise InvalidAccessToken("Access token is missing from request")
    bearer_token_re = r"^Bearer (?P<access_token>.+)"
    match = re.search(bearer_token_re, auth_header)
    if not match:
        raise InvalidAccessToken("Access token is missing from request")
    token = match.groupdict().get("access_token")
    return token  # type: ignore[no-any-return]

shared.iam.helpers

ImpersonationMode

Bases: Enum

read_only class-attribute instance-attribute

read_only = 'read-only'

read_write class-attribute instance-attribute

read_write = 'read-write'

service_account_delegation class-attribute instance-attribute

service_account_delegation = 'service-account-delegation'

A special type of impersonation used by the API Proxy system. Refer to the following documentation for more information:

Note that, under this impersonation mode, X-ALAN-IMPERSONATED-USER will contain an Alaner email instead of the user ID.

block_request_if_user_blocked

block_request_if_user_blocked(user)

Block request if user is registered as blocked in Datadog.

Will raise a 403 Forbidden error if user is blocked.

Blocked users can be viewed in Datadog: https://app.datadoghq.eu/security/appsec/denylist ⧉

Source code in shared/iam/helpers.py
def block_request_if_user_blocked(user: "Authenticatable | None") -> None:
    """
    Block request if user is registered as blocked in Datadog.

    Will raise a 403 Forbidden error if user is blocked.

    Blocked users can be viewed in Datadog: https://app.datadoghq.eu/security/appsec/denylist
    """
    from ddtrace.appsec.trace_utils import (  # type: ignore[attr-defined]
        block_request,
        should_block_user,
    )

    if user:
        block = False

        try:
            block = should_block_user(tracer, str(user.id))
        except Exception as exc:
            current_app.logger.warning(
                "Unable to check if user is blocked", exc_info=exc
            )

        if block:
            block_request()

can_actor_bypass_permissions

can_actor_bypass_permissions()
Source code in shared/iam/helpers.py
def can_actor_bypass_permissions() -> bool:
    # Ignore permissions when on a development environment (with OAuth disabled,
    # running on a "safe" well-known network.

    return not is_oauth_enabled() and is_from_authorized_location()

current_auth_context module-attribute

current_auth_context = cast(
    "AuthContext", LocalProxy(_get_current_auth_context)
)
delete_cookie(name, response)
Source code in shared/iam/helpers.py
def delete_cookie(name: str, response: Response) -> None:
    response.set_cookie(name, "", expires=0)

get_user_cls

get_user_cls()
Source code in shared/iam/helpers.py
def get_user_cls() -> type["Authenticatable"]:
    return get_current_class(BaseRefreshToken).user_cls()

is_alaner_admin

is_alaner_admin()
Source code in shared/iam/helpers.py
def is_alaner_admin() -> bool:
    return getattr(g, "alaner_admin", False)

is_impersonated

is_impersonated()
Source code in shared/iam/helpers.py
def is_impersonated() -> bool:
    return getattr(g, "impersonation_mode", None) is not None

is_oauth_enabled

is_oauth_enabled()
Source code in shared/iam/helpers.py
def is_oauth_enabled() -> bool:
    admin_oauth_enabled = current_config.get("ADMIN_OAUTH_ENABLED")
    return False if admin_oauth_enabled is None else bool(admin_oauth_enabled)

reset_auth_context

reset_auth_context()
Source code in shared/iam/helpers.py
def reset_auth_context() -> AuthContext:
    return _do_set_auth_context(AuthContext())

reset_auth_globals

reset_auth_globals()
Source code in shared/iam/helpers.py
def reset_auth_globals() -> None:
    g.actor = None
    g.actor_id = None
    g.current_user = None
    g.current_user_id = None
    g.impersonation_mode = None
    g.alaner_admin = False
    g.session_id = None
    g.alan_employee_email = None

restrict_to_alaner_admin

restrict_to_alaner_admin(f)

Restrict endpoints reserved for super admins.

This should be kept for endpoints that expect only super admins.

Source code in shared/iam/helpers.py
def restrict_to_alaner_admin(f):  # type: ignore[no-untyped-def]
    """Restrict endpoints reserved for super admins.

    This should be kept for endpoints that expect only super admins.
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):  # type: ignore[no-untyped-def]
        if not is_alaner_admin():
            raise RestrictAlanerAdminExceptionNotFound()
        return f(*args, **kwargs)

    return decorated_function

restrict_to_local_machine

restrict_to_local_machine(f)

Restrict endpoints to localhost only.

Source code in shared/iam/helpers.py
def restrict_to_local_machine(f):  # type: ignore[no-untyped-def]
    """Restrict endpoints to localhost only."""

    @wraps(f)
    def decorated_function(*args, **kwargs):  # type: ignore[no-untyped-def]
        remote_addr = get_remote_addr(request)
        if not is_from_local_machine(remote_addr):
            raise RestrictToWhitelistedIpsNotFound()
        return f(*args, **kwargs)

    return decorated_function

restrict_to_privileged_request

restrict_to_privileged_request(f)
Source code in shared/iam/helpers.py
def restrict_to_privileged_request(f):  # type: ignore[no-untyped-def]
    @wraps(f)
    def decorated(*args, **kwargs):  # type: ignore[no-untyped-def]
        audience = current_config.get("ZEROTRUST_JWT_AUDIENCE")
        if (
            not is_authorized_by_zerotrust(audience)
            and not is_from_authorized_location()
        ):
            raise RestrictToWhitelistedIpsNotFound()

        return f(*args, **kwargs)

    return decorated

set_auth_context

set_auth_context(
    real_principal,
    effective_principal=None,
    impersonation_mode=None,
    session_id=None,
    cloudflare_zerotrust_identity=None,
)
Source code in shared/iam/helpers.py
def set_auth_context(
    real_principal: AuthPrincipal,
    effective_principal: AuthPrincipal | None = None,
    impersonation_mode: ImpersonationMode | None = None,
    session_id: uuid.UUID | None = None,
    cloudflare_zerotrust_identity: dict[str, Any] | None = None,
) -> AuthContext:
    # Auto-extract cloudflare request metadata when in request context
    cloudflare_ray_id = None
    cloudflare_ip_country = None
    if has_request_context():
        cloudflare_ray_id = get_cloudflare_ray_id()
        cloudflare_ip_country = get_cloudflare_ip_country()

    auth_context = AuthContext(
        _real_principal=real_principal,
        _effective_principal=effective_principal or real_principal,
        impersonation_mode=impersonation_mode,
        session_id=session_id,
        cloudflare_zerotrust_identity=cloudflare_zerotrust_identity,
        cloudflare_ray_id=cloudflare_ray_id,
        cloudflare_ip_country=cloudflare_ip_country,
    )
    return _do_set_auth_context(auth_context)

set_auth_context_from_dict

set_auth_context_from_dict(serialized_values)
Source code in shared/iam/helpers.py
@contextmanager
def set_auth_context_from_dict(
    serialized_values: dict[str, Any] | None,
) -> Generator[None, None, None]:
    try:
        _do_set_auth_context(AuthContext.from_dict(serialized_values))
        yield
    finally:
        reset_auth_context()

set_auth_globals_for_alaner_admin_user

set_auth_globals_for_alaner_admin_user(user)
Source code in shared/iam/helpers.py
def set_auth_globals_for_alaner_admin_user(user: "Authenticatable | None") -> None:
    g.actor = user
    g.current_user = user
    # Can be None when oauth is disabled (e.g. dev)
    g.actor_id = user.id if user else None
    g.current_user_id = user.id if user else None
    g.alaner_admin = True

    alan_employee = getattr(user, "alan_employee", None) if user else None
    alan_employee_email = alan_employee.alan_email if alan_employee else None
    g.alan_employee_email = alan_employee_email

    try:
        user_id: str | None | UUID | int = g.current_user_id
        if isinstance(user_id, UUID):
            # Explicitly serialize as string to avoid sending "UUID('43338094-9618-40b9-b74c-6bbacf085cb9')" as the
            # user id (but instead send "43338094-9618-40b9-b74c-6bbacf085cb9").
            user_id = str(user_id)

        tag_current_request_root_span({"user.id": user_id})
        set_alan_admin_user(
            user_id=str(user_id),
            email=alan_employee_email,
        )
        sentry_sdk.set_user({"id": user_id, "ip": "{{auto}}"})
        sentry_sdk.set_tag("alaner_admin", True)
    except Exception as exc:
        current_app.logger.warning(
            "Unable to set user metadata on dd traces", exc_info=exc
        )

set_auth_globals_for_impersonated_user

set_auth_globals_for_impersonated_user(
    actor,
    impersonated_user,
    impersonation_mode=ImpersonationMode.read_only,
)
Source code in shared/iam/helpers.py
def set_auth_globals_for_impersonated_user(  # type: ignore[no-untyped-def]
    actor,
    impersonated_user,
    impersonation_mode=ImpersonationMode.read_only,
) -> None:
    if impersonation_mode == ImpersonationMode.service_account_delegation:
        # In service account delegation, the impersonated user is the actual actor.
        # See here for details: https://www.notion.so/alaninsurance/Regular-case-Delegation-Impersonation-taxonomy-1bd1426e8be78026ba0fedc04475425b?pvs=4
        g.actor = impersonated_user
        g.actor_id = impersonated_user.id
        g.alaner_admin = True
    else:
        g.actor = actor
        # Can be None when oauth is disabled (e.g. dev)
        g.actor_id = actor.id if actor else None
    g.current_user = impersonated_user
    g.current_user_id = impersonated_user.id
    g.impersonation_mode = impersonation_mode

    alan_employee = getattr(actor, "alan_employee", None) if actor else None
    alan_employee_email = alan_employee.alan_email if alan_employee else None
    g.alan_employee_email = alan_employee_email

    try:
        if hasattr(impersonated_user, "profile_id"):
            from components.global_profile.public.api import (  # noqa: ALN002
                ProfileService,
            )

            profile = ProfileService.create().get_profile(impersonated_user.profile_id)
            impersonated_user_email = profile.email if profile else None
        else:
            impersonated_user_email = impersonated_user.email

        tag_current_request_root_span(
            {"user.id": g.current_user_id, "actor.id": g.actor_id}
        )
        set_impersonated_member_user(
            user_id=str(g.current_user_id),
            email=impersonated_user_email,
            actor_id=str(g.actor_id),
            actor_email=alan_employee_email,
        )
        sentry_sdk.set_user({"id": g.current_user_id, "ip": "{{auto}}"})
        sentry_sdk.set_tag("alaner_admin", True)
    except Exception as exc:
        current_app.logger.warning(
            "Unable to set user metadata on dd traces", exc_info=exc
        )

set_auth_globals_for_regular_user

set_auth_globals_for_regular_user(
    user, session_id=None, set_actor=True
)
Source code in shared/iam/helpers.py
def set_auth_globals_for_regular_user(  # type: ignore[no-untyped-def]
    user, session_id=None, set_actor: bool = True
) -> None:
    g.actor = user
    g.current_user = user
    g.actor_id = user.id if user and set_actor else None
    g.current_user_id = user.id if user else None
    g.session_id = session_id

    if not user:
        return

    try:
        # TODO(Global Architecture): Cannot call profile service here, as it would slow down the request
        set_member_user(
            user_id=str(g.current_user_id),
            email=user.email,
        )
        sentry_sdk.set_user({"id": g.current_user_id, "ip": "{{auto}}"})
        sentry_sdk.set_tag("alaner_admin", False)
    except Exception as exc:
        current_app.logger.debug(
            "Unable to set user metadata on dd traces and sentry", exc_info=exc
        )

set_auth_globals_from_saved_values

set_auth_globals_from_saved_values(
    actor_id, current_user_id=None, impersonation_mode=None
)
Source code in shared/iam/helpers.py
@contextmanager
def set_auth_globals_from_saved_values(  # type: ignore[no-untyped-def]
    actor_id: "Authenticatable | None",
    current_user_id: "Authenticatable | None" = None,
    impersonation_mode: ImpersonationMode | None = None,
):
    try:
        user_cls = get_user_cls()
        user = (
            current_session.get(user_cls, current_user_id) if current_user_id else None
        )
        # Can be None when oauth is disabled (e.g. dev)
        actor = current_session.get(user_cls, actor_id) if actor_id else None

        if impersonation_mode is not None:
            set_auth_globals_for_impersonated_user(actor, user, impersonation_mode)
        else:
            set_auth_globals_for_regular_user(user)

    except Exception:  # noqa: S110
        # Fails on eu-tools which is not using user_cls / RefreshToken / etc.
        pass

    try:
        yield
    finally:
        reset_auth_globals()

shared.iam.http_token_auth

HTTPTokenAuth

HTTPTokenAuth(verify_token_callback, scheme='Bearer')

Bases: HTTPTokenAuth

Source code in shared/iam/http_token_auth.py
def __init__(self, verify_token_callback, scheme: str = "Bearer") -> None:  # type: ignore[no-untyped-def]
    super().__init__(scheme)

    self.verify_token_callback = verify_token_callback
    self.error_handler(self._custom_error_handler)

verify_token_callback instance-attribute

verify_token_callback = verify_token_callback

shared.iam.keycloak

KeycloakAdminWithErrorHandling

Bases: KeycloakAdmin

Overrides KeycloakAdmin methods to handle errors. The admin client raises exceptions after fetching the server using a generic function So we can't use the decorator directly on core methods for fetching data, we need to override each high-level method we use.

create_user

create_user(payload, exist_ok=False)
Source code in shared/iam/keycloak.py
@with_keycloak_error_handling(args_to_log={"payload"})  # type: ignore[misc]
def create_user(self, payload, exist_ok: bool = False):  # type: ignore[no-untyped-def]
    return super().create_user(payload, exist_ok)

get_user

get_user(user_id)
Source code in shared/iam/keycloak.py
@with_keycloak_error_handling(args_to_log={"user_id"})
def get_user(self, user_id):  # type: ignore[no-untyped-def]
    return super().get_user(user_id)

update_user

update_user(user_id, payload)
Source code in shared/iam/keycloak.py
@with_keycloak_error_handling(args_to_log={"user_id", "payload"})
def update_user(self, user_id, payload):  # type: ignore[no-untyped-def]
    return super().update_user(user_id, payload)

KeycloakBusinessError

KeycloakBusinessError(code)

Bases: BaseErrorCode

These errors are expected to occur due to user actions in normal application operations, and do not constitute technical errors requiring specific attention/remediation by Alan Eng. HTTP_ERRORS: list of Keycloak server http statuses that we can map as Keycloak Business Error

These errors: - don't trigger Sentry. See: https://github.com/alan-eu/alan-apps/blob/538fb56cf374fc1a0719668e0d70e5359edbde91/backend/shared/helpers/sentry/setup.py#L71 ⧉ - Are caught by our Flask controllers => views will return an HTTP 4XX (and not a 500), see https://github.com/alan-eu/alan-apps/blob/538fb56cf374fc1a0719668e0d70e5359edbde91/backend/shared/api/helpers.py#L132 ⧉ - NB: You can pass http_code=<your_code> to additional_args to change the HTTP status code returned to the client.

Source code in shared/iam/keycloak.py
def __init__(self, code: int) -> None:
    super().__init__(
        code,
        "authentication",
        "Keycloak business error",
    )

HTTP_ERRORS class-attribute instance-attribute

HTTP_ERRORS = [NOT_FOUND, CONFLICT]

component class-attribute instance-attribute

component = 'iam'

KeycloakUser

KeycloakUser(id, email)
Source code in shared/iam/keycloak.py
def __init__(self, id: UUID, email: str) -> None:
    self.id = id
    self.email = email

email instance-attribute

email = email

id instance-attribute

id = id

build_login_url

build_login_url(client_id, email)

Build a login URL depending on client ID. It uses either the front end URL or deep link URL as base URL. :param client_id: a str representing the Keycloak client :param email: a str representing the email address for login :return: the login URL

Source code in shared/iam/keycloak.py
def build_login_url(client_id: str, email: str) -> str:
    """
    Build a login URL depending on client ID. It uses either the front end URL or deep link URL as base URL.
    :param client_id: a str representing the Keycloak client
    :param email: a str representing the email address for login
    :return: the login URL
    """

    # Mobile app Keycloak clients follow this pattern: alan-mobile-[environment]
    build_method = (
        current_app.front_end_url.build_deep_link  # type: ignore[attr-defined]
        if "alan-mobile" in client_id
        else current_app.front_end_url.build_url  # type: ignore[attr-defined]
    )
    return build_method(key="LOGIN_URL", query_args={"email": email})  # type: ignore[no-any-return]

get_admin_client

get_admin_client(realm_name=None)
Source code in shared/iam/keycloak.py
@retry(delay=1, max_retries=5, retry_on=KeycloakConnectionError)
def get_admin_client(
    realm_name: str | None = None,
) -> KeycloakAdminWithErrorHandling:
    # The SDK makes a call to keycloak when being instantiated, so we want to
    # instantiate only once by request.
    # https://github.com/alan-eu/alan-backend/pull/39327
    client = g.get("keycloak_admin_client")

    if client:
        return client  # type: ignore[no-any-return]

    if realm_name is None:
        realm_name = current_config.get("KEYCLOAK_REALM")

    timeout = current_config.get("KEYCLOAK_ADMIN_TIMEOUT")

    zerotrust_secrets = {}
    try:
        zerotrust_secrets = cast(
            "dict",  # type: ignore[type-arg]
            json_secret_from_config("ZERO_TRUST_SERVICE_SECRET_NAME", {}),
        )
    except RuntimeError as e:
        current_logger.warning(
            "cannot set ZERO_TRUST_SERVICE_SECRET_NAME, using empty dict as headers",
            error=str(e),
        )

    connection = KeycloakOpenIDConnection(
        server_url=current_config.get("KEYCLOAK_HOST"),
        user_realm_name=realm_name,
        realm_name=realm_name,
        client_id=current_config.get("KEYCLOAK_ADMIN_CLIENT_ID"),
        client_secret_key=raw_secret_from_config(
            "KEYCLOAK_ADMIN_CLIENT_SECRET_KEY_NAME",
            default_secret_value=current_config.get("KEYCLOAK_ADMIN_CLIENT_SECRET_KEY"),
        ),
        verify=True,
        timeout=timeout,
        custom_headers={
            **zerotrust_secrets,
        },
    )
    client = KeycloakAdminWithErrorHandling(connection=connection)

    g.keycloak_admin_client = client

    return client

get_user_from_access_token

get_user_from_access_token(access_token)
Source code in shared/iam/keycloak.py
def get_user_from_access_token(access_token: str) -> KeycloakUser | None:
    keycloak_openid = KeycloakOpenID(
        server_url=current_config.get("KEYCLOAK_HOST"),
        realm_name=current_config.get("KEYCLOAK_REALM"),
        client_id="",  # Not needed for this call
    )
    try:
        userinfo = keycloak_openid.userinfo(access_token)

        return KeycloakUser(
            id=userinfo["sub"],
            email=userinfo["email"],
        )
    except KeycloakAuthenticationError as e:
        current_logger.exception(f"Keycloak authentication error: {e}")
        return None

with_keycloak_error_handling

with_keycloak_error_handling(args_to_log=None)
Source code in shared/iam/keycloak.py
def with_keycloak_error_handling(args_to_log: set[str] | None = None):  # type: ignore[no-untyped-def]
    def decorator(func):  # type: ignore[no-untyped-def]
        """
        A decorator function to handle Keycloak errors:

        It raises a KeycloakBusinessError if the error is in the IGNORED_KEYCLOAK_ERRORS list.
        These are considered business errors, not server errors, and thus the request is stopped but not logged in Sentry.

        Args:
            - args_to_log (set[str], optional): A set of arguments to log in case of a business error. Defaults to None.
        """

        @wraps(func)
        def wrapper(*args, **kwargs):  # type: ignore[no-untyped-def]
            try:
                return func(*args, **kwargs)
            except KeycloakOperationError as error:
                if error.response_code in KeycloakBusinessError.HTTP_ERRORS:
                    logger_args = {
                        key: value
                        for key, value in get_args_from(func, args, kwargs).items()
                        if key in (args_to_log or set())
                    }
                    current_logger.warning(f"Keycloak error {error}", **logger_args)
                    raise KeycloakBusinessError(error.response_code)
                raise error

        return wrapper

    return decorator

shared.iam.login_form_auth

LoginFormAuth

LoginFormAuth(verify_password_callback)

Inspired by flask_httpauth (https://github.com/miguelgrinberg/Flask-HTTPAuth ⧉).

Source code in shared/iam/login_form_auth.py
def __init__(self, verify_password_callback) -> None:  # type: ignore[no-untyped-def]
    self.verify_password_callback = verify_password_callback
    self.error_payload = None
    self.error_handler(self.custom_error_handler)  # type: ignore[no-untyped-call]

authenticate

authenticate(auth)
Source code in shared/iam/login_form_auth.py
def authenticate(self, auth):  # type: ignore[no-untyped-def]
    if auth is None:
        return False
    email, prehashed_password = (
        auth["email"],
        auth["prehashed_password"],
    )
    if self.verify_password_callback:
        result, errors = self.verify_password_callback(email, prehashed_password)
        if not result:
            self.error_payload = errors
        return result
    return False

custom_error_handler

custom_error_handler()
Source code in shared/iam/login_form_auth.py
def custom_error_handler(self):  # type: ignore[no-untyped-def]
    if self.error_payload:
        return make_response(jsonify(**self.error_payload), 401, None)  # type: ignore[unreachable]
    return '{"error": "Unauthorized Access"}'

error_handler

error_handler(f)
Source code in shared/iam/login_form_auth.py
def error_handler(self, f):  # type: ignore[no-untyped-def]
    @wraps(f)
    def decorated(*args, **kwargs):  # type: ignore[no-untyped-def]
        res = f(*args, **kwargs)
        res = make_response(res)
        if 200 <= res.status_code < 300:
            res.status_code = 401
        return res

    self.auth_error_callback = decorated
    return decorated

error_payload instance-attribute

error_payload = None

login_required

login_required(f)
Source code in shared/iam/login_form_auth.py
def login_required(self, f):  # type: ignore[no-untyped-def]
    return self._login_required(f)

login_required_mind

login_required_mind(f)
Source code in shared/iam/login_form_auth.py
def login_required_mind(self, f):  # type: ignore[no-untyped-def]
    return self._login_required(f, allow_return_data=True)

schema class-attribute instance-attribute

schema = LoginFormAuthSchema()

verify_password_callback instance-attribute

verify_password_callback = verify_password_callback

LoginFormAuthSchema

Bases: Schema

Schema for login form authentication arguments.

email class-attribute instance-attribute

email = Str(
    required=True,
    error_messages={"required": "Email of the user"},
)

password class-attribute instance-attribute

password = Str()

prehashed_password class-attribute instance-attribute

prehashed_password = Str(
    required=True,
    error_messages={
        "required": "Prehashed password of the user"
    },
)

refresh_token_type class-attribute instance-attribute

refresh_token_type = Str()

shared.iam.monitoring

KEYCLOAK_ALAN_REALM_ID module-attribute

KEYCLOAK_ALAN_REALM_ID = (
    "e5c69b4c-7d1a-4fc1-bf3f-e90d812fee64"
)

TrackedKeycloakEventType

Bases: AlanBaseEnum

login_error class-attribute instance-attribute

login_error = 'LOGIN_ERROR'

login_success class-attribute instance-attribute

login_success = 'LOGIN'

update_credential class-attribute instance-attribute

update_credential = 'UPDATE_CREDENTIAL'

update_credential_error class-attribute instance-attribute

update_credential_error = 'UPDATE_CREDENTIAL_ERROR'

user_disabled_by_temporary_lockout class-attribute instance-attribute

user_disabled_by_temporary_lockout = (
    "USER_DISABLED_BY_TEMPORARY_LOCKOUT"
)

track_keycloak_event_in_datadog

track_keycloak_event_in_datadog(event)

Track Keycloak events in our API as we don't have a built-in provider for Keycloak. Tracked: Login success / Login failure

Example of events: 1. Login success: { "time": 1734544774538, "type": "LOGIN", "realmId": "e5c69b4c-7d1a-4fc1-bf3f-e90d812fee64", "clientId": "fr-web-dev", "userId": "abc", "sessionId": "def", "ipAddress": "1.1.1.1", "details": { "auth_method": "openid-connect", "token_id": "ghi", "grant_type": "password", "refresh_token_type": "Refresh", "scope": "profile email openid", "refresh_token_id": "mno", "client_auth_method": "client-secret", "username": "toto@alan.eu" } }

  1. Login failure: { "time": 1734546787082, "type": "LOGIN_ERROR", "realmId": "e5c69b4c-7d1a-4fc1-bf3f-e90d812fee64", "clientId": "fr-web-dev", "ipAddress": "1.1.1.1", "error": "user_not_found", "details": { "auth_method": "openid-connect", "grant_type": "password", "client_auth_method": "client-secret", "username": "toto@alan.eu" } }

  2. Update credential / password: { "time": 1734548799123, "type": "UPDATE_CREDENTIAL", "realmId": "e5c69b4c-7d1a-4fc1-bf3f-e90d812fee64", "clientId": "fr-web-dev", "userId": "abc", "ipAddress": "1.1.1.1", "details": { "auth_method": "openid-connect", "code_id": "def", "credential_type": "password", "custom_required_action": "UPDATE_PASSWORD", "redirect_uri": "http://localhost:4001/login?email=toto%40alan.eu ⧉", "remember_me": "false", "response_mode": "query", "response_type": "code", "username": "toto@alan.eu" } }

  3. Update credential / password error: { "time": 1734548799123, "type": "UPDATE_CREDENTIAL_ERROR", "realmId": "e5c69b4c-7d1a-4fc1-bf3f-e90d812fee64", "clientId": "fr-web-dev", "userId": "abc", "ipAddress": "1.1.1.1", "error": "password_missing", "details": { "auth_method": "openid-connect", "code_id": "def", "credential_type": "password", "custom_required_action": "UPDATE_PASSWORD", "redirect_uri": "http://localhost:4001/login?email=toto%40alan.eu ⧉", "remember_me": "false", "response_mode": "query", "response_type": "code", "username": "toto@alan.eu" } }

  4. User disabled by temporary lockout:

{ "type": "USER_DISABLED_BY_TEMPORARY_LOCKOUT", "realmId": "e5c69b4c-7d1a-4fc1-bf3f-e90d812fee64", "realmName": "alan", "clientId": "null", "userId": "9e23c536-c22f-45c5-8c0d-bc985cb7cd9f", "ipAddress": "2a0d:e487:138f:3f9e:8e:d53c:4016:f0cb", "reason": "brute_force_attack detected", "not_before": "2025-10-02T13:06:34", "num_failures": "5" }

If Keycloak events' schema changes, let this function fail, so we can update it.

Source code in shared/iam/monitoring.py
def track_keycloak_event_in_datadog(event: dict[str, Any]) -> None:
    """
    Track Keycloak events in our API as we don't have a built-in provider for Keycloak.
    Tracked: Login success / Login failure

    Example of events:
    1. Login success:
    {
      "time": 1734544774538,
      "type": "LOGIN",
      "realmId": "e5c69b4c-7d1a-4fc1-bf3f-e90d812fee64",
      "clientId": "fr-web-dev",
      "userId": "abc",
      "sessionId": "def",
      "ipAddress": "1.1.1.1",
      "details": {
        "auth_method": "openid-connect",
        "token_id": "ghi",
        "grant_type": "password",
        "refresh_token_type": "Refresh",
        "scope": "profile email openid",
        "refresh_token_id": "mno",
        "client_auth_method": "client-secret",
        "username": "toto@alan.eu"
      }
    }

    2. Login failure:
    {
      "time": 1734546787082,
      "type": "LOGIN_ERROR",
      "realmId": "e5c69b4c-7d1a-4fc1-bf3f-e90d812fee64",
      "clientId": "fr-web-dev",
      "ipAddress": "1.1.1.1",
      "error": "user_not_found",
      "details": {
        "auth_method": "openid-connect",
        "grant_type": "password",
        "client_auth_method": "client-secret",
        "username": "toto@alan.eu"
      }
    }

    3. Update credential / password:
    {
      "time": 1734548799123,
      "type": "UPDATE_CREDENTIAL",
      "realmId": "e5c69b4c-7d1a-4fc1-bf3f-e90d812fee64",
      "clientId": "fr-web-dev",
      "userId": "abc",
      "ipAddress": "1.1.1.1",
      "details": {
        "auth_method": "openid-connect",
        "code_id": "def",
        "credential_type": "password",
        "custom_required_action": "UPDATE_PASSWORD",
        "redirect_uri": "http://localhost:4001/login?email=toto%40alan.eu",
        "remember_me": "false",
        "response_mode": "query",
        "response_type": "code",
        "username": "toto@alan.eu"
      }
    }

    4. Update credential / password error:
    {
      "time": 1734548799123,
      "type": "UPDATE_CREDENTIAL_ERROR",
      "realmId": "e5c69b4c-7d1a-4fc1-bf3f-e90d812fee64",
      "clientId": "fr-web-dev",
      "userId": "abc",
      "ipAddress": "1.1.1.1",
      "error": "password_missing",
      "details": {
        "auth_method": "openid-connect",
        "code_id": "def",
        "credential_type": "password",
        "custom_required_action": "UPDATE_PASSWORD",
        "redirect_uri": "http://localhost:4001/login?email=toto%40alan.eu",
        "remember_me": "false",
        "response_mode": "query",
        "response_type": "code",
        "username": "toto@alan.eu"
      }
    }

    5. User disabled by temporary lockout:

    {
      "type": "USER_DISABLED_BY_TEMPORARY_LOCKOUT",
      "realmId": "e5c69b4c-7d1a-4fc1-bf3f-e90d812fee64",
      "realmName": "alan",
      "clientId": "null",
      "userId": "9e23c536-c22f-45c5-8c0d-bc985cb7cd9f",
      "ipAddress": "2a0d:e487:138f:3f9e:8e:d53c:4016:f0cb",
      "reason": "brute_force_attack detected",
      "not_before": "2025-10-02T13:06:34",
      "num_failures": "5"
    }

    If Keycloak events' schema changes, let this function fail, so we can update it.
    """

    from ddtrace.appsec import track_user_sdk
    from ddtrace.trace import tracer

    if event.get("realmId") != KEYCLOAK_ALAN_REALM_ID:
        return

    if not TrackedKeycloakEventType.has_value(event.get("type")):
        return

    current_logger.info("Tracking Keycloak event in Datadog", keycloak_event=event)

    span = tracer.current_root_span()

    if not span:
        current_logger.warning("Working outside of a span, can't track Keycloak event")
        return

    span.service = KEYCLOAK_DD_SERVICE
    span.set_tag("network.client.ip", event["ipAddress"])
    span.set_tag("http.client_ip", event["ipAddress"])
    # I didn't manage to edit the `date` attribute nor the resource of the span - it's OK for now

    user_id = event.get("userId")
    email = event["details"].get("username")

    if user_id:
        set_member_user(user_id=user_id, email=email)

    match event["type"]:
        case TrackedKeycloakEventType.login_success:
            metadata = {
                "usr.email": email,
                "clientId": event["clientId"],
                "sessionId": event["sessionId"],
                **event["details"],
            }
            track_user_sdk.track_login_success(
                mandatory(email),
                user_id=mandatory(user_id),
                metadata=metadata,
            )
        case TrackedKeycloakEventType.login_error:
            metadata = {
                "usr.email": email,
                "clientId": event["clientId"],
                "error": event["error"],
                **event["details"],
            }
            track_user_sdk.track_login_failure(
                email or "",
                exists=bool(user_id),
                user_id=user_id,
                metadata=metadata,
            )
        case TrackedKeycloakEventType.update_credential:
            metadata = {
                "usr.id": user_id,
                "usr.email": email,
                "usr.login": email,
                "clientId": event["clientId"],
                **event["details"],
            }
            if event["details"].get("credential_type") == "password":
                track_user_sdk.track_custom_event(
                    DatadogEventName.PASSWORD_CHANGE_SUCCESS.value,
                    metadata=metadata,
                )
        case TrackedKeycloakEventType.update_credential_error:
            metadata = {
                "usr.id": user_id,
                "usr.email": email,
                "usr.login": email,
                "error": event["error"],
                "clientId": event["clientId"],
                **event["details"],
            }
            if event["details"].get("credential_type") == "password":
                track_user_sdk.track_custom_event(
                    DatadogEventName.PASSWORD_CHANGE_FAILURE.value,
                    metadata=metadata,
                )
        case TrackedKeycloakEventType.user_disabled_by_temporary_lockout:
            metadata = {
                "usr.id": user_id,
                **event["details"],
            }
            track_user_sdk.track_custom_event(
                DatadogEventName.USER_DISABLED_BY_TEMPORARY_LOCKOUT.value,
                metadata=metadata,
            )

shared.iam.openid_connect

AsymmetricKeyTargetUsage

Bases: AlanBaseEnum

encrypt class-attribute instance-attribute

encrypt = 'enc'

sign class-attribute instance-attribute

sign = 'sig'

ClaimBucket

Bases: AlanBaseEnum

id_token class-attribute instance-attribute

id_token = 'id_token'

userinfo class-attribute instance-attribute

userinfo = 'userinfo'

JWT_EXPIRATION_TIME module-attribute

JWT_EXPIRATION_TIME = 60 * 5

OpenIdConnectTokenExchangeResponse dataclass

OpenIdConnectTokenExchangeResponse(
    id_token,
    id_token_payload,
    access_token,
    token_type,
    expires_in,
)

Bases: DataClassJsonMixin

access_token instance-attribute

access_token

expires_in instance-attribute

expires_in

id_token instance-attribute

id_token

id_token_payload instance-attribute

id_token_payload

token_type instance-attribute

token_type

PKCE_CODE_CACHE_EXPIRATION_TIME module-attribute

PKCE_CODE_CACHE_EXPIRATION_TIME = 5 * 60

======================== OpenID Connect 1.0 Authorization Code flow ========================

Hard specs: https://openid.net/specs/openid-connect-core-1_0.html ⧉

Overview: { Identification Provider } { Human } { Alan Mobile } { Alan Backend } { Identification Provider } { Frontend } { Member } { App } { Server } { Backend } | | | | | | |>--1."login with"--| | | | | |>---2./get_authorization_url----| | |--------------------(redirected)-------------<|-3. url with signed claims req-<| | |>-4.Provider login prompt-| | | | |------5.Identifies-------<| | | | |>-------------6.Redirect w/ auth code------(pass)------------------------------| | | | | |>---7.signed token request---| | | | |-----8.ID+Access tokens-----<| | | | | | | | | | | |---------9."200 OK"------------<| | | |-----happiness----<| | | | | | | |

exchange_code_for_tokens

exchange_code_for_tokens(
    id_provider_root_url,
    client_id,
    auth_code,
    redirect_uri,
    state,
)

Exchange an authorization code for an ID token and an access token.

Source code in shared/iam/openid_connect.py
def exchange_code_for_tokens(
    id_provider_root_url: str,
    client_id: str,
    auth_code: str,
    redirect_uri: str,
    state: str,
) -> OpenIdConnectTokenExchangeResponse:
    """
    Exchange an authorization code for an ID token and an access token.
    """

    provider_config = _get_id_provider_configuration(id_provider_root_url)
    token_endpoint_url = provider_config["token_endpoint"]

    payload = {
        "iss": client_id,
        "sub": client_id,
        "aud": token_endpoint_url,
    }

    assertion_jwe = _sign_and_encrypt_nested_jwt(
        payload,
        _get_own_private_key(AsymmetricKeyTargetUsage.sign),
        _get_id_provider_public_key(
            provider_config["jwks_uri"], AsymmetricKeyTargetUsage.encrypt
        ),
    )

    redis, redis_not_faked = get_redis_main_connection()
    pkce_code_redis_key = _make_redis_pkce_cache_key(
        id_provider_root_url, client_id, state
    )
    pkce_code = (redis.get(pkce_code_redis_key) or b"").decode("ascii")  # type: ignore[union-attr]

    # prevent replay attacks
    redis.delete(pkce_code_redis_key)

    exchange_payload = {
        "grant_type": "authorization_code",
        "code": auth_code,
        "redirect_uri": redirect_uri,
        "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
        "client_assertion": assertion_jwe,
    }

    if redis_not_faked:
        exchange_payload["code_verifier"] = pkce_code

    # token exchanges often have a 1s cutoff on the provider side.
    # this time constraint can often be reached in scenarios where there is low feature usage,
    # the provider needs to fetch our public key set, and CDN caches are cold.
    # to augment our chances of success, we retry the request a few times with an exponential delay.
    retry_count = 0
    retry_delay_seconds = 0.1
    last_error = None
    while retry_count < 3:
        current_logger.info(
            f"Exchanging authorization code for tokens (attempt {retry_count})"
        )

        response = requests.post(
            token_endpoint_url,
            data=exchange_payload,
            timeout=60,
        )

        if response.status_code == 200:
            break

        response_json = response.json()
        error = response_json.get("error")
        detail = response_json.get("detail", "")

        # If we get an unauthorized_client error with "Jwt id already exists", we should stop retrying
        # as this indicates the code has already been used
        if (
            response.status_code == 400
            and error == "unauthorized_client"
            and "Jwt id already exists" in detail
        ):
            current_logger.error(
                f"Authorization code already used: {detail}. Stopping retries."
            )
            raise ValueError(f"Authorization code already used: {detail}")

        current_logger.warning(
            f"Token exchange failed (attempt {retry_count}): {response.status_code} {response.text}"
        )
        last_error = f"{response.status_code} {response.text}"

        sleep(retry_delay_seconds)  # give caches time to warm up
        retry_count += 1
        retry_delay_seconds = math.pow(1 + retry_delay_seconds, retry_count * 2) - 1

    if response.status_code != 200:
        raise ValueError(
            f"Failed to exchange auth code for tokens after {retry_count} attempts. Last error: {last_error}"
        )

    response_dict = response.json()

    id_token_payload = _decrypt_and_verify_nested_jwt(
        response_dict["id_token"],
        _get_own_private_key(AsymmetricKeyTargetUsage.encrypt),
        _get_id_provider_public_key(
            provider_config["jwks_uri"], AsymmetricKeyTargetUsage.sign
        ),
    )

    return OpenIdConnectTokenExchangeResponse(
        id_token=response_dict["id_token"],
        id_token_payload=id_token_payload,
        access_token=response_dict["access_token"],
        token_type=response_dict["token_type"],
        expires_in=int(response_dict.get("expires_in", response_dict["expire_in"])),
    )

get_authorization_url

get_authorization_url(
    id_provider_root_url,
    client_id,
    redirect_uri,
    scope,
    state,
    claims=None,
    ui_locales=None,
)

Build an authorization request URL, to be opened on the member's device to authenticate with the identification provider.

Source code in shared/iam/openid_connect.py
def get_authorization_url(
    id_provider_root_url: str,
    client_id: str,
    redirect_uri: str,
    scope: list[str],
    # pass a state (session identifier) to encode in the request, you need to validate it when catching the redirect
    # also used transparently for PKCE checks: https://tools.ietf.org/html/rfc7636#section-4.6
    state: str,
    # allow asking for specific claims to be returned in the id_token directly, without a subsequent userInfo request
    # cf. https://openid.net/specs/openid-connect-core-1_0.html#ClaimsParameter
    claims: dict[ClaimBucket, dict[str, None]] | None = None,
    ui_locales: list[str] | None = None,
) -> str:
    """
    Build an authorization request URL, to be opened on the member's device to authenticate with the identification provider.
    """

    provider_config = _get_id_provider_configuration(id_provider_root_url)
    authorization_endpoint_url = provider_config["authorization_endpoint"]

    # automatically prepend the 'openid' required scope
    scope = (["openid"] + scope) if "openid" not in scope else scope

    jwt_payload = {
        "iss": client_id,
        "aud": authorization_endpoint_url,
        "redirect_uri": redirect_uri,
        "state": state,
        "scope": " ".join(scope),
    }

    if claims:
        serialized_claims = json.dumps(claims, separators=(",", ":"))
        jwt_payload["claims"] = serialized_claims

    if ui_locales:
        jwt_payload["ui_locales"] = " ".join(ui_locales)

    # generate a urandom pkce code
    # store in Redis to find it back on token exchange with the same state
    redis, redis_not_faked = get_redis_main_connection()

    # disabled on acceptance
    if redis_not_faked:
        pkce_code_redis_key = _make_redis_pkce_cache_key(
            id_provider_root_url, client_id, state
        )

        pkce_code = ""
        previous_pkce_code = redis.get(pkce_code_redis_key)

        if previous_pkce_code:
            pkce_code = previous_pkce_code.decode("ascii")  # type: ignore[union-attr]
        else:
            pkce_code = secrets.token_hex(64)
            redis.setex(
                name=pkce_code_redis_key,
                value=pkce_code,
                time=PKCE_CODE_CACHE_EXPIRATION_TIME,
            )

        pkce_code_hashed = (
            urlsafe_b64encode(hashlib.sha256(pkce_code.encode("ascii")).digest())
            .decode("ascii")
            .rstrip("=")
        )

        jwt_payload.update(
            {
                "code_challenge": pkce_code_hashed,
                "code_challenge_method": "S256",
            }
        )

    jwe = _sign_and_encrypt_nested_jwt(
        jwt_payload,
        _get_own_private_key(AsymmetricKeyTargetUsage.sign),
        _get_id_provider_public_key(
            provider_config["jwks_uri"], AsymmetricKeyTargetUsage.encrypt
        ),
    )

    clear_payload = {
        "client_id": client_id,
        "response_type": "code",
        "redirect_uri": redirect_uri,
        "state": state,
        "scope": "openid",  # we pass more detailed scope in the JWT that supersedes this base value
        "request": jwe,
    }

    return f"{authorization_endpoint_url}?{urlencode(clear_payload)}"

get_userinfo

get_userinfo(id_provider_root_url, access_token)

Retrieve OpenID user info for the previously agreed claims.

Source code in shared/iam/openid_connect.py
def get_userinfo(
    id_provider_root_url: str,
    access_token: str,
) -> dict:  # type: ignore[type-arg]
    """
    Retrieve OpenID user info for the previously agreed claims.
    """

    provider_config = _get_id_provider_configuration(id_provider_root_url)
    userinfo_endpoint_url = provider_config["userinfo_endpoint"]

    response = requests.get(
        userinfo_endpoint_url,
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=60,
    )

    if response.status_code != 200:
        raise ValueError(
            f"Failed to retrieve user info: {response.status_code} {response.text}"
        )

    jwe = response.text

    payload = _decrypt_and_verify_nested_jwt(
        jwe,
        _get_own_private_key(AsymmetricKeyTargetUsage.encrypt),
        _get_id_provider_public_key(
            provider_config["jwks_uri"], AsymmetricKeyTargetUsage.sign
        ),
    )

    return payload

shared.iam.passwords

This module is a shallow implementation of the frontend password strength estimation library zxcvbn. It provides functions to evaluate password strength and determine if a password meets a minimum strength requirement.

Source: frontend/shared/password-utils/index.ts

get_password_score

get_password_score(password)

Evaluate the strength of a password using zxcvbn.

Parameters:

Name Type Description Default
password str

The password to evaluate

required

Returns:

Name Type Description
int int

The strength score of the password (0-4)

Source code in shared/iam/passwords.py
def get_password_score(password: str) -> int:
    """
    Evaluate the strength of a password using zxcvbn.

    Args:
        password (str): The password to evaluate

    Returns:
        int: The strength score of the password (0-4)
    """
    result = zxcvbn(password)
    return cast("int", result.get("score"))

is_password_strong_enough

is_password_strong_enough(password)

Check if the password meets the minimum strength requirement.

Parameters:

Name Type Description Default
password str

The password to check

required
Source code in shared/iam/passwords.py
def is_password_strong_enough(password: str) -> bool:
    """
    Check if the password meets the minimum strength requirement.

    Args:
        password (str): The password to check
    """
    score = get_password_score(password)
    return score > 2

shared.iam.permissions

P module-attribute

P = ParamSpec('P')

T module-attribute

T = TypeVar('T')

alaner_admin_has_permission

alaner_admin_has_permission(
    requested_permissions, stack_level=2
)

Return True if current actor has at least one of the given permissions

Source code in shared/iam/permissions.py
def alaner_admin_has_permission(
    requested_permissions: EmployeePermission | set[EmployeePermission],
    stack_level: int = 2,
) -> bool:
    """Return True if current actor has at least one of the given permissions"""
    actor_permissions = get_actor_permissions()
    return permissions_intersects(
        requested_permissions,
        actor_permissions,
        _get_actor_safe(),  # type: ignore[no-untyped-call]
        stack_level + 1,
    )

get_actor_permissions

get_actor_permissions()

Return the permissions of the current actor.

WARNING, don't use this to check permissions. We have a strict policy, instead use alaner_admin_has_permission so that it is appropriately logged.

Source code in shared/iam/permissions.py
def get_actor_permissions() -> set[EmployeePermission]:
    """Return the permissions of the current actor.

    WARNING, don't use this to check permissions. We have a strict policy, instead use alaner_admin_has_permission so that it
    is appropriately logged.
    """
    if can_actor_bypass_permissions():
        return set(
            permission for role in EmployeeRole for permission in role.permissions
        )
    else:
        return _get_permissions(_get_actor_safe())  # type: ignore[no-untyped-call]

get_alan_employees_with_permission

get_alan_employees_with_permission(
    alan_employee_cls, permission
)
this should not be used to test whether a particular User has a given permission, because:
  • it's not efficient, as we loop over all users
  • it doesn't call permissions_intersects which tracks permission checks
Source code in shared/iam/permissions.py
def get_alan_employees_with_permission(
    alan_employee_cls: type["BaseAlanEmployee"],
    permission: "EmployeePermission",
) -> list["BaseAlanEmployee"]:
    """
    WARNING: this should not be used to test whether a particular User has a given permission, because:
     - it's not efficient, as we loop over all users
     - it doesn't call `permissions_intersects` which tracks permission checks
    """
    roles_with_permission = [
        role for role in EmployeeRole if permission in role.permissions
    ]
    if not roles_with_permission:
        raise RuntimeError(f"Could not find any role for permission {permission}")

    return (
        current_session.query(alan_employee_cls)  # noqa: ALN085
        .filter(
            alan_employee_cls.roles.overlap(roles_with_permission),
            or_(
                alan_employee_cls.end_date.is_(None),
                alan_employee_cls.end_date > utctoday(),
            ),
        )
        .all()
    )

get_roles

get_roles(user)
Source code in shared/iam/permissions.py
def get_roles(user) -> set[EmployeeRole]:  # type: ignore[no-untyped-def]
    if not user or not user.alan_employee:
        return set()

    return set(user.alan_employee.roles) or set()

has_permission

has_permission(user, requested_permissions, stack_level=2)

Return True if given user has at least one of the given permissions

Source code in shared/iam/permissions.py
def has_permission(  # type: ignore[no-untyped-def]
    user,
    requested_permissions: EmployeePermission | set[EmployeePermission],
    stack_level: int = 2,
) -> bool:
    """Return True if given user has at least one of the given permissions"""
    user_permissions = _get_permissions(user)
    return permissions_intersects(
        requested_permissions, user_permissions, user, stack_level + 1
    )

is_alaner_admin_with_permissions_or_raise

is_alaner_admin_with_permissions_or_raise(
    permitted_for, operation
)

Return True if current actor (which must be an Alaner admin) has one of the listed permissions

Raise a missing permission error if the user is authenticated as an Alaner admin but does not have the required permissions.

Source code in shared/iam/permissions.py
def is_alaner_admin_with_permissions_or_raise(
    permitted_for: set[EmployeePermission], operation: str
) -> bool:
    """Return True if current actor (which must be an Alaner admin) has one of the listed permissions

    Raise a missing permission error if the user is authenticated as an Alaner admin but does not have the required permissions."""

    if not is_alaner_admin():
        return False

    elif alaner_admin_has_permission(permitted_for):
        return True

    else:
        # Here we know we have an authenticated super admin on the client side,
        # so we can provide a helpful indication of what permissions are needed for this
        # operation (rather than a generic 404 Not found).

        return raise_missing_permission_error(permitted_for, operation)

is_authenticated_actor_with_permissions_or_raise

is_authenticated_actor_with_permissions_or_raise(
    permitted_for, operation
)

Return True if current actor (which must be an Alaner admin) has one of the listed permissions

Raise a missing permission error if the user is authenticated as an Alaner admin but does not have the required permissions.

Source code in shared/iam/permissions.py
def is_authenticated_actor_with_permissions_or_raise(
    permitted_for: set[EmployeePermission], operation: str
) -> bool:
    """Return True if current actor (which must be an Alaner admin) has one of the listed permissions

    Raise a missing permission error if the user is authenticated as an Alaner admin but does not have the required permissions."""

    if current_auth_context.has_backoffice_permissions(permitted_for):
        return True

    else:
        # Here we know we have an authenticated super admin on the client side,
        # so we can provide a helpful indication of what permissions are needed for this
        # operation (rather than a generic 404 Not found).

        return raise_missing_permission_error(permitted_for, operation)

permissions_intersects

permissions_intersects(
    requested_permissions,
    permissions,
    entity=None,
    stack_level=2,
)

Return True if the given two Sets of permissions intersect.

The first Set is the requested permissions, the second the available permissions. Properly logs the check. Can receive an entity and stack level for improved logging

Source code in shared/iam/permissions.py
@request_cached()
def permissions_intersects(  # type: ignore[no-untyped-def]
    requested_permissions: EmployeePermission | set[EmployeePermission],
    permissions: set[EmployeePermission],
    entity=None,
    stack_level: int = 2,
) -> bool:
    """Return True if the given two Sets of permissions intersect.

    The first Set is the requested permissions, the second the available
    permissions. Properly logs the check. Can receive an entity and stack level
    for improved logging

    """
    req = (
        requested_permissions
        if isinstance(requested_permissions, set)
        else {requested_permissions}
    )
    intersection = req & permissions
    match: bool = len(intersection) != 0
    stack_txt = format_stack(limit=stack_level)[0]
    stack_txt = re.sub(r"\s+", " ", stack_txt)
    stack_txt = re.sub(r"^\s+", "", stack_txt)
    stack_txt = stack_txt.replace("\n", "-")
    id = getattr(entity, "id", None)
    entity_type = type(entity).__name__

    # Do not log this in DEV mode, it's too verbose
    logging_level = DEBUG if is_development_mode() else INFO
    # No need to log this if the match is "ok", it generates 300k logs per hour
    logging_level = DEBUG if match else logging_level

    current_logger.log(
        logging_level,
        f"permissions challenge result: {match}",
        entity_type=entity_type,
        entity_id=id,
        requested_permissions=requested_permissions,
        result=match,
        intersection=intersection,
        origin_frame=stack_txt,
    )
    return match

permitted_for

permitted_for(permissions)
Source code in shared/iam/permissions.py
def permitted_for(
    permissions: set[EmployeePermission],
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    def decorator(f: Callable[P, T]) -> Callable[P, T]:
        @wraps(f)
        def decorated(*args: P.args, **kwargs: P.kwargs) -> T:
            # Same as @flask_login.login_required
            if request.method in config.EXEMPT_METHODS:
                pass
            elif not (
                is_alaner_admin_with_permissions_or_raise(
                    permissions, operation=f.__name__
                )
                or BackofficePermissionAccessPolicy.permitted_for(
                    permissions
                ).do_evaluate()  # Temporary until we replace all usage of @permitted_for with @enforce_policy
                or is_authenticated_actor_with_permissions_or_raise(
                    permissions, operation=f.__name__
                )
            ):
                raise RestrictToPermissionsExceptionNotFound()

            return f(*args, **kwargs)

        return decorated

    return decorator

raise_missing_permission_error

raise_missing_permission_error(permitted_for, operation)
Source code in shared/iam/permissions.py
def raise_missing_permission_error(
    permitted_for: set[EmployeePermission], operation: str
) -> NoReturn:
    permission_names = {p.name for p in permitted_for}
    raise RestrictToPermissionsExceptionForbidden(
        description=f"Unauthorized: {operation} requires any of these permissions: {', '.join(permission_names)} (https://www.notion.so/alaninsurance/App-roles-and-permissions-cheat-sheet-for-Alaners-b22a1643244e4b568d151d7ad612f372)"
    )

shared.iam.scim_api

AlanEmployeeIdentity dataclass

AlanEmployeeIdentity(first_name, last_name, name=None)

first_name instance-attribute

first_name

full_name property

full_name

last_name instance-attribute

last_name

name class-attribute instance-attribute

name = None

GenericScimAdapter

Generic SCIM adapter for all apps but BE, CA, ES and FR

create_app_user

create_app_user(first_name, last_name, email)
Source code in shared/iam/scim_api.py
def create_app_user(
    self,
    first_name: str,
    last_name: str,
    email: str,  # noqa: ARG002
) -> int | uuid.UUID:
    AlanEmployee = get_alan_employee_cls()
    user_cls = AlanEmployee.user.mapper.class_
    user = user_cls(
        first_name=first_name,
        last_name=last_name,
    )
    current_session.add(user)
    current_session.flush()
    return user.id  # type: ignore[no-any-return]

create_scim_user

create_scim_user(first_name, last_name, email, groups)
Source code in shared/iam/scim_api.py
def create_scim_user(
    self, first_name: str, last_name: str, email: str, groups: list[str]
) -> int:
    AlanEmployee = get_alan_employee_cls()
    if (
        current_session.query(AlanEmployee)  # noqa: ALN085
        .filter(AlanEmployee.alan_email == email)
        .one_or_none()
    ):
        raise BaseErrorCode.resource_already_exists(
            message=f"AlanEmployee {email} already exists"
        )

    user_id = self.create_app_user(
        first_name=first_name, last_name=last_name, email=email
    )

    alan_employee = AlanEmployee(
        alan_email=email,
        roles=groups,
        user_id=user_id,
        start_date=utctoday(),
    )
    current_session.add(alan_employee)
    current_session.commit()

    return alan_employee.id

delete_scim_user

delete_scim_user(alan_employee_id)
Source code in shared/iam/scim_api.py
def delete_scim_user(self, alan_employee_id: int) -> None:
    AlanEmployee = get_alan_employee_cls()
    alan_employee = current_session.get(AlanEmployee, alan_employee_id)
    if not alan_employee:
        raise BaseErrorCode.missing_resource(
            f"AlanEmployee {alan_employee_id} not found"
        )

    alan_employee.end_date = utctoday() - timedelta(days=1)
    current_session.commit()

get_all_scim_users

get_all_scim_users(count, start_index)
Source code in shared/iam/scim_api.py
def get_all_scim_users(self, count: int, start_index: int) -> dict[str, Any]:
    AlanEmployee = get_alan_employee_cls()
    query = current_session.query(AlanEmployee).options(  # noqa: ALN085
        joinedload(AlanEmployee.user)  # type: ignore[arg-type]
    )
    page = paginate(
        query=query,
        page=(start_index // count) + 1,
        per_page=count,
    )
    users_data: dict[int | uuid.UUID, AlanEmployeeIdentity] = (
        self.get_scim_users_data(page.items)
    )

    resources = [
        dict(
            active=a.is_active,
            id=a.id,
            display_name=users_data[a.user_id].full_name
            if a.user_id in users_data
            else None,
            given_name=users_data[a.user_id].first_name
            if a.user_id in users_data
            else None,
            family_name=users_data[a.user_id].last_name
            if a.user_id in users_data
            else None,
            emails=[
                dict(
                    primary=True,
                    type="work",
                    value=a.alan_email,
                ),
            ],
            groups=a.roles,
        )
        for a in page.items
    ]

    return dict(
        schemas=["urn:ietf:params:scim:api:messages:2.0:ListResponse"],
        total_results=page.total,
        start_index=start_index,
        items_per_page=page.per_page,
        resources=resources,
    )

get_scim_user

get_scim_user(alan_employee_id)
Source code in shared/iam/scim_api.py
def get_scim_user(self, alan_employee_id: int) -> dict[str, Any] | None:
    AlanEmployee = get_alan_employee_cls()
    alan_employee = current_session.get(AlanEmployee, alan_employee_id)
    if not alan_employee:
        raise BaseErrorCode.missing_resource(
            message=f"AlanEmployee {alan_employee_id} not found"
        )

    user_data = self.get_user_data(user_id=alan_employee.user_id)  # type: ignore[arg-type]

    return dict(
        schemas=["urn:ietf:params:scim:schemas:core:2.0:User"],
        id=alan_employee.id,
        display_name=user_data.full_name,
        given_name=user_data.first_name,
        family_name=user_data.last_name,
        emails=[
            dict(
                type="work",
                primary=True,
                value=alan_employee.alan_email,
            )
        ],
        groups=alan_employee.roles,
        active=alan_employee.is_active,
    )

get_scim_users_data

get_scim_users_data(alan_employees)
Source code in shared/iam/scim_api.py
def get_scim_users_data(
    self, alan_employees: list[BaseAlanEmployee]
) -> dict[int | uuid.UUID, AlanEmployeeIdentity]:
    return {
        alan_employee.user_id: AlanEmployeeIdentity(  # type: ignore[misc]
            first_name=alan_employee.user.first_name,
            last_name=alan_employee.user.last_name,
        )
        for alan_employee in alan_employees
    }

get_user_data

get_user_data(user_id)
Source code in shared/iam/scim_api.py
def get_user_data(self, user_id: int | uuid.UUID) -> AlanEmployeeIdentity:
    AlanEmployee = get_alan_employee_cls()
    user_cls = AlanEmployee.user.mapper.class_
    user = (
        current_session.query(user_cls).filter(user_cls.id == user_id).one_or_none()  # noqa: ALN085
    )
    if user is None:
        raise BaseErrorCode.missing_resource()
    return AlanEmployeeIdentity(
        first_name=user.first_name,
        last_name=user.last_name,
    )

update_scim_user

update_scim_user(alan_employee_id, payload)
Source code in shared/iam/scim_api.py
def update_scim_user(self, alan_employee_id: int, payload: dict[str, Any]) -> None:
    AlanEmployee = get_alan_employee_cls()
    alan_employee = current_session.get(AlanEmployee, alan_employee_id)
    if not alan_employee:
        raise BaseErrorCode.missing_resource(
            f"AlanEmployee {alan_employee_id} not found"
        )

    if payload.get("emails"):
        alan_employee.alan_email = payload["emails"][0]["value"]
    if "groups" in payload:
        alan_employee.roles = payload["groups"]
    if "active" in payload and payload["active"] is not None:
        if payload["active"] and not alan_employee.is_active:
            if (
                alan_employee.start_date is None
                or alan_employee.start_date > utctoday()
            ):
                alan_employee.start_date = utctoday()
            elif alan_employee.end_date and alan_employee.end_date < utctoday():
                alan_employee.end_date = None
        elif not payload["active"] and alan_employee.is_active:
            alan_employee.end_date = utctoday() - timedelta(days=1)

    current_session.commit()

P module-attribute

P = ParamSpec('P')

ScimAuthSchema

Bases: Schema

authorization class-attribute instance-attribute

authorization = String(
    data_key="Authorization", required=True
)

ScimEmailSchema

Bases: Schema

primary class-attribute instance-attribute

primary = Boolean()

type class-attribute instance-attribute

type = String()

value class-attribute instance-attribute

value = Email(required=True)

ScimErrorSchema

Bases: Schema

detail class-attribute instance-attribute

detail = String()

schemas class-attribute instance-attribute

schemas = List(String())

status class-attribute instance-attribute

status = Integer()

ScimUserCountryDataSchema

Bases: Schema

family_name class-attribute instance-attribute

family_name = String(data_key='familyName', required=True)

given_name class-attribute instance-attribute

given_name = String(data_key='givenName', required=True)

ScimUserCreateSchema

Bases: Schema

active class-attribute instance-attribute

active = Boolean(missing=True)

emails class-attribute instance-attribute

emails = List(
    Nested(ScimEmailSchema),
    required=True,
    validate=Length(min=1),
)

family_name class-attribute instance-attribute

family_name = String(data_key='familyName', required=True)

given_name class-attribute instance-attribute

given_name = String(data_key='givenName', required=True)

groups class-attribute instance-attribute

groups = List(
    String(validate=create_marshmallow_validator())
)

schemas class-attribute instance-attribute

schemas = List(String())

ScimUserEditSchema

Bases: Schema

active class-attribute instance-attribute

active = Boolean()

emails class-attribute instance-attribute

emails = List(Nested(ScimEmailSchema))

family_name class-attribute instance-attribute

family_name = String(data_key='familyName')

given_name class-attribute instance-attribute

given_name = String(data_key='givenName')

groups class-attribute instance-attribute

groups = List(
    String(validate=create_marshmallow_validator())
)

schemas class-attribute instance-attribute

schemas = List(String())

ScimUserOutputSchema

Bases: ScimUserEditSchema

display_name class-attribute instance-attribute

display_name = String(data_key='displayName')

id class-attribute instance-attribute

id = Integer()

ScimUsersListSchema

Bases: Schema

items_per_page class-attribute instance-attribute

items_per_page = Integer(data_key='itemsPerPage')

resources class-attribute instance-attribute

resources = List(
    Nested(ScimUserOutputSchema), data_key="Resources"
)

schemas class-attribute instance-attribute

schemas = List(String())

start_index class-attribute instance-attribute

start_index = Integer(data_key='startIndex')

total_results class-attribute instance-attribute

total_results = Integer(data_key='totalResults')

ScimUsersQueryArgSchema

Bases: Schema

count class-attribute instance-attribute

count = Integer(missing=100)

start_index class-attribute instance-attribute

start_index = Integer(missing=1, data_key='startIndex')

T module-attribute

T = TypeVar('T')

current_alan_employees

Bases: MethodView

get

get(count=100, start_index=1)
Source code in shared/iam/scim_api.py
@scim_blueprint.response(status_code=200, schema=ScimUsersListSchema)
@scim_blueprint.arguments(ScimUsersQueryArgSchema, location="query", as_kwargs=True)
@scim_auth_required
def get(self, count: int = 100, start_index: int = 1):  # type: ignore[no-untyped-def]
    return get_scim_adapter().get_all_scim_users(
        count=count, start_index=start_index
    )

post

post(payload)
Source code in shared/iam/scim_api.py
@scim_blueprint.response(status_code=201, schema=ScimUserOutputSchema)
@scim_blueprint.arguments(ScimUserCreateSchema)
@scim_auth_required
@scim_api_error_handler
def post(self, payload):  # type: ignore[no-untyped-def]
    scim_adapter = get_scim_adapter()

    email: str = get_from_path(payload, ["emails", 0, "value"])
    given_name: str = payload["given_name"]
    family_name: str = payload["family_name"]
    groups: list[str] = payload["groups"]

    alan_employee_id = scim_adapter.create_scim_user(
        email=email, first_name=given_name, last_name=family_name, groups=groups
    )

    return scim_adapter.get_scim_user(alan_employee_id=alan_employee_id)

get_alan_employee_cls

get_alan_employee_cls()

Get the Alan employee class.

Source code in shared/iam/scim_api.py
def get_alan_employee_cls() -> type["BaseAlanEmployee"]:
    """Get the Alan employee class."""
    if is_test_mode():
        from shared.models.testing.models import TestAlanEmployee

        return TestAlanEmployee
    global _alan_employee_cls
    return mandatory(_alan_employee_cls)

get_scim_adapter

get_scim_adapter()

Get the SCIM adapter instance.

Source code in shared/iam/scim_api.py
def get_scim_adapter() -> "GenericScimAdapter":
    """Get the SCIM adapter instance."""
    if is_test_mode():
        return GenericScimAdapter()
    global _scim_adapter
    return mandatory(_scim_adapter)

record

record(state)

Record callback to store the adapter instance.

Source code in shared/iam/scim_api.py
@scim_blueprint.record_once
def record(state: BlueprintSetupState) -> None:
    """Record callback to store the adapter instance."""
    global _scim_adapter, _alan_employee_cls
    _scim_adapter = state.options.get("scim_adapter", GenericScimAdapter())
    _alan_employee_cls = state.options.get(
        "alan_employee_cls", get_current_class(BaseAlanEmployee)
    )

scim_api_error_handler

scim_api_error_handler(f)
Source code in shared/iam/scim_api.py
def scim_api_error_handler(f: Callable[P, T]) -> Callable[P, "T | Response"]:
    @wraps(f)
    def decorated_function(*args: P.args, **kwargs: P.kwargs) -> "T | Response":
        try:
            return f(*args, **kwargs)
        except BaseErrorCode as e:
            if e.alancode == 901:
                return make_response(
                    ScimErrorSchema().dumps(
                        dict(
                            schemas=["urn:ietf:params:scim:api:messages:2.0:Error"],
                            detail="User does not exists",
                            status=404,
                        ),
                    ),
                    404,
                )
            elif e.alancode == 907:
                return make_response(
                    ScimErrorSchema().dumps(
                        dict(
                            schemas=["urn:ietf:params:scim:api:messages:2.0:Error"],
                            detail="User already exists",
                            status=409,
                        ),
                    ),
                    409,
                )
            raise

    return decorated_function

scim_auth_required

scim_auth_required(f)
Source code in shared/iam/scim_api.py
def scim_auth_required(f: Callable[P, T]) -> Callable[P, T]:
    @wraps(f)
    @scim_blueprint.arguments(ScimAuthSchema, location="headers", as_kwargs=True)
    def decorated(*args: P.args, **kwargs: P.kwargs) -> T:
        authorization = cast("str | None", kwargs.pop("authorization", None))
        if not authorization or not authorization.startswith("Bearer "):
            abort(404)
        token = authorization.removeprefix("Bearer ")

        scim_public_key = serialization.load_pem_public_key(
            current_config["EU_TOOLS_PUBLIC_KEY"]
        )
        # Decode and validate signature, aud(ience), and exp(iration)
        jwt.decode(
            token,
            scim_public_key,  # type: ignore[arg-type]
            audience=current_config["BASE_URL"],
            algorithms="RS256",
        )
        return f(*args, **kwargs)

    return decorated

scim_blueprint module-attribute

scim_blueprint = CustomBlueprint(
    "scim", "scim", url_prefix="/scim/v2"
)

shared.iam.sensitive_data_mixin

SensitiveData

Allow marking the model instances as "sensitive data", meaning that not every admin can view their information.

is_sensitive_data class-attribute instance-attribute

is_sensitive_data = mapped_column(
    Boolean, nullable=False, server_default=false()
)

shared.iam.token_auth

TokenAuth

TokenAuth(
    cookie_name,
    optional,
    allow_deep_link=False,
    restrict_location_for_alaner_admin=True,
)

Token authentication

This class authenticates a request based on - the presence of a valid token passed in a Bearer authorization header for mobile requests - the presence of a valid token passed in a cookie for web requests

Source code in shared/iam/token_auth.py
def __init__(
    self,
    cookie_name: str,
    optional: bool,
    allow_deep_link: bool = False,
    restrict_location_for_alaner_admin: bool = True,
) -> None:
    self.bearer_token_auth = BearerTokenAuth(
        optional=optional,
        allow_deep_link=allow_deep_link,
        restrict_location_for_alaner_admin=restrict_location_for_alaner_admin,
    )
    self.cookie_token_auth = CookieTokenAuth(
        cookie_name=cookie_name,
        verify_token_callback=self.bearer_token_auth.verify_token,
    )
    self.cookie_name = cookie_name

bearer_token_auth instance-attribute

bearer_token_auth = BearerTokenAuth(
    optional=optional,
    allow_deep_link=allow_deep_link,
    restrict_location_for_alaner_admin=restrict_location_for_alaner_admin,
)

cookie_name instance-attribute

cookie_name = cookie_name

cookie_token_auth instance-attribute

cookie_token_auth = CookieTokenAuth(
    cookie_name=cookie_name,
    verify_token_callback=verify_token,
)
is_client_authenticating_with_cookie()
Source code in shared/iam/token_auth.py
def is_client_authenticating_with_cookie(self) -> bool:
    auth_header = request.headers.get(AUTHENTICATION_METHOD_HEADER)
    logger.debug(f"header X-APP-AUTH: {auth_header}")
    return auth_header == AuthenticationType.cookie

login_required

login_required(f)
Source code in shared/iam/token_auth.py
def login_required(self, f: Callable[_P, _T]) -> Callable[_P, _T]:
    @wraps(f)
    def decorated(*args, **kwargs):  # type: ignore[no-untyped-def]
        func = f
        if (
            self.is_client_authenticating_with_cookie()
            and ALANER_ADMIN_HEADER in request.headers
        ):
            # X-ALAN-SUPERADMIN=1 + X-APP-AUTH=cookie (impersonation is not supported yet)
            logger.debug(
                "Request from web by admin, authentication by Flask session cookie"
            )
            func = google_auth_login_required(f)
        elif (
            self.is_client_authenticating_with_cookie()
            and not claims_to_be_alaner_admin()  # type: ignore[no-untyped-call]
        ):
            # X-APP-AUTH=cookie - neither X-ALAN-SUPERADMIN or X-ALAN-IMPERSONATED-USER is set
            logger.debug("Request from web, authentication by cookie")
            func = self.cookie_token_auth.login_required(f)  # type: ignore[no-untyped-call]
        else:
            # X-APP-AUTH=bearer or X-ALAN-SUPERADMIN is set
            logger.debug(
                "Request from mobile or default behaviour, authentication by header"
            )
            func = self.bearer_token_auth.login_required(f)
        return func(*args, **kwargs)

    return decorated

shared.iam.token_auth_helpers

JWT_ALGORITHM module-attribute

JWT_ALGORITHM = 'HS512'

get_alaner_admin_user_from_claims

get_alaner_admin_user_from_claims(
    claims, restrict_location=True
)

Returns the User from the JWT token claims, none if not found or not from authorized location. This method is used for admin URLs only.

Raises RestrictInvalidToken: if claims are missing (warning)

Raises RestrictAlanEmployeeInvalid: if the alan employee is unauthorized (error)

Source code in shared/iam/token_auth_helpers.py
def get_alaner_admin_user_from_claims(  # type: ignore[no-untyped-def]
    claims: JWTClaims | dict | None,  # type: ignore[type-arg]
    restrict_location: bool = True,
):
    """
    Returns the User from the JWT token claims, none if not found or not from authorized location.
    This method is used for admin URLs only.

    Raises RestrictInvalidToken: if claims are missing (warning)

    Raises RestrictAlanEmployeeInvalid: if the alan employee is unauthorized (error)
    """

    if claims is None:
        raise RestrictInvalidToken()

    try:
        # Note: the Flask session JSON serializer will return a dict for a JWTClaims object (which is a subclass
        # of dict, so we can't just test for isinstance(claims, dict), because that would be True
        # for JWTClaims as well).

        if isinstance(claims, dict):
            claims = JWTClaims(claims, header=None)

        # Now revalidate claims to check for expiration

        claims.validate()  # type: ignore[union-attr]

    except (AttributeError, JoseError):
        # AttributeError is raised if we have a string instead of the dict in the session token

        raise RestrictInvalidToken()

    return get_authenticated_user_from_id(claims["id"], restrict_location)

get_alaner_admin_user_from_token

get_alaner_admin_user_from_token(
    token, restrict_location=True
)
Source code in shared/iam/token_auth_helpers.py
def get_alaner_admin_user_from_token(token: str, restrict_location: bool = True):  # type: ignore[no-untyped-def]
    return get_alaner_admin_user_from_claims(
        get_claims_from_token(token), restrict_location
    )

get_authenticated_user_from_email

get_authenticated_user_from_email(
    alan_email, restrict_location=True
)

Return the User from the given email, if the user is an alan employee, is currently active, and connected from an authorized location. This method is used for admin URLs only.

Raises RestrictAlanEmployeeInvalid: if not the case

Source code in shared/iam/token_auth_helpers.py
def get_authenticated_user_from_email(  # type: ignore[no-untyped-def]
    alan_email: str,
    restrict_location: bool = True,
):
    """
    Return the User from the given email, if the user is an alan employee, is currently active,
    and connected from an authorized location. This method is used for admin URLs only.

    Raises RestrictAlanEmployeeInvalid: if not the case
    """
    AlanEmployee = get_current_class(BaseAlanEmployee)
    alan_employee = (
        current_session.query(AlanEmployee)  # noqa: ALN085
        .filter(AlanEmployee.alan_email == alan_email)
        .one_or_none()
    )
    return _get_authenticated_user(
        alan_employee,
        restrict_location=restrict_location,
    )

get_authenticated_user_from_id

get_authenticated_user_from_id(
    user_id, restrict_location=True
)

Return the User from the given id, if the user is an alan employee, is currently active, and connected from an authorized location. This method is used for admin URLs only.

Raises RestrictAlanEmployeeInvalid: if not the case

Source code in shared/iam/token_auth_helpers.py
def get_authenticated_user_from_id(  # type: ignore[no-untyped-def]
    user_id: int,
    restrict_location: bool = True,
):
    """
    Return the User from the given id, if the user is an alan employee, is currently active,
    and connected from an authorized location. This method is used for admin URLs only.

    Raises RestrictAlanEmployeeInvalid: if not the case
    """
    AlanEmployee = get_current_class(BaseAlanEmployee)
    try:
        alan_employee = (
            current_session.query(AlanEmployee)  # noqa: ALN085
            .filter(AlanEmployee.user_id == user_id)
            .one_or_none()
        )
    except DataError:
        logger.warning(f"Invalid user id format: {user_id}", exc=True)
        alan_employee = None
    return _get_authenticated_user(
        alan_employee,
        restrict_location,
    )

get_claims_from_token

get_claims_from_token(token)

Returns the claims from the JWT token, none if invalid token or expired. This method is used for admin URLs only. The claims do not contain the signature, and do so not constitute sensitive credentials.

Source code in shared/iam/token_auth_helpers.py
def get_claims_from_token(token: str) -> JWTClaims | None:
    """
    Returns the claims from the JWT token, none if invalid token or expired. This method is used for
    admin URLs only. The claims do not contain the signature, and do so not constitute sensitive credentials.
    """
    if not token:
        return None

    try:
        claims = jwt.decode(token, current_config["SECRET_KEY"])
        claims.validate()
        return claims
    except JoseError as e:
        logger.warning(f"token verification failed: {e}")
        return None

make_token

make_token(claims, issued_at, expires_in)
Source code in shared/iam/token_auth_helpers.py
def make_token(claims: Mapping, issued_at: int, expires_in: int) -> str:  # type: ignore[type-arg]
    payload = dict(
        claims,
        iat=issued_at,
        exp=issued_at + expires_in,
    )
    return jwt.encode(  # type: ignore[no-any-return]
        header={"alg": JWT_ALGORITHM},
        payload=payload,
        key=current_config["SECRET_KEY"],
    ).decode("ascii")

make_token_for_id

make_token_for_id(user_id)

Returns the JWT token for the given user id, with an expiration time at 23:00 (cutoff) or the next day at 23:00 if we're 1 hour before the cutoff. This method is used for admin URLs only.

Source code in shared/iam/token_auth_helpers.py
def make_token_for_id(user_id: int | str) -> str:
    """
    Returns the JWT token for the given user id, with an expiration time at 23:00 (cutoff) or the next day
    at 23:00 if we're 1 hour before the cutoff. This method is used for admin URLs only.
    """
    now = datetime.now().replace(microsecond=0)
    expires_cutoff = now.replace(hour=23, minute=0, second=0)
    expires_in: timedelta = expires_cutoff - now
    if expires_in < timedelta(hours=1):
        # If less than 1 hour remains to cutoff (or after cutoff), wrap to next day
        expires_in += timedelta(days=1)

    return make_token(
        claims={"id": str(user_id)},
        issued_at=int(now.timestamp()),
        expires_in=int(expires_in.total_seconds()),
    )

shared.iam.tracking

events

publish_user_session_ended

publish_user_session_ended(user, session_id, session_type)
Source code in shared/iam/tracking/events.py
@catch_and_log(TrackingException)
def publish_user_session_ended(
    user: "Authenticatable", session_id: UUID, session_type: str
) -> None:
    profile = AuthProfileTraits(user)
    properties = {"session_id": str(session_id), "session_type": session_type}
    client.identify(profile.identifier, traits_provider=profile.get_attributes)
    client.track(
        profile.identifier,
        event="user_session_ended",
        properties=properties,
        # Disable Amplitude for this event
        integrations={"Amplitude": False, "Amplitude (Actions)": False},
    )

publish_user_session_refreshed

publish_user_session_refreshed(
    user, session_id, session_type
)
Source code in shared/iam/tracking/events.py
@catch_and_log(TrackingException)
def publish_user_session_refreshed(
    user: "Authenticatable", session_id: UUID, session_type: str
) -> None:
    profile = AuthProfileTraits(user)
    properties = {"session_id": str(session_id), "session_type": session_type}
    client.identify(profile.identifier, traits_provider=profile.get_attributes)
    client.track(
        profile.identifier,
        event="user_session_refreshed",
        properties=properties,
        # Disable Amplitude for this event
        integrations={"Amplitude": False, "Amplitude (Actions)": False},
    )

publish_user_session_started

publish_user_session_started(
    user, session_id, session_type
)
Source code in shared/iam/tracking/events.py
@catch_and_log(TrackingException)
def publish_user_session_started(
    user: "Authenticatable", session_id: UUID, session_type: str
) -> None:
    profile = AuthProfileTraits(user)
    properties = {"session_id": str(session_id), "session_type": session_type}
    client.identify(profile.identifier, traits_provider=profile.get_attributes)
    client.track(
        profile.identifier,
        event="user_session_started",
        properties=properties,
        # Disable Amplitude for this event
        integrations={"Amplitude": False, "Amplitude (Actions)": False},
    )

traits

AuthProfileTraits

AuthProfileTraits(identifiable, front_end_url=None)

Bases: UserTraits['Authenticatable']

Source code in shared/services/segment/base.py
def __init__(
    self, identifiable: T, front_end_url: FrontendURL | None = None
) -> None:
    self.user = identifiable
    self._front_end_url = front_end_url
get_attributes
get_attributes()

Return authentication-relevant attributes from the authenticatable user.

Source code in shared/iam/tracking/traits.py
def get_attributes(self) -> dict[str, str | None]:
    """
    Return authentication-relevant attributes from the authenticatable user.
    """

    attributes = {
        "alan_user_id": str(self.user.id) if self.user.id else None,
        "keycloak_id": str(self.user.keycloak_id)
        if self.user.keycloak_id
        else None,
        "created_at": str(self.user.created_at),
    }

    # Add email, first_name, and last_name if they exist (to avoid overriding to empty strings)
    if self.user.email:
        attributes["email"] = self.user.email
    if self.user.first_name:
        attributes["first_name"] = self.user.first_name
    if self.user.last_name:
        attributes["last_name"] = self.user.last_name

    return attributes

shared.iam.user_auth

validate_at_hash

validate_at_hash(at_hash, access_token, alg)

Validates at_hash (access token hash) claim stored in the ID token against the access token itself. Described here: https://openid.net/specs/openid-connect-core-1_0.html#CodeFlowTokenValidation ⧉ To do this, we use the same function used by authlib to generate the at_hash and compare the result.

Source code in shared/iam/user_auth.py
def validate_at_hash(at_hash: str, access_token: str, alg: str) -> None:
    """
    Validates at_hash (access token hash) claim stored in the ID token against the access token itself.
    Described here: https://openid.net/specs/openid-connect-core-1_0.html#CodeFlowTokenValidation
    To do this, we use the same function used by authlib to generate the at_hash and compare the result.
    """
    expected_at_hash = to_native(create_half_hash(access_token, alg=alg))

    if at_hash != expected_at_hash:
        raise ValueError("The at_hash is not consistent with the access token")