This commit is contained in:
hahwu 2025-12-18 11:48:27 +08:00
parent 420f0a2e2c
commit 54dba4b18a
2 changed files with 282 additions and 0 deletions

View File

@ -1,16 +1,25 @@
package game package game
import ( import (
"context"
"fmt" "fmt"
"runtime/debug"
mergeCluster "server/cluster" mergeCluster "server/cluster"
"server/conf" "server/conf"
"server/game/mod/msg" "server/game/mod/msg"
GoUtil "server/game_util" GoUtil "server/game_util"
"server/pkg/github.com/name5566/leaf/log" "server/pkg/github.com/name5566/leaf/log"
"sync"
"time"
) )
// 中间件函数类型
type MessageMiddleware func(MessageHandlerFunc) MessageHandlerFunc
type MessageMgr struct { type MessageMgr struct {
*ServerMod *ServerMod
middlewares []MessageMiddleware
workerPool *WorkerPool
} }
type MessageData struct { type MessageData struct {
@ -18,13 +27,57 @@ type MessageData struct {
PlayerList map[int64]int PlayerList map[int64]int
} }
// Worker Pool 结构
type WorkerPool struct {
workers int
taskQueue chan *MessageTask
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
maxQueue int
}
// 消息任务
type MessageTask struct {
Msg *msg.Msg
Handler MessageHandlerFunc
Result chan *TaskResult
}
// 任务结果
type TaskResult struct {
Data interface{}
Error error
}
func (m *MessageMgr) MessageMgrInit() { func (m *MessageMgr) MessageMgrInit() {
m.key = MESSAGE_MGR_KEY m.key = MESSAGE_MGR_KEY
m.data = &MessageData{} m.data = &MessageData{}
m.middlewares = []MessageMiddleware{}
// 初始化 Worker Pool (10个worker, 1000个队列大小)
m.workerPool = NewWorkerPool(10, 1000)
// 注册默认中间件
m.Use(RecoveryMiddleware())
m.Use(LoggingMiddleware())
m.Use(TimeoutMiddleware(5 * time.Second))
// 注册处理函数 // 注册处理函数
m.init() m.init()
} }
// 添加中间件
func (m *MessageMgr) Use(middleware MessageMiddleware) {
m.middlewares = append(m.middlewares, middleware)
}
// 应用所有中间件到处理函数
func (m *MessageMgr) applyMiddlewares(handler MessageHandlerFunc) MessageHandlerFunc {
// 从后往前应用中间件
for i := len(m.middlewares) - 1; i >= 0; i-- {
handler = m.middlewares[i](handler)
}
return handler
}
type MessageHandlerFunc func(message *msg.Msg) (interface{}, error) type MessageHandlerFunc func(message *msg.Msg) (interface{}, error)
func (m *MessageMgr) RegisterMessageHandler(hType int, handler MessageHandlerFunc) { func (m *MessageMgr) RegisterMessageHandler(hType int, handler MessageHandlerFunc) {
@ -52,11 +105,220 @@ func SendMessage(m1 *msg.Msg) error {
return nil return nil
} }
// 异步处理消息 (多线程版本)
func (m *MessageMgr) MessageHandleAsync(message *msg.Msg) error {
if fun, ok := m.handler[message.Type]; ok {
// 应用中间件
handlerWithMiddleware := m.applyMiddlewares(fun.(MessageHandlerFunc))
// 创建任务
task := &MessageTask{
Msg: message,
Handler: handlerWithMiddleware,
Result: make(chan *TaskResult, 1),
}
// 提交到 Worker Pool
if err := m.workerPool.Submit(task); err != nil {
log.Error("Failed to submit message task: %v", err)
return err
}
// 可以选择等待结果或直接返回
go func() {
result := <-task.Result
if result.Error != nil {
log.Error("Message handle error: %v", result.Error)
}
}()
return nil
}
log.Error("server mod key:%s handle not exist handle type:%d", m.key, message.Type)
return fmt.Errorf("server mod handler err")
}
// 兼容旧版本的函数
func MessageHandle(m *msg.Msg) error { func MessageHandle(m *msg.Msg) error {
log.Debug("RecvMessage m %v", m) log.Debug("RecvMessage m %v", m)
// 这里可以调用 MessageMgr 的处理方法
// G_GameLogicPtr.MessageMgr.MessageHandleAsync(m)
return nil return nil
} }
// ==================== Worker Pool 实现 ====================
// 创建 Worker Pool
func NewWorkerPool(workers, maxQueue int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
pool := &WorkerPool{
workers: workers,
taskQueue: make(chan *MessageTask, maxQueue),
ctx: ctx,
cancel: cancel,
maxQueue: maxQueue,
}
pool.start()
return pool
}
// 启动 Worker Pool
func (p *WorkerPool) start() {
for i := 0; i < p.workers; i++ {
p.wg.Add(1)
go p.worker(i)
}
}
// Worker 工作函数
func (p *WorkerPool) worker(id int) {
defer p.wg.Done()
log.Debug("Worker %d started", id)
for {
select {
case <-p.ctx.Done():
log.Debug("Worker %d stopped", id)
return
case task, ok := <-p.taskQueue:
if !ok {
log.Debug("Worker %d: task queue closed", id)
return
}
// 执行任务
result, err := task.Handler(task.Msg)
// 发送结果
task.Result <- &TaskResult{
Data: result,
Error: err,
}
close(task.Result)
}
}
}
// 提交任务
func (p *WorkerPool) Submit(task *MessageTask) error {
select {
case <-p.ctx.Done():
return fmt.Errorf("worker pool is closed")
case p.taskQueue <- task:
return nil
default:
return fmt.Errorf("task queue is full")
}
}
// 关闭 Worker Pool
func (p *WorkerPool) Shutdown() {
log.Debug("Shutting down worker pool...")
p.cancel()
close(p.taskQueue)
p.wg.Wait()
log.Debug("Worker pool shut down complete")
}
// ==================== 中间件实现 ====================
// 日志中间件
func LoggingMiddleware() MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
start := time.Now()
log.Debug("[Middleware] Processing message type: %d, time: %v", message.Type, start)
result, err := next(message)
duration := time.Since(start)
if err != nil {
log.Error("[Middleware] Message type: %d failed, duration: %v, error: %v", message.Type, duration, err)
} else {
log.Debug("[Middleware] Message type: %d success, duration: %v", message.Type, duration)
}
return result, err
}
}
}
// 恢复 Panic 中间件
func RecoveryMiddleware() MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (result interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Error("[Middleware] Panic recovered: %v\nStack: %s", r, debug.Stack())
err = fmt.Errorf("panic recovered: %v", r)
}
}()
return next(message)
}
}
}
// 超时中间件
func TimeoutMiddleware(timeout time.Duration) MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
resultChan := make(chan *TaskResult, 1)
go func() {
result, err := next(message)
resultChan <- &TaskResult{Data: result, Error: err}
}()
select {
case result := <-resultChan:
return result.Data, result.Error
case <-time.After(timeout):
log.Error("[Middleware] Message type: %d timeout after %v", message.Type, timeout)
return nil, fmt.Errorf("message handler timeout")
}
}
}
}
// 重试中间件
func RetryMiddleware(maxRetries int) MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
var result interface{}
var err error
for i := 0; i <= maxRetries; i++ {
result, err = next(message)
if err == nil {
return result, nil
}
if i < maxRetries {
log.Debug("[Middleware] Retry %d/%d for message type: %d, error: %v", i+1, maxRetries, message.Type, err)
time.Sleep(time.Millisecond * 100 * time.Duration(i+1))
}
}
return result, fmt.Errorf("failed after %d retries: %w", maxRetries, err)
}
}
}
// 验证中间件
func ValidationMiddleware() MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
// 添加消息验证逻辑
if message == nil {
return nil, fmt.Errorf("message is nil")
}
if message.Type <= 0 {
return nil, fmt.Errorf("invalid message type: %d", message.Type)
}
return next(message)
}
}
}
func SendMsgToCenter(m *msg.Msg) error { func SendMsgToCenter(m *msg.Msg) error {
return mergeCluster.SendServerMsg(m, conf.Server.CenterNode) return mergeCluster.SendServerMsg(m, conf.Server.CenterNode)
} }
@ -68,3 +330,17 @@ func CallMsgToCenter(m *msg.Msg) (interface{}, error) {
func SendMsgToNode(m *msg.Msg, node int) error { func SendMsgToNode(m *msg.Msg, node int) error {
return mergeCluster.SendServerMsg(m, node) return mergeCluster.SendServerMsg(m, node)
} }
func SendPlayerMsg(m *msg.Msg) error {
clone := m.Clone()
clone.UniKey = GoUtil.UniKey(fmt.Sprintf("%v,Player Msg", m))
clone.HandleType = msg.HANDLE_TYPE_PLAYER_MSG
return mergeCluster.SendServerMsg(m, conf.Server.CenterNode)
}
func CallPlayerMsg(m *msg.Msg) (interface{}, error) {
clone := m.Clone()
clone.UniKey = GoUtil.UniKey(fmt.Sprintf("%v,Player Msg", m))
clone.HandleType = msg.HANDLE_TYPE_PLAYER_MSG
return mergeCluster.CallServerMsg(m, conf.Server.CenterNode)
}

View File

@ -19,6 +19,10 @@ type Msg struct {
var MSG_ZERO_UPDATE = &Msg{Type: SERVER_ZERO_UPDATE} var MSG_ZERO_UPDATE = &Msg{Type: SERVER_ZERO_UPDATE}
var MSG_NOON_UPDATE = &Msg{Type: SERVER_NOON_UPDATE} var MSG_NOON_UPDATE = &Msg{Type: SERVER_NOON_UPDATE}
const (
HANDLE_TYPE_PLAYER_MSG = 20001 // 玩家消息
)
const ( const (
//好友操作 //好友操作
HANDLE_TYPE_APPLY = iota //申请好友 HANDLE_TYPE_APPLY = iota //申请好友
@ -114,6 +118,8 @@ const (
HANDLE_TYPE_SET_CATNIP_PARTNER // 设置猫薄荷伙伴 HANDLE_TYPE_SET_CATNIP_PARTNER // 设置猫薄荷伙伴
HANDLE_TYPE_CATNIP_SEND_EMOJI // 发送猫薄荷表情 HANDLE_TYPE_CATNIP_SEND_EMOJI // 发送猫薄荷表情
HANDLE_TYPE_CHAMPSHIP_MY_RANK // 锦标赛我的排名 HANDLE_TYPE_CHAMPSHIP_MY_RANK // 锦标赛我的排名
HANDLE_TYPE_LOGIN // 玩家登录处理
) )
const ( const (