Skip to content

6.2 身份认证与授权系统

🎯 学习目标:实现企业级的身份认证和细粒度权限控制系统
⏱️ 预计时间:35分钟
📊 难度等级:⭐⭐⭐⭐⭐

🔐 现代身份认证架构

🏗️ 身份认证系统架构图

🔑 多因素身份认证(MFA)

📱 完整的MFA实现

python
# 多因素身份认证系统
import asyncio
import secrets
import hashlib
import pyotp
import qrcode
from typing import Dict, List, Optional, Tuple
from enum import Enum
from dataclasses import dataclass
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import twilio
from twilio.rest import Client as TwilioClient

class AuthFactor(Enum):
    PASSWORD = "password"
    TOTP = "totp"
    SMS = "sms"
    EMAIL = "email"
    PUSH = "push"
    BIOMETRIC = "biometric"
    HARDWARE_TOKEN = "hardware_token"

class AuthFactorStatus(Enum):
    NOT_CONFIGURED = "not_configured"
    CONFIGURED = "configured"
    VERIFIED = "verified"
    FAILED = "failed"
    EXPIRED = "expired"

@dataclass
class AuthResult:
    success: bool
    user_id: Optional[str] = None
    session_token: Optional[str] = None
    required_factors: List[AuthFactor] = None
    completed_factors: List[AuthFactor] = None
    next_factor: Optional[AuthFactor] = None
    error_message: Optional[str] = None
    expires_at: Optional[datetime] = None

