Skip to content

6.3 输入验证与清理

🎯 学习目标:构建全面的输入验证和数据清理安全防护体系
⏱️ 预计时间:40分钟
📊 难度等级:⭐⭐⭐⭐⭐

🛡️ 输入验证架构设计

🏗️ 多层次验证架构

🔍 智能输入验证系统

📋 高级验证框架

python
# 智能输入验证系统
import re
import html
import json
import hashlib
import urllib.parse
from typing import Dict, List, Any, Optional, Union, Callable, Tuple
from enum import Enum
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import bleach
from html5lib.sanitizer import HTMLSanitizer
import sqlparse
from urllib.parse import urlparse, parse_qs
import base64
import logging

logger = logging.getLogger(__name__)

class ValidationSeverity(Enum):
    """Severity of a validation finding, declared from least to most severe.

    The member values are human-readable strings, so comparing ``.value``
    orders the levels alphabetically ("critical" < "error" < "info" <
    "warning") rather than by severity. Compare the members themselves
    (``a > b``, ``max(...)``) or use :attr:`rank` instead.
    """
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

    @property
    def rank(self) -> int:
        """Return the zero-based severity rank (INFO=0 ... CRITICAL=3)."""
        # Members iterate in definition order, which is ascending severity.
        return list(type(self)).index(self)

    def __lt__(self, other):
        if not isinstance(other, ValidationSeverity):
            return NotImplemented
        return self.rank < other.rank

    def __le__(self, other):
        if not isinstance(other, ValidationSeverity):
            return NotImplemented
        return self.rank <= other.rank

    def __gt__(self, other):
        if not isinstance(other, ValidationSeverity):
            return NotImplemented
        return self.rank > other.rank

    def __ge__(self, other):
        if not isinstance(other, ValidationSeverity):
            return NotImplemented
        return self.rank >= other.rank

class AttackType(Enum):
    """Categories of attack that validation results can be tagged with."""
    XSS = "xss"  # cross-site scripting
    SQL_INJECTION = "sql_injection"
    COMMAND_INJECTION = "command_injection"
    PATH_TRAVERSAL = "path_traversal"
    DESERIALIZATION = "deserialization"
    # NOTE(review): no pattern entry for LDAP_INJECTION is visible in
    # AttackPatternDetector._load_attack_patterns - confirm whether intended.
    LDAP_INJECTION = "ldap_injection"
    XXE = "xxe"  # XML external entity injection
    SSRF = "ssrf"  # server-side request forgery
    # NOTE(review): likewise, no pattern entry for CRLF_INJECTION is visible.
    CRLF_INJECTION = "crlf_injection"

@dataclass
class ValidationResult:
    """Outcome of validating a single value."""
    # True when the value is considered acceptable
    is_valid: bool
    # Cleaned/normalised value; None when no cleaned value was supplied
    cleaned_value: Any = None
    # Messages for findings that were recorded as errors
    errors: List[str] = field(default_factory=list)
    # Messages for findings that were recorded as warnings
    warnings: List[str] = field(default_factory=list)
    # Severity attached to this result; defaults to INFO
    severity: ValidationSeverity = ValidationSeverity.INFO
    # Attack categories detected in the value
    attack_patterns: List[AttackType] = field(default_factory=list)
    # Free-form extra data; not populated by the code visible in this file section
    metadata: Dict[str, Any] = field(default_factory=dict)

class InputValidator(ABC):
    """Abstract base class that every input validator derives from."""

    def __init__(self, name: str, config: Dict[str, Any] = None):
        """Remember the validator name and its configuration.

        A missing or empty ``config`` is replaced by a fresh dict. The
        validator counts as enabled unless the config sets ``enabled``.
        """
        settings = config if config else {}
        self.name = name
        self.config = settings
        self.enabled = settings.get('enabled', True)

    @abstractmethod
    def validate(self, value: Any, context: Dict[str, Any] = None) -> ValidationResult:
        """Validate ``value`` and return the outcome."""
        pass

    def pre_process(self, value: Any) -> Any:
        """Hook run before validation; the default returns ``value`` as is."""
        return value

    def post_process(self, value: Any, result: ValidationResult) -> Any:
        """Return the cleaned value of ``result``, or ``value`` when it is None."""
        cleaned = result.cleaned_value
        if cleaned is None:
            return value
        return cleaned

