Skip to content

API Reference

apps.eu_tools.user_lifecycle.models.task.PauseTaskException dataclass

PauseTaskException(
    resume_label,
    cancel_label,
    pinged_groups=None,
    pinged_user=None,
    additional_context=None,
)

Bases: Exception

additional_context class-attribute instance-attribute

additional_context = None

cancel_label instance-attribute

cancel_label

pinged_groups class-attribute instance-attribute

pinged_groups = None

pinged_user class-attribute instance-attribute

pinged_user = None

resume_label instance-attribute

resume_label

apps.eu_tools.user_lifecycle.models.task.PauseTaskTrigger

__call__

__call__(
    resume_label,
    cancel_label,
    pinged_groups=None,
    pinged_user=None,
    additional_context=None,
)
Source code in apps/eu_tools/user_lifecycle/models/task.py
def __call__(  # type: ignore[no-untyped-def]
    self,
    resume_label: str,
    cancel_label: str,
    pinged_groups: list[str] | None = None,
    pinged_user: str | None = None,
    additional_context: dict[str, str] | None = None,
):
    raise PauseTaskException(
        resume_label,
        cancel_label,
        pinged_groups,
        pinged_user,
        additional_context,
    )

apps.eu_tools.user_lifecycle.models.task.UserLifecycleOffboardingTask

UserLifecycleOffboardingTask(user)

Bases: UserLifecycleTask, ABC

Source code in apps/eu_tools/user_lifecycle/models/task.py
def __init__(self, user: AlanUser) -> None:
    self.user = user
    if self.id is None:
        raise RuntimeError(
            f"Missing UserLifecycleTask unique identifier for {self.__class__.name}"
        )
    if self.label is None:
        raise RuntimeError(
            f"Missing UserLifecycleTask label for {self.__class__.name}"
        )
    if self.channel is None:
        raise RuntimeError(
            f"Missing UserLifecycleTask channel for {self.__class__.name}"
        )
    task_state = (
        current_session.scalars(
            select(UserLifecycleTaskStatus).filter_by(task_id=self.id)
        )
        .unique()
        .first()
    )
    self._quiet = task_state.quiet if task_state else False

channel property

channel

apps.eu_tools.user_lifecycle.models.task.UserLifecycleOnboardingTask

UserLifecycleOnboardingTask(user)

Bases: UserLifecycleTask

Source code in apps/eu_tools/user_lifecycle/models/task.py
def __init__(self, user: AlanUser) -> None:
    self.user = user
    if self.id is None:
        raise RuntimeError(
            f"Missing UserLifecycleTask unique identifier for {self.__class__.name}"
        )
    if self.label is None:
        raise RuntimeError(
            f"Missing UserLifecycleTask label for {self.__class__.name}"
        )
    if self.channel is None:
        raise RuntimeError(
            f"Missing UserLifecycleTask channel for {self.__class__.name}"
        )
    task_state = (
        current_session.scalars(
            select(UserLifecycleTaskStatus).filter_by(task_id=self.id)
        )
        .unique()
        .first()
    )
    self._quiet = task_state.quiet if task_state else False

onboarding_offset_in_days class-attribute instance-attribute

onboarding_offset_in_days = {alaner: 0, external: 0}

provider instance-attribute

provider

run

run(logger, _pause_callback, dry_run)
Source code in apps/eu_tools/user_lifecycle/models/task.py
def run(
    self,
    logger: BoundLogger,
    _pause_callback: PauseTaskTrigger,
    dry_run: bool,
) -> list[str] | None:
    return self.provider.provision_user(self.user, logger, dry_run)

should_run

should_run(_logger)
Source code in apps/eu_tools/user_lifecycle/models/task.py
def should_run(self, _logger: BoundLogger) -> bool:
    if not self.user.is_started_on(
        utctoday() + timedelta(days=self.onboarding_offset_in_days[self.user.type])
    ):
        return False

    if self.user.is_ended:
        return False

    if self.user.type == AlanAccountType.alaner and not is_allowed_to_onboard(
        cast("Alaner", self.user)
    ):
        return False

    return True

apps.eu_tools.user_lifecycle.models.task.UserLifecycleTask

UserLifecycleTask(user)

Bases: ABC

Source code in apps/eu_tools/user_lifecycle/models/task.py
def __init__(self, user: AlanUser) -> None:
    self.user = user
    if self.id is None:
        raise RuntimeError(
            f"Missing UserLifecycleTask unique identifier for {self.__class__.name}"
        )
    if self.label is None:
        raise RuntimeError(
            f"Missing UserLifecycleTask label for {self.__class__.name}"
        )
    if self.channel is None:
        raise RuntimeError(
            f"Missing UserLifecycleTask channel for {self.__class__.name}"
        )
    task_state = (
        current_session.scalars(
            select(UserLifecycleTaskStatus).filter_by(task_id=self.id)
        )
        .unique()
        .first()
    )
    self._quiet = task_state.quiet if task_state else False

channel instance-attribute

channel

id instance-attribute

id

label instance-attribute

label

max_fails_before_deactivation class-attribute instance-attribute

max_fails_before_deactivation = 5

process_cancel

process_cancel(
    reporter,
    message_ts,
    dry_run,
    task_log_id=None,
    task_log=None,
)
Source code in apps/eu_tools/user_lifecycle/models/task.py
def process_cancel(
    self,
    reporter: UserLifecycleSlackReporter,
    message_ts: str,
    dry_run: bool,
    task_log_id: UUID | None = None,
    task_log: UserLifecycleTaskLog | None = None,
) -> bool:
    logger = self._get_logger(dry_run)

    if current_auth_context.is_anonymous:
        return False

    canceled_by = current_auth_context.real_principal_as(Alaner)
    if self.should_cancel(canceled_by, logger, reporter):
        logger.info(
            f"Canceling UserLifecycleTask {self.id} for {self.user.full_name}",
            canceled_by=canceled_by.id,
        )
        reporter.stop(
            self.label,
            channel=self.channel,
            task_id=self.id,
            success=False,
            message_ts=message_ts,
            additional_context=[f"Canceled by <@{canceled_by.slack_id}>"],
        )
        if not dry_run:
            task_log = (
                task_log
                if task_log
                else current_session.get(UserLifecycleTaskLog, task_log_id)
            )
            task_log.canceled_at = datetime.utcnow()  # type: ignore[union-attr]
            task_log.canceled_by = canceled_by.id  # type: ignore[union-attr]
            task_log.permalink = get_permalink(  # type: ignore[union-attr]
                mandatory(get_channel_id(self.channel)),
                message_ts,
            )

            current_session.commit()

        logger.info(
            f"Canceled UserLifecycleTask {self.id} for {self.user.full_name}",
            canceled_by=canceled_by.id,
        )
        return True
    return False

process_resume

process_resume(
    reporter,
    message_ts,
    dry_run,
    task_log_id=None,
    task_log=None,
)
Source code in apps/eu_tools/user_lifecycle/models/task.py
def process_resume(
    self,
    reporter: UserLifecycleSlackReporter,
    message_ts: str,
    dry_run: bool,
    task_log_id: UUID | None = None,
    task_log: UserLifecycleTaskLog | None = None,
) -> bool:
    logger = self._get_logger(dry_run)

    if current_auth_context.is_anonymous:
        return False

    resumed_by = current_auth_context.real_principal_as(Alaner)

    try:
        if self.should_resume(resumed_by, logger, reporter):
            logger.info(
                f"Resuming UserLifecycleTask {self.id} for {self.user.full_name}",
                resumed_by=resumed_by.id,
            )

            reporter.resume(
                self.label,
                channel=self.channel,
                task_id=self.id,
                message_ts=message_ts,
                additional_context=[f"Resumed by <@{resumed_by.slack_id}>"],
            )

            if not dry_run:
                task_log = (
                    task_log
                    if task_log
                    else current_session.get(UserLifecycleTaskLog, task_log_id)
                )
                task_log.resumed_at = datetime.utcnow()  # type: ignore[union-attr]
                task_log.resumed_by = resumed_by.id  # type: ignore[union-attr]
                task_log.permalink = get_permalink(  # type: ignore[union-attr]
                    mandatory(get_channel_id(self.channel)), message_ts
                )

                current_session.commit()

            additional_context = self.resume(logger, dry_run) or []
            additional_context.append(f"Resumed by <@{resumed_by.slack_id}>")
            reporter.stop(
                self.label,
                channel=self.channel,
                task_id=self.id,
                success=True,
                message_ts=message_ts,
                additional_context=additional_context,
            )
            if not dry_run:
                task_log = (
                    task_log
                    if task_log
                    else current_session.get(UserLifecycleTaskLog, task_log_id)
                )
                task_log.completed_at = datetime.utcnow()  # type: ignore[union-attr]
                task_log.permalink = get_permalink(  # type: ignore[union-attr]
                    mandatory(get_channel_id(self.channel)), message_ts
                )

                current_session.commit()

            logger.info(
                f"Completed UserLifecycleTask {self.id} for {self.user.full_name}",
                resumed_by=resumed_by.id,
            )
            return True

    except Exception:
        current_session.rollback()  # In case the session is invalid, e.g. after an IntegrityError
        reporter.stop(
            self.label,
            channel=self.channel,
            task_id=self.id,
            success=False,
            message_ts=message_ts,
            additional_context=[f"Resumed by <@{resumed_by.slack_id}>"],
            error_occurred=True,
        )
        if not dry_run:
            task_log = (
                task_log
                if task_log
                else current_session.get(UserLifecycleTaskLog, task_log_id)
            )
            task_log.failed_at = datetime.utcnow()  # type: ignore[union-attr]
            task_log.permalink = get_permalink(  # type: ignore[union-attr]
                mandatory(get_channel_id(self.channel)), message_ts
            )

            current_session.commit()

        logger.exception(
            f"Failed UserLifecycleTask {self.id} for {self.user.full_name}"
        )
    return False

process_run

