Files
sub2api/backend/internal/pkg/logger/logger.go
shaw 980fc9608f fix: 修复日志重复输出及清理冗余迁移逻辑
- logger: sinkCore 包装 tee core 时绕过了子 core 的 Check 级别过滤,
  导致每条日志同时写入 stdout 和 stderr,表现为启动日志重复显示。
  修复为正确委托 Check 给内部 tee core,sinkCore.Write 仅负责 sink 转发。
- migration 054: 移除冗余的遗留列回填逻辑,migration 009 已完成数据迁移,
  直接删除遗留列即可。
2026-02-24 11:31:19 +08:00

520 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package logger
import (
"context"
"fmt"
"io"
"log"
"log/slog"
"os"
"path/filepath"
"strings"
"sync"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
)
type Level = zapcore.Level
const (
LevelDebug = zapcore.DebugLevel
LevelInfo = zapcore.InfoLevel
LevelWarn = zapcore.WarnLevel
LevelError = zapcore.ErrorLevel
LevelFatal = zapcore.FatalLevel
)
type Sink interface {
WriteLogEvent(event *LogEvent)
}
type LogEvent struct {
Time time.Time
Level string
Component string
Message string
LoggerName string
Fields map[string]any
}
var (
mu sync.RWMutex
global *zap.Logger
sugar *zap.SugaredLogger
atomicLevel zap.AtomicLevel
initOptions InitOptions
currentSink Sink
stdLogUndo func()
bootstrapOnce sync.Once
)
func InitBootstrap() {
bootstrapOnce.Do(func() {
if err := Init(bootstrapOptions()); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "logger bootstrap init failed: %v\n", err)
}
})
}
func Init(options InitOptions) error {
mu.Lock()
defer mu.Unlock()
return initLocked(options)
}
func initLocked(options InitOptions) error {
normalized := options.normalized()
zl, al, err := buildLogger(normalized)
if err != nil {
return err
}
prev := global
global = zl
sugar = zl.Sugar()
atomicLevel = al
initOptions = normalized
bridgeSlogLocked()
bridgeStdLogLocked()
if prev != nil {
_ = prev.Sync()
}
return nil
}
func Reconfigure(mutator func(*InitOptions) error) error {
mu.Lock()
defer mu.Unlock()
next := initOptions
if mutator != nil {
if err := mutator(&next); err != nil {
return err
}
}
return initLocked(next)
}
func SetLevel(level string) error {
lv, ok := parseLevel(level)
if !ok {
return fmt.Errorf("invalid log level: %s", level)
}
mu.Lock()
defer mu.Unlock()
atomicLevel.SetLevel(lv)
initOptions.Level = strings.ToLower(strings.TrimSpace(level))
return nil
}
func CurrentLevel() string {
mu.RLock()
defer mu.RUnlock()
if global == nil {
return "info"
}
return atomicLevel.Level().String()
}
func SetSink(sink Sink) {
mu.Lock()
defer mu.Unlock()
currentSink = sink
}
// WriteSinkEvent 直接写入日志 sink不经过全局日志级别门控。
// 用于需要“可观测性入库”与“业务输出级别”解耦的场景(例如 ops 系统日志索引)。
func WriteSinkEvent(level, component, message string, fields map[string]any) {
mu.RLock()
sink := currentSink
mu.RUnlock()
if sink == nil {
return
}
level = strings.ToLower(strings.TrimSpace(level))
if level == "" {
level = "info"
}
component = strings.TrimSpace(component)
message = strings.TrimSpace(message)
if message == "" {
return
}
eventFields := make(map[string]any, len(fields)+1)
for k, v := range fields {
eventFields[k] = v
}
if component != "" {
if _, ok := eventFields["component"]; !ok {
eventFields["component"] = component
}
}
sink.WriteLogEvent(&LogEvent{
Time: time.Now(),
Level: level,
Component: component,
Message: message,
LoggerName: component,
Fields: eventFields,
})
}
func L() *zap.Logger {
mu.RLock()
defer mu.RUnlock()
if global != nil {
return global
}
return zap.NewNop()
}
func S() *zap.SugaredLogger {
mu.RLock()
defer mu.RUnlock()
if sugar != nil {
return sugar
}
return zap.NewNop().Sugar()
}
func With(fields ...zap.Field) *zap.Logger {
return L().With(fields...)
}
func Sync() {
mu.RLock()
l := global
mu.RUnlock()
if l != nil {
_ = l.Sync()
}
}
func bridgeStdLogLocked() {
if stdLogUndo != nil {
stdLogUndo()
stdLogUndo = nil
}
prevFlags := log.Flags()
prevPrefix := log.Prefix()
prevWriter := log.Writer()
log.SetFlags(0)
log.SetPrefix("")
log.SetOutput(newStdLogBridge(global.Named("stdlog")))
stdLogUndo = func() {
log.SetOutput(prevWriter)
log.SetFlags(prevFlags)
log.SetPrefix(prevPrefix)
}
}
func bridgeSlogLocked() {
slog.SetDefault(slog.New(newSlogZapHandler(global.Named("slog"))))
}
func buildLogger(options InitOptions) (*zap.Logger, zap.AtomicLevel, error) {
level, _ := parseLevel(options.Level)
atomic := zap.NewAtomicLevelAt(level)
encoderCfg := zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "level",
NameKey: "logger",
CallerKey: "caller",
MessageKey: "msg",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: zapcore.CapitalLevelEncoder,
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.MillisDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
}
var enc zapcore.Encoder
if options.Format == "console" {
enc = zapcore.NewConsoleEncoder(encoderCfg)
} else {
enc = zapcore.NewJSONEncoder(encoderCfg)
}
sinkCore := newSinkCore()
cores := make([]zapcore.Core, 0, 3)
if options.Output.ToStdout {
infoPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
return lvl >= atomic.Level() && lvl < zapcore.WarnLevel
})
errPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
return lvl >= atomic.Level() && lvl >= zapcore.WarnLevel
})
cores = append(cores, zapcore.NewCore(enc, zapcore.Lock(os.Stdout), infoPriority))
cores = append(cores, zapcore.NewCore(enc, zapcore.Lock(os.Stderr), errPriority))
}
if options.Output.ToFile {
fileCore, filePath, fileErr := buildFileCore(enc, atomic, options)
if fileErr != nil {
_, _ = fmt.Fprintf(os.Stderr, "time=%s level=WARN msg=\"日志文件输出初始化失败,降级为仅标准输出\" path=%s err=%v\n",
time.Now().Format(time.RFC3339Nano),
filePath,
fileErr,
)
} else {
cores = append(cores, fileCore)
}
}
if len(cores) == 0 {
cores = append(cores, zapcore.NewCore(enc, zapcore.Lock(os.Stdout), atomic))
}
core := zapcore.NewTee(cores...)
if options.Sampling.Enabled {
core = zapcore.NewSamplerWithOptions(core, samplingTick(), options.Sampling.Initial, options.Sampling.Thereafter)
}
core = sinkCore.Wrap(core)
stacktraceLevel, _ := parseStacktraceLevel(options.StacktraceLevel)
zapOpts := make([]zap.Option, 0, 5)
if options.Caller {
zapOpts = append(zapOpts, zap.AddCaller())
}
if stacktraceLevel <= zapcore.FatalLevel {
zapOpts = append(zapOpts, zap.AddStacktrace(stacktraceLevel))
}
logger := zap.New(core, zapOpts...).With(
zap.String("service", options.ServiceName),
zap.String("env", options.Environment),
)
return logger, atomic, nil
}
func buildFileCore(enc zapcore.Encoder, atomic zap.AtomicLevel, options InitOptions) (zapcore.Core, string, error) {
filePath := options.Output.FilePath
if strings.TrimSpace(filePath) == "" {
filePath = resolveLogFilePath("")
}
dir := filepath.Dir(filePath)
if err := os.MkdirAll(dir, 0o755); err != nil {
return nil, filePath, err
}
lj := &lumberjack.Logger{
Filename: filePath,
MaxSize: options.Rotation.MaxSizeMB,
MaxBackups: options.Rotation.MaxBackups,
MaxAge: options.Rotation.MaxAgeDays,
Compress: options.Rotation.Compress,
LocalTime: options.Rotation.LocalTime,
}
return zapcore.NewCore(enc, zapcore.AddSync(lj), atomic), filePath, nil
}
type sinkCore struct {
core zapcore.Core
fields []zapcore.Field
}
func newSinkCore() *sinkCore {
return &sinkCore{}
}
func (s *sinkCore) Wrap(core zapcore.Core) zapcore.Core {
cp := *s
cp.core = core
return &cp
}
func (s *sinkCore) Enabled(level zapcore.Level) bool {
return s.core.Enabled(level)
}
func (s *sinkCore) With(fields []zapcore.Field) zapcore.Core {
nextFields := append([]zapcore.Field{}, s.fields...)
nextFields = append(nextFields, fields...)
return &sinkCore{
core: s.core.With(fields),
fields: nextFields,
}
}
func (s *sinkCore) Check(entry zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
// Delegate to inner core (tee) so each sub-core's level enabler is respected.
// Then add ourselves for sink forwarding only.
ce = s.core.Check(entry, ce)
if ce != nil {
ce = ce.AddCore(entry, s)
}
return ce
}
func (s *sinkCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
// Only handle sink forwarding — the inner cores write via their own
// Write methods (added to CheckedEntry by s.core.Check above).
mu.RLock()
sink := currentSink
mu.RUnlock()
if sink == nil {
return nil
}
enc := zapcore.NewMapObjectEncoder()
for _, f := range s.fields {
f.AddTo(enc)
}
for _, f := range fields {
f.AddTo(enc)
}
event := &LogEvent{
Time: entry.Time,
Level: strings.ToLower(entry.Level.String()),
Component: entry.LoggerName,
Message: entry.Message,
LoggerName: entry.LoggerName,
Fields: enc.Fields,
}
sink.WriteLogEvent(event)
return nil
}
func (s *sinkCore) Sync() error {
return s.core.Sync()
}
type stdLogBridge struct {
logger *zap.Logger
}
func newStdLogBridge(l *zap.Logger) io.Writer {
if l == nil {
l = zap.NewNop()
}
return &stdLogBridge{logger: l}
}
func (b *stdLogBridge) Write(p []byte) (int, error) {
msg := normalizeStdLogMessage(string(p))
if msg == "" {
return len(p), nil
}
level := inferStdLogLevel(msg)
entry := b.logger.WithOptions(zap.AddCallerSkip(4))
switch level {
case LevelDebug:
entry.Debug(msg, zap.Bool("legacy_stdlog", true))
case LevelWarn:
entry.Warn(msg, zap.Bool("legacy_stdlog", true))
case LevelError, LevelFatal:
entry.Error(msg, zap.Bool("legacy_stdlog", true))
default:
entry.Info(msg, zap.Bool("legacy_stdlog", true))
}
return len(p), nil
}
func normalizeStdLogMessage(raw string) string {
msg := strings.TrimSpace(strings.ReplaceAll(raw, "\n", " "))
if msg == "" {
return ""
}
return strings.Join(strings.Fields(msg), " ")
}
func inferStdLogLevel(msg string) Level {
lower := strings.ToLower(strings.TrimSpace(msg))
if lower == "" {
return LevelInfo
}
if strings.HasPrefix(lower, "[debug]") || strings.HasPrefix(lower, "debug:") {
return LevelDebug
}
if strings.HasPrefix(lower, "[warn]") || strings.HasPrefix(lower, "[warning]") || strings.HasPrefix(lower, "warn:") || strings.HasPrefix(lower, "warning:") {
return LevelWarn
}
if strings.HasPrefix(lower, "[error]") || strings.HasPrefix(lower, "error:") || strings.HasPrefix(lower, "fatal:") || strings.HasPrefix(lower, "panic:") {
return LevelError
}
if strings.Contains(lower, " failed") || strings.Contains(lower, "error") || strings.Contains(lower, "panic") || strings.Contains(lower, "fatal") {
return LevelError
}
if strings.Contains(lower, "warning") || strings.Contains(lower, "warn") || strings.Contains(lower, " retry") || strings.Contains(lower, " queue full") || strings.Contains(lower, "fallback") {
return LevelWarn
}
return LevelInfo
}
// LegacyPrintf 用于平滑迁移历史的 printf 风格日志到结构化 logger。
func LegacyPrintf(component, format string, args ...any) {
msg := normalizeStdLogMessage(fmt.Sprintf(format, args...))
if msg == "" {
return
}
mu.RLock()
initialized := global != nil
mu.RUnlock()
if !initialized {
// 在日志系统未初始化前,回退到标准库 log避免测试/工具链丢日志。
log.Print(msg)
return
}
l := L()
if component != "" {
l = l.With(zap.String("component", component))
}
l = l.WithOptions(zap.AddCallerSkip(1))
switch inferStdLogLevel(msg) {
case LevelDebug:
l.Debug(msg, zap.Bool("legacy_printf", true))
case LevelWarn:
l.Warn(msg, zap.Bool("legacy_printf", true))
case LevelError, LevelFatal:
l.Error(msg, zap.Bool("legacy_printf", true))
default:
l.Info(msg, zap.Bool("legacy_printf", true))
}
}
type contextKey string
const loggerContextKey contextKey = "ctx_logger"
func IntoContext(ctx context.Context, l *zap.Logger) context.Context {
if ctx == nil {
ctx = context.Background()
}
if l == nil {
l = L()
}
return context.WithValue(ctx, loggerContextKey, l)
}
func FromContext(ctx context.Context) *zap.Logger {
if ctx == nil {
return L()
}
if l, ok := ctx.Value(loggerContextKey).(*zap.Logger); ok && l != nil {
return l
}
return L()
}