Files
tingquanzhushou/db_interactive.py
huangzhenpc 2d603c33aa xxx
2025-05-17 18:16:24 +08:00

171 lines
6.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sqlite3
import os
def main():
print("SQLite3 数据库查看器")
print("-" * 40)
# 获取数据库路径
db_path = input("请输入数据库文件的完整路径: ").strip()
if not os.path.exists(db_path):
print(f"错误: 文件 '{db_path}' 不存在!")
return
try:
# 连接到数据库
print(f"连接到数据库 '{db_path}'...")
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
while True:
print("\n" + "-" * 40)
print("可用操作:")
print("1. 显示所有表")
print("2. 查看表结构")
print("3. 查看表数据")
print("4. 执行自定义SQL查询")
print("5. 退出")
choice = input("\n请选择操作 (1-5): ").strip()
if choice == "1":
# 显示所有表
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = cursor.fetchall()
if not tables:
print("数据库中没有表")
else:
print(f"\n数据库中的表 ({len(tables)}):")
for i, table in enumerate(tables):
print(f"{i+1}. {table[0]}")
elif choice == "2":
# 查看表结构
table_name = input("请输入表名: ").strip()
cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
if not cursor.fetchone():
print(f"'{table_name}' 不存在!")
continue
cursor.execute(f"PRAGMA table_info({table_name});")
columns = cursor.fetchall()
print(f"\n'{table_name}' 的结构:")
print("-" * 40)
print(f"{'序号':5} {'列名':20} {'类型':15} {'是否可空':10} {'默认值':15}")
print("-" * 70)
for col in columns:
col_id, name, type_, notnull, default_val, pk = col
print(f"{col_id:5} {name:20} {type_:15} {'' if notnull else '':10} {str(default_val)[:15]:15}")
elif choice == "3":
# 查看表数据
table_name = input("请输入表名: ").strip()
cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name='{table_name}';")
if not cursor.fetchone():
print(f"'{table_name}' 不存在!")
continue
limit = input("显示多少条记录 (默认10): ").strip() or "10"
try:
limit = int(limit)
except ValueError:
print("请输入有效数字!")
continue
cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
total = cursor.fetchone()[0]
print(f"\n'{table_name}' 共有 {total} 条记录,显示前 {limit} 条:")
if total == 0:
print("表中没有数据")
continue
# 获取列名
cursor.execute(f"PRAGMA table_info({table_name});")
columns = cursor.fetchall()
col_names = [col[1] for col in columns]
# 显示数据
cursor.execute(f"SELECT * FROM {table_name} LIMIT {limit};")
rows = cursor.fetchall()
# 打印列名
header = " | ".join(col_names)
print("\n" + header)
print("-" * len(header))
# 打印数据
for row in rows:
row_str = []
for item in row:
item_str = str(item)
if len(item_str) > 20:
item_str = item_str[:17] + "..."
row_str.append(item_str)
print(" | ".join(row_str))
elif choice == "4":
# 执行自定义SQL
sql = input("请输入SQL查询语句: ").strip()
if not sql:
continue
try:
cursor.execute(sql)
if sql.lower().startswith(("select", "pragma")):
rows = cursor.fetchall()
if not rows:
print("查询返回0条结果")
continue
# 获取列名
col_names = [description[0] for description in cursor.description]
# 打印列名
header = " | ".join(col_names)
print("\n" + header)
print("-" * len(header))
# 最多显示50条
display_rows = rows[:50]
for row in display_rows:
row_str = []
for item in row:
item_str = str(item)
if len(item_str) > 20:
item_str = item_str[:17] + "..."
row_str.append(item_str)
print(" | ".join(row_str))
if len(rows) > 50:
print(f"...(还有 {len(rows)-50} 条记录)")
print(f"\n总共 {len(rows)} 条记录")
else:
conn.commit()
print("查询执行成功")
except sqlite3.Error as e:
print(f"SQL错误: {e}")
elif choice == "5":
# 退出
break
else:
print("无效选择请输入1-5之间的数字")
conn.close()
print("\n已关闭数据库连接。再见!")
except sqlite3.Error as e:
print(f"SQLite 错误: {e}")
except Exception as e:
print(f"发生错误: {e}")
if __name__ == "__main__":
main()