process_run(reporter, dry_run)
Source code in apps/eu_tools/user_lifecycle/models/task.py
@tracer.wrap()
def process_run(self, reporter: UserLifecycleSlackReporter, dry_run: bool) -> None:
    logger = self._get_logger(dry_run)

    try:
        message_ts = None
        task_log = None
        thread_ts = None

        if not dry_run:
            task_log = UserLifecycleTaskLog(
                task_id=self.id,
                alaner_id=(
                    cast("Alaner", self.user).id
                    if self.user.type == AlanAccountType.alaner
                    else None
                ),
                external_user_id=(
                    self.user.id
                    if self.user.type == AlanAccountType.external
                    else None
                ),
                service_account_id=(
                    self.user.id
                    if self.user.type == AlanAccountType.service_account
                    else None
                ),
                started_at=datetime.utcnow(),
                correlation_id=g.correlation_id,
            )
            current_session.add(task_log)
            current_session.commit()

        should_run_start = time.perf_counter()
        should_run_result = self.should_run(logger)
        should_run_duration_ms = (time.perf_counter() - should_run_start) * 1000
        metrics.distribution(
            "user_lifecycle.task.should_run.duration",
            should_run_duration_ms,
            tags=[f"task_id:{self.id}", f"result:{should_run_result}"],
        )

        if should_run_result:
            logger.info(
                f"Starting UserLifecycleTask {self.id} for {self.user.name if self.user.type == AlanAccountType.service_account else self.user.full_name}"
            )
            if not self.quiet:
                message_ts, thread_ts = reporter.start(
                    self.label,
                    channel=self.channel,
                    task_id=self.id,
                )

            if not dry_run and message_ts:
                task_log.permalink = get_permalink(  # type: ignore[union-attr]
                    mandatory(get_channel_id(self.channel)),
                    message_ts,
                )
                if thread_ts:
                    task_log.thread_permalink = get_permalink(  # type: ignore[union-attr]
                        mandatory(get_channel_id(self.channel)),
                        thread_ts,
                    )

                current_session.commit()

            additional_context = self.run(logger, PauseTaskTrigger(), dry_run) or []

            if not self.quiet:
                message_ts = reporter.stop(
                    self.label,
                    channel=self.channel,
                    task_id=self.id,
                    success=True,
                    message_ts=message_ts,
                    additional_context=additional_context,
                )
            if not dry_run:
                task_log.completed_at = datetime.utcnow()  # type: ignore[union-attr]
                current_session.commit()

            logger.info(
                f"Completed UserLifecycleTask {self.id} for {self.user.name if self.user.type == AlanAccountType.service_account else self.user.full_name}"
            )
        else:
            logger.info(
                f"Skipping UserLifecycleTask {self.id} for {self.user.name if self.user.type == AlanAccountType.service_account else self.user.full_name}"
            )
            if not dry_run:
                task_log.skipped_at = datetime.utcnow()  # type: ignore[union-attr]
                if message_ts:
                    task_log.permalink = get_permalink(  # type: ignore[union-attr]
                        mandatory(get_channel_id(self.channel)),
                        message_ts,
                    )
                current_session.commit()

    except PauseTaskException as pte:
        message_ts = reporter.pause(
            self.label,
            dry_run=dry_run,
            resume_label=pte.resume_label,
            cancel_label=pte.cancel_label,
            channel=self.channel,
            task_id=self.id,
            task_log_id=task_log.id if task_log else None,
            user_id=self.user.id,
            message_ts=message_ts,
            pinged_groups=pte.pinged_groups,
            pinged_user=pte.pinged_user,
            additional_context=pte.additional_context,
            is_manual_action=self.requires_manual_action,
        )
        if not dry_run:
            task_log.paused_at = datetime.utcnow()  # type: ignore[union-attr]
            if message_ts:
                task_log.permalink = get_permalink(  # type: ignore[union-attr]
                    mandatory(get_channel_id(self.channel)),
                    message_ts,
                )

            current_session.commit()

        logger.info(
            f"Paused UserLifecycleTask {self.id} for {self.user.name if self.user.type == AlanAccountType.service_account else self.user.full_name}"
        )

    except Exception:
        current_session.rollback()  # In case the session is invalid, e.g. after an IntegrityError
        logger.exception(
            f"Failed UserLifecycleTask {self.id} for {self.user.name if self.user.type == AlanAccountType.service_account else self.user.full_name}"
        )
        message_ts = reporter.stop(
            self.label,
            channel=self.channel,
            task_id=self.id,
            success=False,
            message_ts=message_ts,
            error_occurred=True,
        )
        previous_fails = self._get_recent_fails()  # type: ignore[no-untyped-call]
        if previous_fails >= self.max_fails_before_deactivation:
            task_status = (
                current_session.execute(
                    select(UserLifecycleTaskStatus).filter(
                        UserLifecycleTaskStatus.task_id == self.id
                    )
                )
                .scalars()
                .unique()
                .one()
            )
            if not task_status.deactivated_at:
                task_status.deactivated_at = datetime.utcnow()
                task_status.deactivation_reason = f"Automatically deactivated, the task failed more than {self.max_fails_before_deactivation} times"
                reporter.notify_of_task_deactivation(
                    channel="#alan_home_alerts" if not dry_run else "#eng_sandbox",
                    task_id=self.id,
                    task_label=self.label,
                    fails_threshold=self.max_fails_before_deactivation,
                )
        if not dry_run:
            task_log.failed_at = datetime.utcnow()  # type: ignore[union-attr]
            if message_ts:
                task_log.permalink = get_permalink(  # type: ignore[union-attr]
                    mandatory(get_channel_id(self.channel)),
                    message_ts,
                )

            current_session.commit()

        else:
            current_session.rollback()

provider class-attribute instance-attribute

provider = None

quiet property

quiet

rank class-attribute instance-attribute

rank = 999

ranked_subclasses classmethod

ranked_subclasses()
Source code in apps/eu_tools/user_lifecycle/models/task.py
@classmethod
def ranked_subclasses(cls):  # type: ignore[no-untyped-def]
    task_activity = {
        task.task_id: task.is_active
        for task in current_session.scalars(select(UserLifecycleTaskStatus))
        .unique()
        .all()  # noqa: ALN085
    }
    return filter(
        # The default value is True to ease the development process
        lambda subclass: getattr(subclass, "id", None) is not None
        and task_activity.get(subclass.id),
        sorted(subclasses_recursive(cls), key=attrgetter("rank")),
    )

requires_manual_action class-attribute instance-attribute

requires_manual_action = False

resume

resume(logger, dry_run)
Source code in apps/eu_tools/user_lifecycle/models/task.py
def resume(self, logger: BoundLogger, dry_run: bool) -> list[str] | None:  # noqa: ARG002
    return []

run abstractmethod

run(logger, pause_callback, dry_run)
Source code in apps/eu_tools/user_lifecycle/models/task.py
@abstractmethod
def run(
    self,
    logger: BoundLogger,
    pause_callback: PauseTaskTrigger,
    dry_run: bool,
) -> list[str] | None:
    pass

should_cancel

should_cancel(canceled_by, logger, reporter)
Source code in apps/eu_tools/user_lifecycle/models/task.py
def should_cancel(
    self,
    canceled_by: Alaner,
    logger: BoundLogger,  # noqa: ARG002
    reporter: UserLifecycleSlackReporter,
) -> bool:
    if has_permission(
        canceled_by, EmployeePermission.eu_tools_user_lifecycle_resume_task
    ):
        return True
    else:
        reporter.handle_permission_issues(self.channel, canceled_by.slack_id)
        return False

should_resume

should_resume(resumed_by, logger, reporter)
Source code in apps/eu_tools/user_lifecycle/models/task.py
def should_resume(
    self,
    resumed_by: Alaner,
    logger: BoundLogger,
    reporter: UserLifecycleSlackReporter,
) -> bool:
    if has_permission(
        resumed_by, EmployeePermission.eu_tools_user_lifecycle_resume_task
    ):
        return self.should_run(logger)
    else:
        reporter.handle_permission_issues(self.channel, resumed_by.slack_id)
        return False

should_run abstractmethod

should_run(logger)
Source code in apps/eu_tools/user_lifecycle/models/task.py
@abstractmethod
def should_run(self, logger: BoundLogger) -> bool:
    return False

user instance-attribute

user = user

apps.eu_tools.user_lifecycle.models.check.AccountOffboardingProcess

Bases: AlanBaseEnum

automatic class-attribute instance-attribute

automatic = 'automatic'

manual class-attribute instance-attribute

manual = 'manual'

apps.eu_tools.user_lifecycle.models.check.AnomalousAccount dataclass

AnomalousAccount(
    id,
    label,
    created_on=None,
    created_by=None,
    url=None,
    pretty_label=None,
    additional_context=None,
)

Bases: DataClassJsonMixin

additional_context class-attribute instance-attribute

additional_context = None

created_by class-attribute instance-attribute

created_by = None

created_on class-attribute instance-attribute

created_on = None

id instance-attribute

id

label instance-attribute

label

pretty_label class-attribute instance-attribute

pretty_label = None

url class-attribute instance-attribute

url = None

apps.eu_tools.user_lifecycle.models.check.MANUAL_OFFBOARDING_GRACE_DAYS module-attribute

MANUAL_OFFBOARDING_GRACE_DAYS = 5

apps.eu_tools.user_lifecycle.models.check.UserLifecycleAnomalyCheck

UserLifecycleAnomalyCheck(dry_run=False)

Bases: ABC

Source code in apps/eu_tools/user_lifecycle/models/check.py
def __init__(self, dry_run: bool | None = False) -> None:
    self.dry_run = dry_run or False
    if self.id is None:
        raise RuntimeError(
            f"Missing UserLifecycleCheck unique identifier for {self.__class__.name}"
        )
    if self.label is None:
        raise RuntimeError(
            f"Missing UserLifecycleCheck label for {self.__class__.name}"
        )
    if self.channel is None:
        raise RuntimeError(
            f"Missing UserLifecycleCheck channel for {self.__class__.name}"
        )

action_label class-attribute instance-attribute

action_label = None

active class-attribute instance-attribute

active = True

anomaly_description abstractmethod

anomaly_description(plural=False)
Source code in apps/eu_tools/user_lifecycle/models/check.py
@abstractmethod
def anomaly_description(self, plural: bool = False) -> str:
    pass

can_cancel_or_resume

can_cancel_or_resume(alaner)
Source code in apps/eu_tools/user_lifecycle/models/check.py
def can_cancel_or_resume(self, alaner: Alaner) -> bool:
    required_permissions = {EmployeePermission.eu_tools_user_lifecycle_resume_check}

    if self.resolution_permitted_for is not None:
        required_permissions.update(self.resolution_permitted_for)

    if not has_permission(alaner, required_permissions):
        self.handle_permission_issues(alaner.slack_id)
        return False
    return True

channel instance-attribute

channel

cta class-attribute instance-attribute

cta = None

dry_run class-attribute instance-attribute

dry_run = dry_run or False

emoji instance-attribute

emoji

find_and_track_anomalies

find_and_track_anomalies(logger)
Source code in apps/eu_tools/user_lifecycle/models/check.py
def find_and_track_anomalies(self, logger: BoundLogger) -> list[AnomalousAccount]:
    unknown_accounts = self.find_anomalies(logger)
    self._track_anomalies(unknown_accounts, logger)
    unknown_accounts = self.remove_justified_anomalies(unknown_accounts, logger)
    return unknown_accounts

find_anomalies abstractmethod

find_anomalies(logger)
Source code in apps/eu_tools/user_lifecycle/models/check.py
@abstractmethod
def find_anomalies(self, logger: BoundLogger) -> list[AnomalousAccount]:
    pass

fix_anomaly abstractmethod

fix_anomaly(account, logger)
Source code in apps/eu_tools/user_lifecycle/models/check.py
@abstractmethod
def fix_anomaly(
    self,
    account: AnomalousAccount,
    logger: BoundLogger,
) -> list[str] | None:
    pass

fix_description instance-attribute

fix_description

handle_permission_issues

handle_permission_issues(slack_id)
Source code in apps/eu_tools/user_lifecycle/models/check.py
def handle_permission_issues(self, slack_id: str | None) -> None:
    if slack_id:
        notify_user_of_permission_issues(
            slack_id=slack_id,
            channel_id=self.channel if not self.dry_run else "#eng_sandbox",
            thread_ts=self.thread_ts,
        )

id instance-attribute

id

label instance-attribute

label

main_message class-attribute instance-attribute

main_message = None

post_pre_context

post_pre_context()

Use this method if you need to share additional context as first message in the thread, before listing anomalous accounts

Source code in apps/eu_tools/user_lifecycle/models/check.py
def post_pre_context(self) -> None:
    """
    Use this method if you need to share additional context as first message in the thread, before listing anomalous accounts
    """
    return

process_cancel

