From 7f23e2741f587f4490ad783d7b193f003e3e66c2 Mon Sep 17 00:00:00 2001 From: luorijun Date: Tue, 25 Feb 2025 14:48:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E9=A1=B9=E7=9B=AE=E6=9C=BA?= =?UTF-8?q?=E6=9E=84=E5=92=8C=E6=9C=8D=E5=8A=A1=E7=AB=AF=E5=8D=8F=E7=A8=8B?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 12 +- cmd/server/main.go | 391 +------------- gen.go | 3 +- main.go | 2 +- pkg/utils/utils.go | 44 ++ server/fwd/service.go | 501 ++++++++++++++++++ server/{monitor/monitor.go => mnt/service.go} | 5 +- server/pkg/env/env.go | 86 +++ server/{ => pkg}/orm/orm.go | 16 +- {pkg => server/pkg}/resp/resp.go | 0 {pkg => server/pkg}/socks5/auth.go | 3 +- {pkg => server/pkg}/socks5/error.go | 0 {pkg => server/pkg}/socks5/request.go | 55 +- {pkg => server/pkg}/socks5/resolver.go | 0 {pkg => server/pkg}/socks5/ruleset.go | 0 {pkg => server/pkg}/socks5/server.go | 0 server/service.go | 28 +- server/web/app/handlers/channel.go | 9 +- server/web/app/handlers/node.go | 7 +- server/web/auth/middleware.go | 7 +- server/web/{web.go => service.go} | 3 +- 21 files changed, 732 insertions(+), 440 deletions(-) create mode 100644 server/fwd/service.go rename server/{monitor/monitor.go => mnt/service.go} (98%) create mode 100644 server/pkg/env/env.go rename server/{ => pkg}/orm/orm.go (71%) rename {pkg => server/pkg}/resp/resp.go (100%) rename {pkg => server/pkg}/socks5/auth.go (99%) rename {pkg => server/pkg}/socks5/error.go (100%) rename {pkg => server/pkg}/socks5/request.go (91%) rename {pkg => server/pkg}/socks5/resolver.go (100%) rename {pkg => server/pkg}/socks5/ruleset.go (100%) rename {pkg => server/pkg}/socks5/server.go (100%) rename server/web/{web.go => service.go} (99%) diff --git a/README.md b/README.md index 7f7c83e..4388b97 100644 --- a/README.md +++ b/README.md @@ -1,16 +1,24 @@ ## todo +监听进程信号,优雅关闭服务 + +加一个 log 包,实现全局日志格式控制 + 读取 conn 时加上超时机制 检查 ip 时需要判断同一 ip 的不同写法 客户端重连后出现连接卡死的情况 +实现一个 socks context 以在子组件中获取 socks 相关信息 + +fwd 使用自定义 context 实现在一个上下文中控制 cancel,errch 和其他自定义数据 + ### 长期 考虑一下连接安全性 -内部接口 http2 或者 protobuf +内部接口 rtt 是否还有优化空间(当前30-300ms,根据内容大小增长) ## 开发相关 @@ -29,6 +37,6 @@ ### 更新测试环境 1. 构建项目 -2. 使用测试配置远程启动 docker +2. 使用测试配置 `.env.test` 远程启动 docker ## 转发服务 diff --git a/cmd/server/main.go b/cmd/server/main.go index 69665a2..69fe366 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -1,396 +1,9 @@ package main import ( - "bufio" - "context" - "encoding/binary" - "github.com/joho/godotenv" - "github.com/pkg/errors" - "io" - "log/slog" - "net" - "os" - "proxy-server/pkg/socks5" - "proxy-server/pkg/utils" - "proxy-server/server/orm" - "proxy-server/server/web/app/models" - "strconv" - "time" + "proxy-server/server" ) -var connMap = make(map[string]socks5.ProxyData) - func main() { - slog.SetLogLoggerLevel(slog.LevelDebug) - - // 初始化环境变量 - err := godotenv.Load() - if err != nil { - slog.Debug("没有本地环境变量文件") - } - - orm.Init() - - go startControlTun() - go startDataTun() - select {} -} - -func startDataTun() { - dataPort := os.Getenv("APP_DATA_PORT") - if dataPort == "" { - panic("环境变量 APP_DATA_PORT 未设置") - } - - slog.Info("监听数据端口 " + dataPort) - lData, err := net.Listen("tcp", ":"+dataPort) - if err != nil { - slog.Error("listen error", err) - return - } - defer lData.Close() - - for { - conn, err := lData.Accept() - if err != nil { - slog.Error("accept error", err) - return - } - processDataConn(conn) - } -} - -func processDataConn(client net.Conn) { - - slog.Info("已建立客户端数据通道 " + client.RemoteAddr().String()) - - // 读取 tag - tagLen, err := utils.ReadByte(client) - if err != nil { - slog.Error("read error", err) - return - } - tagBuf, err := utils.ReadBuffer(client, int(tagLen)) - if err != nil { - slog.Error("read error", err) - return - } - tag := string(tagBuf) - - // 找到用户连接 - data, ok := connMap[tag] - if !ok { - slog.Error("no such connection") - return - } - - // 响应用户 - user := data.Conn - socks5.SendSuccess(user, client) - - // 写入目标地址 - _, err = client.Write([]byte{byte(len(data.Dest))}) - if err != nil { - slog.Error("写入目标地址失败", err) - return - } - _, err = client.Write([]byte(data.Dest)) - if err != nil { - slog.Error("写入目标地址失败", err) - return - } - - // 数据转发 - slog.Info("开始数据转发 " + client.RemoteAddr().String() + " <-> " + data.Dest) - errCh := make(chan error) - go func() { - _, err := io.Copy(client, user) - if err != nil { - slog.Error("processDataConn error c2u", err) - } - errCh <- err - }() - go func() { - _, err := io.Copy(user, client) - if err != nil { - slog.Error("processDataConn error u2c", err) - } - errCh <- err - }() - <-errCh - slog.Info("数据转发结束 " + client.RemoteAddr().String() + " <-> " + data.Dest) - defer func() { - err := user.Close() - if err != nil { - slog.Error("close error", err) - } - err = client.Close() - if err != nil { - slog.Error("close error", err) - } - }() -} - -func startControlTun() { - ctrlPort := os.Getenv("APP_CTRL_PORT") - if ctrlPort == "" { - panic("环境变量 APP_CTRL_PORT 未设置") - } - - slog.Info("监听控制端口 " + ctrlPort) - lControl, err := net.Listen("tcp", ":"+ctrlPort) - if err != nil { - slog.Error("listen error", err) - return - } - defer lControl.Close() - - for { - conn, err := lControl.Accept() - if err != nil { - slog.Error("accept error", err) - return - } - go processController(conn) - } -} - -func processController(controller net.Conn) { - defer controller.Close() - - slog.Info("收到客户端控制连接 " + controller.RemoteAddr().String()) - - reader := bufio.NewReader(controller) - - // 读取端口 - portBuf := make([]byte, 2) - _, err := io.ReadFull(reader, portBuf) - if err != nil { - slog.Error("读取转发端口失败", "err", err) - return - } - port := binary.BigEndian.Uint16(portBuf) - - // 新建代理服务 - slog.Info("新建代理服务", "port", port) - proxy, err := socks5.New(&socks5.Config{ - Name: strconv.Itoa(int(port)), - Port: port, - AuthMethods: []socks5.Authenticator{ - &UserPassAuthenticator{}, - &NoAuthAuthenticator{}, - }, - }) - if err != nil { - slog.Error("代理服务创建失败", err) - return - } - - go func() { - err := proxy.Run() - if err != nil { - slog.Error("代理服务建立失败", err) - return - } - }() - - slog.Info("代理服务已建立", "port", port) - for { - user := <-proxy.Conn - tag := user.Tag() - _, err := controller.Write([]byte{byte(len(tag))}) - if err != nil { - slog.Error("write error", err) - return - } - _, err = controller.Write([]byte(tag)) - slog.Info("已通知客户端建立数据通道") - if err != nil { - slog.Error("write error", err) - return - } - connMap[tag] = user - } -} - -type NoAuthAuthenticator struct { -} - -func (a *NoAuthAuthenticator) Method() socks5.AuthMethod { - return socks5.NoAuth -} - -func (a *NoAuthAuthenticator) Authenticate(ctx context.Context, reader io.Reader, writer io.Writer) (*socks5.AuthContext, error) { - - // 检查用户是否在白名单中,如果不在则返回错误 - conn, ok := writer.(net.Conn) - if !ok { - return nil, errors.New("noAuth 认证失败,无法获取连接信息") - } - - addr := conn.RemoteAddr().String() - client, _, err := net.SplitHostPort(addr) - if err != nil { - return nil, errors.Wrap(err, "noAuth 认证失败") - } - slog.Info("用户的地址为 " + client) - - // 检查此 ip 是否有权限访问目标 node - server, ok := ctx.Value("service").(*socks5.Server) - if !ok { - return nil, errors.New("noAuth 认证失败,无法获取服务信息") - } - - node := server.Name - slog.Debug(" 客户端 " + client + " 请求连接到 " + node) - - var channels []models.Channel - err = orm.DB. - Joins("INNER JOIN public.nodes n ON channels.node_id = n.id AND n.name = ?", node). - Joins("INNER JOIN public.users u ON channels.user_id = u.id"). - Joins("INNER JOIN public.user_ips ip ON u.id = ip.user_id AND ip.ip_address = ?", client). - Find(&channels).Error - if err != nil { - return nil, errors.New("noAuth 认证失败,查询用户权限失败") - } - - if len(channels) == 0 { - return nil, errors.New("noAuth 认证失败,没有权限") - } - if len(channels) > 1 { - slog.Warn(client + " + " + node + "的组合有多个权限结果,这是不应当存在的") - } - - channel := channels[0] - - if !channel.AuthIp { - return nil, errors.New("noAuth 认证失败,没有权限") - } - - if channel.Expiration.Before(time.Now()) { - return nil, errors.New("noAuth 认证失败,权限已过期") - } - - return &socks5.AuthContext{ - Method: socks5.NoAuth, - Timeout: 300, // todo - Payload: nil, - }, nil -} - -type UserPassAuthenticator struct { -} - -func (a *UserPassAuthenticator) Method() socks5.AuthMethod { - return socks5.UserPassAuth -} - -func (a *UserPassAuthenticator) Authenticate(ctx context.Context, reader io.Reader, writer io.Writer) (*socks5.AuthContext, error) { - - // 检查版本 - v, err := utils.ReadByte(reader) - if err != nil { - return nil, errors.Wrap(err, "读取版本号失败") - } - if v != socks5.AuthVersion { - _, err := writer.Write([]byte{socks5.SocksVersion, socks5.AuthFailure}) - if err != nil { - return nil, errors.Wrap(err, "响应认证失败") - } - return nil, errors.New("认证版本参数不正确") - } - - // 读取账号 - uLen, err := utils.ReadByte(reader) - if err != nil { - return nil, errors.Wrap(err, "读取用户名长度失败") - } - usernameBuf, err := utils.ReadBuffer(reader, int(uLen)) - if err != nil { - return nil, errors.Wrap(err, "读取用户名失败") - } - username := string(usernameBuf) - - // 读取密码 - pLen, err := utils.ReadByte(reader) - if err != nil { - return nil, errors.Wrap(err, "读取密码长度失败") - } - passwordBuf, err := utils.ReadBuffer(reader, int(pLen)) - if err != nil { - return nil, errors.Wrap(err, "读取密码失败") - } - password := string(passwordBuf) - - var channels []models.Channel - err = orm.DB. - Where(&models.Channel{ - Username: username, - }). - Find(&channels).Error - if err != nil { - return nil, errors.Wrap(err, "查询用户失败") - } - - if len(channels) == 0 { - return nil, errors.New("没有权限") - } - if len(channels) > 1 { - slog.Warn("用户有多个权限结果,这是不应当存在的") - } - channel := channels[0] - if !channel.AuthPass { - return nil, errors.New("没有权限") - } - - if channel.Expiration.Before(time.Now()) { - return nil, errors.New("权限已过期") - } - - // 检查密码 todo 哈希 - if channel.Password != password { - return nil, errors.New("密码错误") - } - - // 如果用户设置了双验证则检查 ip 是否在白名单中 - if channel.AuthIp { - conn, ok := writer.(net.Conn) - if !ok { - return nil, errors.New("无法获取连接信息") - } - - addr := conn.RemoteAddr().String() - client, _, err := net.SplitHostPort(addr) - if err != nil { - return nil, errors.Wrap(err, "无法获取连接信息") - } - - slog.Info("用户的地址为 " + client) - var ips []models.UserIp - err = orm.DB. - Where(&models.UserIp{ - UserId: channel.UserId, - IpAddress: client, - }). - Find(&ips).Error - if err != nil { - return nil, errors.Wrap(err, "查询用户 ip 失败") - } - - if len(ips) == 0 { - return nil, errors.New("没有权限") - } - } - - _, err = writer.Write([]byte{socks5.AuthVersion, socks5.AuthSuccess}) - if err != nil { - slog.Error("响应认证失败", err) - return nil, err - } - - return &socks5.AuthContext{ - Method: socks5.UserPassAuth, - Timeout: 300, // todo - Payload: nil, - }, nil + server.Start() } diff --git a/gen.go b/gen.go index ad183db..c58bdca 100644 --- a/gen.go +++ b/gen.go @@ -1,8 +1,9 @@ package main import ( + "proxy-server/server/pkg/orm" + "gorm.io/gen" - "proxy-server/server/orm" ) const ( diff --git a/main.go b/main.go index 69fe366..e85d256 100644 --- a/main.go +++ b/main.go @@ -5,5 +5,5 @@ import ( ) func main() { - server.Start() + server.Start2() } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index c288da2..c21399d 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -1,8 +1,13 @@ package utils import ( + "context" "io" "log/slog" + "net" + "sync" + + "github.com/pkg/errors" ) func ReadByte(reader io.Reader) (byte, error) { @@ -31,3 +36,42 @@ func Close[T io.Closer](v T) { slog.Warn("对象关闭失败", "err", err) } } + +func ConnChan(ctx context.Context, ls net.Listener) chan net.Conn { + connCh := make(chan net.Conn) + go func() { + for { + conn, err := ls.Accept() + if err != nil { + slog.Error("接受连接失败", err) + // 临时错误重试连接 + var ne net.Error + if errors.As(err, &ne) && ne.Temporary() { + slog.Debug("临时错误重试") + continue + } + return + } + // ctx 取消后退出 + select { + case <-ctx.Done(): + Close(conn) + return + case connCh <- conn: + } + } + }() + return connCh +} + +func WaitChan(ctx context.Context, wg *sync.WaitGroup) chan struct{} { + ch := make(chan struct{}) + go func() { + wg.Wait() + select { + case <-ctx.Done(): + case ch <- struct{}{}: + } + }() + return ch +} diff --git a/server/fwd/service.go b/server/fwd/service.go new file mode 100644 index 0000000..8ed4500 --- /dev/null +++ b/server/fwd/service.go @@ -0,0 +1,501 @@ +package fwd + +import ( + "bufio" + "context" + "encoding/binary" + "io" + "log/slog" + "net" + "proxy-server/pkg/utils" + "proxy-server/server/pkg/env" + "proxy-server/server/pkg/orm" + "proxy-server/server/pkg/socks5" + "proxy-server/server/web/app/models" + "strconv" + "sync" + "time" + + "github.com/pkg/errors" +) + +type Config struct { +} + +type Service struct { + Config *Config + ConnMap map[string]socks5.ProxyData + ctrlConnWg sync.WaitGroup + dataConnWg sync.WaitGroup +} + +func New(config *Config) *Service { + _config := config + if _config == nil { + _config = &Config{} + } + + return &Service{ + Config: _config, + ConnMap: make(map[string]socks5.ProxyData), + ctrlConnWg: sync.WaitGroup{}, + dataConnWg: sync.WaitGroup{}, + } +} + +func (s *Service) Run(ctx context.Context, errCh chan error) { + defer func() { + err := recover() + if err != nil { + slog.Error("服务由于意外的 panic 导致退出", err) + } + }() + + slog.Info("启动 fwd 服务") + + // 启动工作协程 + subCtx, cancel := context.WithCancel(ctx) + defer cancel() + + goNum := 2 + subErrCh := make(chan error, goNum) + defer close(subErrCh) + + go s.startCtrlTun(subCtx, subErrCh) + go s.startDataTun(subCtx, subErrCh) + + // 等待结束 + var firstSubErr error = nil + for i := 0; i < goNum; i++ { + err := <-subErrCh + if err != nil { + slog.Error("隧道错误关闭", "err", err) + if firstSubErr == nil { + firstSubErr = err + cancel() + } + } else { + slog.Info("隧道关闭") + } + } + + slog.Info("fwd 服务已结束") + errCh <- firstSubErr +} + +func (s *Service) startCtrlTun(ctx context.Context, errCh chan error) { + ctrlPort := env.AppCtrlPort + slog.Debug("监听控制通道", slog.Uint64("port", uint64(ctrlPort))) + + // 监听端口 + ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(ctrlPort))) + if err != nil { + slog.Error("监听控制通道失败", "err", err) + return + } + defer utils.Close(ls) + + // 等待连接 + connCh := utils.ConnChan(ctx, ls) + defer close(connCh) + + // 处理连接 +loop: + for { + select { + case <-ctx.Done(): + slog.Debug("结束处理连接,由于上下文取消") + break loop + case conn, ok := <-connCh: + if !ok { + slog.Debug("结束处理连接,由于获取连接失败") + break loop + } + go s.processCtrlConn(conn) + } + } + + // 等待子协程结束 todo 可配置等待时间 + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + procCh := utils.WaitChan(timeout, &s.ctrlConnWg) + defer close(procCh) + + select { + case <-timeout.Done(): + slog.Warn("等待控制通道子协程结束超时") + case <-procCh: + slog.Info("控制通道子协程结束") + } + + slog.Debug("关闭控制通道") + errCh <- nil +} + +func (s *Service) processCtrlConn(controller net.Conn) { + defer func() { + s.ctrlConnWg.Done() + utils.Close(controller) + }() + + slog.Info("收到客户端控制连接 " + controller.RemoteAddr().String()) + + reader := bufio.NewReader(controller) + + // 读取端口 + portBuf := make([]byte, 2) + _, err := io.ReadFull(reader, portBuf) + if err != nil { + slog.Error("读取转发端口失败", "err", err) + return + } + port := binary.BigEndian.Uint16(portBuf) + + // 新建代理服务 + slog.Info("新建代理服务", "port", port) + proxy, err := socks5.New(&socks5.Config{ + Name: strconv.Itoa(int(port)), + Port: port, + AuthMethods: []socks5.Authenticator{ + &UserPassAuthenticator{}, + &NoAuthAuthenticator{}, + }, + }) + if err != nil { + slog.Error("代理服务创建失败", err) + return + } + + go func() { + err := proxy.Run() + if err != nil { + slog.Error("代理服务建立失败", err) + return + } + }() + + slog.Info("代理服务已建立", "port", port) + for { + user := <-proxy.Conn + tag := user.Tag() + _, err := controller.Write([]byte{byte(len(tag))}) + if err != nil { + slog.Error("write error", err) + return + } + _, err = controller.Write([]byte(tag)) + slog.Info("已通知客户端建立数据通道") + if err != nil { + slog.Error("write error", err) + return + } + s.ConnMap[tag] = user + } +} + +func (s *Service) startDataTun(ctx context.Context, errCh chan error) { + dataPort := env.AppDataPort + slog.Debug("监听数据通道", slog.Uint64("port", uint64(dataPort))) + + // 监听端口 + lData, err := net.Listen("tcp", ":"+strconv.Itoa(int(dataPort))) + if err != nil { + slog.Error("listen error", err) + return + } + defer utils.Close(lData) + + // 等待连接 + connCh := utils.ConnChan(ctx, lData) + defer close(connCh) + + // 处理连接 +loop: + for { + select { + case <-ctx.Done(): + slog.Debug("结束处理连接,由于上下文取消") + break loop + case conn, ok := <-connCh: + if !ok { + slog.Debug("结束处理连接,由于获取连接失败") + break loop + } + go s.processDataConn(conn) + } + } + + // 等待子协程结束 todo 可配置等待时间 + timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + procCh := utils.WaitChan(timeout, &s.dataConnWg) + defer close(procCh) + + select { + case <-timeout.Done(): + slog.Warn("等待数据通道子协程结束超时") + case <-procCh: + slog.Info("数据通道子协程结束") + } + + slog.Debug("关闭数据通道") + errCh <- nil +} + +func (s *Service) processDataConn(client net.Conn) { + + slog.Info("已建立客户端数据通道 " + client.RemoteAddr().String()) + + // 读取 tag + tagLen, err := utils.ReadByte(client) + if err != nil { + slog.Error("read error", err) + return + } + tagBuf, err := utils.ReadBuffer(client, int(tagLen)) + if err != nil { + slog.Error("read error", err) + return + } + tag := string(tagBuf) + + // 找到用户连接 + data, ok := s.ConnMap[tag] + if !ok { + slog.Error("no such connection") + return + } + + // 响应用户 + user := data.Conn + socks5.SendSuccess(user, client) + + // 写入目标地址 + _, err = client.Write([]byte{byte(len(data.Dest))}) + if err != nil { + slog.Error("写入目标地址失败", err) + return + } + _, err = client.Write([]byte(data.Dest)) + if err != nil { + slog.Error("写入目标地址失败", err) + return + } + + // 数据转发 + slog.Info("开始数据转发 " + client.RemoteAddr().String() + " <-> " + data.Dest) + errCh := make(chan error) + go func() { + _, err := io.Copy(client, user) + if err != nil { + slog.Error("processDataConn error c2u", err) + } + errCh <- err + }() + go func() { + _, err := io.Copy(user, client) + if err != nil { + slog.Error("processDataConn error u2c", err) + } + errCh <- err + }() + <-errCh + slog.Info("数据转发结束 " + client.RemoteAddr().String() + " <-> " + data.Dest) + defer func() { + err := user.Close() + if err != nil { + slog.Error("close error", err) + } + err = client.Close() + if err != nil { + slog.Error("close error", err) + } + }() +} + +type NoAuthAuthenticator struct { +} + +func (a *NoAuthAuthenticator) Method() socks5.AuthMethod { + return socks5.NoAuth +} + +func (a *NoAuthAuthenticator) Authenticate(ctx context.Context, reader io.Reader, writer io.Writer) (*socks5.AuthContext, error) { + + // 获取用户地址 + conn, ok := writer.(net.Conn) + if !ok { + return nil, errors.New("noAuth 认证失败,无法获取连接信息") + } + addr := conn.RemoteAddr().String() + client, _, err := net.SplitHostPort(addr) + if err != nil { + return nil, errors.Wrap(err, "noAuth 认证失败") + } + slog.Debug("用户的地址为 " + client) + + // 获取服务 + server, ok := ctx.Value("service").(*socks5.Server) + if !ok { + return nil, errors.New("noAuth 认证失败,无法获取服务信息") + } + node := server.Name + slog.Debug("服务的名称为 " + server.Name) + + // 查询权限记录 + slog.Info(" 客户端 " + client + " 请求连接到 " + node) + var channels []models.Channel + err = orm.DB. + Joins("INNER JOIN public.nodes n ON channels.node_id = n.id AND n.name = ?", node). + Joins("INNER JOIN public.users u ON channels.user_id = u.id"). + Joins("INNER JOIN public.user_ips ip ON u.id = ip.user_id AND ip.ip_address = ?", client). + Where(&models.Channel{ + AuthIp: true, + }). + Find(&channels).Error + if err != nil { + return nil, errors.New("noAuth 查询用户权限失败") + } + + // 记录应该只有一条 + channel, err := orm.MaySingle(channels) + if err != nil { + return nil, errors.Wrap(err, "noAuth 没有权限") + } + + // 检查是否需要密码认证 + if channel.AuthPass { + return nil, errors.New("noAuth 没有权限,需要密码认证") + } + + // 检查权限是否过期 + timeout := uint(channel.Expiration.Sub(time.Now()).Seconds()) + if timeout <= 0 { + return nil, errors.New("noAuth 权限已过期") + } + slog.Debug("权限剩余时间", slog.Uint64("timeout", uint64(timeout))) + + return &socks5.AuthContext{ + Method: socks5.NoAuth, + Timeout: timeout, + Payload: nil, + }, nil +} + +type UserPassAuthenticator struct { +} + +func (a *UserPassAuthenticator) Method() socks5.AuthMethod { + return socks5.UserPassAuth +} + +func (a *UserPassAuthenticator) Authenticate(ctx context.Context, reader io.Reader, writer io.Writer) (*socks5.AuthContext, error) { + + // 检查认证版本 + slog.Debug("验证认证版本") + v, err := utils.ReadByte(reader) + if err != nil { + return nil, errors.Wrap(err, "读取版本号失败") + } + if v != socks5.AuthVersion { + _, err := writer.Write([]byte{socks5.SocksVersion, socks5.AuthFailure}) + if err != nil { + return nil, errors.Wrap(err, "响应认证失败") + } + return nil, errors.New("认证版本参数不正确") + } + + // 读取账号 + slog.Debug("验证用户账号") + uLen, err := utils.ReadByte(reader) + if err != nil { + return nil, errors.Wrap(err, "读取用户名长度失败") + } + usernameBuf, err := utils.ReadBuffer(reader, int(uLen)) + if err != nil { + return nil, errors.Wrap(err, "读取用户名失败") + } + username := string(usernameBuf) + + // 读取密码 + pLen, err := utils.ReadByte(reader) + if err != nil { + return nil, errors.Wrap(err, "读取密码长度失败") + } + passwordBuf, err := utils.ReadBuffer(reader, int(pLen)) + if err != nil { + return nil, errors.Wrap(err, "读取密码失败") + } + password := string(passwordBuf) + + // 查询通道配置 + var channel models.Channel + err = orm.DB. + Where(&models.Channel{ + Username: username, + AuthPass: true, + }). + First(&channel).Error + if err != nil { + return nil, errors.Wrap(err, "查询用户失败") + } + + // 检查密码 todo 哈希 + if channel.Password != password { + return nil, errors.New("密码错误") + } + + // 检查权限是否过期 + timeout := uint(channel.Expiration.Sub(time.Now()).Seconds()) + if timeout <= 0 { + return nil, errors.New("权限已过期") + } + + // 如果用户设置了双验证则检查 ip 是否在白名单中 + if channel.AuthIp { + slog.Debug("验证用户 ip") + + // 获取用户地址 + conn, ok := writer.(net.Conn) + if !ok { + return nil, errors.New("无法获取连接信息") + } + addr := conn.RemoteAddr().String() + client, _, err := net.SplitHostPort(addr) + if err != nil { + return nil, errors.Wrap(err, "无法获取连接信息") + } + + // 查询通道配置 + var ips []models.UserIp + err = orm.DB. + Where(&models.UserIp{ + UserId: channel.UserId, + IpAddress: client, + }). + Find(&ips).Error + if err != nil { + return nil, errors.Wrap(err, "查询用户 ip 失败") + } + + // 检查是否在白名单中 + if len(ips) == 0 { + return nil, errors.New("没有权限") + } + } + + // 响应认证成功 + _, err = writer.Write([]byte{socks5.AuthVersion, socks5.AuthSuccess}) + if err != nil { + slog.Error("响应认证失败", err) + return nil, err + } + + return &socks5.AuthContext{ + Method: socks5.UserPassAuth, + Timeout: 300, // todo + Payload: nil, + }, nil +} diff --git a/server/monitor/monitor.go b/server/mnt/service.go similarity index 98% rename from server/monitor/monitor.go rename to server/mnt/service.go index fa4ced3..58337f9 100644 --- a/server/monitor/monitor.go +++ b/server/mnt/service.go @@ -1,12 +1,13 @@ -package monitor +package mnt import ( "context" "encoding/hex" + "log/slog" + "github.com/google/gopacket" "github.com/google/gopacket/pcap" "github.com/pkg/errors" - "log/slog" ) func Start(ctx context.Context, errCh chan error) { diff --git a/server/pkg/env/env.go b/server/pkg/env/env.go new file mode 100644 index 0000000..9b3f36f --- /dev/null +++ b/server/pkg/env/env.go @@ -0,0 +1,86 @@ +package env + +import ( + "fmt" + "log/slog" + "os" + "strconv" + + "github.com/joho/godotenv" +) + +var ( + AppCtrlPort uint16 + AppDataPort uint16 + + DbHost string + DbPort uint16 + DbDatabase string + DbUsername string + DbPassword string + DbTimezone string +) + +func Init() { + + // 加载 .env 文件 + err := godotenv.Load() + if err != nil { + slog.Debug("没有本地环境变量文件") + } + + appCtrlPortStr := os.Getenv("APP_CTRL_PORT") + if appCtrlPortStr == "" { + panic("环境变量 APP_CTRL_PORT 未设置") + } + appCtrlPort, err := strconv.ParseUint(appCtrlPortStr, 10, 16) + if err != nil { + panic(fmt.Sprintf("环境变量 APP_CTRL_PORT 格式错误: %v", err)) + } + AppCtrlPort = uint16(appCtrlPort) + + appDataPortStr := os.Getenv("APP_DATA_PORT") + if appDataPortStr == "" { + panic("环境变量 APP_DATA_PORT 未设置") + } + appDataPort, err := strconv.ParseUint(appDataPortStr, 10, 16) + if err != nil { + panic(fmt.Sprintf("环境变量 APP_DATA_PORT 格式错误: %v", err)) + } + AppDataPort = uint16(appDataPort) + + DbHost = os.Getenv("DB_HOST") + if DbHost == "" { + panic("环境变量 DB_HOST 未设置") + } + + dbPortStr := os.Getenv("DB_PORT") + if dbPortStr == "" { + dbPortStr = "5432" + } + dbPort, err := strconv.ParseUint(dbPortStr, 10, 16) + if err != nil { + panic(fmt.Sprintf("环境变量 DB_PORT 格式错误: %v", err)) + } + DbPort = uint16(dbPort) + + DbDatabase = os.Getenv("DB_DATABASE") + if DbDatabase == "" { + panic("环境变量 DB_DATABASE 未设置") + } + + DbUsername = os.Getenv("DB_USERNAME") + if DbUsername == "" { + panic("环境变量 DB_USERNAME 未设置") + } + + DbPassword = os.Getenv("DB_PASSWORD") + if DbPassword == "" { + panic("环境变量 DB_PASSWORD 未设置") + } + + DbTimezone = os.Getenv("DB_TIMEZONE") + if DbTimezone == "" { + DbTimezone = "Asia/Shanghai" + } +} diff --git a/server/orm/orm.go b/server/pkg/orm/orm.go similarity index 71% rename from server/orm/orm.go rename to server/pkg/orm/orm.go index 152e0ab..a702f48 100644 --- a/server/orm/orm.go +++ b/server/pkg/orm/orm.go @@ -2,10 +2,13 @@ package orm import ( "fmt" + "log/slog" + "os" + + "github.com/pkg/errors" "gorm.io/driver/postgres" "gorm.io/gorm" "gorm.io/gorm/logger" - "os" ) var DB *gorm.DB @@ -32,3 +35,14 @@ func Init() { DB = db } + +func MaySingle[T any](results []T) (*T, error) { + rsLen := len(results) + if rsLen == 0 { + return nil, errors.New("记录为空") + } + if rsLen > 1 { + slog.Warn("记录不唯一", "ids") + } + return &results[0], nil +} diff --git a/pkg/resp/resp.go b/server/pkg/resp/resp.go similarity index 100% rename from pkg/resp/resp.go rename to server/pkg/resp/resp.go diff --git a/pkg/socks5/auth.go b/server/pkg/socks5/auth.go similarity index 99% rename from pkg/socks5/auth.go rename to server/pkg/socks5/auth.go index b7d5ded..d7014ce 100644 --- a/pkg/socks5/auth.go +++ b/server/pkg/socks5/auth.go @@ -2,11 +2,12 @@ package socks5 import ( "context" - "github.com/pkg/errors" "io" "log/slog" "proxy-server/pkg/utils" "slices" + + "github.com/pkg/errors" ) type AuthMethod byte diff --git a/pkg/socks5/error.go b/server/pkg/socks5/error.go similarity index 100% rename from pkg/socks5/error.go rename to server/pkg/socks5/error.go diff --git a/pkg/socks5/request.go b/server/pkg/socks5/request.go similarity index 91% rename from pkg/socks5/request.go rename to server/pkg/socks5/request.go index ec27294..ae31ac1 100644 --- a/pkg/socks5/request.go +++ b/server/pkg/socks5/request.go @@ -3,12 +3,13 @@ package socks5 import ( "context" "fmt" - "github.com/pkg/errors" "io" "log/slog" "net" "proxy-server/pkg/utils" "strconv" + + "github.com/pkg/errors" ) const ( @@ -236,10 +237,10 @@ func (server *Server) handleConnect(ctx context.Context, conn net.Conn, req *Req return nil // 与目标服务器建立连接 - //server.config.Logger.Printf("与目标服务器建立连接\n") - //dial := server.config.Dial - //target, err := dial("tcp", req.realDestAddr.Address()) - //if err != nil { + // server.config.Logger.Printf("与目标服务器建立连接\n") + // dial := server.config.Dial + // target, err := dial("tcp", req.realDestAddr.Address()) + // if err != nil { // msg := err.Error() // resp := hostUnreachable // if strings.Contains(msg, "refused") { @@ -253,38 +254,38 @@ func (server *Server) handleConnect(ctx context.Context, conn net.Conn, req *Req // return fmt.Errorf("failed to send reply: %v", err) // } // return fmt.Errorf("request to %v failed: %v", req.DestAddr, err) - //} - //defer closeConnection(target) + // } + // defer closeConnection(target) // - //// 正常响应 - //slog.Info("连接成功,开始代理流量") + // // 正常响应 + // slog.Info("连接成功,开始代理流量") // - //local := target.LocalAddr().(*net.TCPAddr) - //bind := AddrSpec{IP: local.IP, Port: local.Port} - //err = sendReply(Conn, successReply, &bind) - //if err != nil { + // local := target.LocalAddr().(*net.TCPAddr) + // bind := AddrSpec{IP: local.IP, Port: local.Port} + // err = sendReply(Conn, successReply, &bind) + // if err != nil { // return fmt.Errorf("Failed to send reply: %v", err) - //} + // } // - //// 配置超时时间和行为 - //timeout := req.AuthContext.Timeout - //slog.Debug("超时时间", "timeout", timeout) + // // 配置超时时间和行为 + // timeout := req.AuthContext.Timeout + // slog.Debug("超时时间", "timeout", timeout) // - //timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second) - //defer cancel() + // timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(timeout)*time.Second) + // defer cancel() // - //// 代理流量 - //errChan := make(chan error, 2) - //go func() { + // // 代理流量 + // errChan := make(chan error, 2) + // go func() { // _, err = io.Copy(target, req.bufConn) // errChan <- err - //}() - //go func() { + // }() + // go func() { // _, err = io.Copy(Conn, target) // errChan <- err - //}() + // }() // - //for { + // for { // select { // // case <-timeoutCtx.Done(): @@ -299,7 +300,7 @@ func (server *Server) handleConnect(ctx context.Context, conn net.Conn, req *Req // } // return nil // } - //} + // } } diff --git a/pkg/socks5/resolver.go b/server/pkg/socks5/resolver.go similarity index 100% rename from pkg/socks5/resolver.go rename to server/pkg/socks5/resolver.go diff --git a/pkg/socks5/ruleset.go b/server/pkg/socks5/ruleset.go similarity index 100% rename from pkg/socks5/ruleset.go rename to server/pkg/socks5/ruleset.go diff --git a/pkg/socks5/server.go b/server/pkg/socks5/server.go similarity index 100% rename from pkg/socks5/server.go rename to server/pkg/socks5/server.go diff --git a/server/service.go b/server/service.go index e3f597c..2ee0c03 100644 --- a/server/service.go +++ b/server/service.go @@ -2,16 +2,34 @@ package server import ( "context" + "log/slog" + "os" + "proxy-server/server/fwd" + "proxy-server/server/pkg/env" + "proxy-server/server/pkg/orm" + "proxy-server/server/web" + "github.com/joho/godotenv" "github.com/lmittmann/tint" "github.com/mattn/go-colorable" - "log/slog" - "os" - "proxy-server/server/orm" - "proxy-server/server/web" ) func Start() { + + // 初始化 + initLog() + env.Init() + orm.Init() + + // 启动代理服务 + fwd.New(nil).Run(context.Background(), make(chan error)) +} + +func initLog() { + slog.SetLogLoggerLevel(slog.LevelDebug) +} + +func Start2() { defer func() { err := recover() if err != nil { @@ -51,7 +69,7 @@ func Start() { defer cancel() go web.Start(ctxC, errChan) - //go monitor.Start(ctxC, errChan) + // go monitor.Start2(ctxC, errChan) slog.Info("服务启动成功") // 监听异常 diff --git a/server/web/app/handlers/channel.go b/server/web/app/handlers/channel.go index df0f421..beb57ad 100644 --- a/server/web/app/handlers/channel.go +++ b/server/web/app/handlers/channel.go @@ -1,14 +1,15 @@ package handlers import ( - "github.com/gin-gonic/gin" - "github.com/pkg/errors" "log/slog" - "proxy-server/pkg/resp" - "proxy-server/server/orm" + "proxy-server/server/pkg/orm" + "proxy-server/server/pkg/resp" "proxy-server/server/web/app/models" "strings" "time" + + "github.com/gin-gonic/gin" + "github.com/pkg/errors" ) // region frp 接口 diff --git a/server/web/app/handlers/node.go b/server/web/app/handlers/node.go index a99547c..d9a01f5 100644 --- a/server/web/app/handlers/node.go +++ b/server/web/app/handlers/node.go @@ -1,12 +1,13 @@ package handlers import ( + "os" + "proxy-server/server/pkg/orm" + "proxy-server/server/web/app/models" + "github.com/gin-gonic/gin" "github.com/pkg/errors" "gorm.io/gorm" - "os" - "proxy-server/server/orm" - "proxy-server/server/web/app/models" ) type NodeRegisterReq struct { diff --git a/server/web/auth/middleware.go b/server/web/auth/middleware.go index 84e38fc..b080e83 100644 --- a/server/web/auth/middleware.go +++ b/server/web/auth/middleware.go @@ -2,14 +2,15 @@ package auth import ( "encoding/base64" - "github.com/gin-gonic/gin" - "github.com/pkg/errors" "log/slog" "net/http" "os" - "proxy-server/pkg/resp" + "proxy-server/server/pkg/resp" "slices" "strings" + + "github.com/gin-gonic/gin" + "github.com/pkg/errors" ) func middleware(c *gin.Context) { diff --git a/server/web/web.go b/server/web/service.go similarity index 99% rename from server/web/web.go rename to server/web/service.go index d75bef6..5331d81 100644 --- a/server/web/web.go +++ b/server/web/service.go @@ -2,12 +2,13 @@ package web import ( "context" - "github.com/gin-gonic/gin" "log/slog" "net/http" "os" "proxy-server/server/web/auth" "proxy-server/server/web/router" + + "github.com/gin-gonic/gin" ) var server *http.Server