Component: FileChangeHandler
Module: gaia.utils.file_watcher
Import: from gaia.utils import FileChangeHandler
Overview
FileChangeHandler provides a reusable file system watcher for GAIA agents. It monitors directories for file changes (create, modify, delete) and triggers callbacks, enabling agents to automatically process new files.
Key Features:
- File event detection (create, modify, delete)
- Callback-based architecture
- File extension filtering
- Pattern-based ignore rules
- Debouncing to prevent duplicate events
- Integration with watchdog library
Requirements
Functional Requirements
- File Event Handling
  - Detect file creation
  - Detect file modification
  - Detect file deletion
  - Optional: Detect file moves
- Filtering
  - File extension filters
  - Filename pattern matching
  - Ignore patterns (e.g., temp files, hidden files)
- Debouncing
  - Prevent duplicate events
  - Configurable debounce time
  - Handle rapid successive changes
- Callback System
  - `on_created(event)` callback
  - `on_modified(event)` callback
  - `on_deleted(event)` callback
  - Pass file path and metadata
- Integration with watchdog
  - Extends `FileSystemEventHandler`
  - Works with `Observer`
  - Thread-safe
Non-Functional Requirements
- Performance
  - Low overhead monitoring
  - Efficient event filtering
  - Non-blocking callbacks
- Reliability
  - Handle edge cases (permission errors, symlinks)
  - Graceful degradation
  - Proper cleanup
- Usability
  - Simple API
  - Good defaults
  - Clear error messages
API Specification
File Location
src/gaia/utils/file_watcher.py
Public Interface
import logging
import time
from pathlib import Path
from typing import Callable, Dict, List, Optional, Set