process_cancel(
    account, message_ts, check_log_id, thread_ts=None
)
Source code in apps/eu_tools/user_lifecycle/models/check.py
def process_cancel(
    self,
    account: AnomalousAccount,
    message_ts: str,
    check_log_id: UUID | None,
    thread_ts: str | None = None,
) -> None:
    self.thread_ts = thread_ts
    if current_auth_context.is_anonymous:
        return
    canceled_by = current_auth_context.real_principal_as(Alaner)

    if self.can_cancel_or_resume(canceled_by):
        logger = self._get_logger()  # type: ignore[no-untyped-call]
        logger.info(
            f"Canceling check for account {account.id} by {canceled_by.slack_handle}",
            account_id=account.id,
        )
        text = f"<{account.url}|{account.label}>" if account.url else account.label
        context = self._get_context_for(account)

        self._post_or_update_message(
            text,
            ":notdone:",
            message_ts=message_ts,
            account_id=account.id,
            blocks=[
                ContextBlock(elements=context),
                ContextBlock(
                    elements=[
                        MarkdownTextObject(
                            text=f"Canceled by <@{canceled_by.slack_id}>"
                        )
                    ]
                ),
            ],
        )
        if not self.dry_run:
            check_log = current_session.get(
                UserLifecycleProvisioningCheckLog, check_log_id
            )
            check_log.canceled_at = datetime.utcnow()  # type: ignore[union-attr]
            check_log.canceled_by = canceled_by.id  # type: ignore[union-attr]
            current_session.commit()

process_resume

process_resume(
    account, message_ts, check_log_id, thread_ts=None
)
Source code in apps/eu_tools/user_lifecycle/models/check.py
def process_resume(
    self,
    account: AnomalousAccount,
    message_ts: str,
    check_log_id: UUID | None,
    thread_ts: str | None = None,
) -> None:
    self.thread_ts = thread_ts
    if current_auth_context.is_anonymous:
        return
    resumed_by = current_auth_context.real_principal_as(Alaner)

    if self.can_cancel_or_resume(resumed_by):
        logger = self._get_logger()  # type: ignore[no-untyped-call]
        logger.info(
            f"About to {self.fix_description} account {account.id} by {resumed_by.slack_handle}",
            account_id=account.id,
        )
        text = f"<{account.url}|{account.label}>" if account.url else account.label
        context = self._get_context_for(account)
        blocks = [
            ContextBlock(elements=context),
            ContextBlock(
                elements=[
                    MarkdownTextObject(text=f"Resumed by <@{resumed_by.slack_id}>")
                ]
            ),
        ]

        self._post_or_update_message(
            text,
            ":loading:",
            message_ts=message_ts,
            account_id=account.id,
            blocks=blocks,
        )
        if not self.dry_run:
            check_log = current_session.get(
                UserLifecycleProvisioningCheckLog, check_log_id
            )
            check_log.resumed_at = datetime.utcnow()  # type: ignore[union-attr]
            check_log.resumed_by = resumed_by.id  # type: ignore[union-attr]
            current_session.commit()

        try:
            additional_context = self.fix_anomaly(account, logger)
            self._mark_anomaly_as_resolved(account, logger)
            if additional_context:
                blocks.extend(
                    [
                        ContextBlock(
                            elements=[
                                MarkdownTextObject(
                                    text=context,
                                )
                            ]
                        )
                        for context in additional_context
                    ]
                )

            self._post_or_update_message(
                text,
                ":check:",
                message_ts=message_ts,
                account_id=account.id,
                blocks=blocks,
            )
            if not self.dry_run:
                check_log.completed_at = datetime.utcnow()  # type: ignore[union-attr]
                current_session.commit()
        except Exception:
            self._post_or_update_message(
                text,
                ":notdone:",
                message_ts=message_ts,
                account_id=account.id,
                blocks=blocks,
            )
            logger.exception(
                f"Failed to {self.fix_description} account {account.id} for provider {self.label}",
                account_id=account.id,
            )
            if not self.dry_run:
                check_log.failed_at = datetime.utcnow()  # type: ignore[union-attr]
                current_session.commit()

process_run

process_run()
Source code in apps/eu_tools/user_lifecycle/models/check.py
def process_run(self) -> None:
    logger = self._get_logger()  # type: ignore[no-untyped-call]

    if anomalies := self.find_and_track_anomalies(logger):
        self.post_pre_context()

    alan_home_base_url = (
        "https://home.alan.com" if not self.dry_run else "http://localhost:8885"
    )

    for account in anomalies:
        logger.info(
            f"Found {self.anomaly_description()} {account.pretty_label or account.label} for provider {self.label}",
            account_id=account.id,
        )
        text = f"<{account.url}|{account.label}>" if account.url else account.label
        context = self._get_context_for(account)

        check_log = None
        if not self.dry_run:
            check_log = UserLifecycleProvisioningCheckLog(
                check_id=self.id,
                account_id=str(account.id),
                started_at=datetime.utcnow(),
            )
            current_session.add(check_log)
            current_session.commit()

        payload = dumps(
            {
                "check_id": self.id,
                "account": account.to_dict(),
                "check_log_id": str(check_log.id) if check_log else None,
                "dry_run": self.dry_run,
            }
        )

        action_blocks = (
            [
                ActionsBlock(
                    elements=[
                        ButtonElement(
                            text="Ignore",
                            action_id="user_lifecycle_check_cancel",
                            value=payload,
                        ),
                        ButtonElement(
                            text=self.action_label,
                            action_id="user_lifecycle_check_resume",
                            value=payload,
                            style="danger",
                        ),
                        LinkButtonElement(
                            text="Justify",
                            url=f"{alan_home_base_url}/anomalies?account={account.id}&provider={self.id}&mail={account.pretty_label or account.label}",
                            action_id="user_lifecycle_check_justify",
                        ),
                    ]
                ),
            ]
            if self.action_label
            else []
        )
        message_ts = self._post_or_update_message(
            text,
            ":paused:",
            account_id=account.id,
            blocks=[
                ContextBlock(elements=context),
                *action_blocks,
            ],
        )
        if not self.dry_run:
            if check_log:
                check_log.permalink = get_permalink(
                    mandatory(get_channel_id(self.channel)), message_ts
                )
            current_session.commit()

provider class-attribute instance-attribute

provider = None

rank class-attribute instance-attribute

rank = 999

ranked_subclasses classmethod

ranked_subclasses()
Source code in apps/eu_tools/user_lifecycle/models/check.py
@classmethod
def ranked_subclasses(cls):  # type: ignore[no-untyped-def]
    return filter(
        lambda subclass: subclass.active and getattr(subclass, "id", None),
        sorted(subclasses_recursive(cls), key=attrgetter("rank")),
    )

remove_justified_anomalies

remove_justified_anomalies(unknown_accounts, logger)
Source code in apps/eu_tools/user_lifecycle/models/check.py
def remove_justified_anomalies(
    self,
    unknown_accounts: list[AnomalousAccount],
    logger: BoundLogger,  # noqa: ARG002
) -> list[AnomalousAccount]:
    provider_name = self.id

    justified_anomalies = self._get_anomalies_for_provider(provider_name, True)
    justified_anomalies_keys = {
        (anomaly.provider, anomaly.account_id) for anomaly in justified_anomalies
    }

    return [
        account
        for account in unknown_accounts
        if self._get_anomaly_key(account) not in justified_anomalies_keys
    ]

resolution_permitted_for class-attribute instance-attribute

resolution_permitted_for = None

thread_ts class-attribute instance-attribute

thread_ts = None

apps.eu_tools.user_lifecycle.models.check.UserLifecycleProvisioningCheck

UserLifecycleProvisioningCheck(dry_run=False)

Bases: UserLifecycleAnomalyCheck, ABC

Source code in apps/eu_tools/user_lifecycle/models/check.py
def __init__(self, dry_run: bool | None = False) -> None:
    self.dry_run = dry_run or False
    if self.id is None:
        raise RuntimeError(
            f"Missing UserLifecycleCheck unique identifier for {self.__class__.name}"
        )
    if self.label is None:
        raise RuntimeError(
            f"Missing UserLifecycleCheck label for {self.__class__.name}"
        )
    if self.channel is None:
        raise RuntimeError(
            f"Missing UserLifecycleCheck channel for {self.__class__.name}"
        )

anomaly_description

anomaly_description(plural=False)
Source code in apps/eu_tools/user_lifecycle/models/check.py
def anomaly_description(self, plural: bool = False) -> str:
    return "unknown accounts" if plural else "unknown account"

fix_description class-attribute instance-attribute

fix_description = 'deprovision'

apps.eu_tools.user_lifecycle.models.check.active_alan_users

active_alan_users(
    with_externals,
    with_future=False,
    offboarding=AccountOffboardingProcess.automatic,
)
Source code in apps/eu_tools/user_lifecycle/models/check.py
def active_alan_users(
    with_externals: bool,
    with_future: bool = False,
    offboarding: AccountOffboardingProcess = AccountOffboardingProcess.automatic,
) -> set[str]:
    grace_days = (
        MANUAL_OFFBOARDING_GRACE_DAYS
        if offboarding == AccountOffboardingProcess.manual
        else 0
    )

    def filter_for_today(user_class):  # type: ignore[no-untyped-def]
        if with_future:
            filter_condition = not_(user_class.is_ended_on(utctoday(), grace_days))
        else:
            filter_condition = user_class.is_active_on(utctoday(), grace_days)

        return filter_condition

    all_internals = {
        u.email
        for u in current_session.query(Alaner).filter(filter_for_today(Alaner))  # type: ignore[no-untyped-call] # noqa: ALN085
    }
    if with_externals:
        all_externals = {
            u.email
            for u in current_session.query(ExternalUser).filter(  # noqa: ALN085
                filter_for_today(ExternalUser)  # type: ignore[no-untyped-call]
            )
        }
    else:
        all_externals = set()

    return all_internals | all_externals

apps.eu_tools.user_lifecycle.models.provider.UserLifecycleProvider

Bases: ABC

deprovision_user abstractmethod classmethod

deprovision_user(user_id, logger, dry_run)
Source code in apps/eu_tools/user_lifecycle/models/provider.py
@classmethod
@abstractmethod
def deprovision_user(cls, user_id: str, logger: BoundLogger, dry_run: bool):  # type: ignore[no-untyped-def]
    pass

description instance-attribute

description

get_all_users abstractmethod classmethod

get_all_users()

When a provider supports the concept of "inactive" accounts, these accounts should be included in the results returned by this method. It's the consumer's responsibility to filter the accounts based on their active status.

Source code in apps/eu_tools/user_lifecycle/models/provider.py
@classmethod
@abstractmethod
def get_all_users(cls) -> dict:  # type: ignore[type-arg]
    """
    When a provider supports the concept of "inactive" accounts,
    these accounts should be included in the results returned by this method.
    It's the consumer's responsibility to filter the accounts based on their active status.
    """

get_target_roles classmethod

get_target_roles(user)
Source code in apps/eu_tools/user_lifecycle/models/provider.py
@classmethod
def get_target_roles(cls, user: AlanUser) -> set[str]:
    from apps.eu_tools.user_lifecycle.business_logic import has_role  # noqa: ALN009

    target_roles = set()
    for role, provider_roles in cls.role_mapping.items():
        if has_role(user, role):
            target_roles.update(provider_roles)
    return target_roles

id instance-attribute

id

provision_user abstractmethod classmethod

provision_user(user, logger, dry_run)
Source code in apps/eu_tools/user_lifecycle/models/provider.py
@classmethod
@abstractmethod
def provision_user(
    cls, user: AlanUser, logger: BoundLogger, dry_run: bool
) -> list[str] | None:
    pass

role_mapping instance-attribute

role_mapping

update_roles abstractmethod classmethod

update_roles(user, logger, dry_run)
Source code in apps/eu_tools/user_lifecycle/models/provider.py
@classmethod
@abstractmethod
def update_roles(
    cls,
    user: AlanUser,
    logger: BoundLogger,
    dry_run: bool,
) -> tuple[set[str], set[str]]:
    pass

apps.eu_tools.user_lifecycle.models.role_definition.BaseRoleDefinition

Bases: ABC

__generated_by__ class-attribute instance-attribute

__generated_by__ = None

