Skip to main content

Overview

For automation pipelines, you need to detect when a human has reviewed and corrected an extraction. This guide shows how to implement efficient, robust polling to wait for the is_reviewed flag to become true.

The Review Lifecycle

1

Extraction Flagged

AI extraction completes with needs_review = true due to low-confidence fields.
2

Reviewer Notified

Human reviewer sees the extraction in the pending reviews queue.
3

Review Completed

Reviewer corrects fields and saves, setting is_reviewed = true.
4

Automation Resumes

Your polling detects the change and retrieves reviewed_results.

Basic Polling Implementation

Python Example

import requests
import time
from typing import Optional, Dict

def poll_for_review(
    document_id: str,
    api_key: str,
    base_url: str = "https://api.documind.com/api/v1",
    poll_interval: int = 10,
    timeout: int = 300
) -> Optional[Dict]:
    """
    Poll for extraction review completion.

    Transient failures (network errors, non-200 responses, unparsable
    bodies, no extraction yet) are reported and retried until the overall
    timeout elapses.

    Args:
        document_id: UUID of the document
        api_key: Your API key
        base_url: Root URL of the API (default: production endpoint)
        poll_interval: Seconds between polls (default: 10)
        timeout: Maximum seconds to wait (default: 300/5min)

    Returns:
        Reviewed results or None if timeout
    """
    headers = {"X-API-Key": api_key}
    query = {"document_id": document_id, "limit": 1}
    start_time = time.time()
    poll_count = 0

    def _pause() -> None:
        # Never sleep past the deadline, otherwise the call could overrun
        # `timeout` by up to one full poll interval.
        remaining = timeout - (time.time() - start_time)
        if remaining > 0:
            time.sleep(min(poll_interval, remaining))

    while (time.time() - start_time) < timeout:
        poll_count += 1

        # Query extractions by document_id. The per-request timeout keeps a
        # hung connection from blocking longer than the overall budget allows.
        try:
            response = requests.get(
                f"{base_url}/data/extractions",
                headers=headers,
                params=query,
                timeout=30
            )
        except requests.exceptions.RequestException as exc:
            print(f"⚠️  Request failed: {exc}")
            _pause()
            continue

        if response.status_code != 200:
            print(f"⚠️  API error: {response.status_code}")
            _pause()
            continue

        try:
            data = response.json()
        except ValueError:
            # The body was not valid JSON; treat it like any other transient error.
            print("⚠️  Invalid JSON in API response")
            _pause()
            continue

        if not data.get("items"):
            print(f"⚠️  No extraction found for document {document_id}")
            _pause()
            continue

        extraction = data["items"][0]

        if extraction.get("is_reviewed"):
            elapsed = time.time() - start_time
            print(f"✓ Review completed after {elapsed:.1f}s ({poll_count} polls)")
            return extraction.get("reviewed_results")

        print(f"⏳ Waiting for review (poll #{poll_count}, {poll_interval}s interval)...")
        _pause()

    print(f"❌ Timeout after {timeout}s")
    return None

# Usage
# NOTE: `document_id`, `schema`, `process_invoice` and `handle_timeout` are not
# defined in this snippet; supply your own before running it.
import requests

API_KEY = "your_api_key"

# Extract data first
response = requests.post(
    f"https://api.documind.com/api/v1/extract/{document_id}",
    headers={"X-API-Key": API_KEY, "Content-Type": "application/json"},
    json={"schema": schema, "prompt": "Extract data"}
)
result = response.json()

# Only wait on a human when the extraction was flagged for review.
if result["needs_review"]:
    print("⚠️  Document needs human review - direct team to: https://app.documind.com/review")
    
    reviewed_data = poll_for_review(
        document_id=result["document_id"],
        api_key=API_KEY,
        poll_interval=10,  # Check every 10 seconds
        timeout=600        # Give up after 10 minutes
    )
    
    # poll_for_review returns None on timeout; any falsy value takes the
    # timeout branch below.
    if reviewed_data:
        process_invoice(reviewed_data)
    else:
        handle_timeout(document_id)
