

Overview

Learn how to process large volumes of documents efficiently using parallel processing, queuing, and optimization strategies.

Architecture

Documents Queue → Workers (Parallel) → Results Database
      │                  │
      │                  ├→ Worker 1: Upload + Extract
      │                  ├→ Worker 2: Upload + Extract
      │                  └→ Worker 3: Upload + Extract
      │
      └→ Monitor: Credits, Errors, Progress

Step 1: Set Up Worker Pool

import logging
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger(__name__)

class BatchProcessor:
    def __init__(self, api_key: str, num_workers: int = 5):
        self.api_key = api_key
        self.base_url = "https://api.documind.cloud/api/v1"
        self.headers = {"X-API-Key": api_key}
        self.num_workers = num_workers
        
        # Thread-safe counters shared by all workers
        self.total_processed = 0
        self.total_failed = 0
        self.lock = threading.Lock()
    
    def process_batch(self, file_paths: list, schema: dict) -> list:
        """Process batch of documents in parallel"""
        results = []
        
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            futures = {
                executor.submit(self._process_single, path, schema): path
                for path in file_paths
            }
            
            for future in as_completed(futures):
                file_path = futures[future]
                try:
                    result = future.result()
                    results.append(result)
                    
                    with self.lock:
                        self.total_processed += 1
                    
                    logger.info(f"✓ Processed {file_path}")
                
                except Exception as e:
                    with self.lock:
                        self.total_failed += 1
                    
                    logger.error(f"✗ Failed {file_path}: {e}")
                    results.append({
                        "file": file_path,
                        "success": False,
                        "error": str(e)
                    })
        
        return results
    
    def _process_single(self, file_path: str, schema: dict) -> dict:
        """Process a single document"""
        # Upload
        document_id = self._upload(file_path)
        
        # Extract
        result = self._extract(document_id, schema)
        
        return {
            "file": file_path,
            "document_id": document_id,
            "success": True,
            "data": result["results"]
        }
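_process_single relies on two helpers, _upload and _extract, that wrap the DocuMind API. A minimal sketch is shown below; the upload call mirrors the rate-limited version in Step 4, while the /extract path, request body, and response shape are assumptions, so check the API Reference for the exact extraction endpoint:

import requests

def _upload(self, file_path: str) -> str:
    """Upload one file and return its document ID"""
    with open(file_path, "rb") as f:
        response = requests.post(
            f"{self.base_url}/upload",
            headers=self.headers,
            files={"files": f}
        )
    response.raise_for_status()
    return response.json()[0]

def _extract(self, document_id: str, schema: dict) -> dict:
    """Run a schema extraction (endpoint path and payload are assumptions)"""
    response = requests.post(
        f"{self.base_url}/extract",
        headers=self.headers,
        json={"document_id": document_id, "schema": schema}
    )
    response.raise_for_status()
    return response.json()

# Attach the helpers to the class above so the snippet stays self-contained
BatchProcessor._upload = _upload
BatchProcessor._extract = _extract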

Step 2: Implement Queue-Based Processing

For very large batches, use a work queue so workers pull files as they become free:
import queue
import threading

def worker(work_queue: queue.Queue, results_queue: queue.Queue, processor: BatchProcessor, schema: dict):
    """Worker thread that processes documents from queue"""
    while True:
        try:
            file_path = work_queue.get(timeout=1)
            
            try:
                result = processor._process_single(file_path, schema)
                results_queue.put(result)
            except Exception as e:
                results_queue.put({
                    "file": file_path,
                    "success": False,
                    "error": str(e)
                })
            finally:
                work_queue.task_done()
        
        except queue.Empty:
            break

def process_large_batch(file_paths: list, schema: dict, api_key: str, num_workers: int = 10):
    """Process a large batch using a work queue"""
    work_queue = queue.Queue()
    results_queue = queue.Queue()
    
    # Add files to work queue
    for path in file_paths:
        work_queue.put(path)
    
    # Start workers
    threads = []
    processor = BatchProcessor(api_key)
    
    for _ in range(num_workers):
        t = threading.Thread(
            target=worker,
            args=(work_queue, results_queue, processor, schema)
        )
        t.start()
        threads.append(t)
    
    # Wait for all queued work to finish, then let the workers exit
    work_queue.join()
    for t in threads:
        t.join()
    
    # Collect results
    results = []
    while not results_queue.empty():
        results.append(results_queue.get())
    
    return results
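A hypothetical call, assuming file_paths, schema, and api_key are already defined:

results = process_large_batch(file_paths, schema, api_key, num_workers=10)
successful = sum(1 for r in results if r.get("success"))
logger.info(f"{successful}/{len(results)} documents processed successfully")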

Step 3: Add Progress Tracking

from tqdm import tqdm

class ProgressTracker:
    def __init__(self, total: int):
        self.pbar = tqdm(total=total, desc="Processing")
        self.successful = 0
        self.failed = 0
        self.lock = threading.Lock()
    
    def update(self, success: bool):
        with self.lock:
            if success:
                self.successful += 1
            else:
                self.failed += 1
            
            self.pbar.update(1)
            self.pbar.set_postfix({
                'success': self.successful,
                'failed': self.failed
            })
    
    def close(self):
        self.pbar.close()

# Usage
processor = BatchProcessor(api_key)
tracker = ProgressTracker(total=len(file_paths))

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {
        executor.submit(processor._process_single, path, schema): path
        for path in file_paths
    }
    
    for future in as_completed(futures):
        try:
            future.result()
            tracker.update(success=True)
        except Exception:
            tracker.update(success=False)

tracker.close()

Step 4: Implement Rate Limiting

from ratelimit import limits, sleep_and_retry

class RateLimitedProcessor(BatchProcessor):
    # Limit to 10 requests per second
    @sleep_and_retry
    @limits(calls=10, period=1)
    def _api_call(self, method: str, url: str, **kwargs):
        """Rate-limited API call"""
        if method == "POST":
            response = requests.post(url, **kwargs)
        elif method == "GET":
            response = requests.get(url, **kwargs)
        else:
            raise ValueError(f"Unsupported method: {method}")
        response.raise_for_status()
        return response
    
    def _upload(self, file_path: str) -> str:
        """Upload with rate limiting"""
        with open(file_path, "rb") as f:
            files = {"files": f}
            response = self._api_call(
                "POST",
                f"{self.base_url}/upload",
                headers=self.headers,
                files=files
            )
        return response.json()[0]

Step 5: Handle Credits Management

def check_credits_before_batch(file_paths: list, credits_per_page: int = 2):
    """Estimate and check credits before processing"""
    # Estimate total pages
    total_pages = sum(get_page_count(path) for path in file_paths)
    estimated_cost = total_pages * credits_per_page
    
    # Check available credits
    response = requests.get(
        f"{BASE_URL}/usage/credits",
        headers=headers
    )
    available = response.json()["available_credits"]
    
    if available < estimated_cost:
        raise ValueError(
            f"Insufficient credits. Need {estimated_cost}, have {available}"
        )
    
    logger.info(f"Estimated cost: {estimated_cost} credits ({available} available)")
    return True

# Usage
processor = BatchProcessor(api_key)
try:
    check_credits_before_batch(file_paths)
    results = processor.process_batch(file_paths, schema)
except ValueError as e:
    logger.error(f"Cannot process batch: {e}")
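check_credits_before_batch calls a get_page_count helper that is not part of the API. A minimal sketch using the pypdf package (the library choice is an assumption; any PDF library works):

from pypdf import PdfReader

def get_page_count(file_path) -> int:
    """Return the page count of a PDF; treat non-PDF files as a single page"""
    if not str(file_path).lower().endswith(".pdf"):
        return 1
    return len(PdfReader(file_path).pages)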

Step 6: Implement Retry Logic

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type

class ResilientProcessor(BatchProcessor):
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        retry=retry_if_exception_type((requests.exceptions.HTTPError, requests.exceptions.Timeout)),
        reraise=True
    )
    def _process_with_retry(self, file_path: str, schema: dict):
        """Process with automatic retry"""
        return self._process_single(file_path, schema)

Complete Example: Daily Batch Job

import schedule
import time
from pathlib import Path
from datetime import datetime

def daily_batch_job():
    """Process all pending documents daily"""
    logger.info("=== Starting daily batch job ===")
    
    # Find pending files
    pending_dir = Path("documents/pending")
    file_paths = list(pending_dir.glob("*.pdf"))
    
    if not file_paths:
        logger.info("No documents to process")
        return
    
    logger.info(f"Found {len(file_paths)} documents")
    
    # Make sure the destination folders exist before files are moved later
    Path("documents/processed").mkdir(parents=True, exist_ok=True)
    Path("documents/failed").mkdir(parents=True, exist_ok=True)
    
    # Check credits
    try:
        check_credits_before_batch(file_paths)
    except ValueError as e:
        logger.error(f"Insufficient credits: {e}")
        send_alert("Batch job failed: insufficient credits")
        return
    
    # Process batch
    processor = ResilientProcessor(api_key, num_workers=10)
    tracker = ProgressTracker(total=len(file_paths))
    
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(processor._process_with_retry, str(path), schema): path
            for path in file_paths
        }
        
        for future in as_completed(futures):
            file_path = futures[future]
            try:
                result = future.result()
                results.append(result)
                tracker.update(success=True)
                
                # Move to processed folder
                file_path.rename(Path("documents/processed") / file_path.name)
            
            except Exception as e:
                logger.error(f"Failed {file_path}: {e}")
                tracker.update(success=False)
                
                # Move to failed folder
                file_path.rename(Path("documents/failed") / file_path.name)
    
    tracker.close()
    
    # Generate report
    successful = sum(1 for r in results if r.get("success"))
    report = f"""
    Batch Job Report - {datetime.now().strftime('%Y-%m-%d %H:%M')}
    ========================================
    Total Documents: {len(file_paths)}
    Successful: {successful}
    Failed: {len(file_paths) - successful}
    Success Rate: {successful/len(file_paths)*100:.1f}%
    """
    
    logger.info(report)
    send_report(report)

# Schedule to run daily at 2 AM
schedule.every().day.at("02:00").do(daily_batch_job)

while True:
    schedule.run_pending()
    time.sleep(60)
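The job above assumes api_key and schema are defined elsewhere (for example, loaded from configuration), along with two notification helpers, send_alert and send_report. A minimal sketch that posts to a webhook of your choosing and always logs locally:

import os
import requests

WEBHOOK_URL = os.environ.get("ALERT_WEBHOOK_URL")  # placeholder: e.g. a Slack or Teams incoming webhook

def send_alert(message: str):
    """Send a short alert; logs locally if no webhook is configured"""
    if WEBHOOK_URL:
        requests.post(WEBHOOK_URL, json={"text": message}, timeout=10)
    logger.warning(message)

def send_report(report: str):
    """Send the end-of-run report"""
    if WEBHOOK_URL:
        requests.post(WEBHOOK_URL, json={"text": report}, timeout=10)
    logger.info(report)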

Optimization Strategies

1. Batch Upload

Upload multiple files at once:
# Upload in one request
files = [("files", open(path, "rb")) for path in file_paths[:10]]
response = requests.post(f"{BASE_URL}/upload", headers=headers, files=files)
document_ids = response.json()

# Close files
for f in files:
    f[1].close()
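If the request fails partway through, the handles opened above are never closed. A variant using contextlib.ExitStack from the standard library closes every file even on errors; it assumes the same BASE_URL and headers globals as the snippet above:

from contextlib import ExitStack

def upload_batch(file_paths: list) -> list:
    """Upload up to 10 files in one request, closing every handle even if the request fails"""
    with ExitStack() as stack:
        files = [
            ("files", stack.enter_context(open(path, "rb")))
            for path in file_paths[:10]
        ]
        response = requests.post(f"{BASE_URL}/upload", headers=headers, files=files)
    response.raise_for_status()
    return response.json()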

2. Parallel Extraction

# Extract from multiple documents simultaneously
with ThreadPoolExecutor(max_workers=5) as executor:
    extraction_futures = {
        executor.submit(extract_document, doc_id, schema): doc_id
        for doc_id in document_ids
    }
    
    extraction_results = []
    for future in as_completed(extraction_futures):
        extraction_results.append(future.result())

3. Smart Batching

Group documents by type:
def group_by_type(file_paths: list) -> dict:
    """Group files by document type"""
    groups = {}
    
    for path in file_paths:
        doc_type = classify_document(path)  # Your classification logic
        groups.setdefault(doc_type, []).append(path)
    
    return groups

# Process each type with its own schema
processor = BatchProcessor(api_key)
groups = group_by_type(file_paths)
for doc_type, paths in groups.items():
    schema = get_schema_for_type(doc_type)  # Your schema lookup
    results = processor.process_batch(paths, schema)
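classify_document and get_schema_for_type are placeholders for your own logic. A purely illustrative, filename-based sketch (the schema dictionaries are empty placeholders; use your real extraction schemas):

# Placeholder schemas -- replace with your real extraction schemas
INVOICE_SCHEMA: dict = {}
RECEIPT_SCHEMA: dict = {}
FORM_SCHEMA: dict = {}

def classify_document(path) -> str:
    """Very rough classification based on the filename"""
    name = str(path).lower()
    if "invoice" in name:
        return "invoices"
    if "receipt" in name:
        return "receipts"
    return "forms"

def get_schema_for_type(doc_type: str) -> dict:
    """Map a document type to its extraction schema"""
    return {
        "invoices": INVOICE_SCHEMA,
        "receipts": RECEIPT_SCHEMA,
        "forms": FORM_SCHEMA,
    }[doc_type]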

Monitoring Dashboard

from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/status')
def get_status():
    # processor, work_queue, and threads come from the queue-based setup in Step 2
    return jsonify({
        "total_processed": processor.total_processed,
        "total_failed": processor.total_failed,
        "queue_size": work_queue.qsize(),
        "workers_active": sum(1 for t in threads if t.is_alive())
    })

# Run the dashboard (this call blocks; run it in a separate process in production)
app.run(port=8080)

Next Steps

Performance Optimization: optimize speed and performance
Error Handling: handle failures gracefully
Invoice Processing: batch process invoices
API Reference: explore batch endpoints