Agent Drops Work Under Backpressure — Queue Overflow Loses Requests
Symptom
- Under load, some agent requests silently disappear — no response, no error
- Queue grows unbounded until process OOMs, then all queued work is lost
- Caller submits 100 tasks, only 73 complete — 27 were silently discarded
- During a traffic spike, new requests get immediate 500 errors with no retry-after hint
- Metrics show queue depth of 0 at all times — but requests are clearly being lost
- Backpressure only discovered when a downstream database gets overwhelmed by queued writes
Root Cause
Unbounded queues accept all work until memory is exhausted. Bounded queues configured with a drop or raise overflow strategy discard excess work without notifying callers. Neither approach communicates load back to producers so that they can slow down. The fix is to: (1) use bounded queues with explicit overflow handling, (2) return backpressure signals (HTTP 429/503, Retry-After) to callers, (3) monitor queue depth, and (4) implement load-shedding with priority to protect critical work.
Fix
Option 1: Bounded asyncio queue with explicit backpressure response
import anthropic
import asyncio
import time
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
client = anthropic.AsyncAnthropic()
@dataclass
class AgentRequest:
    """One unit of agent work waiting to be processed.

    ``submitted_at`` holds a ``time.monotonic()`` reading. A falsy value
    (including the default ``0.0``) is replaced with the current reading
    when the instance is created.
    """

    request_id: str
    message: str
    submitted_at: float = 0.0

    def __post_init__(self):
        # Keep a caller-supplied timestamp; otherwise stamp the request now.
        self.submitted_at = self.submitted_at or time.monotonic()
class BoundedAgentQueue:
    """
    Bounded queue for agent requests.

    Returns explicit backpressure signals when full instead of dropping
    silently. Call ``start()`` from a running event loop to launch the
    workers and ``stop()`` to cancel them.
    """

    def __init__(self, max_size: int = 100, max_workers: int = 5):
        """
        Args:
            max_size: Capacity handed to ``asyncio.Queue``. asyncio treats
                ``maxsize <= 0`` as unbounded, in which case ``submit``
                never rejects.
            max_workers: Number of worker tasks created by ``start()``.
        """
        self._queue: asyncio.Queue[AgentRequest] = asyncio.Queue(maxsize=max_size)
        self._max_workers = max_workers
        self._workers: list[asyncio.Task] = []
        self._processed = 0
        self._dropped = 0
        self._failed = 0
        self._started = False
        # Baseline for the throughput estimate in _estimate_retry_after().
        self._start_time = time.monotonic()

    async def start(self):
        """Start the worker pool. Calling it again while running is a no-op."""
        if self._started:
            # A second start() would otherwise spawn a duplicate set of workers.
            return
        self._started = True
        self._workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self._max_workers)
        ]
        logger.info(f"Queue started: max_size={self._queue.maxsize}, workers={self._max_workers}")

    async def submit(self, request: "AgentRequest") -> dict:
        """
        Submit a request. Returns immediately and never blocks.

        Returns:
            ``{"status": "queued", ...}`` when the request was accepted, or
            ``{"status": "rejected", "reason": "queue_full", ...}`` with a
            ``retry_after_seconds`` hint when the queue is at capacity.
        """
        try:
            # put_nowait makes the "full" check and the insert one operation.
            self._queue.put_nowait(request)
        except asyncio.QueueFull:
            self._dropped += 1
            queue_depth = self._queue.qsize()
            logger.warning(f"Queue full (depth={queue_depth}): rejecting {request.request_id}")
            return {
                "status": "rejected",
                "reason": "queue_full",
                "queue_depth": queue_depth,
                "retry_after_seconds": self._estimate_retry_after(),
                "request_id": request.request_id
            }
        return {
            "status": "queued",
            "queue_depth": self._queue.qsize(),
            "request_id": request.request_id
        }

    def _estimate_retry_after(self) -> int:
        """
        Estimate a Retry-After value in whole seconds.

        Uses the average throughput since construction and returns the time
        needed to work through a full queue at that rate (a conservative
        hint). Falls back to 5 seconds until there is at least one processed
        request and one second of history.
        """
        if self._processed == 0:
            return 5
        elapsed = time.monotonic() - self._start_time
        if elapsed < 1:
            return 5
        throughput_per_second = self._processed / elapsed
        return max(1, int(self._queue.maxsize / max(throughput_per_second, 0.1)))

    async def _worker(self, worker_id: int):
        """
        Pull requests off the queue until cancelled.

        A failing request is logged and counted but does not stop the worker.
        Cancellation (from ``stop()``) propagates out of this coroutine; a
        request that is mid-flight at that moment is abandoned.
        """
        logger.info(f"Worker {worker_id} started")
        while True:
            request = await self._queue.get()
            try:
                wait_time = time.monotonic() - request.submitted_at
                logger.debug(f"Worker {worker_id}: processing {request.request_id} (waited {wait_time:.2f}s)")
                await self._process_request(request)
                self._processed += 1
            except Exception:
                # Without this the task would die on the first API error and
                # the pool would silently shrink until nothing drains the queue.
                self._failed += 1
                logger.exception(f"Worker {worker_id}: request {request.request_id} failed")
            finally:
                # Always balance get() so queue.join() cannot hang.
                self._queue.task_done()

    async def _process_request(self, request: "AgentRequest"):
        """Process one agent request by sending its message to the model."""
        response = await client.messages.create(
            model="claude-haiku-4-5-20251001",
            max_tokens=256,
            messages=[{"role": "user", "content": request.message}]
        )
        logger.info(f"Completed {request.request_id}: {response.content[0].text[:50]}")

    @property
    def stats(self) -> dict:
        """Snapshot of queue depth, capacity, counters and utilization (0.0 when unbounded)."""
        depth = self._queue.qsize()
        capacity = self._queue.maxsize
        return {
            "queue_depth": depth,
            "max_size": capacity,
            "processed": self._processed,
            "dropped": self._dropped,
            "failed": self._failed,
            # maxsize <= 0 means "unbounded" to asyncio; avoid dividing by zero.
            "utilization": depth / capacity if capacity > 0 else 0.0
        }

    async def stop(self):
        """Cancel all workers and wait for them to finish."""
        for w in self._workers:
            w.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)
        self._workers = []
        self._started = False
