💓 Health Monitoring de Agentes
Como saber se um agente está trabalhando normalmente vs. travado vs. crashado? Sem health monitoring, o orquestrador pode esperar por um agente morto indefinidamente enquanto o prazo passa.
Saudável
Heartbeat nos últimos 15s. Processando normalmente.
Lento
Heartbeat >15s. Pode estar com tarefa densa. Monitorar.
Morto
Sem heartbeat >60s. Marcar tarefa como FAILED e reagendar.
Implementação: AgentHealthMonitor
import asyncio
import time
from enum import Enum
class AgentHealth(Enum):
HEALTHY = "healthy"
SLOW = "slow"
DEAD = "dead"
class AgentHealthMonitor:
"""Monitora saúde de todos os agentes ativos na sessão."""
HEARTBEAT_INTERVAL = 10 # Agentes enviam heartbeat a cada 10s
SLOW_THRESHOLD = 25 # >25s sem heartbeat = lento
DEAD_THRESHOLD = 60 # >60s sem heartbeat = morto
def __init__(self, redis: RedisStateManager, session_id: str):
self.redis = redis
self.session_id = session_id
self.callbacks = {
AgentHealth.SLOW: [],
AgentHealth.DEAD: []
}
def on_slow(self, callback):
"""Registra callback para agente lento."""
self.callbacks[AgentHealth.SLOW].append(callback)
def on_dead(self, callback):
"""Registra callback para agente morto."""
self.callbacks[AgentHealth.DEAD].append(callback)
def check_agent(self, agent_id: str) -> AgentHealth:
"""Verifica saúde de um agente específico."""
key = f"orch:{self.session_id}:heartbeat:{agent_id}"
last_seen = self.redis.r.get(key)
if not last_seen:
return AgentHealth.DEAD
elapsed = time.time() - float(last_seen)
if elapsed > self.DEAD_THRESHOLD:
return AgentHealth.DEAD
elif elapsed > self.SLOW_THRESHOLD:
return AgentHealth.SLOW
else:
return AgentHealth.HEALTHY
async def monitor_loop(self, active_agents: set[str], interval: float = 15.0):
"""Loop assíncrono de monitoramento contínuo."""
while True:
for agent_id in list(active_agents):
health = self.check_agent(agent_id)
if health == AgentHealth.DEAD:
print(f"[Monitor] ALERTA: {agent_id} está morto! Disparando callbacks.")
for cb in self.callbacks[AgentHealth.DEAD]:
await cb(agent_id)
active_agents.discard(agent_id)
elif health == AgentHealth.SLOW:
print(f"[Monitor] AVISO: {agent_id} está lento.")
for cb in self.callbacks[AgentHealth.SLOW]:
await cb(agent_id)
await asyncio.sleep(interval)
⏱️ SLA Monitoring
Cada tipo de tarefa tem um SLA de tempo esperado. O supervisor monitora em tempo real e aciona alertas (ou replanning) quando tarefas ultrapassam seus limites.
SLAs por tipo de tarefa (exemplo)
| Tipo | SLA esperado | Warning (150%) | Critical (300%) |
|---|---|---|---|
| Pesquisa web | 30s | 45s | 90s |
| Análise de dados | 60s | 90s | 180s |
| Redação de relatório | 120s | 180s | 360s |
| Análise jurídica | 90s | 135s | 270s |
class SLAMonitor:
"""Monitora cumprimento de SLAs por tipo de tarefa."""
DEFAULT_SLAS = {
"pesquisa": {"expected": 30, "warn": 45, "critical": 90},
"analise": {"expected": 60, "warn": 90, "critical": 180},
"redacao": {"expected": 120,"warn": 180, "critical": 360},
"juridico": {"expected": 90, "warn": 135, "critical": 270},
"formatacao": {"expected": 20, "warn": 30, "critical": 60},
}
def __init__(self, custom_slas: dict = None):
self.slas = {**self.DEFAULT_SLAS, **(custom_slas or {})}
self.running_tasks: dict[str, float] = {} # task_id → start_time
self.sla_breaches: list[dict] = []
def start_task(self, task_id: str, task_type: str):
self.running_tasks[task_id] = {
"start": time.time(),
"type": task_type
}
def check_all(self) -> list[dict]:
"""Verifica todos os SLAs ativos. Retorna lista de violações."""
violations = []
now = time.time()
for task_id, info in list(self.running_tasks.items()):
elapsed = now - info["start"]
task_type = info["type"]
sla = self.slas.get(task_type, {"expected": 60, "warn": 90, "critical": 180})
if elapsed > sla["critical"]:
level = "critical"
elif elapsed > sla["warn"]:
level = "warning"
else:
continue
violation = {
"task_id": task_id,
"task_type": task_type,
"elapsed_sec": round(elapsed, 1),
"sla_expected_sec": sla["expected"],
"level": level,
"overshoot_pct": round((elapsed / sla["expected"] - 1) * 100, 0)
}
violations.append(violation)
self.sla_breaches.append({**violation, "detected_at": now})
return violations
def complete_task(self, task_id: str) -> float:
"""Marca tarefa como completa. Retorna duração em segundos."""
if task_id in self.running_tasks:
duration = time.time() - self.running_tasks.pop(task_id)["start"]
return duration
return 0.0
🔁 Detecção de Stall
Um agente em loop não está travado — está ativo, gerando tokens, consumindo custo. Mas não está progredindo. A detecção de stall identifica esse padrão antes que custe $100 de tokens.
Heurísticas de detecção de stall
from collections import Counter
import hashlib
class StallDetector:
"""Detecta quando um agente está preso em loop sem progresso."""
def __init__(self,
max_repeated_actions: int = 3,
max_tokens_without_progress: int = 5000,
progress_check_interval: int = 5):
self.max_repeated = max_repeated_actions
self.max_tokens_stall = max_tokens_without_progress
self.check_interval = progress_check_interval
# Por task_id
self.action_history: dict[str, list] = {}
self.token_counts: dict[str, int] = {}
self.last_progress_markers: dict[str, str] = {}
def record_action(self, task_id: str, action_type: str,
action_input: str, tokens_used: int) -> bool:
"""
Registra ação de agente. Retorna True se stall detectado.
"""
if task_id not in self.action_history:
self.action_history[task_id] = []
self.token_counts[task_id] = 0
# Fingerprint da ação (para detectar repetição)
fingerprint = hashlib.md5(f"{action_type}:{action_input[:200]}".encode()).hexdigest()
self.action_history[task_id].append(fingerprint)
self.token_counts[task_id] += tokens_used
# Heurística 1: Ações idênticas repetidas
recent = self.action_history[task_id][-self.max_repeated:]
if len(recent) == self.max_repeated and len(set(recent)) == 1:
print(f"[StallDetector] STALL: {task_id} repetiu mesma ação {self.max_repeated}x")
return True
# Heurística 2: Muitos tokens sem mudança de estado (progress marker)
current_marker = fingerprint
last_marker = self.last_progress_markers.get(task_id, "")
if current_marker == last_marker:
if self.token_counts[task_id] > self.max_tokens_stall:
print(f"[StallDetector] STALL: {task_id} sem progresso após "
f"{self.token_counts[task_id]} tokens")
return True
else:
# Progresso detectado: reset contador de tokens
self.last_progress_markers[task_id] = current_marker
self.token_counts[task_id] = 0
return False
def reset_task(self, task_id: str):
"""Limpa histórico de uma tarefa (após retry)."""
self.action_history.pop(task_id, None)
self.token_counts.pop(task_id, None)
self.last_progress_markers.pop(task_id, None)
🔃 Retry Inteligente
Retry ingênuo: tente de novo exatamente da mesma forma. Taxa de sucesso: idêntica ao primeiro erro. Retry inteligente adapta a abordagem com base no contexto do erro.
Retry com contexto
Injeta o erro anterior como contexto adicional: "Na tentativa anterior falhou porque X. Na nova tentativa, evite Y e tente Z."
Retry com modelo diferente
Tentativa 1 com Haiku falhou. Tentativa 2 com Sonnet — mais capaz para tarefas que exigem raciocínio mais profundo.
Retry com abordagem alternativa
Se a abordagem original falhou 2x, pedir ao LLM planejador para gerar uma abordagem alternativa para a mesma tarefa.
class IntelligentRetryHandler:
"""Retry que adapta a estratégia baseado no histórico de falhas."""
MODEL_ESCALATION = [
"claude-haiku-4-5",
"claude-sonnet-4-6",
"claude-opus-4-6",
]
def __init__(self, max_retries: int = 3):
self.max_retries = max_retries
self.retry_history: dict[str, list] = {}
async def execute_with_retry(self, task: dict, execute_fn) -> dict:
"""
Executa tarefa com retry inteligente.
execute_fn: async function(task, context) → result
"""
task_id = task["id"]
self.retry_history[task_id] = []
last_error = None
for attempt in range(self.max_retries + 1):
# Montar contexto de retry
retry_context = self._build_retry_context(task_id, attempt, last_error)
# Escalada de modelo conforme tentativas
model = self.MODEL_ESCALATION[min(attempt, len(self.MODEL_ESCALATION) - 1)]
task_with_model = {**task, "model_override": model}
try:
result = await execute_fn(task_with_model, retry_context)
self.retry_history[task_id].append({
"attempt": attempt,
"success": True,
"model": model
})
return result
except Exception as e:
last_error = str(e)
self.retry_history[task_id].append({
"attempt": attempt,
"success": False,
"error": last_error,
"model": model
})
print(f"[Retry] Task {task_id}: tentativa {attempt+1} falhou: {e}")
if attempt < self.max_retries:
wait = 2 ** attempt # backoff exponencial: 1s, 2s, 4s
await asyncio.sleep(wait)
raise RuntimeError(
f"Task {task_id} falhou após {self.max_retries + 1} tentativas. "
f"Último erro: {last_error}"
)
def _build_retry_context(self, task_id: str, attempt: int, last_error: str) -> dict:
"""Constrói contexto adicional para próxima tentativa."""
if attempt == 0 or not last_error:
return {}
history = self.retry_history.get(task_id, [])
errors = [h["error"] for h in history if not h["success"]]
return {
"retry_attempt": attempt,
"previous_errors": errors,
"instruction": (
f"Esta é a tentativa {attempt+1}. "
f"Tentativas anteriores falharam com: {'; '.join(errors)}. "
f"Tente uma abordagem diferente para evitar o mesmo problema."
)
}
🗺️ Replanning Automático
Quando o retry exauriu as tentativas e a tarefa ainda falha, o orquestrador aciona o replanner — um LLM que analisa o estado atual e o histórico de falhas para gerar uma abordagem completamente diferente.
class AutoReplanner:
"""Gera novo plano quando subtarefa falha repetidamente."""
def __init__(self, max_replanning_depth: int = 2):
self.max_depth = max_replanning_depth
self.replanning_count: dict[str, int] = {}
async def replan_task(self,
failed_task: dict,
failure_history: list[dict],
session_context: dict) -> list[dict]:
"""
Gera subtarefas alternativas para substituir a tarefa que falhou.
Retorna lista de novas tarefas a inserir no DAG.
"""
task_id = failed_task["id"]
depth = self.replanning_count.get(task_id, 0)
if depth >= self.max_depth:
raise RuntimeError(
f"Replanning máximo atingido para {task_id}. Escalar para humano."
)
self.replanning_count[task_id] = depth + 1
errors_summary = "\n".join([
f"- Tentativa {i+1}: {f.get('error', 'sem detalhe')}"
for i, f in enumerate(failure_history)
])
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=2048,
messages=[{
"role": "user",
"content": f"""A seguinte tarefa falhou {len(failure_history)} vezes e precisa ser replanejada.
TAREFA ORIGINAL:
{json.dumps(failed_task, ensure_ascii=False)}
HISTÓRICO DE FALHAS:
{errors_summary}
CONTEXTO DA SESSÃO:
Objetivo: {session_context.get('objective', 'N/A')}
Tarefas já completas: {', '.join(session_context.get('completed_tasks', []))}
Gere uma lista de subtarefas ALTERNATIVAS que atinjam o mesmo objetivo
usando uma abordagem completamente diferente. Cada subtarefa no formato JSON:
{{"id": "R{task_id}_01", "titulo": "...", "descricao": "...",
"agente_tipo": "...", "dependencias": [...], "duracao_estimada_seg": N}}"""
}]
)
new_tasks = json.loads(response.content[0].text)
print(f"[Replanner] Geradas {len(new_tasks)} subtarefas alternativas para {task_id}")
return new_tasks
👤 Human Escalation Pipeline
Alguns problemas não podem ser resolvidos autonomamente. O pipeline de escalação humana define quando chamar, como notificar, e o que fazer enquanto espera a resposta.
Triggers de escalação humana
import aiohttp
import asyncio
class HumanEscalationPipeline:
"""Pipeline completo de escalação para humanos."""
def __init__(self, slack_webhook: str, approval_timeout: int = 300):
self.slack_webhook = slack_webhook
self.approval_timeout = approval_timeout # 5 minutos default
self.pending_approvals: dict[str, asyncio.Event] = {}
self.approval_results: dict[str, bool] = {}
async def escalate(self,
session_id: str,
trigger: str,
context: dict,
default_on_timeout: bool = False) -> bool:
"""
Escala para humano e aguarda resposta.
Retorna True se aprovado, False se rejeitado/timeout.
"""
approval_id = f"approval_{session_id}_{int(time.time())}"
# 1. Notificar via Slack
await self._send_slack_notification(approval_id, trigger, context)
# 2. Criar evento de espera
event = asyncio.Event()
self.pending_approvals[approval_id] = event
# 3. Aguardar aprovação com timeout
try:
await asyncio.wait_for(event.wait(), timeout=self.approval_timeout)
result = self.approval_results.get(approval_id, default_on_timeout)
except asyncio.TimeoutError:
print(f"[Escalation] Timeout: nenhuma resposta em {self.approval_timeout}s. "
f"Usando default: {default_on_timeout}")
result = default_on_timeout
finally:
self.pending_approvals.pop(approval_id, None)
return result
def approve(self, approval_id: str):
"""Chamado pela interface humana para aprovar."""
self.approval_results[approval_id] = True
if approval_id in self.pending_approvals:
self.pending_approvals[approval_id].set()
def reject(self, approval_id: str):
"""Chamado pela interface humana para rejeitar."""
self.approval_results[approval_id] = False
if approval_id in self.pending_approvals:
self.pending_approvals[approval_id].set()
async def _send_slack_notification(self, approval_id: str,
trigger: str, context: dict):
blocks = {
"text": f":warning: Escalação de Agente — {trigger}",
"blocks": [
{"type": "section", "text": {"type": "mrkdwn",
"text": f"*Sessão:* `{context.get('session_id', 'N/A')}`\n"
f"*Trigger:* {trigger}\n"
f"*Contexto:* {str(context)[:500]}"}},
{"type": "actions", "elements": [
{"type": "button", "text": {"type": "plain_text", "text": "Aprovar"},
"style": "primary", "value": f"approve:{approval_id}"},
{"type": "button", "text": {"type": "plain_text", "text": "Rejeitar"},
"style": "danger", "value": f"reject:{approval_id}"}
]}
]
}
async with aiohttp.ClientSession() as session:
await session.post(self.slack_webhook, json=blocks)