MÓDULO 5.7 Camada 6a Observabilidade

📊 Camada 6a — Observabilidade: Métricas

Métricas técnicas e de negócio, SLIs/SLOs, Prometheus + Grafana, dashboards executivos e sistema de alertas para orquestradores de IA em produção.

6
Seções
~50min
Duração
Avançado
Nível
C6a
Camada
1

Por que Observabilidade é Crítica em Orquestradores

Um orquestrador de IA em produção é uma caixa-preta em movimento: dezenas de agentes executando tarefas em paralelo, chamadas LLM caras acontecendo a cada segundo, loops de retry silenciosos consumindo budget. Sem observabilidade adequada, você descobre os problemas depois que o cliente reclamou — ou depois que a fatura chegou.

A diferença entre um sistema monitorado e um não-monitorado é a diferença entre detectar um agente preso em loop em 30 segundos versus descobrir 2 horas depois com $800 gastos.

Incidentes reais por falta de observabilidade

Caso 1: Agente de pesquisa em loop infinito por 4 horas. Sem alertas configurados. Custo: $1.200. Detectado manualmente ao verificar billing.
Caso 2: Taxa de sucesso caiu de 94% para 67% após deploy. Sem dashboard de SLO, ninguém percebeu por 3 dias. 200+ tarefas falharam silenciosamente.
Caso 3: Latência P99 subiu de 8s para 45s após mudança de modelo. Usuários abandonaram o produto. Rollback feito 6 horas depois.

Os 3 pilares da observabilidade para IA

📊 Métricas
Números agregados no tempo. Taxa de sucesso, latência P50/P99, custo/hora, throughput. Respondem "o que está acontecendo agora?"
🔍 Traces
Rastreamento de requests individuais end-to-end. Mostra exatamente onde uma tarefa específica travou. Próximo módulo.
📋 Logs
Eventos estruturados com contexto completo. Auditoria, debug, compliance. Coberto no módulo 5.5.
2

PrometheusMetricsCollector — Instrumentação do Orquestrador

Prometheus é o padrão de facto para métricas em sistemas distribuídos. Para orquestradores de IA, instrumentamos quatro tipos de métricas: Counters (incrementam), Gauges (valor atual), Histograms (distribuições de latência) e Summaries (percentis calculados no cliente).

# pip install prometheus-client asyncio

from prometheus_client import (
    Counter, Histogram, Gauge, Summary,
    start_http_server, CollectorRegistry, push_to_gateway
)
import time
import asyncio
from dataclasses import dataclass
from typing import Optional
from contextlib import asynccontextmanager

# ============================================================
# REGISTRY CENTRALIZADO DE MÉTRICAS
# ============================================================

