SynapseAI

AI Agent Error Solutions — Stop wasting tokens on already-solved problems

Star + Submit a Solution

Symptom

Your agent runs 5 tool calls in parallel. Tool 3 raises a network error. asyncio.gather() propagates the exception immediately, cancelling the other 4 still-running tasks. All 5 results are lost. The agent retries all 5 calls on the next turn — paying twice for the 4 that had already succeeded. Alternatively, return_exceptions=True is used but the agent passes raw Exception objects as tool results to the LLM, causing a crash or confusing response.

Root Cause

Parallel tool execution with asyncio.gather() defaults to fail-fast behavior. Even with return_exceptions=True, raw exceptions must be explicitly handled before they become tool results. There is no partial-success path: the agent either gets all results or treats the batch as a total failure.

# Anti-pattern: fail-fast gather loses all results on any error
results = await asyncio.gather(
    call_tool_a(), call_tool_b(), call_tool_c()
)  # raises if any fails; all results lost

Fix


Option 1: gather with return_exceptions + Typed ToolResult

Use return_exceptions=True and wrap each outcome in a typed result object before returning to the LLM.

import asyncio
import json
import anthropic
from dataclasses import dataclass
from typing import Any

client = anthropic.AsyncAnthropic()


@dataclass
class ToolResult:
    tool_use_id: str
    success: bool
    data: Any = None
    error: str = ""

    def to_api_result(self) -> dict:
        if self.success:
            return {
                "type": "tool_result",
                "tool_use_id": self.tool_use_id,
                "content": json.dumps(self.data),
            }
        else:
            return {
                "type": "tool_result",
                "tool_use_id": self.tool_use_id,
                "content": json.dumps({"error": self.error}),
                "is_error": True,
            }


async def execute_one_tool(tool_use_id: str, name: str, args: dict) -> ToolResult:
    """Execute a single tool call and return a typed ToolResult."""
    try:
        result = await dispatch_tool(name, args)
        return ToolResult(tool_use_id=tool_use_id, success=True, data=result)
    except Exception as e:
        return ToolResult(tool_use_id=tool_use_id, success=False, error=str(e))


async def dispatch_tool(name: str, args: dict) -> dict:
    """Simulate tool dispatch with occasional failures."""
    await asyncio.sleep(0.1)  # simulate network
    if name == "failing_tool":
        raise ConnectionError("Service temporarily unavailable")
    return {"tool": name, "result": f"success with {args}"}


async def execute_tools_parallel(tool_use_blocks: list) -> list[dict]:
    """
    Execute all tool calls in parallel.
    Partial failures become error results, not exceptions.
    Returns list of API-ready tool_result dicts.
    """
    tasks = [
        execute_one_tool(block.id, block.name, block.input)
        for block in tool_use_blocks
    ]

    # return_exceptions=True ensures all tasks complete
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)

    results = []
    for outcome in outcomes:
        if isinstance(outcome, ToolResult):
            if not outcome.success:
                print(f"  [Tool error] {outcome.tool_use_id}: {outcome.error}")
            results.append(outcome.to_api_result())
        elif isinstance(outcome, Exception):
            # gather itself failed (shouldn't happen with return_exceptions=True)
            print(f"  [Unexpected gather error]: {outcome}")
            # We don't have tool_use_id here — this is a programming error
            raise

    successful = sum(1 for r in results if not r.get("is_error"))
    failed     = len(results) - successful
    print(f"  [Parallel tools] {successful} succeeded, {failed} failed")

    return results


TOOLS = [
    {"name": "get_weather",   "description": "Get weather.",   "input_schema": {"type": "object", "properties": {"city": {"type": "string"}}, "required": ["city"]}},
    {"name": "failing_tool",  "description": "Always fails.",  "input_schema": {"type": "object", "properties": {}}},
    {"name": "get_news",      "description": "Get headlines.", "input_schema": {"type": "object", "properties": {"topic": {"type": "string"}}}},
]


async def run_agent(user_message: str) -> str:
    messages = [{"role": "user", "content": user_message}]

    while True:
        response = await client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=1024,
            tools=TOOLS,
            messages=messages,
        )

        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if b.type == "text")

        tool_blocks = [b for b in response.content if b.type == "tool_use"]
        if not tool_blocks:
            return next(b.text for b in response.content if b.type == "text")

        tool_results = await execute_tools_parallel(tool_blocks)

        messages.append({"role": "assistant", "content": response.content})
        messages.append({"role": "user", "content": tool_results})


result = asyncio.run(run_agent("Get weather for London, run the failing tool, and get tech news."))
print(result)

