diff --git a/backend/cmd/server/wire.go b/backend/cmd/server/wire.go index 1aa31ab6..d0d2df69 100644 --- a/backend/cmd/server/wire.go +++ b/backend/cmd/server/wire.go @@ -71,6 +71,7 @@ func provideCleanup( openaiOAuth *service.OpenAIOAuthService, geminiOAuth *service.GeminiOAuthService, antigravityOAuth *service.AntigravityOAuthService, + antigravityQuota *service.AntigravityQuotaRefresher, ) func() { return func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -109,6 +110,10 @@ func provideCleanup( antigravityOAuth.Stop() return nil }}, + {"AntigravityQuotaRefresher", func() error { + antigravityQuota.Stop() + return nil + }}, {"Redis", func() error { return rdb.Close() }}, diff --git a/backend/cmd/server/wire_gen.go b/backend/cmd/server/wire_gen.go index 438864be..fe4e9a34 100644 --- a/backend/cmd/server/wire_gen.go +++ b/backend/cmd/server/wire_gen.go @@ -136,7 +136,8 @@ func initializeApplication(buildInfo handler.BuildInfo) (*Application, error) { engine := server.ProvideRouter(configConfig, handlers, jwtAuthMiddleware, adminAuthMiddleware, apiKeyAuthMiddleware, apiKeyService, subscriptionService) httpServer := server.ProvideHTTPServer(configConfig, engine) tokenRefreshService := service.ProvideTokenRefreshService(accountRepository, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, configConfig) - v := provideCleanup(db, client, tokenRefreshService, pricingService, emailQueueService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService) + antigravityQuotaRefresher := service.ProvideAntigravityQuotaRefresher(accountRepository, proxyRepository, antigravityOAuthService, configConfig) + v := provideCleanup(db, client, tokenRefreshService, pricingService, emailQueueService, oAuthService, openAIOAuthService, geminiOAuthService, antigravityOAuthService, antigravityQuotaRefresher) application := &Application{ Server: httpServer, Cleanup: v, @@ -168,6 +169,7 @@ func provideCleanup( openaiOAuth *service.OpenAIOAuthService, geminiOAuth *service.GeminiOAuthService, antigravityOAuth *service.AntigravityOAuthService, + antigravityQuota *service.AntigravityQuotaRefresher, ) func() { return func() { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) @@ -205,6 +207,10 @@ func provideCleanup( antigravityOAuth.Stop() return nil }}, + {"AntigravityQuotaRefresher", func() error { + antigravityQuota.Stop() + return nil + }}, {"Redis", func() error { return rdb.Close() }}, diff --git a/backend/internal/pkg/antigravity/client.go b/backend/internal/pkg/antigravity/client.go index 7a419dba..7a2f6ca1 100644 --- a/backend/internal/pkg/antigravity/client.go +++ b/backend/internal/pkg/antigravity/client.go @@ -214,3 +214,64 @@ func (c *Client) LoadCodeAssist(ctx context.Context, accessToken string) (*LoadC return &loadResp, nil } + +// ModelQuotaInfo 模型配额信息 +type ModelQuotaInfo struct { + RemainingFraction float64 `json:"remainingFraction"` + ResetTime string `json:"resetTime,omitempty"` +} + +// ModelInfo 模型信息 +type ModelInfo struct { + QuotaInfo *ModelQuotaInfo `json:"quotaInfo,omitempty"` +} + +// FetchAvailableModelsRequest fetchAvailableModels 请求 +type FetchAvailableModelsRequest struct { + Project string `json:"project"` +} + +// FetchAvailableModelsResponse fetchAvailableModels 响应 +type FetchAvailableModelsResponse struct { + Models map[string]ModelInfo `json:"models"` +} + +// FetchAvailableModels 获取可用模型和配额信息 +func (c *Client) FetchAvailableModels(ctx context.Context, accessToken, projectID string) (*FetchAvailableModelsResponse, error) { + reqBody := FetchAvailableModelsRequest{Project: projectID} + bodyBytes, err := json.Marshal(reqBody) + if err != nil { + return nil, fmt.Errorf("序列化请求失败: %w", err) + } + + apiURL := BaseURL + "/v1internal:fetchAvailableModels" + req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiURL, strings.NewReader(string(bodyBytes))) + if err != nil { + return nil, fmt.Errorf("创建请求失败: %w", err) + } + req.Header.Set("Authorization", "Bearer "+accessToken) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", UserAgent) + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("fetchAvailableModels 请求失败: %w", err) + } + defer func() { _ = resp.Body.Close() }() + + respBodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("读取响应失败: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("fetchAvailableModels 失败 (HTTP %d): %s", resp.StatusCode, string(respBodyBytes)) + } + + var modelsResp FetchAvailableModelsResponse + if err := json.Unmarshal(respBodyBytes, &modelsResp); err != nil { + return nil, fmt.Errorf("响应解析失败: %w", err) + } + + return &modelsResp, nil +} diff --git a/backend/internal/service/antigravity_quota_refresher.go b/backend/internal/service/antigravity_quota_refresher.go new file mode 100644 index 00000000..61b21977 --- /dev/null +++ b/backend/internal/service/antigravity_quota_refresher.go @@ -0,0 +1,203 @@ +package service + +import ( + "context" + "log" + "sync" + "time" + + "github.com/Wei-Shaw/sub2api/internal/config" + "github.com/Wei-Shaw/sub2api/internal/pkg/antigravity" +) + +// AntigravityQuotaRefresher 定时刷新 Antigravity 账户的配额信息 +type AntigravityQuotaRefresher struct { + accountRepo AccountRepository + proxyRepo ProxyRepository + oauthSvc *AntigravityOAuthService + cfg *config.TokenRefreshConfig + + stopCh chan struct{} + wg sync.WaitGroup +} + +// NewAntigravityQuotaRefresher 创建配额刷新器 +func NewAntigravityQuotaRefresher( + accountRepo AccountRepository, + proxyRepo ProxyRepository, + oauthSvc *AntigravityOAuthService, + cfg *config.Config, +) *AntigravityQuotaRefresher { + return &AntigravityQuotaRefresher{ + accountRepo: accountRepo, + proxyRepo: proxyRepo, + oauthSvc: oauthSvc, + cfg: &cfg.TokenRefresh, + stopCh: make(chan struct{}), + } +} + +// Start 启动后台配额刷新服务 +func (r *AntigravityQuotaRefresher) Start() { + if !r.cfg.Enabled { + log.Println("[AntigravityQuota] Service disabled by configuration") + return + } + + r.wg.Add(1) + go r.refreshLoop() + + log.Printf("[AntigravityQuota] Service started (check every %d minutes)", r.cfg.CheckIntervalMinutes) +} + +// Stop 停止服务 +func (r *AntigravityQuotaRefresher) Stop() { + close(r.stopCh) + r.wg.Wait() + log.Println("[AntigravityQuota] Service stopped") +} + +// refreshLoop 刷新循环 +func (r *AntigravityQuotaRefresher) refreshLoop() { + defer r.wg.Done() + + checkInterval := time.Duration(r.cfg.CheckIntervalMinutes) * time.Minute + if checkInterval < time.Minute { + checkInterval = 5 * time.Minute + } + + ticker := time.NewTicker(checkInterval) + defer ticker.Stop() + + // 启动时立即执行一次 + r.processRefresh() + + for { + select { + case <-ticker.C: + r.processRefresh() + case <-r.stopCh: + return + } + } +} + +// processRefresh 执行一次刷新 +func (r *AntigravityQuotaRefresher) processRefresh() { + ctx := context.Background() + + // 查询所有 active 的账户,然后过滤 antigravity 平台 + allAccounts, err := r.accountRepo.ListActive(ctx) + if err != nil { + log.Printf("[AntigravityQuota] Failed to list accounts: %v", err) + return + } + + // 过滤 antigravity 平台账户 + var accounts []Account + for _, acc := range allAccounts { + if acc.Platform == PlatformAntigravity { + accounts = append(accounts, acc) + } + } + + if len(accounts) == 0 { + return + } + + refreshed, failed := 0, 0 + + for i := range accounts { + account := &accounts[i] + + if err := r.refreshAccountQuota(ctx, account); err != nil { + log.Printf("[AntigravityQuota] Account %d (%s) failed: %v", account.ID, account.Name, err) + failed++ + } else { + refreshed++ + } + } + + log.Printf("[AntigravityQuota] Cycle complete: total=%d, refreshed=%d, failed=%d", + len(accounts), refreshed, failed) +} + +// refreshAccountQuota 刷新单个账户的配额 +func (r *AntigravityQuotaRefresher) refreshAccountQuota(ctx context.Context, account *Account) error { + accessToken := account.GetCredential("access_token") + projectID := account.GetCredential("project_id") + + if accessToken == "" || projectID == "" { + return nil // 没有有效凭证,跳过 + } + + // 检查 token 是否过期,过期则刷新 + if r.isTokenExpired(account) { + tokenInfo, err := r.oauthSvc.RefreshAccountToken(ctx, account) + if err != nil { + return err + } + accessToken = tokenInfo.AccessToken + // 更新凭证 + account.Credentials = r.oauthSvc.BuildAccountCredentials(tokenInfo) + } + + // 获取代理 URL + var proxyURL string + if account.ProxyID != nil { + proxy, err := r.proxyRepo.GetByID(ctx, *account.ProxyID) + if err == nil && proxy != nil { + proxyURL = proxy.URL() + } + } + + // 调用 API 获取配额 + client := antigravity.NewClient(proxyURL) + modelsResp, err := client.FetchAvailableModels(ctx, accessToken, projectID) + if err != nil { + return err + } + + // 解析配额数据并更新 extra 字段 + r.updateAccountQuota(account, modelsResp) + + // 保存到数据库 + return r.accountRepo.Update(ctx, account) +} + +// isTokenExpired 检查 token 是否过期 +func (r *AntigravityQuotaRefresher) isTokenExpired(account *Account) bool { + expiresAt := parseAntigravityExpiresAt(account) + if expiresAt == nil { + return false + } + + // 提前 5 分钟认为过期 + return time.Now().Add(5 * time.Minute).After(*expiresAt) +} + +// updateAccountQuota 更新账户的配额信息 +func (r *AntigravityQuotaRefresher) updateAccountQuota(account *Account, modelsResp *antigravity.FetchAvailableModelsResponse) { + if account.Extra == nil { + account.Extra = make(map[string]any) + } + + quota := make(map[string]any) + + for modelName, modelInfo := range modelsResp.Models { + if modelInfo.QuotaInfo == nil { + continue + } + + // 转换 remainingFraction (0.0-1.0) 为百分比 (0-100) + remaining := int(modelInfo.QuotaInfo.RemainingFraction * 100) + + quota[modelName] = map[string]any{ + "remaining": remaining, + "reset_time": modelInfo.QuotaInfo.ResetTime, + } + } + + account.Extra["quota"] = quota + account.Extra["last_quota_check"] = time.Now().Format(time.RFC3339) +} diff --git a/backend/internal/service/wire.go b/backend/internal/service/wire.go index 5927dd5c..81e01d47 100644 --- a/backend/internal/service/wire.go +++ b/backend/internal/service/wire.go @@ -54,6 +54,18 @@ func ProvideTimingWheelService() *TimingWheelService { return svc } +// ProvideAntigravityQuotaRefresher creates and starts AntigravityQuotaRefresher +func ProvideAntigravityQuotaRefresher( + accountRepo AccountRepository, + proxyRepo ProxyRepository, + oauthSvc *AntigravityOAuthService, + cfg *config.Config, +) *AntigravityQuotaRefresher { + svc := NewAntigravityQuotaRefresher(accountRepo, proxyRepo, oauthSvc, cfg) + svc.Start() + return svc +} + // ProvideDeferredService creates and starts DeferredService func ProvideDeferredService(accountRepo AccountRepository, timingWheel *TimingWheelService) *DeferredService { svc := NewDeferredService(accountRepo, timingWheel, 10*time.Second) @@ -102,4 +114,5 @@ var ProviderSet = wire.NewSet( ProvideTokenRefreshService, ProvideTimingWheelService, ProvideDeferredService, + ProvideAntigravityQuotaRefresher, ) diff --git a/frontend/src/components/account/AccountUsageCell.vue b/frontend/src/components/account/AccountUsageCell.vue index 2c0162df..f46f41be 100644 --- a/frontend/src/components/account/AccountUsageCell.vue +++ b/frontend/src/components/account/AccountUsageCell.vue @@ -93,6 +93,48 @@
-
+ + +