from watchdog.events import FileSystemEvent, FileSystemEventHandler
logger = logging.getLogger(__name__)
class FileChangeHandler(FileSystemEventHandler):
    """
    Generic file system watcher with callback support.

    Monitors directories for file changes and triggers callbacks.
    Includes debouncing to prevent duplicate event processing.

    Usage:
        def process_new_file(event):
            print(f"New file: {event.src_path}")

        handler = FileChangeHandler(
            on_created=process_new_file,
            extensions={'.pdf', '.png'},
            debounce_seconds=1.0
        )

        from watchdog.observers import Observer
        observer = Observer()
        observer.schedule(handler, "/path/to/watch", recursive=False)
        observer.start()

    Attributes:
        on_created: Callback for file creation events
        on_modified: Callback for file modification events
        on_deleted: Callback for file deletion events
        extensions: Set of file extensions to monitor (e.g., {'.pdf', '.txt'}).
            Matching is case-insensitive; an empty set disables the filter.
        ignore_patterns: Patterns to ignore (e.g., {'.*', '~*'}).
            An empty set disables ignoring.
        debounce_seconds: Minimum time between events for same file
    """

    SUPPORTED_EXTENSIONS: Set[str] = {'.pdf', '.png', '.jpg', '.jpeg', '.txt', '.md', '.docx'}
    DEFAULT_IGNORE_PATTERNS: Set[str] = {'.*', '~*', '*.tmp', '*.swp', '*.bak'}

    def __init__(
        self,
        on_created: Optional[Callable[[FileSystemEvent], None]] = None,
        on_modified: Optional[Callable[[FileSystemEvent], None]] = None,
        on_deleted: Optional[Callable[[FileSystemEvent], None]] = None,
        extensions: Optional[Set[str]] = None,
        ignore_patterns: Optional[Set[str]] = None,
        debounce_seconds: float = 1.0,
    ):
        """
        Initialize file change handler.

        Args:
            on_created: Callback when file is created
            on_modified: Callback when file is modified
            on_deleted: Callback when file is deleted
            extensions: File extensions to monitor (default: all common types).
                Pass an empty set to monitor every extension.
            ignore_patterns: Patterns to ignore (default: temp/hidden files).
                Pass an empty set to ignore nothing.
            debounce_seconds: Min time between events for same file (default: 1.0)
        """
        super().__init__()
        self._on_created = on_created
        self._on_modified = on_modified
        self._on_deleted = on_deleted

        # Only None selects the defaults: `x or default` would silently discard
        # an explicitly supplied empty set.
        if extensions is None:
            extensions = self.SUPPORTED_EXTENSIONS
        if ignore_patterns is None:
            ignore_patterns = self.DEFAULT_IGNORE_PATTERNS

        # Copy so that mutating one handler never touches the class-level
        # defaults, and lowercase so suffix matching is case-insensitive.
        self._extensions = {ext.lower() for ext in extensions}
        self._ignore_patterns = set(ignore_patterns)
        self._debounce_seconds = debounce_seconds
        self._last_events: Dict[str, float] = {}  # path → monotonic timestamp

    def on_created(self, event: FileSystemEvent) -> None:
        """Handle file creation event."""
        if not event.is_directory and self._should_process(event):
            if self._is_debounced(event.src_path):
                logger.debug("Debounced: %s", event.src_path)
                return
            self._dispatch("created", self._on_created, event)

    def on_modified(self, event: FileSystemEvent) -> None:
        """Handle file modification event."""
        if not event.is_directory and self._should_process(event):
            if self._is_debounced(event.src_path):
                logger.debug("Debounced: %s", event.src_path)
                return
            self._dispatch("modified", self._on_modified, event)

    def on_deleted(self, event: FileSystemEvent) -> None:
        """Handle file deletion event (never debounced)."""
        if not event.is_directory and self._should_process(event):
            self._dispatch("deleted", self._on_deleted, event)
            # Clean up debounce tracking
            self._last_events.pop(event.src_path, None)

    def _dispatch(
        self,
        kind: str,
        callback: Optional[Callable[[FileSystemEvent], None]],
        event: FileSystemEvent,
    ) -> None:
        """Log the event and invoke ``callback``, logging (not raising) its errors."""
        logger.info("File %s: %s", kind, event.src_path)
        if callback is None:
            return
        try:
            callback(event)
        except Exception as exc:
            # A failing user callback must not propagate into the code that
            # delivered the event; log it with the traceback instead.
            logger.error("Error in on_%s callback: %s", kind, exc, exc_info=True)

    def _should_process(self, event: FileSystemEvent) -> bool:
        """Check if file should be processed based on filters."""
        path = Path(event.src_path)

        # Check extension (stored lowercase, so "scan.PDF" matches ".pdf")
        if self._extensions and path.suffix.lower() not in self._extensions:
            return False

        # Check ignore patterns
        return not any(path.match(pattern) for pattern in self._ignore_patterns)

    def _is_debounced(self, file_path: str) -> bool:
        """
        Check if event should be debounced.

        Returns True when an event for ``file_path`` was already accepted less
        than ``debounce_seconds`` ago. Otherwise records the current time for
        the path and returns False.
        """
        # Monotonic clock: interval measurement must not be affected by
        # wall-clock adjustments.
        now = time.monotonic()
        last_time = self._last_events.get(file_path)
        # The first event for a path is never debounced, whatever the clock's
        # origin happens to be.
        if last_time is not None and now - last_time < self._debounce_seconds:
            return True
        self._last_events[file_path] = now
        return False
Testing Requirements
Unit Tests
File: tests/sdk/test_file_change_handler.py
import pytest
from pathlib import Path
import time
import tempfile
from gaia import FileChangeHandler
from watchdog.observers import Observer
from watchdog.events import FileCreatedEvent, FileModifiedEvent
def test_file_change_handler_can_be_imported():
    """The handler class must be importable from the top-level gaia package."""
    from gaia import FileChangeHandler as imported_handler

    assert imported_handler is not None
def test_handler_creation():
    """A handler can be constructed with no arguments at all."""
    assert FileChangeHandler() is not None
