Overview

Robust error handling is essential for production applications. This guide covers error types, handling strategies, and best practices for building reliable integrations with Documind.

HTTP Status Codes

Documind uses standard HTTP status codes to indicate success or failure:
Code | Status                | Meaning                  | Action
200  | OK                    | Request succeeded        | Process response
400  | Bad Request           | Invalid parameters       | Fix request
401  | Unauthorized          | Invalid/missing API key  | Check authentication
402  | Payment Required      | Insufficient credits     | Add credits
403  | Forbidden             | Insufficient permissions | Check API key scopes
404  | Not Found             | Resource doesn’t exist   | Verify ID
429  | Too Many Requests     | Rate limit exceeded      | Implement backoff
500  | Internal Server Error | Server error             | Retry with backoff
503  | Service Unavailable   | Temporary outage         | Retry later
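
For quick dispatch in code, a minimal sketch (the groupings below are illustrative, not defined by the API):
# Illustrative grouping of status codes by recommended action
RETRYABLE_STATUS = {429, 500, 503}               # retry with backoff
CLIENT_ERROR_STATUS = {400, 401, 402, 403, 404}  # fix the request; retrying won't help

def should_retry(status_code):
    """Return True for responses worth retrying with backoff."""
    return status_code in RETRYABLE_STATUS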

Error Response Format

All errors return a consistent JSON structure:
{
  "detail": "Error message describing what went wrong"
}
Some errors include additional context:
{
  "detail": "Invalid model name: gpt-5",
  "allowed_values": ["openai-gpt-4o", "openai-gpt-4.1", "google-gemini-2.0-flash"]
}
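
A small helper can normalize both shapes; parse_error below is an illustrative sketch, not part of any SDK:
def parse_error(response):
    """Return (detail, allowed_values) from an error response.

    Falls back to the raw body if it is not JSON.
    """
    try:
        body = response.json()
    except ValueError:
        return response.text, None
    return body.get("detail", "Unknown error"), body.get("allowed_values")

# Usage
detail, allowed = parse_error(response)
if allowed:
    print(f"{detail} (allowed values: {allowed})")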

Common Errors and Solutions

401 Unauthorized

Cause: Missing or invalid API key
{
  "detail": "Invalid or missing API key"
}
Solution:
import requests

# ❌ Wrong
headers = {}  # Missing API key

# ✅ Correct
headers = {"X-API-Key": "your_api_key_here"}

try:
    response = requests.post(url, headers=headers, json=data)
    response.raise_for_status()
except requests.exceptions.HTTPError as e:
    if e.response.status_code == 401:
        print("API key is invalid or missing. Please check your credentials.")

402 Payment Required

Cause: Insufficient credits
{
  "detail": "Insufficient credits. Please upgrade your plan or wait for your daily credits to refresh."
}
Solution:
# Check credits before extraction
credits_response = requests.get(
    f"{BASE_URL}/usage/credits",
    headers=headers
)
credits = credits_response.json()

if credits["available_credits"] < required_credits:
    print(f"Insufficient credits. Available: {credits['available_credits']}, Required: {required_credits}")
    # Notify user or queue for later
else:
    # Proceed with extraction
    extract_response = requests.post(...)

400 Bad Request

Cause: Invalid request parameters

Common scenarios:

Invalid UUID Format

{
  "detail": "Invalid document ID format."
}
Solution: Validate UUIDs before making requests
import uuid

def is_valid_uuid(uuid_string):
    try:
        uuid.UUID(uuid_string)
        return True
    except ValueError:
        return False

if not is_valid_uuid(document_id):
    raise ValueError(f"Invalid document ID: {document_id}")

Invalid Schema

{
  "detail": "Invalid schema format"
}
Solution: Validate schema structure
def validate_schema(schema):
    """Validate basic schema structure"""
    if not isinstance(schema, dict):
        raise ValueError("Schema must be a dictionary")
    
    if "type" not in schema:
        raise ValueError("Schema must have 'type' field")
    
    if schema["type"] == "object" and "named_entities" not in schema:
        raise ValueError("Object schemas must have 'named_entities'")
    
    return True

Invalid Model Name

{
  "detail": "Invalid model name: gpt-5. Allowed values: ['openai-gpt-4o', 'openai-gpt-4.1', 'google-gemini-2.0-flash']"
}
Solution: Use constants for model names
class ExtractionMode:
    GEMINI_FLASH = "google-gemini-2.0-flash"  # 2 credits/page
    GPT_4_1 = "openai-gpt-4.1"                # 4 credits/page
    GPT_4O = "openai-gpt-4o"                  # 6 credits/page
    VLM = "vlm"                               # 10 credits/page
    ADVANCED = None                            # 15 credits/page

# Use constants
response = requests.post(
    f"{BASE_URL}/extract/{document_id}",
    headers=headers,
    json={
        "schema": schema,
        "model": ExtractionMode.GEMINI_FLASH
    }
)

404 Not Found

Cause: Document or extraction doesn’t exist
{
  "detail": "Extraction not found"
}
Solution: Verify IDs and handle missing resources
def get_extraction_safe(document_id):
    """Get extraction with proper error handling"""
    try:
        response = requests.get(
            f"{BASE_URL}/data/extractions?document_id={document_id}",
            headers=headers
        )
        response.raise_for_status()
        
        data = response.json()
        if not data["items"]:
            return None
        
        return data["items"][0]
        
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            logger.warning(f"Extraction not found for document {document_id}")
            return None
        raise

429 Too Many Requests

Cause: Rate limit exceeded
{
  "detail": "Rate limit exceeded. Please try again later."
}
Solution: Implement exponential backoff
import time
from functools import wraps

def with_retry(max_retries=3, base_delay=1):
    """Decorator for retrying with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.HTTPError as e:
                    if e.response.status_code == 429:
                        if attempt < max_retries - 1:
                            delay = base_delay * (2 ** attempt)
                            logger.warning(f"Rate limited. Retrying in {delay}s...")
                            time.sleep(delay)
                            continue
                    raise
            return None
        return wrapper
    return decorator

@with_retry(max_retries=5, base_delay=2)
def extract_document(document_id, schema):
    response = requests.post(
        f"{BASE_URL}/extract/{document_id}",
        headers=headers,
        json={"schema": schema}
    )
    response.raise_for_status()
    return response.json()
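
If the API includes a Retry-After header on 429 responses (an assumption; it is not documented above), you can prefer the server's hint over a fixed schedule. The retry_delay helper below is a sketch:
def retry_delay(response, attempt, base_delay=1):
    """Prefer the server's Retry-After header when present (assumption:
    the API may send one); otherwise fall back to exponential backoff."""
    retry_after = response.headers.get("Retry-After")
    if retry_after is not None:
        try:
            return float(retry_after)
        except ValueError:
            pass  # non-numeric value (e.g. an HTTP date); use backoff instead
    return base_delay * (2 ** attempt)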

500 Internal Server Error

Cause: Unexpected server error
{
  "detail": "Failed to extract information. Please contact support."
}
Solution: Implement retry logic with logging
def extract_with_retry(document_id, schema, max_retries=3):
    """Extract with retry for server errors"""
    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"{BASE_URL}/extract/{document_id}",
                headers=headers,
                json={"schema": schema},
                timeout=120
            )
            response.raise_for_status()
            return response.json()
            
        except requests.exceptions.HTTPError as e:
            if e.response.status_code >= 500:
                if attempt < max_retries - 1:
                    delay = 5 * (attempt + 1)
                    logger.error(f"Server error on attempt {attempt + 1}. Retrying in {delay}s...")
                    time.sleep(delay)
                    continue
                else:
                    logger.error(f"Failed after {max_retries} attempts")
                    # Log to monitoring system
                    notify_error(document_id, str(e))
            raise

Retry Strategies

Exponential Backoff

Best for rate limits and temporary server issues:
import time
import random

def exponential_backoff(
    func,
    max_retries=5,
    base_delay=1,
    max_delay=60,
    jitter=True
):
    """Execute function with exponential backoff"""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            
            delay = min(base_delay * (2 ** attempt), max_delay)
            
            # Add jitter to prevent thundering herd
            if jitter:
                delay = delay * (0.5 + random.random())
            
            logger.warning(f"Attempt {attempt + 1} failed. Retrying in {delay:.1f}s...")
            time.sleep(delay)
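
Usage, wrapping the call in a zero-argument callable:
result = exponential_backoff(
    lambda: extract_document(document_id, schema),
    max_retries=5,
    base_delay=1
)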

Circuit Breaker

Prevent cascading failures:
from datetime import datetime, timedelta

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN
    
    def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = "HALF_OPEN"
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise
    
    def on_success(self):
        self.failures = 0
        self.state = "CLOSED"
    
    def on_failure(self):
        self.failures += 1
        self.last_failure_time = datetime.now()
        
        if self.failures >= self.failure_threshold:
            self.state = "OPEN"

# Usage
breaker = CircuitBreaker(failure_threshold=5, timeout=60)

def extract_safe(document_id, schema):
    return breaker.call(extract_document, document_id, schema)

Timeout Handling

Set Appropriate Timeouts

# Different timeouts for different operations
TIMEOUTS = {
    "upload": 300,      # 5 minutes for large files
    "extract": 120,     # 2 minutes for extraction
    "schema": 30,       # 30 seconds for schema generation
    "query": 10,        # 10 seconds for queries
}

# Upload with timeout
response = requests.post(
    f"{BASE_URL}/upload",
    headers=headers,
    files=files,
    timeout=TIMEOUTS["upload"]
)

# Extract with timeout
response = requests.post(
    f"{BASE_URL}/extract/{document_id}",
    headers=headers,
    json=data,
    timeout=TIMEOUTS["extract"]
)

Handle Timeout Errors

from requests.exceptions import Timeout

try:
    response = requests.post(url, json=data, timeout=30)
except Timeout:
    logger.error("Request timed out")
    # Queue for retry
    retry_queue.add(document_id)
except Exception as e:
    logger.error(f"Unexpected error: {e}")
    raise
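
The retry_queue above is a placeholder. A minimal in-memory version might look like the sketch below; a production system would use a durable queue (e.g. Redis or SQS) instead:
from collections import deque

class RetryQueue:
    """Minimal in-memory retry queue (illustrative; not durable)."""
    def __init__(self):
        self._items = deque()

    def add(self, document_id):
        self._items.append(document_id)

    def drain(self):
        """Yield and remove queued document IDs."""
        while self._items:
            yield self._items.popleft()

retry_queue = RetryQueue()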

Validation Errors

Pre-Request Validation

Catch errors before making API calls:
def validate_extraction_request(document_id, schema, model=None):
    """Validate extraction request before sending"""
    errors = []
    
    # Validate document ID
    if not is_valid_uuid(document_id):
        errors.append(f"Invalid document ID format: {document_id}")
    
    # Validate schema
    if not isinstance(schema, dict):
        errors.append("Schema must be a dictionary")
    elif "type" not in schema:
        errors.append("Schema must have 'type' field")
    
    # Validate model if provided
    valid_models = ["openai-gpt-4o", "openai-gpt-4.1", "google-gemini-2.0-flash"]
    if model and model not in valid_models:
        errors.append(f"Invalid model: {model}. Must be one of {valid_models}")
    
    if errors:
        raise ValueError(f"Validation errors: {'; '.join(errors)}")
    
    return True

# Use before extraction
try:
    validate_extraction_request(document_id, schema, model)
    result = extract_document(document_id, schema, model)
except ValueError as e:
    logger.error(f"Validation failed: {e}")
    return None

Logging and Monitoring

Structured Logging

import json
import logging
import time
from datetime import datetime

logger = logging.getLogger(__name__)

def log_api_call(method, endpoint, status_code, duration, error=None):
    """Log API calls with structured data"""
    log_data = {
        "method": method,
        "endpoint": endpoint,
        "status_code": status_code,
        "duration_ms": duration,
        "error": str(error) if error else None,
        "timestamp": datetime.now().isoformat()
    }
    
    if error:
        logger.error(f"API call failed: {json.dumps(log_data)}")
    else:
        logger.info(f"API call succeeded: {json.dumps(log_data)}")

# Usage
start_time = time.time()
try:
    response = requests.post(url, json=data)
    response.raise_for_status()
    duration = (time.time() - start_time) * 1000
    log_api_call("POST", url, response.status_code, duration)
except Exception as e:
    duration = (time.time() - start_time) * 1000
    # e may not be an HTTPError, so guard access to e.response
    status_code = getattr(getattr(e, "response", None), "status_code", None)
    log_api_call("POST", url, status_code, duration, error=e)
    raise

Error Tracking

def track_error(error_type, document_id, details):
    """Track errors for monitoring"""
    # Send to error tracking service (Sentry, Rollbar, etc.)
    error_data = {
        "type": error_type,
        "document_id": document_id,
        "details": details,
        "timestamp": datetime.now().isoformat()
    }
    
    # Log locally
    logger.error(f"Error tracked: {json.dumps(error_data)}")
    
    # Send to monitoring service
    # sentry_sdk.capture_exception(error_data)
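
Typical call site, invoked from an exception handler:
try:
    result = extract_document(document_id, schema)
except requests.exceptions.HTTPError as e:
    track_error("extraction_failed", document_id, str(e))
    raise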

Complete Error Handling Example

import requests
import time
import logging
from typing import Optional, Dict, Any

logger = logging.getLogger(__name__)

class DocumindClient:
    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {"X-API-Key": api_key}
    
    def extract(
        self,
        document_id: str,
        schema: Dict[str, Any],
        model: Optional[str] = None,
        max_retries: int = 3
    ) -> Optional[Dict[str, Any]]:
        """Extract with comprehensive error handling"""
        
        # Validate inputs
        try:
            validate_extraction_request(document_id, schema, model)
        except ValueError as e:
            logger.error(f"Validation error: {e}")
            return None
        
        # Retry logic
        for attempt in range(max_retries):
            try:
                response = requests.post(
                    f"{self.base_url}/extract/{document_id}",
                    headers=self.headers,
                    json={"schema": schema, "model": model},
                    timeout=120
                )
                
                response.raise_for_status()
                return response.json()
                
            except requests.exceptions.HTTPError as e:
                status_code = e.response.status_code
                
                if status_code == 401:
                    logger.error("Authentication failed. Check API key.")
                    return None
                
                elif status_code == 402:
                    logger.error("Insufficient credits.")
                    return None
                
                elif status_code == 404:
                    logger.error(f"Document {document_id} not found.")
                    return None
                
                elif status_code == 429:
                    if attempt < max_retries - 1:
                        delay = 2 ** attempt
                        logger.warning(f"Rate limited. Retrying in {delay}s...")
                        time.sleep(delay)
                        continue
                    return None
                
                elif status_code >= 500:
                    if attempt < max_retries - 1:
                        delay = 5 * (attempt + 1)
                        logger.error(f"Server error. Retrying in {delay}s...")
                        time.sleep(delay)
                        continue
                    logger.error(f"Server error after {max_retries} attempts.")
                    return None
                
                else:
                    logger.error(f"HTTP error {status_code}: {e.response.text}")
                    return None
            
            except requests.exceptions.Timeout:
                logger.error(f"Request timed out on attempt {attempt + 1}")
                if attempt < max_retries - 1:
                    continue
                return None
            
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                return None
        
        return None
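
Usage, given a document_id and schema as in the earlier examples (the base URL below is a placeholder):
client = DocumindClient(
    api_key="your_api_key_here",
    base_url="https://api.example.com/v1"  # placeholder
)

result = client.extract(document_id, schema, model="google-gemini-2.0-flash")
if result is None:
    logger.error("Extraction failed; see the log output above for the cause.")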

Next Steps