API reference

components.documents.public.blueprint

documents_blueprint module-attribute

documents_blueprint = CustomBlueprint(
    "documents", __name__, cli_group="documents"
)

register_blueprint

register_blueprint(state)
Source code in components/documents/public/blueprint.py
@documents_blueprint.record_once
def register_blueprint(state) -> None:  # type: ignore[no-untyped-def]  # noqa: ARG001, D103
    from components.documents.public.commands.document_embedding import (  # noqa: F401
        recompute_documents_embedding,
    )

components.documents.public.business_logic

batches

actions

add_document_to_batch
add_document_to_batch(
    batch_id,
    document_id,
    external_id=None,
    document_type=None,
    stack=None,
)

Adds a document to an existing performance batch by inserting a record into the DOCUMENT_PARSING.BATCH_DOCUMENTS table.

Parameters:

    batch_id (int): ID of the batch to add the document to. Required.
    document_id (str): ID of the document to add to the batch. Required.
    external_id (str | None): External ID of the document. Default: None.
    document_type (str | None): Type of the document. Default: None.
    stack (str | None): Stack the document belongs to. Default: None.

Raises:

    ValueError: If batch_id is invalid or document doesn't exist.

Source code in components/documents/public/business_logic/batches/actions.py
@obs.api_call()
def add_document_to_batch(
    batch_id: int,
    document_id: str,
    external_id: str | None = None,
    document_type: str | None = None,
    stack: str | None = None,
) -> None:
    """
    Adds a document to an existing performance batch by inserting a record into
    the DOCUMENT_PARSING.BATCH_DOCUMENTS table.

    Args:
        batch_id: ID of the batch to add the document to
        document_id: ID of the document to add to the batch
        external_id: External ID of the document
        document_type: Type of the document
        stack: Stack the document belongs to

    Raises:
        ValueError: If batch_id is invalid or document doesn't exist
    """
    if not document_id:
        raise ValueError("Document ID cannot be empty")

    # Check if the document exists in the main database
    uuid_doc_id = UUID(str(document_id))
    get_or_raise_missing_resource(Document, uuid_doc_id)

    with (
        turing_connection() as connection,  # type: ignore[no-untyped-call]
        connection.cursor() as cursor,
    ):
        # Check if the batch exists
        _get_batch_or_raise(batch_id)

        # Check if the document already exists in any batch to preserve validation status
        # This simulates a global validation status for the document
        cursor.execute(
            """
                SELECT is_validated
                FROM document_parsing.batch_documents
                WHERE document_id = %s
                ORDER BY created_at DESC
                LIMIT 1
                """,
            (document_id,),
        )

        row = cursor.fetchone()
        is_validated = row[0] if row else False

        # Snowflake does not support ON CONFLICT (batch_id, document_id) DO NOTHING
        cursor.execute(
            """
                MERGE INTO document_parsing.batch_documents target
                USING (SELECT %s as batch_id, %s as document_id, %s as created_at,
                       %s as is_validated, %s as external_id, %s as document_type, %s as stack) source
                ON target.batch_id = source.batch_id AND target.document_id = source.document_id
                WHEN NOT MATCHED THEN
                    INSERT (batch_id, document_id, created_at, is_validated, external_id, document_type, stack)
                    VALUES (source.batch_id, source.document_id, source.created_at,
                            source.is_validated, source.external_id, source.document_type, source.stack)
                """,
            (
                batch_id,
                document_id,
                datetime.now(),
                is_validated,
                external_id,
                document_type,
                stack,
            ),
        )
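
Usage sketch (illustrative only; the import path is inferred from the source location above, and the IDs are placeholders):

from components.documents.public.business_logic.batches.actions import (
    add_document_to_batch,
)

# Hypothetical IDs for illustration
batch_id = 42
document_id = "0b6f5e4c-1d2a-4e3b-9c8d-7f6a5b4c3d2e"

# Inserts the document into the batch; raises ValueError if the batch or document is missing
add_document_to_batch(
    batch_id=batch_id,
    document_id=document_id,
    external_id=None,
    document_type="invoice",  # hypothetical value
    stack="fr",               # hypothetical value
)
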
create_new_batch
create_new_batch(batch_name)

Creates a new batch by adding a row in the Turing table DOCUMENT_PARSING.PERFORMANCE_BATCHES.

Parameters:

    batch_name (str): Name of the batch to create. Required.

Returns:

    str: The batch name.

Raises:

    ValueError: If batch_name is empty or None.

Source code in components/documents/public/business_logic/batches/actions.py
@obs.api_call()
def create_new_batch(
    batch_name: str,
) -> str:
    """
    Creates a new batch by adding a row in the Turing table DOCUMENT_PARSING.PERFORMANCE_BATCHES.

    Args:
        batch_name: Name of the batch to create

    Returns:
        The batch name

    Raises:
        ValueError: If batch_name is empty or None
    """
    if not batch_name:
        raise ValueError("Batch name cannot be empty")

    with (
        turing_connection() as connection,  # type: ignore[no-untyped-call]
        connection.cursor() as cursor,
    ):
        # Execute insert query
        cursor.execute(
            """
                INSERT INTO document_parsing.performance_batches(
                    name,
                    created_at
                )
                VALUES (
                    %s,
                    %s
                )
                """,
            (
                batch_name,
                datetime.now(),
            ),
        )

    return batch_name
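
Usage sketch (import paths inferred from the source locations above; the batch name is arbitrary). Note that the function returns the batch name rather than the generated ID, so the ID has to be looked up afterwards, e.g. via get_all_batches:

from components.documents.public.business_logic.batches.actions import create_new_batch
from components.documents.public.business_logic.batches.queries import get_all_batches

batch_name = create_new_batch("2024-06-regression-set")  # hypothetical name

# Resolve the generated ID by name (illustrative lookup)
batch_id = next(b.id for b in get_all_batches() if b.name == batch_name)
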
delete_batch
delete_batch(batch_id)

Deletes a batch from Turing.

Parameters:

    batch_id (int): ID of the batch to delete. Required.

Raises:

    ValueError: If batch_id is empty.

Source code in components/documents/public/business_logic/batches/actions.py
@obs.api_call()
def delete_batch(batch_id: int) -> None:
    """
    Deletes a batch from Turing.

    Args:
        batch_id: ID of the batch to delete

    Raises:
        ValueError: If batch_id is empty
    """
    if not batch_id:
        raise ValueError("Batch name cannot be empty")

    with (
        turing_connection() as connection,  # type: ignore[no-untyped-call]
        connection.cursor() as cursor,
    ):
        _get_batch_or_raise(batch_id)

        # Then delete the batch itself
        cursor.execute(
            """
                DELETE FROM document_parsing.performance_batches
                WHERE id = %s
                """,
            (batch_id,),
        )
flag_batch_document_as_validated
flag_batch_document_as_validated(document_id, validated)

Updates the is_validated flag for a document in all its batches.

Parameters:

    document_id (str): ID of the document to update. Required.
    validated (bool): Whether to mark the document as validated or not. Required.

Raises:

    ValueError: If document is not found.

Source code in components/documents/public/business_logic/batches/actions.py
@obs.api_call()
def flag_batch_document_as_validated(
    document_id: str,
    validated: bool,
) -> None:
    """
    Updates the is_validated flag for a document in all its batches.

    Args:
        document_id: ID of the document to update
        validated: Whether to mark the document as validated or not

    Raises:
        ValueError: If document is not found
    """
    with (
        turing_connection() as connection,  # type: ignore[no-untyped-call]
        connection.cursor() as cursor,
    ):
        # Update the is_validated flag for all instances of the document
        cursor.execute(
            """
                UPDATE document_parsing.batch_documents
                SET is_validated = %s
                WHERE document_id = %s
                """,
            (validated, str(document_id)),
        )

        # Check if the document was found
        if cursor.rowcount == 0:
            raise ValueError(f"Document {document_id} not found in any batch")
remove_document_from_batch
remove_document_from_batch(batch_id, document_id)

Removes a document from a performance batch by deleting the record from the DOCUMENT_PARSING.BATCH_DOCUMENTS table.

Parameters:

    batch_id (int): ID of the batch to remove the document from. Required.
    document_id (str): ID of the document to remove from the batch. Required.

Raises:

    ValueError: If batch_id is invalid.

Source code in components/documents/public/business_logic/batches/actions.py
@obs.api_call()
def remove_document_from_batch(
    batch_id: int,
    document_id: str,
) -> None:
    """
    Removes a document from a performance batch by deleting the record from
    the DOCUMENT_PARSING.BATCH_DOCUMENTS table.

    Args:
        batch_id: ID of the batch to remove the document from
        document_id: ID of the document to remove from the batch

    Raises:
        ValueError: If batch_id is invalid
    """
    with (
        turing_connection() as connection,  # type: ignore[no-untyped-call]
        connection.cursor() as cursor,
    ):
        _get_batch_or_raise(batch_id)

        # Delete the document from the batch_documents table
        cursor.execute(
            """
                DELETE FROM document_parsing.batch_documents
                WHERE batch_id = %s AND document_id = %s
                """,
            (batch_id, str(document_id)),
        )
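
A cleanup sketch combining remove_document_from_batch and delete_batch (IDs are hypothetical; import path inferred from the source location above):

from components.documents.public.business_logic.batches.actions import (
    delete_batch,
    remove_document_from_batch,
)

batch_id = 42  # hypothetical batch ID

# Detach a single document from the batch, then drop the batch itself
remove_document_from_batch(
    batch_id=batch_id,
    document_id="0b6f5e4c-1d2a-4e3b-9c8d-7f6a5b4c3d2e",  # hypothetical document ID
)
delete_batch(batch_id)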

queries

get_all_batches
get_all_batches()

Retrieves all performance batches from Turing.

Returns:

    list[Batch]: List of Batch objects containing batch information.

Source code in components/documents/public/business_logic/batches/queries.py
@obs.api_call()
def get_all_batches() -> list[Batch]:
    """
    Retrieves all performance batches from Turing.

    Returns:
        List of Batch objects containing batch information
    """
    with (
        turing_connection() as connection,  # type: ignore[no-untyped-call]
        connection.cursor() as cursor,
    ):
        cursor.execute(
            """
                SELECT pb.id, pb.name, pb.created_at, COUNT(bd.document_id)
                FROM document_parsing.performance_batches pb
                LEFT JOIN document_parsing.batch_documents bd ON pb.id = bd.batch_id
                GROUP BY pb.id, pb.name, pb.created_at
                ORDER BY pb.created_at DESC
                """
        )

        batches = [
            Batch(
                id=row[0],
                name=row[1],
                created_at=row[2],
                documents_count=row[3],
            )
            for row in cursor
        ]

        return batches
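
Listing sketch; the attributes used below come from the Batch construction shown in the query above:

from components.documents.public.business_logic.batches.queries import get_all_batches

for batch in get_all_batches():
    # Batch exposes id, name, created_at and documents_count
    print(f"{batch.id}\t{batch.name}\t{batch.documents_count} documents")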
get_batch_by_id
get_batch_by_id(batch_id)

Retrieves a specific performance batch from Turing by its ID.

Parameters:

    batch_id (int): ID of the batch to retrieve. Required.

Returns:

    Batch: Batch object containing the batch information.

Raises:

    ValueError: If batch with the specified ID is not found.

Source code in components/documents/public/business_logic/batches/queries.py
@obs.api_call()
def get_batch_by_id(batch_id: int) -> Batch:
    """
    Retrieves a specific performance batch from Turing by its ID.

    Args:
        batch_id: ID of the batch to retrieve

    Returns:
        Batch object containing the batch information

    Raises:
        ValueError: If batch with the specified ID is not found
    """
    with (
        turing_connection() as connection,  # type: ignore[no-untyped-call]
        connection.cursor() as cursor,
    ):
        # Get batch information
        cursor.execute(
            """
                SELECT pb.id, pb.name, pb.created_at, COUNT(bd.document_id)
                FROM document_parsing.performance_batches pb
                LEFT JOIN document_parsing.batch_documents bd ON pb.id = bd.batch_id
                WHERE pb.id = %s
                GROUP BY pb.id, pb.name, pb.created_at
                """,
            (batch_id,),
        )

        row = cursor.fetchone()
        if not row:
            raise ValueError(f"Batch with ID {batch_id} not found")

        batch = Batch(
            id=row[0],
            name=row[1],
            created_at=row[2],
            documents_count=row[3],
        )

        return batch
get_documents_by_batch
get_documents_by_batch(batch_id)

Retrieves all documents belonging to a specific batch.

Parameters:

    batch_id (int): The batch identifier to filter documents by. Required.

Returns:

    list[BatchDocumentInfo]: List of BatchDocumentInfo objects containing document information.

Source code in components/documents/public/business_logic/batches/queries.py
@obs.api_call()
def get_documents_by_batch(batch_id: int) -> list[BatchDocumentInfo]:
    """
    Retrieves all documents belonging to a specific batch.

    Args:
        batch_id: The batch identifier to filter documents by

    Returns:
        List of BatchDocumentInfo objects containing document information
    """
    with (
        turing_connection() as connection,  # type: ignore[no-untyped-call]
        connection.cursor() as cursor,
    ):
        # Get documents from the batch_documents table with all needed fields
        cursor.execute(
            """
                SELECT document_id, is_validated, external_id, stack, document_type
                FROM document_parsing.batch_documents
                WHERE batch_id = %s
                """,
            (batch_id,),
        )

        rows = cursor.fetchall()
        if not rows:
            return []

        # Construct BatchDocumentInfo objects directly from the query results
        document_info_list = [
            BatchDocumentInfo(
                id=row[0],
                is_validated=row[1],
                external_id=row[2],
                stack=row[3],
                document_type=row[4],
            )
            for row in rows
        ]

        return document_info_list
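
Sketch splitting a batch's documents by validation status, using the BatchDocumentInfo fields populated in the query above (the batch ID is hypothetical):

from components.documents.public.business_logic.batches.queries import (
    get_documents_by_batch,
)

documents = get_documents_by_batch(batch_id=42)
validated_ids = [doc.id for doc in documents if doc.is_validated]
pending_ids = [doc.id for doc in documents if not doc.is_validated]
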

comparison

abstract_performance_runner

AbstractDocProcessingPerformanceTestRunner
AbstractDocProcessingPerformanceTestRunner(
    run_id=None, run_name=None, save=False, job_timeout=1200
)

Bases: ABC, Generic[Entry]

Abstract class for document processing performance test runners.

Source code in components/documents/public/business_logic/comparison/abstract_performance_runner.py
def __init__(
    self,
    run_id: typing.Optional[uuid.UUID] = None,
    run_name: typing.Optional[str] = None,
    save: bool = False,
    job_timeout: int = 1200,
):
    self.run_id = run_id if run_id else uuid.uuid4()
    self.run_name = run_name if run_name else str(self.run_id).split("-")[-1]
    self.save = save
    self.job_timeout = job_timeout
STEP_NAME class-attribute instance-attribute
STEP_NAME = 'abstract'
dashboard_url
dashboard_url()

Generate a URL to the Metabase dashboard where the performance test results can be viewed.

Source code in components/documents/public/business_logic/comparison/abstract_performance_runner.py
def dashboard_url(self) -> str:
    """
    Generate a URL to the metabase dashboard where the performance test results can be viewed.
    """
    return f"https://metabase.alan.com/dashboard/2321?tab=649&run_id={self.run_id}"
fetch_entry abstractmethod staticmethod
fetch_entry(document_id)

Fetch a single entry from the dataset based on the document ID.

Source code in components/documents/public/business_logic/comparison/abstract_performance_runner.py
@staticmethod
@abc.abstractmethod
def fetch_entry(document_id: str) -> Entry:
    """
    Fetch a single entry from the dataset based on the document ID.
    """
    raise NotImplementedError("This method should be overridden in subclasses.")
job_timeout instance-attribute
job_timeout = job_timeout
run_and_report abstractmethod
run_and_report(entry)

Run the processing and generate a report for the given entry.

Source code in components/documents/public/business_logic/comparison/abstract_performance_runner.py
@abc.abstractmethod
def run_and_report(
    self,
    entry: Entry,
) -> "PerformanceRunReport":
    """
    Run the processing and generate a report for the given entry.
    """
    raise NotImplementedError("This method should be overridden in subclasses.")
run_async
run_async(entry)

Enqueue the performance tests for the given entry to be run asynchronously. This method will create a claim engine job that can be run in the background, allowing for parallel processing of multiple entries.

Source code in components/documents/public/business_logic/comparison/abstract_performance_runner.py
def run_async(
    self,
    entry: Entry,
) -> AlanJob:
    """
    Enqueue the performance tests for the given entry to be run asynchronously.
    This method will create a claim engine job that can be run in the background, allowing for parallel processing of multiple entries.
    """
    return enqueue_claim_engine_job(
        AbstractDocProcessingPerformanceTestRunner.run_enqueueable,
        runner_cls_qualified_name=f"{self.__class__.__module__}.{self.__class__.__name__}",
        run_id=self.run_id,
        run_name=self.run_name,
        document_id=entry.document_id,
        job_timeout=self.job_timeout,
    )
run_enqueueable staticmethod
run_enqueueable(
    runner_cls_qualified_name,
    run_id,
    run_name,
    document_id="",
)

Enqueueable function to run the performance tests synchronously; it is a static method because our queuing system requires one. We use the runner_cls_qualified_name to dynamically import the right runner class.

Source code in components/documents/public/business_logic/comparison/abstract_performance_runner.py
@staticmethod
@enqueueable
@log_job_args(["run_id", "document_id"])
def run_enqueueable(
    runner_cls_qualified_name: str,
    run_id: uuid.UUID,
    run_name: str,
    document_id: str = "",
) -> None:
    """
    Enqueueable function to run the performance tests synchronously because we need a static method to be compatible with our queuing system.
    We use the `runner_cls_qualified_name` to dynamically import the right runner class.
    """
    module_path, class_name = runner_cls_qualified_name.rsplit(".", 1)
    runner_cls = typing.cast(
        "type[AbstractDocProcessingPerformanceTestRunner[Entry]]",
        getattr(import_module(module_path), class_name),
    )
    runner = runner_cls(run_id=run_id, run_name=run_name, save=True)
    entry = runner_cls.fetch_entry(document_id=document_id)
    try:
        runner.run_sync(entry=entry)
    except Exception as e:
        current_logger.error(
            f"Error while running {runner.STEP_NAME} performance tests on document {document_id}: {e}"
        )
        raise e
run_id instance-attribute
run_id = run_id if run_id else uuid4()
run_name instance-attribute
run_name = run_name if run_name else split('-')[-1]
run_sync
run_sync(entry)

Run the performance tests synchronously on the given entry and return a report. This is just a wrapper around run_and_report that takes care of logging and saving the report.

Source code in components/documents/public/business_logic/comparison/abstract_performance_runner.py
def run_sync(
    self,
    entry: Entry,
) -> "PerformanceRunReport":
    """
    Run the performance tests synchronously on the given entry and return a report.
    This is just a wrapper around `run_and_report` that takes care of logging and saving the report.
    """
    current_logger.info(
        f"Running {self.STEP_NAME} performance tests on document {entry.document_id} ({entry.category=}, {entry.subcategory=})"
    )
    report = self.run_and_report(entry)
    if self.save:
        PerformanceRunReport.save_to_turing(report)
    current_logger.info(
        report.as_textual_report(f"Report {self.STEP_NAME} performance")
    )
    return report
save instance-attribute
save = save
stable_new_run_id
stable_new_run_id()

Generate a stable run_id based on the current run_id.
For an example of usage, see the FrPostprocessingPerformanceTestRunner, where we run two performance runs at the same time:
- one for extraction
- one for the postprocessing based on extraction

Source code in components/documents/public/business_logic/comparison/abstract_performance_runner.py
def stable_new_run_id(self) -> uuid.UUID:
    """
    Generate a stable run_id based on the current run_id.
    For an example of usage, see the FrPostprocessingPerformanceTestRunner where we run two performance runs at the same time:
    - one for extraction
    - one for the postprocessing based on extraction
    """
    rand_gen = random.Random()  # noqa: S311  # not for security / cryptographic purposes, just simple controlled random "noise"
    rand_gen.seed(self.run_id.bytes)
    return uuid.UUID(int=rand_gen.getrandbits(128))
BasePerformanceRunDatasetEntry

Bases: BaseModel

Base class for a single entry in a performance run dataset. You might want to extend this class to add more fields according to the context of your stack.

Note that the document_id is expected to be a string to be compatible with different ID formats (int for France and uuid in the global stack).

category instance-attribute
category
document_id instance-attribute
document_id
document_type instance-attribute
document_type
keys_to_lowercase classmethod
keys_to_lowercase(data)

Convert all keys in the input data to lowercase in order to make the model case-insensitive. This is useful when the input data comes from a Turing query.

Source code in components/documents/public/business_logic/comparison/abstract_performance_runner.py
@model_validator(mode="before")
@classmethod
def keys_to_lowercase(cls, data: typing.Any) -> typing.Any:
    """
    Convert all keys in the input data to lowercase in order to make the model case-insensitive.
    This is useful when the input data will come from a turing query.
    """
    if isinstance(data, dict):
        return {k.lower(): v for k, v in data.items()}
    return data
reference_extraction_version instance-attribute
reference_extraction_version
subcategory instance-attribute
subcategory
Entry module-attribute
Entry = TypeVar(
    "Entry", bound=BasePerformanceRunDatasetEntry
)
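
To illustrate the contract of the abstract runner, here is a minimal subclass sketch. The class name and all field values are placeholders, run_and_report is left unimplemented, and the BasePerformanceRunDatasetEntry field types are assumptions; see DocExtractionPerformanceTestRunner below for a real implementation:

from components.documents.public.business_logic.comparison.abstract_performance_runner import (
    AbstractDocProcessingPerformanceTestRunner,
    BasePerformanceRunDatasetEntry,
)
from components.documents.public.business_logic.comparison.report import PerformanceRunReport


class MyStepPerformanceTestRunner(
    AbstractDocProcessingPerformanceTestRunner[BasePerformanceRunDatasetEntry]
):
    # Name used in logs and textual reports
    STEP_NAME = "my_step"

    @staticmethod
    def fetch_entry(document_id: str) -> BasePerformanceRunDatasetEntry:
        # Placeholder: a real runner would load these fields from its own dataset query
        return BasePerformanceRunDatasetEntry(
            document_id=document_id,
            document_type="invoice",         # hypothetical value
            category="health",               # hypothetical value
            subcategory=None,                # hypothetical value
            reference_extraction_version=1,  # hypothetical value
        )

    def run_and_report(self, entry: BasePerformanceRunDatasetEntry) -> PerformanceRunReport:
        # Placeholder: run the step under test and build a PerformanceRunReport,
        # e.g. via PerformanceRunReport.from_document_extraction_result(...)
        raise NotImplementedError
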

extraction_performance_runner

DocExtractionPerformanceTestRunner
DocExtractionPerformanceTestRunner(
    run_id=None, run_name=None, save=False, job_timeout=1200
)

Bases: AbstractDocProcessingPerformanceTestRunner[BasePerformanceRunDatasetEntry]

Performance test runner for document extraction step.

Source code in components/documents/public/business_logic/comparison/abstract_performance_runner.py
def __init__(
    self,
    run_id: typing.Optional[uuid.UUID] = None,
    run_name: typing.Optional[str] = None,
    save: bool = False,
    job_timeout: int = 1200,
):
    self.run_id = run_id if run_id else uuid.uuid4()
    self.run_name = run_name if run_name else str(self.run_id).split("-")[-1]
    self.save = save
    self.job_timeout = job_timeout
STEP_NAME class-attribute instance-attribute
STEP_NAME = 'extraction'
fetch_dataset staticmethod
fetch_dataset(document_ids=None, batch_label=None)

Fetch the dataset of entries for the extraction performance test.
- If document_ids is provided, only those documents will be fetched.
- If batch_label is provided, only documents with that batch label will be fetched. For now, the label is taken from the upload metadata of the document, but later we'll use Alexandre's work.

Source code in components/documents/public/business_logic/comparison/extraction_performance_runner.py
@staticmethod
def fetch_dataset(
    document_ids: typing.Optional[list[str]] = None,
    batch_label: typing.Optional[str] = None,
) -> list["BasePerformanceRunDatasetEntry"]:
    """
    Fetch the dataset of entries for the extraction performance test.
    - If `document_ids` is provided, only those documents will be fetched.
    - If `batch_label` is provided, only documents with that batch label will be fetched.
        - For now, the label is taken from the upload metadata of the document, but later we'll use Alexandre's work
    """
    query = """
    SELECT doc.id::text AS document_id
         , doc.document_type
         , dcr.classification_result->>'category' AS category
         , dcr.classification_result->>'subcategory' AS subcategory
         , der.version AS reference_extraction_version
    FROM documents.document doc
    JOIN documents.document_extraction_result der ON der.document_id = doc.id AND der.dead_branch_at IS NULL
    JOIN documents.document_classification_result dcr ON dcr.id = der.classification_result_id
    JOIN parsed_document_content pdc ON pdc.insurance_document_id = (doc.upload_metadata->>'insurance_document_id')::int AND pdc.dead_branch_at IS NULL
    WHERE true
    """
    parameters: dict[str, typing.Any] = {}
    if document_ids:
        parameters["document_ids"] = document_ids
        query += "AND doc.id::text IN :document_ids "
    if batch_label:
        parameters["batch_label"] = batch_label
        query += "AND doc.upload_metadata->>'batch' = :batch_label"
    rows = current_session.execute(text(query), parameters).mappings().fetchall()
    return [BasePerformanceRunDatasetEntry(**row) for row in rows]
fetch_entry staticmethod
fetch_entry(document_id)

Fetch a single entry from the dataset based on the document ID.

Source code in components/documents/public/business_logic/comparison/extraction_performance_runner.py
@staticmethod
def fetch_entry(document_id: str) -> BasePerformanceRunDatasetEntry:
    """
    Fetch a single entry from the dataset based on the document ID.
    """
    return DocExtractionPerformanceTestRunner.fetch_dataset(
        document_ids=[document_id]
    )[0]
run_and_report
run_and_report(entry)

Run the document extraction and generate a report against the latest validated extraction result.

Source code in components/documents/public/business_logic/comparison/extraction_performance_runner.py
def run_and_report(
    self,
    entry: "BasePerformanceRunDatasetEntry",
) -> "PerformanceRunReport":
    """
    Run the document extraction and generate a report against the latest validated extraction result.
    """
    expected = get_latest_validated_document_extraction(
        uuid.UUID(entry.document_id)
    )
    assert expected is not None, "Expected extraction result should not be None"
    parsing_flow_output = DocumentAutoParsingFlow.trigger_parsing_flow(
        uuid.UUID(entry.document_id),
        commit=False,
    )
    assert parsing_flow_output is not None, "Parsing flow output should not be None"
    actual = parsing_flow_output.document_extraction_result
    report = PerformanceRunReport.from_document_extraction_result(
        runner=self,
        entry=entry,
        actual=actual,
        expected=expected,
    )
    return report
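
A usage sketch for the extraction runner (the batch label and run name are placeholders; dashboard_url and run_async are documented above):

from components.documents.public.business_logic.comparison.extraction_performance_runner import (
    DocExtractionPerformanceTestRunner,
)

runner = DocExtractionPerformanceTestRunner(
    run_name="prompt-v2-vs-validated",  # hypothetical run name
    save=True,
)

# Fetch the dataset for a hypothetical batch label and enqueue one job per entry
entries = DocExtractionPerformanceTestRunner.fetch_dataset(batch_label="my-batch")
for entry in entries:
    runner.run_async(entry)

print(f"Results: {runner.dashboard_url()}")
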

report

PerformanceRunReport dataclass
PerformanceRunReport(
    env,
    commit,
    run_id,
    run_name,
    run_at,
    document_type,
    document_id,
    category,
    subcategory,
    side_by_side_classification,
    side_by_side_reasons_for_review,
    side_by_side_result,
    result_mismatches,
)

Represents a performance run report for document processing. This report includes the results of a performance run against a document:
- Some context about the run (e.g. environment, commit, run ID, run name)
- Some facts about the document (e.g. type, id, category, subcategory)
- Side-by-side comparison of the classification, reasons for review, and extraction results
- Mismatches found in the extraction results, meant to be the result of the Turing function pdc_mismatch_report

The two main use cases are:
- To generate a nice textual report of the performance run over a document
- To save the report to Turing for further analysis and comparison with other runs

To build an instance, we provide two factory method helpers:
- from_document_extraction_result: to build the report from a document extraction result
- from_performance_run_diff: to build the report from a performance run diff (backward compatibility with the previous implementation)

Note that it doesn't support the rejection content for now.

Mismatch dataclass
Mismatch(
    criticality,
    kind,
    actual_value,
    expected_value,
    path,
    scheme,
)

Bases: DataClassJsonMixin

Used to represent a mismatch in the performance run report. It has the same data structure as the one used by the Turing function pdc_mismatch_report.

actual_value instance-attribute
actual_value
criticality instance-attribute
criticality
expected_value instance-attribute
expected_value
kind instance-attribute
kind
path instance-attribute
path
scheme instance-attribute
scheme
SideBySideValue
SideBySideValue(expected, actual)

Bases: Generic[T]

A generic wrapper for side-by-side values in the report.

Source code in components/documents/public/business_logic/comparison/report.py
def __init__(self, expected: T, actual: T):
    self.expected = expected
    self.actual = actual
actual instance-attribute
actual = actual
expected instance-attribute
expected = expected
from_field_diffs staticmethod
from_field_diffs(field_diffs)

Factory helper to create a SideBySideValue from a list of FieldDiffs. It's meant to ease backward compatibility with the previous PerformanceRunDiff implementation.

Source code in components/documents/public/business_logic/comparison/report.py
@staticmethod
def from_field_diffs(
    field_diffs: list[FieldDiff],
) -> "PerformanceRunReport.SideBySideValue[dict[str, Any]]":
    """
    Factory helper to create a SideBySideValue from a list of FieldDiffs.
    It's meant to ease the retro-compatibility with the previous PerformanceRunDiff implementation.
    """
    expected = {diff.name: diff.expected for diff in field_diffs}
    actual = {diff.name: diff.actual for diff in field_diffs}
    return PerformanceRunReport.SideBySideValue[dict[str, Any]](
        expected=expected, actual=actual
    )
as_textual_report
as_textual_report(report_name=None)

Generate a textual report of the performance run with nice table formatting.

Source code in components/documents/public/business_logic/comparison/report.py
def as_textual_report(self, report_name: typing.Optional[str] = None) -> str:
    """
    Generate a textual report of the performance run with nice table formatting.
    """
    lines: list[str] = []
    report_name = report_name or "Performance Run Report"
    lines.extend(
        line
        for line in [
            f"{report_name} on document {self.document_id} ",
            f"Run ID: {self.run_id}",
            f"Run name: {self.run_name}" if self.run_name else None,
            f"(category={self.category}, subcategory={self.subcategory or 'N/A'})",
            f"Run at: {self.run_at} (env={self.env}, commit={self.commit})",
        ]
        if line is not None
    )

    lines.append("")
    for name, value in [
        ("Expected extraction", self.side_by_side_result.expected),
        ("Actual extraction", self.side_by_side_result.actual),
    ]:
        if value:
            lines.append(f"{name} is")
            lines.append(pprint.pformat(value, width=500, compact=True))
        else:
            lines.append(f"{name} is empty")
        lines.append("-" * 160)

    table = Texttable(max_width=160)
    table.set_deco(Texttable.HEADER | Texttable.VLINES | Texttable.BORDER)
    table.set_cols_dtype(["t", "t", "t", "t"])
    table.set_cols_align(["r", "c", "l", "l"])
    table.add_row(["Field", "Critical", "Expected", "Actual"])
    has_mismatches = False
    for field, criticality, expected_value, actual_value in self._diff_generator():
        table.add_row(
            [
                field,
                "●" if criticality == "critical" else "○",
                expected_value,
                actual_value,
            ]
        )
        has_mismatches = True
    lines.append(table.draw() if has_mismatches else "✅ No mismatches found.")
    return "\n".join(lines) + "\n"
category instance-attribute
category
commit instance-attribute
commit
document_id instance-attribute
document_id
document_type instance-attribute
document_type
env instance-attribute
env
from_document_extraction_result classmethod
from_document_extraction_result(
    actual, expected, runner, entry
)

Create a PerformanceRunReport from the expected and actual document extraction results.

Source code in components/documents/public/business_logic/comparison/report.py
@classmethod
def from_document_extraction_result(
    cls,
    actual: DocumentExtractionResult | None,
    expected: DocumentExtractionResult,
    runner: "AbstractDocProcessingPerformanceTestRunner[Any]",
    entry: "BasePerformanceRunDatasetEntry",
) -> "PerformanceRunReport":
    """
    Create a PerformanceRunReport from the expected and actual document extraction results.
    """
    side_by_side_result = cls.SideBySideValue(
        expected=expected.extraction_result or {},
        actual=actual.extraction_result
        if actual and actual.extraction_result
        else {},
    )
    mismatches = _compute_mismatches_report_in_turing(
        entry=entry, side_by_side_result=side_by_side_result
    )
    return cls(
        env=get_env_name(),
        commit=git_current_commit(),
        run_id=runner.run_id,
        run_name=runner.run_name,
        run_at=datetime.now().isoformat(),
        document_type=entry.document_type,
        document_id=entry.document_id,
        category=entry.category,
        subcategory=entry.subcategory,
        side_by_side_classification=cls.SideBySideValue(
            expected=expected.classification_result.classification_result,  # type: ignore[arg-type]
            actual=actual.classification_result.classification_result  # type: ignore[arg-type]
            if actual
            else {},
        ),
        side_by_side_reasons_for_review=cls.SideBySideValue(
            expected=expected.review_context.get("reasons_for_review", [])
            if expected.review_context
            else [],
            actual=actual.review_context.get("reasons_for_review", [])
            if actual and actual.review_context
            else [],
        ),
        side_by_side_result=cls.SideBySideValue(
            expected=expected.extraction_result or {},
            actual=actual.extraction_result
            if actual and actual.extraction_result
            else {},
        ),
        result_mismatches=mismatches,
    )
from_performance_run_diff classmethod
from_performance_run_diff(diff, runner, entry)

Create a PerformanceRunReport from a PerformanceRunDiff.

Source code in components/documents/public/business_logic/comparison/report.py
@classmethod
def from_performance_run_diff(
    cls,
    diff: PerformanceRunDiff,
    runner: "AbstractDocProcessingPerformanceTestRunner[Any]",
    entry: "BasePerformanceRunDatasetEntry",
) -> "PerformanceRunReport":
    """
    Create a PerformanceRunReport from a PerformanceRunDiff.
    """
    side_by_side_result = cls.SideBySideValue(
        expected=diff.extraction_diff.expected_content or {},
        actual=diff.extraction_diff.actual_content or {},
    )
    mismatches = _compute_mismatches_report_in_turing(
        entry=entry, side_by_side_result=side_by_side_result
    )

    return cls(
        env=get_env_name(),
        commit=git_current_commit(),
        run_id=runner.run_id,
        run_name=runner.run_name,
        run_at=datetime.now().isoformat(),
        document_type=entry.document_type,
        document_id=entry.document_id,
        category=entry.category,
        subcategory=entry.subcategory,
        side_by_side_classification=cls.SideBySideValue.from_field_diffs(
            diff.classification_diff.fields
        ),
        side_by_side_reasons_for_review=cls.SideBySideValue(
            expected=diff.extraction_review_context_diff.expected.reasons_for_review
            if diff.extraction_review_context_diff
            else [],
            actual=diff.extraction_review_context_diff.actual.reasons_for_review
            if diff.extraction_review_context_diff
            else [],
        ),
        side_by_side_result=side_by_side_result,
        result_mismatches=mismatches,
    )
result_mismatches instance-attribute
result_mismatches
run_at instance-attribute
run_at
run_id instance-attribute
run_id
run_name instance-attribute
run_name
save_to_turing classmethod
save_to_turing(report)

Save the performance run report to Turing database.

Source code in components/documents/public/business_logic/comparison/report.py
@classmethod
def save_to_turing(cls, report: "PerformanceRunReport") -> None:
    """
    Save the performance run report to Turing database.
    """
    with turing_connection() as connection:  # type: ignore[no-untyped-call]
        with connection.cursor() as cursor:
            cursor.execute(
                """
                INSERT INTO document_parsing.performance_run(
                    run_id,
                    type,
                    document_id,
                    run_name,
                    env,
                    run_at,
                    commit,
                    category,
                    subcategory,
                    reference_extraction_version,
                    category_expected,
                    category_actual,
                    category_is_equal,
                    subcategory_expected,
                    subcategory_actual,
                    subcategory_is_equal,
                    is_validated,
                    extraction_result_expected,
                    extraction_result_actual,
                    extraction_reasons_for_review,
                    extraction_rejection_content_expected,
                    extraction_rejection_content_actual,
                    rejection_content_is_equal,
                    classification_result_expected,
                    classification_result_actual,
                    mismatch_report)
                SELECT %(run_id)s,
                       %(document_type)s,
                       %(document_id)s,
                       %(run_name)s,
                       %(env)s,
                       %(run_at)s,
                       %(commit)s,
                       %(category)s,
                       %(subcategory)s,
                       %(reference_extraction_version)s,
                       %(category_expected)s,
                       %(category_actual)s,
                       %(category_expected)s = %(category_actual)s,
                       %(subcategory_expected)s,
                       %(subcategory_actual)s,
                       %(subcategory_expected)s = %(subcategory_actual)s,
                       %(is_validated)s,
                       PARSE_JSON(%(extraction_result_expected)s),
                       PARSE_JSON(%(extraction_result_actual)s),
                       PARSE_JSON(%(extraction_reasons_for_review)s),
                       PARSE_JSON(%(extraction_rejection_content_expected)s),
                       PARSE_JSON(%(extraction_rejection_content_actual)s),
                       %(rejection_content_is_equal)s,
                       PARSE_JSON(%(classification_result_expected)s),
                       PARSE_JSON(%(classification_result_actual)s),
                       func.pdc_mismatch_report(%(document_type)s, %(category)s, PARSE_JSON(%(extraction_result_expected)s), PARSE_JSON(%(extraction_result_actual)s))
                """,
                {
                    # primary keys
                    "run_id": str(report.run_id),
                    "document_type": report.document_type.value,
                    "document_id": report.document_id,
                    # context
                    "run_name": report.run_name,
                    "env": report.env,
                    "run_at": report.run_at,
                    "commit": report.commit,
                    "category": report.category,
                    "subcategory": report.subcategory or "",
                    "reference_extraction_version": 1,  # dummy value as version is not used
                    # general results
                    "category_expected": report.side_by_side_classification.expected.get(
                        "category", ""
                    ),
                    "category_actual": report.side_by_side_classification.actual.get(
                        "category", ""
                    ),
                    "subcategory_expected": report.side_by_side_classification.expected.get(
                        "subcategory", ""
                    )
                    or "",
                    "subcategory_actual": report.side_by_side_classification.actual.get(
                        "subcategory", ""
                    )
                    or "",
                    "is_validated": report.side_by_side_reasons_for_review.actual
                    == [],
                    "extraction_result_expected": (
                        json.dumps(
                            report.side_by_side_result.expected,
                            default=_uuid_serializer,
                        )
                    ),
                    "extraction_result_actual": (
                        json.dumps(
                            report.side_by_side_result.actual,
                            default=_uuid_serializer,
                        )
                    ),
                    "extraction_reasons_for_review": (
                        json.dumps(
                            report.side_by_side_reasons_for_review.actual,
                        )
                    ),
                    "extraction_rejection_content_expected": "{}",
                    "extraction_rejection_content_actual": "{}",
                    "rejection_content_is_equal": True,
                    "classification_result_expected": (
                        json.dumps(
                            report.side_by_side_classification.expected,
                        )
                    ),
                    "classification_result_actual": (
                        json.dumps(
                            report.side_by_side_classification.actual,
                        )
                    ),
                },
            )
side_by_side_classification instance-attribute
side_by_side_classification
side_by_side_reasons_for_review instance-attribute
side_by_side_reasons_for_review
side_by_side_result instance-attribute
side_by_side_result
subcategory instance-attribute
subcategory
T module-attribute
T = TypeVar('T')

document

actions

delete_document
delete_document(document_id, commit=True)

Hard deletes a global document, including:
- The Document DB record (cascades to all related records via FK CASCADE)
- The physical S3 file

This is typically called when a corresponding FR insurance document is tombstoned.

Source code in components/documents/public/business_logic/document/actions.py
@obs.api_call()
def delete_document(
    document_id: DocumentId,
    commit: bool = True,
) -> None:
    """
    Hard deletes a global document including:
    - The Document DB record (cascades to all related records via FK CASCADE)
    - The physical S3 file

    This is typically called when a corresponding FR insurance document is tombstoned.
    """
    from shared.helpers.logging.logger import current_logger

    document = current_session.get(Document, document_id)

    if not document:
        current_logger.warning(f"Document {document_id} not found for deletion")
        return

    # Delete the physical S3 file first (before DB deletion)
    document.delete_file()
    current_logger.info(f"Deleted S3 file for global document {document_id}")

    # Use bulk_delete to bypass ORM relationship management and let CASCADE work
    from sqlalchemy import delete

    current_session.execute(delete(Document).where(Document.id == document_id))

    if commit:
        current_session.commit()
        current_logger.info(f"Hard deleted global document {document_id}")
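
Illustrative call (the UUID is a placeholder and DocumentId is assumed to accept a UUID):

import uuid

from components.documents.public.business_logic.document.actions import delete_document

# Hard-delete the document row (CASCADE takes care of related records) and its S3 file
delete_document(uuid.UUID("0b6f5e4c-1d2a-4e3b-9c8d-7f6a5b4c3d2e"), commit=True)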
upload_document
upload_document(
    uploader_ref,
    document_type,
    file,
    upload_metadata=None,
    trigger_parsing_flow=True,
    commit=True,
)

Uploads a document and stores it in S3. This method triggers the document parsing flow if a configuration is registered.

Source code in components/documents/public/business_logic/document/actions.py
@obs.api_call()
def upload_document(
    uploader_ref: str,
    document_type: DocumentType,
    file: FileStorage | IO[Any],
    upload_metadata: dict[str, Any] | None = None,
    trigger_parsing_flow: bool = True,
    commit: bool = True,
) -> DocumentId:
    """
    Uploads a document and stores it in S3. This method triggers the document parsing flow if a configuration is registered.
    """
    with no_commit_in_session(commit_at_end=commit, rollback_at_end=False):
        filename = file.filename if isinstance(file, FileStorage) else None
        with converted_file_mimetype_and_hash(file_name=filename, file=file) as (
            content,
            content_type,
            content_hash,
        ):
            # TODO: check if the document is already uploaded before creating a new one
            document = Document(
                uploader_ref=uploader_ref,
                document_type=document_type,
                content_hash=content_hash,
                mime_type=content_type,
                upload_metadata=upload_metadata,
            )
            current_session.add(document)
            document.upload_file(content)

        current_session.flush()

        # Trigger the parsing flow if needed
        if (
            trigger_parsing_flow
            and DocumentParsingConfigurationRegistry.get_configuration(document_type)
        ):
            current_rq.get_queue(DOCUMENT_PARSING_QUEUE).enqueue_in(
                # We want to process the parsing at least 5 seconds after to make sure the transaction is committed and the document is created
                time_delta=timedelta(seconds=5 if not commit else 0),
                func=DocumentAutoParsingFlow.trigger_parsing_flow,
                document_id=document.id,
                retry=Retry(max=2, interval=60),
            )
        return document.id
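
Upload sketch. The file path, uploader reference and metadata are placeholders, and the DocumentType import path below is an assumption (it is not shown on this page):

from components.documents.public.business_logic.document.actions import upload_document
from components.documents.public.entities import DocumentType  # hypothetical import path

with open("/tmp/invoice.pdf", "rb") as f:
    document_id = upload_document(
        uploader_ref="user-1234",               # hypothetical uploader reference
        document_type=DocumentType.invoice,     # hypothetical enum member
        file=f,
        upload_metadata={"batch": "my-batch"},  # optional free-form metadata
        trigger_parsing_flow=True,
        commit=True,
    )
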

queries

get_document_content
get_document_content(document_id)

Get the content of the document

Source code in components/documents/public/business_logic/document/queries.py
def get_document_content(document_id: DocumentId) -> DocumentContent:
    """
    Get the content of the document
    """
    document = get_or_raise_missing_resource(Document, document_id)
    return DocumentContent(
        file=document.get_or_download_file(),  # type: ignore[arg-type]
        filename=document.filename,
        content_type=document.mime_type,
    )
get_document_info
get_document_info(document_id)

Get the info of the document

Source code in components/documents/public/business_logic/document/queries.py
def get_document_info(document_id: DocumentId) -> DocumentInfo:
    """
    Get the info of the document
    """
    document = get_or_raise_missing_resource(Document, document_id)
    return DocumentInfo(
        document_type=document.document_type,
        uploader_ref=document.uploader_ref,
        created_at=document.created_at,
        filename=document.filename,
        content_type=document.mime_type,
        content_hash=document.content_hash,
        upload_metadata=document.upload_metadata or {},
        country=get_country_for_document_type(document.document_type),
    )
get_temporary_download_url
get_temporary_download_url(document_id)

Get a temporary URL to download the document

Source code in components/documents/public/business_logic/document/queries.py
def get_temporary_download_url(document_id: DocumentId) -> str:
    """
    Get a temporary URL to download the document
    """
    from shared.helpers.storage.backend.s3 import get_presigned_url

    document = get_or_raise_missing_resource(Document, document_id)
    return get_presigned_url(
        mandatory(document.uri),
        expires_in_seconds=60 * 5,  # 5 minutes
    )
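
Read-side sketch combining the three queries above (the document ID is a placeholder and DocumentId is assumed to accept a UUID):

import uuid

from components.documents.public.business_logic.document.queries import (
    get_document_content,
    get_document_info,
    get_temporary_download_url,
)

document_id = uuid.UUID("0b6f5e4c-1d2a-4e3b-9c8d-7f6a5b4c3d2e")  # hypothetical ID

info = get_document_info(document_id)
print(info.filename, info.content_type, info.country)

content = get_document_content(document_id)    # DocumentContent: file, filename, content_type
url = get_temporary_download_url(document_id)  # presigned URL, valid for 5 minutes
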

document_handler

base_document_handler

BaseDocumentHandler

Bases: ABC

Class for handling document-related retrieval operations. This class is used to fetch documents and their related entities from the database while keeping the documents module agnostic of the database models.

document_type instance-attribute
document_type
get_document abstractmethod
get_document(document_id)

Fetches the document from the appropriate table.

Source code in components/documents/public/business_logic/document_handler/base_document_handler.py
@abstractmethod
def get_document(self, document_id: str) -> TranscriptibleDocument:
    """
    Fetches the document from the appropriate table.
    """
    ...
get_document_expected_output abstractmethod
get_document_expected_output(document_id)

Fetches the structured output of a document from the appropriate table. Return None if the document is not yet parsed.

Source code in components/documents/public/business_logic/document_handler/base_document_handler.py
@abstractmethod
def get_document_expected_output(
    self, document_id: str
) -> DocumentExpectedOutput | None:
    """
    Fetches the structured output of a document from the appropriate table.
    Return None if the document is not yet parsed.
    """
    ...
get_document_markdown_transcription abstractmethod
get_document_markdown_transcription(document_id)

Fetches the document markdown transcription of the document from the appropriate table.

Source code in components/documents/public/business_logic/document_handler/base_document_handler.py
@abstractmethod
def get_document_markdown_transcription(
    self, document_id: str
) -> MarkdownTranscription:
    """
    Fetches the document markdown transcription of the document from the appropriate table.
    """
    ...

default_document_handler

DocumentHandler
DocumentHandler(document_type)

Bases: BaseDocumentHandler

Document handler implementation for the Document modular monolith data model

Source code in components/documents/public/business_logic/document_handler/default_document_handler.py
def __init__(self, document_type: DocumentType) -> None:
    self.document_type = document_type
document_type instance-attribute
document_type = document_type
get_document
get_document(document_id)
Source code in components/documents/public/business_logic/document_handler/default_document_handler.py
@override
def get_document(self, document_id: str) -> TranscriptibleDocument:
    document = current_session.get(Document, uuid.UUID(document_id))
    if document is None:
        raise ValueError(f"Document with id {document_id} not found")
    return S3Document(uri=document.uri)  # type: ignore[arg-type]
get_document_expected_output
get_document_expected_output(document_id)
Source code in components/documents/public/business_logic/document_handler/default_document_handler.py
@override
def get_document_expected_output(
    self, document_id: str
) -> DocumentExpectedOutput | None:
    document_extraction = (
        current_session.query(DocumentExtractionResult)  # noqa: ALN085
        .filter(
            DocumentExtractionResult.document_id == uuid.UUID(document_id),
            DocumentExtractionResult.validation_status
            == StepValidationStatus.validated,
            DocumentExtractionResult.dead_branch_at.is_(None),
        )
        .order_by(DocumentExtractionResult.version.desc())
        .first()
    )
    if document_extraction is None:
        return None
    return DocumentExpectedOutput(
        id=document_extraction.id,  # type: ignore[arg-type]
        content=document_extraction.extraction_result,  # type: ignore[arg-type]
    )
get_document_markdown_transcription
get_document_markdown_transcription(document_id)
Source code in components/documents/public/business_logic/document_handler/default_document_handler.py
@override
def get_document_markdown_transcription(
    self, document_id: str
) -> MarkdownTranscription:
    document_transcription: DocumentTranscriptionResult | None = (
        get_latest_validated_document_transcription(uuid.UUID(document_id))
    )
    if document_transcription is None:
        raise ValueError(
            f"Document with id {document_id} has no validated transcription"
        )
    markdown_transcription = document_transcription.get_markdown_transcription()
    if markdown_transcription is None:
        raise ValueError(
            f"Document with id {document_id} has no markdown transcription"
        )
    return markdown_transcription  # type: ignore[no-any-return] # mypy struggling with the return type
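
Usage sketch for the default handler (the document ID is a placeholder; the DocumentType import path is an assumption):

from components.documents.public.business_logic.document_handler.default_document_handler import (
    DocumentHandler,
)
from components.documents.public.entities import DocumentType  # hypothetical import path

handler = DocumentHandler(document_type=DocumentType.invoice)  # hypothetical enum member
document_id = "0b6f5e4c-1d2a-4e3b-9c8d-7f6a5b4c3d2e"           # hypothetical ID

document = handler.get_document(document_id)
markdown = handler.get_document_markdown_transcription(document_id)
expected = handler.get_document_expected_output(document_id)   # None if not yet validated
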

embedding

actions

delete_document_embedding
delete_document_embedding(document_type, document_id)

Delete a document embedding.

Source code in components/documents/public/business_logic/embedding/actions.py
@obs.api_call()
def delete_document_embedding(document_type: DocumentType, document_id: str) -> None:
    """
    Delete a document embedding.
    """
    current_session.query(DocumentEmbedding).filter(  # noqa: ALN085
        DocumentEmbedding.type == document_type,
        DocumentEmbedding.document_id == document_id,
    ).delete()
delete_document_embeddings
delete_document_embeddings(type)

Delete all document embeddings for a given type.

Source code in components/documents/public/business_logic/embedding/actions.py
@obs.api_call()
def delete_document_embeddings(
    type: DocumentType,  # noqa: A002
) -> None:
    """
    Delete all document embeddings for a given type.
    """
    current_session.query(DocumentEmbedding).filter(  # noqa: ALN085
        DocumentEmbedding.type == type,
    ).delete(synchronize_session=False)
index_document
index_document(type, document, embedding_algorithm)

Index a single document. If the document already exists (uniqueness constraint on type/id), it will be updated.

Note: document embeddings are segregated by the type column to differentiate them (FR insurance docs vs GitHub issues vs guarantees, etc.)

Source code in components/documents/public/business_logic/embedding/actions.py
@obs.api_call()
def index_document(
    type: DocumentType,  # noqa: A002
    document: DocumentEmbeddingData,
    embedding_algorithm: EmbeddingAlgorithm,
) -> None:
    """
    Index a single document.
    If the document already exists (uniqueness constraint on type/id), it will be updated.

    Note: document embeddings are segregated by the type column to differentiate them (fr insurance docs vs github issues vs guarantees, etc)
    """
    embedding_function = get_embedding_function(embedding_algorithm)
    text_embedding = embedding_function([document.text])[0]
    embedding_column = get_embedding_column(embedding_algorithm)

    document_embedding = (
        current_session.query(DocumentEmbedding)  # noqa: ALN085
        .filter(
            DocumentEmbedding.type == type,
            DocumentEmbedding.document_id == document.id,
        )
        .one_or_none()
    )

    _upsert_document_embedding(
        document_embedding=document_embedding,
        type=type,
        document=document,
        embedding_column=embedding_column.name,
        text_embedding=text_embedding,
    )
index_documents
index_documents(type, documents, embedding_algorithm)

Index multiple documents. If a document already exists (uniqueness constraint on type/id), it will be updated.

Source code in components/documents/public/business_logic/embedding/actions.py
@obs.api_call()
def index_documents(
    type: DocumentType,  # noqa: A002
    documents: list[DocumentEmbeddingData],
    embedding_algorithm: EmbeddingAlgorithm,
) -> None:
    """
    Index multiple documents.
    If a document already exists (uniqueness constraint on type/id), it will be updated.
    """
    embedding_function = get_embedding_function(embedding_algorithm)
    text_embeddings = embedding_function([d.text for d in documents])
    embedding_column = get_embedding_column(embedding_algorithm)

    document_embeddings = (
        current_session.query(DocumentEmbedding)  # noqa: ALN085
        .filter(
            DocumentEmbedding.type == type,
            DocumentEmbedding.document_id.in_([d.id for d in documents]),
        )
        .all()
    )
    document_embeddings_by_id = {d.document_id: d for d in document_embeddings}

    for document, text_embedding in zip(documents, text_embeddings):
        document_embedding = document_embeddings_by_id.get(document.id)

        _upsert_document_embedding(
            document_embedding=document_embedding,
            type=type,
            document=document,
            embedding_column=embedding_column.name,
            text_embedding=text_embedding,
        )
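
Indexing sketch. Only the id and text attributes of DocumentEmbeddingData are relied upon by the code above; the constructor signature and the entity import paths below are assumptions:

from components.documents.public.business_logic.embedding.actions import index_documents
from components.documents.public.entities import (  # hypothetical import path
    DocumentEmbeddingData,
    DocumentType,
    EmbeddingAlgorithm,
)

documents = [
    DocumentEmbeddingData(id="doc-1", text="First document body"),   # hypothetical fields
    DocumentEmbeddingData(id="doc-2", text="Second document body"),  # hypothetical fields
]

index_documents(
    type=DocumentType.github_issue,              # hypothetical enum member
    documents=documents,
    embedding_algorithm=EmbeddingAlgorithm.ada,  # hypothetical enum member
)
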

queries

Embedding module-attribute
Embedding = Embedding
MetadataFilterBuilder module-attribute
MetadataFilterBuilder = Callable[
    [Column],
    list[BinaryExpression] | list[BooleanClauseList],
]
SimilarDocument dataclass
SimilarDocument(id, text, distance, metadata=dict())
distance instance-attribute
distance
id instance-attribute
id
metadata class-attribute instance-attribute
metadata = field(default_factory=dict)
text instance-attribute
text
count_indexed_documents
count_indexed_documents(type, metadata_filter=None)
Source code in components/documents/public/business_logic/embedding/queries.py
@obs.api_call()
def count_indexed_documents(  # noqa: D103
    type: DocumentType,  # noqa: A002
    metadata_filter: Optional[MetadataFilterBuilder] = None,
) -> int:
    return (
        current_session.query(DocumentEmbedding)  # noqa: ALN085
        .filter(
            DocumentEmbedding.type == type,
            *(
                metadata_filter(DocumentEmbedding.__table__.c.document_metadata)
                if metadata_filter
                else []
            ),
        )
        .count()
    )
fetch_text_embedding
fetch_text_embedding(
    embedding_algorithm, text, max_text_length=None
)

Get the embedding for the given text using the specified embedding algorithm.

Source code in components/documents/public/business_logic/embedding/queries.py
@obs.api_call()
def fetch_text_embedding(
    embedding_algorithm: EmbeddingAlgorithm,
    text: str,
    max_text_length: int | None = None,
) -> Embedding:
    """
    Get the embedding for the given text using the specified embedding algorithm.
    """
    sub_text = text[:max_text_length] if max_text_length is not None else text

    embedding_function = get_embedding_function(embedding_algorithm)
    text_embedding = embedding_function([sub_text])[0]

    return text_embedding
find_documents_by_metadata_filter
find_documents_by_metadata_filter(
    document_type, metadata_filter=None
)

Find documents by metadata filter. <!> The metadata column is not indexed for now, so this query can be expensive.

:param document_type: The type of the document to find.
:param metadata_filter: The metadata filter to apply.
:return: The list of documents matching the metadata filter.

Source code in components/documents/public/business_logic/embedding/queries.py
@obs.api_call()
def find_documents_by_metadata_filter(
    document_type: DocumentType,
    metadata_filter: Optional[MetadataFilterBuilder] = None,
) -> list[SimilarDocument]:
    """
    Find documents by metadata filter.
    <!> The metadata column is not indexed for now, so this query can be expensive.

    :param document_type: The type of the document to find.
    :param metadata_filter: The metadata filter to apply.
    :return: The list of documents matching the metadata filter.
    """
    document_embeddings_query = (
        current_session.query(DocumentEmbedding)  # noqa: ALN085
        .with_entities(
            DocumentEmbedding.document_id,
            DocumentEmbedding.text.label(
                "text"
            ),  # tmp workaround https://alanhealth.slack.com/archives/C19FZEB41/p1694158377737499?thread_ts=1694099340.159699&cid=C19FZEB41
            DocumentEmbedding.document_metadata,
        )
        .filter(
            DocumentEmbedding.type == document_type,
            *(
                metadata_filter(DocumentEmbedding.__table__.c.document_metadata)
                if metadata_filter
                else []
            ),
        )
    )

    return [
        SimilarDocument(
            id=d.document_id,
            text=d.text,
            distance=0,  # No distance is computed here
            metadata=d.document_metadata,
        )
        for d in document_embeddings_query
    ]
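
A MetadataFilterBuilder is simply a callable that receives the document_metadata column and returns SQLAlchemy filter expressions. A minimal sketch, assuming document_metadata is a Postgres JSON/JSONB column (the pgvector reference above implies Postgres) and that a "stack" key is actually stored in your metadata:

from sqlalchemy import Column
from sqlalchemy.sql.elements import BinaryExpression

from components.documents.public.business_logic.embedding.queries import (
    find_documents_by_metadata_filter,
)

def stack_filter(stack: str):
    def _builder(document_metadata: Column) -> list[BinaryExpression]:
        # ["stack"].astext relies on the Postgres JSON/JSONB comparator
        return [document_metadata["stack"].astext == stack]
    return _builder

def find_fr_documents(document_type):
    # document_type: a DocumentType member; "fr" is an illustrative metadata value
    return find_documents_by_metadata_filter(
        document_type=document_type,
        metadata_filter=stack_filter("fr"),
    )

The same builder can be passed to count_indexed_documents and find_similar_documents.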
find_similar_documents
find_similar_documents(
    type,
    text,
    embedding_algorithm,
    exclude_document_id=None,
    restrict_to_document_ids=None,
    metadata_filter=None,
    n_results=10,
    doc_id_to_try_to_fetch_text_embeddings_from_db=None,
    use_approximate_search=False,
)

Find similar documents to the given text. <!> The search is only performed on documents that have been indexed with the SAME embedding algorithm.

:param type: The type of the document to find similar documents to.
:param text: The text to find similar documents to.
:param embedding_algorithm: The embedding algorithm to use.
:param exclude_document_id: The document id to exclude from the results (useful to not return the same document).
:param restrict_to_document_ids: If set, will restrict the search to these document ids. Useful when you have a fixed set of reference documents.
:param metadata_filter: A function that takes the document metadata and returns a list of SQLAlchemy filters to apply.
:param n_results: The number of results to return.
:param doc_id_to_try_to_fetch_text_embeddings_from_db: If None, we will compute the embedding of the text we search for. If set to a doc id, we will first attempt to fetch the text embedding from the DB.
:param use_approximate_search: If True, will use HNSW index to make a faster and approximate matching (cf https://github.com/pgvector/pgvector?tab=readme-ov-file#hnsw).

Source code in components/documents/public/business_logic/embedding/queries.py
@obs.api_call()
def find_similar_documents(
    type: DocumentType,  # noqa: A002
    text: str,
    embedding_algorithm: EmbeddingAlgorithm,
    exclude_document_id: Optional[str] = None,
    restrict_to_document_ids: Optional[list[str]] = None,
    metadata_filter: Optional[MetadataFilterBuilder] = None,
    n_results: int = 10,
    doc_id_to_try_to_fetch_text_embeddings_from_db: Optional[str] = None,
    use_approximate_search: bool = False,
) -> list[SimilarDocument]:
    """
    Find similar documents to the given text.
    <!> The search is only performed on documents that have been indexed with the SAME embedding algorithm.

    :param type: The type of the document to find similar documents to.
    :param text: The text to find similar documents to.
    :param embedding_algorithm: The embedding algorithm to use.
    :param exclude_document_id: The document id to exclude from the results (useful to not return the same document).
    :param restrict_to_document_ids: If set, will restrict the search to these document ids. Useful when you have a fixed set of reference documents
    :param metadata_filter: A function that takes the document metadata and returns a list of SQLAlchemy filters to apply.
    :param n_results: The number of results to return.
    :param doc_id_to_try_to_fetch_text_embeddings_from_db: If None, we will compute the embedding of the text we search for.
     If set to a doc id, we will first attempt to fetch the text embedding from the DB.
    :param use_approximate_search: If True, will use HNSW index to make a faster and approximate matching
     (cf https://github.com/pgvector/pgvector?tab=readme-ov-file#hnsw).
    """
    embedding_column = get_embedding_column(embedding_algorithm, use_approximate_search)
    text_embedding = None
    if doc_id_to_try_to_fetch_text_embeddings_from_db is not None:
        text_embedding = _fetch_embedding(
            embedding_column, doc_id_to_try_to_fetch_text_embeddings_from_db, type
        )
    if text_embedding is None:
        text_embedding = _compute_embedding(embedding_algorithm, text)

    document_embeddings: list[DocumentEmbedding] = (
        current_session.query(DocumentEmbedding)  # noqa: ALN085
        .with_entities(
            DocumentEmbedding.document_id,
            DocumentEmbedding.text.label(
                "text"
            ),  # tmp workaround https://alanhealth.slack.com/archives/C19FZEB41/p1694158377737499?thread_ts=1694099340.159699&cid=C19FZEB41
            embedding_column.l2_distance(text_embedding).label("distance"),
            DocumentEmbedding.document_metadata,
        )
        .filter(
            DocumentEmbedding.type == type,
            DocumentEmbedding.document_id != exclude_document_id,
            embedding_column.isnot(None),
            *(
                metadata_filter(DocumentEmbedding.__table__.c.document_metadata)
                if metadata_filter
                else []
            ),
            *(
                [DocumentEmbedding.document_id.in_(restrict_to_document_ids)]
                if restrict_to_document_ids is not None
                else []
            ),
        )
        .order_by(
            embedding_column.l2_distance(text_embedding)
        )  # TODO OMA use the aliased entity
        .limit(n_results)
        .all()
    )

    return [
        SimilarDocument(
            id=d.document_id,
            distance=d.distance,  # type: ignore[attr-defined] # dynamic column created in the query
            text=d.text,
            metadata=d.document_metadata,
        )
        for d in document_embeddings
    ]
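
A sketch of a typical similarity search; the enum arguments are placeholders and results come back ordered by L2 distance (smaller means closer).

from components.documents.public.business_logic.embedding.queries import find_similar_documents

def similar_examples_for(document_id: str, transcription_text: str, document_type, embedding_algorithm):
    # document_type / embedding_algorithm: DocumentType and EmbeddingAlgorithm members.
    # Only documents indexed with the SAME embedding algorithm can be matched.
    similar = find_similar_documents(
        type=document_type,
        text=transcription_text,
        embedding_algorithm=embedding_algorithm,
        exclude_document_id=document_id,  # don't return the document itself
        n_results=5,
        # Reuse this document's stored embedding instead of recomputing one from the text:
        doc_id_to_try_to_fetch_text_embeddings_from_db=document_id,
        use_approximate_search=True,      # HNSW index: faster, approximate
    )
    return [(d.id, d.distance) for d in similar]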

extraction

extraction_logic

ExtractionInstructionPresenter

Bases: BaseModel

A wrapper to present the instruction to extract fields from a document. It's purely a presentation layer to make the jinja template easier to write/read.

example_raw_content
example_raw_content()

Build an example of extraction result content.

Source code in components/documents/public/business_logic/extraction/extraction_logic.py
def example_raw_content(self) -> dict[str, Any]:
    """
    Build an example of extraction result content.
    """
    raw_content = {}
    for field_extraction_instruction in self.field_extraction_instructions:
        raw_content[field_extraction_instruction.field_name] = (
            field_extraction_instruction.example_extracted_value()
        )
    return raw_content
field_extraction_instructions instance-attribute
field_extraction_instructions
from_extraction_result_model classmethod
from_extraction_result_model(extraction_result_model)

Recursively build ExtractionInstructionPresenter objects from a Pydantic model.

Source code in components/documents/public/business_logic/extraction/extraction_logic.py
@classmethod
def from_extraction_result_model(
    cls, extraction_result_model: type[BaseModel]
) -> "ExtractionInstructionPresenter":
    """
    Recursively build ExtractionInstructionPresenter objects from a Pydantic model.
    """
    field_extraction_instructions = []
    fields = extraction_result_model.model_fields

    for field_name, field_info in fields.items():
        # Skip fields without a title
        if not hasattr(field_info, "title") or not field_info.title:
            continue

        # Extract location and typical mistakes
        location = None
        typical_mistakes = None
        extraction_field_config = ExtractionFieldConfig.from_field_info(field_info)
        if extraction_field_config:
            location = extraction_field_config.llm_guidance.location
            typical_mistakes = extraction_field_config.llm_guidance.typical_mistakes

        # Get description
        description = (
            field_info.description if hasattr(field_info, "description") else ""
        )

        # Get examples
        examples = field_info.examples if hasattr(field_info, "examples") else []

        # Default values
        is_multiple = False
        sub_field_extraction_logics = []
        is_required = True  # Default to required
        possible_values = None

        # Get the field type and determine if it's a list, union, or direct model
        field_type = field_info.annotation
        origin = get_origin(field_type)
        clean_field_type: Any = field_type  # Initialize with the original type

        # Process the field type to extract model types and determine if it's a list
        model_types = []

        # Check if the field is optional (Union with None)
        if origin is Union or (
            hasattr(types, "UnionType") and isinstance(field_type, types.UnionType)
        ):
            args = get_args(field_type)
            # Check if None or NoneType is in the union
            if type(None) in args:
                is_required = False
                # Extract the non-None type(s)
                non_none_types = [t for t in args if t is not type(None)]
                if len(non_none_types) == 1:
                    # If there's only one non-None type, use it as the clean type
                    clean_field_type = non_none_types[0]
                else:
                    # If there are multiple non-None types, create a new Union
                    clean_field_type = Union[tuple(non_none_types)]

        # Update field_type to the clean version for further processing
        field_type = clean_field_type
        origin = get_origin(field_type)

        # Case 1: Field is a list
        if origin is list:
            is_multiple = True
            item_type = get_args(field_type)[0]

            # Check if the list item is a Union
            item_origin = get_origin(item_type)
            if item_origin is Union or (
                hasattr(types, "UnionType")
                and isinstance(item_type, types.UnionType)
            ):
                # Extract all BaseModel types from the union
                for t in get_args(item_type):
                    if isinstance(t, type) and issubclass(t, BaseModel):
                        model_types.append(t)
            # Direct model in list
            elif isinstance(item_type, type) and issubclass(item_type, BaseModel):
                model_types.append(item_type)
            # Enum in list
            elif isinstance(item_type, type) and issubclass(item_type, Enum):
                possible_values = [item.value for item in item_type]

        # Case 2: Field is a Union (but not with None, as we handled that above)
        elif origin is Union or (
            hasattr(types, "UnionType") and isinstance(field_type, types.UnionType)
        ):
            for t in get_args(field_type):
                # Check if it's a list within a union
                t_origin = get_origin(t)
                if t_origin is list:
                    is_multiple = True
                    list_item_type = get_args(t)[0]

                    # Handle nested unions in the list
                    list_item_origin = get_origin(list_item_type)
                    if list_item_origin is Union or (
                        hasattr(types, "UnionType")
                        and isinstance(list_item_type, types.UnionType)
                    ):
                        for inner_t in get_args(list_item_type):
                            if isinstance(inner_t, type) and issubclass(
                                inner_t, BaseModel
                            ):
                                model_types.append(inner_t)
                    # Direct model in list
                    elif isinstance(list_item_type, type) and issubclass(
                        list_item_type, BaseModel
                    ):
                        model_types.append(list_item_type)
                # Direct model in union
                elif isinstance(t, type) and issubclass(t, BaseModel):
                    model_types.append(t)
                # Enum in union
                elif isinstance(t, type) and issubclass(t, Enum):
                    if possible_values is None:
                        possible_values = []
                    possible_values.extend([item.value for item in t])

        # Case 3: Direct model reference
        elif isinstance(field_type, type) and issubclass(field_type, BaseModel):
            model_types.append(field_type)

        # Case 4: Enum reference
        elif isinstance(field_type, type) and issubclass(field_type, Enum):
            possible_values = [item.value for item in field_type]

        # Process all found model types to extract sub-fields
        for model_type in model_types:
            sub_logic = ExtractionInstructionPresenter.from_extraction_result_model(
                model_type
            )
            sub_field_extraction_logics.extend(
                sub_logic.field_extraction_instructions
            )

        # Create the extraction logic object
        field_extraction_instruction = FieldExtractionInstructionPresenter(
            field_name=field_name,
            title=field_info.title,
            field_type=clean_field_type,
            is_required=is_required,
            description=description,
            location=location,
            examples=examples,
            typical_mistakes=typical_mistakes,
            is_multiple=is_multiple,
            possible_values=possible_values,
            sub_fields=sub_field_extraction_logics,
        )

        field_extraction_instructions.append(field_extraction_instruction)

    return ExtractionInstructionPresenter(
        field_extraction_instructions=field_extraction_instructions
    )
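
A small sketch of turning a Pydantic extraction model into presenter objects. The model below is purely illustrative (its fields and titles are not from the codebase); fields without a title are skipped, the optional date maps to is_required=False, and the nested list of models produces sub_fields with is_multiple=True.

import datetime
from pydantic import BaseModel, Field

from components.documents.public.business_logic.extraction.extraction_logic import (
    ExtractionInstructionPresenter,
)

class InvoiceLine(BaseModel):
    designation: str = Field(title="Designation", description="Label printed on the line")
    amount: float = Field(title="Amount", description="Line amount", examples=[12.5])

class InvoiceExtraction(BaseModel):
    issue_date: datetime.date | None = Field(default=None, title="Issue date")
    items: list[InvoiceLine] = Field(title="Items")

presenter = ExtractionInstructionPresenter.from_extraction_result_model(InvoiceExtraction)
print(presenter.example_raw_content())
# Expected shape, per example_extracted_value above:
# {"issue_date": "XXXX-XX-XX", "items": [{"designation": "Example text", "amount": 123.45}]}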
FieldExtractionInstructionPresenter

Bases: BaseModel

A wrapper to present the instruction to extract a single field. It's purely a presentation layer to make the jinja template easier to write/read.

description class-attribute instance-attribute
description = None
example_extracted_value
example_extracted_value()

Build an example of value extracted for this field.

Source code in components/documents/public/business_logic/extraction/extraction_logic.py
def example_extracted_value(self) -> Any:
    """
    Build an example of value extracted for this field.
    """
    example_value: Any = "..."
    if self.sub_fields:
        example_value = {}
        for sub_field in self.sub_fields:
            example_value[sub_field.field_name] = (
                sub_field.example_extracted_value()
            )
    elif self.possible_values:
        # Use first possible value as example
        example_value = self.possible_values[0]
    else:
        if self.field_type is int:
            example_value = 12345
        elif self.field_type is float:
            example_value = 123.45
        elif self.field_type is bool:
            example_value = True
        elif self.field_type is datetime.date:
            example_value = "XXXX-XX-XX"
        elif self.field_type is datetime.datetime:
            example_value = "XXXX-XX-XX XX:XX:XX"
        elif self.field_type is str:
            example_value = "Example text"
        else:
            example_value = "..."

    if self.is_multiple:
        example_value = [example_value]
    return example_value
examples class-attribute instance-attribute
examples = None
field_name instance-attribute
field_name
field_type class-attribute instance-attribute
field_type = None
is_multiple class-attribute instance-attribute
is_multiple = False
is_required class-attribute instance-attribute
is_required = True
location class-attribute instance-attribute
location = None
possible_values class-attribute instance-attribute
possible_values = None
sub_fields class-attribute instance-attribute
sub_fields = []
title instance-attribute
title
typical_mistakes class-attribute instance-attribute
typical_mistakes = None

factory

ExtractorFactory

Factory to build an extractor based on the extractor type.

build_extractor classmethod
build_extractor(parser_type, config, document_handler)

Build an extractor based on the extractor type.

Source code in components/documents/public/business_logic/extraction/factory.py
@classmethod
def build_extractor(
    cls,
    parser_type: ExtractorType,
    config: DynamicLLMExtractorConfiguration,
    document_handler: BaseDocumentHandler,
) -> BaseExtractor:
    """
    Build an extractor based on the extractor type.
    """
    from components.documents.internal.business_logic.extraction.extractor.dynamic_llm_extractor import (
        DynamicLLMExtractor,
    )

    match parser_type:
        case ExtractorType.dynamic_llm:
            return DynamicLLMExtractor(config, document_handler)
        case _:
            raise ValueError(
                f"Parser type {parser_type} is not supported by build_llm_parser"
            )

helpers

call_gpt_chat
call_gpt_chat(
    instructions,
    user_input,
    prompt_examples,
    llm_model,
    use_json_mode,
    call_context=None,
)

Call an OpenAI chat model with the given instructions, transcription text and prompt examples. It automatically retries if the rate limit is reached.

:param call_context: Existing context to continue the conversation

Source code in components/documents/public/business_logic/extraction/helpers.py
def call_gpt_chat(
    instructions: str,
    user_input: str,
    prompt_examples: list[PromptExample],
    llm_model: LlmModel,
    use_json_mode: bool,
    call_context: ChatGptCallContext | None = None,
) -> tuple[str | None, ChatGptCallContext]:
    """
    Call an OpenAI chat model with the given instructions, transcription text and prompt examples.
    It automatically retries if the rate limit is reached.

    :param call_context: Existing context to continue the conversation
    """
    if is_instruct_model(llm_model):
        raise ValueError("Instruct models are not supported by this method")
    if not call_context:
        new_conversation = True
        messages = _build_chat_gpt_system_messages(
            instructions=instructions,
            prompt_examples=prompt_examples,
            llm_model=llm_model,
        )
        messages.append(build_message(LlmRole.USER, user_input, llm_model))

        call_context = ChatGptCallContext(
            llm_model=llm_model,
            example_ids=[e.id for e in prompt_examples],
            nb_calls=0,
            usage_total_tokens=0,
            messages=[],
        )

        current_logger.info(f"Messages sent to LLM {llm_model}")
        for message in messages:
            _log_gpt_message(message)
    else:
        new_conversation = False
        # interactive mode, log the last instruction only. GPT answer will be logged afterward.
        messages = call_context.messages or []
        messages.append(build_message(LlmRole.USER, instructions, llm_model))
        _log_gpt_message(messages[-1])

    interactive = is_development_mode() and env.bool(
        "INTERACTIVE_GPT_CHAT", default=False
    )
    try:
        # Only use Azure client for non OpenAI models
        gpt_chat_mode_completion = (
            openai_query_gpt_chat_mode_completion
            if isinstance(llm_model, OpenAiModel)
            else azure_query_gpt_chat_mode_completion
        )  # TODO: raise if the model isn't an OpenAI model nor an Azure hosted model

        gpt_response = gpt_chat_mode_completion.retry_with(  # type: ignore[union-attr]
            stop=stop_after_attempt(5 if is_development_mode() else 3),
        )(
            model=llm_model,
            messages=messages,
            temperature=0,
            use_json_mode=use_json_mode
            if new_conversation
            else (
                use_json_mode and not interactive
            ),  # disabled use_json_mode for interactive mode
            max_tokens=4096,
        )

        gpt_response_content = gpt_response.choices[0].message.content

        if not gpt_response_content:
            raise OpenAIError("Empty response from GPT")

        current_logger.info(
            **make_encrypted_log(f"LLM {llm_model=} response content is {gpt_response}")
        )
        messages.append(
            build_message(LlmRole.ASSISTANT, gpt_response_content, llm_model)
        )
        usage_total_tokens = call_context.usage_total_tokens
        if gpt_response.usage:
            usage_total_tokens += gpt_response.usage.total_tokens
        call_context = dataclasses.replace(
            call_context,
            nb_calls=call_context.nb_calls + 1,
            usage_total_tokens=usage_total_tokens,
            messages=messages,
        )

        new_question = click.prompt("_", default="") if interactive else None
        if new_question:
            return call_gpt_chat(
                instructions=new_question,
                user_input=user_input,
                prompt_examples=prompt_examples,
                llm_model=llm_model,
                use_json_mode=use_json_mode,
                call_context=call_context,
            )

        return gpt_response_content, call_context
    except AzureError as e:
        current_logger.exception(f"Error while calling {llm_model}")

        return None, dataclasses.replace(
            call_context,
            error_type=e.__class__.__name__,
        )
    except OpenAIError as e:
        if isinstance(
            e, BadRequestError
        ) and "Please reduce the length of the messages" in str(e):
            current_logger.warning(f"Error while calling GPT {llm_model}", exc_info=e)
        else:
            current_logger.exception(f"Error while calling GPT {llm_model}")

        return None, dataclasses.replace(
            call_context,
            error_type=e.__class__.__name__,
        )
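
A minimal usage sketch, assuming an already-resolved LlmModel value (its import path is not shown in this reference) and no few-shot examples. The function returns the raw response content (or None on failure) together with a ChatGptCallContext carrying token usage and, on failure, the error type.

from components.documents.public.business_logic.extraction.helpers import call_gpt_chat

def summarize_transcription(transcription_text: str, llm_model):
    # llm_model: an OpenAI or Azure-hosted chat model (LlmModel); instruct models raise ValueError.
    content, context = call_gpt_chat(
        instructions="Summarize the document below in one sentence.",
        user_input=transcription_text,
        prompt_examples=[],   # optionally a list of PromptExample few-shot examples
        llm_model=llm_model,
        use_json_mode=False,
    )
    if content is None:
        # The call failed; context.error_type holds the exception class name.
        raise RuntimeError(f"LLM call failed: {context.error_type}")
    return content, context.usage_total_tokens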

prompt_builder

build_prompt_from_file
build_prompt_from_file(prompt_dir, prompt_filename)

Build an LLM prompt from a file.

Source code in components/documents/public/business_logic/extraction/prompt_builder.py
def build_prompt_from_file(prompt_dir: Path, prompt_filename: str) -> str:
    """
    Build an LLM prompt from a file.
    """
    with open(prompt_dir / prompt_filename) as f:
        return f.read()
build_prompt_from_jinja
build_prompt_from_jinja(
    prompt_jinja_env,
    prompt_dir,
    prompt_filename,
    parameters=None,
    extraction_result_model=None,
)

Build an LLM prompt from a jinja template or a file using the given parameters. When extraction_result_model is provided, it will be used to generate instructions for the extraction.

Source code in components/documents/public/business_logic/extraction/prompt_builder.py
def build_prompt_from_jinja(
    prompt_jinja_env: Environment,
    prompt_dir: Path,
    prompt_filename: str,
    parameters: dict[str, Any] | None = None,
    extraction_result_model: typing.Optional[type[BaseModel]] = None,
) -> str:
    """
    Build an LLM prompt from a jinja template or a file using the given parameters.
    When extraction_result_model is provided, it will be used to generate instructions for the extraction.
    """
    if parameters is None:
        parameters = {}
    if prompt_filename.endswith(".jinja"):
        if extraction_result_model:
            parameters["extraction_instruction"] = (
                ExtractionInstructionPresenter.from_extraction_result_model(
                    extraction_result_model
                )
            )
        return (
            prompt_jinja_env.get_template(prompt_filename).render(**parameters).strip()
        )
    else:
        return build_prompt_from_file(prompt_dir, prompt_filename)
get_jinja_env_for_dir
get_jinja_env_for_dir(search_path)

Get jinja environment for a directory for rendering templates.

Source code in components/documents/public/business_logic/extraction/prompt_builder.py
def get_jinja_env_for_dir(search_path: Path) -> Environment:
    """
    Get jinja environment for a directory for rendering templates.
    """
    jinja_env = Environment(  # noqa: S701
        undefined=StrictUndefined,
        trim_blocks=True,
        loader=FileSystemLoader(search_path),
    )
    jinja_env.filters["dedent"] = lambda text: textwrap.dedent(text).strip()
    return jinja_env
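
A sketch of rendering a prompt with these helpers. The directory and template name are illustrative; .jinja files go through the Jinja environment (StrictUndefined, with the dedent filter), while any other filename falls back to build_prompt_from_file.

from pathlib import Path

from components.documents.public.business_logic.extraction.prompt_builder import (
    build_prompt_from_jinja,
    get_jinja_env_for_dir,
)

prompts_dir = Path("prompts")                  # hypothetical directory of templates
jinja_env = get_jinja_env_for_dir(prompts_dir)

prompt = build_prompt_from_jinja(
    prompt_jinja_env=jinja_env,
    prompt_dir=prompts_dir,
    prompt_filename="extraction.jinja",        # hypothetical template name
    parameters={"document_text": "..."},       # made available to the template
    # extraction_result_model=MyExtractionModel,  # optional: exposes `extraction_instruction`
)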

parsing

actions

ask_for_review
ask_for_review(
    document_id, actor_ref, operator_comment=None
)

This function is called when we need a new parsing to correct the previous one. A new task will be created to ask for a new parsing. The document should have no open parsing task. Nothing is committed in the session.

:param document_id: The document id
:param actor_ref: The actor asking for review identifier (ex: operator id)
:param operator_comment: The comment to add to the parsing task

Source code in components/documents/public/business_logic/parsing/actions.py
@obs.api_call()
def ask_for_review(
    document_id: DocumentId, actor_ref: str, operator_comment: str | None = None
) -> TaskId:
    """
    This function is called when we need a new parsing to correct the previous one.
    A new task will be created to ask for a new parsing. The document should have no open parsing task. Nothing is committed in the session.

    :param document_id: The document id
    :param actor_ref: The actor asking for review identifier (ex: operator id)
    :param operator_comment: The comment to add to the parsing task
    """
    with no_commit_in_session(commit_at_end=True):
        document = get_or_raise_missing_resource(Document, document_id)
        not_closed_task = get_not_closed_parsing_task_for_document(
            document_id=document_id
        )
        if not_closed_task:
            current_logger.warning(
                f"Document {document_id} already has a not closed parsing task {not_closed_task.id}, nothing to do."
            )
            return not_closed_task.id

        # Save a new parsing with review needed status
        latest_extraction = get_latest_document_extraction(document_id=document_id)
        if latest_extraction is None:
            raise ValueError(
                f"Document {document_id} has no extraction result to ask for review"
            )

        save_extraction_for_document(
            source=StepSource.manual,
            validation_status=StepValidationStatus.review_needed,
            document=document,
            transcription_result=latest_extraction.transcription_result,
            classification_result=latest_extraction.classification_result,
            extraction_context=latest_extraction.extraction_context,
            extraction_result=latest_extraction.extraction_result,
            rejection_reasons=latest_extraction.rejection_reasons,
            creator_ref=actor_ref,
            operator_comment=operator_comment,
        )

        # Create a new task to ask for a new parsing
        task = create_parsing_task_for_document(document_id=document.id, commit=False)
        current_session.flush()
        return task.id
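
A sketch of sending a document back for a new parsing pass. The function returns the parsing task id (the existing one if a task is already open) and appears to commit at the end of its no_commit_in_session block; the comment text is illustrative.

from components.documents.public.business_logic.parsing.actions import ask_for_review

def send_back_to_review(document_id, operator_id: str) -> None:
    # document_id: the document's id (DocumentId); operator_id identifies who asked for the review.
    task_id = ask_for_review(
        document_id=document_id,
        actor_ref=operator_id,
        operator_comment="Extracted amounts look inconsistent, please re-parse.",
    )
    print(f"Parsing task {task_id} is open for document {document_id}")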

parsing_configuration_registry

DocumentParsingConfigurationRegistry

Registry of document parsing configurations.

get_configuration classmethod
get_configuration(document_type)

Get a document parsing configuration by document type.

Source code in components/documents/public/business_logic/parsing/parsing_configuration_registry.py
@classmethod
def get_configuration(
    cls, document_type: DocumentType
) -> DocumentParsingConfiguration | None:
    """
    Get a document parsing configuration by document type.
    """
    return cls.registry.get(document_type)
get_document_types classmethod
get_document_types(has_auto_parsing_configuration)

Get the list of document types for which parsing configurations are registered.

:param has_auto_parsing_configuration: If True, only return document types with auto-parsing configuration

Source code in components/documents/public/business_logic/parsing/parsing_configuration_registry.py
@classmethod
def get_document_types(
    cls, has_auto_parsing_configuration: bool
) -> list[DocumentType]:
    """
    Get the list of document types for which parsing configurations are registered.

    :param has_auto_parsing_configuration: If True, only return document types with auto-parsing configuration
    """
    if has_auto_parsing_configuration:
        return [
            document_type
            for document_type, configuration in cls.registry.items()
            if configuration.document_auto_parsing_flow_configuration
        ]
    return list(cls.registry.keys())
register classmethod
register(configuration)

Register a document parsing configuration.

Source code in components/documents/public/business_logic/parsing/parsing_configuration_registry.py
@classmethod
def register(cls, configuration: DocumentParsingConfiguration) -> None:
    """
    Register a document parsing configuration.
    """
    current_logger.debug(
        f"[Documents] Registering document parsing configuration for {configuration.document_type}"
    )
    cls.registry[configuration.document_type] = configuration
    if configuration.i18n_keys:
        register_i18n_keys(configuration.i18n_keys)
registry class-attribute instance-attribute
registry = {}
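
A small sketch of reading the registry, e.g. to iterate over the document types that can be auto-parsed. Registration itself normally happens at import time via DocumentParsingConfigurationRegistry.register(configuration) with a fully built DocumentParsingConfiguration, which is not reproduced here.

from components.documents.public.business_logic.parsing.parsing_configuration_registry import (
    DocumentParsingConfigurationRegistry,
)

auto_parsed_types = DocumentParsingConfigurationRegistry.get_document_types(
    has_auto_parsing_configuration=True
)
for document_type in auto_parsed_types:
    # configuration is a DocumentParsingConfiguration, or None if nothing is registered
    configuration = DocumentParsingConfigurationRegistry.get_configuration(document_type)
    print(document_type, configuration is not None)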

queries

get_latest_parsing_data
get_latest_parsing_data(document_id)

Get the last parsing step results

Source code in components/documents/public/business_logic/parsing/queries.py
@obs.api_call()
def get_latest_parsing_data(document_id: DocumentId) -> DocumentParsingData:
    """
    Get the last parsing step results
    """
    latest_extraction = get_latest_document_extraction(document_id=document_id)
    latest_classification = (
        latest_extraction.classification_result
        if latest_extraction
        else get_latest_document_classification(document_id=document_id)
    )
    return results_to_document_parsing_data(latest_classification, latest_extraction)
get_parsing_data
get_parsing_data(document_id, version)

Get the parsing step results for a specific version

Source code in components/documents/public/business_logic/parsing/queries.py
@obs.api_call()
def get_parsing_data(document_id: DocumentId, version: int) -> DocumentParsingData:
    """
    Get the parsing step results for a specific version
    """
    extraction = get_document_extraction(document_id=document_id, version=version)
    classification = extraction.classification_result if extraction else None
    return results_to_document_parsing_data(classification, extraction)
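
A sketch of fetching parsing results for a document, assuming DocumentId is a UUID as elsewhere in this reference; the UUID below is a placeholder to replace with a real document id.

import uuid

from components.documents.public.business_logic.parsing.queries import (
    get_latest_parsing_data,
    get_parsing_data,
)

document_id = uuid.UUID("00000000-0000-0000-0000-000000000000")  # placeholder document id

latest = get_latest_parsing_data(document_id=document_id)          # most recent classification/extraction
version_1 = get_parsing_data(document_id=document_id, version=1)   # a specific extraction version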

components.documents.public.commands

app_group

documents_commands module-attribute

documents_commands = AppGroup(
    "documents",
    help="Main command group for the documents component",
    monitor_command_on_slack=False,
)

controls

create_internal_controls

create_internal_controls(document_ids)

Create internal controls for the given documents (skipping those without a validated extraction or with a pending control review)

Source code in components/documents/public/commands/controls.py
@documents_commands.command(requires_authentication=False)
@click.option(
    "--document-ids",
    help="Document ids to run tests on (comma separated)",
    type=str,
)
def create_internal_controls(document_ids: str) -> None:
    """
    Create internal controls for the given documents (skipping those without a validated extraction or with a pending control review)
    """
    from components.documents.internal.business_logic.control.actions import (
        create_document_internal_control,
    )
    from components.documents.internal.business_logic.control.queries import (
        find_pending_internal_control_review_by_document_id,
    )
    from components.documents.internal.business_logic.extraction.queries import (
        get_latest_validated_document_extraction,
    )

    for document_id_str in document_ids.split(","):
        document_id = uuid.UUID(document_id_str.strip())
        document = get_or_raise_missing_resource(Document, document_id)
        extraction = get_latest_validated_document_extraction(document_id)
        if not extraction:
            click.echo(f"No validated extraction found for document {document_id}")
            continue
        pending_control = find_pending_internal_control_review_by_document_id(
            document_id
        )
        if pending_control:
            click.echo(
                f"Pending control already exists for document {document_id}, see {pending_control.id}"
            )
            continue
        internal_control = create_document_internal_control(
            document_id=document.id,
            document_extraction_result_id=extraction.id,
            commit=True,
        )
        click.echo(
            f"Created internal control {internal_control.id} for document {document_id}"
        )

    click.echo(
        "Reminder : You need to wait a bit (10 minutes) or run `flask documents process_created_operation_tasks` to process the created tasks"
    )

document_embedding

recompute_documents_embedding

recompute_documents_embedding(type, embedding_algorithm)

Recompute documents embedding for a given type

Source code in components/documents/public/commands/document_embedding.py
@documents_commands.command(requires_authentication=False)
@click.option("--type", type=str, required=True)
@click.option("--embedding_algorithm", type=str, required=True)
def recompute_documents_embedding(
    type: str,  # noqa: A002
    embedding_algorithm: str,
) -> None:
    """
    Recompute documents embedding for a given type
    """
    from components.documents.internal.helpers.embedding_algorithm import (
        get_embedding_column,
        get_embedding_function,
    )
    from components.documents.internal.models.document_embedding import (
        DocumentEmbedding,
    )
    from components.documents.public.enums.document_type import (
        DocumentType,
    )
    from components.documents.public.enums.embedding_algorithm import (
        EmbeddingAlgorithm,
    )

    document_type = DocumentType.validate(type)
    embedding_algorithm = EmbeddingAlgorithm.validate(embedding_algorithm)

    current_logger.info(
        f"Recomputing documents embedding for type {document_type} and embedding algorithm {embedding_algorithm}"
    )

    embedding_function = get_embedding_function(embedding_algorithm)
    embedding_column = get_embedding_column(embedding_algorithm)
    documents_query = (
        current_session.query(DocumentEmbedding)  # noqa: ALN085
        .filter(
            DocumentEmbedding.type == document_type,
        )
        .yield_per(500)
    )

    for document in documents_query:
        text_embedding = embedding_function([document.text])[0]
        document.__setattr__(embedding_column.name, text_embedding)

    current_session.commit()

document_operation_task

process_created_operation_tasks

process_created_operation_tasks()

Process operation tasks to open with external task provider

Source code in components/documents/public/commands/document_operation_task.py
@documents_commands.command(requires_authentication=False)
def process_created_operation_tasks() -> None:
    """
    Process operation tasks to open with external task provider
    """
    from components.documents.internal.business_logic.operation_task.task_actions import (
        open_created_tasks,
    )

    open_created_tasks()

embedding

index_documents

index_documents(nb_days, document_types, dry_run)

Asynchronously compute document embeddings for a given list of document types and time range. It only computes embeddings for documents that are not already embedded. This command handles document types that can be automatically parsed. If a document doesn't have a transcription or a valid classification result (i.e. not yet parsed and validated), it will be skipped.

Source code in components/documents/public/commands/embedding.py
@documents_commands.command(requires_authentication=False)
@click.option("--nb-days", type=int, required=False, help="Number of days since upload")
@click.option(
    "--document-type",
    "document_types",
    type=click.Choice(DocumentType.get_values()),
    required=False,
    multiple=True,
    help="Document types to embed. If not specified all document types with auto-parsing configuration will be embedded.",
)
@command_with_dry_run
def index_documents(
    nb_days: int, document_types: list[DocumentType], dry_run: bool
) -> None:
    """
    Asynchronously compute document embeddings for a given list of document types and time range.
    It only computes embeddings for documents that are not already embedded.
    This command handles document types that can be automatically parsed.
    If a document doesn't have a transcription or a valid classification result (i.e. not yet parsed and validated), it will be skipped.
    """
    if not document_types:
        document_types = DocumentParsingConfigurationRegistry.get_document_types(
            has_auto_parsing_configuration=True
        )

    current_logger.info(
        f"Embedding documents of type {', '.join(document_types)} uploaded since {nb_days} days..."
    )

    # we only index documents embeddings for documents that weren't indexed yet
    # If you change the embedding algorithm, you might want to change this logic to reindex documents that were indexed with another embedding algorithm
    already_indexed_document_ids_query = current_session.query(  # noqa: ALN085
        DocumentEmbedding.document_id
    ).filter(
        DocumentEmbedding.type.in_(document_types),
    )
    if nb_days is not None:
        already_indexed_document_ids_query = already_indexed_document_ids_query.filter(
            DocumentEmbedding.created_at >= date.today() - timedelta(days=nb_days)
        )

    document_ids_to_index = (
        current_session.query(Document)  # noqa: ALN085
        .filter(
            Document.document_type.in_(document_types),
            cast(Document.id, String(255)).notin_(already_indexed_document_ids_query),
        )
        .with_entities(Document.id)
    )
    if nb_days is not None:
        document_ids_to_index = document_ids_to_index.filter(
            Document.created_at >= date.today() - timedelta(days=nb_days)
        )

    documents_count = document_ids_to_index.count()
    if documents_count == 0:
        current_logger.info("No document to embed")
        return

    if dry_run:
        click.echo(f"[dry-run] Would index {documents_count} document embeddings")
        return

    current_logger.info(f"Indexing {documents_count} document embeddings...")
    with current_rq.pipelined():
        for (document_id,) in document_ids_to_index.all():
            index_global_document_async(document_id)

performance

import_latest_extraction_result_from_turing

import_latest_extraction_result_from_turing(batch)

Import the latest extraction result from Turing for a given batch.

Source code in components/documents/public/commands/performance.py
@documents_commands.command(requires_authentication=False)
@click.option("--batch")
def import_latest_extraction_result_from_turing(
    batch: str,
) -> None:
    """
    Import the latest extraction result from Turing for a given batch.
    """
    current_logger.info(
        f"Running import_latest_extraction_result_from_turing on {batch}"
    )
    import json
    from dataclasses import dataclass

    import re2
    from dataclasses_json import DataClassJsonMixin

    from components.documents.internal.business_logic.extraction.queries import (
        get_latest_validated_document_extraction,
    )
    from shared.helpers.env import is_development_mode
    from shared.helpers.turing.fetch import fetch_dataclass_from_turing

    if not is_development_mode():
        raise ValueError("This command should only be run in development mode. ")

    @dataclass
    class Row(DataClassJsonMixin):
        document_id: uuid.UUID
        extraction_result: str

    query = """
    SELECT der.document_id
         , der.extraction_result
    FROM backend.document_extraction_result der
    JOIN backend.document doc ON doc.id = der.document_id
    WHERE doc.upload_metadata:batch = %(batch)s
      AND der.dead_branch_at IS NULL
    """
    rows: list[Row] = fetch_dataclass_from_turing(
        query=query,
        dataclass_type=Row,
        parameters={"batch": batch},
    )

    def sanitize_extraction_result(
        initial_extraction_result: dict[str, Any],
    ) -> dict[str, Any]:
        extraction_result = {**initial_extraction_result}

        def sanitize_item(item: dict[str, Any]) -> dict[str, Any]:
            item = {**item}
            item["vat_rate"] = "%.2f%%" % float(
                item["vat_rate"].replace("%", "").replace(",", ".")
            )
            item["designation"] = re2.compile(
                r"Honor[\s,.]*dispens[\s,.]*(.+)", re2.IGNORECASE
            ).sub(r"Honor. dispens. \1", item["designation"].strip())
            return item

        extraction_result["items"] = [
            sanitize_item(item) for item in extraction_result["items"]
        ]
        return extraction_result

    for row in rows:
        latest_validated_document_extraction = get_latest_validated_document_extraction(
            row.document_id
        )
        if latest_validated_document_extraction is None:
            current_logger.warning(
                f"Document {row.document_id} has no validated extraction result"
            )
            continue
        current_logger.info(
            f"Update document {row.document_id} latest extraction result {latest_validated_document_extraction.id}"
        )
        extraction_result = json.loads(row.extraction_result)
        extraction_result = sanitize_extraction_result(extraction_result)
        latest_validated_document_extraction.extraction_result = extraction_result
    current_session.commit()

transcribe

transcribe(document_id)

Run transcription on the given document

Source code in components/documents/public/commands/performance.py
@documents_commands.command(requires_authentication=False)
@click.argument("document_id", type=uuid.UUID)
def transcribe(
    document_id: uuid.UUID,
) -> None:
    """
    Run transcription on the given document
    """
    current_logger.info("Running transcription on document %s", document_id)
    from components.documents.internal.business_logic.parsing.flow.transcription import (
        DocumentTranscriptionLogic,
    )
    from components.documents.internal.models.document import Document
    from components.documents.public.business_logic.parsing.parsing_configuration_registry import (
        DocumentParsingConfigurationRegistry,
    )
    from shared.helpers.get_or_else import get_or_raise_missing_resource

    document = get_or_raise_missing_resource(Document, document_id)
    configuration = DocumentParsingConfigurationRegistry.get_configuration(
        document.document_type
    )
    assert configuration, (
        f"No configuration found for document type {document.document_type}"
    )
    assert configuration.document_auto_parsing_flow_configuration, (
        f"No auto parsing configuration found for document type {document.document_type}"
    )
    document_transcription = DocumentTranscriptionLogic.run_transcription(
        document,
        configuration.document_auto_parsing_flow_configuration.transcription_configuration,
    )
    if document_transcription.transcription_context is not None:
        current_logger.info(
            f"Transcription done with confidence {document_transcription.transcription_context.transcription_confidence}"
        )
    markdown_transcription = document_transcription.get_markdown_transcription()
    if markdown_transcription:
        print(markdown_transcription.markdown)  # noqa: T201

transcription

transcribe_document

transcribe_document(document_id)

Run the transcription logic for a given document and save it.

Source code in components/documents/public/commands/transcription.py
@enqueueable
def transcribe_document(document_id: uuid.UUID) -> None:
    """
    Run the transcription logic for a given document and save it.
    """
    document = get_or_raise_missing_resource(Document, document_id)

    parsing_configuration = DocumentParsingConfigurationRegistry.get_configuration(
        document.document_type
    )
    if (
        parsing_configuration is None
        or parsing_configuration.document_auto_parsing_flow_configuration is None
    ):
        # we should normally use the document parsing configuration to get the transcription configuration, but we haven't registered the document auto parsing configuration yet
        current_logger.warning(
            f"document type {document.document_type} has no transcription configuration. Using a default configuration."
        )
        transcription_configuration = DocumentTranscriptionConfiguration()
    else:
        transcription_configuration = parsing_configuration.document_auto_parsing_flow_configuration.transcription_configuration

    document_transcription_result = DocumentTranscriptionLogic.run_transcription(
        document=document,
        transcription_configuration=transcription_configuration,
    )
    current_session.add(document_transcription_result)
    current_session.commit()

transcribe_documents

transcribe_documents(document_type, dry_run)

Transcribe all documents of a given type that have a valid classification result and no transcription result. This command is useful when an auto-parsing configuration is added for a document type: it bulk-transcribes all past documents. This is needed to index the document embeddings so they can be used as similar examples for automatic extraction with the dynamic LLM extractor.

Source code in components/documents/public/commands/transcription.py
@documents_commands.command(requires_authentication=False)
@click.option(
    "--document-type",
    type=click.Choice(DocumentType.get_values()),
    required=True,
    help="The document type",
)
@command_with_dry_run
def transcribe_documents(document_type: DocumentType, dry_run: bool) -> None:
    """
    Transcribe all documents of a given type that have a valid classification result and no transcription result.
    This command is useful when an auto-parsing configuration is added for a document type: it bulk-transcribes all past documents.
    This is needed to index the document embeddings so they can be used as similar examples for automatic extraction with the dynamic LLM extractor.
    """
    current_logger.info(
        f"Starting transcription of documents of type {document_type}..."
    )

    # transcribe all documents that have a valid document classification result and no document transcription
    documents_with_classification = (
        current_session.query(DocumentClassificationResult)  # noqa: ALN085
        .options(
            joinedload(DocumentClassificationResult.document),
            load_only(DocumentClassificationResult.document_id),
        )
        .filter(
            Document.document_type == document_type,
            DocumentClassificationResult.validation_status
            == StepValidationStatus.validated,
        )
        .distinct()
    ).all()

    document_ids_with_classification = {
        doc.document_id for doc in documents_with_classification
    }

    documents_with_transcription = (
        current_session.query(DocumentTranscriptionResult)  # noqa: ALN085
        .options(load_only(DocumentTranscriptionResult.document_id))
        .filter(
            Document.document_type == document_type,
            DocumentTranscriptionResult.document_id.in_(
                document_ids_with_classification
            ),
        )
        .distinct()
    )

    document_ids_with_transcription = {
        doc.document_id for doc in documents_with_transcription
    }

    document_ids_to_transcribe = (
        document_ids_with_classification - document_ids_with_transcription
    )
    if dry_run:
        current_logger.info(
            f"[dry-run] Would transcribe {len(document_ids_to_transcribe)} documents asynchronously"
        )
        return

    current_logger.info(
        f"Transcribing {len(document_ids_to_transcribe)} documents asynchronously..."
    )
    with current_rq.pipelined():
        document_parsing_queue = current_rq.get_queue(DOCUMENT_PARSING_QUEUE)
        for document_id in document_ids_to_transcribe:
            document_parsing_queue.enqueue(
                transcribe_document,
                document_id=document_id,
            )

components.documents.public.constants

DOCUMENTS_SCHEMA_NAME module-attribute

DOCUMENTS_SCHEMA_NAME = 'documents'

Schema name for the documents component

UNCLASSIFIABLE module-attribute

UNCLASSIFIABLE = 'unclassifiable'

Fallback label for document classification used when the classifier cannot classify. This can also be used to reject unsupported documents.

components.documents.public.controllers

batches

DocumentBatchController

Bases: BaseController

Controller to manage documents in a batch

delete
delete(batch_id, id)

Remove a document from its batch

Source code in components/documents/public/controllers/batches.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@obs.api_call()
def delete(
    self,
    batch_id: int,
    id: str,
) -> Response:
    """
    Remove a document from its batch
    """
    from components.documents.public.business_logic.batches.actions import (
        remove_document_from_batch,
    )

    remove_document_from_batch(
        batch_id=batch_id,
        document_id=id,
    )

    return make_success_json_response()
get
get(batch_id)

Returns the documents of a performance batch

Source code in components/documents/public/controllers/batches.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@obs.api_call()
def get(self, batch_id: int) -> Response:
    """
    Returns the documents of a performance batch
    """
    from components.documents.public.business_logic.batches.queries import (
        get_documents_by_batch,
    )

    documents = get_documents_by_batch(batch_id=batch_id)
    return make_json_response(documents)
post
post(batch_id, params)

Add documents to a batch

Source code in components/documents/public/controllers/batches.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.parse_document,
        }
    ),
)
@request_argument(
    "document_id",
    type=str,
    required=True,
    location="json",
    owner_controller=NoOwner,
)
@request_argument(
    "external_id",
    type=str,
    required=False,
    location="json",
    owner_controller=NoOwner,
)
@request_argument(
    "stack",
    type=str,
    required=False,
    location="json",
)
@request_argument(
    "document_type",
    type=str,
    required=False,
    location="json",
)
@obs.api_call()
def post(
    self,
    batch_id: int,
    params: dict[str, Any],
) -> Response:
    """
    Add documents to a batch
    """
    from components.documents.public.business_logic.batches.actions import (
        add_document_to_batch,
    )

    add_document_to_batch(
        batch_id=batch_id,
        document_id=params["document_id"],
        external_id=params.get("external_id"),
        document_type=params.get("document_type"),
        stack=params.get("stack"),
    )

    return make_success_json_response()
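
For reference, a sketch of what a client call to this endpoint could look like. The URL is hypothetical (the real path depends on how the documents blueprint and the batches endpoint are mounted) and the bearer-token header is an assumption; only the JSON body fields mirror the request arguments declared above.

import requests

response = requests.post(
    "https://example.invalid/api/documents/batches/42/documents",  # hypothetical URL and batch id
    json={
        "document_id": "00000000-0000-0000-0000-000000000000",  # required
        "external_id": "DOC-001",                                # optional
        "document_type": "invoice",                              # optional, illustrative value
        "stack": "fr",                                           # optional, illustrative value
    },
    headers={"Authorization": "Bearer <token>"},                 # auth mechanism is an assumption
)
response.raise_for_status()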

DocumentReferenceController

Bases: BaseController

Controller to flag a document as validated or not

post
post(document_id, params)

Flag a document as validated or not

Source code in components/documents/public/controllers/batches.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@request_argument(
    "validated",
    type=bool,
    required=True,
    location="json",
)
@obs.api_call()
def post(
    self,
    document_id: str,
    params: dict[str, Any],
) -> Response:
    """
    Flag a document as validated or not
    """
    from components.documents.public.business_logic.batches.actions import (
        flag_batch_document_as_validated,
    )

    flag_batch_document_as_validated(
        document_id=document_id,
        validated=params["validated"],
    )

    return make_success_json_response()

DocumentsBatchesController

Bases: BaseController

Controller to manage performance batches

delete
delete(id)

Deletes a performance batch

Source code in components/documents/public/controllers/batches.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@obs.api_call()
def delete(self, id: int) -> Response:
    """
    Deletes a performance batch
    """
    from components.documents.public.business_logic.batches.actions import (
        delete_batch,
    )

    delete_batch(batch_id=id)
    return make_success_json_response()
get
get(id=None)

Returns all available performance batches or a specific batch by ID

Source code in components/documents/public/controllers/batches.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@obs.api_call()
def get(self, id: int | None = None) -> Response:
    """
    Returns all available performance batches or a specific batch by ID
    """
    from components.documents.public.business_logic.batches.queries import (
        get_all_batches,
        get_batch_by_id,
    )

    if id is not None:
        batch = get_batch_by_id(id)
        return make_json_response(batch)

    batches = get_all_batches()
    return make_json_response(list(batches))
post
post(params)

Creates a new performance batch

Source code in components/documents/public/controllers/batches.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@request_argument(
    "batch_name",
    type=str,
    required=True,
    location="json",
)
@obs.api_call()
def post(self, params: dict[str, Any]) -> Response:
    """
    Creates a new performance batch
    """
    from components.documents.public.business_logic.batches.actions import (
        create_new_batch,
    )

    batch_name = params["batch_name"]
    created_batch = create_new_batch(batch_name=batch_name)

    return make_json_response({"batch": created_batch})

documents_batches_endpoint module-attribute

documents_batches_endpoint = Endpoint('batches')

document

DocumentContentController

Bases: BaseController

Controller to get the content of the document

get
get(document_id, params)

Used by generic document viewer to get the content of the document

Source code in components/documents/public/controllers/document.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@request_argument("inline", type=bool, required=False, default=False)
@obs.api_call()
def get(self, document_id: uuid.UUID, params: dict[str, Any]) -> Response:
    """
    Used by generic document viewer to get the content of the document
    """
    from components.documents.public.business_logic.document.queries import (
        get_document_content,
    )
    from shared.helpers.file import send_file

    document_content = get_document_content(document_id)

    return send_file(
        document_content.file,
        document_content.filename,
        inline=params["inline"],
        mimetype=document_content.content_type,
    )

DocumentController

Bases: BaseController

Controller to get the information of the document

get
get(document_id)

Returns the general information of the document (not the content)

Source code in components/documents/public/controllers/document.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@obs.api_call()
def get(self, document_id: uuid.UUID) -> Response:
    """
    Returns the general information of the document (not the content)
    """
    from components.documents.public.business_logic.document.queries import (
        get_document_info,
    )

    document_info = get_document_info(document_id)
    return make_json_response(document_info.to_dict())

DocumentGetLatestParsingController

Bases: BaseController

Get the last parsing result of a document

get
get(document_id)

Get the last Classification and Extraction

Source code in components/documents/public/controllers/document.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@obs.api_call()
def get(self, document_id: uuid.UUID) -> Response:
    """
    Get the last Classification and Extraction
    """
    from components.documents.public.business_logic.parsing.queries import (
        get_latest_parsing_data,
    )

    latest_parsing = get_latest_parsing_data(document_id=document_id)

    return make_json_response(latest_parsing.to_dict())

DocumentInternalControlReviewController

Bases: BaseController

Controller to get an internal control review by its ID

get
get(document_id, review_id)

Get an internal control review by its ID

Source code in components/documents/public/controllers/document.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@obs.api_call()
def get(self, document_id: uuid.UUID, review_id: uuid.UUID) -> Response:
    """
    Get an internal control review by its ID
    """
    from components.documents.internal.business_logic.control.queries import (
        get_internal_control_review_info,
    )

    internal_control_review_info = get_internal_control_review_info(review_id)

    # Verify that the review belongs to the specified document
    if internal_control_review_info.document_id != document_id:
        return make_json_response(
            {"error": "Internal control review does not belong to the document"},
            code=400,
        )

    return make_json_response(internal_control_review_info.to_dict())

DocumentSubmitParsingController

Bases: BaseController

Manually submit a parsing for a document

InternalControlReviewParam dataclass
InternalControlReviewParam(id, control_validated)

Bases: DataClassJsonMixin

Parameters for internal control review when submitting a parsing result

control_validated instance-attribute
control_validated
id instance-attribute
id
post
post(user, document_id, params)

Create a new classification and extraction result

Source code in components/documents/public/controllers/document.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.parse_document,
            EmployeePermission.control_extraction_a_posteriori,
        }
    ),
)
@request_argument(
    "document_category", type=DocumentCategory, required=True, location="json"
)
@request_argument("extraction_result", type=dict, required=False, location="json")
@request_argument(
    "validation_status", type=StepValidationStatus, required=True, location="json"
)
@request_argument(
    "rejection_reasons",
    type=list[ParsingRejectionReason],
    required=False,
    location="json",
)
@request_argument(
    "internal_control_review",
    type=InternalControlReviewParam.from_dict,
    required=False,
    location="json",
)
@inject_user
@obs.api_call()
def post(
    self,
    user: Authenticatable,
    document_id: uuid.UUID,
    params: dict[str, Any],
) -> Response:
    """
    Create a new classification and extraction result
    """
    from components.documents.internal.business_logic.control.actions import (
        submit_document_internal_control_review,
    )
    from components.documents.internal.business_logic.parsing.actions import (
        save_manual_parsing,
    )
    from components.documents.internal.models.document_internal_control_review import (
        DocumentInternalControlReview,
    )
    from shared.helpers.get_or_else import get_or_raise_missing_resource

    document_category = params["document_category"]
    extraction_result = params.get("extraction_result")  # Manually input form data
    validation_status = params["validation_status"]
    rejection_reasons = params.get("rejection_reasons")
    internal_control_review: (
        DocumentSubmitParsingController.InternalControlReviewParam | None
    ) = params.get("internal_control_review")

    if (not extraction_result and not rejection_reasons) or (
        extraction_result and rejection_reasons
    ):
        return make_json_response(
            {"error": "Either extraction_result or rejection_reasons is required"},
            code=400,
        )

    if internal_control_review is not None:
        if not has_permission(
            user, EmployeePermission.control_extraction_a_posteriori
        ):
            return make_json_response(
                {
                    "error": "User does not have the permission to process a posteriori controls on a document extraction"
                },
                code=403,
            )

        document_internal_control_review = get_or_raise_missing_resource(
            DocumentInternalControlReview, internal_control_review.id
        )
        if document_internal_control_review.document_id != document_id:
            return make_json_response(
                {
                    "error": "Internal control review does not belong to the document"
                },
                code=400,
            )
        from shared.claim_management.enums.internal_control_validation_status import (
            InternalControlValidationStatus,
        )

        if (
            document_internal_control_review.validation_status
            != InternalControlValidationStatus.pending
        ):
            return make_json_response(
                {"error": "Internal control review is not in pending state"},
                code=400,
            )

        submit_document_internal_control_review(
            internal_control_review_id=internal_control_review.id,
            control_validated=internal_control_review.control_validated,
            commit=False,
        )

    save_manual_parsing(
        creator_ref=str(user.id),
        document_id=document_id,
        extraction_result=extraction_result,
        validation_status=validation_status,
        document_category=document_category,
        rejection_reasons=(
            [
                ParsingRejectionReason.validate(reason)
                for reason in rejection_reasons
                if reason
            ]
            if rejection_reasons
            else None
        ),
        is_from_internal_control=internal_control_review is not None,
    )

    return make_success_json_response()

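Two hedged payload sketches for this endpoint, illustrating the rule enforced above: exactly one of extraction_result or rejection_reasons must be provided. The enum values are illustrative, not the actual DocumentCategory, StepValidationStatus, or ParsingRejectionReason members:

# Accepting a document with manually entered data.
accept_payload = {
    "document_category": "health_invoice",        # a DocumentCategory value (assumed)
    "validation_status": "validated",             # a StepValidationStatus value (assumed)
    "extraction_result": {"total_amount": 42.0},  # manually input form data
}

# Rejecting a document: rejection_reasons replaces extraction_result.
# Sending both, or neither, returns a 400.
reject_payload = {
    "document_category": "unclassifiable",
    "validation_status": "rejected",               # assumed enum value
    "rejection_reasons": ["unreadable_document"],  # assumed ParsingRejectionReason value
}
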
DocumentTemporaryUrlController

Bases: BaseController

Controller to get the temporary url of the document

get
get(document_id)

Used by generic document viewer to get the temporary url of the document

Source code in components/documents/public/controllers/document.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@obs.api_call()
def get(self, document_id: uuid.UUID) -> str:
    """
    Used by generic document viewer to get the temporary url of the document
    """
    from components.documents.public.business_logic.document.queries import (
        get_temporary_download_url,
    )

    return get_temporary_download_url(document_id=document_id)

documents_endpoint module-attribute

documents_endpoint = Endpoint('documents')

document_type

DocumentCategorySchemaController

Bases: BaseController

Controller to get the schemas for a document category

get
get(document_type, document_category, params)

Get the schemas for the document category

Source code in components/documents/public/controllers/document_type.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@request_argument("lang", type=Lang, required=False)
@obs.api_call()
def get(
    self, document_type: str, document_category: str, params: dict[str, str]
) -> Response:
    """
    Get the schemas for the document category
    """
    lang_param = params.get("lang", "en")
    document_parsing_configuration = _get_document_parsing_configuration_or_raise(
        document_type
    )
    schema = document_parsing_configuration.get_document_category_json_schema(
        document_category=document_category, lang=Lang.validate(lang_param)
    )

    return make_json_response(schema) if schema else make_empty_response()

DocumentTypeCategoriesController

Bases: BaseController

Controller to get the categories of the document type

get
get(document_type, params)

Returns the categories of the document type

Source code in components/documents/public/controllers/document_type.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@request_argument("lang", type=Lang, required=False)
@obs.api_call()
def get(self, document_type: str, params: dict[str, str]) -> Response:
    """
    Returns the categories of the document type
    """
    lang_param = params.get("lang", "en")
    document_parsing_configuration = _get_document_parsing_configuration_or_raise(
        document_type
    )
    document_categories = document_parsing_configuration.get_document_categories(
        lang=Lang.validate(lang_param)
    )

    return make_json_response(document_categories)

document_types_endpoint module-attribute

document_types_endpoint = Endpoint('document_types')

upload

UploadController

Bases: BaseController

Documents are uploaded through this endpoint.

post
post(document_type, params)

Controller to upload a document, to be processed by the global document processing stack.

Source code in components/documents/public/controllers/upload.py
@view_method(
    auth_strategy=GlobalAuthorizationStrategies().alaner_admin(
        permitted_for={
            EmployeePermission.view_marmot_information,
        }
    ),
)
@request_argument(
    "document",
    type=FileStorage,
    required=True,
    location="files",
)
@request_argument(
    "metadata",
    type=str,
    required=False,
)
@obs.api_call()
def post(self, document_type: str, params):  # type: ignore[no-untyped-def]
    """
    Controller to upload a document, to be processed by the global document processing stack.
    """
    from components.documents.public.business_logic.document.actions import (
        upload_document,
    )

    actor_id = g.actor.id
    file_storage: FileStorage = params["document"]

    upload_metadata_json = params.get("metadata")
    upload_metadata = (
        json.loads(upload_metadata_json) if upload_metadata_json else {}
    )

    document_id = upload_document(
        uploader_ref=str(actor_id),
        document_type=DocumentType(document_type),
        file=file_storage,
        upload_metadata=upload_metadata,
    )
    return make_json_response({"id": document_id})

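A hedged client sketch for the upload endpoint; the route, host, token, and metadata keys are placeholders, and the optional metadata argument is sent as a JSON string:

import json

import requests

with open("invoice.pdf", "rb") as f:
    response = requests.post(
        "https://backend.example.com/documents/upload/health_invoice",  # hypothetical route
        files={"document": ("invoice.pdf", f, "application/pdf")},
        data={"metadata": json.dumps({"claim_id": "CLAIM-123"})},  # optional JSON string
        headers={"Authorization": "Bearer <admin token>"},
    )

document_id = response.json()["id"]
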
upload_endpoint module-attribute

upload_endpoint = Endpoint('upload')

components.documents.public.entities

batches

batch

Batch dataclass
Batch(id, name, created_at, documents_count)

Bases: DataClassJsonMixin

This is the information about a performance batch

created_at instance-attribute
created_at
documents_count instance-attribute
documents_count
id instance-attribute
id
name instance-attribute
name

batch_document_info

BatchDocumentInfo dataclass
BatchDocumentInfo(
    id, external_id, stack, document_type, is_validated
)

Bases: DataClassJsonMixin

This is the information about a document in a batch

document_type instance-attribute
document_type
external_id instance-attribute
external_id
id instance-attribute
id
is_validated instance-attribute
is_validated
stack instance-attribute
stack

classification

configuration

BasePredictorConfiguration

Bases: BaseModel

Base class for a predictor configuration

prepare_document_for_predictor
prepare_document_for_predictor(transcription)

Prepare the document to be sent to the predictor

Source code in components/documents/public/entities/classification/configuration.py
def prepare_document_for_predictor(
    self, transcription: Transcription
) -> dict[str, Any]:
    """
    Prepare the document to be sent to the predictor
    """
    return {"BEST_TRANSCRIPTION_TEXT": transcription.text}
LLMPredictorConfiguration

Bases: BasePredictorConfiguration

The configuration to use a LLM predictor

instructions instance-attribute
instructions

The instructions to provide to the LLM model to classify a document transcription

llm_model instance-attribute
llm_model

the LLM model to use. Make sure to use HDS models for HDS data

PythonicPredictorConfiguration

Bases: BasePredictorConfiguration

The configuration to use a Python predictor

python_predictor instance-attribute
python_predictor

Callable python predictor

SageMakerPredictorConfiguration

Bases: BasePredictorConfiguration

The configuration to use a SageMaker predictor

endpoint_name instance-attribute
endpoint_name

Sagemaker endpoint name

read_timeout_in_seconds class-attribute instance-attribute
read_timeout_in_seconds = 10

Read timeout in seconds to wait for the response

total_max_attempts class-attribute instance-attribute
total_max_attempts = 5

Total max attempts to try to get a response from the sagemaker predictor if there is an error

context

ClassificationContext dataclass
ClassificationContext(class_contexts)

Bases: DataClassJsonMixin

The context of the classification for each classifier.

class_contexts instance-attribute
class_contexts

Context of the classification indexed by classifier name

ClassificationReviewContext dataclass
ClassificationReviewContext(reasons_for_review)

Bases: DataClassJsonMixin

Review Context of a classification

reasons_for_review instance-attribute
reasons_for_review

Reasons for review of the classification indexed by classifier name

SageMakerPredictionContext dataclass
SageMakerPredictionContext(endpoint_name)

Bases: DataClassJsonMixin

The context of a prediction made by a SageMaker predictor.

endpoint_name instance-attribute
endpoint_name
SingleClassificationContext dataclass
SingleClassificationContext(
    predictor_type,
    prediction_proba,
    fallback_label_used,
    classification_confidence,
    sagemaker_call_context,
    llm_call_context,
)

Bases: DataClassJsonMixin

The context of a single classification.

classification_confidence instance-attribute
classification_confidence

Classification confidence score (None if we use the fallback label)

fallback_label_used instance-attribute
fallback_label_used

True if the returned label from the classifier was unknown and a fallback label was used

llm_call_context instance-attribute
llm_call_context

LLM call context for the classification, if the classifier uses an LLM model

prediction_proba instance-attribute
prediction_proba

The prediction probability for each possible label. None if the prediction failed and the fallback label was used

predictor_type instance-attribute
predictor_type

The type of the predictor used for the classification

sagemaker_call_context instance-attribute
sagemaker_call_context

Sagemaker call context for the classification, if the classifier uses a Sagemaker inference endpoint

prediction

PredictorOutput dataclass
PredictorOutput(best_prediction, prediction_proba)

Structure of the output of a predictor. A predictor can return multiple predicted classes

best_prediction instance-attribute
best_prediction
prediction_proba instance-attribute
prediction_proba

document

ExtractableDocument dataclass

ExtractableDocument(id, document_type)

Bases: S3Document

document_type instance-attribute
document_type
id instance-attribute
id

document_content

DocumentContent dataclass

DocumentContent(file, filename, content_type)

This is the content of a document

content_type instance-attribute
content_type
file instance-attribute
file
filename instance-attribute
filename

DocumentInfo dataclass

DocumentInfo(
    document_type,
    uploader_ref,
    created_at,
    content_type,
    content_hash,
    filename,
    upload_metadata,
    country,
)

Bases: DataClassJsonMixin

This is the information about a document

content_hash instance-attribute
content_hash
content_type instance-attribute
content_type
country instance-attribute
country
created_at instance-attribute
created_at
document_type instance-attribute
document_type
filename instance-attribute
filename
upload_metadata instance-attribute
upload_metadata
uploader_ref instance-attribute
uploader_ref

embedding

DocumentEmbeddingData dataclass

DocumentEmbeddingData(id, text, metadata)

Document embedding data used by the index_document function.

id instance-attribute
id

The document id

metadata instance-attribute
metadata

The document metadata that will be stored along with the embedding to filter by metadata

text instance-attribute
text

The document textual transcription

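A hedged construction sketch; the import path and metadata keys are assumptions (the 'category' and 'stack' keys mirror the metadata filters used by the dynamic LLM extractor below):

from components.documents.public.entities.embedding import DocumentEmbeddingData  # path assumed

document_embedding = DocumentEmbeddingData(
    id="00000000-0000-0000-0000-000000000000",  # the document id
    text="Markdown transcription of the document",
    metadata={"category": "health_invoice", "stack": "global"},
)
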
extraction

configuration

BaseLLMExtractorConfiguration dataclass
BaseLLMExtractorConfiguration(
    *,
    hds_only=True,
    llm_model,
    document_type,
    n_similar_examples=5
)

Bases: ABC

Common configuration for dynamic prompting LLM extractor

__post_init__
__post_init__()
Source code in components/documents/public/entities/extraction/configuration.py
def __post_init__(self) -> None:  # noqa: D105
    if self.hds_only and not is_hds_compliant_model(self.llm_model):
        raise ValueError(f"LLM model {self.llm_model} is not HDS compliant")
document_type instance-attribute
document_type

type of the document to be parsed

hds_only class-attribute instance-attribute
hds_only = True

If true, only HDS LLM models can be used

llm_model instance-attribute
llm_model

the LLM model to use. Make sure to use HDS models for HDS data

n_similar_examples class-attribute instance-attribute
n_similar_examples = 5

the number of similar examples to inject in the prompt. If set to 0, make sure you provide the desired structured output in the prompt.

prepare_expected_output_for_llm
prepare_expected_output_for_llm(expected_output_content)

Prepare the LLM example_assistant message for a given expected output.

Source code in components/documents/public/entities/extraction/configuration.py
def prepare_expected_output_for_llm(
    self, expected_output_content: dict[str, Any]
) -> str:
    """
    Prepare the LLM `example_assistant` message for a given expected output
    :param expected_output_content:
    :return:
    """
    return json.dumps(expected_output_content, ensure_ascii=False)
prepare_instructions abstractmethod
prepare_instructions(document_id)

Build the instructions (system message) for the LLM

Source code in components/documents/public/entities/extraction/configuration.py
@abstractmethod
def prepare_instructions(self, document_id: str) -> str:
    """
    Build the instructions (system message) for the LLM
    """
    ...
prepare_transcription_for_llm
prepare_transcription_for_llm(markdown_transcription)

Prepare the LLM example_user or user message for the transcription of a similar example document or the document to parse

Source code in components/documents/public/entities/extraction/configuration.py
def prepare_transcription_for_llm(
    self, markdown_transcription: MarkdownTranscription
) -> str:
    """
    Prepare the LLM `example_user` or `user` message for the transcription of a similar example document or the document to parse
    """
    # By default, we use the Markdown format
    return markdown_transcription.markdown
DynamicLLMExtractorConfiguration dataclass
DynamicLLMExtractorConfiguration(
    *,
    hds_only=True,
    llm_model,
    document_type,
    n_similar_examples=5,
    instructions,
    content_type,
    use_approximate_search=False,
    use_similar_examples_from_global_stack=False,
    reference_batches=None
)

Bases: BaseLLMExtractorConfiguration

Configuration for the global dynamic prompting LLM extractor

build_similar_document_metadata_filter
build_similar_document_metadata_filter(
    classification_result,
)

Build the metadata filter to apply when looking for similar examples. The default implementation filters on:
- the document category, if a category is defined in the classification result
- stack=global, if the configuration is set to use similar examples from the global stack

Source code in components/documents/public/entities/extraction/configuration.py
def build_similar_document_metadata_filter(
    self, classification_result: dict[str, Any]
) -> MetadataFilterBuilder | None:
    """
    Build the metadata filter to apply when looking for similar examples.
    Default implementation is to filter on
    - the document category if a category is defined in the classification result
    - the stack=global if the configuration is set to use similar examples from the global stack
    """
    stack = (
        "global"
        if self.use_similar_examples_from_global_stack
        else None  # use None as only the global stack set the stack metadata
    )
    category = classification_result.get("category", None)

    def metadata_filter(metadata):  # type: ignore[no-untyped-def]
        filters = []
        if stack is not None:
            filters.append(metadata["stack"].astext == stack)

        if category:
            filters.append(metadata["category"].astext == category)
        return filters

    return metadata_filter
content_type instance-attribute
content_type

content type of the structured output (Used to validate the LLM response)

instructions instance-attribute
instructions

the instructions to provide to the LLM

prepare_instructions
prepare_instructions(document_id)
Source code in components/documents/public/entities/extraction/configuration.py
@override
def prepare_instructions(self, document_id: str) -> str:
    return self.instructions
reference_batches class-attribute instance-attribute
reference_batches = None

If provided, we'll only look for similar examples across documents from these batches, as defined in their metadata.

use_approximate_search class-attribute instance-attribute
use_approximate_search = False

if true, use approximate search when looking for similar documents. This can speed up the process but may return less relevant results

use_similar_examples_from_global_stack class-attribute instance-attribute
use_similar_examples_from_global_stack = False

If true, we apply a metadata filter so that similar examples are only looked up among documents from the global stack

context

ExtractionContext dataclass
ExtractionContext(extraction_schema, llm_call_context=None)

Bases: DataClassJsonMixin

Extraction context to be filled during the extraction process

extraction_schema instance-attribute
extraction_schema

The json schema of the extraction schema

llm_call_context class-attribute instance-attribute
llm_call_context = None

The LLM Call context to be filled during the extraction process

ExtractionReviewContext dataclass
ExtractionReviewContext(
    validation_errors=None, reasons_for_review=list()
)

Bases: DataClassJsonMixin

Extraction review context to be filled during the extraction review process

reasons_for_review class-attribute instance-attribute
reasons_for_review = field(default_factory=list)

Overall reasons for review to be displayed in the manual parsing form

validation_errors class-attribute instance-attribute
validation_errors = None

Validation errors that occurred during the extraction review process

extraction_field

ExtractionFieldConfig

Bases: BaseModel

Configuration class for storing field-specific extraction instructions.

This class is used to store metadata that guides how a field should be parsed.

LlmGuidance

Bases: BaseModel

Contains guidance information for LLMs when extracting a field.

This information is used to generate prompts that help LLMs correctly identify and extract the field's value from the document.

location class-attribute instance-attribute
location = None
typical_mistakes class-attribute instance-attribute
typical_mistakes = None
as_json_schema_extra_dict
as_json_schema_extra_dict()

Converts the extraction configuration to a format that can be stored in a Pydantic field's json_schema_extra attribute.

Pydantic fields have a json_schema_extra field where we can store additional metadata that isn't part of the standard JSON schema. This method prepares our extraction configuration for storage in that field.

Returns:

Type Description
dict[str, Any]

A dictionary with the extraction configuration nested under the 'extraction_config' key.

Source code in components/documents/public/entities/extraction/extraction_field.py
def as_json_schema_extra_dict(self) -> dict[str, typing.Any]:
    """
    Converts the extraction configuration to a format that can be stored in
    a Pydantic field's json_schema_extra attribute.

    Pydantic fields have a json_schema_extra field where we can store
    additional metadata that isn't part of the standard JSON schema.
    This method prepares our extraction configuration for storage in that field.

    Returns:
        A dictionary with the extraction configuration nested under the 'extraction_config' key.
    """
    return {"extraction_config": self.dict()}
from_field_info classmethod
from_field_info(field_info)

Retrieves extraction configuration from a Pydantic field's metadata.

This method extracts and validates the extraction configuration that was previously stored in a field's json_schema_extra attribute.

Parameters:

Name Type Description Default
field_info FieldInfo

A Pydantic FieldInfo object containing field metadata

required

Returns:

Type Description
Optional[ExtractionFieldConfig]

An ExtractionFieldConfig object if extraction configuration exists in the field, or None if no extraction configuration is found.

Source code in components/documents/public/entities/extraction/extraction_field.py
@classmethod
def from_field_info(
    cls, field_info: FieldInfo
) -> typing.Optional["ExtractionFieldConfig"]:
    """
    Retrieves extraction configuration from a Pydantic field's metadata.

    This method extracts and validates the extraction configuration that was
    previously stored in a field's json_schema_extra attribute.

    Args:
        field_info: A Pydantic FieldInfo object containing field metadata

    Returns:
        A ExtractionFieldConfig object if extraction configuration exists in the field,
        or None if no extraction configuration is found.
    """
    # sometimes json_schema_extra is a callable
    if field_info.json_schema_extra is None:
        return None
    if isinstance(field_info.json_schema_extra, dict):
        return cls.parse_obj(field_info.json_schema_extra.get("extraction_config"))
    raise RuntimeError(
        "We can't build a ExtractionFieldConfig from a field with a callable json_schema_extra"
    )
llm_guidance instance-attribute
llm_guidance

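A minimal sketch of the round trip described above, assuming ExtractionFieldConfig and LlmGuidance are constructed from the attributes documented here (the model and field names are illustrative):

from pydantic import BaseModel, Field

from components.documents.public.entities.extraction.extraction_field import (
    ExtractionFieldConfig,
    LlmGuidance,
)

invoice_number_config = ExtractionFieldConfig(
    llm_guidance=LlmGuidance(location="Top-right corner of the first page")
)

class InvoiceExtraction(BaseModel):
    # Store the extraction guidance in the field's json_schema_extra...
    invoice_number: str = Field(
        json_schema_extra=invoice_number_config.as_json_schema_extra_dict()
    )

# ...and read it back later from the field metadata.
config = ExtractionFieldConfig.from_field_info(
    InvoiceExtraction.model_fields["invoice_number"]
)
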
prompt

PromptExample dataclass
PromptExample(input, output, id)

Prompt examples to be used for the LLM document extraction

id instance-attribute
id
input instance-attribute
input
output instance-attribute
output

ssn

SSN_PATTERN module-attribute
SSN_PATTERN = '^(\\d{2}|2A|2B)\\d{11}(\\d{2})?$'
Ssn module-attribute
Ssn = Annotated[str, StringConstraints(pattern=SSN_PATTERN)]

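A minimal validation sketch using a pydantic TypeAdapter; the import path is an assumption based on the module layout above:

from pydantic import TypeAdapter

from components.documents.public.entities.extraction.ssn import Ssn  # path assumed

adapter = TypeAdapter(Ssn)
adapter.validate_python("2840275123456")   # 2-digit prefix + 11 digits: accepted
adapter.validate_python("2A40275123456")   # Corsican department prefix: accepted
# adapter.validate_python("1234")          # would raise a ValidationError
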
i18n

I18nKeys

Bases: BaseModel

A class to store i18n keys for a document parsing configuration. Each key should be translated in all languages.

i18n_by_lang instance-attribute
i18n_by_lang
model_post_init
model_post_init(__context)
Source code in components/documents/public/entities/i18n.py
def model_post_init(self, __context: Any) -> None:  # noqa: D102
    # check that the keys are the same in all languages
    keys: set[str] = set()
    for i18n_keys in self.i18n_by_lang.values():
        keys.update(i18n_keys.keys())
    for i18n_keys in self.i18n_by_lang.values():
        if set(i18n_keys.keys()) != keys:
            raise ValueError(
                f"Keys are not the same in all languages: {self.i18n_by_lang}"
            )

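A hedged construction sketch, assuming i18n_by_lang maps a language to a {key: translation} dict (whether the keys are Lang members or plain strings is an assumption):

from components.documents.public.entities.i18n import I18nKeys

keys = I18nKeys(
    i18n_by_lang={
        "en": {"health_invoice": "Health invoice", "prescription": "Prescription"},
        "fr": {"health_invoice": "Facture de soins", "prescription": "Ordonnance"},
    }
)
# model_post_init raises a ValueError if one language misses a key present in another.
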
internal_control

DocumentInternalControlReviewInfo dataclass

DocumentInternalControlReviewInfo(
    id,
    document_id,
    document_extraction_result_id,
    operation_task_id,
    validation_status,
    created_at,
    updated_at,
)

Bases: DataClassJsonMixin

Information about a document internal control review

created_at instance-attribute
created_at
document_extraction_result_id instance-attribute
document_extraction_result_id
document_id instance-attribute
document_id
id instance-attribute
id
operation_task_id instance-attribute
operation_task_id
updated_at instance-attribute
updated_at
validation_status instance-attribute
validation_status

llm_context

ChatGptCallContext dataclass

ChatGptCallContext(
    messages,
    *,
    llm_model,
    example_ids,
    nb_calls,
    usage_total_tokens,
    error_type=None
)

Bases: LLMCallContext

The context of a call to an LLM for document extraction with the OpenAI conversation message format (role, content, name)

messages instance-attribute
messages

The list of messages in OpenAI format to be reused for a future call

LLMCallContext dataclass

LLMCallContext(
    *,
    llm_model,
    example_ids,
    nb_calls,
    usage_total_tokens,
    error_type=None
)

The context of a call to the LLM for document extraction

error_type class-attribute instance-attribute
error_type = None

The error type if any that happened when calling the LLM

example_ids instance-attribute
example_ids

The list of example ids used for the call

llm_model instance-attribute
llm_model

The LLM model used for the call

nb_calls instance-attribute
nb_calls

The number of calls to the LLM

usage_total_tokens instance-attribute
usage_total_tokens

The total tokens usage (input+output)

parsing

document_parser_result

DocumentExpectedOutput dataclass
DocumentExpectedOutput(id, content)

The document expected output to be injected in the LLM prompt as example assistant message

content instance-attribute
content
id instance-attribute
id
InMemoryDocumentExtractionResult dataclass
InMemoryDocumentExtractionResult(
    content, context, review_context
)

The result of a document extraction in memory

content instance-attribute
content

The structured output of a document parsing. Empty dictionary if the extraction failed

context instance-attribute
context

The context of the extraction (e.g. the LLM call context if the extraction was done with an LLM)

review_context instance-attribute
review_context

The context of the review of the extraction. None if the extraction is valid

flow

classification
DocumentClassificationConfiguration

Bases: BaseModel

The configuration used to classify a document. It can be used to classify the document on multiple classes

classifiers instance-attribute
classifiers

The classifiers to use to classify the document. Each classifier can have multiple predicted classes as output. At least one classifier must be configured to predict the 'category' class.

model_post_init
model_post_init(__context)

Register sagemaker predictor in the registry if not already done

Source code in components/documents/public/entities/parsing/flow/classification.py
def model_post_init(self, __context: Any) -> None:
    """
    Register sagemaker predictor in the registry if not already done
    """
    for classifier_configuration in self.classifiers:
        if classifier_configuration.predictor_type == "sagemaker":
            SageMakerPredictorRegistry.register_if_not_exists(
                cast(
                    "SageMakerPredictorConfiguration",
                    classifier_configuration.predictor_configuration,
                )
            )
validate_category_in_classifiers
validate_category_in_classifiers()

Validate that the 'category' classifier is present

Source code in components/documents/public/entities/parsing/flow/classification.py
@model_validator(mode="after")
def validate_category_in_classifiers(self) -> Self:
    """
    Validate that the 'category' classifier is present
    """
    for classifier in self.classifiers:
        if "category" in classifier.classes:
            return self
    raise ValueError("Missing 'category' in the list of classes to classify")
validate_no_duplicate_classes
validate_no_duplicate_classes()

Check that no class is predicted more than once.

Source code in components/documents/public/entities/parsing/flow/classification.py
@model_validator(mode="after")
def validate_no_duplicate_classes(self) -> Self:
    """Check that no class is predicted more than once."""
    classified_classes: set[str] = set()
    for classifier in self.classifiers:
        for class_name in classifier.classes:
            if class_name in classified_classes:
                raise ValueError(
                    f"The class {class_name} is predicted by multiple predictors"
                )
            classified_classes.add(class_name)
    return self
DocumentClassifierConfiguration

Bases: BaseModel

The configuration to use a predictor to classify a document. A predictor can output multiple classes.

classes instance-attribute
classes

The list of class names that will be output by the classifier.

coerce_possible_labels classmethod
coerce_possible_labels(data)

Coerce possible labels from a list of string to dict. This is a convenient pre-processor to simplify the configuration.

Source code in components/documents/public/entities/parsing/flow/classification.py
@model_validator(mode="before")
@classmethod
def coerce_possible_labels(cls, data: Any) -> Any:
    """
    Coerce possible labels from a list of string to dict. This is a convenient pre-processor to simplify the configuration.
    """
    possible_labels = data.get("possible_labels")
    classes = data.get("classes")
    if classes and isinstance(classes, list) and len(classes) == 1:
        if isinstance(possible_labels, list):
            processed_possible_labels = []
            for label in possible_labels:
                if isinstance(label, str):
                    processed_possible_labels.append({classes[0]: label})
                else:
                    processed_possible_labels.append(label)
            data["possible_labels"] = processed_possible_labels
    return data
fallback_label property
fallback_label

If the returned label is not declared in the possible labels, we return this fallback label. By default, the fallback label is unclassifiable which means that the document is unsupported and cannot be parsed.

min_confidence class-attribute instance-attribute
min_confidence = None

The minimum confidence to have, otherwise we send to review

name property
name

The name of the classifier

possible_labels instance-attribute
possible_labels

The possible labels that the predictor can return. Each label should be a dict whose keys are the class names. For a single-class classifier, the list can simply contain the possible values for the class.

predictor_configuration instance-attribute
predictor_configuration

the predictor configuration

predictor_type instance-attribute
predictor_type

the predictor type to use: 'sagemaker' for a SageMaker predictor, 'llm' for an LLM predictor, 'fixed' for a fixed classification to the unique possible label, and 'pythonic' for a Python predictor.

validate_llm_sagemaker_predictor_single_class
validate_llm_sagemaker_predictor_single_class()

Validate that if the predictor type is a sagemaker or LLM one, the list of classes is composed of only one class. These two predictors do not handle multiple class predictions for now.

Source code in components/documents/public/entities/parsing/flow/classification.py
@model_validator(mode="after")
def validate_llm_sagemaker_predictor_single_class(self) -> Self:
    """
    Validate that if the predictor type is a sagemaker or LLM one, the list of classes is composed of only one class.
    These two predictors do not handle multiple class predictions for now.
    """
    if self.predictor_type in ["sagemaker", "llm"] and len(self.classes) > 1:
        raise ValueError(
            f"Predictor {self.predictor_type} can only handle one class prediction."
        )
    return self
validate_possible_classes
validate_possible_classes()

Validate that possible classes are dict where keys represent class names.

Source code in components/documents/public/entities/parsing/flow/classification.py
@model_validator(mode="after")
def validate_possible_classes(self) -> Self:
    """
    Validate that possible classes are dict where keys represent class names.
    """
    for possible_class in self.possible_labels:
        for key in possible_class.keys():
            if key not in self.classes:
                raise ValueError(f"{key} is not defined as a class name.")
    return self
validate_predictor_configuration_type
validate_predictor_configuration_type()

Validate that the predictor configuration is consistent with the predictor type

Source code in components/documents/public/entities/parsing/flow/classification.py
@model_validator(mode="after")
def validate_predictor_configuration_type(self) -> Self:
    """
    Validate that the predictor configuration is consistent with the predictor type
    """
    if self.predictor_type == "sagemaker":
        if not isinstance(
            self.predictor_configuration, SageMakerPredictorConfiguration
        ):
            raise ValueError(
                "The predictor configuration must be a SageMakerPredictorConfiguration"
            )
    elif self.predictor_type == "llm":
        if not isinstance(self.predictor_configuration, LLMPredictorConfiguration):
            raise ValueError(
                "The predictor configuration must be a LLMPredictorConfiguration"
            )
    elif self.predictor_type == "pythonic":
        if not isinstance(
            self.predictor_configuration, PythonicPredictorConfiguration
        ):
            raise ValueError(
                "The predictor configuration must be a PythonicPredictorConfiguration"
            )
    elif self.predictor_type == "fixed":
        if self.predictor_configuration is not None:
            raise ValueError(
                "The predictor configuration must be null if the predictor type is 'fixed'"
            )
        if len(self.possible_labels) > 1:
            raise ValueError(
                "The predictor configuration must contain only one possible label for 'fixed' predictor."
            )
    return self
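
A hedged configuration sketch tying the two classes above together; it assumes the documented attributes are the constructor arguments, and the labels and endpoint name are illustrative:

from components.documents.public.entities.classification.configuration import (
    SageMakerPredictorConfiguration,
)
from components.documents.public.entities.parsing.flow.classification import (
    DocumentClassificationConfiguration,
    DocumentClassifierConfiguration,
)

classification_configuration = DocumentClassificationConfiguration(
    classifiers=[
        DocumentClassifierConfiguration(
            classes=["category"],  # at least one classifier must predict 'category'
            # A plain list of strings is coerced to [{"category": "invoice"}, ...]
            possible_labels=["invoice", "prescription", "unclassifiable"],
            predictor_type="sagemaker",
            predictor_configuration=SageMakerPredictorConfiguration(
                endpoint_name="document-category-classifier",  # hypothetical endpoint
            ),
            min_confidence=0.7,  # below this confidence, the document is sent to review
        ),
    ]
)
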
configuration
DocumentAutoParsingFlowConfiguration

Bases: BaseModel

Configuration for the automatic parsing flow

classification_configuration instance-attribute
classification_configuration

The classification configuration

extraction_configuration instance-attribute
extraction_configuration

The extraction configuration

should_create_parsing_task_on_failure class-attribute instance-attribute
should_create_parsing_task_on_failure = True
transcription_configuration instance-attribute
transcription_configuration

The transcription configuration

extraction
DocumentCategoryExtractionConfiguration

Bases: BaseModel

Configuration to extract data from a given document category

auto_validate property
auto_validate

Returns whether the auto-validation is enabled

auto_validate_flag class-attribute instance-attribute
auto_validate_flag = True

whether to auto-validate the extracted data. If set to False, the extraction result will be sent to review (auto-populated). It can also be set to a Feature Flag name to enable the auto-validation only if the feature flag is enabled.

extractor_configuration instance-attribute
extractor_configuration

the configuration of the dynamic LLM extractor for each possible category

extractor_type instance-attribute
extractor_type

the extractor type to use for this category

DocumentExtractionConfiguration

Bases: BaseModel

Configuration to extract data from a document

category_extraction_configurations instance-attribute
category_extraction_configurations

The extraction configuration for each category (classification)

document_handler instance-attribute
document_handler

the document handler used by the extractors to fetch relevant example inputs and expected outputs

model_config class-attribute instance-attribute
model_config = ConfigDict(arbitrary_types_allowed=True)
parsing_flow
DocumentAutoParsingFlowOutput dataclass
DocumentAutoParsingFlowOutput(
    *,
    document_id,
    document_transcription_result=None,
    document_classification_result=None,
    document_extraction_result=None,
    document_operation_task=None
)

The output of the document automatic parsing flow

document_classification_result class-attribute instance-attribute
document_classification_result = None
document_extraction_result class-attribute instance-attribute
document_extraction_result = None
document_id instance-attribute
document_id
document_operation_task class-attribute instance-attribute
document_operation_task = None
document_transcription_result class-attribute instance-attribute
document_transcription_result = None
transcription
DocumentTranscriptionConfiguration

Bases: BaseModel

Configuration to transcribe a document

min_confidence_score class-attribute instance-attribute
min_confidence_score = 0.6

Document is sent to review if the confidence score is below. Value between 0 and 1

min_text_length class-attribute instance-attribute
min_text_length = 10

Document is sent to review if the text length is below this value

transcriber class-attribute instance-attribute
transcriber = 'textract'

Transcriber provider. Only 'textract' and 'gemini' are supported for now

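A minimal sketch, assuming the documented attributes are the constructor arguments (the import path is an assumption):

from components.documents.public.entities.parsing.flow.transcription import (  # path assumed
    DocumentTranscriptionConfiguration,
)

transcription_configuration = DocumentTranscriptionConfiguration(
    transcriber="textract",
    min_confidence_score=0.75,  # below this confidence, the document is sent to review
    min_text_length=20,         # shorter transcriptions are sent to review
)
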
parsing_configuration

DocumentCategoryConfiguration

Bases: BaseModel

Parsing configuration for a document category

category instance-attribute
category

The document category

extraction_content_model instance-attribute
extraction_content_model

Extraction content structured output model for the document category. Can be None if the document category is not supported for extraction. It can be used to generate the JSON schema for the manual parsing tool or to validate an automatic extraction. Add the json_schema_extra 'order' to specify the order of fields in the manual parsing tool.

icon instance-attribute
icon

The icon to display for the category in the manual parsing tool, from https://tabler.io/icons (e.g. IconFileDollar)

internal_control_ratio class-attribute instance-attribute
internal_control_ratio = Field(default=0.0, ge=0, le=1)

the ratio of internal control to apply on this category (between 0 and 1)

unsupported class-attribute instance-attribute
unsupported = False

If true, the document category is unsupported and cannot be extracted. It can only be rejected.

validate_json_schema classmethod
validate_json_schema(extraction_content_model)

Check that the JSON schema is valid

Source code in components/documents/public/entities/parsing/parsing_configuration.py
@field_validator("extraction_content_model")
@classmethod
def validate_json_schema(
    cls, extraction_content_model: type[BaseModel] | None
) -> type[BaseModel] | None:
    """
    Check that the JSON schema is valid
    """
    if extraction_content_model:
        json_schema = extraction_content_model.model_json_schema()
        if "order" in json_schema:
            field_order = json_schema["order"]
            if not isinstance(field_order, list):
                raise ValueError(
                    f"Extraction content model {extraction_content_model.__name__} order key is not a list"
                )
            if len(json_schema["order"]) != len(json_schema["properties"]):
                raise ValueError(
                    f"Order is defined for {extraction_content_model.__name__} but not all fields are in the order"
                )
    return extraction_content_model
validate_unsupported_extraction_content_consistency
validate_unsupported_extraction_content_consistency()

Validate the extraction content model

Source code in components/documents/public/entities/parsing/parsing_configuration.py
@model_validator(mode="after")
def validate_unsupported_extraction_content_consistency(self) -> Self:
    """
    Validate the extraction content model
    """
    if self.extraction_content_model is None and not self.unsupported:
        raise ValueError(
            f"Document category {self.category} has no extraction content model and is not unsupported. Please check the configuration."
        )
    if self.unsupported and self.extraction_content_model:
        raise ValueError(
            f"Document category {self.category} is unsupported but has an extraction content model. Please check the configuration."
        )
    return self
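
A hedged sketch of a category configuration whose extraction content model declares the 'order' key checked by validate_json_schema; the category and field names are illustrative, and DocumentCategory is assumed here to accept a plain string:

from pydantic import BaseModel, ConfigDict

from components.documents.public.entities.parsing.parsing_configuration import (
    DocumentCategoryConfiguration,
)

class HealthInvoiceExtraction(BaseModel):
    # 'order' must list every property, otherwise validate_json_schema raises.
    model_config = ConfigDict(json_schema_extra={"order": ["total_amount", "care_date"]})

    total_amount: float
    care_date: str

health_invoice_configuration = DocumentCategoryConfiguration(
    category="health_invoice",
    icon="IconFileDollar",  # any icon name from https://tabler.io/icons
    extraction_content_model=HealthInvoiceExtraction,
    internal_control_ratio=0.1,  # 10% of extractions get an internal control review
)
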
DocumentParsingConfiguration

Bases: BaseModel

Class that configures the document parsing process for a given document type.

document_auto_parsing_flow_configuration class-attribute instance-attribute
document_auto_parsing_flow_configuration = None

Configuration for the auto-parsing flow. If None, documents will be manually parsed.

document_categories instance-attribute
document_categories

the document categories and their relative information (icon, extraction content model, etc.)

document_type instance-attribute
document_type

the document type for which the parsing configuration is defined

get_document_categories
get_document_categories(lang=Lang.english)

Get the list of document categories with display info for a given document type

Source code in components/documents/public/entities/parsing/parsing_configuration.py
def get_document_categories(
    self, lang: Lang = Lang.english
) -> list[DocumentCategoryDisplayInfo]:
    """
    Get the list of document categories with display info for a given document type
    """
    return [
        DocumentCategoryDisplayInfo(
            category=category_info.category,
            lang=lang,
            label=translate(language=lang, key_string=category_info.category),
            icon=category_info.icon,
        )
        for category_info in self.document_categories.values()
    ]
get_document_category_json_schema
get_document_category_json_schema(
    document_category, lang=None
)

Get the JSON schema for a given document category for the manual parsing tool.

Parameters:

Name Type Description Default
document_category DocumentCategory

the document category

required
lang Lang | None

the language in which to translate the schema. If None, the schema is returned as is

None

Returns:

Type Description
dict[str, Any] | None

the JSON schema for the document category

Source code in components/documents/public/entities/parsing/parsing_configuration.py
def get_document_category_json_schema(
    self,
    document_category: DocumentCategory,
    lang: Lang | None = None,
) -> dict[str, Any] | None:
    """
    Get the JSON schema for a given document category for the manual parsing tool.
    :param document_category: the document category
    :param lang: the language in which to translate the schema. If None, the schema is returned as is
    :return: the JSON schema for the document category
    """
    category_configuration = self.document_categories[document_category]
    if category_configuration.unsupported:
        return None
    schema = (
        mandatory(
            category_configuration.extraction_content_model
        ).model_json_schema(schema_generator=ParsingFormFriendlyJsonSchemaGenerator)
        if document_category in self.document_categories
        else None
    )
    if schema and lang:
        # Translate "title" of the schema properties (including items)
        schema = copy.deepcopy(schema)
        self._translate_json_schema_in_place(lang, schema)
    return schema
i18n_keys class-attribute instance-attribute
i18n_keys = None

i18n keys for the document parsing tool to translate categories and extraction fields

validate_category_existence_in_extraction
validate_category_existence_in_extraction()

Validate that every category configured in the auto-parsing flow extraction configuration is declared in the document categories

Source code in components/documents/public/entities/parsing/parsing_configuration.py
@model_validator(mode="after")
def validate_category_existence_in_extraction(self) -> Self:
    """
    Validate that auto-parsing flow extraction configuration have an extraction configuration for configured categories
    """
    if (
        self.document_auto_parsing_flow_configuration
        and self.document_auto_parsing_flow_configuration.extraction_configuration
    ):
        for category in self.document_auto_parsing_flow_configuration.extraction_configuration.category_extraction_configurations:
            if category not in self.document_categories:
                raise ValueError(
                    f"DocumentExtractionConfiguration has an extraction configuration for category '{category}' that is not declared in document_categories."
                )
    return self
validate_document_categories
validate_document_categories()

Check that we have at least one unsupported category (generally the fallback category used in the DocumentClassificationConfiguration). Otherwise, the unclassifiable category is added to the document categories.

Source code in components/documents/public/entities/parsing/parsing_configuration.py
@model_validator(mode="after")
def validate_document_categories(self) -> Self:
    """
    Check if we have at least 1 unsupported categories (generally the fallback category used in the DocumentClassificationConfiguration).
    Otherwise, we add the unclassifiable category to the document categories.
    """
    for category_info in self.document_categories.values():
        if category_info.unsupported:
            return self
    current_logger.debug(
        f"No unsupported categories found for {self.document_type}. Adding the '{UNCLASSIFIABLE}' category."
    )
    self.document_categories[UNCLASSIFIABLE] = DocumentCategoryConfiguration(
        category=UNCLASSIFIABLE,
        icon="IconFileUnknown",
        extraction_content_model=None,
        unsupported=True,
    )
    return self
validate_extraction_model_consistency
validate_extraction_model_consistency()

Validate the consistency of the extraction content model between the document category configuration and the existing dynamic LLM extraction configuration in the auto-parsing flow configuration.

Source code in components/documents/public/entities/parsing/parsing_configuration.py
@model_validator(mode="after")
def validate_extraction_model_consistency(self) -> Self:
    """
    Validate the consistency of the extraction content model between the document category configuration and the existing dynamic LLM extraction configuration in the auto-parsing flow configuration.
    """
    if (
        self.document_auto_parsing_flow_configuration
        and self.document_auto_parsing_flow_configuration.extraction_configuration
    ):
        for category in self.document_categories:
            if (
                category_extraction_configuration
                := self.document_auto_parsing_flow_configuration.extraction_configuration.category_extraction_configurations.get(
                    category
                )
            ):
                if (
                    category_extraction_configuration.extractor_type
                    == ExtractorType.dynamic_llm
                ):
                    # check that the content type is the same as the one used in the document category configuration
                    if (
                        category_extraction_configuration.extractor_configuration.content_type
                        != self.document_categories[
                            category
                        ].extraction_content_model
                    ):
                        raise ValueError(
                            f"Extraction content model for category {category} in the document category configuration is different from the one used in the dynamic LLM extraction configuration. Please check that the content type is the same."
                        )
    return self

parsing_result

DocumentParsingData dataclass
DocumentParsingData(
    classification, extraction, rejection_reasons
)

Bases: DataClassJsonMixin

Data class to store the last parsing step results of a document

category property
category

Gets the category from the classification results.

This property retrieves the value associated with the "category" key from the classification object's result attribute, if available. If the classification object is not present or does not contain the necessary information, it returns None.

Returns:

Type Description
str | None

The category extracted from the classification result if available, otherwise None.

classification instance-attribute
classification
extraction instance-attribute
extraction
rejection_reasons instance-attribute
rejection_reasons
subcategory property
subcategory

Returns the subcategory from the classification result.

The subcategory is retrieved from the subcategory key of the classification result dictionary. If the classification object is not present, or if the subcategory key does not exist, the method will return None.

Returns:

Type Description
str | None

Subcategory value from the classification result, or None.

ExtractionResultData dataclass
ExtractionResultData(
    id,
    version,
    validation_status,
    source,
    creator_ref,
    created_at,
    result,
    review_context,
)

Bases: StepResultData

Data class to store the result of an extraction step

review_context instance-attribute
review_context
StepResultData dataclass
StepResultData(
    id,
    version,
    validation_status,
    source,
    creator_ref,
    created_at,
    result,
)

Bases: DataClassJsonMixin

Data class to store the result of a step

created_at instance-attribute
created_at
creator_ref instance-attribute
creator_ref
id instance-attribute
id
result instance-attribute
result
source instance-attribute
source
validation_status instance-attribute
validation_status
version instance-attribute
version

transcription

TranscriptionContext dataclass

TranscriptionContext(
    transcription_source=None,
    transcription_confidence=None,
    transcription_pct_handwritten=None,
)

Bases: DataClassJsonMixin

Context of a transcription

transcription_confidence class-attribute instance-attribute
transcription_confidence = None
transcription_pct_handwritten class-attribute instance-attribute
transcription_pct_handwritten = None
transcription_source class-attribute instance-attribute
transcription_source = None
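
Example (a sketch; the import path is assumed): TranscriptionContext is a DataClassJsonMixin dataclass with all-optional fields, so it can be built partially and round-tripped through a dict.

from components.documents.public.entities.transcription import TranscriptionContext  # path assumed

context = TranscriptionContext(
    transcription_source="textract",     # one of the TranscriptionSource literals
    transcription_confidence=0.92,       # illustrative confidence score
    transcription_pct_handwritten=0.05,  # illustrative handwritten-text share
)

# DataClassJsonMixin provides dict/JSON serialisation helpers.
payload = context.to_dict()
restored = TranscriptionContext.from_dict(payload)
assert restored == context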

TranscriptionReviewContext dataclass

TranscriptionReviewContext(reasons_for_review)

Bases: DataClassJsonMixin

Review context of a transcription

reasons_for_review instance-attribute
reasons_for_review

TranscriptionSource module-attribute

TranscriptionSource = Literal['textract', 'gemini']

validation

ExtractionValidationErrors dataclass

ExtractionValidationErrors(
    overall_validation_errors, field_validation_errors
)

Bases: DataClassJsonMixin

Structure for holding validation errors for an extraction

field_validation_errors instance-attribute
field_validation_errors

Field validation errors, where keys are JSON paths to the fields with errors and values are lists of error messages.

from_pydantic_validation_error classmethod
from_pydantic_validation_error(validation_error)

Converts a Pydantic ValidationError into an ExtractionValidationErrors object.

Source code in components/documents/public/entities/validation.py
@classmethod
def from_pydantic_validation_error(
    cls, validation_error: PydanticValidationError
) -> Self:
    """Converts a Pydantic ValidationError into a ValidationErrors object."""
    pydantic_errors = validation_error.errors()
    global_errors = []
    field_errors: dict[str, list[FieldValidationError]] = {}

    for error in pydantic_errors:
        if not error["loc"]:
            global_errors.append(error["msg"])
        else:
            field_error = FieldValidationError.from_pydantic_error(error)
            json_path = field_error.json_path
            if json_path not in field_errors:
                field_errors[json_path] = []
            field_errors[json_path].append(field_error)

    return cls(
        overall_validation_errors=global_errors,
        field_validation_errors=field_errors,
    )
overall_validation_errors instance-attribute
overall_validation_errors

Overall validation errors, not related to a specific field.
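
Example (a minimal sketch): converting a Pydantic ValidationError into an ExtractionValidationErrors. The ExpenseExtraction model is purely illustrative, a Pydantic v2 API is assumed, and only from_pydantic_validation_error and the two attributes come from this reference.

from pydantic import BaseModel, ValidationError

# Import path taken from the source reference above.
from components.documents.public.entities.validation import ExtractionValidationErrors

class ExpenseExtraction(BaseModel):  # illustrative extraction schema
    amount: float
    currency: str

try:
    ExpenseExtraction.model_validate({"amount": "not-a-number"})
except ValidationError as exc:
    errors = ExtractionValidationErrors.from_pydantic_validation_error(exc)
    # Keys are JSON paths to the offending fields (exact format depends on _loc_to_json_path),
    # here one entry for "amount" (invalid float) and one for "currency" (missing).
    print(errors.field_validation_errors)
    print(errors.overall_validation_errors)  # empty here: every error is tied to a field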

FieldValidationError dataclass

FieldValidationError(msg, type, json_path)

Bases: DataClassJsonMixin

Error message for a field validation error.

from_pydantic_error classmethod
from_pydantic_error(error)

Converts Pydantic error details into a FieldValidationError object.

Source code in components/documents/public/entities/validation.py
@classmethod
def from_pydantic_error(cls, error: PydanticErrorDetails) -> Self:
    """Converts a Pydantic error details into a FieldValidationError object."""
    return cls(
        msg=error["msg"],
        type=error["type"],
        json_path=_loc_to_json_path(error["loc"]),
    )
json_path instance-attribute
json_path

The JSON path to the field with the error

msg instance-attribute
msg

The error message

type instance-attribute
type

The type of error

components.documents.public.enums

classification

ClassificationReasonForReview

Bases: AlanBaseEnum

Reasons for reviewing a classification

fallback_to_default class-attribute instance-attribute
fallback_to_default = 'fallback_to_default'

The classification returned an unknown label or failed, so a fallback label was used

low_confidence_score class-attribute instance-attribute
low_confidence_score = 'low_confidence_score'

The classification confidence score is too low

document_type

DocumentType

Bases: AlanBaseEnum

Lists all document types handled by the component's services (document storage, embedding and/or parsing).

Note: prefix the type with its country code when the document is specific to a country (fr_, be_, es_, etc.).

BeInsuranceDocument class-attribute instance-attribute
BeInsuranceDocument = 'be_insurance_document'
CaInsuranceDocument class-attribute instance-attribute
CaInsuranceDocument = 'ca_insurance_document'
FrAlsaceMoselleEligibilityRequest class-attribute instance-attribute
FrAlsaceMoselleEligibilityRequest = (
    "fr_alsace_moselle_eligibility_request"
)
FrAniJustification class-attribute instance-attribute
FrAniJustification = 'fr_ani_justification'
FrClaimsReporting class-attribute instance-attribute
FrClaimsReporting = 'fr_claims_reporting'
FrFraudDocument class-attribute instance-attribute
FrFraudDocument = 'fr_fraud_document'
FrGuaranteeLabel class-attribute instance-attribute
FrGuaranteeLabel = 'fr_guarantee_label'
FrHelpArticle class-attribute instance-attribute
FrHelpArticle = 'fr_help_article'
FrIncomeEligibilityRequest class-attribute instance-attribute
FrIncomeEligibilityRequest = "fr_income_eligibility_request"
FrInsuranceDocument class-attribute instance-attribute
FrInsuranceDocument = 'fr_insurance_document'
FrPrevoyanceCompetitorContract class-attribute instance-attribute
FrPrevoyanceCompetitorContract = (
    "fr_prevoyance_competitor_contract"
)
FrRetireeEligibilityRequest class-attribute instance-attribute
FrRetireeEligibilityRequest = (
    "fr_retiree_eligibility_request"
)
FrShop class-attribute instance-attribute
FrShop = 'fr_shop'
FrSocialFundsEligibilityRequest class-attribute instance-attribute
FrSocialFundsEligibilityRequest = (
    "fr_social_funds_eligibility_request"
)
ResolutionPlatformMacro class-attribute instance-attribute
ResolutionPlatformMacro = 'resolution_platform_macro'
SupportDocument class-attribute instance-attribute
SupportDocument = 'support_document'

embedding_algorithm

EmbeddingAlgorithm

Bases: AlanBaseEnum

Please refer to the "Embedding algorithms" section in the README to help you choose the right algorithm.

all_minilm_l6_v2 class-attribute instance-attribute
all_minilm_l6_v2 = 'all-MiniLM-L6-v2'
text_embedding_3_large class-attribute instance-attribute
text_embedding_3_large = 'text-embedding-3-large'
text_embedding_ada_002 class-attribute instance-attribute
text_embedding_ada_002 = 'text-embedding-ada-002'

extraction

parser_type

ExtractorType

Bases: AlanBaseEnum

Extractor types

dynamic_llm class-attribute instance-attribute
dynamic_llm = 'dynamic_llm'

reason_for_review

ExtractionReasonForReview

Bases: AlanBaseEnum

Global reasons for reviewing an extraction. These are reasons for reviewing the extraction as a whole, not specific to a field.

auto_populated class-attribute instance-attribute
auto_populated = 'auto_populated'

The extraction was auto-populated

no_content class-attribute instance-attribute
no_content = 'no_content'

The extraction contains no content

validation_failed class-attribute instance-attribute
validation_failed = 'validation_failed'

The extraction failed validation

parsing_rejection_reason

ParsingRejectionReason

Bases: AlanBaseEnum

General reasons for rejecting a document during parsing

blurry class-attribute instance-attribute
blurry = 'blurry'

"unreadable text or low quality

cropped_document class-attribute instance-attribute
cropped_document = 'cropped_document'

The document is cropped and some of its text is cut off

invalid_content class-attribute instance-attribute
invalid_content = 'invalid_content'

Missing required information, or content that does not match the expected schema

no_text class-attribute instance-attribute
no_text = 'no_text'

No text found in the document

unsupported class-attribute instance-attribute
unsupported = 'unsupported'

Unsupported document, not associated with any document category

step_source

StepSource

Bases: AlanBaseEnum

Source of the step

auto class-attribute instance-attribute
auto = 'auto'
manual class-attribute instance-attribute
manual = 'manual'

step_validation_status

StepValidationStatus

Bases: AlanBaseEnum

Status of the validation of a step

review_needed class-attribute instance-attribute
review_needed = 'review_needed'
validated class-attribute instance-attribute
validated = 'validated'

transcription

TranscriptionReasonForReview

Bases: AlanBaseEnum

Reasons for reviewing a transcription

empty_transcription class-attribute instance-attribute
empty_transcription = 'empty_transcription'
low_confidence_score class-attribute instance-attribute
low_confidence_score = 'low_confidence_score'
too_short_text class-attribute instance-attribute
too_short_text = 'too_short_text'

components.documents.public.events

document

DocumentParsingValidated dataclass

DocumentParsingValidated(
    document_id,
    document_type,
    document_category,
    extraction_version,
    classification_version,
    has_rejection_reasons,
)

Bases: Message

This event is published when a document parsing has been validated by an operator.

classification_version instance-attribute
classification_version
document_category instance-attribute
document_category
document_id instance-attribute
document_id
document_type instance-attribute
document_type
extraction_version instance-attribute
extraction_version
has_rejection_reasons instance-attribute
has_rejection_reasons
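
Example (a sketch): constructing the event. The import path and the field values are assumptions; only the field names come from this reference.

from uuid import uuid4

from components.documents.public.events.document import DocumentParsingValidated  # path assumed

event = DocumentParsingValidated(
    document_id=str(uuid4()),               # identifier type assumed
    document_type="fr_insurance_document",  # DocumentType.FrInsuranceDocument value
    document_category="health_contract",    # illustrative category
    extraction_version=3,
    classification_version=2,
    has_rejection_reasons=False,
)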

transcription

DocumentTranscriptionValidated dataclass

DocumentTranscriptionValidated(
    document_id, document_type, transcription_version
)

Bases: Message

This event is published when a valid document transcription result is saved.

document_id instance-attribute
document_id
document_type instance-attribute
document_type
transcription_version instance-attribute
transcription_version

components.documents.public.helpers

parsing_data

results_to_document_parsing_data

results_to_document_parsing_data(
    classification, extraction
)

Convert classification and extraction results into a DocumentParsingData object.

Parameters:

Name Type Description Default
classification DocumentClassificationResult | None

The classification result object, if available

required
extraction DocumentExtractionResult | None

The extraction result object, if available

required

Returns:

Name Type Description
DocumentParsingData DocumentParsingData

A data object containing the formatted classification and extraction results

Source code in components/documents/public/helpers/parsing_data.py
def results_to_document_parsing_data(
    classification: DocumentClassificationResult | None,
    extraction: DocumentExtractionResult | None,
) -> DocumentParsingData:
    """Convert classification and extraction results into a DocumentParsingData object.

    Args:
        classification (DocumentClassificationResult | None): The classification result object, if available
        extraction (DocumentExtractionResult | None): The extraction result object, if available

    Returns:
        DocumentParsingData: A data object containing the formatted classification and extraction results
    """
    return DocumentParsingData(
        classification=(
            StepResultData(
                id=classification.id,
                version=classification.version,
                validation_status=classification.validation_status,
                source=classification.source,
                creator_ref=classification.creator_ref,
                created_at=classification.created_at,
                result=classification.classification_result,
            )
            if classification
            else None
        ),
        extraction=(
            ExtractionResultData(
                id=extraction.id,
                version=extraction.version,
                validation_status=extraction.validation_status,
                source=extraction.source,
                creator_ref=extraction.creator_ref,
                created_at=extraction.created_at,
                result=extraction.extraction_result,
                review_context=ExtractionReviewContext.from_dict(
                    extraction.review_context
                )
                if extraction and extraction.review_context
                else None,
            )
            if extraction
            else None
        ),
        rejection_reasons=(extraction.rejection_reasons if extraction else None),
    )
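
For example, when a document has neither classification nor extraction results yet, both arguments can be None and the helper returns an empty DocumentParsingData (a sketch; the import path is assumed from the source reference above):

from components.documents.public.helpers.parsing_data import results_to_document_parsing_data

parsing_data = results_to_document_parsing_data(classification=None, extraction=None)

assert parsing_data.classification is None
assert parsing_data.extraction is None
assert parsing_data.rejection_reasons is None
assert parsing_data.category is None  # the category property falls back to None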

validation_helpers

ValidationHelpers

A collection of static helper methods for validation across different document types.

are_floats_equal staticmethod
are_floats_equal(float1, float2, tolerance=0.01)

Compares two floats for equality within a given tolerance. Returns True if both floats are None, False if either (but not both) is None.

Source code in components/documents/public/helpers/validation_helpers.py
@staticmethod
def are_floats_equal(
    float1: Optional[float], float2: Optional[float], tolerance: float = 0.01
) -> bool:
    """
    Compares two floats for equality within a given tolerance.
    Returns True if both floats are None, False if either (but not both) is None.
    """
    if float1 is None and float2 is None:
        return True
    if float1 is None or float2 is None:
        return False
    return abs(float1 - float2) < tolerance
format_currency_cad staticmethod
format_currency_cad(value)

Formats a float into a CAD currency string e.g., 1234.56 -> "$1,234.56". Returns "N/A" if value is None.

Source code in components/documents/public/helpers/validation_helpers.py
@staticmethod
def format_currency_cad(value: Optional[float]) -> str:
    """
    Formats a float into a CAD currency string e.g., 1234.56 -> "$1,234.56".
    Returns "N/A" if value is None.
    """
    if value is None:
        return "N/A"
    return f"${value:,.2f}"
format_currency_eur staticmethod
format_currency_eur(value)

Formats a float into a EUR currency string e.g., 1234.56 -> "1234,56€". Returns "N/A" if value is None.

Source code in components/documents/public/helpers/validation_helpers.py
@staticmethod
def format_currency_eur(value: Optional[float]) -> str:
    """
    Formats a float into a EUR currency string e.g., 1234.56 -> "1234,56€".
    Returns "N/A" if value is None.
    """
    if value is None:
        return "N/A"
    return f"{value:,.2f}".replace(".", ",") + "€"
parse_percentage_string staticmethod
parse_percentage_string(percentage_str)

Converts a percentage string (e.g., "2.10%") to a float (e.g., 0.021).

Source code in components/documents/public/helpers/validation_helpers.py
@staticmethod
def parse_percentage_string(percentage_str: Optional[str]) -> Optional[float]:
    """Converts a percentage string (e.g., "2.10%") to a float (e.g., 0.021)."""
    if percentage_str is None:
        return None
    try:
        cleaned_str = percentage_str.replace("%", "").replace(",", ".")
        return float(cleaned_str) / 100.0
    except ValueError:
        return None
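
Example (a sketch): combining the helpers to compare an extracted contribution rate against an expected value and to format amounts for display. The import path is taken from the source reference above.

from components.documents.public.helpers.validation_helpers import ValidationHelpers

extracted_rate = ValidationHelpers.parse_percentage_string("2,10%")  # -> 0.021
assert ValidationHelpers.are_floats_equal(extracted_rate, 0.021)

# Malformed input degrades to None instead of raising.
assert ValidationHelpers.parse_percentage_string("two percent") is None

print(ValidationHelpers.format_currency_cad(1234.5))  # "$1,234.50"
print(ValidationHelpers.format_currency_cad(None))    # "N/A"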

components.documents.public.mappers

document_type_to_country_mapper

get_document_types_for_country

get_document_types_for_country(country)

Get the document types associated with a country.

Source code in components/documents/public/mappers/document_type_to_country_mapper.py
def get_document_types_for_country(country: Country) -> list[DocumentType]:
    """
    Get the document types associated with a country.
    """
    return [
        document_type
        for document_type, document_country in DOCUMENT_TYPE_TO_COUNTRY.items()
        if document_country == country
    ]
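
Example (a sketch): listing the document types mapped to a country. The Country import path and the FR member name are assumptions.

from components.documents.public.mappers.document_type_to_country_mapper import (
    get_document_types_for_country,
)
from shared.country import Country  # import path assumed

fr_types = get_document_types_for_country(Country.FR)
# Expected to contain the fr_-prefixed DocumentType members,
# e.g. FrInsuranceDocument or FrClaimsReporting, depending on DOCUMENT_TYPE_TO_COUNTRY.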

components.documents.public.types

DocumentCategory module-attribute

DocumentCategory = str

DocumentId module-attribute

DocumentId = UUID

TaskId module-attribute

TaskId = UUID
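
These are plain type aliases, so they can be used directly in annotations; a small sketch (the label_document helper is hypothetical):

from uuid import uuid4

from components.documents.public.types import DocumentCategory, DocumentId

def label_document(document_id: DocumentId, category: DocumentCategory) -> str:
    """Hypothetical helper: DocumentId is a UUID and DocumentCategory is a str."""
    return f"{document_id}:{category}"

label = label_document(uuid4(), "health_contract")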