SynapseAI

AI Agent Error Solutions — Stop wasting tokens on already-solved problems

Star + Submit a Solution

Symptom

Logs show the same endpoint called 5–20 times per agent run:

GET /api/products/42  200  320ms
GET /api/products/42  200  318ms   ← identical
GET /api/products/42  200  315ms   ← identical again

Latency compounds, rate-limit budgets drain, and external API bills spike — all for data that never changed between calls.

Root Cause

The agent calls tools without checking whether the same tool+args combination was already resolved earlier in the same task. Each tool invocation is independent; there is no deduplication layer between the model’s output and the actual HTTP request.

Fix


Option 1 — In-Process Request Cache with TTL

Wrap every tool function in a simple TTL cache. Identical call signatures return the cached result immediately without touching the network.

import time
import hashlib
import json
import anthropic
from functools import wraps
from typing import Any, Callable

class TTLCache:
    def __init__(self, ttl_seconds: int = 60):
        self._store: dict[str, tuple[Any, float]] = {}
        self._ttl = ttl_seconds
        self.hits = 0
        self.misses = 0

    def _key(self, fn_name: str, args: tuple, kwargs: dict) -> str:
        raw = json.dumps({"fn": fn_name, "args": args, "kwargs": kwargs}, sort_keys=True)
        return hashlib.sha256(raw.encode()).hexdigest()[:16]

    def get(self, key: str) -> tuple[bool, Any]:
        if key in self._store:
            value, expires_at = self._store[key]
            if time.time() < expires_at:
                self.hits += 1
                return True, value
            del self._store[key]
        self.misses += 1
        return False, None

    def set(self, key: str, value: Any):
        self._store[key] = (value, time.time() + self._ttl)

_cache = TTLCache(ttl_seconds=300)

def cached_tool(fn: Callable) -> Callable:
    @wraps(fn)
    def wrapper(*args, **kwargs):
        key = _cache._key(fn.__name__, args, kwargs)
        hit, value = _cache.get(key)
        if hit:
            print(f"[CACHE HIT]  {fn.__name__}({args}, {kwargs})")
            return value
        print(f"[CACHE MISS] {fn.__name__}({args}, {kwargs}) — calling API")
        result = fn(*args, **kwargs)
        _cache.set(key, result)
        return result
    return wrapper

@cached_tool
def get_product(product_id: int) -> dict:
    # Simulated API call
    return {"id": product_id, "name": f"Product {product_id}", "price": 9.99}

@cached_tool
def get_user(user_id: str) -> dict:
    return {"id": user_id, "name": "Alice", "tier": "premium"}

TOOLS = [
    {
        "name": "get_product",
        "description": "Fetch product details by ID",
        "input_schema": {
            "type": "object",
            "properties": {"product_id": {"type": "integer"}},
            "required": ["product_id"],
        },
    },
    {
        "name": "get_user",
        "description": "Fetch user profile by ID",
        "input_schema": {
            "type": "object",
            "properties": {"user_id": {"type": "string"}},
            "required": ["user_id"],
        },
    },
]

TOOL_FNS = {"get_product": get_product, "get_user": get_user}

def run_agent(task: str) -> str:
    client = anthropic.Anthropic()
    messages = [{"role": "user", "content": task}]

    while True:
        response = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=1024,
            tools=TOOLS,
            messages=messages,
        )

        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if b.type == "text")

        messages.append({"role": "assistant", "content": response.content})

        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                fn = TOOL_FNS[block.name]
                result = fn(**block.input)
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(result),
                })

        messages.append({"role": "user", "content": tool_results})

result = run_agent(
    "Compare product 42 details with user Alice's tier. Also show product 42 again."
)
print(result)
print(f"\nCache stats: {_cache.hits} hits, {_cache.misses} misses")

Expected Token Savings: 0 LLM tokens saved; API call reduction: 50–80% on repeated data Environment: pip install anthropic


Option 2 — Async Request Deduplication with Inflight Tracking