else:
    # No review needed: use the extraction results directly.
    process_invoice(result["results"])

Node.js/TypeScript Example

/**
 * Shape of the review-related fields read from an extraction record.
 * NOTE(review): not referenced by the code visible in this example.
 */
interface ReviewResult {
  // Corrected field values saved by the reviewer.
  reviewed_results: Record<string, any>;
  // True once a review has been saved.
  is_reviewed: boolean;
}

/**
 * Poll the extractions endpoint until the document's extraction is reviewed.
 *
 * Non-OK responses, empty result sets and thrown fetch/parse errors are
 * logged and retried until `timeout` elapses.
 *
 * @param documentId   ID of the document whose extraction is being reviewed
 * @param apiKey       API key sent in the X-API-Key header
 * @param baseUrl      Root URL of the API
 * @param pollInterval Delay between polls, in milliseconds
 * @param timeout      Maximum total wait, in milliseconds
 * @returns The extraction's reviewed_results, or null on timeout
 */
async function pollForReview(
  documentId: string,
  apiKey: string,
  baseUrl: string = "https://api.documind.com/api/v1",
  pollInterval: number = 10000,  // milliseconds
  timeout: number = 300000       // milliseconds
): Promise<Record<string, any> | null> {
  const startTime = Date.now();
  let pollCount = 0;

  // Single place for the inter-poll delay instead of repeating the Promise.
  const sleep = (ms: number): Promise<void> =>
    new Promise(resolve => setTimeout(resolve, ms));

  // Encode the ID so reserved characters cannot corrupt the query string.
  const url =
    `${baseUrl}/data/extractions?document_id=${encodeURIComponent(documentId)}&limit=1`;

  while ((Date.now() - startTime) < timeout) {
    pollCount++;

    try {
      const response = await fetch(url, {
        headers: {
          'X-API-Key': apiKey
        }
      });

      if (!response.ok) {
        console.warn(`⚠️  API error: ${response.status}`);
      } else {
        const data = await response.json();

        if (!data.items || data.items.length === 0) {
          console.warn(`⚠️  No extraction found for document ${documentId}`);
        } else {
          const extraction = data.items[0];

          if (extraction.is_reviewed) {
            const elapsed = (Date.now() - startTime) / 1000;
            console.log(`✓ Review completed after ${elapsed.toFixed(1)}s (${pollCount} polls)`);
            return extraction.reviewed_results;
          }

          console.log(`⏳ Waiting for review (poll #${pollCount}, ${pollInterval/1000}s interval)...`);
        }
      }
    } catch (error) {
      // Network or JSON parsing failure: log it and try again after the delay.
      console.error(`⚠️  Poll error: ${error}`);
    }

    await sleep(pollInterval);
  }

  console.error(`❌ Timeout after ${timeout/1000}s`);
  return null;
}

// Usage
// NOTE: extractData, processInvoice, handleTimeout, documentId and schema are
// not defined in this snippet; supply your own.
const result = await extractData(documentId, schema);

// Only wait on a human when the extraction was flagged for review.
if (result.needs_review) {
  console.log("⚠️  Document needs human review");
  
  const reviewedData = await pollForReview(
    result.document_id,
    process.env.API_KEY!,
    "https://api.documind.com/api/v1",
    10000,  // 10 seconds
    600000  // 10 minutes
  );
  
  // pollForReview resolves to null on timeout.
  if (reviewedData) {
    await processInvoice(reviewedData);
  } else {
    await handleTimeout(documentId);
  }
} else {
  // No review needed: use the extraction results directly.
  await processInvoice(result.results);
}

Advanced Polling Strategies

Exponential Backoff

Reduce API calls by increasing poll interval over time:
import math

