Overview
Learn how to process large volumes of documents efficiently using parallel processing, queuing, and optimization strategies.
Architecture
Documents Queue → Workers (Parallel) → Results Database
      │                 │
      │                 ├→ Worker 1: Upload + Extract
      │                 ├→ Worker 2: Upload + Extract
      │                 └→ Worker 3: Upload + Extract
      │
      └→ Monitor: Credits, Errors, Progress
Step 1: Set Up Worker Pool
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

logger = logging.getLogger(__name__)

class BatchProcessor:
    def __init__(self, api_key: str, num_workers: int = 5):
        self.api_key = api_key
        self.base_url = "https://api.documind.cloud/api/v1"
        self.headers = {"X-API-Key": api_key}
        self.num_workers = num_workers

        # Tracking
        self.total_processed = 0
        self.total_failed = 0
        self.lock = threading.Lock()

    def process_batch(self, file_paths: list, schema: dict) -> list:
        """Process a batch of documents in parallel"""
        results = []

        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            futures = {
                executor.submit(self._process_single, path, schema): path
                for path in file_paths
            }

            for future in as_completed(futures):
                file_path = futures[future]
                try:
                    result = future.result()
                    results.append(result)
                    with self.lock:
                        self.total_processed += 1
                    logger.info(f"✓ Processed {file_path}")
                except Exception as e:
                    with self.lock:
                        self.total_failed += 1
                    logger.error(f"✗ Failed {file_path}: {e}")
                    results.append({
                        "file": file_path,
                        "success": False,
                        "error": str(e)
                    })

        return results

    def _process_single(self, file_path: str, schema: dict) -> dict:
        """Process a single document"""
        # Upload, then extract. _upload and _extract wrap the upload and
        # extraction endpoints (see the rate-limited _upload in Step 4).
        document_id = self._upload(file_path)
        result = self._extract(document_id, schema)

        return {
            "file": file_path,
            "document_id": document_id,
            "success": True,
            "data": result["results"]
        }
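With the pool defined, a batch run is just instantiation plus a single call. A minimal usage sketch; the API key, file paths, and schema shape shown here are placeholders:

# Usage sketch with placeholder values
processor = BatchProcessor(api_key="YOUR_API_KEY", num_workers=5)
schema = {"fields": [{"name": "invoice_number", "type": "string"}]}  # placeholder schema

results = processor.process_batch(
    ["invoices/inv-001.pdf", "invoices/inv-002.pdf"],
    schema
)
print(f"Processed: {processor.total_processed}, failed: {processor.total_failed}")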
Step 2: Implement Queue-Based Processing
For very large batches, use a queue:

import queue
import time

def worker(work_queue: queue.Queue, results_queue: queue.Queue, processor: BatchProcessor, schema: dict):
    """Worker thread that processes documents from the queue"""
    while True:
        try:
            file_path = work_queue.get(timeout=1)
            try:
                result = processor._process_single(file_path, schema)
                results_queue.put(result)
            except Exception as e:
                results_queue.put({
                    "file": file_path,
                    "success": False,
                    "error": str(e)
                })
            finally:
                work_queue.task_done()
        except queue.Empty:
            break

def process_large_batch(file_paths: list, schema: dict, num_workers: int = 10):
    """Process a large batch using a queue"""
    work_queue = queue.Queue()
    results_queue = queue.Queue()

    # Add files to the work queue
    for path in file_paths:
        work_queue.put(path)

    # Start workers
    threads = []
    processor = BatchProcessor(api_key)
    for _ in range(num_workers):
        t = threading.Thread(
            target=worker,
            args=(work_queue, results_queue, processor, schema)
        )
        t.start()
        threads.append(t)

    # Wait for completion
    work_queue.join()

    # Collect results
    results = []
    while not results_queue.empty():
        results.append(results_queue.get())

    return results
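A quick usage sketch for the queue-based path, assuming api_key and schema are defined as in the earlier examples:

# Usage sketch: drain a folder of PDFs through the queue-based workers.
from pathlib import Path

pdf_paths = [str(p) for p in Path("documents/pending").glob("*.pdf")]
results = process_large_batch(pdf_paths, schema, num_workers=10)
print(f"Completed {len(results)} documents")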
Step 3: Add Progress Tracking
from tqdm import tqdm

class ProgressTracker:
    def __init__(self, total: int):
        self.pbar = tqdm(total=total, desc="Processing")
        self.successful = 0
        self.failed = 0
        self.lock = threading.Lock()

    def update(self, success: bool):
        with self.lock:
            if success:
                self.successful += 1
            else:
                self.failed += 1

            self.pbar.update(1)
            self.pbar.set_postfix({
                'success': self.successful,
                'failed': self.failed
            })

    def close(self):
        self.pbar.close()

# Usage
tracker = ProgressTracker(total=len(file_paths))

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {
        executor.submit(process_document, path): path
        for path in file_paths
    }

    for future in as_completed(futures):
        try:
            result = future.result()
            tracker.update(success=True)
        except Exception:
            tracker.update(success=False)

tracker.close()
Step 4: Implement Rate Limiting
from ratelimit import limits, sleep_and_retry

class RateLimitedProcessor(BatchProcessor):
    # Limit to 10 requests per second
    @sleep_and_retry
    @limits(calls=10, period=1)
    def _api_call(self, method: str, url: str, **kwargs):
        """Rate-limited API call"""
        if method == "POST":
            return requests.post(url, **kwargs)
        elif method == "GET":
            return requests.get(url, **kwargs)

    def _upload(self, file_path: str) -> str:
        """Upload with rate limiting"""
        with open(file_path, "rb") as f:
            files = {"files": f}
            response = self._api_call(
                "POST",
                f"{self.base_url}/upload",
                headers=self.headers,
                files=files
            )
        return response.json()[0]
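Usage is unchanged: instantiate the rate-limited subclass and call process_batch as before. A brief sketch with placeholder values:

# Usage sketch: same interface as BatchProcessor, but API calls are throttled
processor = RateLimitedProcessor(api_key="YOUR_API_KEY", num_workers=5)
results = processor.process_batch(file_paths, schema)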
Step 5: Handle Credits Management
def check_credits_before_batch(file_paths: list, credits_per_page: int = 2):
    """Estimate and check credits before processing"""
    # Estimate total pages
    total_pages = sum(get_page_count(path) for path in file_paths)
    estimated_cost = total_pages * credits_per_page

    # Check available credits
    response = requests.get(
        f"{BASE_URL}/usage/credits",
        headers=headers
    )
    available = response.json()["available_credits"]

    if available < estimated_cost:
        raise ValueError(
            f"Insufficient credits. Need {estimated_cost}, have {available}"
        )

    logger.info(f"Estimated cost: {estimated_cost} credits ({available} available)")
    return True

# Usage
try:
    check_credits_before_batch(file_paths)
    results = processor.process_batch(file_paths, schema)
except ValueError as e:
    logger.error(f"Cannot process batch: {e}")
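The estimate above relies on a get_page_count helper that is not part of the API. A minimal sketch, assuming the files are local PDFs and using the pypdf package:

# Hypothetical helper assumed by check_credits_before_batch, sketched with pypdf
from pypdf import PdfReader

def get_page_count(file_path) -> int:
    """Return the number of pages in a local PDF"""
    return len(PdfReader(file_path).pages)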
Step 6: Implement Retry Logic
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class ResilientProcessor(BatchProcessor):
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((requests.exceptions.HTTPError, requests.exceptions.Timeout)),
        reraise=True
    )
    def _process_with_retry(self, file_path: str, schema: dict):
        """Process with automatic retry"""
        return self._process_single(file_path, schema)
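A brief usage sketch. Note that requests raises HTTPError only if raise_for_status() is called inside the upload and extract helpers, so make sure those helpers surface failures if you want them retried:

# Usage sketch: transient HTTP errors and timeouts are retried up to 3 times
# with exponential backoff before the exception is re-raised.
processor = ResilientProcessor(api_key="YOUR_API_KEY", num_workers=10)
result = processor._process_with_retry("documents/pending/invoice-001.pdf", schema)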
Complete Example: Daily Batch Job
import time
import schedule
from pathlib import Path
from datetime import datetime

def daily_batch_job():
    """Process all pending documents daily"""
    logger.info("=== Starting daily batch job ===")

    # Find pending files
    pending_dir = Path("documents/pending")
    file_paths = list(pending_dir.glob("*.pdf"))

    if not file_paths:
        logger.info("No documents to process")
        return

    logger.info(f"Found {len(file_paths)} documents")

    # Check credits
    try:
        check_credits_before_batch(file_paths)
    except ValueError as e:
        logger.error(f"Insufficient credits: {e}")
        send_alert("Batch job failed: insufficient credits")
        return

    # Process batch
    processor = ResilientProcessor(api_key, num_workers=10)
    tracker = ProgressTracker(total=len(file_paths))
    results = []

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(processor._process_with_retry, str(path), schema): path
            for path in file_paths
        }

        for future in as_completed(futures):
            file_path = futures[future]
            try:
                result = future.result()
                results.append(result)
                tracker.update(success=True)

                # Move to processed folder
                file_path.rename(Path("documents/processed") / file_path.name)
            except Exception as e:
                logger.error(f"Failed {file_path}: {e}")
                tracker.update(success=False)

                # Move to failed folder
                file_path.rename(Path("documents/failed") / file_path.name)

    tracker.close()

    # Generate report
    successful = sum(1 for r in results if r.get("success"))
    report = f"""
Batch Job Report - {datetime.now().strftime('%Y-%m-%d %H:%M')}
========================================
Total Documents: {len(file_paths)}
Successful: {successful}
Failed: {len(file_paths) - successful}
Success Rate: {successful / len(file_paths) * 100:.1f}%
"""
    logger.info(report)
    send_report(report)

# Schedule to run daily at 2 AM
schedule.every().day.at("02:00").do(daily_batch_job)

while True:
    schedule.run_pending()
    time.sleep(60)
Optimization Strategies
1. Batch Upload
Upload multiple files at once:

# Upload in one request
files = [("files", open(path, "rb")) for path in file_paths[:10]]
response = requests.post(f"{BASE_URL}/upload", headers=headers, files=files)
document_ids = response.json()

# Close files
for f in files:
    f[1].close()
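For more than ten files, the same pattern can be wrapped in a helper that uploads in fixed-size chunks and closes file handles even when a request fails. A sketch; the chunk size of 10 mirrors the example above and is an assumption, not a documented limit:

from contextlib import ExitStack

def upload_in_chunks(file_paths: list, chunk_size: int = 10) -> list:
    """Upload files in fixed-size chunks and return all document IDs"""
    document_ids = []
    for start in range(0, len(file_paths), chunk_size):
        chunk = file_paths[start:start + chunk_size]
        with ExitStack() as stack:
            # ExitStack closes every handle when the block exits, even on error
            files = [("files", stack.enter_context(open(path, "rb"))) for path in chunk]
            response = requests.post(f"{BASE_URL}/upload", headers=headers, files=files)
            response.raise_for_status()
            document_ids.extend(response.json())
    return document_ids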
2. Parallel Extraction
# Extract from multiple documents simultaneously
with ThreadPoolExecutor(max_workers=5) as executor:
    extraction_futures = {
        executor.submit(extract_document, doc_id, schema): doc_id
        for doc_id in document_ids
    }

    extraction_results = []
    for future in as_completed(extraction_futures):
        extraction_results.append(future.result())
3. Smart Batching
Group documents by type:

def group_by_type(file_paths: list) -> dict:
    """Group files by document type"""
    groups = {"invoices": [], "receipts": [], "forms": []}

    for path in file_paths:
        doc_type = classify_document(path)  # Your classification logic
        groups[doc_type].append(path)

    return groups

# Process each type with the appropriate schema
groups = group_by_type(file_paths)

for doc_type, paths in groups.items():
    schema = get_schema_for_type(doc_type)
    results = process_batch(paths, schema)
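classify_document and get_schema_for_type stand in for your own logic. A purely illustrative, filename-based sketch of the classifier:

# Hypothetical classifier: route by filename keywords, default to forms
def classify_document(path) -> str:
    name = str(path).lower()
    if "invoice" in name:
        return "invoices"
    if "receipt" in name:
        return "receipts"
    return "forms"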
Monitoring Dashboard
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/status')
def get_status():
    return jsonify({
        "total_processed": processor.total_processed,
        "total_failed": processor.total_failed,
        "queue_size": work_queue.qsize(),
        "workers_active": sum(1 for t in threads if t.is_alive())
    })

# Run dashboard
app.run(port=8080)
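Once the dashboard runs alongside the workers, any HTTP client can poll it, for example:

# Poll the local dashboard while a batch is running
import requests

print(requests.get("http://localhost:8080/status").json())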
Next Steps
Performance Optimization: Optimize speed and performance
Error Handling: Handle failures gracefully
Invoice Processing: Batch process invoices
API Reference: Explore batch endpoints