MÓDULO 5.4

👁️ Camada 4 — Supervisão e Replanejamento

Agentes falham, travam, e custam mais do que o esperado. A Camada 4 é o sistema nervoso do orquestrador: detecta problemas antes que se tornem crises e age autonomamente — ou escala para humanos quando necessário.

6
Tópicos
50
Minutos
Avançado
Nível
Código
Tipo
1

💓 Health Monitoring de Agentes

Como saber se um agente está trabalhando normalmente vs. travado vs. crashado? Sem health monitoring, o orquestrador pode esperar por um agente morto indefinidamente enquanto o prazo passa.

💚

Saudável

Heartbeat nos últimos 15s. Processando normalmente.

🟡

Lento

Heartbeat >15s. Pode estar com tarefa densa. Monitorar.

🔴

Morto

Sem heartbeat >60s. Marcar tarefa como FAILED e reagendar.

Implementação: AgentHealthMonitor

import asyncio
import time
from enum import Enum

class AgentHealth(Enum):
    HEALTHY = "healthy"
    SLOW    = "slow"
    DEAD    = "dead"

class AgentHealthMonitor:
    """Monitora saúde de todos os agentes ativos na sessão."""

    HEARTBEAT_INTERVAL = 10   # Agentes enviam heartbeat a cada 10s
    SLOW_THRESHOLD     = 25   # >25s sem heartbeat = lento
    DEAD_THRESHOLD     = 60   # >60s sem heartbeat = morto

    def __init__(self, redis: RedisStateManager, session_id: str):
        self.redis = redis
        self.session_id = session_id
        self.callbacks = {
            AgentHealth.SLOW: [],
            AgentHealth.DEAD: []
        }

    def on_slow(self, callback):
        """Registra callback para agente lento."""
        self.callbacks[AgentHealth.SLOW].append(callback)

    def on_dead(self, callback):
        """Registra callback para agente morto."""
        self.callbacks[AgentHealth.DEAD].append(callback)

    def check_agent(self, agent_id: str) -> AgentHealth:
        """Verifica saúde de um agente específico."""
        key = f"orch:{self.session_id}:heartbeat:{agent_id}"
        last_seen = self.redis.r.get(key)

        if not last_seen:
            return AgentHealth.DEAD

        elapsed = time.time() - float(last_seen)

        if elapsed > self.DEAD_THRESHOLD:
            return AgentHealth.DEAD
        elif elapsed > self.SLOW_THRESHOLD:
            return AgentHealth.SLOW
        else:
            return AgentHealth.HEALTHY

    async def monitor_loop(self, active_agents: set[str], interval: float = 15.0):
        """Loop assíncrono de monitoramento contínuo."""
        while True:
            for agent_id in list(active_agents):
                health = self.check_agent(agent_id)

                if health == AgentHealth.DEAD:
                    print(f"[Monitor] ALERTA: {agent_id} está morto! Disparando callbacks.")
                    for cb in self.callbacks[AgentHealth.DEAD]:
                        await cb(agent_id)
                    active_agents.discard(agent_id)

                elif health == AgentHealth.SLOW:
                    print(f"[Monitor] AVISO: {agent_id} está lento.")
                    for cb in self.callbacks[AgentHealth.SLOW]:
                        await cb(agent_id)

            await asyncio.sleep(interval)
2

⏱️ SLA Monitoring

Cada tipo de tarefa tem um SLA de tempo esperado. O supervisor monitora em tempo real e aciona alertas (ou replanning) quando tarefas ultrapassam seus limites.

SLAs por tipo de tarefa (exemplo)