class ComprehensiveInputValidator:
    """综合输入验证器"""
    
    def __init__(self, config: Dict[str, Any] = None):
        """Store the configuration and build validators, detector and sanitizer."""
        settings = config if config else {}
        self.config = settings
        self.validators = {}
        self.global_config = settings.get('global', {})

        # Register the built-in validators first
        self._init_built_in_validators()

        # Attack-pattern detector, configured from the 'attack_detection' section
        detector_options = settings.get('attack_detection', {})
        self.attack_detector = AttackPatternDetector(detector_options)

        # Data sanitizer, configured from the 'sanitization' section
        sanitizer_options = settings.get('sanitization', {})
        self.data_sanitizer = DataSanitizer(sanitizer_options)
    
    def _init_built_in_validators(self):
        """Register one validator instance per built-in field type.

        Each validator receives its registry key as its name and the
        matching section of ``config['validators']`` (or an empty dict).
        """
        per_validator = self.config.get('validators', {})

        # (registry key, validator class), in registration order
        built_ins = (
            # Basic data types
            ('string', StringValidator),
            ('numeric', NumericValidator),
            ('email', EmailValidator),
            ('url', URLValidator),
            # Security checks
            ('xss', XSSValidator),
            ('sql', SQLInjectionValidator),
            ('command', CommandInjectionValidator),
            ('path', PathTraversalValidator),
            # File uploads
            ('file', FileValidator),
            ('image', ImageValidator),
            # Structured formats
            ('json', JSONValidator),
            ('xml', XMLValidator),
        )

        for key, validator_cls in built_ins:
            self.validators[key] = validator_cls(key, per_validator.get(key, {}))
    
    def validate_input(self, data: Dict[str, Any], 
                      schema: Dict[str, Any],
                      context: Dict[str, Any] = None) -> Dict[str, ValidationResult]:
        """Validate every field of ``data`` against ``schema``.

        Returns a dict mapping field name to its ValidationResult. Fields
        with no (or an empty) schema entry are validated as optional strings;
        required schema fields absent from ``data`` get a failing result.
        """
        ctx = context or {}
        outcome = {}

        for name, raw_value in data.items():
            # Fall back to the default spec when the schema has nothing for this field
            spec = schema.get(name, {}) or {'type': 'string', 'required': False}
            outcome[name] = self._validate_field(name, raw_value, spec, ctx)

        # Report required fields that never appeared in the input
        for name, spec in schema.items():
            if not spec.get('required', False) or name in data:
                continue
            outcome[name] = ValidationResult(
                is_valid=False,
                errors=[f"必需字段 '{name}' 缺失"],
                severity=ValidationSeverity.ERROR
            )

        return outcome
    
    def _validate_field(self, field_name: str, field_value: Any,
                       field_schema: Dict[str, Any],
                       context: Dict[str, Any]) -> ValidationResult:
        """Run the full validation pipeline for a single field.

        Steps: pre-process the raw value, scan it for attack patterns, run
        the validator registered for ``field_schema['type']``, merge the
        attack findings into that validator's result, sanitize the value and
        finally apply the schema's ``custom_rules``.

        Args:
            field_name: Name of the field (not used by the pipeline itself).
            field_value: Raw input value.
            field_schema: Schema entry for the field.
            context: Caller-supplied context passed through to the helpers.

        Returns:
            The validator's ValidationResult, updated in place. An
            unsupported field type yields a failing result immediately.
        """
        # Severity values are strings, so ``.value`` comparison would be
        # alphabetical; rank the levels explicitly instead.
        severity_rank = {
            ValidationSeverity.INFO: 0,
            ValidationSeverity.WARNING: 1,
            ValidationSeverity.ERROR: 2,
            ValidationSeverity.CRITICAL: 3,
        }

        # 1. Pre-process
        processed_value = self._preprocess_value(field_value, field_schema)

        # 2. Attack-pattern detection
        attack_result = self.attack_detector.detect_attacks(processed_value, context)

        # 3. Look up the validator for the declared type
        field_type = field_schema.get('type', 'string')
        type_validator = self.validators.get(field_type)

        if not type_validator:
            return ValidationResult(
                is_valid=False,
                errors=[f"不支持的字段类型: {field_type}"],
                severity=ValidationSeverity.ERROR
            )

        # 4. Type validation
        validation_result = type_validator.validate(processed_value, context)

        # 5. Merge the attack-detection findings
        if attack_result.attack_patterns:
            validation_result.attack_patterns.extend(attack_result.attack_patterns)
            validation_result.warnings.extend(attack_result.warnings)
            validation_result.errors.extend(attack_result.errors)

            if severity_rank[attack_result.severity] > severity_rank[validation_result.severity]:
                validation_result.severity = attack_result.severity
                validation_result.is_valid = False

            # An attack the detector rejected must fail the field even when
            # the validator already reported an equal or higher severity.
            if not attack_result.is_valid:
                validation_result.is_valid = False

        # 6. Sanitize. Use an explicit None check so a falsy cleaned value
        # (such as "" after stripping a payload) is not replaced by raw input.
        if validation_result.is_valid or self.global_config.get('clean_on_warning', False):
            to_clean = validation_result.cleaned_value
            if to_clean is None:
                to_clean = processed_value
            validation_result.cleaned_value = self.data_sanitizer.sanitize(
                to_clean, field_schema, context
            )

        # 7. Custom rules run against the cleaned value when there is one
        for rule in field_schema.get('custom_rules', []):
            rule_input = validation_result.cleaned_value
            if rule_input is None:
                rule_input = processed_value
            custom_result = self._apply_custom_rule(rule_input, rule, context)

            if not custom_result.is_valid:
                validation_result.is_valid = False
                validation_result.errors.extend(custom_result.errors)

        return validation_result
    
    def _preprocess_value(self, value: Any, schema: Dict[str, Any]) -> Any:
        """预处理值"""
        if value is None:
            return value
        
        # 类型转换
        target_type = schema.get('type', 'string')
        
        if target_type == 'string' and not isinstance(value, str):
            value = str(value)
        elif target_type == 'numeric':
            try:
                if isinstance(value, str):
                    if '.' in value:
                        value = float(value)
                    else:
                        value = int(value)
            except ValueError:
                pass  # 保持原值,让验证器处理
        
        # 去除前后空格(对字符串类型)
        if isinstance(value, str) and schema.get('trim', True):
            value = value.strip()
        
        return value
    
    def _apply_custom_rule(self, value: Any, rule: Dict[str, Any],
                          context: Dict[str, Any]) -> ValidationResult:
        """Apply one custom rule from a field schema to ``value``.

        Supported ``rule['type']`` values:

        * ``'regex'``    - string values must match ``rule['pattern']``
          (``re.match``, i.e. anchored at the start only).
        * ``'length'``   - ``len(value)`` must lie in ``[min, max]``.
        * ``'range'``    - ``float(value)`` must lie in ``[min, max]``;
          non-numeric and NaN values fail.
        * ``'function'`` - delegates to ``self._custom_rule_<function>``
          when such a method exists.

        ``rule['message']``, when present, replaces the default failure
        message of the regex, length and range rules. Unknown rule types
        and rules that do not apply to the value pass.
        """
        import math  # local import: only the range rule needs it

        rule_type = rule.get('type')

        if rule_type == 'regex':
            pattern = rule.get('pattern')
            if pattern and isinstance(value, str):
                if not re.match(pattern, value):
                    return ValidationResult(
                        is_valid=False,
                        errors=[rule.get('message', f'值不匹配模式: {pattern}')]
                    )

        elif rule_type == 'length':
            min_length = rule.get('min', 0)
            max_length = rule.get('max', float('inf'))

            if hasattr(value, '__len__'):
                length = len(value)
                if length < min_length or length > max_length:
                    return ValidationResult(
                        is_valid=False,
                        errors=[rule.get(
                            'message',
                            f'长度必须在 {min_length} 到 {max_length} 之间'
                        )]
                    )

        elif rule_type == 'range':
            min_val = rule.get('min')
            max_val = rule.get('max')

            try:
                num_value = float(value)
                # NaN compares false against every bound, so reject it explicitly
                if math.isnan(num_value):
                    return ValidationResult(
                        is_valid=False,
                        errors=['无法验证数值范围:值不是数字']
                    )
                if (min_val is not None and num_value < min_val) or \
                   (max_val is not None and num_value > max_val):
                    return ValidationResult(
                        is_valid=False,
                        errors=[rule.get(
                            'message',
                            f'值必须在 {min_val} 到 {max_val} 之间'
                        )]
                    )
            except (ValueError, TypeError):
                return ValidationResult(
                    is_valid=False,
                    errors=['无法验证数值范围:值不是数字']
                )

        elif rule_type == 'function':
            func_name = rule.get('function')
            if func_name and hasattr(self, f'_custom_rule_{func_name}'):
                custom_func = getattr(self, f'_custom_rule_{func_name}')
                return custom_func(value, rule, context)

        return ValidationResult(is_valid=True)

