Skip to main content
Component: DatabaseMixin Module: gaia.core.database_mixin (planned location) Import: from gaia import DatabaseMixin (after v1.0.0 restructure)

Overview

DatabaseMixin provides database connectivity for GAIA agents using SQLAlchemy Core (not ORM). It enables agents to store and query data across multiple database backends (SQLite, PostgreSQL, MySQL) with a simple, consistent API. Why not use ORM?
  • Simpler for AI agents (tools return dicts, not objects)
  • More explicit SQL (easier for LLMs to reason about)
  • Better performance for simple queries
  • Lower learning curve

Requirements

Functional Requirements

  1. Multi-Database Support
    • SQLite for development and small deployments
    • PostgreSQL for production
    • MySQL for existing infrastructure
  2. Simple Query Interface
    • execute_query() - SELECT queries returning list of dicts
    • execute_insert() - INSERT with optional RETURNING clause
    • execute_update() - UPDATE with parameterized WHERE
    • execute_delete() - DELETE with parameterized WHERE
  3. Transaction Management
    • Context manager for automatic commit/rollback
    • Explicit transaction control
    • Nested transaction support
  4. Connection Pooling
    • Configurable pool size
    • Automatic connection recycling
    • Thread-safe operations
  5. Schema Management
    • Initialize from SQL file
    • Table existence checking
    • Schema reflection
  6. Security
    • Parameterized queries (SQL injection prevention)
    • No raw SQL string interpolation
    • Named parameters only

Non-Functional Requirements

  1. Performance
    • Connection pooling for concurrent requests
    • Lazy initialization option
    • Query result streaming for large datasets
  2. Error Handling
    • Clear error messages
    • Connection retry logic
    • Transaction rollback on errors
  3. Testability
    • In-memory SQLite for tests
    • Temporary database creation
    • Easy mocking

API Specification

File Location

src/gaia/core/database_mixin.py

Public Interface

from typing import Any, Dict, List, Optional, Union
from contextlib import contextmanager
from sqlalchemy import create_engine, text, MetaData
from sqlalchemy.engine import Engine, Connection