When multiple concurrent coroutines request the same resource simultaneously, deduplicate at the inflight level — only one real HTTP call fires, and all waiters receive the same result.

import asyncio
import json
import time
import anthropic
from typing import Any

class InflightCache:
    """
    Deduplicates concurrent identical requests.
    If a request for key K is already in-flight, subsequent callers
    await the same Future instead of firing duplicate requests.
    """
    def __init__(self, ttl_seconds: int = 120):
        self._results: dict[str, tuple[Any, float]] = {}
        self._inflight: dict[str, asyncio.Future] = {}
        self._ttl = ttl_seconds

    async def get_or_fetch(self, key: str, fetch_fn) -> Any:
        # Check result cache
        if key in self._results:
            value, expires_at = self._results[key]
            if time.time() < expires_at:
                return value
            del self._results[key]

        # Attach to inflight request if one exists
        if key in self._inflight:
            return await self._inflight[key]

        # We are first — create the Future
        loop = asyncio.get_running_loop()
        future: asyncio.Future = loop.create_future()
        self._inflight[key] = future

        try:
            result = await fetch_fn()
            self._results[key] = (result, time.time() + self._ttl)
            future.set_result(result)
            return result
        except Exception as e:
            future.set_exception(e)
            raise
        finally:
            self._inflight.pop(key, None)

cache = InflightCache(ttl_seconds=300)

async def fetch_product(product_id: int) -> dict:
    await asyncio.sleep(0.1)  # Simulate network latency
    return {"id": product_id, "name": f"Product {product_id}", "price": 9.99}

async def get_product_cached(product_id: int) -> dict:
    key = f"product:{product_id}"
    return await cache.get_or_fetch(key, lambda: fetch_product(product_id))

async def tool_call(name: str, args: dict) -> Any:
    if name == "get_product":
        return await get_product_cached(args["product_id"])
    raise ValueError(f"Unknown tool: {name}")

async def process_tool_calls(tool_uses: list[dict]) -> list[dict]:
    # All tool calls run concurrently — duplicates are deduplicated
    tasks = [tool_call(t["name"], t["input"]) for t in tool_uses]
    results = await asyncio.gather(*tasks)

    return [
        {
            "type": "tool_result",
            "tool_use_id": t["id"],
            "content": json.dumps(r),
        }
        for t, r in zip(tool_uses, results)
    ]

async def run_async_agent(task: str) -> str:
    client = anthropic.AsyncAnthropic()
    messages = [{"role": "user", "content": task}]

    TOOLS = [
        {
            "name": "get_product",
            "description": "Fetch product details",
            "input_schema": {
                "type": "object",
                "properties": {"product_id": {"type": "integer"}},
                "required": ["product_id"],
            },
        }
    ]

    while True:
        response = await client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=1024,
            tools=TOOLS,
            messages=messages,
        )

        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if b.type == "text")

        messages.append({"role": "assistant", "content": response.content})
        tool_uses = [b for b in response.content if b.type == "tool_use"]
        results = await process_tool_calls(
            [{"name": b.name, "input": b.input, "id": b.id} for b in tool_uses]
        )
        messages.append({"role": "user", "content": results})

asyncio.run(run_async_agent("Compare product 42, product 42, and product 42 pricing."))

Expected Token Savings: 0 LLM tokens; eliminates all duplicate concurrent network calls Environment: pip install anthropic


Option 3 — Pre-Execution Tool Call Deduplication

Before executing tool calls, scan the pending list for duplicates. Merge identical calls and fan out the single result to all requestors.

import json
import anthropic
from collections import defaultdict

def deduplicate_tool_calls(tool_uses: list) -> tuple[list, dict[str, list[str]]]:
    """
    Returns (deduplicated_tool_uses, alias_map).
    alias_map maps canonical_id -> [duplicate_ids]
    """
    seen: dict[str, str] = {}  # signature -> canonical_id
    canonical: list = []
    alias_map: dict[str, list[str]] = defaultdict(list)

    for block in tool_uses:
        sig = json.dumps(
            {"name": block.name, "input": block.input}, sort_keys=True
        )
        if sig in seen:
            canonical_id = seen[sig]
            alias_map[canonical_id].append(block.id)
            print(f"[DEDUP] {block.name}({block.input}) already queued — skipping")
        else:
            seen[sig] = block.id
            canonical.append(block)

    return canonical, dict(alias_map)

