fix(antigravity): 统一转发日志格式,添加 session_id 追踪

This commit is contained in:
song
2026-01-03 19:27:46 +08:00
parent c8e55ab2ac
commit 0dc4b113d8

View File

@@ -25,6 +25,22 @@ const (
antigravityRetryMaxDelay = 16 * time.Second antigravityRetryMaxDelay = 16 * time.Second
) )
// getSessionID 从 gin.Context 获取 session_id用于日志追踪
func getSessionID(c *gin.Context) string {
if c == nil {
return ""
}
return c.GetHeader("session_id")
}
// logPrefix 生成统一的日志前缀
func logPrefix(sessionID, accountName string) string {
if sessionID != "" {
return fmt.Sprintf("[antigravity-Forward] session=%s account=%s", sessionID, accountName)
}
return fmt.Sprintf("[antigravity-Forward] account=%s", accountName)
}
// Antigravity 直接支持的模型(精确匹配透传) // Antigravity 直接支持的模型(精确匹配透传)
var antigravitySupportedModels = map[string]bool{ var antigravitySupportedModels = map[string]bool{
"claude-opus-4-5-thinking": true, "claude-opus-4-5-thinking": true,
@@ -310,6 +326,8 @@ func (s *AntigravityGatewayService) unwrapV1InternalResponse(body []byte) ([]byt
// Forward 转发 Claude 协议请求Claude → Gemini 转换) // Forward 转发 Claude 协议请求Claude → Gemini 转换)
func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte) (*ForwardResult, error) { func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte) (*ForwardResult, error) {
startTime := time.Now() startTime := time.Now()
sessionID := getSessionID(c)
prefix := logPrefix(sessionID, account.Name)
// 解析 Claude 请求 // 解析 Claude 请求
var claudeReq antigravity.ClaudeRequest var claudeReq antigravity.ClaudeRequest
@@ -364,10 +382,11 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil { if err != nil {
if attempt < antigravityMaxRetries { if attempt < antigravityMaxRetries {
log.Printf("Antigravity account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, antigravityMaxRetries, err) log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err)
sleepAntigravityBackoff(attempt) sleepAntigravityBackoff(attempt)
continue continue
} }
log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err)
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries") return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries")
} }
@@ -376,13 +395,13 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
_ = resp.Body.Close() _ = resp.Body.Close()
if attempt < antigravityMaxRetries { if attempt < antigravityMaxRetries {
log.Printf("Antigravity account %d: upstream status %d, retry %d/%d", account.ID, resp.StatusCode, attempt, antigravityMaxRetries) log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries)
sleepAntigravityBackoff(attempt) sleepAntigravityBackoff(attempt)
continue continue
} }
// 所有重试都失败,标记限流状态 // 所有重试都失败,标记限流状态
if resp.StatusCode == 429 { if resp.StatusCode == 429 {
s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody)
} }
// 最后一次尝试也失败 // 最后一次尝试也失败
resp = &http.Response{ resp = &http.Response{
@@ -400,7 +419,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
// 处理错误响应 // 处理错误响应
if resp.StatusCode >= 400 { if resp.StatusCode >= 400 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody)
if s.shouldFailoverUpstreamError(resp.StatusCode) { if s.shouldFailoverUpstreamError(resp.StatusCode) {
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
@@ -443,6 +462,8 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
// ForwardGemini 转发 Gemini 协议请求 // ForwardGemini 转发 Gemini 协议请求
func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Context, account *Account, originalModel string, action string, stream bool, body []byte) (*ForwardResult, error) { func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Context, account *Account, originalModel string, action string, stream bool, body []byte) (*ForwardResult, error) {
startTime := time.Now() startTime := time.Now()
sessionID := getSessionID(c)
prefix := logPrefix(sessionID, account.Name)
if strings.TrimSpace(originalModel) == "" { if strings.TrimSpace(originalModel) == "" {
return nil, s.writeGoogleError(c, http.StatusBadRequest, "Missing model in URL") return nil, s.writeGoogleError(c, http.StatusBadRequest, "Missing model in URL")
@@ -518,10 +539,11 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) resp, err = s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency)
if err != nil { if err != nil {
if attempt < antigravityMaxRetries { if attempt < antigravityMaxRetries {
log.Printf("Antigravity account %d: upstream request failed, retry %d/%d: %v", account.ID, attempt, antigravityMaxRetries, err) log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err)
sleepAntigravityBackoff(attempt) sleepAntigravityBackoff(attempt)
continue continue
} }
log.Printf("%s status=request_failed retries_exhausted error=%v", prefix, err)
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries") return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries")
} }
@@ -530,13 +552,13 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
_ = resp.Body.Close() _ = resp.Body.Close()
if attempt < antigravityMaxRetries { if attempt < antigravityMaxRetries {
log.Printf("Antigravity account %d: upstream status %d, retry %d/%d", account.ID, resp.StatusCode, attempt, antigravityMaxRetries) log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries)
sleepAntigravityBackoff(attempt) sleepAntigravityBackoff(attempt)
continue continue
} }
// 所有重试都失败,标记限流状态 // 所有重试都失败,标记限流状态
if resp.StatusCode == 429 { if resp.StatusCode == 429 {
s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody)
} }
resp = &http.Response{ resp = &http.Response{
StatusCode: resp.StatusCode, StatusCode: resp.StatusCode,
@@ -558,7 +580,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
// 处理错误响应 // 处理错误响应
if resp.StatusCode >= 400 { if resp.StatusCode >= 400 {
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20)) respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
s.handleUpstreamError(ctx, account, resp.StatusCode, resp.Header, respBody) s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody)
if s.shouldFailoverUpstreamError(resp.StatusCode) { if s.shouldFailoverUpstreamError(resp.StatusCode) {
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode} return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode}
@@ -628,7 +650,7 @@ func sleepAntigravityBackoff(attempt int) {
sleepGeminiBackoff(attempt) // 复用 Gemini 的退避逻辑 sleepGeminiBackoff(attempt) // 复用 Gemini 的退避逻辑
} }
func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, account *Account, statusCode int, headers http.Header, body []byte) { func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, prefix string, account *Account, statusCode int, headers http.Header, body []byte) {
// 429 使用 Gemini 格式解析(从 body 解析重置时间) // 429 使用 Gemini 格式解析(从 body 解析重置时间)
if statusCode == 429 { if statusCode == 429 {
resetAt := ParseGeminiRateLimitResetTime(body) resetAt := ParseGeminiRateLimitResetTime(body)
@@ -639,17 +661,23 @@ func (s *AntigravityGatewayService) handleUpstreamError(ctx context.Context, acc
defaultDur = 5 * time.Minute defaultDur = 5 * time.Minute
} }
ra := time.Now().Add(defaultDur) ra := time.Now().Add(defaultDur)
log.Printf("%s status=429 rate_limited reset_in=%v (fallback)", prefix, defaultDur)
_ = s.accountRepo.SetRateLimited(ctx, account.ID, ra) _ = s.accountRepo.SetRateLimited(ctx, account.ID, ra)
return return
} }
_ = s.accountRepo.SetRateLimited(ctx, account.ID, time.Unix(*resetAt, 0)) resetTime := time.Unix(*resetAt, 0)
log.Printf("%s status=429 rate_limited reset_at=%v reset_in=%v", prefix, resetTime.Format("15:04:05"), time.Until(resetTime).Truncate(time.Second))
_ = s.accountRepo.SetRateLimited(ctx, account.ID, resetTime)
return return
} }
// 其他错误码继续使用 rateLimitService // 其他错误码继续使用 rateLimitService
if s.rateLimitService == nil { if s.rateLimitService == nil {
return return
} }
s.rateLimitService.HandleUpstreamError(ctx, account, statusCode, headers, body) shouldDisable := s.rateLimitService.HandleUpstreamError(ctx, account, statusCode, headers, body)
if shouldDisable {
log.Printf("%s status=%d marked_error", prefix, statusCode)
}
} }
type antigravityStreamResult struct { type antigravityStreamResult struct {
@@ -758,7 +786,7 @@ func (s *AntigravityGatewayService) writeClaudeError(c *gin.Context, status int,
func (s *AntigravityGatewayService) writeMappedClaudeError(c *gin.Context, upstreamStatus int, body []byte) error { func (s *AntigravityGatewayService) writeMappedClaudeError(c *gin.Context, upstreamStatus int, body []byte) error {
// 记录上游错误详情便于调试 // 记录上游错误详情便于调试
log.Printf("Antigravity upstream error %d: %s", upstreamStatus, string(body)) log.Printf("[antigravity-Forward] upstream_error status=%d body=%s", upstreamStatus, string(body))
var statusCode int var statusCode int
var errType, errMsg string var errType, errMsg string
@@ -832,7 +860,7 @@ func (s *AntigravityGatewayService) handleClaudeNonStreamingResponse(c *gin.Cont
// 转换 Gemini 响应为 Claude 格式 // 转换 Gemini 响应为 Claude 格式
claudeResp, agUsage, err := antigravity.TransformGeminiToClaude(body, originalModel) claudeResp, agUsage, err := antigravity.TransformGeminiToClaude(body, originalModel)
if err != nil { if err != nil {
log.Printf("Transform Gemini to Claude failed: %v, body: %s", err, string(body)) log.Printf("[antigravity-Forward] transform_error error=%v body=%s", err, string(body))
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to parse upstream response") return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to parse upstream response")
} }