Ingest state machine
The document pipeline drives every file through an explicit state machine, so a crash mid-file resumes from the last committed step instead of re-running every step. This page is the engineering reference: the step diagram, error categories, retry semantics, and how to add a new step.
Related code:
- Models: `apps/main_app/models/ingest.py`
- State machine: `apps/main_app/services/document_ingestion/state_machine.py`
- Error envelope: `apps/main_app/services/document_ingestion/envelope.py`
- Trigger entrypoints: `apps/main_app/services/document_ingestion/trigger.py`
- Worker: `apps/main_app/management/commands/run_document_worker.py`
Data model
Every ingestion creates one DocumentIngestionRun. Each file in the run becomes
one DocumentIngestionRunItem. Each step transition emits one DocumentIngestionRunEvent
with the model id, token counts, cost in cents, and (on failure) the
error category.
```text
DocumentIngestionRun (kind, status, item_count_*)
└── DocumentIngestionRunItem (step, retry_count, last_error_kind)
    └── DocumentIngestionRunEvent (step_started | step_completed | step_failed | retry_scheduled)
```
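A minimal in-memory sketch of the same hierarchy, assuming plain dataclasses in place of the Django models. `Run`, `RunItem`, `RunEvent`, `total_cost_cents`, and the per-event field names (`model_id`, `input_tokens`, `output_tokens`, `cost_cents`) are hypothetical; only the parent/child relationships and the fields listed above come from this page.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class RunEvent:
    # One row per step transition: step_started | step_completed | step_failed | retry_scheduled.
    kind: str
    step: str
    model_id: str = ""      # hypothetical field name
    input_tokens: int = 0   # hypothetical
    output_tokens: int = 0  # hypothetical
    cost_cents: int = 0     # hypothetical
    error_kind: str = ""    # set only on failure


@dataclass
class RunItem:
    # One item per file in the run.
    step: str = "ENQUEUED"
    retry_count: int = 0
    last_error_kind: str = ""
    events: List[RunEvent] = field(default_factory=list)


@dataclass
class Run:
    kind: str
    status: str = "running"  # hypothetical status value
    items: List[RunItem] = field(default_factory=list)

    def total_cost_cents(self) -> int:
        # Cost rolls up from events, which carry the per-call accounting.
        return sum(e.cost_cents for item in self.items for e in item.events)
```

Keeping cost and token counts on events rather than items means a retried step records each attempt separately, and totals are derived rather than stored.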
Steps
```text
ENQUEUED → FETCHED → CONTENT_EXTRACTED →
    (CHUNKED || EMBEDDED || ENTITY_EXTRACTED) →
    PERSISTED → INDEXED → DONE

Any state → FAILED
```
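The diagram can be encoded as ordered stages, where the parallel fan-out is a single stage holding three sibling steps. This is a sketch under assumptions: `Step`, `STAGES`, `stage_index`, and `can_transition` are hypothetical names, the real `STEP_ORDER` in `state_machine.py` may be shaped differently, and treating `FAILED` as terminal is an assumption the diagram does not state.

```python
from enum import Enum
from typing import Tuple


class Step(str, Enum):
    ENQUEUED = "ENQUEUED"
    FETCHED = "FETCHED"
    CONTENT_EXTRACTED = "CONTENT_EXTRACTED"
    CHUNKED = "CHUNKED"
    EMBEDDED = "EMBEDDED"
    ENTITY_EXTRACTED = "ENTITY_EXTRACTED"
    PERSISTED = "PERSISTED"
    INDEXED = "INDEXED"
    DONE = "DONE"
    FAILED = "FAILED"


# Ordered stages; a stage with several steps is the parallel fan-out.
STAGES: Tuple[Tuple[Step, ...], ...] = (
    (Step.ENQUEUED,),
    (Step.FETCHED,),
    (Step.CONTENT_EXTRACTED,),
    (Step.CHUNKED, Step.EMBEDDED, Step.ENTITY_EXTRACTED),
    (Step.PERSISTED,),
    (Step.INDEXED,),
    (Step.DONE,),
)


def stage_index(step: Step) -> int:
    for i, stage in enumerate(STAGES):
        if step in stage:
            return i
    # FAILED sits outside the ordered stages.
    raise ValueError(f"{step.value} is not an ordered step")


def can_transition(src: Step, dst: Step) -> bool:
    """Check one edge of the diagram (not the rendezvous wait itself)."""
    if dst is Step.FAILED:
        return True  # "Any state -> FAILED"
    if src is Step.FAILED:
        return False  # assumption: FAILED is terminal
    # Forward edges only, exactly one stage at a time.
    return stage_index(dst) == stage_index(src) + 1
```

Because sibling steps share a stage, `CHUNKED → EMBEDDED` is rejected while `CONTENT_EXTRACTED → EMBEDDED` and `CHUNKED → PERSISTED` are accepted.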
The three parallel steps (`CHUNKED`, `EMBEDDED`, `ENTITY_EXTRACTED`) run concurrently via `DocumentIngestionStateMachine.run_parallel_steps` and rendezvous at `PERSISTED`. Today the worker collapses the whole sequence into a single `ENQUEUED → DONE` transition that wraps the existing orchestrator; per-step granularity will land alongside the orchestrator restructure.
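A sketch of the fan-out and rendezvous using `concurrent.futures.ThreadPoolExecutor`, assuming each step callable returns an envelope dict with `status` and `error_kind` keys. The body is illustrative: the real `run_parallel_steps` may use a different concurrency primitive, and `ready_to_persist` is a hypothetical helper.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict

Envelope = Dict[str, str]


def run_parallel_steps(steps: Dict[str, Callable[[], Envelope]]) -> Dict[str, Envelope]:
    """Run independent step callables concurrently, then wait for all of them.

    Hypothetical stand-in for DocumentIngestionStateMachine.run_parallel_steps.
    """
    with ThreadPoolExecutor(max_workers=max(1, len(steps))) as pool:
        futures = {name: pool.submit(fn) for name, fn in steps.items()}
    # Exiting the `with` block waits for every worker: this is the rendezvous.
    # .result() re-raises any exception a callable raised.
    return {name: fut.result() for name, fut in futures.items()}


def ready_to_persist(results: Dict[str, Envelope]) -> bool:
    # PERSISTED is reachable only once every parallel branch reports ok.
    return all(env["status"] == "ok" for env in results.values())
```

Threads fit here because chunking, embedding, and entity extraction are dominated by waiting on external calls; one failed branch (for example `{"status": "error", "error_kind": "Transient"}` from `ENTITY_EXTRACTED`) keeps the item from advancing to `PERSISTED`.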
Error categories
Every external call returns an envelope whose `error_kind` is one of five values. The state machine routes on that value:
| `error_kind` | Worker reaction |
|---|---|
| `""` (status=ok) | Advance to the next step. |
| `RateLimited` | Wait on the rate budget, then retry the same step. |
| `Transient` | Exponential backoff; retry up to `max_retries`. |
| `SchemaInvalid` | Terminate to the Failed Items Inbox. Do not retry: same input → same output → token waste. |
| `Fatal` | Terminate to the Failed Items Inbox. Auth failures, missing input, programmer bugs. |
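The table can be read as a routing function. `Reaction`, `route`, `backoff_seconds`, and the `base`/`cap` defaults are hypothetical; the page does not say what happens once `Transient` exhausts `max_retries` (this sketch assumes the item goes to the inbox) or how the backoff is parameterized.

```python
from enum import Enum


class Reaction(Enum):
    ADVANCE = "advance"
    WAIT_AND_RETRY = "wait_and_retry"        # RateLimited: wait on the rate budget
    BACKOFF_AND_RETRY = "backoff_and_retry"  # Transient: exponential backoff
    FAILED_INBOX = "failed_inbox"            # terminal: Failed Items Inbox


def route(error_kind: str, retry_count: int, max_retries: int) -> Reaction:
    if error_kind == "":
        return Reaction.ADVANCE
    if error_kind == "RateLimited":
        return Reaction.WAIT_AND_RETRY
    if error_kind == "Transient":
        # Assumption: an exhausted retry budget ends in the inbox.
        if retry_count < max_retries:
            return Reaction.BACKOFF_AND_RETRY
        return Reaction.FAILED_INBOX
    if error_kind in ("SchemaInvalid", "Fatal"):
        # Deterministic failures: retrying would only burn tokens.
        return Reaction.FAILED_INBOX
    raise ValueError(f"unknown error_kind: {error_kind!r}")


def backoff_seconds(retry_count: int, base: float = 2.0, cap: float = 300.0) -> float:
    # Delay doubles per attempt: base * 2**retry_count, clamped to `cap`.
    return min(cap, base * 2 ** retry_count)
```

With these defaults the delays run 2 s, 4 s, 8 s, 16 s and then clamp at 300 s. Many production retry loops also add random jitter so that items failing together do not retry in lockstep.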
The reaction matrix is enforced inside `DocumentIngestionStateMachine.run_step`. Categories come either from `result_from_exception`, which maps exception subclasses to categories, or directly from Lambda handlers, which embed the category in their return envelope.
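A sketch of how exception subclasses could map to categories, assuming each subclass carries its category as a class attribute. The `Envelope` fields, the exception class names, and the stdlib mapping are hypothetical; the real envelope lives in `envelope.py`. A Lambda handler would skip this mapping and return the same `status`/`error_kind` pair directly in its payload.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Envelope:
    status: str           # "ok" or "error"
    error_kind: str = ""  # "" when status == "ok"
    message: str = ""


# Hypothetical exception hierarchy; each subclass names its category.
class IngestError(Exception):
    error_kind = "Fatal"


class RateLimitedError(IngestError):
    error_kind = "RateLimited"


class TransientError(IngestError):
    error_kind = "Transient"


class SchemaInvalidError(IngestError):
    error_kind = "SchemaInvalid"


# Assumption: common stdlib network errors count as Transient.
_STDLIB_KINDS = ((TimeoutError, "Transient"), (ConnectionError, "Transient"))


def result_from_exception(exc: BaseException) -> Envelope:
    if isinstance(exc, IngestError):
        kind = exc.error_kind
    else:
        # Unclassified exceptions (programmer bugs included) default to Fatal.
        kind = next((k for cls, k in _STDLIB_KINDS if isinstance(exc, cls)), "Fatal")
    return Envelope(status="error", error_kind=kind, message=str(exc))
```

Defaulting unknown exceptions to `Fatal` errs on the side of not retrying, which matches the table's rule that a failure that will repeat should not be paid for twice.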
Idempotency
`run_step(target, callable)` is idempotent. If `item.step` is already at or past `target`, the callable is not invoked and a synthetic `ok` envelope is returned. This is the resume-from-crash mechanism: when SQS redelivers a message, the worker picks up where it left off.
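A minimal sketch of the skip check, written as a free function that takes the item explicitly. `Item`, the `synthetic` marker key, and the linear `STEP_ORDER` below are hypothetical; the parallel fan-out steps are left out because a single linear index cannot tell sibling steps apart.

```python
from typing import Callable, Dict

# Linear order for this sketch; the parallel fan-out steps are omitted.
STEP_ORDER = ("ENQUEUED", "FETCHED", "CONTENT_EXTRACTED", "PERSISTED", "INDEXED", "DONE")

Envelope = Dict[str, object]


class Item:
    def __init__(self, step: str = "ENQUEUED") -> None:
        self.step = step


def run_step(item: Item, target: str, fn: Callable[[], Envelope]) -> Envelope:
    # Already committed at or past target (e.g. SQS redelivery after a crash):
    # skip the work and hand back a synthetic ok envelope.
    if STEP_ORDER.index(item.step) >= STEP_ORDER.index(target):
        return {"status": "ok", "error_kind": "", "synthetic": True}
    envelope = fn()
    if envelope["status"] == "ok":
        # In the real code this write is the atomic, row-locked transition.
        item.step = target
    return envelope
```

Replaying the same `run_step(item, "CONTENT_EXTRACTED", extract)` call after a redelivery invokes `extract` only once; a failed envelope leaves `item.step` unchanged so the next attempt re-runs the step.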
The transition itself is atomic: a `SELECT FOR UPDATE` row lock plus an `update()` call, both inside one transaction. The SQS visibility timeout acts as the outer lock.
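The page describes a pessimistic row lock; in Django that is typically `select_for_update()` inside `transaction.atomic()`. A runnable Django example needs a configured project, so this sketch swaps in a different technique on SQLite: a conditional `UPDATE ... WHERE step = ?` (an optimistic compare-and-swap) whose affected-row count tells the caller whether it won. The `item` table and the `advance` function are hypothetical.

```python
import sqlite3


def advance(conn: sqlite3.Connection, item_id: int, expected: str, target: str) -> bool:
    """Move one item from `expected` to `target`; True if this caller won.

    Compare-and-swap stand-in for SELECT ... FOR UPDATE (SQLite has no row locks).
    """
    # The connection context manager commits on success, rolls back on exception.
    with conn:
        cur = conn.execute(
            "UPDATE item SET step = ? WHERE id = ? AND step = ?",
            (target, item_id, expected),
        )
    # rowcount is 0 when another worker already moved the row.
    return cur.rowcount == 1
```

If two workers race on the same redelivered message and both call `advance(conn, 1, "FETCHED", "CONTENT_EXTRACTED")`, the first gets `True` and the second gets `False`, and can treat the step as already done.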
Adding a new step
- Add the value to `DocumentIngestionStep` in `models/ingest.py`.
- Insert it into `STEP_ORDER` in `services/document_ingestion/state_machine.py`.
- Generate a migration (`makemigrations` picks up the choices change).
- Update the worker to call `sm.run_step(NEW_STEP, callable)` at the right point in the pipeline. If multiple new steps run concurrently, use `run_parallel_steps`.
- Add a unit test covering the happy path and all four failure categories for the new callable.
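A sketch of the test shape, using `unittest` with `subTest` so each failure category reports separately. `new_step_callable` and `outcome` are hypothetical stand-ins; in the real suite the callable would be the new step and the assertions would go through `sm.run_step`.

```python
import unittest

RETRYABLE = {"RateLimited", "Transient"}
TERMINAL = {"SchemaInvalid", "Fatal"}


def outcome(envelope: dict) -> str:
    # Hypothetical stand-in for the routing inside run_step.
    if envelope["status"] == "ok":
        return "advance"
    kind = envelope.get("error_kind", "")
    if kind in RETRYABLE:
        return "retry"
    if kind in TERMINAL:
        return "inbox"
    raise ValueError(kind)


def new_step_callable(fail_with: str = "") -> dict:
    # Hypothetical new step; `fail_with` simulates each error category.
    if fail_with:
        return {"status": "error", "error_kind": fail_with}
    return {"status": "ok", "error_kind": ""}


class NewStepTest(unittest.TestCase):
    def test_happy_path(self):
        self.assertEqual(outcome(new_step_callable()), "advance")

    def test_failure_categories(self):
        expected = {"RateLimited": "retry", "Transient": "retry",
                    "SchemaInvalid": "inbox", "Fatal": "inbox"}
        for kind, want in expected.items():
            with self.subTest(kind=kind):
                self.assertEqual(outcome(new_step_callable(kind)), want)
```

Run it with `python -m unittest`; pytest also collects `unittest.TestCase` classes.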
Cross-references
- `document-pipeline.md`: full redesign rationale (load envelopes, decisions, phasing).
- `observability-stack.md`: where OTel spans, Sentry breadcrumbs, and CloudWatch metrics land.
- Failed Items Inbox: admin view for terminal-failed items with one-click requeue.