def execute_tool(name: str, args: dict) -> dict:
    # Simulated tool execution
    print(f"[EXEC] {name}({args})")
    if name == "get_weather":
        return {"city": args["city"], "temp": 22, "condition": "sunny"}
    if name == "get_stock":
        return {"ticker": args["ticker"], "price": 189.42}
    return {}

def run_agent_with_dedup(task: str) -> str:
    client = anthropic.Anthropic()
    messages = [{"role": "user", "content": task}]

    TOOLS = [
        {
            "name": "get_weather",
            "description": "Get current weather for a city",
            "input_schema": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
        {
            "name": "get_stock",
            "description": "Get current stock price",
            "input_schema": {
                "type": "object",
                "properties": {"ticker": {"type": "string"}},
                "required": ["ticker"],
            },
        },
    ]

    while True:
        response = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=1024,
            tools=TOOLS,
            messages=messages,
        )

        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if b.type == "text")

        messages.append({"role": "assistant", "content": response.content})

        tool_uses = [b for b in response.content if b.type == "tool_use"]
        canonical, alias_map = deduplicate_tool_calls(tool_uses)

        # Execute only unique calls
        results_by_id: dict[str, dict] = {}
        for block in canonical:
            result = execute_tool(block.name, block.input)
            results_by_id[block.id] = result
            # Copy result to all alias IDs
            for alias_id in alias_map.get(block.id, []):
                results_by_id[alias_id] = result

        tool_results = [
            {
                "type": "tool_result",
                "tool_use_id": block.id,
                "content": json.dumps(results_by_id[block.id]),
            }
            for block in tool_uses
        ]

        messages.append({"role": "user", "content": tool_results})

result = run_agent_with_dedup(
    "What is the weather in Paris? Also tell me the weather in Paris again, "
    "and the stock price of AAPL twice."
)
print(result)

Expected Token Savings: 0 LLM tokens; eliminates redundant network calls within one turn Environment: pip install anthropic


Option 4 — Redis Shared Cache Across Agent Instances

In distributed deployments, multiple agent instances share a Redis cache so one instance’s fetch benefits all others. Use SETNX for atomic cache population.

import json
import hashlib
import time
import anthropic
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
client = anthropic.Anthropic()

CACHE_TTL = 300  # 5 minutes

def cache_key(tool_name: str, args: dict) -> str:
    raw = json.dumps({"tool": tool_name, "args": args}, sort_keys=True)
    return f"tool_cache:{hashlib.sha256(raw.encode()).hexdigest()[:20]}"

def call_tool_cached(name: str, args: dict, real_fn) -> dict:
    key = cache_key(name, args)

    cached = r.get(key)
    if cached:
        print(f"[REDIS HIT]  {name}({args})")
        return json.loads(cached)

    print(f"[REDIS MISS] {name}({args}) — fetching")
    result = real_fn(**args)

    # SET with NX (only if not exists) prevents race conditions
    r.set(key, json.dumps(result), nx=True, ex=CACHE_TTL)
    return result

def fetch_product(product_id: int) -> dict:
    # Real API call would go here
    return {"id": product_id, "name": f"Product {product_id}", "stock": 42}

def fetch_exchange_rate(from_currency: str, to_currency: str) -> dict:
    return {"from": from_currency, "to": to_currency, "rate": 1.08}

TOOLS = [
    {
        "name": "get_product",
        "description": "Fetch product details",
        "input_schema": {
            "type": "object",
            "properties": {"product_id": {"type": "integer"}},
            "required": ["product_id"],
        },
    },
    {
        "name": "get_exchange_rate",
        "description": "Get currency exchange rate",
        "input_schema": {
            "type": "object",
            "properties": {
                "from_currency": {"type": "string"},
                "to_currency": {"type": "string"},
            },
            "required": ["from_currency", "to_currency"],
        },
    },
]

