Skip to content

Api reference

components.occupational_health.public.actions

account

move_occupational_health_account_models_from_source_to_target_account

move_occupational_health_account_models_from_source_to_target_account(
    source_account_id, target_account_id
)

Function to move all occupational health models linked to an account from a source account to a target account in case of an account being deleted/merged

Source code in components/occupational_health/public/actions/account.py
def move_occupational_health_account_models_from_source_to_target_account(
    source_account_id: UUID,
    target_account_id: UUID,
) -> None:
    """
    Function to move all occupational health models linked to an account from a source account to a target account
    in case of an account being deleted/merged
    """
    error = None
    messages: list[str] = []
    source_account_id_typed = AccountId(source_account_id)
    target_account_id_typed = AccountId(target_account_id)

    with transaction(propagation=Propagation.REQUIRES_NEW) as session:
        try:
            messages += _move_subscription_model(
                source_account_id_typed, target_account_id_typed
            )
            messages += _move_affiliations(
                source_account_id_typed, target_account_id_typed
            )
            messages += _move_affiliation_strategy_rules(
                source_account_id_typed, target_account_id_typed
            )
            messages += _move_affiliation_movements(
                source_account_id_typed, target_account_id_typed
            )
            messages += _move_dashboard_admins(
                source_account_id_typed, target_account_id_typed
            )
            messages += _move_mass_affiliation_logs(
                source_account_id_typed, target_account_id_typed
            )
            messages += _move_profile_situations(
                source_account_id_typed, target_account_id_typed
            )
            messages += _move_visits(source_account_id_typed, target_account_id_typed)
            messages += _move_workspace_actions(
                source_account_id_typed, target_account_id_typed
            )
            messages += _move_profile_employment_data(
                source_account_id_typed, target_account_id_typed
            )

            # TODO: occupational_health_billed_entity => check logic?
        except Exception as e:
            error = e
            current_logger.exception(
                "Failed to move occupational health models from source account to target account, rollbacking changes",
                source_account_id=source_account_id,
                target_account_id=target_account_id,
            )
            messages.append(
                " ⚠️ Exception occurred while moving occupational health models from source account to target account"
            )
            messages.append(str(e))
            session.rollback()

    if len(messages) > 0:
        _post_slack_notification(source_account_id, target_account_id, messages)

    if error is not None:
        raise error

billing

Public actions for occupational health billing invoice emails.

send_billing_invoices_email

send_billing_invoices_email(account_id)

Send billing invoice emails to all HR contacts of an account.

Source code in components/occupational_health/public/actions/billing.py
def send_billing_invoices_email(account_id: AccountId) -> None:
    """Send billing invoice emails to all HR contacts of an account."""
    from components.occupational_health.internal.business_logic.billing.send_invoice_email import (
        send_billing_invoices_email as send_billing_invoices_email_internal,
    )

    send_billing_invoices_email_internal(account_id=account_id)

billing_entity

Public actions for managing occupational health billed entities.

create_billed_entity

create_billed_entity(
    entity_name,
    siren,
    postal_street,
    postal_code,
    postal_city,
    postal_country_code,
    account_id,
    subscription_ref,
    emails_to_notify,
    siret=None,
    has_installment_plan_per_year=None,
    save=True,
)

Create a new billed entity for occupational health billing.

Parameters:

Name Type Description Default
entity_name str

Legal name of the entity

required
siren str

French company identifier (9 digits)

required
postal_street str

Street address

required
postal_code str

Postal code

required
postal_city str

City name

required
postal_country_code str

ISO 3166-1 alpha-2 country code (e.g. 'FR', 'BE')

required
account_id AccountId

Account ID this entity belongs to

required
subscription_ref UUID

Reference of the subscription

required
emails_to_notify list[str]

List of HR email addresses to notify

required
siret str | None

French establishment identifier (14 digits, optional)

None
has_installment_plan_per_year dict[str, bool] | None

Year-specific boolean for installment plan

None
save bool

Whether to commit the transaction

True

Returns:

Type Description
OccupationalHealthBilledEntity

The created billed entity

Source code in components/occupational_health/public/actions/billing_entity.py
def create_billed_entity(
    entity_name: str,
    siren: str,
    postal_street: str,
    postal_code: str,
    postal_city: str,
    postal_country_code: str,
    account_id: AccountId,
    subscription_ref: UUID,
    emails_to_notify: list[str],
    siret: str | None = None,
    has_installment_plan_per_year: dict[str, bool] | None = None,
    save: bool = True,
) -> OccupationalHealthBilledEntity:
    """
    Create a new billed entity for occupational health billing.

    Args:
        entity_name: Legal name of the entity
        siren: French company identifier (9 digits)
        postal_street: Street address
        postal_code: Postal code
        postal_city: City name
        postal_country_code: ISO 3166-1 alpha-2 country code (e.g. 'FR', 'BE')
        account_id: Account ID this entity belongs to
        subscription_ref: Reference of the subscription
        emails_to_notify: List of HR email addresses to notify
        siret: French establishment identifier (14 digits, optional)
        has_installment_plan_per_year: Year-specific boolean for installment plan
        save: Whether to commit the transaction

    Returns:
        The created billed entity
    """
    return create_billed_entity_internal(
        entity_name=entity_name,
        siren=siren,
        postal_street=postal_street,
        postal_code=postal_code,
        postal_city=postal_city,
        postal_country_code=postal_country_code,
        account_id=account_id,
        subscription_ref=subscription_ref,
        emails_to_notify=emails_to_notify,
        siret=siret,
        has_installment_plan_per_year=has_installment_plan_per_year,
        save=save,
    )

update_billed_entity

update_billed_entity(
    billed_entity_id,
    entity_name=NOT_SET,
    siren=NOT_SET,
    siret=NOT_SET,
    postal_street=NOT_SET,
    postal_code=NOT_SET,
    postal_city=NOT_SET,
    postal_country_code=NOT_SET,
    subscription_ref=NOT_SET,
    emails_to_notify=NOT_SET,
    has_installment_plan_per_year=NOT_SET,
    save=True,
)

Update an existing billed entity.

Parameters:

Name Type Description Default
billed_entity_id UUID

ID of the billed entity to update

required
entity_name NotSet[str]

Legal name of the entity

NOT_SET
siren NotSet[str]

French company identifier (9 digits)

NOT_SET
siret NotSet[str | None]

French establishment identifier (14 digits, optional)

NOT_SET
postal_street NotSet[str]

Street address

NOT_SET
postal_code NotSet[str]

Postal code

NOT_SET
postal_city NotSet[str]

City name

NOT_SET
postal_country_code NotSet[str]

ISO 3166-1 alpha-2 country code (e.g. 'FR', 'BE')

NOT_SET
subscription_ref NotSet[UUID]

Reference of the subscription

NOT_SET
emails_to_notify NotSet[list[str]]

List of HR email addresses to notify

NOT_SET
has_installment_plan_per_year NotSet[dict[str, bool] | None]

Year-specific boolean for installment plan

NOT_SET
save bool

Whether to commit the transaction

True

Returns:

Type Description
OccupationalHealthBilledEntity

The updated billed entity

Source code in components/occupational_health/public/actions/billing_entity.py
def update_billed_entity(
    billed_entity_id: UUID,
    entity_name: NotSet[str] = NOT_SET,
    siren: NotSet[str] = NOT_SET,
    siret: NotSet[str | None] = NOT_SET,
    postal_street: NotSet[str] = NOT_SET,
    postal_code: NotSet[str] = NOT_SET,
    postal_city: NotSet[str] = NOT_SET,
    postal_country_code: NotSet[str] = NOT_SET,
    subscription_ref: NotSet[UUID] = NOT_SET,
    emails_to_notify: NotSet[list[str]] = NOT_SET,
    has_installment_plan_per_year: NotSet[dict[str, bool] | None] = NOT_SET,
    save: bool = True,
) -> OccupationalHealthBilledEntity:
    """
    Update an existing billed entity.

    Args:
        billed_entity_id: ID of the billed entity to update
        entity_name: Legal name of the entity
        siren: French company identifier (9 digits)
        siret: French establishment identifier (14 digits, optional)
        postal_street: Street address
        postal_code: Postal code
        postal_city: City name
        postal_country_code: ISO 3166-1 alpha-2 country code (e.g. 'FR', 'BE')
        subscription_ref: Reference of the subscription
        emails_to_notify: List of HR email addresses to notify
        has_installment_plan_per_year: Year-specific boolean for installment plan
        save: Whether to commit the transaction

    Returns:
        The updated billed entity
    """
    return update_billed_entity_internal(
        billed_entity_id=billed_entity_id,
        entity_name=entity_name,
        siren=siren,
        siret=siret,
        postal_street=postal_street,
        postal_code=postal_code,
        postal_city=postal_city,
        postal_country_code=postal_country_code,
        subscription_ref=subscription_ref,
        emails_to_notify=emails_to_notify,
        has_installment_plan_per_year=has_installment_plan_per_year,
        save=save,
    )

doctolib_matching

Public facade for Doctolib matching actions.

apply_cancellations

apply_cancellations(items)

Entry point — delegates to DoctolibMatchingApplier.

Source code in components/occupational_health/internal/business_logic/doctolib/actions.py
def apply_cancellations(items: list[CancelledItem]) -> ApplyResultSummary:
    """Entry point — delegates to DoctolibMatchingApplier."""
    return DoctolibMatchingApplier().apply_cancellations(items)

apply_rescheduling

apply_rescheduling(items)

Entry point — delegates to DoctolibMatchingApplier.

Source code in components/occupational_health/internal/business_logic/doctolib/actions.py
def apply_rescheduling(items: list[ReschedulingItem]) -> ApplyResultSummary:
    """Entry point — delegates to DoctolibMatchingApplier."""
    return DoctolibMatchingApplier().apply_rescheduling(items)

employee_export

AffiliationStatus

Bases: Enum

Affiliation status of a member.

AFFILIATED class-attribute instance-attribute
AFFILIATED = 'Affilié'
TERMINATED class-attribute instance-attribute
TERMINATED = 'Résilié'
UPCOMING class-attribute instance-attribute
UPCOMING = 'À venir'

HEADERS module-attribute

HEADERS = [
    "Prénom",
    "Nom",
    "SIREN/SIRET",
    "Entité",
    "Matricule",
    "Email d'invitation",
    "Numéro de sécurité sociale",
    "NTT",
    "Catégorie de risque",
    "Date de dernière visite périodique",
    "Date de prochaine visite périodique",
    "Date de début de contrat",
    "Statut de visite",
    "Statut d'affiliation",
]

VisitStatus

Bases: Enum

Visit status of a member.

LATE class-attribute instance-attribute
LATE = 'En retard'
ON_TIME class-attribute instance-attribute
ON_TIME = 'À jour'
UPCOMING class-attribute instance-attribute
UPCOMING = 'Planifiée'

generate_employees_export

generate_employees_export(account_id, file_format)

Generate an export of occupational health employees for the given account.

Source code in components/occupational_health/public/actions/employee_export.py
def generate_employees_export(
    account_id: uuid.UUID,
    file_format: ExportExtension,
) -> GeneratedExportData:
    """Generate an export of occupational health employees for the given account."""
    if file_format not in _SERIALIZER_BY_FORMAT:
        raise ValueError(f"Unsupported file format: {file_format}")

    members: list[AffiliatedMemberForAdminDashboard] = (
        get_current_or_upcoming_affiliated_members_for_admin_dashboard(
            AccountId(account_id),
            with_ssn_and_ntt=True,
            with_siren_or_siret=True,
        )
    )
    items_count = len(members)

    return GeneratedExportData(
        file_content=_SERIALIZER_BY_FORMAT[file_format](_generate_rows(members)),
        mimetype=_MIMETYPE_BY_FORMAT[file_format],
        items_count=items_count,
    )

members

update_dmst

update_dmst(
    occupational_health_profile_id, dmst, commit=True
)

Updates the DMST data for a given occupational health profile. We also create the medical secrecy worker record ID, which will be created by the caller on the next request using this ID

See update_curriculum_laboris_job: also creates or updates jobs for the profile

Source code in components/occupational_health/internal/business_logic/actions/members/update_dmst.py
def update_dmst(
    occupational_health_profile_id: UUID,
    dmst: Dmst,
    commit: bool = True,
) -> None:
    """
    Updates the DMST data for a given occupational health profile. We also create the
    medical secrecy worker record ID, which will be created by the caller on the next
    request using this ID

    See update_curriculum_laboris_job: also creates or updates jobs for the profile
    """

    occupational_health_profile = current_session.get_one(
        OccupationalHealthProfile, occupational_health_profile_id
    )

    if not dmst.medical_secrecy_worker_record_id:
        # Note: we're pre-creating the ID, the caller needs to create the medical secrecy data
        # Dirty hack: we modify the dmst param so that the caller can get the new ID
        occupational_health_profile.medical_secrecy_worker_record_id = uuid.uuid4()
        dmst.medical_secrecy_worker_record_id = (
            occupational_health_profile.medical_secrecy_worker_record_id
        )

    existing_job_ids = {
        job.id
        for job in OccupationalHealthJobBroker.get_existing_jobs(
            occupational_health_profile_id,
        ).all()
    }
    updated_job_ids = {
        job.id for job in dmst.curriculum_laboris.jobs if job.id or set()
    }
    deleted_ids = set(existing_job_ids) - set(updated_job_ids)

    for deleted_id in deleted_ids:
        delete_curriculum_laboris_job(
            occupational_health_profile_id=occupational_health_profile.id,
            job_id=deleted_id,
            commit=False,
        )

    if dmst.curriculum_laboris.jobs:
        for job in dmst.curriculum_laboris.jobs:
            update_curriculum_laboris_job(
                occupational_health_profile_id=occupational_health_profile.id,
                job=job,
                commit=False,
            )

    if dmst.administrative_profile:
        update_administrative_profile(
            occupational_health_profile_id=occupational_health_profile.id,
            health_statuses=dmst.administrative_profile.health_statuses,
            risk_category=dmst.administrative_profile.risk_category,
            notes=dmst.administrative_profile.notes,
            medical_record_sharing_consent=dmst.administrative_profile.medical_record_sharing_consent,
            medical_record_access_consent=dmst.administrative_profile.medical_record_access_consent,
            health_data_sharing_consent=dmst.administrative_profile.health_data_sharing_consent,
            video_consultation_consent=dmst.administrative_profile.video_consultation_consent,
            orient_prevention_of_work_consent=dmst.administrative_profile.orient_prevention_of_work_consent,
            transfer_dmst_to_spsti_consent=dmst.administrative_profile.transfer_dmst_to_spsti_consent,
            commit=False,
        )

        update_personal_email(
            profile_id=ProfileId(occupational_health_profile.id),
            personal_email=dmst.administrative_profile.personal_email,
            commit=False,
        )

        update_phone_number(
            profile_id=ProfileId(occupational_health_profile.id),
            phone_number=dmst.administrative_profile.phone_number,
            commit=False,
        )

    if commit:
        current_session.commit()
    else:
        current_session.flush()

on_demand_visit

Public facade for on-demand visit actions.

OnDemandVisitInput dataclass

OnDemandVisitInput(
    first_name,
    last_name,
    email,
    phone_number,
    account_name,
    account_id,
    visit_format,
    visit_type,
    if_return_visit_specify_motive,
    if_on_demand_visit_specify_motive,
    if_pre_return_visit_specify_request_initiator,
    work_stoppage_start_date,
    work_stoppage_end_date,
    return_to_work_date,
    utm_source,
    utm_content,
    is_hr_informed,
    is_employee_submitting_the_request,
    person_submitting_request,
    additional_comments,
    submitted_at,
)

Input data for creating an on-demand visit request.

account_id instance-attribute
account_id
account_name instance-attribute
account_name
additional_comments instance-attribute
additional_comments
email instance-attribute
email
first_name instance-attribute
first_name
if_on_demand_visit_specify_motive instance-attribute
if_on_demand_visit_specify_motive
if_pre_return_visit_specify_request_initiator instance-attribute
if_pre_return_visit_specify_request_initiator
if_return_visit_specify_motive instance-attribute
if_return_visit_specify_motive
is_employee_submitting_the_request instance-attribute
is_employee_submitting_the_request
is_hr_informed instance-attribute
is_hr_informed
last_name instance-attribute
last_name
person_submitting_request instance-attribute
person_submitting_request
phone_number instance-attribute
phone_number
return_to_work_date instance-attribute
return_to_work_date
submitted_at instance-attribute
submitted_at
utm_content instance-attribute
utm_content
utm_source instance-attribute
utm_source
visit_format instance-attribute
visit_format
visit_type instance-attribute
visit_type
work_stoppage_end_date instance-attribute
work_stoppage_end_date
work_stoppage_start_date instance-attribute
work_stoppage_start_date

create_on_demand_visit

create_on_demand_visit(visit_input)

Create an on-demand visit: Append row to GSheet.

Parameters:

Name Type Description Default
visit_input OnDemandVisitInput

The visit request data from the automation/typeform.

required
Source code in components/occupational_health/internal/business_logic/actions/on_demand_visit.py
def create_on_demand_visit(visit_input: OnDemandVisitInput) -> None:
    """Create an on-demand visit: Append row to GSheet.

    Args:
        visit_input: The visit request data from the automation/typeform.
    """
    from components.occupational_health.internal.helpers.gsheet_service import (
        OccupationalHealthGSheetService,
        SheetType,
    )
    from shared.services.google.spreadsheets import SpreadsheetsService

    # TODO @david.barthelemy Retrieve the user_id to create the visit in the database

    spreadsheets_service = SpreadsheetsService.get(
        credentials_config_key="GOOGLE_GSPREAD_SERVICE_ACCOUNT_SECRET_NAME"
    )
    gsheet_service = OccupationalHealthGSheetService(
        spreadsheets_service, SheetType.ON_DEMAND
    )

    gsheet_service.append_row(
        {
            "first_name": visit_input.first_name,
            "last_name": visit_input.last_name,
            "phone_number": visit_input.phone_number or "",
            "email": visit_input.email,
            "account_name": visit_input.account_name,
            "acc_id": visit_input.account_id or "",
            "visit_format": visit_input.visit_format or "",
            "visit_type": visit_input.visit_type or "",
            "if_return_visit_specify_motive": visit_input.if_return_visit_specify_motive
            or "",
            "if_on_demand_visit_specify_motive": visit_input.if_on_demand_visit_specify_motive
            or "",
            "if_pre_return_visit_specify_request_initiator": visit_input.if_pre_return_visit_specify_request_initiator
            or "",
            "work_stoppage_start_date": _format_date(
                visit_input.work_stoppage_start_date
            ),
            "work_stoppage_end_date": _format_date(visit_input.work_stoppage_end_date),
            "return_to_work_date": _format_date(visit_input.return_to_work_date),
            "utm_source": visit_input.utm_source or "",
            "utm_content": visit_input.utm_content or "",
            "is_hr_informed": visit_input.is_hr_informed or "",
            "is_employee_submitting_the_request": visit_input.is_employee_submitting_the_request
            or "",
            "person_submitting_request": visit_input.person_submitting_request or "",
            "additional_comments": visit_input.additional_comments or "",
            "Submitted At": _format_datetime(visit_input.submitted_at),
        }
    )

subscriber_documents

soft_delete_subscriber_document

soft_delete_subscriber_document(
    account_id, document_id, commit=False
)
Source code in components/occupational_health/internal/business_logic/actions/subscriber_documents.py
@tracer.wrap()
def soft_delete_subscriber_document(
    account_id: uuid.UUID,
    document_id: uuid.UUID,
    commit: bool = False,
) -> None:
    existing_doc = current_session.scalars(
        select(OccupationalHealthSubscriberDocument).where(
            OccupationalHealthSubscriberDocument.account_id == account_id,
            OccupationalHealthSubscriberDocument.id == document_id,
            OccupationalHealthSubscriberDocument.deleted_at.is_(None),
        )
    ).one_or_none()

    if not existing_doc:
        raise BaseErrorCode.missing_resource(
            f"No document found with account_id {account_id} and document_id {document_id}"
        )

    existing_doc.deleted_at = utcnow()

    if commit:
        current_session.commit()
    else:
        current_session.flush()

update_subscriber_document

update_subscriber_document(document, commit=False)
Source code in components/occupational_health/internal/business_logic/actions/subscriber_documents.py
@tracer.wrap()
def update_subscriber_document(
    document: UpdateSubscriberDocument,
    commit: bool = False,
) -> None:
    existing_doc = current_session.scalars(
        select(OccupationalHealthSubscriberDocument).where(
            OccupationalHealthSubscriberDocument.account_id == document.account_id,
            OccupationalHealthSubscriberDocument.id == document.id,
            OccupationalHealthSubscriberDocument.deleted_at.is_(None),
        )
    ).one_or_none()

    if not existing_doc:
        raise BaseErrorCode.missing_resource(
            f"No document found with account_id {document.account_id} and document_id {document.id}"
        )

    existing_doc.name = document.name
    existing_doc.document_type = document.document_type
    existing_doc.siret = document.siret

    if commit:
        current_session.commit()
    else:
        current_session.flush()

upload_subscriber_documents

upload_subscriber_documents(
    documents, skip_duplication_check=False
)
Source code in components/occupational_health/internal/business_logic/actions/subscriber_documents.py
@tracer.wrap()
def upload_subscriber_documents(
    documents: list[CreateSubscriberDocument],
    skip_duplication_check: bool = False,
) -> None:
    if documents:
        # Check duplicates for the first document's account_id (assuming all have the same account)
        check_duplicate_subscriber_files(
            account_id=documents[0].account_id,
            files=[doc.file for doc in documents],
            skip_duplication_check=skip_duplication_check,
        )

    for document in documents:
        with converted_file_mimetype_and_hash(document.name, document.file) as (
            converted_file,
            mime_type,
            content_hash,
        ):
            new_document = OccupationalHealthSubscriberDocument(
                document_type=document.type,
                account_id=AccountId(document.account_id),
                siret=document.siret,
                mime_type=mime_type,
                content_hash=content_hash,
                name=document.name,
            )
            current_session.add(new_document)
            new_document.upload_file(converted_file)

    current_session.commit()

workspace_actions

create_workspace_action

create_workspace_action(
    thesaurus_mean_id,
    target_type,
    target_id,
    status,
    creator_full_name,
    prevention_type,
    eta_date=None,
    note=None,
    siret=None,
    commit=True,
)

Create a new WorkspaceAction for a company or a member

Returns:

Type Description
UUID

The ID of the newly created AMT

Source code in components/occupational_health/internal/business_logic/actions/workspace_actions.py
def create_workspace_action(
    thesaurus_mean_id: uuid.UUID,
    target_type: WorkspaceActionType,
    target_id: uuid.UUID,
    status: WorkspaceActionStatus,
    creator_full_name: str,
    prevention_type: WorkspaceActionPreventionType,
    eta_date: Optional[date] = None,
    note: Optional[str] = None,
    siret: Optional[str] = None,
    commit: bool = True,
) -> uuid.UUID:
    """
    Create a new WorkspaceAction for a company or a member

    Returns:
        The ID of the newly created AMT
    """

    # TODO david.barthelemy: the creator_full_name must be replace as soon as you have a reference to the global profile. For now it's ok to live like this
    thesaurus_mean = get_or_raise_missing_resource(ThesaurusMean, thesaurus_mean_id)
    if target_type == WorkspaceActionType.MEMBER:
        account_id = None
        profile = get_or_raise_missing_resource(OccupationalHealthProfile, target_id)
    elif target_type == WorkspaceActionType.COMPANY:
        account_id = target_id
        profile = None
    else:
        raise NotImplementedError(f"unsupported target type: {target_type}")

    workspace_action = OccupationalHealthWorkspaceAction(
        title=thesaurus_mean.label,
        thesaurus_mean=thesaurus_mean,
        type=target_type,
        profile=profile,
        account_id=account_id,
        # actions for at the entity level (SIRET) must have an account
        siret=siret if account_id else None,
        prevention_type=prevention_type,
        eta_date=eta_date,
        status=status,
        note=note,
        actor_full_name=creator_full_name,
    )

    current_session.add(workspace_action)

    if commit:
        current_session.commit()
    else:
        current_session.flush()

    current_logger.info(
        "Created new WorkspaceAction",
        workspace_action_d=str(workspace_action.id),
        target_type=str(target_type),
        target_id=str(target_id),
        status=str(status),
    )

    return workspace_action.id

delete_workspace_action

delete_workspace_action(workspace_action_id, commit=True)

Delete an existing WorkspaceAction

Source code in components/occupational_health/internal/business_logic/actions/workspace_actions.py
def delete_workspace_action(
    workspace_action_id: uuid.UUID,
    commit: bool = True,
) -> None:
    """
    Delete an existing WorkspaceAction
    """
    workspace_action = get_or_raise_missing_resource(
        OccupationalHealthWorkspaceAction, workspace_action_id
    )

    current_session.delete(workspace_action)

    if commit:
        current_session.commit()
    else:
        current_session.flush()

    current_logger.info(
        "Deleted WorkspaceAction",
        workspace_action_id=str(workspace_action_id),
    )

update_workspace_action

update_workspace_action(
    workspace_action_id,
    status,
    prevention_type=None,
    eta_date=None,
    note=None,
    commit=True,
)

Update an existing WorkspaceAction

Only updates the fields: status, prevention_type, eta_date, note The target and action (thesaurus_mean) cannot be changed

Source code in components/occupational_health/internal/business_logic/actions/workspace_actions.py
def update_workspace_action(
    workspace_action_id: uuid.UUID,
    status: WorkspaceActionStatus,
    prevention_type: Optional[WorkspaceActionPreventionType] = None,
    eta_date: Optional[date] = None,
    note: Optional[str] = None,
    commit: bool = True,
) -> None:
    """
    Update an existing WorkspaceAction

    Only updates the fields: status, prevention_type, eta_date, note
    The target and action (thesaurus_mean) cannot be changed
    """
    workspace_action = get_or_raise_missing_resource(
        OccupationalHealthWorkspaceAction, workspace_action_id
    )

    workspace_action.status = status
    workspace_action.prevention_type = prevention_type
    workspace_action.eta_date = eta_date
    workspace_action.note = note

    if commit:
        current_session.commit()
    else:
        current_session.flush()

    current_logger.info(
        "Updated WorkspaceAction",
        workspace_action_id=str(workspace_action.id),
        status=str(status),
    )

components.occupational_health.public.business_logic

actions

merge_users

merge_occupational_health_users
merge_occupational_health_users(
    source_user_id,
    target_user_id,
    commit=False,
    event_bus=None,
)

Merge all occupational health data from source user to target user.

This function handles merging of all occupational health entities that reference user_id: - Dashboard admins - Notifies oncall to update spreadsheets if visits exist for the user

Parameters:

Name Type Description Default
source_user_id UserId | str

User ID to merge from

required
target_user_id UserId | str

User ID to merge into

required
commit bool

Whether to commit the transaction (default: False)

False
event_bus EventBus[CommittableFunction] | None

If provided, notifications are deferred until event_bus.apply(commit=True)

None

Returns:

Type Description
list[str]

List of log messages describing actions taken

Source code in components/occupational_health/public/business_logic/actions/merge_users.py
def merge_occupational_health_users(
    source_user_id: UserId | str,
    target_user_id: UserId | str,
    commit: bool = False,
    event_bus: EventBus[CommittableFunction] | None = None,
) -> list[str]:
    """
    Merge all occupational health data from source user to target user.

    This function handles merging of all occupational health entities that reference user_id:
    - Dashboard admins
    - Notifies oncall to update spreadsheets if visits exist for the user

    Args:
        source_user_id: User ID to merge from
        target_user_id: User ID to merge into
        commit: Whether to commit the transaction (default: False)
        event_bus: If provided, notifications are deferred until event_bus.apply(commit=True)

    Returns:
        List of log messages describing actions taken
    """
    logs: list[str] = []

    # Create internal event bus if none provided
    internal_event_bus: BasicEventBus | None = None
    if event_bus is None:
        internal_event_bus = BasicEventBus()

    effective_event_bus = mandatory(event_bus or internal_event_bus)

    # Merge dashboard admins
    logs += merge_occupational_health_dashboard_admins(
        source_user_id=UserId(source_user_id),
        target_user_id=UserId(target_user_id),
        commit=False,
    )

    # Check visits and notify oncall to update spreadsheets if needed
    logs += notify_about_user_merge_in_occupational_health_visits(
        source_user_id=UserId(source_user_id),
        target_user_id=UserId(target_user_id),
        event_bus=effective_event_bus,
    )

    if commit:
        from shared.helpers.db import current_session

        current_session.commit()

    # Only apply if we created an internal event bus (caller is responsible otherwise)
    if internal_event_bus is not None:
        internal_event_bus.apply(commit=commit)

    return logs

components.occupational_health.public.dependencies

AccountSearchResult dataclass

AccountSearchResult(id, name)

Represent a basic Account object retrieved from the Search

id instance-attribute

id

name instance-attribute

name

OCCUPATIONAL_HEALTH_COMPONENT_NAME module-attribute

OCCUPATIONAL_HEALTH_COMPONENT_NAME = 'occupational_health'

OccupationalHealthDependency

Bases: ABC

Dependency interface for occupational_health component to access country-specific functionality.

This interface abstracts away direct dependencies on country-specific components, allowing the occupational_health component to work with different country implementations.

create_profile_with_user abstractmethod

create_profile_with_user(profile_data)

Create user profile with user data.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def create_profile_with_user(
    self, profile_data: ProfileData
) -> tuple[UserId, GlobalProfileId]:
    """Create user profile with user data."""
    raise NotImplementedError()

get_account_admins_profile_ids abstractmethod

get_account_admins_profile_ids(account_id)

Get account admin profile IDs.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_account_admins_profile_ids(
    self, account_id: AccountId
) -> set[GlobalProfileId]:
    """Get account admin profile IDs."""
    raise NotImplementedError()

get_account_id abstractmethod

get_account_id(company_id)

Get account ID for a company.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_account_id(self, company_id: CompanyId) -> AccountId:
    """Get account ID for a company."""
    raise NotImplementedError()

get_account_id_and_siret abstractmethod

get_account_id_and_siret(company_id)

Get account ID and SIRET for a company.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_account_id_and_siret(
    self, company_id: CompanyId
) -> tuple[AccountId, str | None]:
    """Get account ID and SIRET for a company."""
    raise NotImplementedError()

get_account_name abstractmethod

get_account_name(account_id)

Get account name.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_account_name(self, account_id: AccountId) -> str:
    """Get account name."""
    raise NotImplementedError()

get_company_admins_profile_ids abstractmethod

get_company_admins_profile_ids(company_id)

Get company admin profile IDs.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_company_admins_profile_ids(
    self, company_id: CompanyId
) -> set[GlobalProfileId]:
    """Get company admin profile IDs."""
    raise NotImplementedError()

get_company_display_name_and_account_id abstractmethod

get_company_display_name_and_account_id(company_id)

Get company display name and associated account ID.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_company_display_name_and_account_id(
    self, company_id: CompanyId
) -> tuple[str, AccountId]:
    """Get company display name and associated account ID."""
    raise NotImplementedError()

get_company_entities_by_account_id abstractmethod

get_company_entities_by_account_id(account_id)

Get all DSN companies in an account.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_company_entities_by_account_id(
    self, account_id: AccountId
) -> list["DSNCompanyData"]:
    """Get all DSN companies in an account."""
    raise NotImplementedError()

get_company_from_siret abstractmethod

get_company_from_siret(siret)

Get company data from SIRET via DSN.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_company_from_siret(self, siret: str) -> Optional["DSNCompanyData"]:
    """Get company data from SIRET via DSN."""
    raise NotImplementedError()

get_company_ids_in_account abstractmethod

get_company_ids_in_account(account_id)

Get all company IDs in an account.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_company_ids_in_account(self, account_id: AccountId) -> list[CompanyId]:
    """Get all company IDs in an account."""
    raise NotImplementedError()

get_company_names_by_ids abstractmethod

get_company_names_by_ids(company_ids)

Get company names by IDs.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_company_names_by_ids(
    self, company_ids: set[CompanyId]
) -> dict[CompanyId, str]:
    """Get company names by IDs."""
    raise NotImplementedError()

get_company_siren abstractmethod

get_company_siren(company_id)

Get company SIREN.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_company_siren(self, company_id: CompanyId) -> str:
    """Get company SIREN."""
    raise NotImplementedError()

get_company_siret abstractmethod

get_company_siret(company_id, force_nic=None)

Get company SIRET, optionally forcing a specific NIC.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_company_siret(
    self, company_id: CompanyId, force_nic: str | None = None
) -> str | None:
    """Get company SIRET, optionally forcing a specific NIC."""
    raise NotImplementedError()

get_global_profile_id abstractmethod

get_global_profile_id(user_id)

Get global profile ID for a user.

Raises:

Type Description
UserIdNotFound

If the user ID does not exist.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_global_profile_id(self, user_id: UserId) -> GlobalProfileId | None:
    """Get global profile ID for a user.

    Raises:
        UserIdNotFound: If the user ID does not exist.
    """
    raise NotImplementedError()

get_ssn_and_ntt_for_user abstractmethod

get_ssn_and_ntt_for_user(user_id)

Get SSN and NTT of a user.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_ssn_and_ntt_for_user(
    self, user_id: UserId
) -> tuple[str | None, str | None]:
    """Get SSN and NTT of a user."""
    raise NotImplementedError()

get_ssn_and_ntt_for_users abstractmethod

get_ssn_and_ntt_for_users(user_ids)

