优化goroutine,decorate part增加校验

This commit is contained in:
hahwu 2026-01-20 16:36:17 +08:00
parent c0858e20f0
commit 08eeeb2a40
9 changed files with 212 additions and 65 deletions

View File

@ -17,8 +17,9 @@ var (
LittleEndian = false LittleEndian = false
// skeleton conf // skeleton conf
GoLen = 10000 // 增加 goroutine 相关配置,避免 "Too many goroutines" 错误
TimerDispatcherLen = 10000 GoLen = 50000 // 从 10000 增加到 50000控制并发 goroutine 数量
AsynCallLen = 10000 TimerDispatcherLen = 50000 // 从 10000 增加到 50000定时器队列长度
ChanRPCLen = 10000 AsynCallLen = 50000 // 从 10000 增加到 50000异步调用队列长度
ChanRPCLen = 50000 // 从 10000 增加到 50000RPC 通道长度
) )

View File

@ -213,3 +213,11 @@ func GetPartNumByAreaId(AreaId int) map[int]int {
} }
return res return res
} }
func GetAreaIdByIndoorId(IndoorId int) int {
data, err := gamedata.GetDataByIntKey(INDOOR_PROGRESS, IndoorId)
if err != nil {
return 0
}
return gamedata.GetIntValue(data, "Scene")
}

View File