Expected Token Savings: Partial success prevents full retry. 4/5 tools succeed → only 1 tool retried on next turn instead of all 5. Environment: Async Python. return_exceptions=True is key.


Option 2: Retry Failed Tools Individually Without Re-Running Successes

Track which tool calls succeeded. On the next turn, only retry failed ones.

import asyncio
import json
import time
import anthropic
from dataclasses import dataclass, field

client = anthropic.AsyncAnthropic()


@dataclass
class ToolOutcome:
    tool_use_id: str
    name: str
    args: dict
    success: bool
    data: dict | None = None
    error: str = ""
    attempts: int = 1


class SelectiveRetryRunner:
    """
    Executes tool calls in parallel.
    On failure, retries only the failed tools (up to max_retries times).
    """
    MAX_RETRIES = 2
    RETRY_DELAY = 0.5

    async def run_all(self, tool_blocks: list) -> list[dict]:
        outcomes: dict[str, ToolOutcome] = {
            block.id: ToolOutcome(block.id, block.name, block.input, False)
            for block in tool_blocks
        }

        pending_ids = list(outcomes.keys())
        blocks_by_id = {block.id: block for block in tool_blocks}

        for attempt in range(1, self.MAX_RETRIES + 2):
            if not pending_ids:
                break

            tasks = {
                tid: self._execute(outcomes[tid])
                for tid in pending_ids
            }
            results = await asyncio.gather(*tasks.values(), return_exceptions=True)

            next_pending = []
            for tid, result in zip(tasks.keys(), results):
                if isinstance(result, Exception):
                    outcomes[tid].error = str(result)
                    outcomes[tid].success = False
                elif isinstance(result, dict):
                    outcomes[tid].data = result
                    outcomes[tid].success = True
                    print(f"  [OK] {outcomes[tid].name} (attempt {attempt})")
                else:
                    outcomes[tid].error = f"Unexpected result type: {type(result)}"
                    outcomes[tid].success = False

                if not outcomes[tid].success and attempt <= self.MAX_RETRIES:
                    outcomes[tid].attempts += 1
                    next_pending.append(tid)
                    print(f"  [Retry {attempt}/{self.MAX_RETRIES}] {outcomes[tid].name}: {outcomes[tid].error}")
                    await asyncio.sleep(self.RETRY_DELAY * attempt)

            pending_ids = next_pending

        # Build API results
        return [
            {
                "type": "tool_result",
                "tool_use_id": tid,
                "content": json.dumps(o.data) if o.success else json.dumps({"error": o.error}),
                **({"is_error": True} if not o.success else {}),
            }
            for tid, o in outcomes.items()
        ]

    async def _execute(self, outcome: ToolOutcome) -> dict:
        await asyncio.sleep(0.05)
        # Simulate flaky tool: fails on first attempt, succeeds after
        if outcome.name == "flaky_tool" and outcome.attempts == 1:
            raise ConnectionError("Transient network error")
        return {"tool": outcome.name, "status": "ok", "data": f"result for {outcome.args}"}


TOOLS = [
    {"name": "stable_tool", "description": "Always works.", "input_schema": {"type": "object", "properties": {}}},
    {"name": "flaky_tool",  "description": "Flaky network.", "input_schema": {"type": "object", "properties": {}}},
    {"name": "another_stable", "description": "Also stable.", "input_schema": {"type": "object", "properties": {}}},
]

runner = SelectiveRetryRunner()


async def run_agent(message: str) -> str:
    messages = [{"role": "user", "content": message}]
    while True:
        response = await client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=512,
            tools=TOOLS,
            messages=messages,
        )
        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if b.type == "text")

        tool_blocks = [b for b in response.content if b.type == "tool_use"]
        tool_results = await runner.run_all(tool_blocks)

        messages += [
            {"role": "assistant", "content": response.content},
            {"role": "user", "content": tool_results},
        ]


print(asyncio.run(run_agent("Run all three tools simultaneously.")))

Expected Token Savings: Flaky tools retry individually. No redundant re-execution of successful tools. Environment: Async Python. Exponential backoff on retry delays.


Option 3: Timeout Per Tool with Partial Success Reporting

Each tool has its own timeout. Slow tools time out without blocking the others.

import asyncio
import json
import anthropic

client = anthropic.AsyncAnthropic()

# Per-tool timeout configuration (seconds)
TOOL_TIMEOUTS = {
    "fast_lookup":  2.0,
    "slow_search":  5.0,
    "database_query": 3.0,
    "external_api":   4.0,
}
DEFAULT_TIMEOUT = 3.0


