@@ -55,23 +55,19 @@ var antigravityModelMapping = map[string]string{
// AntigravityGatewayService 处理 Antigravity 平台的 API 转发
type AntigravityGatewayService struct {
accountRepo AccountRepository
cache GatewayCache
tokenProvider * AntigravityTokenProvider
rateLimitService * RateLimitService
httpUpstream HTTPUpstream
}
func NewAntigravityGatewayService (
accountRepo AccountRepository ,
cache GatewayCache ,
_ AccountRepository ,
_ GatewayCache ,
tokenProvider * AntigravityTokenProvider ,
rateLimitService * RateLimitService ,
httpUpstream HTTPUpstream ,
) * AntigravityGatewayService {
return & AntigravityGatewayService {
accountRepo : accountRepo ,
cache : cache ,
tokenProvider : tokenProvider ,
rateLimitService : rateLimitService ,
httpUpstream : httpUpstream ,
@@ -163,33 +159,6 @@ func (s *AntigravityGatewayService) unwrapV1InternalResponse(body []byte) ([]byt
return body , nil
}
// unwrapSSELine 解包 SSE 行中的 v1internal 响应
func ( s * AntigravityGatewayService ) unwrapSSELine ( line string ) string {
if ! strings . HasPrefix ( line , "data: " ) {
return line
}
data := strings . TrimPrefix ( line , "data: " )
if data == "" || data == "[DONE]" {
return line
}
var outer map [ string ] any
if err := json . Unmarshal ( [ ] byte ( data ) , & outer ) ; err != nil {
return line
}
if resp , ok := outer [ "response" ] ; ok {
unwrapped , err := json . Marshal ( resp )
if err != nil {
return line
}
return "data: " + string ( unwrapped )
}
return line
}
// Forward 转发 Claude 协议请求( Claude → Gemini 转换)
func ( s * AntigravityGatewayService ) Forward ( ctx context . Context , c * gin . Context , account * Account , body [ ] byte ) ( * ForwardResult , error ) {
startTime := time . Now ( )
@@ -568,81 +537,6 @@ type antigravityStreamResult struct {
firstTokenMs * int
}
func ( s * AntigravityGatewayService ) handleStreamingResponse ( c * gin . Context , resp * http . Response , startTime time . Time , originalModel string ) ( * antigravityStreamResult , error ) {
c . Header ( "Content-Type" , "text/event-stream" )
c . Header ( "Cache-Control" , "no-cache" )
c . Header ( "Connection" , "keep-alive" )
c . Header ( "X-Accel-Buffering" , "no" )
c . Status ( http . StatusOK )
flusher , ok := c . Writer . ( http . Flusher )
if ! ok {
return nil , errors . New ( "streaming not supported" )
}
usage := & ClaudeUsage { }
var firstTokenMs * int
reader := bufio . NewReader ( resp . Body )
for {
line , err := reader . ReadString ( '\n' )
if err != nil && ! errors . Is ( err , io . EOF ) {
return nil , fmt . Errorf ( "stream read error: %w" , err )
}
if len ( line ) > 0 {
// 解包 v1internal 响应
unwrapped := s . unwrapSSELine ( strings . TrimRight ( line , "\r\n" ) )
// 解析 usage
if strings . HasPrefix ( unwrapped , "data: " ) {
data := strings . TrimPrefix ( unwrapped , "data: " )
if data != "" && data != "[DONE]" {
if firstTokenMs == nil {
ms := int ( time . Since ( startTime ) . Milliseconds ( ) )
firstTokenMs = & ms
}
s . parseClaudeSSEUsage ( data , usage )
}
}
// 写入响应
if _ , writeErr := fmt . Fprintf ( c . Writer , "%s\n" , unwrapped ) ; writeErr != nil {
return & antigravityStreamResult { usage : usage , firstTokenMs : firstTokenMs } , writeErr
}
flusher . Flush ( )
}
if errors . Is ( err , io . EOF ) {
break
}
}
return & antigravityStreamResult { usage : usage , firstTokenMs : firstTokenMs } , nil
}
func ( s * AntigravityGatewayService ) handleNonStreamingResponse ( c * gin . Context , resp * http . Response , originalModel string ) ( * ClaudeUsage , error ) {
body , err := io . ReadAll ( io . LimitReader ( resp . Body , 8 << 20 ) )
if err != nil {
return nil , s . writeClaudeError ( c , http . StatusBadGateway , "upstream_error" , "Failed to read upstream response" )
}
// 解包 v1internal 响应
unwrapped , err := s . unwrapV1InternalResponse ( body )
if err != nil {
return nil , s . writeClaudeError ( c , http . StatusBadGateway , "upstream_error" , "Failed to parse upstream response" )
}
// 解析 usage
var respObj struct {
Usage ClaudeUsage ` json:"usage" `
}
_ = json . Unmarshal ( unwrapped , & respObj )
c . Data ( http . StatusOK , "application/json" , unwrapped )
return & respObj . Usage , nil
}
func ( s * AntigravityGatewayService ) handleGeminiStreamingResponse ( c * gin . Context , resp * http . Response , startTime time . Time ) ( * antigravityStreamResult , error ) {
c . Status ( resp . StatusCode )
c . Header ( "Cache-Control" , "no-cache" )
@@ -734,44 +628,6 @@ func (s *AntigravityGatewayService) handleGeminiNonStreamingResponse(c *gin.Cont
return & ClaudeUsage { } , nil
}
func ( s * AntigravityGatewayService ) parseClaudeSSEUsage ( data string , usage * ClaudeUsage ) {
// 解析 message_start 获取 input tokens
var msgStart struct {
Type string ` json:"type" `
Message struct {
Usage ClaudeUsage ` json:"usage" `
} ` json:"message" `
}
if json . Unmarshal ( [ ] byte ( data ) , & msgStart ) == nil && msgStart . Type == "message_start" {
usage . InputTokens = msgStart . Message . Usage . InputTokens
usage . CacheCreationInputTokens = msgStart . Message . Usage . CacheCreationInputTokens
usage . CacheReadInputTokens = msgStart . Message . Usage . CacheReadInputTokens
}
// 解析 message_delta 获取 output tokens
var msgDelta struct {
Type string ` json:"type" `
Usage struct {
InputTokens int ` json:"input_tokens" `
OutputTokens int ` json:"output_tokens" `
CacheCreationInputTokens int ` json:"cache_creation_input_tokens" `
CacheReadInputTokens int ` json:"cache_read_input_tokens" `
} ` json:"usage" `
}
if json . Unmarshal ( [ ] byte ( data ) , & msgDelta ) == nil && msgDelta . Type == "message_delta" {
usage . OutputTokens = msgDelta . Usage . OutputTokens
if usage . InputTokens == 0 {
usage . InputTokens = msgDelta . Usage . InputTokens
}
if usage . CacheCreationInputTokens == 0 {
usage . CacheCreationInputTokens = msgDelta . Usage . CacheCreationInputTokens
}
if usage . CacheReadInputTokens == 0 {
usage . CacheReadInputTokens = msgDelta . Usage . CacheReadInputTokens
}
}
}
func ( s * AntigravityGatewayService ) writeClaudeError ( c * gin . Context , status int , errType , message string ) error {
c . JSON ( status , gin . H {
"type" : "error" ,