Overview

Learn how to process large volumes of documents efficiently using parallel processing, queuing, and optimization strategies.

Architecture

Documents Queue → Workers (Parallel) → Results Database
      │                  │
      │                  ├→ Worker 1: Upload + Extract
      │                  ├→ Worker 2: Upload + Extract
      │                  └→ Worker 3: Upload + Extract
      │
      └→ Monitor: Credits, Errors, Progress

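The snippets in this guide assume a small amount of shared setup along these lines (the environment variable name is a placeholder; adjust the paths to your layout):
import logging
import os
import requests
from pathlib import Path

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

api_key = os.environ.get("DOCUMIND_API_KEY", "")  # placeholder variable name
BASE_URL = "https://api.documind.cloud/api/v1"
headers = {"X-API-Key": api_key}

schema = {}  # your extraction schema for the document type being processed
file_paths = [str(p) for p in Path("documents/pending").glob("*.pdf")]
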
Step 1: Set Up Worker Pool

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

class BatchProcessor:
    def __init__(self, api_key: str, num_workers: int = 5):
        self.api_key = api_key
        self.base_url = "https://api.documind.cloud/api/v1"
        self.headers = {"X-API-Key": api_key}
        self.num_workers = num_workers
        
        # Tracking
        self.total_processed = 0
        self.total_failed = 0
        self.lock = threading.Lock()
    
    def process_batch(self, file_paths: list, schema: dict) -> list:
        """Process batch of documents in parallel"""
        results = []
        
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            futures = {
                executor.submit(self._process_single, path, schema): path
                for path in file_paths
            }
            
            for future in as_completed(futures):
                file_path = futures[future]
                try:
                    result = future.result()
                    results.append(result)
                    
                    with self.lock:
                        self.total_processed += 1
                    
                    logger.info(f"✓ Processed {file_path}")
                
                except Exception as e:
                    with self.lock:
                        self.total_failed += 1
                    
                    logger.error(f"✗ Failed {file_path}: {e}")
                    results.append({
                        "file": file_path,
                        "success": False,
                        "error": str(e)
                    })
        
        return results
    
    def _process_single(self, file_path: str, schema: dict) -> dict:
        """Process a single document"""
        # Upload
        document_id = self._upload(file_path)
        
        # Extract
        result = self._extract(document_id, schema)
        
        return {
            "file": file_path,
            "document_id": document_id,
            "success": True,
            "data": result["results"]
        }
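
The _upload and _extract helpers called above are not defined in this guide (Step 4 adds a rate-limited _upload). A minimal sketch of both methods for BatchProcessor, reusing the /upload endpoint shown later and assuming a hypothetical /extract endpoint that accepts a document ID and schema (check the API reference for the real extraction path and payload):
    def _upload(self, file_path: str) -> str:
        """Upload a single file and return its document ID"""
        with open(file_path, "rb") as f:
            response = requests.post(
                f"{self.base_url}/upload",
                headers=self.headers,
                files={"files": f}
            )
        response.raise_for_status()
        return response.json()[0]

    def _extract(self, document_id: str, schema: dict) -> dict:
        """Run schema extraction on an uploaded document (endpoint path is an assumption)"""
        response = requests.post(
            f"{self.base_url}/extract",  # hypothetical path
            headers=self.headers,
            json={"document_id": document_id, "schema": schema}
        )
        response.raise_for_status()
        return response.json()

With those in place, a batch run looks like:
processor = BatchProcessor(api_key, num_workers=5)
results = processor.process_batch(file_paths, schema)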

Step 2: Implement Queue-Based Processing

For very large batches, use a queue:
import queue
import time

def worker(work_queue: queue.Queue, results_queue: queue.Queue, processor: BatchProcessor, schema: dict):
    """Worker thread that processes documents from queue"""
    while True:
        try:
            file_path = work_queue.get(timeout=1)
            
            try:
                result = processor._process_single(file_path, schema)
                results_queue.put(result)
            except Exception as e:
                results_queue.put({
                    "file": file_path,
                    "success": False,
                    "error": str(e)
                })
            finally:
                work_queue.task_done()
        
        except queue.Empty:
            break

def process_large_batch(file_paths: list, schema: dict, num_workers: int = 10):
    """Process large batch using queue"""
    work_queue = queue.Queue()
    results_queue = queue.Queue()
    
    # Add files to work queue
    for path in file_paths:
        work_queue.put(path)
    
    # Start workers
    threads = []
    processor = BatchProcessor(api_key)
    
    for _ in range(num_workers):
        t = threading.Thread(
            target=worker,
            args=(work_queue, results_queue, processor, schema)
        )
        t.start()
        threads.append(t)
    
    # Wait for completion
    work_queue.join()
    
    # Collect results
    results = []
    while not results_queue.empty():
        results.append(results_queue.get())
    
    return results
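
Usage, with file_paths, schema, and api_key from the shared setup:
results = process_large_batch(file_paths, schema, num_workers=10)
logger.info(f"Collected {len(results)} results")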

Step 3: Add Progress Tracking

from tqdm import tqdm

class ProgressTracker:
    def __init__(self, total: int):
        self.pbar = tqdm(total=total, desc="Processing")
        self.successful = 0
        self.failed = 0
        self.lock = threading.Lock()
    
    def update(self, success: bool):
        with self.lock:
            if success:
                self.successful += 1
            else:
                self.failed += 1
            
            self.pbar.update(1)
            self.pbar.set_postfix({
                'success': self.successful,
                'failed': self.failed
            })
    
    def close(self):
        self.pbar.close()

# Usage with the BatchProcessor from Step 1
processor = BatchProcessor(api_key)
tracker = ProgressTracker(total=len(file_paths))

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {
        executor.submit(processor._process_single, path, schema): path
        for path in file_paths
    }
    
    for future in as_completed(futures):
        try:
            result = future.result()
            tracker.update(success=True)
        except Exception:
            tracker.update(success=False)

tracker.close()

Step 4: Implement Rate Limiting

from ratelimit import limits, sleep_and_retry

class RateLimitedProcessor(BatchProcessor):
    # Limit to 10 requests per second
    @sleep_and_retry
    @limits(calls=10, period=1)
    def _api_call(self, method: str, url: str, **kwargs):
        """Rate-limited API call"""
        if method == "POST":
            return requests.post(url, **kwargs)
        elif method == "GET":
            return requests.get(url, **kwargs)
        raise ValueError(f"Unsupported HTTP method: {method}")
    
    def _upload(self, file_path: str) -> str:
        """Upload with rate limiting"""
        with open(file_path, "rb") as f:
            files = {"files": f}
            response = self._api_call(
                "POST",
                f"{self.base_url}/upload",
                headers=self.headers,
                files=files
            )
        return response.json()[0]
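
Only requests routed through _api_call are throttled, so any _extract you add should call it as well. Usage is otherwise unchanged:
processor = RateLimitedProcessor(api_key, num_workers=5)
results = processor.process_batch(file_paths, schema)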

Step 5: Handle Credits Management

def check_credits_before_batch(file_paths: list, credits_per_page: int = 2):
    """Estimate and check credits before processing"""
    # Estimate total pages
    total_pages = sum(get_page_count(path) for path in file_paths)
    estimated_cost = total_pages * credits_per_page
    
    # Check available credits
    response = requests.get(
        f"{BASE_URL}/usage/credits",
        headers=headers
    )
    available = response.json()["available_credits"]
    
    if available < estimated_cost:
        raise ValueError(
            f"Insufficient credits. Need {estimated_cost}, have {available}"
        )
    
    logger.info(f"Estimated cost: {estimated_cost} credits ({available} available)")
    return True

# Usage
try:
    check_credits_before_batch(file_paths)
    results = processor.process_batch(file_paths, schema)
except ValueError as e:
    logger.error(f"Cannot process batch: {e}")
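
check_credits_before_batch relies on a get_page_count helper that is not defined here; a minimal sketch for PDF inputs, assuming the pypdf package is installed:
from pypdf import PdfReader

def get_page_count(file_path: str) -> int:
    """Return the number of pages in a PDF"""
    return len(PdfReader(str(file_path)).pages)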

Step 6: Implement Retry Logic

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class ResilientProcessor(BatchProcessor):
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((requests.exceptions.HTTPError, requests.exceptions.Timeout)),
        reraise=True
    )
    def _process_with_retry(self, file_path: str, schema: dict):
        """Process with automatic retry"""
        return self._process_single(file_path, schema)

Complete Example: Daily Batch Job

import schedule
from pathlib import Path
from datetime import datetime

def daily_batch_job():
    """Process all pending documents daily"""
    logger.info("=== Starting daily batch job ===")
    
    # Find pending files
    pending_dir = Path("documents/pending")
    file_paths = list(pending_dir.glob("*.pdf"))

    # Make sure the destination folders exist before moving files into them
    Path("documents/processed").mkdir(parents=True, exist_ok=True)
    Path("documents/failed").mkdir(parents=True, exist_ok=True)
    
    if not file_paths:
        logger.info("No documents to process")
        return
    
    logger.info(f"Found {len(file_paths)} documents")
    
    # Check credits
    try:
        check_credits_before_batch(file_paths)
    except ValueError as e:
        logger.error(f"Insufficient credits: {e}")
        send_alert("Batch job failed: insufficient credits")
        return
    
    # Process batch
    processor = ResilientProcessor(api_key, num_workers=10)
    tracker = ProgressTracker(total=len(file_paths))
    
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(processor._process_with_retry, str(path), schema): path
            for path in file_paths
        }
        
        for future in as_completed(futures):
            file_path = futures[future]
            try:
                result = future.result()
                results.append(result)
                tracker.update(success=True)
                
                # Move to processed folder
                file_path.rename(Path("documents/processed") / file_path.name)
            
            except Exception as e:
                logger.error(f"Failed {file_path}: {e}")
                tracker.update(success=False)
                
                # Move to failed folder
                file_path.rename(Path("documents/failed") / file_path.name)
    
    tracker.close()
    
    # Generate report
    successful = sum(1 for r in results if r.get("success"))
    report = f"""
    Batch Job Report - {datetime.now().strftime('%Y-%m-%d %H:%M')}
    ========================================
    Total Documents: {len(file_paths)}
    Successful: {successful}
    Failed: {len(file_paths) - successful}
    Success Rate: {successful/len(file_paths)*100:.1f}%
    """
    
    logger.info(report)
    send_report(report)

# Schedule to run daily at 2 AM
schedule.every().day.at("02:00").do(daily_batch_job)

while True:
    schedule.run_pending()
    time.sleep(60)

Optimization Strategies

1. Batch Upload

Upload multiple files at once:
# Upload in one request
files = [("files", open(path, "rb")) for path in file_paths[:10]]
response = requests.post(f"{BASE_URL}/upload", headers=headers, files=files)
document_ids = response.json()

# Close files
for f in files:
    f[1].close()
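
The example above only uploads the first 10 files. To cover a whole batch, one option is to upload in fixed-size chunks (a sketch, assuming the endpoint accepts multiple files per request as shown above):
def upload_in_chunks(file_paths: list, chunk_size: int = 10) -> list:
    """Upload files in groups and collect all document IDs"""
    document_ids = []
    for i in range(0, len(file_paths), chunk_size):
        chunk = file_paths[i:i + chunk_size]
        files = [("files", open(path, "rb")) for path in chunk]
        try:
            response = requests.post(f"{BASE_URL}/upload", headers=headers, files=files)
            response.raise_for_status()
            document_ids.extend(response.json())
        finally:
            for _, f in files:
                f.close()
    return document_ids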

2. Parallel Extraction

# Extract from multiple documents simultaneously; extract_document is your per-document extraction call
with ThreadPoolExecutor(max_workers=5) as executor:
    extraction_futures = {
        executor.submit(extract_document, doc_id, schema): doc_id
        for doc_id in document_ids
    }
    
    extraction_results = []
    for future in as_completed(extraction_futures):
        extraction_results.append(future.result())

3. Smart Batching

Group documents by type:
def group_by_type(file_paths: list) -> dict:
    """Group files by document type"""
    groups = {"invoices": [], "receipts": [], "forms": []}
    
    for path in file_paths:
        doc_type = classify_document(path)  # Your classification logic
        groups[doc_type].append(path)
    
    return groups

# Process each type with appropriate schema
groups = group_by_type(file_paths)
for doc_type, paths in groups.items():
    schema = get_schema_for_type(doc_type)
    results = processor.process_batch(paths, schema)
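
classify_document and get_schema_for_type are application-specific. A trivial filename-based placeholder for classify_document (replace with real classification logic):
def classify_document(file_path) -> str:
    """Placeholder classifier: guess the document type from the filename"""
    name = str(file_path).lower()
    if "invoice" in name:
        return "invoices"
    if "receipt" in name:
        return "receipts"
    return "forms"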

Monitoring Dashboard

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/status')
def get_status():
    return jsonify({
        "total_processed": processor.total_processed,
        "total_failed": processor.total_failed,
        "queue_size": work_queue.qsize(),
        "workers_active": sum(1 for t in threads if t.is_alive())
    })

# Run dashboard
app.run(port=8080)
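
app.run() blocks the calling thread, so to serve the dashboard alongside the batch job one option is a daemon thread (a sketch for local monitoring; use a proper WSGI server for anything production-facing):
# Serve the dashboard in the background so the batch job keeps running
dashboard_thread = threading.Thread(
    target=lambda: app.run(port=8080, use_reloader=False),
    daemon=True
)
dashboard_thread.start()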

Next Steps