- 4维指纹采集: 性能/语言/能力/行为 - models.py 已加入 IdentityFingerprintModel (第5维数据模型) - comparator.py 已升级为5维评分 (含identity维度比较) - reporter.py 已加入身份验证报告输出 - main.py 已集成identity采集流程 - identity collector 待下次提交补充完整代码
248 lines
8.9 KiB
Python
248 lines
8.9 KiB
Python
"""AI API Fingerprint Detection Tool — CLI Entry Point."""
|
|
|
|
import sys
|
|
import os
|
|
import asyncio
|
|
import argparse
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Add project root to path
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
from rich.console import Console
|
|
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
|
|
|
|
from core.config import load_config
|
|
from core.client import AIClient
|
|
from core.models import FullFingerprint, CollectionConfig, IdentityFingerprintModel
|
|
from collectors.performance import collect_performance
|
|
from collectors.language import collect_language
|
|
from collectors.capability import collect_capability
|
|
from collectors.behavioral import collect_behavioral
|
|
from collectors.identity import collect_identity
|
|
from analysis.comparator import compare_fingerprints
|
|
from analysis.reporter import print_report, save_json_report
|
|
|
|
console = Console()
|
|
|
|
|
|
async def collect_fingerprint(channel_name: str, client: AIClient,
|
|
config: CollectionConfig,
|
|
progress: Progress, task_id,
|
|
expected_model: str = "claude") -> FullFingerprint:
|
|
"""Collect full fingerprint from a single channel."""
|
|
|
|
raw_responses = {}
|
|
|
|
def make_callback(phase_name):
|
|
def callback(msg):
|
|
progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] {msg}")
|
|
return callback
|
|
|
|
# Phase 1: Performance
|
|
progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] ⚡ Collecting performance...")
|
|
perf = await collect_performance(client, config, make_callback("performance"))
|
|
progress.advance(task_id, 20)
|
|
|
|
# Phase 2: Language
|
|
progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] 📝 Collecting language patterns...")
|
|
lang = await collect_language(client, config, make_callback("language"))
|
|
progress.advance(task_id, 20)
|
|
|
|
# Phase 3: Capability
|
|
progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] 🧪 Collecting capabilities...")
|
|
cap = await collect_capability(client, config, make_callback("capability"))
|
|
progress.advance(task_id, 20)
|
|
|
|
# Phase 4: Behavioral
|
|
progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] 🔍 Collecting behavioral patterns...")
|
|
beh = await collect_behavioral(client, config, make_callback("behavioral"))
|
|
progress.advance(task_id, 20)
|
|
|
|
# Phase 5: Identity Verification
|
|
progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] 🆔 Collecting identity verification...")
|
|
identity_fp = await collect_identity(client, config, expected_model, make_callback("identity"))
|
|
progress.advance(task_id, 20)
|
|
|
|
# Convert identity fingerprint to model for storage
|
|
identity_model = IdentityFingerprintModel(
|
|
claimed_identity=identity_fp.claimed_identity,
|
|
claimed_developer=identity_fp.claimed_developer,
|
|
identity_consistency=identity_fp.identity_consistency,
|
|
detected_model=identity_fp.detected_model,
|
|
detection_confidence=identity_fp.detection_confidence,
|
|
model_scores=identity_fp.model_scores,
|
|
vocab_markers=identity_fp.vocab_markers,
|
|
marker_details=identity_fp.marker_details,
|
|
signature_behaviors=identity_fp.signature_behaviors,
|
|
system_prompt_leaked=identity_fp.system_prompt_leaked,
|
|
system_prompt_hints=identity_fp.system_prompt_hints,
|
|
knowledge_results=identity_fp.knowledge_results,
|
|
identity_responses=identity_fp.identity_responses,
|
|
is_claimed_model=identity_fp.is_claimed_model,
|
|
identity_mismatch_reasons=identity_fp.identity_mismatch_reasons,
|
|
)
|
|
|
|
return FullFingerprint(
|
|
channel_name=channel_name,
|
|
timestamp=datetime.now().isoformat(),
|
|
performance=perf,
|
|
language=lang,
|
|
capability=cap,
|
|
behavioral=beh,
|
|
identity=identity_model,
|
|
raw_responses=raw_responses,
|
|
)
|
|
|
|
|
|
async def main_async(args):
|
|
"""Main async workflow."""
|
|
|
|
console.print()
|
|
console.print("[bold cyan]🔍 AI API 指纹检测对比工具[/bold cyan]")
|
|
console.print("[dim] AI API Fingerprint Detection & Comparison Tool[/dim]")
|
|
console.print()
|
|
|
|
# Load configuration
|
|
try:
|
|
cfg = load_config(args.config)
|
|
except Exception as e:
|
|
console.print(f"[red]❌ Configuration error: {e}[/red]")
|
|
sys.exit(1)
|
|
|
|
genuine_cfg = cfg['genuine']
|
|
suspect_cfg = cfg['suspect']
|
|
collection_cfg = cfg['collection']
|
|
output_cfg = cfg.get('output', {})
|
|
|
|
console.print(f"[green]✓[/green] Config loaded: {args.config}")
|
|
console.print(f" Genuine: {genuine_cfg.base_url} (model: {genuine_cfg.model})")
|
|
console.print(f" Suspect: {suspect_cfg.base_url} (model: {suspect_cfg.model})")
|
|
console.print(f" Repeat count: {collection_cfg.repeat_count}")
|
|
console.print()
|
|
|
|
genuine_fp = None
|
|
suspect_fp = None
|
|
|
|
# Check for cached genuine fingerprint
|
|
cache_path = Path(output_cfg.get('results_dir', 'results')) / "genuine_cache.json"
|
|
|
|
if args.skip_genuine and cache_path.exists():
|
|
console.print("[yellow]⏭ Skipping genuine collection (using cache)[/yellow]")
|
|
import json
|
|
with open(cache_path, 'r', encoding='utf-8') as f:
|
|
cache_data = json.load(f)
|
|
genuine_fp = FullFingerprint.from_dict(cache_data)
|
|
console.print()
|
|
|
|
with Progress(
|
|
SpinnerColumn(),
|
|
TextColumn("[progress.description]{task.description}"),
|
|
BarColumn(),
|
|
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
|
|
TimeElapsedColumn(),
|
|
console=console,
|
|
) as progress:
|
|
|
|
# Collect genuine fingerprint
|
|
if genuine_fp is None:
|
|
task1 = progress.add_task("[green]Genuine channel", total=100)
|
|
|
|
async with AIClient(
|
|
base_url=genuine_cfg.base_url,
|
|
api_key=genuine_cfg.api_key,
|
|
model=genuine_cfg.model,
|
|
timeout=collection_cfg.timeout,
|
|
anthropic_version=collection_cfg.anthropic_version,
|
|
) as client:
|
|
genuine_fp = await collect_fingerprint(
|
|
"Genuine", client, collection_cfg, progress, task1,
|
|
expected_model=genuine_cfg.model
|
|
)
|
|
|
|
progress.update(task1, description="[green]✓ Genuine channel complete[/green]")
|
|
|
|
# Cache genuine fingerprint
|
|
try:
|
|
import json
|
|
cache_dir = Path(output_cfg.get('results_dir', 'results'))
|
|
cache_dir.mkdir(parents=True, exist_ok=True)
|
|
with open(cache_path, 'w', encoding='utf-8') as f:
|
|
json.dump(genuine_fp.to_dict(), f, ensure_ascii=False, indent=2, default=str)
|
|
except Exception:
|
|
pass
|
|
|
|
# Collect suspect fingerprint
|
|
task2 = progress.add_task("[yellow]Suspect channel", total=100)
|
|
|
|
async with AIClient(
|
|
base_url=suspect_cfg.base_url,
|
|
api_key=suspect_cfg.api_key,
|
|
model=suspect_cfg.model,
|
|
timeout=collection_cfg.timeout,
|
|
anthropic_version=collection_cfg.anthropic_version,
|
|
) as client:
|
|
suspect_fp = await collect_fingerprint(
|
|
"Suspect", client, collection_cfg, progress, task2,
|
|
expected_model=suspect_cfg.model
|
|
)
|
|
|
|
progress.update(task2, description="[yellow]✓ Suspect channel complete[/yellow]")
|
|
|
|
console.print()
|
|
console.print("[bold]🔬 Analyzing fingerprints...[/bold]")
|
|
console.print()
|
|
|
|
# Compare fingerprints
|
|
result = compare_fingerprints(genuine_fp, suspect_fp)
|
|
|
|
# Print terminal report
|
|
print_report(result, genuine_fp, suspect_fp)
|
|
|
|
# Save JSON report
|
|
if output_cfg.get('save_json', True):
|
|
results_dir = output_cfg.get('results_dir', 'results')
|
|
save_json_report(result, genuine_fp, suspect_fp, results_dir)
|
|
|
|
console.print()
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="AI API Fingerprint Detection & Comparison Tool",
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
epilog="""
|
|
Examples:
|
|
python main.py --config config.yaml
|
|
python main.py --config config.yaml --skip-genuine
|
|
""",
|
|
)
|
|
parser.add_argument(
|
|
"--config", "-c",
|
|
default="config.yaml",
|
|
help="Path to configuration YAML file (default: config.yaml)",
|
|
)
|
|
parser.add_argument(
|
|
"--skip-genuine",
|
|
action="store_true",
|
|
help="Skip genuine channel collection and use cached results",
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
try:
|
|
asyncio.run(main_async(args))
|
|
except KeyboardInterrupt:
|
|
console.print("\n[yellow]⚠ Interrupted by user[/yellow]")
|
|
sys.exit(130)
|
|
except Exception as e:
|
|
console.print(f"\n[red]❌ Fatal error: {e}[/red]")
|
|
import traceback
|
|
console.print(f"[dim]{traceback.format_exc()}[/dim]")
|
|
sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|