class OrchestratorMetrics:
    """Todas as métricas do orquestrador em um único lugar."""

    def __init__(self, namespace: str = "orchestrator"):
        self.ns = namespace

        # --- COUNTERS: sempre crescem ---
        self.tasks_total = Counter(
            f'{namespace}_tasks_total',
            'Total de tarefas submetidas',
            ['status', 'agent_type', 'priority']
        )

        self.llm_calls_total = Counter(
            f'{namespace}_llm_calls_total',
            'Total de chamadas LLM',
            ['model', 'agent_id', 'success']
        )

        self.retry_total = Counter(
            f'{namespace}_retry_total',
            'Total de retries executados',
            ['reason', 'agent_type']
        )

        self.tokens_total = Counter(
            f'{namespace}_tokens_total',
            'Total de tokens consumidos',
            ['model', 'token_type']  # token_type: input/output
        )

        self.cost_total = Counter(
            f'{namespace}_cost_usd_total',
            'Custo total em USD',
            ['model', 'department']
        )

        # --- GAUGES: valor atual do sistema ---
        self.active_agents = Gauge(
            f'{namespace}_active_agents',
            'Agentes atualmente ativos',
            ['agent_type', 'status']
        )

        self.queue_depth = Gauge(
            f'{namespace}_queue_depth',
            'Tarefas na fila de execução',
            ['priority']
        )

        self.active_sessions = Gauge(
            f'{namespace}_active_sessions',
            'Sessões de orquestração ativas'
        )

        self.budget_remaining_pct = Gauge(
            f'{namespace}_budget_remaining_pct',
            'Percentual de budget restante',
            ['department']
        )

        # --- HISTOGRAMS: distribuições de tempo ---
        self.task_duration = Histogram(
            f'{namespace}_task_duration_seconds',
            'Duração de execução de tarefas',
            ['agent_type', 'task_type'],
            buckets=[1, 5, 10, 30, 60, 120, 300, 600]
        )

        self.llm_latency = Histogram(
            f'{namespace}_llm_latency_seconds',
            'Latência de chamadas LLM',
            ['model'],
            buckets=[0.5, 1, 2, 5, 10, 30, 60]
        )

        self.planning_duration = Histogram(
            f'{namespace}_planning_duration_seconds',
            'Tempo de planejamento (Camada 1)',
            buckets=[0.1, 0.5, 1, 2, 5, 10]
        )

        self.routing_duration = Histogram(
            f'{namespace}_routing_duration_seconds',
            'Tempo de roteamento (Camada 2)',
            buckets=[0.01, 0.05, 0.1, 0.5, 1]
        )

    def start_server(self, port: int = 8000):
        """Inicia servidor HTTP para Prometheus scraping."""
        start_http_server(port)
        print(f"Prometheus metrics server: http://localhost:{port}/metrics")


# Singleton global
metrics = OrchestratorMetrics()


# ============================================================
# CONTEXT MANAGERS PARA INSTRUMENTAÇÃO
# ============================================================

@asynccontextmanager
async def track_task_execution(agent_type: str, task_type: str, priority: str = "normal"):
    """Context manager para instrumentar execução de tarefa."""
    start = time.time()
    status = "success"
    try:
        yield
    except Exception as e:
        status = "error"
        raise
    finally:
        duration = time.time() - start
        metrics.task_duration.labels(
            agent_type=agent_type,
            task_type=task_type
        ).observe(duration)
        metrics.tasks_total.labels(
            status=status,
            agent_type=agent_type,
            priority=priority
        ).inc()


@asynccontextmanager
async def track_llm_call(model: str, agent_id: str):
    """Context manager para instrumentar chamadas LLM."""
    start = time.time()
    success = "true"
    try:
        yield
    except Exception:
        success = "false"
        raise
    finally:
        latency = time.time() - start
        metrics.llm_latency.labels(model=model).observe(latency)
        metrics.llm_calls_total.labels(
            model=model,
            agent_id=agent_id,
            success=success
        ).inc()


# ============================================================
# EXEMPLO DE USO NO ORQUESTRADOR
# ============================================================

async def execute_agent_task(agent_id: str, task: dict):
    """Exemplo de agente instrumentado."""
    agent_type = task.get("agent_type", "generic")
    task_type = task.get("type", "unknown")
    model = task.get("model", "claude-3-haiku-20240307")

    # Incrementa agentes ativos
    metrics.active_agents.labels(
        agent_type=agent_type,
        status="running"
    ).inc()

    try:
        async with track_task_execution(agent_type, task_type):
            async with track_llm_call(model, agent_id):
                # Simula chamada LLM
                await asyncio.sleep(0.1)  # substituir por chamada real

                # Registra tokens e custo
                input_tokens = 500
                output_tokens = 200
                cost = (input_tokens * 0.25 + output_tokens * 1.25) / 1_000_000

                metrics.tokens_total.labels(
                    model=model, token_type="input"
                ).inc(input_tokens)
                metrics.tokens_total.labels(
                    model=model, token_type="output"
                ).inc(output_tokens)
                metrics.cost_total.labels(
                    model=model,
                    department=task.get("department", "default")
                ).inc(cost)

    finally:
        metrics.active_agents.labels(
            agent_type=agent_type,
            status="running"
        ).dec()

