Início / Trilha 4 — Multi-Agentes / Módulo 4.8 — Tratamento de Erros
8

Trilha 4 · Módulo 8

Tratamento de Erros

Agentes falham. A questão não é se vão falhar, mas como o sistema se recupera com graciosidade. Aprenda a classificar falhas, implementar retry inteligente, circuit breaker, fallback e escalação humana.

6 tópicos ~45 min Intermediário-Avançado Resiliência
1

Tipos de Falha em Agentes

Sistemas agênticos enfrentam um espectro de falhas muito mais amplo que software tradicional. Classificar corretamente o tipo de falha é o primeiro passo para escolher a estratégia de recuperação certa.

Falhas de Formato

  • LLM retorna JSON inválido ou incompleto
  • Campo obrigatório ausente na resposta
  • Tipo errado (string onde esperava int)
  • Schema Pydantic não validado

Solução: retry com prompt corretivo

Falhas de Tool Call

  • API externa retorna 5xx
  • Rate limit atingido (429)
  • Timeout na execução da tool
  • Parâmetros inválidos para a tool

Solução: retry com backoff exponencial

Falhas de Loop

  • Agente fica preso chamando a mesma tool
  • Loop de replanejamento infinito
  • Ciclo entre dois agentes
  • Contexto cresce sem progressão

Solução: contador de iterações + timeout

Falhas de Recurso

  • Contexto excede janela máxima
  • Custo por tarefa ultrapassa limite
  • Memória insuficiente para processamento
  • Concorrência máxima atingida

Solução: circuit breaker + escalação

Taxonomia de Severidade

Transiente Falha temporária, retry resolve. Ex: timeout de rede, rate limit
Recuperável Precisa de ajuste. Ex: formato errado, parâmetro inválido
Degradada Funciona parcialmente, ativar fallback. Ex: serviço indisponível
Fatal Precisa de intervenção humana. Ex: ação irreversível, custo explodido
2

Retry com Backoff Exponencial

Falhas transientes como timeouts de API ou rate limits são resolvíveis com retry. Mas retry sem controle pode piorar o problema — sobrecarregando o serviço que já está sob pressão. O backoff exponencial com jitter é o padrão correto.

Python — Retry com Backoff Exponencial

