feat(log): 落地统一日志底座与系统日志运维能力

This commit is contained in:
yangjianbo
2026-02-12 16:27:29 +08:00
parent a5f29019d9
commit fff1d54858
48 changed files with 4265 additions and 65 deletions

View File

@@ -161,6 +161,9 @@ const (
// SettingKeyOpsAdvancedSettings stores JSON config for ops advanced settings (data retention, aggregation).
SettingKeyOpsAdvancedSettings = "ops_advanced_settings"
// SettingKeyOpsRuntimeLogConfig stores JSON config for runtime log settings.
SettingKeyOpsRuntimeLogConfig = "ops_runtime_log_config"
// =========================
// Stream Timeout Handling
// =========================

View File

@@ -157,6 +157,8 @@ type opsCleanupDeletedCounts struct {
errorLogs int64
retryAttempts int64
alertEvents int64
systemLogs int64
logAudits int64
systemMetrics int64
hourlyPreagg int64
dailyPreagg int64
@@ -164,10 +166,12 @@ type opsCleanupDeletedCounts struct {
func (c opsCleanupDeletedCounts) String() string {
return fmt.Sprintf(
"error_logs=%d retry_attempts=%d alert_events=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d",
"error_logs=%d retry_attempts=%d alert_events=%d system_logs=%d log_audits=%d system_metrics=%d hourly_preagg=%d daily_preagg=%d",
c.errorLogs,
c.retryAttempts,
c.alertEvents,
c.systemLogs,
c.logAudits,
c.systemMetrics,
c.hourlyPreagg,
c.dailyPreagg,
@@ -204,6 +208,18 @@ func (s *OpsCleanupService) runCleanupOnce(ctx context.Context) (opsCleanupDelet
return out, err
}
out.alertEvents = n
n, err = deleteOldRowsByID(ctx, s.db, "ops_system_logs", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.systemLogs = n
n, err = deleteOldRowsByID(ctx, s.db, "ops_system_log_cleanup_audits", "created_at", cutoff, batchSize, false)
if err != nil {
return out, err
}
out.logAudits = n
}
// Minute-level metrics snapshots.

View File

@@ -0,0 +1,267 @@
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"go.uber.org/zap"
)
func defaultOpsRuntimeLogConfig(cfg *config.Config) *OpsRuntimeLogConfig {
out := &OpsRuntimeLogConfig{
Level: "info",
EnableSampling: false,
SamplingInitial: 100,
SamplingNext: 100,
Caller: true,
StacktraceLevel: "error",
RetentionDays: 30,
}
if cfg == nil {
return out
}
out.Level = strings.ToLower(strings.TrimSpace(cfg.Log.Level))
out.EnableSampling = cfg.Log.Sampling.Enabled
out.SamplingInitial = cfg.Log.Sampling.Initial
out.SamplingNext = cfg.Log.Sampling.Thereafter
out.Caller = cfg.Log.Caller
out.StacktraceLevel = strings.ToLower(strings.TrimSpace(cfg.Log.StacktraceLevel))
if cfg.Ops.Cleanup.ErrorLogRetentionDays > 0 {
out.RetentionDays = cfg.Ops.Cleanup.ErrorLogRetentionDays
}
return out
}
func normalizeOpsRuntimeLogConfig(cfg *OpsRuntimeLogConfig, defaults *OpsRuntimeLogConfig) {
if cfg == nil || defaults == nil {
return
}
cfg.Level = strings.ToLower(strings.TrimSpace(cfg.Level))
if cfg.Level == "" {
cfg.Level = defaults.Level
}
cfg.StacktraceLevel = strings.ToLower(strings.TrimSpace(cfg.StacktraceLevel))
if cfg.StacktraceLevel == "" {
cfg.StacktraceLevel = defaults.StacktraceLevel
}
if cfg.SamplingInitial <= 0 {
cfg.SamplingInitial = defaults.SamplingInitial
}
if cfg.SamplingNext <= 0 {
cfg.SamplingNext = defaults.SamplingNext
}
if cfg.RetentionDays <= 0 {
cfg.RetentionDays = defaults.RetentionDays
}
}
func validateOpsRuntimeLogConfig(cfg *OpsRuntimeLogConfig) error {
if cfg == nil {
return errors.New("invalid config")
}
switch strings.ToLower(strings.TrimSpace(cfg.Level)) {
case "debug", "info", "warn", "error":
default:
return errors.New("level must be one of: debug/info/warn/error")
}
switch strings.ToLower(strings.TrimSpace(cfg.StacktraceLevel)) {
case "none", "error", "fatal":
default:
return errors.New("stacktrace_level must be one of: none/error/fatal")
}
if cfg.SamplingInitial <= 0 {
return errors.New("sampling_initial must be positive")
}
if cfg.SamplingNext <= 0 {
return errors.New("sampling_thereafter must be positive")
}
if cfg.RetentionDays < 1 || cfg.RetentionDays > 3650 {
return errors.New("retention_days must be between 1 and 3650")
}
return nil
}
func (s *OpsService) GetRuntimeLogConfig(ctx context.Context) (*OpsRuntimeLogConfig, error) {
if s == nil || s.settingRepo == nil {
var cfg *config.Config
if s != nil {
cfg = s.cfg
}
defaultCfg := defaultOpsRuntimeLogConfig(cfg)
return defaultCfg, nil
}
defaultCfg := defaultOpsRuntimeLogConfig(s.cfg)
if ctx == nil {
ctx = context.Background()
}
raw, err := s.settingRepo.GetValue(ctx, SettingKeyOpsRuntimeLogConfig)
if err != nil {
if errors.Is(err, ErrSettingNotFound) {
b, _ := json.Marshal(defaultCfg)
_ = s.settingRepo.Set(ctx, SettingKeyOpsRuntimeLogConfig, string(b))
return defaultCfg, nil
}
return nil, err
}
cfg := &OpsRuntimeLogConfig{}
if err := json.Unmarshal([]byte(raw), cfg); err != nil {
return defaultCfg, nil
}
normalizeOpsRuntimeLogConfig(cfg, defaultCfg)
return cfg, nil
}
func (s *OpsService) UpdateRuntimeLogConfig(ctx context.Context, req *OpsRuntimeLogConfig, operatorID int64) (*OpsRuntimeLogConfig, error) {
if s == nil || s.settingRepo == nil {
return nil, errors.New("setting repository not initialized")
}
if req == nil {
return nil, errors.New("invalid config")
}
if ctx == nil {
ctx = context.Background()
}
if operatorID <= 0 {
return nil, errors.New("invalid operator id")
}
oldCfg, err := s.GetRuntimeLogConfig(ctx)
if err != nil {
return nil, err
}
next := *req
normalizeOpsRuntimeLogConfig(&next, defaultOpsRuntimeLogConfig(s.cfg))
if err := validateOpsRuntimeLogConfig(&next); err != nil {
s.auditRuntimeLogConfigFailure(operatorID, oldCfg, &next, "validation_failed: "+err.Error())
return nil, err
}
if err := applyOpsRuntimeLogConfig(&next); err != nil {
s.auditRuntimeLogConfigFailure(operatorID, oldCfg, &next, "apply_failed: "+err.Error())
return nil, err
}
next.Source = "runtime_setting"
next.UpdatedAt = time.Now().UTC().Format(time.RFC3339Nano)
next.UpdatedByUserID = operatorID
encoded, err := json.Marshal(&next)
if err != nil {
return nil, err
}
if err := s.settingRepo.Set(ctx, SettingKeyOpsRuntimeLogConfig, string(encoded)); err != nil {
// 存储失败时回滚到旧配置,避免内存状态与持久化状态不一致。
_ = applyOpsRuntimeLogConfig(oldCfg)
s.auditRuntimeLogConfigFailure(operatorID, oldCfg, &next, "persist_failed: "+err.Error())
return nil, err
}
s.auditRuntimeLogConfigChange(operatorID, oldCfg, &next, "updated")
return &next, nil
}
func (s *OpsService) ResetRuntimeLogConfig(ctx context.Context, operatorID int64) (*OpsRuntimeLogConfig, error) {
if s == nil || s.settingRepo == nil {
return nil, errors.New("setting repository not initialized")
}
if ctx == nil {
ctx = context.Background()
}
if operatorID <= 0 {
return nil, errors.New("invalid operator id")
}
oldCfg, err := s.GetRuntimeLogConfig(ctx)
if err != nil {
return nil, err
}
resetCfg := defaultOpsRuntimeLogConfig(s.cfg)
normalizeOpsRuntimeLogConfig(resetCfg, defaultOpsRuntimeLogConfig(s.cfg))
if err := validateOpsRuntimeLogConfig(resetCfg); err != nil {
s.auditRuntimeLogConfigFailure(operatorID, oldCfg, resetCfg, "reset_validation_failed: "+err.Error())
return nil, err
}
if err := applyOpsRuntimeLogConfig(resetCfg); err != nil {
s.auditRuntimeLogConfigFailure(operatorID, oldCfg, resetCfg, "reset_apply_failed: "+err.Error())
return nil, err
}
// 清理 runtime 覆盖配置,回退到 env/yaml baseline。
if err := s.settingRepo.Delete(ctx, SettingKeyOpsRuntimeLogConfig); err != nil && !errors.Is(err, ErrSettingNotFound) {
_ = applyOpsRuntimeLogConfig(oldCfg)
s.auditRuntimeLogConfigFailure(operatorID, oldCfg, resetCfg, "reset_persist_failed: "+err.Error())
return nil, err
}
now := time.Now().UTC().Format(time.RFC3339Nano)
resetCfg.Source = "baseline"
resetCfg.UpdatedAt = now
resetCfg.UpdatedByUserID = operatorID
s.auditRuntimeLogConfigChange(operatorID, oldCfg, resetCfg, "reset")
return resetCfg, nil
}
func applyOpsRuntimeLogConfig(cfg *OpsRuntimeLogConfig) error {
if cfg == nil {
return fmt.Errorf("nil runtime log config")
}
if err := logger.Reconfigure(func(opts *logger.InitOptions) error {
opts.Level = strings.ToLower(strings.TrimSpace(cfg.Level))
opts.Caller = cfg.Caller
opts.StacktraceLevel = strings.ToLower(strings.TrimSpace(cfg.StacktraceLevel))
opts.Sampling.Enabled = cfg.EnableSampling
opts.Sampling.Initial = cfg.SamplingInitial
opts.Sampling.Thereafter = cfg.SamplingNext
return nil
}); err != nil {
return err
}
return nil
}
func (s *OpsService) applyRuntimeLogConfigOnStartup(ctx context.Context) {
if s == nil {
return
}
cfg, err := s.GetRuntimeLogConfig(ctx)
if err != nil {
return
}
_ = applyOpsRuntimeLogConfig(cfg)
}
func (s *OpsService) auditRuntimeLogConfigChange(operatorID int64, oldCfg *OpsRuntimeLogConfig, newCfg *OpsRuntimeLogConfig, action string) {
oldRaw, _ := json.Marshal(oldCfg)
newRaw, _ := json.Marshal(newCfg)
logger.With(
zap.String("component", "audit.log_config_change"),
zap.String("action", strings.TrimSpace(action)),
zap.Int64("operator_id", operatorID),
zap.String("old", string(oldRaw)),
zap.String("new", string(newRaw)),
).Info("runtime log config changed")
}
func (s *OpsService) auditRuntimeLogConfigFailure(operatorID int64, oldCfg *OpsRuntimeLogConfig, newCfg *OpsRuntimeLogConfig, reason string) {
oldRaw, _ := json.Marshal(oldCfg)
newRaw, _ := json.Marshal(newCfg)
logger.With(
zap.String("component", "audit.log_config_change"),
zap.String("action", "failed"),
zap.Int64("operator_id", operatorID),
zap.String("reason", strings.TrimSpace(reason)),
zap.String("old", string(oldRaw)),
zap.String("new", string(newRaw)),
).Warn("runtime log config change failed")
}

View File

@@ -2,6 +2,21 @@ package service
import "time"
type OpsSystemLog struct {
ID int64 `json:"id"`
CreatedAt time.Time `json:"created_at"`
Level string `json:"level"`
Component string `json:"component"`
Message string `json:"message"`
RequestID string `json:"request_id"`
ClientRequestID string `json:"client_request_id"`
UserID *int64 `json:"user_id"`
AccountID *int64 `json:"account_id"`
Platform string `json:"platform"`
Model string `json:"model"`
Extra map[string]any `json:"extra,omitempty"`
}
type OpsErrorLog struct {
ID int64 `json:"id"`
CreatedAt time.Time `json:"created_at"`

View File

@@ -10,6 +10,10 @@ type OpsRepository interface {
ListErrorLogs(ctx context.Context, filter *OpsErrorLogFilter) (*OpsErrorLogList, error)
GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLogDetail, error)
ListRequestDetails(ctx context.Context, filter *OpsRequestDetailFilter) ([]*OpsRequestDetail, int64, error)
BatchInsertSystemLogs(ctx context.Context, inputs []*OpsInsertSystemLogInput) (int64, error)
ListSystemLogs(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error)
DeleteSystemLogs(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error)
InsertSystemLogCleanupAudit(ctx context.Context, input *OpsSystemLogCleanupAudit) error
InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error)
UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error
@@ -205,6 +209,69 @@ type OpsInsertSystemMetricsInput struct {
ConcurrencyQueueDepth *int
}
type OpsInsertSystemLogInput struct {
CreatedAt time.Time
Level string
Component string
Message string
RequestID string
ClientRequestID string
UserID *int64
AccountID *int64
Platform string
Model string
ExtraJSON string
}
type OpsSystemLogFilter struct {
StartTime *time.Time
EndTime *time.Time
Level string
Component string
RequestID string
ClientRequestID string
UserID *int64
AccountID *int64
Platform string
Model string
Query string
Page int
PageSize int
}
type OpsSystemLogCleanupFilter struct {
StartTime *time.Time
EndTime *time.Time
Level string
Component string
RequestID string
ClientRequestID string
UserID *int64
AccountID *int64
Platform string
Model string
Query string
}
type OpsSystemLogList struct {
Logs []*OpsSystemLog `json:"logs"`
Total int `json:"total"`
Page int `json:"page"`
PageSize int `json:"page_size"`
}
type OpsSystemLogCleanupAudit struct {
CreatedAt time.Time
OperatorID int64
Conditions string
DeletedRows int64
}
type OpsSystemMetricsSnapshot struct {
ID int64 `json:"id"`
CreatedAt time.Time `json:"created_at"`

View File

@@ -0,0 +1,196 @@
package service
import (
"context"
"time"
)
// opsRepoMock is a test-only OpsRepository implementation with optional function hooks.
type opsRepoMock struct {
BatchInsertSystemLogsFn func(ctx context.Context, inputs []*OpsInsertSystemLogInput) (int64, error)
ListSystemLogsFn func(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error)
DeleteSystemLogsFn func(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error)
InsertSystemLogCleanupAuditFn func(ctx context.Context, input *OpsSystemLogCleanupAudit) error
}
func (m *opsRepoMock) InsertErrorLog(ctx context.Context, input *OpsInsertErrorLogInput) (int64, error) {
return 0, nil
}
func (m *opsRepoMock) ListErrorLogs(ctx context.Context, filter *OpsErrorLogFilter) (*OpsErrorLogList, error) {
return &OpsErrorLogList{Errors: []*OpsErrorLog{}, Page: 1, PageSize: 20}, nil
}
func (m *opsRepoMock) GetErrorLogByID(ctx context.Context, id int64) (*OpsErrorLogDetail, error) {
return &OpsErrorLogDetail{}, nil
}
func (m *opsRepoMock) ListRequestDetails(ctx context.Context, filter *OpsRequestDetailFilter) ([]*OpsRequestDetail, int64, error) {
return []*OpsRequestDetail{}, 0, nil
}
func (m *opsRepoMock) BatchInsertSystemLogs(ctx context.Context, inputs []*OpsInsertSystemLogInput) (int64, error) {
if m.BatchInsertSystemLogsFn != nil {
return m.BatchInsertSystemLogsFn(ctx, inputs)
}
return int64(len(inputs)), nil
}
func (m *opsRepoMock) ListSystemLogs(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error) {
if m.ListSystemLogsFn != nil {
return m.ListSystemLogsFn(ctx, filter)
}
return &OpsSystemLogList{Logs: []*OpsSystemLog{}, Total: 0, Page: 1, PageSize: 50}, nil
}
func (m *opsRepoMock) DeleteSystemLogs(ctx context.Context, filter *OpsSystemLogCleanupFilter) (int64, error) {
if m.DeleteSystemLogsFn != nil {
return m.DeleteSystemLogsFn(ctx, filter)
}
return 0, nil
}
func (m *opsRepoMock) InsertSystemLogCleanupAudit(ctx context.Context, input *OpsSystemLogCleanupAudit) error {
if m.InsertSystemLogCleanupAuditFn != nil {
return m.InsertSystemLogCleanupAuditFn(ctx, input)
}
return nil
}
func (m *opsRepoMock) InsertRetryAttempt(ctx context.Context, input *OpsInsertRetryAttemptInput) (int64, error) {
return 0, nil
}
func (m *opsRepoMock) UpdateRetryAttempt(ctx context.Context, input *OpsUpdateRetryAttemptInput) error {
return nil
}
func (m *opsRepoMock) GetLatestRetryAttemptForError(ctx context.Context, sourceErrorID int64) (*OpsRetryAttempt, error) {
return nil, nil
}
func (m *opsRepoMock) ListRetryAttemptsByErrorID(ctx context.Context, sourceErrorID int64, limit int) ([]*OpsRetryAttempt, error) {
return []*OpsRetryAttempt{}, nil
}
func (m *opsRepoMock) UpdateErrorResolution(ctx context.Context, errorID int64, resolved bool, resolvedByUserID *int64, resolvedRetryID *int64, resolvedAt *time.Time) error {
return nil
}
func (m *opsRepoMock) GetWindowStats(ctx context.Context, filter *OpsDashboardFilter) (*OpsWindowStats, error) {
return &OpsWindowStats{}, nil
}
func (m *opsRepoMock) GetRealtimeTrafficSummary(ctx context.Context, filter *OpsDashboardFilter) (*OpsRealtimeTrafficSummary, error) {
return &OpsRealtimeTrafficSummary{}, nil
}
func (m *opsRepoMock) GetDashboardOverview(ctx context.Context, filter *OpsDashboardFilter) (*OpsDashboardOverview, error) {
return &OpsDashboardOverview{}, nil
}
func (m *opsRepoMock) GetThroughputTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsThroughputTrendResponse, error) {
return &OpsThroughputTrendResponse{}, nil
}
func (m *opsRepoMock) GetLatencyHistogram(ctx context.Context, filter *OpsDashboardFilter) (*OpsLatencyHistogramResponse, error) {
return &OpsLatencyHistogramResponse{}, nil
}
func (m *opsRepoMock) GetErrorTrend(ctx context.Context, filter *OpsDashboardFilter, bucketSeconds int) (*OpsErrorTrendResponse, error) {
return &OpsErrorTrendResponse{}, nil
}
func (m *opsRepoMock) GetErrorDistribution(ctx context.Context, filter *OpsDashboardFilter) (*OpsErrorDistributionResponse, error) {
return &OpsErrorDistributionResponse{}, nil
}
func (m *opsRepoMock) GetOpenAITokenStats(ctx context.Context, filter *OpsOpenAITokenStatsFilter) (*OpsOpenAITokenStatsResponse, error) {
return &OpsOpenAITokenStatsResponse{}, nil
}
func (m *opsRepoMock) InsertSystemMetrics(ctx context.Context, input *OpsInsertSystemMetricsInput) error {
return nil
}
func (m *opsRepoMock) GetLatestSystemMetrics(ctx context.Context, windowMinutes int) (*OpsSystemMetricsSnapshot, error) {
return &OpsSystemMetricsSnapshot{}, nil
}
func (m *opsRepoMock) UpsertJobHeartbeat(ctx context.Context, input *OpsUpsertJobHeartbeatInput) error {
return nil
}
func (m *opsRepoMock) ListJobHeartbeats(ctx context.Context) ([]*OpsJobHeartbeat, error) {
return []*OpsJobHeartbeat{}, nil
}
func (m *opsRepoMock) ListAlertRules(ctx context.Context) ([]*OpsAlertRule, error) {
return []*OpsAlertRule{}, nil
}
func (m *opsRepoMock) CreateAlertRule(ctx context.Context, input *OpsAlertRule) (*OpsAlertRule, error) {
return input, nil
}
func (m *opsRepoMock) UpdateAlertRule(ctx context.Context, input *OpsAlertRule) (*OpsAlertRule, error) {
return input, nil
}
func (m *opsRepoMock) DeleteAlertRule(ctx context.Context, id int64) error {
return nil
}
func (m *opsRepoMock) ListAlertEvents(ctx context.Context, filter *OpsAlertEventFilter) ([]*OpsAlertEvent, error) {
return []*OpsAlertEvent{}, nil
}
func (m *opsRepoMock) GetAlertEventByID(ctx context.Context, eventID int64) (*OpsAlertEvent, error) {
return &OpsAlertEvent{}, nil
}
func (m *opsRepoMock) GetActiveAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
return nil, nil
}
func (m *opsRepoMock) GetLatestAlertEvent(ctx context.Context, ruleID int64) (*OpsAlertEvent, error) {
return nil, nil
}
func (m *opsRepoMock) CreateAlertEvent(ctx context.Context, event *OpsAlertEvent) (*OpsAlertEvent, error) {
return event, nil
}
func (m *opsRepoMock) UpdateAlertEventStatus(ctx context.Context, eventID int64, status string, resolvedAt *time.Time) error {
return nil
}
func (m *opsRepoMock) UpdateAlertEventEmailSent(ctx context.Context, eventID int64, emailSent bool) error {
return nil
}
func (m *opsRepoMock) CreateAlertSilence(ctx context.Context, input *OpsAlertSilence) (*OpsAlertSilence, error) {
return input, nil
}
func (m *opsRepoMock) IsAlertSilenced(ctx context.Context, ruleID int64, platform string, groupID *int64, region *string, now time.Time) (bool, error) {
return false, nil
}
func (m *opsRepoMock) UpsertHourlyMetrics(ctx context.Context, startTime, endTime time.Time) error {
return nil
}
func (m *opsRepoMock) UpsertDailyMetrics(ctx context.Context, startTime, endTime time.Time) error {
return nil
}
func (m *opsRepoMock) GetLatestHourlyBucketStart(ctx context.Context) (time.Time, bool, error) {
return time.Time{}, false, nil
}
func (m *opsRepoMock) GetLatestDailyBucketDate(ctx context.Context) (time.Time, bool, error) {
return time.Time{}, false, nil
}
var _ OpsRepository = (*opsRepoMock)(nil)

View File

@@ -37,6 +37,7 @@ type OpsService struct {
openAIGatewayService *OpenAIGatewayService
geminiCompatService *GeminiMessagesCompatService
antigravityGatewayService *AntigravityGatewayService
systemLogSink *OpsSystemLogSink
}
func NewOpsService(
@@ -50,8 +51,9 @@ func NewOpsService(
openAIGatewayService *OpenAIGatewayService,
geminiCompatService *GeminiMessagesCompatService,
antigravityGatewayService *AntigravityGatewayService,
systemLogSink *OpsSystemLogSink,
) *OpsService {
return &OpsService{
svc := &OpsService{
opsRepo: opsRepo,
settingRepo: settingRepo,
cfg: cfg,
@@ -64,7 +66,10 @@ func NewOpsService(
openAIGatewayService: openAIGatewayService,
geminiCompatService: geminiCompatService,
antigravityGatewayService: antigravityGatewayService,
systemLogSink: systemLogSink,
}
svc.applyRuntimeLogConfigOnStartup(context.Background())
return svc
}
func (s *OpsService) RequireMonitoringEnabled(ctx context.Context) error {

View File

@@ -0,0 +1,124 @@
package service
import (
"context"
"database/sql"
"encoding/json"
"errors"
"log"
"strings"
"time"
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
)
func (s *OpsService) ListSystemLogs(ctx context.Context, filter *OpsSystemLogFilter) (*OpsSystemLogList, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return nil, err
}
if s.opsRepo == nil {
return &OpsSystemLogList{
Logs: []*OpsSystemLog{},
Total: 0,
Page: 1,
PageSize: 50,
}, nil
}
if filter == nil {
filter = &OpsSystemLogFilter{}
}
if filter.Page <= 0 {
filter.Page = 1
}
if filter.PageSize <= 0 {
filter.PageSize = 50
}
if filter.PageSize > 200 {
filter.PageSize = 200
}
result, err := s.opsRepo.ListSystemLogs(ctx, filter)
if err != nil {
return nil, infraerrors.InternalServer("OPS_SYSTEM_LOG_LIST_FAILED", "Failed to list system logs").WithCause(err)
}
return result, nil
}
func (s *OpsService) CleanupSystemLogs(ctx context.Context, filter *OpsSystemLogCleanupFilter, operatorID int64) (int64, error) {
if err := s.RequireMonitoringEnabled(ctx); err != nil {
return 0, err
}
if s.opsRepo == nil {
return 0, infraerrors.ServiceUnavailable("OPS_REPO_UNAVAILABLE", "Ops repository not available")
}
if operatorID <= 0 {
return 0, infraerrors.BadRequest("OPS_SYSTEM_LOG_CLEANUP_INVALID_OPERATOR", "invalid operator")
}
if filter == nil {
filter = &OpsSystemLogCleanupFilter{}
}
if filter.EndTime != nil && filter.StartTime != nil && filter.StartTime.After(*filter.EndTime) {
return 0, infraerrors.BadRequest("OPS_SYSTEM_LOG_CLEANUP_INVALID_RANGE", "invalid time range")
}
deletedRows, err := s.opsRepo.DeleteSystemLogs(ctx, filter)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
if strings.Contains(strings.ToLower(err.Error()), "requires at least one filter") {
return 0, infraerrors.BadRequest("OPS_SYSTEM_LOG_CLEANUP_FILTER_REQUIRED", "cleanup requires at least one filter condition")
}
return 0, infraerrors.InternalServer("OPS_SYSTEM_LOG_CLEANUP_FAILED", "Failed to cleanup system logs").WithCause(err)
}
if auditErr := s.opsRepo.InsertSystemLogCleanupAudit(ctx, &OpsSystemLogCleanupAudit{
CreatedAt: time.Now().UTC(),
OperatorID: operatorID,
Conditions: marshalSystemLogCleanupConditions(filter),
DeletedRows: deletedRows,
}); auditErr != nil {
// 审计失败不影响主流程,避免运维清理被阻塞。
log.Printf("[OpsSystemLog] cleanup audit failed: %v", auditErr)
}
return deletedRows, nil
}
func marshalSystemLogCleanupConditions(filter *OpsSystemLogCleanupFilter) string {
if filter == nil {
return "{}"
}
payload := map[string]any{
"level": strings.TrimSpace(filter.Level),
"component": strings.TrimSpace(filter.Component),
"request_id": strings.TrimSpace(filter.RequestID),
"client_request_id": strings.TrimSpace(filter.ClientRequestID),
"platform": strings.TrimSpace(filter.Platform),
"model": strings.TrimSpace(filter.Model),
"query": strings.TrimSpace(filter.Query),
}
if filter.UserID != nil {
payload["user_id"] = *filter.UserID
}
if filter.AccountID != nil {
payload["account_id"] = *filter.AccountID
}
if filter.StartTime != nil && !filter.StartTime.IsZero() {
payload["start_time"] = filter.StartTime.UTC().Format(time.RFC3339Nano)
}
if filter.EndTime != nil && !filter.EndTime.IsZero() {
payload["end_time"] = filter.EndTime.UTC().Format(time.RFC3339Nano)
}
raw, err := json.Marshal(payload)
if err != nil {
return "{}"
}
return string(raw)
}
func (s *OpsService) GetSystemLogSinkHealth() OpsSystemLogSinkHealth {
if s == nil || s.systemLogSink == nil {
return OpsSystemLogSinkHealth{}
}
return s.systemLogSink.Health()
}

View File

@@ -0,0 +1,302 @@
package service
import (
"context"
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/Wei-Shaw/sub2api/internal/util/logredact"
)
type OpsSystemLogSinkHealth struct {
QueueDepth int64 `json:"queue_depth"`
QueueCapacity int64 `json:"queue_capacity"`
DroppedCount uint64 `json:"dropped_count"`
WriteFailed uint64 `json:"write_failed_count"`
WrittenCount uint64 `json:"written_count"`
AvgWriteDelayMs uint64 `json:"avg_write_delay_ms"`
LastError string `json:"last_error"`
}
type OpsSystemLogSink struct {
opsRepo OpsRepository
queue chan *logger.LogEvent
batchSize int
flushInterval time.Duration
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
droppedCount uint64
writeFailed uint64
writtenCount uint64
totalDelayNs uint64
lastError atomic.Value
}
func NewOpsSystemLogSink(opsRepo OpsRepository) *OpsSystemLogSink {
ctx, cancel := context.WithCancel(context.Background())
s := &OpsSystemLogSink{
opsRepo: opsRepo,
queue: make(chan *logger.LogEvent, 5000),
batchSize: 200,
flushInterval: time.Second,
ctx: ctx,
cancel: cancel,
}
s.lastError.Store("")
return s
}
func (s *OpsSystemLogSink) Start() {
if s == nil || s.opsRepo == nil {
return
}
s.wg.Add(1)
go s.run()
}
func (s *OpsSystemLogSink) Stop() {
if s == nil {
return
}
s.cancel()
s.wg.Wait()
}
func (s *OpsSystemLogSink) WriteLogEvent(event *logger.LogEvent) {
if s == nil || event == nil || !s.shouldIndex(event) {
return
}
select {
case s.queue <- event:
default:
atomic.AddUint64(&s.droppedCount, 1)
}
}
func (s *OpsSystemLogSink) shouldIndex(event *logger.LogEvent) bool {
level := strings.ToLower(strings.TrimSpace(event.Level))
switch level {
case "warn", "warning", "error", "fatal", "panic", "dpanic":
return true
}
component := strings.ToLower(strings.TrimSpace(event.Component))
if strings.Contains(component, "http.access") {
return true
}
if strings.Contains(component, "audit") {
return true
}
return false
}
func (s *OpsSystemLogSink) run() {
defer s.wg.Done()
ticker := time.NewTicker(s.flushInterval)
defer ticker.Stop()
batch := make([]*logger.LogEvent, 0, s.batchSize)
flush := func() {
if len(batch) == 0 {
return
}
started := time.Now()
inserted, err := s.flushBatch(batch)
delay := time.Since(started)
if err != nil {
atomic.AddUint64(&s.writeFailed, uint64(len(batch)))
s.lastError.Store(err.Error())
_, _ = fmt.Fprintf(os.Stderr, "time=%s level=WARN msg=\"ops system log sink flush failed\" err=%v batch=%d\n",
time.Now().Format(time.RFC3339Nano), err, len(batch),
)
} else {
atomic.AddUint64(&s.writtenCount, uint64(inserted))
atomic.AddUint64(&s.totalDelayNs, uint64(delay.Nanoseconds()))
s.lastError.Store("")
}
batch = batch[:0]
}
for {
select {
case <-s.ctx.Done():
flush()
return
case item := <-s.queue:
if item == nil {
continue
}
batch = append(batch, item)
if len(batch) >= s.batchSize {
flush()
}
case <-ticker.C:
flush()
}
}
}
func (s *OpsSystemLogSink) flushBatch(batch []*logger.LogEvent) (int, error) {
inputs := make([]*OpsInsertSystemLogInput, 0, len(batch))
for _, event := range batch {
if event == nil {
continue
}
createdAt := event.Time.UTC()
if createdAt.IsZero() {
createdAt = time.Now().UTC()
}
fields := copyMap(event.Fields)
requestID := asString(fields["request_id"])
clientRequestID := asString(fields["client_request_id"])
platform := asString(fields["platform"])
model := asString(fields["model"])
component := strings.TrimSpace(event.Component)
if fieldComponent := asString(fields["component"]); fieldComponent != "" {
component = fieldComponent
}
if component == "" {
component = "app"
}
userID := asInt64Ptr(fields["user_id"])
accountID := asInt64Ptr(fields["account_id"])
// 统一脱敏后写入索引。
message := logredact.RedactText(strings.TrimSpace(event.Message))
redactedExtra := logredact.RedactMap(fields)
extraJSONBytes, _ := json.Marshal(redactedExtra)
extraJSON := string(extraJSONBytes)
if strings.TrimSpace(extraJSON) == "" {
extraJSON = "{}"
}
inputs = append(inputs, &OpsInsertSystemLogInput{
CreatedAt: createdAt,
Level: strings.ToLower(strings.TrimSpace(event.Level)),
Component: component,
Message: message,
RequestID: requestID,
ClientRequestID: clientRequestID,
UserID: userID,
AccountID: accountID,
Platform: platform,
Model: model,
ExtraJSON: extraJSON,
})
}
if len(inputs) == 0 {
return 0, nil
}
ctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
defer cancel()
inserted, err := s.opsRepo.BatchInsertSystemLogs(ctx, inputs)
if err != nil {
return 0, err
}
return int(inserted), nil
}
func (s *OpsSystemLogSink) Health() OpsSystemLogSinkHealth {
if s == nil {
return OpsSystemLogSinkHealth{}
}
written := atomic.LoadUint64(&s.writtenCount)
totalDelay := atomic.LoadUint64(&s.totalDelayNs)
var avgDelay uint64
if written > 0 {
avgDelay = (totalDelay / written) / uint64(time.Millisecond)
}
lastErr, _ := s.lastError.Load().(string)
return OpsSystemLogSinkHealth{
QueueDepth: int64(len(s.queue)),
QueueCapacity: int64(cap(s.queue)),
DroppedCount: atomic.LoadUint64(&s.droppedCount),
WriteFailed: atomic.LoadUint64(&s.writeFailed),
WrittenCount: written,
AvgWriteDelayMs: avgDelay,
LastError: strings.TrimSpace(lastErr),
}
}
func copyMap(in map[string]any) map[string]any {
if len(in) == 0 {
return map[string]any{}
}
out := make(map[string]any, len(in))
for k, v := range in {
out[k] = v
}
return out
}
func asString(v any) string {
switch t := v.(type) {
case string:
return strings.TrimSpace(t)
case fmt.Stringer:
return strings.TrimSpace(t.String())
default:
return ""
}
}
func asInt64Ptr(v any) *int64 {
switch t := v.(type) {
case int:
n := int64(t)
if n <= 0 {
return nil
}
return &n
case int64:
n := t
if n <= 0 {
return nil
}
return &n
case float64:
n := int64(t)
if n <= 0 {
return nil
}
return &n
case json.Number:
if n, err := t.Int64(); err == nil {
if n <= 0 {
return nil
}
return &n
}
case string:
raw := strings.TrimSpace(t)
if raw == "" {
return nil
}
if n, err := strconv.ParseInt(raw, 10, 64); err == nil {
if n <= 0 {
return nil
}
return &n
}
}
return nil
}

View File

@@ -0,0 +1,254 @@
package service
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync/atomic"
"testing"
"time"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
)
func TestOpsSystemLogSink_ShouldIndex(t *testing.T) {
sink := &OpsSystemLogSink{}
cases := []struct {
name string
event *logger.LogEvent
want bool
}{
{
name: "warn level",
event: &logger.LogEvent{Level: "warn", Component: "app"},
want: true,
},
{
name: "error level",
event: &logger.LogEvent{Level: "error", Component: "app"},
want: true,
},
{
name: "access component",
event: &logger.LogEvent{Level: "info", Component: "http.access"},
want: true,
},
{
name: "audit component",
event: &logger.LogEvent{Level: "info", Component: "audit.log_config_change"},
want: true,
},
{
name: "plain info",
event: &logger.LogEvent{Level: "info", Component: "app"},
want: false,
},
}
for _, tc := range cases {
if got := sink.shouldIndex(tc.event); got != tc.want {
t.Fatalf("%s: shouldIndex()=%v, want %v", tc.name, got, tc.want)
}
}
}
func TestOpsSystemLogSink_WriteLogEvent_ShouldDropWhenQueueFull(t *testing.T) {
sink := &OpsSystemLogSink{
queue: make(chan *logger.LogEvent, 1),
}
sink.WriteLogEvent(&logger.LogEvent{Level: "warn", Component: "app"})
sink.WriteLogEvent(&logger.LogEvent{Level: "warn", Component: "app"})
if got := len(sink.queue); got != 1 {
t.Fatalf("queue len = %d, want 1", got)
}
if dropped := atomic.LoadUint64(&sink.droppedCount); dropped != 1 {
t.Fatalf("droppedCount = %d, want 1", dropped)
}
}
func TestOpsSystemLogSink_Health(t *testing.T) {
sink := &OpsSystemLogSink{
queue: make(chan *logger.LogEvent, 10),
}
sink.lastError.Store("db timeout")
atomic.StoreUint64(&sink.droppedCount, 3)
atomic.StoreUint64(&sink.writeFailed, 2)
atomic.StoreUint64(&sink.writtenCount, 5)
atomic.StoreUint64(&sink.totalDelayNs, uint64(5000000)) // 5ms total -> avg 1ms
sink.queue <- &logger.LogEvent{Level: "warn", Component: "app"}
sink.queue <- &logger.LogEvent{Level: "warn", Component: "app"}
health := sink.Health()
if health.QueueDepth != 2 {
t.Fatalf("queue depth = %d, want 2", health.QueueDepth)
}
if health.QueueCapacity != 10 {
t.Fatalf("queue capacity = %d, want 10", health.QueueCapacity)
}
if health.DroppedCount != 3 {
t.Fatalf("dropped = %d, want 3", health.DroppedCount)
}
if health.WriteFailed != 2 {
t.Fatalf("write failed = %d, want 2", health.WriteFailed)
}
if health.WrittenCount != 5 {
t.Fatalf("written = %d, want 5", health.WrittenCount)
}
if health.AvgWriteDelayMs != 1 {
t.Fatalf("avg delay ms = %d, want 1", health.AvgWriteDelayMs)
}
if health.LastError != "db timeout" {
t.Fatalf("last error = %q, want db timeout", health.LastError)
}
}
func TestOpsSystemLogSink_StartStopAndFlushSuccess(t *testing.T) {
done := make(chan struct{}, 1)
var captured []*OpsInsertSystemLogInput
repo := &opsRepoMock{
BatchInsertSystemLogsFn: func(_ context.Context, inputs []*OpsInsertSystemLogInput) (int64, error) {
captured = append(captured, inputs...)
select {
case done <- struct{}{}:
default:
}
return int64(len(inputs)), nil
},
}
sink := NewOpsSystemLogSink(repo)
sink.batchSize = 1
sink.flushInterval = 10 * time.Millisecond
sink.Start()
defer sink.Stop()
sink.WriteLogEvent(&logger.LogEvent{
Time: time.Now().UTC(),
Level: "warn",
Component: "http.access",
Message: `authorization="Bearer sk-test-123"`,
Fields: map[string]any{
"component": "http.access",
"request_id": "req-1",
"client_request_id": "creq-1",
"user_id": "12",
"account_id": json.Number("34"),
"platform": "openai",
"model": "gpt-5",
},
})
select {
case <-done:
case <-time.After(2 * time.Second):
t.Fatalf("timeout waiting for sink flush")
}
if len(captured) != 1 {
t.Fatalf("captured len = %d, want 1", len(captured))
}
item := captured[0]
if item.RequestID != "req-1" || item.ClientRequestID != "creq-1" {
t.Fatalf("unexpected request ids: %+v", item)
}
if item.UserID == nil || *item.UserID != 12 {
t.Fatalf("unexpected user_id: %+v", item.UserID)
}
if item.AccountID == nil || *item.AccountID != 34 {
t.Fatalf("unexpected account_id: %+v", item.AccountID)
}
if strings.TrimSpace(item.Message) == "" {
t.Fatalf("message should not be empty")
}
health := sink.Health()
if health.WrittenCount == 0 {
t.Fatalf("written_count should be >0")
}
}
func TestOpsSystemLogSink_FlushFailureUpdatesHealth(t *testing.T) {
repo := &opsRepoMock{
BatchInsertSystemLogsFn: func(_ context.Context, inputs []*OpsInsertSystemLogInput) (int64, error) {
return 0, errors.New("db unavailable")
},
}
sink := NewOpsSystemLogSink(repo)
sink.batchSize = 1
sink.flushInterval = 10 * time.Millisecond
sink.Start()
defer sink.Stop()
sink.WriteLogEvent(&logger.LogEvent{
Time: time.Now().UTC(),
Level: "warn",
Component: "app",
Message: "boom",
Fields: map[string]any{},
})
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
health := sink.Health()
if health.WriteFailed > 0 {
if !strings.Contains(health.LastError, "db unavailable") {
t.Fatalf("unexpected last error: %s", health.LastError)
}
return
}
time.Sleep(20 * time.Millisecond)
}
t.Fatalf("write_failed_count not updated")
}
type stringerValue string
func (s stringerValue) String() string { return string(s) }
func TestOpsSystemLogSink_HelperFunctions(t *testing.T) {
src := map[string]any{"a": 1}
cloned := copyMap(src)
src["a"] = 2
v, ok := cloned["a"].(int)
if !ok || v != 1 {
t.Fatalf("copyMap should create copy")
}
if got := asString(stringerValue(" hello ")); got != "hello" {
t.Fatalf("asString stringer = %q", got)
}
if got := asString(fmt.Errorf("x")); got != "" {
t.Fatalf("asString error should be empty, got %q", got)
}
if got := asString(123); got != "" {
t.Fatalf("asString non-string should be empty, got %q", got)
}
cases := []struct {
in any
want int64
ok bool
}{
{in: 5, want: 5, ok: true},
{in: int64(6), want: 6, ok: true},
{in: float64(7), want: 7, ok: true},
{in: json.Number("8"), want: 8, ok: true},
{in: "9", want: 9, ok: true},
{in: "0", ok: false},
{in: -1, ok: false},
{in: "abc", ok: false},
}
for _, tc := range cases {
got := asInt64Ptr(tc.in)
if tc.ok {
if got == nil || *got != tc.want {
t.Fatalf("asInt64Ptr(%v) = %+v, want %d", tc.in, got, tc.want)
}
} else if got != nil {
t.Fatalf("asInt64Ptr(%v) should be nil, got %d", tc.in, *got)
}
}
}

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/Wei-Shaw/sub2api/internal/config"
"github.com/Wei-Shaw/sub2api/internal/pkg/logger"
"github.com/google/wire"
"github.com/redis/go-redis/v9"
)
@@ -193,6 +194,13 @@ func ProvideOpsCleanupService(
return svc
}
func ProvideOpsSystemLogSink(opsRepo OpsRepository) *OpsSystemLogSink {
sink := NewOpsSystemLogSink(opsRepo)
sink.Start()
logger.SetSink(sink)
return sink
}
// ProvideSoraMediaStorage 初始化 Sora 媒体存储
func ProvideSoraMediaStorage(cfg *config.Config) *SoraMediaStorage {
return NewSoraMediaStorage(cfg)
@@ -268,6 +276,7 @@ var ProviderSet = wire.NewSet(
NewAccountUsageService,
NewAccountTestService,
NewSettingService,
ProvideOpsSystemLogSink,
NewOpsService,
ProvideOpsMetricsCollector,
ProvideOpsAggregationService,