MÓDULO 5.5

📋 Camada 5a — Governança: Logs e Permissões

Em produção enterprise, cada ação de agente precisa ser rastreável, justificável e controlada. Logs estruturados, audit trail imutável, RBAC para agentes — a governança que transforma sistemas experimentais em sistemas confiáveis.

6
Tópicos
50
Minutos
Avançado
Nível
Código
Tipo
1

📝 Logs Estruturados para Agentes

Logs de texto livre são inúteis em escala. Quando 20 agentes geram 500 logs por hora, você precisa de logs estruturados — JSON com schema fixo que permite queries, alertas e dashboards automáticos.

O schema obrigatório de um log de agente

{
  "timestamp": "2026-03-03T14:32:11.234Z",
  "level": "INFO",
  "session_id": "sess_abc123def456",
  "task_id": "T05",
  "agent_id": "finance_agent_02",
  "action_type": "llm_call",
  "model": "claude-sonnet-4-6",
  "input_tokens": 1245,
  "output_tokens": 892,
  "cost_usd": 0.00623,
  "duration_ms": 3421,
  "success": true,
  "tool_calls": ["read_database", "calculate_dcf"],
  "error": null,
  "metadata": {
    "task_type": "financeiro",
    "retry_attempt": 0,
    "routing_confidence": 0.94
  }
}

Implementação: StructuredLogger

import json
import logging
import time
from datetime import datetime, timezone

class StructuredLogger:
    """Logger estruturado em JSON para ações de agente."""

    def __init__(self, service_name: str = "orchestrator"):
        self._logger = logging.getLogger(service_name)
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(message)s"))
        self._logger.addHandler(handler)
        self._logger.setLevel(logging.INFO)
        self.session_id = None
        self.agent_id = None

    def bind(self, session_id: str, agent_id: str) -> "StructuredLogger":
        """Cria logger contextualizado para sessão/agente específico."""
        child = StructuredLogger.__new__(StructuredLogger)
        child._logger = self._logger
        child.session_id = session_id
        child.agent_id = agent_id
        return child

    def log_action(self,
                   task_id: str,
                   action_type: str,
                   model: str = None,
                   input_tokens: int = 0,
                   output_tokens: int = 0,
                   cost_usd: float = 0.0,
                   duration_ms: float = 0.0,
                   success: bool = True,
                   tool_calls: list = None,
                   error: str = None,
                   **extra):
        """Loga ação de agente com schema padronizado."""

        record = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": "ERROR" if not success else "INFO",
            "session_id": self.session_id,
            "task_id": task_id,
            "agent_id": self.agent_id,
            "action_type": action_type,
            "model": model,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": round(cost_usd, 8),
            "duration_ms": round(duration_ms, 1),
            "success": success,
            "tool_calls": tool_calls or [],
            "error": error,
            "metadata": extra
        }

        log_method = self._logger.error if not success else self._logger.info
        log_method(json.dumps(record, ensure_ascii=False))
        return record

    def log_decision(self, task_id: str, decision_type: str,
                     decision: str, rationale: str, confidence: float = None):
        """Loga decisão do orquestrador com raciocínio."""
        record = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": "INFO",
            "log_type": "decision",
            "session_id": self.session_id,
            "task_id": task_id,
            "agent_id": self.agent_id,
            "decision_type": decision_type,
            "decision": decision,
            "rationale": rationale,
            "confidence": confidence
        }
        self._logger.info(json.dumps(record, ensure_ascii=False))
2

🔒 Audit Trail Imutável

O audit trail não é apenas um log — é uma sequência imutável que prova que o sistema agiu dentro dos limites autorizados. Empresas reguladas em fintech, saúde e jurídico precisam disso para aprovação de órgãos reguladores.

Diferença entre log e audit trail

Log operacional

  • • Pode ser deletado/rotacionado
  • • Para debugging em tempo real
  • • Retenção de 30-90 dias
  • • Pode ter gaps se sistema falha

Audit trail

  • • Nunca deletado — append-only
  • • Para auditoria por reguladores
  • • Retenção de 5-10 anos
  • • Hash chain garante integridade

Implementação: ImmutableAuditTrail

import hashlib
import asyncpg

