diff --git a/backend/internal/pkg/antigravity/request_transformer.go b/backend/internal/pkg/antigravity/request_transformer.go index 4ccb7d12..a8474576 100644 --- a/backend/internal/pkg/antigravity/request_transformer.go +++ b/backend/internal/pkg/antigravity/request_transformer.go @@ -149,6 +149,11 @@ func defaultIdentityPatch(_ string) string { return antigravityIdentity } +// GetDefaultIdentityPatch 返回默认的 Antigravity 身份提示词 +func GetDefaultIdentityPatch() string { + return antigravityIdentity +} + // buildSystemInstruction 构建 systemInstruction func buildSystemInstruction(system json.RawMessage, modelName string, opts TransformOptions) *GeminiContent { var parts []GeminiPart diff --git a/backend/internal/service/antigravity_gateway_service.go b/backend/internal/service/antigravity_gateway_service.go index 48b836f4..2fe77b2d 100644 --- a/backend/internal/service/antigravity_gateway_service.go +++ b/backend/internal/service/antigravity_gateway_service.go @@ -187,10 +187,8 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account return nil, err } - // DEBUG: 打印请求 header 和 body - log.Printf("[DEBUG] Antigravity TestConnection - URL: %s", req.URL.String()) - log.Printf("[DEBUG] Antigravity TestConnection - Headers: %v", req.Header) - log.Printf("[DEBUG] Antigravity TestConnection - Body: %s", string(requestBody)) + // 调试日志:Test 请求信息 + log.Printf("[antigravity-Test] account=%s request_size=%d url=%s", account.Name, len(requestBody), req.URL.String()) // 代理 URL proxyURL := "" @@ -235,6 +233,12 @@ func (s *AntigravityGatewayService) buildGeminiTestRequest(projectID, model stri }, }, }, + // Antigravity 上游要求必须包含身份提示词 + "systemInstruction": map[string]any{ + "parts": []map[string]any{ + {"text": antigravity.GetDefaultIdentityPatch()}, + }, + }, } payloadBytes, _ := json.Marshal(payload) return s.wrapV1InternalRequest(projectID, model, payloadBytes) @@ -333,6 +337,53 @@ func extractTextFromSSEResponse(respBody []byte) string { return 
strings.Join(texts, "") } +// injectIdentityPatchToGeminiRequest 为 Gemini 格式请求注入身份提示词 +// 如果请求中已包含 "You are Antigravity" 则不重复注入 +func injectIdentityPatchToGeminiRequest(body []byte) ([]byte, error) { + var request map[string]any + if err := json.Unmarshal(body, &request); err != nil { + return nil, fmt.Errorf("解析 Gemini 请求失败: %w", err) + } + + // 检查现有 systemInstruction 是否已包含身份提示词 + if sysInst, ok := request["systemInstruction"].(map[string]any); ok { + if parts, ok := sysInst["parts"].([]any); ok { + for _, part := range parts { + if partMap, ok := part.(map[string]any); ok { + if text, ok := partMap["text"].(string); ok { + if strings.Contains(text, "You are Antigravity") { + // 已包含身份提示词,直接返回原始请求 + return body, nil + } + } + } + } + } + } + + // 获取默认身份提示词 + identityPatch := antigravity.GetDefaultIdentityPatch() + + // 构建新的 systemInstruction + newPart := map[string]any{"text": identityPatch} + + if existing, ok := request["systemInstruction"].(map[string]any); ok { + // 已有 systemInstruction,在开头插入身份提示词 + if parts, ok := existing["parts"].([]any); ok { + existing["parts"] = append([]any{newPart}, parts...) 
+ } else { + existing["parts"] = []any{newPart} + } + } else { + // 没有 systemInstruction,创建新的 + request["systemInstruction"] = map[string]any{ + "parts": []any{newPart}, + } + } + + return json.Marshal(request) +} + // wrapV1InternalRequest 包装请求为 v1internal 格式 func (s *AntigravityGatewayService) wrapV1InternalRequest(projectID, model string, originalBody []byte) ([]byte, error) { var request any @@ -418,17 +469,20 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, proxyURL = account.Proxy.URL() } + // 获取转换选项 + // Antigravity 上游要求必须包含身份提示词,否则会返回 429 + transformOpts := s.getClaudeTransformOptions(ctx) + transformOpts.EnableIdentityPatch = true // 强制启用,Antigravity 上游必需 + // 转换 Claude 请求为 Gemini 格式 - geminiBody, err := antigravity.TransformClaudeToGeminiWithOptions(&claudeReq, projectID, mappedModel, s.getClaudeTransformOptions(ctx)) + geminiBody, err := antigravity.TransformClaudeToGeminiWithOptions(&claudeReq, projectID, mappedModel, transformOpts) if err != nil { return nil, fmt.Errorf("transform request: %w", err) } - // 构建上游 action(NewAPIRequest 会自动处理 ?alt=sse 和 Accept Header) - action := "generateContent" - if claudeReq.Stream { - action = "streamGenerateContent" - } + // Antigravity 上游只支持流式请求,统一使用 streamGenerateContent + // 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后转换返回 + action := "streamGenerateContent" // 重试循环 var resp *http.Response @@ -465,7 +519,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, _ = resp.Body.Close() if attempt < antigravityMaxRetries { - log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries) + log.Printf("%s status=%d retry=%d/%d body=%s", prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 500)) if !sleepAntigravityBackoffWithContext(ctx, attempt) { log.Printf("%s status=context_canceled_during_backoff", prefix) return nil, ctx.Err() @@ -584,6 +638,7 @@ func (s *AntigravityGatewayService) Forward(ctx 
context.Context, c *gin.Context, var usage *ClaudeUsage var firstTokenMs *int if claudeReq.Stream { + // 客户端要求流式,直接透传转换 streamRes, err := s.handleClaudeStreamingResponse(c, resp, startTime, originalModel) if err != nil { log.Printf("%s status=stream_error error=%v", prefix, err) @@ -592,10 +647,14 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, usage = streamRes.usage firstTokenMs = streamRes.firstTokenMs } else { - usage, err = s.handleClaudeNonStreamingResponse(c, resp, originalModel) + // 客户端要求非流式,收集流式响应后转换返回 + streamRes, err := s.handleClaudeStreamToNonStreaming(c, resp, startTime, originalModel) if err != nil { + log.Printf("%s status=stream_collect_error error=%v", prefix, err) return nil, err } + usage = streamRes.usage + firstTokenMs = streamRes.firstTokenMs } return &ForwardResult{ @@ -928,18 +987,22 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co proxyURL = account.Proxy.URL() } - // 包装请求 - wrappedBody, err := s.wrapV1InternalRequest(projectID, mappedModel, body) + // Antigravity 上游要求必须包含身份提示词,注入到请求中 + injectedBody, err := injectIdentityPatchToGeminiRequest(body) if err != nil { return nil, err } - // 构建上游 action(NewAPIRequest 会自动处理 ?alt=sse 和 Accept Header) - upstreamAction := action - if action == "generateContent" && stream { - upstreamAction = "streamGenerateContent" + // 包装请求 + wrappedBody, err := s.wrapV1InternalRequest(projectID, mappedModel, injectedBody) + if err != nil { + return nil, err } + // Antigravity 上游只支持流式请求,统一使用 streamGenerateContent + // 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后返回 + upstreamAction := "streamGenerateContent" + // 重试循环 var resp *http.Response for attempt := 1; attempt <= antigravityMaxRetries; attempt++ { @@ -1016,7 +1079,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co if fallbackModel != "" && fallbackModel != mappedModel { log.Printf("[Antigravity] Model not found (%s), retrying with fallback model %s (account: %s)", 
mappedModel, fallbackModel, account.Name) - fallbackWrapped, err := s.wrapV1InternalRequest(projectID, fallbackModel, body) + fallbackWrapped, err := s.wrapV1InternalRequest(projectID, fallbackModel, injectedBody) if err == nil { fallbackReq, err := antigravity.NewAPIRequest(ctx, upstreamAction, accessToken, fallbackWrapped) if err == nil { @@ -1066,7 +1129,8 @@ handleSuccess: var usage *ClaudeUsage var firstTokenMs *int - if stream || upstreamAction == "streamGenerateContent" { + if stream { + // 客户端要求流式,直接透传 streamRes, err := s.handleGeminiStreamingResponse(c, resp, startTime) if err != nil { log.Printf("%s status=stream_error error=%v", prefix, err) @@ -1075,11 +1139,14 @@ handleSuccess: usage = streamRes.usage firstTokenMs = streamRes.firstTokenMs } else { - usageResp, err := s.handleGeminiNonStreamingResponse(c, resp) + // 客户端要求非流式,收集流式响应后返回 + streamRes, err := s.handleGeminiStreamToNonStreaming(c, resp, startTime) if err != nil { + log.Printf("%s status=stream_collect_error error=%v", prefix, err) return nil, err } - usage = usageResp + usage = streamRes.usage + firstTokenMs = streamRes.firstTokenMs } if usage == nil { @@ -1126,9 +1193,9 @@ func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int) // sleepAntigravityBackoffWithContext 带 context 取消检查的退避等待 // 返回 true 表示正常完成等待,false 表示 context 已取消 func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool { - delay := geminiRetryBaseDelay * time.Duration(1<<uint(attempt-1)) - if delay > geminiRetryMaxDelay { - delay = geminiRetryMaxDelay + delay := antigravityRetryBaseDelay * time.Duration(1<<uint(attempt-1)) + if delay > antigravityRetryMaxDelay { + delay = antigravityRetryMaxDelay } // +/- 20% jitter @@ -1340,25 +1407,150 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context } } -func (s *AntigravityGatewayService) handleGeminiNonStreamingResponse(c *gin.Context, resp *http.Response) (*ClaudeUsage, error) { - respBody, err := io.ReadAll(resp.Body) - if err != nil { - return nil, err +// 
handleGeminiStreamToNonStreaming 读取上游流式响应,合并为非流式响应返回给客户端 +// Gemini 流式响应中每个 chunk 都包含累积的完整文本,只需保留最后一个有效响应 +func (s *AntigravityGatewayService) handleGeminiStreamToNonStreaming(c *gin.Context, resp *http.Response, startTime time.Time) (*antigravityStreamResult, error) { + scanner := bufio.NewScanner(resp.Body) + maxLineSize := defaultMaxLineSize + if s.settingService.cfg != nil && s.settingService.cfg.Gateway.MaxLineSize > 0 { + maxLineSize = s.settingService.cfg.Gateway.MaxLineSize + } + scanner.Buffer(make([]byte, 64*1024), maxLineSize) + + usage := &ClaudeUsage{} + var firstTokenMs *int + var last map[string]any + var lastWithParts map[string]any + + type scanEvent struct { + line string + err error } - // 解包 v1internal 响应 - unwrapped, _ := s.unwrapV1InternalResponse(respBody) - - var parsed map[string]any - if json.Unmarshal(unwrapped, &parsed) == nil { - if u := extractGeminiUsage(parsed); u != nil { - c.Data(resp.StatusCode, "application/json", unwrapped) - return u, nil + // 独立 goroutine 读取上游,避免读取阻塞影响超时处理 + events := make(chan scanEvent, 16) + done := make(chan struct{}) + sendEvent := func(ev scanEvent) bool { + select { + case events <- ev: + return true + case <-done: + return false } } - c.Data(resp.StatusCode, "application/json", unwrapped) - return &ClaudeUsage{}, nil + var lastReadAt int64 + atomic.StoreInt64(&lastReadAt, time.Now().UnixNano()) + go func() { + defer close(events) + for scanner.Scan() { + atomic.StoreInt64(&lastReadAt, time.Now().UnixNano()) + if !sendEvent(scanEvent{line: scanner.Text()}) { + return + } + } + if err := scanner.Err(); err != nil { + _ = sendEvent(scanEvent{err: err}) + } + }() + defer close(done) + + // 上游数据间隔超时保护(防止上游挂起长期占用连接) + streamInterval := time.Duration(0) + if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamDataIntervalTimeout > 0 { + streamInterval = time.Duration(s.settingService.cfg.Gateway.StreamDataIntervalTimeout) * time.Second + } + var intervalTicker *time.Ticker + if streamInterval > 
0 { + intervalTicker = time.NewTicker(streamInterval) + defer intervalTicker.Stop() + } + var intervalCh <-chan time.Time + if intervalTicker != nil { + intervalCh = intervalTicker.C + } + + for { + select { + case ev, ok := <-events: + if !ok { + // 流结束,返回收集的响应 + goto returnResponse + } + if ev.err != nil { + if errors.Is(ev.err, bufio.ErrTooLong) { + log.Printf("SSE line too long (antigravity non-stream): max_size=%d error=%v", maxLineSize, ev.err) + } + return nil, ev.err + } + + line := ev.line + trimmed := strings.TrimRight(line, "\r\n") + + if !strings.HasPrefix(trimmed, "data:") { + continue + } + + payload := strings.TrimSpace(strings.TrimPrefix(trimmed, "data:")) + if payload == "" || payload == "[DONE]" { + continue + } + + // 解包 v1internal 响应 + inner, parseErr := s.unwrapV1InternalResponse([]byte(payload)) + if parseErr != nil { + continue + } + + var parsed map[string]any + if err := json.Unmarshal(inner, &parsed); err != nil { + continue + } + + // 记录首 token 时间 + if firstTokenMs == nil { + ms := int(time.Since(startTime).Milliseconds()) + firstTokenMs = &ms + } + + last = parsed + + // 提取 usage + if u := extractGeminiUsage(parsed); u != nil { + usage = u + } + + // 保留最后一个有 parts 的响应 + if parts := extractGeminiParts(parsed); len(parts) > 0 { + lastWithParts = parsed + } + + case <-intervalCh: + lastRead := time.Unix(0, atomic.LoadInt64(&lastReadAt)) + if time.Since(lastRead) < streamInterval { + continue + } + log.Printf("Stream data interval timeout (antigravity non-stream)") + return nil, fmt.Errorf("stream data interval timeout") + } + } + +returnResponse: + // 选择最后一个有效响应 + finalResponse := pickGeminiCollectResult(last, lastWithParts) + + // 处理空响应情况 + if last == nil && lastWithParts == nil { + log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received") + } + + respBody, err := json.Marshal(finalResponse) + if err != nil { + return nil, fmt.Errorf("failed to marshal response: %w", err) + } + c.Data(http.StatusOK, 
"application/json", respBody) + + return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, nil } func (s *AntigravityGatewayService) writeClaudeError(c *gin.Context, status int, errType, message string) error { @@ -1435,17 +1627,148 @@ func (s *AntigravityGatewayService) writeGoogleError(c *gin.Context, status int, return fmt.Errorf("%s", message) } -// handleClaudeNonStreamingResponse 处理 Claude 非流式响应(Gemini → Claude 转换) -func (s *AntigravityGatewayService) handleClaudeNonStreamingResponse(c *gin.Context, resp *http.Response, originalModel string) (*ClaudeUsage, error) { - body, err := io.ReadAll(io.LimitReader(resp.Body, 8<<20)) +// handleClaudeStreamToNonStreaming 收集上游流式响应,转换为 Claude 非流式格式返回 +// 用于处理客户端非流式请求但上游只支持流式的情况 +func (s *AntigravityGatewayService) handleClaudeStreamToNonStreaming(c *gin.Context, resp *http.Response, startTime time.Time, originalModel string) (*antigravityStreamResult, error) { + scanner := bufio.NewScanner(resp.Body) + maxLineSize := defaultMaxLineSize + if s.settingService.cfg != nil && s.settingService.cfg.Gateway.MaxLineSize > 0 { + maxLineSize = s.settingService.cfg.Gateway.MaxLineSize + } + scanner.Buffer(make([]byte, 64*1024), maxLineSize) + + var firstTokenMs *int + var last map[string]any + var lastWithParts map[string]any + + type scanEvent struct { + line string + err error + } + + // 独立 goroutine 读取上游,避免读取阻塞影响超时处理 + events := make(chan scanEvent, 16) + done := make(chan struct{}) + sendEvent := func(ev scanEvent) bool { + select { + case events <- ev: + return true + case <-done: + return false + } + } + + var lastReadAt int64 + atomic.StoreInt64(&lastReadAt, time.Now().UnixNano()) + go func() { + defer close(events) + for scanner.Scan() { + atomic.StoreInt64(&lastReadAt, time.Now().UnixNano()) + if !sendEvent(scanEvent{line: scanner.Text()}) { + return + } + } + if err := scanner.Err(); err != nil { + _ = sendEvent(scanEvent{err: err}) + } + }() + defer close(done) + + // 上游数据间隔超时保护(防止上游挂起长期占用连接) + 
streamInterval := time.Duration(0) + if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamDataIntervalTimeout > 0 { + streamInterval = time.Duration(s.settingService.cfg.Gateway.StreamDataIntervalTimeout) * time.Second + } + var intervalTicker *time.Ticker + if streamInterval > 0 { + intervalTicker = time.NewTicker(streamInterval) + defer intervalTicker.Stop() + } + var intervalCh <-chan time.Time + if intervalTicker != nil { + intervalCh = intervalTicker.C + } + + for { + select { + case ev, ok := <-events: + if !ok { + // 流结束,转换并返回响应 + goto returnResponse + } + if ev.err != nil { + if errors.Is(ev.err, bufio.ErrTooLong) { + log.Printf("SSE line too long (antigravity claude non-stream): max_size=%d error=%v", maxLineSize, ev.err) + } + return nil, ev.err + } + + line := ev.line + trimmed := strings.TrimRight(line, "\r\n") + + if !strings.HasPrefix(trimmed, "data:") { + continue + } + + payload := strings.TrimSpace(strings.TrimPrefix(trimmed, "data:")) + if payload == "" || payload == "[DONE]" { + continue + } + + // 解包 v1internal 响应 + inner, parseErr := s.unwrapV1InternalResponse([]byte(payload)) + if parseErr != nil { + continue + } + + var parsed map[string]any + if err := json.Unmarshal(inner, &parsed); err != nil { + continue + } + + // 记录首 token 时间 + if firstTokenMs == nil { + ms := int(time.Since(startTime).Milliseconds()) + firstTokenMs = &ms + } + + last = parsed + + // 保留最后一个有 parts 的响应 + if parts := extractGeminiParts(parsed); len(parts) > 0 { + lastWithParts = parsed + } + + case <-intervalCh: + lastRead := time.Unix(0, atomic.LoadInt64(&lastReadAt)) + if time.Since(lastRead) < streamInterval { + continue + } + log.Printf("Stream data interval timeout (antigravity claude non-stream)") + return nil, fmt.Errorf("stream data interval timeout") + } + } + +returnResponse: + // 选择最后一个有效响应 + finalResponse := pickGeminiCollectResult(last, lastWithParts) + + // 处理空响应情况 + if last == nil && lastWithParts == nil { + log.Printf("[antigravity-Forward] 
warning: empty stream response, no valid chunks received") + return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Empty response from upstream") + } + + // 序列化为 JSON(Gemini 格式) + geminiBody, err := json.Marshal(finalResponse) if err != nil { - return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to read upstream response") + return nil, fmt.Errorf("failed to marshal gemini response: %w", err) } // 转换 Gemini 响应为 Claude 格式 - claudeResp, agUsage, err := antigravity.TransformGeminiToClaude(body, originalModel) + claudeResp, agUsage, err := antigravity.TransformGeminiToClaude(geminiBody, originalModel) if err != nil { - log.Printf("[antigravity-Forward] transform_error error=%v body=%s", err, string(body)) + log.Printf("[antigravity-Forward] transform_error error=%v body=%s", err, string(geminiBody)) return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to parse upstream response") } @@ -1458,7 +1781,8 @@ func (s *AntigravityGatewayService) handleClaudeNonStreamingResponse(c *gin.Cont CacheCreationInputTokens: agUsage.CacheCreationInputTokens, CacheReadInputTokens: agUsage.CacheReadInputTokens, } - return usage, nil + + return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, nil } // handleClaudeStreamingResponse 处理 Claude 流式响应(Gemini SSE → Claude SSE 转换)