860 lines
23 KiB
Go
860 lines
23 KiB
Go
package game
|
||
|
||
import (
|
||
"context"
|
||
"encoding/gob"
|
||
"fmt"
|
||
"runtime/debug"
|
||
mergeCluster "server/cluster"
|
||
"server/conf"
|
||
"server/game/mod/card"
|
||
"server/game/mod/friend"
|
||
"server/game/mod/item"
|
||
limitedTimeEvent "server/game/mod/limited_time_event"
|
||
"server/game/mod/msg"
|
||
GoUtil "server/game_util"
|
||
proto "server/msg"
|
||
"server/pkg/github.com/name5566/leaf/log"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
// id is the package-level counter that MessageHandleAsync increments to number
// each submitted message task. It starts at 1, so the first task gets id 2.
var id = 1
|
||
|
||
// 中间件函数类型
|
||
type MessageMiddleware func(MessageHandlerFunc) MessageHandlerFunc
|
||
|
||
var save_msg_type = []int{
|
||
msg.HANDLE_MOD_PLAYER_MSG,
|
||
msg.HANDLE_MDO_CHAMPSHIP_INRANK,
|
||
msg.HANDLE_MOD_USER_VAR_SET,
|
||
}
|
||
|
||
type MessageMgr struct {
|
||
*ServerMod
|
||
middlewares []MessageMiddleware
|
||
workerPool *WorkerPool
|
||
handler map[int]MessageHandlerFunc
|
||
}
|
||
|
||
type MessageData struct {
|
||
MessageList map[int64]*MessageList
|
||
PlayerList map[int64]int
|
||
mu sync.Mutex
|
||
}
|
||
|
||
type MessageList struct {
|
||
Messages []*msg.Msg
|
||
mu sync.Mutex
|
||
}
|
||
|
||
// Worker Pool 结构
|
||
type WorkerPool struct {
|
||
workers int
|
||
minWorkers int
|
||
maxWorkers int
|
||
taskQueue chan *MessageTask
|
||
wg sync.WaitGroup
|
||
ctx context.Context
|
||
cancel context.CancelFunc
|
||
maxQueue int
|
||
mu sync.Mutex
|
||
workerCancels []context.CancelFunc
|
||
monitorTick *time.Ticker
|
||
}
|
||
|
||
// 消息任务
|
||
type MessageTask struct {
|
||
Msg *msg.Msg
|
||
Handler MessageHandlerFunc
|
||
Result chan *TaskResult
|
||
id int
|
||
}
|
||
|
||
// TaskResult carries the two return values of a message handler.
type TaskResult struct {
	// Data is the handler's result value.
	Data interface{}
	// Error is the handler's error, or nil on success.
	Error error
}
|
||
|
||
func (m *MessageMgr) MessageMgrInit() {
|
||
m.key = MESSAGE_MGR_KEY + fmt.Sprintf("_%d", conf.Server.ServerID)
|
||
m.data = &MessageData{
|
||
MessageList: make(map[int64]*MessageList),
|
||
PlayerList: make(map[int64]int),
|
||
}
|
||
// 注册所有可能在消息中使用的类型
|
||
gob.Register(&limitedTimeEvent.MoneyCat{})
|
||
gob.Register(&limitedTimeEvent.LuckyCat{})
|
||
gob.Register(&msg.HandbookMsg{})
|
||
gob.Register(&limitedTimeEvent.CatTrick{})
|
||
gob.Register(&VarOpration{})
|
||
gob.Register(&VarUserData{})
|
||
gob.Register(&ActivityInfo{})
|
||
gob.Register(&ChargeExtra{})
|
||
gob.Register(CatnipMsg{})
|
||
gob.Register(&CatnipLock{})
|
||
gob.Register(CRank{})
|
||
gob.Register(&proto.ResChampshipRank{})
|
||
gob.Register(&proto.ResChampshipPreRank{})
|
||
gob.Register(card.CardInfo{})
|
||
gob.Register(item.Item{})
|
||
gob.Register([]*item.Item{}) // 注册 []*item.Item 类型
|
||
gob.Register(friend.ReplyInfo{})
|
||
gob.Register(msg.VarData{})
|
||
gob.Register(GameResult{})
|
||
// 注册处理函数
|
||
m.init()
|
||
m.handler = make(map[int]MessageHandlerFunc)
|
||
m.middlewares = []MessageMiddleware{}
|
||
// 初始化 Worker Pool (10个worker, 1000个队列大小)
|
||
m.workerPool = NewWorkerPool(50, 10000)
|
||
// 注册默认中间件
|
||
m.Use(RecoveryMiddleware())
|
||
m.Use(LoggingMiddleware())
|
||
m.Use(TimeoutMiddleware(5 * time.Second))
|
||
m.NodeRegister()
|
||
m.CenterRegister()
|
||
}
|
||
|
||
// 注册处理器
|
||
func (s *MessageMgr) RegisterHandler(HandlerType int, fun MessageHandlerFunc) {
|
||
s.handler[HandlerType] = fun
|
||
}
|
||
|
||
func (m *MessageMgr) NodeRegister() {
|
||
if conf.Server.ServerType == "node" {
|
||
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_MSG, MessageHandlerFunc(PlayerMsgHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_REPLY_PLAYER_MSG, MessageHandlerFunc(PlayerReplyMsgHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_CLUSTER_SYNC, MessageHandlerFunc(ClusterSyncHandler))
|
||
}
|
||
}
|
||
|
||
func (m *MessageMgr) CenterRegister() {
|
||
if conf.Server.ServerType == "center" {
|
||
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_LOGIN, MessageHandlerFunc(PlayerLoginHandler))
|
||
m.RegisterHandler(msg.HANDLE_MDO_PLAYER_LOGOUT, MessageHandlerFunc(PlayerLogoutHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_MSG, MessageHandlerFunc(CenterPlayerMsgHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_COMSUME_MSG, MessageHandlerFunc(ComsumerMsgHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_VAR_SET, MessageHandlerFunc(SetVarDataHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_VAR_GET, MessageHandlerFunc(GetVarDataHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_USER_VAR_SET, MessageHandlerFunc(SetUserVarDataHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_USER_VAR_GET, MessageHandlerFunc(GetUserVarDataHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_CATNIP_PARTNER, MessageHandlerFunc(CatnipPartnerHandler))
|
||
m.RegisterHandler(msg.HANDLE_MDO_CHAMPSHIP_INRANK, MessageHandlerFunc(ChampshipInRankHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_RANK_INFO, MessageHandlerFunc(ChampshipRankInfoHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_RANK_LIST, MessageHandlerFunc(ChampshipRankListHandler))
|
||
m.RegisterHandler(msg.HANDLE_MOD_CHAMPSHIP_PRE_RANK, MessageHandlerFunc(ChampshipRankPreHandler))
|
||
}
|
||
}
|
||
|
||
func getMessageData() *MessageData {
|
||
return G_GameLogicPtr.MessageMgr.data.(*MessageData)
|
||
}
|
||
|
||
// ----------------------------------- Handler implementations ---------------------------
|
||
func ChampshipRankPreHandler(data *msg.Msg) (interface{}, error) {
|
||
PlayerId := data.From
|
||
PreRankMsg := G_GameLogicPtr.ChampshipMgr.GetPreRankMsg(PlayerId)
|
||
ReplyPlayerMsgASync(data, PreRankMsg)
|
||
return nil, nil
|
||
}
|
||
|
||
func ChampshipRankListHandler(data *msg.Msg) (interface{}, error) {
|
||
PlayerId := data.From
|
||
RankMsg := G_GameLogicPtr.ChampshipMgr.GetRankMsg(PlayerId)
|
||
ReplyPlayerMsgASync(data, RankMsg)
|
||
return nil, nil
|
||
}
|
||
|
||
func ChampshipRankInfoHandler(data *msg.Msg) (interface{}, error) {
|
||
PlayerId := data.From
|
||
MyRank := G_GameLogicPtr.ChampshipMgr.getMyRank(PlayerId)
|
||
MyPreRank := G_GameLogicPtr.ChampshipMgr.getLastMyRank(PlayerId)
|
||
ReplyPlayerMsgASync(data, []int{MyRank, MyPreRank})
|
||
return nil, nil
|
||
}
|
||
|
||
func NotifyAllPlayerMsg(m *msg.Msg) {
|
||
messageMgrData := getMessageData()
|
||
// 先复制 PlayerList,避免长时间持有锁
|
||
messageMgrData.mu.Lock()
|
||
playerListCopy := make(map[int64]int, len(messageMgrData.PlayerList))
|
||
for k, v := range messageMgrData.PlayerList {
|
||
playerListCopy[k] = v
|
||
}
|
||
messageMgrData.mu.Unlock()
|
||
|
||
// 在锁外发送消息
|
||
for PlayerId, node := range playerListCopy {
|
||
m.To = int(PlayerId)
|
||
SendMsgToNodeAsync(m, node)
|
||
}
|
||
}
|
||
|
||
func ChampshipInRankHandler(data *msg.Msg) (interface{}, error) {
|
||
G_GameLogicPtr.ChampshipMgr.inRank(data)
|
||
return nil, nil
|
||
}
|
||
|
||
func CatnipPartnerHandler(data *msg.Msg) (interface{}, error) {
|
||
m, ok := data.Extra.(*CatnipPartner)
|
||
if !ok {
|
||
return nil, fmt.Errorf("invalid catnip partner data")
|
||
}
|
||
return G_GameLogicPtr.VarMgr.HandleCatnipPartner(m.Uid, m.Partner, m.GameId, m.EndTime)
|
||
}
|
||
|
||
func ReplyPlayerMsgASync(m *msg.Msg, reply interface{}) (interface{}, error) {
|
||
clone := m.Reply(reply)
|
||
messageMgrData := getMessageData()
|
||
messageMgrData.mu.Lock()
|
||
node, ok := messageMgrData.PlayerList[int64(m.From)]
|
||
messageMgrData.mu.Unlock()
|
||
|
||
if ok {
|
||
SendMsgToNodeAsync(clone, node)
|
||
}
|
||
return nil, nil
|
||
}
|
||
|
||
// 节点连接时,同步消息
|
||
func ClusterSyncHandler(data *msg.Msg) (interface{}, error) {
|
||
// 遍历所有玩家,发送登录消息
|
||
G_GameLogicPtr.M_Players.Range(func(k, v interface{}) bool {
|
||
SendMsgToCenterAsync(&msg.Msg{
|
||
From: int(v.(*Player).M_DwUin),
|
||
HandleType: msg.HANDLE_MOD_PLAYER_LOGIN,
|
||
Extra: conf.Server.ServerID,
|
||
})
|
||
return true
|
||
})
|
||
// 发送暂存区消息(先复制再释放锁,避免长时间持有锁)
|
||
messageMgrData := getMessageData()
|
||
messageMgrData.mu.Lock()
|
||
TempMessageList := messageMgrData.MessageList
|
||
messageMgrData.MessageList = make(map[int64]*MessageList)
|
||
messageMgrData.mu.Unlock() // 立即释放锁,在锁外发送消息
|
||
|
||
log.Debug("[Middleware] Cluster sync send temp message len: %d", len(TempMessageList))
|
||
for _, Message := range TempMessageList {
|
||
for _, msgItem := range Message.Messages {
|
||
SendMsgToCenterAsync(msgItem)
|
||
}
|
||
}
|
||
return nil, nil
|
||
}
|
||
|
||
func PlayerLoginHandler(data *msg.Msg) (interface{}, error) {
|
||
// 关闭 Worker Pool
|
||
node := data.Extra.(int)
|
||
messageMgrData := getMessageData()
|
||
// 先更新 PlayerList(需要加锁)
|
||
messageMgrData.mu.Lock()
|
||
messageMgrData.PlayerList[int64(data.From)] = node
|
||
messageMgrData.mu.Unlock()
|
||
|
||
log.Debug("[Middleware] Player login success player id: %v, node: %v", data.From, data.Extra.(int))
|
||
// 对玩家消息列表加锁
|
||
messages := getMessge(int64(data.From))
|
||
messages.mu.Lock()
|
||
// 复制消息列表,避免在锁内发送消息
|
||
messagesToSend := make([]*msg.Msg, len(messages.Messages))
|
||
copy(messagesToSend, messages.Messages)
|
||
messages.mu.Unlock()
|
||
|
||
// 在锁外发送离线消息
|
||
for _, message := range messagesToSend {
|
||
SendMsgToNodeAsync(message, node)
|
||
}
|
||
log.Debug("[Middleware] Player sync logout message player id: %v, len: %d", data.From, len(messagesToSend))
|
||
ReplyPlayerMsgASync(data, nil)
|
||
return nil, nil
|
||
}
|
||
|
||
func PlayerLogoutHandler(data *msg.Msg) (interface{}, error) {
|
||
messageMgrData := getMessageData()
|
||
messageMgrData.mu.Lock()
|
||
delete(messageMgrData.PlayerList, int64(data.From))
|
||
messageMgrData.mu.Unlock()
|
||
log.Debug("[Middleware] Player logout success player id: %v", data.From)
|
||
return nil, nil
|
||
}
|
||
|
||
func ComsumerMsgHandler(data *msg.Msg) (interface{}, error) {
|
||
messages := getMessge(int64(data.From))
|
||
messages.mu.Lock()
|
||
defer messages.mu.Unlock()
|
||
for i, msgItem := range messages.Messages {
|
||
if msgItem.UniKey == data.UniKey {
|
||
// 删除消息
|
||
messages.Messages = append(messages.Messages[:i], messages.Messages[i+1:]...)
|
||
log.Debug("[Middleware] Comsume message success type: %d, player id: %v", msgItem.Type, msgItem.From)
|
||
break
|
||
}
|
||
}
|
||
return nil, nil
|
||
}
|
||
|
||
func CenterPlayerMsgHandler(data *msg.Msg) (interface{}, error) {
|
||
PlayerId := int64(data.To)
|
||
messageMgrData := getMessageData()
|
||
// 遍历消息列表,发送消息给在线玩家
|
||
messages := getMessge(PlayerId)
|
||
messages.mu.Lock()
|
||
messages.Messages = append(messages.Messages, data)
|
||
messages.mu.Unlock()
|
||
|
||
// 检查玩家是否在线(需要加锁)
|
||
messageMgrData.mu.Lock()
|
||
node, ok := messageMgrData.PlayerList[int64(PlayerId)]
|
||
messageMgrData.mu.Unlock()
|
||
|
||
if ok {
|
||
SendMsgToNodeAsync(data, node)
|
||
}
|
||
return nil, nil
|
||
}
|
||
|
||
func PlayerMsgHandler(data *msg.Msg) (interface{}, error) {
|
||
p := G_GameLogicPtr.GetPlayer(int64(data.To))
|
||
// 不在线 不处理
|
||
if p == nil || p.stop {
|
||
return nil, nil
|
||
}
|
||
p.Send(data.Clone())
|
||
// 处理完后发送消费消息
|
||
if data.HandleType == msg.HANDLE_MOD_PLAYER_MSG {
|
||
data.HandleType = msg.HANDLE_MOD_COMSUME_MSG
|
||
SendMsgToCenterAsync(data)
|
||
}
|
||
return nil, nil
|
||
}
|
||
|
||
func PlayerReplyMsgHandler(data *msg.Msg) (interface{}, error) {
|
||
// 先处理同步回调
|
||
if data.UniKey != "" {
|
||
mergeCluster.GetCallbackChanMu().RLock()
|
||
chanel, ok := mergeCluster.CallbackChan[data.UniKey]
|
||
mergeCluster.GetCallbackChanMu().RUnlock()
|
||
|
||
if ok {
|
||
// log.Debug("reply message ")
|
||
chanel <- data
|
||
}
|
||
}
|
||
return nil, nil
|
||
}
|
||
|
||
// 添加中间件
|
||
func (m *MessageMgr) Use(middleware MessageMiddleware) {
|
||
m.middlewares = append(m.middlewares, middleware)
|
||
}
|
||
|
||
// 应用所有中间件到处理函数
|
||
func (m *MessageMgr) applyMiddlewares(handler MessageHandlerFunc) MessageHandlerFunc {
|
||
// 从后往前应用中间件
|
||
for i := len(m.middlewares) - 1; i >= 0; i-- {
|
||
handler = m.middlewares[i](handler)
|
||
}
|
||
return handler
|
||
}
|
||
|
||
// MessageHandlerFunc processes one message and returns a result value and an
// error.
type MessageHandlerFunc func(message *msg.Msg) (interface{}, error)
|
||
|
||
func (m *MessageMgr) RegisterMessageHandler(hType int, handler MessageHandlerFunc) {
|
||
m.RegisterHandler(hType, handler)
|
||
}
|
||
|
||
func (m *MessageMgr) Handle(msg *msg.Msg) (interface{}, error) {
|
||
if fun, ok := m.handler[msg.Type]; ok {
|
||
return fun(msg)
|
||
}
|
||
log.Error("server mod key:%s handle not exist handle type:%d", m.key, msg.Type)
|
||
return nil, fmt.Errorf("server mod handler err")
|
||
}
|
||
|
||
// 异步处理消息 (多线程版本)
|
||
func (m *MessageMgr) MessageHandleAsync(message *msg.Msg) error {
|
||
if message.End != 0 && message.End < GoUtil.Now() {
|
||
log.Debug("message had expired type:%d,to:%d", message.Type, message.To)
|
||
return nil
|
||
}
|
||
if fun, ok := m.handler[message.HandleType]; ok {
|
||
// 应用中间件
|
||
handlerWithMiddleware := m.applyMiddlewares(fun)
|
||
id++
|
||
// 创建任务
|
||
task := &MessageTask{
|
||
id: id,
|
||
Msg: message,
|
||
Handler: handlerWithMiddleware,
|
||
Result: make(chan *TaskResult, 1),
|
||
}
|
||
|
||
// 提交到 Worker Pool
|
||
if err := m.workerPool.Submit(task); err != nil {
|
||
log.Error("Failed to submit message task: %v", err)
|
||
return err
|
||
}
|
||
|
||
// 可以选择等待结果或直接返回
|
||
go func() {
|
||
result := <-task.Result
|
||
if result.Error != nil {
|
||
log.Error("Message handle error: %v", result.Error)
|
||
}
|
||
}()
|
||
|
||
return nil
|
||
}
|
||
log.Error("server mod key:%s handle not exist handle type:%d", m.key, message.Type)
|
||
return fmt.Errorf("server mod handler err")
|
||
}
|
||
|
||
// 兼容旧版本的函数
|
||
func MessageHandle(m *msg.Msg) error {
|
||
// log.Debug("RecvMessage m %v", m)
|
||
// 这里可以调用 MessageMgr 的处理方法
|
||
G_GameLogicPtr.MessageMgr.MessageHandleAsync(m)
|
||
return nil
|
||
}
|
||
|
||
// ==================== Worker Pool implementation ====================
|
||
|
||
// 创建 Worker Pool
|
||
func NewWorkerPool(workers, maxQueue int) *WorkerPool {
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
if workers <= 0 {
|
||
workers = 1
|
||
}
|
||
pool := &WorkerPool{
|
||
workers: 0,
|
||
minWorkers: workers,
|
||
maxWorkers: workers * 40,
|
||
taskQueue: make(chan *MessageTask, maxQueue),
|
||
ctx: ctx,
|
||
cancel: cancel,
|
||
maxQueue: maxQueue,
|
||
}
|
||
pool.start()
|
||
// 启动监控协程,负责动态扩缩容
|
||
pool.monitorTick = time.NewTicker(3000 * time.Millisecond)
|
||
go pool.monitor()
|
||
return pool
|
||
}
|
||
|
||
// 启动 Worker Pool
|
||
func (p *WorkerPool) start() {
|
||
// 启动最小数量的 worker
|
||
for i := 0; i < p.minWorkers; i++ {
|
||
p.spawnWorker()
|
||
}
|
||
}
|
||
|
||
// spawnWorker 启动一个 worker,使用独立的 cancelable context
|
||
func (p *WorkerPool) spawnWorker() {
|
||
p.mu.Lock()
|
||
defer p.mu.Unlock()
|
||
childCtx, childCancel := context.WithCancel(p.ctx)
|
||
p.workerCancels = append(p.workerCancels, childCancel)
|
||
id := len(p.workerCancels)
|
||
p.wg.Add(1)
|
||
p.workers++
|
||
go p.worker(childCtx, id)
|
||
}
|
||
|
||
// Worker 工作函数,监听其子上下文以便单独停止
|
||
func (p *WorkerPool) worker(ctx context.Context, id int) {
|
||
defer p.wg.Done()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
log.Debug("Worker %d stopped", id)
|
||
return
|
||
case <-p.ctx.Done():
|
||
log.Debug("Worker %d pool closed", id)
|
||
return
|
||
case task, ok := <-p.taskQueue:
|
||
if !ok {
|
||
log.Debug("Worker %d: task queue closed", id)
|
||
return
|
||
}
|
||
// 执行任务
|
||
result, err := task.Handler(task.Msg)
|
||
// 发送结果
|
||
task.Result <- &TaskResult{
|
||
Data: result,
|
||
Error: err,
|
||
}
|
||
close(task.Result)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 提交任务
|
||
func (p *WorkerPool) Submit(task *MessageTask) error {
|
||
// 当队列已满时,等待并重试入队,直到成功或 pool 关闭
|
||
for {
|
||
select {
|
||
case <-p.ctx.Done():
|
||
return fmt.Errorf("worker pool is closed")
|
||
case p.taskQueue <- task:
|
||
// 如果队列接近满时,尝试扩容少量 worker
|
||
queued := len(p.taskQueue)
|
||
capQ := cap(p.taskQueue)
|
||
if capQ > 0 && queued > capQ*3/4 {
|
||
go func() {
|
||
p.mu.Lock()
|
||
deficit := p.maxWorkers - p.workers
|
||
p.mu.Unlock()
|
||
if deficit > 0 {
|
||
// 最多扩容 2 个以避免剧烈波动
|
||
add := 2
|
||
if deficit < add {
|
||
add = deficit
|
||
}
|
||
p.ScaleUp(add)
|
||
}
|
||
}()
|
||
}
|
||
return nil
|
||
default:
|
||
// 队列已满,短暂等待后重试
|
||
select {
|
||
case <-p.ctx.Done():
|
||
return fmt.Errorf("worker pool is closed")
|
||
case <-time.After(50 * time.Millisecond):
|
||
// 重试
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 关闭 Worker Pool
|
||
func (p *WorkerPool) Shutdown() {
|
||
log.Debug("Shutting down worker pool...")
|
||
// 停止监控
|
||
if p.monitorTick != nil {
|
||
p.monitorTick.Stop()
|
||
}
|
||
// 取消根 context,会让所有子 worker 退出
|
||
p.cancel()
|
||
// 关闭任务通道,释放阻塞的接收者
|
||
close(p.taskQueue)
|
||
// 等待所有 worker 退出
|
||
p.wg.Wait()
|
||
log.Debug("Worker pool shut down complete")
|
||
}
|
||
|
||
// ScaleUp 按数量增加 worker
|
||
func (p *WorkerPool) ScaleUp(n int) {
|
||
if n <= 0 {
|
||
return
|
||
}
|
||
p.mu.Lock()
|
||
can := p.maxWorkers - p.workers
|
||
if can <= 0 {
|
||
p.mu.Unlock()
|
||
return
|
||
}
|
||
if n > can {
|
||
n = can
|
||
}
|
||
p.mu.Unlock()
|
||
|
||
for i := 0; i < n; i++ {
|
||
p.spawnWorker()
|
||
}
|
||
}
|
||
|
||
// ScaleDown 按数量减少 worker
|
||
func (p *WorkerPool) ScaleDown(n int) {
|
||
if n <= 0 {
|
||
return
|
||
}
|
||
p.mu.Lock()
|
||
for i := 0; i < n && len(p.workerCancels) > 0 && p.workers > p.minWorkers; i++ {
|
||
last := len(p.workerCancels) - 1
|
||
cancel := p.workerCancels[last]
|
||
// 从 slice 中移除
|
||
p.workerCancels = p.workerCancels[:last]
|
||
p.workers--
|
||
// 调用 cancel 让对应 worker 退出
|
||
cancel()
|
||
}
|
||
p.mu.Unlock()
|
||
}
|
||
|
||
// monitor 周期性检查队列长度并做扩缩容
|
||
func (p *WorkerPool) monitor() {
|
||
for {
|
||
select {
|
||
case <-p.ctx.Done():
|
||
return
|
||
case <-p.monitorTick.C:
|
||
queued := len(p.taskQueue)
|
||
capQ := cap(p.taskQueue)
|
||
if capQ == 0 {
|
||
continue
|
||
}
|
||
// 扩容条件
|
||
if queued > capQ*3/4 {
|
||
p.mu.Lock()
|
||
deficit := p.maxWorkers - p.workers
|
||
p.mu.Unlock()
|
||
if deficit > 0 {
|
||
add := 1
|
||
if deficit >= 2 {
|
||
add = 2
|
||
}
|
||
p.ScaleUp(add)
|
||
}
|
||
}
|
||
// 缩容条件
|
||
if queued < capQ/4 {
|
||
p.mu.Lock()
|
||
extra := p.workers - p.minWorkers
|
||
p.mu.Unlock()
|
||
if extra > 0 {
|
||
// 每次缩一个,避免抖动
|
||
p.ScaleDown(1)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// ==================== Middleware implementations ====================
|
||
|
||
// 日志中间件
|
||
func LoggingMiddleware() MessageMiddleware {
|
||
return func(next MessageHandlerFunc) MessageHandlerFunc {
|
||
return func(message *msg.Msg) (interface{}, error) {
|
||
start := time.Now()
|
||
// log.Debug("[Middleware] Processing message : %v, time: %v", message, start)
|
||
result, err := next(message)
|
||
|
||
duration := time.Since(start)
|
||
if err != nil {
|
||
log.Error("[Middleware] Message handle type: %d; type: %d failed, duration: %v, error: %v", message.HandleType, message.Type, duration, err)
|
||
} else {
|
||
log.Debug("[Middleware] Message handle type: %d; type: %d success, duration: %v", message.HandleType, message.Type, duration)
|
||
}
|
||
|
||
return result, err
|
||
}
|
||
}
|
||
}
|
||
|
||
// 恢复 Panic 中间件
|
||
func RecoveryMiddleware() MessageMiddleware {
|
||
return func(next MessageHandlerFunc) MessageHandlerFunc {
|
||
return func(message *msg.Msg) (result interface{}, err error) {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Error("[Middleware] Panic recovered: %v\nStack: %s", r, debug.Stack())
|
||
err = fmt.Errorf("panic recovered: %v", r)
|
||
}
|
||
}()
|
||
return next(message)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 超时中间件
|
||
func TimeoutMiddleware(timeout time.Duration) MessageMiddleware {
|
||
return func(next MessageHandlerFunc) MessageHandlerFunc {
|
||
return func(message *msg.Msg) (interface{}, error) {
|
||
resultChan := make(chan *TaskResult, 1)
|
||
|
||
go func() {
|
||
result, err := next(message)
|
||
resultChan <- &TaskResult{Data: result, Error: err}
|
||
}()
|
||
|
||
select {
|
||
case result := <-resultChan:
|
||
return result.Data, result.Error
|
||
case <-time.After(timeout):
|
||
log.Error("[Middleware] Message : %v timeout after %v", message, timeout)
|
||
GoUtil.SendFeishuFatal(0, "message_mgr", fmt.Sprintf("Message Handler Timeout\nMessage: %v\nTimeout: %v", message, timeout))
|
||
return nil, fmt.Errorf("message handler timeout")
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 重试中间件
|
||
func RetryMiddleware(maxRetries int) MessageMiddleware {
|
||
return func(next MessageHandlerFunc) MessageHandlerFunc {
|
||
return func(message *msg.Msg) (interface{}, error) {
|
||
var result interface{}
|
||
var err error
|
||
|
||
for i := 0; i <= maxRetries; i++ {
|
||
result, err = next(message)
|
||
if err == nil {
|
||
return result, nil
|
||
}
|
||
|
||
if i < maxRetries {
|
||
log.Debug("[Middleware] Retry %d/%d for message : %v, error: %v", i+1, maxRetries, message, err)
|
||
time.Sleep(time.Millisecond * 100 * time.Duration(i+1))
|
||
}
|
||
}
|
||
|
||
return result, fmt.Errorf("failed after %d retries: %w", maxRetries, err)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 验证中间件
|
||
func ValidationMiddleware() MessageMiddleware {
|
||
return func(next MessageHandlerFunc) MessageHandlerFunc {
|
||
return func(message *msg.Msg) (interface{}, error) {
|
||
// 添加消息验证逻辑
|
||
if message == nil {
|
||
return nil, fmt.Errorf("message is nil")
|
||
}
|
||
if message.Type <= 0 {
|
||
return nil, fmt.Errorf("invalid message type: %d", message.Type)
|
||
}
|
||
|
||
return next(message)
|
||
}
|
||
}
|
||
}
|
||
|
||
// ----------------------------------- Message sending functions ---------------------------
|
||
func SendMsgToCenterAsync(m *msg.Msg) {
|
||
go sendMessageAsync(m, conf.Server.CenterNode)
|
||
}
|
||
|
||
func SendMsgToCenterSync(m *msg.Msg) (*msg.Msg, error) {
|
||
return sendMessageSync(m, conf.Server.CenterNode)
|
||
}
|
||
|
||
func SendMsgToNodeAsync(m *msg.Msg, node int) {
|
||
go sendMessageAsync(m, node)
|
||
}
|
||
|
||
func SendMsgToNodeSync(m *msg.Msg, node int) (*msg.Msg, error) {
|
||
return sendMessageSync(m, node)
|
||
}
|
||
|
||
func SendPlayerMsgAsync(m *msg.Msg) error {
|
||
if m.SendT == 0 {
|
||
m.SendT = GoUtil.Now()
|
||
}
|
||
clone := m.Clone()
|
||
clone.HandleType = msg.HANDLE_MOD_PLAYER_MSG
|
||
SendMsgToCenterAsync(clone)
|
||
return nil
|
||
}
|
||
|
||
func SendPlayerMsgSync(m *msg.Msg) (interface{}, error) {
|
||
clone := m.Clone()
|
||
clone.HandleType = msg.HANDLE_MOD_PLAYER_MSG
|
||
return SendMsgToCenterSync(clone)
|
||
}
|
||
|
||
func FriendMgrSend(m1 *msg.Msg) error {
|
||
SendPlayerMsgAsync(m1)
|
||
return nil
|
||
}
|
||
|
||
// 异步发送消息到指定节点 节点不在线则保存消息
|
||
func sendMessageAsync(m *msg.Msg, node int) error {
|
||
log.Debug("[Middleware] Send Async message to node: %d, message: %v", node, m)
|
||
err := mergeCluster.SendServerMsg(m, node)
|
||
if err != nil && GoUtil.InArray(m.HandleType, save_msg_type) {
|
||
saveMessage(m)
|
||
return err
|
||
}
|
||
deleteMessage(m)
|
||
return nil
|
||
}
|
||
|
||
// 同步消息到指定节点 节点不在线则保存消息
|
||
func sendMessageSync(m *msg.Msg, node int) (*msg.Msg, error) {
|
||
log.Debug("[Middleware] Send Sync message to node: %d, message: %v", node, m)
|
||
msg, err := mergeCluster.CallServerMsg(m, node)
|
||
if err != nil && conf.Server.ServerType == "center" && GoUtil.InArray(m.HandleType, save_msg_type) {
|
||
saveMessage(m)
|
||
return nil, err
|
||
}
|
||
deleteMessage(m)
|
||
return msg, nil
|
||
}
|
||
|
||
// 保存消息到本地
|
||
func saveMessage(m *msg.Msg) error {
|
||
data := getMessageData()
|
||
data.mu.Lock()
|
||
defer data.mu.Unlock()
|
||
// 使用不加锁的内部方法,避免死锁
|
||
messages := getMessgeUnsafe(int64(m.To))
|
||
messages.mu.Lock()
|
||
defer messages.mu.Unlock()
|
||
messages.Messages = append(messages.Messages, m)
|
||
return nil
|
||
}
|
||
|
||
func GetUserData(PlayerId int64, Key string) (*msg.Msg, error) {
|
||
return SendMsgToCenterSync(&msg.Msg{
|
||
From: int(PlayerId),
|
||
HandleType: msg.HANDLE_MOD_USER_VAR_GET,
|
||
Extra: msg.VarData{Key: Key},
|
||
})
|
||
}
|
||
|
||
// getMessgeUnsafe 获取消息列表(不加锁,调用者需要持有锁)
|
||
func getMessgeUnsafe(PlayerId int64) *MessageList {
|
||
messageMgrData := getMessageData()
|
||
if _, ok := messageMgrData.MessageList[int64(PlayerId)]; !ok {
|
||
messageMgrData.MessageList[int64(PlayerId)] = &MessageList{
|
||
Messages: []*msg.Msg{},
|
||
}
|
||
}
|
||
return messageMgrData.MessageList[int64(PlayerId)]
|
||
}
|
||
|
||
// getMessge 获取消息列表(加锁版本)
|
||
func getMessge(PlayerId int64) *MessageList {
|
||
messageMgrData := getMessageData()
|
||
messageMgrData.mu.Lock()
|
||
defer messageMgrData.mu.Unlock()
|
||
return getMessgeUnsafe(PlayerId)
|
||
}
|
||
|
||
func deleteMessage(m *msg.Msg) error {
|
||
if m == nil {
|
||
return nil
|
||
}
|
||
messages := getMessge(int64(m.To))
|
||
messages.mu.Lock()
|
||
defer messages.mu.Unlock()
|
||
|
||
// 使用更安全的方式删除元素:找到索引后再删除,避免在range中修改切片
|
||
foundIndex := -1
|
||
for i, msgItem := range messages.Messages {
|
||
if msgItem == nil {
|
||
continue
|
||
}
|
||
if msgItem.UniKey == m.UniKey {
|
||
foundIndex = i
|
||
log.Debug("[Middleware] send message success; message: %v, player id: %v", msgItem, msgItem.From)
|
||
break
|
||
}
|
||
}
|
||
|
||
if foundIndex >= 0 {
|
||
// 删除消息:将后面的元素前移,避免内存泄漏
|
||
copy(messages.Messages[foundIndex:], messages.Messages[foundIndex+1:])
|
||
messages.Messages[len(messages.Messages)-1] = nil // 清除最后一个元素的引用
|
||
messages.Messages = messages.Messages[:len(messages.Messages)-1]
|
||
}
|
||
return nil
|
||
}
|