|
|
|
|
@@ -44,6 +44,13 @@ func (s *GatewayService) debugModelRoutingEnabled() bool {
|
|
|
|
|
return v == "1" || v == "true" || v == "yes" || v == "on"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// debugLog prints log only in non-release mode.
|
|
|
|
|
func debugLog(format string, v ...any) {
|
|
|
|
|
if gin.Mode() != gin.ReleaseMode {
|
|
|
|
|
log.Printf(format, v...)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func shortSessionHash(sessionHash string) string {
|
|
|
|
|
if sessionHash == "" {
|
|
|
|
|
return ""
|
|
|
|
|
@@ -412,6 +419,14 @@ func (s *GatewayService) SelectAccountForModelWithExclusions(ctx context.Context
|
|
|
|
|
// SelectAccountWithLoadAwareness selects account with load-awareness and wait plan.
|
|
|
|
|
// metadataUserID: 已废弃参数,会话限制现在统一使用 sessionHash
|
|
|
|
|
func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, groupID *int64, sessionHash string, requestedModel string, excludedIDs map[int64]struct{}, metadataUserID string) (*AccountSelectionResult, error) {
|
|
|
|
|
// 调试日志:记录调度入口参数
|
|
|
|
|
excludedIDsList := make([]int64, 0, len(excludedIDs))
|
|
|
|
|
for id := range excludedIDs {
|
|
|
|
|
excludedIDsList = append(excludedIDsList, id)
|
|
|
|
|
}
|
|
|
|
|
debugLog("[AccountScheduling] Starting account selection: groupID=%v model=%s session=%s excludedIDs=%v",
|
|
|
|
|
derefGroupID(groupID), requestedModel, shortSessionHash(sessionHash), excludedIDsList)
|
|
|
|
|
|
|
|
|
|
cfg := s.schedulingConfig()
|
|
|
|
|
|
|
|
|
|
var stickyAccountID int64
|
|
|
|
|
@@ -454,9 +469,9 @@ func (s *GatewayService) SelectAccountWithLoadAwareness(ctx context.Context, gro
|
|
|
|
|
if err == nil && result.Acquired {
|
|
|
|
|
// 获取槽位后检查会话限制(使用 sessionHash 作为会话标识符)
|
|
|
|
|
if !s.checkAndRegisterSession(ctx, account, sessionHash) {
|
|
|
|
|
result.ReleaseFunc() // 释放槽位
|
|
|
|
|
result.ReleaseFunc() // 释放槽位
|
|
|
|
|
localExcluded[account.ID] = struct{}{} // 排除此账号
|
|
|
|
|
continue // 重新选择
|
|
|
|
|
continue // 重新选择
|
|
|
|
|
}
|
|
|
|
|
return &AccountSelectionResult{
|
|
|
|
|
Account: account,
|
|
|
|
|
@@ -1087,7 +1102,16 @@ func (s *GatewayService) resolvePlatform(ctx context.Context, groupID *int64, gr
|
|
|
|
|
|
|
|
|
|
func (s *GatewayService) listSchedulableAccounts(ctx context.Context, groupID *int64, platform string, hasForcePlatform bool) ([]Account, bool, error) {
|
|
|
|
|
if s.schedulerSnapshot != nil {
|
|
|
|
|
return s.schedulerSnapshot.ListSchedulableAccounts(ctx, groupID, platform, hasForcePlatform)
|
|
|
|
|
accounts, useMixed, err := s.schedulerSnapshot.ListSchedulableAccounts(ctx, groupID, platform, hasForcePlatform)
|
|
|
|
|
if err == nil {
|
|
|
|
|
debugLog("[AccountScheduling] listSchedulableAccounts (snapshot): groupID=%v platform=%s useMixed=%v count=%d",
|
|
|
|
|
derefGroupID(groupID), platform, useMixed, len(accounts))
|
|
|
|
|
for _, acc := range accounts {
|
|
|
|
|
debugLog("[AccountScheduling] - Account ID=%d Name=%s Platform=%s Type=%s Status=%s TLSFingerprint=%v",
|
|
|
|
|
acc.ID, acc.Name, acc.Platform, acc.Type, acc.Status, acc.IsTLSFingerprintEnabled())
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return accounts, useMixed, err
|
|
|
|
|
}
|
|
|
|
|
useMixed := (platform == PlatformAnthropic || platform == PlatformGemini) && !hasForcePlatform
|
|
|
|
|
if useMixed {
|
|
|
|
|
@@ -1100,6 +1124,7 @@ func (s *GatewayService) listSchedulableAccounts(ctx context.Context, groupID *i
|
|
|
|
|
accounts, err = s.accountRepo.ListSchedulableByPlatforms(ctx, platforms)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
debugLog("[AccountScheduling] listSchedulableAccounts FAILED: groupID=%v platform=%s err=%v", derefGroupID(groupID), platform, err)
|
|
|
|
|
return nil, useMixed, err
|
|
|
|
|
}
|
|
|
|
|
filtered := make([]Account, 0, len(accounts))
|
|
|
|
|
@@ -1109,6 +1134,12 @@ func (s *GatewayService) listSchedulableAccounts(ctx context.Context, groupID *i
|
|
|
|
|
}
|
|
|
|
|
filtered = append(filtered, acc)
|
|
|
|
|
}
|
|
|
|
|
debugLog("[AccountScheduling] listSchedulableAccounts (mixed): groupID=%v platform=%s rawCount=%d filteredCount=%d",
|
|
|
|
|
derefGroupID(groupID), platform, len(accounts), len(filtered))
|
|
|
|
|
for _, acc := range filtered {
|
|
|
|
|
debugLog("[AccountScheduling] - Account ID=%d Name=%s Platform=%s Type=%s Status=%s TLSFingerprint=%v",
|
|
|
|
|
acc.ID, acc.Name, acc.Platform, acc.Type, acc.Status, acc.IsTLSFingerprintEnabled())
|
|
|
|
|
}
|
|
|
|
|
return filtered, useMixed, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1123,8 +1154,15 @@ func (s *GatewayService) listSchedulableAccounts(ctx context.Context, groupID *i
|
|
|
|
|
accounts, err = s.accountRepo.ListSchedulableByPlatform(ctx, platform)
|
|
|
|
|
}
|
|
|
|
|
if err != nil {
|
|
|
|
|
debugLog("[AccountScheduling] listSchedulableAccounts FAILED: groupID=%v platform=%s err=%v", derefGroupID(groupID), platform, err)
|
|
|
|
|
return nil, useMixed, err
|
|
|
|
|
}
|
|
|
|
|
debugLog("[AccountScheduling] listSchedulableAccounts (single): groupID=%v platform=%s count=%d",
|
|
|
|
|
derefGroupID(groupID), platform, len(accounts))
|
|
|
|
|
for _, acc := range accounts {
|
|
|
|
|
debugLog("[AccountScheduling] - Account ID=%d Name=%s Platform=%s Type=%s Status=%s TLSFingerprint=%v",
|
|
|
|
|
acc.ID, acc.Name, acc.Platform, acc.Type, acc.Status, acc.IsTLSFingerprintEnabled())
|
|
|
|
|
}
|
|
|
|
|
return accounts, useMixed, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -2129,6 +2167,10 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
|
|
|
|
proxyURL = account.Proxy.URL()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 调试日志:记录即将转发的账号信息
|
|
|
|
|
log.Printf("[Forward] Using account: ID=%d Name=%s Platform=%s Type=%s TLSFingerprint=%v Proxy=%s",
|
|
|
|
|
account.ID, account.Name, account.Platform, account.Type, account.IsTLSFingerprintEnabled(), proxyURL)
|
|
|
|
|
|
|
|
|
|
// 重试循环
|
|
|
|
|
var resp *http.Response
|
|
|
|
|
retryStart := time.Now()
|
|
|
|
|
@@ -2143,7 +2185,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 发送请求
|
|
|
|
|
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
|
|
|
|
resp, err = s.httpUpstream.DoWithTLS(upstreamReq, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled())
|
|
|
|
|
if err != nil {
|
|
|
|
|
if resp != nil && resp.Body != nil {
|
|
|
|
|
_ = resp.Body.Close()
|
|
|
|
|
@@ -2217,7 +2259,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
|
|
|
|
filteredBody := FilterThinkingBlocksForRetry(body)
|
|
|
|
|
retryReq, buildErr := s.buildUpstreamRequest(ctx, c, account, filteredBody, token, tokenType, reqModel)
|
|
|
|
|
if buildErr == nil {
|
|
|
|
|
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
|
|
|
|
|
retryResp, retryErr := s.httpUpstream.DoWithTLS(retryReq, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled())
|
|
|
|
|
if retryErr == nil {
|
|
|
|
|
if retryResp.StatusCode < 400 {
|
|
|
|
|
log.Printf("Account %d: signature error retry succeeded (thinking downgraded)", account.ID)
|
|
|
|
|
@@ -2249,7 +2291,7 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
|
|
|
|
filteredBody2 := FilterSignatureSensitiveBlocksForRetry(body)
|
|
|
|
|
retryReq2, buildErr2 := s.buildUpstreamRequest(ctx, c, account, filteredBody2, token, tokenType, reqModel)
|
|
|
|
|
if buildErr2 == nil {
|
|
|
|
|
retryResp2, retryErr2 := s.httpUpstream.Do(retryReq2, proxyURL, account.ID, account.Concurrency)
|
|
|
|
|
retryResp2, retryErr2 := s.httpUpstream.DoWithTLS(retryReq2, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled())
|
|
|
|
|
if retryErr2 == nil {
|
|
|
|
|
resp = retryResp2
|
|
|
|
|
break
|
|
|
|
|
@@ -2364,6 +2406,10 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
|
|
|
|
_ = resp.Body.Close()
|
|
|
|
|
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
|
|
|
|
|
|
|
|
|
// 调试日志:打印重试耗尽后的错误响应
|
|
|
|
|
log.Printf("[Forward] Upstream error (retry exhausted, failover): Account=%d(%s) Status=%d RequestID=%s Body=%s",
|
|
|
|
|
account.ID, account.Name, resp.StatusCode, resp.Header.Get("x-request-id"), truncateString(string(respBody), 1000))
|
|
|
|
|
|
|
|
|
|
s.handleRetryExhaustedSideEffects(ctx, resp, account)
|
|
|
|
|
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
|
|
|
|
Platform: account.Platform,
|
|
|
|
|
@@ -2391,6 +2437,10 @@ func (s *GatewayService) Forward(ctx context.Context, c *gin.Context, account *A
|
|
|
|
|
_ = resp.Body.Close()
|
|
|
|
|
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
|
|
|
|
|
|
|
|
|
// 调试日志:打印上游错误响应
|
|
|
|
|
log.Printf("[Forward] Upstream error (failover): Account=%d(%s) Status=%d RequestID=%s Body=%s",
|
|
|
|
|
account.ID, account.Name, resp.StatusCode, resp.Header.Get("x-request-id"), truncateString(string(respBody), 1000))
|
|
|
|
|
|
|
|
|
|
s.handleFailoverSideEffects(ctx, resp, account)
|
|
|
|
|
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
|
|
|
|
Platform: account.Platform,
|
|
|
|
|
@@ -2741,6 +2791,10 @@ func extractUpstreamErrorMessage(body []byte) string {
|
|
|
|
|
func (s *GatewayService) handleErrorResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account) (*ForwardResult, error) {
|
|
|
|
|
body, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
|
|
|
|
|
|
|
|
|
// 调试日志:打印上游错误响应
|
|
|
|
|
log.Printf("[Forward] Upstream error (non-retryable): Account=%d(%s) Status=%d RequestID=%s Body=%s",
|
|
|
|
|
account.ID, account.Name, resp.StatusCode, resp.Header.Get("x-request-id"), truncateString(string(body), 1000))
|
|
|
|
|
|
|
|
|
|
upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(body))
|
|
|
|
|
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
|
|
|
|
|
|
|
|
|
@@ -3449,7 +3503,7 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 发送请求
|
|
|
|
|
resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
|
|
|
|
|
resp, err := s.httpUpstream.DoWithTLS(upstreamReq, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled())
|
|
|
|
|
if err != nil {
|
|
|
|
|
setOpsUpstreamError(c, 0, sanitizeUpstreamErrorMessage(err.Error()), "")
|
|
|
|
|
s.countTokensError(c, http.StatusBadGateway, "upstream_error", "Request failed")
|
|
|
|
|
@@ -3471,7 +3525,7 @@ func (s *GatewayService) ForwardCountTokens(ctx context.Context, c *gin.Context,
|
|
|
|
|
filteredBody := FilterThinkingBlocksForRetry(body)
|
|
|
|
|
retryReq, buildErr := s.buildCountTokensRequest(ctx, c, account, filteredBody, token, tokenType, reqModel)
|
|
|
|
|
if buildErr == nil {
|
|
|
|
|
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
|
|
|
|
|
retryResp, retryErr := s.httpUpstream.DoWithTLS(retryReq, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled())
|
|
|
|
|
if retryErr == nil {
|
|
|
|
|
resp = retryResp
|
|
|
|
|
respBody, err = io.ReadAll(resp.Body)
|
|
|
|
|
|