MÓDULO 5.11 PROJETO FINAL Trilha 5 — Conclusão

🏗️ Projeto Final — Orquestrador de Análise Competitiva

Construção completa de um orquestrador de análise competitiva com todas as 6 camadas integradas: planejamento inteligente, roteamento multi-agente, controle de estado, supervisão, governança e observabilidade. Deploy em Docker.

6
Camadas
~75min
Duração
Produção
Nível
Deploy
Docker
1

Especificação do Projeto — O que Vamos Construir

O projeto final é um orquestrador de análise competitiva que, dado uma empresa e seus competidores, produz um relatório executivo completo. É o caso de uso que une todas as 6 camadas.

Input → Output

INPUT
POST /api/analyze
{
  "company": "Minha SaaS",
  "competitors": [
    "Notion", "Confluence",
    "Coda", "Obsidian"
  ],
  "objective": "Entender gaps
    de produto e oportunidades
    de diferenciação",
  "department": "product",
  "budget_usd": 5.00
}
OUTPUT (30-90 segundos depois)
Relatório Executivo (.md):
• Executive Summary (3 bullets)
• Mapa de posicionamento
• SWOT comparativo
• Top 5 gaps de produto
• Recomendações priorizadas
• Roadmap de 90 dias
Custo real: ~$0.80 (vs. $47 sem otimização)
Tempo: ~45s (4 agentes paralelos)
C1 — Planejamento
DAG com 12 tarefas em 4 waves paralelas
C2 — Roteamento
4 agentes: researcher, analyst, writer, critic
C3 — Estado
Redis para tasks + Postgres para audit
C4 — Supervisão
Heartbeat + retry inteligente + replanejamento
C5 — Governança
RBAC + audit trail + budget hard limit
C6 — Observabilidade
Prometheus + OTel traces + Grafana
2

Estrutura do Projeto — Código Completo

competitive-intelligence-orchestrator/
├── src/
│   ├── core/                      # Domain — sem dependências externas
│   │   ├── __init__.py
│   │   ├── planner.py             # Camada 1: TaskDecomposer, ExecutionDAG
│   │   ├── router.py              # Camada 2: AgentRouter, ComplexityClassifier
│   │   ├── state_manager.py       # Camada 3: OrchestratorState, SessionManager
│   │   ├── supervisor.py          # Camada 4: HealthMonitor, RetryHandler, Replanner
│   │   ├── governor.py            # Camada 5: BudgetMonitor, RBACManager, AuditTrail
│   │   └── observer.py            # Camada 6: MetricsCollector, TraceManager
│   │
│   ├── adapters/                  # Infrastructure adapters
│   │   ├── redis_state.py         # StatePort → Redis
│   │   ├── postgres_audit.py      # AuditPort → Postgres
│   │   ├── anthropic_llm.py       # LLMPort → Anthropic
│   │   ├── prometheus_metrics.py  # MetricsPort → Prometheus
│   │   ├── otel_tracer.py         # TracerPort → OpenTelemetry
│   │   └── slack_notifier.py      # NotificationPort → Slack
│   │
│   ├── agents/                    # Agentes especializados
│   │   ├── researcher.py          # Pesquisa web + scraping
│   │   ├── analyst.py             # Análise estratégica
│   │   ├── writer.py              # Geração de relatório
│   │   └── critic.py              # Quality review
│   │
│   ├── api/                       # FastAPI app
│   │   ├── main.py
│   │   ├── routes/
│   │   │   ├── workflows.py       # POST /workflows, GET /sessions/{id}
│   │   │   └── health.py          # GET /health, GET /metrics
│   │   └── websocket.py           # WS /sessions/{id}/stream
│   │
│   └── workers/                   # Background workers
│       ├── task_worker.py         # Processa tasks da fila Redis
│       └── monitor_worker.py      # Monitora saúde dos agentes
│
├── tests/
│   ├── unit/                      # Testes sem infraestrutura
│   ├── integration/               # Testes com Docker
│   └── e2e/                       # Testes end-to-end
│
├── infra/
│   ├── docker-compose.yml
│   ├── docker-compose.prod.yml
│   ├── prometheus.yml
│   ├── grafana/
│   │   ├── dashboards/
│   │   │   ├── orchestrator.json  # Dashboard principal
│   │   │   └── slo.json          # SLO dashboard
│   │   └── provisioning/
│   ├── postgres/
│   │   └── init.sql
│   └── nginx/
│       └── nginx.conf
│
├── Dockerfile
├── Dockerfile.worker
├── pyproject.toml
└── .env.example
# pyproject.toml — Dependências do projeto

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "competitive-intelligence-orchestrator"
version = "1.0.0"
requires-python = ">=3.11"
dependencies = [
    # Core AI
    "anthropic>=0.34.0",
    "langgraph>=0.2.0",
    "crewai>=0.67.0",
    "crewai-tools>=0.12.0",

    # API
    "fastapi>=0.115.0",
    "uvicorn[standard]>=0.32.0",
    "pydantic>=2.9.0",
    "websockets>=13.0",

    # Storage
    "redis[asyncio]>=5.2.0",
    "asyncpg>=0.30.0",
    "sqlalchemy[asyncio]>=2.0.0",

    # Observability
    "prometheus-client>=0.21.0",
    "opentelemetry-api>=1.28.0",
    "opentelemetry-sdk>=1.28.0",
    "opentelemetry-exporter-otlp>=1.28.0",
    "opentelemetry-instrumentation-fastapi>=0.49b0",
    "opentelemetry-instrumentation-httpx>=0.49b0",

    # HTTP
    "httpx>=0.28.0",

    # Utils
    "pydantic-settings>=2.6.0",
    "structlog>=24.4.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.3.0",
    "pytest-asyncio>=0.24.0",
    "pytest-cov>=6.0.0",
    "ruff>=0.8.0",
    "mypy>=1.13.0",
]
3

