Suyi Zhang
Suyi Zhang
Published on
9 min read

Building Progressive Data Loading with Google ADK: Adding Custom FastAPI Endpoints

Google ADKFastAPIServer-Sent EventsProgressive LoadingBackground JobsReal-time Updates

Building Progressive Data Loading with Google ADK: Adding Custom FastAPI Endpoints

Google ADK provides a FastAPI server out of the box for agent interactions. But when I needed to add data enrichment with progressive loading—think Clay.com's satisfying progressive data fills—I realised ADK's standard endpoints weren't enough. I needed custom endpoints that could handle background jobs and stream real-time progress updates.

The Challenge: Research That Takes Time

My agent helps users find records. After searching, we have a list of 10-20 potential entries, but they're just basic records—company name, contact info, maybe a website URL. To make good decisions, users need enriched data: images, pricing, certifications, detailed profiles.

Getting this data requires research across multiple sources. For each record:

  • Research public web presence
  • Extract relevant images and documentation
  • Parse pricing information
  • Identify certifications and requirements

This takes 5-15 seconds per record. For 20 records, that's up to 5 minutes. Users can't wait with a frozen loading spinner.

The Solution: Progressive Loading with SSE

Server-Sent Events (SSE) is a standard HTTP protocol for pushing real-time updates from server to client over a single long-lived connection. Unlike WebSockets (which are bidirectional), SSE is unidirectional—perfect for streaming progress updates where the client only needs to receive data, not send it back.

I built a system inspired by Clay.com's data enrichment UI:

  1. Start the enrichment job in the background
  2. Stream results to the frontend as each record completes
  3. Users see data populate progressively, not all-or-nothing
Loading chart...

Architecture: Custom Endpoints on ADK's FastAPI Server

ADK generates a FastAPI server with standard agent endpoints. The key insight is that you can extend it with custom routers:

# runner_config.py
from google.adk.cli.fast_api import get_fast_api_app

# Create ADK's FastAPI app
app = get_fast_api_app(
    agents_dir="my_agent",
    web=False,
    allow_origins=allowed_origins,
    session_service_uri=database_url
)

# Add custom enrichment endpoints
from enrichment.endpoints import router as enrichment_router
app.include_router(enrichment_router)
print("API Server: Enrichment endpoints registered at /api/enrich")

This gives me the best of both worlds:

  • ADK's standard endpoints for agent conversations
  • Custom endpoints for background jobs and streaming

The Enrichment Endpoint Architecture

I built three endpoints that work together:

1. Start Enrichment Job (POST)

# enrichment/endpoints.py
from fastapi import APIRouter
router = APIRouter(prefix="/api", tags=["enrichment"])

@router.post("/enrich")
async def start_enrichment(request: EnrichmentRequest):
    """
    Start a background enrichment job

    Returns job_id and SSE stream URL for progress tracking
    """
    # Generate unique job ID
    job_id = f"enrich_job_{uuid.uuid4().hex[:8]}"

    # Create job
    job = EnrichmentJob(
        job_id=job_id,
        record_ids=request.record_ids,
        records=request.records
    )

    # Store job (in-memory for now, Redis later)
    enrichment_jobs[job_id] = job

    # Start background task
    asyncio.create_task(_run_enrichment(job))

    return {
        "job_id": job_id,
        "status": "processing",
        "total_records": len(request.record_ids),
        "estimated_time_seconds": len(request.record_ids) * 2, # placeholder
        "sse_url": f"/api/enrich/{job_id}/stream"
    }

This endpoint:

  • Accepts a list of records to enrich
  • Creates a background job with unique ID
  • Returns immediately with SSE streaming URL
  • Doesn't block the HTTP request

2. Stream Progress Updates (SSE)

I used sse_starlette, a lightweight library that adds Server-Sent Events support to FastAPI. It provides EventSourceResponse, which handles the SSE protocol details—keeping connections alive, formatting events, and managing client disconnections. Install it with pip install sse-starlette.

from sse_starlette.sse import EventSourceResponse

@router.get("/enrich/{job_id}/stream")
async def stream_enrichment_progress(job_id: str):
    """
    SSE stream of enrichment progress

    Sends real-time updates as each record completes enrichment
    """
    async def event_generator():
        """Generate SSE events for enrichment progress"""
        job = enrichment_jobs[job_id]
        last_completed = 0

        while job.status == "processing":
            # Check for newly completed records
            current_completed = len(job.results)

            if current_completed > last_completed:
                # Send progress events for new results
                for result in job.results[last_completed:]:
                    yield {
                        "event": "progress",
                        "data": {
                            "record_id": result.get("record_id"),
                            "status": result.get("status"),
                            "images": result.get("images", []),
                            "price_range": result.get("price_range"),
                            "progress": f"{job.progress['completed']}/{job.progress['total']}"
                        }
                    }

                last_completed = current_completed

            # Poll every 500ms for new results
            await asyncio.sleep(0.5)

        # Send completion event
        yield {
            "event": "complete",
            "data": {
                "job_id": job_id,
                "total": job.progress["total"],
                "successful": job.progress["completed"],
                "failed": job.progress["failed"]
            }
        }

    return EventSourceResponse(event_generator())

