优化项目机构和服务端协程控制逻辑

This commit is contained in:
2025-02-25 14:48:50 +08:00
parent 83fd749d50
commit 7f23e2741f
21 changed files with 732 additions and 440 deletions

View File

@@ -1,16 +1,24 @@
## todo ## todo
监听进程信号,优雅关闭服务
加一个 log 包,实现全局日志格式控制
读取 conn 时加上超时机制 读取 conn 时加上超时机制
检查 ip 时需要判断同一 ip 的不同写法 检查 ip 时需要判断同一 ip 的不同写法
客户端重连后出现连接卡死的情况 客户端重连后出现连接卡死的情况
实现一个 socks context 以在子组件中获取 socks 相关信息
fwd 使用自定义 context 实现在一个上下文中控制 cancelerrch 和其他自定义数据
### 长期 ### 长期
考虑一下连接安全性 考虑一下连接安全性
内部接口 http2 或者 protobuf 内部接口 rtt 是否还有优化空间当前30-300ms根据内容大小增长
## 开发相关 ## 开发相关
@@ -29,6 +37,6 @@
### 更新测试环境 ### 更新测试环境
1. 构建项目 1. 构建项目
2. 使用测试配置远程启动 docker 2. 使用测试配置 `.env.test` 远程启动 docker
## 转发服务 ## 转发服务

View File