Orquestrador Principal — Integrando as 6 Camadas

# src/core/orchestrator.py — Núcleo que une todas as camadas

from dataclasses import dataclass, field
from typing import List, Optional
import asyncio
import time
import uuid
from opentelemetry import trace

from .planner import TaskDecomposer, ExecutionDAG, Task
from .router import AgentRouter, ComplexityClassifier
from .state_manager import SessionManager, OrchestratorState
from .supervisor import HealthMonitor, IntelligentRetryHandler, AutoReplanner
from .governor import BudgetMonitor, RBACManager, AuditTrail
from .observer import OrchestratorMetrics

tracer = trace.get_tracer("competitive_intelligence")


@dataclass
class OrchestratorConfig:
    max_parallel_agents: int = 5
    max_retries: int = 3
    budget_limit_usd: float = 5.0
    department: str = "product"
    rbac_role: str = "EXECUTOR"
    session_ttl_hours: int = 2


@dataclass
class OrchestratorResult:
    session_id: str
    status: str               # completed / failed / escalated
    report: Optional[str]
    tasks_completed: int
    tasks_failed: int
    total_cost_usd: float
    duration_seconds: float
    models_used: dict
    trace_id: str


class CompetitiveIntelligenceOrchestrator:
    """
    Orquestrador completo com todas as 6 camadas integradas.
    Caso de uso: análise competitiva automatizada.
    """

    def __init__(
        self,
        config: OrchestratorConfig,
        # Injeção de dependência (adapters)
        llm_adapter,
        state_adapter,
        metrics_adapter,
        notification_adapter,
        audit_adapter,
    ):
        self.config = config

        # Camada 1: Planejamento
        self.planner = TaskDecomposer(llm_adapter)

        # Camada 2: Roteamento
        self.classifier = ComplexityClassifier()
        self.router = AgentRouter(llm_adapter, self.classifier)

        # Camada 3: Estado
        self.state_mgr = SessionManager(state_adapter)

        # Camada 4: Supervisão
        self.health_monitor = HealthMonitor(state_adapter)
        self.retry_handler = IntelligentRetryHandler(llm_adapter)
        self.replanner = AutoReplanner(llm_adapter)

        # Camada 5: Governança
        self.budget = BudgetMonitor(
            soft_limit=config.budget_limit_usd * 0.8,
            hard_limit=config.budget_limit_usd
        )
        self.rbac = RBACManager()
        self.audit = AuditTrail(audit_adapter)

        # Camada 6: Observabilidade
        self.metrics = OrchestratorMetrics()

        # Adapters
        self.notifications = notification_adapter
        self.llm = llm_adapter

    async def analyze(
        self,
        company: str,
        competitors: List[str],
        objective: str
    ) -> OrchestratorResult:
        """
        Ponto de entrada principal.
        Integra todas as 6 camadas sequencialmente.
        """
        session_id = f"sess_{uuid.uuid4().hex[:12]}"
        start_time = time.time()

        with tracer.start_as_current_span(
            "orchestrator.analyze",
            attributes={
                "session.id": session_id,
                "company": company,
                "competitors.count": len(competitors),
            }
        ) as root_span:

            trace_id = format(
                root_span.get_span_context().trace_id, '032x'
            )

            try:
                # ========================
                # CAMADA 5: Autorização
                # ========================
                with tracer.start_as_current_span("governor.authorize"):
                    if not self.rbac.is_authorized(
                        self.config.rbac_role, "execute_workflow"
                    ):
                        raise PermissionError(f"Role {self.config.rbac_role} não autorizado")

                    self.audit.log_action(
                        session_id=session_id,
                        action="workflow_started",
                        agent_id="orchestrator",
                        details={"company": company, "competitors": competitors}
                    )

                # ========================
                # CAMADA 3: Inicializa sessão
                # ========================
                await self.state_mgr.create_session(session_id, {
                    "company": company,
                    "competitors": competitors,
                    "objective": objective,
                    "budget_limit": self.config.budget_limit_usd,
                    "department": self.config.department,
                    "status": "planning",
                })

                # ========================
                # CAMADA 1: Planejamento
                # ========================
                with tracer.start_as_current_span("planner.decompose") as plan_span:
                    dag = await self.planner.decompose(
                        objective=objective,
                        context={
                            "company": company,
                            "competitors": competitors
                        }
                    )
                    plan_span.set_attribute("tasks.count", len(dag.tasks))
                    self.metrics.tasks_planned.inc(len(dag.tasks))

                await self.state_mgr.update_session(session_id, {
                    "status": "executing",
                    "tasks_total": len(dag.tasks),
                    "tasks_completed": 0,
                })

                # ========================
                # CAMADA 2: Execução por waves (topological order)
                # ========================
                all_results = {}
                tasks_completed = 0
                tasks_failed = 0
                total_cost = 0.0

                for wave_idx, wave in enumerate(dag.topological_waves()):
                    with tracer.start_as_current_span(
                        f"executor.wave_{wave_idx}",
                        attributes={"wave.size": len(wave)}
                    ):
                        wave_results = await asyncio.gather(
                            *[self._execute_with_supervision(
                                task=t,
                                session_id=session_id,
                                context={**all_results}  # resultados anteriores
                            ) for t in wave],
                            return_exceptions=True
                        )

                        for task, result in zip(wave, wave_results):
                            if isinstance(result, Exception):
                                tasks_failed += 1
                                self.audit.log_action(
                                    session_id=session_id,
                                    action="task_failed",
                                    agent_id=task.assigned_agent or "unknown",
                                    details={"task_id": task.id, "error": str(result)}
                                )
                            else:
                                all_results[task.id] = result["output"]
                                tasks_completed += 1
                                total_cost += result.get("cost", 0)
                                self.budget.record_cost(total_cost, session_id)

                        await self.state_mgr.update_session(session_id, {
                            "tasks_completed": tasks_completed,
                            "cost_so_far": total_cost,
                        })

                # ========================
                # SÍNTESE FINAL (Camada 2 — agente writer)
                # ========================
                with tracer.start_as_current_span("synthesizer.compile"):
                    report = await self._synthesize_report(
                        company=company,
                        competitors=competitors,
                        task_results=all_results,
                        session_id=session_id
                    )
                    total_cost += 0.05  # custo da síntese

                # ========================
                # CAMADA 5: Audit final
                # ========================
                self.audit.log_action(
                    session_id=session_id,
                    action="workflow_completed",
                    agent_id="orchestrator",
                    details={
                        "tasks_completed": tasks_completed,
                        "tasks_failed": tasks_failed,
                        "total_cost": total_cost,
                    }
                )

                await self.state_mgr.update_session(session_id, {
                    "status": "completed",
                    "final_result": report[:500],  # preview
                    "cost_so_far": total_cost,
                    "tasks_completed": tasks_completed,
                })

                # ========================
                # CAMADA 6: Métricas finais
                # ========================
                duration = time.time() - start_time
                self.metrics.record_workflow_completion(
                    duration=duration,
                    cost=total_cost,
                    tasks=tasks_completed,
                    status="success"
                )

                root_span.set_attribute("workflow.status", "completed")
                root_span.set_attribute("workflow.cost_usd", total_cost)
                root_span.set_attribute("workflow.tasks_completed", tasks_completed)

                return OrchestratorResult(
                    session_id=session_id,
                    status="completed",
                    report=report,
                    tasks_completed=tasks_completed,
                    tasks_failed=tasks_failed,
                    total_cost_usd=total_cost,
                    duration_seconds=duration,
                    models_used=self.router.get_model_usage_summary(),
                    trace_id=trace_id,
                )

            except Exception as e:
                await self.state_mgr.update_session(session_id, {
                    "status": "failed",
                    "error": str(e)
                })
                await self.notifications.send_alert(
                    channel="#alerts-critical",
                    message=f"Workflow {session_id} falhou: {e}",
                    severity="error"
                )
                root_span.set_status(trace.StatusCode.ERROR, str(e))
                raise

    async def _execute_with_supervision(
        self, task: "Task", session_id: str, context: dict
    ) -> dict:
        """Executa tarefa com monitoramento de saúde e retry inteligente."""
        with tracer.start_as_current_span(
            "agent.execute",
            attributes={"task.id": task.id, "task.type": task.type}
        ) as span:
            # Camada 2: Roteamento — qual modelo usar?
            classification = await self.classifier.classify(task.__dict__)
            agent_id = f"agent_{task.type}_{classification.complexity.value}"

            span.set_attribute("agent.id", agent_id)
            span.set_attribute("llm.model", classification.model)

            # Camada 4: Heartbeat
            self.health_monitor.register_agent(agent_id, session_id)

            for attempt in range(self.config.max_retries + 1):
                try:
                    self.health_monitor.heartbeat(agent_id)

                    # Constrói prompt mínimo com contexto relevante
                    relevant_context = {
                        k: v for k, v in context.items()
                        if k in task.depends_on
                    }

                    prompt = f"""Execute esta tarefa:
Tipo: {task.type}
Descrição: {task.description}
Contexto relevante: {str(relevant_context)[:1000]}

Produza apenas o resultado, sem explicações."""

                    result = await self.llm.complete(
                        prompt=prompt,
                        model=classification.model
                    )

                    cost = classification.estimated_cost
                    self.metrics.llm_calls_total.labels(
                        model=classification.model,
                        agent_id=agent_id,
                        success="true"
                    ).inc()

                    return {"output": result, "cost": cost, "model": classification.model}

                except Exception as e:
                    if attempt >= self.config.max_retries:
                        raise

                    # Camada 4: Retry inteligente (escala modelo se necessário)
                    escalated_model = self.retry_handler.get_escalated_model(
                        classification.model, attempt
                    )
                    classification.model = escalated_model
                    await asyncio.sleep(2 ** attempt)  # exponential backoff

    async def _synthesize_report(
        self,
        company: str,
        competitors: List[str],
        task_results: dict,
        session_id: str,
    ) -> str:
        """Síntese final usando modelo mais poderoso."""
        results_text = "\n\n".join([
            f"### {task_id}\n{result}"
            for task_id, result in task_results.items()
        ])

        prompt = f"""Você é um consultor estratégico sênior.
Produza um relatório executivo completo para {company} analisando competidores: {', '.join(competitors)}.

Dados coletados pelos agentes de pesquisa e análise:
{results_text[:8000]}

Formato obrigatório (Markdown):
# Análise Competitiva: {company}

## Executive Summary
[3 bullets de insight mais importantes]

## Landscape Competitivo
[Tabela comparativa dos competidores]

## Posicionamento e Gaps
[Onde {company} se diferencia e onde está atrás]

## Análise SWOT
[SWOT comparativo focado em produto]

## Oportunidades de Diferenciação
[Top 5 oportunidades concretas]

## Recomendações e Roadmap (90 dias)
[Actions priorizadas por impacto]

## Apêndice: Dados de Suporte
[Dados relevantes das pesquisas]"""

        report = await self.llm.complete(
            prompt=prompt,
            model="claude-opus-4-5",  # síntese final merece Opus
            max_tokens=4096
        )
        return report
