MÓDULO 5.3

🗄️ Camada 3 — Controle de Estado

Múltiplos agentes paralelos precisam de um único estado compartilhado. Redis para velocidade de execução, Postgres para durabilidade — e a lógica que mantém tudo consistente mesmo quando processos morrem.

6
Tópicos
50
Minutos
Avançado
Nível
Código
Tipo
1

🌐 Por que Estado Distribuído

O erro mais comum em sistemas multi-agente de primeira geração: guardar estado em variáveis Python no processo do orquestrador. Funciona em desenvolvimento local. Quebra imediatamente em produção com mais de um worker.

Cenário de falha com estado local

1. Worker A executa tarefa T01, atualiza self.completed = {"T01"} em memória local
2. Worker B executa tarefa T02 ao mesmo tempo, tem seu próprio self.completed = {}
3. Worker B verifica se T01 está completa para desbloquear T05 — não sabe, não está no seu estado local
4. Resultado: T05 nunca é executada, workflow trava silenciosamente. Race condition não reproduzível em dev.

A regra fundamental do estado distribuído

Todo estado que precisa ser lido por mais de um processo deve residir fora de qualquer processo. Redis e Postgres são a implementação padrão:

Redis (efêmero, rápido)

Estado de execução atual: status de tarefa, fila, heartbeats, locks. Sub-milissegundo. TTL automático. Perde dados no restart — isso é OK para estado de execução.

Postgres (permanente)

Histórico de execuções, audit log, configurações. Durável. ACID. Sobrevive a qualquer restart. Mais lento que Redis — use para dados que precisam existir para sempre.

2

⚡ Redis para Estado de Execução

Redis oferece as estruturas de dados perfeitas para orquestração de agentes. Cada estrutura tem seu papel específico no sistema.

import redis
import json
import time
from enum import Enum

class TaskStatus(Enum):
    PENDING   = "pending"
    RUNNING   = "running"
    DONE      = "done"
    FAILED    = "failed"
    CANCELLED = "cancelled"

class RedisStateManager:
    """Gerencia estado de execução usando Redis."""

    def __init__(self, session_id: str, redis_url: str = "redis://localhost:6379"):
        self.r = redis.from_url(redis_url, decode_responses=True)
        self.session_id = session_id
        self.prefix = f"orch:{session_id}"

    # ── Estado de tarefa (Hash) ──────────────────────────────────────────
    def set_task_status(self, task_id: str, status: TaskStatus,
                        result: dict = None, error: str = None):
        key = f"{self.prefix}:task:{task_id}"
        data = {
            "status": status.value,
            "updated_at": str(time.time()),
        }
        if result:
            data["result"] = json.dumps(result)
        if error:
            data["error"] = error

        self.r.hset(key, mapping=data)
        self.r.expire(key, 86400)  # TTL: 24 horas

    def get_task_status(self, task_id: str) -> dict:
        key = f"{self.prefix}:task:{task_id}"
        return self.r.hgetall(key)

    # ── Fila de prioridade (Sorted Set) ─────────────────────────────────
    def enqueue_task(self, task_id: str, priority: float):
        key = f"{self.prefix}:queue"
        self.r.zadd(key, {task_id: priority})

    def dequeue_task(self) -> str | None:
        key = f"{self.prefix}:queue"
        # Pop o elemento com maior score (prioridade)
        result = self.r.zpopmax(key)
        return result[0][0] if result else None

    # ── Agentes disponíveis (Set) ────────────────────────────────────────
    def mark_agent_busy(self, agent_id: str):
        self.r.srem(f"{self.prefix}:agents:available", agent_id)
        self.r.sadd(f"{self.prefix}:agents:busy", agent_id)

    def mark_agent_available(self, agent_id: str):
        self.r.srem(f"{self.prefix}:agents:busy", agent_id)
        self.r.sadd(f"{self.prefix}:agents:available", agent_id)

    # ── Heartbeat de agente ──────────────────────────────────────────────
    def agent_heartbeat(self, agent_id: str, ttl_sec: int = 30):
        key = f"{self.prefix}:heartbeat:{agent_id}"
        self.r.setex(key, ttl_sec, str(time.time()))

    def is_agent_alive(self, agent_id: str) -> bool:
        key = f"{self.prefix}:heartbeat:{agent_id}"
        return self.r.exists(key) > 0

    # ── Eventos de execução (Stream) ────────────────────────────────────
    def emit_event(self, event_type: str, data: dict):
        key = f"{self.prefix}:events"
        self.r.xadd(key, {
            "type": event_type,
            "data": json.dumps(data),
            "ts": str(time.time())
        }, maxlen=1000)  # Mantém apenas últimos 1000 eventos

    def get_events(self, count: int = 50) -> list[dict]:
        key = f"{self.prefix}:events"
        entries = self.r.xrevrange(key, count=count)
        return [{"id": e[0], **e[1]} for e in entries]
