Files
tingquanzhushou/cursor_db_export.py
huangzhenpc 2d603c33aa xxx
2025-05-17 18:16:24 +08:00

185 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sqlite3
import os
import sys
import json
import csv
import datetime
def _blob_to_json_value(blob):
    """Parse a BLOB as JSON; return its hex string if it is not valid JSON."""
    try:
        return json.loads(blob)
    except (ValueError, RecursionError):
        # JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; RecursionError covers pathologically nested input.
        return blob.hex()


def _blob_to_csv_text(blob):
    """Decode a BLOB as UTF-8 text for a CSV cell; fall back to hex."""
    try:
        return blob.decode("utf-8")
    except UnicodeDecodeError:
        return blob.hex()


def _parse_kv_value(value):
    """Best-effort JSON parse of a key/value cell (bytes or str)."""
    if isinstance(value, bytes):
        return _blob_to_json_value(value)
    if isinstance(value, str):
        try:
            return json.loads(value)
        except (ValueError, RecursionError):
            # Not JSON: keep the raw text.
            return value
    return value


def _sql_literal(value):
    """Render one Python value returned by sqlite3 as a SQL literal."""
    if value is None:
        return "NULL"
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, bytes):
        return f"X'{value.hex()}'"
    # Escape embedded single quotes by doubling them.
    escaped = str(value).replace("'", "''")
    return f"'{escaped}'"


def _write_json_export(path, col_names, rows):
    """Write rows as a JSON array of objects keyed by column name."""
    records = [
        {
            name: _blob_to_json_value(cell) if isinstance(cell, bytes) else cell
            for name, cell in zip(col_names, row)
        }
        for row in rows
    ]
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(records, f, ensure_ascii=False, indent=2)


def _write_csv_export(path, col_names, rows):
    """Write rows as CSV with a header line; BLOBs become text or hex."""
    with open(path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(col_names)
        for row in rows:
            writer.writerow([
                _blob_to_csv_text(cell) if isinstance(cell, bytes) else cell
                for cell in row
            ])


def _write_sql_export(path, db_path, table_name, table_sql, col_names, rows):
    """Write a SQL script: header comments, CREATE statement, then INSERTs."""
    column_list = ', '.join(col_names)
    with open(path, 'w', encoding='utf-8') as f:
        f.write(f"-- 导出自 {db_path}\n")
        f.write(f"-- 表: {table_name}\n")
        f.write(f"-- 时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"-- 记录数: {len(rows)}\n\n")
        f.write(f"{table_sql};\n\n")
        for row in rows:
            values = ', '.join(_sql_literal(cell) for cell in row)
            f.write(f"INSERT INTO {table_name} ({column_list}) VALUES ({values});\n")


def _write_parsed_kv_export(path, rows):
    """Write a {key: parsed value} JSON object from (key, value, ...) rows."""
    parsed = {}
    for row in rows:
        # The first two columns are used as key and value.
        key = row[0] if isinstance(row[0], str) else str(row[0])
        parsed[key] = _parse_kv_value(row[1])
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(parsed, f, ensure_ascii=False, indent=2)


def _export_table(cursor, db_path, export_dir, table_name):
    """Export one table to JSON, CSV and SQL files inside export_dir.

    Missing or empty tables are reported and skipped. For the
    ``cursorDiskKV`` table an extra ``*_parsed.json`` file is written.
    """
    print(f"\n正在导出表 '{table_name}'...")

    # One parameterized lookup serves as both the existence check and the
    # source of the CREATE statement used by the SQL export.
    cursor.execute(
        "SELECT sql FROM sqlite_master WHERE type='table' AND name=?;",
        (table_name,),
    )
    schema_row = cursor.fetchone()
    if not schema_row:
        print(f"'{table_name}' 不存在!")
        return
    table_sql = schema_row[0]

    # Identifiers cannot be bound as parameters; table_name only ever comes
    # from the fixed list in export_cursor_tables().
    cursor.execute(f"PRAGMA table_info({table_name});")
    col_names = [col[1] for col in cursor.fetchall()]

    cursor.execute(f"SELECT * FROM {table_name};")
    rows = cursor.fetchall()
    if not rows:
        print(f"'{table_name}' 中没有数据")
        return
    print(f"找到 {len(rows)} 条记录")

    json_file = os.path.join(export_dir, f"{table_name}.json")
    _write_json_export(json_file, col_names, rows)
    print(f"已导出JSON: {json_file}")

    csv_file = os.path.join(export_dir, f"{table_name}.csv")
    _write_csv_export(csv_file, col_names, rows)
    print(f"已导出CSV: {csv_file}")

    sql_file = os.path.join(export_dir, f"{table_name}.sql")
    _write_sql_export(sql_file, db_path, table_name, table_sql, col_names, rows)
    print(f"已导出SQL: {sql_file}")

    if table_name == "cursorDiskKV":
        json_parsed_file = os.path.join(export_dir, f"{table_name}_parsed.json")
        _write_parsed_kv_export(json_parsed_file, rows)
        print(f"已导出解析后的JSON: {json_parsed_file}")


def export_cursor_tables():
    """Export the ItemTable and cursorDiskKV tables of the Cursor database.

    The database is read from ``%APPDATA%/Cursor/User/globalStorage/state.vscdb``
    (only ``sys.platform == "win32"`` is supported). Output goes to a new
    ``cursor_db_export_<timestamp>`` directory under the current working
    directory, with one JSON, CSV and SQL file per non-empty table.

    Progress and errors are reported with ``print``; SQLite and other errors
    raised during the export are caught and printed rather than propagated.

    Returns:
        None.
    """
    print("Cursor 数据库表导出工具")
    print("-" * 50)

    # Locate the database file.
    if sys.platform != "win32":
        print("目前只支持Windows系统")
        return
    appdata = os.getenv("APPDATA")
    if appdata is None:
        print("错误: APPDATA 环境变量未设置!")
        return
    db_path = os.path.join(appdata, "Cursor", "User", "globalStorage", "state.vscdb")
    if not os.path.exists(db_path):
        print(f"错误: Cursor数据库文件不存在: {db_path}")
        return
    print(f"使用Cursor数据库: {db_path}")

    # Create the timestamped export directory.
    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    export_dir = f"cursor_db_export_{timestamp}"
    os.makedirs(export_dir, exist_ok=True)
    print(f"将导出数据到目录: {export_dir}")

    try:
        conn = sqlite3.connect(db_path)
        try:
            cursor = conn.cursor()
            for table_name in ("ItemTable", "cursorDiskKV"):
                _export_table(cursor, db_path, export_dir, table_name)
        finally:
            # Close the connection even when an export step fails.
            conn.close()
        print(f"\n导出完成!所有数据已保存到目录: {os.path.abspath(export_dir)}")
    except sqlite3.Error as e:
        print(f"SQLite 错误: {e}")
    except Exception as e:
        # Top-level boundary of a CLI tool: report the error with a traceback.
        print(f"发生错误: {e}")
        import traceback
        traceback.print_exc()
# Script entry point: run the export only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    export_cursor_tables()