重构项目结构,将 orm 和 rds 包迁移到 web/globals
This commit is contained in:
@@ -3,9 +3,9 @@ package services
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"platform/pkg/orm"
|
||||
auth2 "platform/web/auth"
|
||||
client2 "platform/web/domains/client"
|
||||
"platform/web/globals/orm"
|
||||
m "platform/web/models"
|
||||
q "platform/web/queries"
|
||||
"time"
|
||||
|
||||
@@ -10,14 +10,13 @@ import (
|
||||
"math"
|
||||
"math/rand/v2"
|
||||
"platform/pkg/env"
|
||||
"platform/pkg/orm"
|
||||
"platform/pkg/rds"
|
||||
"platform/pkg/u"
|
||||
"platform/web/auth"
|
||||
"platform/web/core"
|
||||
channel2 "platform/web/domains/channel"
|
||||
proxy2 "platform/web/domains/proxy"
|
||||
g "platform/web/globals"
|
||||
"platform/web/globals/orm"
|
||||
m "platform/web/models"
|
||||
q "platform/web/queries"
|
||||
"strconv"
|
||||
@@ -652,7 +651,7 @@ func cache(ctx context.Context, channels []*m.Channel) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
pipe := rds.Client.TxPipeline()
|
||||
pipe := g.Redis.TxPipeline()
|
||||
|
||||
zList := make([]redis.Z, 0, len(channels))
|
||||
for _, channel := range channels {
|
||||
@@ -686,7 +685,7 @@ func deleteCache(ctx context.Context, channels []*m.Channel) error {
|
||||
for i := range channels {
|
||||
keys[i] = chKey(channels[i])
|
||||
}
|
||||
_, err := rds.Client.Del(ctx, keys...).Result()
|
||||
_, err := g.Redis.Del(ctx, keys...).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -4,11 +4,11 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"platform/pkg/orm"
|
||||
"platform/pkg/testutil"
|
||||
"platform/web/auth"
|
||||
g "platform/web/globals"
|
||||
"platform/web/globals/orm"
|
||||
"platform/web/models"
|
||||
testutil2 "platform/web/testutil"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
@@ -101,7 +101,7 @@ func Test_chKey(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_cache(t *testing.T) {
|
||||
mr := testutil.SetupRedisTest(t)
|
||||
mr := testutil2.SetupRedisTest(t)
|
||||
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
@@ -202,7 +202,7 @@ func Test_cache(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_deleteCache(t *testing.T) {
|
||||
mr := testutil.SetupRedisTest(t)
|
||||
mr := testutil2.SetupRedisTest(t)
|
||||
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
@@ -270,10 +270,10 @@ func Test_deleteCache(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_channelService_CreateChannel(t *testing.T) {
|
||||
mr := testutil.SetupRedisTest(t)
|
||||
db := testutil.SetupDBTest(t)
|
||||
mc := testutil.SetupCloudClientMock(t)
|
||||
mg := testutil.SetupGatewayClientMock(t)
|
||||
mr := testutil2.SetupRedisTest(t)
|
||||
db := testutil2.SetupDBTest(t)
|
||||
mc := testutil2.SetupCloudClientMock(t)
|
||||
mg := testutil2.SetupGatewayClientMock(t)
|
||||
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
@@ -390,7 +390,7 @@ func Test_channelService_CreateChannel(t *testing.T) {
|
||||
return nil
|
||||
}
|
||||
|
||||
mg.PortConfigsMock = func(c *testutil.MockGatewayClient, params []g.PortConfigsReq) error {
|
||||
mg.PortConfigsMock = func(c *testutil2.MockGatewayClient, params []g.PortConfigsReq) error {
|
||||
if c.Host != proxy.Host {
|
||||
return fmt.Errorf("代理主机不符合预期: %s", c.Host)
|
||||
}
|
||||
@@ -540,7 +540,7 @@ func Test_channelService_CreateChannel(t *testing.T) {
|
||||
return nil
|
||||
}
|
||||
|
||||
mg.PortConfigsMock = func(c *testutil.MockGatewayClient, params []g.PortConfigsReq) error {
|
||||
mg.PortConfigsMock = func(c *testutil2.MockGatewayClient, params []g.PortConfigsReq) error {
|
||||
if c.Host != proxy.Host {
|
||||
return fmt.Errorf("代理主机不符合预期: %s", c.Host)
|
||||
}
|
||||
@@ -684,7 +684,7 @@ func Test_channelService_CreateChannel(t *testing.T) {
|
||||
return nil
|
||||
}
|
||||
|
||||
mg.PortConfigsMock = func(c *testutil.MockGatewayClient, params []g.PortConfigsReq) error {
|
||||
mg.PortConfigsMock = func(c *testutil2.MockGatewayClient, params []g.PortConfigsReq) error {
|
||||
if c.Host != proxy.Host {
|
||||
return fmt.Errorf("代理主机不符合预期: %s", c.Host)
|
||||
}
|
||||
@@ -961,10 +961,10 @@ func Test_channelService_CreateChannel(t *testing.T) {
|
||||
}
|
||||
|
||||
func Test_channelService_RemoveChannels(t *testing.T) {
|
||||
mr := testutil.SetupRedisTest(t)
|
||||
md := testutil.SetupDBTest(t)
|
||||
mg := testutil.SetupGatewayClientMock(t)
|
||||
mc := testutil.SetupCloudClientMock(t)
|
||||
mr := testutil2.SetupRedisTest(t)
|
||||
md := testutil2.SetupDBTest(t)
|
||||
mg := testutil2.SetupGatewayClientMock(t)
|
||||
mc := testutil2.SetupCloudClientMock(t)
|
||||
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
@@ -1055,7 +1055,7 @@ func Test_channelService_RemoveChannels(t *testing.T) {
|
||||
}
|
||||
|
||||
// 模拟网关客户端的响应
|
||||
mg.PortActiveMock = func(m *testutil.MockGatewayClient, param ...g.PortActiveReq) (map[string]g.PortData, error) {
|
||||
mg.PortActiveMock = func(m *testutil2.MockGatewayClient, param ...g.PortActiveReq) (map[string]g.PortData, error) {
|
||||
switch {
|
||||
case m.Host == proxy.Host:
|
||||
return map[string]g.PortData{
|
||||
@@ -1069,7 +1069,7 @@ func Test_channelService_RemoveChannels(t *testing.T) {
|
||||
}
|
||||
return nil, fmt.Errorf("代理主机不符合预期: %s", m.Host)
|
||||
}
|
||||
mg.PortConfigsMock = func(m *testutil.MockGatewayClient, params []g.PortConfigsReq) error {
|
||||
mg.PortConfigsMock = func(m *testutil2.MockGatewayClient, params []g.PortConfigsReq) error {
|
||||
switch {
|
||||
case m.Host == proxy.Host:
|
||||
for _, param := range params {
|
||||
@@ -1104,7 +1104,7 @@ func Test_channelService_RemoveChannels(t *testing.T) {
|
||||
switch {
|
||||
case param.Uuid == proxy.Name:
|
||||
var edges = []string{"edge1", "edge2", "edge4"}
|
||||
if !testutil.SliceEqual(edges, param.Edge) {
|
||||
if !testutil2.SliceEqual(edges, param.Edge) {
|
||||
return 0, fmt.Errorf("边缘节点不符合预期3: %v", param.Edge)
|
||||
}
|
||||
if len(param.Config) != 0 {
|
||||
@@ -1113,7 +1113,7 @@ func Test_channelService_RemoveChannels(t *testing.T) {
|
||||
return len(param.Edge), nil
|
||||
case param.Uuid == proxy2.Name:
|
||||
var edges = []string{"edge3"}
|
||||
if !testutil.SliceEqual(edges, param.Edge) {
|
||||
if !testutil2.SliceEqual(edges, param.Edge) {
|
||||
return 0, fmt.Errorf("边缘节点不符合预期4: %v", param.Edge)
|
||||
}
|
||||
if len(param.Config) != 0 {
|
||||
@@ -1169,7 +1169,7 @@ func Test_channelService_RemoveChannels(t *testing.T) {
|
||||
}
|
||||
|
||||
// 模拟网关客户端的响应
|
||||
mg.PortActiveMock = func(m *testutil.MockGatewayClient, param ...g.PortActiveReq) (map[string]g.PortData, error) {
|
||||
mg.PortActiveMock = func(m *testutil2.MockGatewayClient, param ...g.PortActiveReq) (map[string]g.PortData, error) {
|
||||
switch {
|
||||
case m.Host == proxy.Host:
|
||||
return map[string]g.PortData{
|
||||
@@ -1183,7 +1183,7 @@ func Test_channelService_RemoveChannels(t *testing.T) {
|
||||
}
|
||||
return nil, fmt.Errorf("代理主机不符合预期: %s", m.Host)
|
||||
}
|
||||
mg.PortConfigsMock = func(m *testutil.MockGatewayClient, params []g.PortConfigsReq) error {
|
||||
mg.PortConfigsMock = func(m *testutil2.MockGatewayClient, params []g.PortConfigsReq) error {
|
||||
switch {
|
||||
case m.Host == proxy.Host:
|
||||
for _, param := range params {
|
||||
@@ -1218,7 +1218,7 @@ func Test_channelService_RemoveChannels(t *testing.T) {
|
||||
switch {
|
||||
case param.Uuid == proxy.Name:
|
||||
var edges = []string{"edge1", "edge2", "edge4"}
|
||||
if !testutil.SliceEqual(edges, param.Edge) {
|
||||
if !testutil2.SliceEqual(edges, param.Edge) {
|
||||
return 0, fmt.Errorf("边缘节点不符合预期7: %v", param.Edge)
|
||||
}
|
||||
if len(param.Config) != 0 {
|
||||
@@ -1227,7 +1227,7 @@ func Test_channelService_RemoveChannels(t *testing.T) {
|
||||
return len(param.Edge), nil
|
||||
case param.Uuid == proxy2.Name:
|
||||
var edges = []string{"edge3"}
|
||||
if !testutil.SliceEqual(edges, param.Edge) {
|
||||
if !testutil2.SliceEqual(edges, param.Edge) {
|
||||
return 0, fmt.Errorf("边缘节点不符合预期8: %v", param.Edge)
|
||||
}
|
||||
if len(param.Config) != 0 {
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"platform/pkg/rds"
|
||||
g "platform/web/globals"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
@@ -50,7 +50,7 @@ func (s *IdService) GenSerial(ctx context.Context) (string, error) {
|
||||
|
||||
// 使用Redis事务确保原子操作
|
||||
var sequence int64
|
||||
err := rds.Client.Watch(ctx, func(tx *redis.Tx) error {
|
||||
err := g.Redis.Watch(ctx, func(tx *redis.Tx) error {
|
||||
|
||||
// 获取当前序列号
|
||||
currentVal, err := tx.Get(ctx, key).Int64()
|
||||
|
||||
@@ -2,8 +2,8 @@ package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"platform/pkg/orm"
|
||||
"platform/web/models"
|
||||
g "platform/web/globals"
|
||||
m "platform/web/models"
|
||||
)
|
||||
|
||||
type NodeServiceErr string
|
||||
@@ -16,7 +16,7 @@ var Node = &nodeService{}
|
||||
|
||||
type nodeService struct{}
|
||||
|
||||
func (s *nodeService) Filter(ctx context.Context, userId int32, count int, config ...NodeFilterConfig) ([]*models.Node, error) {
|
||||
func (s *nodeService) Filter(ctx context.Context, userId int32, count int, config ...NodeFilterConfig) ([]*m.Node, error) {
|
||||
_config := NodeFilterConfig{}
|
||||
if len(config) > 0 {
|
||||
_config = config[0]
|
||||
@@ -26,7 +26,7 @@ func (s *nodeService) Filter(ctx context.Context, userId int32, count int, confi
|
||||
// 静态条件:省,市,运营商
|
||||
// 排序方式,1.分配给该用户的次数 2.分配给全部用户的次数 3.todo 节点的健康状态
|
||||
var nodes []*FilteredNode
|
||||
orm.DB.Raw(filterSqlRaw, userId, _config.Isp, _config.Prov, _config.City).
|
||||
g.DB.Raw(filterSqlRaw, userId, _config.Isp, _config.Prov, _config.City).
|
||||
Limit(count).
|
||||
Find(&nodes)
|
||||
|
||||
|
||||
@@ -5,11 +5,11 @@ import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"platform/pkg/orm"
|
||||
"platform/pkg/rds"
|
||||
bill2 "platform/web/domains/bill"
|
||||
resource2 "platform/web/domains/resource"
|
||||
trade2 "platform/web/domains/trade"
|
||||
g "platform/web/globals"
|
||||
"platform/web/globals/orm"
|
||||
m "platform/web/models"
|
||||
q "platform/web/queries"
|
||||
"strings"
|
||||
@@ -52,7 +52,7 @@ func (s *resourceService) PrepareResource(ctx context.Context, data *CreateResou
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = rds.Client.Set(ctx, result.TradeNo, reqStr, 30*time.Minute).Err()
|
||||
err = g.Redis.Set(ctx, result.TradeNo, reqStr, 30*time.Minute).Err()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -69,7 +69,7 @@ func (s *resourceService) PrepareResource(ctx context.Context, data *CreateResou
|
||||
func (s *resourceService) CompleteResource(ctx context.Context, tradeNo string, rs *TransactionVerifyResult) error {
|
||||
|
||||
// 获取请求缓存
|
||||
reqStr, err := rds.Client.Get(ctx, tradeNo).Result()
|
||||
reqStr, err := g.Redis.Get(ctx, tradeNo).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -108,7 +108,7 @@ func (s *resourceService) CompleteResource(ctx context.Context, tradeNo string,
|
||||
}
|
||||
|
||||
// 删除缓存
|
||||
err = rds.Client.Del(ctx, tradeNo).Err()
|
||||
err = g.Redis.Del(ctx, tradeNo).Err()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -265,7 +265,7 @@ func createResource(q *q.Query, data *CreateResourceData, uid int32) (*m.Resourc
|
||||
|
||||
func (s *resourceService) CancelResource(ctx context.Context, tradeNo string, at time.Time, method trade2.Method) error {
|
||||
// 删除请求缓存
|
||||
_, err := rds.Client.Del(ctx, tradeNo).Result()
|
||||
_, err := g.Redis.Del(ctx, tradeNo).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -6,8 +6,8 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"platform/pkg/env"
|
||||
"platform/pkg/rds"
|
||||
"platform/web/auth"
|
||||
g "platform/web/globals"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
@@ -45,7 +45,7 @@ type sessionService struct{}
|
||||
func (s *sessionService) Find(ctx context.Context, token string) (*auth.Context, error) {
|
||||
|
||||
// 读取认证数据
|
||||
authJSON, err := rds.Client.Get(ctx, accessKey(token)).Result()
|
||||
authJSON, err := g.Redis.Get(ctx, accessKey(token)).Result()
|
||||
if err != nil {
|
||||
if errors.Is(err, redis.Nil) {
|
||||
return nil, ErrInvalidToken
|
||||
@@ -89,7 +89,7 @@ func (s *sessionService) Create(ctx context.Context, authCtx auth.Context, remem
|
||||
var accessExpire = time.Duration(env.SessionAccessExpire) * time.Second
|
||||
var refreshExpire = time.Duration(env.SessionRefreshExpire) * time.Second
|
||||
|
||||
pipe := rds.Client.TxPipeline()
|
||||
pipe := g.Redis.TxPipeline()
|
||||
pipe.Set(ctx, accessKey(accessToken), authData, accessExpire)
|
||||
if remember {
|
||||
pipe.Set(ctx, refreshKey(refreshToken), refreshData, refreshExpire)
|
||||
@@ -116,7 +116,7 @@ func (s *sessionService) Refresh(ctx context.Context, refreshToken string) (*Tok
|
||||
var tokenDetails *TokenDetails
|
||||
|
||||
// 刷新令牌
|
||||
err := rds.Client.Watch(ctx, func(tx *redis.Tx) error {
|
||||
err := g.Redis.Watch(ctx, func(tx *redis.Tx) error {
|
||||
|
||||
// 先获取刷新令牌数据
|
||||
refreshJson, err := tx.Get(ctx, rKey).Result()
|
||||
@@ -185,7 +185,7 @@ func (s *sessionService) Refresh(ctx context.Context, refreshToken string) (*Tok
|
||||
|
||||
// Remove 删除会话
|
||||
func (s *sessionService) Remove(ctx context.Context, accessToken, refreshToken string) error {
|
||||
rds.Client.Del(ctx, accessKey(accessToken), refreshKey(refreshToken))
|
||||
g.Redis.Del(ctx, accessKey(accessToken), refreshKey(refreshToken))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -3,8 +3,8 @@ package services
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"platform/pkg/testutil"
|
||||
"platform/web/auth"
|
||||
"platform/web/testutil"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -7,12 +7,12 @@ import (
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"platform/pkg/env"
|
||||
"platform/pkg/orm"
|
||||
"platform/pkg/u"
|
||||
bill2 "platform/web/domains/bill"
|
||||
coupon2 "platform/web/domains/coupon"
|
||||
trade2 "platform/web/domains/trade"
|
||||
g "platform/web/globals"
|
||||
"platform/web/globals/orm"
|
||||
m "platform/web/models"
|
||||
q "platform/web/queries"
|
||||
"strconv"
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"log/slog"
|
||||
"math/rand"
|
||||
"platform/pkg/env"
|
||||
"platform/pkg/rds"
|
||||
"platform/pkg/u"
|
||||
g "platform/web/globals"
|
||||
"strconv"
|
||||
@@ -50,7 +49,7 @@ func (s *verifierService) SendSms(ctx context.Context, phone string, purpose Ver
|
||||
keyLock := key + ":lock"
|
||||
|
||||
// 检查发送频率,1 分钟内只能发送一次
|
||||
err := rds.Client.Watch(ctx, func(tx *redis.Tx) error {
|
||||
err := g.Redis.Watch(ctx, func(tx *redis.Tx) error {
|
||||
result, err := tx.TTL(ctx, keyLock).Result()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -62,7 +61,7 @@ func (s *verifierService) SendSms(ctx context.Context, phone string, purpose Ver
|
||||
return VerifierServiceError("验证码检查异常")
|
||||
}
|
||||
|
||||
pipe := rds.Client.Pipeline()
|
||||
pipe := g.Redis.Pipeline()
|
||||
pipe.Set(ctx, keyLock, "", 1*time.Minute)
|
||||
_, err = pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
@@ -92,19 +91,19 @@ func (s *verifierService) SendSms(ctx context.Context, phone string, purpose Ver
|
||||
TemplateParam: u.P(string(params)),
|
||||
})
|
||||
if err != nil {
|
||||
_ = rds.Client.Del(ctx, key, keyLock).Err()
|
||||
_ = g.Redis.Del(ctx, key, keyLock).Err()
|
||||
return err
|
||||
}
|
||||
if response.Body.Code == nil || *response.Body.Code != "OK" {
|
||||
_ = rds.Client.Del(ctx, key, keyLock).Err()
|
||||
_ = g.Redis.Del(ctx, key, keyLock).Err()
|
||||
return VerifierServiceError("验证码发送失败")
|
||||
}
|
||||
}
|
||||
|
||||
// 设置验证码
|
||||
err = rds.Client.Set(ctx, key, code, 5*time.Minute).Err()
|
||||
err = g.Redis.Set(ctx, key, code, 5*time.Minute).Err()
|
||||
if err != nil {
|
||||
_ = rds.Client.Del(ctx, key, keyLock).Err()
|
||||
_ = g.Redis.Del(ctx, key, keyLock).Err()
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -116,10 +115,10 @@ func (s *verifierService) VerifySms(ctx context.Context, phone, code string) err
|
||||
key := smsKey(phone, VerifierSmsPurposeLogin)
|
||||
keyLock := key + ":lock"
|
||||
|
||||
err := rds.Client.Watch(ctx, func(tx *redis.Tx) error {
|
||||
err := g.Redis.Watch(ctx, func(tx *redis.Tx) error {
|
||||
|
||||
// 检查验证码
|
||||
val, err := rds.Client.Get(ctx, key).Result()
|
||||
val, err := g.Redis.Get(ctx, key).Result()
|
||||
if err != nil && !errors.Is(err, redis.Nil) {
|
||||
slog.Error("验证码获取失败", slog.Any("err", err))
|
||||
return err
|
||||
|
||||
@@ -3,7 +3,7 @@ package services
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"platform/pkg/testutil"
|
||||
"platform/web/testutil"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
Reference in New Issue
Block a user