消息日志优化

This commit is contained in:
hahwu 2025-12-30 15:03:27 +08:00
parent 4cf882ecd2
commit f5fef79619

View File

@ -14,6 +14,8 @@ import (
"time" "time"
) )
var id = 1
// 中间件函数类型 // 中间件函数类型
type MessageMiddleware func(MessageHandlerFunc) MessageHandlerFunc type MessageMiddleware func(MessageHandlerFunc) MessageHandlerFunc
@ -55,6 +57,7 @@ type MessageTask struct {
Msg *msg.Msg Msg *msg.Msg
Handler MessageHandlerFunc Handler MessageHandlerFunc
Result chan *TaskResult Result chan *TaskResult
id int
} }
// 任务结果 // 任务结果
@ -200,23 +203,12 @@ func ClusterSyncHandler(data *msg.Msg) (interface{}, error) {
func PlayerLoginHandler(data *msg.Msg) (interface{}, error) { func PlayerLoginHandler(data *msg.Msg) (interface{}, error) {
// 关闭 Worker Pool // 关闭 Worker Pool
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
node := data.Extra.(int) node := data.Extra.(int)
messageMgrData.PlayerList[int64(data.From)] = node
if _, ok := messageMgrData.MessageList[int64(data.From)]; !ok {
messageMgrData.MessageList[int64(data.From)] = &MessageList{
Messages: []*msg.Msg{},
}
}
messageMgrData.mu.Unlock()
log.Debug("[Middleware] Player login success player id: %v, node: %v", data.From, data.Extra.(int)) log.Debug("[Middleware] Player login success player id: %v, node: %v", data.From, data.Extra.(int))
// 对玩家消息列表加锁 // 对玩家消息列表加锁
messages := messageMgrData.MessageList[int64(data.From)] messages := getMessge(int64(data.From))
messages.mu.Lock() messages.mu.Lock()
defer messages.mu.Lock() defer messages.mu.Unlock()
// 发送离线消息 // 发送离线消息
len := len(messages.Messages) len := len(messages.Messages)
for _, message := range messages.Messages { for _, message := range messages.Messages {
@ -234,17 +226,13 @@ func PlayerLogoutHandler(data *msg.Msg) (interface{}, error) {
} }
func ComsumerMsgHandler(data *msg.Msg) (interface{}, error) { func ComsumerMsgHandler(data *msg.Msg) (interface{}, error) {
messageMgrData := getMessageData() messages := getMessge(int64(data.From))
Message, ok := messageMgrData.MessageList[int64(data.To)] messages.mu.Lock()
if !ok { defer messages.mu.Unlock()
return nil, nil for i, msgItem := range messages.Messages {
}
Message.mu.Lock()
defer Message.mu.Unlock()
for i, msgItem := range Message.Messages {
if msgItem.UniKey == data.UniKey { if msgItem.UniKey == data.UniKey {
// 删除消息 // 删除消息
Message.Messages = append(Message.Messages[:i], Message.Messages[i+1:]...) messages.Messages = append(messages.Messages[:i], messages.Messages[i+1:]...)
log.Debug("[Middleware] Comsume message success type: %d, player id: %v", msgItem.Type, msgItem.From) log.Debug("[Middleware] Comsume message success type: %d, player id: %v", msgItem.Type, msgItem.From)
break break
} }
@ -256,15 +244,7 @@ func CenterPlayerMsgHandler(data *msg.Msg) (interface{}, error) {
PlayerId := int64(data.To) PlayerId := int64(data.To)
messageMgrData := getMessageData() messageMgrData := getMessageData()
// 遍历消息列表,发送消息给在线玩家 // 遍历消息列表,发送消息给在线玩家
messages, ok := messageMgrData.MessageList[int64(PlayerId)] messages := getMessge(PlayerId)
if !ok {
messageMgrData.mu.Lock()
messages = &MessageList{
Messages: []*msg.Msg{},
}
messageMgrData.MessageList[int64(PlayerId)] = messages
messageMgrData.mu.Unlock()
}
messages.mu.Lock() messages.mu.Lock()
defer messages.mu.Unlock() defer messages.mu.Unlock()
messages.Messages = append(messages.Messages, data) messages.Messages = append(messages.Messages, data)
@ -300,6 +280,18 @@ func PlayerReplyMsgHandler(data *msg.Msg) (interface{}, error) {
return nil, nil return nil, nil
} }
func getMessge(PlayerId int64) *MessageList {
messageMgrData := getMessageData()
messageMgrData.mu.Lock()
defer messageMgrData.mu.Unlock()
if _, ok := messageMgrData.MessageList[int64(PlayerId)]; !ok {
messageMgrData.MessageList[int64(PlayerId)] = &MessageList{
Messages: []*msg.Msg{},
}
}
return messageMgrData.MessageList[int64(PlayerId)]
}
// 添加中间件 // 添加中间件
func (m *MessageMgr) Use(middleware MessageMiddleware) { func (m *MessageMgr) Use(middleware MessageMiddleware) {
m.middlewares = append(m.middlewares, middleware) m.middlewares = append(m.middlewares, middleware)
@ -337,9 +329,10 @@ func (m *MessageMgr) MessageHandleAsync(message *msg.Msg) error {
if fun, ok := m.handler[message.HandleType]; ok { if fun, ok := m.handler[message.HandleType]; ok {
// 应用中间件 // 应用中间件
handlerWithMiddleware := m.applyMiddlewares(fun) handlerWithMiddleware := m.applyMiddlewares(fun)
id++
// 创建任务 // 创建任务
task := &MessageTask{ task := &MessageTask{
id: id,
Msg: message, Msg: message,
Handler: handlerWithMiddleware, Handler: handlerWithMiddleware,
Result: make(chan *TaskResult, 1), Result: make(chan *TaskResult, 1),
@ -744,14 +737,10 @@ func saveMessage(m *msg.Msg) error {
data := getMessageData() data := getMessageData()
data.mu.Lock() data.mu.Lock()
defer data.mu.Unlock() defer data.mu.Unlock()
if _, ok := data.MessageList[int64(m.To)]; !ok { messages := getMessge(int64(m.To))
data.MessageList[int64(m.To)] = &MessageList{ messages.mu.Lock()
Messages: []*msg.Msg{}, defer messages.mu.Unlock()
} messages.Messages = append(messages.Messages, m)
}
data.MessageList[int64(m.To)].mu.Lock()
defer data.MessageList[int64(m.To)].mu.Unlock()
data.MessageList[int64(m.To)].Messages = append(data.MessageList[int64(m.To)].Messages, m)
return nil return nil
} }