Project Specification — What We Will Build
The final project is a competitive-analysis orchestrator that, given a company and its competitors, produces a complete executive report. It is the use case that brings all six layers together.
Input → Output
POST /api/analyze
{
  "company": "Minha SaaS",
  "competitors": [
    "Notion", "Confluence",
    "Coda", "Obsidian"
  ],
  "objective": "Understand product gaps and differentiation opportunities",
  "department": "product",
  "budget_usd": 5.00
}
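On the FastAPI side, this payload maps naturally onto a Pydantic model. A minimal sketch (the `AnalyzeRequest` name and the constraint values are illustrative assumptions, not the project's actual code):

```python
from typing import List
from pydantic import BaseModel, Field

class AnalyzeRequest(BaseModel):
    # Field names mirror the POST /api/analyze payload above.
    company: str
    competitors: List[str] = Field(min_length=1)
    objective: str
    department: str = "product"
    budget_usd: float = Field(gt=0, le=100.0)  # upper bound is an assumption

req = AnalyzeRequest(
    company="Minha SaaS",
    competitors=["Notion", "Confluence", "Coda", "Obsidian"],
    objective="Understand product gaps and differentiation opportunities",
    budget_usd=5.00,
)
```

FastAPI would reject out-of-range budgets or an empty competitor list with a 422 before the orchestrator ever runs.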
Project Structure — Complete Code
competitive-intelligence-orchestrator/
├── src/
│ ├── core/ # Domain — no external dependencies
│ │ ├── __init__.py
│ │ ├── planner.py # Layer 1: TaskDecomposer, ExecutionDAG
│ │ ├── router.py # Layer 2: AgentRouter, ComplexityClassifier
│ │ ├── state_manager.py # Layer 3: OrchestratorState, SessionManager
│ │ ├── supervisor.py # Layer 4: HealthMonitor, RetryHandler, Replanner
│ │ ├── governor.py # Layer 5: BudgetMonitor, RBACManager, AuditTrail
│ │ └── observer.py # Layer 6: MetricsCollector, TraceManager
│ │
│ ├── adapters/ # Infrastructure adapters
│ │ ├── redis_state.py # StatePort → Redis
│ │ ├── postgres_audit.py # AuditPort → Postgres
│ │ ├── anthropic_llm.py # LLMPort → Anthropic
│ │ ├── prometheus_metrics.py # MetricsPort → Prometheus
│ │ ├── otel_tracer.py # TracerPort → OpenTelemetry
│ │ └── slack_notifier.py # NotificationPort → Slack
│ │
│ ├── agents/ # Specialized agents
│ │ ├── researcher.py # Web research + scraping
│ │ ├── analyst.py # Strategic analysis
│ │ ├── writer.py # Report generation
│ │ └── critic.py # Quality review
│ │
│ ├── api/ # FastAPI app
│ │ ├── main.py
│ │ ├── routes/
│ │ │ ├── workflows.py # POST /workflows, GET /sessions/{id}
│ │ │ └── health.py # GET /health, GET /metrics
│ │ └── websocket.py # WS /sessions/{id}/stream
│ │
│ └── workers/ # Background workers
│ ├── task_worker.py # Processes tasks from the Redis queue
│ └── monitor_worker.py # Monitors agent health
│
├── tests/
│ ├── unit/ # Tests without infrastructure
│ ├── integration/ # Tests with Docker
│ └── e2e/ # End-to-end tests
│
├── infra/
│ ├── docker-compose.yml
│ ├── docker-compose.prod.yml
│ ├── prometheus.yml
│ ├── grafana/
│ │ ├── dashboards/
│ │ │ ├── orchestrator.json # Main dashboard
│ │ │ └── slo.json # SLO dashboard
│ │ └── provisioning/
│ ├── postgres/
│ │ └── init.sql
│ └── nginx/
│ └── nginx.conf
│
├── Dockerfile
├── Dockerfile.worker
├── pyproject.toml
└── .env.example
# pyproject.toml — Project dependencies
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "competitive-intelligence-orchestrator"
version = "1.0.0"
requires-python = ">=3.11"
dependencies = [
# Core AI
"anthropic>=0.34.0",
"langgraph>=0.2.0",
"crewai>=0.67.0",
"crewai-tools>=0.12.0",
# API
"fastapi>=0.115.0",
"uvicorn[standard]>=0.32.0",
"pydantic>=2.9.0",
"websockets>=13.0",
# Storage
"redis[asyncio]>=5.2.0",
"asyncpg>=0.30.0",
"sqlalchemy[asyncio]>=2.0.0",
# Observability
"prometheus-client>=0.21.0",
"opentelemetry-api>=1.28.0",
"opentelemetry-sdk>=1.28.0",
"opentelemetry-exporter-otlp>=1.28.0",
"opentelemetry-instrumentation-fastapi>=0.49b0",
"opentelemetry-instrumentation-httpx>=0.49b0",
# HTTP
"httpx>=0.28.0",
# Utils
"pydantic-settings>=2.6.0",
"structlog>=24.4.0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.3.0",
"pytest-asyncio>=0.24.0",
"pytest-cov>=6.0.0",
"ruff>=0.8.0",
"mypy>=1.13.0",
]
Main Orchestrator — Integrating the 6 Layers
# src/core/orchestrator.py — Core that ties all the layers together
from dataclasses import dataclass, field
from typing import List, Optional
import asyncio
import time
import uuid

from opentelemetry import trace

from .planner import TaskDecomposer, ExecutionDAG, Task
from .router import AgentRouter, ComplexityClassifier
from .state_manager import SessionManager, OrchestratorState
from .supervisor import HealthMonitor, IntelligentRetryHandler, AutoReplanner
from .governor import BudgetMonitor, RBACManager, AuditTrail
from .observer import OrchestratorMetrics

tracer = trace.get_tracer("competitive_intelligence")
@dataclass
class OrchestratorConfig:
    max_parallel_agents: int = 5
    max_retries: int = 3
    budget_limit_usd: float = 5.0
    department: str = "product"
    rbac_role: str = "EXECUTOR"
    session_ttl_hours: int = 2
@dataclass
class OrchestratorResult:
    session_id: str
    status: str  # completed / failed / escalated
    report: Optional[str]
    tasks_completed: int
    tasks_failed: int
    total_cost_usd: float
    duration_seconds: float
    models_used: dict
    trace_id: str
class CompetitiveIntelligenceOrchestrator:
    """
    Complete orchestrator with all 6 layers integrated.
    Use case: automated competitive analysis.
    """

    def __init__(
        self,
        config: OrchestratorConfig,
        # Dependency injection (adapters)
        llm_adapter,
        state_adapter,
        metrics_adapter,
        notification_adapter,
        audit_adapter,
    ):
        self.config = config
        # Layer 1: Planning
        self.planner = TaskDecomposer(llm_adapter)
        # Layer 2: Routing
        self.classifier = ComplexityClassifier()
        self.router = AgentRouter(llm_adapter, self.classifier)
        # Layer 3: State
        self.state_mgr = SessionManager(state_adapter)
        # Layer 4: Supervision
        self.health_monitor = HealthMonitor(state_adapter)
        self.retry_handler = IntelligentRetryHandler(llm_adapter)
        self.replanner = AutoReplanner(llm_adapter)
        # Layer 5: Governance
        self.budget = BudgetMonitor(
            soft_limit=config.budget_limit_usd * 0.8,
            hard_limit=config.budget_limit_usd,
        )
        self.rbac = RBACManager()
        self.audit = AuditTrail(audit_adapter)
        # Layer 6: Observability
        self.metrics = OrchestratorMetrics()
        # Adapters
        self.notifications = notification_adapter
        self.llm = llm_adapter
    async def analyze(
        self,
        company: str,
        competitors: List[str],
        objective: str,
    ) -> OrchestratorResult:
        """
        Main entry point.
        Integrates all 6 layers sequentially.
        """
        session_id = f"sess_{uuid.uuid4().hex[:12]}"
        start_time = time.time()

        with tracer.start_as_current_span(
            "orchestrator.analyze",
            attributes={
                "session.id": session_id,
                "company": company,
                "competitors.count": len(competitors),
            },
        ) as root_span:
            trace_id = format(
                root_span.get_span_context().trace_id, "032x"
            )
            try:
                # ========================
                # LAYER 5: Authorization
                # ========================
                with tracer.start_as_current_span("governor.authorize"):
                    if not self.rbac.is_authorized(
                        self.config.rbac_role, "execute_workflow"
                    ):
                        raise PermissionError(
                            f"Role {self.config.rbac_role} is not authorized"
                        )
                    self.audit.log_action(
                        session_id=session_id,
                        action="workflow_started",
                        agent_id="orchestrator",
                        details={"company": company, "competitors": competitors},
                    )

                # ========================
                # LAYER 3: Initialize session
                # ========================
                await self.state_mgr.create_session(session_id, {
                    "company": company,
                    "competitors": competitors,
                    "objective": objective,
                    "budget_limit": self.config.budget_limit_usd,
                    "department": self.config.department,
                    "status": "planning",
                })

                # ========================
                # LAYER 1: Planning
                # ========================
                with tracer.start_as_current_span("planner.decompose") as plan_span:
                    dag = await self.planner.decompose(
                        objective=objective,
                        context={
                            "company": company,
                            "competitors": competitors,
                        },
                    )
                    plan_span.set_attribute("tasks.count", len(dag.tasks))
                    self.metrics.tasks_planned.inc(len(dag.tasks))

                await self.state_mgr.update_session(session_id, {
                    "status": "executing",
                    "tasks_total": len(dag.tasks),
                    "tasks_completed": 0,
                })

                # ========================
                # LAYER 2: Execution in waves (topological order)
                # ========================
                all_results = {}
                tasks_completed = 0
                tasks_failed = 0
                total_cost = 0.0

                for wave_idx, wave in enumerate(dag.topological_waves()):
                    with tracer.start_as_current_span(
                        f"executor.wave_{wave_idx}",
                        attributes={"wave.size": len(wave)},
                    ):
                        wave_results = await asyncio.gather(
                            *[self._execute_with_supervision(
                                task=t,
                                session_id=session_id,
                                context={**all_results},  # results from earlier waves
                            ) for t in wave],
                            return_exceptions=True,
                        )
                        for task, result in zip(wave, wave_results):
                            if isinstance(result, Exception):
                                tasks_failed += 1
                                self.audit.log_action(
                                    session_id=session_id,
                                    action="task_failed",
                                    agent_id=task.assigned_agent or "unknown",
                                    details={"task_id": task.id, "error": str(result)},
                                )
                            else:
                                all_results[task.id] = result["output"]
                                tasks_completed += 1
                                total_cost += result.get("cost", 0)

                        self.budget.record_cost(total_cost, session_id)
                        await self.state_mgr.update_session(session_id, {
                            "tasks_completed": tasks_completed,
                            "cost_so_far": total_cost,
                        })

                # ========================
                # FINAL SYNTHESIS (Layer 2 — writer agent)
                # ========================
                with tracer.start_as_current_span("synthesizer.compile"):
                    report = await self._synthesize_report(
                        company=company,
                        competitors=competitors,
                        task_results=all_results,
                        session_id=session_id,
                    )
                    total_cost += 0.05  # synthesis cost

                # ========================
                # LAYER 5: Final audit
                # ========================
                self.audit.log_action(
                    session_id=session_id,
                    action="workflow_completed",
                    agent_id="orchestrator",
                    details={
                        "tasks_completed": tasks_completed,
                        "tasks_failed": tasks_failed,
                        "total_cost": total_cost,
                    },
                )
                await self.state_mgr.update_session(session_id, {
                    "status": "completed",
                    "final_result": report[:500],  # preview
                    "cost_so_far": total_cost,
                    "tasks_completed": tasks_completed,
                })

                # ========================
                # LAYER 6: Final metrics
                # ========================
                duration = time.time() - start_time
                self.metrics.record_workflow_completion(
                    duration=duration,
                    cost=total_cost,
                    tasks=tasks_completed,
                    status="success",
                )
                root_span.set_attribute("workflow.status", "completed")
                root_span.set_attribute("workflow.cost_usd", total_cost)
                root_span.set_attribute("workflow.tasks_completed", tasks_completed)

                return OrchestratorResult(
                    session_id=session_id,
                    status="completed",
                    report=report,
                    tasks_completed=tasks_completed,
                    tasks_failed=tasks_failed,
                    total_cost_usd=total_cost,
                    duration_seconds=duration,
                    models_used=self.router.get_model_usage_summary(),
                    trace_id=trace_id,
                )
            except Exception as e:
                await self.state_mgr.update_session(session_id, {
                    "status": "failed",
                    "error": str(e),
                })
                await self.notifications.send_alert(
                    channel="#alerts-critical",
                    message=f"Workflow {session_id} failed: {e}",
                    severity="error",
                )
                root_span.set_status(trace.StatusCode.ERROR, str(e))
                raise
    async def _execute_with_supervision(
        self, task: "Task", session_id: str, context: dict
    ) -> dict:
        """Executes a task with health monitoring and intelligent retry."""
        with tracer.start_as_current_span(
            "agent.execute",
            attributes={"task.id": task.id, "task.type": task.type},
        ) as span:
            # Layer 2: Routing — which model should handle this?
            classification = await self.classifier.classify(task.__dict__)
            agent_id = f"agent_{task.type}_{classification.complexity.value}"
            span.set_attribute("agent.id", agent_id)
            span.set_attribute("llm.model", classification.model)

            # Layer 4: Heartbeat
            self.health_monitor.register_agent(agent_id, session_id)

            for attempt in range(self.config.max_retries + 1):
                try:
                    self.health_monitor.heartbeat(agent_id)
                    # Build a minimal prompt with only the relevant context
                    relevant_context = {
                        k: v for k, v in context.items()
                        if k in task.depends_on
                    }
                    prompt = f"""Execute this task:
Type: {task.type}
Description: {task.description}
Relevant context: {str(relevant_context)[:1000]}

Produce only the result, with no explanations."""
                    result = await self.llm.complete(
                        prompt=prompt,
                        model=classification.model,
                    )
                    cost = classification.estimated_cost
                    self.metrics.llm_calls_total.labels(
                        model=classification.model,
                        agent_id=agent_id,
                        success="true",
                    ).inc()
                    return {"output": result, "cost": cost, "model": classification.model}
                except Exception:
                    if attempt >= self.config.max_retries:
                        raise
                    # Layer 4: Intelligent retry (escalates the model if needed)
                    escalated_model = self.retry_handler.get_escalated_model(
                        classification.model, attempt
                    )
                    classification.model = escalated_model
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
    async def _synthesize_report(
        self,
        company: str,
        competitors: List[str],
        task_results: dict,
        session_id: str,
    ) -> str:
        """Final synthesis using the most capable model."""
        results_text = "\n\n".join([
            f"### {task_id}\n{result}"
            for task_id, result in task_results.items()
        ])
        prompt = f"""You are a senior strategy consultant.
Produce a complete executive report for {company} analyzing these competitors: {', '.join(competitors)}.

Data collected by the research and analysis agents:
{results_text[:8000]}

Required format (Markdown):
# Competitive Analysis: {company}
## Executive Summary
[The 3 most important insights as bullets]
## Competitive Landscape
[Comparison table of the competitors]
## Positioning and Gaps
[Where {company} differentiates and where it lags]
## SWOT Analysis
[Comparative SWOT focused on product]
## Differentiation Opportunities
[Top 5 concrete opportunities]
## Recommendations and Roadmap (90 days)
[Actions prioritized by impact]
## Appendix: Supporting Data
[Relevant data from the research]"""
        report = await self.llm.complete(
            prompt=prompt,
            model="claude-opus-4-5",  # the final synthesis deserves Opus
            max_tokens=4096,
        )
        return report
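`dag.topological_waves()` is what enables per-wave parallelism: each wave contains every task whose dependencies have already completed. A minimal sketch of how it can be implemented — the simplified `Task` shape here is an assumption, not the project's actual class:

```python
from dataclasses import dataclass, field
from typing import List

@dataclass
class Task:
    id: str
    depends_on: List[str] = field(default_factory=list)

def topological_waves(tasks: List[Task]) -> List[List[Task]]:
    """Kahn-style grouping: a wave = every task whose deps are all satisfied."""
    done: set[str] = set()
    remaining = list(tasks)
    waves: List[List[Task]] = []
    while remaining:
        wave = [t for t in remaining if set(t.depends_on) <= done]
        if not wave:  # no progress possible → the "DAG" has a cycle
            raise ValueError("Cycle detected in task DAG")
        waves.append(wave)
        done |= {t.id for t in wave}
        remaining = [t for t in remaining if t.id not in done]
    return waves

waves = topological_waves([
    Task("t1"), Task("t2"),
    Task("t3", depends_on=["t1", "t2"]),
    Task("t4", depends_on=["t3"]),
])
# waves → [[t1, t2], [t3], [t4]]
```

The orchestrator then runs each wave with `asyncio.gather`, so independent tasks (t1 and t2 above) execute concurrently while t3 waits for both.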
Tests — Unit, Integration, and E2E
# tests/unit/test_orchestrator.py — Tests without infrastructure
import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

from src.core.orchestrator import (
    CompetitiveIntelligenceOrchestrator,
    OrchestratorConfig,
)
@pytest.fixture
def mock_deps():
    """Creates mocked dependencies for unit tests."""
    llm = AsyncMock()
    llm.complete.return_value = """
    {
        "tasks": [
            {"id": "t1", "type": "research", "description": "Research Notion", "depends_on": []},
            {"id": "t2", "type": "analysis", "description": "Analyze the data", "depends_on": ["t1"]}
        ]
    }"""
    llm.estimate_cost.return_value = 0.001

    state = AsyncMock()
    state.get_task_state.return_value = None
    state.save_task_state.return_value = None

    metrics = MagicMock()
    notifications = AsyncMock()
    audit = MagicMock()
    return llm, state, metrics, notifications, audit
@pytest.fixture
def orchestrator(mock_deps):
    llm, state, metrics, notifications, audit = mock_deps
    config = OrchestratorConfig(budget_limit_usd=10.0)
    return CompetitiveIntelligenceOrchestrator(
        config=config,
        llm_adapter=llm,
        state_adapter=state,
        metrics_adapter=metrics,
        notification_adapter=notifications,
        audit_adapter=audit,
    )
@pytest.mark.asyncio
async def test_analyze_creates_session(orchestrator, mock_deps):
    """Verifies that analyze() creates a session in the state manager."""
    llm, state, *_ = mock_deps
    # Configure the LLM to return a report during synthesis
    llm.complete.return_value = "# Competitive Analysis\n..."

    result = await orchestrator.analyze(
        company="MinhaSaaS",
        competitors=["Notion"],
        objective="Understand differentiation",
    )
    assert result.session_id.startswith("sess_")
    assert state.save_task_state.called
@pytest.mark.asyncio
async def test_budget_hard_limit_stops_execution(mock_deps):
    """Verifies that the budget hard limit halts the workflow."""
    llm, state, metrics, notifications, audit = mock_deps
    # Zero budget → must fail immediately
    config = OrchestratorConfig(budget_limit_usd=0.0)
    orchestrator = CompetitiveIntelligenceOrchestrator(
        config=config,
        llm_adapter=llm,
        state_adapter=state,
        metrics_adapter=metrics,
        notification_adapter=notifications,
        audit_adapter=audit,
    )
    with pytest.raises(Exception):
        await orchestrator.analyze(
            company="Test",
            competitors=["Comp1"],
            objective="test",
        )
@pytest.mark.asyncio
async def test_retry_on_llm_failure(orchestrator, mock_deps):
    """Verifies retry with model escalation after a failure."""
    llm, *_ = mock_deps
    # The first call fails, the second succeeds
    llm.complete.side_effect = [
        Exception("Rate limit"),
        "# Analysis\n...",
    ]
    result = await orchestrator.analyze(
        company="Test",
        competitors=["Comp1"],
        objective="test",
    )
    assert llm.complete.call_count >= 2
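The retry path exercised by this test escalates to a stronger model between attempts. A sketch of what `get_escalated_model` and the backoff might look like — the model ladder below is a hypothetical example, not the project's actual configuration:

```python
# Hypothetical escalation ladder — model names here are illustrative assumptions.
ESCALATION_LADDER = ["model-small", "model-medium", "model-large"]

def get_escalated_model(current_model: str, attempt: int) -> str:
    """After a failure, move one step up the ladder (capped at the top)."""
    try:
        idx = ESCALATION_LADDER.index(current_model)
    except ValueError:
        return current_model  # unknown model: keep it as-is
    return ESCALATION_LADDER[min(idx + 1, len(ESCALATION_LADDER) - 1)]

def backoff_seconds(attempt: int, base: float = 2.0, cap: float = 30.0) -> float:
    """Exponential backoff (base ** attempt), capped so retries stay bounded."""
    return min(base ** attempt, cap)
```

Combined, a transient rate-limit error leads to a short sleep plus a more capable (and more expensive) model, which is usually cheaper than failing the whole workflow.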
# ============================================================
# INTEGRATION TEST (requires Docker)
# ============================================================
# tests/integration/test_redis_state.py
@pytest.mark.integration
@pytest.mark.asyncio
async def test_session_survives_restart():
    """Verifies that a session survives a worker restart."""
    from src.adapters.redis_state import RedisStateAdapter

    adapter = RedisStateAdapter("redis://localhost:6379")
    session_id = "test_sess_001"
    await adapter.save_task_state(session_id, "meta", {
        "status": "executing",
        "tasks_completed": 3,
    })

    # Simulate a restart: create a new adapter instance
    new_adapter = RedisStateAdapter("redis://localhost:6379")
    state = await new_adapter.get_task_state(session_id, "meta")
    assert state["status"] == "executing"
    assert state["tasks_completed"] == 3

    # Cleanup
    import redis.asyncio as aioredis
    r = aioredis.from_url("redis://localhost:6379")
    await r.delete(f"session:{session_id}:task:meta")
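The budget tests above rely on the `BudgetMonitor`'s soft/hard limit behavior: warn at the soft limit, abort at the hard limit. A minimal sketch of that contract — an illustration under stated assumptions, not the project's exact class:

```python
class BudgetExceededError(Exception):
    """Raised when the running cost reaches the hard limit."""

class BudgetMonitor:
    def __init__(self, soft_limit: float, hard_limit: float):
        self.soft_limit = soft_limit
        self.hard_limit = hard_limit
        self.spent = 0.0
        self.warned = False

    def record_cost(self, total_usd: float, session_id: str) -> None:
        # Called with the running total, as in the orchestrator's wave loop.
        self.spent = total_usd
        if self.spent >= self.hard_limit:
            raise BudgetExceededError(
                f"{session_id}: ${self.spent:.2f} >= hard limit ${self.hard_limit:.2f}"
            )
        if self.spent >= self.soft_limit and not self.warned:
            self.warned = True  # a real implementation would also alert (e.g. Slack)

monitor = BudgetMonitor(soft_limit=4.0, hard_limit=5.0)
monitor.record_cost(3.0, "sess_x")  # below both limits
monitor.record_cost(4.2, "sess_x")  # crosses the soft limit → warns once
```

Raising on the hard limit is what lets the orchestrator's `except` block mark the session failed and send a critical alert instead of silently overspending.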
Production Deployment — Dockerfile and CI/CD
# Dockerfile — Optimized multi-stage build
FROM python:3.11-slim AS builder
WORKDIR /app
RUN pip install uv
COPY pyproject.toml .
# dev extras are optional-dependencies and are not installed by default
RUN uv pip install --system -r pyproject.toml

FROM python:3.11-slim AS runtime
WORKDIR /app

# Copy only what is needed from the builder
COPY --from=builder /usr/local/lib/python3.11 /usr/local/lib/python3.11
COPY --from=builder /usr/local/bin /usr/local/bin

# Copy the source code
COPY src/ ./src/

# Non-root user for security
RUN useradd -r -u 1001 appuser
USER appuser

# Default environment variables (override in prod)
ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1
ENV PORT=8000

EXPOSE $PORT
# Prometheus metrics port (EXPOSE does not allow inline comments)
EXPOSE 8001

CMD ["uvicorn", "src.api.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]
---
# Dockerfile.worker — Separate worker image (needs its own builder stage)
FROM python:3.11-slim AS builder
WORKDIR /app
RUN pip install uv
COPY pyproject.toml .
RUN uv pip install --system -r pyproject.toml

FROM python:3.11-slim AS runtime
WORKDIR /app
COPY --from=builder /usr/local/lib/python3.11 /usr/local/lib/python3.11
COPY src/ ./src/
RUN useradd -r -u 1001 appuser
USER appuser
CMD ["python", "-m", "src.workers.task_worker"]
---
# .github/workflows/deploy.yml — CI/CD with GitHub Actions
name: Deploy to Production

on:
  push:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      redis:
        image: redis:7-alpine
        ports: ["6379:6379"]
      postgres:
        image: postgres:16-alpine
        env:
          POSTGRES_DB: test_db
          POSTGRES_USER: user
          POSTGRES_PASSWORD: pass
        ports: ["5432:5432"]
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install uv && uv pip install --system -e ".[dev]"
      - run: pytest tests/unit -v --cov=src --cov-report=xml
      - run: pytest tests/integration -v -m integration
        env:
          REDIS_URL: redis://localhost:6379
          DATABASE_URL: postgresql://user:pass@localhost:5432/test_db

  build-and-push:
    needs: test
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: docker/login-action@v3
        with:
          registry: ghcr.io
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - uses: docker/build-push-action@v5
        with:
          push: true
          tags: |
            ghcr.io/${{ github.repository }}:latest
            ghcr.io/${{ github.repository }}:${{ github.sha }}
          cache-from: type=gha
          cache-to: type=gha,mode=max

  deploy:
    needs: build-and-push
    runs-on: ubuntu-latest
    steps:
      - name: Deploy via SSH
        uses: appleboy/ssh-action@v1.0.0
        with:
          host: ${{ secrets.PROD_HOST }}
          username: deploy
          key: ${{ secrets.PROD_SSH_KEY }}
          script: |
            cd /opt/orchestrator
            docker compose pull
            docker compose up -d --remove-orphans
            docker compose exec orchestrator-api python -m pytest tests/e2e -v
            echo "Deploy finished: $(date)"
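The deploy step assumes a production compose file on the host at /opt/orchestrator. A hedged sketch of the shape infra/docker-compose.prod.yml might take — service names, image tags, and the OWNER/REPO placeholder are assumptions consistent with the CI job above, not the project's actual file:

```yaml
# infra/docker-compose.prod.yml — illustrative sketch
services:
  orchestrator-api:
    image: ghcr.io/OWNER/competitive-intelligence-orchestrator:latest
    env_file: .env
    ports: ["8000:8000", "8001:8001"]
    depends_on: [redis, postgres]
  task-worker:
    image: ghcr.io/OWNER/competitive-intelligence-orchestrator-worker:latest
    env_file: .env
    depends_on: [redis]
  redis:
    image: redis:7-alpine
  postgres:
    image: postgres:16-alpine
    env_file: .env
    volumes:
      - pgdata:/var/lib/postgresql/data
volumes:
  pgdata:
```

The service name `orchestrator-api` matches the `docker compose exec orchestrator-api …` call in the deploy script.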
Consolidation — What You Built in Track 5
By completing this track, you have mastered building a production-grade AI orchestrator. Each module was a layer stacked on the previous ones, forming a cohesive, robust system.
Layer 1 — Planning: you turned vague objectives into task DAGs with dependencies, priorities, and parallel execution waves.
Layer 2 — Routing: StaticRouter → LLMRouter → AgentPool with load balancing, plus CostAwareRouter for cost- and complexity-aware routing.
Layer 3 — State: Redis for ephemeral state (HSET, ZADD, pub/sub), Postgres for persistence, and a SessionManager with checkpoint/resume.
Layer 4 — Supervision: heartbeats, SLA monitoring, stall detection with fingerprinting, intelligent retry with model escalation, and auto-replanning.
Layer 5 — Governance: structured JSON logs, an immutable audit trail with SHA-256 hash chains, RBAC for agents, and budget monitoring with hard/soft limits.
Layer 6 — Observability: Prometheus + Grafana for metrics, SLIs/SLOs with error budgets, OpenTelemetry for distributed tracing, and logs/metrics/traces correlation.
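The tamper-evident audit trail mentioned above chains each record to the previous one with SHA-256, so any edit to history invalidates every later hash. A minimal sketch of the idea — an illustration, not the project's actual `AuditTrail` class:

```python
import hashlib
import json

class HashChainedLog:
    """Append-only log where each entry embeds the hash of the previous entry."""

    def __init__(self) -> None:
        self.entries: list[dict] = []
        self._last_hash = "0" * 64  # genesis hash

    def append(self, record: dict) -> None:
        entry = {"record": record, "prev_hash": self._last_hash}
        payload = json.dumps(entry, sort_keys=True).encode()
        entry["hash"] = hashlib.sha256(payload).hexdigest()
        self._last_hash = entry["hash"]
        self.entries.append(entry)

    def verify(self) -> bool:
        """Recomputes the chain; False if any record or link was altered."""
        prev = "0" * 64
        for e in self.entries:
            if e["prev_hash"] != prev:
                return False
            payload = json.dumps(
                {"record": e["record"], "prev_hash": e["prev_hash"]},
                sort_keys=True,
            ).encode()
            if hashlib.sha256(payload).hexdigest() != e["hash"]:
                return False
            prev = e["hash"]
        return True

log = HashChainedLog()
log.append({"action": "workflow_started", "agent_id": "orchestrator"})
log.append({"action": "workflow_completed", "agent_id": "orchestrator"})
assert log.verify()
```

Because each hash covers the previous one, rewriting an old entry requires rewriting every entry after it, which a periodic external checkpoint of the latest hash makes detectable.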
Next Steps — Track 6
In Track 6, you will learn how to operationalize and scale your orchestrator: Kubernetes with queue-based auto-scaling, multi-tenancy, disaster recovery, compliance (SOC 2, GDPR), and how to build a complete SaaS product on top of your orchestrator.
Track 5 Complete!
You have mastered building production-grade AI orchestrators. Planning, routing, state, supervision, governance, and observability — the 6 layers that turn a simple script into a resilient, auditable, cost-efficient system.