@ -1,6 +1,7 @@
package db package db
import ( import (
"database/sql"
"fmt" "fmt"
"reflect" "reflect"
"server/MergeConst" "server/MergeConst"
@ -17,17 +18,50 @@ import (
) )
var SqlDb *sqlx.DB var SqlDb *sqlx.DB
var sqlDbMu sync.Mutex var sqlDbMu sync.RWMutex
// GetDB 线程安全地获取数据库连接
func GetDB() *sqlx.DB {
sqlDbMu.RLock()
defer sqlDbMu.RUnlock()
return SqlDb
}
// GetDBOrPanic 获取数据库连接,如果为 nil 则记录错误
func GetDBOrPanic() *sqlx.DB {
db := GetDB()
if db == nil {
log.Error("Database connection is nil, please check database initialization")
}
return db
}
// EnsureDB 确保数据库连接可用,如果不可用则返回错误
func EnsureDB() (*sqlx.DB, error) {
db := GetDB()
if db == nil {
return nil, fmt.Errorf("database connection is nil")
}
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("database ping failed: %w", err)
}
return db, nil
}
// 封装创建连接 // 封装创建连接
func connectMySQL() (*sqlx.DB, error) { func connectMySQL() (*sqlx.DB, error) {
MysqlPwd, _ := GoUtil.Decrypt(conf.Server.MySqlPwd) MysqlPwd, _ := GoUtil.Decrypt(conf.Server.MySqlPwd)
connect := fmt.Sprintf("%s:%s@(%s:%s)/%s", conf.Server.MySqlUsr, MysqlPwd, conf.Server.MySqlAddr, conf.Server.MySqlPort, conf.Server.DbName) // 减少超时时间,避免长时间阻塞
connect := fmt.Sprintf("%s:%s@(%s:%s)/%s?timeout=10s&readTimeout=15s&writeTimeout=15s&parseTime=true", conf.Server.MySqlUsr, MysqlPwd, conf.Server.MySqlAddr, conf.Server.MySqlPort, conf.Server.DbName)
db, err := sqlx.Connect("mysql", connect) db, err := sqlx.Connect("mysql", connect)
if err != nil { if err != nil {
return nil, err return nil, err
} }
db.SetMaxOpenConns(20) // 增加最大连接数,减少连接等待时间
db.SetMaxOpenConns(100)
db.SetMaxIdleConns(20)
db.SetConnMaxLifetime(30 * time.Minute) // 减少连接生命周期
db.SetConnMaxIdleTime(5 * time.Minute) // 减少空闲时间
return db, nil return db, nil
} }
@ -38,19 +72,29 @@ func InitDB() {
log.Debug("connect mysql failed: %v", err) log.Debug("connect mysql failed: %v", err)
return return
} }
sqlDbMu.Lock()
SqlDb = db SqlDb = db
sqlDbMu.Unlock()
log.Debug("connect mysql success") log.Debug("connect mysql success")
// 定时检测与重连 // 定时检测与重连
go func() { go func() {
ticker := time.NewTicker(time.Second) ticker := time.NewTicker(5 * time.Second) // 改为5秒检测一次降低频率
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {
sqlDbMu.Lock() sqlDbMu.RLock()
cur := SqlDb cur := SqlDb
sqlDbMu.Unlock() sqlDbMu.RUnlock()
if cur == nil || cur.Ping() != nil {
log.Debug("mysql ping failed, start reconnect") if cur == nil {
log.Debug("mysql connection is nil, start reconnect")
ReconnectDB()
continue
}
// Ping 操作不持有锁,避免阻塞其他操作
if err := cur.Ping(); err != nil {
log.Debug("mysql ping failed: %v, start reconnect", err)
ReconnectDB() ReconnectDB()
} }
} }
@ -74,7 +118,12 @@ func ReconnectDB() {
} }
func SeriesTransaction(sqlstrs []string, params [][]any) (err error) { func SeriesTransaction(sqlstrs []string, params [][]any) (err error) {
tx, err := SqlDb.Begin() sqlDb := GetDB()
if sqlDb == nil {
return fmt.Errorf("database connection is nil")
}
tx, err := sqlDb.Begin()
if err != nil { if err != nil {
log.Debug("Transaction failed, err:%v\n", err) log.Debug("Transaction failed, err:%v\n", err)
return err return err
@ -379,20 +428,38 @@ func FormatAllMemLoadDb(u interface{}, tableName string, Exclude string) (err er
} }
func GetServerData(d interface{}, Key string) (err error) { func GetServerData(d interface{}, Key string) (err error) {
sqlDb := GetDB()
if sqlDb == nil {
return fmt.Errorf("database connection is nil")
}
sql := "select * from t_server_mod where `key` = ?" sql := "select * from t_server_mod where `key` = ?"
err = SqlDb.Get(d, sql, Key) err = sqlDb.Get(d, sql, Key)
return return
} }
func SaveServerData(data *SqlServerModStruct) error { func SaveServerData(data *SqlServerModStruct) error {
sqlDb := GetDB()
if sqlDb == nil {
return fmt.Errorf("database connection is nil")
}
sql := "update t_server_mod set `mData` = ? , `updateTime` = ? where `key` = ?" sql := "update t_server_mod set `mData` = ? , `updateTime` = ? where `key` = ?"
_, err := SqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key) _, err := sqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key)
return err
}
func SaveServerDataWithTx(tx *sql.Tx, data *SqlServerModStruct) error {
sql := "update t_server_mod set `mData` = ? , `updateTime` = ? where `key` = ?"
_, err := tx.Exec(sql, data.ModData, data.UpdataTime, data.Key)
return err return err
} }
func InsertServerData(data *SqlServerModStruct) error { func InsertServerData(data *SqlServerModStruct) error {
sqlDb := GetDB()
if sqlDb == nil {
return fmt.Errorf("database connection is nil")
}
sql := "insert into t_server_mod (`mData` , `updateTime` ,`key`) Values (?,?,?)" sql := "insert into t_server_mod (`mData` , `updateTime` ,`key`) Values (?,?,?)"
_, err := SqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key) _, err := sqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key)
return err return err
} }

View File

@ -219,8 +219,12 @@ func HandleClientReq(args []interface{}) {
} }
p.(*Player).ProcessTrigger() p.(*Player).ProcessTrigger()
execTime := time.Now().UnixMilli() - now execTime := time.Now().UnixMilli() - now
if execTime > int64(1) { if execTime > int64(500) {
log.Warn("uid : %d, func : %s, execTime : %d ms", p.(*Player).M_DwUin, m.GetFunc(), execTime) log.Warn("uid : %d, func : %s, execTime : %d ms", p.(*Player).M_DwUin, m.GetFunc(), execTime)
p.(*Player).TeLog("Long_Method_Log", map[string]interface{}{
"method_name": m.GetFunc(),
"exec_time": execTime,
})
} }
} }
} }

