diff --git a/src/server/cluster/cluster_func.go b/src/server/cluster/cluster_func.go index 27eb476f..751c0443 100644 --- a/src/server/cluster/cluster_func.go +++ b/src/server/cluster/cluster_func.go @@ -48,6 +48,12 @@ func HandShakeRecv(a *Agent, m *msg.Msg) error { RemoteAddr: m.Extra.(string), }, }) + } else { + syncMsg := &msg.Msg{ + Type: msg.CLUSTER_FRIEND_SYNC, + To: ServerId, + } + sendGameMsg(syncMsg) } return nil } diff --git a/src/server/game/GameLogic.go b/src/server/game/GameLogic.go index a46b8b02..7b650c7a 100644 --- a/src/server/game/GameLogic.go +++ b/src/server/game/GameLogic.go @@ -85,6 +85,7 @@ type GameLogic struct { VarMgr *VarMgr // 变量管理器 BanMgr *BanMgr // 封号管理器 StartTime int64 // 服务器启动时间 + MessageMgr *MessageMgr // 消息管理器 } type ServerInfo struct { @@ -361,6 +362,13 @@ func (ad *GameLogic) CreateMailMgr() { ad.MailMgr.Init() } +func (ad *GameLogic) CreateMessageMgr() { + ad.MessageMgr = &MessageMgr{ + ServerMod: new(ServerMod), + } + ad.MessageMgr.MessageMgrInit() +} + func (ad *GameLogic) MailMgrSend(m *MsgMod.Msg) { ad.MailMgr.Send(m) } @@ -552,6 +560,7 @@ func G_getGameLogic() *GameLogic { G_GameLogicPtr.CreateChampshipMgr() // 创建竞标赛管理器 G_GameLogicPtr.CreateVarMgr() // 创建变量管理器 G_GameLogicPtr.CreateBanMgr() // 创建封号管理器 + G_GameLogicPtr.CreateMessageMgr() // 创建消息管理器 ClusterMgrInit() //初始化集群 G_GameLogicPtr.StartTime = time.Now().Unix() // G_GameLogicPtr.CreateHttpManager() diff --git a/src/server/game/message_mgr.go b/src/server/game/message_mgr.go index 350a6d37..2fc70250 100644 --- a/src/server/game/message_mgr.go +++ b/src/server/game/message_mgr.go @@ -20,11 +20,18 @@ type MessageMgr struct { *ServerMod middlewares []MessageMiddleware workerPool *WorkerPool + handler map[int]MessageHandlerFunc } type MessageData struct { - MessageList map[int64][]*msg.Msg + MessageList map[int64]*MessageList PlayerList map[int64]int + mu sync.Mutex +} + +type MessageList struct { + Messages []*msg.Msg + mu sync.Mutex } // Worker Pool 结构 @@ -52,7 +59,13 @@ type TaskResult struct { func (m *MessageMgr) MessageMgrInit() { m.key = MESSAGE_MGR_KEY - m.data = &MessageData{} + m.data = &MessageData{ + MessageList: make(map[int64]*MessageList), + PlayerList: make(map[int64]int), + } + // 注册处理函数 + m.init() + m.handler = make(map[int]MessageHandlerFunc) m.middlewares = []MessageMiddleware{} // 初始化 Worker Pool (10个worker, 1000个队列大小) m.workerPool = NewWorkerPool(10, 1000) @@ -60,8 +73,61 @@ func (m *MessageMgr) MessageMgrInit() { m.Use(RecoveryMiddleware()) m.Use(LoggingMiddleware()) m.Use(TimeoutMiddleware(5 * time.Second)) - // 注册处理函数 - m.init() + if conf.Server.ServerType == "center" { + m.RegisterHandler(msg.HANDLE_MOD_PLAYER_LOGIN, MessageHandlerFunc(PlayerLogin)) + m.RegisterHandler(msg.HANDLE_MOD_PLAYER_MSG, MessageHandlerFunc(SendToPlayer)) + } +} + +// 注册处理器 +func (s *MessageMgr) RegisterHandler(HandlerType int, fun MessageHandlerFunc) { + s.handler[HandlerType] = fun +} + +func getData() *MessageData { + return G_GameLogicPtr.MessageMgr.data.(*MessageData) +} + +func PlayerLogin(data *msg.Msg) (interface{}, error) { + // 关闭 Worker Pool + messageMgrData := getData() + messageMgrData.mu.Lock() + defer messageMgrData.mu.Unlock() + messageMgrData.PlayerList[int64(data.From)] = data.Extra.(int) + if _, ok := messageMgrData.MessageList[int64(data.From)]; !ok { + messageMgrData.MessageList[int64(data.From)] = &MessageList{ + Messages: []*msg.Msg{}, + } + } + messageMgrData.PlayerList[int64(data.From)] = data.Extra.(int) + return nil, nil +} + +func SendToPlayer(data *msg.Msg) (interface{}, error) { + PlayerId := int64(data.To) + messageMgrData := getData() + // 遍历消息列表,发送消息给在线玩家 + messages, ok := messageMgrData.MessageList[int64(PlayerId)] + if !ok { + messageMgrData.mu.Lock() + messages = &MessageList{ + Messages: []*msg.Msg{}, + } + messageMgrData.MessageList[int64(PlayerId)] = messages + messageMgrData.mu.Unlock() + } + messages.mu.Lock() + defer messages.mu.Unlock() + messages.Messages = append(messages.Messages, data) + if node, ok := messageMgrData.PlayerList[int64(PlayerId)]; ok { + for _, message := range messages.Messages { + err := SendMsgToNode(message, node) + if err != nil { + log.Error("Failed to send message to player %d: %v", PlayerId, err) + } + } + } + return nil, nil } // 添加中间件 @@ -86,7 +152,7 @@ func (m *MessageMgr) RegisterMessageHandler(hType int, handler MessageHandlerFun func (m *MessageMgr) Handle(msg *msg.Msg) (interface{}, error) { if fun, ok := m.handler[msg.Type]; ok { - return fun.(MessageHandlerFunc)(msg) + return fun(msg) } log.Error("server mod key:%s handle not exist handle type:%d", m.key, msg.Type) return nil, fmt.Errorf("server mod handler err") @@ -107,9 +173,9 @@ func SendMessage(m1 *msg.Msg) error { // 异步处理消息 (多线程版本) func (m *MessageMgr) MessageHandleAsync(message *msg.Msg) error { - if fun, ok := m.handler[message.Type]; ok { + if fun, ok := m.handler[message.HandleType]; ok { // 应用中间件 - handlerWithMiddleware := m.applyMiddlewares(fun.(MessageHandlerFunc)) + handlerWithMiddleware := m.applyMiddlewares(fun) // 创建任务 task := &MessageTask{ @@ -142,7 +208,7 @@ func (m *MessageMgr) MessageHandleAsync(message *msg.Msg) error { func MessageHandle(m *msg.Msg) error { log.Debug("RecvMessage m %v", m) // 这里可以调用 MessageMgr 的处理方法 - // G_GameLogicPtr.MessageMgr.MessageHandleAsync(m) + G_GameLogicPtr.MessageMgr.MessageHandleAsync(m) return nil } @@ -334,13 +400,13 @@ func SendMsgToNode(m *msg.Msg, node int) error { func SendPlayerMsg(m *msg.Msg) error { clone := m.Clone() clone.UniKey = GoUtil.UniKey(fmt.Sprintf("%v,Player Msg", m)) - clone.HandleType = msg.HANDLE_TYPE_PLAYER_MSG + clone.HandleType = msg.HANDLE_MOD_PLAYER_MSG return mergeCluster.SendServerMsg(m, conf.Server.CenterNode) } func CallPlayerMsg(m *msg.Msg) (interface{}, error) { clone := m.Clone() clone.UniKey = GoUtil.UniKey(fmt.Sprintf("%v,Player Msg", m)) - clone.HandleType = msg.HANDLE_TYPE_PLAYER_MSG + clone.HandleType = msg.HANDLE_MOD_PLAYER_MSG return mergeCluster.CallServerMsg(m, conf.Server.CenterNode) } diff --git a/src/server/game/mod/msg/Msg.go b/src/server/game/mod/msg/Msg.go index 8e1e4ddb..ae3bb7fa 100644 --- a/src/server/game/mod/msg/Msg.go +++ b/src/server/game/mod/msg/Msg.go @@ -20,7 +20,9 @@ var MSG_ZERO_UPDATE = &Msg{Type: SERVER_ZERO_UPDATE} var MSG_NOON_UPDATE = &Msg{Type: SERVER_NOON_UPDATE} const ( - HANDLE_TYPE_PLAYER_MSG = 20001 // 玩家消息 + HANDLE_MOD_PLAYER_MSG = 20001 // 玩家消息 + HANDLE_MOD_CLUSTER_MSG = 20002 // 集群消息 + HANDLE_MOD_PLAYER_LOGIN = 20003 // 玩家登录消息 ) const (