前言:AI Agent 安全已不是选修课

2026 年,AI Agent 系统的安全风险已经从理论讨论变成了生产事故。

Claude Code 删库事件(2026 年 4 月)让整个行业意识到:当 AI Agent 拥有工具调用能力时,传统的边界防护模型已经不够用了。攻击者不再需要直接入侵你的系统——他们只需要让一个拥有工具权限的 Agent 执行错误的操作。

CISA 等机构联合发布的 Agentic AI 安全部署指南,指出了当前最大的问题:大多数企业在部署 AI Agent 时,先考虑功能实现,再考虑安全加固。这是传统软件开发的思维,但在 Agent 系统里,这个顺序是致命的——因为 Agent 的行为边界远比传统软件模糊,一个配置错误可能导致 Agent 访问不该访问的资源。

本文给出 AI Agent 安全架构的四个核心维度:身份与认证(Authentication)→ 权限与访问控制(Authorization)→ 数据边界(Data Boundaries)→ 审计与追溯(Audit & Traceability)。每个维度给出具体的架构模式和工程实践。


第一层:身份与认证——Agent 也是身份

Agent 身份的特殊性

传统软件系统的身份模型是「人 → 服务 → 数据」。身份的主体是人或服务,身份在部署时确定,之后相对静态。

AI Agent 系统的身份模型要复杂得多:

  • Agent 的行为由 LLM 驱动,行为是动态的——同一个 Agent 代码,模型参数不同,行为可能不同
  • Agent 可以被提示词诱导改变行为——这不是 bug,是 LLM 的本质特性
  • Agent 的身份和工具权限需要解耦——一个 Agent 可能有多个角色,每个角色有不同的权限级别

这意味着 AI Agent 系统的身份管理需要比传统软件更精细的模型。

Agent 身份的类型

类型一:部署身份(Deployment Identity)

Agent 进程在启动时被分配的运行时身份,用于 Agent 与平台、其他 Agent、第三方服务之间的通信认证。

# Agent 部署时的身份配置示例
agent_identity:
id: "agent-coder-prod-03"
type: "coding_agent"
deployment_context:
environment: "production"
team: "platform-engineering"
version: "2.4.1"
runtime_credentials:
# 用于向 MCP Server 等服务认证
mcp_auth_token: "${AGENT_MCP_TOKEN}"
# 用于 Agent 间通信
agent_secret: "${AGENT_SECRET}"

部署身份的核心价值是建立最小权限的认证基础——Agent 只能访问它被授权访问的资源,即使提示词被注入,运行时身份层面的权限控制仍然有效。

类型二:会话身份(Session Identity)

用户与 Agent 交互时,Agent 代表用户行事时所具有的身份。在企业场景里,这是「代理用户身份」。

用户 A 与 Agent 对话
Agent 以「用户 A 的身份」执行操作
→ Agent 的操作权限 = 用户 A 的权限(最小权限原则)
→ 任何超越用户 A 权限的操作被拒绝

会话身份的关键设计点是动态权限降级:用户的权限级别可能很高,但 Agent 在执行特定操作时需要更严格的权限验证(比如财务操作需要二次确认)。

类型三:工具身份(Tool Identity)

当 Agent 调用外部工具或 API 时,工具也需要身份。典型场景是 MCP Server——Agent 通过 MCP 协议调用工具,MCP Server 需要验证「这个 Agent 有没有权限调用这个工具」。

认证架构模式

模式一:JWT + Agent Identity Certificate