__init_subclass__

__init_subclass__(**kwargs)
Source code in apps/eu_tools/user_lifecycle/models/role_definition.py
def __init_subclass__(cls, **kwargs: Any) -> None:
    super().__init_subclass__(**kwargs)

    # Skip validation for abstract classes
    if inspect.isabstract(cls):
        return

    # Validate grant_management_access_policy params if defined on this class
    if "grant_management_access_policy" in cls.__dict__:
        policy_or_policies = cls.grant_management_access_policy
        if policy_or_policies is not None:
            policies = (
                policy_or_policies
                if isinstance(policy_or_policies, list)
                else [policy_or_policies]
            )
            available_params = _ALWAYS_AVAILABLE_PARAMS | set(
                cls.__metadata__.keys()
            )
            for policy in policies:
                _validate_grant_management_policy(
                    policy, available_params, cls.__name__
                )

__metadata__ class-attribute instance-attribute

__metadata__ = dict()

can_be_requested class-attribute instance-attribute

can_be_requested = True

current_grantees classmethod

current_grantees()
Source code in apps/eu_tools/user_lifecycle/models/role_definition.py
@classmethod
def current_grantees(cls) -> set[AlanUser]:
    return cls.grantees_at(utctoday())

description instance-attribute

description

grant_management_access_policy class-attribute instance-attribute

grant_management_access_policy = None

grant_management_permitted_for class-attribute instance-attribute

grant_management_permitted_for = set()

grantees_at classmethod

grantees_at(date_or_datetime)
Source code in apps/eu_tools/user_lifecycle/models/role_definition.py
@classmethod
def grantees_at(
    cls, date_or_datetime: datetime.date | datetime.datetime
) -> set[AlanUser]:
    return set(
        role_grant.grantee
        for role_grant in current_session.scalars(
            select(RoleGrant).where(
                RoleGrant.role_id == cls.role_id,
                RoleGrant.is_active_at(date_or_datetime),
            )
        )
        if role_grant.grantee is not None
        and not role_grant.grantee.is_ended_on(
            date_or_datetime.date()
            if isinstance(date_or_datetime, datetime.datetime)
            else date_or_datetime
        )
    )

role_grant_rule_ids abstractmethod classmethod

role_grant_rule_ids()
Source code in apps/eu_tools/user_lifecycle/models/role_definition.py
@classmethod
@abstractmethod
def role_grant_rule_ids(cls) -> list[str]:
    pass

role_id instance-attribute

role_id

self_service_duration class-attribute instance-attribute

self_service_duration = 0

self_service_permitted_for class-attribute instance-attribute

self_service_permitted_for = set()

slack_handle_to_ping class-attribute instance-attribute

slack_handle_to_ping = None

apps.eu_tools.user_lifecycle.models.role_definition.RoleDefinition

Bases: BaseRoleDefinition, ABC

apps.eu_tools.user_lifecycle.models.role_definition.RoleDefinitionGenerator

Bases: ABC

role_definitions abstractmethod classmethod

role_definitions()
Source code in apps/eu_tools/user_lifecycle/models/role_definition.py
@classmethod
@abstractmethod
def role_definitions(cls) -> list[type[BaseRoleDefinition]]:
    pass

apps.eu_tools.user_lifecycle.models.role_definition.get_all_role_definitions

get_all_role_definitions()
Source code in apps/eu_tools/user_lifecycle/models/role_definition.py
@cached_for(minutes=10, local_ram_cache_only=True, no_serialization=True)
def get_all_role_definitions() -> dict[str, type[BaseRoleDefinition]]:
    role_definitions: dict[str, type[BaseRoleDefinition]] = {
        role_definition.role_id: role_definition  # type:ignore[type-abstract]
        for role_definition in RoleDefinition.__subclasses__()
    }

    role_definitions.update(
        {
            role_definition.role_id: role_definition
            for role_definition_generator in RoleDefinitionGenerator.__subclasses__()
            for role_definition in role_definition_generator.role_definitions()
        }
    )

    return role_definitions

apps.eu_tools.user_lifecycle.models.role_grant.RoleGrant

Bases: BaseModel

__repr__

__repr__()
Source code in apps/eu_tools/user_lifecycle/models/role_grant.py
def __repr__(self) -> str:
    return f"<{self.__class__.__name__} [{self.id}]: {self.role_id}>"

__table_args__ class-attribute instance-attribute

__table_args__ = (
    CheckConstraint(
        "(grantee_alaner_id IS NOT NULL)::int + (grantee_external_user_id IS NOT NULL)::int + (grantee_service_account_id IS NOT NULL)::int = 1",
        name="role_grant_grantee_is_not_null",
    ),
    CheckConstraint(
        "(role_request_id IS NULL) != (rule_id IS NULL)",
        name="role_grant_rule_or_request_is_not_null",
    ),
    UniqueConstraint(
        "grantee_alaner_id",
        "role_id",
        "role_request_id",
        name="single_request_for_alaners",
    ),
    UniqueConstraint(
        "grantee_external_user_id",
        "role_id",
        "role_request_id",
        name="single_request_for_external_users",
    ),
    UniqueConstraint(
        "grantee_service_account_id",
        "role_id",
        "role_request_id",
        name="single_request_for_service_accounts",
    ),
    ExcludeConstraint(
        (grantee_alaner_id, "="),
        (role_id, "="),
        (rule_id, "="),
        (text("tsrange(starts_at, ends_at, '[)')"), "&&"),
        name="no_overlapping_grants_for_alaners",
    ),
    ExcludeConstraint(
        (grantee_external_user_id, "="),
        (role_id, "="),
        (rule_id, "="),
        (text("tsrange(starts_at, ends_at, '[)')"), "&&"),
        name="no_overlapping_grants_for_external_users",
    ),
)

__tablename__ class-attribute instance-attribute

__tablename__ = 'role_grant'

ends_at class-attribute instance-attribute

ends_at = mapped_column(DateTime)

grantee property

grantee

grantee_alaner_id class-attribute instance-attribute

grantee_alaner_id = mapped_column(Integer, index=True)

grantee_external_user_id class-attribute instance-attribute

grantee_external_user_id = mapped_column(
    UUID(as_uuid=True), index=True
)

grantee_service_account_id class-attribute instance-attribute

grantee_service_account_id = mapped_column(
    UUID(as_uuid=True), index=True
)

is_active

is_active()
Source code in apps/eu_tools/user_lifecycle/models/role_grant.py
@is_active.expression  # type: ignore[no-redef]
def is_active(cls):
    return cls.is_active_at(func.now())

is_active_at

is_active_at(date)
Source code in apps/eu_tools/user_lifecycle/models/role_grant.py
@is_active_at.expression  # type: ignore[no-redef]
def is_active_at(cls, date):
    return (cls.starts_at <= date) & ~((cls.ends_at != None) & (cls.ends_at < date))

role_id class-attribute instance-attribute

role_id = mapped_column(Text, nullable=False, index=True)

role_request class-attribute instance-attribute

role_request = relationship(
    "RoleRequest",
    foreign_keys=role_request_id,
    primaryjoin="RoleRequest.id == RoleGrant.role_request_id",
    back_populates="role_grant",
    uselist=False,
    remote_side="RoleRequest.id",
)

role_request_id class-attribute instance-attribute

role_request_id = mapped_column(
    UUID(as_uuid=True), index=True
)

rule_id class-attribute instance-attribute

rule_id = mapped_column(Text)

starts_at class-attribute instance-attribute

starts_at = mapped_column(DateTime, nullable=False)

apps.eu_tools.user_lifecycle.models.role_request.RoleRequest

Bases: BaseModel

__repr__

__repr__()
Source code in apps/eu_tools/user_lifecycle/models/role_request.py
def __repr__(self) -> str:
    return f"<{self.__class__.__name__} [{self.id}]: {self.role_id}>"

__table_args__ class-attribute instance-attribute

__table_args__ = (
    CheckConstraint(
        "(grantee_alaner_id IS NOT NULL)::int + (grantee_external_user_id IS NOT NULL)::int + (grantee_service_account_id IS NOT NULL)::int = 1",
        name="role_request_grantee_is_not_null",
    ),
)

__tablename__ class-attribute instance-attribute

__tablename__ = 'role_request'

approved_at class-attribute instance-attribute

approved_at = mapped_column(DateTime)

approver class-attribute instance-attribute

approver = relationship(
    "Alaner",
    foreign_keys=approver_id,
    primaryjoin="Alaner.id == RoleRequest.approver_id",
    uselist=False,
    remote_side="Alaner.id",
)

approver_id class-attribute instance-attribute

approver_id = mapped_column(Integer)

ends_at class-attribute instance-attribute

ends_at = mapped_column(DateTime, nullable=False)

grantee property

grantee

grantee_alaner_id class-attribute instance-attribute

grantee_alaner_id = mapped_column(Integer)

grantee_external_user_id class-attribute instance-attribute

grantee_external_user_id = mapped_column(UUID(as_uuid=True))

grantee_service_account_id class-attribute instance-attribute

grantee_service_account_id = mapped_column(
    UUID(as_uuid=True)
)
permalink = mapped_column(Text)

request_reason class-attribute instance-attribute

request_reason = mapped_column(Text, nullable=False)

requested_at class-attribute instance-attribute

requested_at = mapped_column(DateTime, nullable=False)

requester class-attribute instance-attribute

requester = relationship(
    "Alaner",
    foreign_keys=requester_id,
    primaryjoin="Alaner.id == RoleRequest.requester_id",
    uselist=False,
    remote_side="Alaner.id",
)

requester_id class-attribute instance-attribute

requester_id = mapped_column(Integer, nullable=False)

rescinded_at class-attribute instance-attribute

rescinded_at = mapped_column(DateTime)

rescinder class-attribute instance-attribute

rescinder = relationship(
    "Alaner",
    foreign_keys=rescinder_id,
    primaryjoin="Alaner.id == RoleRequest.rescinder_id",
    uselist=False,
    remote_side="Alaner.id",
)

rescinder_id class-attribute instance-attribute

rescinder_id = mapped_column(Integer)

rescission_reason class-attribute instance-attribute

rescission_reason = mapped_column(Text)

role_grant class-attribute instance-attribute

role_grant = relationship(
    RoleGrant,
    primaryjoin="RoleRequest.id == foreign(RoleGrant.role_request_id)",
    back_populates="role_request",
    uselist=False,
    remote_side="RoleGrant.role_request_id",
)

role_id class-attribute instance-attribute

role_id = mapped_column(Text, nullable=False)

starts_at class-attribute instance-attribute

starts_at = mapped_column(DateTime, nullable=False)

apps.eu_tools.user_lifecycle.models.role_grant_rule_definition.BaseRoleGrantRuleDefinition

Bases: ABC

description instance-attribute

description

grantees_definition abstractmethod classmethod

grantees_definition()
Source code in apps/eu_tools/user_lifecycle/models/role_grant_rule_definition.py
@classmethod
@abstractmethod
def grantees_definition(cls) -> set[AlanUser]:
    pass

non_empty_check class-attribute instance-attribute

non_empty_check = True

rule_id instance-attribute

rule_id

apps.eu_tools.user_lifecycle.models.role_grant_rule_definition.RoleGrantRuleDefinition

apps.eu_tools.user_lifecycle.models.role_grant_rule_definition.RoleGrantRuleGenerator

Bases: ABC

role_grant_rules abstractmethod classmethod

role_grant_rules()
Source code in apps/eu_tools/user_lifecycle/models/role_grant_rule_definition.py
@classmethod
@abstractmethod
def role_grant_rules(cls) -> dict[str, type[BaseRoleGrantRuleDefinition]]:
    pass

rule_ids_for classmethod

