#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Export the ItemTable and cursorDiskKV tables of Cursor's state database.

Each table is written as JSON, CSV and a SQL script; cursorDiskKV additionally
gets a ``*_parsed.json`` file mapping every key to its JSON-decoded value.
"""

import sqlite3
import os
import sys
import json
import csv
import datetime
import traceback
from contextlib import closing

# Tables that the exporter looks for in the state database.
TABLES_TO_EXPORT = ("ItemTable", "cursorDiskKV")

# Errors json.loads can raise on malformed, undecodable or pathologically
# nested input. Anything else (e.g. KeyboardInterrupt) must propagate.
_JSON_ERRORS = (ValueError, RecursionError)


def _default_db_path():
    """Return the default state.vscdb path, or None after printing why not.

    Only Windows is supported: the path is built from the APPDATA
    environment variable.
    """
    if sys.platform != "win32":
        print("目前只支持Windows系统")
        return None

    appdata = os.getenv("APPDATA")
    if appdata is None:
        print("错误: APPDATA 环境变量未设置!")
        return None

    return os.path.join(appdata, "Cursor", "User", "globalStorage", "state.vscdb")


def _quote_identifier(name):
    """Quote *name* as a SQLite identifier (embedded double quotes doubled)."""
    return '"' + name.replace('"', '""') + '"'


def _json_safe(value):
    """Convert a cell for the JSON export: BLOBs become parsed JSON or hex."""
    if not isinstance(value, bytes):
        return value
    try:
        return json.loads(value)
    except _JSON_ERRORS:
        return value.hex()


def _csv_safe(value):
    """Convert a cell for the CSV export: BLOBs become UTF-8 text or hex."""
    if not isinstance(value, bytes):
        return value
    try:
        # str(value) would produce the "b'...'" repr and can never fail,
        # so decode explicitly to get the actual text.
        return value.decode("utf-8")
    except UnicodeDecodeError:
        return value.hex()


def _sql_literal(value):
    """Render a cell as a SQL literal for an INSERT statement."""
    if value is None:
        return "NULL"
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, bytes):
        return f"X'{value.hex()}'"
    # Escape single quotes by doubling them.
    escaped = str(value).replace("'", "''")
    return f"'{escaped}'"


def _parse_kv_value(value):
    """Decode a cursorDiskKV value as JSON when possible.

    Undecodable BLOBs fall back to hex, undecodable strings are kept as-is,
    and every other type is returned unchanged.
    """
    if isinstance(value, bytes):
        try:
            return json.loads(value)
        except _JSON_ERRORS:
            return value.hex()
    if isinstance(value, str):
        try:
            return json.loads(value)
        except _JSON_ERRORS:
            return value
    return value


def _write_json(path, col_names, rows):
    """Write *rows* to *path* as a JSON list of column-name -> value dicts."""
    records = [
        {name: _json_safe(cell) for name, cell in zip(col_names, row)}
        for row in rows
    ]
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(records, f, ensure_ascii=False, indent=2)


def _write_csv(path, col_names, rows):
    """Write *rows* to *path* as CSV with a header line."""
    with open(path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(col_names)
        writer.writerows([_csv_safe(cell) for cell in row] for row in rows)


def _write_sql(path, db_path, table_name, table_sql, col_names, rows):
    """Write a SQL script: comment header, CREATE statement, then INSERTs."""
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    column_list = ', '.join(col_names)
    with open(path, 'w', encoding='utf-8') as f:
        f.write(f"-- 导出自 {db_path}\n")
        f.write(f"-- 表: {table_name}\n")
        f.write(f"-- 时间: {now}\n")
        f.write(f"-- 记录数: {len(rows)}\n\n")
        f.write(f"{table_sql};\n\n")
        f.writelines(
            f"INSERT INTO {table_name} ({column_list}) "
            f"VALUES ({', '.join(_sql_literal(cell) for cell in row)});\n"
            for row in rows
        )


def _write_parsed_kv(path, rows):
    """Write a key -> decoded value mapping built from two-column rows."""
    # The first column is taken as the key and the second as the value.
    parsed = {
        (row[0] if isinstance(row[0], str) else str(row[0])): _parse_kv_value(row[1])
        for row in rows
    }
    with open(path, 'w', encoding='utf-8') as f:
        json.dump(parsed, f, ensure_ascii=False, indent=2)


def _export_table(cursor, table_name, export_dir, db_path):
    """Export one table to JSON, CSV and SQL files inside *export_dir*.

    Prints a message and returns without writing anything when the table
    does not exist or contains no rows.
    """
    print(f"\n正在导出表 '{table_name}'...")

    # One parameterized query serves as both the existence check and the
    # source of the CREATE statement used by the SQL export.
    cursor.execute(
        "SELECT sql FROM sqlite_master WHERE type='table' AND name=?;",
        (table_name,),
    )
    schema_row = cursor.fetchone()
    if not schema_row:
        print(f"表 '{table_name}' 不存在!")
        return
    table_sql = schema_row[0]

    cursor.execute(f"SELECT * FROM {_quote_identifier(table_name)};")
    # For "SELECT *" the cursor description carries the table's column names.
    col_names = [column[0] for column in cursor.description]
    rows = cursor.fetchall()
    if not rows:
        print(f"表 '{table_name}' 中没有数据")
        return

    print(f"找到 {len(rows)} 条记录")

    json_file = os.path.join(export_dir, f"{table_name}.json")
    _write_json(json_file, col_names, rows)
    print(f"已导出JSON: {json_file}")

    csv_file = os.path.join(export_dir, f"{table_name}.csv")
    _write_csv(csv_file, col_names, rows)
    print(f"已导出CSV: {csv_file}")

    sql_file = os.path.join(export_dir, f"{table_name}.sql")
    _write_sql(sql_file, db_path, table_name, table_sql, col_names, rows)
    print(f"已导出SQL: {sql_file}")

    # cursorDiskKV additionally gets a key -> decoded-JSON mapping.
    if table_name == "cursorDiskKV":
        json_parsed_file = os.path.join(export_dir, f"{table_name}_parsed.json")
        _write_parsed_kv(json_parsed_file, rows)
        print(f"已导出解析后的JSON: {json_parsed_file}")


def export_cursor_tables(db_path=None, export_dir=None):
    """Export the ItemTable and cursorDiskKV tables of a Cursor database.

    Args:
        db_path: Path of the SQLite database to read. When None, the default
            location under %APPDATA% is used (Windows only).
        export_dir: Directory that receives the exported files; created if
            missing. When None, a timestamped ``cursor_db_export_*``
            directory in the current working directory is used.

    Returns:
        None. Progress and errors are reported on stdout; SQLite and other
        errors raised during the export are caught and printed rather than
        propagated.
    """
    print("Cursor 数据库表导出工具")
    print("-" * 50)

    if db_path is None:
        db_path = _default_db_path()
        if db_path is None:
            return

    if not os.path.exists(db_path):
        print(f"错误: Cursor数据库文件不存在: {db_path}")
        return

    print(f"使用Cursor数据库: {db_path}")

    if export_dir is None:
        timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        export_dir = f"cursor_db_export_{timestamp}"
    os.makedirs(export_dir, exist_ok=True)
    print(f"将导出数据到目录: {export_dir}")

    try:
        # closing() guarantees the connection is released even when an
        # export step raises.
        with closing(sqlite3.connect(db_path)) as conn:
            cursor = conn.cursor()
            for table_name in TABLES_TO_EXPORT:
                _export_table(cursor, table_name, export_dir, db_path)

        print(f"\n导出完成!所有数据已保存到目录: {os.path.abspath(export_dir)}")

    except sqlite3.Error as e:
        print(f"SQLite 错误: {e}")
    except Exception as e:
        # Top-level boundary of the script: report the failure with a
        # traceback instead of crashing.
        print(f"发生错误: {e}")
        traceback.print_exc()


if __name__ == "__main__":
    export_cursor_tables()