MÓDULO 5.9 Arquitetura Completa Integração

🏛️ Arquitetura Composta — LangGraph + CrewAI + Redis + Postgres

Integração das 6 camadas em uma arquitetura hexagonal completa. LangGraph para orquestração de grafos, CrewAI para times de agentes, Redis para estado efêmero, Postgres para persistência, Prometheus para métricas.

6
Seções
~60min
Duração
Avançado
Nível
Stack
Completo
1

Visão Geral da Arquitetura Hexagonal

A arquitetura hexagonal (ports and adapters) separa o núcleo de lógica de negócio das dependências externas. Para um orquestrador de IA, isso significa que a lógica de planejamento e roteamento não conhece Redis, Postgres ou Prometheus diretamente — ela fala com interfaces (ports).

ARQUITETURA HEXAGONAL DO ORQUESTRADOR
DRIVERS (Input)
FastAPI REST
WebSocket
CLI Interface
Task Queue
CORE (Domain)
Planner (C1)
Router (C2)
StateManager (C3)
Supervisor (C4)
Governor (C5)
Observer (C6)
DRIVEN (Output)
Redis Adapter
Postgres Adapter
Prometheus Adapter
LLM Adapter
Slack Adapter
Ports são interfaces Python (Protocol/ABC). Adapters são implementações concretas.
Por que Hexagonal?
  • • Trocar Redis por DynamoDB: só muda o Adapter
  • • Testar sem infraestrutura: usa Mock adapters
  • • Múltiplos LLMs: mesmo Router, Adapters diferentes
  • • Deploy gradual: core estável, adapters evoluem
Stack de Referência
  • • LangGraph — grafo de execução (C1/C4)
  • • CrewAI — times de agentes especializados
  • • Redis 7 — estado efêmero, pub/sub, queues
  • • Postgres 16 — persistência, audit trail
  • • Prometheus + Grafana — métricas/traces
2

LangGraph — Orquestração como Grafo de Estado

LangGraph modela o workflow do orquestrador como um grafo direcionado de estados. Cada nó é uma função que transforma o estado. Edges podem ser condicionais (baseados em resultado de LLM). É ideal para a Camada 1 (Planejamento) e Camada 4 (Supervisão/Replanejamento).

# pip install langgraph langchain-anthropic

from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, AIMessage
from typing import TypedDict, List, Annotated, Optional
import operator
import asyncio

# ============================================================
# DEFINIÇÃO DE ESTADO DO GRAFO
# ============================================================

class OrchestratorState(TypedDict):
    """Estado compartilhado entre todos os nós do grafo."""
    session_id: str
    objective: str
    messages: Annotated[List, operator.add]  # append-only
    plan: Optional[dict]
    current_tasks: List[dict]
    completed_tasks: List[dict]
    failed_tasks: List[dict]
    retry_count: int
    final_result: Optional[str]
    should_escalate: bool
    budget_remaining: float


# ============================================================
# NÓS DO GRAFO (cada nó é uma função async)
# ============================================================

llm_planner = ChatAnthropic(model="claude-3-haiku-20240307")
llm_executor = ChatAnthropic(model="claude-3-sonnet-20240229")
llm_supervisor = ChatAnthropic(model="claude-3-haiku-20240307")


async def plan_node(state: OrchestratorState) -> dict:
    """Nó 1: Planejamento — decompõe o objetivo em tarefas."""
    response = await llm_planner.ainvoke([
        HumanMessage(content=f"""
Decomponha o objetivo em 3-7 tarefas paralelas.
Objetivo: {state['objective']}

Responda em JSON:
{{"tasks": [{{"id": "t1", "type": "research", "target": "...", "priority": 1}}]}}
        """)
    ])

    import json, re
    match = re.search(r'\{.*\}', response.content, re.DOTALL)
    plan = json.loads(match.group()) if match else {"tasks": []}

    return {
        "plan": plan,
        "current_tasks": plan.get("tasks", []),
        "messages": [AIMessage(content=f"Plano criado: {len(plan.get('tasks', []))} tarefas")]
    }


