Files
platform/web/globals/baiyin.go

608 lines
14 KiB
Go

package globals
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
"platform/pkg/env"
"platform/web/core"
"strconv"
"strings"
"time"
)
// CloudClient defines the cloud service interface.
type CloudClient interface {
// CloudEdges queries edge nodes that match the given filter.
CloudEdges(param *CloudEdgesReq) (*CloudEdgesResp, error)
// CloudConnect connects edge nodes to a gateway.
CloudConnect(param *CloudConnectReq) error
// CloudDisconnect disconnects edge nodes from a gateway; the int result is
// the number of disconnected edges reported by the cloud.
CloudDisconnect(param *CloudDisconnectReq) (int, error)
// CloudAutoQuery queries the auto-connect configuration.
CloudAutoQuery() (CloudConnectResp, error)
}
// cloud is the CloudClient implementation in this file; it talks to the cloud
// service over HTTP.
type cloud struct {
// url is the base address; requestCloud appends "/api" and the endpoint path.
url string
}
// Cloud is the package-level cloud client. It is assigned by initBaiyin.
var Cloud CloudClient
func initBaiyin() error {
Cloud = &cloud{
url: env.BaiyinCloudUrl,
}
return nil
}
// CloudEdges calls GET /edges on the cloud API to query edge nodes by filter.
//
// param is turned into the query string through core.Query. The decoded
// response is returned on success; an error is returned when the request
// fails, the HTTP status is not 200, or the body cannot be read or decoded.
func (c *cloud) CloudEdges(param *CloudEdgesReq) (*CloudEdgesResp, error) {
	endpoint := "/edges?" + core.Query(param).Encode()

	res, err := c.requestCloud("GET", endpoint, "")
	if err != nil {
		return nil, err
	}
	defer func() { _ = res.Body.Close() }()

	if res.StatusCode != http.StatusOK {
		return nil, errors.New("failed to get edges")
	}

	raw, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	edges := new(CloudEdgesResp)
	if err := json.Unmarshal(raw, edges); err != nil {
		return nil, err
	}
	return edges, nil
}
// CloudEdgesReq carries the filter parameters that CloudEdges encodes into the
// /edges query string via core.Query. Each field's `query` tag names its
// parameter.
type CloudEdgesReq struct {
Province *string `query:"province"`
City *string `query:"city"`
Isp *string `query:"isp"`
Offset *int `query:"offset"`
Limit *int `query:"limit"`
// NOTE(review): the "b2i" tag option presumably makes core.Query emit the
// bool as an integer — confirm against core.Query.
NoRepeat *bool `query:"norepeat,b2i"`
NoDayRepeat *bool `query:"nodayrepeat,b2i"`
IpUnchangedTime *int `query:"ip_unchanged_time"` // in seconds
ActiveTime *int `query:"active_time"` // in seconds
// Sort order; allowed values:
// - create_time_asc device creation time, ascending
// - create_time_desc device creation time, descending
// - ip_unchanged_time_asc time the IP has stayed unchanged, ascending
// - ip_unchanged_time_desc time the IP has stayed unchanged, descending
// - active_time_asc continuous active time, ascending
// - active_time_desc continuous active time, descending
// - rand random order (default)
Sort *string `query:"sort"`
}
// CloudEdgesResp is the JSON body that CloudEdges decodes from a successful
// /edges response.
type CloudEdgesResp struct {
Edges []Edge `json:"edges"`
Total int `json:"total"`
Offset int `json:"offset"`
Limit int `json:"limit"`
}
// Edge is a single edge-node entry in CloudEdgesResp.Edges.
// NOTE(review): units of Rtt and PacketLoss are not visible here — confirm
// against the cloud API documentation.
type Edge struct {
EdgeID string `json:"edge_id"`
Province string `json:"province"`
City string `json:"city"`
Isp string `json:"isp"`
Ip string `json:"ip"`
Rtt int `json:"rtt"`
PacketLoss int `json:"packet_loss"`
}
// CloudConnect calls POST /connect on the cloud API to connect edge nodes to a
// gateway.
//
// param is marshalled to JSON and sent as the request body. An error is
// returned when the request fails, the HTTP status is not 200, the response
// body is not valid JSON, or the response carries status "error" (in which
// case the error text is taken from the response's "details" value).
func (c *cloud) CloudConnect(param *CloudConnectReq) error {
	data, err := json.Marshal(param)
	if err != nil {
		return err
	}

	resp, err := c.requestCloud("POST", "/connect", string(data))
	if err != nil {
		return err
	}
	defer func(Body io.ReadCloser) {
		_ = Body.Close()
	}(resp.Body)

	if resp.StatusCode != http.StatusOK {
		return errors.New("failed to connect")
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	var result map[string]any
	if err = json.Unmarshal(body, &result); err != nil {
		return err
	}

	if result["status"] == "error" {
		// "details" may be missing or not a string; an unchecked type
		// assertion would panic instead of reporting the failure.
		if details, ok := result["details"].(string); ok {
			return errors.New(details)
		}
		return fmt.Errorf("failed to connect: %v", result["details"])
	}
	return nil
}
// CloudConnectReq is the JSON payload CloudConnect posts to /connect.
// Edge and AutoConfig are pointers tagged omitempty, so a nil pointer leaves
// the corresponding key out of the payload.
type CloudConnectReq struct {
Uuid string `json:"uuid"`
Edge *[]string `json:"edge,omitempty"`
AutoConfig *[]AutoConfig `json:"auto_config,omitempty"`
}
// AutoConfig is one auto-connect rule. It is the element type of
// CloudConnectReq.AutoConfig and of the CloudConnectResp map values.
type AutoConfig struct {
Province string `json:"province"`
City string `json:"city"`
Isp string `json:"isp"`
Count int `json:"count"`
}
// CloudDisconnect calls POST /disconnect on the cloud API to detach edge nodes
// from a gateway.
//
// param is marshalled to JSON and sent as the request body. On success the
// response's "disconnected_edges" number is returned. An error is returned
// when the request fails, the HTTP status is not 200, the response body is not
// valid JSON, the response carries status "error", or "disconnected_edges" is
// absent or not a number.
func (c *cloud) CloudDisconnect(param *CloudDisconnectReq) (int, error) {
	data, err := json.Marshal(param)
	if err != nil {
		return 0, err
	}

	resp, err := c.requestCloud("POST", "/disconnect", string(data))
	if err != nil {
		return 0, err
	}
	defer func(Body io.ReadCloser) {
		_ = Body.Close()
	}(resp.Body)

	if resp.StatusCode != http.StatusOK {
		return 0, errors.New("failed to disconnect")
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, err
	}

	var result map[string]any
	if err = json.Unmarshal(body, &result); err != nil {
		return 0, err
	}

	if result["status"] == "error" {
		// "details" may be missing or not a string; an unchecked type
		// assertion would panic instead of reporting the failure.
		if details, ok := result["details"].(string); ok {
			return 0, errors.New(details)
		}
		return 0, fmt.Errorf("failed to disconnect: %v", result["details"])
	}

	// encoding/json decodes JSON numbers into float64 when the target is any.
	count, ok := result["disconnected_edges"].(float64)
	if !ok {
		return 0, fmt.Errorf("failed to disconnect: unexpected disconnected_edges value %v", result["disconnected_edges"])
	}
	return int(count), nil
}
// CloudDisconnectReq is the JSON payload CloudDisconnect posts to /disconnect.
// Note that the Config field is serialized under the "auto_config" key.
// Edge and Config are pointers tagged omitempty, so a nil pointer leaves the
// corresponding key out of the payload.
type CloudDisconnectReq struct {
Uuid string `json:"uuid"`
Edge *[]string `json:"edge,omitempty"`
Config *[]Config `json:"auto_config,omitempty"`
}
// Config is the element type of CloudDisconnectReq.Config. It has the same
// fields as AutoConfig plus Online.
type Config struct {
Province string `json:"province"`
City string `json:"city"`
Isp string `json:"isp"`
Count int `json:"count"`
Online bool `json:"online"`
}
// CloudAutoQuery calls GET /auto_query on the cloud API to fetch the
// auto-connect configuration.
//
// The decoded response map is returned on success; an error is returned when
// the request fails, the HTTP status is not 200, or the body cannot be read or
// decoded.
func (c *cloud) CloudAutoQuery() (CloudConnectResp, error) {
	res, err := c.requestCloud("GET", "/auto_query", "")
	if err != nil {
		return nil, err
	}
	defer func() { _ = res.Body.Close() }()

	if res.StatusCode != http.StatusOK {
		return nil, errors.New("failed to get auto_query")
	}

	raw, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	var configs CloudConnectResp
	if err := json.Unmarshal(raw, &configs); err != nil {
		return nil, err
	}
	return configs, nil
}
// CloudConnectResp is the decoded /auto_query response: a JSON object whose
// values are lists of AutoConfig.
// NOTE(review): the meaning of the map keys is not visible in this file —
// confirm against the cloud API documentation.
type CloudConnectResp map[string][]AutoConfig
// requestCloud sends an authenticated JSON request to the cloud API.
//
// url is the endpoint path (including any query string); it is appended to
// c.url + "/api". data is the raw request body. The token obtained from
// c.token is sent in the "token" header. If the first attempt is answered with
// 401, the token is refreshed and the request is sent exactly once more.
//
// The caller owns the returned response and must close its body. When
// env.DebugHttpDump is set, the request and response are dumped to stdout.
func (c *cloud) requestCloud(method string, url string, data string) (*http.Response, error) {
	const attempts = 2

	url = fmt.Sprintf("%s/api%s", c.url, url)

	var resp *http.Response
	for i := range attempts {
		// Build a fresh request for every attempt: the body reader of a
		// request that has already been sent is consumed, so reusing the
		// request would make the retry of a non-empty POST fail.
		req, err := http.NewRequest(method, url, strings.NewReader(data))
		if err != nil {
			return nil, err
		}
		req.Header.Set("Content-Type", "application/json")

		// Force a token refresh on the retry.
		token, err := c.token(i > 0)
		if err != nil {
			return nil, err
		}
		req.Header.Set("token", token)

		if env.DebugHttpDump {
			dump, err := httputil.DumpRequest(req, true)
			if err != nil {
				return nil, err
			}
			fmt.Println("==============================")
			fmt.Println(string(dump))
		}

		resp, err = http.DefaultClient.Do(req)
		if err != nil {
			return nil, err
		}

		if env.DebugHttpDump {
			dump, err := httputil.DumpResponse(resp, true)
			if err != nil {
				// The response is not returned on this path, so release it.
				_ = resp.Body.Close()
				return nil, err
			}
			fmt.Println("------------------------------")
			fmt.Println(string(dump))
		}

		if resp.StatusCode != http.StatusUnauthorized || i == attempts-1 {
			break
		}

		// The 401 response is discarded before retrying; drain and close it
		// so the underlying connection can be reused instead of leaked.
		_, _ = io.Copy(io.Discard, resp.Body)
		_ = resp.Body.Close()
	}
	return resp, nil
}
// token returns the cloud API token.
//
// Unless refresh is true, the token cached in Redis under BaiyinToken is
// returned when present. Otherwise a new token is fetched from
// env.BaiyinTokenUrl, stored in Redis with a one-hour expiry, and returned.
//
// An error is returned when the token request fails, the HTTP status is not
// 200, the body is not valid JSON, the response "code" is not 1, the response
// has no string "token", or the Redis write fails.
func (c *cloud) token(refresh bool) (string, error) {
	// Try the cached token first.
	if !refresh {
		token, err := Redis.Get(context.Background(), BaiyinToken).Result()
		if err == nil && token != "" {
			return token, nil
		}
	}

	// Cache miss (or forced refresh): fetch a new token.
	resp, err := http.Get(env.BaiyinTokenUrl)
	if err != nil {
		return "", err
	}
	defer func(Body io.ReadCloser) {
		_ = Body.Close()
	}(resp.Body)

	if env.DebugHttpDump {
		dump, err := httputil.DumpResponse(resp, true)
		if err != nil {
			return "", err
		}
		fmt.Println(string(dump))
	}

	if resp.StatusCode != http.StatusOK {
		return "", errors.New("failed to get token")
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}

	var result map[string]any
	if err = json.Unmarshal(body, &result); err != nil {
		return "", fmt.Errorf("解析响应 [%s] 失败: %w", string(body), err)
	}

	// Checked assertions: a response without a numeric "code" or a string
	// "token" must produce an error, not a panic.
	code, ok := result["code"].(float64)
	if !ok || code != 1 {
		return "", errors.New("failed to get cloud token")
	}
	token, ok := result["token"].(string)
	if !ok || token == "" {
		return "", errors.New("failed to get cloud token: response has no token")
	}

	// Cache the token in Redis.
	if err = Redis.Set(context.Background(), BaiyinToken, token, 1*time.Hour).Err(); err != nil {
		return "", err
	}
	return token, nil
}
// BaiyinToken is the Redis key under which the cloud API token is cached.
const BaiyinToken = "clients:baiyin:token"
// GatewayClient defines the gateway interface.
type GatewayClient interface {
// GatewayPortConfigs submits a batch of port configurations.
GatewayPortConfigs(params []*PortConfigsReq) error
// GatewayPortActive queries port state; the gateway implementation in this
// file uses only the first param, if any.
GatewayPortActive(param ...*PortActiveReq) (map[string]PortData, error)
// GatewayEdge queries the gateway's edge nodes with the given filter.
GatewayEdge(params *GatewayEdgeReq) (map[string]GatewayEdgeInfo, error)
}
// gateway is the GatewayClient implementation in this file; it talks to a
// gateway over HTTP.
type gateway struct {
// url is used as the host part of request URLs; get and requestGateway
// append port 9990 to it.
url string
// username and password are embedded as URL userinfo in every request.
username string
password string
}
// GatewayInitializer is the factory NewGateway delegates to. It is a variable,
// so it can be reassigned to supply a different GatewayClient implementation.
// The default builds a *gateway from the given address and credentials.
var GatewayInitializer = func(url, username, password string) GatewayClient {
	gw := new(gateway)
	gw.url = url
	gw.username = username
	gw.password = password
	return gw
}
// NewGateway returns a GatewayClient for the given address and credentials by
// calling the current GatewayInitializer.
func NewGateway(url, username, password string) GatewayClient {
	client := GatewayInitializer(url, username, password)
	return client
}
// GatewayEdgeReq carries the filter parameters that GatewayEdge encodes into
// the gateway /edge query string via core.Query. Each field's `query` tag
// names its parameter.
// NOTE(review): units and exact semantics of the time, RTT and packet-loss
// filters are not visible in this file — confirm against the gateway API.
type GatewayEdgeReq struct {
EdgeID *string `query:"edge_id"`
Province *string `query:"province"`
City *string `query:"city"`
Isp *string `query:"isp"`
Connected *bool `query:"connected"`
Assigned *bool `query:"assigned"`
GetRand *int `query:"getRand"`
IpUnchangedTimeStart *int `query:"ip_unchanged_time_start"`
IpUnchangedTimeEnd *int `query:"ip_unchanged_time_end"`
OnlineTimeStart *int `query:"online_time_start"`
OnlineTimeEnd *int `query:"online_time_end"`
Rtt *int `query:"rtt"`
MinRtt *int `query:"min_rtt"`
RttBaidu *int `query:"rtt_baidu"`
PacketLoss *int `query:"packet_loss"`
PacketLossBaidu *int `query:"packet_loss_baidu"`
IP *string `query:"ip"`
Limit *int `query:"limit"`
Offset *int `query:"offset"`
}
// GatewayEdgeResp is the JSON envelope GatewayEdge decodes from the gateway
// /edge response. GatewayEdge treats Code 0 as success and returns Data.
type GatewayEdgeResp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data map[string]GatewayEdgeInfo `json:"data"`
Total int `json:"total"`
}
// GatewayEdgeInfo is the value type of the map returned by GatewayEdge.
// NOTE(review): the map key and the units of the numeric fields are not
// visible in this file — confirm against the gateway API.
type GatewayEdgeInfo struct {
IP string `json:"ip"`
Connected bool `json:"connected"`
Assigned bool `json:"assigned"`
AssignedTo string `json:"assignedto"`
PacketLoss int `json:"packet_loss"`
PacketLossBaidu int `json:"packet_loss_baidu"`
Rtt int `json:"rtt"`
RttBaidu int `json:"rtt_baidu"`
OfflineTime int `json:"offline_time"`
OnlineTime int `json:"online_time"`
IpUnchangedTime int `json:"ip_unchanged_time"`
}
// GatewayEdge queries the gateway /edge endpoint with the filter in req.
//
// The Data map of the response is returned when the response code is 0. An
// error is returned when the request fails, the body cannot be decoded, or the
// response code is non-zero.
func (c *gateway) GatewayEdge(req *GatewayEdgeReq) (map[string]GatewayEdgeInfo, error) {
	res, err := c.get("/edge", core.Query(req))
	if err != nil {
		return nil, fmt.Errorf("查询可用节点失败:%w", err)
	}
	defer res.Body.Close()

	var payload GatewayEdgeResp
	if decodeErr := json.NewDecoder(res.Body).Decode(&payload); decodeErr != nil {
		return nil, fmt.Errorf("解析响应内容失败:%w", decodeErr)
	}

	if payload.Code == 0 {
		return payload.Data, nil
	}
	return nil, fmt.Errorf("接口业务响应异常: %d %s", payload.Code, payload.Msg)
}
// region gateway:/port/configs
// PortConfigsReq is one element of the JSON array GatewayPortConfigs posts to
// /port/configs. Port and Status are always serialized; the remaining fields
// are tagged omitempty and are left out when zero or nil.
// NOTE(review): units of Time and Rate and the allowed values of Type are not
// visible in this file — confirm against the gateway API.
type PortConfigsReq struct {
Port int `json:"port"`
Edge *[]string `json:"edge,omitempty"`
Type string `json:"type,omitempty"`
Time int `json:"time,omitempty"`
Status bool `json:"status"`
Rate int `json:"rate,omitempty"`
Whitelist *[]string `json:"whitelist,omitempty"`
Userpass *string `json:"userpass,omitempty"`
AutoEdgeConfig *AutoEdgeConfig `json:"auto_edge_config,omitempty"`
}
// AutoEdgeConfig is the optional "auto_edge_config" object of PortConfigsReq.
// Every field is tagged omitempty. Count is a pointer, so an explicit 0 is
// still serialized while nil is omitted.
type AutoEdgeConfig struct {
Province string `json:"province,omitempty"`
City string `json:"city,omitempty"`
Isp string `json:"isp,omitempty"`
Count *int `json:"count,omitempty"`
PacketLoss int `json:"packet_loss,omitempty"`
}
// GatewayPortConfigs posts params as a JSON array to the gateway
// /port/configs endpoint.
//
// An error is returned when params is empty, the request fails, the HTTP
// status is not 200 (the response body is included in the message), the body
// is not valid JSON, or the response "code" is missing, not a number, or
// non-zero.
func (c *gateway) GatewayPortConfigs(params []*PortConfigsReq) error {
	if len(params) == 0 {
		return errors.New("params is empty")
	}

	data, err := json.Marshal(params)
	if err != nil {
		return err
	}

	resp, err := c.requestGateway("POST", "/port/configs", string(data))
	if err != nil {
		return err
	}
	defer func(Body io.ReadCloser) {
		_ = Body.Close()
	}(resp.Body)

	// Read the body before the status check so it can be reported on failure.
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	if resp.StatusCode != http.StatusOK {
		return errors.New("failed to get port configs: " + string(body))
	}

	var result map[string]any
	if err = json.Unmarshal(body, &result); err != nil {
		return err
	}

	// Checked assertion: a response without a numeric "code" must produce an
	// error, not a panic.
	code, ok := result["code"].(float64)
	if !ok || code != 0 {
		return errors.New("failed to configure port")
	}
	return nil
}
// endregion
// region gateway:/port/active
// PortActiveReq holds the optional filters for GatewayPortActive.
// GatewayPortActive appends a non-empty Port to the request path and sends
// Active and Status as query parameters when they are non-nil.
type PortActiveReq struct {
Port string `json:"port"`
Active *bool `json:"active"`
Status *bool `json:"status"`
}
// PortActiveResp is the JSON envelope GatewayPortActive decodes from the
// gateway /port/active response. GatewayPortActive treats Code 0 as success
// and otherwise reports Msg as the error.
type PortActiveResp struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data map[string]PortData `json:"data"`
}
// PortData is the value type of the map returned by GatewayPortActive.
// NOTE(review): the map key is presumably the port number as a string, and the
// unit of Time is not visible in this file — confirm against the gateway API.
type PortData struct {
Edge []string `json:"edge"`
Type string `json:"type"`
Status bool `json:"status"`
Active bool `json:"active"`
Time int `json:"time"`
Whitelist []string `json:"whitelist"`
Userpass string `json:"userpass"`
}
// GatewayPortActive queries the gateway /port/active endpoint.
//
// Only the first element of param is used, and only when it is non-nil. A
// non-empty Port is appended to the path as "/<port>"; non-nil Active and
// Status are sent as query parameters. The Data map of the response is
// returned when the response code is 0. An error is returned when the request
// fails, the HTTP status is not 200, the body cannot be read or decoded, or
// the response code is non-zero (the error text is the response Msg).
func (c *gateway) GatewayPortActive(param ...*PortActiveReq) (map[string]PortData, error) {
	var filter PortActiveReq
	if len(param) > 0 && param[0] != nil {
		filter = *param[0]
	}

	endpoint := "/port/active"
	if filter.Port != "" {
		endpoint += "/" + filter.Port
	}

	query := make(url.Values)
	if filter.Active != nil {
		query.Set("active", strconv.FormatBool(*filter.Active))
	}
	if filter.Status != nil {
		query.Set("status", strconv.FormatBool(*filter.Status))
	}
	if len(query) != 0 {
		endpoint += "?" + query.Encode()
	}

	res, err := c.requestGateway("GET", endpoint, "")
	if err != nil {
		return nil, err
	}
	defer func() { _ = res.Body.Close() }()

	if res.StatusCode != http.StatusOK {
		return nil, errors.New("failed to get port active")
	}

	raw, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	var payload PortActiveResp
	if err := json.Unmarshal(raw, &payload); err != nil {
		return nil, err
	}

	if payload.Code == 0 {
		return payload.Data, nil
	}
	return nil, errors.New(payload.Msg)
}
// endregion
// get sends a GET request to the gateway and returns the response only when
// its status is 200.
//
// url is the endpoint path and params is encoded as the query string. The
// request URL embeds c.username and c.password as userinfo and targets port
// 9990 on c.url. On success the caller owns the response and must close its
// body. On a non-200 status the body is read into the error message and closed
// here.
func (c *gateway) get(url string, params url.Values) (*http.Response, error) {
	url = fmt.Sprintf("http://%s:%s@%s:9990%s?%s", c.username, c.password, c.url, url, params.Encode())
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, fmt.Errorf("创建请求失败:%w", err)
	}

	res, err := core.Fetch(req)
	if err != nil {
		return nil, fmt.Errorf("获取数据失败:%w", err)
	}

	if res.StatusCode != http.StatusOK {
		// The response is not handed to the caller on this path, so the body
		// must be closed here or the connection is leaked.
		defer res.Body.Close()
		// Best effort: a read failure just yields a shorter error message.
		detail, _ := io.ReadAll(res.Body)
		return nil, fmt.Errorf("接口响应异常: %d %s", res.StatusCode, string(detail))
	}
	return res, nil
}
// requestGateway sends a JSON request to the gateway and returns the raw
// response from core.Fetch.
//
// url is the endpoint path (including any query string) and data is the raw
// request body. The request URL embeds c.username and c.password as userinfo
// and targets port 9990 on c.url. The HTTP status is not checked here.
func (c *gateway) requestGateway(method string, url string, data string) (*http.Response, error) {
	target := "http://" + c.username + ":" + c.password + "@" + c.url + ":9990" + url

	payload := strings.NewReader(data)
	req, err := http.NewRequest(method, target, payload)
	if err != nil {
		return nil, err
	}

	req.Header.Set("Content-Type", "application/json")
	return core.Fetch(req)
}