Get SSN and NTT for multiple users.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_ssn_and_ntt_for_users(
    self, user_ids: Iterable[UserId]
) -> dict[UserId, tuple[str | None, str | None]]:
    """Get SSN and NTT for multiple users."""
    raise NotImplementedError()

get_user_id_by_global_profile_id_mapping_from_global_profile_ids abstractmethod

get_user_id_by_global_profile_id_mapping_from_global_profile_ids(
    global_profile_ids,
)

Get global profile ID -> user ID mapping from global health profile IDs.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_user_id_by_global_profile_id_mapping_from_global_profile_ids(
    self, global_profile_ids: Iterable[GlobalProfileId]
) -> dict[GlobalProfileId, UserId]:
    """Get global profile ID -> user ID mapping from global health profile IDs."""
    raise NotImplementedError()

get_user_id_by_global_profile_id_mapping_from_user_ids abstractmethod

get_user_id_by_global_profile_id_mapping_from_user_ids(
    user_ids,
)

Get global profile ID -> user ID mapping from user IDs.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_user_id_by_global_profile_id_mapping_from_user_ids(
    self, user_ids: Iterable[UserId]
) -> dict[GlobalProfileId, UserId]:
    """Get global profile ID -> user ID mapping from user IDs."""
    raise NotImplementedError()

get_user_id_from_global_profile_id abstractmethod

get_user_id_from_global_profile_id(global_profile_id)

Get user ID from global profile ID.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_user_id_from_global_profile_id(
    self, global_profile_id: GlobalProfileId
) -> UserId:
    """Get user ID from global profile ID."""
    raise NotImplementedError()

get_user_slack_id abstractmethod

get_user_slack_id(user_id)

Get Slack handle for an Alan user.

Returns Slack handle (e.g., "john.doe") or None if not found. Used for pinging users in Slack notifications.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_user_slack_id(self, user_id: UserId) -> str | None:
    """
    Get Slack handle for an Alan user.

    Returns Slack handle (e.g., "john.doe") or None if not found.
    Used for pinging users in Slack notifications.
    """
    raise NotImplementedError()

get_work_stoppages_for_users abstractmethod

get_work_stoppages_for_users(
    user_ids, ever_active_during_period=None
)

Get work stoppages for users.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def get_work_stoppages_for_users(
    self,
    user_ids: Iterable[UserId],
    ever_active_during_period: tuple[date, date] | None = None,
) -> dict[UserId, list["WorkStoppage"]]:
    """Get work stoppages for users."""
    raise NotImplementedError()

prepare_mailer_params_for_billing_invoice_email abstractmethod

prepare_mailer_params_for_billing_invoice_email(
    email_address,
    template_name,
    template_args,
    attachments,
    invoice_ids,
)

Return a MailerParam instance to be used for sending billing invoice emails.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def prepare_mailer_params_for_billing_invoice_email(
    self,
    email_address: str,
    template_name: str,
    template_args: dict[str, Any],
    attachments: list[dict[str, str]] | None,
    invoice_ids: list[int],
) -> None | BaseMailerParams:
    """
    Return a MailerParam instance to be used for sending billing invoice emails.
    """
    raise NotImplementedError()

search_accounts_by_name abstractmethod

search_accounts_by_name(query, account_ids)

Search accounts by name.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def search_accounts_by_name(
    self, query: str, account_ids: Iterable[UUID]
) -> list[AccountSearchResult]:
    """Search accounts by name."""
    raise NotImplementedError()

set_ssn_ntt_on_user abstractmethod

set_ssn_ntt_on_user(user_id, ssn, ntt)

Set SSN and NTT on user.

Source code in components/occupational_health/public/dependencies.py
@abstractmethod
def set_ssn_ntt_on_user(
    self, user_id: UserId, ssn: str | None, ntt: str | None
) -> None:
    """Set SSN and NTT on user."""
    raise NotImplementedError()

ProfileData dataclass

ProfileData(
    email=None,
    first_name=None,
    last_name=None,
    birth_date=None,
    language=None,
    phone_number=None,
    gender=None,
)

The profile data used to create a user and profile.

birth_date class-attribute instance-attribute

birth_date = None

email class-attribute instance-attribute

email = None

first_name class-attribute instance-attribute

first_name = None

gender class-attribute instance-attribute

gender = None

language class-attribute instance-attribute

language = None

last_name class-attribute instance-attribute

last_name = None

phone_number class-attribute instance-attribute

phone_number = None

get_app_dependency

get_app_dependency()

Retrieves the occupational_health dependency

Source code in components/occupational_health/public/dependencies.py
def get_app_dependency() -> OccupationalHealthDependency:
    """Retrieves the occupational_health dependency"""
    from flask import current_app

    app = cast("CustomFlask", current_app)
    return cast(
        "OccupationalHealthDependency",
        app.get_component_dependency(OCCUPATIONAL_HEALTH_COMPONENT_NAME),
    )

set_app_dependency

set_app_dependency(dependency)

Sets the occupational_health dependency to the app

Source code in components/occupational_health/public/dependencies.py
def set_app_dependency(dependency: OccupationalHealthDependency) -> None:
    """Sets the occupational_health dependency to the app"""
    from flask import current_app

    cast("CustomFlask", current_app).add_component_dependency(
        OCCUPATIONAL_HEALTH_COMPONENT_NAME, dependency
    )

components.occupational_health.public.employment

employment_consumer

Note: Do not import local country code here, do it in the internal component after checking the country code.

occupational_health_employment_change_consumer

occupational_health_employment_change_consumer(
    employment_change, event_bus_orchestrator
)
Source code in components/occupational_health/public/employment/employment_consumer.py
def occupational_health_employment_change_consumer(  # noqa: D103
    employment_change: "EmploymentChange[FrExtendedValues]",
    event_bus_orchestrator: "EventBusOrchestrator",
) -> None:
    from components.employment.public.enums import CountryCode

    if employment_change.country_code != CountryCode.fr:
        return

    from components.occupational_health.internal.business_logic.actions.employment.on_employment_change import (
        on_employment_change,
    )

    on_employment_change(
        employment_change=employment_change,
        event_bus_orchestrator=event_bus_orchestrator,
    )

requires_siret_information

requires_siret_information

requires_siret_information(account_id)

Return whether the given account ID requires SIRET information to be provided when tracking employment movements.

Source code in components/occupational_health/public/employment/requires_siret_information.py
def requires_siret_information(account_id: UUID) -> bool:
    """
    Return whether the given account ID requires SIRET information to be provided when tracking employment movements.
    """
    from components.occupational_health.public.queries.has_active_occupational_health_contract import (
        has_active_or_upcoming_occupational_health_contract,
    )

    account_id = AccountId(account_id)

    if not has_active_or_upcoming_occupational_health_contract(account_id):
        # No need to collect the SIRET if there is no active contract
        return False

    rules = current_session.query(AffiliationStrategyRule).filter(  # noqa: ALN085
        AffiliationStrategyRule.account_id == account_id,
    )

    rules_count = rules.count()

    if rules_count == 0:
        # No rule = will be manual anyway (should not happen often, as it indicates a misconfiguration)
        return False

    if rules_count == 1:
        rule: AffiliationStrategyRule = rules.one()

        is_always_affiliate = (
            rule.action == RuleAction.AFFILIATE and rule.is_wildcard_siret
        )
        if is_always_affiliate:
            return False

        is_manual = (
            rule.action == RuleAction.REQUIRE_MANUAL_REVIEW and rule.is_wildcard_siret
        )
        if is_manual:
            return False

        # Misconfiguration: only one rule, SIRET-based. We'll probably need the SIRET down the line.
        return True

    # Multiple rules = SIRET is needed to make a decision
    return True

components.occupational_health.public.entities

billing

BillingStrategy

Bases: AlanBaseEnum

If the customer is getting an invoice per SIREN or per SIRET.

SIREN_BASED class-attribute instance-attribute
SIREN_BASED = 'siren_based'
SIRET_BASED class-attribute instance-attribute
SIRET_BASED = 'siret_based'

ContractInvoicesData dataclass

ContractInvoicesData(*, contract_ref, balance, invoices)

Bases: DataClassJsonMixin

Invoices grouped by contract ref, with a live contract-level balance.

balance instance-attribute
balance
contract_ref instance-attribute
contract_ref
invoices instance-attribute
invoices

CustomerAddress dataclass

CustomerAddress(street, postal_code, city, country_code)

Bases: DataClassJsonMixin

Postal address of a customer.

city instance-attribute
city
country_code instance-attribute
country_code
postal_code instance-attribute
postal_code
street instance-attribute
street

CustomerData dataclass

CustomerData(
    *, entity_name, siren, siret=None, postal_address
)

Bases: DataClassJsonMixin

Customer data needed for billing purposes.

billing_strategy property
billing_strategy

Return the billing strategy for this customer.

entity_name instance-attribute
entity_name
postal_address instance-attribute
postal_address
siren instance-attribute
siren
siret class-attribute instance-attribute
siret = None

InvoiceFileData dataclass

InvoiceFileData(*, file_name, file)

Data representing an invoice file.

file instance-attribute
file
file_name instance-attribute
file_name

InvoiceToGenerateData dataclass

InvoiceToGenerateData(
    *,
    billing_period,
    billing_year,
    contract_type,
    contract_ref,
    entity_name,
    siren,
    siret,
    contract_yearly_price_per_employee_in_cents,
    contract_has_installment_plan=None,
    references_of_employees_affiliated_over_the_year
)

Bases: DataClassJsonMixin

Data needed to generate an invoice for a customer.

billing_period instance-attribute
billing_period
billing_year instance-attribute
billing_year

The year of the billing period. END_OF_YEAR_REGUL invoices are issued on 01/01 of the year Y+1 but will hold the original year Y in billing_year.

contract_has_installment_plan class-attribute instance-attribute
contract_has_installment_plan = None
contract_ref instance-attribute
contract_ref
contract_type instance-attribute
contract_type
contract_yearly_price_per_employee_in_cents instance-attribute
contract_yearly_price_per_employee_in_cents
entity_name instance-attribute
entity_name
references_of_employees_affiliated_over_the_year instance-attribute
references_of_employees_affiliated_over_the_year

List of opaque references of all employees affiliated on this subscription/contract, during the entire year of the billing period (not just the specified billing period).

siren instance-attribute
siren
siret instance-attribute
siret

IssuedInvoiceData dataclass

IssuedInvoiceData(
    *,
    invoice_id,
    invoice_number,
    event_date,
    issued_date,
    due_date,
    total_invoice_amount,
    remaining_balance,
    contract_ref,
    entity_name,
    siren,
    siret,
    has_appendix,
    email_sent_at
)

Bases: DataClassJsonMixin

Data representing an actual issued invoice from the Invoice model. Used to display invoices in the company admin dashboard.

contract_ref instance-attribute
contract_ref
due_date instance-attribute
due_date
email_sent_at instance-attribute
email_sent_at
entity_name instance-attribute
entity_name
event_date instance-attribute
event_date
has_appendix instance-attribute
has_appendix
invoice_id instance-attribute
invoice_id
invoice_number instance-attribute
invoice_number
issued_date instance-attribute
issued_date
remaining_balance instance-attribute
remaining_balance
siren instance-attribute
siren
siret instance-attribute
siret
total_invoice_amount instance-attribute
total_invoice_amount

OccupationalHealthBillingPeriod

Bases: AlanBaseEnum

Billing period for occupational health services.

END_OF_YEAR_REGUL class-attribute instance-attribute
END_OF_YEAR_REGUL = 'END_OF_YEAR_REGUL'
Q1 class-attribute instance-attribute
Q1 = 'Q1'
Q2 class-attribute instance-attribute
Q2 = 'Q2'
Q3 class-attribute instance-attribute
Q3 = 'Q3'
Q4 class-attribute instance-attribute
Q4 = 'Q4'
to_validity_period
to_validity_period(year)

The period during which we'd consider affiliations for billing purposes.

Source code in components/occupational_health/public/entities/billing.py
def to_validity_period(self, year: int) -> ValidityPeriod:
    """The period during which we'd consider affiliations for billing purposes."""
    start_date = date(year, 1, 1)
    match self:
        case OccupationalHealthBillingPeriod.Q1:
            # Only consider the affiliations active on 01/01
            end_date = date(year, 1, 1)
        case OccupationalHealthBillingPeriod.Q2:
            # Bill for all affiliations active between 01/01 and 31/03
            end_date = date(year, 3, 31)
        case OccupationalHealthBillingPeriod.Q3:
            # In Q3 we bill for affiliations until the end of Q2
            end_date = date(year, 6, 30)
        case OccupationalHealthBillingPeriod.Q4:
            # In Q4 we bill for affiliations until the end of Q3
            end_date = date(year, 9, 30)
        case OccupationalHealthBillingPeriod.END_OF_YEAR_REGUL:
            # Then at the end of the year, we bill for all affiliations during the year
            end_date = date(year, 12, 31)
        case _:
            raise NotImplementedError(f"Unsupported billing period: {self}")

    return ValidityPeriod(
        start_date=start_date,
        end_date=end_date,
    )

dmst

AdministrativeProfile dataclass

AdministrativeProfile(
    *,
    ssn,
    health_statuses,
    risk_category,
    notes,
    medical_record_sharing_consent,
    medical_record_access_consent,
    health_data_sharing_consent,
    video_consultation_consent,
    orient_prevention_of_work_consent,
    transfer_dmst_to_spsti_consent,
    medical_record_sharing_consent_collected_at=None,
    medical_record_access_consent_collected_at=None,
    health_data_sharing_consent_collected_at=None,
    video_consultation_consent_collected_at=None,
    orient_prevention_of_work_consent_collected_at=None,
    transfer_dmst_to_spsti_consent_collected_at=None,
    personal_email=None,
    professional_email=None,
    phone_number=None
)

Bases: DataClassJsonMixin

Administrative information about a member's occupational health status.

Keep in sync with: frontend/apps/medical-software/app/hooks/useProfileDmstQuery.ts

Used in the HP medical app (DMST) for the "Administrative side modal"

health_data_sharing_consent
health_data_sharing_consent_collected_at = None
health_statuses instance-attribute
health_statuses
medical_record_access_consent
medical_record_access_consent_collected_at = None
medical_record_sharing_consent
medical_record_sharing_consent_collected_at = None
notes instance-attribute
notes
orient_prevention_of_work_consent
orient_prevention_of_work_consent_collected_at = None
personal_email class-attribute instance-attribute
personal_email = None
phone_number class-attribute instance-attribute
phone_number = None
professional_email class-attribute instance-attribute
professional_email = None
risk_category instance-attribute
risk_category
ssn instance-attribute
ssn
transfer_dmst_to_spsti_consent
transfer_dmst_to_spsti_consent_collected_at = None
video_consultation_consent
video_consultation_consent_collected_at = None

ContractType

Bases: AlanBaseEnum

Contract types from Présanse thesaurus level 1.

cdd class-attribute instance-attribute
cdd = 'cdd'
cdi class-attribute instance-attribute
cdi = 'cdi'
contrat_accompagnement_emploi class-attribute instance-attribute
contrat_accompagnement_emploi = (
    "contrat_accompagnement_emploi"
)
contrat_apprentissage class-attribute instance-attribute
contrat_apprentissage = 'contrat_apprentissage'
contrat_groupements_employeurs class-attribute instance-attribute
contrat_groupements_employeurs = (
    "contrat_groupements_employeurs"
)
contrat_initiative_emploi class-attribute instance-attribute
contrat_initiative_emploi = 'contrat_initiative_emploi'
contrat_intermittent class-attribute instance-attribute
contrat_intermittent = 'contrat_intermittent'
contrat_professionnalisation class-attribute instance-attribute
contrat_professionnalisation = (
    "contrat_professionnalisation"
)
contrat_saisonnier class-attribute instance-attribute
contrat_saisonnier = 'contrat_saisonnier'
contrat_temporaire class-attribute instance-attribute
contrat_temporaire = 'contrat_temporaire'
contrat_unique_insertion class-attribute instance-attribute
contrat_unique_insertion = 'contrat_unique_insertion'

CurriculumLaboris dataclass

CurriculumLaboris(*, occupational_medical_history, jobs)

Bases: DataClassJsonMixin

Member curriculum laboris (work information): previous job, description of work, tasks, etc.

Keep in sync with: frontend/apps/medical-software/app/hooks/useProfileDmstQuery.ts

Used in the HP medical app (DMST) for the "Professional" tab

jobs instance-attribute
jobs
occupational_medical_history instance-attribute
occupational_medical_history

Dmst dataclass

Dmst(
    *,
    occupational_health_profile_id,
    first_name,
    last_name,
    gender,
    birthdate,
    medical_secrecy_worker_record_id,
    curriculum_laboris,
    administrative_profile,
    health_history=None,
    google_drive_folder_id=None
)

Bases: DataClassJsonMixin

Information about a member, such as their name, gender, birthdate, etc.

Keep in sync with: frontend/apps/medical-software/app/hooks/useProfileDmstQuery.ts

Used in the HP medical app (DMST)

administrative_profile instance-attribute
administrative_profile
birthdate instance-attribute
birthdate
curriculum_laboris instance-attribute
curriculum_laboris
first_name instance-attribute
first_name
gender instance-attribute
gender
google_drive_folder_id class-attribute instance-attribute
google_drive_folder_id = None
health_history class-attribute instance-attribute
health_history = None
last_name instance-attribute
last_name
medical_secrecy_worker_record_id instance-attribute
medical_secrecy_worker_record_id
occupational_health_profile_id instance-attribute
occupational_health_profile_id

MemberJob dataclass

MemberJob(
    *,
    id,
    title,
    start_date,
    end_date,
    employer,
    is_past_job,
    medical_secrecy_worker_job_id,
    description,
    working_hours,
    missions,
    physical_conditions,
    organizational_conditions,
    mental_conditions,
    tools,
    worn_equipment,
    risks_and_advice,
    contract_type,
    exposition_notations
)

Bases: DataClassJsonMixin

Represents a single job held by a member, including employment details and duration.

Keep in sync with: frontend/apps/medical-software/app/hooks/useProfileDmstQuery.ts

Used in the HP medical app (DMST) for the "Professional" tab

contract_type instance-attribute
contract_type
description instance-attribute
description
employer instance-attribute
employer
end_date instance-attribute
end_date
exposition_notations instance-attribute
exposition_notations
id instance-attribute
id
is_past_job instance-attribute
is_past_job
medical_secrecy_worker_job_id instance-attribute
medical_secrecy_worker_job_id
mental_conditions instance-attribute
mental_conditions
missions instance-attribute
missions
organizational_conditions instance-attribute
organizational_conditions
physical_conditions instance-attribute
physical_conditions
risks_and_advice instance-attribute
risks_and_advice
start_date instance-attribute
start_date
title instance-attribute
title
tools instance-attribute
tools
working_hours instance-attribute
working_hours
worn_equipment instance-attribute
worn_equipment

OccupationalMedicalHistory dataclass

OccupationalMedicalHistory(*, types, description)

Bases: DataClassJsonMixin

Represents the medical history pertaining to occupational health.

Note: this is different from the general HealthHistory, which is not tied to occupational (work) history

description instance-attribute
description
types instance-attribute
types

OccupationalMedicalHistoryType

Bases: AlanBaseEnum

Representing types of occupational medical history, can be multiple

occupational_accident class-attribute instance-attribute
occupational_accident = 'occupational_accident'
occupational_disease class-attribute instance-attribute
occupational_disease = 'occupational_disease'

doctolib_matching

Entity dataclasses for Doctolib matching v2.

These mirror the output buckets of the ActivePieces automation "Matching Doctolib bookings and VIP spreadsheet v3".

Automation correspondence: - DoctolibCsvRow: parsed row from the Doctolib CSV export (automation step "Parse CSV") - NewlyScheduledItem: automation step "Map matched rows" → newly_scheduled bucket - ReschedulingItem: automation step "Map matched rows" → rescheduling bucket - CancelledItem: automation step "Filter cancelled" → cancelled bucket - NewRowItem: automation step "Map unmatched to new rows" → new_rows bucket - UnmatchedItem: rows that could not be matched to any Alan user (for manual resolution) - MatchingV2Response: final aggregated response sent to the frontend

ApplyResultSummary dataclass

ApplyResultSummary(total, succeeded, failed, errors)

Bases: DataClassJsonMixin

Summary of a batch GSheet write operation.

errors instance-attribute
errors
failed instance-attribute
failed
succeeded instance-attribute
succeeded
total instance-attribute
total

CancelledItem dataclass

CancelledItem(
    gsheet_row_index,
    user_id,
    first_name,
    last_name,
    date_planned,
)

Bases: DataClassJsonMixin

A gsheet row whose user was expected to reschedule but is absent from the CSV.

date_planned instance-attribute
date_planned
first_name instance-attribute
first_name
gsheet_row_index instance-attribute
gsheet_row_index
last_name instance-attribute
last_name
user_id instance-attribute
user_id

ConsultationType

Bases: str, Enum

Type of medical consultation derived from the Doctolib appointment motif.

ONSITE class-attribute instance-attribute
ONSITE = 'onsite visit'
TELECONSULTATION class-attribute instance-attribute
TELECONSULTATION = 'teleconsultation'

DoctolibCsvRow dataclass

DoctolibCsvRow(
    csv_row_index,
    doctolib_patient_id,
    first_name,
    last_name,
    birth_name,
    birth_date,
    email,
    phone,
    appointment_date,
    appointment_start,
    appointment_end,
    duration,
    agenda,
    motif,
    consultation_type,
    id="",
    notes="",
    date_saisie="",
    date_derniere_mise_a_jour="",
    cree_par="",
    statut="",
    rdv_internet="",
    nouveau_patient="",
    honoraires_cb="",
    honoraires_especes="",
    honoraires_cheques="",
    honoraires_tiers_payant="",
    honoraires_total_regle="",
    honoraires_restant_a_regler="",
    agenda_ressource="",
    civilite="",
    adresse="",
    code_postal="",
    ville="",
    heure_arrivee="",
    heure_prise_en_charge="",
    heure_depart="",
    symptomes_covid19="",
    identifiant_externe="",
    temps_reservation_secondes="",
    a_ete_importe="",
    a_ete_reserve_absence="",
    a_ete_reserve_creneau_deja_reserve="",
    a_ete_reserve_hors_horaires="",
    dispositif_reservation_patients="",
    personne_referente="",
    patient_notification_consent="",
    patient_legal_gender="",
    telephone_secondaire="",
    patient_insurance_sector="",
)

Bases: DataClassJsonMixin

A single normalized row from the Doctolib CSV export.

a_ete_importe class-attribute instance-attribute
a_ete_importe = ''
a_ete_reserve_absence class-attribute instance-attribute
a_ete_reserve_absence = ''
a_ete_reserve_creneau_deja_reserve class-attribute instance-attribute
a_ete_reserve_creneau_deja_reserve = ''
a_ete_reserve_hors_horaires class-attribute instance-attribute
a_ete_reserve_hors_horaires = ''
adresse class-attribute instance-attribute
adresse = ''
agenda instance-attribute
agenda
agenda_ressource class-attribute instance-attribute
agenda_ressource = ''
appointment_date instance-attribute
appointment_date
appointment_end instance-attribute
appointment_end
appointment_start instance-attribute
appointment_start
birth_date instance-attribute
birth_date
birth_name instance-attribute
birth_name
civilite class-attribute instance-attribute
civilite = ''
code_postal class-attribute instance-attribute
code_postal = ''
consultation_type instance-attribute
consultation_type
cree_par class-attribute instance-attribute
cree_par = ''
csv_row_index instance-attribute
csv_row_index
date_derniere_mise_a_jour class-attribute instance-attribute
date_derniere_mise_a_jour = ''
date_saisie class-attribute instance-attribute
date_saisie = ''
dispositif_reservation_patients class-attribute instance-attribute
dispositif_reservation_patients = ''
doctolib_patient_id instance-attribute
doctolib_patient_id
duration instance-attribute
duration
email instance-attribute
email
first_name instance-attribute
first_name
heure_arrivee class-attribute instance-attribute
heure_arrivee = ''
heure_depart class-attribute instance-attribute
heure_depart = ''
heure_prise_en_charge class-attribute instance-attribute
heure_prise_en_charge = ''
honoraires_cb class-attribute instance-attribute
honoraires_cb = ''
honoraires_cheques class-attribute instance-attribute
honoraires_cheques = ''
honoraires_especes class-attribute instance-attribute
honoraires_especes = ''
honoraires_restant_a_regler class-attribute instance-attribute
honoraires_restant_a_regler = ''
honoraires_tiers_payant class-attribute instance-attribute
honoraires_tiers_payant = ''
honoraires_total_regle class-attribute instance-attribute
honoraires_total_regle = ''
id class-attribute instance-attribute
id = ''
identifiant_externe class-attribute instance-attribute
identifiant_externe = ''
last_name instance-attribute
last_name
motif instance-attribute
motif
notes class-attribute instance-attribute
notes = ''
nouveau_patient class-attribute instance-attribute
nouveau_patient = ''
patient_insurance_sector class-attribute instance-attribute
patient_insurance_sector = ''
patient_legal_gender = ''
patient_notification_consent = ''
personne_referente class-attribute instance-attribute
personne_referente = ''
phone instance-attribute
phone
rdv_internet class-attribute instance-attribute
rdv_internet = ''
statut class-attribute instance-attribute
statut = ''
symptomes_covid19 class-attribute instance-attribute
symptomes_covid19 = ''
telephone_secondaire class-attribute instance-attribute
telephone_secondaire = ''
temps_reservation_secondes class-attribute instance-attribute
temps_reservation_secondes = ''
ville class-attribute instance-attribute
ville = ''

MatchingV2Response dataclass

MatchingV2Response(
    newly_scheduled,
    rescheduling,
    cancelled,
    new_rows,
    unmatched,
    stats,
)

Bases: DataClassJsonMixin

Full matching response returned to the frontend.

cancelled instance-attribute
cancelled
new_rows instance-attribute
new_rows
newly_scheduled instance-attribute
newly_scheduled
rescheduling instance-attribute
rescheduling
stats instance-attribute
stats
unmatched instance-attribute
unmatched

NewRowItem dataclass

NewRowItem(
    first_name,
    last_name,
    email,
    phone,
    birth_date,
    consultation_type,
    date_planned,
    hour_start,
    hour_end,
    hp,
    docto_patient_id,
    match_priority,
    candidates=list(),
)

Bases: DataClassJsonMixin

A matched row for a user not currently tracked in the gsheet.

birth_date instance-attribute
birth_date
candidates class-attribute instance-attribute
candidates = field(default_factory=list)
consultation_type class-attribute instance-attribute
consultation_type = field(
    metadata={
        "marshmallow_field": Enum(
            ConsultationType, by_value=True
        )
    }
)
date_planned instance-attribute
date_planned
docto_patient_id instance-attribute
docto_patient_id
email instance-attribute
email
first_name instance-attribute
first_name
hour_end instance-attribute
hour_end
hour_start instance-attribute
hour_start
hp instance-attribute
hp
last_name instance-attribute
last_name
match_priority instance-attribute
match_priority
phone instance-attribute
phone

NewlyScheduledItem dataclass

NewlyScheduledItem(
    gsheet_row_index,
    user_id,
    first_name,
    last_name,
    doctolib_email,
    doctolib_phone,
    doctolib_birth_date,
    hp,
    consultation_type,
    date_planned,
    hour_start,
    hour_end,
    docto_patient_id,
    match_priority,
)

Bases: DataClassJsonMixin

A matched row that fills a pending visit slot in the predictable gsheet.

consultation_type class-attribute instance-attribute
consultation_type = field(
    metadata={
        "marshmallow_field": Enum(
            ConsultationType, by_value=True
        )
    }
)
date_planned instance-attribute
date_planned
docto_patient_id instance-attribute
docto_patient_id
doctolib_birth_date instance-attribute
doctolib_birth_date
doctolib_email instance-attribute
doctolib_email
doctolib_phone instance-attribute
doctolib_phone
first_name instance-attribute
first_name
gsheet_row_index instance-attribute
gsheet_row_index
hour_end instance-attribute
hour_end
hour_start instance-attribute
hour_start
hp instance-attribute
hp
last_name instance-attribute
last_name
match_priority instance-attribute
match_priority
user_id instance-attribute
user_id

ReschedulingItem dataclass

ReschedulingItem(
    gsheet_row_index,
    user_id,
    first_name,
    last_name,
    old_date,
    old_hour,
    old_hp,
    new_date,
    new_hour,
    new_hp,
    hour_end,
    consultation_type,
    docto_patient_id,
    match_priority,
)

Bases: DataClassJsonMixin

A matched row where the visit was already scheduled but date/hour/HP changed.

consultation_type class-attribute instance-attribute
consultation_type = field(
    metadata={
        "marshmallow_field": Enum(
            ConsultationType, by_value=True
        )
    }
)
docto_patient_id instance-attribute
docto_patient_id
first_name instance-attribute
first_name
gsheet_row_index instance-attribute
gsheet_row_index
hour_end instance-attribute
hour_end
last_name instance-attribute
last_name
match_priority instance-attribute
match_priority
new_date instance-attribute
new_date
new_hour instance-attribute
new_hour
new_hp instance-attribute
new_hp
old_date instance-attribute
old_date
old_hour instance-attribute
old_hour
old_hp instance-attribute
old_hp
user_id instance-attribute
user_id

UnmatchedItem dataclass

UnmatchedItem(csv_row, candidates=list())

Bases: DataClassJsonMixin

A CSV row that could not be automatically matched to any Alan user.

candidates class-attribute instance-attribute
candidates = field(default_factory=list)
csv_row instance-attribute
csv_row

health_history

Health data entities for occupational health worker medical records.

These entities correspond to the TypeScript interfaces in: frontend/apps/medical-software/app/hooks/types.ts

NOTE: This is a duplicate of components.medical_secrecy.internal.entities.occupational_health_worker_medical_record_health_history to avoid cross-component dependencies. Keep in sync manually. TODO: @david.barthelemy: clean this after talking with Alexandre and Mickael

AlcoholStatus

Bases: AlanBaseEnum

Alcohol consumption status.

addicted class-attribute instance-attribute
addicted = 'addicted'
casual class-attribute instance-attribute
casual = 'casual'
deprived class-attribute instance-attribute
deprived = 'deprived'
no class-attribute instance-attribute
no = 'no'

AllergyItem dataclass

AllergyItem(
    *,
    label=None,
    status=None,
    start_date_year=None,
    observation=None
)

Bases: DataClassJsonMixin

Allergy item.

Corresponds to AllergyItem TypeScript interface.

label class-attribute instance-attribute
label = None
observation class-attribute instance-attribute
observation = None
start_date_year class-attribute instance-attribute
start_date_year = None
status class-attribute instance-attribute
status = None

AllergyStatus

Bases: AlanBaseEnum

Status of an allergy.

cured class-attribute instance-attribute
cured = 'cured'
in_treatment class-attribute instance-attribute
in_treatment = 'in_treatment'
untreated class-attribute instance-attribute
untreated = 'untreated'

EyeHealthItem dataclass

EyeHealthItem(
    *,
    optical_correction=None,
    ophthalmologic_follow_up=None,
    observation=None
)

Bases: DataClassJsonMixin

Corresponds to EyeHealthItem TypeScript interface.

observation class-attribute instance-attribute
observation = None
ophthalmologic_follow_up class-attribute instance-attribute
ophthalmologic_follow_up = None
optical_correction class-attribute instance-attribute
optical_correction = None

FamilyHistoryItem dataclass

FamilyHistoryItem(
    *, label=None, family_members=list(), observation=None
)

Bases: DataClassJsonMixin

Family history item.

Corresponds to FamilyHistoryItem TypeScript interface.

family_members class-attribute instance-attribute
family_members = field(default_factory=list)
label class-attribute instance-attribute
label = None
observation class-attribute instance-attribute
observation = None

FamilyHistoryMember dataclass

FamilyHistoryMember(*, age=None, status=None)

Bases: DataClassJsonMixin

Family history member information.

Corresponds to FamilyHistoryMember TypeScript interface.

age class-attribute instance-attribute
age = None
status class-attribute instance-attribute
status = None

FamilyStatus

Bases: AlanBaseEnum

Family member status.

aunt class-attribute instance-attribute
aunt = 'aunt'
brother class-attribute instance-attribute
brother = 'brother'
father class-attribute instance-attribute
father = 'father'
grand_father class-attribute instance-attribute
grand_father = 'grand_father'
grand_mother class-attribute instance-attribute
grand_mother = 'grand_mother'
great_grand_father class-attribute instance-attribute
great_grand_father = 'great_grand_father'
great_grand_mother class-attribute instance-attribute
great_grand_mother = 'great_grand_mother'
mother class-attribute instance-attribute
mother = 'mother'
other class-attribute instance-attribute
other = 'other'
sister class-attribute instance-attribute
sister = 'sister'
uncle class-attribute instance-attribute
uncle = 'uncle'

HealthHistory dataclass

HealthHistory(
    *,
    medical_surgical_history=list(),
    allergies=list(),
    treatment=None,
    vaccine=None,
    family_history=list(),
    lifestyle=None,
    transport_mode=None,
    personal_situation=None,
    gynecological_follow_up=None,
    gynecological_observation=None,
    contraceptives=None,
    eye_health=None,
    medical_observation=None,
    health_measures=list()
)

Bases: DataClassJsonMixin

Health history for occupational health worker medical record.

This dataclass contains all health-related information that needs to be encrypted and stored in the OccupationalHealthWorkerMedicalRecord.health_history field.

