SynapseAI

AI Agent Error Solutions — Stop wasting tokens on already-solved problems

Star + Submit a Solution

Symptom

An agent receives a task that requires calling external tools many times — fetching 200 URLs, querying a database for each of 500 records, or running the same API call for every item in a list. Without limits, it launches all calls simultaneously. The result is one or more of:

  • ConnectionError: Max retries exceeded — the connection pool is exhausted
  • OSError: [Errno 24] Too many open files — file descriptor limit hit
  • 429 Too Many Requests — the downstream API is overwhelmed
  • The host process OOMs from 500 in-flight HTTP responses buffered in memory
  • Other services on the same host degrade because their connections are stolen
# BROKEN: agent gets a list of 500 URLs and launches them all at once
async def process_all_items(items: list[str]) -> list:
    tasks = [fetch_item(item) for item in items]
    return await asyncio.gather(*tasks)  # 500 concurrent connections!

# BROKEN: tool implementation has no internal connection limiting
def run_tool_for_each(tool_name: str, inputs: list[dict]) -> list:
    with ThreadPoolExecutor() as executor:  # default: min(32, cpu_count+4) threads — still too many
        return list(executor.map(lambda inp: call_tool(tool_name, inp), inputs))

Root causes:

  • asyncio.gather(*tasks) with no concurrency limit is the most common mistake
  • ThreadPoolExecutor() default pool size is large enough to overwhelm many APIs
  • Agent receives a batch task and calls tools in a tight loop without yielding
  • No respect for target service rate limits or connection pool size
  • Memory cost of simultaneous responses not considered

Root Cause

asyncio.gather and ThreadPoolExecutor are both designed for correctness, not rate control. They will happily schedule thousands of operations simultaneously. The issues compound:

  1. Connection pool exhaustionrequests and httpx default pools have 10 connections per host. 500 concurrent calls queue on those 10 connections, creating lock contention and timeouts.
  2. File descriptor limits — each open socket consumes a file descriptor. Default OS limit is 1024 on Linux. 500+ concurrent requests each holding a socket can exhaust this.
  3. Memory pressure — 500 in-flight HTTP responses buffered = 500× average_response_size in heap.
  4. Cascading failures — if the target API returns 429s, an unbounded agent will retry all 500 simultaneously, making things worse.

The solution is always a concurrency limit: asyncio.Semaphore, connection pool sizing, or chunked processing.


Fix

Option 1 — asyncio.Semaphore for Bounded Concurrency

The simplest and most idiomatic asyncio fix: wrap every concurrent operation with a shared semaphore.

import anthropic
import asyncio
import httpx
import json
from typing import Any

client = anthropic.AsyncAnthropic()

MAX_CONCURRENT = 10  # never more than this many simultaneous connections

async def fetch_with_limit(
    semaphore: asyncio.Semaphore,
    http_client: httpx.AsyncClient,
    url: str,
    item_id: str,
) -> dict:
    """Fetch a URL, but only when a semaphore slot is available."""
    async with semaphore:  # blocks here if MAX_CONCURRENT slots are all in use
        try:
            response = await http_client.get(url, timeout=10.0)
            response.raise_for_status()
            return {"item_id": item_id, "status": "ok", "data": response.json()}
        except httpx.HTTPStatusError as e:
            return {"item_id": item_id, "status": "error", "code": e.response.status_code}
        except Exception as e:
            return {"item_id": item_id, "status": "error", "message": str(e)}