This is where the majority of the work happens:

  • Server-Sent Events (SSE) maintain a persistent HTTP connection from server to client
  • Poll the job every 500ms for new results
  • Stream each completed record immediately
  • Frontend updates progressively as data arrives

3. Status Polling Endpoint (Optional Fallback)

@router.get("/enrich-records/{job_id}/status")
async def get_enrichment_status(job_id: str):
    """
    Poll-based status check (alternative to SSE)

    Returns current job status and all results so far
    """
    if job_id not in enrichment_jobs:
        raise HTTPException(status_code=404, detail="Job not found")

    job = enrichment_jobs[job_id]

    return EnrichmentStatusResponse(
        job_id=job_id,
        status=job.status,
        progress=job.progress,
        results=job.results
    )

This provides a RESTful alternative to SSE streaming. While SSE is preferred for real-time updates, polling is useful in certain scenarios:

When polling makes sense:

  • Corporate firewalls or proxies that block long-lived connections
  • Client libraries without EventSource support (some mobile frameworks, older browsers)
  • Testing and debugging with tools like curl or Postman
  • Intermittent network conditions where reconnecting SSE is problematic

For this implementation, SSE provides the best user experience, but the polling endpoint ensures the system works everywhere.

The Background Job Processing

The background job is where the actual enrichment happens. It's started with asyncio.create_task() from the POST endpoint, which allows it to run independently without blocking the HTTP response.

The key insight: instead of waiting for all records to finish before returning results, we stream them progressively using Python's asyncio.as_completed().

async def _run_enrichment(job: EnrichmentJob):
    """
    Background task to run enrichment

    Updates job progress as records complete
    """
    # Run enrichment with streaming
    async for result in enrichment_service.enrich_records_streaming(
        records=job.records,
        max_concurrent=10,
        timeout_per_record=15
    ):
        # Update job progress immediately
        job.update_progress(
            record_id=result.get("record_id"),
            result=result
        )

    # Mark job complete
    job.mark_complete()

    # Clean up after 1 hour
    await asyncio.sleep(3600)
    del enrichment_jobs[job.job_id]

What's happening here:

  1. Streaming consumption: async for consumes results as they arrive, not all at once
  2. Immediate updates: Each result updates the job's shared state immediately
  3. SSE picks up changes: The SSE endpoint polls job.results and streams new entries to the client
  4. Automatic cleanup: Jobs are deleted after 1 hour to prevent memory leaks

The Streaming Enrichment Service

The service is where the magic of progressive loading happens. Instead of using asyncio.gather() (which waits for all tasks to complete), we use asyncio.as_completed() to yield results as soon as each one finishes.

