package game

import (
	"context"
	"encoding/gob"
	"fmt"
	"runtime/debug"
	"sync"
	"time"

	mergeCluster "server/cluster"
	"server/conf"
	"server/game/mod/card"
	"server/game/mod/friend"
	"server/game/mod/item"
	limitedTimeEvent "server/game/mod/limited_time_event"
	"server/game/mod/msg"
	GoUtil "server/game_util"
	proto "server/msg"
	"server/pkg/github.com/name5566/leaf/log"
)

// id is the last task identifier handed out by nextTaskID; idMu guards it
// because MessageHandleAsync may be entered from several goroutines.
var (
	id   = 1
	idMu sync.Mutex
)

// nextTaskID increments the package-level task counter and returns the new value.
func nextTaskID() int {
	idMu.Lock()
	defer idMu.Unlock()
	id++
	return id
}

// MessageMiddleware wraps a MessageHandlerFunc with additional behaviour.
type MessageMiddleware func(MessageHandlerFunc) MessageHandlerFunc

// save_msg_type lists the handle types whose messages are stored locally when
// sending them fails (see sendMessageAsync / sendMessageSync).
var save_msg_type = []int{
	msg.HANDLE_MOD_PLAYER_MSG,
	msg.HANDLE_MDO_CHAMPSHIP_INRANK,
	msg.HANDLE_MOD_USER_VAR_SET,
}

// MessageMgr dispatches cluster messages to registered handlers through a
// middleware chain and a worker pool.
type MessageMgr struct {
	*ServerMod
	middlewares []MessageMiddleware
	workerPool  *WorkerPool
	handler     map[int]MessageHandlerFunc
}

// MessageData is the manager's state: pending messages per player and the node
// each logged-in player is registered on. mu guards both maps.
type MessageData struct {
	MessageList map[int64]*MessageList
	PlayerList  map[int64]int
	mu          sync.Mutex
}

// MessageList is one player's pending messages; mu guards Messages.
type MessageList struct {
	Messages []*msg.Msg
	mu       sync.Mutex
}

// removeByUniKey removes the first non-nil message whose UniKey equals key and
// returns it, or returns nil when nothing matches. The caller must hold l.mu.
func (l *MessageList) removeByUniKey(key string) *msg.Msg {
	for i, item := range l.Messages {
		if item == nil || item.UniKey != key {
			continue
		}
		last := len(l.Messages) - 1
		copy(l.Messages[i:], l.Messages[i+1:])
		l.Messages[last] = nil // drop the stale reference left in the tail slot
		l.Messages = l.Messages[:last]
		return item
	}
	return nil
}

// WorkerPool runs MessageTasks on a set of worker goroutines whose count is
// adjusted between minWorkers and maxWorkers.
type WorkerPool struct {
	workers       int
	minWorkers    int
	maxWorkers    int
	taskQueue     chan *MessageTask
	wg            sync.WaitGroup
	ctx           context.Context
	cancel        context.CancelFunc
	maxQueue      int
	mu            sync.Mutex
	workerCancels []context.CancelFunc
	monitorTick   *time.Ticker
}

// MessageTask is one unit of work for the pool: a message, the handler to run
// on it, and the channel the result is delivered on.
type MessageTask struct {
	Msg     *msg.Msg
	Handler MessageHandlerFunc
	Result  chan *TaskResult
	id      int
}

// TaskResult carries a handler's return values.
type TaskResult struct {
	Data  interface{}
	Error error
}

