Merge remote-tracking branch 'upstream/main'

This commit is contained in:
song
2026-01-08 22:52:18 +08:00
2 changed files with 375 additions and 46 deletions

View File

@@ -149,6 +149,11 @@ func defaultIdentityPatch(_ string) string {
return antigravityIdentity
}
// GetDefaultIdentityPatch 返回默认的 Antigravity 身份提示词
func GetDefaultIdentityPatch() string {
return antigravityIdentity
}
// buildSystemInstruction 构建 systemInstruction
func buildSystemInstruction(system json.RawMessage, modelName string, opts TransformOptions) *GeminiContent {
var parts []GeminiPart

View File

@@ -188,10 +188,8 @@ func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account
return nil, err
}
// DEBUG: 打印请求 header 和 body
log.Printf("[DEBUG] Antigravity TestConnection - URL: %s", req.URL.String())
log.Printf("[DEBUG] Antigravity TestConnection - Headers: %v", req.Header)
log.Printf("[DEBUG] Antigravity TestConnection - Body: %s", string(requestBody))
// 调试日志Test 请求信息
log.Printf("[antigravity-Test] account=%s request_size=%d url=%s", account.Name, len(requestBody), req.URL.String())
// 代理 URL
proxyURL := ""
@@ -236,6 +234,12 @@ func (s *AntigravityGatewayService) buildGeminiTestRequest(projectID, model stri
},
},
},
// Antigravity 上游要求必须包含身份提示词
"systemInstruction": map[string]any{
"parts": []map[string]any{
{"text": antigravity.GetDefaultIdentityPatch()},
},
},
}
payloadBytes, _ := json.Marshal(payload)
return s.wrapV1InternalRequest(projectID, model, payloadBytes)
@@ -334,6 +338,53 @@ func extractTextFromSSEResponse(respBody []byte) string {
return strings.Join(texts, "")
}
// injectIdentityPatchToGeminiRequest 为 Gemini 格式请求注入身份提示词
// 如果请求中已包含 "You are Antigravity" 则不重复注入
func injectIdentityPatchToGeminiRequest(body []byte) ([]byte, error) {
var request map[string]any
if err := json.Unmarshal(body, &request); err != nil {
return nil, fmt.Errorf("解析 Gemini 请求失败: %w", err)
}
// 检查现有 systemInstruction 是否已包含身份提示词
if sysInst, ok := request["systemInstruction"].(map[string]any); ok {
if parts, ok := sysInst["parts"].([]any); ok {
for _, part := range parts {
if partMap, ok := part.(map[string]any); ok {
if text, ok := partMap["text"].(string); ok {
if strings.Contains(text, "You are Antigravity") {
// 已包含身份提示词,直接返回原始请求
return body, nil
}
}
}
}
}
}
// 获取默认身份提示词
identityPatch := antigravity.GetDefaultIdentityPatch()
// 构建新的 systemInstruction
newPart := map[string]any{"text": identityPatch}
if existing, ok := request["systemInstruction"].(map[string]any); ok {
// 已有 systemInstruction在开头插入身份提示词
if parts, ok := existing["parts"].([]any); ok {
existing["parts"] = append([]any{newPart}, parts...)
} else {
existing["parts"] = []any{newPart}
}
} else {
// 没有 systemInstruction创建新的
request["systemInstruction"] = map[string]any{
"parts": []any{newPart},
}
}
return json.Marshal(request)
}
// wrapV1InternalRequest 包装请求为 v1internal 格式
func (s *AntigravityGatewayService) wrapV1InternalRequest(projectID, model string, originalBody []byte) ([]byte, error) {
var request any
@@ -419,17 +470,20 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
proxyURL = account.Proxy.URL()
}
// 获取转换选项
// Antigravity 上游要求必须包含身份提示词,否则会返回 429
transformOpts := s.getClaudeTransformOptions(ctx)
transformOpts.EnableIdentityPatch = true // 强制启用Antigravity 上游必需
// 转换 Claude 请求为 Gemini 格式
geminiBody, err := antigravity.TransformClaudeToGeminiWithOptions(&claudeReq, projectID, mappedModel, s.getClaudeTransformOptions(ctx))
geminiBody, err := antigravity.TransformClaudeToGeminiWithOptions(&claudeReq, projectID, mappedModel, transformOpts)
if err != nil {
return nil, fmt.Errorf("transform request: %w", err)
}
// 构建上游 actionNewAPIRequest 会自动处理 ?alt=sse 和 Accept Header
action := "generateContent"
if claudeReq.Stream {
action = "streamGenerateContent"
}
// Antigravity 上游只支持流式请求,统一使用 streamGenerateContent
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后转换返回
action := "streamGenerateContent"
// 重试循环
var resp *http.Response
@@ -466,7 +520,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
_ = resp.Body.Close()
if attempt < antigravityMaxRetries {
log.Printf("%s status=%d retry=%d/%d", prefix, resp.StatusCode, attempt, antigravityMaxRetries)
log.Printf("%s status=%d retry=%d/%d body=%s", prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 500))
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
log.Printf("%s status=context_canceled_during_backoff", prefix)
return nil, ctx.Err()
@@ -585,6 +639,7 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
var usage *ClaudeUsage
var firstTokenMs *int
if claudeReq.Stream {
// 客户端要求流式,直接透传转换
streamRes, err := s.handleClaudeStreamingResponse(c, resp, startTime, originalModel)
if err != nil {
log.Printf("%s status=stream_error error=%v", prefix, err)
@@ -593,10 +648,14 @@ func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context,
usage = streamRes.usage
firstTokenMs = streamRes.firstTokenMs
} else {
usage, err = s.handleClaudeNonStreamingResponse(c, resp, originalModel)
// 客户端要求非流式,收集流式响应后转换返回
streamRes, err := s.handleClaudeStreamToNonStreaming(c, resp, startTime, originalModel)
if err != nil {
log.Printf("%s status=stream_collect_error error=%v", prefix, err)
return nil, err
}
usage = streamRes.usage
firstTokenMs = streamRes.firstTokenMs
}
return &ForwardResult{
@@ -929,18 +988,22 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
proxyURL = account.Proxy.URL()
}
// 包装请求
wrappedBody, err := s.wrapV1InternalRequest(projectID, mappedModel, body)
// Antigravity 上游要求必须包含身份提示词,注入到请求
injectedBody, err := injectIdentityPatchToGeminiRequest(body)
if err != nil {
return nil, err
}
// 构建上游 actionNewAPIRequest 会自动处理 ?alt=sse 和 Accept Header
upstreamAction := action
if action == "generateContent" && stream {
upstreamAction = "streamGenerateContent"
// 包装请求
wrappedBody, err := s.wrapV1InternalRequest(projectID, mappedModel, injectedBody)
if err != nil {
return nil, err
}
// Antigravity 上游只支持流式请求,统一使用 streamGenerateContent
// 如果客户端请求非流式,在响应处理阶段会收集完整流式响应后返回
upstreamAction := "streamGenerateContent"
// 重试循环
var resp *http.Response
for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
@@ -1017,7 +1080,7 @@ func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Co
if fallbackModel != "" && fallbackModel != mappedModel {
log.Printf("[Antigravity] Model not found (%s), retrying with fallback model %s (account: %s)", mappedModel, fallbackModel, account.Name)
fallbackWrapped, err := s.wrapV1InternalRequest(projectID, fallbackModel, body)
fallbackWrapped, err := s.wrapV1InternalRequest(projectID, fallbackModel, injectedBody)
if err == nil {
fallbackReq, err := antigravity.NewAPIRequest(ctx, upstreamAction, accessToken, fallbackWrapped)
if err == nil {
@@ -1067,7 +1130,8 @@ handleSuccess:
var usage *ClaudeUsage
var firstTokenMs *int
if stream || upstreamAction == "streamGenerateContent" {
if stream {
// 客户端要求流式,直接透传
streamRes, err := s.handleGeminiStreamingResponse(c, resp, startTime)
if err != nil {
log.Printf("%s status=stream_error error=%v", prefix, err)
@@ -1076,11 +1140,14 @@ handleSuccess:
usage = streamRes.usage
firstTokenMs = streamRes.firstTokenMs
} else {
usageResp, err := s.handleGeminiNonStreamingResponse(c, resp)
// 客户端要求非流式,收集流式响应后返回
streamRes, err := s.handleGeminiStreamToNonStreaming(c, resp, startTime)
if err != nil {
log.Printf("%s status=stream_collect_error error=%v", prefix, err)
return nil, err
}
usage = usageResp
usage = streamRes.usage
firstTokenMs = streamRes.firstTokenMs
}
if usage == nil {
@@ -1127,9 +1194,9 @@ func (s *AntigravityGatewayService) shouldFailoverUpstreamError(statusCode int)
// sleepAntigravityBackoffWithContext 带 context 取消检查的退避等待
// 返回 true 表示正常完成等待false 表示 context 已取消
func sleepAntigravityBackoffWithContext(ctx context.Context, attempt int) bool {
delay := geminiRetryBaseDelay * time.Duration(1<<uint(attempt-1))
if delay > geminiRetryMaxDelay {
delay = geminiRetryMaxDelay
delay := antigravityRetryBaseDelay * time.Duration(1<<uint(attempt-1))
if delay > antigravityRetryMaxDelay {
delay = antigravityRetryMaxDelay
}
// +/- 20% jitter
@@ -1341,25 +1408,150 @@ func (s *AntigravityGatewayService) handleGeminiStreamingResponse(c *gin.Context
}
}
func (s *AntigravityGatewayService) handleGeminiNonStreamingResponse(c *gin.Context, resp *http.Response) (*ClaudeUsage, error) {
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
// handleGeminiStreamToNonStreaming 读取上游流式响应,合并为非流式响应返回给客户端
// Gemini 流式响应中每个 chunk 都包含累积的完整文本,只需保留最后一个有效响应
func (s *AntigravityGatewayService) handleGeminiStreamToNonStreaming(c *gin.Context, resp *http.Response, startTime time.Time) (*antigravityStreamResult, error) {
scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.MaxLineSize > 0 {
maxLineSize = s.settingService.cfg.Gateway.MaxLineSize
}
scanner.Buffer(make([]byte, 64*1024), maxLineSize)
usage := &ClaudeUsage{}
var firstTokenMs *int
var last map[string]any
var lastWithParts map[string]any
type scanEvent struct {
line string
err error
}
// 解包 v1internal 响应
unwrapped, _ := s.unwrapV1InternalResponse(respBody)
var parsed map[string]any
if json.Unmarshal(unwrapped, &parsed) == nil {
if u := extractGeminiUsage(parsed); u != nil {
c.Data(resp.StatusCode, "application/json", unwrapped)
return u, nil
// 独立 goroutine 读取上游,避免读取阻塞影响超时处理
events := make(chan scanEvent, 16)
done := make(chan struct{})
sendEvent := func(ev scanEvent) bool {
select {
case events <- ev:
return true
case <-done:
return false
}
}
c.Data(resp.StatusCode, "application/json", unwrapped)
return &ClaudeUsage{}, nil
var lastReadAt int64
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
go func() {
defer close(events)
for scanner.Scan() {
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
if !sendEvent(scanEvent{line: scanner.Text()}) {
return
}
}
if err := scanner.Err(); err != nil {
_ = sendEvent(scanEvent{err: err})
}
}()
defer close(done)
// 上游数据间隔超时保护(防止上游挂起长期占用连接)
streamInterval := time.Duration(0)
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamDataIntervalTimeout > 0 {
streamInterval = time.Duration(s.settingService.cfg.Gateway.StreamDataIntervalTimeout) * time.Second
}
var intervalTicker *time.Ticker
if streamInterval > 0 {
intervalTicker = time.NewTicker(streamInterval)
defer intervalTicker.Stop()
}
var intervalCh <-chan time.Time
if intervalTicker != nil {
intervalCh = intervalTicker.C
}
for {
select {
case ev, ok := <-events:
if !ok {
// 流结束,返回收集的响应
goto returnResponse
}
if ev.err != nil {
if errors.Is(ev.err, bufio.ErrTooLong) {
log.Printf("SSE line too long (antigravity non-stream): max_size=%d error=%v", maxLineSize, ev.err)
}
return nil, ev.err
}
line := ev.line
trimmed := strings.TrimRight(line, "\r\n")
if !strings.HasPrefix(trimmed, "data:") {
continue
}
payload := strings.TrimSpace(strings.TrimPrefix(trimmed, "data:"))
if payload == "" || payload == "[DONE]" {
continue
}
// 解包 v1internal 响应
inner, parseErr := s.unwrapV1InternalResponse([]byte(payload))
if parseErr != nil {
continue
}
var parsed map[string]any
if err := json.Unmarshal(inner, &parsed); err != nil {
continue
}
// 记录首 token 时间
if firstTokenMs == nil {
ms := int(time.Since(startTime).Milliseconds())
firstTokenMs = &ms
}
last = parsed
// 提取 usage
if u := extractGeminiUsage(parsed); u != nil {
usage = u
}
// 保留最后一个有 parts 的响应
if parts := extractGeminiParts(parsed); len(parts) > 0 {
lastWithParts = parsed
}
case <-intervalCh:
lastRead := time.Unix(0, atomic.LoadInt64(&lastReadAt))
if time.Since(lastRead) < streamInterval {
continue
}
log.Printf("Stream data interval timeout (antigravity non-stream)")
return nil, fmt.Errorf("stream data interval timeout")
}
}
returnResponse:
// 选择最后一个有效响应
finalResponse := pickGeminiCollectResult(last, lastWithParts)
// 处理空响应情况
if last == nil && lastWithParts == nil {
log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received")
}
respBody, err := json.Marshal(finalResponse)
if err != nil {
return nil, fmt.Errorf("failed to marshal response: %w", err)
}
c.Data(http.StatusOK, "application/json", respBody)
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, nil
}
func (s *AntigravityGatewayService) writeClaudeError(c *gin.Context, status int, errType, message string) error {
@@ -1436,17 +1628,148 @@ func (s *AntigravityGatewayService) writeGoogleError(c *gin.Context, status int,
return fmt.Errorf("%s", message)
}
// handleClaudeNonStreamingResponse 处理 Claude 非流式响应Gemini → Claude 转换)
func (s *AntigravityGatewayService) handleClaudeNonStreamingResponse(c *gin.Context, resp *http.Response, originalModel string) (*ClaudeUsage, error) {
body, err := io.ReadAll(io.LimitReader(resp.Body, 8<<20))
// handleClaudeStreamToNonStreaming 收集上游流式响应,转换为 Claude 非流式格式返回
// 用于处理客户端非流式请求但上游只支持流式的情况
func (s *AntigravityGatewayService) handleClaudeStreamToNonStreaming(c *gin.Context, resp *http.Response, startTime time.Time, originalModel string) (*antigravityStreamResult, error) {
scanner := bufio.NewScanner(resp.Body)
maxLineSize := defaultMaxLineSize
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.MaxLineSize > 0 {
maxLineSize = s.settingService.cfg.Gateway.MaxLineSize
}
scanner.Buffer(make([]byte, 64*1024), maxLineSize)
var firstTokenMs *int
var last map[string]any
var lastWithParts map[string]any
type scanEvent struct {
line string
err error
}
// 独立 goroutine 读取上游,避免读取阻塞影响超时处理
events := make(chan scanEvent, 16)
done := make(chan struct{})
sendEvent := func(ev scanEvent) bool {
select {
case events <- ev:
return true
case <-done:
return false
}
}
var lastReadAt int64
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
go func() {
defer close(events)
for scanner.Scan() {
atomic.StoreInt64(&lastReadAt, time.Now().UnixNano())
if !sendEvent(scanEvent{line: scanner.Text()}) {
return
}
}
if err := scanner.Err(); err != nil {
_ = sendEvent(scanEvent{err: err})
}
}()
defer close(done)
// 上游数据间隔超时保护(防止上游挂起长期占用连接)
streamInterval := time.Duration(0)
if s.settingService.cfg != nil && s.settingService.cfg.Gateway.StreamDataIntervalTimeout > 0 {
streamInterval = time.Duration(s.settingService.cfg.Gateway.StreamDataIntervalTimeout) * time.Second
}
var intervalTicker *time.Ticker
if streamInterval > 0 {
intervalTicker = time.NewTicker(streamInterval)
defer intervalTicker.Stop()
}
var intervalCh <-chan time.Time
if intervalTicker != nil {
intervalCh = intervalTicker.C
}
for {
select {
case ev, ok := <-events:
if !ok {
// 流结束,转换并返回响应
goto returnResponse
}
if ev.err != nil {
if errors.Is(ev.err, bufio.ErrTooLong) {
log.Printf("SSE line too long (antigravity claude non-stream): max_size=%d error=%v", maxLineSize, ev.err)
}
return nil, ev.err
}
line := ev.line
trimmed := strings.TrimRight(line, "\r\n")
if !strings.HasPrefix(trimmed, "data:") {
continue
}
payload := strings.TrimSpace(strings.TrimPrefix(trimmed, "data:"))
if payload == "" || payload == "[DONE]" {
continue
}
// 解包 v1internal 响应
inner, parseErr := s.unwrapV1InternalResponse([]byte(payload))
if parseErr != nil {
continue
}
var parsed map[string]any
if err := json.Unmarshal(inner, &parsed); err != nil {
continue
}
// 记录首 token 时间
if firstTokenMs == nil {
ms := int(time.Since(startTime).Milliseconds())
firstTokenMs = &ms
}
last = parsed
// 保留最后一个有 parts 的响应
if parts := extractGeminiParts(parsed); len(parts) > 0 {
lastWithParts = parsed
}
case <-intervalCh:
lastRead := time.Unix(0, atomic.LoadInt64(&lastReadAt))
if time.Since(lastRead) < streamInterval {
continue
}
log.Printf("Stream data interval timeout (antigravity claude non-stream)")
return nil, fmt.Errorf("stream data interval timeout")
}
}
returnResponse:
// 选择最后一个有效响应
finalResponse := pickGeminiCollectResult(last, lastWithParts)
// 处理空响应情况
if last == nil && lastWithParts == nil {
log.Printf("[antigravity-Forward] warning: empty stream response, no valid chunks received")
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Empty response from upstream")
}
// 序列化为 JSONGemini 格式)
geminiBody, err := json.Marshal(finalResponse)
if err != nil {
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to read upstream response")
return nil, fmt.Errorf("failed to marshal gemini response: %w", err)
}
// 转换 Gemini 响应为 Claude 格式
claudeResp, agUsage, err := antigravity.TransformGeminiToClaude(body, originalModel)
claudeResp, agUsage, err := antigravity.TransformGeminiToClaude(geminiBody, originalModel)
if err != nil {
log.Printf("[antigravity-Forward] transform_error error=%v body=%s", err, string(body))
log.Printf("[antigravity-Forward] transform_error error=%v body=%s", err, string(geminiBody))
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Failed to parse upstream response")
}
@@ -1459,7 +1782,8 @@ func (s *AntigravityGatewayService) handleClaudeNonStreamingResponse(c *gin.Cont
CacheCreationInputTokens: agUsage.CacheCreationInputTokens,
CacheReadInputTokens: agUsage.CacheReadInputTokens,
}
return usage, nil
return &antigravityStreamResult{usage: usage, firstTokenMs: firstTokenMs}, nil
}
// handleClaudeStreamingResponse 处理 Claude 流式响应Gemini SSE → Claude SSE 转换)