Tipo SLA esperado Warning (150%) Critical (300%)
Pesquisa web30s45s90s
Análise de dados60s90s180s
Redação de relatório120s180s360s
Análise jurídica90s135s270s
class SLAMonitor:
    """Monitora cumprimento de SLAs por tipo de tarefa."""

    DEFAULT_SLAS = {
        "pesquisa":   {"expected": 30, "warn": 45,  "critical": 90},
        "analise":    {"expected": 60, "warn": 90,  "critical": 180},
        "redacao":    {"expected": 120,"warn": 180, "critical": 360},
        "juridico":   {"expected": 90, "warn": 135, "critical": 270},
        "formatacao": {"expected": 20, "warn": 30,  "critical": 60},
    }

    def __init__(self, custom_slas: dict = None):
        self.slas = {**self.DEFAULT_SLAS, **(custom_slas or {})}
        self.running_tasks: dict[str, float] = {}  # task_id → start_time
        self.sla_breaches: list[dict] = []

    def start_task(self, task_id: str, task_type: str):
        self.running_tasks[task_id] = {
            "start": time.time(),
            "type": task_type
        }

    def check_all(self) -> list[dict]:
        """Verifica todos os SLAs ativos. Retorna lista de violações."""
        violations = []
        now = time.time()

        for task_id, info in list(self.running_tasks.items()):
            elapsed = now - info["start"]
            task_type = info["type"]
            sla = self.slas.get(task_type, {"expected": 60, "warn": 90, "critical": 180})

            if elapsed > sla["critical"]:
                level = "critical"
            elif elapsed > sla["warn"]:
                level = "warning"
            else:
                continue

            violation = {
                "task_id": task_id,
                "task_type": task_type,
                "elapsed_sec": round(elapsed, 1),
                "sla_expected_sec": sla["expected"],
                "level": level,
                "overshoot_pct": round((elapsed / sla["expected"] - 1) * 100, 0)
            }
            violations.append(violation)
            self.sla_breaches.append({**violation, "detected_at": now})

        return violations

    def complete_task(self, task_id: str) -> float:
        """Marca tarefa como completa. Retorna duração em segundos."""
        if task_id in self.running_tasks:
            duration = time.time() - self.running_tasks.pop(task_id)["start"]
            return duration
        return 0.0
3

🔁 Detecção de Stall

Um agente em loop não está travado — está ativo, gerando tokens, consumindo custo. Mas não está progredindo. A detecção de stall identifica esse padrão antes que custe $100 de tokens.

Heurísticas de detecção de stall

from collections import Counter
import hashlib

class StallDetector:
    """Detecta quando um agente está preso em loop sem progresso."""

    def __init__(self,
                 max_repeated_actions: int = 3,
                 max_tokens_without_progress: int = 5000,
                 progress_check_interval: int = 5):
        self.max_repeated = max_repeated_actions
        self.max_tokens_stall = max_tokens_without_progress
        self.check_interval = progress_check_interval

        # Por task_id
        self.action_history: dict[str, list] = {}
        self.token_counts: dict[str, int] = {}
        self.last_progress_markers: dict[str, str] = {}

    def record_action(self, task_id: str, action_type: str,
                      action_input: str, tokens_used: int) -> bool:
        """
        Registra ação de agente. Retorna True se stall detectado.
        """
        if task_id not in self.action_history:
            self.action_history[task_id] = []
            self.token_counts[task_id] = 0

        # Fingerprint da ação (para detectar repetição)
        fingerprint = hashlib.md5(f"{action_type}:{action_input[:200]}".encode()).hexdigest()

        self.action_history[task_id].append(fingerprint)
        self.token_counts[task_id] += tokens_used

        # Heurística 1: Ações idênticas repetidas
        recent = self.action_history[task_id][-self.max_repeated:]
        if len(recent) == self.max_repeated and len(set(recent)) == 1:
            print(f"[StallDetector] STALL: {task_id} repetiu mesma ação {self.max_repeated}x")
            return True

        # Heurística 2: Muitos tokens sem mudança de estado (progress marker)
        current_marker = fingerprint
        last_marker = self.last_progress_markers.get(task_id, "")

        if current_marker == last_marker:
            if self.token_counts[task_id] > self.max_tokens_stall:
                print(f"[StallDetector] STALL: {task_id} sem progresso após "
                      f"{self.token_counts[task_id]} tokens")
                return True
        else:
            # Progresso detectado: reset contador de tokens
            self.last_progress_markers[task_id] = current_marker
            self.token_counts[task_id] = 0

        return False

    def reset_task(self, task_id: str):
        """Limpa histórico de uma tarefa (após retry)."""
        self.action_history.pop(task_id, None)
        self.token_counts.pop(task_id, None)
        self.last_progress_markers.pop(task_id, None)
4

🔃 Retry Inteligente

Retry ingênuo: tente de novo exatamente da mesma forma. Taxa de sucesso: idêntica ao primeiro erro. Retry inteligente adapta a abordagem com base no contexto do erro.

Retry com contexto

Injeta o erro anterior como contexto adicional: "Na tentativa anterior falhou porque X. Na nova tentativa, evite Y e tente Z."

Retry com modelo diferente

Tentativa 1 com Haiku falhou. Tentativa 2 com Sonnet — mais capaz para tarefas que exigem raciocínio mais profundo.