// MessageMgrInit sets the module key and data, registers the gob types that may
// travel inside messages, creates the worker pool, installs the default
// middlewares and registers the handlers for this server type.
func (m *MessageMgr) MessageMgrInit() {
	m.key = MESSAGE_MGR_KEY + fmt.Sprintf("_%d", conf.Server.ServerID)
	m.data = &MessageData{
		MessageList: make(map[int64]*MessageList),
		PlayerList:  make(map[int64]int),
	}

	// Register every type that may be carried in a message.
	gob.Register(&limitedTimeEvent.MoneyCat{})
	gob.Register(&limitedTimeEvent.LuckyCat{})
	gob.Register(&msg.HandbookMsg{})
	gob.Register(&limitedTimeEvent.CatTrick{})
	gob.Register(&VarOpration{})
	gob.Register(&VarUserData{})
	gob.Register(&ActivityInfo{})
	gob.Register(&ChargeExtra{})
	gob.Register(CatnipMsg{})
	gob.Register(&CatnipLock{})
	gob.Register(CRank{})
	gob.Register(&proto.ResChampshipRank{})
	gob.Register(&proto.ResChampshipPreRank{})
	gob.Register(card.CardInfo{})
	gob.Register(item.Item{})
	gob.Register([]*item.Item{}) // slice-of-pointer form is registered separately
	gob.Register(friend.ReplyInfo{})

	m.init()
	m.handler = make(map[int]MessageHandlerFunc)
	m.middlewares = []MessageMiddleware{}

	// Worker pool: 50 minimum workers, queue capacity 10000.
	m.workerPool = NewWorkerPool(50, 10000)

	// Default middlewares. The first one registered ends up outermost.
	m.Use(RecoveryMiddleware())
	m.Use(LoggingMiddleware())
	m.Use(TimeoutMiddleware(5 * time.Second))

	m.NodeRegister()
	m.CenterRegister()
}

// RegisterHandler binds fun to HandlerType, replacing any previous binding.
func (m *MessageMgr) RegisterHandler(HandlerType int, fun MessageHandlerFunc) {
	m.handler[HandlerType] = fun
}

// NodeRegister registers the handlers used when the server type is "node".
func (m *MessageMgr) NodeRegister() {
	if conf.Server.ServerType != "node" {
		return
	}
	m.RegisterHandler(msg.HANDLE_MOD_PLAYER_MSG, MessageHandlerFunc(PlayerMsgHandler))
	m.RegisterHandler(msg.HANDLE_MOD_REPLY_PLAYER_MSG, MessageHandlerFunc(PlayerReplyMsgHandler))
	m.RegisterHandler(msg.HANDLE_MOD_CLUSTER_SYNC, MessageHandlerFunc(ClusterSyncHandler))
}

// CenterRegister registers the handlers used when the server type is "center".
func (m *MessageMgr) CenterRegister() {
	if conf.Server.ServerType != "center" {
		return
	}
	m.RegisterHandler(msg.HANDLE_MOD_PLAYER_LOGIN, MessageHandlerFunc(PlayerLoginHandler))
	m.RegisterHandler(msg.HANDLE_MDO_PLAYER_LOGOUT, MessageHandlerFunc(PlayerLogoutHandler))
	m.RegisterHandler(msg.HANDLE_MOD_PLAYER_MSG, MessageHandlerFunc(CenterPlayerMsgHandler))
	m.RegisterHandler(msg.HANDLE_MOD_COMSUME_MSG, MessageHandlerFunc(ComsumerMsgHandler))
	m.RegisterHandler(msg.HANDLE_MOD_VAR_SET, MessageHandlerFunc(SetVarDataHandler))
	m.RegisterHandler(msg.HANDLE_MOD_VAR_GET, MessageHandlerFunc(GetVarDataHandler))
	m.RegisterHandler(msg.HANDLE_MOD_USER_VAR_SET, MessageHandlerFunc(SetUserVarDataHandler))
	m.RegisterHandler(msg.HANDLE_MOD_USER_VAR_GET, MessageHandlerFunc(GetUserVarDataHandler))
	m.RegisterHandler(msg.HANDLE_MOD_CATNIP_PARTNER, MessageHandlerFunc(CatnipPartnerHandler))
	m.RegisterHandler(msg.HANDLE_MDO_CHAMPSHIP_INRANK, MessageHandlerFunc(ChampshipInRankHandler))
	m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_RANK_INFO, MessageHandlerFunc(ChampshipRankInfoHandler))
	m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_RANK_LIST, MessageHandlerFunc(ChampshipRankListHandler))
	m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_PRE_RANK, MessageHandlerFunc(ChampshipRankPreHandler))
}

// getMessageData returns the global message manager's data as *MessageData.
func getMessageData() *MessageData {
	return G_GameLogicPtr.MessageMgr.data.(*MessageData)
}

// ----------------------------------- handler implementations ---------------------------

// ChampshipRankPreHandler replies to the sender with its previous-rank message.
func ChampshipRankPreHandler(data *msg.Msg) (interface{}, error) {
	preRank := G_GameLogicPtr.ChampshipMgr.GetPreRankMsg(data.From)
	ReplyPlayerMsgASync(data, preRank)
	return nil, nil
}

