MÓDULO 4.7

⚡ Execução Paralela

Fan-out distribui para múltiplos agentes simultâneos. Fan-in combina os resultados. asyncio.gather() implementa tudo em Python. Aprenda timeout, cancellation e como controlar o custo de paralelismo sem sacrificar performance.

6
Tópicos
45
Minutos
Interm.
Nível
Código
Tipo
1

🚀 Por que paralelismo em agentes

A lei de Amdahl aplicada a agentes: o speedup de um sistema é limitado pelo componente sequencial. Se 90% das tarefas são paralelizáveis, você pode obter até 10x de speedup com paralelismo. Isso transforma um processo de 10 minutos em 1 minuto — sem aumentar qualidade, mas eliminando espera.

🚀 Quando o paralelismo compensa

Tarefas independentes

A execução de cada tarefa não depende do resultado das outras. Podem rodar simultaneamente sem conflito.

Latência > Custo

O usuário espera o resultado e tempo importa mais que o custo adicional de múltiplos agentes.

Volume alto

Muitos itens para processar (documentos, leads, análises) onde sequencial levaria tempo inaceitável.

⚠️ Quando NÃO usar paralelismo

  • Tarefas dependentes entre si: B precisa do resultado de A antes de começar — paralelismo não ajuda aqui
  • Custo não justificado: 10 agentes custam 10x mais — se o usuário pode esperar, economize
  • Rate limits apertados: 10 agentes simultâneos podem bater no rate limit da API e todos ficarem lentos
2

📤 Fan-out pattern

Fan-out é o momento em que um input único se divide em múltiplas execuções paralelas. O coordenador divide o trabalho em partes, distribui para N agentes e aguarda a conclusão de todos (ou um subconjunto, dependendo da estratégia).

📤 Estratégias de particionamento

1

Particionamento por item

Cada agente processa um item diferente. 10 documentos → 10 agentes, cada um com 1 documento. Mais simples e mais comum.

2

Particionamento por aspecto

Cada agente analisa um aspecto diferente do mesmo item. Agente A analisa performance, Agente B analisa segurança, Agente C analisa UX — todos do mesmo produto.

3

Particionamento por batch

100 documentos divididos em 10 batches de 10. Cada agente processa um batch. Útil quando o número de itens é muito maior que o número de agentes.

📊 Controlando concorrência com semáforo

# Sem semáforo: 100 agentes simultâneos → rate limit

# Com semáforo: máximo 10 simultâneos → estável

semaphore = asyncio.Semaphore(10) # max 10 paralelos

async def run_with_limit(agent, task):

async with semaphore:

return await agent.run(task)

3

📥 Fan-in pattern

O fan-in é onde a qualidade do resultado final é definida. Você pode ter 10 análises paralelas excelentes e desperdiçar tudo com uma estratégia de combinação ruim. A lógica de merge é subestimada e é onde a maioria dos bugs de sistemas paralelos se esconde.

Estratégias de fan-in

Concatenação (merge)

Une todos os resultados em uma lista ou documento. Simples, mas pode gerar duplicatas ou inconsistências.

Votação (vote)

Cada agente "vota" numa resposta. A maioria vence. Bom para classificações onde agentes podem discordar.

Sumarização (summarize)

Um LLM lê todos os resultados e gera uma síntese coerente. Mais caro, melhor qualidade de combinação.

Agregação (aggregate)

Operação matemática sobre resultados numéricos (média, soma, max). Perfeito para métricas.

Tratando resultado parcial

O que acontece quando 2 de 10 agentes falham?

Parcial aceitável: retorna o resultado dos 8 que funcionaram com flag "partial"
Threshold mínimo: precisa de pelo menos 7 de 10 para ser válido, senão falha total
Retry dos falhos: reexecuta só os 2 que falharam antes de fazer o fan-in
4

🐍 asyncio na prática com agentes

Chamadas LLM são I/O-bound — o Python fica esperando a resposta da API. asyncio foi criado exatamente para isso: executar múltiplas operações de I/O simultaneamente sem bloqueio. É o mecanismo nativo Python para paralelismo de agentes.

Python — Fan-out e fan-in com asyncio

import asyncio
from typing import List, Any

async def analyze_document(agent, document: str) -> dict:
    """Agente analisa um documento"""
    result = await agent.run(
        f"Analise este documento: {document}"
    )
    return {"doc": document[:50], "analysis": result}