import asyncio
import random
import time
from functools import wraps
from typing import Type, Tuple

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exceptions: Tuple[Type[Exception], ...] = (Exception,),
    jitter: bool = True
):
    """Decorator para retry com backoff exponencial e jitter."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            last_exception = None

            for attempt in range(max_retries + 1):
                try:
                    return await func(*args, **kwargs)

                except exceptions as e:
                    last_exception = e

                    if attempt == max_retries:
                        raise  # Esgotou retries

                    # Cálculo do delay: base * 2^attempt
                    delay = min(base_delay * (2 ** attempt), max_delay)

                    # Jitter: evita thundering herd
                    if jitter:
                        delay = delay * (0.5 + random.random() * 0.5)

                    print(f"  Tentativa {attempt + 1}/{max_retries} falhou: {e}")
                    print(f"  Aguardando {delay:.1f}s antes de retry...")
                    await asyncio.sleep(delay)

            raise last_exception
        return wrapper
    return decorator


# Uso: decorador em chamadas de API
@retry_with_backoff(
    max_retries=3,
    base_delay=1.0,
    exceptions=(TimeoutError, ConnectionError, RateLimitError)
)
async def call_llm_api(prompt: str) -> str:
    return await openai_client.chat(prompt)


# Retry com contexto para falhas de formato
async def call_with_format_retry(
    prompt: str,
    schema: type,
    max_format_retries: int = 2
) -> dict:
    """Retry específico para falhas de schema/formato."""

    for attempt in range(max_format_retries + 1):
        response = await call_llm_api(prompt)

        try:
            return schema.model_validate_json(response)
        except ValidationError as e:
            if attempt == max_format_retries:
                raise
            # Adiciona instrução corretiva ao prompt
            prompt += f"\n\nAnterior inválido. Erro: {e}\nRetorne APENAS JSON válido."
    

1s

1ª tentativa

2s

2ª tentativa

4s

3ª tentativa

Por que Jitter?

Sem jitter, múltiplos agentes que falham simultaneamente vão fazer retry no mesmo instante, causando o "thundering herd" — todos atacando o serviço ao mesmo tempo. O jitter randomiza o delay entre 50% e 100% do valor calculado, espalhando as requisições no tempo.

3

Fallback Strategy

Quando o retry não resolve — o serviço está down, o agente principal não converge — é hora do fallback. Fallback é ter um plano B: agente alternativo, modelo diferente, resultado parcial com flag de incerteza, ou cache da última execução bem-sucedida.

Python — Fallback Chain

from dataclasses import dataclass
from typing import Optional, Any
from enum import Enum

class ResultConfidence(Enum):
    HIGH = "high"       # Resultado normal
    MEDIUM = "medium"   # Fallback parcial
    LOW = "low"         # Cache ou estimativa
    FAILED = "failed"   # Nenhum fallback funcionou

@dataclass
class AgentResult:
    data: Any
    confidence: ResultConfidence
    source: str
    partial: bool = False
    metadata: dict = None


async def execute_with_fallback(task: dict) -> AgentResult:
    """
    Cadeia de fallback:
    1. Agente primário (GPT-4o)
    2. Agente alternativo (Claude Sonnet)
    3. Modelo menor com prompt simplificado
    4. Cache da última execução bem-sucedida
    5. Resultado parcial com flag de incerteza
    """

    # Tentativa 1: Agente primário
    try:
        result = await primary_agent.execute(task)
        return AgentResult(
            data=result,
            confidence=ResultConfidence.HIGH,
            source="primary_agent"
        )
    except Exception as e:
        log_failure("primary_agent", e)

    # Tentativa 2: Agente alternativo com abordagem diferente
    try:
        result = await fallback_agent.execute(task, simplified=True)
        return AgentResult(
            data=result,
            confidence=ResultConfidence.MEDIUM,
            source="fallback_agent",
            partial=True
        )
    except Exception as e:
        log_failure("fallback_agent", e)

    # Tentativa 3: Modelo menor, prompt simplificado
    try:
        simplified_task = simplify_task(task)
        result = await lightweight_agent.execute(simplified_task)
        return AgentResult(
            data=result,
            confidence=ResultConfidence.LOW,
            source="lightweight_agent",
            partial=True
        )
    except Exception as e:
        log_failure("lightweight_agent", e)

    # Tentativa 4: Cache da última execução
    cached = await get_cached_result(task["task_id"])
    if cached and not cache_is_stale(cached):
        return AgentResult(
            data=cached["data"],
            confidence=ResultConfidence.LOW,
            source="cache",
            metadata={"cached_at": cached["timestamp"]}
        )

    # Nenhum fallback funcionou — resultado parcial forçado
    return AgentResult(
        data=None,
        confidence=ResultConfidence.FAILED,
        source="none",
        metadata={"error": "all_fallbacks_exhausted"}
    )


def simplify_task(task: dict) -> dict:
    """Reduz escopo da tarefa para aumentar chance de sucesso."""
    return {
        **task,
        "scope": "minimal",
        "depth": "surface",
        "max_tokens": 500
    }
  

Quando usar cada fallback

Agente alternativo Agente primário não converge em N tentativas ou usa abordagem incompatível com o domínio
Modelo menor Custo explodiu ou API principal offline — usa haiku/mini com prompt mais direto
Cache Tarefa repetitiva, dados não críticos — resultado de 24h atrás pode ser aceitável
Resultado parcial Quando é melhor retornar algo com flag de incerteza do que falhar completamente
4

Circuit Breaker

O circuit breaker é o padrão de resiliência mais importante para sistemas distribuídos. Quando um serviço falha repetidamente, parar de chamá-lo temporariamente protege o sistema de cascata de falhas e dá ao serviço tempo para se recuperar.

F

Fechado

Chamadas passam normalmente. Monitora falhas.

A

Aberto

Bloqueia chamadas. Retorna erro imediato.

M

Meio-Aberto

Permite teste. Fecha se OK, reabre se falha.

Python — Circuit Breaker Implementation

import time
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Any

class CircuitState(Enum):
    CLOSED = "closed"       # Normal
    OPEN = "open"           # Bloqueado
    HALF_OPEN = "half_open" # Testando

@dataclass
class CircuitBreaker:
    name: str
    failure_threshold: int = 5      # Falhas para abrir
    success_threshold: int = 2      # Sucessos para fechar
    timeout: float = 60.0           # Segundos aberto antes de testar

    # Estado interno
    state: CircuitState = field(default=CircuitState.CLOSED)
    failure_count: int = 0
    success_count: int = 0
    last_failure_time: float = 0

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        if self.state == CircuitState.OPEN:
            # Verifica se passou o timeout
            if time.time() - self.last_failure_time > self.timeout:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                print(f"  CB '{self.name}': HALF_OPEN — testando...")
            else:
                raise CircuitOpenError(
                    f"Circuit '{self.name}' aberto. "
                    f"Tenta novamente em {self.timeout - (time.time() - self.last_failure_time):.0f}s"
                )

        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result

        except Exception as e:
            self._on_failure()
            raise

    def _on_success(self):
        self.failure_count = 0
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.success_threshold:
                self.state = CircuitState.CLOSED
                print(f"  CB '{self.name}': CLOSED — serviço recuperado!")

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.state == CircuitState.HALF_OPEN:
            # Falhou no teste — volta a abrir
            self.state = CircuitState.OPEN
            print(f"  CB '{self.name}': OPEN — serviço ainda instável")

        elif self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"  CB '{self.name}': OPEN após {self.failure_count} falhas!")


# Uso no orquestrador
llm_circuit = CircuitBreaker("openai_api", failure_threshold=5, timeout=30)

async def safe_llm_call(prompt: str) -> str:
    try:
        return await llm_circuit.call(openai_client.chat, prompt)
    except CircuitOpenError:
        # Circuit aberto — usa fallback imediato sem esperar
        return await fallback_model.chat(prompt)
  

Métricas a monitorar

Expor o estado do circuit breaker via métricas (Prometheus/Datadog): número de aberturas por hora, tempo médio em estado OPEN, taxa de sucesso após HALF_OPEN. Esses dados revelam quais dependências são mais instáveis no seu sistema.

5

Human Escalation

Nem toda falha é resolvível por código. Quando o custo ultrapassa limite, a ação é irreversível ou a confiança no resultado é baixa demais, o sistema deve pausar e escalar para um humano. Isso não é fraqueza — é design responsável.

Critérios de Escalação

!

Custo acima do limite

Task consumiu mais de N tokens / R$ X. Parar antes de desperdiçar mais.

!

Ação irreversível detectada

Delete de dados, transferência financeira, email enviado. Requer aprovação.

!

Baixa confiança no resultado

Score de confiança abaixo de threshold (ex: 70%). Agente não tem certeza.

!

Contradição nos dados

Múltiplos agentes chegaram a conclusões opostas. Desempate humano necessário.

Python — Checkpoint de Aprovação

from dataclasses import dataclass
from typing import Literal
import asyncio

@dataclass
class EscalationCheckpoint:
    task_id: str
    reason: str
    context: dict
    action_proposed: str
    risk_level: Literal["low", "medium", "high", "critical"]

class HumanEscalationManager:
    def __init__(self, slack_webhook: str, approval_timeout: int = 3600):
        self.slack_webhook = slack_webhook
        self.approval_timeout = approval_timeout  # segundos
        self.pending_approvals: dict = {}

    async def request_approval(
        self,
        checkpoint: EscalationCheckpoint
    ) -> bool:
        """
        Pausa execução e aguarda aprovação humana.
        Returns True se aprovado, False se rejeitado ou timeout.
        """
        # 1. Notifica humano
        await self._notify_slack(checkpoint)

        # 2. Salva estado atual (para poder retomar)
        await self._save_state(checkpoint.task_id)

        # 3. Aguarda resposta com timeout
        try:
            approval = await asyncio.wait_for(
                self._wait_for_approval(checkpoint.task_id),
                timeout=self.approval_timeout
            )
            return approval
        except asyncio.TimeoutError:
            # Timeout: escalação automática para nível acima
            await self._escalate_to_manager(checkpoint)
            return False

    async def _notify_slack(self, checkpoint: EscalationCheckpoint):
        message = {
            "blocks": [
                {"type": "header", "text": {"type": "plain_text",
                    "text": f"Aprovação Necessária — {checkpoint.risk_level.upper()}"}},
                {"type": "section", "text": {"type": "mrkdwn",
                    "text": f"*Motivo:* {checkpoint.reason}\n"
                            f"*Ação proposta:* {checkpoint.action_proposed}"}},
                {"type": "actions", "elements": [
                    {"type": "button", "text": {"type": "plain_text", "text": "Aprovar"},
                     "style": "primary", "action_id": f"approve_{checkpoint.task_id}"},
                    {"type": "button", "text": {"type": "plain_text", "text": "Rejeitar"},
                     "style": "danger", "action_id": f"reject_{checkpoint.task_id}"}
                ]}
            ]
        }
        await send_slack_message(self.slack_webhook, message)
  

Princípio do Checkpoint Gradual

Defina thresholds crescentes: $0.10 → log, $1.00 → notificação, $10.00 → pausa automática. Ações reversíveis permitem mais autonomia; ações irreversíveis sempre requerem aprovação.

6

Recovery Patterns

Quando um agente falha no meio de uma tarefa longa, o desperdício de repetir tudo do zero pode ser inaceitável — tanto em custo quanto em tempo. Recovery patterns permitem retomar a execução a partir do último ponto estável.

Checkpointing

Salvar estado a cada etapa concluída. Em caso de falha, recarregar o último checkpoint e continuar de onde parou.

Replay Seletivo

Usar o log de eventos para identificar quais steps falharam e reexecutar apenas eles, mantendo resultados válidos.

Sagas

Para ações com efeito colateral: cada step tem uma compensação (rollback). Falha aciona compensações em ordem reversa.

Python — Checkpoint + Replay

import json
import redis.asyncio as redis
from dataclasses import dataclass, asdict
from typing import List, Optional

@dataclass
class StepResult:
    step_id: str
    status: str  # "completed" | "failed" | "skipped"
    output: dict
    tokens_used: int
    completed_at: float

class CheckpointedExecutor:
    def __init__(self, task_id: str, redis_client: redis.Redis):
        self.task_id = task_id
        self.redis = redis_client
        self.checkpoint_key = f"checkpoint:{task_id}"

    async def execute_pipeline(self, steps: List[callable]) -> dict:
        """Executa pipeline com checkpointing automático."""

        # Carrega checkpoint existente (se houver)
        completed_steps = await self._load_checkpoint()
        results = {}

        for step in steps:
            step_id = step.__name__

            # Skip steps já completados
            if step_id in completed_steps:
                results[step_id] = completed_steps[step_id]
                print(f"  [SKIP] {step_id} — já completado no checkpoint")
                continue

            # Executa step
            try:
                output = await step(results)

                # Salva checkpoint imediatamente
                await self._save_checkpoint(step_id, output)
                results[step_id] = output
                print(f"  [OK]   {step_id} — checkpoint salvo")

            except Exception as e:
                print(f"  [FAIL] {step_id} — {e}")
                # Estado até aqui está salvo — pode retomar depois
                raise ExecutionError(
                    f"Falha em '{step_id}'. "
                    f"Execute novamente para retomar do checkpoint."
                )

        # Pipeline completo — limpa checkpoint
        await self.redis.delete(self.checkpoint_key)
        return results

    async def _save_checkpoint(self, step_id: str, output: dict):
        existing = await self._load_checkpoint()
        existing[step_id] = output
        await self.redis.setex(
            self.checkpoint_key,
            86400,  # TTL: 24 horas
            json.dumps(existing)
        )

    async def _load_checkpoint(self) -> dict:
        data = await self.redis.get(self.checkpoint_key)
        return json.loads(data) if data else {}
  

Resumo: Qual Pattern para Cada Falha

Tipo de Falha Pattern Primário Fallback
Timeout de API Retry + backoff Circuit breaker
Formato inválido Retry com prompt corretivo Agente alternativo
Serviço down Circuit breaker + fallback Cache
Custo estourado Human escalation Modelo menor
Falha no meio Checkpoint + replay Saga compensation

Resumo do Módulo

Conceitos centrais

  • Taxonomia: transiente, recuperável, degradada, fatal
  • Retry com backoff exponencial e jitter
  • Fallback chain: primário → alternativo → cache → parcial
  • Circuit breaker: Fechado → Aberto → Meio-aberto

Na prática

  • Escalar quando custo, confiança ou reversibilidade exigem
  • Checkpointing para tarefas longas e custosas
  • Sagas para ações com efeito colateral irreversível
  • Monitorar circuit breakers como KPI de resiliência