class DatabaseMixin:
    """
    Mixin providing database connectivity for GAIA agents.

    Uses SQLAlchemy Core for multi-database support with connection pooling.

    Usage:
        class MyAgent(Agent, DatabaseMixin):
            def __init__(self, db_url: str, **kwargs):
                super().__init__(**kwargs)
                self.initialize_database(db_url)

    Supported databases:
        - SQLite: "sqlite:///path/to/db.sqlite"
        - PostgreSQL: "postgresql://user:pass@host/db"
        - MySQL: "mysql://user:pass@host/db"
    """

    _engine: Optional[Engine] = None
    _metadata: Optional[MetaData] = None

    def initialize_database(
        self,
        db_url: str,
        schema_file: Optional[str] = None,
        pool_size: int = 5,
        max_overflow: int = 10,
        pool_timeout: int = 30,
        echo: bool = False,
    ) -> None:
        """
        Initialize database connection.

        Args:
            db_url: SQLAlchemy database URL
            schema_file: Optional SQL file to execute on initialization
            pool_size: Number of connections in pool (default: 5)
            max_overflow: Max connections beyond pool (default: 10)
            pool_timeout: Timeout in seconds (default: 30)
            echo: Log all SQL statements (default: False)

        Raises:
            ValueError: If db_url is invalid
            FileNotFoundError: If schema_file doesn't exist
            DatabaseError: If initialization fails
        """
        pass

    def execute_query(
        self,
        query: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        """
        Execute SELECT query and return results as list of dicts.

        Args:
            query: SQL query with :param_name placeholders
            params: Dictionary of parameter values

        Returns:
            List of row dictionaries

        Example:
            results = self.execute_query(
                "SELECT * FROM users WHERE name = :name",
                {"name": "Alice"}
            )

        Raises:
            DatabaseError: If query fails
        """
        pass

    def execute_insert(
        self,
        table: str,
        data: Dict[str, Any],
        returning: Optional[str] = None,
    ) -> Optional[Any]:
        """
        Insert a row and optionally return a column value.

        Args:
            table: Table name
            data: Column-value dictionary
            returning: Column to return (e.g., "id")

        Returns:
            Value of returning column if specified, else None

        Example:
            user_id = self.execute_insert(
                "users",
                {"name": "Bob", "email": "[email protected]"},
                returning="id"
            )

        Raises:
            DatabaseError: If insert fails
        """
        pass

    def execute_update(
        self,
        table: str,
        data: Dict[str, Any],
        where: str,
        where_params: Dict[str, Any],
    ) -> int:
        """
        Update rows matching condition.

        Args:
            table: Table name
            data: Column-value dictionary to update
            where: WHERE clause with :param placeholders
            where_params: Parameters for WHERE clause

        Returns:
            Number of rows affected

        Example:
            count = self.execute_update(
                "users",
                {"email": "[email protected]"},
                "id = :user_id",
                {"user_id": 42}
            )

        Raises:
            DatabaseError: If update fails
        """
        pass

    def execute_delete(
        self,
        table: str,
        where: str,
        where_params: Dict[str, Any],
    ) -> int:
        """
        Delete rows matching condition.

        Args:
            table: Table name
            where: WHERE clause with :param placeholders
            where_params: Parameters for WHERE clause

        Returns:
            Number of rows deleted

        Example:
            count = self.execute_delete(
                "users",
                "inactive = :inactive AND last_login < :date",
                {"inactive": True, "date": "2020-01-01"}
            )

        Raises:
            DatabaseError: If delete fails
        """
        pass

    @contextmanager
    def transaction(self) -> Connection:
        """
        Execute operations within a transaction.

        Auto-commits on success, rolls back on exception.

        Usage:
            with self.transaction() as conn:
                conn.execute(text("INSERT INTO users ..."))
                conn.execute(text("UPDATE accounts ..."))

        Yields:
            Connection: SQLAlchemy connection

        Raises:
            DatabaseError: If transaction fails
        """
        pass

    @contextmanager
    def get_connection(self) -> Connection:
        """
        Get a database connection with automatic cleanup.

        Usage:
            with self.get_connection() as conn:
                result = conn.execute(text("SELECT * FROM users"))

        Yields:
            Connection: SQLAlchemy connection

        Raises:
            RuntimeError: If database not initialized
        """
        pass

    def table_exists(self, table_name: str) -> bool:
        """
        Check if a table exists in the database.

        Args:
            table_name: Name of the table

        Returns:
            True if table exists, False otherwise
        """
        pass

    def close_database(self) -> None:
        """
        Close database connections and dispose of engine.

        Call this in agent cleanup/shutdown.
        """
        pass

Implementation Details

Connection Pooling

SQLite-specific handling:
if db_url.startswith("sqlite"):
    self._engine = create_engine(
        db_url,
        echo=echo,
        connect_args={"check_same_thread": False},  # Allow multi-thread
    )
Other databases:
else:
    self._engine = create_engine(
        db_url,
        echo=echo,
        poolclass=QueuePool,
        pool_size=pool_size,
        max_overflow=max_overflow,
        pool_timeout=pool_timeout,
    )

Schema Initialization

def initialize_database(self, db_url: str, schema_file: Optional[str] = None, ...):
    # Create engine
    self._engine = create_engine(...)

    # Load and execute schema if provided
    if schema_file:
        schema_path = Path(schema_file)
        if not schema_path.exists():
            raise FileNotFoundError(f"Schema file not found: {schema_file}")

        schema_sql = schema_path.read_text()
        with self.transaction() as conn:
            # Execute schema (may contain multiple statements)
            for statement in schema_sql.split(';'):
                if statement.strip():
                    conn.execute(text(statement))

    # Reflect schema for table_exists()
    self._metadata = MetaData()
    self._metadata.reflect(bind=self._engine)

Query Execution

def execute_query(self, query: str, params: Optional[Dict] = None) -> List[Dict]:
    if self._engine is None:
        raise RuntimeError("Database not initialized. Call initialize_database() first.")

    with self.get_connection() as conn:
        result = conn.execute(text(query), params or {})
        columns = result.keys()
        return [dict(zip(columns, row)) for row in result.fetchall()]

INSERT with RETURNING (Database-specific)

def execute_insert(self, table: str, data: Dict, returning: Optional[str] = None):
    columns = ", ".join(data.keys())
    placeholders = ", ".join(f":{k}" for k in data.keys())
    query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

    if returning:
        # SQLite uses lastrowid, PostgreSQL uses RETURNING
        if self._engine.dialect.name == "sqlite":
            with self.transaction() as conn:
                result = conn.execute(text(query), data)
                return result.lastrowid
        else:
            query += f" RETURNING {returning}"
            with self.transaction() as conn:
                result = conn.execute(text(query), data)
                row = result.fetchone()
                return row[0] if row else None
    else:
        with self.transaction() as conn:
            conn.execute(text(query), data)
        return None

UPDATE with Parameter Safety

def execute_update(self, table: str, data: Dict, where: str, where_params: Dict):
    # Prefix data params to avoid collision with where_params
    set_clause = ", ".join(f"{k} = :set_{k}" for k in data.keys())
    query = f"UPDATE {table} SET {set_clause} WHERE {where}"

    params = {f"set_{k}": v for k, v in data.items()}
    params.update(where_params)

    with self.transaction() as conn:
        result = conn.execute(text(query), params)
        return result.rowcount

Testing Requirements

Unit Tests

File: tests/sdk/test_database_mixin.py
import pytest
from gaia import Agent, DatabaseMixin
import tempfile
from pathlib import Path

@pytest.fixture
def temp_db():
    """Create temporary SQLite database."""
    with tempfile.NamedTemporaryFile(suffix='.db', delete=False) as f:
        db_path = f.name
    yield f"sqlite:///{db_path}"
    Path(db_path).unlink(missing_ok=True)

def test_database_mixin_can_be_imported():
    """Verify DatabaseMixin can be imported."""
    from gaia import DatabaseMixin
    assert DatabaseMixin is not None

def test_initialize_database(temp_db):
    """Test database initialization."""
    class TestAgent(Agent, DatabaseMixin):
        def _get_system_prompt(self): return "Test"
        def _create_console(self):
            from gaia import SilentConsole
            return SilentConsole()
        def _register_tools(self): pass

    agent = TestAgent(silent_mode=True)
    agent.initialize_database(temp_db)

    assert agent._engine is not None
    assert agent._metadata is not None

def test_initialize_with_schema(temp_db, tmp_path):
    """Test database initialization with schema file."""
    # Create schema file
    schema_file = tmp_path / "schema.sql"
    schema_file.write_text("""
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL
        );
    """)

    class TestAgent(Agent, DatabaseMixin):
        def _get_system_prompt(self): return "Test"
        def _create_console(self):
            from gaia import SilentConsole
            return SilentConsole()
        def _register_tools(self): pass

    agent = TestAgent(silent_mode=True)
    agent.initialize_database(temp_db, schema_file=str(schema_file))

    # Verify table was created
    assert agent.table_exists("users")

def test_execute_query(temp_db, tmp_path):
    """Test query execution."""
    schema = tmp_path / "schema.sql"
    schema.write_text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);")

    class TestAgent(Agent, DatabaseMixin):
        def _get_system_prompt(self): return "Test"
        def _create_console(self):
            from gaia import SilentConsole
            return SilentConsole()
        def _register_tools(self): pass

    agent = TestAgent(silent_mode=True)
    agent.initialize_database(temp_db, schema_file=str(schema))

    # Insert test data
    agent.execute_insert("users", {"id": 1, "name": "Alice"})

    # Query
    results = agent.execute_query("SELECT * FROM users WHERE name = :name", {"name": "Alice"})

    assert len(results) == 1
    assert results[0]["name"] == "Alice"