Retry com abordagem alternativa

Se a abordagem original falhou 2x, pedir ao LLM planejador para gerar uma abordagem alternativa para a mesma tarefa.

class IntelligentRetryHandler:
    """Retry que adapta a estratégia baseado no histórico de falhas."""

    MODEL_ESCALATION = [
        "claude-haiku-4-5",
        "claude-sonnet-4-6",
        "claude-opus-4-6",
    ]

    def __init__(self, max_retries: int = 3):
        self.max_retries = max_retries
        self.retry_history: dict[str, list] = {}

    async def execute_with_retry(self, task: dict, execute_fn) -> dict:
        """
        Executa tarefa com retry inteligente.
        execute_fn: async function(task, context) → result
        """
        task_id = task["id"]
        self.retry_history[task_id] = []
        last_error = None

        for attempt in range(self.max_retries + 1):
            # Montar contexto de retry
            retry_context = self._build_retry_context(task_id, attempt, last_error)

            # Escalada de modelo conforme tentativas
            model = self.MODEL_ESCALATION[min(attempt, len(self.MODEL_ESCALATION) - 1)]
            task_with_model = {**task, "model_override": model}

            try:
                result = await execute_fn(task_with_model, retry_context)
                self.retry_history[task_id].append({
                    "attempt": attempt,
                    "success": True,
                    "model": model
                })
                return result

            except Exception as e:
                last_error = str(e)
                self.retry_history[task_id].append({
                    "attempt": attempt,
                    "success": False,
                    "error": last_error,
                    "model": model
                })
                print(f"[Retry] Task {task_id}: tentativa {attempt+1} falhou: {e}")

                if attempt < self.max_retries:
                    wait = 2 ** attempt  # backoff exponencial: 1s, 2s, 4s
                    await asyncio.sleep(wait)

        raise RuntimeError(
            f"Task {task_id} falhou após {self.max_retries + 1} tentativas. "
            f"Último erro: {last_error}"
        )

    def _build_retry_context(self, task_id: str, attempt: int, last_error: str) -> dict:
        """Constrói contexto adicional para próxima tentativa."""
        if attempt == 0 or not last_error:
            return {}

        history = self.retry_history.get(task_id, [])
        errors = [h["error"] for h in history if not h["success"]]

        return {
            "retry_attempt": attempt,
            "previous_errors": errors,
            "instruction": (
                f"Esta é a tentativa {attempt+1}. "
                f"Tentativas anteriores falharam com: {'; '.join(errors)}. "
                f"Tente uma abordagem diferente para evitar o mesmo problema."
            )
        }
5

🗺️ Replanning Automático

Quando o retry exauriu as tentativas e a tarefa ainda falha, o orquestrador aciona o replanner — um LLM que analisa o estado atual e o histórico de falhas para gerar uma abordagem completamente diferente.

class AutoReplanner:
    """Gera novo plano quando subtarefa falha repetidamente."""

    def __init__(self, max_replanning_depth: int = 2):
        self.max_depth = max_replanning_depth
        self.replanning_count: dict[str, int] = {}

    async def replan_task(self,
                          failed_task: dict,
                          failure_history: list[dict],
                          session_context: dict) -> list[dict]:
        """
        Gera subtarefas alternativas para substituir a tarefa que falhou.
        Retorna lista de novas tarefas a inserir no DAG.
        """
        task_id = failed_task["id"]
        depth = self.replanning_count.get(task_id, 0)

        if depth >= self.max_depth:
            raise RuntimeError(
                f"Replanning máximo atingido para {task_id}. Escalar para humano."
            )

        self.replanning_count[task_id] = depth + 1

        errors_summary = "\n".join([
            f"- Tentativa {i+1}: {f.get('error', 'sem detalhe')}"
            for i, f in enumerate(failure_history)
        ])

        response = client.messages.create(
            model="claude-opus-4-6",
            max_tokens=2048,
            messages=[{
                "role": "user",
                "content": f"""A seguinte tarefa falhou {len(failure_history)} vezes e precisa ser replanejada.

TAREFA ORIGINAL:
{json.dumps(failed_task, ensure_ascii=False)}

HISTÓRICO DE FALHAS:
{errors_summary}

CONTEXTO DA SESSÃO:
Objetivo: {session_context.get('objective', 'N/A')}
Tarefas já completas: {', '.join(session_context.get('completed_tasks', []))}

Gere uma lista de subtarefas ALTERNATIVAS que atinjam o mesmo objetivo
usando uma abordagem completamente diferente. Cada subtarefa no formato JSON:
{{"id": "R{task_id}_01", "titulo": "...", "descricao": "...",
  "agente_tipo": "...", "dependencias": [...], "duracao_estimada_seg": N}}"""
            }]
        )

        new_tasks = json.loads(response.content[0].text)
        print(f"[Replanner] Geradas {len(new_tasks)} subtarefas alternativas para {task_id}")
        return new_tasks