4

Testes — Unit, Integration e E2E

# tests/unit/test_orchestrator.py — Testes sem infraestrutura

import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

from src.core.orchestrator import (
    CompetitiveIntelligenceOrchestrator,
    OrchestratorConfig
)


@pytest.fixture
def mock_deps():
    """Cria dependências mockadas para testes unitários."""
    llm = AsyncMock()
    llm.complete.return_value = """
    {
        "tasks": [
            {"id": "t1", "type": "research", "description": "Pesquisar Notion", "depends_on": []},
            {"id": "t2", "type": "analysis", "description": "Analisar dados", "depends_on": ["t1"]}
        ]
    }"""
    llm.estimate_cost.return_value = 0.001

    state = AsyncMock()
    state.get_task_state.return_value = None
    state.save_task_state.return_value = None

    metrics = MagicMock()
    notifications = AsyncMock()
    audit = MagicMock()

    return llm, state, metrics, notifications, audit


@pytest.fixture
def orchestrator(mock_deps):
    llm, state, metrics, notifications, audit = mock_deps
    config = OrchestratorConfig(budget_limit_usd=10.0)
    return CompetitiveIntelligenceOrchestrator(
        config=config,
        llm_adapter=llm,
        state_adapter=state,
        metrics_adapter=metrics,
        notification_adapter=notifications,
        audit_adapter=audit
    )