3

🐘 Postgres para Persistência

O que vai para o Postgres: tudo que precisa sobreviver ao restart do sistema, ser auditável por reguladores, ou ser consultado semanas depois por análise histórica.

Vai para Redis

  • • Status atual de cada tarefa (pending/running/done)
  • • Fila de prioridade de tarefas prontas
  • • Heartbeats dos agentes
  • • Locks de execução (SETNX)
  • • Cache de resultados intermediários
  • • Eventos em tempo real (Stream)

Vai para Postgres

  • • Histórico completo de execuções
  • • Audit log imutável (append-only)
  • • Resultados finais de cada sessão
  • • Configurações do orquestrador
  • • Métricas históricas de custo e performance
  • • Metadados de agentes e capabilities

Schema Postgres para orquestrador

-- Schema SQL para persistência do orquestrador
CREATE TABLE orchestration_sessions (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id  VARCHAR(64) UNIQUE NOT NULL,
    objective   TEXT NOT NULL,
    status      VARCHAR(20) DEFAULT 'running',
    cost_usd    DECIMAL(10,4) DEFAULT 0,
    started_at  TIMESTAMPTZ DEFAULT NOW(),
    completed_at TIMESTAMPTZ,
    metadata    JSONB DEFAULT '{}'
);

CREATE TABLE task_executions (
    id          UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    session_id  VARCHAR(64) REFERENCES orchestration_sessions(session_id),
    task_id     VARCHAR(32) NOT NULL,
    agent_id    VARCHAR(64),
    model       VARCHAR(64),
    status      VARCHAR(20),
    input_data  JSONB,
    output_data JSONB,
    tokens_in   INTEGER DEFAULT 0,
    tokens_out  INTEGER DEFAULT 0,
    cost_usd    DECIMAL(10,6) DEFAULT 0,
    started_at  TIMESTAMPTZ DEFAULT NOW(),
    completed_at TIMESTAMPTZ,
    error_msg   TEXT,
    UNIQUE(session_id, task_id)
);

-- Audit log append-only (NUNCA deletar registros)
CREATE TABLE audit_log (
    id          BIGSERIAL PRIMARY KEY,
    session_id  VARCHAR(64),
    event_type  VARCHAR(64) NOT NULL,
    actor       VARCHAR(64),  -- agent_id ou 'human'
    action      TEXT NOT NULL,
    details     JSONB DEFAULT '{}',
    created_at  TIMESTAMPTZ DEFAULT NOW()
);

-- Index para queries frequentes
CREATE INDEX idx_task_exec_session ON task_executions(session_id);
CREATE INDEX idx_audit_session ON audit_log(session_id);
CREATE INDEX idx_audit_created ON audit_log(created_at);
4

🔑 Session Management

Cada execução do orquestrador tem um session_id único. Todo estado, log e resultado fica associado a ele. TTL automático garante limpeza de sessões abandonadas.

import uuid
import asyncpg
from datetime import datetime, timezone

