Overview
Learn how to process large volumes of documents efficiently using parallel processing, queuing, and optimization strategies.
Architecture
Documents Queue → Workers (Parallel) → Results Database
      │                │
      │                ├→ Worker 1: Upload + Extract
      │                ├→ Worker 2: Upload + Extract
      │                └→ Worker 3: Upload + Extract
      │
      └→ Monitor: Credits, Errors, Progress
Step 1: Set Up Worker Pool
import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)

class BatchProcessor:
    def __init__(self, api_key: str, num_workers: int = 5):
        self.api_key = api_key
        self.base_url = "https://api.documind.cloud/api/v1"
        self.headers = {"X-API-Key": api_key}
        self.num_workers = num_workers

        # Tracking
        self.total_processed = 0
        self.total_failed = 0
        self.lock = threading.Lock()

    def process_batch(self, file_paths: list, schema: dict) -> list:
        """Process a batch of documents in parallel."""
        results = []

        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            futures = {
                executor.submit(self._process_single, path, schema): path
                for path in file_paths
            }

            for future in as_completed(futures):
                file_path = futures[future]
                try:
                    result = future.result()
                    results.append(result)
                    with self.lock:
                        self.total_processed += 1
                    logger.info(f"✓ Processed {file_path}")
                except Exception as e:
                    with self.lock:
                        self.total_failed += 1
                    logger.error(f"✗ Failed {file_path}: {e}")
                    results.append({
                        "file": file_path,
                        "success": False,
                        "error": str(e)
                    })

        return results

    def _process_single(self, file_path: str, schema: dict) -> dict:
        """Process a single document: upload, then extract."""
        # _upload and _extract wrap the upload and extraction endpoints
        # (_upload is shown in Step 4; _extract follows the same pattern)
        document_id = self._upload(file_path)
        result = self._extract(document_id, schema)

        return {
            "file": file_path,
            "document_id": document_id,
            "success": True,
            "data": result["results"]
        }
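For reference, a minimal usage sketch. The schema shape and file names below are placeholders rather than the API's required format; substitute your own extraction schema:
# Hypothetical usage: process a couple of invoices with a placeholder schema
processor = BatchProcessor(api_key="your-api-key", num_workers=5)
invoice_schema = {"fields": ["invoice_number", "total"]}  # placeholder, not the real schema format
results = processor.process_batch(["inv_001.pdf", "inv_002.pdf"], invoice_schema)
print(f"Processed {processor.total_processed}, failed {processor.total_failed}")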
Step 2: Implement Queue-Based Processing
For very large batches, use a work queue:
import queue
import threading
import time

def worker(work_queue: queue.Queue, results_queue: queue.Queue, processor: BatchProcessor, schema: dict):
    """Worker thread that processes documents from the queue."""
    while True:
        try:
            file_path = work_queue.get(timeout=1)
            try:
                result = processor._process_single(file_path, schema)
                results_queue.put(result)
            except Exception as e:
                results_queue.put({
                    "file": file_path,
                    "success": False,
                    "error": str(e)
                })
            finally:
                work_queue.task_done()
        except queue.Empty:
            break

def process_large_batch(file_paths: list, schema: dict, num_workers: int = 10):
    """Process a large batch using a work queue."""
    work_queue = queue.Queue()
    results_queue = queue.Queue()

    # Add files to the work queue
    for path in file_paths:
        work_queue.put(path)

    # Start workers (api_key is defined at module level)
    threads = []
    processor = BatchProcessor(api_key)

    for _ in range(num_workers):
        t = threading.Thread(
            target=worker,
            args=(work_queue, results_queue, processor, schema)
        )
        t.start()
        threads.append(t)

    # Wait for all queued work to finish
    work_queue.join()

    # Collect results
    results = []
    while not results_queue.empty():
        results.append(results_queue.get())

    return results
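Calling it is then a one-liner. A usage sketch, assuming api_key and schema are defined at module level as in the snippet above:
# Process a large file list with 10 worker threads
results = process_large_batch(file_paths, schema, num_workers=10)
succeeded = [r for r in results if r.get("success")]
print(f"{len(succeeded)}/{len(results)} documents extracted")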
Step 3: Add Progress Tracking
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

class ProgressTracker:
    def __init__(self, total: int):
        self.pbar = tqdm(total=total, desc="Processing")
        self.successful = 0
        self.failed = 0
        self.lock = threading.Lock()

    def update(self, success: bool):
        with self.lock:
            if success:
                self.successful += 1
            else:
                self.failed += 1
            self.pbar.update(1)
            self.pbar.set_postfix({
                'success': self.successful,
                'failed': self.failed
            })

    def close(self):
        self.pbar.close()

# Usage
tracker = ProgressTracker(total=len(file_paths))

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {
        executor.submit(process_document, path): path  # process_document: your per-file handler
        for path in file_paths
    }

    for future in as_completed(futures):
        try:
            result = future.result()
            tracker.update(success=True)
        except Exception:
            tracker.update(success=False)

tracker.close()
Step 4: Implement Rate Limiting
import requests
from ratelimit import limits, sleep_and_retry

class RateLimitedProcessor(BatchProcessor):
    # Limit to 10 requests per second
    @sleep_and_retry
    @limits(calls=10, period=1)
    def _api_call(self, method: str, url: str, **kwargs):
        """Rate-limited API call."""
        if method == "POST":
            return requests.post(url, **kwargs)
        elif method == "GET":
            return requests.get(url, **kwargs)

    def _upload(self, file_path: str) -> str:
        """Upload with rate limiting."""
        with open(file_path, "rb") as f:
            files = {"files": f}
            response = self._api_call(
                "POST",
                f"{self.base_url}/upload",
                headers=self.headers,
                files=files
            )
        return response.json()[0]
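RateLimitedProcessor is a drop-in replacement for BatchProcessor, so calling code only needs to swap the class. A sketch (assuming an _extract method implemented along the same lines as _upload):
processor = RateLimitedProcessor(api_key, num_workers=5)
# process_batch is inherited; uploads now go through the rate-limited _api_call
results = processor.process_batch(file_paths, schema)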
Step 5: Manage Credits
def check_credits_before_batch(file_paths: list, credits_per_page: int = 2):
    """Estimate and check credits before processing."""
    # Estimate total pages (get_page_count is your own page-counting helper)
    total_pages = sum(get_page_count(path) for path in file_paths)
    estimated_cost = total_pages * credits_per_page

    # Check available credits
    response = requests.get(
        f"{BASE_URL}/usage/credits",
        headers=headers
    )
    available = response.json()["available_credits"]

    if available < estimated_cost:
        raise ValueError(
            f"Insufficient credits. Need {estimated_cost}, have {available}"
        )

    logger.info(f"Estimated cost: {estimated_cost} credits ({available} available)")
    return True

# Usage
try:
    check_credits_before_batch(file_paths)
    results = processor.process_batch(file_paths, schema)
except ValueError as e:
    logger.error(f"Cannot process batch: {e}")
Step 6: Implement Retry Logic
import requests
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class ResilientProcessor(BatchProcessor):
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((requests.exceptions.HTTPError, requests.exceptions.Timeout)),
        reraise=True
    )
    def _process_with_retry(self, file_path: str, schema: dict):
        """Process a document with automatic retry on transient HTTP errors."""
        return self._process_single(file_path, schema)
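With reraise=True, a document that still fails after three attempts raises its original exception, so callers handle failures exactly as before. A usage sketch:
processor = ResilientProcessor(api_key, num_workers=10)
try:
    result = processor._process_with_retry("invoice.pdf", schema)
except requests.exceptions.RequestException as e:
    logger.error(f"Gave up after 3 attempts: {e}")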
Complete Example: Daily Batch Job
import time
import schedule
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from pathlib import Path

def daily_batch_job():
    """Process all pending documents daily."""
    logger.info("=== Starting daily batch job ===")

    # Find pending files
    pending_dir = Path("documents/pending")
    file_paths = list(pending_dir.glob("*.pdf"))

    if not file_paths:
        logger.info("No documents to process")
        return

    logger.info(f"Found {len(file_paths)} documents")

    # Check credits
    try:
        check_credits_before_batch(file_paths)
    except ValueError as e:
        logger.error(f"Insufficient credits: {e}")
        send_alert("Batch job failed: insufficient credits")  # send_alert: your notification helper
        return

    # Process batch
    processor = ResilientProcessor(api_key, num_workers=10)
    tracker = ProgressTracker(total=len(file_paths))
    results = []

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(processor._process_with_retry, str(path), schema): path
            for path in file_paths
        }

        for future in as_completed(futures):
            file_path = futures[future]
            try:
                result = future.result()
                results.append(result)
                tracker.update(success=True)

                # Move to processed folder
                file_path.rename(Path("documents/processed") / file_path.name)
            except Exception as e:
                logger.error(f"Failed {file_path}: {e}")
                tracker.update(success=False)

                # Move to failed folder
                file_path.rename(Path("documents/failed") / file_path.name)

    tracker.close()

    # Generate report
    successful = sum(1 for r in results if r.get("success"))
    report = f"""
Batch Job Report - {datetime.now().strftime('%Y-%m-%d %H:%M')}
========================================
Total Documents: {len(file_paths)}
Successful: {successful}
Failed: {len(file_paths) - successful}
Success Rate: {successful / len(file_paths) * 100:.1f}%
"""
    logger.info(report)
    send_report(report)  # send_report: your reporting helper

# Schedule to run daily at 2 AM
schedule.every().day.at("02:00").do(daily_batch_job)

while True:
    schedule.run_pending()
    time.sleep(60)
Optimization Strategies
1. Batch Upload
Upload multiple files at once:
# Upload up to 10 files in one request
files = [("files", open(path, "rb")) for path in file_paths[:10]]
response = requests.post(f"{BASE_URL}/upload", headers=headers, files=files)
document_ids = response.json()

# Close the file handles
for _, f in files:
    f.close()
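If the request raises before the cleanup loop runs, the handles above are never closed. A variant using contextlib.ExitStack sends the same request but guarantees the files are closed:
from contextlib import ExitStack

with ExitStack() as stack:
    files = [("files", stack.enter_context(open(path, "rb"))) for path in file_paths[:10]]
    response = requests.post(f"{BASE_URL}/upload", headers=headers, files=files)

document_ids = response.json()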
2. Parallel Extraction
# Extract from multiple documents simultaneously
with ThreadPoolExecutor(max_workers=5) as executor:
    extraction_futures = {
        executor.submit(extract_document, doc_id, schema): doc_id
        for doc_id in document_ids
    }

    extraction_results = []
    for future in as_completed(extraction_futures):
        extraction_results.append(future.result())
3. Smart Batching
Group documents by type:
def group_by_type(file_paths: list) -> dict:
    """Group files by document type."""
    groups = {"invoices": [], "receipts": [], "forms": []}

    for path in file_paths:
        doc_type = classify_document(path)  # Your classification logic
        groups[doc_type].append(path)

    return groups

# Process each type with its own schema
groups = group_by_type(file_paths)

for doc_type, paths in groups.items():
    schema = get_schema_for_type(doc_type)
    results = processor.process_batch(paths, schema)
Monitoring Dashboard
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/status')
def get_status():
    # processor, work_queue, and threads come from the queue-based setup in Step 2
    return jsonify({
        "total_processed": processor.total_processed,
        "total_failed": processor.total_failed,
        "queue_size": work_queue.qsize(),
        "workers_active": sum(1 for t in threads if t.is_alive())
    })

# Run the dashboard
app.run(port=8080)
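app.run() blocks the calling thread, so for a quick local dashboard you can instead start it in a daemon thread alongside the workers (a sketch; use a proper WSGI server in production):
import threading

dashboard = threading.Thread(
    target=lambda: app.run(port=8080, use_reloader=False),
    daemon=True
)
dashboard.start()
# The main thread stays free to run the batch job and queue workers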