def poll_with_backoff(
    document_id: str,
    api_key: str,
    initial_interval: int = 5,
    max_interval: int = 60,
    timeout: int = 600
) -> Optional[Dict]:
    """
    Poll with exponential backoff.

    The wait starts at ``initial_interval`` seconds and doubles after every
    poll, capped at ``max_interval``. Failed requests, HTTP errors and
    unparsable bodies are reported and retried on the same schedule.

    Args:
        document_id: UUID of the document
        api_key: Your API key
        initial_interval: Seconds to wait after the first poll
        max_interval: Upper bound for the wait between polls, in seconds
        timeout: Maximum seconds to wait overall

    Returns:
        Reviewed results or None if timeout
    """
    headers = {"X-API-Key": api_key}
    start_time = time.time()
    poll_count = 0

    while (time.time() - start_time) < timeout:
        poll_count += 1

        # Check review status
        try:
            response = requests.get(
                "https://api.documind.com/api/v1/data/extractions",
                headers=headers,
                params={"document_id": document_id, "limit": 1},
                timeout=30
            )
            response.raise_for_status()
            data = response.json()
        except (requests.exceptions.RequestException, ValueError) as exc:
            # An error body would otherwise surface as a KeyError/JSON error
            # and abort the whole wait; report it and keep polling instead.
            print(f"⚠️  Poll #{poll_count} failed: {exc}")
        else:
            items = data.get("items") or []
            if items and items[0].get("is_reviewed"):
                return items[0].get("reviewed_results")

        # Calculate next interval with exponential backoff
        interval = min(
            initial_interval * (2 ** (poll_count - 1)),
            max_interval
        )

        remaining = timeout - (time.time() - start_time)
        if remaining <= 0:
            break

        print(f"⏳ Poll #{poll_count}, waiting {interval}s...")
        # The final wait is shortened so the call never overruns `timeout`.
        time.sleep(min(interval, remaining))

    return None
Exponential backoff is ideal when you don’t know how long reviews will take. It starts fast for quick reviews but backs off for longer ones.

Batch Polling

Poll multiple documents efficiently:
from typing import List, Dict

def poll_batch_reviews(
    document_ids: List[str],
    api_key: str,
    poll_interval: int = 15,
    timeout: int = 600
) -> Dict[str, Optional[Dict]]:
    """
    Poll multiple documents, returning results as they're reviewed.

    Failed requests, HTTP errors and unparsable bodies are reported and the
    poll is retried after ``poll_interval`` seconds.

    Args:
        document_ids: Documents to wait for (duplicates are collapsed)
        api_key: Your API key
        poll_interval: Seconds between polls
        timeout: Maximum seconds to wait overall

    Returns:
        Dictionary mapping document_id -> reviewed_results, with None for
        every document that was not reviewed before the timeout
    """
    headers = {"X-API-Key": api_key}
    start_time = time.time()
    remaining_ids = set(document_ids)
    results: Dict[str, Optional[Dict]] = {}

    while remaining_ids and (time.time() - start_time) < timeout:
        # Query all remaining documents
        # NOTE(review): this filter is not restricted to `remaining_ids`, so a
        # limit of len(remaining_ids) may cut off some of them when other
        # documents also match - confirm the API's paging/ordering.
        try:
            response = requests.get(
                "https://api.documind.com/api/v1/data/extractions",
                headers=headers,
                params={
                    # Sent as the lowercase literal used in the documented
                    # query examples (a Python bool would serialise as "True").
                    "needs_review": "true",
                    "limit": len(remaining_ids)
                },
                timeout=30
            )
            response.raise_for_status()
            extractions = response.json().get("items") or []
        except (requests.exceptions.RequestException, ValueError) as exc:
            print(f"⚠️  Batch poll failed: {exc}")
            extractions = []

        # Check each document
        for extraction in extractions:
            doc_id = extraction.get("document_id")

            if doc_id in remaining_ids and extraction.get("is_reviewed"):
                print(f"✓ Review completed for {doc_id}")
                results[doc_id] = extraction.get("reviewed_results")
                remaining_ids.discard(doc_id)

        if remaining_ids:
            time_left = timeout - (time.time() - start_time)
            if time_left <= 0:
                break
            print(f"⏳ Waiting for {len(remaining_ids)} documents...")
            # Shorten the last wait so the call never overruns `timeout`.
            time.sleep(min(poll_interval, time_left))

    # Mark timed-out documents
    for doc_id in remaining_ids:
        results[doc_id] = None

    return results