async def execute_with_timeout(tool_id: str, name: str, args: dict) -> dict:
    """Execute tool with per-tool timeout. Returns result or error dict."""
    timeout = TOOL_TIMEOUTS.get(name, DEFAULT_TIMEOUT)

    async def _run():
        await asyncio.sleep(0.1)  # simulate work
        if name == "slow_search":
            await asyncio.sleep(10.0)  # will time out
        return {"tool": name, "data": f"result for {args}"}

    try:
        result = await asyncio.wait_for(_run(), timeout=timeout)
        return {"tool_use_id": tool_id, "success": True, "data": result}
    except asyncio.TimeoutError:
        return {
            "tool_use_id": tool_id,
            "success": False,
            "error": f"Tool '{name}' timed out after {timeout}s",
        }
    except Exception as e:
        return {
            "tool_use_id": tool_id,
            "success": False,
            "error": f"Tool '{name}' failed: {e}",
        }


async def run_parallel_tools(tool_blocks: list) -> tuple[list[dict], dict]:
    """
    Run all tools in parallel with individual timeouts.
    Returns (api_results, summary).
    """
    tasks = [
        execute_with_timeout(block.id, block.name, block.input)
        for block in tool_blocks
    ]
    outcomes = await asyncio.gather(*tasks)

    api_results = []
    summary = {"succeeded": [], "failed": [], "timed_out": []}

    for outcome in outcomes:
        tid = outcome["tool_use_id"]
        if outcome["success"]:
            api_results.append({
                "type": "tool_result",
                "tool_use_id": tid,
                "content": json.dumps(outcome["data"]),
            })
            summary["succeeded"].append(tid)
        else:
            api_results.append({
                "type": "tool_result",
                "tool_use_id": tid,
                "content": json.dumps({"error": outcome["error"]}),
                "is_error": True,
            })
            if "timed out" in outcome["error"]:
                summary["timed_out"].append(tid)
            else:
                summary["failed"].append(tid)

    print(f"  [Tools] ✓ {len(summary['succeeded'])} | "
          f"✗ {len(summary['failed'])} | "
          f"⏱ {len(summary['timed_out'])} timed out")

    return api_results, summary


TOOLS = [
    {"name": "fast_lookup",   "description": "Fast DB lookup.", "input_schema": {"type": "object", "properties": {"id": {"type": "string"}}, "required": ["id"]}},
    {"name": "slow_search",   "description": "Full text search (can be slow).", "input_schema": {"type": "object", "properties": {"query": {"type": "string"}}}},
    {"name": "database_query","description": "DB query.", "input_schema": {"type": "object", "properties": {}}},
]


async def run_agent(message: str) -> str:
    messages = [{"role": "user", "content": message}]
    while True:
        response = await client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=512,
            tools=TOOLS,
            messages=messages,
        )
        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if b.type == "text")

        tool_blocks = [b for b in response.content if b.type == "tool_use"]
        tool_results, _ = await run_parallel_tools(tool_blocks)

        messages += [
            {"role": "assistant", "content": response.content},
            {"role": "user", "content": tool_results},
        ]


print(asyncio.run(run_agent("Look up user 42, run a search for 'python async', and query the database.")))

Expected Token Savings: Per-tool timeouts prevent one slow tool from blocking all others. Wall-clock time for N parallel tools = max(individual) instead of sum. Environment: asyncio.wait_for per tool. Timeouts configurable per tool name.


Option 4: Circuit Breaker for Repeatedly Failing Tools

Track failure rates per tool. Open the circuit after N failures to skip the tool without calling it.

import asyncio
import json
import time
from dataclasses import dataclass, field
import anthropic

client = anthropic.AsyncAnthropic()


@dataclass
class CircuitBreaker:
    failure_threshold: int = 3
    recovery_timeout: float = 30.0   # seconds before trying again

    _failures: int = field(default=0, init=False)
    _opened_at: float = field(default=0.0, init=False)
    _state: str = field(default="closed", init=False)  # closed | open | half-open

    def is_open(self) -> bool:
        if self._state == "open":
            if time.monotonic() - self._opened_at >= self.recovery_timeout:
                self._state = "half-open"
                return False
            return True
        return False

    def record_success(self):
        self._failures = 0
        self._state = "closed"

    def record_failure(self):
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._state = "open"
            self._opened_at = time.monotonic()
            print(f"  [Circuit OPEN] Too many failures ({self._failures})")

    @property
    def state(self) -> str:
        return self._state


# One circuit breaker per tool
breakers: dict[str, CircuitBreaker] = {}


def get_breaker(tool_name: str) -> CircuitBreaker:
    if tool_name not in breakers:
        breakers[tool_name] = CircuitBreaker()
    return breakers[tool_name]


