diff --git a/src/server/benchmark_test.go b/src/server/benchmark_test.go index f1c7dc52..dbb7cdb9 100644 --- a/src/server/benchmark_test.go +++ b/src/server/benchmark_test.go @@ -4,6 +4,7 @@ import ( "fmt" "runtime" "server/game" + "server/game/mod/msg" "testing" ) @@ -19,6 +20,25 @@ import ( // } // } +/* +* +cluster 消息处理基准测试 + + 36716 34961 ns/op 1690 B/op 38 allocs/op +*/ +func BenchmarkClusterMsg(b *testing.B) { + game.ClusterMgrInit() + runtime.GOMAXPROCS(8) + game.G_getGameLogic() + for i := 0; i < b.N; i++ { + m := &msg.Msg{ + HandleType: msg.HANDLE_MOD_PLAYER_LOGIN, + Extra: 0, + } + game.MessageHandle(m) + } +} + func BenchmarkLog(b *testing.B) { runtime.GOMAXPROCS(2) t := game.G_getGameLogic() diff --git a/src/server/game/message_mgr.go b/src/server/game/message_mgr.go index b126dab7..98dfedc6 100644 --- a/src/server/game/message_mgr.go +++ b/src/server/game/message_mgr.go @@ -76,7 +76,7 @@ func (m *MessageMgr) MessageMgrInit() { m.handler = make(map[int]MessageHandlerFunc) m.middlewares = []MessageMiddleware{} // 初始化 Worker Pool (10个worker, 1000个队列大小) - m.workerPool = NewWorkerPool(10, 1000) + m.workerPool = NewWorkerPool(50, 10000) // 注册默认中间件 m.Use(RecoveryMiddleware()) m.Use(LoggingMiddleware()) @@ -187,9 +187,9 @@ func ClusterSyncHandler(data *msg.Msg) (interface{}, error) { // 发送暂存区消息 messageMgrData := getMessageData() messageMgrData.mu.Lock() - defer messageMgrData.mu.Unlock() TempMessageList := messageMgrData.MessageList messageMgrData.MessageList = make(map[int64]*MessageList) + messageMgrData.mu.Unlock() for _, Message := range TempMessageList { for _, msgItem := range Message.Messages { SendMsgToCenterAsync(msgItem) @@ -202,13 +202,17 @@ func PlayerLoginHandler(data *msg.Msg) (interface{}, error) { // 关闭 Worker Pool messageMgrData := getMessageData() messageMgrData.mu.Lock() - defer messageMgrData.mu.Unlock() messageMgrData.PlayerList[int64(data.From)] = data.Extra.(int) if _, ok := messageMgrData.MessageList[int64(data.From)]; !ok { messageMgrData.MessageList[int64(data.From)] = &MessageList{ Messages: []*msg.Msg{}, } } + messageMgrData.mu.Unlock() + + // 对玩家消息列表加锁 + messageMgrData.MessageList[int64(data.From)].mu.Lock() + defer messageMgrData.MessageList[int64(data.From)].mu.Unlock() log.Debug("[Middleware] Player login success player id: %v, node: %v", data.From, data.Extra.(int)) node := data.Extra.(int) messageMgrData.PlayerList[int64(data.From)] = node @@ -225,8 +229,6 @@ func PlayerLoginHandler(data *msg.Msg) (interface{}, error) { func PlayerLogoutHandler(data *msg.Msg) (interface{}, error) { messageMgrData := getMessageData() - messageMgrData.mu.Lock() - defer messageMgrData.mu.Unlock() delete(messageMgrData.PlayerList, int64(data.From)) log.Debug("[Middleware] Player logout success player id: %v", data.From) return nil, nil @@ -279,8 +281,6 @@ func PlayerMsgHandler(data *msg.Msg) (interface{}, error) { if p == nil || p.stop { return nil, nil } - p.lock.Lock() - defer p.lock.Unlock() p.Send(data.Clone()) // 处理完后发送消费消息 if data.HandleType == msg.HANDLE_MOD_PLAYER_MSG { @@ -385,7 +385,7 @@ func NewWorkerPool(workers, maxQueue int) *WorkerPool { pool := &WorkerPool{ workers: 0, minWorkers: workers, - maxWorkers: workers * 4, + maxWorkers: workers * 40, taskQueue: make(chan *MessageTask, maxQueue), ctx: ctx, cancel: cancel, @@ -450,31 +450,40 @@ func (p *WorkerPool) worker(ctx context.Context, id int) { // 提交任务 func (p *WorkerPool) Submit(task *MessageTask) error { - select { - case <-p.ctx.Done(): - return fmt.Errorf("worker pool is closed") - case p.taskQueue <- task: - // 如果队列接近满时,尝试扩容少量 worker - queued := len(p.taskQueue) - capQ := cap(p.taskQueue) - if capQ > 0 && queued > capQ*3/4 { - go func() { - p.mu.Lock() - deficit := p.maxWorkers - p.workers - p.mu.Unlock() - if deficit > 0 { - // 最多扩容 2 个以避免剧烈波动 - add := 2 - if deficit < add { - add = deficit + // 当队列已满时,等待并重试入队,直到成功或 pool 关闭 + for { + select { + case <-p.ctx.Done(): + return fmt.Errorf("worker pool is closed") + case p.taskQueue <- task: + // 如果队列接近满时,尝试扩容少量 worker + queued := len(p.taskQueue) + capQ := cap(p.taskQueue) + if capQ > 0 && queued > capQ*3/4 { + go func() { + p.mu.Lock() + deficit := p.maxWorkers - p.workers + p.mu.Unlock() + if deficit > 0 { + // 最多扩容 2 个以避免剧烈波动 + add := 2 + if deficit < add { + add = deficit + } + p.ScaleUp(add) } - p.ScaleUp(add) - } - }() + }() + } + return nil + default: + // 队列已满,短暂等待后重试 + select { + case <-p.ctx.Done(): + return fmt.Errorf("worker pool is closed") + case <-time.After(50 * time.Millisecond): + // 重试 + } } - return nil - default: - return fmt.Errorf("task queue is full") } }