MÓDULO 3.1

🕸️ LangGraph Profundo

Domine o StateGraph, a arquitetura de nós e edges, ciclos controlados e checkpoints. O framework que dá controle total sobre o fluxo do seu agente — sem abrir mão da flexibilidade.

6
Tópicos
50
Minutos
Inter.
Nível
Código
Tipo
1

🗺️ O que é um grafo de estado

LangGraph representa o fluxo de um agente como um grafo direcionado, onde cada nó é uma função Python que recebe o estado atual e retorna uma versão atualizada. Diferente de pipelines lineares, o grafo pode ter branches, loops e pontos de decisão — exatamente como a lógica de um ser humano resolvendo um problema complexo.

🧩 Anatomia do grafo de estado

Três componentes essenciais formam qualquer LangGraph: o Estado (TypedDict com os dados compartilhados), os Nós (funções que processam o estado) e as Edges (conexões que definem o fluxo).

# 1. Estado: o coração do grafo
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
import operator

class AgentState(TypedDict):
    mensagem: str
    historico: Annotated[list, operator.add]  # acumula
    iteracoes: int
    aprovado: bool

# 2. Nós: funções que transformam o estado
def processar(state: AgentState) -> AgentState:
    return {"iteracoes": state["iteracoes"] + 1}

# 3. Grafo: conecta tudo
grafo = StateGraph(AgentState)
grafo.add_node("processar", processar)
grafo.add_edge(START, "processar")
grafo.add_edge("processar", END)

Estado imutável por convenção

  • Retorne um novo dict com campos atualizados
  • Use Annotated[list, operator.add] para acumular listas
  • Campos não retornados mantêm valor anterior
  • TypedDict garante type safety em toda a execução

Armadilhas comuns

  • Não mutate o estado diretamente (state["x"] = ...)
  • Não esqueça de retornar campos que quer atualizar
  • Evite estado muito grande — impacta serialização
  • Não use tipos não serializáveis no estado

💡 Dica Prática

Projete o estado primeiro, antes de qualquer nó. Pense: quais dados precisam fluir pelo sistema? Quais precisam ser acumulados? O TypedDict do estado é o contrato de dados de todo o grafo.

2

🔗 Nós e edges: blocos de construção

Nós são funções Python puras — recebem o estado, fazem alguma coisa (chamar LLM, executar tool, tomar decisão) e retornam uma atualização parcial do estado. Edges são as conexões: fixas (sempre vão para o mesmo nó) ou condicionais (escolhem o próximo nó com base no estado atual).

⚙️ Edges fixas vs. condicionais

# Edge FIXA: sempre vai para o mesmo próximo nó
grafo.add_edge("analisar", "formatar")  # sempre

# Edge CONDICIONAL: função decide o próximo nó
def decidir_rota(state: AgentState) -> str:
    if state["aprovado"]:
        return "finalizar"
    elif state["iteracoes"] >= 3:
        return "escalar"
    else:
        return "revisar"

grafo.add_conditional_edges(
    "avaliar",          # nó de origem
    decidir_rota,       # função roteadora
    {                   # mapeamento string → nó
        "finalizar": "finalizar",
        "escalar": "escalar",
        "revisar": "revisar"
    }
)

# Múltiplos destinos possíveis do mesmo nó
grafo.add_node("avaliar", avaliar_resultado)
grafo.add_node("finalizar", gerar_output)
grafo.add_node("escalar", notificar_humano)
grafo.add_node("revisar", melhorar_resposta)

📊 Por que edges condicionais são poderosas

  • Branching baseado em dados: o fluxo muda com base no estado real, não em lógica hardcoded no nó
  • Separação de concerns: o nó faz a análise, a edge faz o roteamento — responsabilidades claras
  • Testabilidade: a função roteadora pode ser testada unitariamente sem executar o grafo inteiro
  • Visualização: LangGraph pode gerar diagrama visual do grafo incluindo todos os caminhos possíveis
3

⚙️ StateGraph na prática: código completo

Veja um agente de análise completo construído do zero. O padrão define estado → cria nós → conecta edges → compila → executa é sempre o mesmo, independente da complexidade do sistema.

🏗️ Agente de análise completo

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict

# 1. Define o estado
class AnaliseState(TypedDict):
    pergunta: str
    rascunho: str
    critica: str
    resposta_final: str
    tentativas: int

llm = ChatOpenAI(model="gpt-4o-mini")

# 2. Define os nós
def gerar_rascunho(state: AnaliseState):
    resposta = llm.invoke(f"Analise: {state['pergunta']}")
    return {"rascunho": resposta.content, "tentativas": state.get("tentativas", 0) + 1}

def criticar(state: AnaliseState):
    prompt = f"Critique esta análise: {state['rascunho']}"
    critica = llm.invoke(prompt)
    return {"critica": critica.content}

def refinar(state: AnaliseState):
    prompt = f"Melhore com base na crítica: {state['rascunho']}\nCrítica: {state['critica']}"
    melhorado = llm.invoke(prompt)
    return {"rascunho": melhorado.content}

def finalizar(state: AnaliseState):
    return {"resposta_final": state["rascunho"]}

# 3. Lógica de roteamento
def checar_qualidade(state: AnaliseState) -> str:
    if "APROVADO" in state["critica"] or state["tentativas"] >= 3:
        return "finalizar"
    return "refinar"

# 4. Monta o grafo
grafo = StateGraph(AnaliseState)
grafo.add_node("gerar", gerar_rascunho)
grafo.add_node("criticar", criticar)
grafo.add_node("refinar", refinar)
grafo.add_node("finalizar", finalizar)

