
Document Ingestion Pipeline

How CampusCore turns documents (scraped pages, uploaded files, connector imports, chat attachments) into chunked, embedded, entity-typed records that the agent can retrieve from. This doc describes the running system; for engineer-level detail on the state machine (step diagram, error categories, how to add a step), see document-ingestion-state-machine.md.


1. Overview

Every ingestion — regardless of source — flows through one unified pipeline:

  1. A trigger creates a DocumentIngestionRun + one DocumentIngestionRunItem per file (DB rows; durable from this point).
  2. The TwoQueueDispatcher publishes each item onto either the interactive or bulk SQS queue.
  3. The worker (ECS Fargate) polls interactive-first, pulls a batch under an AdmissionController memory gate, and runs each item through the state machine concurrently via asyncio.
  4. The DocumentIngestionStateMachine drives each item through ordered, idempotent step transitions; every transition is a state-machine row write + a DocumentIngestionRunEvent (cost, duration, error kind).
  5. Each step is a pure async function (fetch, extract_content, chunk_and_embed, extract_entities, persist, finalize). They all run in-process in the worker — there is no separate compute fabric.
  6. On terminal failure the item lands in the Failed Items Inbox (an admin proxy over rows whose error_kind ∈ {SchemaInvalid, Fatal}). On retryable failure the worker backs off and retries up to a per-step budget.

A crash mid-file resumes from the last committed step — no LLM-token re-burn.
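
The resume rule can be pictured as a loop that skips every step already committed. The sketch below is a simplified illustration, not the real DocumentIngestionStateMachine. It assumes a plain dict stands in for the DocumentIngestionRunItem row and a list stands in for the DocumentIngestionRunEvent table. The step names follow the happy-path DocumentIngestionStep values from §3. In the real system the commit is a DB write, and that write is what lets the resume point survive a crash.

```python
import asyncio
from typing import Awaitable, Callable

# Happy-path order, mirroring the DocumentIngestionStep values listed in §3.
STEP_ORDER = (
    "enqueued", "fetched", "content_extracted", "chunked", "embedded",
    "entity_extracted", "persisted", "indexed", "done",
)

StepFn = Callable[[dict], Awaitable[None]]


async def run_pipeline(item: dict, pipeline: list[tuple[str, StepFn]], events: list[dict]) -> None:
    """Run each (target_step, fn) whose target lies past the item's committed step.

    `item` and `events` are hypothetical in-memory stand-ins for the
    DocumentIngestionRunItem row and the DocumentIngestionRunEvent table.
    """
    for target, fn in pipeline:
        if STEP_ORDER.index(item["step"]) >= STEP_ORDER.index(target):
            continue  # committed on an earlier attempt: skip it, no LLM tokens re-burned
        events.append({"step": target, "event": "step_started"})
        await fn(item)
        item["step"] = target  # "commit" the transition only after the step succeeded
        events.append({"step": target, "event": "step_completed"})


calls: list[str] = []


def make_step(name: str) -> StepFn:
    async def step(item: dict) -> None:
        calls.append(name)  # record which steps actually ran
    return step


pipeline = [(s, make_step(s)) for s in ("fetched", "content_extracted", "embedded", "persisted", "done")]

# Simulate an item whose worker crashed after content extraction was committed.
item = {"step": "content_extracted"}
events: list[dict] = []
asyncio.run(run_pipeline(item, pipeline, events))
print(calls)  # ['embedded', 'persisted', 'done']
```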


2. Architecture

                       trigger.enqueue(kind, items)
            ┌─────────────────────────────────────────────┐
            │ DocumentIngestionRun + Item rows (DB)       │
            │ — durable from this point                   │
            └────────────────────┬────────────────────────┘
                      ┌──────────────────────┐
                      │ TwoQueueDispatcher   │
                      └──────────┬───────────┘
                       ┌─────────┴─────────┐
                       ▼                   ▼
                  SQS interactive       SQS bulk
                       │                   │
                       └────────┬──────────┘  (interactive drained first)
                   ┌──────────────────────────┐
                   │  ECS worker (Fargate)    │
                   │  asyncio batch,          │
                   │  max_concurrency = 10,   │
                   │  AdmissionController gate│
                   └────────────┬─────────────┘
                                │ (one task per item)
              ┌──────────────────────────────────────┐
              │  DocumentIngestionStateMachine       │
              │   .run_step(target, callable)        │
              │                                      │
              │   for each step (idempotent):        │
              │     ▸ fetch                          │
              │     ▸ extract_content                │
              │     ▸ chunk_and_embed                │
              │     ▸ extract_entities               │
              │     ▸ persist                        │
              │     ▸ finalize                       │
              └────────────┬─────────────────────────┘
              DocumentIngestionStep advances
              + DocumentIngestionRunEvent written
              (step / duration / model_id / tokens / cost / error_kind)
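
The worker box (one asyncio task per item, max_concurrency = 10) maps naturally onto a semaphore-bounded asyncio.gather. This is a minimal sketch. It assumes a hypothetical process_item coroutine standing in for the state-machine call; only the cap of 10 comes from the diagram.

```python
import asyncio

MAX_CONCURRENCY = 10  # from the worker box in the diagram


async def process_batch(items, process_item, max_concurrency: int = MAX_CONCURRENCY):
    """Run one task per item, with at most `max_concurrency` in flight at once."""
    sem = asyncio.Semaphore(max_concurrency)

    async def bounded(item):
        async with sem:  # blocks here once `max_concurrency` items are running
            return await process_item(item)

    # return_exceptions=True: a failing item does not raise out of the batch;
    # its exception object is returned in that item's slot instead.
    return await asyncio.gather(*(bounded(i) for i in items), return_exceptions=True)


in_flight = 0
peak = 0


async def fake_item(n: int) -> int:
    """Hypothetical stand-in for the per-item state-machine run."""
    global in_flight, peak
    in_flight += 1
    peak = max(peak, in_flight)
    await asyncio.sleep(0.01)
    in_flight -= 1
    if n == 3:
        raise ValueError("bad item")
    return n


results = asyncio.run(process_batch(range(25), fake_item))
print(peak)  # 10
```

return_exceptions=True is a design choice of this sketch. Handing failures back per item leaves categorization to the state machine rather than aborting the whole batch.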

External boundaries the worker talks to:

  • Postgres (TimescaleDB image) — the durable layer for DocumentIngestionRun*, Document, DocumentChunk, SourceFile, the entity satellite tables, and the embeddings (pgvector with HNSW).
  • SQS — the work queue (LocalStack/ElasticMQ in dev; real SQS in cloud).
  • S3 — for raw source files and uploads. Not for per-step staging (the original Lambda-era staging bucket was removed; see §10).
  • Gemini / OpenAI — LLM/embedding providers. Calls go through apps/main_app/clients/.
  • Redis — rate-budget coordinator (Lua-scripted token bucket per provider).

3. Data model

Defined in apps/main_app/models/document_ingestion.py. Three tables and four TextChoices enums:

| Table | Purpose |
|---|---|
| DocumentIngestionRun | One row per ingestion operation (scrape, folder upload, chat attachment, connector import, manual reprocess). Aggregates cost + counts. |
| DocumentIngestionRunItem | One row per file. Carries the current step, retry count, last error, and an FK to the SourceFile. |
| DocumentIngestionRunEvent | Append-only per-step lifecycle log: step_started / step_completed / step_failed / retry_scheduled. Holds model_id, tokens_in/out, cost_cents, duration_ms, error_kind, error_message. |

Enums (models.TextChoices):

  • DocumentIngestionRunKind: scrape / folder_upload / chat_attachment / connector_import / manual_reprocess.
  • DocumentIngestionRunStatus: pending / running / completed / failed / cancelled.
  • DocumentIngestionStep: enqueued / fetched / content_extracted / chunked / embedded / entity_extracted / persisted / indexed / done / failed. The first nine form the happy-path STEP_ORDER tuple in the state machine.
  • DocumentIngestionErrorKind: "" (none) / RateLimited / Transient / SchemaInvalid / Fatal. Mirrored at the application layer by step_result.ErrorKind.

Admin entry points: /cc_admin/ → Pipeline Manager (/cc_admin/pipeline/), Failed Items Inbox.


4. Pipeline steps

The state machine doesn't know what each step does — it just orchestrates. Each step is one async function under apps/main_app/services/document_ingestion/pipeline/, registered into the PIPELINE tuple consumed by runner.run_pipeline. Most are pure logic; the LLM-bearing ones are flagged.

| Step | Module | LLM? | What it does |
|---|---|---|---|
| fetched | pipeline/fetch.py | no | Pulls raw bytes from S3 / HTTP into ctx.raw_bytes and resolves the content type. |
| content_extracted | pipeline/extract.py | yes | Dispatches into services/content_extraction: HTML → markdown via Gemini, files → markdown via the parser registry, markdown → passthrough. Produces ctx.markdown, ctx.raw_chunks, ctx.extracted_metadata. |
| chunked + embedded | pipeline/chunk_and_embed.py | yes (embedding) | package_chunks finalizes the chunks produced upstream (or splits raw markdown via chunk_markdown). embed_chunks calls OpenAI text-embedding-3-small (1536d) in batches and stores vectors. |
| entity_extracted | pipeline/entity_extract.py | yes | One Gemini call → an EntityBatch of typed entities (Article / Event / Course / …). Trivial markdown (<200 chars) bypasses the LLM and synthesises a GenericPage. |
| persisted | pipeline/persist.py | no | DocumentWriter.write_entity_batch(...) upserts Document + entity satellite rows + DocumentChunk rows in one transaction. |
| done | pipeline/finalize.py | no | Closes the item, flushes counters, marks the run complete if it was the last item. |

The two LLM-bearing services (content_extraction.process_* and EntityExtractor) live as their own packages with their own front doors and tests — the pipeline only composes them.
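
The trivial-markdown bypass in the entity_extracted row of the table above amounts to a length check before the LLM call. This is a minimal sketch under assumptions: the Entity and EntityBatch dataclasses are hypothetical shapes, and call_llm is a caller-supplied coroutine standing in for the Gemini call. Only the 200-character threshold and the GenericPage fallback come from the table. Whether the real check strips whitespace first is also an assumption.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable

TRIVIAL_MARKDOWN_CHARS = 200  # threshold from the pipeline-steps table


@dataclass
class Entity:  # hypothetical shape, not the real entity model
    kind: str
    fields: dict = field(default_factory=dict)


@dataclass
class EntityBatch:  # hypothetical shape
    entities: list[Entity]
    used_llm: bool


async def extract_entities(
    markdown: str, call_llm: Callable[[str], Awaitable[EntityBatch]]
) -> EntityBatch:
    text = markdown.strip()  # assumption: surrounding whitespace doesn't count
    if len(text) < TRIVIAL_MARKDOWN_CHARS:
        # Too little content to type reliably: skip the LLM call entirely
        # and synthesise a single GenericPage entity.
        return EntityBatch(entities=[Entity("GenericPage", {"body": text})], used_llm=False)
    return await call_llm(text)


async def fake_llm(text: str) -> EntityBatch:
    """Stand-in for the Gemini call; always returns one Article."""
    return EntityBatch(entities=[Entity("Article", {"chars": len(text)})], used_llm=True)


short = asyncio.run(extract_entities("# Contact\nEmail us.", fake_llm))
full = asyncio.run(extract_entities("x" * 500, fake_llm))
print(short.entities[0].kind, short.used_llm)  # GenericPage False
print(full.entities[0].kind, full.used_llm)    # Article True
```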


5. Coordination primitives

The pieces that decide what runs when, separate from the per-file pipeline above.

TwoQueueDispatcher

Two SQS queues, one interactive and one bulk. The worker drains interactive first; a 25k-page scrape never sits in front of a chat attachment. The dispatcher classifies on enqueue by DocumentIngestionRunKind (chat_attachment + small folder_upload → interactive; scrape + connector_import + large folder uploads → bulk).
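
The routing rule and the interactive-first drain can be sketched as follows. This is an illustration, not the real TwoQueueDispatcher. SMALL_FOLDER_UPLOAD_MAX is a hypothetical file-count cut-off, because the text only says "small" vs "large". Routing manual_reprocess to bulk is also an assumption, since the text does not say where it goes. Plain deques stand in for the two SQS queues.

```python
from collections import deque

SMALL_FOLDER_UPLOAD_MAX = 20  # hypothetical cut-off; the real threshold isn't documented here

INTERACTIVE, BULK = "interactive", "bulk"


def classify(kind: str, item_count: int) -> str:
    """Pick a queue for a run, by DocumentIngestionRunKind value."""
    if kind == "chat_attachment":
        return INTERACTIVE
    if kind == "folder_upload":
        return INTERACTIVE if item_count <= SMALL_FOLDER_UPLOAD_MAX else BULK
    # scrape and connector_import go to bulk; manual_reprocess is assumed bulk too.
    return BULK


def next_batch(queues: dict[str, deque], batch_size: int) -> list:
    """Drain interactive first; only pull from bulk when interactive is empty."""
    for name in (INTERACTIVE, BULK):
        q = queues[name]
        if q:
            return [q.popleft() for _ in range(min(batch_size, len(q)))]
    return []


queues = {INTERACTIVE: deque(), BULK: deque()}
for i in range(3):
    queues[classify("scrape", 25_000)].append(f"page-{i}")
queues[classify("chat_attachment", 1)].append("chat-doc")

first = next_batch(queues, 10)
second = next_batch(queues, 10)
print(first)   # ['chat-doc']
print(second)  # ['page-0', 'page-1', 'page-2']
```

The chat attachment was enqueued last but is served first, which is the property the two-queue split exists for.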

AdmissionController

A memory-watermark gate around SQS pulls. The worker checks container memory usage before each batch pull:

  • 0.85 watermark → stop pulling new work (back-pressure).
  • 0.65 watermark → resume pulling.

Prevents OOM-kills on bulk runs where embedding payloads spike memory.
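
The two watermarks form a hysteresis band: once paused, the gate stays paused until memory drops well below the trip point, so it doesn't flap around a single threshold. This is a minimal sketch. The method name may_pull is hypothetical, and reading container memory (for example from cgroup files) is left to the caller.

```python
class AdmissionController:
    """Memory-watermark gate with hysteresis: pause at `high`, resume only at `low`."""

    def __init__(self, high: float = 0.85, low: float = 0.65) -> None:
        if low >= high:
            raise ValueError("low watermark must be below high watermark")
        self.high, self.low = high, low
        self.paused = False

    def may_pull(self, memory_fraction: float) -> bool:
        """Call before each SQS batch pull with used/limit container memory."""
        if self.paused:
            if memory_fraction <= self.low:
                self.paused = False  # enough headroom again: resume pulling
        elif memory_fraction >= self.high:
            self.paused = True  # back-pressure: stop pulling new work
        return not self.paused


gate = AdmissionController()
readings = [0.50, 0.80, 0.86, 0.75, 0.66, 0.64, 0.70]
decisions = [gate.may_pull(m) for m in readings]
print(decisions)  # [True, True, False, False, False, True, True]
```

Note the readings at 0.75 and 0.66: a single-threshold gate would already resume there, whereas this one waits for 0.65.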

Rate-budget coordinator

A Redis Lua-script token bucket, with separate RPM and TPM buckets per provider/model (e.g. gemini.flash-lite.rpm, gemini.flash-lite.tpm, openai.embeddings.rpm, openai.embeddings.tpm). Refill rates are tuned to ~90% of each published quota.

Every LLM/embedding call site wraps its invocation in the gate(budget, provider, model_family) async context manager:

  • Pre-call — acquires one slot from the .rpm bucket. If the bucket is empty, gate sleeps for the bucket's reported wait_ms and retries (the RPM cap becomes a queue, not an error). The work eventually runs rather than failing.
  • Post-call — the call site invokes await g.charge(response) (or charge_tokens(n) for embeddings, which don't expose usage_metadata), debiting the actual token count from the .tpm bucket. Over-budget TPM logs a warning but doesn't block — the work is already done.

The backend is selected at process start by get_rate_budget(): RedisRateBudget when settings.REDIS_URL is set (fleet-wide coordination), else InProcessRateBudget (dev/single-worker). The 6 wired call sites are the entity extractor, Gemini OCR parser, HTML→markdown conversion, metadata extraction, summary generation, and OpenAI chunk embeddings.
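
A minimal in-process sketch of the two-bucket gate, roughly the job InProcessRateBudget has to do. Several details here are assumptions. The buckets live in a plain dict keyed like gemini.flash-lite.rpm, and gate takes that dict as its budget argument. Only charge_tokens is shown; the real charge(response) reads usage_metadata. A Redis backend would run the same refill-and-take arithmetic atomically inside a Lua script so that every worker shares one bucket.

```python
import asyncio
import logging
import time
from contextlib import asynccontextmanager

log = logging.getLogger("rate_budget")


class TokenBucket:
    """Continuously refilling token bucket (in-process stand-in for the Redis/Lua one)."""

    def __init__(self, capacity: float, refill_per_sec: float) -> None:
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.tokens = capacity
        self.updated = time.monotonic()

    def _refill(self) -> None:
        now = time.monotonic()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_per_sec)
        self.updated = now

    def try_take(self, n: float = 1) -> float:
        """Take n tokens and return 0.0, or take nothing and return ms until n are available."""
        self._refill()
        if self.tokens >= n:
            self.tokens -= n
            return 0.0
        return (n - self.tokens) / self.refill_per_sec * 1000

    def debit(self, n: float) -> bool:
        """Debit n unconditionally (balance may go negative); False means over budget."""
        self._refill()
        self.tokens -= n
        return self.tokens >= 0


def per_minute(quota: float, headroom: float = 0.9) -> TokenBucket:
    """Bucket refilling at ~90% of a published per-minute quota."""
    limit = quota * headroom
    return TokenBucket(capacity=limit, refill_per_sec=limit / 60)


class Gate:
    def __init__(self, tpm: TokenBucket, name: str) -> None:
        self._tpm, self._name = tpm, name

    def charge_tokens(self, n: int) -> None:
        # Post-call: the work already happened, so over-budget TPM only warns.
        if not self._tpm.debit(n):
            log.warning("%s: TPM budget exceeded", self._name)


@asynccontextmanager
async def gate(budget: dict[str, TokenBucket], provider: str, model_family: str):
    name = f"{provider}.{model_family}"
    rpm = budget[f"{name}.rpm"]
    # Pre-call: wait for an RPM slot instead of failing (the cap becomes a queue).
    while (wait_ms := rpm.try_take(1)) > 0:
        await asyncio.sleep(wait_ms / 1000)
    yield Gate(budget[f"{name}.tpm"], name)


async def demo() -> float:
    budget = {
        # Tiny demo limits: a 2-request burst, refilling at 20 requests/second.
        "gemini.flash-lite.rpm": TokenBucket(capacity=2, refill_per_sec=20),
        "gemini.flash-lite.tpm": per_minute(60_000),
    }
    start = time.monotonic()
    for _ in range(4):
        async with gate(budget, "gemini", "flash-lite") as g:
            g.charge_tokens(100)  # pretend the call reported 100 tokens
    return time.monotonic() - start


elapsed = asyncio.run(demo())  # about 0.1 s: calls 3 and 4 each queue for one refill
```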

ECS scale-up pre-warm (scale_workers_for_bulk)

When a bulk trigger (scrape, large folder upload) creates a run, trigger.py bumps the ECS worker service's desired_count so workers are already running by the time the SQS backlog builds. Tied to INGEST_BULK_WORKER_DESIRED_COUNT and related settings.
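
A sketch of the pre-warm, assuming it goes through boto3's ECS client (describe_services, then update_service) and only ever raises desired_count, never lowers it. The cluster and service names are hypothetical; the target would come from INGEST_BULK_WORKER_DESIRED_COUNT. The client is passed in so the function can be exercised against a stub.

```python
def scale_workers_for_bulk(ecs, cluster: str, service: str, target: int) -> int:
    """Raise the worker service's desiredCount to `target`; return the resulting count.

    `ecs` is a boto3 ECS client (boto3.client("ecs")) or any object exposing
    the same describe_services / update_service methods.
    """
    resp = ecs.describe_services(cluster=cluster, services=[service])
    current = resp["services"][0]["desiredCount"]
    if current >= target:
        return current  # already warm, or scaled further by autoscaling: leave it alone
    ecs.update_service(cluster=cluster, service=service, desiredCount=target)
    return target


class StubECS:
    """Minimal stand-in for the boto3 ECS client, for local exercise only."""

    def __init__(self, desired: int) -> None:
        self.desired = desired
        self.updates: list[int] = []

    def describe_services(self, cluster, services):
        return {"services": [{"serviceName": services[0], "desiredCount": self.desired}]}

    def update_service(self, cluster, service, desiredCount):
        self.updates.append(desiredCount)
        self.desired = desiredCount
        return {"service": {"desiredCount": desiredCount}}


stub = StubECS(desired=1)
first = scale_workers_for_bulk(stub, "campuscore", "ingest-worker", target=6)
second = scale_workers_for_bulk(stub, "campuscore", "ingest-worker", target=4)
print(first, second)  # 6 6
```

Never lowering the count keeps a second, smaller bulk trigger from shrinking a fleet that an earlier run already scaled up.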


6. Error handling

The 4-way exception taxonomy in services/document_ingestion/exceptions.py (mirrored by step_result.ErrorKind):

| Exception | ErrorKind | State-machine reaction |
|---|---|---|
| RateLimitedError | RateLimited | Acquire rate budget, retry same step. |
| TransientError | Transient | Exponential backoff, retry up to max_retries. |
| SchemaInvalidError | SchemaInvalid | Surface to Failed Items Inbox. Do not retry: same input → same output. |
| FatalError | Fatal | Surface to Failed Items Inbox. Operator intervention. |

categorize_exception(exc) is the single source of truth that maps an arbitrary BaseException → ErrorKind. Unknown exceptions default to Transient (when in doubt, give it one more try; the state machine still caps total attempts).

result_from_exception(exc, *, stage, ...) wraps an exception into a StepResult for the state machine to ingest.
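
A minimal sketch of the taxonomy and its unknown-means-Transient default, in stdlib Python only. The class names and ErrorKind values match the table above. The IngestionError base class, should_retry, and backoff_seconds are hypothetical helpers. The full-jitter backoff and its base/cap values are assumptions; the text says only "exponential backoff". The real categorize_exception presumably also maps provider SDK errors, which is not shown.

```python
import enum
import random


class ErrorKind(str, enum.Enum):
    NONE = ""
    RATE_LIMITED = "RateLimited"
    TRANSIENT = "Transient"
    SCHEMA_INVALID = "SchemaInvalid"
    FATAL = "Fatal"


class IngestionError(Exception):
    """Hypothetical common base; each subclass pins its ErrorKind."""
    kind = ErrorKind.TRANSIENT


class RateLimitedError(IngestionError):
    kind = ErrorKind.RATE_LIMITED


class TransientError(IngestionError):
    kind = ErrorKind.TRANSIENT


class SchemaInvalidError(IngestionError):
    kind = ErrorKind.SCHEMA_INVALID


class FatalError(IngestionError):
    kind = ErrorKind.FATAL


# Kinds that go to the Failed Items Inbox instead of being retried.
TERMINAL = frozenset({ErrorKind.SCHEMA_INVALID, ErrorKind.FATAL})


def categorize_exception(exc: BaseException) -> ErrorKind:
    if isinstance(exc, IngestionError):
        return exc.kind
    # Unknown exception: when in doubt, give it one more try.
    return ErrorKind.TRANSIENT


def should_retry(kind: ErrorKind, attempt: int, max_retries: int) -> bool:
    """Terminal kinds never retry; everything else retries until the budget runs out."""
    return kind not in TERMINAL and attempt < max_retries


def backoff_seconds(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential backoff with full jitter; base and cap are placeholder values."""
    return random.uniform(0, min(cap, base * 2 ** attempt))


for exc in (SchemaInvalidError("bad JSON"), KeyError("surprise")):
    kind = categorize_exception(exc)
    print(kind.value, should_retry(kind, attempt=0, max_retries=3))
# SchemaInvalid False
# Transient True
```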

DLQ behaviour: SQS messages that fail more than max_receive_count times land in the bulk DLQ; the Failed Items Inbox admin lists everything in error_kind ∈ {SchemaInvalid, Fatal} plus everything still pinned in the DLQ.
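
The max_receive_count behaviour is standard SQS redrive. The source queue's RedrivePolicy attribute names the DLQ and a maxReceiveCount, and a message whose receive count exceeds that number is moved to the DLQ. The sketch below builds those attributes; the ARN and the count of 5 are placeholders, not CampusCore's real values. The resulting dict would be passed as Attributes to boto3's create_queue or set_queue_attributes.

```python
import json


def redrive_attributes(dlq_arn: str, max_receive_count: int) -> dict[str, str]:
    """SQS queue attributes that send a message to the DLQ after too many receives."""
    return {
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": str(max_receive_count)}
        )
    }


# Placeholder ARN and count, for illustration only.
attrs = redrive_attributes("arn:aws:sqs:us-east-1:123456789012:ingest-bulk-dlq", 5)
# e.g. sqs.set_queue_attributes(QueueUrl=bulk_queue_url, Attributes=attrs)
```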

See document-ingestion-state-machine.md §"Error categories" for the per-step decision table.


7. Observability + cost telemetry

Every step transition emits one DocumentIngestionRunEvent row. step_completed events carry:

  • model_id: "gemini-3.1-flash-lite", "text-embedding-3-small", or "" for pure-Python steps.
  • tokens_in / tokens_out — populated by the cost tracker for LLM calls.
  • cost_cents — either total_cost_cents from the result (when the step ran multiple LLM calls) or cost_cents_for_call(model_id, tokens_in, tokens_out) derived from the per-model pricing table in cost.py.
  • duration_ms — wall-clock for the step.
  • error_kind / error_message — populated on step_failed.
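
The cost_cents rule above reduces to a lookup plus arithmetic. The sketch assumes prices are stored in cents per million tokens, and it models the step result as a plain dict. The PRICING numbers are placeholders only; they are not the contents of cost.py and not any provider's current rates.

```python
# PLACEHOLDER prices in cents per 1M tokens: NOT real rates; the real table lives in cost.py.
PRICING = {
    "gemini-3.1-flash-lite": {"in": 10.0, "out": 40.0},
    "text-embedding-3-small": {"in": 2.0, "out": 0.0},
}


def cost_cents_for_call(model_id: str, tokens_in: int, tokens_out: int) -> float:
    if not model_id:
        return 0.0  # pure-Python step: no model, no cost
    price = PRICING[model_id]
    return (tokens_in * price["in"] + tokens_out * price["out"]) / 1_000_000


def event_cost_cents(result: dict) -> float:
    """Prefer the step's own total (it ran several LLM calls); else price its single call."""
    if result.get("total_cost_cents") is not None:
        return result["total_cost_cents"]
    return cost_cents_for_call(
        result.get("model_id", ""), result.get("tokens_in", 0), result.get("tokens_out", 0)
    )


print(cost_cents_for_call("gemini-3.1-flash-lite", 1_000_000, 500_000))  # 30.0
```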

Costs roll up: per-event → DocumentIngestionRunItem.cost_estimate_cents → DocumentIngestionRun.cost_estimate_cents. The Pipeline Manager admin reads these aggregates directly; no on-demand re-aggregation.

OTel traces stitch across the worker's step calls, the LLM client calls, and Postgres queries. The agent-run tagger scopes spans to actual agent loops. Traces export to the in-DB span store that powers the agent-traces admin. See observability-stack.md.


8. Local environment

Docker Compose (docker-compose.yml at repo root) brings up:

| Service | Image | Purpose |
|---|---|---|
| campuscore_db | Postgres + TimescaleDB | Schema + pgvector + HNSW indexes |
| campuscore_redis | Redis | Rate-budget coordinator, breaker state, cache |
| campuscore_minio | MinIO | S3-compatible (uploads + raw source files) |
| campuscore_elasticmq | ElasticMQ | SQS-compatible (interactive + bulk queues) |
| campuscore_localstack | LocalStack | Generic AWS services (CloudWatch, etc.) |
| campuscore_tailwind | node | CSS build watcher |
| campuscore_web | python | Django ASGI |
| campuscore_worker | python | The ingestion worker (manage.py run_document_worker) |
| campuscore_dozzle | dozzle | Container log viewer |

No Lambda locally: the worker calls Gemini and OpenAI directly. Streaming chat needs ASGI, which the web service provides by running uvicorn campus_core.asgi:application.


9. Decisions recorded

Compact list of the architectural decisions that shaped the current system. Not a plan: these are the shipped choices, captured here so future engineers don't re-litigate them without context.

  • Workers, not Lambda, do the LLM work. Direct in-process Gemini calls from ECS Fargate. Lower per-doc latency, simpler observability, one execution environment. Lambda was the original plan but was never wired up, and a later cleanup removed it entirely (see §10).
  • Two queues with priority. Interactive (chat attachments, single uploads) drains before bulk (scrapes, large folder uploads). Stops a 25k-page bulk job from blocking a single-document user request.
  • Per-file state machine with idempotent transitions. A worker crash mid-file resumes from the last committed step. Each step's success is a DB write + an event; restart finds the right resume point.
  • Per-step transitions, not opaque mega-steps. Each pipeline step (fetch, extract_content, chunk_and_embed, extract_entities, persist, finalize) is its own state-machine transition with its own event pair, its own retry budget, and its own cost attribution.
  • Categorized errors over try/except-Exception. The 4-way taxonomy + categorize_exception makes "retry vs Failed Items Inbox" a typed decision, not a guess. Unknown exceptions default to Transient (retried with backoff, still capped by the retry budget) so a flapping provider doesn't terminally fail an item.
  • Centralized rate-budget coordinator (Redis Lua) over per-process backoff. All worker tasks share one set of RPM/TPM buckets per provider/model, so the total request and token rate to Gemini/OpenAI is bounded regardless of how many workers exist.
  • Memory watermark admission control. SQS pulls pause at 0.85 container memory, resume at 0.65. Embedding bursts no longer OOM-kill the worker.
  • Failed Items Inbox over silent retries. Items with SchemaInvalid or Fatal surface in an admin proxy with their full error context. Operators triage; the worker doesn't loop on them.
  • Per-item cost telemetry, rolled up. Pricing table in cost.py; per-LLM-call costs recorded by cost_tracker; aggregated into DocumentIngestionRunItem and then DocumentIngestionRun. The Pipeline Manager's Cost card reads the aggregates directly.
  • One unified DocumentIngestionRun model for every ingestion kind. One admin screen lists every scrape, folder upload, chat attachment, connector import, and manual reprocess. No per-source admin variants.

10. What we evaluated but didn't ship

Recorded so future engineers don't re-introduce these without remembering why we don't have them.

  • Modal (third-party GPU/compute platform). Predecessor of the current architecture. Removed because it required every BYOC client to set up a separate Modal billing relationship, which was at odds with the "deploy to your cloud" promise. Plain Python in ECS Fargate replaced it.
  • AWS Lambda execution backend. The original design split heavy LLM work to container-image Lambdas (extract-content + extract-typed) behind a DOCUMENT_PROCESSING_BACKEND=lambda feature flag. Built end-to-end (handlers, LambdaClient, Terraform module, IAM, ECR repo, SSM Gemini-key) but never wired into the pipeline. The state-machine rework collapsed the dispatcher into direct in-process calls and the Lambda path was never restored. We removed it entirely (commit cleanup) because (a) for a single document Lambda only adds latency — the Gemini call dominates either way; (b) for throughput the real ceiling is Gemini's rate limit, not worker concurrency, so Lambda's scale-out doesn't help; (c) the operational overhead (separate deploy pipeline, IAM, monitoring across two runtimes) wasn't worth it at pilot scale.
  • S3 staging hand-off (*-ingest-staging bucket + payload_s3_key event field). Existed to ferry Lambda content-extraction outputs back to the worker without hitting Lambda's 6 MB sync-response cap. Gone with the Lambda removal — the worker holds outputs in-process now.
  • LocalStack Lambda emulation for parity. Local dev no longer needs Lambda emulation. LocalStack is still used for other AWS services (CloudWatch, etc.); ElasticMQ + MinIO handle SQS + S3.

If a future workload — burst-heavy with Gemini quota to spare, or a need for execution isolation per document — makes scale-out worth re-evaluating, the first lever is horizontal ECS worker scaling (raise desired_count), not re-introducing Lambda. One execution environment beats two.