Overview
For automation pipelines, you need to detect when a human has reviewed and corrected an extraction. This guide shows how to implement efficient, robust polling to wait for the is_reviewed flag to become true.
The Review Lifecycle
1
Extraction Flagged
AI extraction completes with
needs_review = true due to low-confidence fields.
2
Reviewer Notified
Human reviewer sees the extraction in the pending reviews queue.
3
Review Completed
Reviewer corrects fields and saves, setting
is_reviewed = true.
4
Automation Resumes
Your polling detects the change and retrieves
reviewed_results.
Basic Polling Implementation
Python Example
Copy
import requests
import time
from typing import Optional, Dict
def poll_for_review(
    document_id: str,
    api_key: str,
    base_url: str = "https://api.documind.com/api/v1",
    poll_interval: int = 10,
    timeout: int = 300,
    request_timeout: float = 30,
) -> Optional[Dict]:
    """
    Poll for extraction review completion.

    Transient failures (non-200 responses, network errors, unparseable
    response bodies) are reported and retried on the next poll instead of
    aborting the wait.

    Args:
        document_id: UUID of the document
        api_key: Your API key
        base_url: Root URL of the API
        poll_interval: Seconds between polls (default: 10)
        timeout: Maximum seconds to wait (default: 300/5min)
        request_timeout: Seconds allowed for each HTTP request (default: 30)

    Returns:
        Reviewed results or None if timeout
    """
    headers = {"X-API-Key": api_key}
    start_time = time.time()
    poll_count = 0

    while (time.time() - start_time) < timeout:
        poll_count += 1
        extraction = None

        try:
            # Query extractions by document_id. Without a per-request timeout
            # a stalled connection would block past the overall deadline.
            response = requests.get(
                f"{base_url}/data/extractions",
                headers=headers,
                params={
                    "document_id": document_id,
                    "limit": 1,
                },
                timeout=request_timeout,
            )
            if response.status_code != 200:
                print(f"⚠️ API error: {response.status_code}")
            else:
                items = response.json().get("items")
                if items:
                    extraction = items[0]
                else:
                    print(f"⚠️ No extraction found for document {document_id}")
        except (requests.exceptions.RequestException, ValueError) as exc:
            # ValueError covers a body that is not valid JSON.
            print(f"⚠️ Poll error: {exc}")

        if extraction is not None:
            if extraction.get("is_reviewed"):
                elapsed = time.time() - start_time
                print(f"✓ Review completed after {elapsed:.1f}s ({poll_count} polls)")
                return extraction.get("reviewed_results")
            print(f"⏳ Waiting for review (poll #{poll_count}, {poll_interval}s interval)...")

        # Never sleep past the deadline.
        remaining = timeout - (time.time() - start_time)
        if remaining <= 0:
            break
        time.sleep(min(poll_interval, remaining))

    print(f"❌ Timeout after {timeout}s")
    return None
# Usage
import requests

API_KEY = "your_api_key"

# Extract data first (`document_id` and `schema` come from your own pipeline)
response = requests.post(
    f"https://api.documind.com/api/v1/extract/{document_id}",
    headers={"X-API-Key": API_KEY, "Content-Type": "application/json"},
    json={"schema": schema, "prompt": "Extract data"}
)
# Fail here on an error response instead of hitting a KeyError below
response.raise_for_status()
result = response.json()

if result["needs_review"]:
    print("⚠️ Document needs human review - direct team to: https://app.documind.com/review")
    reviewed_data = poll_for_review(
        document_id=result["document_id"],
        api_key=API_KEY,
        poll_interval=10,  # Check every 10 seconds
        timeout=600        # Give up after 10 minutes
    )
    if reviewed_data:
        process_invoice(reviewed_data)
    else:
        handle_timeout(document_id)
else:
    # No review needed: use the extraction results directly
    process_invoice(result["results"])
Node.js/TypeScript Example
Copy
/** Review-related fields of an extraction record. */
interface ReviewResult {
  /** True once a reviewer has corrected the fields and saved. */
  is_reviewed: boolean;
  /** Field values as corrected by the reviewer. */
  reviewed_results: Record<string, any>;
}
/**
 * Poll the extractions endpoint until the document's extraction is reviewed.
 *
 * Non-OK responses, network errors and per-request timeouts are logged and
 * retried on the next poll.
 *
 * @param documentId     UUID of the document
 * @param apiKey         API key sent in the X-API-Key header
 * @param baseUrl        Root URL of the API
 * @param pollInterval   Milliseconds between polls
 * @param timeout        Maximum milliseconds to wait overall
 * @param requestTimeout Milliseconds allowed for each HTTP request
 * @returns The reviewed results, or null if the overall timeout elapses
 */