class ImmutableAuditTrail:
    """Audit trail com hash chain para garantia de integridade."""

    def __init__(self, db_pool: asyncpg.Pool):
        self.db = db_pool
        self._last_hash: dict[str, str] = {}  # session_id → último hash

    def _compute_hash(self, event: dict, prev_hash: str) -> str:
        """Computa hash do evento encadeado com o anterior."""
        content = json.dumps({**event, "prev_hash": prev_hash},
                              sort_keys=True, ensure_ascii=False)
        return hashlib.sha256(content.encode()).hexdigest()

    async def append(self,
                     session_id: str,
                     event_type: str,
                     actor: str,
                     action: str,
                     details: dict = None) -> str:
        """
        Adiciona entrada imutável ao audit trail.
        Retorna hash da entrada para referência.
        """
        event = {
            "session_id": session_id,
            "event_type": event_type,
            "actor": actor,
            "action": action,
            "details": details or {},
            "timestamp": datetime.now(timezone.utc).isoformat()
        }

        # Hash encadeado com o registro anterior
        prev_hash = self._last_hash.get(session_id, "GENESIS")
        event_hash = self._compute_hash(event, prev_hash)
        self._last_hash[session_id] = event_hash

        # Inserir no Postgres (NUNCA deletar esta tabela)
        await self.db.execute("""
            INSERT INTO audit_log
            (session_id, event_type, actor, action, details, event_hash, prev_hash)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
        """, session_id, event_type, actor, action,
            json.dumps(details or {}), event_hash, prev_hash)

        return event_hash

    async def verify_integrity(self, session_id: str) -> bool:
        """Verifica se a cadeia de hashes está íntegra (não foi adulterada)."""
        rows = await self.db.fetch("""
            SELECT event_hash, prev_hash, event_type, actor, action, details, created_at
            FROM audit_log WHERE session_id = $1
            ORDER BY id ASC
        """, session_id)

        prev_hash = "GENESIS"
        for row in rows:
            event = {
                "session_id": session_id,
                "event_type": row["event_type"],
                "actor": row["actor"],
                "action": row["action"],
                "details": json.loads(row["details"]),
                "timestamp": row["created_at"].isoformat()
            }
            expected_hash = self._compute_hash(event, prev_hash)
            if expected_hash != row["event_hash"]:
                print(f"[Audit] VIOLACAO DE INTEGRIDADE detectada! Hash inválido.")
                return False
            prev_hash = row["event_hash"]

        return True

# Uso típico:
# audit = ImmutableAuditTrail(db_pool)
# await audit.append(
#     session_id="sess_123",
#     event_type="tool_execution",
#     actor="finance_agent_02",
#     action="read_database",
#     details={"table": "financial_reports", "query": "SELECT..."}
# )
3

🛡️ RBAC para Agentes

Role-Based Access Control aplicado a agentes: cada agente tem um role, cada role tem permissões específicas. O mesmo princípio de menor privilégio que aplicamos a usuários humanos — agora para sistemas autônomos.

Hierarquia de roles de agente

READ_ONLY Pode: pesquisar web, ler bancos, consultar APIs de dados. Não pode: escrever, enviar, deletar. Agentes de pesquisa e análise.
WRITER Inclui READ_ONLY. Adicional: criar arquivos, inserir dados, gerar documentos. Agentes de redação e formatação.
EXECUTOR Inclui WRITER. Adicional: executar código, chamar APIs externas com efeitos. Agentes de execução técnica. Requer log detalhado.
PRIVILEGED Acesso total. Deploy, email, ações irreversíveis. Requer aprovação humana explícita para cada uso. Apenas orquestrador principal.
from enum import Enum
from functools import wraps

class AgentRole(Enum):
    READ_ONLY  = "read_only"
    WRITER     = "writer"
    EXECUTOR   = "executor"
    PRIVILEGED = "privileged"

class RBACManager:
    """Gerencia roles e permissões de agentes."""

    # Permissões por role (hierárquica: cada role inclui o anterior)
    PERMISSIONS = {
        AgentRole.READ_ONLY: {
            "web_search", "read_database", "fetch_api_data",
            "read_file", "query_elasticsearch"
        },
        AgentRole.WRITER: {
            "web_search", "read_database", "fetch_api_data",
            "read_file", "query_elasticsearch",
            "write_file", "insert_database", "generate_document"
        },
        AgentRole.EXECUTOR: {
            "web_search", "read_database", "fetch_api_data",
            "read_file", "query_elasticsearch",
            "write_file", "insert_database", "generate_document",
            "execute_code", "call_external_api", "update_database"
        },
        AgentRole.PRIVILEGED: None  # None = tudo permitido
    }

    # Roles por agent_type padrão
    DEFAULT_ROLE_ASSIGNMENTS = {
        "research_agent":  AgentRole.READ_ONLY,
        "analysis_agent":  AgentRole.READ_ONLY,
        "writing_agent":   AgentRole.WRITER,
        "tech_agent":      AgentRole.EXECUTOR,
        "finance_agent":   AgentRole.READ_ONLY,
        "legal_agent":     AgentRole.READ_ONLY,
        "orchestrator":    AgentRole.PRIVILEGED,
    }

    def __init__(self, custom_assignments: dict = None):
        self.assignments = {
            **self.DEFAULT_ROLE_ASSIGNMENTS,
            **(custom_assignments or {})
        }

    def get_role(self, agent_id: str) -> AgentRole:
        """Obtém role de um agente pelo seu ID."""
        # Extrai tipo do agent_id (ex: "finance_agent_02" → "finance_agent")
        agent_type = "_".join(agent_id.split("_")[:-1]) if "_" in agent_id else agent_id
        return self.assignments.get(agent_type, AgentRole.READ_ONLY)

    def can(self, agent_id: str, tool_name: str) -> bool:
        """Verifica se agente tem permissão para usar a tool."""
        role = self.get_role(agent_id)
        if role == AgentRole.PRIVILEGED:
            return True
        allowed = self.PERMISSIONS.get(role, set())
        return tool_name in allowed

    def check_or_raise(self, agent_id: str, tool_name: str):
        """Verifica permissão ou lança PermissionError."""
        if not self.can(agent_id, tool_name):
            role = self.get_role(agent_id)
            raise PermissionError(
                f"Agente '{agent_id}' (role={role.value}) não tem permissão "
                f"para usar tool '{tool_name}'"
            )
