fix: stabilize thinking streams, multimodal parsing, and token accounting (#20)

* fix: stabilize multimodal image compatibility across OpenCode flows

Advertise vision-capable metadata in /v1/models and make model matching deterministic so OpenCode does not downgrade image support or route 4.6 models incorrectly. Expand request translation to accept OpenCode/OpenAI attachment shapes, sanitize [Image N] placeholders safely, keep image-only follow-up turns non-empty, and improve token accounting so base64 image bytes no longer inflate prompt token usage and trigger premature compaction.

* fix: deduplicate thinking streams and trim injected prompt noise

* fix: align /v1/messages thinking blocks and message_start usage

* fix: reduce repetitive thinking across tool turns

Select a single reasoning stream source, prevent chunk replay, and preserve structured tool-loop context so the model keeps continuity instead of re-planning each turn.

* fix: unify token counting on existing API endpoints

Compute usage deterministically on /v1/messages and /v1/chat/completions even when upstream omits tokenUsage.

- remove roo-only token path and keep behavior on existing endpoints
- add proxy/token_estimator.go with shared Claude/OpenAI estimators (input/system/messages/tools + output/thinking/tool calls)
- wire stream/non-stream handlers to use estimator-derived input/output usage
- update /v1/messages/count_tokens to reuse the same estimator
- keep robust upstream usage parsing/normalization in proxy/kiro.go while dropping parser-level estimate fallback

Why: direct upstream tests show metering/context events frequently arrive without tokenUsage in this environment; this made usage zero or inconsistent. Local deterministic accounting keeps reported usage stable and explicit.
This commit is contained in:
edxeth
2026-02-23 13:33:53 +01:00
committed by GitHub
parent f4049948f1
commit 6151888df5
7 changed files with 1413 additions and 326 deletions

View File

@@ -35,6 +35,32 @@ type Handler struct {
modelsCacheTime int64
}
// thinkingStreamSource records which upstream channel a single response
// stream has latched onto for its reasoning ("thinking") output.
type thinkingStreamSource int

const (
	// thinkingSourceUnknown is the zero value: no channel has been chosen yet.
	thinkingSourceUnknown thinkingStreamSource = iota
	// thinkingSourceReasoningEvent is set by allowReasoningSource.
	thinkingSourceReasoningEvent
	// thinkingSourceTagBlock is set by allowTagSource.
	thinkingSourceTagBlock
)

// allowReasoningSource reports whether reasoning text may be emitted from the
// reasoning-event channel. It refuses only when *source is already latched to
// thinkingSourceTagBlock; in every other case it latches *source to
// thinkingSourceReasoningEvent and allows the text.
func allowReasoningSource(source *thinkingStreamSource) bool {
	switch *source {
	case thinkingSourceTagBlock:
		return false
	default:
		*source = thinkingSourceReasoningEvent
		return true
	}
}

// allowTagSource reports whether reasoning text may be emitted from the tag
// channel. It refuses when *source is latched to thinkingSourceReasoningEvent,
// latches an undecided *source to thinkingSourceTagBlock, and otherwise leaves
// *source untouched. The result is true only if *source ends up as
// thinkingSourceTagBlock.
func allowTagSource(source *thinkingStreamSource) bool {
	switch *source {
	case thinkingSourceReasoningEvent:
		return false
	case thinkingSourceUnknown:
		*source = thinkingSourceTagBlock
	}
	return *source == thinkingSourceTagBlock
}
func NewHandler() *Handler {
totalReq, successReq, failedReq, totalTokens, totalCredits := config.GetStats()
h := &Handler{
@@ -248,36 +274,33 @@ func (h *Handler) handleModels(w http.ResponseWriter, r *http.Request) {
var models []map[string]interface{}
if len(cached) > 0 {
for _, m := range cached {
models = append(models, map[string]interface{}{
"id": m.ModelId, "object": "model", "owned_by": "anthropic",
})
supportsImage := modelSupportsImage(m.InputTypes)
models = append(models, buildModelInfo(m.ModelId, "anthropic", supportsImage))
// 自动生成 thinking 变体
models = append(models, map[string]interface{}{
"id": m.ModelId + thinkingSuffix, "object": "model", "owned_by": "anthropic",
})
models = append(models, buildModelInfo(m.ModelId+thinkingSuffix, "anthropic", supportsImage))
}
} else {
// fallback 静态列表
models = []map[string]interface{}{
{"id": "claude-sonnet-4.6", "object": "model", "owned_by": "anthropic"},
{"id": "claude-sonnet-4.6" + thinkingSuffix, "object": "model", "owned_by": "anthropic"},
{"id": "claude-opus-4.6", "object": "model", "owned_by": "anthropic"},
{"id": "claude-opus-4.6" + thinkingSuffix, "object": "model", "owned_by": "anthropic"},
{"id": "claude-sonnet-4.5", "object": "model", "owned_by": "anthropic"},
{"id": "claude-sonnet-4.5" + thinkingSuffix, "object": "model", "owned_by": "anthropic"},
{"id": "claude-sonnet-4", "object": "model", "owned_by": "anthropic"},
{"id": "claude-sonnet-4" + thinkingSuffix, "object": "model", "owned_by": "anthropic"},
{"id": "claude-haiku-4.5", "object": "model", "owned_by": "anthropic"},
{"id": "claude-haiku-4.5" + thinkingSuffix, "object": "model", "owned_by": "anthropic"},
{"id": "claude-opus-4.5", "object": "model", "owned_by": "anthropic"},
{"id": "claude-opus-4.5" + thinkingSuffix, "object": "model", "owned_by": "anthropic"},
buildModelInfo("claude-sonnet-4.6", "anthropic", true),
buildModelInfo("claude-sonnet-4.6"+thinkingSuffix, "anthropic", true),
buildModelInfo("claude-opus-4.6", "anthropic", true),
buildModelInfo("claude-opus-4.6"+thinkingSuffix, "anthropic", true),
buildModelInfo("claude-sonnet-4.5", "anthropic", true),
buildModelInfo("claude-sonnet-4.5"+thinkingSuffix, "anthropic", true),
buildModelInfo("claude-sonnet-4", "anthropic", true),
buildModelInfo("claude-sonnet-4"+thinkingSuffix, "anthropic", true),
buildModelInfo("claude-haiku-4.5", "anthropic", true),
buildModelInfo("claude-haiku-4.5"+thinkingSuffix, "anthropic", true),
buildModelInfo("claude-opus-4.5", "anthropic", true),
buildModelInfo("claude-opus-4.5"+thinkingSuffix, "anthropic", true),
}
}
// 添加别名模型
models = append(models,
map[string]interface{}{"id": "auto", "object": "model", "owned_by": "kiro-proxy"},
map[string]interface{}{"id": "gpt-4o", "object": "model", "owned_by": "kiro-proxy"},
map[string]interface{}{"id": "gpt-4", "object": "model", "owned_by": "kiro-proxy"},
buildModelInfo("auto", "kiro-proxy", true),
buildModelInfo("gpt-4o", "kiro-proxy", true),
buildModelInfo("gpt-4", "kiro-proxy", true),
)
w.Header().Set("Content-Type", "application/json; charset=utf-8")
@@ -287,6 +310,49 @@ func (h *Handler) handleModels(w http.ResponseWriter, r *http.Request) {
})
}
// modelSupportsImage reports whether any entry of inputTypes mentions
// "image" or "vision", compared case-insensitively as a substring.
// A nil or empty slice yields false.
func modelSupportsImage(inputTypes []string) bool {
	for i := range inputTypes {
		kind := strings.ToLower(inputTypes[i])
		switch {
		case strings.Contains(kind, "image"), strings.Contains(kind, "vision"):
			return true
		}
	}
	return false
}
// buildModelInfo assembles one model-list entry for the given id and owner.
//
// The image capability is deliberately repeated under several keys
// ("supports_image", "input_modalities", "modalities", "capabilities" and
// "info.meta.capabilities") so that one entry carries every spelling of the
// flag. Input modalities always contain "text" and additionally "image" when
// supportsImage is true; the output modality is always just "text".
func buildModelInfo(id, ownedBy string, supportsImage bool) map[string]interface{} {
	inputs := []string{"text"}
	if supportsImage {
		inputs = append(inputs, "image")
	}

	entry := map[string]interface{}{
		"id":       id,
		"object":   "model",
		"owned_by": ownedBy,
	}

	entry["supports_image"] = supportsImage
	// The same slice backs both the flat and the nested modality listing.
	entry["input_modalities"] = inputs
	entry["modalities"] = map[string][]string{
		"input":  inputs,
		"output": {"text"},
	}

	entry["capabilities"] = map[string]bool{
		"vision":       supportsImage,
		"image":        supportsImage,
		"image_vision": supportsImage,
	}
	entry["info"] = map[string]interface{}{
		"meta": map[string]interface{}{
			"capabilities": map[string]bool{
				"vision":       supportsImage,
				"image_vision": supportsImage,
			},
		},
	}

	return entry
}
// refreshModelsCache 从 Kiro API 拉取模型列表并缓存
func (h *Handler) refreshModelsCache() {
account := h.pool.GetNext()
@@ -327,50 +393,13 @@ func (h *Handler) handleCountTokens(w http.ResponseWriter, r *http.Request) {
return
}
var req struct {
Messages []struct {
Role string `json:"role"`
Content interface{} `json:"content"`
} `json:"messages"`
System interface{} `json:"system"`
}
var req ClaudeRequest
if err := json.Unmarshal(body, &req); err != nil {
h.sendClaudeError(w, 400, "invalid_request_error", "Invalid JSON")
return
}
// 简单估算 token 数量(每 4 个字符约 1 个 token
var totalChars int
for _, msg := range req.Messages {
switch content := msg.Content.(type) {
case string:
totalChars += len(content)
case []interface{}:
for _, part := range content {
if p, ok := part.(map[string]interface{}); ok {
if text, ok := p["text"].(string); ok {
totalChars += len(text)
}
}
}
}
}
// 系统提示
switch system := req.System.(type) {
case string:
totalChars += len(system)
case []interface{}:
for _, part := range system {
if p, ok := part.(map[string]interface{}); ok {
if text, ok := p["text"].(string); ok {
totalChars += len(text)
}
}
}
}
estimatedTokens := (totalChars + 3) / 4 // 向上取整
estimatedTokens := estimateClaudeRequestInputTokens(&req)
if estimatedTokens < 1 {
estimatedTokens = 1
}
@@ -381,6 +410,10 @@ func (h *Handler) handleCountTokens(w http.ResponseWriter, r *http.Request) {
// handleClaudeMessages handles Claude API requests.
// It is a thin entry point that passes the response writer and request
// unchanged to handleClaudeMessagesInternal, which does the actual work.
func (h *Handler) handleClaudeMessages(w http.ResponseWriter, r *http.Request) {
h.handleClaudeMessagesInternal(w, r)
}
func (h *Handler) handleClaudeMessagesInternal(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
http.Error(w, "Method Not Allowed", 405)
return
@@ -416,20 +449,21 @@ func (h *Handler) handleClaudeMessages(w http.ResponseWriter, r *http.Request) {
thinkingCfg := config.GetThinkingConfig()
actualModel, thinking := ParseModelAndThinking(req.Model, thinkingCfg.Suffix)
req.Model = actualModel
estimatedInputTokens := estimateClaudeRequestInputTokens(&req)
// 转换请求
kiroPayload := ClaudeToKiro(&req, thinking)
// 流式或非流式
if req.Stream {
h.handleClaudeStream(w, account, kiroPayload, req.Model)
h.handleClaudeStream(w, account, kiroPayload, req.Model, thinking, estimatedInputTokens)
} else {
h.handleClaudeNonStream(w, account, kiroPayload, req.Model)
h.handleClaudeNonStream(w, account, kiroPayload, req.Model, thinking, estimatedInputTokens)
}
}
// handleClaudeStream Claude 流式响应
func (h *Handler) handleClaudeStream(w http.ResponseWriter, account *config.Account, payload *KiroPayload, model string) {
func (h *Handler) handleClaudeStream(w http.ResponseWriter, account *config.Account, payload *KiroPayload, model string, thinking bool, estimatedInputTokens int) {
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
@@ -444,91 +478,169 @@ func (h *Handler) handleClaudeStream(w http.ResponseWriter, account *config.Acco
thinkingFormat := config.GetThinkingConfig().ClaudeFormat
msgID := "msg_" + uuid.New().String()
var contentStarted bool
var toolUseIndex int
var inputTokens, outputTokens int
var credits float64
var toolUses []KiroToolUse
var nextContentIndex int
var rawContentBuilder strings.Builder
var rawThinkingBuilder strings.Builder
activeBlockIndex := -1
activeBlockType := ""
startInputTokens := estimatedInputTokens
closeActiveBlock := func() {
if activeBlockIndex < 0 {
return
}
h.sendSSE(w, flusher, "content_block_stop", map[string]interface{}{
"type": "content_block_stop",
"index": activeBlockIndex,
})
activeBlockIndex = -1
activeBlockType = ""
}
startContentBlock := func(blockType string) {
if activeBlockType == blockType {
return
}
closeActiveBlock()
idx := nextContentIndex
nextContentIndex++
if blockType == "thinking" {
h.sendSSE(w, flusher, "content_block_start", map[string]interface{}{
"type": "content_block_start",
"index": idx,
"content_block": map[string]string{
"type": "thinking",
"thinking": "",
},
})
} else {
h.sendSSE(w, flusher, "content_block_start", map[string]interface{}{
"type": "content_block_start",
"index": idx,
"content_block": map[string]string{
"type": "text",
"text": "",
},
})
}
activeBlockIndex = idx
activeBlockType = blockType
}
// Thinking 标签解析状态
var textBuffer string
var inThinkingBlock bool
var dropTagThinking bool
var thinkingSource thinkingStreamSource
// 发送文本的辅助函数
// thinkingState: 0=普通内容, 1=thinking开始, 2=thinking中间, 3=thinking结束
sendText := func(text string, thinkingState int) {
// 确保 content_block 已开始
if !contentStarted {
h.sendSSE(w, flusher, "content_block_start", map[string]interface{}{
"type": "content_block_start",
"index": 0,
"content_block": map[string]string{"type": "text", "text": ""},
})
contentStarted = true
}
if thinkingState == 0 {
// 普通内容
if text == "" {
return
}
startContentBlock("text")
h.sendSSE(w, flusher, "content_block_delta", map[string]interface{}{
"type": "content_block_delta",
"index": 0,
"index": activeBlockIndex,
"delta": map[string]string{"type": "text_delta", "text": text},
})
} else {
// thinking 内容
return
}
if !thinking {
return
}
switch thinkingFormat {
case "think":
var outputText string
switch thinkingFormat {
case "think":
switch thinkingState {
case 1:
outputText = "<think>" + text
case 2:
outputText = text
case 3:
outputText = text + "</think>"
}
case "reasoning_content":
// Claude 格式不支持 reasoning_content直接输出内容
switch thinkingState {
case 1:
outputText = "<think>" + text
case 2:
outputText = text
default: // "thinking"
switch thinkingState {
case 1:
outputText = "<thinking>" + text
case 2:
outputText = text
case 3:
outputText = text + "</thinking>"
}
case 3:
outputText = text + "</think>"
}
if outputText == "" {
return
}
startContentBlock("text")
h.sendSSE(w, flusher, "content_block_delta", map[string]interface{}{
"type": "content_block_delta",
"index": 0,
"index": activeBlockIndex,
"delta": map[string]string{"type": "text_delta", "text": outputText},
})
case "reasoning_content":
if text == "" {
return
}
startContentBlock("text")
h.sendSSE(w, flusher, "content_block_delta", map[string]interface{}{
"type": "content_block_delta",
"index": activeBlockIndex,
"delta": map[string]string{"type": "text_delta", "text": text},
})
default:
if thinkingState == 3 && text == "" {
if activeBlockType == "thinking" {
closeActiveBlock()
}
return
}
if text != "" {
startContentBlock("thinking")
h.sendSSE(w, flusher, "content_block_delta", map[string]interface{}{
"type": "content_block_delta",
"index": activeBlockIndex,
"delta": map[string]string{"type": "thinking_delta", "thinking": text},
})
}
if thinkingState == 3 && activeBlockType == "thinking" {
closeActiveBlock()
}
}
}
// 处理文本,解析 <thinking> 标签
var thinkingStarted bool
var eventThinkingOpen bool
processClaudeText := func(text string, isThinking bool, forceFlush bool) {
if isThinking && !thinking {
return
}
// 如果是 reasoningContentEvent直接输出
if isThinking {
if !allowReasoningSource(&thinkingSource) {
return
}
if !thinkingStarted {
sendText(text, 1)
thinkingStarted = true
eventThinkingOpen = true
} else {
sendText(text, 2)
}
return
}
if eventThinkingOpen {
sendText("", 3)
eventThinkingOpen = false
thinkingStarted = false
}
textBuffer += text
for {
@@ -540,6 +652,7 @@ func (h *Handler) handleClaudeStream(w http.ResponseWriter, account *config.Acco
}
textBuffer = textBuffer[thinkingStart+10:]
inThinkingBlock = true
dropTagThinking = !allowTagSource(&thinkingSource)
thinkingStarted = false
} else if forceFlush || len([]rune(textBuffer)) > 50 {
// 使用 rune 切片来正确处理 Unicode 字符
@@ -560,25 +673,33 @@ func (h *Handler) handleClaudeStream(w http.ResponseWriter, account *config.Acco
thinkingEnd := strings.Index(textBuffer, "</thinking>")
if thinkingEnd != -1 {
content := textBuffer[:thinkingEnd]
if !thinkingStarted {
sendText(content, 1)
sendText("", 3)
} else {
sendText(content, 3)
if !dropTagThinking {
if !thinkingStarted {
sendText(content, 1)
sendText("", 3)
} else {
sendText(content, 3)
}
}
textBuffer = textBuffer[thinkingEnd+11:]
inThinkingBlock = false
dropTagThinking = false
thinkingStarted = false
} else if forceFlush {
if textBuffer != "" {
if !thinkingStarted {
sendText(textBuffer, 1)
sendText("", 3)
} else {
sendText(textBuffer, 3)
if !dropTagThinking {
if !thinkingStarted {
sendText(textBuffer, 1)
sendText("", 3)
} else {
sendText(textBuffer, 3)
}
}
textBuffer = ""
}
inThinkingBlock = false
dropTagThinking = false
thinkingStarted = false
break
} else {
// 流式输出 thinking 块内的内容
@@ -586,11 +707,13 @@ func (h *Handler) handleClaudeStream(w http.ResponseWriter, account *config.Acco
if len(runes) > 20 {
safeLen := len(runes) - 15
if safeLen > 0 {
if !thinkingStarted {
sendText(string(runes[:safeLen]), 1)
thinkingStarted = true
} else {
sendText(string(runes[:safeLen]), 2)
if !dropTagThinking {
if !thinkingStarted {
sendText(string(runes[:safeLen]), 1)
thinkingStarted = true
} else {
sendText(string(runes[:safeLen]), 2)
}
}
textBuffer = string(runes[safeLen:])
}
@@ -605,11 +728,17 @@ func (h *Handler) handleClaudeStream(w http.ResponseWriter, account *config.Acco
h.sendSSE(w, flusher, "message_start", map[string]interface{}{
"type": "message_start",
"message": map[string]interface{}{
"id": msgID,
"type": "message",
"role": "assistant",
"content": []interface{}{},
"model": model,
"id": msgID,
"type": "message",
"role": "assistant",
"content": []interface{}{},
"model": model,
"stop_reason": nil,
"stop_sequence": nil,
"usage": map[string]int{
"input_tokens": startInputTokens,
"output_tokens": 0,
},
},
})
@@ -618,27 +747,26 @@ func (h *Handler) handleClaudeStream(w http.ResponseWriter, account *config.Acco
if text == "" {
return
}
if isThinking {
rawThinkingBuilder.WriteString(text)
} else {
rawContentBuilder.WriteString(text)
}
processClaudeText(text, isThinking, false)
},
OnToolUse: func(tu KiroToolUse) {
// 先刷新缓冲区
processClaudeText("", false, true)
rawContentBuilder.WriteString(tu.Name)
if b, err := json.Marshal(tu.Input); err == nil {
rawContentBuilder.Write(b)
}
toolUses = append(toolUses, tu)
closeActiveBlock()
// 关闭文本块
if contentStarted && toolUseIndex == 0 {
h.sendSSE(w, flusher, "content_block_stop", map[string]interface{}{
"type": "content_block_stop",
"index": 0,
})
}
idx := toolUseIndex
if contentStarted {
idx = toolUseIndex + 1
}
toolUseIndex++
idx := nextContentIndex
nextContentIndex++
h.sendSSE(w, flusher, "content_block_start", map[string]interface{}{
"type": "content_block_start",
@@ -691,19 +819,27 @@ func (h *Handler) handleClaudeStream(w http.ResponseWriter, account *config.Acco
// 刷新剩余缓冲区
processClaudeText("", false, true)
if eventThinkingOpen {
sendText("", 3)
eventThinkingOpen = false
}
closeActiveBlock()
inputTokens = estimatedInputTokens
outputContent, extractedReasoning := extractThinkingFromContent(rawContentBuilder.String())
thinkingOutput := rawThinkingBuilder.String()
if thinking && thinkingOutput == "" && extractedReasoning != "" {
thinkingOutput = extractedReasoning
}
if !thinking {
thinkingOutput = ""
}
outputTokens = estimateClaudeOutputTokens(outputContent, thinkingOutput, toolUses)
h.recordSuccess(inputTokens, outputTokens, credits)
h.pool.RecordSuccess(account.ID)
h.pool.UpdateStats(account.ID, inputTokens+outputTokens, credits)
// 关闭最后的内容块
if contentStarted && toolUseIndex == 0 {
h.sendSSE(w, flusher, "content_block_stop", map[string]interface{}{
"type": "content_block_stop",
"index": 0,
})
}
// 发送 message_delta
stopReason := "end_turn"
if len(toolUses) > 0 {
@@ -787,7 +923,7 @@ func (h *Handler) recordFailure() {
}
// handleClaudeNonStream Claude 非流式响应
func (h *Handler) handleClaudeNonStream(w http.ResponseWriter, account *config.Account, payload *KiroPayload, model string) {
func (h *Handler) handleClaudeNonStream(w http.ResponseWriter, account *config.Account, payload *KiroPayload, model string, thinking bool, estimatedInputTokens int) {
var content string
var thinkingContent string
var toolUses []KiroToolUse
@@ -825,25 +961,36 @@ func (h *Handler) handleClaudeNonStream(w http.ResponseWriter, account *config.A
return
}
// 合并 thinking 内容(如果有 reasoningContentEvent 的内容)
thinkingFormat := config.GetThinkingConfig().ClaudeFormat
finalContent, extractedReasoning := extractThinkingFromContent(content)
if thinking && thinkingContent == "" && extractedReasoning != "" {
thinkingContent = extractedReasoning
}
if !thinking {
thinkingContent = ""
}
inputTokens = estimatedInputTokens
outputTokens = estimateClaudeOutputTokens(finalContent, thinkingContent, toolUses)
h.recordSuccess(inputTokens, outputTokens, credits)
h.pool.RecordSuccess(account.ID)
h.pool.UpdateStats(account.ID, inputTokens+outputTokens, credits)
// 合并 thinking 内容(如果有 reasoningContentEvent 的内容)
thinkingFormat := config.GetThinkingConfig().ClaudeFormat
finalContent := content
if thinkingContent != "" {
if thinking && thinkingContent != "" {
switch thinkingFormat {
case "think":
finalContent = "<think>" + thinkingContent + "</think>" + content
finalContent = "<think>" + thinkingContent + "</think>" + finalContent
thinkingContent = ""
case "reasoning_content":
finalContent = thinkingContent + content // Claude 格式不支持 reasoning_content直接拼接
default: // "thinking"
finalContent = "<thinking>" + thinkingContent + "</thinking>" + content
finalContent = thinkingContent + finalContent // Claude 格式不支持 reasoning_content直接拼接
thinkingContent = ""
default:
}
}
resp := KiroToClaudeResponse(finalContent, toolUses, inputTokens, outputTokens, model)
resp := KiroToClaudeResponse(finalContent, thinkingContent, toolUses, inputTokens, outputTokens, model)
w.Header().Set("Content-Type", "application/json; charset=utf-8")
json.NewEncoder(w).Encode(resp)
}
@@ -894,18 +1041,19 @@ func (h *Handler) handleOpenAIChat(w http.ResponseWriter, r *http.Request) {
thinkingCfg := config.GetThinkingConfig()
actualModel, thinking := ParseModelAndThinking(req.Model, thinkingCfg.Suffix)
req.Model = actualModel
estimatedInputTokens := estimateOpenAIRequestInputTokens(&req)
kiroPayload := OpenAIToKiro(&req, thinking)
if req.Stream {
h.handleOpenAIStream(w, account, kiroPayload, req.Model)
h.handleOpenAIStream(w, account, kiroPayload, req.Model, thinking, estimatedInputTokens)
} else {
h.handleOpenAINonStream(w, account, kiroPayload, req.Model)
h.handleOpenAINonStream(w, account, kiroPayload, req.Model, thinking, estimatedInputTokens)
}
}
// handleOpenAIStream OpenAI 流式响应
func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Account, payload *KiroPayload, model string) {
func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Account, payload *KiroPayload, model string, thinking bool, estimatedInputTokens int) {
w.Header().Set("Content-Type", "text/event-stream; charset=utf-8")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
@@ -924,10 +1072,14 @@ func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Acco
var toolCallIndex int
var inputTokens, outputTokens int
var credits float64
var rawContentBuilder strings.Builder
var rawReasoningBuilder strings.Builder
// Thinking 标签解析状态
var textBuffer string
var inThinkingBlock bool
var dropTagThinking bool
var thinkingSource thinkingStreamSource
// 发送 chunk 的辅助函数
// thinkingState: 0=普通内容, 1=thinking开始, 2=thinking中间, 3=thinking结束
@@ -939,6 +1091,9 @@ func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Acco
var chunk map[string]interface{}
if thinkingState > 0 {
if !thinking {
return
}
// thinking 内容
switch thinkingFormat {
case "thinking":
@@ -1031,19 +1186,34 @@ func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Acco
// 处理文本,解析 <thinking> 标签
// thinkingStarted 用于跟踪是否已发送开始标签
var thinkingStarted bool
var eventThinkingOpen bool
processText := func(text string, isThinking bool, forceFlush bool) {
if isThinking && !thinking {
return
}
// 如果是 reasoningContentEvent直接输出
if isThinking {
if !allowReasoningSource(&thinkingSource) {
return
}
if !thinkingStarted {
sendChunk(text, 1) // 开始
thinkingStarted = true
eventThinkingOpen = true
} else {
sendChunk(text, 2) // 中间
}
return
}
if eventThinkingOpen {
sendChunk("", 3)
eventThinkingOpen = false
thinkingStarted = false
}
textBuffer += text
for {
@@ -1057,6 +1227,7 @@ func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Acco
}
textBuffer = textBuffer[thinkingStart+10:] // 移除 <thinking>
inThinkingBlock = true
dropTagThinking = !allowTagSource(&thinkingSource)
thinkingStarted = false // 重置,准备发送新的开始标签
} else if forceFlush || len([]rune(textBuffer)) > 50 {
// 没有找到标签,安全输出(保留可能的部分标签)
@@ -1079,28 +1250,36 @@ func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Acco
if thinkingEnd != -1 {
// 输出 thinking 内容
content := textBuffer[:thinkingEnd]
if !thinkingStarted {
// 一次性输出完整内容(开始+内容+结束)
sendChunk(content, 1) // 开始
sendChunk("", 3) // 结束(空内容,只发结束标签)
} else {
// 已经开始了,发送剩余内容和结束
sendChunk(content, 3) // 结束
if !dropTagThinking {
if !thinkingStarted {
// 一次性输出完整内容(开始+内容+结束)
sendChunk(content, 1) // 开始
sendChunk("", 3) // 结束(空内容,只发结束标签)
} else {
// 已经开始了,发送剩余内容和结束
sendChunk(content, 3) // 结束
}
}
textBuffer = textBuffer[thinkingEnd+11:] // 移除 </thinking>
inThinkingBlock = false
dropTagThinking = false
thinkingStarted = false
} else if forceFlush {
// 强制刷新:输出剩余内容
if textBuffer != "" {
if !thinkingStarted {
sendChunk(textBuffer, 1) // 开始
sendChunk("", 3) // 结束
} else {
sendChunk(textBuffer, 3) // 结束
if !dropTagThinking {
if !thinkingStarted {
sendChunk(textBuffer, 1) // 开始
sendChunk("", 3) // 结束
} else {
sendChunk(textBuffer, 3) // 结束
}
}
textBuffer = ""
}
inThinkingBlock = false
dropTagThinking = false
thinkingStarted = false
break
} else {
// 流式输出 thinking 块内的内容
@@ -1108,11 +1287,13 @@ func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Acco
if len(runes) > 20 {
safeLen := len(runes) - 15 // 保留可能的 </thinking> 部分
if safeLen > 0 {
if !thinkingStarted {
sendChunk(string(runes[:safeLen]), 1) // 开始
thinkingStarted = true
} else {
sendChunk(string(runes[:safeLen]), 2) // 中间
if !dropTagThinking {
if !thinkingStarted {
sendChunk(string(runes[:safeLen]), 1) // 开始
thinkingStarted = true
} else {
sendChunk(string(runes[:safeLen]), 2) // 中间
}
}
textBuffer = string(runes[safeLen:])
}
@@ -1128,6 +1309,11 @@ func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Acco
if text == "" {
return
}
if isThinking {
rawReasoningBuilder.WriteString(text)
} else {
rawContentBuilder.WriteString(text)
}
processText(text, isThinking, false)
},
OnToolUse: func(tu KiroToolUse) {
@@ -1135,6 +1321,8 @@ func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Acco
processText("", false, true)
args, _ := json.Marshal(tu.Input)
rawContentBuilder.WriteString(tu.Name)
rawContentBuilder.Write(args)
tc := ToolCall{ID: tu.ToolUseID, Type: "function"}
tc.Function.Name = tu.Name
tc.Function.Arguments = string(args)
@@ -1187,6 +1375,25 @@ func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Acco
// 刷新剩余缓冲区
processText("", false, true)
if eventThinkingOpen {
sendChunk("", 3)
eventThinkingOpen = false
}
inputTokens = estimatedInputTokens
outputContent, extractedReasoning := extractThinkingFromContent(rawContentBuilder.String())
reasoningOutput := rawReasoningBuilder.String()
if thinking && reasoningOutput == "" && extractedReasoning != "" {
reasoningOutput = extractedReasoning
}
if !thinking {
reasoningOutput = ""
}
outputTokens = estimateApproxTokens(outputContent) + estimateApproxTokens(reasoningOutput)
for _, tc := range toolCalls {
outputTokens += estimateApproxTokens(tc.Function.Name)
outputTokens += estimateApproxTokens(tc.Function.Arguments)
}
h.recordSuccess(inputTokens, outputTokens, credits)
h.pool.RecordSuccess(account.ID)
@@ -1221,7 +1428,7 @@ func (h *Handler) handleOpenAIStream(w http.ResponseWriter, account *config.Acco
}
// handleOpenAINonStream OpenAI 非流式响应
func (h *Handler) handleOpenAINonStream(w http.ResponseWriter, account *config.Account, payload *KiroPayload, model string) {
func (h *Handler) handleOpenAINonStream(w http.ResponseWriter, account *config.Account, payload *KiroPayload, model string, thinking bool, estimatedInputTokens int) {
var content string
var reasoningContent string
var toolUses []KiroToolUse
@@ -1250,16 +1457,21 @@ func (h *Handler) handleOpenAINonStream(w http.ResponseWriter, account *config.A
return
}
// 解析 content 中的 <thinking> 标签
finalContent, extractedReasoning := extractThinkingFromContent(content)
if thinking && reasoningContent == "" && extractedReasoning != "" {
reasoningContent = extractedReasoning
} else if !thinking {
reasoningContent = ""
}
inputTokens = estimatedInputTokens
outputTokens = estimateOpenAIOutputTokens(finalContent, reasoningContent, toolUses)
h.recordSuccess(inputTokens, outputTokens, credits)
h.pool.RecordSuccess(account.ID)
h.pool.UpdateStats(account.ID, inputTokens+outputTokens, credits)
// 解析 content 中的 <thinking> 标签
finalContent, extractedReasoning := extractThinkingFromContent(content)
if extractedReasoning != "" {
reasoningContent = extractedReasoning + reasoningContent
}
thinkingFormat := config.GetThinkingConfig().OpenAIFormat
resp := KiroToOpenAIResponseWithReasoning(finalContent, reasoningContent, toolUses, inputTokens, outputTokens, model, thinkingFormat)
w.Header().Set("Content-Type", "application/json; charset=utf-8")