Compare commits
5 Commits
v1.9.0
...
80f04c92ec
| Author | SHA1 | Date | |
|---|---|---|---|
| 80f04c92ec | |||
| ccbc6f0b67 | |||
| d273731e31 | |||
| 65f8ee360b | |||
| 042c8d1a51 |
@@ -16,6 +16,7 @@ REDIS_PORT=6379
|
||||
# otel 配置
|
||||
OTEL_HOST=127.0.0.1
|
||||
OTEL_PORT=4317
|
||||
OTEL_NAME_SUFFIX=dev
|
||||
|
||||
# 白银节点
|
||||
BAIYIN_CLOUD_URL=
|
||||
|
||||
15
README.md
15
README.md
@@ -1,11 +1,20 @@
|
||||
## TODO
|
||||
|
||||
- 选择代理部分,可以检查 redis 中的可用端口数量,无需查数据库
|
||||
- 取用端口后直接写入关闭任务,避免中途失败导致端口泄漏
|
||||
-
|
||||
提取代理:
|
||||
- 网关修改问题
|
||||
- 在提取流程中如果网关被修改,有可能导致数据不一致
|
||||
- 提供读写锁,在提取流程中获取读锁,在修改网关时获取写锁
|
||||
- 端口悬空问题
|
||||
- 在提取流程中如果发生异常,可能导致端口被占用但通道信息没有被写入数据库,配置以及端口将无法解除
|
||||
- 提交删除任务时,带上配置信息,无需再查数据库且接口总是会被正确清理
|
||||
- 异常情况下,将只清理网关端口,而无法解除节点连接,这个问题需要额外处理
|
||||
|
||||
---
|
||||
|
||||
- otel 没有正确记录接口失败信息
|
||||
|
||||
筛选和关联展示功能扩展
|
||||
|
||||
错误提示增强,展示整链路信息
|
||||
|
||||
ip 提取频率限制,在 ensure 函数加逻辑,通过 redis 或者 pg 计算分钟内提取次数,只允许每分钟提取 30 次
|
||||
|
||||
36
go.mod
36
go.mod
@@ -23,11 +23,11 @@ require (
|
||||
github.com/smartwalle/alipay/v3 v3.2.28
|
||||
github.com/valyala/fasthttp v1.68.0
|
||||
github.com/wechatpay-apiv3/wechatpay-go v0.2.21
|
||||
go.opentelemetry.io/otel v1.38.0
|
||||
go.opentelemetry.io/otel v1.43.0
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0
|
||||
go.opentelemetry.io/otel/sdk v1.38.0
|
||||
golang.org/x/crypto v0.45.0
|
||||
golang.org/x/sync v0.18.0
|
||||
go.opentelemetry.io/otel/sdk v1.43.0
|
||||
golang.org/x/crypto v0.49.0
|
||||
golang.org/x/sync v0.20.0
|
||||
gorm.io/datatypes v1.2.7
|
||||
gorm.io/driver/postgres v1.6.0
|
||||
gorm.io/gen v0.3.27
|
||||
@@ -59,7 +59,7 @@ require (
|
||||
github.com/gofiber/utils v1.1.0 // indirect
|
||||
github.com/gofrs/uuid v4.4.0+incompatible // indirect
|
||||
github.com/gomodule/redigo v2.0.0+incompatible // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
|
||||
github.com/hashicorp/errwrap v1.1.0 // indirect
|
||||
github.com/hashicorp/go-multierror v1.1.1 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
@@ -86,20 +86,20 @@ require (
|
||||
github.com/valyala/bytebufferpool v1.0.0 // indirect
|
||||
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
|
||||
go.opentelemetry.io/contrib v1.38.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.38.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.38.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
|
||||
golang.org/x/mod v0.30.0 // indirect
|
||||
golang.org/x/net v0.47.0 // indirect
|
||||
golang.org/x/sys v0.38.0 // indirect
|
||||
golang.org/x/text v0.31.0 // indirect
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 // indirect
|
||||
go.opentelemetry.io/otel/metric v1.43.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.43.0 // indirect
|
||||
go.opentelemetry.io/proto/otlp v1.10.0 // indirect
|
||||
golang.org/x/mod v0.33.0 // indirect
|
||||
golang.org/x/net v0.52.0 // indirect
|
||||
golang.org/x/sys v0.42.0 // indirect
|
||||
golang.org/x/text v0.35.0 // indirect
|
||||
golang.org/x/time v0.14.0 // indirect
|
||||
golang.org/x/tools v0.39.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251124214823-79d6a2a48846 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251124214823-79d6a2a48846 // indirect
|
||||
google.golang.org/grpc v1.77.0 // indirect
|
||||
google.golang.org/protobuf v1.36.10 // indirect
|
||||
golang.org/x/tools v0.42.0 // indirect
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 // indirect
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 // indirect
|
||||
google.golang.org/grpc v1.80.0 // indirect
|
||||
google.golang.org/protobuf v1.36.11 // indirect
|
||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
||||
gorm.io/driver/mysql v1.6.0 // indirect
|
||||
gorm.io/hints v1.1.2 // indirect
|
||||
|
||||
80
go.sum
80
go.sum
@@ -154,8 +154,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20200217142428-fce0ec30dd00/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs=
|
||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c=
|
||||
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
|
||||
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
|
||||
@@ -277,22 +277,22 @@ go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ
|
||||
go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y=
|
||||
go.opentelemetry.io/contrib v1.38.0 h1:msaHYZ13HfLIbqXsGwZZQBg5zgxwumlZ1mCkXn3E7LM=
|
||||
go.opentelemetry.io/contrib v1.38.0/go.mod h1:4Vp7Az5Dez02V1lCi9OqLvSmSz0lbZu/O2r4XZsqwB0=
|
||||
go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8=
|
||||
go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0 h1:GqRJVj7UmLjCVyVJ3ZFLdPRmhDUp2zFmQe3RHIOsw24=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.38.0/go.mod h1:ri3aaHSmCTVYu2AWv44YMauwAQc0aqI9gHKIcSbI1pU=
|
||||
go.opentelemetry.io/otel v1.43.0 h1:mYIM03dnh5zfN7HautFE4ieIig9amkNANT+xcVxAj9I=
|
||||
go.opentelemetry.io/otel v1.43.0/go.mod h1:JuG+u74mvjvcm8vj8pI5XiHy1zDeoCS2LB1spIq7Ay0=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0 h1:88Y4s2C8oTui1LGM6bTWkw0ICGcOLCAI5l6zsD1j20k=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.43.0/go.mod h1:Vl1/iaggsuRlrHf/hfPJPvVag77kKyvrLeD10kpMl+A=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0 h1:lwI4Dc5leUqENgGuQImwLo4WnuXFPetmPpkLi2IrX54=
|
||||
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.38.0/go.mod h1:Kz/oCE7z5wuyhPxsXDuaPteSWqjSBD5YaSdbxZYGbGk=
|
||||
go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA=
|
||||
go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI=
|
||||
go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E=
|
||||
go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA=
|
||||
go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE=
|
||||
go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs=
|
||||
go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A=
|
||||
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
|
||||
go.opentelemetry.io/otel/metric v1.43.0 h1:d7638QeInOnuwOONPp4JAOGfbCEpYb+K6DVWvdxGzgM=
|
||||
go.opentelemetry.io/otel/metric v1.43.0/go.mod h1:RDnPtIxvqlgO8GRW18W6Z/4P462ldprJtfxHxyKd2PY=
|
||||
go.opentelemetry.io/otel/sdk v1.43.0 h1:pi5mE86i5rTeLXqoF/hhiBtUNcrAGHLKQdhg4h4V9Dg=
|
||||
go.opentelemetry.io/otel/sdk v1.43.0/go.mod h1:P+IkVU3iWukmiit/Yf9AWvpyRDlUeBaRg6Y+C58QHzg=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.43.0 h1:S88dyqXjJkuBNLeMcVPRFXpRw2fuwdvfCGLEo89fDkw=
|
||||
go.opentelemetry.io/otel/sdk/metric v1.43.0/go.mod h1:C/RJtwSEJ5hzTiUz5pXF1kILHStzb9zFlIEe85bhj6A=
|
||||
go.opentelemetry.io/otel/trace v1.43.0 h1:BkNrHpup+4k4w+ZZ86CZoHHEkohws8AY+WTX09nk+3A=
|
||||
go.opentelemetry.io/otel/trace v1.43.0/go.mod h1:/QJhyVBUUswCphDVxq+8mld+AvhXZLhe+8WVFxiFff0=
|
||||
go.opentelemetry.io/proto/otlp v1.10.0 h1:IQRWgT5srOCYfiWnpqUYz9CVmbO8bFmKcwYxpuCSL2g=
|
||||
go.opentelemetry.io/proto/otlp v1.10.0/go.mod h1:/CV4QoCR/S9yaPj8utp3lvQPoqMtxXdzn7ozvvozVqk=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
@@ -309,8 +309,8 @@ golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDf
|
||||
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
|
||||
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
|
||||
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
|
||||
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
|
||||
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
|
||||
golang.org/x/crypto v0.49.0 h1:+Ng2ULVvLHnJ/ZFEq4KdcDd/cfjrrjjNSXNzxg0Y4U4=
|
||||
golang.org/x/crypto v0.49.0/go.mod h1:ErX4dUh2UM+CFYiXZRTcMpEcN8b/1gxEuv3nODoYtCA=
|
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
|
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
|
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
|
||||
@@ -321,8 +321,8 @@ golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
|
||||
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
|
||||
golang.org/x/mod v0.30.0 h1:fDEXFVZ/fmCKProc/yAXXUijritrDzahmwwefnjoPFk=
|
||||
golang.org/x/mod v0.30.0/go.mod h1:lAsf5O2EvJeSFMiBxXDki7sCgAxEUcZHXoXMKT4GJKc=
|
||||
golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8=
|
||||
golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
@@ -344,8 +344,8 @@ golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
|
||||
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
|
||||
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
|
||||
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
|
||||
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
|
||||
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
|
||||
golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0=
|
||||
golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw=
|
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
|
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
@@ -357,8 +357,8 @@ golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
|
||||
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
|
||||
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
|
||||
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
|
||||
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
|
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
@@ -379,8 +379,8 @@ golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
|
||||
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
|
||||
golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo=
|
||||
golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw=
|
||||
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
|
||||
@@ -403,8 +403,8 @@ golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
|
||||
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
||||
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
|
||||
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
|
||||
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
|
||||
golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8=
|
||||
golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA=
|
||||
golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI=
|
||||
golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
@@ -419,35 +419,35 @@ golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc
|
||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
|
||||
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
|
||||
golang.org/x/tools v0.39.0 h1:ik4ho21kwuQln40uelmciQPp9SipgNDdrafrYA4TmQQ=
|
||||
golang.org/x/tools v0.39.0/go.mod h1:JnefbkDPyD8UU2kI5fuf8ZX4/yUeh9W877ZeBONxUqQ=
|
||||
golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k=
|
||||
golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
|
||||
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
|
||||
gonum.org/v1/gonum v0.17.0 h1:VbpOemQlsSMrYmn7T2OUvQ4dqxQXU+ouZFQsZOx50z4=
|
||||
gonum.org/v1/gonum v0.17.0/go.mod h1:El3tOrEuMpv2UdMrbNlKEh9vd86bmQ6vqIcDwxEOc1E=
|
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
|
||||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
|
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
|
||||
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251124214823-79d6a2a48846 h1:ZdyUkS9po3H7G0tuh955QVyyotWvOD4W0aEapeGeUYk=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20251124214823-79d6a2a48846/go.mod h1:Fk4kyraUvqD7i5H6S43sj2W98fbZa75lpZz/eUyhfO0=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251124214823-79d6a2a48846 h1:Wgl1rcDNThT+Zn47YyCXOXyX/COgMTIdhJ717F0l4xk=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20251124214823-79d6a2a48846/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9 h1:VPWxll4HlMw1Vs/qXtN7BvhZqsS9cdAittCNvVENElA=
|
||||
google.golang.org/genproto/googleapis/api v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:7QBABkRtR8z+TEnmXTqIqwJLlzrZKVfAUm7tY3yGv0M=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9 h1:m8qni9SQFH0tJc1X0vmnpw/0t+AImlSvp30sEupozUg=
|
||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20260401024825-9d38bb4040a9/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8=
|
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
|
||||
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
|
||||
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
|
||||
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
|
||||
google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM=
|
||||
google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig=
|
||||
google.golang.org/grpc v1.80.0 h1:Xr6m2WmWZLETvUNvIUmeD5OAagMw3FiKmMlTdViWsHM=
|
||||
google.golang.org/grpc v1.80.0/go.mod h1:ho/dLnxwi3EDJA4Zghp7k2Ec1+c2jqup0bFkw07bwF4=
|
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
|
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
|
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
|
||||
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
|
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
|
||||
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
|
||||
google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE=
|
||||
google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
|
||||
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/ini.v1 v1.56.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
|
||||
2
pkg/env/env.go
vendored
2
pkg/env/env.go
vendored
@@ -37,6 +37,7 @@ var (
|
||||
|
||||
OtelHost string
|
||||
OtelPort string
|
||||
OtelNameSuffix string
|
||||
|
||||
BaiyinCloudUrl string
|
||||
BaiyinTokenUrl string
|
||||
@@ -118,6 +119,7 @@ func Init() {
|
||||
|
||||
errs = append(errs, parse(&OtelHost, "OTEL_HOST", true, nil))
|
||||
errs = append(errs, parse(&OtelPort, "OTEL_PORT", true, nil))
|
||||
errs = append(errs, parse(&OtelNameSuffix, "OTEL_NAME_SUFFIX", true, nil))
|
||||
|
||||
errs = append(errs, parse(&BaiyinCloudUrl, "BAIYIN_CLOUD_URL", false, nil))
|
||||
errs = append(errs, parse(&BaiyinTokenUrl, "BAIYIN_TOKEN_URL", false, nil))
|
||||
|
||||
@@ -9,8 +9,8 @@ if ($confrim -ne "y") {
|
||||
exit 0
|
||||
}
|
||||
|
||||
docker build -t repo.lanhuip.com:8554/lanhu/platform:latest .
|
||||
docker build -t repo.lanhuip.com:8554/lanhu/platform:$($args[0]) .
|
||||
docker build -t repo.lanhuip.com/lanhu/platform:latest .
|
||||
docker build -t repo.lanhuip.com/lanhu/platform:$($args[0]) .
|
||||
|
||||
docker push repo.lanhuip.com:8554/lanhu/platform:latest
|
||||
docker push repo.lanhuip.com:8554/lanhu/platform:$($args[0])
|
||||
docker push repo.lanhuip.com/lanhu/platform:latest
|
||||
docker push repo.lanhuip.com/lanhu/platform:$($args[0])
|
||||
|
||||
@@ -67,6 +67,7 @@ const (
|
||||
ScopeChannelRead = string("channel:read") // 读取 IP 列表
|
||||
ScopeChannelReadOfUser = string("channel:read:of_user") // 读取指定用户的 IP 列表
|
||||
ScopeChannelWrite = string("channel:write") // 写入 IP
|
||||
ScopeChannelWriteClearExpired = string("channel:write:clear_expired") // 清理过期 IP
|
||||
|
||||
ScopeProxy = string("proxy") // 代理
|
||||
ScopeProxyRead = string("proxy:read") // 读取代理列表
|
||||
|
||||
@@ -9,9 +9,3 @@ const RemoveChannel = "channel:remove"
|
||||
func NewRemoveChannel(batch string) *asynq.Task {
|
||||
return asynq.NewTask(RemoveChannel, []byte(batch))
|
||||
}
|
||||
|
||||
const ClearExpiredChannels = "channel:clear_expired"
|
||||
|
||||
func NewClearExpiredChannels() *asynq.Task {
|
||||
return asynq.NewTask(ClearExpiredChannels, nil)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package globals
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"platform/pkg/env"
|
||||
|
||||
"go.opentelemetry.io/otel"
|
||||
@@ -17,11 +18,17 @@ import (
|
||||
var tp *trace.TracerProvider
|
||||
|
||||
func initOtel(ctx context.Context) error {
|
||||
addr := env.OtelHost + ":" + env.OtelPort
|
||||
name := "lanhu-platform"
|
||||
if env.OtelNameSuffix != "" {
|
||||
name += "-" + env.OtelNameSuffix
|
||||
}
|
||||
slog.Info("初始化 otel", "endpoint", addr, "service_suffix", name)
|
||||
|
||||
if env.OtelHost == "" || env.OtelPort == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
addr := env.OtelHost + ":" + env.OtelPort
|
||||
exporter, err := otlptracegrpc.New(ctx,
|
||||
otlptracegrpc.WithEndpoint(addr),
|
||||
otlptracegrpc.WithInsecure(),
|
||||
@@ -36,7 +43,7 @@ func initOtel(ctx context.Context) error {
|
||||
trace.WithResource(
|
||||
resource.NewWithAttributes(
|
||||
semconv.SchemaURL,
|
||||
semconv.ServiceNameKey.String("lanhu-platform"),
|
||||
semconv.ServiceNameKey.String(name),
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
@@ -341,3 +341,32 @@ type PageChannelOfUserByAdminReq struct {
|
||||
ExpiredAtStart *time.Time `json:"expired_at_start"`
|
||||
ExpiredAtEnd *time.Time `json:"expired_at_end"`
|
||||
}
|
||||
|
||||
// SyncChannelClearExpiredByAdmin 清理过期通道
|
||||
func SyncChannelClearExpiredByAdmin(c *fiber.Ctx) error {
|
||||
if _, err := auth.GetAuthCtx(c).PermitAdmin(core.ScopeChannelWriteClearExpired); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var req SyncChannelClearExpiredByAdminReq
|
||||
if err := g.Validator.ParseBody(c, &req); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
count, err := s.Channel.ClearExpiredChannels(req.ProxyID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.JSON(SyncChannelClearExpiredByAdminResp{
|
||||
Count: count,
|
||||
})
|
||||
}
|
||||
|
||||
type SyncChannelClearExpiredByAdminReq struct {
|
||||
ProxyID int32 `json:"proxy_id" validate:"required"`
|
||||
}
|
||||
|
||||
type SyncChannelClearExpiredByAdminResp struct {
|
||||
Count int `json:"count"`
|
||||
}
|
||||
|
||||
@@ -120,6 +120,8 @@ func RemoveProxy(c *fiber.Ctx) error {
|
||||
return c.JSON(nil)
|
||||
}
|
||||
|
||||
// ====================
|
||||
|
||||
// region 报告上线
|
||||
func ProxyReportOnline(c *fiber.Ctx) (err error) {
|
||||
return c.JSON(map[string]any{
|
||||
|
||||
@@ -11,6 +11,8 @@ import (
|
||||
"github.com/gofiber/fiber/v2/middleware/requestid"
|
||||
"github.com/google/uuid"
|
||||
"github.com/jxskiss/base62"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
func ApplyMiddlewares(app *fiber.App) {
|
||||
@@ -20,13 +22,8 @@ func ApplyMiddlewares(app *fiber.App) {
|
||||
EnableStackTrace: true,
|
||||
}))
|
||||
|
||||
// cors
|
||||
app.Use(cors.New(cors.Config{
|
||||
AllowCredentials: true,
|
||||
AllowOriginsFunc: func(origin string) bool {
|
||||
return true
|
||||
},
|
||||
}))
|
||||
// metric
|
||||
app.Use(otelfiber.Middleware())
|
||||
|
||||
// logger
|
||||
app.Use(logger.New(logger.Config{
|
||||
@@ -35,8 +32,31 @@ func ApplyMiddlewares(app *fiber.App) {
|
||||
},
|
||||
}))
|
||||
|
||||
// metric
|
||||
app.Use(otelfiber.Middleware())
|
||||
// 补充 otel span attr
|
||||
app.Use(func(c *fiber.Ctx) error {
|
||||
err := c.Next()
|
||||
|
||||
span := trace.SpanFromContext(c.UserContext())
|
||||
if !span.IsRecording() {
|
||||
return err
|
||||
}
|
||||
|
||||
str := ""
|
||||
if err != nil {
|
||||
str = err.Error()
|
||||
}
|
||||
|
||||
span.SetAttributes(attribute.String("http.response.error", str))
|
||||
return err
|
||||
})
|
||||
|
||||
// cors
|
||||
app.Use(cors.New(cors.Config{
|
||||
AllowCredentials: true,
|
||||
AllowOriginsFunc: func(origin string) bool {
|
||||
return true
|
||||
},
|
||||
}))
|
||||
|
||||
// request id
|
||||
app.Use(requestid.New(requestid.Config{
|
||||
|
||||
@@ -3,8 +3,8 @@ package web
|
||||
import (
|
||||
"platform/pkg/env"
|
||||
auth2 "platform/web/auth"
|
||||
"platform/web/core"
|
||||
"platform/web/handlers"
|
||||
s "platform/web/services"
|
||||
"time"
|
||||
|
||||
q "platform/web/queries"
|
||||
@@ -34,11 +34,8 @@ func ApplyRouters(app *fiber.App) {
|
||||
}
|
||||
return ctx.JSON(rs)
|
||||
})
|
||||
debug.Get("/channel/clear-expired", func(ctx *fiber.Ctx) error {
|
||||
if err := s.Channel.ClearExpiredChannels(); err != nil {
|
||||
return err
|
||||
}
|
||||
return ctx.SendStatus(fiber.StatusOK)
|
||||
debug.Get("/test/err", func(ctx *fiber.Ctx) error {
|
||||
return core.NewBizErr("测试错误")
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -208,6 +205,7 @@ func adminRouter(api fiber.Router) {
|
||||
var channel = api.Group("/channel")
|
||||
channel.Post("/page", handlers.PageChannelByAdmin)
|
||||
channel.Post("/page/of-user", handlers.PageChannelOfUserByAdmin)
|
||||
channel.Post("/sync/clear-expired", handlers.SyncChannelClearExpiredByAdmin)
|
||||
|
||||
// proxy 代理
|
||||
var proxy = api.Group("/proxy")
|
||||
|
||||
@@ -26,7 +26,7 @@ var Channel = &channelServer{
|
||||
type ChannelServiceProvider interface {
|
||||
CreateChannels(source netip.Addr, resourceId int32, authWhitelist bool, authPassword bool, count int, edgeFilter *EdgeFilter) ([]*m.Channel, error)
|
||||
RemoveChannels(batch string) error
|
||||
ClearExpiredChannels() error
|
||||
ClearExpiredChannels(proxyId int32) (int, error)
|
||||
}
|
||||
|
||||
type channelServer struct {
|
||||
@@ -41,8 +41,8 @@ func (s *channelServer) RemoveChannels(batch string) error {
|
||||
return s.provider.RemoveChannels(batch)
|
||||
}
|
||||
|
||||
func (s *channelServer) ClearExpiredChannels() error {
|
||||
return s.provider.ClearExpiredChannels()
|
||||
func (s *channelServer) ClearExpiredChannels(proxyId int32) (int, error) {
|
||||
return s.provider.ClearExpiredChannels(proxyId)
|
||||
}
|
||||
|
||||
// 授权方式
|
||||
@@ -220,10 +220,13 @@ func ensure(now time.Time, source netip.Addr, resourceId int32, authWhitelist bo
|
||||
return resource, ips, nil
|
||||
}
|
||||
|
||||
var (
|
||||
freeChansKey = "channel:free"
|
||||
usedChansKey = "channel:used"
|
||||
)
|
||||
func freeChansKey(proxy int32) string {
|
||||
return "channel:free:" + strconv.Itoa(int(proxy))
|
||||
}
|
||||
|
||||
func usedChansKey(proxy int32, batch string) string {
|
||||
return "channel:used:" + strconv.Itoa(int(proxy)) + ":" + batch
|
||||
}
|
||||
|
||||
// 扩容通道
|
||||
func regChans(proxy int32, chans []netip.AddrPort) error {
|
||||
@@ -232,7 +235,7 @@ func regChans(proxy int32, chans []netip.AddrPort) error {
|
||||
strs[i] = ch.String()
|
||||
}
|
||||
|
||||
key := freeChansKey + ":" + strconv.Itoa(int(proxy))
|
||||
key := freeChansKey(proxy)
|
||||
err := g.Redis.SAdd(context.Background(), key, strs...).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("扩容通道失败: %w", err)
|
||||
@@ -242,7 +245,7 @@ func regChans(proxy int32, chans []netip.AddrPort) error {
|
||||
|
||||
// 缩容通道
|
||||
func remChans(proxy int32) error {
|
||||
key := freeChansKey + ":" + strconv.Itoa(int(proxy))
|
||||
key := freeChansKey(proxy)
|
||||
err := g.Redis.Del(context.Background(), key).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("缩容通道失败: %w", err)
|
||||
@@ -252,13 +255,12 @@ func remChans(proxy int32) error {
|
||||
|
||||
// 取用通道
|
||||
func lockChans(proxy int32, batch string, count int) ([]netip.AddrPort, error) {
|
||||
pid := strconv.Itoa(int(proxy))
|
||||
chans, err := RedisScriptLockChans.Run(
|
||||
context.Background(),
|
||||
g.Redis,
|
||||
[]string{
|
||||
freeChansKey + ":" + pid,
|
||||
usedChansKey + ":" + pid + ":" + batch,
|
||||
freeChansKey(proxy),
|
||||
usedChansKey(proxy, batch),
|
||||
},
|
||||
count,
|
||||
).StringSlice()
|
||||
@@ -296,13 +298,12 @@ return ports
|
||||
|
||||
// 归还通道
|
||||
func freeChans(proxy int32, batch string) error {
|
||||
pid := strconv.Itoa(int(proxy))
|
||||
err := RedisScriptFreeChans.Run(
|
||||
context.Background(),
|
||||
g.Redis,
|
||||
[]string{
|
||||
freeChansKey + ":" + pid,
|
||||
usedChansKey + ":" + pid + ":" + batch,
|
||||
freeChansKey(proxy),
|
||||
usedChansKey(proxy, batch),
|
||||
},
|
||||
).Err()
|
||||
if err != nil {
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
@@ -17,6 +18,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"gorm.io/gen"
|
||||
"gorm.io/gen/field"
|
||||
)
|
||||
|
||||
@@ -28,7 +30,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
batch := ID.GenReadable("bat")
|
||||
batchNo := ID.GenReadable("bat")
|
||||
|
||||
// 检查并获取套餐与白名单
|
||||
resource, whitelists, err := ensure(now, source, resourceId, authWhitelist, count)
|
||||
@@ -40,73 +42,27 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
|
||||
expire := now.Add(resource.Live)
|
||||
|
||||
// 选择代理
|
||||
proxyResult := struct {
|
||||
m.Proxy
|
||||
Count int
|
||||
}{}
|
||||
err = q.Proxy.
|
||||
LeftJoin(q.Channel, q.Channel.ProxyID.EqCol(q.Proxy.ID), q.Channel.ExpiredAt.Gt(now)).
|
||||
Select(q.Proxy.ALL, field.NewUnsafeFieldRaw("10000 - count(*)").As("count")).
|
||||
Where(
|
||||
q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)),
|
||||
q.Proxy.Status.Eq(int(m.ProxyStatusOnline)),
|
||||
).
|
||||
Group(q.Proxy.ID).
|
||||
Order(field.NewField("", "count")).
|
||||
Limit(1).Scan(&proxyResult)
|
||||
proxy, gateway, err := selectProxy(count)
|
||||
if err != nil {
|
||||
return nil, core.NewBizErr("获取可用代理失败", err)
|
||||
return nil, err
|
||||
}
|
||||
if proxyResult.Count < count {
|
||||
return nil, core.NewBizErr("无可用主机,请稍后再试")
|
||||
}
|
||||
proxy := proxyResult.Proxy
|
||||
|
||||
// 取用端口
|
||||
var chans []netip.AddrPort
|
||||
err = g.Redsync.WithLock(proxyStatusLockKey(proxy.ID), func() error {
|
||||
lockedProxy, err := q.Proxy.Where(q.Proxy.ID.Eq(proxy.ID)).Take()
|
||||
chans, err := selectPorts(proxy.ID, batchNo, count, expire)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 节点查询到提交,需要锁定防止并发取用
|
||||
channels := make([]*m.Channel, count)
|
||||
err = g.Redsync.WithLock(lockChannelCreateKey(), func() error {
|
||||
// 取用节点
|
||||
edges, err := selectEdges(gateway, filter, count)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if lockedProxy.Status != m.ProxyStatusOnline {
|
||||
return core.NewBizErr("无可用主机,请稍后再试")
|
||||
}
|
||||
|
||||
chans, err = lockChans(proxy.ID, batch, count)
|
||||
if err != nil {
|
||||
return core.NewBizErr("无可用通道,请稍后再试", err)
|
||||
}
|
||||
|
||||
proxy = *lockedProxy
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = g.Asynq.Enqueue(
|
||||
e.NewRemoveChannel(batch),
|
||||
asynq.ProcessAt(expire),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, core.NewServErr("提交关闭通道任务失败", err)
|
||||
}
|
||||
|
||||
// 取用节点
|
||||
secret := strings.Split(u.Z(proxy.Secret), ":")
|
||||
if len(secret) != 2 {
|
||||
return nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil)
|
||||
}
|
||||
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
|
||||
|
||||
edges, err := getAvailableEdges(gateway, filter, count)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// 绑定节点到端口
|
||||
channels := make([]*m.Channel, count)
|
||||
// 绑定节点端口
|
||||
chanConfigs := make([]*g.PortConfigsReq, count)
|
||||
edgeConfigs := make([]string, 0, count)
|
||||
for i := range count {
|
||||
@@ -117,7 +73,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
|
||||
channels[i] = &m.Channel{
|
||||
UserID: user.ID,
|
||||
ResourceID: resourceId,
|
||||
BatchNo: batch,
|
||||
BatchNo: batchNo,
|
||||
ProxyID: proxy.ID,
|
||||
Host: u.Else(proxy.Host, proxy.IP.String()),
|
||||
Port: ch.Port(),
|
||||
@@ -126,7 +82,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
|
||||
FilterProv: filter.Prov,
|
||||
FilterCity: filter.City,
|
||||
ExpiredAt: expire,
|
||||
Proxy: &proxy,
|
||||
Proxy: proxy,
|
||||
}
|
||||
|
||||
// 通道配置数据
|
||||
@@ -156,13 +112,35 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
|
||||
}
|
||||
}
|
||||
|
||||
// 提交配置
|
||||
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "total_count", len(chanConfigs), "remote_count", len(edgeConfigs))
|
||||
if env.RunMode == env.RunModeProd {
|
||||
|
||||
// 连接节点到网关
|
||||
if err := g.Cloud.CloudConnect(&g.CloudConnectReq{Uuid: proxy.Mac, Edge: &edgeConfigs}); err != nil {
|
||||
return core.NewServErr("连接云平台失败", err)
|
||||
}
|
||||
|
||||
// 启用网关代理通道
|
||||
if err := gateway.GatewayPortConfigs(chanConfigs); err != nil {
|
||||
slog.Warn("提交代理端口配置失败", "error", err.Error())
|
||||
return core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
|
||||
}
|
||||
} else {
|
||||
for _, item := range chanConfigs {
|
||||
str, _ := json.Marshal(item)
|
||||
fmt.Println(string(str))
|
||||
}
|
||||
}
|
||||
|
||||
// 保存数据
|
||||
err = q.Q.Transaction(func(q *q.Query) error {
|
||||
// 更新使用记录
|
||||
var result gen.ResultInfo
|
||||
var err error
|
||||
switch resource.Type {
|
||||
case m.ResourceTypeShort:
|
||||
_, err = q.ResourceShort.
|
||||
result, err = q.ResourceShort.
|
||||
Where(
|
||||
q.ResourceShort.ID.Eq(*resource.ShortId),
|
||||
q.ResourceShort.Used.Eq(resource.Used),
|
||||
@@ -175,7 +153,7 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
|
||||
)
|
||||
|
||||
case m.ResourceTypeLong:
|
||||
_, err = q.ResourceLong.
|
||||
result, err = q.ResourceLong.
|
||||
Where(
|
||||
q.ResourceLong.ID.Eq(*resource.LongId),
|
||||
q.ResourceLong.Used.Eq(resource.Used),
|
||||
@@ -188,11 +166,14 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
|
||||
)
|
||||
|
||||
default:
|
||||
return core.NewServErr("套餐类型不正确,无法更新", nil)
|
||||
return core.NewBizErr("套餐类型不正确,无法更新", nil)
|
||||
}
|
||||
if err != nil {
|
||||
return core.NewServErr("更新套餐使用记录失败", err)
|
||||
}
|
||||
if result.RowsAffected == 0 {
|
||||
return core.NewBizErr("提取太频繁,请稍后再试", nil)
|
||||
}
|
||||
|
||||
// 保存通道
|
||||
err = q.Channel.
|
||||
@@ -206,9 +187,9 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
|
||||
err = q.LogsUserUsage.Create(&m.LogsUserUsage{
|
||||
UserID: user.ID,
|
||||
ResourceID: resourceId,
|
||||
BatchNo: batch,
|
||||
BatchNo: batchNo,
|
||||
Count: int32(count),
|
||||
ISP: u.P(filter.Isp.String()),
|
||||
ISP: u.X(filter.Isp.String()),
|
||||
Prov: filter.Prov,
|
||||
City: filter.City,
|
||||
IP: orm.Inet{Addr: source},
|
||||
@@ -221,39 +202,20 @@ func (s *channelBaiyinProvider) CreateChannels(source netip.Addr, resourceId int
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
|
||||
// 提交配置
|
||||
if env.RunMode == env.RunModeProd {
|
||||
|
||||
// 连接节点到网关
|
||||
err = g.Cloud.CloudConnect(&g.CloudConnectReq{
|
||||
Uuid: proxy.Mac,
|
||||
Edge: &edgeConfigs,
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, core.NewServErr("连接云平台失败", err)
|
||||
}
|
||||
|
||||
// 启用网关代理通道
|
||||
err = gateway.GatewayPortConfigs(chanConfigs)
|
||||
if err != nil {
|
||||
slog.Warn("提交代理端口配置失败", "error", err.Error())
|
||||
return nil, core.NewServErr(fmt.Sprintf("配置代理 %s 端口失败", proxy.IP.String()), err)
|
||||
}
|
||||
} else {
|
||||
slog.Debug("提交代理端口配置", "proxy", proxy.IP.String(), "count", len(chanConfigs), "remote", len(edgeConfigs))
|
||||
for _, item := range chanConfigs {
|
||||
str, _ := json.Marshal(item)
|
||||
fmt.Println(string(str))
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return channels, nil
|
||||
}
|
||||
|
||||
func (s *channelBaiyinProvider) RemoveChannels(batch string) error {
|
||||
return g.Redsync.WithLock(lockChannelRemoveKey(batch), func() error {
|
||||
start := time.Now()
|
||||
|
||||
// 获取连接数据
|
||||
@@ -271,6 +233,15 @@ func (s *channelBaiyinProvider) RemoveChannels(batch string) error {
|
||||
return core.NewServErr(fmt.Sprintf("获取代理数据失败,batch:%s", batch), err)
|
||||
}
|
||||
|
||||
// 检查通道是否存在
|
||||
exist, err := g.Redis.Exists(context.Background(), usedChansKey(proxy.ID, batch)).Result()
|
||||
if err != nil {
|
||||
return core.NewServErr("查询使用中通道失败", err)
|
||||
}
|
||||
if exist == 0 {
|
||||
return nil // 没有使用中通道,已经被清理过了
|
||||
}
|
||||
|
||||
// 准备配置数据
|
||||
edgeConfigs := make([]string, len(channels))
|
||||
configs := make([]*g.PortConfigsReq, len(channels))
|
||||
@@ -293,6 +264,9 @@ func (s *channelBaiyinProvider) RemoveChannels(batch string) error {
|
||||
|
||||
// 清空通道配置
|
||||
secret := strings.Split(u.Z(proxy.Secret), ":")
|
||||
if len(secret) != 2 {
|
||||
return core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil)
|
||||
}
|
||||
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
|
||||
err := gateway.GatewayPortConfigs(configs)
|
||||
if err != nil {
|
||||
@@ -322,29 +296,141 @@ func (s *channelBaiyinProvider) RemoveChannels(batch string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
slog.Debug("清除代理端口配置", "proxy", proxy.IP.String(), "batch", batch, "duration", time.Since(start).String())
|
||||
slog.Debug("清除代理端口配置", "proxy", proxy.ID, "batch", batch, "duration", time.Since(start).String())
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (s *channelBaiyinProvider) ClearExpiredChannels() error {
|
||||
now := time.Now().Add(time.Hour)
|
||||
var batches []struct{ BatchNo string }
|
||||
err := q.Channel.Select(q.Channel.BatchNo).Where(q.Channel.ExpiredAt.Lt(now)).Group(q.Channel.BatchNo).Scan(&batches)
|
||||
// ClearExpiredChannels 清理指定代理的过期通道,并返回清理数量(现在理论上不会有需要手动批量清理的通道,未来可以废弃)
|
||||
func (s *channelBaiyinProvider) ClearExpiredChannels(proxyId int32) (int, error) {
|
||||
now := time.Now()
|
||||
|
||||
// 获取未清理通道
|
||||
keys, err := g.Redis.Keys(context.Background(), usedChansKey(proxyId, "*")).Result()
|
||||
if err != nil {
|
||||
return core.NewServErr("查询过期通道失败", err)
|
||||
return 0, core.NewServErr("查询使用中通道失败", err)
|
||||
}
|
||||
if len(keys) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
for _, batch := range batches {
|
||||
err := s.RemoveChannels(batch.BatchNo)
|
||||
batchList := make([]string, len(keys))
|
||||
batchSet := make(map[string]struct{}, len(keys))
|
||||
for i, key := range keys {
|
||||
parts := strings.Split(key, ":")
|
||||
if len(parts) != 4 {
|
||||
return 0, core.NewServErr(fmt.Sprintf("使用中通道键格式错误: %s", key), nil)
|
||||
}
|
||||
batchList[i] = parts[3]
|
||||
batchSet[parts[3]] = struct{}{}
|
||||
}
|
||||
|
||||
// 排除未过期通道
|
||||
var batchQueried []struct{ BatchNo string }
|
||||
err = q.Channel.
|
||||
Select(q.Channel.BatchNo).
|
||||
Where(
|
||||
q.Channel.BatchNo.In(batchList...),
|
||||
q.Channel.ExpiredAt.Gte(now),
|
||||
).
|
||||
Group(q.Channel.BatchNo).
|
||||
Scan(&batchQueried)
|
||||
if err != nil {
|
||||
slog.Error("清理过期通道失败", "batch", batch.BatchNo, "error", err)
|
||||
return 0, core.NewServErr("查询过期通道失败", err)
|
||||
}
|
||||
for _, batch := range batchQueried {
|
||||
delete(batchSet, batch.BatchNo)
|
||||
}
|
||||
|
||||
// 清理过期通道
|
||||
slog.Info("批量清理过期通道", "count", len(batchSet))
|
||||
for batchNo, _ := range batchSet {
|
||||
err := s.RemoveChannels(batchNo)
|
||||
if err != nil {
|
||||
slog.Error("清理过期通道失败", "batch", batchNo, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return len(batchSet), nil
|
||||
}
|
||||
|
||||
func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]EdgeInfo, error) {
|
||||
func lockChannelCreateKey() string {
|
||||
return "platform:channel:create"
|
||||
}
|
||||
|
||||
func lockChannelRemoveKey(bid string) string {
|
||||
return fmt.Sprintf("platform:batch:remove_expired:%s", bid)
|
||||
}
|
||||
|
||||
func selectProxy(count int) (*m.Proxy, g.GatewayClient, error) {
|
||||
// 获取在线节点
|
||||
proxies, err := q.Proxy.Where(
|
||||
q.Proxy.Type.Eq(int(m.ProxyTypeBaiYin)),
|
||||
q.Proxy.Status.Eq(int(m.ProxyStatusOnline)),
|
||||
).Find()
|
||||
if err != nil {
|
||||
return nil, nil, core.NewBizErr("获取可用代理失败", err)
|
||||
}
|
||||
if len(proxies) == 0 {
|
||||
return nil, nil, core.NewBizErr("无可用代理")
|
||||
}
|
||||
|
||||
proxyIDs := make([]int32, 0, len(proxies))
|
||||
proxyMap := make(map[int32]*m.Proxy, len(proxies))
|
||||
for _, item := range proxies {
|
||||
proxyIDs = append(proxyIDs, item.ID)
|
||||
proxyMap[item.ID] = item
|
||||
}
|
||||
|
||||
// 获取最空闲节点
|
||||
maxId := int32(0)
|
||||
maxCount := -1
|
||||
for _, id := range proxyIDs {
|
||||
idCount, err := g.Redis.SCard(context.Background(), freeChansKey(id)).Result()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("查询可用通道数量失败: %w", err)
|
||||
}
|
||||
|
||||
if idCount > int64(maxCount) {
|
||||
maxCount = int(idCount)
|
||||
maxId = id
|
||||
}
|
||||
}
|
||||
if maxCount < count {
|
||||
return nil, nil, core.NewBizErr("无可用代理")
|
||||
}
|
||||
proxy := proxyMap[maxId]
|
||||
|
||||
secret := strings.Split(u.Z(proxy.Secret), ":")
|
||||
if len(secret) != 2 {
|
||||
return nil, nil, core.NewServErr(fmt.Sprintf("代理 %s 密钥格式错误", proxy.IP.String()), nil)
|
||||
}
|
||||
gateway := g.NewGateway(proxy.IP.String(), secret[0], secret[1])
|
||||
|
||||
return proxy, gateway, nil
|
||||
}
|
||||
|
||||
func selectPorts(proxyId int32, batchNo string, count int, expire time.Time) ([]netip.AddrPort, error) {
|
||||
chans, err := lockChans(proxyId, batchNo, count)
|
||||
if err != nil {
|
||||
return nil, core.NewBizErr("无可用通道,请稍后再试", err)
|
||||
}
|
||||
|
||||
_, err = g.Asynq.Enqueue(
|
||||
e.NewRemoveChannel(batchNo),
|
||||
asynq.ProcessAt(expire),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, core.NewServErr("注册异步关闭通道任务失败", err)
|
||||
}
|
||||
|
||||
return chans, nil
|
||||
}
|
||||
|
||||
// selectEdges 选择节点,优先本地节点,失败重试,直到达到重试次数限制
|
||||
// 本地节点通过 Assigned = false 排除已分配节点
|
||||
// 云端节点通过 NoRepeat = true 排除已分配节点
|
||||
func selectEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) ([]EdgeInfo, error) {
|
||||
edges := make([]EdgeInfo, 0, count)
|
||||
|
||||
// 先查本地
|
||||
@@ -369,7 +455,7 @@ func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) (
|
||||
return edges, nil
|
||||
}
|
||||
|
||||
// 再查云端无重复
|
||||
// 再查云端
|
||||
remaining := count - len(edges)
|
||||
cloudEdgesResp, err := g.Cloud.CloudEdges(&g.CloudEdgesReq{
|
||||
Province: filter.Prov,
|
||||
@@ -390,13 +476,11 @@ func getAvailableEdges(gateway g.GatewayClient, filter *EdgeFilter, count int) (
|
||||
EdgeID: edge.EdgeID,
|
||||
})
|
||||
}
|
||||
if len(edges) >= count {
|
||||
return edges, nil
|
||||
if len(edges) < count {
|
||||
return nil, core.NewBizErr("地区可用节点数量不足")
|
||||
}
|
||||
|
||||
// 不能和已有的重复,如果有重复则再次查询云端补足,二次提取还有重复则放弃
|
||||
|
||||
return nil, core.NewBizErr("地区可用节点数量不足")
|
||||
return edges, nil
|
||||
}
|
||||
|
||||
type EdgeInfo struct {
|
||||
|
||||
@@ -2,7 +2,6 @@ package services
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/netip"
|
||||
"platform/pkg/u"
|
||||
"platform/web/core"
|
||||
@@ -10,7 +9,6 @@ import (
|
||||
"platform/web/globals/orm"
|
||||
m "platform/web/models"
|
||||
q "platform/web/queries"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"gorm.io/gen/field"
|
||||
@@ -20,13 +18,9 @@ var Proxy = &proxyService{}
|
||||
|
||||
type proxyService struct{}
|
||||
|
||||
func proxyStatusLockKey(id int32) string {
|
||||
return fmt.Sprintf("platform:proxy:status:%d", id)
|
||||
}
|
||||
|
||||
func hasUsedChans(proxyID int32) (bool, error) {
|
||||
ctx := context.Background()
|
||||
pattern := usedChansKey + ":" + strconv.Itoa(int(proxyID)) + ":*"
|
||||
pattern := usedChansKey(proxyID, "*")
|
||||
keys, _, err := g.Redis.Scan(ctx, 0, pattern, 1).Result()
|
||||
if err != nil {
|
||||
return false, err
|
||||
@@ -192,7 +186,7 @@ type UpdateProxyStatus struct {
|
||||
}
|
||||
|
||||
func (s *proxyService) UpdateStatus(update *UpdateProxyStatus) error {
|
||||
return g.Redsync.WithLock(proxyStatusLockKey(update.ID), func() error {
|
||||
return q.Q.Transaction(func(tx *q.Query) error {
|
||||
proxy, err := q.Proxy.Where(q.Proxy.ID.Eq(update.ID)).Take()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -160,6 +160,10 @@ func (s *resourceService) CalcPrice(skuCode string, count int32, user *m.User, c
|
||||
return nil, nil, nil, decimal.Zero, decimal.Zero, decimal.Zero, core.NewBizErr("产品不可用", err)
|
||||
}
|
||||
|
||||
if count < sku.CountMin {
|
||||
return nil, nil, nil, decimal.Zero, decimal.Zero, decimal.Zero, core.NewBizErr(fmt.Sprintf("购买数量不能少于 %d", sku.CountMin))
|
||||
}
|
||||
|
||||
// 原价
|
||||
amountMin := sku.PriceMin.Mul(decimal.NewFromInt32(count))
|
||||
amount := sku.Price.Mul(decimal.NewFromInt32(count))
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
)
|
||||
|
||||
func HandleCompleteTrade(_ context.Context, task *asynq.Task) error {
|
||||
slog.Info("[event]尝试结束交易")
|
||||
var event events.CloseTradeData
|
||||
if err := json.Unmarshal(task.Payload(), &event); err != nil {
|
||||
return fmt.Errorf("解析任务参数失败: %w", err)
|
||||
@@ -30,11 +31,11 @@ func HandleCompleteTrade(_ context.Context, task *asynq.Task) error {
|
||||
}
|
||||
|
||||
if err := s.Trade.CompleteTrade(user, &data); err != nil {
|
||||
slog.Debug("完成交易失败[异步结束订单]", "err", err)
|
||||
slog.Debug("结束交易失败:完成交易失败", "err", err)
|
||||
|
||||
// 交易无法完成,关闭交易
|
||||
if err := s.Trade.CancelTrade(&data); err != nil {
|
||||
return fmt.Errorf("取消交易失败[异步结束订单]: %w", err)
|
||||
return fmt.Errorf("结束交易失败:取消交易失败: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,17 +44,11 @@ func HandleCompleteTrade(_ context.Context, task *asynq.Task) error {
|
||||
|
||||
func HandleRemoveChannel(_ context.Context, task *asynq.Task) (err error) {
|
||||
batch := string(task.Payload())
|
||||
slog.Info("[event]删除通道", "batch", batch)
|
||||
|
||||
err = s.Channel.RemoveChannels(batch)
|
||||
if err != nil {
|
||||
return fmt.Errorf("删除通道失败: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func HandleClearExpiredChannels(_ context.Context, _ *asynq.Task) (err error) {
|
||||
err = s.Channel.ClearExpiredChannels()
|
||||
if err != nil {
|
||||
return fmt.Errorf("清理过期通道失败: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
49
web/web.go
49
web/web.go
@@ -42,10 +42,6 @@ func RunApp(pCtx context.Context) error {
|
||||
return RunTask(ctx)
|
||||
})
|
||||
|
||||
g.Go(func() error {
|
||||
return RunCron(ctx)
|
||||
})
|
||||
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
@@ -89,12 +85,12 @@ func RunTask(ctx context.Context) error {
|
||||
ErrorHandler: asynq.ErrorHandlerFunc(func(ctx context.Context, task *asynq.Task, err error) {
|
||||
slog.Error("任务执行失败", "task", task.Type(), "error", err)
|
||||
}),
|
||||
Logger: &AppAsynqLogger{},
|
||||
})
|
||||
|
||||
var mux = asynq.NewServeMux()
|
||||
mux.HandleFunc(events.RemoveChannel, tasks.HandleRemoveChannel)
|
||||
mux.HandleFunc(events.CloseTrade, tasks.HandleCompleteTrade)
|
||||
mux.HandleFunc(events.ClearExpiredChannels, tasks.HandleClearExpiredChannels)
|
||||
|
||||
// 停止服务
|
||||
go func() {
|
||||
@@ -111,25 +107,32 @@ func RunTask(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func RunCron(ctx context.Context) error {
|
||||
server := asynq.NewSchedulerFromRedisClient(deps.Redis, &asynq.SchedulerOpts{
|
||||
Location: time.Local,
|
||||
})
|
||||
type AppAsynqLogger struct{}
|
||||
|
||||
// 每小时清理一次一小时之前的过期通道
|
||||
server.Register("0 * * * *", asynq.NewTask(events.ClearExpiredChannels, nil))
|
||||
|
||||
// 停止服务
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
server.Shutdown()
|
||||
}()
|
||||
|
||||
// 启动服务
|
||||
err := server.Run()
|
||||
if err != nil {
|
||||
return fmt.Errorf("定时任务服务运行失败: %w", err)
|
||||
func (l *AppAsynqLogger) Debug(args ...any) {
|
||||
slog.Debug("Asynq", anyToAttrs(args)...)
|
||||
}
|
||||
|
||||
return nil
|
||||
func (l *AppAsynqLogger) Info(args ...any) {
|
||||
slog.Info("Asynq", anyToAttrs(args)...)
|
||||
}
|
||||
|
||||
func (l *AppAsynqLogger) Warn(args ...any) {
|
||||
slog.Warn("Asynq", anyToAttrs(args)...)
|
||||
}
|
||||
|
||||
func (l *AppAsynqLogger) Error(args ...any) {
|
||||
slog.Error("Asynq", anyToAttrs(args)...)
|
||||
}
|
||||
|
||||
func (l *AppAsynqLogger) Fatal(args ...any) {
|
||||
slog.Error("Asynq[Fatal]", anyToAttrs(args)...)
|
||||
}
|
||||
|
||||
func anyToAttrs(args ...any) []any {
|
||||
attrs := make([]any, len(args))
|
||||
for i, arg := range args {
|
||||
attrs[i] = slog.Any(fmt.Sprintf("arg%d", i), arg)
|
||||
}
|
||||
return attrs
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user