diff --git a/backend/internal/service/ops_alert_evaluator_service.go b/backend/internal/service/ops_alert_evaluator_service.go
index ceec5d87..f376c246 100644
--- a/backend/internal/service/ops_alert_evaluator_service.go
+++ b/backend/internal/service/ops_alert_evaluator_service.go
@@ -423,6 +423,55 @@ func (s *OpsAlertEvaluatorService) computeRuleMetric(
             return float64(*systemMetrics.ConcurrencyQueueDepth), true
         }
         return 0, false
+    case "group_available_accounts":
+        if groupID == nil || *groupID <= 0 {
+            return 0, false
+        }
+        if s == nil || s.opsService == nil {
+            return 0, false
+        }
+        availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
+        if err != nil || availability == nil {
+            return 0, false
+        }
+        if availability.Group == nil {
+            return 0, true
+        }
+        return float64(availability.Group.AvailableCount), true
+    case "group_available_ratio":
+        if groupID == nil || *groupID <= 0 {
+            return 0, false
+        }
+        if s == nil || s.opsService == nil {
+            return 0, false
+        }
+        availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
+        if err != nil || availability == nil {
+            return 0, false
+        }
+        return computeGroupAvailableRatio(availability.Group), true
+    case "account_rate_limited_count":
+        if s == nil || s.opsService == nil {
+            return 0, false
+        }
+        availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
+        if err != nil || availability == nil {
+            return 0, false
+        }
+        return float64(countAccountsByCondition(availability.Accounts, func(acc *AccountAvailability) bool {
+            return acc.IsRateLimited
+        })), true
+    case "account_error_count":
+        if s == nil || s.opsService == nil {
+            return 0, false
+        }
+        availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
+        if err != nil || availability == nil {
+            return 0, false
+        }
+        return float64(countAccountsByCondition(availability.Accounts, func(acc *AccountAvailability) bool {
+            return acc.HasError && acc.TempUnschedulableUntil == nil
+        })), true
     }

     overview, err := s.opsRepo.GetDashboardOverview(ctx, &OpsDashboardFilter{
@@ -849,18 +898,7 @@ func computeGroupAvailableRatio(group *GroupAvailability) float64 {
     return (float64(group.AvailableCount) / float64(group.TotalAccounts)) * 100
 }

-// computeGroupRateLimitRatio returns the rate-limited percentage for a group.
-// Formula: (RateLimitCount / TotalAccounts) * 100.
-// Returns 0 when TotalAccounts is 0.
-func computeGroupRateLimitRatio(group *GroupAvailability) float64 {
-    if group == nil || group.TotalAccounts <= 0 {
-        return 0
-    }
-    return (float64(group.RateLimitCount) / float64(group.TotalAccounts)) * 100
-}
-
 // countAccountsByCondition counts accounts that satisfy the given condition.
-// It iterates over accounts and applies the predicate to each entry.
 func countAccountsByCondition(accounts map[int64]*AccountAvailability, condition func(*AccountAvailability) bool) int64 {
     if len(accounts) == 0 || condition == nil {
         return 0
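Aside (commentary, not part of the patch): the two count-based cases share countAccountsByCondition and differ only in their predicates, so further account-level metrics only need a new closure. A minimal standalone sketch of that pattern follows; AccountAvailability is trimmed to the three fields the new metrics use, and the helper's loop body, which the hunk truncates after "return 0", is assumed from its signature and doc comment:

package main

import (
    "fmt"
    "time"
)

// Trimmed stand-in: the real AccountAvailability in the service package
// carries more fields than the three used by the new alert metrics.
type AccountAvailability struct {
    IsRateLimited          bool
    HasError               bool
    TempUnschedulableUntil *time.Time
}

// countAccountsByCondition counts accounts that satisfy the given condition.
// The loop body (and its nil-entry guard) is an assumption of this sketch,
// reconstructed from the signature and doc comment above.
func countAccountsByCondition(accounts map[int64]*AccountAvailability, condition func(*AccountAvailability) bool) int64 {
    if len(accounts) == 0 || condition == nil {
        return 0
    }
    var n int64
    for _, acc := range accounts {
        if acc != nil && condition(acc) {
            n++
        }
    }
    return n
}

func main() {
    until := time.Now().Add(time.Minute)
    accounts := map[int64]*AccountAvailability{
        1: {IsRateLimited: true},
        2: {HasError: true},
        3: {HasError: true, TempUnschedulableUntil: &until}, // already cooling down
    }
    // Mirrors the account_rate_limited_count predicate.
    fmt.Println(countAccountsByCondition(accounts, func(acc *AccountAvailability) bool {
        return acc.IsRateLimited
    })) // 1
    // Mirrors the account_error_count predicate: errored accounts that are
    // not already temporarily unscheduled.
    fmt.Println(countAccountsByCondition(accounts, func(acc *AccountAvailability) bool {
        return acc.HasError && acc.TempUnschedulableUntil == nil
    })) // 1
}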
diff --git a/backend/internal/service/ops_metrics_collector.go b/backend/internal/service/ops_metrics_collector.go
index e55e365b..8a4dd2d0 100644
--- a/backend/internal/service/ops_metrics_collector.go
+++ b/backend/internal/service/ops_metrics_collector.go
@@ -44,6 +44,9 @@ type OpsMetricsCollector struct {
     settingRepo SettingRepository
     cfg         *config.Config

+    accountRepo        AccountRepository
+    concurrencyService *ConcurrencyService
+
     db          *sql.DB
     redisClient *redis.Client
     instanceID  string
@@ -62,17 +65,21 @@ func NewOpsMetricsCollector(
     opsRepo OpsRepository,
     settingRepo SettingRepository,
+    accountRepo AccountRepository,
+    concurrencyService *ConcurrencyService,
     db *sql.DB,
     redisClient *redis.Client,
     cfg *config.Config,
 ) *OpsMetricsCollector {
     return &OpsMetricsCollector{
-        opsRepo:     opsRepo,
-        settingRepo: settingRepo,
-        cfg:         cfg,
-        db:          db,
-        redisClient: redisClient,
-        instanceID:  uuid.NewString(),
+        opsRepo:            opsRepo,
+        settingRepo:        settingRepo,
+        cfg:                cfg,
+        accountRepo:        accountRepo,
+        concurrencyService: concurrencyService,
+        db:                 db,
+        redisClient:        redisClient,
+        instanceID:         uuid.NewString(),
     }
 }
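NewOpsMetricsCollector gains two dependencies, so every construction site has to be updated to match the new parameter order. A hypothetical wiring fragment (the variable names are assumptions, not taken from the patch):

    collector := NewOpsMetricsCollector(
        opsRepo,
        settingRepo,
        accountRepo,        // new: enumerates schedulable accounts
        concurrencyService, // new: samples per-account waiting counts
        db,
        redisClient,
        cfg,
    )

Passing nil for either new dependency is tolerated: collectConcurrencyQueueDepth (below) checks both and returns nil, which records the metric as absent rather than failing the collection cycle.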
@@ -287,6 +294,7 @@ func (c *OpsMetricsCollector) collectAndPersist(ctx context.Context) error {

     tps := float64(tokenConsumed) / windowSeconds
     goroutines := runtime.NumGoroutine()
+    concurrencyQueueDepth := c.collectConcurrencyQueueDepth(ctx)

     input := &OpsInsertSystemMetricsInput{
         CreatedAt: windowEnd,
@@ -340,14 +348,79 @@
             return intPtr(redisIdle)
         }(),

-        DBConnActive:   intPtr(active),
-        DBConnIdle:     intPtr(idle),
-        GoroutineCount: intPtr(goroutines),
+        DBConnActive:          intPtr(active),
+        DBConnIdle:            intPtr(idle),
+        GoroutineCount:        intPtr(goroutines),
+        ConcurrencyQueueDepth: concurrencyQueueDepth,
     }

     return c.opsRepo.InsertSystemMetrics(ctx, input)
 }

+func (c *OpsMetricsCollector) collectConcurrencyQueueDepth(parentCtx context.Context) *int {
+    if c == nil || c.accountRepo == nil || c.concurrencyService == nil {
+        return nil
+    }
+    if parentCtx == nil {
+        parentCtx = context.Background()
+    }
+
+    // Best-effort: never let concurrency sampling break the metrics collector.
+    ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
+    defer cancel()
+
+    accounts, err := c.accountRepo.ListSchedulable(ctx)
+    if err != nil {
+        return nil
+    }
+    if len(accounts) == 0 {
+        zero := 0
+        return &zero
+    }
+
+    batch := make([]AccountWithConcurrency, 0, len(accounts))
+    for _, acc := range accounts {
+        if acc.ID <= 0 {
+            continue
+        }
+        maxConc := acc.Concurrency
+        if maxConc < 0 {
+            maxConc = 0
+        }
+        batch = append(batch, AccountWithConcurrency{
+            ID:             acc.ID,
+            MaxConcurrency: maxConc,
+        })
+    }
+    if len(batch) == 0 {
+        zero := 0
+        return &zero
+    }
+
+    loadMap, err := c.concurrencyService.GetAccountsLoadBatch(ctx, batch)
+    if err != nil {
+        return nil
+    }
+
+    var total int64
+    for _, info := range loadMap {
+        if info == nil || info.WaitingCount <= 0 {
+            continue
+        }
+        total += int64(info.WaitingCount)
+    }
+    if total < 0 {
+        total = 0
+    }
+
+    maxInt := int64(^uint(0) >> 1)
+    if total > maxInt {
+        total = maxInt
+    }
+    v := int(total)
+    return &v
+}
+
 type opsCollectedPercentiles struct {
     p50 *int
     p90 *int
@@ -459,9 +532,9 @@
 SELECT
     COALESCE(COUNT(*), 0) AS error_total,
     COALESCE(COUNT(*) FILTER (WHERE is_business_limited), 0) AS business_limited,
     COALESCE(COUNT(*) FILTER (WHERE NOT is_business_limited), 0) AS error_sla,
-    COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) NOT IN (429, 529)), 0) AS upstream_excl,
-    COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 429), 0) AS upstream_429,
-    COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 529), 0) AS upstream_529
+    COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) NOT IN (429, 529)), 0) AS upstream_excl,
+    COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 429), 0) AS upstream_429,
+    COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 529), 0) AS upstream_529
 FROM ops_error_logs
 WHERE created_at >= $1 AND created_at < $2`
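Two notes on this file (commentary, not part of the patch). The SQL change reclassifies the upstream 429/529 buckets by COALESCE(upstream_status_code, status_code, 0): the status code reported by the upstream provider wins, the locally recorded one is the fallback, and 0 stands in when both are NULL.

collectConcurrencyQueueDepth deliberately distinguishes nil (dependencies missing or a lookup failed, so the sample is absent) from a genuine zero, and clamps the int64 sum before narrowing to int. A minimal standalone sketch of the aggregation step, where AccountLoadInfo and sumWaitingDepth are assumed stand-in names for whatever GetAccountsLoadBatch actually returns:

package main

import "fmt"

// AccountLoadInfo is a trimmed stand-in for the concurrency service's
// per-account load record; only WaitingCount matters here.
type AccountLoadInfo struct {
    WaitingCount int
}

// sumWaitingDepth mirrors the aggregation in collectConcurrencyQueueDepth:
// nil entries and non-positive counts are skipped, and the int64 total is
// clamped to the platform's max int before narrowing.
func sumWaitingDepth(loadMap map[int64]*AccountLoadInfo) int {
    var total int64
    for _, info := range loadMap {
        if info == nil || info.WaitingCount <= 0 {
            continue
        }
        total += int64(info.WaitingCount)
    }
    maxInt := int64(^uint(0) >> 1) // largest value representable in int
    if total > maxInt {
        total = maxInt
    }
    return int(total)
}

func main() {
    load := map[int64]*AccountLoadInfo{
        1: {WaitingCount: 3},
        2: {WaitingCount: 0}, // idle account, skipped
        3: nil,               // missing sample, skipped
    }
    fmt.Println(sumWaitingDepth(load)) // 3
}

On 64-bit platforms the clamp is unreachable in practice, but it keeps the final int conversion safe on 32-bit builds.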