From 2f3acd9d22f644f90b53e0b9431f7d132935cd8b Mon Sep 17 00:00:00 2001 From: CaIon <1808837298@qq.com> Date: Mon, 14 Apr 2025 19:40:23 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=B5=81=E6=A8=A1?= =?UTF-8?q?=E5=BC=8F=E4=B8=8B=E7=9A=84SSE=E4=BF=9D=E6=B4=BB=E6=9C=BA?= =?UTF-8?q?=E5=88=B6=20#945?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- relay/channel/openai/relay-openai.go | 1 - relay/common/relay_info.go | 9 +++ relay/helper/common.go | 10 ++++ relay/helper/stream_scanner.go | 60 +++++++++++++++++-- setting/operation_setting/general_setting.go | 8 ++- web/src/components/ModelSetting.js | 2 + web/src/components/PersonalSetting.js | 35 +++++------ .../pages/Setting/Model/SettingGlobalModel.js | 42 ++++++++++--- 8 files changed, 136 insertions(+), 31 deletions(-) diff --git a/relay/channel/openai/relay-openai.go b/relay/channel/openai/relay-openai.go index cb209fed..7e06ea12 100644 --- a/relay/channel/openai/relay-openai.go +++ b/relay/channel/openai/relay-openai.go @@ -141,7 +141,6 @@ func OaiStreamHandler(c *gin.Context, resp *http.Response, info *relaycommon.Rel if err != nil { common.SysError("error handling stream format: " + err.Error()) } - info.SetFirstResponseTime() } lastStreamData = data streamItems = append(streamItems, data) diff --git a/relay/common/relay_info.go b/relay/common/relay_info.go index f10d3826..8ab97f5e 100644 --- a/relay/common/relay_info.go +++ b/relay/common/relay_info.go @@ -6,6 +6,7 @@ import ( "one-api/dto" relayconstant "one-api/relay/constant" "strings" + "sync" "time" "github.com/gin-gonic/gin" @@ -54,6 +55,7 @@ type RelayInfo struct { StartTime time.Time FirstResponseTime time.Time isFirstResponse bool + responseMutex sync.Mutex // Add mutex for protecting concurrent access //SendLastReasoningResponse bool ApiType int IsStream bool @@ -212,12 +214,19 @@ func (info *RelayInfo) SetIsStream(isStream bool) { } func (info *RelayInfo) SetFirstResponseTime() { + info.responseMutex.Lock() + defer info.responseMutex.Unlock() + if info.isFirstResponse { info.FirstResponseTime = time.Now() info.isFirstResponse = false } } +func (info *RelayInfo) HasSendResponse() bool { + return info.FirstResponseTime.After(info.StartTime) +} + type TaskRelayInfo struct { *RelayInfo Action string diff --git a/relay/helper/common.go b/relay/helper/common.go index 200846f6..ebfb6d58 100644 --- a/relay/helper/common.go +++ b/relay/helper/common.go @@ -55,6 +55,16 @@ func StringData(c *gin.Context, str string) error { return nil } +func PingData(c *gin.Context) error { + c.Writer.Write([]byte(": PING\n\n")) + if flusher, ok := c.Writer.(http.Flusher); ok { + flusher.Flush() + } else { + return errors.New("streaming error: flusher not found") + } + return nil +} + func ObjectData(c *gin.Context, object interface{}) error { if object == nil { return errors.New("object is nil") diff --git a/relay/helper/stream_scanner.go b/relay/helper/stream_scanner.go index 24b0b6d4..abb98f42 100644 --- a/relay/helper/stream_scanner.go +++ b/relay/helper/stream_scanner.go @@ -3,12 +3,15 @@ package helper import ( "bufio" "context" + "github.com/bytedance/gopkg/util/gopool" "io" "net/http" "one-api/common" "one-api/constant" relaycommon "one-api/relay/common" + "one-api/setting/operation_setting" "strings" + "sync" "time" "github.com/gin-gonic/gin" @@ -17,11 +20,12 @@ import ( const ( InitialScannerBufferSize = 1 << 20 // 1MB (1*1024*1024) MaxScannerBufferSize = 10 << 20 // 10MB (10*1024*1024) + DefaultPingInterval = 10 * time.Second ) func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo, dataHandler func(data string) bool) { - if resp == nil { + if resp == nil || dataHandler == nil { return } @@ -34,13 +38,29 @@ func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon } var ( - stopChan = make(chan bool, 2) - scanner = bufio.NewScanner(resp.Body) - ticker = time.NewTicker(streamingTimeout) + stopChan = make(chan bool, 2) + scanner = bufio.NewScanner(resp.Body) + ticker = time.NewTicker(streamingTimeout) + pingTicker *time.Ticker + writeMutex sync.Mutex // Mutex to protect concurrent writes ) + generalSettings := operation_setting.GetGeneralSetting() + pingEnabled := generalSettings.PingIntervalEnabled + pingInterval := time.Duration(generalSettings.PingIntervalSeconds) * time.Second + if pingInterval <= 0 { + pingInterval = DefaultPingInterval + } + + if pingEnabled { + pingTicker = time.NewTicker(pingInterval) + } + defer func() { ticker.Stop() + if pingTicker != nil { + pingTicker.Stop() + } close(stopChan) }() scanner.Buffer(make([]byte, InitialScannerBufferSize), MaxScannerBufferSize) @@ -51,6 +71,34 @@ func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon defer cancel() ctx = context.WithValue(ctx, "stop_chan", stopChan) + + // Handle ping data sending + if pingEnabled && pingTicker != nil { + gopool.Go(func() { + for { + select { + case <-pingTicker.C: + writeMutex.Lock() // Lock before writing + err := PingData(c) + writeMutex.Unlock() // Unlock after writing + if err != nil { + common.LogError(c, "ping data error: "+err.Error()) + common.SafeSendBool(stopChan, true) + return + } + if common.DebugEnabled { + println("ping data sent") + } + case <-ctx.Done(): + if common.DebugEnabled { + println("ping data goroutine stopped") + } + return + } + } + }) + } + common.RelayCtxGo(ctx, func() { for scanner.Scan() { ticker.Reset(streamingTimeout) @@ -70,7 +118,9 @@ func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon data = strings.TrimSuffix(data, "\"") if !strings.HasPrefix(data, "[DONE]") { info.SetFirstResponseTime() + writeMutex.Lock() // Lock before writing success := dataHandler(data) + writeMutex.Unlock() // Unlock after writing if !success { break } @@ -90,7 +140,9 @@ func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon case <-ticker.C: // 超时处理逻辑 common.LogError(c, "streaming timeout") + common.SafeSendBool(stopChan, true) case <-stopChan: // 正常结束 + common.LogInfo(c, "streaming finished") } } diff --git a/setting/operation_setting/general_setting.go b/setting/operation_setting/general_setting.go index 787f0e5f..ae0c436e 100644 --- a/setting/operation_setting/general_setting.go +++ b/setting/operation_setting/general_setting.go @@ -3,12 +3,16 @@ package operation_setting import "one-api/setting/config" type GeneralSetting struct { - DocsLink string `json:"docs_link"` + DocsLink string `json:"docs_link"` + PingIntervalEnabled bool `json:"ping_interval_enabled"` + PingIntervalSeconds int `json:"ping_interval_seconds"` } // 默认配置 var generalSetting = GeneralSetting{ - DocsLink: "https://docs.newapi.pro", + DocsLink: "https://docs.newapi.pro", + PingIntervalEnabled: false, + PingIntervalSeconds: 60, } func init() { diff --git a/web/src/components/ModelSetting.js b/web/src/components/ModelSetting.js index 4815abbc..ce89c337 100644 --- a/web/src/components/ModelSetting.js +++ b/web/src/components/ModelSetting.js @@ -18,6 +18,8 @@ const ModelSetting = () => { 'claude.default_max_tokens': '', 'claude.thinking_adapter_budget_tokens_percentage': 0.8, 'global.pass_through_request_enabled': false, + 'general_setting.ping_interval_enabled': false, + 'general_setting.ping_interval_seconds': 60, }); let [loading, setLoading] = useState(false); diff --git a/web/src/components/PersonalSetting.js b/web/src/components/PersonalSetting.js index 1053f2b0..0e470a86 100644 --- a/web/src/components/PersonalSetting.js +++ b/web/src/components/PersonalSetting.js @@ -793,23 +793,7 @@ const PersonalSetting = () => { - - -
- {t('接受未设置价格模型')} -
- handleNotificationSettingChange('acceptUnsetModelRatioModel', e.target.checked)} - > - {t('接受未设置价格模型')} - - - {t('当模型没有设置价格时仍接受调用,仅当您信任该网站时使用,可能会产生高额费用')} - -
-
-
+
{t('通知方式')} @@ -923,6 +907,23 @@ const PersonalSetting = () => {
+ +
+ {t('接受未设置价格模型')} +
+ handleNotificationSettingChange('acceptUnsetModelRatioModel', e.target.checked)} + > + {t('接受未设置价格模型')} + + + {t('当模型没有设置价格时仍接受调用,仅当您信任该网站时使用,可能会产生高额费用')} + +
+
+
+