Skip to main content

Complete Examples

20.1 Database-Backed Agent

from gaia.agents.base.agent import Agent
from gaia.agents.base.tools import tool
from gaia.agents.base.console import AgentConsole
import sqlite3
from pathlib import Path

class CustomerAgent(Agent):
    """Agent for customer relationship management.

    Customer records are kept in a SQLite database whose ``customers`` table
    is created on construction if it does not exist yet. Three tools are
    registered: ``search_customer``, ``create_customer`` and
    ``update_customer``.
    """

    def __init__(self, db_path: str = "customers.db", **kwargs):
        """Create the agent and open its database.

        Args:
            db_path: Path of the SQLite database file.
            **kwargs: Forwarded unchanged to ``Agent.__init__``.
        """
        # Assigned before the base initializer runs so that ``conn`` always
        # exists on the instance, even if base initialization raises.
        self.db_path = db_path
        self.conn = None
        super().__init__(**kwargs)
        self._init_database()

    def _init_database(self):
        """Open the SQLite connection and create the schema if needed."""
        self.conn = sqlite3.connect(self.db_path)
        # sqlite3.Row lets the tools turn each result row into a dict.
        self.conn.row_factory = sqlite3.Row

        # Create schema
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS customers (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT,
                phone TEXT,
                notes TEXT
            )
        """)
        self.conn.commit()

    def _get_system_prompt(self) -> str:
        """Return the system prompt for this agent."""
        return """You are a customer service assistant.
        Use the tools to search, create, and update customer records."""

    def _create_console(self):
        """Return the console used for agent output."""
        return AgentConsole()

    def _register_tools(self):
        """Define the customer tools; each one closes over ``self.conn``."""

        @tool
        def search_customer(name: str) -> dict:
            """Search for customers by name."""
            # Substring match; the value is bound as a parameter, never
            # interpolated into the SQL text.
            cursor = self.conn.execute(
                "SELECT * FROM customers WHERE name LIKE ?",
                (f"%{name}%",)
            )
            customers = [dict(row) for row in cursor.fetchall()]
            return {
                "customers": customers,
                "count": len(customers)
            }

        @tool
        def create_customer(name: str, email: str, phone: str = "") -> dict:
            """Create a new customer record."""
            # The connection context manager commits on success and rolls
            # back if the INSERT raises, so no half-open transaction is left.
            with self.conn:
                cursor = self.conn.execute(
                    "INSERT INTO customers (name, email, phone) VALUES (?, ?, ?)",
                    (name, email, phone)
                )
            return {
                "status": "created",
                "customer_id": cursor.lastrowid
            }

        @tool
        def update_customer(customer_id: int, notes: str) -> dict:
            """Set the notes on a customer record, replacing any existing notes."""
            with self.conn:
                cursor = self.conn.execute(
                    "UPDATE customers SET notes = ? WHERE id = ?",
                    (notes, customer_id)
                )
            # An UPDATE that matches no row succeeds silently; report it
            # instead of claiming the record was updated.
            if cursor.rowcount == 0:
                return {
                    "status": "error",
                    "error": f"No customer with id {customer_id}",
                    "customer_id": customer_id
                }
            return {"status": "updated", "customer_id": customer_id}

    def __del__(self):
        """Close database connection."""
        # getattr guards against an instance whose __init__ never got as far
        # as assigning ``conn``.
        conn = getattr(self, "conn", None)
        if conn is not None:
            conn.close()
            self.conn = None

# Usage: back the agent with crm.db, run one natural-language request through
# process_query, and print the value it returns.
agent = CustomerAgent(db_path="crm.db")
result = agent.process_query("Find customer John Smith and add a note that he called today")
print(result)

20.2 Document Q&A Agent

from gaia.agents.base.agent import Agent
from gaia.agents.base.tools import tool
from gaia.agents.base.console import AgentConsole
from gaia.rag.sdk import RAGSDK, RAGConfig

class DocumentQAAgent(Agent):
    """Question-answering agent backed by a RAG index of PDF documents."""

    def __init__(self, docs_directory: str = "./docs", **kwargs):
        """Set up the agent and index every PDF found under ``docs_directory``.

        Args:
            docs_directory: Directory searched recursively for ``*.pdf`` files.
            **kwargs: Passed through to ``Agent.__init__``.
        """
        super().__init__(**kwargs)

        # RAG backend configured with chunk_size=500 and max_chunks=5.
        self.rag = RAGSDK(RAGConfig(chunk_size=500, max_chunks=5))

        # Walk the directory tree and hand each PDF path to the indexer.
        import pathlib
        for pdf_path in pathlib.Path(docs_directory).glob("**/*.pdf"):
            self.rag.index_document(str(pdf_path))

    def _get_system_prompt(self) -> str:
        """Return the system prompt for this agent."""
        system_prompt = """You are a document research assistant.
        Use the search_docs tool to find relevant information."""
        return system_prompt

    def _create_console(self):
        """Return the console used for agent output."""
        console = AgentConsole()
        return console

    def _register_tools(self):
        """Define the document tools; both delegate to ``self.rag``."""

        @tool
        def search_docs(question: str) -> dict:
            """Search indexed documents for information."""
            rag_result = self.rag.query(question)
            answer = rag_result.text
            sources = rag_result.source_files
            scores = rag_result.chunk_scores
            return {"answer": answer, "sources": sources, "relevance_scores": scores}

        @tool
        def list_documents() -> dict:
            """List all indexed documents."""
            indexed = self.rag.get_indexed_files()
            return {"documents": indexed, "count": len(indexed)}

# Usage: constructing the agent indexes the PDFs found under ./manuals; the
# question is then answered through process_query. The answer is kept in
# `result` and is not printed by this snippet.
agent = DocumentQAAgent(docs_directory="./manuals")
result = agent.process_query("What are the installation requirements?")

20.3 Vision + Database Agent

from gaia.agents.base.agent import Agent
from gaia.agents.base.tools import tool
from gaia.llm.vlm_client import VLMClient
from pathlib import Path
import sqlite3
import json

class ReceiptProcessor(Agent):
    """Agent that extracts data from receipt images and stores it in a database.

    A vision-language model turns a receipt image into JSON, which is saved
    in the ``receipts`` table of a SQLite database. Two tools are registered:
    ``process_receipt`` and ``get_receipts``.
    """

    # Keys the VLM's JSON answer must contain before a row is written.
    _REQUIRED_FIELDS = ("merchant", "date", "total", "items")

    def __init__(self, db_path: str = "receipts.db", **kwargs):
        """Create the agent, its VLM client and its database connection.

        Args:
            db_path: Path of the SQLite database file.
            **kwargs: Forwarded unchanged to ``Agent.__init__``.
        """
        super().__init__(**kwargs)
        self.vlm = VLMClient()
        self.conn = sqlite3.connect(db_path)
        # get_receipts converts every row with dict(row); that only works
        # with name-addressable sqlite3.Row objects, not the default tuples.
        self.conn.row_factory = sqlite3.Row
        self._init_database()

    def _init_database(self):
        """Create the ``receipts`` table if it does not exist yet."""
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS receipts (
                id INTEGER PRIMARY KEY,
                merchant TEXT,
                date TEXT,
                total REAL,
                items TEXT,
                image_path TEXT,
                extracted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        self.conn.commit()

    def _get_system_prompt(self) -> str:
        """Return the system prompt for this agent."""
        return """You are a receipt processing assistant.
        Extract data from receipt images and store in the database."""

    def _create_console(self):
        """Return the console used for agent output."""
        from gaia.agents.base.console import AgentConsole
        return AgentConsole()

    @staticmethod
    def _extract_json(text: str) -> str:
        """Return the JSON payload of *text*, unwrapping a Markdown code fence.

        Handles both ```` ```json ```` fences and bare ```` ``` ```` fences;
        text without a fence is returned stripped of surrounding whitespace.
        """
        if "```json" in text:
            text = text.split("```json")[1].split("```")[0]
        elif "```" in text:
            # Content between the first fence and the next one (or the end).
            text = text.split("```")[1]
        return text.strip()

    def _register_tools(self):
        """Define the receipt tools; each one closes over ``self``."""

        @tool
        def process_receipt(image_path: str) -> dict:
            """Extract data from receipt image and save to database."""
            path = Path(image_path)
            # is_file() also rejects directories, which read_bytes() cannot read.
            if not path.is_file():
                return {
                    "status": "error",
                    "error": f"Image not found: {image_path}"
                }

            # Extract with VLM
            image_bytes = path.read_bytes()
            prompt = """Extract receipt information as JSON:
            {
                "merchant": "",
                "date": "YYYY-MM-DD",
                "total": 0.00,
                "items": [{"name": "", "price": 0.00}]
            }"""

            # NOTE(review): prompt is passed positionally - confirm it is the
            # second parameter of VLMClient.extract_from_image.
            extracted = self.vlm.extract_from_image(image_bytes, prompt)

            # Parse JSON
            try:
                data = json.loads(self._extract_json(extracted))
            except json.JSONDecodeError as e:
                return {
                    "status": "error",
                    "error": f"Failed to parse JSON: {e}",
                    "raw_text": extracted[:200]
                }

            # The model may answer with valid JSON of the wrong shape; report
            # that as a tool error instead of raising TypeError/KeyError.
            if not isinstance(data, dict):
                return {
                    "status": "error",
                    "error": "Extracted JSON is not an object",
                    "raw_text": extracted[:200]
                }
            missing = [
                field for field in self._REQUIRED_FIELDS if field not in data
            ]
            if missing:
                return {
                    "status": "error",
                    "error": f"Missing fields in extracted JSON: {', '.join(missing)}",
                    "raw_text": extracted[:200]
                }

            # Save to database; the context manager commits on success and
            # rolls back if the INSERT raises.
            with self.conn:
                cursor = self.conn.execute(
                    """INSERT INTO receipts (merchant, date, total, items, image_path)
                       VALUES (?, ?, ?, ?, ?)""",
                    (data["merchant"], data["date"], data["total"],
                     json.dumps(data["items"]), str(path))
                )

            return {
                "status": "processed",
                "receipt_id": cursor.lastrowid,
                "merchant": data["merchant"],
                "total": data["total"]
            }

        @tool
        def get_receipts(merchant: str = None) -> dict:
            """Get receipts, optionally filtered by merchant."""
            query = "SELECT * FROM receipts"
            params = []

            if merchant:
                query += " WHERE merchant LIKE ?"
                params.append(f"%{merchant}%")

            # date is stored as TEXT; the extraction prompt asks for
            # YYYY-MM-DD, for which text order equals chronological order.
            query += " ORDER BY date DESC LIMIT 10"

            cursor = self.conn.execute(query, params)
            receipts = [dict(row) for row in cursor.fetchall()]

            return {
                "receipts": receipts,
                "count": len(receipts)
            }

    def __del__(self):
        """Close database connection."""
        # getattr guards against an instance whose __init__ raised before
        # ``conn`` was assigned.
        conn = getattr(self, "conn", None)
        if conn is not None:
            conn.close()
            self.conn = None

# Usage: with no arguments the agent stores receipts in the default
# receipts.db. The value returned by process_query is kept in `result` and is
# not printed by this snippet.
agent = ReceiptProcessor()
result = agent.process_query("Process receipt.jpg and show me all Walmart receipts")