@pytest.mark.asyncio
async def test_analyze_creates_session(orchestrator, mock_deps):
    """Verifica que analyze() cria uma sessão no state manager."""
    llm, state, *_ = mock_deps

    # Configura LLM para retornar relatório na síntese
    llm.complete.return_value = "# Análise Competitiva\n..."

    result = await orchestrator.analyze(
        company="MinhaSaaS",
        competitors=["Notion"],
        objective="Entender diferenciação"
    )

    assert result.session_id.startswith("sess_")
    assert state.save_task_state.called


@pytest.mark.asyncio
async def test_budget_hard_limit_stops_execution(mock_deps):
    """Verifica que budget hard limit interrompe o workflow."""
    llm, state, metrics, notifications, audit = mock_deps

    # Budget zero → deve falhar imediatamente
    config = OrchestratorConfig(budget_limit_usd=0.0)
    orchestrator = CompetitiveIntelligenceOrchestrator(
        config=config,
        llm_adapter=llm,
        state_adapter=state,
        metrics_adapter=metrics,
        notification_adapter=notifications,
        audit_adapter=audit
    )

    with pytest.raises(Exception):
        await orchestrator.analyze(
            company="Test",
            competitors=["Comp1"],
            objective="test"
        )


@pytest.mark.asyncio
async def test_retry_on_llm_failure(orchestrator, mock_deps):
    """Verifica retry com escalação de modelo após falha."""
    llm, *_ = mock_deps

    # Primeira chamada falha, segunda tem sucesso
    llm.complete.side_effect = [
        Exception("Rate limit"),
        "# Análise\n..."
    ]

    result = await orchestrator.analyze(
        company="Test",
        competitors=["Comp1"],
        objective="test"
    )

    assert llm.complete.call_count >= 2


