Skip to content

Api reference

components.encryption.public.business_logic

encrypt_file_for_doctor_group_and_app_user

encrypt_file_for_doctor_group_and_app_user(
    file_data, app_user
)

Encrypt a file for the doctor group and the app user.

Source code in components/encryption/public/business_logic.py
def encrypt_file_for_doctor_group_and_app_user(
    file_data: bytes, app_user: FeatureUser
) -> str:
    """
    Encrypt raw file bytes with the doctor group and the app user as recipients.

    The app user's public key is resolved first; when none is available an
    error is logged and ``BaseErrorCode.missing_resource()`` is raised.
    Otherwise the value produced by ``encrypt_data_to_jwe`` is returned.
    """
    app_user_key = get_public_key_of_app_user(app_user=app_user)

    if app_user_key:
        # Recipient order matters to keep the output stable: group first,
        # then the individual user.
        doctor_group_recipient = EncryptionRecipient(
            principal_type="group",
            public_key=get_public_key_of_doctor_group(),
        )
        app_user_recipient = EncryptionRecipient(
            principal_type="user",
            public_key=app_user_key,
        )
        return encrypt_data_to_jwe(
            recipients=[doctor_group_recipient, app_user_recipient],
            plaintext_as_bytes=file_data,
        )

    # No usable key for the user: refuse to encrypt for the group alone.
    current_logger.error(
        "User encryption has no public key. Cannot encrypt file for the user."
    )
    raise BaseErrorCode.missing_resource()

encrypt_message_for_doctor_group

encrypt_message_for_doctor_group(message)

Encrypt a plaintext message only for the doctor group.

Source code in components/encryption/public/business_logic.py
def encrypt_message_for_doctor_group(message: str) -> str:
    """
    Encrypt a text message with the doctor group as its only recipient.

    The message is UTF-8 encoded before being handed to
    ``encrypt_data_to_jwe``, whose result is returned as-is.
    """
    doctor_group_recipient = EncryptionRecipient(
        principal_type="group",
        public_key=get_public_key_of_doctor_group(),
    )
    # Encode only after the recipient has been built, as the original did.
    payload = message.encode("utf-8")

    return encrypt_data_to_jwe(
        recipients=[doctor_group_recipient], plaintext_as_bytes=payload
    )

encrypt_message_for_doctor_group_and_app_user

encrypt_message_for_doctor_group_and_app_user(
    message, app_user
)

Encrypt a plaintext message for the doctor group and the app user.

Source code in components/encryption/public/business_logic.py
def encrypt_message_for_doctor_group_and_app_user(
    message: str, app_user: FeatureUser
) -> str:
    """
    Encrypt a text message with the doctor group and the app user as recipients.

    Logs an error and raises ``BaseErrorCode.missing_resource()`` when no
    public key can be resolved for the app user. Otherwise the message is
    UTF-8 encoded and the result of ``encrypt_data_to_jwe`` is returned.
    """
    app_user_key = get_public_key_of_app_user(app_user=app_user)

    if not app_user_key:
        current_logger.error(
            "User encryption has no public key. Cannot encrypt message for the user."
        )
        raise BaseErrorCode.missing_resource()

    # Group recipient is built (and listed) before the user recipient.
    doctor_group_recipient = EncryptionRecipient(
        principal_type="group",
        public_key=get_public_key_of_doctor_group(),
    )
    app_user_recipient = EncryptionRecipient(
        principal_type="user",
        public_key=app_user_key,
    )
    payload = message.encode("utf-8")

    return encrypt_data_to_jwe(
        recipients=[doctor_group_recipient, app_user_recipient],
        plaintext_as_bytes=payload,
    )

get_public_key_of_app_user

get_public_key_of_app_user(app_user)

Get the public key of an app user from their encryption user.

Source code in components/encryption/public/business_logic.py
def get_public_key_of_app_user(app_user: FeatureUser) -> str | None:
    """
    Look up the public key stored on the encryption user of an app user.

    Returns ``None`` (after logging a warning carrying the app and user ids)
    when no encryption user matches, or when the matching encryption user has
    no public key; otherwise returns the stored public key.
    """
    # Imported locally, as in the original implementation.
    from components.encryption.internal.models.encryption_user import (
        EncryptionUser,
    )

    matching_users = current_session.query(EncryptionUser).filter(  # noqa: ALN085
        EncryptionUser.feature_user == app_user
    )
    record: EncryptionUser | None = matching_users.one_or_none()

    if record is None:
        current_logger.warning(
            "No encryption user found for app user",
            app_id=app_user.app_id,
            app_user_id=app_user.app_user_id,
        )
        return None

    public_key = record.user_public_key
    if public_key is None:
        current_logger.warning(
            "Encryption user has no public key",
            app_id=app_user.app_id,
            app_user_id=app_user.app_user_id,
        )
        return None

    return public_key

components.encryption.public.commands

PASSWORD_HASH_KEY module-attribute

PASSWORD_HASH_KEY = b'a pseudo random key'

encryption_commands module-attribute

encryption_commands = AppGroup('encryption')

generate_initial_keys

generate_initial_keys(clean_first)