class AttackPatternDetector:
    """攻击模式检测器"""
    
    def __init__(self, config: Dict[str, Any] = None):
        """Load the pattern table and the list of enabled attack types."""
        settings = config if config else {}
        self.config = settings
        self.patterns = self._load_attack_patterns()
        # Every AttackType is checked unless the config supplies 'enabled'
        self.enabled_detectors = settings.get('enabled', list(AttackType))
    
    def _load_attack_patterns(self) -> Dict[AttackType, List[Dict[str, Any]]]:
        """Return the built-in detection patterns, grouped by attack type.

        Each entry is a dict with the keys ``pattern`` (regex source),
        ``flags`` (``re`` flags), ``severity`` and ``description``.

        Keyword patterns are anchored with ``\\b`` so that ordinary words
        containing a keyword ("sync data", "model x", "money = 5") are not
        reported as attacks.
        """
        return {
            AttackType.XSS: [
                {
                    'pattern': r'<\s*script[^>]*>.*?</\s*script\s*>',
                    'flags': re.IGNORECASE | re.DOTALL,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': 'Script标签XSS攻击'
                },
                {
                    'pattern': r'javascript\s*:',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': 'JavaScript协议XSS攻击'
                },
                {
                    # Event-handler attributes always start at a word boundary
                    'pattern': r'\bon\w+\s*=',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.ERROR,
                    'description': 'HTML事件处理器XSS攻击'
                },
                {
                    'pattern': r'<\s*iframe[^>]*>',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.WARNING,
                    'description': 'IFrame注入'
                }
            ],

            AttackType.SQL_INJECTION: [
                {
                    'pattern': r"'\s*(OR|AND)\s*'?\d*'?\s*=\s*'?\d*",
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': '经典SQL注入模式'
                },
                {
                    'pattern': r'(UNION\s+SELECT|SELECT\s+.*\s+FROM)',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': 'UNION SELECT注入'
                },
                {
                    'pattern': r'(DROP\s+TABLE|DELETE\s+FROM|UPDATE\s+.*\s+SET)',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': '破坏性SQL操作'
                },
                {
                    'pattern': r'--\s*$',
                    'flags': re.MULTILINE,
                    'severity': ValidationSeverity.WARNING,
                    'description': 'SQL注释注入'
                }
            ],

            AttackType.COMMAND_INJECTION: [
                {
                    'pattern': r'[;&|`$\(\)]',
                    'flags': 0,
                    'severity': ValidationSeverity.ERROR,
                    'description': '系统命令注入字符'
                },
                {
                    'pattern': r'\b(nc|netcat|telnet|ssh|curl|wget)\s+',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': '网络命令注入'
                },
                {
                    'pattern': r'\b(rm|del|format|fdisk)\s+',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': '破坏性命令注入'
                }
            ],

            AttackType.PATH_TRAVERSAL: [
                {
                    # "../" or "..\" (the backslash must be escaped in the regex)
                    'pattern': r'\.\./|\.\.\\',
                    'flags': 0,
                    'severity': ValidationSeverity.ERROR,
                    'description': '目录遍历攻击'
                },
                {
                    'pattern': r'%2e%2e%2f|%2e%2e/|\.\.%2f',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.ERROR,
                    'description': 'URL编码的目录遍历'
                },
                {
                    'pattern': r'/etc/passwd|/etc/shadow|C:\\Windows\\',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': '系统敏感文件访问'
                }
            ],

            AttackType.DESERIALIZATION: [
                {
                    'pattern': r'(__reduce__|__setstate__|eval\(|exec\()',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': 'Python反序列化攻击'
                },
                {
                    'pattern': r'(ObjectInputStream|readObject)',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': 'Java反序列化攻击'
                }
            ],

            AttackType.XXE: [
                {
                    'pattern': r'<!ENTITY\s+\w+\s+SYSTEM',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': 'XML外部实体注入'
                },
                {
                    'pattern': r'<!DOCTYPE[^>]*\[.*ENTITY',
                    'flags': re.IGNORECASE | re.DOTALL,
                    'severity': ValidationSeverity.CRITICAL,
                    'description': 'XXE攻击载荷'
                }
            ],

            AttackType.SSRF: [
                {
                    'pattern': r'(file://|ftp://|gopher://|dict://)',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.ERROR,
                    'description': 'SSRF协议攻击'
                },
                {
                    'pattern': r'(localhost|127\.0\.0\.1|0\.0\.0\.0|10\.|192\.168\.|172\.)',
                    'flags': re.IGNORECASE,
                    'severity': ValidationSeverity.WARNING,
                    'description': '内网地址访问'
                }
            ]
        }
    
    def detect_attacks(self, value: Any, context: Dict[str, Any] = None) -> ValidationResult:
        """Scan a string for the patterns of every enabled attack type.

        Non-string values are reported as valid without scanning. Each
        matching pattern adds one message: CRITICAL and ERROR patterns to
        ``errors``, WARNING patterns to ``warnings``. The result carries the
        highest severity seen and is invalid when that is ERROR or CRITICAL.
        ``attack_patterns`` lists each detected attack type once, in
        detection order. ``context`` is accepted but not used here.
        """
        if not isinstance(value, str):
            return ValidationResult(is_valid=True)

        # Severity values are strings, so ``.value`` comparison would be
        # alphabetical; rank the levels explicitly instead.
        severity_rank = {
            ValidationSeverity.INFO: 0,
            ValidationSeverity.WARNING: 1,
            ValidationSeverity.ERROR: 2,
            ValidationSeverity.CRITICAL: 3,
        }
        blocking = (ValidationSeverity.CRITICAL, ValidationSeverity.ERROR)

        detected_attacks = []
        warnings = []
        errors = []
        max_severity = ValidationSeverity.INFO

        for attack_type in self.enabled_detectors:
            if attack_type not in self.patterns:
                continue

            for pattern_config in self.patterns[attack_type]:
                flags = pattern_config.get('flags', 0)
                if not re.search(pattern_config['pattern'], value, flags):
                    continue

                if attack_type not in detected_attacks:
                    detected_attacks.append(attack_type)

                severity = pattern_config['severity']
                message = f"{pattern_config['description']}: {attack_type.value}"

                if severity in blocking:
                    errors.append(message)
                elif severity == ValidationSeverity.WARNING:
                    warnings.append(message)

                if severity_rank[severity] > severity_rank[max_severity]:
                    max_severity = severity

        return ValidationResult(
            is_valid=max_severity not in blocking,
            warnings=warnings,
            errors=errors,
            severity=max_severity,
            attack_patterns=detected_attacks
        )