// ChampshipRankListHandler replies to the sender with its rank message.
func ChampshipRankListHandler(data *msg.Msg) (interface{}, error) {
	rank := G_GameLogicPtr.ChampshipMgr.GetRankMsg(data.From)
	ReplyPlayerMsgASync(data, rank)
	return nil, nil
}

// ChampshipRankInfoHandler replies to the sender with []int{current rank, last rank}.
func ChampshipRankInfoHandler(data *msg.Msg) (interface{}, error) {
	playerID := data.From
	myRank := G_GameLogicPtr.ChampshipMgr.getMyRank(playerID)
	myPreRank := G_GameLogicPtr.ChampshipMgr.getLastMyRank(playerID)
	ReplyPlayerMsgASync(data, []int{myRank, myPreRank})
	return nil, nil
}

// NotifyAllPlayerMsg sends a copy of m to every player in PlayerList, addressed
// to that player and routed to the player's node.
func NotifyAllPlayerMsg(m *msg.Msg) {
	store := getMessageData()

	// Snapshot the player list so the lock is not held while sending.
	store.mu.Lock()
	recipients := make(map[int64]int, len(store.PlayerList))
	for playerID, node := range store.PlayerList {
		recipients[playerID] = node
	}
	store.mu.Unlock()

	for playerID, node := range recipients {
		// Each send runs in its own goroutine, so every recipient needs its own
		// message; writing To on one shared message would race between the
		// sends and could deliver with the wrong recipient.
		out := m.Clone()
		out.To = int(playerID)
		SendMsgToNodeAsync(out, node)
	}
}

// ChampshipInRankHandler forwards the message to ChampshipMgr.inRank.
func ChampshipInRankHandler(data *msg.Msg) (interface{}, error) {
	G_GameLogicPtr.ChampshipMgr.inRank(data)
	return nil, nil
}

// CatnipPartnerHandler passes the *CatnipPartner payload to
// VarMgr.HandleCatnipPartner; it returns an error when Extra has another type.
func CatnipPartnerHandler(data *msg.Msg) (interface{}, error) {
	partner, ok := data.Extra.(*CatnipPartner)
	if !ok {
		return nil, fmt.Errorf("invalid catnip partner data")
	}
	return G_GameLogicPtr.VarMgr.HandleCatnipPartner(partner.Uid, partner.Partner, partner.GameId, partner.EndTime)
}

// ReplyPlayerMsgASync builds a reply to m and sends it asynchronously to the
// node m.From is registered on. Nothing is sent when the player is not in
// PlayerList. It always returns (nil, nil).
func ReplyPlayerMsgASync(m *msg.Msg, reply interface{}) (interface{}, error) {
	out := m.Reply(reply)

	store := getMessageData()
	store.mu.Lock()
	node, online := store.PlayerList[int64(m.From)]
	store.mu.Unlock()

	if online {
		SendMsgToNodeAsync(out, node)
	}
	return nil, nil
}

// ClusterSyncHandler runs when a node connects: it sends a login message to the
// center for every player in M_Players, then re-sends all locally stored messages.
func ClusterSyncHandler(data *msg.Msg) (interface{}, error) {
	G_GameLogicPtr.M_Players.Range(func(k, v interface{}) bool {
		SendMsgToCenterAsync(&msg.Msg{
			From:       int(v.(*Player).M_DwUin),
			HandleType: msg.HANDLE_MOD_PLAYER_LOGIN,
			Extra:      conf.Server.ServerID,
		})
		return true
	})

	// Swap the stored lists out under the lock and send outside of it.
	store := getMessageData()
	store.mu.Lock()
	pending := store.MessageList
	store.MessageList = make(map[int64]*MessageList)
	store.mu.Unlock()

	log.Debug("[Middleware] Cluster sync send temp message len: %d", len(pending))
	for _, list := range pending {
		for _, item := range list.Messages {
			SendMsgToCenterAsync(item)
		}
	}
	return nil, nil
}