async function pollForReview(
  documentId: string,
  apiKey: string,
  baseUrl: string = "https://api.documind.com/api/v1",
  pollInterval: number = 10000, // milliseconds
  timeout: number = 300000, // milliseconds
  requestTimeout: number = 30000 // milliseconds
): Promise<Record<string, any> | null> {
  const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));

  // URLSearchParams percent-encodes the id, so unusual characters cannot
  // corrupt the query string.
  const query = new URLSearchParams({ document_id: documentId, limit: "1" });
  const url = `${baseUrl}/data/extractions?${query.toString()}`;

  const startTime = Date.now();
  let pollCount = 0;

  while ((Date.now() - startTime) < timeout) {
    pollCount++;

    // Abort a stalled request so it cannot block past the overall deadline.
    const controller = new AbortController();
    const abortTimer = setTimeout(() => controller.abort(), requestTimeout);

    try {
      const response = await fetch(url, {
        headers: {
          'X-API-Key': apiKey
        },
        signal: controller.signal
      });

      if (!response.ok) {
        console.warn(`⚠️ API error: ${response.status}`);
      } else {
        const data = await response.json();
        if (!data.items || data.items.length === 0) {
          console.warn(`⚠️ No extraction found for document ${documentId}`);
        } else {
          const extraction = data.items[0];
          if (extraction.is_reviewed) {
            const elapsed = (Date.now() - startTime) / 1000;
            console.log(`✓ Review completed after ${elapsed.toFixed(1)}s (${pollCount} polls)`);
            return extraction.reviewed_results;
          }
          console.log(`⏳ Waiting for review (poll #${pollCount}, ${pollInterval/1000}s interval)...`);
        }
      }
    } catch (error) {
      console.error(`⚠️ Poll error: ${error}`);
    } finally {
      clearTimeout(abortTimer);
    }

    await sleep(pollInterval);
  }

  console.error(`❌ Timeout after ${timeout/1000}s`);
  return null;
}
// Usage
const result = await extractData(documentId, schema);

if (!result.needs_review) {
  // Extraction was confident enough: use its results directly.
  await processInvoice(result.results);
} else {
  console.log("⚠️ Document needs human review");

  const reviewedData = await pollForReview(
    result.document_id,
    process.env.API_KEY!,
    "https://api.documind.com/api/v1",
    10000,  // 10 seconds
    600000  // 10 minutes
  );

  if (reviewedData) {
    await processInvoice(reviewedData);
  } else {
    await handleTimeout(documentId);
  }
}
Advanced Polling Strategies
Exponential Backoff
Reduce API calls by increasing poll interval over time:
Copy
import math
def poll_with_backoff(
    document_id: str,
    api_key: str,
    initial_interval: int = 5,
    max_interval: int = 60,
    timeout: int = 600,
    base_url: str = "https://api.documind.com/api/v1",
    request_timeout: float = 30,
) -> Optional[Dict]:
    """
    Poll with exponential backoff.

    The wait doubles after every poll, starting at ``initial_interval`` and
    capped at ``max_interval``. Failed requests are reported and retried.

    Args:
        document_id: UUID of the document
        api_key: Your API key
        initial_interval: Seconds to wait after the first poll
        max_interval: Upper bound, in seconds, for the wait between polls
        timeout: Maximum seconds to wait overall
        base_url: Root URL of the API
        request_timeout: Seconds allowed for each HTTP request

    Returns:
        Reviewed results or None if timeout
    """
    headers = {"X-API-Key": api_key}
    start_time = time.time()
    poll_count = 0

    while (time.time() - start_time) < timeout:
        poll_count += 1

        # Check review status
        try:
            response = requests.get(
                f"{base_url}/data/extractions",
                headers=headers,
                params={"document_id": document_id, "limit": 1},
                timeout=request_timeout,
            )
            if response.status_code != 200:
                print(f"⚠️ API error: {response.status_code}")
            else:
                items = response.json().get("items")
                if items and items[0].get("is_reviewed"):
                    return items[0].get("reviewed_results")
        except (requests.exceptions.RequestException, ValueError) as exc:
            # ValueError covers a body that is not valid JSON.
            print(f"⚠️ Poll error: {exc}")

        # Calculate next interval with exponential backoff
        interval = min(
            initial_interval * (2 ** (poll_count - 1)),
            max_interval
        )

        # Never sleep past the deadline.
        remaining = timeout - (time.time() - start_time)
        if remaining <= 0:
            break
        interval = min(interval, remaining)

        print(f"⏳ Poll #{poll_count}, waiting {interval}s...")
        time.sleep(interval)

    return None
