package service

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"math"
	"os"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/Wei-Shaw/sub2api/internal/config"
	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
	"github.com/shirou/gopsutil/v4/cpu"
	"github.com/shirou/gopsutil/v4/mem"
)

const (
	opsMetricsCollectorJobName     = "ops_metrics_collector"
	opsMetricsCollectorMinInterval = 60 * time.Second
	opsMetricsCollectorMaxInterval = 1 * time.Hour

	opsMetricsCollectorTimeout = 10 * time.Second

	opsMetricsCollectorLeaderLockKey = "ops:metrics:collector:leader"
	opsMetricsCollectorLeaderLockTTL = 90 * time.Second

	opsMetricsCollectorHeartbeatTimeout = 2 * time.Second

	bytesPerMB = 1024 * 1024
)

var opsMetricsCollectorAdvisoryLockID = hashAdvisoryLockID(opsMetricsCollectorLeaderLockKey)

// OpsMetricsCollector periodically samples request, error, latency, and
// system metrics and persists one aggregated row per collection window.
type OpsMetricsCollector struct {
	opsRepo     OpsRepository
	settingRepo SettingRepository
	cfg         *config.Config

	accountRepo        AccountRepository
	concurrencyService *ConcurrencyService

	db          *sql.DB
	redisClient *redis.Client
	instanceID  string

	lastCgroupCPUUsageNanos uint64
	lastCgroupCPUSampleAt   time.Time

	stopCh    chan struct{}
	startOnce sync.Once
	stopOnce  sync.Once

	skipLogMu sync.Mutex
	skipLogAt time.Time
}

// NewOpsMetricsCollector wires the collector with its repositories, database,
// Redis client, and configuration. The generated instance ID identifies this
// process during leader election.
func NewOpsMetricsCollector(
	opsRepo OpsRepository,
	settingRepo SettingRepository,
	accountRepo AccountRepository,
	concurrencyService *ConcurrencyService,
	db *sql.DB,
	redisClient *redis.Client,
	cfg *config.Config,
) *OpsMetricsCollector {
	return &OpsMetricsCollector{
		opsRepo:            opsRepo,
		settingRepo:        settingRepo,
		cfg:                cfg,
		accountRepo:        accountRepo,
		concurrencyService: concurrencyService,
		db:                 db,
		redisClient:        redisClient,
		instanceID:         uuid.NewString(),
	}
}

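// A minimal wiring sketch (illustrative only; the caller-side variable names
// and setup outside this file are assumptions, not part of this package):
//
//	collector := service.NewOpsMetricsCollector(
//		opsRepo, settingRepo, accountRepo, concurrencySvc, db, redisClient, cfg)
//	collector.Start()      // begin the background collection loop
//	defer collector.Stop() // signal the loop to exit on shutdown
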
// Start launches the background collection loop. Repeated calls are no-ops.
func (c *OpsMetricsCollector) Start() {
	if c == nil {
		return
	}
	c.startOnce.Do(func() {
		if c.stopCh == nil {
			c.stopCh = make(chan struct{})
		}
		go c.run()
	})
}

// Stop signals the collection loop to exit. Repeated calls are no-ops.
func (c *OpsMetricsCollector) Stop() {
	if c == nil {
		return
	}
	c.stopOnce.Do(func() {
		if c.stopCh != nil {
			close(c.stopCh)
		}
	})
}

func (c *OpsMetricsCollector) run() {
	// First run immediately so the dashboard has data soon after startup.
	c.collectOnce()

	for {
		interval := c.getInterval()
		timer := time.NewTimer(interval)
		select {
		case <-timer.C:
			c.collectOnce()
		case <-c.stopCh:
			timer.Stop()
			return
		}
	}
}

// getInterval reads the configured collection interval from settings and
// clamps it to [opsMetricsCollectorMinInterval, opsMetricsCollectorMaxInterval].
func (c *OpsMetricsCollector) getInterval() time.Duration {
	interval := opsMetricsCollectorMinInterval

	if c.settingRepo == nil {
		return interval
	}

	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	raw, err := c.settingRepo.GetValue(ctx, SettingKeyOpsMetricsIntervalSeconds)
	if err != nil {
		return interval
	}
	raw = strings.TrimSpace(raw)
	if raw == "" {
		return interval
	}

	seconds, err := strconv.Atoi(raw)
	if err != nil {
		return interval
	}
	if seconds < int(opsMetricsCollectorMinInterval.Seconds()) {
		seconds = int(opsMetricsCollectorMinInterval.Seconds())
	}
	if seconds > int(opsMetricsCollectorMaxInterval.Seconds()) {
		seconds = int(opsMetricsCollectorMaxInterval.Seconds())
	}
	return time.Duration(seconds) * time.Second
}

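// Illustrative values for the clamping above: a stored setting of "30" is
// raised to the 60s minimum, "300" is used as-is, "7200" is capped at the
// 1h maximum, and an empty or non-numeric value falls back to the 60s default.
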
// collectOnce runs a single collection cycle: it verifies monitoring is
// enabled, acquires the leader lock, collects and persists one window of
// metrics, and records a job heartbeat with the outcome.
func (c *OpsMetricsCollector) collectOnce() {
	if c == nil {
		return
	}
	if c.cfg != nil && !c.cfg.Ops.Enabled {
		return
	}
	if c.opsRepo == nil {
		return
	}
	if c.db == nil {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), opsMetricsCollectorTimeout)
	defer cancel()

	if !c.isMonitoringEnabled(ctx) {
		return
	}

	release, ok := c.tryAcquireLeaderLock(ctx)
	if !ok {
		return
	}
	if release != nil {
		defer release()
	}

	startedAt := time.Now().UTC()
	err := c.collectAndPersist(ctx)
	finishedAt := time.Now().UTC()

	durationMs := finishedAt.Sub(startedAt).Milliseconds()
	dur := durationMs
	runAt := startedAt

	if err != nil {
		msg := truncateString(err.Error(), 2048)
		errAt := finishedAt
		hbCtx, hbCancel := context.WithTimeout(context.Background(), opsMetricsCollectorHeartbeatTimeout)
		defer hbCancel()
		_ = c.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
			JobName:        opsMetricsCollectorJobName,
			LastRunAt:      &runAt,
			LastErrorAt:    &errAt,
			LastError:      &msg,
			LastDurationMs: &dur,
		})
		log.Printf("[OpsMetricsCollector] collect failed: %v", err)
		return
	}

	successAt := finishedAt
	hbCtx, hbCancel := context.WithTimeout(context.Background(), opsMetricsCollectorHeartbeatTimeout)
	defer hbCancel()
	_ = c.opsRepo.UpsertJobHeartbeat(hbCtx, &OpsUpsertJobHeartbeatInput{
		JobName:        opsMetricsCollectorJobName,
		LastRunAt:      &runAt,
		LastSuccessAt:  &successAt,
		LastDurationMs: &dur,
	})
}

func (c *OpsMetricsCollector) isMonitoringEnabled(ctx context.Context) bool {
	if c == nil {
		return false
	}
	if c.cfg != nil && !c.cfg.Ops.Enabled {
		return false
	}
	if c.settingRepo == nil {
		return true
	}
	if ctx == nil {
		ctx = context.Background()
	}

	value, err := c.settingRepo.GetValue(ctx, SettingKeyOpsMonitoringEnabled)
	if err != nil {
		if errors.Is(err, ErrSettingNotFound) {
			return true
		}
		// Fail-open: collector should not become a hard dependency.
		return true
	}
	switch strings.ToLower(strings.TrimSpace(value)) {
	case "false", "0", "off", "disabled":
		return false
	default:
		return true
	}
}

func (c *OpsMetricsCollector) collectAndPersist(ctx context.Context) error {
	if ctx == nil {
		ctx = context.Background()
	}

	// Align to stable minute boundaries to avoid partial buckets and to maximize cache hits.
	now := time.Now().UTC()
	windowEnd := now.Truncate(time.Minute)
	windowStart := windowEnd.Add(-1 * time.Minute)

	sys, err := c.collectSystemStats(ctx)
	if err != nil {
		// Continue; system stats are best-effort.
		log.Printf("[OpsMetricsCollector] system stats error: %v", err)
	}

	dbOK := c.checkDB(ctx)
	redisOK := c.checkRedis(ctx)
	active, idle := c.dbPoolStats()
	redisTotal, redisIdle, redisStatsOK := c.redisPoolStats()

	successCount, tokenConsumed, err := c.queryUsageCounts(ctx, windowStart, windowEnd)
	if err != nil {
		return fmt.Errorf("query usage counts: %w", err)
	}

	duration, ttft, err := c.queryUsageLatency(ctx, windowStart, windowEnd)
	if err != nil {
		return fmt.Errorf("query usage latency: %w", err)
	}

	errorTotal, businessLimited, errorSLA, upstreamExcl, upstream429, upstream529, err := c.queryErrorCounts(ctx, windowStart, windowEnd)
	if err != nil {
		return fmt.Errorf("query error counts: %w", err)
	}

	accountSwitchCount, err := c.queryAccountSwitchCount(ctx, windowStart, windowEnd)
	if err != nil {
		return fmt.Errorf("query account switch counts: %w", err)
	}

	windowSeconds := windowEnd.Sub(windowStart).Seconds()
	if windowSeconds <= 0 {
		windowSeconds = 60
	}
	requestTotal := successCount + errorTotal
	qps := float64(requestTotal) / windowSeconds
	tps := float64(tokenConsumed) / windowSeconds

	goroutines := runtime.NumGoroutine()
	concurrencyQueueDepth := c.collectConcurrencyQueueDepth(ctx)

	input := &OpsInsertSystemMetricsInput{
		CreatedAt:     windowEnd,
		WindowMinutes: 1,

		SuccessCount:         successCount,
		ErrorCountTotal:      errorTotal,
		BusinessLimitedCount: businessLimited,
		ErrorCountSLA:        errorSLA,

		UpstreamErrorCountExcl429529: upstreamExcl,
		Upstream429Count:             upstream429,
		Upstream529Count:             upstream529,

		TokenConsumed:      tokenConsumed,
		AccountSwitchCount: accountSwitchCount,
		QPS:                float64Ptr(roundTo1DP(qps)),
		TPS:                float64Ptr(roundTo1DP(tps)),

		DurationP50Ms: duration.p50,
		DurationP90Ms: duration.p90,
		DurationP95Ms: duration.p95,
		DurationP99Ms: duration.p99,
		DurationAvgMs: duration.avg,
		DurationMaxMs: duration.max,

		TTFTP50Ms: ttft.p50,
		TTFTP90Ms: ttft.p90,
		TTFTP95Ms: ttft.p95,
		TTFTP99Ms: ttft.p99,
		TTFTAvgMs: ttft.avg,
		TTFTMaxMs: ttft.max,

		CPUUsagePercent:    sys.cpuUsagePercent,
		MemoryUsedMB:       sys.memoryUsedMB,
		MemoryTotalMB:      sys.memoryTotalMB,
		MemoryUsagePercent: sys.memoryUsagePercent,

		DBOK:    boolPtr(dbOK),
		RedisOK: boolPtr(redisOK),

		RedisConnTotal: func() *int {
			if !redisStatsOK {
				return nil
			}
			return intPtr(redisTotal)
		}(),
		RedisConnIdle: func() *int {
			if !redisStatsOK {
				return nil
			}
			return intPtr(redisIdle)
		}(),

		DBConnActive:          intPtr(active),
		DBConnIdle:            intPtr(idle),
		GoroutineCount:        intPtr(goroutines),
		ConcurrencyQueueDepth: concurrencyQueueDepth,
	}

	return c.opsRepo.InsertSystemMetrics(ctx, input)
}

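// Worked example of the window math above (numbers are illustrative): a run at
// 12:05:37 UTC truncates windowEnd to 12:05:00 and sets windowStart to
// 12:04:00, so the bucket covers [12:04:00, 12:05:00). With 540 successes,
// 60 errors, and 90,000 tokens in that minute, QPS = (540+60)/60 = 10.0 and
// TPS = 90000/60 = 1500.0.
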
func (c *OpsMetricsCollector) collectConcurrencyQueueDepth(parentCtx context.Context) *int {
	if c == nil || c.accountRepo == nil || c.concurrencyService == nil {
		return nil
	}
	if parentCtx == nil {
		parentCtx = context.Background()
	}

	// Best-effort: never let concurrency sampling break the metrics collector.
	ctx, cancel := context.WithTimeout(parentCtx, 2*time.Second)
	defer cancel()

	accounts, err := c.accountRepo.ListSchedulable(ctx)
	if err != nil {
		return nil
	}
	if len(accounts) == 0 {
		zero := 0
		return &zero
	}

	batch := make([]AccountWithConcurrency, 0, len(accounts))
	for _, acc := range accounts {
		if acc.ID <= 0 {
			continue
		}
		maxConc := acc.Concurrency
		if maxConc < 0 {
			maxConc = 0
		}
		batch = append(batch, AccountWithConcurrency{
			ID:             acc.ID,
			MaxConcurrency: maxConc,
		})
	}
	if len(batch) == 0 {
		zero := 0
		return &zero
	}

	loadMap, err := c.concurrencyService.GetAccountsLoadBatch(ctx, batch)
	if err != nil {
		return nil
	}

	var total int64
	for _, info := range loadMap {
		if info == nil || info.WaitingCount <= 0 {
			continue
		}
		total += int64(info.WaitingCount)
	}
	if total < 0 {
		total = 0
	}

	maxInt := int64(^uint(0) >> 1)
	if total > maxInt {
		total = maxInt
	}
	v := int(total)
	return &v
}

type opsCollectedPercentiles struct {
	p50 *int
	p90 *int
	p95 *int
	p99 *int
	avg *float64
	max *int
}

func (c *OpsMetricsCollector) queryUsageCounts(ctx context.Context, start, end time.Time) (successCount int64, tokenConsumed int64, err error) {
	q := `
		SELECT
			COALESCE(COUNT(*), 0) AS success_count,
			COALESCE(SUM(input_tokens + output_tokens + cache_creation_tokens + cache_read_tokens), 0) AS token_consumed
		FROM usage_logs
		WHERE created_at >= $1 AND created_at < $2`

	var tokens sql.NullInt64
	if err := c.db.QueryRowContext(ctx, q, start, end).Scan(&successCount, &tokens); err != nil {
		return 0, 0, err
	}
	if tokens.Valid {
		tokenConsumed = tokens.Int64
	}
	return successCount, tokenConsumed, nil
}

func (c *OpsMetricsCollector) queryUsageLatency(ctx context.Context, start, end time.Time) (duration opsCollectedPercentiles, ttft opsCollectedPercentiles, err error) {
	{
		q := `
			SELECT
				percentile_cont(0.50) WITHIN GROUP (ORDER BY duration_ms) AS p50,
				percentile_cont(0.90) WITHIN GROUP (ORDER BY duration_ms) AS p90,
				percentile_cont(0.95) WITHIN GROUP (ORDER BY duration_ms) AS p95,
				percentile_cont(0.99) WITHIN GROUP (ORDER BY duration_ms) AS p99,
				AVG(duration_ms) AS avg_ms,
				MAX(duration_ms) AS max_ms
			FROM usage_logs
			WHERE created_at >= $1 AND created_at < $2
				AND duration_ms IS NOT NULL`

		var p50, p90, p95, p99 sql.NullFloat64
		var avg sql.NullFloat64
		var max sql.NullInt64
		if err := c.db.QueryRowContext(ctx, q, start, end).Scan(&p50, &p90, &p95, &p99, &avg, &max); err != nil {
			return opsCollectedPercentiles{}, opsCollectedPercentiles{}, err
		}
		duration.p50 = floatToIntPtr(p50)
		duration.p90 = floatToIntPtr(p90)
		duration.p95 = floatToIntPtr(p95)
		duration.p99 = floatToIntPtr(p99)
		if avg.Valid {
			v := roundTo1DP(avg.Float64)
			duration.avg = &v
		}
		if max.Valid {
			v := int(max.Int64)
			duration.max = &v
		}
	}

	{
		q := `
			SELECT
				percentile_cont(0.50) WITHIN GROUP (ORDER BY first_token_ms) AS p50,
				percentile_cont(0.90) WITHIN GROUP (ORDER BY first_token_ms) AS p90,
				percentile_cont(0.95) WITHIN GROUP (ORDER BY first_token_ms) AS p95,
				percentile_cont(0.99) WITHIN GROUP (ORDER BY first_token_ms) AS p99,
				AVG(first_token_ms) AS avg_ms,
				MAX(first_token_ms) AS max_ms
			FROM usage_logs
			WHERE created_at >= $1 AND created_at < $2
				AND first_token_ms IS NOT NULL`

		var p50, p90, p95, p99 sql.NullFloat64
		var avg sql.NullFloat64
		var max sql.NullInt64
		if err := c.db.QueryRowContext(ctx, q, start, end).Scan(&p50, &p90, &p95, &p99, &avg, &max); err != nil {
			return opsCollectedPercentiles{}, opsCollectedPercentiles{}, err
		}
		ttft.p50 = floatToIntPtr(p50)
		ttft.p90 = floatToIntPtr(p90)
		ttft.p95 = floatToIntPtr(p95)
		ttft.p99 = floatToIntPtr(p99)
		if avg.Valid {
			v := roundTo1DP(avg.Float64)
			ttft.avg = &v
		}
		if max.Valid {
			v := int(max.Int64)
			ttft.max = &v
		}
	}

	return duration, ttft, nil
}

func (c *OpsMetricsCollector) queryErrorCounts(ctx context.Context, start, end time.Time) (
	errorTotal int64,
	businessLimited int64,
	errorSLA int64,
	upstreamExcl429529 int64,
	upstream429 int64,
	upstream529 int64,
	err error,
) {
	q := `
		SELECT
			COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400), 0) AS error_total,
			COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND is_business_limited), 0) AS business_limited,
			COALESCE(COUNT(*) FILTER (WHERE COALESCE(status_code, 0) >= 400 AND NOT is_business_limited), 0) AS error_sla,
			COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) NOT IN (429, 529)), 0) AS upstream_excl,
			COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 429), 0) AS upstream_429,
			COALESCE(COUNT(*) FILTER (WHERE error_owner = 'provider' AND NOT is_business_limited AND COALESCE(upstream_status_code, status_code, 0) = 529), 0) AS upstream_529
		FROM ops_error_logs
		WHERE created_at >= $1 AND created_at < $2`

	if err := c.db.QueryRowContext(ctx, q, start, end).Scan(
		&errorTotal,
		&businessLimited,
		&errorSLA,
		&upstreamExcl429529,
		&upstream429,
		&upstream529,
	); err != nil {
		return 0, 0, 0, 0, 0, 0, err
	}
	return errorTotal, businessLimited, errorSLA, upstreamExcl429529, upstream429, upstream529, nil
}

func (c *OpsMetricsCollector) queryAccountSwitchCount(ctx context.Context, start, end time.Time) (int64, error) {
	q := `
		SELECT
			COALESCE(SUM(CASE
				WHEN split_part(ev->>'kind', ':', 1) IN ('failover', 'retry_exhausted_failover', 'failover_on_400') THEN 1
				ELSE 0
			END), 0) AS switch_count
		FROM ops_error_logs o
		CROSS JOIN LATERAL jsonb_array_elements(
			COALESCE(NULLIF(o.upstream_errors, 'null'::jsonb), '[]'::jsonb)
		) AS ev
		WHERE o.created_at >= $1 AND o.created_at < $2
			AND o.is_count_tokens = FALSE`

	var count int64
	if err := c.db.QueryRowContext(ctx, q, start, end).Scan(&count); err != nil {
		return 0, err
	}
	return count, nil
}

type opsCollectedSystemStats struct {
	cpuUsagePercent    *float64
	memoryUsedMB       *int64
	memoryTotalMB      *int64
	memoryUsagePercent *float64
}

func (c *OpsMetricsCollector) collectSystemStats(ctx context.Context) (*opsCollectedSystemStats, error) {
	out := &opsCollectedSystemStats{}
	if ctx == nil {
		ctx = context.Background()
	}

	sampleAt := time.Now().UTC()

	// Prefer cgroup (container) metrics when available.
	if cpuPct := c.tryCgroupCPUPercent(sampleAt); cpuPct != nil {
		out.cpuUsagePercent = cpuPct
	}

	cgroupUsed, cgroupTotal, cgroupOK := readCgroupMemoryBytes()
	if cgroupOK {
		usedMB := int64(cgroupUsed / bytesPerMB)
		out.memoryUsedMB = &usedMB
		if cgroupTotal > 0 {
			totalMB := int64(cgroupTotal / bytesPerMB)
			out.memoryTotalMB = &totalMB
			pct := roundTo1DP(float64(cgroupUsed) / float64(cgroupTotal) * 100)
			out.memoryUsagePercent = &pct
		}
	}

	// Fallback to host metrics if cgroup metrics are unavailable (or incomplete).
	if out.cpuUsagePercent == nil {
		if cpuPercents, err := cpu.PercentWithContext(ctx, 0, false); err == nil && len(cpuPercents) > 0 {
			v := roundTo1DP(cpuPercents[0])
			out.cpuUsagePercent = &v
		}
	}

	// If total memory isn't available from cgroup (e.g. memory.max = "max"), fill total from host.
	if out.memoryUsedMB == nil || out.memoryTotalMB == nil || out.memoryUsagePercent == nil {
		if vm, err := mem.VirtualMemoryWithContext(ctx); err == nil && vm != nil {
			if out.memoryUsedMB == nil {
				usedMB := int64(vm.Used / bytesPerMB)
				out.memoryUsedMB = &usedMB
			}
			if out.memoryTotalMB == nil {
				totalMB := int64(vm.Total / bytesPerMB)
				out.memoryTotalMB = &totalMB
			}
			if out.memoryUsagePercent == nil {
				if out.memoryUsedMB != nil && out.memoryTotalMB != nil && *out.memoryTotalMB > 0 {
					pct := roundTo1DP(float64(*out.memoryUsedMB) / float64(*out.memoryTotalMB) * 100)
					out.memoryUsagePercent = &pct
				} else {
					pct := roundTo1DP(vm.UsedPercent)
					out.memoryUsagePercent = &pct
				}
			}
		}
	}

	return out, nil
}

func (c *OpsMetricsCollector) tryCgroupCPUPercent(now time.Time) *float64 {
	usageNanos, ok := readCgroupCPUUsageNanos()
	if !ok {
		return nil
	}

	// Initialize baseline sample.
	if c.lastCgroupCPUSampleAt.IsZero() {
		c.lastCgroupCPUUsageNanos = usageNanos
		c.lastCgroupCPUSampleAt = now
		return nil
	}

	elapsed := now.Sub(c.lastCgroupCPUSampleAt)
	if elapsed <= 0 {
		c.lastCgroupCPUUsageNanos = usageNanos
		c.lastCgroupCPUSampleAt = now
		return nil
	}

	prev := c.lastCgroupCPUUsageNanos
	c.lastCgroupCPUUsageNanos = usageNanos
	c.lastCgroupCPUSampleAt = now

	if usageNanos < prev {
		// Counter reset (container restarted).
		return nil
	}

	deltaUsageSec := float64(usageNanos-prev) / 1e9
	elapsedSec := elapsed.Seconds()
	if elapsedSec <= 0 {
		return nil
	}

	cores := readCgroupCPULimitCores()
	if cores <= 0 {
		// Can't reliably normalize; skip and fall back to gopsutil.
		return nil
	}

	pct := (deltaUsageSec / (elapsedSec * cores)) * 100
	if pct < 0 {
		pct = 0
	}
	// Clamp to avoid noise/jitter showing impossible values.
	if pct > 100 {
		pct = 100
	}
	v := roundTo1DP(pct)
	return &v
}

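// Worked example of the normalization above (numbers are illustrative): with a
// cgroup limit of 2 cores (cpu.max = "200000 100000"), 90s between samples, and
// a CPU usage delta of 45s, pct = 45 / (90 * 2) * 100 = 25.0.
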
func readCgroupMemoryBytes() (usedBytes uint64, totalBytes uint64, ok bool) {
	// cgroup v2 (most common in modern containers)
	if used, ok1 := readUintFile("/sys/fs/cgroup/memory.current"); ok1 {
		usedBytes = used
		rawMax, err := os.ReadFile("/sys/fs/cgroup/memory.max")
		if err == nil {
			s := strings.TrimSpace(string(rawMax))
			if s != "" && s != "max" {
				if v, err := strconv.ParseUint(s, 10, 64); err == nil {
					totalBytes = v
				}
			}
		}
		return usedBytes, totalBytes, true
	}

	// cgroup v1 fallback
	if used, ok1 := readUintFile("/sys/fs/cgroup/memory/memory.usage_in_bytes"); ok1 {
		usedBytes = used
		if limit, ok2 := readUintFile("/sys/fs/cgroup/memory/memory.limit_in_bytes"); ok2 {
			// Some environments report a very large number when unlimited.
			if limit > 0 && limit < (1<<60) {
				totalBytes = limit
			}
		}
		return usedBytes, totalBytes, true
	}

	return 0, 0, false
}

func readCgroupCPUUsageNanos() (usageNanos uint64, ok bool) {
	// cgroup v2: cpu.stat has usage_usec
	if raw, err := os.ReadFile("/sys/fs/cgroup/cpu.stat"); err == nil {
		lines := strings.Split(string(raw), "\n")
		for _, line := range lines {
			fields := strings.Fields(line)
			if len(fields) != 2 {
				continue
			}
			if fields[0] != "usage_usec" {
				continue
			}
			v, err := strconv.ParseUint(fields[1], 10, 64)
			if err != nil {
				continue
			}
			return v * 1000, true
		}
	}

	// cgroup v1: cpuacct.usage is in nanoseconds
	if v, ok := readUintFile("/sys/fs/cgroup/cpuacct/cpuacct.usage"); ok {
		return v, true
	}

	return 0, false
}

func readCgroupCPULimitCores() float64 {
	// cgroup v2: cpu.max => "<quota> <period>" or "max <period>"
	if raw, err := os.ReadFile("/sys/fs/cgroup/cpu.max"); err == nil {
		fields := strings.Fields(string(raw))
		if len(fields) >= 2 && fields[0] != "max" {
			quota, err1 := strconv.ParseFloat(fields[0], 64)
			period, err2 := strconv.ParseFloat(fields[1], 64)
			if err1 == nil && err2 == nil && quota > 0 && period > 0 {
				return quota / period
			}
		}
	}

	// cgroup v1: cpu.cfs_quota_us / cpu.cfs_period_us
	quota, okQuota := readIntFile("/sys/fs/cgroup/cpu/cpu.cfs_quota_us")
	period, okPeriod := readIntFile("/sys/fs/cgroup/cpu/cpu.cfs_period_us")
	if okQuota && okPeriod && quota > 0 && period > 0 {
		return float64(quota) / float64(period)
	}

	return 0
}

func readUintFile(path string) (uint64, bool) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return 0, false
	}
	s := strings.TrimSpace(string(raw))
	if s == "" {
		return 0, false
	}
	v, err := strconv.ParseUint(s, 10, 64)
	if err != nil {
		return 0, false
	}
	return v, true
}

func readIntFile(path string) (int64, bool) {
	raw, err := os.ReadFile(path)
	if err != nil {
		return 0, false
	}
	s := strings.TrimSpace(string(raw))
	if s == "" {
		return 0, false
	}
	v, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, false
	}
	return v, true
}

func (c *OpsMetricsCollector) checkDB(ctx context.Context) bool {
	if c == nil || c.db == nil {
		return false
	}
	if ctx == nil {
		ctx = context.Background()
	}
	var one int
	if err := c.db.QueryRowContext(ctx, "SELECT 1").Scan(&one); err != nil {
		return false
	}
	return one == 1
}

func (c *OpsMetricsCollector) checkRedis(ctx context.Context) bool {
	if c == nil || c.redisClient == nil {
		return false
	}
	if ctx == nil {
		ctx = context.Background()
	}
	return c.redisClient.Ping(ctx).Err() == nil
}

func (c *OpsMetricsCollector) redisPoolStats() (total int, idle int, ok bool) {
	if c == nil || c.redisClient == nil {
		return 0, 0, false
	}
	stats := c.redisClient.PoolStats()
	if stats == nil {
		return 0, 0, false
	}
	return int(stats.TotalConns), int(stats.IdleConns), true
}

func (c *OpsMetricsCollector) dbPoolStats() (active int, idle int) {
	if c == nil || c.db == nil {
		return 0, 0
	}
	stats := c.db.Stats()
	return stats.InUse, stats.Idle
}

var opsMetricsCollectorReleaseScript = redis.NewScript(`
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0
`)

func (c *OpsMetricsCollector) tryAcquireLeaderLock(ctx context.Context) (func(), bool) {
	if c == nil || c.redisClient == nil {
		return nil, true
	}
	if ctx == nil {
		ctx = context.Background()
	}

	ok, err := c.redisClient.SetNX(ctx, opsMetricsCollectorLeaderLockKey, c.instanceID, opsMetricsCollectorLeaderLockTTL).Result()
	if err != nil {
		// Redis is configured but unreachable: fall back to a DB advisory lock so a
		// single leader is still elected. If that also fails, skip this cycle
		// (fail-closed) rather than let every instance stampede the database.
		release, ok := tryAcquireDBAdvisoryLock(ctx, c.db, opsMetricsCollectorAdvisoryLockID)
		if !ok {
			c.maybeLogSkip()
			return nil, false
		}
		return release, true
	}
	if !ok {
		c.maybeLogSkip()
		return nil, false
	}

	release := func() {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		defer cancel()
		_, _ = opsMetricsCollectorReleaseScript.Run(ctx, c.redisClient, []string{opsMetricsCollectorLeaderLockKey}, c.instanceID).Result()
	}
	return release, true
}

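// Why the release goes through the guarded script rather than a plain DEL: if a
// collection cycle outlives the 90s lock TTL, the key expires and another
// instance may acquire it; an unconditional DEL here would then drop that other
// instance's lock. Comparing the stored value against this instanceID before
// deleting keeps the release scoped to the lock this process actually holds.
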
func (c *OpsMetricsCollector) maybeLogSkip() {
	c.skipLogMu.Lock()
	defer c.skipLogMu.Unlock()

	now := time.Now()
	if !c.skipLogAt.IsZero() && now.Sub(c.skipLogAt) < time.Minute {
		return
	}
	c.skipLogAt = now
	log.Printf("[OpsMetricsCollector] leader lock held by another instance; skipping")
}

func floatToIntPtr(v sql.NullFloat64) *int {
	if !v.Valid {
		return nil
	}
	n := int(math.Round(v.Float64))
	return &n
}

func roundTo1DP(v float64) float64 {
	return math.Round(v*10) / 10
}

func truncateString(s string, max int) string {
	if max <= 0 {
		return ""
	}
	if len(s) <= max {
		return s
	}
	cut := s[:max]
	for len(cut) > 0 && !utf8.ValidString(cut) {
		cut = cut[:len(cut)-1]
	}
	return cut
}

func boolPtr(v bool) *bool {
	out := v
	return &out
}

func intPtr(v int) *int {
	out := v
	return &out
}

func float64Ptr(v float64) *float64 {
	out := v
	return &out
}