All fields correspond to the forms defined in: frontend/apps/medical-software/app/pages/MemberHealthTab.tsx

allergies class-attribute instance-attribute
allergies = field(default_factory=list)
contraceptives class-attribute instance-attribute
contraceptives = None
eye_health class-attribute instance-attribute
eye_health = None
family_history class-attribute instance-attribute
family_history = field(default_factory=list)
gynecological_follow_up class-attribute instance-attribute
gynecological_follow_up = None
gynecological_observation class-attribute instance-attribute
gynecological_observation = None
health_measures class-attribute instance-attribute
health_measures = field(default_factory=list)
lifestyle class-attribute instance-attribute
lifestyle = None
medical_observation class-attribute instance-attribute
medical_observation = None
medical_surgical_history class-attribute instance-attribute
medical_surgical_history = field(default_factory=list)
personal_situation class-attribute instance-attribute
personal_situation = None
transport_mode class-attribute instance-attribute
transport_mode = None
treatment class-attribute instance-attribute
treatment = None
vaccine class-attribute instance-attribute
vaccine = None

LifestyleItem dataclass

LifestyleItem(
    *,
    alcohol_status=None,
    alcohol_observation=None,
    smoking_status=None,
    smoking_observation=None,
    other_addictive_behaviors=None,
    physical_activity_status=None,
    physical_activity_observations=None,
    food=None,
    sleep=None,
    lifestyle_observations=None
)

Bases: DataClassJsonMixin

Lifestyle information.

Corresponds to LifestyleItem TypeScript interface.

alcohol_observation class-attribute instance-attribute
alcohol_observation = None
alcohol_status class-attribute instance-attribute
alcohol_status = None
food class-attribute instance-attribute
food = None
lifestyle_observations class-attribute instance-attribute
lifestyle_observations = None
other_addictive_behaviors class-attribute instance-attribute
other_addictive_behaviors = None
physical_activity_observations class-attribute instance-attribute
physical_activity_observations = None
physical_activity_status class-attribute instance-attribute
physical_activity_status = None
sleep class-attribute instance-attribute
sleep = None
smoking_observation class-attribute instance-attribute
smoking_observation = None
smoking_status class-attribute instance-attribute
smoking_status = None

LivingStatus

Bases: AlanBaseEnum

Living status.

alone class-attribute instance-attribute
alone = 'alone'
couple class-attribute instance-attribute
couple = 'couple'
family class-attribute instance-attribute
family = 'family'
house_share class-attribute instance-attribute
house_share = 'house_share'
other class-attribute instance-attribute
other = 'other'
single_parent_family class-attribute instance-attribute
single_parent_family = 'single_parent_family'

MedicalSurgicalHistoryItem dataclass

MedicalSurgicalHistoryItem(
    *,
    label=None,
    status=None,
    start_date_year=None,
    observation=None
)

Bases: DataClassJsonMixin

Medical and surgical history item.

Corresponds to MedicalSurgicalHistoryItem TypeScript interface.

label class-attribute instance-attribute
label = None
observation class-attribute instance-attribute
observation = None
start_date_year class-attribute instance-attribute
start_date_year = None
status class-attribute instance-attribute
status = None

MedicalSurgicalHistoryStatus

Bases: AlanBaseEnum

Status of a medical or surgical history item.

cured class-attribute instance-attribute
cured = 'cured'
in_treatment class-attribute instance-attribute
in_treatment = 'in_treatment'
relapse class-attribute instance-attribute
relapse = 'relapse'
remission class-attribute instance-attribute
remission = 'remission'

PersonalSituationItem dataclass

PersonalSituationItem(
    *, living_status=None, number_of_children=None
)

Bases: DataClassJsonMixin

Corresponds to PersonalSituationItem TypeScript interface.

living_status class-attribute instance-attribute
living_status = None
number_of_children class-attribute instance-attribute
number_of_children = None

PhysicalActivityStatus

Bases: AlanBaseEnum

Physical activity status.

less_than_1 class-attribute instance-attribute
less_than_1 = 'less_than_1'
less_than_2 class-attribute instance-attribute
less_than_2 = 'less_than_2'
more_than_2 class-attribute instance-attribute
more_than_2 = 'more_than_2'
no class-attribute instance-attribute
no = 'no'

SmokingStatus

Bases: AlanBaseEnum

Smoking status.

deprived class-attribute instance-attribute
deprived = 'deprived'
less_than_5 class-attribute instance-attribute
less_than_5 = 'less_than_5'
less_than_9 class-attribute instance-attribute
less_than_9 = 'less_than_9'
more_than_9 class-attribute instance-attribute
more_than_9 = 'more_than_9'
no class-attribute instance-attribute
no = 'no'
passive class-attribute instance-attribute
passive = 'passive'

TransportModeItem dataclass

TransportModeItem(
    *,
    travel_time=None,
    transport_mode=None,
    is_full_remote=None
)

Bases: DataClassJsonMixin

Corresponds to TransportModeItem TypeScript interface.

is_full_remote class-attribute instance-attribute
is_full_remote = None
transport_mode class-attribute instance-attribute
transport_mode = None
travel_time class-attribute instance-attribute
travel_time = None

TreatmentItem dataclass

TreatmentItem(*, description=None)

Bases: DataClassJsonMixin

Treatment item.

Corresponds to TreatmentItem TypeScript interface.

description class-attribute instance-attribute
description = None

VaccineItem dataclass

VaccineItem(*, status=None, description=None)

Bases: DataClassJsonMixin

Vaccination item.

Corresponds to VaccineItem TypeScript interface.

description class-attribute instance-attribute
description = None
status class-attribute instance-attribute
status = None

VaccineStatus

Bases: AlanBaseEnum

Status of vaccination.

need_update class-attribute instance-attribute
need_update = 'need_update'
unknown class-attribute instance-attribute
unknown = 'unknown'
up_to_date class-attribute instance-attribute
up_to_date = 'up_to_date'

health_measure

Health measure entity for occupational health.

NOTE: This is a duplicate of components.medical_secrecy.internal.entities.health_measure.HealthMeasureItem to avoid cross-component dependencies. Keep in sync manually.

HealthMeasureItem dataclass

HealthMeasureItem(
    *, id=None, measure_type, measure_value, measure_date
)

Bases: DataClassJsonMixin

Single health measure stored as individual encrypted columns in DB.

Corresponds to HealthMeasureItem TypeScript interface.

id class-attribute instance-attribute
id = None
measure_date instance-attribute
measure_date
measure_type instance-attribute
measure_type
measure_value instance-attribute
measure_value

health_professional

HealthProfessional dataclass

HealthProfessional(id, first_name, last_name, role)

Bases: DataClassJsonMixin

A health professional entity.

Used to represent health professionals from the occupational health component.

first_name instance-attribute
first_name
id instance-attribute
id
last_name instance-attribute
last_name
role instance-attribute
role

member_info

MemberInfo dataclass

MemberInfo(
    *,
    occupational_health_profile_id,
    user_id,
    member_first_name,
    member_last_name,
    member_gender,
    member_birthdate
)

Bases: DataClassJsonMixin

Information about a member, such as their name, gender, birthdate, etc.

Note: we'll be using other structures for the rest of the data e.g. MemberActivity, etc.

Used in the HP medical app.

member_birthdate instance-attribute
member_birthdate
member_first_name instance-attribute
member_first_name
member_gender instance-attribute
member_gender
member_last_name instance-attribute
member_last_name
occupational_health_profile_id instance-attribute
occupational_health_profile_id
user_id instance-attribute
user_id

search

SearchResult dataclass

SearchResult(*, members, accounts, companies)

Bases: DataClassJsonMixin

Represents the result of a search operation for members, accounts, companies

accounts instance-attribute
accounts
companies instance-attribute
companies
members instance-attribute
members

subscribers

CreateSubscriberDocument dataclass

CreateSubscriberDocument(
    *, name, type, file, account_id, siret
)

Bases: DataClassJsonMixin

Create a new subscriber document

account_id instance-attribute
account_id
file instance-attribute
file
name instance-attribute
name
siret instance-attribute
siret
type instance-attribute
type

SubscriberDocument dataclass

SubscriberDocument(
    *,
    id,
    public_uri,
    filename,
    uploaded_at,
    account_id,
    siret,
    company_name,
    company_address,
    document_type,
    mime_type
)

Bases: DataClassJsonMixin

Subscriber document

account_id instance-attribute
account_id
company_address instance-attribute
company_address
company_name instance-attribute
company_name
document_type instance-attribute
document_type
filename instance-attribute
filename
id instance-attribute
id
mime_type instance-attribute
mime_type
public_uri instance-attribute
public_uri
siret instance-attribute
siret
uploaded_at instance-attribute
uploaded_at

UpdateSubscriberDocument dataclass

UpdateSubscriberDocument(
    *, id, account_id, name, document_type, siret
)

Bases: DataClassJsonMixin

Update an existing subscriber document

account_id instance-attribute
account_id
document_type instance-attribute
document_type
id instance-attribute
id
name instance-attribute
name
siret instance-attribute
siret

thesaurus

ThesaurusMean dataclass

ThesaurusMean(*, id, label)

Bases: DataClassJsonMixin

The public dataclass of a ThesaurusMean from Presanse

id instance-attribute
id
label instance-attribute
label

ThesaurusOccupationalExposure dataclass

ThesaurusOccupationalExposure(
    *,
    notation,
    label,
    dc_type,
    status,
    created_at,
    version,
    line_number,
    risk_category,
    parent_notation,
    class_label=None,
    subclass_label=None,
    class_notation=None,
    subclass_notation=None
)

Bases: DataClassJsonMixin

A TEP (Thésaurus des Expositions Professionnelles) exposition entry.

class_label class-attribute instance-attribute
class_label = None
class_notation class-attribute instance-attribute
class_notation = None
created_at instance-attribute
created_at
dc_type instance-attribute
dc_type
label instance-attribute
label
line_number instance-attribute
line_number
notation instance-attribute
notation
parent_notation instance-attribute
parent_notation
risk_category instance-attribute
risk_category
status instance-attribute
status
subclass_label class-attribute instance-attribute
subclass_label = None
subclass_notation class-attribute instance-attribute
subclass_notation = None
version instance-attribute
version

visit

PlannedVisit dataclass

PlannedVisit(
    profile_id, timestamp_planned, minutes_difference
)

Bases: DataClassJsonMixin

Planned visit information

minutes_difference instance-attribute
minutes_difference
profile_id instance-attribute
profile_id
timestamp_planned instance-attribute
timestamp_planned

PlannedVisitWithDetails dataclass

PlannedVisitWithDetails(
    *,
    visit_id,
    profile_id,
    timestamp_planned,
    visit_table,
    visit_type,
    visit_setup,
    health_professional_name
)

Bases: DataClassJsonMixin

Planned visit with extra details for monitoring exports.

health_professional_name instance-attribute
health_professional_name
profile_id instance-attribute
profile_id
timestamp_planned instance-attribute
timestamp_planned
visit_id instance-attribute
visit_id
visit_setup instance-attribute
visit_setup
visit_table instance-attribute
visit_table
visit_type instance-attribute
visit_type

VisitInfo dataclass

VisitInfo(
    *,
    visit_id,
    member_first_name,
    member_last_name,
    member_gender,
    member_birthdate,
    visit_date,
    visit_hour_booked,
    visit_type,
    health_professional_name,
    health_professional_id,
    visit_status,
    occupational_health_profile_id,
    visit_setup
)

Bases: DataClassJsonMixin

Information about a visit scheduled for a specific date.

Used in the HP medical app.

health_professional_id instance-attribute
health_professional_id
health_professional_name instance-attribute
health_professional_name
member_birthdate instance-attribute
member_birthdate
member_first_name instance-attribute
member_first_name
member_gender instance-attribute
member_gender
member_last_name instance-attribute
member_last_name
occupational_health_profile_id instance-attribute
occupational_health_profile_id
visit_date instance-attribute
visit_date
visit_hour_booked instance-attribute
visit_hour_booked
visit_id instance-attribute
visit_id
visit_setup instance-attribute
visit_setup
visit_status instance-attribute
visit_status
visit_type instance-attribute
visit_type

workspace_action

WorkspaceAction dataclass

WorkspaceAction(
    id,
    title,
    target_type,
    status,
    created_at,
    siret=None,
    company_name=None,
    company_address=None,
    members_count=None,
    member_full_name=None,
    job_title=None,
    health_professional_fullname=None,
    profile_id=None,
    account_id=None,
    account_name=None,
    eta_date=None,
    prevention_type=None,
    note=None,
)

Bases: DataClassJsonMixin

Represents a WorkspaceAction (an AMT in France) in the context of occupational health.

account_id class-attribute instance-attribute
account_id = None
account_name class-attribute instance-attribute
account_name = None
company_address class-attribute instance-attribute
company_address = None
company_name class-attribute instance-attribute
company_name = None
created_at instance-attribute
created_at
eta_date class-attribute instance-attribute
eta_date = None
health_professional_fullname class-attribute instance-attribute
health_professional_fullname = None
id instance-attribute
id
job_title class-attribute instance-attribute
job_title = None
member_full_name class-attribute instance-attribute
member_full_name = None
members_count class-attribute instance-attribute
members_count = None
note class-attribute instance-attribute
note = None
prevention_type class-attribute instance-attribute
prevention_type = None
profile_id class-attribute instance-attribute
profile_id = None
siret class-attribute instance-attribute
siret = None
status instance-attribute
status
target_type instance-attribute
target_type
title instance-attribute
title

WorkspaceActionPreventionType

Bases: AlanBaseEnum

Represents the type of prevention of the WorkspaceAction

primary class-attribute instance-attribute
primary = 'primary'
secondary class-attribute instance-attribute
secondary = 'secondary'
tertiary class-attribute instance-attribute
tertiary = 'tertiary'

WorkspaceActionStatus

Bases: AlanBaseEnum

Represents a current status of the WorkspaceAction

in_progress class-attribute instance-attribute
in_progress = 'in_progress'
todo class-attribute instance-attribute
todo = 'todo'
validated class-attribute instance-attribute
validated = 'validated'

WorkspaceActionType

Bases: AlanBaseEnum

Represents the target of the WorkspaceAction. For now it's only a member or a company

company class-attribute instance-attribute
company = 'company'
member class-attribute instance-attribute
member = 'member'

components.occupational_health.public.enums

AffiliationDecision

Bases: AlanBaseEnum

Possible decisions for occupational health affiliation.

AFFILIATED class-attribute instance-attribute

AFFILIATED = 'affiliated'

NOT_AFFILIATED class-attribute instance-attribute

NOT_AFFILIATED = 'not_affiliated'

REQUIRE_MANUAL_REVIEW class-attribute instance-attribute

REQUIRE_MANUAL_REVIEW = 'require_manual_review'

RiskCategory

Bases: AlanBaseEnum

Risk categories / "type de suivi" for occupational health visits.

These determine the frequency and type of required medical visits.

SI class-attribute instance-attribute

SI = 'SI'

SIA class-attribute instance-attribute

SIA = 'SIA'

SIR class-attribute instance-attribute

SIR = 'SIR'

SubscriberDocumentType

Bases: AlanBaseEnum

Type of subscriber document.

See https://www.notion.so/alaninsurance/Occupational-Health-Glossary-2d81426e8be7801cba9ed5dacdc24c60 ⧉

ACTIVITY_REPORT class-attribute instance-attribute

ACTIVITY_REPORT = 'activity_report'

COLLECTIVE_WORKSTATION_ASSESSMENT class-attribute instance-attribute

COLLECTIVE_WORKSTATION_ASSESSMENT = (
    "collective_workstation_assessment"
)

CSE class-attribute instance-attribute

CSE = 'CSE'

OTHER class-attribute instance-attribute

OTHER = 'other'

PAPRIPACT class-attribute instance-attribute

PAPRIPACT = 'PAPRIPACT'

SINGLE_OCCUPATIONAL_RISK_ASSESSMENT class-attribute instance-attribute

SINGLE_OCCUPATIONAL_RISK_ASSESSMENT = (
    "single_occupational_risk_assessment"
)

STAFF_INTERNAL_REGULATIONS class-attribute instance-attribute

STAFF_INTERNAL_REGULATIONS = 'staff_internal_regulations'

STOPPAGES_ACCIDENTS_ILLNESSES_SUMMARY class-attribute instance-attribute

STOPPAGES_ACCIDENTS_ILLNESSES_SUMMARY = (
    "stoppages_accidents_illnesses_summary"
)

WORKPLACE_RISK_FILE class-attribute instance-attribute

WORKPLACE_RISK_FILE = 'workplace_risk_file'

components.occupational_health.public.events

subscription

subscribe_to_events

subscribe_to_events()

All event subscriptions for the occupational health component should be done here.

Source code in components/occupational_health/public/events/subscription.py
def subscribe_to_events() -> None:
    """
    All event subscriptions for the occupational health component should be done here.
    """
    from components.occupational_health.internal.profile.actions.update_occupational_health_profile_merge import (
        update_occupational_health_profile_merge,
    )
    from shared.messaging.broker import get_message_broker

    message_broker = get_message_broker()

    # This is on top of the existing global profile profile merge event
    # See: the components.global_profile.internal.events.subscribers.update_links_to_profile_when_merging
    message_broker.subscribe_async(
        ProfilesMerged,
        update_occupational_health_profile_merge,
        queue_name=LOW_PRIORITY_QUEUE,
    )

components.occupational_health.public.exceptions

CompanyIdNotFound

CompanyIdNotFound(company_id)

Bases: ConsumerError

The company ID mentioned in the employment change could not be found.

Should only happen within unit tests, or if a blocked movement has its company deleted (very rare)

Source code in components/occupational_health/public/exceptions.py
def __init__(self, company_id: CompanyId) -> None:
    super().__init__(
        message=f"Company ID '{company_id}' not found",
        consumer=occupational_health_employment_change_consumer.__name__,
    )

error_code class-attribute instance-attribute

error_code = 'company_id_not_found'

UserIdNotFound

UserIdNotFound(message)

Bases: UserNotFoundError

A user ID cannot be found.

This happens generally after a user merge event, when the employment change still refers the old user ID.

Source code in components/occupational_health/public/exceptions.py
def __init__(self, message: str) -> None:
    super().__init__(
        message=message,
        consumer=occupational_health_employment_change_consumer.__name__,
    )

components.occupational_health.public.marmot

actions

Actions for use in Marmot controllers.

AffiliationRequest dataclass

AffiliationRequest(
    user_id,
    start_date,
    end_date,
    risk_category,
    extra_data,
    last_visit=None,
    personal_email=None,
    professional_email=None,
)

Bases: DataClassJsonMixin

end_date instance-attribute
end_date
extra_data instance-attribute
extra_data
last_visit class-attribute instance-attribute
last_visit = None
personal_email class-attribute instance-attribute
personal_email = None
professional_email class-attribute instance-attribute
professional_email = None
risk_category instance-attribute
risk_category
start_date instance-attribute
start_date
user_id instance-attribute
user_id

DuplicateAdminError

Bases: Exception

Raised when trying to add a duplicate occupational health dashboard admin.

RuleAction

Bases: AlanBaseEnum

Which action to take when affiliating employees matching the rule.

AFFILIATE class-attribute instance-attribute
AFFILIATE = 'affiliate'

Affiliate the employee automatically

IGNORE class-attribute instance-attribute
IGNORE = 'ignore'

Do not affiliate the employee

REQUIRE_MANUAL_REVIEW class-attribute instance-attribute
REQUIRE_MANUAL_REVIEW = 'require_manual_review'

Ask the admin to decide whether to affiliate the employee or not

add_discrepancy_note_for_marmot

add_discrepancy_note_for_marmot(
    account_id, user_id, comment, noted_by_user_id
)

Add a note on a discrepancy.

Upserts: if a note already exists for (account_id, user_id), update comment and noted_by; otherwise create a new record.

Source code in components/occupational_health/public/marmot/actions.py
def add_discrepancy_note_for_marmot(
    account_id: AccountId,
    user_id: UserId,
    comment: str,
    noted_by_user_id: UserId,
) -> None:
    """
    Add a note on a discrepancy.

    Upserts: if a note already exists for (account_id, user_id), update
    comment and noted_by; otherwise create a new record.
    """
    from components.occupational_health.internal.models.occupational_health_discrepancy_note import (
        OccupationalHealthDiscrepancyNote,
    )

    existing = current_session.scalars(
        select(OccupationalHealthDiscrepancyNote).where(
            OccupationalHealthDiscrepancyNote.account_id == account_id,
            OccupationalHealthDiscrepancyNote.user_id == user_id,
        )
    ).one_or_none()

    if existing:
        existing.comment = comment
        existing.noted_by_user_id = noted_by_user_id
        current_session.add(existing)
    else:
        current_session.add(
            OccupationalHealthDiscrepancyNote(
                account_id=account_id,
                user_id=user_id,
                comment=comment,
                noted_by_user_id=noted_by_user_id,
            )
        )

    current_session.commit()

    current_logger.info(
        "Added discrepancy note",
        account_id=account_id,
        user_id=user_id,
        noted_by_user_id=noted_by_user_id,
    )

add_occupational_health_dashboard_admin_to_account

add_occupational_health_dashboard_admin_to_account(
    account_id, user_id
)

Add an occupational health dashboard admin to an account.

Raises:

Type Description
DuplicateAdminError

If the user is already an admin for this account.

Source code in components/occupational_health/internal/business_logic/actions/dashboard_admin.py
def add_occupational_health_dashboard_admin_to_account(
    account_id: AccountId, user_id: UserId
) -> UUID:
    """
    Add an occupational health dashboard admin to an account.

    Raises:
        DuplicateAdminError: If the user is already an admin for this account.
    """

    # Check if admin already exists for this user and account
    existing_admin = current_session.scalar(
        select(OccupationalHealthDashboardAdmin).where(
            OccupationalHealthDashboardAdmin.user_id == user_id,
            OccupationalHealthDashboardAdmin.account_id == account_id,
        )
    )

    if existing_admin:
        raise DuplicateAdminError(
            f"User {user_id} is already an admin for account {account_id}. "
            "Cannot add duplicate admin."
        )

    current_logger.info(
        f"Adding occupational health dashboard admin user {user_id} to account {account_id}"
    )
    occupational_health_dashboard_admin = OccupationalHealthDashboardAdmin(
        account_id=account_id,
        user_id=user_id,
        scope=OccupationalHealthDashboardAdminScope.ACCOUNT,
    )
    current_session.add(occupational_health_dashboard_admin)
    current_session.commit()

    return occupational_health_dashboard_admin.id

affiliate_multiple_members

affiliate_multiple_members(account_id, affiliations)

Affiliate multiple members to Prévenir in a single transaction.

Parameters:

Name Type Description Default
account_id AccountId

The account ID to affiliate members to

required
affiliations list[AffiliationRequest]

List of affiliation requests with user_id, start_date, optional last_visit, and optional risk_category

required

Returns:

Type Description
list[UUID]

List of affiliation IDs

Source code in components/occupational_health/internal/business_logic/actions/affiliation_tool.py
def affiliate_multiple_members(
    account_id: AccountId,
    affiliations: list[AffiliationRequest],
) -> list[UUID]:
    """
    Affiliate multiple members to Prévenir in a single transaction.

    Args:
        account_id: The account ID to affiliate members to
        affiliations: List of affiliation requests with user_id, start_date, optional last_visit, and optional risk_category

    Returns:
        List of affiliation IDs
    """
    contract_start_date = get_contract_start_date(account_id)

    current_logger.info(
        f"Affiliating {len(affiliations)} members to account {account_id}..."
    )
    affiliation_ids: list[UUID] = []

    for affiliation_request in affiliations:
        # Get or create occupational health profile ID from user_id
        profile_id = get_or_create_profile_id(affiliation_request.user_id, commit=False)
        affiliation_id = affiliate_member(
            occupational_health_profile_id=profile_id,
            start_date=affiliation_request.start_date,
            end_date=affiliation_request.end_date,
            account_id=account_id,
            commit=False,
        )
        affiliation_ids.append(affiliation_id)

        # Update personal email if provided
        if affiliation_request.personal_email:
            update_personal_email(
                profile_id=profile_id,
                personal_email=affiliation_request.personal_email,
                commit=False,
            )

        # Update professional email if provided
        if affiliation_request.professional_email:
            update_professional_email(
                profile_id=profile_id,
                account_id=account_id,
                professional_email=affiliation_request.professional_email,
                commit=False,
            )

        # Store risk category in administrative profile if provided
        if affiliation_request.risk_category:
            update_administrative_profile(
                occupational_health_profile_id=profile_id,
                risk_category=affiliation_request.risk_category,
                commit=False,
            )

        affiliation_log = OccupationalHealthMassAffiliationLog(
            profile_id=profile_id,
            account_id=account_id,
            start_date=affiliation_request.start_date,
            extra_data=affiliation_request.extra_data,
            affiliation_id=affiliation_id,
        )
        current_session.add(affiliation_log)

        # Create visit record if last_visit data is provided
        if affiliation_request.last_visit:
            visit_type = affiliation_request.last_visit.visit_type
            visit_date = affiliation_request.last_visit.date

            create_backfilled_visit_if_not_exists(
                profile_id=profile_id,
                visit_date=visit_date,
                visit_type=visit_type,
                account_id=account_id,
                contract_start_date=contract_start_date,
            )

    current_session.commit()

    return affiliation_ids

affiliate_new_member_for_marmot

affiliate_new_member_for_marmot(
    *,
    user_id,
    account_id,
    start_date,
    end_date=None,
    personal_email=None,
    professional_email=None,
    last_visit=None,
    risk_category=None,
    actor_user_id=None
)

Affiliate a new member to occupational health.

Source code in components/occupational_health/public/marmot/actions.py
def affiliate_new_member_for_marmot(
    *,
    user_id: UserId,
    account_id: AccountId,
    start_date: date,
    end_date: date | None = None,
    personal_email: str | None = None,
    professional_email: str | None = None,
    last_visit: "LastVisit | None" = None,
    risk_category: "RiskCategory | None" = None,
    actor_user_id: UserId | str | None = None,
) -> None:
    """
    Affiliate a new member to occupational health.
    """
    from components.occupational_health.internal.business_logic.actions.members.update_administrative_profile import (
        update_administrative_profile,
    )

    current_logger.info(
        "Affiliating new member from Marmot",
        extra={
            "user_id": user_id,
            "account_id": account_id,
            "start_date": start_date,
            "end_date": end_date,
        },
    )

    profile_id = get_or_create_profile_id(user_id)

    current_logger.info(
        "Affiliating new member from Marmot",
        extra={
            "profile_id": profile_id,
            "account_id": account_id,
            "start_date": start_date,
            "end_date": end_date,
        },
    )

    account_name = get_account_name(account_id)
    ts_thread = log_and_post_message(
        f"Affiliating new member {user_id} from Marmot (account {account_name})",
        username="Alaner activity in Marmot",
    )
    slack_id = get_actor_slack_handle(actor_user_id=actor_user_id)
    log_and_post_message(
        f"<{slack_id}> can you share context on why this was required?",
        username="Alaner activity in Marmot",
        ts_thread=ts_thread,
    )

    affiliate_member(
        occupational_health_profile_id=profile_id,
        start_date=start_date,
        end_date=end_date,
        account_id=account_id,
    )

    # Update personal email if provided
    if personal_email:
        update_personal_email(
            profile_id=profile_id,
            personal_email=personal_email,
        )

    # Update professional email if provided
    if professional_email:
        update_professional_email(
            profile_id=profile_id,
            account_id=account_id,
            professional_email=professional_email,
        )

    # Store risk category in administrative profile if provided
    if risk_category:
        update_administrative_profile(
            occupational_health_profile_id=profile_id,
            risk_category=risk_category,
        )

    # Create visit record if last_visit data is provided
    if last_visit:
        create_backfilled_visit_if_not_exists(
            profile_id=profile_id,
            visit_date=last_visit.date,
            visit_type=last_visit.visit_type,
            account_id=account_id,
            commit=True,
            contract_start_date=get_contract_start_date(account_id),
        )

cancel_affiliation_for_marmot

cancel_affiliation_for_marmot(
    affiliation_id, actor_user_id
)

Cancel an affiliation for a profile.

This is a wrapper around cancel_affiliation that adds logging.

Source code in components/occupational_health/public/marmot/actions.py
def cancel_affiliation_for_marmot(
    affiliation_id: UUID,
    actor_user_id: UserId | str | None,
) -> None:
    """
    Cancel an affiliation for a profile.

    This is a wrapper around cancel_affiliation that adds logging.
    """
    from components.occupational_health.internal.business_logic.actions.affiliation import (
        cancel_affiliation,
    )

    current_logger.info(
        f"Cancelling affiliation {affiliation_id} from Marmot",
        extra={
            "actor_user_id": actor_user_id,
        },
    )

    ts_thread = log_and_post_message(
        f"Cancelling affiliation `{affiliation_id}` from Marmot",
        username="Alaner activity in Marmot",
    )
    slack_id = get_actor_slack_handle(actor_user_id=actor_user_id)
    log_and_post_message(
        f"<{slack_id}> can you share context on why this was required?",
        username="Alaner activity in Marmot",
        ts_thread=ts_thread,
    )

    cancel_affiliation(affiliation_id=affiliation_id)

create_affiliation_strategy_rule_for_marmot

create_affiliation_strategy_rule_for_marmot(
    *, account_id, company_id=None, siret=None, name, action
)

Create a new affiliation strategy rule for a company or SIRET.

Either company_id or siret must be provided, but not both.

Source code in components/occupational_health/public/marmot/actions.py
def create_affiliation_strategy_rule_for_marmot(
    *,
    account_id: AccountId,
    company_id: CompanyId | None = None,
    siret: str | None = None,
    name: str | None,
    action: RuleAction,
) -> UUID:
    """
    Create a new affiliation strategy rule for a company or SIRET.

    Either company_id or siret must be provided, but not both.
    """
    from components.occupational_health.internal.models.affiliation_strategy_rule import (
        AffiliationStrategyRule,
    )

    if not company_id and not siret:
        raise BaseErrorCode.invalid_arguments(
            message="Either company_id or siret must be provided"
        )
    if company_id and siret:
        raise BaseErrorCode.invalid_arguments(
            message="Cannot provide both company_id and siret"
        )

    current_logger.info(
        "Creating affiliation strategy rule from Marmot",
        extra={
            "account_id": account_id,
            "company_id": company_id,
            "siret": siret,
            "name": name,
            "action": action,
        },
    )

    # Check if rule already exists
    query = select(AffiliationStrategyRule).where(
        AffiliationStrategyRule.account_id == account_id,
    )
    if company_id:
        query = query.where(AffiliationStrategyRule.company_id == company_id)
    if siret:
        query = query.where(AffiliationStrategyRule.siret == siret)

    existing_rule = current_session.scalars(query).first()

    if existing_rule:
        rule_type = f"company {company_id}" if company_id else f"SIRET {siret}"
        raise BaseErrorCode.invalid_arguments(
            message=f"Rule already exists for {rule_type} in account {account_id}"
        )

    # Create new rule
    new_rule = AffiliationStrategyRule(
        account_id=account_id,
        company_id=company_id,
        siret=siret,
        name=name,
        action=action,
        is_missing_siret=False,
    )

    current_session.add(new_rule)
    current_session.commit()

    current_logger.info(
        f"Created new affiliation strategy rule: {new_rule.id}",
        extra={
            "rule_id": new_rule.id,
            "account_id": account_id,
            "company_id": company_id,
            "siret": siret,
            "name": name,
            "action": action,
        },
    )

    return new_rule.id

create_contract

create_contract(session, account_id, start_date)
Source code in components/occupational_health/internal/business_logic/contracting/actions.py
@transactional(propagation=Propagation.REQUIRES_NEW)
def create_contract(
    session: Session,  # noqa:ARG001
    account_id: AccountId,
    start_date: date,
) -> UUID:
    from components.contracting.public.events import (
        CompanyContractSigned,
        ContractSigned,
    )
    from components.contracting.subcomponents.subscription.public.actions import (
        initialize_subscription,
        record_subscription_updates,
    )
    from components.contracting.subcomponents.subscription.public.entities import (
        SubscriptionScope,
        SubscriptionUpdateRequest,
    )
    from components.occupational_health.external.company import (
        get_company_ids_in_account,
    )
    from shared.helpers.app_name import AppName
    from shared.messaging.broker import get_message_broker

    subscription = initialize_subscription(
        subscription_scope=SubscriptionScope.occupational_health,
        owner_type=SubscriptionOwnerType.ACCOUNT,
        owner_ref=str(account_id),
    )
    current_logger.info(
        f"Created subscription {subscription.id} for account {account_id}"
    )

    updates = [
        SubscriptionUpdateRequest(
            validity_period=ValidityPeriod(
                start_date=start_date,
                end_date=None,
            ),
            # mandatory but not used
            payload_ref=UUID("00000000-0000-0000-0000-000000000000"),
            operation_ref=None,
            is_deletion=False,
        ),
    ]

    record_subscription_updates(
        subscription_scope=SubscriptionScope.occupational_health,
        # NOTE: for now "subscription_ref" was used in the context of affiliations,
        # and was a string of the format "occhealth:account:<account_id>" (see
        # build_subscription_ref()). Here the subscription ref is just the
        # subscription UUID, which could be confusing: let's not expose that
        # "contracting" subscription_ref under that name at this point.
        subscription_ref=str(subscription.id),
        updates=updates,
        commit=False,
    )
    current_logger.info(
        f"Added subscription version for subscription {subscription.id} and start date {start_date}"
    )

    message_broker = get_message_broker()
    for company_id in get_company_ids_in_account(account_id):
        message_broker.publish(
            CompanyContractSigned(
                company_id=str(company_id),
                contract_id=str(subscription.id),
                app_name=AppName.ALAN_FR,
            )
        )
        message_broker.publish(
            ContractSigned(
                company_id=str(company_id),
                individual_id=None,
                contract_id=str(subscription.id),
                app_name=AppName.ALAN_FR,
                contract_type="occupational_health",
            )
        )

    return subscription.id

