perf: 优化负载感知调度的准确性和响应速度
基于 Codex 审查建议的性能优化。

负载批量查询优化:
- getAccountsLoadBatchScript 添加过期槽位清理
- 使用 ZREMRANGEBYSCORE 在计数前清理过期条目
- 防止过期槽位导致负载率计算偏高
- 提升负载感知调度的准确性

等待循环优化:
- waitForSlotWithPingTimeout 添加立即获取尝试
- 避免不必要的 initialBackoff 延迟
- 低负载场景下减少响应延迟

测试改进:
- 取消跳过 TestGetAccountsLoadBatch 集成测试
- 过期槽位清理应该修复了 CI 中的计数问题

影响:
- 更准确的负载感知调度决策
- 更快的槽位获取响应
- 更好的测试覆盖率
This commit is contained in:
@@ -144,6 +144,21 @@ func (h *ConcurrencyHelper) waitForSlotWithPingTimeout(c *gin.Context, slotType
|
||||
ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
|
||||
defer cancel()
|
||||
|
||||
// Try immediate acquire first (avoid unnecessary wait)
|
||||
var result *service.AcquireResult
|
||||
var err error
|
||||
if slotType == "user" {
|
||||
result, err = h.concurrencyService.AcquireUserSlot(ctx, id, maxConcurrency)
|
||||
} else {
|
||||
result, err = h.concurrencyService.AcquireAccountSlot(ctx, id, maxConcurrency)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if result.Acquired {
|
||||
return result.ReleaseFunc, nil
|
||||
}
|
||||
|
||||
// Determine if ping is needed (streaming + ping format defined)
|
||||
needPing := isStream && h.pingFormat != ""
|
||||
|
||||
|
||||
@@ -151,11 +151,17 @@ var (
|
||||
return 1
|
||||
`)
|
||||
|
||||
// getAccountsLoadBatchScript - batch load query (read-only)
|
||||
// ARGV[1] = slot TTL (seconds, retained for compatibility)
|
||||
// getAccountsLoadBatchScript - batch load query with expired slot cleanup
|
||||
// ARGV[1] = slot TTL (seconds)
|
||||
// ARGV[2..n] = accountID1, maxConcurrency1, accountID2, maxConcurrency2, ...
|
||||
getAccountsLoadBatchScript = redis.NewScript(`
|
||||
local result = {}
|
||||
local slotTTL = tonumber(ARGV[1])
|
||||
|
||||
-- Get current server time
|
||||
local timeResult = redis.call('TIME')
|
||||
local nowSeconds = tonumber(timeResult[1])
|
||||
local cutoffTime = nowSeconds - slotTTL
|
||||
|
||||
local i = 2
|
||||
while i <= #ARGV do
|
||||
@@ -163,6 +169,9 @@ var (
|
||||
local maxConcurrency = tonumber(ARGV[i + 1])
|
||||
|
||||
local slotKey = 'concurrency:account:' .. accountID
|
||||
|
||||
-- Clean up expired slots before counting
|
||||
redis.call('ZREMRANGEBYSCORE', slotKey, '-inf', cutoffTime)
|
||||
local currentConcurrency = redis.call('ZCARD', slotKey)
|
||||
|
||||
local waitKey = 'wait:account:' .. accountID
|
||||
|
||||
@@ -275,7 +275,6 @@ func (s *ConcurrencyCacheSuite) TestGetUserConcurrency_Missing() {
|
||||
}
|
||||
|
||||
func (s *ConcurrencyCacheSuite) TestGetAccountsLoadBatch() {
|
||||
s.T().Skip("TODO: Fix this test - CurrentConcurrency returns 0 instead of expected value in CI")
|
||||
// Setup: Create accounts with different load states
|
||||
account1 := int64(100)
|
||||
account2 := int64(101)
|
||||
|
||||
Reference in New Issue
Block a user