Dica: Use labels com moderação. Cada combinação única de labels cria uma nova série temporal no Prometheus. Com 10 agents × 5 models × 3 statuses = 150 séries. Com 100 agents, já são 1.500 séries — pode explodir a memória do Prometheus.

3

SLIs e SLOs — Definindo Contratos de Qualidade

SLI (Service Level Indicator) é a métrica real que você mede. SLO (Service Level Objective) é o alvo que você se compromete a atingir. SLA (Service Level Agreement) é o contrato comercial com penalidades se o SLO for violado.

Dimensão SLI (O que medir) SLO (Meta)
Disponibilidade % requests com status 2xx ≥ 99.5%
Taxa de Sucesso tarefas_sucesso / tarefas_total ≥ 95%
Latência P50 Mediana do tempo de conclusão ≤ 30s
Latência P99 99° percentil do tempo ≤ 120s
Freshness % dados com idade ≤ threshold ≥ 98%
Custo por Tarefa USD médio por tarefa completada ≤ $0.15
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import time

@dataclass
class SLODefinition:
    name: str
    description: str
    target: float          # 0.0 a 1.0 (ex: 0.995 = 99.5%)
    window_hours: int = 24 # janela de avaliação

@dataclass
class SLOStatus:
    slo: SLODefinition
    current_value: float
    is_compliant: bool
    error_budget_remaining: float  # quanto posso "gastar" antes de violar
    burn_rate: float               # velocidade de consumo do budget

class SLOManager:
    """Gerencia definições e avaliação de SLOs."""

    SLOS: List[SLODefinition] = [
        SLODefinition("task_success_rate", "Taxa de sucesso de tarefas", 0.95),
        SLODefinition("latency_p99",       "Latência P99 ≤ 120s",       0.99),
        SLODefinition("availability",      "Disponibilidade do sistema", 0.995),
        SLODefinition("cost_per_task",     "Custo médio ≤ $0.15",       0.90),
    ]

    def __init__(self, prometheus_url: str):
        self.prometheus_url = prometheus_url

    def evaluate_task_success_rate(self, window_hours: int = 24) -> float:
        """Calcula taxa de sucesso via PromQL."""
        # PromQL equivalente:
        # sum(rate(orchestrator_tasks_total{status="success"}[24h]))
        # / sum(rate(orchestrator_tasks_total[24h]))
        # Retorna valor entre 0 e 1
        query = f"""
            sum(rate(orchestrator_tasks_total{{status="success"}}[{window_hours}h]))
            /
            sum(rate(orchestrator_tasks_total[{window_hours}h]))
        """
        # Executar via requests.get(f"{self.prometheus_url}/query?query={query}")
        return 0.962  # simulado

    def calculate_error_budget(
        self, slo: SLODefinition, current_value: float
    ) -> float:
        """
        Error budget = quanto do SLO posso violar.
        Ex: SLO 99.5% → error budget = 0.5% de falhas permitidas.
        Se current é 99.2%, consumi 60% do budget.
        """
        total_budget = 1.0 - slo.target
        consumed = slo.target - current_value
        if consumed <= 0:
            return 1.0  # 100% do budget restante
        remaining = (total_budget - consumed) / total_budget
        return max(0.0, remaining)

    def calculate_burn_rate(
        self, current_1h: float, current_24h: float, slo_target: float
    ) -> float:
        """
        Burn rate = velocidade de consumo do error budget.
        Burn rate 1 = consumindo exatamente na taxa sustentável.
        Burn rate 10 = consumindo 10x mais rápido → alerta crítico.
        """
        hourly_budget = (1.0 - slo_target) / (30 * 24)  # budget por hora (30 dias)
        current_error_rate = 1.0 - current_1h
        if hourly_budget == 0:
            return 0
        return current_error_rate / hourly_budget

    def evaluate_all(self) -> List[SLOStatus]:
        results = []
        for slo in self.SLOS:
            current = self.evaluate_task_success_rate(slo.window_hours)
            budget_remaining = self.calculate_error_budget(slo, current)
            burn_rate = self.calculate_burn_rate(current, current, slo.target)

            results.append(SLOStatus(
                slo=slo,
                current_value=current,
                is_compliant=current >= slo.target,
                error_budget_remaining=budget_remaining,
                burn_rate=burn_rate
            ))
        return results
