From 89f731cb19eaaf73c4789ee8c7432c134682eedc Mon Sep 17 00:00:00 2001 From: huangzhenpc Date: Tue, 12 May 2026 09:04:11 +0800 Subject: [PATCH] feat: first-byte timeout with same-endpoint retry Upstream sometimes accepts a request (HTTP 200 headers) but stalls without sending any event-stream packet. Add a configurable timeout that counts from request dispatch until the first AWS event-stream prelude is read, and retry on the same endpoint before falling back. - Config: FirstByteTimeoutSec (default 10s, 0=disabled, range 0-300), FirstByteRetries (default 1, range 0-10), with Get/Update helpers. - kiro.go: parseEventStream signature gains onFirstByte callback, fired once when the first 12-byte prelude reads successfully. CallKiroAPI wraps each attempt in a context.WithCancel + time.AfterFunc timer that cancels the HTTP request if no event arrives before the deadline. Separate retry budgets for INVALID_MODEL_ID and first-byte timeout, tracked on the same attempt loop; maxAttempts = max(both)+1. - handler.go: /admin/api/general extended to read/write the two new fields with validation (timeout 0-300, retries 0-10). - web/index.html: General Settings card gains two numeric inputs plus CN/EN i18n and the corresponding load/save JS. --- config/config.go | 50 +++++++++++++++++++++++++++++ proxy/handler.go | 32 +++++++++++++++++++ proxy/kiro.go | 82 +++++++++++++++++++++++++++++++++++++++++++----- web/index.html | 45 ++++++++++++++++++++++---- 4 files changed, 196 insertions(+), 13 deletions(-) diff --git a/config/config.go b/config/config.go index 21b9adf..1107ebc 100644 --- a/config/config.go +++ b/config/config.go @@ -116,6 +116,8 @@ type Config struct { // General behavior settings InvalidModelRetries int `json:"invalidModelRetries,omitempty"` // Same-endpoint retry count on INVALID_MODEL_ID (default: 3) + FirstByteTimeoutSec int `json:"firstByteTimeoutSec,omitempty"` // First-byte timeout in seconds (default: 10, 0=disabled) + FirstByteRetries int `json:"firstByteRetries,omitempty"` // Same-endpoint retry count on first-byte timeout (default: 1) // Global statistics (persisted across restarts) TotalRequests int `json:"totalRequests,omitempty"` // Total API requests received @@ -493,6 +495,54 @@ func UpdateInvalidModelRetries(n int) error { return Save() } +// GetFirstByteTimeoutSec 返回首字节超时秒数(默认 10,<=0 表示禁用) +func GetFirstByteTimeoutSec() int { + cfgLock.RLock() + defer cfgLock.RUnlock() + if cfg.FirstByteTimeoutSec < 0 { + return 0 + } + if cfg.FirstByteTimeoutSec == 0 { + return 10 + } + return cfg.FirstByteTimeoutSec +} + +// UpdateFirstByteTimeoutSec 更新首字节超时秒数 +func UpdateFirstByteTimeoutSec(n int) error { + cfgLock.Lock() + defer cfgLock.Unlock() + if n < 0 { + n = 0 + } + cfg.FirstByteTimeoutSec = n + return Save() +} + +// GetFirstByteRetries 返回首字节超时同端点重试次数(默认 1) +func GetFirstByteRetries() int { + cfgLock.RLock() + defer cfgLock.RUnlock() + if cfg.FirstByteRetries < 0 { + return 0 + } + if cfg.FirstByteRetries == 0 { + return 1 + } + return cfg.FirstByteRetries +} + +// UpdateFirstByteRetries 更新首字节超时同端点重试次数 +func UpdateFirstByteRetries(n int) error { + cfgLock.Lock() + defer cfgLock.Unlock() + if n < 0 { + n = 0 + } + cfg.FirstByteRetries = n + return Save() +} + type KiroClientConfig struct { KiroVersion string SystemVersion string diff --git a/proxy/handler.go b/proxy/handler.go index 2e5ee76..bfccaf3 100644 --- a/proxy/handler.go +++ b/proxy/handler.go @@ -2935,6 +2935,8 @@ func (h *Handler) apiUpdateProxy(w http.ResponseWriter, r *http.Request) { func (h *Handler) apiGetGeneralConfig(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(map[string]interface{}{ "invalidModelRetries": config.GetInvalidModelRetries(), + "firstByteTimeoutSec": config.GetFirstByteTimeoutSec(), + "firstByteRetries": config.GetFirstByteRetries(), }) } @@ -2942,6 +2944,8 @@ func (h *Handler) apiGetGeneralConfig(w http.ResponseWriter, r *http.Request) { func (h *Handler) apiUpdateGeneralConfig(w http.ResponseWriter, r *http.Request) { var req struct { InvalidModelRetries *int `json:"invalidModelRetries"` + FirstByteTimeoutSec *int `json:"firstByteTimeoutSec"` + FirstByteRetries *int `json:"firstByteRetries"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { w.WriteHeader(400) @@ -2963,6 +2967,34 @@ func (h *Handler) apiUpdateGeneralConfig(w http.ResponseWriter, r *http.Request) } } + if req.FirstByteTimeoutSec != nil { + n := *req.FirstByteTimeoutSec + if n < 0 || n > 300 { + w.WriteHeader(400) + json.NewEncoder(w).Encode(map[string]string{"error": "firstByteTimeoutSec must be 0-300"}) + return + } + if err := config.UpdateFirstByteTimeoutSec(n); err != nil { + w.WriteHeader(500) + json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + return + } + } + + if req.FirstByteRetries != nil { + n := *req.FirstByteRetries + if n < 0 || n > 10 { + w.WriteHeader(400) + json.NewEncoder(w).Encode(map[string]string{"error": "firstByteRetries must be 0-10"}) + return + } + if err := config.UpdateFirstByteRetries(n); err != nil { + w.WriteHeader(500) + json.NewEncoder(w).Encode(map[string]string{"error": err.Error()}) + return + } + } + json.NewEncoder(w).Encode(map[string]bool{"success": true}) } diff --git a/proxy/kiro.go b/proxy/kiro.go index 4f37e4b..b64c6bd 100644 --- a/proxy/kiro.go +++ b/proxy/kiro.go @@ -4,6 +4,7 @@ package proxy import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -192,6 +193,8 @@ func CallKiroAPI(account *config.Account, payload *KiroPayload, callback *KiroSt // 根据配置排序端点 endpoints := getSortedEndpoints(config.GetPreferredEndpoint()) invalidModelRetries := config.GetInvalidModelRetries() + firstByteTimeoutSec := config.GetFirstByteTimeoutSec() + firstByteRetries := config.GetFirstByteRetries() modelID := payload.ConversationState.CurrentMessage.UserInputMessage.ModelID accountLabel := account.Email @@ -203,7 +206,7 @@ func CallKiroAPI(account *config.Account, payload *KiroPayload, callback *KiroSt for _, ep := range endpoints { endpointNames = append(endpointNames, ep.Name) } - log.Printf("[KiroAPI] request start account=%s model=%q endpoints=[%s] invalid_model_retries=%d", accountLabel, modelID, strings.Join(endpointNames, ","), invalidModelRetries) + log.Printf("[KiroAPI] request start account=%s model=%q endpoints=[%s] invalid_model_retries=%d first_byte_timeout=%ds first_byte_retries=%d", accountLabel, modelID, strings.Join(endpointNames, ","), invalidModelRetries, firstByteTimeoutSec, firstByteRetries) requestStart := time.Now() @@ -212,13 +215,22 @@ func CallKiroAPI(account *config.Account, payload *KiroPayload, callback *KiroSt // 更新 payload 中的 origin payload.ConversationState.CurrentMessage.UserInputMessage.Origin = ep.Origin - // 单端点内重试循环:INVALID_MODEL_ID 时同端点重试 + // 同端点重试上限取两种策略中更大的那个,分别记数 maxAttempts := invalidModelRetries + 1 + if firstByteRetries+1 > maxAttempts { + maxAttempts = firstByteRetries + 1 + } + invalidModelUsed := 0 + firstByteUsed := 0 shouldFallback := false + for attempt := 1; attempt <= maxAttempts; attempt++ { reqBody, _ := json.Marshal(payload) - req, err := http.NewRequest("POST", ep.URL, bytes.NewReader(reqBody)) + + ctx, cancel := context.WithCancel(context.Background()) + req, err := http.NewRequestWithContext(ctx, "POST", ep.URL, bytes.NewReader(reqBody)) if err != nil { + cancel() lastErr = err shouldFallback = true break @@ -244,6 +256,7 @@ func CallKiroAPI(account *config.Account, payload *KiroPayload, callback *KiroSt resp, err := kiroHttpStore.Load().Do(req) if err != nil { + cancel() lastErr = err log.Printf("[KiroAPI] endpoint=%s attempt=%d/%d account=%s model=%q transport_error elapsed=%s err=%v", ep.Name, attempt, maxAttempts, accountLabel, modelID, time.Since(attemptStart), err) shouldFallback = true @@ -252,6 +265,7 @@ func CallKiroAPI(account *config.Account, payload *KiroPayload, callback *KiroSt if resp.StatusCode == 429 { resp.Body.Close() + cancel() log.Printf("[KiroAPI] endpoint=%s attempt=%d/%d account=%s model=%q status=429 quota_exhausted elapsed=%s", ep.Name, attempt, maxAttempts, accountLabel, modelID, time.Since(attemptStart)) lastErr = fmt.Errorf("quota exhausted on %s", ep.Name) shouldFallback = true @@ -261,6 +275,7 @@ func CallKiroAPI(account *config.Account, payload *KiroPayload, callback *KiroSt if resp.StatusCode != 200 { errBody, _ := io.ReadAll(resp.Body) resp.Body.Close() + cancel() lastErr = fmt.Errorf("HTTP %d from %s: %s", resp.StatusCode, ep.Name, string(errBody)) bodyStr := string(errBody) @@ -272,8 +287,9 @@ func CallKiroAPI(account *config.Account, payload *KiroPayload, callback *KiroSt // INVALID_MODEL_ID: 同端点再试 if resp.StatusCode == 400 && strings.Contains(bodyStr, "INVALID_MODEL_ID") { - if attempt < maxAttempts { - log.Printf("[KiroAPI] endpoint=%s attempt=%d/%d account=%s model=%q status=400 INVALID_MODEL_ID retrying elapsed=%s", ep.Name, attempt, maxAttempts, accountLabel, modelID, time.Since(attemptStart)) + if invalidModelUsed < invalidModelRetries { + invalidModelUsed++ + log.Printf("[KiroAPI] endpoint=%s attempt=%d/%d account=%s model=%q status=400 INVALID_MODEL_ID retrying (%d/%d) elapsed=%s", ep.Name, attempt, maxAttempts, accountLabel, modelID, invalidModelUsed, invalidModelRetries, time.Since(attemptStart)) continue } log.Printf("[KiroAPI] endpoint=%s attempt=%d/%d account=%s model=%q status=400 INVALID_MODEL_ID exhausted, fallback elapsed=%s", ep.Name, attempt, maxAttempts, accountLabel, modelID, time.Since(attemptStart)) @@ -287,8 +303,50 @@ func CallKiroAPI(account *config.Account, payload *KiroPayload, callback *KiroSt } log.Printf("[KiroAPI] endpoint=%s attempt=%d/%d account=%s model=%q status=200 headers_elapsed=%s, streaming...", ep.Name, attempt, maxAttempts, accountLabel, modelID, time.Since(attemptStart)) - err = parseEventStream(resp.Body, callback) + + // 首字节超时:启动 AfterFunc 在超时时 cancel context;收到首包后 stop timer + var firstByteReceived atomic.Bool + var firstByteTimedOut atomic.Bool + var timer *time.Timer + if firstByteTimeoutSec > 0 { + timer = time.AfterFunc(time.Duration(firstByteTimeoutSec)*time.Second, func() { + if !firstByteReceived.Load() { + firstByteTimedOut.Store(true) + log.Printf("[KiroAPI] endpoint=%s attempt=%d/%d account=%s model=%q first_byte_timeout=%ds, cancelling", ep.Name, attempt, maxAttempts, accountLabel, modelID, firstByteTimeoutSec) + cancel() + } + }) + } + + onFirstByte := func() { + firstByteReceived.Store(true) + if timer != nil { + timer.Stop() + } + log.Printf("[KiroAPI] endpoint=%s attempt=%d/%d account=%s model=%q first_byte elapsed=%s", ep.Name, attempt, maxAttempts, accountLabel, modelID, time.Since(attemptStart)) + } + + err = parseEventStream(resp.Body, callback, onFirstByte) resp.Body.Close() + if timer != nil { + timer.Stop() + } + cancel() + + // 首字节超时触发且无数据到达 → 同端点重试 + if err != nil && firstByteTimedOut.Load() && !firstByteReceived.Load() { + if firstByteUsed < firstByteRetries { + firstByteUsed++ + lastErr = fmt.Errorf("first-byte timeout after %ds", firstByteTimeoutSec) + log.Printf("[KiroAPI] endpoint=%s attempt=%d/%d account=%s model=%q first_byte_timeout retrying (%d/%d)", ep.Name, attempt, maxAttempts, accountLabel, modelID, firstByteUsed, firstByteRetries) + continue + } + lastErr = fmt.Errorf("first-byte timeout after %ds on %s", firstByteTimeoutSec, ep.Name) + log.Printf("[KiroAPI] endpoint=%s attempt=%d/%d account=%s model=%q first_byte_timeout exhausted, fallback", ep.Name, attempt, maxAttempts, accountLabel, modelID) + shouldFallback = true + break + } + log.Printf("[KiroAPI] endpoint=%s account=%s model=%q done total_elapsed=%s err=%v", ep.Name, accountLabel, modelID, time.Since(requestStart), err) return err } @@ -316,13 +374,16 @@ func truncateForLog(s string, max int) string { // ==================== Event Stream 解析 ==================== // parseEventStream 解析 AWS Event Stream 二进制格式 -func parseEventStream(body io.Reader, callback *KiroStreamCallback) error { +// onFirstByte 会在读完第一个完整 event-stream 包 prelude 时触发一次(只一次), +// 供外层判断「首字节是否已收到」,以决定首字节超时时是否应该重试。 +func parseEventStream(body io.Reader, callback *KiroStreamCallback, onFirstByte func()) error { // 不使用 bufio,直接读取避免缓冲延迟 var inputTokens, outputTokens int var totalCredits float64 var currentToolUse *toolUseState var lastAssistantContent string var lastReasoningContent string + firstByteFired := false for { // Prelude: 12 bytes (total_len + headers_len + crc) @@ -335,6 +396,13 @@ func parseEventStream(body io.Reader, callback *KiroStreamCallback) error { return err } + if !firstByteFired { + firstByteFired = true + if onFirstByte != nil { + onFirstByte() + } + } + totalLength := int(prelude[0])<<24 | int(prelude[1])<<16 | int(prelude[2])<<8 | int(prelude[3]) headersLength := int(prelude[4])<<24 | int(prelude[5])<<16 | int(prelude[6])<<8 | int(prelude[7]) diff --git a/web/index.html b/web/index.html index 88b07ee..26accb1 100644 --- a/web/index.html +++ b/web/index.html @@ -977,6 +977,18 @@ +
+ + + +
+
+ + + +
@@ -1165,6 +1177,10 @@ 'settings.generalSettings': '通用设置', 'settings.invalidModelRetries': 'INVALID_MODEL_ID 同端点重试次数', 'settings.invalidModelRetriesHint': '当上游返回 INVALID_MODEL_ID(HTTP 400)时,先在当前端点重试 N 次后再 fallback 到下一个端点。默认 3,范围 0-20', + 'settings.firstByteTimeoutSec': '首字节超时(秒)', + 'settings.firstByteTimeoutHint': '请求发出后,若在 N 秒内未收到上游任何 event-stream 数据包,则判定首字节超时并在同端点重试。默认 10,设为 0 禁用。范围 0-300', + 'settings.firstByteRetries': '首字节超时同端点重试次数', + 'settings.firstByteRetriesHint': '首字节超时发生时,在当前端点重试 N 次后再 fallback 到下一个端点。默认 1,范围 0-10', 'settings.saveGeneral': '保存通用设置', 'settings.generalSaved': '通用设置已保存', 'settings.thinkingSettings': 'Thinking 模式设置', @@ -1386,6 +1402,10 @@ 'settings.generalSettings': 'General Settings', 'settings.invalidModelRetries': 'INVALID_MODEL_ID same-endpoint retries', 'settings.invalidModelRetriesHint': 'When upstream returns INVALID_MODEL_ID (HTTP 400), retry the current endpoint N times before falling back. Default 3, range 0-20', + 'settings.firstByteTimeoutSec': 'First-byte timeout (seconds)', + 'settings.firstByteTimeoutHint': 'After sending a request, if no upstream event-stream packet arrives within N seconds, treat as first-byte timeout and retry on the same endpoint. Default 10, set 0 to disable. Range 0-300', + 'settings.firstByteRetries': 'First-byte timeout same-endpoint retries', + 'settings.firstByteRetriesHint': 'On first-byte timeout, retry the current endpoint N times before falling back. Default 1, range 0-10', 'settings.saveGeneral': 'Save General Settings', 'settings.generalSaved': 'General settings saved', 'settings.thinkingSettings': 'Thinking Mode Settings', @@ -2094,19 +2114,32 @@ async function loadGeneralConfig() { const res = await fetch('/admin/api/general', { headers: { 'X-Admin-Password': password } }); const d = await res.json(); - const v = (d && typeof d.invalidModelRetries === 'number') ? d.invalidModelRetries : 3; - document.getElementById('invalidModelRetries').value = v; + document.getElementById('invalidModelRetries').value = + (d && typeof d.invalidModelRetries === 'number') ? d.invalidModelRetries : 3; + document.getElementById('firstByteTimeoutSec').value = + (d && typeof d.firstByteTimeoutSec === 'number') ? d.firstByteTimeoutSec : 10; + document.getElementById('firstByteRetries').value = + (d && typeof d.firstByteRetries === 'number') ? d.firstByteRetries : 1; } async function saveGeneralConfig() { - const raw = document.getElementById('invalidModelRetries').value; - const n = parseInt(raw, 10); + const n = parseInt(document.getElementById('invalidModelRetries').value, 10); + const t1 = parseInt(document.getElementById('firstByteTimeoutSec').value, 10); + const r1 = parseInt(document.getElementById('firstByteRetries').value, 10); if (isNaN(n) || n < 0 || n > 20) { - alert(t('common.saveFailed') + ': 0-20'); + alert(t('common.saveFailed') + ': invalidModelRetries 0-20'); + return; + } + if (isNaN(t1) || t1 < 0 || t1 > 300) { + alert(t('common.saveFailed') + ': firstByteTimeoutSec 0-300'); + return; + } + if (isNaN(r1) || r1 < 0 || r1 > 10) { + alert(t('common.saveFailed') + ': firstByteRetries 0-10'); return; } const res = await fetch('/admin/api/general', { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-Admin-Password': password }, - body: JSON.stringify({ invalidModelRetries: n }) + body: JSON.stringify({ invalidModelRetries: n, firstByteTimeoutSec: t1, firstByteRetries: r1 }) }); const d = await res.json(); if (d.success) { alert(t('settings.generalSaved')); } else { alert(t('common.saveFailed') + ': ' + d.error); }