Source Code:
src/gaia/api/
Components: OpenAI-compatible API Server, Schemas, Endpoints, SSE Streaming
Module:
gaia.api
Import: from gaia.api import app, schemas, agent_registry
Overview
The GAIA API Server provides an OpenAI-compatible REST API for exposing GAIA agents to external tools like VSCode, Claude Dev, and custom applications. It implements the OpenAI chat completions interface with streaming support, agent registration, and comprehensive debugging features.
Key Features:
- OpenAI-compatible endpoints (/v1/chat/completions, /v1/models)
- Server-Sent Events (SSE) streaming
- Dynamic agent registry
- Workspace root extraction from GitHub Copilot
- Debug modes (logging, prompts, step-through)
- CORS support
- Health checks
- Token usage tracking
Requirements
Functional Requirements
Core Endpoints
-
POST /v1/chat/completions
- Non-streaming responses
- SSE streaming responses
- Message history support
- Tool calls support (future)
-
GET /v1/models
- List available agents as models
- Model metadata (tokens, description)
-
GET /health
- Health check endpoint
- Returns 200 OK
Agent Registry
-
Agent Registration
- Dynamic agent discovery
- Model ID mapping
- Agent instantiation
- Workspace configuration
-
Agent Management
- Fresh agent instance per request (no singleton cache)
- Silent mode operation
- Workspace root injection
Request Processing
-
Message Handling
- OpenAI message format
- System/user/assistant roles
- GitHub Copilot workspace extraction
- Prompt formatting
-
Response Generation
- Agent process_query() integration
- Token counting
- Usage statistics
- Unique response IDs
Streaming Support
- SSE Streaming
- Chunk-based streaming
- Delta content format
- Finish reason reporting
- Proper SSE formatting (data: prefix)
Non-Functional Requirements
-
Performance
- Low latency response
- Efficient streaming
- Connection pooling
-
Reliability
- Error handling
- Connection recovery
- Graceful degradation
-
Debugging
- Request/response logging
- Prompt display
- Step-through mode
- Raw HTTP logging
-
Security
- CORS configuration
- Input validation
- Error sanitization
API Specification
File Locations
src/gaia/api/app.py # CLI entry point
src/gaia/api/openai_server.py # FastAPI server
src/gaia/api/schemas.py # Pydantic models
src/gaia/api/agent_registry.py # Agent management
src/gaia/api/sse_handler.py # SSE streaming
Schemas (Pydantic Models)
from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, Field
class ChatMessage(BaseModel):
    """Chat message in OpenAI format."""
    # Author of the message; any value outside these four fails validation.
    role: Literal["system", "user", "assistant", "tool"]
    # Message text. May be None, so consumers must not assume a string.
    content: Optional[str] = None
    # Free-form tool-call dicts; their structure is not validated here.
    tool_calls: Optional[List[Dict[str, Any]]] = None
    # Presumably links a "tool" message back to the call it answers — confirm.
    tool_call_id: Optional[str] = None
class ChatCompletionRequest(BaseModel):
    """POST /v1/chat/completions request."""
    # Required model ID; there is no default.
    model: str = Field(..., description="Model ID (e.g., gaia-code, gaia-jira)")
    # Conversation history in OpenAI message format.
    messages: List[ChatMessage]
    # When true the endpoint answers with Server-Sent Events.
    stream: bool = Field(default=False, description="Enable SSE streaming")
    # Sampling options, range-checked by the Field constraints below.
    temperature: Optional[float] = Field(default=0.7, ge=0, le=2)
    max_tokens: Optional[int] = Field(default=None, gt=0)
    top_p: Optional[float] = Field(default=1.0, ge=0, le=1)
class ChatCompletionResponseMessage(BaseModel):
    """Response message from chat completion."""
    # Responses are always attributed to the assistant.
    role: Literal["assistant"]
    # Unlike ChatMessage.content, this field is required and non-optional.
    content: str
    # Free-form tool-call dicts; not validated beyond being a list of dicts.
    tool_calls: Optional[List[Dict[str, Any]]] = None
class ChatCompletionChoice(BaseModel):
    """A single completion choice."""
    # Position of this choice within the response's `choices` list.
    index: int
    message: ChatCompletionResponseMessage
    # Only these two finish reasons validate.
    finish_reason: Literal["stop", "length"]
