2049 lines
67 KiB
Go
2049 lines
67 KiB
Go
package service
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"crypto/sha256"
|
||
"encoding/base64"
|
||
"encoding/binary"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"log/slog"
|
||
"net/http"
|
||
"net/url"
|
||
"sort"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
infraerrors "github.com/Wei-Shaw/sub2api/internal/pkg/errors"
|
||
"github.com/Wei-Shaw/sub2api/internal/pkg/pagination"
|
||
)
|
||
|
||
const (
|
||
ContentModerationModeOff = "off"
|
||
ContentModerationModeObserve = "observe"
|
||
ContentModerationModePreBlock = "pre_block"
|
||
|
||
contentModerationAPIKeysModeAppend = "append"
|
||
contentModerationAPIKeysModeReplace = "replace"
|
||
|
||
ContentModerationActionAllow = "allow"
|
||
ContentModerationActionBlock = "block"
|
||
ContentModerationActionHashBlock = "hash_block"
|
||
ContentModerationActionError = "error"
|
||
|
||
ContentModerationProtocolAnthropicMessages = "anthropic_messages"
|
||
ContentModerationProtocolOpenAIResponses = "openai_responses"
|
||
ContentModerationProtocolOpenAIChat = "openai_chat_completions"
|
||
ContentModerationProtocolGemini = "gemini"
|
||
ContentModerationProtocolOpenAIImages = "openai_images"
|
||
|
||
defaultContentModerationBaseURL = "https://api.openai.com"
|
||
defaultContentModerationModel = "omni-moderation-latest"
|
||
defaultContentModerationTimeoutMS = 3000
|
||
maxContentModerationTimeoutMS = 30000
|
||
maxModerationInputRunes = 12000
|
||
maxModerationExcerptRunes = 240
|
||
|
||
defaultContentModerationWorkerCount = 4
|
||
maxContentModerationWorkerCount = 32
|
||
defaultContentModerationQueueSize = 32768
|
||
maxContentModerationQueueSize = 100000
|
||
defaultContentModerationBanThreshold = 10
|
||
defaultContentModerationViolationWindowHours = 720
|
||
defaultContentModerationBlockHTTPStatus = http.StatusForbidden
|
||
defaultContentModerationBlockMessage = "内容审计命中风险规则,请调整输入后重试"
|
||
defaultContentModerationRetryCount = 2
|
||
maxContentModerationRetryCount = 5
|
||
defaultContentModerationHitRetentionDays = 180
|
||
defaultContentModerationNonHitRetentionDays = 3
|
||
maxContentModerationRetentionDays = 3650
|
||
maxContentModerationNonHitRetentionDays = 3
|
||
contentModerationKeyRateLimitFreezeDuration = time.Minute
|
||
contentModerationKeyAuthFreezeDuration = 10 * time.Minute
|
||
contentModerationKeyHTTPErrorFreezeDuration = 10 * time.Second
|
||
maxContentModerationInputImages = 1
|
||
maxContentModerationTestImages = maxContentModerationInputImages
|
||
maxContentModerationTestImageBytes = 8 * 1024 * 1024
|
||
maxContentModerationTestImageDataURLBytes = 12 * 1024 * 1024
|
||
|
||
contentModerationCleanupInterval = 24 * time.Hour
|
||
contentModerationCleanupTimeout = 30 * time.Minute
|
||
contentModerationCleanupDelay = 5 * time.Minute
|
||
)
|
||
|
||
var contentModerationCategoryOrder = []string{
|
||
"harassment",
|
||
"harassment/threatening",
|
||
"hate",
|
||
"hate/threatening",
|
||
"illicit",
|
||
"illicit/violent",
|
||
"self-harm",
|
||
"self-harm/intent",
|
||
"self-harm/instructions",
|
||
"sexual",
|
||
"sexual/minors",
|
||
"violence",
|
||
"violence/graphic",
|
||
}
|
||
|
||
func ContentModerationDefaultThresholds() map[string]float64 {
|
||
return map[string]float64{
|
||
"harassment": 0.98,
|
||
"harassment/threatening": 0.90,
|
||
"hate": 0.65,
|
||
"hate/threatening": 0.65,
|
||
"illicit": 0.95,
|
||
"illicit/violent": 0.95,
|
||
"self-harm": 0.65,
|
||
"self-harm/intent": 0.85,
|
||
"self-harm/instructions": 0.65,
|
||
"sexual": 0.65,
|
||
"sexual/minors": 0.65,
|
||
"violence": 0.95,
|
||
"violence/graphic": 0.95,
|
||
}
|
||
}
|
||
|
||
func ContentModerationCategories() []string {
|
||
out := make([]string, len(contentModerationCategoryOrder))
|
||
copy(out, contentModerationCategoryOrder)
|
||
return out
|
||
}
|
||
|
||
type ContentModerationConfig struct {
|
||
Enabled bool `json:"enabled"`
|
||
Mode string `json:"mode"`
|
||
BaseURL string `json:"base_url"`
|
||
Model string `json:"model"`
|
||
APIKey string `json:"api_key,omitempty"`
|
||
APIKeys []string `json:"api_keys,omitempty"`
|
||
TimeoutMS int `json:"timeout_ms"`
|
||
SampleRate int `json:"sample_rate"`
|
||
AllGroups bool `json:"all_groups"`
|
||
GroupIDs []int64 `json:"group_ids"`
|
||
RecordNonHits bool `json:"record_non_hits"`
|
||
Thresholds map[string]float64 `json:"thresholds"`
|
||
WorkerCount int `json:"worker_count"`
|
||
QueueSize int `json:"queue_size"`
|
||
BlockStatus int `json:"block_status"`
|
||
BlockMessage string `json:"block_message"`
|
||
EmailOnHit bool `json:"email_on_hit"`
|
||
AutoBanEnabled bool `json:"auto_ban_enabled"`
|
||
BanThreshold int `json:"ban_threshold"`
|
||
ViolationWindowHours int `json:"violation_window_hours"`
|
||
RetryCount int `json:"retry_count"`
|
||
HitRetentionDays int `json:"hit_retention_days"`
|
||
NonHitRetentionDays int `json:"non_hit_retention_days"`
|
||
PreHashCheckEnabled bool `json:"pre_hash_check_enabled"`
|
||
}
|
||
|
||
type ContentModerationConfigView struct {
|
||
Enabled bool `json:"enabled"`
|
||
Mode string `json:"mode"`
|
||
BaseURL string `json:"base_url"`
|
||
Model string `json:"model"`
|
||
APIKeyConfigured bool `json:"api_key_configured"`
|
||
APIKeyMasked string `json:"api_key_masked"`
|
||
APIKeyCount int `json:"api_key_count"`
|
||
APIKeyMasks []string `json:"api_key_masks"`
|
||
APIKeyStatuses []ContentModerationAPIKeyStatus `json:"api_key_statuses"`
|
||
TimeoutMS int `json:"timeout_ms"`
|
||
SampleRate int `json:"sample_rate"`
|
||
AllGroups bool `json:"all_groups"`
|
||
GroupIDs []int64 `json:"group_ids"`
|
||
RecordNonHits bool `json:"record_non_hits"`
|
||
WorkerCount int `json:"worker_count"`
|
||
QueueSize int `json:"queue_size"`
|
||
BlockStatus int `json:"block_status"`
|
||
BlockMessage string `json:"block_message"`
|
||
EmailOnHit bool `json:"email_on_hit"`
|
||
AutoBanEnabled bool `json:"auto_ban_enabled"`
|
||
BanThreshold int `json:"ban_threshold"`
|
||
ViolationWindowHours int `json:"violation_window_hours"`
|
||
RetryCount int `json:"retry_count"`
|
||
HitRetentionDays int `json:"hit_retention_days"`
|
||
NonHitRetentionDays int `json:"non_hit_retention_days"`
|
||
PreHashCheckEnabled bool `json:"pre_hash_check_enabled"`
|
||
}
|
||
|
||
type ContentModerationAPIKeyStatus struct {
|
||
Index int `json:"index"`
|
||
KeyHash string `json:"key_hash"`
|
||
Masked string `json:"masked"`
|
||
Status string `json:"status"`
|
||
FailureCount int `json:"failure_count"`
|
||
SuccessCount int64 `json:"success_count"`
|
||
LastError string `json:"last_error"`
|
||
LastCheckedAt *time.Time `json:"last_checked_at,omitempty"`
|
||
FrozenUntil *time.Time `json:"frozen_until,omitempty"`
|
||
LastLatencyMS int `json:"last_latency_ms"`
|
||
LastHTTPStatus int `json:"last_http_status"`
|
||
LastTested bool `json:"last_tested"`
|
||
Configured bool `json:"configured"`
|
||
}
|
||
|
||
type TestContentModerationAPIKeysInput struct {
|
||
APIKeys []string `json:"api_keys"`
|
||
BaseURL string `json:"base_url"`
|
||
Model string `json:"model"`
|
||
TimeoutMS int `json:"timeout_ms"`
|
||
Prompt string `json:"prompt"`
|
||
Images []string `json:"images"`
|
||
}
|
||
|
||
type TestContentModerationAPIKeysResult struct {
|
||
Items []ContentModerationAPIKeyStatus `json:"items"`
|
||
AuditResult *ContentModerationTestAuditResult `json:"audit_result,omitempty"`
|
||
ImageCount int `json:"image_count"`
|
||
}
|
||
|
||
type ContentModerationTestAuditResult struct {
|
||
Flagged bool `json:"flagged"`
|
||
HighestCategory string `json:"highest_category"`
|
||
HighestScore float64 `json:"highest_score"`
|
||
CompositeScore float64 `json:"composite_score"`
|
||
CategoryScores map[string]float64 `json:"category_scores"`
|
||
Thresholds map[string]float64 `json:"thresholds"`
|
||
}
|
||
|
||
type UpdateContentModerationConfigInput struct {
|
||
Enabled *bool `json:"enabled"`
|
||
Mode *string `json:"mode"`
|
||
BaseURL *string `json:"base_url"`
|
||
Model *string `json:"model"`
|
||
APIKey *string `json:"api_key"`
|
||
APIKeys *[]string `json:"api_keys"`
|
||
APIKeysMode string `json:"api_keys_mode"`
|
||
DeleteAPIKeyHashes *[]string `json:"delete_api_key_hashes"`
|
||
ClearAPIKey bool `json:"clear_api_key"`
|
||
TimeoutMS *int `json:"timeout_ms"`
|
||
SampleRate *int `json:"sample_rate"`
|
||
AllGroups *bool `json:"all_groups"`
|
||
GroupIDs *[]int64 `json:"group_ids"`
|
||
RecordNonHits *bool `json:"record_non_hits"`
|
||
WorkerCount *int `json:"worker_count"`
|
||
QueueSize *int `json:"queue_size"`
|
||
BlockStatus *int `json:"block_status"`
|
||
BlockMessage *string `json:"block_message"`
|
||
EmailOnHit *bool `json:"email_on_hit"`
|
||
AutoBanEnabled *bool `json:"auto_ban_enabled"`
|
||
BanThreshold *int `json:"ban_threshold"`
|
||
ViolationWindowHours *int `json:"violation_window_hours"`
|
||
RetryCount *int `json:"retry_count"`
|
||
HitRetentionDays *int `json:"hit_retention_days"`
|
||
NonHitRetentionDays *int `json:"non_hit_retention_days"`
|
||
PreHashCheckEnabled *bool `json:"pre_hash_check_enabled"`
|
||
}
|
||
|
||
type ContentModerationCheckInput struct {
|
||
RequestID string
|
||
UserID int64
|
||
UserEmail string
|
||
APIKeyID int64
|
||
APIKeyName string
|
||
GroupID *int64
|
||
GroupName string
|
||
Endpoint string
|
||
Provider string
|
||
Model string
|
||
Protocol string
|
||
Body []byte
|
||
}
|
||
|
||
type ContentModerationInput struct {
|
||
Text string
|
||
Images []string
|
||
}
|
||
|
||
func (in *ContentModerationInput) Normalize() {
|
||
if in == nil {
|
||
return
|
||
}
|
||
in.Text = trimRunes(normalizeContentModerationText(in.Text), maxModerationInputRunes)
|
||
in.Images = normalizeModerationImages(in.Images)
|
||
}
|
||
|
||
func (in ContentModerationInput) IsEmpty() bool {
|
||
return strings.TrimSpace(in.Text) == "" && len(in.Images) == 0
|
||
}
|
||
|
||
func (in ContentModerationInput) ModerationInput() any {
|
||
images := limitContentModerationImages(in.Images)
|
||
if len(images) == 0 {
|
||
return in.Text
|
||
}
|
||
parts := make([]moderationAPIInputPart, 0, len(images)+1)
|
||
if strings.TrimSpace(in.Text) != "" {
|
||
parts = append(parts, moderationAPIInputPart{Type: "text", Text: in.Text})
|
||
}
|
||
for _, image := range images {
|
||
parts = append(parts, moderationAPIInputPart{
|
||
Type: "image_url",
|
||
ImageURL: &moderationAPIImageURLRef{URL: image},
|
||
})
|
||
}
|
||
return parts
|
||
}
|
||
|
||
func (in ContentModerationInput) ExcerptText() string {
|
||
return in.Text
|
||
}
|
||
|
||
func (in ContentModerationInput) Hash() string {
|
||
h := sha256.New()
|
||
_, _ = h.Write([]byte("text:"))
|
||
_, _ = h.Write([]byte(in.Text))
|
||
for _, image := range in.Images {
|
||
imageHash := sha256.Sum256([]byte(image))
|
||
_, _ = h.Write([]byte("\nimage:"))
|
||
_, _ = h.Write([]byte(hex.EncodeToString(imageHash[:])))
|
||
}
|
||
return hex.EncodeToString(h.Sum(nil))
|
||
}
|
||
|
||
type ContentModerationDecision struct {
|
||
Allowed bool `json:"allowed"`
|
||
Blocked bool `json:"blocked"`
|
||
Flagged bool `json:"flagged"`
|
||
Message string `json:"message"`
|
||
StatusCode int `json:"status_code"`
|
||
InputHash string `json:"input_hash,omitempty"`
|
||
HighestCategory string `json:"highest_category"`
|
||
HighestScore float64 `json:"highest_score"`
|
||
CategoryScores map[string]float64 `json:"category_scores"`
|
||
Action string `json:"action"`
|
||
}
|
||
|
||
type ContentModerationLog struct {
|
||
ID int64 `json:"id"`
|
||
RequestID string `json:"request_id"`
|
||
UserID *int64 `json:"user_id,omitempty"`
|
||
UserEmail string `json:"user_email"`
|
||
APIKeyID *int64 `json:"api_key_id,omitempty"`
|
||
APIKeyName string `json:"api_key_name"`
|
||
GroupID *int64 `json:"group_id,omitempty"`
|
||
GroupName string `json:"group_name"`
|
||
Endpoint string `json:"endpoint"`
|
||
Provider string `json:"provider"`
|
||
Model string `json:"model"`
|
||
Mode string `json:"mode"`
|
||
Action string `json:"action"`
|
||
Flagged bool `json:"flagged"`
|
||
HighestCategory string `json:"highest_category"`
|
||
HighestScore float64 `json:"highest_score"`
|
||
CategoryScores map[string]float64 `json:"category_scores"`
|
||
ThresholdSnapshot map[string]float64 `json:"threshold_snapshot"`
|
||
InputExcerpt string `json:"input_excerpt"`
|
||
UpstreamLatencyMS *int `json:"upstream_latency_ms,omitempty"`
|
||
Error string `json:"error"`
|
||
ViolationCount int `json:"violation_count"`
|
||
AutoBanned bool `json:"auto_banned"`
|
||
EmailSent bool `json:"email_sent"`
|
||
UserStatus string `json:"user_status"`
|
||
QueueDelayMS *int `json:"queue_delay_ms,omitempty"`
|
||
CreatedAt time.Time `json:"created_at"`
|
||
}
|
||
|
||
type ContentModerationLogFilter struct {
|
||
Pagination pagination.PaginationParams
|
||
Result string
|
||
GroupID *int64
|
||
Endpoint string
|
||
Search string
|
||
From *time.Time
|
||
To *time.Time
|
||
}
|
||
|
||
type ContentModerationCleanupResult struct {
|
||
DeletedHit int64 `json:"deleted_hit"`
|
||
DeletedNonHit int64 `json:"deleted_non_hit"`
|
||
FinishedAt time.Time `json:"finished_at"`
|
||
}
|
||
|
||
type ContentModerationRuntimeStatus struct {
|
||
Enabled bool `json:"enabled"`
|
||
RiskControlEnabled bool `json:"risk_control_enabled"`
|
||
Mode string `json:"mode"`
|
||
WorkerCount int `json:"worker_count"`
|
||
MaxWorkers int `json:"max_workers"`
|
||
ActiveWorkers int `json:"active_workers"`
|
||
IdleWorkers int `json:"idle_workers"`
|
||
QueueSize int `json:"queue_size"`
|
||
QueueLength int `json:"queue_length"`
|
||
QueueUsagePercent float64 `json:"queue_usage_percent"`
|
||
Enqueued int64 `json:"enqueued"`
|
||
Dropped int64 `json:"dropped"`
|
||
Processed int64 `json:"processed"`
|
||
Errors int64 `json:"errors"`
|
||
APIKeyStatuses []ContentModerationAPIKeyStatus `json:"api_key_statuses"`
|
||
FlaggedHashCount int64 `json:"flagged_hash_count"`
|
||
LastCleanupAt *time.Time `json:"last_cleanup_at,omitempty"`
|
||
LastCleanupDeletedHit int64 `json:"last_cleanup_deleted_hit"`
|
||
LastCleanupDeletedNonHit int64 `json:"last_cleanup_deleted_non_hit"`
|
||
}
|
||
|
||
type ContentModerationUnbanUserResult struct {
|
||
UserID int64 `json:"user_id"`
|
||
Status string `json:"status"`
|
||
}
|
||
|
||
type ContentModerationDeleteHashResult struct {
|
||
InputHash string `json:"input_hash"`
|
||
Deleted bool `json:"deleted"`
|
||
}
|
||
|
||
type ContentModerationClearHashesResult struct {
|
||
Deleted int64 `json:"deleted"`
|
||
}
|
||
|
||
type ContentModerationRepository interface {
|
||
CreateLog(ctx context.Context, log *ContentModerationLog) error
|
||
ListLogs(ctx context.Context, filter ContentModerationLogFilter) ([]ContentModerationLog, *pagination.PaginationResult, error)
|
||
CountFlaggedByUserSince(ctx context.Context, userID int64, since time.Time) (int, error)
|
||
CleanupExpiredLogs(ctx context.Context, hitBefore time.Time, nonHitBefore time.Time) (*ContentModerationCleanupResult, error)
|
||
}
|
||
|
||
type ContentModerationHashCache interface {
|
||
RecordFlaggedInputHash(ctx context.Context, inputHash string) error
|
||
HasFlaggedInputHash(ctx context.Context, inputHash string) (bool, error)
|
||
DeleteFlaggedInputHash(ctx context.Context, inputHash string) (bool, error)
|
||
ClearFlaggedInputHashes(ctx context.Context) (int64, error)
|
||
CountFlaggedInputHashes(ctx context.Context) (int64, error)
|
||
}
|
||
|
||
type ContentModerationService struct {
|
||
settingRepo SettingRepository
|
||
repo ContentModerationRepository
|
||
hashCache ContentModerationHashCache
|
||
groupRepo GroupRepository
|
||
userRepo UserRepository
|
||
authCacheInvalidator APIKeyAuthCacheInvalidator
|
||
emailService *EmailService
|
||
httpClient *http.Client
|
||
asyncQueue chan contentModerationTask
|
||
workerCount int
|
||
apiKeyCursor atomic.Uint64
|
||
asyncActive atomic.Int64
|
||
asyncEnqueued atomic.Int64
|
||
asyncDropped atomic.Int64
|
||
asyncProcessed atomic.Int64
|
||
asyncErrors atomic.Int64
|
||
lastCleanupUnix atomic.Int64
|
||
lastCleanupDeletedHit atomic.Int64
|
||
lastCleanupDeletedNonHit atomic.Int64
|
||
keyHealthMu sync.Mutex
|
||
keyHealth map[string]*contentModerationKeyHealth
|
||
}
|
||
|
||
type contentModerationTask struct {
|
||
input ContentModerationCheckInput
|
||
content ContentModerationInput
|
||
inputHash string
|
||
enqueuedAt time.Time
|
||
}
|
||
|
||
type contentModerationKeyHealth struct {
|
||
Hash string
|
||
Masked string
|
||
FailureCount int
|
||
SuccessCount int64
|
||
LastError string
|
||
LastCheckedAt time.Time
|
||
FrozenUntil time.Time
|
||
LastLatencyMS int
|
||
LastHTTPStatus int
|
||
LastTested bool
|
||
}
|
||
|
||
func NewContentModerationService(
|
||
settingRepo SettingRepository,
|
||
repo ContentModerationRepository,
|
||
hashCache ContentModerationHashCache,
|
||
groupRepo GroupRepository,
|
||
userRepo UserRepository,
|
||
authCacheInvalidator APIKeyAuthCacheInvalidator,
|
||
emailService *EmailService,
|
||
) *ContentModerationService {
|
||
svc := &ContentModerationService{
|
||
settingRepo: settingRepo,
|
||
repo: repo,
|
||
hashCache: hashCache,
|
||
groupRepo: groupRepo,
|
||
userRepo: userRepo,
|
||
authCacheInvalidator: authCacheInvalidator,
|
||
emailService: emailService,
|
||
httpClient: &http.Client{},
|
||
workerCount: maxContentModerationWorkerCount,
|
||
asyncQueue: make(chan contentModerationTask, maxContentModerationQueueSize),
|
||
keyHealth: make(map[string]*contentModerationKeyHealth),
|
||
}
|
||
if settingRepo != nil && repo != nil {
|
||
for i := 0; i < svc.workerCount; i++ {
|
||
go svc.worker(i)
|
||
}
|
||
go svc.cleanupWorker()
|
||
}
|
||
return svc
|
||
}
|
||
|
||
func (s *ContentModerationService) GetConfig(ctx context.Context) (*ContentModerationConfigView, error) {
|
||
cfg, err := s.loadConfig(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return s.configView(cfg), nil
|
||
}
|
||
|
||
func (s *ContentModerationService) UpdateConfig(ctx context.Context, input UpdateContentModerationConfigInput) (*ContentModerationConfigView, error) {
|
||
cfg, err := s.loadConfig(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if input.Enabled != nil {
|
||
cfg.Enabled = *input.Enabled
|
||
}
|
||
if input.Mode != nil {
|
||
cfg.Mode = strings.TrimSpace(*input.Mode)
|
||
}
|
||
if input.BaseURL != nil {
|
||
cfg.BaseURL = strings.TrimSpace(*input.BaseURL)
|
||
}
|
||
if input.Model != nil {
|
||
cfg.Model = strings.TrimSpace(*input.Model)
|
||
}
|
||
if input.TimeoutMS != nil {
|
||
cfg.TimeoutMS = *input.TimeoutMS
|
||
}
|
||
if input.SampleRate != nil {
|
||
cfg.SampleRate = *input.SampleRate
|
||
}
|
||
if input.WorkerCount != nil {
|
||
cfg.WorkerCount = *input.WorkerCount
|
||
}
|
||
if input.QueueSize != nil {
|
||
cfg.QueueSize = *input.QueueSize
|
||
}
|
||
if input.BlockStatus != nil {
|
||
cfg.BlockStatus = *input.BlockStatus
|
||
}
|
||
if input.BlockMessage != nil {
|
||
cfg.BlockMessage = strings.TrimSpace(*input.BlockMessage)
|
||
}
|
||
if input.EmailOnHit != nil {
|
||
cfg.EmailOnHit = *input.EmailOnHit
|
||
}
|
||
if input.AutoBanEnabled != nil {
|
||
cfg.AutoBanEnabled = *input.AutoBanEnabled
|
||
}
|
||
if input.BanThreshold != nil {
|
||
cfg.BanThreshold = *input.BanThreshold
|
||
}
|
||
if input.ViolationWindowHours != nil {
|
||
cfg.ViolationWindowHours = *input.ViolationWindowHours
|
||
}
|
||
if input.RetryCount != nil {
|
||
cfg.RetryCount = *input.RetryCount
|
||
}
|
||
if input.HitRetentionDays != nil {
|
||
cfg.HitRetentionDays = *input.HitRetentionDays
|
||
}
|
||
if input.NonHitRetentionDays != nil {
|
||
cfg.NonHitRetentionDays = *input.NonHitRetentionDays
|
||
}
|
||
if input.PreHashCheckEnabled != nil {
|
||
cfg.PreHashCheckEnabled = *input.PreHashCheckEnabled
|
||
}
|
||
if input.AllGroups != nil {
|
||
cfg.AllGroups = *input.AllGroups
|
||
}
|
||
if input.GroupIDs != nil {
|
||
cfg.GroupIDs = normalizeInt64IDs(*input.GroupIDs)
|
||
}
|
||
if input.RecordNonHits != nil {
|
||
cfg.RecordNonHits = *input.RecordNonHits
|
||
}
|
||
if input.ClearAPIKey {
|
||
cfg.APIKey = ""
|
||
cfg.APIKeys = []string{}
|
||
} else {
|
||
apiKeysMode := normalizeContentModerationAPIKeysMode(input.APIKeysMode)
|
||
if input.DeleteAPIKeyHashes != nil && apiKeysMode != contentModerationAPIKeysModeReplace {
|
||
cfg.APIKeys = deleteModerationAPIKeysByHash(cfg.apiKeys(), *input.DeleteAPIKeyHashes)
|
||
cfg.APIKey = ""
|
||
}
|
||
if input.APIKeys != nil {
|
||
if apiKeysMode == contentModerationAPIKeysModeReplace {
|
||
cfg.APIKeys = normalizeModerationAPIKeys(*input.APIKeys)
|
||
} else {
|
||
cfg.APIKeys = normalizeModerationAPIKeys(append(cfg.apiKeys(), *input.APIKeys...))
|
||
}
|
||
cfg.APIKey = ""
|
||
}
|
||
if input.APIKey != nil && strings.TrimSpace(*input.APIKey) != "" {
|
||
cfg.APIKeys = normalizeModerationAPIKeys(append(cfg.APIKeys, *input.APIKey))
|
||
cfg.APIKey = ""
|
||
}
|
||
}
|
||
if err := s.validateConfig(ctx, cfg); err != nil {
|
||
return nil, err
|
||
}
|
||
cfg.normalize()
|
||
raw, err := json.Marshal(cfg)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("marshal content moderation config: %w", err)
|
||
}
|
||
if err := s.settingRepo.Set(ctx, SettingKeyContentModerationConfig, string(raw)); err != nil {
|
||
return nil, fmt.Errorf("save content moderation config: %w", err)
|
||
}
|
||
return s.configView(cfg), nil
|
||
}
|
||
|
||
func (s *ContentModerationService) TestAPIKeys(ctx context.Context, input TestContentModerationAPIKeysInput) (*TestContentModerationAPIKeysResult, error) {
|
||
cfg, err := s.loadConfig(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
keys := normalizeModerationAPIKeys(input.APIKeys)
|
||
configured := false
|
||
if len(keys) == 0 {
|
||
keys = cfg.apiKeys()
|
||
configured = true
|
||
}
|
||
if strings.TrimSpace(input.BaseURL) != "" {
|
||
cfg.BaseURL = input.BaseURL
|
||
}
|
||
if strings.TrimSpace(input.Model) != "" {
|
||
cfg.Model = input.Model
|
||
}
|
||
if input.TimeoutMS > 0 {
|
||
cfg.TimeoutMS = input.TimeoutMS
|
||
}
|
||
cfg.normalize()
|
||
testInput, imageCount, err := buildModerationTestInput(input.Prompt, input.Images)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
auditOnly := contentModerationTestHasAuditInput(input.Prompt, input.Images)
|
||
if configured && auditOnly {
|
||
key, ok := s.nextUsableAPIKey(cfg)
|
||
if !ok {
|
||
return &TestContentModerationAPIKeysResult{
|
||
Items: s.apiKeyStatuses(keys),
|
||
ImageCount: imageCount,
|
||
}, nil
|
||
}
|
||
keys = []string{key}
|
||
}
|
||
if len(keys) == 0 {
|
||
return &TestContentModerationAPIKeysResult{Items: []ContentModerationAPIKeyStatus{}, ImageCount: imageCount}, nil
|
||
}
|
||
items := make([]ContentModerationAPIKeyStatus, 0, len(keys))
|
||
var auditResult *ContentModerationTestAuditResult
|
||
for idx, key := range keys {
|
||
start := time.Now()
|
||
httpStatus := 0
|
||
result, err := s.callModerationOnceWithInput(ctx, cfg, key, testInput, &httpStatus)
|
||
latency := int(time.Since(start).Milliseconds())
|
||
keyHash := moderationAPIKeyHash(key)
|
||
if err != nil {
|
||
s.markAPIKeyError(key, err.Error(), latency, httpStatus)
|
||
} else {
|
||
s.markAPIKeySuccess(key, latency, httpStatus)
|
||
if auditResult == nil {
|
||
auditResult = buildContentModerationTestAuditResult(result, cfg.Thresholds)
|
||
}
|
||
}
|
||
status := s.apiKeyStatusForHash(idx, keyHash, maskSecretTail(key), configured)
|
||
status.LastTested = true
|
||
items = append(items, status)
|
||
}
|
||
return &TestContentModerationAPIKeysResult{Items: items, AuditResult: auditResult, ImageCount: imageCount}, nil
|
||
}
|
||
|
||
func (s *ContentModerationService) Check(ctx context.Context, input ContentModerationCheckInput) (*ContentModerationDecision, error) {
|
||
allow := &ContentModerationDecision{Allowed: true, Action: ContentModerationActionAllow}
|
||
if s == nil || s.settingRepo == nil || s.repo == nil {
|
||
slog.Info("content_moderation.skip_unavailable",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol)
|
||
return allow, nil
|
||
}
|
||
if !s.isRiskControlEnabled(ctx) {
|
||
slog.Info("content_moderation.skip_feature_disabled",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol)
|
||
return allow, nil
|
||
}
|
||
cfg, err := s.loadConfig(ctx)
|
||
if err != nil {
|
||
slog.Warn("content_moderation.skip_config_load_failed",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol,
|
||
"error", err)
|
||
return allow, nil
|
||
}
|
||
inScope := cfg.includesGroup(input.GroupID)
|
||
slog.Info("content_moderation.config_loaded",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"group_name", input.GroupName,
|
||
"endpoint", input.Endpoint,
|
||
"provider", input.Provider,
|
||
"protocol", input.Protocol,
|
||
"model", input.Model,
|
||
"enabled", cfg.Enabled,
|
||
"mode", cfg.Mode,
|
||
"all_groups", cfg.AllGroups,
|
||
"configured_group_ids", cfg.GroupIDs,
|
||
"in_scope", inScope,
|
||
"sample_rate", cfg.SampleRate,
|
||
"api_key_count", len(cfg.apiKeys()),
|
||
"pre_hash_check_enabled", cfg.PreHashCheckEnabled,
|
||
"record_non_hits", cfg.RecordNonHits)
|
||
if !cfg.Enabled {
|
||
slog.Info("content_moderation.skip_config_disabled",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol)
|
||
return allow, nil
|
||
}
|
||
if cfg.Mode == ContentModerationModeOff {
|
||
slog.Info("content_moderation.skip_mode_off",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol)
|
||
return allow, nil
|
||
}
|
||
if !inScope {
|
||
slog.Info("content_moderation.skip_group_out_of_scope",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"group_name", input.GroupName,
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol,
|
||
"all_groups", cfg.AllGroups,
|
||
"configured_group_ids", cfg.GroupIDs)
|
||
return allow, nil
|
||
}
|
||
content := ExtractContentModerationInput(input.Protocol, input.Body)
|
||
if content.IsEmpty() {
|
||
slog.Info("content_moderation.skip_empty_input",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol,
|
||
"body_bytes", len(input.Body))
|
||
return allow, nil
|
||
}
|
||
content.Normalize()
|
||
slog.Info("content_moderation.input_extracted",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol,
|
||
"text_runes", len([]rune(content.Text)),
|
||
"image_count", len(content.Images))
|
||
hashText := content.Hash()
|
||
if cfg.PreHashCheckEnabled && s.hashCache != nil {
|
||
matched, err := s.hashCache.HasFlaggedInputHash(ctx, hashText)
|
||
if err != nil {
|
||
slog.Warn("content_moderation.hash_check_failed", "user_id", input.UserID, "endpoint", input.Endpoint, "error", err)
|
||
}
|
||
if matched {
|
||
slog.Info("content_moderation.hash_block",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol,
|
||
"input_hash", hashText)
|
||
message := cfg.BlockMessage
|
||
if message != "" {
|
||
message = fmt.Sprintf("%s(hash: %s)", message, hashText)
|
||
}
|
||
return &ContentModerationDecision{
|
||
Allowed: false,
|
||
Blocked: true,
|
||
Flagged: true,
|
||
Message: message,
|
||
StatusCode: cfg.BlockStatus,
|
||
InputHash: hashText,
|
||
Action: ContentModerationActionHashBlock,
|
||
}, nil
|
||
}
|
||
}
|
||
if !cfg.shouldSample(hashText) {
|
||
slog.Info("content_moderation.skip_sample_rate",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol,
|
||
"sample_rate", cfg.SampleRate)
|
||
return allow, nil
|
||
}
|
||
if len(cfg.apiKeys()) == 0 {
|
||
slog.Warn("content_moderation.skip_no_audit_api_keys",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol)
|
||
return allow, nil
|
||
}
|
||
if cfg.Mode == ContentModerationModeObserve {
|
||
slog.Info("content_moderation.enqueue_observe",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol,
|
||
"queue_len", len(s.asyncQueue))
|
||
s.enqueueAsync(input, cfg, content, hashText)
|
||
return allow, nil
|
||
}
|
||
|
||
return s.checkSync(ctx, input, cfg, content, hashText, nil, true), nil
|
||
}
|
||
|
||
func (s *ContentModerationService) checkSync(ctx context.Context, input ContentModerationCheckInput, cfg *ContentModerationConfig, content ContentModerationInput, hashText string, queueDelay *int, allowBlock bool) *ContentModerationDecision {
|
||
allow := &ContentModerationDecision{Allowed: true, Action: ContentModerationActionAllow}
|
||
start := time.Now()
|
||
result, err := s.callModeration(ctx, cfg, content.ModerationInput())
|
||
latency := int(time.Since(start).Milliseconds())
|
||
if err != nil {
|
||
slog.Warn("content_moderation.audit_api_failed",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol,
|
||
"mode", cfg.Mode,
|
||
"allow_block", allowBlock,
|
||
"queue_delay_ms", queueDelay,
|
||
"latency_ms", latency,
|
||
"error", err)
|
||
if queueDelay != nil {
|
||
s.asyncErrors.Add(1)
|
||
}
|
||
if cfg.RecordNonHits {
|
||
log := s.buildLog(input, cfg, ContentModerationActionError, false, "", 0, nil, content.ExcerptText(), &latency, queueDelay, err.Error())
|
||
_ = s.repo.CreateLog(ctx, log)
|
||
}
|
||
return allow
|
||
}
|
||
|
||
flagged, highestCategory, highestScore := evaluateModerationScores(result.CategoryScores, cfg.Thresholds)
|
||
action := ContentModerationActionAllow
|
||
blocked := false
|
||
if allowBlock && flagged && cfg.Mode == ContentModerationModePreBlock {
|
||
action = ContentModerationActionBlock
|
||
blocked = true
|
||
}
|
||
slog.Info("content_moderation.audit_result",
|
||
"user_id", input.UserID,
|
||
"api_key_id", input.APIKeyID,
|
||
"group_id", contentModerationLogGroupID(input.GroupID),
|
||
"group_name", input.GroupName,
|
||
"endpoint", input.Endpoint,
|
||
"protocol", input.Protocol,
|
||
"mode", cfg.Mode,
|
||
"allow_block", allowBlock,
|
||
"flagged", flagged,
|
||
"blocked", blocked,
|
||
"action", action,
|
||
"highest_category", highestCategory,
|
||
"highest_score", highestScore,
|
||
"latency_ms", latency,
|
||
"queue_delay_ms", queueDelay)
|
||
if flagged || cfg.RecordNonHits {
|
||
log := s.buildLog(input, cfg, action, flagged, highestCategory, highestScore, result.CategoryScores, content.ExcerptText(), &latency, queueDelay, "")
|
||
if flagged && s.hashCache != nil {
|
||
if err := s.hashCache.RecordFlaggedInputHash(ctx, hashText); err != nil {
|
||
slog.Warn("content_moderation.record_hash_failed", "user_id", input.UserID, "endpoint", input.Endpoint, "error", err)
|
||
}
|
||
}
|
||
s.applyFlaggedSideEffects(ctx, cfg, log)
|
||
_ = s.repo.CreateLog(ctx, log)
|
||
}
|
||
if blocked {
|
||
return &ContentModerationDecision{
|
||
Allowed: false,
|
||
Blocked: true,
|
||
Flagged: true,
|
||
Message: cfg.BlockMessage,
|
||
StatusCode: cfg.BlockStatus,
|
||
HighestCategory: highestCategory,
|
||
HighestScore: highestScore,
|
||
CategoryScores: result.CategoryScores,
|
||
Action: action,
|
||
}
|
||
}
|
||
return &ContentModerationDecision{
|
||
Allowed: true,
|
||
Flagged: flagged,
|
||
Message: "",
|
||
HighestCategory: highestCategory,
|
||
HighestScore: highestScore,
|
||
CategoryScores: result.CategoryScores,
|
||
Action: action,
|
||
}
|
||
}
|
||
|
||
func (s *ContentModerationService) enqueueAsync(input ContentModerationCheckInput, cfg *ContentModerationConfig, content ContentModerationInput, hashText string) {
|
||
if s == nil || s.asyncQueue == nil {
|
||
return
|
||
}
|
||
queueSize := defaultContentModerationQueueSize
|
||
if cfg != nil && cfg.QueueSize > 0 {
|
||
queueSize = cfg.QueueSize
|
||
}
|
||
if len(s.asyncQueue) >= queueSize {
|
||
slog.Warn("content_moderation.async_queue_full", "user_id", input.UserID, "endpoint", input.Endpoint, "queue_size", queueSize)
|
||
s.asyncDropped.Add(1)
|
||
return
|
||
}
|
||
task := contentModerationTask{
|
||
input: input,
|
||
content: content,
|
||
inputHash: hashText,
|
||
enqueuedAt: time.Now(),
|
||
}
|
||
select {
|
||
case s.asyncQueue <- task:
|
||
s.asyncEnqueued.Add(1)
|
||
default:
|
||
slog.Warn("content_moderation.async_queue_full", "user_id", input.UserID, "endpoint", input.Endpoint)
|
||
s.asyncDropped.Add(1)
|
||
}
|
||
}
|
||
|
||
func (s *ContentModerationService) worker(id int) {
|
||
for {
|
||
ctx, cancel := context.WithTimeout(context.Background(), maxContentModerationTimeoutMS*time.Millisecond+10*time.Second)
|
||
cfg, err := s.loadConfig(ctx)
|
||
if err != nil || !cfg.Enabled || cfg.Mode == ContentModerationModeOff || len(cfg.apiKeys()) == 0 || id >= cfg.WorkerCount {
|
||
cancel()
|
||
time.Sleep(time.Second)
|
||
continue
|
||
}
|
||
task, ok := s.dequeueAsyncTask(ctx, time.Second)
|
||
if !ok {
|
||
cancel()
|
||
continue
|
||
}
|
||
func() {
|
||
defer cancel()
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
slog.Error("content_moderation.worker_panic", "worker_id", id, "recover", r)
|
||
}
|
||
}()
|
||
if !cfg.includesGroup(task.input.GroupID) {
|
||
return
|
||
}
|
||
s.asyncActive.Add(1)
|
||
defer s.asyncActive.Add(-1)
|
||
queueDelay := int(time.Since(task.enqueuedAt).Milliseconds())
|
||
_ = s.checkSync(ctx, task.input, cfg, task.content, task.inputHash, &queueDelay, false)
|
||
s.asyncProcessed.Add(1)
|
||
}()
|
||
}
|
||
}
|
||
|
||
func (s *ContentModerationService) dequeueAsyncTask(ctx context.Context, idleWait time.Duration) (contentModerationTask, bool) {
|
||
var zero contentModerationTask
|
||
if s == nil || s.asyncQueue == nil {
|
||
return zero, false
|
||
}
|
||
if idleWait <= 0 {
|
||
idleWait = time.Second
|
||
}
|
||
timer := time.NewTimer(idleWait)
|
||
defer timer.Stop()
|
||
select {
|
||
case task, ok := <-s.asyncQueue:
|
||
return task, ok
|
||
case <-ctx.Done():
|
||
return zero, false
|
||
case <-timer.C:
|
||
return zero, false
|
||
}
|
||
}
|
||
|
||
func (s *ContentModerationService) ListLogs(ctx context.Context, filter ContentModerationLogFilter) ([]ContentModerationLog, *pagination.PaginationResult, error) {
|
||
if filter.Pagination.Page <= 0 {
|
||
filter.Pagination.Page = 1
|
||
}
|
||
if filter.Pagination.PageSize <= 0 {
|
||
filter.Pagination.PageSize = 20
|
||
}
|
||
if filter.Pagination.PageSize > 100 {
|
||
filter.Pagination.PageSize = 100
|
||
}
|
||
if filter.Pagination.SortOrder == "" {
|
||
filter.Pagination.SortOrder = pagination.SortOrderDesc
|
||
}
|
||
return s.repo.ListLogs(ctx, filter)
|
||
}
|
||
|
||
func (s *ContentModerationService) UnbanUser(ctx context.Context, userID int64) (*ContentModerationUnbanUserResult, error) {
|
||
if s == nil || s.userRepo == nil {
|
||
return nil, infraerrors.InternalServer("CONTENT_MODERATION_USER_REPOSITORY_UNAVAILABLE", "用户仓储不可用")
|
||
}
|
||
if userID <= 0 {
|
||
return nil, infraerrors.BadRequest("INVALID_USER_ID", "用户 ID 无效")
|
||
}
|
||
user, err := s.userRepo.GetByID(ctx, userID)
|
||
if err != nil {
|
||
if errors.Is(err, ErrUserNotFound) {
|
||
return nil, infraerrors.NotFound("USER_NOT_FOUND", "用户不存在")
|
||
}
|
||
return nil, fmt.Errorf("get content moderation unban user: %w", err)
|
||
}
|
||
if user.Status != StatusActive {
|
||
user.Status = StatusActive
|
||
if err := s.userRepo.Update(ctx, user); err != nil {
|
||
return nil, fmt.Errorf("update content moderation unban user: %w", err)
|
||
}
|
||
}
|
||
if s.authCacheInvalidator != nil {
|
||
s.authCacheInvalidator.InvalidateAuthCacheByUserID(ctx, userID)
|
||
}
|
||
return &ContentModerationUnbanUserResult{
|
||
UserID: userID,
|
||
Status: StatusActive,
|
||
}, nil
|
||
}
|
||
|
||
func (s *ContentModerationService) DeleteFlaggedInputHash(ctx context.Context, inputHash string) (*ContentModerationDeleteHashResult, error) {
|
||
inputHash = normalizeContentModerationHash(inputHash)
|
||
if inputHash == "" {
|
||
return nil, infraerrors.BadRequest("INVALID_CONTENT_MODERATION_HASH", "风险输入哈希无效")
|
||
}
|
||
if s == nil || s.hashCache == nil {
|
||
return nil, infraerrors.InternalServer("CONTENT_MODERATION_HASH_CACHE_UNAVAILABLE", "内容审计哈希缓存不可用")
|
||
}
|
||
deleted, err := s.hashCache.DeleteFlaggedInputHash(ctx, inputHash)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("delete content moderation flagged hash: %w", err)
|
||
}
|
||
return &ContentModerationDeleteHashResult{
|
||
InputHash: inputHash,
|
||
Deleted: deleted,
|
||
}, nil
|
||
}
|
||
|
||
func (s *ContentModerationService) ClearFlaggedInputHashes(ctx context.Context) (*ContentModerationClearHashesResult, error) {
|
||
if s == nil || s.hashCache == nil {
|
||
return nil, infraerrors.InternalServer("CONTENT_MODERATION_HASH_CACHE_UNAVAILABLE", "内容审计哈希缓存不可用")
|
||
}
|
||
deleted, err := s.hashCache.ClearFlaggedInputHashes(ctx)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("clear content moderation flagged hashes: %w", err)
|
||
}
|
||
return &ContentModerationClearHashesResult{Deleted: deleted}, nil
|
||
}
|
||
|
||
func (s *ContentModerationService) GetStatus(ctx context.Context) (*ContentModerationRuntimeStatus, error) {
|
||
if s == nil {
|
||
return &ContentModerationRuntimeStatus{}, nil
|
||
}
|
||
cfg, err := s.loadConfig(ctx)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
riskEnabled := s.isRiskControlEnabled(ctx)
|
||
active := int(s.asyncActive.Load())
|
||
if active < 0 {
|
||
active = 0
|
||
}
|
||
if active > cfg.WorkerCount {
|
||
active = cfg.WorkerCount
|
||
}
|
||
queueLength := 0
|
||
if s.asyncQueue != nil {
|
||
queueLength = len(s.asyncQueue)
|
||
}
|
||
queueUsage := 0.0
|
||
if cfg.QueueSize > 0 {
|
||
queueUsage = float64(queueLength) * 100 / float64(cfg.QueueSize)
|
||
}
|
||
var flaggedHashCount int64
|
||
if s.hashCache != nil {
|
||
if n, err := s.hashCache.CountFlaggedInputHashes(ctx); err == nil {
|
||
flaggedHashCount = n
|
||
} else {
|
||
slog.Warn("content_moderation.hash_count_failed", "error", err)
|
||
}
|
||
}
|
||
var lastCleanupAt *time.Time
|
||
if unix := s.lastCleanupUnix.Load(); unix > 0 {
|
||
t := time.Unix(unix, 0)
|
||
lastCleanupAt = &t
|
||
}
|
||
return &ContentModerationRuntimeStatus{
|
||
Enabled: cfg.Enabled,
|
||
RiskControlEnabled: riskEnabled,
|
||
Mode: cfg.Mode,
|
||
WorkerCount: cfg.WorkerCount,
|
||
MaxWorkers: maxContentModerationWorkerCount,
|
||
ActiveWorkers: active,
|
||
IdleWorkers: cfg.WorkerCount - active,
|
||
QueueSize: cfg.QueueSize,
|
||
QueueLength: queueLength,
|
||
QueueUsagePercent: queueUsage,
|
||
Enqueued: s.asyncEnqueued.Load(),
|
||
Dropped: s.asyncDropped.Load(),
|
||
Processed: s.asyncProcessed.Load(),
|
||
Errors: s.asyncErrors.Load(),
|
||
APIKeyStatuses: s.apiKeyStatuses(cfg.apiKeys()),
|
||
FlaggedHashCount: flaggedHashCount,
|
||
LastCleanupAt: lastCleanupAt,
|
||
LastCleanupDeletedHit: s.lastCleanupDeletedHit.Load(),
|
||
LastCleanupDeletedNonHit: s.lastCleanupDeletedNonHit.Load(),
|
||
}, nil
|
||
}
|
||
|
||
func (s *ContentModerationService) cleanupWorker() {
|
||
timer := time.NewTimer(contentModerationCleanupDelay)
|
||
defer timer.Stop()
|
||
for {
|
||
<-timer.C
|
||
s.runCleanupOnce()
|
||
timer.Reset(contentModerationCleanupInterval)
|
||
}
|
||
}
|
||
|
||
func (s *ContentModerationService) runCleanupOnce() {
|
||
if s == nil || s.repo == nil || s.settingRepo == nil {
|
||
return
|
||
}
|
||
ctx, cancel := context.WithTimeout(context.Background(), contentModerationCleanupTimeout)
|
||
defer cancel()
|
||
cfg, err := s.loadConfig(ctx)
|
||
if err != nil {
|
||
slog.Warn("content_moderation.cleanup_load_config_failed", "error", err)
|
||
return
|
||
}
|
||
now := time.Now()
|
||
hitBefore := now.AddDate(0, 0, -cfg.HitRetentionDays)
|
||
nonHitBefore := now.AddDate(0, 0, -cfg.NonHitRetentionDays)
|
||
result, err := s.repo.CleanupExpiredLogs(ctx, hitBefore, nonHitBefore)
|
||
if err != nil {
|
||
slog.Warn("content_moderation.cleanup_failed", "error", err)
|
||
return
|
||
}
|
||
if result == nil {
|
||
return
|
||
}
|
||
s.lastCleanupUnix.Store(result.FinishedAt.Unix())
|
||
s.lastCleanupDeletedHit.Store(result.DeletedHit)
|
||
s.lastCleanupDeletedNonHit.Store(result.DeletedNonHit)
|
||
}
|
||
|
||
func (s *ContentModerationService) loadConfig(ctx context.Context) (*ContentModerationConfig, error) {
|
||
cfg := defaultContentModerationConfig()
|
||
raw, err := s.settingRepo.GetValue(ctx, SettingKeyContentModerationConfig)
|
||
if err != nil {
|
||
if errors.Is(err, ErrSettingNotFound) {
|
||
cfg.normalize()
|
||
return cfg, nil
|
||
}
|
||
return nil, fmt.Errorf("get content moderation config: %w", err)
|
||
}
|
||
if strings.TrimSpace(raw) == "" {
|
||
cfg.normalize()
|
||
return cfg, nil
|
||
}
|
||
if err := json.Unmarshal([]byte(raw), cfg); err != nil {
|
||
return nil, infraerrors.BadRequest("INVALID_CONTENT_MODERATION_CONFIG", "内容审计配置不是有效 JSON")
|
||
}
|
||
cfg.normalize()
|
||
return cfg, nil
|
||
}
|
||
|
||
func (s *ContentModerationService) isRiskControlEnabled(ctx context.Context) bool {
|
||
raw, err := s.settingRepo.GetValue(ctx, SettingKeyRiskControlEnabled)
|
||
if err != nil {
|
||
return false
|
||
}
|
||
return raw == "true"
|
||
}
|
||
|
||
func (s *ContentModerationService) validateConfig(ctx context.Context, cfg *ContentModerationConfig) error {
|
||
if cfg == nil {
|
||
return infraerrors.BadRequest("INVALID_CONTENT_MODERATION_CONFIG", "内容审计配置不能为空")
|
||
}
|
||
cfg.normalize()
|
||
switch cfg.Mode {
|
||
case ContentModerationModeOff, ContentModerationModeObserve, ContentModerationModePreBlock:
|
||
default:
|
||
return infraerrors.BadRequest("INVALID_CONTENT_MODERATION_MODE", "内容审计模式无效")
|
||
}
|
||
if _, err := url.ParseRequestURI(cfg.BaseURL); err != nil {
|
||
return infraerrors.BadRequest("INVALID_CONTENT_MODERATION_BASE_URL", "OpenAI Base URL 无效")
|
||
}
|
||
if cfg.BlockStatus < 400 || cfg.BlockStatus > 599 {
|
||
return infraerrors.BadRequest("INVALID_CONTENT_MODERATION_BLOCK_STATUS", "拦截 HTTP 状态码必须在 400-599 之间")
|
||
}
|
||
if !cfg.AllGroups && len(cfg.GroupIDs) > 0 && s.groupRepo != nil {
|
||
for _, groupID := range cfg.GroupIDs {
|
||
if _, err := s.groupRepo.GetByIDLite(ctx, groupID); err != nil {
|
||
return infraerrors.BadRequest("INVALID_CONTENT_MODERATION_GROUP", fmt.Sprintf("审计分组不存在: %d", groupID))
|
||
}
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *ContentModerationService) callModeration(ctx context.Context, cfg *ContentModerationConfig, input any) (*moderationAPIResult, error) {
|
||
attempts := cfg.RetryCount + 1
|
||
if attempts <= 0 {
|
||
attempts = 1
|
||
}
|
||
if attempts > maxContentModerationRetryCount+1 {
|
||
attempts = maxContentModerationRetryCount + 1
|
||
}
|
||
var lastErr error
|
||
for attempt := 0; attempt < attempts; attempt++ {
|
||
key, ok := s.nextUsableAPIKey(cfg)
|
||
if !ok {
|
||
lastErr = errors.New("no moderation api key available")
|
||
break
|
||
}
|
||
start := time.Now()
|
||
httpStatus := 0
|
||
result, err := s.callModerationOnceWithInput(ctx, cfg, key, input, &httpStatus)
|
||
latency := int(time.Since(start).Milliseconds())
|
||
if err == nil {
|
||
s.markAPIKeySuccess(key, latency, httpStatus)
|
||
return result, nil
|
||
}
|
||
s.markAPIKeyError(key, err.Error(), latency, httpStatus)
|
||
lastErr = err
|
||
if httpStatus == http.StatusBadRequest {
|
||
break
|
||
}
|
||
if attempt == attempts-1 {
|
||
break
|
||
}
|
||
wait := time.Duration(100*(attempt+1)) * time.Millisecond
|
||
select {
|
||
case <-ctx.Done():
|
||
return nil, ctx.Err()
|
||
case <-time.After(wait):
|
||
}
|
||
}
|
||
return nil, lastErr
|
||
}
|
||
|
||
func (s *ContentModerationService) callModerationOnceWithInput(ctx context.Context, cfg *ContentModerationConfig, apiKey string, input any, httpStatus *int) (*moderationAPIResult, error) {
|
||
base := strings.TrimRight(cfg.BaseURL, "/")
|
||
endpoint, err := url.JoinPath(base, "/v1/moderations")
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
payload := moderationAPIRequest{
|
||
Model: cfg.Model,
|
||
Input: input,
|
||
}
|
||
raw, err := json.Marshal(payload)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
timeout := time.Duration(cfg.TimeoutMS) * time.Millisecond
|
||
reqCtx, cancel := context.WithTimeout(ctx, timeout)
|
||
defer cancel()
|
||
|
||
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, endpoint, bytes.NewReader(raw))
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
req.Header.Set("Authorization", "Bearer "+apiKey)
|
||
req.Header.Set("Content-Type", "application/json")
|
||
|
||
client := s.httpClient
|
||
if client == nil {
|
||
client = http.DefaultClient
|
||
}
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer func() { _ = resp.Body.Close() }()
|
||
if httpStatus != nil {
|
||
*httpStatus = resp.StatusCode
|
||
}
|
||
|
||
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||
body, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
|
||
return nil, fmt.Errorf("moderation api status %d: %s", resp.StatusCode, strings.TrimSpace(string(body)))
|
||
}
|
||
var out moderationAPIResponse
|
||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||
return nil, err
|
||
}
|
||
if len(out.Results) == 0 {
|
||
return nil, errors.New("moderation api returned empty results")
|
||
}
|
||
return &out.Results[0], nil
|
||
}
|
||
|
||
func (s *ContentModerationService) buildLog(input ContentModerationCheckInput, cfg *ContentModerationConfig, action string, flagged bool, highestCategory string, highestScore float64, scores map[string]float64, text string, latency *int, queueDelay *int, errText string) *ContentModerationLog {
|
||
var userID *int64
|
||
if input.UserID > 0 {
|
||
userID = &input.UserID
|
||
}
|
||
var apiKeyID *int64
|
||
if input.APIKeyID > 0 {
|
||
apiKeyID = &input.APIKeyID
|
||
}
|
||
return &ContentModerationLog{
|
||
RequestID: input.RequestID,
|
||
UserID: userID,
|
||
UserEmail: input.UserEmail,
|
||
APIKeyID: apiKeyID,
|
||
APIKeyName: input.APIKeyName,
|
||
GroupID: cloneInt64Ptr(input.GroupID),
|
||
GroupName: input.GroupName,
|
||
Endpoint: input.Endpoint,
|
||
Provider: input.Provider,
|
||
Model: input.Model,
|
||
Mode: cfg.Mode,
|
||
Action: action,
|
||
Flagged: flagged,
|
||
HighestCategory: highestCategory,
|
||
HighestScore: highestScore,
|
||
CategoryScores: cloneFloatMap(scores),
|
||
ThresholdSnapshot: cloneFloatMap(cfg.Thresholds),
|
||
InputExcerpt: trimRunes(redactContentModerationSecrets(text), maxModerationExcerptRunes),
|
||
UpstreamLatencyMS: latency,
|
||
QueueDelayMS: queueDelay,
|
||
Error: errText,
|
||
}
|
||
}
|
||
|
||
func (s *ContentModerationService) applyFlaggedSideEffects(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog) {
|
||
if s == nil || cfg == nil || log == nil || !log.Flagged || log.UserID == nil || *log.UserID <= 0 {
|
||
return
|
||
}
|
||
count := 1
|
||
if s.repo != nil && cfg.ViolationWindowHours > 0 {
|
||
since := time.Now().Add(-time.Duration(cfg.ViolationWindowHours) * time.Hour)
|
||
if n, err := s.repo.CountFlaggedByUserSince(ctx, *log.UserID, since); err == nil {
|
||
count = n + 1
|
||
}
|
||
}
|
||
log.ViolationCount = count
|
||
autoBanJustApplied := false
|
||
if cfg.AutoBanEnabled && cfg.BanThreshold > 0 && count >= cfg.BanThreshold && s.userRepo != nil {
|
||
user, err := s.userRepo.GetByID(ctx, *log.UserID)
|
||
if err != nil {
|
||
slog.Warn("content_moderation.ban_get_user_failed", "user_id", *log.UserID, "error", err)
|
||
return
|
||
}
|
||
if user.Status != StatusDisabled {
|
||
user.Status = StatusDisabled
|
||
if err := s.userRepo.Update(ctx, user); err != nil {
|
||
slog.Warn("content_moderation.ban_update_user_failed", "user_id", *log.UserID, "error", err)
|
||
return
|
||
}
|
||
if s.authCacheInvalidator != nil {
|
||
s.authCacheInvalidator.InvalidateAuthCacheByUserID(ctx, *log.UserID)
|
||
}
|
||
autoBanJustApplied = true
|
||
}
|
||
log.AutoBanned = true
|
||
}
|
||
|
||
if s.emailService == nil || strings.TrimSpace(log.UserEmail) == "" {
|
||
return
|
||
}
|
||
emailSent := false
|
||
if cfg.EmailOnHit {
|
||
if err := s.sendViolationEmail(ctx, cfg, log); err != nil {
|
||
slog.Warn("content_moderation.email_failed", "user_id", *log.UserID, "email", log.UserEmail, "error", err)
|
||
} else {
|
||
emailSent = true
|
||
}
|
||
}
|
||
if autoBanJustApplied {
|
||
if err := s.sendAccountDisabledEmail(ctx, cfg, log); err != nil {
|
||
slog.Warn("content_moderation.ban_email_failed", "user_id", *log.UserID, "email", log.UserEmail, "error", err)
|
||
} else {
|
||
emailSent = true
|
||
}
|
||
}
|
||
log.EmailSent = emailSent
|
||
}
|
||
|
||
func (s *ContentModerationService) sendViolationEmail(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog) error {
|
||
siteName := s.siteName(ctx)
|
||
subject := fmt.Sprintf("[%s] 账户风控提醒 / Risk Control Notice", sanitizeEmailHeader(siteName))
|
||
body := buildContentModerationViolationEmailBody(siteName, log, cfg)
|
||
return s.emailService.SendEmail(ctx, log.UserEmail, subject, body)
|
||
}
|
||
|
||
func (s *ContentModerationService) sendAccountDisabledEmail(ctx context.Context, cfg *ContentModerationConfig, log *ContentModerationLog) error {
|
||
siteName := s.siteName(ctx)
|
||
subject := fmt.Sprintf("[%s] 账户已被禁用 / Account Disabled", sanitizeEmailHeader(siteName))
|
||
body := buildContentModerationAccountDisabledEmailBody(siteName, log, cfg)
|
||
return s.emailService.SendEmail(ctx, log.UserEmail, subject, body)
|
||
}
|
||
|
||
func (s *ContentModerationService) siteName(ctx context.Context) string {
|
||
if s == nil || s.settingRepo == nil {
|
||
return "Sub2API"
|
||
}
|
||
name, err := s.settingRepo.GetValue(ctx, SettingKeySiteName)
|
||
if err != nil || strings.TrimSpace(name) == "" {
|
||
return "Sub2API"
|
||
}
|
||
return strings.TrimSpace(name)
|
||
}
|
||
|
||
func defaultContentModerationConfig() *ContentModerationConfig {
|
||
return &ContentModerationConfig{
|
||
Enabled: false,
|
||
Mode: ContentModerationModePreBlock,
|
||
BaseURL: defaultContentModerationBaseURL,
|
||
Model: defaultContentModerationModel,
|
||
TimeoutMS: defaultContentModerationTimeoutMS,
|
||
SampleRate: 100,
|
||
AllGroups: true,
|
||
GroupIDs: []int64{},
|
||
RecordNonHits: false,
|
||
Thresholds: ContentModerationDefaultThresholds(),
|
||
WorkerCount: defaultContentModerationWorkerCount,
|
||
QueueSize: defaultContentModerationQueueSize,
|
||
BlockStatus: defaultContentModerationBlockHTTPStatus,
|
||
BlockMessage: defaultContentModerationBlockMessage,
|
||
EmailOnHit: true,
|
||
AutoBanEnabled: true,
|
||
BanThreshold: defaultContentModerationBanThreshold,
|
||
ViolationWindowHours: defaultContentModerationViolationWindowHours,
|
||
RetryCount: defaultContentModerationRetryCount,
|
||
HitRetentionDays: defaultContentModerationHitRetentionDays,
|
||
NonHitRetentionDays: defaultContentModerationNonHitRetentionDays,
|
||
PreHashCheckEnabled: false,
|
||
}
|
||
}
|
||
|
||
func (cfg *ContentModerationConfig) normalize() {
|
||
if cfg.APIKey != "" {
|
||
cfg.APIKeys = normalizeModerationAPIKeys(append(cfg.APIKeys, cfg.APIKey))
|
||
cfg.APIKey = ""
|
||
} else {
|
||
cfg.APIKeys = normalizeModerationAPIKeys(cfg.APIKeys)
|
||
}
|
||
if cfg.Mode == "" {
|
||
cfg.Mode = ContentModerationModePreBlock
|
||
}
|
||
if cfg.BaseURL == "" {
|
||
cfg.BaseURL = defaultContentModerationBaseURL
|
||
}
|
||
cfg.BaseURL = strings.TrimRight(strings.TrimSpace(cfg.BaseURL), "/")
|
||
if cfg.Model == "" {
|
||
cfg.Model = defaultContentModerationModel
|
||
}
|
||
cfg.Model = strings.TrimSpace(cfg.Model)
|
||
if cfg.TimeoutMS <= 0 {
|
||
cfg.TimeoutMS = defaultContentModerationTimeoutMS
|
||
}
|
||
if cfg.TimeoutMS > maxContentModerationTimeoutMS {
|
||
cfg.TimeoutMS = maxContentModerationTimeoutMS
|
||
}
|
||
if cfg.SampleRate < 0 {
|
||
cfg.SampleRate = 0
|
||
}
|
||
if cfg.SampleRate > 100 {
|
||
cfg.SampleRate = 100
|
||
}
|
||
if cfg.WorkerCount <= 0 {
|
||
cfg.WorkerCount = defaultContentModerationWorkerCount
|
||
}
|
||
if cfg.WorkerCount > maxContentModerationWorkerCount {
|
||
cfg.WorkerCount = maxContentModerationWorkerCount
|
||
}
|
||
if cfg.QueueSize <= 0 {
|
||
cfg.QueueSize = defaultContentModerationQueueSize
|
||
}
|
||
if cfg.QueueSize > maxContentModerationQueueSize {
|
||
cfg.QueueSize = maxContentModerationQueueSize
|
||
}
|
||
if strings.TrimSpace(cfg.BlockMessage) == "" {
|
||
cfg.BlockMessage = defaultContentModerationBlockMessage
|
||
}
|
||
cfg.BlockMessage = strings.TrimSpace(cfg.BlockMessage)
|
||
if cfg.BlockStatus <= 0 {
|
||
cfg.BlockStatus = defaultContentModerationBlockHTTPStatus
|
||
}
|
||
if cfg.BanThreshold <= 0 {
|
||
cfg.BanThreshold = defaultContentModerationBanThreshold
|
||
}
|
||
if cfg.ViolationWindowHours <= 0 {
|
||
cfg.ViolationWindowHours = defaultContentModerationViolationWindowHours
|
||
}
|
||
if cfg.RetryCount < 0 {
|
||
cfg.RetryCount = 0
|
||
}
|
||
if cfg.RetryCount > maxContentModerationRetryCount {
|
||
cfg.RetryCount = maxContentModerationRetryCount
|
||
}
|
||
if cfg.HitRetentionDays <= 0 {
|
||
cfg.HitRetentionDays = defaultContentModerationHitRetentionDays
|
||
}
|
||
if cfg.HitRetentionDays > maxContentModerationRetentionDays {
|
||
cfg.HitRetentionDays = maxContentModerationRetentionDays
|
||
}
|
||
if cfg.NonHitRetentionDays <= 0 {
|
||
cfg.NonHitRetentionDays = defaultContentModerationNonHitRetentionDays
|
||
}
|
||
if cfg.NonHitRetentionDays > maxContentModerationNonHitRetentionDays {
|
||
cfg.NonHitRetentionDays = maxContentModerationNonHitRetentionDays
|
||
}
|
||
cfg.GroupIDs = normalizeInt64IDs(cfg.GroupIDs)
|
||
cfg.Thresholds = mergeContentModerationThresholds(ContentModerationDefaultThresholds(), cfg.Thresholds)
|
||
}
|
||
|
||
func (cfg *ContentModerationConfig) includesGroup(groupID *int64) bool {
|
||
if cfg.AllGroups {
|
||
return true
|
||
}
|
||
if groupID == nil {
|
||
return false
|
||
}
|
||
for _, id := range cfg.GroupIDs {
|
||
if id == *groupID {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
func contentModerationLogGroupID(groupID *int64) int64 {
|
||
if groupID == nil {
|
||
return 0
|
||
}
|
||
return *groupID
|
||
}
|
||
|
||
func (cfg *ContentModerationConfig) shouldSample(hashText string) bool {
|
||
if cfg.SampleRate >= 100 {
|
||
return true
|
||
}
|
||
if cfg.SampleRate <= 0 {
|
||
return false
|
||
}
|
||
raw, err := hex.DecodeString(hashText)
|
||
if err != nil || len(raw) < 2 {
|
||
return true
|
||
}
|
||
return int(binary.BigEndian.Uint16(raw[:2])%100) < cfg.SampleRate
|
||
}
|
||
|
||
func (cfg *ContentModerationConfig) apiKeys() []string {
|
||
if cfg == nil {
|
||
return nil
|
||
}
|
||
return normalizeModerationAPIKeys(cfg.APIKeys)
|
||
}
|
||
|
||
func (s *ContentModerationService) nextUsableAPIKey(cfg *ContentModerationConfig) (string, bool) {
|
||
keys := cfg.apiKeys()
|
||
if len(keys) == 0 {
|
||
return "", false
|
||
}
|
||
now := time.Now()
|
||
for i := 0; i < len(keys); i++ {
|
||
idx := int(s.apiKeyCursor.Add(1)-1) % len(keys)
|
||
key := keys[idx]
|
||
if !s.isAPIKeyFrozen(key, now) {
|
||
return key, true
|
||
}
|
||
}
|
||
return "", false
|
||
}
|
||
|
||
func (s *ContentModerationService) isAPIKeyFrozen(key string, now time.Time) bool {
|
||
hash := moderationAPIKeyHash(key)
|
||
if hash == "" || s == nil {
|
||
return false
|
||
}
|
||
s.keyHealthMu.Lock()
|
||
defer s.keyHealthMu.Unlock()
|
||
state := s.keyHealth[hash]
|
||
return state != nil && state.FrozenUntil.After(now)
|
||
}
|
||
|
||
func (s *ContentModerationService) markAPIKeySuccess(key string, latencyMS int, httpStatus int) {
|
||
hash := moderationAPIKeyHash(key)
|
||
if hash == "" || s == nil {
|
||
return
|
||
}
|
||
s.keyHealthMu.Lock()
|
||
defer s.keyHealthMu.Unlock()
|
||
state := s.ensureAPIKeyHealthLocked(hash, maskSecretTail(key))
|
||
state.FailureCount = 0
|
||
state.SuccessCount++
|
||
state.LastError = ""
|
||
state.LastCheckedAt = time.Now()
|
||
state.FrozenUntil = time.Time{}
|
||
state.LastLatencyMS = latencyMS
|
||
state.LastHTTPStatus = httpStatus
|
||
state.LastTested = true
|
||
}
|
||
|
||
func (s *ContentModerationService) markAPIKeyError(key string, errText string, latencyMS int, httpStatus int) {
|
||
hash := moderationAPIKeyHash(key)
|
||
if hash == "" || s == nil {
|
||
return
|
||
}
|
||
s.keyHealthMu.Lock()
|
||
defer s.keyHealthMu.Unlock()
|
||
state := s.ensureAPIKeyHealthLocked(hash, maskSecretTail(key))
|
||
if contentModerationFreezeDurationForHTTPStatus(httpStatus) > 0 {
|
||
state.FailureCount++
|
||
}
|
||
state.LastError = trimRunes(errText, 180)
|
||
state.LastCheckedAt = time.Now()
|
||
state.LastLatencyMS = latencyMS
|
||
state.LastHTTPStatus = httpStatus
|
||
state.LastTested = true
|
||
if freezeDuration := contentModerationFreezeDurationForHTTPStatus(httpStatus); freezeDuration > 0 {
|
||
state.FrozenUntil = time.Now().Add(freezeDuration)
|
||
}
|
||
}
|
||
|
||
func contentModerationFreezeDurationForHTTPStatus(httpStatus int) time.Duration {
|
||
switch httpStatus {
|
||
case 0, http.StatusBadRequest:
|
||
return 0
|
||
case http.StatusUnauthorized, http.StatusForbidden:
|
||
return contentModerationKeyAuthFreezeDuration
|
||
case http.StatusTooManyRequests, 529:
|
||
return contentModerationKeyRateLimitFreezeDuration
|
||
default:
|
||
return contentModerationKeyHTTPErrorFreezeDuration
|
||
}
|
||
}
|
||
|
||
func (s *ContentModerationService) ensureAPIKeyHealthLocked(hash string, masked string) *contentModerationKeyHealth {
|
||
if s.keyHealth == nil {
|
||
s.keyHealth = make(map[string]*contentModerationKeyHealth)
|
||
}
|
||
state := s.keyHealth[hash]
|
||
if state == nil {
|
||
state = &contentModerationKeyHealth{Hash: hash}
|
||
s.keyHealth[hash] = state
|
||
}
|
||
if strings.TrimSpace(masked) != "" {
|
||
state.Masked = masked
|
||
}
|
||
return state
|
||
}
|
||
|
||
func (s *ContentModerationService) configView(cfg *ContentModerationConfig) *ContentModerationConfigView {
|
||
keys := cfg.apiKeys()
|
||
masks := make([]string, 0, len(keys))
|
||
for _, key := range keys {
|
||
masks = append(masks, maskSecretTail(key))
|
||
}
|
||
apiKeyMasked := ""
|
||
if len(masks) > 0 {
|
||
apiKeyMasked = masks[0]
|
||
}
|
||
return &ContentModerationConfigView{
|
||
Enabled: cfg.Enabled,
|
||
Mode: cfg.Mode,
|
||
BaseURL: cfg.BaseURL,
|
||
Model: cfg.Model,
|
||
APIKeyConfigured: len(keys) > 0,
|
||
APIKeyMasked: apiKeyMasked,
|
||
APIKeyCount: len(keys),
|
||
APIKeyMasks: masks,
|
||
APIKeyStatuses: s.apiKeyStatuses(keys),
|
||
TimeoutMS: cfg.TimeoutMS,
|
||
SampleRate: cfg.SampleRate,
|
||
AllGroups: cfg.AllGroups,
|
||
GroupIDs: append([]int64(nil), cfg.GroupIDs...),
|
||
RecordNonHits: cfg.RecordNonHits,
|
||
WorkerCount: cfg.WorkerCount,
|
||
QueueSize: cfg.QueueSize,
|
||
BlockStatus: cfg.BlockStatus,
|
||
BlockMessage: cfg.BlockMessage,
|
||
EmailOnHit: cfg.EmailOnHit,
|
||
AutoBanEnabled: cfg.AutoBanEnabled,
|
||
BanThreshold: cfg.BanThreshold,
|
||
ViolationWindowHours: cfg.ViolationWindowHours,
|
||
RetryCount: cfg.RetryCount,
|
||
HitRetentionDays: cfg.HitRetentionDays,
|
||
NonHitRetentionDays: cfg.NonHitRetentionDays,
|
||
PreHashCheckEnabled: cfg.PreHashCheckEnabled,
|
||
}
|
||
}
|
||
|
||
func (s *ContentModerationService) apiKeyStatuses(keys []string) []ContentModerationAPIKeyStatus {
|
||
out := make([]ContentModerationAPIKeyStatus, 0, len(keys))
|
||
for idx, key := range keys {
|
||
out = append(out, s.apiKeyStatusForHash(idx, moderationAPIKeyHash(key), maskSecretTail(key), true))
|
||
}
|
||
return out
|
||
}
|
||
|
||
func (s *ContentModerationService) apiKeyStatusForHash(index int, hash string, masked string, configured bool) ContentModerationAPIKeyStatus {
|
||
status := ContentModerationAPIKeyStatus{
|
||
Index: index,
|
||
KeyHash: hash,
|
||
Masked: masked,
|
||
Status: "unknown",
|
||
Configured: configured,
|
||
}
|
||
if hash == "" || s == nil {
|
||
return status
|
||
}
|
||
now := time.Now()
|
||
s.keyHealthMu.Lock()
|
||
defer s.keyHealthMu.Unlock()
|
||
state := s.keyHealth[hash]
|
||
if state == nil {
|
||
return status
|
||
}
|
||
status.FailureCount = state.FailureCount
|
||
status.SuccessCount = state.SuccessCount
|
||
status.LastError = state.LastError
|
||
status.LastLatencyMS = state.LastLatencyMS
|
||
status.LastHTTPStatus = state.LastHTTPStatus
|
||
status.LastTested = state.LastTested
|
||
if !state.LastCheckedAt.IsZero() {
|
||
t := state.LastCheckedAt
|
||
status.LastCheckedAt = &t
|
||
}
|
||
if state.FrozenUntil.After(now) {
|
||
t := state.FrozenUntil
|
||
status.FrozenUntil = &t
|
||
status.Status = "frozen"
|
||
return status
|
||
}
|
||
if state.LastError != "" {
|
||
status.Status = "error"
|
||
return status
|
||
}
|
||
if state.SuccessCount > 0 || state.LastTested {
|
||
status.Status = "ok"
|
||
}
|
||
return status
|
||
}
|
||
|
||
func moderationAPIKeyHash(key string) string {
|
||
key = strings.TrimSpace(key)
|
||
if key == "" {
|
||
return ""
|
||
}
|
||
sum := sha256.Sum256([]byte(key))
|
||
return hex.EncodeToString(sum[:])
|
||
}
|
||
|
||
func buildModerationTestInput(prompt string, images []string) (any, int, error) {
|
||
prompt = trimRunes(normalizeContentModerationText(prompt), maxModerationInputRunes)
|
||
normalizedImages := make([]string, 0, len(images))
|
||
for _, image := range images {
|
||
image = strings.TrimSpace(image)
|
||
if image == "" {
|
||
continue
|
||
}
|
||
if len(normalizedImages) >= maxContentModerationTestImages {
|
||
return nil, 0, infraerrors.BadRequest("TOO_MANY_MODERATION_TEST_IMAGES", fmt.Sprintf("最多上传 %d 张测试图片", maxContentModerationTestImages))
|
||
}
|
||
if err := validateModerationTestImageDataURL(image); err != nil {
|
||
return nil, 0, err
|
||
}
|
||
normalizedImages = append(normalizedImages, image)
|
||
}
|
||
if prompt == "" && len(normalizedImages) == 0 {
|
||
return "hello", 0, nil
|
||
}
|
||
if len(normalizedImages) == 0 {
|
||
return prompt, 0, nil
|
||
}
|
||
parts := make([]moderationAPIInputPart, 0, len(normalizedImages)+1)
|
||
if prompt != "" {
|
||
parts = append(parts, moderationAPIInputPart{Type: "text", Text: prompt})
|
||
}
|
||
for _, image := range normalizedImages {
|
||
parts = append(parts, moderationAPIInputPart{
|
||
Type: "image_url",
|
||
ImageURL: &moderationAPIImageURLRef{URL: image},
|
||
})
|
||
}
|
||
return parts, len(normalizedImages), nil
|
||
}
|
||
|
||
func contentModerationTestHasAuditInput(prompt string, images []string) bool {
|
||
if normalizeContentModerationText(prompt) != "" {
|
||
return true
|
||
}
|
||
for _, image := range images {
|
||
if strings.TrimSpace(image) != "" {
|
||
return true
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
func validateModerationTestImageDataURL(value string) error {
|
||
if len(value) > maxContentModerationTestImageDataURLBytes {
|
||
return infraerrors.BadRequest("MODERATION_TEST_IMAGE_TOO_LARGE", "测试图片不能超过 8MB")
|
||
}
|
||
if !strings.HasPrefix(value, "data:image/") {
|
||
return infraerrors.BadRequest("INVALID_MODERATION_TEST_IMAGE", "测试图片必须是 data:image/* base64")
|
||
}
|
||
parts := strings.SplitN(value, ",", 2)
|
||
if len(parts) != 2 || !strings.Contains(parts[0], ";base64") {
|
||
return infraerrors.BadRequest("INVALID_MODERATION_TEST_IMAGE", "测试图片必须是 base64 data URL")
|
||
}
|
||
raw, err := base64.StdEncoding.DecodeString(parts[1])
|
||
if err != nil {
|
||
return infraerrors.BadRequest("INVALID_MODERATION_TEST_IMAGE", "测试图片 base64 无效")
|
||
}
|
||
if len(raw) > maxContentModerationTestImageBytes {
|
||
return infraerrors.BadRequest("MODERATION_TEST_IMAGE_TOO_LARGE", "测试图片不能超过 8MB")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func buildContentModerationTestAuditResult(result *moderationAPIResult, thresholds map[string]float64) *ContentModerationTestAuditResult {
|
||
if result == nil {
|
||
return nil
|
||
}
|
||
scores := make(map[string]float64, len(result.CategoryScores))
|
||
for category, score := range result.CategoryScores {
|
||
scores[category] = score
|
||
}
|
||
thresholdSnapshot := mergeContentModerationThresholds(ContentModerationDefaultThresholds(), thresholds)
|
||
flagged, highestCategory, highestScore := evaluateModerationScores(scores, thresholdSnapshot)
|
||
compositeScore := highestScore
|
||
return &ContentModerationTestAuditResult{
|
||
Flagged: flagged,
|
||
HighestCategory: highestCategory,
|
||
HighestScore: highestScore,
|
||
CompositeScore: compositeScore,
|
||
CategoryScores: scores,
|
||
Thresholds: thresholdSnapshot,
|
||
}
|
||
}
|
||
|
||
type moderationAPIRequest struct {
|
||
Model string `json:"model"`
|
||
Input any `json:"input"`
|
||
}
|
||
|
||
type moderationAPIInputPart struct {
|
||
Type string `json:"type"`
|
||
Text string `json:"text,omitempty"`
|
||
ImageURL *moderationAPIImageURLRef `json:"image_url,omitempty"`
|
||
}
|
||
|
||
type moderationAPIImageURLRef struct {
|
||
URL string `json:"url"`
|
||
}
|
||
|
||
type moderationAPIResponse struct {
|
||
Results []moderationAPIResult `json:"results"`
|
||
}
|
||
|
||
type moderationAPIResult struct {
|
||
Flagged bool `json:"flagged"`
|
||
CategoryScores map[string]float64 `json:"category_scores"`
|
||
}
|
||
|
||
func evaluateModerationScores(scores map[string]float64, thresholds map[string]float64) (bool, string, float64) {
|
||
flagged := false
|
||
highestCategory := ""
|
||
highestScore := 0.0
|
||
for _, category := range contentModerationCategoryOrder {
|
||
score := scores[category]
|
||
if score > highestScore || highestCategory == "" {
|
||
highestScore = score
|
||
highestCategory = category
|
||
}
|
||
if score >= thresholds[category] {
|
||
flagged = true
|
||
}
|
||
}
|
||
for category, score := range scores {
|
||
if score > highestScore || highestCategory == "" {
|
||
highestScore = score
|
||
highestCategory = category
|
||
}
|
||
}
|
||
return flagged, highestCategory, highestScore
|
||
}
|
||
|
||
func mergeContentModerationThresholds(base map[string]float64, override map[string]float64) map[string]float64 {
|
||
out := cloneFloatMap(base)
|
||
if out == nil {
|
||
out = map[string]float64{}
|
||
}
|
||
for _, category := range contentModerationCategoryOrder {
|
||
if v, ok := override[category]; ok {
|
||
if v < 0 {
|
||
v = 0
|
||
}
|
||
if v > 1 {
|
||
v = 1
|
||
}
|
||
out[category] = v
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
func normalizeInt64IDs(ids []int64) []int64 {
|
||
if len(ids) == 0 {
|
||
return []int64{}
|
||
}
|
||
seen := make(map[int64]struct{}, len(ids))
|
||
out := make([]int64, 0, len(ids))
|
||
for _, id := range ids {
|
||
if id <= 0 {
|
||
continue
|
||
}
|
||
if _, ok := seen[id]; ok {
|
||
continue
|
||
}
|
||
seen[id] = struct{}{}
|
||
out = append(out, id)
|
||
}
|
||
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
|
||
return out
|
||
}
|
||
|
||
func normalizeModerationAPIKeys(keys []string) []string {
|
||
if len(keys) == 0 {
|
||
return []string{}
|
||
}
|
||
seen := make(map[string]struct{}, len(keys))
|
||
out := make([]string, 0, len(keys))
|
||
for _, key := range keys {
|
||
key = strings.TrimSpace(key)
|
||
if key == "" {
|
||
continue
|
||
}
|
||
if _, ok := seen[key]; ok {
|
||
continue
|
||
}
|
||
seen[key] = struct{}{}
|
||
out = append(out, key)
|
||
}
|
||
return out
|
||
}
|
||
|
||
func deleteModerationAPIKeysByHash(keys []string, hashes []string) []string {
|
||
keys = normalizeModerationAPIKeys(keys)
|
||
deleteHashes := make(map[string]struct{}, len(hashes))
|
||
for _, hash := range hashes {
|
||
hash = normalizeContentModerationHash(hash)
|
||
if hash != "" {
|
||
deleteHashes[hash] = struct{}{}
|
||
}
|
||
}
|
||
if len(deleteHashes) == 0 {
|
||
return keys
|
||
}
|
||
out := make([]string, 0, len(keys))
|
||
for _, key := range keys {
|
||
if _, ok := deleteHashes[moderationAPIKeyHash(key)]; ok {
|
||
continue
|
||
}
|
||
out = append(out, key)
|
||
}
|
||
return out
|
||
}
|
||
|
||
func normalizeContentModerationAPIKeysMode(mode string) string {
|
||
switch strings.ToLower(strings.TrimSpace(mode)) {
|
||
case contentModerationAPIKeysModeReplace:
|
||
return contentModerationAPIKeysModeReplace
|
||
default:
|
||
return contentModerationAPIKeysModeAppend
|
||
}
|
||
}
|
||
|
||
func normalizeContentModerationHash(inputHash string) string {
|
||
inputHash = strings.ToLower(strings.TrimSpace(inputHash))
|
||
if len(inputHash) != sha256.Size*2 {
|
||
return ""
|
||
}
|
||
if _, err := hex.DecodeString(inputHash); err != nil {
|
||
return ""
|
||
}
|
||
return inputHash
|
||
}
|
||
|
||
func cloneFloatMap(in map[string]float64) map[string]float64 {
|
||
if in == nil {
|
||
return map[string]float64{}
|
||
}
|
||
out := make(map[string]float64, len(in))
|
||
for k, v := range in {
|
||
out[k] = v
|
||
}
|
||
return out
|
||
}
|
||
|
||
func cloneInt64Ptr(in *int64) *int64 {
|
||
if in == nil {
|
||
return nil
|
||
}
|
||
v := *in
|
||
return &v
|
||
}
|
||
|
||
func trimRunes(text string, max int) string {
|
||
if max <= 0 {
|
||
return ""
|
||
}
|
||
runes := []rune(text)
|
||
if len(runes) <= max {
|
||
return text
|
||
}
|
||
return string(runes[:max])
|
||
}
|
||
|
||
func maskSecretTail(secret string) string {
|
||
secret = strings.TrimSpace(secret)
|
||
if secret == "" {
|
||
return ""
|
||
}
|
||
if len(secret) <= 4 {
|
||
return "****"
|
||
}
|
||
return strings.Repeat("*", 8) + secret[len(secret)-4:]
|
||
}
|