// proxy/server/fwd/dispatcher/dispatch.go (149 lines, 2.6 KiB, Go)

package dispatcher
import (
"context"
"log/slog"
"net"
"proxy-server/pkg/utils"
"proxy-server/server/fwd/core"
"proxy-server/server/fwd/http"
"proxy-server/server/fwd/socks"
"strconv"
"time"
"github.com/pkg/errors"
"github.com/soheilhy/cmux"
)
// Server listens on a single TCP port and hands processed client connections
// to its consumer through the Conn channel.
//
// The field order is significant: New builds the value with a positional
// composite literal.
type Server struct {
	// ctx is cancelled by Close; Run and the accept loops watch it to stop.
	ctx context.Context

	// cancel cancels ctx.
	cancel context.CancelFunc

	// Port is the TCP port Run listens on.
	Port uint16

	// Conn carries connections that the SOCKS or HTTP handler processed
	// successfully. Run closes it once ctx is cancelled.
	Conn chan *core.Conn
}
// New creates a Server for the given TCP port. The server owns a fresh
// cancellable context and an unbuffered connection channel; nothing is
// listened on until Run is called.
//
// It returns an error when port is 0.
func New(port uint16) (*Server, error) {
	if port == 0 {
		return nil, errors.New("port is required")
	}

	ctx, cancel := context.WithCancel(context.Background())
	srv := &Server{
		ctx:    ctx,
		cancel: cancel,
		Port:   port,
		Conn:   make(chan *core.Conn),
	}
	return srv, nil
}
// Close cancels the server context. Everything that watches that context
// (the shutdown goroutine started by Run and the per-connection handlers)
// reacts to the cancellation; Close itself does not wait for them.
func (s *Server) Close() {
	s.cancel()
}
// Run binds s.Port and multiplexes incoming TCP connections by protocol:
// connections whose first byte is 0x05 are routed to the SOCKS accept loop,
// and connections recognised by cmux.HTTP1Fast (with PATCH as an extra
// method) are routed to the HTTP accept loop. It blocks until the
// multiplexer stops serving.
//
// Run returns a wrapped error when the port cannot be bound or when serving
// fails for a reason other than shutdown. It returns nil when serving stopped
// because Close cancelled the server context.
//
// NOTE(review): s.Conn is closed as soon as the context is cancelled, while
// handler goroutines started by the accept loops may still be selecting on a
// send to it; a send that wins that select would panic. Fixing this needs
// sender tracking on Server — confirm and address separately.
func (s *Server) Run() error {
	ls, err := net.Listen("tcp", ":"+strconv.Itoa(int(s.Port)))
	if err != nil {
		return errors.Wrap(err, "dispatcher 监听失败")
	}

	m := cmux.New(ls)
	m.SetReadTimeout(5 * time.Second)

	go func() {
		<-s.ctx.Done()
		close(s.Conn)
		m.Close()
		// Closing the multiplexer alone does not unblock the root Accept,
		// so close the root listener too; otherwise Serve keeps the port
		// bound until another client happens to connect.
		utils.Close(ls)
	}()

	socksLs := m.Match(cmux.PrefixMatcher(string([]byte{0x05})))
	defer utils.Close(socksLs)
	go func() {
		// Use a goroutine-local error: sharing Run's err variable with the
		// other accept loop and with Serve below is a data race.
		if err := s.acceptSocks(socksLs); err != nil {
			slog.Error("dispatcher socks accept error", "err", err)
		}
	}()

	httpLs := m.Match(cmux.HTTP1Fast("PATCH"))
	defer utils.Close(httpLs)
	go func() {
		if err := s.acceptHttp(httpLs); err != nil {
			slog.Error("dispatcher http accept error", "err", err)
		}
	}()

	if serveErr := m.Serve(); serveErr != nil {
		// Once Close has been called the serve loop fails only because its
		// listener was shut down on purpose; report that as a clean exit.
		if s.ctx.Err() != nil {
			return nil
		}
		return errors.Wrap(serveErr, "dispatcher serve error")
	}
	return nil
}
// acceptHttp accepts connections from ls until the listener is closed or the
// server context is cancelled, processing each one in its own goroutine.
//
// For every accepted connection http.Process is run with the server context.
// On failure the error is logged and the raw connection is closed; on success
// the resulting connection is sent on s.Conn, or closed instead if the server
// context is cancelled first.
//
// It returns nil on shutdown and a wrapped error for any other non-temporary
// accept failure. Temporary accept errors are retried.
func (s *Server) acceptHttp(ls net.Listener) error {
	for {
		conn, err := ls.Accept()
		if err != nil {
			// A closed listener or a cancelled server context both mean
			// shutdown. The multiplexed listener may report its own
			// sentinel rather than net.ErrClosed, so check the context
			// too instead of logging a normal shutdown as a failure.
			if errors.Is(err, net.ErrClosed) || s.ctx.Err() != nil {
				return nil
			}
			var ne net.Error
			if errors.As(err, &ne) && ne.Temporary() {
				continue
			}
			return errors.Wrap(err, "dispatcher http accept error")
		}

		go func() {
			user, err := http.Process(s.ctx, conn)
			if err != nil {
				slog.Error("dispatcher http process error", "err", err)
				utils.Close(conn)
				return
			}
			// Hand the connection to the consumer unless the server is
			// shutting down, in which case nobody will take it.
			select {
			case <-s.ctx.Done():
				utils.Close(user)
			case s.Conn <- user:
			}
		}()
	}
}
func (s *Server) acceptSocks(ls net.Listener) error {
for {
conn, err := ls.Accept()
if err != nil {
if errors.Is(err, net.ErrClosed) {
return nil
}
var ne net.Error
if errors.As(err, &ne) && ne.Temporary() {
continue
}
return errors.Wrap(err, "dispatcher socks accept error")
}
go func() {
user, err := socks.Process(s.ctx, conn)
if err != nil {
slog.Error("处理 socks 连接失败", "err", err)
2025-03-01 18:06:04 +08:00
utils.Close(conn)
return
}
select {
case <-s.ctx.Done():
utils.Close(user)
case s.Conn <- user:
}
}()
}
}
// Conn is an empty placeholder type. Nothing in the visible part of this
// file refers to it; the dispatcher itself traffics in *core.Conn.
// NOTE(review): possibly a leftover — confirm whether it is still needed.
type Conn struct{}