@@ -1,396 +1,9 @@
package main package main
import ( import (
"bufio" "proxy-server/server"
"context"
"encoding/binary"
"github.com/joho/godotenv"
"github.com/pkg/errors"
"io"
"log/slog"
"net"
"os"
"proxy-server/pkg/socks5"
"proxy-server/pkg/utils"
"proxy-server/server/orm"
"proxy-server/server/web/app/models"
"strconv"
"time"
) )
var connMap = make(map[string]socks5.ProxyData)
func main() { func main() {
slog.SetLogLoggerLevel(slog.LevelDebug) server.Start()
// 初始化环境变量
err := godotenv.Load()
if err != nil {
slog.Debug("没有本地环境变量文件")
}
orm.Init()
go startControlTun()
go startDataTun()
select {}
}
func startDataTun() {
dataPort := os.Getenv("APP_DATA_PORT")
if dataPort == "" {
panic("环境变量 APP_DATA_PORT 未设置")
}
slog.Info("监听数据端口 " + dataPort)
lData, err := net.Listen("tcp", ":"+dataPort)
if err != nil {
slog.Error("listen error", err)
return
}
defer lData.Close()
for {
conn, err := lData.Accept()
if err != nil {
slog.Error("accept error", err)
return
}
processDataConn(conn)
}
}
func processDataConn(client net.Conn) {
slog.Info("已建立客户端数据通道 " + client.RemoteAddr().String())
// 读取 tag
tagLen, err := utils.ReadByte(client)
if err != nil {
slog.Error("read error", err)
return
}
tagBuf, err := utils.ReadBuffer(client, int(tagLen))
if err != nil {
slog.Error("read error", err)
return
}
tag := string(tagBuf)
// 找到用户连接
data, ok := connMap[tag]
if !ok {
slog.Error("no such connection")
return
}
// 响应用户
user := data.Conn
socks5.SendSuccess(user, client)
// 写入目标地址
_, err = client.Write([]byte{byte(len(data.Dest))})
if err != nil {
slog.Error("写入目标地址失败", err)
return
}
_, err = client.Write([]byte(data.Dest))
if err != nil {
slog.Error("写入目标地址失败", err)
return
}
// 数据转发
slog.Info("开始数据转发 " + client.RemoteAddr().String() + " <-> " + data.Dest)
errCh := make(chan error)
go func() {
_, err := io.Copy(client, user)
if err != nil {
slog.Error("processDataConn error c2u", err)
}
errCh <- err
}()
go func() {
_, err := io.Copy(user, client)
if err != nil {
slog.Error("processDataConn error u2c", err)
}
errCh <- err
}()
<-errCh
slog.Info("数据转发结束 " + client.RemoteAddr().String() + " <-> " + data.Dest)
defer func() {
err := user.Close()
if err != nil {
slog.Error("close error", err)
}
err = client.Close()
if err != nil {
slog.Error("close error", err)
}
}()
}
func startControlTun() {
ctrlPort := os.Getenv("APP_CTRL_PORT")
if ctrlPort == "" {
panic("环境变量 APP_CTRL_PORT 未设置")
}
slog.Info("监听控制端口 " + ctrlPort)
lControl, err := net.Listen("tcp", ":"+ctrlPort)
if err != nil {
slog.Error("listen error", err)
return
}
defer lControl.Close()
for {
conn, err := lControl.Accept()
if err != nil {
slog.Error("accept error", err)
return
}
go processController(conn)
}
}
func processController(controller net.Conn) {
defer controller.Close()
slog.Info("收到客户端控制连接 " + controller.RemoteAddr().String())
reader := bufio.NewReader(controller)
// 读取端口
portBuf := make([]byte, 2)
_, err := io.ReadFull(reader, portBuf)
if err != nil {
slog.Error("读取转发端口失败", "err", err)
return
}
port := binary.BigEndian.Uint16(portBuf)
// 新建代理服务
slog.Info("新建代理服务", "port", port)
proxy, err := socks5.New(&socks5.Config{
Name: strconv.Itoa(int(port)),
Port: port,
AuthMethods: []socks5.Authenticator{
&UserPassAuthenticator{},
&NoAuthAuthenticator{},
},
})
if err != nil {
slog.Error("代理服务创建失败", err)
return
}
go func() {
err := proxy.Run()
if err != nil {
slog.Error("代理服务建立失败", err)
return
}
}()
slog.Info("代理服务已建立", "port", port)
for {
user := <-proxy.Conn
tag := user.Tag()
_, err := controller.Write([]byte{byte(len(tag))})
if err != nil {
slog.Error("write error", err)
return
}
_, err = controller.Write([]byte(tag))
slog.Info("已通知客户端建立数据通道")
if err != nil {
slog.Error("write error", err)
return
}
connMap[tag] = user
}
}
type NoAuthAuthenticator struct {
}
func (a *NoAuthAuthenticator) Method() socks5.AuthMethod {
return socks5.NoAuth
}
func (a *NoAuthAuthenticator) Authenticate(ctx context.Context, reader io.Reader, writer io.Writer) (*socks5.AuthContext, error) {
// 检查用户是否在白名单中,如果不在则返回错误
conn, ok := writer.(net.Conn)
if !ok {
return nil, errors.New("noAuth 认证失败,无法获取连接信息")
}
addr := conn.RemoteAddr().String()
client, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, errors.Wrap(err, "noAuth 认证失败")
}
slog.Info("用户的地址为 " + client)
// 检查此 ip 是否有权限访问目标 node
server, ok := ctx.Value("service").(*socks5.Server)
if !ok {
return nil, errors.New("noAuth 认证失败,无法获取服务信息")
}
node := server.Name
slog.Debug(" 客户端 " + client + " 请求连接到 " + node)
var channels []models.Channel
err = orm.DB.
Joins("INNER JOIN public.nodes n ON channels.node_id = n.id AND n.name = ?", node).
Joins("INNER JOIN public.users u ON channels.user_id = u.id").
Joins("INNER JOIN public.user_ips ip ON u.id = ip.user_id AND ip.ip_address = ?", client).
Find(&channels).Error
if err != nil {
return nil, errors.New("noAuth 认证失败,查询用户权限失败")
}
if len(channels) == 0 {
return nil, errors.New("noAuth 认证失败,没有权限")
}
if len(channels) > 1 {
slog.Warn(client + " + " + node + "的组合有多个权限结果,这是不应当存在的")
}
channel := channels[0]
if !channel.AuthIp {
return nil, errors.New("noAuth 认证失败,没有权限")
}
if channel.Expiration.Before(time.Now()) {
return nil, errors.New("noAuth 认证失败,权限已过期")
}
return &socks5.AuthContext{
Method: socks5.NoAuth,
Timeout: 300, // todo
Payload: nil,
}, nil
}
type UserPassAuthenticator struct {
}
func (a *UserPassAuthenticator) Method() socks5.AuthMethod {
return socks5.UserPassAuth
}
func (a *UserPassAuthenticator) Authenticate(ctx context.Context, reader io.Reader, writer io.Writer) (*socks5.AuthContext, error) {
// 检查版本
v, err := utils.ReadByte(reader)
if err != nil {
return nil, errors.Wrap(err, "读取版本号失败")
}
if v != socks5.AuthVersion {
_, err := writer.Write([]byte{socks5.SocksVersion, socks5.AuthFailure})
if err != nil {
return nil, errors.Wrap(err, "响应认证失败")
}
return nil, errors.New("认证版本参数不正确")
}
// 读取账号
uLen, err := utils.ReadByte(reader)
if err != nil {
return nil, errors.Wrap(err, "读取用户名长度失败")
}
usernameBuf, err := utils.ReadBuffer(reader, int(uLen))
if err != nil {
return nil, errors.Wrap(err, "读取用户名失败")
}
username := string(usernameBuf)
// 读取密码
pLen, err := utils.ReadByte(reader)
if err != nil {
return nil, errors.Wrap(err, "读取密码长度失败")
}
passwordBuf, err := utils.ReadBuffer(reader, int(pLen))
if err != nil {
return nil, errors.Wrap(err, "读取密码失败")
}
password := string(passwordBuf)
var channels []models.Channel
err = orm.DB.
Where(&models.Channel{
Username: username,
}).
Find(&channels).Error
if err != nil {
return nil, errors.Wrap(err, "查询用户失败")
}
if len(channels) == 0 {
return nil, errors.New("没有权限")
}
if len(channels) > 1 {
slog.Warn("用户有多个权限结果,这是不应当存在的")
}
channel := channels[0]
if !channel.AuthPass {
return nil, errors.New("没有权限")
}
if channel.Expiration.Before(time.Now()) {
return nil, errors.New("权限已过期")
}
// 检查密码 todo 哈希
if channel.Password != password {
return nil, errors.New("密码错误")
}
// 如果用户设置了双验证则检查 ip 是否在白名单中
if channel.AuthIp {
conn, ok := writer.(net.Conn)
if !ok {
return nil, errors.New("无法获取连接信息")
}
addr := conn.RemoteAddr().String()
client, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, errors.Wrap(err, "无法获取连接信息")
}
slog.Info("用户的地址为 " + client)
var ips []models.UserIp
err = orm.DB.
Where(&models.UserIp{
UserId: channel.UserId,
IpAddress: client,
}).
Find(&ips).Error
if err != nil {
return nil, errors.Wrap(err, "查询用户 ip 失败")
}
if len(ips) == 0 {
return nil, errors.New("没有权限")
}
}
_, err = writer.Write([]byte{socks5.AuthVersion, socks5.AuthSuccess})
if err != nil {
slog.Error("响应认证失败", err)
return nil, err
}
return &socks5.AuthContext{
Method: socks5.UserPassAuth,
Timeout: 300, // todo
Payload: nil,
}, nil
} }

