Files
ai-xn-check/main.py
nosqli cdcd69256b feat: AI API 指纹检测对比工具 - 初始版本
- 4维指纹采集: 性能/语言/能力/行为
- models.py 已加入 IdentityFingerprintModel (第5维数据模型)
- comparator.py 已升级为5维评分 (含identity维度比较)
- reporter.py 已加入身份验证报告输出
- main.py 已集成identity采集流程
- identity collector 待下次提交补充完整代码
2026-03-09 00:15:03 +08:00

248 lines
8.9 KiB
Python

"""AI API Fingerprint Detection Tool — CLI Entry Point."""
import sys
import os
import asyncio
import argparse
from datetime import datetime
from pathlib import Path
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn
from core.config import load_config
from core.client import AIClient
from core.models import FullFingerprint, CollectionConfig, IdentityFingerprintModel
from collectors.performance import collect_performance
from collectors.language import collect_language
from collectors.capability import collect_capability
from collectors.behavioral import collect_behavioral
from collectors.identity import collect_identity
from analysis.comparator import compare_fingerprints
from analysis.reporter import print_report, save_json_report
console = Console()
async def collect_fingerprint(channel_name: str, client: AIClient,
                              config: CollectionConfig,
                              progress: Progress, task_id,
                              expected_model: str = "claude") -> FullFingerprint:
    """Run the five collection phases for one channel and bundle the results.

    The phases run strictly in sequence (performance, language, capability,
    behavioral, identity). The progress task is advanced by 20 after each
    phase, so a task created with ``total=100`` ends up complete.

    Args:
        channel_name: Label shown in the progress description and stored on
            the returned fingerprint.
        client: Client handed to every collector.
        config: Collection settings passed unchanged to every collector.
        progress: Rich progress display that owns ``task_id``.
        task_id: Progress task to update for this channel.
        expected_model: Model name forwarded to the identity collector.

    Returns:
        A ``FullFingerprint`` carrying the five phase results, a local
        ISO-format timestamp, and an (empty) ``raw_responses`` dict.
    """
    # Attributes copied one-to-one from the identity collector's result onto
    # the storage model; order matches the keyword order used for the model.
    identity_fields = (
        "claimed_identity",
        "claimed_developer",
        "identity_consistency",
        "detected_model",
        "detection_confidence",
        "model_scores",
        "vocab_markers",
        "marker_details",
        "signature_behaviors",
        "system_prompt_leaked",
        "system_prompt_hints",
        "knowledge_results",
        "identity_responses",
        "is_claimed_model",
        "identity_mismatch_reasons",
    )

    # Nothing in this function fills this dict; it is stored as-is.
    collected_raw = {}

    def show_status(msg):
        # Collectors report sub-step messages through this hook.
        progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] {msg}")

    # Phase 1: Performance
    progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] ⚡ Collecting performance...")
    performance_fp = await collect_performance(client, config, show_status)
    progress.advance(task_id, 20)

    # Phase 2: Language
    progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] 📝 Collecting language patterns...")
    language_fp = await collect_language(client, config, show_status)
    progress.advance(task_id, 20)

    # Phase 3: Capability
    progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] 🧪 Collecting capabilities...")
    capability_fp = await collect_capability(client, config, show_status)
    progress.advance(task_id, 20)

    # Phase 4: Behavioral
    progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] 🔍 Collecting behavioral patterns...")
    behavioral_fp = await collect_behavioral(client, config, show_status)
    progress.advance(task_id, 20)

    # Phase 5: Identity verification
    progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] 🆔 Collecting identity verification...")
    identity_fp = await collect_identity(client, config, expected_model, show_status)
    progress.advance(task_id, 20)

    # Re-pack the collector's result into the storage model.
    identity_kwargs = {field: getattr(identity_fp, field) for field in identity_fields}
    stored_identity = IdentityFingerprintModel(**identity_kwargs)

    collected_at = datetime.now().isoformat()
    return FullFingerprint(
        channel_name=channel_name,
        timestamp=collected_at,
        performance=performance_fp,
        language=language_fp,
        capability=capability_fp,
        behavioral=behavioral_fp,
        identity=stored_identity,
        raw_responses=collected_raw,
    )
async def _collect_channel(label, channel_cfg, collection_cfg, progress, task_id):
    """Open a client for one configured channel and collect its fingerprint.

    The client is closed (via ``async with``) before the fingerprint is
    returned to the caller.
    """
    async with AIClient(
        base_url=channel_cfg.base_url,
        api_key=channel_cfg.api_key,
        model=channel_cfg.model,
        timeout=collection_cfg.timeout,
        anthropic_version=collection_cfg.anthropic_version,
    ) as client:
        return await collect_fingerprint(
            label, client, collection_cfg, progress, task_id,
            expected_model=channel_cfg.model,
        )