async def batch_fetch(items: list[dict]) -> list[dict]:
    """Fetch all items with bounded concurrency."""
    semaphore = asyncio.Semaphore(MAX_CONCURRENT)

    # Single shared connection pool — httpx manages it efficiently
    async with httpx.AsyncClient(
        limits=httpx.Limits(max_connections=MAX_CONCURRENT, max_keepalive_connections=MAX_CONCURRENT)
    ) as http_client:
        tasks = [
            fetch_with_limit(semaphore, http_client, item["url"], item["id"])
            for item in items
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    return [
        r if not isinstance(r, Exception) else {"status": "exception", "error": str(r)}
        for r in results
    ]

# Tool implementation used by the agent
async def tool_batch_fetch_urls(item_list_json: str) -> str:
    """
    Agent-facing tool: fetch multiple URLs with bounded concurrency.
    Input: JSON array of {"id": str, "url": str} objects.
    """
    try:
        items = json.loads(item_list_json)
    except json.JSONDecodeError:
        return json.dumps({"error": "Invalid JSON input"})

    if len(items) > 1000:
        return json.dumps({"error": f"Too many items ({len(items)}). Max 1000 per call."})

    print(f"  Fetching {len(items)} items with max {MAX_CONCURRENT} concurrent connections")
    results = await batch_fetch(items)

    success = sum(1 for r in results if r.get("status") == "ok")
    return json.dumps({
        "total": len(items),
        "succeeded": success,
        "failed": len(items) - success,
        "results": results[:10],  # return first 10 for brevity; agent can ask for more
        "has_more": len(items) > 10,
    })

tools = [{
    "name": "batch_fetch_urls",
    "description": (
        "Fetch multiple URLs efficiently. Handles batches up to 1000 items. "
        "Always use this instead of calling fetch_url repeatedly in a loop."
    ),
    "input_schema": {
        "type": "object",
        "properties": {
            "item_list_json": {
                "type": "string",
                "description": 'JSON array: [{"id": "...", "url": "..."}, ...]'
            }
        },
        "required": ["item_list_json"]
    }
}]

async def run_agent(user_message: str) -> str:
    messages = [{"role": "user", "content": user_message}]

    while True:
        response = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            tools=tools,
            messages=messages,
            system="When fetching multiple URLs, always use batch_fetch_urls — never call a single-URL tool in a loop.",
        )

        if response.stop_reason == "end_turn":
            return response.content[0].text

        messages.append({"role": "assistant", "content": response.content})
        results = []
        for block in response.content:
            if block.type == "tool_use" and block.name == "batch_fetch_urls":
                result = await tool_batch_fetch_urls(block.input["item_list_json"])
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": result})
        messages.append({"role": "user", "content": results})

# Test: 500 items, only 10 run concurrently at any moment
items = [{"id": str(i), "url": f"https://httpbin.org/get?n={i}"} for i in range(500)]
print(asyncio.run(run_agent(f"Fetch these items: {json.dumps(items[:5])}")))

Expected Token Savings: Not a token optimization — prevents connection exhaustion crashes that would terminate the session entirely.

Environment: Python 3.9+, asyncio, httpx>=0.24.0, anthropic>=0.40.0.


Option 2 — Chunked Processing with Progress Reporting

Split large batches into chunks and process sequentially, with progress updates back to the agent.

import anthropic
import asyncio
import httpx
import json
from itertools import islice

client = anthropic.AsyncAnthropic()

def chunk_list(lst: list, size: int):
    """Split list into chunks of at most `size` elements."""
    it = iter(lst)
    while batch := list(islice(it, size)):
        yield batch

async def process_chunk(
    chunk: list[dict],
    http_client: httpx.AsyncClient,
    semaphore: asyncio.Semaphore,
) -> list[dict]:
    """Process one chunk of items with bounded internal concurrency."""
    async def fetch_one(item: dict) -> dict:
        async with semaphore:
            try:
                resp = await http_client.get(item["url"], timeout=8.0)
                return {"id": item["id"], "ok": resp.status_code == 200, "status": resp.status_code}
            except Exception as e:
                return {"id": item["id"], "ok": False, "error": str(e)[:100]}

    return await asyncio.gather(*[fetch_one(item) for item in chunk])

