Merge remote-tracking branch 'origin/main'

This commit is contained in:
CaIon
2025-05-07 16:16:19 +08:00
17 changed files with 575 additions and 208 deletions

View File

@@ -429,7 +429,7 @@ func (a *Adaptor) DoResponse(c *gin.Context, resp *http.Response, info *relaycom
if info.IsStream {
err, usage = OaiResponsesStreamHandler(c, resp, info)
} else {
err, usage = OpenaiResponsesHandler(c, resp, info)
err, usage = OaiResponsesHandler(c, resp, info)
}
default:
if info.IsStream {

View File

@@ -187,3 +187,10 @@ func handleFinalResponse(c *gin.Context, info *relaycommon.RelayInfo, lastStream
}
}
}
func sendResponsesStreamData(c *gin.Context, streamResponse dto.ResponsesStreamResponse, data string) {
if data == "" {
return
}
helper.ResponseChunkData(c, streamResponse, data)
}

View File

@@ -644,102 +644,3 @@ func OpenaiHandlerWithUsage(c *gin.Context, resp *http.Response, info *relaycomm
}
return nil, &usageResp.Usage
}
func OpenaiResponsesHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, *dto.Usage) {
// read response body
var responsesResponse dto.OpenAIResponsesResponse
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return service.OpenAIErrorWrapper(err, "read_response_body_failed", http.StatusInternalServerError), nil
}
err = resp.Body.Close()
if err != nil {
return service.OpenAIErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), nil
}
err = common.DecodeJson(responseBody, &responsesResponse)
if err != nil {
return service.OpenAIErrorWrapper(err, "unmarshal_response_body_failed", http.StatusInternalServerError), nil
}
if responsesResponse.Error != nil {
return &dto.OpenAIErrorWithStatusCode{
Error: dto.OpenAIError{
Message: responsesResponse.Error.Message,
Type: "openai_error",
Code: responsesResponse.Error.Code,
},
StatusCode: resp.StatusCode,
}, nil
}
// reset response body
resp.Body = io.NopCloser(bytes.NewBuffer(responseBody))
// We shouldn't set the header before we parse the response body, because the parse part may fail.
// And then we will have to send an error response, but in this case, the header has already been set.
// So the httpClient will be confused by the response.
// For example, Postman will report error, and we cannot check the response at all.
for k, v := range resp.Header {
c.Writer.Header().Set(k, v[0])
}
c.Writer.WriteHeader(resp.StatusCode)
// copy response body
_, err = io.Copy(c.Writer, resp.Body)
if err != nil {
common.SysError("error copying response body: " + err.Error())
}
resp.Body.Close()
// compute usage
usage := dto.Usage{}
usage.PromptTokens = responsesResponse.Usage.InputTokens
usage.CompletionTokens = responsesResponse.Usage.OutputTokens
usage.TotalTokens = responsesResponse.Usage.TotalTokens
return nil, &usage
}
func OaiResponsesStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, *dto.Usage) {
if resp == nil || resp.Body == nil {
common.LogError(c, "invalid response or response body")
return service.OpenAIErrorWrapper(fmt.Errorf("invalid response"), "invalid_response", http.StatusInternalServerError), nil
}
var usage = &dto.Usage{}
var responseTextBuilder strings.Builder
helper.StreamScannerHandler(c, resp, info, func(data string) bool {
// 检查当前数据是否包含 completed 状态和 usage 信息
var streamResponse dto.ResponsesStreamResponse
if err := common.DecodeJsonStr(data, &streamResponse); err == nil {
sendResponsesStreamData(c, streamResponse, data)
switch streamResponse.Type {
case "response.completed":
usage.PromptTokens = streamResponse.Response.Usage.InputTokens
usage.CompletionTokens = streamResponse.Response.Usage.OutputTokens
usage.TotalTokens = streamResponse.Response.Usage.TotalTokens
case "response.output_text.delta":
// 处理输出文本
responseTextBuilder.WriteString(streamResponse.Delta)
}
}
return true
})
if usage.CompletionTokens == 0 {
// 计算输出文本的 token 数量
tempStr := responseTextBuilder.String()
if len(tempStr) > 0 {
// 非正常结束,使用输出文本的 token 数量
completionTokens, _ := service.CountTextToken(tempStr, info.UpstreamModelName)
usage.CompletionTokens = completionTokens
}
}
return nil, usage
}
func sendResponsesStreamData(c *gin.Context, streamResponse dto.ResponsesStreamResponse, data string) {
if data == "" {
return
}
helper.ResponseChunkData(c, streamResponse, data)
}

View File

