Overview
Robust error handling is essential for production applications. This guide covers error types, handling strategies, and best practices for building reliable integrations with Documind.

HTTP Status Codes

Documind uses standard HTTP status codes to indicate success or failure:

| Code | Status | Meaning | Action |
|---|---|---|---|
| 200 | OK | Request succeeded | Process response |
| 400 | Bad Request | Invalid parameters | Fix request |
| 401 | Unauthorized | Invalid/missing API key | Check authentication |
| 402 | Payment Required | Insufficient credits | Add credits |
| 403 | Forbidden | Insufficient permissions | Check API key scopes |
| 404 | Not Found | Resource doesn’t exist | Verify ID |
| 429 | Too Many Requests | Rate limit exceeded | Implement backoff |
| 500 | Internal Server Error | Server error | Retry with backoff |
| 503 | Service Unavailable | Temporary outage | Retry later |
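The "Action" column can be encoded directly in code. As a rough sketch (not part of any official SDK; the groupings simply mirror the table above):

```python
# Illustrative helper: map a status code to a coarse handling strategy,
# following the table above. Not an official Documind API.
RETRYABLE = {429, 500, 503}                  # back off and retry
CLIENT_FIXABLE = {400, 401, 402, 403, 404}   # fix the request or account first

def classify_status(status_code: int) -> str:
    """Return a coarse handling strategy for an HTTP status code."""
    if status_code == 200:
        return "ok"
    if status_code in RETRYABLE:
        return "retry"
    if status_code in CLIENT_FIXABLE:
        return "fix-request"
    return "unknown"
```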
Error Response Format
All errors return a consistent JSON structure:

```json
{
  "detail": "Error message describing what went wrong"
}
```

Validation errors may also include the accepted values:

```json
{
  "detail": "Invalid model name: gpt-5",
  "allowed_values": ["openai-gpt-4o", "openai-gpt-4.1", "google-gemini-2.0-flash"]
}
```
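A minimal sketch of consuming this structure from a failed response (field names are taken from the examples above; `url`, `headers`, and `data` are assumed to be defined):

```python
import requests

response = requests.post(url, headers=headers, json=data)
if not response.ok:
    error = response.json()
    print(f"Request failed: {error['detail']}")
    # "allowed_values" is only present on some validation errors
    if "allowed_values" in error:
        print(f"Allowed values: {error['allowed_values']}")
```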
Common Errors and Solutions
401 Unauthorized
Cause: Missing or invalid API key

```json
{
  "detail": "Invalid or missing API key"
}
```
```python
import requests

# ❌ Wrong
headers = {}  # Missing API key

# ✅ Correct
headers = {"X-API-Key": "your_api_key_here"}

try:
    response = requests.post(url, headers=headers, json=data)
    response.raise_for_status()
except requests.exceptions.HTTPError as e:
    if e.response.status_code == 401:
        print("API key is invalid or missing. Please check your credentials.")
```
402 Payment Required
Cause: Insufficient credits

```json
{
  "detail": "Insufficient credits. Please upgrade your plan or wait for your daily credits to refresh."
}
```
```python
# Check credits before extraction
credits_response = requests.get(
    f"{BASE_URL}/usage/credits",
    headers=headers
)
credits = credits_response.json()

if credits["available_credits"] < required_credits:
    print(f"Insufficient credits. Available: {credits['available_credits']}, Required: {required_credits}")
    # Notify user or queue for later
else:
    # Proceed with extraction
    extract_response = requests.post(...)
```
400 Bad Request
Cause: Invalid request parameters

Common scenarios:

Invalid UUID Format

```json
{
  "detail": "Invalid document ID format."
}
```
```python
import uuid

def is_valid_uuid(uuid_string):
    try:
        uuid.UUID(uuid_string)
        return True
    except ValueError:
        return False

if not is_valid_uuid(document_id):
    raise ValueError(f"Invalid document ID: {document_id}")
```
Invalid Schema
```json
{
  "detail": "Invalid schema format"
}
```
```python
def validate_schema(schema):
    """Validate basic schema structure"""
    if not isinstance(schema, dict):
        raise ValueError("Schema must be a dictionary")
    if "type" not in schema:
        raise ValueError("Schema must have 'type' field")
    if schema["type"] == "object" and "named_entities" not in schema:
        raise ValueError("Object schemas must have 'named_entities'")
    return True
```
Invalid Model Name
```json
{
  "detail": "Invalid model name: gpt-5. Allowed values: ['openai-gpt-4o', 'openai-gpt-4.1', 'google-gemini-2.0-flash']"
}
```
```python
class ExtractionMode:
    GEMINI_FLASH = "google-gemini-2.0-flash"  # 2 credits/page
    GPT_4_1 = "openai-gpt-4.1"                # 4 credits/page
    GPT_4O = "openai-gpt-4o"                  # 6 credits/page
    VLM = "vlm"                               # 10 credits/page
    ADVANCED = None                           # 15 credits/page

# Use constants
response = requests.post(
    f"{BASE_URL}/extract/{document_id}",
    headers=headers,
    json={
        "schema": schema,
        "model": ExtractionMode.GEMINI_FLASH
    }
)
```
404 Not Found
Cause: Document or extraction doesn't exist

```json
{
  "detail": "Extraction not found"
}
```
```python
def get_extraction_safe(document_id):
    """Get extraction with proper error handling"""
    try:
        response = requests.get(
            f"{BASE_URL}/data/extractions?document_id={document_id}",
            headers=headers
        )
        response.raise_for_status()
        data = response.json()
        if not data["items"]:
            return None
        return data["items"][0]
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == 404:
            logger.warning(f"Extraction not found for document {document_id}")
            return None
        raise
```
429 Too Many Requests
Cause: Rate limit exceeded

```json
{
  "detail": "Rate limit exceeded. Please try again later."
}
```
```python
import logging
import time
from functools import wraps

import requests

logger = logging.getLogger(__name__)

def with_retry(max_retries=3, base_delay=1):
    """Decorator for retrying with exponential backoff"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except requests.exceptions.HTTPError as e:
                    if e.response.status_code == 429 and attempt < max_retries - 1:
                        delay = base_delay * (2 ** attempt)
                        logger.warning(f"Rate limited. Retrying in {delay}s...")
                        time.sleep(delay)
                        continue
                    # Re-raise non-429 errors and the final failed attempt
                    raise
        return wrapper
    return decorator

@with_retry(max_retries=5, base_delay=2)
def extract_document(document_id, schema):
    response = requests.post(
        f"{BASE_URL}/extract/{document_id}",
        headers=headers,
        json={"schema": schema}
    )
    response.raise_for_status()
    return response.json()
```
500 Internal Server Error
Cause: Unexpected server error

```json
{
  "detail": "Failed to extract information. Please contact support."
}
```
```python
def extract_with_retry(document_id, schema, max_retries=3):
    """Extract with retry for server errors"""
    for attempt in range(max_retries):
        try:
            response = requests.post(
                f"{BASE_URL}/extract/{document_id}",
                headers=headers,
                json={"schema": schema},
                timeout=120
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code < 500:
                raise  # Client errors won't succeed on retry
            if attempt < max_retries - 1:
                delay = 5 * (attempt + 1)
                logger.error(f"Server error on attempt {attempt + 1}. Retrying in {delay}s...")
                time.sleep(delay)
                continue
            logger.error(f"Failed after {max_retries} attempts")
            # Log to monitoring system (notify_error is your own hook)
            notify_error(document_id, str(e))
            raise
```
Retry Strategies
Exponential Backoff
Best for rate limits and temporary server issues:

```python
import time
import random

def exponential_backoff(
    func,
    max_retries=5,
    base_delay=1,
    max_delay=60,
    jitter=True
):
    """Execute function with exponential backoff"""
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            delay = min(base_delay * (2 ** attempt), max_delay)
            # Add jitter to prevent thundering herd
            if jitter:
                delay = delay * (0.5 + random.random())
            logger.warning(f"Attempt {attempt + 1} failed. Retrying in {delay:.1f}s...")
            time.sleep(delay)
```
Circuit Breaker
Prevent cascading failures:

```python
from datetime import datetime, timedelta

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            if datetime.now() - self.last_failure_time > timedelta(seconds=self.timeout):
                self.state = "HALF_OPEN"
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        self.failures = 0
        self.state = "CLOSED"

    def on_failure(self):
        self.failures += 1
        self.last_failure_time = datetime.now()
        if self.failures >= self.failure_threshold:
            self.state = "OPEN"

# Usage
breaker = CircuitBreaker(failure_threshold=5, timeout=60)

def extract_safe(document_id, schema):
    return breaker.call(extract_document, document_id, schema)
```
Timeout Handling
Set Appropriate Timeouts
```python
# Different timeouts for different operations
TIMEOUTS = {
    "upload": 300,   # 5 minutes for large files
    "extract": 120,  # 2 minutes for extraction
    "schema": 30,    # 30 seconds for schema generation
    "query": 10,     # 10 seconds for queries
}

# Upload with timeout
response = requests.post(
    f"{BASE_URL}/upload",
    headers=headers,
    files=files,
    timeout=TIMEOUTS["upload"]
)

# Extract with timeout
response = requests.post(
    f"{BASE_URL}/extract/{document_id}",
    headers=headers,
    json=data,
    timeout=TIMEOUTS["extract"]
)
```
Handle Timeout Errors
```python
from requests.exceptions import Timeout

try:
    response = requests.post(url, json=data, timeout=30)
except Timeout:
    logger.error("Request timed out")
    # Queue for retry
    retry_queue.add(document_id)
except Exception as e:
    logger.error(f"Unexpected error: {e}")
    raise
```
Validation Errors
Pre-Request Validation
Catch errors before making API calls:

```python
def validate_extraction_request(document_id, schema, model=None):
    """Validate extraction request before sending"""
    errors = []

    # Validate document ID
    if not is_valid_uuid(document_id):
        errors.append(f"Invalid document ID format: {document_id}")

    # Validate schema
    if not isinstance(schema, dict):
        errors.append("Schema must be a dictionary")
    elif "type" not in schema:
        errors.append("Schema must have 'type' field")

    # Validate model if provided
    valid_models = ["openai-gpt-4o", "openai-gpt-4.1", "google-gemini-2.0-flash"]
    if model and model not in valid_models:
        errors.append(f"Invalid model: {model}. Must be one of {valid_models}")

    if errors:
        raise ValueError(f"Validation errors: {'; '.join(errors)}")
    return True

# Use before extraction
try:
    validate_extraction_request(document_id, schema, model)
    result = extract_document(document_id, schema)
except ValueError as e:
    logger.error(f"Validation failed: {e}")
    result = None
```
Logging and Monitoring
Structured Logging
```python
import json
import logging
import time
from datetime import datetime

import requests

logger = logging.getLogger(__name__)

def log_api_call(method, endpoint, status_code, duration, error=None):
    """Log API calls with structured data"""
    log_data = {
        "method": method,
        "endpoint": endpoint,
        "status_code": status_code,
        "duration_ms": duration,
        "error": str(error) if error else None,
        "timestamp": datetime.now().isoformat()
    }
    if error:
        logger.error(f"API call failed: {json.dumps(log_data)}")
    else:
        logger.info(f"API call succeeded: {json.dumps(log_data)}")

# Usage
start_time = time.time()
try:
    response = requests.post(url, json=data)
    response.raise_for_status()
    duration = (time.time() - start_time) * 1000
    log_api_call("POST", url, response.status_code, duration)
except Exception as e:
    duration = (time.time() - start_time) * 1000
    # Not every exception carries a response; guard the attribute access
    status = getattr(getattr(e, "response", None), "status_code", None)
    log_api_call("POST", url, status, duration, error=e)
    raise
```
Error Tracking
```python
def track_error(error_type, document_id, details):
    """Track errors for monitoring"""
    # Send to error tracking service (Sentry, Rollbar, etc.)
    error_data = {
        "type": error_type,
        "document_id": document_id,
        "details": details,
        "timestamp": datetime.now().isoformat()
    }
    # Log locally
    logger.error(f"Error tracked: {json.dumps(error_data)}")
    # Send to monitoring service
    # sentry_sdk.capture_exception(error_data)
```
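If you use Sentry, the commented-out call above could be wired up roughly like this (a sketch; `sentry_sdk` must be installed separately, and the DSN is a placeholder):

```python
import sentry_sdk

# Placeholder DSN; use the one from your Sentry project settings
sentry_sdk.init(dsn="https://<key>@<org>.ingest.sentry.io/<project>")

try:
    result = extract_document(document_id, schema)
except Exception as exc:
    track_error("extraction_failed", document_id, str(exc))
    sentry_sdk.capture_exception(exc)  # sends the exception with its stack trace
    raise
```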
Complete Error Handling Example
```python
import logging
import time
from typing import Any, Dict, Optional

import requests

logger = logging.getLogger(__name__)

class DocumindClient:
    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url
        self.headers = {"X-API-Key": api_key}

    def extract(
        self,
        document_id: str,
        schema: Dict[str, Any],
        model: Optional[str] = None,
        max_retries: int = 3
    ) -> Optional[Dict[str, Any]]:
        """Extract with comprehensive error handling"""
        # Validate inputs
        try:
            validate_extraction_request(document_id, schema, model)
        except ValueError as e:
            logger.error(f"Validation error: {e}")
            return None

        # Retry logic
        for attempt in range(max_retries):
            try:
                response = requests.post(
                    f"{self.base_url}/extract/{document_id}",
                    headers=self.headers,
                    json={"schema": schema, "model": model},
                    timeout=120
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.HTTPError as e:
                status_code = e.response.status_code
                if status_code == 401:
                    logger.error("Authentication failed. Check API key.")
                    return None
                elif status_code == 402:
                    logger.error("Insufficient credits.")
                    return None
                elif status_code == 404:
                    logger.error(f"Document {document_id} not found.")
                    return None
                elif status_code == 429:
                    if attempt < max_retries - 1:
                        delay = 2 ** attempt
                        logger.warning(f"Rate limited. Retrying in {delay}s...")
                        time.sleep(delay)
                        continue
                    return None
                elif status_code >= 500:
                    if attempt < max_retries - 1:
                        delay = 5 * (attempt + 1)
                        logger.error(f"Server error. Retrying in {delay}s...")
                        time.sleep(delay)
                        continue
                    logger.error(f"Server error after {max_retries} attempts.")
                    return None
                else:
                    logger.error(f"HTTP error {status_code}: {e.response.text}")
                    return None
            except requests.exceptions.Timeout:
                logger.error(f"Request timed out on attempt {attempt + 1}")
                if attempt < max_retries - 1:
                    continue
                return None
            except Exception as e:
                logger.error(f"Unexpected error: {e}")
                return None
        return None
```
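A quick usage sketch for the client above (the base URL and schema fields are placeholders; substitute your actual endpoint and schema):

```python
client = DocumindClient(
    api_key="your_api_key_here",
    base_url="https://api.documind.example/v1"  # placeholder URL
)

schema = {"type": "object", "named_entities": ["invoice_number", "total"]}
result = client.extract(document_id, schema, model="google-gemini-2.0-flash")
if result is None:
    print("Extraction failed; check the logs for the cause.")
```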