3
gen.go
View File

@@ -1,8 +1,9 @@
package main package main
import ( import (
"proxy-server/server/pkg/orm"
"gorm.io/gen" "gorm.io/gen"
"proxy-server/server/orm"
) )
const ( const (

View File

@@ -5,5 +5,5 @@ import (
) )
func main() { func main() {
server.Start() server.Start2()
} }

View File

@@ -1,8 +1,13 @@
package utils package utils
import ( import (
"context"
"io" "io"
"log/slog" "log/slog"
"net"
"sync"
"github.com/pkg/errors"
) )
func ReadByte(reader io.Reader) (byte, error) { func ReadByte(reader io.Reader) (byte, error) {
@@ -31,3 +36,42 @@ func Close[T io.Closer](v T) {
slog.Warn("对象关闭失败", "err", err) slog.Warn("对象关闭失败", "err", err)
} }
} }
func ConnChan(ctx context.Context, ls net.Listener) chan net.Conn {
connCh := make(chan net.Conn)
go func() {
for {
conn, err := ls.Accept()
if err != nil {
slog.Error("接受连接失败", err)
// 临时错误重试连接
var ne net.Error
if errors.As(err, &ne) && ne.Temporary() {
slog.Debug("临时错误重试")
continue
}
return
}
// ctx 取消后退出
select {
case <-ctx.Done():
Close(conn)
return
case connCh <- conn:
}
}
}()
return connCh
}
func WaitChan(ctx context.Context, wg *sync.WaitGroup) chan struct{} {
ch := make(chan struct{})
go func() {
wg.Wait()
select {
case <-ctx.Done():
case ch <- struct{}{}:
}
}()
return ch
}

