
Ingest state machine

The document pipeline drives every file through an explicit state machine. A crash mid-file resumes from the last committed step instead of re-running everything. This document is the engineering reference: the step diagram, error categories, retry semantics, and how to add a new step.

Related code:

Data model

Every ingestion creates one DocumentIngestionRun. Each file in the run becomes one DocumentIngestionRunItem. Each step transition emits one DocumentIngestionRunEvent with the model id, token counts, cost in cents, and (on failure) the error category.

DocumentIngestionRun (kind, status, item_count_*)
  └── DocumentIngestionRunItem (step, retry_count, last_error_kind)
        └── DocumentIngestionRunEvent  (step_started | step_completed | step_failed | retry_scheduled)
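To make the hierarchy concrete, here is a minimal plain-Python sketch that uses dataclasses instead of the Django models. Field names not listed above (model_id, tokens_in, tokens_out, cost_cents, error_kind, the events list, and the "running" default status) are assumptions for illustration, and item_count(step) is a hypothetical stand-in for the item_count_* columns.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class DocumentIngestionRunEvent:
    # One row per transition: step_started | step_completed | step_failed | retry_scheduled
    event_type: str
    step: str
    model_id: str = ""      # assumed field name
    tokens_in: int = 0      # assumed split of "token counts"
    tokens_out: int = 0
    cost_cents: int = 0
    error_kind: str = ""    # populated only on step_failed


@dataclass
class DocumentIngestionRunItem:
    step: str = "ENQUEUED"
    retry_count: int = 0
    last_error_kind: str = ""
    events: List[DocumentIngestionRunEvent] = field(default_factory=list)


@dataclass
class DocumentIngestionRun:
    kind: str
    status: str = "running"  # assumed default
    items: List[DocumentIngestionRunItem] = field(default_factory=list)

    def item_count(self, step: str) -> int:
        # Hypothetical stand-in for the item_count_* columns.
        return sum(1 for item in self.items if item.step == step)

    def total_cost_cents(self) -> int:
        # Sum cost across every event of every item in the run.
        return sum(e.cost_cents for item in self.items for e in item.events)
```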

Steps

ENQUEUED → FETCHED → CONTENT_EXTRACTED →
  (CHUNKED || EMBEDDED || ENTITY_EXTRACTED) →
    PERSISTED → INDEXED → DONE

  Any state → FAILED
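One way to encode the ordering, sketched under assumptions: the enum values simply repeat the member names, and STEP_ORDER is modelled as a list of tiers so the three fan-out steps share a rank. The real STEP_ORDER in services/document_ingestion/state_machine.py may be a flat sequence, and is_at_or_past is a hypothetical helper showing the kind of comparison the idempotency check needs.

```python
from enum import Enum


class DocumentIngestionStep(str, Enum):
    ENQUEUED = "ENQUEUED"
    FETCHED = "FETCHED"
    CONTENT_EXTRACTED = "CONTENT_EXTRACTED"
    CHUNKED = "CHUNKED"
    EMBEDDED = "EMBEDDED"
    ENTITY_EXTRACTED = "ENTITY_EXTRACTED"
    PERSISTED = "PERSISTED"
    INDEXED = "INDEXED"
    DONE = "DONE"
    FAILED = "FAILED"


S = DocumentIngestionStep

# Tiers run in sequence; steps inside one tier run in parallel.
# FAILED is absent on purpose: it is reachable from any state.
STEP_ORDER = [
    (S.ENQUEUED,),
    (S.FETCHED,),
    (S.CONTENT_EXTRACTED,),
    (S.CHUNKED, S.EMBEDDED, S.ENTITY_EXTRACTED),
    (S.PERSISTED,),
    (S.INDEXED,),
    (S.DONE,),
]

# Map each step to the index of its tier.
RANK = {step: rank for rank, tier in enumerate(STEP_ORDER) for step in tier}


def is_at_or_past(current: DocumentIngestionStep, target: DocumentIngestionStep) -> bool:
    return RANK[current] >= RANK[target]
```

Because the fan-out steps share a rank in this model, an item recorded at EMBEDDED counts as at or past CHUNKED; tracking which individual branches finished would need state beyond a single step field.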

The three fan-out steps (CHUNKED, EMBEDDED, ENTITY_EXTRACTED) run concurrently via DocumentIngestionStateMachine.run_parallel_steps and rendezvous at PERSISTED. Today the worker collapses the whole sequence into a single ENQUEUED → DONE transition that wraps the existing orchestrator; per-step granularity will land alongside the orchestrator restructure.
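A rough sketch of a fan-out with a rendezvous, built on concurrent.futures.ThreadPoolExecutor. The real run_parallel_steps signature is not documented here; this version assumes each branch is a zero-argument callable returning an envelope dict with an error_kind key, and all_ok is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict

Envelope = Dict[str, str]


def run_parallel_steps(steps: Dict[str, Callable[[], Envelope]]) -> Dict[str, Envelope]:
    """Run every branch concurrently and wait for all of them (the rendezvous)."""
    with ThreadPoolExecutor(max_workers=max(1, len(steps))) as pool:
        futures = {name: pool.submit(fn) for name, fn in steps.items()}
        # result() blocks until each branch finishes; nothing moves on to
        # PERSISTED until every branch has returned.
        return {name: fut.result() for name, fut in futures.items()}


def all_ok(results: Dict[str, Envelope]) -> bool:
    """True when every branch returned an empty error_kind."""
    return all(r.get("error_kind", "") == "" for r in results.values())
```

If a branch raises, fut.result() re-raises the exception, and leaving the with block still waits for the other running branches before it propagates to the caller.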

Error categories

Every external call returns an envelope whose error_kind is either empty (success) or one of four failure categories. The state machine routes on that value:

error_kind         Worker reaction
"" (status=ok)     Advance to the next step.
RateLimited        Wait on the rate budget, then retry the same step.
Transient          Retry with exponential backoff, up to max_retries.
SchemaInvalid      Stop and route to the Failed Items Inbox. Do not retry: the same input produces the same output, so a retry only wastes tokens.
Fatal              Stop and route to the Failed Items Inbox. Covers auth failures, missing input, and programmer bugs.
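The matrix can be read as a pure routing function. This is a sketch, not the run_step implementation: the Reaction names, the base and cap values, failing once Transient retries reach max_retries, not charging RateLimited waits to the retry budget, and treating unknown kinds as terminal are all assumptions. The backoff is deterministic doubling; production code often adds jitter on top.

```python
from enum import Enum


class Reaction(Enum):
    ADVANCE = "advance"
    RETRY_AFTER_RATE_BUDGET = "retry_after_rate_budget"
    RETRY_WITH_BACKOFF = "retry_with_backoff"
    FAIL_TO_INBOX = "fail_to_inbox"


def route(error_kind: str, retry_count: int, max_retries: int) -> Reaction:
    """Map one step outcome onto the worker reaction from the table."""
    if error_kind == "":
        return Reaction.ADVANCE
    if error_kind == "RateLimited":
        # Assumption: rate-limit waits do not consume the retry budget.
        return Reaction.RETRY_AFTER_RATE_BUDGET
    if error_kind == "Transient":
        # Assumption: once max_retries is spent, the item fails to the inbox.
        if retry_count < max_retries:
            return Reaction.RETRY_WITH_BACKOFF
        return Reaction.FAIL_TO_INBOX
    # SchemaInvalid and Fatal never retry; unknown kinds are treated the same way.
    return Reaction.FAIL_TO_INBOX


def backoff_seconds(retry_count: int, base: float = 1.0, cap: float = 300.0) -> float:
    """Exponential backoff: base * 2**retry_count, capped at cap seconds."""
    return min(cap, base * 2 ** retry_count)
```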

The reaction matrix is enforced inside DocumentIngestionStateMachine.run_step. Categories come either from result_from_exception, which maps exception subclasses to categories, or directly from Lambda handlers, which embed the category in their return envelope.
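A sketch of how exception subclasses might map onto categories. Only the name result_from_exception comes from this document; the exception classes, the envelope keys, and the real mapping are assumptions. Treating the built-in TimeoutError and ConnectionError as Transient, and every other exception as Fatal, is also an assumption.

```python
class IngestError(Exception):
    """Hypothetical base class; subclasses pin their category."""
    error_kind = "Fatal"


class RateLimitedError(IngestError):
    error_kind = "RateLimited"


class TransientError(IngestError):
    error_kind = "Transient"


class SchemaInvalidError(IngestError):
    error_kind = "SchemaInvalid"


def result_from_exception(exc: BaseException) -> dict:
    """Convert an exception into a failure envelope."""
    if isinstance(exc, IngestError):
        kind = exc.error_kind
    elif isinstance(exc, (TimeoutError, ConnectionError)):
        # Assumption: network-level errors are worth retrying.
        kind = "Transient"
    else:
        # Anything unexpected (for example a programmer bug) is terminal.
        kind = "Fatal"
    return {"status": "error", "error_kind": kind, "message": str(exc)}
```

In this sketch's envelope shape, a Lambda handler would skip the conversion and return something like {"status": "error", "error_kind": "SchemaInvalid"} itself.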

Idempotency

run_step(target, callable) is idempotent. If item.step is already at or past target, the callable is not invoked and a synthetic ok envelope is returned. This is the resume-from-crash mechanism: when SQS redelivers a message, the worker skips every step already committed and picks up where it left off.
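The skip-if-already-done check can be shown with an in-memory item. This sketch assumes a flat STEP_ORDER without the fan-out steps, a "synthetic" key on the envelope to mark skipped calls, and a step that only advances on an ok result; Item and these names are hypothetical, not the real run_step signature.

```python
from typing import Callable, Dict

# Assumed flat order; the parallel fan-out steps are left out for brevity.
STEP_ORDER = ["ENQUEUED", "FETCHED", "CONTENT_EXTRACTED", "PERSISTED", "INDEXED", "DONE"]


class Item:
    """Minimal stand-in for DocumentIngestionRunItem."""

    def __init__(self, step: str = "ENQUEUED") -> None:
        self.step = step


def run_step(item: Item, target: str, fn: Callable[[], Dict]) -> Dict:
    """Invoke fn only if item has not reached target yet."""
    if STEP_ORDER.index(item.step) >= STEP_ORDER.index(target):
        # Already committed (e.g. SQS redelivery after a crash): skip the work.
        return {"status": "ok", "error_kind": "", "synthetic": True}
    result = fn()
    if result.get("error_kind", "") == "":
        # The source describes this write as an atomic, locked transition.
        item.step = target
    return result
```

Calling run_step(item, "FETCHED", fetch) a second time, as a redelivery would, returns the synthetic envelope without invoking fetch again.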

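Each transition is atomic: a SELECT FOR UPDATE on the item row followed by update(), inside one transaction. The SQS visibility timeout is the outer lock: while a message is in flight, SQS hides it from other consumers, so normally only one worker handles a given item at a time.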

Adding a new step

  1. Add the value to DocumentIngestionStep in models/ingest.py.
  2. Insert it into STEP_ORDER in services/document_ingestion/state_machine.py.
  3. Generate a migration (makemigrations picks up the choices change).
  4. Update the worker to call sm.run_step(NEW_STEP, callable) at the right point in the pipeline. If multiple new steps run concurrently, use run_parallel_steps.
  5. Add a unit test covering the happy path and all four failure categories for the new callable.
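For step 5, a test along these lines stubs the external client and checks that each outcome maps to the right category. Everything here is hypothetical: the detect_language callable, the client.detect method, and the four exception classes stand in for whatever the new step actually calls. It uses only unittest and unittest.mock.

```python
import unittest
from unittest import mock


class RateLimitedError(Exception): pass
class TransientError(Exception): pass
class SchemaInvalidError(Exception): pass
class FatalError(Exception): pass


KIND_BY_EXC = {
    RateLimitedError: "RateLimited",
    TransientError: "Transient",
    SchemaInvalidError: "SchemaInvalid",
    FatalError: "Fatal",
}


def detect_language(client, text):
    """Hypothetical new-step callable: wraps one external call in an envelope."""
    try:
        lang = client.detect(text)
    except tuple(KIND_BY_EXC) as exc:
        return {"status": "error", "error_kind": KIND_BY_EXC[type(exc)]}
    return {"status": "ok", "error_kind": "", "language": lang}


class DetectLanguageTest(unittest.TestCase):
    def test_happy_path(self):
        client = mock.Mock()
        client.detect.return_value = "en"
        result = detect_language(client, "hello")
        self.assertEqual(result["error_kind"], "")
        self.assertEqual(result["language"], "en")

    def test_failure_categories(self):
        # One subTest per category so each failure is reported separately.
        for exc_type, kind in KIND_BY_EXC.items():
            with self.subTest(kind=kind):
                client = mock.Mock()
                client.detect.side_effect = exc_type("boom")
                self.assertEqual(detect_language(client, "hello")["error_kind"], kind)
```

Run it with python -m unittest against the module that holds it.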

Cross-references