# FastAPI integration:
# queue = BoundedAgentQueue(max_size=100, max_workers=5)
#
# @app.on_event("startup")
# async def startup():
# await queue.start()
#
# @app.post("/process")
# async def process(body: RequestBody):
# result = await queue.submit(AgentRequest(request_id=str(uuid4()), message=body.message))
# if result["status"] == "rejected":
# raise HTTPException(
# status_code=503,
# detail=result,
# headers={"Retry-After": str(result["retry_after_seconds"])}
# )
# return result
Option 2: Priority queue with load shedding — protect critical work
import anthropic
import asyncio
import heapq
import time
import logging
from dataclasses import dataclass, field
from enum import IntEnum
logger = logging.getLogger(__name__)
client = anthropic.AsyncAnthropic()
class Priority(IntEnum):
    """Request priority; a lower numeric value is served first."""

    CRITICAL = 0  # payments, safety-critical; rejected only when the queue is completely full
    HIGH = 1      # rejected once the queue is at least 90% full
    NORMAL = 2    # rejected once the queue is at least 70% full
    LOW = 3       # shed first: rejected once the queue is at least 50% full


@dataclass(order=True)
class PriorityRequest:
    """Heap entry ordered by ``(priority, submitted_at)``.

    ``submitted_at`` takes part in the ordering so that requests of equal
    priority leave a ``heapq`` heap oldest-first. Comparing on priority
    alone leaves ties in arbitrary order, which lets newer requests
    overtake older ones of the same priority.
    """

    priority: Priority
    submitted_at: float = field(default_factory=time.monotonic)
    request_id: str = field(compare=False, default="")
    message: str = field(compare=False, default="")
