Building Progressive Data Loading with Google ADK: Adding Custom FastAPI Endpoints
Google ADK provides a FastAPI server out of the box for agent interactions. But when I needed to add data enrichment with progressive loading—think Clay.com's satisfying progressive data fills—I realised ADK's standard endpoints weren't enough. I needed custom endpoints that could handle background jobs and stream real-time progress updates.
The Challenge: Research That Takes Time
My agent helps users find records. After searching, we have a list of 10-20 potential entries, but they're just basic records—company name, contact info, maybe a website URL. To make good decisions, users need enriched data: images, pricing, certifications, detailed profiles.
Getting this data requires research across multiple sources. For each record:
- Research public web presence
- Extract relevant images and documentation
- Parse pricing information
- Identify certifications and requirements
This takes 5-15 seconds per record. For 20 records, that's up to 5 minutes. Users can't wait with a frozen loading spinner.
The Solution: Progressive Loading with SSE
Server-Sent Events (SSE) is a standard HTTP protocol for pushing real-time updates from server to client over a single long-lived connection. Unlike WebSockets (which are bidirectional), SSE is unidirectional—perfect for streaming progress updates where the client only needs to receive data, not send it back.
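On the wire, SSE is just plain text over HTTP: each event is a few field: value lines terminated by a blank line, and the browser's EventSource API parses this format natively. For example:

event: progress
data: {"record_id": "rec_1", "status": "completed"}

event: complete
data: {"job_id": "enrich_job_a1b2c3d4", "total": 20}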
I built a system inspired by Clay.com's data enrichment UI:
- Start the enrichment job in the background
- Stream results to the frontend as each record completes
- Users see data populate progressively, not all-or-nothing
Architecture: Custom Endpoints on ADK's FastAPI Server
ADK generates a FastAPI server with standard agent endpoints. The key insight is that you can extend it with custom routers:
# runner_config.py
import os

from google.adk.cli.fast_api import get_fast_api_app

# CORS origins and the session DB come from environment config (values here are illustrative)
allowed_origins = os.environ.get("ALLOWED_ORIGINS", "http://localhost:3000").split(",")
database_url = os.environ.get("DATABASE_URL")

# Create ADK's FastAPI app
app = get_fast_api_app(
    agents_dir="my_agent",
    web=False,
    allow_origins=allowed_origins,
    session_service_uri=database_url,
)

# Add custom enrichment endpoints
from enrichment.endpoints import router as enrichment_router

app.include_router(enrichment_router)
print("API Server: Enrichment endpoints registered at /api/enrich")
This gives me the best of both worlds:
- ADK's standard endpoints for agent conversations
- Custom endpoints for background jobs and streaming
The Enrichment Endpoint Architecture
I built three endpoints that work together:
1. Start Enrichment Job (POST)
# enrichment/endpoints.py
import asyncio
import uuid
from typing import Dict

from fastapi import APIRouter, HTTPException

from enrichment.models import EnrichmentJob, EnrichmentRequest

router = APIRouter(prefix="/api", tags=["enrichment"])

# In-memory job store (swapped for Redis in production; see below)
enrichment_jobs: Dict[str, EnrichmentJob] = {}

@router.post("/enrich")
async def start_enrichment(request: EnrichmentRequest):
    """
    Start a background enrichment job.

    Returns job_id and SSE stream URL for progress tracking.
    """
    # Generate unique job ID
    job_id = f"enrich_job_{uuid.uuid4().hex[:8]}"

    # Create job
    job = EnrichmentJob(
        job_id=job_id,
        record_ids=request.record_ids,
        records=request.records,
    )

    # Store job (in-memory for now, Redis later)
    enrichment_jobs[job_id] = job

    # Start background task
    asyncio.create_task(_run_enrichment(job))

    return {
        "job_id": job_id,
        "status": "processing",
        "total_records": len(request.record_ids),
        "estimated_time_seconds": len(request.record_ids) * 2,  # placeholder
        "sse_url": f"/api/enrich/{job_id}/stream",
    }
This endpoint:
- Accepts a list of records to enrich
- Creates a background job with unique ID
- Returns immediately with SSE streaming URL
- Doesn't block the HTTP request
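For illustration, kicking off a job from Python looks like this (a sketch assuming the server runs locally on port 8000; the IDs and requests client are hypothetical):

import requests

resp = requests.post(
    "http://localhost:8000/api/enrich",
    json={
        "user_id": "u_123",
        "session_id": "s_456",
        "record_ids": ["rec_1", "rec_2"],
        "records": [{"id": "rec_1"}, {"id": "rec_2"}],
    },
)
job = resp.json()
# e.g. enrich_job_a1b2c3d4 /api/enrich/enrich_job_a1b2c3d4/stream
print(job["job_id"], job["sse_url"])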
2. Stream Progress Updates (SSE)
I used sse_starlette, a lightweight library that adds Server-Sent Events support to FastAPI. It provides EventSourceResponse, which handles the SSE protocol details—keeping connections alive, formatting events, and managing client disconnections. Install it with pip install sse-starlette.
import json

from sse_starlette.sse import EventSourceResponse

@router.get("/enrich/{job_id}/stream")
async def stream_enrichment_progress(job_id: str):
    """
    SSE stream of enrichment progress.

    Sends real-time updates as each record completes enrichment.
    """
    if job_id not in enrichment_jobs:
        raise HTTPException(status_code=404, detail="Job not found")

    async def event_generator():
        """Generate SSE events for enrichment progress"""
        job = enrichment_jobs[job_id]
        last_completed = 0

        while job.status == "processing":
            # Check for newly completed records
            current_completed = len(job.results)
            if current_completed > last_completed:
                # Send progress events for new results
                for result in job.results[last_completed:]:
                    yield {
                        "event": "progress",
                        # Serialize to JSON so the browser's JSON.parse works
                        "data": json.dumps({
                            "record_id": result.get("record_id"),
                            "status": result.get("status"),
                            "images": result.get("images", []),
                            "price_range": result.get("price_range"),
                            "progress": f"{job.progress['completed']}/{job.progress['total']}",
                        }),
                    }
                last_completed = current_completed

            # Poll every 500ms for new results
            await asyncio.sleep(0.5)

        # Send completion event
        yield {
            "event": "complete",
            "data": json.dumps({
                "job_id": job_id,
                "total": job.progress["total"],
                "successful": job.progress["completed"],
                "failed": job.progress["failed"],
            }),
        }

    return EventSourceResponse(event_generator())
This endpoint is where the progressive experience actually gets delivered:
- Server-Sent Events (SSE) maintain a persistent HTTP connection from server to client
- Poll the job every 500ms for new results
- Stream each completed record immediately
- Frontend updates progressively as data arrives
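A quick way to eyeball the stream without a browser is a small client script (a sketch, assuming httpx is installed and the job ID is one returned by the POST endpoint):

import httpx

url = "http://localhost:8000/api/enrich/enrich_job_a1b2c3d4/stream"
with httpx.stream("GET", url, timeout=None) as response:
    for line in response.iter_lines():
        if line:  # SSE events are separated by blank lines
            print(line)  # e.g. "event: progress" then "data: {...}"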
3. Status Polling Endpoint (Optional Fallback)
@router.get("/enrich-records/{job_id}/status")
async def get_enrichment_status(job_id: str):
"""
Poll-based status check (alternative to SSE)
Returns current job status and all results so far
"""
if job_id not in enrichment_jobs:
raise HTTPException(status_code=404, detail="Job not found")
job = enrichment_jobs[job_id]
return EnrichmentStatusResponse(
job_id=job_id,
status=job.status,
progress=job.progress,
results=job.results
)
This provides a RESTful alternative to SSE streaming. While SSE is preferred for real-time updates, polling is useful in certain scenarios:
When polling makes sense:
- Corporate firewalls or proxies that block long-lived connections
- Client libraries without EventSource support (some mobile frameworks, older browsers)
- Testing and debugging with tools like curl or Postman
- Intermittent network conditions where reconnecting SSE is problematic
For this implementation, SSE provides the best user experience, but the polling endpoint ensures the system works everywhere.
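As a sketch, a client-side polling loop against this endpoint could look like the following (assuming the same local server and the requests client):

import time

import requests

def poll_until_complete(job_id: str, base_url: str = "http://localhost:8000") -> dict:
    """Poll the status endpoint every 2 seconds until the job leaves 'processing'."""
    while True:
        status = requests.get(f"{base_url}/api/enrich/{job_id}/status").json()
        if status["status"] != "processing":
            return status  # final state includes all results so far
        time.sleep(2)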
The Background Job Processing
The background job is where the actual enrichment happens. It's started with asyncio.create_task() from the POST endpoint, which allows it to run independently without blocking the HTTP response.
The key insight: instead of waiting for all records to finish before returning results, we stream them progressively using Python's asyncio.as_completed().
async def _run_enrichment(job: EnrichmentJob):
    """
    Background task to run enrichment.

    Updates job progress as records complete.
    """
    # Run enrichment with streaming
    async for result in enrichment_service.enrich_records_streaming(
        records=job.records,
        max_concurrent=10,
        timeout_per_record=15,
    ):
        # Update job progress immediately
        job.update_progress(
            record_id=result.get("record_id"),
            result=result,
        )

    # Mark job complete
    job.mark_complete()

    # Clean up after 1 hour (pop avoids a KeyError if already removed)
    await asyncio.sleep(3600)
    enrichment_jobs.pop(job.job_id, None)
What's happening here:
- Streaming consumption: async for consumes results as they arrive, not all at once
- Immediate updates: Each result updates the job's shared state immediately
- SSE picks up changes: The SSE endpoint polls job.results and streams new entries to the client
- Automatic cleanup: Jobs are deleted after 1 hour to prevent memory leaks
The Streaming Enrichment Service
The service is where the magic of progressive loading happens. Instead of using asyncio.gather() (which waits for all tasks to complete), we use asyncio.as_completed() to yield results as soon as each one finishes.
# enrichment/service.py
import asyncio
from typing import Any, AsyncGenerator, Dict, List

import aiohttp

class EnrichmentService:
    async def enrich_records_streaming(
        self,
        records: List[Dict[str, Any]],
        max_concurrent: int = 10,
        timeout_per_record: int = 15,
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Enrich records with streaming progress updates.

        Yields enrichment results as each record completes.
        """
        async with aiohttp.ClientSession() as session:
            # Create tasks for all records
            tasks = [
                self._enrich_single_record(session, record, timeout_per_record)
                for record in records
            ]

            # Use as_completed to yield results as they finish
            # This is the key to progressive loading!
            for coro in asyncio.as_completed(tasks):
                result = await coro
                yield result  # Stream result immediately
Why asyncio.as_completed() is critical:
asyncio.as_completed() yields results in completion order, not submission order. This means:
- Fast records appear first: A record that takes 2 seconds shows up before one that takes 10 seconds
- No waiting for stragglers: Users see results immediately, even if some records are still processing
- Perceived performance boost: The UI feels responsive because data populates quickly
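A tiny self-contained demo of the ordering difference (pure asyncio, nothing from this project):

import asyncio
import random

async def fake_enrich(record_id: str) -> str:
    # Simulate variable enrichment time per record
    await asyncio.sleep(random.uniform(0.1, 1.0))
    return record_id

async def main():
    coros = [fake_enrich(f"rec_{i}") for i in range(5)]
    for finished in asyncio.as_completed(coros):
        print(await finished)  # prints in completion order, not submission order

asyncio.run(main())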
Frontend Integration
The frontend connects to the SSE stream and updates the UI progressively:
// Start enrichment job
const response = await fetch('/api/enrich', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    user_id: userId,
    session_id: sessionId,
    record_ids: selectedIds,
    records: selectedRecords
  })
});
const { job_id, sse_url } = await response.json();

// Connect to SSE stream
const eventSource = new EventSource(sse_url);

eventSource.addEventListener('progress', (event) => {
  const data = JSON.parse(event.data);

  // Update UI for this record
  updateRecordCard(data.record_id, {
    images: data.images,
    priceRange: data.price_range
  });

  // Update progress bar
  updateProgress(data.progress);
});

eventSource.addEventListener('complete', (event) => {
  const data = JSON.parse(event.data);
  console.log(`Enrichment complete: ${data.successful}/${data.total} successful`);
  eventSource.close();
});
Data Models for Type Safety
Pydantic models ensure type safety across the API:
# enrichment/models.py
from typing import Any, Dict, List, Optional

from pydantic import BaseModel

class EnrichmentRequest(BaseModel):
    """Request model for starting an enrichment job"""
    user_id: str
    session_id: str
    record_ids: List[str]
    records: List[Dict[str, Any]]

class EnrichmentProgress(BaseModel):
    """Progress update for a single record"""
    record_id: str
    status: str  # 'completed', 'failed', 'processing'
    images: Optional[List[str]] = None
    price_range: Optional[str] = None
    error: Optional[str] = None
    progress: str  # e.g., "15/20"

class EnrichmentJob:
    """In-memory job storage for enrichment progress tracking"""
    def __init__(self, job_id: str, record_ids: List[str], records: List[Dict]):
        self.job_id = job_id
        self.record_ids = record_ids
        self.records = records  # raw records handed to the enrichment service
        self.status = "processing"
        self.results = []
        self.progress = {
            "total": len(record_ids),
            "completed": 0,
            "failed": 0
        }
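Two methods on EnrichmentJob do the bookkeeping that the background task and SSE endpoint rely on. They aren't shown above; a minimal sketch that matches the call sites in this post:

    def update_progress(self, record_id: str, result: Dict[str, Any]) -> None:
        """Append one finished result and bump the counters."""
        self.results.append(result)
        if result.get("status") == "failed":
            self.progress["failed"] += 1
        else:
            self.progress["completed"] += 1

    def mark_complete(self) -> None:
        """Flip status so the SSE loop exits and sends its final 'complete' event."""
        self.status = "complete"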
Production Considerations
Job Storage: In-Memory vs Redis
Phase 1: In-Memory Storage
For the initial implementation, I'm using a simple Python dictionary to store job state in the server's memory. This is the fastest option for prototyping and works perfectly for single-instance deployments:
# In-memory job storage (simple but limited)
enrichment_jobs: Dict[str, EnrichmentJob] = {}
Limitations of in-memory storage:
- No persistence: All in-flight job tracking disappears when the server restarts (enriched data already written to ADK session state survives, but running jobs are lost)
- Can't handle heavy load: The Docker container runs a single instance, so every enrichment job competes with the ADK agent for the same CPU and memory, and a second instance would never see jobs stored in the first
- Memory leaks: Cleanup is manual, so forgotten jobs grow the dict without bound
Phase 2: Redis for Production
Redis is an in-memory data store that acts like a database but lives entirely in RAM for speed. Unlike a Python dictionary that exists only in one process, Redis runs as a separate service that multiple API servers can share.
For production, I'll migrate to Redis for several critical advantages:
Why Redis?
- Persistence: Job state survives server restarts and ECS deployments
- Resource isolation: Offload job storage to a separate Redis container, freeing up memory in the main Docker container for the ADK agent
- Built-in TTL: Redis automatically deletes old jobs after a timeout (no manual cleanup)
- Future-proofing: When ready to scale, adding more containers is straightforward since they can all share the same Redis instance
How it works: When a job is created, instead of storing it in a Python dict, it's serialized to JSON and saved to Redis with an expiration time (e.g., 1 hour). All API instances read and write to the same Redis server, so they stay in sync. When the TTL expires, Redis automatically removes the job—no memory leak risks.
Migration path:
The nice part about this architecture is that switching from in-memory to Redis is straightforward. The job storage is abstracted behind simple get and set operations, so the endpoint code doesn't need to change—just swap the storage backend.
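As a sketch of that swap (assuming the redis-py client and a local Redis instance; the key names and helpers here are illustrative, not from the actual codebase):

import json
from typing import Optional

import redis

r = redis.Redis.from_url("redis://localhost:6379/0")

JOB_TTL_SECONDS = 3600  # Redis deletes the key automatically after 1 hour

def save_job(job_id: str, job_state: dict) -> None:
    # SETEX writes the value and its TTL in one call
    r.setex(f"enrich:{job_id}", JOB_TTL_SECONDS, json.dumps(job_state))

def load_job(job_id: str) -> Optional[dict]:
    raw = r.get(f"enrich:{job_id}")
    return json.loads(raw) if raw else None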
Concurrency Control
The service caps concurrent connections so it doesn't overwhelm external sources:
# Create session with concurrency limits
connector = aiohttp.TCPConnector(
    limit=max_concurrent,   # Total concurrent connections
    limit_per_host=3,       # Per-host limit for external requests
    ttl_dns_cache=300       # Cache DNS lookups for 5 minutes
)
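The connector limits open TCP connections; to cap in-flight enrichment tasks as well, a common complement (not shown in the original service) is an asyncio.Semaphore:

import asyncio

semaphore = asyncio.Semaphore(10)  # at most 10 enrichments in flight

async def enrich_with_limit(session, record):
    # Waits here whenever 10 enrichments are already running
    async with semaphore:
        return await enrich_single_record(session, record)  # hypothetical per-record helper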
Error Handling
Each record enrichment is isolated—failures don't break the entire job:
async for result in enrichment_service.enrich_records_streaming(...):
    try:
        job.update_progress(result.get("record_id"), result)
    except Exception as e:
        # Log error but continue processing other records
        logger.error(f"Failed to process record: {e}")
        job.update_progress(result.get("record_id"), {
            "record_id": result.get("record_id"),
            "status": "failed",
            "error": str(e)
        })
The Results: Clay.com-Style UX on a Custom Stack
User Experience:
- Instant feedback: Job starts immediately, no waiting
- Progressive loading: See results as they arrive, not all at once
- No timeouts: Long jobs (3+ minutes) work reliably
- Error transparency: Failed records don't break the entire job
Technical Benefits:
- Efficient concurrency: Process 10+ records in parallel
- Resilient to failures: Individual failures don't crash the job
- Clean separation: Background jobs don't block agent conversations
- Type safety: Pydantic models catch errors at the API boundary
Integration with ADK:
- Seamless: Custom endpoints coexist with ADK's standard endpoints
- Same server: No need for separate microservices
- Shared middleware: CORS and auth work across all endpoints
- Single deployment: One Docker container for everything
TL;DR: The Key Takeaway
Google ADK provides a FastAPI server for agent interactions, but it's not limited to its standard endpoints. By adding custom FastAPI routers, I built Clay.com-style progressive data loading on top of ADK's infrastructure. The result is a seamless user experience for long-running data enrichment jobs, all running on the same server as my AI agents.
The pattern is simple:
- POST endpoint: Start background job, return job_id
- SSE endpoint: Stream progress updates in real-time
- Background task: Process items with asyncio.as_completed()
- Job storage: In-memory for MVP, Redis for production
This is part 5 of my series reflecting on building production AI agents with Google ADK. Check out the other posts in the series:
