fix: merge 30 general improvements from release branch

Bug fixes:
- Detached context for GetAccountConcurrencyBatch (prevent all-zero on request cancel)
- Filter soft-deleted users in GetByGroupID
- Stripe CSP policy (allow Stripe.js in script-src and frame-src)
- WebSearch API key validation on save
- RECHARGING status in payment result success check
- Windows test fixes (logger Sync deadlock, config path escaping)

Feature enhancements:
- Webhook multi-instance dispatch (extractOutTradeNo + GetWebhookProvider)
- EasyPay mobile H5 payment (device param + PayURL2)
- SSE error propagation in WebSearch emulation
- AccountStatsCost DTO field for admin usage logs
- Plans sort by sort_order instead of created_at
- UsageMapHook for streaming response usage data
- apicompat Instructions field passthrough
- EffectiveLoadFactor for ops concurrency/metrics
- Usage billing RETURNING balance for notify system
- BulkUpdate mixed channel warning with details
- println to slog migration in auth cache
- Wire ProviderSet cleanup
- CI cache-dependency-path optimization

Frontend:
- Refund eligibility check per provider (canRequestRefund)
- Plan sort_order editing
- Dead code cleanup (simulate_claude_max, client_affinity)
- GroupsView platform switch guard
- channels features_config API type
- UsageView account_stats_cost export
This commit is contained in:
erio
2026-04-14 17:35:27 +08:00
parent f1297a3694
commit 6ac8ccde46
34 changed files with 306 additions and 118 deletions

View File

@@ -6,6 +6,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"log/slog"
"math/rand/v2"
"time"
@@ -99,7 +100,7 @@ func (s *APIKeyService) StartAuthCacheInvalidationSubscriber(ctx context.Context
s.authCacheL1.Del(cacheKey)
}); err != nil {
// Log but don't fail - L1 cache will still work, just without cross-instance invalidation
println("[Service] Warning: failed to start auth cache invalidation subscriber:", err.Error())
slog.Warn("failed to start auth cache invalidation subscriber", "error", err)
}
}

View File