async def execute_node(state: OrchestratorState) -> dict:
    """Nó 2: Execução — processa as tarefas pendentes."""
    tasks = state.get("current_tasks", [])
    if not tasks:
        return {"current_tasks": [], "completed_tasks": state.get("completed_tasks", [])}

    # Executa primeiro task (em prod: paraleliza com asyncio.gather)
    task = tasks[0]
    try:
        response = await llm_executor.ainvoke([
            HumanMessage(content=f"Execute esta tarefa: {task}")
        ])
        completed = state.get("completed_tasks", []) + [{**task, "result": response.content}]
        return {
            "current_tasks": tasks[1:],
            "completed_tasks": completed,
            "messages": [AIMessage(content=f"Tarefa {task['id']} concluída")]
        }
    except Exception as e:
        failed = state.get("failed_tasks", []) + [{**task, "error": str(e)}]
        return {
            "current_tasks": tasks[1:],
            "failed_tasks": failed,
        }


async def supervise_node(state: OrchestratorState) -> dict:
    """Nó 3: Supervisão — verifica qualidade e decide próximo passo."""
    failed = state.get("failed_tasks", [])
    retry = state.get("retry_count", 0)

    if failed and retry < 2:
        # Reagenda tasks falhas
        return {
            "current_tasks": state.get("current_tasks", []) + failed,
            "failed_tasks": [],
            "retry_count": retry + 1,
            "messages": [AIMessage(content=f"Reagendando {len(failed)} tarefas (retry {retry+1})")]
        }

    if failed and retry >= 2:
        return {"should_escalate": True}

    return {}


async def synthesize_node(state: OrchestratorState) -> dict:
    """Nó 4: Síntese — gera resultado final."""
    completed = state.get("completed_tasks", [])
    results_summary = "\n".join([f"- {t['id']}: {str(t.get('result',''))[:200]}" for t in completed])

    response = await llm_executor.ainvoke([
        HumanMessage(content=f"""
Sintetize os resultados das tarefas abaixo em um relatório coeso.
Objetivo original: {state['objective']}

Resultados:
{results_summary}
        """)
    ])
    return {"final_result": response.content}


# ============================================================
# EDGES CONDICIONAIS
# ============================================================

def route_after_execute(state: OrchestratorState) -> str:
    """Decide próximo nó após execução."""
    pending = state.get("current_tasks", [])
    if pending:
        return "execute"          # ainda tem tarefas → continua executando
    return "supervise"            # acabou → vai supervisionar


def route_after_supervise(state: OrchestratorState) -> str:
    if state.get("should_escalate"):
        return END                # escala para humano → encerra
    pending = state.get("current_tasks", [])
    if pending:
        return "execute"          # retry tasks → executa de novo
    return "synthesize"           # tudo ok → sintetiza


# ============================================================
# CONSTRUÇÃO DO GRAFO
# ============================================================

def build_orchestrator_graph() -> StateGraph:
    graph = StateGraph(OrchestratorState)

    # Adiciona nós
    graph.add_node("plan", plan_node)
    graph.add_node("execute", execute_node)
    graph.add_node("supervise", supervise_node)
    graph.add_node("synthesize", synthesize_node)

    # Edges fixos
    graph.set_entry_point("plan")
    graph.add_edge("plan", "execute")

    # Edges condicionais
    graph.add_conditional_edges(
        "execute",
        route_after_execute,
        {"execute": "execute", "supervise": "supervise"}
    )
    graph.add_conditional_edges(
        "supervise",
        route_after_supervise,
        {"execute": "execute", "synthesize": "synthesize", END: END}
    )
    graph.add_edge("synthesize", END)

    return graph.compile()


# Uso:
# app = build_orchestrator_graph()
# result = await app.ainvoke({
#     "session_id": "sess_001",
#     "objective": "Análise competitiva do mercado SaaS B2B",
#     "messages": [], "plan": None, "current_tasks": [],
#     "completed_tasks": [], "failed_tasks": [],
#     "retry_count": 0, "final_result": None,
#     "should_escalate": False, "budget_remaining": 10.0
# })
3

