From bbad9171015a20b59c4ddaf8d4622bbec9176558 Mon Sep 17 00:00:00 2001 From: Thomas Date: Sat, 7 Feb 2026 18:17:22 +0800 Subject: [PATCH 1/2] =?UTF-8?q?fix:=20=E8=A1=A5=E5=85=A8=20streaming=20mes?= =?UTF-8?q?sage=5Fdelta=20=E4=BA=8B=E4=BB=B6=E7=BC=BA=E5=A4=B1=E7=9A=84=20?= =?UTF-8?q?input=5Ftokens=20=E5=92=8C=20cache=20=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E5=AD=97=E6=AE=B5=20(#2881)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 当上游为 AWS Bedrock 时,message_delta 的 usage 可能缺少 input_tokens、 cache_creation_input_tokens、cache_read_input_tokens 等字段,导致与原生 Anthropic 格式不一致。从 message_start 积累的 claudeInfo 中补全这些字段后 重新序列化,确保客户端收到一致的 usage 格式。 --- relay/channel/claude/relay-claude.go | 31 +++ relay/channel/claude/relay-claude_test.go | 305 ++++++++++++++++++++++ 2 files changed, 336 insertions(+) create mode 100644 relay/channel/claude/relay-claude_test.go diff --git a/relay/channel/claude/relay-claude.go b/relay/channel/claude/relay-claude.go index bdb376ed..ad9d2234 100644 --- a/relay/channel/claude/relay-claude.go +++ b/relay/channel/claude/relay-claude.go @@ -544,6 +544,30 @@ type ClaudeResponseInfo struct { Done bool } +// enrichMessageDeltaUsage 补全 message_delta 事件中缺失的 input_tokens 和 cache 相关字段 +// 当上游(如 AWS Bedrock)的 message_delta 不包含这些字段时,从 claudeInfo 中积累的数据补全 +func enrichMessageDeltaUsage(claudeResponse *dto.ClaudeResponse, claudeInfo *ClaudeResponseInfo) { + if claudeResponse.Usage == nil { + claudeResponse.Usage = &dto.ClaudeUsage{} + } + if claudeResponse.Usage.InputTokens == 0 && claudeInfo.Usage.PromptTokens > 0 { + claudeResponse.Usage.InputTokens = claudeInfo.Usage.PromptTokens + } + if claudeResponse.Usage.CacheReadInputTokens == 0 && claudeInfo.Usage.PromptTokensDetails.CachedTokens > 0 { + claudeResponse.Usage.CacheReadInputTokens = claudeInfo.Usage.PromptTokensDetails.CachedTokens + } + if claudeResponse.Usage.CacheCreationInputTokens == 0 && claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens > 0 { + 
claudeResponse.Usage.CacheCreationInputTokens = claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens + } + if claudeResponse.Usage.CacheCreation == nil && + (claudeInfo.Usage.ClaudeCacheCreation5mTokens > 0 || claudeInfo.Usage.ClaudeCacheCreation1hTokens > 0) { + claudeResponse.Usage.CacheCreation = &dto.ClaudeCacheCreationUsage{ + Ephemeral5mInputTokens: claudeInfo.Usage.ClaudeCacheCreation5mTokens, + Ephemeral1hInputTokens: claudeInfo.Usage.ClaudeCacheCreation1hTokens, + } + } +} + func FormatClaudeResponseInfo(claudeResponse *dto.ClaudeResponse, oaiResponse *dto.ChatCompletionsStreamResponse, claudeInfo *ClaudeResponseInfo) bool { if claudeInfo == nil { return false @@ -638,6 +662,13 @@ func HandleStreamResponseData(c *gin.Context, info *relaycommon.RelayInfo, claud if claudeResponse.Message != nil { info.UpstreamModelName = claudeResponse.Message.Model } + } else if claudeResponse.Type == "message_delta" { + // 确保 message_delta 的 usage 包含完整的 input_tokens 和 cache 相关字段 + // 解决 AWS Bedrock 等上游返回的 message_delta 缺少这些字段的问题 + enrichMessageDeltaUsage(&claudeResponse, claudeInfo) + if newData, err := json.Marshal(claudeResponse); err == nil { + data = string(newData) + } } helper.ClaudeChunkData(c, claudeResponse, data) } else if info.RelayFormat == types.RelayFormatOpenAI { diff --git a/relay/channel/claude/relay-claude_test.go b/relay/channel/claude/relay-claude_test.go new file mode 100644 index 00000000..82c91018 --- /dev/null +++ b/relay/channel/claude/relay-claude_test.go @@ -0,0 +1,305 @@ +package claude + +import ( + "strings" + "testing" + + "github.com/QuantumNous/new-api/dto" +) + +func TestFormatClaudeResponseInfo_MessageStart(t *testing.T) { + claudeInfo := &ClaudeResponseInfo{ + Usage: &dto.Usage{}, + } + claudeResponse := &dto.ClaudeResponse{ + Type: "message_start", + Message: &dto.ClaudeMediaMessage{ + Id: "msg_123", + Model: "claude-3-5-sonnet", + Usage: &dto.ClaudeUsage{ + InputTokens: 100, + OutputTokens: 1, + CacheCreationInputTokens: 50, + 
CacheReadInputTokens: 30, + }, + }, + } + + ok := FormatClaudeResponseInfo(claudeResponse, nil, claudeInfo) + if !ok { + t.Fatal("expected true") + } + if claudeInfo.Usage.PromptTokens != 100 { + t.Errorf("PromptTokens = %d, want 100", claudeInfo.Usage.PromptTokens) + } + if claudeInfo.Usage.PromptTokensDetails.CachedTokens != 30 { + t.Errorf("CachedTokens = %d, want 30", claudeInfo.Usage.PromptTokensDetails.CachedTokens) + } + if claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens != 50 { + t.Errorf("CachedCreationTokens = %d, want 50", claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens) + } + if claudeInfo.ResponseId != "msg_123" { + t.Errorf("ResponseId = %s, want msg_123", claudeInfo.ResponseId) + } + if claudeInfo.Model != "claude-3-5-sonnet" { + t.Errorf("Model = %s, want claude-3-5-sonnet", claudeInfo.Model) + } +} + +func TestFormatClaudeResponseInfo_MessageDelta_FullUsage(t *testing.T) { + // message_start 先积累 usage + claudeInfo := &ClaudeResponseInfo{ + Usage: &dto.Usage{ + PromptTokens: 100, + PromptTokensDetails: dto.InputTokenDetails{ + CachedTokens: 30, + CachedCreationTokens: 50, + }, + CompletionTokens: 1, + }, + } + + // message_delta 带完整 usage(原生 Anthropic 场景) + claudeResponse := &dto.ClaudeResponse{ + Type: "message_delta", + Usage: &dto.ClaudeUsage{ + InputTokens: 100, + OutputTokens: 200, + CacheCreationInputTokens: 50, + CacheReadInputTokens: 30, + }, + } + + ok := FormatClaudeResponseInfo(claudeResponse, nil, claudeInfo) + if !ok { + t.Fatal("expected true") + } + if claudeInfo.Usage.PromptTokens != 100 { + t.Errorf("PromptTokens = %d, want 100", claudeInfo.Usage.PromptTokens) + } + if claudeInfo.Usage.CompletionTokens != 200 { + t.Errorf("CompletionTokens = %d, want 200", claudeInfo.Usage.CompletionTokens) + } + if claudeInfo.Usage.TotalTokens != 300 { + t.Errorf("TotalTokens = %d, want 300", claudeInfo.Usage.TotalTokens) + } + if !claudeInfo.Done { + t.Error("expected Done = true") + } +} + +func 
TestFormatClaudeResponseInfo_MessageDelta_OnlyOutputTokens(t *testing.T) { + // 模拟 Bedrock: message_start 已积累 usage + claudeInfo := &ClaudeResponseInfo{ + Usage: &dto.Usage{ + PromptTokens: 100, + PromptTokensDetails: dto.InputTokenDetails{ + CachedTokens: 30, + CachedCreationTokens: 50, + }, + CompletionTokens: 1, + ClaudeCacheCreation5mTokens: 10, + ClaudeCacheCreation1hTokens: 20, + }, + } + + // Bedrock 的 message_delta 只有 output_tokens,缺少 input_tokens 和 cache 字段 + claudeResponse := &dto.ClaudeResponse{ + Type: "message_delta", + Usage: &dto.ClaudeUsage{ + OutputTokens: 200, + // InputTokens, CacheCreationInputTokens, CacheReadInputTokens 都是 0 + }, + } + + ok := FormatClaudeResponseInfo(claudeResponse, nil, claudeInfo) + if !ok { + t.Fatal("expected true") + } + // PromptTokens 应保持 message_start 的值(因为 message_delta 的 InputTokens=0,不更新) + if claudeInfo.Usage.PromptTokens != 100 { + t.Errorf("PromptTokens = %d, want 100", claudeInfo.Usage.PromptTokens) + } + if claudeInfo.Usage.CompletionTokens != 200 { + t.Errorf("CompletionTokens = %d, want 200", claudeInfo.Usage.CompletionTokens) + } + if claudeInfo.Usage.TotalTokens != 300 { + t.Errorf("TotalTokens = %d, want 300", claudeInfo.Usage.TotalTokens) + } + // cache 字段应保持 message_start 的值 + if claudeInfo.Usage.PromptTokensDetails.CachedTokens != 30 { + t.Errorf("CachedTokens = %d, want 30", claudeInfo.Usage.PromptTokensDetails.CachedTokens) + } + if claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens != 50 { + t.Errorf("CachedCreationTokens = %d, want 50", claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens) + } + if claudeInfo.Usage.ClaudeCacheCreation5mTokens != 10 { + t.Errorf("ClaudeCacheCreation5mTokens = %d, want 10", claudeInfo.Usage.ClaudeCacheCreation5mTokens) + } + if claudeInfo.Usage.ClaudeCacheCreation1hTokens != 20 { + t.Errorf("ClaudeCacheCreation1hTokens = %d, want 20", claudeInfo.Usage.ClaudeCacheCreation1hTokens) + } + if !claudeInfo.Done { + t.Error("expected Done = true") + } +} + +func 
TestFormatClaudeResponseInfo_NilClaudeInfo(t *testing.T) { + claudeResponse := &dto.ClaudeResponse{Type: "message_start"} + ok := FormatClaudeResponseInfo(claudeResponse, nil, nil) + if ok { + t.Error("expected false for nil claudeInfo") + } +} + +func TestFormatClaudeResponseInfo_ContentBlockDelta(t *testing.T) { + text := "hello" + claudeInfo := &ClaudeResponseInfo{ + Usage: &dto.Usage{}, + ResponseText: strings.Builder{}, + } + claudeResponse := &dto.ClaudeResponse{ + Type: "content_block_delta", + Delta: &dto.ClaudeMediaMessage{ + Text: &text, + }, + } + + ok := FormatClaudeResponseInfo(claudeResponse, nil, claudeInfo) + if !ok { + t.Fatal("expected true") + } + if claudeInfo.ResponseText.String() != "hello" { + t.Errorf("ResponseText = %q, want %q", claudeInfo.ResponseText.String(), "hello") + } +} + +// TestEnrichMessageDeltaUsage 测试 message_delta 事件的 usage 补全逻辑 +// 这是修复 issue #2881 的核心逻辑:当上游(如 Bedrock)的 message_delta 缺少 +// input_tokens 和 cache 相关字段时,用 claudeInfo 中积累的数据补全 +func TestEnrichMessageDeltaUsage(t *testing.T) { + tests := []struct { + name string + claudeInfo *ClaudeResponseInfo + deltaUsage *dto.ClaudeUsage + wantInput int + wantCacheRead int + wantCacheCreate int + wantOutput int + want5m int + want1h int + }{ + { + name: "Bedrock: delta 只有 output_tokens,从 claudeInfo 补全其他字段", + claudeInfo: &ClaudeResponseInfo{ + Usage: &dto.Usage{ + PromptTokens: 100, + PromptTokensDetails: dto.InputTokenDetails{ + CachedTokens: 30, + CachedCreationTokens: 50, + }, + ClaudeCacheCreation5mTokens: 10, + ClaudeCacheCreation1hTokens: 20, + }, + }, + deltaUsage: &dto.ClaudeUsage{OutputTokens: 200}, + wantInput: 100, + wantCacheRead: 30, + wantCacheCreate: 50, + wantOutput: 200, + want5m: 10, + want1h: 20, + }, + { + name: "原生 Anthropic: delta 已包含所有字段,不覆盖", + claudeInfo: &ClaudeResponseInfo{ + Usage: &dto.Usage{ + PromptTokens: 100, + PromptTokensDetails: dto.InputTokenDetails{ + CachedTokens: 30, + CachedCreationTokens: 50, + }, + }, + }, + deltaUsage: 
&dto.ClaudeUsage{ + InputTokens: 100, + OutputTokens: 200, + CacheReadInputTokens: 30, + CacheCreationInputTokens: 50, + }, + wantInput: 100, + wantCacheRead: 30, + wantCacheCreate: 50, + wantOutput: 200, + }, + { + name: "delta usage 为 nil,创建并补全", + claudeInfo: &ClaudeResponseInfo{ + Usage: &dto.Usage{ + PromptTokens: 80, + PromptTokensDetails: dto.InputTokenDetails{ + CachedTokens: 20, + CachedCreationTokens: 40, + }, + }, + }, + deltaUsage: nil, + wantInput: 80, + wantCacheRead: 20, + wantCacheCreate: 40, + wantOutput: 0, + }, + { + name: "没有 cache 数据,不补全", + claudeInfo: &ClaudeResponseInfo{ + Usage: &dto.Usage{ + PromptTokens: 100, + }, + }, + deltaUsage: &dto.ClaudeUsage{OutputTokens: 50}, + wantInput: 100, + wantCacheRead: 0, + wantCacheCreate: 0, + wantOutput: 50, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + claudeResponse := &dto.ClaudeResponse{ + Type: "message_delta", + Usage: tt.deltaUsage, + } + + // 模拟 HandleStreamResponseData 中 Claude 格式的补全逻辑 + enrichMessageDeltaUsage(claudeResponse, tt.claudeInfo) + + if claudeResponse.Usage == nil { + t.Fatal("Usage should not be nil after enrichment") + } + if claudeResponse.Usage.InputTokens != tt.wantInput { + t.Errorf("InputTokens = %d, want %d", claudeResponse.Usage.InputTokens, tt.wantInput) + } + if claudeResponse.Usage.CacheReadInputTokens != tt.wantCacheRead { + t.Errorf("CacheReadInputTokens = %d, want %d", claudeResponse.Usage.CacheReadInputTokens, tt.wantCacheRead) + } + if claudeResponse.Usage.CacheCreationInputTokens != tt.wantCacheCreate { + t.Errorf("CacheCreationInputTokens = %d, want %d", claudeResponse.Usage.CacheCreationInputTokens, tt.wantCacheCreate) + } + if claudeResponse.Usage.OutputTokens != tt.wantOutput { + t.Errorf("OutputTokens = %d, want %d", claudeResponse.Usage.OutputTokens, tt.wantOutput) + } + if tt.want5m > 0 || tt.want1h > 0 { + if claudeResponse.Usage.CacheCreation == nil { + t.Fatal("CacheCreation should not be nil") + } + if 
claudeResponse.Usage.CacheCreation.Ephemeral5mInputTokens != tt.want5m { + t.Errorf("Ephemeral5mInputTokens = %d, want %d", claudeResponse.Usage.CacheCreation.Ephemeral5mInputTokens, tt.want5m) + } + if claudeResponse.Usage.CacheCreation.Ephemeral1hInputTokens != tt.want1h { + t.Errorf("Ephemeral1hInputTokens = %d, want %d", claudeResponse.Usage.CacheCreation.Ephemeral1hInputTokens, tt.want1h) + } + } + }) + } +} From 0b3a0b38d6ac9369d0515f1c7b12a95b1f324010 Mon Sep 17 00:00:00 2001 From: Seefs Date: Sat, 7 Feb 2026 19:13:58 +0800 Subject: [PATCH 2/2] fix: patch message_delta usage via gjson/sjson and skip on passthrough --- .../claude/message_delta_usage_patch_test.go | 111 +++++++++++++++ relay/channel/claude/relay-claude.go | 83 ++++++++--- ...ay-claude_test.go => relay_claude_test.go} | 130 ------------------ 3 files changed, 177 insertions(+), 147 deletions(-) create mode 100644 relay/channel/claude/message_delta_usage_patch_test.go rename relay/channel/claude/{relay-claude_test.go => relay_claude_test.go} (56%) diff --git a/relay/channel/claude/message_delta_usage_patch_test.go b/relay/channel/claude/message_delta_usage_patch_test.go new file mode 100644 index 00000000..43312587 --- /dev/null +++ b/relay/channel/claude/message_delta_usage_patch_test.go @@ -0,0 +1,111 @@ +package claude + +import ( + "testing" + + "github.com/QuantumNous/new-api/dto" + relaycommon "github.com/QuantumNous/new-api/relay/common" + "github.com/QuantumNous/new-api/setting/model_setting" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tidwall/gjson" +) + +func TestPatchClaudeMessageDeltaUsageDataPreserveUnknownFields(t *testing.T) { + originalData := `{"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":53},"vendor_meta":{"trace_id":"trace_001"}}` + usage := &dto.ClaudeUsage{ + InputTokens: 100, + CacheReadInputTokens: 30, + CacheCreationInputTokens: 50, + } + + patchedData := 
patchClaudeMessageDeltaUsageData(originalData, usage) + + require.Equal(t, "message_delta", gjson.Get(patchedData, "type").String()) + require.Equal(t, "end_turn", gjson.Get(patchedData, "delta.stop_reason").String()) + require.Equal(t, "trace_001", gjson.Get(patchedData, "vendor_meta.trace_id").String()) + require.EqualValues(t, 53, gjson.Get(patchedData, "usage.output_tokens").Int()) + require.EqualValues(t, 100, gjson.Get(patchedData, "usage.input_tokens").Int()) + require.EqualValues(t, 30, gjson.Get(patchedData, "usage.cache_read_input_tokens").Int()) + require.EqualValues(t, 50, gjson.Get(patchedData, "usage.cache_creation_input_tokens").Int()) +} + +func TestPatchClaudeMessageDeltaUsageDataZeroValueChecks(t *testing.T) { + originalData := `{"type":"message_delta","usage":{"output_tokens":53,"input_tokens":9,"cache_read_input_tokens":0}}` + usage := &dto.ClaudeUsage{ + InputTokens: 100, + CacheReadInputTokens: 30, + CacheCreationInputTokens: 0, + } + + patchedData := patchClaudeMessageDeltaUsageData(originalData, usage) + + require.EqualValues(t, 9, gjson.Get(patchedData, "usage.input_tokens").Int()) + require.EqualValues(t, 30, gjson.Get(patchedData, "usage.cache_read_input_tokens").Int()) + assert.False(t, gjson.Get(patchedData, "usage.cache_creation_input_tokens").Exists()) +} + +func TestShouldSkipClaudeMessageDeltaUsagePatch(t *testing.T) { + originGlobalPassThrough := model_setting.GetGlobalSettings().PassThroughRequestEnabled + t.Cleanup(func() { + model_setting.GetGlobalSettings().PassThroughRequestEnabled = originGlobalPassThrough + }) + + model_setting.GetGlobalSettings().PassThroughRequestEnabled = true + assert.True(t, shouldSkipClaudeMessageDeltaUsagePatch(&relaycommon.RelayInfo{})) + + model_setting.GetGlobalSettings().PassThroughRequestEnabled = false + assert.True(t, shouldSkipClaudeMessageDeltaUsagePatch(&relaycommon.RelayInfo{ + ChannelMeta: &relaycommon.ChannelMeta{ChannelSetting: dto.ChannelSettings{PassThroughBodyEnabled: true}}, + })) + 
assert.False(t, shouldSkipClaudeMessageDeltaUsagePatch(&relaycommon.RelayInfo{ + ChannelMeta: &relaycommon.ChannelMeta{ChannelSetting: dto.ChannelSettings{PassThroughBodyEnabled: false}}, + })) +} + +func TestBuildMessageDeltaPatchUsage(t *testing.T) { + t.Run("merge missing fields from claudeInfo", func(t *testing.T) { + claudeResponse := &dto.ClaudeResponse{Usage: &dto.ClaudeUsage{OutputTokens: 53}} + claudeInfo := &ClaudeResponseInfo{ + Usage: &dto.Usage{ + PromptTokens: 100, + PromptTokensDetails: dto.InputTokenDetails{ + CachedTokens: 30, + CachedCreationTokens: 50, + }, + ClaudeCacheCreation5mTokens: 10, + ClaudeCacheCreation1hTokens: 20, + }, + } + + usage := buildMessageDeltaPatchUsage(claudeResponse, claudeInfo) + require.NotNil(t, usage) + require.EqualValues(t, 100, usage.InputTokens) + require.EqualValues(t, 30, usage.CacheReadInputTokens) + require.EqualValues(t, 50, usage.CacheCreationInputTokens) + require.EqualValues(t, 53, usage.OutputTokens) + require.NotNil(t, usage.CacheCreation) + require.EqualValues(t, 10, usage.CacheCreation.Ephemeral5mInputTokens) + require.EqualValues(t, 20, usage.CacheCreation.Ephemeral1hInputTokens) + }) + + t.Run("keep upstream non-zero values", func(t *testing.T) { + claudeResponse := &dto.ClaudeResponse{Usage: &dto.ClaudeUsage{ + InputTokens: 9, + CacheReadInputTokens: 7, + CacheCreationInputTokens: 6, + }} + claudeInfo := &ClaudeResponseInfo{Usage: &dto.Usage{ + PromptTokens: 100, + PromptTokensDetails: dto.InputTokenDetails{ + CachedTokens: 30, + CachedCreationTokens: 50, + }, + }} + + usage := buildMessageDeltaPatchUsage(claudeResponse, claudeInfo) + require.EqualValues(t, 9, usage.InputTokens) + require.EqualValues(t, 7, usage.CacheReadInputTokens) + require.EqualValues(t, 6, usage.CacheCreationInputTokens) + }) +} diff --git a/relay/channel/claude/relay-claude.go b/relay/channel/claude/relay-claude.go index ad9d2234..069c784c 100644 --- a/relay/channel/claude/relay-claude.go +++ 
b/relay/channel/claude/relay-claude.go @@ -21,6 +21,8 @@ import ( "github.com/QuantumNous/new-api/types" "github.com/gin-gonic/gin" + "github.com/tidwall/gjson" + "github.com/tidwall/sjson" ) const ( @@ -544,28 +546,76 @@ type ClaudeResponseInfo struct { Done bool } -// enrichMessageDeltaUsage 补全 message_delta 事件中缺失的 input_tokens 和 cache 相关字段 -// 当上游(如 AWS Bedrock)的 message_delta 不包含这些字段时,从 claudeInfo 中积累的数据补全 -func enrichMessageDeltaUsage(claudeResponse *dto.ClaudeResponse, claudeInfo *ClaudeResponseInfo) { - if claudeResponse.Usage == nil { - claudeResponse.Usage = &dto.ClaudeUsage{} +func buildMessageDeltaPatchUsage(claudeResponse *dto.ClaudeResponse, claudeInfo *ClaudeResponseInfo) *dto.ClaudeUsage { + usage := &dto.ClaudeUsage{} + if claudeResponse != nil && claudeResponse.Usage != nil { + *usage = *claudeResponse.Usage } - if claudeResponse.Usage.InputTokens == 0 && claudeInfo.Usage.PromptTokens > 0 { - claudeResponse.Usage.InputTokens = claudeInfo.Usage.PromptTokens + + if claudeInfo == nil || claudeInfo.Usage == nil { + return usage } - if claudeResponse.Usage.CacheReadInputTokens == 0 && claudeInfo.Usage.PromptTokensDetails.CachedTokens > 0 { - claudeResponse.Usage.CacheReadInputTokens = claudeInfo.Usage.PromptTokensDetails.CachedTokens + + if usage.InputTokens == 0 && claudeInfo.Usage.PromptTokens > 0 { + usage.InputTokens = claudeInfo.Usage.PromptTokens } - if claudeResponse.Usage.CacheCreationInputTokens == 0 && claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens > 0 { - claudeResponse.Usage.CacheCreationInputTokens = claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens + if usage.CacheReadInputTokens == 0 && claudeInfo.Usage.PromptTokensDetails.CachedTokens > 0 { + usage.CacheReadInputTokens = claudeInfo.Usage.PromptTokensDetails.CachedTokens } - if claudeResponse.Usage.CacheCreation == nil && - (claudeInfo.Usage.ClaudeCacheCreation5mTokens > 0 || claudeInfo.Usage.ClaudeCacheCreation1hTokens > 0) { - claudeResponse.Usage.CacheCreation = 
&dto.ClaudeCacheCreationUsage{
+	if usage.CacheCreationInputTokens == 0 && claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens > 0 {
+		usage.CacheCreationInputTokens = claudeInfo.Usage.PromptTokensDetails.CachedCreationTokens
+	}
+	if usage.CacheCreation == nil && (claudeInfo.Usage.ClaudeCacheCreation5mTokens > 0 || claudeInfo.Usage.ClaudeCacheCreation1hTokens > 0) {
+		usage.CacheCreation = &dto.ClaudeCacheCreationUsage{
 			Ephemeral5mInputTokens: claudeInfo.Usage.ClaudeCacheCreation5mTokens,
 			Ephemeral1hInputTokens: claudeInfo.Usage.ClaudeCacheCreation1hTokens,
 		}
 	}
+	return usage
+}
+
+func shouldSkipClaudeMessageDeltaUsagePatch(info *relaycommon.RelayInfo) bool {
+	if model_setting.GetGlobalSettings().PassThroughRequestEnabled {
+		return true
+	}
+	if info == nil || info.ChannelMeta == nil { // ChannelSetting is promoted from the embedded *ChannelMeta; reading it through a nil pointer would panic
+		return false
+	}
+	return info.ChannelSetting.PassThroughBodyEnabled
+}
+
+func patchClaudeMessageDeltaUsageData(data string, usage *dto.ClaudeUsage) string {
+	if data == "" || usage == nil {
+		return data
+	}
+
+	data = setMessageDeltaUsageInt(data, "usage.input_tokens", usage.InputTokens)
+	data = setMessageDeltaUsageInt(data, "usage.cache_read_input_tokens", usage.CacheReadInputTokens)
+	data = setMessageDeltaUsageInt(data, "usage.cache_creation_input_tokens", usage.CacheCreationInputTokens)
+
+	if usage.CacheCreation != nil {
+		data = setMessageDeltaUsageInt(data, "usage.cache_creation.ephemeral_5m_input_tokens", usage.CacheCreation.Ephemeral5mInputTokens)
+		data = setMessageDeltaUsageInt(data, "usage.cache_creation.ephemeral_1h_input_tokens", usage.CacheCreation.Ephemeral1hInputTokens)
+	}
+
+	return data
+}
+
+func setMessageDeltaUsageInt(data string, path string, localValue int) string {
+	if localValue <= 0 {
+		return data
+	}
+
+	upstreamValue := gjson.Get(data, path)
+	if upstreamValue.Exists() && upstreamValue.Int() > 0 {
+		return data
+	}
+
+	patchedData, err := sjson.Set(data, path, localValue)
+	if err != nil {
+		return data
+	}
+	return patchedData
 }
 
 func 
FormatClaudeResponseInfo(claudeResponse *dto.ClaudeResponse, oaiResponse *dto.ChatCompletionsStreamResponse, claudeInfo *ClaudeResponseInfo) bool { @@ -665,9 +715,8 @@ func HandleStreamResponseData(c *gin.Context, info *relaycommon.RelayInfo, claud } else if claudeResponse.Type == "message_delta" { // 确保 message_delta 的 usage 包含完整的 input_tokens 和 cache 相关字段 // 解决 AWS Bedrock 等上游返回的 message_delta 缺少这些字段的问题 - enrichMessageDeltaUsage(&claudeResponse, claudeInfo) - if newData, err := json.Marshal(claudeResponse); err == nil { - data = string(newData) + if !shouldSkipClaudeMessageDeltaUsagePatch(info) { + data = patchClaudeMessageDeltaUsageData(data, buildMessageDeltaPatchUsage(&claudeResponse, claudeInfo)) } } helper.ClaudeChunkData(c, claudeResponse, data) diff --git a/relay/channel/claude/relay-claude_test.go b/relay/channel/claude/relay_claude_test.go similarity index 56% rename from relay/channel/claude/relay-claude_test.go rename to relay/channel/claude/relay_claude_test.go index 82c91018..e34c861a 100644 --- a/relay/channel/claude/relay-claude_test.go +++ b/relay/channel/claude/relay_claude_test.go @@ -173,133 +173,3 @@ func TestFormatClaudeResponseInfo_ContentBlockDelta(t *testing.T) { t.Errorf("ResponseText = %q, want %q", claudeInfo.ResponseText.String(), "hello") } } - -// TestEnrichMessageDeltaUsage 测试 message_delta 事件的 usage 补全逻辑 -// 这是修复 issue #2881 的核心逻辑:当上游(如 Bedrock)的 message_delta 缺少 -// input_tokens 和 cache 相关字段时,用 claudeInfo 中积累的数据补全 -func TestEnrichMessageDeltaUsage(t *testing.T) { - tests := []struct { - name string - claudeInfo *ClaudeResponseInfo - deltaUsage *dto.ClaudeUsage - wantInput int - wantCacheRead int - wantCacheCreate int - wantOutput int - want5m int - want1h int - }{ - { - name: "Bedrock: delta 只有 output_tokens,从 claudeInfo 补全其他字段", - claudeInfo: &ClaudeResponseInfo{ - Usage: &dto.Usage{ - PromptTokens: 100, - PromptTokensDetails: dto.InputTokenDetails{ - CachedTokens: 30, - CachedCreationTokens: 50, - }, - 
ClaudeCacheCreation5mTokens: 10, - ClaudeCacheCreation1hTokens: 20, - }, - }, - deltaUsage: &dto.ClaudeUsage{OutputTokens: 200}, - wantInput: 100, - wantCacheRead: 30, - wantCacheCreate: 50, - wantOutput: 200, - want5m: 10, - want1h: 20, - }, - { - name: "原生 Anthropic: delta 已包含所有字段,不覆盖", - claudeInfo: &ClaudeResponseInfo{ - Usage: &dto.Usage{ - PromptTokens: 100, - PromptTokensDetails: dto.InputTokenDetails{ - CachedTokens: 30, - CachedCreationTokens: 50, - }, - }, - }, - deltaUsage: &dto.ClaudeUsage{ - InputTokens: 100, - OutputTokens: 200, - CacheReadInputTokens: 30, - CacheCreationInputTokens: 50, - }, - wantInput: 100, - wantCacheRead: 30, - wantCacheCreate: 50, - wantOutput: 200, - }, - { - name: "delta usage 为 nil,创建并补全", - claudeInfo: &ClaudeResponseInfo{ - Usage: &dto.Usage{ - PromptTokens: 80, - PromptTokensDetails: dto.InputTokenDetails{ - CachedTokens: 20, - CachedCreationTokens: 40, - }, - }, - }, - deltaUsage: nil, - wantInput: 80, - wantCacheRead: 20, - wantCacheCreate: 40, - wantOutput: 0, - }, - { - name: "没有 cache 数据,不补全", - claudeInfo: &ClaudeResponseInfo{ - Usage: &dto.Usage{ - PromptTokens: 100, - }, - }, - deltaUsage: &dto.ClaudeUsage{OutputTokens: 50}, - wantInput: 100, - wantCacheRead: 0, - wantCacheCreate: 0, - wantOutput: 50, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - claudeResponse := &dto.ClaudeResponse{ - Type: "message_delta", - Usage: tt.deltaUsage, - } - - // 模拟 HandleStreamResponseData 中 Claude 格式的补全逻辑 - enrichMessageDeltaUsage(claudeResponse, tt.claudeInfo) - - if claudeResponse.Usage == nil { - t.Fatal("Usage should not be nil after enrichment") - } - if claudeResponse.Usage.InputTokens != tt.wantInput { - t.Errorf("InputTokens = %d, want %d", claudeResponse.Usage.InputTokens, tt.wantInput) - } - if claudeResponse.Usage.CacheReadInputTokens != tt.wantCacheRead { - t.Errorf("CacheReadInputTokens = %d, want %d", claudeResponse.Usage.CacheReadInputTokens, tt.wantCacheRead) - } - if 
claudeResponse.Usage.CacheCreationInputTokens != tt.wantCacheCreate { - t.Errorf("CacheCreationInputTokens = %d, want %d", claudeResponse.Usage.CacheCreationInputTokens, tt.wantCacheCreate) - } - if claudeResponse.Usage.OutputTokens != tt.wantOutput { - t.Errorf("OutputTokens = %d, want %d", claudeResponse.Usage.OutputTokens, tt.wantOutput) - } - if tt.want5m > 0 || tt.want1h > 0 { - if claudeResponse.Usage.CacheCreation == nil { - t.Fatal("CacheCreation should not be nil") - } - if claudeResponse.Usage.CacheCreation.Ephemeral5mInputTokens != tt.want5m { - t.Errorf("Ephemeral5mInputTokens = %d, want %d", claudeResponse.Usage.CacheCreation.Ephemeral5mInputTokens, tt.want5m) - } - if claudeResponse.Usage.CacheCreation.Ephemeral1hInputTokens != tt.want1h { - t.Errorf("Ephemeral1hInputTokens = %d, want %d", claudeResponse.Usage.CacheCreation.Ephemeral1hInputTokens, tt.want1h) - } - } - }) - } -}