Metrics vs. Traces — When to Use Each One
Metrics (module 5.7) answer "what is happening overall". Traces answer "what happened in that specific task that took 8 minutes". They are complementary, not competing.
Use metrics when:
- P99 latency is high (but you don't know which task)
- The error rate climbed in the last 5 minutes
- The budget is being consumed faster than usual
- You need alerts and overall health dashboards
Use traces when:
- A specific task failed — what happened?
- Which agent caused the slowness in task #12345?
- Which tools were called, and in what order?
- Where is the bottleneck in the multi-agent pipeline?
Anatomy of a Trace
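A trace is a tree of spans that share a single trace_id. Each span records one operation (a planning step, an LLM call, a tool call) with a name, start time, duration, attributes, a status, and the span_id of its parent, which is how the tree is assembled. For the orchestrator instrumented later in this module, one workflow produces a tree roughly like this sketch (span names match the code below):

orchestrator.execute_workflow        ← root span (no parent)
├── orchestrator.plan
├── agent.execute (task_0)
│   ├── llm.call
│   └── tool.web_search
├── agent.execute (task_1)
│   └── ...
└── orchestrator.synthesize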
OpenTelemetry — The Universal Instrumentation Standard
OpenTelemetry (OTel) is the open standard for instrumentation. You write OTel code once and can ship it to Jaeger, Tempo, Zipkin, Datadog, Honeycomb — any backend. Zero vendor lock-in.
# pip install opentelemetry-api opentelemetry-sdk
# pip install opentelemetry-exporter-otlp opentelemetry-instrumentation-httpx
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry import propagate
import asyncio
from typing import Optional, Dict, Any
from contextlib import asynccontextmanager
# ============================================================
# TRACER PROVIDER SETUP
# ============================================================
def setup_tracing(
service_name: str,
otlp_endpoint: str = "http://localhost:4317" # Jaeger/Tempo endpoint
) -> trace.Tracer:
"""Configura OpenTelemetry com exportador OTLP."""
resource = Resource.create({
"service.name": service_name,
"service.version": "1.0.0",
"deployment.environment": "production",
})
provider = TracerProvider(resource=resource)
    # OTLP exporter (gRPC) → Jaeger/Tempo/etc.
exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
processor = BatchSpanProcessor(
exporter,
max_queue_size=2048,
max_export_batch_size=512,
export_timeout_millis=30000,
)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
return trace.get_tracer(service_name)
# Global tracer
tracer = setup_tracing("ai-orchestrator")
# ============================================================
# DECORATORS AND CONTEXT MANAGERS FOR INSTRUMENTATION
# ============================================================
@asynccontextmanager
async def traced_span(
name: str,
attributes: Optional[Dict[str, Any]] = None,
parent_context=None
):
"""Context manager que cria um span OpenTelemetry."""
ctx = parent_context or trace.get_current_span().get_span_context()
with tracer.start_as_current_span(
name,
attributes=attributes or {}
) as span:
try:
yield span
except Exception as e:
span.set_status(trace.StatusCode.ERROR, str(e))
span.record_exception(e)
raise
finally:
span.set_status(trace.StatusCode.OK)
# ============================================================
# FULLY INSTRUMENTED ORCHESTRATOR
# ============================================================
class InstrumentedOrchestrator:
"""Orquestrador com tracing completo via OpenTelemetry."""
def __init__(self):
self.tracer = tracer
async def execute_workflow(
self, objective: str, session_id: str
) -> dict:
"""Ponto de entrada — cria o trace raiz."""
# Span raiz: representa todo o workflow
async with traced_span(
"orchestrator.execute_workflow",
attributes={
"session.id": session_id,
"objective.length": len(objective),
"objective.preview": objective[:100],
}
) as root_span:
try:
                # Phase 1: Planning
plan = await self._plan(objective, root_span)
                # Phase 2: Parallel execution
results = await asyncio.gather(*[
self._execute_task(task, root_span)
for task in plan["tasks"]
])
                # Phase 3: Synthesis
final = await self._synthesize(results, root_span)
root_span.set_attribute("tasks.total", len(plan["tasks"]))
root_span.set_attribute("tasks.succeeded", len(results))
return final
except Exception as e:
root_span.set_attribute("error", True)
root_span.set_attribute("error.message", str(e))
raise
async def _plan(self, objective: str, parent_span) -> dict:
"""Fase de planejamento com span filho."""
async with traced_span(
"orchestrator.plan",
attributes={
"planner.model": "claude-3-haiku-20240307",
"objective": objective[:200],
}
) as span:
            # Simulate the planning step
await asyncio.sleep(0.5)
tasks = [
{"id": f"task_{i}", "type": "research", "target": f"company_{i}"}
for i in range(3)
]
span.set_attribute("plan.tasks_count", len(tasks))
span.set_attribute("plan.strategy", "parallel")
return {"tasks": tasks}
async def _execute_task(self, task: dict, parent_span) -> dict:
"""Execução de uma tarefa individual."""
async with traced_span(
f"agent.execute",
attributes={
"task.id": task["id"],
"task.type": task["type"],
"task.target": task.get("target", ""),
"agent.type": "research_agent",
}
) as span:
            # Child span for the LLM call
async with traced_span(
"llm.call",
attributes={
"llm.model": "claude-3-sonnet-20240229",
"llm.input_tokens": 450,
}
) as llm_span:
                await asyncio.sleep(0.3)  # simulate the LLM call
llm_span.set_attribute("llm.output_tokens", 320)
llm_span.set_attribute("llm.cost_usd", 0.002)
llm_span.set_attribute("llm.finish_reason", "end_turn")
            # Child span for the tool call
async with traced_span(
"tool.web_search",
attributes={"tool.name": "web_search", "tool.query": task["target"]}
):
await asyncio.sleep(0.1)
span.set_attribute("task.result_length", 1500)
return {"task_id": task["id"], "result": "..."}
async def _synthesize(self, results: list, parent_span) -> dict:
async with traced_span(
"orchestrator.synthesize",
attributes={"results.count": len(results)}
) as span:
await asyncio.sleep(0.2)
span.set_attribute("synthesis.model", "claude-3-opus-20240229")
return {"status": "completed", "tasks": len(results)}
Context Propagation Between Agents
In multi-agent systems, each agent may run in a different process or even on a different server. Context propagation ensures the trace_id and span_id travel with every request, keeping the span tree coherent.
from opentelemetry import propagate
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import httpx
import asyncio
import json
# ============================================================
# PROPAGATION VIA HTTP HEADERS
# ============================================================
class AgentHTTPClient:
"""Cliente HTTP que propaga trace context automaticamente."""
def __init__(self, base_url: str):
self.base_url = base_url
async def call_agent(
self, endpoint: str, payload: dict
) -> dict:
"""Chama agente remoto propagando o trace context."""
# Injeta trace context nos headers
headers = {}
propagate.inject(headers)
# Headers agora contém:
# traceparent: 00-{trace_id}-{span_id}-01
# tracestate: (opcional, vendor-specific)
        # start_as_current_span returns a regular (sync) context manager
        with tracer.start_as_current_span(
f"http.post {endpoint}",
attributes={
"http.method": "POST",
"http.url": f"{self.base_url}{endpoint}",
"http.target": endpoint,
}
) as span:
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{self.base_url}{endpoint}",
json=payload,
                    headers=headers  # trace context propagated here
)
span.set_attribute("http.status_code", resp.status_code)
return resp.json()
# ============================================================
# RECEIVING AGENT SIDE (FastAPI)
# ============================================================
# from fastapi import FastAPI, Request
# from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
#
# app = FastAPI()
# FastAPIInstrumentor.instrument_app(app)  # instruments the app automatically!
#
# @app.post("/execute")
# async def execute_task(request: Request, payload: dict):
#     # The context has already been extracted automatically by the middleware
#     # Any span created here will be a child of the orchestrator's span
# with tracer.start_as_current_span("agent.process") as span:
# span.set_attribute("task.id", payload["task_id"])
# result = await process(payload)
# return result
# ============================================================
# PROPAGATION VIA REDIS (for asynchronous agents)
# ============================================================
class TraceContextRedisQueue:
"""Propaga trace context via Redis para workers assíncronos."""
def __init__(self, redis_client):
self.redis = redis_client
async def enqueue_with_context(
self, queue_name: str, task: dict
):
"""Adiciona tarefa à fila com trace context embutido."""
carrier = {}
propagate.inject(carrier) # extrai traceparent e tracestate
        message = {
            **task,
            "_trace_context": carrier  # embedded in the payload
        }
        await self.redis.lpush(queue_name, json.dumps(message))
async def dequeue_and_restore(
self, queue_name: str
) -> tuple[dict, object]:
"""Remove tarefa da fila e restaura trace context."""
raw = await self.redis.brpop(queue_name, timeout=30)
if not raw:
return None, None
        message = json.loads(raw[1])
        carrier = message.pop("_trace_context", {})
        # Restore the context of the original trace
        ctx = propagate.extract(carrier)
return message, ctx
async def process_with_restored_context(self, queue_name: str):
"""Worker que processa tarefas mantendo o trace original."""
task, ctx = await self.dequeue_and_restore(queue_name)
if not task:
return
        # Create a child span of the original span (in another process!)
with tracer.start_as_current_span(
"worker.process_task",
            context=ctx,  # ← restores the parent context
attributes={"task.id": task.get("id", "unknown")}
) as span:
            # This span appears in the same trace tree as the orchestrator
await self._process(task)
async def _process(self, task: dict):
        await asyncio.sleep(0.1)  # simulate processing
W3C Trace Context: the traceparent: 00-{trace_id}-{span_id}-{flags} format is a W3C standard. The trace_id is 128 bits (32 hex chars). It is the same ID that links every span of a workflow, even when it is spread across 10 processes.
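As a minimal illustration of what propagate.inject() writes into a carrier while a span is active (the header value below reuses the IDs from the log example later in this module and is purely illustrative):

carrier = {}
propagate.inject(carrier)
# carrier now holds something like (illustrative values):
# {"traceparent": "00-7f3a9b2c1e4d5f6a8b9c0d1e2f3a4b5c-1a2b3c4d5e6f7890-01"}
version, trace_id, span_id, flags = carrier["traceparent"].split("-")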
Jaeger + Grafana Tempo — Setup and Configuration
Jaeger is the most widely used open-source tracing backend. Grafana Tempo is an efficient alternative that integrates natively with Grafana, Loki, and Prometheus — a good fit when your observability stack already lives in Grafana.
# docker-compose.tracing.yml — Tracing backend
version: '3.8'
services:
  # OPTION A: Jaeger (standalone, easy to set up)
  # NOTE: Jaeger, Tempo, and the collector below all publish ports 4317/4318;
  # run only one of them as-is, or remap the host ports
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "16686:16686" # UI do Jaeger
- "4317:4317" # OTLP gRPC receiver
- "4318:4318" # OTLP HTTP receiver
- "14268:14268" # Jaeger HTTP (legacy)
environment:
- COLLECTOR_OTLP_ENABLED=true
- SPAN_STORAGE_TYPE=memory # prod: use elasticsearch
networks:
- monitoring
  # OPTION B: Grafana Tempo (integrates with Grafana/Loki/Prometheus)
tempo:
image: grafana/tempo:latest
ports:
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
- "3200:3200" # Tempo HTTP API
volumes:
- ./tempo.yml:/etc/tempo.yml
- tempo_data:/tmp/tempo
command: ["-config.file=/etc/tempo.yml"]
networks:
- monitoring
  # OTel Collector (optional, but recommended in prod)
otel-collector:
image: otel/opentelemetry-collector-contrib:latest
ports:
- "4317:4317" # OTLP gRPC input
- "4318:4318" # OTLP HTTP input
- "8888:8888" # Prometheus metrics do collector
volumes:
      - ./otel-collector.yml:/etc/otelcol-contrib/config.yaml  # default config path of the contrib image
networks:
- monitoring
volumes:
  tempo_data:

networks:
  monitoring:   # shared with the monitoring stack from module 5.7 (declare as external if it already exists)
---
# tempo.yml — Grafana Tempo configuration
server:
http_listen_port: 3200
distributor:
receivers:
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
http:
endpoint: "0.0.0.0:4318"
ingester:
max_block_duration: 5m
compactor:
compaction:
block_retention: 1h # prod: 30d+
storage:
trace:
backend: local # prod: s3 / gcs
local:
path: /tmp/tempo/blocks
---
# otel-collector.yml — Telemetry pipeline
receivers:
otlp:
protocols:
grpc:
endpoint: "0.0.0.0:4317"
http:
endpoint: "0.0.0.0:4318"
processors:
batch:
timeout: 1s
send_batch_size: 1024
  # Strip sensitive data before exporting
  attributes:
    actions:
      - key: llm.prompt   # never export full prompts
action: delete
- key: user.email
action: delete
exporters:
otlp/tempo:
endpoint: "tempo:4317"
tls:
insecure: true
prometheus:
endpoint: "0.0.0.0:8889"
service:
pipelines:
traces:
receivers: [otlp]
processors: [batch, attributes]
exporters: [otlp/tempo]
metrics:
receivers: [otlp]
processors: [batch]
exporters: [prometheus]
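With the collector in front, the application points its OTLP exporter at the collector instead of directly at Jaeger/Tempo. A minimal sketch reusing setup_tracing from earlier (the hostname assumes the docker-compose service name above):

# Inside the compose network, export to the collector, which forwards to Tempo
tracer = setup_tracing("ai-orchestrator", otlp_endpoint="http://otel-collector:4317")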
Bottleneck Analysis — TraceAnalyzer
With the traces collected, we can programmatically identify bottlenecks: which agent is the slowest, which tool fails the most, and which tasks have the longest critical path.
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
import httpx
import asyncio
@dataclass
class SpanData:
trace_id: str
span_id: str
parent_span_id: Optional[str]
operation_name: str
start_time: datetime
end_time: datetime
duration_ms: float
tags: Dict[str, str]
status: str # OK / ERROR
@property
def is_error(self) -> bool:
return self.status == "ERROR"
@property
def is_llm_call(self) -> bool:
return self.operation_name.startswith("llm.")
@property
def is_tool_call(self) -> bool:
return self.operation_name.startswith("tool.")
@dataclass
class TraceAnalysis:
trace_id: str
total_duration_ms: float
critical_path: List[SpanData]
bottleneck_span: Optional[SpanData]
    parallel_efficiency: float  # wall-clock time / summed span time; 1.0 = fully serial, lower = more parallelism
error_spans: List[SpanData]
llm_time_ms: float
tool_time_ms: float
orchestration_overhead_ms: float
class TraceAnalyzer:
"""Analisa traces do Jaeger/Tempo para encontrar gargalos."""
def __init__(self, jaeger_url: str = "http://localhost:16686"):
self.jaeger_url = jaeger_url
async def fetch_trace(self, trace_id: str) -> List[SpanData]:
"""Busca todos os spans de um trace no Jaeger."""
async with httpx.AsyncClient() as client:
resp = await client.get(
f"{self.jaeger_url}/api/traces/{trace_id}"
)
data = resp.json()
spans = []
for span in data.get("data", [{}])[0].get("spans", []):
start_us = span["startTime"]
duration_us = span["duration"]
start_dt = datetime.fromtimestamp(start_us / 1_000_000)
end_dt = start_dt + timedelta(microseconds=duration_us)
tags = {t["key"]: str(t["value"]) for t in span.get("tags", [])}
has_error = any(t["key"] == "error" and t["value"] for t in span.get("tags", []))
spans.append(SpanData(
trace_id=trace_id,
span_id=span["spanID"],
parent_span_id=(span.get("references") or [{}])[0].get("spanID"),
operation_name=span["operationName"],
start_time=start_dt,
end_time=end_dt,
duration_ms=duration_us / 1000,
tags=tags,
status="ERROR" if has_error else "OK"
))
return spans
def find_critical_path(self, spans: List[SpanData]) -> List[SpanData]:
"""Encontra o caminho crítico (sequência de spans mais longa)."""
# Constrói árvore de spans
children: Dict[Optional[str], List[SpanData]] = {}
for span in spans:
parent = span.parent_span_id
children.setdefault(parent, []).append(span)
# DFS para encontrar o caminho mais longo
def longest_path(span_id: Optional[str]) -> List[SpanData]:
kids = children.get(span_id, [])
if not kids:
return []
paths = [longest_path(s.span_id) for s in kids]
longest = max(paths, key=lambda p: sum(s.duration_ms for s in p), default=[])
if kids:
representative = max(kids, key=lambda s: s.duration_ms)
return [representative] + longest
return longest
roots = children.get(None, [])
if not roots:
return []
root = max(roots, key=lambda s: s.duration_ms)
return [root] + longest_path(root.span_id)
def analyze(self, spans: List[SpanData]) -> TraceAnalysis:
"""Análise completa de um trace."""
root_spans = [s for s in spans if not s.parent_span_id]
total_duration = max(s.duration_ms for s in root_spans) if root_spans else 0
critical_path = self.find_critical_path(spans)
bottleneck = max(critical_path, key=lambda s: s.duration_ms) if critical_path else None
        # Time spent in each category
llm_time = sum(s.duration_ms for s in spans if s.is_llm_call)
tool_time = sum(s.duration_ms for s in spans if s.is_tool_call)
total_span_time = sum(s.duration_ms for s in spans)
        # Parallel efficiency: if everything ran serially it would take total_span_time,
        # but with parallelism the wall-clock time is total_duration
        # (close to 1.0 = mostly serial; lower = more overlap between spans)
        efficiency = total_duration / total_span_time if total_span_time > 0 else 1.0
orchestration_overhead = total_duration - llm_time - tool_time
return TraceAnalysis(
trace_id=spans[0].trace_id if spans else "",
total_duration_ms=total_duration,
critical_path=critical_path,
bottleneck_span=bottleneck,
parallel_efficiency=efficiency,
error_spans=[s for s in spans if s.is_error],
llm_time_ms=llm_time,
tool_time_ms=tool_time,
orchestration_overhead_ms=max(0, orchestration_overhead)
)
    def format_report(self, analysis: TraceAnalysis) -> str:
        bn = analysis.bottleneck_span
        total = analysis.total_duration_ms or 1  # avoid division by zero on empty traces
        return f"""
TRACE ANALYSIS: {analysis.trace_id[:16]}...
Total duration: {analysis.total_duration_ms:.0f}ms
TIME BREAKDOWN:
  LLM calls:      {analysis.llm_time_ms:.0f}ms ({analysis.llm_time_ms / total * 100:.0f}% of wall clock)
  Tool calls:     {analysis.tool_time_ms:.0f}ms
  Orchestration:  {analysis.orchestration_overhead_ms:.0f}ms
PARALLELISM:
  Efficiency: {analysis.parallel_efficiency:.1%}
BOTTLENECK: {f'{bn.operation_name} ({bn.duration_ms:.0f}ms)' if bn else 'N/A'}
ERRORS: {len(analysis.error_spans)} spans with errors
"""
Correlating Metrics, Logs, and Traces — Unified Observability
The real power of observability shows up when you can correlate the three pillars: you spot a latency spike in Grafana, click the point, Grafana opens the Loki logs for that interval, and you jump straight to the Tempo trace that caused the anomaly.
import logging
import json
from opentelemetry import trace as otel_trace
class OTelAwareLogger:
"""Logger que injeta trace_id e span_id em cada log entry.
Isso permite correlacionar logs no Loki com traces no Tempo.
No Grafana, você pode clicar num log e pular para o trace.
"""
def __init__(self, name: str):
self._logger = logging.getLogger(name)
def _get_trace_context(self) -> dict:
"""Extrai trace context do span atual."""
span = otel_trace.get_current_span()
if not span or not span.is_recording():
return {}
ctx = span.get_span_context()
return {
"trace_id": format(ctx.trace_id, '032x'),
"span_id": format(ctx.span_id, '016x'),
"trace_flags": ctx.trace_flags,
}
def info(self, message: str, **extra):
entry = {
"level": "INFO",
"message": message,
**self._get_trace_context(),
**extra
}
self._logger.info(json.dumps(entry))
def error(self, message: str, exc: Exception = None, **extra):
entry = {
"level": "ERROR",
"message": message,
**self._get_trace_context(),
**extra
}
if exc:
entry["exception"] = str(exc)
entry["exception_type"] = type(exc).__name__
self._logger.error(json.dumps(entry))
def warning(self, message: str, **extra):
entry = {
"level": "WARNING",
"message": message,
**self._get_trace_context(),
**extra
}
self._logger.warning(json.dumps(entry))
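A brief usage sketch inside an instrumented span (the attribute values are illustrative and match the sample entry below); it reuses the global tracer from setup_tracing:

logger = OTelAwareLogger("agent")

with tracer.start_as_current_span("llm.call"):
    # This entry automatically carries the current span's trace_id/span_id
    logger.info(
        "LLM call completed",
        model="claude-3-sonnet-20240229",
        tokens=750,
        cost_usd=0.0023,
        duration_ms=2340,
    )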
# ============================================================
# RESULTING LOG ENTRY (correlated with the trace)
# ============================================================
# {
# "level": "INFO",
# "message": "LLM call completed",
# "trace_id": "7f3a9b2c1e4d5f6a8b9c0d1e2f3a4b5c",
# "span_id": "1a2b3c4d5e6f7890",
# "model": "claude-3-sonnet-20240229",
# "tokens": 750,
# "cost_usd": 0.0023,
# "duration_ms": 2340
# }
#
# With this format in Loki, you can:
# 1. Spot a latency spike in Grafana (Prometheus)
# 2. Filter the Loki logs by trace_id
# 3. Jump to the trace in Tempo with one click
# ============================================================
# GRAFANA DATASOURCE LINKING (grafana provisioning)
# ============================================================
# In grafana/provisioning/datasources/datasources.yml:
#
# datasources:
# - name: Tempo
# type: tempo
# url: http://tempo:3200
# jsonData:
# tracesToLogs:
# datasourceUid: loki
# tags: ['trace_id']
# mappedTags: [{key: 'trace_id', value: 'trace_id'}]
#
# - name: Loki
# type: loki
# url: http://loki:3100
# jsonData:
# derivedFields:
# - name: TraceID
# matcherRegex: '"trace_id":"(\w+)"'
# url: '$${__value.raw}'
# datasourceUid: tempo
Module 5.8 Summary
- Metrics answer "what"; traces answer "why" — use both for complete observability
- OpenTelemetry is the universal standard — write once, export to any backend
- Context propagation via the W3C traceparent header keeps traces coherent in distributed systems
- OTel Collector in production: filters sensitive data (prompts, emails) before exporting
- Programmatic TraceAnalyzer: finds the critical path, bottlenecks, and parallelism efficiency
- Logs with trace_id + Grafana datasource linking = one-click navigation between metrics/logs/traces