class UsageInfo(BaseModel):
    """Token usage information."""
    # All three counters are required; the model does not check that
    # total_tokens equals prompt_tokens + completion_tokens.
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
class ChatCompletionResponse(BaseModel):
    """POST /v1/chat/completions response (non-streaming)."""
    # Response identifier (e.g. "chatcmpl-abc123").
    id: str
    # Fixed discriminator for non-streaming completions.
    object: Literal["chat.completion"]
    # Integer creation timestamp.
    created: int
    # Model ID the response was generated for.
    model: str
    choices: List[ChatCompletionChoice]
    usage: UsageInfo
class ModelInfo(BaseModel):
    """Model metadata for /v1/models endpoint."""
    # Required fields: any dict used to build a ModelInfo must supply all four.
    id: str
    object: Literal["model"]
    created: int
    owned_by: str
    # Optional descriptive metadata; omitted values default to None.
    description: Optional[str] = None
    max_input_tokens: Optional[int] = None
    max_output_tokens: Optional[int] = None
class ModelListResponse(BaseModel):
    """GET /v1/models response."""
    # Fixed discriminator for list responses.
    object: Literal["list"]
    # One entry per exposed model.
    data: List[ModelInfo]
API Endpoints
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(
    title="GAIA OpenAI-Compatible API",
    description="OpenAI-compatible API for GAIA agents",
    version="1.0.0",
)

# CORS middleware.
# Wildcard origins/methods/headers must not be combined with
# allow_credentials=True: the CORS spec forbids "*" on credentialed requests,
# and the FastAPI docs require explicit lists in that case. The server uses no
# cookies or auth (clients pass api_key="none"), so credentials are disabled
# rather than narrowing the origin list.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Specification stub: the body is intentionally `pass`; only the contract
# described in the docstring is defined here.
@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    """
    Create chat completion (OpenAI-compatible endpoint).
    Supports both streaming (SSE) and non-streaming responses.
    Args:
    request: Chat completion request with model, messages, and options
    Returns:
    For non-streaming: ChatCompletionResponse
    For streaming: StreamingResponse with SSE chunks
    Raises:
    HTTPException 404: Model not found
    HTTPException 400: No user message in request
    Example Non-Streaming:
    POST /v1/chat/completions
    {
    "model": "gaia-code",
    "messages": [{"role": "user", "content": "Write hello world"}],
    "stream": false
    }
    Response:
    {
    "id": "chatcmpl-abc123",
    "object": "chat.completion",
    "created": 1234567890,
    "model": "gaia-code",
    "choices": [{
    "index": 0,
    "message": {
    "role": "assistant",
    "content": "Here's hello world:\\nprint('Hello, World!')"
    },
    "finish_reason": "stop"
    }],
    "usage": {
    "prompt_tokens": 10,
    "completion_tokens": 20,
    "total_tokens": 30
    }
    }
    Example Streaming:
    POST /v1/chat/completions
    {
    "model": "gaia-code",
    "messages": [{"role": "user", "content": "Write hello world"}],
    "stream": true
    }
    Response (SSE):
    data: {"id":"chatcmpl-abc","object":"chat.completion.chunk",...}
    data: {"id":"chatcmpl-abc","object":"chat.completion.chunk",...}
    data: [DONE]
    """
    pass
# Specification stub: the body is intentionally `pass`; only the contract
# described in the docstring is defined here.
@app.get("/v1/models")
async def list_models() -> ModelListResponse:
    """
    List available models (GAIA agents).
    Returns:
    ModelListResponse with list of available agents as models
    Example:
    GET /v1/models
    Response:
    {
    "object": "list",
    "data": [
    {
    "id": "gaia-code",
    "object": "model",
    "created": 1234567890,
    "owned_by": "amd-gaia",
    "description": "Autonomous Python coding agent",
    "max_input_tokens": 32768,
    "max_output_tokens": 8192
    },
    {
    "id": "gaia-jira",
    "object": "model",
    "created": 1234567890,
    "owned_by": "amd-gaia"
    }
    ]
    }
    """
    pass