class DataSanitizer:
    """数据清理器"""
    
    def __init__(self, config: Dict[str, Any] = None):
        self.config = config or {}
        
        # HTML清理配置
        self.html_config = self.config.get('html', {})
        self.allowed_tags = self.html_config.get('allowed_tags', [
            'p', 'br', 'strong', 'em', 'u', 'ol', 'ul', 'li',
            'h1', 'h2', 'h3', 'h4', 'h5', 'h6'
        ])
        self.allowed_attributes = self.html_config.get('allowed_attributes', {
            '*': ['class', 'id'],
            'a': ['href', 'title'],
            'img': ['src', 'alt', 'width', 'height']
        })
    
    def sanitize(self, value: Any, schema: Dict[str, Any],
                context: Dict[str, Any] = None) -> Any:
        """清理数据"""
        if value is None:
            return value
        
        sanitization_mode = schema.get('sanitization', 'auto')
        
        if sanitization_mode == 'none':
            return value
        elif sanitization_mode == 'html':
            return self._sanitize_html(value)
        elif sanitization_mode == 'strict':
            return self._strict_sanitize(value)
        elif sanitization_mode == 'auto':
            return self._auto_sanitize(value, schema, context)
        else:
            return value
    
    def _sanitize_html(self, value: str) -> str:
        """HTML清理"""
        if not isinstance(value, str):
            return value
        
        # 使用bleach进行HTML清理
        cleaned = bleach.clean(
            value,
            tags=self.allowed_tags,
            attributes=self.allowed_attributes,
            strip=True
        )
        
        # 额外的安全检查
        cleaned = self._remove_dangerous_patterns(cleaned)
        
        return cleaned
    
    def _strict_sanitize(self, value: str) -> str:
        """严格清理"""
        if not isinstance(value, str):
            return value
        
        # 移除所有HTML标签
        cleaned = re.sub(r'<[^>]+>', '', value)
        
        # HTML实体解码
        cleaned = html.unescape(cleaned)
        
        # 移除危险字符
        dangerous_chars = ['<', '>', '"', "'", '&', '%', '\\', '/', '?']
        for char in dangerous_chars:
            cleaned = cleaned.replace(char, '')
        
        # 移除控制字符
        cleaned = ''.join(c for c in cleaned if ord(c) >= 32 or ord(c) in [9, 10, 13])
        
        return cleaned.strip()
    
    def _auto_sanitize(self, value: str, schema: Dict[str, Any],
                      context: Dict[str, Any] = None) -> str:
        """自动清理"""
        if not isinstance(value, str):
            return value
        
        field_type = schema.get('type', 'string')
        
        if field_type == 'email':
            return self._sanitize_email(value)
        elif field_type == 'url':
            return self._sanitize_url(value)
        elif field_type == 'numeric':
            return self._sanitize_numeric(value)
        elif 'html' in schema.get('format', '').lower():
            return self._sanitize_html(value)
        else:
            return self._sanitize_text(value)
    
    def _sanitize_email(self, email: str) -> str:
        """邮箱清理"""
        # 转换为小写
        email = email.lower().strip()
        
        # 移除危险字符
        email = re.sub(r'[<>"\'\\\r\n]', '', email)
        
        return email
    
    def _sanitize_url(self, url: str) -> str:
        """URL清理"""
        url = url.strip()
        
        # 检查协议
        parsed = urlparse(url)
        if parsed.scheme not in ['http', 'https', 'ftp']:
            # 如果没有协议或协议不安全,添加https
            if not parsed.scheme:
                url = 'https://' + url
            else:
                return ''  # 移除危险协议
        
        # URL编码特殊字符
        url = urllib.parse.quote(url, safe=':/?#[]@!$&\'()*+,;=')
        
        return url
    
    def _sanitize_numeric(self, value: str) -> str:
        """数字清理"""
        # 只保留数字、小数点和负号
        return re.sub(r'[^0-9.\-+eE]', '', value)
    
    def _sanitize_text(self, text: str) -> str:
        """文本清理"""
        # HTML实体编码
        text = html.escape(text)
        
        # 移除控制字符(保留换行和制表符)
        text = ''.join(c for c in text if ord(c) >= 32 or ord(c) in [9, 10, 13])
        
        # 规范化空白字符
        text = re.sub(r'\s+', ' ', text)
        
        return text.strip()
    
    def _remove_dangerous_patterns(self, content: str) -> str:
        """移除危险模式"""
        # 移除JavaScript
        content = re.sub(r'javascript\s*:', '', content, flags=re.IGNORECASE)
        
        # 移除data URLs
        content = re.sub(r'data\s*:', '', content, flags=re.IGNORECASE)
        
        # 移除vbscript
        content = re.sub(r'vbscript\s*:', '', content, flags=re.IGNORECASE)
        
        # 移除危险的CSS表达式
        content = re.sub(r'expression\s*\(', '', content, flags=re.IGNORECASE)
        
        return content

