This commit is contained in:
hahwu 2025-12-18 15:59:35 +08:00
parent 17ac92f5a1
commit adaf454d36
4 changed files with 94 additions and 11 deletions

View File

@ -48,6 +48,12 @@ func HandShakeRecv(a *Agent, m *msg.Msg) error {
RemoteAddr: m.Extra.(string),
},
})
} else {
syncMsg := &msg.Msg{
Type: msg.CLUSTER_FRIEND_SYNC,
To: ServerId,
}
sendGameMsg(syncMsg)
}
return nil
}

View File

@ -85,6 +85,7 @@ type GameLogic struct {
VarMgr *VarMgr // 变量管理器
BanMgr *BanMgr // 封号管理器
StartTime int64 // 服务器启动时间
MessageMgr *MessageMgr // 消息管理器
}
type ServerInfo struct {
@ -361,6 +362,13 @@ func (ad *GameLogic) CreateMailMgr() {
ad.MailMgr.Init()
}
func (ad *GameLogic) CreateMessageMgr() {
ad.MessageMgr = &MessageMgr{
ServerMod: new(ServerMod),
}
ad.MessageMgr.MessageMgrInit()
}
func (ad *GameLogic) MailMgrSend(m *MsgMod.Msg) {
ad.MailMgr.Send(m)
}
@ -552,6 +560,7 @@ func G_getGameLogic() *GameLogic {
G_GameLogicPtr.CreateChampshipMgr() // 创建竞标赛管理器
G_GameLogicPtr.CreateVarMgr() // 创建变量管理器
G_GameLogicPtr.CreateBanMgr() // 创建封号管理器
G_GameLogicPtr.CreateMessageMgr() // 创建消息管理器
ClusterMgrInit() //初始化集群
G_GameLogicPtr.StartTime = time.Now().Unix()
// G_GameLogicPtr.CreateHttpManager()

View File

@ -20,11 +20,18 @@ type MessageMgr struct {
*ServerMod
middlewares []MessageMiddleware
workerPool *WorkerPool
handler map[int]MessageHandlerFunc
}
type MessageData struct {
MessageList map[int64][]*msg.Msg
MessageList map[int64]*MessageList
PlayerList map[int64]int
mu sync.Mutex
}
type MessageList struct {
Messages []*msg.Msg
mu sync.Mutex
}
// Worker Pool 结构
@ -52,7 +59,13 @@ type TaskResult struct {
func (m *MessageMgr) MessageMgrInit() {
m.key = MESSAGE_MGR_KEY
m.data = &MessageData{}
m.data = &MessageData{
MessageList: make(map[int64]*MessageList),
PlayerList: make(map[int64]int),
}
// 注册处理函数
m.init()
m.handler = make(map[int]MessageHandlerFunc)
m.middlewares = []MessageMiddleware{}
// 初始化 Worker Pool (10个worker, 1000个队列大小)
m.workerPool = NewWorkerPool(10, 1000)
@ -60,8 +73,61 @@ func (m *MessageMgr) MessageMgrInit() {
m.Use(RecoveryMiddleware())
m.Use(LoggingMiddleware())
m.Use(TimeoutMiddleware(5 * time.Second))
// 注册处理函数
m.init()
if conf.Server.ServerType == "center" {
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_LOGIN, MessageHandlerFunc(PlayerLogin))
m.RegisterHandler(msg.HANDLE_MOD_PLAYER_MSG, MessageHandlerFunc(SendToPlayer))
}
}
// 注册处理器
func (s *MessageMgr) RegisterHandler(HandlerType int, fun MessageHandlerFunc) {
s.handler[HandlerType] = fun
}
func getData() *MessageData {
return G_GameLogicPtr.MessageMgr.data.(*MessageData)
}
func PlayerLogin(data *msg.Msg) (interface{}, error) {
// 关闭 Worker Pool
messageMgrData := getData()
messageMgrData.mu.Lock()
defer messageMgrData.mu.Unlock()
messageMgrData.PlayerList[int64(data.From)] = data.Extra.(int)
if _, ok := messageMgrData.MessageList[int64(data.From)]; !ok {
messageMgrData.MessageList[int64(data.From)] = &MessageList{
Messages: []*msg.Msg{},
}
}
messageMgrData.PlayerList[int64(data.From)] = data.Extra.(int)
return nil, nil
}
func SendToPlayer(data *msg.Msg) (interface{}, error) {
PlayerId := int64(data.To)
messageMgrData := getData()
// 遍历消息列表,发送消息给在线玩家
messages, ok := messageMgrData.MessageList[int64(PlayerId)]
if !ok {
messageMgrData.mu.Lock()
messages = &MessageList{
Messages: []*msg.Msg{},
}
messageMgrData.MessageList[int64(PlayerId)] = messages
messageMgrData.mu.Unlock()
}
messages.mu.Lock()
defer messages.mu.Unlock()
messages.Messages = append(messages.Messages, data)
if node, ok := messageMgrData.PlayerList[int64(PlayerId)]; ok {
for _, message := range messages.Messages {
err := SendMsgToNode(message, node)
if err != nil {
log.Error("Failed to send message to player %d: %v", PlayerId, err)
}
}
}
return nil, nil
}
// 添加中间件
@ -86,7 +152,7 @@ func (m *MessageMgr) RegisterMessageHandler(hType int, handler MessageHandlerFun
func (m *MessageMgr) Handle(msg *msg.Msg) (interface{}, error) {
if fun, ok := m.handler[msg.Type]; ok {
return fun.(MessageHandlerFunc)(msg)
return fun(msg)
}
log.Error("server mod key:%s handle not exist handle type:%d", m.key, msg.Type)
return nil, fmt.Errorf("server mod handler err")
@ -107,9 +173,9 @@ func SendMessage(m1 *msg.Msg) error {
// 异步处理消息 (多线程版本)
func (m *MessageMgr) MessageHandleAsync(message *msg.Msg) error {
if fun, ok := m.handler[message.Type]; ok {
if fun, ok := m.handler[message.HandleType]; ok {
// 应用中间件
handlerWithMiddleware := m.applyMiddlewares(fun.(MessageHandlerFunc))
handlerWithMiddleware := m.applyMiddlewares(fun)
// 创建任务
task := &MessageTask{
@ -142,7 +208,7 @@ func (m *MessageMgr) MessageHandleAsync(message *msg.Msg) error {
func MessageHandle(m *msg.Msg) error {
log.Debug("RecvMessage m %v", m)
// 这里可以调用 MessageMgr 的处理方法
// G_GameLogicPtr.MessageMgr.MessageHandleAsync(m)
G_GameLogicPtr.MessageMgr.MessageHandleAsync(m)
return nil
}
@ -334,13 +400,13 @@ func SendMsgToNode(m *msg.Msg, node int) error {
func SendPlayerMsg(m *msg.Msg) error {
clone := m.Clone()
clone.UniKey = GoUtil.UniKey(fmt.Sprintf("%v,Player Msg", m))
clone.HandleType = msg.HANDLE_TYPE_PLAYER_MSG
clone.HandleType = msg.HANDLE_MOD_PLAYER_MSG
return mergeCluster.SendServerMsg(m, conf.Server.CenterNode)
}
func CallPlayerMsg(m *msg.Msg) (interface{}, error) {
clone := m.Clone()
clone.UniKey = GoUtil.UniKey(fmt.Sprintf("%v,Player Msg", m))
clone.HandleType = msg.HANDLE_TYPE_PLAYER_MSG
clone.HandleType = msg.HANDLE_MOD_PLAYER_MSG
return mergeCluster.CallServerMsg(m, conf.Server.CenterNode)
}

View File

@ -20,7 +20,9 @@ var MSG_ZERO_UPDATE = &Msg{Type: SERVER_ZERO_UPDATE}
var MSG_NOON_UPDATE = &Msg{Type: SERVER_NOON_UPDATE}
const (
HANDLE_TYPE_PLAYER_MSG = 20001 // 玩家消息
HANDLE_MOD_PLAYER_MSG = 20001 // 玩家消息
HANDLE_MOD_CLUSTER_MSG = 20002 // 集群消息
HANDLE_MOD_PLAYER_LOGIN = 20003 // 玩家登录消息
)
const (