"""AI API Fingerprint Detection Tool โ€” CLI Entry Point.""" import sys import os import asyncio import argparse from datetime import datetime from pathlib import Path # Add project root to path sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from rich.console import Console from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TimeElapsedColumn from core.config import load_config from core.client import AIClient from core.models import FullFingerprint, CollectionConfig, IdentityFingerprintModel from collectors.performance import collect_performance from collectors.language import collect_language from collectors.capability import collect_capability from collectors.behavioral import collect_behavioral from collectors.identity import collect_identity from analysis.comparator import compare_fingerprints from analysis.reporter import print_report, save_json_report console = Console() async def collect_fingerprint(channel_name: str, client: AIClient, config: CollectionConfig, progress: Progress, task_id, expected_model: str = "claude") -> FullFingerprint: """Collect full fingerprint from a single channel.""" raw_responses = {} def make_callback(phase_name): def callback(msg): progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] {msg}") return callback # Phase 1: Performance progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] โšก Collecting performance...") perf = await collect_performance(client, config, make_callback("performance")) progress.advance(task_id, 20) # Phase 2: Language progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] ๐Ÿ“ Collecting language patterns...") lang = await collect_language(client, config, make_callback("language")) progress.advance(task_id, 20) # Phase 3: Capability progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] ๐Ÿงช Collecting capabilities...") cap = await collect_capability(client, config, make_callback("capability")) progress.advance(task_id, 20) # Phase 4: Behavioral progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] ๐Ÿ” Collecting behavioral patterns...") beh = await collect_behavioral(client, config, make_callback("behavioral")) progress.advance(task_id, 20) # Phase 5: Identity Verification progress.update(task_id, description=f"[cyan]{channel_name}[/cyan] ๐Ÿ†” Collecting identity verification...") identity_fp = await collect_identity(client, config, expected_model, make_callback("identity")) progress.advance(task_id, 20) # Convert identity fingerprint to model for storage identity_model = IdentityFingerprintModel( claimed_identity=identity_fp.claimed_identity, claimed_developer=identity_fp.claimed_developer, identity_consistency=identity_fp.identity_consistency, detected_model=identity_fp.detected_model, detection_confidence=identity_fp.detection_confidence, model_scores=identity_fp.model_scores, vocab_markers=identity_fp.vocab_markers, marker_details=identity_fp.marker_details, signature_behaviors=identity_fp.signature_behaviors, system_prompt_leaked=identity_fp.system_prompt_leaked, system_prompt_hints=identity_fp.system_prompt_hints, knowledge_results=identity_fp.knowledge_results, identity_responses=identity_fp.identity_responses, is_claimed_model=identity_fp.is_claimed_model, identity_mismatch_reasons=identity_fp.identity_mismatch_reasons, ) return FullFingerprint( channel_name=channel_name, timestamp=datetime.now().isoformat(), performance=perf, language=lang, capability=cap, behavioral=beh, identity=identity_model, raw_responses=raw_responses, ) async def main_async(args): """Main async workflow.""" console.print() console.print("[bold cyan]๐Ÿ” AI API ๆŒ‡็บนๆฃ€ๆต‹ๅฏนๆฏ”ๅทฅๅ…ท[/bold cyan]") console.print("[dim] AI API Fingerprint Detection & Comparison Tool[/dim]") console.print() # Load configuration try: cfg = load_config(args.config) except Exception as e: console.print(f"[red]โŒ Configuration error: {e}[/red]") sys.exit(1) genuine_cfg = cfg['genuine'] suspect_cfg = cfg['suspect'] collection_cfg = cfg['collection'] output_cfg = cfg.get('output', {}) console.print(f"[green]โœ“[/green] Config loaded: {args.config}") console.print(f" Genuine: {genuine_cfg.base_url} (model: {genuine_cfg.model})") console.print(f" Suspect: {suspect_cfg.base_url} (model: {suspect_cfg.model})") console.print(f" Repeat count: {collection_cfg.repeat_count}") console.print() genuine_fp = None suspect_fp = None # Check for cached genuine fingerprint cache_path = Path(output_cfg.get('results_dir', 'results')) / "genuine_cache.json" if args.skip_genuine and cache_path.exists(): console.print("[yellow]โญ Skipping genuine collection (using cache)[/yellow]") import json with open(cache_path, 'r', encoding='utf-8') as f: cache_data = json.load(f) genuine_fp = FullFingerprint.from_dict(cache_data) console.print() with Progress( SpinnerColumn(), TextColumn("[progress.description]{task.description}"), BarColumn(), TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), TimeElapsedColumn(), console=console, ) as progress: # Collect genuine fingerprint if genuine_fp is None: task1 = progress.add_task("[green]Genuine channel", total=100) async with AIClient( base_url=genuine_cfg.base_url, api_key=genuine_cfg.api_key, model=genuine_cfg.model, timeout=collection_cfg.timeout, anthropic_version=collection_cfg.anthropic_version, ) as client: genuine_fp = await collect_fingerprint( "Genuine", client, collection_cfg, progress, task1, expected_model=genuine_cfg.model ) progress.update(task1, description="[green]โœ“ Genuine channel complete[/green]") # Cache genuine fingerprint try: import json cache_dir = Path(output_cfg.get('results_dir', 'results')) cache_dir.mkdir(parents=True, exist_ok=True) with open(cache_path, 'w', encoding='utf-8') as f: json.dump(genuine_fp.to_dict(), f, ensure_ascii=False, indent=2, default=str) except Exception: pass # Collect suspect fingerprint task2 = progress.add_task("[yellow]Suspect channel", total=100) async with AIClient( base_url=suspect_cfg.base_url, api_key=suspect_cfg.api_key, model=suspect_cfg.model, timeout=collection_cfg.timeout, anthropic_version=collection_cfg.anthropic_version, ) as client: suspect_fp = await collect_fingerprint( "Suspect", client, collection_cfg, progress, task2, expected_model=suspect_cfg.model ) progress.update(task2, description="[yellow]โœ“ Suspect channel complete[/yellow]") console.print() console.print("[bold]๐Ÿ”ฌ Analyzing fingerprints...[/bold]") console.print() # Compare fingerprints result = compare_fingerprints(genuine_fp, suspect_fp) # Print terminal report print_report(result, genuine_fp, suspect_fp) # Save JSON report if output_cfg.get('save_json', True): results_dir = output_cfg.get('results_dir', 'results') save_json_report(result, genuine_fp, suspect_fp, results_dir) console.print() def main(): parser = argparse.ArgumentParser( description="AI API Fingerprint Detection & Comparison Tool", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: python main.py --config config.yaml python main.py --config config.yaml --skip-genuine """, ) parser.add_argument( "--config", "-c", default="config.yaml", help="Path to configuration YAML file (default: config.yaml)", ) parser.add_argument( "--skip-genuine", action="store_true", help="Skip genuine channel collection and use cached results", ) args = parser.parse_args() try: asyncio.run(main_async(args)) except KeyboardInterrupt: console.print("\n[yellow]โš  Interrupted by user[/yellow]") sys.exit(130) except Exception as e: console.print(f"\n[red]โŒ Fatal error: {e}[/red]") import traceback console.print(f"[dim]{traceback.format_exc()}[/dim]") sys.exit(1) if __name__ == "__main__": main()