"""Behavioral fingerprint collector — consistency, instruction compliance, HTTP headers."""

import re
from itertools import combinations
from typing import Callable, Dict, List, Mapping, Optional

from core.client import AIClient
from core.models import BehavioralFingerprint, CollectionConfig
from utils.text_analysis import text_similarity

# Prompts for consistency testing (each is sent ``config.repeat_count`` times)
CONSISTENCY_PROMPTS = [
    "In exactly 3 sentences, explain what an API is.",
    "List exactly 5 programming languages and nothing else.",
    "Translate 'Hello, how are you?' to French, Spanish, and German. Give only the translations.",
]


def _check_exact_word(text: str) -> bool:
    """Return True when the reply is just the word HELLO (case-insensitive)."""
    return text.strip().upper() == "HELLO"


def _check_word_count(text: str) -> bool:
    """Return True when the first line of the reply has 10 words, give or take one."""
    first_line = text.strip().split('\n')[0]
    return abs(len(re.findall(r'\b\w+\b', first_line)) - 10) <= 1


def _check_format_compliance(text: str) -> bool:
    """Return True when the reply has 2-4 non-blank lines, none numbered or bulleted."""
    lines = [line for line in text.strip().split('\n') if line.strip()]
    if not 2 <= len(lines) <= 4:
        return False
    # A leading digit, dash, asterisk or bullet character counts as list markup.
    return not any(re.match(r'^\s*[\d\-\*\u2022]', line) for line in lines)


def _check_json_format(text: str) -> bool:
    """Return True when the reply contains braces and a quoted ``name`` key."""
    return '{' in text and '}' in text and '"name"' in text


def _check_start_word(text: str) -> bool:
    """Return True when the reply starts with 'actually' (case-insensitive)."""
    return text.strip().lower().startswith("actually")


# Instruction compliance tests with verification functions
INSTRUCTION_TESTS = [
    {
        "prompt": "Respond with exactly the word 'HELLO' and nothing else.",
        "check": _check_exact_word,
        "name": "exact_word",
    },
    {
        "prompt": "Write a sentence that contains exactly 10 words.",
        "check": _check_word_count,
        "name": "word_count",
    },
    {
        "prompt": "List 3 colors, one per line, with no numbering or bullets.",
        "check": _check_format_compliance,
        "name": "format_compliance",
    },
    {
        "prompt": 'Answer in JSON format: {"name": "your_name", "type": "AI"}',
        "check": _check_json_format,
        "name": "json_format",
    },
    {
        "prompt": "Start your response with the word 'Actually' and explain why the sky is blue in 2 sentences.",
        "check": _check_start_word,
        "name": "start_word",
    },
]

# Headers of interest for fingerprinting
INTERESTING_HEADERS = [
    "server",
    "x-request-id",
    "x-ratelimit-limit-requests",
    "x-ratelimit-limit-tokens",
    "cf-ray",
    "cf-cache-status",
    "x-cloud-trace-context",
    "via",
    "x-powered-by",
    "x-served-by",
    "request-id",
    "anthropic-ratelimit-requests-limit",
    "anthropic-ratelimit-tokens-limit",
]


def _capture_interesting_headers(
    headers: Optional[Mapping[str, str]],
    into: Dict[str, str],
) -> None:
    """Copy every header named in INTERESTING_HEADERS from *headers* into *into*.

    Header names are matched case-insensitively; entries are stored under the
    spelling used in INTERESTING_HEADERS. A falsy *headers* is a no-op.
    """
    if not headers:
        return
    # One lowercase lookup map instead of rescanning all headers per key.
    # If names differ only by case, the one iterated last wins.
    lowered = {name.lower(): value for name, value in headers.items()}
    for key in INTERESTING_HEADERS:
        wanted = key.lower()
        if wanted in lowered:
            into[key] = lowered[wanted]


async def collect_behavioral(
    client: AIClient,
    config: CollectionConfig,
    progress_callback: Optional[Callable[[str], None]] = None,
) -> BehavioralFingerprint:
    """
    Collect behavioral fingerprint from an AI API channel.

    Tests response consistency, instruction compliance, and HTTP header patterns.

    Args:
        client: Client whose ``send_message`` coroutine is awaited for every
            probe; its result is unpacked as ``(text, _, headers)``.
        config: Collection settings; only ``repeat_count`` is read here.
        progress_callback: Optional callable invoked with a status string
            after each task and whenever a request fails.

    Returns:
        A BehavioralFingerprint holding one average pairwise similarity per
        consistency prompt that produced at least two responses, a pass/fail
        flag per instruction test, and the captured headers of interest.

    Request failures are reported through ``progress_callback`` and never
    propagate; a failed instruction test is recorded as ``False``.
    """
    consistency_scores: List[float] = []
    instruction_compliance: Dict[str, bool] = {}
    response_headers: Dict[str, str] = {}

    total_tasks = (
        len(CONSISTENCY_PROMPTS) * config.repeat_count
        + len(INSTRUCTION_TESTS)
        + 1  # +1 for header collection
    )
    completed = 0

    def notify(message: str) -> None:
        """Forward *message* to the progress callback, if one was given."""
        if progress_callback:
            progress_callback(message)

    def step_done() -> None:
        """Count one finished task and report overall progress."""
        nonlocal completed
        completed += 1
        notify(f" Behavioral: {completed}/{total_tasks}")

    # === Consistency testing ===
    for prompt_idx, prompt in enumerate(CONSISTENCY_PROMPTS):
        responses: List[str] = []
        for repeat in range(config.repeat_count):
            try:
                text, _, headers = await client.send_message(
                    prompt=prompt,
                    max_tokens=256,
                    temperature=0.0,  # Deterministic for consistency testing
                )
                responses.append(text)
                # Keep trying until some response yields a header of interest.
                if not response_headers:
                    _capture_interesting_headers(headers, response_headers)
            except Exception as e:
                notify(f" ⚠ Consistency prompt {prompt_idx+1} repeat {repeat+1} failed: {e}")
            step_done()

        # Average similarity over every unordered pair of responses.
        if len(responses) >= 2:
            pair_scores = [
                text_similarity(first, second)
                for first, second in combinations(responses, 2)
            ]
            consistency_scores.append(sum(pair_scores) / len(pair_scores))

    # === Instruction compliance testing ===
    for test in INSTRUCTION_TESTS:
        try:
            text, _, headers = await client.send_message(
                prompt=test["prompt"],
                max_tokens=256,
            )
            # A check that raises on this reply counts as a failure.
            try:
                passed = test["check"](text)
            except Exception:
                passed = False
            instruction_compliance[test["name"]] = passed
            if not response_headers:
                _capture_interesting_headers(headers, response_headers)
        except Exception as e:
            instruction_compliance[test["name"]] = False
            notify(f" ⚠ Instruction test '{test['name']}' failed: {e}")
        step_done()

    # === Additional header collection via a simple request ===
    if not response_headers:
        try:
            _, _, headers = await client.send_message(
                prompt="Say 'hello'.",
                max_tokens=16,
            )
            _capture_interesting_headers(headers, response_headers)
        except Exception as e:
            # Best-effort probe: report the failure instead of hiding it,
            # matching the other two request loops.
            notify(f" ⚠ Header collection request failed: {e}")
    step_done()

    return BehavioralFingerprint(
        consistency_scores=consistency_scores,
        instruction_compliance=instruction_compliance,
        response_headers=response_headers,
    )