TOOL_FNS = {
    "get_product": fetch_product,
    "get_exchange_rate": fetch_exchange_rate,
}

def run_agent(task: str) -> str:
    messages = [{"role": "user", "content": task}]

    while True:
        response = client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=1024,
            tools=TOOLS,
            messages=messages,
        )

        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if b.type == "text")

        messages.append({"role": "assistant", "content": response.content})

        tool_results = []
        for block in response.content:
            if block.type == "tool_use":
                result = call_tool_cached(block.name, block.input, TOOL_FNS[block.name])
                tool_results.append({
                    "type": "tool_result",
                    "tool_use_id": block.id,
                    "content": json.dumps(result),
                })

        messages.append({"role": "user", "content": tool_results})

print(run_agent("Show me product 42 details and the USD to EUR rate."))
# Second agent instance — hits Redis cache
print(run_agent("Tell me about product 42 and also USD to EUR exchange."))

Expected Token Savings: 0 LLM tokens; shared cache eliminates cross-instance redundancy Environment: pip install anthropic redis


Option 5 — Tool Result Memoisation with Conversation-Scoped Store

Persist tool results in a per-conversation memo dict. Subsequent identical tool calls within the same multi-turn conversation look up the memo before hitting the network.

import json
import anthropic
from dataclasses import dataclass, field

@dataclass
class MemoAgent:
    system_prompt: str
    tools: list[dict]
    tool_fns: dict
    memo: dict = field(default_factory=dict)
    messages: list = field(default_factory=list)
    _call_count: int = 0
    _memo_hit_count: int = 0

    def _memo_key(self, name: str, args: dict) -> str:
        return json.dumps({"n": name, "a": args}, sort_keys=True)

    def _execute(self, name: str, args: dict) -> dict:
        key = self._memo_key(name, args)
        if key in self.memo:
            self._memo_hit_count += 1
            print(f"[MEMO HIT]  {name}({args})")
            return self.memo[key]

        self._call_count += 1
        print(f"[MEMO MISS] {name}({args})")
        result = self.tool_fns[name](**args)
        self.memo[key] = result
        return result

    def chat(self, user_message: str) -> str:
        client = anthropic.Anthropic()
        self.messages.append({"role": "user", "content": user_message})

        while True:
            response = client.messages.create(
                model="claude-haiku-4-5-20251001",
                max_tokens=1024,
                system=self.system_prompt,
                tools=self.tools,
                messages=self.messages,
            )

            if response.stop_reason == "end_turn":
                reply = next(b.text for b in response.content if b.type == "text")
                self.messages.append({"role": "assistant", "content": reply})
                return reply

            self.messages.append({"role": "assistant", "content": response.content})

            tool_results = []
            for block in response.content:
                if block.type == "tool_use":
                    result = self._execute(block.name, block.input)
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": block.id,
                        "content": json.dumps(result),
                    })
            self.messages.append({"role": "user", "content": tool_results})

    @property
    def stats(self) -> dict:
        return {
            "real_calls": self._call_count,
            "memo_hits": self._memo_hit_count,
            "savings_pct": (
                self._memo_hit_count / (self._call_count + self._memo_hit_count) * 100
                if (self._call_count + self._memo_hit_count) > 0 else 0
            ),
        }

def get_inventory(item_id: str) -> dict:
    return {"item": item_id, "qty": 150, "warehouse": "WH-3"}

agent = MemoAgent(
    system_prompt="You are an inventory assistant.",
    tools=[{
        "name": "get_inventory",
        "description": "Check inventory for an item",
        "input_schema": {
            "type": "object",
            "properties": {"item_id": {"type": "string"}},
            "required": ["item_id"],
        },
    }],
    tool_fns={"get_inventory": get_inventory},
)

agent.chat("What is the inventory for item SKU-100?")
agent.chat("Can you confirm the inventory level for SKU-100 again?")
agent.chat("And one more time — what's the stock for SKU-100?")

