- logger: sinkCore 包装 tee core 时绕过了子 core 的 Check 级别过滤, 导致每条日志同时写入 stdout 和 stderr,表现为启动日志重复显示。 修复为正确委托 Check 给内部 tee core,sinkCore.Write 仅负责 sink 转发。 - migration 054: 移除冗余的遗留列回填逻辑,migration 009 已完成数据迁移, 直接删除遗留列即可。
520 lines
12 KiB
Go
520 lines
12 KiB
Go
package logger
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"log/slog"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"go.uber.org/zap"
|
||
"go.uber.org/zap/zapcore"
|
||
"gopkg.in/natefinch/lumberjack.v2"
|
||
)
|
||
|
||
type Level = zapcore.Level
|
||
|
||
const (
|
||
LevelDebug = zapcore.DebugLevel
|
||
LevelInfo = zapcore.InfoLevel
|
||
LevelWarn = zapcore.WarnLevel
|
||
LevelError = zapcore.ErrorLevel
|
||
LevelFatal = zapcore.FatalLevel
|
||
)
|
||
|
||
type Sink interface {
|
||
WriteLogEvent(event *LogEvent)
|
||
}
|
||
|
||
type LogEvent struct {
|
||
Time time.Time
|
||
Level string
|
||
Component string
|
||
Message string
|
||
LoggerName string
|
||
Fields map[string]any
|
||
}
|
||
|
||
var (
|
||
mu sync.RWMutex
|
||
global *zap.Logger
|
||
sugar *zap.SugaredLogger
|
||
atomicLevel zap.AtomicLevel
|
||
initOptions InitOptions
|
||
currentSink Sink
|
||
stdLogUndo func()
|
||
bootstrapOnce sync.Once
|
||
)
|
||
|
||
func InitBootstrap() {
|
||
bootstrapOnce.Do(func() {
|
||
if err := Init(bootstrapOptions()); err != nil {
|
||
_, _ = fmt.Fprintf(os.Stderr, "logger bootstrap init failed: %v\n", err)
|
||
}
|
||
})
|
||
}
|
||
|
||
func Init(options InitOptions) error {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
return initLocked(options)
|
||
}
|
||
|
||
func initLocked(options InitOptions) error {
|
||
normalized := options.normalized()
|
||
zl, al, err := buildLogger(normalized)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
prev := global
|
||
global = zl
|
||
sugar = zl.Sugar()
|
||
atomicLevel = al
|
||
initOptions = normalized
|
||
|
||
bridgeSlogLocked()
|
||
bridgeStdLogLocked()
|
||
|
||
if prev != nil {
|
||
_ = prev.Sync()
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func Reconfigure(mutator func(*InitOptions) error) error {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
next := initOptions
|
||
if mutator != nil {
|
||
if err := mutator(&next); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return initLocked(next)
|
||
}
|
||
|
||
func SetLevel(level string) error {
|
||
lv, ok := parseLevel(level)
|
||
if !ok {
|
||
return fmt.Errorf("invalid log level: %s", level)
|
||
}
|
||
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
atomicLevel.SetLevel(lv)
|
||
initOptions.Level = strings.ToLower(strings.TrimSpace(level))
|
||
return nil
|
||
}
|
||
|
||
func CurrentLevel() string {
|
||
mu.RLock()
|
||
defer mu.RUnlock()
|
||
if global == nil {
|
||
return "info"
|
||
}
|
||
return atomicLevel.Level().String()
|
||
}
|
||
|
||
func SetSink(sink Sink) {
|
||
mu.Lock()
|
||
defer mu.Unlock()
|
||
currentSink = sink
|
||
}
|
||
|
||
// WriteSinkEvent 直接写入日志 sink,不经过全局日志级别门控。
|
||
// 用于需要“可观测性入库”与“业务输出级别”解耦的场景(例如 ops 系统日志索引)。
|
||
func WriteSinkEvent(level, component, message string, fields map[string]any) {
|
||
mu.RLock()
|
||
sink := currentSink
|
||
mu.RUnlock()
|
||
if sink == nil {
|
||
return
|
||
}
|
||
|
||
level = strings.ToLower(strings.TrimSpace(level))
|
||
if level == "" {
|
||
level = "info"
|
||
}
|
||
component = strings.TrimSpace(component)
|
||
message = strings.TrimSpace(message)
|
||
if message == "" {
|
||
return
|
||
}
|
||
|
||
eventFields := make(map[string]any, len(fields)+1)
|
||
for k, v := range fields {
|
||
eventFields[k] = v
|
||
}
|
||
if component != "" {
|
||
if _, ok := eventFields["component"]; !ok {
|
||
eventFields["component"] = component
|
||
}
|
||
}
|
||
|
||
sink.WriteLogEvent(&LogEvent{
|
||
Time: time.Now(),
|
||
Level: level,
|
||
Component: component,
|
||
Message: message,
|
||
LoggerName: component,
|
||
Fields: eventFields,
|
||
})
|
||
}
|
||
|
||
func L() *zap.Logger {
|
||
mu.RLock()
|
||
defer mu.RUnlock()
|
||
if global != nil {
|
||
return global
|
||
}
|
||
return zap.NewNop()
|
||
}
|
||
|
||
func S() *zap.SugaredLogger {
|
||
mu.RLock()
|
||
defer mu.RUnlock()
|
||
if sugar != nil {
|
||
return sugar
|
||
}
|
||
return zap.NewNop().Sugar()
|
||
}
|
||
|
||
func With(fields ...zap.Field) *zap.Logger {
|
||
return L().With(fields...)
|
||
}
|
||
|
||
func Sync() {
|
||
mu.RLock()
|
||
l := global
|
||
mu.RUnlock()
|
||
if l != nil {
|
||
_ = l.Sync()
|
||
}
|
||
}
|
||
|
||
func bridgeStdLogLocked() {
|
||
if stdLogUndo != nil {
|
||
stdLogUndo()
|
||
stdLogUndo = nil
|
||
}
|
||
|
||
prevFlags := log.Flags()
|
||
prevPrefix := log.Prefix()
|
||
prevWriter := log.Writer()
|
||
|
||
log.SetFlags(0)
|
||
log.SetPrefix("")
|
||
log.SetOutput(newStdLogBridge(global.Named("stdlog")))
|
||
|
||
stdLogUndo = func() {
|
||
log.SetOutput(prevWriter)
|
||
log.SetFlags(prevFlags)
|
||
log.SetPrefix(prevPrefix)
|
||
}
|
||
}
|
||
|
||
func bridgeSlogLocked() {
|
||
slog.SetDefault(slog.New(newSlogZapHandler(global.Named("slog"))))
|
||
}
|
||
|
||
func buildLogger(options InitOptions) (*zap.Logger, zap.AtomicLevel, error) {
|
||
level, _ := parseLevel(options.Level)
|
||
atomic := zap.NewAtomicLevelAt(level)
|
||
|
||
encoderCfg := zapcore.EncoderConfig{
|
||
TimeKey: "time",
|
||
LevelKey: "level",
|
||
NameKey: "logger",
|
||
CallerKey: "caller",
|
||
MessageKey: "msg",
|
||
StacktraceKey: "stacktrace",
|
||
LineEnding: zapcore.DefaultLineEnding,
|
||
EncodeLevel: zapcore.CapitalLevelEncoder,
|
||
EncodeTime: zapcore.ISO8601TimeEncoder,
|
||
EncodeDuration: zapcore.MillisDurationEncoder,
|
||
EncodeCaller: zapcore.ShortCallerEncoder,
|
||
}
|
||
|
||
var enc zapcore.Encoder
|
||
if options.Format == "console" {
|
||
enc = zapcore.NewConsoleEncoder(encoderCfg)
|
||
} else {
|
||
enc = zapcore.NewJSONEncoder(encoderCfg)
|
||
}
|
||
|
||
sinkCore := newSinkCore()
|
||
cores := make([]zapcore.Core, 0, 3)
|
||
|
||
if options.Output.ToStdout {
|
||
infoPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
|
||
return lvl >= atomic.Level() && lvl < zapcore.WarnLevel
|
||
})
|
||
errPriority := zap.LevelEnablerFunc(func(lvl zapcore.Level) bool {
|
||
return lvl >= atomic.Level() && lvl >= zapcore.WarnLevel
|
||
})
|
||
cores = append(cores, zapcore.NewCore(enc, zapcore.Lock(os.Stdout), infoPriority))
|
||
cores = append(cores, zapcore.NewCore(enc, zapcore.Lock(os.Stderr), errPriority))
|
||
}
|
||
|
||
if options.Output.ToFile {
|
||
fileCore, filePath, fileErr := buildFileCore(enc, atomic, options)
|
||
if fileErr != nil {
|
||
_, _ = fmt.Fprintf(os.Stderr, "time=%s level=WARN msg=\"日志文件输出初始化失败,降级为仅标准输出\" path=%s err=%v\n",
|
||
time.Now().Format(time.RFC3339Nano),
|
||
filePath,
|
||
fileErr,
|
||
)
|
||
} else {
|
||
cores = append(cores, fileCore)
|
||
}
|
||
}
|
||
|
||
if len(cores) == 0 {
|
||
cores = append(cores, zapcore.NewCore(enc, zapcore.Lock(os.Stdout), atomic))
|
||
}
|
||
|
||
core := zapcore.NewTee(cores...)
|
||
if options.Sampling.Enabled {
|
||
core = zapcore.NewSamplerWithOptions(core, samplingTick(), options.Sampling.Initial, options.Sampling.Thereafter)
|
||
}
|
||
core = sinkCore.Wrap(core)
|
||
|
||
stacktraceLevel, _ := parseStacktraceLevel(options.StacktraceLevel)
|
||
zapOpts := make([]zap.Option, 0, 5)
|
||
if options.Caller {
|
||
zapOpts = append(zapOpts, zap.AddCaller())
|
||
}
|
||
if stacktraceLevel <= zapcore.FatalLevel {
|
||
zapOpts = append(zapOpts, zap.AddStacktrace(stacktraceLevel))
|
||
}
|
||
|
||
logger := zap.New(core, zapOpts...).With(
|
||
zap.String("service", options.ServiceName),
|
||
zap.String("env", options.Environment),
|
||
)
|
||
return logger, atomic, nil
|
||
}
|
||
|
||
func buildFileCore(enc zapcore.Encoder, atomic zap.AtomicLevel, options InitOptions) (zapcore.Core, string, error) {
|
||
filePath := options.Output.FilePath
|
||
if strings.TrimSpace(filePath) == "" {
|
||
filePath = resolveLogFilePath("")
|
||
}
|
||
|
||
dir := filepath.Dir(filePath)
|
||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||
return nil, filePath, err
|
||
}
|
||
lj := &lumberjack.Logger{
|
||
Filename: filePath,
|
||
MaxSize: options.Rotation.MaxSizeMB,
|
||
MaxBackups: options.Rotation.MaxBackups,
|
||
MaxAge: options.Rotation.MaxAgeDays,
|
||
Compress: options.Rotation.Compress,
|
||
LocalTime: options.Rotation.LocalTime,
|
||
}
|
||
return zapcore.NewCore(enc, zapcore.AddSync(lj), atomic), filePath, nil
|
||
}
|
||
|
||
type sinkCore struct {
|
||
core zapcore.Core
|
||
fields []zapcore.Field
|
||
}
|
||
|
||
func newSinkCore() *sinkCore {
|
||
return &sinkCore{}
|
||
}
|
||
|
||
func (s *sinkCore) Wrap(core zapcore.Core) zapcore.Core {
|
||
cp := *s
|
||
cp.core = core
|
||
return &cp
|
||
}
|
||
|
||
func (s *sinkCore) Enabled(level zapcore.Level) bool {
|
||
return s.core.Enabled(level)
|
||
}
|
||
|
||
func (s *sinkCore) With(fields []zapcore.Field) zapcore.Core {
|
||
nextFields := append([]zapcore.Field{}, s.fields...)
|
||
nextFields = append(nextFields, fields...)
|
||
return &sinkCore{
|
||
core: s.core.With(fields),
|
||
fields: nextFields,
|
||
}
|
||
}
|
||
|
||
func (s *sinkCore) Check(entry zapcore.Entry, ce *zapcore.CheckedEntry) *zapcore.CheckedEntry {
|
||
// Delegate to inner core (tee) so each sub-core's level enabler is respected.
|
||
// Then add ourselves for sink forwarding only.
|
||
ce = s.core.Check(entry, ce)
|
||
if ce != nil {
|
||
ce = ce.AddCore(entry, s)
|
||
}
|
||
return ce
|
||
}
|
||
|
||
func (s *sinkCore) Write(entry zapcore.Entry, fields []zapcore.Field) error {
|
||
// Only handle sink forwarding — the inner cores write via their own
|
||
// Write methods (added to CheckedEntry by s.core.Check above).
|
||
mu.RLock()
|
||
sink := currentSink
|
||
mu.RUnlock()
|
||
if sink == nil {
|
||
return nil
|
||
}
|
||
|
||
enc := zapcore.NewMapObjectEncoder()
|
||
for _, f := range s.fields {
|
||
f.AddTo(enc)
|
||
}
|
||
for _, f := range fields {
|
||
f.AddTo(enc)
|
||
}
|
||
|
||
event := &LogEvent{
|
||
Time: entry.Time,
|
||
Level: strings.ToLower(entry.Level.String()),
|
||
Component: entry.LoggerName,
|
||
Message: entry.Message,
|
||
LoggerName: entry.LoggerName,
|
||
Fields: enc.Fields,
|
||
}
|
||
sink.WriteLogEvent(event)
|
||
return nil
|
||
}
|
||
|
||
func (s *sinkCore) Sync() error {
|
||
return s.core.Sync()
|
||
}
|
||
|
||
type stdLogBridge struct {
|
||
logger *zap.Logger
|
||
}
|
||
|
||
func newStdLogBridge(l *zap.Logger) io.Writer {
|
||
if l == nil {
|
||
l = zap.NewNop()
|
||
}
|
||
return &stdLogBridge{logger: l}
|
||
}
|
||
|
||
func (b *stdLogBridge) Write(p []byte) (int, error) {
|
||
msg := normalizeStdLogMessage(string(p))
|
||
if msg == "" {
|
||
return len(p), nil
|
||
}
|
||
|
||
level := inferStdLogLevel(msg)
|
||
entry := b.logger.WithOptions(zap.AddCallerSkip(4))
|
||
|
||
switch level {
|
||
case LevelDebug:
|
||
entry.Debug(msg, zap.Bool("legacy_stdlog", true))
|
||
case LevelWarn:
|
||
entry.Warn(msg, zap.Bool("legacy_stdlog", true))
|
||
case LevelError, LevelFatal:
|
||
entry.Error(msg, zap.Bool("legacy_stdlog", true))
|
||
default:
|
||
entry.Info(msg, zap.Bool("legacy_stdlog", true))
|
||
}
|
||
return len(p), nil
|
||
}
|
||
|
||
func normalizeStdLogMessage(raw string) string {
|
||
msg := strings.TrimSpace(strings.ReplaceAll(raw, "\n", " "))
|
||
if msg == "" {
|
||
return ""
|
||
}
|
||
return strings.Join(strings.Fields(msg), " ")
|
||
}
|
||
|
||
func inferStdLogLevel(msg string) Level {
|
||
lower := strings.ToLower(strings.TrimSpace(msg))
|
||
if lower == "" {
|
||
return LevelInfo
|
||
}
|
||
|
||
if strings.HasPrefix(lower, "[debug]") || strings.HasPrefix(lower, "debug:") {
|
||
return LevelDebug
|
||
}
|
||
if strings.HasPrefix(lower, "[warn]") || strings.HasPrefix(lower, "[warning]") || strings.HasPrefix(lower, "warn:") || strings.HasPrefix(lower, "warning:") {
|
||
return LevelWarn
|
||
}
|
||
if strings.HasPrefix(lower, "[error]") || strings.HasPrefix(lower, "error:") || strings.HasPrefix(lower, "fatal:") || strings.HasPrefix(lower, "panic:") {
|
||
return LevelError
|
||
}
|
||
|
||
if strings.Contains(lower, " failed") || strings.Contains(lower, "error") || strings.Contains(lower, "panic") || strings.Contains(lower, "fatal") {
|
||
return LevelError
|
||
}
|
||
if strings.Contains(lower, "warning") || strings.Contains(lower, "warn") || strings.Contains(lower, " retry") || strings.Contains(lower, " queue full") || strings.Contains(lower, "fallback") {
|
||
return LevelWarn
|
||
}
|
||
return LevelInfo
|
||
}
|
||
|
||
// LegacyPrintf 用于平滑迁移历史的 printf 风格日志到结构化 logger。
|
||
func LegacyPrintf(component, format string, args ...any) {
|
||
msg := normalizeStdLogMessage(fmt.Sprintf(format, args...))
|
||
if msg == "" {
|
||
return
|
||
}
|
||
|
||
mu.RLock()
|
||
initialized := global != nil
|
||
mu.RUnlock()
|
||
if !initialized {
|
||
// 在日志系统未初始化前,回退到标准库 log,避免测试/工具链丢日志。
|
||
log.Print(msg)
|
||
return
|
||
}
|
||
|
||
l := L()
|
||
if component != "" {
|
||
l = l.With(zap.String("component", component))
|
||
}
|
||
l = l.WithOptions(zap.AddCallerSkip(1))
|
||
|
||
switch inferStdLogLevel(msg) {
|
||
case LevelDebug:
|
||
l.Debug(msg, zap.Bool("legacy_printf", true))
|
||
case LevelWarn:
|
||
l.Warn(msg, zap.Bool("legacy_printf", true))
|
||
case LevelError, LevelFatal:
|
||
l.Error(msg, zap.Bool("legacy_printf", true))
|
||
default:
|
||
l.Info(msg, zap.Bool("legacy_printf", true))
|
||
}
|
||
}
|
||
|
||
type contextKey string
|
||
|
||
const loggerContextKey contextKey = "ctx_logger"
|
||
|
||
func IntoContext(ctx context.Context, l *zap.Logger) context.Context {
|
||
if ctx == nil {
|
||
ctx = context.Background()
|
||
}
|
||
if l == nil {
|
||
l = L()
|
||
}
|
||
return context.WithValue(ctx, loggerContextKey, l)
|
||
}
|
||
|
||
func FromContext(ctx context.Context) *zap.Logger {
|
||
if ctx == nil {
|
||
return L()
|
||
}
|
||
if l, ok := ctx.Value(loggerContextKey).(*zap.Logger); ok && l != nil {
|
||
return l
|
||
}
|
||
return L()
|
||
}
|