API Server Technical Specification
Source Code: src/gaia/api/
Components: OpenAI-compatible API Server, Schemas, Endpoints, SSE Streaming
Module: gaia.api
Import: from gaia.api import app, schemas, agent_registry
Overview
The GAIA API Server provides an OpenAI-compatible REST API for exposing GAIA agents to external tools like VSCode, Claude Dev, and custom applications. It implements the OpenAI chat completions interface with streaming support, agent registration, and comprehensive debugging features.
Key Features:
- OpenAI-compatible endpoints (/v1/chat/completions, /v1/models)
- Server-Sent Events (SSE) streaming
- Dynamic agent registry
- Workspace root extraction from GitHub Copilot
- Debug modes (logging, prompts, step-through)
- CORS support
- Health checks
- Token usage tracking
Requirements
Functional Requirements
Core Endpoints
- POST /v1/chat/completions
  - Non-streaming responses
  - SSE streaming responses
  - Message history support
  - Tool calls support (future)
- GET /v1/models
  - List available agents as models
  - Model metadata (tokens, description)
- GET /health
  - Health check endpoint
  - Returns 200 OK
Agent Registry
- Agent Registration
  - Dynamic agent discovery
  - Model ID mapping
  - Agent instantiation
  - Workspace configuration
- Agent Management
  - Singleton instances per agent type
  - Silent mode operation
  - Workspace root injection
Request Processing
- Message Handling
  - OpenAI message format
  - System/user/assistant roles
  - GitHub Copilot workspace extraction
  - Prompt formatting
- Response Generation
  - Agent process_query() integration
  - Token counting
  - Usage statistics
  - Unique response IDs
Streaming Support
- SSE Streaming
  - Chunk-based streaming
  - Delta content format
  - Finish reason reporting
  - Proper SSE formatting (data: prefix); an illustrative wire example follows below
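For reference, a streamed completion is delivered as a sequence of SSE events, each a data: line carrying a chat.completion.chunk JSON object, terminated by data: [DONE]. The field values below are illustrative:

data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1234567890,"model":"gaia-code","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
data: {"id":"chatcmpl-abc123","object":"chat.completion.chunk","created":1234567890,"model":"gaia-code","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]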
Non-Functional Requirements
- Performance
  - Low latency response
  - Efficient streaming
  - Connection pooling
- Reliability
  - Error handling
  - Connection recovery
  - Graceful degradation
- Debugging
  - Request/response logging
  - Prompt display
  - Step-through mode
  - Raw HTTP logging
- Security
  - CORS configuration
  - Input validation
  - Error sanitization
API Specification
File Locations
src/gaia/api/app.py # CLI entry point
src/gaia/api/openai_server.py # FastAPI server
src/gaia/api/schemas.py # Pydantic models
src/gaia/api/agent_registry.py # Agent management
src/gaia/api/sse_handler.py # SSE streaming
Schemas (Pydantic Models)
from typing import Any, Dict, List, Literal, Optional
from pydantic import BaseModel, Field
class ChatMessage(BaseModel):
"""Chat message in OpenAI format."""
role: Literal["system", "user", "assistant", "tool"]
content: Optional[str] = None
tool_calls: Optional[List[Dict[str, Any]]] = None
tool_call_id: Optional[str] = None
class ChatCompletionRequest(BaseModel):
"""POST /v1/chat/completions request."""
model: str = Field(..., description="Model ID (e.g., gaia-code, gaia-jira)")
messages: List[ChatMessage]
stream: bool = Field(default=False, description="Enable SSE streaming")
temperature: Optional[float] = Field(default=0.7, ge=0, le=2)
max_tokens: Optional[int] = Field(default=None, gt=0)
top_p: Optional[float] = Field(default=1.0, ge=0, le=1)
class ChatCompletionResponseMessage(BaseModel):
"""Response message from chat completion."""
role: Literal["assistant"]
content: str
tool_calls: Optional[List[Dict[str, Any]]] = None
class ChatCompletionChoice(BaseModel):
"""A single completion choice."""
index: int
message: ChatCompletionResponseMessage
finish_reason: Literal["stop", "length"]
class UsageInfo(BaseModel):
"""Token usage information."""
prompt_tokens: int
completion_tokens: int
total_tokens: int
class ChatCompletionResponse(BaseModel):
"""POST /v1/chat/completions response (non-streaming)."""
id: str
object: Literal["chat.completion"]
created: int
model: str
choices: List[ChatCompletionChoice]
usage: UsageInfo
class ModelInfo(BaseModel):
"""Model metadata for /v1/models endpoint."""
id: str
object: Literal["model"]
created: int
owned_by: str
description: Optional[str] = None
max_input_tokens: Optional[int] = None
max_output_tokens: Optional[int] = None
class ModelListResponse(BaseModel):
"""GET /v1/models response."""
object: Literal["list"]
data: List[ModelInfo]
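The schemas can be exercised directly to confirm defaults and range constraints; a minimal sketch (assumes Pydantic v2's model_validate; under Pydantic v1 the equivalent is parse_obj):

from pydantic import ValidationError
from gaia.api.schemas import ChatCompletionRequest

req = ChatCompletionRequest.model_validate({
    "model": "gaia-code",
    "messages": [{"role": "user", "content": "Say hello"}],
})
assert req.stream is False      # default applied
assert req.temperature == 0.7   # default applied

try:
    ChatCompletionRequest.model_validate({
        "model": "gaia-code",
        "messages": [],
        "temperature": 5,  # outside the 0-2 range enforced by Field
    })
except ValidationError:
    pass  # constraint violations surface as ValidationError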
API Endpoints
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
app = FastAPI(
title="GAIA OpenAI-Compatible API",
description="OpenAI-compatible API for GAIA agents",
version="1.0.0",
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
"""
Create chat completion (OpenAI-compatible endpoint).
Supports both streaming (SSE) and non-streaming responses.
Args:
request: Chat completion request with model, messages, and options
Returns:
For non-streaming: ChatCompletionResponse
For streaming: StreamingResponse with SSE chunks
Raises:
HTTPException 404: Model not found
HTTPException 400: No user message in request
Example Non-Streaming:
POST /v1/chat/completions
{
"model": "gaia-code",
"messages": [{"role": "user", "content": "Write hello world"}],
"stream": false
}
Response:
{
"id": "chatcmpl-abc123",
"object": "chat.completion",
"created": 1234567890,
"model": "gaia-code",
"choices": [{
"index": 0,
"message": {
"role": "assistant",
"content": "Here's hello world:\\nprint('Hello, World!')"
},
"finish_reason": "stop"
}],
"usage": {
"prompt_tokens": 10,
"completion_tokens": 20,
"total_tokens": 30
}
}
Example Streaming:
POST /v1/chat/completions
{
"model": "gaia-code",
"messages": [{"role": "user", "content": "Write hello world"}],
"stream": true
}
Response (SSE):
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk",...}
data: {"id":"chatcmpl-abc","object":"chat.completion.chunk",...}
data: [DONE]
"""
pass
@app.get("/v1/models")
async def list_models() -> ModelListResponse:
"""
List available models (GAIA agents).
Returns:
ModelListResponse with list of available agents as models
Example:
GET /v1/models
Response:
{
"object": "list",
"data": [
{
"id": "gaia-code",
"object": "model",
"created": 1234567890,
"owned_by": "amd-gaia",
"description": "Autonomous Python coding agent",
"max_input_tokens": 32768,
"max_output_tokens": 8192
},
{
"id": "gaia-jira",
"object": "model",
"created": 1234567890,
"owned_by": "amd-gaia"
}
]
}
"""
pass
@app.get("/health")
async def health_check():
"""
Health check endpoint.
Returns:
{"status": "ok"}
Example:
GET /health
Response:
{"status": "ok"}
"""
return {"status": "ok"}
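The endpoint bodies above are intentionally left as specification stubs. For /v1/models, a minimal sketch on top of the agent registry described in the next section (assuming registry.list_models() returns dicts carrying the ModelInfo fields):

@app.get("/v1/models")
async def list_models() -> ModelListResponse:
    # Wrap registry entries in the OpenAI-style list envelope.
    return ModelListResponse(
        object="list",
        data=[ModelInfo(**m) for m in registry.list_models()],
    )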
Agent Registry
from typing import Any, Dict, List, Optional
from gaia.agents.base import Agent, ApiAgent
class AgentRegistry:
"""
Registry for managing GAIA agents exposed via API.
Features:
- Dynamic agent registration
- Singleton instances
- Model ID mapping
- Workspace configuration
"""
def __init__(self):
"""Initialize agent registry."""
self._agents: Dict[str, Agent] = {}
self._agent_classes: Dict[str, type] = {}
def register_agent(self, agent_class: type, model_id: Optional[str] = None):
"""
Register an agent class.
Args:
agent_class: Agent class to register
model_id: Optional custom model ID (default: from get_model_id())
Example:
>>> from gaia.agents.code import CodeAgent
>>> registry.register_agent(CodeAgent)
>>> registry.register_agent(CodeAgent, model_id="custom-code")
"""
pass
def get_agent(self, model_id: str, workspace_root: Optional[str] = None) -> Agent:
"""
Get or create agent instance.
Args:
model_id: Model/agent identifier
workspace_root: Optional workspace root path
Returns:
Agent instance
Raises:
ValueError: If model_id not found
Example:
>>> agent = registry.get_agent("gaia-code", workspace_root="/path/to/project")
"""
pass
def list_models(self) -> List[Dict[str, Any]]:
"""
List all registered models.
Returns:
List of model info dictionaries
Example:
>>> models = registry.list_models()
>>> [m["id"] for m in models]
["gaia-code", "gaia-jira"]
"""
pass
# Global registry instance
registry = AgentRegistry()
# Register agents
from gaia.agents.code import CodeAgent
from gaia.agents.jira import JiraAgent
registry.register_agent(CodeAgent)
registry.register_agent(JiraAgent)
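A minimal sketch of the registry method bodies, consistent with the docstrings above; the get_model_id() classmethod and the workspace_root/silent constructor keywords are assumptions about the Agent interface, not confirmed API:

import time

# Sketch bodies for AgentRegistry (indented as they would sit inside the class).

    def register_agent(self, agent_class: type, model_id: Optional[str] = None):
        mid = model_id or agent_class.get_model_id()  # assumed classmethod
        self._agent_classes[mid] = agent_class

    def get_agent(self, model_id: str, workspace_root: Optional[str] = None) -> Agent:
        if model_id not in self._agent_classes:
            raise ValueError(f"Model not found: {model_id}")
        if model_id not in self._agents:
            # One singleton instance per agent type, created lazily.
            # Note: the workspace_root from the first call wins for the cached instance.
            self._agents[model_id] = self._agent_classes[model_id](
                workspace_root=workspace_root,  # assumed constructor kwargs
                silent=True,
            )
        return self._agents[model_id]

    def list_models(self) -> List[Dict[str, Any]]:
        return [
            {
                "id": mid,
                "object": "model",
                "created": int(time.time()),
                "owned_by": "amd-gaia",
            }
            for mid in self._agent_classes
        ]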
SSE Streaming
from typing import AsyncGenerator
import json
import uuid
import time
async def stream_chat_completion(
agent: Agent,
user_message: str,
model_id: str,
) -> AsyncGenerator[str, None]:
"""
Stream chat completion using SSE format.
Args:
agent: Agent instance to use
user_message: User's message
model_id: Model identifier
Yields:
SSE-formatted chunks (data: {...})
Example:
async for chunk in stream_chat_completion(agent, "Hello", "gaia-code"):
print(chunk)
# Output:
# data: {"id":"chatcmpl-123","object":"chat.completion.chunk",...}
# data: {"id":"chatcmpl-123","object":"chat.completion.chunk",...}
# data: [DONE]
"""
chunk_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
created = int(time.time())
try:
        # Stream the response from the agent. stream_query() is iterated
        # synchronously, so a blocking generator will stall the event loop
        # between chunks.
        for delta_content in agent.stream_query(user_message):
chunk = {
"id": chunk_id,
"object": "chat.completion.chunk",
"created": created,
"model": model_id,
"choices": [
{
"index": 0,
"delta": {"content": delta_content},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(chunk)}\n\n"
# Final chunk with finish_reason
final_chunk = {
"id": chunk_id,
"object": "chat.completion.chunk",
"created": created,
"model": model_id,
"choices": [
{
"index": 0,
"delta": {},
"finish_reason": "stop",
}
],
}
yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
error_chunk = {
"error": {
"message": str(e),
"type": "server_error",
}
}
yield f"data: {json.dumps(error_chunk)}\n\n"
Implementation Details
Workspace Root Extraction
def extract_workspace_root(messages):
"""
Extract workspace root path from GitHub Copilot messages.
GitHub Copilot includes workspace info in messages like:
<workspace_info>
I am working in a workspace with the following folders:
- /Users/username/path/to/workspace
</workspace_info>
Args:
messages: List of ChatMessage objects
Returns:
str: Workspace root path, or None if not found
"""
import re
for msg in messages:
if msg.role == "user" and msg.content:
workspace_match = re.search(
r"<workspace_info>.*?following folders:\s*\n\s*-\s*([^\s\n]+)",
msg.content,
re.DOTALL,
)
if workspace_match:
return workspace_match.group(1).strip()
return None
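A quick check of the extraction against a Copilot-style message (the content below is illustrative):

msg = ChatMessage(
    role="user",
    content=(
        "<workspace_info>\n"
        "I am working in a workspace with the following folders:\n"
        "- /Users/username/projects/demo\n"
        "</workspace_info>\n"
        "Please review my code."
    ),
)
assert extract_workspace_root([msg]) == "/Users/username/projects/demo"
assert extract_workspace_root([ChatMessage(role="user", content="hi")]) is None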
Request Processing Flow
async def create_chat_completion(request: ChatCompletionRequest):
# 1. Extract workspace root
workspace_root = extract_workspace_root(request.messages)
# 2. Get agent from registry
try:
agent = registry.get_agent(request.model, workspace_root)
except ValueError:
raise HTTPException(404, f"Model not found: {request.model}")
# 3. Extract user message
user_messages = [m for m in request.messages if m.role == "user"]
if not user_messages:
raise HTTPException(400, "No user message in request")
user_message = user_messages[-1].content
# 4. Generate response
if request.stream:
# Streaming response
return StreamingResponse(
stream_chat_completion(agent, user_message, request.model),
media_type="text/event-stream",
)
else:
# Non-streaming response
response_text = agent.process_query(user_message)
        # Token counting: prefer the agent's estimator when available,
        # otherwise fall back to a rough ~4-characters-per-token heuristic.
if isinstance(agent, ApiAgent):
prompt_tokens = agent.estimate_tokens(user_message)
completion_tokens = agent.estimate_tokens(response_text)
else:
prompt_tokens = len(user_message) // 4
completion_tokens = len(response_text) // 4
return ChatCompletionResponse(
id=f"chatcmpl-{uuid.uuid4().hex[:8]}",
object="chat.completion",
created=int(time.time()),
model=request.model,
choices=[
ChatCompletionChoice(
index=0,
message=ChatCompletionResponseMessage(
role="assistant",
content=response_text,
),
finish_reason="stop",
)
],
usage=UsageInfo(
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=prompt_tokens + completion_tokens,
),
)
Debug Middleware
import logging
import os

from fastapi import Request

logger = logging.getLogger(__name__)  # stand-in; GAIA's configured logger may differ

@app.middleware("http")
async def log_raw_requests(request: Request, call_next):
"""
Middleware to log raw HTTP requests when debug mode is enabled.
"""
if os.environ.get("GAIA_API_DEBUG") == "1":
logger.debug("=" * 80)
logger.debug("RAW HTTP REQUEST")
logger.debug("=" * 80)
logger.debug(f"Path: {request.url.path}")
logger.debug(f"Method: {request.method}")
logger.debug("Headers:")
for name, value in request.headers.items():
logger.debug(f" {name}: {value}")
# Don't read body for streaming endpoints
if request.url.path != "/v1/chat/completions":
body_bytes = await request.body()
logger.debug(f"Body: {body_bytes.decode('utf-8')}")
logger.debug("=" * 80)
response = await call_next(request)
return response
Testing Requirements
Unit Tests
File: tests/api/test_api_server.py
import json

import pytest
from fastapi.testclient import TestClient
from gaia.api.openai_server import app
client = TestClient(app)
def test_health_check():
"""Test health endpoint."""
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}
def test_list_models():
"""Test models listing."""
response = client.get("/v1/models")
assert response.status_code == 200
data = response.json()
assert data["object"] == "list"
assert isinstance(data["data"], list)
assert len(data["data"]) > 0
def test_chat_completion_non_streaming():
"""Test non-streaming chat completion."""
response = client.post(
"/v1/chat/completions",
json={
"model": "gaia-code",
"messages": [{"role": "user", "content": "Say hello"}],
"stream": False
}
)
assert response.status_code == 200
data = response.json()
assert data["object"] == "chat.completion"
assert len(data["choices"]) > 0
assert data["choices"][0]["message"]["role"] == "assistant"
assert isinstance(data["choices"][0]["message"]["content"], str)
assert "usage" in data
def test_chat_completion_streaming():
"""Test streaming chat completion."""
with client.stream(
"POST",
"/v1/chat/completions",
json={
"model": "gaia-code",
"messages": [{"role": "user", "content": "Count to 3"}],
"stream": True
}
) as response:
assert response.status_code == 200
        assert response.headers["content-type"].startswith("text/event-stream")
        chunks = []
        for line in response.iter_lines():
            if line.startswith("data: "):
                data = line[6:]  # Remove "data: " prefix
                if data != "[DONE]":
                    chunks.append(json.loads(data))
assert len(chunks) > 0
assert chunks[0]["object"] == "chat.completion.chunk"
def test_chat_completion_invalid_model():
"""Test chat completion with invalid model."""
response = client.post(
"/v1/chat/completions",
json={
"model": "nonexistent-model",
"messages": [{"role": "user", "content": "Hello"}],
}
)
assert response.status_code == 404
def test_chat_completion_no_user_message():
"""Test chat completion without user message."""
response = client.post(
"/v1/chat/completions",
json={
"model": "gaia-code",
"messages": [{"role": "system", "content": "You are helpful"}],
}
)
assert response.status_code == 400
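A test for the workspace extraction helper rounds out the suite (the import locations below are assumptions; adjust to where the helper actually lives):

from gaia.api.openai_server import extract_workspace_root  # assumed location
from gaia.api.schemas import ChatMessage

def test_workspace_root_extraction():
    """Workspace root is pulled from the first message that carries it."""
    messages = [
        ChatMessage(role="system", content="You are helpful"),
        ChatMessage(
            role="user",
            content=(
                "<workspace_info>\n"
                "I am working in a workspace with the following folders:\n"
                "- /home/user/projects/demo\n"
                "</workspace_info>"
            ),
        ),
        ChatMessage(role="user", content="Fix the failing test"),
    ]
    assert extract_workspace_root(messages) == "/home/user/projects/demo"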
Usage Examples
Example 1: Start API Server
# Foreground
gaia api start
# Background
gaia api start --background
# Custom host/port
gaia api start --host 0.0.0.0 --port 8000
# With debug
gaia api start --debug --show-prompts
# Stop server
gaia api stop
Example 2: Non-Streaming Request (curl)
curl -X POST http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "gaia-code",
"messages": [
{"role": "user", "content": "Write a hello world program"}
],
"stream": false
}'
Example 3: Streaming Request (curl)
curl -X POST http://localhost:8080/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "gaia-code",
"messages": [
{"role": "user", "content": "Write a hello world program"}
],
"stream": true
}'
Example 4: Python Client
import json

import requests
# List models
response = requests.get("http://localhost:8080/v1/models")
models = response.json()["data"]
for model in models:
print(f"{model['id']}: {model.get('description', 'No description')}")
# Non-streaming chat
response = requests.post(
"http://localhost:8080/v1/chat/completions",
json={
"model": "gaia-code",
"messages": [
{"role": "user", "content": "Write hello world in Python"}
],
"stream": False
}
)
result = response.json()
print(result["choices"][0]["message"]["content"])
# Streaming chat
response = requests.post(
"http://localhost:8080/v1/chat/completions",
json={
"model": "gaia-code",
"messages": [
{"role": "user", "content": "Count to 5"}
],
"stream": True
},
stream=True
)
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith("data: "):
data = line[6:]
if data != "[DONE]":
                chunk = json.loads(data)
content = chunk["choices"][0]["delta"].get("content", "")
if content:
print(content, end="", flush=True)
print()
Example 5: OpenAI Python SDK
from openai import OpenAI
# Point to GAIA API server
client = OpenAI(
base_url="http://localhost:8080/v1",
api_key="none" # Not required for GAIA
)
# Non-streaming
response = client.chat.completions.create(
model="gaia-code",
messages=[
{"role": "user", "content": "Write hello world"}
]
)
print(response.choices[0].message.content)
# Streaming
stream = client.chat.completions.create(
model="gaia-code",
messages=[
{"role": "user", "content": "Count to 5"}
],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="", flush=True)
print()
CLI Interface
Commands
# app.py
def start_server(
host: str = "localhost",
port: int = 8080,
background: bool = False,
debug: bool = False,
show_prompts: bool = False,
streaming: bool = False,
step_through: bool = False,
):
"""
Start the API server.
Args:
host: Host to bind to (default: localhost)
port: Port to bind to (default: 8080)
background: Run in background if True
debug: Enable debug logging
show_prompts: Display prompts sent to LLM
streaming: Enable real-time streaming of LLM responses
step_through: Enable step-through debugging mode
"""
pass
def stop_server(port: int = 8080):
"""
Stop the API server by finding and killing processes on the port.
Args:
port: Port number to stop server on (default: 8080)
"""
pass
def check_status():
"""Check if API server is running."""
pass
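A minimal foreground implementation of start_server can delegate to uvicorn; a sketch covering only host/port (background mode, the debug flags, and stop_server's process lookup are left to the full implementation):

import uvicorn

from gaia.api.openai_server import app

def start_server(host: str = "localhost", port: int = 8080, **_flags):
    # Foreground only; the debug/show_prompts/streaming/step_through flags
    # would be translated into server and agent configuration before this call.
    uvicorn.run(app, host=host, port=port)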
Acceptance Criteria
- OpenAI-compatible API implemented in src/gaia/api/openai_server.py
- Schemas defined in src/gaia/api/schemas.py
- Agent registry implemented in src/gaia/api/agent_registry.py
- CLI commands in src/gaia/api/app.py
- Non-streaming responses work
- SSE streaming works
- Models endpoint works
- Health check works
- Workspace root extraction works
- All unit tests pass (10+ tests)
- Can start/stop server via CLI
- Compatible with OpenAI SDK
- Example code works