During the migration we will need to create User and Device Keypairs proactively from the backend for all existing members with conversations. For dev/testing, this sets up new keys for all users.

Source code in components/encryption/public/commands.py
# Registered as a sub-command of the `encryption` command group.
@encryption_commands.command()
# `--clean-first` is a boolean flag; its value is passed in as `clean_first`.
@click.option("--clean-first", type=bool, is_flag=True)
# NOTE(review): presumably prevents this command from running in production;
# the decorator is defined outside this file — confirm its exact behavior.
@do_not_run_in_prod
def generate_initial_keys(clean_first: bool) -> None:
    """
    During the migration we will need to create User and Device Keypairs proactively from the backend for all existing members with conversations.
    For dev/testing, this sets up new keys for all users.
    """
    # Thin CLI wrapper: the flag is forwarded unchanged and all the work is
    # done by `_generate_initial_keys`.
    _generate_initial_keys(clean_first=clean_first)

reset_encryption_keys

reset_encryption_keys(encryption_user_ids, execute)
Source code in components/encryption/public/commands.py
@encryption_commands.command()
# Variadic argument: zero or more UUIDs, each converted with `uuid.UUID`.
@click.argument(
    "encryption_user_ids",
    type=uuid.UUID,
    nargs=-1,
)
# Without `--execute` the command is a dry run (see the end of the loop body).
@click.option("--execute", is_flag=True, default=False)
def reset_encryption_keys(
    encryption_user_ids: tuple[uuid.UUID, ...],
    execute: bool,
) -> None:
    """
    Reset the encryption keys of the given encryption users.

    For each ENCRYPTION_USER_ID, the encryption user is loaded and its user
    and device key material, its recovery/encryption flags and its encryption
    devices are cleared. A user is skipped, with an error message, when it
    does not exist or when a clinic user sharing its feature user is the
    member or creator of at least one medical conversation.

    Nothing is persisted unless --execute is passed: without it the changes
    are flushed and then rolled back (dry run).
    """
    from sqlalchemy import or_

    from components.clinic.internal.models.clinic_user import (  # noqa: ALN039,ALN069
        ClinicUser,
    )
    from components.clinic.internal.models.medical_conversation import (  # noqa: ALN039,ALN069
        MedicalConversation,
    )
    from components.encryption.internal.models.encryption_user import (
        EncryptionUser,
    )

    # Each user is handled independently: a failure to find or reset one id
    # does not stop the processing of the following ids.
    for encryption_user_id in encryption_user_ids:
        # Load the user together with its devices, which are cleared below.
        encryption_user = current_session.scalar(
            select(EncryptionUser)
            .options(joinedload(EncryptionUser.encryption_devices))
            .filter(EncryptionUser.id == encryption_user_id)
        )
        if encryption_user is None:
            click.secho(
                f"Encryption user {encryption_user_id} does not exist",
                err=True,
                fg="red",
            )
            continue

        click.secho(
            f"Found encryption user: {encryption_user.app_id}::{encryption_user.app_user_id}",
            fg="green",
        )

        # Safety check: refuse to reset a user that takes part in medical
        # conversations, either as member or as creator.
        clinic_user = current_session.scalar(
            select(ClinicUser).filter(
                ClinicUser.feature_user == encryption_user.feature_user
            )
        )
        if clinic_user is not None:
            nb_conversations = (
                current_session.scalar(
                    select(func.count(MedicalConversation.id)).filter(
                        or_(
                            MedicalConversation.member_clinic_user_id == clinic_user.id,
                            MedicalConversation.creator_clinic_user_id
                            == clinic_user.id,
                        )
                    )
                )
                # Treat a missing scalar result as "no conversation".
                or 0
            )

            if nb_conversations > 0:
                click.secho(
                    f"Cannot reset encryption key of user {encryption_user_id}, we found {nb_conversations} conversations",
                    err=True,
                    fg="red",
                )
                continue

        click.secho(
            f"Resetting keys of user {encryption_user_id}",
            fg="green",
        )
        # Clear all key material and related flags on the loaded object. The
        # changes only reach the database on flush/commit below.
        encryption_user.user_secret_key_as_jwe = None
        encryption_user.user_public_key = None
        encryption_user.device_secret_key_as_jwe = None
        encryption_user.device_public_key = None
        encryption_user.user_secret_recovery_requested = False
        encryption_user.is_device_secret_encrypted = False
        # NOTE(review): whether emptying the collection deletes the device
        # rows depends on the relationship's cascade settings, which are not
        # visible here — confirm against the EncryptionUser model.
        encryption_user.encryption_devices = []

        if execute:
            # Commit per user, so earlier resets stay persisted even if a
            # later id fails.
            current_session.commit()
            click.secho(
                f"Keys of user {encryption_user_id} have been reset",
                fg="green",
            )
        else:
            # Dry run: send the pending changes to the database, then discard
            # them. NOTE(review): the flush presumably exists to surface
            # database errors before rolling back — confirm.
            current_session.flush()
            current_session.rollback()
            click.secho(
                f"NOT EXECUTING: Keys of user {encryption_user_id} would have been reset",
                fg="yellow",
            )