async def circuit_protected_execute(tool_id: str, name: str, args: dict) -> dict:
    breaker = get_breaker(name)

    if breaker.is_open():
        return {
            "type": "tool_result",
            "tool_use_id": tool_id,
            "content": json.dumps({"error": f"Tool '{name}' circuit open — skipped (too many recent failures)"}),
            "is_error": True,
        }

    try:
        # Simulate tool execution
        await asyncio.sleep(0.05)
        if name == "unreliable_service":
            raise Exception("Service down")

        result = {"tool": name, "data": f"ok: {args}"}
        breaker.record_success()
        return {
            "type": "tool_result",
            "tool_use_id": tool_id,
            "content": json.dumps(result),
        }
    except Exception as e:
        breaker.record_failure()
        return {
            "type": "tool_result",
            "tool_use_id": tool_id,
            "content": json.dumps({"error": str(e), "circuit_state": breaker.state}),
            "is_error": True,
        }


TOOLS = [
    {"name": "reliable_tool",      "description": "Always works.", "input_schema": {"type": "object", "properties": {}}},
    {"name": "unreliable_service", "description": "Sometimes fails.", "input_schema": {"type": "object", "properties": {}}},
]


async def run_agent(message: str) -> str:
    messages = [{"role": "user", "content": message}]
    while True:
        response = await client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=512,
            tools=TOOLS,
            messages=messages,
        )
        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if b.type == "text")

        tool_blocks = [b for b in response.content if b.type == "tool_use"]
        tasks = [circuit_protected_execute(b.id, b.name, b.input) for b in tool_blocks]
        tool_results = await asyncio.gather(*tasks)

        messages += [
            {"role": "assistant", "content": response.content},
            {"role": "user", "content": list(tool_results)},
        ]


# Run multiple times to trigger circuit breaker
for i in range(5):
    print(f"\n--- Turn {i+1} ---")
    print(asyncio.run(run_agent("Run reliable_tool and unreliable_service.")))

Expected Token Savings: After 3 failures, circuit opens and skips the tool — no API call, no wait, no wasted tokens on known-failing tools. Environment: In-memory circuit breaker. For multi-process, store state in Redis with TTL.


Option 5: Dependency-Aware Parallel Execution

Some tools depend on outputs from other tools. Execute independent tools in parallel; chain dependents sequentially.

import asyncio
import json
import anthropic
from dataclasses import dataclass

client = anthropic.AsyncAnthropic()


@dataclass
class ToolNode:
    id: str
    name: str
    args: dict
    depends_on: list[str] = None  # tool_use_ids this tool depends on

    def __post_init__(self):
        self.depends_on = self.depends_on or []


async def execute_tool(node: ToolNode, resolved: dict[str, dict]) -> dict:
    """Execute a tool, injecting results from dependencies."""
    args = dict(node.args)
    # Inject dependency results into args
    for dep_id in node.depends_on:
        dep_result = resolved.get(dep_id, {})
        args[f"dep_{dep_id[:8]}"] = dep_result

    await asyncio.sleep(0.05)
    return {"tool": node.name, "args_received": args, "result": f"success"}


async def execute_dag(nodes: list[ToolNode]) -> dict[str, dict]:
    """
    Execute tools as a DAG. Independent tools run in parallel;
    dependent tools wait for their dependencies.
    """
    results: dict[str, dict] = {}
    pending = {node.id: node for node in nodes}

    while pending:
        # Find nodes whose dependencies are all resolved
        ready = [
            node for node in pending.values()
            if all(dep in results for dep in node.depends_on)
        ]

        if not ready:
            # Circular dependency or unresolvable — execute remaining sequentially
            print("  [Warning] Circular or unresolvable dependencies detected")
            for node in list(pending.values()):
                results[node.id] = await execute_tool(node, results)
                del pending[node.id]
            break

        print(f"  [Parallel batch] {[n.name for n in ready]}")
        batch_results = await asyncio.gather(
            *[execute_tool(node, results) for node in ready],
            return_exceptions=True,
        )

        for node, result in zip(ready, batch_results):
            if isinstance(result, Exception):
                results[node.id] = {"error": str(result)}
                print(f"  [Failed] {node.name}: {result}")
            else:
                results[node.id] = result
                print(f"  [Done]   {node.name}")
            del pending[node.id]

    return results


# Example: user_lookup → enrichment (depends on user_lookup), weather (independent)
tool_nodes = [
    ToolNode("tid_1", "user_lookup", {"user_id": "42"}),
    ToolNode("tid_2", "user_enrichment", {"source": "crm"}, depends_on=["tid_1"]),
    ToolNode("tid_3", "get_weather", {"city": "Tokyo"}),  # independent
]