rule_ids_for(*names)
Source code in apps/eu_tools/user_lifecycle/models/role_grant_rule_definition.py
@classmethod
def rule_ids_for(cls, *names: str) -> list[str]:
    return [cls._rule_id_for(name) for name in names]

apps.eu_tools.user_lifecycle.business_logic.add_role_grant_for_pending_request

add_role_grant_for_pending_request(pending_request)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def add_role_grant_for_pending_request(pending_request: RoleRequest) -> None:
    if pending_request.approved_at is None:
        return

    current_logger.info(
        f"Adding role grant for role {pending_request.role_id} and request {pending_request.id} to {pending_request.grantee.email}"
    )
    current_session.add(
        RoleGrant(
            grantee_alaner_id=pending_request.grantee_alaner_id,
            grantee_external_user_id=pending_request.grantee_external_user_id,
            grantee_service_account_id=pending_request.grantee_service_account_id,
            role_id=pending_request.role_id,
            role_request_id=pending_request.id,
            starts_at=pending_request.starts_at,
            ends_at=pending_request.ends_at,
        )
    )

apps.eu_tools.user_lifecycle.business_logic.build_remaining_roles_message

build_remaining_roles_message(grant, active_role_grants)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def build_remaining_roles_message(
    grant: RoleGrant, active_role_grants: list[RoleGrant]
) -> str:
    expired_provider = grant.role_id.split(":")[0]
    remaining_roles = [
        role_grant.role_id
        for role_grant in active_role_grants
        if (
            role_grant.role_id.split(":")[0] == expired_provider
            and role_grant.role_id != grant.role_id
        )
    ]
    if not remaining_roles:
        return ""
    remaining_roles_str = ", ".join(f"`{role}`" for role in remaining_roles)
    return (
        "\nExternal user still has"
        if grant.grantee_external_user_id
        else "\nYou still have"
    ) + f" the following `{expired_provider}` roles: {remaining_roles_str}"

apps.eu_tools.user_lifecycle.business_logic.can_manage_grant

can_manage_grant(role_request, user=None)

Check if a user can manage grants for this role request.

When user is provided, the auth context is temporarily overridden so that access policies evaluate against that user instead of the current principal. When user is None, policies use the current auth context as-is.

Source code in apps/eu_tools/user_lifecycle/business_logic.py
def can_manage_grant(role_request: RoleRequest, user: AlanUser | None = None) -> bool:
    """Check if a user can manage grants for this role request.

    When ``user`` is provided, the auth context is temporarily overridden so
    that access policies evaluate against that user instead of the current
    principal. When ``user`` is ``None``, policies use the current auth context
    as-is.
    """
    role_definition = get_all_role_definitions().get(role_request.role_id)
    if role_definition is None:
        raise ValueError(f"Role {role_request.role_id} does not exist")

    if role_definition.grant_management_access_policy is not None:
        access_policies = (
            role_definition.grant_management_access_policy
            if isinstance(role_definition.grant_management_access_policy, list)
            else [role_definition.grant_management_access_policy]
        )
        # Build kwargs with all available context
        kwargs = {
            "role_request_id": role_request.id,
            "role_definition": role_definition,
            **role_definition.__metadata__,
        }

        # When evaluating for an explicit user, temporarily override the auth
        # context so policies see that user as the principal (e.g. filtering
        # eligible approvers in a loop).
        if user is not None:
            previous_context = _get_current_auth_context()
            set_auth_context(user)
        try:
            return all(
                _evaluate_policy_with_args(
                    access_policy,
                    _get_policy_parameters(access_policy),
                    **kwargs,
                )
                for access_policy in access_policies
            )
        finally:
            if user is not None:
                _do_set_auth_context(previous_context)

    effective_user = user or _get_current_auth_context().real_principal
    return has_permission(
        effective_user, set(role_definition.grant_management_permitted_for)
    )

apps.eu_tools.user_lifecycle.business_logic.cancel_stale_requests

cancel_stale_requests(dry_run)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def cancel_stale_requests(dry_run: bool) -> None:
    for role_request in current_session.query(RoleRequest).filter(  # noqa: ALN085
        RoleRequest.approved_at.is_(None),
        RoleRequest.rescinded_at.is_(None),
        RoleRequest.ends_at <= datetime.combine(utctoday(), datetime.min.time()),
    ):
        current_logger.info(
            f"Rescinding expired request {role_request.id} for role {role_request.role_id}"
        )
        role_request.rescinded_at = datetime.utcnow()
        role_request.rescission_reason = "Request has expired"

    if dry_run:
        current_session.rollback()
    else:
        current_session.commit()

apps.eu_tools.user_lifecycle.business_logic.extension_exists

extension_exists(grant)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def extension_exists(grant: RoleGrant) -> bool:
    end_date = cast("datetime", grant.ends_at).date()

    if end_date.weekday() == 5:
        check_dates = [
            end_date,
            end_date + timedelta(days=1),
            end_date + timedelta(days=2),
        ]
    elif end_date.weekday() == 6:
        check_dates = [end_date, end_date + timedelta(days=1)]
    else:
        check_dates = [end_date]

    return cast(
        "bool",
        grant.grantee.role_grants.filter(
            RoleGrant.role_id == grant.role_id,
            or_(*[func.date(RoleGrant.starts_at) == date for date in check_dates]),
        ).count()
        > 0,
    )

apps.eu_tools.user_lifecycle.business_logic.get_approval_process_message

get_approval_process_message(role_request)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def get_approval_process_message(
    role_request: RoleRequest,
) -> tuple[str, list[Block]]:
    role_requested = (
        current_session.execute(
            select(AlanHomeRole).filter(AlanHomeRole.role_id == role_request.role_id)
        )
        .scalars()
        .unique()
        .one_or_none()
    )
    if role_requested is None:
        raise ValueError("Invalid role_id")

    slack_handles_to_ping = []
    additionnal_context_blocks = []

    user: Alaner | ExternalUser | ServiceAccount | None = None
    community = None

    # Get the user and determine the community
    if role_request.grantee_service_account_id is not None:
        # Service account - get user and community from ownership
        user = current_session.get(
            ServiceAccount, role_request.grantee_service_account_id
        )
        if user is not None:
            community = get_communities_owning_service_account(str(user.id))
    else:
        # Alaner or External User
        if role_request.grantee_alaner_id is not None:
            user = current_session.get(Alaner, role_request.grantee_alaner_id)
        elif role_request.grantee_external_user_id is not None:
            user = current_session.get(
                ExternalUser, role_request.grantee_external_user_id
            )

        if user is not None:
            # Only Alaner and ExternalUser have community attribute
            if hasattr(user, "community"):
                community = user.community
            else:
                community = None

    # Get security referents from the community
    if role_requested.slack_handle_to_ping is not None:
        slack_handles_to_ping = [role_requested.slack_handle_to_ping]

    elif user is not None and community is not None:
        security_referents = [
            referent
            for referent in community.security_referents
            if can_manage_grant(role_request, user=referent)
        ]
        slack_handles_to_ping += [
            f"<@{referent.slack_id}>"
            for referent in security_referents
            if not referent.is_ooo
        ]
    else:
        if role_request.grantee_service_account_id is not None:
            raise ValueError("Service account not found or not owned by a community")
        else:
            raise ValueError("User not found or not in a community")

    if len(slack_handles_to_ping) == 0:
        slack_handles_to_ping.append(mention_usergroup("security_oncall"))
    else:
        additionnal_context_blocks += [
            ContextBlock(
                elements=[
                    MarkdownTextObject(
                        text="💡 You can escalate to `security_oncall` if your security community referent is not available."
                    ),
                ]
            )
        ]

    if not is_production_mode():
        slack_handles_to_ping = ["`security_oncall`"]

    text = (
        f"{' '.join(slack_handles_to_ping)}: "
        f"<@{role_request.requester.slack_id if role_request.requester else role_request.grantee}> "
        f"is requesting role `{role_request.role_id}`"
    )

    if (
        role_request.grantee.type == AlanAccountType.external
        or role_request.grantee.type == AlanAccountType.service_account
    ):
        text += f" for {role_request.grantee.email}:"
    elif role_request.grantee != role_request.requester:
        text += f" for <@{role_request.grantee.slack_id}>:"
    else:
        text += ":"
    return text, [
        SectionBlock(
            text=text,
            accessory=LinkButtonElement(
                text=":house: See in Alan Home",
                url=f"https://home.alan.com/profile/{role_request.grantee.id}/roles",
                action_id="go_to_alan_home_role_request",
            ),
        ),
        SectionBlock(
            text="\n".join(
                f"> {reason_line}"
                for reason_line in role_request.request_reason.split("\n")
            )
        ),
        ContextBlock(
            elements=[
                MarkdownTextObject(
                    text=f"From {role_request.starts_at.date().isoformat()} to {role_request.ends_at.date().isoformat()} ({naturaldelta(role_request.ends_at - role_request.starts_at)})"
                ),
            ]
        ),
        *additionnal_context_blocks,
    ]

apps.eu_tools.user_lifecycle.business_logic.get_error_lack_of_permissions_message

get_error_lack_of_permissions_message(required_permissions)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def get_error_lack_of_permissions_message(
    required_permissions: set[EmployeePermission],
) -> str:
    missing_permission_message = (
        f"missing one of these permissions: {', '.join(required_permissions)}"
        if required_permissions
        else "no configured permissions"
    )
    return (
        f"You're not allowed to rescind this role request: {missing_permission_message}"
    )

apps.eu_tools.user_lifecycle.business_logic.get_role_documentation_url

get_role_documentation_url(role_id)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def get_role_documentation_url(role_id: str) -> Optional[str]:
    role_parts = role_id.split(":")
    role_primary_scope = role_parts[0]
    role_name = role_parts[-1]
    if role_primary_scope == "alan":
        return f"https://api.alan.com/admin_tools/permissions_and_roles#role-{role_name.replace('-', '_')}"
    return None

apps.eu_tools.user_lifecycle.business_logic.has_any_role

has_any_role(user, roles)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def has_any_role(
    user: AlanAccount, roles: Iterable[type[BaseRoleDefinition] | str]
) -> bool:
    roles_by_id = {role_grant.role_id for role_grant in user.active_role_grants}
    return any(
        (role.role_id if hasattr(role, "role_id") else role) in roles_by_id
        for role in roles
    )

apps.eu_tools.user_lifecycle.business_logic.has_any_role_at

has_any_role_at(user, roles, date)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def has_any_role_at(
    user: AlanAccount, roles: Iterable[type[BaseRoleDefinition] | str], date: datetime
) -> bool:
    return (  # type: ignore[no-any-return]
        user.role_grants.filter(  # type: ignore[attr-defined]
            RoleGrant.role_id.in_(
                [role.role_id if hasattr(role, "role_id") else role for role in roles]
            ),
            RoleGrant.is_active_at(date),
        ).count()
        > 0
    )

apps.eu_tools.user_lifecycle.business_logic.has_overlapping_end_date

has_overlapping_end_date(
    alaner_id,
    role_id,
    current_request_id,
    starts_at,
    new_ends_at,
)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def has_overlapping_end_date(
    alaner_id: int,
    role_id: str,
    current_request_id: UUID,
    starts_at: datetime,
    new_ends_at: datetime,
) -> bool:
    return (
        current_session.scalar(
            select(RoleRequest).where(
                RoleRequest.grantee_alaner_id == alaner_id,
                RoleRequest.role_id == role_id,
                RoleRequest.id != current_request_id,
                RoleRequest.rescinded_at.is_(None),
                RoleRequest.starts_at >= starts_at,
                RoleRequest.starts_at <= new_ends_at,
            )
        )
        is not None
    )