6

👤 Human Escalation Pipeline

Alguns problemas não podem ser resolvidos autonomamente. O pipeline de escalação humana define quando chamar, como notificar, e o que fazer enquanto espera a resposta.

Triggers de escalação humana

T1 Custo acima de X: execução ultrapassou $10 (ou budget configurado). Humano decide se continua.
T2 Ação irreversível: agente quer enviar email, deletar dado, fazer deploy — requer aprovação explícita.
T3 Confiança baixa: LLM retornou confidence <50% em decisão crítica — humano valida antes de continuar.
T4 Replanning exausto: sistema tentou tudo, ainda falha. Humano decide como prosseguir.
import aiohttp
import asyncio

class HumanEscalationPipeline:
    """Pipeline completo de escalação para humanos."""

    def __init__(self, slack_webhook: str, approval_timeout: int = 300):
        self.slack_webhook = slack_webhook
        self.approval_timeout = approval_timeout  # 5 minutos default
        self.pending_approvals: dict[str, asyncio.Event] = {}
        self.approval_results: dict[str, bool] = {}

    async def escalate(self,
                       session_id: str,
                       trigger: str,
                       context: dict,
                       default_on_timeout: bool = False) -> bool:
        """
        Escala para humano e aguarda resposta.
        Retorna True se aprovado, False se rejeitado/timeout.
        """
        approval_id = f"approval_{session_id}_{int(time.time())}"

        # 1. Notificar via Slack
        await self._send_slack_notification(approval_id, trigger, context)

        # 2. Criar evento de espera
        event = asyncio.Event()
        self.pending_approvals[approval_id] = event

        # 3. Aguardar aprovação com timeout
        try:
            await asyncio.wait_for(event.wait(), timeout=self.approval_timeout)
            result = self.approval_results.get(approval_id, default_on_timeout)
        except asyncio.TimeoutError:
            print(f"[Escalation] Timeout: nenhuma resposta em {self.approval_timeout}s. "
                  f"Usando default: {default_on_timeout}")
            result = default_on_timeout
        finally:
            self.pending_approvals.pop(approval_id, None)

        return result

    def approve(self, approval_id: str):
        """Chamado pela interface humana para aprovar."""
        self.approval_results[approval_id] = True
        if approval_id in self.pending_approvals:
            self.pending_approvals[approval_id].set()

    def reject(self, approval_id: str):
        """Chamado pela interface humana para rejeitar."""
        self.approval_results[approval_id] = False
        if approval_id in self.pending_approvals:
            self.pending_approvals[approval_id].set()

    async def _send_slack_notification(self, approval_id: str,
                                        trigger: str, context: dict):
        blocks = {
            "text": f":warning: Escalação de Agente — {trigger}",
            "blocks": [
                {"type": "section", "text": {"type": "mrkdwn",
                    "text": f"*Sessão:* `{context.get('session_id', 'N/A')}`\n"
                            f"*Trigger:* {trigger}\n"
                            f"*Contexto:* {str(context)[:500]}"}},
                {"type": "actions", "elements": [
                    {"type": "button", "text": {"type": "plain_text", "text": "Aprovar"},
                     "style": "primary", "value": f"approve:{approval_id}"},
                    {"type": "button", "text": {"type": "plain_text", "text": "Rejeitar"},
                     "style": "danger", "value": f"reject:{approval_id}"}
                ]}
            ]
        }
        async with aiohttp.ClientSession() as session:
            await session.post(self.slack_webhook, json=blocks)

Resumo do Módulo 5.4

Health monitoring com heartbeat distingue agente trabalhando vs. travado vs. morto
SLA monitoring converte "parece lento" em dados precisos: X% acima do SLA esperado
Detecção de stall por fingerprint de ação: para loops antes de queimarem $100 em tokens
Retry inteligente: contexto de erro + escalada de modelo + abordagem alternativa
Replanning automático gera subtarefas alternativas quando retry esgota
Pipeline de escalação humana com Slack, timeout e default behavior configurável