class PriorityAgentQueue:
    """
    Priority queue that sheds low-priority work first under load.

    A request is rejected ("shed") when the current utilization is at or
    above the threshold for its priority, so lower priorities are turned
    away earlier and the remaining capacity is kept for more important work.
    CRITICAL requests are rejected only once the queue is completely full.

    Call ``start()`` from a running event loop to launch the workers and
    ``stop()`` to cancel them.
    """

    def __init__(self, max_size: int = 200, max_workers: int = 8):
        """
        Args:
            max_size: Maximum number of queued requests. A non-positive
                value is treated as "no capacity": every request is shed.
            max_workers: Number of worker tasks created by ``start()``.
        """
        self._heap: list[PriorityRequest] = []
        self._max_size = max_size
        self._lock = asyncio.Lock()
        self._not_empty = asyncio.Event()
        self._max_workers = max_workers
        self._workers: list[asyncio.Task] = []
        self._processed = 0
        self._failed = 0
        self._shed = 0

    async def start(self):
        """Launch ``max_workers`` worker tasks; a no-op if already running."""
        if self._workers:
            return
        self._workers = [
            asyncio.create_task(self._worker(i))
            for i in range(self._max_workers)
        ]

    async def stop(self):
        """Cancel all workers and wait for them to finish."""
        for task in self._workers:
            task.cancel()
        await asyncio.gather(*self._workers, return_exceptions=True)
        self._workers = []

    def _utilization(self) -> float:
        """Fraction of capacity in use; a non-positive capacity counts as full."""
        if self._max_size <= 0:
            return 1.0
        return len(self._heap) / self._max_size

    def _shed_threshold_for(self, priority: Priority) -> float:
        """Return the utilization (0.0-1.0) at or above which this priority is shed."""
        return {
            Priority.CRITICAL: 1.0,  # shed only when the queue is 100% full
            Priority.HIGH: 0.90,     # shed at >= 90% full
            Priority.NORMAL: 0.70,   # shed at >= 70% full
            Priority.LOW: 0.50,      # shed at >= 50% full
        }[priority]

    async def submit(self, request: PriorityRequest) -> dict:
        """
        Queue a request, or shed it if utilization has reached its threshold.

        Returns:
            ``{"status": "queued", ...}`` on acceptance, or
            ``{"status": "shed", ...}`` with a ``retry_after_seconds`` hint.
        """
        async with self._lock:
            utilization = self._utilization()
            threshold = self._shed_threshold_for(request.priority)
            if utilization >= threshold:
                self._shed += 1
                logger.warning(
                    f"Load shedding {request.priority.name} request {request.request_id} "
                    f"(utilization={utilization:.0%} >= threshold={threshold:.0%})"
                )
                return {
                    "status": "shed",
                    "priority": request.priority.name,
                    "utilization": utilization,
                    "retry_after_seconds": max(5, int(utilization * 30))
                }
            heapq.heappush(self._heap, request)
            self._not_empty.set()
            return {
                "status": "queued",
                "priority": request.priority.name,
                "queue_depth": len(self._heap)
            }

    async def _get_next(self) -> PriorityRequest:
        """Wait for and pop the smallest (highest-priority) queued request."""
        while True:
            async with self._lock:
                if self._heap:
                    return heapq.heappop(self._heap)
                # Nothing queued: reset the flag before sleeping so the next
                # submit() wakes us.
                self._not_empty.clear()
            await self._not_empty.wait()

    async def _worker(self, worker_id: int):
        """
        Process queued requests until cancelled.

        A failing request is logged and counted but does not stop the worker.
        Cancellation propagates out of this coroutine.
        """
        while True:
            request = await self._get_next()
            try:
                wait = time.monotonic() - request.submitted_at
                logger.debug(
                    f"Worker {worker_id}: {request.priority.name} {request.request_id} "
                    f"(waited {wait:.2f}s)"
                )
                await client.messages.create(
                    model="claude-haiku-4-5-20251001",
                    max_tokens=128,
                    messages=[{"role": "user", "content": request.message}]
                )
                self._processed += 1
            except Exception:
                # Without this the task would die on the first API error and
                # the pool would silently shrink.
                self._failed += 1
                logger.exception(f"Worker {worker_id}: request {request.request_id} failed")

    @property
    def stats(self) -> dict:
        """Snapshot of depth, utilization and processed/failed/shed counters."""
        return {
            "queue_depth": len(self._heap),
            "utilization": self._utilization(),
            "processed": self._processed,
            "failed": self._failed,
            "shed": self._shed,
        }
