package game import ( "encoding/json" "runtime" kafkaMiddleware "server/middleware/kafka" "server/pkg/github.com/name5566/leaf/log" "sync" "sync/atomic" "time" ) const ( Login_log = 1 LoginOut_log = 2 Event_log = 3 ) const ( PLAYROOM_LOST = "playroom_lost" // 增大缓冲:针对 ~1000 条/s,保留足够秒数的缓冲(可按需调整) LOG_LENGTH = 100000 WORKER_COUNT = 500 ) type LogMgr struct { // 将切片改为带缓冲通道 L chan *Log Lock sync.Mutex wg sync.WaitGroup stopOnce sync.Once closing bool } type Log struct { Uid int64 AppId int ServerId int EventName string Param map[string]interface{} TimeStamp int64 } func (L *LogMgr) InitManager() { // 初始化通道 L.L = make(chan *Log, LOG_LENGTH) // 动态协程池参数 maxWorkers := WORKER_COUNT if cpuWorkers := runtime.NumCPU() * 2; cpuWorkers > maxWorkers { maxWorkers = cpuWorkers } minWorkers := 100 idleTimeout := 500 * time.Millisecond var activeWorkers int32 // worker 启动函数 spawnWorker := func() { L.wg.Add(1) atomic.AddInt32(&activeWorkers, 1) go func() { defer L.wg.Done() defer atomic.AddInt32(&activeWorkers, -1) for { // 首先阻塞等待一个任务(若 channel 关闭则退出) v, ok := <-L.L if !ok { return } // 处理消息(与原来逻辑一致) value, _ := json.Marshal(v) if kafkaMiddleware.KafkaMod == nil { // 尝试非阻塞重入,否则丢弃最旧再试一次 select { case L.L <- v: default: select { case <-L.L: default: } select { case L.L <- v: default: } } time.Sleep(10 * time.Millisecond) } else { if err := kafkaMiddleware.SendMsg([]byte(v.EventName), value); err != nil { log.Debug("kafka log send err:%s", err.Error()) select { case L.L <- v: default: select { case <-L.L: default: } select { case L.L <- v: default: } } time.Sleep(10 * time.Millisecond) } } // 快速循环处理后续可用任务,若超时则退出该 worker(回收) timer := time.NewTimer(idleTimeout) for { select { case v, ok := <-L.L: if !ok { if !timer.Stop() { <-timer.C } return } // 处理消息 value, _ := json.Marshal(v) if kafkaMiddleware.KafkaMod == nil { select { case L.L <- v: default: select { case <-L.L: default: } select { case L.L <- v: default: } } time.Sleep(10 * time.Millisecond) } else { if err := kafkaMiddleware.SendMsg([]byte(v.EventName), value); err != nil { log.Debug("kafka log send err:%s", err.Error()) select { case L.L <- v: default: select { case <-L.L: default: } select { case L.L <- v: default: } } time.Sleep(10 * time.Millisecond) } } // 重置定时器以继续快速处理 if !timer.Stop() { <-timer.C } timer.Reset(idleTimeout) case <-timer.C: timer.Stop() // 空闲超时,退出该 worker return } } } }() } // 启动初始最小 worker 数量 for i := 0; i < minWorkers; i++ { spawnWorker() } // 监督器:动态根据队列长度扩展 worker,但不超过 maxWorkers go func() { ticker := time.NewTicker(200 * time.Millisecond) defer ticker.Stop() for range ticker.C { L.Lock.Lock() closing := L.closing L.Lock.Unlock() if closing { return } backlog := len(L.L) active := int(atomic.LoadInt32(&activeWorkers)) if backlog > active && active < maxWorkers { toSpawn := backlog - active remain := maxWorkers - active if toSpawn > remain { toSpawn = remain } for i := 0; i < toSpawn; i++ { spawnWorker() } } } }() } func (L *LogMgr) AddLog(logs *Log) { // 复制结构体和 Param map,避免并发修改导致 json.Marshal 时 panic copyLog := *logs if logs.Param != nil { newParam := make(map[string]interface{}, len(logs.Param)) for k, v := range logs.Param { newParam[k] = v } copyLog.Param = newParam } // 如果已经开始关闭,直接丢弃 L.Lock.Lock() if L.closing { L.Lock.Unlock() return } // 非阻塞入队:若通道满则先丢弃最旧一条再入队,避免阻塞调用者 select { case L.L <- logs: L.Lock.Unlock() return default: // 丢弃最旧一条以腾出空间(若有) select { case <-L.L: default: } // 再尝试入队一次 select { case L.L <- logs: default: // 放不下就直接丢弃 } L.Lock.Unlock() } } func (L *LogMgr) Close() { L.stopOnce.Do(func() { L.Lock.Lock() // 标记为正在关闭,阻止后续入队 L.closing = true // 关闭通道,通知所有 worker 退出(workers 会消费完所有已入队的消息) close(L.L) L.Lock.Unlock() // 等待所有 worker 处理完 L.wg.Wait() }) }