// PlayerLoginHandler records which node the player logged in on (data.Extra, an
// int), sends the player's stored messages to that node and replies to the
// login. It returns an error when Extra is not an int.
func PlayerLoginHandler(data *msg.Msg) (interface{}, error) {
	node, ok := data.Extra.(int)
	if !ok {
		return nil, fmt.Errorf("invalid player login data: node id is %T, want int", data.Extra)
	}

	store := getMessageData()
	store.mu.Lock()
	store.PlayerList[int64(data.From)] = node
	store.mu.Unlock()
	log.Debug("[Middleware] Player login success player id: %v, node: %v", data.From, node)

	// Copy the pending messages under the per-player lock and send outside it.
	messages := getMessge(int64(data.From))
	messages.mu.Lock()
	pending := make([]*msg.Msg, len(messages.Messages))
	copy(pending, messages.Messages)
	messages.mu.Unlock()

	for _, item := range pending {
		SendMsgToNodeAsync(item, node)
	}
	log.Debug("[Middleware] Player sync logout message player id: %v, len: %d", data.From, len(pending))

	ReplyPlayerMsgASync(data, nil)
	return nil, nil
}

// PlayerLogoutHandler removes the player from PlayerList.
func PlayerLogoutHandler(data *msg.Msg) (interface{}, error) {
	store := getMessageData()
	store.mu.Lock()
	delete(store.PlayerList, int64(data.From))
	store.mu.Unlock()
	log.Debug("[Middleware] Player logout success player id: %v", data.From)
	return nil, nil
}

// ComsumerMsgHandler removes from data.From's pending list the first message
// whose UniKey equals data.UniKey.
func ComsumerMsgHandler(data *msg.Msg) (interface{}, error) {
	messages := getMessge(int64(data.From))
	messages.mu.Lock()
	defer messages.mu.Unlock()

	if removed := messages.removeByUniKey(data.UniKey); removed != nil {
		log.Debug("[Middleware] Comsume message success type: %d, player id: %v", removed.Type, removed.From)
	}
	return nil, nil
}

// CenterPlayerMsgHandler appends data to the target player's pending list and,
// when that player is in PlayerList, also sends it to the player's node.
func CenterPlayerMsgHandler(data *msg.Msg) (interface{}, error) {
	playerID := int64(data.To)
	store := getMessageData()

	messages := getMessge(playerID)
	messages.mu.Lock()
	messages.Messages = append(messages.Messages, data)
	messages.mu.Unlock()

	store.mu.Lock()
	node, online := store.PlayerList[playerID]
	store.mu.Unlock()

	if online {
		SendMsgToNodeAsync(data, node)
	}
	return nil, nil
}

// PlayerMsgHandler hands a clone of data to the target player and then, for
// HANDLE_MOD_PLAYER_MSG, sends data back to the center as a consume message.
// Nothing happens when the player is not found or is stopped.
func PlayerMsgHandler(data *msg.Msg) (interface{}, error) {
	p := G_GameLogicPtr.GetPlayer(int64(data.To))
	if p == nil || p.stop {
		return nil, nil
	}
	p.Send(data.Clone())

	if data.HandleType == msg.HANDLE_MOD_PLAYER_MSG {
		data.HandleType = msg.HANDLE_MOD_COMSUME_MSG
		SendMsgToCenterAsync(data)
	}
	return nil, nil
}

// PlayerReplyMsgHandler delivers a reply to the callback channel registered
// under its UniKey, if there is one.
func PlayerReplyMsgHandler(data *msg.Msg) (interface{}, error) {
	if data.UniKey == "" {
		return nil, nil
	}

	mergeCluster.GetCallbackChanMu().RLock()
	chanel, ok := mergeCluster.CallbackChan[data.UniKey]
	mergeCluster.GetCallbackChanMu().RUnlock()

	if ok {
		log.Debug("reply message ")
		// NOTE(review): this send blocks if nothing is receiving and the
		// channel has no free buffer slot — confirm against mergeCluster.
		chanel <- data
	}
	return nil, nil
}

// Use appends middleware to the chain.
func (m *MessageMgr) Use(middleware MessageMiddleware) {
	m.middlewares = append(m.middlewares, middleware)
}

// applyMiddlewares wraps handler with every registered middleware. Wrapping
// goes from last to first, so the first registered middleware is outermost.
func (m *MessageMgr) applyMiddlewares(handler MessageHandlerFunc) MessageHandlerFunc {
	for i := len(m.middlewares) - 1; i >= 0; i-- {
		handler = m.middlewares[i](handler)
	}
	return handler
}

// MessageHandlerFunc processes one message.
type MessageHandlerFunc func(message *msg.Msg) (interface{}, error)

// RegisterMessageHandler is an alias for RegisterHandler.
func (m *MessageMgr) RegisterMessageHandler(hType int, handler MessageHandlerFunc) {
	m.RegisterHandler(hType, handler)
}