class MultiFactorAuthenticator:
    """多因素身份认证器"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.factor_handlers = {
            AuthFactor.PASSWORD: PasswordHandler(config.get('password', {})),
            AuthFactor.TOTP: TOTPHandler(config.get('totp', {})),
            AuthFactor.SMS: SMSHandler(config.get('sms', {})),
            AuthFactor.EMAIL: EmailHandler(config.get('email', {})),
            AuthFactor.PUSH: PushHandler(config.get('push', {}))
        }
        
        # 认证策略配置
        self.auth_strategies = {
            'basic': [AuthFactor.PASSWORD],
            'enhanced': [AuthFactor.PASSWORD, AuthFactor.TOTP],
            'strict': [AuthFactor.PASSWORD, AuthFactor.TOTP, AuthFactor.SMS],
            'adaptive': self._adaptive_strategy
        }
        
        # 风险评估器
        self.risk_assessor = RiskAssessment()
    
    async def authenticate(self, username: str, credentials: Dict[str, Any],
                          strategy: str = 'adaptive',
                          context: Dict[str, Any] = None) -> AuthResult:
        """执行多因素认证"""
        
        # 1. 获取用户信息
        user = await self.get_user(username)
        if not user:
            return AuthResult(
                success=False,
                error_message="用户不存在"
            )
        
        # 2. 风险评估
        risk_score = await self.risk_assessor.assess_risk(user, context or {})
        
        # 3. 确定认证策略
        if strategy == 'adaptive':
            required_factors = self._adaptive_strategy(user, risk_score, context)
        else:
            required_factors = self.auth_strategies.get(strategy, [AuthFactor.PASSWORD])
        
        # 4. 执行认证流程
        completed_factors = []
        
        for factor in required_factors:
            handler = self.factor_handlers.get(factor)
            if not handler:
                return AuthResult(
                    success=False,
                    error_message=f"认证因子 {factor.value} 不受支持"
                )
            
            # 检查用户是否配置了该认证因子
            if not await handler.is_configured(user):
                return AuthResult(
                    success=False,
                    error_message=f"用户未配置 {factor.value} 认证"
                )
            
            # 执行认证
            factor_result = await handler.authenticate(user, credentials)
            
            if not factor_result.success:
                # 记录认证失败
                await self._log_auth_failure(user, factor, factor_result.error_message)
                
                return AuthResult(
                    success=False,
                    required_factors=required_factors,
                    completed_factors=completed_factors,
                    next_factor=factor,
                    error_message=factor_result.error_message
                )
            
            completed_factors.append(factor)
        
        # 5. 所有认证因子都成功
        session_token = await self._create_session(user, completed_factors, risk_score)
        
        # 6. 记录成功登录
        await self._log_auth_success(user, completed_factors, context)
        
        return AuthResult(
            success=True,
            user_id=user.id,
            session_token=session_token,
            required_factors=required_factors,
            completed_factors=completed_factors,
            expires_at=datetime.utcnow() + timedelta(hours=8)
        )
    
    def _adaptive_strategy(self, user, risk_score: float, 
                          context: Dict[str, Any] = None) -> List[AuthFactor]:
        """自适应认证策略"""
        factors = [AuthFactor.PASSWORD]  # 基础密码认证
        
        # 根据风险分数增加认证因子
        if risk_score > 0.3:
            factors.append(AuthFactor.TOTP)
        
        if risk_score > 0.6:
            factors.append(AuthFactor.SMS)
        
        if risk_score > 0.8:
            factors.append(AuthFactor.EMAIL)
        
        # 根据用户角色调整
        if user.role in ['admin', 'super_admin']:
            if AuthFactor.TOTP not in factors:
                factors.append(AuthFactor.TOTP)
        
        # 根据访问上下文调整
        if context:
            # 新设备登录
            if context.get('is_new_device'):
                if AuthFactor.EMAIL not in factors:
                    factors.append(AuthFactor.EMAIL)
            
            # 异常地理位置
            if context.get('is_unusual_location'):
                if AuthFactor.SMS not in factors:
                    factors.append(AuthFactor.SMS)
        
        return factors
    
    async def _create_session(self, user, factors: List[AuthFactor],
                            risk_score: float) -> str:
        """创建会话令牌"""
        session_data = {
            'user_id': user.id,
            'username': user.username,
            'role': user.role,
            'auth_factors': [f.value for f in factors],
            'risk_score': risk_score,
            'created_at': datetime.utcnow().isoformat(),
            'expires_at': (datetime.utcnow() + timedelta(hours=8)).isoformat()
        }
        
        # 这里应该使用安全的令牌管理器
        return await self.token_manager.create_session_token(session_data)

class PasswordHandler:
    """密码认证处理器"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.max_attempts = config.get('max_attempts', 5)
        self.lockout_duration = config.get('lockout_duration', 900)  # 15分钟
        
    async def authenticate(self, user, credentials: Dict[str, Any]) -> AuthResult:
        """密码认证"""
        password = credentials.get('password')
        if not password:
            return AuthResult(
                success=False,
                error_message="密码不能为空"
            )
        
        # 检查账户锁定状态
        if await self._is_account_locked(user):
            return AuthResult(
                success=False,
                error_message="账户已被锁定,请稍后再试"
            )
        
        # 验证密码
        if not self._verify_password(password, user.password_hash):
            await self._record_failed_attempt(user)
            return AuthResult(
                success=False,
                error_message="密码错误"
            )
        
        # 检查密码是否需要更新
        if self._should_update_password(user):
            return AuthResult(
                success=False,
                error_message="密码已过期,请更新密码"
            )
        
        await self._clear_failed_attempts(user)
        
        return AuthResult(success=True)
    
    async def is_configured(self, user) -> bool:
        """检查用户是否配置了密码"""
        return bool(user.password_hash)
    
    def _verify_password(self, password: str, hash_str: str) -> bool:
        """验证密码"""
        import bcrypt
        return bcrypt.checkpw(password.encode('utf-8'), hash_str.encode('utf-8'))
    
    async def _is_account_locked(self, user) -> bool:
        """检查账户是否被锁定"""
        failed_attempts = await self._get_failed_attempts(user)
        if failed_attempts >= self.max_attempts:
            last_attempt = await self._get_last_failed_attempt_time(user)
            if last_attempt:
                time_since_last = datetime.utcnow() - last_attempt
                if time_since_last.total_seconds() < self.lockout_duration:
                    return True
        return False