def test_handler_with_callbacks():
    """Test that supplied callbacks are stored and invoked for matching events."""
    created_files = []
    modified_files = []

    def on_create(event):
        created_files.append(event.src_path)

    def on_modify(event):
        modified_files.append(event.src_path)

    handler = FileChangeHandler(
        on_created=on_create,
        on_modified=on_modify
    )

    # The exact callables that were passed in must be the ones stored.
    assert handler._on_created is on_create
    assert handler._on_modified is on_modify

    # Use distinct paths: debounce state is tracked per path and shared
    # between created and modified events.
    handler.on_created(FileCreatedEvent("created.txt"))
    handler.on_modified(FileModifiedEvent("modified.txt"))

    assert len(created_files) == 1
    assert "created.txt" in created_files[0]
    assert len(modified_files) == 1
    assert "modified.txt" in modified_files[0]
def test_extension_filtering():
    """Only files whose extension is in the configured set reach the callback."""
    seen_paths = []

    handler = FileChangeHandler(
        on_created=lambda event: seen_paths.append(event.src_path),
        extensions={'.pdf', '.txt'}
    )

    # Feed three synthetic creation events through the handler, in order.
    for file_name in ("test.pdf", "test.txt", "test.jpg"):
        handler.on_created(FileCreatedEvent(file_name))

    # The .jpg event is filtered out; the other two arrive in dispatch order.
    assert len(seen_paths) == 2
    first_path, second_path = seen_paths
    assert "test.pdf" in first_path
    assert "test.txt" in second_path
def test_ignore_patterns():
    """Test ignore patterns."""
    processed_files = []

    def on_create(event):
        processed_files.append(event.src_path)

    handler = FileChangeHandler(
        on_created=on_create,
        ignore_patterns={'.*', '~*'}
    )

    # Simulate events. Every name ends in ".txt" so it passes the default
    # extension filter; a bare ".hidden" has no suffix and would be rejected
    # by the extension check before the '.*' ignore pattern is ever consulted.
    normal_event = FileCreatedEvent("file.txt")
    hidden_event = FileCreatedEvent(".hidden.txt")
    temp_event = FileCreatedEvent("~temp.txt")

    handler.on_created(normal_event)
    handler.on_created(hidden_event)
    handler.on_created(temp_event)

    # Only normal file should be processed
    assert len(processed_files) == 1
    assert "file.txt" in processed_files[0]
def test_debouncing():
    """Repeated events for one path inside the debounce window fire only once."""
    received = []

    handler = FileChangeHandler(
        on_created=received.append,
        debounce_seconds=0.5
    )

    event = FileCreatedEvent("test.txt")

    # Three back-to-back events: the first is delivered, the next two are dropped.
    for _ in range(3):
        handler.on_created(event)
    assert len(received) == 1

    # Once the debounce window has elapsed the same path is delivered again.
    time.sleep(0.6)
    handler.on_created(event)
    assert len(received) == 2
def test_real_file_watching(tmp_path):
    """Test with real file system changes."""
    processed_files = []

    def on_create(event):
        processed_files.append(Path(event.src_path).name)

    handler = FileChangeHandler(
        on_created=on_create,
        extensions={'.txt'}
    )

    observer = Observer()
    observer.schedule(handler, str(tmp_path), recursive=False)
    observer.start()
    try:
        # Create a file
        test_file = tmp_path / "test.txt"
        test_file.write_text("test content")

        # Poll for the event instead of sleeping a fixed interval: delivery
        # latency varies by platform and load, so a single short sleep makes
        # the test flaky while a long one makes every run slow.
        deadline = time.monotonic() + 5.0
        while "test.txt" not in processed_files and time.monotonic() < deadline:
            time.sleep(0.05)

        # Verify callback was triggered
        assert "test.txt" in processed_files
    finally:
        observer.stop()
        observer.join(timeout=5)
