🌐 Por que Estado Distribuído
O erro mais comum em sistemas multi-agente de primeira geração: guardar estado em variáveis Python no processo do orquestrador. Funciona em desenvolvimento local. Quebra imediatamente em produção com mais de um worker.
Cenário de falha com estado local
self.completed = {"T01"} em memória local
self.completed = {}
A regra fundamental do estado distribuído
Todo estado que precisa ser lido por mais de um processo deve residir fora de qualquer processo. Redis e Postgres são a implementação padrão:
Redis (efêmero, rápido)
Estado de execução atual: status de tarefa, fila, heartbeats, locks. Sub-milissegundo. TTL automático. Perde dados no restart — isso é OK para estado de execução.
Postgres (permanente)
Histórico de execuções, audit log, configurações. Durável. ACID. Sobrevive a qualquer restart. Mais lento que Redis — use para dados que precisam existir para sempre.
⚡ Redis para Estado de Execução
Redis oferece as estruturas de dados perfeitas para orquestração de agentes. Cada estrutura tem seu papel específico no sistema.
import redis
import json
import time
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
RUNNING = "running"
DONE = "done"
FAILED = "failed"
CANCELLED = "cancelled"
class RedisStateManager:
"""Gerencia estado de execução usando Redis."""
def __init__(self, session_id: str, redis_url: str = "redis://localhost:6379"):
self.r = redis.from_url(redis_url, decode_responses=True)
self.session_id = session_id
self.prefix = f"orch:{session_id}"
# ── Estado de tarefa (Hash) ──────────────────────────────────────────
def set_task_status(self, task_id: str, status: TaskStatus,
result: dict = None, error: str = None):
key = f"{self.prefix}:task:{task_id}"
data = {
"status": status.value,
"updated_at": str(time.time()),
}
if result:
data["result"] = json.dumps(result)
if error:
data["error"] = error
self.r.hset(key, mapping=data)
self.r.expire(key, 86400) # TTL: 24 horas
def get_task_status(self, task_id: str) -> dict:
key = f"{self.prefix}:task:{task_id}"
return self.r.hgetall(key)
# ── Fila de prioridade (Sorted Set) ─────────────────────────────────
def enqueue_task(self, task_id: str, priority: float):
key = f"{self.prefix}:queue"
self.r.zadd(key, {task_id: priority})
def dequeue_task(self) -> str | None:
key = f"{self.prefix}:queue"
# Pop o elemento com maior score (prioridade)
result = self.r.zpopmax(key)
return result[0][0] if result else None
# ── Agentes disponíveis (Set) ────────────────────────────────────────
def mark_agent_busy(self, agent_id: str):
self.r.srem(f"{self.prefix}:agents:available", agent_id)
self.r.sadd(f"{self.prefix}:agents:busy", agent_id)
def mark_agent_available(self, agent_id: str):
self.r.srem(f"{self.prefix}:agents:busy", agent_id)
self.r.sadd(f"{self.prefix}:agents:available", agent_id)
# ── Heartbeat de agente ──────────────────────────────────────────────
def agent_heartbeat(self, agent_id: str, ttl_sec: int = 30):
key = f"{self.prefix}:heartbeat:{agent_id}"
self.r.setex(key, ttl_sec, str(time.time()))
def is_agent_alive(self, agent_id: str) -> bool:
key = f"{self.prefix}:heartbeat:{agent_id}"
return self.r.exists(key) > 0
# ── Eventos de execução (Stream) ────────────────────────────────────
def emit_event(self, event_type: str, data: dict):
key = f"{self.prefix}:events"
self.r.xadd(key, {
"type": event_type,
"data": json.dumps(data),
"ts": str(time.time())
}, maxlen=1000) # Mantém apenas últimos 1000 eventos
def get_events(self, count: int = 50) -> list[dict]:
key = f"{self.prefix}:events"
entries = self.r.xrevrange(key, count=count)
return [{"id": e[0], **e[1]} for e in entries]
🐘 Postgres para Persistência
O que vai para o Postgres: tudo que precisa sobreviver ao restart do sistema, ser auditável por reguladores, ou ser consultado semanas depois por análise histórica.
Vai para Redis
- • Status atual de cada tarefa (pending/running/done)
- • Fila de prioridade de tarefas prontas
- • Heartbeats dos agentes
- • Locks de execução (SETNX)
- • Cache de resultados intermediários
- • Eventos em tempo real (Stream)
Vai para Postgres
- • Histórico completo de execuções
- • Audit log imutável (append-only)
- • Resultados finais de cada sessão
- • Configurações do orquestrador
- • Métricas históricas de custo e performance
- • Metadados de agentes e capabilities
Schema Postgres para orquestrador
-- Schema SQL para persistência do orquestrador
CREATE TABLE orchestration_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id VARCHAR(64) UNIQUE NOT NULL,
objective TEXT NOT NULL,
status VARCHAR(20) DEFAULT 'running',
cost_usd DECIMAL(10,4) DEFAULT 0,
started_at TIMESTAMPTZ DEFAULT NOW(),
completed_at TIMESTAMPTZ,
metadata JSONB DEFAULT '{}'
);
CREATE TABLE task_executions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id VARCHAR(64) REFERENCES orchestration_sessions(session_id),
task_id VARCHAR(32) NOT NULL,
agent_id VARCHAR(64),
model VARCHAR(64),
status VARCHAR(20),
input_data JSONB,
output_data JSONB,
tokens_in INTEGER DEFAULT 0,
tokens_out INTEGER DEFAULT 0,
cost_usd DECIMAL(10,6) DEFAULT 0,
started_at TIMESTAMPTZ DEFAULT NOW(),
completed_at TIMESTAMPTZ,
error_msg TEXT,
UNIQUE(session_id, task_id)
);
-- Audit log append-only (NUNCA deletar registros)
CREATE TABLE audit_log (
id BIGSERIAL PRIMARY KEY,
session_id VARCHAR(64),
event_type VARCHAR(64) NOT NULL,
actor VARCHAR(64), -- agent_id ou 'human'
action TEXT NOT NULL,
details JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
-- Index para queries frequentes
CREATE INDEX idx_task_exec_session ON task_executions(session_id);
CREATE INDEX idx_audit_session ON audit_log(session_id);
CREATE INDEX idx_audit_created ON audit_log(created_at);
🔑 Session Management
Cada execução do orquestrador tem um session_id único. Todo estado, log e resultado fica associado a ele. TTL automático garante limpeza de sessões abandonadas.
import uuid
import asyncpg
from datetime import datetime, timezone
class SessionManager:
"""Gerencia ciclo de vida de sessões de orquestração."""
def __init__(self, redis_state: RedisStateManager, db_pool: asyncpg.Pool):
self.redis = redis_state
self.db = db_pool
@classmethod
def new_session_id(cls) -> str:
"""Gera session_id único e legível."""
return f"sess_{uuid.uuid4().hex[:16]}"
async def create_session(self, objective: str, metadata: dict = None) -> str:
"""Cria nova sessão no Redis (efêmero) e Postgres (permanente)."""
session_id = self.new_session_id()
# Redis: estado de execução com TTL de 4 horas
pipe = self.redis.r.pipeline()
pipe.hset(f"orch:{session_id}:meta", mapping={
"objective": objective,
"status": "running",
"created_at": str(datetime.now(timezone.utc).timestamp())
})
pipe.expire(f"orch:{session_id}:meta", 14400) # 4 horas
pipe.execute()
# Postgres: registro permanente
await self.db.execute("""
INSERT INTO orchestration_sessions (session_id, objective, metadata)
VALUES ($1, $2, $3)
""", session_id, objective, metadata or {})
return session_id
async def complete_session(self, session_id: str, result: dict, cost: float):
"""Finaliza sessão: salva resultado no Postgres."""
await self.db.execute("""
UPDATE orchestration_sessions
SET status = 'completed',
cost_usd = $2,
completed_at = NOW(),
metadata = metadata || $3::jsonb
WHERE session_id = $1
""", session_id, cost, {"final_result": result})
# Limpa estado Redis (ou deixa expirar pelo TTL)
self.redis.r.delete(f"orch:{session_id}:queue")
async def list_active_sessions(self) -> list[dict]:
"""Lista sessões ativas no Postgres."""
rows = await self.db.fetch("""
SELECT session_id, objective, status, cost_usd, started_at
FROM orchestration_sessions
WHERE status = 'running'
ORDER BY started_at DESC
LIMIT 50
""")
return [dict(row) for row in rows]
async def cleanup_orphan_sessions(self, max_age_hours: int = 12):
"""Remove sessões que ficaram em 'running' por muito tempo (processo morreu)."""
await self.db.execute("""
UPDATE orchestration_sessions
SET status = 'abandoned'
WHERE status = 'running'
AND started_at < NOW() - INTERVAL '$1 hours'
""", max_age_hours)
🧠 Context Window Management
O maior consumidor de tokens em sistemas agentic não é o trabalho real — é o contexto injetado. O orquestrador deve injetar apenas o contexto relevante para cada agente, não o histórico completo da sessão.
O problema do contexto ingênuo
Se uma sessão gera 50 mensagens de histórico durante 15 tarefas, e você injeta todo o histórico para cada agente, o custo se multiplica: 50 mensagens × 15 agentes = cada agente vê 750 mensagens de contexto desnecessário. Custo extra: 5-10x.
class ContextWindowManager:
"""Gerencia qual contexto injetar em cada agente."""
MAX_CONTEXT_TOKENS = 4000 # Budget de contexto por agente
def build_context_for_task(self,
task: dict,
session_history: list[dict],
completed_tasks: dict) -> list[dict]:
"""
Constrói contexto mínimo relevante para uma tarefa específica.
Retorna lista de mensagens para injetar no agente.
"""
messages = []
# 1. Contexto do objetivo (sempre incluir)
messages.append({
"role": "user",
"content": f"Objetivo da sessão: {session_history[0]['objective']}"
})
# 2. Resultados das dependências diretas (sempre incluir)
for dep_id in task.get("dependencias", []):
if dep_id in completed_tasks:
result = completed_tasks[dep_id]
messages.append({
"role": "user",
"content": f"Resultado de {dep_id}: {self._summarize_if_needed(result)}"
})
# 3. Contexto adicional por relevância semântica (opcional)
# Se ainda há budget de tokens, adicionar resultados relacionados
current_tokens = self._estimate_tokens(messages)
if current_tokens < self.MAX_CONTEXT_TOKENS * 0.7:
for task_id, result in completed_tasks.items():
if task_id not in task.get("dependencias", []):
relevance = self._estimate_relevance(task, result)
if relevance > 0.7:
summary = self._summarize_if_needed(result, max_chars=500)
messages.append({
"role": "user",
"content": f"Contexto relacionado ({task_id}): {summary}"
})
current_tokens += self._estimate_tokens([messages[-1]])
if current_tokens > self.MAX_CONTEXT_TOKENS * 0.9:
break
return messages
def _summarize_if_needed(self, result: dict, max_chars: int = 2000) -> str:
"""Sumariza resultado se for muito longo."""
text = result.get("output", "")
if len(text) <= max_chars:
return text
# Retorna início + ... + fim
half = max_chars // 2
return f"{text[:half]}...[{len(text)-max_chars} chars omitidos]...{text[-half:]}"
def _estimate_tokens(self, messages: list) -> int:
"""Estimativa rápida de tokens (1 token ≈ 4 chars)."""
total_chars = sum(len(str(m)) for m in messages)
return total_chars // 4
def _estimate_relevance(self, task: dict, result: dict) -> float:
"""Relevância simples por overlap de palavras-chave."""
task_words = set(task["descricao"].lower().split())
result_words = set(str(result).lower().split())
intersection = task_words & result_words
return len(intersection) / max(len(task_words), 1)
🔄 Recuperação após Falha
O processo do orquestrador morreu no meio de uma execução de $8 em tokens. Como retomar exatamente do ponto onde parou, sem reiniciar do zero e sem duplicar trabalho já feito?
O protocolo de recuperação
class OrchestratorRecovery:
"""Lógica de recuperação após crash do orquestrador."""
def __init__(self, redis: RedisStateManager, db: asyncpg.Pool):
self.redis = redis
self.db = db
async def detect_and_resume(self, session_id: str) -> dict:
"""Detecta se sessão pode ser retomada e carrega estado."""
# 1. Verificar status da sessão no Postgres
session = await self.db.fetchrow("""
SELECT * FROM orchestration_sessions WHERE session_id = $1
""", session_id)
if not session or session["status"] not in ("running", "abandoned"):
return {"can_resume": False, "reason": "Sessão não está em andamento"}
# 2. Carregar tarefas já concluídas (Postgres é fonte de verdade)
done_tasks = await self.db.fetch("""
SELECT task_id, output_data, cost_usd
FROM task_executions
WHERE session_id = $1 AND status = 'done'
""", session_id)
completed = {r["task_id"]: dict(r) for r in done_tasks}
# 3. Identificar tarefas que estavam "running" (perdidas no crash)
running_tasks = await self.db.fetch("""
SELECT task_id FROM task_executions
WHERE session_id = $1 AND status = 'running'
""", session_id)
# Marcar como FAILED para reexecução
for row in running_tasks:
await self.db.execute("""
UPDATE task_executions SET status = 'failed',
error_msg = 'Processo encerrado inesperadamente — reexecutar'
WHERE session_id = $1 AND task_id = $2
""", session_id, row["task_id"])
cost_so_far = sum(r["cost_usd"] for r in done_tasks)
return {
"can_resume": True,
"session_id": session_id,
"completed_tasks": completed,
"tasks_to_retry": [r["task_id"] for r in running_tasks],
"cost_saved_usd": cost_so_far,
}