重构代理解析流程,引入端口混合协议转发
This commit is contained in:
142
server/fwd/dispatcher/dispatch.go
Normal file
142
server/fwd/dispatcher/dispatch.go
Normal file
@@ -0,0 +1,142 @@
|
||||
package dispatcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"net"
|
||||
"proxy-server/pkg/utils"
|
||||
"proxy-server/server/fwd/core"
|
||||
"proxy-server/server/fwd/socks"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/soheilhy/cmux"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
Port uint16
|
||||
Conn chan *core.Conn
|
||||
}
|
||||
|
||||
func New(port uint16) (*Server, error) {
|
||||
|
||||
if port == 0 {
|
||||
return nil, errors.New("port is required")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &Server{
|
||||
ctx,
|
||||
cancel,
|
||||
port,
|
||||
make(chan *core.Conn),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *Server) Close() {
|
||||
s.cancel()
|
||||
}
|
||||
|
||||
func (s *Server) Run() error {
|
||||
port := strconv.Itoa(int(s.Port))
|
||||
|
||||
ls, err := net.Listen("tcp", ":"+port)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "dispatcher 监听失败")
|
||||
}
|
||||
|
||||
m := cmux.New(ls)
|
||||
m.SetReadTimeout(5 * time.Second)
|
||||
|
||||
go func() {
|
||||
<-s.ctx.Done()
|
||||
close(s.Conn)
|
||||
m.Close()
|
||||
}()
|
||||
|
||||
socksLs := m.Match(cmux.PrefixMatcher(string([]byte{0x05})))
|
||||
defer utils.Close(socksLs)
|
||||
go func() {
|
||||
err = s.acceptSocks(socksLs)
|
||||
if err != nil {
|
||||
slog.Error("dispatcher socks accept error", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
httpLs := m.Match(cmux.HTTP1Fast("PATCH"))
|
||||
defer utils.Close(httpLs)
|
||||
go func() {
|
||||
err = s.acceptHttp(httpLs)
|
||||
if err != nil {
|
||||
slog.Error("dispatcher http accept error", "err", err)
|
||||
}
|
||||
}()
|
||||
|
||||
err = m.Serve()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "dispatcher serve error")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) acceptHttp(ls net.Listener) error {
|
||||
for {
|
||||
conn, err := ls.Accept()
|
||||
if err != nil {
|
||||
if errors.Is(err, net.ErrClosed) {
|
||||
return nil
|
||||
}
|
||||
var ne net.Error
|
||||
if errors.As(err, &ne) && ne.Temporary() {
|
||||
continue
|
||||
}
|
||||
return errors.Wrap(err, "dispatcher http accept error")
|
||||
}
|
||||
|
||||
go func() {
|
||||
err := s.processHttp(conn)
|
||||
if err != nil {
|
||||
slog.Error("dispatcher http process error", "err", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) acceptSocks(ls net.Listener) error {
|
||||
for {
|
||||
conn, err := ls.Accept()
|
||||
if err != nil {
|
||||
if errors.Is(err, net.ErrClosed) {
|
||||
return nil
|
||||
}
|
||||
var ne net.Error
|
||||
if errors.As(err, &ne) && ne.Temporary() {
|
||||
continue
|
||||
}
|
||||
return errors.Wrap(err, "dispatcher socks accept error")
|
||||
}
|
||||
|
||||
go func() {
|
||||
conn, err := socks.Process(s.ctx, conn)
|
||||
if err != nil {
|
||||
slog.Error("处理 socks 连接失败", "err", err)
|
||||
}
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
utils.Close(conn)
|
||||
case s.Conn <- conn:
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) processHttp(conn net.Conn) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type Conn struct {
|
||||
}
|
||||
Reference in New Issue
Block a user