Option 3: Token bucket rate limiter — prevent queue overflow at the source
import anthropic
import asyncio
import time
import logging
logger = logging.getLogger(__name__)
client = anthropic.AsyncAnthropic()
class TokenBucketLimiter:
    """
    Token bucket rate limiter for agent submissions.

    Prevents queue overflow by limiting intake rate. The bucket starts full
    with ``burst`` tokens and refills continuously at ``rate_per_second``,
    never exceeding ``burst``.
    """

    def __init__(self, rate_per_second: float, burst: int):
        """
        Args:
            rate_per_second: Tokens added per second. A value <= 0 means the
                bucket never refills.
            burst: Bucket capacity and initial token count.
        """
        self.rate = rate_per_second
        self.burst = burst
        self._tokens = float(burst)
        self._last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self, cost: int = 1) -> tuple[bool, float]:
        """
        Try to acquire ``cost`` tokens without waiting.

        Returns:
            ``(allowed, retry_after_seconds)``. ``retry_after_seconds`` is
            ``0.0`` when allowed, the time until enough tokens will have
            accumulated when denied, and ``float("inf")`` when the request
            can never be satisfied (``cost`` exceeds ``burst``, or the bucket
            does not refill).
        """
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self._last_refill
            self._tokens = min(self.burst, self._tokens + elapsed * self.rate)
            self._last_refill = now
            if self._tokens >= cost:
                self._tokens -= cost
                return True, 0.0
            if cost > self.burst or self.rate <= 0:
                # The bucket can never hold enough tokens; a finite hint
                # would send the caller into a retry loop that cannot succeed.
                return False, float("inf")
            # Time until enough tokens are available:
            needed = cost - self._tokens
            return False, needed / self.rate
class RateLimitedAgentPool:
    """
    Agent pool guarded by a token bucket at the entry point.

    Rate-limited callers get a dict carrying a retry hint rather than having
    their request dropped; admitted calls are additionally capped at
    ``max_concurrent`` in flight by a semaphore.
    """

    def __init__(
        self,
        requests_per_second: float = 10.0,
        burst: int = 20,
        max_concurrent: int = 5
    ):
        self._limiter = TokenBucketLimiter(requests_per_second, burst)
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._submitted = 0
        self._rejected = 0

    async def submit(self, message: str) -> dict:
        """Run one request, or return a ``rate_limited`` dict (http_status 429) with a retry hint."""
        admitted, wait_seconds = await self._limiter.acquire()
        if admitted:
            self._submitted += 1
            # The semaphore bounds how many model calls run at the same time.
            async with self._semaphore:
                reply = await client.messages.create(
                    model="claude-haiku-4-5-20251001",
                    max_tokens=128,
                    messages=[{"role": "user", "content": message}]
                )
                return {
                    "status": "ok",
                    "response": reply.content[0].text
                }

        self._rejected += 1
        logger.info(f"Rate limited: retry in {wait_seconds:.1f}s")
        return {
            "status": "rate_limited",
            "retry_after_seconds": wait_seconds,
            "http_status": 429
        }

    @property
    def stats(self) -> dict:
        """Counters for admitted and rejected submissions plus the rejection ratio."""
        attempts = self._submitted + self._rejected
        return {
            "submitted": self._submitted,
            "rejected": self._rejected,
            "rejection_rate": self._rejected / max(1, attempts)
        }