update_employment_nic

update_employment_nic(employment_id, nic)

Update the NIC of an employment.

Source code in components/occupational_health/public/marmot/actions.py
def update_employment_nic(
    employment_id: UUID,
    nic: str | None,
) -> None:
    """
    Update the NIC of an employment.
    """
    from components.employment.public.business_logic.queries.core_employment_version import (
        get_latest_version_for_employment_or_raise,
    )
    from components.fr.public.employment.fr_extended_values import (
        FrExtendedValues,
    )
    from shared.temporal.validity_period import ValidityPeriod

    employment_version = get_latest_version_for_employment_or_raise(employment_id)
    validity_period = ValidityPeriod(
        start_date=employment_version.start_date,
        end_date=employment_version.end_date,
    )

    set_extended_employment_values(
        employment_id=employment_id,
        values=FrExtendedValues({"nic": nic}),
        source_type=SourceType.fr_marmot_occupational_health,
        source_information=SourceInformation(
            raw_data={},
            metadata={"endpoint": "occupational_health.update_employment"},
        ),
        validity_period=validity_period,
        commit=True,
    )

    current_logger.info(
        "Updated NIC of employment",
        employment_id=employment_id,
        nic=nic,
    )

entities

AccountForMarmot dataclass

AccountForMarmot(id, name)

Bases: DataClassJsonMixin

Represent an account with an occupational health contract for display in Marmot.

id instance-attribute
id
name instance-attribute
name

AffiliatedMemberForMarmot dataclass

AffiliatedMemberForMarmot(
    affiliation_id,
    start_date,
    end_date,
    is_cancelled,
    user_id,
    occupational_health_profile_id,
    global_profile_id,
    first_name,
    last_name,
    birth_date,
    professional_email,
    personal_email,
    is_vip,
    employments,
    account_id,
)

Bases: DataClassJsonMixin

Represent a member affiliated to Prévenir, to be displayed in Marmot

account_id instance-attribute
account_id
affiliation_id instance-attribute
affiliation_id
birth_date instance-attribute
birth_date
employments instance-attribute
employments
end_date instance-attribute
end_date
first_name instance-attribute
first_name
global_profile_id instance-attribute
global_profile_id
is_cancelled instance-attribute
is_cancelled
is_vip instance-attribute
is_vip
last_name instance-attribute
last_name
occupational_health_profile_id instance-attribute
occupational_health_profile_id
personal_email instance-attribute
personal_email
professional_email instance-attribute
professional_email
start_date instance-attribute
start_date
user_id instance-attribute
user_id

AffiliationDecisionInfo dataclass

AffiliationDecisionInfo(
    decision_id,
    decision,
    manual_decision,
    rule_name,
    created_at,
)

Bases: DataClassJsonMixin

Info about an affiliation decision for a user.

created_at instance-attribute
created_at
decision instance-attribute
decision
decision_id instance-attribute
decision_id
manual_decision instance-attribute
manual_decision
rule_name instance-attribute
rule_name

AffiliationMovementForMarmot dataclass

AffiliationMovementForMarmot(
    movement_id,
    account_id,
    profile_id,
    movement_type,
    date_of_movement,
    first_name,
    last_name,
    birth_date,
    created_at,
    decision_id,
    company_id,
    company_name,
    company_name_enriched_from_dsn,
)

Bases: DataClassJsonMixin

Represent an affiliation movement for display in Marmot

account_id instance-attribute
account_id
birth_date instance-attribute
birth_date
company_id instance-attribute
company_id
company_name instance-attribute
company_name
company_name_enriched_from_dsn instance-attribute
company_name_enriched_from_dsn
created_at instance-attribute
created_at
date_of_movement instance-attribute
date_of_movement
decision_id instance-attribute
decision_id
first_name instance-attribute
first_name
last_name instance-attribute
last_name
movement_id instance-attribute
movement_id
movement_type instance-attribute
movement_type
profile_id instance-attribute
profile_id

CancelledAffiliationInfo dataclass

CancelledAffiliationInfo(
    affiliation_id, start_date, end_date
)

Bases: DataClassJsonMixin

Info about a cancelled affiliation for a user.

affiliation_id instance-attribute
affiliation_id
end_date instance-attribute
end_date
start_date instance-attribute
start_date

CompanyWithAffiliatedCount dataclass

CompanyWithAffiliatedCount(
    company_id,
    company_display_name,
    ever_affiliated_members_count,
    sirets,
)

Bases: DataClassJsonMixin

Represent a company with its count of affiliated members for display in Marmot.

company_display_name instance-attribute
company_display_name
company_id instance-attribute
company_id
ever_affiliated_members_count instance-attribute
ever_affiliated_members_count
sirets instance-attribute
sirets

ContractInfo dataclass

ContractInfo(account_id, start_date, end_date)

Bases: DataClassJsonMixin

Information about a contract.

account_id instance-attribute
account_id
end_date instance-attribute
end_date
start_date instance-attribute
start_date

DiscrepancyFixProposition dataclass

DiscrepancyFixProposition(fix_type, reason, data)

Bases: DataClassJsonMixin

A proposed fix for a discrepancy.

data instance-attribute
data
fix_type instance-attribute
fix_type
reason instance-attribute
reason

DiscrepancyForMarmot dataclass

DiscrepancyForMarmot(
    account_id,
    profile_id,
    user_id,
    category,
    message,
    employments_timeline=None,
    affiliations_timeline=None,
    *,
    first_name,
    last_name,
    birth_date,
    employments,
    discrepancy_note=None,
    cancelled_affiliations=list(),
    affiliation_decisions=list()
)

Bases: Discrepancy

Represent a discrepancy found in an affiliation for display in Marmot.

affiliation_decisions class-attribute instance-attribute
affiliation_decisions = field(default_factory=list)
birth_date instance-attribute
birth_date
cancelled_affiliations class-attribute instance-attribute
cancelled_affiliations = field(default_factory=list)
discrepancy_note class-attribute instance-attribute
discrepancy_note = None
employments instance-attribute
employments
first_name instance-attribute
first_name
last_name instance-attribute
last_name

DiscrepancyNoteInfo dataclass

DiscrepancyNoteInfo(comment, noted_by_full_name, noted_at)

Bases: DataClassJsonMixin

Info about a note added to a discrepancy.

comment instance-attribute
comment
noted_at instance-attribute
noted_at
noted_by_full_name instance-attribute
noted_by_full_name

LastVisit dataclass

LastVisit(date, visit_type)

Bases: DataClassJsonMixin

Information about the last occupational health visit.

date instance-attribute
date
visit_type instance-attribute
visit_type

MemberToAffiliate dataclass

MemberToAffiliate(
    row_index,
    first_name,
    last_name,
    birthdate,
    ssn,
    professional_email,
    personal_email,
    proposed_user_id,
    proposed_user_name,
    proposed_birthdate,
    matched_name,
    matched_birthdate,
    matching_result,
    affiliations,
    employments,
    target_start_date,
    target_end_date,
    last_visit,
    risk_category,
    extra_data,
)

Bases: DataClassJsonMixin

Result of matching a spreadsheet row.

affiliations instance-attribute
affiliations
birthdate instance-attribute
birthdate
employments instance-attribute
employments
extra_data instance-attribute
extra_data
first_name instance-attribute
first_name
last_name instance-attribute
last_name
last_visit instance-attribute
last_visit
matched_birthdate instance-attribute
matched_birthdate
matched_name instance-attribute
matched_name
matching_result instance-attribute
matching_result
personal_email instance-attribute
personal_email
professional_email instance-attribute
professional_email
proposed_birthdate instance-attribute
proposed_birthdate
proposed_user_id instance-attribute
proposed_user_id
proposed_user_name instance-attribute
proposed_user_name
risk_category instance-attribute
risk_category
row_index instance-attribute
row_index
ssn instance-attribute
ssn
target_end_date instance-attribute
target_end_date
target_start_date instance-attribute
target_start_date

NicTimelineEntry dataclass

NicTimelineEntry(nic, start_date, end_date)

Bases: DataClassJsonMixin

A single NIC value with its validity period.

end_date instance-attribute
end_date
nic instance-attribute
nic
start_date instance-attribute
start_date

OccupationalHealthAffiliationDecisionForMarmot dataclass

OccupationalHealthAffiliationDecisionForMarmot(
    id,
    created_at,
    decision,
    employment_change_payload,
    core_employment_id,
    rule_id,
    rule_name,
)

Bases: DataClassJsonMixin

Affiliation decision data for display in Marmot.

core_employment_id instance-attribute
core_employment_id
created_at instance-attribute
created_at
decision instance-attribute
decision
employment_change_payload instance-attribute
employment_change_payload
id instance-attribute
id
rule_id instance-attribute
rule_id
rule_name instance-attribute
rule_name

OccupationalHealthAffiliationDecisionWithProfileForMarmot dataclass

OccupationalHealthAffiliationDecisionWithProfileForMarmot(
    id,
    created_at,
    decision,
    manual_decision,
    employment_change_payload,
    core_employment_id,
    rule_id,
    rule_name,
    first_name,
    last_name,
    birth_date,
    company_id,
    company_name,
    company_name_enriched_from_dsn,
)

Bases: DataClassJsonMixin

Affiliation decision data with profile information for display in Marmot.

birth_date instance-attribute
birth_date
company_id instance-attribute
company_id
company_name instance-attribute
company_name
company_name_enriched_from_dsn instance-attribute
company_name_enriched_from_dsn
core_employment_id instance-attribute
core_employment_id
created_at instance-attribute
created_at
decision instance-attribute
decision
employment_change_payload instance-attribute
employment_change_payload
first_name instance-attribute
first_name
id instance-attribute
id
last_name instance-attribute
last_name
manual_decision instance-attribute
manual_decision
rule_id instance-attribute
rule_id
rule_name instance-attribute
rule_name

OccupationalHealthProfileData dataclass

OccupationalHealthProfileData(
    profile_id,
    past_visits,
    affiliations,
    affiliation_decisions,
    next_visit_result,
)

Bases: DataClassJsonMixin

All occupational health data for a profile

affiliation_decisions instance-attribute
affiliation_decisions
affiliations instance-attribute
affiliations
next_visit_result instance-attribute
next_visit_result
past_visits instance-attribute
past_visits
profile_id instance-attribute
profile_id

RiskCategory

Bases: AlanBaseEnum

Risk categories / "type de suivi" for occupational health visits.

These determine the frequency and type of required medical visits.

SI class-attribute instance-attribute
SI = 'SI'
SIA class-attribute instance-attribute
SIA = 'SIA'
SIR class-attribute instance-attribute
SIR = 'SIR'

SiretWithMemberCount dataclass

SiretWithMemberCount(siret, member_count, pretty_name)

Bases: DataClassJsonMixin

Represent a SIRET with its count of members.

member_count instance-attribute
member_count
pretty_name instance-attribute
pretty_name

Company name from DSN, if available. Format: 'Name (City)' or None if not found.

siret instance-attribute
siret

VisitType

Bases: AlanBaseEnum

EXAMEN_MEDICAL_APTITUDE_EMBAUCHE class-attribute instance-attribute
EXAMEN_MEDICAL_APTITUDE_EMBAUCHE = (
    "3.examen_medical_aptitude_embauche"
)
EXAMEN_MEDICAL_APTITUDE_PERIODIQUE class-attribute instance-attribute
EXAMEN_MEDICAL_APTITUDE_PERIODIQUE = (
    "4.examen_medical_aptitude_periodique"
)
VISITE_CESSATION_EXPOSITION class-attribute instance-attribute
VISITE_CESSATION_EXPOSITION = (
    "30.visite_cessation_exposition"
)
VISITE_CESSATION_EXPOSITION_COURS_CARRIERE class-attribute instance-attribute
VISITE_CESSATION_EXPOSITION_COURS_CARRIERE = (
    "31.visite_cessation_exposition_cours_carriere"
)
VISITE_CESSATION_EXPOSITION_COURS_CARRIERE_INITIATIVE_EMPLOYEUR class-attribute instance-attribute
VISITE_CESSATION_EXPOSITION_COURS_CARRIERE_INITIATIVE_EMPLOYEUR = "32.visite_cessation_exposition_cours_carriere_initiative_employeur"
VISITE_CESSATION_EXPOSITION_COURS_CARRIERE_INITIATIVE_TRAVAILLEUR class-attribute instance-attribute
VISITE_CESSATION_EXPOSITION_COURS_CARRIERE_INITIATIVE_TRAVAILLEUR = "33.visite_cessation_exposition_cours_carriere_initiative_travailleur"
VISITE_CESSATION_EXPOSITION_FIN_CARRIERE class-attribute instance-attribute
VISITE_CESSATION_EXPOSITION_FIN_CARRIERE = (
    "34.visite_cessation_exposition_fin_carriere"
)
VISITE_CESSATION_EXPOSITION_FIN_CARRIERE_INITIATIVE_EMPLOYEUR class-attribute instance-attribute
VISITE_CESSATION_EXPOSITION_FIN_CARRIERE_INITIATIVE_EMPLOYEUR = "35.visite_cessation_exposition_fin_carriere_initiative_employeur"
VISITE_CESSATION_EXPOSITION_FIN_CARRIERE_INITIATIVE_TRAVAILLEUR class-attribute instance-attribute
VISITE_CESSATION_EXPOSITION_FIN_CARRIERE_INITIATIVE_TRAVAILLEUR = "36.visite_cessation_exposition_fin_carriere_initiative_travailleur"
VISITE_DEMANDE class-attribute instance-attribute
VISITE_DEMANDE = '11.visite_demande'
VISITE_DEMANDE_EMPLOYEUR class-attribute instance-attribute
VISITE_DEMANDE_EMPLOYEUR = '13.visite_demande_employeur'
VISITE_DEMANDE_MEDECIN_TRAVAIL class-attribute instance-attribute
VISITE_DEMANDE_MEDECIN_TRAVAIL = (
    "14.visite_demande_medecin_travail"
)
VISITE_DEMANDE_MEDECIN_TRAVAIL_ORIENTATION_INFIRMIER class-attribute instance-attribute
VISITE_DEMANDE_MEDECIN_TRAVAIL_ORIENTATION_INFIRMIER = "15.visite_demande_medecin_travail_orientation_infirmier"
VISITE_DEMANDE_TRAVAILLEUR class-attribute instance-attribute
VISITE_DEMANDE_TRAVAILLEUR = "12.visite_demande_travailleur"
VISITE_INFORMATION_PREVENTION_INITIALE class-attribute instance-attribute
VISITE_INFORMATION_PREVENTION_INITIALE = (
    "1.visite_information_prevention_initiale"
)
VISITE_INFORMATION_PREVENTION_PERIODIQUE class-attribute instance-attribute
VISITE_INFORMATION_PREVENTION_PERIODIQUE = (
    "2.visite_information_prevention_periodique"
)
VISITE_INTERMEDIAIRE_ENTRE_DEUX_EXAMENS class-attribute instance-attribute
VISITE_INTERMEDIAIRE_ENTRE_DEUX_EXAMENS = (
    "5.visite_intermediaire_entre_deux_examens"
)
VISITE_MI_CARRIERE class-attribute instance-attribute
VISITE_MI_CARRIERE = '29.visite_mi_carriere'
VISITE_PREREPRISE class-attribute instance-attribute
VISITE_PREREPRISE = '6.visite_prereprise'
VISITE_PREREPRISE_INITIATIVE_MEDECIN_CONSEIL class-attribute instance-attribute
VISITE_PREREPRISE_INITIATIVE_MEDECIN_CONSEIL = (
    "7.visite_prereprise_initiative_medecin_conseil"
)
VISITE_PREREPRISE_INITIATIVE_MEDECIN_TRAITANT class-attribute instance-attribute
VISITE_PREREPRISE_INITIATIVE_MEDECIN_TRAITANT = (
    "8.visite_prereprise_initiative_medecin_traitant"
)
VISITE_PREREPRISE_INITIATIVE_MEDECIN_TRAVAIL class-attribute instance-attribute
VISITE_PREREPRISE_INITIATIVE_MEDECIN_TRAVAIL = (
    "10.visite_prereprise_initiative_medecin_travail"
)
VISITE_PREREPRISE_INITIATIVE_TRAVAILLEUR class-attribute instance-attribute
VISITE_PREREPRISE_INITIATIVE_TRAVAILLEUR = (
    "9.visite_prereprise_initiative_travailleur"
)
VISITE_REPRISE class-attribute instance-attribute
VISITE_REPRISE = '16.visite_reprise'
VISITE_REPRISE_ABSENCE_30_JOURS_ACCIDENT_TRAVAIL class-attribute instance-attribute
VISITE_REPRISE_ABSENCE_30_JOURS_ACCIDENT_TRAVAIL = (
    "20.visite_reprise_absence_30_jours_accident_travail"
)
VISITE_REPRISE_ABSENCE_30_JOURS_ACCIDENT_TRAVAIL_INITIATIVE_EMPLOYEUR class-attribute instance-attribute
VISITE_REPRISE_ABSENCE_30_JOURS_ACCIDENT_TRAVAIL_INITIATIVE_EMPLOYEUR = "21.visite_reprise_absence_30_jours_accident_travail_initiative_employeur"
VISITE_REPRISE_ABSENCE_30_JOURS_ACCIDENT_TRAVAIL_INITIATIVE_TRAVAILLEUR class-attribute instance-attribute
VISITE_REPRISE_ABSENCE_30_JOURS_ACCIDENT_TRAVAIL_INITIATIVE_TRAVAILLEUR = "22.visite_reprise_absence_30_jours_accident_travail_initiative_travailleur"
VISITE_REPRISE_ABSENCE_MALADIE_PROFESSIONNELLE class-attribute instance-attribute
VISITE_REPRISE_ABSENCE_MALADIE_PROFESSIONNELLE = (
    "23.visite_reprise_absence_maladie_professionnelle"
)
VISITE_REPRISE_ABSENCE_MALADIE_PROFESSIONNELLE_INITIATIVE_EMPLOYEUR class-attribute instance-attribute
VISITE_REPRISE_ABSENCE_MALADIE_PROFESSIONNELLE_INITIATIVE_EMPLOYEUR = "24.visite_reprise_absence_maladie_professionnelle_initiative_employeur"
VISITE_REPRISE_ABSENCE_MALADIE_PROFESSIONNELLE_INITIATIVE_TRAVAILLEUR class-attribute instance-attribute
VISITE_REPRISE_ABSENCE_MALADIE_PROFESSIONNELLE_INITIATIVE_TRAVAILLEUR = "25.visite_reprise_absence_maladie_professionnelle_initiative_travailleur"
VISITE_REPRISE_CAUSE_MALADIE_ACCIDENT_NON_PRO class-attribute instance-attribute
VISITE_REPRISE_CAUSE_MALADIE_ACCIDENT_NON_PRO = (
    "17.visite_reprise_cause_maladie_accident_non_pro"
)
VISITE_REPRISE_CAUSE_MALADIE_ACCIDENT_NON_PRO_INITIATIVE_EMPLOYEUR class-attribute instance-attribute
VISITE_REPRISE_CAUSE_MALADIE_ACCIDENT_NON_PRO_INITIATIVE_EMPLOYEUR = "18.visite_reprise_cause_maladie_accident_non_pro_initiative_employeur"
VISITE_REPRISE_CAUSE_MALADIE_ACCIDENT_NON_PRO_INITIATIVE_TRAVAILLEUR class-attribute instance-attribute
VISITE_REPRISE_CAUSE_MALADIE_ACCIDENT_NON_PRO_INITIATIVE_TRAVAILLEUR = "19.visite_reprise_cause_maladie_accident_non_pro_initiative_travailleur"
VISITE_REPRISE_CONGE_MATERNITE class-attribute instance-attribute
VISITE_REPRISE_CONGE_MATERNITE = (
    "26.visite_reprise_conge_maternite"
)
VISITE_REPRISE_CONGE_MATERNITE_INITIATIVE_EMPLOYEUR class-attribute instance-attribute
VISITE_REPRISE_CONGE_MATERNITE_INITIATIVE_EMPLOYEUR = (
    "27.visite_reprise_conge_maternite_initiative_employeur"
)
VISITE_REPRISE_CONGE_MATERNITE_INITIATIVE_TRAVAILLEUR class-attribute instance-attribute
VISITE_REPRISE_CONGE_MATERNITE_INITIATIVE_TRAVAILLEUR = "28.visite_reprise_conge_maternite_initiative_travailleur"
detect_from_string classmethod
detect_from_string(visit_type_str)

Detect the visit type from a string.

Source code in components/occupational_health/internal/enums/visit_type.py
@classmethod
def detect_from_string(cls, visit_type_str: str) -> "VisitType":
    """
    Detect the visit type from a string.
    """
    try:
        return VisitType(visit_type_str)
    except ValueError:
        pass

    mapping = {
        "Visite d'embauche": VisitType.VISITE_INFORMATION_PREVENTION_INITIALE,
        "VIP": VisitType.VISITE_INFORMATION_PREVENTION_INITIALE,
        "Visite périodique": VisitType.VISITE_INFORMATION_PREVENTION_PERIODIQUE,
        "Visite de reprise": VisitType.VISITE_REPRISE,
        "Visite à la demande": VisitType.VISITE_DEMANDE,
        # not 100% sure about the original meaning of this one, but limited number of cases
        "Visite de suivi": VisitType.VISITE_DEMANDE_MEDECIN_TRAVAIL,
    }

    if visit_type_str not in mapping:
        raise ValueError(f"Visit type not found in mapping: {visit_type_str}")

    return mapping[visit_type_str]
level_1_category property
level_1_category

enums

MovementType

Bases: AlanBaseEnum

The recorded movement: either an affiliation, or a termination, or a cancellation.

Transfers are composed by a termination and an affiliation, if they moved between two distinct accounts.

AFFILIATION class-attribute instance-attribute
AFFILIATION = 'affiliation'
CANCELLATION class-attribute instance-attribute
CANCELLATION = 'cancellation'
TERMINATION class-attribute instance-attribute
TERMINATION = 'termination'

queries

Queries for use in Marmot controllers.

InvalidInputDataError

Bases: ValueError

The input data is not valid.

analyze_discrepancy_fixes_for_marmot

analyze_discrepancy_fixes_for_marmot(discrepancies)

Analyze multiple discrepancies and suggest fixes.

Returns a mapping of user_id to fix proposition (or None if no fix found).

Source code in components/occupational_health/public/marmot/queries.py
@tracer_wrap()
def analyze_discrepancy_fixes_for_marmot(
    discrepancies: list[DiscrepancyForMarmot],
) -> dict[UserId, DiscrepancyFixProposition | None]:
    """
    Analyze multiple discrepancies and suggest fixes.

    Returns a mapping of user_id to fix proposition (or None if no fix found).
    """
    from sqlalchemy import select

    from components.occupational_health.internal.business_logic.contracting.queries.contracting import (
        get_contract_timeline_for_account,
    )
    from components.occupational_health.internal.business_logic.queries.affiliation.affiliation_tool import (
        detect_date_format,
    )
    from components.occupational_health.internal.business_logic.queries.affiliation.affiliations import (
        get_all_affiliations,
    )
    from components.occupational_health.internal.business_logic.queries.affiliation.discrepancy_fix_analysis import (
        analyze_discrepancy_fix,
    )
    from components.occupational_health.internal.models.occupational_health_mass_affiliation_log import (
        OccupationalHealthMassAffiliationLog,
    )
    from shared.helpers.db import current_session

    if not discrepancies:
        return {}

    # All discrepancies belong to the same account
    account_id = discrepancies[0].account_id

    # Fetch contract start date once
    timeline = get_contract_timeline_for_account(account_id)
    contract_start_date = (
        timeline.periods[0].validity_period.start_date
        if timeline and timeline.periods
        else None
    )

    # Fetch all mass affiliation logs for the account in one query, indexed by affiliation_id
    all_logs = current_session.scalars(
        select(OccupationalHealthMassAffiliationLog).where(
            OccupationalHealthMassAffiliationLog.account_id == account_id,
        )
    ).all()
    logs_by_affiliation_id = {log.affiliation_id: log for log in all_logs}

    # Detect a consistent date format across all log date strings
    all_date_strings: set[str] = set()
    for log in all_logs:
        for key in ("date_debut_de_contrat", "date_fin_de_contrat_optionnel"):
            value = log.extra_data.get(key)
            if isinstance(value, str) and value.strip():
                all_date_strings.add(value.strip())
    try:
        date_format = (
            detect_date_format(all_date_strings, ignore_non_date=True)
            if all_date_strings
            else "%Y-%m-%d"
        )
    except InvalidInputDataError as e:
        raise BaseErrorCode.invalid_arguments(description=str(e)) from e

    fixes: dict[UserId, DiscrepancyFixProposition | None] = {}

    for discrepancy in discrepancies:
        # Skip if missing data or ambiguous (multiple employments)
        if not discrepancy.profile_id or len(discrepancy.employments) != 1:
            fixes[discrepancy.user_id] = None
            continue

        affiliations = get_all_affiliations(
            discrepancy.profile_id, discrepancy.account_id
        )
        # Skip if no affiliation or ambiguous (multiple affiliations)
        if not affiliations or len(affiliations) > 1:
            fixes[discrepancy.user_id] = None
            continue

        mass_log = logs_by_affiliation_id.get(affiliations[0].affiliation_id)
        if not mass_log:
            fixes[discrepancy.user_id] = None
            continue

        fixes[discrepancy.user_id] = analyze_discrepancy_fix(
            discrepancy=discrepancy,
            affiliation_id=affiliations[0].affiliation_id,
            contract_start_date=contract_start_date,
            mass_log=mass_log,
            date_format=date_format,
        )

    return fixes

get_accounts_list_for_marmot

get_accounts_list_for_marmot()

Return the list of accounts with occupational health contracts for Marmot.

Source code in components/occupational_health/public/marmot/queries.py
@tracer_wrap()
def get_accounts_list_for_marmot() -> list[AccountForMarmot]:
    """
    Return the list of accounts with occupational health contracts for Marmot.
    """
    from components.occupational_health.internal.business_logic.contracting.queries.contracting import (
        get_all_account_ids_with_contract,
    )
    from components.occupational_health.public.dependencies import (
        get_app_dependency,
    )

    dependency = get_app_dependency()

    accounts = [
        AccountForMarmot(
            id=account_id,
            name=dependency.get_account_name(account_id=account_id),
        )
        for account_id in get_all_account_ids_with_contract()
    ]

    # Sort by name for consistent ordering
    return sorted(accounts, key=lambda a: a.name)

get_admin_accounts_from_user_for_marmot

get_admin_accounts_from_user_for_marmot(user_id)

Returns the list of accounts for the current admin user

Source code in components/occupational_health/public/marmot/queries.py
def get_admin_accounts_from_user_for_marmot(
    user_id: str,
) -> list[OccupationalHealthDashboardAdmin]:
    """
    Returns the list of accounts for the current admin user
    """
    return get_accounts_for_admin(UserId(user_id))

get_affiliated_members_for_marmot

get_affiliated_members_for_marmot(
    filter_by_account_id=None,
)

Return the list of affiliated members (including past ones) for display in Marmot.

Source code in components/occupational_health/public/marmot/queries.py
@tracer_wrap()
def get_affiliated_members_for_marmot(
    filter_by_account_id: AccountId | UUID | None = None,
) -> list[AffiliatedMemberForMarmot]:
    """
    Return the list of affiliated members (including past ones) for display in Marmot.
    """
    from components.affiliation_occ_health.public.api import (
        get_affiliation_list,
    )
    from components.occupational_health.internal.business_logic.queries.marmot import (
        get_affiliated_members_from_affiliations_for_marmot,
    )
    from components.occupational_health.internal.helpers.affiliation import (
        build_subscription_ref,
    )

    if filter_by_account_id:
        affiliations = get_affiliation_list(
            AffiliationService.OCCUPATIONAL_HEALTH,
            subscription_ref=build_subscription_ref(AccountId(filter_by_account_id)),
        )
    else:
        affiliations = get_affiliation_list(AffiliationService.OCCUPATIONAL_HEALTH)

    return get_affiliated_members_from_affiliations_for_marmot(
        affiliations=affiliations
    )

get_affiliation_decisions_for_marmot

get_affiliation_decisions_for_marmot(
    profile_service,
    account_id,
    decision=None,
    not_older_than=None,
)

Return the list of affiliation decisions, optionally filtered by decision type and date.

Source code in components/occupational_health/public/marmot/queries.py
@tracer_wrap()
@inject_profile_service
def get_affiliation_decisions_for_marmot(
    profile_service: ProfileService,
    account_id: AccountId,
    decision: AffiliationDecision | None = None,
    not_older_than: date | None = None,
) -> list[OccupationalHealthAffiliationDecisionWithProfileForMarmot]:
    """
    Return the list of affiliation decisions, optionally filtered by decision type and date.
    """
    company_ids = get_company_ids_in_account(account_id)
    query = (
        current_session.query(OccupationalHealthAffiliationDecision)  # noqa: ALN085
        .join(OccupationalHealthAffiliationDecision.rule, isouter=True)
        .join(OccupationalHealthAffiliationDecision.profile)
        .options(
            joinedload(OccupationalHealthAffiliationDecision.rule),
            joinedload(OccupationalHealthAffiliationDecision.profile),
            raiseload("*"),
        )
        .filter(
            cast(
                OccupationalHealthAffiliationDecision.employment_change_payload[
                    "core_employment_version"
                ]["company_id"].astext,
                Integer,
            ).in_(company_ids),
        )
        .order_by(OccupationalHealthAffiliationDecision.created_at.desc())
    )

    if decision:
        query = query.filter(OccupationalHealthAffiliationDecision.decision == decision)

    if not_older_than:
        query = query.filter(
            OccupationalHealthAffiliationDecision.created_at >= not_older_than
        )

    decisions = query.all()

    # Get all global profile IDs
    global_profile_ids = {decision.profile.global_profile_id for decision in decisions}  # type: ignore[union-attr]
    profiles = profile_service.get_profiles(profile_ids=global_profile_ids)
    profiles_per_global_id = {profile.id: profile for profile in profiles}

    # Get company names
    company_names = get_company_names_by_ids(set(company_ids))
    company_names_from_company_id_and_nic = _build_company_names_by_company_id_and_nic(
        [decision.employment_change_payload for decision in decisions]
    )

    return [
        _build_decision(
            decision,
            profiles_per_global_id.get(decision.profile.global_profile_id),  # type: ignore[union-attr]
            company_names,
            company_names_from_company_id_and_nic,
        )
        for decision in decisions
    ]

get_all_admins_for_account

get_all_admins_for_account(account_id, profile_service)

Get all admins for a given account.

Source code in components/occupational_health/internal/business_logic/queries/admin_dashboard/admin_authorization.py
@inject_profile_service
def get_all_admins_for_account(
    account_id: AccountId,
    profile_service: ProfileService,
) -> list[DashboardAdmin]:
    """
    Get all admins for a given account.
    """
    stmt = select(OccupationalHealthDashboardAdmin).filter(
        OccupationalHealthDashboardAdmin.account_id == account_id,
        OccupationalHealthDashboardAdmin.scope
        == OccupationalHealthDashboardAdminScope.ACCOUNT,
    )
    result = current_session.execute(stmt).scalars().all()

    user_ids = {admin.user_id for admin in result}
    profiles = profile_service.user_compat.get_user_profile_mapping(user_ids)
    profile_by_user_id = {
        str(user_id): profile for user_id, profile in profiles.user_to_profile.items()
    }

    return [
        DashboardAdmin(
            admin_id=admin.id,
            user_id=admin.user_id,
            # Will break if a user_id was merged into another one and no longer exists
            first_name=profile_by_user_id[admin.user_id].first_name,
            last_name=profile_by_user_id[admin.user_id].last_name,
            account_id=admin.account_id,
        )
        for admin in result
    ]

get_companies_with_affiliated_count_for_marmot

get_companies_with_affiliated_count_for_marmot(account_id)

Return the list of all companies in an account with their count of affiliated members.