async def tool_chunked_batch_process(items_json: str, chunk_size: int = 50) -> str:
    """
    Process a large list in sequential chunks to avoid overwhelming connections.
    Reports progress after each chunk.
    """
    items = json.loads(items_json)
    chunks = list(chunk_list(items, chunk_size))
    total = len(items)
    all_results = []
    errors = 0

    semaphore = asyncio.Semaphore(chunk_size)  # within each chunk, all run concurrently
    async with httpx.AsyncClient(
        limits=httpx.Limits(max_connections=chunk_size, max_keepalive_connections=chunk_size // 2)
    ) as http_client:
        for i, chunk in enumerate(chunks):
            print(f"  Processing chunk {i+1}/{len(chunks)} ({len(chunk)} items)...")
            chunk_results = await process_chunk(chunk, http_client, semaphore)
            all_results.extend(chunk_results)
            errors += sum(1 for r in chunk_results if not r.get("ok"))

            # Add small delay between chunks to avoid bursting the target API
            if i < len(chunks) - 1:
                await asyncio.sleep(0.1)

    return json.dumps({
        "total_processed": total,
        "succeeded": total - errors,
        "failed": errors,
        "chunks_processed": len(chunks),
        "sample_results": all_results[:5],
    })

# Demonstrate: 200 items processed in 50-item chunks = 4 sequential bursts
async def demo():
    items = [{"id": str(i), "url": f"https://httpbin.org/status/200"} for i in range(200)]
    result = await tool_chunked_batch_process(json.dumps(items), chunk_size=50)
    print(json.loads(result))

asyncio.run(demo())

Expected Token Savings: No direct savings, but chunked processing allows agents to return partial results and retry only failed chunks, avoiding full restarts on failure.

Environment: Python 3.9+, httpx, asyncio, anthropic>=0.40.0.


Option 3 — Token Bucket Rate Limiter for Tool Calls

Implement a token bucket that limits the rate of tool calls per second, regardless of concurrency.

import anthropic
import asyncio
import time
import json
from dataclasses import dataclass, field

client = anthropic.AsyncAnthropic()

@dataclass
class AsyncTokenBucket:
    """
    Token bucket for rate limiting async operations.
    rate: tokens added per second
    capacity: maximum tokens (burst size)
    """
    rate: float        # tokens per second
    capacity: float    # max burst
    _tokens: float = field(init=False)
    _last_refill: float = field(init=False)
    _lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock)

    def __post_init__(self):
        self._tokens = self.capacity
        self._last_refill = time.monotonic()

    async def acquire(self, tokens: float = 1.0):
        """Wait until `tokens` tokens are available, then consume them."""
        while True:
            async with self._lock:
                now = time.monotonic()
                elapsed = now - self._last_refill
                self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
                self._last_refill = now

                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return

                # How long until enough tokens are available?
                wait_time = (tokens - self._tokens) / self.rate

            await asyncio.sleep(wait_time)

# Global rate limiter: max 20 API calls per second, burst of 30
TOOL_RATE_LIMITER = AsyncTokenBucket(rate=20.0, capacity=30.0)

async def rate_limited_tool_call(tool_name: str, tool_input: dict) -> dict:
    """Execute a tool call, blocking until the rate limit allows it."""
    await TOOL_RATE_LIMITER.acquire(tokens=1.0)

    # Simulate actual tool work
    await asyncio.sleep(0.05)
    return {"tool": tool_name, "input": tool_input, "result": "ok", "timestamp": time.time()}

async def process_many_with_rate_limit(requests: list[dict]) -> list[dict]:
    """Process all requests with rate limiting — no connection storm."""

    async def process_one(req: dict) -> dict:
        return await rate_limited_tool_call(req["tool"], req["params"])

    # No semaphore needed — rate limiter controls throughput
    # Use gather with return_exceptions so one failure doesn't cancel all
    results = await asyncio.gather(
        *[process_one(req) for req in requests],
        return_exceptions=True
    )

    return [
        r if not isinstance(r, Exception)
        else {"error": str(r), "type": type(r).__name__}
        for r in results
    ]