def test_execute_insert_with_returning(temp_db, tmp_path):
    """Test insert with RETURNING clause."""
    schema = tmp_path / "schema.sql"
    schema.write_text("CREATE TABLE users (id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT);")

    class TestAgent(Agent, DatabaseMixin):
        def _get_system_prompt(self): return "Test"
        def _create_console(self):
            from gaia import SilentConsole
            return SilentConsole()
        def _register_tools(self): pass

    agent = TestAgent(silent_mode=True)
    agent.initialize_database(temp_db, schema_file=str(schema))

    # Insert with returning
    user_id = agent.execute_insert(
        "users",
        {"name": "Bob"},
        returning="id"
    )

    assert user_id is not None
    assert isinstance(user_id, int)

def test_execute_update(temp_db, tmp_path):
    """Test update operation."""
    schema = tmp_path / "schema.sql"
    schema.write_text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT, email TEXT);")

    class TestAgent(Agent, DatabaseMixin):
        def _get_system_prompt(self): return "Test"
        def _create_console(self):
            from gaia import SilentConsole
            return SilentConsole()
        def _register_tools(self): pass

    agent = TestAgent(silent_mode=True)
    agent.initialize_database(temp_db, schema_file=str(schema))

    # Insert
    agent.execute_insert("users", {"id": 1, "name": "Alice", "email": "[email protected]"})

    # Update
    rows_affected = agent.execute_update(
        "users",
        {"email": "[email protected]"},
        "id = :user_id",
        {"user_id": 1}
    )

    assert rows_affected == 1

    # Verify
    results = agent.execute_query("SELECT email FROM users WHERE id = 1")
    assert results[0]["email"] == "[email protected]"

