🧩 O problema que a Camada 1 resolve
Um usuário digita: "Analise a concorrência e prepare um relatório estratégico para Q2." Isso é completamente inviável de executar diretamente. Nenhum agente — por mais poderoso que seja — consegue executar esse objetivo de uma vez, em um único prompt.
Por que objetivos vagos quebram agentes
Problema 1: Escopo infinito
Sem limites claros, o agente pode tentar analisar 50 concorrentes em vez de 5, gerando custo 10x maior que o esperado.
Problema 2: Sem critério de sucesso
Como o agente sabe quando terminou? "Relatório estratégico" pode ser 1 página ou 50 páginas — sem definição, o loop nunca fecha.
Problema 3: Sem paralelismo
Sem decompor em subtarefas, tudo é executado sequencialmente. Um trabalho de 12 minutos paralelos vira 2 horas sequenciais.
O que a Camada 1 entrega
🔍 Objective Parsing
Antes de decompor, é preciso entender. O objective parsing é o processo de extrair do objetivo em linguagem natural as dimensões que tornam a execução possível e verificável.
As 4 dimensões do objetivo executável
O que fazer?
A ação concreta: "analisar concorrentes" → coletar dados de pricing, features, posicionamento, reviews de clientes de 5 empresas específicas do setor fintech.
Para quem?
O stakeholder implícito define o formato. Para o CEO: executive summary de 1 página. Para o time de produto: análise detalhada de features. Para investidores: benchmarks financeiros.
Com que critérios de sucesso?
O que define "relatório bom"? Cobertura de N empresas? Dados atualizados nos últimos 90 dias? Recomendações acionáveis? Sem isso, não há condição de parada.
Qual o formato de entrega?
PDF, Google Doc, Notion, Slack summary? A forma de entrega determina as tools necessárias e o agente de formatação adequado.
Código: objective_parser()
import anthropic
import json
client = anthropic.Anthropic()
def parse_objective(raw_objective: str) -> dict:
"""Extrai dimensões executáveis de um objetivo em linguagem natural."""
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=1024,
messages=[{
"role": "user",
"content": f"""Analise este objetivo e extraia as dimensões executáveis no formato JSON:
OBJETIVO: {raw_objective}
Retorne APENAS um JSON com:
{{
"acao_principal": "string - o que concretamente fazer",
"escopo": "string - limites claros do que incluir/excluir",
"stakeholder": "string - para quem é o resultado",
"criterios_sucesso": ["lista de critérios verificáveis"],
"formato_entrega": "string - como entregar o resultado",
"estimativa_subtarefas": "number - quantas subtarefas estimar"
}}"""
}]
)
return json.loads(response.content[0].text)
# Exemplo de uso:
objetivo = "analise a concorrência do setor fintech e prepare relatório estratégico para Q2"
parsed = parse_objective(objetivo)
print(json.dumps(parsed, indent=2, ensure_ascii=False))
Dica Prática
Sempre faça o parsing com um modelo capaz de raciocinar (Claude Sonnet ou superior). O resultado do parsing guia todo o restante da execução — um parsing ruim gera um plano ruim que gera execução ruim. Não economize aqui.
🧩 Task Decomposition Automática
Com o objetivo parseado, o próximo passo é decompô-lo em subtarefas atômicas — cada uma pequena o suficiente para ser executada por um único agente em uma chamada de LLM, com critério de conclusão verificável.
Critérios de uma boa subtarefa
- ✓Atômica: não pode ser dividida em partes menores sem perder sentido
- ✓Verificável: tem um critério claro de "concluída" (ex: "relatório de N páginas gerado")
- ✓Independente: define claramente quais subtarefas devem preceder
- ✓Executável: um agente com as tools corretas pode completar em <2 minutos
Erros comuns na decomposição
- ✗Subtarefa vaga: "pesquisar concorrentes" — o quê exatamente, onde, quantos?
- ✗Sobreposição: duas subtarefas cobrindo o mesmo trabalho gera duplicação
- ✗Granularidade errada: subtarefa tão pequena que gera overhead de orquestração desnecessário
- ✗Lacuna: subtarefas que não cobrem todo o objetivo — entrega incompleta garantida
Código: decompose_objective()
def decompose_objective(parsed_objective: dict) -> list[dict]:
"""Decompõe objetivo parseado em subtarefas atômicas."""
prompt = f"""Com base neste objetivo estruturado, gere subtarefas atômicas:
OBJETIVO: {json.dumps(parsed_objective, ensure_ascii=False)}
Gere uma lista de subtarefas. Cada subtarefa deve ter:
- id: string único (ex: "T01", "T02")
- titulo: string descritivo curto
- descricao: o que exatamente fazer
- agente_tipo: qual tipo de agente pode executar (pesquisa/analise/redacao/formatacao)
- dependencias: lista de IDs de subtarefas que devem completar antes
- duracao_estimada_seg: estimativa em segundos
- custo_estimado_usd: estimativa em dólares
Retorne JSON array de subtarefas. Garanta completude (sem lacunas) e sem sobreposição."""
response = client.messages.create(
model="claude-opus-4-6",
max_tokens=2048,
messages=[{"role": "user", "content": prompt}]
)
tasks = json.loads(response.content[0].text)
# Validação básica
ids = {t["id"] for t in tasks}
for task in tasks:
for dep in task.get("dependencias", []):
assert dep in ids, f"Dependência {dep} inválida em {task['id']}"
return tasks
🌐 Dependency Graph (DAG)
Com as subtarefas definidas, o próximo passo é construir o Directed Acyclic Graph (DAG) — a estrutura que representa quais tarefas bloqueiam outras e quais podem ser executadas em paralelo.
Anatomia de um DAG de execução
Código: build_dag() e topological_sort()
from collections import defaultdict, deque
class ExecutionDAG:
def __init__(self, tasks: list[dict]):
self.tasks = {t["id"]: t for t in tasks}
self.graph = defaultdict(list) # id -> [dependentes]
self.in_degree = defaultdict(int) # id -> número de dependências
self._build()
def _build(self):
for task_id in self.tasks:
self.in_degree[task_id] = 0
for task in self.tasks.values():
for dep in task.get("dependencias", []):
self.graph[dep].append(task["id"])
self.in_degree[task["id"]] += 1
def get_ready_tasks(self, completed: set) -> list[str]:
"""Retorna IDs das tarefas prontas para execução (dependências satisfeitas)."""
ready = []
for task_id, task in self.tasks.items():
if task_id in completed:
continue
deps = set(task.get("dependencias", []))
if deps.issubset(completed):
ready.append(task_id)
return ready
def topological_order(self) -> list[str]:
"""Retorna ordem de execução respeitando dependências (Kahn's algorithm)."""
in_deg = dict(self.in_degree)
queue = deque([t for t in in_deg if in_deg[t] == 0])
order = []
while queue:
node = queue.popleft()
order.append(node)
for neighbor in self.graph[node]:
in_deg[neighbor] -= 1
if in_deg[neighbor] == 0:
queue.append(neighbor)
if len(order) != len(self.tasks):
raise ValueError("DAG contém ciclo — plano inválido!")
return order
def estimate_total_time(self) -> float:
"""Estima tempo total considerando paralelismo máximo."""
# Simplificado: baseado no caminho crítico
return max(
self.tasks[t]["duracao_estimada_seg"]
for t in self.tasks
) * 1.5 # fator de overhead
Por que DAG e não apenas lista ordenada?
Uma lista ordenada executa sequencialmente — 12 tarefas de 30 segundos cada = 6 minutos. O DAG identifica quais tarefas são independentes e podem rodar em paralelo — essas mesmas 12 tarefas podem completar em 90 segundos com 4 agentes paralelos. O ganho de tempo é quadrático com o número de agentes disponíveis.
⚖️ Priorização Inteligente
Quando múltiplas tarefas estão desbloqueadas e disponíveis para execução, qual o orquestrador deve executar primeiro? A resposta impacta diretamente o tempo total e a detecção precoce de bloqueios.
Critério 1: Caminho crítico
A tarefa que, se atrasada, atrasa o resultado final. Identificada pelo maior tempo acumulado de dependentes downstream. Execute tarefas do caminho crítico primeiro — sempre.
Critério 2: Desbloqueio de downstream
Quantas outras tarefas essa tarefa desbloqueará quando concluída? Tarefas que desbloqueiam muitas outras têm prioridade sobre tarefas que não bloqueiam nenhuma.
priority = num_dependentes * 0.5 + (1/duracao) * 0.3 + criticidade * 0.2
Critério 3: Fail-fast
Execute primeiro tarefas com maior probabilidade de falha (acesso a APIs externas, dados incertos). Se vão falhar, melhor saber cedo — antes de gastar tokens em análises que dependem desses dados.
Implementação: priority_queue
import heapq
class PriorityTaskScheduler:
def __init__(self, dag: ExecutionDAG):
self.dag = dag
self.heap = [] # min-heap (negativo para max-priority)
self.completed = set()
self.running = set()
def _calculate_priority(self, task_id: str) -> float:
task = self.dag.tasks[task_id]
downstream = len(self.dag.graph[task_id])
duration = task.get("duracao_estimada_seg", 60)
# Score composto: mais downstream = maior prioridade
score = downstream * 2.0 + (1.0 / duration) * 10
return score
def get_next_batch(self, max_parallel: int = 4) -> list[str]:
"""Retorna próximo lote de tarefas para execução paralela."""
ready = self.dag.get_ready_tasks(self.completed | self.running)
# Ordena por prioridade (maior primeiro)
prioritized = sorted(ready,
key=self._calculate_priority,
reverse=True
)
# Retorna até max_parallel tarefas
batch = prioritized[:max_parallel]
self.running.update(batch)
return batch
def mark_done(self, task_id: str):
self.running.discard(task_id)
self.completed.add(task_id)
def is_complete(self) -> bool:
return len(self.completed) == len(self.dag.tasks)
📋 Exemplo Real: Due Diligence de Empresa X
Vamos percorrer o fluxo completo da Camada 1 com um caso de uso real: "Prepare due diligence completa da empresa Acme Corp para avaliação de aquisição."
Passo 1: Parsing do objetivo
{
"acao_principal": "Análise de due diligence para aquisição",
"escopo": "Empresa Acme Corp — financeiro, jurídico, técnico, mercado",
"stakeholder": "Comitê de M&A e CFO",
"criterios_sucesso": [
"Cobertura de 4 dimensões: financeiro, jurídico, técnico, mercado",
"Dados dos últimos 3 anos financeiros",
"Identificação de red flags e riscos",
"Recomendação: avançar / não avançar / avançar com condições"
],
"formato_entrega": "PDF executivo + planilha de dados detalhados",
"estimativa_subtarefas": 15
}
Passo 2: Subtarefas geradas (15 total)
Passo 3: Plano de execução em ondas
Resultado: 15 tarefas completadas em ~6.5 minutos com paralelismo máximo. Sequencial levaria ~25 minutos. Custo estimado: $2.40 (tokens totais ~800k, mix de modelos).