View File

@ -43,7 +43,7 @@ func (d *Decorate) InitData() {
} }
for k := range d.PartCost { for k := range d.PartCost {
AreaId := decorateCfg.GetAreaId(k) AreaId := decorateCfg.GetAreaId(k)
if AreaId != d.AreaId { if AreaId < d.AreaId {
delete(d.PartCost, k) delete(d.PartCost, k)
} }
} }
@ -70,6 +70,15 @@ func (d *Decorate) Decorate(areaId int, decorateId int) ([]*item.Item, error) {
d.AreaId++ d.AreaId++
d.Progress = 0 d.Progress = 0
d.FinishList = make(map[int]struct{}) d.FinishList = make(map[int]struct{})
for k := range d.PartCost {
AreaId := decorateCfg.GetAreaId(k)
if AreaId < d.AreaId {
delete(d.PartCost, k)
}
}
if len(d.PartCost) == 0 {
d.initPartCost(d.AreaId)
}
} }
d.DecorateNum++ d.DecorateNum++
@ -101,7 +110,7 @@ func (d *Decorate) GetDecorateCostItem(AreaId, DecorateId int, DecorateOffIsExis
} }
for k := range d.PartCost { for k := range d.PartCost {
AreaId := decorateCfg.GetAreaId(k) AreaId := decorateCfg.GetAreaId(k)
if AreaId != d.AreaId { if AreaId < d.AreaId {
delete(d.PartCost, k) delete(d.PartCost, k)
} }
} }
@ -167,17 +176,16 @@ func (d *Decorate) DecorateAll(Star int, DecorateOffIsExist bool) ([]*item.Item,
d.AreaId++ d.AreaId++
d.Progress = 0 d.Progress = 0
d.FinishList = make(map[int]struct{}) d.FinishList = make(map[int]struct{})
} for k := range d.PartCost {
for k := range d.PartCost { AreaId := decorateCfg.GetAreaId(k)
AreaId := decorateCfg.GetAreaId(k) if AreaId < d.AreaId {
if AreaId != d.AreaId { delete(d.PartCost, k)
delete(d.PartCost, k) }
}
if len(d.PartCost) == 0 {
d.initPartCost(d.AreaId)
} }
} }
if len(d.PartCost) == 0 {
d.initPartCost(d.AreaId)
}
SubItems = append(SubItems, item.NewItem(item.ITEM_STAR_ID, SubItem)) SubItems = append(SubItems, item.NewItem(item.ITEM_STAR_ID, SubItem))
return SubItems, AddItem, Num, DecorateList, Log, PetExp return SubItems, AddItem, Num, DecorateList, Log, PetExp
} }

View File

@ -260,7 +260,7 @@ func (p *Player) InitPlayer(UserName string) error {
ChessMod := p.PlayMod.getChessMod() ChessMod := p.PlayMod.getChessMod()
ChargeMod.FixBug(ChessMod.GetEmitList()) ChargeMod.FixBug(ChessMod.GetEmitList())
p.FixOrderBug() p.FixOrderBug()
p.FixDecorate() //p.FixDecorate()
return nil return nil
} }
@ -269,8 +269,10 @@ func (p *Player) OrderShip() {
if err != nil { if err != nil {
return return
} }
// 避免为每个订单创建 goroutine改为批量处理或同步处理
for _, OrderInfo := range OrderList { for _, OrderInfo := range OrderList {
go p.TriggerShippingOrderOrigin(&msg.ReqShippingOrder{ // 直接同步处理,避免创建过多 goroutine
p.TriggerShippingOrderOrigin(&msg.ReqShippingOrder{
OrderSn: OrderInfo.OrderId, OrderSn: OrderInfo.OrderId,
}) })
} }
@ -1207,7 +1209,8 @@ func (p *Player) DispatcherHandle() {
if msg != nil { if msg != nil {
p.wg.Done() p.wg.Done()
log.Debug("player %d recive msg %v", p.M_DwUin, msg) log.Debug("player %d recive msg %v", p.M_DwUin, msg)
go p.HandleMsg(msg.Clone()) // 直接在当前 goroutine 中处理,避免创建过多 goroutine
p.HandleMsg(msg.Clone())
} }
} }
} }

