179 lines
7.4 KiB
Python
179 lines
7.4 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
SQLite 到 MySQL 数据迁移脚本
|
||
用于将旧的 SQLite 数据库迁移到新的 MySQL 数据库
|
||
"""
|
||
|
||
import asyncio
|
||
import sqlite3
|
||
import sys
|
||
from typing import List, Dict, Any
|
||
|
||
import aiomysql
|
||
import yaml
|
||
from loguru import logger
|
||
|
||
|
||
async def migrate_data():
|
||
"""迁移数据主函数"""
|
||
# 读取配置文件
|
||
logger.info("读取配置文件")
|
||
with open("config.yaml", "r", encoding="utf-8") as f:
|
||
config = yaml.safe_load(f)
|
||
|
||
# 获取数据库配置
|
||
db_config = config.get("database", {})
|
||
sqlite_path = db_config.get("path", "cursor.db")
|
||
mysql_config = {
|
||
"host": db_config.get("host", "localhost"),
|
||
"port": db_config.get("port", 3306),
|
||
"user": db_config.get("username", "auto_cursor_reg"), # 使用正确的默认用户名
|
||
"password": db_config.get("password", "this_password_jiaqiao"), # 使用正确的默认密码
|
||
"db": db_config.get("database", "auto_cursor_reg"), # 使用正确的默认数据库名
|
||
"charset": "utf8mb4"
|
||
}
|
||
|
||
logger.info(f"MySQL配置: 主机={mysql_config['host']}:{mysql_config['port']}, 用户={mysql_config['user']}, 数据库={mysql_config['db']}")
|
||
|
||
# 连接SQLite数据库
|
||
logger.info(f"连接SQLite数据库: {sqlite_path}")
|
||
sqlite_conn = None
|
||
mysql_conn = None
|
||
|
||
try:
|
||
sqlite_conn = sqlite3.connect(sqlite_path)
|
||
sqlite_conn.row_factory = sqlite3.Row # 启用字典行工厂
|
||
except Exception as e:
|
||
logger.error(f"无法连接SQLite数据库: {e}")
|
||
return
|
||
|
||
# 连接MySQL数据库
|
||
logger.info(f"尝试连接MySQL数据库...")
|
||
try:
|
||
mysql_conn = await aiomysql.connect(**mysql_config)
|
||
logger.info("MySQL数据库连接成功")
|
||
except Exception as e:
|
||
logger.error(f"无法连接MySQL数据库: {e}")
|
||
logger.error(f"请确认MySQL服务已启动且用户名密码正确")
|
||
logger.info(f"您可能需要创建MySQL用户和数据库:")
|
||
logger.info(f" CREATE USER '{mysql_config['user']}'@'localhost' IDENTIFIED BY '{mysql_config['password']}';")
|
||
logger.info(f" CREATE DATABASE {mysql_config['db']} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;")
|
||
logger.info(f" GRANT ALL PRIVILEGES ON {mysql_config['db']}.* TO '{mysql_config['user']}'@'localhost';")
|
||
logger.info(f" FLUSH PRIVILEGES;")
|
||
if sqlite_conn:
|
||
sqlite_conn.close()
|
||
return
|
||
|
||
try:
|
||
# 检查email_accounts表是否存在
|
||
logger.info("检查并创建MySQL表结构")
|
||
async with mysql_conn.cursor() as cursor:
|
||
await cursor.execute('''
|
||
CREATE TABLE IF NOT EXISTS email_accounts (
|
||
id INT AUTO_INCREMENT PRIMARY KEY,
|
||
email VARCHAR(255) UNIQUE NOT NULL,
|
||
password VARCHAR(255) NOT NULL,
|
||
client_id VARCHAR(255) NOT NULL,
|
||
refresh_token TEXT NOT NULL,
|
||
in_use BOOLEAN DEFAULT 0,
|
||
cursor_password VARCHAR(255),
|
||
cursor_cookie TEXT,
|
||
cursor_token TEXT,
|
||
sold BOOLEAN DEFAULT 0,
|
||
status VARCHAR(20) DEFAULT 'pending',
|
||
extracted BOOLEAN DEFAULT 0,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
||
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||
INDEX idx_status_inuse_sold (status, in_use, sold),
|
||
INDEX idx_extracted (extracted, status, sold)
|
||
)
|
||
''')
|
||
await mysql_conn.commit()
|
||
|
||
# 从SQLite读取数据
|
||
logger.info("从SQLite读取数据")
|
||
try:
|
||
sqlite_cursor = sqlite_conn.cursor()
|
||
sqlite_cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='email_accounts'")
|
||
if not sqlite_cursor.fetchone():
|
||
logger.warning("SQLite数据库中不存在email_accounts表,无需迁移")
|
||
return
|
||
|
||
# 获取表结构信息
|
||
sqlite_cursor.execute("PRAGMA table_info(email_accounts)")
|
||
columns_info = sqlite_cursor.fetchall()
|
||
column_names = [col["name"] for col in columns_info]
|
||
|
||
# 读取所有数据
|
||
sqlite_cursor.execute("SELECT * FROM email_accounts")
|
||
rows = sqlite_cursor.fetchall()
|
||
logger.info(f"从SQLite读取到 {len(rows)} 条数据")
|
||
except Exception as e:
|
||
logger.error(f"读取SQLite数据失败: {e}")
|
||
return
|
||
|
||
# 迁移数据到MySQL
|
||
if rows:
|
||
logger.info("开始迁移数据到MySQL")
|
||
try:
|
||
async with mysql_conn.cursor() as cursor:
|
||
# 检查MySQL表中是否已有数据
|
||
await cursor.execute("SELECT COUNT(*) FROM email_accounts")
|
||
result = await cursor.fetchone()
|
||
if result and result[0] > 0:
|
||
logger.warning("MySQL表中已存在数据,是否继续?(y/n)")
|
||
response = input().strip().lower()
|
||
if response != 'y':
|
||
logger.info("用户取消迁移")
|
||
return
|
||
|
||
# 构建插入查询
|
||
placeholders = ", ".join(["%s"] * len(column_names))
|
||
columns = ", ".join(column_names)
|
||
query = f"INSERT INTO email_accounts ({columns}) VALUES ({placeholders}) ON DUPLICATE KEY UPDATE id=id"
|
||
|
||
# 批量插入数据
|
||
batch_size = 100
|
||
for i in range(0, len(rows), batch_size):
|
||
batch = rows[i:i+batch_size]
|
||
batch_values = []
|
||
for row in batch:
|
||
# 将行数据转换为列表
|
||
row_values = [row[name] for name in column_names]
|
||
batch_values.append(row_values)
|
||
|
||
await cursor.executemany(query, batch_values)
|
||
await mysql_conn.commit()
|
||
logger.info(f"已迁移 {min(i+batch_size, len(rows))}/{len(rows)} 条数据")
|
||
|
||
logger.success(f"数据迁移完成,共迁移 {len(rows)} 条数据")
|
||
except Exception as e:
|
||
logger.error(f"迁移数据到MySQL失败: {e}")
|
||
else:
|
||
logger.warning("SQLite数据库中没有数据需要迁移")
|
||
|
||
finally:
|
||
# 安全关闭连接
|
||
if sqlite_conn:
|
||
sqlite_conn.close()
|
||
logger.debug("SQLite连接已关闭")
|
||
|
||
if mysql_conn:
|
||
await mysql_conn.close()
|
||
logger.debug("MySQL连接已关闭")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
# Windows平台特殊处理,强制使用SelectorEventLoop
|
||
if sys.platform.startswith('win'):
|
||
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
|
||
|
||
# 设置日志
|
||
logger.remove()
|
||
logger.add(sys.stderr, level="INFO")
|
||
logger.add("migrate.log", rotation="1 MB", level="DEBUG")
|
||
|
||
# 执行迁移
|
||
logger.info("开始执行数据迁移")
|
||
asyncio.run(migrate_data())
|
||
logger.info("数据迁移脚本执行完毕") |