# enrichment/service.py
class EnrichmentService:
    async def enrich_records_streaming(
        self,
        records: List[Dict[str, Any]],
        max_concurrent: int = 10,
        timeout_per_record: int = 15
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Enrich records with streaming progress updates

        Yields enrichment results as each record completes
        """
        async with aiohttp.ClientSession() as session:
            # Create tasks for all records
            tasks = [
                self._enrich_single_record(session, record, timeout)
                for record in records
            ]

            # Use as_completed to yield results as they finish
            # This is the key to progressive loading!
            for coro in asyncio.as_completed(tasks):
                result = await coro
                yield result  # Stream result immediately

Why asyncio.as_completed() is critical:

asyncio.as_completed() yields results in completion order, not submission order. This means:

  • Fast records appear first: A record that takes 2 seconds shows up before one that takes 10 seconds
  • No waiting for stragglers: Users see results immediately, even if some records are still processing
  • Perceived performance boost: The UI feels responsive because data populates quickly

Frontend Integration

The frontend connects to the SSE stream and updates the UI progressively:

// Start enrichment job
const response = await fetch('/api/enrich-records', {
  method: 'POST',
  body: JSON.stringify({
    user_id: userId,
    session_id: sessionId,
    record_ids: selectedIds,
    records: selectedrecords
  })
});

const { job_id, sse_url } = await response.json();

// Connect to SSE stream
const eventSource = new EventSource(sse_url);

eventSource.addEventListener('progress', (event) => {
  const data = JSON.parse(event.data);

  // Update UI for this record
  updaterecordCard(data.record_id, {
    images: data.images,
    priceRange: data.price_range
  });

  // Update progress bar
  updateProgress(data.progress);
});

eventSource.addEventListener('complete', (event) => {
  const data = JSON.parse(event.data);
  console.log(`Enrichment complete: ${data.successful}/${data.total} successful`);
  eventSource.close();
});

Data Models for Type Safety

Pydantic models ensure type safety across the API:

# enrichment/models.py
from pydantic import BaseModel

class EnrichmentRequest(BaseModel):
    """Request model for starting an enrichment job"""
    user_id: str
    session_id: str
    record_ids: List[str]
    records: List[Dict[str, Any]]

class EnrichmentProgress(BaseModel):
    """Progress update for a single record"""
    record_id: str
    status: str  # 'completed', 'failed', 'processing'
    images: Optional[List[str]] = None
    price_range: Optional[str] = None
    error: Optional[str] = None
    progress: str  # e.g., "15/20"

class EnrichmentJob:
    """In-memory job storage for enrichment progress tracking"""

    def __init__(self, job_id: str, record_ids: List[str], records: List[Dict]):
        self.job_id = job_id
        self.record_ids = record_ids
        self.status = "processing"
        self.results = []
        self.progress = {
            "total": len(record_ids),
            "completed": 0,
            "failed": 0
        }

Production Considerations

Job Storage: In-Memory vs Redis

Phase 1: In-Memory Storage

For the initial implementation, I'm using a simple Python dictionary to store job state in the server's memory. This is the fastest option for prototyping and works perfectly for single-instance deployments:

# In-memory job storage (simple but limited)
enrichment_jobs: Dict[str, EnrichmentJob] = {}

Limitations of in-memory storage:

  • No persistence: All jobs disappear when the server restarts (except being preserved in ADK session state)
  • Can't handle heavy load: Since the Docker container runs a single instance, all enrichment jobs compete for the same CPU and memory resources with the ADK agent
  • Memory leaks: Need manual cleanup to prevent unbounded growth

Phase 2: Redis for Production

Redis is an in-memory data store that acts like a database but lives entirely in RAM for speed. Unlike a Python dictionary that exists only in one process, Redis runs as a separate service that multiple API servers can share.

For production, I'll migrate to Redis for several critical advantages:

Why Redis?

  • Persistence: Job state survives server restarts and ECS deployments
  • Resource isolation: Offload job storage to a separate Redis container, freeing up memory in the main Docker container for the ADK agent
  • Built-in TTL: Redis automatically deletes old jobs after a timeout (no manual cleanup)
  • Future-proofing: When ready to scale, adding more containers is straightforward since they can all share the same Redis instance

How it works: When a job is created, instead of storing it in a Python dict, it's serialized to JSON and saved to Redis with an expiration time (e.g., 1 hour). All API instances read and write to the same Redis server, so they stay in sync. When the TTL expires, Redis automatically removes the job—no memory leak risks.

Migration path: The nice part about this architecture is that switching from in-memory to Redis is straightforward. The job storage is abstracted behind simple get and set operations, so the endpoint code doesn't need to change—just swap the storage backend.

Concurrency Control

The service limits concurrent operations for efficiency:

# Create session with concurrency limits
connector = aiohttp.TCPConnector(
    limit=max_concurrent,  # Total concurrent operations
    limit_per_host=3,      # Per-host limit for external requests
    ttl_dns_cache=300
)

Error Handling

Each record enrichment is isolated—failures don't break the entire job:

async for result in enrichment_service.enrich_records_streaming(...):
    try:
        job.update_progress(result)
    except Exception as e:
        # Log error but continue processing other records
        logger.error(f"Failed to process record: {e}")
        job.update_progress({
            "record_id": record_id,
            "status": "failed",
            "error": str(e)
        })

The Results: Clay.com-Style UX on a Custom Stack

User Experience:

  • Instant feedback: Job starts immediately, no waiting
  • Progressive loading: See results as they arrive, not all at once
  • No timeouts: Long jobs (3+ minutes) work reliably
  • Error transparency: Failed records don't break the entire job

Technical Benefits:

  • Efficient concurrency: Process 10+ records in parallel
  • Resilient to failures: Individual failures don't crash the job
  • Clean separation: Background jobs don't block agent conversations
  • Type safety: Pydantic models catch errors at the API boundary

Integration with ADK:

  • Seamless: Custom endpoints coexist with ADK's standard endpoints
  • Same server: No need for separate microservices
  • Shared middleware: CORS and auth work across all endpoints
  • Single deployment: One Docker container for everything

TL;DR: The Key Takeaway

Google ADK provides a FastAPI server for agent interactions, but it's not limited to its standard endpoints. By adding custom FastAPI routers, I built Clay.com-style progressive data loading on top of ADK's infrastructure. The result is a seamless user experience for long-running data enrichment jobs, all running on the same server as my AI agents.

The pattern is simple:

  1. POST endpoint: Start background job, return job_id
  2. SSE endpoint: Stream progress updates in real-time
  3. Background task: Process items with asyncio.as_completed()
  4. Job storage: In-memory for MVP, Redis for production

This is part 5 of my series reflecting on building production AI agents with Google ADK. Check out the other posts in the series:

  1. Simplifying Multi-Agent Systems: Reducing Root Agent Complexity
  2. Deploying Google ADK Agents to AWS: Integrating with Existing Infrastructure
  3. Implementing Retry Strategies in Google ADK: Handling 429 Errors Gracefully
  4. Building Custom Frontends for ADK Agents: Real-Time Streaming and State Management