// Handle runs the handler registered under msg.Type synchronously, without
// middlewares, and returns an error when none is registered.
//
// NOTE(review): the lookup key here is msg.Type while MessageHandleAsync uses
// HandleType — confirm this difference is intended.
func (m *MessageMgr) Handle(msg *msg.Msg) (interface{}, error) {
	if fun, ok := m.handler[msg.Type]; ok {
		return fun(msg)
	}
	log.Error("server mod key:%s handle not exist handle type:%d", m.key, msg.Type)
	return nil, fmt.Errorf("server mod handler err")
}

// MessageHandleAsync submits message to the worker pool, wrapped in the
// middleware chain of the handler registered under message.HandleType.
// Expired messages (End set and earlier than now) are dropped and nil is
// returned. An error is returned when no handler is registered or the pool
// rejects the task; handler errors are only logged.
func (m *MessageMgr) MessageHandleAsync(message *msg.Msg) error {
	if message.End != 0 && message.End < GoUtil.Now() {
		log.Debug("message had expired type:%d,to:%d", message.Type, message.To)
		return nil
	}

	fun, ok := m.handler[message.HandleType]
	if !ok {
		// Report the key that was actually looked up.
		log.Error("server mod key:%s handle not exist handle type:%d", m.key, message.HandleType)
		return fmt.Errorf("server mod handler err")
	}

	task := &MessageTask{
		id:      nextTaskID(),
		Msg:     message,
		Handler: m.applyMiddlewares(fun),
		Result:  make(chan *TaskResult, 1),
	}
	if err := m.workerPool.Submit(task); err != nil {
		log.Error("Failed to submit message task: %v", err)
		return err
	}

	// Log the outcome without blocking the caller.
	go func() {
		if result := <-task.Result; result.Error != nil {
			log.Error("Message handle error: %v", result.Error)
		}
	}()
	return nil
}

// MessageHandle is the legacy entry point: it forwards m to
// MessageMgr.MessageHandleAsync and always returns nil.
func MessageHandle(m *msg.Msg) error {
	log.Debug("RecvMessage m %v", m)
	// The error is dropped on purpose to keep the legacy always-nil contract;
	// MessageHandleAsync has already logged it.
	_ = G_GameLogicPtr.MessageMgr.MessageHandleAsync(m)
	return nil
}

// ==================== Worker Pool ====================

// NewWorkerPool creates a pool with a task queue of capacity maxQueue, starts
// `workers` workers (at least 1) and a monitor goroutine that rescales the pool
// every 3 seconds. The worker count may grow up to workers*40.
func NewWorkerPool(workers, maxQueue int) *WorkerPool {
	ctx, cancel := context.WithCancel(context.Background())
	if workers <= 0 {
		workers = 1
	}
	pool := &WorkerPool{
		workers:    0,
		minWorkers: workers,
		maxWorkers: workers * 40,
		taskQueue:  make(chan *MessageTask, maxQueue),
		ctx:        ctx,
		cancel:     cancel,
		maxQueue:   maxQueue,
	}
	pool.start()

	pool.monitorTick = time.NewTicker(3000 * time.Millisecond)
	go pool.monitor()
	return pool
}

// start launches the minimum number of workers.
func (p *WorkerPool) start() {
	for i := 0; i < p.minWorkers; i++ {
		p.spawnWorker()
	}
}

// spawnWorker starts one worker with its own cancelable child context.
func (p *WorkerPool) spawnWorker() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.spawnWorkerLocked()
}

// spawnWorkerLocked does the work of spawnWorker; the caller must hold p.mu.
func (p *WorkerPool) spawnWorkerLocked() {
	childCtx, childCancel := context.WithCancel(p.ctx)
	p.workerCancels = append(p.workerCancels, childCancel)
	workerID := len(p.workerCancels)
	p.wg.Add(1)
	p.workers++
	go p.worker(childCtx, workerID)
}

// worker executes queued tasks until its own context or the pool context is
// cancelled, or the queue is closed. Each task's result is sent on task.Result,
// which is then closed.
func (p *WorkerPool) worker(ctx context.Context, id int) {
	defer p.wg.Done()
	log.Debug("Worker %d started", id)
	for {
		select {
		case <-ctx.Done():
			log.Debug("Worker %d stopped", id)
			return
		case <-p.ctx.Done():
			log.Debug("Worker %d pool closed", id)
			return
		case task, ok := <-p.taskQueue:
			if !ok {
				log.Debug("Worker %d: task queue closed", id)
				return
			}
			result, err := task.Handler(task.Msg)
			task.Result <- &TaskResult{
				Data:  result,
				Error: err,
			}
			close(task.Result)
		}
	}
}

