Por que Observabilidade é Crítica em Orquestradores
Um orquestrador de IA em produção é uma caixa-preta em movimento: dezenas de agentes executando tarefas em paralelo, chamadas LLM caras acontecendo a cada segundo, loops de retry silenciosos consumindo budget. Sem observabilidade adequada, você descobre os problemas depois que o cliente reclamou — ou depois que a fatura chegou.
A diferença entre um sistema monitorado e um não-monitorado é a diferença entre detectar um agente preso em loop em 30 segundos versus descobrir 2 horas depois com $800 gastos.
Incidentes reais por falta de observabilidade
Os 3 pilares da observabilidade para IA
PrometheusMetricsCollector — Instrumentação do Orquestrador
Prometheus é o padrão de facto para métricas em sistemas distribuídos. Para orquestradores de IA, instrumentamos quatro tipos de métricas: Counters (incrementam), Gauges (valor atual), Histograms (distribuições de latência) e Summaries (percentis calculados no cliente).
# pip install prometheus-client asyncio
from prometheus_client import (
Counter, Histogram, Gauge, Summary,
start_http_server, CollectorRegistry, push_to_gateway
)
import time
import asyncio
from dataclasses import dataclass
from typing import Optional
from contextlib import asynccontextmanager
# ============================================================
# REGISTRY CENTRALIZADO DE MÉTRICAS
# ============================================================
class OrchestratorMetrics:
"""Todas as métricas do orquestrador em um único lugar."""
def __init__(self, namespace: str = "orchestrator"):
self.ns = namespace
# --- COUNTERS: sempre crescem ---
self.tasks_total = Counter(
f'{namespace}_tasks_total',
'Total de tarefas submetidas',
['status', 'agent_type', 'priority']
)
self.llm_calls_total = Counter(
f'{namespace}_llm_calls_total',
'Total de chamadas LLM',
['model', 'agent_id', 'success']
)
self.retry_total = Counter(
f'{namespace}_retry_total',
'Total de retries executados',
['reason', 'agent_type']
)
self.tokens_total = Counter(
f'{namespace}_tokens_total',
'Total de tokens consumidos',
['model', 'token_type'] # token_type: input/output
)
self.cost_total = Counter(
f'{namespace}_cost_usd_total',
'Custo total em USD',
['model', 'department']
)
# --- GAUGES: valor atual do sistema ---
self.active_agents = Gauge(
f'{namespace}_active_agents',
'Agentes atualmente ativos',
['agent_type', 'status']
)
self.queue_depth = Gauge(
f'{namespace}_queue_depth',
'Tarefas na fila de execução',
['priority']
)
self.active_sessions = Gauge(
f'{namespace}_active_sessions',
'Sessões de orquestração ativas'
)
self.budget_remaining_pct = Gauge(
f'{namespace}_budget_remaining_pct',
'Percentual de budget restante',
['department']
)
# --- HISTOGRAMS: distribuições de tempo ---
self.task_duration = Histogram(
f'{namespace}_task_duration_seconds',
'Duração de execução de tarefas',
['agent_type', 'task_type'],
buckets=[1, 5, 10, 30, 60, 120, 300, 600]
)
self.llm_latency = Histogram(
f'{namespace}_llm_latency_seconds',
'Latência de chamadas LLM',
['model'],
buckets=[0.5, 1, 2, 5, 10, 30, 60]
)
self.planning_duration = Histogram(
f'{namespace}_planning_duration_seconds',
'Tempo de planejamento (Camada 1)',
buckets=[0.1, 0.5, 1, 2, 5, 10]
)
self.routing_duration = Histogram(
f'{namespace}_routing_duration_seconds',
'Tempo de roteamento (Camada 2)',
buckets=[0.01, 0.05, 0.1, 0.5, 1]
)
def start_server(self, port: int = 8000):
"""Inicia servidor HTTP para Prometheus scraping."""
start_http_server(port)
print(f"Prometheus metrics server: http://localhost:{port}/metrics")
# Singleton global
metrics = OrchestratorMetrics()
# ============================================================
# CONTEXT MANAGERS PARA INSTRUMENTAÇÃO
# ============================================================
@asynccontextmanager
async def track_task_execution(agent_type: str, task_type: str, priority: str = "normal"):
"""Context manager para instrumentar execução de tarefa."""
start = time.time()
status = "success"
try:
yield
except Exception as e:
status = "error"
raise
finally:
duration = time.time() - start
metrics.task_duration.labels(
agent_type=agent_type,
task_type=task_type
).observe(duration)
metrics.tasks_total.labels(
status=status,
agent_type=agent_type,
priority=priority
).inc()
@asynccontextmanager
async def track_llm_call(model: str, agent_id: str):
"""Context manager para instrumentar chamadas LLM."""
start = time.time()
success = "true"
try:
yield
except Exception:
success = "false"
raise
finally:
latency = time.time() - start
metrics.llm_latency.labels(model=model).observe(latency)
metrics.llm_calls_total.labels(
model=model,
agent_id=agent_id,
success=success
).inc()
# ============================================================
# EXEMPLO DE USO NO ORQUESTRADOR
# ============================================================
async def execute_agent_task(agent_id: str, task: dict):
"""Exemplo de agente instrumentado."""
agent_type = task.get("agent_type", "generic")
task_type = task.get("type", "unknown")
model = task.get("model", "claude-3-haiku-20240307")
# Incrementa agentes ativos
metrics.active_agents.labels(
agent_type=agent_type,
status="running"
).inc()
try:
async with track_task_execution(agent_type, task_type):
async with track_llm_call(model, agent_id):
# Simula chamada LLM
await asyncio.sleep(0.1) # substituir por chamada real
# Registra tokens e custo
input_tokens = 500
output_tokens = 200
cost = (input_tokens * 0.25 + output_tokens * 1.25) / 1_000_000
metrics.tokens_total.labels(
model=model, token_type="input"
).inc(input_tokens)
metrics.tokens_total.labels(
model=model, token_type="output"
).inc(output_tokens)
metrics.cost_total.labels(
model=model,
department=task.get("department", "default")
).inc(cost)
finally:
metrics.active_agents.labels(
agent_type=agent_type,
status="running"
).dec()
Dica: Use labels com moderação. Cada combinação única de labels cria uma nova série temporal no Prometheus. Com 10 agents × 5 models × 3 statuses = 150 séries. Com 100 agents, já são 1.500 séries — pode explodir a memória do Prometheus.
SLIs e SLOs — Definindo Contratos de Qualidade
SLI (Service Level Indicator) é a métrica real que você mede. SLO (Service Level Objective) é o alvo que você se compromete a atingir. SLA (Service Level Agreement) é o contrato comercial com penalidades se o SLO for violado.
| Dimensão | SLI (O que medir) | SLO (Meta) |
|---|---|---|
| Disponibilidade | % requests com status 2xx | ≥ 99.5% |
| Taxa de Sucesso | tarefas_sucesso / tarefas_total | ≥ 95% |
| Latência P50 | Mediana do tempo de conclusão | ≤ 30s |
| Latência P99 | 99° percentil do tempo | ≤ 120s |
| Freshness | % dados com idade ≤ threshold | ≥ 98% |
| Custo por Tarefa | USD médio por tarefa completada | ≤ $0.15 |
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import time
@dataclass
class SLODefinition:
name: str
description: str
target: float # 0.0 a 1.0 (ex: 0.995 = 99.5%)
window_hours: int = 24 # janela de avaliação
@dataclass
class SLOStatus:
slo: SLODefinition
current_value: float
is_compliant: bool
error_budget_remaining: float # quanto posso "gastar" antes de violar
burn_rate: float # velocidade de consumo do budget
class SLOManager:
"""Gerencia definições e avaliação de SLOs."""
SLOS: List[SLODefinition] = [
SLODefinition("task_success_rate", "Taxa de sucesso de tarefas", 0.95),
SLODefinition("latency_p99", "Latência P99 ≤ 120s", 0.99),
SLODefinition("availability", "Disponibilidade do sistema", 0.995),
SLODefinition("cost_per_task", "Custo médio ≤ $0.15", 0.90),
]
def __init__(self, prometheus_url: str):
self.prometheus_url = prometheus_url
def evaluate_task_success_rate(self, window_hours: int = 24) -> float:
"""Calcula taxa de sucesso via PromQL."""
# PromQL equivalente:
# sum(rate(orchestrator_tasks_total{status="success"}[24h]))
# / sum(rate(orchestrator_tasks_total[24h]))
# Retorna valor entre 0 e 1
query = f"""
sum(rate(orchestrator_tasks_total{{status="success"}}[{window_hours}h]))
/
sum(rate(orchestrator_tasks_total[{window_hours}h]))
"""
# Executar via requests.get(f"{self.prometheus_url}/query?query={query}")
return 0.962 # simulado
def calculate_error_budget(
self, slo: SLODefinition, current_value: float
) -> float:
"""
Error budget = quanto do SLO posso violar.
Ex: SLO 99.5% → error budget = 0.5% de falhas permitidas.
Se current é 99.2%, consumi 60% do budget.
"""
total_budget = 1.0 - slo.target
consumed = slo.target - current_value
if consumed <= 0:
return 1.0 # 100% do budget restante
remaining = (total_budget - consumed) / total_budget
return max(0.0, remaining)
def calculate_burn_rate(
self, current_1h: float, current_24h: float, slo_target: float
) -> float:
"""
Burn rate = velocidade de consumo do error budget.
Burn rate 1 = consumindo exatamente na taxa sustentável.
Burn rate 10 = consumindo 10x mais rápido → alerta crítico.
"""
hourly_budget = (1.0 - slo_target) / (30 * 24) # budget por hora (30 dias)
current_error_rate = 1.0 - current_1h
if hourly_budget == 0:
return 0
return current_error_rate / hourly_budget
def evaluate_all(self) -> List[SLOStatus]:
results = []
for slo in self.SLOS:
current = self.evaluate_task_success_rate(slo.window_hours)
budget_remaining = self.calculate_error_budget(slo, current)
burn_rate = self.calculate_burn_rate(current, current, slo.target)
results.append(SLOStatus(
slo=slo,
current_value=current,
is_compliant=current >= slo.target,
error_budget_remaining=budget_remaining,
burn_rate=burn_rate
))
return results
Prometheus + Grafana — Setup Completo
O stack Prometheus + Grafana é o padrão para visualização de métricas. Prometheus coleta e armazena; Grafana visualiza com dashboards interativos.
# docker-compose.yml — Stack de observabilidade completo
version: '3.8'
services:
# Seu orquestrador expõe métricas na porta 8000
orchestrator:
build: .
ports:
- "8000:8000" # app
- "8001:8001" # /metrics
environment:
- PROMETHEUS_PORT=8001
networks:
- monitoring
# Prometheus: coleta e armazena métricas
prometheus:
image: prom/prometheus:latest
ports:
- "9090:9090"
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- ./alerts.yml:/etc/prometheus/alerts.yml
- prometheus_data:/prometheus
command:
- '--config.file=/etc/prometheus/prometheus.yml'
- '--storage.tsdb.retention.time=30d'
- '--web.enable-lifecycle'
networks:
- monitoring
# Grafana: dashboards e visualizações
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin123
- GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH=/var/lib/grafana/dashboards/orchestrator.json
volumes:
- grafana_data:/var/lib/grafana
- ./grafana/dashboards:/var/lib/grafana/dashboards
- ./grafana/provisioning:/etc/grafana/provisioning
networks:
- monitoring
# AlertManager: roteamento de alertas
alertmanager:
image: prom/alertmanager:latest
ports:
- "9093:9093"
volumes:
- ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
networks:
- monitoring
volumes:
prometheus_data:
grafana_data:
networks:
monitoring:
driver: bridge
---
# prometheus.yml — Configuração de scraping
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "alerts.yml"
alerting:
alertmanagers:
- static_configs:
- targets: ['alertmanager:9093']
scrape_configs:
- job_name: 'orchestrator'
static_configs:
- targets: ['orchestrator:8001']
scrape_interval: 10s
metrics_path: /metrics
- job_name: 'redis'
static_configs:
- targets: ['redis-exporter:9121']
- job_name: 'postgres'
static_configs:
- targets: ['postgres-exporter:9187']
# alerts.yml — Regras de alertas Prometheus
groups:
- name: orchestrator_alerts
interval: 30s
rules:
# Alerta: taxa de sucesso caindo
- alert: TaskSuccessRateLow
expr: |
(
sum(rate(orchestrator_tasks_total{status="success"}[5m]))
/ sum(rate(orchestrator_tasks_total[5m]))
) < 0.90
for: 2m
labels:
severity: critical
team: platform
annotations:
summary: "Taxa de sucesso abaixo de 90%"
description: "Taxa atual: {{ $value | humanizePercentage }}"
runbook: "https://wiki/runbooks/task-success-rate"
# Alerta: agente em loop (muito retry)
- alert: AgentRetryLoop
expr: |
rate(orchestrator_retry_total[5m]) > 5
for: 1m
labels:
severity: warning
annotations:
summary: "Agente {{ $labels.agent_type }} com alta taxa de retry"
# Alerta: budget quase esgotado
- alert: BudgetCritical
expr: orchestrator_budget_remaining_pct < 10
for: 0m
labels:
severity: critical
pager: "true"
annotations:
summary: "Budget crítico: {{ $value }}% restante"
# Alerta: latência P99 alta
- alert: HighLatencyP99
expr: |
histogram_quantile(0.99,
rate(orchestrator_task_duration_seconds_bucket[10m])
) > 120
for: 5m
labels:
severity: warning
annotations:
summary: "Latência P99 acima de 120s: {{ $value }}s"
# Alerta: burn rate acelerado
- alert: ErrorBudgetBurnRate
expr: |
(
1 - sum(rate(orchestrator_tasks_total{status="success"}[1h]))
/ sum(rate(orchestrator_tasks_total[1h]))
) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "Error budget consumindo 10x mais rápido que o sustentável"
Dashboard Executivo — Métricas de Negócio
CEOs e CFOs não leem gráficos de P99. Eles querem saber: quantas tarefas completamos, qual o custo, qual o ROI. O dashboard executivo traduz métricas técnicas em linguagem de negócio.
from dataclasses import dataclass
from typing import Dict, List
import httpx
import asyncio
from datetime import datetime, timedelta
@dataclass
class BusinessMetricSnapshot:
timestamp: datetime
tasks_completed_today: int
tasks_failed_today: int
success_rate_pct: float
cost_today_usd: float
avg_cost_per_task: float
human_hours_saved: float # horas equivalentes de trabalho humano
roi_multiplier: float # quantas vezes o custo foi economizado
top_failure_reason: str
budget_remaining_pct: float
class ExecutiveDashboard:
"""Coleta métricas de negócio para dashboard executivo."""
# Custo médio de um analista humano por hora
HUMAN_HOURLY_RATE = 45.0 # USD
# Tempo médio de uma tarefa feita por humano
HUMAN_TASK_MINUTES = {
"research": 30,
"analysis": 45,
"report": 60,
"data_extraction": 20,
"competitor_analysis": 90,
}
def __init__(self, prometheus_url: str):
self.prom = prometheus_url
async def query(self, promql: str) -> float:
"""Executa query PromQL e retorna valor escalar."""
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.prom}/api/v1/query",
params={"query": promql}
)
data = resp.json()
results = data.get("data", {}).get("result", [])
if results:
return float(results[0]["value"][1])
return 0.0
async def get_snapshot(self) -> BusinessMetricSnapshot:
"""Coleta snapshot atual de todas as métricas de negócio."""
# Queries paralelas para eficiência
(
tasks_success,
tasks_failed,
cost_today,
budget_pct
) = await asyncio.gather(
self.query('sum(increase(orchestrator_tasks_total{status="success"}[24h]))'),
self.query('sum(increase(orchestrator_tasks_total{status="error"}[24h]))'),
self.query('sum(increase(orchestrator_cost_usd_total[24h]))'),
self.query('min(orchestrator_budget_remaining_pct)')
)
total = tasks_success + tasks_failed
success_rate = (tasks_success / total * 100) if total > 0 else 0
avg_cost = (cost_today / tasks_success) if tasks_success > 0 else 0
# Calcular horas humanas economizadas
# Assume média de 35 minutos por tarefa para humano
human_hours_saved = (tasks_success * 35) / 60
# ROI: quanto economizou vs. custo do sistema
human_cost_equivalent = human_hours_saved * self.HUMAN_HOURLY_RATE
roi = (human_cost_equivalent / cost_today) if cost_today > 0 else 0
return BusinessMetricSnapshot(
timestamp=datetime.now(),
tasks_completed_today=int(tasks_success),
tasks_failed_today=int(tasks_failed),
success_rate_pct=success_rate,
cost_today_usd=cost_today,
avg_cost_per_task=avg_cost,
human_hours_saved=human_hours_saved,
roi_multiplier=roi,
top_failure_reason="context_limit", # do log analysis
budget_remaining_pct=budget_pct
)
def format_executive_report(self, snap: BusinessMetricSnapshot) -> str:
return f"""
RELATÓRIO EXECUTIVO — ORQUESTRADOR IA
Data: {snap.timestamp.strftime('%d/%m/%Y %H:%M')}
{'='*50}
PRODUÇÃO DO DIA
Tarefas concluídas: {snap.tasks_completed_today:,}
Taxa de sucesso: {snap.success_rate_pct:.1f}%
Tarefas falhas: {snap.tasks_failed_today}
FINANCEIRO
Custo hoje: ${snap.cost_today_usd:.2f}
Custo médio/tarefa: ${snap.avg_cost_per_task:.4f}
Budget restante: {snap.budget_remaining_pct:.0f}%
VALOR GERADO
Horas humanas salvas: {snap.human_hours_saved:.1f}h
Equivalente humano: ${snap.human_hours_saved * 45:.2f}
ROI do sistema: {snap.roi_multiplier:.1f}x
PRÓXIMAS AÇÕES
{'⚠️ ALERTA: Budget crítico' if snap.budget_remaining_pct < 20 else '✓ Budget saudável'}
{'⚠️ ALERTA: Taxa de sucesso baixa' if snap.success_rate_pct < 90 else '✓ Qualidade OK'}
"""
AlertManager — Sistema de Alertas com Roteamento Inteligente
AlertManager recebe alertas do Prometheus e roteia para os canais corretos: Slack para on-call, PagerDuty para críticos, e-mail para relatórios. O roteamento evita notification fatigue — nem todo alerta precisa acordar alguém às 3h da manhã.
# alertmanager.yml — Roteamento de alertas
global:
resolve_timeout: 5m
slack_api_url: 'https://hooks.slack.com/services/T.../B.../...'
# Templates de mensagem
templates:
- '/etc/alertmanager/templates/*.tmpl'
route:
# Regra default
receiver: 'slack-warnings'
group_by: ['alertname', 'severity']
group_wait: 30s
group_interval: 5m
repeat_interval: 4h
routes:
# Críticos → PagerDuty + Slack
- match:
severity: critical
receiver: 'pagerduty-critical'
continue: true # continua para próxima rota
- match:
severity: critical
receiver: 'slack-critical'
# Budget crítico → também alerta CFO
- match:
alertname: BudgetCritical
receiver: 'email-finance'
# Warnings → apenas Slack
- match:
severity: warning
receiver: 'slack-warnings'
receivers:
- name: 'slack-critical'
slack_configs:
- channel: '#alerts-critical'
title: '🚨 CRÍTICO: {{ .GroupLabels.alertname }}'
text: |
{{ range .Alerts }}
*{{ .Annotations.summary }}*
{{ .Annotations.description }}
Runbook: {{ .Annotations.runbook }}
{{ end }}
color: 'danger'
send_resolved: true
- name: 'slack-warnings'
slack_configs:
- channel: '#alerts-platform'
title: '⚠️ {{ .GroupLabels.alertname }}'
text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'
color: 'warning'
- name: 'pagerduty-critical'
pagerduty_configs:
- routing_key: 'YOUR_PAGERDUTY_KEY'
severity: 'critical'
description: '{{ .GroupLabels.alertname }}: {{ .CommonAnnotations.summary }}'
- name: 'email-finance'
email_configs:
- to: 'cfo@empresa.com'
subject: '[URGENTE] Budget de IA - Ação Necessária'
body: |
O budget de IA atingiu nível crítico.
Ação necessária: revisar limites ou aprovar aumento.
inhibit_rules:
# Se há alerta crítico, suprime warnings relacionados
- source_match:
severity: 'critical'
target_match:
severity: 'warning'
equal: ['alertname']
Resumo do Módulo 5.7
- • Prometheus com 4 tipos de métricas: Counter, Gauge, Histogram, Summary
- • SLIs medem qualidade real; SLOs definem metas; error budget quantifica quanto posso falhar
- • Burn rate > 10x → alerta imediato, mesmo que SLO ainda não violado
- • Dashboard executivo traduz latência P99 em "horas humanas salvas" e ROI
- • AlertManager roteia: críticos → PagerDuty, warnings → Slack, budget → CFO
- • Inhibit rules evitam notification fatigue quando sistemas estão em cascata