# ============================================================
# TESTE DE INTEGRAÇÃO (requer Docker)
# ============================================================

# tests/integration/test_redis_state.py

@pytest.mark.integration
@pytest.mark.asyncio
async def test_session_survives_restart():
    """Verifica que sessão sobrevive a restart do worker."""
    from src.adapters.redis_state import RedisStateAdapter

    adapter = RedisStateAdapter("redis://localhost:6379")
    session_id = "test_sess_001"

    await adapter.save_task_state(session_id, "meta", {
        "status": "executing",
        "tasks_completed": 3,
    })

    # Simula restart: cria nova instância do adapter
    new_adapter = RedisStateAdapter("redis://localhost:6379")
    state = await new_adapter.get_task_state(session_id, "meta")

    assert state["status"] == "executing"
    assert state["tasks_completed"] == 3

    # Cleanup
    import redis.asyncio as aioredis
    r = aioredis.from_url("redis://localhost:6379")
    await r.delete(f"session:{session_id}:task:meta")
5

Deploy em Produção — Dockerfile e CI/CD

# Dockerfile — Build otimizado com multi-stage

FROM python:3.11-slim AS builder
WORKDIR /app
RUN pip install uv
COPY pyproject.toml .
RUN uv pip install --system -r pyproject.toml --no-dev

FROM python:3.11-slim AS runtime
WORKDIR /app