// Submit enqueues task. When the queue is full it retries every 50ms until the
// task is accepted or the pool is shut down, in which case an error is
// returned. After a successful enqueue that leaves the queue more than 3/4
// full, it requests up to two extra workers in the background.
func (p *WorkerPool) Submit(task *MessageTask) error {
	for {
		select {
		case <-p.ctx.Done():
			return fmt.Errorf("worker pool is closed")
		case p.taskQueue <- task:
			queued := len(p.taskQueue)
			capQ := cap(p.taskQueue)
			if capQ > 0 && queued > capQ*3/4 {
				// At most two at a time to avoid sharp swings; ScaleUp itself
				// enforces maxWorkers.
				go p.ScaleUp(2)
			}
			return nil
		default:
			// Queue full: wait briefly, then retry.
			select {
			case <-p.ctx.Done():
				return fmt.Errorf("worker pool is closed")
			case <-time.After(50 * time.Millisecond):
			}
		}
	}
}

// Shutdown stops the monitor ticker, cancels the pool context, waits for all
// workers to exit and fails any task still queued so that goroutines waiting on
// its Result are released.
func (p *WorkerPool) Shutdown() {
	log.Debug("Shutting down worker pool...")
	if p.monitorTick != nil {
		p.monitorTick.Stop()
	}

	// Cancelling the root context makes every worker exit. The task queue is
	// deliberately NOT closed: Submit may be sending on it concurrently, and a
	// send on a closed channel panics.
	p.cancel()
	p.wg.Wait()

	// Complete whatever is left in the queue with an error.
	for drained := false; !drained; {
		select {
		case task := <-p.taskQueue:
			task.Result <- &TaskResult{Error: fmt.Errorf("worker pool is closed")}
			close(task.Result)
		default:
			drained = true
		}
	}
	log.Debug("Worker pool shut down complete")
}

// ScaleUp adds up to n workers without exceeding maxWorkers.
func (p *WorkerPool) ScaleUp(n int) {
	if n <= 0 {
		return
	}
	// The limit check and the spawn happen under one lock acquisition so that
	// concurrent callers cannot overshoot maxWorkers together.
	p.mu.Lock()
	defer p.mu.Unlock()
	for i := 0; i < n && p.workers < p.maxWorkers; i++ {
		p.spawnWorkerLocked()
	}
}