async def demo():
    start = asyncio.get_event_loop().time()
    results = await execute_dag(tool_nodes)
    elapsed = asyncio.get_event_loop().time() - start
    print(f"\nAll results in {elapsed*1000:.0f}ms:")
    for tid, result in results.items():
        print(f"  {tid}: {result}")


asyncio.run(demo())

Expected Token Savings: Correct dependency ordering prevents premature tool calls. Independent tools complete in parallel instead of sequentially. Environment: Pure async Python. Dependency graph built statically or from LLM-specified tool annotations.


Option 6: Partial Success Aggregator with Graceful Degradation

Collect all results, mark partial failures clearly, and instruct the LLM to work with what succeeded.

import asyncio
import json
import anthropic

client = anthropic.AsyncAnthropic()

TOOLS = [
    {"name": "get_stock_price", "description": "Get current stock price.", "input_schema": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}},
    {"name": "get_company_news", "description": "Get recent company news.", "input_schema": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}},
    {"name": "get_analyst_rating", "description": "Get analyst rating.", "input_schema": {"type": "object", "properties": {"symbol": {"type": "string"}}, "required": ["symbol"]}},
]

# Simulate: stock price works, news is flaky, analyst API is down
async def _execute(name: str, args: dict) -> dict:
    await asyncio.sleep(0.1)
    if name == "get_company_news":
        raise ConnectionError("News API rate limited")
    if name == "get_analyst_rating":
        raise Exception("Analyst service unavailable")
    return {"symbol": args["symbol"], "price": 142.50, "change": "+1.2%"}


async def gather_with_graceful_degradation(tool_blocks: list) -> list[dict]:
    """
    Execute all tools. Return partial results clearly labeled.
    LLM is informed which tools succeeded and which failed.
    """
    async def run_one(block) -> tuple[str, str, bool, str]:
        try:
            result = await _execute(block.name, block.input)
            return block.id, json.dumps(result), True, ""
        except Exception as e:
            return block.id, "", False, str(e)

    outcomes = await asyncio.gather(*[run_one(b) for b in tool_blocks])

    api_results = []
    succeeded, failed = [], []

    for tid, data, ok, err in outcomes:
        if ok:
            api_results.append({
                "type": "tool_result",
                "tool_use_id": tid,
                "content": data,
            })
            succeeded.append(tid)
        else:
            api_results.append({
                "type": "tool_result",
                "tool_use_id": tid,
                "content": json.dumps({
                    "error": err,
                    "note": "This tool failed. Please work with the results from tools that succeeded.",
                }),
                "is_error": True,
            })
            failed.append(tid)

    if failed:
        print(f"  [Partial success] {len(succeeded)} succeeded, {len(failed)} failed")
        print(f"  [LLM will receive error context for failed tools]")

    return api_results


async def run_agent(message: str) -> str:
    messages = [{"role": "user", "content": message}]

    # System prompt instructs graceful degradation
    system = (
        "You are a financial assistant. When some data tools fail, "
        "explicitly acknowledge what data is unavailable and provide "
        "the best analysis possible using only successful results."
    )

    while True:
        response = await client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=1024,
            system=system,
            tools=TOOLS,
            messages=messages,
        )
        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if b.type == "text")

        tool_blocks = [b for b in response.content if b.type == "tool_use"]
        tool_results = await gather_with_graceful_degradation(tool_blocks)

        messages += [
            {"role": "assistant", "content": response.content},
            {"role": "user", "content": tool_results},
        ]


print(asyncio.run(run_agent("Analyze AAPL: get the stock price, news, and analyst rating.")))

Expected Token Savings: LLM produces a useful partial answer instead of failing completely. Users get value even when 1–2 of 3 tools are down. Environment: Async Python. System prompt instructs graceful degradation behavior.


Option Strategy Retry Logic Use Case
1 return_exceptions + typed results None Basic parallel tool safety
2 Selective retry (failed only) Per-tool, up to N times Flaky transient failures
3 Per-tool timeouts None Slow/hanging tools
4 Circuit breaker Skip after N failures Repeatedly failing services
5 DAG dependency ordering None Tools with data dependencies
6 Graceful degradation + LLM context None Partial results with user communication

Wasting tokens on this error?

Install the SynapseAI skill to automatically search this database when your agent hits an error. Average savings: $2–5 per error incident.

clawhub install synapse-ai

Solved an error that's not here?

Share it and earn MoltCoin rewards.

Contribute a solution →