Attribute-Based Access Control (ABAC)¶
ABAC provides fine-grained access control based on attributes of the user, resource, and environment.
Migration in progress
ABAC policies are not yet available on all backends. While policies could technically use the legacy g.current_user / g.actor globals, it's recommended to wait until current_auth_context is available on your backend before implementing access policies.
ABAC will fully replace ad-hoc permission checks in code as well as BaseController.can_read / can_write. Roles (EmployeeRole) will continue to be used - ABAC policies can check roles as part of their evaluation.
Some features (e.g., CustomBlueprint.default_access_policies, CustomMethodView HEAD endpoints) require Flask-smorest, which is an ongoing migration.
AccessPolicy¶
An AccessPolicy defines a single access control rule. Policies are reusable classes that can be composed and applied to multiple endpoints.
Creating a Policy¶
from shared.iam.abac.access_policy import AccessPolicy
class DocumentOwnerPolicy(AccessPolicy):
"""Only document owners can access their documents"""
policy_id = "document-owner"
@classmethod
def evaluate(cls, document_id: int, user_id: int) -> bool:
document = Document.query.get(document_id)
return document is not None and document.owner_id == user_id
Policy Requirements¶
policy_id- Unique string identifier for logging and metricsdescription- Either a class attribute OR docstring (required)evaluate()- Classmethod returningTrue(allow) orFalse(deny)
# Option 1: Use docstring as description
class MyPolicy(AccessPolicy):
"""This docstring becomes the description"""
policy_id = "my-policy"
@classmethod
def evaluate(cls) -> bool:
return True
# Option 2: Explicit description attribute
class MyPolicy(AccessPolicy):
policy_id = "my-policy"
description = "Explicit description here"
@classmethod
def evaluate(cls) -> bool:
return True
Policy Parameters¶
Policy parameters are automatically injected from the decorated function's arguments:
class ResourceAccessPolicy(AccessPolicy):
"""Check user can access resource"""
policy_id = "resource-access"
@classmethod
def evaluate(cls, resource_id: str, user_id: int) -> bool:
# These params come from the decorated function
resource = get_resource(resource_id)
return resource.can_access(user_id)
@enforce_policy(ResourceAccessPolicy)
def get_resource_details(resource_id: str, user_id: int, include_metadata: bool = False):
# resource_id and user_id are passed to the policy
# include_metadata is not used by the policy
pass
Parameter matching rules:
- Policy receives only parameters it declares
- Function can have more parameters than policy needs
- Missing required policy parameters raise
ValueErrorat decoration time - Optional policy parameters use function defaults
enforce_policy Decorator¶
The @enforce_policy decorator evaluates policies before function execution:
from shared.iam.abac.access_policy import enforce_policy
@enforce_policy(DocumentOwnerPolicy)
def update_document(document_id: int, user_id: int, data: dict):
# Only executes if policy passes
document = Document.query.get(document_id)
document.update(data)
Multiple Policies (AND Logic)¶
Pass multiple policies - all must pass:
@enforce_policy(AuthenticatedPolicy, DocumentOwnerPolicy, ActiveSubscriptionPolicy)
def premium_document_action(document_id: int, user_id: int):
# Requires: authenticated AND owner AND active subscription
pass
OR Logic with or_()¶
Use or_() when any policy passing is sufficient:
from shared.iam.abac.access_policy import enforce_policy, or_
@enforce_policy(or_(AdminPolicy, DocumentOwnerPolicy))
def delete_document(document_id: int, user_id: int):
# Allows: admin OR document owner
pass
Combining AND and OR¶
@enforce_policy(
AuthenticatedPolicy,
or_(AdminPolicy, DocumentOwnerPolicy),
ActiveSubscriptionPolicy,
)
def complex_action(document_id: int, user_id: int):
# Requires: authenticated AND (admin OR owner) AND active subscription
pass
Built-in Policies¶
BackofficePermissionAccessPolicy¶
Factory for creating permission-based policies for Alaners:
from shared.iam.abac.backoffice import BackofficePermissionAccessPolicy
from shared.models.enums.employee_permission import EmployeePermission
# Single permission
@enforce_policy(BackofficePermissionAccessPolicy.permitted_for(
EmployeePermission.manage_users
))
def admin_only_action():
pass
# Multiple permissions (OR logic)
@enforce_policy(BackofficePermissionAccessPolicy.permitted_for({
EmployeePermission.manage_users,
EmployeePermission.view_reports,
}))
def admin_action():
# Passes if user has ANY of these permissions
pass
Error Handling¶
AccessPolicyError¶
When a policy fails, AccessPolicyError is raised (HTTP 403):
from shared.iam.abac.access_policy import AccessPolicyError
try:
restricted_action()
except AccessPolicyError as e:
# e.description contains the policy_id
print(f"Access denied: {e}")
Validation Errors¶
Missing required parameters raise ValueError at decoration time:
class RequiresUserIdPolicy(AccessPolicy):
policy_id = "requires-user"
@classmethod
def evaluate(cls, user_id: int) -> bool:
return user_id > 0
# This raises ValueError immediately - user_id not in function signature
@enforce_policy(RequiresUserIdPolicy)
def broken_function(document_id: int): # Missing user_id!
pass
Examples¶
Simple Authentication Policy¶
class AuthenticatedPolicy(AccessPolicy):
"""Require authentication"""
policy_id = "authenticated"
@classmethod
def evaluate(cls) -> bool:
return current_auth_context.is_authenticated
Resource Ownership Policy¶
class ProjectMemberPolicy(AccessPolicy):
"""User must be a member of the project"""
policy_id = "project-member"
@classmethod
def evaluate(cls, project_id: int) -> bool:
if current_auth_context.is_anonymous:
return False
user = current_auth_context.real_principal_as(User)
project = Project.query.get(project_id)
return project is not None and user.id in project.member_ids
Role-Based Policy¶
class AdminOrManagerPolicy(AccessPolicy):
"""User must be admin or manager"""
policy_id = "admin-or-manager"
@classmethod
def evaluate(cls, organization_id: int) -> bool:
if current_auth_context.is_anonymous:
return False
user = current_auth_context.real_principal_as(User)
membership = OrganizationMembership.query.filter_by(
user_id=user.id,
organization_id=organization_id,
).first()
return membership is not None and membership.role in ("admin", "manager")
Time-Based Policy¶
from datetime import datetime, time
class BusinessHoursPolicy(AccessPolicy):
"""Only allow during business hours"""
policy_id = "business-hours"
@classmethod
def evaluate(cls) -> bool:
now = datetime.now().time()
start = time(9, 0)
end = time(17, 0)
return start <= now <= end
Combining Policies¶
# Define individual policies
class VerifiedEmailPolicy(AccessPolicy):
"""User must have verified email"""
policy_id = "verified-email"
@classmethod
def evaluate(cls) -> bool:
user = current_auth_context.real_principal_as(User)
return user.email_verified
class ActiveSubscriptionPolicy(AccessPolicy):
"""User must have active subscription"""
policy_id = "active-subscription"
@classmethod
def evaluate(cls, user_id: int) -> bool:
return Subscription.is_active(user_id)
# Combine in endpoint
@enforce_policy(
AuthenticatedPolicy,
VerifiedEmailPolicy,
ActiveSubscriptionPolicy,
)
def premium_feature(user_id: int):
# Requires all three policies to pass
pass
Blueprint-Level Policies¶
Apply policies to all routes in a blueprint using the default_access_policies parameter:
from shared.api.custom_smorest_blueprint import CustomBlueprint
from shared.iam.abac.backoffice import BackofficePermissionAccessPolicy
from shared.models.enums.employee_permission import EmployeePermission
# All routes require admin permission
admin_blueprint = CustomBlueprint(
"admin",
__name__,
default_access_policies=[
BackofficePermissionAccessPolicy.permitted_for(EmployeePermission.admin)
],
)
Preemptive Permission Checks (HEAD Endpoints)¶
Views extending CustomMethodView automatically get HEAD endpoints for preemptive permission checking. This allows clients to verify access before attempting an operation.
sequenceDiagram
participant Client
participant API
participant Policy
Client->>API: HEAD /users/123?method=delete
API->>Policy: Evaluate DeleteUserPolicy(user_id=123)
Policy-->>API: Allow/Deny
alt Access Allowed
API-->>Client: 204 No Content
Client->>API: DELETE /users/123
API-->>Client: 200 OK
else Access Denied
API-->>Client: 403 Forbidden
Note over Client: UI disables delete button
end
Usage¶
Send a HEAD request with a method query parameter specifying which HTTP method to check:
Response Codes¶
| Code | Meaning |
|---|---|
| 204 | Access allowed |
| 403 | Access denied (check X-Alan-Failed-Access-Policy-Id header) |
| 404 | Method not implemented on this resource |
| 501 | Policy requires body parameters (cannot pre-check) |
Limitations¶
- Only path parameters are available for policy evaluation
- Body parameters are not available (HEAD requests have no body)
- Policies requiring body parameters will return 501
This means policies based on resource IDs work fine (e.g., CanAccessUser(user_id)), but policies requiring request body data cannot be pre-checked.
Observability¶
Policies automatically emit metrics and logs:
Metrics:
- iam.access_policy.evaluation_duration_ms with tags policy_id and result
Logging:
- Debug: Policy evaluation succeeded
- Info: Policy evaluation failed (includes access_policy_id)
Best Practices¶
- Keep policies focused - One policy = one rule
- Use descriptive policy_ids - They appear in logs and metrics
- Compose with
or_()and multiple policies - Don't put complex logic in one policy - Test policies independently - They're just functions returning bool
- Fail secure - Return
Falsewhen in doubt - Document with docstrings - They become the policy description