Files
proxy/gateway/fwd/user.go

94 lines
2.0 KiB
Go

package fwd
import (
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"log/slog"
"proxy-server/gateway/app"
"proxy-server/gateway/core"
"proxy-server/gateway/env"
"proxy-server/gateway/fwd/dispatcher"
"proxy-server/gateway/fwd/metrics"
"proxy-server/utils"
"time"
)
// ListenUser creates a dispatcher for port, runs it in the background, and
// starts one goroutine per connection received from the dispatcher's Conn
// channel, each of which calls processUserConn with ctrl.
//
// It blocks until one of the following happens:
//   - ctx is cancelled: returns nil;
//   - the dispatcher's Run returns: returns its error (wrapped) or nil;
//   - dispatcher.New fails: returns that error unchanged, before anything runs.
//
// The dispatcher is stopped (dspt.Stop) when ListenUser returns.
func ListenUser(ctx context.Context, port uint16, ctrl io.Writer) error {
	dspt, err := dispatcher.New(port, time.Duration(env.AppUserRWTimeout)*time.Second)
	if err != nil {
		return err
	}
	defer dspt.Stop()

	// Buffered with capacity 1 so the goroutine below can always deliver its
	// result and exit. With an unbuffered channel it would block forever on
	// the send if Run returned after ListenUser had already left via
	// ctx.Done(), leaking the goroutine.
	errCh := make(chan error, 1)
	go func() {
		err := dspt.Run()
		if err != nil {
			err = fmt.Errorf("协议嗅探服务运行失败: %w", err)
		}
		errCh <- err
	}()

	// Handle incoming connections until cancelled or the dispatcher exits.
	for {
		select {
		case <-ctx.Done():
			return nil
		case err := <-errCh:
			if err != nil {
				err = fmt.Errorf("监听转发端口失败: %w", err)
			}
			return err
		case user := <-dspt.Conn:
			// Record the arrival time keyed by the underlying connection.
			metrics.TimerAuth.Store(user.Conn, time.Now())
			app.UserConnWg.Add(1)
			go func() {
				defer app.UserConnWg.Done()
				err := processUserConn(ctx, user, ctrl)
				if err != nil {
					slog.Error("处理用户连接失败", "err", err)
					utils.Close(user)
				}
			}()
		}
	}
}
// processUserConn sends the proxy command for user over ctrl, registers the
// connection in app.UserConnMap under the hex encoding of its tag, and then
// waits until either env.AppDataTimeout seconds have elapsed or ctx is done.
//
// After the wait, if the entry is still present in app.UserConnMap it is
// removed and the connection is closed; a warning is logged when the wait
// ended with context.DeadlineExceeded. Presumably the entry is removed
// elsewhere once a data channel is established — verify against the code
// that consumes app.UserConnMap.
//
// The only error returned is the one from sendProxy; every other path
// returns nil.
func processUserConn(ctx context.Context, user *core.Conn, ctrl io.Writer) (err error) {
	// Send the proxy command; nothing is registered if this fails.
	if err = sendProxy(ctrl, user.Tag, user.Dest.String()); err != nil {
		return err
	}

	// Register the user connection under its hex-encoded tag.
	key := hex.EncodeToString(user.Tag[:])
	app.UserConnMap.Store(key, user)

	// Bound the wait for the data channel; the timer is independent of ctx.
	limit := time.Duration(env.AppDataTimeout) * time.Second
	deadline, stop := context.WithTimeout(context.Background(), limit)
	defer stop()

	var cause error
	select {
	case <-deadline.Done():
		cause = deadline.Err()
	case <-ctx.Done():
		cause = ctx.Err()
	}

	// If the entry is already gone there is nothing left to clean up.
	if _, pending := app.UserConnMap.LoadAndDelete(key); !pending {
		return nil
	}

	// Still registered: close the connection.
	utils.Close(user)
	if errors.Is(cause, context.DeadlineExceeded) {
		slog.Warn("建立数据通道超时", "tag", key, "addr", user.RemoteAddr().String())
	}
	return nil
}