@app.get("/health")
async def health_check():
    """
    Liveness probe for the API server.

    Returns:
        A constant JSON object, {"status": "ok"}.

    Example:
        GET /health
        Response:
        {"status": "ok"}
    """
    # The payload is static: the handler performs no dependency checks.
    status_payload = {"status": "ok"}
    return status_payload
Agent Registry
The API server’s AgentRegistry is not a runtime plug-in registry — it is
a hardcoded AGENT_MODELS dict in src/gaia/api/agent_registry.py plus a
dynamic import helper. There is no register_agent() method today; to add a
new model you edit AGENT_MODELS and restart the server.
# src/gaia/api/agent_registry.py (simplified)
from importlib import import_module
from typing import Any, Dict
# Static table mapping each public model ID to the agent that serves it.
AGENT_MODELS: Dict[str, Dict[str, Any]] = {
    "gaia-code": {
        # Dotted import path; AgentRegistry.get_agent() splits it on the last
        # "." into a module path and a class name.
        "class_name": "gaia.agents.routing.agent.RoutingAgent",
        # Keyword arguments passed to the agent class when it is instantiated.
        "init_params": {"api_mode": True, "silent_mode": True,
        "streaming": False, "max_steps": 100},
        # Text reported by AgentRegistry.list_models().
        "description": "Intelligent routing agent ...",
    },
    # Add entries here to expose more models on /v1/models.
}
class AgentRegistry:
    """Resolve model IDs to agent instances by importing the target class on demand.

    `gaia api start --debug/--show-prompts/--streaming/--step-through`
    set GAIA_API_* env-vars that are merged into `init_params` at startup.
    """

    def get_agent(self, model_id: str):
        """Load and instantiate the agent for `model_id`.

        Args:
            model_id: Key into `AGENT_MODELS`.

        Returns:
            A new instance of the configured agent class. A *fresh* agent is
            created per call — there is no singleton cache.

        Raises:
            KeyError: If the model is not registered.
        """
        entry = AGENT_MODELS[model_id]
        # "pkg.module.ClassName" -> ("pkg.module", "ClassName")
        module_path, _, class_name = entry["class_name"].rpartition(".")
        agent_class = getattr(import_module(module_path), class_name)
        # An entry without "init_params" means "construct with defaults".
        return agent_class(**entry.get("init_params", {}))

    def list_models(self):
        """Return the models block used by `/v1/models`.

        Returns:
            One dict per `AGENT_MODELS` entry with the keys `id`, `object`,
            `created`, `owned_by` and `description`. `created` is included
            because the `ModelInfo` schema declares it as a required field.
        """
        import time

        # One timestamp for the whole listing so all entries agree.
        created = int(time.time())
        return [
            {
                "id": model_id,
                "object": "model",
                "created": cfg.get("created", created),
                "owned_by": "amd-gaia",
                "description": cfg.get("description", ""),
            }
            for model_id, cfg in AGENT_MODELS.items()
        ]
For the richer, manifest-based agent registry used by the Agent UI (not the
OpenAI-style API server), see the
Agent Registry spec covering
src/gaia/agents/registry.py, AgentManifest, and ~/.gaia/agents/ discovery.
SSE Streaming
from typing import AsyncGenerator
import json
import uuid
import time
async def stream_chat_completion(
    agent: Agent,
    user_message: str,
    model_id: str,
) -> AsyncGenerator[str, None]:
    """
    Yield an agent's answer as OpenAI-style Server-Sent Events.

    Args:
        agent: Agent whose `stream_query()` produces the content pieces
        user_message: Text forwarded to the agent
        model_id: Model identifier echoed in every chunk

    Yields:
        SSE frames of the form "data: {...}" followed by a blank line: one per
        content piece, then a frame with finish_reason "stop", then
        "data: [DONE]". If anything raises, a single frame carrying an
        "error" object is yielded instead and the stream ends.

    Example:
        async for chunk in stream_chat_completion(agent, "Hello", "gaia-code"):
            print(chunk)
        # Output:
        # data: {"id":"chatcmpl-123","object":"chat.completion.chunk",...}
        # data: {"id":"chatcmpl-123","object":"chat.completion.chunk",...}
        # data: [DONE]
    """
    # Every frame of one stream shares the same id and creation time.
    completion_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
    created_at = int(time.time())

    def _frame(delta, finish_reason):
        # Serialize one chat.completion.chunk as an SSE "data:" frame.
        payload = {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created_at,
            "model": model_id,
            "choices": [
                {
                    "index": 0,
                    "delta": delta,
                    "finish_reason": finish_reason,
                }
            ],
        }
        return f"data: {json.dumps(payload)}\n\n"

    try:
        for piece in agent.stream_query(user_message):
            yield _frame({"content": piece}, None)
        # Closing frame: empty delta plus the finish reason.
        yield _frame({}, "stop")
        yield "data: [DONE]\n\n"
    except Exception as e:
        failure = {"error": {"message": str(e), "type": "server_error"}}
        yield f"data: {json.dumps(failure)}\n\n"
