Complete Automation Workflow
This guide shows production-ready patterns for integrating Documind into your automation pipelines. All examples handle the complete workflow, including review polling.
Pattern 1: Batch Processing
Process multiple documents with parallel extraction and review handling.
import concurrent.futures
import requests
import time
from typing import List, Dict, Tuple


class BatchDocumentProcessor:
    """Process multiple documents with review handling."""

    def __init__(self, api_key: str, max_workers: int = 5):
        self.api_key = api_key
        self.base_url = "https://api.documind.com/api/v1"
        self.headers = {"X-API-Key": api_key}
        self.max_workers = max_workers

    def process_batch(
        self,
        file_paths: List[str],
        schema: Dict,
        mode: str = "advanced"
    ) -> List[Tuple[str, Dict]]:
        """
        Process batch of documents in parallel.

        Returns:
            List of (filename, extracted_data) tuples
        """
        # Phase 1: Upload all documents
        print(f"📤 Uploading {len(file_paths)} documents...")
        doc_mappings = self._upload_batch(file_paths)
        print("✓ All documents uploaded")

        # Phase 2: Extract in parallel
        print(f"🔍 Extracting data (using {self.max_workers} workers)...")
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            extract_futures = {
                executor.submit(self._extract_one, doc_id, schema, mode): filename
                for filename, doc_id in doc_mappings.items()
            }
            extractions = {}
            for future in concurrent.futures.as_completed(extract_futures):
                filename = extract_futures[future]
                try:
                    result = future.result()
                    extractions[filename] = result
                    print(f" ✓ {filename}")
                except Exception as e:
                    print(f" ✗ {filename}: {e}")
                    extractions[filename] = None

        # Phase 3: Separate into immediate vs review needed
        immediate_results = []
        needs_review = []
        for filename, result in extractions.items():
            if result is None:
                continue
            if result["needs_review"]:
                needs_review.append((filename, result["document_id"]))
            else:
                immediate_results.append((filename, result["results"]))

        print("\n📊 Results:")
        print(f" ✓ Immediate: {len(immediate_results)}")
        print(f" ⚠️ Need review: {len(needs_review)}")

        # Phase 4: Poll for reviews in parallel
        if needs_review:
            print(f"\n⏳ Waiting for {len(needs_review)} reviews...")
            reviewed_results = self._poll_batch_reviews(
                [doc_id for _, doc_id in needs_review],
                timeout=600
            )
            for filename, doc_id in needs_review:
                if reviewed_results.get(doc_id):
                    immediate_results.append((filename, reviewed_results[doc_id]))
                    print(f" ✓ {filename} reviewed")
                else:
                    print(f" ⚠️ {filename} review timeout")

        return immediate_results

    def _upload_batch(self, file_paths: List[str]) -> Dict[str, str]:
        """Upload multiple files, return filename -> doc_id mapping."""
        mappings = {}
        # Upload in batches of 100 (API limit)
        for i in range(0, len(file_paths), 100):
            batch = file_paths[i:i+100]
            files = [("files", open(f, "rb")) for f in batch]
            try:
                response = requests.post(
                    f"{self.base_url}/upload",
                    headers=self.headers,
                    files=files
                )
                response.raise_for_status()
                doc_ids = response.json()
                for file_path, doc_id in zip(batch, doc_ids):
                    mappings[file_path] = doc_id
            finally:
                for _, fh in files:
                    fh.close()
        return mappings

    def _extract_one(self, doc_id: str, schema: Dict, mode: str) -> Dict:
        """Extract data from one document."""
        config = {
            "schema": schema,
            "prompt": "Extract all data accurately"
        }
        if mode == "basic":
            config["model"] = "openai-gpt-4.1"
        elif mode == "vlm":
            config["extraction_mode"] = "vlm"
            config["review_threshold"] = 85
        else:  # advanced mode - no model or extraction_mode specified
            config["review_threshold"] = 85

        response = requests.post(
            f"{self.base_url}/extract/{doc_id}",
            headers={**self.headers, "Content-Type": "application/json"},
            json=config
        )
        response.raise_for_status()
        return response.json()

    def _poll_batch_reviews(
        self,
        doc_ids: List[str],
        timeout: int = 600
    ) -> Dict[str, Dict]:
        """Poll multiple documents for review completion."""
        start = time.time()
        remaining = set(doc_ids)
        results = {}
        while remaining and (time.time() - start) < timeout:
            # Query all remaining documents
            response = requests.get(
                f"{self.base_url}/data/extractions",
                headers=self.headers,
                params={
                    "needs_review": True,
                    "limit": len(remaining)
                }
            )
            for extraction in response.json()["items"]:
                doc_id = extraction["document_id"]
                if doc_id in remaining and extraction["is_reviewed"]:
                    results[doc_id] = extraction["reviewed_results"]
                    remaining.remove(doc_id)
            if remaining:
                time.sleep(10)
        return results


# Usage
processor = BatchDocumentProcessor(api_key="your_api_key", max_workers=5)

invoice_schema = {
    "named_entities": {
        "invoice_number": {"type": "string"},
        "total_amount": {"type": "number"},
        "vendor_name": {"type": "string"}
    },
    "required": ["invoice_number", "total_amount"]
}

# Process batch
file_paths = [f"invoices/invoice_{i:03d}.pdf" for i in range(1, 21)]
results = processor.process_batch(
    file_paths=file_paths,
    schema=invoice_schema,
    mode="advanced"
)

# Process results
for filename, data in results:
    print(f"Processing {filename}: ${data['total_amount']}")
    update_accounting_system(data)
Pattern 2: Scheduled Job
Daily processing with error handling and retry logic.
import json
import logging
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import List

import requests
import schedule


class DailyInvoiceProcessor:
    """Scheduled invoice processing with retry."""

    def __init__(self, api_key: str, input_dir: str, output_dir: str):
        self.api_key = api_key
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.error_dir = Path(output_dir) / "errors"
        self.error_dir.mkdir(exist_ok=True)

        # Setup logging
        logging.basicConfig(
            filename="invoice_processor.log",
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s"
        )
        self.logger = logging.getLogger(__name__)

    def run_daily_job(self):
        """Main job: process all PDFs in input directory."""
        self.logger.info("=" * 50)
        self.logger.info("Starting daily invoice processing")

        # Find all PDFs
        pdf_files = list(self.input_dir.glob("*.pdf"))
        self.logger.info(f"Found {len(pdf_files)} PDF files")
        if not pdf_files:
            self.logger.info("No files to process")
            return

        # Process with retry logic
        processed = []
        failed = []
        for pdf_file in pdf_files:
            try:
                self.logger.info(f"Processing {pdf_file.name}")
                result = self._process_with_retry(pdf_file, max_retries=3)
                if result:
                    processed.append(pdf_file)
                    self._save_result(pdf_file, result)
                    self._archive_file(pdf_file)
                    self.logger.info(f" ✓ Success: {pdf_file.name}")
                else:
                    failed.append(pdf_file)
                    self._move_to_errors(pdf_file)
                    self.logger.error(f" ✗ Failed: {pdf_file.name}")
            except Exception as e:
                failed.append(pdf_file)
                self._move_to_errors(pdf_file)
                self.logger.error(f" ✗ Error processing {pdf_file.name}: {e}")

        # Summary
        self.logger.info(f"Job complete: {len(processed)} processed, {len(failed)} failed")
        self._send_summary_email(processed, failed)

    def _process_with_retry(self, file_path: Path, max_retries: int = 3) -> dict:
        """Process file with exponential backoff retry."""
        for attempt in range(max_retries):
            try:
                # Upload
                doc_id = self._upload(file_path)

                # Extract
                result = self._extract(doc_id)

                # Handle review
                if result["needs_review"]:
                    self.logger.info(" ⚠️ Needs review, waiting...")
                    reviewed = self._poll_review(doc_id, timeout=600)
                    if reviewed:
                        return reviewed
                    else:
                        self.logger.warning(f" Review timeout on attempt {attempt + 1}")
                        if attempt < max_retries - 1:
                            time.sleep(2 ** attempt)  # Exponential backoff
                            continue
                        return None
                else:
                    return result["results"]
            except requests.exceptions.RequestException as e:
                self.logger.warning(f" Attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)
                else:
                    raise
        return None

    def _upload(self, file_path: Path) -> str:
        """Upload file and return document ID."""
        with open(file_path, "rb") as f:
            response = requests.post(
                "https://api.documind.com/api/v1/upload",
                headers={"X-API-Key": self.api_key},
                files={"files": f}
            )
            response.raise_for_status()
            return response.json()[0]

    def _extract(self, doc_id: str) -> dict:
        """Extract data from document."""
        response = requests.post(
            f"https://api.documind.com/api/v1/extract/{doc_id}",
            headers={
                "X-API-Key": self.api_key,
                "Content-Type": "application/json"
            },
            json={
                "schema": self._get_invoice_schema(),
                # Advanced mode - no model or extraction_mode
                "review_threshold": 85
            }
        )
        response.raise_for_status()
        return response.json()

    def _poll_review(self, doc_id: str, timeout: int) -> dict:
        """Poll for review completion."""
        start = time.time()
        while (time.time() - start) < timeout:
            response = requests.get(
                "https://api.documind.com/api/v1/data/extractions",
                headers={"X-API-Key": self.api_key},
                params={"document_id": doc_id, "limit": 1}
            )
            data = response.json()
            if data["items"] and data["items"][0]["is_reviewed"]:
                return data["items"][0]["reviewed_results"]
            time.sleep(10)
        return None

    def _get_invoice_schema(self) -> dict:
        """Get invoice extraction schema."""
        return {
            "named_entities": {
                "invoice_number": {"type": "string"},
                "invoice_date": {"type": "string"},
                "vendor_name": {"type": "string"},
                "total_amount": {"type": "number"}
            },
            "required": ["invoice_number", "total_amount"]
        }

    def _save_result(self, file_path: Path, result: dict):
        """Save extraction result as JSON."""
        output_file = self.output_dir / f"{file_path.stem}.json"
        with open(output_file, "w") as f:
            json.dump(result, f, indent=2)

    def _archive_file(self, file_path: Path):
        """Move processed file to archive."""
        archive_dir = self.output_dir / "archive" / datetime.now().strftime("%Y-%m")
        archive_dir.mkdir(parents=True, exist_ok=True)
        file_path.rename(archive_dir / file_path.name)

    def _move_to_errors(self, file_path: Path):
        """Move failed file to error directory."""
        file_path.rename(self.error_dir / file_path.name)

    def _send_summary_email(self, processed: List, failed: List):
        """Send daily summary email."""
        # Implement your email logic here
        pass


# Setup scheduled job
processor = DailyInvoiceProcessor(
    api_key="your_api_key",
    input_dir="/path/to/incoming/invoices",
    output_dir="/path/to/processed"
)

# Run every day at 9 AM
schedule.every().day.at("09:00").do(processor.run_daily_job)

while True:
    schedule.run_pending()
    time.sleep(60)
Pattern 3: Event-Driven Processing
Process documents as they arrive using a file system watcher.
import json
import threading
import time
from pathlib import Path
from queue import Empty, Queue

import requests
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer


class DocumentHandler(FileSystemEventHandler):
    """Watch for new documents and process them."""

    def __init__(self, api_key: str, schema: dict, process_queue: Queue):
        self.api_key = api_key
        self.schema = schema
        self.queue = process_queue

    def on_created(self, event):
        """Handle new file creation."""
        if event.is_directory:
            return
        file_path = Path(event.src_path)

        # Only process PDFs
        if file_path.suffix.lower() != ".pdf":
            return

        print(f"📄 New document detected: {file_path.name}")
        # Add to processing queue
        self.queue.put(file_path)


class DocumentProcessor(threading.Thread):
    """Background processor for document queue."""

    def __init__(self, api_key: str, schema: dict, process_queue: Queue):
        super().__init__(daemon=True)
        self.api_key = api_key
        self.schema = schema
        self.queue = process_queue
        self.running = True

    def run(self):
        """Process documents from queue."""
        while self.running:
            try:
                # Get file from queue (with timeout)
                file_path = self.queue.get(timeout=1)
                print(f"🔍 Processing {file_path.name}...")

                result = self._process_document(file_path)
                if result:
                    print(f"✓ {file_path.name} processed")
                    self._save_and_archive(file_path, result)
                else:
                    print(f"✗ {file_path.name} failed")

                self.queue.task_done()
            except Empty:
                continue
            except Exception as e:
                print(f"Error: {e}")

    def _process_document(self, file_path: Path) -> dict:
        """Upload, extract, and handle review."""
        # Upload
        with open(file_path, "rb") as f:
            response = requests.post(
                "https://api.documind.com/api/v1/upload",
                headers={"X-API-Key": self.api_key},
                files={"files": f}
            )
        doc_id = response.json()[0]

        # Extract
        response = requests.post(
            f"https://api.documind.com/api/v1/extract/{doc_id}",
            headers={
                "X-API-Key": self.api_key,
                "Content-Type": "application/json"
            },
            json={
                "schema": self.schema,
                # Advanced mode - no model or extraction_mode
                "review_threshold": 85
            }
        )
        result = response.json()

        # Handle review if needed
        if result["needs_review"]:
            print(f" ⚠️ {file_path.name} needs review")
            reviewed = self._wait_for_review(doc_id)
            return reviewed if reviewed else None

        return result["results"]

    def _wait_for_review(self, doc_id: str, timeout: int = 600) -> dict:
        """Wait for review completion."""
        start = time.time()
        while (time.time() - start) < timeout:
            response = requests.get(
                "https://api.documind.com/api/v1/data/extractions",
                headers={"X-API-Key": self.api_key},
                params={"document_id": doc_id, "limit": 1}
            )
            data = response.json()
            if data["items"] and data["items"][0]["is_reviewed"]:
                return data["items"][0]["reviewed_results"]
            time.sleep(10)
        return None

    def _save_and_archive(self, file_path: Path, result: dict):
        """Save result and move file to archive."""
        # Save JSON result
        output_file = file_path.parent / "processed" / f"{file_path.stem}.json"
        output_file.parent.mkdir(exist_ok=True)
        with open(output_file, "w") as f:
            json.dump(result, f, indent=2)

        # Archive original
        archive_dir = file_path.parent / "archive"
        archive_dir.mkdir(exist_ok=True)
        file_path.rename(archive_dir / file_path.name)


# Setup
watch_dir = "/path/to/watch/directory"
schema = {...}  # Your schema
api_key = "your_api_key"

# Create queue and start processor
process_queue = Queue()
processor = DocumentProcessor(api_key, schema, process_queue)
processor.start()

# Setup file watcher
event_handler = DocumentHandler(api_key, schema, process_queue)
observer = Observer()
observer.schedule(event_handler, watch_dir, recursive=False)
observer.start()

print(f"👀 Watching {watch_dir} for new documents...")
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
    processor.running = False

observer.join()
processor.join()
Best Practices Summary
Error Handling
- Implement retry logic with exponential backoff (see the sketch after this list)
- Log all errors with context
- Have fallback for timeout scenarios
- Alert on high failure rates
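The helper below is a minimal sketch of the retry and alerting points above; it is not part of the Documind API, and the 20% failure-rate threshold is an arbitrary example value that you should tune to your volumes.
import logging
import time

import requests

logger = logging.getLogger(__name__)

def request_with_backoff(method: str, url: str, max_retries: int = 3, **kwargs) -> requests.Response:
    """Issue an HTTP request, retrying transient failures with exponential backoff."""
    for attempt in range(max_retries):
        try:
            response = requests.request(method, url, timeout=30, **kwargs)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            logger.warning("Attempt %d/%d for %s failed: %s", attempt + 1, max_retries, url, e)
            if attempt == max_retries - 1:
                raise  # give up after the last attempt so the caller can handle it
            time.sleep(2 ** attempt)  # 1s, 2s, 4s, ...

def check_failure_rate(processed: int, failed: int, threshold: float = 0.2) -> None:
    """Flag runs whose failure rate exceeds the threshold; swap the log call for your alerting channel."""
    total = processed + failed
    if total and failed / total > threshold:
        logger.error("High failure rate: %d/%d documents failed", failed, total)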
Performance
- Use parallel processing for batch jobs
- Poll efficiently (10-15s intervals)
- Cache schema definitions
- Implement connection pooling (see the sketch after this list)
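A minimal sketch of the pooling and caching points above, assuming you are free to route the requests.post / requests.get calls from the patterns through a shared requests.Session:
import requests
from functools import lru_cache

# A single Session reuses TCP connections across calls (connection pooling)
# and carries the auth header so it is not repeated on every request.
session = requests.Session()
session.headers.update({"X-API-Key": "your_api_key"})
session.mount("https://", requests.adapters.HTTPAdapter(pool_connections=10, pool_maxsize=10))

@lru_cache(maxsize=1)
def invoice_schema() -> dict:
    """Build the schema once and reuse it for every document."""
    return {
        "named_entities": {
            "invoice_number": {"type": "string"},
            "total_amount": {"type": "number"}
        },
        "required": ["invoice_number", "total_amount"]
    }

# Then call session.post(...) / session.get(...) wherever the patterns above use requests.post / requests.get.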
Monitoring
- Track success/failure rates (see the sketch after this list)
- Monitor review completion times
- Alert on queue backlogs
- Log credit usage
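The counters below are a minimal sketch of what to track per run; RunMetrics is a local helper for this guide, not part of the Documind API, and its numbers would feed whatever metrics or alerting system you already operate.
import logging
import time

logger = logging.getLogger("documind.metrics")

class RunMetrics:
    """Per-run counters: success/failure rate, review latency, queue depth."""

    def __init__(self):
        self.succeeded = 0
        self.failed = 0
        self.review_seconds = []  # time from extraction to review completion

    def record_review(self, started_at: float) -> None:
        self.review_seconds.append(time.time() - started_at)

    def log_summary(self, queue_depth: int = 0) -> None:
        total = self.succeeded + self.failed
        rate = (self.succeeded / total * 100) if total else 0.0
        avg_review = sum(self.review_seconds) / len(self.review_seconds) if self.review_seconds else 0.0
        logger.info(
            "processed=%d failed=%d success_rate=%.1f%% avg_review_s=%.0f queue_depth=%d",
            self.succeeded, self.failed, rate, avg_review, queue_depth,
        )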
Security
- Store API keys in environment variables (see the sketch after this list)
- Use least-privilege API key scopes
- Rotate keys periodically
- Encrypt sensitive extracted data
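A minimal sketch of the environment-variable approach; the DOCUMIND_API_KEY variable name is an arbitrary choice for this example, not something the API requires.
import os

# Set the key outside the codebase, e.g. in your shell, CI secrets, or secret manager:
#   export DOCUMIND_API_KEY="your_api_key"
api_key = os.environ["DOCUMIND_API_KEY"]  # raises KeyError if the variable is missing

processor = BatchDocumentProcessor(api_key=api_key, max_workers=5)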