def test_callback_error_handling():
    """An exception raised inside a callback must not escape the handler."""

    def bad_callback(event):
        raise Exception("Callback error")

    handler = FileChangeHandler(on_created=bad_callback)

    # The test passes as long as this call returns instead of raising.
    handler.on_created(FileCreatedEvent("test.txt"))
Usage Examples
Example 1: Medical Intake Agent
from gaia import Agent, FileChangeHandler
from gaia.llm import VLMClient
from watchdog.observers import Observer
from pathlib import Path
class MedicalIntakeAgent(Agent):
    """Process medical intake forms automatically.

    Watches ``watch_dir`` for newly created form files and runs VLM extraction
    on each one. Call ``stop_watching()`` when the agent is no longer needed.
    """

    def __init__(self, watch_dir: str = "./intake_forms", **kwargs):
        """Create the watch directory if needed and start watching it.

        Args:
            watch_dir: Directory to monitor for new intake forms.
            **kwargs: Forwarded unchanged to ``Agent.__init__``.
        """
        super().__init__(**kwargs)
        self.watch_dir = Path(watch_dir)
        # parents=True so a nested watch_dir works on first run.
        self.watch_dir.mkdir(parents=True, exist_ok=True)
        self.vlm = VLMClient()
        self._observer = None
        self._start_watching()

    def _start_watching(self):
        """Start watching for new intake forms."""

        def process_new_form(event):
            print(f"New form detected: {event.src_path}")
            # Extract data from form
            result = self._extract_form_data(event.src_path)
            print(f"Extracted: {result}")

        handler = FileChangeHandler(
            on_created=process_new_form,
            extensions={'.pdf', '.png', '.jpg'},
            debounce_seconds=2.0
        )

        self._observer = Observer()
        self._observer.schedule(handler, str(self.watch_dir), recursive=False)
        self._observer.start()
        print(f"Watching: {self.watch_dir}")

    def _extract_form_data(self, image_path: str) -> dict:
        """Extract data from intake form.

        Args:
            image_path: Path of the form file to read.

        Returns:
            Dict with the file path under "file" and the VLM output under "data".
        """
        path = Path(image_path)
        image_bytes = path.read_bytes()
        extracted = self.vlm.extract_from_image(image_bytes, "Extract patient data")
        return {"file": str(path), "data": extracted}

    def stop_watching(self):
        """Stop the observer thread. Safe to call more than once."""
        # getattr: __init__ may have raised before _observer was assigned.
        observer = getattr(self, "_observer", None)
        if observer is None:
            return
        self._observer = None
        observer.stop()
        # Joining a thread that was never started raises RuntimeError, which
        # happens if schedule()/start() failed during setup.
        if observer.is_alive():
            observer.join(timeout=5)

    def __del__(self):
        """Stop watching on cleanup.

        Best effort only: the handler's callback closes over ``self``, so while
        the observer is running and holding the handler this object is unlikely
        to be collected. Prefer calling ``stop_watching()`` explicitly.
        """
        self.stop_watching()