View File

@ -43,8 +43,9 @@ func (s *ServerMod) init() {
s.handler = make(map[int]interface{}) s.handler = make(map[int]interface{})
s.update = false s.update = false
s.LoadData() s.LoadData()
// 直接调用 SaveData不要创建新的 goroutine
s.mDispatr.AfterFunc(time.Duration(PER_SAVE_TIME)*time.Second, func() { s.mDispatr.AfterFunc(time.Duration(PER_SAVE_TIME)*time.Second, func() {
go s.SaveData() s.SaveData()
}) })
go func() { go func() {
defer func() { defer func() {
@ -96,8 +97,9 @@ func (s *ServerMod) Send(msg *msg.Msg) {
// 同步请求 // 同步请求
func (s *ServerMod) Call(m *msg.Msg) (interface{}, error) { func (s *ServerMod) Call(m *msg.Msg) (interface{}, error) {
responseChan := make(chan interface{}) // 使用带缓冲的 channel 避免 goroutine 在超时时泄漏
errorChan := make(chan error) responseChan := make(chan interface{}, 1)
errorChan := make(chan error, 1)
go func() { go func() {
defer func() { defer func() {
@ -129,8 +131,9 @@ func (s *ServerMod) Call(m *msg.Msg) (interface{}, error) {
// mysql 保存消息 // mysql 保存消息
func (s *ServerMod) SaveData() { func (s *ServerMod) SaveData() {
// 直接在定时器回调中执行,不创建新的 goroutine 避免泄漏
s.mDispatr.AfterFunc(time.Duration(PER_SAVE_TIME+GoUtil.RandNum(5, 10))*time.Second, func() { s.mDispatr.AfterFunc(time.Duration(PER_SAVE_TIME+GoUtil.RandNum(5, 10))*time.Second, func() {
go s.SaveData() s.SaveData()
}) })
DbData := db.SqlServerModStruct{} DbData := db.SqlServerModStruct{}
DbData.Key = s.key DbData.Key = s.key
@ -139,23 +142,41 @@ func (s *ServerMod) SaveData() {
DbData.ModData, err = GoUtil.GobMarshal(s.data) DbData.ModData, err = GoUtil.GobMarshal(s.data)
if err != nil { if err != nil {
log.Error("SaveData Marshal failed,Mod Key: %s err:%v", s.key, err) log.Error("SaveData Marshal failed,Mod Key: %s err:%v", s.key, err)
return
} }
// log.Debug("SaveData Marshal success,Mod Key: %s", s.key) // log.Debug("SaveData Marshal success,Mod Key: %s", s.key)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
// 使用线程安全的方式获取数据库连接
sqlDb := db.GetDB()
if sqlDb == nil {
log.Error("SaveData failed, database connection is nil, Mod Key: %s", s.key)
return
}
// 先测试连接是否可用
if err := sqlDb.Ping(); err != nil {
log.Error("SaveData failed, database ping error, Mod Key: %s err:%v", s.key, err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
txOptions := &sql.TxOptions{} txOptions := &sql.TxOptions{}
tx, err := db.SqlDb.BeginTx(ctx, txOptions) tx, err := sqlDb.BeginTx(ctx, txOptions)
if err != nil { if err != nil {
log.Error("SaveData sql begin tx failed,Mod Key: %s err:%v", s.key, err) log.Error("SaveData sql begin tx failed,Mod Key: %s err:%v, data size: %d bytes", s.key, err, len(DbData.ModData))
return return
} }
err = db.SaveServerData(&DbData) err = db.SaveServerDataWithTx(tx, &DbData)
if err != nil { if err != nil {
tx.Rollback() tx.Rollback()
log.Error("SaveData sql exec ,Mod Key: %s err:%v", s.key, err) log.Error("SaveData sql exec failed,Mod Key: %s err:%v", s.key, err)
return return
} }
tx.Commit() err = tx.Commit()
if err != nil {
log.Error("SaveData sql commit failed,Mod Key: %s err:%v", s.key, err)
}
} }
func (s *ServerMod) LoadData() { func (s *ServerMod) LoadData() {

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"runtime"
"runtime/debug" "runtime/debug"
"server/conf" "server/conf"
"server/game" "server/game"
@ -13,6 +14,9 @@ import (
) )
func main() { func main() {
// 设置使用所有 CPU 核心
runtime.GOMAXPROCS(runtime.NumCPU())
lconf.LogLevel = conf.Server.LogLevel lconf.LogLevel = conf.Server.LogLevel
lconf.LogPath = conf.Server.LogPath lconf.LogPath = conf.Server.LogPath
lconf.LogFlag = conf.LogFlag lconf.LogFlag = conf.LogFlag

View File

@ -1,6 +1,9 @@
package test package test
import ( import (
"fmt"
decorateCfg "server/conf/decorate"
"server/db"
"server/game" "server/game"
"server/pkg/github.com/name5566/leaf/log" "server/pkg/github.com/name5566/leaf/log"
"testing" "testing"
@ -12,7 +15,7 @@ func TestFixDecorate(t *testing.T) {
p.FixDecorate() p.FixDecorate()
// //
p.InitPlayer("202601K111") p.InitPlayer("dee8eeb83ea3c54e48427b7ff20066fb")
p.FixDecorate() p.FixDecorate()
DecorateMod := p.GetDecorateMod() DecorateMod := p.GetDecorateMod()
@ -22,28 +25,56 @@ func TestFixDecorate(t *testing.T) {
} }
func TestFixUserData(t *testing.T) { func TestFixUserData(t *testing.T) {
// 确保数据库已初始化
if db.SqlDb == nil {
db.InitDB()
// 等待初始化完成
for i := 0; i < 10; i++ {
if db.SqlDb != nil {
break
}
log.Warn("Waiting for database initialization...")
// time.Sleep(100 * time.Millisecond)
}
}
if db.SqlDb == nil {
t.Fatal("Database initialization failed")
}
log.Warn("hello world") log.Warn("hello world")
// type account struct { type account struct {
// Account string `db:"user_name"` Account string `db:"user_name"`
// } }
// var accounts []account var accounts []account
// err := db.SqlDb.Select(&accounts, "SELECT `user_name` FROM t_account ") sqlDb := db.GetDB() // 使用线程安全的方式获取连接
// if err != nil { if sqlDb == nil {
// t.Errorf("Failed to fetch accounts: %v", err) t.Fatal("Database connection is nil")
// return }
// } err := sqlDb.Select(&accounts, "SELECT `user_name` FROM t_account order by auto_id")
// for _, acc := range accounts { if err != nil {
// p := new(game.Player) t.Errorf("Failed to fetch accounts: %v", err)
// p.InitPlayer(acc.Account) return
// DecorateMod := p.GetDecorateMod() }
// if DecorateMod.PartCost == nil { i := 0
// continue for _, acc := range accounts {
// } i++
// for k := range DecorateMod.PartCost { fmt.Printf("Fixing account %d/%d: %s\n", i, len(accounts), acc.Account)
// AreaId := decorateCfg.GetAreaId(k) account := acc.Account
// if AreaId != DecorateMod.AreaId { p := new(game.Player)
// fmt.Printf("Fixing account: %s, PartId: %d, OldAreaId: %d, NewAreaId: %d\n", acc.Account, k, AreaId, DecorateMod.AreaId) p.InitPlayer(account)
// } DecorateMod := p.GetDecorateMod()
// } if DecorateMod.PartCost == nil {
// } return
}
for k := range DecorateMod.PartCost {
AreaId := decorateCfg.GetAreaIdByIndoorId(k)
if AreaId < DecorateMod.AreaId {
log.Debug("Fixing account: %s, PartId: %d, OldAreaId: %d, NewAreaId: %d\n", account, k, AreaId, DecorateMod.AreaId)
}
}
p.Stop()
p = nil
}
log.Debug("All accounts fixed")
} }