class SessionManager:
    """Gerencia ciclo de vida de sessões de orquestração."""

    def __init__(self, redis_state: RedisStateManager, db_pool: asyncpg.Pool):
        self.redis = redis_state
        self.db = db_pool

    @classmethod
    def new_session_id(cls) -> str:
        """Gera session_id único e legível."""
        return f"sess_{uuid.uuid4().hex[:16]}"

    async def create_session(self, objective: str, metadata: dict = None) -> str:
        """Cria nova sessão no Redis (efêmero) e Postgres (permanente)."""
        session_id = self.new_session_id()

        # Redis: estado de execução com TTL de 4 horas
        pipe = self.redis.r.pipeline()
        pipe.hset(f"orch:{session_id}:meta", mapping={
            "objective": objective,
            "status": "running",
            "created_at": str(datetime.now(timezone.utc).timestamp())
        })
        pipe.expire(f"orch:{session_id}:meta", 14400)  # 4 horas
        pipe.execute()

        # Postgres: registro permanente
        await self.db.execute("""
            INSERT INTO orchestration_sessions (session_id, objective, metadata)
            VALUES ($1, $2, $3)
        """, session_id, objective, metadata or {})

        return session_id

    async def complete_session(self, session_id: str, result: dict, cost: float):
        """Finaliza sessão: salva resultado no Postgres."""
        await self.db.execute("""
            UPDATE orchestration_sessions
            SET status = 'completed',
                cost_usd = $2,
                completed_at = NOW(),
                metadata = metadata || $3::jsonb
            WHERE session_id = $1
        """, session_id, cost, {"final_result": result})

        # Limpa estado Redis (ou deixa expirar pelo TTL)
        self.redis.r.delete(f"orch:{session_id}:queue")

    async def list_active_sessions(self) -> list[dict]:
        """Lista sessões ativas no Postgres."""
        rows = await self.db.fetch("""
            SELECT session_id, objective, status, cost_usd, started_at
            FROM orchestration_sessions
            WHERE status = 'running'
            ORDER BY started_at DESC
            LIMIT 50
        """)
        return [dict(row) for row in rows]

    async def cleanup_orphan_sessions(self, max_age_hours: int = 12):
        """Remove sessões que ficaram em 'running' por muito tempo (processo morreu)."""
        await self.db.execute("""
            UPDATE orchestration_sessions
            SET status = 'abandoned'
            WHERE status = 'running'
              AND started_at < NOW() - INTERVAL '$1 hours'
        """, max_age_hours)
5

🧠 Context Window Management

O maior consumidor de tokens em sistemas agentic não é o trabalho real — é o contexto injetado. O orquestrador deve injetar apenas o contexto relevante para cada agente, não o histórico completo da sessão.

O problema do contexto ingênuo

Se uma sessão gera 50 mensagens de histórico durante 15 tarefas, e você injeta todo o histórico para cada agente, o custo se multiplica: 50 mensagens × 15 agentes = cada agente vê 750 mensagens de contexto desnecessário. Custo extra: 5-10x.

class ContextWindowManager:
    """Gerencia qual contexto injetar em cada agente."""

    MAX_CONTEXT_TOKENS = 4000  # Budget de contexto por agente

    def build_context_for_task(self,
                                task: dict,
                                session_history: list[dict],
                                completed_tasks: dict) -> list[dict]:
        """
        Constrói contexto mínimo relevante para uma tarefa específica.
        Retorna lista de mensagens para injetar no agente.
        """
        messages = []

        # 1. Contexto do objetivo (sempre incluir)
        messages.append({
            "role": "user",
            "content": f"Objetivo da sessão: {session_history[0]['objective']}"
        })

        # 2. Resultados das dependências diretas (sempre incluir)
        for dep_id in task.get("dependencias", []):
            if dep_id in completed_tasks:
                result = completed_tasks[dep_id]
                messages.append({
                    "role": "user",
                    "content": f"Resultado de {dep_id}: {self._summarize_if_needed(result)}"
                })

        # 3. Contexto adicional por relevância semântica (opcional)
        # Se ainda há budget de tokens, adicionar resultados relacionados
        current_tokens = self._estimate_tokens(messages)
        if current_tokens < self.MAX_CONTEXT_TOKENS * 0.7:
            for task_id, result in completed_tasks.items():
                if task_id not in task.get("dependencias", []):
                    relevance = self._estimate_relevance(task, result)
                    if relevance > 0.7:
                        summary = self._summarize_if_needed(result, max_chars=500)
                        messages.append({
                            "role": "user",
                            "content": f"Contexto relacionado ({task_id}): {summary}"
                        })
                        current_tokens += self._estimate_tokens([messages[-1]])
                        if current_tokens > self.MAX_CONTEXT_TOKENS * 0.9:
                            break

        return messages

    def _summarize_if_needed(self, result: dict, max_chars: int = 2000) -> str:
        """Sumariza resultado se for muito longo."""
        text = result.get("output", "")
        if len(text) <= max_chars:
            return text
        # Retorna início + ... + fim
        half = max_chars // 2
        return f"{text[:half]}...[{len(text)-max_chars} chars omitidos]...{text[-half:]}"

    def _estimate_tokens(self, messages: list) -> int:
        """Estimativa rápida de tokens (1 token ≈ 4 chars)."""
        total_chars = sum(len(str(m)) for m in messages)
        return total_chars // 4

    def _estimate_relevance(self, task: dict, result: dict) -> float:
        """Relevância simples por overlap de palavras-chave."""
        task_words = set(task["descricao"].lower().split())
        result_words = set(str(result).lower().split())
        intersection = task_words & result_words
        return len(intersection) / max(len(task_words), 1)