@@ -81,9 +81,9 @@ type wildcardMappingEntry struct {
type channelCache struct {
// 热路径查找
pricingByGroupModel map[channelModelKey]*ChannelModelPricing // (groupID, platform, model) → 定价
wildcardByGroupPlatform map[channelGroupPlatformKey][]*wildcardPricingEntry // (groupID, platform) → 通配符定价(前缀长度降序
wildcardByGroupPlatform map[channelGroupPlatformKey][]*wildcardPricingEntry // (groupID, platform) → 通配符定价(按配置顺序,先匹配先使用
mappingByGroupModel map[channelModelKey]string // (groupID, platform, model) → 映射目标
wildcardMappingByGP map[channelGroupPlatformKey][]*wildcardMappingEntry // (groupID, platform) → 通配符映射(前缀长度降序
wildcardMappingByGP map[channelGroupPlatformKey][]*wildcardMappingEntry // (groupID, platform) → 通配符映射(按配置顺序,先匹配先使用
channelByGroupID map[int64]*Channel // groupID → 渠道
groupPlatform map[int64]string // groupID → platform
@@ -680,6 +680,7 @@ func (s *ChannelService) Create(ctx context.Context, input *CreateChannelInput)
ModelPricing: input.ModelPricing,
ModelMapping: input.ModelMapping,
Features: input.Features,
FeaturesConfig: input.FeaturesConfig,
ApplyPricingToAccountStats: input.ApplyPricingToAccountStats,
AccountStatsPricingRules: input.AccountStatsPricingRules,
}
@@ -780,6 +781,9 @@ func (s *ChannelService) applyUpdateInput(ctx context.Context, channel *Channel,
if input.BillingModelSource != "" {
channel.BillingModelSource = input.BillingModelSource
}
if input.FeaturesConfig != nil {
channel.FeaturesConfig = input.FeaturesConfig
}
if input.ApplyPricingToAccountStats != nil {
channel.ApplyPricingToAccountStats = *input.ApplyPricingToAccountStats
}
@@ -959,6 +963,7 @@ type CreateChannelInput struct {
BillingModelSource string
RestrictModels bool
Features string
FeaturesConfig map[string]any
ApplyPricingToAccountStats bool
AccountStatsPricingRules []AccountStatsPricingRule
}
@@ -974,6 +979,7 @@ type UpdateChannelInput struct {
BillingModelSource string
RestrictModels *bool
Features *string
FeaturesConfig map[string]any
ApplyPricingToAccountStats *bool
AccountStatsPricingRules *[]AccountStatsPricingRule
}

View File

@@ -343,8 +343,9 @@ func (s *ConcurrencyService) StartSlotCleanupWorker(accountRepo AccountRepositor
}()
}
// GetAccountConcurrencyBatch gets current concurrency counts for multiple accounts
// Returns a map of accountID -> current concurrency count
// GetAccountConcurrencyBatch gets current concurrency counts for multiple accounts.
// Uses a detached context with timeout to prevent HTTP request cancellation from
// causing the entire batch to fail (which would show all concurrency as 0).
func (s *ConcurrencyService) GetAccountConcurrencyBatch(ctx context.Context, accountIDs []int64) (map[int64]int, error) {
if len(accountIDs) == 0 {
return map[int64]int{}, nil
@@ -356,5 +357,11 @@ func (s *ConcurrencyService) GetAccountConcurrencyBatch(ctx context.Context, acc
}
return result, nil
}
return s.cache.GetAccountConcurrencyBatch(ctx, accountIDs)
// Use a detached context so that a cancelled HTTP request doesn't cause
// the Redis pipeline to fail and return all-zero concurrency counts.
redisCtx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
return s.cache.GetAccountConcurrencyBatch(redisCtx, accountIDs)
}

View File

@@ -214,17 +214,23 @@ func writeWebSearchStreamResponse(
) (*ForwardResult, error) {
msgID := webSearchMsgIDPrefix + uuid.New().String()
toolUseID := webSearchToolUseIDPrefix + uuid.New().String()[:16]
textSummary := buildTextSummary(query, resp.Results)
setSSEHeaders(c)
if err := writeSSEMessageStart(c.Writer, msgID, model); err != nil {
return nil, fmt.Errorf("web search emulation: SSE write: %w", err)
w := c.Writer
for _, fn := range []func() error{
func() error { return writeSSEMessageStart(w, msgID, model) },
func() error { return writeSSEServerToolUse(w, toolUseID, query, 0) },
func() error { return writeSSEToolResult(w, toolUseID, resp.Results, 1) },
func() error { return writeSSETextBlock(w, textSummary, 2) },
func() error { return writeSSEMessageEnd(w, len(textSummary)/tokenEstimateDivisor) },
} {
if err := fn(); err != nil {
slog.Warn("web search emulation: SSE write failed, stopping", "error", err)
break
}
}
writeSSEServerToolUse(c.Writer, toolUseID, query, 0)
writeSSEToolResult(c.Writer, toolUseID, resp.Results, 1)
textSummary := buildTextSummary(query, resp.Results)
writeSSETextBlock(c.Writer, textSummary, 2)
writeSSEMessageEnd(c.Writer, len(textSummary)/tokenEstimateDivisor)
c.Writer.Flush()
w.Flush()
return &ForwardResult{Model: model, Duration: time.Since(startTime), Usage: ClaudeUsage{}}, nil
}
@@ -249,7 +255,7 @@ func writeSSEMessageStart(w http.ResponseWriter, msgID, model string) error {
return flushSSEJSON(w, "message_start", evt)
}
func writeSSEServerToolUse(w http.ResponseWriter, toolUseID, query string, index int) {
func writeSSEServerToolUse(w http.ResponseWriter, toolUseID, query string, index int) error {
start := map[string]any{
"type": "content_block_start", "index": index,
"content_block": map[string]any{
@@ -257,11 +263,13 @@ func writeSSEServerToolUse(w http.ResponseWriter, toolUseID, query string, index
"name": toolNameWebSearch, "input": map[string]string{"query": query},
},
}
_ = flushSSEJSON(w, "content_block_start", start)
_ = flushSSEJSON(w, "content_block_stop", map[string]any{"type": "content_block_stop", "index": index})
if err := flushSSEJSON(w, "content_block_start", start); err != nil {
return err
}
return flushSSEJSON(w, "content_block_stop", map[string]any{"type": "content_block_stop", "index": index})
}
func writeSSEToolResult(w http.ResponseWriter, toolUseID string, results []websearch.SearchResult, index int) {
func writeSSEToolResult(w http.ResponseWriter, toolUseID string, results []websearch.SearchResult, index int) error {
start := map[string]any{
"type": "content_block_start", "index": index,
"content_block": map[string]any{
@@ -269,40 +277,48 @@ func writeSSEToolResult(w http.ResponseWriter, toolUseID string, results []webse
"content": buildSearchResultBlocks(results),
},
}
_ = flushSSEJSON(w, "content_block_start", start)
_ = flushSSEJSON(w, "content_block_stop", map[string]any{"type": "content_block_stop", "index": index})
if err := flushSSEJSON(w, "content_block_start", start); err != nil {
return err
}
return flushSSEJSON(w, "content_block_stop", map[string]any{"type": "content_block_stop", "index": index})
}
func writeSSETextBlock(w http.ResponseWriter, text string, index int) {
_ = flushSSEJSON(w, "content_block_start", map[string]any{
func writeSSETextBlock(w http.ResponseWriter, text string, index int) error {
if err := flushSSEJSON(w, "content_block_start", map[string]any{
"type": "content_block_start", "index": index,
"content_block": map[string]any{"type": "text", "text": ""},
})
_ = flushSSEJSON(w, "content_block_delta", map[string]any{
}); err != nil {
return err
}
if err := flushSSEJSON(w, "content_block_delta", map[string]any{
"type": "content_block_delta", "index": index,
"delta": map[string]string{"type": "text_delta", "text": text},
})
_ = flushSSEJSON(w, "content_block_stop", map[string]any{"type": "content_block_stop", "index": index})
}); err != nil {
return err
}
return flushSSEJSON(w, "content_block_stop", map[string]any{"type": "content_block_stop", "index": index})
}
func writeSSEMessageEnd(w http.ResponseWriter, outputTokens int) {
_ = flushSSEJSON(w, "message_delta", map[string]any{
func writeSSEMessageEnd(w http.ResponseWriter, outputTokens int) error {
if err := flushSSEJSON(w, "message_delta", map[string]any{
"type": "message_delta",
"delta": map[string]any{"stop_reason": "end_turn", "stop_sequence": nil},
"usage": map[string]int{"output_tokens": outputTokens},
})
_ = flushSSEJSON(w, "message_stop", map[string]string{"type": "message_stop"})
}); err != nil {
return err
}
return flushSSEJSON(w, "message_stop", map[string]string{"type": "message_stop"})
}
// flushSSEJSON marshals data to JSON and writes an SSE event. Returns error on marshal failure.
// flushSSEJSON marshals data to JSON and writes an SSE event.
func flushSSEJSON(w http.ResponseWriter, event string, data any) error {
b, err := json.Marshal(data)
if err != nil {
slog.Error("web search emulation: failed to marshal SSE event",
"event", event, "error", err)
return err
return fmt.Errorf("marshal: %w", err)
}
if _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event, b); err != nil {
return fmt.Errorf("write: %w", err)
}
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event, b)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}

View File

@@ -64,12 +64,9 @@ func (s *OpsService) getAccountsLoadMapBestEffort(ctx context.Context, accounts
if acc.ID <= 0 {
continue
}
c := acc.Concurrency
if c <= 0 {
c = 1
}
if prev, ok := unique[acc.ID]; !ok || c > prev {
unique[acc.ID] = c
lf := acc.EffectiveLoadFactor()
if prev, ok := unique[acc.ID]; !ok || lf > prev {
unique[acc.ID] = lf
}
}

View File

@@ -391,7 +391,7 @@ func (c *OpsMetricsCollector) collectConcurrencyQueueDepth(parentCtx context.Con
}
batch = append(batch, AccountWithConcurrency{
ID: acc.ID,
MaxConcurrency: acc.Concurrency,
MaxConcurrency: acc.EffectiveLoadFactor(),
})
}
if len(batch) == 0 {

View File

@@ -183,6 +183,15 @@ func TestOpsSystemLogSink_StartStopAndFlushSuccess(t *testing.T) {
if strings.TrimSpace(item.Message) == "" {
t.Fatalf("message should not be empty")
}
// writtenCount is incremented after BatchInsertSystemLogsFn returns,
// so poll briefly to avoid a race between the done signal and the atomic add.
deadline := time.Now().Add(time.Second)
for time.Now().Before(deadline) {
if sink.Health().WrittenCount > 0 {
break
}
time.Sleep(time.Millisecond)
}
health := sink.Health()
if health.WrittenCount == 0 {
t.Fatalf("written_count should be >0")

View File

@@ -113,11 +113,11 @@ func (s *PaymentConfigService) GetGroupInfoMap(ctx context.Context, plans []*dbe
}
func (s *PaymentConfigService) ListPlans(ctx context.Context) ([]*dbent.SubscriptionPlan, error) {
return s.entClient.SubscriptionPlan.Query().Order(subscriptionplan.ByCreatedAt()).All(ctx)
return s.entClient.SubscriptionPlan.Query().Order(subscriptionplan.BySortOrder()).All(ctx)
}
func (s *PaymentConfigService) ListPlansForSale(ctx context.Context) ([]*dbent.SubscriptionPlan, error) {
return s.entClient.SubscriptionPlan.Query().Where(subscriptionplan.ForSaleEQ(true)).Order(subscriptionplan.ByCreatedAt()).All(ctx)
return s.entClient.SubscriptionPlan.Query().Where(subscriptionplan.ForSaleEQ(true)).Order(subscriptionplan.BySortOrder()).All(ctx)
}
func (s *PaymentConfigService) CreatePlan(ctx context.Context, req CreatePlanRequest) (*dbent.SubscriptionPlan, error) {

View File

@@ -140,6 +140,16 @@ func (s *SettingService) SaveWebSearchEmulationConfig(ctx context.Context, cfg *
}
s.mergeExistingAPIKeys(ctx, cfg)
// After merge, validate all enabled providers have API keys
if cfg.Enabled {
for _, p := range cfg.Providers {
if p.APIKey == "" {
return infraerrors.BadRequest("MISSING_API_KEY",
fmt.Sprintf("provider %s has no API key configured", p.Type))
}
}
}
data, err := json.Marshal(cfg)
if err != nil {
return fmt.Errorf("websearch: marshal config: %w", err)