CrewAI — Times de Agentes Especializados

CrewAI modela times de agentes com papéis, objetivos e ferramentas específicas. É ideal para tarefas que mapeiam naturalmente para "quem faz o quê" — um time de pesquisa + analista + redator, cada um com suas responsabilidades.

# pip install crewai crewai-tools

from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, ScrapeWebsiteTool
from langchain_anthropic import ChatAnthropic

# ============================================================
# FERRAMENTAS DOS AGENTES
# ============================================================

web_search = SerperDevTool()
web_scraper = ScrapeWebsiteTool()

# Modelo para cada papel
haiku = ChatAnthropic(model="claude-3-haiku-20240307")
sonnet = ChatAnthropic(model="claude-3-sonnet-20240229")
opus = ChatAnthropic(model="claude-3-opus-20240229")


# ============================================================
# DEFINIÇÃO DO TIME
# ============================================================

class CompetitiveAnalysisCrew:
    """Time especializado em análise competitiva."""

    def build_agents(self):
        """Cria os agentes do time."""

        # Agente 1: Pesquisador — busca dados brutos
        researcher = Agent(
            role="Pesquisador de Mercado Sênior",
            goal="Coletar dados precisos e atuais sobre competidores",
            backstory="""Especialista em pesquisa de mercado com 10 anos
            de experiência em SaaS B2B. Extremamente focado em fatos
            verificáveis e fontes primárias.""",
            tools=[web_search, web_scraper],
            llm=haiku,  # modelo mais barato para pesquisa
            verbose=True,
            max_iter=5,
            memory=True,  # lembra do contexto anterior
        )

        # Agente 2: Analista — interpreta os dados
        analyst = Agent(
            role="Analista Estratégico",
            goal="Transformar dados brutos em insights acionáveis",
            backstory="""MBA pela Wharton, ex-McKinsey. Especialista em
            análise SWOT, Porter's Five Forces e posicionamento de mercado.
            Identifica padrões que outros não veem.""",
            tools=[],  # análise é intelectual, não usa ferramentas
            llm=sonnet,  # modelo médio para análise
            verbose=True,
            allow_delegation=True,  # pode delegar pesquisa adicional
        )

        # Agente 3: Redator — produz o relatório final
        writer = Agent(
            role="Redator Executivo",
            goal="Produzir relatório claro e persuasivo para C-level",
            backstory="""Especialista em comunicação executiva. Transforma
            análises complexas em narrativas claras com recommendations
            práticas e priorizadas por impacto.""",
            tools=[],
            llm=opus,  # modelo mais potente para síntese
            verbose=True,
        )

        return researcher, analyst, writer

    def build_tasks(self, objective: str, companies: list, agents: tuple):
        """Cria as tarefas do time."""
        researcher, analyst, writer = agents

        research_task = Task(
            description=f"""
Pesquise os seguintes competidores: {', '.join(companies)}
Para cada um, colete:
1. Modelo de negócio e pricing
2. Funcionalidades principais vs. secundárias
3. Posicionamento e mensagem de marketing
4. Reviews de clientes (G2, Capterra)
5. Notícias recentes (últimos 6 meses)
6. Estimativa de tamanho de time e funding

Objetivo geral: {objective}
            """,
            expected_output="Relatório de pesquisa estruturado com dados verificáveis por competidor",
            agent=researcher,
        )

        analysis_task = Task(
            description=f"""
Com base nos dados de pesquisa fornecidos:
1. Mapa de posicionamento (eixos preço vs. funcionalidade)
2. SWOT comparativo (nossa empresa vs. cada competidor)
3. Gaps de mercado não atendidos
4. Top 3 ameaças imediatas e top 3 oportunidades
5. Forecast: quem vai ganhar nos próximos 2 anos e por quê

Objetivo: {objective}
            """,
            expected_output="Análise estratégica com insights priorizados e evidenciados",
            agent=analyst,
            context=[research_task],  # depende da pesquisa
        )

        report_task = Task(
            description="""
Produza um relatório executivo de 5 páginas:
- Executive Summary (1 parágrafo, 3 bullets de recomendação)
- Landscape Competitivo (tabela visual)
- Análise SWOT
- Oportunidades de diferenciação
- Roadmap de resposta recomendado (90 dias)
- Apêndice: dados de suporte

Formato: Markdown estruturado, pronto para Notion/Confluence.
            """,
            expected_output="Relatório executivo completo em Markdown",
            agent=writer,
            context=[research_task, analysis_task],
        )

        return [research_task, analysis_task, report_task]

    def run(self, objective: str, companies: list) -> str:
        agents = self.build_agents()
        tasks = self.build_tasks(objective, companies, agents)

        crew = Crew(
            agents=list(agents),
            tasks=tasks,
            process=Process.sequential,  # researcher → analyst → writer
            verbose=True,
            memory=True,           # memória compartilhada entre agentes
            embedder={
                "provider": "anthropic",
                "config": {"model": "voyage-3"}
            }
        )

        result = crew.kickoff()
        return result.raw