Source code in components/occupational_health/public/marmot/queries.py
@tracer_wrap()
def get_companies_with_affiliated_count_for_marmot(
    account_id: AccountId | UUID,
) -> list[CompanyWithAffiliatedCount]:
    """
    Return the list of all companies in an account with their count of affiliated members.
    """
    from components.occupational_health.internal.business_logic.queries.employments import (
        get_employments_by_profile_id,
    )

    account_id = AccountId(account_id)

    # Get all companies in the account
    company_ids = get_company_ids_in_account(account_id)
    company_names = get_company_names_by_ids(company_ids=set(company_ids))

    # Get all affiliations for this account (including terminated, upcoming, but not cancelled ones).
    affiliations = get_all_affiliations_for_account(account_id=account_id)

    # Fetch employments
    profile_ids = {affiliation.profile_id for affiliation in affiliations}
    employments_by_profile_id = get_employments_by_profile_id(
        account_id=account_id,
        profile_ids=profile_ids,
        should_overlap_prevenir_contract=False,
    )

    # Count unique affiliated members per company and collect SIRETs
    company_affiliated_member_ids: dict[CompanyId, set[ProfileId]] = {}
    # Map: company_id -> siret -> set of profile_ids
    profile_ids_by_siret_by_company_id: dict[CompanyId, dict[str, set[ProfileId]]] = {}
    dependency = get_app_dependency()

    siren_per_company = {
        company_id: dependency.get_company_siren(company_id)
        for company_id in company_ids
    }
    siret_per_company = {
        company_id: dependency.get_company_siret(company_id)
        for company_id in company_ids
    }

    nic_per_employment_id = get_nic_for_employments(
        on_date_per_employment_id={
            employment.employment_id: clamp(
                utctoday(), employment.start_date, employment.end_date
            )
            for employments in employments_by_profile_id.values()
            for employment in employments
        }
    )

    for profile_id, employments in employments_by_profile_id.items():
        for employment in employments:
            company_id = CompanyId(employment.company_id)
            if company_id not in company_ids:
                raise RuntimeError(
                    f"Encountered employment from out-of-account company:"
                    f" {employment.employment_id} has company ID {company_id} which is not in account {company_ids}."
                )

            # Count members per company
            if company_id not in company_affiliated_member_ids:
                company_affiliated_member_ids[company_id] = set()
            company_affiliated_member_ids[company_id].add(profile_id)

            nic = nic_per_employment_id[employment.employment_id]
            if not nic:
                # Trust first the Employment Component, then the Company configuration
                siret = siret_per_company[company_id]
                if siret:
                    nic = siret[-5:]

            if nic:
                siren = siren_per_company[company_id]
                if siren:
                    siret = f"{siren}{nic}"

                    if company_id not in profile_ids_by_siret_by_company_id:
                        profile_ids_by_siret_by_company_id[company_id] = defaultdict(
                            set
                        )
                    profile_ids_by_siret_by_company_id[company_id][siret].add(
                        profile_id
                    )

    # Build the result list with all companies (even those with 0 affiliated members)
    result = []
    for company_id in company_ids:
        count = len(company_affiliated_member_ids.get(company_id, set()))
        profile_ids_by_siret = profile_ids_by_siret_by_company_id.get(company_id, {})

        company_data_per_siret = {
            siret: dependency.get_company_from_siret(siret=siret)
            for siret in profile_ids_by_siret.keys()
            if siret
        }

        # Fetch company names from DSN for each SIRET
        sirets = []
        for siret, profile_ids in profile_ids_by_siret.items():
            pretty_name = None
            company_data = company_data_per_siret.get(siret)
            if company_data and company_data.name:
                pretty_name = (
                    f"{company_data.name} ({company_data.city})"
                    if company_data.city
                    else company_data.name
                )

            sirets.append(
                SiretWithMemberCount(
                    siret=siret,
                    member_count=len(profile_ids),
                    pretty_name=pretty_name,
                )
            )

        # Sort SIRETs by member count descending, then by SIRET
        sirets.sort(key=lambda s: (-s.member_count, s.siret))

        result.append(
            CompanyWithAffiliatedCount(
                company_id=company_id,
                company_display_name=company_names.get(
                    company_id, f"Company {company_id}"
                ),
                ever_affiliated_members_count=count,
                sirets=sirets,
            )
        )

    # Sort by company display name for consistent ordering
    return sorted(result, key=lambda c: c.company_display_name)

get_contract_info_for_account

get_contract_info_for_account(account_id)

Get contract information for an account.

Source code in components/occupational_health/internal/business_logic/queries/affiliation/affiliation_tool.py
def get_contract_info_for_account(account_id: AccountId) -> ContractInfoResult:
    """Get contract information for an account."""
    try:
        account_name = get_account_name(account_id)
    except NoResultFound:
        raise BaseErrorCode.missing_resource(f"Account {account_id} not found")
    timeline = get_contract_timeline_for_account(account_id)

    contracts: list[ContractInfo] = []
    if timeline:
        for version in timeline:
            contracts.append(
                ContractInfo(
                    account_id=account_id,
                    start_date=version.validity_period.start_date,
                    end_date=version.validity_period.end_date,
                )
            )

    return ContractInfoResult(
        account_name=account_name,
        contracts=contracts,
    )

get_occupational_health_profile_data_for_marmot

get_occupational_health_profile_data_for_marmot(
    profile_id, account_id=None
)

Return all occupational health data for a profile, for display in Marmot.

Source code in components/occupational_health/public/marmot/queries.py
@tracer_wrap()
def get_occupational_health_profile_data_for_marmot(
    profile_id: ProfileId | UUID,
    account_id: AccountId | None = None,
) -> OccupationalHealthProfileData:
    """
    Return all occupational health data for a profile, for display in Marmot.
    """
    from components.occupational_health.external.global_profile import (
        get_user_id_from_occupational_health_profile_id,
    )
    from components.occupational_health.internal.business_logic.queries.marmot import (
        get_affiliated_members_from_affiliations_for_marmot,
    )
    from components.occupational_health.internal.business_logic.queries.visits.visits import (
        get_all_past_visits_from_all_sources_by_profile,
        get_computed_next_visit_result,
    )
    from components.occupational_health.internal.models.occupational_health_affiliation_decision import (
        OccupationalHealthAffiliationDecision,
    )
    from shared.helpers.db import current_session

    profile_id = ProfileId(profile_id)

    affiliations = get_all_affiliations(profile_id, account_id)
    affiliated_members = get_affiliated_members_from_affiliations_for_marmot(
        affiliations=affiliations
    )

    # Get past visits
    user_id = get_user_id_from_occupational_health_profile_id(profile_id=profile_id)
    past_visits_by_profile_id = get_all_past_visits_from_all_sources_by_profile(
        profile_ids=[profile_id], user_id_mapping={user_id: profile_id}
    )
    past_visits = past_visits_by_profile_id[profile_id]

    # Get all affiliation decisions
    affiliation_decisions = (
        current_session.query(OccupationalHealthAffiliationDecision)  # noqa: ALN085
        .filter(OccupationalHealthAffiliationDecision.profile_id == profile_id)
        .options(
            joinedload(OccupationalHealthAffiliationDecision.rule),
            raiseload("*"),
        )
    )

    # Serialize affiliation decisions
    serialized_decisions = []
    for decision in affiliation_decisions:
        serialized_decisions.append(
            OccupationalHealthAffiliationDecisionForMarmot(
                id=decision.id,
                created_at=decision.created_at.date(),
                decision=decision.decision,
                employment_change_payload=decision.employment_change_payload,
                core_employment_id=decision.core_employment_id,
                rule_id=decision.rule_id,
                rule_name=decision.rule.name if decision.rule else None,
            )
        )

    next_visit_result = get_computed_next_visit_result(
        profile_id=profile_id,
        affiliations=affiliations,
    )

    return OccupationalHealthProfileData(
        profile_id=profile_id,
        past_visits=past_visits,
        affiliations=affiliated_members,
        affiliation_decisions=serialized_decisions,
        next_visit_result=next_visit_result,
    )

get_recent_affiliation_movements

get_recent_affiliation_movements(
    profile_service, account_id, not_older_than
)

Return the list of affiliation decisions made since the given date for the given account.

Source code in components/occupational_health/public/marmot/queries.py
@inject_profile_service
def get_recent_affiliation_movements(
    profile_service: ProfileService,
    account_id: AccountId | UUID,
    not_older_than: date,
) -> list[AffiliationMovementForMarmot]:
    """
    Return the list of affiliation decisions made since the given date for the given account.
    """
    account_id = AccountId(account_id)

    movements = (
        current_session.query(  # noqa: ALN085
            OccupationalHealthProfile.global_profile_id,
            OccupationalHealthAffiliationMovement.id,
            OccupationalHealthAffiliationMovement.account_id,
            OccupationalHealthAffiliationMovement.created_at,
            OccupationalHealthAffiliationMovement.movement_type,
            OccupationalHealthAffiliationMovement.affiliation_decision_id,
            OccupationalHealthAffiliationMovement.profile_id,
            OccupationalHealthAffiliationMovement.employment_change_payload,
        )
        .select_from(OccupationalHealthAffiliationMovement)
        .join(OccupationalHealthAffiliationMovement.profile)
        .filter(
            OccupationalHealthAffiliationMovement.account_id == account_id,
            OccupationalHealthAffiliationMovement.created_at >= not_older_than,
        )
        .options(raiseload("*"))
        .all()
    )

    global_profile_ids = {movement.global_profile_id for movement in movements}
    profiles = profile_service.get_profiles(
        profile_ids=global_profile_ids,
    )
    profiles_per_global_id = {profile.id: profile for profile in profiles}

    company_ids = {
        movement.employment_change_payload["core_employment_version"]["company_id"]
        for movement in movements
        if movement.employment_change_payload
    }
    company_names = get_company_names_by_ids(company_ids)

    company_names_from_company_id_and_nic = _build_company_names_by_company_id_and_nic(
        [movement.employment_change_payload for movement in movements]
    )

    affiliation_movements_to_return: list[AffiliationMovementForMarmot] = []

    for movement in movements:
        if not movement.employment_change_payload:
            movement_company_id = None
            movement_company_name = None
            company_name_enriched_from_dsn = False
        else:
            movement_company_id = movement.employment_change_payload[
                "core_employment_version"
            ]["company_id"]
            nic = extract_nic_from_employment_change(
                EmploymentChange.from_dict(movement.employment_change_payload)
            )
            company_name_from_dsn = (
                company_names_from_company_id_and_nic.get((movement_company_id, nic))
                if nic
                else None
            )
            movement_company_name = (
                company_name_from_dsn
                # Fall back on the usual Company.name data
                or company_names[movement_company_id]
            )
            company_name_enriched_from_dsn = company_name_from_dsn is not None

        affiliation_movements_to_return.append(
            AffiliationMovementForMarmot(
                movement_id=movement.id,
                account_id=account_id,
                birth_date=profiles_per_global_id[
                    movement.global_profile_id
                ].birth_date,
                first_name=profiles_per_global_id[movement.global_profile_id].first_name
                or "",
                last_name=profiles_per_global_id[movement.global_profile_id].last_name
                or "",
                created_at=movement.created_at,
                profile_id=movement.profile_id,
                movement_type=movement.movement_type,
                date_of_movement=_get_date_of_movement(movement),
                decision_id=movement.affiliation_decision_id,
                company_id=movement_company_id,
                company_name=movement_company_name,
                company_name_enriched_from_dsn=company_name_enriched_from_dsn,
            )
        )

    return affiliation_movements_to_return

parse_and_match_spreadsheet

parse_and_match_spreadsheet(
    profile_service,
    account_id,
    spreadsheet_data,
    raise_on_invalid_data=False,
)

Parse spreadsheet data and return matching results (members that will could affiliate)

Source code in components/occupational_health/internal/business_logic/queries/affiliation/affiliation_tool.py
@inject_profile_service
@tracer_wrap()
def parse_and_match_spreadsheet(
    profile_service: ProfileService,
    account_id: AccountId,
    spreadsheet_data: str,
    raise_on_invalid_data: bool = False,
) -> ParsingAndMatchingResult:
    """
    Parse spreadsheet data and return matching results (members that will could affiliate)
    """
    from components.fr.internal.fr_employment_data_sources.business_logic.queries.user import (  # noqa: ALN043 - function should be made public (or existing global one made compatible)
        get_user_from_personal_informations,
    )
    from components.fr.internal.fr_employment_data_sources.entities.employee_personal_informations import (  # noqa: ALN043 - function should be made public (or existing global one made compatible)
        EmployeePersonalInformations,
    )

    parsing_warnings = []

    rows = parse_spreadsheet_data(spreadsheet_data)

    new_warnings = _sanitize_ssns(
        rows,
        raise_on_invalid_or_duplicate_ssn=raise_on_invalid_data,
    )
    parsing_warnings.extend(new_warnings)

    # Raise on duplicate proposed user IDs
    _sanitize_data(rows)

    # Get all affiliations for the account to check if users are already affiliated
    all_affiliations = get_all_affiliations_for_account(account_id)

    # Build a set of affiliated user IDs
    affiliations_by_user_ids: dict[UserId, Affiliation] = {}
    for affiliation in all_affiliations:
        # TODO: improve performance (N+1 query)
        user_id = get_user_id_from_occupational_health_profile_id(
            affiliation.profile_id
        )
        affiliations_by_user_ids[user_id] = affiliation

    # Find the contract start date
    contract_timeline = get_contract_timeline_for_account(account_id)
    if not contract_timeline:
        raise ValueError(f"Couldn't find any contract for account {account_id}")
    default_start_date = contract_timeline.start_date

    # Collect user IDs for employment lookup
    user_ids_for_employment: set[UserId] = set()

    # Detect the date format for the entire file
    all_birthdates: set[str] = set()
    all_visit_dates: set[str] = set()
    all_contract_start_dates: set[str] = set()
    all_contract_end_dates: set[str] = set()
    for row in rows:
        # Collect birthdates
        if row.get("date_naissance_gsheet"):
            all_birthdates.add(row["date_naissance_gsheet"])
        # Collect last visit dates
        if row.get("date_de_derniere_visite"):
            all_visit_dates.add(row["date_de_derniere_visite"].strip())
        # Collect contract start dates
        if row.get("date_debut_de_contrat"):
            all_contract_start_dates.add(row["date_debut_de_contrat"].strip())
        # Collect contract end dates
        if row.get("date_fin_de_contrat_optionnel"):
            all_contract_end_dates.add(row["date_fin_de_contrat_optionnel"].strip())

    birthdate_format = detect_date_format(all_birthdates)
    visit_date_format = detect_date_format(all_visit_dates)
    contract_start_date_format = detect_date_format(all_contract_start_dates)
    contract_end_date_format = detect_date_format(all_contract_end_dates)

    matches: list[MemberToAffiliate] = []

    for row_index, row in enumerate(rows):
        # TODO: support aliases for column names
        first_name = row.get("prenom_gsheet", "").strip()
        last_name = row.get("nom_gsheet", "").strip()
        birthdate_str = row.get("date_naissance_gsheet")
        birthdate = (
            datetime.strptime(birthdate_str, birthdate_format).date()
            if birthdate_str
            else None
        )
        ssn = row.get("ssn_original", "").strip()
        email_pro_gsheet_raw = row.get("email_pro_gsheet", "").strip()
        email_perso_gsheet_raw = row.get("email_perso_gsheet", "").strip()

        # Validate and normalize professional email
        email_pro_gsheet, email_pro_warning = _validate_email(
            row_index=row_index,
            email=email_pro_gsheet_raw,
            field_name="email_pro_gsheet",
        )
        if email_pro_warning is not None:
            parsing_warnings.append(email_pro_warning)

        # Validate and normalize personal email
        email_perso_gsheet, email_perso_warning = _validate_email(
            row_index=row_index,
            email=email_perso_gsheet_raw,
            field_name="email_perso_gsheet",
        )
        if email_perso_warning is not None:
            parsing_warnings.append(email_perso_warning)

        # Parse risk category from 'suivi_si_sia_sir' column
        risk_category, risk_category_warning = parse_risk_category(
            row_index=row_index,
            risk_category_str=row.get("suivi_si_sia_sir"),
        )
        if risk_category_warning is not None:
            parsing_warnings.append(risk_category_warning)

        # Parse last visit information
        last_visit, warning = _parse_last_visit(
            row_index=row_index,
            last_visit_date_str=row.get("date_de_derniere_visite", "").strip(),
            last_visit_type_str=row.get("type_de_visite", "").strip(),
            visit_date_format=visit_date_format,
        )
        if warning is not None:
            parsing_warnings.append(warning)

        # Parse contract start date (per-member override)
        contract_start_date_str = row.get("date_debut_de_contrat", "").strip()
        if contract_start_date_str:
            try:
                affiliation_start_date = datetime.strptime(
                    contract_start_date_str, contract_start_date_format
                ).date()
                # clamp on contract start_date
                affiliation_start_date = max(affiliation_start_date, default_start_date)
            except ValueError:
                parsing_warnings.append(
                    ParsingWarning(
                        row_index=row_index,
                        value=contract_start_date_str,
                        message=f"Invalid date format for contract start date: {contract_start_date_str} - expected format: {contract_start_date_format}",
                        category=ParsingWarningCategory.INVALID_CONTRACT_START_DATE,
                    )
                )
                affiliation_start_date = default_start_date
        else:
            affiliation_start_date = default_start_date
        # Parse optional contract end date
        contract_end_date_str = row.get("date_fin_de_contrat_optionnel", "").strip()
        affiliation_end_date: date | None = None
        if contract_end_date_str:
            try:
                affiliation_end_date = datetime.strptime(
                    contract_end_date_str, contract_end_date_format
                ).date()
            except ValueError:
                parsing_warnings.append(
                    ParsingWarning(
                        row_index=row_index,
                        value=contract_end_date_str,
                        message=f"Invalid date format for contract end date: {contract_end_date_str} - expected format: {contract_end_date_format}",
                        category=ParsingWarningCategory.INVALID_CONTRACT_END_DATE,
                    )
                )

        # Skip row if end date is before start date
        if affiliation_end_date and affiliation_end_date < affiliation_start_date:
            parsing_warnings.append(
                ParsingWarning(
                    row_index=row_index,
                    value=contract_end_date_str,
                    message=f"End date {affiliation_end_date} is before contract or affiliation start date {affiliation_start_date}, skipping row",
                    category=ParsingWarningCategory.END_DATE_BEFORE_START_DATE,
                )
            )
            continue

        # Create EmployeePersonalInformations for matching
        personal_informations = EmployeePersonalInformations(
            first_name=first_name,
            known_last_names={
                # we could inject the birth last name if available
                last_name
            },
            email=email_pro_gsheet,
            ssn=ssn.strip(),
            birth_date=birthdate,
            auto_compute_ssn_key=True,  # Auto-compute SSN key if needed
            # TODO: consider using siren / company external IDs when available
        )

        # Match user (company is optional)
        # NOTE: might need to move this back into the fr component / component injection until it gets globalized
        matching_result = get_user_from_personal_informations(
            personal_informations=personal_informations,
            company=None,
        )

        # Get matched user data if found
        if matching_result.matched_user_id is not None:
            matched_global_profile_id = (
                profile_service.user_compat.get_profile_id_by_user_id(
                    user_id=matching_result.matched_user_id
                )
            )
            if matched_global_profile_id is None:
                raise BaseErrorCode.missing_resource(
                    f"User ID {matching_result.matched_user_id} has no global profile"
                )

            matched_profile = profile_service.get_profile(
                profile_id=matched_global_profile_id
            )
        else:
            matched_profile = None

        proposed_user_id = row.get("alan_user_id", "").strip()
        proposed_user_name = None
        proposed_birthdate = None
        if proposed_user_id:
            if (
                matching_result.matched_user_id
                and matched_profile
                and str(matching_result.matched_user_id) == proposed_user_id
            ):
                # Everybody agrees, let's reuse the same name
                proposed_user_name = matched_profile.full_name
                proposed_birthdate = matched_profile.birth_date
            else:
                proposed_global_profile_id = (
                    profile_service.user_compat.get_profile_id_by_user_id(
                        user_id=UserId(proposed_user_id)
                    )
                )
                if proposed_global_profile_id is None:
                    raise BaseErrorCode.missing_resource(
                        f"User ID {proposed_user_id} has no global profile"
                    )

                proposed_profile = profile_service.get_profile(
                    profile_id=proposed_global_profile_id
                )
                if proposed_profile:
                    proposed_user_name = proposed_profile.full_name
                    proposed_birthdate = proposed_profile.birth_date

        relevant_user_ids = {
            UserId(str(matching_result.matched_user_id))
            if matching_result.matched_user_id
            else None,
            UserId(proposed_user_id),
        }
        relevant_user_ids -= {None}
        relevant_affiliations = []
        for relevant_user_id in relevant_user_ids:
            if relevant_user_id in affiliations_by_user_ids:
                affiliation = affiliations_by_user_ids[relevant_user_id]
                relevant_affiliations.append(
                    RelevantAffiliation(
                        user_id=relevant_user_id,
                        start_date=affiliation.start_date,
                        end_date=affiliation.end_date,
                    )
                )

        user_ids_for_employment.update(relevant_user_ids)  # type: ignore[arg-type]

        extra_data_from_row = row.copy()
        # No need to store some of the columns
        extra_data_from_row.pop("marmot_user_link", None)
        extra_data_from_row.pop("marmot_account_link", None)
        extra_data_from_row.pop("marmot_company_link", None)

        matches.append(
            MemberToAffiliate(
                row_index=row_index,
                first_name=first_name,
                last_name=last_name,
                birthdate=birthdate,
                ssn=ssn,
                professional_email=email_pro_gsheet or None,
                personal_email=email_perso_gsheet or None,
                # return None instead of an empty string
                proposed_user_id=proposed_user_id or None,
                proposed_user_name=proposed_user_name,
                proposed_birthdate=proposed_birthdate,
                matching_result=matching_result,
                matched_name=matched_profile.full_name if matched_profile else None,
                matched_birthdate=(
                    matched_profile.birth_date if matched_profile else None
                ),
                affiliations=relevant_affiliations,
                employments=[],  # Will be filled below
                target_start_date=affiliation_start_date,
                target_end_date=affiliation_end_date,
                last_visit=last_visit,
                risk_category=risk_category,
                extra_data=extra_data_from_row,
            )
        )

    # Get employments for all matched users and inject them into matches
    if user_ids_for_employment:
        employments_by_user_id = get_employments_by_user_id(
            account_ids={account_id},
            user_ids=user_ids_for_employment,
            should_overlap_prevenir_contract=False,
        )

        # Build company names
        company_ids = set()
        for employments in employments_by_user_id.values():
            for employment in employments:
                company_ids.add(CompanyId(employment.company_id))
        company_names = get_company_names_by_ids(company_ids)

        # Fill employments in matches
        for match in matches:
            match_user_id: str | int | None = (
                match.matching_result.matched_user_id or match.proposed_user_id
            )
            if match_user_id is None:
                continue
            user_employments = employments_by_user_id.get(
                UserId(str(match_user_id)), []
            )
            match.employments = [
                EmploymentInfo(
                    company_id=employment.company_id,
                    company_name=company_names.get(
                        CompanyId(employment.company_id),
                        "Unknown!",
                    ),
                    start_date=employment.start_date,
                    end_date=employment.end_date,
                )
                for employment in user_employments
            ]

            # Clamp target_start_date to earliest employment start date
            if match.employments:
                earliest_employment_start = min(e.start_date for e in match.employments)
                match.target_start_date = max(
                    match.target_start_date, earliest_employment_start
                )

    # Add warning if we suggest the same user ID for multiple users
    parsing_warnings.extend(_check_for_duplicate_matching_user_ids(matches))

    return ParsingAndMatchingResult(
        matches=matches,
        warnings=parsing_warnings,
    )

components.occupational_health.public.queries

billed_entity

get_flask_admin_url_from_contract_ref

get_flask_admin_url_from_contract_ref(contract_ref)

Return the flask admin url linking to a billed entity from the contract reference (used for invoice)

Source code in components/occupational_health/public/queries/billed_entity.py
def get_flask_admin_url_from_contract_ref(contract_ref: str) -> str:
    """
    Return the flask admin url linking to a billed entity from the contract reference (used for invoice)
    """
    account_id = contract_ref.split(":")[-2]
    siret_or_siren = contract_ref.split(":")[-1]
    if len(siret_or_siren) == SIREN_LENGTH:
        return f"{current_config['BASE_URL']}/admin/occupationalhealthbilledentity/?flt0_account_id_equals={account_id}&flt1_siren_equals={siret_or_siren}"
    elif len(siret_or_siren) == SIRET_LENGTH:
        return f"{current_config['BASE_URL']}/admin/occupationalhealthbilledentity/?flt0_account_id_equals={account_id}&flt1_siret_equals={siret_or_siren}"
    else:
        raise ValueError(f"Incorrect siret or siren in contract_ref {contract_ref}")

billing

Set of queries used by the Billing stack.

See https://github.com/alan-eu/Topics/discussions/30344?sort=old#discussioncomment-14080373 ⧉

ContractRefNotFound

Bases: Exception

A contract reference was not found.

UnableToDetermineCustomerToBill

Bases: Exception

Unable to determine which customer to bill.

build_invoices_data_for_period

build_invoices_data_for_period(
    billing_period, billing_year
)

Build the data required to generate all invoices for the given billing period and invoice year.

Parameters:

Name Type Description Default
billing_period OccupationalHealthBillingPeriod

The billing period to build data for.

required
billing_year int

The year of the billing period to build data for.

required

The data is not guaranteed to be stable over time for past billing periods, as retroactive affiliation changes can occur.

Returns:

Name Type Description
tuple tuple[list[InvoiceToGenerateData], dict[UUID, dict[str, int | str]]]

A tuple containing: - list[InvoiceToGenerateData]: The data about all invoices to generate for the given period. - dict[UUID, dict[str, int | str]]: Statistics per account with keys "account_name", "invoices_count" and "errors_count".

Source code in components/occupational_health/public/queries/billing.py
def build_invoices_data_for_period(
    billing_period: OccupationalHealthBillingPeriod,
    billing_year: int,
) -> tuple[list[InvoiceToGenerateData], dict[UUID, dict[str, int | str]]]:
    """
    Build the data required to generate all invoices for the given billing period and invoice year.

    Args:
        billing_period: The billing period to build data for.
        billing_year: The year of the billing period to build data for.

    The data is not guaranteed to be stable over time for past billing periods, as
    retroactive affiliation changes can occur.

    Returns:
        tuple: A tuple containing:
            - list[InvoiceToGenerateData]: The data about all invoices to generate for the given period.
            - dict[UUID, dict[str, int | str]]: Statistics per account with keys "account_name", "invoices_count" and "errors_count".
    """
    from components.occupational_health.internal.business_logic.contracting.queries.contracting import (
        get_all_account_ids_with_contract,
    )

    with replica_db_context():
        # Get all customers with an active contract during the given billing period
        account_ids = get_all_account_ids_with_contract(
            active_during_period=billing_period.to_validity_period(year=billing_year),
        )

        dependency = get_app_dependency()
        invoice_to_generate_data_list: list[InvoiceToGenerateData] = []
        stats_per_account: dict[UUID, dict[str, int | str]] = {}

        for account_id in account_ids:
            invoices, errors = build_invoices_data_for_period_and_account(
                billing_period=billing_period,
                billing_year=billing_year,
                account_id=account_id,
            )
            invoice_to_generate_data_list.extend(invoices)
            stats_per_account[account_id] = {
                "account_name": dependency.get_account_name(account_id=account_id),
                "invoices_count": len(invoices),
                "errors_count": len(errors),
            }

        return invoice_to_generate_data_list, stats_per_account

build_invoices_data_for_period_and_account

build_invoices_data_for_period_and_account(
    billing_period, billing_year, account_id
)

Build the data required to generate all invoices for the given billing period, invoice year and account.

Parameters:

Name Type Description Default
billing_period OccupationalHealthBillingPeriod

The billing period to build data for.

required
billing_year int

The year of the billing period to build data for.

required
account_id UUID | AccountId

The account ID to build invoices for.

required

The data is not guaranteed to be stable over time for past billing periods, as retroactive affiliation changes can occur.

Returns:

Name Type Description
tuple tuple[list[InvoiceToGenerateData], dict[UUID, str]]

A tuple containing: - list[InvoiceToGenerateData]: The data about all invoices to generate for the given period. - dict[UUID, str]: Mapping of affiliation IDs to rejection reasons for affiliations that couldn't be matched.

Source code in components/occupational_health/public/queries/billing.py
def build_invoices_data_for_period_and_account(
    billing_period: OccupationalHealthBillingPeriod,
    billing_year: int,
    account_id: UUID | AccountId,
) -> tuple[list[InvoiceToGenerateData], dict[UUID, str]]:
    """
    Build the data required to generate all invoices for the given billing period, invoice year and account.

    Args:
        billing_period: The billing period to build data for.
        billing_year: The year of the billing period to build data for.
        account_id: The account ID to build invoices for.

    The data is not guaranteed to be stable over time for past billing periods, as
    retroactive affiliation changes can occur.

    Returns:
        tuple: A tuple containing:
            - list[InvoiceToGenerateData]: The data about all invoices to generate for the given period.
            - dict[UUID, str]: Mapping of affiliation IDs to rejection reasons for affiliations that couldn't be matched.
    """
    from components.occupational_health.internal.business_logic.billing.pricing import (
        YEARLY_PRICE_PER_EMPLOYEE_IN_CENTS,
    )
    from components.occupational_health.internal.business_logic.queries.affiliation.affiliations import (
        get_all_affiliations_for_account,
    )
    from components.occupational_health.internal.helpers.affiliation import (
        build_profile_id_from_affiliable_ref,
    )

    account_id = AccountId(account_id)

    # 1. Get the customer data, including the billable entities.
    customers = get_billed_entities_for_account(account_id=account_id)

    # 2. Get all affiliated members overlapping with billing period for the given account, spread them over the billable entities depending on their SIRET.
    affiliations_for_whole_account = get_all_affiliations_for_account(
        account_id=account_id
    )

    # Sort affiliations by start_date to process them in chronological order
    affiliations_sorted = sorted(
        affiliations_for_whole_account, key=lambda aff: aff.start_date
    )

    # Pre-fetch billing affiliation overrides for all affiliations in this account
    # Assumption: the number of such overrides is small (a handful per account max),
    # making it more efficient to fetch them all at once rather than querying one by
    # one during the loop below.
    all_affiliation_ids = {aff.affiliation_id for aff in affiliations_sorted}
    billing_overrides = _get_billing_overrides(all_affiliation_ids)

    # Compute the start and end dates, between which we'll consider period to bill
    to_bill_period = billing_period.to_validity_period(year=billing_year)

    customer_by_contract_ref = {
        customer.contract_ref: customer for customer in customers
    }
    # Index customers by profile_id to detect duplicates
    customers_per_profile: defaultdict[ProfileId, list[BilledEntityData]] = defaultdict(
        list
    )
    errors = 0
    error_affiliation_ids: dict[UUID, str] = {}
    for affiliation in affiliations_sorted:
        if affiliation.is_ever_active_between(
            period_start=to_bill_period.start_date,
            period_end=to_bill_period.end_date,
        ):
            # Calculate the number of days the affiliation is active during the billing period
            affiliation_duration = _find_duration_of_affiliation_on_period(
                affiliation, to_bill_period, billing_period
            )

            # Skip affiliations that are active for 3 days or less
            if affiliation_duration and affiliation_duration <= 3:
                current_logger.info(
                    f"Skipping affiliation {affiliation.affiliation_id} - only active for {affiliation_duration} days"
                )
                continue

            # This affiliation is active at some point during the billing period:
            # take it into account for billing.
            profile_id = build_profile_id_from_affiliable_ref(
                affiliation.affiliable_ref
            )
            try:
                # Find the right customer for this affiliation
                matching_customer = _find_customer_to_bill_for_affiliation(
                    account_id=account_id,
                    affiliation=affiliation,
                    profile_id=profile_id,
                    customers=customers,
                    to_bill_period=to_bill_period,
                )
            except UnableToDetermineCustomerToBill as e:
                current_logger.warning(
                    f"Unable to determine customer to bill for {affiliation} based on employment data."
                )

                # Check if there's a manual billing override for this affiliation
                override_billed_entity_id = billing_overrides.get(
                    affiliation.affiliation_id
                )
                if override_billed_entity_id is not None:
                    override_customer = one(
                        customers,
                        predicate=lambda c: c.id == override_billed_entity_id,  # noqa: B023
                        message=f"Unable to find BilledEntity {override_billed_entity_id} for BillingAffiliationOverride of affiliation {affiliation.affiliation_id}",
                    )
                    current_logger.info(
                        f"Found a BillingAffiliationOverride to determine customer to bill for affiliation {affiliation.affiliation_id} -> billing to {override_customer.entity_name}.",
                        affiliation_id=affiliation.affiliation_id,
                        billed_entity_id=override_billed_entity_id,
                        original_error=str(e),
                    )
                    customers_per_profile[profile_id].append(override_customer)
                else:
                    errors += 1
                    error_affiliation_ids[affiliation.affiliation_id] = str(e)
            else:
                # Collect customer for each profile_id if no error
                customers_per_profile[profile_id].append(matching_customer)

    # TODO: ensure transfers are properly handled (eg unit test?)

    if errors:
        current_logger.warning(
            f"Unable to find the customer to bill for {errors} affiliations in account {account_id}"
        )

    # 3. Process customers per profile and select the final customer to bill
    profile_ids_per_customer_ref: defaultdict[str, set[str]] = defaultdict(set)

    for profile_id, customer_list in customers_per_profile.items():
        uniq_contract_ref = {customer.contract_ref for customer in customer_list}
        if len(uniq_contract_ref) > 1:
            current_logger.info(
                f"Multiple customers found for profile<{profile_id}> : {', '.join(uniq_contract_ref)}, taking the first one"
            )

        # Take the first customer (from earliest affiliation due to sorting)
        selected_customer = customer_list[0]
        profile_ids_per_customer_ref[selected_customer.contract_ref].add(
            str(profile_id)
        )

    # 4. Package the data
    # Iterate over all customers, not just those with profiles
    invoice_data = []
    for customer_contract_ref, customer in customer_by_contract_ref.items():
        profile_ids = profile_ids_per_customer_ref.get(customer_contract_ref, set())
        invoice_data.append(
            InvoiceToGenerateData(
                billing_period=billing_period,
                billing_year=billing_year,
                contract_type="occupational_health",
                contract_ref=customer_contract_ref,
                entity_name=customer.entity_name,
                siren=customer.siren,
                siret=customer.siret,
                contract_yearly_price_per_employee_in_cents=YEARLY_PRICE_PER_EMPLOYEE_IN_CENTS,
                contract_has_installment_plan=(
                    customer.has_installment_plan_per_year or {}
                ).get(str(billing_year)),
                references_of_employees_affiliated_over_the_year=profile_ids,
            )
        )

    return invoice_data, error_affiliation_ids

