feat(channel): enhance channel status management

This commit is contained in:
CaIon
2025-07-10 17:49:53 +08:00
parent a9e03e6172
commit cd8c23c0ab
16 changed files with 363 additions and 119 deletions

View File

@@ -452,14 +452,14 @@ func updateAllChannelsBalance() error {
//if channel.Type != common.ChannelTypeOpenAI && channel.Type != common.ChannelTypeCustom {
// continue
//}
balance, err := updateChannelBalance(channel)
_, err := updateChannelBalance(channel)
if err != nil {
continue
} else {
// err is nil & balance <= 0 means quota is used up
if balance <= 0 {
service.DisableChannel(channel.Id, channel.Name, "余额不足")
}
//if balance <= 0 {
// service.DisableChannel(channel.Id, channel.Name, "余额不足")
//}
}
time.Sleep(common.RequestInterval)
}

View File

@@ -30,22 +30,43 @@ import (
"github.com/gin-gonic/gin"
)
func testChannel(channel *model.Channel, testModel string) (err error, newAPIError *types.NewAPIError) {
type testResult struct {
context *gin.Context
localErr error
newAPIError *types.NewAPIError
}
func testChannel(channel *model.Channel, testModel string) testResult {
tik := time.Now()
if channel.Type == constant.ChannelTypeMidjourney {
return errors.New("midjourney channel test is not supported"), nil
return testResult{
localErr: errors.New("midjourney channel test is not supported"),
newAPIError: nil,
}
}
if channel.Type == constant.ChannelTypeMidjourneyPlus {
return errors.New("midjourney plus channel test is not supported"), nil
return testResult{
localErr: errors.New("midjourney plus channel test is not supported"),
newAPIError: nil,
}
}
if channel.Type == constant.ChannelTypeSunoAPI {
return errors.New("suno channel test is not supported"), nil
return testResult{
localErr: errors.New("suno channel test is not supported"),
newAPIError: nil,
}
}
if channel.Type == constant.ChannelTypeKling {
return errors.New("kling channel test is not supported"), nil
return testResult{
localErr: errors.New("kling channel test is not supported"),
newAPIError: nil,
}
}
if channel.Type == constant.ChannelTypeJimeng {
return errors.New("jimeng channel test is not supported"), nil
return testResult{
localErr: errors.New("jimeng channel test is not supported"),
newAPIError: nil,
}
}
w := httptest.NewRecorder()
c, _ := gin.CreateTestContext(w)
@@ -82,7 +103,10 @@ func testChannel(channel *model.Channel, testModel string) (err error, newAPIErr
cache, err := model.GetUserCache(1)
if err != nil {
return err, nil
return testResult{
localErr: err,
newAPIError: nil,
}
}
cache.WriteContext(c)
@@ -93,20 +117,35 @@ func testChannel(channel *model.Channel, testModel string) (err error, newAPIErr
group, _ := model.GetUserGroup(1, false)
c.Set("group", group)
middleware.SetupContextForSelectedChannel(c, channel, testModel)
newAPIError := middleware.SetupContextForSelectedChannel(c, channel, testModel)
if newAPIError != nil {
return testResult{
context: c,
localErr: newAPIError,
newAPIError: newAPIError,
}
}
info := relaycommon.GenRelayInfo(c)
err = helper.ModelMappedHelper(c, info, nil)
if err != nil {
return err, types.NewError(err, types.ErrorCodeChannelModelMappedError)
return testResult{
context: c,
localErr: err,
newAPIError: types.NewError(err, types.ErrorCodeChannelModelMappedError),
}
}
testModel = info.UpstreamModelName
apiType, _ := common.ChannelType2APIType(channel.Type)
adaptor := relay.GetAdaptor(apiType)
if adaptor == nil {
return fmt.Errorf("invalid api type: %d, adaptor is nil", apiType), types.NewError(fmt.Errorf("invalid api type: %d, adaptor is nil", apiType), types.ErrorCodeInvalidApiType)
return testResult{
context: c,
localErr: fmt.Errorf("invalid api type: %d, adaptor is nil", apiType),
newAPIError: types.NewError(fmt.Errorf("invalid api type: %d, adaptor is nil", apiType), types.ErrorCodeInvalidApiType),
}
}
request := buildTestRequest(testModel)
@@ -117,45 +156,77 @@ func testChannel(channel *model.Channel, testModel string) (err error, newAPIErr
priceData, err := helper.ModelPriceHelper(c, info, 0, int(request.MaxTokens))
if err != nil {
return err, types.NewError(err, types.ErrorCodeModelPriceError)
return testResult{
context: c,
localErr: err,
newAPIError: types.NewError(err, types.ErrorCodeModelPriceError),
}
}
adaptor.Init(info)
convertedRequest, err := adaptor.ConvertOpenAIRequest(c, info, request)
if err != nil {
return err, types.NewError(err, types.ErrorCodeConvertRequestFailed)
return testResult{
context: c,
localErr: err,
newAPIError: types.NewError(err, types.ErrorCodeConvertRequestFailed),
}
}
jsonData, err := json.Marshal(convertedRequest)
if err != nil {
return err, types.NewError(err, types.ErrorCodeJsonMarshalFailed)
return testResult{
context: c,
localErr: err,
newAPIError: types.NewError(err, types.ErrorCodeJsonMarshalFailed),
}
}
requestBody := bytes.NewBuffer(jsonData)
c.Request.Body = io.NopCloser(requestBody)
resp, err := adaptor.DoRequest(c, info, requestBody)
if err != nil {
return err, types.NewError(err, types.ErrorCodeDoRequestFailed)
return testResult{
context: c,
localErr: err,
newAPIError: types.NewError(err, types.ErrorCodeDoRequestFailed),
}
}
var httpResp *http.Response
if resp != nil {
httpResp = resp.(*http.Response)
if httpResp.StatusCode != http.StatusOK {
err := service.RelayErrorHandler(httpResp, true)
return err, types.NewError(err, types.ErrorCodeBadResponse)
return testResult{
context: c,
localErr: err,
newAPIError: types.NewError(err, types.ErrorCodeBadResponse),
}
}
}
usageA, respErr := adaptor.DoResponse(c, httpResp, info)
if respErr != nil {
return respErr, respErr
return testResult{
context: c,
localErr: respErr,
newAPIError: respErr,
}
}
if usageA == nil {
return errors.New("usage is nil"), types.NewError(errors.New("usage is nil"), types.ErrorCodeBadResponseBody)
return testResult{
context: c,
localErr: errors.New("usage is nil"),
newAPIError: types.NewError(errors.New("usage is nil"), types.ErrorCodeBadResponseBody),
}
}
usage := usageA.(*dto.Usage)
result := w.Result()
respBody, err := io.ReadAll(result.Body)
if err != nil {
return err, types.NewError(err, types.ErrorCodeReadResponseBodyFailed)
return testResult{
context: c,
localErr: err,
newAPIError: types.NewError(err, types.ErrorCodeReadResponseBodyFailed),
}
}
info.PromptTokens = usage.PromptTokens
@@ -188,7 +259,11 @@ func testChannel(channel *model.Channel, testModel string) (err error, newAPIErr
Other: other,
})
common.SysLog(fmt.Sprintf("testing channel #%d, response: \n%s", channel.Id, string(respBody)))
return nil, nil
return testResult{
context: c,
localErr: nil,
newAPIError: nil,
}
}
func buildTestRequest(model string) *dto.GeneralOpenAIRequest {
@@ -247,15 +322,23 @@ func TestChannel(c *gin.Context) {
}
testModel := c.Query("model")
tik := time.Now()
_, newAPIError := testChannel(channel, testModel)
result := testChannel(channel, testModel)
if result.localErr != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": result.localErr.Error(),
"time": 0.0,
})
return
}
tok := time.Now()
milliseconds := tok.Sub(tik).Milliseconds()
go channel.UpdateResponseTime(milliseconds)
consumedTime := float64(milliseconds) / 1000.0
if newAPIError != nil {
if result.newAPIError != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": newAPIError.Error(),
"message": result.newAPIError.Error(),
"time": consumedTime,
})
return
@@ -280,9 +363,9 @@ func testAllChannels(notify bool) error {
}
testAllChannelsRunning = true
testAllChannelsLock.Unlock()
channels, err := model.GetAllChannels(0, 0, true, false)
if err != nil {
return err
channels, getChannelErr := model.GetAllChannels(0, 0, true, false)
if getChannelErr != nil {
return getChannelErr
}
var disableThreshold = int64(common.ChannelDisableThreshold * 1000)
if disableThreshold == 0 {
@@ -299,30 +382,34 @@ func testAllChannels(notify bool) error {
for _, channel := range channels {
isChannelEnabled := channel.Status == common.ChannelStatusEnabled
tik := time.Now()
err, newAPIError := testChannel(channel, "")
result := testChannel(channel, "")
tok := time.Now()
milliseconds := tok.Sub(tik).Milliseconds()
shouldBanChannel := false
newAPIError := result.newAPIError
// request error disables the channel
if err != nil {
shouldBanChannel = service.ShouldDisableChannel(channel.Type, newAPIError)
if newAPIError != nil {
shouldBanChannel = service.ShouldDisableChannel(channel.Type, result.newAPIError)
}
if milliseconds > disableThreshold {
err = errors.New(fmt.Sprintf("响应时间 %.2fs 超过阈值 %.2fs", float64(milliseconds)/1000.0, float64(disableThreshold)/1000.0))
shouldBanChannel = true
// 当错误检查通过,才检查响应时间
if common.AutomaticDisableChannelEnabled && !shouldBanChannel {
if milliseconds > disableThreshold {
err := errors.New(fmt.Sprintf("响应时间 %.2fs 超过阈值 %.2fs", float64(milliseconds)/1000.0, float64(disableThreshold)/1000.0))
newAPIError = types.NewError(err, types.ErrorCodeChannelResponseTimeExceeded)
shouldBanChannel = true
}
}
// disable channel
if isChannelEnabled && shouldBanChannel && channel.GetAutoBan() {
service.DisableChannel(channel.Id, channel.Name, err.Error())
go processChannelError(result.context, *types.NewChannelError(channel.Id, channel.Type, channel.Name, channel.ChannelInfo.IsMultiKey, common.GetContextKeyString(result.context, constant.ContextKeyChannelKey), channel.GetAutoBan()), newAPIError)
}
// enable channel
if !isChannelEnabled && service.ShouldEnableChannel(err, newAPIError, channel.Status) {
service.EnableChannel(channel.Id, channel.Name)
if !isChannelEnabled && service.ShouldEnableChannel(newAPIError, channel.Status) {
service.EnableChannel(channel.Id, common.GetContextKeyString(result.context, constant.ContextKeyChannelKey), channel.Name)
}
channel.UpdateResponseTime(milliseconds)

View File

@@ -497,6 +497,7 @@ func AddChannel(c *gin.Context) {
})
return
}
addChannelRequest.Channel.ChannelInfo.MultiKeySize = len(array)
addChannelRequest.Channel.Key = strings.Join(array, "\n")
} else {
cleanKeys := make([]string, 0)
@@ -507,6 +508,7 @@ func AddChannel(c *gin.Context) {
key = strings.TrimSpace(key)
cleanKeys = append(cleanKeys, key)
}
addChannelRequest.Channel.ChannelInfo.MultiKeySize = len(cleanKeys)
addChannelRequest.Channel.Key = strings.Join(cleanKeys, "\n")
}
keys = []string{addChannelRequest.Channel.Key}

View File

@@ -80,7 +80,7 @@ func Relay(c *gin.Context) {
channel, err := getChannel(c, group, originalModel, i)
if err != nil {
common.LogError(c, err.Error())
newAPIError = types.NewError(err, types.ErrorCodeGetChannelFailed)
newAPIError = err
break
}
@@ -90,7 +90,7 @@ func Relay(c *gin.Context) {
return // 成功处理请求,直接返回
}
go processChannelError(c, channel.Id, channel.Type, channel.Name, channel.GetAutoBan(), newAPIError)
go processChannelError(c, *types.NewChannelError(channel.Id, channel.Type, channel.Name, channel.ChannelInfo.IsMultiKey, common.GetContextKeyString(c, constant.ContextKeyChannelKey), channel.GetAutoBan()), newAPIError)
if !shouldRetry(c, newAPIError, common.RetryTimes-i) {
break
@@ -103,10 +103,10 @@ func Relay(c *gin.Context) {
}
if newAPIError != nil {
if newAPIError.StatusCode == http.StatusTooManyRequests {
common.LogError(c, fmt.Sprintf("origin 429 error: %s", newAPIError.Error()))
newAPIError.SetMessage("当前分组上游负载已饱和,请稍后再试")
}
//if newAPIError.StatusCode == http.StatusTooManyRequests {
// common.LogError(c, fmt.Sprintf("origin 429 error: %s", newAPIError.Error()))
// newAPIError.SetMessage("当前分组上游负载已饱和,请稍后再试")
//}
newAPIError.SetMessage(common.MessageWithRequestId(newAPIError.Error(), requestId))
c.JSON(newAPIError.StatusCode, gin.H{
"error": newAPIError.ToOpenAIError(),
@@ -143,7 +143,7 @@ func WssRelay(c *gin.Context) {
channel, err := getChannel(c, group, originalModel, i)
if err != nil {
common.LogError(c, err.Error())
newAPIError = types.NewError(err, types.ErrorCodeGetChannelFailed)
newAPIError = err
break
}
@@ -153,7 +153,7 @@ func WssRelay(c *gin.Context) {
return // 成功处理请求,直接返回
}
go processChannelError(c, channel.Id, channel.Type, channel.Name, channel.GetAutoBan(), newAPIError)
go processChannelError(c, *types.NewChannelError(channel.Id, channel.Type, channel.Name, channel.ChannelInfo.IsMultiKey, common.GetContextKeyString(c, constant.ContextKeyChannelKey), channel.GetAutoBan()), newAPIError)
if !shouldRetry(c, newAPIError, common.RetryTimes-i) {
break
@@ -166,9 +166,9 @@ func WssRelay(c *gin.Context) {
}
if newAPIError != nil {
if newAPIError.StatusCode == http.StatusTooManyRequests {
newAPIError.SetMessage("当前分组上游负载已饱和,请稍后再试")
}
//if newAPIError.StatusCode == http.StatusTooManyRequests {
// newAPIError.SetMessage("当前分组上游负载已饱和,请稍后再试")
//}
newAPIError.SetMessage(common.MessageWithRequestId(newAPIError.Error(), requestId))
helper.WssError(c, ws, newAPIError.ToOpenAIError())
}
@@ -185,7 +185,7 @@ func RelayClaude(c *gin.Context) {
channel, err := getChannel(c, group, originalModel, i)
if err != nil {
common.LogError(c, err.Error())
newAPIError = types.NewError(err, types.ErrorCodeGetChannelFailed)
newAPIError = err
break
}
@@ -195,7 +195,7 @@ func RelayClaude(c *gin.Context) {
return // 成功处理请求,直接返回
}
go processChannelError(c, channel.Id, channel.Type, channel.Name, channel.GetAutoBan(), newAPIError)
go processChannelError(c, *types.NewChannelError(channel.Id, channel.Type, channel.Name, channel.ChannelInfo.IsMultiKey, common.GetContextKeyString(c, constant.ContextKeyChannelKey), channel.GetAutoBan()), newAPIError)
if !shouldRetry(c, newAPIError, common.RetryTimes-i) {
break
@@ -243,7 +243,7 @@ func addUsedChannel(c *gin.Context, channelId int) {
c.Set("use_channel", useChannel)
}
func getChannel(c *gin.Context, group, originalModel string, retryCount int) (*model.Channel, error) {
func getChannel(c *gin.Context, group, originalModel string, retryCount int) (*model.Channel, *types.NewAPIError) {
if retryCount == 0 {
autoBan := c.GetBool("auto_ban")
autoBanInt := 1
@@ -260,11 +260,14 @@ func getChannel(c *gin.Context, group, originalModel string, retryCount int) (*m
channel, selectGroup, err := model.CacheGetRandomSatisfiedChannel(c, group, originalModel, retryCount)
if err != nil {
if group == "auto" {
return nil, errors.New(fmt.Sprintf("获取自动分组下模型 %s 的可用渠道失败: %s", originalModel, err.Error()))
return nil, types.NewError(errors.New(fmt.Sprintf("获取自动分组下模型 %s 的可用渠道失败: %s", originalModel, err.Error())), types.ErrorCodeGetChannelFailed)
}
return nil, errors.New(fmt.Sprintf("获取分组 %s 下模型 %s 的可用渠道失败: %s", selectGroup, originalModel, err.Error()))
return nil, types.NewError(errors.New(fmt.Sprintf("获取分组 %s 下模型 %s 的可用渠道失败: %s", selectGroup, originalModel, err.Error())), types.ErrorCodeGetChannelFailed)
}
newAPIError := middleware.SetupContextForSelectedChannel(c, channel, originalModel)
if newAPIError != nil {
return nil, newAPIError
}
middleware.SetupContextForSelectedChannel(c, channel, originalModel)
return channel, nil
}
@@ -314,12 +317,12 @@ func shouldRetry(c *gin.Context, openaiErr *types.NewAPIError, retryTimes int) b
return true
}
func processChannelError(c *gin.Context, channelId int, channelType int, channelName string, autoBan bool, err *types.NewAPIError) {
func processChannelError(c *gin.Context, channelError types.ChannelError, err *types.NewAPIError) {
// 不要使用context获取渠道信息异步处理时可能会出现渠道信息不一致的情况
// do not use context to get channel info, there may be inconsistent channel info when processing asynchronously
common.LogError(c, fmt.Sprintf("relay error (channel #%d, status code: %d): %s", channelId, err.StatusCode, err.Error()))
if service.ShouldDisableChannel(channelType, err) && autoBan {
service.DisableChannel(channelId, channelName, err.Error())
common.LogError(c, fmt.Sprintf("relay error (channel #%d, status code: %d): %s", channelError.ChannelId, err.StatusCode, err.Error()))
if service.ShouldDisableChannel(channelError.ChannelId, err) && channelError.AutoBan {
service.DisableChannel(channelError, err.Error())
}
}
@@ -392,10 +395,10 @@ func RelayTask(c *gin.Context) {
retryTimes = 0
}
for i := 0; shouldRetryTaskRelay(c, channelId, taskErr, retryTimes) && i < retryTimes; i++ {
channel, err := getChannel(c, group, originalModel, i)
if err != nil {
common.LogError(c, fmt.Sprintf("CacheGetRandomSatisfiedChannel failed: %s", err.Error()))
taskErr = service.TaskErrorWrapperLocal(err, "get_channel_failed", http.StatusInternalServerError)
channel, newAPIError := getChannel(c, group, originalModel, i)
if newAPIError != nil {
common.LogError(c, fmt.Sprintf("CacheGetRandomSatisfiedChannel failed: %s", newAPIError.Error()))
taskErr = service.TaskErrorWrapperLocal(newAPIError.Err, "get_channel_failed", http.StatusInternalServerError)
break
}
channelId = channel.Id
@@ -405,7 +408,7 @@ func RelayTask(c *gin.Context) {
common.LogInfo(c, fmt.Sprintf("using channel #%d to retry (remain times %d)", channel.Id, i))
//middleware.SetupContextForSelectedChannel(c, channel, originalModel)
requestBody, err := common.GetRequestBody(c)
requestBody, _ := common.GetRequestBody(c)
c.Request.Body = io.NopCloser(bytes.NewBuffer(requestBody))
taskErr = taskRelayHandler(c, relayMode)
}