diff --git a/src/server/game/LogMgr.go b/src/server/game/LogMgr.go index 6c308110..beab8867 100644 --- a/src/server/game/LogMgr.go +++ b/src/server/game/LogMgr.go @@ -5,8 +5,7 @@ import ( kafkaMiddleware "server/middleware/kafka" "server/pkg/github.com/name5566/leaf/log" "sync" - - "github.com/robfig/cron/v3" + "time" ) const ( @@ -17,12 +16,16 @@ const ( const ( PLAYROOM_LOST = "playroom_lost" LOG_LENGTH = 10000 + WORKER_COUNT = 8 ) type LogMgr struct { - L []*Log - McronSave *cron.Cron - Lock sync.Mutex + // 将切片改为带缓冲通道 + L chan *Log + Lock sync.Mutex + wg sync.WaitGroup + stopOnce sync.Once + closing bool } type Log struct { @@ -35,47 +38,98 @@ type Log struct { } func (L *LogMgr) InitManager() { - L.McronSave = cron.New() - L.L = make([]*Log, 0, LOG_LENGTH) - L.McronSave.AddFunc("@every 10s", func() { - L.Lock.Lock() - defer L.Lock.Unlock() - if kafkaMiddleware.KafkaMod == nil { - if len(L.L) > LOG_LENGTH { - L.L = L.L[:0] - } else if len(L.L) > LOG_LENGTH { - L.L = L.L[len(L.L)-LOG_LENGTH:] + // 初始化通道与启动 worker 循环写入 kafka + L.L = make(chan *Log, LOG_LENGTH) + + // 启动 worker + workerCount := WORKER_COUNT + for i := 0; i < workerCount; i++ { + L.wg.Add(1) + go func() { + defer L.wg.Done() + for v := range L.L { + // 如果 kafka 未就绪,尝试重入队列并等待一小段时间 + if kafkaMiddleware.KafkaMod == nil { + // 尝试非阻塞重入,否则丢弃最旧再试一次 + select { + case L.L <- v: + default: + select { + case <-L.L: + default: + } + select { + case L.L <- v: + default: + } + } + time.Sleep(100 * time.Millisecond) + continue + } + + value, _ := json.Marshal(v) + if err := kafkaMiddleware.SendMsg([]byte(v.EventName), value); err != nil { + log.Debug("kafka log send err:%s", err.Error()) + // 发送失败,尝试将消息放回通道(非阻塞策略) + select { + case L.L <- v: + default: + // 通道满,丢弃最旧一条后再尝试一次入队 + select { + case <-L.L: + default: + } + select { + case L.L <- v: + default: + // 放不下则丢弃 + } + } + // 轻微退避,避免快速循环 + time.Sleep(50 * time.Millisecond) + } } - return - } - Quene := L.L - NewQuene := make([]*Log, 0, LOG_LENGTH) - for _, v := range Quene { - value, _ := json.Marshal(v) - err := kafkaMiddleware.SendMsg([]byte(v.EventName), value) - if err != nil { - log.Debug("kafka log in err:%s", err.Error()) - NewQuene = append(NewQuene, v) - } - } - L.L = NewQuene - }) - L.McronSave.Start() + }() + } } func (L *LogMgr) AddLog(logs *Log) { + // 如果已经开始关闭,直接丢弃 L.Lock.Lock() - defer L.Lock.Unlock() - L.L = append(L.L, logs) + if L.closing { + L.Lock.Unlock() + return + } + // 非阻塞入队:若通道满则先丢弃最旧一条再入队,避免阻塞调用者 + select { + case L.L <- logs: + L.Lock.Unlock() + return + default: + // 丢弃最旧一条以腾出空间(若有) + select { + case <-L.L: + default: + } + // 再尝试入队一次 + select { + case L.L <- logs: + default: + // 放不下就直接丢弃 + } + L.Lock.Unlock() + } } func (L *LogMgr) Close() { - L.McronSave.Stop() - L.Lock.Lock() - defer L.Lock.Unlock() - for _, v := range L.L { - value, _ := json.Marshal(v) - kafkaMiddleware.SendMsg([]byte(v.EventName), value) - } - L.L = L.L[:0] + L.stopOnce.Do(func() { + L.Lock.Lock() + // 标记为正在关闭,阻止后续入队 + L.closing = true + // 关闭通道,通知所有 worker 退出(workers 会消费完所有已入队的消息) + close(L.L) + L.Lock.Unlock() + // 等待所有 worker 处理完 + L.wg.Wait() + }) } diff --git a/src/server/main.go b/src/server/main.go index a1b8ea1a..0617485d 100644 --- a/src/server/main.go +++ b/src/server/main.go @@ -1,11 +1,13 @@ package main import ( + "net/http" + _ "net/http/pprof" + "runtime/debug" "server/conf" "server/game" "server/gate" "server/pkg/github.com/name5566/leaf" - lconf "server/pkg/github.com/name5566/leaf/conf" ) @@ -19,6 +21,14 @@ func main() { lconf.ListenAddr = conf.Server.ListenAddr lconf.CenterAddr = conf.Server.CenterAddr lconf.PendingWriteNum = conf.PendingWriteNum + // 当内存>2G时开始GC + debug.SetGCPercent(50) + debug.SetMemoryLimit(2 << 30) + // 启动 pprof(仅绑定本地) + go func() { + // 如果需要绑定所有接口改为 ":6060" + _ = http.ListenAndServe("127.0.0.1:6060", nil) + }() leaf.Run( game.Module,