apps.eu_tools.user_lifecycle.business_logic.has_overlapping_role_grant

has_overlapping_role_grant(
    alaner_id, role_id, starts_at, ends_at
)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def has_overlapping_role_grant(
    alaner_id: int, role_id: str, starts_at: datetime | None, ends_at: datetime | None
) -> bool:
    return (
        current_session.scalar(
            select(RoleGrant).where(
                RoleGrant.grantee_alaner_id == alaner_id,
                RoleGrant.role_id == role_id,
                RoleGrant.starts_at != RoleGrant.ends_at,
                or_(
                    and_(
                        func.date(RoleGrant.starts_at) <= func.date(starts_at),
                        func.date(starts_at) < func.date(RoleGrant.ends_at),
                    ),
                    and_(
                        func.date(RoleGrant.starts_at) < func.date(ends_at),
                        func.date(RoleGrant.ends_at) >= func.date(ends_at),
                    ),
                    and_(
                        func.date(starts_at) <= func.date(RoleGrant.starts_at),
                        func.date(ends_at) >= func.date(RoleGrant.ends_at),
                    ),
                    # Permanent grant that started before the new grant ends
                    and_(
                        RoleGrant.ends_at.is_(None),
                        func.date(RoleGrant.starts_at) < func.date(ends_at),
                    ),
                ),
            )
        )
        is not None
    )

apps.eu_tools.user_lifecycle.business_logic.has_overlapping_role_request

has_overlapping_role_request(
    alaner_id, role_id, starts_at, ends_at
)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def has_overlapping_role_request(
    alaner_id: int, role_id: str, starts_at: datetime, ends_at: datetime
) -> bool:
    return (
        current_session.scalar(
            select(RoleRequest).where(
                RoleRequest.grantee_alaner_id == alaner_id,
                RoleRequest.role_id == role_id,
                RoleRequest.rescinded_at.is_(None),
                or_(
                    # New request starts during an existing request
                    and_(
                        RoleRequest.starts_at <= starts_at,
                        starts_at < RoleRequest.ends_at,
                    ),
                    # New request ends during an existing request
                    and_(
                        RoleRequest.starts_at < ends_at,
                        RoleRequest.ends_at >= ends_at,
                    ),
                    # New request completely contains an existing request
                    and_(
                        starts_at <= RoleRequest.starts_at,
                        ends_at >= RoleRequest.ends_at,
                    ),
                ),
            )
        )
        is not None
    )

apps.eu_tools.user_lifecycle.business_logic.has_role

has_role(user, role)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def has_role(user: AlanAccount, role: type[BaseRoleDefinition] | str) -> bool:
    return has_any_role(user, roles=[role])

apps.eu_tools.user_lifecycle.business_logic.has_role_at

has_role_at(user, role, date)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def has_role_at(
    user: AlanAccount, role: type[BaseRoleDefinition] | str, date: datetime
) -> bool:
    return has_any_role_at(user, [role], date)

apps.eu_tools.user_lifecycle.business_logic.notify_of_role_removal

notify_of_role_removal(dry_run)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def notify_of_role_removal(dry_run: bool) -> None:
    for grant in current_session.query(RoleGrant).filter(  # noqa: ALN085
        not_(RoleGrant.is_active),
        RoleGrant.ends_at.between(
            datetime.utcnow() - timedelta(hours=24), datetime.utcnow()
        ),
    ):
        active_role_grants = grant.grantee.role_grants.filter(RoleGrant.is_active)
        current_role_grant_ids = {
            role_grant.role_id for role_grant in active_role_grants
        }

        if grant.role_id not in current_role_grant_ids:
            active_role_grants = grant.grantee.role_grants.filter(
                RoleGrant.is_active
            ).all()
            remaining_roles_for_provider_message = build_remaining_roles_message(
                grant, active_role_grants
            )

            role_doc_link = get_role_documentation_url(grant.role_id)
            role_doc_message = (
                f" (<{role_doc_link}|documentation>)" if role_doc_link else ""
            )

            if (
                grant.grantee_external_user_id is not None
                and grant.grantee.referent is not None
            ):
                current_logger.info(
                    f"Notifying {grant.grantee.referent.full_name} that {grant.grantee.email} was removed the role `{grant.role_id}`."
                )
                if not dry_run and grant.grantee.referent.slack_id:
                    send_dm_to_slack_user(
                        grant.grantee.referent.slack_id,
                        f"External user {grant.grantee.email}'s role `{grant.role_id}`{role_doc_message} has expired."
                        f"{remaining_roles_for_provider_message}"
                        f"Please request an extension "
                        f"<https://home.alan.com/profile/{grant.grantee.id}/roles?request=True&role={grant.role_id}|here> if needed.",
                    )
            current_logger.info(
                f"Notifying {grant.grantee.full_name} they were removed the role `{grant.role_id}`"
            )
            if not dry_run and grant.grantee.slack_id:
                send_dm_to_slack_user(
                    grant.grantee.slack_id,
                    f"Your role `{grant.role_id}`{role_doc_message} has expired. "
                    f"{remaining_roles_for_provider_message}"
                    f"Please request an extension "
                    f"<https://home.alan.com/profile/{grant.grantee.id}/roles?request=True&role={grant.role_id}|here> if needed.",
                )

apps.eu_tools.user_lifecycle.business_logic.process_approved_role_request_and_notify

process_approved_role_request_and_notify(
    role_request_id,
    channel_id,
    message_ts,
    payload,
    with_retry_button=False,
)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def process_approved_role_request_and_notify(
    role_request_id: UUID,
    channel_id: str,
    message_ts: str,
    payload: dict,  # type: ignore[type-arg]
    with_retry_button: bool = False,
) -> None:
    role_request = mandatory(current_session.get(RoleRequest, role_request_id))
    text, blocks = get_approval_process_message(role_request)
    if role_request.approver is None:
        raise ValueError("Request is not approved")

    if process_user_lifecycle_task_related_to_role_request(
        role_request,
        payload.get("dry_run", False),
        run_async=False,
    ):
        current_app.slack_web_client.chat_update(  # type: ignore[attr-defined]
            channel=channel_id,
            ts=message_ts,
            text=text,
            blocks=[
                *blocks,
                ContextBlock(
                    elements=[
                        MarkdownTextObject(
                            text=f"Approved by <@{role_request.approver.slack_id}>, role synced :done:"
                        )
                    ]
                ),
            ],
        )

        if role_request.grantee.slack_id:
            duration_text = (
                f"until {role_request.ends_at.strftime('%Y-%m-%d')}"
                if role_request.ends_at
                else "permanently"
            )

            current_app.slack_web_client.chat_postMessage(  # type: ignore[attr-defined]
                channel=role_request.grantee.slack_id,
                text=f":white_check_mark: Your role request has been approved and synced!\n"
                f"• *Role:* `{role_request.role_id}`\n"
                f"• *Duration:* {duration_text}\n"
                f"• *Request details:* <{role_request.permalink}|View request>\n\n"
                "You should now have access. Let us know if you encounter any issues! :rocket:",
            )
        else:
            current_logger.warning(
                "Could not send Slack notification - user has no slack_id",
                user_email=role_request.grantee.email,
            )
    else:
        blocks.append(
            ContextBlock(
                elements=[
                    MarkdownTextObject(
                        text=f"Approved by <@{role_request.approver.slack_id}>, :warning: an error occured during the sync"
                    )
                ]
            )
        )
        if with_retry_button:
            blocks.append(
                ActionsBlock(
                    elements=[
                        ButtonElement(
                            text="Try again",
                            action_id="grant_management_retry_task_run",
                            value=dumps(payload),
                        ),
                    ]
                )
            )
        current_app.slack_web_client.chat_update(  # type: ignore[attr-defined]
            channel=channel_id,
            ts=message_ts,
            text=text,
            blocks=blocks,
        )

        current_app.slack_web_client.chat_postMessage(  # type: ignore[attr-defined]
            channel=channel_id,
            thread_ts=message_ts,
            text=f":warning: {'`@q_branch_oncall`' if payload.get('dry_run', False) else '<!subteam^{SlackGroup.qbranch_oncall}>'} The role request approval for <@{role_request.grantee.slack_id}> failed during sync. Please investigate.",
        )

apps.eu_tools.user_lifecycle.business_logic.process_manual_approval

process_manual_approval(role_request)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def process_manual_approval(role_request: RoleRequest):  # type: ignore[no-untyped-def]
    if role_request.approved_at:
        raise ValueError("Request is already approved")

    if role_request.rescinded_at:
        raise ValueError("Request is already rescinded")

    approver = current_auth_context.real_principal_as(Alaner)
    if role_request.grantee_alaner_id == approver.id:
        raise PermissionError("Cannot approve requests from self")

    role_definition = (
        current_session.execute(
            select(AlanHomeRole).filter(AlanHomeRole.role_id == role_request.role_id)
        )
        .scalars()
        .unique()
        .one_or_none()
    )
    if role_definition is None:
        raise ValueError("Invalid role_id")

    if not can_manage_grant(role_request):
        raise PermissionError(
            get_error_lack_of_permissions_message(
                role_definition.grant_management_permitted_for  # type: ignore[arg-type]
            )
        )

    role_request.approved_at = datetime.utcnow()
    role_request.approver_id = approver.id

apps.eu_tools.user_lifecycle.business_logic.process_request_update

process_request_update(role_request, payload)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def process_request_update(
    role_request: RoleRequest,
    payload: dict[str, Any],
) -> tuple[str | None, datetime | None]:
    old_reason = role_request.request_reason if "request_reason" in payload else None
    old_ends_at = role_request.ends_at if "ends_at" in payload else None

    if "request_reason" in payload:
        role_request.request_reason = payload["request_reason"]

    if "ends_at" in payload:
        new_ends_at = to_datetime(payload["ends_at"])
        if new_ends_at <= role_request.starts_at:
            raise ValueError("End date must be after start date")
        role_request.ends_at = new_ends_at

    return old_reason, old_ends_at

apps.eu_tools.user_lifecycle.business_logic.process_rescission

process_rescission(role_request, reason)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def process_rescission(role_request: RoleRequest, reason: str):  # type: ignore[no-untyped-def]
    if not reason:
        raise ValueError("You must provide a reason")
    if role_request.rescinded_at:
        raise ValueError("Request is already rescinded")

    role_definition = (
        current_session.execute(
            select(AlanHomeRole).filter(AlanHomeRole.role_id == role_request.role_id)
        )
        .scalars()
        .unique()
        .one_or_none()
    )
    if role_definition is None:
        raise ValueError("Invalid role_id")

    rescinder = current_auth_context.real_principal_as(Alaner)
    if (
        rescinder.id
        != role_request.grantee.id  # A grantee is always allowed to rescind their own requests
        or (
            role_request.grantee.type == AlanAccountType.external
            and role_request.grantee.referent
            and role_request.grantee.referent.id != rescinder.id
        )  # A referent is always allowed to rescind a request for their external user
    ):
        if not can_manage_grant(role_request):
            raise PermissionError(
                get_error_lack_of_permissions_message(
                    role_definition.grant_management_permitted_for  # type: ignore[arg-type]
                )
            )

    role_request.rescinded_at = datetime.utcnow()
    role_request.rescinder_id = rescinder.id
    role_request.rescission_reason = reason

apps.eu_tools.user_lifecycle.business_logic.process_self_service_approval

process_self_service_approval(
    role_request, role_definition
)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def process_self_service_approval(
    role_request: RoleRequest, role_definition: AlanHomeRole
) -> None:
    if (
        role_request.grantee.type == AlanAccountType.alaner
        and has_permission(
            role_request.grantee, set(role_definition.self_service_permitted_for)
        )
        and role_request.ends_at
        <= (
            role_request.starts_at
            + timedelta(hours=role_definition.self_service_duration)
        )
    ):
        role_request.approver_id = role_request.grantee_alaner_id
        role_request.approved_at = datetime.utcnow()