get_active_employee_count_for_account

get_active_employee_count_for_account(account_id)

Return the number of currently active affiliations for the given account.

Note: billing is the first external consumer of this function which is why it was defined in billing.py - don't hesitate to move it

Source code in components/occupational_health/public/queries/billing.py
def get_active_employee_count_for_account(account_id: AccountId | UUID) -> int:
    """
    Return the number of currently active affiliations for the given account.

    Note: billing is the first external consumer of this function which is why it was
    defined in billing.py - don't hesitate to move it
    """
    from components.occupational_health.internal.business_logic.queries.affiliation.affiliations import (
        get_all_affiliations_for_account,
    )

    return len(
        get_all_affiliations_for_account(
            account_id=AccountId(account_id),
            only_active_on=utctoday(),
        )
    )

get_billed_entities_for_account

get_billed_entities_for_account(account_id)

Get all billed entities for a given account.

Source code in components/occupational_health/public/queries/billing.py
def get_billed_entities_for_account(
    account_id: UUID,
) -> list["BilledEntityData"]:
    """
    Get all billed entities for a given account.
    """
    query = select(OccupationalHealthBilledEntity).where(
        OccupationalHealthBilledEntity.account_id == AccountId(account_id)
    )
    billed_entities: Sequence[OccupationalHealthBilledEntity] = (
        current_session.execute(query).scalars().all()
    )

    serialized_billed_entities: list[BilledEntityData] = []

    for billed_entity in billed_entities:
        serialized_billed_entities.append(
            BilledEntityData(
                id=billed_entity.id,
                entity_name=billed_entity.entity_name,
                siren=billed_entity.siren,
                siret=billed_entity.siret,
                postal_address=CustomerAddress(
                    street=billed_entity.postal_street,
                    postal_code=billed_entity.postal_code,
                    city=billed_entity.postal_city,
                    country_code=billed_entity.postal_country_code,
                ),
                account_id=billed_entity.account_id,
                has_installment_plan_per_year=billed_entity.has_installment_plan_per_year,
                contract_ref=billed_entity.contract_ref,
                subscription_ref=billed_entity.subscription_ref,
                emails_to_notify=billed_entity.emails_to_notify,
            )
        )

    return serialized_billed_entities

get_customer_data

get_customer_data(contract_ref)

Fetch customer data for the billing system.

Until we decide how to store this customer data (on the Billing side?), here is a query to serve it.

Parameters:

Name Type Description Default
contract_ref str

The contract reference to look up customer data for

required

Returns:

Name Type Description
CustomerData CustomerData

The customer data for the given contract reference

Raises:

Type Description
ContractRefNotFound

If the contract reference is not found in the customer data mapping

Source code in components/occupational_health/public/queries/billing.py
def get_customer_data(contract_ref: str) -> CustomerData:
    """
    Fetch customer data for the billing system.

    Until we decide how to store this customer data (on the Billing side?), here is a query to serve it.

    Args:
        contract_ref: The contract reference to look up customer data for

    Returns:
        CustomerData: The customer data for the given contract reference

    Raises:
        ContractRefNotFound: If the contract reference is not found in the customer data mapping
    """
    query = select(OccupationalHealthBilledEntity).where(
        OccupationalHealthBilledEntity.contract_ref == contract_ref
    )

    with replica_db_context():
        billed_entity = current_session.scalars(query).one_or_none()

    if billed_entity is None:
        raise ContractRefNotFound(
            f"Customer data not found for contract reference: {contract_ref}"
        )

    return CustomerData(
        entity_name=billed_entity.entity_name,
        siren=billed_entity.siren,
        siret=billed_entity.siret,
        postal_address=CustomerAddress(
            street=billed_entity.postal_street,
            postal_code=billed_entity.postal_code,
            city=billed_entity.postal_city,
            country_code=billed_entity.postal_country_code,
        ),
    )

get_profile_list_from_invoice_data

get_profile_list_from_invoice_data(invoices)

Build a list of profiles with customer names from invoice data.

Parameters:

Name Type Description Default
invoices list[InvoiceToGenerateData]

List of invoice objects, each with contract_ref and references_of_employees_affiliated_over_the_year attributes.

required

Returns:

Type Description
list[dict[str, str | ProfileId]]

List of dicts with profile_id, first_name, last_name, and customer_name.

Source code in components/occupational_health/public/queries/billing.py
def get_profile_list_from_invoice_data(
    invoices: list[InvoiceToGenerateData],
) -> list[dict[str, str | ProfileId]]:
    """
    Build a list of profiles with customer names from invoice data.

    Args:
        invoices: List of invoice objects, each with contract_ref and references_of_employees_affiliated_over_the_year attributes.

    Returns:
        List of dicts with profile_id, first_name, last_name, and customer_name.
    """
    from uuid import UUID

    from components.global_profile.public.api import ProfileService
    from components.occupational_health.external.global_profile import (
        get_global_profile_ids_by_occupational_health_profile_id_mapping,
    )

    # Build profile list with customer names
    profile_to_customer_name: dict[ProfileId, str] = {}
    all_profile_ids: set[ProfileId] = set()
    for invoice in invoices:
        customer_data = get_customer_data(invoice.contract_ref)
        for profile_id_str in invoice.references_of_employees_affiliated_over_the_year:
            profile_id = ProfileId(UUID(profile_id_str))
            all_profile_ids.add(profile_id)
            profile_to_customer_name[profile_id] = customer_data.entity_name

    # Get global_profile_ids from occupational health profile_ids
    profile_id_to_global_profile_id = (
        get_global_profile_ids_by_occupational_health_profile_id_mapping(
            profile_ids=all_profile_ids
        )
    )

    # Get first_name/last_name from global profiles using global_profile_ids directly
    global_profile_ids = list(profile_id_to_global_profile_id.values())
    profile_service = ProfileService.create()
    global_profiles = profile_service.get_profiles(profile_ids=global_profile_ids)
    # Build lookup by global_profile_id for easier access
    global_profile_by_id = {profile.id: profile for profile in global_profiles}

    profiles_with_customer_name: list[dict[str, str | ProfileId]] = []
    for profile_id in all_profile_ids:
        global_profile_id = profile_id_to_global_profile_id[profile_id]
        profile = global_profile_by_id.get(global_profile_id)
        first_name = (profile.first_name or "⁉️") if profile else "⁉️"
        last_name = (profile.last_name or "⁉️") if profile else "⁉️"
        profiles_with_customer_name.append(
            {
                "profile_id": profile_id,
                "first_name": first_name,
                "last_name": last_name,
                "customer_name": profile_to_customer_name[profile_id],
            }
        )
    return profiles_with_customer_name

doctolib_matching

Public facade for Doctolib export matching.

Thin re-export layer so the Marmot controller (in components/fr/) can import everything from a single public module.

build_doctolib_patient_id_to_user_id_lookup

build_doctolib_patient_id_to_user_id_lookup(
    predictable_rows,
)

Build doctolib_patient_id -> user_id from predictable gsheet rows.

Only includes entries where helper_docto_patient_id maps to exactly 1 distinct user_id.

Source code in components/occupational_health/internal/business_logic/doctolib/profile_indexes.py
def build_doctolib_patient_id_to_user_id_lookup(
    predictable_rows: list[dict[str, Any]],
) -> dict[str, str]:
    """Build doctolib_patient_id -> user_id from predictable gsheet rows.

    Only includes entries where helper_docto_patient_id maps to exactly 1 distinct user_id.
    """
    grouped: dict[str, set[str]] = defaultdict(set)
    for row in predictable_rows:
        patient_id = row.get("helper_docto_patient_id")
        user_id = row.get("user_id")
        if patient_id and user_id:
            grouped[patient_id].add(user_id)

    return {
        patient_id: next(iter(user_ids))
        for patient_id, user_ids in grouped.items()
        if len(user_ids) == 1
    }

categorize_matched_rows

categorize_matched_rows(
    matched_rows,
    unmatched_csv_rows,
    predictable_rows,
    on_demand_rows,
)

Categorize matched rows into buckets — strict step_28 transposition.

Mirrors automation step_28 JavaScript exactly: 1. Build predictableById, rescheduleCheckById maps 2. Loop doctoExportData: reschedule check → newly_scheduled → new_rows 3. Detect cancellations 4. Filter new_rows with ErrorFilterSheet

Note: on-demand filtering must be done BEFORE calling this function via filter_on_demand_visits(). matched_rows and unmatched_csv_rows should already be filtered.

Source code in components/occupational_health/internal/business_logic/doctolib/categorizer.py
def categorize_matched_rows(
    matched_rows: list[MatchedRow],
    unmatched_csv_rows: list[DoctolibCsvRow],
    predictable_rows: list[dict[str, Any]],
    on_demand_rows: list[dict[str, Any]],
) -> MatchingV2Response:
    """Categorize matched rows into buckets — strict step_28 transposition.

    Mirrors automation step_28 JavaScript exactly:
    1. Build predictableById, rescheduleCheckById maps
    2. Loop doctoExportData: reschedule check → newly_scheduled → new_rows
    3. Detect cancellations
    4. Filter new_rows with ErrorFilterSheet

    Note: on-demand filtering must be done BEFORE calling this function
    via filter_on_demand_visits(). matched_rows and unmatched_csv_rows
    should already be filtered.
    """
    # Build predictableById (step_12 → step_28)
    match_rows_by_uid: dict[str, dict[str, Any]] = {}
    # Build rescheduleCheckById (step_29 → step_28)
    reschedule_rows_by_uid: dict[str, dict[str, Any]] = {}
    for row in predictable_rows:
        uid = row.get("user_id")
        if uid:
            if _is_truthy(row.get("helper_looking_for_match")):
                match_rows_by_uid[str(uid)] = row
            if _is_truthy(row.get("helper_looking_for_rescheduling")):
                reschedule_rows_by_uid[str(uid)] = row

    # On-demand rows flagged with helper_docto_scan=1 have already been processed
    # by a previous doctolib scan. We exclude them from new_rows to avoid duplicates.
    # This is the same code than in the step_28 of the automation
    error_filter_set: set[str] = set()
    for row in on_demand_rows:
        if _is_truthy(row.get("helper_docto_scan")):
            email = str(row.get("email", "")).strip().lower()
            date_val = normalize_date(str(row.get("date_planned", "")))
            hour_val = normalize_time_for_comparison(str(row.get("hour_booked", "")))
            if email and date_val and hour_val:
                error_filter_set.add(f"{date_val}|{hour_val}|{email}")

    # Main loop: categorize (mirrors step_28 doctoExportData.forEach)
    csv_user_ids: set[str] = set()
    newly_scheduled: list[NewlyScheduledItem] = []
    rescheduling: list[ReschedulingItem] = []
    new_rows_unfiltered: list[NewRowItem] = []

    for m in matched_rows:
        uid_str = str(m.user_id)
        csv_user_ids.add(uid_str)
        csv_row = m.csv_row

        # 1) Check reschedule first (blocks newly_scheduled)
        if uid_str in reschedule_rows_by_uid:
            gsheet_row = reschedule_rows_by_uid[uid_str]
            old_date = normalize_date(str(gsheet_row.get("date_planned", "")))
            old_hour = normalize_time_for_comparison(
                str(gsheet_row.get("hour_booked", ""))
            )
            old_hp = str(gsheet_row.get("hp_visit_lead", "")).strip()
            new_date = normalize_date(csv_row.appointment_date)
            new_hour = normalize_time_for_comparison(csv_row.appointment_start)
            new_hp = csv_row.agenda

            # If the date, hour_booked or hp changed, we have to reschedule
            if old_date != new_date or old_hour != new_hour or old_hp != new_hp:
                rescheduling.append(
                    ReschedulingItem(
                        gsheet_row_index=gsheet_row.get("_row_index", 0),
                        user_id=uid_str,
                        first_name=csv_row.first_name,
                        last_name=csv_row.last_name,
                        old_date=old_date,
                        old_hour=old_hour,
                        old_hp=old_hp,
                        new_date=new_date,
                        new_hour=new_hour,
                        new_hp=new_hp,
                        hour_end=csv_row.appointment_end,
                        consultation_type=csv_row.consultation_type,
                        docto_patient_id=csv_row.doctolib_patient_id,
                        match_priority=m.priority,
                    )
                )
            continue

        # 2) Check newly_scheduled
        if uid_str in match_rows_by_uid:
            gsheet_row = match_rows_by_uid[uid_str]
            newly_scheduled.append(
                NewlyScheduledItem(
                    gsheet_row_index=gsheet_row.get("_row_index", 0),
                    user_id=uid_str,
                    first_name=csv_row.first_name,
                    last_name=csv_row.last_name,
                    doctolib_email=csv_row.email,
                    doctolib_phone=csv_row.phone,
                    doctolib_birth_date=csv_row.birth_date,
                    hp=csv_row.agenda,
                    consultation_type=csv_row.consultation_type,
                    date_planned=csv_row.appointment_date,
                    hour_start=csv_row.appointment_start,
                    hour_end=csv_row.appointment_end,
                    docto_patient_id=csv_row.doctolib_patient_id,
                    match_priority=m.priority,
                )
            )
            continue

        # 3) Not in match/reschedule → new_rows (before ErrorFilterSheet)
        new_rows_unfiltered.append(
            NewRowItem(
                first_name=csv_row.first_name,
                last_name=csv_row.last_name,
                email=csv_row.email,
                phone=csv_row.phone,
                birth_date=csv_row.birth_date,
                consultation_type=csv_row.consultation_type,
                date_planned=csv_row.appointment_date,
                hour_start=csv_row.appointment_start,
                hour_end=csv_row.appointment_end,
                hp=csv_row.agenda,
                docto_patient_id=csv_row.doctolib_patient_id,
                match_priority=m.priority,
            )
        )

    # Add unmatched rows to new_rows
    for csv_row in unmatched_csv_rows:
        new_rows_unfiltered.append(
            NewRowItem(
                first_name=csv_row.first_name,
                last_name=csv_row.last_name,
                email=csv_row.email,
                phone=csv_row.phone,
                birth_date=csv_row.birth_date,
                consultation_type=csv_row.consultation_type,
                date_planned=csv_row.appointment_date,
                hour_start=csv_row.appointment_start,
                hour_end=csv_row.appointment_end,
                hp=csv_row.agenda,
                docto_patient_id=csv_row.doctolib_patient_id,
                match_priority=0,
            )
        )

    # ErrorFilterSheet: filter new_rows (step_28 filteredNewRows)
    new_rows: list[NewRowItem] = []
    error_filtered_count = 0
    for nr in new_rows_unfiltered:
        email = nr.email.strip().lower()
        date_val = nr.date_planned
        hour_val = normalize_time_for_comparison(nr.hour_start)
        key = f"{date_val}|{hour_val}|{email}"
        if key in error_filter_set:
            error_filtered_count += 1
            continue
        new_rows.append(nr)

    # Cancelled: reschedule users absent from CSV
    cancelled: list[CancelledItem] = []
    for uid, gsheet_row in reschedule_rows_by_uid.items():
        if uid not in csv_user_ids:
            cancelled.append(
                CancelledItem(
                    gsheet_row_index=gsheet_row.get("_row_index", 0),
                    user_id=uid,
                    first_name=str(gsheet_row.get("first_name", "")),
                    last_name=str(gsheet_row.get("last_name", "")),
                    date_planned=str(gsheet_row.get("date_planned", "")),
                )
            )

    return MatchingV2Response(
        newly_scheduled=newly_scheduled,
        rescheduling=rescheduling,
        cancelled=cancelled,
        new_rows=new_rows,
        unmatched=[],
        stats={
            "total_csv_rows": len(matched_rows) + len(unmatched_csv_rows),
            "matched": len(matched_rows),
            "unmatched": len(unmatched_csv_rows),
            "error_filter_removed": error_filtered_count,
        },
    )

filter_on_demand_visits

filter_on_demand_visits(
    matched_rows, unmatched_csv_rows, on_demand_rows
)

Filter out CSV rows that already have an on-demand visit booked.

Mirrors the Metabase SQL LEFT JOIN filter: removes rows where the user+date already exists in the on-demand gsheet tab (future dates only). Must be called BEFORE categorize_matched_rows, just like the SQL does before step_28.

Source code in components/occupational_health/internal/business_logic/doctolib/categorizer.py
def filter_on_demand_visits(
    matched_rows: list[MatchedRow],
    unmatched_csv_rows: list[DoctolibCsvRow],
    on_demand_rows: list[dict[str, Any]],
) -> tuple[list[MatchedRow], list[DoctolibCsvRow]]:
    """Filter out CSV rows that already have an on-demand visit booked.

    Mirrors the Metabase SQL LEFT JOIN filter: removes rows where the user+date
    already exists in the on-demand gsheet tab (future dates only).
    Must be called BEFORE categorize_matched_rows, just like the SQL does before step_28.
    """
    today = utctoday()
    on_demand_keys: set[tuple[str, str]] = set()
    for row in on_demand_rows:
        uid = str(row.get("user_id", "")).strip()
        date_planned = str(row.get("date_planned", "")).strip()
        if uid and date_planned:
            parsed = norm_birth_date(date_planned)
            if parsed:
                try:
                    d = datetime.strptime(parsed, "%Y-%m-%d").date()
                    if d >= today:
                        on_demand_keys.add((uid, parsed))
                except ValueError:
                    pass

    filtered_matched = [
        m
        for m in matched_rows
        if (m.user_id, norm_birth_date(m.csv_row.appointment_date))
        not in on_demand_keys
    ]

    # Unmatched rows have no user_id so they can't be filtered by on-demand keys.
    return filtered_matched, unmatched_csv_rows

find_candidates_for_new_rows

find_candidates_for_new_rows(new_rows)

Find candidate users for new_rows so Ops can pick the right one in the UI.

Lightweight search: only returns basic user info, no SSN/employment enrichment. Mutates each NewRowItem in place by populating its candidates list.

Source code in components/occupational_health/internal/business_logic/doctolib/queries.py
def find_candidates_for_new_rows(new_rows: list[NewRowItem]) -> None:
    """Find candidate users for new_rows so Ops can pick the right one in the UI.

    Lightweight search: only returns basic user info, no SSN/employment enrichment.
    Mutates each NewRowItem in place by populating its candidates list.
    """
    for item in new_rows:
        results = perform_search(
            f"{item.first_name} {item.last_name}",
            search_for_users=True,
        )
        item.candidates = [
            {
                "user_id": str(user.id),
                "first_name": user.first_name,
                "last_name": user.last_name,
                "email": user.email,
                "birth_date": str(user.birth_date) if user.birth_date else "",
            }
            for user in results.users
        ]

get_all_occupational_health_profiles_with_user_data

get_all_occupational_health_profiles_with_user_data()
Source code in components/occupational_health/internal/business_logic/doctolib/queries.py
def get_all_occupational_health_profiles_with_user_data() -> list[dict[str, Any]]:
    from components.fr.internal.models.user import User  # noqa: ALN069

    profile_ids = (
        current_session.execute(select(OccupationalHealthProfile.id)).scalars().all()
    )

    if not profile_ids:
        return []
    user_id_by_profile_id = get_user_ids_by_occupational_health_profile_ids_mapping(
        profile_ids=profile_ids
    )
    user_ids = set(user_id_by_profile_id.values())
    users = (
        current_session.execute(select(User).where(User.id.in_(user_ids)))
        .scalars()
        .all()
    )

    return [
        {
            "user_id": str(user.id),
            "email": user.email,
            "first_name": user.first_name,
            "last_name": user.last_name,
            "birth_date": user.birth_date,
            "birth_name": None,
        }
        for user in users
    ]

match_all_csv_rows

match_all_csv_rows(csv_rows, profiles, doctolib_lookup)

Match all CSV rows to Alan users.

Each row goes through an 8-priority cascade (see match_csv_row_to_user). Lower priority number = higher confidence match. If the best priority yields multiple distinct user_ids, the row is marked as ambiguous.

Logs group counts so results can be compared with the original SQL query.

Source code in components/occupational_health/internal/business_logic/doctolib/matching_engine.py
def match_all_csv_rows(
    csv_rows: list[DoctolibCsvRow],
    profiles: list[dict[str, Any]],
    doctolib_lookup: dict[str, str],
) -> tuple[list[MatchedRow], list[DoctolibCsvRow]]:
    """Match all CSV rows to Alan users.

    Each row goes through an 8-priority cascade (see match_csv_row_to_user).
    Lower priority number = higher confidence match. If the best priority
    yields multiple distinct user_ids, the row is marked as ambiguous.

    Logs group counts so results can be compared with the original SQL query.
    """
    indexes = build_profile_indexes(profiles)
    matched: list[MatchedRow] = []
    unmatched: list[DoctolibCsvRow] = []
    priority_counts: dict[int, int] = defaultdict(int)
    group_counts: dict[str, int] = defaultdict(int)

    for row in csv_rows:
        result = match_csv_row_to_user(row, indexes, doctolib_lookup)
        priority_counts[result.priority] += 1

        group = _PRIORITY_TO_LOG_GROUP.get(result.priority)
        if group:
            group_counts[group] += 1

        if result.user_id:
            matched.append(
                MatchedRow(
                    csv_row=row,
                    user_id=result.user_id,
                    priority=result.priority,
                    strategy=result.strategy,
                )
            )
        else:
            unmatched.append(row)

    return matched, unmatched

normalize_doctolib_csv_rows

normalize_doctolib_csv_rows(raw_rows)

Normalize raw CSV dicts into typed DoctolibCsvRow objects.

Full normalization pipeline: - Map French headers → English field names - Normalize dates (DD/MM/YYYY) and times (HH:MM:SS) - Derive consultation_type from motif - Compute appointment_end = start + duration

Source code in components/occupational_health/internal/business_logic/doctolib/csv_normalizer.py
def normalize_doctolib_csv_rows(
    raw_rows: list[dict[str, str]],
) -> list[DoctolibCsvRow]:
    """Normalize raw CSV dicts into typed DoctolibCsvRow objects.

    Full normalization pipeline:
    - Map French headers → English field names
    - Normalize dates (DD/MM/YYYY) and times (HH:MM:SS)
    - Derive consultation_type from motif
    - Compute appointment_end = start + duration
    """
    results: list[DoctolibCsvRow] = []

    for idx, raw in enumerate(raw_rows):
        # Map French headers → English keys
        mapped: dict[str, str] = {}
        for french_key, english_key in DOCTOLIB_HEADER_MAP.items():
            mapped[english_key] = raw.get(french_key, "").strip()

        # Normalize date and time fields
        mapped["birth_date"] = normalize_date(mapped.get("birth_date", ""))
        mapped["appointment_date"] = normalize_date(mapped.get("appointment_date", ""))
        mapped["appointment_start"] = normalize_time(
            mapped.get("appointment_start", "")
        )

        # Derive consultation_type from motif
        consultation_type = _derive_consultation_type(mapped.get("motif", ""))

        # Compute end time from start + duration
        mapped["appointment_end"] = _compute_end_time(
            mapped["appointment_start"],
            mapped.get("duration", ""),
        )

        results.append(
            DoctolibCsvRow(
                csv_row_index=idx,
                doctolib_patient_id=mapped.get("doctolib_patient_id", ""),
                first_name=mapped.get("first_name", ""),
                last_name=mapped.get("last_name", ""),
                birth_name=mapped.get("birth_name", ""),
                birth_date=mapped.get("birth_date", ""),
                email=mapped.get("email", ""),
                phone=mapped.get("phone", ""),
                appointment_date=mapped.get("appointment_date", ""),
                appointment_start=mapped.get("appointment_start", ""),
                appointment_end=mapped.get("appointment_end", ""),
                duration=mapped.get("duration", ""),
                agenda=mapped.get("agenda", ""),
                motif=mapped.get("motif", ""),
                consultation_type=consultation_type,
                id=mapped.get("id", ""),
                notes=mapped.get("notes", ""),
                date_saisie=mapped.get("date_saisie", ""),
                date_derniere_mise_a_jour=mapped.get("date_derniere_mise_a_jour", ""),
                cree_par=mapped.get("cree_par", ""),
                statut=mapped.get("statut", ""),
                rdv_internet=mapped.get("rdv_internet", ""),
                nouveau_patient=mapped.get("nouveau_patient", ""),
                honoraires_cb=mapped.get("honoraires_cb", ""),
                honoraires_especes=mapped.get("honoraires_especes", ""),
                honoraires_cheques=mapped.get("honoraires_cheques", ""),
                honoraires_tiers_payant=mapped.get("honoraires_tiers_payant", ""),
                honoraires_total_regle=mapped.get("honoraires_total_regle", ""),
                honoraires_restant_a_regler=mapped.get(
                    "honoraires_restant_a_regler", ""
                ),
                agenda_ressource=mapped.get("agenda_ressource", ""),
                civilite=mapped.get("civilite", ""),
                adresse=mapped.get("adresse", ""),
                code_postal=mapped.get("code_postal", ""),
                ville=mapped.get("ville", ""),
                heure_arrivee=mapped.get("heure_arrivee", ""),
                heure_prise_en_charge=mapped.get("heure_prise_en_charge", ""),
                heure_depart=mapped.get("heure_depart", ""),
                symptomes_covid19=mapped.get("symptomes_covid19", ""),
                identifiant_externe=mapped.get("identifiant_externe", ""),
                temps_reservation_secondes=mapped.get("temps_reservation_secondes", ""),
                a_ete_importe=mapped.get("a_ete_importe", ""),
                a_ete_reserve_absence=mapped.get("a_ete_reserve_absence", ""),
                a_ete_reserve_creneau_deja_reserve=mapped.get(
                    "a_ete_reserve_creneau_deja_reserve", ""
                ),
                a_ete_reserve_hors_horaires=mapped.get(
                    "a_ete_reserve_hors_horaires", ""
                ),
                dispositif_reservation_patients=mapped.get(
                    "dispositif_reservation_patients", ""
                ),
                personne_referente=mapped.get("personne_referente", ""),
                patient_notification_consent=mapped.get(
                    "patient_notification_consent", ""
                ),
                patient_legal_gender=mapped.get("patient_legal_gender", ""),
                telephone_secondaire=mapped.get("telephone_secondaire", ""),
                patient_insurance_sector=mapped.get("patient_insurance_sector", ""),
            )
        )

    return results

parse_doctolib_csv

parse_doctolib_csv(csv_content)

Parse semicolon-separated Doctolib CSV export.

Handles BOM (byte order mark) and strips whitespace from headers.

Source code in components/occupational_health/internal/business_logic/doctolib/csv_normalizer.py
def parse_doctolib_csv(csv_content: str) -> list[dict[str, str]]:
    """Parse semicolon-separated Doctolib CSV export.

    Handles BOM (byte order mark) and strips whitespace from headers.
    """
    # Strip BOM if present (common in Windows-exported CSVs)
    cleaned = csv_content.lstrip("\ufeff")
    reader = csv.DictReader(io.StringIO(cleaned), delimiter=";")
    # Strip whitespace from headers to avoid key mismatch
    if reader.fieldnames:
        reader.fieldnames = [f.strip() for f in reader.fieldnames]
    return list(reader)

read_gsheet_visit_rows

read_gsheet_visit_rows()

Read all rows from on-demand and predictable visit gsheets.

Source code in components/occupational_health/internal/business_logic/doctolib/queries.py
def read_gsheet_visit_rows() -> dict[str, list[dict[str, Any]]]:
    """Read all rows from on-demand and predictable visit gsheets."""
    from components.occupational_health.internal.helpers.gsheet_service import (
        OccupationalHealthGSheetService,
        SheetType,
    )

    spreadsheets_service = SpreadsheetsService.get(
        credentials_config_key="GOOGLE_GSPREAD_SERVICE_ACCOUNT_SECRET_NAME"
    )

    results: dict[str, list[dict[str, Any]]] = {}
    for sheet_type in (SheetType.ON_DEMAND, SheetType.PREDICTABLE):
        service = OccupationalHealthGSheetService(spreadsheets_service, sheet_type)
        rows = service.read_rows()
        results[sheet_type.value] = [
            {**row.data, "_row_index": row.row_index} for row in rows
        ]

    return results

doctolib_user_matching_v1

Queries for Occupational Health doctolib export matching tool in Marmot

TODO @david.barthelemy: clean this after the visit management tool v2 is released

find_matching_candidates

find_matching_candidates(search_row)

Find all matching candidates for a user from the doctolib export

Source code in components/occupational_health/public/queries/doctolib_user_matching_v1.py
def find_matching_candidates(
    search_row: UserVisitData,
) -> list[UserVisitData]:
    """
    Find all matching candidates for a user from the doctolib export
    """
    dependency = get_app_dependency()

    # Use the search from Marmot (because Ops do this hy hand)
    results = perform_search(
        f"{search_row.first_name} {search_row.last_name}",
        search_for_users=True,
    )

    users = results.users
    # Build candidates. It's k to live with N+1 queries for now regarding the very low volume
    candidates: list[UserVisitData] = []
    for user in users:
        data = asdict(search_row)
        user_id = UserId(str(user.id))
        ssn, _ = dependency.get_ssn_and_ntt_for_user(user_id)
        try:
            occupational_health_profile = get_occupational_health_profile_or_none(
                user_id
            )
        except UserIdNotFound:
            current_logger.error(
                f"[Occupational Health] Cannot match {user_id} because no global profile"
            )
            occupational_health_profile = None
        if not occupational_health_profile:
            continue
        active_affiliations = get_active_affiliations(
            occupational_health_profile_id=occupational_health_profile.id,
            on_date=utctoday(),
        )

        account_ids = {
            build_account_id_from_subscription_ref(
                mandatory(affiliation.subscription_ref)
            )
            for affiliation in active_affiliations
        }
        employments_by_user_id = get_employments_by_user_id(
            account_ids=account_ids,
            user_ids={user_id},
            should_overlap_prevenir_contract=False,
        )

        employments_for_user = []
        if user_id in employments_by_user_id:
            employments_for_user = [
                ep for ep in employments_by_user_id[user_id] if not ep.is_cancelled
            ]
        if len(employments_for_user) != 1:
            # We don't override the account_id because it's linked to multiple account. It will be investigated by hand from Ops
            # TODO @david.barthelemy: Send accounts as array and let Ops select the right one
            data.update(
                {
                    "user_id": user_id,
                    "INS_number": ssn,
                    "employment_start_date": None,
                    "account_id": None,
                    "company_id": None,
                    "account_name": None,
                    "email": user.email,
                }
            )
        else:
            employment = employments_for_user[0]
            company_id = employment.company_id
            account_id = dependency.get_account_id(CompanyId(company_id))
            account_name = dependency.get_account_name(account_id=account_id)
            data.update(
                {
                    "user_id": user_id,
                    "INS_number": ssn,
                    "employment_start_date": _format_date_french(employment.start_date),
                    "account_id": account_id,
                    "company_id": company_id,
                    "account_name": account_name,
                    "email": user.email,
                }
            )
        candidates.append(UserVisitData.from_dict(data))

    return candidates

parse_rows_from_gsheet_data

parse_rows_from_gsheet_data(tsv_data)

Parse TSV data from spreadsheet into MatchingUserVisitData objects

Source code in components/occupational_health/public/queries/doctolib_user_matching_v1.py
def parse_rows_from_gsheet_data(tsv_data: str) -> list[UserVisitData]:
    """Parse TSV data from spreadsheet into MatchingUserVisitData objects"""
    # TODO @david.barthelemy Use DictReader instead of home-made parsing
    rows: list[UserVisitData] = []

    lines = tsv_data.strip().split("\n")
    if not lines:
        return rows

    header = lines[0]
    headers = [h.strip() for h in header.split("\t")]

    if "first_name" not in (h.lower() for h in headers):
        raise ValueError("Headers must be set")

    columns_number = len(headers)

    for line in lines[1:]:
        if not line.strip():
            continue
        parts = line.split("\t")
        len_parts = len(parts)

        data = {
            headers[i]: parts[i].strip() if i < len_parts else None
            for i in range(columns_number)
        }
        rows.append(UserVisitData(**data))
    return rows

search_users_matching_candidates

search_users_matching_candidates(rows)

Search for matching candidates for each row

Source code in components/occupational_health/public/queries/doctolib_user_matching_v1.py
def search_users_matching_candidates(
    rows: list[UserVisitData],
) -> list[MatchingResult]:
    """Search for matching candidates for each row"""
    results = []

    for row in rows:
        candidates = find_matching_candidates(
            search_row=row,
        )

        results.append(
            MatchingResult(
                input_row=row,
                candidates=candidates,
            )
        )

    return results

has_active_occupational_health_contract

has_active_occupational_health_contract

has_active_occupational_health_contract(account_id)

Return whether the given account ID has an active occupational health contract at the moment.

Parameters:

Name Type Description Default
account_id UUID

The ID of the account to check.

required

Returns:

Name Type Description
bool bool

True if the account has an active occupational health contract, False otherwise.

