集成 Vector 日志处理

This commit is contained in:
2025-03-03 17:14:45 +08:00
parent cc0b74c5c2
commit 4b3cb8e354
8 changed files with 74 additions and 23 deletions

View File

@@ -1,8 +1,6 @@
## todo ## todo
认证失败应当是 Warn 级别而非 Error 级别,需要修改 客户端断开后端口未释放问题
考虑再修改逻辑,等待子协程退出不应当级联,而是放在包全局管理,否则流程可能有问题
ProxyConn 直接实现 Conn 相同的接口,不再取出 Conn 使用 ProxyConn 直接实现 Conn 相同的接口,不再取出 Conn 使用

View File

@@ -41,6 +41,23 @@ services:
- postgres - postgres
restart: always restart: always
vector:
container_name: proxy-server-dev-vector
build:
context: ./vector
dockerfile: Dockerfile
ports:
- "8686:8686"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /var/lib/docker/containers:/var/lib/docker/containers:ro
- /var/log/vector:/temp/vector
networks:
- proxy-server-test
depends_on:
- service
restart: always
networks: networks:
proxy-server-test: proxy-server-test:
driver: bridge driver: bridge

View File

@@ -0,0 +1,6 @@
FROM timberio/vector:0.45.0-debian
# Copy the configuration file
COPY ./vector.toml /etc/vector/vector.toml
CMD ["-c", "/etc/vector/vector.toml"]

View File

@@ -0,0 +1,28 @@
[sources.docker]
type = "docker_logs"
include_containers = ["proxy-server-dev-service"]
[transforms.parse]
type = "remap"
inputs = ["docker"]
source = ". = parse_json!(.message)"
[transforms.destinations]
type = "filter"
inputs = ["parse"]
condition = ".msg == \"用户访问记录\""
[sinks.file_out]
type = "file"
inputs = ["destinations"]
path = "/temp/vector/service-destinations/%Y-%m-%d.log"
encoding.codec = "csv"
encoding.csv.fields = ["time", "uid", "user", "proxy", "node", "proto", "dest", "domain"]
[sinks.file_out.buffer]
type = "disk"
max_size = 268435488
[api]
enabled = true
address = "0.0.0.0:8686"

View File

@@ -122,7 +122,7 @@ func isTls(bytes []byte) (string, string, bool) {
} }
func analysisHttp(reader *bufio.Reader) (string, error) { func analysisHttp(reader *bufio.Reader) (string, error) {
slog.Debug("analysis http")
// reade top // reade top
top, err := httpReadLine(reader) top, err := httpReadLine(reader)
if err != nil { if err != nil {
@@ -164,7 +164,6 @@ func httpReadLine(reader *bufio.Reader) (line string, err error) {
} }
func analysisTls(reader *bufio.Reader) (string, error) { func analysisTls(reader *bufio.Reader) (string, error) {
slog.Debug("analysis https")
// tls record // tls record
_, err := utils.ReadBuffer(reader, 5) _, err := utils.ReadBuffer(reader, 5)

View File

@@ -48,9 +48,11 @@ func (s *Server) Run() error {
if err != nil { if err != nil {
return errors.Wrap(err, "dispatcher 监听失败") return errors.Wrap(err, "dispatcher 监听失败")
} }
defer utils.Close(ls)
m := cmux.New(ls) m := cmux.New(ls)
m.SetReadTimeout(5 * time.Second) m.SetReadTimeout(5 * time.Second)
defer m.Close()
go func() { go func() {
<-s.ctx.Done() <-s.ctx.Done()

View File

@@ -125,7 +125,7 @@ func processHttps(ctx context.Context, req *Request) (*core.Conn, error) {
Conn: req.conn, Conn: req.conn,
Reader: req.reader, Reader: req.reader,
Tag: req.conn.RemoteAddr().String() + "_" + req.conn.LocalAddr().String(), Tag: req.conn.RemoteAddr().String() + "_" + req.conn.LocalAddr().String(),
Protocol: "https", Protocol: "http",
Dest: req.dest, Dest: req.dest,
Auth: req.auth, Auth: req.auth,
}, nil }, nil

View File

@@ -12,9 +12,6 @@ import (
"sync" "sync"
"syscall" "syscall"
"time" "time"
"github.com/lmittmann/tint"
"github.com/mattn/go-colorable"
) )
type Context struct { type Context struct {
@@ -78,19 +75,23 @@ func Start() {
} }
func initLog() { func initLog() {
writer := colorable.NewColorable(os.Stdout) // writer := colorable.NewColorable(os.Stdout)
logger := slog.New(tint.NewHandler(writer, &tint.Options{ // logger := slog.New(tint.NewHandler(writer, &tint.Options{
// Level: slog.LevelDebug,
// TimeFormat: time.RFC3339,
// ReplaceAttr: func(_ []string, attr slog.Attr) slog.Attr {
// err, ok := attr.Value.Any().(error)
// if !ok {
// return attr
// }
// return tint.Err(err)
// },
// }))
// slog.SetDefault(logger)
handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug, Level: slog.LevelDebug,
TimeFormat: time.RFC3339, })
ReplaceAttr: func(_ []string, attr slog.Attr) slog.Attr { slog.SetDefault(slog.New(handler))
err, ok := attr.Value.Any().(error)
if !ok {
return attr
}
return tint.Err(err)
},
}))
slog.SetDefault(logger)
} }
func startFwdServer(ctx context.Context) error { func startFwdServer(ctx context.Context) error {