// ScaleDown cancels up to n of the most recently started workers without
// dropping below minWorkers.
func (p *WorkerPool) ScaleDown(n int) {
	if n <= 0 {
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	for i := 0; i < n && len(p.workerCancels) > 0 && p.workers > p.minWorkers; i++ {
		last := len(p.workerCancels) - 1
		cancel := p.workerCancels[last]
		p.workerCancels = p.workerCancels[:last]
		p.workers--
		cancel()
	}
}

// monitor checks the queue length on every tick: above 3/4 full it adds up to
// two workers, below 1/4 full it removes one. It returns when the pool context
// is cancelled.
func (p *WorkerPool) monitor() {
	for {
		select {
		case <-p.ctx.Done():
			return
		case <-p.monitorTick.C:
			queued := len(p.taskQueue)
			capQ := cap(p.taskQueue)
			if capQ == 0 {
				continue
			}
			switch {
			case queued > capQ*3/4:
				p.ScaleUp(2) // capped at maxWorkers by ScaleUp
			case queued < capQ/4:
				p.ScaleDown(1) // one at a time to avoid flapping; floor is minWorkers
			}
		}
	}
}

// ==================== Middlewares ====================

// LoggingMiddleware logs each message before handling and logs the outcome and
// duration afterwards.
func LoggingMiddleware() MessageMiddleware {
	return func(next MessageHandlerFunc) MessageHandlerFunc {
		return func(message *msg.Msg) (interface{}, error) {
			start := time.Now()
			log.Debug("[Middleware] Processing message : %v, time: %v", message, start)

			result, err := next(message)

			duration := time.Since(start)
			if err != nil {
				log.Error("[Middleware] Message : %v failed, duration: %v, error: %v", message, duration, err)
			} else {
				log.Debug("[Middleware] Message : %v success, duration: %v", message, duration)
			}
			return result, err
		}
	}
}

// RecoveryMiddleware converts a panic raised by next, in the calling goroutine,
// into an error.
func RecoveryMiddleware() MessageMiddleware {
	return func(next MessageHandlerFunc) MessageHandlerFunc {
		return func(message *msg.Msg) (result interface{}, err error) {
			defer func() {
				if r := recover(); r != nil {
					log.Error("[Middleware] Panic recovered: %v\nStack: %s", r, debug.Stack())
					err = fmt.Errorf("panic recovered: %v", r)
				}
			}()
			return next(message)
		}
	}
}

// TimeoutMiddleware runs next in a separate goroutine and returns a timeout
// error (after logging and sending a Feishu alert) when it has not finished
// within timeout. The handler goroutine is not interrupted on timeout.
func TimeoutMiddleware(timeout time.Duration) MessageMiddleware {
	return func(next MessageHandlerFunc) MessageHandlerFunc {
		return func(message *msg.Msg) (interface{}, error) {
			// Buffered so the handler goroutine can always deliver and exit,
			// even after the caller has given up.
			resultChan := make(chan *TaskResult, 1)
			go func() {
				// recover only works in the goroutine that panicked, so an
				// outer RecoveryMiddleware cannot catch a panic raised here;
				// without this the whole process would crash.
				defer func() {
					if r := recover(); r != nil {
						log.Error("[Middleware] Panic recovered: %v\nStack: %s", r, debug.Stack())
						resultChan <- &TaskResult{Error: fmt.Errorf("panic recovered: %v", r)}
					}
				}()
				result, err := next(message)
				resultChan <- &TaskResult{Data: result, Error: err}
			}()

			select {
			case result := <-resultChan:
				return result.Data, result.Error
			case <-time.After(timeout):
				log.Error("[Middleware] Message : %v timeout after %v", message, timeout)
				GoUtil.SendFeishuFatal(0, "message_mgr", fmt.Sprintf("Message Handler Timeout\nMessage: %v\nTimeout: %v", message, timeout))
				return nil, fmt.Errorf("message handler timeout")
			}
		}
	}
}

// RetryMiddleware calls next up to maxRetries+1 times, sleeping 100ms*(attempt)
// between attempts, and returns the first success or a wrapped last error.
func RetryMiddleware(maxRetries int) MessageMiddleware {
	return func(next MessageHandlerFunc) MessageHandlerFunc {
		return func(message *msg.Msg) (interface{}, error) {
			var result interface{}
			var err error
			for i := 0; i <= maxRetries; i++ {
				result, err = next(message)
				if err == nil {
					return result, nil
				}
				if i < maxRetries {
					log.Debug("[Middleware] Retry %d/%d for message : %v, error: %v", i+1, maxRetries, message, err)
					time.Sleep(time.Millisecond * 100 * time.Duration(i+1))
				}
			}
			return result, fmt.Errorf("failed after %d retries: %w", maxRetries, err)
		}
	}
}

// ValidationMiddleware rejects nil messages and messages whose Type is not positive.
func ValidationMiddleware() MessageMiddleware {
	return func(next MessageHandlerFunc) MessageHandlerFunc {
		return func(message *msg.Msg) (interface{}, error) {
			if message == nil {
				return nil, fmt.Errorf("message is nil")
			}
			if message.Type <= 0 {
				return nil, fmt.Errorf("invalid message type: %d", message.Type)
			}
			return next(message)
		}
	}
}

// ----------------------------------- sending helpers ---------------------------

// SendMsgToCenterAsync sends m to the center node in a new goroutine.
func SendMsgToCenterAsync(m *msg.Msg) {
	go sendMessageAsync(m, conf.Server.CenterNode)
}

// SendMsgToCenterSync sends m to the center node and returns the reply.
func SendMsgToCenterSync(m *msg.Msg) (*msg.Msg, error) {
	return sendMessageSync(m, conf.Server.CenterNode)
}

// SendMsgToNodeAsync sends m to node in a new goroutine.
func SendMsgToNodeAsync(m *msg.Msg, node int) {
	go sendMessageAsync(m, node)
}

// SendMsgToNodeSync sends m to node and returns the reply.
func SendMsgToNodeSync(m *msg.Msg, node int) (*msg.Msg, error) {
	return sendMessageSync(m, node)
}

// SendPlayerMsgAsync stamps m.SendT when it is unset and sends a clone of m to
// the center as a HANDLE_MOD_PLAYER_MSG. It always returns nil.
func SendPlayerMsgAsync(m *msg.Msg) error {
	if m.SendT == 0 {
		m.SendT = GoUtil.Now()
	}
	out := m.Clone()
	out.HandleType = msg.HANDLE_MOD_PLAYER_MSG
	SendMsgToCenterAsync(out)
	return nil
}

// SendPlayerMsgSync sends a clone of m to the center as a HANDLE_MOD_PLAYER_MSG
// and returns the reply.
func SendPlayerMsgSync(m *msg.Msg) (interface{}, error) {
	out := m.Clone()
	out.HandleType = msg.HANDLE_MOD_PLAYER_MSG
	return SendMsgToCenterSync(out)
}

// FriendMgrSend forwards m1 through SendPlayerMsgAsync and always returns nil.
func FriendMgrSend(m1 *msg.Msg) error {
	SendPlayerMsgAsync(m1)
	return nil
}

// sendMessageAsync sends m to node. On failure the error is returned and, when
// m's handle type is in save_msg_type, m is stored locally for a later resend.
// On success any stored copy of m is removed.
func sendMessageAsync(m *msg.Msg, node int) error {
	if err := mergeCluster.SendServerMsg(m, node); err != nil {
		if GoUtil.InArray(m.HandleType, save_msg_type) {
			saveMessage(m)
		}
		// A failed send is always reported and never treated as delivered.
		return err
	}
	deleteMessage(m)
	return nil
}

// sendMessageSync sends m to node and returns the reply. On failure the error
// is returned and, on the center server, m is stored locally when its handle
// type is in save_msg_type. On success any stored copy of m is removed.
func sendMessageSync(m *msg.Msg, node int) (*msg.Msg, error) {
	reply, err := mergeCluster.CallServerMsg(m, node)
	if err != nil {
		if conf.Server.ServerType == "center" && GoUtil.InArray(m.HandleType, save_msg_type) {
			saveMessage(m)
		}
		// Always surface the failure; returning a nil error here would leave
		// the caller with no reply and no indication that the call failed.
		return nil, err
	}
	deleteMessage(m)
	return reply, nil
}

// saveMessage appends m to the pending list of m.To. It always returns nil.
func saveMessage(m *msg.Msg) error {
	data := getMessageData()
	data.mu.Lock()
	defer data.mu.Unlock()

	// data.mu is already held, so use the variant that does not lock it again.
	messages := getMessgeUnsafe(int64(m.To))
	messages.mu.Lock()
	defer messages.mu.Unlock()
	messages.Messages = append(messages.Messages, m)
	return nil
}

// GetUserData synchronously asks the center for the user variable Key of PlayerId.
func GetUserData(PlayerId int64, Key string) (*msg.Msg, error) {
	return SendMsgToCenterSync(&msg.Msg{
		From:       int(PlayerId),
		HandleType: msg.HANDLE_MOD_USER_VAR_GET,
		Extra:      msg.VarData{Key: Key},
	})
}

// getMessgeUnsafe returns PlayerId's message list, creating an empty one when
// missing. It does not lock; the caller must hold MessageData.mu.
func getMessgeUnsafe(PlayerId int64) *MessageList {
	store := getMessageData()
	list, ok := store.MessageList[PlayerId]
	if !ok {
		list = &MessageList{
			Messages: []*msg.Msg{},
		}
		store.MessageList[PlayerId] = list
	}
	return list
}

// getMessge is the locking variant of getMessgeUnsafe.
func getMessge(PlayerId int64) *MessageList {
	store := getMessageData()
	store.mu.Lock()
	defer store.mu.Unlock()
	return getMessgeUnsafe(PlayerId)
}

// deleteMessage removes from the pending list of m.To the first message whose
// UniKey equals m.UniKey. A nil m is ignored. It always returns nil.
func deleteMessage(m *msg.Msg) error {
	if m == nil {
		return nil
	}
	messages := getMessge(int64(m.To))
	messages.mu.Lock()
	defer messages.mu.Unlock()

	if removed := messages.removeByUniKey(m.UniKey); removed != nil {
		log.Debug("[Middleware] send message success; message: %v, player id: %v", removed, removed.From)
	}
	return nil
}