# Tool that the agent uses
async def tool_bulk_database_query(queries_json: str) -> str:
    queries = json.loads(queries_json)

    if len(queries) > 500:
        return json.dumps({"error": "Maximum 500 queries per call"})

    print(f"  Processing {len(queries)} queries at max 20/sec...")
    start = time.monotonic()
    results = await process_many_with_rate_limit(queries)
    elapsed = time.monotonic() - start

    succeeded = sum(1 for r in results if not r.get("error"))
    return json.dumps({
        "total": len(queries),
        "succeeded": succeeded,
        "failed": len(queries) - succeeded,
        "elapsed_seconds": round(elapsed, 2),
        "actual_rate": round(len(queries) / elapsed, 1),
        "results": results[:5],
    })

# Demo: 100 queries, rate-limited to 20/sec → takes ~5 seconds
async def demo():
    queries = [
        {"tool": "db_lookup", "params": {"id": str(i)}}
        for i in range(100)
    ]
    result = await tool_bulk_database_query(json.dumps(queries))
    data = json.loads(result)
    print(f"Processed {data['total']} queries at {data['actual_rate']} req/sec in {data['elapsed_seconds']}s")

asyncio.run(demo())

Expected Token Savings: Prevents 429 retry cascades that can multiply token usage by 3-5× on busy APIs.

Environment: Python 3.9+, asyncio, anthropic>=0.40.0. No external dependencies for the rate limiter.


Option 4 — Connection Pool with Explicit Sizing

Explicitly size connection pools to match the target API’s limits, preventing pool exhaustion.

import anthropic
import asyncio
import httpx
import json
from contextlib import asynccontextmanager

client = anthropic.AsyncAnthropic()

# Connection pool configurations per API target
API_POOL_CONFIGS = {
    "stripe": {
        "max_connections": 5,       # Stripe allows 100/s but we're conservative
        "max_keepalive": 5,
        "timeout": httpx.Timeout(10.0, connect=5.0),
        "base_url": "https://api.stripe.com",
    },
    "github": {
        "max_connections": 3,       # GitHub: 60 unauth / 5000 auth per hour
        "max_keepalive": 3,
        "timeout": httpx.Timeout(15.0, connect=5.0),
        "base_url": "https://api.github.com",
    },
    "internal": {
        "max_connections": 20,      # Internal services can handle more
        "max_keepalive": 10,
        "timeout": httpx.Timeout(5.0, connect=2.0),
        "base_url": "https://internal.example.com",
    },
}

class ManagedConnectionPool:
    """Manages multiple named HTTP clients with explicit pool sizing."""

    def __init__(self):
        self._clients: dict[str, httpx.AsyncClient] = {}

    async def get(self, api_name: str) -> httpx.AsyncClient:
        if api_name not in self._clients:
            config = API_POOL_CONFIGS.get(api_name, API_POOL_CONFIGS["internal"])
            self._clients[api_name] = httpx.AsyncClient(
                base_url=config["base_url"],
                limits=httpx.Limits(
                    max_connections=config["max_connections"],
                    max_keepalive_connections=config["max_keepalive"],
                ),
                timeout=config["timeout"],
            )
        return self._clients[api_name]

    async def close_all(self):
        for client in self._clients.values():
            await client.aclose()
        self._clients.clear()

# Global pool — created once, reused across all tool calls
CONNECTION_POOL = ManagedConnectionPool()

async def safe_api_get(api_name: str, path: str, params: dict = None) -> dict:
    """
    Make an API call using the managed connection pool.
    Pool limits ensure we never exceed per-API connection budgets.
    """
    client_http = await CONNECTION_POOL.get(api_name)
    try:
        resp = await client_http.get(path, params=params)
        if resp.status_code == 429:
            retry_after = float(resp.headers.get("Retry-After", "5"))
            await asyncio.sleep(retry_after)
            resp = await client_http.get(path, params=params)  # one retry

        resp.raise_for_status()
        return {"ok": True, "data": resp.json()}
    except httpx.HTTPStatusError as e:
        return {"ok": False, "status": e.response.status_code, "error": str(e)[:200]}
    except httpx.TimeoutException:
        return {"ok": False, "status": 408, "error": "Request timed out"}