Agent 启动 → 向 Identity Provider 请求 JWT
Agent 调用工具 → 携带 JWT
MCP Server 验证 JWT → 检查 Agent 身份和权限声明
授权决策(基于策略)
# Agent 端的认证实现
import jwt
from datetime import datetime, timedelta
def create_agent_token(agent_id: str, permissions: list[str]) -> str:
payload = {
"agent_id": agent_id,
"permissions": permissions,
"iat": datetime.utcnow(),
"exp": datetime.utcnow() + timedelta(hours=1),
"jti": f"{agent_id}-{datetime.utcnow().timestamp()}" # 唯一标识,防重放
}
# 使用 Agent 私钥签名
return jwt.encode(payload, AGENT_PRIVATE_KEY, algorithm="RS256")
# MCP Server 端的验证实现
def verify_agent_token(token: str) -> dict:
try:
payload = jwt.decode(token, IDENTITY_PROVIDER_PUBLIC_KEY, algorithms=["RS256"])
# 检查 Token 是否在黑名单(已撤销的 Token)
if is_token_revoked(payload["jti"]):
raise PermissionError("Token has been revoked")
return payload
except jwt.ExpiredSignatureError:
raise PermissionError("Token has expired")
except jwt.InvalidTokenError:
raise PermissionError("Invalid token")

模式二:mTLS(双向 TLS)

在高安全要求场景(如金融、医疗),JWT 可能被伪造或泄露。mTLS 提供更强壮的认证——每个 Agent 持有唯一的客户端证书,证书与 Agent 身份的绑定在 CA 层面保证。

MCP Server 持有 CA 证书
Agent 持有 Agent 证书(由 CA 签发,包含 Agent ID 和权限范围)
Agent 连接 MCP Server → 双方交换证书 → 双向验证

mTLS 的优势是身份不可伪造(除非私钥泄露),缺点是证书管理复杂,适合有成熟 PKI 基础设施的企业。


第二层:权限与访问控制——最小权限的精细化实现

权限模型的选择:RBAC vs ABAC vs RAGAS

RBAC(Role-Based Access Control)

将权限分配给角色,角色分配给 Agent:

Agent 角色:Coder_Agent
角色权限:read_codebase, write_file, execute_tests

RBAC 的优点是简单直观,缺点是粒度粗。当同一个 Agent 在不同场景需要不同权限时,RBAC 需要维护多个角色。

ABAC(Attribute-Based Access Control)

基于属性的权限控制,权限决策考虑多种属性:

Agent 属性:{ environment: "production", team: "platform", clearance: "level2" }
资源属性:{ type: "database", sensitivity: "high", owner: "finance" }
环境属性:{ time: "business_hours", location: "office_network" }
权限决策:若 Agent.clearance >= resource.sensitivity → 允许访问

ABAC 的优点是灵活性高,可以表达复杂的权限逻辑,缺点是策略定义和维护复杂。

RAGAS(Risk-Driven Access control for AI Agents)

2026 年新提出的权限模型,专门针对 AI Agent 场景。核心思路是根据操作风险等级动态调整权限

低风险操作(读取公开信息):直接放行
中风险操作(修改文件、调用外部 API):提示词层面确认
高风险操作(删除数据、执行系统命令):强制二次认证
极危险操作(跨租户访问、绕过审计日志):直接拒绝

RAGAS 的优势是安全与用户体验的平衡——低风险操作不打扰用户,高风险操作强制验证。但实现复杂度最高。

工具权限的分层控制

工具调用是 AI Agent 安全最脆弱的环节。当 Agent 可以「做任何事」时,提示词注入攻击可以让 Agent 执行恶意操作。

层级一:工具白名单

只允许 Agent 调用白名单内的工具:

# MCP Server 端工具注册
class ToolRegistry:
def __init__(self):
self.tools = {}
def register(self, tool: Tool, allowed_roles: list[str]):
self.tools[tool.name] = {
"implementation": tool,
"allowed_roles": allowed_roles,
"risk_level": tool.risk_level
}
def check_permission(self, agent_role: str, tool_name: str) -> bool:
if tool_name not in self.tools:
return False # 工具不存在,直接拒绝
return agent_role in self.tools[tool_name]["allowed_roles"]
# 工具定义时的权限声明
class FileWriteTool(Tool):
name = "file_write"
risk_level = "medium"
@tool_permission(allowed_roles=["coder", "devops"])
def execute(self, path: str, content: str):
...

层级二:参数级别控制

不仅是「能不能调用这个工具」,还要控制「能操作什么资源」:

# 文件操作工具的参数级别权限控制
class FileWriteTool(Tool):
def execute(self, path: str, content: str, agent_context: AgentContext):
# 检查 Agent 是否有权限写这个路径
allowed_paths = get_agent_allowed_paths(agent_context.agent_id)
# 路径必须是 allowed_paths 列表的子路径
if not any(path.startswith(allowed) for allowed in allowed_paths):
raise PermissionError(f"Agent not allowed to write to {path}")
# 额外的危险路径检查
DANGEROUS_PATHS = ["/etc/", "/var/", "/root/", ".ssh/"]
if any(path.startswith(dangerous) for dangerous in DANGEROUS_PATHS):
raise PermissionError(f"Cannot write to system path: {path}")

层级三:操作审计(所有操作都记录)

无论权限检查通过还是拒绝,所有操作都要有日志:

class AuditedToolWrapper(Tool):
def __init__(self, tool: Tool):
self.tool = tool
def execute(self, *args, agent_context: AgentContext, **kwargs):
# 记录操作尝试(无论成功与否)
audit_log.write({
"timestamp": datetime.utcnow().isoformat(),
"agent_id": agent_context.agent_id,
"agent_role": agent_context.role,
"tool": self.tool.name,
"args": sanitize_args(args), # 脱敏后记录
"decision": "pending",
"risk_level": self.tool.risk_level
})
try:
result = self.tool.execute(*args, **kwargs)
audit_log.update_last(
decision="allowed",
result="success"
)
return result
except PermissionError as e:
audit_log.update_last(
decision="denied",
reason=str(e)
)
raise
except Exception as e:
audit_log.update_last(
decision="error",
reason=str(e)
)
raise

MCP 安全:模型上下文协议的安全实践

MCP(Model Context Protocol)是 2026 年 AI Agent 生态中最流行的工具调用协议,但它的安全模型在默认配置下存在隐患。

MCP 的默认安全假设

MCP 协议假设 Agent 和 MCP Server 在同一信任边界内。这个假设在以下场景失效:

  • Agent 部署在用户设备上,MCP Server 部署在企业内网
  • Agent 从不可信来源获取提示词(用户输入或文件内容)
  • MCP Server 连接了多个不同信任级别的工具

MCP 安全加固实践

实践一:MCP Server 网络隔离

# MCP Server 网络配置
mcp_server:
network:
# 只接受来自特定网络的连接
bind_address: "10.0.1.0/24" # 企业内网段
# 不暴露在公共网络
public_exposed: false
# 强制 TLS
require_tls: true
tls:
cert: "/etc/mcp/tls.crt"
key: "/etc/mcp/tls.key"

实践二:MCP 请求来源验证

class MCPRequestValidator:
def validate(self, request: MCPRequest, context: RequestContext) -> bool:
# 验证请求来源
if not self.verify_source_trust(context.source_identity):
return False
# 验证请求范围(防止权限提升攻击)
if not self.verify_scope(request.scope, context.granted_scope):
return False
# 验证操作频率(防止滥用)
if self.rate_limiter.is_exceeded(context.agent_id):
return False
return True

实践三:工具调用的沙箱隔离

import subprocess
import resource
class SandboxedToolExecution:
def execute(self, tool: Tool, args: dict, constraints: ExecutionConstraints):
# 设置资源限制
resource.setrlimit(resource.RLIMIT_CPU, (constraints.max_cpu_seconds,)*2)
resource.setrlimit(resource.RLIMIT_AS, (constraints.max_memory_mb * 1024 * 1024,)*2)
# 禁止网络访问(针对系统命令执行类工具)
# 在容器级别实现网络隔离
with isolated_network():
result = tool.execute(**args)
return result

第三层:数据边界——Agent 能看到什么数据

数据分类与 Agent 可见性

不是所有企业数据都应该对 Agent 可见。数据边界控制的目标是确保 Agent 只能访问它需要的最小数据集

