diff --git a/relay/channel/ollama/adaptor.go b/relay/channel/ollama/adaptor.go index 8fd1e1bf..ff88de8b 100644 --- a/relay/channel/ollama/adaptor.go +++ b/relay/channel/ollama/adaptor.go @@ -23,6 +23,9 @@ func (a *Adaptor) ConvertClaudeRequest(c *gin.Context, info *relaycommon.RelayIn if err != nil { return nil, err } + openaiRequest.(*dto.GeneralOpenAIRequest).StreamOptions = &dto.StreamOptions{ + IncludeUsage: true, + } return requestOpenAI2Ollama(openaiRequest.(*dto.GeneralOpenAIRequest)) } @@ -82,15 +85,6 @@ func (a *Adaptor) DoRequest(c *gin.Context, info *relaycommon.RelayInfo, request } func (a *Adaptor) DoResponse(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (usage any, err *types.NewAPIError) { - if info.IsStream { - usage, err = openai.OaiStreamHandler(c, info, resp) - } else { - if info.RelayMode == relayconstant.RelayModeEmbeddings { - usage, err = ollamaEmbeddingHandler(c, info, resp) - } else { - usage, err = openai.OpenaiHandler(c, info, resp) - } - } switch info.RelayMode { case relayconstant.RelayModeEmbeddings: usage, err = ollamaEmbeddingHandler(c, info, resp) diff --git a/relay/channel/openai/helper.go b/relay/channel/openai/helper.go index a068c544..7fee505a 100644 --- a/relay/channel/openai/helper.go +++ b/relay/channel/openai/helper.go @@ -27,7 +27,7 @@ func handleStreamFormat(c *gin.Context, info *relaycommon.RelayInfo, data string func handleClaudeFormat(c *gin.Context, data string, info *relaycommon.RelayInfo) error { var streamResponse dto.ChatCompletionsStreamResponse - if err := json.Unmarshal(common.StringToByteSlice(data), &streamResponse); err != nil { + if err := common.Unmarshal(common.StringToByteSlice(data), &streamResponse); err != nil { return err } @@ -174,7 +174,7 @@ func handleFinalResponse(c *gin.Context, info *relaycommon.RelayInfo, lastStream case relaycommon.RelayFormatClaude: info.ClaudeConvertInfo.Done = true var streamResponse dto.ChatCompletionsStreamResponse - if err := json.Unmarshal(common.StringToByteSlice(lastStreamData), &streamResponse); err != nil { + if err := common.Unmarshal(common.StringToByteSlice(lastStreamData), &streamResponse); err != nil { common.SysError("error unmarshalling stream response: " + err.Error()) return } @@ -183,7 +183,7 @@ func handleFinalResponse(c *gin.Context, info *relaycommon.RelayInfo, lastStream claudeResponses := service.StreamResponseOpenAI2Claude(&streamResponse, info) for _, resp := range claudeResponses { - helper.ClaudeData(c, *resp) + _ = helper.ClaudeData(c, *resp) } } } diff --git a/relay/channel/openai/relay-openai.go b/relay/channel/openai/relay-openai.go index bfe8bcd3..d739ea19 100644 --- a/relay/channel/openai/relay-openai.go +++ b/relay/channel/openai/relay-openai.go @@ -145,8 +145,10 @@ func OaiStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Re common.SysError("error handling stream format: " + err.Error()) } } - lastStreamData = data - streamItems = append(streamItems, data) + if len(data) > 0 { + lastStreamData = data + streamItems = append(streamItems, data) + } return true }) @@ -154,16 +156,18 @@ func OaiStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Re shouldSendLastResp := true if err := handleLastResponse(lastStreamData, &responseId, &createAt, &systemFingerprint, &model, &usage, &containStreamUsage, info, &shouldSendLastResp); err != nil { - common.SysError("error handling last response: " + err.Error()) + common.LogError(c, fmt.Sprintf("error handling last response: %s, lastStreamData: [%s]", err.Error(), lastStreamData)) } - if shouldSendLastResp && info.RelayFormat == relaycommon.RelayFormatOpenAI { - _ = sendStreamData(c, info, lastStreamData, forceFormat, thinkToContent) + if info.RelayFormat == relaycommon.RelayFormatOpenAI { + if shouldSendLastResp { + _ = sendStreamData(c, info, lastStreamData, forceFormat, thinkToContent) + } } // 处理token计算 if err := processTokens(info.RelayMode, streamItems, &responseTextBuilder, &toolCount); err != nil { - common.SysError("error processing tokens: " + err.Error()) + common.LogError(c, "error processing tokens: "+err.Error()) } if !containStreamUsage { @@ -176,7 +180,6 @@ func OaiStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Re } } } - handleFinalResponse(c, info, lastStreamData, responseId, createAt, model, systemFingerprint, usage, containStreamUsage) return usage, nil diff --git a/relay/helper/stream_scanner.go b/relay/helper/stream_scanner.go index b526b1c0..c72aea6a 100644 --- a/relay/helper/stream_scanner.go +++ b/relay/helper/stream_scanner.go @@ -234,6 +234,12 @@ func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon case <-stopChan: return } + } else { + // done, 处理完成标志,直接退出停止读取剩余数据防止出错 + if common.DebugEnabled { + println("received [DONE], stopping scanner") + } + return } } diff --git a/service/convert.go b/service/convert.go index 593b59d9..7d697840 100644 --- a/service/convert.go +++ b/service/convert.go @@ -251,22 +251,54 @@ func StreamResponseOpenAI2Claude(openAIResponse *dto.ChatCompletionsStreamRespon resp.SetIndex(0) claudeResponses = append(claudeResponses, resp) } else { - //resp := &dto.ClaudeResponse{ - // Type: "content_block_start", - // ContentBlock: &dto.ClaudeMediaMessage{ - // Type: "text", - // Text: common.GetPointer[string](""), - // }, - //} - //resp.SetIndex(0) - //claudeResponses = append(claudeResponses, resp) + + } + // 判断首个响应是否存在内容(非标准的 OpenAI 响应) + if len(openAIResponse.Choices) > 0 && len(openAIResponse.Choices[0].Delta.GetContentString()) > 0 { + claudeResponses = append(claudeResponses, &dto.ClaudeResponse{ + Index: &info.ClaudeConvertInfo.Index, + Type: "content_block_start", + ContentBlock: &dto.ClaudeMediaMessage{ + Type: "text", + Text: common.GetPointer[string](""), + }, + }) + claudeResponses = append(claudeResponses, &dto.ClaudeResponse{ + Type: "content_block_delta", + Delta: &dto.ClaudeMediaMessage{ + Type: "text", + Text: common.GetPointer[string](openAIResponse.Choices[0].Delta.GetContentString()), + }, + }) + info.ClaudeConvertInfo.LastMessagesType = relaycommon.LastMessageTypeText } return claudeResponses } if len(openAIResponse.Choices) == 0 { // no choices - // TODO: handle this case + // 可能为非标准的 OpenAI 响应,判断是否已经完成 + if info.Done { + claudeResponses = append(claudeResponses, generateStopBlock(info.ClaudeConvertInfo.Index)) + oaiUsage := info.ClaudeConvertInfo.Usage + if oaiUsage != nil { + claudeResponses = append(claudeResponses, &dto.ClaudeResponse{ + Type: "message_delta", + Usage: &dto.ClaudeUsage{ + InputTokens: oaiUsage.PromptTokens, + OutputTokens: oaiUsage.CompletionTokens, + CacheCreationInputTokens: oaiUsage.PromptTokensDetails.CachedCreationTokens, + CacheReadInputTokens: oaiUsage.PromptTokensDetails.CachedTokens, + }, + Delta: &dto.ClaudeMediaMessage{ + StopReason: common.GetPointer[string](stopReasonOpenAI2Claude(info.FinishReason)), + }, + }) + } + claudeResponses = append(claudeResponses, &dto.ClaudeResponse{ + Type: "message_stop", + }) + } return claudeResponses } else { chosenChoice := openAIResponse.Choices[0]