Source code in components/occupational_health/public/queries/has_active_occupational_health_contract.py
def has_active_occupational_health_contract(account_id: UUID) -> bool:
    """
    Return whether the given account ID has an active occupational health contract at the moment.

    Args:
        account_id: The ID of the account to check.

    Returns:
        bool: True if the account has an active occupational health contract, False otherwise.
    """
    from components.occupational_health.internal.business_logic.contracting.queries.helpers import (
        has_currently_active_contract as _has_active_contract,
    )

    return _has_active_contract(AccountId(account_id))

has_active_or_upcoming_occupational_health_contract

has_active_or_upcoming_occupational_health_contract(
    account_id,
)

Return whether the given account ID has an active or upcoming occupational health contract.

Parameters:

Name Type Description Default
account_id UUID

The ID of the account to check.

required

Returns: bool: True if the account has an active or upcoming occupational health contract, False otherwise.

Source code in components/occupational_health/public/queries/has_active_occupational_health_contract.py
def has_active_or_upcoming_occupational_health_contract(account_id: UUID) -> bool:
    """
    Return whether the given account ID has an active or upcoming occupational health contract.

    Args:
        account_id: The ID of the account to check.
    Returns:
        bool: True if the account has an active or upcoming occupational health contract, False otherwise.
    """
    from components.occupational_health.internal.business_logic.contracting.queries.helpers import (
        has_currently_active_or_upcoming_contract as _has_active_or_upcoming_contract,
    )

    return _has_active_or_upcoming_contract(AccountId(account_id))

health_professional

get_or_raise_health_professional

get_or_raise_health_professional(health_professional_id)

Get or raise health professional by ID.

Source code in components/occupational_health/internal/queries/health_professional.py
def get_or_raise_health_professional(
    health_professional_id: UUID,
) -> HealthProfessional:
    """
    Get or raise health professional by ID.
    """
    health_professional = (
        current_session.execute(
            select(OccupationalHealthHealthProfessional).where(
                OccupationalHealthHealthProfessional.id == health_professional_id
            )
        )
        .scalars()
        .one()
    )

    return HealthProfessional(
        id=health_professional.id,
        first_name=health_professional.first_name,
        last_name=health_professional.last_name,
        role=health_professional.role,
    )

invoices

Public interface for invoice queries.

Re-exports internal query functions for use by external components.

get_invoice_appendix_employee_data_for_profile_ids

get_invoice_appendix_employee_data_for_profile_ids(
    profile_ids,
)

Returns the data needed to build the invoice appendix for each employee per profile id.

:param profile_ids: iterable of profile ids :return: dict of InvoiceAppendixEmployeeData per profile id

Source code in components/occupational_health/public/queries/invoices.py
def get_invoice_appendix_employee_data_for_profile_ids(
    profile_ids: Iterable[ProfileId],
) -> dict[ProfileId, InvoiceAppendixEmployeeData]:
    """
    Returns the data needed to build the invoice appendix for each employee per profile id.

    :param profile_ids: iterable of profile ids
    :return: dict of InvoiceAppendixEmployeeData per profile id
    """
    # get global profiles
    global_profile_id_per_profile_id = (
        get_global_profile_ids_by_occupational_health_profile_id_mapping(
            profile_ids=profile_ids
        )
    )
    global_profiles = ProfileService.create().get_profiles(
        profile_ids=global_profile_id_per_profile_id.values()
    )
    global_profile_by_global_profile_id = {p.id: p for p in global_profiles}

    # get ssn and ntt
    user_id_by_profile = get_user_ids_by_occupational_health_profile_ids_mapping(
        profile_ids
    )
    dependency = get_app_dependency()
    ssn_and_ntt_by_user_id = dependency.get_ssn_and_ntt_for_users(
        user_id_by_profile.values()
    )

    result: dict[ProfileId, InvoiceAppendixEmployeeData] = {}
    for profile_id in profile_ids:
        global_profile_id = global_profile_id_per_profile_id.get(profile_id)
        if global_profile_id is None:
            current_logger.warning(
                "No global profile id found for occupational health profile id",
                occupational_health_profile_id=str(profile_id),
            )
            result[profile_id] = InvoiceAppendixEmployeeData()
            continue
        global_profile = global_profile_by_global_profile_id.get(global_profile_id)
        if global_profile is None:
            current_logger.warning(
                "No global profile found for global profile id",
                profile_id=str(global_profile_id),
                occupational_health_profile_id=str(profile_id),
            )
            result[profile_id] = InvoiceAppendixEmployeeData()
            continue

        user_id = user_id_by_profile.get(profile_id)
        if user_id is None:
            current_logger.warning(
                "No user id found for occupational health profile id",
                occupational_health_profile_id=str(profile_id),
            )
            result[profile_id] = InvoiceAppendixEmployeeData()
            continue

        (ssn, ntt) = ssn_and_ntt_by_user_id.get(user_id, (None, None))

        result[profile_id] = InvoiceAppendixEmployeeData(
            first_name=global_profile.first_name,
            last_name=global_profile.last_name,
            ssn=ssn,
            ntt=ntt,
        )

    return result

get_invoice_file_for_account

get_invoice_file_for_account(
    account_id, invoice_id, file_type="invoice"
)

Get file data for a specific invoice that belongs to the given account.

Parameters:

Name Type Description Default
account_id AccountId

The account ID to verify ownership

required
invoice_id int

The invoice ID to fetch

required
file_type Literal['invoice', 'appendix']

Which file to return — "invoice" for the PDF, "appendix" for the CSV

'invoice'

Returns:

Type Description
InvoiceFileData

InvoiceFileData with the filename and raw file

Raises:

Type Description
missing_resource

If invoice not found or doesn't belong to account

remote_file_retrieval

If the requested file is not attached

Source code in components/occupational_health/internal/queries/invoices.py
def get_invoice_file_for_account(
    account_id: AccountId,
    invoice_id: int,
    file_type: Literal["invoice", "appendix"] = "invoice",
) -> InvoiceFileData:
    """
    Get file data for a specific invoice that belongs to the given account.

    Args:
        account_id: The account ID to verify ownership
        invoice_id: The invoice ID to fetch
        file_type: Which file to return — "invoice" for the PDF, "appendix" for the CSV

    Returns:
        InvoiceFileData with the filename and raw file

    Raises:
        BaseErrorCode.missing_resource: If invoice not found or doesn't belong to account
        BaseErrorCode.remote_file_retrieval: If the requested file is not attached
    """
    from components.fr.internal.billing.models.invoice import Invoice  # noqa: ALN069

    billed_entities = get_billed_entities_for_account(account_id=account_id)
    if len(billed_entities) == 0:
        raise BaseErrorCode.missing_resource(
            message="No billed entities found for account"
        )

    # Get all contract refs for this account
    contract_refs = [billed_entity.contract_ref for billed_entity in billed_entities]
    # Query the invoice, ensuring it belongs to the account
    invoice: Invoice | None = current_session.execute(
        select(Invoice).where(
            Invoice.id == invoice_id,
            Invoice.contract_type == ContractType.occupational_health,
            Invoice.contract_ref.in_(contract_refs),
        )
    ).scalar_one_or_none()

    if invoice is None:
        raise BaseErrorCode.missing_resource(message="No invoice found for account")

    full_date = invoice.event_date.strftime("%Y%m%d")

    if file_type == "appendix":
        if invoice.appendix_uri is None:
            raise BaseErrorCode.remote_file_retrieval(
                message="Invoice has no attached appendix"
            )
        filename = f"Prévenir {full_date} - Annexe Médecine du travail - {invoice.invoice_number}.csv"
        return InvoiceFileData(
            filename=filename,
            file=invoice.get_or_download_file(
                file_field="appendix_file", uri_field="appendix_uri"
            ),
        )

    if invoice.uri is None:
        raise BaseErrorCode.remote_file_retrieval(message="Invoice has no attached pdf")

    filename = f"Prévenir - Facture Médecine du travail - {full_date} - {invoice.invoice_number}.pdf"

    return InvoiceFileData(
        filename=filename,
        file=invoice.get_or_download_file(),
    )

get_issued_invoices_for_account

get_issued_invoices_for_account(account_id)

Get all actual issued invoices for all billed entities of an account.

Parameters:

Name Type Description Default
account_id UUID

The account ID to fetch invoices for

required

Returns:

Type Description
list[IssuedInvoiceData]

List of IssuedInvoiceData for all invoices linked to this account's billed entities,

list[IssuedInvoiceData]

sorted by event_date descending.

Source code in components/occupational_health/internal/queries/invoices.py
def get_issued_invoices_for_account(account_id: UUID) -> list[IssuedInvoiceData]:
    """
    Get all actual issued invoices for all billed entities of an account.

    Args:
        account_id: The account ID to fetch invoices for

    Returns:
        List of IssuedInvoiceData for all invoices linked to this account's billed entities,
        sorted by event_date descending.
    """
    billed_entities = get_billed_entities_for_account(account_id=account_id)

    if not billed_entities:
        return []

    # Build a mapping of contract_ref -> billed entity info
    entity_info_by_contract_ref: dict[str, dict[str, str | None]] = {}
    contract_refs: list[str] = []
    for entity in billed_entities:
        contract_ref = entity.contract_ref
        contract_refs.append(contract_ref)
        entity_info_by_contract_ref[contract_ref] = {
            "entity_name": entity.entity_name,
            "siren": entity.siren,
            "siret": entity.siret,
        }

    # Query invoices for these contract refs
    from components.fr.internal.billing.models.invoice import Invoice  # noqa: ALN069

    invoices_query = (
        select(Invoice)
        .where(
            Invoice.contract_type == ContractType.occupational_health,
            Invoice.contract_ref.in_(contract_refs),
        )
        .order_by(Invoice.event_date.desc())
    )
    invoices = current_session.execute(invoices_query).scalars().all()

    # Build response dataclass list
    result: list[IssuedInvoiceData] = []
    for invoice in invoices:
        entity_info = entity_info_by_contract_ref.get(invoice.contract_ref, {})
        result.append(
            IssuedInvoiceData(
                invoice_id=invoice.id,
                invoice_number=invoice.invoice_number,
                event_date=invoice.event_date,
                issued_date=invoice.issued_date,
                due_date=invoice.due_date,
                total_invoice_amount=invoice.total_invoice_amount,
                remaining_balance=invoice.remaining_balance,
                contract_ref=invoice.contract_ref,
                entity_name=entity_info.get("entity_name") or "",
                siren=entity_info.get("siren") or "",
                siret=entity_info.get("siret"),
                has_appendix=invoice.appendix_uri is not None,
                email_sent_at=invoice.email_sent_at,
            )
        )

    return result

is_currently_affiliated_to_prevenir

is_currently_affiliated_to_occupational_health

is_currently_affiliated_to_occupational_health(
    global_profile_id, account_id, commit=True
)

Check if the user is currently affiliated to Prevenir.

Will create a OccupationalHealthProfile if it doesn't exist, hence the commit parameter.

Returns True if the user has active affiliations, otherwise False.

Source code in components/occupational_health/public/queries/is_currently_affiliated_to_prevenir.py
def is_currently_affiliated_to_occupational_health(
    global_profile_id: GlobalProfileId | UUID,
    account_id: AccountId | UUID,
    commit: bool = True,
) -> bool:
    """
    Check if the user is currently affiliated to Prevenir.

    Will create a OccupationalHealthProfile if it doesn't exist, hence the commit parameter.

    Returns True if the user has active affiliations, otherwise False.
    """
    global_profile_id = GlobalProfileId(global_profile_id)
    account_id = AccountId(account_id)

    occupational_health_profile_id = get_or_create_profile_id_from_global_id(
        global_profile_id,
        commit=commit,
    )

    # If you pass an user (a global profile ID) and an account ID, you can get at most one affiliation.
    active_affiliations = get_active_affiliations(
        occupational_health_profile_id=occupational_health_profile_id,
        for_account_id=account_id,
    )

    if active_affiliations:
        return True

    return False

profiles

get_profile_dmst

get_profile_dmst(
    occupational_health_profile_id, profile_service
)

Returns the member information for a given occupational health profile ID.

Source code in components/occupational_health/internal/business_logic/queries/profiles/profiles.py
@inject_profile_service
def get_profile_dmst(
    occupational_health_profile_id: UUID, profile_service: ProfileService
) -> Dmst:
    """
    Returns the member information for a given occupational health profile ID.
    """

    occupational_health_profile = current_session.get_one(
        OccupationalHealthProfile,
        occupational_health_profile_id,
        options=[joinedload(OccupationalHealthProfile.employment_data)],
    )
    profile = profile_service.get_or_raise_profile(
        occupational_health_profile.global_profile_id
    )
    jobs = OccupationalHealthJobBroker.get_existing_jobs(
        occupational_health_profile_id
    ).all()
    administrative_profile = (
        OccupationalHealthAdministrativeProfileBroker.get_by_profile(
            ProfileId(occupational_health_profile_id)
        ).one_or_none()
    )

    # Temporary getting the SSN while we implement the INS clinic side
    user_id = get_app_dependency().get_user_id_from_global_profile_id(
        GlobalProfileId(profile.id)
    )
    ssn, _ = get_app_dependency().get_ssn_and_ntt_for_user(user_id)

    personal_email = occupational_health_profile.personal_email
    phone_number = occupational_health_profile.phone_number

    latest_employment_data = (
        occupational_health_profile.employment_data[-1]
        if occupational_health_profile.employment_data
        else None
    )
    professional_email = (
        latest_employment_data.professional_email if latest_employment_data else None
    )

    # Get Google drive folder ID from constant
    # This should be updated in the future to a more dynamic approach
    google_drive_folder_id = (
        ACCOUNT_GOOGLE_DRIVE_FOLDER.get(latest_employment_data.account_id)
        if latest_employment_data
        else None
    )

    # If we do have an account ID, but we can't find a Google drive folder for it, log it
    # Don't log if we don't have the latest employment data or if no account ID is linked to it
    if (
        latest_employment_data and latest_employment_data.account_id
    ) and not google_drive_folder_id:
        current_logger.error(
            "Could not find Google Drive folder ID for account",
            account_id=latest_employment_data.account_id,
        )

    return Dmst(
        occupational_health_profile_id=occupational_health_profile.id,
        first_name=profile.first_name or "⁉️",
        last_name=profile.last_name or "⁉️",
        gender=profile.gender,
        birthdate=profile.birth_date,
        medical_secrecy_worker_record_id=occupational_health_profile.medical_secrecy_worker_record_id,
        curriculum_laboris=CurriculumLaboris(
            occupational_medical_history=OccupationalMedicalHistory(
                types=[],  # In medical secrecy record (see get_medical_record)
                description=None,  # In medical secrecy record (see get_medical_record)
            ),
            jobs=(
                [
                    MemberJob(
                        id=job.id,
                        title=job.title,
                        start_date=job.start_date,
                        end_date=job.end_date,
                        employer=job.employer,
                        # The last job (oldest) is a generic "past job" job
                        is_past_job=job.is_past_job,
                        medical_secrecy_worker_job_id=job.medical_secrecy_worker_job_id,
                        description=None,  # In medical secrecy worker job (see get_medical_record)
                        working_hours=None,  # In medical secrecy worker job (see get_medical_record)
                        missions=None,  # In medical secrecy worker job (see get_medical_record)
                        physical_conditions=None,  # In medical secrecy worker job (see get_medical_record)
                        organizational_conditions=None,  # In medical secrecy worker job (see get_medical_record)
                        mental_conditions=None,  # In medical secrecy worker job (see get_medical_record)
                        tools=None,  # In medical secrecy worker job (see get_medical_record)
                        worn_equipment=None,  # In medical secrecy worker job (see get_medical_record)
                        risks_and_advice=None,  # In medical secrecy worker job (see get_medical_record)
                        contract_type=None,  # In medical secrecy worker job (see get_medical_record)
                        exposition_notations=[],  # In medical secrecy worker job (see get_medical_record)
                    )
                    for (index, job) in enumerate(jobs)
                ]
            ),
        ),
        administrative_profile=(
            AdministrativeProfile(
                ssn=ssn,
                health_statuses=administrative_profile.health_statuses,
                risk_category=administrative_profile.risk_category,
                notes=administrative_profile.notes,
                medical_record_sharing_consent=administrative_profile.medical_record_sharing_consent,
                medical_record_access_consent=administrative_profile.medical_record_access_consent,
                health_data_sharing_consent=administrative_profile.health_data_sharing_consent,
                video_consultation_consent=administrative_profile.video_consultation_consent,
                orient_prevention_of_work_consent=administrative_profile.orient_prevention_of_work_consent,
                transfer_dmst_to_spsti_consent=administrative_profile.transfer_dmst_to_spsti_consent,
                medical_record_sharing_consent_collected_at=administrative_profile.medical_record_sharing_consent_collected_at,
                medical_record_access_consent_collected_at=administrative_profile.medical_record_access_consent_collected_at,
                health_data_sharing_consent_collected_at=administrative_profile.health_data_sharing_consent_collected_at,
                video_consultation_consent_collected_at=administrative_profile.video_consultation_consent_collected_at,
                orient_prevention_of_work_consent_collected_at=administrative_profile.orient_prevention_of_work_consent_collected_at,
                transfer_dmst_to_spsti_consent_collected_at=administrative_profile.transfer_dmst_to_spsti_consent_collected_at,
                personal_email=personal_email,
                professional_email=professional_email,
                phone_number=phone_number,
            )
            if administrative_profile
            else AdministrativeProfile(
                ssn=ssn,
                health_statuses=[],
                risk_category=RiskCategory.SI,
                notes=None,
                medical_record_sharing_consent=None,
                medical_record_access_consent=None,
                health_data_sharing_consent=None,
                video_consultation_consent=None,
                orient_prevention_of_work_consent=None,
                transfer_dmst_to_spsti_consent=None,
                personal_email=personal_email,
                professional_email=professional_email,
                phone_number=phone_number,
            )
        ),
        health_history=HealthHistory(),  # In Medical secrecy Medical record (see get_medical_record)
        google_drive_folder_id=google_drive_folder_id,
    )

search

get_search_results

get_search_results(profile_service, search_input)

Search for members and accounts based on provided search input.

Note

The function currently only searches for members by full name

Source code in components/occupational_health/internal/business_logic/queries/search/search.py
@inject_profile_service
def get_search_results(
    profile_service: ProfileService, search_input: str
) -> SearchResult:
    """
    Search for members and accounts based on provided search input.

    Note:
        The function currently only searches for members by full name
    """
    from components.occupational_health.public.dependencies import get_app_dependency

    dependency = get_app_dependency()
    # Searching for the global profiles and associated occupational health profiles
    global_profiles = profile_service.search_profiles_by_fullname(search_input)
    global_profiles_by_id: dict[GlobalProfileId, list[Profile]] = group_by(
        global_profiles, lambda profile: GlobalProfileId(profile.id)
    )
    global_profiles_ids = list(global_profiles_by_id.keys())
    profile_with_jobs = (
        get_occupational_health_profile_by_global_profile_ids_with_latest_job(
            global_profiles_ids
        )
    )
    occupational_health_profiles = [profile for profile, jobs in profile_with_jobs]
    occupational_health_profiles_to_latest_health_job_mapping = {
        profile.id: job
        for (
            profile,
            job,
        ) in profile_with_jobs
    }
    occupational_health_profile_ids_by_administrative_profiles = {
        occupational_health_profile.id: occupational_health_profile.administrative_profile
        for occupational_health_profile in occupational_health_profiles
    }

    occupational_health_profile_ids_by_global_profile_id = (
        get_occupational_health_profile_ids_by_global_profile_id_mapping(
            global_profiles_ids
        )
    )
    company_names_by_occupational_health_profile_ids = (
        get_company_names_by_occupational_health_profile_id_mapping(
            occupational_health_profiles
        )
    )
    global_profile_ids_by_occupational_health_profile_ids = {
        value: key
        for key, value in occupational_health_profile_ids_by_global_profile_id.items()
    }

    affiliated_account_ids = get_all_account_ids_with_active_contract()
    matched_accounts = dependency.search_accounts_by_name(
        search_input, affiliated_account_ids
    )

    def get_latest_job_name_by_occupational_health_profile_id(
        occupational_health_profile_id: ProfileId,
    ) -> Optional[str]:
        latest_job = occupational_health_profiles_to_latest_health_job_mapping[
            occupational_health_profile_id
        ]
        return latest_job.title if latest_job else None

    def get_profile_by_occupational_health_profile_id(
        occupational_health_profile_id: ProfileId,
    ) -> Profile:
        global_profile_id = global_profile_ids_by_occupational_health_profile_ids[
            occupational_health_profile_id
        ]
        return one(global_profiles_by_id[global_profile_id])

    def get_risk_category_by_occupational_health_profile_id(
        occupational_health_profile_id: ProfileId,
    ) -> Optional[RiskCategory]:
        administrative_profile = (
            occupational_health_profile_ids_by_administrative_profiles[
                occupational_health_profile_id
            ]
        )
        return administrative_profile.risk_category if administrative_profile else None

    def get_company_names_by_occupational_health_profile_id(
        occupational_health_profile_id: ProfileId,
    ) -> Optional[str]:
        names = company_names_by_occupational_health_profile_ids.get(
            occupational_health_profile_id, None
        )
        return ",".join(names) if names else None

    return SearchResult(
        members=[
            Member(
                occupational_health_profile_id=str(occupational_health_profile_id),
                first_name=get_profile_by_occupational_health_profile_id(
                    occupational_health_profile_id
                ).first_name,
                last_name=get_profile_by_occupational_health_profile_id(
                    occupational_health_profile_id
                ).last_name,
                job_title=get_latest_job_name_by_occupational_health_profile_id(
                    occupational_health_profile_id
                ),
                company_name=get_company_names_by_occupational_health_profile_id(
                    occupational_health_profile_id
                ),
                risk_category=get_risk_category_by_occupational_health_profile_id(
                    occupational_health_profile_id
                ),
            )
            for occupational_health_profile_id in occupational_health_profile_ids_by_global_profile_id.values()
        ],
        accounts=[
            Account(id=str(account.id), name=account.name)
            for account in matched_accounts
        ],
        companies=[],  # TODO @david.barthelemy Remove this when frontend is deployed
    )

strategy_rules

AffiliationStrategyRuleForMarmot dataclass

AffiliationStrategyRuleForMarmot(
    rule_id,
    account_id,
    name,
    company_id,
    company_name,
    siret,
    company_name_from_siret,
    is_missing_siret,
    action,
    decisions_count,
    created_at,
    updated_at,
)

Bases: DataClassJsonMixin

Affiliation strategy rules displayed in Marmot.

account_id instance-attribute
account_id
action instance-attribute
action
company_id instance-attribute
company_id
company_name instance-attribute
company_name
company_name_from_siret instance-attribute
company_name_from_siret
created_at instance-attribute
created_at
decisions_count instance-attribute
decisions_count
is_missing_siret instance-attribute
is_missing_siret
name instance-attribute
name
rule_id instance-attribute
rule_id
siret instance-attribute
siret
updated_at instance-attribute
updated_at

get_affiliation_strategy_rules_for_account

get_affiliation_strategy_rules_for_account(account_id)

Get all affiliation strategy rules for a given account.

Source code in components/occupational_health/public/queries/strategy_rules.py
def get_affiliation_strategy_rules_for_account(
    account_id: AccountId | UUID,
) -> list[AffiliationStrategyRuleForMarmot]:
    """Get all affiliation strategy rules for a given account."""
    account_id = AccountId(account_id)

    rules = current_session.scalars(
        select(AffiliationStrategyRule)
        .where(AffiliationStrategyRule.account_id == account_id)
        # Maybe we should order them by priority instead
        .order_by(AffiliationStrategyRule.created_at.desc())
    ).all()

    rule_ids = [rule.id for rule in rules]
    decisions_count_by_rule_id: dict[UUID, int] = {}
    if rule_ids:
        counts = current_session.execute(
            select(
                OccupationalHealthAffiliationDecision.rule_id,
                func.count().label("decisions_count"),
            )
            .where(OccupationalHealthAffiliationDecision.rule_id.in_(rule_ids))
            .group_by(OccupationalHealthAffiliationDecision.rule_id)
        ).all()
        decisions_count_by_rule_id = {
            row.rule_id: row.decisions_count for row in counts
        }

    all_sirets = set(rule.siret for rule in rules if rule.siret)
    company_name_by_siret = {}
    for siret in all_sirets:
        company_data = get_app_dependency().get_company_from_siret(siret)
        if company_data:
            company_name = f"{company_data.name} ({company_data.city})"
            company_name_by_siret[siret] = company_name

    all_company_ids = set(rule.company_id for rule in rules if rule.company_id)
    company_name_by_company_id = get_app_dependency().get_company_names_by_ids(
        all_company_ids
    )

    return [
        AffiliationStrategyRuleForMarmot(
            rule_id=rule.id,
            account_id=rule.account_id,
            name=rule.name,
            company_name=company_name_by_company_id.get(rule.company_id)
            if rule.company_id
            else None,
            company_id=rule.company_id,
            siret=rule.siret,
            company_name_from_siret=company_name_by_siret.get(rule.siret)
            if rule.siret
            else None,
            is_missing_siret=rule.is_missing_siret,
            action=rule.action,
            decisions_count=decisions_count_by_rule_id.get(rule.id, 0),
            created_at=rule.created_at,
            updated_at=rule.updated_at,
        )
        for rule in rules
    ]

subscriber_documents

get_documents_for_subscriber

get_documents_for_subscriber(account_id)

Returns all non-deleted documents for a given subscriber account ID.

Source code in components/occupational_health/internal/business_logic/queries/subscriber_documents.py
def get_documents_for_subscriber(account_id: UUID) -> list[SubscriberDocument]:
    """Returns all non-deleted documents for a given subscriber account ID."""
    from components.occupational_health.public.queries.subscribers import (
        get_subscriber_entities_for_account,
    )

    documents = (
        current_session.execute(
            select(OccupationalHealthSubscriberDocument)
            .where(
                and_(
                    OccupationalHealthSubscriberDocument.account_id == account_id,
                    OccupationalHealthSubscriberDocument.deleted_at.is_(None),
                )
            )
            # Most recent first and add the ID in case of same date: always same order
            .order_by(
                OccupationalHealthSubscriberDocument.uploaded_at.desc(),
                OccupationalHealthSubscriberDocument.id,
            )
        )
        .scalars()
        .all()
    )

    entities = list(get_subscriber_entities_for_account(account_id))
    entity_name_by_siret = {e.siret: e.name for e in entities}
    entity_address_by_siret: dict[str, str | None] = {
        e.siret: f"{e.postal_code} {e.city}" if e.postal_code and e.city else None
        for e in entities
    }

    return [
        SubscriberDocument(
            id=document.id,
            public_uri=document.public_uri,
            filename=document.filename,
            account_id=document.account_id,
            siret=document.siret,
            company_name=entity_name_by_siret.get(document.siret)
            if document.siret
            else None,
            company_address=entity_address_by_siret.get(document.siret)
            if document.siret
            else None,
            uploaded_at=document.uploaded_at,
            document_type=document.document_type,
            mime_type=document.mime_type,
        )
        for document in documents
    ]

subscribers

get_affiliated_members_for_account

get_affiliated_members_for_account(
    account_id, profile_service
)

Return the list of currently affiliated members for a subscriber profile.

Source code in components/occupational_health/internal/business_logic/queries/subscribers/members.py
@inject_profile_service
@tracer_wrap()
def get_affiliated_members_for_account(
    account_id: AccountId,
    profile_service: ProfileService,
) -> list[SubscriberMember]:
    """
    Return the list of currently affiliated members for a subscriber profile.
    """
    affiliations = get_all_affiliations_for_account(
        account_id=account_id,
        only_active_on=utctoday(),
    )

    if not affiliations:
        return []

    occupational_health_profile_ids = {
        build_profile_id_from_affiliable_ref(affiliation.affiliable_ref)
        for affiliation in affiliations
    }

    profiles = list(
        current_session.execute(
            select(OccupationalHealthProfile).where(
                OccupationalHealthProfile.id.in_(occupational_health_profile_ids)
            )
        ).scalars()
    )
    profile_by_id = {profile.id: profile for profile in profiles}
    global_profile_ids = [profile.global_profile_id for profile in profiles]

    profiles_with_jobs = (
        get_occupational_health_profile_by_global_profile_ids_with_latest_job(
            global_profile_ids=global_profile_ids
        )
    )
    job_by_profile_id = {profile.id: job for profile, job in profiles_with_jobs}

    global_profiles = profile_service.get_profiles(global_profile_ids)
    global_profiles_by_id = {profile.id: profile for profile in global_profiles}

    company_names_by_profile_id = (
        get_company_names_by_occupational_health_profile_id_mapping(profiles)
    )

    risk_category_by_profile_id = get_current_risk_category_for_profiles(
        profile_ids=occupational_health_profile_ids,
    )

    employments_by_global_profile_id = get_employments_by_global_profile_id(
        account_id=account_id,
        global_profile_ids=global_profile_ids,
    )

    user_id_to_global_profile_id = get_user_id_to_global_profile_id_mapping(
        global_profile_ids=global_profile_ids
    )
    global_profile_id_to_user_id = {
        gp_id: user_id for user_id, gp_id in user_id_to_global_profile_id.items()
    }
    user_id_by_profile_id: dict[UserId, ProfileId] = {
        global_profile_id_to_user_id[profile.global_profile_id]: profile.id
        for profile in profiles
        if profile.global_profile_id in global_profile_id_to_user_id
    }

    # Fetch visits
    last_visit_by_profile_id = get_last_visit_for_profiles(
        profile_ids=occupational_health_profile_ids,
        user_id_mapping=user_id_by_profile_id,
    )
    next_visit_by_profile_id = get_future_visit_for_profiles(
        profile_ids=occupational_health_profile_ids,
        user_id_mapping=user_id_by_profile_id,
        employments_by_global_profile_id=employments_by_global_profile_id,
        affiliations=affiliations,
        risk_category_by_profile_id=risk_category_by_profile_id,
    )

    members = []
    for affiliation in affiliations:
        profile_id = build_profile_id_from_affiliable_ref(affiliation.affiliable_ref)

        if profile_id not in profile_by_id:
            current_logger.error(
                f"Profile {profile_id} not found for affiliation {affiliation.affiliation_id}",
            )
            continue

        profile = profile_by_id[profile_id]
        global_profile = global_profiles_by_id.get(profile.global_profile_id)

        if not global_profile:
            current_logger.error(
                f"Global profile {profile.global_profile_id} not found",
            )
            continue

        next_visit = next_visit_by_profile_id[profile_id]

        job = job_by_profile_id.get(profile_id)
        job_title = job.title if job else None

        company_names = list(company_names_by_profile_id.get(profile_id, set()))

        members.append(
            SubscriberMember(
                occupational_health_profile_id=str(profile_id),
                first_name=global_profile.first_name or "",
                last_name=global_profile.last_name or "",
                job_title=job_title,
                company_names=company_names,
                risk_category=risk_category_by_profile_id.get(profile_id),
                last_visit=last_visit_by_profile_id.get(profile_id),
                next_visit=next_visit,
                status=get_member_status(
                    next_visit=next_visit,
                ),
            )
        )

    return members

get_subscriber_entities_for_account

get_subscriber_entities_for_account(account_id)

Get all potential entities (SIRET) for an account.

Note: "potential" because not all entities returned will be affiliated to the account.

Source code in components/occupational_health/internal/business_logic/queries/subscribers/subscribers.py
def get_subscriber_entities_for_account(account_id: UUID) -> list[DSNCompanyData]:
    """
    Get all potential entities (SIRET) for an account.

    Note: "potential" because not all entities returned will be affiliated to the account.
    """
    dependency = get_app_dependency()

    return dependency.get_company_entities_by_account_id(AccountId(account_id))

get_subscriber_for_account

get_subscriber_for_account(account_id)

Get a subscriber for an account.

Source code in components/occupational_health/internal/business_logic/queries/subscribers/subscribers.py
def get_subscriber_for_account(account_id: UUID) -> Subscriber | None:
    """Get a subscriber for an account."""
    dependency = get_app_dependency()

    try:
        account_name = dependency.get_account_name(AccountId(account_id))
    except NoResultFound:
        return None

    companies = _get_companies_for_subscriber_by_account_id(account_id)

    return Subscriber(
        id=account_id,
        name=account_name,
        companies=companies,
    )

visits

get_all_visits_for_member

get_all_visits_for_member(
    profile_service, occupational_health_profile_id
)

Get all visits for a specific member

Parameters:

Name Type Description Default
profile_service ProfileService

The profile service for fetching member information

required
occupational_health_profile_id ProfileId | UUID

Optional filter by occupational health profile ID

required

Returns:

Type Description
list[VisitInfo]

List[VisitInfo]: List of visits scheduled for the specified date

