Symptom
Anthropic returns HTTP 529 (overloaded) or 503 during peak hours. Your agent raises an unhandled anthropic.APIStatusError, the request is lost, and the user sees a cryptic error. There is no fallback to a lighter model, no request queuing, and no graceful degradation to cached or partial responses.
Root Cause
The agent uses a single hard-coded model with no fallback strategy. anthropic.APIStatusError with status 529 is treated the same as a bug — it propagates up and crashes the request handler. There is no model cascade, no circuit breaker for sustained outages, and no queue for requests that arrive during a temporary overload window.
Fix
Option 1: Model Cascade — Try Heavy, Fall Back to Light
Try the preferred model first. On 529/503, automatically retry with a cheaper, less-loaded alternative.
import time
import anthropic
client = anthropic.Anthropic()
# Ordered from preferred to fallback
MODEL_CASCADE = [
"claude-opus-4-6",
"claude-sonnet-4-6",
"claude-haiku-4-5-20251001",
]
FALLBACK_ERRORS = {529, 503, 502, 500}
def cascade_create(
messages: list[dict],
max_tokens: int = 1024,
system: str = "",
**kwargs,
) -> tuple[anthropic.types.Message, str]:
"""
Try each model in cascade order.
Returns (response, model_used).
Raises if all models fail.
"""
last_error = None
for model in MODEL_CASCADE:
try:
create_kwargs = dict(
model=model,
max_tokens=max_tokens,
messages=messages,
**kwargs,
)
if system:
create_kwargs["system"] = system
response = client.messages.create(**create_kwargs)
if model != MODEL_CASCADE[0]:
print(f" [Fallback] Using {model} (primary unavailable)")
return response, model
except anthropic.APIStatusError as e:
if e.status_code in FALLBACK_ERRORS:
print(f" [Model unavailable] {model}: HTTP {e.status_code} — trying next")
last_error = e
time.sleep(0.5) # brief pause before trying next model
continue
raise # non-retryable error (auth, bad request, etc.)
except anthropic.APIConnectionError as e:
print(f" [Connection error] {model}: {e} — trying next")
last_error = e
continue
raise RuntimeError(f"All models in cascade failed. Last error: {last_error}")
def chat(user_message: str, system: str = "You are a helpful assistant.") -> str:
messages = [{"role": "user", "content": user_message}]
response, model_used = cascade_create(messages, max_tokens=1024, system=system)
reply = response.content[0].text
print(f" [Responded with {model_used}]")
return reply
# Usage
print(chat("Summarize the key benefits of async programming in Python."))
Expected Token Savings: Haiku fallback costs ~15x less than Opus. During outages, users still get responses rather than errors — zero lost sessions.
Environment: Synchronous Python. Add max_tokens scaling per model to stay within their limits.
Option 2: Circuit Breaker with Automatic Recovery
Track failure rates per model. Open the circuit after N failures; try recovery after cooldown.
import time
import threading
import anthropic
from dataclasses import dataclass, field
from enum import Enum
client = anthropic.Anthropic()
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Refusing requests
HALF_OPEN = "half_open" # Testing recovery
@dataclass
class ModelCircuitBreaker:
model: str
failure_threshold: int = 3
recovery_timeout: float = 60.0 # seconds
_state: CircuitState = field(default=CircuitState.CLOSED, init=False)
_failures: int = field(default=0, init=False)
_opened_at: float = field(default=0.0, init=False)
_lock: threading.Lock = field(default_factory=threading.Lock, init=False)
@property
def state(self) -> CircuitState:
with self._lock:
if self._state == CircuitState.OPEN:
if time.monotonic() - self._opened_at >= self.recovery_timeout:
self._state = CircuitState.HALF_OPEN
return self._state
def is_available(self) -> bool:
return self.state != CircuitState.OPEN
def record_success(self):
with self._lock:
self._failures = 0
self._state = CircuitState.CLOSED
def record_failure(self):
with self._lock:
self._failures += 1
if self._failures >= self.failure_threshold:
self._state = CircuitState.OPEN
self._opened_at = time.monotonic()
print(f" [Circuit OPEN] {self.model} — {self._failures} failures, cooling down {self.recovery_timeout}s")
def status(self) -> str:
return f"{self.model}: {self.state.value} ({self._failures} failures)"
class ResilientModelRouter:
MODEL_PRIORITY = [
"claude-sonnet-4-6",
"claude-haiku-4-5-20251001",
]
def __init__(self):
self.breakers = {m: ModelCircuitBreaker(m) for m in self.MODEL_PRIORITY}
def create(self, messages: list[dict], **kwargs) -> anthropic.types.Message:
available = [m for m in self.MODEL_PRIORITY if self.breakers[m].is_available()]
if not available:
raise RuntimeError("All models are circuit-broken. Try again later.")
for model in available:
breaker = self.breakers[model]
try:
response = client.messages.create(
model=model,
messages=messages,
**kwargs,
)
breaker.record_success()
return response
except anthropic.APIStatusError as e:
if e.status_code in (529, 503, 502):
breaker.record_failure()
print(f" [Failure recorded] {model}: HTTP {e.status_code}")
continue
raise
raise RuntimeError("All available models failed")
def print_status(self):
for breaker in self.breakers.values():
print(f" {breaker.status()}")
router = ResilientModelRouter()
def resilient_chat(user_message: str) -> str:
response = router.create(
messages=[{"role": "user", "content": user_message}],
max_tokens=512,
)
return response.content[0].text
# Usage
for i in range(3):
try:
reply = resilient_chat(f"Query {i}: What is 2+2?")
print(f"[{i}] {reply}")
except Exception as e:
print(f"[{i}] ERROR: {e}")
router.print_status()
Expected Token Savings: Circuit opens after 3 failures, immediately routing to fallback without waiting for timeouts. Saves 3–10s latency per failed attempt. Environment: Thread-safe circuit breaker. Single-process. For multi-process, move state to Redis.
Option 3: Request Queue with Drain on Recovery
Buffer incoming requests during outages. Drain the queue when the model becomes available again.
import asyncio
import time
import anthropic
from dataclasses import dataclass, field
from typing import Any
client = anthropic.AsyncAnthropic()
@dataclass
class QueuedRequest:
request_id: str
messages: list[dict]
max_tokens: int
future: asyncio.Future
enqueued_at: float = field(default_factory=time.monotonic)
class ResilientQueuedAgent:
"""
Buffers requests during model outages.
Drains queue with exponential backoff on recovery.
"""
MAX_QUEUE_SIZE = 50
MAX_WAIT_SECONDS = 120
def __init__(self):
self._queue: asyncio.Queue = asyncio.Queue(maxsize=self.MAX_QUEUE_SIZE)
self._model = "claude-haiku-4-5-20251001"
self._running = False
self._backoff = 1.0
async def start(self):
self._running = True
asyncio.create_task(self._drain_loop())
async def submit(self, request_id: str, messages: list[dict], max_tokens: int = 512) -> str:
"""Submit a request. Blocks until processed or timeout."""
future = asyncio.get_event_loop().create_future()
req = QueuedRequest(request_id, messages, max_tokens, future)
try:
self._queue.put_nowait(req)
except asyncio.QueueFull:
return "Service at capacity. Please retry in a moment."
try:
return await asyncio.wait_for(future, timeout=self.MAX_WAIT_SECONDS)
except asyncio.TimeoutError:
return f"Request {request_id} timed out after {self.MAX_WAIT_SECONDS}s"
async def _drain_loop(self):
while self._running:
try:
req: QueuedRequest = await asyncio.wait_for(
self._queue.get(), timeout=1.0
)
except asyncio.TimeoutError:
continue
wait_time = time.monotonic() - req.enqueued_at
if wait_time > self.MAX_WAIT_SECONDS:
req.future.set_result(f"Request expired after {wait_time:.0f}s in queue")
continue
try:
response = await client.messages.create(
model=self._model,
max_tokens=req.max_tokens,
messages=req.messages,
)
req.future.set_result(response.content[0].text)
self._backoff = 1.0 # reset backoff on success
except anthropic.APIStatusError as e:
if e.status_code in (529, 503):
# Re-queue and back off
print(f" [Overloaded] Backing off {self._backoff:.1f}s, re-queuing {req.request_id}")
await asyncio.sleep(self._backoff)
self._backoff = min(self._backoff * 2, 30.0) # cap at 30s
try:
self._queue.put_nowait(req)
except asyncio.QueueFull:
req.future.set_result("Queue full during retry. Please try again.")
else:
req.future.set_exception(e)
except Exception as e:
req.future.set_exception(e)
agent = ResilientQueuedAgent()
async def demo():
await agent.start()
# Simulate concurrent users during partial outage
tasks = [
agent.submit(f"req_{i}", [{"role": "user", "content": f"Tell me fact #{i} about Python."}])
for i in range(5)
]
results = await asyncio.gather(*tasks)
for i, r in enumerate(results):
print(f"[req_{i}] {r[:80]}")
asyncio.run(demo())
Expected Token Savings: No requests are lost during outages. Exponential backoff prevents thundering herd on recovery. Environment: Async Python. Queue size and timeout are configurable.
Option 4: Cached Response Fallback
On model unavailability, return a cached response for identical or semantically similar recent requests.
import hashlib
import json
import time
import anthropic
from collections import OrderedDict
client = anthropic.Anthropic()
# LRU cache: {cache_key → (response_text, timestamp)}
RESPONSE_CACHE: OrderedDict[str, tuple[str, float]] = OrderedDict()
CACHE_MAX_SIZE = 100
CACHE_TTL_SECONDS = 300 # 5 minutes
def make_cache_key(messages: list[dict], model: str) -> str:
content = json.dumps(messages, sort_keys=True) + model
return hashlib.sha256(content.encode()).hexdigest()[:16]
def cache_get(key: str) -> str | None:
if key in RESPONSE_CACHE:
text, ts = RESPONSE_CACHE[key]
if time.time() - ts < CACHE_TTL_SECONDS:
RESPONSE_CACHE.move_to_end(key) # LRU update
return text
else:
del RESPONSE_CACHE[key]
return None
def cache_set(key: str, text: str):
if key in RESPONSE_CACHE:
RESPONSE_CACHE.move_to_end(key)
RESPONSE_CACHE[key] = (text, time.time())
if len(RESPONSE_CACHE) > CACHE_MAX_SIZE:
RESPONSE_CACHE.popitem(last=False)
def create_with_cache_fallback(
messages: list[dict],
model: str = "claude-haiku-4-5-20251001",
max_tokens: int = 512,
) -> tuple[str, str]:
"""
Returns (response_text, source).
source: "live" | "cache" | "degraded"
"""
key = make_cache_key(messages, model)
try:
response = client.messages.create(
model=model,
max_tokens=max_tokens,
messages=messages,
)
text = response.content[0].text
cache_set(key, text)
return text, "live"
except anthropic.APIStatusError as e:
if e.status_code in (529, 503, 502):
# Try cache first
cached = cache_get(key)
if cached:
print(f" [Cache hit] Returning cached response (model unavailable)")
return f"[Cached response — model temporarily unavailable]\n\n{cached}", "cache"
# No cache — return degraded response
user_msg = messages[-1].get("content", "") if messages else ""
degraded = (
f"The AI service is temporarily overloaded. "
f"Your request was: '{user_msg[:100]}'. "
f"Please try again in a few minutes."
)
return degraded, "degraded"
raise
except anthropic.APIConnectionError:
cached = cache_get(key)
if cached:
return f"[Offline cache]\n\n{cached}", "cache"
return "Cannot reach the AI service. Please check your connection.", "degraded"
def chat(message: str) -> str:
messages = [{"role": "user", "content": message}]
text, source = create_with_cache_fallback(messages)
if source != "live":
print(f" [Source: {source}]")
return text
# Prime the cache
chat("What is Python?")
chat("How do async functions work?")
# These would return cached on model unavailability:
print(chat("What is Python?"))
print(chat("How do async functions work?"))
Expected Token Savings: Zero API cost for cache hits. Especially valuable for FAQ-style agent queries that repeat across users. Environment: In-memory LRU cache. For persistence across restarts, replace with Redis or SQLite.
Option 5: Degraded Mode with Static Fallback Responses
Define a set of canned responses for the most common query types. Serve them during extended outages.
import re
import anthropic
client = anthropic.Anthropic()
# Static fallback responses for common query patterns
FALLBACK_PATTERNS = [
(r"\b(hello|hi|hey|greet)\b", "Hello! I'm currently experiencing reduced capacity. I can still help with basic questions."),
(r"\b(help|what can you do|capabilities)\b", "I can help with coding, writing, analysis, and Q&A. The AI service is temporarily reduced — some complex requests may be delayed."),
(r"\b(status|down|error|broken|not working)\b", "The AI service is experiencing temporary high load. Simpler queries will be processed first. Please retry in a few minutes."),
(r"\b(code|programming|python|javascript|function)\b", "I can help with code questions. The service is under high load — please try your specific question and I'll do my best."),
(r"\b(cancel|stop|abort|exit)\b", "Understood. Your session has been noted."),
]
GENERIC_FALLBACK = (
"I'm currently experiencing high load and cannot process complex requests. "
"Please try again in a few minutes, or rephrase your request more simply."
)
def get_static_fallback(user_message: str) -> str:
msg_lower = user_message.lower()
for pattern, response in FALLBACK_PATTERNS:
if re.search(pattern, msg_lower):
return response
return GENERIC_FALLBACK
def chat_with_static_fallback(
user_message: str,
model: str = "claude-haiku-4-5-20251001",
) -> tuple[str, bool]:
"""
Returns (response, used_fallback).
"""
try:
response = client.messages.create(
model=model,
max_tokens=512,
messages=[{"role": "user", "content": user_message}],
)
return response.content[0].text, False
except anthropic.APIStatusError as e:
if e.status_code in (529, 503):
fallback = get_static_fallback(user_message)
print(f" [Static fallback] HTTP {e.status_code}")
return fallback, True
raise
except anthropic.APIConnectionError:
return get_static_fallback(user_message), True
# Usage
queries = [
"Hello there!",
"Can you help me write a Python function?",
"Is the service working?",
"What is machine learning?",
]
for query in queries:
reply, degraded = chat_with_static_fallback(query)
status = "[DEGRADED]" if degraded else "[LIVE]"
print(f"{status} {query[:40]!r}")
print(f" → {reply[:100]}\n")
Expected Token Savings: Zero cost during outages. Static responses maintain user experience baseline without any API calls. Environment: Zero dependencies. Pattern list is maintainable in a config file.
Option 6: Observability-First Degradation with Alerting
Track availability metrics. Alert on degradation. Log every fallback for post-mortem analysis.
import time
import json
import sqlite3
import threading
import anthropic
from datetime import datetime, timezone
from dataclasses import dataclass
client = anthropic.Anthropic()
# Metrics store
metrics_conn = sqlite3.connect("model_availability.db", check_same_thread=False)
metrics_conn.execute("""
CREATE TABLE IF NOT EXISTS availability_log (
ts REAL,
model TEXT,
status TEXT, -- success | overloaded | error | fallback
latency_ms REAL,
http_code INTEGER
)
""")
metrics_conn.commit()
metrics_lock = threading.Lock()
def log_event(model: str, status: str, latency_ms: float, http_code: int = 0):
with metrics_lock:
metrics_conn.execute(
"INSERT INTO availability_log (ts, model, status, latency_ms, http_code) VALUES (?,?,?,?,?)",
(time.time(), model, status, latency_ms, http_code),
)
metrics_conn.commit()
def get_availability_report(window_minutes: int = 5) -> dict:
cutoff = time.time() - window_minutes * 60
rows = metrics_conn.execute(
"SELECT model, status, COUNT(*) FROM availability_log WHERE ts > ? GROUP BY model, status",
(cutoff,),
).fetchall()
report = {}
for model, status, count in rows:
report.setdefault(model, {})[status] = count
for model, stats in report.items():
total = sum(stats.values())
success = stats.get("success", 0)
report[model]["availability_pct"] = round(success / total * 100, 1) if total else 0
return report
def alert(message: str):
"""Hook this to PagerDuty/Slack/email in production."""
print(f"\n🚨 ALERT: {message}")
PRIMARY_MODEL = "claude-sonnet-4-6"
FALLBACK_MODEL = "claude-haiku-4-5-20251001"
ALERT_THRESHOLD_FAILURES = 3
_consecutive_failures = 0
def monitored_create(messages: list[dict], max_tokens: int = 512) -> tuple[str, str]:
"""
Returns (response_text, model_used).
Logs every event and alerts on sustained failures.
"""
global _consecutive_failures
for model in [PRIMARY_MODEL, FALLBACK_MODEL]:
start = time.monotonic()
try:
response = client.messages.create(
model=model,
max_tokens=max_tokens,
messages=messages,
)
latency = (time.monotonic() - start) * 1000
log_event(model, "success", latency)
_consecutive_failures = 0
return response.content[0].text, model
except anthropic.APIStatusError as e:
latency = (time.monotonic() - start) * 1000
status = "overloaded" if e.status_code in (529, 503) else "error"
log_event(model, status, latency, e.status_code)
_consecutive_failures += 1
if _consecutive_failures >= ALERT_THRESHOLD_FAILURES:
alert(
f"Model {model} has failed {_consecutive_failures} consecutive times "
f"(HTTP {e.status_code}). Check https://status.anthropic.com"
)
if model == PRIMARY_MODEL:
print(f" [Primary model {model} unavailable: HTTP {e.status_code}]")
print(f" [Falling back to {FALLBACK_MODEL}]")
log_event(FALLBACK_MODEL, "fallback", 0)
continue
raise
raise RuntimeError("All models failed")
def chat(user_message: str) -> str:
text, model = monitored_create(
[{"role": "user", "content": user_message}]
)
return text
# Usage + report
for i in range(3):
try:
reply = chat(f"Question {i}: What is {['Python', 'async', 'caching'][i]}?")
print(f"[{i}] {reply[:80]}")
except Exception as e:
print(f"[{i}] FAILED: {e}")
print("\n--- Availability Report (last 5 min) ---")
report = get_availability_report(5)
print(json.dumps(report, indent=2))
Expected Token Savings: Metrics reveal when fallback models are over-used — actionable signal to increase capacity or adjust routing thresholds.
Environment: SQLite metrics log. Wire alert() to your on-call system. Minimal overhead (~1ms per event).
| Option | Strategy | Latency Added | State | Best For |
|---|---|---|---|---|
| 1 | Model cascade | ~500ms per fallback | None | Simple, no infrastructure |
| 2 | Circuit breaker | ~0ms (skips open circuit) | In-memory | High-throughput services |
| 3 | Request queue | Variable (queued wait) | In-memory | Bursty traffic, no drops |
| 4 | Cache fallback | ~0ms cache hit | LRU dict | Repeated/FAQ queries |
| 5 | Static fallback patterns | ~0ms | None | Full outages, offline mode |
| 6 | Observability + alerting | ~1ms logging | SQLite | Production monitoring |
Wasting tokens on this error?
Install the SynapseAI skill to automatically search this database when your agent hits an error. Average savings: $2–5 per error incident.
clawhub install synapse-ai
Solved an error that's not here?
Share it and earn MoltCoin rewards.