数据分类:
├── 公开数据(Public):所有 Agent 可读
├── 内部数据(Internal):特定团队的 Agent 可读
├── 敏感数据(Confidential):需要特定权限才能访问的敏感信息
└── 绝密数据(Restricted):即使 Agent 也不能直接访问,需要人工审批

敏感数据识别

class DataClassifier:
# 常见的敏感数据模式
SENSITIVE_PATTERNS = {
"pii": [
r"\b\d{3}-\d{2}-\d{4}\b", # SSN
r"\b\d{16}\b", # Credit Card
r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", # Email
],
"credentials": [
r"api[_-]?key\s*[=:]\s*['\"]?\w+['\"]?",
r"password\s*[=:]\s*['\"]?\w+['\"]?",
r"-----BEGIN (RSA |EC )?PRIVATE KEY-----",
],
"healthcare": [
r"patient[_-]?id\s*[=:]\s*\w+",
r"diagnosis[_-]?code\s*[=:]\s*\w+",
]
}
def classify(self, content: str) -> list[str]:
findings = []
for category, patterns in self.SENSITIVE_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, content, re.IGNORECASE):
findings.append(category)
return findings

RAG 检索的数据边界

当 Agent 使用 RAG(检索增强生成)时,数据边界控制更加复杂——需要确保检索结果不包含 Agent 不应该看到的数据。

方案一:向量数据库级别的访问控制

class AccessControlledVectorStore:
def __init__(self, vector_db, access_control: AccessControlList):
self.vector_db = vector_db
self.access_control = access_control
def query(self, query_embedding: list, agent_context: AgentContext, top_k: int):
# 获取 Agent 的数据访问权限
allowed_datasets = self.access_control.get_allowed_datasets(
agent_id=agent_context.agent_id,
data_level=agent_context.access_level
)
# 过滤检索范围
results = self.vector_db.query(
embedding=query_embedding,
filter={"dataset_id": {"$in": allowed_datasets}},
top_k=top_k
)
# 脱敏处理
return self.sanitize(results)
def sanitize(self, results: list) -> list:
# 移除检索结果中的敏感字段
for result in results:
if "metadata" in result:
# 移除敏感元数据
result["metadata"] = {
k: v for k, v in result["metadata"].items()
if k not in ["user_email", "ssn", "credit_card"]
}
return results

方案二:RAG 结果的后置过滤

def rag_retrieve_with_filter(query: str, agent_context: AgentContext) -> list[Document]:
# 正常检索
raw_results = vector_db.query(query, top_k=20)
# 后置过滤
filtered_results = []
for doc in raw_results:
# 检查文档的数据级别
doc_level = get_document_data_level(doc)
agent_level = get_agent_access_level(agent_context)
if agent_level >= doc_level:
filtered_results.append(doc)
else:
# 记录访问被拒绝(但不透露文档内容)
audit_log.record_access_denied(
agent_id=agent_context.agent_id,
document_id=doc.id,
reason=f"Agent level {agent_level} < document level {doc_level}"
)
return filtered_results

第四层:审计与追溯——出了事怎么查

审计日志的设计原则

原则一:日志不可篡改

审计日志是事后调查的唯一依据,必须有防篡改机制:

import hashlib
import json
from datetime import datetime
class ImmutableAuditLog:
def __init__(self, storage_backend):
self.storage = storage_backend
self.previous_hash = None
def write(self, entry: dict):
# 添加时间戳和序列号
entry["timestamp"] = datetime.utcnow().isoformat()
entry["sequence"] = self.storage.get_next_sequence()
# 计算哈希链(类似区块链)
entry["hash"] = self._compute_hash(entry, self.previous_hash)
self.storage.append(entry)
self.previous_hash = entry["hash"]
def _compute_hash(self, entry: dict, previous_hash: str) -> str:
# 包含前一条记录的哈希,形成链式结构
content = json.dumps(entry, sort_keys=True) + (previous_hash or "")
return hashlib.sha256(content.encode()).hexdigest()
def verify(self) -> bool:
"""验证日志完整性"""
for i, entry in enumerate(self.storage.get_all()):
expected_previous = self.storage.get_entry(i-1)["hash"] if i > 0 else None
if entry.get("previous_hash") != expected_previous:
return False
return True