grafo.add_edge(START, "gerar")
grafo.add_edge("gerar", "criticar")
grafo.add_conditional_edges("criticar", checar_qualidade, {
    "finalizar": "finalizar",
    "refinar": "refinar"
})
grafo.add_edge("refinar", "criticar")  # loop!
grafo.add_edge("finalizar", END)

# 5. Compila e executa
app = grafo.compile()
resultado = app.invoke({"pergunta": "Qual o impacto do LangGraph em produção?"})

💡 Visualize o grafo antes de executar

# Gera imagem PNG do grafo
from IPython.display import Image
Image(app.get_graph().draw_mermaid_png())

# Ou imprime em formato texto
print(app.get_graph().draw_ascii())
4

🔁 Ciclos e loops controlados

A capacidade de retornar para um nó anterior é o que torna LangGraph fundamentalmente diferente de pipelines lineares. Loops controlados permitem retry automático, reflexão iterativa e aprovação humana antes de continuar.

A

Loop de reflexão automática

Agente gera → critica → melhora → repete até qualidade ou limite

# Contador no estado previne loop infinito
def checar_loop(state) -> str:
    if state["qualidade"] > 0.8:
        return END
    if state["tentativas"] >= 5:
        return "escalar"  # fallback
    return "melhorar"     # volta
B

Human-in-the-loop com interrupt

Pausa o grafo, espera aprovação humana, retoma do mesmo ponto

# Compila com ponto de interrupção
app = grafo.compile(
    interrupt_before=["acao_critica"]
)

# Executa até o interrupt
estado = app.invoke(input_inicial, config={"thread_id": "1"})

# Humano aprova e retoma
app.update_state(config, {"aprovado": True})
resultado = app.invoke(None, config=config)

⚠️ Atenção: loops infinitos

Todo loop deve ter um critério de saída garantido. Use um contador de iterações no estado e defina um limite máximo. Um agente preso em loop infinito consome tokens sem parar — e a conta chega no final do mês.

5

💾 Checkpoints e persistência

LangGraph salva o estado após cada nó executado usando um checkpointer. Isso permite retomar workflows interrompidos, auditar cada passo da execução e implementar human-in-the-loop sem perder progresso.

💽 MemorySaver e PostgresSaver

# Desenvolvimento: memória (não persiste entre reinicializações)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
app = grafo.compile(checkpointer=checkpointer)

# Produção: PostgreSQL (persiste entre restarts)
from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
    app = grafo.compile(checkpointer=checkpointer)

# thread_id identifica a sessão — SEMPRE use um
config = {"configurable": {"thread_id": "usuario_123_sessao_456"}}

# Cada invocação com o mesmo thread_id retoma do ponto anterior
resultado1 = app.invoke({"mensagem": "olá"}, config=config)
resultado2 = app.invoke({"mensagem": "continue"}, config=config)

# Inspeciona o estado salvo
estado_atual = app.get_state(config)
print(estado_atual.values)   # dados atuais
print(estado_atual.next)     # próximo nó a executar

# Histórico completo de todos os checkpoints
for snapshot in app.get_state_history(config):
    print(snapshot.config, snapshot.values)

MemorySaver

  • • Ideal para desenvolvimento e testes
  • • Estado em RAM — rápido
  • • Perde tudo ao reiniciar o processo
  • • Sem dependência de banco de dados

PostgresSaver / SQLiteSaver

  • • Produção com resiliência real
  • • Sobrevive a restarts
  • • Permite auditoria histórica
  • • Suporte a multi-thread_id concorrente
6

📡 Streaming de resultados

Com graph.stream(), você recebe eventos em tempo real a cada nó executado. Isso transforma a UX: o usuário vê o sistema processando ao invés de esperar uma resposta que demora 30 segundos.

📡 Modos de streaming

# stream_mode="values": estado completo após cada nó
for estado in app.stream(input, stream_mode="values"):
    print(f"Estado: {estado}")

# stream_mode="updates": apenas o diff (mais eficiente)
for chunk in app.stream(input, stream_mode="updates"):
    no, atualizacao = list(chunk.items())[0]
    print(f"Nó '{no}' atualizou: {atualizacao}")

# stream_mode="debug": tudo, incluindo metadados internos
for evento in app.stream(input, stream_mode="debug"):
    print(evento)

# Streaming de tokens do LLM dentro dos nós
async for evento in app.astream_events(input, version="v2"):
    if evento["event"] == "on_chat_model_stream":
        token = evento["data"]["chunk"].content
        print(token, end="", flush=True)

# Integração com FastAPI SSE para UI em tempo real
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app_api = FastAPI()

@app_api.get("/executar")
async def executar_agente(pergunta: str):
    async def gerar():
        async for chunk in app.astream({"pergunta": pergunta}):
            yield f"data: {str(chunk)}\n\n"
    return StreamingResponse(gerar(), media_type="text/event-stream")

💡 Use astream_events para UI rica

O método astream_events(version="v2") emite eventos granulares: início de nó, fim de nó, tokens de LLM, chamadas de tool e muito mais. Com esses eventos, você pode construir uma UI que mostra exatamente o que o agente está fazendo em cada momento.

🎯 Resumo do Módulo

StateGraph — grafo direcionado onde nós são funções Python e estado é um TypedDict compartilhado
Edges fixas e condicionais — definem o fluxo de execução com add_edge() e add_conditional_edges()
Ciclos controlados — loops de retry e reflexão com critério de saída obrigatório
Checkpoints — MemorySaver para dev, PostgresSaver para produção, thread_id para sessões
Streaming — graph.stream() e astream_events() para UX responsiva em tempo real

Próximo Módulo:

3.2 — 👥 CrewAI na Prática: times de agentes com papéis definidos, tasks e análise de mercado