Files
sub2api/backend/internal/service/account_usage_service.go
shaw 30b95cf5ce fix(usage): 分离 API 响应和窗口统计缓存,修复 5h 窗口未激活时的显示 bug
问题:
1. WindowStats 与 API 响应一起缓存 10 分钟,导致费用数据更新延迟
2. 当 5h 窗口未激活(ResetsAt 为空)时,FiveHour 为 nil,导致所有窗口的 WindowStats 都无法显示

修复:
- 分离缓存:API 响应缓存 10 分钟,窗口统计独立缓存 1 分钟
- RemainingSeconds 每次请求时实时计算
- FiveHour 对象始终创建(即使 ResetsAt 为空)
- addWindowStats 增强防护,支持 FiveHour 为 nil 时仍处理其他窗口
2025-12-28 23:12:44 +08:00

388 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
"github.com/Wei-Shaw/sub2api/internal/pkg/usagestats"
)
// UsageLogRepository is the storage-facing interface for usage logs. It covers
// single-record operations, listings scoped by user / API key / account /
// model, and the aggregate statistics queries consumed by the services in this
// package (AccountUsageService uses the account window/today/usage stats).
type UsageLogRepository interface {
// Single-record operations.
Create(ctx context.Context, log *UsageLog) error
GetByID(ctx context.Context, id int64) (*UsageLog, error)
Delete(ctx context.Context, id int64) error
// Listings driven by caller-supplied pagination parameters.
ListByUser(ctx context.Context, userID int64, params pagination.PaginationParams) ([]UsageLog, *pagination.PaginationResult, error)
ListByApiKey(ctx context.Context, apiKeyID int64, params pagination.PaginationParams) ([]UsageLog, *pagination.PaginationResult, error)
ListByAccount(ctx context.Context, accountID int64, params pagination.PaginationParams) ([]UsageLog, *pagination.PaginationResult, error)
// Listings restricted to a [startTime, endTime] range.
// NOTE(review): whether the bounds are inclusive is decided by the implementation — confirm there.
ListByUserAndTimeRange(ctx context.Context, userID int64, startTime, endTime time.Time) ([]UsageLog, *pagination.PaginationResult, error)
ListByApiKeyAndTimeRange(ctx context.Context, apiKeyID int64, startTime, endTime time.Time) ([]UsageLog, *pagination.PaginationResult, error)
ListByAccountAndTimeRange(ctx context.Context, accountID int64, startTime, endTime time.Time) ([]UsageLog, *pagination.PaginationResult, error)
ListByModelAndTimeRange(ctx context.Context, modelName string, startTime, endTime time.Time) ([]UsageLog, *pagination.PaginationResult, error)
// Per-account aggregates: totals since startTime, and totals for today.
GetAccountWindowStats(ctx context.Context, accountID int64, startTime time.Time) (*usagestats.AccountStats, error)
GetAccountTodayStats(ctx context.Context, accountID int64) (*usagestats.AccountStats, error)
// Admin dashboard stats
GetDashboardStats(ctx context.Context) (*usagestats.DashboardStats, error)
GetUsageTrendWithFilters(ctx context.Context, startTime, endTime time.Time, granularity string, userID, apiKeyID int64) ([]usagestats.TrendDataPoint, error)
GetModelStatsWithFilters(ctx context.Context, startTime, endTime time.Time, userID, apiKeyID, accountID int64) ([]usagestats.ModelStat, error)
GetApiKeyUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.ApiKeyUsageTrendPoint, error)
GetUserUsageTrend(ctx context.Context, startTime, endTime time.Time, granularity string, limit int) ([]usagestats.UserUsageTrendPoint, error)
GetBatchUserUsageStats(ctx context.Context, userIDs []int64) (map[int64]*usagestats.BatchUserUsageStats, error)
GetBatchApiKeyUsageStats(ctx context.Context, apiKeyIDs []int64) (map[int64]*usagestats.BatchApiKeyUsageStats, error)
// User dashboard stats
GetUserDashboardStats(ctx context.Context, userID int64) (*usagestats.UserDashboardStats, error)
GetUserUsageTrendByUserID(ctx context.Context, userID int64, startTime, endTime time.Time, granularity string) ([]usagestats.TrendDataPoint, error)
GetUserModelStats(ctx context.Context, userID int64, startTime, endTime time.Time) ([]usagestats.ModelStat, error)
// Admin usage listing/stats
ListWithFilters(ctx context.Context, params pagination.PaginationParams, filters usagestats.UsageLogFilters) ([]UsageLog, *pagination.PaginationResult, error)
GetGlobalStats(ctx context.Context, startTime, endTime time.Time) (*usagestats.UsageStats, error)
// Account stats
GetAccountUsageStats(ctx context.Context, accountID int64, startTime, endTime time.Time) (*usagestats.AccountUsageStatsResponse, error)
// Aggregated stats (optimized)
GetUserStatsAggregated(ctx context.Context, userID int64, startTime, endTime time.Time) (*usagestats.UsageStats, error)
GetApiKeyStatsAggregated(ctx context.Context, apiKeyID int64, startTime, endTime time.Time) (*usagestats.UsageStats, error)
}
// apiUsageCache is a cache entry for usage data fetched from the Anthropic API
// (utilization, resets_at), together with the time the entry was stored.
type apiUsageCache struct {
response *ClaudeUsageResponse // raw API response as returned by the fetcher
timestamp time.Time // when the entry was stored; GetUsage compares its age against apiCacheTTL
}
// windowStatsCache is a cache entry for window statistics queried from the
// local database (requests, tokens, cost), together with the time it was stored.
type windowStatsCache struct {
stats *WindowStats // statistics computed from the usage log repository
timestamp time.Time // when the entry was stored; addWindowStats compares its age against windowStatsCacheTTL
}
// Package-level caches and their lifetimes. Both maps are keyed by account ID
// in this file: apiCacheMap stores *apiUsageCache values and
// windowStatsCacheMap stores *windowStatsCache values.
var (
apiCacheMap = sync.Map{} // caches API responses
windowStatsCacheMap = sync.Map{} // caches window statistics
apiCacheTTL = 10 * time.Minute
windowStatsCacheTTL = 1 * time.Minute
)
// WindowStats holds usage totals for a time window.
type WindowStats struct {
Requests int64 `json:"requests"`
Tokens int64 `json:"tokens"`
Cost float64 `json:"cost"`
}
// UsageProgress describes the progress of usage within one window.
type UsageProgress struct {
Utilization float64 `json:"utilization"` // utilization as a percentage (0-100+; 100 means 100%)
ResetsAt *time.Time `json:"resets_at"` // reset time
RemainingSeconds int `json:"remaining_seconds"` // seconds remaining until the reset
WindowStats *WindowStats `json:"window_stats,omitempty"` // window statistics (usage from the window start until now)
}
// UsageInfo is the usage information reported for an account.
// FiveHour has no omitempty tag, so it is always present in the JSON output
// (as null when nil); the other fields are omitted when nil.
type UsageInfo struct {
UpdatedAt *time.Time `json:"updated_at,omitempty"` // update time
FiveHour *UsageProgress `json:"five_hour"` // 5-hour window
SevenDay *UsageProgress `json:"seven_day,omitempty"` // 7-day window
SevenDaySonnet *UsageProgress `json:"seven_day_sonnet,omitempty"` // 7-day Sonnet window
}
// ClaudeUsageResponse is the usage structure returned by the Anthropic API.
// Each window carries a utilization value and a resets_at timestamp string;
// buildUsageInfo treats an empty resets_at as "no reset time available" and
// parses non-empty values with parseTime.
type ClaudeUsageResponse struct {
// FiveHour is the 5-hour window.
FiveHour struct {
Utilization float64 `json:"utilization"`
ResetsAt string `json:"resets_at"`
} `json:"five_hour"`
// SevenDay is the 7-day window.
SevenDay struct {
Utilization float64 `json:"utilization"`
ResetsAt string `json:"resets_at"`
} `json:"seven_day"`
// SevenDaySonnet is the 7-day Sonnet window.
SevenDaySonnet struct {
Utilization float64 `json:"utilization"`
ResetsAt string `json:"resets_at"`
} `json:"seven_day_sonnet"`
}
// ClaudeUsageFetcher fetches usage data from the Anthropic OAuth API.
type ClaudeUsageFetcher interface {
// FetchUsage retrieves the usage response for accessToken. proxyURL may be
// empty: fetchOAuthUsageRaw passes "" when the account has no proxy attached.
FetchUsage(ctx context.Context, accessToken, proxyURL string) (*ClaudeUsageResponse, error)
}
// AccountUsageService is the service for querying account usage.
type AccountUsageService struct {
accountRepo AccountRepository // looks up accounts by ID
usageLogRepo UsageLogRepository // source of locally recorded usage statistics
usageFetcher ClaudeUsageFetcher // fetches usage from the Anthropic API for OAuth accounts
}
// NewAccountUsageService creates an AccountUsageService wired to the given
// account repository, usage log repository and usage fetcher.
func NewAccountUsageService(accountRepo AccountRepository, usageLogRepo UsageLogRepository, usageFetcher ClaudeUsageFetcher) *AccountUsageService {
	svc := new(AccountUsageService)
	svc.accountRepo = accountRepo
	svc.usageLogRepo = usageLogRepo
	svc.usageFetcher = usageFetcher
	return svc
}
// GetUsage returns the usage information for the account with the given ID.
//
//   - OAuth accounts: real data is fetched from the Anthropic API (requires the
//     profile scope). The API response is cached for 10 minutes and the window
//     statistics for 1 minute.
//   - Setup Token accounts: the 5h window is estimated from session_window; 7d
//     data is unavailable because there is no profile scope.
//   - API Key accounts: usage queries are not supported and an error is returned.
func (s *AccountUsageService) GetUsage(ctx context.Context, accountID int64) (*UsageInfo, error) {
	account, err := s.accountRepo.GetByID(ctx, accountID)
	if err != nil {
		return nil, fmt.Errorf("get account failed: %w", err)
	}

	switch {
	case account.CanGetUsage():
		// Only OAuth accounts (which have the profile scope) can query the usage API.
		var resp *ClaudeUsageResponse

		// Step 1: reuse the cached API response while it is younger than the TTL.
		if entry, found := apiCacheMap.Load(accountID); found {
			hit, valid := entry.(*apiUsageCache)
			if valid && time.Since(hit.timestamp) < apiCacheTTL {
				resp = hit.response
			}
		}

		// Step 2: on a miss, call the API and remember the response.
		if resp == nil {
			fetched, fetchErr := s.fetchOAuthUsageRaw(ctx, account)
			if fetchErr != nil {
				return nil, fetchErr
			}
			resp = fetched
			apiCacheMap.Store(accountID, &apiUsageCache{
				response:  resp,
				timestamp: time.Now(),
			})
		}

		// Step 3: rebuild UsageInfo on every call so RemainingSeconds is current.
		builtAt := time.Now()
		info := s.buildUsageInfo(resp, &builtAt)

		// Step 4: attach window statistics (separately cached for 1 minute).
		s.addWindowStats(ctx, account, info)
		return info, nil

	case account.Type == AccountTypeSetupToken:
		// Setup Token accounts cannot call the usage API (no profile scope), so
		// the 5h window is estimated from session_window instead.
		info := s.estimateSetupTokenUsage(account)
		s.addWindowStats(ctx, account, info)
		return info, nil
	}

	// API Key accounts do not support usage queries.
	return nil, fmt.Errorf("account type %s does not support usage query", account.Type)
}
// addWindowStats attaches locally computed window statistics (requests,
// tokens, cost) to usage.
//
// The statistics are cached per account ID in windowStatsCacheMap for
// windowStatsCacheTTL (1 minute), independently of the API response cache. On
// a cache miss they are queried from the usage log repository starting at
// account.SessionWindowStart, or five hours ago when no window start is
// recorded. Only usage.FiveHour receives the statistics; the 7-day windows are
// left untouched.
//
// The call is best-effort: a nil account or usage, a repository error, or a
// nil repository result leaves usage without window statistics (the latter two
// are logged) instead of failing the caller.
func (s *AccountUsageService) addWindowStats(ctx context.Context, account *Account, usage *UsageInfo) {
	if account == nil || usage == nil {
		return
	}
	// Nothing to annotate when no window is present at all. The lookup is still
	// performed when only the 7-day windows exist, which keeps the cache warm.
	if usage.FiveHour == nil && usage.SevenDay == nil && usage.SevenDaySonnet == nil {
		return
	}

	// Check the window statistics cache (1 minute).
	var windowStats *WindowStats
	if cached, ok := windowStatsCacheMap.Load(account.ID); ok {
		if cache, ok := cached.(*windowStatsCache); ok && time.Since(cache.timestamp) < windowStatsCacheTTL {
			windowStats = cache.stats
		}
	}

	// Cache miss: query the database.
	if windowStats == nil {
		startTime := time.Now().Add(-5 * time.Hour)
		if account.SessionWindowStart != nil {
			startTime = *account.SessionWindowStart
		}

		stats, err := s.usageLogRepo.GetAccountWindowStats(ctx, account.ID, startTime)
		if err != nil {
			log.Printf("Failed to get window stats for account %d: %v", account.ID, err)
			return
		}
		// Guard against a repository that reports "no data" as (nil, nil);
		// dereferencing it below would panic.
		if stats == nil {
			log.Printf("No window stats returned for account %d", account.ID)
			return
		}

		windowStats = &WindowStats{
			Requests: stats.Requests,
			Tokens:   stats.Tokens,
			Cost:     stats.Cost,
		}

		// Cache the window statistics (1 minute).
		windowStatsCacheMap.Store(account.ID, &windowStatsCache{
			stats:     windowStats,
			timestamp: time.Now(),
		})
	}

	// The statistics describe the 5h window, so only FiveHour receives them.
	if usage.FiveHour != nil {
		usage.FiveHour.WindowStats = windowStats
	}
}
// GetTodayStats returns today's usage totals (requests, tokens, cost) for the
// given account, as reported by the usage log repository.
//
// It returns an error when the repository call fails or when the repository
// yields no stats object.
func (s *AccountUsageService) GetTodayStats(ctx context.Context, accountID int64) (*WindowStats, error) {
	stats, err := s.usageLogRepo.GetAccountTodayStats(ctx, accountID)
	if err != nil {
		return nil, fmt.Errorf("get today stats failed: %w", err)
	}
	// A (nil, nil) result would otherwise panic on the field accesses below.
	if stats == nil {
		return nil, fmt.Errorf("get today stats failed: no stats returned for account %d", accountID)
	}

	return &WindowStats{
		Requests: stats.Requests,
		Tokens:   stats.Tokens,
		Cost:     stats.Cost,
	}, nil
}
// GetAccountUsageStats returns the repository's usage statistics for the
// account over the given time range, wrapping any repository error.
func (s *AccountUsageService) GetAccountUsageStats(ctx context.Context, accountID int64, startTime, endTime time.Time) (*usagestats.AccountUsageStatsResponse, error) {
	resp, err := s.usageLogRepo.GetAccountUsageStats(ctx, accountID, startTime, endTime)
	if err == nil {
		return resp, nil
	}
	return nil, fmt.Errorf("get account usage stats failed: %w", err)
}
// fetchOAuthUsageRaw fetches the raw usage response from the Anthropic API
// without building a UsageInfo. It fails when the account has no access_token
// credential, and routes the request through the account's proxy when one is
// attached.
func (s *AccountUsageService) fetchOAuthUsageRaw(ctx context.Context, account *Account) (*ClaudeUsageResponse, error) {
	token := account.GetCredential("access_token")
	if token == "" {
		return nil, fmt.Errorf("no access token available")
	}

	// An empty proxy URL is passed when the account has no proxy.
	proxy := ""
	if hasProxy := account.ProxyID != nil && account.Proxy != nil; hasProxy {
		proxy = account.Proxy.URL()
	}

	return s.usageFetcher.FetchUsage(ctx, token, proxy)
}
// parseTime parses s by trying several timestamp layouts in order and returns
// the first successful result. It returns an error naming the input when no
// layout matches.
func parseTime(s string) (time.Time, error) {
	layouts := [...]string{
		time.RFC3339,
		time.RFC3339Nano,
		"2006-01-02T15:04:05Z",
		"2006-01-02T15:04:05.000Z",
	}

	for i := 0; i < len(layouts); i++ {
		parsed, err := time.Parse(layouts[i], s)
		if err != nil {
			continue
		}
		return parsed, nil
	}

	return time.Time{}, fmt.Errorf("unable to parse time: %s", s)
}
// buildUsageInfo converts a raw API usage response into a UsageInfo stamped
// with updatedAt.
//
// The 5-hour window object is always created, even when its resets_at is
// empty, so callers can attach window statistics to it. The 7-day and 7-day
// Sonnet windows are only created when the response carries a non-empty
// resets_at for them. A nil resp yields a UsageInfo with an empty 5-hour
// window.
//
// RemainingSeconds is computed against the current time on every call and is
// never negative: the API response may be served from cache after the reset
// time has passed, and estimateSetupTokenUsage clamps the same way.
func (s *AccountUsageService) buildUsageInfo(resp *ClaudeUsageResponse, updatedAt *time.Time) *UsageInfo {
	info := &UsageInfo{
		UpdatedAt: updatedAt,
	}
	if resp == nil {
		info.FiveHour = &UsageProgress{}
		return info
	}

	// 5-hour window: always present.
	info.FiveHour = usageProgressFromAPI("FiveHour", resp.FiveHour.Utilization, resp.FiveHour.ResetsAt)

	// 7-day window: only when the API reports a reset time.
	if resp.SevenDay.ResetsAt != "" {
		info.SevenDay = usageProgressFromAPI("SevenDay", resp.SevenDay.Utilization, resp.SevenDay.ResetsAt)
	}

	// 7-day Sonnet window: only when the API reports a reset time.
	if resp.SevenDaySonnet.ResetsAt != "" {
		info.SevenDaySonnet = usageProgressFromAPI("SevenDaySonnet", resp.SevenDaySonnet.Utilization, resp.SevenDaySonnet.ResetsAt)
	}

	return info
}