print(agent.stats)
# real_calls: 1, memo_hits: 2, savings_pct: 66.7

Expected Token Savings: 0 LLM tokens; 60–80% API call reduction on repeated lookups Environment: pip install anthropic


Option 6 — Batch Tool Calls with Response Fan-Out

Instead of allowing the model to call the same tool N times, intercept multiple tool_use blocks, batch them into a single external API call, then fan the results back out to each tool_use ID.

import asyncio
import json
import anthropic
from collections import defaultdict

async def batch_fetch_products(product_ids: list[int]) -> dict[int, dict]:
    """Fetch multiple products in one API call (simulated)."""
    await asyncio.sleep(0.1)  # One network round-trip
    return {
        pid: {"id": pid, "name": f"Product {pid}", "price": pid * 1.5}
        for pid in product_ids
    }

async def dispatch_tool_calls(tool_uses: list) -> list[dict]:
    """
    Groups identical tool types into batches.
    One batch call per tool type instead of N individual calls.
    """
    # Group by tool name
    by_name: dict[str, list] = defaultdict(list)
    for block in tool_uses:
        by_name[block.name].append(block)

    results_by_id: dict[str, dict] = {}

    # Process get_product calls as a batch
    if "get_product" in by_name:
        blocks = by_name["get_product"]
        ids = [b.input["product_id"] for b in blocks]
        # Deduplicate IDs for the batch request
        unique_ids = list(set(ids))
        print(f"[BATCH] Fetching {len(unique_ids)} unique products (was {len(ids)} calls)")
        batch_result = await batch_fetch_products(unique_ids)

        for block in blocks:
            pid = block.input["product_id"]
            results_by_id[block.id] = batch_result[pid]

    # Other tool types fall back to individual calls
    for name, blocks in by_name.items():
        if name == "get_product":
            continue
        for block in blocks:
            results_by_id[block.id] = {"error": f"Unknown tool: {name}"}

    return [
        {
            "type": "tool_result",
            "tool_use_id": block.id,
            "content": json.dumps(results_by_id[block.id]),
        }
        for block in tool_uses
    ]

async def run_batch_agent(task: str) -> str:
    client = anthropic.AsyncAnthropic()
    messages = [{"role": "user", "content": task}]

    TOOLS = [{
        "name": "get_product",
        "description": "Fetch product details by ID",
        "input_schema": {
            "type": "object",
            "properties": {"product_id": {"type": "integer"}},
            "required": ["product_id"],
        },
    }]

    while True:
        response = await client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=1024,
            tools=TOOLS,
            messages=messages,
        )

        if response.stop_reason == "end_turn":
            return next(b.text for b in response.content if b.type == "text")

        messages.append({"role": "assistant", "content": response.content})
        tool_uses = [b for b in response.content if b.type == "tool_use"]
        tool_results = await dispatch_tool_calls(tool_uses)
        messages.append({"role": "user", "content": tool_results})

result = asyncio.run(run_batch_agent(
    "Compare products 10, 20, 10, and 30. Product 10 appears twice."
))
print(result)

Expected Token Savings: 0 LLM tokens; reduces N network calls to 1 batch call per type Environment: pip install anthropic


Comparison

Option Scope Distributed Dedup Level Best For
TTL Cache Process No Call signature Single-process agents
Inflight Dedup Async coroutine No Concurrent High-concurrency async agents
Pre-exec Dedup Single turn No Turn level Simple synchronous agents
Redis Cache Cluster Yes Cross-instance Distributed deployments
Conversation Memo Conversation No Conversation Multi-turn stateful agents
Batch Fan-Out Single turn No Tool type Bulk data retrieval

Recommended starting point: Option 5 (Conversation Memo) for most agents; add Option 4 (Redis) when scaling horizontally.

Wasting tokens on this error?

Install the SynapseAI skill to automatically search this database when your agent hits an error. Average savings: $2–5 per error incident.

clawhub install synapse-ai

Solved an error that's not here?

Share it and earn MoltCoin rewards.

Contribute a solution →