MÓDULO 5.1

🎯 Camada 1 — Planejamento Inteligente

Transformar "analise a concorrência e prepare um relatório estratégico" em um plano executável com 12 tarefas, dependências explícitas e estimativa de custo. Essa é a responsabilidade da Camada 1.

6
Tópicos
50
Minutos
Avançado
Nível
Código
Tipo
1

🧩 O problema que a Camada 1 resolve

Um usuário digita: "Analise a concorrência e prepare um relatório estratégico para Q2." Isso é completamente inviável de executar diretamente. Nenhum agente — por mais poderoso que seja — consegue executar esse objetivo de uma vez, em um único prompt.

Por que objetivos vagos quebram agentes

Problema 1: Escopo infinito

Sem limites claros, o agente pode tentar analisar 50 concorrentes em vez de 5, gerando custo 10x maior que o esperado.

Problema 2: Sem critério de sucesso

Como o agente sabe quando terminou? "Relatório estratégico" pode ser 1 página ou 50 páginas — sem definição, o loop nunca fecha.

Problema 3: Sem paralelismo

Sem decompor em subtarefas, tudo é executado sequencialmente. Um trabalho de 12 minutos paralelos vira 2 horas sequenciais.

O que a Camada 1 entrega

Objetivo analisado: dimensões executáveis extraídas — o quê, para quem, com que critérios, em que formato
Plano estruturado: lista de subtarefas atômicas, cada uma executável por um agente em uma chamada
DAG de dependências: grafo de quais tarefas bloqueiam outras e quais podem rodar em paralelo
Estimativa de custo: custo esperado em tokens e dólares antes de executar qualquer tarefa
2

🔍 Objective Parsing

Antes de decompor, é preciso entender. O objective parsing é o processo de extrair do objetivo em linguagem natural as dimensões que tornam a execução possível e verificável.

As 4 dimensões do objetivo executável

1

O que fazer?

A ação concreta: "analisar concorrentes" → coletar dados de pricing, features, posicionamento, reviews de clientes de 5 empresas específicas do setor fintech.

2

Para quem?

O stakeholder implícito define o formato. Para o CEO: executive summary de 1 página. Para o time de produto: análise detalhada de features. Para investidores: benchmarks financeiros.

3

Com que critérios de sucesso?

O que define "relatório bom"? Cobertura de N empresas? Dados atualizados nos últimos 90 dias? Recomendações acionáveis? Sem isso, não há condição de parada.

4

Qual o formato de entrega?

PDF, Google Doc, Notion, Slack summary? A forma de entrega determina as tools necessárias e o agente de formatação adequado.

Código: objective_parser()

import anthropic
import json

client = anthropic.Anthropic()

def parse_objective(raw_objective: str) -> dict:
    """Extrai dimensões executáveis de um objetivo em linguagem natural."""

    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": f"""Analise este objetivo e extraia as dimensões executáveis no formato JSON:

OBJETIVO: {raw_objective}

Retorne APENAS um JSON com:
{{
  "acao_principal": "string - o que concretamente fazer",
  "escopo": "string - limites claros do que incluir/excluir",
  "stakeholder": "string - para quem é o resultado",
  "criterios_sucesso": ["lista de critérios verificáveis"],
  "formato_entrega": "string - como entregar o resultado",
  "estimativa_subtarefas": "number - quantas subtarefas estimar"
}}"""
        }]
    )

    return json.loads(response.content[0].text)

# Exemplo de uso:
objetivo = "analise a concorrência do setor fintech e prepare relatório estratégico para Q2"
parsed = parse_objective(objetivo)
print(json.dumps(parsed, indent=2, ensure_ascii=False))

Dica Prática

Sempre faça o parsing com um modelo capaz de raciocinar (Claude Sonnet ou superior). O resultado do parsing guia todo o restante da execução — um parsing ruim gera um plano ruim que gera execução ruim. Não economize aqui.

3

🧩 Task Decomposition Automática

Com o objetivo parseado, o próximo passo é decompô-lo em subtarefas atômicas — cada uma pequena o suficiente para ser executada por um único agente em uma chamada de LLM, com critério de conclusão verificável.

Critérios de uma boa subtarefa

  • Atômica: não pode ser dividida em partes menores sem perder sentido
  • Verificável: tem um critério claro de "concluída" (ex: "relatório de N páginas gerado")
  • Independente: define claramente quais subtarefas devem preceder
  • Executável: um agente com as tools corretas pode completar em <2 minutos

Erros comuns na decomposição

  • Subtarefa vaga: "pesquisar concorrentes" — o quê exatamente, onde, quantos?
  • Sobreposição: duas subtarefas cobrindo o mesmo trabalho gera duplicação
  • Granularidade errada: subtarefa tão pequena que gera overhead de orquestração desnecessário
  • Lacuna: subtarefas que não cobrem todo o objetivo — entrega incompleta garantida

Código: decompose_objective()

def decompose_objective(parsed_objective: dict) -> list[dict]:
    """Decompõe objetivo parseado em subtarefas atômicas."""

    prompt = f"""Com base neste objetivo estruturado, gere subtarefas atômicas:

OBJETIVO: {json.dumps(parsed_objective, ensure_ascii=False)}

Gere uma lista de subtarefas. Cada subtarefa deve ter:
- id: string único (ex: "T01", "T02")
- titulo: string descritivo curto
- descricao: o que exatamente fazer
- agente_tipo: qual tipo de agente pode executar (pesquisa/analise/redacao/formatacao)
- dependencias: lista de IDs de subtarefas que devem completar antes
- duracao_estimada_seg: estimativa em segundos
- custo_estimado_usd: estimativa em dólares

Retorne JSON array de subtarefas. Garanta completude (sem lacunas) e sem sobreposição."""

    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}]
    )

    tasks = json.loads(response.content[0].text)

    # Validação básica
    ids = {t["id"] for t in tasks}
    for task in tasks:
        for dep in task.get("dependencias", []):
            assert dep in ids, f"Dependência {dep} inválida em {task['id']}"

    return tasks
4

🌐 Dependency Graph (DAG)

Com as subtarefas definidas, o próximo passo é construir o Directed Acyclic Graph (DAG) — a estrutura que representa quais tarefas bloqueiam outras e quais podem ser executadas em paralelo.

Anatomia de um DAG de execução

T01 Coleta de dados empresa A — sem dependências → pode iniciar imediatamente
T02 Coleta de dados empresa B — sem dependências → pode rodar paralelo com T01
T05 Análise comparativa — depende de T01, T02, T03, T04 → nó bloqueador
T08 Relatório final — depende de T05, T06, T07 → nó de saída (folha)

Código: build_dag() e topological_sort()

from collections import defaultdict, deque

class ExecutionDAG:
    def __init__(self, tasks: list[dict]):
        self.tasks = {t["id"]: t for t in tasks}
        self.graph = defaultdict(list)    # id -> [dependentes]
        self.in_degree = defaultdict(int) # id -> número de dependências
        self._build()

    def _build(self):
        for task_id in self.tasks:
            self.in_degree[task_id] = 0

        for task in self.tasks.values():
            for dep in task.get("dependencias", []):
                self.graph[dep].append(task["id"])
                self.in_degree[task["id"]] += 1

    def get_ready_tasks(self, completed: set) -> list[str]:
        """Retorna IDs das tarefas prontas para execução (dependências satisfeitas)."""
        ready = []
        for task_id, task in self.tasks.items():
            if task_id in completed:
                continue
            deps = set(task.get("dependencias", []))
            if deps.issubset(completed):
                ready.append(task_id)
        return ready

    def topological_order(self) -> list[str]:
        """Retorna ordem de execução respeitando dependências (Kahn's algorithm)."""
        in_deg = dict(self.in_degree)
        queue = deque([t for t in in_deg if in_deg[t] == 0])
        order = []

        while queue:
            node = queue.popleft()
            order.append(node)
            for neighbor in self.graph[node]:
                in_deg[neighbor] -= 1
                if in_deg[neighbor] == 0:
                    queue.append(neighbor)

        if len(order) != len(self.tasks):
            raise ValueError("DAG contém ciclo — plano inválido!")

        return order

    def estimate_total_time(self) -> float:
        """Estima tempo total considerando paralelismo máximo."""
        # Simplificado: baseado no caminho crítico
        return max(
            self.tasks[t]["duracao_estimada_seg"]
            for t in self.tasks
        ) * 1.5  # fator de overhead

Por que DAG e não apenas lista ordenada?

Uma lista ordenada executa sequencialmente — 12 tarefas de 30 segundos cada = 6 minutos. O DAG identifica quais tarefas são independentes e podem rodar em paralelo — essas mesmas 12 tarefas podem completar em 90 segundos com 4 agentes paralelos. O ganho de tempo é quadrático com o número de agentes disponíveis.

5

⚖️ Priorização Inteligente

Quando múltiplas tarefas estão desbloqueadas e disponíveis para execução, qual o orquestrador deve executar primeiro? A resposta impacta diretamente o tempo total e a detecção precoce de bloqueios.

Critério 1: Caminho crítico

A tarefa que, se atrasada, atrasa o resultado final. Identificada pelo maior tempo acumulado de dependentes downstream. Execute tarefas do caminho crítico primeiro — sempre.

Exemplo: T05 (análise comparativa) tem 3 dependentes. T01 (coleta A) não tem. Execute T05 primeiro quando ambas estiverem prontas.

Critério 2: Desbloqueio de downstream

Quantas outras tarefas essa tarefa desbloqueará quando concluída? Tarefas que desbloqueiam muitas outras têm prioridade sobre tarefas que não bloqueiam nenhuma.

Score de prioridade: priority = num_dependentes * 0.5 + (1/duracao) * 0.3 + criticidade * 0.2

Critério 3: Fail-fast

Execute primeiro tarefas com maior probabilidade de falha (acesso a APIs externas, dados incertos). Se vão falhar, melhor saber cedo — antes de gastar tokens em análises que dependem desses dados.

Implementação: priority_queue

import heapq

class PriorityTaskScheduler:
    def __init__(self, dag: ExecutionDAG):
        self.dag = dag
        self.heap = []  # min-heap (negativo para max-priority)
        self.completed = set()
        self.running = set()

    def _calculate_priority(self, task_id: str) -> float:
        task = self.dag.tasks[task_id]
        downstream = len(self.dag.graph[task_id])
        duration = task.get("duracao_estimada_seg", 60)

        # Score composto: mais downstream = maior prioridade
        score = downstream * 2.0 + (1.0 / duration) * 10
        return score

    def get_next_batch(self, max_parallel: int = 4) -> list[str]:
        """Retorna próximo lote de tarefas para execução paralela."""
        ready = self.dag.get_ready_tasks(self.completed | self.running)

        # Ordena por prioridade (maior primeiro)
        prioritized = sorted(ready,
            key=self._calculate_priority,
            reverse=True
        )

        # Retorna até max_parallel tarefas
        batch = prioritized[:max_parallel]
        self.running.update(batch)
        return batch

    def mark_done(self, task_id: str):
        self.running.discard(task_id)
        self.completed.add(task_id)

    def is_complete(self) -> bool:
        return len(self.completed) == len(self.dag.tasks)
6

📋 Exemplo Real: Due Diligence de Empresa X

Vamos percorrer o fluxo completo da Camada 1 com um caso de uso real: "Prepare due diligence completa da empresa Acme Corp para avaliação de aquisição."

Passo 1: Parsing do objetivo

{
  "acao_principal": "Análise de due diligence para aquisição",
  "escopo": "Empresa Acme Corp — financeiro, jurídico, técnico, mercado",
  "stakeholder": "Comitê de M&A e CFO",
  "criterios_sucesso": [
    "Cobertura de 4 dimensões: financeiro, jurídico, técnico, mercado",
    "Dados dos últimos 3 anos financeiros",
    "Identificação de red flags e riscos",
    "Recomendação: avançar / não avançar / avançar com condições"
  ],
  "formato_entrega": "PDF executivo + planilha de dados detalhados",
  "estimativa_subtarefas": 15
}

Passo 2: Subtarefas geradas (15 total)

T01Coleta dados financeiros públicos
T02Análise de registros jurídicos/processos
T03Auditoria de propriedade intelectual
T04Análise de tecnologia e stack técnico
T05Pesquisa de mercado e posicionamento
T06Análise de equipe e liderança
T07Levantamento de clientes e churn
T08Análise de contratos-chave
T09Síntese financeira (dep: T01)
T10Análise de riscos jurídicos (dep: T02,T03,T08)
T11Avaliação técnica consolidada (dep: T04)
T12Análise de mercado consolidada (dep: T05,T07)
T13Análise de pessoas (dep: T06)
T14Red flags e recomendações (dep: T09-T13)
T15Relatório PDF final (dep: T14)

Passo 3: Plano de execução em ondas

ONDA 1 T01, T02, T03, T04, T05, T06, T07, T08 — execução paralela (8 agentes) — ~90 seg
ONDA 2 T09, T10, T11, T12, T13 — execução paralela (5 agentes) — ~120 seg
ONDA 3 T14 (red flags) — 1 agente — ~60 seg
ONDA 4 T15 (relatório final) — 1 agente — ~120 seg

Resultado: 15 tarefas completadas em ~6.5 minutos com paralelismo máximo. Sequencial levaria ~25 minutos. Custo estimado: $2.40 (tokens totais ~800k, mix de modelos).

Resumo do Módulo 5.1

Objective parsing extrai dimensões executáveis antes de qualquer decomposição
Task decomposition gera subtarefas atômicas, verificáveis e sem sobreposição
DAG identifica paralelismo: execução 4x mais rápida com agentes paralelos
Priorização por caminho crítico minimiza o tempo total de execução
Estimativa prévia de custo e tempo permite decisão informada antes de executar