📥 Importing Data into MySQL

Data restore and import - formats and methods explained

Data Import Overview

MySQL offers several ways to load data files of various formats into a database. Data import is a key part of data migration, recovery, batch processing, and system integration.

🔧 mysql command-line client

Executes SQL script files to restore database structure and data

📊 LOAD DATA INFILE

Efficiently imports CSV, TSV, and other delimited data files

🖥️ MySQL Workbench

Graphical tool that supports several import formats

📝 mysqlimport

MySQL's official bulk data import utility

📄 SQL

SQL script files

📊 CSV

Comma-separated values

📋 TSV

Tab-separated values

📑 JSON

JavaScript Object Notation

📈 XML

Extensible Markup Language

📊 Excel

Microsoft Excel format
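
Most of these formats are demonstrated later in this tutorial; XML is the exception, so here is a minimal sketch of MySQL's built-in LOAD XML statement. The file /tmp/users.xml and its <user> row tag are assumptions for illustration; the column list matches the users table created in the next section.

-- Minimal sketch: importing an XML file (assumed file /tmp/users.xml with <user> row elements)
LOAD XML INFILE '/tmp/users.xml'
INTO TABLE users
ROWS IDENTIFIED BY '<user>'
(username, email, age, city, salary);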

Preparing the Test Environment

-- Create the test database
CREATE DATABASE import_demo;
USE import_demo;

-- Create the users table
CREATE TABLE users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(50) NOT NULL UNIQUE,
    email VARCHAR(100) NOT NULL,
    age INT,
    city VARCHAR(50),
    salary DECIMAL(10,2),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create the orders table
CREATE TABLE orders (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_id INT NOT NULL,
    order_number VARCHAR(50) NOT NULL UNIQUE,
    total_amount DECIMAL(10,2) NOT NULL,
    status ENUM('pending', 'paid', 'shipped', 'delivered', 'cancelled') DEFAULT 'pending',
    order_date DATE NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE
);

-- Create the products table
CREATE TABLE products (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    description TEXT,
    price DECIMAL(10,2) NOT NULL,
    category VARCHAR(50),
    stock_quantity INT DEFAULT 0,
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create a staging table (used for data cleansing)
CREATE TABLE temp_import (
    id INT AUTO_INCREMENT PRIMARY KEY,
    raw_data TEXT,
    processed BOOLEAN DEFAULT FALSE,
    import_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

SELECT 'Import demo database created successfully' AS message;

Importing with the mysql Command-Line Client

Importing SQL Script Files

Basic import syntax

# Basic syntax
mysql -u username -p database_name < input_file.sql

# Import into a specific database
mysql -u root -p import_demo < backup.sql

# Import and create the database (if the SQL file contains CREATE DATABASE)
mysql -u root -p < full_backup.sql

# Import into a remote server
mysql -h remote_host -P 3306 -u username -p database_name < backup.sql

# Import a compressed file
gunzip < backup.sql.gz | mysql -u root -p database_name
zcat backup.sql.gz | mysql -u root -p database_name

# Show progress while importing (requires the pv tool)
pv backup.sql | mysql -u root -p database_name

# Keep going after SQL errors
mysql -u root -p --force database_name < backup.sql

# Set the character set for the import
mysql -u root -p --default-character-set=utf8mb4 database_name < backup.sql

Importing from Inside the MySQL Client

-- Connect to MySQL first (from the shell): mysql -u root -p

-- Select the database
USE import_demo;

-- Import with the SOURCE command
SOURCE /path/to/backup.sql;

-- Or use the \. shorthand
\. /path/to/backup.sql

-- Tune session settings before a large import
SET autocommit = 0;
SET unique_checks = 0;
SET foreign_key_checks = 0;

-- Import the file
SOURCE /path/to/large_backup.sql;

-- Restore the settings
SET foreign_key_checks = 1;
SET unique_checks = 1;
SET autocommit = 1;
COMMIT;

-- Verify the import
SHOW TABLES;
SELECT COUNT(*) FROM users;
SELECT COUNT(*) FROM orders;
SELECT COUNT(*) FROM products;

Creating a Sample SQL File

-- Sample data file: sample_data.sql
-- Save this as a file and then import it

-- Insert user data
INSERT INTO users (username, email, age, city, salary) VALUES
('alice', 'alice@example.com', 28, 'Beijing', 8500.00),
('bob', 'bob@example.com', 32, 'Shanghai', 12000.00),
('charlie', 'charlie@example.com', 25, 'Guangzhou', 7500.00),
('diana', 'diana@example.com', 29, 'Shenzhen', 9500.00),
('eve', 'eve@example.com', 35, 'Hangzhou', 11000.00);

-- Insert product data
INSERT INTO products (name, description, price, category, stock_quantity) VALUES
('iPhone 15', 'Latest Apple smartphone', 7999.00, 'Electronics', 50),
('MacBook Pro', 'Professional laptop', 15999.00, 'Electronics', 30),
('Nike Air Max', 'Running shoes', 899.00, 'Sports', 100),
('Coffee Maker', 'Automatic coffee machine', 1299.00, 'Home', 40),
('Yoga Mat', 'Premium yoga mat', 299.00, 'Sports', 80);

-- Insert order data
INSERT INTO orders (user_id, order_number, total_amount, status, order_date) VALUES
(1, 'ORD-2024-001', 8898.00, 'delivered', '2024-01-15'),
(2, 'ORD-2024-002', 15999.00, 'delivered', '2024-01-20'),
(3, 'ORD-2024-003', 1198.00, 'shipped', '2024-02-01'),
(4, 'ORD-2024-004', 4999.00, 'paid', '2024-02-05'),
(5, 'ORD-2024-005', 1898.00, 'delivered', '2024-02-10');

-- Import with:
-- mysql -u root -p import_demo < sample_data.sql

Importing with LOAD DATA INFILE

Basic Syntax and Options

-- Basic syntax
LOAD DATA INFILE 'file_path'
INTO TABLE table_name
FIELDS TERMINATED BY 'delimiter'
LINES TERMINATED BY 'line_ending'
[IGNORE number LINES]
[REPLACE | IGNORE]
(column1, column2, ...);

-- Check the server's file-import settings
SHOW VARIABLES LIKE 'secure_file_priv';
SHOW VARIABLES LIKE 'local_infile';

-- Enable client-side file import if needed
-- SET GLOBAL local_infile = 1;

-- 1. Import a CSV file (basic example)
LOAD DATA INFILE '/tmp/users.csv'
INTO TABLE users
FIELDS TERMINATED BY ',' ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES
(username, email, age, city, salary);

-- 2. Import a TSV file
LOAD DATA INFILE '/tmp/products.tsv'
INTO TABLE products
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
IGNORE 1 LINES
(name, description, price, category, stock_quantity);

-- 3. Use REPLACE to overwrite duplicate rows
LOAD DATA INFILE '/tmp/users_update.csv'
REPLACE INTO TABLE users
FIELDS TERMINATED BY ',' ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES
(id, username, email, age, city, salary);

-- 4. Use IGNORE to skip duplicate rows
LOAD DATA INFILE '/tmp/new_users.csv'
IGNORE INTO TABLE users
FIELDS TERMINATED BY ',' ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES
(username, email, age, city, salary);

-- 5. Transform data during import
LOAD DATA INFILE '/tmp/orders.csv'
INTO TABLE orders
FIELDS TERMINATED BY ',' ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES
(user_id, order_number, total_amount, @status, @order_date)
SET status = CASE
        WHEN @status = '1' THEN 'pending'
        WHEN @status = '2' THEN 'paid'
        WHEN @status = '3' THEN 'shipped'
        WHEN @status = '4' THEN 'delivered'
        ELSE 'pending'
    END,
    order_date = STR_TO_DATE(@order_date, '%Y-%m-%d');

-- 6. Import only some columns
LOAD DATA INFILE '/tmp/partial_users.csv'
INTO TABLE users
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
(username, email, city)
SET age = 25, salary = 5000.00;

-- 7. Import a client-side (local) file
LOAD DATA LOCAL INFILE '/local/path/users.csv'
INTO TABLE users
FIELDS TERMINATED BY ',' ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES;

Creating Sample Data Files (CSV / TSV)

users.csv (comma-separated, quoted fields):

username,email,age,city,salary
"john","john@example.com",30,"Beijing",9000.00
"jane","jane@example.com",28,"Shanghai",8500.00
"mike","mike@example.com",35,"Guangzhou",11000.00
"sarah","sarah@example.com",27,"Shenzhen",7800.00
"tom","tom@example.com",32,"Hangzhou",9500.00

products.tsv (fields separated by tab characters in the actual file; aligned here for readability):

name               description           price     category     stock_quantity
iPad Air           Tablet computer       4999.00   Electronics  40
Sony Headphones    Wireless headphones   899.00    Electronics  60
Running Shoes      Sports shoes          599.00    Sports       120
Office Desk        Wooden desk           1599.00   Furniture    25
Water Bottle       Stainless steel       89.00     Sports       200

LOAD DATA Options in Detail

Option | Description | Example
FIELDS TERMINATED BY | Field delimiter | FIELDS TERMINATED BY ','
ENCLOSED BY | Field quote character | ENCLOSED BY '"'
ESCAPED BY | Escape character | ESCAPED BY '\\'
LINES TERMINATED BY | Line terminator | LINES TERMINATED BY '\n'
IGNORE n LINES | Skip the first n lines | IGNORE 1 LINES
REPLACE | Replace duplicate rows | REPLACE INTO TABLE
IGNORE | Skip duplicate rows | IGNORE INTO TABLE
LOCAL | Read the file from the client | LOAD DATA LOCAL INFILE

⚠️ LOAD DATA INFILE caveats:
  • File permissions: the MySQL server must have read access to the file
  • secure_file_priv: check the import path restriction
  • Character encoding: make sure the file encoding matches the database character set (see the sketch after this list)
  • Data format: make sure the data layout matches the table structure
  • Foreign key constraints: you may need to disable foreign key checks temporarily during import
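
As a minimal sketch of the encoding and foreign-key caveats above, the statements below declare the file's encoding explicitly with the CHARACTER SET clause and wrap the load in a temporary foreign-key-check toggle. The file name /tmp/users_utf8.csv is assumed for illustration.

-- Assumed example file: /tmp/users_utf8.csv, saved as UTF-8
SET foreign_key_checks = 0;          -- temporarily disable FK validation for the bulk load

LOAD DATA INFILE '/tmp/users_utf8.csv'
INTO TABLE users
CHARACTER SET utf8mb4                -- declare the file encoding so it matches the column charset
FIELDS TERMINATED BY ',' ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES
(username, email, age, city, salary);

SET foreign_key_checks = 1;          -- re-enable FK validation afterwards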

Using the mysqlimport Utility

Basic Usage

mysqlimport command syntax

# Basic syntax
mysqlimport [options] database_name file_name

# Import a single file (the file name must match the table name)
mysqlimport -u root -p import_demo users.txt

# Import multiple files
mysqlimport -u root -p import_demo users.txt products.txt orders.txt

# Common options
mysqlimport -u root -p \
  --fields-terminated-by=',' \
  --fields-enclosed-by='"' \
  --lines-terminated-by='\n' \
  --ignore-lines=1 \
  --replace \
  import_demo users.csv

# Import a client-side (local) file
mysqlimport -u root -p \
  --local \
  --fields-terminated-by=',' \
  --ignore-lines=1 \
  import_demo users.txt

# Use protocol compression between client and server during the import
mysqlimport -u root -p \
  --compress \
  --fields-terminated-by=',' \
  import_demo users.txt

# Import multiple files in parallel
mysqlimport -u root -p \
  --use-threads=4 \
  --fields-terminated-by=',' \
  import_demo *.txt

# Show verbose output while importing
mysqlimport -u root -p \
  --verbose \
  --fields-terminated-by=',' \
  import_demo users.txt

mysqlimport Option Reference

Option | Description | Equivalent LOAD DATA syntax
--fields-terminated-by=',' | Field delimiter | FIELDS TERMINATED BY ','
--fields-enclosed-by='"' | Field quote character | ENCLOSED BY '"'
--lines-terminated-by='\n' | Line terminator | LINES TERMINATED BY '\n'
--ignore-lines=1 | Skip the header line | IGNORE 1 LINES
--replace | Replace duplicate rows | REPLACE INTO TABLE
--ignore | Skip duplicate rows | IGNORE INTO TABLE
--local | Read the file from the client | LOAD DATA LOCAL INFILE
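
To make the mapping concrete, the sketch below pairs a hypothetical mysqlimport invocation (shown as comments) with the LOAD DATA statement it effectively asks the server to run; mysqlimport derives the table name users from the file name users.csv.

-- Shell command (illustrative):
--   mysqlimport -u root -p --local --fields-terminated-by=',' \
--     --fields-enclosed-by='"' --ignore-lines=1 import_demo users.csv
-- ...is roughly equivalent to running this statement against import_demo:
LOAD DATA LOCAL INFILE 'users.csv'
INTO TABLE users
FIELDS TERMINATED BY ',' ENCLOSED BY '"'
IGNORE 1 LINES;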

Import Examples in Programming Languages

Python import example

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import mysql.connector
import csv
import json
import pandas as pd
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Database connection settings
config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'your_password',
    'database': 'import_demo',
    'charset': 'utf8mb4',
    'autocommit': False
}


def import_from_csv(csv_file, table_name, column_mapping=None):
    """Import data from a CSV file."""
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**config)
        cursor = conn.cursor()

        with open(csv_file, 'r', encoding='utf-8') as file:
            csv_reader = csv.DictReader(file)

            # Column names (optionally remapped to table column names)
            columns = list(csv_reader.fieldnames)
            if column_mapping:
                columns = [column_mapping.get(col, col) for col in columns]

            # Build the INSERT statement
            placeholders = ', '.join(['%s'] * len(columns))
            insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"

            # Insert rows in batches
            batch_data = []
            batch_size = 1000
            row_num = 0
            for row_num, row in enumerate(csv_reader, 1):
                values = [row[col] if col in row else None for col in csv_reader.fieldnames]
                batch_data.append(values)

                if len(batch_data) >= batch_size:
                    cursor.executemany(insert_sql, batch_data)
                    conn.commit()
                    logger.info(f"Imported {row_num} rows")
                    batch_data = []

            # Insert any remaining rows
            if batch_data:
                cursor.executemany(insert_sql, batch_data)
                conn.commit()

            logger.info(f"Successfully imported {row_num} rows from {csv_file}")

    except mysql.connector.Error as err:
        logger.error(f"Database error: {err}")
        if conn:
            conn.rollback()
    except Exception as err:
        logger.error(f"Error: {err}")
        if conn:
            conn.rollback()
    finally:
        if cursor:
            cursor.close()
        if conn and conn.is_connected():
            conn.close()


def import_from_json(json_file, table_name):
    """Import data from a JSON file."""
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**config)
        cursor = conn.cursor()

        with open(json_file, 'r', encoding='utf-8') as file:
            data = json.load(file)

        if not data:
            logger.warning("No data found in JSON file")
            return

        # Column names taken from the first record
        columns = list(data[0].keys())
        placeholders = ', '.join(['%s'] * len(columns))
        insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"

        # Prepare the row tuples
        batch_data = []
        for record in data:
            values = [record.get(col) for col in columns]
            batch_data.append(values)

        # Bulk insert
        cursor.executemany(insert_sql, batch_data)
        conn.commit()

        logger.info(f"Successfully imported {len(data)} records from {json_file}")

    except mysql.connector.Error as err:
        logger.error(f"Database error: {err}")
        if conn:
            conn.rollback()
    except Exception as err:
        logger.error(f"Error: {err}")
        if conn:
            conn.rollback()
    finally:
        if cursor:
            cursor.close()
        if conn and conn.is_connected():
            conn.close()


def import_from_excel(excel_file, table_name, sheet_name=0):
    """Import data from an Excel file."""
    conn = None
    cursor = None
    try:
        conn = mysql.connector.connect(**config)
        cursor = conn.cursor()

        # Read the Excel file
        df = pd.read_excel(excel_file, sheet_name=sheet_name)

        # Replace NaN values
        df = df.fillna('')

        # Build the INSERT statement
        columns = df.columns.tolist()
        placeholders = ', '.join(['%s'] * len(columns))
        insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"

        # Convert rows to plain lists
        data = df.values.tolist()

        # Insert in batches
        batch_size = 1000
        for i in range(0, len(data), batch_size):
            batch = data[i:i + batch_size]
            cursor.executemany(insert_sql, batch)
            conn.commit()
            logger.info(f"Imported {min(i + batch_size, len(data))} / {len(data)} rows")

        logger.info(f"Successfully imported {len(data)} records from {excel_file}")

    except mysql.connector.Error as err:
        logger.error(f"Database error: {err}")
        if conn:
            conn.rollback()
    except Exception as err:
        logger.error(f"Error: {err}")
        if conn:
            conn.rollback()
    finally:
        if cursor:
            cursor.close()
        if conn and conn.is_connected():
            conn.close()


def validate_and_clean_data(data, table_name):
    """Validate and clean records before import."""
    cleaned_data = []
    errors = []

    for i, record in enumerate(data):
        try:
            # Example validation rules
            if table_name == 'users':
                # Check the email format
                if '@' not in record.get('email', ''):
                    errors.append(f"Row {i+1}: Invalid email format")
                    continue
                # Check the age range
                age = record.get('age')
                if age and (int(age) < 0 or int(age) > 120):
                    errors.append(f"Row {i+1}: Invalid age value")
                    continue
            cleaned_data.append(record)
        except Exception as e:
            errors.append(f"Row {i+1}: {str(e)}")

    return cleaned_data, errors


def import_with_error_handling(file_path, table_name, file_type='csv'):
    """Import wrapper with error handling."""
    try:
        logger.info(f"Starting import from {file_path} to {table_name}")

        # Choose the import method based on the file type
        if file_type.lower() == 'csv':
            import_from_csv(file_path, table_name)
        elif file_type.lower() == 'json':
            import_from_json(file_path, table_name)
        elif file_type.lower() in ['xlsx', 'xls']:
            import_from_excel(file_path, table_name)
        else:
            raise ValueError(f"Unsupported file type: {file_type}")

        logger.info("Import completed successfully")
    except Exception as e:
        logger.error(f"Import failed: {str(e)}")
        raise


# Usage examples
if __name__ == "__main__":
    # Import a CSV file
    # import_from_csv('users.csv', 'users')

    # Import a JSON file
    # import_from_json('products.json', 'products')

    # Import an Excel file
    # import_from_excel('orders.xlsx', 'orders')

    # Import with error handling
    # import_with_error_handling('data.csv', 'users', 'csv')

    print("Import examples ready to use")

PHP import example

<?php
// Database connection settings
$dsn = 'mysql:host=localhost;dbname=import_demo;charset=utf8mb4';

try {
    $pdo = new PDO($dsn, 'root', 'your_password');
    $pdo->setAttribute(PDO::ATTR_ERRMODE, PDO::ERRMODE_EXCEPTION);
    $pdo->setAttribute(PDO::ATTR_AUTOCOMMIT, false);
} catch (PDOException $e) {
    die("Connection failed: " . $e->getMessage());
}

/**
 * Import data from a CSV file
 */
function importFromCSV($pdo, $csvFile, $tableName, $hasHeader = true) {
    try {
        $pdo->beginTransaction();

        $file = fopen($csvFile, 'r');
        if (!$file) {
            throw new Exception("Cannot open file: $csvFile");
        }

        $rowCount = 0;
        $headers = null;

        while (($row = fgetcsv($file)) !== false) {
            $rowCount++;

            // Skip the header row
            if ($hasHeader && $rowCount === 1) {
                $headers = $row;
                continue;
            }

            if ($headers) {
                // Build the INSERT statement from the header names
                $columns = implode(', ', $headers);
                $placeholders = ':' . implode(', :', $headers);
                $sql = "INSERT INTO $tableName ($columns) VALUES ($placeholders)";
                $stmt = $pdo->prepare($sql);

                // Bind parameters by name
                foreach ($headers as $index => $header) {
                    $value = isset($row[$index]) ? $row[$index] : null;
                    $stmt->bindValue(":$header", $value);
                }
                $stmt->execute();
            } else {
                // No header row: use positional parameters
                $placeholders = str_repeat('?,', count($row) - 1) . '?';
                $sql = "INSERT INTO $tableName VALUES ($placeholders)";
                $stmt = $pdo->prepare($sql);
                $stmt->execute($row);
            }
        }

        fclose($file);
        $pdo->commit();
        echo "Successfully imported " . ($rowCount - ($hasHeader ? 1 : 0)) . " rows from $csvFile\n";

    } catch (Exception $e) {
        $pdo->rollBack();
        echo "Error importing CSV: " . $e->getMessage() . "\n";
    }
}

/**
 * Import data from a JSON file
 */
function importFromJSON($pdo, $jsonFile, $tableName) {
    try {
        $pdo->beginTransaction();

        $jsonData = file_get_contents($jsonFile);
        $data = json_decode($jsonData, true);

        if (json_last_error() !== JSON_ERROR_NONE) {
            throw new Exception("Invalid JSON format: " . json_last_error_msg());
        }
        if (empty($data)) {
            throw new Exception("No data found in JSON file");
        }

        // Column names taken from the first record
        $columns = array_keys($data[0]);
        $columnList = implode(', ', $columns);
        $placeholders = ':' . implode(', :', $columns);
        $sql = "INSERT INTO $tableName ($columnList) VALUES ($placeholders)";
        $stmt = $pdo->prepare($sql);

        $importCount = 0;
        foreach ($data as $record) {
            foreach ($columns as $column) {
                $value = isset($record[$column]) ? $record[$column] : null;
                $stmt->bindValue(":$column", $value);
            }
            $stmt->execute();
            $importCount++;
        }

        $pdo->commit();
        echo "Successfully imported $importCount records from $jsonFile\n";

    } catch (Exception $e) {
        $pdo->rollBack();
        echo "Error importing JSON: " . $e->getMessage() . "\n";
    }
}

/**
 * Batch import of an in-memory data set
 */
function batchImport($pdo, $data, $tableName, $batchSize = 1000) {
    try {
        $pdo->beginTransaction();

        if (empty($data)) {
            throw new Exception("No data to import");
        }

        $columns = array_keys($data[0]);
        $columnList = implode(', ', $columns);
        $placeholders = ':' . implode(', :', $columns);
        $sql = "INSERT INTO $tableName ($columnList) VALUES ($placeholders)";
        $stmt = $pdo->prepare($sql);

        $totalCount = count($data);
        $importedCount = 0;

        for ($i = 0; $i < $totalCount; $i += $batchSize) {
            $batch = array_slice($data, $i, $batchSize);
            foreach ($batch as $record) {
                foreach ($columns as $column) {
                    $value = isset($record[$column]) ? $record[$column] : null;
                    $stmt->bindValue(":$column", $value);
                }
                $stmt->execute();
                $importedCount++;
            }
            echo "Imported $importedCount / $totalCount records\n";
        }

        $pdo->commit();
        echo "Batch import completed successfully\n";

    } catch (Exception $e) {
        $pdo->rollBack();
        echo "Error in batch import: " . $e->getMessage() . "\n";
    }
}

/**
 * Validate and clean records before import
 */
function validateAndCleanData($data, $tableName) {
    $cleanedData = [];
    $errors = [];

    foreach ($data as $index => $record) {
        $rowNum = $index + 1;
        $isValid = true;

        // Example validation rules
        if ($tableName === 'users') {
            // Validate the email address
            if (empty($record['email']) || !filter_var($record['email'], FILTER_VALIDATE_EMAIL)) {
                $errors[] = "Row $rowNum: Invalid email format";
                $isValid = false;
            }
            // Validate the age range
            if (isset($record['age']) && ($record['age'] < 0 || $record['age'] > 120)) {
                $errors[] = "Row $rowNum: Invalid age value";
                $isValid = false;
            }
            // Clean the data
            if (isset($record['username'])) {
                $record['username'] = trim($record['username']);
            }
        }

        if ($isValid) {
            $cleanedData[] = $record;
        }
    }

    return [$cleanedData, $errors];
}

/**
 * Import with progress reporting
 */
function importWithProgress($pdo, $data, $tableName) {
    $total = count($data);
    $imported = 0;
    $batchSize = 100;

    echo "Starting import of $total records...\n";

    try {
        $pdo->beginTransaction();

        $columns = array_keys($data[0]);
        $columnList = implode(', ', $columns);
        $placeholders = ':' . implode(', :', $columns);
        $sql = "INSERT INTO $tableName ($columnList) VALUES ($placeholders)";
        $stmt = $pdo->prepare($sql);

        foreach ($data as $record) {
            foreach ($columns as $column) {
                $stmt->bindValue(":$column", $record[$column] ?? null);
            }
            $stmt->execute();
            $imported++;

            // Report progress
            if ($imported % $batchSize === 0 || $imported === $total) {
                $percentage = round(($imported / $total) * 100, 2);
                echo "Progress: $imported / $total ($percentage%)\n";
            }
        }

        $pdo->commit();
        echo "Import completed successfully!\n";

    } catch (Exception $e) {
        $pdo->rollBack();
        echo "Import failed: " . $e->getMessage() . "\n";
    }
}

// Usage examples

// Import a CSV file
// importFromCSV($pdo, 'users.csv', 'users', true);

// Import a JSON file
// importFromJSON($pdo, 'products.json', 'products');

// Sample data
$sampleData = [
    ['username' => 'test1', 'email' => 'test1@example.com', 'age' => 25],
    ['username' => 'test2', 'email' => 'test2@example.com', 'age' => 30],
    ['username' => 'test3', 'email' => 'test3@example.com', 'age' => 28]
];

// Batch import
// batchImport($pdo, $sampleData, 'users');

echo "Import functions ready to use\n";
?>

Comparison of Import Methods

Method | Use case | Pros | Cons | Performance
mysql command line | SQL script import | Simple and direct; restores full backups | SQL format only | Medium
LOAD DATA INFILE | Large volumes of structured data | Highest performance; supports several formats | File path restrictions; strict format requirements | Highest
mysqlimport | Bulk file import | Command-line driven; supports parallel import | File name must match the table name | High
Programming languages | Complex data processing | Highly flexible; allows data validation | Higher development effort | Medium
MySQL Workbench | Small data sets, GUI workflows | Friendly interface; easy to operate | Not suitable for large data volumes | Low

Performance Optimization and Best Practices

🚀 Batch imports

Use bulk inserts to reduce network round trips and speed up the import (see the sketch after these cards)

🔧 Disable constraints

Temporarily disable foreign key and uniqueness checks during the import

📊 Transaction control

Use transactions sensibly to balance performance and data safety

🔍 Data validation

Validate data format and integrity before importing

📈 Progress monitoring

Report progress during large imports so they can be monitored

🔄 Error handling

Build solid error handling and rollback mechanisms
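
As a minimal illustration of the batch-import and transaction-control points above, the sketch below inserts several rows with one multi-row INSERT inside a single transaction instead of committing row by row; the example values are made up.

-- One statement, one commit, instead of one round trip and one commit per row
START TRANSACTION;

INSERT INTO users (username, email, age, city, salary) VALUES
('batch_user1', 'batch1@example.com', 26, 'Beijing', 7000.00),
('batch_user2', 'batch2@example.com', 31, 'Shanghai', 8200.00),
('batch_user3', 'batch3@example.com', 24, 'Chengdu', 6400.00);

COMMIT;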

Optimizing Large-Volume Imports

-- Settings to apply before the import
SET autocommit = 0;
SET unique_checks = 0;
SET foreign_key_checks = 0;
SET sql_log_bin = 0;  -- only if the import does not need to replicate to replicas

-- InnoDB parameters worth tuning (require a MySQL restart)
-- innodb_buffer_pool_size = 70% of RAM
-- innodb_log_file_size = 256M or larger
-- innodb_flush_log_at_trx_commit = 2

-- Use LOAD DATA INFILE for the actual bulk load
LOAD DATA INFILE '/path/to/large_file.csv'
INTO TABLE large_table
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\n'
IGNORE 1 LINES;

-- Restore the settings
SET foreign_key_checks = 1;
SET unique_checks = 1;
SET autocommit = 1;
COMMIT;

-- Skeleton procedure for importing a large file in batches
-- (MySQL procedure parameters cannot have DEFAULT values, so batch_size is a plain parameter)
DELIMITER //

CREATE PROCEDURE ImportLargeFile(
    IN file_path VARCHAR(255),
    IN table_name VARCHAR(64),
    IN batch_size INT
)
BEGIN
    DECLARE batch_count INT DEFAULT 0;
    DECLARE total_rows INT DEFAULT 0;

    -- The batched file reading itself has to be implemented here;
    -- in practice this usually requires an external script working with the procedure
    SELECT CONCAT('Starting import to ', table_name) AS status;

    -- Example: assume a staging table tracks the batches
    WHILE batch_count < 100 DO  -- assume at most 100 batches
        -- The real import logic would go here
        SET batch_count = batch_count + 1;
        SET total_rows = total_rows + batch_size;
        SELECT CONCAT('Processed batch ', batch_count, ', total rows: ', total_rows) AS progress;
    END WHILE;

    SELECT CONCAT('Import completed. Total rows: ', total_rows) AS result;
END //

DELIMITER ;

-- Parallel import (split the file, then load the parts concurrently)
-- File 1: records_1_100000.csv
-- File 2: records_100001_200000.csv
-- File 3: records_200001_300000.csv

-- Run in separate connections at the same time:
-- Connection 1:
-- LOAD DATA INFILE '/path/records_1_100000.csv' INTO TABLE target_table;
-- Connection 2:
-- LOAD DATA INFILE '/path/records_100001_200000.csv' INTO TABLE target_table;
-- Connection 3:
-- LOAD DATA INFILE '/path/records_200001_300000.csv' INTO TABLE target_table;

Import Error Handling

-- Error log table
CREATE TABLE import_errors (
    id INT AUTO_INCREMENT PRIMARY KEY,
    import_session VARCHAR(50),
    table_name VARCHAR(64),
    error_type VARCHAR(50),
    error_message TEXT,
    raw_data TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Import status table
CREATE TABLE import_status (
    id INT AUTO_INCREMENT PRIMARY KEY,
    session_id VARCHAR(50) UNIQUE,
    table_name VARCHAR(64),
    file_name VARCHAR(255),
    total_records INT,
    imported_records INT,
    error_records INT,
    status ENUM('running', 'completed', 'failed', 'cancelled'),
    start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    end_time TIMESTAMP NULL,
    notes TEXT
);

-- Import monitoring procedures
-- (parameters are prefixed with p_ so they do not shadow the column names in the WHERE clauses)
DELIMITER //

CREATE PROCEDURE StartImportSession(
    IN p_session_id VARCHAR(50),
    IN p_table_name VARCHAR(64),
    IN p_file_name VARCHAR(255),
    IN p_total_records INT
)
BEGIN
    INSERT INTO import_status (session_id, table_name, file_name, total_records, imported_records, error_records, status)
    VALUES (p_session_id, p_table_name, p_file_name, p_total_records, 0, 0, 'running');
END //

CREATE PROCEDURE UpdateImportProgress(
    IN p_session_id VARCHAR(50),
    IN p_imported_count INT,
    IN p_error_count INT
)
BEGIN
    UPDATE import_status
    SET imported_records = p_imported_count,
        error_records = p_error_count
    WHERE session_id = p_session_id;
END //

CREATE PROCEDURE CompleteImportSession(
    IN p_session_id VARCHAR(50),
    IN p_final_status VARCHAR(20),
    IN p_notes TEXT
)
BEGIN
    UPDATE import_status
    SET status = p_final_status,
        end_time = CURRENT_TIMESTAMP,
        notes = p_notes
    WHERE session_id = p_session_id;
END //

DELIMITER ;

-- Usage examples
-- CALL StartImportSession('IMP-2024-001', 'users', 'users.csv', 10000);
-- CALL UpdateImportProgress('IMP-2024-001', 5000, 10);
-- CALL CompleteImportSession('IMP-2024-001', 'completed', 'Import successful');

-- Query the import status
SELECT
    session_id,
    table_name,
    file_name,
    CONCAT(imported_records, '/', total_records) AS progress,
    ROUND((imported_records / total_records) * 100, 2) AS percentage,
    error_records,
    status,
    TIMESTAMPDIFF(SECOND, start_time, COALESCE(end_time, NOW())) AS duration_seconds
FROM import_status
ORDER BY start_time DESC;

Data Validation and Cleansing

-- Validation helper functions
DELIMITER //

CREATE FUNCTION ValidateEmail(email VARCHAR(255))
RETURNS BOOLEAN
READS SQL DATA
DETERMINISTIC
BEGIN
    RETURN email REGEXP '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Za-z]{2,}$';
END //

CREATE FUNCTION ValidatePhone(phone VARCHAR(20))
RETURNS BOOLEAN
READS SQL DATA
DETERMINISTIC
BEGIN
    RETURN phone REGEXP '^[0-9+\\-\\s\\(\\)]{10,20}$';
END //

CREATE FUNCTION CleanString(input_str TEXT)
RETURNS TEXT
READS SQL DATA
DETERMINISTIC
BEGIN
    DECLARE cleaned TEXT;
    SET cleaned = TRIM(input_str);
    SET cleaned = REPLACE(cleaned, '\t', ' ');
    SET cleaned = REPLACE(cleaned, '\n', ' ');
    SET cleaned = REPLACE(cleaned, '\r', ' ');
    -- Collapse runs of spaces down to a single space
    WHILE LOCATE('  ', cleaned) > 0 DO
        SET cleaned = REPLACE(cleaned, '  ', ' ');
    END WHILE;
    RETURN cleaned;
END //

DELIMITER ;

-- Data cleansing example (assumes a raw staging table temp_import_users)
UPDATE temp_import_users
SET username = CleanString(username),
    email = LOWER(TRIM(email)),
    phone = REGEXP_REPLACE(phone, '[^0-9+\\-]', ''),
    age = CASE WHEN age < 0 OR age > 120 THEN NULL ELSE age END
WHERE processed = FALSE;

-- Check data quality
SELECT 'Total Records' AS metric, COUNT(*) AS count FROM temp_import_users
UNION ALL
SELECT 'Valid Emails' AS metric, COUNT(*) AS count FROM temp_import_users WHERE ValidateEmail(email) = TRUE
UNION ALL
SELECT 'Invalid Emails' AS metric, COUNT(*) AS count FROM temp_import_users WHERE ValidateEmail(email) = FALSE
UNION ALL
SELECT 'Duplicate Usernames' AS metric, COUNT(*) - COUNT(DISTINCT username) AS count FROM temp_import_users;

Best Practices and Caveats

📋 Pre-import preparation

  • Back up data: always back up the target database before importing
  • Test environment: validate the import procedure in a test environment first
  • Data validation: check data format, encoding, and integrity
  • Permission check: make sure you have sufficient database privileges (see the pre-flight sketch after this list)
  • Space assessment: make sure there is enough disk space
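
As a small pre-flight sketch of the permission and space checks, the queries below look at the importing account's privileges, the server's file-import settings, and the current size of the import_demo schema; free disk space itself still has to be checked at the operating-system level.

-- Check the privileges of the account that will run the import
SHOW GRANTS FOR CURRENT_USER();

-- Check the server settings that constrain file imports
SHOW VARIABLES LIKE 'secure_file_priv';
SHOW VARIABLES LIKE 'local_infile';

-- Estimate how much space the target schema already uses (compare with free disk space from the OS)
SELECT table_schema,
       ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS size_mb
FROM information_schema.tables
WHERE table_schema = 'import_demo'
GROUP BY table_schema;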

⚡ Performance recommendations

  • Batch operations: use bulk inserts instead of row-by-row inserts
  • Disable constraints: temporarily disable foreign keys and indexes during the import
  • Tune parameters: optimize the MySQL configuration parameters
  • Parallel processing: split large files and import the parts in parallel
  • Transaction control: choose a sensible transaction size

⚠️ Common Problems and Solutions

  • Character encoding problems: make sure the file encoding matches the database character set
  • File path restrictions: check the secure_file_priv setting
  • Insufficient privileges: make sure the MySQL user has the FILE privilege
  • Data type mismatches: check that the data format is compatible with the table structure
  • Duplicate data: handle duplicate rows with REPLACE or IGNORE
  • Large file timeouts: adjust parameters such as max_execution_time (see the diagnostic sketch after this list)
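
When an import misbehaves, a few diagnostic statements usually narrow the cause down quickly. This is a minimal checklist assuming the import runs in the current session; SHOW WARNINGS lists the rows that LOAD DATA truncated, converted, or skipped.

-- Character set and import-path settings
SHOW VARIABLES LIKE 'character_set%';
SHOW VARIABLES LIKE 'secure_file_priv';

-- Does the current account have the FILE privilege (needed for server-side LOAD DATA INFILE)?
SHOW GRANTS FOR CURRENT_USER();

-- Right after a LOAD DATA statement: list conversion, truncation, and duplicate-key warnings
SHOW WARNINGS;

-- Timeouts relevant to long-running imports
SHOW VARIABLES LIKE 'max_execution_time';
SHOW VARIABLES LIKE 'net_read_timeout';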

Import Security Considerations

-- 1. Create a dedicated import user
CREATE USER 'import_user'@'localhost' IDENTIFIED BY 'strong_password';
GRANT SELECT, INSERT, UPDATE, DELETE ON import_demo.* TO 'import_user'@'localhost';
GRANT FILE ON *.* TO 'import_user'@'localhost';  -- grant only when actually needed

-- 2. Restrict the import file path
-- In my.cnf:
-- secure_file_priv = '/var/lib/mysql-files/'

-- 3. Log import operations
CREATE TABLE import_audit (
    id INT AUTO_INCREMENT PRIMARY KEY,
    user_name VARCHAR(50),
    operation VARCHAR(100),
    table_name VARCHAR(64),
    file_name VARCHAR(255),
    records_affected INT,
    execution_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    ip_address VARCHAR(45),
    status VARCHAR(20)
);

-- 4. Pre-import check: current database size
-- (there is no built-in system variable for free disk space; check it at the OS level, e.g. with df -h)
SELECT
    'Database Size' AS check_item,
    ROUND(SUM(data_length + index_length) / 1024 / 1024, 2) AS size_mb
FROM information_schema.tables
WHERE table_schema = 'import_demo';

-- 5. Post-import verification
SELECT
    table_name,
    table_rows,
    ROUND((data_length + index_length) / 1024 / 1024, 2) AS size_mb,
    create_time,
    update_time
FROM information_schema.tables
WHERE table_schema = 'import_demo'
ORDER BY update_time DESC;

Automated Import Script

#!/bin/bash
# Automated MySQL data import script

# Configuration
DB_HOST="localhost"
DB_USER="import_user"
DB_PASS="your_password"
DB_NAME="import_demo"
IMPORT_DIR="/var/lib/mysql-files"
LOG_FILE="/var/log/mysql_import.log"
BACKUP_DIR="/backup/mysql"

# Logging helper
log_message() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}

# Back up the database before importing
backup_database() {
    log_message "Starting database backup..."
    mysqldump -h "$DB_HOST" -u "$DB_USER" -p"$DB_PASS" "$DB_NAME" > "$BACKUP_DIR/backup_$(date +%Y%m%d_%H%M%S).sql"

    if [ $? -eq 0 ]; then
        log_message "Database backup completed successfully"
        return 0
    else
        log_message "Database backup failed"
        return 1
    fi
}

# Import a single CSV file into a table
import_csv_file() {
    local file_path="$1"
    local table_name="$2"

    log_message "Starting import of $file_path to $table_name"

    # Make sure the file exists
    if [ ! -f "$file_path" ]; then
        log_message "Error: File $file_path not found"
        return 1
    fi

    # Run the import
    mysql -h "$DB_HOST" -u "$DB_USER" -p"$DB_PASS" "$DB_NAME" << EOF
LOAD DATA INFILE '$file_path'
INTO TABLE $table_name
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 LINES;
EOF

    if [ $? -eq 0 ]; then
        log_message "Import of $file_path completed successfully"
        return 0
    else
        log_message "Import of $file_path failed"
        return 1
    fi
}

# Main entry point
main() {
    log_message "Starting automated import process"

    # Back up the database first
    if ! backup_database; then
        log_message "Backup failed, aborting import"
        exit 1
    fi

    # Files to import, mapped to their target tables
    declare -A import_files=(
        ["$IMPORT_DIR/users.csv"]="users"
        ["$IMPORT_DIR/products.csv"]="products"
        ["$IMPORT_DIR/orders.csv"]="orders"
    )

    # Run each import
    for file_path in "${!import_files[@]}"; do
        table_name="${import_files[$file_path]}"
        import_csv_file "$file_path" "$table_name"
    done

    log_message "Automated import process completed"
}

# Run the script
main "$@"

Summary

Importing data is an essential MySQL administration skill, and choosing the right import method can greatly improve productivity. This tutorial covered multiple methods, from simple command-line operations to full programmatic implementations, each with its own appropriate use cases.

🎯 Key Takeaways

  • Method selection: choose the import method that fits the data volume, format, and requirements
  • Performance optimization: apply the optimization techniques when importing large data sets
  • Error handling: build solid error handling and monitoring mechanisms
  • Data validation: validate the data both before and after the import
  • Security: pay attention to permissions and the security of import operations

Mastering these import techniques will let you handle data import tasks efficiently, from day-to-day data maintenance to large-scale migration projects.