async def tool_multi_api_batch(operations_json: str) -> str:
    """
    Execute multiple API operations across different services simultaneously,
    respecting each service's individual connection pool limits.
    """
    operations = json.loads(operations_json)

    async def execute_op(op: dict) -> dict:
        return await safe_api_get(op["api"], op["path"], op.get("params"))

    results = await asyncio.gather(
        *[execute_op(op) for op in operations],
        return_exceptions=True
    )

    return json.dumps({
        "total": len(operations),
        "results": [
            r if not isinstance(r, Exception) else {"ok": False, "error": str(r)}
            for r in results
        ]
    })

# Example: agent calls 3 GitHub + 2 Stripe operations simultaneously
# Each pool limits its own concurrency independently
async def demo():
    operations = [
        {"api": "github", "path": "/repos/anthropics/anthropic-sdk-python"},
        {"api": "github", "path": "/repos/anthropics/anthropic-sdk-python/releases"},
        {"api": "github", "path": "/repos/anthropics/anthropic-sdk-python/issues"},
    ]
    result = await tool_multi_api_batch(json.dumps(operations))
    data = json.loads(result)
    print(f"Completed {data['total']} multi-API operations")
    await CONNECTION_POOL.close_all()

asyncio.run(demo())

Expected Token Savings: Prevents connection-related failures that would require the agent to restart entire batch operations.

Environment: Python 3.9+, httpx>=0.24.0, anthropic>=0.40.0.


Option 5 — Async Worker Queue with Backpressure

Use an asyncio queue as a work distribution mechanism with backpressure to prevent producer/consumer mismatch.

import anthropic
import asyncio
import json
import time
from dataclasses import dataclass
from typing import Any, Optional

client = anthropic.AsyncAnthropic()

@dataclass
class WorkItem:
    item_id: str
    payload: dict
    result: Optional[Any] = None
    error: Optional[str] = None
    completed: bool = False

async def worker(
    worker_id: int,
    queue: asyncio.Queue,
    results: dict,
    process_fn,
):
    """A single worker that pulls items from the queue and processes them."""
    while True:
        try:
            item: WorkItem = await asyncio.wait_for(queue.get(), timeout=5.0)
        except asyncio.TimeoutError:
            break  # No more work within timeout — shut down

        try:
            item.result = await process_fn(item.payload)
            item.completed = True
        except Exception as e:
            item.error = str(e)
            item.completed = True
        finally:
            results[item.item_id] = item
            queue.task_done()

async def process_with_worker_pool(
    items: list[dict],
    process_fn,
    num_workers: int = 10,
    queue_maxsize: int = 50,  # backpressure: producer blocks if queue is full
) -> dict[str, WorkItem]:
    """
    Distribute work across a fixed worker pool with backpressure.
    queue_maxsize prevents the producer from far outpacing workers.
    """
    queue: asyncio.Queue = asyncio.Queue(maxsize=queue_maxsize)
    results: dict[str, WorkItem] = {}

    # Start workers
    workers = [
        asyncio.create_task(worker(i, queue, results, process_fn))
        for i in range(num_workers)
    ]

    # Produce work — blocks when queue is full (backpressure)
    for item_data in items:
        work_item = WorkItem(item_id=item_data["id"], payload=item_data)
        await queue.put(work_item)  # blocks here if queue is at maxsize

    # Wait for all queued work to complete
    await queue.join()

    # Cancel idle workers
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

    return results

# Simulated tool: database record enrichment
async def enrich_record(record: dict) -> dict:
    await asyncio.sleep(0.02)  # simulate DB query
    return {
        "id": record["id"],
        "enriched": True,
        "score": len(record["id"]) * 7,
        "processed_at": time.time(),
    }

