MODULE 5.8 Layer 6b Distributed Tracing

🔍 Layer 6b — Observability: Tracing

OpenTelemetry, distributed tracing with Jaeger/Tempo, context propagation between agents, bottleneck analysis, and debugging of complex workflows in production.

6 sections · ~55min · Advanced · Layer C6b

1. Metrics vs. Traces — When to Use Each

Metrics (module 5.7) answer "what is happening in general". Traces answer "what happened in this specific task that took 8 minutes". They are complementary, not competing.

📊 Metrics — use when:
  • P99 latency is high (but you don't know which task)
  • The error rate rose in the last 5 minutes
  • Budget is being consumed faster than normal
  • Alerts and overall-health dashboards
🔍 Traces — use when:
  • A specific task failed — what happened?
  • Which agent caused the slowness in task #12345?
  • Which tools were called, and in what order?
  • Where is the bottleneck in the multi-agent pipeline?
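To make the complementarity concrete, here is a toy sketch — plain Python, no OTel dependency, with made-up step names and task IDs — contrasting what an aggregate metric can answer with what a per-task trace record can:

```python
from collections import Counter

metrics = Counter()   # aggregate view: "what is happening overall?"
trace_events = []     # per-task view: "what happened in task X?"

def run_task(task_id, steps):
    """steps: list of (name, duration_s, ok) tuples."""
    for name, duration_s, ok in steps:
        metrics[f"step.{name}.count"] += 1   # metric: only counts survive
        if not ok:
            metrics["errors.total"] += 1
        trace_events.append(                 # trace: full per-task detail
            {"task": task_id, "span": name, "duration_s": duration_s, "ok": ok}
        )

run_task("task_42", [("plan", 0.2, True), ("llm.call", 8.1, False)])

# Metrics say the error rate went up; the trace says *which* span of
# *which* task failed and where the time went.
slow = max(
    (e for e in trace_events if e["task"] == "task_42"),
    key=lambda e: e["duration_s"],
)
```

The rest of this module builds the trace side of this picture with real OpenTelemetry spans.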

Anatomy of a Trace

TRACE: competitive_analysis [trace_id: abc123] — 142.3s total
├── orchestrator.plan [span: def456] — 2.1s
├── orchestrator.route [span: ghi789] — 0.3s
├── agent.research_amazon [span: jkl012] — 45.2s
│   ├── tool.web_search [span: mno345] — 12.1s
│   ├── llm.claude-3-sonnet [span: pqr678] — 28.4s ⚠️ SLOW
│   └── tool.write_result [span: stu901] — 4.7s
├── agent.research_google [span: vwx234] — 38.8s
└── agent.synthesizer [span: yza567] — 55.9s
    └── llm.claude-3-opus [span: bcd890] — 54.1s ⚠️ BOTTLENECK
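The tree above is reconstructed purely from parent/child span IDs. As a minimal sketch (using a subset of the illustrative IDs from the diagram), here is how a flat span list turns into that indented tree:

```python
from typing import Optional

# Flat span records: (span_id, parent_span_id, operation).
# IDs are the illustrative ones from the diagram above.
spans = [
    ("abc123", None, "competitive_analysis"),
    ("def456", "abc123", "orchestrator.plan"),
    ("jkl012", "abc123", "agent.research_amazon"),
    ("mno345", "jkl012", "tool.web_search"),
]

def render(parent: Optional[str], depth: int = 0) -> list:
    """Walk parent pointers depth-first and indent by tree depth."""
    lines = []
    for span_id, parent_id, op in spans:
        if parent_id == parent:
            lines.append("  " * depth + op)
            lines.extend(render(span_id, depth + 1))
    return lines

tree = render(None)
```

Tracing backends like Jaeger do exactly this reconstruction, just with timing and tag data attached to every node.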

2. OpenTelemetry — The Universal Instrumentation Standard

OpenTelemetry (OTel) is the open standard for instrumentation. You write OTel code once and can export to Jaeger, Tempo, Zipkin, Datadog, Honeycomb — any backend. Zero vendor lock-in.

# pip install opentelemetry-api opentelemetry-sdk
# pip install opentelemetry-exporter-otlp opentelemetry-instrumentation-httpx

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
from opentelemetry import propagate
import asyncio
from typing import Optional, Dict, Any
from contextlib import asynccontextmanager

# ============================================================
# TRACER PROVIDER SETUP
# ============================================================

def setup_tracing(
    service_name: str,
    otlp_endpoint: str = "http://localhost:4317"  # Jaeger/Tempo endpoint
) -> trace.Tracer:
    """Configure OpenTelemetry with an OTLP exporter."""

    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
        "deployment.environment": "production",
    })

    provider = TracerProvider(resource=resource)

    # OTLP exporter (gRPC) → Jaeger/Tempo/etc.
    exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
    processor = BatchSpanProcessor(
        exporter,
        max_queue_size=2048,
        max_export_batch_size=512,
        export_timeout_millis=30000,
    )
    provider.add_span_processor(processor)

    trace.set_tracer_provider(provider)
    return trace.get_tracer(service_name)


# Global tracer
tracer = setup_tracing("ai-orchestrator")


# ============================================================
# DECORATORS AND CONTEXT MANAGERS FOR INSTRUMENTATION
# ============================================================

@asynccontextmanager
async def traced_span(
    name: str,
    attributes: Optional[Dict[str, Any]] = None,
    parent_context=None
):
    """Context manager that creates an OpenTelemetry span."""
    with tracer.start_as_current_span(
        name,
        context=parent_context,  # None → child of the current span
        attributes=attributes or {}
    ) as span:
        try:
            yield span
        except Exception as e:
            span.set_status(trace.StatusCode.ERROR, str(e))
            span.record_exception(e)
            raise
        else:
            # Only mark OK on success; a finally block would overwrite ERROR
            span.set_status(trace.StatusCode.OK)


# ============================================================
# FULLY INSTRUMENTED ORCHESTRATOR
# ============================================================

class InstrumentedOrchestrator:
    """Orchestrator with full tracing via OpenTelemetry."""

    def __init__(self):
        self.tracer = tracer

    async def execute_workflow(
        self, objective: str, session_id: str
    ) -> dict:
        """Entry point — creates the root trace."""

        # Root span: represents the entire workflow
        async with traced_span(
            "orchestrator.execute_workflow",
            attributes={
                "session.id": session_id,
                "objective.length": len(objective),
                "objective.preview": objective[:100],
            }
        ) as root_span:

            try:
                # Phase 1: Planning
                plan = await self._plan(objective, root_span)

                # Phase 2: Parallel execution
                results = await asyncio.gather(*[
                    self._execute_task(task, root_span)
                    for task in plan["tasks"]
                ])

                # Phase 3: Synthesis
                final = await self._synthesize(results, root_span)

                root_span.set_attribute("tasks.total", len(plan["tasks"]))
                root_span.set_attribute("tasks.succeeded", len(results))
                return final

            except Exception as e:
                root_span.set_attribute("error", True)
                root_span.set_attribute("error.message", str(e))
                raise

    async def _plan(self, objective: str, parent_span) -> dict:
        """Planning phase with a child span."""
        async with traced_span(
            "orchestrator.plan",
            attributes={
                "planner.model": "claude-3-haiku-20240307",
                "objective": objective[:200],
            }
        ) as span:
            # Simulate planning
            await asyncio.sleep(0.5)
            tasks = [
                {"id": f"task_{i}", "type": "research", "target": f"company_{i}"}
                for i in range(3)
            ]
            span.set_attribute("plan.tasks_count", len(tasks))
            span.set_attribute("plan.strategy", "parallel")
            return {"tasks": tasks}

    async def _execute_task(self, task: dict, parent_span) -> dict:
        """Execute a single task."""
        async with traced_span(
            "agent.execute",
            attributes={
                "task.id": task["id"],
                "task.type": task["type"],
                "task.target": task.get("target", ""),
                "agent.type": "research_agent",
            }
        ) as span:
            # Sub-span for the LLM call
            async with traced_span(
                "llm.call",
                attributes={
                    "llm.model": "claude-3-sonnet-20240229",
                    "llm.input_tokens": 450,
                }
            ) as llm_span:
                await asyncio.sleep(0.3)  # simulated LLM call
                llm_span.set_attribute("llm.output_tokens", 320)
                llm_span.set_attribute("llm.cost_usd", 0.002)
                llm_span.set_attribute("llm.finish_reason", "end_turn")

            # Sub-span for the tool call
            async with traced_span(
                "tool.web_search",
                attributes={"tool.name": "web_search", "tool.query": task["target"]}
            ):
                await asyncio.sleep(0.1)

            span.set_attribute("task.result_length", 1500)
            return {"task_id": task["id"], "result": "..."}

    async def _synthesize(self, results: list, parent_span) -> dict:
        async with traced_span(
            "orchestrator.synthesize",
            attributes={"results.count": len(results)}
        ) as span:
            await asyncio.sleep(0.2)
            span.set_attribute("synthesis.model", "claude-3-opus-20240229")
            return {"status": "completed", "tasks": len(results)}

3. Context Propagation Between Agents

In multi-agent systems, each agent may run in a different process or even on a different server. Context propagation ensures the trace_id and span_id travel with every request, keeping the span tree coherent.

from opentelemetry import propagate
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
import httpx
import asyncio

# ============================================================
# PROPAGAÇÃO VIA HTTP HEADERS
# ============================================================

class AgentHTTPClient:
    """HTTP client that automatically propagates trace context."""

    def __init__(self, base_url: str):
        self.base_url = base_url

    async def call_agent(
        self, endpoint: str, payload: dict
    ) -> dict:
        """Call a remote agent, propagating the trace context."""

        # Inject trace context into the request headers
        headers = {}
        propagate.inject(headers)
        # headers now contains:
        # traceparent: 00-{trace_id}-{span_id}-01
        # tracestate: (optional, vendor-specific)

        async with tracer.start_as_current_span(
            f"http.post {endpoint}",
            attributes={
                "http.method": "POST",
                "http.url": f"{self.base_url}{endpoint}",
                "http.target": endpoint,
            }
        ) as span:
            async with httpx.AsyncClient() as client:
                resp = await client.post(
                    f"{self.base_url}{endpoint}",
                    json=payload,
                    headers=headers  # trace context propagated here
                )
                span.set_attribute("http.status_code", resp.status_code)
                return resp.json()


# ============================================================
# RECEIVING AGENT SIDE (FastAPI)
# ============================================================

# from fastapi import FastAPI, Request
# from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
#
# app = FastAPI()
# FastAPIInstrumentor.instrument_app(app)  # instruments automatically!
#
# @app.post("/execute")
# async def execute_task(request: Request, payload: dict):
#     # The context was already extracted automatically by the middleware
#     # Any span created here will be a child of the orchestrator's span
#     with tracer.start_as_current_span("agent.process") as span:
#         span.set_attribute("task.id", payload["task_id"])
#         result = await process(payload)
#         return result


# ============================================================
# PROPAGATION VIA REDIS (for asynchronous agents)
# ============================================================

class TraceContextRedisQueue:
    """Propagates trace context via Redis to asynchronous workers."""

    def __init__(self, redis_client):
        self.redis = redis_client

    async def enqueue_with_context(
        self, queue_name: str, task: dict
    ):
        """Enqueue a task with the trace context embedded."""
        carrier = {}
        propagate.inject(carrier)  # writes traceparent/tracestate into carrier

        message = {
            **task,
            "_trace_context": carrier  # embedded in the payload
        }
        await self.redis.lpush(queue_name, str(message))

    async def dequeue_and_restore(
        self, queue_name: str
    ) -> tuple[dict, object]:
        """Dequeue a task and restore its trace context."""
        raw = await self.redis.brpop(queue_name, timeout=30)
        if not raw:
            return None, None

        import ast
        message = ast.literal_eval(raw[1].decode())  # prod: prefer JSON
        carrier = message.pop("_trace_context", {})

        # Restore the original trace context
        ctx = propagate.extract(carrier)
        return message, ctx

    async def process_with_restored_context(self, queue_name: str):
        """Worker that processes tasks while keeping the original trace."""
        task, ctx = await self.dequeue_and_restore(queue_name)
        if not task:
            return

        # Create a child of the original span (in another process!)
        with tracer.start_as_current_span(
            "worker.process_task",
            context=ctx,  # ← restores the parent context
            attributes={"task.id": task.get("id", "unknown")}
        ) as span:
            # This span appears in the same trace tree as the orchestrator
            await self._process(task)

    async def _process(self, task: dict):
        await asyncio.sleep(0.1)  # simulated processing

W3C Trace Context: the format traceparent: 00-{trace_id}-{span_id}-{flags} is a W3C standard. The trace_id is 128 bits (32 hex chars). It is the same ID that links every span of a workflow, even one distributed across 10 processes.
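As a quick sanity check, the traceparent layout described above can be parsed with a few lines of plain Python (the header value here is illustrative):

```python
def parse_traceparent(header: str) -> dict:
    """Split a W3C traceparent header into its four fixed-width hex fields."""
    version, trace_id, span_id, flags = header.split("-")
    # 128-bit trace_id → 32 hex chars; 64-bit span_id → 16 hex chars
    assert len(trace_id) == 32 and len(span_id) == 16
    return {
        "version": version,
        "trace_id": trace_id,
        "span_id": span_id,
        # Bit 0 of the flags byte is the "sampled" flag
        "sampled": bool(int(flags, 16) & 0x01),
    }

# Illustrative header in the 00-{trace_id}-{span_id}-{flags} layout
hdr = "00-7f3a9b2c1e4d5f6a8b9c0d1e2f3a4b5c-1a2b3c4d5e6f7890-01"
info = parse_traceparent(hdr)
```

In practice you never parse this by hand — `propagate.extract()` does it — but seeing the fields makes it obvious why every span in a distributed workflow carries the same 32-char trace_id.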

4. Jaeger + Grafana Tempo — Setup and Configuration

Jaeger is the most widely used open-source tracing backend. Grafana Tempo is an efficient alternative that integrates natively with Grafana, Loki, and Prometheus — ideal when your observability stack already lives in Grafana.

# docker-compose.tracing.yml — Tracing backend

version: '3.8'
services:

  # OPTION A: Jaeger (standalone, easy to configure)
  # Note: options A/B and the collector all bind 4317/4318 — enable only one
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"  # Jaeger UI
      - "4317:4317"    # OTLP gRPC receiver
      - "4318:4318"    # OTLP HTTP receiver
      - "14268:14268"  # Jaeger HTTP (legacy)
    environment:
      - COLLECTOR_OTLP_ENABLED=true
      - SPAN_STORAGE_TYPE=memory  # prod: use elasticsearch
    networks:
      - monitoring

  # OPTION B: Grafana Tempo (integrates with Grafana/Loki/Prometheus)
  tempo:
    image: grafana/tempo:latest
    ports:
      - "4317:4317"    # OTLP gRPC
      - "4318:4318"    # OTLP HTTP
      - "3200:3200"    # Tempo HTTP API
    volumes:
      - ./tempo.yml:/etc/tempo.yml
      - tempo_data:/tmp/tempo
    command: ["-config.file=/etc/tempo.yml"]
    networks:
      - monitoring

  # OTel Collector (optional but recommended in prod)
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    ports:
      - "4317:4317"    # OTLP gRPC input
      - "4318:4318"    # OTLP HTTP input
      - "8888:8888"    # the collector's own Prometheus metrics
    volumes:
      - ./otel-collector.yml:/etc/otelcol/config.yaml
    networks:
      - monitoring

volumes:
  tempo_data:

networks:
  monitoring:

---
# tempo.yml — Grafana Tempo configuration

server:
  http_listen_port: 3200

distributor:
  receivers:
    otlp:
      protocols:
        grpc:
          endpoint: "0.0.0.0:4317"
        http:
          endpoint: "0.0.0.0:4318"

ingester:
  max_block_duration: 5m

compactor:
  compaction:
    block_retention: 1h  # prod: 30d+

storage:
  trace:
    backend: local  # prod: s3 / gcs
    local:
      path: /tmp/tempo/blocks

---
# otel-collector.yml — Telemetry pipeline

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"
      http:
        endpoint: "0.0.0.0:4318"

processors:
  batch:
    timeout: 1s
    send_batch_size: 1024

  # Strip sensitive data
  attributes:
    actions:
      - key: llm.prompt  # don't log full prompts
        action: delete
      - key: user.email
        action: delete

exporters:
  otlp/tempo:
    endpoint: "tempo:4317"
    tls:
      insecure: true

  prometheus:
    endpoint: "0.0.0.0:8889"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch, attributes]
      exporters: [otlp/tempo]
    metrics:
      receivers: [otlp]
      processors: [batch]
      exporters: [prometheus]

5. Bottleneck Analysis — TraceAnalyzer

With traces collected, we can programmatically identify bottlenecks: which agent is slowest, which tool fails most often, which tasks have the longest critical path.

from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
import httpx
import asyncio

@dataclass
class SpanData:
    trace_id: str
    span_id: str
    parent_span_id: Optional[str]
    operation_name: str
    start_time: datetime
    end_time: datetime
    duration_ms: float
    tags: Dict[str, str]
    status: str  # OK / ERROR

    @property
    def is_error(self) -> bool:
        return self.status == "ERROR"

    @property
    def is_llm_call(self) -> bool:
        return self.operation_name.startswith("llm.")

    @property
    def is_tool_call(self) -> bool:
        return self.operation_name.startswith("tool.")


@dataclass
class TraceAnalysis:
    trace_id: str
    total_duration_ms: float
    critical_path: List[SpanData]
    bottleneck_span: Optional[SpanData]
    parallel_efficiency: float   # wall-clock / summed span time; ~1.0 = serial, lower = more overlap
    error_spans: List[SpanData]
    llm_time_ms: float
    tool_time_ms: float
    orchestration_overhead_ms: float


class TraceAnalyzer:
    """Analyzes Jaeger/Tempo traces to find bottlenecks."""

    def __init__(self, jaeger_url: str = "http://localhost:16686"):
        self.jaeger_url = jaeger_url

    async def fetch_trace(self, trace_id: str) -> List[SpanData]:
        """Fetch all spans of a trace from Jaeger."""
        async with httpx.AsyncClient() as client:
            resp = await client.get(
                f"{self.jaeger_url}/api/traces/{trace_id}"
            )
            data = resp.json()

        spans = []
        for span in data.get("data", [{}])[0].get("spans", []):
            start_us = span["startTime"]
            duration_us = span["duration"]
            start_dt = datetime.fromtimestamp(start_us / 1_000_000)
            end_dt = start_dt + timedelta(microseconds=duration_us)

            tags = {t["key"]: str(t["value"]) for t in span.get("tags", [])}
            has_error = any(t["key"] == "error" and t["value"] for t in span.get("tags", []))

            spans.append(SpanData(
                trace_id=trace_id,
                span_id=span["spanID"],
                parent_span_id=(span.get("references") or [{}])[0].get("spanID"),
                operation_name=span["operationName"],
                start_time=start_dt,
                end_time=end_dt,
                duration_ms=duration_us / 1000,
                tags=tags,
                status="ERROR" if has_error else "OK"
            ))
        return spans

    def find_critical_path(self, spans: List[SpanData]) -> List[SpanData]:
        """Find the critical path (the root→leaf chain with the longest total duration)."""
        # Build the span tree
        children: Dict[Optional[str], List[SpanData]] = {}
        for span in spans:
            children.setdefault(span.parent_span_id, []).append(span)

        # DFS: extend the path through each child and keep the costliest chain
        def longest_path(span_id: Optional[str]) -> List[SpanData]:
            kids = children.get(span_id, [])
            if not kids:
                return []
            candidates = [[kid] + longest_path(kid.span_id) for kid in kids]
            return max(candidates, key=lambda p: sum(s.duration_ms for s in p))

        roots = children.get(None, [])
        if not roots:
            return []
        root = max(roots, key=lambda s: s.duration_ms)
        return [root] + longest_path(root.span_id)

    def analyze(self, spans: List[SpanData]) -> TraceAnalysis:
        """Full analysis of a single trace."""
        root_spans = [s for s in spans if not s.parent_span_id]
        total_duration = max(s.duration_ms for s in root_spans) if root_spans else 0

        critical_path = self.find_critical_path(spans)
        bottleneck = max(critical_path, key=lambda s: s.duration_ms) if critical_path else None

        # Time spent in each category
        llm_time = sum(s.duration_ms for s in spans if s.is_llm_call)
        tool_time = sum(s.duration_ms for s in spans if s.is_tool_call)
        total_span_time = sum(s.duration_ms for s in spans)

        # Rough parallelism heuristic: wall-clock (total_duration) over the
        # sum of all span durations; ~1.0 means serial, lower means more overlap
        efficiency = total_duration / total_span_time if total_span_time > 0 else 1.0

        orchestration_overhead = total_duration - llm_time - tool_time

        return TraceAnalysis(
            trace_id=spans[0].trace_id if spans else "",
            total_duration_ms=total_duration,
            critical_path=critical_path,
            bottleneck_span=bottleneck,
            parallel_efficiency=efficiency,
            error_spans=[s for s in spans if s.is_error],
            llm_time_ms=llm_time,
            tool_time_ms=tool_time,
            orchestration_overhead_ms=max(0, orchestration_overhead)
        )

    def format_report(self, analysis: TraceAnalysis) -> str:
        bn = analysis.bottleneck_span
        bn_line = f"{bn.operation_name} ({bn.duration_ms:.0f}ms)" if bn else "N/A"
        total = analysis.total_duration_ms or 1  # guard against division by zero
        return f"""
TRACE ANALYSIS: {analysis.trace_id[:16]}...
Total duration: {analysis.total_duration_ms:.0f}ms

TIME BREAKDOWN:
  LLM calls:      {analysis.llm_time_ms:.0f}ms ({analysis.llm_time_ms/total*100:.0f}%)
  Tool calls:     {analysis.tool_time_ms:.0f}ms
  Orchestration:  {analysis.orchestration_overhead_ms:.0f}ms

PARALLELISM:
  Efficiency: {analysis.parallel_efficiency:.1%}

BOTTLENECK: {bn_line}
ERRORS: {len(analysis.error_spans)} spans with errors
"""
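The critical-path recursion used by find_critical_path can be exercised standalone: among root→leaf chains, keep the one with the largest summed duration. The span IDs and durations below are made up for illustration.

```python
from typing import List, Optional, Tuple

def critical_path(spans: List[Tuple[str, Optional[str], float]]) -> List[str]:
    """spans: (span_id, parent_span_id, duration_ms).
    Returns the chain of span_ids whose summed duration is largest."""
    children = {}
    for span_id, parent_id, duration in spans:
        children.setdefault(parent_id, []).append((span_id, duration))

    def best(node: Optional[str]) -> Tuple[float, List[str]]:
        options = [(0.0, [])]  # stopping here is always an option (leaf)
        for span_id, duration in children.get(node, []):
            sub_cost, sub_path = best(span_id)
            options.append((duration + sub_cost, [span_id] + sub_path))
        return max(options, key=lambda o: o[0])

    return best(None)[1]

path = critical_path([
    ("root", None, 100.0),
    ("fast", "root", 10.0),
    ("slow", "root", 60.0),
    ("slow.llm", "slow", 54.0),
])
# path == ["root", "slow", "slow.llm"]
```

Optimizing anything off the critical path ("fast" here) cannot reduce wall-clock latency — which is why the analyzer reports the path before the bottleneck.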

6. Correlating Metrics, Logs, and Traces — Unified Observability

The real power of observability appears when you can correlate the three pillars: saw a latency spike in Grafana? Click the point, Grafana opens the Loki logs for that interval, and you can jump straight to the Tempo trace that caused the anomaly.

import logging
import json
from opentelemetry import trace as otel_trace

class OTelAwareLogger:
    """Logger that injects trace_id and span_id into every log entry.

    This lets you correlate Loki logs with Tempo traces.
    In Grafana, you can click a log line and jump to the trace.
    """

    def __init__(self, name: str):
        self._logger = logging.getLogger(name)

    def _get_trace_context(self) -> dict:
        """Extract the trace context from the current span."""
        span = otel_trace.get_current_span()
        if not span or not span.is_recording():
            return {}

        ctx = span.get_span_context()
        return {
            "trace_id": format(ctx.trace_id, '032x'),
            "span_id": format(ctx.span_id, '016x'),
            "trace_flags": ctx.trace_flags,
        }

    def info(self, message: str, **extra):
        entry = {
            "level": "INFO",
            "message": message,
            **self._get_trace_context(),
            **extra
        }
        self._logger.info(json.dumps(entry))

    def error(self, message: str, exc: Exception = None, **extra):
        entry = {
            "level": "ERROR",
            "message": message,
            **self._get_trace_context(),
            **extra
        }
        if exc:
            entry["exception"] = str(exc)
            entry["exception_type"] = type(exc).__name__
        self._logger.error(json.dumps(entry))

    def warning(self, message: str, **extra):
        entry = {
            "level": "WARNING",
            "message": message,
            **self._get_trace_context(),
            **extra
        }
        self._logger.warning(json.dumps(entry))


# ============================================================
# RESULTING LOG ENTRY (correlated with the trace)
# ============================================================
# {
#   "level": "INFO",
#   "message": "LLM call completed",
#   "trace_id": "7f3a9b2c1e4d5f6a8b9c0d1e2f3a4b5c",
#   "span_id": "1a2b3c4d5e6f7890",
#   "model": "claude-3-sonnet-20240229",
#   "tokens": 750,
#   "cost_usd": 0.0023,
#   "duration_ms": 2340
# }
#
# With this format in Loki, you can:
# 1. Spot a latency spike in Grafana (Prometheus)
# 2. Filter Loki logs by trace_id
# 3. Jump to the trace in Tempo with one click


# ============================================================
# GRAFANA DATASOURCE LINKING (grafana provisioning)
# ============================================================
# In grafana/provisioning/datasources/datasources.yml:
#
# datasources:
#   - name: Tempo
#     type: tempo
#     url: http://tempo:3200
#     jsonData:
#       tracesToLogs:
#         datasourceUid: loki
#         tags: ['trace_id']
#         mappedTags: [{key: 'trace_id', value: 'trace_id'}]
#
#   - name: Loki
#     type: loki
#     url: http://loki:3100
#     jsonData:
#       derivedFields:
#         - name: TraceID
#           matcherRegex: '"trace_id":"(\w+)"'
#           url: '$${__value.raw}'
#           datasourceUid: tempo
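As a sanity check, the matcherRegex from the Loki derived field can be applied to a log line shaped like the example entry above (values are illustrative). Note the regex assumes compact JSON with no space after the colon:

```python
import json
import re

# A log line in the same shape OTelAwareLogger emits; compact separators
# so it matches the derived-field regex (json.dumps's default ": " would not)
log_line = json.dumps(
    {
        "level": "INFO",
        "message": "LLM call completed",
        "trace_id": "7f3a9b2c1e4d5f6a8b9c0d1e2f3a4b5c",
    },
    separators=(",", ":"),
)

# Same pattern as the Loki derivedFields matcherRegex
m = re.search(r'"trace_id":"(\w+)"', log_line)
trace_id = m.group(1) if m else None
```

If your logger emits JSON with spaces after colons, either use compact separators as above or relax the regex to `'"trace_id":\s*"(\w+)"'` — otherwise the one-click trace link will silently never appear.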

Module 5.8 Summary

  • Metrics answer "what"; traces answer "why" — use both for complete observability
  • OpenTelemetry is the universal standard — write once, export to any backend
  • Context propagation via the W3C traceparent header keeps traces coherent across distributed systems
  • OTel Collector in production: filter sensitive data (prompts, emails) before exporting
  • Programmatic TraceAnalyzer: finds the critical path, bottlenecks, and parallelism efficiency
  • Logs with trace_id + Grafana datasource linking = one-click navigation across metrics/logs/traces