Merge branch 'upstream-main' into feature/improve-param-override

# Conflicts:
#	relay/channel/api_request_test.go
#	relay/common/override_test.go
#	web/src/components/table/channels/modals/EditChannelModal.jsx
This commit is contained in:
Seefs
2026-02-25 13:39:54 +08:00
124 changed files with 7729 additions and 1971 deletions

View File

@@ -455,72 +455,147 @@ func RelayNotFound(c *gin.Context) {
})
}
func RelayTask(c *gin.Context) {
retryTimes := common.RetryTimes
channelId := c.GetInt("channel_id")
c.Set("use_channel", []string{fmt.Sprintf("%d", channelId)})
func RelayTaskFetch(c *gin.Context) {
relayInfo, err := relaycommon.GenRelayInfo(c, types.RelayFormatTask, nil, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, &dto.TaskError{
Code: "gen_relay_info_failed",
Message: err.Error(),
StatusCode: http.StatusInternalServerError,
})
return
}
taskErr := taskRelayHandler(c, relayInfo)
if taskErr == nil {
retryTimes = 0
if taskErr := relay.RelayTaskFetch(c, relayInfo.RelayMode); taskErr != nil {
respondTaskError(c, taskErr)
}
}
func RelayTask(c *gin.Context) {
relayInfo, err := relaycommon.GenRelayInfo(c, types.RelayFormatTask, nil, nil)
if err != nil {
c.JSON(http.StatusInternalServerError, &dto.TaskError{
Code: "gen_relay_info_failed",
Message: err.Error(),
StatusCode: http.StatusInternalServerError,
})
return
}
if taskErr := relay.ResolveOriginTask(c, relayInfo); taskErr != nil {
respondTaskError(c, taskErr)
return
}
var result *relay.TaskSubmitResult
var taskErr *dto.TaskError
defer func() {
if taskErr != nil && relayInfo.Billing != nil {
relayInfo.Billing.Refund(c)
}
}()
retryParam := &service.RetryParam{
Ctx: c,
TokenGroup: relayInfo.TokenGroup,
ModelName: relayInfo.OriginModelName,
Retry: common.GetPointer(0),
}
for ; shouldRetryTaskRelay(c, channelId, taskErr, retryTimes) && retryParam.GetRetry() < retryTimes; retryParam.IncreaseRetry() {
channel, newAPIError := getChannel(c, relayInfo, retryParam)
if newAPIError != nil {
logger.LogError(c, fmt.Sprintf("CacheGetRandomSatisfiedChannel failed: %s", newAPIError.Error()))
taskErr = service.TaskErrorWrapperLocal(newAPIError.Err, "get_channel_failed", http.StatusInternalServerError)
break
}
channelId = channel.Id
useChannel := c.GetStringSlice("use_channel")
useChannel = append(useChannel, fmt.Sprintf("%d", channelId))
c.Set("use_channel", useChannel)
logger.LogInfo(c, fmt.Sprintf("using channel #%d to retry (remain times %d)", channel.Id, retryParam.GetRetry()))
//middleware.SetupContextForSelectedChannel(c, channel, originalModel)
bodyStorage, err := common.GetBodyStorage(c)
if err != nil {
if common.IsRequestBodyTooLargeError(err) || errors.Is(err, common.ErrRequestBodyTooLarge) {
taskErr = service.TaskErrorWrapperLocal(err, "read_request_body_failed", http.StatusRequestEntityTooLarge)
for ; retryParam.GetRetry() <= common.RetryTimes; retryParam.IncreaseRetry() {
var channel *model.Channel
if lockedCh, ok := relayInfo.LockedChannel.(*model.Channel); ok && lockedCh != nil {
channel = lockedCh
if retryParam.GetRetry() > 0 {
if setupErr := middleware.SetupContextForSelectedChannel(c, channel, relayInfo.OriginModelName); setupErr != nil {
taskErr = service.TaskErrorWrapperLocal(setupErr.Err, "setup_locked_channel_failed", http.StatusInternalServerError)
break
}
}
} else {
var channelErr *types.NewAPIError
channel, channelErr = getChannel(c, relayInfo, retryParam)
if channelErr != nil {
logger.LogError(c, channelErr.Error())
taskErr = service.TaskErrorWrapperLocal(channelErr.Err, "get_channel_failed", http.StatusInternalServerError)
break
}
}
addUsedChannel(c, channel.Id)
bodyStorage, bodyErr := common.GetBodyStorage(c)
if bodyErr != nil {
if common.IsRequestBodyTooLargeError(bodyErr) || errors.Is(bodyErr, common.ErrRequestBodyTooLarge) {
taskErr = service.TaskErrorWrapperLocal(bodyErr, "read_request_body_failed", http.StatusRequestEntityTooLarge)
} else {
taskErr = service.TaskErrorWrapperLocal(err, "read_request_body_failed", http.StatusBadRequest)
taskErr = service.TaskErrorWrapperLocal(bodyErr, "read_request_body_failed", http.StatusBadRequest)
}
break
}
c.Request.Body = io.NopCloser(bodyStorage)
taskErr = taskRelayHandler(c, relayInfo)
result, taskErr = relay.RelayTaskSubmit(c, relayInfo)
if taskErr == nil {
break
}
if !taskErr.LocalError {
processChannelError(c,
*types.NewChannelError(channel.Id, channel.Type, channel.Name, channel.ChannelInfo.IsMultiKey,
common.GetContextKeyString(c, constant.ContextKeyChannelKey), channel.GetAutoBan()),
types.NewOpenAIError(taskErr.Error, types.ErrorCodeBadResponseStatusCode, taskErr.StatusCode))
}
if !shouldRetryTaskRelay(c, channel.Id, taskErr, common.RetryTimes-retryParam.GetRetry()) {
break
}
}
useChannel := c.GetStringSlice("use_channel")
if len(useChannel) > 1 {
retryLogStr := fmt.Sprintf("重试:%s", strings.Trim(strings.Join(strings.Fields(fmt.Sprint(useChannel)), "->"), "[]"))
logger.LogInfo(c, retryLogStr)
}
if taskErr != nil {
if taskErr.StatusCode == http.StatusTooManyRequests {
taskErr.Message = "当前分组上游负载已饱和,请稍后再试"
// ── 成功:结算 + 日志 + 插入任务 ──
if taskErr == nil {
if settleErr := service.SettleBilling(c, relayInfo, result.Quota); settleErr != nil {
common.SysError("settle task billing error: " + settleErr.Error())
}
c.JSON(taskErr.StatusCode, taskErr)
service.LogTaskConsumption(c, relayInfo)
task := model.InitTask(result.Platform, relayInfo)
task.PrivateData.UpstreamTaskID = result.UpstreamTaskID
task.PrivateData.BillingSource = relayInfo.BillingSource
task.PrivateData.SubscriptionId = relayInfo.SubscriptionId
task.PrivateData.TokenId = relayInfo.TokenId
task.PrivateData.BillingContext = &model.TaskBillingContext{
ModelPrice: relayInfo.PriceData.ModelPrice,
GroupRatio: relayInfo.PriceData.GroupRatioInfo.GroupRatio,
ModelRatio: relayInfo.PriceData.ModelRatio,
OtherRatios: relayInfo.PriceData.OtherRatios,
OriginModelName: relayInfo.OriginModelName,
PerCallBilling: common.StringsContains(constant.TaskPricePatches, relayInfo.OriginModelName),
}
task.Quota = result.Quota
task.Data = result.TaskData
task.Action = relayInfo.Action
if insertErr := task.Insert(); insertErr != nil {
common.SysError("insert task error: " + insertErr.Error())
}
}
if taskErr != nil {
respondTaskError(c, taskErr)
}
}
func taskRelayHandler(c *gin.Context, relayInfo *relaycommon.RelayInfo) *dto.TaskError {
var err *dto.TaskError
switch relayInfo.RelayMode {
case relayconstant.RelayModeSunoFetch, relayconstant.RelayModeSunoFetchByID, relayconstant.RelayModeVideoFetchByID:
err = relay.RelayTaskFetch(c, relayInfo.RelayMode)
default:
err = relay.RelayTaskSubmit(c, relayInfo)
// respondTaskError 统一输出 Task 错误响应(含 429 限流提示改写)
func respondTaskError(c *gin.Context, taskErr *dto.TaskError) {
if taskErr.StatusCode == http.StatusTooManyRequests {
taskErr.Message = "当前分组上游负载已饱和,请稍后再试"
}
return err
c.JSON(taskErr.StatusCode, taskErr)
}
func shouldRetryTaskRelay(c *gin.Context, channelId int, taskErr *dto.TaskError, retryTimes int) bool {
@@ -544,7 +619,7 @@ func shouldRetryTaskRelay(c *gin.Context, channelId int, taskErr *dto.TaskError,
}
if taskErr.StatusCode/100 == 5 {
// 超时不重试
if taskErr.StatusCode == 504 || taskErr.StatusCode == 524 {
if operation_setting.IsAlwaysSkipRetryStatusCode(taskErr.StatusCode) {
return false
}
return true