diff --git a/src/server/game/message_mgr.go b/src/server/game/message_mgr.go index bd11da46..350a6d37 100644 --- a/src/server/game/message_mgr.go +++ b/src/server/game/message_mgr.go @@ -1,16 +1,25 @@ package game import ( + "context" "fmt" + "runtime/debug" mergeCluster "server/cluster" "server/conf" "server/game/mod/msg" GoUtil "server/game_util" "server/pkg/github.com/name5566/leaf/log" + "sync" + "time" ) +// 中间件函数类型 +type MessageMiddleware func(MessageHandlerFunc) MessageHandlerFunc + type MessageMgr struct { *ServerMod + middlewares []MessageMiddleware + workerPool *WorkerPool } type MessageData struct { @@ -18,13 +27,57 @@ type MessageData struct { PlayerList map[int64]int } +// Worker Pool 结构 +type WorkerPool struct { + workers int + taskQueue chan *MessageTask + wg sync.WaitGroup + ctx context.Context + cancel context.CancelFunc + maxQueue int +} + +// 消息任务 +type MessageTask struct { + Msg *msg.Msg + Handler MessageHandlerFunc + Result chan *TaskResult +} + +// 任务结果 +type TaskResult struct { + Data interface{} + Error error +} + func (m *MessageMgr) MessageMgrInit() { m.key = MESSAGE_MGR_KEY m.data = &MessageData{} + m.middlewares = []MessageMiddleware{} + // 初始化 Worker Pool (10个worker, 1000个队列大小) + m.workerPool = NewWorkerPool(10, 1000) + // 注册默认中间件 + m.Use(RecoveryMiddleware()) + m.Use(LoggingMiddleware()) + m.Use(TimeoutMiddleware(5 * time.Second)) // 注册处理函数 m.init() } +// 添加中间件 +func (m *MessageMgr) Use(middleware MessageMiddleware) { + m.middlewares = append(m.middlewares, middleware) +} + +// 应用所有中间件到处理函数 +func (m *MessageMgr) applyMiddlewares(handler MessageHandlerFunc) MessageHandlerFunc { + // 从后往前应用中间件 + for i := len(m.middlewares) - 1; i >= 0; i-- { + handler = m.middlewares[i](handler) + } + return handler +} + type MessageHandlerFunc func(message *msg.Msg) (interface{}, error) func (m *MessageMgr) RegisterMessageHandler(hType int, handler MessageHandlerFunc) { @@ -52,11 +105,220 @@ func SendMessage(m1 *msg.Msg) error { return nil } +// 异步处理消息 (多线程版本) +func (m *MessageMgr) MessageHandleAsync(message *msg.Msg) error { + if fun, ok := m.handler[message.Type]; ok { + // 应用中间件 + handlerWithMiddleware := m.applyMiddlewares(fun.(MessageHandlerFunc)) + + // 创建任务 + task := &MessageTask{ + Msg: message, + Handler: handlerWithMiddleware, + Result: make(chan *TaskResult, 1), + } + + // 提交到 Worker Pool + if err := m.workerPool.Submit(task); err != nil { + log.Error("Failed to submit message task: %v", err) + return err + } + + // 可以选择等待结果或直接返回 + go func() { + result := <-task.Result + if result.Error != nil { + log.Error("Message handle error: %v", result.Error) + } + }() + + return nil + } + log.Error("server mod key:%s handle not exist handle type:%d", m.key, message.Type) + return fmt.Errorf("server mod handler err") +} + +// 兼容旧版本的函数 func MessageHandle(m *msg.Msg) error { log.Debug("RecvMessage m %v", m) + // 这里可以调用 MessageMgr 的处理方法 + // G_GameLogicPtr.MessageMgr.MessageHandleAsync(m) return nil } +// ==================== Worker Pool 实现 ==================== + +// 创建 Worker Pool +func NewWorkerPool(workers, maxQueue int) *WorkerPool { + ctx, cancel := context.WithCancel(context.Background()) + pool := &WorkerPool{ + workers: workers, + taskQueue: make(chan *MessageTask, maxQueue), + ctx: ctx, + cancel: cancel, + maxQueue: maxQueue, + } + pool.start() + return pool +} + +// 启动 Worker Pool +func (p *WorkerPool) start() { + for i := 0; i < p.workers; i++ { + p.wg.Add(1) + go p.worker(i) + } +} + +// Worker 工作函数 +func (p *WorkerPool) worker(id int) { + defer p.wg.Done() + log.Debug("Worker %d started", id) + + for { + select { + case <-p.ctx.Done(): + log.Debug("Worker %d stopped", id) + return + case task, ok := <-p.taskQueue: + if !ok { + log.Debug("Worker %d: task queue closed", id) + return + } + // 执行任务 + result, err := task.Handler(task.Msg) + // 发送结果 + task.Result <- &TaskResult{ + Data: result, + Error: err, + } + close(task.Result) + } + } +} + +// 提交任务 +func (p *WorkerPool) Submit(task *MessageTask) error { + select { + case <-p.ctx.Done(): + return fmt.Errorf("worker pool is closed") + case p.taskQueue <- task: + return nil + default: + return fmt.Errorf("task queue is full") + } +} + +// 关闭 Worker Pool +func (p *WorkerPool) Shutdown() { + log.Debug("Shutting down worker pool...") + p.cancel() + close(p.taskQueue) + p.wg.Wait() + log.Debug("Worker pool shut down complete") +} + +// ==================== 中间件实现 ==================== + +// 日志中间件 +func LoggingMiddleware() MessageMiddleware { + return func(next MessageHandlerFunc) MessageHandlerFunc { + return func(message *msg.Msg) (interface{}, error) { + start := time.Now() + log.Debug("[Middleware] Processing message type: %d, time: %v", message.Type, start) + + result, err := next(message) + + duration := time.Since(start) + if err != nil { + log.Error("[Middleware] Message type: %d failed, duration: %v, error: %v", message.Type, duration, err) + } else { + log.Debug("[Middleware] Message type: %d success, duration: %v", message.Type, duration) + } + + return result, err + } + } +} + +// 恢复 Panic 中间件 +func RecoveryMiddleware() MessageMiddleware { + return func(next MessageHandlerFunc) MessageHandlerFunc { + return func(message *msg.Msg) (result interface{}, err error) { + defer func() { + if r := recover(); r != nil { + log.Error("[Middleware] Panic recovered: %v\nStack: %s", r, debug.Stack()) + err = fmt.Errorf("panic recovered: %v", r) + } + }() + return next(message) + } + } +} + +// 超时中间件 +func TimeoutMiddleware(timeout time.Duration) MessageMiddleware { + return func(next MessageHandlerFunc) MessageHandlerFunc { + return func(message *msg.Msg) (interface{}, error) { + resultChan := make(chan *TaskResult, 1) + + go func() { + result, err := next(message) + resultChan <- &TaskResult{Data: result, Error: err} + }() + + select { + case result := <-resultChan: + return result.Data, result.Error + case <-time.After(timeout): + log.Error("[Middleware] Message type: %d timeout after %v", message.Type, timeout) + return nil, fmt.Errorf("message handler timeout") + } + } + } +} + +// 重试中间件 +func RetryMiddleware(maxRetries int) MessageMiddleware { + return func(next MessageHandlerFunc) MessageHandlerFunc { + return func(message *msg.Msg) (interface{}, error) { + var result interface{} + var err error + + for i := 0; i <= maxRetries; i++ { + result, err = next(message) + if err == nil { + return result, nil + } + + if i < maxRetries { + log.Debug("[Middleware] Retry %d/%d for message type: %d, error: %v", i+1, maxRetries, message.Type, err) + time.Sleep(time.Millisecond * 100 * time.Duration(i+1)) + } + } + + return result, fmt.Errorf("failed after %d retries: %w", maxRetries, err) + } + } +} + +// 验证中间件 +func ValidationMiddleware() MessageMiddleware { + return func(next MessageHandlerFunc) MessageHandlerFunc { + return func(message *msg.Msg) (interface{}, error) { + // 添加消息验证逻辑 + if message == nil { + return nil, fmt.Errorf("message is nil") + } + if message.Type <= 0 { + return nil, fmt.Errorf("invalid message type: %d", message.Type) + } + + return next(message) + } + } +} + func SendMsgToCenter(m *msg.Msg) error { return mergeCluster.SendServerMsg(m, conf.Server.CenterNode) } @@ -68,3 +330,17 @@ func CallMsgToCenter(m *msg.Msg) (interface{}, error) { func SendMsgToNode(m *msg.Msg, node int) error { return mergeCluster.SendServerMsg(m, node) } + +func SendPlayerMsg(m *msg.Msg) error { + clone := m.Clone() + clone.UniKey = GoUtil.UniKey(fmt.Sprintf("%v,Player Msg", m)) + clone.HandleType = msg.HANDLE_TYPE_PLAYER_MSG + return mergeCluster.SendServerMsg(m, conf.Server.CenterNode) +} + +func CallPlayerMsg(m *msg.Msg) (interface{}, error) { + clone := m.Clone() + clone.UniKey = GoUtil.UniKey(fmt.Sprintf("%v,Player Msg", m)) + clone.HandleType = msg.HANDLE_TYPE_PLAYER_MSG + return mergeCluster.CallServerMsg(m, conf.Server.CenterNode) +} diff --git a/src/server/game/mod/msg/Msg.go b/src/server/game/mod/msg/Msg.go index c8592553..8e1e4ddb 100644 --- a/src/server/game/mod/msg/Msg.go +++ b/src/server/game/mod/msg/Msg.go @@ -19,6 +19,10 @@ type Msg struct { var MSG_ZERO_UPDATE = &Msg{Type: SERVER_ZERO_UPDATE} var MSG_NOON_UPDATE = &Msg{Type: SERVER_NOON_UPDATE} +const ( + HANDLE_TYPE_PLAYER_MSG = 20001 // 玩家消息 +) + const ( //好友操作 HANDLE_TYPE_APPLY = iota //申请好友 @@ -114,6 +118,8 @@ const ( HANDLE_TYPE_SET_CATNIP_PARTNER // 设置猫薄荷伙伴 HANDLE_TYPE_CATNIP_SEND_EMOJI // 发送猫薄荷表情 HANDLE_TYPE_CHAMPSHIP_MY_RANK // 锦标赛我的排名 + + HANDLE_TYPE_LOGIN // 玩家登录处理 ) const (