# 具体验证器实现
class StringValidator(InputValidator):
    """字符串验证器"""
    
    def validate(self, value: Any, context: Dict[str, Any] = None) -> ValidationResult:
        """Check a string against the configured length and character rules.

        ``None`` is accepted as valid. Non-strings are converted with
        ``str``. Config keys read: ``min_length`` (default 0),
        ``max_length`` (default 10000), ``allowed_chars`` and
        ``forbidden_chars``. The cleaned value is the (converted) string.
        """
        if value is None:
            return ValidationResult(is_valid=True, cleaned_value=None)

        text = value
        if not isinstance(text, str):
            try:
                text = str(text)
            except Exception as e:
                return ValidationResult(
                    is_valid=False,
                    errors=[f"无法转换为字符串: {str(e)}"]
                )

        problems = []

        # Length limits
        min_length = self.config.get('min_length', 0)
        max_length = self.config.get('max_length', 10000)
        size = len(text)
        if size < min_length:
            problems.append(f"字符串长度不能少于 {min_length}")
        if size > max_length:
            problems.append(f"字符串长度不能超过 {max_length}")

        # Allow-list: any character outside the set is an error
        allowed_chars = self.config.get('allowed_chars')
        if allowed_chars and re.search(f"[^{re.escape(allowed_chars)}]", text):
            problems.append("包含不允许的字符")

        # Deny-list: one error per forbidden character that occurs
        forbidden_chars = self.config.get('forbidden_chars', '') or ''
        problems.extend(
            f"包含禁止字符: {char}" for char in forbidden_chars if char in text
        )

        return ValidationResult(
            is_valid=not problems,
            cleaned_value=text,
            errors=problems,
            warnings=[]
        )

class XSSValidator(InputValidator):
    """XSS验证器"""
    
    def validate(self, value: Any, context: Dict[str, Any] = None) -> ValidationResult:
        """Scan a string for XSS patterns and return a cleaned copy.

        Non-strings are accepted unchanged. Each matching pattern adds one
        message: CRITICAL and ERROR patterns to ``errors``, WARNING patterns
        to ``warnings``. The result is invalid when the highest severity
        seen is ERROR or CRITICAL. ``attack_patterns`` holds
        ``AttackType.XSS`` once when anything matched. The cleaned value
        comes from ``_clean_xss``.
        """
        if not isinstance(value, str):
            return ValidationResult(is_valid=True, cleaned_value=value)

        # Severity values are strings, so ``.value`` comparison would be
        # alphabetical; rank the levels explicitly instead.
        severity_rank = {
            ValidationSeverity.INFO: 0,
            ValidationSeverity.WARNING: 1,
            ValidationSeverity.ERROR: 2,
            ValidationSeverity.CRITICAL: 3,
        }
        blocking = (ValidationSeverity.CRITICAL, ValidationSeverity.ERROR)

        errors = []
        warnings = []
        attack_patterns = []

        # (regex, description, severity)
        xss_patterns = [
            (r'<\s*script[^>]*>.*?</\s*script\s*>', 'Script标签', ValidationSeverity.CRITICAL),
            (r'javascript\s*:', 'JavaScript协议', ValidationSeverity.CRITICAL),
            # \b keeps words such as "money =" from matching
            (r'\bon\w+\s*=', '事件处理器', ValidationSeverity.ERROR),
            (r'<\s*iframe[^>]*>', 'IFrame注入', ValidationSeverity.WARNING),
            (r'<\s*object[^>]*>', 'Object标签', ValidationSeverity.WARNING),
            (r'<\s*embed[^>]*>', 'Embed标签', ValidationSeverity.WARNING),
            (r'<\s*link[^>]*>', 'Link标签', ValidationSeverity.WARNING),
            (r'<\s*meta[^>]*>', 'Meta标签', ValidationSeverity.WARNING),
            (r'eval\s*\(', 'Eval函数', ValidationSeverity.CRITICAL),
            (r'setTimeout\s*\(', 'SetTimeout函数', ValidationSeverity.ERROR),
            (r'setInterval\s*\(', 'SetInterval函数', ValidationSeverity.ERROR)
        ]

        max_severity = ValidationSeverity.INFO

        for pattern, description, severity in xss_patterns:
            if not re.search(pattern, value, re.IGNORECASE | re.DOTALL):
                continue

            if AttackType.XSS not in attack_patterns:
                attack_patterns.append(AttackType.XSS)

            message = f"检测到XSS攻击模式: {description}"
            if severity in blocking:
                errors.append(message)
            else:
                warnings.append(message)

            if severity_rank[severity] > severity_rank[max_severity]:
                max_severity = severity

        # Strip the XSS constructs from the value
        cleaned_value = self._clean_xss(value)

        return ValidationResult(
            is_valid=max_severity not in blocking,
            cleaned_value=cleaned_value,
            errors=errors,
            warnings=warnings,
            severity=max_severity,
            attack_patterns=attack_patterns
        )
    
    def _clean_xss(self, value: str) -> str:
        """Strip script blocks, javascript: URLs and quoted event handlers,
        then HTML-escape what remains."""
        # (regex, flags), applied in this order
        removals = (
            # <script>...</script> blocks
            (r'<\s*script[^>]*>.*?</\s*script\s*>', re.IGNORECASE | re.DOTALL),
            # javascript: protocol
            (r'javascript\s*:', re.IGNORECASE),
            # quoted on*= event-handler attributes
            (r'\s*on\w+\s*=\s*["\'][^"\']*["\']', re.IGNORECASE),
        )
        for pattern, flags in removals:
            value = re.sub(pattern, '', value, flags=flags)

        # HTML entity encoding
        return html.escape(value)