class TOTPHandler:
    """TOTP认证处理器"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.issuer_name = config.get('issuer_name', 'MCP Server')
        self.valid_window = config.get('valid_window', 1)  # 允许的时间窗口
    
    async def authenticate(self, user, credentials: Dict[str, Any]) -> AuthResult:
        """TOTP认证"""
        totp_code = credentials.get('totp_code')
        if not totp_code:
            return AuthResult(
                success=False,
                error_message="TOTP验证码不能为空"
            )
        
        if not user.totp_secret:
            return AuthResult(
                success=False,
                error_message="用户未配置TOTP"
            )
        
        # 验证TOTP代码
        totp = pyotp.TOTP(user.totp_secret)
        
        if totp.verify(totp_code, valid_window=self.valid_window):
            # 检查重放攻击
            if await self._is_code_used_recently(user, totp_code):
                return AuthResult(
                    success=False,
                    error_message="验证码已使用,请等待下一个验证码"
                )
            
            await self._record_used_code(user, totp_code)
            return AuthResult(success=True)
        
        return AuthResult(
            success=False,
            error_message="TOTP验证码错误或已过期"
        )
    
    async def is_configured(self, user) -> bool:
        """检查用户是否配置了TOTP"""
        return bool(user.totp_secret)
    
    async def setup_totp(self, user) -> Dict[str, str]:
        """设置TOTP"""
        secret = pyotp.random_base32()
        
        # 生成QR码
        totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=user.username,
            issuer_name=self.issuer_name
        )
        
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(totp_uri)
        qr.make(fit=True)
        
        img = qr.make_image(fill_color="black", back_color="white")
        
        # 转换为base64
        from io import BytesIO
        import base64
        
        buffer = BytesIO()
        img.save(buffer, format='PNG')
        qr_code_data = base64.b64encode(buffer.getvalue()).decode()
        
        return {
            "secret": secret,
            "qr_code": f"data:image/png;base64,{qr_code_data}",
            "manual_entry_key": secret,
            "issuer": self.issuer_name
        }

class SMSHandler:
    """SMS认证处理器"""
    
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.twilio_client = TwilioClient(
            config.get('twilio_sid'),
            config.get('twilio_token')
        )
        self.from_number = config.get('from_number')
        self.code_length = config.get('code_length', 6)
        self.code_expiry = config.get('code_expiry', 300)  # 5分钟
    
    async def authenticate(self, user, credentials: Dict[str, Any]) -> AuthResult:
        """SMS认证"""
        sms_code = credentials.get('sms_code')
        if not sms_code:
            return AuthResult(
                success=False,
                error_message="SMS验证码不能为空"
            )
        
        # 验证SMS代码
        stored_code = await self._get_stored_code(user)
        if not stored_code:
            return AuthResult(
                success=False,
                error_message="请先请求SMS验证码"
            )
        
        if stored_code['code'] != sms_code:
            return AuthResult(
                success=False,
                error_message="SMS验证码错误"
            )
        
        # 检查是否过期
        if datetime.utcnow() > stored_code['expires_at']:
            await self._clear_stored_code(user)
            return AuthResult(
                success=False,
                error_message="SMS验证码已过期"
            )
        
        await self._clear_stored_code(user)
        return AuthResult(success=True)
    
    async def is_configured(self, user) -> bool:
        """检查用户是否配置了手机号"""
        return bool(user.phone_number)
    
    async def send_sms_code(self, user) -> bool:
        """发送SMS验证码"""
        if not user.phone_number:
            return False
        
        # 生成验证码
        code = self._generate_code()
        
        # 发送SMS
        try:
            message = self.twilio_client.messages.create(
                body=f"您的MCP验证码是:{code},有效期5分钟。",
                from_=self.from_number,
                to=user.phone_number
            )
            
            # 存储验证码
            await self._store_code(user, code)
            
            return True
            
        except Exception as e:
            logger.error(f"SMS发送失败: {e}")
            return False
    
    def _generate_code(self) -> str:
        """生成验证码"""
        import random
        return ''.join([str(random.randint(0, 9)) for _ in range(self.code_length)])
    
    async def _store_code(self, user, code: str):
        """存储验证码"""
        code_data = {
            'code': code,
            'created_at': datetime.utcnow(),
            'expires_at': datetime.utcnow() + timedelta(seconds=self.code_expiry)
        }
        # 存储到Redis或数据库
        await self.redis_client.setex(
            f"sms_code:{user.id}",
            self.code_expiry,
            json.dumps(code_data, default=str)
        )

# 风险评估器
class RiskAssessment:
    """风险评估器"""
    
    def __init__(self):
        self.risk_factors = {
            'device_trust': DeviceTrustFactor(),
            'location_trust': LocationTrustFactor(), 
            'behavior_pattern': BehaviorPatternFactor(),
            'time_pattern': TimePatternFactor(),
            'network_reputation': NetworkReputationFactor()
        }
    
    async def assess_risk(self, user, context: Dict[str, Any]) -> float:
        """评估认证风险"""
        risk_scores = []
        
        for factor_name, factor in self.risk_factors.items():
            score = await factor.calculate_risk(user, context)
            risk_scores.append(score)
        
        # 计算加权平均风险分数
        weights = [0.3, 0.25, 0.2, 0.15, 0.1]  # 各因子权重
        weighted_score = sum(score * weight for score, weight in zip(risk_scores, weights))
        
        return min(max(weighted_score, 0.0), 1.0)  # 限制在0-1范围内

class DeviceTrustFactor:
    """设备信任度因子"""
    
    async def calculate_risk(self, user, context: Dict[str, Any]) -> float:
        """计算设备风险分数"""
        device_id = context.get('device_id')
        user_agent = context.get('user_agent', '')
        
        if not device_id:
            return 0.8  # 无设备ID,高风险
        
        # 检查设备是否已知
        is_known_device = await self._is_known_device(user, device_id)
        if is_known_device:
            return 0.1  # 已知设备,低风险
        
        # 检查设备指纹
        device_fingerprint = self._generate_device_fingerprint(context)
        is_similar_device = await self._is_similar_device(user, device_fingerprint)
        
        if is_similar_device:
            return 0.3  # 相似设备,中等风险
        
        # 新设备
        return 0.7
    
    def _generate_device_fingerprint(self, context: Dict[str, Any]) -> str:
        """生成设备指纹"""
        components = [
            context.get('user_agent', ''),
            context.get('screen_resolution', ''),
            context.get('timezone', ''),
            context.get('language', ''),
            context.get('platform', '')
        ]
        
        fingerprint_data = '|'.join(components)
        return hashlib.sha256(fingerprint_data.encode()).hexdigest()

class LocationTrustFactor:
    """地理位置信任度因子"""
    
    async def calculate_risk(self, user, context: Dict[str, Any]) -> float:
        """计算地理位置风险分数"""
        client_ip = context.get('client_ip')
        if not client_ip:
            return 0.5
        
        # 获取IP地理位置
        location = await self._get_ip_location(client_ip)
        if not location:
            return 0.6
        
        # 检查是否为用户常用地理位置
        is_usual_location = await self._is_usual_location(user, location)
        if is_usual_location:
            return 0.1
        
        # 检查地理位置风险
        country_risk = self._get_country_risk(location.get('country'))
        distance_risk = await self._calculate_distance_risk(user, location)
        
        return min(country_risk + distance_risk, 1.0)
    
    async def _get_ip_location(self, ip: str) -> Dict[str, Any]:
        """获取IP地理位置"""
        # 这里应该集成真实的地理IP服务
        return {
            'country': 'US',
            'region': 'California',
            'city': 'San Francisco',
            'latitude': 37.7749,
            'longitude': -122.4194
        }
    
    def _get_country_risk(self, country: str) -> float:
        """获取国家风险等级"""
        high_risk_countries = ['CN', 'RU', 'KP', 'IR']
        medium_risk_countries = ['BR', 'IN', 'PK']
        
        if country in high_risk_countries:
            return 0.8
        elif country in medium_risk_countries:
            return 0.4
        else:
            return 0.1

🎫 基于角色的访问控制(RBAC)

🏗️ 高级RBAC实现

python
# 基于角色的访问控制系统
from typing import Dict, List, Set, Optional, Any, Tuple
from enum import Enum
from dataclasses import dataclass
from abc import ABC, abstractmethod
import json
from datetime import datetime, timedelta

class ResourceType(Enum):
    API_ENDPOINT = "api_endpoint"
    DATABASE_TABLE = "database_table"
    FILE_SYSTEM = "file_system"
    MCP_TOOL = "mcp_tool"
    CONFIGURATION = "configuration"
    AUDIT_LOG = "audit_log"

class ActionType(Enum):
    CREATE = "create"
    READ = "read"
    UPDATE = "update"
    DELETE = "delete"
    EXECUTE = "execute"
    ADMIN = "admin"
    LIST = "list"
    EXPORT = "export"

@dataclass
class Permission:
    """权限定义"""
    resource: str
    actions: Set[ActionType]
    conditions: Optional[Dict[str, Any]] = None
    
    def allows_action(self, action: ActionType, context: Dict[str, Any] = None) -> bool:
        """检查权限是否允许指定操作"""
        if action not in self.actions:
            return False
        
        # 检查条件约束
        if self.conditions and context:
            return self._evaluate_conditions(self.conditions, context)
        
        return True
    
    def _evaluate_conditions(self, conditions: Dict[str, Any], 
                           context: Dict[str, Any]) -> bool:
        """评估权限条件"""
        for condition, expected_value in conditions.items():
            if condition == "time_range":
                if not self._check_time_range(expected_value, context):
                    return False
            elif condition == "ip_range":
                if not self._check_ip_range(expected_value, context):
                    return False
            elif condition == "resource_owner":
                if not self._check_resource_ownership(expected_value, context):
                    return False
            elif condition in context:
                if context[condition] != expected_value:
                    return False
        
        return True
    
    def _check_time_range(self, time_range: Dict[str, str], 
                         context: Dict[str, Any]) -> bool:
        """检查时间范围限制"""
        current_time = datetime.now().time()
        start_time = datetime.strptime(time_range["start"], "%H:%M").time()
        end_time = datetime.strptime(time_range["end"], "%H:%M").time()
        
        return start_time <= current_time <= end_time
    
    def _check_ip_range(self, ip_ranges: List[str], 
                       context: Dict[str, Any]) -> bool:
        """检查IP范围限制"""
        import ipaddress
        
        client_ip = context.get("client_ip")
        if not client_ip:
            return False
        
        try:
            client_ip_obj = ipaddress.ip_address(client_ip)
            for ip_range in ip_ranges:
                if client_ip_obj in ipaddress.ip_network(ip_range):
                    return True
        except ValueError:
            return False
        
        return False
    
    def _check_resource_ownership(self, owner_field: str, 
                                context: Dict[str, Any]) -> bool:
        """检查资源所有权"""
        resource_owner = context.get(owner_field)
        current_user = context.get("user_id")
        
        return resource_owner == current_user

@dataclass
class Role:
    """角色定义"""
    name: str
    description: str
    permissions: List[Permission]
    parent_roles: List['Role'] = None
    is_system_role: bool = False
    
    def __post_init__(self):
        if self.parent_roles is None:
            self.parent_roles = []
    
    def get_all_permissions(self) -> List[Permission]:
        """获取所有权限(包括继承的权限)"""
        all_permissions = list(self.permissions)
        
        # 递归获取父角色权限
        for parent_role in self.parent_roles:
            all_permissions.extend(parent_role.get_all_permissions())
        
        # 去重(基于资源和操作)
        unique_permissions = {}
        for perm in all_permissions:
            key = (perm.resource, frozenset(perm.actions))
            if key not in unique_permissions:
                unique_permissions[key] = perm
            else:
                # 合并权限条件
                existing_perm = unique_permissions[key]
                if perm.conditions:
                    if existing_perm.conditions:
                        existing_perm.conditions.update(perm.conditions)
                    else:
                        existing_perm.conditions = perm.conditions.copy()
        
        return list(unique_permissions.values())

class RBACManager:
    """RBAC管理器"""
    
    def __init__(self):
        self.roles: Dict[str, Role] = {}
        self.user_roles: Dict[str, List[str]] = {}
        self.role_cache: Dict[str, List[Permission]] = {}
        
        # 初始化系统角色
        self._init_system_roles()
    
    def _init_system_roles(self):
        """初始化系统角色"""
        
        # 匿名用户角色
        guest_role = Role(
            name="guest",
            description="匿名访客角色",
            permissions=[
                Permission("/api/public", {ActionType.READ}),
                Permission("/api/health", {ActionType.READ})
            ],
            is_system_role=True
        )
        
        # 普通用户角色
        user_role = Role(
            name="user",
            description="普通用户角色",
            permissions=[
                Permission("/api/user/profile", {ActionType.READ, ActionType.UPDATE}),
                Permission("/api/tools", {ActionType.READ, ActionType.EXECUTE}),
                Permission("/api/resources", {ActionType.READ})
            ],
            parent_roles=[guest_role],
            is_system_role=True
        )
        
        # 管理员角色
        admin_role = Role(
            name="admin",
            description="管理员角色",
            permissions=[
                Permission("/api/admin", {ActionType.CREATE, ActionType.READ, 
                          ActionType.UPDATE, ActionType.DELETE}),
                Permission("/api/users", {ActionType.CREATE, ActionType.READ,
                          ActionType.UPDATE, ActionType.DELETE}),
                Permission("/api/roles", {ActionType.CREATE, ActionType.READ,
                          ActionType.UPDATE, ActionType.DELETE})
            ],
            parent_roles=[user_role],
            is_system_role=True
        )
        
        # 超级管理员角色
        super_admin_role = Role(
            name="super_admin",
            description="超级管理员角色",
            permissions=[
                Permission("*", {ActionType.ADMIN}),  # 所有权限
                Permission("/api/system", {ActionType.CREATE, ActionType.READ,
                          ActionType.UPDATE, ActionType.DELETE})
            ],
            parent_roles=[admin_role],
            is_system_role=True
        )
        
        self.roles.update({
            "guest": guest_role,
            "user": user_role,
            "admin": admin_role,
            "super_admin": super_admin_role
        })
    
    def create_role(self, name: str, description: str, 
                   permissions: List[Permission],
                   parent_roles: List[str] = None) -> Role:
        """创建新角色"""
        if name in self.roles:
            raise ValueError(f"角色 '{name}' 已存在")
        
        # 解析父角色
        parent_role_objects = []
        if parent_roles:
            for parent_name in parent_roles:
                if parent_name not in self.roles:
                    raise ValueError(f"父角色 '{parent_name}' 不存在")
                parent_role_objects.append(self.roles[parent_name])
        
        role = Role(
            name=name,
            description=description,
            permissions=permissions,
            parent_roles=parent_role_objects
        )
        
        self.roles[name] = role
        self._clear_cache()  # 清除缓存
        
        return role
    
    def assign_role_to_user(self, user_id: str, role_name: str):
        """为用户分配角色"""
        if role_name not in self.roles:
            raise ValueError(f"角色 '{role_name}' 不存在")
        
        if user_id not in self.user_roles:
            self.user_roles[user_id] = []
        
        if role_name not in self.user_roles[user_id]:
            self.user_roles[user_id].append(role_name)
            self._clear_user_cache(user_id)
    
    def revoke_role_from_user(self, user_id: str, role_name: str):
        """撤销用户角色"""
        if user_id in self.user_roles and role_name in self.user_roles[user_id]:
            self.user_roles[user_id].remove(role_name)
            self._clear_user_cache(user_id)
    
    def check_permission(self, user_id: str, resource: str, 
                        action: ActionType, context: Dict[str, Any] = None) -> bool:
        """检查用户权限"""
        user_permissions = self.get_user_permissions(user_id)
        
        # 检查直接权限匹配
        for permission in user_permissions:
            if self._resource_matches(permission.resource, resource):
                if permission.allows_action(action, context):
                    return True
        
        return False
    
    def get_user_permissions(self, user_id: str) -> List[Permission]:
        """获取用户所有权限"""
        cache_key = f"user_permissions:{user_id}"
        
        if cache_key in self.role_cache:
            return self.role_cache[cache_key]
        
        all_permissions = []
        user_role_names = self.user_roles.get(user_id, ["guest"])  # 默认为guest角色
        
        for role_name in user_role_names:
            if role_name in self.roles:
                role = self.roles[role_name]
                all_permissions.extend(role.get_all_permissions())
        
        # 去重和合并权限
        merged_permissions = self._merge_permissions(all_permissions)
        
        # 缓存结果
        self.role_cache[cache_key] = merged_permissions
        
        return merged_permissions
    
    def _resource_matches(self, permission_resource: str, requested_resource: str) -> bool:
        """检查资源是否匹配权限"""
        # 通配符匹配
        if permission_resource == "*":
            return True
        
        # 精确匹配
        if permission_resource == requested_resource:
            return True
        
        # 前缀匹配
        if permission_resource.endswith("*"):
            prefix = permission_resource[:-1]
            if requested_resource.startswith(prefix):
                return True
        
        # 正则表达式匹配
        if permission_resource.startswith("regex:"):
            import re
            pattern = permission_resource[6:]  # 移除 "regex:" 前缀
            if re.match(pattern, requested_resource):
                return True
        
        return False
    
    def _merge_permissions(self, permissions: List[Permission]) -> List[Permission]:
        """合并重复权限"""
        permission_dict = {}
        
        for perm in permissions:
            key = perm.resource
            
            if key in permission_dict:
                # 合并操作
                existing_perm = permission_dict[key]
                existing_perm.actions.update(perm.actions)
                
                # 合并条件
                if perm.conditions:
                    if existing_perm.conditions:
                        existing_perm.conditions.update(perm.conditions)
                    else:
                        existing_perm.conditions = perm.conditions.copy()
            else:
                permission_dict[key] = Permission(
                    resource=perm.resource,
                    actions=perm.actions.copy(),
                    conditions=perm.conditions.copy() if perm.conditions else None
                )
        
        return list(permission_dict.values())
    
    def _clear_cache(self):
        """清除所有缓存"""
        self.role_cache.clear()
    
    def _clear_user_cache(self, user_id: str):
        """清除特定用户的缓存"""
        cache_key = f"user_permissions:{user_id}"
        if cache_key in self.role_cache:
            del self.role_cache[cache_key]

# 动态权限策略引擎
class PolicyEngine:
    """动态权限策略引擎"""
    
    def __init__(self):
        self.policies: Dict[str, 'Policy'] = {}
        self.policy_cache: Dict[str, bool] = {}
    
    def add_policy(self, policy: 'Policy'):
        """添加策略"""
        self.policies[policy.name] = policy
        self._clear_cache()
    
    def evaluate_policies(self, user_id: str, resource: str, 
                         action: ActionType, context: Dict[str, Any]) -> bool:
        """评估所有相关策略"""
        cache_key = self._get_cache_key(user_id, resource, action, context)
        
        if cache_key in self.policy_cache:
            return self.policy_cache[cache_key]
        
        # 找到所有适用的策略
        applicable_policies = []
        for policy in self.policies.values():
            if policy.applies_to(user_id, resource, action, context):
                applicable_policies.append(policy)
        
        # 评估策略
        result = True
        for policy in applicable_policies:
            policy_result = policy.evaluate(user_id, resource, action, context)
            
            if policy.effect == PolicyEffect.DENY:
                if policy_result:
                    result = False
                    break  # DENY策略优先
            elif policy.effect == PolicyEffect.ALLOW:
                result = result and policy_result
        
        # 缓存结果
        self.policy_cache[cache_key] = result
        
        return result
    
    def _get_cache_key(self, user_id: str, resource: str, 
                      action: ActionType, context: Dict[str, Any]) -> str:
        """生成缓存键"""
        context_str = json.dumps(context, sort_keys=True) if context else ""
        return f"{user_id}:{resource}:{action.value}:{hashlib.md5(context_str.encode()).hexdigest()}"
    
    def _clear_cache(self):
        """清除缓存"""
        self.policy_cache.clear()

class PolicyEffect(Enum):
    ALLOW = "allow"
    DENY = "deny"

class Policy(ABC):
    """策略抽象基类"""
    
    def __init__(self, name: str, effect: PolicyEffect):
        self.name = name
        self.effect = effect
    
    @abstractmethod
    def applies_to(self, user_id: str, resource: str, 
                  action: ActionType, context: Dict[str, Any]) -> bool:
        """检查策略是否适用于当前请求"""
        pass
    
    @abstractmethod
    def evaluate(self, user_id: str, resource: str,
                action: ActionType, context: Dict[str, Any]) -> bool:
        """评估策略"""
        pass

class TimeBasedAccessPolicy(Policy):
    """基于时间的访问策略"""
    
    def __init__(self, name: str, effect: PolicyEffect, 
                 allowed_hours: List[int], 
                 allowed_days: List[int] = None):
        super().__init__(name, effect)
        self.allowed_hours = allowed_hours
        self.allowed_days = allowed_days or list(range(7))  # 默认所有天
    
    def applies_to(self, user_id: str, resource: str,
                  action: ActionType, context: Dict[str, Any]) -> bool:
        """策略适用于所有请求"""
        return True
    
    def evaluate(self, user_id: str, resource: str,
                action: ActionType, context: Dict[str, Any]) -> bool:
        """评估时间限制"""
        now = datetime.now()
        
        # 检查小时限制
        if now.hour not in self.allowed_hours:
            return False
        
        # 检查天限制
        if now.weekday() not in self.allowed_days:
            return False
        
        return True

class IPWhitelistPolicy(Policy):
    """IP白名单策略"""
    
    def __init__(self, name: str, effect: PolicyEffect,
                 allowed_ips: List[str],
                 resources: List[str] = None):
        super().__init__(name, effect)
        self.allowed_networks = []
        
        # 解析IP地址和网络
        import ipaddress
        for ip in allowed_ips:
            try:
                self.allowed_networks.append(ipaddress.ip_network(ip, strict=False))
            except ValueError:
                logger.warning(f"无效的IP地址或网络: {ip}")
        
        self.resources = resources or ["*"]
    
    def applies_to(self, user_id: str, resource: str,
                  action: ActionType, context: Dict[str, Any]) -> bool:
        """检查策略是否适用于指定资源"""
        if "*" in self.resources:
            return True
        
        return any(resource.startswith(res) for res in self.resources)
    
    def evaluate(self, user_id: str, resource: str,
                action: ActionType, context: Dict[str, Any]) -> bool:
        """评估IP限制"""
        client_ip = context.get("client_ip")
        if not client_ip:
            return False
        
        try:
            import ipaddress
            client_ip_obj = ipaddress.ip_address(client_ip)
            
            for network in self.allowed_networks:
                if client_ip_obj in network:
                    return True
        except ValueError:
            return False
        
        return False

🎯 本节小结

通过本节学习,你已经掌握了:

多因素认证:实现强大的MFA系统,支持多种认证方式
RBAC权限控制:构建灵活的基于角色的访问控制系统
动态权限策略:实现基于上下文的智能权限管理
风险评估:建立全面的认证风险评估机制
会话管理:安全的会话创建和管理机制

🤔 思考题

  1. 认证平衡:如何在安全性和用户体验之间找到最佳平衡?
  2. 权限细化:如何设计更细粒度的权限控制而不影响系统性能?
  3. 动态调整:如何根据用户行为模式动态调整认证策略?

身份是数字世界的通行证,权限是数据安全的守护神! 🔐

👉 下一节:6.4 输入验证与清理