fix(sora): 优化 challenge 重试与调试日志
This commit is contained in:
@@ -37,6 +37,7 @@ type SoraGatewayHandler struct {
|
|||||||
concurrencyHelper *ConcurrencyHelper
|
concurrencyHelper *ConcurrencyHelper
|
||||||
maxAccountSwitches int
|
maxAccountSwitches int
|
||||||
streamMode string
|
streamMode string
|
||||||
|
soraTLSEnabled bool
|
||||||
soraMediaSigningKey string
|
soraMediaSigningKey string
|
||||||
soraMediaRoot string
|
soraMediaRoot string
|
||||||
}
|
}
|
||||||
@@ -52,6 +53,7 @@ func NewSoraGatewayHandler(
|
|||||||
pingInterval := time.Duration(0)
|
pingInterval := time.Duration(0)
|
||||||
maxAccountSwitches := 3
|
maxAccountSwitches := 3
|
||||||
streamMode := "force"
|
streamMode := "force"
|
||||||
|
soraTLSEnabled := true
|
||||||
signKey := ""
|
signKey := ""
|
||||||
mediaRoot := "/app/data/sora"
|
mediaRoot := "/app/data/sora"
|
||||||
if cfg != nil {
|
if cfg != nil {
|
||||||
@@ -62,6 +64,7 @@ func NewSoraGatewayHandler(
|
|||||||
if mode := strings.TrimSpace(cfg.Gateway.SoraStreamMode); mode != "" {
|
if mode := strings.TrimSpace(cfg.Gateway.SoraStreamMode); mode != "" {
|
||||||
streamMode = mode
|
streamMode = mode
|
||||||
}
|
}
|
||||||
|
soraTLSEnabled = !cfg.Sora.Client.DisableTLSFingerprint
|
||||||
signKey = strings.TrimSpace(cfg.Gateway.SoraMediaSigningKey)
|
signKey = strings.TrimSpace(cfg.Gateway.SoraMediaSigningKey)
|
||||||
if root := strings.TrimSpace(cfg.Sora.Storage.LocalPath); root != "" {
|
if root := strings.TrimSpace(cfg.Sora.Storage.LocalPath); root != "" {
|
||||||
mediaRoot = root
|
mediaRoot = root
|
||||||
@@ -74,6 +77,7 @@ func NewSoraGatewayHandler(
|
|||||||
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatComment, pingInterval),
|
concurrencyHelper: NewConcurrencyHelper(concurrencyService, SSEPingFormatComment, pingInterval),
|
||||||
maxAccountSwitches: maxAccountSwitches,
|
maxAccountSwitches: maxAccountSwitches,
|
||||||
streamMode: strings.ToLower(streamMode),
|
streamMode: strings.ToLower(streamMode),
|
||||||
|
soraTLSEnabled: soraTLSEnabled,
|
||||||
soraMediaSigningKey: signKey,
|
soraMediaSigningKey: signKey,
|
||||||
soraMediaRoot: mediaRoot,
|
soraMediaRoot: mediaRoot,
|
||||||
}
|
}
|
||||||
@@ -247,6 +251,12 @@ func (h *SoraGatewayHandler) ChatCompletions(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
account := selection.Account
|
account := selection.Account
|
||||||
setOpsSelectedAccount(c, account.ID, account.Platform)
|
setOpsSelectedAccount(c, account.ID, account.Platform)
|
||||||
|
proxyBound := account.ProxyID != nil
|
||||||
|
proxyID := int64(0)
|
||||||
|
if account.ProxyID != nil {
|
||||||
|
proxyID = *account.ProxyID
|
||||||
|
}
|
||||||
|
tlsFingerprintEnabled := h.soraTLSEnabled
|
||||||
|
|
||||||
accountReleaseFunc := selection.ReleaseFunc
|
accountReleaseFunc := selection.ReleaseFunc
|
||||||
if !selection.Acquired {
|
if !selection.Acquired {
|
||||||
@@ -257,10 +267,19 @@ func (h *SoraGatewayHandler) ChatCompletions(c *gin.Context) {
|
|||||||
accountWaitCounted := false
|
accountWaitCounted := false
|
||||||
canWait, err := h.concurrencyHelper.IncrementAccountWaitCount(c.Request.Context(), account.ID, selection.WaitPlan.MaxWaiting)
|
canWait, err := h.concurrencyHelper.IncrementAccountWaitCount(c.Request.Context(), account.ID, selection.WaitPlan.MaxWaiting)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
reqLog.Warn("sora.account_wait_counter_increment_failed", zap.Int64("account_id", account.ID), zap.Error(err))
|
reqLog.Warn("sora.account_wait_counter_increment_failed",
|
||||||
|
zap.Int64("account_id", account.ID),
|
||||||
|
zap.Int64("proxy_id", proxyID),
|
||||||
|
zap.Bool("proxy_bound", proxyBound),
|
||||||
|
zap.Bool("tls_fingerprint_enabled", tlsFingerprintEnabled),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
} else if !canWait {
|
} else if !canWait {
|
||||||
reqLog.Info("sora.account_wait_queue_full",
|
reqLog.Info("sora.account_wait_queue_full",
|
||||||
zap.Int64("account_id", account.ID),
|
zap.Int64("account_id", account.ID),
|
||||||
|
zap.Int64("proxy_id", proxyID),
|
||||||
|
zap.Bool("proxy_bound", proxyBound),
|
||||||
|
zap.Bool("tls_fingerprint_enabled", tlsFingerprintEnabled),
|
||||||
zap.Int("max_waiting", selection.WaitPlan.MaxWaiting),
|
zap.Int("max_waiting", selection.WaitPlan.MaxWaiting),
|
||||||
)
|
)
|
||||||
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "Too many pending requests, please retry later", streamStarted)
|
h.handleStreamingAwareError(c, http.StatusTooManyRequests, "rate_limit_error", "Too many pending requests, please retry later", streamStarted)
|
||||||
@@ -284,7 +303,13 @@ func (h *SoraGatewayHandler) ChatCompletions(c *gin.Context) {
|
|||||||
&streamStarted,
|
&streamStarted,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
reqLog.Warn("sora.account_slot_acquire_failed", zap.Int64("account_id", account.ID), zap.Error(err))
|
reqLog.Warn("sora.account_slot_acquire_failed",
|
||||||
|
zap.Int64("account_id", account.ID),
|
||||||
|
zap.Int64("proxy_id", proxyID),
|
||||||
|
zap.Bool("proxy_bound", proxyBound),
|
||||||
|
zap.Bool("tls_fingerprint_enabled", tlsFingerprintEnabled),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
h.handleConcurrencyError(c, err, "account", streamStarted)
|
h.handleConcurrencyError(c, err, "account", streamStarted)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -310,6 +335,9 @@ func (h *SoraGatewayHandler) ChatCompletions(c *gin.Context) {
|
|||||||
rayID, mitigated, contentType := extractSoraFailoverHeaderInsights(lastFailoverHeaders, lastFailoverBody)
|
rayID, mitigated, contentType := extractSoraFailoverHeaderInsights(lastFailoverHeaders, lastFailoverBody)
|
||||||
fields := []zap.Field{
|
fields := []zap.Field{
|
||||||
zap.Int64("account_id", account.ID),
|
zap.Int64("account_id", account.ID),
|
||||||
|
zap.Int64("proxy_id", proxyID),
|
||||||
|
zap.Bool("proxy_bound", proxyBound),
|
||||||
|
zap.Bool("tls_fingerprint_enabled", tlsFingerprintEnabled),
|
||||||
zap.Int("upstream_status", failoverErr.StatusCode),
|
zap.Int("upstream_status", failoverErr.StatusCode),
|
||||||
zap.Int("switch_count", switchCount),
|
zap.Int("switch_count", switchCount),
|
||||||
zap.Int("max_switches", maxAccountSwitches),
|
zap.Int("max_switches", maxAccountSwitches),
|
||||||
@@ -335,6 +363,9 @@ func (h *SoraGatewayHandler) ChatCompletions(c *gin.Context) {
|
|||||||
rayID, mitigated, contentType := extractSoraFailoverHeaderInsights(lastFailoverHeaders, lastFailoverBody)
|
rayID, mitigated, contentType := extractSoraFailoverHeaderInsights(lastFailoverHeaders, lastFailoverBody)
|
||||||
fields := []zap.Field{
|
fields := []zap.Field{
|
||||||
zap.Int64("account_id", account.ID),
|
zap.Int64("account_id", account.ID),
|
||||||
|
zap.Int64("proxy_id", proxyID),
|
||||||
|
zap.Bool("proxy_bound", proxyBound),
|
||||||
|
zap.Bool("tls_fingerprint_enabled", tlsFingerprintEnabled),
|
||||||
zap.Int("upstream_status", failoverErr.StatusCode),
|
zap.Int("upstream_status", failoverErr.StatusCode),
|
||||||
zap.String("upstream_error_code", upstreamErrCode),
|
zap.String("upstream_error_code", upstreamErrCode),
|
||||||
zap.String("upstream_error_message", upstreamErrMsg),
|
zap.String("upstream_error_message", upstreamErrMsg),
|
||||||
@@ -353,7 +384,13 @@ func (h *SoraGatewayHandler) ChatCompletions(c *gin.Context) {
|
|||||||
reqLog.Warn("sora.upstream_failover_switching", fields...)
|
reqLog.Warn("sora.upstream_failover_switching", fields...)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
reqLog.Error("sora.forward_failed", zap.Int64("account_id", account.ID), zap.Error(err))
|
reqLog.Error("sora.forward_failed",
|
||||||
|
zap.Int64("account_id", account.ID),
|
||||||
|
zap.Int64("proxy_id", proxyID),
|
||||||
|
zap.Bool("proxy_bound", proxyBound),
|
||||||
|
zap.Bool("tls_fingerprint_enabled", tlsFingerprintEnabled),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -384,6 +421,9 @@ func (h *SoraGatewayHandler) ChatCompletions(c *gin.Context) {
|
|||||||
}(result, account, userAgent, clientIP)
|
}(result, account, userAgent, clientIP)
|
||||||
reqLog.Debug("sora.request_completed",
|
reqLog.Debug("sora.request_completed",
|
||||||
zap.Int64("account_id", account.ID),
|
zap.Int64("account_id", account.ID),
|
||||||
|
zap.Int64("proxy_id", proxyID),
|
||||||
|
zap.Bool("proxy_bound", proxyBound),
|
||||||
|
zap.Bool("tls_fingerprint_enabled", tlsFingerprintEnabled),
|
||||||
zap.Int("switch_count", switchCount),
|
zap.Int("switch_count", switchCount),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -234,6 +234,14 @@ type SoraDirectClient struct {
|
|||||||
sidecarSessions map[string]soraSidecarSessionEntry
|
sidecarSessions map[string]soraSidecarSessionEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type soraRequestTraceContextKey struct{}
|
||||||
|
|
||||||
|
type soraRequestTrace struct {
|
||||||
|
ID string
|
||||||
|
ProxyKey string
|
||||||
|
UAHash string
|
||||||
|
}
|
||||||
|
|
||||||
// NewSoraDirectClient 创建 Sora 直连客户端
|
// NewSoraDirectClient 创建 Sora 直连客户端
|
||||||
func NewSoraDirectClient(cfg *config.Config, httpUpstream HTTPUpstream, tokenProvider *OpenAITokenProvider) *SoraDirectClient {
|
func NewSoraDirectClient(cfg *config.Config, httpUpstream HTTPUpstream, tokenProvider *OpenAITokenProvider) *SoraDirectClient {
|
||||||
baseURL := ""
|
baseURL := ""
|
||||||
@@ -377,6 +385,7 @@ func (c *SoraDirectClient) CreateImageTask(ctx context.Context, account *Account
|
|||||||
}
|
}
|
||||||
userAgent := c.taskUserAgent()
|
userAgent := c.taskUserAgent()
|
||||||
proxyURL := c.resolveProxyURL(account)
|
proxyURL := c.resolveProxyURL(account)
|
||||||
|
ctx = c.withRequestTrace(ctx, account, proxyURL, userAgent)
|
||||||
operation := "simple_compose"
|
operation := "simple_compose"
|
||||||
inpaintItems := []map[string]any{}
|
inpaintItems := []map[string]any{}
|
||||||
if strings.TrimSpace(req.MediaID) != "" {
|
if strings.TrimSpace(req.MediaID) != "" {
|
||||||
@@ -430,6 +439,7 @@ func (c *SoraDirectClient) CreateVideoTask(ctx context.Context, account *Account
|
|||||||
}
|
}
|
||||||
userAgent := c.taskUserAgent()
|
userAgent := c.taskUserAgent()
|
||||||
proxyURL := c.resolveProxyURL(account)
|
proxyURL := c.resolveProxyURL(account)
|
||||||
|
ctx = c.withRequestTrace(ctx, account, proxyURL, userAgent)
|
||||||
orientation := req.Orientation
|
orientation := req.Orientation
|
||||||
if orientation == "" {
|
if orientation == "" {
|
||||||
orientation = "landscape"
|
orientation = "landscape"
|
||||||
@@ -504,6 +514,7 @@ func (c *SoraDirectClient) CreateStoryboardTask(ctx context.Context, account *Ac
|
|||||||
}
|
}
|
||||||
userAgent := c.taskUserAgent()
|
userAgent := c.taskUserAgent()
|
||||||
proxyURL := c.resolveProxyURL(account)
|
proxyURL := c.resolveProxyURL(account)
|
||||||
|
ctx = c.withRequestTrace(ctx, account, proxyURL, userAgent)
|
||||||
orientation := req.Orientation
|
orientation := req.Orientation
|
||||||
if orientation == "" {
|
if orientation == "" {
|
||||||
orientation = "landscape"
|
orientation = "landscape"
|
||||||
@@ -724,6 +735,7 @@ func (c *SoraDirectClient) FinalizeCharacter(ctx context.Context, account *Accou
|
|||||||
}
|
}
|
||||||
userAgent := c.taskUserAgent()
|
userAgent := c.taskUserAgent()
|
||||||
proxyURL := c.resolveProxyURL(account)
|
proxyURL := c.resolveProxyURL(account)
|
||||||
|
ctx = c.withRequestTrace(ctx, account, proxyURL, userAgent)
|
||||||
payload := map[string]any{
|
payload := map[string]any{
|
||||||
"cameo_id": req.CameoID,
|
"cameo_id": req.CameoID,
|
||||||
"username": req.Username,
|
"username": req.Username,
|
||||||
@@ -808,6 +820,7 @@ func (c *SoraDirectClient) PostVideoForWatermarkFree(ctx context.Context, accoun
|
|||||||
}
|
}
|
||||||
userAgent := c.taskUserAgent()
|
userAgent := c.taskUserAgent()
|
||||||
proxyURL := c.resolveProxyURL(account)
|
proxyURL := c.resolveProxyURL(account)
|
||||||
|
ctx = c.withRequestTrace(ctx, account, proxyURL, userAgent)
|
||||||
payload := map[string]any{
|
payload := map[string]any{
|
||||||
"attachments_to_create": []map[string]any{
|
"attachments_to_create": []map[string]any{
|
||||||
{
|
{
|
||||||
@@ -1471,6 +1484,7 @@ func (c *SoraDirectClient) doRequestWithProxy(
|
|||||||
if cooldownErr := c.checkCloudflareChallengeCooldown(account, proxyURL); cooldownErr != nil {
|
if cooldownErr := c.checkCloudflareChallengeCooldown(account, proxyURL); cooldownErr != nil {
|
||||||
return nil, nil, cooldownErr
|
return nil, nil, cooldownErr
|
||||||
}
|
}
|
||||||
|
traceID, traceProxyKey, traceUAHash := c.requestTraceFields(ctx, proxyURL, headers.Get("User-Agent"))
|
||||||
timeout := 0
|
timeout := 0
|
||||||
if c != nil && c.cfg != nil {
|
if c != nil && c.cfg != nil {
|
||||||
timeout = c.cfg.Sora.Client.TimeoutSeconds
|
timeout = c.cfg.Sora.Client.TimeoutSeconds
|
||||||
@@ -1500,11 +1514,14 @@ func (c *SoraDirectClient) doRequestWithProxy(
|
|||||||
attempts := maxRetries + 1
|
attempts := maxRetries + 1
|
||||||
authRecovered := false
|
authRecovered := false
|
||||||
authRecoverExtraAttemptGranted := false
|
authRecoverExtraAttemptGranted := false
|
||||||
|
challengeRetried := false
|
||||||
|
sawCFChallenge := false
|
||||||
var lastErr error
|
var lastErr error
|
||||||
for attempt := 1; attempt <= attempts; attempt++ {
|
for attempt := 1; attempt <= attempts; attempt++ {
|
||||||
if c.debugEnabled() {
|
if c.debugEnabled() {
|
||||||
c.debugLogf(
|
c.debugLogf(
|
||||||
"request_start method=%s url=%s attempt=%d/%d timeout_s=%d body_bytes=%d proxy_bound=%t headers=%s",
|
"request_start trace_id=%s method=%s url=%s attempt=%d/%d timeout_s=%d body_bytes=%d proxy_bound=%t proxy_key=%s ua_hash=%s headers=%s",
|
||||||
|
traceID,
|
||||||
method,
|
method,
|
||||||
sanitizeSoraLogURL(urlStr),
|
sanitizeSoraLogURL(urlStr),
|
||||||
attempt,
|
attempt,
|
||||||
@@ -1512,6 +1529,8 @@ func (c *SoraDirectClient) doRequestWithProxy(
|
|||||||
timeout,
|
timeout,
|
||||||
len(bodyBytes),
|
len(bodyBytes),
|
||||||
proxyURL != "",
|
proxyURL != "",
|
||||||
|
traceProxyKey,
|
||||||
|
traceUAHash,
|
||||||
formatSoraHeaders(headers),
|
formatSoraHeaders(headers),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
@@ -1532,7 +1551,8 @@ func (c *SoraDirectClient) doRequestWithProxy(
|
|||||||
lastErr = err
|
lastErr = err
|
||||||
if c.debugEnabled() {
|
if c.debugEnabled() {
|
||||||
c.debugLogf(
|
c.debugLogf(
|
||||||
"request_transport_error method=%s url=%s attempt=%d/%d err=%s",
|
"request_transport_error trace_id=%s method=%s url=%s attempt=%d/%d err=%s",
|
||||||
|
traceID,
|
||||||
method,
|
method,
|
||||||
sanitizeSoraLogURL(urlStr),
|
sanitizeSoraLogURL(urlStr),
|
||||||
attempt,
|
attempt,
|
||||||
@@ -1542,7 +1562,7 @@ func (c *SoraDirectClient) doRequestWithProxy(
|
|||||||
}
|
}
|
||||||
if attempt < attempts && allowRetry {
|
if attempt < attempts && allowRetry {
|
||||||
if c.debugEnabled() {
|
if c.debugEnabled() {
|
||||||
c.debugLogf("request_retry_scheduled method=%s url=%s reason=transport_error next_attempt=%d/%d", method, sanitizeSoraLogURL(urlStr), attempt+1, attempts)
|
c.debugLogf("request_retry_scheduled trace_id=%s method=%s url=%s reason=transport_error next_attempt=%d/%d", traceID, method, sanitizeSoraLogURL(urlStr), attempt+1, attempts)
|
||||||
}
|
}
|
||||||
c.sleepRetry(attempt)
|
c.sleepRetry(attempt)
|
||||||
continue
|
continue
|
||||||
@@ -1558,7 +1578,8 @@ func (c *SoraDirectClient) doRequestWithProxy(
|
|||||||
|
|
||||||
if c.cfg != nil && c.cfg.Sora.Client.Debug {
|
if c.cfg != nil && c.cfg.Sora.Client.Debug {
|
||||||
c.debugLogf(
|
c.debugLogf(
|
||||||
"response_received method=%s url=%s attempt=%d/%d status=%d cost=%s resp_bytes=%d resp_headers=%s",
|
"response_received trace_id=%s method=%s url=%s attempt=%d/%d status=%d cost=%s resp_bytes=%d resp_headers=%s",
|
||||||
|
traceID,
|
||||||
method,
|
method,
|
||||||
sanitizeSoraLogURL(urlStr),
|
sanitizeSoraLogURL(urlStr),
|
||||||
attempt,
|
attempt,
|
||||||
@@ -1573,7 +1594,16 @@ func (c *SoraDirectClient) doRequestWithProxy(
|
|||||||
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
||||||
isCFChallenge := soraerror.IsCloudflareChallengeResponse(resp.StatusCode, resp.Header, respBody)
|
isCFChallenge := soraerror.IsCloudflareChallengeResponse(resp.StatusCode, resp.Header, respBody)
|
||||||
if isCFChallenge {
|
if isCFChallenge {
|
||||||
|
sawCFChallenge = true
|
||||||
c.recordCloudflareChallengeCooldown(account, proxyURL, resp.StatusCode, resp.Header, respBody)
|
c.recordCloudflareChallengeCooldown(account, proxyURL, resp.StatusCode, resp.Header, respBody)
|
||||||
|
if allowRetry && attempt < attempts && !challengeRetried {
|
||||||
|
challengeRetried = true
|
||||||
|
if c.debugEnabled() {
|
||||||
|
c.debugLogf("request_retry_scheduled trace_id=%s method=%s url=%s reason=cloudflare_challenge status=%d next_attempt=%d/%d", traceID, method, sanitizeSoraLogURL(urlStr), resp.StatusCode, attempt+1, attempts)
|
||||||
|
}
|
||||||
|
c.sleepRetry(attempt)
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if !isCFChallenge && !authRecovered && shouldAttemptSoraTokenRecover(resp.StatusCode, urlStr) && account != nil {
|
if !isCFChallenge && !authRecovered && shouldAttemptSoraTokenRecover(resp.StatusCode, urlStr) && account != nil {
|
||||||
if recovered, recoverErr := c.recoverAccessToken(ctx, account, fmt.Sprintf("upstream_status_%d", resp.StatusCode)); recoverErr == nil && strings.TrimSpace(recovered) != "" {
|
if recovered, recoverErr := c.recoverAccessToken(ctx, account, fmt.Sprintf("upstream_status_%d", resp.StatusCode)); recoverErr == nil && strings.TrimSpace(recovered) != "" {
|
||||||
@@ -1584,16 +1614,17 @@ func (c *SoraDirectClient) doRequestWithProxy(
|
|||||||
authRecoverExtraAttemptGranted = true
|
authRecoverExtraAttemptGranted = true
|
||||||
}
|
}
|
||||||
if c.debugEnabled() {
|
if c.debugEnabled() {
|
||||||
c.debugLogf("request_retry_with_recovered_token method=%s url=%s status=%d", method, sanitizeSoraLogURL(urlStr), resp.StatusCode)
|
c.debugLogf("request_retry_with_recovered_token trace_id=%s method=%s url=%s status=%d", traceID, method, sanitizeSoraLogURL(urlStr), resp.StatusCode)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
} else if recoverErr != nil && c.debugEnabled() {
|
} else if recoverErr != nil && c.debugEnabled() {
|
||||||
c.debugLogf("request_recover_token_failed method=%s url=%s status=%d err=%s", method, sanitizeSoraLogURL(urlStr), resp.StatusCode, logredact.RedactText(recoverErr.Error()))
|
c.debugLogf("request_recover_token_failed trace_id=%s method=%s url=%s status=%d err=%s", traceID, method, sanitizeSoraLogURL(urlStr), resp.StatusCode, logredact.RedactText(recoverErr.Error()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if c.debugEnabled() {
|
if c.debugEnabled() {
|
||||||
c.debugLogf(
|
c.debugLogf(
|
||||||
"response_non_success method=%s url=%s attempt=%d/%d status=%d body=%s",
|
"response_non_success trace_id=%s method=%s url=%s attempt=%d/%d status=%d body=%s",
|
||||||
|
traceID,
|
||||||
method,
|
method,
|
||||||
sanitizeSoraLogURL(urlStr),
|
sanitizeSoraLogURL(urlStr),
|
||||||
attempt,
|
attempt,
|
||||||
@@ -1609,13 +1640,16 @@ func (c *SoraDirectClient) doRequestWithProxy(
|
|||||||
}
|
}
|
||||||
if allowRetry && attempt < attempts && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500) {
|
if allowRetry && attempt < attempts && (resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500) {
|
||||||
if c.debugEnabled() {
|
if c.debugEnabled() {
|
||||||
c.debugLogf("request_retry_scheduled method=%s url=%s reason=status_%d next_attempt=%d/%d", method, sanitizeSoraLogURL(urlStr), resp.StatusCode, attempt+1, attempts)
|
c.debugLogf("request_retry_scheduled trace_id=%s method=%s url=%s reason=status_%d next_attempt=%d/%d", traceID, method, sanitizeSoraLogURL(urlStr), resp.StatusCode, attempt+1, attempts)
|
||||||
}
|
}
|
||||||
c.sleepRetry(attempt)
|
c.sleepRetry(attempt)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
return nil, resp.Header, upstreamErr
|
return nil, resp.Header, upstreamErr
|
||||||
}
|
}
|
||||||
|
if sawCFChallenge {
|
||||||
|
c.clearCloudflareChallengeCooldown(account, proxyURL)
|
||||||
|
}
|
||||||
return respBody, resp.Header, nil
|
return respBody, resp.Header, nil
|
||||||
}
|
}
|
||||||
if lastErr != nil {
|
if lastErr != nil {
|
||||||
@@ -1960,6 +1994,55 @@ func hexDecodeString(s string) ([]byte, error) {
|
|||||||
return dst, err
|
return dst, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *SoraDirectClient) withRequestTrace(ctx context.Context, account *Account, proxyURL, userAgent string) context.Context {
|
||||||
|
if ctx == nil {
|
||||||
|
ctx = context.Background()
|
||||||
|
}
|
||||||
|
if existing, ok := ctx.Value(soraRequestTraceContextKey{}).(*soraRequestTrace); ok && existing != nil && existing.ID != "" {
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
accountID := int64(0)
|
||||||
|
if account != nil {
|
||||||
|
accountID = account.ID
|
||||||
|
}
|
||||||
|
seed := fmt.Sprintf("%d|%s|%s|%d", accountID, normalizeSoraProxyKey(proxyURL), strings.TrimSpace(userAgent), time.Now().UnixNano())
|
||||||
|
trace := &soraRequestTrace{
|
||||||
|
ID: "sora-" + soraHashForLog(seed),
|
||||||
|
ProxyKey: normalizeSoraProxyKey(proxyURL),
|
||||||
|
UAHash: soraHashForLog(strings.TrimSpace(userAgent)),
|
||||||
|
}
|
||||||
|
return context.WithValue(ctx, soraRequestTraceContextKey{}, trace)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SoraDirectClient) requestTraceFields(ctx context.Context, proxyURL, userAgent string) (string, string, string) {
|
||||||
|
proxyKey := normalizeSoraProxyKey(proxyURL)
|
||||||
|
uaHash := soraHashForLog(strings.TrimSpace(userAgent))
|
||||||
|
traceID := ""
|
||||||
|
if ctx != nil {
|
||||||
|
if trace, ok := ctx.Value(soraRequestTraceContextKey{}).(*soraRequestTrace); ok && trace != nil {
|
||||||
|
if strings.TrimSpace(trace.ID) != "" {
|
||||||
|
traceID = strings.TrimSpace(trace.ID)
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(trace.ProxyKey) != "" {
|
||||||
|
proxyKey = strings.TrimSpace(trace.ProxyKey)
|
||||||
|
}
|
||||||
|
if strings.TrimSpace(trace.UAHash) != "" {
|
||||||
|
uaHash = strings.TrimSpace(trace.UAHash)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if traceID == "" {
|
||||||
|
traceID = "sora-" + soraHashForLog(fmt.Sprintf("%s|%d", proxyKey, time.Now().UnixNano()))
|
||||||
|
}
|
||||||
|
return traceID, proxyKey, uaHash
|
||||||
|
}
|
||||||
|
|
||||||
|
func soraHashForLog(raw string) string {
|
||||||
|
h := fnv.New32a()
|
||||||
|
_, _ = h.Write([]byte(raw))
|
||||||
|
return fmt.Sprintf("%08x", h.Sum32())
|
||||||
|
}
|
||||||
|
|
||||||
func sanitizeSoraLogURL(raw string) string {
|
func sanitizeSoraLogURL(raw string) string {
|
||||||
parsed, err := url.Parse(raw)
|
parsed, err := url.Parse(raw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -833,7 +833,7 @@ func TestSoraDirectClient_DoHTTP_SidecarSessionKeyStableForSameAccountProxy(t *t
|
|||||||
require.Equal(t, captured[0].SessionKey, captured[1].SessionKey)
|
require.Equal(t, captured[0].SessionKey, captured[1].SessionKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSoraDirectClient_DoRequestWithProxy_CloudflareChallengeSetsCooldownAndSkipsRetry(t *testing.T) {
|
func TestSoraDirectClient_DoRequestWithProxy_CloudflareChallengeSetsCooldownAfterSingleRetry(t *testing.T) {
|
||||||
var sidecarCalls int32
|
var sidecarCalls int32
|
||||||
sidecar := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
sidecar := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
atomic.AddInt32(&sidecarCalls, 1)
|
atomic.AddInt32(&sidecarCalls, 1)
|
||||||
@@ -879,7 +879,7 @@ func TestSoraDirectClient_DoRequestWithProxy_CloudflareChallengeSetsCooldownAndS
|
|||||||
var upstreamErr *SoraUpstreamError
|
var upstreamErr *SoraUpstreamError
|
||||||
require.ErrorAs(t, err, &upstreamErr)
|
require.ErrorAs(t, err, &upstreamErr)
|
||||||
require.Equal(t, http.StatusForbidden, upstreamErr.StatusCode)
|
require.Equal(t, http.StatusForbidden, upstreamErr.StatusCode)
|
||||||
require.Equal(t, int32(1), atomic.LoadInt32(&sidecarCalls), "challenge should not trigger retry loop")
|
require.Equal(t, int32(2), atomic.LoadInt32(&sidecarCalls), "challenge should trigger exactly one same-proxy retry")
|
||||||
|
|
||||||
_, _, err = client.doRequestWithProxy(
|
_, _, err = client.doRequestWithProxy(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
@@ -896,7 +896,112 @@ func TestSoraDirectClient_DoRequestWithProxy_CloudflareChallengeSetsCooldownAndS
|
|||||||
require.Equal(t, http.StatusTooManyRequests, upstreamErr.StatusCode)
|
require.Equal(t, http.StatusTooManyRequests, upstreamErr.StatusCode)
|
||||||
require.Contains(t, upstreamErr.Message, "cooling down")
|
require.Contains(t, upstreamErr.Message, "cooling down")
|
||||||
require.Contains(t, upstreamErr.Message, "cf-ray")
|
require.Contains(t, upstreamErr.Message, "cf-ray")
|
||||||
require.Equal(t, int32(1), atomic.LoadInt32(&sidecarCalls), "cooldown should block outbound request")
|
require.Equal(t, int32(2), atomic.LoadInt32(&sidecarCalls), "cooldown should block outbound request")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSoraDirectClient_DoRequestWithProxy_CloudflareRetrySuccessClearsCooldown(t *testing.T) {
|
||||||
|
var sidecarCalls int32
|
||||||
|
sidecar := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
call := atomic.AddInt32(&sidecarCalls, 1)
|
||||||
|
if call == 1 {
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"status_code": http.StatusForbidden,
|
||||||
|
"headers": map[string]any{
|
||||||
|
"cf-ray": "9d05d73dec4d8c8e-GRU",
|
||||||
|
"content-type": "text/html",
|
||||||
|
},
|
||||||
|
"body": `<!DOCTYPE html><html><head><title>Just a moment...</title></head><body><script>window._cf_chl_opt={};</script></body></html>`,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||||
|
"status_code": http.StatusOK,
|
||||||
|
"headers": map[string]any{
|
||||||
|
"content-type": "application/json",
|
||||||
|
},
|
||||||
|
"body": `{"ok":true}`,
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
defer sidecar.Close()
|
||||||
|
|
||||||
|
cfg := &config.Config{
|
||||||
|
Sora: config.SoraConfig{
|
||||||
|
Client: config.SoraClientConfig{
|
||||||
|
BaseURL: "https://sora.chatgpt.com/backend",
|
||||||
|
MaxRetries: 3,
|
||||||
|
CloudflareChallengeCooldownSeconds: 60,
|
||||||
|
CurlCFFISidecar: config.SoraCurlCFFISidecarConfig{
|
||||||
|
Enabled: true,
|
||||||
|
BaseURL: sidecar.URL,
|
||||||
|
Impersonate: "chrome131",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
client := NewSoraDirectClient(cfg, nil, nil)
|
||||||
|
headers := http.Header{}
|
||||||
|
account := &Account{ID: 109}
|
||||||
|
proxyURL := "http://127.0.0.1:18080"
|
||||||
|
|
||||||
|
body, _, err := client.doRequestWithProxy(
|
||||||
|
context.Background(),
|
||||||
|
account,
|
||||||
|
proxyURL,
|
||||||
|
http.MethodGet,
|
||||||
|
"https://sora.chatgpt.com/backend/me",
|
||||||
|
headers,
|
||||||
|
nil,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Contains(t, string(body), `"ok":true`)
|
||||||
|
require.Equal(t, int32(2), atomic.LoadInt32(&sidecarCalls))
|
||||||
|
|
||||||
|
_, _, err = client.doRequestWithProxy(
|
||||||
|
context.Background(),
|
||||||
|
account,
|
||||||
|
proxyURL,
|
||||||
|
http.MethodGet,
|
||||||
|
"https://sora.chatgpt.com/backend/me",
|
||||||
|
headers,
|
||||||
|
nil,
|
||||||
|
true,
|
||||||
|
)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, int32(3), atomic.LoadInt32(&sidecarCalls), "cooldown should be cleared after retry succeeds")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSoraComputeChallengeCooldownSeconds(t *testing.T) {
|
||||||
|
require.Equal(t, 0, soraComputeChallengeCooldownSeconds(0, 3))
|
||||||
|
require.Equal(t, 10, soraComputeChallengeCooldownSeconds(10, 1))
|
||||||
|
require.Equal(t, 20, soraComputeChallengeCooldownSeconds(10, 2))
|
||||||
|
require.Equal(t, 40, soraComputeChallengeCooldownSeconds(10, 4))
|
||||||
|
require.Equal(t, 40, soraComputeChallengeCooldownSeconds(10, 9), "streak should cap at x4")
|
||||||
|
require.Equal(t, 3600, soraComputeChallengeCooldownSeconds(1200, 9), "cooldown should cap at 3600s")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSoraDirectClient_RecordCloudflareChallengeCooldown_EscalatesStreak(t *testing.T) {
|
||||||
|
cfg := &config.Config{
|
||||||
|
Sora: config.SoraConfig{
|
||||||
|
Client: config.SoraClientConfig{
|
||||||
|
CloudflareChallengeCooldownSeconds: 10,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
client := NewSoraDirectClient(cfg, nil, nil)
|
||||||
|
account := &Account{ID: 201}
|
||||||
|
proxyURL := "http://127.0.0.1:18080"
|
||||||
|
|
||||||
|
client.recordCloudflareChallengeCooldown(account, proxyURL, http.StatusForbidden, http.Header{"Cf-Ray": []string{"9d05d73dec4d8c8e-GRU"}}, nil)
|
||||||
|
client.recordCloudflareChallengeCooldown(account, proxyURL, http.StatusForbidden, http.Header{"Cf-Ray": []string{"9d05d73dec4d8c8f-GRU"}}, nil)
|
||||||
|
|
||||||
|
key := soraAccountProxyKey(account, proxyURL)
|
||||||
|
entry, ok := client.challengeCooldowns[key]
|
||||||
|
require.True(t, ok)
|
||||||
|
require.Equal(t, 2, entry.ConsecutiveChallenges)
|
||||||
|
require.Equal(t, "9d05d73dec4d8c8f-GRU", entry.CFRay)
|
||||||
|
remain := int(entry.Until.Sub(entry.LastChallengeAt).Seconds())
|
||||||
|
require.GreaterOrEqual(t, remain, 19)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSoraDirectClient_SidecarSessionKey_SkipsWhenAccountMissing(t *testing.T) {
|
func TestSoraDirectClient_SidecarSessionKey_SkipsWhenAccountMissing(t *testing.T) {
|
||||||
|
|||||||
@@ -16,6 +16,8 @@ type soraChallengeCooldownEntry struct {
|
|||||||
Until time.Time
|
Until time.Time
|
||||||
StatusCode int
|
StatusCode int
|
||||||
CFRay string
|
CFRay string
|
||||||
|
ConsecutiveChallenges int
|
||||||
|
LastChallengeAt time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type soraSidecarSessionEntry struct {
|
type soraSidecarSessionEntry struct {
|
||||||
@@ -67,6 +69,9 @@ func (c *SoraDirectClient) checkCloudflareChallengeCooldown(account *Account, pr
|
|||||||
remaining = 1
|
remaining = 1
|
||||||
}
|
}
|
||||||
message := fmt.Sprintf("Sora request cooling down due to recent Cloudflare challenge. Retry in %d seconds.", remaining)
|
message := fmt.Sprintf("Sora request cooling down due to recent Cloudflare challenge. Retry in %d seconds.", remaining)
|
||||||
|
if entry.ConsecutiveChallenges > 1 {
|
||||||
|
message = fmt.Sprintf("%s (streak=%d)", message, entry.ConsecutiveChallenges)
|
||||||
|
}
|
||||||
if entry.CFRay != "" {
|
if entry.CFRay != "" {
|
||||||
message = fmt.Sprintf("%s (last cf-ray: %s)", message, entry.CFRay)
|
message = fmt.Sprintf("%s (last cf-ray: %s)", message, entry.CFRay)
|
||||||
}
|
}
|
||||||
@@ -90,14 +95,23 @@ func (c *SoraDirectClient) recordCloudflareChallengeCooldown(account *Account, p
|
|||||||
}
|
}
|
||||||
key := soraAccountProxyKey(account, proxyURL)
|
key := soraAccountProxyKey(account, proxyURL)
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
until := now.Add(time.Duration(cooldownSeconds) * time.Second)
|
|
||||||
cfRay := soraerror.ExtractCloudflareRayID(headers, body)
|
cfRay := soraerror.ExtractCloudflareRayID(headers, body)
|
||||||
|
|
||||||
c.challengeCooldownMu.Lock()
|
c.challengeCooldownMu.Lock()
|
||||||
c.cleanupExpiredChallengeCooldownsLocked(now)
|
c.cleanupExpiredChallengeCooldownsLocked(now)
|
||||||
|
|
||||||
|
streak := 1
|
||||||
existing, ok := c.challengeCooldowns[key]
|
existing, ok := c.challengeCooldowns[key]
|
||||||
|
if ok && now.Sub(existing.LastChallengeAt) <= 30*time.Minute {
|
||||||
|
streak = existing.ConsecutiveChallenges + 1
|
||||||
|
}
|
||||||
|
effectiveCooldown := soraComputeChallengeCooldownSeconds(cooldownSeconds, streak)
|
||||||
|
until := now.Add(time.Duration(effectiveCooldown) * time.Second)
|
||||||
if ok && existing.Until.After(until) {
|
if ok && existing.Until.After(until) {
|
||||||
until = existing.Until
|
until = existing.Until
|
||||||
|
if existing.ConsecutiveChallenges > streak {
|
||||||
|
streak = existing.ConsecutiveChallenges
|
||||||
|
}
|
||||||
if cfRay == "" {
|
if cfRay == "" {
|
||||||
cfRay = existing.CFRay
|
cfRay = existing.CFRay
|
||||||
}
|
}
|
||||||
@@ -106,6 +120,8 @@ func (c *SoraDirectClient) recordCloudflareChallengeCooldown(account *Account, p
|
|||||||
Until: until,
|
Until: until,
|
||||||
StatusCode: statusCode,
|
StatusCode: statusCode,
|
||||||
CFRay: cfRay,
|
CFRay: cfRay,
|
||||||
|
ConsecutiveChallenges: streak,
|
||||||
|
LastChallengeAt: now,
|
||||||
}
|
}
|
||||||
c.challengeCooldownMu.Unlock()
|
c.challengeCooldownMu.Unlock()
|
||||||
|
|
||||||
@@ -114,7 +130,44 @@ func (c *SoraDirectClient) recordCloudflareChallengeCooldown(account *Account, p
|
|||||||
if remain < 0 {
|
if remain < 0 {
|
||||||
remain = 0
|
remain = 0
|
||||||
}
|
}
|
||||||
c.debugLogf("cloudflare_challenge_cooldown_set key=%s status=%d remain_s=%d cf_ray=%s", key, statusCode, remain, cfRay)
|
c.debugLogf("cloudflare_challenge_cooldown_set key=%s status=%d remain_s=%d streak=%d cf_ray=%s", key, statusCode, remain, streak, cfRay)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func soraComputeChallengeCooldownSeconds(baseSeconds, streak int) int {
|
||||||
|
if baseSeconds <= 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
if streak < 1 {
|
||||||
|
streak = 1
|
||||||
|
}
|
||||||
|
multiplier := streak
|
||||||
|
if multiplier > 4 {
|
||||||
|
multiplier = 4
|
||||||
|
}
|
||||||
|
cooldown := baseSeconds * multiplier
|
||||||
|
if cooldown > 3600 {
|
||||||
|
cooldown = 3600
|
||||||
|
}
|
||||||
|
return cooldown
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SoraDirectClient) clearCloudflareChallengeCooldown(account *Account, proxyURL string) {
|
||||||
|
if c == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if account == nil || account.ID <= 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
key := soraAccountProxyKey(account, proxyURL)
|
||||||
|
c.challengeCooldownMu.Lock()
|
||||||
|
_, existed := c.challengeCooldowns[key]
|
||||||
|
if existed {
|
||||||
|
delete(c.challengeCooldowns, key)
|
||||||
|
}
|
||||||
|
c.challengeCooldownMu.Unlock()
|
||||||
|
if existed && c.debugEnabled() {
|
||||||
|
c.debugLogf("cloudflare_challenge_cooldown_cleared key=%s", key)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user