feat(ops): add account switch metrics and trend
@@ -43,6 +43,7 @@ INSERT INTO ops_system_metrics (
upstream_529_count,
token_consumed,
+account_switch_count,
qps,
tps,
@@ -81,14 +82,14 @@ INSERT INTO ops_system_metrics (
$1,$2,$3,$4,
$5,$6,$7,$8,
$9,$10,$11,
-$12,$13,$14,
-$15,$16,$17,$18,$19,$20,
-$21,$22,$23,$24,$25,$26,
-$27,$28,$29,$30,
-$31,$32,
-$33,$34,
-$35,$36,$37,
-$38,$39
+$12,$13,$14,$15,
+$16,$17,$18,$19,$20,$21,
+$22,$23,$24,$25,$26,$27,
+$28,$29,$30,$31,
+$32,$33,
+$34,$35,
+$36,$37,$38,
+$39,$40
)`

_, err := r.db.ExecContext(
@@ -109,6 +110,7 @@ INSERT INTO ops_system_metrics (
input.Upstream529Count,
input.TokenConsumed,
+input.AccountSwitchCount,
opsNullFloat64(input.QPS),
opsNullFloat64(input.TPS),
@@ -177,7 +179,8 @@ SELECT
db_conn_waiting,
goroutine_count,
-concurrency_queue_depth
+concurrency_queue_depth,
+account_switch_count
FROM ops_system_metrics
WHERE window_minutes = $1
AND platform IS NULL
@@ -199,6 +202,7 @@ LIMIT 1`
var dbWaiting sql.NullInt64
var goroutines sql.NullInt64
var queueDepth sql.NullInt64
+var accountSwitchCount sql.NullInt64

if err := r.db.QueryRowContext(ctx, q, windowMinutes).Scan(
&out.ID,
@@ -217,6 +221,7 @@ LIMIT 1`
&dbWaiting,
&goroutines,
&queueDepth,
+&accountSwitchCount,
); err != nil {
return nil, err
}
@@ -273,6 +278,10 @@ LIMIT 1`
v := int(queueDepth.Int64)
out.ConcurrencyQueueDepth = &v
}
+if accountSwitchCount.Valid {
+v := accountSwitchCount.Int64
+out.AccountSwitchCount = &v
+}

return &out, nil
}
@@ -56,18 +56,44 @@ error_buckets AS (
AND COALESCE(status_code, 0) >= 400
GROUP BY 1
),
+switch_buckets AS (
+SELECT ` + errorBucketExpr + ` AS bucket,
+COALESCE(SUM(CASE
+WHEN ev->>'kind' IN ('failover', 'retry_exhausted_failover', 'failover_on_400') THEN 1
+ELSE 0
+END), 0) AS switch_count
+FROM ops_error_logs
+CROSS JOIN LATERAL jsonb_array_elements(
+COALESCE(NULLIF(upstream_errors, 'null'::jsonb), '[]'::jsonb)
+) AS ev
+` + errorWhere + `
+AND upstream_errors IS NOT NULL
+GROUP BY 1
+),
combined AS (
-SELECT COALESCE(u.bucket, e.bucket) AS bucket,
-COALESCE(u.success_count, 0) AS success_count,
-COALESCE(e.error_count, 0) AS error_count,
-COALESCE(u.token_consumed, 0) AS token_consumed
-FROM usage_buckets u
-FULL OUTER JOIN error_buckets e ON u.bucket = e.bucket
+SELECT
+bucket,
+SUM(success_count) AS success_count,
+SUM(error_count) AS error_count,
+SUM(token_consumed) AS token_consumed,
+SUM(switch_count) AS switch_count
+FROM (
+SELECT bucket, success_count, 0 AS error_count, token_consumed, 0 AS switch_count
+FROM usage_buckets
+UNION ALL
+SELECT bucket, 0, error_count, 0, 0
+FROM error_buckets
+UNION ALL
+SELECT bucket, 0, 0, 0, switch_count
+FROM switch_buckets
+) t
+GROUP BY bucket
)
SELECT
bucket,
(success_count + error_count) AS request_count,
-token_consumed
+token_consumed,
+switch_count
FROM combined
ORDER BY bucket ASC`
@@ -84,13 +110,18 @@ ORDER BY bucket ASC`
var bucket time.Time
var requests int64
var tokens sql.NullInt64
-if err := rows.Scan(&bucket, &requests, &tokens); err != nil {
+var switches sql.NullInt64
+if err := rows.Scan(&bucket, &requests, &tokens, &switches); err != nil {
return nil, err
}
tokenConsumed := int64(0)
if tokens.Valid {
tokenConsumed = tokens.Int64
}
+switchCount := int64(0)
+if switches.Valid {
+switchCount = switches.Int64
+}

denom := float64(bucketSeconds)
if denom <= 0 {
@@ -103,6 +134,7 @@ ORDER BY bucket ASC`
BucketStart: bucket.UTC(),
RequestCount: requests,
TokenConsumed: tokenConsumed,
+SwitchCount: switchCount,
QPS: qps,
TPS: tps,
})
@@ -385,6 +417,7 @@ func fillOpsThroughputBuckets(start, end time.Time, bucketSeconds int, points []
BucketStart: cursor,
RequestCount: 0,
TokenConsumed: 0,
+SwitchCount: 0,
QPS: 0,
TPS: 0,
})
@@ -285,6 +285,11 @@ func (c *OpsMetricsCollector) collectAndPersist(ctx context.Context) error {
return fmt.Errorf("query error counts: %w", err)
}

+accountSwitchCount, err := c.queryAccountSwitchCount(ctx, windowStart, windowEnd)
+if err != nil {
+return fmt.Errorf("query account switch counts: %w", err)
+}
+
windowSeconds := windowEnd.Sub(windowStart).Seconds()
if windowSeconds <= 0 {
windowSeconds = 60
@@ -310,6 +315,7 @@ func (c *OpsMetricsCollector) collectAndPersist(ctx context.Context) error {
Upstream529Count: upstream529,
TokenConsumed: tokenConsumed,
+AccountSwitchCount: accountSwitchCount,
QPS: float64Ptr(roundTo1DP(qps)),
TPS: float64Ptr(roundTo1DP(tps)),
@@ -551,6 +557,27 @@ WHERE created_at >= $1 AND created_at < $2`
return errorTotal, businessLimited, errorSLA, upstreamExcl429529, upstream429, upstream529, nil
}

+func (c *OpsMetricsCollector) queryAccountSwitchCount(ctx context.Context, start, end time.Time) (int64, error) {
+q := `
+SELECT
+COALESCE(SUM(CASE
+WHEN ev->>'kind' IN ('failover', 'retry_exhausted_failover', 'failover_on_400') THEN 1
+ELSE 0
+END), 0) AS switch_count
+FROM ops_error_logs o
+CROSS JOIN LATERAL jsonb_array_elements(
+COALESCE(NULLIF(o.upstream_errors, 'null'::jsonb), '[]'::jsonb)
+) AS ev
+WHERE o.created_at >= $1 AND o.created_at < $2
+AND o.is_count_tokens = FALSE`
+
+var count int64
+if err := c.db.QueryRowContext(ctx, q, start, end).Scan(&count); err != nil {
+return 0, err
+}
+return count, nil
+}
+
type opsCollectedSystemStats struct {
cpuUsagePercent *float64
memoryUsedMB *int64
@@ -162,6 +162,7 @@ type OpsInsertSystemMetricsInput struct {
Upstream529Count int64

TokenConsumed int64
+AccountSwitchCount int64

QPS *float64
TPS *float64
@@ -225,6 +226,7 @@ type OpsSystemMetricsSnapshot struct {

GoroutineCount *int `json:"goroutine_count"`
ConcurrencyQueueDepth *int `json:"concurrency_queue_depth"`
+AccountSwitchCount *int64 `json:"account_switch_count"`
}

type OpsUpsertJobHeartbeatInput struct {
@@ -6,6 +6,7 @@ type OpsThroughputTrendPoint struct {
BucketStart time.Time `json:"bucket_start"`
RequestCount int64 `json:"request_count"`
TokenConsumed int64 `json:"token_consumed"`
+SwitchCount int64 `json:"switch_count"`
QPS float64 `json:"qps"`
TPS float64 `json:"tps"`
}
@@ -0,0 +1,3 @@
+-- ops_system_metrics: add an account switch count column (per-minute window)
+ALTER TABLE ops_system_metrics
+ADD COLUMN IF NOT EXISTS account_switch_count BIGINT NOT NULL DEFAULT 0;
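
For reference, a minimal sketch (plain PostgreSQL, assuming only the event shape implied by the queries above: upstream_errors is a jsonb array of objects carrying a kind field) of how the failover-kind counting used in switch_buckets and queryAccountSwitchCount behaves on a sample value:

-- Hypothetical inline sample; real rows come from ops_error_logs.upstream_errors.
SELECT COALESCE(SUM(CASE
         WHEN ev->>'kind' IN ('failover', 'retry_exhausted_failover', 'failover_on_400') THEN 1
         ELSE 0
       END), 0) AS switch_count
FROM jsonb_array_elements(
       '[{"kind": "failover"}, {"kind": "upstream_error"}, {"kind": "retry_exhausted_failover"}]'::jsonb
     ) AS ev;
-- Expected result: switch_count = 2; only the failover-like kinds are counted.

Because jsonb_array_elements returns a set of plain jsonb values, the alias AS ev names both the derived table and its single column, which is why ev->>'kind' works directly in the queries above.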