Files
xinghuoapi/backend/internal/service/force_cache_billing_test.go
erio 5e98445b22 feat(antigravity): comprehensive enhancements - model mapping, rate limiting, scheduling & ops
Key changes:
- Upgrade model mapping: Opus 4.5 → Opus 4.6-thinking with precise matching
- Unified rate limiting: scope-level → model-level with Redis snapshot sync
- Load-balanced scheduling by call count with smart retry mechanism
- Force cache billing support
- Model identity injection in prompts with leak prevention
- Thinking mode auto-handling (max_tokens/budget_tokens fix)
- Frontend: whitelist mode toggle, model mapping validation, status indicators
- Gemini session fallback with Redis Trie O(L) matching
- Ops: enhanced concurrency monitoring, account availability, retry logic
- Migration scripts: 049-051 for model mapping unification
2026-02-07 12:31:10 +08:00

134 lines
3.6 KiB
Go

//go:build unit
package service
import (
"context"
"testing"
)
func TestIsForceCacheBilling(t *testing.T) {
tests := []struct {
name string
ctx context.Context
expected bool
}{
{
name: "context without force cache billing",
ctx: context.Background(),
expected: false,
},
{
name: "context with force cache billing set to true",
ctx: context.WithValue(context.Background(), ForceCacheBillingContextKey, true),
expected: true,
},
{
name: "context with force cache billing set to false",
ctx: context.WithValue(context.Background(), ForceCacheBillingContextKey, false),
expected: false,
},
{
name: "context with wrong type value",
ctx: context.WithValue(context.Background(), ForceCacheBillingContextKey, "true"),
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := IsForceCacheBilling(tt.ctx)
if result != tt.expected {
t.Errorf("IsForceCacheBilling() = %v, want %v", result, tt.expected)
}
})
}
}
func TestWithForceCacheBilling(t *testing.T) {
ctx := context.Background()
// 原始上下文没有标记
if IsForceCacheBilling(ctx) {
t.Error("original context should not have force cache billing")
}
// 使用 WithForceCacheBilling 后应该有标记
newCtx := WithForceCacheBilling(ctx)
if !IsForceCacheBilling(newCtx) {
t.Error("new context should have force cache billing")
}
// 原始上下文应该不受影响
if IsForceCacheBilling(ctx) {
t.Error("original context should still not have force cache billing")
}
}
func TestForceCacheBilling_TokenConversion(t *testing.T) {
tests := []struct {
name string
forceCacheBilling bool
inputTokens int
cacheReadInputTokens int
expectedInputTokens int
expectedCacheReadTokens int
}{
{
name: "force cache billing converts input to cache_read",
forceCacheBilling: true,
inputTokens: 1000,
cacheReadInputTokens: 500,
expectedInputTokens: 0,
expectedCacheReadTokens: 1500, // 500 + 1000
},
{
name: "no force cache billing keeps tokens unchanged",
forceCacheBilling: false,
inputTokens: 1000,
cacheReadInputTokens: 500,
expectedInputTokens: 1000,
expectedCacheReadTokens: 500,
},
{
name: "force cache billing with zero input tokens does nothing",
forceCacheBilling: true,
inputTokens: 0,
cacheReadInputTokens: 500,
expectedInputTokens: 0,
expectedCacheReadTokens: 500,
},
{
name: "force cache billing with zero cache_read tokens",
forceCacheBilling: true,
inputTokens: 1000,
cacheReadInputTokens: 0,
expectedInputTokens: 0,
expectedCacheReadTokens: 1000,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// 模拟 RecordUsage 中的 ForceCacheBilling 逻辑
usage := ClaudeUsage{
InputTokens: tt.inputTokens,
CacheReadInputTokens: tt.cacheReadInputTokens,
}
// 这是 RecordUsage 中的实际逻辑
if tt.forceCacheBilling && usage.InputTokens > 0 {
usage.CacheReadInputTokens += usage.InputTokens
usage.InputTokens = 0
}
if usage.InputTokens != tt.expectedInputTokens {
t.Errorf("InputTokens = %d, want %d", usage.InputTokens, tt.expectedInputTokens)
}
if usage.CacheReadInputTokens != tt.expectedCacheReadTokens {
t.Errorf("CacheReadInputTokens = %d, want %d", usage.CacheReadInputTokens, tt.expectedCacheReadTokens)
}
})
}
}