日志优化,GC优化

This commit is contained in:
hahwu 2025-12-04 15:27:54 +08:00
parent 4421b9f613
commit 1191168a19
2 changed files with 105 additions and 41 deletions

View File

@ -5,8 +5,7 @@ import (
kafkaMiddleware "server/middleware/kafka" kafkaMiddleware "server/middleware/kafka"
"server/pkg/github.com/name5566/leaf/log" "server/pkg/github.com/name5566/leaf/log"
"sync" "sync"
"time"
"github.com/robfig/cron/v3"
) )
const ( const (
@ -17,12 +16,16 @@ const (
const ( const (
PLAYROOM_LOST = "playroom_lost" PLAYROOM_LOST = "playroom_lost"
LOG_LENGTH = 10000 LOG_LENGTH = 10000
WORKER_COUNT = 8
) )
type LogMgr struct { type LogMgr struct {
L []*Log // 将切片改为带缓冲通道
McronSave *cron.Cron L chan *Log
Lock sync.Mutex Lock sync.Mutex
wg sync.WaitGroup
stopOnce sync.Once
closing bool
} }
type Log struct { type Log struct {
@ -35,47 +38,98 @@ type Log struct {
} }
func (L *LogMgr) InitManager() { func (L *LogMgr) InitManager() {
L.McronSave = cron.New() // 初始化通道与启动 worker 循环写入 kafka
L.L = make([]*Log, 0, LOG_LENGTH) L.L = make(chan *Log, LOG_LENGTH)
L.McronSave.AddFunc("@every 10s", func() {
L.Lock.Lock() // 启动 worker
defer L.Lock.Unlock() workerCount := WORKER_COUNT
if kafkaMiddleware.KafkaMod == nil { for i := 0; i < workerCount; i++ {
if len(L.L) > LOG_LENGTH { L.wg.Add(1)
L.L = L.L[:0] go func() {
} else if len(L.L) > LOG_LENGTH { defer L.wg.Done()
L.L = L.L[len(L.L)-LOG_LENGTH:] for v := range L.L {
// 如果 kafka 未就绪,尝试重入队列并等待一小段时间
if kafkaMiddleware.KafkaMod == nil {
// 尝试非阻塞重入,否则丢弃最旧再试一次
select {
case L.L <- v:
default:
select {
case <-L.L:
default:
}
select {
case L.L <- v:
default:
}
}
time.Sleep(100 * time.Millisecond)
continue
}
value, _ := json.Marshal(v)
if err := kafkaMiddleware.SendMsg([]byte(v.EventName), value); err != nil {
log.Debug("kafka log send err:%s", err.Error())
// 发送失败,尝试将消息放回通道(非阻塞策略)
select {
case L.L <- v:
default:
// 通道满,丢弃最旧一条后再尝试一次入队
select {
case <-L.L:
default:
}
select {
case L.L <- v:
default:
// 放不下则丢弃
}
}
// 轻微退避,避免快速循环
time.Sleep(50 * time.Millisecond)
}
} }
return }()
} }
Quene := L.L
NewQuene := make([]*Log, 0, LOG_LENGTH)
for _, v := range Quene {
value, _ := json.Marshal(v)
err := kafkaMiddleware.SendMsg([]byte(v.EventName), value)
if err != nil {
log.Debug("kafka log in err:%s", err.Error())
NewQuene = append(NewQuene, v)
}
}
L.L = NewQuene
})
L.McronSave.Start()
} }
func (L *LogMgr) AddLog(logs *Log) { func (L *LogMgr) AddLog(logs *Log) {
// 如果已经开始关闭,直接丢弃
L.Lock.Lock() L.Lock.Lock()
defer L.Lock.Unlock() if L.closing {
L.L = append(L.L, logs) L.Lock.Unlock()
return
}
// 非阻塞入队:若通道满则先丢弃最旧一条再入队,避免阻塞调用者
select {
case L.L <- logs:
L.Lock.Unlock()
return
default:
// 丢弃最旧一条以腾出空间(若有)
select {
case <-L.L:
default:
}
// 再尝试入队一次
select {
case L.L <- logs:
default:
// 放不下就直接丢弃
}
L.Lock.Unlock()
}
} }
func (L *LogMgr) Close() { func (L *LogMgr) Close() {
L.McronSave.Stop() L.stopOnce.Do(func() {
L.Lock.Lock() L.Lock.Lock()
defer L.Lock.Unlock() // 标记为正在关闭,阻止后续入队
for _, v := range L.L { L.closing = true
value, _ := json.Marshal(v) // 关闭通道,通知所有 worker 退出workers 会消费完所有已入队的消息)
kafkaMiddleware.SendMsg([]byte(v.EventName), value) close(L.L)
} L.Lock.Unlock()
L.L = L.L[:0] // 等待所有 worker 处理完
L.wg.Wait()
})
} }

View File

@ -1,11 +1,13 @@
package main package main
import ( import (
"net/http"
_ "net/http/pprof"
"runtime/debug"
"server/conf" "server/conf"
"server/game" "server/game"
"server/gate" "server/gate"
"server/pkg/github.com/name5566/leaf" "server/pkg/github.com/name5566/leaf"
lconf "server/pkg/github.com/name5566/leaf/conf" lconf "server/pkg/github.com/name5566/leaf/conf"
) )
@ -19,6 +21,14 @@ func main() {
lconf.ListenAddr = conf.Server.ListenAddr lconf.ListenAddr = conf.Server.ListenAddr
lconf.CenterAddr = conf.Server.CenterAddr lconf.CenterAddr = conf.Server.CenterAddr
lconf.PendingWriteNum = conf.PendingWriteNum lconf.PendingWriteNum = conf.PendingWriteNum
// 当内存>2G时开始GC
debug.SetGCPercent(50)
debug.SetMemoryLimit(2 << 30)
// 启动 pprof仅绑定本地
go func() {
// 如果需要绑定所有接口改为 ":6060"
_ = http.ListenAndServe("127.0.0.1:6060", nil)
}()
leaf.Run( leaf.Run(
game.Module, game.Module,