Skip to main content
Component: Orchestrator - Multi-step workflow execution engine Module: gaia.agents.code.orchestration.orchestrator Import: from gaia.agents.code.orchestration.orchestrator import Orchestrator, ExecutionResult, CheckpointAssessment

Overview

Orchestrator controls LLM-driven workflow execution with error recovery using Checklist Mode. The LLM generates a checklist of template invocations, which are executed deterministically with automatic error recovery and checkpoint assessment. Key Features:
  • LLM-driven checklist generation
  • Deterministic template execution
  • Three-tier error recovery strategy
  • Iterative refinement with checkpoint review
  • Progress reporting and validation tracking
  • Project state analysis between iterations

API Specification

ExecutionResult

@dataclass
class ExecutionResult:
    """Result of a complete workflow execution."""

    success: bool
    phases_completed: List[str] = field(default_factory=list)
    phases_failed: List[str] = field(default_factory=list)
    total_steps: int = 0
    steps_succeeded: int = 0
    steps_failed: int = 0
    steps_skipped: int = 0
    errors: List[str] = field(default_factory=list)
    outputs: Dict[str, Any] = field(default_factory=dict)

    @property
    def summary(self) -> str:
        """Get a human-readable summary."""
        status = "SUCCESS" if self.success else "FAILED"
        return (
            f"{status}: {self.steps_succeeded}/{self.total_steps} steps completed, "
            f"{self.steps_failed} failed, {self.steps_skipped} skipped"
        )

CheckpointAssessment

@dataclass
class CheckpointAssessment:
    """LLM-produced verdict about the current checkpoint."""

    status: str  # "complete" or "needs_fix"
    reasoning: str
    issues: List[str] = field(default_factory=list)
    fix_instructions: List[str] = field(default_factory=list)

    @property
    def needs_fix(self) -> bool:
        """Return True when the reviewer requires another checklist."""
        return self.status.lower() != "complete"

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the assessment."""
        ...

Orchestrator

class Orchestrator:
    """
    Controls LLM-driven workflow execution with error recovery.

    Uses Checklist Mode exclusively:
    - LLM analyzes user request and generates a checklist of templates
    - Executor runs templates deterministically
    - Provides semantic understanding (e.g., adds checkboxes for todos)
    """

    def __init__(
        self,
        tool_executor: ToolExecutor,
        llm_client: ChatSDK,
        llm_fixer: Optional[Callable[[str, str], Optional[str]]] = None,
        progress_callback: Optional[Callable[[str, str, int, int], None]] = None,
        console: Optional[AgentConsole] = None,
        max_checklist_loops: int = 10,
    ):
        """
        Initialize orchestrator.

        Args:
            tool_executor: Function to execute tools (name, args) -> result
            llm_client: Chat SDK for checklist generation (required)
            llm_fixer: Optional LLM-based code fixer for escalation
            progress_callback: Optional callback(phase, step, current, total)
            console: Optional console for displaying output
            max_checklist_loops: Max number of checklist iterations
        """
        ...

    def execute(
        self, context: UserContext, step_through: bool = False
    ) -> ExecutionResult:
        """
        Execute the workflow using iterative LLM-generated checklists.

        Args:
            context: UserContext with request and project info
            step_through: If True, pause after each step for review

        Returns:
            ExecutionResult with success status and detailed outputs
        """
        ...

    def _assess_checkpoint(
        self,
        context: UserContext,
        checklist: Any,
        execution_result: Any,
        validation_history: List[Any],
    ) -> CheckpointAssessment:
        """Ask the LLM whether the workflow is complete or needs another checklist."""
        ...

    def _build_checkpoint_prompt(
        self,
        context: UserContext,
        checklist: Any,
        execution_result: Any,
        validation_history: List[Any],
    ) -> str:
        """Build the prompt for the checkpoint reviewer."""
        ...

Usage Examples

Example 1: Basic Workflow Execution

from gaia.agents.code.orchestration.orchestrator import Orchestrator
from gaia.agents.code.orchestration.steps.base import UserContext

# Create context
context = UserContext(
    user_request="Create a Next.js blog",
    project_dir="/path/to/project",
    language="typescript",
    project_type="fullstack"
)

# Initialize orchestrator
orchestrator = Orchestrator(
    tool_executor=tool_executor,
    llm_client=chat_sdk,
    max_checklist_loops=5
)

# Execute workflow
result = orchestrator.execute(context)

if result.success:
    print(f"Workflow completed: {result.summary}")
    print(f"Files created: {len(result.outputs.get('files', []))}")
else:
    print(f"Workflow failed: {result.errors}")

Example 2: Step-Through Mode

# Execute with manual step confirmation
result = orchestrator.execute(context, step_through=True)

# User is prompted after each step:
# "Press Enter to continue, or 'n'/'q' to stop..."

Example 3: Custom Progress Callback

def progress_handler(phase: str, step: str, current: int, total: int):
    """Handle progress updates."""
    print(f"[{phase}] Step {current}/{total}: {step}")

orchestrator = Orchestrator(
    tool_executor=tool_executor,
    llm_client=chat_sdk,
    progress_callback=progress_handler
)

result = orchestrator.execute(context)

Workflow Flow

1. Generate Checklist (LLM)
   └─> Analyze user request + project state
   └─> Generate list of template invocations

2. Execute Checklist (Deterministic)
   └─> For each item:
       ├─> Execute template with args
       ├─> Apply error recovery if needed
       └─> Track validation results

3. Assess Checkpoint (LLM)
   └─> Review execution results
   └─> Check validation logs
   └─> Decide: complete or needs_fix

4. Iterate if needed
   └─> If needs_fix:
       ├─> Add fix feedback to context
       ├─> Generate new checklist
       └─> Repeat from step 2

Testing Requirements

def test_orchestrator_initialization():
    """Test orchestrator creation."""
    orchestrator = Orchestrator(
        tool_executor=mock_executor,
        llm_client=mock_llm
    )
    assert orchestrator is not None

def test_checkpoint_assessment():
    """Test checkpoint assessment creation."""
    assessment = CheckpointAssessment(
        status="needs_fix",
        reasoning="Tests failing",
        issues=["TypeError in main.py"],
        fix_instructions=["Fix type annotation"]
    )
    assert assessment.needs_fix
    assert len(assessment.issues) == 1

def test_execution_result_summary():
    """Test execution result summary."""
    result = ExecutionResult(
        success=True,
        total_steps=5,
        steps_succeeded=5
    )
    assert "5/5" in result.summary
    assert "SUCCESS" in result.summary

Dependencies

[project]
dependencies = [
    "gaia.agents.code.orchestration.checklist_generator",
    "gaia.agents.code.orchestration.checklist_executor",
    "gaia.agents.code.orchestration.steps.error_handler",
    "gaia.agents.code.orchestration.project_analyzer",
]

Acceptance Criteria

  • Orchestrator class implemented
  • Checklist generation integration
  • Deterministic execution working
  • Checkpoint assessment functional
  • Iterative refinement working
  • Error recovery operational
  • Progress callbacks functional
  • All unit tests pass (6+ tests)

Orchestrator Technical Specification