feat(账号): 添加 Sora 账号双表同步与创建
- 新增 sora_accounts 表与 accounts.extra GIN 索引\n- OpenAI OAuth 支持同时创建 Sora 账号并同步配置\n- Token 刷新同步关联 Sora 账号凭证与扩展表\n- 增加 Sora 账号连通性测试与前端开关文案
This commit is contained in:
@@ -1553,3 +1553,64 @@ func joinClauses(clauses []string, sep string) string {
|
||||
func itoa(v int) string {
|
||||
return strconv.Itoa(v)
|
||||
}
|
||||
|
||||
// FindByExtraField 根据 extra 字段中的键值对查找账号。
|
||||
// 该方法限定 platform='sora',避免误查询其他平台的账号。
|
||||
// 使用 PostgreSQL JSONB @> 操作符进行高效查询(需要 GIN 索引支持)。
|
||||
//
|
||||
// 应用场景:查找通过 linked_openai_account_id 关联的 Sora 账号。
|
||||
//
|
||||
// FindByExtraField finds accounts by key-value pairs in the extra field.
|
||||
// Limited to platform='sora' to avoid querying accounts from other platforms.
|
||||
// Uses PostgreSQL JSONB @> operator for efficient queries (requires GIN index).
|
||||
//
|
||||
// Use case: Finding Sora accounts linked via linked_openai_account_id.
|
||||
func (r *accountRepository) FindByExtraField(ctx context.Context, key string, value interface{}) ([]service.Account, error) {
|
||||
accounts, err := r.client.Account.Query().
|
||||
Where(
|
||||
dbaccount.PlatformEQ("sora"), // 限定平台为 sora
|
||||
dbaccount.DeletedAtIsNil(),
|
||||
func(s *entsql.Selector) {
|
||||
path := sqljson.Path(key)
|
||||
switch v := value.(type) {
|
||||
case string:
|
||||
preds := []*entsql.Predicate{sqljson.ValueEQ(dbaccount.FieldExtra, v, path)}
|
||||
if parsed, err := strconv.ParseInt(v, 10, 64); err == nil {
|
||||
preds = append(preds, sqljson.ValueEQ(dbaccount.FieldExtra, parsed, path))
|
||||
}
|
||||
if len(preds) == 1 {
|
||||
s.Where(preds[0])
|
||||
} else {
|
||||
s.Where(entsql.Or(preds...))
|
||||
}
|
||||
case int:
|
||||
s.Where(entsql.Or(
|
||||
sqljson.ValueEQ(dbaccount.FieldExtra, v, path),
|
||||
sqljson.ValueEQ(dbaccount.FieldExtra, strconv.Itoa(v), path),
|
||||
))
|
||||
case int64:
|
||||
s.Where(entsql.Or(
|
||||
sqljson.ValueEQ(dbaccount.FieldExtra, v, path),
|
||||
sqljson.ValueEQ(dbaccount.FieldExtra, strconv.FormatInt(v, 10), path),
|
||||
))
|
||||
case json.Number:
|
||||
if parsed, err := v.Int64(); err == nil {
|
||||
s.Where(entsql.Or(
|
||||
sqljson.ValueEQ(dbaccount.FieldExtra, parsed, path),
|
||||
sqljson.ValueEQ(dbaccount.FieldExtra, v.String(), path),
|
||||
))
|
||||
} else {
|
||||
s.Where(sqljson.ValueEQ(dbaccount.FieldExtra, v.String(), path))
|
||||
}
|
||||
default:
|
||||
s.Where(sqljson.ValueEQ(dbaccount.FieldExtra, value, path))
|
||||
}
|
||||
},
|
||||
).
|
||||
All(ctx)
|
||||
if err != nil {
|
||||
return nil, translatePersistenceError(err, service.ErrAccountNotFound, nil)
|
||||
}
|
||||
|
||||
return r.accountsToService(ctx, accounts)
|
||||
}
|
||||
|
||||
98
backend/internal/repository/sora_account_repo.go
Normal file
98
backend/internal/repository/sora_account_repo.go
Normal file
@@ -0,0 +1,98 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
|
||||
"github.com/Wei-Shaw/sub2api/internal/service"
|
||||
)
|
||||
|
||||
// soraAccountRepository 实现 service.SoraAccountRepository 接口。
|
||||
// 使用原生 SQL 操作 sora_accounts 表,因为该表不在 Ent ORM 管理范围内。
|
||||
//
|
||||
// 设计说明:
|
||||
// - sora_accounts 表是独立迁移创建的,不通过 Ent Schema 管理
|
||||
// - 使用 ON CONFLICT (account_id) DO UPDATE 实现 Upsert 语义
|
||||
// - 与 accounts 主表通过外键关联,ON DELETE CASCADE 确保级联删除
|
||||
type soraAccountRepository struct {
|
||||
sql *sql.DB
|
||||
}
|
||||
|
||||
// NewSoraAccountRepository 创建 Sora 账号扩展表仓储实例
|
||||
func NewSoraAccountRepository(sqlDB *sql.DB) service.SoraAccountRepository {
|
||||
return &soraAccountRepository{sql: sqlDB}
|
||||
}
|
||||
|
||||
// Upsert 创建或更新 Sora 账号扩展信息
|
||||
// 使用 PostgreSQL ON CONFLICT ... DO UPDATE 实现原子性 upsert
|
||||
func (r *soraAccountRepository) Upsert(ctx context.Context, accountID int64, updates map[string]any) error {
|
||||
accessToken, accessOK := updates["access_token"].(string)
|
||||
refreshToken, refreshOK := updates["refresh_token"].(string)
|
||||
sessionToken, sessionOK := updates["session_token"].(string)
|
||||
|
||||
if !accessOK || accessToken == "" || !refreshOK || refreshToken == "" {
|
||||
if !sessionOK {
|
||||
return errors.New("缺少 access_token/refresh_token,且未提供可更新字段")
|
||||
}
|
||||
result, err := r.sql.ExecContext(ctx, `
|
||||
UPDATE sora_accounts
|
||||
SET session_token = CASE WHEN $2 = '' THEN session_token ELSE $2 END,
|
||||
updated_at = NOW()
|
||||
WHERE account_id = $1
|
||||
`, accountID, sessionToken)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rows, err := result.RowsAffected()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if rows == 0 {
|
||||
return errors.New("sora_accounts 记录不存在,无法仅更新 session_token")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := r.sql.ExecContext(ctx, `
|
||||
INSERT INTO sora_accounts (account_id, access_token, refresh_token, session_token, created_at, updated_at)
|
||||
VALUES ($1, $2, $3, $4, NOW(), NOW())
|
||||
ON CONFLICT (account_id) DO UPDATE SET
|
||||
access_token = EXCLUDED.access_token,
|
||||
refresh_token = EXCLUDED.refresh_token,
|
||||
session_token = CASE WHEN EXCLUDED.session_token = '' THEN sora_accounts.session_token ELSE EXCLUDED.session_token END,
|
||||
updated_at = NOW()
|
||||
`, accountID, accessToken, refreshToken, sessionToken)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetByAccountID 根据账号 ID 获取 Sora 扩展信息
|
||||
func (r *soraAccountRepository) GetByAccountID(ctx context.Context, accountID int64) (*service.SoraAccount, error) {
|
||||
rows, err := r.sql.QueryContext(ctx, `
|
||||
SELECT account_id, access_token, refresh_token, COALESCE(session_token, '')
|
||||
FROM sora_accounts
|
||||
WHERE account_id = $1
|
||||
`, accountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
if !rows.Next() {
|
||||
return nil, nil // 记录不存在
|
||||
}
|
||||
|
||||
var sa service.SoraAccount
|
||||
if err := rows.Scan(&sa.AccountID, &sa.AccessToken, &sa.RefreshToken, &sa.SessionToken); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &sa, nil
|
||||
}
|
||||
|
||||
// Delete 删除 Sora 账号扩展信息
|
||||
func (r *soraAccountRepository) Delete(ctx context.Context, accountID int64) error {
|
||||
_, err := r.sql.ExecContext(ctx, `
|
||||
DELETE FROM sora_accounts WHERE account_id = $1
|
||||
`, accountID)
|
||||
return err
|
||||
}
|
||||
@@ -53,6 +53,7 @@ var ProviderSet = wire.NewSet(
|
||||
NewAPIKeyRepository,
|
||||
NewGroupRepository,
|
||||
NewAccountRepository,
|
||||
NewSoraAccountRepository, // Sora 账号扩展表仓储
|
||||
NewProxyRepository,
|
||||
NewRedeemCodeRepository,
|
||||
NewPromoCodeRepository,
|
||||
|
||||
@@ -594,7 +594,7 @@ func newContractDeps(t *testing.T) *contractDeps {
|
||||
settingRepo := newStubSettingRepo()
|
||||
settingService := service.NewSettingService(settingRepo, cfg)
|
||||
|
||||
adminService := service.NewAdminService(userRepo, groupRepo, &accountRepo, proxyRepo, apiKeyRepo, redeemRepo, nil, nil, nil, nil)
|
||||
adminService := service.NewAdminService(userRepo, groupRepo, &accountRepo, nil, proxyRepo, apiKeyRepo, redeemRepo, nil, nil, nil, nil)
|
||||
authHandler := handler.NewAuthHandler(cfg, nil, userService, settingService, nil)
|
||||
apiKeyHandler := handler.NewAPIKeyHandler(apiKeyService)
|
||||
usageHandler := handler.NewUsageHandler(usageService, apiKeyService)
|
||||
|
||||
@@ -25,6 +25,9 @@ type AccountRepository interface {
|
||||
// GetByCRSAccountID finds an account previously synced from CRS.
|
||||
// Returns (nil, nil) if not found.
|
||||
GetByCRSAccountID(ctx context.Context, crsAccountID string) (*Account, error)
|
||||
// FindByExtraField 根据 extra 字段中的键值对查找账号(限定 platform='sora')
|
||||
// 用于查找通过 linked_openai_account_id 关联的 Sora 账号
|
||||
FindByExtraField(ctx context.Context, key string, value interface{}) ([]Account, error)
|
||||
Update(ctx context.Context, account *Account) error
|
||||
Delete(ctx context.Context, id int64) error
|
||||
|
||||
|
||||
@@ -54,6 +54,10 @@ func (s *accountRepoStub) GetByCRSAccountID(ctx context.Context, crsAccountID st
|
||||
panic("unexpected GetByCRSAccountID call")
|
||||
}
|
||||
|
||||
func (s *accountRepoStub) FindByExtraField(ctx context.Context, key string, value interface{}) ([]Account, error) {
|
||||
panic("unexpected FindByExtraField call")
|
||||
}
|
||||
|
||||
func (s *accountRepoStub) Update(ctx context.Context, account *Account) error {
|
||||
panic("unexpected Update call")
|
||||
}
|
||||
|
||||
@@ -31,6 +31,7 @@ var sseDataPrefix = regexp.MustCompile(`^data:\s*`)
|
||||
const (
|
||||
testClaudeAPIURL = "https://api.anthropic.com/v1/messages"
|
||||
chatgptCodexAPIURL = "https://chatgpt.com/backend-api/codex/responses"
|
||||
soraMeAPIURL = "https://sora.chatgpt.com/backend/me" // Sora 用户信息接口,用于测试连接
|
||||
)
|
||||
|
||||
// TestEvent represents a SSE event for account testing
|
||||
@@ -163,6 +164,10 @@ func (s *AccountTestService) TestAccountConnection(c *gin.Context, accountID int
|
||||
return s.testAntigravityAccountConnection(c, account, modelID)
|
||||
}
|
||||
|
||||
if account.Platform == PlatformSora {
|
||||
return s.testSoraAccountConnection(c, account)
|
||||
}
|
||||
|
||||
return s.testClaudeAccountConnection(c, account, modelID)
|
||||
}
|
||||
|
||||
@@ -461,6 +466,74 @@ func (s *AccountTestService) testGeminiAccountConnection(c *gin.Context, account
|
||||
return s.processGeminiStream(c, resp.Body)
|
||||
}
|
||||
|
||||
// testSoraAccountConnection 测试 Sora 账号的连接
|
||||
// 调用 /backend/me 接口验证 access_token 有效性(不需要 Sentinel Token)
|
||||
func (s *AccountTestService) testSoraAccountConnection(c *gin.Context, account *Account) error {
|
||||
ctx := c.Request.Context()
|
||||
|
||||
authToken := account.GetCredential("access_token")
|
||||
if authToken == "" {
|
||||
return s.sendErrorAndEnd(c, "No access token available")
|
||||
}
|
||||
|
||||
// Set SSE headers
|
||||
c.Writer.Header().Set("Content-Type", "text/event-stream")
|
||||
c.Writer.Header().Set("Cache-Control", "no-cache")
|
||||
c.Writer.Header().Set("Connection", "keep-alive")
|
||||
c.Writer.Header().Set("X-Accel-Buffering", "no")
|
||||
c.Writer.Flush()
|
||||
|
||||
// Send test_start event
|
||||
s.sendEvent(c, TestEvent{Type: "test_start", Model: "sora"})
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", soraMeAPIURL, nil)
|
||||
if err != nil {
|
||||
return s.sendErrorAndEnd(c, "Failed to create request")
|
||||
}
|
||||
|
||||
// 使用 Sora 客户端标准请求头(参考 sora2api)
|
||||
req.Header.Set("Authorization", "Bearer "+authToken)
|
||||
req.Header.Set("User-Agent", "Sora/1.2026.007 (Android 15; 24122RKC7C; build 2600700)")
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
// Get proxy URL
|
||||
proxyURL := ""
|
||||
if account.ProxyID != nil && account.Proxy != nil {
|
||||
proxyURL = account.Proxy.URL()
|
||||
}
|
||||
|
||||
resp, err := s.httpUpstream.DoWithTLS(req, proxyURL, account.ID, account.Concurrency, account.IsTLSFingerprintEnabled())
|
||||
if err != nil {
|
||||
return s.sendErrorAndEnd(c, fmt.Sprintf("Request failed: %s", err.Error()))
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return s.sendErrorAndEnd(c, fmt.Sprintf("Sora API returned %d: %s", resp.StatusCode, string(body)))
|
||||
}
|
||||
|
||||
// 解析 /me 响应,提取用户信息
|
||||
var meResp map[string]any
|
||||
if err := json.Unmarshal(body, &meResp); err != nil {
|
||||
// 能收到 200 就说明 token 有效
|
||||
s.sendEvent(c, TestEvent{Type: "content", Text: "Sora connection OK (token valid)"})
|
||||
} else {
|
||||
// 尝试提取用户名或邮箱信息
|
||||
info := "Sora connection OK"
|
||||
if name, ok := meResp["name"].(string); ok && name != "" {
|
||||
info = fmt.Sprintf("Sora connection OK - User: %s", name)
|
||||
} else if email, ok := meResp["email"].(string); ok && email != "" {
|
||||
info = fmt.Sprintf("Sora connection OK - Email: %s", email)
|
||||
}
|
||||
s.sendEvent(c, TestEvent{Type: "content", Text: info})
|
||||
}
|
||||
|
||||
s.sendEvent(c, TestEvent{Type: "test_complete", Success: true})
|
||||
return nil
|
||||
}
|
||||
|
||||
// testAntigravityAccountConnection tests an Antigravity account's connection
|
||||
// 支持 Claude 和 Gemini 两种协议,使用非流式请求
|
||||
func (s *AccountTestService) testAntigravityAccountConnection(c *gin.Context, account *Account, modelID string) error {
|
||||
|
||||
@@ -272,6 +272,7 @@ type adminServiceImpl struct {
|
||||
userRepo UserRepository
|
||||
groupRepo GroupRepository
|
||||
accountRepo AccountRepository
|
||||
soraAccountRepo SoraAccountRepository // Sora 账号扩展表仓储
|
||||
proxyRepo ProxyRepository
|
||||
apiKeyRepo APIKeyRepository
|
||||
redeemCodeRepo RedeemCodeRepository
|
||||
@@ -286,6 +287,7 @@ func NewAdminService(
|
||||
userRepo UserRepository,
|
||||
groupRepo GroupRepository,
|
||||
accountRepo AccountRepository,
|
||||
soraAccountRepo SoraAccountRepository,
|
||||
proxyRepo ProxyRepository,
|
||||
apiKeyRepo APIKeyRepository,
|
||||
redeemCodeRepo RedeemCodeRepository,
|
||||
@@ -298,6 +300,7 @@ func NewAdminService(
|
||||
userRepo: userRepo,
|
||||
groupRepo: groupRepo,
|
||||
accountRepo: accountRepo,
|
||||
soraAccountRepo: soraAccountRepo,
|
||||
proxyRepo: proxyRepo,
|
||||
apiKeyRepo: apiKeyRepo,
|
||||
redeemCodeRepo: redeemCodeRepo,
|
||||
@@ -862,6 +865,18 @@ func (s *adminServiceImpl) CreateAccount(ctx context.Context, input *CreateAccou
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 如果是 Sora 平台账号,自动创建 sora_accounts 扩展表记录
|
||||
if account.Platform == PlatformSora && s.soraAccountRepo != nil {
|
||||
soraUpdates := map[string]any{
|
||||
"access_token": account.GetCredential("access_token"),
|
||||
"refresh_token": account.GetCredential("refresh_token"),
|
||||
}
|
||||
if err := s.soraAccountRepo.Upsert(ctx, account.ID, soraUpdates); err != nil {
|
||||
// 只记录警告日志,不阻塞账号创建
|
||||
log.Printf("[AdminService] 创建 sora_accounts 记录失败: account_id=%d err=%v", account.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// 绑定分组
|
||||
if len(groupIDs) > 0 {
|
||||
if err := s.accountRepo.BindGroups(ctx, account.ID, groupIDs); err != nil {
|
||||
|
||||
@@ -22,6 +22,7 @@ const (
|
||||
PlatformOpenAI = "openai"
|
||||
PlatformGemini = "gemini"
|
||||
PlatformAntigravity = "antigravity"
|
||||
PlatformSora = "sora"
|
||||
)
|
||||
|
||||
// Account type constants
|
||||
|
||||
@@ -77,6 +77,9 @@ func (m *mockAccountRepoForPlatform) Create(ctx context.Context, account *Accoun
|
||||
func (m *mockAccountRepoForPlatform) GetByCRSAccountID(ctx context.Context, crsAccountID string) (*Account, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (m *mockAccountRepoForPlatform) FindByExtraField(ctx context.Context, key string, value interface{}) ([]Account, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (m *mockAccountRepoForPlatform) Update(ctx context.Context, account *Account) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -66,6 +66,9 @@ func (m *mockAccountRepoForGemini) Create(ctx context.Context, account *Account)
|
||||
func (m *mockAccountRepoForGemini) GetByCRSAccountID(ctx context.Context, crsAccountID string) (*Account, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (m *mockAccountRepoForGemini) FindByExtraField(ctx context.Context, key string, value interface{}) ([]Account, error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (m *mockAccountRepoForGemini) Update(ctx context.Context, account *Account) error { return nil }
|
||||
func (m *mockAccountRepoForGemini) Delete(ctx context.Context, id int64) error { return nil }
|
||||
func (m *mockAccountRepoForGemini) List(ctx context.Context, params pagination.PaginationParams) ([]Account, *pagination.PaginationResult, error) {
|
||||
|
||||
40
backend/internal/service/sora_account_service.go
Normal file
40
backend/internal/service/sora_account_service.go
Normal file
@@ -0,0 +1,40 @@
|
||||
package service
|
||||
|
||||
import "context"
|
||||
|
||||
// SoraAccountRepository Sora 账号扩展表仓储接口
|
||||
// 用于管理 sora_accounts 表,与 accounts 主表形成双表结构。
|
||||
//
|
||||
// 设计说明:
|
||||
// - sora_accounts 表存储 Sora 账号的 OAuth 凭证副本
|
||||
// - Sora gateway 优先读取此表的字段以获得更好的查询性能
|
||||
// - 主表 accounts 通过 credentials JSON 字段也存储相同信息
|
||||
// - Token 刷新时需要同时更新两个表以保持数据一致性
|
||||
type SoraAccountRepository interface {
|
||||
// Upsert 创建或更新 Sora 账号扩展信息
|
||||
// accountID: 关联的 accounts.id
|
||||
// updates: 要更新的字段,支持 access_token、refresh_token、session_token
|
||||
//
|
||||
// 如果记录不存在则创建,存在则更新。
|
||||
// 用于:
|
||||
// 1. 创建 Sora 账号时初始化扩展表
|
||||
// 2. Token 刷新时同步更新扩展表
|
||||
Upsert(ctx context.Context, accountID int64, updates map[string]any) error
|
||||
|
||||
// GetByAccountID 根据账号 ID 获取 Sora 扩展信息
|
||||
// 返回 nil, nil 表示记录不存在(非错误)
|
||||
GetByAccountID(ctx context.Context, accountID int64) (*SoraAccount, error)
|
||||
|
||||
// Delete 删除 Sora 账号扩展信息
|
||||
// 通常由外键 ON DELETE CASCADE 自动处理,此方法用于手动清理
|
||||
Delete(ctx context.Context, accountID int64) error
|
||||
}
|
||||
|
||||
// SoraAccount Sora 账号扩展信息
|
||||
// 对应 sora_accounts 表,存储 Sora 账号的 OAuth 凭证副本
|
||||
type SoraAccount struct {
|
||||
AccountID int64 // 关联的 accounts.id
|
||||
AccessToken string // OAuth access_token
|
||||
RefreshToken string // OAuth refresh_token
|
||||
SessionToken string // Session token(可选,用于 ST→AT 兜底)
|
||||
}
|
||||
@@ -15,6 +15,7 @@ import (
|
||||
// 定期检查并刷新即将过期的token
|
||||
type TokenRefreshService struct {
|
||||
accountRepo AccountRepository
|
||||
soraAccountRepo SoraAccountRepository // Sora 扩展表仓储,用于双表同步
|
||||
refreshers []TokenRefresher
|
||||
cfg *config.TokenRefreshConfig
|
||||
cacheInvalidator TokenCacheInvalidator
|
||||
@@ -43,7 +44,7 @@ func NewTokenRefreshService(
|
||||
// 注册平台特定的刷新器
|
||||
s.refreshers = []TokenRefresher{
|
||||
NewClaudeTokenRefresher(oauthService),
|
||||
NewOpenAITokenRefresher(openaiOAuthService),
|
||||
NewOpenAITokenRefresher(openaiOAuthService, accountRepo),
|
||||
NewGeminiTokenRefresher(geminiOAuthService),
|
||||
NewAntigravityTokenRefresher(antigravityOAuthService),
|
||||
}
|
||||
@@ -51,6 +52,19 @@ func NewTokenRefreshService(
|
||||
return s
|
||||
}
|
||||
|
||||
// SetSoraAccountRepo 设置 Sora 账号扩展表仓储
|
||||
// 用于在 OpenAI Token 刷新时同步更新 sora_accounts 表
|
||||
// 需要在 Start() 之前调用
|
||||
func (s *TokenRefreshService) SetSoraAccountRepo(repo SoraAccountRepository) {
|
||||
s.soraAccountRepo = repo
|
||||
// 将 soraAccountRepo 注入到 OpenAITokenRefresher
|
||||
for _, refresher := range s.refreshers {
|
||||
if openaiRefresher, ok := refresher.(*OpenAITokenRefresher); ok {
|
||||
openaiRefresher.SetSoraAccountRepo(repo)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start 启动后台刷新服务
|
||||
func (s *TokenRefreshService) Start() {
|
||||
if !s.cfg.Enabled {
|
||||
|
||||
@@ -2,6 +2,7 @@ package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
@@ -82,16 +83,26 @@ func (r *ClaudeTokenRefresher) Refresh(ctx context.Context, account *Account) (m
|
||||
|
||||
// OpenAITokenRefresher 处理 OpenAI OAuth token刷新
|
||||
type OpenAITokenRefresher struct {
|
||||
openaiOAuthService *OpenAIOAuthService
|
||||
openaiOAuthService *OpenAIOAuthService
|
||||
accountRepo AccountRepository
|
||||
soraAccountRepo SoraAccountRepository // Sora 扩展表仓储,用于双表同步
|
||||
}
|
||||
|
||||
// NewOpenAITokenRefresher 创建 OpenAI token刷新器
|
||||
func NewOpenAITokenRefresher(openaiOAuthService *OpenAIOAuthService) *OpenAITokenRefresher {
|
||||
func NewOpenAITokenRefresher(openaiOAuthService *OpenAIOAuthService, accountRepo AccountRepository) *OpenAITokenRefresher {
|
||||
return &OpenAITokenRefresher{
|
||||
openaiOAuthService: openaiOAuthService,
|
||||
accountRepo: accountRepo,
|
||||
}
|
||||
}
|
||||
|
||||
// SetSoraAccountRepo 设置 Sora 账号扩展表仓储
|
||||
// 用于在 Token 刷新时同步更新 sora_accounts 表
|
||||
// 如果未设置,syncLinkedSoraAccounts 只会更新 accounts.credentials
|
||||
func (r *OpenAITokenRefresher) SetSoraAccountRepo(repo SoraAccountRepository) {
|
||||
r.soraAccountRepo = repo
|
||||
}
|
||||
|
||||
// CanRefresh 检查是否能处理此账号
|
||||
// 只处理 openai 平台的 oauth 类型账号
|
||||
func (r *OpenAITokenRefresher) CanRefresh(account *Account) bool {
|
||||
@@ -112,6 +123,7 @@ func (r *OpenAITokenRefresher) NeedsRefresh(account *Account, refreshWindow time
|
||||
|
||||
// Refresh 执行token刷新
|
||||
// 保留原有credentials中的所有字段,只更新token相关字段
|
||||
// 刷新成功后,异步同步关联的 Sora 账号
|
||||
func (r *OpenAITokenRefresher) Refresh(ctx context.Context, account *Account) (map[string]any, error) {
|
||||
tokenInfo, err := r.openaiOAuthService.RefreshAccountToken(ctx, account)
|
||||
if err != nil {
|
||||
@@ -128,5 +140,68 @@ func (r *OpenAITokenRefresher) Refresh(ctx context.Context, account *Account) (m
|
||||
}
|
||||
}
|
||||
|
||||
// 异步同步关联的 Sora 账号(不阻塞主流程)
|
||||
if r.accountRepo != nil {
|
||||
go r.syncLinkedSoraAccounts(context.Background(), account.ID, newCredentials)
|
||||
}
|
||||
|
||||
return newCredentials, nil
|
||||
}
|
||||
|
||||
// syncLinkedSoraAccounts 同步关联的 Sora 账号的 token(双表同步)
|
||||
// 该方法异步执行,失败只记录日志,不影响主流程
|
||||
//
|
||||
// 同步策略:
|
||||
// 1. 更新 accounts.credentials(主表)
|
||||
// 2. 更新 sora_accounts 扩展表(如果 soraAccountRepo 已设置)
|
||||
//
|
||||
// 超时控制:30 秒,防止数据库阻塞导致 goroutine 泄漏
|
||||
func (r *OpenAITokenRefresher) syncLinkedSoraAccounts(ctx context.Context, openaiAccountID int64, newCredentials map[string]any) {
|
||||
// 添加超时控制,防止 goroutine 泄漏
|
||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
// 1. 查找所有关联的 Sora 账号(限定 platform='sora')
|
||||
soraAccounts, err := r.accountRepo.FindByExtraField(ctx, "linked_openai_account_id", openaiAccountID)
|
||||
if err != nil {
|
||||
log.Printf("[TokenSync] 查找关联 Sora 账号失败: openai_account_id=%d err=%v", openaiAccountID, err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(soraAccounts) == 0 {
|
||||
// 没有关联的 Sora 账号,直接返回
|
||||
return
|
||||
}
|
||||
|
||||
// 2. 同步更新每个 Sora 账号的双表数据
|
||||
for _, soraAccount := range soraAccounts {
|
||||
// 2.1 更新 accounts.credentials(主表)
|
||||
soraAccount.Credentials["access_token"] = newCredentials["access_token"]
|
||||
soraAccount.Credentials["refresh_token"] = newCredentials["refresh_token"]
|
||||
if expiresAt, ok := newCredentials["expires_at"]; ok {
|
||||
soraAccount.Credentials["expires_at"] = expiresAt
|
||||
}
|
||||
|
||||
if err := r.accountRepo.Update(ctx, &soraAccount); err != nil {
|
||||
log.Printf("[TokenSync] 更新 Sora accounts 表失败: sora_account_id=%d openai_account_id=%d err=%v",
|
||||
soraAccount.ID, openaiAccountID, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// 2.2 更新 sora_accounts 扩展表(如果仓储已设置)
|
||||
if r.soraAccountRepo != nil {
|
||||
soraUpdates := map[string]any{
|
||||
"access_token": newCredentials["access_token"],
|
||||
"refresh_token": newCredentials["refresh_token"],
|
||||
}
|
||||
if err := r.soraAccountRepo.Upsert(ctx, soraAccount.ID, soraUpdates); err != nil {
|
||||
log.Printf("[TokenSync] 更新 sora_accounts 表失败: account_id=%d openai_account_id=%d err=%v",
|
||||
soraAccount.ID, openaiAccountID, err)
|
||||
// 继续处理其他账号,不中断
|
||||
}
|
||||
}
|
||||
|
||||
log.Printf("[TokenSync] 成功同步 Sora 账号 token: sora_account_id=%d openai_account_id=%d dual_table=%v",
|
||||
soraAccount.ID, openaiAccountID, r.soraAccountRepo != nil)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ func ProvideEmailQueueService(emailService *EmailService) *EmailQueueService {
|
||||
// ProvideTokenRefreshService creates and starts TokenRefreshService
|
||||
func ProvideTokenRefreshService(
|
||||
accountRepo AccountRepository,
|
||||
soraAccountRepo SoraAccountRepository, // Sora 扩展表仓储,用于双表同步
|
||||
oauthService *OAuthService,
|
||||
openaiOAuthService *OpenAIOAuthService,
|
||||
geminiOAuthService *GeminiOAuthService,
|
||||
@@ -47,6 +48,8 @@ func ProvideTokenRefreshService(
|
||||
cfg *config.Config,
|
||||
) *TokenRefreshService {
|
||||
svc := NewTokenRefreshService(accountRepo, oauthService, openaiOAuthService, geminiOAuthService, antigravityOAuthService, cacheInvalidator, cfg)
|
||||
// 注入 Sora 账号扩展表仓储,用于 OpenAI Token 刷新时同步 sora_accounts 表
|
||||
svc.SetSoraAccountRepo(soraAccountRepo)
|
||||
svc.Start()
|
||||
return svc
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user