feat(ops): 增强告警评估和指标收集服务功能

This commit is contained in:
IanShaw027
2026-01-11 20:56:02 +08:00
parent f5e45c1a8a
commit e4ed35fe01
2 changed files with 134 additions and 23 deletions

View File

@@ -423,6 +423,55 @@ func (s *OpsAlertEvaluatorService) computeRuleMetric(
return float64(*systemMetrics.ConcurrencyQueueDepth), true
}
return 0, false
case "group_available_accounts":
if groupID == nil || *groupID <= 0 {
return 0, false
}
if s == nil || s.opsService == nil {
return 0, false
}
availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
if err != nil || availability == nil {
return 0, false
}
if availability.Group == nil {
return 0, true
}
return float64(availability.Group.AvailableCount), true
case "group_available_ratio":
if groupID == nil || *groupID <= 0 {
return 0, false
}
if s == nil || s.opsService == nil {
return 0, false
}
availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
if err != nil || availability == nil {
return 0, false
}
return computeGroupAvailableRatio(availability.Group), true
case "account_rate_limited_count":
if s == nil || s.opsService == nil {
return 0, false
}
availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
if err != nil || availability == nil {
return 0, false
}
return float64(countAccountsByCondition(availability.Accounts, func(acc *AccountAvailability) bool {
return acc.IsRateLimited
})), true
case "account_error_count":
if s == nil || s.opsService == nil {
return 0, false
}
availability, err := s.opsService.GetAccountAvailability(ctx, platform, groupID)
if err != nil || availability == nil {
return 0, false
}
return float64(countAccountsByCondition(availability.Accounts, func(acc *AccountAvailability) bool {
return acc.HasError && acc.TempUnschedulableUntil == nil
})), true
}
overview, err := s.opsRepo.GetDashboardOverview(ctx, &OpsDashboardFilter{
@@ -849,18 +898,7 @@ func computeGroupAvailableRatio(group *GroupAvailability) float64 {
return (float64(group.AvailableCount) / float64(group.TotalAccounts)) * 100
}
// computeGroupRateLimitRatio returns the rate-limited percentage for a group.
// Formula: (RateLimitCount / TotalAccounts) * 100.
// Returns 0 when TotalAccounts is 0.
func computeGroupRateLimitRatio(group *GroupAvailability) float64 {
if group == nil || group.TotalAccounts <= 0 {
return 0
}
return (float64(group.RateLimitCount) / float64(group.TotalAccounts)) * 100
}
// countAccountsByCondition counts accounts that satisfy the given condition.
// It iterates over accounts and applies the predicate to each entry.
func countAccountsByCondition(accounts map[int64]*AccountAvailability, condition func(*AccountAvailability) bool) int64 {
if len(accounts) == 0 || condition == nil {
return 0

View File

@@ -44,6 +44,9 @@ type OpsMetricsCollector struct {
settingRepo SettingRepository
cfg *config.Config
accountRepo AccountRepository
concurrencyService *ConcurrencyService
db *sql.DB
redisClient *redis.Client
instanceID string
@@ -62,17 +65,21 @@ type OpsMetricsCollector struct {
func NewOpsMetricsCollector(
opsRepo OpsRepository,
settingRepo SettingRepository,
accountRepo AccountRepository,
concurrencyService *ConcurrencyService,
db *sql.DB,
redisClient *redis.Client,
cfg *config.Config,
) *OpsMetricsCollector {
return &OpsMetricsCollector{
opsRepo: opsRepo,
settingRepo: settingRepo,
cfg: cfg,
db: db,
redisClient: redisClient,
instanceID: uuid.NewString(),
opsRepo: opsRepo,
settingRepo: settingRepo,
cfg: cfg,
accountRepo: accountRepo,
concurrencyService: concurrencyService,
db: db,
redisClient: redisClient,
instanceID: uuid.NewString(),
}
}
@@ -287,6 +294,7 @@ func (c *OpsMetricsCollector) collectAndPersist(ctx context.Context) error {
tps := float64(tokenConsumed) / windowSeconds
goroutines := runtime.NumGoroutine()
concurrencyQueueDepth := c.collectConcurrencyQueueDepth(ctx)
input := &OpsInsertSystemMetricsInput{
CreatedAt: windowEnd,
@@ -340,14 +348,79 @@ func (c *OpsMetricsCollector) collectAndPersist(ctx context.Context) error {
return intPtr(redisIdle)
}(),
DBConnActive: intPtr(active),
DBConnIdle: intPtr(idle),
GoroutineCount: intPtr(goroutines),
DBConnActive: intPtr(active),
DBConnIdle: intPtr(idle),
GoroutineCount: intPtr(goroutines),
ConcurrencyQueueDepth: concurrencyQueueDepth,
}
return c.opsRepo.InsertSystemMetrics(ctx, input)
}
func (c *OpsMetricsCollector) collectConcurrencyQueueDepth(parentCtx context.Context) *int {
if c == nil || c.accountRepo == nil || c.concurrencyService == nil {
return nil
}
if parentCtx == nil {
parentCtx = context.Background()
}
// Best-effort: never let concurrency sampling break the metrics collector.
ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
defer cancel()
accounts, err := c.accountRepo.ListSchedulable(ctx)
if err != nil {
return nil
}
if len(accounts) == 0 {
zero := 0
return &zero
}
batch := make([]AccountWithConcurrency, 0, len(accounts))
for _, acc := range accounts {
if acc.ID <= 0 {
continue
}
maxConc := acc.Concurrency
if maxConc < 0 {
maxConc = 0
}
batch = append(batch, AccountWithConcurrency{
ID: acc.ID,
MaxConcurrency: maxConc,
})
}
if len(batch) == 0 {
zero := 0
return &zero
}
loadMap, err := c.concurrencyService.GetAccountsLoadBatch(ctx, batch)
if err != nil {
return nil
}
var total int64
for _, info := range loadMap {
if info == nil || info.WaitingCount <= 0 {
continue
}
total += int64(info.WaitingCount)
}
if total < 0 {
total = 0
}
maxInt := int64(^uint(0) >> 1)
if total > maxInt {
total = maxInt
}
v := int(total)
return &v
}
type opsCollectedPercentiles struct {
p50 *int
p90 *int
@@ -459,9 +532,9 @@ SELECT
COALESCE(COUNT(*), 0) AS error_total,
COALESCE(COUNT(*) FILTER (WHERE is_business_limited), 0) AS business_limited,
COALESCE(COUNT(*) FILTER (WHERE NOT is_business_limited), 0) AS error_sla,
COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) NOT IN (429, 529)), 0) AS upstream_excl,
COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 429), 0) AS upstream_429,
COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(status_code, 0) = 529), 0) AS upstream_529
COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) NOT IN (429, 529)), 0) AS upstream_excl,
COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 429), 0) AS upstream_429,
COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 529), 0) AS upstream_529
FROM ops_error_logs
WHERE created_at >= $1 AND created_at < $2`