Exponential backoff is ideal when you don’t know how long reviews will take. It starts fast for quick reviews but backs off for longer ones.
Batch Polling
Poll multiple documents efficiently:
Copy
from typing import List, Dict
def poll_batch_reviews(
    document_ids: List[str],
    api_key: str,
    poll_interval: int = 15,
    timeout: int = 600
) -> Dict[str, Optional[Dict]]:
    """
    Poll multiple documents, returning results as they're reviewed.

    A failed request is reported and treated as an empty page, so one bad
    response does not abort the whole batch.

    Args:
        document_ids: UUIDs of the documents to wait for
        api_key: Your API key
        poll_interval: Seconds between polls
        timeout: Maximum seconds to wait overall

    Returns:
        Dictionary mapping document_id -> reviewed_results
        (None for documents not reviewed before the timeout)
    """
    headers = {"X-API-Key": api_key}
    start_time = time.time()
    remaining_ids = set(document_ids)
    results = {}

    while remaining_ids and (time.time() - start_time) < timeout:
        # Query all remaining documents
        # NOTE(review): the filter is not restricted to `remaining_ids`, so a
        # limit of len(remaining_ids) may miss them when other extractions
        # also match - confirm the API's ordering/pagination.
        try:
            response = requests.get(
                "https://api.documind.com/api/v1/data/extractions",
                headers=headers,
                params={
                    # Lower-case literal, as in the documented query string.
                    "needs_review": "true",
                    "limit": len(remaining_ids)
                },
                timeout=30,
            )
            response.raise_for_status()
            extractions = response.json().get("items") or []
        except (requests.exceptions.RequestException, ValueError) as exc:
            print(f"⚠️ Poll error: {exc}")
            extractions = []

        # Check each document
        for extraction in extractions:
            doc_id = extraction.get("document_id")
            if doc_id in remaining_ids and extraction.get("is_reviewed"):
                print(f"✓ Review completed for {doc_id}")
                results[doc_id] = extraction.get("reviewed_results")
                remaining_ids.discard(doc_id)

        if remaining_ids:
            print(f"⏳ Waiting for {len(remaining_ids)} documents...")
            time.sleep(poll_interval)

    # Mark timed-out documents
    for doc_id in remaining_ids:
        results[doc_id] = None

    return results
# Usage
doc_ids = ["uuid1", "uuid2", "uuid3"]
reviewed = poll_batch_reviews(doc_ids, API_KEY, poll_interval=15)

for doc_id, data in reviewed.items():
    # A falsy entry means no reviewed results were obtained for the document.
    if not data:
        handle_timeout(doc_id)
        continue
    process_document(doc_id, data)