4

Prometheus + Grafana — Setup Completo

O stack Prometheus + Grafana é o padrão para visualização de métricas. Prometheus coleta e armazena; Grafana visualiza com dashboards interativos.

# docker-compose.yml — Stack de observabilidade completo

version: '3.8'
services:

  # Seu orquestrador expõe métricas na porta 8000
  orchestrator:
    build: .
    ports:
      - "8000:8000"   # app
      - "8001:8001"   # /metrics
    environment:
      - PROMETHEUS_PORT=8001
    networks:
      - monitoring

  # Prometheus: coleta e armazena métricas
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - ./alerts.yml:/etc/prometheus/alerts.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.retention.time=30d'
      - '--web.enable-lifecycle'
    networks:
      - monitoring

  # Grafana: dashboards e visualizações
  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
      - GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH=/var/lib/grafana/dashboards/orchestrator.json
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/var/lib/grafana/dashboards
      - ./grafana/provisioning:/etc/grafana/provisioning
    networks:
      - monitoring

  # AlertManager: roteamento de alertas
  alertmanager:
    image: prom/alertmanager:latest
    ports:
      - "9093:9093"
    volumes:
      - ./alertmanager.yml:/etc/alertmanager/alertmanager.yml
    networks:
      - monitoring

volumes:
  prometheus_data:
  grafana_data:

networks:
  monitoring:
    driver: bridge

---
# prometheus.yml — Configuração de scraping

global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alerts.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets: ['alertmanager:9093']

scrape_configs:
  - job_name: 'orchestrator'
    static_configs:
      - targets: ['orchestrator:8001']
    scrape_interval: 10s
    metrics_path: /metrics

  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']
# alerts.yml — Regras de alertas Prometheus

groups:
  - name: orchestrator_alerts
    interval: 30s
    rules:

      # Alerta: taxa de sucesso caindo
      - alert: TaskSuccessRateLow
        expr: |
          (
            sum(rate(orchestrator_tasks_total{status="success"}[5m]))
            / sum(rate(orchestrator_tasks_total[5m]))
          ) < 0.90
        for: 2m
        labels:
          severity: critical
          team: platform
        annotations:
          summary: "Taxa de sucesso abaixo de 90%"
          description: "Taxa atual: {{ $value | humanizePercentage }}"
          runbook: "https://wiki/runbooks/task-success-rate"

      # Alerta: agente em loop (muito retry)
      - alert: AgentRetryLoop
        expr: |
          rate(orchestrator_retry_total[5m]) > 5
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "Agente {{ $labels.agent_type }} com alta taxa de retry"

      # Alerta: budget quase esgotado
      - alert: BudgetCritical
        expr: orchestrator_budget_remaining_pct < 10
        for: 0m
        labels:
          severity: critical
          pager: "true"
        annotations:
          summary: "Budget crítico: {{ $value }}% restante"

      # Alerta: latência P99 alta
      - alert: HighLatencyP99
        expr: |
          histogram_quantile(0.99,
            rate(orchestrator_task_duration_seconds_bucket[10m])
          ) > 120
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Latência P99 acima de 120s: {{ $value }}s"

      # Alerta: burn rate acelerado
      - alert: ErrorBudgetBurnRate
        expr: |
          (
            1 - sum(rate(orchestrator_tasks_total{status="success"}[1h]))
            / sum(rate(orchestrator_tasks_total[1h]))
          ) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Error budget consumindo 10x mais rápido que o sustentável"
5

Dashboard Executivo — Métricas de Negócio

CEOs e CFOs não leem gráficos de P99. Eles querem saber: quantas tarefas completamos, qual o custo, qual o ROI. O dashboard executivo traduz métricas técnicas em linguagem de negócio.

