203 lines
4.8 KiB
Go
203 lines
4.8 KiB
Go
package game
|
||
|
||
import (
|
||
"context"
|
||
"database/sql"
|
||
"fmt"
|
||
"server/db"
|
||
"server/game/mod/msg"
|
||
GoUtil "server/game_util"
|
||
"server/pkg/github.com/name5566/leaf/log"
|
||
"server/pkg/github.com/name5566/leaf/timer"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
const (
	// String keys naming the server-side manager modules. Presumably these
	// are the values assigned to ServerMod.key; confirm against the modules
	// that embed ServerMod.
	FRIEND_MGR_KEY    = "FRIEND_MGR"
	RANK_MGR_KEY      = "RANK_MGR"
	MAIL_MGR_KEY      = "MAIL_MGR"
	VAR_MGR_KEY       = "VAR_MGR"
	CHAMPSHIP_MGR_KEY = "CHAMPSHIP_MGR"
	BAN_MGR_KEY       = "BAN_MGR"
	MESSAGE_MGR_KEY   = "MESSAGE_MGR"

	// PER_SAVE_TIME is the base interval, in seconds, between periodic
	// SaveData runs.
	PER_SAVE_TIME = 60
)
|
||
|
||
type ServerMod struct {
|
||
mDispatr *timer.Dispatcher
|
||
lock sync.Mutex
|
||
msgChan chan *msg.Msg
|
||
key string
|
||
handler map[int]interface{}
|
||
timeout time.Duration
|
||
update bool
|
||
data interface{}
|
||
}
|
||
|
||
// 初始化
|
||
func (s *ServerMod) init() {
|
||
s.msgChan = make(chan *msg.Msg, 10)
|
||
s.mDispatr = timer.NewDispatcher(10)
|
||
s.timeout = time.Duration(time.Second * 10)
|
||
s.handler = make(map[int]interface{})
|
||
s.update = false
|
||
s.LoadData()
|
||
// 直接调用 SaveData,不要创建新的 goroutine
|
||
s.mDispatr.AfterFunc(time.Duration(PER_SAVE_TIME)*time.Second, func() {
|
||
s.SaveData()
|
||
})
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Error("%s panic: %s", s.key, r)
|
||
s.lock.Unlock()
|
||
}
|
||
}()
|
||
|
||
for {
|
||
select {
|
||
case msg := <-s.msgChan:
|
||
s.lock.Lock()
|
||
s.Handle(msg)
|
||
s.lock.Unlock()
|
||
case Cb := <-s.mDispatr.ChanTimer:
|
||
s.lock.Lock()
|
||
Cb.Cb()
|
||
s.lock.Unlock()
|
||
}
|
||
}
|
||
}()
|
||
|
||
}
|
||
|
||
// 处理消息
|
||
func (s *ServerMod) Handle(m *msg.Msg) (interface{}, error) {
|
||
if fun, ok := s.handler[m.Type]; ok {
|
||
return fun.(func(*msg.Msg) (interface{}, error))(m)
|
||
}
|
||
log.Error("server mod key:%s handle not exist handle type:%d", s.key, m.Type)
|
||
return nil, fmt.Errorf("server mod handler err")
|
||
}
|
||
|
||
// 注册处理器
|
||
func (s *ServerMod) RegisterHandler(HandlerType int, fun interface{}) {
|
||
switch fun.(type) {
|
||
case func(*msg.Msg) (interface{}, error):
|
||
default:
|
||
log.Error("RegisterHandler fun type err, key:%s, handler type :%d", s.key, HandlerType)
|
||
}
|
||
s.handler[HandlerType] = fun
|
||
}
|
||
|
||
// 发送消息队列
|
||
func (s *ServerMod) Send(msg *msg.Msg) {
|
||
s.msgChan <- msg
|
||
}
|
||
|
||
// 同步请求
|
||
func (s *ServerMod) Call(m *msg.Msg) (interface{}, error) {
|
||
// 使用带缓冲的 channel 避免 goroutine 在超时时泄漏
|
||
responseChan := make(chan interface{}, 1)
|
||
errorChan := make(chan error, 1)
|
||
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
errorChan <- fmt.Errorf("panic: %v", r)
|
||
}
|
||
}()
|
||
s.lock.Lock()
|
||
defer s.lock.Unlock()
|
||
result, err := s.Handle(m)
|
||
if err != nil {
|
||
errorChan <- err
|
||
return
|
||
}
|
||
responseChan <- result
|
||
}()
|
||
|
||
select {
|
||
case res := <-responseChan:
|
||
return res, nil
|
||
case err := <-errorChan:
|
||
log.Debug("handle call err. %v", err)
|
||
return nil, err
|
||
case <-time.After(s.timeout):
|
||
log.Error("handle call timeout")
|
||
return nil, fmt.Errorf("timeout after %v", s.timeout)
|
||
}
|
||
}
|
||
|
||
// mysql 保存消息
|
||
func (s *ServerMod) SaveData() {
|
||
// 直接在定时器回调中执行,不创建新的 goroutine 避免泄漏
|
||
s.mDispatr.AfterFunc(time.Duration(PER_SAVE_TIME+GoUtil.RandNum(5, 10))*time.Second, func() {
|
||
s.SaveData()
|
||
})
|
||
DbData := db.SqlServerModStruct{}
|
||
DbData.Key = s.key
|
||
DbData.UpdataTime = GoUtil.Now()
|
||
var err error
|
||
DbData.ModData, err = GoUtil.GobMarshal(s.data)
|
||
if err != nil {
|
||
log.Error("SaveData Marshal failed,Mod Key: %s err:%v", s.key, err)
|
||
return
|
||
}
|
||
// log.Debug("SaveData Marshal success,Mod Key: %s", s.key)
|
||
|
||
// 使用线程安全的方式获取数据库连接
|
||
sqlDb := db.GetDB()
|
||
if sqlDb == nil {
|
||
log.Error("SaveData failed, database connection is nil, Mod Key: %s", s.key)
|
||
return
|
||
}
|
||
|
||
// 先测试连接是否可用
|
||
if err := sqlDb.Ping(); err != nil {
|
||
log.Error("SaveData failed, database ping error, Mod Key: %s err:%v", s.key, err)
|
||
return
|
||
}
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
defer cancel()
|
||
txOptions := &sql.TxOptions{}
|
||
tx, err := sqlDb.BeginTx(ctx, txOptions)
|
||
if err != nil {
|
||
log.Error("SaveData sql begin tx failed,Mod Key: %s err:%v, data size: %d bytes", s.key, err, len(DbData.ModData))
|
||
return
|
||
}
|
||
err = db.SaveServerDataWithTx(tx, &DbData)
|
||
if err != nil {
|
||
tx.Rollback()
|
||
log.Error("SaveData sql exec failed,Mod Key: %s err:%v", s.key, err)
|
||
return
|
||
}
|
||
err = tx.Commit()
|
||
if err != nil {
|
||
log.Error("SaveData sql commit failed,Mod Key: %s err:%v", s.key, err)
|
||
}
|
||
}
|
||
|
||
func (s *ServerMod) LoadData() {
|
||
DbData := db.SqlServerModStruct{}
|
||
err := db.GetServerData(&DbData, s.key)
|
||
if err != nil {
|
||
DbData.Key = s.key
|
||
DbData.UpdataTime = GoUtil.Now()
|
||
err = db.InsertServerData(&DbData)
|
||
if err != nil {
|
||
log.Error("LoadData sql exec ,Mod Key: %s err:%v", s.key, err)
|
||
}
|
||
return
|
||
}
|
||
if len(DbData.ModData) == 0 {
|
||
return
|
||
}
|
||
err = GoUtil.GobUnmarshal(DbData.ModData, s.data)
|
||
if err != nil {
|
||
log.Error("LoadData Unmarshal failed,Mod Key: %s err:%v", s.key, err)
|
||
return
|
||
}
|
||
}
|