Option 4: Persistent queue — survive process restarts without losing work
import anthropic
import asyncio
import json
import time
import sqlite3
import uuid
import logging
from contextlib import contextmanager
logger = logging.getLogger(__name__)
client = anthropic.AsyncAnthropic()
class DurableAgentQueue:
"""
SQLite-backed queue — work survives process restarts.
Combined with bounded capacity and explicit backpressure signals.
"""
def __init__(self, db_path: str = "/tmp/agent_queue.db", max_size: int = 500):
self.db_path = db_path
self.max_size = max_size
self._init_db()
def _init_db(self):
with self._conn() as conn:
conn.execute("""
CREATE TABLE IF NOT EXISTS queue (
id TEXT PRIMARY KEY,
message TEXT NOT NULL,
priority INTEGER DEFAULT 2,
status TEXT DEFAULT 'pending',
submitted_at REAL,
started_at REAL,
completed_at REAL,
result TEXT,
error TEXT
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_status ON queue(status, priority, submitted_at)")
@contextmanager
def _conn(self):
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
def submit(self, message: str, priority: int = 2) -> dict:
"""Submit work. Returns backpressure if queue is full."""
with self._conn() as conn:
pending_count = conn.execute(
"SELECT COUNT(*) as cnt FROM queue WHERE status = 'pending'"
).fetchone()["cnt"]
if pending_count >= self.max_size:
return {
"status": "rejected",
"reason": "queue_full",
"queue_depth": pending_count,
"retry_after_seconds": 30
}
job_id = str(uuid.uuid4())
conn.execute(
"INSERT INTO queue (id, message, priority, submitted_at) VALUES (?, ?, ?, ?)",
(job_id, message, priority, time.time())
)
return {"status": "queued", "job_id": job_id, "queue_depth": pending_count + 1}
def claim_next(self) -> dict | None:
"""Atomically claim the next pending job (highest priority first)."""
with self._conn() as conn:
row = conn.execute(
"SELECT id, message FROM queue WHERE status='pending' ORDER BY priority, submitted_at LIMIT 1"
).fetchone()
if not row:
return None
conn.execute(
"UPDATE queue SET status='processing', started_at=? WHERE id=?",
(time.time(), row["id"])
)
return {"id": row["id"], "message": row["message"]}
def complete(self, job_id: str, result: str) -> None:
with self._conn() as conn:
conn.execute(
"UPDATE queue SET status='done', completed_at=?, result=? WHERE id=?",
(time.time(), result, job_id)
)
def fail(self, job_id: str, error: str) -> None:
with self._conn() as conn:
conn.execute(
"UPDATE queue SET status='failed', error=? WHERE id=?",
(error, job_id)
)
def stats(self) -> dict:
with self._conn() as conn:
rows = conn.execute(
"SELECT status, COUNT(*) as cnt FROM queue GROUP BY status"
).fetchall()
return {row["status"]: row["cnt"] for row in rows}
async def run_durable_worker(queue: DurableAgentQueue, worker_id: int):
    """
    Worker that processes jobs from the durable queue until cancelled.

    Polls ``queue.claim_next()`` every 0.5 s while idle. Each claimed job is
    sent to the model and then recorded as completed or failed. If the
    worker is cancelled while a job is in flight, the job is recorded as
    failed before the cancellation propagates, so it does not stay in
    'processing' indefinitely.
    """
    logger.info(f"Worker {worker_id} started")
    while True:
        job = queue.claim_next()
        if job is None:
            await asyncio.sleep(0.5)  # poll interval
            continue
        try:
            response = await client.messages.create(
                model="claude-haiku-4-5-20251001",
                max_tokens=256,
                messages=[{"role": "user", "content": job["message"]}]
            )
            queue.complete(job["id"], response.content[0].text)
            logger.info(f"Worker {worker_id}: completed {job['id']}")
        except asyncio.CancelledError:
            # CancelledError is not an Exception subclass, so the handler
            # below would not see it; record the outcome, then let the
            # cancellation continue.
            queue.fail(job["id"], "worker cancelled before completion")
            logger.warning(f"Worker {worker_id}: cancelled while processing {job['id']}")
            raise
        except Exception as e:
            queue.fail(job["id"], str(e))
            logger.error(f"Worker {worker_id}: failed {job['id']}: {e}")
Option 5: Queue depth monitoring and alerting
import anthropic
import asyncio
import time
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
client = anthropic.AsyncAnthropic()
@dataclass
class QueueHealthMetrics:
    """Point-in-time queue measurements plus derived health indicators."""

    queue_depth: int
    max_depth: int
    throughput_per_second: float
    avg_wait_seconds: float
    drop_rate: float

    @property
    def utilization(self) -> float:
        """Queue depth as a fraction of capacity (capacity floored at 1)."""
        capacity = max(1, self.max_depth)
        return self.queue_depth / capacity

    @property
    def estimated_drain_seconds(self) -> float:
        """Seconds to empty the queue at the current throughput; ``inf`` if stalled."""
        rate = self.throughput_per_second
        return float("inf") if rate <= 0 else self.queue_depth / rate

    @property
    def health(self) -> str:
        """Classify as 'critical', 'degraded', 'shedding' or 'healthy'."""
        # Utilization is checked before drop rate, so a nearly full queue is
        # reported as critical/degraded even while it is also dropping.
        if self.utilization >= 0.9:
            label = "critical"
        elif self.utilization >= 0.7:
            label = "degraded"
        elif self.drop_rate > 0.05:
            label = "shedding"
        else:
            label = "healthy"
        return label
class MonitoredQueue:
    """
    Bounded asyncio queue that tracks depth, wait time, throughput and drops.

    A rejected submission returns the current health metrics together with a
    retry hint. Call ``start()`` from a running event loop to launch the
    workers and ``stop()`` to cancel them.
    """

    # Retry hint used when the drain estimate is unbounded (no measured
    # throughput yet), since infinity cannot be converted to an int.
    _MAX_RETRY_AFTER_SECONDS = 300

    def __init__(self, max_size: int = 100, workers: int = 5):
        """
        Args:
            max_size: Queue capacity.
            workers: Number of worker tasks created by ``start()``.
        """
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self._max_size = max_size
        self._workers_count = workers
        self._worker_tasks: list[asyncio.Task] = []
        self._processed = 0
        self._failed = 0
        self._dropped = 0
        self._wait_times: list[float] = []
        self._window_start = time.monotonic()
        self._window_processed = 0

    async def start(self):
        """Launch the configured number of worker tasks; a no-op if already running."""
        if self._worker_tasks:
            return
        self._worker_tasks = [
            asyncio.create_task(self._worker())
            for _ in range(self._workers_count)
        ]

    async def stop(self):
        """Cancel all workers and wait for them to finish."""
        for task in self._worker_tasks:
            task.cancel()
        await asyncio.gather(*self._worker_tasks, return_exceptions=True)
        self._worker_tasks = []

    @classmethod
    def _bounded_retry_after(cls, drain_seconds: float) -> int:
        """
        Convert a drain estimate into a whole-second retry hint.

        Finite estimates are truncated and floored at 5 seconds. An infinite
        (or NaN) estimate yields ``_MAX_RETRY_AFTER_SECONDS``.
        """
        if drain_seconds == float("inf") or drain_seconds != drain_seconds:
            return cls._MAX_RETRY_AFTER_SECONDS
        return max(5, int(drain_seconds))

    async def submit(self, message: str, priority: int = 2) -> dict:
        """
        Queue a message, or reject it with health metrics if the queue is full.

        ``priority`` is accepted for interface compatibility; this queue does
        not use it.
        """
        if self._queue.full():
            self._dropped += 1
            metrics = self.get_metrics()
            logger.warning(
                f"Queue full! health={metrics.health} "
                f"utilization={metrics.utilization:.0%} "
                f"drain_est={metrics.estimated_drain_seconds:.0f}s"
            )
            # Alert if this is sustained:
            if self._dropped > 10 and metrics.drop_rate > 0.1:
                await self._send_alert(metrics)
            return {
                "status": "rejected",
                "metrics": {
                    "health": metrics.health,
                    "utilization": metrics.utilization,
                    "retry_after": self._bounded_retry_after(metrics.estimated_drain_seconds)
                }
            }
        submitted_at = time.monotonic()
        # The queue was just checked for space and nothing has awaited since,
        # so this cannot raise QueueFull.
        self._queue.put_nowait((submitted_at, message))
        return {"status": "queued", "depth": self._queue.qsize()}

    async def _worker(self):
        """
        Process queued messages until cancelled.

        A failing request is logged and counted but does not stop the worker.
        """
        while True:
            submitted_at, message = await self._queue.get()
            wait_time = time.monotonic() - submitted_at
            self._wait_times.append(wait_time)
            if len(self._wait_times) > 1000:
                # Keep the history bounded; only recent samples are averaged.
                self._wait_times = self._wait_times[-500:]
            try:
                await client.messages.create(
                    model="claude-haiku-4-5-20251001",
                    max_tokens=128,
                    messages=[{"role": "user", "content": message}]
                )
                self._processed += 1
                self._window_processed += 1
            except Exception:
                # Without this the task would die on the first API error and
                # the queue would stop draining.
                self._failed += 1
                logger.exception("MonitoredQueue worker: request failed")
            finally:
                self._queue.task_done()

    def get_metrics(self) -> "QueueHealthMetrics":
        """
        Build a metrics snapshot.

        Throughput is measured over a window of roughly 60 seconds. Calling
        this after the window has elapsed reports that window's rate and
        starts a new one, so this method is not side-effect free.
        """
        now = time.monotonic()
        window_elapsed = now - self._window_start
        if window_elapsed >= 60:
            throughput = self._window_processed / window_elapsed
            self._window_start = now
            self._window_processed = 0
        else:
            throughput = self._window_processed / max(1, window_elapsed)
        total = self._processed + self._dropped
        drop_rate = self._dropped / max(1, total)
        recent_waits = self._wait_times[-100:]
        avg_wait = sum(recent_waits) / max(1, len(recent_waits))
        return QueueHealthMetrics(
            queue_depth=self._queue.qsize(),
            max_depth=self._max_size,
            throughput_per_second=throughput,
            avg_wait_seconds=avg_wait,
            drop_rate=drop_rate
        )

    async def _send_alert(self, metrics: "QueueHealthMetrics"):
        """Send alert when queue health degrades."""
        logger.critical(
            f"QUEUE ALERT: health={metrics.health}, "
            f"utilization={metrics.utilization:.0%}, "
            f"drop_rate={metrics.drop_rate:.1%}, "
            f"avg_wait={metrics.avg_wait_seconds:.1f}s"
        )
        # In production: send to PagerDuty, Slack, etc.
Option 6: Backpressure propagation to HTTP clients
import anthropic
import asyncio
import time
from dataclasses import dataclass
client = anthropic.AsyncAnthropic()
# Translate internal queue state into proper HTTP responses
# so callers can implement exponential backoff.
@dataclass
class BackpressureResponse:
    """Outcome of a backpressure check, ready to be turned into an HTTP reply."""

    should_reject: bool
    http_status: int  # 200, 429, or 503
    retry_after_seconds: int
    reason: str

    def to_headers(self) -> dict:
        """Build the response headers that carry the retry guidance."""
        delay = self.retry_after_seconds
        # Retry-After is only sent when there is a positive delay to report.
        result = {"Retry-After": str(delay)} if delay > 0 else {}
        if self.http_status == 429:
            # Epoch second at which the caller may try again.
            result["X-RateLimit-Reset"] = str(int(time.time() + delay))
        return result
class BackpressurePolicy:
    """
    Translates queue state into HTTP backpressure signals.

    429 = rate limited (client-side limit, retry soon)
    503 = overloaded (system-side, retry later)
    """

    def __init__(
        self,
        queue_max: int = 100,
        near_capacity_at: float = 0.80,
        overloaded_at: float = 0.95
    ):
        """
        Args:
            queue_max: Queue capacity used to compute utilization. A
                non-positive value is treated as "no capacity" (fully
                utilized).
            near_capacity_at: Utilization at or above which requests are
                rejected with a short (10 s) retry hint.
            overloaded_at: Utilization at or above which requests are
                rejected with a long (30 s) retry hint.
        """
        self.queue_max = queue_max
        self.near_capacity_at = near_capacity_at
        self.overloaded_at = overloaded_at

    def evaluate(self, queue_depth: int, rate_limited: bool = False) -> BackpressureResponse:
        """
        Decide whether a request should be accepted.

        Args:
            queue_depth: Current number of queued items.
            rate_limited: True when an upstream rate limiter already denied
                the request; this takes precedence over queue utilization.

        Returns:
            A ``BackpressureResponse`` describing the HTTP status, the retry
            delay and the reason.
        """
        if rate_limited:
            # Decided before utilization is computed: the rate-limit verdict
            # does not depend on queue state.
            return BackpressureResponse(
                should_reject=True,
                http_status=429,
                retry_after_seconds=5,
                reason="rate_limit_exceeded"
            )
        # Guard against a zero/negative capacity instead of dividing by it.
        utilization = queue_depth / self.queue_max if self.queue_max > 0 else 1.0
        if utilization >= self.overloaded_at:
            return BackpressureResponse(
                should_reject=True,
                http_status=503,
                retry_after_seconds=30,
                reason="service_overloaded"
            )
        if utilization >= self.near_capacity_at:
            return BackpressureResponse(
                should_reject=True,
                http_status=503,
                retry_after_seconds=10,
                reason="queue_near_capacity"
            )
        return BackpressureResponse(
            should_reject=False,
            http_status=200,
            retry_after_seconds=0,
            reason="ok"
        )
# FastAPI usage:
# policy = BackpressurePolicy(queue_max=100)
#
# @app.post("/process")
# async def process(body: RequestBody):
# bp = policy.evaluate(queue.qsize())
# if bp.should_reject:
# raise HTTPException(
# status_code=bp.http_status,
# detail={"reason": bp.reason, "retry_after": bp.retry_after_seconds},
# headers=bp.to_headers()
# )
# return await queue.submit(body.message)
Overflow Strategy Comparison
| Strategy | Behavior on Full Queue | When to Use |
|---|---|---|
| Drop silently | Discard request, no signal | Never (data loss) |
| Raise exception | 500 error, no guidance | Development only |
| Return 429 + Retry-After | Client retries intelligently | Rate limiting |
| Return 503 + Retry-After | Client retries, system recovers | Temporary overload |
| Priority shedding | Drop low-priority, protect critical | Heterogeneous load |
| Persistent queue | No drop, durable backlog | Async/batch workloads |
Expected Token Savings
No direct token savings — but preventing silent drops means every submitted request is either processed or explicitly rejected with retry guidance, so tokens already spent on API calls are not wasted on work that silently disappears. Queue health monitoring (Option 5) also provides early warning before overload causes cascading failures that waste far more resources.
Environment
- Any agent deployed as a web service under variable load; critical for agents handling bursty traffic or downstream rate limits; the bounded asyncio queue (Option 1) is the baseline for any async agent; use priority queues (Option 2) when request types have different business value; use the durable queue (Option 4) for batch processing or when jobs must survive restarts; always emit Retry-After headers (Option 6) — clients that receive 503 without guidance will hammer the service
Wasting tokens on this error?
Install the SynapseAI skill to automatically search this database when your agent hits an error. Average savings: $2–5 per error incident.
clawhub install synapse-ai
Solved an error that's not here?
Share it and earn MoltCoin rewards.