process_user_lifecycle_task_related_to_role_request(
    role_request, dry_run, run_async=True
)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def process_user_lifecycle_task_related_to_role_request(
    role_request: RoleRequest, dry_run: bool, run_async: bool = True
) -> bool:
    try:
        from apps.eu_tools.user_lifecycle.models.task import UserLifecycleTask

        tasks_to_process = []
        for task in UserLifecycleTask.ranked_subclasses():  # type: ignore[no-untyped-call]
            if task.provider:
                provider_role_ids = {
                    role.role_id for role in getattr(task.provider, "role_mapping", {})
                }
                if hasattr(task.provider, "get_role_mapping"):
                    provider_role_ids.update(
                        {
                            role.role_id if hasattr(role, "role_id") else role
                            for role in task.provider.get_role_mapping(
                                role_request.grantee
                            )
                        }
                    )

                if role_request.role_id in provider_role_ids:
                    tasks_to_process.append(task.id)

        if run_async:
            current_rq.get_queue(DEFAULT_PRIORITY).enqueue(
                process_user_lifecycle_tasks_for,
                user_id=role_request.grantee.id,
                user_type=role_request.grantee.type,
                task_pattern=tasks_to_process,
                dry_run=dry_run,
                retry=Retry(max=3),
            )
        else:
            return process_user_lifecycle_tasks_for(
                role_request.grantee.id,
                role_request.grantee.type,
                tasks_to_process,
                dry_run,
            )

        return True
    except Exception:
        current_logger.exception(
            f"Error processing role request {role_request.id} for role {role_request.role_id}"
        )
        return False

apps.eu_tools.user_lifecycle.business_logic.process_user_lifecycle_tasks_for

process_user_lifecycle_tasks_for(
    user_id,
    user_type,
    task_pattern,
    dry_run,
    batch_started_at=None,
)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
@enqueueable
def process_user_lifecycle_tasks_for(
    user_id: int | UUID,
    user_type: AlanAccountType,  # noqa: ALN072
    task_pattern: str | list[str] | None,
    dry_run: bool,
    batch_started_at: float | None = None,
) -> bool:
    try:
        alan_types_map = {
            AlanAccountType.external: ExternalUser,
            AlanAccountType.alaner: Alaner,
            AlanAccountType.service_account: ServiceAccount,
        }

        user_model = alan_types_map[user_type]

        # Create the appropriate joinedload based on the user type
        if user_type == AlanAccountType.alaner:
            load_option = joinedload(Alaner.active_role_grants)
        elif user_type == AlanAccountType.external:
            load_option = joinedload(ExternalUser.active_role_grants)
        elif user_type == AlanAccountType.service_account:
            load_option = joinedload(ServiceAccount.active_role_grants)
        else:
            raise ValueError(f"Unsupported user type: {user_type}")

        user = current_session.get(
            user_model,
            user_id,
            options=[load_option],
        )

        if (
            user_type == AlanAccountType.alaner
            and not user.is_ended  # type: ignore[union-attr]
            and is_administratively_suspended(cast("Alaner", user))
        ):
            current_logger.info(f"Skipping user {user.id} in administrative leave")  # type: ignore[union-attr]
            return False
        reporter = UserLifecycleSlackReporter(user, dry_run)  # type: ignore[arg-type]
        for task_class in UserLifecycleTask.ranked_subclasses():  # type: ignore[no-untyped-call]
            if task_pattern is not None and (
                (
                    isinstance(task_pattern, str)
                    and not fnmatch(task_class.id, task_pattern)
                )
                or (
                    isinstance(task_pattern, list) and task_class.id not in task_pattern
                )
            ):
                continue
            task_class(user).process_run(reporter, dry_run)

        if batch_started_at is not None:
            elapsed_ms = (time.time() - batch_started_at) * 1000
            metrics.distribution(
                "user_lifecycle.run_tasks.job_elapsed",
                elapsed_ms,
                tags=[f"user_type:{user_type.value}", "success:true"],
            )
        return True

    except Exception:
        current_logger.exception(
            f"Could not process lifecycle tasks for {user.name if user_type == AlanAccountType.service_account else user.full_name}"  # type: ignore[union-attr]
        )
        if batch_started_at is not None:
            elapsed_ms = (time.time() - batch_started_at) * 1000
            metrics.distribution(
                "user_lifecycle.run_tasks.job_elapsed",
                elapsed_ms,
                tags=[f"user_type:{user_type.value}", "success:false"],
            )
        return False

apps.eu_tools.user_lifecycle.business_logic.remind_of_future_role_removals

remind_of_future_role_removals(dry_run)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def remind_of_future_role_removals(dry_run: bool) -> None:
    #  On monday we also inform if a removal occurs during the next weekend
    start_day = (
        utctoday() + timedelta(days=5)
        if utctoday().weekday() == 0
        else utctoday() + timedelta(days=7)
    )
    end_day = utctoday() + timedelta(days=7, hours=23)
    for grant in current_session.query(RoleGrant).filter(  # noqa: ALN085
        RoleGrant.is_active,
        RoleGrant.ends_at.isnot(None),
        RoleGrant.ends_at.between(start_day, end_day),
    ):
        if extension_exists(grant):
            current_logger.info(
                f"Extension already exists for role `{grant.role_id}` for {grant.grantee.email}. Skipping reminder."
            )
            continue

        is_covered_by_baseline_or_other_grant = (
            grant.grantee.role_grants.filter(
                RoleGrant.role_id == grant.role_id,
                or_(
                    and_(RoleGrant.rule_id.is_not(None), RoleGrant.ends_at.is_(None)),
                    RoleGrant.starts_at > grant.ends_at,
                ),
            ).count()
            > 0
        )
        if not is_covered_by_baseline_or_other_grant:
            active_role_grants = grant.grantee.role_grants.filter(
                RoleGrant.is_active
            ).all()
            remaining_roles_for_provider_message = build_remaining_roles_message(
                grant, active_role_grants
            )

            role_doc_link = get_role_documentation_url(grant.role_id)
            role_doc_message = (
                f" (<{role_doc_link}|documentation>)" if role_doc_link else ""
            )

            if (
                grant.grantee_external_user_id is not None
                and grant.grantee.referent is not None
            ):
                current_logger.info(
                    f"Reminding {grant.grantee.referent.full_name} of upcoming role removals `{grant.role_id}` on "
                    f"{grant.ends_at.date().isoformat()} for {grant.grantee.email}."  # type: ignore[union-attr]
                )
                if not dry_run and grant.grantee.referent.slack_id:
                    send_dm_to_slack_user(
                        grant.grantee.referent.slack_id,
                        f"External user {grant.grantee.email}'s role `{grant.role_id}`{role_doc_message} will expire on {grant.ends_at.date().isoformat()}."  # type: ignore[union-attr]
                        f"{remaining_roles_for_provider_message}"
                        f"\nPlease request an extension "
                        f"<https://home.alan.com/profile/{grant.grantee.id}/roles?request=True&role={grant.role_id}&start-at={end_day.strftime('%Y-%m-%d')}|here> if needed.",
                    )
            else:
                current_logger.info(
                    f"Reminding {grant.grantee.full_name} of upcoming role removals `{grant.role_id}` on "
                    f"{grant.ends_at.date().isoformat()}."  # type: ignore[union-attr]
                )
                if not dry_run and grant.grantee.slack_id:
                    send_dm_to_slack_user(
                        grant.grantee.slack_id,
                        f"Your role `{grant.role_id}`{role_doc_message} will expire on {grant.ends_at.date().isoformat()}. "  # type: ignore[union-attr]
                        f"{remaining_roles_for_provider_message}"
                        f"\nPlease request an extension "
                        f"<https://home.alan.com/profile/{grant.grantee.slack_handle}/roles?request=True&role={grant.role_id}&start-at={end_day.strftime('%Y-%m-%d')}|here> if needed.",
                    )

apps.eu_tools.user_lifecycle.business_logic.revoke_ended_users_grants_and_requests

revoke_ended_users_grants_and_requests(dry_run)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def revoke_ended_users_grants_and_requests(dry_run: bool) -> None:
    for role_grant in (
        current_session.query(RoleGrant)  # noqa: ALN085
        .join(RoleGrant._grantee_alaner, isouter=True)  # noqa: ALN027
        .join(RoleGrant._grantee_external_user, isouter=True)  # noqa: ALN027
        .join(RoleGrant._grantee_service_account, isouter=True)  # noqa: ALN027
        .filter(
            RoleGrant.is_active,
            or_(Alaner.is_ended, ExternalUser.is_ended),
        )
    ):
        current_logger.info(
            f"Revoking role grant {role_grant.id} for role {role_grant.role_id}"
        )
        role_grant.ends_at = max(
            min(utctoday(), role_grant.grantee.last_day),
            role_grant.starts_at.date() + timedelta(days=1),
        )

    for role_request in (
        current_session.query(RoleRequest)  # noqa: ALN085
        .join(RoleRequest._grantee_alaner, isouter=True)  # noqa: ALN027
        .join(RoleRequest._grantee_external_user, isouter=True)  # noqa: ALN027
        .join(RoleRequest._grantee_service_account, isouter=True)  # noqa: ALN027
        .filter(
            RoleRequest.approved_at.is_(None),
            RoleRequest.rescinded_at.is_(None),
            or_(Alaner.is_ended, ExternalUser.is_ended),
        )
    ):
        current_logger.info(
            f"Rescinding expired request {role_request.id} for role {role_request.role_id}"
        )
        role_request.rescinded_at = datetime.utcnow()
        role_request.rescission_reason = "User has left the company"

    if dry_run:
        current_session.rollback()
    else:
        current_session.commit()

apps.eu_tools.user_lifecycle.business_logic.revoke_role_grant_for_rescinded_request

revoke_role_grant_for_rescinded_request(rescinded_request)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def revoke_role_grant_for_rescinded_request(rescinded_request: RoleRequest) -> None:
    current_logger.info(
        f"Revoking role grant for role {rescinded_request.role_id} and rescinded request {rescinded_request.id} to {rescinded_request.grantee.email}"
    )
    if rescinded_request.role_grant is not None:
        rescinded_request.role_grant.ends_at = max(
            datetime.utcnow(),
            rescinded_request.role_grant.starts_at,
        )

        # Remove the potential roles the grant was providing
        process_user_lifecycle_task_related_to_role_request(
            rescinded_request, is_development_mode(), run_async=True
        )

apps.eu_tools.user_lifecycle.business_logic.revoke_unknown_grants

revoke_unknown_grants(dry_run)

Revoke active grants from rules or roles that no longer exist

Source code in apps/eu_tools/user_lifecycle/business_logic.py
def revoke_unknown_grants(dry_run: bool) -> None:
    """Revoke active grants from rules or roles that no longer exist"""
    known_roles = [role.role_id for role in current_session.query(AlanHomeRole)]  # noqa: ALN085
    known_rules = [rule.rule_id for rule in current_session.query(RoleGrantRule)]  # noqa: ALN085

    for grant in current_session.query(RoleGrant).filter(  # noqa: ALN085
        RoleGrant.role_id.notin_(known_roles),
        RoleGrant.is_active,
    ):
        grant.ends_at = datetime.utcnow()
    for grant in current_session.query(RoleGrant).filter(  # noqa: ALN085
        RoleGrant.rule_id.isnot(None),
        RoleGrant.rule_id.notin_(known_rules),
        RoleGrant.is_active,
    ):
        grant.ends_at = datetime.utcnow()

    if dry_run:
        current_session.rollback()
    else:
        current_session.commit()

