Documentation Index Fetch the complete documentation index at: https://docs.documind.cloud/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Robust error handling is essential for production applications. This guide covers error types, handling strategies, and best practices for building reliable integrations with Documind.
HTTP Status Codes
Documind uses standard HTTP status codes to indicate success or failure:
| Code | Status | Meaning | Action |
|------|--------|---------|--------|
| 200 | OK | Request succeeded | Process response |
| 400 | Bad Request | Invalid parameters | Fix request |
| 401 | Unauthorized | Invalid/missing API key | Check authentication |
| 402 | Payment Required | Insufficient credits | Add credits |
| 403 | Forbidden | Insufficient permissions | Check API key scopes |
| 404 | Not Found | Resource doesn't exist | Verify ID |
| 429 | Too Many Requests | Rate limit exceeded | Implement backoff |
| 500 | Internal Server Error | Server error | Retry with backoff |
| 503 | Service Unavailable | Temporary outage | Retry later |
All errors return a consistent JSON structure:
{
"detail" : "Error message describing what went wrong"
}
Some errors include additional context:
{
"detail" : "Invalid model name: gpt-5" ,
"allowed_values" : [ "openai-gpt-4o" , "openai-gpt-4.1" , "google-gemini-2.0-flash" ]
}
Common Errors and Solutions
401 Unauthorized
Cause: Missing or invalid API key
{
"detail" : "Invalid or missing API key"
}
Solution:
import requests
# ❌ Wrong
headers = {} # Missing API key
# ✅ Correct
headers = { "X-API-Key" : "your_api_key_here" }
try :
response = requests.post(url, headers = headers, json = data)
response.raise_for_status()
except requests.exceptions.HTTPError as e:
if e.response.status_code == 401 :
print ( "API key is invalid or missing. Please check your credentials." )
402 Payment Required
Cause: Insufficient credits
{
"detail" : "Insufficient credits. Please upgrade your plan or wait for your daily credits to refresh."
}
Solution:
# Check credits before extraction
credits_response = requests.get(
f " { BASE_URL } /usage/credits" ,
headers = headers
)
credits = credits_response.json()
if credits [ "available_credits" ] < required_credits:
print ( f "Insufficient credits. Available: { credits [ 'available_credits' ] } , Required: { required_credits } " )
# Notify user or queue for later
else :
# Proceed with extraction
extract_response = requests.post( ... )
400 Bad Request
Cause: Invalid request parameters
Common scenarios:
{
"detail" : "Invalid document ID format."
}
Solution: Validate UUIDs before making requests
import uuid
def is_valid_uuid ( uuid_string ):
try :
uuid.UUID(uuid_string)
return True
except ValueError :
return False
if not is_valid_uuid(document_id):
raise ValueError ( f "Invalid document ID: { document_id } " )
Invalid Schema
{
"detail" : "Invalid schema format"
}
Solution: Validate schema structure
def validate_schema ( schema ):
"""Validate basic schema structure"""
if not isinstance (schema, dict ):
raise ValueError ( "Schema must be a dictionary" )
if "type" not in schema:
raise ValueError ( "Schema must have 'type' field" )
if schema[ "type" ] == "object" and "named_entities" not in schema:
raise ValueError ( "Object schemas must have 'named_entities'" )
return True
Invalid Model Name
{
"detail" : "Invalid model name: gpt-5. Allowed values: ['openai-gpt-4o', 'openai-gpt-4.1', 'google-gemini-2.0-flash']"
}
Solution: Use constants for model names
class ExtractionMode :
GEMINI_FLASH = "google-gemini-2.0-flash" # 2 credits/page
GPT_4_1 = "openai-gpt-4.1" # 4 credits/page
GPT_4O = "openai-gpt-4o" # 6 credits/page
VLM = "vlm" # 10 credits/page
ADVANCED = None # 15 credits/page
# Use constants
response = requests.post(
f " { BASE_URL } /extract/ { document_id } " ,
json = {
"schema" : schema,
"model" : ExtractionMode. GEMINI_FLASH
}
)
404 Not Found
Cause: Document or extraction doesn’t exist
{
"detail" : "Extraction not found"
}
Solution: Verify IDs and handle missing resources
def get_extraction_safe ( document_id ):
"""Get extraction with proper error handling"""
try :
response = requests.get(
f " { BASE_URL } /data/extractions?document_id= { document_id } " ,
headers = headers
)
response.raise_for_status()
data = response.json()
if not data[ "items" ]:
return None
return data[ "items" ][ 0 ]
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404 :
logger.warning( f "Extraction not found for document { document_id } " )
return None
raise
429 Too Many Requests
Cause: Rate limit exceeded
{
"detail" : "Rate limit exceeded. Please try again later."
}
Solution: Implement exponential backoff
import time
from functools import wraps
def with_retry ( max_retries = 3 , base_delay = 1 ):
"""Decorator for retrying with exponential backoff"""
def decorator ( func ):
@wraps (func)
def wrapper ( * args , ** kwargs ):
for attempt in range (max_retries):
try :
return func( * args, ** kwargs)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 429 :
if attempt < max_retries - 1 :
delay = base_delay * ( 2 ** attempt)
logger.warning( f "Rate limited. Retrying in { delay } s..." )
time.sleep(delay)
continue
raise
return None
return wrapper
return decorator
@with_retry ( max_retries = 5 , base_delay = 2 )
def extract_document ( document_id , schema ):
response = requests.post(
f " { BASE_URL } /extract/ { document_id } " ,
headers = headers,
json = { "schema" : schema}
)
response.raise_for_status()
return response.json()
500 Internal Server Error
Cause: Unexpected server error
{
"detail" : "Failed to extract information. Please contact support."
}
Solution: Implement retry logic with logging
def extract_with_retry ( document_id , schema , max_retries = 3 ):
"""Extract with retry for server errors"""
for attempt in range (max_retries):
try :
response = requests.post(
f " { BASE_URL } /extract/ { document_id } " ,
headers = headers,
json = { "schema" : schema},
timeout = 120
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code >= 500 :
if attempt < max_retries - 1 :
delay = 5 * (attempt + 1 )
logger.error( f "Server error on attempt { attempt + 1 } . Retrying in { delay } s..." )
time.sleep(delay)
continue
else :
logger.error( f "Failed after { max_retries } attempts" )
# Log to monitoring system
notify_error(document_id, str (e))
raise
Retry Strategies
Exponential Backoff
Best for rate limits and temporary server issues:
import time
import random
def exponential_backoff (
func ,
max_retries = 5 ,
base_delay = 1 ,
max_delay = 60 ,
jitter = True
):
"""Execute function with exponential backoff"""
for attempt in range (max_retries):
try :
return func()
except Exception as e:
if attempt == max_retries - 1 :
raise
delay = min (base_delay * ( 2 ** attempt), max_delay)
# Add jitter to prevent thundering herd
if jitter:
delay = delay * ( 0.5 + random.random())
logger.warning( f "Attempt { attempt + 1 } failed. Retrying in { delay :.1f} s..." )
time.sleep(delay)
Circuit Breaker
Prevent cascading failures:
from datetime import datetime, timedelta
class CircuitBreaker :
def __init__ ( self , failure_threshold = 5 , timeout = 60 ):
self .failure_threshold = failure_threshold
self .timeout = timeout
self .failures = 0
self .last_failure_time = None
self .state = "CLOSED" # CLOSED, OPEN, HALF_OPEN
def call ( self , func , * args , ** kwargs ):
if self .state == "OPEN" :
if datetime.now() - self .last_failure_time > timedelta( seconds = self .timeout):
self .state = "HALF_OPEN"
else :
raise Exception ( "Circuit breaker is OPEN" )
try :
result = func( * args, ** kwargs)
self .on_success()
return result
except Exception as e:
self .on_failure()
raise
def on_success ( self ):
self .failures = 0
self .state = "CLOSED"
def on_failure ( self ):
self .failures += 1
self .last_failure_time = datetime.now()
if self .failures >= self .failure_threshold:
self .state = "OPEN"
# Usage
breaker = CircuitBreaker( failure_threshold = 5 , timeout = 60 )
def extract_safe ( document_id ):
return breaker.call(extract_document, document_id)
Timeout Handling
Set Appropriate Timeouts
# Different timeouts for different operations
TIMEOUTS = {
"upload" : 300 , # 5 minutes for large files
"extract" : 120 , # 2 minutes for extraction
"schema" : 30 , # 30 seconds for schema generation
"query" : 10 , # 10 seconds for queries
}
# Upload with timeout
response = requests.post(
f " { BASE_URL } /upload" ,
headers = headers,
files = files,
timeout = TIMEOUTS [ "upload" ]
)
# Extract with timeout
response = requests.post(
f " { BASE_URL } /extract/ { document_id } " ,
headers = headers,
json = data,
timeout = TIMEOUTS [ "extract" ]
)
Handle Timeout Errors
from requests.exceptions import Timeout
try :
response = requests.post(url, json = data, timeout = 30 )
except Timeout:
logger.error( "Request timed out" )
# Queue for retry
retry_queue.add(document_id)
except Exception as e:
logger.error( f "Unexpected error: { e } " )
raise
Validation Errors
Pre-Request Validation
Catch errors before making API calls:
def validate_extraction_request ( document_id , schema , model = None ):
"""Validate extraction request before sending"""
errors = []
# Validate document ID
if not is_valid_uuid(document_id):
errors.append( f "Invalid document ID format: { document_id } " )
# Validate schema
if not isinstance (schema, dict ):
errors.append( "Schema must be a dictionary" )
elif "type" not in schema:
errors.append( "Schema must have 'type' field" )
# Validate model if provided
valid_models = [ "openai-gpt-4o" , "openai-gpt-4.1" , "google-gemini-2.0-flash" ]
if model and model not in valid_models:
errors.append( f "Invalid model: { model } . Must be one of { valid_models } " )
if errors:
raise ValueError ( f "Validation errors: { '; ' .join(errors) } " )
return True
# Use before extraction
try :
validate_extraction_request(document_id, schema, model)
result = extract_document(document_id, schema, model)
except ValueError as e:
logger.error( f "Validation failed: { e } " )
return None
Logging and Monitoring
Structured Logging
import logging
import json
logger = logging.getLogger( __name__ )
def log_api_call ( method , endpoint , status_code , duration , error = None ):
"""Log API calls with structured data"""
log_data = {
"method" : method,
"endpoint" : endpoint,
"status_code" : status_code,
"duration_ms" : duration,
"error" : str (error) if error else None ,
"timestamp" : datetime.now().isoformat()
}
if error:
logger.error( f "API call failed: { json.dumps(log_data) } " )
else :
logger.info( f "API call succeeded: { json.dumps(log_data) } " )
# Usage
start_time = time.time()
try :
response = requests.post(url, json = data)
response.raise_for_status()
duration = (time.time() - start_time) * 1000
log_api_call( "POST" , url, response.status_code, duration)
except Exception as e:
duration = (time.time() - start_time) * 1000
log_api_call( "POST" , url, getattr (e.response, 'status_code' , None ), duration, error = e)
raise
Error Tracking
def track_error ( error_type , document_id , details ):
"""Track errors for monitoring"""
# Send to error tracking service (Sentry, Rollbar, etc.)
error_data = {
"type" : error_type,
"document_id" : document_id,
"details" : details,
"timestamp" : datetime.now().isoformat()
}
# Log locally
logger.error( f "Error tracked: { json.dumps(error_data) } " )
# Send to monitoring service
# sentry_sdk.capture_exception(error_data)
Complete Error Handling Example
import requests
import time
import logging
from typing import Optional, Dict, Any
logger = logging.getLogger( __name__ )
class DocumindClient :
def __init__ ( self , api_key : str , base_url : str ):
self .api_key = api_key
self .base_url = base_url
self .headers = { "X-API-Key" : api_key}
def extract (
self ,
document_id : str ,
schema : Dict[ str , Any],
model : Optional[ str ] = None ,
max_retries : int = 3
) -> Optional[Dict[ str , Any]]:
"""Extract with comprehensive error handling"""
# Validate inputs
try :
validate_extraction_request(document_id, schema, model)
except ValueError as e:
logger.error( f "Validation error: { e } " )
return None
# Retry logic
for attempt in range (max_retries):
try :
response = requests.post(
f " { self .base_url } /extract/ { document_id } " ,
headers = self .headers,
json = { "schema" : schema, "model" : model},
timeout = 120
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
status_code = e.response.status_code
if status_code == 401 :
logger.error( "Authentication failed. Check API key." )
return None
elif status_code == 402 :
logger.error( "Insufficient credits." )
return None
elif status_code == 404 :
logger.error( f "Document { document_id } not found." )
return None
elif status_code == 429 :
if attempt < max_retries - 1 :
delay = 2 ** attempt
logger.warning( f "Rate limited. Retrying in { delay } s..." )
time.sleep(delay)
continue
return None
elif status_code >= 500 :
if attempt < max_retries - 1 :
delay = 5 * (attempt + 1 )
logger.error( f "Server error. Retrying in { delay } s..." )
time.sleep(delay)
continue
logger.error( f "Server error after { max_retries } attempts." )
return None
else :
logger.error( f "HTTP error { status_code } : { e.response.text } " )
return None
except requests.exceptions.Timeout:
logger.error( f "Request timed out on attempt { attempt + 1 } " )
if attempt < max_retries - 1 :
continue
return None
except Exception as e:
logger.error( f "Unexpected error: { e } " )
return None
return None
Next Steps
Performance Optimization Optimize your integration for speed and performance
API Reference Explore all API endpoints and error responses
Batch Processing Tutorial Handle errors in batch processing workflows
Core Concepts Understand the extraction workflow