pet_home_server/src/server/game/message_mgr.go
2025-12-31 14:47:50 +08:00

780 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package game
import (
"context"
"encoding/gob"
"fmt"
"runtime/debug"
mergeCluster "server/cluster"
"server/conf"
"server/game/mod/msg"
GoUtil "server/game_util"
"server/pkg/github.com/name5566/leaf/log"
"sync"
"time"
)
var id = 1
// 中间件函数类型
type MessageMiddleware func(MessageHandlerFunc) MessageHandlerFunc
var save_msg_type = []int{
msg.HANDLE_MOD_PLAYER_MSG,
msg.HANDLE_MDO_CHAMPSHIP_INRANK,
msg.HANDLE_MOD_USER_VAR_SET,
}
type MessageMgr struct {
*ServerMod
middlewares []MessageMiddleware
workerPool *WorkerPool
handler map[int]MessageHandlerFunc
}
type MessageData struct {
MessageList map[int64]*MessageList
PlayerList map[int64]int
mu sync.Mutex
}
type MessageList struct {
Messages []*msg.Msg
mu sync.Mutex
}
// Worker Pool 结构
type WorkerPool struct {
workers int
minWorkers int
maxWorkers int
taskQueue chan *MessageTask
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
maxQueue int
mu sync.Mutex
workerCancels []context.CancelFunc
monitorTick *time.Ticker
}
// 消息任务
type MessageTask struct {
Msg *msg.Msg
Handler MessageHandlerFunc
Result chan *TaskResult
id int
}
// 任务结果
type TaskResult struct {
Data interface{}
Error error
}
func (m *MessageMgr) MessageMgrInit() {
m.key = MESSAGE_MGR_KEY + fmt.Sprintf("_%d", conf.Server.ServerID)
m.data = &MessageData{
MessageList: make(map[int64]*MessageList),
PlayerList: make(map[int64]int),
}
gob.Register(msg.VarData{})
gob.Register(GameResult{})
// 注册处理函数
m.init()
m.handler = make(map[int]MessageHandlerFunc)
m.middlewares = []MessageMiddleware{}
// 初始化 Worker Pool (10个worker, 1000个队列大小)
m.workerPool = NewWorkerPool(50, 10000)
// 注册默认中间件
m.Use(RecoveryMiddleware())
m.Use(LoggingMiddleware())
m.Use(TimeoutMiddleware(5 * time.Second))
m.NodeRegister()
m.CenterRegister()
}
// 注册处理器
func (s *MessageMgr) RegisterHandler(HandlerType int, fun MessageHandlerFunc) {
s.handler[HandlerType] = fun
}
func (m *MessageMgr) NodeRegister() {
if conf.Server.ServerType == "node" {
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_MSG, MessageHandlerFunc(PlayerMsgHandler))
m.RegisterHandler(msg.HANDLE_MOD_REPLY_PLAYER_MSG, MessageHandlerFunc(PlayerReplyMsgHandler))
m.RegisterHandler(msg.HANDLE_MOD_CLUSTER_SYNC, MessageHandlerFunc(ClusterSyncHandler))
}
}
func (m *MessageMgr) CenterRegister() {
if conf.Server.ServerType == "center" {
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_LOGIN, MessageHandlerFunc(PlayerLoginHandler))
m.RegisterHandler(msg.HANDLE_MDO_PLAYER_LOGOUT, MessageHandlerFunc(PlayerLogoutHandler))
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_MSG, MessageHandlerFunc(CenterPlayerMsgHandler))
m.RegisterHandler(msg.HANDLE_MOD_COMSUME_MSG, MessageHandlerFunc(ComsumerMsgHandler))
m.RegisterHandler(msg.HANDLE_MOD_VAR_SET, MessageHandlerFunc(SetVarDataHandler))
m.RegisterHandler(msg.HANDLE_MOD_VAR_GET, MessageHandlerFunc(GetVarDataHandler))
m.RegisterHandler(msg.HANDLE_MOD_USER_VAR_SET, MessageHandlerFunc(SetUserVarDataHandler))
m.RegisterHandler(msg.HANDLE_MOD_USER_VAR_GET, MessageHandlerFunc(GetUserVarDataHandler))
m.RegisterHandler(msg.HANDLE_MOD_CATNIP_PARTNER, MessageHandlerFunc(CatnipPartnerHandler))
m.RegisterHandler(msg.HANDLE_MDO_CHAMPSHIP_INRANK, MessageHandlerFunc(ChampshipInRankHandler))
m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_RANK_INFO, MessageHandlerFunc(ChampshipRankInfoHandler))
m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_RANK_LIST, MessageHandlerFunc(ChampshipRankListHandler))
m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_PRE_RANK, MessageHandlerFunc(ChampshipRankPreHandler))
}
}
func getMessageData() *MessageData {
return G_GameLogicPtr.MessageMgr.data.(*MessageData)
}
// ----------------------------------- 处理函数实现 ---------------------------
func ChampshipRankPreHandler(data *msg.Msg) (interface{}, error) {
PlayerId := data.From
PreRankMsg := G_GameLogicPtr.ChampshipMgr.GetPreRankMsg(PlayerId)
ReplyPlayerMsgASync(data, PreRankMsg)
return nil, nil
}
func ChampshipRankListHandler(data *msg.Msg) (interface{}, error) {
PlayerId := data.From
RankMsg := G_GameLogicPtr.ChampshipMgr.GetRankMsg(PlayerId)
ReplyPlayerMsgASync(data, RankMsg)
return nil, nil
}
func ChampshipRankInfoHandler(data *msg.Msg) (interface{}, error) {
PlayerId := data.From
MyRank := G_GameLogicPtr.ChampshipMgr.getMyRank(PlayerId)
MyPreRank := G_GameLogicPtr.ChampshipMgr.getLastMyRank(PlayerId)
ReplyPlayerMsgASync(data, []int{MyRank, MyPreRank})
return nil, nil
}
func NotifyAllPlayerMsg(m *msg.Msg) {
messageMgrData := getMessageData()
for PlayerId, node := range messageMgrData.PlayerList {
m.To = int(PlayerId)
SendMsgToNodeAsync(m, node)
}
}
func ChampshipInRankHandler(data *msg.Msg) (interface{}, error) {
G_GameLogicPtr.ChampshipMgr.inRank(data)
return nil, nil
}
func CatnipPartnerHandler(data *msg.Msg) (interface{}, error) {
m, ok := data.Extra.(*CatnipPartner)
if !ok {
return nil, fmt.Errorf("invalid catnip partner data")
}
return G_GameLogicPtr.VarMgr.HandleCatnipPartner(m.Uid, m.Partner, m.GameId, m.EndTime)
}
func ReplyPlayerMsgASync(m *msg.Msg, reply interface{}) (interface{}, error) {
clone := m.Reply(reply)
messageMgrData := getMessageData()
if node, ok := messageMgrData.PlayerList[int64(m.From)]; ok {
SendMsgToNodeAsync(clone, node)
}
return nil, nil
}
// 节点连接时,同步消息
func ClusterSyncHandler(data *msg.Msg) (interface{}, error) {
// 遍历所有玩家,发送登录消息
G_GameLogicPtr.M_Players.Range(func(k, v interface{}) bool {
SendMsgToCenterAsync(&msg.Msg{
From: int(v.(*Player).M_DwUin),
HandleType: msg.HANDLE_MOD_PLAYER_LOGIN,
Extra: conf.Server.ServerID,
})
return true
})
// 发送暂存区消息
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
TempMessageList := messageMgrData.MessageList
messageMgrData.MessageList = make(map[int64]*MessageList)
defer messageMgrData.mu.Unlock()
log.Debug("[Middleware] Cluster sync send temp message len: %d", len(TempMessageList))
for _, Message := range TempMessageList {
for _, msgItem := range Message.Messages {
SendMsgToCenterAsync(msgItem)
}
}
return nil, nil
}
func PlayerLoginHandler(data *msg.Msg) (interface{}, error) {
// 关闭 Worker Pool
node := data.Extra.(int)
messageMgrData := getMessageData()
messageMgrData.PlayerList[int64(data.From)] = node
log.Debug("[Middleware] Player login success player id: %v, node: %v", data.From, data.Extra.(int))
// 对玩家消息列表加锁
messages := getMessge(int64(data.From))
messages.mu.Lock()
defer messages.mu.Unlock()
// 发送离线消息
len := len(messages.Messages)
for _, message := range messages.Messages {
SendMsgToNodeAsync(message, node)
}
log.Debug("[Middleware] Player sync logout message player id: %v, len: %d", data.From, len)
return nil, nil
}
func PlayerLogoutHandler(data *msg.Msg) (interface{}, error) {
messageMgrData := getMessageData()
delete(messageMgrData.PlayerList, int64(data.From))
log.Debug("[Middleware] Player logout success player id: %v", data.From)
return nil, nil
}
func ComsumerMsgHandler(data *msg.Msg) (interface{}, error) {
messages := getMessge(int64(data.From))
messages.mu.Lock()
defer messages.mu.Unlock()
for i, msgItem := range messages.Messages {
if msgItem.UniKey == data.UniKey {
// 删除消息
messages.Messages = append(messages.Messages[:i], messages.Messages[i+1:]...)
log.Debug("[Middleware] Comsume message success type: %d, player id: %v", msgItem.Type, msgItem.From)
break
}
}
return nil, nil
}
func CenterPlayerMsgHandler(data *msg.Msg) (interface{}, error) {
PlayerId := int64(data.To)
messageMgrData := getMessageData()
// 遍历消息列表,发送消息给在线玩家
messages := getMessge(PlayerId)
messages.mu.Lock()
defer messages.mu.Unlock()
messages.Messages = append(messages.Messages, data)
if node, ok := messageMgrData.PlayerList[int64(PlayerId)]; ok {
SendMsgToNodeAsync(data, node)
}
return nil, nil
}
func PlayerMsgHandler(data *msg.Msg) (interface{}, error) {
p := G_GameLogicPtr.GetPlayer(int64(data.To))
// 不在线 不处理
if p == nil || p.stop {
return nil, nil
}
p.Send(data.Clone())
// 处理完后发送消费消息
if data.HandleType == msg.HANDLE_MOD_PLAYER_MSG {
data.HandleType = msg.HANDLE_MOD_COMSUME_MSG
SendMsgToCenterAsync(data)
}
return nil, nil
}
func PlayerReplyMsgHandler(data *msg.Msg) (interface{}, error) {
// 先处理同步回调
if data.UniKey != "" {
if chanel, ok := mergeCluster.CallbackChan[data.UniKey]; ok {
log.Debug("reply message ")
chanel <- data
}
}
return nil, nil
}
// 添加中间件
func (m *MessageMgr) Use(middleware MessageMiddleware) {
m.middlewares = append(m.middlewares, middleware)
}
// 应用所有中间件到处理函数
func (m *MessageMgr) applyMiddlewares(handler MessageHandlerFunc) MessageHandlerFunc {
// 从后往前应用中间件
for i := len(m.middlewares) - 1; i >= 0; i-- {
handler = m.middlewares[i](handler)
}
return handler
}
type MessageHandlerFunc func(message *msg.Msg) (interface{}, error)
func (m *MessageMgr) RegisterMessageHandler(hType int, handler MessageHandlerFunc) {
m.RegisterHandler(hType, handler)
}
func (m *MessageMgr) Handle(msg *msg.Msg) (interface{}, error) {
if fun, ok := m.handler[msg.Type]; ok {
return fun(msg)
}
log.Error("server mod key:%s handle not exist handle type:%d", m.key, msg.Type)
return nil, fmt.Errorf("server mod handler err")
}
// 异步处理消息 (多线程版本)
func (m *MessageMgr) MessageHandleAsync(message *msg.Msg) error {
if message.End != 0 && message.End < GoUtil.Now() {
log.Debug("message had expired type:%d,to:%d", message.Type, message.To)
return nil
}
if fun, ok := m.handler[message.HandleType]; ok {
// 应用中间件
handlerWithMiddleware := m.applyMiddlewares(fun)
id++
// 创建任务
task := &MessageTask{
id: id,
Msg: message,
Handler: handlerWithMiddleware,
Result: make(chan *TaskResult, 1),
}
// 提交到 Worker Pool
if err := m.workerPool.Submit(task); err != nil {
log.Error("Failed to submit message task: %v", err)
return err
}
// 可以选择等待结果或直接返回
go func() {
result := <-task.Result
if result.Error != nil {
log.Error("Message handle error: %v", result.Error)
}
}()
return nil
}
log.Error("server mod key:%s handle not exist handle type:%d", m.key, message.Type)
return fmt.Errorf("server mod handler err")
}
// 兼容旧版本的函数
func MessageHandle(m *msg.Msg) error {
log.Debug("RecvMessage m %v", m)
// 这里可以调用 MessageMgr 的处理方法
G_GameLogicPtr.MessageMgr.MessageHandleAsync(m)
return nil
}
// ==================== Worker Pool 实现 ====================
// 创建 Worker Pool
func NewWorkerPool(workers, maxQueue int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
if workers <= 0 {
workers = 1
}
pool := &WorkerPool{
workers: 0,
minWorkers: workers,
maxWorkers: workers * 40,
taskQueue: make(chan *MessageTask, maxQueue),
ctx: ctx,
cancel: cancel,
maxQueue: maxQueue,
}
pool.start()
// 启动监控协程,负责动态扩缩容
pool.monitorTick = time.NewTicker(3000 * time.Millisecond)
go pool.monitor()
return pool
}
// 启动 Worker Pool
func (p *WorkerPool) start() {
// 启动最小数量的 worker
for i := 0; i < p.minWorkers; i++ {
p.spawnWorker()
}
}
// spawnWorker 启动一个 worker使用独立的 cancelable context
func (p *WorkerPool) spawnWorker() {
p.mu.Lock()
defer p.mu.Unlock()
childCtx, childCancel := context.WithCancel(p.ctx)
p.workerCancels = append(p.workerCancels, childCancel)
id := len(p.workerCancels)
p.wg.Add(1)
p.workers++
go p.worker(childCtx, id)
}
// Worker 工作函数,监听其子上下文以便单独停止
func (p *WorkerPool) worker(ctx context.Context, id int) {
defer p.wg.Done()
log.Debug("Worker %d started", id)
for {
select {
case <-ctx.Done():
log.Debug("Worker %d stopped", id)
return
case <-p.ctx.Done():
log.Debug("Worker %d pool closed", id)
return
case task, ok := <-p.taskQueue:
if !ok {
log.Debug("Worker %d: task queue closed", id)
return
}
// 执行任务
result, err := task.Handler(task.Msg)
// 发送结果
task.Result <- &TaskResult{
Data: result,
Error: err,
}
close(task.Result)
}
}
}
// 提交任务
func (p *WorkerPool) Submit(task *MessageTask) error {
// 当队列已满时,等待并重试入队,直到成功或 pool 关闭
for {
select {
case <-p.ctx.Done():
return fmt.Errorf("worker pool is closed")
case p.taskQueue <- task:
// 如果队列接近满时,尝试扩容少量 worker
queued := len(p.taskQueue)
capQ := cap(p.taskQueue)
if capQ > 0 && queued > capQ*3/4 {
go func() {
p.mu.Lock()
deficit := p.maxWorkers - p.workers
p.mu.Unlock()
if deficit > 0 {
// 最多扩容 2 个以避免剧烈波动
add := 2
if deficit < add {
add = deficit
}
p.ScaleUp(add)
}
}()
}
return nil
default:
// 队列已满,短暂等待后重试
select {
case <-p.ctx.Done():
return fmt.Errorf("worker pool is closed")
case <-time.After(50 * time.Millisecond):
// 重试
}
}
}
}
// 关闭 Worker Pool
func (p *WorkerPool) Shutdown() {
log.Debug("Shutting down worker pool...")
// 停止监控
if p.monitorTick != nil {
p.monitorTick.Stop()
}
// 取消根 context会让所有子 worker 退出
p.cancel()
// 关闭任务通道,释放阻塞的接收者
close(p.taskQueue)
// 等待所有 worker 退出
p.wg.Wait()
log.Debug("Worker pool shut down complete")
}
// ScaleUp 按数量增加 worker
func (p *WorkerPool) ScaleUp(n int) {
if n <= 0 {
return
}
p.mu.Lock()
can := p.maxWorkers - p.workers
if can <= 0 {
p.mu.Unlock()
return
}
if n > can {
n = can
}
p.mu.Unlock()
for i := 0; i < n; i++ {
p.spawnWorker()
}
}
// ScaleDown 按数量减少 worker
func (p *WorkerPool) ScaleDown(n int) {
if n <= 0 {
return
}
p.mu.Lock()
for i := 0; i < n && len(p.workerCancels) > 0 && p.workers > p.minWorkers; i++ {
last := len(p.workerCancels) - 1
cancel := p.workerCancels[last]
// 从 slice 中移除
p.workerCancels = p.workerCancels[:last]
p.workers--
// 调用 cancel 让对应 worker 退出
cancel()
}
p.mu.Unlock()
}
// monitor 周期性检查队列长度并做扩缩容
func (p *WorkerPool) monitor() {
for {
select {
case <-p.ctx.Done():
return
case <-p.monitorTick.C:
queued := len(p.taskQueue)
capQ := cap(p.taskQueue)
if capQ == 0 {
continue
}
// 扩容条件
if queued > capQ*3/4 {
p.mu.Lock()
deficit := p.maxWorkers - p.workers
p.mu.Unlock()
if deficit > 0 {
add := 1
if deficit >= 2 {
add = 2
}
p.ScaleUp(add)
}
}
// 缩容条件
if queued < capQ/4 {
p.mu.Lock()
extra := p.workers - p.minWorkers
p.mu.Unlock()
if extra > 0 {
// 每次缩一个,避免抖动
p.ScaleDown(1)
}
}
}
}
}
// ==================== 中间件实现 ====================
// 日志中间件
func LoggingMiddleware() MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
start := time.Now()
log.Debug("[Middleware] Processing message : %v, time: %v", message, start)
result, err := next(message)
duration := time.Since(start)
if err != nil {
log.Error("[Middleware] Message : %v failed, duration: %v, error: %v", message, duration, err)
} else {
log.Debug("[Middleware] Message : %v success, duration: %v", message, duration)
}
return result, err
}
}
}
// 恢复 Panic 中间件
func RecoveryMiddleware() MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (result interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Error("[Middleware] Panic recovered: %v\nStack: %s", r, debug.Stack())
err = fmt.Errorf("panic recovered: %v", r)
}
}()
return next(message)
}
}
}
// 超时中间件
func TimeoutMiddleware(timeout time.Duration) MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
resultChan := make(chan *TaskResult, 1)
go func() {
result, err := next(message)
resultChan <- &TaskResult{Data: result, Error: err}
}()
select {
case result := <-resultChan:
return result.Data, result.Error
case <-time.After(timeout):
log.Error("[Middleware] Message : %v timeout after %v", message, timeout)
return nil, fmt.Errorf("message handler timeout")
}
}
}
}
// 重试中间件
func RetryMiddleware(maxRetries int) MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
var result interface{}
var err error
for i := 0; i <= maxRetries; i++ {
result, err = next(message)
if err == nil {
return result, nil
}
if i < maxRetries {
log.Debug("[Middleware] Retry %d/%d for message : %v, error: %v", i+1, maxRetries, message, err)
time.Sleep(time.Millisecond * 100 * time.Duration(i+1))
}
}
return result, fmt.Errorf("failed after %d retries: %w", maxRetries, err)
}
}
}
// 验证中间件
func ValidationMiddleware() MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
// 添加消息验证逻辑
if message == nil {
return nil, fmt.Errorf("message is nil")
}
if message.Type <= 0 {
return nil, fmt.Errorf("invalid message type: %d", message.Type)
}
return next(message)
}
}
}
// ----------------------------------- 发送消息相关函数 ---------------------------
func SendMsgToCenterAsync(m *msg.Msg) {
go sendMessageAsync(m, conf.Server.CenterNode)
}
func SendMsgToCenterSync(m *msg.Msg) (*msg.Msg, error) {
return sendMessageSync(m, conf.Server.CenterNode)
}
func SendMsgToNodeAsync(m *msg.Msg, node int) {
go sendMessageAsync(m, node)
}
func SendMsgToNodeSync(m *msg.Msg, node int) (*msg.Msg, error) {
return sendMessageSync(m, node)
}
func SendPlayerMsgAsync(m *msg.Msg) error {
if m.SendT == 0 {
m.SendT = GoUtil.Now()
}
clone := m.Clone()
clone.HandleType = msg.HANDLE_MOD_PLAYER_MSG
SendMsgToCenterAsync(clone)
return nil
}
func SendPlayerMsgSync(m *msg.Msg) (interface{}, error) {
clone := m.Clone()
clone.HandleType = msg.HANDLE_MOD_PLAYER_MSG
return SendMsgToCenterSync(clone)
}
func FriendMgrSend(m1 *msg.Msg) error {
SendPlayerMsgAsync(m1)
return nil
}
// 异步发送消息到指定节点 节点不在线则保存消息
func sendMessageAsync(m *msg.Msg, node int) error {
err := mergeCluster.SendServerMsg(m, node)
if err != nil && GoUtil.InArray(m.HandleType, save_msg_type) {
saveMessage(m)
return err
}
deleteMessage(m)
return nil
}
// 同步消息到指定节点 节点不在线则保存消息
func sendMessageSync(m *msg.Msg, node int) (*msg.Msg, error) {
msg, err := mergeCluster.CallServerMsg(m, node)
if err != nil && conf.Server.ServerType == "center" && GoUtil.InArray(m.HandleType, save_msg_type) {
saveMessage(m)
return nil, err
}
deleteMessage(m)
return msg, nil
}
// 保存消息到本地
func saveMessage(m *msg.Msg) error {
data := getMessageData()
data.mu.Lock()
defer data.mu.Unlock()
messages := getMessge(int64(m.To))
messages.mu.Lock()
defer messages.mu.Unlock()
messages.Messages = append(messages.Messages, m)
return nil
}
func GetUserData(PlayerId int64, Key string) (*msg.Msg, error) {
return SendMsgToCenterSync(&msg.Msg{
From: int(PlayerId),
HandleType: msg.HANDLE_MOD_USER_VAR_GET,
Extra: msg.VarData{Key: Key},
})
}
func getMessge(PlayerId int64) *MessageList {
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
defer messageMgrData.mu.Unlock()
if _, ok := messageMgrData.MessageList[int64(PlayerId)]; !ok {
messageMgrData.MessageList[int64(PlayerId)] = &MessageList{
Messages: []*msg.Msg{},
}
}
return messageMgrData.MessageList[int64(PlayerId)]
}
func deleteMessage(m *msg.Msg) error {
messages := getMessge(int64(m.To))
messages.mu.Lock()
defer messages.mu.Unlock()
for i, msgItem := range messages.Messages {
if msgItem.UniKey == m.UniKey {
// 删除消息
messages.Messages = append(messages.Messages[:i], messages.Messages[i+1:]...)
log.Debug("[Middleware] send message success; message: %v, player id: %v", msgItem, msgItem.From)
break
}
}
return nil
}