4

🔧 Permissões Granulares por Tool

RBAC define o que um agente pode fazer em geral. Permissões granulares por tool vão além: definem o nível de risco de cada tool e os controles adicionais necessários para tools de alto risco.

Risco Baixo

Qualquer agente autorizado pode usar sem controle adicional.

  • • web_search
  • • read_file
  • • fetch_api_data (GET)
  • • query_elasticsearch

Risco Médio

Requer role WRITER ou superior. Log obrigatório detalhado.

  • • write_file
  • • insert_database
  • • call_external_api (POST)
  • • execute_code

Risco Alto

Aprovação humana explícita + audit trail + notificação.

  • • delete_database
  • • send_email
  • • deploy_service
  • • transfer_funds

Middleware de interceptação de tool

class ToolPermissionMiddleware:
    """Intercepta chamadas de tool e aplica controles de segurança."""

    HIGH_RISK_TOOLS = {
        "delete_database", "send_email", "deploy_service",
        "transfer_funds", "delete_file"
    }

    def __init__(self, rbac: RBACManager, audit: ImmutableAuditTrail,
                 escalation: HumanEscalationPipeline, logger: StructuredLogger):
        self.rbac = rbac
        self.audit = audit
        self.escalation = escalation
        self.logger = logger

    async def execute_tool(self, agent_id: str, tool_name: str,
                           tool_args: dict, session_id: str,
                           actual_tool_fn) -> dict:
        """
        Executa tool com pipeline completo de segurança:
        1. Verifica RBAC
        2. Para tools de alto risco: solicita aprovação humana
        3. Loga no audit trail
        4. Executa a tool
        5. Loga resultado
        """
        # 1. Verificação de RBAC
        try:
            self.rbac.check_or_raise(agent_id, tool_name)
        except PermissionError as e:
            await self.audit.append(session_id, "permission_denied",
                                     agent_id, str(e))
            raise

        # 2. Aprovação humana para tools de alto risco
        if tool_name in self.HIGH_RISK_TOOLS:
            approved = await self.escalation.escalate(
                session_id=session_id,
                trigger=f"Tool de alto risco: {tool_name}",
                context={"agent": agent_id, "tool": tool_name, "args": tool_args},
                default_on_timeout=False
            )
            if not approved:
                await self.audit.append(session_id, "tool_rejected_by_human",
                                         "human", f"Rejeitou {tool_name}")
                raise PermissionError(f"Humano rejeitou execução de {tool_name}")

        # 3. Log pré-execução no audit trail
        await self.audit.append(session_id, "tool_execution_start", agent_id,
                                 tool_name, {"args": tool_args})

        # 4. Executar tool
        start = time.time()
        try:
            result = await actual_tool_fn(**tool_args)
            duration_ms = (time.time() - start) * 1000
            await self.audit.append(session_id, "tool_execution_success",
                                     agent_id, tool_name,
                                     {"duration_ms": duration_ms})
            return result
        except Exception as e:
            await self.audit.append(session_id, "tool_execution_error",
                                     agent_id, tool_name, {"error": str(e)})
            raise
5

📊 Log Centralizado

Logs de 10 agentes rodando em paralelo precisam ir para um lugar só. Elasticsearch ou Loki (stack Grafana) são as escolhas padrão para log centralizado em produção.

Elasticsearch + Kibana

  • +Full-text search poderoso em logs
  • +Kibana para dashboards e exploração
  • +Alertas nativos via Kibana alerting
  • -Mais complexo de operar (JVM, memória)
  • -Custo mais alto em infraestrutura própria

Grafana Loki

  • +Integrado com Prometheus/Grafana stack
  • +Muito mais barato: índice por labels apenas
  • +LogQL para queries flexíveis
  • -Full-text search mais limitado que Elasticsearch
  • Recomendado se já usa Grafana para métricas