async def parallel_analyze(agents, documents: List[str]) -> List[dict]:
    """Fan-out: distribui, aguarda, retorna todos"""
    semaphore = asyncio.Semaphore(5)  # max 5 simultâneos

    async def bounded_analyze(agent, doc):
        async with semaphore:
            return await analyze_document(agent, doc)

    # Fan-out: todos os documentos em paralelo
    tasks = [
        bounded_analyze(agents[i % len(agents)], doc)
        for i, doc in enumerate(documents)
    ]

    # return_exceptions=True: não cancela os outros se um falha
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Fan-in: filtra erros, retorna resultados válidos
    valid = [r for r in results if not isinstance(r, Exception)]
    failed = len(results) - len(valid)

    if failed > 0:
        print(f"⚠️ {failed} agentes falharam de {len(results)}")

    return valid

📊 asyncio.TaskGroup (Python 3.11+)

# TaskGroup: cancela todos se qualquer um falha

async with asyncio.TaskGroup() as tg:

tasks = [tg.create_task(agent.run(doc))

for doc in documents]

# Após o bloco: todos concluídos ou exceção levantada

results = [t.result() for t in tasks]

5

⏱️ Timeout e cancellation

Em um sistema de 10 agentes paralelos, é praticamente garantido que algum vai demorar mais que os outros — por rate limit, contexto mais complexo ou simplesmente variação natural da API. Timeout é o mecanismo que impede que um agente lento bloqueie o resultado de todos os outros.

⏱️ Implementando timeout por agente

async def run_with_timeout(agent, task, timeout_s=60):

try:

result = await asyncio.wait_for(

agent.run(task),

timeout=timeout_s

)

return {"status": "done", "result": result}

except asyncio.TimeoutError:

# Cancela e retorna resultado parcial com flag

return {"status": "timeout", "result": None}

# asyncio.wait para "primeiro que terminar" ou "N completos"

done, pending = await asyncio.wait(

tasks,

timeout=90, # timeout global

return_when=asyncio.ALL_COMPLETED

)

# Cancela os pendentes

for task in pending:

task.cancel()

💡 Como calibrar o timeout

Use o P95 de latência histórica como base: se 95% das chamadas terminam em 30s, use timeout de 45s (1.5x). Isso elimina outliers sem cancelar casos legítimos que demoram um pouco mais.

O que fazer com o resultado parcial

→ Retornar resultado dos que terminaram com flag "partial"

→ Retry assíncrono dos que fizeram timeout

→ Escalar para humano se resultado parcial for insuficiente

→ Logar o timeout para análise de P95 histórico

6

💰 Custo do paralelismo

A equação financeira do paralelismo é simples: N agentes paralelos custam aproximadamente N× o custo de 1 agente. Você paga por tempo de cada agente independentemente. A questão não é se paralelo é mais caro (é), mas se o benefício de tempo justifica o custo adicional.

💰 Calculando ROI do paralelismo

Exemplo: 100 análises

Sequencial: 100 análises × 60s = 100min | $5.00

Paralelo (10 agentes): ~10min | $5.00

Mesmo custo, 10x mais rápido

Otimização por tier de modelo

Haiku (análise simples): $0.25 cada

Sonnet (análise média): $0.50 cada

Opus (análise complexa): $2.50 cada

Use o modelo certo para cada subtarefa

Estratégias de otimização de custo

  • Batching: agrupe chamadas pequenas em uma chamada maior
  • Tier de modelo: use haiku para tarefas simples e opus só para complexas
  • Caching: resultados idênticos não precisam ser reprocessados
  • Semáforo: limite de concorrência previne explosão de custo

Budget por execução paralela

Defina um budget máximo antes de iniciar o fan-out:

MAX_BUDGET = 10.00 # USD

estimated = n_agents * cost_per_agent

if estimated > MAX_BUDGET:

n_agents = int(MAX_BUDGET / cost_per_agent)

# Ajusta paralelismo ao budget

Resumo do Módulo 4.7

Paralelismo se justifica quando tarefas são independentes e latência importa mais que custo
Fan-out com semáforo distribui para N agentes sem bater em rate limit da API
Fan-in tem 4 estratégias — escolha baseada no tipo de dado a combinar
asyncio.gather() com return_exceptions é o padrão Python para fan-out não-bloqueante
Custo = N × custo unitário — tier de modelo por subtarefa e batching são as principais alavancas

Próximo Módulo:

4.8 — Tratamento de Erros: os tipos específicos de falha em agentes e como implementar retry, fallback, circuit breaker e escalada para humano.