# Usage
# NOTE: `process_document` and `handle_timeout` are not defined in this
# snippet; supply your own.
doc_ids = ["uuid1", "uuid2", "uuid3"]
reviewed = poll_batch_reviews(doc_ids, API_KEY, poll_interval=15)

# Documents that were not reviewed in time map to None.
for doc_id, data in reviewed.items():
    if data:
        process_document(doc_id, data)
    else:
        handle_timeout(doc_id)

Concurrent Polling

Use asyncio for efficient concurrent polling:
import asyncio
import aiohttp
from typing import List, Optional

async def async_poll_review(
    session: aiohttp.ClientSession,
    document_id: str,
    api_key: str,
    poll_interval: int = 10,
    timeout: int = 300
) -> Optional[Dict]:
    """
    Async polling for a single document.

    Args:
        session: Shared aiohttp session used for every request
        document_id: UUID of the document
        api_key: Your API key
        poll_interval: Seconds between polls
        timeout: Maximum seconds to wait overall

    Returns:
        Reviewed results or None if timeout
    """
    headers = {"X-API-Key": api_key}
    # get_running_loop() is the supported way to reach the loop from inside a
    # coroutine; get_event_loop() is deprecated for this use.
    loop = asyncio.get_running_loop()
    start_time = loop.time()

    while (loop.time() - start_time) < timeout:
        try:
            async with session.get(
                "https://api.documind.com/api/v1/data/extractions",
                headers=headers,
                params={"document_id": document_id, "limit": 1}
            ) as response:
                if response.status != 200:
                    print(f"⚠️  API error for {document_id}: {response.status}")
                    data = {}
                else:
                    data = await response.json()

            items = data.get("items") or []
            if items and items[0].get("is_reviewed"):
                return items[0].get("reviewed_results")

        except Exception as e:
            # Deliberately broad: polling is best-effort and one bad response
            # must not cancel the wait for this document.
            print(f"⚠️  Error polling {document_id}: {e}")

        await asyncio.sleep(poll_interval)

    return None

async def poll_multiple_documents(
    document_ids: List[str],
    api_key: str,
    poll_interval: int = 10,
    timeout: int = 300
) -> Dict[str, Optional[Dict]]:
    """
    Poll multiple documents concurrently.

    Args:
        document_ids: Documents to wait for
        api_key: Your API key
        poll_interval: Seconds between polls for each document
        timeout: Maximum seconds to wait for each document

    Returns:
        Dictionary mapping document_id -> result of async_poll_review
    """
    # One session is shared by all polling tasks.
    async with aiohttp.ClientSession() as session:
        tasks = [
            async_poll_review(
                session,
                doc_id,
                api_key,
                poll_interval=poll_interval,
                timeout=timeout
            )
            for doc_id in document_ids
        ]

        # gather() returns results in task order, so they line up with the IDs.
        results = await asyncio.gather(*tasks)
        return dict(zip(document_ids, results))

# Usage
# asyncio.run() starts an event loop, runs the coroutine to completion and
# returns its result (document_id -> reviewed results or None).
document_ids = ["uuid1", "uuid2", "uuid3"]
results = asyncio.run(poll_multiple_documents(document_ids, API_KEY))

Best Practices