Implementation Details
Workspace Root Extraction
def extract_workspace_root(messages):
    """
    Extract workspace root path from GitHub Copilot messages.

    GitHub Copilot includes workspace info in messages like:
        <workspace_info>
        I am working in a workspace with the following folders:
        - /Users/username/path/to/workspace
        </workspace_info>

    User messages are scanned in order and the first listed folder of the
    first matching message wins. The whole remainder of the folder line is
    captured, so paths that contain spaces are returned intact.

    Args:
        messages: List of ChatMessage objects (None or empty is accepted)

    Returns:
        str: Workspace root path, or None if not found
    """
    import re

    # The capture starts at the first non-blank character after the bullet
    # and runs to the end of that line; a whitespace-delimited capture would
    # cut "/Users/me/My Project" down to "/Users/me/My".
    folder_pattern = re.compile(
        r"<workspace_info>.*?following folders:\s*\n\s*-\s*(\S[^\r\n]*)",
        re.DOTALL,
    )
    for msg in messages or ():
        if msg.role != "user" or not msg.content:
            continue
        found = folder_pattern.search(msg.content)
        if found:
            return found.group(1).strip()
    return None
Request Processing Flow
async def create_chat_completion(request: ChatCompletionRequest):
    """
    Handle a chat completion request end to end.

    Args:
        request: Validated chat completion request.

    Returns:
        StreamingResponse (text/event-stream) when `request.stream` is true,
        otherwise a ChatCompletionResponse with usage statistics.

    Raises:
        HTTPException 404: `request.model` is not a registered model.
        HTTPException 400: The request has no user message, or the last user
            message has no content.
    """
    # 1. Extract workspace root (parsed from the <workspace_info> block in the
    #    text of the user messages).
    workspace_root = extract_workspace_root(request.messages)

    # 2. Resolve the agent. `get_agent()` takes only model_id; workspace_root
    #    is applied afterwards (e.g., via agent.config or a per-request setter).
    try:
        agent = registry.get_agent(request.model)
    except KeyError:
        raise HTTPException(404, f"Model not found: {request.model}")
    if workspace_root and hasattr(agent, "set_workspace_root"):
        agent.set_workspace_root(workspace_root)

    # 3. Extract user message
    user_messages = [m for m in request.messages if m.role == "user"]
    if not user_messages:
        raise HTTPException(400, "No user message in request")
    user_message = user_messages[-1].content
    # ChatMessage.content is Optional; without this guard a None would reach
    # the agent and the len()-based token estimate below.
    if user_message is None:
        raise HTTPException(400, "Last user message has no content")

    # 4. Generate response
    if request.stream:
        # Streaming response
        return StreamingResponse(
            stream_chat_completion(agent, user_message, request.model),
            media_type="text/event-stream",
        )

    # Non-streaming response
    response_text = agent.process_query(user_message)

    # Token counting: use the agent's estimator when it has one, otherwise
    # approximate at four characters per token.
    if isinstance(agent, ApiAgent):
        prompt_tokens = agent.estimate_tokens(user_message)
        completion_tokens = agent.estimate_tokens(response_text)
    else:
        prompt_tokens = len(user_message) // 4
        completion_tokens = len(response_text) // 4

    return ChatCompletionResponse(
        id=f"chatcmpl-{uuid.uuid4().hex[:8]}",
        object="chat.completion",
        created=int(time.time()),
        model=request.model,
        choices=[
            ChatCompletionChoice(
                index=0,
                message=ChatCompletionResponseMessage(
                    role="assistant",
                    content=response_text,
                ),
                finish_reason="stop",
            )
        ],
        usage=UsageInfo(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
        ),
    )
