fix(gateway): restore upstream account forwarding with dedicated methods
v0.1.74 merged upstream accounts into the OAuth path, causing requests to hit the wrong protocol and endpoint. Add three upstream-specific methods (testUpstreamConnection, ForwardUpstream, ForwardUpstreamGemini) that use base_url + apiKey auth and passthrough the original body, while reusing the existing response handling and error/retry logic.
This commit is contained in:
@@ -650,6 +650,10 @@ type TestConnectionResult struct {
|
||||
// TestConnection 测试 Antigravity 账号连接(非流式,无重试、无计费)
|
||||
// 支持 Claude 和 Gemini 两种协议,根据 modelID 前缀自动选择
|
||||
func (s *AntigravityGatewayService) TestConnection(ctx context.Context, account *Account, modelID string) (*TestConnectionResult, error) {
|
||||
if account.Type == AccountTypeUpstream {
|
||||
return s.testUpstreamConnection(ctx, account, modelID)
|
||||
}
|
||||
|
||||
// 获取 token
|
||||
if s.tokenProvider == nil {
|
||||
return nil, errors.New("antigravity token provider not configured")
|
||||
@@ -966,6 +970,11 @@ func isModelNotFoundError(statusCode int, body []byte) bool {
|
||||
// Forward 转发 Claude 协议请求(Claude → Gemini 转换)
|
||||
func (s *AntigravityGatewayService) Forward(ctx context.Context, c *gin.Context, account *Account, body []byte, isStickySession bool) (*ForwardResult, error) {
|
||||
startTime := time.Now()
|
||||
|
||||
if account.Type == AccountTypeUpstream {
|
||||
return s.ForwardUpstream(ctx, c, account, body, isStickySession)
|
||||
}
|
||||
|
||||
sessionID := getSessionID(c)
|
||||
prefix := logPrefix(sessionID, account.Name)
|
||||
|
||||
@@ -1585,6 +1594,11 @@ func stripSignatureSensitiveBlocksFromClaudeRequest(req *antigravity.ClaudeReque
|
||||
// ForwardGemini 转发 Gemini 协议请求
|
||||
func (s *AntigravityGatewayService) ForwardGemini(ctx context.Context, c *gin.Context, account *Account, originalModel string, action string, stream bool, body []byte, isStickySession bool) (*ForwardResult, error) {
|
||||
startTime := time.Now()
|
||||
|
||||
if account.Type == AccountTypeUpstream {
|
||||
return s.ForwardUpstreamGemini(ctx, c, account, originalModel, action, stream, body, isStickySession)
|
||||
}
|
||||
|
||||
sessionID := getSessionID(c)
|
||||
prefix := logPrefix(sessionID, account.Name)
|
||||
|
||||
@@ -3332,3 +3346,590 @@ func filterEmptyPartsFromGeminiRequest(body []byte) ([]byte, error) {
|
||||
payload["contents"] = filtered
|
||||
return json.Marshal(payload)
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Upstream 专用转发方法
|
||||
// upstream 账号直接连接上游 Anthropic/Gemini 兼容端点,不走 Antigravity OAuth 协议转换。
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
// testUpstreamConnection 测试 upstream 账号连接
|
||||
func (s *AntigravityGatewayService) testUpstreamConnection(ctx context.Context, account *Account, modelID string) (*TestConnectionResult, error) {
|
||||
baseURL := strings.TrimRight(strings.TrimSpace(account.GetCredential("base_url")), "/")
|
||||
if baseURL == "" {
|
||||
return nil, errors.New("upstream account missing base_url in credentials")
|
||||
}
|
||||
apiKey := strings.TrimSpace(account.GetCredential("api_key"))
|
||||
if apiKey == "" {
|
||||
return nil, errors.New("upstream account missing api_key in credentials")
|
||||
}
|
||||
|
||||
mappedModel := s.getMappedModel(account, modelID)
|
||||
if mappedModel == "" {
|
||||
return nil, fmt.Errorf("model %s not in whitelist", modelID)
|
||||
}
|
||||
|
||||
// 构建最小 Claude 格式请求
|
||||
requestBody, _ := json.Marshal(map[string]any{
|
||||
"model": mappedModel,
|
||||
"max_tokens": 1,
|
||||
"messages": []map[string]any{
|
||||
{"role": "user", "content": "."},
|
||||
},
|
||||
"stream": false,
|
||||
})
|
||||
|
||||
apiURL := baseURL + "/antigravity/v1/messages"
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL, bytes.NewReader(requestBody))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("构建请求失败: %w", err)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Authorization", "Bearer "+apiKey)
|
||||
req.Header.Set("x-api-key", apiKey)
|
||||
req.Header.Set("anthropic-version", "2023-06-01")
|
||||
|
||||
proxyURL := ""
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
log.Printf("[antigravity-Test-Upstream] account=%s url=%s", account.Name, apiURL)
|
||||
|
||||
resp, err := s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("请求失败: %w", err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
respBody, err := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("读取响应失败: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
return nil, fmt.Errorf("API 返回 %d: %s", resp.StatusCode, string(respBody))
|
||||
}
|
||||
|
||||
// 从 Claude 格式非流式响应中提取文本
|
||||
var claudeResp struct {
|
||||
Content []struct {
|
||||
Text string `json:"text"`
|
||||
} `json:"content"`
|
||||
}
|
||||
text := ""
|
||||
if json.Unmarshal(respBody, &claudeResp) == nil && len(claudeResp.Content) > 0 {
|
||||
text = claudeResp.Content[0].Text
|
||||
}
|
||||
|
||||
return &TestConnectionResult{
|
||||
Text: text,
|
||||
MappedModel: mappedModel,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ForwardUpstream 转发 Claude 协议请求到 upstream(不做协议转换)
|
||||
func (s *AntigravityGatewayService) ForwardUpstream(ctx context.Context, c *gin.Context, account *Account, body []byte, isStickySession bool) (*ForwardResult, error) {
|
||||
startTime := time.Now()
|
||||
sessionID := getSessionID(c)
|
||||
prefix := logPrefix(sessionID, account.Name)
|
||||
|
||||
baseURL := strings.TrimRight(strings.TrimSpace(account.GetCredential("base_url")), "/")
|
||||
if baseURL == "" {
|
||||
return nil, s.writeClaudeError(c, http.StatusBadGateway, "api_error", "Upstream account missing base_url")
|
||||
}
|
||||
apiKey := strings.TrimSpace(account.GetCredential("api_key"))
|
||||
if apiKey == "" {
|
||||
return nil, s.writeClaudeError(c, http.StatusBadGateway, "authentication_error", "Upstream account missing api_key")
|
||||
}
|
||||
|
||||
// 解析请求以获取模型和流式标志
|
||||
var claudeReq antigravity.ClaudeRequest
|
||||
if err := json.Unmarshal(body, &claudeReq); err != nil {
|
||||
return nil, s.writeClaudeError(c, http.StatusBadRequest, "invalid_request_error", "Invalid request body")
|
||||
}
|
||||
if strings.TrimSpace(claudeReq.Model) == "" {
|
||||
return nil, s.writeClaudeError(c, http.StatusBadRequest, "invalid_request_error", "Missing model")
|
||||
}
|
||||
|
||||
originalModel := claudeReq.Model
|
||||
mappedModel := s.getMappedModel(account, claudeReq.Model)
|
||||
if mappedModel == "" {
|
||||
return nil, s.writeClaudeError(c, http.StatusForbidden, "permission_error", fmt.Sprintf("model %s not in whitelist", claudeReq.Model))
|
||||
}
|
||||
loadModel := mappedModel
|
||||
thinkingEnabled := claudeReq.Thinking != nil && claudeReq.Thinking.Type == "enabled"
|
||||
mappedModel = applyThinkingModelSuffix(mappedModel, thinkingEnabled)
|
||||
quotaScope, _ := resolveAntigravityQuotaScope(originalModel)
|
||||
|
||||
// 代理 URL
|
||||
proxyURL := ""
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
// 统计模型调用次数
|
||||
if s.cache != nil {
|
||||
_, _ = s.cache.IncrModelCallCount(ctx, account.ID, loadModel)
|
||||
}
|
||||
|
||||
apiURL := baseURL + "/antigravity/v1/messages"
|
||||
log.Printf("%s upstream_forward url=%s model=%s", prefix, apiURL, mappedModel)
|
||||
|
||||
// 预检查:模型级限流
|
||||
if remaining := account.GetRateLimitRemainingTimeWithContext(ctx, originalModel); remaining > 0 {
|
||||
if remaining < antigravityRateLimitThreshold {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-time.After(remaining):
|
||||
}
|
||||
} else {
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
ForceCacheBilling: isStickySession,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 重试循环
|
||||
var resp *http.Response
|
||||
var lastErr error
|
||||
for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, s.writeClaudeError(c, http.StatusInternalServerError, "api_error", "Failed to build request")
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Authorization", "Bearer "+apiKey)
|
||||
req.Header.Set("x-api-key", apiKey)
|
||||
|
||||
// 透传 anthropic headers
|
||||
if v := c.GetHeader("anthropic-version"); v != "" {
|
||||
req.Header.Set("anthropic-version", v)
|
||||
} else {
|
||||
req.Header.Set("anthropic-version", "2023-06-01")
|
||||
}
|
||||
if v := c.GetHeader("anthropic-beta"); v != "" {
|
||||
req.Header.Set("anthropic-beta", v)
|
||||
}
|
||||
|
||||
if c != nil && len(body) > 0 {
|
||||
c.Set(OpsUpstreamRequestBodyKey, string(body))
|
||||
}
|
||||
|
||||
resp, err = s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
if attempt < antigravityMaxRetries {
|
||||
log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err)
|
||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", "Upstream request failed after retries")
|
||||
}
|
||||
|
||||
// 429/503 重试
|
||||
if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable {
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
_ = resp.Body.Close()
|
||||
|
||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope, 0, "", isStickySession)
|
||||
|
||||
if attempt < antigravityMaxRetries {
|
||||
log.Printf("%s status=%d retry=%d/%d body=%s", prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 200))
|
||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: resp.StatusCode,
|
||||
ForceCacheBilling: isStickySession,
|
||||
}
|
||||
}
|
||||
|
||||
break // 成功或非限流错误,跳出重试
|
||||
}
|
||||
if resp == nil {
|
||||
return nil, s.writeClaudeError(c, http.StatusBadGateway, "upstream_error", fmt.Sprintf("upstream request failed: %v", lastErr))
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// 错误响应处理
|
||||
if resp.StatusCode >= 400 {
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
|
||||
// signature 重试
|
||||
if resp.StatusCode == http.StatusBadRequest && isSignatureRelatedError(respBody) {
|
||||
log.Printf("%s upstream signature error, retrying with thinking stripped", prefix)
|
||||
retryClaudeReq := claudeReq
|
||||
retryClaudeReq.Messages = append([]antigravity.ClaudeMessage(nil), claudeReq.Messages...)
|
||||
if stripped, stripErr := stripThinkingFromClaudeRequest(&retryClaudeReq); stripErr == nil && stripped {
|
||||
retryBody, _ := json.Marshal(&retryClaudeReq)
|
||||
retryReq, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL, bytes.NewReader(retryBody))
|
||||
if err == nil {
|
||||
retryReq.Header.Set("Content-Type", "application/json")
|
||||
retryReq.Header.Set("Authorization", "Bearer "+apiKey)
|
||||
retryReq.Header.Set("x-api-key", apiKey)
|
||||
retryReq.Header.Set("anthropic-version", "2023-06-01")
|
||||
if v := c.GetHeader("anthropic-beta"); v != "" {
|
||||
retryReq.Header.Set("anthropic-beta", v)
|
||||
}
|
||||
retryResp, retryErr := s.httpUpstream.Do(retryReq, proxyURL, account.ID, account.Concurrency)
|
||||
if retryErr == nil && retryResp != nil && retryResp.StatusCode < 400 {
|
||||
resp = retryResp
|
||||
goto upstreamClaudeSuccess
|
||||
}
|
||||
if retryResp != nil {
|
||||
_ = retryResp.Body.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// prompt too long
|
||||
if resp.StatusCode == http.StatusBadRequest && isPromptTooLongError(respBody) {
|
||||
return nil, &PromptTooLongError{
|
||||
StatusCode: resp.StatusCode,
|
||||
RequestID: resp.Header.Get("x-request-id"),
|
||||
Body: respBody,
|
||||
}
|
||||
}
|
||||
|
||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope, 0, "", isStickySession)
|
||||
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody}
|
||||
}
|
||||
|
||||
return nil, s.writeMappedClaudeError(c, account, resp.StatusCode, resp.Header.Get("x-request-id"), respBody)
|
||||
}
|
||||
|
||||
upstreamClaudeSuccess:
|
||||
requestID := resp.Header.Get("x-request-id")
|
||||
if requestID != "" {
|
||||
c.Header("x-request-id", requestID)
|
||||
}
|
||||
|
||||
var usage *ClaudeUsage
|
||||
var firstTokenMs *int
|
||||
if claudeReq.Stream {
|
||||
streamRes, err := s.handleClaudeStreamingResponse(c, resp, startTime, originalModel)
|
||||
if err != nil {
|
||||
log.Printf("%s status=stream_error error=%v", prefix, err)
|
||||
return nil, err
|
||||
}
|
||||
usage = streamRes.usage
|
||||
firstTokenMs = streamRes.firstTokenMs
|
||||
} else {
|
||||
streamRes, err := s.handleClaudeStreamToNonStreaming(c, resp, startTime, originalModel)
|
||||
if err != nil {
|
||||
log.Printf("%s status=stream_collect_error error=%v", prefix, err)
|
||||
return nil, err
|
||||
}
|
||||
usage = streamRes.usage
|
||||
firstTokenMs = streamRes.firstTokenMs
|
||||
}
|
||||
|
||||
return &ForwardResult{
|
||||
RequestID: requestID,
|
||||
Usage: *usage,
|
||||
Model: originalModel,
|
||||
Stream: claudeReq.Stream,
|
||||
Duration: time.Since(startTime),
|
||||
FirstTokenMs: firstTokenMs,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// ForwardUpstreamGemini 转发 Gemini 协议请求到 upstream(不做协议转换)
|
||||
func (s *AntigravityGatewayService) ForwardUpstreamGemini(ctx context.Context, c *gin.Context, account *Account, originalModel string, action string, stream bool, body []byte, isStickySession bool) (*ForwardResult, error) {
|
||||
startTime := time.Now()
|
||||
sessionID := getSessionID(c)
|
||||
prefix := logPrefix(sessionID, account.Name)
|
||||
|
||||
baseURL := strings.TrimRight(strings.TrimSpace(account.GetCredential("base_url")), "/")
|
||||
if baseURL == "" {
|
||||
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream account missing base_url")
|
||||
}
|
||||
apiKey := strings.TrimSpace(account.GetCredential("api_key"))
|
||||
if apiKey == "" {
|
||||
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream account missing api_key")
|
||||
}
|
||||
|
||||
if strings.TrimSpace(originalModel) == "" {
|
||||
return nil, s.writeGoogleError(c, http.StatusBadRequest, "Missing model in URL")
|
||||
}
|
||||
if strings.TrimSpace(action) == "" {
|
||||
return nil, s.writeGoogleError(c, http.StatusBadRequest, "Missing action in URL")
|
||||
}
|
||||
if len(body) == 0 {
|
||||
return nil, s.writeGoogleError(c, http.StatusBadRequest, "Request body is empty")
|
||||
}
|
||||
quotaScope, _ := resolveAntigravityQuotaScope(originalModel)
|
||||
|
||||
imageSize := s.extractImageSize(body)
|
||||
|
||||
switch action {
|
||||
case "generateContent", "streamGenerateContent":
|
||||
// ok
|
||||
case "countTokens":
|
||||
c.JSON(http.StatusOK, map[string]any{"totalTokens": 0})
|
||||
return &ForwardResult{
|
||||
RequestID: "",
|
||||
Usage: ClaudeUsage{},
|
||||
Model: originalModel,
|
||||
Stream: false,
|
||||
Duration: time.Since(time.Now()),
|
||||
FirstTokenMs: nil,
|
||||
}, nil
|
||||
default:
|
||||
return nil, s.writeGoogleError(c, http.StatusNotFound, "Unsupported action: "+action)
|
||||
}
|
||||
|
||||
mappedModel := s.getMappedModel(account, originalModel)
|
||||
if mappedModel == "" {
|
||||
return nil, s.writeGoogleError(c, http.StatusForbidden, fmt.Sprintf("model %s not in whitelist", originalModel))
|
||||
}
|
||||
|
||||
// 代理 URL
|
||||
proxyURL := ""
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
// 统计模型调用次数
|
||||
if s.cache != nil {
|
||||
_, _ = s.cache.IncrModelCallCount(ctx, account.ID, mappedModel)
|
||||
}
|
||||
|
||||
// 构建 upstream URL: base_url + /antigravity/v1beta/models/MODEL:ACTION
|
||||
upstreamAction := action
|
||||
if action == "generateContent" && !stream {
|
||||
// 非流式也用 streamGenerateContent,与 OAuth 路径行为一致
|
||||
upstreamAction = action
|
||||
}
|
||||
apiURL := fmt.Sprintf("%s/antigravity/v1beta/models/%s:%s", baseURL, mappedModel, upstreamAction)
|
||||
if stream || upstreamAction == "streamGenerateContent" {
|
||||
apiURL += "?alt=sse"
|
||||
}
|
||||
|
||||
log.Printf("%s upstream_forward_gemini url=%s model=%s action=%s", prefix, apiURL, mappedModel, upstreamAction)
|
||||
|
||||
// 预检查:模型级限流
|
||||
if remaining := account.GetRateLimitRemainingTimeWithContext(ctx, originalModel); remaining > 0 {
|
||||
if remaining < antigravityRateLimitThreshold {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-time.After(remaining):
|
||||
}
|
||||
} else {
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: http.StatusServiceUnavailable,
|
||||
ForceCacheBilling: isStickySession,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 重试循环
|
||||
var resp *http.Response
|
||||
var lastErr error
|
||||
for attempt := 1; attempt <= antigravityMaxRetries; attempt++ {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
return nil, s.writeGoogleError(c, http.StatusInternalServerError, "Failed to build request")
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Authorization", "Bearer "+apiKey)
|
||||
|
||||
if c != nil && len(body) > 0 {
|
||||
c.Set(OpsUpstreamRequestBodyKey, string(body))
|
||||
}
|
||||
|
||||
resp, err = s.httpUpstream.Do(req, proxyURL, account.ID, account.Concurrency)
|
||||
if err != nil {
|
||||
lastErr = err
|
||||
if attempt < antigravityMaxRetries {
|
||||
log.Printf("%s status=request_failed retry=%d/%d error=%v", prefix, attempt, antigravityMaxRetries, err)
|
||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
return nil, s.writeGoogleError(c, http.StatusBadGateway, "Upstream request failed after retries")
|
||||
}
|
||||
|
||||
// 429/503 重试
|
||||
if resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode == http.StatusServiceUnavailable {
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
_ = resp.Body.Close()
|
||||
|
||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope, 0, "", isStickySession)
|
||||
|
||||
if attempt < antigravityMaxRetries {
|
||||
log.Printf("%s status=%d retry=%d/%d body=%s", prefix, resp.StatusCode, attempt, antigravityMaxRetries, truncateForLog(respBody, 200))
|
||||
if !sleepAntigravityBackoffWithContext(ctx, attempt) {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
return nil, &UpstreamFailoverError{
|
||||
StatusCode: resp.StatusCode,
|
||||
ForceCacheBilling: isStickySession,
|
||||
}
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
if resp == nil {
|
||||
return nil, s.writeGoogleError(c, http.StatusBadGateway, fmt.Sprintf("upstream request failed: %v", lastErr))
|
||||
}
|
||||
defer func() {
|
||||
if resp != nil && resp.Body != nil {
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
// 错误响应处理
|
||||
if resp.StatusCode >= 400 {
|
||||
respBody, _ := io.ReadAll(io.LimitReader(resp.Body, 2<<20))
|
||||
contentType := resp.Header.Get("Content-Type")
|
||||
_ = resp.Body.Close()
|
||||
resp.Body = io.NopCloser(bytes.NewReader(respBody))
|
||||
|
||||
// 模型兜底
|
||||
if s.settingService != nil && s.settingService.IsModelFallbackEnabled(ctx) &&
|
||||
isModelNotFoundError(resp.StatusCode, respBody) {
|
||||
fallbackModel := s.settingService.GetFallbackModel(ctx, PlatformAntigravity)
|
||||
if fallbackModel != "" && fallbackModel != mappedModel {
|
||||
log.Printf("[Antigravity-Upstream] Model not found (%s), retrying with fallback model %s (account: %s)", mappedModel, fallbackModel, account.Name)
|
||||
fallbackURL := fmt.Sprintf("%s/antigravity/v1beta/models/%s:%s", baseURL, fallbackModel, upstreamAction)
|
||||
if stream || upstreamAction == "streamGenerateContent" {
|
||||
fallbackURL += "?alt=sse"
|
||||
}
|
||||
fallbackReq, err := http.NewRequestWithContext(ctx, http.MethodPost, fallbackURL, bytes.NewReader(body))
|
||||
if err == nil {
|
||||
fallbackReq.Header.Set("Content-Type", "application/json")
|
||||
fallbackReq.Header.Set("Authorization", "Bearer "+apiKey)
|
||||
fallbackResp, err := s.httpUpstream.Do(fallbackReq, proxyURL, account.ID, account.Concurrency)
|
||||
if err == nil && fallbackResp.StatusCode < 400 {
|
||||
_ = resp.Body.Close()
|
||||
resp = fallbackResp
|
||||
} else if fallbackResp != nil {
|
||||
_ = fallbackResp.Body.Close()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// fallback 成功
|
||||
if resp.StatusCode < 400 {
|
||||
goto upstreamGeminiSuccess
|
||||
}
|
||||
|
||||
requestID := resp.Header.Get("x-request-id")
|
||||
if requestID != "" {
|
||||
c.Header("x-request-id", requestID)
|
||||
}
|
||||
|
||||
s.handleUpstreamError(ctx, prefix, account, resp.StatusCode, resp.Header, respBody, quotaScope, 0, "", isStickySession)
|
||||
upstreamMsg := strings.TrimSpace(extractAntigravityErrorMessage(respBody))
|
||||
upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg)
|
||||
upstreamDetail := s.getUpstreamErrorDetail(respBody)
|
||||
|
||||
setOpsUpstreamError(c, resp.StatusCode, upstreamMsg, upstreamDetail)
|
||||
|
||||
if s.shouldFailoverUpstreamError(resp.StatusCode) {
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: requestID,
|
||||
Kind: "failover",
|
||||
Message: upstreamMsg,
|
||||
Detail: upstreamDetail,
|
||||
})
|
||||
return nil, &UpstreamFailoverError{StatusCode: resp.StatusCode, ResponseBody: respBody}
|
||||
}
|
||||
if contentType == "" {
|
||||
contentType = "application/json"
|
||||
}
|
||||
appendOpsUpstreamError(c, OpsUpstreamErrorEvent{
|
||||
Platform: account.Platform,
|
||||
AccountID: account.ID,
|
||||
AccountName: account.Name,
|
||||
UpstreamStatusCode: resp.StatusCode,
|
||||
UpstreamRequestID: requestID,
|
||||
Kind: "http_error",
|
||||
Message: upstreamMsg,
|
||||
Detail: upstreamDetail,
|
||||
})
|
||||
log.Printf("[antigravity-Forward-Upstream] upstream error status=%d body=%s", resp.StatusCode, truncateForLog(respBody, 500))
|
||||
c.Data(resp.StatusCode, contentType, respBody)
|
||||
return nil, fmt.Errorf("antigravity upstream error: %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
upstreamGeminiSuccess:
|
||||
requestID := resp.Header.Get("x-request-id")
|
||||
if requestID != "" {
|
||||
c.Header("x-request-id", requestID)
|
||||
}
|
||||
|
||||
var usage *ClaudeUsage
|
||||
var firstTokenMs *int
|
||||
|
||||
if stream {
|
||||
streamRes, err := s.handleGeminiStreamingResponse(c, resp, startTime)
|
||||
if err != nil {
|
||||
log.Printf("%s status=stream_error error=%v", prefix, err)
|
||||
return nil, err
|
||||
}
|
||||
usage = streamRes.usage
|
||||
firstTokenMs = streamRes.firstTokenMs
|
||||
} else {
|
||||
streamRes, err := s.handleGeminiStreamToNonStreaming(c, resp, startTime)
|
||||
if err != nil {
|
||||
log.Printf("%s status=stream_collect_error error=%v", prefix, err)
|
||||
return nil, err
|
||||
}
|
||||
usage = streamRes.usage
|
||||
firstTokenMs = streamRes.firstTokenMs
|
||||
}
|
||||
|
||||
if usage == nil {
|
||||
usage = &ClaudeUsage{}
|
||||
}
|
||||
|
||||
imageCount := 0
|
||||
if isImageGenerationModel(mappedModel) {
|
||||
imageCount = 1
|
||||
}
|
||||
|
||||
return &ForwardResult{
|
||||
RequestID: requestID,
|
||||
Usage: *usage,
|
||||
Model: originalModel,
|
||||
Stream: stream,
|
||||
Duration: time.Since(startTime),
|
||||
FirstTokenMs: firstTokenMs,
|
||||
ImageCount: imageCount,
|
||||
ImageSize: imageSize,
|
||||
}, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user