Source code in components/occupational_health/internal/business_logic/queries/visits/medical_app.py
@inject_profile_service
def get_all_visits_for_member(
    profile_service: ProfileService,
    occupational_health_profile_id: ProfileId | UUID,
) -> list[VisitInfo]:
    """
    Get all visits for a specific member

    Args:
        profile_service: The profile service for fetching member information
        occupational_health_profile_id: Optional filter by occupational health profile ID

    Returns:
        List[VisitInfo]: List of visits scheduled for the specified date
    """

    if isinstance(occupational_health_profile_id, UUID):
        occupational_health_profile_id = ProfileId(occupational_health_profile_id)

    occupational_health_profile = (
        current_session.query(OccupationalHealthProfile)  # noqa: ALN085
        .filter(OccupationalHealthProfile.id == occupational_health_profile_id)
        .one_or_none()
    )
    if not occupational_health_profile:
        raise Exception("Occupational Health Profile not found")

    global_profile_id = occupational_health_profile.global_profile_id
    user_id = get_app_dependency().get_user_id_from_global_profile_id(global_profile_id)

    subquery_backend = current_session.query(  # noqa: ALN085
        OccupationalHealthVisit.id.label("visit_id"),
        OccupationalHealthVisit.profile_id.label("profile_id"),
        null().label("user_id"),
        OccupationalHealthVisit.visit_date.label("visit_date"),
        null().label("visit_hour_booked"),
        OccupationalHealthVisit.visit_type.label("visit_type"),
        literal_column(f"'{VisitStatus.HAPPENED}'").label("status"),
        OccupationalHealthVisit.health_professional_id.label("health_professional_id"),
        OccupationalHealthVisit.visit_setup.label("visit_setup"),
    ).filter(OccupationalHealthVisit.profile_id == occupational_health_profile_id)

    subquery_predictable = current_session.query(  # noqa: ALN085
        TuringOccupationalHealthPredictableVisit.id.label("visit_id"),
        null().label("profile_id"),
        TuringOccupationalHealthPredictableVisit.user_id.label("user_id"),
        TuringOccupationalHealthPredictableVisit.date_planned.label("visit_date"),
        TuringOccupationalHealthPredictableVisit.hour_booked.label("visit_hour_booked"),
        TuringOccupationalHealthPredictableVisit.visit_type.label("visit_type"),
        TuringOccupationalHealthPredictableVisit.status.label("status"),
        TuringOccupationalHealthPredictableVisit.hp_visit_owner_id.label(
            "health_professional_id"
        ),
        TuringOccupationalHealthPredictableVisit.visit_setup.label("visit_setup"),
    ).filter(TuringOccupationalHealthPredictableVisit.user_id == str(user_id))
    subquery_on_demand = current_session.query(  # noqa: ALN085
        TuringOccupationalHealthOnDemandVisit.id.label("visit_id"),
        null().label("profile_id"),
        TuringOccupationalHealthOnDemandVisit.user_id.label("user_id"),
        TuringOccupationalHealthOnDemandVisit.date_planned.label("visit_date"),
        TuringOccupationalHealthOnDemandVisit.hour_booked.label("visit_hour_booked"),
        TuringOccupationalHealthOnDemandVisit.visit_type.label("visit_type"),
        TuringOccupationalHealthOnDemandVisit.status.label("status"),
        TuringOccupationalHealthOnDemandVisit.hp_visit_owner_id.label(
            "health_professional_id"
        ),
        TuringOccupationalHealthOnDemandVisit.visit_setup.label("visit_setup"),
    ).filter(TuringOccupationalHealthOnDemandVisit.user_id == str(user_id))

    subquery = subquery_backend.union_all(
        subquery_predictable,
        subquery_on_demand,
    ).subquery()

    all_visits = current_session.query(subquery).all()  # noqa: ALN085

    # Fetch HPs
    health_professional_mapping = {
        hp.id: hp
        for hp in current_session.query(  # noqa: ALN085
            OccupationalHealthHealthProfessional.id,
            OccupationalHealthHealthProfessional.first_name,
            OccupationalHealthHealthProfessional.last_name,
        )
    }

    # Convert to dataclass instances
    result = []
    profile = profile_service.get_or_raise_profile(
        occupational_health_profile.global_profile_id
    )
    for visit in all_visits:
        # Get health professional name
        health_professional = health_professional_mapping.get(
            visit.health_professional_id
        )
        health_professional_name = (
            f"{health_professional.first_name} {health_professional.last_name}".strip()
            if health_professional
            else None
        )

        result.append(
            VisitInfo(
                visit_id=visit.visit_id,
                member_first_name=profile.first_name or "⁉️",
                member_last_name=profile.last_name or "⁉️",
                member_gender=profile.gender,
                member_birthdate=profile.birth_date,
                visit_date=visit.visit_date,
                visit_hour_booked=_format_visit_hour_booked(visit.visit_hour_booked),
                visit_type=visit.visit_type,
                health_professional_name=health_professional_name,
                health_professional_id=visit.health_professional_id,
                visit_status=visit.status,
                occupational_health_profile_id=occupational_health_profile_id,
                visit_setup=visit.visit_setup,
            )
        )

    return result

get_closest_visit_by_health_professional_and_timestamp

get_closest_visit_by_health_professional_and_timestamp(
    health_professional_id, on_datetime
)

Retrieves the closest occupational health visit by a health professional for a given day.

Gets all visits for the day and returns the one closest to on_datetime within allowed time window:

  • max 10 minutes before planned time (recording started early)
  • max 20 minutes after planned time (recording started late)

Warning: only querying turing tables for now, could refactor with code above

Source code in components/occupational_health/internal/business_logic/queries/visits/visits.py
def get_closest_visit_by_health_professional_and_timestamp(
    health_professional_id: UUID | None,
    on_datetime: datetime | None,
) -> Optional[PlannedVisit]:
    """
    Retrieves the closest occupational health visit by a health professional for a given day.

    Gets all visits for the day and returns the one closest to on_datetime within allowed time window:

    - max 10 minutes before planned time (recording started early)
    - max 20 minutes after planned time (recording started late)

    Warning: only querying turing tables for now, could refactor with code above
    """
    from sqlalchemy import select

    from components.occupational_health.external.global_profile import (
        get_occupational_health_profile_id,
    )
    from components.occupational_health.internal.enums.visit_status import VisitStatus
    from components.occupational_health.internal.models.turing.turing_occupational_health_on_demand_visit import (
        TuringOccupationalHealthOnDemandVisit,
    )
    from components.occupational_health.internal.models.turing.turing_occupational_health_predictable_visit import (
        TuringOccupationalHealthPredictableVisit,
    )
    from components.occupational_health.internal.types import UserId
    from shared.helpers.db import current_session

    if not health_professional_id or not on_datetime:
        raise RuntimeError(
            "Either or both health_professional_id / on_datetime must be provided."
        )

    on_demand_stmt = select(
        TuringOccupationalHealthOnDemandVisit.user_id,
        TuringOccupationalHealthOnDemandVisit.timestamp_planned,
    ).filter(
        TuringOccupationalHealthOnDemandVisit.hp_visit_owner_id
        == health_professional_id,
        TuringOccupationalHealthOnDemandVisit.date_planned == on_datetime.date(),
        TuringOccupationalHealthOnDemandVisit.status == VisitStatus.HAPPENED,
    )

    predictable_stmt = select(
        TuringOccupationalHealthPredictableVisit.user_id,
        TuringOccupationalHealthPredictableVisit.timestamp_planned,
    ).filter(
        TuringOccupationalHealthPredictableVisit.hp_visit_owner_id
        == health_professional_id,
        TuringOccupationalHealthPredictableVisit.date_planned == on_datetime.date(),
        TuringOccupationalHealthPredictableVisit.status == VisitStatus.HAPPENED,
    )

    unioned = on_demand_stmt.union_all(predictable_stmt).subquery()
    stmt = select(
        unioned.c.user_id.label("user_id"),
        unioned.c.timestamp_planned.label("timestamp_planned"),
    )
    visits = current_session.execute(stmt).fetchall()

    if not visits:
        current_logger.info(
            f"No visit found for HP {health_professional_id} on {on_datetime.date()}."
        )
        return None

    # Find the visit closest to on_datetime
    # Use naive datetimes for comparison (timestamp_planned is naive, on_datetime may be aware)
    on_datetime_naive = on_datetime.replace(tzinfo=None)

    # Filter visits within allowed time window:
    # - max 10 minutes before planned time (recording started early)
    # - max 20 minutes after planned time (recording started late)
    max_before_seconds = 10 * 60  # 10 minutes
    max_after_seconds = 20 * 60  # 20 minutes

    valid_visits = []
    for visit in visits:
        diff_seconds = (on_datetime_naive - visit.timestamp_planned).total_seconds()
        # diff_seconds > 0 means on_datetime is after planned (recording late)
        # diff_seconds < 0 means on_datetime is before planned (recording early)
        if -max_before_seconds <= diff_seconds <= max_after_seconds:
            valid_visits.append(visit)

    if not valid_visits:
        current_logger.info(
            f"No visit within time window for HP {health_professional_id} "
            f"on {on_datetime.date()} (max 10min before, 20min after planned time)."
        )
        return None

    closest_visit = min(
        valid_visits,
        key=lambda v: abs((v.timestamp_planned - on_datetime_naive).total_seconds()),
    )

    if not closest_visit.user_id:
        current_logger.error(
            f"Visit for HP {health_professional_id} on {on_datetime.date()} has no user_id."
        )
        return None

    user_id = UserId(closest_visit.user_id)
    profile_id = get_occupational_health_profile_id(user_id)

    # Calculate minutes difference between on_datetime and the found visit
    time_diff_seconds = abs(
        (closest_visit.timestamp_planned - on_datetime_naive).total_seconds()
    )
    minutes_difference = int(time_diff_seconds / 60)

    return PlannedVisit(
        profile_id=profile_id,
        timestamp_planned=closest_visit.timestamp_planned,
        minutes_difference=minutes_difference,
    )

get_visit_details_by_hp_and_date

get_visit_details_by_hp_and_date(
    health_professional_id,
    visit_date,
    occupational_health_profile_id,
)

Find a turing visit by HP ID, date, and occupational health profile ID.

Source code in components/occupational_health/internal/business_logic/queries/visits/visits.py
def get_visit_details_by_hp_and_date(
    health_professional_id: UUID,
    visit_date: date,
    occupational_health_profile_id: UUID,
) -> Optional[PlannedVisitWithDetails]:
    """Find a turing visit by HP ID, date, and occupational health profile ID."""
    from sqlalchemy import literal_column, select

    from components.occupational_health.external.global_profile import (
        get_user_id_from_occupational_health_profile_id,
    )
    from components.occupational_health.internal.queries.health_professional import (
        get_or_raise_health_professional,
    )

    user_id = get_user_id_from_occupational_health_profile_id(
        ProfileId(occupational_health_profile_id)
    )
    hp = get_or_raise_health_professional(health_professional_id)
    hp_name = f"{hp.first_name} {hp.last_name}"

    user_id_str = str(user_id)

    on_demand_stmt = select(  # type: ignore[var-annotated]
        TuringOccupationalHealthOnDemandVisit.id,
        TuringOccupationalHealthOnDemandVisit.timestamp_planned,
        TuringOccupationalHealthOnDemandVisit.visit_type,
        TuringOccupationalHealthOnDemandVisit.visit_setup,
        literal_column("'on_demand'").label("visit_table"),
    ).filter(
        TuringOccupationalHealthOnDemandVisit.hp_visit_owner_id
        == health_professional_id,
        TuringOccupationalHealthOnDemandVisit.date_planned == visit_date,
        TuringOccupationalHealthOnDemandVisit.status == VisitStatus.HAPPENED,
        TuringOccupationalHealthOnDemandVisit.user_id == user_id_str,
    )

    predictable_stmt = select(  # type: ignore[var-annotated]
        TuringOccupationalHealthPredictableVisit.id,
        TuringOccupationalHealthPredictableVisit.timestamp_planned,
        TuringOccupationalHealthPredictableVisit.visit_type,
        TuringOccupationalHealthPredictableVisit.visit_setup,
        literal_column("'predictable'").label("visit_table"),
    ).filter(
        TuringOccupationalHealthPredictableVisit.hp_visit_owner_id
        == health_professional_id,
        TuringOccupationalHealthPredictableVisit.date_planned == visit_date,
        TuringOccupationalHealthPredictableVisit.status == VisitStatus.HAPPENED,
        TuringOccupationalHealthPredictableVisit.user_id == user_id_str,
    )

    unioned = on_demand_stmt.union_all(predictable_stmt).subquery()
    visit = current_session.execute(select(unioned)).one_or_none()

    if not visit:
        return None

    return PlannedVisitWithDetails(
        visit_id=visit.id,
        profile_id=occupational_health_profile_id,
        timestamp_planned=visit.timestamp_planned,
        visit_table=visit.visit_table,
        visit_type=visit.visit_type,
        visit_setup=visit.visit_setup,
        health_professional_name=hp_name,
    )

get_visits

get_visits(profile_service, on_date)

Get all visits scheduled for a specific date. If no date is provided, return visits for today.

Parameters:

Name Type Description Default
profile_service ProfileService

The profile service for fetching member information

required
on_date date | None

The date to fetch visits for. If None, uses today's date.

required

Returns:

Type Description
list[VisitInfo]

List[VisitInfo]: List of visits scheduled for the specified date

Source code in components/occupational_health/internal/business_logic/queries/visits/medical_app.py
@inject_profile_service
def get_visits(
    profile_service: ProfileService,
    on_date: date | None,
) -> list[VisitInfo]:
    """
    Get all visits scheduled for a specific date.
    If no date is provided, return visits for today.

    Args:
        profile_service: The profile service for fetching member information
        on_date: The date to fetch visits for. If None, uses today's date.

    Returns:
        List[VisitInfo]: List of visits scheduled for the specified date
    """
    if not on_date:
        on_date = utctoday()

    # Query visits scheduled for the specified date
    subquery_backend = current_session.query(  # type: ignore[var-annotated] # noqa: ALN085
        OccupationalHealthVisit.profile_id.label("profile_id"),
        null().label("user_id"),
        OccupationalHealthVisit.visit_date.label("visit_date"),
        null().label("visit_hour_booked"),
        OccupationalHealthVisit.visit_type.label("visit_type"),
        literal_column(f"'{VisitStatus.HAPPENED}'").label("status"),
        OccupationalHealthVisit.health_professional_id.label("health_professional_id"),
        OccupationalHealthVisit.visit_setup.label("visit_setup"),
    ).filter(
        # CAVEAT: the data is stored using the local date, not UTC
        OccupationalHealthVisit.visit_date == on_date,
        OccupationalHealthVisit.profile_id.is_not(None),
    )

    subquery_predictable = current_session.query(  # noqa: ALN085
        null().label("profile_id"),
        TuringOccupationalHealthPredictableVisit.user_id.label("user_id"),
        TuringOccupationalHealthPredictableVisit.date_planned.label("visit_date"),
        TuringOccupationalHealthPredictableVisit.hour_booked.label("visit_hour_booked"),
        TuringOccupationalHealthPredictableVisit.visit_type.label("visit_type"),
        TuringOccupationalHealthPredictableVisit.status.label("status"),
        TuringOccupationalHealthPredictableVisit.hp_visit_owner_id.label(
            "health_professional_id"
        ),
        TuringOccupationalHealthPredictableVisit.visit_setup.label("visit_setup"),
    ).filter(
        TuringOccupationalHealthPredictableVisit.date_planned == on_date,
    )
    subquery_on_demand = current_session.query(  # noqa: ALN085
        null().label("profile_id"),
        TuringOccupationalHealthOnDemandVisit.user_id.label("user_id"),
        TuringOccupationalHealthOnDemandVisit.date_planned.label("visit_date"),
        TuringOccupationalHealthOnDemandVisit.hour_booked.label("visit_hour_booked"),
        TuringOccupationalHealthOnDemandVisit.visit_type.label("visit_type"),
        TuringOccupationalHealthOnDemandVisit.status.label("status"),
        TuringOccupationalHealthOnDemandVisit.hp_visit_owner_id.label(
            "health_professional_id"
        ),
        TuringOccupationalHealthOnDemandVisit.visit_setup.label("visit_setup"),
    ).filter(
        TuringOccupationalHealthOnDemandVisit.date_planned == on_date,
    )

    subquery = subquery_backend.union_all(
        subquery_predictable,
        subquery_on_demand,
    ).subquery()

    all_visits = current_session.query(subquery).all()  # noqa: ALN085

    # Now, the mapping mess: we either have the user ID, or the (Occupational Health) profile ID

    # Get the global profile ID for the user IDs
    user_ids_found = {
        visit.user_id for visit in all_visits if visit.user_id is not None
    }
    user_id_mapping = (
        get_app_dependency().get_user_id_by_global_profile_id_mapping_from_user_ids(
            user_ids_found
        )
    )
    global_profile_id_by_user_id = {
        user_id: global_profile_id
        for global_profile_id, user_id in user_id_mapping.items()
    }

    # Get the list of found profile IDs
    profile_ids_found = {
        visit.profile_id for visit in all_visits if visit.profile_id is not None
    }

    # Based on those two lists, get the final mapping of (Occupational Health) profile IDs indexed by the Global Profile ID
    profile_id_by_global_profile_id: dict[GlobalProfileId, ProfileId] = {
        profile.global_profile_id: profile.id
        for profile in (
            current_session.query(  # noqa: ALN085
                OccupationalHealthProfile.id,
                OccupationalHealthProfile.global_profile_id,
            )
            .filter(
                or_(
                    OccupationalHealthProfile.global_profile_id.in_(
                        user_id_mapping.keys()
                    ),
                    OccupationalHealthProfile.id.in_(profile_ids_found),
                )
            )
            .all()
        )
    }

    # Get profile information from profile service
    profiles = profile_service.get_profiles(
        profile_ids=list(profile_id_by_global_profile_id.keys())
    )
    profiles_by_occupational_health_profile_id = {
        profile_id_by_global_profile_id[
            GlobalProfileId(global_profile.id)
        ]: global_profile
        for global_profile in profiles
    }

    # Fetch HPs
    health_professional_mapping = {
        hp.id: hp
        for hp in current_session.query(  # noqa: ALN085
            OccupationalHealthHealthProfessional.id,
            OccupationalHealthHealthProfessional.first_name,
            OccupationalHealthHealthProfessional.last_name,
        )
    }

    # Convert to dataclass instances
    result = []
    for visit in all_visits:
        # Get health professional name
        health_professional = health_professional_mapping.get(
            visit.health_professional_id
        )
        health_professional_name = (
            f"{health_professional.first_name} {health_professional.last_name}".strip()
            if health_professional
            else None
        )

        # Get member information from global profile
        occupational_health_profile_id = visit.profile_id
        if not occupational_health_profile_id:
            # Visit imported from GSheets, we only know the User ID
            global_profile_id = global_profile_id_by_user_id.get(visit.user_id)
            if not global_profile_id:
                current_logger.error(
                    f"Found spreadsheet visit with unknown user ID {visit.user_id} ({visit.visit_type} on date {visit.visit_date})."
                    " The user ID from the GSheet probably doesn't exist.",
                    stack_info=False,
                )
                # Do not crash but return "empty" info about member. HPs will not be
                # able to open the DMST.
                result.append(_make_empty_visit_info(visit, health_professional_name))
                continue

            if global_profile_id not in profile_id_by_global_profile_id:
                current_logger.error(
                    f"Global Profile ID {global_profile_id} ({visit.user_id=}) not found for visit ({visit.visit_type} on date {visit.visit_date})."
                    " The user is probably not affiliated to Prévenir.",
                    stack_info=False,
                )
                # Do not crash but return "empty" info about member. HPs will not be
                # able to open the DMST.
                result.append(_make_empty_visit_info(visit, health_professional_name))
                continue

            occupational_health_profile_id = profile_id_by_global_profile_id[
                global_profile_id
            ]

        if (
            occupational_health_profile_id
            not in profiles_by_occupational_health_profile_id
        ):
            current_logger.error(
                f"Occupational Health Profile ID {occupational_health_profile_id} ({visit.user_id=}) not found for visit ({visit.visit_type} on date {visit.visit_date})."
                "The profile is probably not affiliated to Prévenir.",
                stack_info=False,
            )
            # Do not crash but return "empty" info about member. HPs will not be
            # able to open the DMST.
            result.append(_make_empty_visit_info(visit, health_professional_name))
            continue

        global_profile = profiles_by_occupational_health_profile_id[
            occupational_health_profile_id
        ]

        result.append(
            VisitInfo(
                member_first_name=global_profile.first_name or "⁉️",
                member_last_name=global_profile.last_name or "⁉️",
                member_gender=global_profile.gender,
                member_birthdate=global_profile.birth_date,
                visit_date=visit.visit_date,
                visit_hour_booked=_format_visit_hour_booked(visit.visit_hour_booked),
                visit_type=visit.visit_type,
                health_professional_name=health_professional_name,
                health_professional_id=visit.health_professional_id,
                visit_status=visit.status,
                visit_id=None,  # Not used for now in this method.
                occupational_health_profile_id=profile_id_by_global_profile_id[
                    GlobalProfileId(global_profile.id)
                ],
                visit_setup=visit.visit_setup,
            )
        )

    # Sort visits by visit hour booked (if available) and put None value in front
    result.sort(
        key=lambda v: (v.visit_hour_booked not in (None, ""), v.visit_hour_booked or "")
    )
    return result

workspace_actions

get_all_workspace_actions

get_all_workspace_actions(
    occupational_health_profile_id=None, account_id=None
)

Get all workspace actions, optionally filtered by occupational health profile ID or account ID (XOR).

Parameters:

Name Type Description Default
occupational_health_profile_id Optional[UUID]

Optional profile ID to filter member-type actions

None
account_id Optional[UUID]

Optional account ID to filter company-type actions

None

Returns:

Type Description
list[WorkspaceAction]

List of WorkspaceAction entities

Source code in components/occupational_health/public/queries/workspace_actions.py
def get_all_workspace_actions(
    occupational_health_profile_id: Optional[UUID] = None,
    account_id: Optional[UUID] = None,
) -> list[WorkspaceAction]:
    """
    Get all workspace actions, optionally filtered by occupational health profile ID or account ID (XOR).

    Args:
        occupational_health_profile_id: Optional profile ID to filter member-type actions
        account_id: Optional account ID to filter company-type actions

    Returns:
        List of WorkspaceAction entities
    """
    return _get_all_workspace_actions(
        occupational_health_profile_id=occupational_health_profile_id,
        account_id=account_id,
    )

components.occupational_health.public.testing

OccupationalHealthHealthProfessionalFactory

Bases: AlanBaseFactory['OccupationalHealthHealthProfessional']

Meta

model class-attribute instance-attribute
model = OccupationalHealthHealthProfessional

first_name class-attribute instance-attribute

first_name = Faker('first_name')

last_name class-attribute instance-attribute

last_name = Faker('last_name')

role class-attribute instance-attribute

role = 'Doctor'

components.occupational_health.public.thesaurus

expositions

actions

parse_and_store_tep_rdf
parse_and_store_tep_rdf(file_path, commit=True)

Parse a TEP RDF file (compressed or not) and store expositions in the database.

Parameters:

Name Type Description Default
file_path str | Path

Path to the RDF file to parse (can be gzip compressed with .gz extension)

required

Raises:

Type Description
TepParsingError

If there is an error parsing the file or if the file is invalid

FileNotFoundError

If the file does not exist

Source code in components/occupational_health/internal/thesaurus/expositions/parser.py
def parse_and_store_tep_rdf(file_path: str | Path, commit: bool = True) -> None:
    """
    Parse a TEP RDF file (compressed or not) and store expositions in the database.

    Args:
        file_path: Path to the RDF file to parse (can be gzip compressed with .gz extension)

    Raises:
        TepParsingError: If there is an error parsing the file or if the file is invalid
        FileNotFoundError: If the file does not exist
    """
    # Convert to Path object if string
    file_path = Path(file_path)

    try:
        # Parse the file based on whether it's compressed or not
        if file_path.suffix == ".gz":
            with GzipFile(file_path) as gz_file:
                tree = ET.parse(gz_file)
        else:
            tree = ET.parse(file_path)

        root = tree.getroot()

        # Define XML namespaces used in the RDF file
        namespaces = {
            "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
            "rdfs": "http://www.w3.org/2000/01/rdf-schema#",
            "skos": "http://www.w3.org/2004/02/skos/core#",
            "dct": "http://purl.org/dc/terms/",
            "dc": "http://purl.org/dc/elements/1.1/",
            "adms": "http://www.w3.org/ns/adms#",
            "tep": "http://data.esante.gouv.fr/tep/",
        }

        expositions: list[ParsedThesaurusOccupationalExposure] = []

        # Find all Description elements that have a rdfs:label
        descriptions = root.findall(".//rdf:Description", namespaces)

        for desc in descriptions:
            # Extract required fields using XML paths
            label_elem = desc.find("rdfs:label", namespaces)
            notation_elem = desc.find("skos:notation", namespaces)
            type_elem = desc.find("dc:type", namespaces)
            status_elem = desc.find("adms:status", namespaces)
            created_elem = desc.find("dct:created", namespaces)
            version_elem = desc.find("tep:versionTep", namespaces)
            line_num_elem = desc.find("tep:numLigne", namespaces)

            # Skip if any required field is missing
            if (
                label_elem is None
                or notation_elem is None
                or type_elem is None
                or status_elem is None
                or created_elem is None
                or version_elem is None
                or line_num_elem is None
            ):
                continue

            # Get parent notation: it always exists
            parent_notation = None
            parent_elem = desc.find("rdfs:subClassOf", namespaces)
            if parent_elem is not None:
                parent_ref = parent_elem.get(
                    "{http://www.w3.org/1999/02/22-rdf-syntax-ns#}resource"
                )
                if parent_ref:
                    # Find the parent Description element
                    parent_desc = root.find(
                        f".//rdf:Description[@rdf:about='{parent_ref}']", namespaces
                    )
                    if parent_desc is not None:
                        parent_notation_elem = parent_desc.find(
                            "skos:notation", namespaces
                        )
                        if parent_notation_elem is not None:
                            parent_notation = parent_notation_elem.text

            # Get risk category from list of risks
            risk_category = RiskCategory.SI
            if label_elem.text.strip().lower() in SIR_LABELS:
                risk_category = RiskCategory.SIR
            if (
                label_elem.text.strip().lower() in SIA_LABELS
                or notation_elem.text in SIA_NOTATIONS
            ):
                risk_category = RiskCategory.SIA

            try:
                # Skip if type is not in TepDcType enum
                if type_elem.text not in [e.value for e in DcType]:
                    current_logger.error(
                        f"Skipping exposition {notation_elem.text} due to invalid type: {type_elem.text}"
                    )
                    continue

                exposition = ParsedThesaurusOccupationalExposure(
                    notation=notation_elem.text,
                    label=label_elem.text.strip(),
                    dc_type=DcType.validate(type_elem.text),
                    status=AdmsStatus.validate(status_elem.text),
                    created_at=datetime.fromisoformat(created_elem.text),
                    version=version_elem.text,
                    line_number=int(line_num_elem.text),
                    risk_category=risk_category,
                    parent_notation=mandatory(parent_notation),
                )
                expositions.append(exposition)
            except Exception as exception:
                raise TepParsingError(
                    f"Error parsing exposition with notation {notation_elem.text}: {str(exception)[:250]}"
                ) from exception

        if not expositions:
            raise TepParsingError("No valid expositions found in the file")

        current_logger.info(f"Found {len(expositions)} TEP expositions to upsert")

        # Add information that is not present in the initial file
        add_class_subclass_notations(expositions)

        # Bulk insert with upsert on conflict (PostgreSQL specific)
        statement = insert(ThesaurusOccupationalExposure).values(
            [exposition.to_dict() for exposition in expositions]
        )
        statement = statement.on_conflict_do_update(
            index_elements=["notation"],
            set_={
                "label": statement.excluded.label,
                "dc_type": statement.excluded.dc_type,
                "status": statement.excluded.status,
                "created_at": statement.excluded.created_at,
                "version": statement.excluded.version,
                "line_number": statement.excluded.line_number,
                "risk_category": statement.excluded.risk_category,
                "parent_notation": statement.excluded.parent_notation,
                "class_notation": statement.excluded.class_notation,
                "subclass_notation": statement.excluded.subclass_notation,
            },
        )
        current_session.execute(statement)
        if commit:
            current_session.commit()
        else:
            current_session.rollback()

        current_logger.info(f"Successfully stored {len(expositions)} TEP expositions")

    except FileNotFoundError as exception:
        raise TepParsingError(f"File {file_path} not found") from exception

    except Exception as exception:
        current_session.rollback()
        raise TepParsingError(
            f"Error parsing TEP RDF file {file_path}: {str(exception)[:250]}"
        ) from exception

queries

get_expositions
get_expositions(exposition_notations)

Return the exposition data matching the given notations

Source code in components/occupational_health/internal/thesaurus/expositions/queries.py
@tracer_wrap()
def get_expositions(
    exposition_notations: list[str],
) -> list[ThesaurusOccupationalExposure]:
    """
    Return the exposition data matching the given notations
    """
    expositions = ThesaurusOccupationalExposureBroker.get_by_notations(
        exposition_notations
    ).all()
    class_and_subclass_labels_by_notation = get_class_subclass_labels(list(expositions))
    return [
        ThesaurusOccupationalExposureMapper.to_entity(
            model, class_and_subclass_labels_by_notation
        )
        for model in expositions
    ]
search_expositions
search_expositions(search_input)

Return the exposition data matching the given search input (case-insensitive and accent-insensitive)

Note: adds the class and subclass labels to the exposition objects

Source code in components/occupational_health/internal/thesaurus/expositions/queries.py
@tracer_wrap()
def search_expositions(search_input: str) -> list[ThesaurusOccupationalExposure]:
    """
    Return the exposition data matching the given search input (case-insensitive and accent-insensitive)

    Note: adds the class and subclass labels to the exposition objects
    """
    expositions = ThesaurusOccupationalExposureBroker.search_in_label(
        search_input
    ).all()
    class_and_subclass_labels_by_notation = get_class_subclass_labels(list(expositions))
    return [
        ThesaurusOccupationalExposureMapper.to_entity(
            model, class_and_subclass_labels_by_notation
        )
        for model in expositions
    ]

means

actions

parse_and_store_thesaurus_means
parse_and_store_thesaurus_means(dry_run)

Import Thesaurus mean data from an xlsx file (extracted logic)

Source code in components/occupational_health/internal/thesaurus/means/parse_and_store_thesaurus_means.py
def parse_and_store_thesaurus_means(dry_run: bool) -> None:
    """Import Thesaurus mean data from an xlsx file (extracted logic)"""
    import pandas as pd

    filename = "data/thesaurus_amts.xlsx"
    s3_bucket_name = "alan-medical-secrecy-api"
    s3_bucket = (
        f"{s3_bucket_name}-prod"
        if is_production_mode()
        else f"{s3_bucket_name}-staging"
    )

    current_logger.info(f"Download {filename} from S3 (bucket: {s3_bucket})...")
    xls_fileobj = io.BytesIO()

    RemoteFileClient.download_from_key_into(filename, xls_fileobj, s3_bucket=s3_bucket)

    xls_fileobj.seek(0)
    df = pd.read_excel(xls_fileobj)

    df[TO_BE_DISPLAYED_COLUMN_NAME] = (
        df[TO_BE_DISPLAYED_COLUMN_NAME].astype(str).str.strip().str.lower()
    )
    df[LABEL_COLUMN_NAME] = df[LABEL_COLUMN_NAME].astype(str).str.strip()
    # Filter labels when tagged as "can be displayed"
    df = df[
        (df[TO_BE_DISPLAYED_COLUMN_NAME] == "yes") & (df[LABEL_COLUMN_NAME].notna())
    ]

    labels = df[LABEL_COLUMN_NAME].unique()
    if not len(labels):
        current_logger.info("No label to import")
        return
    existing_labels = set(
        current_session.scalars(
            select(ThesaurusMean.label).where(ThesaurusMean.label.in_(labels))
        ).all()
    )

    to_create = [
        ThesaurusMean(label=label) for label in labels if label not in existing_labels
    ]

    if not to_create:
        current_logger.info("No ThesaurusMean to import")
        return

    current_session.add_all(to_create)
    if dry_run:
        current_session.flush()
        current_logger.info(f"DRY RUN: {len(to_create)} ThesaurusMean created")
    else:
        current_session.commit()
        current_logger.info(f"{len(to_create)} ThesaurusMean created")

queries

get_all_thesaurus_means
get_all_thesaurus_means()
Source code in components/occupational_health/internal/thesaurus/means/queries.py
@tracer_wrap()
def get_all_thesaurus_means() -> list[ThesaurusMean]:
    means = ThesaurusMeanBroker.get_all().all()
    return [ThesaurusMeanMapper.to_entity(model) for model in means]

terminologies

queries

search_terminologies
search_terminologies(search_input)

Return the terminology data matching the given search input (case-insensitive and accent-insensitive)

Source code in components/occupational_health/internal/thesaurus/terminologies/queries.py
@tracer_wrap()
def search_terminologies(search_input: str) -> list[Terminology]:
    """
    Return the terminology data matching the given search input (case-insensitive and accent-insensitive)
    """
    matching_data: list[Terminology] = []
    terminologies_data = _get_terminologies_data()

    # Normalize search input once
    normalized_search = remove_accents(search_input.lower())

    for terminology in terminologies_data:
        # Normalize the fields being searched
        normalized_label = remove_accents(terminology.label.lower())

        if normalized_search in normalized_label:
            matching_data.append(terminology)
    return matching_data

components.occupational_health.public.types

AccountId module-attribute

AccountId = NewType('AccountId', UUID)

The Account ID type.

CompanyId module-attribute

CompanyId = NewType('CompanyId', str)

The Company ID type. We use 'str' to not make assumption on the country's representation of the ID.

GlobalProfileId module-attribute

GlobalProfileId = NewType('GlobalProfileId', UUID)

The Global Profile ID type.

ProfileId module-attribute

ProfileId = NewType('ProfileId', UUID)

The Occupational Health Profile ID type.

UserId module-attribute

UserId = NewType('UserId', str)

The country-specific User ID type. We use 'str' to not make assumption on the country's representation of the ID.