Complete Automation Workflow

This guide shows production-ready patterns for integrating Documind into your automation pipelines. All examples handle the complete workflow, including review polling.
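
Every pattern below follows the same three-step skeleton: upload the file, request an extraction, and poll for reviewed results whenever the response flags needs_review. The condensed sketch below uses the same endpoints and response fields as the full patterns that follow; error handling and batching are omitted for brevity.

import time
import requests

API_KEY = "your_api_key"
BASE_URL = "https://api.documind.com/api/v1"
HEADERS = {"X-API-Key": API_KEY}

def process_one(file_path: str, schema: dict, timeout: int = 600) -> dict:
    """Upload a file, extract data, and wait for review if flagged."""
    # 1. Upload (the endpoint returns a list of document IDs)
    with open(file_path, "rb") as f:
        response = requests.post(f"{BASE_URL}/upload", headers=HEADERS, files={"files": f})
    response.raise_for_status()
    doc_id = response.json()[0]

    # 2. Extract (advanced mode: no model or extraction_mode specified)
    response = requests.post(
        f"{BASE_URL}/extract/{doc_id}",
        headers={**HEADERS, "Content-Type": "application/json"},
        json={"schema": schema, "review_threshold": 85},
    )
    response.raise_for_status()
    result = response.json()
    if not result["needs_review"]:
        return result["results"]

    # 3. Poll until the review completes or the timeout expires
    start = time.time()
    while (time.time() - start) < timeout:
        response = requests.get(
            f"{BASE_URL}/data/extractions",
            headers=HEADERS,
            params={"document_id": doc_id, "limit": 1},
        )
        items = response.json()["items"]
        if items and items[0]["is_reviewed"]:
            return items[0]["reviewed_results"]
        time.sleep(10)
    return None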

Pattern 1: Batch Processing

Process multiple documents with parallel extraction and review handling.
import concurrent.futures
import requests
import time
from typing import List, Dict, Tuple

class BatchDocumentProcessor:
    """Process multiple documents with review handling."""
    
    def __init__(self, api_key: str, max_workers: int = 5):
        self.api_key = api_key
        self.base_url = "https://api.documind.com/api/v1"
        self.headers = {"X-API-Key": api_key}
        self.max_workers = max_workers
    
    def process_batch(
        self,
        file_paths: List[str],
        schema: Dict,
        mode: str = "advanced"
    ) -> List[Tuple[str, Dict]]:
        """
        Process batch of documents in parallel.
        
        Returns:
            List of (filename, extracted_data) tuples
        """
        # Phase 1: Upload all documents
        print(f"📤 Uploading {len(file_paths)} documents...")
        doc_mappings = self._upload_batch(file_paths)
        print(f"✓ All documents uploaded")
        
        # Phase 2: Extract in parallel
        print(f"🔍 Extracting data (using {self.max_workers} workers)...")
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            extract_futures = {
                executor.submit(self._extract_one, doc_id, schema, mode): filename
                for filename, doc_id in doc_mappings.items()
            }
            
            extractions = {}
            for future in concurrent.futures.as_completed(extract_futures):
                filename = extract_futures[future]
                try:
                    result = future.result()
                    extractions[filename] = result
                    print(f"  ✓ {filename}")
                except Exception as e:
                    print(f"  ✗ {filename}: {e}")
                    extractions[filename] = None
        
        # Phase 3: Separate into immediate vs review needed
        immediate_results = []
        needs_review = []
        
        for filename, result in extractions.items():
            if result is None:
                continue
            
            if result["needs_review"]:
                needs_review.append((filename, result["document_id"]))
            else:
                immediate_results.append((filename, result["results"]))
        
        print(f"\n📊 Results:")
        print(f"  ✓ Immediate: {len(immediate_results)}")
        print(f"  ⚠️  Need review: {len(needs_review)}")
        
        # Phase 4: Poll for reviews in parallel
        if needs_review:
            print(f"\n⏳ Waiting for {len(needs_review)} reviews...")
            reviewed_results = self._poll_batch_reviews(
                [doc_id for _, doc_id in needs_review],
                timeout=600
            )
            
            for (filename, doc_id) in needs_review:
                if reviewed_results.get(doc_id):
                    immediate_results.append((filename, reviewed_results[doc_id]))
                    print(f"  ✓ {filename} reviewed")
                else:
                    print(f"  ⚠️  {filename} review timeout")
        
        return immediate_results
    
    def _upload_batch(self, file_paths: List[str]) -> Dict[str, str]:
        """Upload multiple files, return filename -> doc_id mapping."""
        mappings = {}
        
        # Upload in batches of 100 (API limit)
        for i in range(0, len(file_paths), 100):
            batch = file_paths[i:i+100]
            files = [("files", open(f, "rb")) for f in batch]
            
            try:
                response = requests.post(
                    f"{self.base_url}/upload",
                    headers=self.headers,
                    files=files
                )
                response.raise_for_status()
                doc_ids = response.json()
                
                for file_path, doc_id in zip(batch, doc_ids):
                    mappings[file_path] = doc_id
            finally:
                for _, fh in files:
                    fh.close()
        
        return mappings
    
    def _extract_one(self, doc_id: str, schema: Dict, mode: str) -> Dict:
        """Extract data from one document."""
        config = {
            "schema": schema,
            "prompt": "Extract all data accurately"
        }
        
        if mode == "basic":
            config["model"] = "openai-gpt-4.1"
        elif mode == "vlm":
            config["extraction_mode"] = "vlm"
            config["review_threshold"] = 85
        else:  # advanced mode - no model or extraction_mode specified
            config["review_threshold"] = 85
        
        response = requests.post(
            f"{self.base_url}/extract/{doc_id}",
            headers={**self.headers, "Content-Type": "application/json"},
            json=config
        )
        response.raise_for_status()
        return response.json()
    
    def _poll_batch_reviews(
        self,
        doc_ids: List[str],
        timeout: int = 600
    ) -> Dict[str, Dict]:
        """Poll multiple documents for review completion."""
        start = time.time()
        remaining = set(doc_ids)
        results = {}
        
        while remaining and (time.time() - start) < timeout:
            # Query all remaining documents
            response = requests.get(
                f"{self.base_url}/data/extractions",
                headers=self.headers,
                params={
                    "needs_review": True,
                    "limit": len(remaining)
                }
            )
            
            for extraction in response.json()["items"]:
                doc_id = extraction["document_id"]
                
                if doc_id in remaining and extraction["is_reviewed"]:
                    results[doc_id] = extraction["reviewed_results"]
                    remaining.remove(doc_id)
            
            if remaining:
                time.sleep(10)
        
        return results

# Usage
processor = BatchDocumentProcessor(api_key="your_api_key", max_workers=5)

invoice_schema = {
    "named_entities": {
        "invoice_number": {"type": "string"},
        "total_amount": {"type": "number"},
        "vendor_name": {"type": "string"}
    },
    "required": ["invoice_number", "total_amount"]
}

# Process batch
file_paths = [f"invoices/invoice_{i:03d}.pdf" for i in range(1, 21)]
results = processor.process_batch(
    file_paths=file_paths,
    schema=invoice_schema,
    mode="advanced"
)

# Process results
for filename, data in results:
    print(f"Processing {filename}: ${data['total_amount']}")
    update_accounting_system(data)
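
update_accounting_system is a placeholder for your own downstream integration. If you only need the batch output somewhere tabular, here is a minimal sketch that writes the (filename, data) tuples returned by process_batch to a CSV file; the column names follow invoice_schema above.

import csv

def export_results_to_csv(results, output_path="batch_results.csv"):
    """Write (filename, extracted_data) tuples to a CSV file."""
    fieldnames = ["filename", "invoice_number", "vendor_name", "total_amount"]
    with open(output_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for filename, data in results:
            writer.writerow({
                "filename": filename,
                "invoice_number": data.get("invoice_number"),
                "vendor_name": data.get("vendor_name"),
                "total_amount": data.get("total_amount"),
            })

export_results_to_csv(results)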

Pattern 2: Scheduled Job

Daily processing with error handling and retry logic.
import json
import logging
import time
import requests
import schedule
from datetime import datetime
from pathlib import Path
from typing import List

class DailyInvoiceProcessor:
    """Scheduled invoice processing with retry."""
    
    def __init__(self, api_key: str, input_dir: str, output_dir: str):
        self.api_key = api_key
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.error_dir = Path(output_dir) / "errors"
        self.error_dir.mkdir(parents=True, exist_ok=True)
        
        # Setup logging
        logging.basicConfig(
            filename="invoice_processor.log",
            level=logging.INFO,
            format="%(asctime)s - %(levelname)s - %(message)s"
        )
        self.logger = logging.getLogger(__name__)
    
    def run_daily_job(self):
        """Main job: process all PDFs in input directory."""
        self.logger.info("="*50)
        self.logger.info("Starting daily invoice processing")
        
        # Find all PDFs
        pdf_files = list(self.input_dir.glob("*.pdf"))
        self.logger.info(f"Found {len(pdf_files)} PDF files")
        
        if not pdf_files:
            self.logger.info("No files to process")
            return
        
        # Process with retry logic
        processed = []
        failed = []
        
        for pdf_file in pdf_files:
            try:
                self.logger.info(f"Processing {pdf_file.name}")
                result = self._process_with_retry(pdf_file, max_retries=3)
                
                if result:
                    processed.append(pdf_file)
                    self._save_result(pdf_file, result)
                    self._archive_file(pdf_file)
                    self.logger.info(f"  ✓ Success: {pdf_file.name}")
                else:
                    failed.append(pdf_file)
                    self._move_to_errors(pdf_file)
                    self.logger.error(f"  ✗ Failed: {pdf_file.name}")
                    
            except Exception as e:
                failed.append(pdf_file)
                self._move_to_errors(pdf_file)
                self.logger.error(f"  ✗ Error processing {pdf_file.name}: {e}")
        
        # Summary
        self.logger.info(f"Job complete: {len(processed)} processed, {len(failed)} failed")
        self._send_summary_email(processed, failed)
    
    def _process_with_retry(self, file_path: Path, max_retries: int = 3) -> dict:
        """Process file with exponential backoff retry."""
        for attempt in range(max_retries):
            try:
                # Upload
                doc_id = self._upload(file_path)
                
                # Extract
                result = self._extract(doc_id)
                
                # Handle review
                if result["needs_review"]:
                    self.logger.info(f"  ⚠️  Needs review, waiting...")
                    reviewed = self._poll_review(doc_id, timeout=600)
                    
                    if reviewed:
                        return reviewed
                    else:
                        self.logger.warning(f"  Review timeout on attempt {attempt + 1}")
                        if attempt < max_retries - 1:
                            time.sleep(2 ** attempt)  # Exponential backoff
                            continue
                        return None
                else:
                    return result["results"]
                    
            except requests.exceptions.RequestException as e:
                self.logger.warning(f"  Attempt {attempt + 1} failed: {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)
                else:
                    raise
        
        return None
    
    def _upload(self, file_path: Path) -> str:
        """Upload file and return document ID."""
        with open(file_path, "rb") as f:
            response = requests.post(
                "https://api.documind.com/api/v1/upload",
                headers={"X-API-Key": self.api_key},
                files={"files": f}
            )
        response.raise_for_status()
        return response.json()[0]
    
    def _extract(self, doc_id: str) -> dict:
        """Extract data from document."""
        response = requests.post(
            f"https://api.documind.com/api/v1/extract/{doc_id}",
            headers={
                "X-API-Key": self.api_key,
                "Content-Type": "application/json"
            },
            json={
                "schema": self._get_invoice_schema(),
                # Advanced mode - no model or extraction_mode
                "review_threshold": 85
            }
        )
        response.raise_for_status()
        return response.json()
    
    def _poll_review(self, doc_id: str, timeout: int) -> dict:
        """Poll for review completion."""
        start = time.time()
        
        while (time.time() - start) < timeout:
            response = requests.get(
                "https://api.documind.com/api/v1/data/extractions",
                headers={"X-API-Key": self.api_key},
                params={"document_id": doc_id, "limit": 1}
            )
            
            data = response.json()
            if data["items"] and data["items"][0]["is_reviewed"]:
                return data["items"][0]["reviewed_results"]
            
            time.sleep(10)
        
        return None
    
    def _get_invoice_schema(self) -> dict:
        """Get invoice extraction schema."""
        return {
            "named_entities": {
                "invoice_number": {"type": "string"},
                "invoice_date": {"type": "string"},
                "vendor_name": {"type": "string"},
                "total_amount": {"type": "number"}
            },
            "required": ["invoice_number", "total_amount"]
        }
    
    def _save_result(self, file_path: Path, result: dict):
        """Save extraction result as JSON."""
        output_file = self.output_dir / f"{file_path.stem}.json"
        with open(output_file, "w") as f:
            json.dump(result, f, indent=2)
    
    def _archive_file(self, file_path: Path):
        """Move processed file to archive."""
        archive_dir = self.output_dir / "archive" / datetime.now().strftime("%Y-%m")
        archive_dir.mkdir(parents=True, exist_ok=True)
        file_path.rename(archive_dir / file_path.name)
    
    def _move_to_errors(self, file_path: Path):
        """Move failed file to error directory."""
        file_path.rename(self.error_dir / file_path.name)
    
    def _send_summary_email(self, processed: List, failed: List):
        """Send daily summary email."""
        # Implement your email logic here
        pass

# Setup scheduled job
processor = DailyInvoiceProcessor(
    api_key="your_api_key",
    input_dir="/path/to/incoming/invoices",
    output_dir="/path/to/processed"
)

# Run every day at 9 AM
schedule.every().day.at("09:00").do(processor.run_daily_job)

while True:
    schedule.run_pending()
    time.sleep(60)
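
The _send_summary_email stub above is left for you to implement. A minimal sketch using the standard-library smtplib, assuming a hypothetical SMTP relay and placeholder addresses (adjust to your mail infrastructure):

import smtplib
from email.message import EmailMessage

def send_summary_email(processed, failed,
                       smtp_host="smtp.example.com",           # hypothetical relay
                       sender="documind-jobs@example.com",      # placeholder address
                       recipient="finance-team@example.com"):   # placeholder address
    """Email a plain-text summary of the daily run."""
    msg = EmailMessage()
    msg["Subject"] = f"Invoice processing: {len(processed)} processed, {len(failed)} failed"
    msg["From"] = sender
    msg["To"] = recipient

    body_lines = ["Processed:"] + [f"  {p.name}" for p in processed]
    body_lines += ["", "Failed:"] + [f"  {p.name}" for p in failed]
    msg.set_content("\n".join(body_lines))

    with smtplib.SMTP(smtp_host) as smtp:
        smtp.send_message(msg)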

Pattern 3: Event-Driven Processing

Process documents as they arrive using a file system watcher.
import json
import threading
import time
import requests
from pathlib import Path
from queue import Queue, Empty
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler

class DocumentHandler(FileSystemEventHandler):
    """Watch for new documents and process them."""
    
    def __init__(self, api_key: str, schema: dict, process_queue: Queue):
        self.api_key = api_key
        self.schema = schema
        self.queue = process_queue
    
    def on_created(self, event):
        """Handle new file creation."""
        if event.is_directory:
            return
        
        file_path = Path(event.src_path)
        
        # Only process PDFs
        if file_path.suffix.lower() != ".pdf":
            return
        
        print(f"📄 New document detected: {file_path.name}")
        
        # Add to processing queue
        self.queue.put(file_path)

class DocumentProcessor(threading.Thread):
    """Background processor for document queue."""
    
    def __init__(self, api_key: str, schema: dict, process_queue: Queue):
        super().__init__(daemon=True)
        self.api_key = api_key
        self.schema = schema
        self.queue = process_queue
        self.running = True
    
    def run(self):
        """Process documents from queue."""
        while self.running:
            try:
                # Get file from queue (with timeout)
                file_path = self.queue.get(timeout=1)
                
                print(f"🔍 Processing {file_path.name}...")
                result = self._process_document(file_path)
                
                if result:
                    print(f"✓ {file_path.name} processed")
                    self._save_and_archive(file_path, result)
                else:
                    print(f"✗ {file_path.name} failed")
                
                self.queue.task_done()
                
            except Empty:
                continue
            except Exception as e:
                print(f"Error: {e}")
    
    def _process_document(self, file_path: Path) -> dict:
        """Upload, extract, and handle review."""
        # Upload
        with open(file_path, "rb") as f:
            response = requests.post(
                "https://api.documind.com/api/v1/upload",
                headers={"X-API-Key": self.api_key},
                files={"files": f}
            )
        response.raise_for_status()
        doc_id = response.json()[0]
        
        # Extract
        response = requests.post(
            f"https://api.documind.com/api/v1/extract/{doc_id}",
            headers={
                "X-API-Key": self.api_key,
                "Content-Type": "application/json"
            },
            json={
                "schema": self.schema,
                # Advanced mode - no model or extraction_mode
                "review_threshold": 85
            }
        )
        response.raise_for_status()
        result = response.json()
        
        # Handle review if needed
        if result["needs_review"]:
            print(f"  ⚠️  {file_path.name} needs review")
            reviewed = self._wait_for_review(doc_id)
            return reviewed if reviewed else None
        
        return result["results"]
    
    def _wait_for_review(self, doc_id: str, timeout: int = 600) -> dict:
        """Wait for review completion."""
        start = time.time()
        
        while (time.time() - start) < timeout:
            response = requests.get(
                f"https://api.documind.com/api/v1/data/extractions",
                headers={"X-API-Key": self.api_key},
                params={"document_id": doc_id, "limit": 1}
            )
            
            data = response.json()
            if data["items"] and data["items"][0]["is_reviewed"]:
                return data["items"][0]["reviewed_results"]
            
            time.sleep(10)
        
        return None
    
    def _save_and_archive(self, file_path: Path, result: dict):
        """Save result and move file to archive."""
        # Save JSON result
        output_file = file_path.parent / "processed" / f"{file_path.stem}.json"
        output_file.parent.mkdir(exist_ok=True)
        
        with open(output_file, "w") as f:
            json.dump(result, f, indent=2)
        
        # Archive original
        archive_dir = file_path.parent / "archive"
        archive_dir.mkdir(exist_ok=True)
        file_path.rename(archive_dir / file_path.name)

# Setup
watch_dir = "/path/to/watch/directory"
schema = {...}  # Your schema
api_key = "your_api_key"

# Create queue and start processor
process_queue = Queue()
processor = DocumentProcessor(api_key, schema, process_queue)
processor.start()

# Setup file watcher
event_handler = DocumentHandler(api_key, schema, process_queue)
observer = Observer()
observer.schedule(event_handler, watch_dir, recursive=False)
observer.start()

print(f"👀 Watching {watch_dir} for new documents...")

try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
    processor.running = False

observer.join()
processor.join()
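
One caveat with on_created: the event can fire while a file is still being copied into the watch directory. A simple heuristic, sketched below, is to wait until the file size stops changing before enqueuing it; you could call this from DocumentHandler.on_created ahead of self.queue.put.

import os
import time
from pathlib import Path

def wait_until_stable(file_path: Path, checks: int = 3,
                      interval: float = 1.0, timeout: float = 120.0) -> bool:
    """Return True once the file size is unchanged for `checks` consecutive polls."""
    deadline = time.time() + timeout
    last_size = -1
    stable = 0
    while stable < checks:
        if time.time() > deadline:
            return False  # still changing after the timeout
        try:
            size = os.path.getsize(file_path)
        except OSError:
            return False  # file vanished or is not yet readable
        if size == last_size:
            stable += 1
        else:
            stable = 0
            last_size = size
        time.sleep(interval)
    return True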

Best Practices Summary

Error handling:
  • Implement retry logic with exponential backoff
  • Log all errors with context
  • Have a fallback for timeout scenarios
  • Alert on high failure rates

Performance:
  • Use parallel processing for batch jobs
  • Poll efficiently (10-15s intervals)
  • Cache schema definitions
  • Implement connection pooling (see the sketch after this list)

Monitoring:
  • Track success/failure rates
  • Monitor review completion times
  • Alert on queue backlogs
  • Log credit usage

Security:
  • Store API keys in environment variables
  • Use least-privilege API key scopes
  • Rotate keys periodically
  • Encrypt sensitive extracted data

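For connection pooling and key handling specifically, a minimal sketch: read the key from an environment variable and reuse a single requests.Session, which pools connections per host, for every call. The DOCUMIND_API_KEY variable name is only an example.

import os
import requests

# Read the key from the environment instead of hard-coding it
api_key = os.environ["DOCUMIND_API_KEY"]  # example variable name

# A single Session reuses TCP connections across requests to the same host
session = requests.Session()
session.headers.update({"X-API-Key": api_key})

response = session.get(
    "https://api.documind.com/api/v1/data/extractions",
    params={"limit": 10},
)
response.raise_for_status()
print(response.json()["items"])
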
Next Steps