pet_home_server/src/server/game/message_mgr.go
2026-03-09 12:21:22 +08:00

1007 lines
28 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package game
import (
"context"
"fmt"
"runtime/debug"
mergeCluster "server/cluster"
"server/conf"
"server/game/mod/msg"
GoUtil "server/game_util"
"server/pkg/github.com/name5566/leaf/log"
"sync"
"time"
)
var id = 1
// 中间件函数类型
type MessageMiddleware func(MessageHandlerFunc) MessageHandlerFunc
var save_msg_type = []int{
msg.HANDLE_MOD_PLAYER_MSG,
msg.HANDLE_MDO_CHAMPSHIP_INRANK,
msg.HANDLE_MOD_USER_VAR_SET,
}
var notify_msg_type = []int{
msg.HANDLE_TYPE_CHAMPSHIP_GROUP, //锦标赛分组操作
msg.HANDLE_TYPE_CHAMPSHIP_INRANK, //锦标赛入榜操作
msg.HANDLE_TYPE_CHAMPSHIP_AI, //锦标赛入榜操作
msg.HANDLE_TYPE_CHAMPSHIP_NOTIFY, //锦标赛排名变动通知
msg.HANDLE_TYPE_CHAMPSHIP_ZERO, //锦标赛0点更新
msg.HANDLE_TYPE_CHAMPSHIP_NOTIFY2, //锦标赛0.30点通知
}
type MessageMgr struct {
*ServerMod
middlewares []MessageMiddleware
workerPool *WorkerPool
handler map[int]MessageHandlerFunc
}
type MessageData struct {
MessageList map[int64]*MessageList
PlayerList map[int64]int
mu sync.Mutex
}
type MessageList struct {
Messages []*msg.Msg
mu sync.Mutex
}
// Worker Pool 结构
type WorkerPool struct {
workers int
minWorkers int
maxWorkers int
taskQueue chan *MessageTask
wg sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
maxQueue int
mu sync.Mutex
workerCancels []context.CancelFunc
monitorTick *time.Ticker
}
// 消息任务
type MessageTask struct {
Msg *msg.Msg
Handler MessageHandlerFunc
Result chan *TaskResult
id int
}
// 任务结果
type TaskResult struct {
Data interface{}
Error error
}
func (m *MessageMgr) MessageMgrInit() {
m.key = MESSAGE_MGR_KEY + fmt.Sprintf("_%d", conf.Server.ServerID)
m.data = &MessageData{
MessageList: make(map[int64]*MessageList),
PlayerList: make(map[int64]int),
}
// 注册处理函数
m.init()
m.handler = make(map[int]MessageHandlerFunc)
m.middlewares = []MessageMiddleware{}
// 初始化 Worker Pool (10个worker, 1000个队列大小)
m.workerPool = NewWorkerPool(50, 10000)
// 注册默认中间件
m.Use(RecoveryMiddleware())
m.Use(LoggingMiddleware())
m.Use(TimeoutMiddleware(5 * time.Second))
m.NodeRegister()
m.CenterRegister()
FixBug()
}
func FixBug() {
// messageMgrData := getMessageData()
// // 先更新 PlayerList需要加锁
// messageMgrData.mu.Lock()
// defer messageMgrData.mu.Unlock()
// now := GoUtil.Now()
// for k, v := range messageMgrData.MessageList {
// if k < 100000 {
// delete(messageMgrData.MessageList, k)
// continue
// }
// isLose := CheckPlayerLose(int(k))
// // 反向遍历以安全删除元素
// for i := len(v.Messages) - 1; i >= 0; i-- {
// if v.Messages[i].Type == msg.HANDLE_TYPE_CHAMPSHIP_NOTIFY || (v.Messages[i].End != 0 && v.Messages[i].End < now) {
// // 删除消息
// v.Messages = append(v.Messages[:i], v.Messages[i+1:]...)
// }
// if isLose && v.Messages[i].Type == msg.HANDLE_TYPE_CHAMPSHIP_RESULT {
// // 删除消息
// v.Messages = append(v.Messages[:i], v.Messages[i+1:]...)
// }
// }
// if len(v.Messages) == 0 {
// delete(messageMgrData.MessageList, k)
// }
// }
}
// 注册处理器
func (s *MessageMgr) RegisterHandler(HandlerType int, fun MessageHandlerFunc) {
s.handler[HandlerType] = fun
}
func (m *MessageMgr) NodeRegister() {
if conf.Server.ServerType == "node" {
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_MSG, MessageHandlerFunc(PlayerMsgHandler))
m.RegisterHandler(msg.HANDLE_MOD_REPLY_PLAYER_MSG, MessageHandlerFunc(PlayerReplyMsgHandler))
m.RegisterHandler(msg.HANDLE_MOD_CLUSTER_SYNC, MessageHandlerFunc(ClusterSyncHandler))
}
}
func (m *MessageMgr) CenterRegister() {
if conf.Server.ServerType == "center" {
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_LOGIN, MessageHandlerFunc(PlayerLoginHandler))
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_LOGOUT, MessageHandlerFunc(PlayerLogoutHandler))
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_MSG, MessageHandlerFunc(CenterPlayerMsgHandler))
m.RegisterHandler(msg.HANDLE_MOD_COMSUME_MSG, MessageHandlerFunc(ComsumerMsgHandler))
m.RegisterHandler(msg.HANDLE_MOD_VAR_SET, MessageHandlerFunc(SetVarDataHandler))
m.RegisterHandler(msg.HANDLE_MOD_VAR_GET, MessageHandlerFunc(GetVarDataHandler))
m.RegisterHandler(msg.HANDLE_MOD_USER_VAR_SET, MessageHandlerFunc(SetUserVarDataHandler))
m.RegisterHandler(msg.HANDLE_MOD_USER_VAR_GET, MessageHandlerFunc(GetUserVarDataHandler))
m.RegisterHandler(msg.HANDLE_MOD_CATNIP_PARTNER, MessageHandlerFunc(CatnipPartnerHandler))
m.RegisterHandler(msg.HANDLE_MDO_CHAMPSHIP_INRANK, MessageHandlerFunc(ChampshipInRankHandler))
m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_RANK_INFO, MessageHandlerFunc(ChampshipRankInfoHandler))
m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_RANK_LIST, MessageHandlerFunc(ChampshipRankListHandler))
m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_PRE_RANK, MessageHandlerFunc(ChampshipRankPreHandler))
m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_GROUP, MessageHandlerFunc(ChampshipGroupHandler))
}
}
func getMessageData() *MessageData {
return G_GameLogicPtr.MessageMgr.data.(*MessageData)
}
// ----------------------------------- 处理函数实现 ---------------------------
func ChampshipRankPreHandler(data *msg.Msg) (interface{}, error) {
PlayerId := data.From
PreRankMsg := G_GameLogicPtr.ChampshipMgr.GetPreRankMsg(PlayerId)
ReplyPlayerMsgASync(data, PreRankMsg)
return nil, nil
}
func ChampshipRankListHandler(data *msg.Msg) (interface{}, error) {
PlayerId := data.From
RankMsg := G_GameLogicPtr.ChampshipMgr.GetRankMsg(PlayerId)
ReplyPlayerMsgASync(data, RankMsg)
return nil, nil
}
func ChampshipRankInfoHandler(data *msg.Msg) (interface{}, error) {
PlayerId := data.From
MyRank := G_GameLogicPtr.ChampshipMgr.getMyRank(PlayerId)
MyPreRank := G_GameLogicPtr.ChampshipMgr.getLastMyRank(PlayerId)
ReplyPlayerMsgASync(data, []int{MyRank, MyPreRank})
return nil, nil
}
func NotifyAllPlayerMsg(m *msg.Msg) {
messageMgrData := getMessageData()
// 先复制 PlayerList避免长时间持有锁
messageMgrData.mu.Lock()
playerListCopy := make(map[int64]int, len(messageMgrData.PlayerList))
for k, v := range messageMgrData.PlayerList {
playerListCopy[k] = v
}
messageMgrData.mu.Unlock()
// 在锁外发送消息
for PlayerId, node := range playerListCopy {
copym := m.Clone()
copym.To = int(PlayerId)
SendMsgToNodeAsync(copym, node)
}
}
func ChampshipGroupHandler(data *msg.Msg) (interface{}, error) {
G_GameLogicPtr.ChampshipMgr.group(true)
return nil, nil
}
func ChampshipInRankHandler(data *msg.Msg) (interface{}, error) {
G_GameLogicPtr.ChampshipMgr.inRank(data)
return nil, nil
}
func CatnipPartnerHandler(data *msg.Msg) (interface{}, error) {
m, ok := data.Extra.(*CatnipPartner)
if !ok {
return nil, fmt.Errorf("invalid catnip partner data")
}
return G_GameLogicPtr.VarMgr.HandleCatnipPartner(m.Uid, m.Partner, m.GameId, m.EndTime)
}
func ReplyPlayerMsgASync(m *msg.Msg, reply interface{}) (interface{}, error) {
clone := m.Reply(reply)
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
node, ok := messageMgrData.PlayerList[int64(m.From)]
messageMgrData.mu.Unlock()
if ok {
SendMsgToNodeAsync(clone, node)
}
return nil, nil
}
// 节点连接时,同步消息
func ClusterSyncHandler(data *msg.Msg) (interface{}, error) {
// 遍历所有玩家,发送登录消息
G_GameLogicPtr.M_Players.Range(func(k, v interface{}) bool {
SendMsgToCenterAsync(&msg.Msg{
From: int(v.(*Player).M_DwUin),
HandleType: msg.HANDLE_MOD_PLAYER_LOGIN,
Extra: conf.Server.ServerID,
})
return true
})
// 发送暂存区消息(先复制再释放锁,避免长时间持有锁)
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
TempMessageList := messageMgrData.MessageList
messageMgrData.MessageList = make(map[int64]*MessageList)
messageMgrData.mu.Unlock() // 立即释放锁,在锁外发送消息
log.Debug("[Middleware] Cluster sync send temp message len: %d", len(TempMessageList))
for _, Message := range TempMessageList {
for _, msgItem := range Message.Messages {
SendMsgToCenterAsync(msgItem)
}
}
return nil, nil
}
func PlayerLoginHandler(data *msg.Msg) (interface{}, error) {
// 关闭 Worker Pool
node := data.Extra.(int)
messageMgrData := getMessageData()
// 先更新 PlayerList需要加锁
messageMgrData.mu.Lock()
messageMgrData.PlayerList[int64(data.From)] = node
messageMgrData.mu.Unlock()
log.Debug("[Middleware] Player login success player id: %v, node: %v", data.From, data.Extra.(int))
// 对玩家消息列表加锁
messages := getMessge(int64(data.From))
messages.mu.Lock()
// 复制消息列表,避免在锁内发送消息
// 过滤过期消息
now := GoUtil.Now()
validMessages := make([]*msg.Msg, 0, len(messages.Messages))
for _, message := range messages.Messages {
if message.End == 0 || message.End >= now {
validMessages = append(validMessages, message)
}
}
messages.Messages = validMessages
messagesToSend := make([]*msg.Msg, len(messages.Messages))
copy(messagesToSend, messages.Messages)
messages.mu.Unlock()
ReplyPlayerMsgASync(data, nil)
// 在锁外发送离线消息
var applyUids []int64
var otherUids []int64
for _, message := range messagesToSend {
if message.Type == msg.HANDLE_TYPE_APPLY {
applyUids = append(applyUids, int64(message.From))
} else {
otherUids = append(otherUids, int64(message.From))
}
}
for _, message := range messagesToSend {
if message.Type == msg.HANDLE_TYPE_APPLY && len(applyUids) >= 3 {
message.H = msg.MSG_TYPE_OFFLINE // 标记为离线消息
}
if message.Type != msg.HANDLE_TYPE_APPLY && len(otherUids) >= 3 {
message.H = msg.MSG_TYPE_OFFLINE // 标记为离线消息
}
SendMsgToNodeAsync(message, node)
}
applyUidsFive := applyUids
if len(applyUids) > 5 {
applyUidsFive = applyUids[len(applyUids)-5:]
}
otherUidsFive := otherUids
if len(otherUids) > 5 {
otherUidsFive = otherUids[len(otherUids)-5:]
}
SendMsgToNodeAsync(&msg.Msg{
From: data.From,
To: data.From,
Type: msg.SERVER_PLAYER_SYNC_LOGOUT_MSG,
HandleType: msg.HANDLE_MOD_PLAYER_MSG,
Extra: map[string]interface{}{
"apply_uids": applyUidsFive,
"apply_count": len(applyUids),
"other_uids": otherUidsFive,
"other_count": len(otherUids),
},
}, node) // 发送离线消息处理完成通知
log.Debug("[Middleware] Player sync logout message player id: %v, len: %d", data.From, len(messagesToSend))
return nil, nil
}
func PlayerLogoutHandler(data *msg.Msg) (interface{}, error) {
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
delete(messageMgrData.PlayerList, int64(data.From))
messageMgrData.mu.Unlock()
log.Debug("[Middleware] Player logout success player id: %v", data.From)
return nil, nil
}
func ComsumerMsgHandler(data *msg.Msg) (interface{}, error) {
messages := getMessge(int64(data.From))
messages.mu.Lock()
defer messages.mu.Unlock()
for i, msgItem := range messages.Messages {
if msgItem == nil {
continue
}
if msgItem.UniKey == data.UniKey {
// 删除消息
messages.Messages = append(messages.Messages[:i], messages.Messages[i+1:]...)
log.Debug("[Middleware] Comsume message success type: %d, player id: %v", msgItem.Type, msgItem.From)
break
}
}
return nil, nil
}
func NotifyPlayerMsgAsync(m *msg.Msg) {
messageMgrData := getMessageData()
// 检查玩家是否在线(需要加锁)
messageMgrData.mu.Lock()
node, ok := messageMgrData.PlayerList[int64(m.To)]
messageMgrData.mu.Unlock()
// 在线则直接发送消息
if ok {
SendMsgToNodeAsync(m, node)
}
}
// 处理玩家消息
func CenterPlayerMsgHandler(data *msg.Msg) (interface{}, error) {
PlayerId := int64(data.To)
messageMgrData := getMessageData()
// 遍历消息列表,发送消息给在线玩家
// 检查玩家是否在线(需要加锁)
messageMgrData.mu.Lock()
node, ok := messageMgrData.PlayerList[int64(PlayerId)]
messageMgrData.mu.Unlock()
// 在线则直接发送消息
if ok {
SendMsgToNodeAsync(data, node)
} else {
// 不在线则存储消息
applycount := 0
if !GoUtil.InArray(data.Type, notify_msg_type) {
messages := getMessge(PlayerId)
messages.mu.Lock()
messages.Messages = append(messages.Messages, data)
for _, msgItem := range messages.Messages {
if msgItem == nil {
continue
}
if msgItem.Type == msg.HANDLE_TYPE_APPLY {
applycount++
}
}
if applycount > 1 && data.Type == msg.HANDLE_TYPE_APPLY {
go NotifyFriendApply(data.To, data.From)
}
messages.mu.Unlock()
}
}
return nil, nil
}
func PlayerMsgHandler(data *msg.Msg) (interface{}, error) {
p := G_GameLogicPtr.GetPlayer(int64(data.To))
// 不在线 不处理
if p == nil || p.stop {
return nil, nil
}
go p.Send(data.Clone())
// 处理完后发送消费消息
if data.HandleType == msg.HANDLE_MOD_PLAYER_MSG {
data.HandleType = msg.HANDLE_MOD_COMSUME_MSG
SendMsgToCenterAsync(data)
}
return nil, nil
}
func PlayerReplyMsgHandler(data *msg.Msg) (interface{}, error) {
// 先处理同步回调
if data.UniKey != "" {
mergeCluster.GetCallbackChanMu().RLock()
chanel, ok := mergeCluster.CallbackChan[data.UniKey]
mergeCluster.GetCallbackChanMu().RUnlock()
if ok {
// log.Debug("reply message ")
chanel <- data
}
}
return nil, nil
}
// 添加中间件
func (m *MessageMgr) Use(middleware MessageMiddleware) {
m.middlewares = append(m.middlewares, middleware)
}
// 应用所有中间件到处理函数
func (m *MessageMgr) applyMiddlewares(handler MessageHandlerFunc) MessageHandlerFunc {
// 从后往前应用中间件
for i := len(m.middlewares) - 1; i >= 0; i-- {
handler = m.middlewares[i](handler)
}
return handler
}
type MessageHandlerFunc func(message *msg.Msg) (interface{}, error)
func (m *MessageMgr) RegisterMessageHandler(hType int, handler MessageHandlerFunc) {
m.RegisterHandler(hType, handler)
}
func (m *MessageMgr) Handle(msg *msg.Msg) (interface{}, error) {
if fun, ok := m.handler[msg.Type]; ok {
return fun(msg)
}
log.Error("server mod key:%s handle not exist handle type:%d", m.key, msg.Type)
return nil, fmt.Errorf("server mod handler err")
}
// 异步处理消息 (多线程版本)
func (m *MessageMgr) MessageHandleAsync(message *msg.Msg) error {
if message.End != 0 && message.End < GoUtil.Now() {
log.Debug("message had expired type:%d,to:%d", message.Type, message.To)
return nil
}
if fun, ok := m.handler[message.HandleType]; ok {
// 应用中间件
handlerWithMiddleware := m.applyMiddlewares(fun)
id++
// 创建任务
task := &MessageTask{
id: id,
Msg: message,
Handler: handlerWithMiddleware,
Result: make(chan *TaskResult, 1),
}
// 提交到 Worker Pool
if err := m.workerPool.Submit(task); err != nil {
log.Error("Failed to submit message task: %v", err)
return err
}
// 可以选择等待结果或直接返回
go func() {
result := <-task.Result
if result.Error != nil {
log.Error("Message handle error: %v", result.Error)
}
}()
return nil
}
log.Error("server mod key:%s handle not exist handle type:%d", m.key, message.Type)
return fmt.Errorf("server mod handler err")
}
// 兼容旧版本的函数
func MessageHandle(m *msg.Msg) error {
// log.Debug("RecvMessage m %v", m)
// 这里可以调用 MessageMgr 的处理方法
G_GameLogicPtr.MessageMgr.MessageHandleAsync(m)
return nil
}
// ==================== Worker Pool 实现 ====================
// 创建 Worker Pool
func NewWorkerPool(workers, maxQueue int) *WorkerPool {
ctx, cancel := context.WithCancel(context.Background())
if workers <= 0 {
workers = 1
}
pool := &WorkerPool{
workers: 0,
minWorkers: workers,
maxWorkers: workers * 40,
taskQueue: make(chan *MessageTask, maxQueue),
ctx: ctx,
cancel: cancel,
maxQueue: maxQueue,
}
pool.start()
// 启动监控协程,负责动态扩缩容
pool.monitorTick = time.NewTicker(3000 * time.Millisecond)
go pool.monitor()
return pool
}
// 启动 Worker Pool
func (p *WorkerPool) start() {
// 启动最小数量的 worker
for i := 0; i < p.minWorkers; i++ {
p.spawnWorker()
}
}
// spawnWorker 启动一个 worker使用独立的 cancelable context
func (p *WorkerPool) spawnWorker() {
p.mu.Lock()
defer p.mu.Unlock()
childCtx, childCancel := context.WithCancel(p.ctx)
p.workerCancels = append(p.workerCancels, childCancel)
id := len(p.workerCancels)
p.wg.Add(1)
p.workers++
go p.worker(childCtx, id)
}
// Worker 工作函数,监听其子上下文以便单独停止
func (p *WorkerPool) worker(ctx context.Context, id int) {
defer p.wg.Done()
for {
select {
case <-ctx.Done():
log.Debug("Worker %d stopped", id)
return
case <-p.ctx.Done():
log.Debug("Worker %d pool closed", id)
return
case task, ok := <-p.taskQueue:
if !ok {
log.Debug("Worker %d: task queue closed", id)
return
}
// 执行任务
result, err := task.Handler(task.Msg)
// 发送结果
task.Result <- &TaskResult{
Data: result,
Error: err,
}
close(task.Result)
}
}
}
// 提交任务
func (p *WorkerPool) Submit(task *MessageTask) error {
// 当队列已满时,等待并重试入队,直到成功或 pool 关闭
for {
select {
case <-p.ctx.Done():
return fmt.Errorf("worker pool is closed")
case p.taskQueue <- task:
// 如果队列接近满时,尝试扩容少量 worker
queued := len(p.taskQueue)
capQ := cap(p.taskQueue)
if capQ > 0 && queued > capQ*3/4 {
go func() {
p.mu.Lock()
deficit := p.maxWorkers - p.workers
p.mu.Unlock()
if deficit > 0 {
// 最多扩容 2 个以避免剧烈波动
add := 2
if deficit < add {
add = deficit
}
p.ScaleUp(add)
}
}()
}
return nil
default:
// 队列已满,短暂等待后重试
select {
case <-p.ctx.Done():
return fmt.Errorf("worker pool is closed")
case <-time.After(50 * time.Millisecond):
// 重试
}
}
}
}
// 关闭 Worker Pool
func (p *WorkerPool) Shutdown() {
log.Debug("Shutting down worker pool...")
// 停止监控
if p.monitorTick != nil {
p.monitorTick.Stop()
}
// 取消根 context会让所有子 worker 退出
p.cancel()
// 关闭任务通道,释放阻塞的接收者
close(p.taskQueue)
// 等待所有 worker 退出
p.wg.Wait()
log.Debug("Worker pool shut down complete")
}
// ScaleUp 按数量增加 worker
func (p *WorkerPool) ScaleUp(n int) {
if n <= 0 {
return
}
p.mu.Lock()
can := p.maxWorkers - p.workers
if can <= 0 {
p.mu.Unlock()
return
}
if n > can {
n = can
}
p.mu.Unlock()
for i := 0; i < n; i++ {
p.spawnWorker()
}
}
// ScaleDown 按数量减少 worker
func (p *WorkerPool) ScaleDown(n int) {
if n <= 0 {
return
}
p.mu.Lock()
for i := 0; i < n && len(p.workerCancels) > 0 && p.workers > p.minWorkers; i++ {
last := len(p.workerCancels) - 1
cancel := p.workerCancels[last]
// 从 slice 中移除
p.workerCancels = p.workerCancels[:last]
p.workers--
// 调用 cancel 让对应 worker 退出
cancel()
}
p.mu.Unlock()
}
// monitor 周期性检查队列长度并做扩缩容
func (p *WorkerPool) monitor() {
for {
select {
case <-p.ctx.Done():
return
case <-p.monitorTick.C:
queued := len(p.taskQueue)
capQ := cap(p.taskQueue)
if capQ == 0 {
continue
}
// 扩容条件
if queued > capQ*3/4 {
p.mu.Lock()
deficit := p.maxWorkers - p.workers
p.mu.Unlock()
if deficit > 0 {
add := 1
if deficit >= 2 {
add = 2
}
p.ScaleUp(add)
}
}
// 缩容条件
if queued < capQ/4 {
p.mu.Lock()
extra := p.workers - p.minWorkers
p.mu.Unlock()
if extra > 0 {
// 每次缩一个,避免抖动
p.ScaleDown(1)
}
}
}
}
}
// ==================== 中间件实现 ====================
// 日志中间件
func LoggingMiddleware() MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
start := time.Now()
// log.Debug("[Middleware] Processing message : %v, time: %v", message, start)
result, err := next(message)
duration := time.Since(start)
if err != nil {
log.Error("[Middleware] Message handle error! message: %v; duration: %v; error: %v", message, duration, err)
} else {
log.Debug("[Middleware] Message handle success! message: %v; duration: %v", message, duration)
}
return result, err
}
}
}
// 恢复 Panic 中间件
func RecoveryMiddleware() MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (result interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Error("[Middleware] Panic recovered: %v\nStack: %s", r, debug.Stack())
err = fmt.Errorf("panic recovered: %v", r)
}
}()
return next(message)
}
}
}
// 超时中间件
func TimeoutMiddleware(timeout time.Duration) MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
resultChan := make(chan *TaskResult, 1)
go func() {
result, err := next(message)
resultChan <- &TaskResult{Data: result, Error: err}
}()
select {
case result := <-resultChan:
return result.Data, result.Error
case <-time.After(timeout):
log.Error("[Middleware] Message : %v timeout after %v", message, timeout)
GoUtil.SendFeishuFatal(0, "message_mgr", fmt.Sprintf("Message Handler Timeout\nMessage: %v\nTimeout: %v", message, timeout))
return nil, fmt.Errorf("message handler timeout")
}
}
}
}
// 重试中间件
func RetryMiddleware(maxRetries int) MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
var result interface{}
var err error
for i := 0; i <= maxRetries; i++ {
result, err = next(message)
if err == nil {
return result, nil
}
if i < maxRetries {
log.Debug("[Middleware] Retry %d/%d for message : %v, error: %v", i+1, maxRetries, message, err)
time.Sleep(time.Millisecond * 100 * time.Duration(i+1))
}
}
return result, fmt.Errorf("failed after %d retries: %w", maxRetries, err)
}
}
}
// 验证中间件
func ValidationMiddleware() MessageMiddleware {
return func(next MessageHandlerFunc) MessageHandlerFunc {
return func(message *msg.Msg) (interface{}, error) {
// 添加消息验证逻辑
if message == nil {
return nil, fmt.Errorf("message is nil")
}
if message.Type <= 0 {
return nil, fmt.Errorf("invalid message type: %d", message.Type)
}
return next(message)
}
}
}
// ----------------------------------- 发送消息相关函数 ---------------------------
func SendMsgToCenterAsync(m *msg.Msg) {
go sendMessageAsync(m, conf.Server.CenterNode)
}
func SendMsgToCenterSync(m *msg.Msg) (*msg.Msg, error) {
return sendMessageSync(m, conf.Server.CenterNode)
}
func SendMsgToNodeAsync(m *msg.Msg, node int) {
go sendMessageAsync(m, node)
}
func SendMsgToNodeSync(m *msg.Msg, node int) (*msg.Msg, error) {
return sendMessageSync(m, node)
}
func SendPlayerMsgAsync(m *msg.Msg) error {
if m.SendT == 0 {
m.SendT = GoUtil.Now()
}
clone := m.Clone()
clone.HandleType = msg.HANDLE_MOD_PLAYER_MSG
SendMsgToCenterAsync(clone)
return nil
}
func SendPlayerMsgSync(m *msg.Msg) (interface{}, error) {
clone := m.Clone()
clone.HandleType = msg.HANDLE_MOD_PLAYER_MSG
return SendMsgToCenterSync(clone)
}
func FriendMgrSend(m1 *msg.Msg) error {
if m1.SendT == 0 {
m1.SendT = GoUtil.Now()
}
SendPlayerMsgAsync(m1)
return nil
}
// 异步发送消息到指定节点 节点不在线则保存消息
func sendMessageAsync(m *msg.Msg, node int) error {
defer func() {
if r := recover(); r != nil {
log.Error("message_mgr fatal : sendMessageAsync err:%s", fmt.Sprintf("fatal : %s", r))
GoUtil.SendFeishuFatal(0, "sendMessageAsync", fmt.Sprintf("fatal : %s", r))
}
}()
log.Debug("[Middleware] Send Async message to node: %d, message: %v", node, m)
err := mergeCluster.SendServerMsg(m, node)
// 节点不在线且消息需要保存则保存消息
// 需要保存的消息类型:玩家消息,锦标赛入榜,用户变量设置
// 不需要保存的消息类型:锦标赛相关通知类消息
if err != nil && GoUtil.InArray(m.HandleType, save_msg_type) && !GoUtil.InArray(m.Type, notify_msg_type) {
saveMessage(m)
return err
}
deleteMessage(m)
return nil
}
// 同步消息到指定节点 节点不在线则保存消息
func sendMessageSync(m *msg.Msg, node int) (*msg.Msg, error) {
defer func() {
if r := recover(); r != nil {
log.Error("message_mgr fatal : sendMessageSync err:%s", fmt.Sprintf("fatal : %s", r))
GoUtil.SendFeishuFatal(0, "sendMessageSync", fmt.Sprintf("fatal : %s", r))
}
}()
log.Debug("[Middleware] Send Sync message to node: %d, message: %v", node, m)
msg, err := mergeCluster.CallServerMsg(m, node)
if err != nil && conf.Server.ServerType == "center" && GoUtil.InArray(m.HandleType, save_msg_type) {
saveMessage(m)
return nil, err
}
deleteMessage(m)
return msg, nil
}
// 保存消息到本地
func saveMessage(m *msg.Msg) error {
data := getMessageData()
data.mu.Lock()
defer data.mu.Unlock()
// 使用不加锁的内部方法,避免死锁
messages := getMessgeUnsafe(int64(m.To))
messages.mu.Lock()
defer messages.mu.Unlock()
now := GoUtil.Now()
applycount := 0
for _, msgItem := range messages.Messages {
if msgItem == nil {
continue
}
if msgItem.UniKey == m.UniKey {
// 已存在相同消息,直接返回
return nil
}
if msgItem.End != 0 && msgItem.End < now {
// 删除过期消息
messages.Messages = append(messages.Messages[:0], messages.Messages[1:]...)
}
if msgItem.Type == msg.HANDLE_TYPE_APPLY {
applycount++
}
}
if applycount > 1 && m.Type == msg.HANDLE_TYPE_APPLY {
go NotifyFriendApply(m.To, m.From)
}
// 添加消息
messages.Messages = append(messages.Messages, m)
// 设置上限为2000条超过则丢弃最早的消息
if len(messages.Messages) > 2000 {
excess := len(messages.Messages) - 2000
messages.Messages = messages.Messages[excess:]
}
return nil
}
func GetUserData(PlayerId int64, Key string) (*msg.Msg, error) {
return SendMsgToCenterSync(&msg.Msg{
From: int(PlayerId),
HandleType: msg.HANDLE_MOD_USER_VAR_GET,
Extra: msg.VarData{Key: Key},
})
}
// getMessgeUnsafe 获取消息列表(不加锁,调用者需要持有锁)
func getMessgeUnsafe(PlayerId int64) *MessageList {
messageMgrData := getMessageData()
if _, ok := messageMgrData.MessageList[int64(PlayerId)]; !ok {
messageMgrData.MessageList[int64(PlayerId)] = &MessageList{
Messages: []*msg.Msg{},
}
}
return messageMgrData.MessageList[int64(PlayerId)]
}
// getMessge 获取消息列表(加锁版本)
func getMessge(PlayerId int64) *MessageList {
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
defer messageMgrData.mu.Unlock()
return getMessgeUnsafe(PlayerId)
}
func deleteMessage(m *msg.Msg) error {
if m == nil {
return nil
}
messages := getMessge(int64(m.To))
messages.mu.Lock()
defer messages.mu.Unlock()
// 使用更安全的方式删除元素找到索引后再删除避免在range中修改切片
foundIndex := -1
for i, msgItem := range messages.Messages {
if msgItem == nil {
continue
}
if msgItem.UniKey == m.UniKey {
foundIndex = i
log.Debug("[Middleware] send message success; message: %v, player id: %v", msgItem, msgItem.From)
break
}
}
if foundIndex >= 0 {
// 删除消息:将后面的元素前移,避免内存泄漏
copy(messages.Messages[foundIndex:], messages.Messages[foundIndex+1:])
messages.Messages[len(messages.Messages)-1] = nil // 清除最后一个元素的引用
messages.Messages = messages.Messages[:len(messages.Messages)-1]
}
if len(messages.Messages) == 0 {
// 如果消息列表为空,则删除该玩家的消息列表
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
defer messageMgrData.mu.Unlock()
delete(messageMgrData.MessageList, int64(m.To))
}
return nil
}