Document Ingestion Pipeline
How CampusCore turns documents (scraped pages, uploaded files, connector imports, chat attachments) into chunked, embedded, entity-typed records that the agent can retrieve from. This doc describes the running system; for engineer-level detail on the state machine (step diagram, error categories, how to add a step), see document-ingestion-state-machine.md.
1. Overview
Every ingestion, regardless of source, flows through one unified pipeline:
- A trigger creates a DocumentIngestionRun + one DocumentIngestionRunItem per file (DB rows; durable from this point).
- The TwoQueueDispatcher publishes each item onto either the interactive or bulk SQS queue.
- The worker (ECS Fargate) polls interactive-first, pulls a batch under an AdmissionController memory gate, and runs each item through the state machine concurrently via asyncio.
- The DocumentIngestionStateMachine drives each item through ordered, idempotent step transitions; every transition is a state-machine row write + a DocumentIngestionRunEvent (cost, duration, error kind).
- Each step is a pure async function (fetch, extract_content, chunk_and_embed, extract_entities, persist, finalize). They all run in-process in the worker; there is no separate compute fabric.
- On terminal failure the item lands in the Failed Items Inbox (an admin proxy over rows in error_kind ∈ {SchemaInvalid, Fatal}). On retryable failure the worker backs off and retries up to a per-step budget.
A crash mid-file resumes from the last committed step — no LLM-token re-burn.
2. Architecture
       trigger.enqueue(kind, items)
                     │
                     ▼
┌─────────────────────────────────────────────┐
│  DocumentIngestionRun + Item rows (DB)      │
│  (durable from this point)                  │
└────────────────────┬────────────────────────┘
                     │
                     ▼
          ┌──────────────────────┐
          │  TwoQueueDispatcher  │
          └──────────┬───────────┘
                     │
           ┌─────────┴─────────┐
           ▼                   ▼
    SQS interactive        SQS bulk
           │                   │
           └─────────┬─────────┘   (interactive drained first)
                     ▼
        ┌──────────────────────────┐
        │  ECS worker (Fargate)    │
        │  asyncio batch,          │
        │  max_concurrency = 10,   │
        │  AdmissionController gate│
        └────────────┬─────────────┘
                     │  (one task per item)
                     ▼
        ┌──────────────────────────────────────┐
        │  DocumentIngestionStateMachine       │
        │  .run_step(target, callable)         │
        │                                      │
        │  for each step (idempotent):         │
        │    ▸ fetch                           │
        │    ▸ extract_content                 │
        │    ▸ chunk_and_embed                 │
        │    ▸ extract_entities                │
        │    ▸ persist                         │
        │    ▸ finalize                        │
        └────────────┬─────────────────────────┘
                     │
                     ▼
      DocumentIngestionStep advances
    + DocumentIngestionRunEvent written
(step / duration / model_id / tokens / cost / error_kind)
External boundaries the worker talks to:
- Postgres (TimescaleDB image): the durable layer for DocumentIngestionRun*, Document, DocumentChunk, SourceFile, the entity satellite tables, and the embeddings (pgvector with HNSW).
- SQS: the work queue (LocalStack/ElasticMQ in dev; real SQS in cloud).
- S3: raw source files and uploads. Not for per-step staging (the original Lambda-era staging bucket was removed; see §10).
- Gemini / OpenAI: LLM/embedding providers. Calls go through apps/main_app/clients/.
- Redis: rate-budget coordinator (Lua-scripted token bucket per provider).
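The "asyncio batch, max_concurrency = 10" box in the diagram can be pictured roughly as follows. This is a minimal sketch, not the worker's actual code: process_batch and handle_item are hypothetical names, and the use of a semaphore plus asyncio.gather is an assumption about how the concurrency cap might be enforced.

```python
import asyncio

MAX_CONCURRENCY = 10  # value taken from the diagram above


async def process_batch(items, handle_item, max_concurrency=MAX_CONCURRENCY):
    """Run handle_item over a batch with at most max_concurrency items in flight."""
    sem = asyncio.Semaphore(max_concurrency)

    async def bounded(item):
        # Each item waits here until one of the concurrency slots is free.
        async with sem:
            return await handle_item(item)

    # return_exceptions=True keeps one failing item from cancelling its siblings;
    # failures come back as exception objects in the result list.
    return await asyncio.gather(*(bounded(i) for i in items), return_exceptions=True)
```

Under these assumptions a failed item shows up as an exception object in its slot of the result list, which a caller could then hand to the error taxonomy from §6.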
3. Data model
Defined in apps/main_app/models/document_ingestion.py. Three tables and four TextChoices enums:
| Table | Purpose |
|---|---|
| DocumentIngestionRun | One row per ingestion operation (scrape, folder upload, chat attachment, connector import, manual reprocess). Aggregates cost + counts. |
| DocumentIngestionRunItem | One row per file. Carries the current step, retry count, last error, FK to the SourceFile. |
| DocumentIngestionRunEvent | Append-only per-step lifecycle log: step_started / step_completed / step_failed / retry_scheduled. Holds model_id, tokens_in/out, cost_cents, duration_ms, error_kind, error_message. |
Enums (models.TextChoices):
- DocumentIngestionRunKind: scrape / folder_upload / chat_attachment / connector_import / manual_reprocess.
- DocumentIngestionRunStatus: pending / running / completed / failed / cancelled.
- DocumentIngestionStep: enqueued / fetched / content_extracted / chunked / embedded / entity_extracted / persisted / indexed / done / failed. The first nine form the happy-path STEP_ORDER tuple in the state machine.
- DocumentIngestionErrorKind: "" (none) / RateLimited / Transient / SchemaInvalid / Fatal. Mirrored at the application layer by step_result.ErrorKind.
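To make the relationship between the step enum and STEP_ORDER concrete, here is a small sketch. It uses a plain str-valued Enum as a stand-in for Django's models.TextChoices so that it runs without Django; the member values come from the list above, while the member names and the next_step helper are hypothetical.

```python
from enum import Enum
from typing import Optional


class DocumentIngestionStep(str, Enum):
    """Plain-Enum stand-in for the TextChoices enum; values as listed in this doc."""
    ENQUEUED = "enqueued"
    FETCHED = "fetched"
    CONTENT_EXTRACTED = "content_extracted"
    CHUNKED = "chunked"
    EMBEDDED = "embedded"
    ENTITY_EXTRACTED = "entity_extracted"
    PERSISTED = "persisted"
    INDEXED = "indexed"
    DONE = "done"
    FAILED = "failed"


# Happy path: every member except FAILED, in declaration order (nine steps).
STEP_ORDER = tuple(
    s for s in DocumentIngestionStep if s is not DocumentIngestionStep.FAILED
)


def next_step(current: DocumentIngestionStep) -> Optional[DocumentIngestionStep]:
    """Hypothetical helper: the step after `current`, or None at the end of the
    happy path or when `current` is FAILED."""
    if current not in STEP_ORDER:
        return None
    idx = STEP_ORDER.index(current)
    return STEP_ORDER[idx + 1] if idx + 1 < len(STEP_ORDER) else None
```

Keeping failed out of the ordered tuple means "how far along is this item" stays a simple index comparison, with failure handled as a separate terminal state.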
Admin entry points: /cc_admin/ → Pipeline Manager (/cc_admin/pipeline/), Failed Items Inbox.
4. Pipeline steps
The state machine doesn't know what each step does — it just orchestrates. Each step is one async function under apps/main_app/services/document_ingestion/pipeline/, registered into the PIPELINE tuple consumed by runner.run_pipeline. Most are pure logic; the LLM-bearing ones are flagged.
| Step | Module | LLM? | What it does |
|---|---|---|---|
| fetched | pipeline/fetch.py | no | Pulls raw bytes from S3 / HTTP into ctx.raw_bytes and resolves the content type. |
| content_extracted | pipeline/extract.py | yes | Dispatches into services/content_extraction: HTML → markdown via Gemini, files → markdown via the parser registry, markdown → passthrough. Produces ctx.markdown, ctx.raw_chunks, ctx.extracted_metadata. |
| chunked + embedded | pipeline/chunk_and_embed.py | yes (embedding) | package_chunks finalizes the chunks produced upstream (or splits raw markdown via chunk_markdown). embed_chunks calls OpenAI text-embedding-3-small (1536d) in batches and stores vectors. |
| entity_extracted | pipeline/entity_extract.py | yes | One Gemini call → an EntityBatch of typed entities (Article / Event / Course / …). Trivial markdown (<200 chars) bypasses the LLM and synthesises a GenericPage. |
| persisted | pipeline/persist.py | no | DocumentWriter.write_entity_batch(...) upserts Document + entity satellite rows + DocumentChunk rows in one transaction. |
| done | pipeline/finalize.py | no | Closes the item, flushes counters, marks the run complete if it was the last item. |
The two LLM-bearing services (content_extraction.process_* and EntityExtractor) live as their own packages with their own front doors and tests — the pipeline only composes them.
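The way a runner might walk the PIPELINE tuple and skip already-committed steps can be sketched as below. Everything here is illustrative: ToyItem stands in for a DocumentIngestionRunItem row, the (target_step, fn) pairing is an assumed shape for PIPELINE, and the sketch uses one target per function (so chunk_and_embed commits only embedded). How the real runner rehydrates ctx after a crash is not covered.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class ToyItem:
    """In-memory stand-in for a DocumentIngestionRunItem row."""
    step: str = "enqueued"
    events: list = field(default_factory=list)


def make_step(name, calls):
    """Build a placeholder async step that only records that it ran."""
    async def step(ctx):
        calls.append(name)
    return step


async def run_pipeline(item, pipeline, ctx):
    """Run (target_step, fn) pairs in order, skipping targets already committed."""
    targets = [target for target, _ in pipeline]
    # Resume point: the entry right after the item's last committed target.
    start = targets.index(item.step) + 1 if item.step in targets else 0
    for target, fn in pipeline[start:]:
        item.events.append(("step_started", target))
        await fn(ctx)
        item.step = target  # stands in for the durable row write
        item.events.append(("step_completed", target))


calls = []
PIPELINE = (
    ("fetched", make_step("fetch", calls)),
    ("content_extracted", make_step("extract_content", calls)),
    ("embedded", make_step("chunk_and_embed", calls)),
    ("entity_extracted", make_step("extract_entities", calls)),
    ("persisted", make_step("persist", calls)),
    ("done", make_step("finalize", calls)),
)

# Pretend the worker crashed right after content extraction was committed.
item = ToyItem(step="content_extracted")
asyncio.run(run_pipeline(item, PIPELINE, ctx={}))
print(calls)  # ['chunk_and_embed', 'extract_entities', 'persist', 'finalize']
```

The two LLM-bearing steps before the resume point are never called again, which is the "no re-burn" property the overview describes.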
5. Coordination primitives
The pieces that decide what runs when, separate from the per-file pipeline above.
TwoQueueDispatcher
Two SQS queues, one interactive and one bulk. The worker drains interactive first; a 25k-page scrape never sits in front of a chat attachment. The dispatcher classifies on enqueue by DocumentIngestionRunKind (chat_attachment + small folder_upload → interactive; scrape + connector_import + large folder uploads → bulk).
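The classification rule above might look something like the following. This is a sketch under stated assumptions: classify_queue is a hypothetical name, the cutoff between a "small" and a "large" folder upload is not given in this document (20 files is a placeholder), and manual_reprocess is not classified in the text, so its routing here is a guess.

```python
INTERACTIVE, BULK = "interactive", "bulk"

# Assumption: placeholder threshold; the real small/large cutoff is not documented here.
SMALL_FOLDER_MAX_FILES = 20


def classify_queue(kind: str, item_count: int) -> str:
    """Pick the SQS queue for a run from its kind and number of items."""
    if kind == "chat_attachment":
        return INTERACTIVE
    if kind == "folder_upload":
        return INTERACTIVE if item_count <= SMALL_FOLDER_MAX_FILES else BULK
    if kind in ("scrape", "connector_import"):
        return BULK
    # manual_reprocess (and anything unknown): routed to bulk as a conservative guess.
    return BULK
```

Defaulting unknown kinds to bulk keeps the interactive queue reserved for work a user is actively waiting on.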
AdmissionController
A memory-watermark gate around SQS pulls. The worker checks container memory usage before each batch pull:
- 0.85 watermark → stop pulling new work (back-pressure).
- 0.65 watermark → resume pulling.
Prevents OOM-kills on bulk runs where embedding payloads spike memory.
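The two watermarks form a hysteresis loop: once pulling stops at 0.85 it does not restart until usage falls to 0.65, which avoids flapping around a single threshold. A minimal sketch, assuming a hypothetical AdmissionGate class and inclusive comparisons (the real AdmissionController's interface is not shown in this document):

```python
class AdmissionGate:
    """Hysteresis gate: stop pulling at the high watermark, resume at the low one."""

    def __init__(self, high: float = 0.85, low: float = 0.65):
        self.high = high
        self.low = low
        self.paused = False

    def should_pull(self, memory_fraction: float) -> bool:
        """Return True if the worker may pull another SQS batch.

        memory_fraction is container memory usage as a value in [0, 1].
        """
        if self.paused and memory_fraction <= self.low:
            self.paused = False  # dropped to the low watermark: resume
        elif not self.paused and memory_fraction >= self.high:
            self.paused = True   # hit the high watermark: apply back-pressure
        return not self.paused


gate = AdmissionGate()
readings = [0.50, 0.90, 0.70, 0.60, 0.80]
print([gate.should_pull(r) for r in readings])  # [True, False, False, True, True]
```

Note the third reading: 0.70 is below the high watermark but the gate stays closed, because it has not yet fallen to the low one.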
Rate-budget coordinator
A Redis Lua-script token bucket, with separate RPM and TPM buckets per provider/model (e.g. gemini.flash-lite.rpm, gemini.flash-lite.tpm, openai.embeddings.rpm, openai.embeddings.tpm). Refill rates are tuned to ~90% of each published quota.
Every LLM/embedding call site wraps its invocation in the gate(budget, provider, model_family) async context manager:
- Pre-call: acquires one slot from the .rpm bucket. If the bucket is empty, gate sleeps for the bucket's reported wait_ms and retries (the RPM cap becomes a queue, not an error). The work eventually runs rather than failing.
- Post-call: the call site invokes await g.charge(response) (or charge_tokens(n) for embeddings, which don't expose usage_metadata), debiting the actual token count from the .tpm bucket. Over-budget TPM logs a warning but doesn't block, because the work is already done.
The backend is selected at process start by get_rate_budget(): RedisRateBudget when settings.REDIS_URL is set (fleet-wide coordination), else InProcessRateBudget (dev/single-worker). The 6 wired call sites are the entity extractor, Gemini OCR parser, HTML→markdown conversion, metadata extraction, summary generation, and OpenAI chunk embeddings.
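The pre-call/post-call behaviour can be illustrated with a single-process token bucket. This is a simplified sketch, loosely in the spirit of InProcessRateBudget but not its implementation: TokenBucket, Charge and toy_gate are hypothetical names, toy_gate takes two buckets directly instead of (budget, provider, model_family), and charge_tokens is synchronous here for brevity.

```python
import asyncio
import time
from contextlib import asynccontextmanager


class TokenBucket:
    """Minimal in-process token bucket (one event loop, no Redis)."""

    def __init__(self, capacity: float, refill_per_sec: float):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.tokens = capacity
        self.updated = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        gained = (now - self.updated) * self.refill_per_sec
        self.tokens = min(self.capacity, self.tokens + gained)
        self.updated = now

    def try_acquire(self, n: float = 1.0) -> float:
        """Take n tokens and return 0.0, or return the seconds to wait."""
        self._refill()
        if self.tokens >= n:
            self.tokens -= n
            return 0.0
        return (n - self.tokens) / self.refill_per_sec

    def debit(self, n: float) -> bool:
        """Subtract n unconditionally; False means the bucket went negative."""
        self._refill()
        self.tokens -= n
        return self.tokens >= 0


class Charge:
    """Handle yielded by toy_gate for post-call token accounting."""

    def __init__(self, tpm: TokenBucket):
        self.tpm = tpm
        self.over_budget = False

    def charge_tokens(self, n: int) -> None:
        # Over-budget is recorded, not raised: the call has already happened.
        if not self.tpm.debit(n):
            self.over_budget = True


@asynccontextmanager
async def toy_gate(rpm: TokenBucket, tpm: TokenBucket):
    """Wait for an RPM slot, then yield a handle for charging tokens."""
    while (wait := rpm.try_acquire()) > 0:
        await asyncio.sleep(wait)  # the RPM cap becomes a queue, not an error
    yield Charge(tpm)
```

A call site would then read `async with toy_gate(rpm, tpm) as g: ...; g.charge_tokens(n)`. The Redis-backed variant would need the same refill-and-take logic to run atomically across workers, which is presumably why the production coordinator uses a Lua script.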
ECS scale-up pre-warm (scale_workers_for_bulk)
When a bulk-trigger (scrape, large folder upload) creates a run, trigger.py bumps the ECS worker service's desired_count so workers are ready when the SQS lag hits. Tied to INGEST_BULK_WORKER_DESIRED_COUNT and friends in settings.
6. Error handling
The 4-way exception taxonomy in services/document_ingestion/exceptions.py (mirrored by step_result.ErrorKind):
| Exception | ErrorKind | State-machine reaction |
|---|---|---|
| RateLimitedError | RateLimited | Acquire rate budget, retry same step. |
| TransientError | Transient | Exponential backoff, retry up to max_retries. |
| SchemaInvalidError | SchemaInvalid | Surface to Failed Items Inbox. Do not retry: the same input yields the same output. |
| FatalError | Fatal | Surface to Failed Items Inbox. Operator intervention. |
categorize_exception(exc) is the single source of truth that maps arbitrary BaseException → ErrorKind. Unknown exceptions default to Transient (when in doubt, give it one more try; the state machine still caps total attempts).
result_from_exception(exc, *, stage, ...) wraps an exception into a StepResult for the state machine to ingest.
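As an illustration of the mapping and the retry decision, a stripped-down version might look like this. The four exception names and ErrorKind values come from the table above; the function bodies, is_retryable, and the backoff base and cap are assumptions, not the contents of exceptions.py.

```python
class RateLimitedError(Exception): ...
class TransientError(Exception): ...
class SchemaInvalidError(Exception): ...
class FatalError(Exception): ...


# Checked in order; ErrorKind values are plain strings in this sketch.
_KIND_BY_TYPE = (
    (RateLimitedError, "RateLimited"),
    (TransientError, "Transient"),
    (SchemaInvalidError, "SchemaInvalid"),
    (FatalError, "Fatal"),
)


def categorize_exception(exc: BaseException) -> str:
    """Map an exception to an ErrorKind; unknown types fall back to Transient."""
    for exc_type, kind in _KIND_BY_TYPE:
        if isinstance(exc, exc_type):
            return kind
    return "Transient"


def is_retryable(kind: str) -> bool:
    """RateLimited and Transient retry; the other two go to the Failed Items Inbox."""
    return kind in ("RateLimited", "Transient")


def backoff_seconds(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff: base * 2**attempt, capped (placeholder values)."""
    return min(cap, base * 2 ** attempt)
```

With these placeholder values, attempts 0 through 5 wait 1, 2, 4, 8, 16 and 32 seconds, and every later attempt waits the 60-second cap.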
DLQ behaviour: SQS messages that fail more than max_receive_count times land in the bulk DLQ; the Failed Items Inbox admin lists everything in error_kind ∈ {SchemaInvalid, Fatal} plus everything still pinned in the DLQ.
See document-ingestion-state-machine.md §"Error categories" for the per-step decision table.
7. Observability + cost telemetry
Every step transition emits one DocumentIngestionRunEvent row. step_completed events carry:
- model_id: "gemini-3.1-flash-lite", "text-embedding-3-small", or "" for pure-Python steps.
- tokens_in / tokens_out: populated by the cost tracker for LLM calls.
- cost_cents: either total_cost_cents from the result (when the step ran multiple LLM calls) or cost_cents_for_call(model_id, tokens_in, tokens_out) derived from the per-model pricing table in cost.py.
- duration_ms: wall-clock for the step.
- error_kind / error_message: populated on step_failed.
Costs roll up: per-event → DocumentIngestionRunItem.cost_estimate_cents → DocumentIngestionRun.cost_estimate_cents. The Pipeline Manager admin reads these aggregates directly; no on-demand re-aggregation.
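The per-call pricing and the roll-up can be sketched as follows. The function name cost_cents_for_call and its parameters come from the text above; the body, the price figures (invented placeholders, not real provider pricing), and the item_cost_cents helper are assumptions for illustration only.

```python
# Hypothetical prices in cents per 1M tokens (input, output). Not real pricing.
PRICING_CENTS_PER_MTOK = {
    "gemini-3.1-flash-lite": (10.0, 40.0),
    "text-embedding-3-small": (2.0, 0.0),
}


def cost_cents_for_call(model_id: str, tokens_in: int, tokens_out: int) -> float:
    """Price one call from the table; pure-Python steps (model_id == "") cost nothing."""
    if not model_id:
        return 0.0
    price_in, price_out = PRICING_CENTS_PER_MTOK[model_id]
    return (tokens_in * price_in + tokens_out * price_out) / 1_000_000


def item_cost_cents(events: list) -> float:
    """Hypothetical roll-up: sum cost_cents over an item's step_completed events."""
    return sum(e["cost_cents"] for e in events if e["event"] == "step_completed")


events = [
    {"event": "step_started", "cost_cents": 0.0},
    {"event": "step_completed",
     "cost_cents": cost_cents_for_call("gemini-3.1-flash-lite", 1_000_000, 500_000)},
    {"event": "step_completed", "cost_cents": cost_cents_for_call("", 0, 0)},
]
print(item_cost_cents(events))  # 30.0
```

Storing the rolled-up figure on the item and run rows at write time is what lets the admin read aggregates directly instead of re-summing events on every page load.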
OTel traces stitch across the worker's step calls, the LLM client calls, and Postgres queries. The agent-run tagger scopes spans to actual agent loops. Traces export to the in-DB span store that powers the agent-traces admin. See observability-stack.md.
8. Local environment
Docker Compose (docker-compose.yml at repo root) brings up:
| Service | Image | Purpose |
|---|---|---|
| campuscore_db | Postgres + TimescaleDB | Schema + pgvector + HNSW indexes |
| campuscore_redis | Redis | Rate-budget coordinator, breaker state, cache |
| campuscore_minio | MinIO | S3-compatible (uploads + raw source files) |
| campuscore_elasticmq | ElasticMQ | SQS-compatible (interactive + bulk queues) |
| campuscore_localstack | LocalStack | Generic AWS services (CloudWatch, etc.) |
| campuscore_tailwind | node | CSS build watcher |
| campuscore_web | python | Django ASGI |
| campuscore_worker | python | The ingestion worker (manage.py run_document_worker) |
| campuscore_dozzle | dozzle | Container log viewer |
No Lambda in local. The worker calls Gemini and OpenAI directly. Streaming chat needs ASGI — run via uvicorn campus_core.asgi:application (handled by the web service).
9. Decisions recorded
Compact list of architectural decisions that shaped the current design. Not a plan: these are the shipped choices, captured here so future engineers don't re-litigate them without context.
- Workers, not Lambda, do the LLM work. Direct in-process Gemini calls from ECS Fargate. Lower per-doc latency, simpler observability, one execution environment. Lambda was the original plan but was never wired up, and the cleanup removed all of it (see §10).
- Two queues with priority. Interactive (chat attachments, single uploads) drains before bulk (scrapes, large folder uploads). Stops a 25k-page bulk job from blocking a single-document user request.
- Per-file state machine with idempotent transitions. A worker crash mid-file resumes from the last committed step. Each step's success is a DB write + an event; restart finds the right resume point.
- Per-step transitions, not opaque mega-steps. Each pipeline step (fetch, extract_content, chunk_and_embed, extract_entities, persist, finalize) is its own state-machine transition with its own event pair, its own retry budget, and its own cost attribution.
- Categorized errors over try/except-Exception. The 4-way taxonomy + categorize_exception makes "retry vs Failed Items Inbox" a typed decision, not a guess. Unknown exceptions default to retry-once (Transient) so a flapping provider doesn't terminate the item.
- Centralized rate-budget coordinator (Redis Lua) over per-process backoff. Multiple worker tasks share one bucket per provider; total RPS to Gemini/OpenAI is bounded regardless of how many workers exist.
- Memory watermark admission control. SQS pulls pause at 0.85 container memory, resume at 0.65. Embedding bursts no longer OOM-kill the worker.
- Failed Items Inbox over silent retries. Items with SchemaInvalid or Fatal surface in an admin proxy with their full error context. Operators triage; the worker doesn't loop on them.
- Per-item cost telemetry, rolled up. Pricing table in cost.py; each LLM call is recorded by cost_tracker and aggregated into DocumentIngestionRunItem and then DocumentIngestionRun. The Pipeline Manager's Cost card reads aggregates directly.
- One unified DocumentIngestionRun model for every ingestion kind. One admin screen lists every scrape, folder upload, chat attachment, connector import, and manual reprocess. No per-source admin variants.
10. What we evaluated but didn't ship
Recorded so future engineers don't re-introduce these without remembering why we don't have them.
- Modal (third-party GPU/compute platform). Predecessor of the current shape. Removed because it required every BYOC client to set up a separate Modal billing relationship, which is at odds with the "deploy to your cloud" promise. Plain Python in ECS Fargate replaced it.
- AWS Lambda execution backend. The original design split heavy LLM work to container-image Lambdas (extract-content + extract-typed) behind a DOCUMENT_PROCESSING_BACKEND=lambda feature flag. Built end-to-end (handlers, LambdaClient, Terraform module, IAM, ECR repo, SSM Gemini-key) but never wired into the pipeline. The state-machine rework collapsed the dispatcher into direct in-process calls and the Lambda path was never restored. We removed it entirely (commit cleanup) because (a) for a single document Lambda only adds latency, since the Gemini call dominates either way; (b) for throughput the real ceiling is Gemini's rate limit, not worker concurrency, so Lambda's scale-out doesn't help; (c) the operational overhead (separate deploy pipeline, IAM, monitoring across two runtimes) wasn't worth it at pilot scale.
- S3 staging hand-off (*-ingest-staging bucket + payload_s3_key event field). Existed to ferry Lambda content-extraction outputs back to the worker without hitting Lambda's 6 MB sync-response cap. Gone with the Lambda removal; the worker holds outputs in-process now.
- LocalStack Lambda emulation for parity. Local dev no longer needs Lambda emulation. LocalStack is still used for other AWS services (CloudWatch, etc.); ElasticMQ + MinIO handle SQS + S3.
If a future workload — burst-heavy with Gemini quota to spare, or a need for execution isolation per document — makes scale-out worth re-evaluating, the first lever is horizontal ECS worker scaling (raise desired_count), not re-introducing Lambda. One execution environment beats two.