Visão Geral da Arquitetura Hexagonal
A arquitetura hexagonal (ports and adapters) separa o núcleo de lógica de negócio das dependências externas. Para um orquestrador de IA, isso significa que a lógica de planejamento e roteamento não conhece Redis, Postgres ou Prometheus diretamente — ela fala com interfaces (ports).
- • Trocar Redis por DynamoDB: só muda o Adapter
- • Testar sem infraestrutura: usa Mock adapters
- • Múltiplos LLMs: mesmo Router, Adapters diferentes
- • Deploy gradual: core estável, adapters evoluem
- • LangGraph — grafo de execução (C1/C4)
- • CrewAI — times de agentes especializados
- • Redis 7 — estado efêmero, pub/sub, queues
- • Postgres 16 — persistência, audit trail
- • Prometheus + Grafana — métricas/traces
LangGraph — Orquestração como Grafo de Estado
LangGraph modela o workflow do orquestrador como um grafo direcionado de estados. Cada nó é uma função que transforma o estado. Edges podem ser condicionais (baseados em resultado de LLM). É ideal para a Camada 1 (Planejamento) e Camada 4 (Supervisão/Replanejamento).
# pip install langgraph langchain-anthropic
from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, List, Annotated, Optional
import operator
import asyncio
# ============================================================
# DEFINIÇÃO DE ESTADO DO GRAFO
# ============================================================
class OrchestratorState(TypedDict):
"""Estado compartilhado entre todos os nós do grafo."""
session_id: str
objective: str
messages: Annotated[List, operator.add] # append-only
plan: Optional[dict]
current_tasks: List[dict]
completed_tasks: List[dict]
failed_tasks: List[dict]
retry_count: int
final_result: Optional[str]
should_escalate: bool
budget_remaining: float
# ============================================================
# NÓS DO GRAFO (cada nó é uma função async)
# ============================================================
llm_planner = ChatAnthropic(model="claude-3-haiku-20240307")
llm_executor = ChatAnthropic(model="claude-3-sonnet-20240229")
llm_supervisor = ChatAnthropic(model="claude-3-haiku-20240307")
async def plan_node(state: OrchestratorState) -> dict:
"""Nó 1: Planejamento — decompõe o objetivo em tarefas."""
response = await llm_planner.ainvoke([
HumanMessage(content=f"""
Decomponha o objetivo em 3-7 tarefas paralelas.
Objetivo: {state['objective']}
Responda em JSON:
{{"tasks": [{{"id": "t1", "type": "research", "target": "...", "priority": 1}}]}}
""")
])
import json, re
match = re.search(r'\{.*\}', response.content, re.DOTALL)
plan = json.loads(match.group()) if match else {"tasks": []}
return {
"plan": plan,
"current_tasks": plan.get("tasks", []),
"messages": [AIMessage(content=f"Plano criado: {len(plan.get('tasks', []))} tarefas")]
}
async def execute_node(state: OrchestratorState) -> dict:
"""Nó 2: Execução — processa as tarefas pendentes."""
tasks = state.get("current_tasks", [])
if not tasks:
return {"current_tasks": [], "completed_tasks": state.get("completed_tasks", [])}
# Executa primeiro task (em prod: paraleliza com asyncio.gather)
task = tasks[0]
try:
response = await llm_executor.ainvoke([
HumanMessage(content=f"Execute esta tarefa: {task}")
])
completed = state.get("completed_tasks", []) + [{**task, "result": response.content}]
return {
"current_tasks": tasks[1:],
"completed_tasks": completed,
"messages": [AIMessage(content=f"Tarefa {task['id']} concluída")]
}
except Exception as e:
failed = state.get("failed_tasks", []) + [{**task, "error": str(e)}]
return {
"current_tasks": tasks[1:],
"failed_tasks": failed,
}
async def supervise_node(state: OrchestratorState) -> dict:
"""Nó 3: Supervisão — verifica qualidade e decide próximo passo."""
failed = state.get("failed_tasks", [])
retry = state.get("retry_count", 0)
if failed and retry < 2:
# Reagenda tasks falhas
return {
"current_tasks": state.get("current_tasks", []) + failed,
"failed_tasks": [],
"retry_count": retry + 1,
"messages": [AIMessage(content=f"Reagendando {len(failed)} tarefas (retry {retry+1})")]
}
if failed and retry >= 2:
return {"should_escalate": True}
return {}
async def synthesize_node(state: OrchestratorState) -> dict:
"""Nó 4: Síntese — gera resultado final."""
completed = state.get("completed_tasks", [])
results_summary = "\n".join([f"- {t['id']}: {str(t.get('result',''))[:200]}" for t in completed])
response = await llm_executor.ainvoke([
HumanMessage(content=f"""
Sintetize os resultados das tarefas abaixo em um relatório coeso.
Objetivo original: {state['objective']}
Resultados:
{results_summary}
""")
])
return {"final_result": response.content}
# ============================================================
# EDGES CONDICIONAIS
# ============================================================
def route_after_execute(state: OrchestratorState) -> str:
"""Decide próximo nó após execução."""
pending = state.get("current_tasks", [])
if pending:
return "execute" # ainda tem tarefas → continua executando
return "supervise" # acabou → vai supervisionar
def route_after_supervise(state: OrchestratorState) -> str:
if state.get("should_escalate"):
return END # escala para humano → encerra
pending = state.get("current_tasks", [])
if pending:
return "execute" # retry tasks → executa de novo
return "synthesize" # tudo ok → sintetiza
# ============================================================
# CONSTRUÇÃO DO GRAFO
# ============================================================
def build_orchestrator_graph() -> StateGraph:
graph = StateGraph(OrchestratorState)
# Adiciona nós
graph.add_node("plan", plan_node)
graph.add_node("execute", execute_node)
graph.add_node("supervise", supervise_node)
graph.add_node("synthesize", synthesize_node)
# Edges fixos
graph.set_entry_point("plan")
graph.add_edge("plan", "execute")
# Edges condicionais
graph.add_conditional_edges(
"execute",
route_after_execute,
{"execute": "execute", "supervise": "supervise"}
)
graph.add_conditional_edges(
"supervise",
route_after_supervise,
{"execute": "execute", "synthesize": "synthesize", END: END}
)
graph.add_edge("synthesize", END)
return graph.compile()
# Uso:
# app = build_orchestrator_graph()
# result = await app.ainvoke({
# "session_id": "sess_001",
# "objective": "Análise competitiva do mercado SaaS B2B",
# "messages": [], "plan": None, "current_tasks": [],
# "completed_tasks": [], "failed_tasks": [],
# "retry_count": 0, "final_result": None,
# "should_escalate": False, "budget_remaining": 10.0
# })
CrewAI — Times de Agentes Especializados
CrewAI modela times de agentes com papéis, objetivos e ferramentas específicas. É ideal para tarefas que mapeiam naturalmente para "quem faz o quê" — um time de pesquisa + analista + redator, cada um com suas responsabilidades.
# pip install crewai crewai-tools
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, ScrapeWebsiteTool
from langchain_anthropic import ChatAnthropic
# ============================================================
# FERRAMENTAS DOS AGENTES
# ============================================================
web_search = SerperDevTool()
web_scraper = ScrapeWebsiteTool()
# Modelo para cada papel
haiku = ChatAnthropic(model="claude-3-haiku-20240307")
sonnet = ChatAnthropic(model="claude-3-sonnet-20240229")
opus = ChatAnthropic(model="claude-3-opus-20240229")
# ============================================================
# DEFINIÇÃO DO TIME
# ============================================================
class CompetitiveAnalysisCrew:
"""Time especializado em análise competitiva."""
def build_agents(self):
"""Cria os agentes do time."""
# Agente 1: Pesquisador — busca dados brutos
researcher = Agent(
role="Pesquisador de Mercado Sênior",
goal="Coletar dados precisos e atuais sobre competidores",
backstory="""Especialista em pesquisa de mercado com 10 anos
de experiência em SaaS B2B. Extremamente focado em fatos
verificáveis e fontes primárias.""",
tools=[web_search, web_scraper],
llm=haiku, # modelo mais barato para pesquisa
verbose=True,
max_iter=5,
memory=True, # lembra do contexto anterior
)
# Agente 2: Analista — interpreta os dados
analyst = Agent(
role="Analista Estratégico",
goal="Transformar dados brutos em insights acionáveis",
backstory="""MBA pela Wharton, ex-McKinsey. Especialista em
análise SWOT, Porter's Five Forces e posicionamento de mercado.
Identifica padrões que outros não veem.""",
tools=[], # análise é intelectual, não usa ferramentas
llm=sonnet, # modelo médio para análise
verbose=True,
allow_delegation=True, # pode delegar pesquisa adicional
)
# Agente 3: Redator — produz o relatório final
writer = Agent(
role="Redator Executivo",
goal="Produzir relatório claro e persuasivo para C-level",
backstory="""Especialista em comunicação executiva. Transforma
análises complexas em narrativas claras com recommendations
práticas e priorizadas por impacto.""",
tools=[],
llm=opus, # modelo mais potente para síntese
verbose=True,
)
return researcher, analyst, writer
def build_tasks(self, objective: str, companies: list, agents: tuple):
"""Cria as tarefas do time."""
researcher, analyst, writer = agents
research_task = Task(
description=f"""
Pesquise os seguintes competidores: {', '.join(companies)}
Para cada um, colete:
1. Modelo de negócio e pricing
2. Funcionalidades principais vs. secundárias
3. Posicionamento e mensagem de marketing
4. Reviews de clientes (G2, Capterra)
5. Notícias recentes (últimos 6 meses)
6. Estimativa de tamanho de time e funding
Objetivo geral: {objective}
""",
expected_output="Relatório de pesquisa estruturado com dados verificáveis por competidor",
agent=researcher,
)
analysis_task = Task(
description=f"""
Com base nos dados de pesquisa fornecidos:
1. Mapa de posicionamento (eixos preço vs. funcionalidade)
2. SWOT comparativo (nossa empresa vs. cada competidor)
3. Gaps de mercado não atendidos
4. Top 3 ameaças imediatas e top 3 oportunidades
5. Forecast: quem vai ganhar nos próximos 2 anos e por quê
Objetivo: {objective}
""",
expected_output="Análise estratégica com insights priorizados e evidenciados",
agent=analyst,
context=[research_task], # depende da pesquisa
)
report_task = Task(
description="""
Produza um relatório executivo de 5 páginas:
- Executive Summary (1 parágrafo, 3 bullets de recomendação)
- Landscape Competitivo (tabela visual)
- Análise SWOT
- Oportunidades de diferenciação
- Roadmap de resposta recomendado (90 dias)
- Apêndice: dados de suporte
Formato: Markdown estruturado, pronto para Notion/Confluence.
""",
expected_output="Relatório executivo completo em Markdown",
agent=writer,
context=[research_task, analysis_task],
)
return [research_task, analysis_task, report_task]
def run(self, objective: str, companies: list) -> str:
agents = self.build_agents()
tasks = self.build_tasks(objective, companies, agents)
crew = Crew(
agents=list(agents),
tasks=tasks,
process=Process.sequential, # researcher → analyst → writer
verbose=True,
memory=True, # memória compartilhada entre agentes
embedder={
"provider": "anthropic",
"config": {"model": "voyage-3"}
}
)
result = crew.kickoff()
return result.raw
# Uso:
# crew = CompetitiveAnalysisCrew()
# report = crew.run(
# objective="Entender posição competitiva no mercado de AI assistants B2B",
# companies=["Notion AI", "Jasper AI", "Copy.ai", "Writer.com"]
# )
Ports e Adapters — Interface entre Core e Infraestrutura
A chave da arquitetura hexagonal é definir interfaces (ports) que o core usa, e implementações (adapters) que conectam à infraestrutura real. O core nunca importa Redis, Postgres ou httpx diretamente.
from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List
from dataclasses import dataclass
# ============================================================
# PORTS (interfaces que o core conhece)
# ============================================================
class StatePort(ABC):
"""Port para gerenciamento de estado."""
@abstractmethod
async def save_task_state(self, session_id: str, task_id: str, state: dict) -> None:
pass
@abstractmethod
async def get_task_state(self, session_id: str, task_id: str) -> Optional[dict]:
pass
@abstractmethod
async def list_pending_tasks(self, session_id: str) -> List[str]:
pass
class LLMPort(ABC):
"""Port para chamadas LLM."""
@abstractmethod
async def complete(
self, messages: List[dict], model: str, max_tokens: int = 2048
) -> str:
pass
@abstractmethod
def estimate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
pass
class MetricsPort(ABC):
"""Port para emissão de métricas."""
@abstractmethod
def increment_counter(self, name: str, labels: dict, value: float = 1.0) -> None:
pass
@abstractmethod
def observe_histogram(self, name: str, value: float, labels: dict) -> None:
pass
@abstractmethod
def set_gauge(self, name: str, value: float, labels: dict) -> None:
pass
class NotificationPort(ABC):
"""Port para notificações (Slack, email, etc)."""
@abstractmethod
async def send_alert(self, channel: str, message: str, severity: str) -> None:
pass
# ============================================================
# ADAPTERS CONCRETOS
# ============================================================
import redis.asyncio as aioredis
class RedisStateAdapter(StatePort):
"""Adapter de estado usando Redis."""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = aioredis.from_url(redis_url, decode_responses=True)
async def save_task_state(self, session_id: str, task_id: str, state: dict) -> None:
import json
key = f"session:{session_id}:task:{task_id}"
await self.redis.setex(key, 3600, json.dumps(state))
async def get_task_state(self, session_id: str, task_id: str) -> Optional[dict]:
import json
key = f"session:{session_id}:task:{task_id}"
data = await self.redis.get(key)
return json.loads(data) if data else None
async def list_pending_tasks(self, session_id: str) -> List[str]:
pattern = f"session:{session_id}:task:*"
keys = await self.redis.keys(pattern)
return [k.split(":")[-1] for k in keys]
class MockStateAdapter(StatePort):
"""Adapter de estado em memória para testes."""
def __init__(self):
self._store: Dict[str, dict] = {}
async def save_task_state(self, session_id: str, task_id: str, state: dict) -> None:
self._store[f"{session_id}:{task_id}"] = state
async def get_task_state(self, session_id: str, task_id: str) -> Optional[dict]:
return self._store.get(f"{session_id}:{task_id}")
async def list_pending_tasks(self, session_id: str) -> List[str]:
prefix = f"{session_id}:"
return [k.split(":")[1] for k in self._store if k.startswith(prefix)]
class AnthropicLLMAdapter(LLMPort):
"""Adapter LLM para Anthropic Claude."""
MODEL_COSTS = {
"claude-3-haiku-20240307": {"input": 0.25, "output": 1.25},
"claude-3-sonnet-20240229": {"input": 3.0, "output": 15.0},
"claude-3-opus-20240229": {"input": 15.0, "output": 75.0},
}
def __init__(self, api_key: str):
import anthropic
self.client = anthropic.AsyncAnthropic(api_key=api_key)
async def complete(self, messages: List[dict], model: str, max_tokens: int = 2048) -> str:
resp = await self.client.messages.create(
model=model,
max_tokens=max_tokens,
messages=messages
)
return resp.content[0].text
def estimate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
costs = self.MODEL_COSTS.get(model, {"input": 3.0, "output": 15.0})
return (input_tokens * costs["input"] + output_tokens * costs["output"]) / 1_000_000
class PrometheusMetricsAdapter(MetricsPort):
"""Adapter de métricas para Prometheus."""
def __init__(self):
from prometheus_client import Counter, Histogram, Gauge
self._counters: Dict[str, Counter] = {}
self._histograms: Dict[str, Histogram] = {}
self._gauges: Dict[str, Gauge] = {}
def increment_counter(self, name: str, labels: dict, value: float = 1.0) -> None:
from prometheus_client import Counter
if name not in self._counters:
self._counters[name] = Counter(name, name, list(labels.keys()))
self._counters[name].labels(**labels).inc(value)
def observe_histogram(self, name: str, value: float, labels: dict) -> None:
from prometheus_client import Histogram
if name not in self._histograms:
self._histograms[name] = Histogram(name, name, list(labels.keys()))
self._histograms[name].labels(**labels).observe(value)
def set_gauge(self, name: str, value: float, labels: dict) -> None:
from prometheus_client import Gauge
if name not in self._gauges:
self._gauges[name] = Gauge(name, name, list(labels.keys()))
self._gauges[name].labels(**labels).set(value)
# ============================================================
# DEPENDENCY INJECTION — montagem da aplicação
# ============================================================
@dataclass
class OrchestratorDeps:
"""Container de dependências — injetado no core."""
state: StatePort
llm: LLMPort
metrics: MetricsPort
notifications: NotificationPort
def create_production_deps(config: dict) -> OrchestratorDeps:
"""Factory para dependências de produção."""
# Import aqui para não quebrar em testes
from slack_adapter import SlackNotificationAdapter
return OrchestratorDeps(
state=RedisStateAdapter(config["redis_url"]),
llm=AnthropicLLMAdapter(config["anthropic_api_key"]),
metrics=PrometheusMetricsAdapter(),
notifications=SlackNotificationAdapter(config["slack_webhook"])
)
def create_test_deps() -> OrchestratorDeps:
"""Factory para dependências de teste (sem infraestrutura)."""
from unittest.mock import AsyncMock
mock_notif = AsyncMock(spec=NotificationPort)
mock_llm = AsyncMock(spec=LLMPort)
mock_llm.complete.return_value = '{"tasks": [{"id": "t1", "type": "research"}]}'
mock_llm.estimate_cost.return_value = 0.001
return OrchestratorDeps(
state=MockStateAdapter(),
llm=mock_llm,
metrics=type('NoopMetrics', (MetricsPort,), {
'increment_counter': lambda *a, **k: None,
'observe_histogram': lambda *a, **k: None,
'set_gauge': lambda *a, **k: None,
})(),
notifications=mock_notif
)
FastAPI — Entry Point e API REST do Orquestrador
A API REST é o driver principal do orquestrador — o ponto de entrada para sistemas externos. FastAPI com WebSocket para updates em tempo real e endpoints para gestão de sessões.
# pip install fastapi uvicorn websockets
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import asyncio
import uuid
import os
app = FastAPI(
title="AI Orchestrator API",
version="1.0.0",
docs_url="/docs"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# Dependency injection via FastAPI
def get_deps() -> OrchestratorDeps:
config = {
"redis_url": os.getenv("REDIS_URL", "redis://localhost:6379"),
"anthropic_api_key": os.getenv("ANTHROPIC_API_KEY"),
"slack_webhook": os.getenv("SLACK_WEBHOOK_URL"),
}
return create_production_deps(config)
# ============================================================
# MODELOS DE REQUEST/RESPONSE
# ============================================================
class WorkflowRequest(BaseModel):
objective: str
priority: str = "normal" # low / normal / high / critical
department: str = "default"
max_budget_usd: float = 5.0
companies: Optional[list] = None # para análise competitiva
class WorkflowResponse(BaseModel):
session_id: str
status: str
message: str
class SessionStatus(BaseModel):
session_id: str
status: str # planning / executing / completed / failed
progress_pct: int
tasks_completed: int
tasks_total: int
cost_so_far: float
elapsed_seconds: float
result: Optional[str]
# ============================================================
# ENDPOINTS
# ============================================================
@app.post("/workflows", response_model=WorkflowResponse)
async def create_workflow(
request: WorkflowRequest,
deps: OrchestratorDeps = Depends(get_deps)
):
"""Cria e inicia um novo workflow de orquestração."""
session_id = f"sess_{uuid.uuid4().hex[:12]}"
# Salva estado inicial
await deps.state.save_task_state(session_id, "meta", {
"objective": request.objective,
"status": "planning",
"priority": request.priority,
"department": request.department,
"budget_limit": request.max_budget_usd,
"started_at": __import__('time').time()
})
# Inicia workflow em background
asyncio.create_task(
run_workflow(session_id, request, deps)
)
deps.metrics.increment_counter(
"orchestrator_workflows_created",
{"priority": request.priority, "department": request.department}
)
return WorkflowResponse(
session_id=session_id,
status="started",
message=f"Workflow iniciado. Acompanhe em /sessions/{session_id}"
)
@app.get("/sessions/{session_id}", response_model=SessionStatus)
async def get_session_status(
session_id: str,
deps: OrchestratorDeps = Depends(get_deps)
):
"""Retorna status atual de uma sessão."""
meta = await deps.state.get_task_state(session_id, "meta")
if not meta:
raise HTTPException(status_code=404, detail="Sessão não encontrada")
import time
return SessionStatus(
session_id=session_id,
status=meta.get("status", "unknown"),
progress_pct=meta.get("progress_pct", 0),
tasks_completed=meta.get("tasks_completed", 0),
tasks_total=meta.get("tasks_total", 0),
cost_so_far=meta.get("cost_so_far", 0.0),
elapsed_seconds=time.time() - meta.get("started_at", time.time()),
result=meta.get("final_result")
)
@app.websocket("/sessions/{session_id}/stream")
async def stream_session(
websocket: WebSocket,
session_id: str,
deps: OrchestratorDeps = Depends(get_deps)
):
"""WebSocket para receber updates em tempo real."""
await websocket.accept()
try:
while True:
meta = await deps.state.get_task_state(session_id, "meta")
if meta:
await websocket.send_json(meta)
if meta.get("status") in ["completed", "failed", "escalated"]:
break
await asyncio.sleep(2) # poll a cada 2 segundos
except WebSocketDisconnect:
pass
@app.get("/health")
async def health():
return {"status": "ok", "version": "1.0.0"}
async def run_workflow(session_id: str, request: WorkflowRequest, deps: OrchestratorDeps):
"""Executa workflow completo em background."""
# Em prod: instancia o grafo LangGraph e executa
# orchestrator = build_orchestrator_graph()
# result = await orchestrator.ainvoke(initial_state)
pass
Docker Compose — Deploy Completo do Stack
Com toda a arquitetura definida, o deploy em produção envolve orquestrar todos os componentes: a API, os workers assíncronos, Redis, Postgres e o stack de observabilidade.
# docker-compose.prod.yml — Deploy completo
version: '3.8'
services:
# API do orquestrador
orchestrator-api:
build:
context: .
dockerfile: Dockerfile
ports:
- "8000:8000"
- "8001:8001" # Prometheus metrics
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- REDIS_URL=redis://redis:6379
- DATABASE_URL=postgresql://user:pass@postgres:5432/orchestrator
- SLACK_WEBHOOK_URL=${SLACK_WEBHOOK_URL}
- PROMETHEUS_PORT=8001
- ENVIRONMENT=production
depends_on:
- redis
- postgres
restart: unless-stopped
networks:
- app
- monitoring
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
interval: 30s
timeout: 10s
retries: 3
# Workers assíncronos para processamento de tasks
orchestrator-worker:
build:
context: .
dockerfile: Dockerfile.worker
command: python worker.py
environment:
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- REDIS_URL=redis://redis:6379
- DATABASE_URL=postgresql://user:pass@postgres:5432/orchestrator
depends_on:
- redis
- postgres
restart: unless-stopped
deploy:
replicas: 3 # 3 workers paralelos
networks:
- app
# Redis 7 com persistência
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
- ./redis.conf:/etc/redis/redis.conf
command: redis-server /etc/redis/redis.conf
restart: unless-stopped
networks:
- app
# Postgres 16
postgres:
image: postgres:16-alpine
environment:
- POSTGRES_DB=orchestrator
- POSTGRES_USER=user
- POSTGRES_PASSWORD=pass
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
restart: unless-stopped
networks:
- app
# Observabilidade
prometheus:
image: prom/prometheus:latest
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
- prometheus_data:/prometheus
networks:
- monitoring
grafana:
image: grafana/grafana:latest
ports:
- "3000:3000"
volumes:
- grafana_data:/var/lib/grafana
- ./grafana/provisioning:/etc/grafana/provisioning
networks:
- monitoring
tempo:
image: grafana/tempo:latest
volumes:
- ./tempo.yml:/etc/tempo.yml
- tempo_data:/tmp/tempo
networks:
- monitoring
loki:
image: grafana/loki:latest
volumes:
- loki_data:/loki
networks:
- monitoring
# Nginx como reverse proxy
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./certs:/etc/nginx/certs
depends_on:
- orchestrator-api
networks:
- app
volumes:
redis_data:
postgres_data:
prometheus_data:
grafana_data:
tempo_data:
loki_data:
networks:
app:
driver: bridge
monitoring:
driver: bridge
Resumo do Módulo 5.9
- • Arquitetura hexagonal: core puro de domínio + adapters para infraestrutura — testável e evolutiva
- • LangGraph para workflows com branching condicional (replanejamento, retry, escalada)
- • CrewAI para times especializados com papéis, memória compartilhada e delegação
- • Ports/Adapters: StatePort, LLMPort, MetricsPort — trocar Redis por DynamoDB = 1 arquivo
- • FastAPI com WebSocket para updates em tempo real + REST para integração
- • Docker Compose com 3 workers paralelos, Redis persistente, Postgres, e stack de obs completo