from dataclasses import dataclass
from typing import Dict, List
import httpx
import asyncio
from datetime import datetime, timedelta

@dataclass
class BusinessMetricSnapshot:
    timestamp: datetime
    tasks_completed_today: int
    tasks_failed_today: int
    success_rate_pct: float
    cost_today_usd: float
    avg_cost_per_task: float
    human_hours_saved: float    # horas equivalentes de trabalho humano
    roi_multiplier: float       # quantas vezes o custo foi economizado
    top_failure_reason: str
    budget_remaining_pct: float

class ExecutiveDashboard:
    """Coleta métricas de negócio para dashboard executivo."""

    # Custo médio de um analista humano por hora
    HUMAN_HOURLY_RATE = 45.0  # USD

    # Tempo médio de uma tarefa feita por humano
    HUMAN_TASK_MINUTES = {
        "research": 30,
        "analysis": 45,
        "report": 60,
        "data_extraction": 20,
        "competitor_analysis": 90,
    }

    def __init__(self, prometheus_url: str):
        self.prom = prometheus_url

    async def query(self, promql: str) -> float:
        """Executa query PromQL e retorna valor escalar."""
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"{self.prom}/api/v1/query",
                params={"query": promql}
            )
            data = resp.json()
            results = data.get("data", {}).get("result", [])
            if results:
                return float(results[0]["value"][1])
        return 0.0

    async def get_snapshot(self) -> BusinessMetricSnapshot:
        """Coleta snapshot atual de todas as métricas de negócio."""

        # Queries paralelas para eficiência
        (
            tasks_success,
            tasks_failed,
            cost_today,
            budget_pct
        ) = await asyncio.gather(
            self.query('sum(increase(orchestrator_tasks_total{status="success"}[24h]))'),
            self.query('sum(increase(orchestrator_tasks_total{status="error"}[24h]))'),
            self.query('sum(increase(orchestrator_cost_usd_total[24h]))'),
            self.query('min(orchestrator_budget_remaining_pct)')
        )

        total = tasks_success + tasks_failed
        success_rate = (tasks_success / total * 100) if total > 0 else 0
        avg_cost = (cost_today / tasks_success) if tasks_success > 0 else 0

        # Calcular horas humanas economizadas
        # Assume média de 35 minutos por tarefa para humano
        human_hours_saved = (tasks_success * 35) / 60

        # ROI: quanto economizou vs. custo do sistema
        human_cost_equivalent = human_hours_saved * self.HUMAN_HOURLY_RATE
        roi = (human_cost_equivalent / cost_today) if cost_today > 0 else 0

        return BusinessMetricSnapshot(
            timestamp=datetime.now(),
            tasks_completed_today=int(tasks_success),
            tasks_failed_today=int(tasks_failed),
            success_rate_pct=success_rate,
            cost_today_usd=cost_today,
            avg_cost_per_task=avg_cost,
            human_hours_saved=human_hours_saved,
            roi_multiplier=roi,
            top_failure_reason="context_limit",  # do log analysis
            budget_remaining_pct=budget_pct
        )

    def format_executive_report(self, snap: BusinessMetricSnapshot) -> str:
        return f"""
RELATÓRIO EXECUTIVO — ORQUESTRADOR IA
Data: {snap.timestamp.strftime('%d/%m/%Y %H:%M')}
{'='*50}

PRODUÇÃO DO DIA
  Tarefas concluídas:  {snap.tasks_completed_today:,}
  Taxa de sucesso:     {snap.success_rate_pct:.1f}%
  Tarefas falhas:      {snap.tasks_failed_today}

FINANCEIRO
  Custo hoje:          ${snap.cost_today_usd:.2f}
  Custo médio/tarefa:  ${snap.avg_cost_per_task:.4f}
  Budget restante:     {snap.budget_remaining_pct:.0f}%

VALOR GERADO
  Horas humanas salvas: {snap.human_hours_saved:.1f}h
  Equivalente humano:   ${snap.human_hours_saved * 45:.2f}
  ROI do sistema:       {snap.roi_multiplier:.1f}x

PRÓXIMAS AÇÕES
  {'⚠️  ALERTA: Budget crítico' if snap.budget_remaining_pct < 20 else '✓  Budget saudável'}
  {'⚠️  ALERTA: Taxa de sucesso baixa' if snap.success_rate_pct < 90 else '✓  Qualidade OK'}
"""
6