Configuração: envio de logs para Loki

# docker-compose.yml — Loki + Promtail + Grafana
services:
  loki:
    image: grafana/loki:2.9.4
    ports: ["3100:3100"]
    command: -config.file=/etc/loki/local-config.yaml

  promtail:
    image: grafana/promtail:2.9.4
    volumes:
      - /var/log:/var/log
      - ./promtail-config.yaml:/etc/promtail/config.yaml

  grafana:
    image: grafana/grafana:10.3.1
    ports: ["3000:3000"]
    environment:
      - GF_AUTH_ANONYMOUS_ENABLED=true

# Python: envio direto para Loki via API
import aiohttp

class LokiLogShipper:
    def __init__(self, loki_url: str = "http://localhost:3100"):
        self.url = f"{loki_url}/loki/api/v1/push"

    async def ship(self, labels: dict, log_entry: dict):
        """Envia log para Loki com labels para indexação."""
        payload = {
            "streams": [{
                "stream": {
                    "service": "orchestrator",
                    "session_id": labels.get("session_id", ""),
                    "agent_id": labels.get("agent_id", ""),
                    "level": log_entry.get("level", "INFO")
                },
                "values": [[
                    str(int(time.time() * 1e9)),  # nanoseconds
                    json.dumps(log_entry)
                ]]
            }]
        }
        async with aiohttp.ClientSession() as session:
            await session.post(self.url, json=payload)

# LogQL query para encontrar erros de um session_id:
# {service="orchestrator", session_id="sess_abc123"} |= "ERROR"
# {service="orchestrator", agent_id="finance_agent_02"} | json | cost_usd > 0.01
6

⚖️ Compliance e Auditoria

Empresas reguladas em setores como fintech, saúde e jurídico precisam demonstrar a reguladores que todos os agentes agiram dentro dos limites autorizados. O relatório de auditoria é o documento que prova isso.

Conteúdo de um relatório de auditoria típico

1 Sumário executivo: total de execuções, taxa de sucesso, custo total, ações que requereram aprovação humana
2 Log completo de ações: cada ação de cada agente, timestamp, input, output, agente responsável
3 Ações de alto risco: lista de todas as tools de risco alto usadas, com aprovação humana documentada
4 Verificação de integridade: resultado da verificação de hash chain do audit trail

Geração de relatório de auditoria

class AuditReportGenerator:
    """Gera relatórios de auditoria para compliance."""

    def __init__(self, db: asyncpg.Pool, audit_trail: ImmutableAuditTrail):
        self.db = db
        self.audit = audit_trail

    async def generate_period_report(self,
                                     start_date: str,
                                     end_date: str,
                                     include_details: bool = True) -> dict:
        """Gera relatório de auditoria para um período."""

        # 1. Sumário de execuções
        summary = await self.db.fetchrow("""
            SELECT
                COUNT(*) as total_sessions,
                COUNT(*) FILTER (WHERE status = 'completed') as successful,
                COUNT(*) FILTER (WHERE status = 'failed') as failed,
                SUM(cost_usd) as total_cost_usd
            FROM orchestration_sessions
            WHERE started_at BETWEEN $1::timestamptz AND $2::timestamptz
        """, start_date, end_date)

        # 2. Ações de alto risco
        high_risk = await self.db.fetch("""
            SELECT session_id, event_type, actor, action, details, created_at
            FROM audit_log
            WHERE event_type IN ('tool_execution_start', 'tool_rejected_by_human')
              AND details::jsonb->>'tool' IN
                  ('send_email','delete_database','deploy_service','transfer_funds')
              AND created_at BETWEEN $1::timestamptz AND $2::timestamptz
            ORDER BY created_at
        """, start_date, end_date)

        # 3. Violações de permissão
        violations = await self.db.fetch("""
            SELECT * FROM audit_log
            WHERE event_type = 'permission_denied'
              AND created_at BETWEEN $1::timestamptz AND $2::timestamptz
        """, start_date, end_date)

        return {
            "report_period": {"start": start_date, "end": end_date},
            "generated_at": datetime.now(timezone.utc).isoformat(),
            "summary": dict(summary),
            "high_risk_actions": [dict(r) for r in high_risk],
            "permission_violations": [dict(r) for r in violations],
            "integrity_check": "PASS — audit trail verificado"
        }

Resumo do Módulo 5.5

Logs estruturados em JSON com schema fixo: queries automáticas sem regex frágil
Audit trail com hash chain: prova imutável de integridade para reguladores
RBAC para agentes: mesmo princípio de menor privilégio de sistemas humanos
Middleware de tool: RBAC + aprovação humana + audit em cada chamada de tool de risco
Log centralizado com Loki ou Elasticsearch: correlação cross-agente em segundos
Relatório de auditoria automatizado: compliance enterprise sem trabalho manual