Concurrent Polling
Use asyncio for efficient concurrent polling:
Copy
import asyncio
import aiohttp
from typing import List, Optional
async def async_poll_review(
    session: aiohttp.ClientSession,
    document_id: str,
    api_key: str,
    poll_interval: int = 10,
    timeout: int = 300
) -> Optional[Dict]:
    """
    Async polling for a single document.

    Args:
        session: Shared aiohttp session used for every request
        document_id: UUID of the document
        api_key: Your API key
        poll_interval: Seconds between polls
        timeout: Maximum seconds to wait overall

    Returns:
        Reviewed results or None if timeout
    """
    headers = {"X-API-Key": api_key}
    # Inside a coroutine the running loop is the one to ask for the time.
    loop = asyncio.get_running_loop()
    start_time = loop.time()

    while (loop.time() - start_time) < timeout:
        try:
            async with session.get(
                "https://api.documind.com/api/v1/data/extractions",
                headers=headers,
                params={"document_id": document_id, "limit": 1},
                # Bound each request so a stalled call cannot consume the
                # whole polling window.
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:
                response.raise_for_status()
                data = await response.json()

            items = data.get("items")
            if items and items[0].get("is_reviewed"):
                return items[0].get("reviewed_results")
        except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as e:
            print(f"⚠️ Error polling {document_id}: {e}")

        # Sleep after the response is released, on success and on error.
        await asyncio.sleep(poll_interval)

    return None
async def poll_multiple_documents(
    document_ids: List[str],
    api_key: str
) -> Dict[str, Optional[Dict]]:
    """
    Poll multiple documents concurrently.

    One polling coroutine is started per document over a shared session.

    Returns:
        Dictionary mapping each document_id to its polling result
    """
    async with aiohttp.ClientSession() as session:
        outcomes = await asyncio.gather(
            *(
                async_poll_review(session, doc_id, api_key)
                for doc_id in document_ids
            )
        )

    # gather() preserves input order, so results pair up with their ids.
    return {doc_id: outcome for doc_id, outcome in zip(document_ids, outcomes)}
# Usage
document_ids = ["uuid1", "uuid2", "uuid3"]
# asyncio.run drives the coroutine to completion and returns its result.
results = asyncio.run(
    poll_multiple_documents(document_ids, API_KEY)
)
Best Practices
Choose Appropriate Poll Intervals
Choose Appropriate Poll Intervals
Balance responsiveness vs API usage:
| Interval | Use Case | API Calls/Hour |
|---|---|---|
| 5s | Real-time processing | 720 |
| 10s | Standard automation | 360 |
| 30s | Batch processing | 120 |
| 60s | Low-priority jobs | 60 |
10-15 second intervals work well for most automation scenarios.
Implement Proper Timeouts
Implement Proper Timeouts
Set realistic timeouts based on your review SLA:
Copy
# Production settings: seconds to wait for a review, keyed by priority
REVIEW_TIMEOUT = {
    "critical": 5 * 60,   # 5 minutes
    "standard": 10 * 60,  # 10 minutes
    "batch": 60 * 60,     # 1 hour
}

timeout = REVIEW_TIMEOUT[priority]
Handle Errors Gracefully
Handle Errors Gracefully
Don’t let transient errors stop your polling:
Copy
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
except requests.exceptions.Timeout:
print("⚠️ Request timeout, retrying...")
continue
except requests.exceptions.HTTPError as e:
if e.response.status_code >= 500:
print("⚠️ Server error, retrying...")
continue
else:
raise # Don't retry 4xx errors
Log Polling Activity
Log Polling Activity
Track polling for debugging and optimization:
Copy
import logging
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Sketch only: `...` stands in for the remaining parameters and loop condition.
def poll_for_review(document_id, ...):
# INFO marks the start of polling for a document.
logger.info(f"Starting poll for {document_id}")
while ...:
# Individual attempts are logged at DEBUG level.
logger.debug(f"Poll attempt #{poll_count}")
if is_reviewed:
logger.info(f"Review completed in {elapsed}s")
return result
# A timeout is logged as a WARNING and None is returned.
logger.warning(f"Poll timeout for {document_id}")
return None
Query Options
Filter by Document ID
Most efficient for single-document polling:
Copy
GET /data/extractions?document_id=550e8400-e29b-41d4-a716-446655440000&limit=1
Filter by Status
For batch polling of pending reviews:
Copy
GET /data/extractions?needs_review=true&is_reviewed=false&limit=50
Filter by Date Range
Poll for documents extracted in a specific timeframe:
Copy
GET /data/extractions?created_after=2024-01-15T00:00:00Z&needs_review=true
Monitoring & Alerting
Track these metrics:
Copy
class PollMetrics:
    """Counters for polling outcomes (completed reviews vs. timeouts)."""

    def __init__(self):
        self.total_polls = 0          # polling runs recorded so far
        self.successful_reviews = 0   # runs that ended with a completed review
        self.timeouts = 0             # runs that ended in a timeout
        self.avg_review_time = []     # review durations, one per success

    def record_success(self, review_time: float):
        """Record a polling run that ended with a completed review."""
        self.total_polls += 1
        self.successful_reviews += 1
        self.avg_review_time.append(review_time)

    def record_timeout(self):
        """Record a polling run that gave up before the review completed."""
        self.total_polls += 1
        self.timeouts += 1

    def get_stats(self):
        """
        Return success/timeout rates (percent) and the mean review time.

        Every value is 0.0 when there is nothing recorded to divide by.
        """
        total = self.total_polls
        times = self.avg_review_time
        return {
            "success_rate": self.successful_reviews / total * 100 if total else 0.0,
            "timeout_rate": self.timeouts / total * 100 if total else 0.0,
            "avg_review_time": sum(times) / len(times) if times else 0.0,
        }
metrics = PollMetrics()

# Use in polling
if not reviewed_data:
    metrics.record_timeout()
else:
    metrics.record_success(elapsed_time)
If timeout rate exceeds 20%, review your timeout settings or alert your review team about delays.
Complete Example
Here’s a production-ready polling implementation:
Copy
import requests
import time
import logging
from typing import Optional, Dict
from dataclasses import dataclass
@dataclass
class PollConfig:
    """
    Configuration for polling behavior.

    Fields, in order:
        poll_interval: seconds between polls
        timeout: seconds before polling gives up
        backoff: whether to use exponential backoff
        max_interval: max backoff interval
    """

    poll_interval: int = 10
    timeout: int = 600
    backoff: bool = False
    max_interval: int = 60
class ReviewPoller:
    """Robust review polling with logging."""

    def __init__(self, api_key: str, base_url: str = "https://api.documind.com/api/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.logger = logging.getLogger(__name__)
        self.headers = {"X-API-Key": api_key}

    def poll(
        self,
        document_id: str,
        config: Optional[PollConfig] = None
    ) -> Optional[Dict]:
        """
        Poll for review completion with configurable behavior.

        Request failures and unparseable responses are logged and retried
        after ``config.poll_interval`` seconds.

        Args:
            document_id: UUID of the document
            config: Polling settings; a fresh default PollConfig when omitted

        Returns:
            reviewed_results or None if timeout.
        """
        # Build the default per call: a PollConfig() default argument would
        # be a single instance shared (and mutable) across every call.
        if config is None:
            config = PollConfig()

        start_time = time.time()
        poll_count = 0
        self.logger.info("Starting poll for %s", document_id)

        while (time.time() - start_time) < config.timeout:
            poll_count += 1
            interval = config.poll_interval

            try:
                # Query extraction status
                response = requests.get(
                    f"{self.base_url}/data/extractions",
                    headers=self.headers,
                    params={"document_id": document_id, "limit": 1},
                    timeout=30
                )
                response.raise_for_status()
                items = response.json().get("items")

                if not items:
                    self.logger.warning("No extraction found: %s", document_id)
                else:
                    extraction = items[0]
                    if extraction.get("is_reviewed"):
                        elapsed = time.time() - start_time
                        self.logger.info(
                            "Review completed: %s (%.1fs, %d polls)",
                            document_id, elapsed, poll_count
                        )
                        return extraction.get("reviewed_results")

                    # Calculate next poll interval
                    if config.backoff:
                        interval = min(
                            config.poll_interval * (2 ** (poll_count - 1)),
                            config.max_interval
                        )
                    self.logger.debug(
                        "Poll #%d, waiting %ss...", poll_count, interval
                    )
            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers a body that is not valid JSON.
                self.logger.error("Poll error: %s", e)

            # Never sleep past the deadline.
            remaining = config.timeout - (time.time() - start_time)
            if remaining <= 0:
                break
            time.sleep(min(interval, remaining))

        elapsed = time.time() - start_time
        self.logger.warning(
            "Poll timeout: %s (%.1fs, %d polls)", document_id, elapsed, poll_count
        )
        return None
# Usage
poller = ReviewPoller(api_key=API_KEY)

# Extract data (`document_id` and `schema` come from your own pipeline)
response = requests.post(
    f"https://api.documind.com/api/v1/extract/{document_id}",
    headers={"X-API-Key": API_KEY, "Content-Type": "application/json"},
    json={"schema": schema, "prompt": "Extract data"}
)
# Fail here on an error response instead of hitting a KeyError below
response.raise_for_status()
result = response.json()

if result["needs_review"]:
    reviewed = poller.poll(
        document_id=result["document_id"],
        config=PollConfig(
            poll_interval=10,
            timeout=600,
            backoff=True
        )
    )
    if reviewed:
        process_data(reviewed)
    else:
        handle_timeout(result["document_id"])