消息服务优化

This commit is contained in:
hahwu 2025-12-25 10:57:20 +08:00
parent 9ec87776f0
commit 0f4cae323e
2 changed files with 60 additions and 31 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt"
"runtime"
"server/game"
"server/game/mod/msg"
"testing"
)
@ -19,6 +20,25 @@ import (
// }
// }
/*
*
cluster 消息处理基准测试
36716 34961 ns/op 1690 B/op 38 allocs/op
*/
func BenchmarkClusterMsg(b *testing.B) {
game.ClusterMgrInit()
runtime.GOMAXPROCS(8)
game.G_getGameLogic()
for i := 0; i < b.N; i++ {
m := &msg.Msg{
HandleType: msg.HANDLE_MOD_PLAYER_LOGIN,
Extra: 0,
}
game.MessageHandle(m)
}
}
func BenchmarkLog(b *testing.B) {
runtime.GOMAXPROCS(2)
t := game.G_getGameLogic()

View File

@ -76,7 +76,7 @@ func (m *MessageMgr) MessageMgrInit() {
m.handler = make(map[int]MessageHandlerFunc)
m.middlewares = []MessageMiddleware{}
// 初始化 Worker Pool (10个worker, 1000个队列大小)
m.workerPool = NewWorkerPool(10, 1000)
m.workerPool = NewWorkerPool(50, 10000)
// 注册默认中间件
m.Use(RecoveryMiddleware())
m.Use(LoggingMiddleware())
@ -187,9 +187,9 @@ func ClusterSyncHandler(data *msg.Msg) (interface{}, error) {
// 发送暂存区消息
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
defer messageMgrData.mu.Unlock()
TempMessageList := messageMgrData.MessageList
messageMgrData.MessageList = make(map[int64]*MessageList)
messageMgrData.mu.Unlock()
for _, Message := range TempMessageList {
for _, msgItem := range Message.Messages {
SendMsgToCenterAsync(msgItem)
@ -202,13 +202,17 @@ func PlayerLoginHandler(data *msg.Msg) (interface{}, error) {
// 关闭 Worker Pool
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
defer messageMgrData.mu.Unlock()
messageMgrData.PlayerList[int64(data.From)] = data.Extra.(int)
if _, ok := messageMgrData.MessageList[int64(data.From)]; !ok {
messageMgrData.MessageList[int64(data.From)] = &MessageList{
Messages: []*msg.Msg{},
}
}
messageMgrData.mu.Unlock()
// 对玩家消息列表加锁
messageMgrData.MessageList[int64(data.From)].mu.Lock()
defer messageMgrData.MessageList[int64(data.From)].mu.Unlock()
log.Debug("[Middleware] Player login success player id: %v, node: %v", data.From, data.Extra.(int))
node := data.Extra.(int)
messageMgrData.PlayerList[int64(data.From)] = node
@ -225,8 +229,6 @@ func PlayerLoginHandler(data *msg.Msg) (interface{}, error) {
func PlayerLogoutHandler(data *msg.Msg) (interface{}, error) {
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
defer messageMgrData.mu.Unlock()
delete(messageMgrData.PlayerList, int64(data.From))
log.Debug("[Middleware] Player logout success player id: %v", data.From)
return nil, nil
@ -279,8 +281,6 @@ func PlayerMsgHandler(data *msg.Msg) (interface{}, error) {
if p == nil || p.stop {
return nil, nil
}
p.lock.Lock()
defer p.lock.Unlock()
p.Send(data.Clone())
// 处理完后发送消费消息
if data.HandleType == msg.HANDLE_MOD_PLAYER_MSG {
@ -385,7 +385,7 @@ func NewWorkerPool(workers, maxQueue int) *WorkerPool {
pool := &WorkerPool{
workers: 0,
minWorkers: workers,
maxWorkers: workers * 4,
maxWorkers: workers * 40,
taskQueue: make(chan *MessageTask, maxQueue),
ctx: ctx,
cancel: cancel,
@ -450,6 +450,8 @@ func (p *WorkerPool) worker(ctx context.Context, id int) {
// 提交任务
func (p *WorkerPool) Submit(task *MessageTask) error {
// 当队列已满时,等待并重试入队,直到成功或 pool 关闭
for {
select {
case <-p.ctx.Done():
return fmt.Errorf("worker pool is closed")
@ -474,7 +476,14 @@ func (p *WorkerPool) Submit(task *MessageTask) error {
}
return nil
default:
return fmt.Errorf("task queue is full")
// 队列已满,短暂等待后重试
select {
case <-p.ctx.Done():
return fmt.Errorf("worker pool is closed")
case <-time.After(50 * time.Millisecond):
// 重试
}
}
}
}