501
server/fwd/service.go Normal file
View File

@@ -0,0 +1,501 @@
package fwd
import (
"bufio"
"context"
"encoding/binary"
"io"
"log/slog"
"net"
"proxy-server/pkg/utils"
"proxy-server/server/pkg/env"
"proxy-server/server/pkg/orm"
"proxy-server/server/pkg/socks5"
"proxy-server/server/web/app/models"
"strconv"
"sync"
"time"
"github.com/pkg/errors"
)
type Config struct {
}
type Service struct {
Config *Config
ConnMap map[string]socks5.ProxyData
ctrlConnWg sync.WaitGroup
dataConnWg sync.WaitGroup
}
func New(config *Config) *Service {
_config := config
if _config == nil {
_config = &Config{}
}
return &Service{
Config: _config,
ConnMap: make(map[string]socks5.ProxyData),
ctrlConnWg: sync.WaitGroup{},
dataConnWg: sync.WaitGroup{},
}
}
func (s *Service) Run(ctx context.Context, errCh chan error) {
defer func() {
err := recover()
if err != nil {
slog.Error("服务由于意外的 panic 导致退出", err)
}
}()
slog.Info("启动 fwd 服务")
// 启动工作协程
subCtx, cancel := context.WithCancel(ctx)
defer cancel()
goNum := 2
subErrCh := make(chan error, goNum)
defer close(subErrCh)
go s.startCtrlTun(subCtx, subErrCh)
go s.startDataTun(subCtx, subErrCh)
// 等待结束
var firstSubErr error = nil
for i := 0; i < goNum; i++ {
err := <-subErrCh
if err != nil {
slog.Error("隧道错误关闭", "err", err)
if firstSubErr == nil {
firstSubErr = err
cancel()
}
} else {
slog.Info("隧道关闭")
}
}
slog.Info("fwd 服务已结束")
errCh <- firstSubErr
}
func (s *Service) startCtrlTun(ctx context.Context, errCh chan error) {
ctrlPort := env.AppCtrlPort
slog.Debug("监听控制通道", slog.Uint64("port", uint64(ctrlPort)))
// 监听端口
ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(ctrlPort)))
if err != nil {
slog.Error("监听控制通道失败", "err", err)
return
}
defer utils.Close(ls)
// 等待连接
connCh := utils.ConnChan(ctx, ls)
defer close(connCh)
// 处理连接
loop:
for {
select {
case <-ctx.Done():
slog.Debug("结束处理连接,由于上下文取消")
break loop
case conn, ok := <-connCh:
if !ok {
slog.Debug("结束处理连接,由于获取连接失败")
break loop
}
go s.processCtrlConn(conn)
}
}
// 等待子协程结束 todo 可配置等待时间
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
procCh := utils.WaitChan(timeout, &s.ctrlConnWg)
defer close(procCh)
select {
case <-timeout.Done():
slog.Warn("等待控制通道子协程结束超时")
case <-procCh:
slog.Info("控制通道子协程结束")
}
slog.Debug("关闭控制通道")
errCh <- nil
}
func (s *Service) processCtrlConn(controller net.Conn) {
defer func() {
s.ctrlConnWg.Done()
utils.Close(controller)
}()
slog.Info("收到客户端控制连接 " + controller.RemoteAddr().String())
reader := bufio.NewReader(controller)
// 读取端口
portBuf := make([]byte, 2)
_, err := io.ReadFull(reader, portBuf)
if err != nil {
slog.Error("读取转发端口失败", "err", err)
return
}
port := binary.BigEndian.Uint16(portBuf)
// 新建代理服务
slog.Info("新建代理服务", "port", port)
proxy, err := socks5.New(&socks5.Config{
Name: strconv.Itoa(int(port)),
Port: port,
AuthMethods: []socks5.Authenticator{
&UserPassAuthenticator{},
&NoAuthAuthenticator{},
},
})
if err != nil {
slog.Error("代理服务创建失败", err)
return
}
go func() {
err := proxy.Run()
if err != nil {
slog.Error("代理服务建立失败", err)
return
}
}()
slog.Info("代理服务已建立", "port", port)
for {
user := <-proxy.Conn
tag := user.Tag()
_, err := controller.Write([]byte{byte(len(tag))})
if err != nil {
slog.Error("write error", err)
return
}
_, err = controller.Write([]byte(tag))
slog.Info("已通知客户端建立数据通道")
if err != nil {
slog.Error("write error", err)
return
}
s.ConnMap[tag] = user
}
}
func (s *Service) startDataTun(ctx context.Context, errCh chan error) {
dataPort := env.AppDataPort
slog.Debug("监听数据通道", slog.Uint64("port", uint64(dataPort)))
// 监听端口
lData, err := net.Listen("tcp", ":"+strconv.Itoa(int(dataPort)))
if err != nil {
slog.Error("listen error", err)
return
}
defer utils.Close(lData)
// 等待连接
connCh := utils.ConnChan(ctx, lData)
defer close(connCh)
// 处理连接
loop:
for {
select {
case <-ctx.Done():
slog.Debug("结束处理连接,由于上下文取消")
break loop
case conn, ok := <-connCh:
if !ok {
slog.Debug("结束处理连接,由于获取连接失败")
break loop
}
go s.processDataConn(conn)
}
}
// 等待子协程结束 todo 可配置等待时间
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
procCh := utils.WaitChan(timeout, &s.dataConnWg)
defer close(procCh)
select {
case <-timeout.Done():
slog.Warn("等待数据通道子协程结束超时")
case <-procCh:
slog.Info("数据通道子协程结束")
}
slog.Debug("关闭数据通道")
errCh <- nil
}
func (s *Service) processDataConn(client net.Conn) {
slog.Info("已建立客户端数据通道 " + client.RemoteAddr().String())
// 读取 tag
tagLen, err := utils.ReadByte(client)
if err != nil {
slog.Error("read error", err)
return
}
tagBuf, err := utils.ReadBuffer(client, int(tagLen))
if err != nil {
slog.Error("read error", err)
return
}
tag := string(tagBuf)
// 找到用户连接
data, ok := s.ConnMap[tag]
if !ok {
slog.Error("no such connection")
return
}
// 响应用户
user := data.Conn
socks5.SendSuccess(user, client)
// 写入目标地址
_, err = client.Write([]byte{byte(len(data.Dest))})
if err != nil {
slog.Error("写入目标地址失败", err)
return
}
_, err = client.Write([]byte(data.Dest))
if err != nil {
slog.Error("写入目标地址失败", err)
return
}
// 数据转发
slog.Info("开始数据转发 " + client.RemoteAddr().String() + " <-> " + data.Dest)
errCh := make(chan error)
go func() {
_, err := io.Copy(client, user)
if err != nil {
slog.Error("processDataConn error c2u", err)
}
errCh <- err
}()
go func() {
_, err := io.Copy(user, client)
if err != nil {
slog.Error("processDataConn error u2c", err)
}
errCh <- err
}()
<-errCh
slog.Info("数据转发结束 " + client.RemoteAddr().String() + " <-> " + data.Dest)
defer func() {
err := user.Close()
if err != nil {
slog.Error("close error", err)
}
err = client.Close()
if err != nil {
slog.Error("close error", err)
}
}()
}
type NoAuthAuthenticator struct {
}
func (a *NoAuthAuthenticator) Method() socks5.AuthMethod {
return socks5.NoAuth
}
func (a *NoAuthAuthenticator) Authenticate(ctx context.Context, reader io.Reader, writer io.Writer) (*socks5.AuthContext, error) {
// 获取用户地址
conn, ok := writer.(net.Conn)
if !ok {
return nil, errors.New("noAuth 认证失败,无法获取连接信息")
}
addr := conn.RemoteAddr().String()
client, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, errors.Wrap(err, "noAuth 认证失败")
}
slog.Debug("用户的地址为 " + client)
// 获取服务
server, ok := ctx.Value("service").(*socks5.Server)
if !ok {
return nil, errors.New("noAuth 认证失败,无法获取服务信息")
}
node := server.Name
slog.Debug("服务的名称为 " + server.Name)
// 查询权限记录
slog.Info(" 客户端 " + client + " 请求连接到 " + node)
var channels []models.Channel
err = orm.DB.
Joins("INNER JOIN public.nodes n ON channels.node_id = n.id AND n.name = ?", node).
Joins("INNER JOIN public.users u ON channels.user_id = u.id").
Joins("INNER JOIN public.user_ips ip ON u.id = ip.user_id AND ip.ip_address = ?", client).
Where(&models.Channel{
AuthIp: true,
}).
Find(&channels).Error
if err != nil {
return nil, errors.New("noAuth 查询用户权限失败")
}
// 记录应该只有一条
channel, err := orm.MaySingle(channels)
if err != nil {
return nil, errors.Wrap(err, "noAuth 没有权限")
}
// 检查是否需要密码认证
if channel.AuthPass {
return nil, errors.New("noAuth 没有权限,需要密码认证")
}
// 检查权限是否过期
timeout := uint(channel.Expiration.Sub(time.Now()).Seconds())
if timeout <= 0 {
return nil, errors.New("noAuth 权限已过期")
}
slog.Debug("权限剩余时间", slog.Uint64("timeout", uint64(timeout)))
return &socks5.AuthContext{
Method: socks5.NoAuth,
Timeout: timeout,
Payload: nil,
}, nil
}
type UserPassAuthenticator struct {
}
func (a *UserPassAuthenticator) Method() socks5.AuthMethod {
return socks5.UserPassAuth
}
func (a *UserPassAuthenticator) Authenticate(ctx context.Context, reader io.Reader, writer io.Writer) (*socks5.AuthContext, error) {
// 检查认证版本
slog.Debug("验证认证版本")
v, err := utils.ReadByte(reader)
if err != nil {
return nil, errors.Wrap(err, "读取版本号失败")
}
if v != socks5.AuthVersion {
_, err := writer.Write([]byte{socks5.SocksVersion, socks5.AuthFailure})
if err != nil {
return nil, errors.Wrap(err, "响应认证失败")
}
return nil, errors.New("认证版本参数不正确")
}
// 读取账号
slog.Debug("验证用户账号")
uLen, err := utils.ReadByte(reader)
if err != nil {
return nil, errors.Wrap(err, "读取用户名长度失败")
}
usernameBuf, err := utils.ReadBuffer(reader, int(uLen))
if err != nil {
return nil, errors.Wrap(err, "读取用户名失败")
}
username := string(usernameBuf)
// 读取密码
pLen, err := utils.ReadByte(reader)
if err != nil {
return nil, errors.Wrap(err, "读取密码长度失败")
}
passwordBuf, err := utils.ReadBuffer(reader, int(pLen))
if err != nil {
return nil, errors.Wrap(err, "读取密码失败")
}
password := string(passwordBuf)
// 查询通道配置
var channel models.Channel
err = orm.DB.
Where(&models.Channel{
Username: username,
AuthPass: true,
}).
First(&channel).Error
if err != nil {
return nil, errors.Wrap(err, "查询用户失败")
}
// 检查密码 todo 哈希
if channel.Password != password {
return nil, errors.New("密码错误")
}
// 检查权限是否过期
timeout := uint(channel.Expiration.Sub(time.Now()).Seconds())
if timeout <= 0 {
return nil, errors.New("权限已过期")
}
// 如果用户设置了双验证则检查 ip 是否在白名单中
if channel.AuthIp {
slog.Debug("验证用户 ip")
// 获取用户地址
conn, ok := writer.(net.Conn)
if !ok {
return nil, errors.New("无法获取连接信息")
}
addr := conn.RemoteAddr().String()
client, _, err := net.SplitHostPort(addr)
if err != nil {
return nil, errors.Wrap(err, "无法获取连接信息")
}
// 查询通道配置
var ips []models.UserIp
err = orm.DB.
Where(&models.UserIp{
UserId: channel.UserId,
IpAddress: client,
}).
Find(&ips).Error
if err != nil {
return nil, errors.Wrap(err, "查询用户 ip 失败")
}
// 检查是否在白名单中
if len(ips) == 0 {
return nil, errors.New("没有权限")
}
}
// 响应认证成功
_, err = writer.Write([]byte{socks5.AuthVersion, socks5.AuthSuccess})
if err != nil {
slog.Error("响应认证失败", err)
return nil, err
}
return &socks5.AuthContext{
Method: socks5.UserPassAuth,
Timeout: 300, // todo
Payload: nil,
}, nil
}