@@ -0,0 +1,119 @@
package openai
import (
"bytes"
"fmt"
"io"
"net/http"
"one-api/common"
"one-api/dto"
relaycommon "one-api/relay/common"
"one-api/relay/helper"
"one-api/service"
"strings"
"github.com/gin-gonic/gin"
)
func OaiResponsesHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, *dto.Usage) {
// read response body
var responsesResponse dto.OpenAIResponsesResponse
responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return service.OpenAIErrorWrapper(err, "read_response_body_failed", http.StatusInternalServerError), nil
}
err = resp.Body.Close()
if err != nil {
return service.OpenAIErrorWrapper(err, "close_response_body_failed", http.StatusInternalServerError), nil
}
err = common.DecodeJson(responseBody, &responsesResponse)
if err != nil {
return service.OpenAIErrorWrapper(err, "unmarshal_response_body_failed", http.StatusInternalServerError), nil
}
if responsesResponse.Error != nil {
return &dto.OpenAIErrorWithStatusCode{
Error: dto.OpenAIError{
Message: responsesResponse.Error.Message,
Type: "openai_error",
Code: responsesResponse.Error.Code,
},
StatusCode: resp.StatusCode,
}, nil
}
// reset response body
resp.Body = io.NopCloser(bytes.NewBuffer(responseBody))
// We shouldn't set the header before we parse the response body, because the parse part may fail.
// And then we will have to send an error response, but in this case, the header has already been set.
// So the httpClient will be confused by the response.
// For example, Postman will report error, and we cannot check the response at all.
for k, v := range resp.Header {
c.Writer.Header().Set(k, v[0])
}
c.Writer.WriteHeader(resp.StatusCode)
// copy response body
_, err = io.Copy(c.Writer, resp.Body)
if err != nil {
common.SysError("error copying response body: " + err.Error())
}
resp.Body.Close()
// compute usage
usage := dto.Usage{}
usage.PromptTokens = responsesResponse.Usage.InputTokens
usage.CompletionTokens = responsesResponse.Usage.OutputTokens
usage.TotalTokens = responsesResponse.Usage.TotalTokens
// 解析 Tools 用量
for _, tool := range responsesResponse.Tools {
info.ResponsesUsageInfo.BuiltInTools[tool.Type].CallCount++
}
return nil, &usage
}
func OaiResponsesStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo) (*dto.OpenAIErrorWithStatusCode, *dto.Usage) {
if resp == nil || resp.Body == nil {
common.LogError(c, "invalid response or response body")
return service.OpenAIErrorWrapper(fmt.Errorf("invalid response"), "invalid_response", http.StatusInternalServerError), nil
}
var usage = &dto.Usage{}
var responseTextBuilder strings.Builder
helper.StreamScannerHandler(c, resp, info, func(data string) bool {
// 检查当前数据是否包含 completed 状态和 usage 信息
var streamResponse dto.ResponsesStreamResponse
if err := common.DecodeJsonStr(data, &streamResponse); err == nil {
sendResponsesStreamData(c, streamResponse, data)
switch streamResponse.Type {
case "response.completed":
usage.PromptTokens = streamResponse.Response.Usage.InputTokens
usage.CompletionTokens = streamResponse.Response.Usage.OutputTokens
usage.TotalTokens = streamResponse.Response.Usage.TotalTokens
case "response.output_text.delta":
// 处理输出文本
responseTextBuilder.WriteString(streamResponse.Delta)
case dto.ResponsesOutputTypeItemDone:
// 函数调用处理
if streamResponse.Item != nil {
switch streamResponse.Item.Type {
case dto.BuildInCallWebSearchCall:
info.ResponsesUsageInfo.BuiltInTools[dto.BuildInToolWebSearchPreview].CallCount++
}
}
}
}
return true
})
if usage.CompletionTokens == 0 {
// 计算输出文本的 token 数量
tempStr := responseTextBuilder.String()
if len(tempStr) > 0 {
// 非正常结束,使用输出文本的 token 数量
completionTokens, _ := service.CountTextToken(tempStr, info.UpstreamModelName)
usage.CompletionTokens = completionTokens
}
}
return nil, usage
}

View File