Debug Middleware
@app.middleware("http")
async def log_raw_requests(request: Request, call_next):
    """
    Middleware to log raw HTTP requests when debug mode is enabled.

    Logging happens only when the GAIA_API_DEBUG environment variable is "1".
    The request is always forwarded to `call_next` and its response returned
    unchanged.

    Args:
        request: Incoming HTTP request.
        call_next: Callable that passes the request to the next handler.

    Returns:
        The response produced by `call_next`.
    """
    if os.environ.get("GAIA_API_DEBUG") == "1":
        banner = "=" * 80
        logger.debug(banner)
        logger.debug("RAW HTTP REQUEST")
        logger.debug(banner)
        logger.debug("Path: %s", request.url.path)
        logger.debug("Method: %s", request.method)
        logger.debug("Headers:")
        # NOTE(review): headers are logged verbatim, including Authorization
        # and Cookie values — keep debug logs out of shared locations.
        for name, value in request.headers.items():
            logger.debug(" %s: %s", name, value)
        # Don't read body for streaming endpoints
        if request.url.path != "/v1/chat/completions":
            body_bytes = await request.body()
            # errors="replace": a body that is not valid UTF-8 must not make
            # the debug logger fail the request with UnicodeDecodeError.
            logger.debug("Body: %s", body_bytes.decode("utf-8", errors="replace"))
        logger.debug(banner)
    response = await call_next(request)
    return response
Testing Requirements
Unit Tests
File: tests/api/test_api_server.py
import pytest
from fastapi.testclient import TestClient
from gaia.api.openai_server import app
client = TestClient(app)
def test_health_check():
    """GET /health answers 200 with the constant status payload."""
    resp = client.get("/health")
    expected_body = {"status": "ok"}
    assert resp.status_code == 200
    assert resp.json() == expected_body
def test_list_models():
    """GET /v1/models returns a non-empty OpenAI-style list."""
    resp = client.get("/v1/models")
    assert resp.status_code == 200
    listing = resp.json()
    assert listing["object"] == "list"
    models = listing["data"]
    assert isinstance(models, list)
    assert len(models) > 0
def test_chat_completion_non_streaming():
    """A non-streaming completion returns one assistant message plus usage."""
    request_body = {
        "model": "gaia-code",
        "messages": [{"role": "user", "content": "Say hello"}],
        "stream": False,
    }
    resp = client.post("/v1/chat/completions", json=request_body)
    assert resp.status_code == 200
    completion = resp.json()
    assert completion["object"] == "chat.completion"
    assert len(completion["choices"]) > 0
    first_message = completion["choices"][0]["message"]
    assert first_message["role"] == "assistant"
    assert isinstance(first_message["content"], str)
    assert "usage" in completion
def test_chat_completion_streaming():
    """Test streaming chat completion.

    Posts a streaming request, collects every SSE `data:` payload except the
    `[DONE]` terminator, and checks that at least one chunk arrived and that
    the first one is a `chat.completion.chunk`.
    """
    import json

    with client.stream(
        "POST",
        "/v1/chat/completions",
        json={
            "model": "gaia-code",
            "messages": [{"role": "user", "content": "Count to 3"}],
            "stream": True,
        },
    ) as response:
        assert response.status_code == 200
        # Starlette appends "; charset=utf-8" to text/* media types, so an
        # exact comparison against "text/event-stream" would fail.
        assert response.headers["content-type"].startswith("text/event-stream")
        chunks = []
        for line in response.iter_lines():
            if not line.startswith("data: "):
                continue
            data = line[6:]  # Remove "data: " prefix
            if data != "[DONE]":
                chunks.append(json.loads(data))
    assert len(chunks) > 0
    assert chunks[0]["object"] == "chat.completion.chunk"
def test_chat_completion_invalid_model():
    """An unregistered model ID is rejected with 404."""
    request_body = {
        "model": "nonexistent-model",
        "messages": [{"role": "user", "content": "Hello"}],
    }
    resp = client.post("/v1/chat/completions", json=request_body)
    assert resp.status_code == 404
