优化项目结构

This commit is contained in:
2025-02-27 18:07:00 +08:00
parent a541a7bd3a
commit 38d5341e84
9 changed files with 176 additions and 36 deletions

View File

@@ -1,22 +1,24 @@
## todo ## todo
监听进程信号,优雅关闭服务 配置退出等待时间
log 控制台颜色,输出错误堆栈
读取 conn 时加上超时机制 读取 conn 时加上超时机制
检查 ip 时需要判断同一 ip 的不同写法
代理节点超时控制 代理节点超时控制
实现一个 socks context 以在子组件中获取 socks 相关信息
网关根据代理节点对目标服务连接的反馈,决定向用户返回的 socks 响应 网关根据代理节点对目标服务连接的反馈,决定向用户返回的 socks 响应
数据通道池化 数据通道池化
在控制通道直接传输目标地址,客户端可以同时开始数据通道和目标地址的连接建立
### 长期 ### 长期
代理端口支持混合端口转发(支持 tcp_mux 实现一个 socks context 以在子组件中获取 socks 相关信息
代理端口支持混合端口转发
数据通道支持 tcp 多路复用(分离逻辑流) 数据通道支持 tcp 多路复用(分离逻辑流)

View File

@@ -1,6 +1,8 @@
# 应用配置
APP_CTRL_PORT=18080 APP_CTRL_PORT=18080
APP_DATA_PORT=18081 APP_DATA_PORT=18081
# 数据库配置
DB_HOST=localhost DB_HOST=localhost
DB_PORT=5432 DB_PORT=5432
DB_DATABASE=app DB_DATABASE=app

View File

@@ -5,6 +5,12 @@ import (
"sync/atomic" "sync/atomic"
) )
type WaitGroup interface {
Add(delta uint)
Done()
Wait()
}
type CountWaitGroup struct { type CountWaitGroup struct {
wg sync.WaitGroup wg sync.WaitGroup
num atomic.Int64 num atomic.Int64

57
server/fwd/analysis.go Normal file
View File

@@ -0,0 +1,57 @@
package fwd
import (
"bufio"
"io"
"log/slog"
"proxy-server/pkg/utils"
)
func analysis(reader io.Reader) {
buf := bufio.NewReader(reader)
first, err := buf.Peek(8)
if err != nil {
slog.Error("analysis peek error", "err", err)
} else {
switch {
case first[0] == 0x16:
analysisHttps(reader)
case
string(first[:4]) == "GET ",
// string(first[:4]) == "PUT ",
string(first[:5]) == "POST ":
// string(first[:4]) == "HEAD ",
// string(first[:4]) == "TRACE ",
// string(first[:4]) == "PATCH ",
// string(first[:4]) == "DELETE ",
// string(first[:4]) == "CONNECT ",
// string(first[:4]) == "OPTIONS ":
analysisHttp(reader)
}
}
discord(reader)
}
func analysisHttp(reader io.Reader) {
}
func analysisHttps(reader io.Reader) {
head, err := utils.ReadBuffer(reader, 5)
if err != nil {
slog.Error("analysis https err", "err", err)
return
}
if head[1] == 0x03 && head[2] == 0x03 {
// tls1.2
}
}
func discord(reader io.Reader) {
_, err := io.Copy(io.Discard, reader)
if err != nil {
slog.Error("analysis discord err", "err", err)
}
}

View File

@@ -30,27 +30,19 @@ type Service struct {
} }
func New(config *Config) *Service { func New(config *Config) *Service {
_config := config if config == nil {
if _config == nil { config = &Config{}
_config = &Config{}
} }
return &Service{ return &Service{
Config: _config, Config: config,
connMap: make(map[string]socks.ProxyConn), connMap: make(map[string]socks.ProxyConn),
ctrlConnWg: utils.CountWaitGroup{}, ctrlConnWg: utils.CountWaitGroup{},
dataConnWg: utils.CountWaitGroup{}, dataConnWg: utils.CountWaitGroup{},
} }
} }
func (s *Service) Run(ctx context.Context, errCh chan error) { func (s *Service) Run(ctx context.Context) {
defer func() {
err := recover()
if err != nil {
slog.Error("服务由于意外的 panic 导致退出", err)
}
}()
slog.Info("启动 fwd 服务") slog.Info("启动 fwd 服务")
// 启动工作协程 // 启动工作协程
@@ -61,7 +53,7 @@ func (s *Service) Run(ctx context.Context, errCh chan error) {
subErrCh := make(chan error, goNum) subErrCh := make(chan error, goNum)
defer close(subErrCh) defer close(subErrCh)
go s.startCtrlTun(subCtx, subErrCh) go s.startCtrlTun(subCtx)
go s.startDataTun(subCtx, subErrCh) go s.startDataTun(subCtx, subErrCh)
// 等待结束 // 等待结束
@@ -80,18 +72,20 @@ func (s *Service) Run(ctx context.Context, errCh chan error) {
} }
slog.Info("fwd 服务已结束") slog.Info("fwd 服务已结束")
errCh <- firstSubErr
} }
func (s *Service) startCtrlTun(ctx context.Context, errCh chan error) { func (s *Service) Close() {
}
func (s *Service) startCtrlTun(ctx context.Context) error {
ctrlPort := env.AppCtrlPort ctrlPort := env.AppCtrlPort
slog.Debug("监听控制通道", slog.Uint64("port", uint64(ctrlPort))) slog.Debug("监听控制通道", slog.Uint64("port", uint64(ctrlPort)))
// 监听端口 // 监听端口
ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(ctrlPort))) ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(ctrlPort)))
if err != nil { if err != nil {
slog.Error("监听控制通道失败", "err", err) return errors.Wrap(err, "监听控制通道失败")
return
} }
defer utils.Close(ls) defer utils.Close(ls)
@@ -120,9 +114,10 @@ func (s *Service) startCtrlTun(ctx context.Context, errCh chan error) {
} }
// 等待子协程结束 todo 可配置等待时间 // 等待子协程结束 todo 可配置等待时间
s.Close()
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second) timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
procCh := utils.ChanWgWait(timeout, &s.ctrlConnWg) procCh := utils.ChanWgWait(timeout, &s.ctrlConnWg)
defer close(procCh) defer close(procCh)
@@ -134,7 +129,7 @@ func (s *Service) startCtrlTun(ctx context.Context, errCh chan error) {
} }
slog.Debug("关闭控制通道") slog.Debug("关闭控制通道")
errCh <- nil return nil
} }
func (s *Service) processCtrlConn(controller net.Conn) { func (s *Service) processCtrlConn(controller net.Conn) {
@@ -283,6 +278,11 @@ func (s *Service) processDataConn(client net.Conn) {
// 数据转发 // 数据转发
slog.Info("开始数据转发 " + client.RemoteAddr().String() + " <-> " + data.Dest) slog.Info("开始数据转发 " + client.RemoteAddr().String() + " <-> " + data.Dest)
// userPipeReader, userPipeWriter := io.Pipe()
// defer utils.Close(userPipeWriter)
// teeUser := io.TeeReader(user, userPipeWriter)
errCh := make(chan error) errCh := make(chan error)
go func() { go func() {
_, err := io.Copy(client, user) _, err := io.Copy(client, user)
@@ -291,6 +291,8 @@ func (s *Service) processDataConn(client net.Conn) {
} }
errCh <- err errCh <- err
}() }()
// go analysis(userPipeReader)
go func() { go func() {
_, err := io.Copy(user, client) _, err := io.Copy(user, client)
if err != nil { if err != nil {
@@ -298,6 +300,7 @@ func (s *Service) processDataConn(client net.Conn) {
} }
errCh <- err errCh <- err
}() }()
<-errCh <-errCh
slog.Info("数据转发结束 " + client.RemoteAddr().String() + " <-> " + data.Dest) slog.Info("数据转发结束 " + client.RemoteAddr().String() + " <-> " + data.Dest)
} }

View File

@@ -1,5 +0,0 @@
package logs
func Write(str ...string) {
}

View File

@@ -61,6 +61,10 @@ type Server struct {
// New 创建服务器 // New 创建服务器
func New(conf *Config) (*Server, error) { func New(conf *Config) (*Server, error) {
if conf == nil {
conf = &Config{}
}
if len(conf.AuthMethods) == 0 { if len(conf.AuthMethods) == 0 {
return nil, ConfigError("认证方法不能为空") return nil, ConfigError("认证方法不能为空")
} }

View File

@@ -2,12 +2,12 @@ package mnt
import ( import (
"context" "context"
"encoding/hex"
"log/slog" "log/slog"
"github.com/google/gopacket" "github.com/google/gopacket"
"github.com/google/gopacket/pcap" "github.com/google/gopacket/pcap"
"github.com/pkg/errors" "github.com/pkg/errors"
"golang.org/x/text/encoding/simplifiedchinese"
) )
func Start(ctx context.Context, errCh chan error) { func Start(ctx context.Context, errCh chan error) {
@@ -15,9 +15,12 @@ func Start(ctx context.Context, errCh chan error) {
// 打开一个网络接口 // 打开一个网络接口
device, err := pcap.OpenLive("WLAN", 1600, true, pcap.BlockForever) device, err := pcap.OpenLive("WLAN", 1600, true, pcap.BlockForever)
if err != nil { if err != nil {
b, er := hex.DecodeString("\\xbb") gbk := simplifiedchinese.GBK.NewDecoder()
slog.Debug("b", b, er) errMsg, err := gbk.String(err.Error())
errCh <- errors.Wrap(err, "打开网络接口失败") if err != nil {
errMsg = err.Error()
}
errCh <- errors.Wrap(err, "打开网络接口失败, "+errMsg)
return return
} }
defer device.Close() defer device.Close()

View File

@@ -4,16 +4,25 @@ import (
"context" "context"
"log/slog" "log/slog"
"os" "os"
"os/signal"
"proxy-server/pkg/utils"
"proxy-server/server/fwd" "proxy-server/server/fwd"
"proxy-server/server/pkg/env" "proxy-server/server/pkg/env"
"proxy-server/server/pkg/orm" "proxy-server/server/pkg/orm"
"proxy-server/server/web" "proxy-server/server/web"
"syscall"
"time"
"github.com/joho/godotenv" "github.com/joho/godotenv"
"github.com/lmittmann/tint" "github.com/lmittmann/tint"
"github.com/mattn/go-colorable" "github.com/mattn/go-colorable"
) )
type Context struct {
context.Context
log *slog.Logger
}
func Start() { func Start() {
// 初始化 // 初始化
@@ -21,14 +30,73 @@ func Start() {
env.Init() env.Init()
orm.Init() orm.Init()
// 启动代理服务 // 启动服务
fwd.New(nil).Run(context.Background(), make(chan error)) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
errQuit := make(chan error)
defer close(errQuit)
wg := utils.CountWaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := startFwdServer(ctx)
if err != nil {
slog.Error("代理服务发生错误", "err", err)
}
errQuit <- err
}()
// 等待退出信号
osQuit := make(chan os.Signal)
signal.Notify(osQuit, os.Interrupt, syscall.SIGTERM)
select {
case <-osQuit:
slog.Info("服务关闭")
case <-errQuit:
slog.Error("服务异常退出")
}
// 等待子服务退出
cancel()
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
wgCh := utils.ChanWgWait(timeout, &wg)
select {
case <-timeout.Done():
slog.Error("关闭超时,强制关闭")
case <-wgCh:
slog.Info("服务已退出")
}
} }
func initLog() { func initLog() {
slog.SetLogLoggerLevel(slog.LevelDebug) slog.SetLogLoggerLevel(slog.LevelDebug)
} }
func startFwdServer(ctx context.Context) error {
server := fwd.New(nil)
go func() {
<-ctx.Done()
server.Close()
}()
server.Run(ctx)
return nil
}
func startMntServer(ctx context.Context) {
}
func startWebServer(ctx context.Context) {
}
func Start2() { func Start2() {
defer func() { defer func() {
err := recover() err := recover()