def test_transaction_commit(temp_db, tmp_path):
    """Test transaction commits on success."""
    schema = tmp_path / "schema.sql"
    schema.write_text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);")

    class TestAgent(Agent, DatabaseMixin):
        def _get_system_prompt(self): return "Test"
        def _create_console(self):
            from gaia import SilentConsole
            return SilentConsole()
        def _register_tools(self): pass

    agent = TestAgent(silent_mode=True)
    agent.initialize_database(temp_db, schema_file=str(schema))

    # Transaction that succeeds
    with agent.transaction() as conn:
        conn.execute(text("INSERT INTO users (id, name) VALUES (:id, :name)"), {"id": 1, "name": "Alice"})

    # Verify committed
    results = agent.execute_query("SELECT * FROM users")
    assert len(results) == 1

def test_transaction_rollback(temp_db, tmp_path):
    """Test transaction rolls back on exception."""
    schema = tmp_path / "schema.sql"
    schema.write_text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);")

    class TestAgent(Agent, DatabaseMixin):
        def _get_system_prompt(self): return "Test"
        def _create_console(self):
            from gaia import SilentConsole
            return SilentConsole()
        def _register_tools(self): pass

    agent = TestAgent(silent_mode=True)
    agent.initialize_database(temp_db, schema_file=str(schema))

    # Transaction that fails
    try:
        with agent.transaction() as conn:
            conn.execute(text("INSERT INTO users (id, name) VALUES (:id, :name)"), {"id": 1, "name": "Alice"})
            raise Exception("Simulated error")
    except:
        pass

    # Verify rolled back
    results = agent.execute_query("SELECT * FROM users")
    assert len(results) == 0

def test_table_exists(temp_db, tmp_path):
    """Test table existence checking."""
    schema = tmp_path / "schema.sql"
    schema.write_text("CREATE TABLE users (id INTEGER PRIMARY KEY);")

    class TestAgent(Agent, DatabaseMixin):
        def _get_system_prompt(self): return "Test"
        def _create_console(self):
            from gaia import SilentConsole
            return SilentConsole()
        def _register_tools(self): pass

    agent = TestAgent(silent_mode=True)
    agent.initialize_database(temp_db, schema_file=str(schema))

    assert agent.table_exists("users") is True
    assert agent.table_exists("nonexistent") is False

def test_multiple_databases():
    """Test different database types."""
    # SQLite
    sqlite_url = "sqlite:///:memory:"

    # PostgreSQL (would need real server)
    # postgres_url = "postgresql://user:pass@localhost/testdb"

    # MySQL (would need real server)
    # mysql_url = "mysql://user:pass@localhost/testdb"

    # For now, just test SQLite
    class TestAgent(Agent, DatabaseMixin):
        def _get_system_prompt(self): return "Test"
        def _create_console(self):
            from gaia import SilentConsole
            return SilentConsole()
        def _register_tools(self): pass

    agent = TestAgent(silent_mode=True)
    agent.initialize_database(sqlite_url)
    assert agent._engine is not None

Dependencies

Required Packages

# pyproject.toml
[project]
dependencies = [
    "sqlalchemy>=2.0",
]

[project.optional-dependencies]
database-postgresql = ["psycopg2-binary>=2.9"]
database-mysql = ["pymysql>=1.0"]

Import Dependencies

from typing import Any, Dict, List, Optional
from contextlib import contextmanager
from pathlib import Path
import logging

from sqlalchemy import create_engine, text, MetaData
from sqlalchemy.engine import Engine, Connection
from sqlalchemy.pool import QueuePool

Error Handling

Common Errors and Responses

# Database not initialized
def execute_query(self, ...):
    if self._engine is None:
        raise RuntimeError(
            "Database not initialized. "
            "Call initialize_database() in your agent's __init__"
        )