def test_chat_completion_no_user_message():
    """A conversation without any user message is rejected with 400."""
    request_body = {
        "model": "gaia-code",
        "messages": [{"role": "system", "content": "You are helpful"}],
    }
    resp = client.post("/v1/chat/completions", json=request_body)
    assert resp.status_code == 400
Usage Examples
Example 1: Start API Server
# Foreground
gaia api start
# Background
gaia api start --background
# Custom host/port
gaia api start --host 0.0.0.0 --port 9090
# With debug
gaia api start --debug --show-prompts
# Stop server
gaia api stop
Example 2: Non-Streaming Request (curl)
curl -X POST http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "gaia-code",
"messages": [
{"role": "user", "content": "Write a hello world program"}
],
"stream": false
}'
Example 3: Streaming Request (curl)
curl -X POST http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "gaia-code",
"messages": [
{"role": "user", "content": "Write a hello world program"}
],
"stream": true
}'
Example 4: Python Client
import requests
import json

# List models
models_reply = requests.get("http://localhost:8080/v1/models")
for entry in models_reply.json()["data"]:
    summary = entry.get('description', 'No description')
    print(f"{entry['id']}: {summary}")

# Non-streaming chat
chat_request = {
    "model": "gaia-code",
    "messages": [
        {"role": "user", "content": "Write hello world in Python"}
    ],
    "stream": False
}
chat_reply = requests.post(
    "http://localhost:8080/v1/chat/completions",
    json=chat_request,
)
print(chat_reply.json()["choices"][0]["message"]["content"])

# Streaming chat
stream_request = {
    "model": "gaia-code",
    "messages": [
        {"role": "user", "content": "Count to 5"}
    ],
    "stream": True
}
stream_reply = requests.post(
    "http://localhost:8080/v1/chat/completions",
    json=stream_request,
    stream=True,
)
for raw_line in stream_reply.iter_lines():
    # Skip the blank separator lines between SSE events.
    if not raw_line:
        continue
    text = raw_line.decode('utf-8')
    if not text.startswith("data: "):
        continue
    payload = text[6:]
    # "[DONE]" is the end-of-stream sentinel, not JSON.
    if payload == "[DONE]":
        continue
    chunk = json.loads(payload)
    content = chunk["choices"][0]["delta"].get("content", "")
    if content:
        print(content, end="", flush=True)
print()
Example 5: OpenAI Python SDK
from openai import OpenAI
# Point to GAIA API server
client = OpenAI(
    base_url="http://localhost:8080/v1",
    api_key="none",  # Not required for GAIA
)

# Non-streaming
hello_prompt = [{"role": "user", "content": "Write hello world"}]
reply = client.chat.completions.create(model="gaia-code", messages=hello_prompt)
print(reply.choices[0].message.content)

# Streaming
count_prompt = [{"role": "user", "content": "Count to 5"}]
events = client.chat.completions.create(
    model="gaia-code",
    messages=count_prompt,
    stream=True,
)
for event in events:
    piece = event.choices[0].delta.content
    # Chunks without text content are skipped.
    if piece:
        print(piece, end="", flush=True)
print()
CLI Interface
Commands
# app.py
# Specification stub: the body is intentionally `pass`; only the signature
# and documented options are defined here.
def start_server(
    host: str = "localhost",
    port: int = 8080,
    background: bool = False,
    debug: bool = False,
    show_prompts: bool = False,
    streaming: bool = False,
    step_through: bool = False,
) -> None:
    """
    Start the API server.
    Args:
    host: Host to bind to (default: localhost)
    port: Port to bind to (default: 8080)
    background: Run in background if True
    debug: Enable debug logging
    show_prompts: Display prompts sent to LLM
    streaming: Enable real-time streaming of LLM responses
    step_through: Enable step-through debugging mode
    """
    pass
# Specification stub: the body is intentionally `pass`.
def stop_server(port: int = 8080) -> None:
    """
    Stop the API server by finding and killing processes on the port.
    Args:
    port: Port number to stop server on (default: 8080)
    """
    pass
# Specification stub: the body is intentionally `pass`, so nothing is
# returned or reported yet.
def check_status() -> None:
    """Check if API server is running."""
    pass
API Server Technical Specification