View File

@@ -1,12 +1,13 @@
package monitor package mnt
import ( import (
"context" "context"
"encoding/hex" "encoding/hex"
"log/slog"
"github.com/google/gopacket" "github.com/google/gopacket"
"github.com/google/gopacket/pcap" "github.com/google/gopacket/pcap"
"github.com/pkg/errors" "github.com/pkg/errors"
"log/slog"
) )
func Start(ctx context.Context, errCh chan error) { func Start(ctx context.Context, errCh chan error) {

86
server/pkg/env/env.go vendored Normal file
View File

@@ -0,0 +1,86 @@
package env
import (
"fmt"
"log/slog"
"os"
"strconv"
"github.com/joho/godotenv"
)
var (
AppCtrlPort uint16
AppDataPort uint16
DbHost string
DbPort uint16
DbDatabase string
DbUsername string
DbPassword string
DbTimezone string
)
func Init() {
// 加载 .env 文件
err := godotenv.Load()
if err != nil {
slog.Debug("没有本地环境变量文件")
}
appCtrlPortStr := os.Getenv("APP_CTRL_PORT")
if appCtrlPortStr == "" {
panic("环境变量 APP_CTRL_PORT 未设置")
}
appCtrlPort, err := strconv.ParseUint(appCtrlPortStr, 10, 16)
if err != nil {
panic(fmt.Sprintf("环境变量 APP_CTRL_PORT 格式错误: %v", err))
}
AppCtrlPort = uint16(appCtrlPort)
appDataPortStr := os.Getenv("APP_DATA_PORT")
if appDataPortStr == "" {
panic("环境变量 APP_DATA_PORT 未设置")
}
appDataPort, err := strconv.ParseUint(appDataPortStr, 10, 16)
if err != nil {
panic(fmt.Sprintf("环境变量 APP_DATA_PORT 格式错误: %v", err))
}
AppDataPort = uint16(appDataPort)
DbHost = os.Getenv("DB_HOST")
if DbHost == "" {
panic("环境变量 DB_HOST 未设置")
}
dbPortStr := os.Getenv("DB_PORT")
if dbPortStr == "" {
dbPortStr = "5432"
}
dbPort, err := strconv.ParseUint(dbPortStr, 10, 16)
if err != nil {
panic(fmt.Sprintf("环境变量 DB_PORT 格式错误: %v", err))
}
DbPort = uint16(dbPort)
DbDatabase = os.Getenv("DB_DATABASE")
if DbDatabase == "" {
panic("环境变量 DB_DATABASE 未设置")
}
DbUsername = os.Getenv("DB_USERNAME")
if DbUsername == "" {
panic("环境变量 DB_USERNAME 未设置")
}
DbPassword = os.Getenv("DB_PASSWORD")
if DbPassword == "" {
panic("环境变量 DB_PASSWORD 未设置")
}
DbTimezone = os.Getenv("DB_TIMEZONE")
if DbTimezone == "" {
DbTimezone = "Asia/Shanghai"
}
}

View File

@@ -2,10 +2,13 @@ package orm
import ( import (
"fmt" "fmt"
"log/slog"
"os"
"github.com/pkg/errors"
"gorm.io/driver/postgres" "gorm.io/driver/postgres"
"gorm.io/gorm" "gorm.io/gorm"
"gorm.io/gorm/logger" "gorm.io/gorm/logger"
"os"
) )
var DB *gorm.DB var DB *gorm.DB
@@ -32,3 +35,14 @@ func Init() {
DB = db DB = db
} }
func MaySingle[T any](results []T) (*T, error) {
rsLen := len(results)
if rsLen == 0 {
return nil, errors.New("记录为空")
}
if rsLen > 1 {
slog.Warn("记录不唯一", "ids")
}
return &results[0], nil
}

View File

@@ -2,11 +2,12 @@ package socks5
import ( import (
"context" "context"
"github.com/pkg/errors"
"io" "io"
"log/slog" "log/slog"
"proxy-server/pkg/utils" "proxy-server/pkg/utils"
"slices" "slices"
"github.com/pkg/errors"
) )
type AuthMethod byte type AuthMethod byte

View File

@@ -3,12 +3,13 @@ package socks5
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/pkg/errors"
"io" "io"
"log/slog" "log/slog"
"net" "net"
"proxy-server/pkg/utils" "proxy-server/pkg/utils"
"strconv" "strconv"
"github.com/pkg/errors"
) )
const ( const (

View File

@@ -2,16 +2,34 @@ package server
import ( import (
"context" "context"
"log/slog"
"os"
"proxy-server/server/fwd"
"proxy-server/server/pkg/env"
"proxy-server/server/pkg/orm"
"proxy-server/server/web"
"github.com/joho/godotenv" "github.com/joho/godotenv"
"github.com/lmittmann/tint" "github.com/lmittmann/tint"
"github.com/mattn/go-colorable" "github.com/mattn/go-colorable"
"log/slog"
"os"
"proxy-server/server/orm"
"proxy-server/server/web"
) )
func Start() { func Start() {
// 初始化
initLog()
env.Init()
orm.Init()
// 启动代理服务
fwd.New(nil).Run(context.Background(), make(chan error))
}
func initLog() {
slog.SetLogLoggerLevel(slog.LevelDebug)
}
func Start2() {
defer func() { defer func() {
err := recover() err := recover()
if err != nil { if err != nil {
@@ -51,7 +69,7 @@ func Start() {
defer cancel() defer cancel()
go web.Start(ctxC, errChan) go web.Start(ctxC, errChan)
//go monitor.Start(ctxC, errChan) // go monitor.Start2(ctxC, errChan)
slog.Info("服务启动成功") slog.Info("服务启动成功")
// 监听异常 // 监听异常

View File

@@ -1,14 +1,15 @@
package handlers package handlers
import ( import (
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
"log/slog" "log/slog"
"proxy-server/pkg/resp" "proxy-server/server/pkg/orm"
"proxy-server/server/orm" "proxy-server/server/pkg/resp"
"proxy-server/server/web/app/models" "proxy-server/server/web/app/models"
"strings" "strings"
"time" "time"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
) )
// region frp 接口 // region frp 接口

View File

@@ -1,12 +1,13 @@
package handlers package handlers
import ( import (
"os"
"proxy-server/server/pkg/orm"
"proxy-server/server/web/app/models"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
"github.com/pkg/errors" "github.com/pkg/errors"
"gorm.io/gorm" "gorm.io/gorm"
"os"
"proxy-server/server/orm"
"proxy-server/server/web/app/models"
) )
type NodeRegisterReq struct { type NodeRegisterReq struct {

View File

@@ -2,14 +2,15 @@ package auth
import ( import (
"encoding/base64" "encoding/base64"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
"log/slog" "log/slog"
"net/http" "net/http"
"os" "os"
"proxy-server/pkg/resp" "proxy-server/server/pkg/resp"
"slices" "slices"
"strings" "strings"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
) )
func middleware(c *gin.Context) { func middleware(c *gin.Context) {

View File

@@ -2,12 +2,13 @@ package web
import ( import (
"context" "context"
"github.com/gin-gonic/gin"
"log/slog" "log/slog"
"net/http" "net/http"
"os" "os"
"proxy-server/server/web/auth" "proxy-server/server/web/auth"
"proxy-server/server/web/router" "proxy-server/server/web/router"
"github.com/gin-gonic/gin"
) )
var server *http.Server var server *http.Server