Balance responsiveness vs API usage:
| Interval | Use Case | API Calls/Hour |
| --- | --- | --- |
| 5s | Real-time processing | 720 |
| 10s | Standard automation | 360 |
| 30s | Batch processing | 120 |
| 60s | Low-priority jobs | 60 |
10-15 second intervals work well for most automation scenarios.
Set realistic timeouts based on your review SLA:
# Production settings
# Maximum time to wait for a review, in seconds, keyed by priority.
REVIEW_TIMEOUT = {
    "critical": 300,    # 5 minutes
    "standard": 600,    # 10 minutes
    "batch": 3600       # 1 hour
}

# NOTE: `priority` is not defined in this snippet; an unknown key raises KeyError.
timeout = REVIEW_TIMEOUT[priority]
Don’t let transient errors stop your polling:
try:
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()
except requests.exceptions.Timeout:
    print("⚠️  Request timeout, retrying...")
    continue
except requests.exceptions.HTTPError as e:
    if e.response.status_code >= 500:
        print("⚠️  Server error, retrying...")
        continue
    else:
        raise  # Don't retry 4xx errors
Track polling for debugging and optimization:
import logging

logger = logging.getLogger(__name__)

def poll_for_review(document_id, ...):
    # Sketch only: the `...` placeholders stand in for the remaining
    # parameters and the loop condition, and poll_count, is_reviewed, elapsed
    # and result are not defined in this snippet.
    logger.info(f"Starting poll for {document_id}")
    
    while ...:
        # Per-attempt detail goes to DEBUG.
        logger.debug(f"Poll attempt #{poll_count}")
        
        if is_reviewed:
            logger.info(f"Review completed in {elapsed}s")
            return result
    
    # Reached only when the loop ends without a completed review.
    logger.warning(f"Poll timeout for {document_id}")
    return None

Query Options

Filter by Document ID

Most efficient for single-document polling:
GET /data/extractions?document_id=550e8400-e29b-41d4-a716-446655440000&limit=1

Filter by Status

For batch polling of pending reviews:
GET /data/extractions?needs_review=true&is_reviewed=false&limit=50

Filter by Date Range

Poll for documents extracted in a specific timeframe:
GET /data/extractions?created_after=2024-01-15T00:00:00Z&needs_review=true

Monitoring & Alerting

Track these metrics:
class PollMetrics:
    """Accumulate outcome counters for review polling runs.

    Every call to record_success() or record_timeout() counts as one
    finished polling run and increments total_polls.
    """

    def __init__(self):
        self.total_polls = 0
        self.successful_reviews = 0
        self.timeouts = 0
        # Despite the name, this holds the individual review durations; the
        # average is computed on demand in get_stats().
        self.avg_review_time = []

    def record_success(self, review_time: float):
        """Record a run that ended with a completed review.

        Args:
            review_time: How long the review took, as measured by the caller.
        """
        self.total_polls += 1
        self.successful_reviews += 1
        self.avg_review_time.append(review_time)

    def record_timeout(self):
        """Record a run that ended without a completed review."""
        self.total_polls += 1
        self.timeouts += 1

    def get_stats(self):
        """Return success/timeout rates (percent) and the mean review time.

        All values are 0.0 when nothing has been recorded yet, so this never
        divides by zero.
        """
        total = self.total_polls
        samples = self.avg_review_time
        return {
            "success_rate": self.successful_reviews / total * 100 if total else 0.0,
            "timeout_rate": self.timeouts / total * 100 if total else 0.0,
            "avg_review_time": sum(samples) / len(samples) if samples else 0.0
        }

metrics = PollMetrics()

# Use in polling
# NOTE: `reviewed_data` and `elapsed_time` come from the surrounding polling
# code and are not defined in this snippet.
if reviewed_data:
    metrics.record_success(elapsed_time)
else:
    metrics.record_timeout()
If timeout rate exceeds 20%, review your timeout settings or alert your review team about delays.

Complete Example

Here’s a production-ready polling implementation:
import requests
import time
import logging
from typing import Optional, Dict
from dataclasses import dataclass

