📝 Logs Estruturados para Agentes
Logs de texto livre são inúteis em escala. Quando 20 agentes geram 500 logs por hora, você precisa de logs estruturados — JSON com schema fixo que permite queries, alertas e dashboards automáticos.
O schema obrigatório de um log de agente
{
"timestamp": "2026-03-03T14:32:11.234Z",
"level": "INFO",
"session_id": "sess_abc123def456",
"task_id": "T05",
"agent_id": "finance_agent_02",
"action_type": "llm_call",
"model": "claude-sonnet-4-6",
"input_tokens": 1245,
"output_tokens": 892,
"cost_usd": 0.00623,
"duration_ms": 3421,
"success": true,
"tool_calls": ["read_database", "calculate_dcf"],
"error": null,
"metadata": {
"task_type": "financeiro",
"retry_attempt": 0,
"routing_confidence": 0.94
}
}
Implementação: StructuredLogger
import json
import logging
import time
from datetime import datetime, timezone
class StructuredLogger:
"""Logger estruturado em JSON para ações de agente."""
def __init__(self, service_name: str = "orchestrator"):
self._logger = logging.getLogger(service_name)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(message)s"))
self._logger.addHandler(handler)
self._logger.setLevel(logging.INFO)
self.session_id = None
self.agent_id = None
def bind(self, session_id: str, agent_id: str) -> "StructuredLogger":
"""Cria logger contextualizado para sessão/agente específico."""
child = StructuredLogger.__new__(StructuredLogger)
child._logger = self._logger
child.session_id = session_id
child.agent_id = agent_id
return child
def log_action(self,
task_id: str,
action_type: str,
model: str = None,
input_tokens: int = 0,
output_tokens: int = 0,
cost_usd: float = 0.0,
duration_ms: float = 0.0,
success: bool = True,
tool_calls: list = None,
error: str = None,
**extra):
"""Loga ação de agente com schema padronizado."""
record = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"level": "ERROR" if not success else "INFO",
"session_id": self.session_id,
"task_id": task_id,
"agent_id": self.agent_id,
"action_type": action_type,
"model": model,
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"cost_usd": round(cost_usd, 8),
"duration_ms": round(duration_ms, 1),
"success": success,
"tool_calls": tool_calls or [],
"error": error,
"metadata": extra
}
log_method = self._logger.error if not success else self._logger.info
log_method(json.dumps(record, ensure_ascii=False))
return record
def log_decision(self, task_id: str, decision_type: str,
decision: str, rationale: str, confidence: float = None):
"""Loga decisão do orquestrador com raciocínio."""
record = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"level": "INFO",
"log_type": "decision",
"session_id": self.session_id,
"task_id": task_id,
"agent_id": self.agent_id,
"decision_type": decision_type,
"decision": decision,
"rationale": rationale,
"confidence": confidence
}
self._logger.info(json.dumps(record, ensure_ascii=False))
🔒 Audit Trail Imutável
O audit trail não é apenas um log — é uma sequência imutável que prova que o sistema agiu dentro dos limites autorizados. Empresas reguladas em fintech, saúde e jurídico precisam disso para aprovação de órgãos reguladores.
Diferença entre log e audit trail
Log operacional
- • Pode ser deletado/rotacionado
- • Para debugging em tempo real
- • Retenção de 30-90 dias
- • Pode ter gaps se sistema falha
Audit trail
- • Nunca deletado — append-only
- • Para auditoria por reguladores
- • Retenção de 5-10 anos
- • Hash chain garante integridade
Implementação: ImmutableAuditTrail
import hashlib
import asyncpg
class ImmutableAuditTrail:
"""Audit trail com hash chain para garantia de integridade."""
def __init__(self, db_pool: asyncpg.Pool):
self.db = db_pool
self._last_hash: dict[str, str] = {} # session_id → último hash
def _compute_hash(self, event: dict, prev_hash: str) -> str:
"""Computa hash do evento encadeado com o anterior."""
content = json.dumps({**event, "prev_hash": prev_hash},
sort_keys=True, ensure_ascii=False)
return hashlib.sha256(content.encode()).hexdigest()
async def append(self,
session_id: str,
event_type: str,
actor: str,
action: str,
details: dict = None) -> str:
"""
Adiciona entrada imutável ao audit trail.
Retorna hash da entrada para referência.
"""
event = {
"session_id": session_id,
"event_type": event_type,
"actor": actor,
"action": action,
"details": details or {},
"timestamp": datetime.now(timezone.utc).isoformat()
}
# Hash encadeado com o registro anterior
prev_hash = self._last_hash.get(session_id, "GENESIS")
event_hash = self._compute_hash(event, prev_hash)
self._last_hash[session_id] = event_hash
# Inserir no Postgres (NUNCA deletar esta tabela)
await self.db.execute("""
INSERT INTO audit_log
(session_id, event_type, actor, action, details, event_hash, prev_hash)
VALUES ($1, $2, $3, $4, $5, $6, $7)
""", session_id, event_type, actor, action,
json.dumps(details or {}), event_hash, prev_hash)
return event_hash
async def verify_integrity(self, session_id: str) -> bool:
"""Verifica se a cadeia de hashes está íntegra (não foi adulterada)."""
rows = await self.db.fetch("""
SELECT event_hash, prev_hash, event_type, actor, action, details, created_at
FROM audit_log WHERE session_id = $1
ORDER BY id ASC
""", session_id)
prev_hash = "GENESIS"
for row in rows:
event = {
"session_id": session_id,
"event_type": row["event_type"],
"actor": row["actor"],
"action": row["action"],
"details": json.loads(row["details"]),
"timestamp": row["created_at"].isoformat()
}
expected_hash = self._compute_hash(event, prev_hash)
if expected_hash != row["event_hash"]:
print(f"[Audit] VIOLACAO DE INTEGRIDADE detectada! Hash inválido.")
return False
prev_hash = row["event_hash"]
return True
# Uso típico:
# audit = ImmutableAuditTrail(db_pool)
# await audit.append(
# session_id="sess_123",
# event_type="tool_execution",
# actor="finance_agent_02",
# action="read_database",
# details={"table": "financial_reports", "query": "SELECT..."}
# )
🛡️ RBAC para Agentes
Role-Based Access Control aplicado a agentes: cada agente tem um role, cada role tem permissões específicas. O mesmo princípio de menor privilégio que aplicamos a usuários humanos — agora para sistemas autônomos.
Hierarquia de roles de agente
from enum import Enum
from functools import wraps
class AgentRole(Enum):
READ_ONLY = "read_only"
WRITER = "writer"
EXECUTOR = "executor"
PRIVILEGED = "privileged"
class RBACManager:
"""Gerencia roles e permissões de agentes."""
# Permissões por role (hierárquica: cada role inclui o anterior)
PERMISSIONS = {
AgentRole.READ_ONLY: {
"web_search", "read_database", "fetch_api_data",
"read_file", "query_elasticsearch"
},
AgentRole.WRITER: {
"web_search", "read_database", "fetch_api_data",
"read_file", "query_elasticsearch",
"write_file", "insert_database", "generate_document"
},
AgentRole.EXECUTOR: {
"web_search", "read_database", "fetch_api_data",
"read_file", "query_elasticsearch",
"write_file", "insert_database", "generate_document",
"execute_code", "call_external_api", "update_database"
},
AgentRole.PRIVILEGED: None # None = tudo permitido
}
# Roles por agent_type padrão
DEFAULT_ROLE_ASSIGNMENTS = {
"research_agent": AgentRole.READ_ONLY,
"analysis_agent": AgentRole.READ_ONLY,
"writing_agent": AgentRole.WRITER,
"tech_agent": AgentRole.EXECUTOR,
"finance_agent": AgentRole.READ_ONLY,
"legal_agent": AgentRole.READ_ONLY,
"orchestrator": AgentRole.PRIVILEGED,
}
def __init__(self, custom_assignments: dict = None):
self.assignments = {
**self.DEFAULT_ROLE_ASSIGNMENTS,
**(custom_assignments or {})
}
def get_role(self, agent_id: str) -> AgentRole:
"""Obtém role de um agente pelo seu ID."""
# Extrai tipo do agent_id (ex: "finance_agent_02" → "finance_agent")
agent_type = "_".join(agent_id.split("_")[:-1]) if "_" in agent_id else agent_id
return self.assignments.get(agent_type, AgentRole.READ_ONLY)
def can(self, agent_id: str, tool_name: str) -> bool:
"""Verifica se agente tem permissão para usar a tool."""
role = self.get_role(agent_id)
if role == AgentRole.PRIVILEGED:
return True
allowed = self.PERMISSIONS.get(role, set())
return tool_name in allowed
def check_or_raise(self, agent_id: str, tool_name: str):
"""Verifica permissão ou lança PermissionError."""
if not self.can(agent_id, tool_name):
role = self.get_role(agent_id)
raise PermissionError(
f"Agente '{agent_id}' (role={role.value}) não tem permissão "
f"para usar tool '{tool_name}'"
)
🔧 Permissões Granulares por Tool
RBAC define o que um agente pode fazer em geral. Permissões granulares por tool vão além: definem o nível de risco de cada tool e os controles adicionais necessários para tools de alto risco.
Risco Baixo
Qualquer agente autorizado pode usar sem controle adicional.
- • web_search
- • read_file
- • fetch_api_data (GET)
- • query_elasticsearch
Risco Médio
Requer role WRITER ou superior. Log obrigatório detalhado.
- • write_file
- • insert_database
- • call_external_api (POST)
- • execute_code
Risco Alto
Aprovação humana explícita + audit trail + notificação.
- • delete_database
- • send_email
- • deploy_service
- • transfer_funds
Middleware de interceptação de tool
class ToolPermissionMiddleware:
"""Intercepta chamadas de tool e aplica controles de segurança."""
HIGH_RISK_TOOLS = {
"delete_database", "send_email", "deploy_service",
"transfer_funds", "delete_file"
}
def __init__(self, rbac: RBACManager, audit: ImmutableAuditTrail,
escalation: HumanEscalationPipeline, logger: StructuredLogger):
self.rbac = rbac
self.audit = audit
self.escalation = escalation
self.logger = logger
async def execute_tool(self, agent_id: str, tool_name: str,
tool_args: dict, session_id: str,
actual_tool_fn) -> dict:
"""
Executa tool com pipeline completo de segurança:
1. Verifica RBAC
2. Para tools de alto risco: solicita aprovação humana
3. Loga no audit trail
4. Executa a tool
5. Loga resultado
"""
# 1. Verificação de RBAC
try:
self.rbac.check_or_raise(agent_id, tool_name)
except PermissionError as e:
await self.audit.append(session_id, "permission_denied",
agent_id, str(e))
raise
# 2. Aprovação humana para tools de alto risco
if tool_name in self.HIGH_RISK_TOOLS:
approved = await self.escalation.escalate(
session_id=session_id,
trigger=f"Tool de alto risco: {tool_name}",
context={"agent": agent_id, "tool": tool_name, "args": tool_args},
default_on_timeout=False
)
if not approved:
await self.audit.append(session_id, "tool_rejected_by_human",
"human", f"Rejeitou {tool_name}")
raise PermissionError(f"Humano rejeitou execução de {tool_name}")
# 3. Log pré-execução no audit trail
await self.audit.append(session_id, "tool_execution_start", agent_id,
tool_name, {"args": tool_args})
# 4. Executar tool
start = time.time()
try:
result = await actual_tool_fn(**tool_args)
duration_ms = (time.time() - start) * 1000
await self.audit.append(session_id, "tool_execution_success",
agent_id, tool_name,
{"duration_ms": duration_ms})
return result
except Exception as e:
await self.audit.append(session_id, "tool_execution_error",
agent_id, tool_name, {"error": str(e)})
raise
📊 Log Centralizado
Logs de 10 agentes rodando em paralelo precisam ir para um lugar só. Elasticsearch ou Loki (stack Grafana) são as escolhas padrão para log centralizado em produção.
Elasticsearch + Kibana
- +Full-text search poderoso em logs
- +Kibana para dashboards e exploração
- +Alertas nativos via Kibana alerting
- -Mais complexo de operar (JVM, memória)
- -Custo mais alto em infraestrutura própria
Grafana Loki
- +Integrado com Prometheus/Grafana stack
- +Muito mais barato: índice por labels apenas
- +LogQL para queries flexíveis
- -Full-text search mais limitado que Elasticsearch
- →Recomendado se já usa Grafana para métricas
Configuração: envio de logs para Loki
# docker-compose.yml — Loki + Promtail + Grafana
services:
loki:
image: grafana/loki:2.9.4
ports: ["3100:3100"]
command: -config.file=/etc/loki/local-config.yaml
promtail:
image: grafana/promtail:2.9.4
volumes:
- /var/log:/var/log
- ./promtail-config.yaml:/etc/promtail/config.yaml
grafana:
image: grafana/grafana:10.3.1
ports: ["3000:3000"]
environment:
- GF_AUTH_ANONYMOUS_ENABLED=true
# Python: envio direto para Loki via API
import aiohttp
class LokiLogShipper:
def __init__(self, loki_url: str = "http://localhost:3100"):
self.url = f"{loki_url}/loki/api/v1/push"
async def ship(self, labels: dict, log_entry: dict):
"""Envia log para Loki com labels para indexação."""
payload = {
"streams": [{
"stream": {
"service": "orchestrator",
"session_id": labels.get("session_id", ""),
"agent_id": labels.get("agent_id", ""),
"level": log_entry.get("level", "INFO")
},
"values": [[
str(int(time.time() * 1e9)), # nanoseconds
json.dumps(log_entry)
]]
}]
}
async with aiohttp.ClientSession() as session:
await session.post(self.url, json=payload)
# LogQL query para encontrar erros de um session_id:
# {service="orchestrator", session_id="sess_abc123"} |= "ERROR"
# {service="orchestrator", agent_id="finance_agent_02"} | json | cost_usd > 0.01
⚖️ Compliance e Auditoria
Empresas reguladas em setores como fintech, saúde e jurídico precisam demonstrar a reguladores que todos os agentes agiram dentro dos limites autorizados. O relatório de auditoria é o documento que prova isso.
Conteúdo de um relatório de auditoria típico
Geração de relatório de auditoria
class AuditReportGenerator:
"""Gera relatórios de auditoria para compliance."""
def __init__(self, db: asyncpg.Pool, audit_trail: ImmutableAuditTrail):
self.db = db
self.audit = audit_trail
async def generate_period_report(self,
start_date: str,
end_date: str,
include_details: bool = True) -> dict:
"""Gera relatório de auditoria para um período."""
# 1. Sumário de execuções
summary = await self.db.fetchrow("""
SELECT
COUNT(*) as total_sessions,
COUNT(*) FILTER (WHERE status = 'completed') as successful,
COUNT(*) FILTER (WHERE status = 'failed') as failed,
SUM(cost_usd) as total_cost_usd
FROM orchestration_sessions
WHERE started_at BETWEEN $1::timestamptz AND $2::timestamptz
""", start_date, end_date)
# 2. Ações de alto risco
high_risk = await self.db.fetch("""
SELECT session_id, event_type, actor, action, details, created_at
FROM audit_log
WHERE event_type IN ('tool_execution_start', 'tool_rejected_by_human')
AND details::jsonb->>'tool' IN
('send_email','delete_database','deploy_service','transfer_funds')
AND created_at BETWEEN $1::timestamptz AND $2::timestamptz
ORDER BY created_at
""", start_date, end_date)
# 3. Violações de permissão
violations = await self.db.fetch("""
SELECT * FROM audit_log
WHERE event_type = 'permission_denied'
AND created_at BETWEEN $1::timestamptz AND $2::timestamptz
""", start_date, end_date)
return {
"report_period": {"start": start_date, "end": end_date},
"generated_at": datetime.now(timezone.utc).isoformat(),
"summary": dict(summary),
"high_risk_actions": [dict(r) for r in high_risk],
"permission_violations": [dict(r) for r in violations],
"integrity_check": "PASS — audit trail verificado"
}