Pipeline Infrastructure
The pipeline is the execution engine that runs passes in dependency and priority order. It consists of three layers: the trait (CompilePass), the executor (PipelineExecutor), and the orchestrator (PipelineOrchestrator).
CompilePass Trait
Every pass implements the CompilePass trait:
#[async_trait]
pub trait CompilePass: Send + Sync {
    /// Unique pass name (used for dependencies and checkpointing).
    fn name(&self) -> &str;
    /// Execute the pass, reading from and writing to the shared context.
    async fn execute(&mut self, ctx: &mut CompileContext) -> Result<PassResult>;
    /// Whether the pipeline can continue if this pass fails.
    fn is_optional(&self) -> bool { false }
    /// Names of passes that must complete before this one runs.
    fn depends_on(&self) -> Vec<&'static str> { Vec::new() }
    /// How to handle failures: Fail, Skip, or Retry.
    fn failure_policy(&self) -> FailurePolicy { ... }
    /// Which context fields this pass reads/writes (for parallel safety).
    fn access_pattern(&self) -> AccessPattern { AccessPattern::default() }
}
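To show the shape of a pass implementation, here is a minimal sketch. It is not the project's code: the trait below is a simplified, synchronous stand-in (the real one is async via #[async_trait]), the CompileContext and PassResult fields are reduced to what the example needs, and NodeCountPass and run_pass are hypothetical names introduced for illustration.

```rust
use std::collections::HashMap;

// Hypothetical, reduced stand-ins for the real context and result types.
#[derive(Debug, Default)]
pub struct CompileContext {
    pub raw_nodes: Vec<String>,
    pub stage_results: HashMap<String, PassResult>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct PassResult {
    pub success: bool,
    pub message: String,
}

// Synchronous stand-in for the trait (assumption: the real trait is async).
pub trait CompilePass: Send + Sync {
    fn name(&self) -> &str;
    fn execute(&mut self, ctx: &mut CompileContext) -> Result<PassResult, String>;
    fn is_optional(&self) -> bool { false }
    fn depends_on(&self) -> Vec<&'static str> { Vec::new() }
}

/// Hypothetical pass that reports how many raw nodes the parse pass produced.
pub struct NodeCountPass;

impl CompilePass for NodeCountPass {
    fn name(&self) -> &str { "node_count" }

    fn execute(&mut self, ctx: &mut CompileContext) -> Result<PassResult, String> {
        if ctx.raw_nodes.is_empty() {
            return Err("no raw nodes; did the parse pass run?".to_string());
        }
        Ok(PassResult {
            success: true,
            message: format!("{} nodes", ctx.raw_nodes.len()),
        })
    }

    // The pipeline may continue if this pass fails.
    fn is_optional(&self) -> bool { true }

    // Must run after the pass registered under the name "parse".
    fn depends_on(&self) -> Vec<&'static str> { vec!["parse"] }
}

/// Runs one pass and records its result under the pass name.
pub fn run_pass(pass: &mut dyn CompilePass, ctx: &mut CompileContext) -> Result<(), String> {
    let result = pass.execute(ctx)?;
    ctx.stage_results.insert(pass.name().to_string(), result);
    Ok(())
}
```

Only name and execute are required; the remaining methods have defaults, so a pass overrides them only when it is optional, has dependencies, or needs a non-default policy.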
AccessPattern
Declares which context fields a pass accesses, enabling safe parallel execution:
pub struct AccessPattern {
    pub reads_tree: bool,
    pub writes_tree: bool,
    pub writes_reasoning_index: bool,
    pub writes_navigation_index: bool,
    pub writes_description: bool,
    pub writes_concepts: bool,
}
Within a parallel execution group, at most one pass may write to the tree. All other passes receive cloned contexts with tree snapshots. After all passes complete, outputs are merged back into the main context.
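The single-writer rule can be checked before a group runs. The sketch below is one possible way to do that, not the orchestrator's actual code: find_tree_writer is a hypothetical helper, and the derives on AccessPattern are assumed for convenience.

```rust
// Same fields as the struct above; the derives are an assumption.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct AccessPattern {
    pub reads_tree: bool,
    pub writes_tree: bool,
    pub writes_reasoning_index: bool,
    pub writes_navigation_index: bool,
    pub writes_description: bool,
    pub writes_concepts: bool,
}

/// Hypothetical helper: returns the index of the group's single tree writer,
/// `None` if no pass writes the tree, or an error if several passes do.
pub fn find_tree_writer(group: &[AccessPattern]) -> Result<Option<usize>, String> {
    let writers: Vec<usize> = group
        .iter()
        .enumerate()
        .filter(|(_, pattern)| pattern.writes_tree)
        .map(|(index, _)| index)
        .collect();

    match writers.as_slice() {
        [] => Ok(None),
        [only] => Ok(Some(*only)),
        many => Err(format!(
            "{} passes write the tree; at most one is allowed per group",
            many.len()
        )),
    }
}
```

The pass at the returned index would run against the main context, while every other pass in the group works on a clone.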
FailurePolicy
| Policy | Behavior |
|---|---|
| Fail | Stop the entire pipeline (default for required passes) |
| Skip | Log the failure, mark as failed, continue pipeline |
| Retry(config) | Retry with exponential backoff up to max_attempts |
Optional passes default to Skip. The Retry policy accepts configurable delay and max attempts.
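The three policies can be sketched as follows. This is illustrative only: RetryConfig, its base_delay field, delay_for, default_policy, and run_with_policy are assumed names, the doubling factor is one common form of exponential backoff, and treating exhausted retries as a hard failure is also an assumption.

```rust
use std::time::Duration;

/// Hypothetical retry settings; the field names are assumptions.
#[derive(Debug, Clone, PartialEq)]
pub struct RetryConfig {
    pub max_attempts: u32,
    pub base_delay: Duration,
}

#[derive(Debug, Clone, PartialEq)]
pub enum FailurePolicy {
    Fail,
    Skip,
    Retry(RetryConfig),
}

impl RetryConfig {
    /// Delay after failed attempt `attempt` (1-based): base_delay * 2^(attempt - 1).
    pub fn delay_for(&self, attempt: u32) -> Duration {
        let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
        self.base_delay.saturating_mul(factor)
    }
}

/// Defaults described above: optional passes skip, required passes fail.
pub fn default_policy(is_optional: bool) -> FailurePolicy {
    if is_optional { FailurePolicy::Skip } else { FailurePolicy::Fail }
}

/// Runs `op` under a policy. Returns Ok(Some(value)) on success, Ok(None) when
/// a failure was skipped, and Err when the pipeline should stop.
pub fn run_with_policy<T>(
    policy: &FailurePolicy,
    mut op: impl FnMut() -> Result<T, String>,
) -> Result<Option<T>, String> {
    match policy {
        FailurePolicy::Fail => op().map(Some),
        FailurePolicy::Skip => Ok(op().ok()),
        FailurePolicy::Retry(config) => {
            let mut last_error = String::from("max_attempts is zero");
            for attempt in 1..=config.max_attempts {
                match op() {
                    Ok(value) => return Ok(Some(value)),
                    Err(error) => last_error = error,
                }
                // Back off before the next attempt, but not after the last one.
                if attempt < config.max_attempts {
                    std::thread::sleep(config.delay_for(attempt));
                }
            }
            // Assumption: exhausting all attempts stops the pipeline.
            Err(last_error)
        }
    }
}
```

With a base delay of 100 ms and max_attempts of 3, this sketch would wait 100 ms after the first failure and 200 ms after the second before giving up on the third.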
PipelineExecutor
The executor is the main entry point. It provides two preset configurations:
// Without LLM — skips EnhancePass and ConceptPass
let executor = PipelineExecutor::new();
// With LLM — includes summary generation and concept extraction
let executor = PipelineExecutor::with_llm(llm_client);
Custom pipelines can be built using the orchestrator directly:
let orchestrator = PipelineOrchestrator::new()
    .stage_with_priority(ParsePass::new(), 10)
    .stage_with_priority(BuildPass::new(), 20)
    .stage_with_priority(MyCustomPass::new(), 35);
let executor = PipelineExecutor::from_orchestrator(orchestrator);
You can also add passes to an existing executor:
let executor = PipelineExecutor::with_llm(client)
    .add_stage_with_priority(MyPass::new(), 55)
    .add_stage_with_deps(MyValidationPass::new(), 56, &["my_pass"]);
PipelineOrchestrator
The orchestrator resolves dependencies between passes, groups independent passes, and runs each group, in parallel where that is safe:
Dependency Resolution
Passes declare dependencies by name. The orchestrator performs a topological sort with priority-based ordering (Kahn's algorithm):
- Build a dependency graph from depends_on() declarations
- Validate all dependencies refer to existing passes
- Sort by: dependencies first, then priority (lower = earlier), then registration order
- Detect circular dependencies and report an error
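The steps above can be sketched with Kahn's algorithm and a min-heap keyed by (priority, registration index). This is an illustration under assumptions, not the orchestrator's source: PassSpec and resolve_order are hypothetical names, and the priority type is assumed to be i32.

```rust
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

/// Hypothetical registration record for a pass.
pub struct PassSpec {
    pub name: &'static str,
    pub priority: i32,
    pub deps: Vec<&'static str>,
}

/// Returns pass names in execution order, or an error for an unknown
/// dependency or a circular dependency.
pub fn resolve_order(passes: &[PassSpec]) -> Result<Vec<&'static str>, String> {
    // Map each pass name to its registration index.
    let index: HashMap<&'static str, usize> =
        passes.iter().enumerate().map(|(i, p)| (p.name, i)).collect();

    // Build the graph: in_degree counts unmet dependencies per pass,
    // dependents lists the passes waiting on each pass.
    let mut in_degree = vec![0usize; passes.len()];
    let mut dependents: Vec<Vec<usize>> = vec![Vec::new(); passes.len()];
    for (i, pass) in passes.iter().enumerate() {
        for dep in &pass.deps {
            let d = *index.get(dep).ok_or_else(|| {
                format!("pass '{}' depends on unknown pass '{}'", pass.name, dep)
            })?;
            dependents[d].push(i);
            in_degree[i] += 1;
        }
    }

    // Min-heap of ready passes: lowest priority value first, then
    // earliest registration.
    let mut ready: BinaryHeap<Reverse<(i32, usize)>> = BinaryHeap::new();
    for (i, pass) in passes.iter().enumerate() {
        if in_degree[i] == 0 {
            ready.push(Reverse((pass.priority, i)));
        }
    }

    let mut order = Vec::with_capacity(passes.len());
    while let Some(Reverse((_, i))) = ready.pop() {
        order.push(passes[i].name);
        for &next in &dependents[i] {
            in_degree[next] -= 1;
            if in_degree[next] == 0 {
                ready.push(Reverse((passes[next].priority, next)));
            }
        }
    }

    // Any pass left unscheduled sits on a cycle.
    if order.len() != passes.len() {
        return Err("circular dependency detected".to_string());
    }
    Ok(order)
}
```

Because a pass enters the heap only once all of its dependencies have been emitted, priority breaks ties among ready passes but never overrides a dependency.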
Execution Groups
Passes at the same dependency level with no inter-dependencies are grouped for parallel execution:
Group 0 (parallel): [ParsePass] — no deps
Group 1 (parallel): [BuildPass] — depends on "parse"
Group 2 (parallel): [ValidatePass, SplitPass] — both depend on "build"
Group 3: [EnhancePass] — depends on "build"
Group 4: [EnrichPass] — depends on "build"
Group 5 (parallel): [ReasoningPass, NavigationPass] — depend on "enrich"
...
Parallel Execution
When a group has multiple passes:
- Identify the tree writer (if any): it gets the main context
- All other passes receive cloned contexts with tree snapshots
- All passes run concurrently via tokio::join!
- Results are merged back by inspecting each pass's AccessPattern
- Additive metrics (LLM calls, tokens) are summed across passes
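The clone, run, and merge cycle described above might look like the following sketch. To stay dependency-free it uses std::thread::scope as a stand-in for tokio::join!, and it works on reduced, hypothetical types: Context, Metrics, Pass, and run_group are assumptions made for illustration, and AccessPattern is cut down to three fields.

```rust
use std::thread;

// Reduced, hypothetical stand-ins for the real types.
#[derive(Debug, Clone, Copy, Default)]
pub struct AccessPattern {
    pub writes_tree: bool,
    pub writes_description: bool,
    pub writes_concepts: bool,
}

#[derive(Debug, Clone, Default, PartialEq)]
pub struct Metrics {
    pub llm_calls: u64,
    pub tokens: u64,
}

#[derive(Debug, Clone, Default)]
pub struct Context {
    pub tree: Vec<String>,
    pub description: Option<String>,
    pub concepts: Vec<String>,
    pub metrics: Metrics,
}

pub struct Pass {
    pub access: AccessPattern,
    pub run: fn(&mut Context),
}

/// Runs one group: the tree writer (if any) on the main context, every other
/// pass on a clone, then merges declared outputs and sums metric deltas.
pub fn run_group(main: &mut Context, group: &[Pass]) {
    let writer = group.iter().position(|p| p.access.writes_tree);
    let baseline = main.metrics.clone();

    // Snapshot the main context for every non-writer before anything runs.
    let mut clones: Vec<(usize, Context)> = group
        .iter()
        .enumerate()
        .filter(|(i, _)| Some(*i) != writer)
        .map(|(i, _)| (i, main.clone()))
        .collect();

    thread::scope(|scope| {
        for (i, ctx) in clones.iter_mut() {
            let run = group[*i].run;
            scope.spawn(move || run(ctx));
        }
        // The writer mutates the main context while the clones run.
        if let Some(w) = writer {
            (group[w].run)(main);
        }
    });

    // Merge only the fields each pass declared as written.
    for (i, ctx) in clones {
        let access = group[i].access;
        if access.writes_description {
            main.description = ctx.description;
        }
        if access.writes_concepts {
            main.concepts = ctx.concepts;
        }
        // Each clone started from the baseline, so add only its delta.
        main.metrics.llm_calls += ctx.metrics.llm_calls - baseline.llm_calls;
        main.metrics.tokens += ctx.metrics.tokens - baseline.tokens;
    }
}
```

Note that non-writers see the tree as it was when the group started, so a pass that needs the writer's changes has to declare a dependency and run in a later group.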
CompileContext
The shared context passed between passes:
CompileContext
├── doc_id, name, format, source_path # Document identity
├── input: CompilerInput # Source (File/Content/Bytes)
├── source_hash: String # SHA-256 for checkpoint validation
├── raw_nodes: Vec<RawNode> # ← ParsePass writes
├── tree: Option<DocumentTree> # ← BuildPass writes
├── reasoning_index: Option<ReasoningIndex> # ← ReasoningPass writes
├── navigation_index: Option<NavigationIndex> # ← NavigationPass writes
├── concepts: Vec<Concept> # ← ConceptPass writes
├── description: Option<String> # ← EnrichPass writes
├── summary_cache: SummaryCache # Summary memoization
├── metrics: IndexMetrics # Performance tracking
├── stage_results: HashMap<String, PassResult> # Per-pass results
└── options: PipelineOptions # Configuration