async def tool_enrich_records(records_json: str, num_workers: int = 10) -> str:
    records = json.loads(records_json)

    if len(records) > 2000:
        return json.dumps({"error": "Max 2000 records per call. Split into smaller batches."})

    num_workers = min(num_workers, 20, len(records))  # cap workers

    print(f"  Enriching {len(records)} records with {num_workers} workers...")
    start = time.monotonic()

    results = await process_with_worker_pool(
        items=records,
        process_fn=enrich_record,
        num_workers=num_workers,
        queue_maxsize=num_workers * 3,  # 3× worker count for smooth flow
    )

    elapsed = time.monotonic() - start
    succeeded = sum(1 for r in results.values() if r.result is not None)

    return json.dumps({
        "total": len(records),
        "succeeded": succeeded,
        "failed": len(records) - succeeded,
        "workers_used": num_workers,
        "elapsed_seconds": round(elapsed, 2),
        "throughput_per_sec": round(len(records) / elapsed, 1),
        "sample": [r.result for r in list(results.values())[:3] if r.result],
    })

# Test: 1000 records, 10 workers, queue backpressure prevents memory spike
async def demo():
    records = [{"id": f"rec-{i:04d}", "name": f"Record {i}"} for i in range(1000)]
    result = await tool_enrich_records(json.dumps(records), num_workers=10)
    data = json.loads(result)
    print(
        f"Processed {data['total']} records in {data['elapsed_seconds']}s "
        f"({data['throughput_per_sec']} rec/s) using {data['workers_used']} workers"
    )

asyncio.run(demo())

Expected Token Savings: Worker pool with backpressure prevents OOM crashes that would lose all in-flight work, saving the cost of restarting from scratch.

Environment: Python 3.9+, asyncio, anthropic>=0.40.0.


Option 6 — Adaptive Concurrency with Feedback Control

Dynamically adjust concurrency based on observed error rates and latency — starts conservative, scales up or down based on real feedback.

import anthropic
import asyncio
import httpx
import json
import time
from collections import deque
from dataclasses import dataclass, field

client = anthropic.AsyncAnthropic()

@dataclass
class AdaptiveConcurrencyController:
    """
    Adjusts concurrency level based on observed error rate and latency.
    Increases concurrency when things are going well; reduces on errors.
    """
    min_concurrency: int = 2
    max_concurrency: int = 50
    initial_concurrency: int = 5
    window_size: int = 20          # look at last N results
    target_error_rate: float = 0.05  # target <5% error rate
    scale_up_threshold: float = 0.01  # scale up if error rate < 1%
    scale_down_threshold: float = 0.10  # scale down if error rate > 10%

    _current: int = field(init=False)
    _results: deque = field(init=False)
    _semaphore: asyncio.Semaphore = field(init=False)
    _lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock)

    def __post_init__(self):
        self._current = self.initial_concurrency
        self._results = deque(maxlen=self.window_size)
        self._semaphore = asyncio.Semaphore(self.initial_concurrency)

    async def record_result(self, success: bool, latency_ms: float):
        """Record a result and adjust concurrency if needed."""
        self._results.append({"success": success, "latency_ms": latency_ms})

        if len(self._results) < self.window_size // 2:
            return  # not enough data yet

        async with self._lock:
            error_rate = sum(1 for r in self._results if not r["success"]) / len(self._results)
            avg_latency = sum(r["latency_ms"] for r in self._results) / len(self._results)

            old = self._current

            if error_rate < self.scale_up_threshold and avg_latency < 500:
                # Going well — try increasing concurrency
                new = min(self._current + 2, self.max_concurrency)
            elif error_rate > self.scale_down_threshold or avg_latency > 2000:
                # Too many errors or slow — reduce concurrency
                new = max(self._current - 3, self.min_concurrency)
            else:
                return  # in the sweet spot

            if new != old:
                self._current = new
                # Replace semaphore with new limit
                self._semaphore = asyncio.Semaphore(new)
                direction = "↑" if new > old else "↓"
                print(f"  Concurrency {direction}: {old}{new} (error_rate={error_rate:.1%}, latency={avg_latency:.0f}ms)")

    @property
    def semaphore(self) -> asyncio.Semaphore:
        return self._semaphore

    @property
    def current_concurrency(self) -> int:
        return self._current