class FileValidator(InputValidator):
    """文件验证器"""
    
    def validate(self, value: Any, context: Dict[str, Any] = None) -> ValidationResult:
        """Validate an uploaded file object.

        The value must expose ``read``. The following checks run when the
        object exposes the matching attribute:

        * ``size`` against ``config['max_size']`` (default 10 MB);
        * ``content_type`` against ``config['allowed_types']``;
        * the extension of ``name`` against ``config['allowed_extensions']``,
          compared case-insensitively and ignoring a leading dot in the
          configured entries (``'.JPG'`` and ``'jpg'`` are equivalent).

        Unless ``config['scan_content']`` is false the file header is also
        scanned and any findings are merged into the result.
        """
        if not hasattr(value, 'read'):  # not a file-like object
            return ValidationResult(
                is_valid=False,
                errors=["不是有效的文件对象"]
            )

        errors = []
        warnings = []

        # File size
        max_size = self.config.get('max_size', 10 * 1024 * 1024)  # default 10MB

        if hasattr(value, 'size') and value.size > max_size:
            errors.append(f"文件大小超过限制: {max_size} bytes")

        # Declared content type
        allowed_types = self.config.get('allowed_types', [])
        if allowed_types and hasattr(value, 'content_type'):
            if value.content_type not in allowed_types:
                errors.append(f"不允许的文件类型: {value.content_type}")

        # File extension. Names that are not strings (for example the integer
        # descriptor some file objects expose) are skipped, like a missing
        # name, instead of raising TypeError.
        allowed_extensions = self.config.get('allowed_extensions', [])
        name = getattr(value, 'name', None)
        if allowed_extensions and isinstance(name, str):
            normalized_allowed = {
                str(item).lower().lstrip('.') for item in allowed_extensions
            }
            ext = name.rsplit('.', 1)[-1].lower() if '.' in name else ''
            if ext not in normalized_allowed:
                errors.append(f"不允许的文件扩展名: {ext}")

        # Content scan
        if self.config.get('scan_content', True):
            content_result = self._scan_file_content(value)
            errors.extend(content_result.errors)
            warnings.extend(content_result.warnings)

        return ValidationResult(
            is_valid=len(errors) == 0,
            cleaned_value=value,
            errors=errors,
            warnings=warnings
        )
    
    def _scan_file_content(self, file_obj) -> ValidationResult:
        """Inspect the first 1024 bytes/characters of a file.

        The file is rewound before and after reading. Findings are reported
        as warnings only, so the returned result is always valid:

        * a header starting with a ZIP, PE or ELF signature;
        * script-like content (script tags, ``javascript:``, ``eval(``,
          ``system(``, ``exec(``).

        Both binary and text-mode files are handled. The scan is best
        effort: any exception becomes a warning instead of propagating.
        """
        errors = []
        warnings = []

        # Leading signatures of potentially dangerous formats
        malicious_signatures = (
            b'PK\x03\x04',  # ZIP archive - may carry malicious scripts
            b'MZ',          # PE executable
            b'\x7fELF',     # ELF (Linux) executable
        )
        script_patterns = [
            r'<\s*script[^>]*>',
            r'javascript\s*:',
            r'eval\s*\(',
            r'system\s*\(',
            r'exec\s*\('
        ]

        try:
            # Read the header and restore the position for later consumers
            file_obj.seek(0)
            header = file_obj.read(1024)
            file_obj.seek(0)

            # Normalise to both a bytes and a text view of the header
            if isinstance(header, (bytes, bytearray)):
                raw = bytes(header)
                text_content = raw.decode('utf-8', errors='ignore')
            elif isinstance(header, str):
                text_content = header
                raw = header.encode('utf-8', errors='ignore')
            else:
                raise TypeError(
                    f"unsupported read() result: {type(header).__name__}"
                )

            if raw.startswith(malicious_signatures):
                warnings.append("检测到潜在危险文件类型")

            if any(re.search(pattern, text_content, re.IGNORECASE)
                   for pattern in script_patterns):
                warnings.append("文件包含潜在危险脚本内容")

        except Exception as e:
            # Deliberate best effort: report the failure, do not crash
            warnings.append(f"文件内容扫描异常: {str(e)}")

        return ValidationResult(
            is_valid=True,
            errors=errors,
            warnings=warnings
        )

🧹 高级数据清理技术

🔍 智能清理引擎

python
# 高级数据清理系统
import re
import unicodedata
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
import hashlib
import base64
import json
from urllib.parse import unquote, quote
import html
import xml.etree.ElementTree as ET
from xml.parsers.expat import ExpatError