@dataclass
class PollConfig:
    """Tunable settings for one polling run.

    Attributes:
        poll_interval: Base delay between polls, in seconds.
        timeout: Overall time budget, in seconds.
        backoff: Whether to use exponential backoff.
        max_interval: Upper bound for the backoff delay, in seconds.
    """

    poll_interval: int = 10
    timeout: int = 600
    backoff: bool = False
    max_interval: int = 60
    
class ReviewPoller:
    """Review polling with logging, retry on request errors and optional backoff."""

    def __init__(self, api_key: str, base_url: str = "https://api.documind.com/api/v1"):
        """
        Args:
            api_key: API key sent in the X-API-Key header
            base_url: Root URL of the API
        """
        self.api_key = api_key
        self.base_url = base_url
        self.logger = logging.getLogger(__name__)
        self.headers = {"X-API-Key": api_key}

    def poll(
        self,
        document_id: str,
        config: Optional["PollConfig"] = None
    ) -> Optional[Dict]:
        """
        Poll for review completion with configurable behavior.

        Request failures, HTTP error statuses and unparsable bodies are logged
        and retried until the timeout elapses.

        Args:
            document_id: UUID of the document
            config: Polling settings; a fresh default PollConfig is used when omitted

        Returns reviewed_results or None if timeout.
        """
        # Build the default per call: a PollConfig() default in the signature
        # would be created once and shared (and mutable) across all calls.
        if config is None:
            config = PollConfig()

        start_time = time.time()
        poll_count = 0

        self.logger.info(f"Starting poll for {document_id}")

        while (time.time() - start_time) < config.timeout:
            poll_count += 1
            # Errors and "no extraction yet" wait the plain poll interval;
            # only a successful "not reviewed yet" poll may back off.
            interval = config.poll_interval

            try:
                # Query extraction status
                response = requests.get(
                    f"{self.base_url}/data/extractions",
                    headers=self.headers,
                    params={"document_id": document_id, "limit": 1},
                    timeout=30
                )

                response.raise_for_status()
                data = response.json()

                if not data.get("items"):
                    self.logger.warning(f"No extraction found: {document_id}")
                else:
                    extraction = data["items"][0]

                    if extraction.get("is_reviewed"):
                        elapsed = time.time() - start_time
                        self.logger.info(
                            f"Review completed: {document_id} "
                            f"({elapsed:.1f}s, {poll_count} polls)"
                        )
                        return extraction.get("reviewed_results")

                    # Calculate next poll interval
                    if config.backoff:
                        interval = min(
                            config.poll_interval * (2 ** (poll_count - 1)),
                            config.max_interval
                        )

                    self.logger.debug(
                        f"Poll #{poll_count}, waiting {interval}s..."
                    )

            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers JSON decoding failures on requests
                # versions whose decode error is not a RequestException.
                self.logger.error(f"Poll error: {e}")

            # Shorten the final wait so the call never overruns config.timeout.
            remaining = config.timeout - (time.time() - start_time)
            if remaining > 0:
                time.sleep(min(interval, remaining))

        elapsed = time.time() - start_time
        self.logger.warning(
            f"Poll timeout: {document_id} ({elapsed:.1f}s, {poll_count} polls)"
        )
        return None

# Usage
# NOTE: `document_id`, `schema`, `process_data` and `handle_timeout` are not
# defined in this snippet; supply your own.
poller = ReviewPoller(api_key=API_KEY)

# Extract data
response = requests.post(
    f"https://api.documind.com/api/v1/extract/{document_id}",
    headers={"X-API-Key": API_KEY, "Content-Type": "application/json"},
    json={"schema": schema, "prompt": "Extract data"}
)
result = response.json()

# Only wait on a human when the extraction was flagged for review.
if result["needs_review"]:
    reviewed = poller.poll(
        document_id=result["document_id"],
        config=PollConfig(
            poll_interval=10,
            timeout=600,
            backoff=True
        )
    )
    
    # poll() returns None on timeout; any falsy value takes the timeout branch.
    if reviewed:
        process_data(reviewed)
    else:
        handle_timeout(result["document_id"])

Next Steps