忽略
This commit is contained in:
@@ -1,3 +1,3 @@
|
||||
set GOARCH=amd64
|
||||
set GOOS=linux
|
||||
set GOARCH=amd64
|
||||
set GOOS=linux
|
||||
go build
|
||||
@@ -1,8 +1,8 @@
|
||||
module log_storage
|
||||
|
||||
go 1.13
|
||||
|
||||
require (
|
||||
github.com/go-redis/redis v6.15.2+incompatible
|
||||
github.com/lib/pq v1.2.0
|
||||
)
|
||||
module log_storage
|
||||
|
||||
go 1.13
|
||||
|
||||
require (
|
||||
github.com/go-redis/redis v6.15.2+incompatible
|
||||
github.com/lib/pq v1.2.0
|
||||
)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
|
||||
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
|
||||
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
|
||||
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
|
||||
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
|
||||
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
|
||||
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
|
||||
|
||||
@@ -1,131 +1,131 @@
|
||||
package logparse
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
type filebeat struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type Log struct {
|
||||
Time string
|
||||
Title string
|
||||
Message string
|
||||
Level string
|
||||
App string
|
||||
}
|
||||
|
||||
var lineSplitChar = "\n"
|
||||
var leftMiddleBracketsChar = "["
|
||||
var rightMiddleBracketsChar = "]"
|
||||
var assemblyChars = "Assembly"
|
||||
var titleChars = "Title"
|
||||
var messageChars = "Message :"
|
||||
var sysLogStartChars1 = "at lambda_method(Closure , Object )"
|
||||
var sysLogStartChars2 = "at Microsoft.Extensions."
|
||||
var sysLogStartChars3 = "at Microsoft.AspNetCore"
|
||||
|
||||
func LogParseStart(parse <-chan *string, db chan<- *Log) {
|
||||
|
||||
for fileBeatStr := range parse {
|
||||
|
||||
jsonobj := filebeat{}
|
||||
|
||||
err := json.Unmarshal([]byte(*fileBeatStr), &jsonobj)
|
||||
|
||||
if err != nil {
|
||||
println(err)
|
||||
}
|
||||
|
||||
log, err := parseLog(&jsonobj.Message)
|
||||
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
} else {
|
||||
db <- log
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func parseLog(logChars *string) (log *Log, err error) {
|
||||
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
err = fmt.Errorf("解析日志失败:%v", p)
|
||||
}
|
||||
}()
|
||||
|
||||
log = &Log{}
|
||||
|
||||
lines := strings.Split(*logChars, lineSplitChar)
|
||||
|
||||
for index, line := range lines {
|
||||
|
||||
i := index + 1
|
||||
|
||||
if i == 1 && strings.HasPrefix(line, leftMiddleBracketsChar) {
|
||||
|
||||
timeEndIndex := strings.Index(line, rightMiddleBracketsChar)
|
||||
|
||||
log.Level = slice(line, timeEndIndex+2, -1)
|
||||
log.Time = slice(line, strings.Index(line, leftMiddleBracketsChar)+1, timeEndIndex-1)
|
||||
}
|
||||
|
||||
if i == 2 && strings.HasPrefix(line, assemblyChars) {
|
||||
|
||||
log.App = strings.TrimSpace(slice(line, 9, -1))
|
||||
}
|
||||
|
||||
if i == 3 && strings.HasPrefix(line, titleChars) {
|
||||
log.Title = slice(line, 7, -1)
|
||||
}
|
||||
|
||||
if i > 3 {
|
||||
|
||||
if utf8Len(line) > 0 {
|
||||
|
||||
if i == 4 && strings.HasPrefix(line, messageChars) {
|
||||
line = slice(line, 9, -1)
|
||||
}
|
||||
|
||||
if log.Level == "ERROR" &&
|
||||
(strings.HasPrefix(strings.TrimSpace(line), sysLogStartChars1) ||
|
||||
strings.HasPrefix(strings.TrimSpace(line), sysLogStartChars2) ||
|
||||
strings.HasPrefix(strings.TrimSpace(line), sysLogStartChars3)) {
|
||||
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
log.Message += line + "\n"
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func slice(str string, start int, end int) string {
|
||||
|
||||
str2 := []rune(str)
|
||||
|
||||
len := len(str2)
|
||||
|
||||
if start < 0 || start > len {
|
||||
start = 0
|
||||
}
|
||||
if end < 0 || end > len {
|
||||
end = len
|
||||
}
|
||||
|
||||
return string(str2[start:end])
|
||||
}
|
||||
|
||||
func utf8Len(str string) int {
|
||||
return utf8.RuneCountInString(str)
|
||||
}
|
||||
package logparse
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"unicode/utf8"
|
||||
)
|
||||
|
||||
type filebeat struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type Log struct {
|
||||
Time string
|
||||
Title string
|
||||
Message string
|
||||
Level string
|
||||
App string
|
||||
}
|
||||
|
||||
var lineSplitChar = "\n"
|
||||
var leftMiddleBracketsChar = "["
|
||||
var rightMiddleBracketsChar = "]"
|
||||
var assemblyChars = "Assembly"
|
||||
var titleChars = "Title"
|
||||
var messageChars = "Message :"
|
||||
var sysLogStartChars1 = "at lambda_method(Closure , Object )"
|
||||
var sysLogStartChars2 = "at Microsoft.Extensions."
|
||||
var sysLogStartChars3 = "at Microsoft.AspNetCore"
|
||||
|
||||
func LogParseStart(parse <-chan *string, db chan<- *Log) {
|
||||
|
||||
for fileBeatStr := range parse {
|
||||
|
||||
jsonobj := filebeat{}
|
||||
|
||||
err := json.Unmarshal([]byte(*fileBeatStr), &jsonobj)
|
||||
|
||||
if err != nil {
|
||||
println(err)
|
||||
}
|
||||
|
||||
log, err := parseLog(&jsonobj.Message)
|
||||
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
} else {
|
||||
db <- log
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
func parseLog(logChars *string) (log *Log, err error) {
|
||||
|
||||
defer func() {
|
||||
if p := recover(); p != nil {
|
||||
err = fmt.Errorf("解析日志失败:%v", p)
|
||||
}
|
||||
}()
|
||||
|
||||
log = &Log{}
|
||||
|
||||
lines := strings.Split(*logChars, lineSplitChar)
|
||||
|
||||
for index, line := range lines {
|
||||
|
||||
i := index + 1
|
||||
|
||||
if i == 1 && strings.HasPrefix(line, leftMiddleBracketsChar) {
|
||||
|
||||
timeEndIndex := strings.Index(line, rightMiddleBracketsChar)
|
||||
|
||||
log.Level = slice(line, timeEndIndex+2, -1)
|
||||
log.Time = slice(line, strings.Index(line, leftMiddleBracketsChar)+1, timeEndIndex-1)
|
||||
}
|
||||
|
||||
if i == 2 && strings.HasPrefix(line, assemblyChars) {
|
||||
|
||||
log.App = strings.TrimSpace(slice(line, 9, -1))
|
||||
}
|
||||
|
||||
if i == 3 && strings.HasPrefix(line, titleChars) {
|
||||
log.Title = slice(line, 7, -1)
|
||||
}
|
||||
|
||||
if i > 3 {
|
||||
|
||||
if utf8Len(line) > 0 {
|
||||
|
||||
if i == 4 && strings.HasPrefix(line, messageChars) {
|
||||
line = slice(line, 9, -1)
|
||||
}
|
||||
|
||||
if log.Level == "ERROR" &&
|
||||
(strings.HasPrefix(strings.TrimSpace(line), sysLogStartChars1) ||
|
||||
strings.HasPrefix(strings.TrimSpace(line), sysLogStartChars2) ||
|
||||
strings.HasPrefix(strings.TrimSpace(line), sysLogStartChars3)) {
|
||||
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
log.Message += line + "\n"
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func slice(str string, start int, end int) string {
|
||||
|
||||
str2 := []rune(str)
|
||||
|
||||
len := len(str2)
|
||||
|
||||
if start < 0 || start > len {
|
||||
start = 0
|
||||
}
|
||||
if end < 0 || end > len {
|
||||
end = len
|
||||
}
|
||||
|
||||
return string(str2[start:end])
|
||||
}
|
||||
|
||||
func utf8Len(str string) int {
|
||||
return utf8.RuneCountInString(str)
|
||||
}
|
||||
|
||||
@@ -1,29 +1,29 @@
|
||||
package logstorage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log_storage/logparse"
|
||||
)
|
||||
|
||||
func PgsqlStorageStart(logchan <-chan *logparse.Log, dbconn string) {
|
||||
|
||||
db, err := sql.Open("postgres", dbconn)
|
||||
|
||||
if err != nil {
|
||||
panic("pgsql连接失败")
|
||||
}
|
||||
|
||||
for log := range logchan {
|
||||
|
||||
sql := "INSERT INTO psiplog (time,title,message,level,app) VALUES($1,$2,$3,$4,$5);"
|
||||
|
||||
_, err := db.Exec(sql, log.Time, log.Title, log.Message, log.Level, log.App)
|
||||
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
} else {
|
||||
fmt.Println("入库一条数据")
|
||||
}
|
||||
}
|
||||
}
|
||||
package logstorage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"log_storage/logparse"
|
||||
)
|
||||
|
||||
func PgsqlStorageStart(logchan <-chan *logparse.Log, dbconn string) {
|
||||
|
||||
db, err := sql.Open("postgres", dbconn)
|
||||
|
||||
if err != nil {
|
||||
panic("pgsql连接失败")
|
||||
}
|
||||
|
||||
for log := range logchan {
|
||||
|
||||
sql := "INSERT INTO psiplog (time,title,message,level,app) VALUES($1,$2,$3,$4,$5);"
|
||||
|
||||
_, err := db.Exec(sql, log.Time, log.Title, log.Message, log.Level, log.App)
|
||||
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
} else {
|
||||
fmt.Println("入库一条数据")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,40 +1,40 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"log_storage/logparse"
|
||||
"log_storage/logstorage"
|
||||
logreids "log_storage/redis"
|
||||
"strconv"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
redisAddr := flag.String("RedisAddr", "", "reids server ip地址")
|
||||
redisPort := flag.Int("RedisPort", 6379, "redis端口号")
|
||||
redisPassword := flag.String("RedisPwd", "", "redis密码")
|
||||
pgsqlConn := flag.String("PgsqlConn", "", "pgsql连接字符串(host=192.168.1.245 port=5432 user=postgres password=123456 dbname=log)")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
redisClient := redis.NewClient(&redis.Options{
|
||||
Addr: *redisAddr + ":" + strconv.Itoa(*redisPort),
|
||||
Password: *redisPassword,
|
||||
DB: 14,
|
||||
})
|
||||
|
||||
parseChan := make(chan *string)
|
||||
dbChan := make(chan *logparse.Log)
|
||||
|
||||
go logstorage.PgsqlStorageStart(dbChan, *pgsqlConn)
|
||||
go logparse.LogParseStart(parseChan, dbChan)
|
||||
go logreids.RedisPullStart(parseChan, redisClient)
|
||||
|
||||
waiting := make(chan interface{})
|
||||
|
||||
<-waiting
|
||||
|
||||
}
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"log_storage/logparse"
|
||||
"log_storage/logstorage"
|
||||
logreids "log_storage/redis"
|
||||
"strconv"
|
||||
|
||||
"github.com/go-redis/redis"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
redisAddr := flag.String("RedisAddr", "", "reids server ip地址")
|
||||
redisPort := flag.Int("RedisPort", 6379, "redis端口号")
|
||||
redisPassword := flag.String("RedisPwd", "", "redis密码")
|
||||
pgsqlConn := flag.String("PgsqlConn", "", "pgsql连接字符串(host=192.168.1.245 port=5432 user=postgres password=123456 dbname=log)")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
redisClient := redis.NewClient(&redis.Options{
|
||||
Addr: *redisAddr + ":" + strconv.Itoa(*redisPort),
|
||||
Password: *redisPassword,
|
||||
DB: 14,
|
||||
})
|
||||
|
||||
parseChan := make(chan *string)
|
||||
dbChan := make(chan *logparse.Log)
|
||||
|
||||
go logstorage.PgsqlStorageStart(dbChan, *pgsqlConn)
|
||||
go logparse.LogParseStart(parseChan, dbChan)
|
||||
go logreids.RedisPullStart(parseChan, redisClient)
|
||||
|
||||
waiting := make(chan interface{})
|
||||
|
||||
<-waiting
|
||||
|
||||
}
|
||||
|
||||
@@ -1,32 +1,32 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/go-redis/redis"
|
||||
"time"
|
||||
)
|
||||
|
||||
func RedisPullStart(parse chan<- *string, client *redis.Client) {
|
||||
|
||||
pong, err := client.Ping().Result()
|
||||
|
||||
if err != nil {
|
||||
panic("redis连接失败")
|
||||
}
|
||||
|
||||
fmt.Println(pong, err)
|
||||
|
||||
for {
|
||||
|
||||
vals, err := client.BLPop(0, "filebeat").Result()
|
||||
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
time.Sleep(time.Second * 10)
|
||||
}
|
||||
|
||||
if len(vals) == 2 {
|
||||
parse <- &vals[1]
|
||||
}
|
||||
}
|
||||
}
|
||||
package redis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/go-redis/redis"
|
||||
"time"
|
||||
)
|
||||
|
||||
func RedisPullStart(parse chan<- *string, client *redis.Client) {
|
||||
|
||||
pong, err := client.Ping().Result()
|
||||
|
||||
if err != nil {
|
||||
panic("redis连接失败")
|
||||
}
|
||||
|
||||
fmt.Println(pong, err)
|
||||
|
||||
for {
|
||||
|
||||
vals, err := client.BLPop(0, "filebeat").Result()
|
||||
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
time.Sleep(time.Second * 10)
|
||||
}
|
||||
|
||||
if len(vals) == 2 {
|
||||
parse <- &vals[1]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
#!/bin/bash
|
||||
exec $(dirname "$0")/log_storage \
|
||||
-RedisAddr=192.168.1.245 \
|
||||
-RedisPwd=123456 \
|
||||
#!/bin/bash
|
||||
exec $(dirname "$0")/log_storage \
|
||||
-RedisAddr=192.168.1.245 \
|
||||
-RedisPwd=123456 \
|
||||
-PgsqlConn="host=192.168.1.245 port=5432 user=postgres password=123456 dbname=log sslmode=disable"
|
||||
Reference in New Issue
Block a user