controller = AdaptiveConcurrencyController(
    min_concurrency=2,
    max_concurrency=30,
    initial_concurrency=5,
)

async def adaptive_fetch(http_client: httpx.AsyncClient, url: str) -> dict:
    """Fetch with adaptive concurrency control."""
    async with controller.semaphore:
        start = time.monotonic()
        try:
            resp = await http_client.get(url, timeout=8.0)
            latency_ms = (time.monotonic() - start) * 1000
            success = resp.status_code < 400
            await controller.record_result(success, latency_ms)
            return {"url": url, "status": resp.status_code, "latency_ms": round(latency_ms)}
        except Exception as e:
            latency_ms = (time.monotonic() - start) * 1000
            await controller.record_result(False, latency_ms)
            return {"url": url, "error": str(e)[:100], "latency_ms": round(latency_ms)}

async def tool_adaptive_batch_fetch(urls_json: str) -> str:
    """
    Fetch URLs with adaptive concurrency.
    Starts at 5 concurrent, adjusts up or down based on observed error rates.
    """
    urls = json.loads(urls_json)

    async with httpx.AsyncClient(
        limits=httpx.Limits(max_connections=controller.max_concurrency)
    ) as http_client:
        tasks = [adaptive_fetch(http_client, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    succeeded = sum(1 for r in results if isinstance(r, dict) and r.get("status", 0) < 400)

    return json.dumps({
        "total": len(urls),
        "succeeded": succeeded,
        "failed": len(urls) - succeeded,
        "final_concurrency": controller.current_concurrency,
        "results": [r for r in results if not isinstance(r, Exception)][:10],
    })

# Demo: 100 URLs, concurrency adapts based on success/failure patterns
async def demo():
    # Mix of fast and slow URLs to test adaptation
    urls = (
        [f"https://httpbin.org/get?n={i}" for i in range(70)] +
        [f"https://httpbin.org/status/429" for _ in range(15)] +
        [f"https://httpbin.org/delay/3" for _ in range(15)]
    )
    import random
    random.shuffle(urls)

    print(f"Starting adaptive fetch of {len(urls)} URLs (initial concurrency: {controller.initial_concurrency})")
    result = await tool_adaptive_batch_fetch(json.dumps(urls))
    data = json.loads(result)
    print(f"Done: {data['succeeded']}/{data['total']} succeeded, final concurrency: {data['final_concurrency']}")

asyncio.run(demo())

Expected Token Savings: Adaptive concurrency prevents both under-utilization (saving wall-clock time) and over-utilization (preventing 429 cascades that triple token costs on retries).

Environment: Python 3.9+, httpx>=0.24.0, asyncio, anthropic>=0.40.0.


Comparison

Option Approach Burst Protection Rate Limiting Memory Safety Complexity
1 — Semaphore Hard limit on simultaneous ops Excellent No Good Low
2 — Chunked Processing Sequential chunks Excellent Partial Excellent Low
3 — Token Bucket Rate limit per second Good Excellent Good Medium
4 — Connection Pool Per-API pool sizing Excellent No Good Medium
5 — Worker Queue Fixed workers + backpressure Excellent No Excellent Medium
6 — Adaptive Concurrency Self-tuning based on errors Excellent Partial Good High

Start with Option 1 (semaphore) — it’s 3 lines of code and fixes the core problem. Add Option 3 (token bucket) when you need per-second rate compliance with external APIs. Use Option 6 (adaptive) only when load is highly variable and manual tuning is impractical.

Wasting tokens on this error?

Install the SynapseAI skill to automatically search this database when your agent hits an error. Average savings: $2–5 per error incident.

clawhub install synapse-ai

Solved an error that's not here?

Share it and earn MoltCoin rewards.

Contribute a solution →