apps.eu_tools.user_lifecycle.business_logic.start_approval_process

start_approval_process(role_request)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def start_approval_process(role_request: RoleRequest):  # type: ignore[no-untyped-def]
    if role_request.approved_at:
        raise ValueError("Request is already approved")

    if role_request.rescinded_at:
        raise ValueError("Request is already rescinded")

    text, blocks = get_approval_process_message(role_request)
    payload = dumps(
        {
            "role_request_id": str(role_request.id),
            "dry_run": not is_production_mode(),
        }
    )

    message = current_app.slack_web_client.chat_postMessage(  # type: ignore[attr-defined]
        channel="#role_requests" if is_production_mode() else "#eng_sandbox",
        text=text,
        blocks=[
            *blocks,
            ActionsBlock(
                elements=[
                    ButtonElement(
                        text=":x: Decline",
                        action_id="grants_management_decline_request",
                        value=payload,
                    ),
                    ButtonElement(
                        text=":check: Approve",
                        action_id="grants_management_approve_request",
                        value=payload,
                    ),
                ]
            ),
        ],
    )
    role_request.permalink = get_permalink(
        channel_id=message["channel"], message_ts=message["ts"]
    )

apps.eu_tools.user_lifecycle.business_logic.update_request_based_grants

update_request_based_grants(dry_run)

Update grants based on requests

Source code in apps/eu_tools/user_lifecycle/business_logic.py
def update_request_based_grants(dry_run: bool) -> None:
    """Update grants based on requests"""
    for role in current_session.query(AlanHomeRole):  # noqa: ALN085
        # a. Approved requests with no grant

        pending_requests = (
            current_session.query(RoleRequest)  # noqa: ALN085
            .join(RoleRequest.role_grant, isouter=True)
            .filter(
                RoleRequest.role_id == role.role_id,
                RoleRequest.approved_at.isnot(None),
                RoleRequest.rescinded_at.is_(None),
                # No grant
                RoleGrant.id.is_(None),
            )
        )
        for pending_request in pending_requests:
            add_role_grant_for_pending_request(pending_request)

        # b. Rescinded requests with an active grant
        rescinded_requests = (
            current_session.query(RoleRequest)  # noqa: ALN085
            .join(RoleRequest.role_grant)
            .filter(
                RoleRequest.role_id == role.role_id,
                RoleRequest.rescinded_at.isnot(None),
                or_(
                    RoleGrant.ends_at.is_(None),
                    RoleGrant.ends_at >= datetime.utcnow(),
                ),
            )
        )
        for rescinded_request in rescinded_requests:
            revoke_role_grant_for_rescinded_request(rescinded_request)

        if dry_run:
            current_session.rollback()
        else:
            current_session.commit()

apps.eu_tools.user_lifecycle.business_logic.update_role

update_role(role_definition, dry_run, slack_monitor=None)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def update_role(
    role_definition: type[BaseRoleDefinition],
    dry_run: bool,
    slack_monitor: SlackMonitor | None = None,
) -> None:
    current_logger.info(f"Updating role {role_definition.role_id}")
    try:
        role = (
            current_session.execute(
                select(AlanHomeRole).filter(
                    AlanHomeRole.role_id == role_definition.role_id
                )
            )
            .scalars()
            .unique()
            .one_or_none()
        )

        if role is None:
            role = AlanHomeRole(role_id=role_definition.role_id)
            current_session.add(role)
        role.description = role_definition.description
        role.can_be_requested = role_definition.can_be_requested
        role.self_service_permitted_for = list(
            role_definition.self_service_permitted_for
        )
        role.self_service_duration = role_definition.self_service_duration
        role.grant_management_permitted_for = list(
            role_definition.grant_management_permitted_for
        )
        role.slack_handle_to_ping = role_definition.slack_handle_to_ping

        role.role_metadata = role_definition.__metadata__
        role.generated_by = (
            str(role_definition.__generated_by__)
            if role_definition.__generated_by__
            else None
        )
        _, line_no = getsourcelines(role_definition)
        role.github_url = f"https://github.com/alan-eu/alan-apps/tree/main/backend/{relpath(getfile(role_definition))}#L{line_no}"

        current_session.query(RoleGrantRulePolicy).filter(  # noqa: ALN085
            RoleGrantRulePolicy.alan_home_role_id == role.id
        ).delete()
        role_grant_rules = (
            current_session.execute(
                select(RoleGrantRule).filter(
                    RoleGrantRule.rule_id.in_(role_definition.role_grant_rule_ids())
                )
            )
            .scalars()
            .unique()
            .all()
        )

        if missing_role_grant_rules := (
            set(role_definition.role_grant_rule_ids())
            - {role_grant_rule.rule_id for role_grant_rule in role_grant_rules}
        ):
            current_logger.warning(
                f"Missing role grant rules for {role_definition.role_id}: {missing_role_grant_rules}"
            )
            if slack_monitor:
                slack_monitor.add_context(
                    "",
                    blocks=[
                        SectionBlock(
                            text=f":warning: Missing role grant rules for `{role_definition.role_id}`: `{missing_role_grant_rules}`"
                        ),
                        ContextBlock(
                            elements=[
                                MarkdownTextObject(
                                    text=f":github: <{role.github_url}|Source code>"
                                )
                            ]
                        ),
                    ],
                )

        role.role_grant_rules = role_grant_rules  # type: ignore[assignment]

        if dry_run:
            current_session.rollback()
        else:
            current_session.commit()

    except Exception as e:
        current_logger.exception(f"Could not update role {role_definition.role_id}")
        if slack_monitor:
            slack_monitor.add_context(
                f"Could not sync alan home role `{role_definition.role_id}`: ```{str(e)}```"
            )

apps.eu_tools.user_lifecycle.business_logic.update_role_definitions_for

update_role_definitions_for(generator, dry_run)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def update_role_definitions_for(
    generator: type[RoleDefinitionGenerator], dry_run: bool
) -> None:
    for role_defininition in generator.role_definitions():
        update_role(role_defininition, dry_run=dry_run, slack_monitor=None)

apps.eu_tools.user_lifecycle.business_logic.update_rule_based_grants

update_rule_based_grants(dry_run)

Update grants based on rules

Source code in apps/eu_tools/user_lifecycle/business_logic.py
def update_rule_based_grants(dry_run: bool) -> None:
    """Update grants based on rules"""

    for role in current_session.query(AlanHomeRole).options(  # noqa: ALN085
        joinedload(AlanHomeRole.role_grant_rules)
    ):
        # Revoke grants based on rules no longer assigned to this role
        role_grant_rule_ids = {rule.rule_id for rule in role.role_grant_rules}
        for outdated_rule_grant in current_session.query(RoleGrant).filter(  # noqa: ALN085
            RoleGrant.role_id == role.role_id,
            RoleGrant.is_active,
            RoleGrant.rule_id.isnot(None),
            RoleGrant.rule_id.notin_(role_grant_rule_ids),
        ):
            current_logger.info(
                f"Revoking role grant for role {role.role_id} and rule {outdated_rule_grant.rule_id} to {outdated_rule_grant.grantee.email if outdated_rule_grant.grantee else 'ex-user'}"
            )
            outdated_rule_grant.ends_at = datetime.utcnow()

        # Update grants based on rules currently assigned to this role
        for rule in role.role_grant_rules:
            target_grantees = rule.grantees
            current_grants = (
                current_session.execute(
                    select(RoleGrant).filter(
                        RoleGrant.role_id == role.role_id,
                        RoleGrant.rule_id == rule.rule_id,
                        RoleGrant.is_active,
                    )
                )
                .scalars()
                .unique()
                .all()
            )

            current_grantees = set(grant.grantee for grant in current_grants)
            for missing_grantee in target_grantees - current_grantees:
                current_logger.info(
                    f"Adding role grant for role {role.role_id} and rule {rule.rule_id} to {missing_grantee.email}"
                )
                current_session.add(
                    RoleGrant(
                        grantee_alaner_id=(
                            missing_grantee.id
                            if missing_grantee.type == AlanAccountType.alaner
                            else None
                        ),
                        grantee_external_user_id=(
                            missing_grantee.id
                            if missing_grantee.type == AlanAccountType.external
                            else None
                        ),
                        grantee_service_account_id=(
                            missing_grantee.id
                            if missing_grantee.type == AlanAccountType.service_account
                            else None
                        ),
                        role_id=role.role_id,
                        rule_id=rule.rule_id,
                        starts_at=datetime.utcnow(),
                    )
                )

            stale_grants = [
                grant
                for grant in current_grants
                if grant.grantee not in target_grantees
            ]
            for stale_grant in stale_grants:
                current_logger.info(
                    f"Revoking role grant for role {role.role_id} and rule {rule.rule_id} to {stale_grant.grantee.email if stale_grant.grantee else 'ex-user'}"
                )
                stale_grant.ends_at = datetime.utcnow()

    if dry_run:
        current_session.rollback()
    else:
        current_session.commit()

apps.eu_tools.user_lifecycle.business_logic.validate_no_overlapping_requests_nor_grants

validate_no_overlapping_requests_nor_grants(
    alaner_id,
    role_id,
    starts_at,
    ends_at,
    current_request_id=None,
)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def validate_no_overlapping_requests_nor_grants(
    alaner_id: int,
    role_id: str,
    starts_at: datetime,
    ends_at: datetime,
    current_request_id: UUID | None = None,
) -> None:
    if current_request_id is not None:
        # Case: Updating existing request (only check end date overlap)
        if has_overlapping_end_date(
            alaner_id=alaner_id,
            role_id=role_id,
            current_request_id=current_request_id,
            starts_at=starts_at,
            new_ends_at=ends_at,
        ):
            raise ValueError(
                "The new end date would overlap with another existing role request"
            )
    else:
        # Case: Creating new request
        if has_overlapping_role_request(
            alaner_id=alaner_id, role_id=role_id, starts_at=starts_at, ends_at=ends_at
        ):
            raise ValueError("An overlapping role request already exists for this role")

    if has_overlapping_role_grant(
        alaner_id=alaner_id, role_id=role_id, starts_at=starts_at, ends_at=ends_at
    ):
        raise ValueError("An overlapping role grant already exists for this role")

apps.eu_tools.user_lifecycle.business_logic.validate_request_status_can_be_amended

validate_request_status_can_be_amended(role_request)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def validate_request_status_can_be_amended(role_request: RoleRequest) -> None:
    if role_request.approved_at:
        raise ValueError("Request is already approved")
    if role_request.rescinded_at:
        raise ValueError("Request is already rescinded")

apps.eu_tools.user_lifecycle.business_logic.validate_role_and_permissions

validate_role_and_permissions(role_request)
Source code in apps/eu_tools/user_lifecycle/business_logic.py
def validate_role_and_permissions(
    role_request: RoleRequest,
) -> AlanHomeRole:
    role_requested: AlanHomeRole | None = (
        current_session.execute(
            select(AlanHomeRole).filter(AlanHomeRole.role_id == role_request.role_id)
        )
        .scalars()
        .unique()
        .one_or_none()
    )

    if role_requested is None:
        raise ValueError("Invalid role_id")

    is_external_referent = (
        role_request.grantee.type == AlanAccountType.external
        and role_request.grantee.referent
        and role_request.grantee.referent.id
        == current_auth_context.real_principal_as(Alaner).id
    )

    has_management_permission = can_manage_grant(role_request)

    if not (is_external_referent or has_management_permission):
        raise PermissionError(
            get_error_lack_of_permissions_message(
                set(role_requested.grant_management_permitted_for)
            )
        )

    return role_requested