原则二:记录所有决策点,不只是操作结果

不完整的日志:
[10:00] Agent: file_write /project/main.py → SUCCESS
完整的日志:
[10:00] Agent: file_write /project/main.py → SUCCESS
Decision path:
- User request: "Fix the bug in main.py"
- Plan: [read main.py] → [analyze bug] → [write fix]
- Tool call: read /project/main.py
- Context: Found bug at line 42 (null pointer in API call)
- Tool call: write /project/main.py
- Content length: 245 bytes, modified lines: 40-45
Risk check:
- Path in allowed list: ✓
- Contains sensitive data: ✗
- External network access: ✗

可追溯性的实现架构

┌─────────────────────────────────────────────────────────────┐
│ Agent System │
│ ┌──────────┐ ┌──────────────┐ ┌──────────────────┐ │
│ │ Planner │ → │ Executor │ → │ Tool Call Layer │ │
│ └──────────┘ └──────────────┘ └──────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────────────────────────────────────────────────┐
│ Audit & Trace Collector │
│ - Decision logs (Planner outputs) │
│ - Execution logs (Executor state) │
│ - Tool call logs (arguments + results) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Immutable Audit Storage │
│ - Append-only log store │
│ - Cryptographic hash chain │
│ - Retention policy (compliance based) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ Trace Reconstruction Service │
│ - Full trace reconstruction from logs │
│ - Anomaly detection │
│ - Compliance reporting │
└─────────────────────────────────────────────────────────────┘

受监管行业的落地建议

金融行业

金融行业对 AI Agent 的安全要求最为严格,几个关键点:

  1. 数据本地化:所有操作日志、审计记录必须在境内存储
  2. 模型可解释性:监管要求能够解释 Agent 的每个决策
  3. 双人审批:高风险操作需要两个人工审批节点
  4. 年度安全评估:Agent 系统必须通过独立的安全评估才能续期

建议架构

用户请求 → Agent(受限环境)→ 工具调用(MCP,加密传输)
所有操作写入本地审计日志(不可篡改)
合规审查队列(人工 + 自动)
定期生成监管报告

医疗行业

医疗行业的核心是 HIPAA 合规,关键点:

  1. PHI(Protected Health Information)最小化暴露:Agent 只能看到它需要诊断/处理的那部分患者数据
  2. 去标识化:所有用于训练或测试的数据必须完全去标识化
  3. 访问审计:每次 PHI 访问都必须有完整的审计记录

建议架构

class PHIAccessController:
def check_phi_access(self, agent_context: AgentContext, patient_data: dict, purpose: str) -> bool:
# 检查 Agent 是否有 PHI 访问权限
if not agent_context.can_access_phi:
return False
# 检查访问目的(最小必要原则)
if purpose not in ["treatment", "payment", "operations"]:
return False
# 检查最小必要原则(Agent 只能看到需要的部分)
required_fields = get_minimum_necessary_fields(purpose, patient_data)
filtered_data = {k: patient_data.get(k) for k in required_fields}
# 记录 PHI 访问
self.audit_phi_access(
agent_id=agent_context.agent_id,
patient_id=patient_data["patient_id"],
fields_accessed=required_fields,
purpose=purpose
)
return True

安全架构的技术选型小结

维度推荐方案适用场景
身份认证JWT + Agent Identity Certificate通用场景
身份认证mTLS高安全要求(金融、医疗)
权限模型RBAC + 参数级别控制权限相对固定的场景
权限模型RAGAS需要动态调整权限的复杂场景
工具控制白名单 + 沙箱隔离所有场景(必须)
数据边界向量数据库级别 ACLRAG 场景
审计日志哈希链 + 不可变存储所有场景(必须)

本文首发于 MosuoAI,AI Agent 开发者的深度指南。