🗺️ O que é um grafo de estado
LangGraph representa o fluxo de um agente como um grafo direcionado, onde cada nó é uma função Python que recebe o estado atual e retorna uma versão atualizada. Diferente de pipelines lineares, o grafo pode ter branches, loops e pontos de decisão — exatamente como a lógica de um ser humano resolvendo um problema complexo.
🧩 Anatomia do grafo de estado
Três componentes essenciais formam qualquer LangGraph: o Estado (TypedDict com os dados compartilhados), os Nós (funções que processam o estado) e as Edges (conexões que definem o fluxo).
# 1. Estado: o coração do grafo
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
import operator
class AgentState(TypedDict):
mensagem: str
historico: Annotated[list, operator.add] # acumula
iteracoes: int
aprovado: bool
# 2. Nós: funções que transformam o estado
def processar(state: AgentState) -> AgentState:
return {"iteracoes": state["iteracoes"] + 1}
# 3. Grafo: conecta tudo
grafo = StateGraph(AgentState)
grafo.add_node("processar", processar)
grafo.add_edge(START, "processar")
grafo.add_edge("processar", END)
Estado imutável por convenção
- ✓Retorne um novo dict com campos atualizados
- ✓Use Annotated[list, operator.add] para acumular listas
- ✓Campos não retornados mantêm valor anterior
- ✓TypedDict garante type safety em toda a execução
Armadilhas comuns
- ✗Não mutate o estado diretamente (state["x"] = ...)
- ✗Não esqueça de retornar campos que quer atualizar
- ✗Evite estado muito grande — impacta serialização
- ✗Não use tipos não serializáveis no estado
💡 Dica Prática
Projete o estado primeiro, antes de qualquer nó. Pense: quais dados precisam fluir pelo sistema? Quais precisam ser acumulados? O TypedDict do estado é o contrato de dados de todo o grafo.
🔗 Nós e edges: blocos de construção
Nós são funções Python puras — recebem o estado, fazem alguma coisa (chamar LLM, executar tool, tomar decisão) e retornam uma atualização parcial do estado. Edges são as conexões: fixas (sempre vão para o mesmo nó) ou condicionais (escolhem o próximo nó com base no estado atual).
⚙️ Edges fixas vs. condicionais
# Edge FIXA: sempre vai para o mesmo próximo nó
grafo.add_edge("analisar", "formatar") # sempre
# Edge CONDICIONAL: função decide o próximo nó
def decidir_rota(state: AgentState) -> str:
if state["aprovado"]:
return "finalizar"
elif state["iteracoes"] >= 3:
return "escalar"
else:
return "revisar"
grafo.add_conditional_edges(
"avaliar", # nó de origem
decidir_rota, # função roteadora
{ # mapeamento string → nó
"finalizar": "finalizar",
"escalar": "escalar",
"revisar": "revisar"
}
)
# Múltiplos destinos possíveis do mesmo nó
grafo.add_node("avaliar", avaliar_resultado)
grafo.add_node("finalizar", gerar_output)
grafo.add_node("escalar", notificar_humano)
grafo.add_node("revisar", melhorar_resposta)
📊 Por que edges condicionais são poderosas
- •Branching baseado em dados: o fluxo muda com base no estado real, não em lógica hardcoded no nó
- •Separação de concerns: o nó faz a análise, a edge faz o roteamento — responsabilidades claras
- •Testabilidade: a função roteadora pode ser testada unitariamente sem executar o grafo inteiro
- •Visualização: LangGraph pode gerar diagrama visual do grafo incluindo todos os caminhos possíveis
⚙️ StateGraph na prática: código completo
Veja um agente de análise completo construído do zero. O padrão define estado → cria nós → conecta edges → compila → executa é sempre o mesmo, independente da complexidade do sistema.
🏗️ Agente de análise completo
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from typing import TypedDict
# 1. Define o estado
class AnaliseState(TypedDict):
pergunta: str
rascunho: str
critica: str
resposta_final: str
tentativas: int
llm = ChatOpenAI(model="gpt-4o-mini")
# 2. Define os nós
def gerar_rascunho(state: AnaliseState):
resposta = llm.invoke(f"Analise: {state['pergunta']}")
return {"rascunho": resposta.content, "tentativas": state.get("tentativas", 0) + 1}
def criticar(state: AnaliseState):
prompt = f"Critique esta análise: {state['rascunho']}"
critica = llm.invoke(prompt)
return {"critica": critica.content}
def refinar(state: AnaliseState):
prompt = f"Melhore com base na crítica: {state['rascunho']}\nCrítica: {state['critica']}"
melhorado = llm.invoke(prompt)
return {"rascunho": melhorado.content}
def finalizar(state: AnaliseState):
return {"resposta_final": state["rascunho"]}
# 3. Lógica de roteamento
def checar_qualidade(state: AnaliseState) -> str:
if "APROVADO" in state["critica"] or state["tentativas"] >= 3:
return "finalizar"
return "refinar"
# 4. Monta o grafo
grafo = StateGraph(AnaliseState)
grafo.add_node("gerar", gerar_rascunho)
grafo.add_node("criticar", criticar)
grafo.add_node("refinar", refinar)
grafo.add_node("finalizar", finalizar)
grafo.add_edge(START, "gerar")
grafo.add_edge("gerar", "criticar")
grafo.add_conditional_edges("criticar", checar_qualidade, {
"finalizar": "finalizar",
"refinar": "refinar"
})
grafo.add_edge("refinar", "criticar") # loop!
grafo.add_edge("finalizar", END)
# 5. Compila e executa
app = grafo.compile()
resultado = app.invoke({"pergunta": "Qual o impacto do LangGraph em produção?"})
💡 Visualize o grafo antes de executar
# Gera imagem PNG do grafo
from IPython.display import Image
Image(app.get_graph().draw_mermaid_png())
# Ou imprime em formato texto
print(app.get_graph().draw_ascii())
🔁 Ciclos e loops controlados
A capacidade de retornar para um nó anterior é o que torna LangGraph fundamentalmente diferente de pipelines lineares. Loops controlados permitem retry automático, reflexão iterativa e aprovação humana antes de continuar.
Loop de reflexão automática
Agente gera → critica → melhora → repete até qualidade ou limite
# Contador no estado previne loop infinito
def checar_loop(state) -> str:
if state["qualidade"] > 0.8:
return END
if state["tentativas"] >= 5:
return "escalar" # fallback
return "melhorar" # volta
Human-in-the-loop com interrupt
Pausa o grafo, espera aprovação humana, retoma do mesmo ponto
# Compila com ponto de interrupção
app = grafo.compile(
interrupt_before=["acao_critica"]
)
# Executa até o interrupt
estado = app.invoke(input_inicial, config={"thread_id": "1"})
# Humano aprova e retoma
app.update_state(config, {"aprovado": True})
resultado = app.invoke(None, config=config)
⚠️ Atenção: loops infinitos
Todo loop deve ter um critério de saída garantido. Use um contador de iterações no estado e defina um limite máximo. Um agente preso em loop infinito consome tokens sem parar — e a conta chega no final do mês.
💾 Checkpoints e persistência
LangGraph salva o estado após cada nó executado usando um checkpointer. Isso permite retomar workflows interrompidos, auditar cada passo da execução e implementar human-in-the-loop sem perder progresso.
💽 MemorySaver e PostgresSaver
# Desenvolvimento: memória (não persiste entre reinicializações)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
app = grafo.compile(checkpointer=checkpointer)
# Produção: PostgreSQL (persiste entre restarts)
from langgraph.checkpoint.postgres import PostgresSaver
with PostgresSaver.from_conn_string(DATABASE_URL) as checkpointer:
app = grafo.compile(checkpointer=checkpointer)
# thread_id identifica a sessão — SEMPRE use um
config = {"configurable": {"thread_id": "usuario_123_sessao_456"}}
# Cada invocação com o mesmo thread_id retoma do ponto anterior
resultado1 = app.invoke({"mensagem": "olá"}, config=config)
resultado2 = app.invoke({"mensagem": "continue"}, config=config)
# Inspeciona o estado salvo
estado_atual = app.get_state(config)
print(estado_atual.values) # dados atuais
print(estado_atual.next) # próximo nó a executar
# Histórico completo de todos os checkpoints
for snapshot in app.get_state_history(config):
print(snapshot.config, snapshot.values)
MemorySaver
- • Ideal para desenvolvimento e testes
- • Estado em RAM — rápido
- • Perde tudo ao reiniciar o processo
- • Sem dependência de banco de dados
PostgresSaver / SQLiteSaver
- • Produção com resiliência real
- • Sobrevive a restarts
- • Permite auditoria histórica
- • Suporte a multi-thread_id concorrente
📡 Streaming de resultados
Com graph.stream(), você recebe eventos em tempo real a cada nó executado. Isso transforma a UX: o usuário vê o sistema processando ao invés de esperar uma resposta que demora 30 segundos.
📡 Modos de streaming
# stream_mode="values": estado completo após cada nó
for estado in app.stream(input, stream_mode="values"):
print(f"Estado: {estado}")
# stream_mode="updates": apenas o diff (mais eficiente)
for chunk in app.stream(input, stream_mode="updates"):
no, atualizacao = list(chunk.items())[0]
print(f"Nó '{no}' atualizou: {atualizacao}")
# stream_mode="debug": tudo, incluindo metadados internos
for evento in app.stream(input, stream_mode="debug"):
print(evento)
# Streaming de tokens do LLM dentro dos nós
async for evento in app.astream_events(input, version="v2"):
if evento["event"] == "on_chat_model_stream":
token = evento["data"]["chunk"].content
print(token, end="", flush=True)
# Integração com FastAPI SSE para UI em tempo real
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app_api = FastAPI()
@app_api.get("/executar")
async def executar_agente(pergunta: str):
async def gerar():
async for chunk in app.astream({"pergunta": pergunta}):
yield f"data: {str(chunk)}\n\n"
return StreamingResponse(gerar(), media_type="text/event-stream")
💡 Use astream_events para UI rica
O método astream_events(version="v2") emite eventos granulares: início de nó, fim de nó, tokens de LLM, chamadas de tool e muito mais. Com esses eventos, você pode construir uma UI que mostra exatamente o que o agente está fazendo em cada momento.
🎯 Resumo do Módulo
Próximo Módulo:
3.2 — 👥 CrewAI na Prática: times de agentes com papéis definidos, tasks e análise de mercado