# SQL error
try:
    result = conn.execute(text(query), params)
except SQLAlchemyError as e:
    logger.error(f"Database error: {e}")
    raise DatabaseError(f"Query failed: {e}") from e

# Connection error
try:
    conn = self._engine.connect()
except OperationalError as e:
    logger.error(f"Connection failed: {e}")
    raise DatabaseError(
        f"Could not connect to database. "
        f"Ensure database server is running."
    ) from e

Usage Examples

Example 1: Simple Agent with Database

from gaia import Agent, DatabaseMixin, tool

class CustomerAgent(Agent, DatabaseMixin):
    """Agent for customer management."""

    def __init__(self, db_url: str = "sqlite:///customers.db", **kwargs):
        super().__init__(**kwargs)
        self.initialize_database(db_url, schema_file="schema.sql")

    def _get_system_prompt(self) -> str:
        return "You manage customer records."

    def _create_console(self):
        from gaia import AgentConsole
        return AgentConsole()

    def _register_tools(self):
        @tool
        def find_customer(name: str) -> dict:
            """Find customers by name."""
            results = self.execute_query(
                "SELECT * FROM customers WHERE name LIKE :name",
                {"name": f"%{name}%"}
            )
            return {"customers": results, "count": len(results)}

        @tool
        def create_customer(name: str, email: str) -> dict:
            """Create new customer."""
            customer_id = self.execute_insert(
                "customers",
                {"name": name, "email": email},
                returning="id"
            )
            return {"status": "created", "id": customer_id}

Example 2: Transaction Usage

@tool
def transfer_balance(from_id: int, to_id: int, amount: float) -> dict:
    """Transfer balance between accounts."""
    try:
        with self.transaction() as conn:
            # Debit from account
            conn.execute(
                text("UPDATE accounts SET balance = balance - :amount WHERE id = :id"),
                {"amount": amount, "id": from_id}
            )

            # Credit to account
            conn.execute(
                text("UPDATE accounts SET balance = balance + :amount WHERE id = :id"),
                {"amount": amount, "id": to_id}
            )

            # Both succeed or both roll back
        return {"status": "success", "amount": amount}

    except Exception as e:
        return {"status": "error", "error": str(e)}

Documentation Updates Required

SDK.md

Add to Section 1 (Core Agent System):
### 1.4 DatabaseMixin

**Import:** `from gaia import DatabaseMixin`

**Purpose:** Add database capabilities to your agent using SQLAlchemy Core.

**When to use:**
- Storing agent state across sessions
- Managing user data
- Tracking workflow history
- Any persistent data needs

[Full documentation with examples]

Update Existing Examples

Replace all temporary database code in SDK.md with DatabaseMixin usage.

Acceptance Criteria

  • DatabaseMixin class implemented in src/gaia/core/database_mixin.py
  • All methods implemented with docstrings
  • Supports SQLite, PostgreSQL, MySQL
  • Connection pooling works
  • Transaction management works (commit/rollback)
  • Schema initialization from file works
  • All unit tests pass (10+ tests)
  • Exported from gaia/__init__.py
  • Can import: from gaia import DatabaseMixin
  • Documented in SDK.md
  • Example agent using DatabaseMixin works
  • EMR agent can use it

Implementation Checklist

Step 1: Create File

  • Create src/gaia/core/database_mixin.py
  • Add copyright header
  • Add module docstring

Step 2: Implement Core Methods

  • initialize_database()
  • execute_query()
  • execute_insert()
  • execute_update()
  • execute_delete()
  • transaction() context manager
  • get_connection() context manager
  • table_exists()
  • close_database()

Step 3: Add Error Handling

  • RuntimeError if not initialized
  • DatabaseError for SQL errors
  • FileNotFoundError for schema
  • Proper error messages

Step 4: Add Logging

  • Log initialization
  • Log query execution (debug level)
  • Log errors
  • Hide credentials in logs

Step 5: Write Tests

  • Create tests/sdk/test_database_mixin.py
  • Test all methods
  • Test transactions
  • Test error cases
  • Test multiple databases

Step 6: Export & Document

  • Add to src/gaia/__init__.py
  • Add to __all__ list
  • Update SDK.md
  • Add example to SDK.md

Step 7: Validate

  • Can import: from gaia import DatabaseMixin
  • Example agent works
  • All tests pass
  • EMR agent can use it

DatabaseMixin Technical Specification