185 lines
7.6 KiB
Python
185 lines
7.6 KiB
Python
#!/usr/bin/env python
|
||
# -*- coding: utf-8 -*-
|
||
|
||
import sqlite3
|
||
import os
|
||
import sys
|
||
import json
|
||
import csv
|
||
import datetime
|
||
|
||
def export_cursor_tables():
|
||
"""导出Cursor数据库中的ItemTable和cursorDiskKV表"""
|
||
print("Cursor 数据库表导出工具")
|
||
print("-" * 50)
|
||
|
||
# 获取数据库路径
|
||
if sys.platform == "win32":
|
||
appdata = os.getenv("APPDATA")
|
||
if appdata is None:
|
||
print("错误: APPDATA 环境变量未设置!")
|
||
return
|
||
db_path = os.path.join(appdata, "Cursor", "User", "globalStorage", "state.vscdb")
|
||
else:
|
||
print("目前只支持Windows系统")
|
||
return
|
||
|
||
if not os.path.exists(db_path):
|
||
print(f"错误: Cursor数据库文件不存在: {db_path}")
|
||
return
|
||
|
||
print(f"使用Cursor数据库: {db_path}")
|
||
|
||
# 创建导出目录
|
||
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||
export_dir = f"cursor_db_export_{timestamp}"
|
||
os.makedirs(export_dir, exist_ok=True)
|
||
print(f"将导出数据到目录: {export_dir}")
|
||
|
||
try:
|
||
# 连接到数据库
|
||
conn = sqlite3.connect(db_path)
|
||
cursor = conn.cursor()
|
||
|
||
tables_to_export = ["ItemTable", "cursorDiskKV"]
|
||
|
||
for table_name in tables_to_export:
|
||
print(f"\n正在导出表 '{table_name}'...")
|
||
|
||
# 检查表是否存在
|
||
cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
|
||
if not cursor.fetchone():
|
||
print(f"表 '{table_name}' 不存在!")
|
||
continue
|
||
|
||
# 获取表的列名
|
||
cursor.execute(f"PRAGMA table_info({table_name});")
|
||
columns = cursor.fetchall()
|
||
col_names = [col[1] for col in columns]
|
||
|
||
# 获取所有数据
|
||
cursor.execute(f"SELECT * FROM {table_name};")
|
||
rows = cursor.fetchall()
|
||
|
||
if not rows:
|
||
print(f"表 '{table_name}' 中没有数据")
|
||
continue
|
||
|
||
record_count = len(rows)
|
||
print(f"找到 {record_count} 条记录")
|
||
|
||
# 导出为JSON
|
||
json_file = os.path.join(export_dir, f"{table_name}.json")
|
||
with open(json_file, 'w', encoding='utf-8') as f:
|
||
json_data = []
|
||
for row in rows:
|
||
row_dict = {}
|
||
for i, col_name in enumerate(col_names):
|
||
# 处理特殊类型
|
||
if isinstance(row[i], bytes):
|
||
try:
|
||
# 尝试解码为JSON
|
||
row_dict[col_name] = json.loads(row[i])
|
||
except:
|
||
# 如果失败则使用hex表示
|
||
row_dict[col_name] = row[i].hex()
|
||
else:
|
||
row_dict[col_name] = row[i]
|
||
json_data.append(row_dict)
|
||
json.dump(json_data, f, ensure_ascii=False, indent=2)
|
||
print(f"已导出JSON: {json_file}")
|
||
|
||
# 导出为CSV
|
||
csv_file = os.path.join(export_dir, f"{table_name}.csv")
|
||
with open(csv_file, 'w', encoding='utf-8', newline='') as f:
|
||
csv_writer = csv.writer(f)
|
||
csv_writer.writerow(col_names) # 写入表头
|
||
for row in rows:
|
||
# 处理每个字段,确保CSV可以正确处理
|
||
processed_row = []
|
||
for item in row:
|
||
if isinstance(item, bytes):
|
||
try:
|
||
# 尝试解码为字符串
|
||
processed_row.append(str(item))
|
||
except:
|
||
# 如果失败则使用hex表示
|
||
processed_row.append(item.hex())
|
||
else:
|
||
processed_row.append(item)
|
||
csv_writer.writerow(processed_row)
|
||
print(f"已导出CSV: {csv_file}")
|
||
|
||
# 导出为SQL插入语句
|
||
sql_file = os.path.join(export_dir, f"{table_name}.sql")
|
||
with open(sql_file, 'w', encoding='utf-8') as f:
|
||
f.write(f"-- 导出自 {db_path}\n")
|
||
f.write(f"-- 表: {table_name}\n")
|
||
f.write(f"-- 时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
|
||
f.write(f"-- 记录数: {record_count}\n\n")
|
||
|
||
# 获取表结构
|
||
cursor.execute(f"SELECT sql FROM sqlite_master WHERE type='table' AND name='{table_name}';")
|
||
table_sql = cursor.fetchone()[0]
|
||
f.write(f"{table_sql};\n\n")
|
||
|
||
# 写入INSERT语句
|
||
for row in rows:
|
||
values = []
|
||
for item in row:
|
||
if item is None:
|
||
values.append("NULL")
|
||
elif isinstance(item, (int, float)):
|
||
values.append(str(item))
|
||
elif isinstance(item, bytes):
|
||
hex_value = ''.join([f'\\x{b:02x}' for b in item])
|
||
values.append(f"X'{item.hex()}'")
|
||
else:
|
||
# 转义字符串中的引号
|
||
item_str = str(item).replace("'", "''")
|
||
values.append(f"'{item_str}'")
|
||
|
||
f.write(f"INSERT INTO {table_name} ({', '.join(col_names)}) VALUES ({', '.join(values)});\n")
|
||
print(f"已导出SQL: {sql_file}")
|
||
|
||
# 如果是cursorDiskKV表,尝试解析其中的JSON数据
|
||
if table_name == "cursorDiskKV":
|
||
json_parsed_file = os.path.join(export_dir, f"{table_name}_parsed.json")
|
||
with open(json_parsed_file, 'w', encoding='utf-8') as f:
|
||
parsed_data = {}
|
||
for row in rows:
|
||
# 通常cursorDiskKV表有key和value两列
|
||
key = row[0] if isinstance(row[0], str) else str(row[0])
|
||
value = row[1]
|
||
|
||
# 尝试解析value为JSON
|
||
if isinstance(value, bytes):
|
||
try:
|
||
parsed_value = json.loads(value)
|
||
parsed_data[key] = parsed_value
|
||
except:
|
||
parsed_data[key] = value.hex()
|
||
elif isinstance(value, str):
|
||
try:
|
||
parsed_value = json.loads(value)
|
||
parsed_data[key] = parsed_value
|
||
except:
|
||
parsed_data[key] = value
|
||
else:
|
||
parsed_data[key] = value
|
||
|
||
json.dump(parsed_data, f, ensure_ascii=False, indent=2)
|
||
print(f"已导出解析后的JSON: {json_parsed_file}")
|
||
|
||
conn.close()
|
||
print(f"\n导出完成!所有数据已保存到目录: {os.path.abspath(export_dir)}")
|
||
|
||
except sqlite3.Error as e:
|
||
print(f"SQLite 错误: {e}")
|
||
except Exception as e:
|
||
print(f"发生错误: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
|
||
if __name__ == "__main__":
|
||
export_cursor_tables() |