# Uso:
# crew = CompetitiveAnalysisCrew()
# report = crew.run(
#     objective="Entender posição competitiva no mercado de AI assistants B2B",
#     companies=["Notion AI", "Jasper AI", "Copy.ai", "Writer.com"]
# )
4

Ports e Adapters — Interface entre Core e Infraestrutura

A chave da arquitetura hexagonal é definir interfaces (ports) que o core usa, e implementações (adapters) que conectam à infraestrutura real. O core nunca importa Redis, Postgres ou httpx diretamente.

from abc import ABC, abstractmethod
from typing import Optional, Dict, Any, List
from dataclasses import dataclass

# ============================================================
# PORTS (interfaces que o core conhece)
# ============================================================

class StatePort(ABC):
    """Port para gerenciamento de estado."""

    @abstractmethod
    async def save_task_state(self, session_id: str, task_id: str, state: dict) -> None:
        pass

    @abstractmethod
    async def get_task_state(self, session_id: str, task_id: str) -> Optional[dict]:
        pass

    @abstractmethod
    async def list_pending_tasks(self, session_id: str) -> List[str]:
        pass


class LLMPort(ABC):
    """Port para chamadas LLM."""

    @abstractmethod
    async def complete(
        self, messages: List[dict], model: str, max_tokens: int = 2048
    ) -> str:
        pass

    @abstractmethod
    def estimate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        pass


class MetricsPort(ABC):
    """Port para emissão de métricas."""

    @abstractmethod
    def increment_counter(self, name: str, labels: dict, value: float = 1.0) -> None:
        pass

    @abstractmethod
    def observe_histogram(self, name: str, value: float, labels: dict) -> None:
        pass

    @abstractmethod
    def set_gauge(self, name: str, value: float, labels: dict) -> None:
        pass


class NotificationPort(ABC):
    """Port para notificações (Slack, email, etc)."""

    @abstractmethod
    async def send_alert(self, channel: str, message: str, severity: str) -> None:
        pass


# ============================================================
# ADAPTERS CONCRETOS
# ============================================================

import redis.asyncio as aioredis