AlertManager — Sistema de Alertas com Roteamento Inteligente

AlertManager recebe alertas do Prometheus e roteia para os canais corretos: Slack para on-call, PagerDuty para críticos, e-mail para relatórios. O roteamento evita notification fatigue — nem todo alerta precisa acordar alguém às 3h da manhã.

# alertmanager.yml — Roteamento de alertas

global:
  resolve_timeout: 5m
  slack_api_url: 'https://hooks.slack.com/services/T.../B.../...'

# Templates de mensagem
templates:
  - '/etc/alertmanager/templates/*.tmpl'

route:
  # Regra default
  receiver: 'slack-warnings'
  group_by: ['alertname', 'severity']
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 4h

  routes:
    # Críticos → PagerDuty + Slack
    - match:
        severity: critical
      receiver: 'pagerduty-critical'
      continue: true  # continua para próxima rota

    - match:
        severity: critical
      receiver: 'slack-critical'

    # Budget crítico → também alerta CFO
    - match:
        alertname: BudgetCritical
      receiver: 'email-finance'

    # Warnings → apenas Slack
    - match:
        severity: warning
      receiver: 'slack-warnings'

receivers:
  - name: 'slack-critical'
    slack_configs:
      - channel: '#alerts-critical'
        title: '🚨 CRÍTICO: {{ .GroupLabels.alertname }}'
        text: |
          {{ range .Alerts }}
          *{{ .Annotations.summary }}*
          {{ .Annotations.description }}
          Runbook: {{ .Annotations.runbook }}
          {{ end }}
        color: 'danger'
        send_resolved: true

  - name: 'slack-warnings'
    slack_configs:
      - channel: '#alerts-platform'
        title: '⚠️ {{ .GroupLabels.alertname }}'
        text: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'
        color: 'warning'

  - name: 'pagerduty-critical'
    pagerduty_configs:
      - routing_key: 'YOUR_PAGERDUTY_KEY'
        severity: 'critical'
        description: '{{ .GroupLabels.alertname }}: {{ .CommonAnnotations.summary }}'

  - name: 'email-finance'
    email_configs:
      - to: 'cfo@empresa.com'
        subject: '[URGENTE] Budget de IA - Ação Necessária'
        body: |
          O budget de IA atingiu nível crítico.
          Ação necessária: revisar limites ou aprovar aumento.

inhibit_rules:
  # Se há alerta crítico, suprime warnings relacionados
  - source_match:
      severity: 'critical'
    target_match:
      severity: 'warning'
    equal: ['alertname']
PromQL Essenciais
rate(metric[5m])
histogram_quantile(0.99, ...)
increase(counter[24h])
avg_over_time(gauge[1h])
Grafana Panels
Time Series → latências
Stat → valor atual
Gauge → budget %
Table → top failures
Retenção
Prometheus: 30 dias
Thanos: 1 ano (S3)
Resolution: 15s raw
Downsample: 5m / 1h

Resumo do Módulo 5.7

  • Prometheus com 4 tipos de métricas: Counter, Gauge, Histogram, Summary
  • SLIs medem qualidade real; SLOs definem metas; error budget quantifica quanto posso falhar
  • Burn rate > 10x → alerta imediato, mesmo que SLO ainda não violado
  • Dashboard executivo traduz latência P99 em "horas humanas salvas" e ROI
  • AlertManager roteia: críticos → PagerDuty, warnings → Slack, budget → CFO
  • Inhibit rules evitam notification fatigue quando sistemas estão em cascata
5.6 — Custo e Audit 5.8 — Rastreamento