6

🔄 Recuperação após Falha

O processo do orquestrador morreu no meio de uma execução de $8 em tokens. Como retomar exatamente do ponto onde parou, sem reiniciar do zero e sem duplicar trabalho já feito?

O protocolo de recuperação

1 Novo processo do orquestrador inicia. Detecta sessão com status "running" no Postgres com mais de 5 minutos sem atualização.
2 Carrega lista completa de tarefas e seus status do Redis (ou do Postgres se Redis foi reiniciado).
3 Tarefas com status DONE: carrega resultado salvo do Postgres — não reexecuta.
4 Tarefas com status RUNNING: marcar como FAILED (processo morreu, resultado desconhecido). Reagendar para reexecução.
5 Retoma execução normal a partir das tarefas PENDING e FAILED — economizando o custo das tarefas já concluídas.
class OrchestratorRecovery:
    """Lógica de recuperação após crash do orquestrador."""

    def __init__(self, redis: RedisStateManager, db: asyncpg.Pool):
        self.redis = redis
        self.db = db

    async def detect_and_resume(self, session_id: str) -> dict:
        """Detecta se sessão pode ser retomada e carrega estado."""

        # 1. Verificar status da sessão no Postgres
        session = await self.db.fetchrow("""
            SELECT * FROM orchestration_sessions WHERE session_id = $1
        """, session_id)

        if not session or session["status"] not in ("running", "abandoned"):
            return {"can_resume": False, "reason": "Sessão não está em andamento"}

        # 2. Carregar tarefas já concluídas (Postgres é fonte de verdade)
        done_tasks = await self.db.fetch("""
            SELECT task_id, output_data, cost_usd
            FROM task_executions
            WHERE session_id = $1 AND status = 'done'
        """, session_id)

        completed = {r["task_id"]: dict(r) for r in done_tasks}

        # 3. Identificar tarefas que estavam "running" (perdidas no crash)
        running_tasks = await self.db.fetch("""
            SELECT task_id FROM task_executions
            WHERE session_id = $1 AND status = 'running'
        """, session_id)

        # Marcar como FAILED para reexecução
        for row in running_tasks:
            await self.db.execute("""
                UPDATE task_executions SET status = 'failed',
                error_msg = 'Processo encerrado inesperadamente — reexecutar'
                WHERE session_id = $1 AND task_id = $2
            """, session_id, row["task_id"])

        cost_so_far = sum(r["cost_usd"] for r in done_tasks)

        return {
            "can_resume": True,
            "session_id": session_id,
            "completed_tasks": completed,
            "tasks_to_retry": [r["task_id"] for r in running_tasks],
            "cost_saved_usd": cost_so_far,
        }

Resumo do Módulo 5.3

Estado em memória local é o erro #1 em sistemas multi-agente reais
Redis gerencia estado efêmero (sub-ms) — filas, locks, heartbeats, eventos
Postgres persiste histórico permanente — audit log, resultados, configurações
Session_id como namespace: isolamento completo entre execuções concorrentes
Context budget: injete apenas o necessário para cada agente — economiza 70% em tokens
Recuperação de checkpoint: retomar de onde parou sem reiniciar execuções completas