class RedisStateAdapter(StatePort):
    """Adapter de estado usando Redis."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = aioredis.from_url(redis_url, decode_responses=True)

    async def save_task_state(self, session_id: str, task_id: str, state: dict) -> None:
        import json
        key = f"session:{session_id}:task:{task_id}"
        await self.redis.setex(key, 3600, json.dumps(state))

    async def get_task_state(self, session_id: str, task_id: str) -> Optional[dict]:
        import json
        key = f"session:{session_id}:task:{task_id}"
        data = await self.redis.get(key)
        return json.loads(data) if data else None

    async def list_pending_tasks(self, session_id: str) -> List[str]:
        pattern = f"session:{session_id}:task:*"
        keys = await self.redis.keys(pattern)
        return [k.split(":")[-1] for k in keys]


class MockStateAdapter(StatePort):
    """Adapter de estado em memória para testes."""

    def __init__(self):
        self._store: Dict[str, dict] = {}

    async def save_task_state(self, session_id: str, task_id: str, state: dict) -> None:
        self._store[f"{session_id}:{task_id}"] = state

    async def get_task_state(self, session_id: str, task_id: str) -> Optional[dict]:
        return self._store.get(f"{session_id}:{task_id}")

    async def list_pending_tasks(self, session_id: str) -> List[str]:
        prefix = f"{session_id}:"
        return [k.split(":")[1] for k in self._store if k.startswith(prefix)]


class AnthropicLLMAdapter(LLMPort):
    """Adapter LLM para Anthropic Claude."""

    MODEL_COSTS = {
        "claude-3-haiku-20240307":    {"input": 0.25,  "output": 1.25},
        "claude-3-sonnet-20240229":   {"input": 3.0,   "output": 15.0},
        "claude-3-opus-20240229":     {"input": 15.0,  "output": 75.0},
    }

    def __init__(self, api_key: str):
        import anthropic
        self.client = anthropic.AsyncAnthropic(api_key=api_key)

    async def complete(self, messages: List[dict], model: str, max_tokens: int = 2048) -> str:
        resp = await self.client.messages.create(
            model=model,
            max_tokens=max_tokens,
            messages=messages
        )
        return resp.content[0].text

    def estimate_cost(self, input_tokens: int, output_tokens: int, model: str) -> float:
        costs = self.MODEL_COSTS.get(model, {"input": 3.0, "output": 15.0})
        return (input_tokens * costs["input"] + output_tokens * costs["output"]) / 1_000_000


class PrometheusMetricsAdapter(MetricsPort):
    """Adapter de métricas para Prometheus."""

    def __init__(self):
        from prometheus_client import Counter, Histogram, Gauge
        self._counters: Dict[str, Counter] = {}
        self._histograms: Dict[str, Histogram] = {}
        self._gauges: Dict[str, Gauge] = {}

    def increment_counter(self, name: str, labels: dict, value: float = 1.0) -> None:
        from prometheus_client import Counter
        if name not in self._counters:
            self._counters[name] = Counter(name, name, list(labels.keys()))
        self._counters[name].labels(**labels).inc(value)

    def observe_histogram(self, name: str, value: float, labels: dict) -> None:
        from prometheus_client import Histogram
        if name not in self._histograms:
            self._histograms[name] = Histogram(name, name, list(labels.keys()))
        self._histograms[name].labels(**labels).observe(value)

    def set_gauge(self, name: str, value: float, labels: dict) -> None:
        from prometheus_client import Gauge
        if name not in self._gauges:
            self._gauges[name] = Gauge(name, name, list(labels.keys()))
        self._gauges[name].labels(**labels).set(value)


# ============================================================
# DEPENDENCY INJECTION — montagem da aplicação
# ============================================================

@dataclass
class OrchestratorDeps:
    """Container de dependências — injetado no core."""
    state: StatePort
    llm: LLMPort
    metrics: MetricsPort
    notifications: NotificationPort


def create_production_deps(config: dict) -> OrchestratorDeps:
    """Factory para dependências de produção."""
    # Import aqui para não quebrar em testes
    from slack_adapter import SlackNotificationAdapter

    return OrchestratorDeps(
        state=RedisStateAdapter(config["redis_url"]),
        llm=AnthropicLLMAdapter(config["anthropic_api_key"]),
        metrics=PrometheusMetricsAdapter(),
        notifications=SlackNotificationAdapter(config["slack_webhook"])
    )


def create_test_deps() -> OrchestratorDeps:
    """Factory para dependências de teste (sem infraestrutura)."""
    from unittest.mock import AsyncMock

    mock_notif = AsyncMock(spec=NotificationPort)
    mock_llm = AsyncMock(spec=LLMPort)
    mock_llm.complete.return_value = '{"tasks": [{"id": "t1", "type": "research"}]}'
    mock_llm.estimate_cost.return_value = 0.001

    return OrchestratorDeps(
        state=MockStateAdapter(),
        llm=mock_llm,
        metrics=type('NoopMetrics', (MetricsPort,), {
            'increment_counter': lambda *a, **k: None,
            'observe_histogram': lambda *a, **k: None,
            'set_gauge': lambda *a, **k: None,
        })(),
        notifications=mock_notif
    )
5

FastAPI — Entry Point e API REST do Orquestrador

A API REST é o driver principal do orquestrador — o ponto de entrada para sistemas externos. FastAPI com WebSocket para updates em tempo real e endpoints para gestão de sessões.

# pip install fastapi uvicorn websockets

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional
import asyncio
import uuid
import os

app = FastAPI(
    title="AI Orchestrator API",
    version="1.0.0",
    docs_url="/docs"
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Dependency injection via FastAPI
def get_deps() -> OrchestratorDeps:
    config = {
        "redis_url": os.getenv("REDIS_URL", "redis://localhost:6379"),
        "anthropic_api_key": os.getenv("ANTHROPIC_API_KEY"),
        "slack_webhook": os.getenv("SLACK_WEBHOOK_URL"),
    }
    return create_production_deps(config)


# ============================================================
# MODELOS DE REQUEST/RESPONSE
# ============================================================

class WorkflowRequest(BaseModel):
    objective: str
    priority: str = "normal"        # low / normal / high / critical
    department: str = "default"
    max_budget_usd: float = 5.0
    companies: Optional[list] = None  # para análise competitiva

class WorkflowResponse(BaseModel):
    session_id: str
    status: str
    message: str

class SessionStatus(BaseModel):
    session_id: str
    status: str             # planning / executing / completed / failed
    progress_pct: int
    tasks_completed: int
    tasks_total: int
    cost_so_far: float
    elapsed_seconds: float
    result: Optional[str]


# ============================================================
# ENDPOINTS
# ============================================================

@app.post("/workflows", response_model=WorkflowResponse)
async def create_workflow(
    request: WorkflowRequest,
    deps: OrchestratorDeps = Depends(get_deps)
):
    """Cria e inicia um novo workflow de orquestração."""
    session_id = f"sess_{uuid.uuid4().hex[:12]}"

    # Salva estado inicial
    await deps.state.save_task_state(session_id, "meta", {
        "objective": request.objective,
        "status": "planning",
        "priority": request.priority,
        "department": request.department,
        "budget_limit": request.max_budget_usd,
        "started_at": __import__('time').time()
    })

    # Inicia workflow em background
    asyncio.create_task(
        run_workflow(session_id, request, deps)
    )

    deps.metrics.increment_counter(
        "orchestrator_workflows_created",
        {"priority": request.priority, "department": request.department}
    )

    return WorkflowResponse(
        session_id=session_id,
        status="started",
        message=f"Workflow iniciado. Acompanhe em /sessions/{session_id}"
    )


@app.get("/sessions/{session_id}", response_model=SessionStatus)
async def get_session_status(
    session_id: str,
    deps: OrchestratorDeps = Depends(get_deps)
):
    """Retorna status atual de uma sessão."""
    meta = await deps.state.get_task_state(session_id, "meta")
    if not meta:
        raise HTTPException(status_code=404, detail="Sessão não encontrada")

    import time
    return SessionStatus(
        session_id=session_id,
        status=meta.get("status", "unknown"),
        progress_pct=meta.get("progress_pct", 0),
        tasks_completed=meta.get("tasks_completed", 0),
        tasks_total=meta.get("tasks_total", 0),
        cost_so_far=meta.get("cost_so_far", 0.0),
        elapsed_seconds=time.time() - meta.get("started_at", time.time()),
        result=meta.get("final_result")
    )


@app.websocket("/sessions/{session_id}/stream")
async def stream_session(
    websocket: WebSocket,
    session_id: str,
    deps: OrchestratorDeps = Depends(get_deps)
):
    """WebSocket para receber updates em tempo real."""
    await websocket.accept()
    try:
        while True:
            meta = await deps.state.get_task_state(session_id, "meta")
            if meta:
                await websocket.send_json(meta)
                if meta.get("status") in ["completed", "failed", "escalated"]:
                    break
            await asyncio.sleep(2)  # poll a cada 2 segundos
    except WebSocketDisconnect:
        pass


@app.get("/health")
async def health():
    return {"status": "ok", "version": "1.0.0"}


async def run_workflow(session_id: str, request: WorkflowRequest, deps: OrchestratorDeps):
    """Executa workflow completo em background."""
    # Em prod: instancia o grafo LangGraph e executa
    # orchestrator = build_orchestrator_graph()
    # result = await orchestrator.ainvoke(initial_state)
    pass
6

Docker Compose — Deploy Completo do Stack

Com toda a arquitetura definida, o deploy em produção envolve orquestrar todos os componentes: a API, os workers assíncronos, Redis, Postgres e o stack de observabilidade.

# docker-compose.prod.yml — Deploy completo

version: '3.8'
services:

  # API do orquestrador
  orchestrator-api:
    build:
      context: .
      dockerfile: Dockerfile
    ports:
      - "8000:8000"
      - "8001:8001"   # Prometheus metrics
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://user:pass@postgres:5432/orchestrator
      - SLACK_WEBHOOK_URL=${SLACK_WEBHOOK_URL}
      - PROMETHEUS_PORT=8001
      - ENVIRONMENT=production
    depends_on:
      - redis
      - postgres
    restart: unless-stopped
    networks:
      - app
      - monitoring
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3

  # Workers assíncronos para processamento de tasks
  orchestrator-worker:
    build:
      context: .
      dockerfile: Dockerfile.worker
    command: python worker.py
    environment:
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - REDIS_URL=redis://redis:6379
      - DATABASE_URL=postgresql://user:pass@postgres:5432/orchestrator
    depends_on:
      - redis
      - postgres
    restart: unless-stopped
    deploy:
      replicas: 3  # 3 workers paralelos
    networks:
      - app

  # Redis 7 com persistência
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
      - ./redis.conf:/etc/redis/redis.conf
    command: redis-server /etc/redis/redis.conf
    restart: unless-stopped
    networks:
      - app

  # Postgres 16
  postgres:
    image: postgres:16-alpine
    environment:
      - POSTGRES_DB=orchestrator
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
      - ./init.sql:/docker-entrypoint-initdb.d/init.sql
    restart: unless-stopped
    networks:
      - app

  # Observabilidade
  prometheus:
    image: prom/prometheus:latest
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    networks:
      - monitoring

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning
    networks:
      - monitoring

  tempo:
    image: grafana/tempo:latest
    volumes:
      - ./tempo.yml:/etc/tempo.yml
      - tempo_data:/tmp/tempo
    networks:
      - monitoring

  loki:
    image: grafana/loki:latest
    volumes:
      - loki_data:/loki
    networks:
      - monitoring

  # Nginx como reverse proxy
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./certs:/etc/nginx/certs
    depends_on:
      - orchestrator-api
    networks:
      - app

volumes:
  redis_data:
  postgres_data:
  prometheus_data:
  grafana_data:
  tempo_data:
  loki_data:

networks:
  app:
    driver: bridge
  monitoring:
    driver: bridge

Resumo do Módulo 5.9

  • Arquitetura hexagonal: core puro de domínio + adapters para infraestrutura — testável e evolutiva
  • LangGraph para workflows com branching condicional (replanejamento, retry, escalada)
  • CrewAI para times especializados com papéis, memória compartilhada e delegação
  • Ports/Adapters: StatePort, LLMPort, MetricsPort — trocar Redis por DynamoDB = 1 arquivo
  • FastAPI com WebSocket para updates em tempo real + REST para integração
  • Docker Compose com 3 workers paralelos, Redis persistente, Postgres, e stack de obs completo
5.8 — Rastreamento 5.10 — Custo-Otimização