Tipos de Falha em Agentes
Sistemas agênticos enfrentam um espectro de falhas muito mais amplo que software tradicional. Classificar corretamente o tipo de falha é o primeiro passo para escolher a estratégia de recuperação certa.
Falhas de Formato
- LLM retorna JSON inválido ou incompleto
- Campo obrigatório ausente na resposta
- Tipo errado (string onde esperava int)
- Schema Pydantic não validado
Solução: retry com prompt corretivo
Falhas de Tool Call
- API externa retorna 5xx
- Rate limit atingido (429)
- Timeout na execução da tool
- Parâmetros inválidos para a tool
Solução: retry com backoff exponencial
Falhas de Loop
- Agente fica preso chamando a mesma tool
- Loop de replanejamento infinito
- Ciclo entre dois agentes
- Contexto cresce sem progressão
Solução: contador de iterações + timeout
Falhas de Recurso
- Contexto excede janela máxima
- Custo por tarefa ultrapassa limite
- Memória insuficiente para processamento
- Concorrência máxima atingida
Solução: circuit breaker + escalação
Taxonomia de Severidade
Retry com Backoff Exponencial
Falhas transientes como timeouts de API ou rate limits são resolvíveis com retry. Mas retry sem controle pode piorar o problema — sobrecarregando o serviço que já está sob pressão. O backoff exponencial com jitter é o padrão correto.
Python — Retry com Backoff Exponencial
import asyncio
import random
import time
from functools import wraps
from typing import Type, Tuple
def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exceptions: Tuple[Type[Exception], ...] = (Exception,),
jitter: bool = True
):
"""Decorator para retry com backoff exponencial e jitter."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_retries + 1):
try:
return await func(*args, **kwargs)
except exceptions as e:
last_exception = e
if attempt == max_retries:
raise # Esgotou retries
# Cálculo do delay: base * 2^attempt
delay = min(base_delay * (2 ** attempt), max_delay)
# Jitter: evita thundering herd
if jitter:
delay = delay * (0.5 + random.random() * 0.5)
print(f" Tentativa {attempt + 1}/{max_retries} falhou: {e}")
print(f" Aguardando {delay:.1f}s antes de retry...")
await asyncio.sleep(delay)
raise last_exception
return wrapper
return decorator
# Uso: decorador em chamadas de API
@retry_with_backoff(
max_retries=3,
base_delay=1.0,
exceptions=(TimeoutError, ConnectionError, RateLimitError)
)
async def call_llm_api(prompt: str) -> str:
return await openai_client.chat(prompt)
# Retry com contexto para falhas de formato
async def call_with_format_retry(
prompt: str,
schema: type,
max_format_retries: int = 2
) -> dict:
"""Retry específico para falhas de schema/formato."""
for attempt in range(max_format_retries + 1):
response = await call_llm_api(prompt)
try:
return schema.model_validate_json(response)
except ValidationError as e:
if attempt == max_format_retries:
raise
# Adiciona instrução corretiva ao prompt
prompt += f"\n\nAnterior inválido. Erro: {e}\nRetorne APENAS JSON válido."
1s
1ª tentativa
2s
2ª tentativa
4s
3ª tentativa
Por que Jitter?
Sem jitter, múltiplos agentes que falham simultaneamente vão fazer retry no mesmo instante, causando o "thundering herd" — todos atacando o serviço ao mesmo tempo. O jitter randomiza o delay entre 50% e 100% do valor calculado, espalhando as requisições no tempo.
Fallback Strategy
Quando o retry não resolve — o serviço está down, o agente principal não converge — é hora do fallback. Fallback é ter um plano B: agente alternativo, modelo diferente, resultado parcial com flag de incerteza, ou cache da última execução bem-sucedida.
Python — Fallback Chain
from dataclasses import dataclass
from typing import Optional, Any
from enum import Enum
class ResultConfidence(Enum):
HIGH = "high" # Resultado normal
MEDIUM = "medium" # Fallback parcial
LOW = "low" # Cache ou estimativa
FAILED = "failed" # Nenhum fallback funcionou
@dataclass
class AgentResult:
data: Any
confidence: ResultConfidence
source: str
partial: bool = False
metadata: dict = None
async def execute_with_fallback(task: dict) -> AgentResult:
"""
Cadeia de fallback:
1. Agente primário (GPT-4o)
2. Agente alternativo (Claude Sonnet)
3. Modelo menor com prompt simplificado
4. Cache da última execução bem-sucedida
5. Resultado parcial com flag de incerteza
"""
# Tentativa 1: Agente primário
try:
result = await primary_agent.execute(task)
return AgentResult(
data=result,
confidence=ResultConfidence.HIGH,
source="primary_agent"
)
except Exception as e:
log_failure("primary_agent", e)
# Tentativa 2: Agente alternativo com abordagem diferente
try:
result = await fallback_agent.execute(task, simplified=True)
return AgentResult(
data=result,
confidence=ResultConfidence.MEDIUM,
source="fallback_agent",
partial=True
)
except Exception as e:
log_failure("fallback_agent", e)
# Tentativa 3: Modelo menor, prompt simplificado
try:
simplified_task = simplify_task(task)
result = await lightweight_agent.execute(simplified_task)
return AgentResult(
data=result,
confidence=ResultConfidence.LOW,
source="lightweight_agent",
partial=True
)
except Exception as e:
log_failure("lightweight_agent", e)
# Tentativa 4: Cache da última execução
cached = await get_cached_result(task["task_id"])
if cached and not cache_is_stale(cached):
return AgentResult(
data=cached["data"],
confidence=ResultConfidence.LOW,
source="cache",
metadata={"cached_at": cached["timestamp"]}
)
# Nenhum fallback funcionou — resultado parcial forçado
return AgentResult(
data=None,
confidence=ResultConfidence.FAILED,
source="none",
metadata={"error": "all_fallbacks_exhausted"}
)
def simplify_task(task: dict) -> dict:
"""Reduz escopo da tarefa para aumentar chance de sucesso."""
return {
**task,
"scope": "minimal",
"depth": "surface",
"max_tokens": 500
}
Quando usar cada fallback
Circuit Breaker
O circuit breaker é o padrão de resiliência mais importante para sistemas distribuídos. Quando um serviço falha repetidamente, parar de chamá-lo temporariamente protege o sistema de cascata de falhas e dá ao serviço tempo para se recuperar.
Fechado
Chamadas passam normalmente. Monitora falhas.
Aberto
Bloqueia chamadas. Retorna erro imediato.
Meio-Aberto
Permite teste. Fecha se OK, reabre se falha.
Python — Circuit Breaker Implementation
import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Any
class CircuitState(Enum):
CLOSED = "closed" # Normal
OPEN = "open" # Bloqueado
HALF_OPEN = "half_open" # Testando
@dataclass
class CircuitBreaker:
name: str
failure_threshold: int = 5 # Falhas para abrir
success_threshold: int = 2 # Sucessos para fechar
timeout: float = 60.0 # Segundos aberto antes de testar
# Estado interno
state: CircuitState = field(default=CircuitState.CLOSED)
failure_count: int = 0
success_count: int = 0
last_failure_time: float = 0
async def call(self, func: Callable, *args, **kwargs) -> Any:
if self.state == CircuitState.OPEN:
# Verifica se passou o timeout
if time.time() - self.last_failure_time > self.timeout:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
print(f" CB '{self.name}': HALF_OPEN — testando...")
else:
raise CircuitOpenError(
f"Circuit '{self.name}' aberto. "
f"Tenta novamente em {self.timeout - (time.time() - self.last_failure_time):.0f}s"
)
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise
def _on_success(self):
self.failure_count = 0
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
print(f" CB '{self.name}': CLOSED — serviço recuperado!")
def _on_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.state == CircuitState.HALF_OPEN:
# Falhou no teste — volta a abrir
self.state = CircuitState.OPEN
print(f" CB '{self.name}': OPEN — serviço ainda instável")
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
print(f" CB '{self.name}': OPEN após {self.failure_count} falhas!")
# Uso no orquestrador
llm_circuit = CircuitBreaker("openai_api", failure_threshold=5, timeout=30)
async def safe_llm_call(prompt: str) -> str:
try:
return await llm_circuit.call(openai_client.chat, prompt)
except CircuitOpenError:
# Circuit aberto — usa fallback imediato sem esperar
return await fallback_model.chat(prompt)
Métricas a monitorar
Expor o estado do circuit breaker via métricas (Prometheus/Datadog): número de aberturas por hora, tempo médio em estado OPEN, taxa de sucesso após HALF_OPEN. Esses dados revelam quais dependências são mais instáveis no seu sistema.
Human Escalation
Nem toda falha é resolvível por código. Quando o custo ultrapassa limite, a ação é irreversível ou a confiança no resultado é baixa demais, o sistema deve pausar e escalar para um humano. Isso não é fraqueza — é design responsável.
Critérios de Escalação
Custo acima do limite
Task consumiu mais de N tokens / R$ X. Parar antes de desperdiçar mais.
Ação irreversível detectada
Delete de dados, transferência financeira, email enviado. Requer aprovação.
Baixa confiança no resultado
Score de confiança abaixo de threshold (ex: 70%). Agente não tem certeza.
Contradição nos dados
Múltiplos agentes chegaram a conclusões opostas. Desempate humano necessário.
Python — Checkpoint de Aprovação
from dataclasses import dataclass
from typing import Literal
import asyncio
@dataclass
class EscalationCheckpoint:
task_id: str
reason: str
context: dict
action_proposed: str
risk_level: Literal["low", "medium", "high", "critical"]
class HumanEscalationManager:
def __init__(self, slack_webhook: str, approval_timeout: int = 3600):
self.slack_webhook = slack_webhook
self.approval_timeout = approval_timeout # segundos
self.pending_approvals: dict = {}
async def request_approval(
self,
checkpoint: EscalationCheckpoint
) -> bool:
"""
Pausa execução e aguarda aprovação humana.
Returns True se aprovado, False se rejeitado ou timeout.
"""
# 1. Notifica humano
await self._notify_slack(checkpoint)
# 2. Salva estado atual (para poder retomar)
await self._save_state(checkpoint.task_id)
# 3. Aguarda resposta com timeout
try:
approval = await asyncio.wait_for(
self._wait_for_approval(checkpoint.task_id),
timeout=self.approval_timeout
)
return approval
except asyncio.TimeoutError:
# Timeout: escalação automática para nível acima
await self._escalate_to_manager(checkpoint)
return False
async def _notify_slack(self, checkpoint: EscalationCheckpoint):
message = {
"blocks": [
{"type": "header", "text": {"type": "plain_text",
"text": f"Aprovação Necessária — {checkpoint.risk_level.upper()}"}},
{"type": "section", "text": {"type": "mrkdwn",
"text": f"*Motivo:* {checkpoint.reason}\n"
f"*Ação proposta:* {checkpoint.action_proposed}"}},
{"type": "actions", "elements": [
{"type": "button", "text": {"type": "plain_text", "text": "Aprovar"},
"style": "primary", "action_id": f"approve_{checkpoint.task_id}"},
{"type": "button", "text": {"type": "plain_text", "text": "Rejeitar"},
"style": "danger", "action_id": f"reject_{checkpoint.task_id}"}
]}
]
}
await send_slack_message(self.slack_webhook, message)
Princípio do Checkpoint Gradual
Defina thresholds crescentes: $0.10 → log, $1.00 → notificação, $10.00 → pausa automática. Ações reversíveis permitem mais autonomia; ações irreversíveis sempre requerem aprovação.
Recovery Patterns
Quando um agente falha no meio de uma tarefa longa, o desperdício de repetir tudo do zero pode ser inaceitável — tanto em custo quanto em tempo. Recovery patterns permitem retomar a execução a partir do último ponto estável.
Checkpointing
Salvar estado a cada etapa concluída. Em caso de falha, recarregar o último checkpoint e continuar de onde parou.
Replay Seletivo
Usar o log de eventos para identificar quais steps falharam e reexecutar apenas eles, mantendo resultados válidos.
Sagas
Para ações com efeito colateral: cada step tem uma compensação (rollback). Falha aciona compensações em ordem reversa.
Python — Checkpoint + Replay
import json
import redis.asyncio as redis
from dataclasses import dataclass, asdict
from typing import List, Optional
@dataclass
class StepResult:
step_id: str
status: str # "completed" | "failed" | "skipped"
output: dict
tokens_used: int
completed_at: float
class CheckpointedExecutor:
def __init__(self, task_id: str, redis_client: redis.Redis):
self.task_id = task_id
self.redis = redis_client
self.checkpoint_key = f"checkpoint:{task_id}"
async def execute_pipeline(self, steps: List[callable]) -> dict:
"""Executa pipeline com checkpointing automático."""
# Carrega checkpoint existente (se houver)
completed_steps = await self._load_checkpoint()
results = {}
for step in steps:
step_id = step.__name__
# Skip steps já completados
if step_id in completed_steps:
results[step_id] = completed_steps[step_id]
print(f" [SKIP] {step_id} — já completado no checkpoint")
continue
# Executa step
try:
output = await step(results)
# Salva checkpoint imediatamente
await self._save_checkpoint(step_id, output)
results[step_id] = output
print(f" [OK] {step_id} — checkpoint salvo")
except Exception as e:
print(f" [FAIL] {step_id} — {e}")
# Estado até aqui está salvo — pode retomar depois
raise ExecutionError(
f"Falha em '{step_id}'. "
f"Execute novamente para retomar do checkpoint."
)
# Pipeline completo — limpa checkpoint
await self.redis.delete(self.checkpoint_key)
return results
async def _save_checkpoint(self, step_id: str, output: dict):
existing = await self._load_checkpoint()
existing[step_id] = output
await self.redis.setex(
self.checkpoint_key,
86400, # TTL: 24 horas
json.dumps(existing)
)
async def _load_checkpoint(self) -> dict:
data = await self.redis.get(self.checkpoint_key)
return json.loads(data) if data else {}
Resumo: Qual Pattern para Cada Falha
| Tipo de Falha | Pattern Primário | Fallback |
|---|---|---|
| Timeout de API | Retry + backoff | Circuit breaker |
| Formato inválido | Retry com prompt corretivo | Agente alternativo |
| Serviço down | Circuit breaker + fallback | Cache |
| Custo estourado | Human escalation | Modelo menor |
| Falha no meio | Checkpoint + replay | Saga compensation |
Resumo do Módulo
Conceitos centrais
- → Taxonomia: transiente, recuperável, degradada, fatal
- → Retry com backoff exponencial e jitter
- → Fallback chain: primário → alternativo → cache → parcial
- → Circuit breaker: Fechado → Aberto → Meio-aberto
Na prática
- → Escalar quando custo, confiança ou reversibilidade exigem
- → Checkpointing para tarefas longas e custosas
- → Sagas para ações com efeito colateral irreversível
- → Monitorar circuit breakers como KPI de resiliência