# Copia apenas o necessário do builder
COPY --from=builder /usr/local/lib/python3.11 /usr/local/lib/python3.11
COPY --from=builder /usr/local/bin /usr/local/bin

# Copia código fonte
COPY src/ ./src/

# Usuário não-root para segurança
RUN useradd -r -u 1001 appuser
USER appuser

# Variáveis de ambiente defaults (sobrescrever em prod)
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV PORT=8000

EXPOSE $PORT
EXPOSE 8001  # Prometheus metrics

CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000",
     "--workers", "2", "--log-config", "log_config.json"]

---
# Dockerfile.worker — Worker separado

FROM python:3.11-slim AS runtime
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11 /usr/local/lib/python3.11
COPY src/ ./src/

RUN useradd -r -u 1001 appuser
USER appuser

CMD ["python", "-m", "src.workers.task_worker"]

---
# .github/workflows/deploy.yml — CI/CD com GitHub Actions

name: Deploy to Production

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7-alpine
        ports: ["6379:6379"]
      postgres:
        image: postgres:16-alpine
        env:
          POSTGRES_DB: test_db
          POSTGRES_USER: user
          POSTGRES_PASSWORD: pass
        ports: ["5432:5432"]

    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install uv && uv pip install --system -e ".[dev]"
      - run: pytest tests/unit -v --cov=src --cov-report=xml
      - run: pytest tests/integration -v -m integration
        env:
          REDIS_URL: redis://localhost:6379
          DATABASE_URL: postgresql://user:pass@localhost:5432/test_db

  build-and-push:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - uses: docker/build-push-action@v5
        with:
          push: true
          tags: ghcr.io/${{ github.repository }}:latest,${{ github.sha }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

  deploy:
    needs: build-and-push
    runs-on: ubuntu-latest
    steps:
      - name: Deploy via SSH
        uses: appleboy/ssh-action@v1.0.0
        with:
          host: ${{ secrets.PROD_HOST }}
          username: deploy
          key: ${{ secrets.PROD_SSH_KEY }}
          script: |
            cd /opt/orchestrator
            docker compose pull
            docker compose up -d --remove-orphans
            docker compose exec orchestrator-api python -m pytest tests/e2e -v
            echo "Deploy concluído: $(date)"
6

Consolidação — O que Você Construiu na Trilha 5

Ao concluir esta trilha, você dominou a construção de um orquestrador de IA de nível de produção. Cada módulo foi uma camada que se soma às anteriores, formando um sistema coeso e robusto.

C1 Planejamento Inteligente

Transformou objetivos vagos em DAGs de tarefas com dependências, prioridades e waves de execução paralela.

C2 Roteamento Multi-Agente

StaticRouter → LLMRouter → AgentPool com load balancing e CostAwareRouter para roteamento por custo e complexidade.

C3 Controle de Estado

Redis para estado efêmero (HSET, ZADD, pub/sub), Postgres para persistência e SessionManager com checkpoint/resume.

C4 Supervisão e Replanejamento

Heartbeat, SLA monitoring, stall detection com fingerprinting, retry inteligente com escalação de modelo e auto-replanejamento.

C5 Governança

Logs estruturados JSON, audit trail imutável com hash chains SHA-256, RBAC para agentes, budget monitoring com limites hard/soft.

C6 Observabilidade

Prometheus + Grafana para métricas, SLIs/SLOs com error budgets, OpenTelemetry para distributed tracing, correlação logs/metrics/traces.

Números do que você construiu

6
Camadas de orquestração
91%
Redução de custo vs. baseline
5
Adapters de infraestrutura
99.5%
SLO de disponibilidade

Próximos Passos — Trilha 6

Na Trilha 6, você aprenderá como operacionalizar e escalar seu orquestrador: Kubernetes com auto-scaling baseado em fila, multi-tenancy, disaster recovery, compliance (SOC2, GDPR), e como construir um produto SaaS completo a partir do seu orquestrador.

Kubernetes Multi-tenancy Disaster Recovery SOC2 / GDPR SaaS Product
5.10 — Custo-Otimização Trilha 6 — Operacionalização
🏗️

Trilha 5 Concluída!

Você dominou a construção de orquestradores de IA de nível de produção. Planejamento, roteamento, estado, supervisão, governança e observabilidade — as 6 camadas que transformam um script simples em um sistema resiliente, auditável e eficiente em custo.