// usageProgressFromAPI builds the UsageProgress for one API window. An empty
// or unparsable resetsAt yields a progress carrying only the utilization (the
// parse failure is logged under the given window name).
func usageProgressFromAPI(window string, utilization float64, resetsAt string) *UsageProgress {
	progress := &UsageProgress{Utilization: utilization}
	if resetsAt == "" {
		return progress
	}

	resetTime, err := parseTime(resetsAt)
	if err != nil {
		log.Printf("Failed to parse %s.ResetsAt: %s, error: %v", window, resetsAt, err)
		return progress
	}

	progress.ResetsAt = &resetTime
	// Leave RemainingSeconds at 0 once the reset time has passed.
	if remaining := int(time.Until(resetTime).Seconds()); remaining > 0 {
		progress.RemainingSeconds = remaining
	}
	return progress
}
// estimateSetupTokenUsage estimates the usage of a Setup Token account from
// its session_window fields. Only the 5-hour window is populated; 7-day data
// cannot be obtained for Setup Token accounts.
func (s *AccountUsageService) estimateSetupTokenUsage(account *Account) *UsageInfo {
	windowEnd := account.SessionWindowEnd

	// No window information: report an empty 5-hour window.
	if windowEnd == nil {
		return &UsageInfo{FiveHour: &UsageProgress{}}
	}

	// Seconds until the window ends, never negative.
	secondsLeft := int(time.Until(*windowEnd).Seconds())
	if secondsLeft < 0 {
		secondsLeft = 0
	}

	// Estimate the utilization from the window status (as a percentage, 100 = 100%).
	percent := 0.0
	if account.SessionWindowStatus == "rejected" {
		percent = 100.0
	} else if account.SessionWindowStatus == "allowed_warning" {
		percent = 80.0
	}

	progress := &UsageProgress{
		Utilization:      percent,
		ResetsAt:         windowEnd,
		RemainingSeconds: secondsLeft,
	}
	return &UsageInfo{FiveHour: progress}
}