diff --git a/docker-compose.yaml b/docker-compose.yaml index 537f53a..691abc5 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -42,10 +42,10 @@ services: gost: image: gogost/gost - network_mode: host - command: - - -api - - :8900 + command: > + -api test:test@:9700 + ports: + - "9700:9700" volumes: postgres_data: diff --git a/pkg/env/env.go b/pkg/env/env.go index c7e8e21..3576bee 100644 --- a/pkg/env/env.go +++ b/pkg/env/env.go @@ -45,7 +45,7 @@ var ( BaiyinCloudUrl string BaiyinTokenUrl string - GostApiPort = 8900 + GostApiPort = 9700 GostApiPathPrefix = "" IdenCallbackUrl string diff --git a/scripts/sql/init.sql b/scripts/sql/init.sql index a3c9907..9843bd4 100644 --- a/scripts/sql/init.sql +++ b/scripts/sql/init.sql @@ -605,6 +605,7 @@ create table proxy ( mac text not null, ip inet not null, host text, + port int, secret text, type int not null, status int not null, @@ -621,6 +622,7 @@ create index idx_proxy_created_at on proxy (created_at) where deleted_at is null comment on table proxy is '代理服务表'; comment on column proxy.id is '代理服务ID'; comment on column proxy.version is '代理服务版本'; +comment on column proxy.port is '代理服务端口'; comment on column proxy.mac is '代理服务名称'; comment on column proxy.ip is '代理服务地址'; comment on column proxy.host is '代理服务域名'; diff --git a/web/models/proxy.go b/web/models/proxy.go index 04b1a20..f694ce9 100644 --- a/web/models/proxy.go +++ b/web/models/proxy.go @@ -14,6 +14,7 @@ type Proxy struct { Mac string `json:"mac" gorm:"column:mac"` // 代理服务名称 IP orm.Inet `json:"ip" gorm:"column:ip;not null"` // 代理服务地址 Host *string `json:"host,omitempty" gorm:"column:host"` // 代理服务域名 + Port *int `json:"port,omitempty" gorm:"column:port"` // 代理服务端口 Secret *string `json:"secret,omitempty" gorm:"column:secret"` // 代理服务密钥 Type ProxyType `json:"type" gorm:"column:type"` // 代理服务类型:1-自有,2-白银 Status ProxyStatus `json:"status" gorm:"column:status"` // 代理服务状态:0-离线,1-在线 diff --git a/web/queries/edge.gen.go b/web/queries/edge.gen.go index 8e72315..150a29b 100644 --- a/web/queries/edge.gen.go +++ b/web/queries/edge.gen.go @@ -35,6 +35,7 @@ func newEdge(db *gorm.DB, opts ...gen.DOOption) edge { _edge.Version = field.NewInt32(tableName, "version") _edge.Mac = field.NewString(tableName, "mac") _edge.IP = field.NewField(tableName, "ip") + _edge.Port = field.NewUint16(tableName, "port") _edge.ISP = field.NewInt(tableName, "isp") _edge.Prov = field.NewString(tableName, "prov") _edge.City = field.NewString(tableName, "city") @@ -59,6 +60,7 @@ type edge struct { Version field.Int32 Mac field.String IP field.Field + Port field.Uint16 ISP field.Int Prov field.String City field.String @@ -89,6 +91,7 @@ func (e *edge) updateTableName(table string) *edge { e.Version = field.NewInt32(table, "version") e.Mac = field.NewString(table, "mac") e.IP = field.NewField(table, "ip") + e.Port = field.NewUint16(table, "port") e.ISP = field.NewInt(table, "isp") e.Prov = field.NewString(table, "prov") e.City = field.NewString(table, "city") @@ -111,7 +114,7 @@ func (e *edge) GetFieldByName(fieldName string) (field.OrderExpr, bool) { } func (e *edge) fillFieldMap() { - e.fieldMap = make(map[string]field.Expr, 14) + e.fieldMap = make(map[string]field.Expr, 15) e.fieldMap["id"] = e.ID e.fieldMap["created_at"] = e.CreatedAt e.fieldMap["updated_at"] = e.UpdatedAt @@ -120,6 +123,7 @@ func (e *edge) fillFieldMap() { e.fieldMap["version"] = e.Version e.fieldMap["mac"] = e.Mac e.fieldMap["ip"] = e.IP + e.fieldMap["port"] = e.Port e.fieldMap["isp"] = e.ISP e.fieldMap["prov"] = e.Prov e.fieldMap["city"] = e.City diff --git a/web/queries/proxy.gen.go b/web/queries/proxy.gen.go index 5bdc38d..b9a264e 100644 --- a/web/queries/proxy.gen.go +++ b/web/queries/proxy.gen.go @@ -35,6 +35,7 @@ func newProxy(db *gorm.DB, opts ...gen.DOOption) proxy { _proxy.Mac = field.NewString(tableName, "mac") _proxy.IP = field.NewField(tableName, "ip") _proxy.Host = field.NewString(tableName, "host") + _proxy.Port = field.NewInt(tableName, "port") _proxy.Secret = field.NewString(tableName, "secret") _proxy.Type = field.NewInt(tableName, "type") _proxy.Status = field.NewInt(tableName, "status") @@ -283,6 +284,7 @@ type proxy struct { Mac field.String IP field.Field Host field.String + Port field.Int Secret field.String Type field.Int Status field.Int @@ -312,6 +314,7 @@ func (p *proxy) updateTableName(table string) *proxy { p.Mac = field.NewString(table, "mac") p.IP = field.NewField(table, "ip") p.Host = field.NewString(table, "host") + p.Port = field.NewInt(table, "port") p.Secret = field.NewString(table, "secret") p.Type = field.NewInt(table, "type") p.Status = field.NewInt(table, "status") @@ -332,7 +335,7 @@ func (p *proxy) GetFieldByName(fieldName string) (field.OrderExpr, bool) { } func (p *proxy) fillFieldMap() { - p.fieldMap = make(map[string]field.Expr, 13) + p.fieldMap = make(map[string]field.Expr, 14) p.fieldMap["id"] = p.ID p.fieldMap["created_at"] = p.CreatedAt p.fieldMap["updated_at"] = p.UpdatedAt @@ -341,6 +344,7 @@ func (p *proxy) fillFieldMap() { p.fieldMap["mac"] = p.Mac p.fieldMap["ip"] = p.IP p.fieldMap["host"] = p.Host + p.fieldMap["port"] = p.Port p.fieldMap["secret"] = p.Secret p.fieldMap["type"] = p.Type p.fieldMap["status"] = p.Status diff --git a/web/services/channel.go b/web/services/channel.go index cc85994..4044954 100644 --- a/web/services/channel.go +++ b/web/services/channel.go @@ -7,11 +7,11 @@ import ( "log/slog" "math/rand/v2" "net/netip" - "platform/pkg/env" "platform/pkg/u" "platform/web/core" e "platform/web/events" g "platform/web/globals" + "platform/web/globals/orm" m "platform/web/models" q "platform/web/queries" "strconv" @@ -20,6 +20,7 @@ import ( "github.com/hibiken/asynq" "github.com/redis/go-redis/v9" + "gorm.io/gen" "gorm.io/gen/field" ) @@ -28,26 +29,278 @@ var Channel = &channelServer{ provider: &channelGostProvider{}, } -type ChannelServiceProvider interface { - CreateChannels(source netip.Addr, resourceNo string, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error) - RemoveChannels(batch string) error - ClearExpiredChannels(proxyId int32) (int, error) +type channelProvider interface { + selectProxy(count int) (*m.Proxy, error) + prepareCreate(ctx *channelCreateContext) (*channelCreateResult, error) + removeRemote(batchNo string, batch *usedChanBatch) error } type channelServer struct { - provider ChannelServiceProvider + provider channelProvider } func (s *channelServer) CreateChannels(source netip.Addr, resourceNo string, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error) { - return s.provider.CreateChannels(source, resourceNo, authWhitelist, authPassword, count, edgeFilter) + now := time.Now() + batchNo := ID.GenReadable("bat") + var channels []*m.Channel + if edgeFilter == nil { + edgeFilter = &EdgeFilter{} + } + + var whitelistText *string + err := g.Redsync.WithLock(lockChannelCreateKey(resourceNo), func() error { + resource, whitelists, err := ensure(now, source, resourceNo, authWhitelist, count) + if err != nil { + return err + } + + if authWhitelist { + joined := strings.Join(whitelists, ",") + whitelistText = &joined + } + + expire := now.Add(resource.Live) + proxy, err := s.provider.selectProxy(count) + if err != nil { + return err + } + + ports, err := selectPorts(proxy.ID, batchNo, count, expire) + if err != nil { + return err + } + + createCtx := &channelCreateContext{ + Now: now, + Source: source, + Resource: resource, + Proxy: proxy, + BatchNo: batchNo, + Ports: ports, + Expire: expire, + Count: count, + Filter: edgeFilter, + AuthWhitelist: authWhitelist, + AuthPassword: authPassword, + Whitelists: whitelists, + WhitelistText: whitelistText, + } + + result, err := s.provider.prepareCreate(createCtx) + if err != nil { + return err + } + if result.applyRemote != nil { + if err := result.applyRemote(); err != nil { + return err + } + } + if err := persistChannelCreate(createCtx, result.Channels); err != nil { + return err + } + + channels = result.Channels + return nil + }) + if err != nil { + return nil, err + } + + return channels, nil } func (s *channelServer) RemoveChannels(batch string) error { - return s.provider.RemoveChannels(batch) + return g.Redsync.WithLock(lockChannelRemoveKey(batch), func() error { + start := time.Now() + + usedBatch, err := findUsedChanBatch(batch) + if err != nil { + return err + } + if usedBatch == nil { + slog.Debug("通道为空,跳过清理", "batch", batch) + return nil + } + + if err := s.provider.removeRemote(batch, usedBatch); err != nil { + return err + } + if err := freeChans(usedBatch.ProxyID, batch); err != nil { + return err + } + + slog.Debug("清除通道配置", "proxy", usedBatch.ProxyID, "batch", batch, "duration", time.Since(start).String()) + return nil + }) } func (s *channelServer) ClearExpiredChannels(proxyId int32) (int, error) { - return s.provider.ClearExpiredChannels(proxyId) + batchSet, err := findExpiredChannelBatches(proxyId, time.Now()) + if err != nil { + return 0, err + } + + slog.Info("批量清理过期通道", "count", len(batchSet)) + for batchNo := range batchSet { + if err := s.RemoveChannels(batchNo); err != nil { + slog.Error("清理过期通道失败", "batch", batchNo, "error", err) + } + } + + return len(batchSet), nil +} + +type channelCreateContext struct { + Now time.Time + Source netip.Addr + Resource *ResourceView + Proxy *m.Proxy + BatchNo string + Ports []netip.AddrPort + Expire time.Time + Count int + Filter *EdgeFilter + AuthWhitelist bool + AuthPassword bool + Whitelists []string + WhitelistText *string +} + +type channelCreateResult struct { + Channels []*m.Channel + applyRemote func() error +} + +func newBaseChannel(ctx *channelCreateContext, port uint16) *m.Channel { + return &m.Channel{ + UserID: ctx.Resource.User.ID, + ResourceID: ctx.Resource.ID, + BatchNo: ctx.BatchNo, + ProxyID: ctx.Proxy.ID, + Host: u.Else(ctx.Proxy.Host, ctx.Proxy.IP.String()), + Port: port, + FilterISP: ctx.Filter.Isp, + FilterProv: ctx.Filter.Prov, + FilterCity: ctx.Filter.City, + ExpiredAt: ctx.Expire, + Proxy: ctx.Proxy, + } +} + +func applyChannelAuth(ctx *channelCreateContext, channel *m.Channel) (username string, password string, ok bool) { + if ctx.AuthWhitelist { + channel.Whitelists = ctx.WhitelistText + } + if !ctx.AuthPassword { + return "", "", false + } + + username, password = genPassPair() + channel.Username = &username + channel.Password = &password + return username, password, true +} + +func persistChannelCreate(ctx *channelCreateContext, channels []*m.Channel) error { + return q.Q.Transaction(func(tx *q.Query) error { + var ( + result gen.ResultInfo + err error + ) + switch ctx.Resource.Type { + case m.ResourceTypeShort: + result, err = tx.ResourceShort. + Where( + tx.ResourceShort.ID.Eq(*ctx.Resource.ShortId), + tx.ResourceShort.Used.Eq(ctx.Resource.Used), + tx.ResourceShort.Daily.Eq(ctx.Resource.Daily), + ). + UpdateSimple( + tx.ResourceShort.Used.Add(int32(ctx.Count)), + tx.ResourceShort.Daily.Value(int32(ctx.Resource.Today+ctx.Count)), + tx.ResourceShort.LastAt.Value(ctx.Now), + ) + case m.ResourceTypeLong: + result, err = tx.ResourceLong. + Where( + tx.ResourceLong.ID.Eq(*ctx.Resource.LongId), + tx.ResourceLong.Used.Eq(ctx.Resource.Used), + tx.ResourceLong.Daily.Eq(ctx.Resource.Daily), + ). + UpdateSimple( + tx.ResourceLong.Used.Add(int32(ctx.Count)), + tx.ResourceLong.Daily.Value(int32(ctx.Resource.Today+ctx.Count)), + tx.ResourceLong.LastAt.Value(ctx.Now), + ) + default: + return core.NewBizErr("套餐类型不正确,无法更新") + } + if err != nil { + return core.NewServErr("更新套餐使用记录失败", err) + } + if result.RowsAffected == 0 { + return core.NewBizErr("套餐状态已过期") + } + + if err := tx.Channel.Omit(field.AssociationFields).Create(channels...); err != nil { + return core.NewServErr("保存通道失败", err) + } + + if err := tx.LogsUserUsage.Create(&m.LogsUserUsage{ + UserID: ctx.Resource.User.ID, + ResourceID: ctx.Resource.ID, + BatchNo: ctx.BatchNo, + Count: int32(ctx.Count), + ISP: u.X(ctx.Filter.Isp.String()), + Prov: ctx.Filter.Prov, + City: ctx.Filter.City, + IP: orm.Inet{Addr: ctx.Source}, + Time: ctx.Now, + }); err != nil { + return core.NewServErr("保存用户使用记录失败", err) + } + + return nil + }) +} + +func findExpiredChannelBatches(proxyId int32, now time.Time) (map[string]struct{}, error) { + keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result() + if err != nil { + return nil, core.NewServErr("查询使用中通道失败", err) + } + if len(keys) == 0 { + return map[string]struct{}{}, nil + } + + batchList := make([]string, len(keys)) + batchSet := make(map[string]struct{}, len(keys)) + for i, key := range keys { + parsed, err := parseUsedChanKey(key) + if err != nil { + return nil, err + } + batchList[i] = parsed.BatchNo + batchSet[parsed.BatchNo] = struct{}{} + } + + var batchQueried []struct{ BatchNo string } + err = q.Channel. + Select(q.Channel.BatchNo). + Where( + q.Channel.BatchNo.In(batchList...), + q.Channel.ExpiredAt.Gte(now.UTC()), + ). + Group(q.Channel.BatchNo). + Scan(&batchQueried) + if err != nil { + return nil, core.NewServErr("查询过期通道失败", err) + } + for _, batch := range batchQueried { + delete(batchSet, batch.BatchNo) + } + + return batchSet, nil } func lockChannelCreateKey(resourceNo string) string { @@ -87,36 +340,26 @@ func selectProxyByType(proxyType m.ProxyType, count int) (*m.Proxy, error) { return nil, core.NewBizErr("无可用代理") } - proxyIDs := make([]int32, 0, len(proxies)) - proxyMap := make(map[int32]*m.Proxy, len(proxies)) - for _, item := range proxies { - proxyIDs = append(proxyIDs, item.ID) - proxyMap[item.ID] = item - } - - maxID := int32(0) + var bestProxy *m.Proxy maxCount := -1 - for _, id := range proxyIDs { - idCount, err := g.Redis.SCard(context.Background(), freeChansKey(id)).Result() + for _, proxy := range proxies { + idCount, err := g.Redis.SCard(context.Background(), freeChansKey(proxy.ID)).Result() if err != nil { return nil, core.NewServErr("查询可用通道数量失败", err) } if idCount > int64(maxCount) { maxCount = int(idCount) - maxID = id + bestProxy = proxy } } if maxCount < count { return nil, core.NewBizErr("无可用代理") } - return proxyMap[maxID], nil + return bestProxy, nil } func (s *channelServer) RefreshEdges() error { - if env.RunMode != env.RunModeProd { - return nil - } // 仅白银网关支持边缘节点刷新,GOST 不参与此流程。 proxies, err := q.Proxy.Where( @@ -355,6 +598,11 @@ type usedChanBatch struct { Chans []netip.AddrPort } +type usedChanKey struct { + ProxyID int32 + BatchNo string +} + func findUsedChanBatch(batch string) (*usedChanBatch, error) { keys, err := g.Redis.Keys(context.Background(), "channel:used:*:"+batch).Result() if err != nil { @@ -393,7 +641,7 @@ func selectUsedChanBatchKey(batch string, keys []string) (string, bool, error) { } func parseUsedChanBatch(key string, chans []string) (*usedChanBatch, error) { - proxyID, err := parseUsedChansKey(key) + parsed, err := parseUsedChanKey(key) if err != nil { return nil, err } @@ -408,23 +656,26 @@ func parseUsedChanBatch(key string, chans []string) (*usedChanBatch, error) { } return &usedChanBatch{ - ProxyID: proxyID, + ProxyID: parsed.ProxyID, Chans: addrs, }, nil } -func parseUsedChansKey(key string) (int32, error) { +func parseUsedChanKey(key string) (*usedChanKey, error) { parts := strings.Split(key, ":") if len(parts) != 4 { - return 0, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), nil) + return nil, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), nil) } proxyID, err := strconv.Atoi(parts[2]) if err != nil { - return 0, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), err) + return nil, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), err) } - return int32(proxyID), nil + return &usedChanKey{ + ProxyID: int32(proxyID), + BatchNo: parts[3], + }, nil } // 扩容通道 @@ -530,9 +781,7 @@ return 1 // 错误信息 var ( ErrResourceNotExist = core.NewBizErr("套餐不存在") - ErrResourceInvalid = core.NewBizErr("套餐不可用") ErrResourceExhausted = core.NewBizErr("套餐已用完") ErrResourceExpired = core.NewBizErr("套餐已过期") ErrResourceDailyLimit = core.NewBizErr("套餐每日配额已用完") - ErrEdgesNoAvailable = core.NewBizErr("没有可用的节点") ) diff --git a/web/services/channel_baiyin.go b/web/services/channel_baiyin.go index 8af05e5..166aff9 100644 --- a/web/services/channel_baiyin.go +++ b/web/services/channel_baiyin.go @@ -1,331 +1,96 @@ package services import ( - "context" - "encoding/json" "fmt" "log/slog" - "net/netip" - "platform/pkg/env" "platform/pkg/u" "platform/web/core" g "platform/web/globals" - "platform/web/globals/orm" m "platform/web/models" q "platform/web/queries" - "strings" - "time" - - "gorm.io/gen" - "gorm.io/gen/field" ) type channelBaiyinProvider struct{} -func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceNo string, authWhitelist bool, authPassword bool, count int, filter *EdgeFilter) ([]*m.Channel, error) { - if filter == nil { - return nil, core.NewBizErr("缺少节点过滤条件") +func (s *channelBaiyinProvider) selectProxy(count int) (*m.Proxy, error) { + return selectProxyByType(m.ProxyTypeBaiYin, count) +} + +func (s *channelBaiyinProvider) prepareCreate(ctx *channelCreateContext) (*channelCreateResult, error) { + gateway, err := proxyGateway(ctx.Proxy) + if err != nil { + return nil, core.NewServErr("创建代理网关失败", err) } - now := time.Now() - batchNo := ID.GenReadable("bat") - channels := make([]*m.Channel, count) - - // 资源锁,防止并发扣减失败导致的端口悬空问题 - err := g.Redsync.WithLock(lockChannelCreateKey(resourceNo), func() error { - // 检查并获取套餐与白名单 - resource, whitelists, err := ensure(now, source, resourceNo, authWhitelist, count) - if err != nil { - return err + channels := make([]*m.Channel, len(ctx.Ports)) + chanConfigs := make([]*g.PortConfigsReq, len(ctx.Ports)) + for i, portRef := range ctx.Ports { + channel := newBaseChannel(ctx, portRef.Port()) + cfg := &g.PortConfigsReq{ + Port: int(portRef.Port()), + Status: true, + AutoEdgeConfig: &g.AutoEdgeConfig{ + Province: u.Z(ctx.Filter.Prov), + City: u.Z(ctx.Filter.City), + Isp: ctx.Filter.Isp.String(), + Count: u.P(1), + }, } - user := resource.User - expire := now.Add(resource.Live) - - // 选择代理 - proxy, gateway, err := selectProxy(count) - if err != nil { - return err + if ctx.AuthWhitelist { + cfg.Whitelist = &ctx.Whitelists + } + if username, password, ok := applyChannelAuth(ctx, channel); ok { + cfg.Userpass = u.P(username + ":" + password) } - // 取用端口 - chans, err := selectPorts(proxy.ID, batchNo, count, expire) - if err != nil { - return err - } + channels[i] = channel + chanConfigs[i] = cfg + } - // 绑定节点端口 - chanConfigs := make([]*g.PortConfigsReq, count) - edgeConfigs := make([]string, 0, count) - for i := range count { - ch := chans[i] - - // 通道数据 - channels[i] = &m.Channel{ - UserID: user.ID, - ResourceID: resource.ID, - BatchNo: batchNo, - ProxyID: proxy.ID, - Host: u.Else(proxy.Host, proxy.IP.String()), - Port: ch.Port(), - FilterISP: filter.Isp, - FilterProv: filter.Prov, - FilterCity: filter.City, - ExpiredAt: expire, - Proxy: proxy, + return &channelCreateResult{ + Channels: channels, + applyRemote: func() error { + slog.Debug("提交代理端口配置", "proxy", ctx.Proxy.IP.String(), "total_count", len(chanConfigs)) + if err := ensureEdges(ctx.Proxy, gateway, ctx.Filter, ctx.Count); err != nil { + slog.Warn("ensureEdges 失败", "err", err) } - - // 通道配置数据 - chanConfigs[i] = &g.PortConfigsReq{ - Port: int(ch.Port()), - Status: true, - AutoEdgeConfig: &g.AutoEdgeConfig{ - Province: u.Z(filter.Prov), - City: u.Z(filter.City), - Isp: filter.Isp.String(), - Count: u.P(1), - }, - } - - // 白名单模式 - if authWhitelist { - channels[i].Whitelists = u.P(strings.Join(whitelists, ",")) - chanConfigs[i].Whitelist = &whitelists - } - - // 密码模式 - if authPassword { - username, password := genPassPair() - channels[i].Username = &username - channels[i].Password = &password - chanConfigs[i].Userpass = u.P(username + ":" + password) - } - } - - // 提交配置 - slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs)) - if env.RunMode == env.RunModeProd { - - // 从云端补足节点 - err := ensureEdges(proxy, gateway, filter, count) - if err != nil { - slog.Warn("ensureEdges 失败", "err", err) // 不阻止通道创建,继续走后续流程 - } - - // 启用网关代理通道 if len(chanConfigs) > 0 { if err := gateway.GatewayPortConfigs(chanConfigs); err != nil { slog.Warn("提交代理端口配置失败", "error", err.Error()) - return core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err) + return core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", ctx.Proxy.IP.String()), err) } } - } else { - for _, item := range chanConfigs { - str, _ := json.Marshal(item) - fmt.Println(string(str)) - } - } - - // 保存数据 - err = q.Q.Transaction(func(q *q.Query) error { - // 更新使用记录 - var result gen.ResultInfo - var err error - switch resource.Type { - case m.ResourceTypeShort: - result, err = q.ResourceShort. - Where( - q.ResourceShort.ID.Eq(*resource.ShortId), - q.ResourceShort.Used.Eq(resource.Used), - q.ResourceShort.Daily.Eq(resource.Daily), - ). - UpdateSimple( - q.ResourceShort.Used.Add(int32(count)), - q.ResourceShort.Daily.Value(int32(resource.Today+count)), - q.ResourceShort.LastAt.Value(now), - ) - - case m.ResourceTypeLong: - result, err = q.ResourceLong. - Where( - q.ResourceLong.ID.Eq(*resource.LongId), - q.ResourceLong.Used.Eq(resource.Used), - q.ResourceLong.Daily.Eq(resource.Daily), - ). - UpdateSimple( - q.ResourceLong.Used.Add(int32(count)), - q.ResourceLong.Daily.Value(int32(resource.Today+count)), - q.ResourceLong.LastAt.Value(now), - ) - - default: - return core.NewBizErr("套餐类型不正确,无法更新") - } - if err != nil { - return core.NewServErr("更新套餐使用记录失败", err) - } - if result.RowsAffected == 0 { - return core.NewBizErr("套餐状态已过期") - } - - // 保存通道 - err = q.Channel. - Omit(field.AssociationFields). - Create(channels...) - if err != nil { - return core.NewServErr("保存通道失败", err) - } - - // 保存提取记录 - err = q.LogsUserUsage.Create(&m.LogsUserUsage{ - UserID: user.ID, - ResourceID: resource.ID, - BatchNo: batchNo, - Count: int32(count), - ISP: u.X(filter.Isp.String()), - Prov: filter.Prov, - City: filter.City, - IP: orm.Inet{Addr: source}, - Time: now, - }) - if err != nil { - return core.NewServErr("保存用户使用记录失败", err) - } - return nil - }) - if err != nil { - return err - } - - return nil - }) - if err != nil { - return nil, err - } - - return channels, nil + }, + }, nil } -func (s *channelBaiyinProvider) RemoveChannels(batchNo string) error { - return g.Redsync.WithLock(lockChannelRemoveKey(batchNo), func() error { - start := time.Now() - - batch, err := findUsedChanBatch(batchNo) - if err != nil { - return err - } - if batch == nil { - slog.Debug("通道为空,跳过清理", "batch", batchNo) - return nil +func (s *channelBaiyinProvider) removeRemote(_ string, batch *usedChanBatch) error { + configs := make([]*g.PortConfigsReq, len(batch.Chans)) + for i, ch := range batch.Chans { + configs[i] = &g.PortConfigsReq{ + Port: int(ch.Port()), + Edge: &[]string{}, + AutoEdgeConfig: &g.AutoEdgeConfig{Count: u.P(0)}, + Status: false, } + } - configs := make([]*g.PortConfigsReq, len(batch.Chans)) - for i, ch := range batch.Chans { - configs[i] = &g.PortConfigsReq{ - Port: int(ch.Port()), - Edge: &[]string{}, - AutoEdgeConfig: &g.AutoEdgeConfig{Count: u.P(0)}, - Status: false, - } - } - - // 提交配置 - if env.RunMode == env.RunModeProd { - proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(batch.ProxyID)).Take() - if err != nil { - return core.NewServErr("获取代理数据失败", err) - } - - gateway, err := proxyGateway(proxy) - if err != nil { - return core.NewServErr("创建代理网关失败", err) - } - - if err = gateway.GatewayPortConfigs(configs); err != nil { - return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err) - } - } else { - for _, item := range configs { - str, _ := json.Marshal(item) - fmt.Println(string(str)) - } - } - - if err := freeChans(batch.ProxyID, batchNo); err != nil { - return err - } - - slog.Debug("清除代理端口配置", "proxy", batch.ProxyID, "batch", batchNo, "duration", time.Since(start).String()) - return nil - }) -} - -// ClearExpiredChannels 清理指定代理的过期通道,并返回清理数量(现在理论上不会有需要手动批量清理的通道,未来可以废弃) -func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) { - now := time.Now() - - // 获取未清理通道 - keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result() + proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(batch.ProxyID)).Take() if err != nil { - return 0, core.NewServErr("查询使用中通道失败", err) - } - if len(keys) == 0 { - return 0, nil - } - - batchList := make([]string, len(keys)) - batchSet := make(map[string]struct{}, len(keys)) - for i, key := range keys { - parts := strings.Split(key, ":") - if len(parts) != 4 { - return 0, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), nil) - } - batchList[i] = parts[3] - batchSet[parts[3]] = struct{}{} - } - - // 排除未过期通道 - var batchQueried []struct{ BatchNo string } - err = q.Channel. - Select(q.Channel.BatchNo). - Where( - q.Channel.BatchNo.In(batchList...), - q.Channel.ExpiredAt.Gte(now.UTC()), - ). - Group(q.Channel.BatchNo). - Scan(&batchQueried) - if err != nil { - return 0, core.NewServErr("查询过期通道失败", err) - } - for _, batch := range batchQueried { - delete(batchSet, batch.BatchNo) - } - - // 清理过期通道 - slog.Info("批量清理过期通道", "count", len(batchSet)) - for batchNo, _ := range batchSet { - err := s.RemoveChannels(batchNo) - if err != nil { - slog.Error("清理过期通道失败", "batch", batchNo, "error", err) - } - } - - return len(batchSet), nil -} - -func selectProxy(count int) (*m.Proxy, g.GatewayClient, error) { - proxy, err := selectProxyByType(m.ProxyTypeBaiYin, count) - if err != nil { - return nil, nil, err + return core.NewServErr("获取代理数据失败", err) } gateway, err := proxyGateway(proxy) if err != nil { - return nil, nil, core.NewServErr("创建代理网关失败", err) + return core.NewServErr("创建代理网关失败", err) } - return proxy, gateway, nil + if err = gateway.GatewayPortConfigs(configs); err != nil { + return core.NewServErr(fmt.Sprintf("清空代理 %s 端口配置失败", proxy.IP.String()), err) + } + return nil } // ensureEdges 检查本地节点是否足够,如果不足从云端连入 diff --git a/web/services/channel_gost.go b/web/services/channel_gost.go index 57bd0d6..4c9771c 100644 --- a/web/services/channel_gost.go +++ b/web/services/channel_gost.go @@ -1,321 +1,130 @@ package services import ( - "context" "fmt" - "log/slog" - "net/netip" "platform/pkg/env" "platform/pkg/u" "platform/web/core" g "platform/web/globals" - "platform/web/globals/orm" m "platform/web/models" q "platform/web/queries" "strings" - "time" - - "gorm.io/gen" - "gorm.io/gen/field" ) type channelGostProvider struct{} -func (s *channelGostProvider) CreateChannels(source netip.Addr, resourceNo string, authWhitelist bool, authPassword bool, count int, filter *EdgeFilter) ([]*m.Channel, error) { - now := time.Now() - batchNo := ID.GenReadable("bat") - channels := make([]*m.Channel, count) - if filter == nil { - filter = &EdgeFilter{} - } - - err := g.Redsync.WithLock(lockChannelCreateKey(resourceNo), func() error { - resource, whitelists, err := ensure(now, source, resourceNo, authWhitelist, count) - if err != nil { - return err - } - - user := resource.User - expire := now.Add(resource.Live) - - proxy, err := s.selectProxy(count) - if err != nil { - return err - } - - chans, err := selectPorts(proxy.ID, batchNo, count, expire) - if err != nil { - return err - } - - edges, err := s.selectEdge(filter, count) - if err != nil { - return err - } - - client, err := proxyGost(proxy) - if err != nil { - return err - } - - admissions := make([]*g.GostAdmissionConfig, 0, count) - authers := make([]*g.GostAutherConfig, 0, count) - services := make([]*g.GostServiceConfig, count) - - for i := range count { - ch := chans[i] - edge := edges[i] - port := ch.Port() - host := u.Else(proxy.Host, proxy.IP.String()) - - serviceName := gostServiceName(batchNo, port) - channel := &m.Channel{ - UserID: user.ID, - ResourceID: resource.ID, - BatchNo: batchNo, - ProxyID: proxy.ID, - Host: host, - Port: port, - EdgeID: u.P(edge.ID), - EdgeRef: u.P(serviceName), - FilterISP: filter.Isp, - FilterProv: filter.Prov, - FilterCity: filter.City, - IP: u.P(edge.IP), - ExpiredAt: expire, - Proxy: proxy, - } - - service := &g.GostServiceConfig{ - Name: serviceName, - Addr: fmt.Sprintf(":%d", port), - Handler: g.GostHandlerConfig{ - Type: "auto", - Chain: edge.Mac, - }, - Listener: g.GostListenerConfig{ - Type: "tcp", - }, - } - - if authWhitelist { - channel.Whitelists = u.P(strings.Join(whitelists, ",")) - service.Admission = gostAdmissionName(batchNo, port) - admission := &g.GostAdmissionConfig{ - Name: service.Admission, - Whitelist: true, - Matchers: whitelists, - } - admissions = append(admissions, admission) - } - - if authPassword { - username, password := genPassPair() - channel.Username = &username - channel.Password = &password - service.Handler.Auther = gostAutherName(batchNo, port) - auther := &g.GostAutherConfig{ - Name: service.Handler.Auther, - Auths: []g.GostAuthConfig{{ - Username: username, - Password: password, - }}, - } - authers = append(authers, auther) - } - - services[i] = service - channels[i] = channel - } - - for _, admission := range admissions { - if err := client.CreateAdmission(admission); err != nil { - return core.NewServErr(fmt.Sprintf("创建 GOST admission 失败: %s", admission.Name), err) - } - } - for _, auther := range authers { - if err := client.CreateAuther(auther); err != nil { - return core.NewServErr(fmt.Sprintf("创建 GOST auther 失败: %s", auther.Name), err) - } - } - for _, service := range services { - if err := client.CreateService(service); err != nil { - return core.NewServErr(fmt.Sprintf("创建 GOST service 失败: %s", service.Name), err) - } - } - - err = q.Q.Transaction(func(tx *q.Query) error { - var result gen.ResultInfo - var err error - switch resource.Type { - case m.ResourceTypeShort: - result, err = tx.ResourceShort. - Where( - tx.ResourceShort.ID.Eq(*resource.ShortId), - tx.ResourceShort.Used.Eq(resource.Used), - tx.ResourceShort.Daily.Eq(resource.Daily), - ). - UpdateSimple( - tx.ResourceShort.Used.Add(int32(count)), - tx.ResourceShort.Daily.Value(int32(resource.Today+count)), - tx.ResourceShort.LastAt.Value(now), - ) - case m.ResourceTypeLong: - result, err = tx.ResourceLong. - Where( - tx.ResourceLong.ID.Eq(*resource.LongId), - tx.ResourceLong.Used.Eq(resource.Used), - tx.ResourceLong.Daily.Eq(resource.Daily), - ). - UpdateSimple( - tx.ResourceLong.Used.Add(int32(count)), - tx.ResourceLong.Daily.Value(int32(resource.Today+count)), - tx.ResourceLong.LastAt.Value(now), - ) - default: - return core.NewBizErr("套餐类型不正确,无法更新") - } - if err != nil { - return core.NewServErr("更新套餐使用记录失败", err) - } - if result.RowsAffected == 0 { - return core.NewBizErr("套餐状态已过期") - } - - if err := tx.Channel.Omit(field.AssociationFields).Create(channels...); err != nil { - return core.NewServErr("保存通道失败", err) - } - - if err := tx.LogsUserUsage.Create(&m.LogsUserUsage{ - UserID: user.ID, - ResourceID: resource.ID, - BatchNo: batchNo, - Count: int32(count), - ISP: u.X(filter.Isp.String()), - Prov: filter.Prov, - City: filter.City, - IP: orm.Inet{Addr: source}, - Time: now, - }); err != nil { - return core.NewServErr("保存用户使用记录失败", err) - } - - return nil - }) - if err != nil { - return err - } - - return nil - }) +func (s *channelGostProvider) prepareCreate(ctx *channelCreateContext) (*channelCreateResult, error) { + edges, err := s.selectEdge(ctx.Filter, ctx.Count) if err != nil { return nil, err } - return channels, nil -} + client, err := proxyGost(ctx.Proxy) + if err != nil { + return nil, err + } -func (s *channelGostProvider) RemoveChannels(batchNo string) error { - return g.Redsync.WithLock(lockChannelRemoveKey(batchNo), func() error { - start := time.Now() + admissions := make([]*g.GostAdmissionConfig, 0, ctx.Count) + authers := make([]*g.GostAutherConfig, 0, ctx.Count) + services := make([]*g.GostServiceConfig, len(ctx.Ports)) + channels := make([]*m.Channel, len(ctx.Ports)) - batch, err := findUsedChanBatch(batchNo) - if err != nil { - return err + for i, portRef := range ctx.Ports { + edge := edges[i] + port := portRef.Port() + serviceName := gostServiceName(ctx.BatchNo, port) + channel := newBaseChannel(ctx, port) + channel.EdgeID = u.P(edge.ID) + channel.EdgeRef = u.P(serviceName) + channel.IP = u.P(edge.IP) + + service := &g.GostServiceConfig{ + Name: serviceName, + Addr: fmt.Sprintf(":%d", port), + Handler: g.GostHandlerConfig{ + Type: "auto", + Chain: edge.Mac, + }, + Listener: g.GostListenerConfig{ + Type: "tcp", + }, } - if batch == nil { - slog.Debug("通道为空,跳过清理", "batch", batchNo) + + if ctx.AuthWhitelist { + service.Admission = gostAdmissionName(ctx.BatchNo, port) + admissions = append(admissions, &g.GostAdmissionConfig{ + Name: service.Admission, + Whitelist: true, + Matchers: ctx.Whitelists, + }) + } + if username, password, ok := applyChannelAuth(ctx, channel); ok { + service.Handler.Auther = gostAutherName(ctx.BatchNo, port) + authers = append(authers, &g.GostAutherConfig{ + Name: service.Handler.Auther, + Auths: []g.GostAuthConfig{{ + Username: username, + Password: password, + }}, + }) + } + + services[i] = service + channels[i] = channel + } + + return &channelCreateResult{ + Channels: channels, + applyRemote: func() error { + for _, admission := range admissions { + if err := client.CreateAdmission(admission); err != nil { + return core.NewServErr(fmt.Sprintf("创建 GOST admission 失败: %s", admission.Name), err) + } + } + for _, auther := range authers { + if err := client.CreateAuther(auther); err != nil { + return core.NewServErr(fmt.Sprintf("创建 GOST auther 失败: %s", auther.Name), err) + } + } + for _, service := range services { + if err := client.CreateService(service); err != nil { + return core.NewServErr(fmt.Sprintf("创建 GOST service 失败: %s", service.Name), err) + } + } return nil - } - - if env.RunMode == env.RunModeProd { - proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(batch.ProxyID)).Take() - if err != nil { - return core.NewServErr("获取代理数据失败", err) - } - - client, err := proxyGost(proxy) - if err != nil { - return core.NewServErr("创建 GOST 客户端失败", err) - } - var deleteErrs []error - for _, ch := range batch.Chans { - port := ch.Port() - serviceName := gostServiceName(batchNo, port) - deleteErrs = append(deleteErrs, deleteGostResource("service", serviceName, func() error { - return client.DeleteService(serviceName) - })) - - autherName := gostAutherName(batchNo, port) - deleteErrs = append(deleteErrs, deleteGostResource("auther", autherName, func() error { - return client.DeleteAuther(autherName) - })) - - admissionName := gostAdmissionName(batchNo, port) - deleteErrs = append(deleteErrs, deleteGostResource("admission", admissionName, func() error { - return client.DeleteAdmission(admissionName) - })) - } - if err := u.CombineErrors(deleteErrs); err != nil { - return err - } - } - - if err := freeChans(batch.ProxyID, batchNo); err != nil { - return err - } - - slog.Debug("清除 GOST 端口配置", "proxy", batch.ProxyID, "batch", batchNo, "duration", time.Since(start).String()) - return nil - }) + }, + }, nil } -func (s *channelGostProvider) ClearExpiredChannels(proxyId int32) (int, error) { - now := time.Now() - - keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result() +func (s *channelGostProvider) removeRemote(batchNo string, batch *usedChanBatch) error { + proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(batch.ProxyID)).Take() if err != nil { - return 0, core.NewServErr("查询使用中通道失败", err) - } - if len(keys) == 0 { - return 0, nil + return core.NewServErr("获取代理数据失败", err) } - batchList := make([]string, len(keys)) - batchSet := make(map[string]struct{}, len(keys)) - for i, key := range keys { - parts := strings.Split(key, ":") - if len(parts) != 4 { - return 0, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), nil) - } - batchList[i] = parts[3] - batchSet[parts[3]] = struct{}{} - } - - var batchQueried []struct{ BatchNo string } - err = q.Channel. - Select(q.Channel.BatchNo). - Where( - q.Channel.BatchNo.In(batchList...), - q.Channel.ExpiredAt.Gte(now.UTC()), - ). - Group(q.Channel.BatchNo). - Scan(&batchQueried) + client, err := proxyGost(proxy) if err != nil { - return 0, core.NewServErr("查询过期通道失败", err) - } - for _, batch := range batchQueried { - delete(batchSet, batch.BatchNo) + return core.NewServErr("创建 GOST 客户端失败", err) } + var deleteErrs []error + for _, ch := range batch.Chans { + port := ch.Port() + serviceName := gostServiceName(batchNo, port) + deleteErrs = append(deleteErrs, deleteGostResource("service", serviceName, func() error { + return client.DeleteService(serviceName) + })) - slog.Info("批量清理过期 GOST 通道", "count", len(batchSet)) - for batchNo := range batchSet { - if err := s.RemoveChannels(batchNo); err != nil { - slog.Error("清理过期 GOST 通道失败", "batch", batchNo, "error", err) - } - } + autherName := gostAutherName(batchNo, port) + deleteErrs = append(deleteErrs, deleteGostResource("auther", autherName, func() error { + return client.DeleteAuther(autherName) + })) - return len(batchSet), nil + admissionName := gostAdmissionName(batchNo, port) + deleteErrs = append(deleteErrs, deleteGostResource("admission", admissionName, func() error { + return client.DeleteAdmission(admissionName) + })) + } + return u.CombineErrors(deleteErrs) } func (s *channelGostProvider) selectProxy(count int) (*m.Proxy, error) { diff --git a/web/services/proxy.go b/web/services/proxy.go index e563604..b813514 100644 --- a/web/services/proxy.go +++ b/web/services/proxy.go @@ -64,6 +64,7 @@ type CreateProxy struct { Mac string `json:"mac" validate:"required"` IP string `json:"ip" validate:"required"` Host *string `json:"host"` + Port *int `json:"port"` Secret *string `json:"secret"` Type *m.ProxyType `json:"type"` Status *m.ProxyStatus `json:"status"` @@ -80,6 +81,7 @@ func (s *proxyService) Create(create *CreateProxy) error { Mac: create.Mac, IP: orm.Inet{Addr: addr}, Host: create.Host, + Port: create.Port, Secret: create.Secret, Type: u.Else(create.Type, m.ProxyTypeSelfHosted), Status: u.Else(create.Status, m.ProxyStatusOffline), @@ -99,6 +101,7 @@ type UpdateProxy struct { Mac *string `json:"mac"` IP *string `json:"ip"` Host *string `json:"host"` + Port *int `json:"port"` Secret *string `json:"secret"` } @@ -121,6 +124,9 @@ func (s *proxyService) Update(update *UpdateProxy) error { if update.Host != nil { simples = append(simples, q.Proxy.Host.Value(*update.Host)) } + if update.Port != nil { + simples = append(simples, q.Proxy.Port.Value(*update.Port)) + } if update.Secret != nil { hasSideEffect = true simples = append(simples, q.Proxy.Secret.Value(*update.Secret))