用户代理通道超时关闭(暂时跳过认证)

This commit is contained in:
2025-03-06 18:08:57 +08:00
parent 2a50237af2
commit 791f20d2d7
4 changed files with 67 additions and 11 deletions

View File

@@ -1,13 +1,17 @@
## todo
压力测试
排查启动速度很慢的问题
可配置 processUserConn 超时等待时间
测试跳过认证时的最大 qps需要注意单机连接数上限会导致连接失败
简化数据传递时的 tag 文本量(找一个无重复 hash 的办法),并且在控制通道直接传输目标地址,客户端可以同时开始数据通道和目标地址的连接建立
读取 conn 时加上超时机制
代理节点超时控制
在控制通道直接传输目标地址,客户端可以同时开始数据通道和目标地址的连接建立
网关根据代理节点对目标服务连接的反馈,决定向用户返回的 socks 响应
数据通道池化

View File

@@ -21,6 +21,12 @@ type AuthContext struct {
}
func CheckIp(conn net.Conn) (*AuthContext, error) {
return &AuthContext{
Timeout: 0,
Payload: Payload{
1,
},
}, nil
// 获取用户地址
remoteAddr := conn.RemoteAddr().String()
@@ -74,6 +80,12 @@ func CheckIp(conn net.Conn) (*AuthContext, error) {
}
func CheckPass(conn net.Conn, username, password string) (*AuthContext, error) {
return &AuthContext{
Timeout: 0,
Payload: Payload{
1,
},
}, nil
// 查询通道配置
var channel models.Channel

View File

@@ -4,9 +4,36 @@ import (
"bufio"
"fmt"
"net"
"sync"
"time"
)
type ConnMap struct {
_map sync.Map
}
func (c *ConnMap) LoadAndDelete(key string) (*Conn, bool) {
_value, ok := c._map.LoadAndDelete(key)
if !ok {
return nil, false
}
return _value.(*Conn), true
}
func (c *ConnMap) Store(key string, value *Conn) {
c._map.Store(key, value)
}
func (c *ConnMap) Range(f func(key string, value *Conn) bool) {
c._map.Range(func(key, value any) bool {
return f(key.(string), value.(*Conn))
})
}
func (c *ConnMap) Clear() {
c._map.Clear()
}
type Conn struct {
Conn net.Conn
Reader *bufio.Reader

View File

@@ -14,6 +14,7 @@ import (
"strconv"
"strings"
"sync"
"time"
"github.com/pkg/errors"
)
@@ -26,7 +27,7 @@ type Service struct {
ctx context.Context
cancel context.CancelFunc
userConnMap sync.Map
userConnMap core.ConnMap
fwdLesWg utils.CountWaitGroup
ctrlConnWg utils.CountWaitGroup
@@ -45,7 +46,7 @@ func New(config *Config) *Service {
ctx: ctx,
cancel: cancel,
userConnMap: sync.Map{},
userConnMap: core.ConnMap{},
fwdLesWg: utils.CountWaitGroup{},
ctrlConnWg: utils.CountWaitGroup{},
@@ -106,9 +107,8 @@ func (s *Service) Run() {
s.userConnWg.Wait()
// 清理资源
s.userConnMap.Range(func(key, value any) bool {
conn := value.(*core.Conn)
utils.Close(conn)
s.userConnMap.Range(func(key string, value *core.Conn) bool {
utils.Close(value)
return true
})
s.userConnMap.Clear()
@@ -283,13 +283,11 @@ func (s *Service) processDataConn(client net.Conn) error {
}
// 找到用户连接
userAny, ok := s.userConnMap.Load(tag)
user, ok := s.userConnMap.LoadAndDelete(tag)
if !ok {
return errors.New("查找用户连接失败")
}
user := userAny.(*core.Conn)
defer utils.Close(user)
defer s.userConnMap.Delete(tag)
// 发送目标地址
select {
@@ -360,5 +358,20 @@ func (s *Service) processUserConn(user *core.Conn, ctrl net.Conn) error {
// 记录用户连接
s.userConnMap.Store(user.Tag, user)
// 如果限定时间内没有建立数据通道,则关闭连接
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
select {
case <-s.ctx.Done():
// 服务会在退出时统一关闭未消费的连接
case <-timeout.Done():
storedUser, ok := s.userConnMap.LoadAndDelete(user.Tag)
if ok {
slog.Debug("用户连接超时,关闭连接", "tag", user.Tag)
utils.Close(storedUser)
}
}
return nil
}