async def main_async(args):
    """Main async workflow: load config, collect both channels, compare, report.

    Args:
        args: Parsed CLI arguments; reads ``args.config`` (path to the config
            file) and ``args.skip_genuine`` (reuse the cached genuine
            fingerprint when one can be loaded).

    Behaviour:
        * Exits with status 1 if the configuration cannot be loaded.
        * With ``--skip-genuine`` the genuine fingerprint is read from
          ``<results_dir>/genuine_cache.json``. If that file is missing or
          cannot be parsed, a warning is printed and the genuine channel is
          collected as usual.
        * A freshly collected genuine fingerprint is written back to the cache
          on a best-effort basis; a failure there is reported but never aborts
          the run.
        * The suspect channel is always collected, then both fingerprints are
          compared, printed, and (unless ``save_json`` is disabled in the
          output config) saved as a JSON report.
    """
    # Imported locally: the file's import header does not bring in ``json``.
    import json

    console.print()
    console.print("[bold cyan]🔍 AI API 指纹检测对比工具[/bold cyan]")
    console.print("[dim] AI API Fingerprint Detection & Comparison Tool[/dim]")
    console.print()

    # Load configuration
    try:
        cfg = load_config(args.config)
    except Exception as e:
        console.print(f"[red]❌ Configuration error: {e}[/red]")
        sys.exit(1)

    genuine_cfg = cfg['genuine']
    suspect_cfg = cfg['suspect']
    collection_cfg = cfg['collection']
    output_cfg = cfg.get('output', {})

    console.print(f"[green]✓[/green] Config loaded: {args.config}")
    console.print(f" Genuine: {genuine_cfg.base_url} (model: {genuine_cfg.model})")
    console.print(f" Suspect: {suspect_cfg.base_url} (model: {suspect_cfg.model})")
    console.print(f" Repeat count: {collection_cfg.repeat_count}")
    console.print()

    # Resolve the results directory once; the raw configured value is what
    # gets passed on to the report writer.
    results_dir = output_cfg.get('results_dir', 'results')
    cache_dir = Path(results_dir)
    cache_path = cache_dir / "genuine_cache.json"

    # Try the cached genuine fingerprint when the user asked for it.
    genuine_fp = None
    if args.skip_genuine:
        if cache_path.exists():
            try:
                with open(cache_path, 'r', encoding='utf-8') as f:
                    genuine_fp = FullFingerprint.from_dict(json.load(f))
            except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
                # A truncated or stale cache should not abort the run; fall
                # back to a fresh collection, same as when no cache exists.
                genuine_fp = None
                console.print(
                    f"[yellow]⚠ Could not load genuine cache {cache_path}: {e} "
                    f"— collecting genuine channel again[/yellow]"
                )
            else:
                console.print("[yellow]⏭ Skipping genuine collection (using cache)[/yellow]")
        else:
            console.print(
                f"[yellow]⚠ --skip-genuine given but no cache found at {cache_path} "
                f"— collecting genuine channel[/yellow]"
            )
        console.print()

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
        console=console,
    ) as progress:
        # Collect genuine fingerprint
        if genuine_fp is None:
            task1 = progress.add_task("[green]Genuine channel", total=100)
            genuine_fp = await _collect_channel(
                "Genuine", genuine_cfg, collection_cfg, progress, task1
            )
            progress.update(task1, description="[green]✓ Genuine channel complete[/green]")

            # Cache genuine fingerprint (best effort). Serialize fully before
            # touching the file so a serialization error cannot leave a
            # truncated cache behind.
            try:
                payload = json.dumps(
                    genuine_fp.to_dict(), ensure_ascii=False, indent=2, default=str
                )
                cache_dir.mkdir(parents=True, exist_ok=True)
                cache_path.write_text(payload, encoding='utf-8')
            except Exception as e:
                # Caching is optional: report the problem and carry on.
                console.print(f"[yellow]⚠ Could not write genuine cache {cache_path}: {e}[/yellow]")

        # Collect suspect fingerprint
        task2 = progress.add_task("[yellow]Suspect channel", total=100)
        suspect_fp = await _collect_channel(
            "Suspect", suspect_cfg, collection_cfg, progress, task2
        )
        progress.update(task2, description="[yellow]✓ Suspect channel complete[/yellow]")

    console.print()
    console.print("[bold]🔬 Analyzing fingerprints...[/bold]")
    console.print()

    # Compare fingerprints
    result = compare_fingerprints(genuine_fp, suspect_fp)

    # Print terminal report
    print_report(result, genuine_fp, suspect_fp)

    # Save JSON report
    if output_cfg.get('save_json', True):
        save_json_report(result, genuine_fp, suspect_fp, results_dir)

    console.print()
def _build_arg_parser():
    """Create the command-line parser with the ``--config`` and ``--skip-genuine`` options."""
    cli = argparse.ArgumentParser(
        description="AI API Fingerprint Detection & Comparison Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python main.py --config config.yaml
python main.py --config config.yaml --skip-genuine
""",
    )
    cli.add_argument(
        "--config", "-c",
        default="config.yaml",
        help="Path to configuration YAML file (default: config.yaml)",
    )
    cli.add_argument(
        "--skip-genuine",
        action="store_true",
        help="Skip genuine channel collection and use cached results",
    )
    return cli


def main():
    """CLI entry point: parse arguments and drive the async workflow.

    Exit codes set here: 130 when the user interrupts the run, 1 on any
    other unhandled exception (the message and traceback are printed first).
    """
    cli_args = _build_arg_parser().parse_args()

    try:
        asyncio.run(main_async(cli_args))
    except KeyboardInterrupt:
        console.print("\n[yellow]⚠ Interrupted by user[/yellow]")
        sys.exit(130)
    except Exception as e:
        # Top-level boundary: show the error plus a dimmed traceback, then fail.
        console.print(f"\n[red]❌ Fatal error: {e}[/red]")
        import traceback
        trace_text = traceback.format_exc()
        console.print(f"[dim]{trace_text}[/dim]")
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()