class AdvancedDataSanitizer:
    """高级数据清理器"""
    
    def __init__(self, config: Dict[str, Any] = None):
        self.config = config or {}
        self.cleaning_rules = self._load_cleaning_rules()
        self.context_aware = self.config.get('context_aware', True)
        
        # 清理策略
        self.strategies = {
            'conservative': self._conservative_clean,
            'aggressive': self._aggressive_clean,
            'balanced': self._balanced_clean,
            'custom': self._custom_clean
        }
    
    def _load_cleaning_rules(self) -> Dict[str, List[Dict[str, Any]]]:
        """加载清理规则"""
        return {
            'html': [
                {
                    'name': 'remove_scripts',
                    'pattern': r'<\s*script[^>]*>.*?</\s*script\s*>',
                    'replacement': '',
                    'flags': re.IGNORECASE | re.DOTALL
                },
                {
                    'name': 'remove_dangerous_attributes',
                    'pattern': r'\s*(on\w+|javascript:|vbscript:|data:)\s*=\s*["\'][^"\']*["\']',
                    'replacement': '',
                    'flags': re.IGNORECASE
                },
                {
                    'name': 'sanitize_links',
                    'pattern': r'<\s*a\s+([^>]*\s+)?href\s*=\s*["\']([^"\']*)["\']',
                    'replacement': lambda m: self._sanitize_link(m),
                    'flags': re.IGNORECASE
                }
            ],
            
            'sql': [
                {
                    'name': 'escape_quotes',
                    'pattern': r"'",
                    'replacement': "''",
                    'flags': 0
                },
                {
                    'name': 'remove_comments',
                    'pattern': r'--.*$',
                    'replacement': '',
                    'flags': re.MULTILINE
                },
                {
                    'name': 'remove_multiline_comments',
                    'pattern': r'/\*.*?\*/',
                    'replacement': '',
                    'flags': re.DOTALL
                }
            ],
            
            'command': [
                {
                    'name': 'escape_shell_chars',
                    'pattern': r'([;&|`$\(\)\\])',
                    'replacement': r'\\\1',
                    'flags': 0
                },
                {
                    'name': 'remove_dangerous_commands',
                    'pattern': r'\b(rm|del|format|fdisk|nc|netcat|curl|wget)\b',
                    'replacement': '',
                    'flags': re.IGNORECASE
                }
            ],
            
            'path': [
                {
                    'name': 'normalize_separators',
                    'pattern': r'[/\\]+',
                    'replacement': '/',
                    'flags': 0
                },
                {
                    'name': 'remove_traversal',
                    'pattern': r'\.\./',
                    'replacement': '',
                    'flags': 0
                },
                {
                    'name': 'decode_url_encoding',
                    'pattern': r'%[0-9a-fA-F]{2}',
                    'replacement': lambda m: unquote(m.group(0)),
                    'flags': 0
                }
            ],
            
            'unicode': [
                {
                    'name': 'normalize_unicode',
                    'pattern': None,
                    'replacement': lambda text: unicodedata.normalize('NFKC', text),
                    'flags': 0
                },
                {
                    'name': 'remove_control_chars',
                    'pattern': r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]',
                    'replacement': '',
                    'flags': 0
                },
                {
                    'name': 'remove_rtl_override',
                    'pattern': r'[\u202E\u202D\u2066-\u2069]',
                    'replacement': '',
                    'flags': 0
                }
            ]
        }
    
    def clean_data(self, data: Any, cleaning_type: str = 'balanced',
                  context: Dict[str, Any] = None) -> Any:
        """清理数据"""
        context = context or {}
        
        if cleaning_type not in self.strategies:
            cleaning_type = 'balanced'
        
        strategy = self.strategies[cleaning_type]
        return strategy(data, context)
    
    def _conservative_clean(self, data: Any, context: Dict[str, Any]) -> Any:
        """保守清理策略 - 最小化更改"""
        if isinstance(data, str):
            # 只进行基本的安全清理
            data = self._apply_unicode_normalization(data)
            data = self._remove_control_characters(data)
            data = html.escape(data)
        
        elif isinstance(data, dict):
            return {k: self._conservative_clean(v, context) for k, v in data.items()}
        
        elif isinstance(data, list):
            return [self._conservative_clean(item, context) for item in data]
        
        return data
    
    def _aggressive_clean(self, data: Any, context: Dict[str, Any]) -> Any:
        """激进清理策略 - 移除所有潜在危险内容"""
        if isinstance(data, str):
            # 应用所有清理规则
            for rule_type, rules in self.cleaning_rules.items():
                data = self._apply_rules(data, rules)
            
            # 额外的激进清理
            data = self._remove_all_html_tags(data)
            data = self._remove_special_characters(data)
            data = self._truncate_if_too_long(data, 1000)
        
        elif isinstance(data, dict):
            return {k: self._aggressive_clean(v, context) for k, v in data.items()}
        
        elif isinstance(data, list):
            return [self._aggressive_clean(item, context) for item in data]
        
        return data
    
    def _balanced_clean(self, data: Any, context: Dict[str, Any]) -> Any:
        """平衡清理策略 - 在安全性和功能性之间平衡"""
        if isinstance(data, str):
            # 检测数据类型并应用相应清理
            data_type = self._detect_data_type(data, context)
            
            if data_type == 'html':
                data = self._clean_html_balanced(data)
            elif data_type == 'sql':
                data = self._escape_sql_content(data)
            elif data_type == 'path':
                data = self._sanitize_path(data)
            elif data_type == 'url':
                data = self._sanitize_url_content(data)
            else:
                data = self._clean_text_content(data)
        
        elif isinstance(data, dict):
            return {k: self._balanced_clean(v, context) for k, v in data.items()}
        
        elif isinstance(data, list):
            return [self._balanced_clean(item, context) for item in data]
        
        return data
    
    def _custom_clean(self, data: Any, context: Dict[str, Any]) -> Any:
        """自定义清理策略"""
        custom_rules = context.get('custom_rules', [])
        
        if isinstance(data, str):
            for rule in custom_rules:
                data = self._apply_custom_rule(data, rule)
        
        elif isinstance(data, dict):
            return {k: self._custom_clean(v, context) for k, v in data.items()}
        
        elif isinstance(data, list):
            return [self._custom_clean(item, context) for item in data]
        
        return data
    
    def _detect_data_type(self, data: str, context: Dict[str, Any]) -> str:
        """检测数据类型"""
        # 基于上下文提示
        field_name = context.get('field_name', '').lower()
        field_type = context.get('field_type', '').lower()
        
        if 'html' in field_name or 'content' in field_name:
            return 'html'
        elif 'url' in field_name or 'link' in field_name:
            return 'url'
        elif 'path' in field_name or 'file' in field_name:
            return 'path'
        elif 'sql' in field_name or 'query' in field_name:
            return 'sql'
        
        # 基于内容检测
        if re.search(r'<[^>]+>', data):
            return 'html'
        elif re.search(r'^https?://', data):
            return 'url'
        elif re.search(r'[/\\]', data) and len(data.split()) == 1:
            return 'path'
        elif re.search(r'\b(SELECT|INSERT|UPDATE|DELETE)\b', data, re.IGNORECASE):
            return 'sql'
        
        return 'text'
    
    def _clean_html_balanced(self, html_content: str) -> str:
        """平衡的HTML清理"""
        # 移除危险标签和属性
        dangerous_tags = ['script', 'object', 'embed', 'iframe', 'form', 'input', 'meta', 'link']
        for tag in dangerous_tags:
            pattern = f'<\s*{tag}[^>]*>.*?</\s*{tag}\s*>'
            html_content = re.sub(pattern, '', html_content, flags=re.IGNORECASE | re.DOTALL)
        
        # 移除危险属性
        dangerous_attrs = ['onload', 'onclick', 'onerror', 'onmouseover', 'onfocus', 'onblur']
        for attr in dangerous_attrs:
            pattern = f'\\s*{attr}\\s*=\\s*["\'][^"\']*["\']'
            html_content = re.sub(pattern, '', html_content, flags=re.IGNORECASE)
        
        # 清理JavaScript和危险协议
        html_content = re.sub(r'javascript\s*:', '', html_content, flags=re.IGNORECASE)
        html_content = re.sub(r'vbscript\s*:', '', html_content, flags=re.IGNORECASE)
        html_content = re.sub(r'data\s*:', '', html_content, flags=re.IGNORECASE)
        
        return html_content
    
    def _escape_sql_content(self, sql_content: str) -> str:
        """SQL内容转义"""
        # 转义单引号
        sql_content = sql_content.replace("'", "''")
        
        # 移除SQL注释
        sql_content = re.sub(r'--.*$', '', sql_content, flags=re.MULTILINE)
        sql_content = re.sub(r'/\*.*?\*/', '', sql_content, flags=re.DOTALL)
        
        # 移除危险的SQL关键字
        dangerous_keywords = ['DROP', 'DELETE', 'TRUNCATE', 'ALTER', 'CREATE', 'EXEC', 'EXECUTE']
        for keyword in dangerous_keywords:
            pattern = f'\\b{keyword}\\b'
            sql_content = re.sub(pattern, '', sql_content, flags=re.IGNORECASE)
        
        return sql_content
    
    def _sanitize_path(self, path: str) -> str:
        """路径清理"""
        # 移除路径遍历
        path = re.sub(r'\.\./', '', path)
        path = re.sub(r'\.\.\\', '', path)
        
        # URL解码
        path = unquote(path)
        
        # 规范化路径分隔符
        path = path.replace('\\', '/')
        
        # 移除多余的斜杠
        path = re.sub(r'/+', '/', path)
        
        # 限制路径长度
        if len(path) > 255:
            path = path[:255]
        
        return path
    
    def _sanitize_url_content(self, url: str) -> str:
        """URL内容清理"""
        # 移除危险协议
        dangerous_protocols = ['javascript:', 'vbscript:', 'data:', 'file:', 'ftp:']
        for protocol in dangerous_protocols:
            if url.lower().startswith(protocol):
                return ''
        
        # 如果没有协议,添加https
        if not re.match(r'^https?://', url):
            url = 'https://' + url
        
        # URL编码特殊字符
        url = quote(url, safe=':/?#[]@!$&\'()*+,;=')
        
        return url
    
    def _clean_text_content(self, text: str) -> str:
        """文本内容清理"""
        # Unicode规范化
        text = unicodedata.normalize('NFKC', text)
        
        # 移除控制字符
        text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)
        
        # 移除RTL覆盖字符
        text = re.sub(r'[\u202E\u202D\u2066-\u2069]', '', text)
        
        # HTML实体编码特殊字符
        text = html.escape(text)
        
        # 规范化空白字符
        text = re.sub(r'\s+', ' ', text)
        
        return text.strip()
    
    def _apply_rules(self, data: str, rules: List[Dict[str, Any]]) -> str:
        """应用清理规则"""
        for rule in rules:
            pattern = rule.get('pattern')
            replacement = rule.get('replacement', '')
            flags = rule.get('flags', 0)
            
            if pattern is None:
                # 特殊处理函数
                if callable(replacement):
                    data = replacement(data)
            else:
                if callable(replacement):
                    data = re.sub(pattern, replacement, data, flags=flags)
                else:
                    data = re.sub(pattern, replacement, data, flags=flags)
        
        return data
    
    def _apply_unicode_normalization(self, text: str) -> str:
        """Unicode规范化"""
        return unicodedata.normalize('NFKC', text)
    
    def _remove_control_characters(self, text: str) -> str:
        """移除控制字符"""
        return re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]', '', text)
    
    def _remove_all_html_tags(self, html: str) -> str:
        """移除所有HTML标签"""
        return re.sub(r'<[^>]+>', '', html)
    
    def _remove_special_characters(self, text: str) -> str:
        """移除特殊字符"""
        return re.sub(r'[^\w\s\-.,!?]', '', text)
    
    def _truncate_if_too_long(self, text: str, max_length: int) -> str:
        """截断过长文本"""
        if len(text) > max_length:
            return text[:max_length] + '...'
        return text
    
    def _sanitize_link(self, match) -> str:
        """清理链接"""
        href = match.group(2)
        
        # 检查危险协议
        if href.lower().startswith(('javascript:', 'vbscript:', 'data:')):
            return '<a href="#">'
        
        # 外部链接添加安全属性
        if href.startswith('http'):
            return f'<a {match.group(1) or ""}href="{href}" rel="noopener noreferrer" target="_blank"'
        
        return match.group(0)

🎯 本节小结

通过本节学习,你已经掌握了:

多层验证架构:构建从客户端到数据库的全方位验证体系
智能攻击检测:识别和防护各种常见的Web攻击模式
高级数据清理:实现上下文感知的智能数据清理系统
自定义规则引擎:构建灵活的验证和清理规则系统
性能优化:缓存和优化验证过程以提高系统性能

🤔 思考题

  1. 验证策略:如何根据数据敏感性动态调整验证严格程度?
  2. 性能平衡:在高并发场景下如何平衡验证完整性和响应速度?
  3. 误报处理:如何减少安全验证中的误报率?

输入验证是安全防线的第一道门槛,数据清理是系统安全的最后保障! 🛡️

👉 下一节:6.5 安全监控与日志