Example 2: Document Indexing Agent
from gaia import Agent, FileChangeHandler
from gaia.rag.sdk import RAGSDK, RAGConfig
from watchdog.observers import Observer
class AutoIndexAgent(Agent):
    """Automatically index new documents.

    Watches ``docs_dir`` recursively and indexes each newly created document
    through the RAG SDK. Call ``stop_watching()`` to shut the watcher down.
    """

    def __init__(self, docs_dir: str = "./docs", **kwargs):
        """Set up the RAG SDK and start watching ``docs_dir``.

        Args:
            docs_dir: Directory to monitor for new documents.
            **kwargs: Forwarded unchanged to ``Agent.__init__``.
        """
        super().__init__(**kwargs)
        self.rag = RAGSDK(RAGConfig())
        self._observer = None
        self._setup_watching(docs_dir)

    def _setup_watching(self, docs_dir: str):
        """Watch directory for new documents."""
        # Local import: this example's import list does not bring in Path, and
        # without it the callback below fails with NameError on the final print.
        from pathlib import Path

        def index_new_doc(event):
            print(f"Indexing: {event.src_path}")
            self.rag.index_document(event.src_path)
            print(f"✅ Indexed: {Path(event.src_path).name}")

        handler = FileChangeHandler(
            on_created=index_new_doc,
            extensions={'.pdf', '.txt', '.md', '.docx'}
        )

        observer = Observer()
        observer.schedule(handler, docs_dir, recursive=True)
        observer.start()
        # Keep a reference so the watcher can be stopped later; a discarded
        # local would leave the thread running with no handle to it.
        self._observer = observer
        print(f"Auto-indexing enabled for: {docs_dir}")

    def stop_watching(self):
        """Stop the observer thread. Safe to call more than once."""
        observer = getattr(self, "_observer", None)
        if observer is None:
            return
        self._observer = None
        observer.stop()
        # join() on a never-started thread raises RuntimeError.
        if observer.is_alive():
            observer.join(timeout=5)
Implementation Details
FileChangeHandler was originally embedded in src/gaia/agents/chat/agent.py as a tightly-coupled inner class. It has been extracted to src/gaia/utils/file_watcher.py as a generic, callback-based implementation. ChatAgent now imports it from there:
from gaia.utils.file_watcher import FileChangeHandler
The callback-based design decouples the handler from any specific agent:
# Generic (callback-based) — current implementation
class FileChangeHandler(FileSystemEventHandler):
def __init__(self, on_created=None, ...):
self._on_created = on_created # ✅ Generic callback
def on_created(self, event):
if self._on_created:
self._on_created(event) # ✅ Calls callback
Debouncing Implementation
def __init__(self, ..., debounce_seconds=1.0):
self._debounce_seconds = debounce_seconds
self._last_events = {} # path → timestamp
def _is_debounced(self, file_path: str) -> bool:
"""Check if event should be debounced."""
current_time = time.time()
last_time = self._last_events.get(file_path, 0)
if current_time - last_time < self._debounce_seconds:
return True # Skip this event
self._last_events[file_path] = current_time
return False # Process this event
Extension Filtering
def _should_process(self, event: FileSystemEvent) -> bool:
    """
    Check if file should be processed.

    Returns False when the file's extension is not in the configured set
    (compared case-insensitively) or when the path matches any ignore
    pattern; returns True otherwise.
    """
    path = Path(event.src_path)

    # Check extension. Both sides are lowercased so "scan.PDF" matches ".pdf"
    # regardless of how the configured extensions were spelled.
    if self._extensions:
        suffix = path.suffix.lower()
        if not any(suffix == ext.lower() for ext in self._extensions):
            return False

    # Check ignore patterns
    return not any(path.match(pattern) for pattern in self._ignore_patterns)
Dependencies
Required Packages
# pyproject.toml
[project]
dependencies = [
"watchdog>=3.0.0", # File system monitoring
]
Import Dependencies
from pathlib import Path
from typing import Callable, Optional, Set, Dict
from watchdog.events import FileSystemEventHandler, FileSystemEvent
from watchdog.observers import Observer
import time
import logging
Documentation Updates Required
docs/sdk/core/agent-system.mdx
Add new section after Tool Mixins:
## 10. File System Monitoring
### FileChangeHandler
**Import:** `from gaia import FileChangeHandler`
**Purpose:** Monitor directories for file changes and trigger automatic processing.
**When to use:**
- Auto-process files dropped in a folder
- Auto-index new documents
- Watch for configuration changes
- Trigger workflows on file events
[Full documentation with examples]
Update EMR Example
Replace manual file watching with FileChangeHandler in medical-intake-build-guide.md
Implementation Checklist
Step 1: Create File
Step 2: Implement Class
Step 3: Add Features
Step 4: Write Tests
Step 5: Export & Document
Step 6: Validate
FileChangeHandler Technical Specification