Files
new-api-hunter/relay/channel/openai/chat_via_responses.go
Seefs 3d0c75f42c Merge branch 'feature/messages2responses' into upstream-main
# Conflicts:
#	service/openaicompat/chat_to_responses.go
2026-02-08 00:16:35 +08:00

540 lines
15 KiB
Go

package openai
import (
"fmt"
"io"
"net/http"
"strings"
"time"
"github.com/QuantumNous/new-api/common"
"github.com/QuantumNous/new-api/dto"
"github.com/QuantumNous/new-api/logger"
relaycommon "github.com/QuantumNous/new-api/relay/common"
"github.com/QuantumNous/new-api/relay/helper"
"github.com/QuantumNous/new-api/service"
"github.com/QuantumNous/new-api/types"
"github.com/gin-gonic/gin"
)
func responsesStreamIndexKey(itemID string, idx *int) string {
if itemID == "" {
return ""
}
if idx == nil {
return itemID
}
return fmt.Sprintf("%s:%d", itemID, *idx)
}
func stringDeltaFromPrefix(prev string, next string) string {
if next == "" {
return ""
}
if prev != "" && strings.HasPrefix(next, prev) {
return next[len(prev):]
}
return next
}
func OaiResponsesToChatHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*dto.Usage, *types.NewAPIError) {
if resp == nil || resp.Body == nil {
return nil, types.NewOpenAIError(fmt.Errorf("invalid response"), types.ErrorCodeBadResponse, http.StatusInternalServerError)
}
defer service.CloseResponseBodyGracefully(resp)
var responsesResp dto.OpenAIResponsesResponse
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, types.NewOpenAIError(err, types.ErrorCodeReadResponseBodyFailed, http.StatusInternalServerError)
}
if err := common.Unmarshal(body, &responsesResp); err != nil {
return nil, types.NewOpenAIError(err, types.ErrorCodeBadResponseBody, http.StatusInternalServerError)
}
if oaiError := responsesResp.GetOpenAIError(); oaiError != nil && oaiError.Type != "" {
return nil, types.WithOpenAIError(*oaiError, resp.StatusCode)
}
chatId := helper.GetResponseID(c)
chatResp, usage, err := service.ResponsesResponseToChatCompletionsResponse(&responsesResp, chatId)
if err != nil {
return nil, types.NewOpenAIError(err, types.ErrorCodeBadResponseBody, http.StatusInternalServerError)
}
if usage == nil || usage.TotalTokens == 0 {
text := service.ExtractOutputTextFromResponses(&responsesResp)
usage = service.ResponseText2Usage(c, text, info.UpstreamModelName, info.GetEstimatePromptTokens())
chatResp.Usage = *usage
}
var responseBody []byte
switch info.RelayFormat {
case types.RelayFormatClaude:
claudeResp := service.ResponseOpenAI2Claude(chatResp, info)
responseBody, err = common.Marshal(claudeResp)
case types.RelayFormatGemini:
geminiResp := service.ResponseOpenAI2Gemini(chatResp, info)
responseBody, err = common.Marshal(geminiResp)
default:
responseBody, err = common.Marshal(chatResp)
}
if err != nil {
return nil, types.NewOpenAIError(err, types.ErrorCodeJsonMarshalFailed, http.StatusInternalServerError)
}
service.IOCopyBytesGracefully(c, resp, responseBody)
return usage, nil
}
func OaiResponsesToChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*dto.Usage, *types.NewAPIError) {
if resp == nil || resp.Body == nil {
return nil, types.NewOpenAIError(fmt.Errorf("invalid response"), types.ErrorCodeBadResponse, http.StatusInternalServerError)
}
defer service.CloseResponseBodyGracefully(resp)
responseId := helper.GetResponseID(c)
createAt := time.Now().Unix()
model := info.UpstreamModelName
var (
usage = &dto.Usage{}
outputText strings.Builder
usageText strings.Builder
sentStart bool
sentStop bool
sawToolCall bool
streamErr *types.NewAPIError
)
toolCallIndexByID := make(map[string]int)
toolCallNameByID := make(map[string]string)
toolCallArgsByID := make(map[string]string)
toolCallNameSent := make(map[string]bool)
toolCallCanonicalIDByItemID := make(map[string]string)
hasSentReasoningSummary := false
needsReasoningSummarySeparator := false
//reasoningSummaryTextByKey := make(map[string]string)
if info.RelayFormat == types.RelayFormatClaude && info.ClaudeConvertInfo == nil {
info.ClaudeConvertInfo = &relaycommon.ClaudeConvertInfo{LastMessagesType: relaycommon.LastMessageTypeNone}
}
sendChatChunk := func(chunk *dto.ChatCompletionsStreamResponse) bool {
if chunk == nil {
return true
}
if info.RelayFormat == types.RelayFormatOpenAI {
if err := helper.ObjectData(c, chunk); err != nil {
streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError)
return false
}
return true
}
chunkData, err := common.Marshal(chunk)
if err != nil {
streamErr = types.NewOpenAIError(err, types.ErrorCodeJsonMarshalFailed, http.StatusInternalServerError)
return false
}
if err := HandleStreamFormat(c, info, string(chunkData), false, false); err != nil {
streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError)
return false
}
return true
}
sendStartIfNeeded := func() bool {
if sentStart {
return true
}
if !sendChatChunk(helper.GenerateStartEmptyResponse(responseId, createAt, model, nil)) {
return false
}
sentStart = true
return true
}
//sendReasoningDelta := func(delta string) bool {
// if delta == "" {
// return true
// }
// if !sendStartIfNeeded() {
// return false
// }
//
// usageText.WriteString(delta)
// chunk := &dto.ChatCompletionsStreamResponse{
// Id: responseId,
// Object: "chat.completion.chunk",
// Created: createAt,
// Model: model,
// Choices: []dto.ChatCompletionsStreamResponseChoice{
// {
// Index: 0,
// Delta: dto.ChatCompletionsStreamResponseChoiceDelta{
// ReasoningContent: &delta,
// },
// },
// },
// }
// if err := helper.ObjectData(c, chunk); err != nil {
// streamErr = types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError)
// return false
// }
// return true
//}
sendReasoningSummaryDelta := func(delta string) bool {
if delta == "" {
return true
}
if needsReasoningSummarySeparator {
if strings.HasPrefix(delta, "\n\n") {
needsReasoningSummarySeparator = false
} else if strings.HasPrefix(delta, "\n") {
delta = "\n" + delta
needsReasoningSummarySeparator = false
} else {
delta = "\n\n" + delta
needsReasoningSummarySeparator = false
}
}
if !sendStartIfNeeded() {
return false
}
usageText.WriteString(delta)
chunk := &dto.ChatCompletionsStreamResponse{
Id: responseId,
Object: "chat.completion.chunk",
Created: createAt,
Model: model,
Choices: []dto.ChatCompletionsStreamResponseChoice{
{
Index: 0,
Delta: dto.ChatCompletionsStreamResponseChoiceDelta{
ReasoningContent: &delta,
},
},
},
}
if !sendChatChunk(chunk) {
return false
}
hasSentReasoningSummary = true
return true
}
sendToolCallDelta := func(callID string, name string, argsDelta string) bool {
if callID == "" {
return true
}
if outputText.Len() > 0 {
// Prefer streaming assistant text over tool calls to match non-stream behavior.
return true
}
if !sendStartIfNeeded() {
return false
}
idx, ok := toolCallIndexByID[callID]
if !ok {
idx = len(toolCallIndexByID)
toolCallIndexByID[callID] = idx
}
if name != "" {
toolCallNameByID[callID] = name
}
if toolCallNameByID[callID] != "" {
name = toolCallNameByID[callID]
}
tool := dto.ToolCallResponse{
ID: callID,
Type: "function",
Function: dto.FunctionResponse{
Arguments: argsDelta,
},
}
tool.SetIndex(idx)
if name != "" && !toolCallNameSent[callID] {
tool.Function.Name = name
toolCallNameSent[callID] = true
}
chunk := &dto.ChatCompletionsStreamResponse{
Id: responseId,
Object: "chat.completion.chunk",
Created: createAt,
Model: model,
Choices: []dto.ChatCompletionsStreamResponseChoice{
{
Index: 0,
Delta: dto.ChatCompletionsStreamResponseChoiceDelta{
ToolCalls: []dto.ToolCallResponse{tool},
},
},
},
}
if !sendChatChunk(chunk) {
return false
}
sawToolCall = true
// Include tool call data in the local builder for fallback token estimation.
if tool.Function.Name != "" {
usageText.WriteString(tool.Function.Name)
}
if argsDelta != "" {
usageText.WriteString(argsDelta)
}
return true
}
helper.StreamScannerHandler(c, resp, info, func(data string) bool {
if streamErr != nil {
return false
}
var streamResp dto.ResponsesStreamResponse
if err := common.UnmarshalJsonStr(data, &streamResp); err != nil {
logger.LogError(c, "failed to unmarshal responses stream event: "+err.Error())
return true
}
switch streamResp.Type {
case "response.created":
if streamResp.Response != nil {
if streamResp.Response.Model != "" {
model = streamResp.Response.Model
}
if streamResp.Response.CreatedAt != 0 {
createAt = int64(streamResp.Response.CreatedAt)
}
}
//case "response.reasoning_text.delta":
//if !sendReasoningDelta(streamResp.Delta) {
// return false
//}
//case "response.reasoning_text.done":
case "response.reasoning_summary_text.delta":
if !sendReasoningSummaryDelta(streamResp.Delta) {
return false
}
case "response.reasoning_summary_text.done":
if hasSentReasoningSummary {
needsReasoningSummarySeparator = true
}
//case "response.reasoning_summary_part.added", "response.reasoning_summary_part.done":
// key := responsesStreamIndexKey(strings.TrimSpace(streamResp.ItemID), streamResp.SummaryIndex)
// if key == "" || streamResp.Part == nil {
// break
// }
// // Only handle summary text parts, ignore other part types.
// if streamResp.Part.Type != "" && streamResp.Part.Type != "summary_text" {
// break
// }
// prev := reasoningSummaryTextByKey[key]
// next := streamResp.Part.Text
// delta := stringDeltaFromPrefix(prev, next)
// reasoningSummaryTextByKey[key] = next
// if !sendReasoningSummaryDelta(delta) {
// return false
// }
case "response.output_text.delta":
if !sendStartIfNeeded() {
return false
}
if streamResp.Delta != "" {
outputText.WriteString(streamResp.Delta)
usageText.WriteString(streamResp.Delta)
delta := streamResp.Delta
chunk := &dto.ChatCompletionsStreamResponse{
Id: responseId,
Object: "chat.completion.chunk",
Created: createAt,
Model: model,
Choices: []dto.ChatCompletionsStreamResponseChoice{
{
Index: 0,
Delta: dto.ChatCompletionsStreamResponseChoiceDelta{
Content: &delta,
},
},
},
}
if !sendChatChunk(chunk) {
return false
}
}
case "response.output_item.added", "response.output_item.done":
if streamResp.Item == nil {
break
}
if streamResp.Item.Type != "function_call" {
break
}
itemID := strings.TrimSpace(streamResp.Item.ID)
callID := strings.TrimSpace(streamResp.Item.CallId)
if callID == "" {
callID = itemID
}
if itemID != "" && callID != "" {
toolCallCanonicalIDByItemID[itemID] = callID
}
name := strings.TrimSpace(streamResp.Item.Name)
if name != "" {
toolCallNameByID[callID] = name
}
newArgs := streamResp.Item.Arguments
prevArgs := toolCallArgsByID[callID]
argsDelta := ""
if newArgs != "" {
if strings.HasPrefix(newArgs, prevArgs) {
argsDelta = newArgs[len(prevArgs):]
} else {
argsDelta = newArgs
}
toolCallArgsByID[callID] = newArgs
}
if !sendToolCallDelta(callID, name, argsDelta) {
return false
}
case "response.function_call_arguments.delta":
itemID := strings.TrimSpace(streamResp.ItemID)
callID := toolCallCanonicalIDByItemID[itemID]
if callID == "" {
callID = itemID
}
if callID == "" {
break
}
toolCallArgsByID[callID] += streamResp.Delta
if !sendToolCallDelta(callID, "", streamResp.Delta) {
return false
}
case "response.function_call_arguments.done":
case "response.completed":
if streamResp.Response != nil {
if streamResp.Response.Model != "" {
model = streamResp.Response.Model
}
if streamResp.Response.CreatedAt != 0 {
createAt = int64(streamResp.Response.CreatedAt)
}
if streamResp.Response.Usage != nil {
if streamResp.Response.Usage.InputTokens != 0 {
usage.PromptTokens = streamResp.Response.Usage.InputTokens
usage.InputTokens = streamResp.Response.Usage.InputTokens
}
if streamResp.Response.Usage.OutputTokens != 0 {
usage.CompletionTokens = streamResp.Response.Usage.OutputTokens
usage.OutputTokens = streamResp.Response.Usage.OutputTokens
}
if streamResp.Response.Usage.TotalTokens != 0 {
usage.TotalTokens = streamResp.Response.Usage.TotalTokens
} else {
usage.TotalTokens = usage.PromptTokens + usage.CompletionTokens
}
if streamResp.Response.Usage.InputTokensDetails != nil {
usage.PromptTokensDetails.CachedTokens = streamResp.Response.Usage.InputTokensDetails.CachedTokens
usage.PromptTokensDetails.ImageTokens = streamResp.Response.Usage.InputTokensDetails.ImageTokens
usage.PromptTokensDetails.AudioTokens = streamResp.Response.Usage.InputTokensDetails.AudioTokens
}
if streamResp.Response.Usage.CompletionTokenDetails.ReasoningTokens != 0 {
usage.CompletionTokenDetails.ReasoningTokens = streamResp.Response.Usage.CompletionTokenDetails.ReasoningTokens
}
}
}
if !sendStartIfNeeded() {
return false
}
if !sentStop {
if info.RelayFormat == types.RelayFormatClaude && info.ClaudeConvertInfo != nil {
info.ClaudeConvertInfo.Usage = usage
}
finishReason := "stop"
if sawToolCall && outputText.Len() == 0 {
finishReason = "tool_calls"
}
stop := helper.GenerateStopResponse(responseId, createAt, model, finishReason)
if !sendChatChunk(stop) {
return false
}
sentStop = true
}
case "response.error", "response.failed":
if streamResp.Response != nil {
if oaiErr := streamResp.Response.GetOpenAIError(); oaiErr != nil && oaiErr.Type != "" {
streamErr = types.WithOpenAIError(*oaiErr, http.StatusInternalServerError)
return false
}
}
streamErr = types.NewOpenAIError(fmt.Errorf("responses stream error: %s", streamResp.Type), types.ErrorCodeBadResponse, http.StatusInternalServerError)
return false
default:
}
return true
})
if streamErr != nil {
return nil, streamErr
}
if usage.TotalTokens == 0 {
usage = service.ResponseText2Usage(c, usageText.String(), info.UpstreamModelName, info.GetEstimatePromptTokens())
}
if !sentStart {
if !sendChatChunk(helper.GenerateStartEmptyResponse(responseId, createAt, model, nil)) {
return nil, streamErr
}
}
if !sentStop {
if info.RelayFormat == types.RelayFormatClaude && info.ClaudeConvertInfo != nil {
info.ClaudeConvertInfo.Usage = usage
}
finishReason := "stop"
if sawToolCall && outputText.Len() == 0 {
finishReason = "tool_calls"
}
stop := helper.GenerateStopResponse(responseId, createAt, model, finishReason)
if !sendChatChunk(stop) {
return nil, streamErr
}
}
if info.RelayFormat == types.RelayFormatOpenAI && info.ShouldIncludeUsage && usage != nil {
if err := helper.ObjectData(c, helper.GenerateFinalUsageResponse(responseId, createAt, model, *usage)); err != nil {
return nil, types.NewOpenAIError(err, types.ErrorCodeBadResponse, http.StatusInternalServerError)
}
}
if info.RelayFormat == types.RelayFormatOpenAI {
helper.Done(c)
}
return usage, nil
}