前言:AI Agent 安全已不是选修课
2026 年,AI Agent 系统的安全风险已经从理论讨论变成了生产事故。
Claude Code 删库事件(2026 年 4 月)让整个行业意识到:当 AI Agent 拥有工具调用能力时,传统的边界防护模型已经不够用了。攻击者不再需要直接入侵你的系统——他们只需要让一个拥有工具权限的 Agent 执行错误的操作。
CISA 等机构联合发布的 Agentic AI 安全部署指南,指出了当前最大的问题:大多数企业在部署 AI Agent 时,先考虑功能实现,再考虑安全加固。这是传统软件开发的思维,但在 Agent 系统里,这个顺序是致命的——因为 Agent 的行为边界远比传统软件模糊,一个配置错误可能导致 Agent 访问不该访问的资源。
本文给出 AI Agent 安全架构的四个核心维度:身份与认证(Authentication)→ 权限与访问控制(Authorization)→ 数据边界(Data Boundaries)→ 审计与追溯(Audit & Traceability)。每个维度给出具体的架构模式和工程实践。
第一层:身份与认证——Agent 也是身份
Agent 身份的特殊性
传统软件系统的身份模型是「人 → 服务 → 数据」。身份的主体是人或服务,身份在部署时确定,之后相对静态。
AI Agent 系统的身份模型要复杂得多:
- Agent 的行为由 LLM 驱动,行为是动态的——同一个 Agent 代码,模型参数不同,行为可能不同
- Agent 可以被提示词诱导改变行为——这不是 bug,是 LLM 的本质特性
- Agent 的身份和工具权限需要解耦——一个 Agent 可能有多个角色,每个角色有不同的权限级别
这意味着 AI Agent 系统的身份管理需要比传统软件更精细的模型。
Agent 身份的类型
类型一:部署身份(Deployment Identity)
Agent 进程在启动时被分配的运行时身份,用于 Agent 与平台、其他 Agent、第三方服务之间的通信认证。
# Agent 部署时的身份配置示例agent_identity: id: "agent-coder-prod-03" type: "coding_agent" deployment_context: environment: "production" team: "platform-engineering" version: "2.4.1" runtime_credentials: # 用于向 MCP Server 等服务认证 mcp_auth_token: "${AGENT_MCP_TOKEN}" # 用于 Agent 间通信 agent_secret: "${AGENT_SECRET}"部署身份的核心价值是建立最小权限的认证基础——Agent 只能访问它被授权访问的资源,即使提示词被注入,运行时身份层面的权限控制仍然有效。
类型二:会话身份(Session Identity)
用户与 Agent 交互时,Agent 代表用户行事时所具有的身份。在企业场景里,这是「代理用户身份」。
用户 A 与 Agent 对话Agent 以「用户 A 的身份」执行操作→ Agent 的操作权限 = 用户 A 的权限(最小权限原则)→ 任何超越用户 A 权限的操作被拒绝会话身份的关键设计点是动态权限降级:用户的权限级别可能很高,但 Agent 在执行特定操作时需要更严格的权限验证(比如财务操作需要二次确认)。
类型三:工具身份(Tool Identity)
当 Agent 调用外部工具或 API 时,工具也需要身份。典型场景是 MCP Server——Agent 通过 MCP 协议调用工具,MCP Server 需要验证「这个 Agent 有没有权限调用这个工具」。
认证架构模式
模式一:JWT + Agent Identity Certificate
Agent 启动 → 向 Identity Provider 请求 JWT ↓Agent 调用工具 → 携带 JWT ↓MCP Server 验证 JWT → 检查 Agent 身份和权限声明 ↓授权决策(基于策略)# Agent 端的认证实现import jwtfrom datetime import datetime, timedelta
def create_agent_token(agent_id: str, permissions: list[str]) -> str: payload = { "agent_id": agent_id, "permissions": permissions, "iat": datetime.utcnow(), "exp": datetime.utcnow() + timedelta(hours=1), "jti": f"{agent_id}-{datetime.utcnow().timestamp()}" # 唯一标识,防重放 } # 使用 Agent 私钥签名 return jwt.encode(payload, AGENT_PRIVATE_KEY, algorithm="RS256")
# MCP Server 端的验证实现def verify_agent_token(token: str) -> dict: try: payload = jwt.decode(token, IDENTITY_PROVIDER_PUBLIC_KEY, algorithms=["RS256"])
# 检查 Token 是否在黑名单(已撤销的 Token) if is_token_revoked(payload["jti"]): raise PermissionError("Token has been revoked")
return payload except jwt.ExpiredSignatureError: raise PermissionError("Token has expired") except jwt.InvalidTokenError: raise PermissionError("Invalid token")模式二:mTLS(双向 TLS)
在高安全要求场景(如金融、医疗),JWT 可能被伪造或泄露。mTLS 提供更强壮的认证——每个 Agent 持有唯一的客户端证书,证书与 Agent 身份的绑定在 CA 层面保证。
MCP Server 持有 CA 证书Agent 持有 Agent 证书(由 CA 签发,包含 Agent ID 和权限范围)Agent 连接 MCP Server → 双方交换证书 → 双向验证mTLS 的优势是身份不可伪造(除非私钥泄露),缺点是证书管理复杂,适合有成熟 PKI 基础设施的企业。
第二层:权限与访问控制——最小权限的精细化实现
权限模型的选择:RBAC vs ABAC vs RAGAS
RBAC(Role-Based Access Control)
将权限分配给角色,角色分配给 Agent:
Agent 角色:Coder_Agent角色权限:read_codebase, write_file, execute_testsRBAC 的优点是简单直观,缺点是粒度粗。当同一个 Agent 在不同场景需要不同权限时,RBAC 需要维护多个角色。
ABAC(Attribute-Based Access Control)
基于属性的权限控制,权限决策考虑多种属性:
Agent 属性:{ environment: "production", team: "platform", clearance: "level2" }资源属性:{ type: "database", sensitivity: "high", owner: "finance" }环境属性:{ time: "business_hours", location: "office_network" }
权限决策:若 Agent.clearance >= resource.sensitivity → 允许访问ABAC 的优点是灵活性高,可以表达复杂的权限逻辑,缺点是策略定义和维护复杂。
RAGAS(Risk-Driven Access control for AI Agents)
2026 年新提出的权限模型,专门针对 AI Agent 场景。核心思路是根据操作风险等级动态调整权限:
低风险操作(读取公开信息):直接放行中风险操作(修改文件、调用外部 API):提示词层面确认高风险操作(删除数据、执行系统命令):强制二次认证极危险操作(跨租户访问、绕过审计日志):直接拒绝RAGAS 的优势是安全与用户体验的平衡——低风险操作不打扰用户,高风险操作强制验证。但实现复杂度最高。
工具权限的分层控制
工具调用是 AI Agent 安全最脆弱的环节。当 Agent 可以「做任何事」时,提示词注入攻击可以让 Agent 执行恶意操作。
层级一:工具白名单
只允许 Agent 调用白名单内的工具:
# MCP Server 端工具注册class ToolRegistry: def __init__(self): self.tools = {}
def register(self, tool: Tool, allowed_roles: list[str]): self.tools[tool.name] = { "implementation": tool, "allowed_roles": allowed_roles, "risk_level": tool.risk_level }
def check_permission(self, agent_role: str, tool_name: str) -> bool: if tool_name not in self.tools: return False # 工具不存在,直接拒绝 return agent_role in self.tools[tool_name]["allowed_roles"]
# 工具定义时的权限声明class FileWriteTool(Tool): name = "file_write" risk_level = "medium"
@tool_permission(allowed_roles=["coder", "devops"]) def execute(self, path: str, content: str): ...层级二:参数级别控制
不仅是「能不能调用这个工具」,还要控制「能操作什么资源」:
# 文件操作工具的参数级别权限控制class FileWriteTool(Tool): def execute(self, path: str, content: str, agent_context: AgentContext): # 检查 Agent 是否有权限写这个路径 allowed_paths = get_agent_allowed_paths(agent_context.agent_id)
# 路径必须是 allowed_paths 列表的子路径 if not any(path.startswith(allowed) for allowed in allowed_paths): raise PermissionError(f"Agent not allowed to write to {path}")
# 额外的危险路径检查 DANGEROUS_PATHS = ["/etc/", "/var/", "/root/", ".ssh/"] if any(path.startswith(dangerous) for dangerous in DANGEROUS_PATHS): raise PermissionError(f"Cannot write to system path: {path}")层级三:操作审计(所有操作都记录)
无论权限检查通过还是拒绝,所有操作都要有日志:
class AuditedToolWrapper(Tool): def __init__(self, tool: Tool): self.tool = tool
def execute(self, *args, agent_context: AgentContext, **kwargs): # 记录操作尝试(无论成功与否) audit_log.write({ "timestamp": datetime.utcnow().isoformat(), "agent_id": agent_context.agent_id, "agent_role": agent_context.role, "tool": self.tool.name, "args": sanitize_args(args), # 脱敏后记录 "decision": "pending", "risk_level": self.tool.risk_level })
try: result = self.tool.execute(*args, **kwargs) audit_log.update_last( decision="allowed", result="success" ) return result except PermissionError as e: audit_log.update_last( decision="denied", reason=str(e) ) raise except Exception as e: audit_log.update_last( decision="error", reason=str(e) ) raiseMCP 安全:模型上下文协议的安全实践
MCP(Model Context Protocol)是 2026 年 AI Agent 生态中最流行的工具调用协议,但它的安全模型在默认配置下存在隐患。
MCP 的默认安全假设
MCP 协议假设 Agent 和 MCP Server 在同一信任边界内。这个假设在以下场景失效:
- Agent 部署在用户设备上,MCP Server 部署在企业内网
- Agent 从不可信来源获取提示词(用户输入或文件内容)
- MCP Server 连接了多个不同信任级别的工具
MCP 安全加固实践
实践一:MCP Server 网络隔离
# MCP Server 网络配置mcp_server: network: # 只接受来自特定网络的连接 bind_address: "10.0.1.0/24" # 企业内网段 # 不暴露在公共网络 public_exposed: false # 强制 TLS require_tls: true tls: cert: "/etc/mcp/tls.crt" key: "/etc/mcp/tls.key"实践二:MCP 请求来源验证
class MCPRequestValidator: def validate(self, request: MCPRequest, context: RequestContext) -> bool: # 验证请求来源 if not self.verify_source_trust(context.source_identity): return False
# 验证请求范围(防止权限提升攻击) if not self.verify_scope(request.scope, context.granted_scope): return False
# 验证操作频率(防止滥用) if self.rate_limiter.is_exceeded(context.agent_id): return False
return True实践三:工具调用的沙箱隔离
import subprocessimport resource
class SandboxedToolExecution: def execute(self, tool: Tool, args: dict, constraints: ExecutionConstraints): # 设置资源限制 resource.setrlimit(resource.RLIMIT_CPU, (constraints.max_cpu_seconds,)*2) resource.setrlimit(resource.RLIMIT_AS, (constraints.max_memory_mb * 1024 * 1024,)*2)
# 禁止网络访问(针对系统命令执行类工具) # 在容器级别实现网络隔离 with isolated_network(): result = tool.execute(**args)
return result第三层:数据边界——Agent 能看到什么数据
数据分类与 Agent 可见性
不是所有企业数据都应该对 Agent 可见。数据边界控制的目标是确保 Agent 只能访问它需要的最小数据集。
数据分类:├── 公开数据(Public):所有 Agent 可读├── 内部数据(Internal):特定团队的 Agent 可读├── 敏感数据(Confidential):需要特定权限才能访问的敏感信息└── 绝密数据(Restricted):即使 Agent 也不能直接访问,需要人工审批敏感数据识别
class DataClassifier: # 常见的敏感数据模式 SENSITIVE_PATTERNS = { "pii": [ r"\b\d{3}-\d{2}-\d{4}\b", # SSN r"\b\d{16}\b", # Credit Card r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}", # Email ], "credentials": [ r"api[_-]?key\s*[=:]\s*['\"]?\w+['\"]?", r"password\s*[=:]\s*['\"]?\w+['\"]?", r"-----BEGIN (RSA |EC )?PRIVATE KEY-----", ], "healthcare": [ r"patient[_-]?id\s*[=:]\s*\w+", r"diagnosis[_-]?code\s*[=:]\s*\w+", ] }
def classify(self, content: str) -> list[str]: findings = [] for category, patterns in self.SENSITIVE_PATTERNS.items(): for pattern in patterns: if re.search(pattern, content, re.IGNORECASE): findings.append(category) return findingsRAG 检索的数据边界
当 Agent 使用 RAG(检索增强生成)时,数据边界控制更加复杂——需要确保检索结果不包含 Agent 不应该看到的数据。
方案一:向量数据库级别的访问控制
class AccessControlledVectorStore: def __init__(self, vector_db, access_control: AccessControlList): self.vector_db = vector_db self.access_control = access_control
def query(self, query_embedding: list, agent_context: AgentContext, top_k: int): # 获取 Agent 的数据访问权限 allowed_datasets = self.access_control.get_allowed_datasets( agent_id=agent_context.agent_id, data_level=agent_context.access_level )
# 过滤检索范围 results = self.vector_db.query( embedding=query_embedding, filter={"dataset_id": {"$in": allowed_datasets}}, top_k=top_k )
# 脱敏处理 return self.sanitize(results)
def sanitize(self, results: list) -> list: # 移除检索结果中的敏感字段 for result in results: if "metadata" in result: # 移除敏感元数据 result["metadata"] = { k: v for k, v in result["metadata"].items() if k not in ["user_email", "ssn", "credit_card"] } return results方案二:RAG 结果的后置过滤
def rag_retrieve_with_filter(query: str, agent_context: AgentContext) -> list[Document]: # 正常检索 raw_results = vector_db.query(query, top_k=20)
# 后置过滤 filtered_results = [] for doc in raw_results: # 检查文档的数据级别 doc_level = get_document_data_level(doc) agent_level = get_agent_access_level(agent_context)
if agent_level >= doc_level: filtered_results.append(doc) else: # 记录访问被拒绝(但不透露文档内容) audit_log.record_access_denied( agent_id=agent_context.agent_id, document_id=doc.id, reason=f"Agent level {agent_level} < document level {doc_level}" )
return filtered_results第四层:审计与追溯——出了事怎么查
审计日志的设计原则
原则一:日志不可篡改
审计日志是事后调查的唯一依据,必须有防篡改机制:
import hashlibimport jsonfrom datetime import datetime
class ImmutableAuditLog: def __init__(self, storage_backend): self.storage = storage_backend self.previous_hash = None
def write(self, entry: dict): # 添加时间戳和序列号 entry["timestamp"] = datetime.utcnow().isoformat() entry["sequence"] = self.storage.get_next_sequence()
# 计算哈希链(类似区块链) entry["hash"] = self._compute_hash(entry, self.previous_hash)
self.storage.append(entry) self.previous_hash = entry["hash"]
def _compute_hash(self, entry: dict, previous_hash: str) -> str: # 包含前一条记录的哈希,形成链式结构 content = json.dumps(entry, sort_keys=True) + (previous_hash or "") return hashlib.sha256(content.encode()).hexdigest()
def verify(self) -> bool: """验证日志完整性""" for i, entry in enumerate(self.storage.get_all()): expected_previous = self.storage.get_entry(i-1)["hash"] if i > 0 else None if entry.get("previous_hash") != expected_previous: return False return True原则二:记录所有决策点,不只是操作结果
不完整的日志:[10:00] Agent: file_write /project/main.py → SUCCESS
完整的日志:[10:00] Agent: file_write /project/main.py → SUCCESS Decision path: - User request: "Fix the bug in main.py" - Plan: [read main.py] → [analyze bug] → [write fix] - Tool call: read /project/main.py - Context: Found bug at line 42 (null pointer in API call) - Tool call: write /project/main.py - Content length: 245 bytes, modified lines: 40-45 Risk check: - Path in allowed list: ✓ - Contains sensitive data: ✗ - External network access: ✗可追溯性的实现架构
┌─────────────────────────────────────────────────────────────┐│ Agent System ││ ┌──────────┐ ┌──────────────┐ ┌──────────────────┐ ││ │ Planner │ → │ Executor │ → │ Tool Call Layer │ ││ └──────────┘ └──────────────┘ └──────────────────┘ │└─────────────────────────────────────────────────────────────┘ │ │ │ ▼ ▼ ▼┌─────────────────────────────────────────────────────────────┐│ Audit & Trace Collector ││ - Decision logs (Planner outputs) ││ - Execution logs (Executor state) ││ - Tool call logs (arguments + results) │└─────────────────────────────────────────────────────────────┘ │ ▼┌─────────────────────────────────────────────────────────────┐│ Immutable Audit Storage ││ - Append-only log store ││ - Cryptographic hash chain ││ - Retention policy (compliance based) │└─────────────────────────────────────────────────────────────┘ │ ▼┌─────────────────────────────────────────────────────────────┐│ Trace Reconstruction Service ││ - Full trace reconstruction from logs ││ - Anomaly detection ││ - Compliance reporting │└─────────────────────────────────────────────────────────────┘受监管行业的落地建议
金融行业
金融行业对 AI Agent 的安全要求最为严格,几个关键点:
- 数据本地化:所有操作日志、审计记录必须在境内存储
- 模型可解释性:监管要求能够解释 Agent 的每个决策
- 双人审批:高风险操作需要两个人工审批节点
- 年度安全评估:Agent 系统必须通过独立的安全评估才能续期
建议架构:
用户请求 → Agent(受限环境)→ 工具调用(MCP,加密传输) ↓所有操作写入本地审计日志(不可篡改) ↓合规审查队列(人工 + 自动) ↓定期生成监管报告医疗行业
医疗行业的核心是 HIPAA 合规,关键点:
- PHI(Protected Health Information)最小化暴露:Agent 只能看到它需要诊断/处理的那部分患者数据
- 去标识化:所有用于训练或测试的数据必须完全去标识化
- 访问审计:每次 PHI 访问都必须有完整的审计记录
建议架构:
class PHIAccessController: def check_phi_access(self, agent_context: AgentContext, patient_data: dict, purpose: str) -> bool: # 检查 Agent 是否有 PHI 访问权限 if not agent_context.can_access_phi: return False
# 检查访问目的(最小必要原则) if purpose not in ["treatment", "payment", "operations"]: return False
# 检查最小必要原则(Agent 只能看到需要的部分) required_fields = get_minimum_necessary_fields(purpose, patient_data) filtered_data = {k: patient_data.get(k) for k in required_fields}
# 记录 PHI 访问 self.audit_phi_access( agent_id=agent_context.agent_id, patient_id=patient_data["patient_id"], fields_accessed=required_fields, purpose=purpose )
return True安全架构的技术选型小结
| 维度 | 推荐方案 | 适用场景 |
|---|---|---|
| 身份认证 | JWT + Agent Identity Certificate | 通用场景 |
| 身份认证 | mTLS | 高安全要求(金融、医疗) |
| 权限模型 | RBAC + 参数级别控制 | 权限相对固定的场景 |
| 权限模型 | RAGAS | 需要动态调整权限的复杂场景 |
| 工具控制 | 白名单 + 沙箱隔离 | 所有场景(必须) |
| 数据边界 | 向量数据库级别 ACL | RAG 场景 |
| 审计日志 | 哈希链 + 不可变存储 | 所有场景(必须) |
本文首发于 MosuoAI,AI Agent 开发者的深度指南。