@@ -11,8 +11,8 @@ import (
"one-api/relay/channel/claude"
"one-api/relay/channel/gemini"
"one-api/relay/channel/openai"
"one-api/setting/model_setting"
relaycommon "one-api/relay/common"
"one-api/setting/model_setting"
"strings"
"github.com/gin-gonic/gin"

View File

@@ -36,6 +36,7 @@ type ClaudeConvertInfo struct {
const (
RelayFormatOpenAI = "openai"
RelayFormatClaude = "claude"
RelayFormatGemini = "gemini"
)
type RerankerInfo struct {
@@ -43,6 +44,16 @@ type RerankerInfo struct {
ReturnDocuments bool
}
type BuildInToolInfo struct {
ToolName string
CallCount int
SearchContextSize string
}
type ResponsesUsageInfo struct {
BuiltInTools map[string]*BuildInToolInfo
}
type RelayInfo struct {
ChannelType int
ChannelId int
@@ -90,6 +101,7 @@ type RelayInfo struct {
ThinkingContentInfo
*ClaudeConvertInfo
*RerankerInfo
*ResponsesUsageInfo
}
// 定义支持流式选项的通道类型
@@ -135,6 +147,31 @@ func GenRelayInfoRerank(c *gin.Context, req *dto.RerankRequest) *RelayInfo {
return info
}
func GenRelayInfoResponses(c *gin.Context, req *dto.OpenAIResponsesRequest) *RelayInfo {
info := GenRelayInfo(c)
info.RelayMode = relayconstant.RelayModeResponses
info.ResponsesUsageInfo = &ResponsesUsageInfo{
BuiltInTools: make(map[string]*BuildInToolInfo),
}
if len(req.Tools) > 0 {
for _, tool := range req.Tools {
info.ResponsesUsageInfo.BuiltInTools[tool.Type] = &BuildInToolInfo{
ToolName: tool.Type,
CallCount: 0,
}
switch tool.Type {
case dto.BuildInToolWebSearchPreview:
if tool.SearchContextSize == "" {
tool.SearchContextSize = "medium"
}
info.ResponsesUsageInfo.BuiltInTools[tool.Type].SearchContextSize = tool.SearchContextSize
}
}
}
info.IsStream = req.Stream
return info
}
func GenRelayInfo(c *gin.Context) *RelayInfo {
channelType := c.GetInt("channel_type")
channelId := c.GetInt("channel_id")

View File

@@ -2,9 +2,11 @@ package helper
import (
"encoding/json"
"errors"
"fmt"
"github.com/gin-gonic/gin"
"one-api/relay/common"
"github.com/gin-gonic/gin"
)
func ModelMappedHelper(c *gin.Context, info *common.RelayInfo) error {
@@ -16,9 +18,36 @@ func ModelMappedHelper(c *gin.Context, info *common.RelayInfo) error {
if err != nil {
return fmt.Errorf("unmarshal_model_mapping_failed")
}
if modelMap[info.OriginModelName] != "" {
info.UpstreamModelName = modelMap[info.OriginModelName]
info.IsModelMapped = true
// 支持链式模型重定向,最终使用链尾的模型
currentModel := info.OriginModelName
visitedModels := map[string]bool{
currentModel: true,
}
for {
if mappedModel, exists := modelMap[currentModel]; exists && mappedModel != "" {
// 模型重定向循环检测,避免无限循环
if visitedModels[mappedModel] {
if mappedModel == currentModel {
if currentModel == info.OriginModelName {
info.IsModelMapped = false
return nil
} else {
info.IsModelMapped = true
break
}
}
return errors.New("model_mapping_contains_cycle")
}
visitedModels[mappedModel] = true
currentModel = mappedModel
info.IsModelMapped = true
} else {
break
}
}
if info.IsModelMapped {
info.UpstreamModelName = currentModel
}
}
return nil

View File

@@ -19,7 +19,7 @@ import (
"github.com/gin-gonic/gin"
)
func getAndValidateResponsesRequest(c *gin.Context, relayInfo *relaycommon.RelayInfo) (*dto.OpenAIResponsesRequest, error) {
func getAndValidateResponsesRequest(c *gin.Context) (*dto.OpenAIResponsesRequest, error) {
request := &dto.OpenAIResponsesRequest{}
err := common.UnmarshalBodyReusable(c, request)
if err != nil {
@@ -31,13 +31,11 @@ func getAndValidateResponsesRequest(c *gin.Context, relayInfo *relaycommon.Relay
if len(request.Input) == 0 {
return nil, errors.New("input is required")
}
relayInfo.IsStream = request.Stream
return request, nil
}
func checkInputSensitive(textRequest *dto.OpenAIResponsesRequest, info *relaycommon.RelayInfo) ([]string, error) {
sensitiveWords, err := service.CheckSensitiveInput(textRequest.Input)
return sensitiveWords, err
}
@@ -49,12 +47,14 @@ func getInputTokens(req *dto.OpenAIResponsesRequest, info *relaycommon.RelayInfo
}
func ResponsesHelper(c *gin.Context) (openaiErr *dto.OpenAIErrorWithStatusCode) {
relayInfo := relaycommon.GenRelayInfo(c)
req, err := getAndValidateResponsesRequest(c, relayInfo)
req, err := getAndValidateResponsesRequest(c)
if err != nil {
common.LogError(c, fmt.Sprintf("getAndValidateResponsesRequest error: %s", err.Error()))
return service.OpenAIErrorWrapperLocal(err, "invalid_responses_request", http.StatusBadRequest)
}
relayInfo := relaycommon.GenRelayInfoResponses(c, req)
if setting.ShouldCheckPromptSensitive() {
sensitiveWords, err := checkInputSensitive(req, relayInfo)
if err != nil {

View File

@@ -18,6 +18,7 @@ import (
"one-api/service"
"one-api/setting"
"one-api/setting/model_setting"
"one-api/setting/operation_setting"
"strings"
"time"
@@ -358,6 +359,34 @@ func postConsumeQuota(ctx *gin.Context, relayInfo *relaycommon.RelayInfo,
ratio := dModelRatio.Mul(dGroupRatio)
// openai web search 工具计费
var dWebSearchQuota decimal.Decimal
var webSearchPrice float64
if relayInfo.ResponsesUsageInfo != nil {
if webSearchTool, exists := relayInfo.ResponsesUsageInfo.BuiltInTools[dto.BuildInToolWebSearchPreview]; exists && webSearchTool.CallCount > 0 {
// 计算 web search 调用的配额 (配额 = 价格 * 调用次数 / 1000)
webSearchPrice = operation_setting.GetWebSearchPricePerThousand(modelName, webSearchTool.SearchContextSize)
dWebSearchQuota = decimal.NewFromFloat(webSearchPrice).
Mul(decimal.NewFromInt(int64(webSearchTool.CallCount))).
Div(decimal.NewFromInt(1000))
extraContent += fmt.Sprintf("Web Search 调用 %d 次,上下文大小 %s调用花费 $%s",
webSearchTool.CallCount, webSearchTool.SearchContextSize, dWebSearchQuota.String())
}
}
// file search tool 计费
var dFileSearchQuota decimal.Decimal
var fileSearchPrice float64
if relayInfo.ResponsesUsageInfo != nil {
if fileSearchTool, exists := relayInfo.ResponsesUsageInfo.BuiltInTools[dto.BuildInToolFileSearch]; exists && fileSearchTool.CallCount > 0 {
fileSearchPrice = operation_setting.GetFileSearchPricePerThousand()
dFileSearchQuota = decimal.NewFromFloat(fileSearchPrice).
Mul(decimal.NewFromInt(int64(fileSearchTool.CallCount))).
Div(decimal.NewFromInt(1000))
extraContent += fmt.Sprintf("File Search 调用 %d 次,调用花费 $%s",
fileSearchTool.CallCount, dFileSearchQuota.String())
}
}
var quotaCalculateDecimal decimal.Decimal
if !priceData.UsePrice {
nonCachedTokens := dPromptTokens.Sub(dCacheTokens)
@@ -380,6 +409,9 @@ func postConsumeQuota(ctx *gin.Context, relayInfo *relaycommon.RelayInfo,
} else {
quotaCalculateDecimal = dModelPrice.Mul(dQuotaPerUnit).Mul(dGroupRatio)
}
// 添加 responses tools call 调用的配额
quotaCalculateDecimal = quotaCalculateDecimal.Add(dWebSearchQuota)
quotaCalculateDecimal = quotaCalculateDecimal.Add(dFileSearchQuota)
quota := int(quotaCalculateDecimal.Round(0).IntPart())
totalTokens := promptTokens + completionTokens
@@ -430,6 +462,20 @@ func postConsumeQuota(ctx *gin.Context, relayInfo *relaycommon.RelayInfo,
other["image_ratio"] = imageRatio
other["image_output"] = imageTokens
}
if !dWebSearchQuota.IsZero() && relayInfo.ResponsesUsageInfo != nil {
if webSearchTool, exists := relayInfo.ResponsesUsageInfo.BuiltInTools[dto.BuildInToolWebSearchPreview]; exists {
other["web_search"] = true
other["web_search_call_count"] = webSearchTool.CallCount
other["web_search_price"] = webSearchPrice
}
}
if !dFileSearchQuota.IsZero() && relayInfo.ResponsesUsageInfo != nil {
if fileSearchTool, exists := relayInfo.ResponsesUsageInfo.BuiltInTools[dto.BuildInToolFileSearch]; exists {
other["file_search"] = true
other["file_search_call_count"] = fileSearchTool.CallCount
other["file_search_price"] = fileSearchPrice
}
}
model.RecordConsumeLog(ctx, relayInfo.UserId, relayInfo.ChannelId, promptTokens, completionTokens, logModel,
tokenName, quota, logContent, relayInfo.TokenId, userQuota, int(useTimeSeconds), relayInfo.IsStream, relayInfo.Group, other)
}