diff --git a/src/server/cluster/Cluster.go b/src/server/cluster/Cluster.go index 71835dd4..0a9e5efa 100644 --- a/src/server/cluster/Cluster.go +++ b/src/server/cluster/Cluster.go @@ -34,7 +34,7 @@ func Init() { server.MaxConnNum = int(math.MaxInt32) server.PendingWriteNum = 1 << 14 server.LenMsgLen = 4 - server.MaxMsgLen = 4096 + server.MaxMsgLen = 1 << 16 server.NewAgent = newServerAgent server.Start() } diff --git a/src/server/cluster/cluster_func.go b/src/server/cluster/cluster_func.go index 8be63217..c99b9be2 100644 --- a/src/server/cluster/cluster_func.go +++ b/src/server/cluster/cluster_func.go @@ -93,7 +93,7 @@ func connectRemote(RemoteAddr string, ConnType int, ConnLabel string) error { client.ConnNum = 1 client.PendingWriteNum = 1 << 14 client.LenMsgLen = 4 - client.MaxMsgLen = 4096 + client.MaxMsgLen = 1 << 16 client.NewAgent = newAgent client.ConnType = ConnType client.ConnLabel = ConnLabel diff --git a/src/server/conf/conf.go b/src/server/conf/conf.go index 0da14e78..df9225bd 100644 --- a/src/server/conf/conf.go +++ b/src/server/conf/conf.go @@ -17,8 +17,9 @@ var ( LittleEndian = false // skeleton conf - GoLen = 10000 - TimerDispatcherLen = 10000 - AsynCallLen = 10000 - ChanRPCLen = 10000 + // 增加 goroutine 相关配置,避免 "Too many goroutines" 错误 + GoLen = 50000 // 从 10000 增加到 50000,控制并发 goroutine 数量 + TimerDispatcherLen = 50000 // 从 10000 增加到 50000,定时器队列长度 + AsynCallLen = 50000 // 从 10000 增加到 50000,异步调用队列长度 + ChanRPCLen = 50000 // 从 10000 增加到 50000,RPC 通道长度 ) diff --git a/src/server/conf/decorate/decorate_cfg.go b/src/server/conf/decorate/decorate_cfg.go index 41ffefb3..e4d2bcab 100644 --- a/src/server/conf/decorate/decorate_cfg.go +++ b/src/server/conf/decorate/decorate_cfg.go @@ -213,3 +213,11 @@ func GetPartNumByAreaId(AreaId int) map[int]int { } return res } + +func GetAreaIdByIndoorId(IndoorId int) int { + data, err := gamedata.GetDataByIntKey(INDOOR_PROGRESS, IndoorId) + if err != nil { + return 0 + } + return gamedata.GetIntValue(data, "Scene") +} diff --git a/src/server/conf/limited_time_event/limited_time_event_cfg.go b/src/server/conf/limited_time_event/limited_time_event_cfg.go index 99efcf8b..51a49df7 100644 --- a/src/server/conf/limited_time_event/limited_time_event_cfg.go +++ b/src/server/conf/limited_time_event/limited_time_event_cfg.go @@ -1,6 +1,7 @@ package limitedTimeEventCfg import ( + "math" "server/game/mod/item" GoUtil "server/game_util" "server/gamedata" @@ -146,18 +147,32 @@ func GetSenceJackpotReward(Id int) []*item.Item { } // 获取连击快手奖励 -func GetFastProduceReward(Times, Energy int) []*item.Item { +func GetFastProduceReward(Energy int) []*item.Item { data, err := gamedata.GetData(CFG_LIMITED_TIME_EVENT_FAST) if err != nil { log.Debug("GetSceneDashReward err:%v", err) return nil } - for _, v := range data { - if Times == gamedata.GetIntValue(v, "Times") && Energy <= gamedata.GetIntValue(v, "Max") && Energy >= gamedata.GetIntValue(v, "Min") { - return gamedata.GetItemList(v, "Items") - } + if len(data) == 0 { + return nil } - return nil + // Convert map to slice for sorting + type sortData struct { + Id string + Energy float64 + } + sortedList := make([]sortData, 0) + energy := float64(Energy) / 10.0 + for k, v := range data { + dataEnergy := gamedata.GetFloatValue(v, "EnergyValue") + sortedList = append(sortedList, sortData{k, math.Abs(energy - dataEnergy)}) + } + // Sort by Energy in ascending order + sort.Slice(sortedList, func(i, j int) bool { + return sortedList[i].Energy < sortedList[j].Energy + }) + + return gamedata.GetItemList(data[sortedList[0].Id], "Items") } // 获取连击快手最大次数 diff --git a/src/server/conf/server.json b/src/server/conf/server.json index 782ff82b..decddb79 100644 --- a/src/server/conf/server.json +++ b/src/server/conf/server.json @@ -1,5 +1,5 @@ { - "AppID": 1, + "AppID": 0, "LogLevel": "debug", "LogPath": "./log", "TCPAddr": ":3601", @@ -9,7 +9,7 @@ "MySqlUsr": "root", "MySqlPwd": "IOagNEq3C84c-20CmHEin5iODVc=", "MaxConnNum": 20000, - "DbName": "Merge_Pet_1", + "DbName": "merge_pet_1", "HttpPort": ":8081", "AppPath": "./app", "TELOGDIR" : "./teLog/", diff --git a/src/server/db/Mysql.go b/src/server/db/Mysql.go index fecbf2de..0b870d1d 100644 --- a/src/server/db/Mysql.go +++ b/src/server/db/Mysql.go @@ -1,6 +1,7 @@ package db import ( + "database/sql" "fmt" "reflect" "server/MergeConst" @@ -17,17 +18,50 @@ import ( ) var SqlDb *sqlx.DB -var sqlDbMu sync.Mutex +var sqlDbMu sync.RWMutex + +// GetDB 线程安全地获取数据库连接 +func GetDB() *sqlx.DB { + sqlDbMu.RLock() + defer sqlDbMu.RUnlock() + return SqlDb +} + +// GetDBOrPanic 获取数据库连接,如果为 nil 则记录错误 +func GetDBOrPanic() *sqlx.DB { + db := GetDB() + if db == nil { + log.Error("Database connection is nil, please check database initialization") + } + return db +} + +// EnsureDB 确保数据库连接可用,如果不可用则返回错误 +func EnsureDB() (*sqlx.DB, error) { + db := GetDB() + if db == nil { + return nil, fmt.Errorf("database connection is nil") + } + if err := db.Ping(); err != nil { + return nil, fmt.Errorf("database ping failed: %w", err) + } + return db, nil +} // 封装创建连接 func connectMySQL() (*sqlx.DB, error) { MysqlPwd, _ := GoUtil.Decrypt(conf.Server.MySqlPwd) - connect := fmt.Sprintf("%s:%s@(%s:%s)/%s", conf.Server.MySqlUsr, MysqlPwd, conf.Server.MySqlAddr, conf.Server.MySqlPort, conf.Server.DbName) + // 减少超时时间,避免长时间阻塞 + connect := fmt.Sprintf("%s:%s@(%s:%s)/%s?timeout=10s&readTimeout=15s&writeTimeout=15s&parseTime=true", conf.Server.MySqlUsr, MysqlPwd, conf.Server.MySqlAddr, conf.Server.MySqlPort, conf.Server.DbName) db, err := sqlx.Connect("mysql", connect) if err != nil { return nil, err } - db.SetMaxOpenConns(20) + // 增加最大连接数,减少连接等待时间 + db.SetMaxOpenConns(100) + db.SetMaxIdleConns(20) + db.SetConnMaxLifetime(30 * time.Minute) // 减少连接生命周期 + db.SetConnMaxIdleTime(5 * time.Minute) // 减少空闲时间 return db, nil } @@ -38,19 +72,29 @@ func InitDB() { log.Debug("connect mysql failed: %v", err) return } + sqlDbMu.Lock() SqlDb = db + sqlDbMu.Unlock() log.Debug("connect mysql success") // 定时检测与重连 go func() { - ticker := time.NewTicker(time.Second) + ticker := time.NewTicker(5 * time.Second) // 改为5秒检测一次,降低频率 defer ticker.Stop() for range ticker.C { - sqlDbMu.Lock() + sqlDbMu.RLock() cur := SqlDb - sqlDbMu.Unlock() - if cur == nil || cur.Ping() != nil { - log.Debug("mysql ping failed, start reconnect") + sqlDbMu.RUnlock() + + if cur == nil { + log.Debug("mysql connection is nil, start reconnect") + ReconnectDB() + continue + } + + // Ping 操作不持有锁,避免阻塞其他操作 + if err := cur.Ping(); err != nil { + log.Debug("mysql ping failed: %v, start reconnect", err) ReconnectDB() } } @@ -74,7 +118,12 @@ func ReconnectDB() { } func SeriesTransaction(sqlstrs []string, params [][]any) (err error) { - tx, err := SqlDb.Begin() + sqlDb := GetDB() + if sqlDb == nil { + return fmt.Errorf("database connection is nil") + } + + tx, err := sqlDb.Begin() if err != nil { log.Debug("Transaction failed, err:%v\n", err) return err @@ -379,20 +428,38 @@ func FormatAllMemLoadDb(u interface{}, tableName string, Exclude string) (err er } func GetServerData(d interface{}, Key string) (err error) { + sqlDb := GetDB() + if sqlDb == nil { + return fmt.Errorf("database connection is nil") + } sql := "select * from t_server_mod where `key` = ?" - err = SqlDb.Get(d, sql, Key) + err = sqlDb.Get(d, sql, Key) return } func SaveServerData(data *SqlServerModStruct) error { + sqlDb := GetDB() + if sqlDb == nil { + return fmt.Errorf("database connection is nil") + } sql := "update t_server_mod set `mData` = ? , `updateTime` = ? where `key` = ?" - _, err := SqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key) + _, err := sqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key) + return err +} + +func SaveServerDataWithTx(tx *sql.Tx, data *SqlServerModStruct) error { + sql := "update t_server_mod set `mData` = ? , `updateTime` = ? where `key` = ?" + _, err := tx.Exec(sql, data.ModData, data.UpdataTime, data.Key) return err } func InsertServerData(data *SqlServerModStruct) error { + sqlDb := GetDB() + if sqlDb == nil { + return fmt.Errorf("database connection is nil") + } sql := "insert into t_server_mod (`mData` , `updateTime` ,`key`) Values (?,?,?)" - _, err := SqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key) + _, err := sqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key) return err } diff --git a/src/server/game/admin.go b/src/server/game/admin.go index b1eb58ee..be4b5acf 100644 --- a/src/server/game/admin.go +++ b/src/server/game/admin.go @@ -206,7 +206,7 @@ func AdminPlayerBack(a gate.Agent, res map[string]interface{}) { JsonBuff, _ := json.Marshal(res) response := &msg.AdminRes{} response.Func = "admin" - response.Info = JsonBuff + response.Info = string(JsonBuff) a.WriteMsg(response) } diff --git a/src/server/game/external.go b/src/server/game/external.go index 7e7c3709..40c6271b 100644 --- a/src/server/game/external.go +++ b/src/server/game/external.go @@ -52,6 +52,7 @@ func HandleAdminReq(args []interface{}) { } func HandleClientReq(args []interface{}) { + start := time.Now() if G_GameLogicPtr.SeverInfo.Status == SERVER_STATUS_CLOSE || G_GameLogicPtr.SeverInfo.Status == SERVER_STATUS_MAINTAIN { return // 服务器关闭或者维护中,不处理任何消息 } @@ -146,7 +147,7 @@ func HandleClientReq(args []interface{}) { ResRegisterAccount.ResultCode = 0 data, _ := proto.Marshal(ResRegisterAccount) gl.PackResInfo(a, "ResRegisterAccount", data) - case "ReqLogin": + case "ReqLogin": // 登录请求 detail := &msg.ReqLogin{} proto.Unmarshal(buf, detail) accountInfo := db.GetAccountInfoFromDb(detail.UserName) @@ -162,11 +163,13 @@ func HandleClientReq(args []interface{}) { G_GameLogicPtr.PackLoginResInfo(a, ResLogin) return } + newPlayer := false if ResLogin.DwUin > 0 { PlayerInfo := G_GameLogicPtr.GetPlayer(ResLogin.DwUin) err := G_GameLogicPtr.ReplaceExistPlayerAndAgent(a, PlayerInfo) if err != nil { PlayerInfo = G_GameLogicPtr.CreateNewPlayer(a, detail.UserName) + newPlayer = true } if PlayerInfo.PlayMod.getBaseMod().IdCardName == "" && conf.Server.IdVerify { ResLogin.ResultCode = MergeConst.Protocol_Error_Id_Not_Verify @@ -184,6 +187,9 @@ func HandleClientReq(args []interface{}) { p.(*Player).LoginBackData() p.(*Player).TeLog("Login_log", nil) } + if newPlayer { + log.Debug("uid : %d, init user process : %s, execTime : %v , isNew: %v", p.(*Player).M_DwUin, m.GetFunc(), time.Since(start), newPlayer) + } p.(*Player).ProcessTrigger() case "ReqServerTime": // 获取服务器时间 detail := &msg.ReqServerTime{} @@ -213,15 +219,23 @@ func HandleClientReq(args []interface{}) { err := RunNetProcessByKey(m.GetFunc(), []interface{}{a, buf}) if err != nil { log.Error("uid : %d, func : %s, err : %s", p.(*Player).M_DwUin, m.GetFunc(), err) + p.(*Player).TeLog("func_exec_error", map[string]interface{}{ + "method_name": m.GetFunc(), + "error_info": err.Error(), + }) p.(*Player).Recover(backup) //还原Player的数据 return } p.(*Player).ProcessTrigger() + p.(*Player).TeLog("func_exec_time", map[string]interface{}{ + "method_name": m.GetFunc(), + "exec_time": fmt.Sprintf("%v", time.Since(start)), + }) } } p, b := internal.Agents.Load(a) if b { p.(*Player).SendClientRes() } - + log.Debug("uid : %d, func : %s, execTime : %s ", p.(*Player).M_DwUin, m.GetFunc(), time.Since(start)) } diff --git a/src/server/game/gm_handler.go b/src/server/game/gm_handler.go index 88126e6b..f7ce991e 100644 --- a/src/server/game/gm_handler.go +++ b/src/server/game/gm_handler.go @@ -581,7 +581,12 @@ func ReqGmCommand_(player *Player, Command string) error { player.AddPlayroomUpvote(100100129) i := player.GetPlayroomUpvote(100100129) log.Debug("debug upvote:%d", i) - + case "addLimitEvent": + Id, _ := strconv.Atoi(arg[1]) + Cd, _ := strconv.Atoi(arg[2]) + LimitedTimeEventMod := player.PlayMod.getLimitedTimeEventMod() + LimitedTimeEventMod.AddEvent(Id, Cd) + player.PushClientRes(LimitedTimeEventMod.BackData()) default: return fmt.Errorf("Player %d ReqGmCommand:%v not found", player.M_DwUin, arg) } diff --git a/src/server/game/log_mgr.go b/src/server/game/log_mgr.go index e30a375b..825bedfc 100644 --- a/src/server/game/log_mgr.go +++ b/src/server/game/log_mgr.go @@ -197,6 +197,7 @@ func (L *LogMgr) InitManager() { } func (L *LogMgr) AddLog(logs *Log) { + return // 复制结构体和 Param map,避免并发修改导致 json.Marshal 时 panic copyLog := *logs diff --git a/src/server/game/message_mgr.go b/src/server/game/message_mgr.go index 4dc21d3b..98aecb1d 100644 --- a/src/server/game/message_mgr.go +++ b/src/server/game/message_mgr.go @@ -467,8 +467,6 @@ func (p *WorkerPool) spawnWorker() { // Worker 工作函数,监听其子上下文以便单独停止 func (p *WorkerPool) worker(ctx context.Context, id int) { defer p.wg.Done() - log.Debug("Worker %d started", id) - for { select { case <-ctx.Done(): @@ -768,6 +766,7 @@ func FriendMgrSend(m1 *msg.Msg) error { // 异步发送消息到指定节点 节点不在线则保存消息 func sendMessageAsync(m *msg.Msg, node int) error { + log.Debug("[Middleware] Send Async message to node: %d, message: %v", node, m) err := mergeCluster.SendServerMsg(m, node) if err != nil && GoUtil.InArray(m.HandleType, save_msg_type) { saveMessage(m) @@ -779,6 +778,7 @@ func sendMessageAsync(m *msg.Msg, node int) error { // 同步消息到指定节点 节点不在线则保存消息 func sendMessageSync(m *msg.Msg, node int) (*msg.Msg, error) { + log.Debug("[Middleware] Send Sync message to node: %d, message: %v", node, m) msg, err := mergeCluster.CallServerMsg(m, node) if err != nil && conf.Server.ServerType == "center" && GoUtil.InArray(m.HandleType, save_msg_type) { saveMessage(m) diff --git a/src/server/game/mod/chess/Chess.go b/src/server/game/mod/chess/Chess.go index c5502a94..dd9880ac 100644 --- a/src/server/game/mod/chess/Chess.go +++ b/src/server/game/mod/chess/Chess.go @@ -339,14 +339,37 @@ func (cb *ChessBorad) GetEmitList() []int { // 完成订单 移除棋子 func (cb *ChessBorad) FinishOrder(ChessId []int) error { + unlockChessList := cb.GetUnlockChessList() + unlockChessMap := make(map[int]int) + for _, v := range unlockChessList { + unlockChessMap[v]++ + } + + boardChess := []int{} + BagChess := []int{} for _, v := range ChessId { + if unlockChessMap[v] > 0 { + unlockChessMap[v]-- + boardChess = append(boardChess, v) + } else { + BagChess = append(BagChess, v) + } + } + for _, v := range boardChess { err := cb.FinishOrderChess(v) if err != nil { return err } } + for _, v := range BagChess { + err := cb.FinishOrderChessByBag(v) + if err != nil { + return err + } + } return nil } + func (cb *ChessBorad) FinishOrderChess(Chess int) error { for k, v := range cb.ChessList { if v == Chess { @@ -354,13 +377,17 @@ func (cb *ChessBorad) FinishOrderChess(Chess int) error { return nil } } + return fmt.Errorf("order finish board chess id:%d not exist", Chess) +} + +func (cb *ChessBorad) FinishOrderChessByBag(Chess int) error { for k, v := range cb.ChessBag.List { if v.ChessId == Chess { cb.ChessBag.List[k] = ChessBagGrid{} return nil } } - return errors.New("order finish chess id not exist") + return fmt.Errorf("order finish bag chess id:%d not exist", Chess) } // 棋子转换 diff --git a/src/server/game/mod/decorate/Decorate.go b/src/server/game/mod/decorate/Decorate.go index dcbf187d..c6d98afb 100644 --- a/src/server/game/mod/decorate/Decorate.go +++ b/src/server/game/mod/decorate/Decorate.go @@ -41,6 +41,12 @@ func (d *Decorate) InitData() { if len(d.PartCost) == 0 { d.initPartCost(d.AreaId) } + for k := range d.PartCost { + AreaId := decorateCfg.GetAreaIdByIndoorId(k) + if AreaId < d.AreaId { + delete(d.PartCost, k) + } + } } // 装饰 @@ -64,6 +70,15 @@ func (d *Decorate) Decorate(areaId int, decorateId int) ([]*item.Item, error) { d.AreaId++ d.Progress = 0 d.FinishList = make(map[int]struct{}) + for k := range d.PartCost { + AreaId := decorateCfg.GetAreaIdByIndoorId(k) + if AreaId < d.AreaId { + delete(d.PartCost, k) + } + } + if len(d.PartCost) == 0 { + d.initPartCost(d.AreaId) + } } d.DecorateNum++ @@ -93,6 +108,12 @@ func (d *Decorate) GetDecorateCostItem(AreaId, DecorateId int, DecorateOffIsExis PartItemList = PartItem.Items delete(d.PartCost, Id) } + for k := range d.PartCost { + AreaId := decorateCfg.GetAreaIdByIndoorId(k) + if AreaId < d.AreaId { + delete(d.PartCost, k) + } + } if len(d.PartCost) == 0 { d.initPartCost(d.AreaId + 1) } @@ -155,9 +176,15 @@ func (d *Decorate) DecorateAll(Star int, DecorateOffIsExist bool) ([]*item.Item, d.AreaId++ d.Progress = 0 d.FinishList = make(map[int]struct{}) - } - if len(d.PartCost) == 0 { - d.initPartCost(d.AreaId) + for k := range d.PartCost { + AreaId := decorateCfg.GetAreaIdByIndoorId(k) + if AreaId < d.AreaId { + delete(d.PartCost, k) + } + } + if len(d.PartCost) == 0 { + d.initPartCost(d.AreaId) + } } SubItems = append(SubItems, item.NewItem(item.ITEM_STAR_ID, SubItem)) return SubItems, AddItem, Num, DecorateList, Log, PetExp diff --git a/src/server/game/mod/limited_time_event/limited_time_event.go b/src/server/game/mod/limited_time_event/limited_time_event.go index 218c8ae3..62aa7f98 100644 --- a/src/server/game/mod/limited_time_event/limited_time_event.go +++ b/src/server/game/mod/limited_time_event/limited_time_event.go @@ -6,6 +6,7 @@ import ( limitedTimeEventCfg "server/conf/limited_time_event" mergeDataCfg "server/conf/merge_data" "server/game/mod/item" + "server/game/mod/order" GoUtil "server/game_util" "server/msg" ) @@ -268,30 +269,38 @@ func (l *LimitedTimeEventMod) ProgressBackData() *msg.ResLimitEventProgress { } // 获取流星雨奖励 -func (l *LimitedTimeEventMod) GetMeteorReward(MergeList []int) []*item.Item { - MaxLv := 0 - Star := 0 +func (l *LimitedTimeEventMod) GetMeteorReward(MergeList, EmitList []int) []*item.Item { + eneryg := 0 for _, v := range MergeList { ChessLv := mergeDataCfg.GetLvById(v) - Star += mergeDataCfg.GetStarById(v) - if ChessLv > MaxLv { - MaxLv = ChessLv + Color := mergeDataCfg.GetColorById(v) + EmitId := order.GetEmitByColor(EmitList, Color) + if EmitId == 0 { + continue } + NewChessLv := mergeDataCfg.DynamicLevRev(ChessLv, EmitId, Color) + eneryg += int(math.Pow(2, float64(NewChessLv))) } - Add := limitedTimeEventCfg.GetMeteorAdd(MaxLv) - NewStar := int(float64(Star) * (float64(Add) / 100)) - NewStar = max(NewStar, 1) + + NewStar := int(max(math.Ceil(float64(eneryg)/0.36*0.1), 1)) return []*item.Item{{Id: item.ITEM_STAR_ID, Num: NewStar}} } // 获取宝箱雨奖励 -func (l *LimitedTimeEventMod) GetChestReward(MergeList []int) []*item.Item { - Star := 0 +func (l *LimitedTimeEventMod) GetChestReward(MergeList, EmitList []int) []*item.Item { + eneryg := 0 for _, v := range MergeList { - Star += mergeDataCfg.GetStarById(v) + ChessLv := mergeDataCfg.GetLvById(v) + Color := mergeDataCfg.GetColorById(v) + EmitId := order.GetEmitByColor(EmitList, Color) + if EmitId == 0 { + continue + } + NewChessLv := mergeDataCfg.DynamicLevRev(ChessLv, EmitId, Color) + eneryg += int(math.Pow(2, float64(NewChessLv-1))) } - - return limitedTimeEventCfg.GetChestReward(Star) + star := math.Ceil(float64(eneryg) / 10 / 2.5) + return []*item.Item{item.NewItem(item.ITEM_DIAMOND_ID, int(star))} } // 获取场景冲刺奖励 @@ -307,24 +316,17 @@ func (l *LimitedTimeEventMod) GetSceneDashReward() (int, []*item.Item) { } // 获取连击快手奖励 -func (l *LimitedTimeEventMod) GetFastProduceReward(Energy int) ([]*item.Item, int64, int, error) { +func (l *LimitedTimeEventMod) GetFastProduceReward(Energy int) ([]*item.Item, error) { Event, ok := l.EventList[EVENT_TYPE_FAST_PRODUCE] if !ok { - return nil, 0, 0, fmt.Errorf("FastProduce event not exist") + return nil, fmt.Errorf("FastProduce event not exist") } Now := GoUtil.Now() if Now < GoUtil.Int64(Event.Info["NextPlay"]) { - return nil, 0, 0, fmt.Errorf("FastProduce CD") + return nil, fmt.Errorf("FastProduce CD") } - Times := GoUtil.Int(Event.Info["Times"]) - Times++ - Event.Info["Times"] = Times - MaxTimes := limitedTimeEventCfg.GetFastProduceMaxTimes() - Times = min(Times, MaxTimes) - CD := limitedTimeEventCfg.GetFastCD() - Event.Info["NextPlay"] = GoUtil.Now() + int64(CD) // CD5分钟 - return limitedTimeEventCfg.GetFastProduceReward(Times, Energy), GoUtil.Now() + int64(CD), Event.Info["Times"].(int), nil + return limitedTimeEventCfg.GetFastProduceReward(Energy), nil } func (l *LimitedTimeEventMod) ResetFastProduceCD() { @@ -434,17 +436,18 @@ func (l *LimitedTimeEventMod) AddCatTrickEnergy(Energy int) { } func (l *LimitedTimeEventMod) SubPaybackDay() error { - if l.EventList[EVENT_TYPE_PAYBACK_DAY] == nil { - return fmt.Errorf("PaybackDay event not exist") - } - d := l.EventList[EVENT_TYPE_PAYBACK_DAY].D.(*PaybackDay) - if d.count <= 0 { - return fmt.Errorf("PaybackDay count is 0") - } - d.count-- - if d.count <= 0 { - delete(l.EventList, EVENT_TYPE_PAYBACK_DAY) - } + // 2026.1.20 改版 不限制次数 + // if l.EventList[EVENT_TYPE_PAYBACK_DAY] == nil { + // return fmt.Errorf("PaybackDay event not exist") + // } + // d := l.EventList[EVENT_TYPE_PAYBACK_DAY].D.(*PaybackDay) + // if d.count <= 0 { + // return fmt.Errorf("PaybackDay count is 0") + // } + // d.count-- + // if d.count <= 0 { + // delete(l.EventList, EVENT_TYPE_PAYBACK_DAY) + // } return nil } @@ -457,8 +460,9 @@ func (l *LimitedTimeEventMod) GetCatTrickReward() ([]*item.Item, error) { return nil, fmt.Errorf("CatTrick energy not enough") } d.Energy -= 100 + // TODO 放到配置中 return []*item.Item{ - {Id: item.ITEM_DIAMOND_ID, Num: 1}, + {Id: item.ITEM_DIAMOND_ID, Num: 5}, }, nil } diff --git a/src/server/game/mod_factory.go b/src/server/game/mod_factory.go index 4d9187e6..3b8c07bb 100644 --- a/src/server/game/mod_factory.go +++ b/src/server/game/mod_factory.go @@ -3,6 +3,7 @@ package game import ( "server/game/mod/chess" "server/game/mod/decorate" + limitedTimeEvent "server/game/mod/limited_time_event" ) func (p *Player) GetChessMod() *chess.ChessBorad { @@ -12,3 +13,7 @@ func (p *Player) GetChessMod() *chess.ChessBorad { func (p *Player) GetDecorateMod() *decorate.Decorate { return p.PlayMod.getDecorateMod() } + +func (p *Player) GetLimitEventMod() *limitedTimeEvent.LimitedTimeEventMod { + return p.PlayMod.getLimitedTimeEventMod() +} diff --git a/src/server/game/player_base_mod.go b/src/server/game/player_base_mod.go index 0f092636..d0dabf0c 100644 --- a/src/server/game/player_base_mod.go +++ b/src/server/game/player_base_mod.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "server/MergeConst" + "server/conf" baseCfg "server/conf/base" userCfg "server/conf/user" "server/db" @@ -136,7 +137,7 @@ func (p *PlayerBaseData) SaveDataFromDB(Key interface{}) bool { sqlStruck.LoginTime = int32(BaseMod.LoginTime) sqlStruck.UserName = p.Data.UserName sqlStruck.LogoutTime = int32(BaseMod.LogoutTime) - sqlStruck.Node = p.Data.Node + sqlStruck.Node = int32(conf.Server.ServerID) sqlStruck.Rolecreatetime = p.Data.Rolecreatetime sqlStruck.NoAd = p.Data.NoAd sqlStruck.ChampshipsGroupID = p.Data.ChampshipsGroupID @@ -611,6 +612,9 @@ func (p *PlayerBaseData) GetLastLoginTime() int { } func (p *PlayerBaseData) GetName() string { + if p == nil { + return "" + } return p.Data.UserName } diff --git a/src/server/game/player_chess_mod.go b/src/server/game/player_chess_mod.go index 4ebefa4b..c443d2f5 100644 --- a/src/server/game/player_chess_mod.go +++ b/src/server/game/player_chess_mod.go @@ -92,7 +92,7 @@ func (p *PlayerChessData) UpdatePlayerChessData(player *Player, buf []byte) erro for _, v := range update.MChessHandle { HandleStr += fmt.Sprintf("%v-%v-%v,", v.Id, v.ChessId, v.Type) } - log.Debug("棋子数据不一致, %v===%v===%v===%v", HandleStr, LastMap, update.MChessData, player.PlayMod.getChessMod().GetChessList()) + log.Debug("棋子数据不一致地图, %v===%v===%v===%v", HandleStr, LastMap, update.MChessData, player.PlayMod.getChessMod().GetChessList()) player.SendErrClienRes(res) player.TeLog("outsync_event", map[string]interface{}{ "outsync_event": "UpdatePlayerChessDataFunc", @@ -118,7 +118,7 @@ func (p *PlayerChessData) UpdateChessData(player *Player, MChessData map[string] Code: msg.RES_CODE_FAIL, Msg: "棋子数据不一致", } - log.Debug("棋子数据不一致, %v---%v", p.Data.MChessData, player.PlayMod.getChessMod().GetChessList()) + log.Debug("棋子数据不一致地图, %v---%v---%v", player.PlayMod.getChessMod().ChessMap, p.Data.MChessData, player.PlayMod.getChessMod().GetChessList()) player.SendErrClienRes(res) player.TeLog("outsync_event", map[string]interface{}{ "outsync_event": "UpdatePlayerChessDataFunc", @@ -165,7 +165,47 @@ func (p *PlayerChessData) checkChessEqual(player *Player) bool { for _, v := range p.Data.MChessData { bCopy = append(bCopy, int(v)) } - return SlicesEqual(aCopy, bCopy) + isEqual := SlicesEqual(aCopy, bCopy) + + if isEqual { + return true + } + // 找出aCopy多的元素和少的元素 + aMap := make(map[int]int) + bMap := make(map[int]int) + + for _, v := range aCopy { + aMap[v]++ + } + for _, v := range bCopy { + bMap[v]++ + } + + extra := make([]int, 0) + missing := make([]int, 0) + + // 找出aCopy多的元素 + for k, v := range aMap { + if bMap[k] < v { + for i := 0; i < v-bMap[k]; i++ { + extra = append(extra, k) + } + } + } + + // 找出aCopy少的元素 + for k, v := range bMap { + if aMap[k] < v { + for i := 0; i < v-aMap[k]; i++ { + missing = append(missing, k) + } + } + } + + if len(extra) > 0 || len(missing) > 0 { + log.Debug("棋子数据对比: aCopy多的元素=%v, aCopy少的元素=%v", extra, missing) + } + return false } // 棋子操作 diff --git a/src/server/game/player_data.go b/src/server/game/player_data.go index 9cc3bda4..5a971a0e 100644 --- a/src/server/game/player_data.go +++ b/src/server/game/player_data.go @@ -27,6 +27,7 @@ import ( "server/game/mod/quest" GoUtil "server/game_util" "server/msg" + telog "server/thinkdata" "strconv" "sync" "time" @@ -268,8 +269,10 @@ func (p *Player) OrderShip() { if err != nil { return } + // 避免为每个订单创建 goroutine,改为批量处理或同步处理 for _, OrderInfo := range OrderList { - go p.TriggerShippingOrderOrigin(&msg.ReqShippingOrder{ + // 直接同步处理,避免创建过多 goroutine + p.TriggerShippingOrderOrigin(&msg.ReqShippingOrder{ OrderSn: OrderInfo.OrderId, }) } @@ -417,6 +420,7 @@ func (p *Player) Login() { if Duration > 604800 { FriendMod := p.PlayMod.getFriendMod() FriendMod.AddActLog(friend.ACT_LOG_TYPE_LOST_USER_RETURN, "") + p.UpdateUserInfo() } } @@ -708,6 +712,7 @@ func (p *Player) HandleItem(itemList []*item.Item, Label string) error { }) FriendMod := p.PlayMod.getFriendMod() FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_NEW_AVATAR_FRAME, "") + p.UpdateUserInfo() p.PlayerDecoLog("avatar", Effect[0], Label) BackDataType[item.ITEM_TYPE_AVATAR] = struct{}{} case item.ITEM_TYPE_EMOJI: // 表情 @@ -719,6 +724,7 @@ func (p *Player) HandleItem(itemList []*item.Item, Label string) error { }) FriendMod := p.PlayMod.getFriendMod() FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_NEW_EMOTION, "") + p.UpdateUserInfo() p.PlayerDecoLog("emoji", Effect[0], Label) BackDataType[item.ITEM_TYPE_EMOJI] = struct{}{} case item.ITEM_TYPE_FACE: // 头像 @@ -730,6 +736,7 @@ func (p *Player) HandleItem(itemList []*item.Item, Label string) error { }) FriendMod := p.PlayMod.getFriendMod() FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_NEW_AVATAR, "") + p.UpdateUserInfo() p.PlayerDecoLog("face", Effect[0], Label) BackDataType[item.ITEM_TYPE_FACE] = struct{}{} case item.ITEM_TYPE_ACTIVITY_RACE: // 活动竞速 @@ -755,6 +762,7 @@ func (p *Player) HandleItem(itemList []*item.Item, Label string) error { Type, Name := playroomCfg.GetDecoInfo(Effect) FriendMod := p.PlayMod.getFriendMod() FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_NEW_DECORATION, "") + p.UpdateUserInfo() p.TeLog("room_deco_get", map[string]interface{}{ "room_deco_type": Type, "room_deco_name": Name, @@ -769,6 +777,7 @@ func (p *Player) HandleItem(itemList []*item.Item, Label string) error { Name := playroomCfg.GetDressName(Effect) FriendMod := p.PlayMod.getFriendMod() FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_NEW_COSTUME, "") + p.UpdateUserInfo() p.TeLog("pet_deco_get", map[string]interface{}{ "pet_deco_type": Type, "pet_deco_name": Name, @@ -1061,8 +1070,9 @@ func (p *Player) TeLog(Type string, Param map[string]interface{}) { } //Param["#zone_offset"] = -5 // 游戏内TE日志 - //go telog.Te.Track(p.GetPlayerBaseMod().GetName(), p.GetPlayerBaseMod().GetName(), Type, Param) BaseMod := p.PlayMod.getBaseMod() + UidStr := GoUtil.String(p.M_DwUin) + go telog.Te.Track(BaseMod.Account, UidStr, Type, Param) //途游GA go ga.GAlogEvent(Type, BaseMod.Account, "", Param) } @@ -1205,7 +1215,8 @@ func (p *Player) DispatcherHandle() { if msg != nil { p.wg.Done() log.Debug("player %d recive msg %v", p.M_DwUin, msg) - go p.HandleMsg(msg.Clone()) + // 直接在当前 goroutine 中处理,避免创建过多 goroutine + p.HandleMsg(msg.Clone()) } } } diff --git a/src/server/game/register_network_func.go b/src/server/game/register_network_func.go index 5ac81bba..ea3bb576 100644 --- a/src/server/game/register_network_func.go +++ b/src/server/game/register_network_func.go @@ -245,6 +245,9 @@ func RegHandbookAllReward(player *Player, buf []byte) error { }) return err } + FriendMod := player.PlayMod.getFriendMod() + FriendMod.AddActLog(friend.ACT_LOG_TYPE_COMPLETE_HANDBOOK_ACHIEVEMENT, req.Type) + player.UpdateUserInfo() player.PushClientRes(&msg.ResHandbookAllReward{ Code: msg.RES_CODE_SUCCESS, }) @@ -290,7 +293,7 @@ func ReqRewardOrder(player *Player, buf []byte) error { } } if LimitedTimeEventMod.CheckExist(limitedTimeEvent.EVENT_TYPE_METEOR_SHOW) { //流星雨活动 - AddItem := LimitedTimeEventMod.GetMeteorReward(mergeList) + AddItem := LimitedTimeEventMod.GetMeteorReward(mergeList, ChessMod.GetStarEmitList()) if len(AddItem) > 0 { player.TeLog("time_limited_event_action", map[string]interface{}{ "event_type": limitedTimeEventCfg.GetEventName(limitedTimeEvent.EVENT_TYPE_METEOR_SHOW), @@ -338,7 +341,7 @@ func ReqRewardOrder(player *Player, buf []byte) error { } if LimitedTimeEventMod.CheckExist(limitedTimeEvent.EVENT_TYPE_CHEST_RAIN) { //宝箱雨活动 - ChestRainItems := LimitedTimeEventMod.GetChestReward(mergeList) + ChestRainItems := LimitedTimeEventMod.GetChestReward(mergeList, ChessMod.GetStarEmitList()) player.args["ResItemPopId"] = req.OrderId err = player.HandleItem(ChestRainItems, msg.ITEM_POP_LABEL_LimitEventChestRain.String()) if err != nil { @@ -560,7 +563,9 @@ func ReqDecorate(player *Player, buf []byte) error { if AreaId == 1 && DecorateId == 44 { FriendMod.AddActLog(friend.ACT_LOG_TYPE_COMPLETE_CLOAKROOM, "") } - + if AreaId != DecorateMod.AreaId { + FriendMod.AddActLog(friend.ACT_LOG_TYPE_COMPLETE_CHAPTER_SCENES, GoUtil.String(AreaId)) + } player.PlayMod.save() player.PushClientRes(DecorateMod.BackData()) player.PushClientRes(&msg.ResDecorate{ @@ -1190,6 +1195,7 @@ func ReqCardCollectReward(player *Player, buf []byte) error { player.PlayMod.getChessMod().AddChessBuff(chess) player.PushClientRes(player.PlayMod.getOrderMod().BackData()) } + player.UpdateUserInfo() player.PlayMod.save() player.PushClientRes(CardMod.NotifyCard()) player.PushClientRes(&msg.ResCardCollectReward{ @@ -1271,6 +1277,7 @@ func ReqAllCollectReward(player *Player, buf []byte) error { } FriendMod := player.PlayMod.getFriendMod() FriendMod.AddActLog(friend.ACT_LOG_TYPE_COMPLETE_ALL_CARDS, "all") + player.UpdateUserInfo() player.PlayMod.save() player.TeLog("ReqAllCollectReward", map[string]interface{}{ "item_list": itemList, @@ -1711,7 +1718,7 @@ func ReqFastProduceReward(player *Player, buf []byte) error { return err } LimitedTimeEventMod := player.PlayMod.getLimitedTimeEventMod() - itemList, EndTime, Times, err := LimitedTimeEventMod.GetFastProduceReward(int(req.Energy)) + itemList, err := LimitedTimeEventMod.GetFastProduceReward(int(req.Energy)) if err != nil { player.SendErrClienRes(&msg.ResFastProduceReward{ Code: msg.RES_CODE_FAIL, @@ -1731,14 +1738,10 @@ func ReqFastProduceReward(player *Player, buf []byte) error { player.TeLog("ReqFastProduceReward", map[string]interface{}{ "energy": int(req.Energy), "item_list": itemList, - "end_time": EndTime, - "times": Times, }) player.PushClientRes(LimitedTimeEventMod.BackData()) player.PushClientRes(&msg.ResFastProduceReward{ - Code: msg.RES_CODE_SUCCESS, - EndTime: EndTime, - Num: int32(Times), + Code: msg.RES_CODE_SUCCESS, }) return nil } @@ -2993,6 +2996,7 @@ func ReqChampshipReward(player *Player, buf []byte) error { if MaxId == ChampshipMod.Reward { FriendMod := player.PlayMod.getFriendMod() FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_CHAMPIONSHIP_PRIZE, "") + player.UpdateUserInfo() } player.TeLog("championship_reward", map[string]interface{}{ "season_id": GoUtil.ZeroTimestamp(), @@ -3104,6 +3108,8 @@ func ReqFriendTLUpvote(player *Player, buf []byte) error { To: int(FUid), SendT: GoUtil.Now(), } + FriendMod.AddActLog(friend.ACT_LOG_TYPE_VISIT_UPVOTE, "") + player.UpdateUserInfo() FriendMgrSend(m) player.PlayMod.save() player.PushClientRes(&msg.ResFriendTLUpvote{ @@ -3169,6 +3175,7 @@ func ReqChampshipRankReward(player *Player, buf []byte) error { if myPreRank <= 5 { FriendMod := player.PlayMod.getFriendMod() FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_CHAMPIONSHIP_RANK, GoUtil.String(myPreRank)) + player.UpdateUserInfo() } player.PlayMod.save() player.BackChampship() @@ -3960,6 +3967,7 @@ func ReqPlayroomSelectReward(player *Player, buf []byte) error { }) FriendMod := player.PlayMod.getFriendMod() FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_VISIT_GAME_PRIZE_1, "") + player.UpdateUserInfo() PlayroomMod.ResetGame() player.PlayerDecoSetLog("emoji", int(req.EmojiId), "playroom_select_reward") player.PlayroomBackData() @@ -4154,10 +4162,7 @@ func ReqPlayroomFlipReward(player *Player, buf []byte) error { if LimitedTimeEventMod.CheckExist(limitedTimeEvent.EVENT_TYPE_PET_THIEF) && Result == playroom.FLIP_TYPE_GOLD { player.GetPetThiefReward(Target) } - if Result == playroom.FLIP_TYPE_GOLD { - FriendMod := player.PlayMod.getFriendMod() - FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_VISIT_GAME_PRIZE, "") - } + err = player.HandleItem(Items1, msg.ITEM_POP_LABEL_PlayroomFlip.String()) if err != nil { player.SendErrClienRes(&msg.ResPlayroomFlip{ @@ -4166,6 +4171,11 @@ func ReqPlayroomFlipReward(player *Player, buf []byte) error { }) return err } + if Result == playroom.FLIP_TYPE_GOLD { + FriendMod := player.PlayMod.getFriendMod() + FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_VISIT_GAME_PRIZE, "") + player.UpdateUserInfo() + } FriendMgrSend(&MsqMod.Msg{ From: int(player.M_DwUin), To: Target, @@ -4651,6 +4661,7 @@ func ReqFriendTreasureEnd(player *Player, buf []byte) error { } FriendMod := player.PlayMod.getFriendMod() FriendMod.AddActLog(friend.ACT_LOG_TYPE_OPEN_PET_TREASURE, "") + player.UpdateUserInfo() player.TeLog("pet_treasure_open", map[string]interface{}{ "pet_treasure_step": FriendTreasureMod.Shift, "pet_treasure_box": FriendTreasureMod.BoxItems, diff --git a/src/server/game/server_mod.go b/src/server/game/server_mod.go index eb1ce64a..cddcee79 100644 --- a/src/server/game/server_mod.go +++ b/src/server/game/server_mod.go @@ -43,8 +43,9 @@ func (s *ServerMod) init() { s.handler = make(map[int]interface{}) s.update = false s.LoadData() + // 直接调用 SaveData,不要创建新的 goroutine s.mDispatr.AfterFunc(time.Duration(PER_SAVE_TIME)*time.Second, func() { - go s.SaveData() + s.SaveData() }) go func() { defer func() { @@ -96,8 +97,9 @@ func (s *ServerMod) Send(msg *msg.Msg) { // 同步请求 func (s *ServerMod) Call(m *msg.Msg) (interface{}, error) { - responseChan := make(chan interface{}) - errorChan := make(chan error) + // 使用带缓冲的 channel 避免 goroutine 在超时时泄漏 + responseChan := make(chan interface{}, 1) + errorChan := make(chan error, 1) go func() { defer func() { @@ -129,8 +131,9 @@ func (s *ServerMod) Call(m *msg.Msg) (interface{}, error) { // mysql 保存消息 func (s *ServerMod) SaveData() { + // 直接在定时器回调中执行,不创建新的 goroutine 避免泄漏 s.mDispatr.AfterFunc(time.Duration(PER_SAVE_TIME+GoUtil.RandNum(5, 10))*time.Second, func() { - go s.SaveData() + s.SaveData() }) DbData := db.SqlServerModStruct{} DbData.Key = s.key @@ -139,23 +142,41 @@ func (s *ServerMod) SaveData() { DbData.ModData, err = GoUtil.GobMarshal(s.data) if err != nil { log.Error("SaveData Marshal failed,Mod Key: %s err:%v", s.key, err) + return } // log.Debug("SaveData Marshal success,Mod Key: %s", s.key) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + + // 使用线程安全的方式获取数据库连接 + sqlDb := db.GetDB() + if sqlDb == nil { + log.Error("SaveData failed, database connection is nil, Mod Key: %s", s.key) + return + } + + // 先测试连接是否可用 + if err := sqlDb.Ping(); err != nil { + log.Error("SaveData failed, database ping error, Mod Key: %s err:%v", s.key, err) + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() txOptions := &sql.TxOptions{} - tx, err := db.SqlDb.BeginTx(ctx, txOptions) + tx, err := sqlDb.BeginTx(ctx, txOptions) if err != nil { - log.Error("SaveData sql begin tx failed,Mod Key: %s err:%v", s.key, err) + log.Error("SaveData sql begin tx failed,Mod Key: %s err:%v, data size: %d bytes", s.key, err, len(DbData.ModData)) return } - err = db.SaveServerData(&DbData) + err = db.SaveServerDataWithTx(tx, &DbData) if err != nil { tx.Rollback() - log.Error("SaveData sql exec ,Mod Key: %s err:%v", s.key, err) + log.Error("SaveData sql exec failed,Mod Key: %s err:%v", s.key, err) return } - tx.Commit() + err = tx.Commit() + if err != nil { + log.Error("SaveData sql commit failed,Mod Key: %s err:%v", s.key, err) + } } func (s *ServerMod) LoadData() { diff --git a/src/server/game/unit_test.go b/src/server/game/unit_test.go index 746caa13..ffa0d114 100644 --- a/src/server/game/unit_test.go +++ b/src/server/game/unit_test.go @@ -185,9 +185,9 @@ func UnitLimitProgress(p *Player) error { func UnitLimitedTimeEvent(p *Player) error { LimitedTimeEventMod := p.PlayMod.getLimitedTimeEventMod() - + ChessMod := p.PlayMod.getChessMod() mergeList := []int{246, 15} - AddItem := LimitedTimeEventMod.GetChestReward(mergeList) + AddItem := LimitedTimeEventMod.GetChestReward(mergeList, ChessMod.GetStarEmitList()) fmt.Print(AddItem) return nil } diff --git a/src/server/main.go b/src/server/main.go index 92ba76e1..bb9e2371 100644 --- a/src/server/main.go +++ b/src/server/main.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" _ "net/http/pprof" + "runtime" "runtime/debug" "server/conf" "server/game" @@ -13,6 +14,9 @@ import ( ) func main() { + // 设置使用所有 CPU 核心 + runtime.GOMAXPROCS(runtime.NumCPU()) + lconf.LogLevel = conf.Server.LogLevel lconf.LogPath = conf.Server.LogPath lconf.LogFlag = conf.LogFlag diff --git a/src/server/msg/Gameapi.pb.go b/src/server/msg/Gameapi.pb.go index 7c6158c6..81e4fcad 100644 --- a/src/server/msg/Gameapi.pb.go +++ b/src/server/msg/Gameapi.pb.go @@ -27231,7 +27231,7 @@ func (x *AdminReq) GetInfo() []byte { type AdminRes struct { state protoimpl.MessageState `protogen:"open.v1"` Func string `protobuf:"bytes,1,opt,name=Func,proto3" json:"Func,omitempty"` - Info []byte `protobuf:"bytes,2,opt,name=Info,proto3" json:"Info,omitempty"` + Info string `protobuf:"bytes,2,opt,name=Info,proto3" json:"Info,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -27273,11 +27273,11 @@ func (x *AdminRes) GetFunc() string { return "" } -func (x *AdminRes) GetInfo() []byte { +func (x *AdminRes) GetInfo() string { if x != nil { return x.Info } - return nil + return "" } type ReqAdminInfo struct { @@ -29602,7 +29602,7 @@ const file_proto_Gameapi_proto_rawDesc = "" + "\x04Info\x18\x02 \x01(\fR\x04Info\"2\n" + "\bAdminRes\x12\x12\n" + "\x04Func\x18\x01 \x01(\tR\x04Func\x12\x12\n" + - "\x04Info\x18\x02 \x01(\fR\x04Info\" \n" + + "\x04Info\x18\x02 \x01(\tR\x04Info\" \n" + "\fReqAdminInfo\x12\x10\n" + "\x03Uid\x18\x01 \x01(\x03R\x03Uid\"\x15\n" + "\x13ReqReloadServerMail\"\x0f\n" + diff --git a/src/server/pkg/github.com/name5566/leaf/log/log.go b/src/server/pkg/github.com/name5566/leaf/log/log.go index ffde67a9..bc1c9774 100644 --- a/src/server/pkg/github.com/name5566/leaf/log/log.go +++ b/src/server/pkg/github.com/name5566/leaf/log/log.go @@ -18,8 +18,8 @@ const ( releaseLevel = 1 errorLevel = 2 fatalLevel = 3 - - DEBUG_LEVEL = 0 + warnLevel = 4 + DEBUG_LEVEL = 0 ) const ( @@ -27,6 +27,7 @@ const ( printReleaseLevel = "[release] " printErrorLevel = "[error ] " printFatalLevel = "[fatal ] " + printWarnLevel = "[warn ] " ) type Logger struct { @@ -52,6 +53,8 @@ func New(strLevel string, pathname string, flag int) (*Logger, error) { level = errorLevel case "fatal": level = fatalLevel + case "warn": + level = warnLevel default: return nil, errors.New("unknown level: " + strLevel) } @@ -100,6 +103,8 @@ func NewDailyLog(now time.Time, Level int, pathname string, flag int) error { level = errorLevel case fatalLevel: level = fatalLevel + case warnLevel: + level = warnLevel default: return fmt.Errorf("unknown level: %d", Level) } @@ -145,6 +150,8 @@ func BindLoggerToFile(strLevel string, pathName string, flag int) (*Logger, erro level = errorLevel case "fatal": level = fatalLevel + case "warn": + level = warnLevel default: return nil, errors.New("unknown level: " + strLevel) } @@ -207,6 +214,9 @@ func (logger *Logger) Error(format string, a ...interface{}) { func (logger *Logger) Fatal(format string, a ...interface{}) { logger.doPrintf(fatalLevel, printFatalLevel, format, a...) } +func (logger *Logger) Warn(format string, a ...interface{}) { + logger.doPrintf(warnLevel, printWarnLevel, format, a...) +} // It's dangerous to call the method on logging func Export(logger *Logger) { @@ -235,6 +245,15 @@ func Release(format string, a ...interface{}) { gLogger.doPrintf(releaseLevel, printReleaseLevel, format, a...) } +func Warn(format string, a ...interface{}) { + gloggerLock.Lock() + defer gloggerLock.Unlock() + if gLogger == nil { + return + } + gLogger.doPrintf(warnLevel, printWarnLevel, format, a...) +} + func Error(format string, a ...interface{}) { gloggerLock.Lock() defer gloggerLock.Unlock() diff --git a/src/server/test/conf/server.json b/src/server/test/conf/server.json index c6779e49..b1a8388a 100644 --- a/src/server/test/conf/server.json +++ b/src/server/test/conf/server.json @@ -1,5 +1,5 @@ { - "AppID": 1, + "AppID": 0, "LogLevel": "debug", "LogPath": "./log", "TCPAddr": ":3601", @@ -9,7 +9,7 @@ "MySqlUsr": "root", "MySqlPwd": "IOagNEq3C84c-20CmHEin5iODVc=", "MaxConnNum": 20000, - "DbName": "Merge_Pet_1", + "DbName": "merge_pet_1", "HttpPort": ":8081", "AppPath": "./app", "TELOGDIR" : "./teLog/", diff --git a/src/server/test/fix_test.go b/src/server/test/fix_test.go index 4898481f..ea90b595 100644 --- a/src/server/test/fix_test.go +++ b/src/server/test/fix_test.go @@ -1,7 +1,11 @@ package test import ( + "fmt" + decorateCfg "server/conf/decorate" + "server/db" "server/game" + "server/pkg/github.com/name5566/leaf/log" "testing" ) @@ -11,7 +15,7 @@ func TestFixDecorate(t *testing.T) { p.FixDecorate() // - p.InitPlayer("202601K111") + p.InitPlayer("dee8eeb83ea3c54e48427b7ff20066fb") p.FixDecorate() DecorateMod := p.GetDecorateMod() @@ -19,3 +23,58 @@ func TestFixDecorate(t *testing.T) { DecorateMod.Progress = 22 p.FixDecorate() } + +func TestFixUserData(t *testing.T) { + // 确保数据库已初始化 + if db.SqlDb == nil { + db.InitDB() + // 等待初始化完成 + for i := 0; i < 10; i++ { + if db.SqlDb != nil { + break + } + log.Warn("Waiting for database initialization...") + // time.Sleep(100 * time.Millisecond) + } + } + + if db.SqlDb == nil { + t.Fatal("Database initialization failed") + } + + log.Warn("hello world") + type account struct { + Account string `db:"user_name"` + } + var accounts []account + sqlDb := db.GetDB() // 使用线程安全的方式获取连接 + if sqlDb == nil { + t.Fatal("Database connection is nil") + } + err := sqlDb.Select(&accounts, "SELECT `user_name` FROM t_account order by auto_id") + if err != nil { + t.Errorf("Failed to fetch accounts: %v", err) + return + } + i := 0 + for _, acc := range accounts { + i++ + fmt.Printf("Fixing account %d/%d: %s\n", i, len(accounts), acc.Account) + account := acc.Account + p := new(game.Player) + p.InitPlayer(account) + DecorateMod := p.GetDecorateMod() + if DecorateMod.PartCost == nil { + return + } + for k := range DecorateMod.PartCost { + AreaId := decorateCfg.GetAreaIdByIndoorId(k) + if AreaId < DecorateMod.AreaId { + log.Debug("Fixing account: %s, PartId: %d, OldAreaId: %d, NewAreaId: %d\n", account, k, AreaId, DecorateMod.AreaId) + } + } + p.Stop() + p = nil + } + log.Debug("All accounts fixed") +} diff --git a/src/server/test/limit_test.go b/src/server/test/limit_test.go new file mode 100644 index 00000000..9d10a98f --- /dev/null +++ b/src/server/test/limit_test.go @@ -0,0 +1,30 @@ +package test + +import ( + "server/game" + limitedTimeEvent "server/game/mod/limited_time_event" + "testing" +) + +func TestMetroRain(t *testing.T) { + // 3625212 + p := new(game.Player) + p.InitPlayer("3625212") + ChessMod := p.GetChessMod() + LimitEventMod := p.GetLimitEventMod() + rewards := LimitEventMod.GetMeteorReward([]int{1, 22, 3}, ChessMod.GetStarEmitList()) + t.Logf("rewards: %v", rewards) +} + +func TestFast(t *testing.T) { + p := new(game.Player) + p.InitPlayer("3625212") + LimitEventMod := p.GetLimitEventMod() + LimitEventMod.AddEvent(limitedTimeEvent.EVENT_TYPE_FAST_PRODUCE, 60) + items, err := LimitEventMod.GetFastProduceReward(50) + if err != nil { + t.Errorf("GetFastProduceReward error: %v", err) + return + } + t.Logf("Fast produce items: %v", items) +} diff --git a/src/server/test/order_test.go b/src/server/test/order_test.go index 456bc06d..a8eb5bc6 100644 --- a/src/server/test/order_test.go +++ b/src/server/test/order_test.go @@ -20,3 +20,23 @@ func TestOrderStart(t *testing.T) { star = int(float64(star)*float64(order_facotry)/1000+0.5) * 10 fmt.Printf("star is %d", star) } + +func TestOrderFinish(t *testing.T) { + p1 := new(game.Player) + p1.InitPlayer("3625212") + game.G_GameLogicPtr.SetPlayer(p1) + ChessMod := p1.GetChessMod() + err := ChessMod.FinishOrder([]int{1, 2, 3}) + if err != nil { + t.Errorf("finish order failed:%v", err) + } +} + +func TestChestRain(t *testing.T) { + p1 := new(game.Player) + p1.InitPlayer("GSTTEST011") + ChessMod := p1.GetChessMod() + LimitEventMod := p1.GetLimitEventMod() + f := LimitEventMod.GetChestReward([]int{928}, ChessMod.GetStarEmitList()) + fmt.Printf("chest rain reward:%v", f) +} diff --git a/src/server/thinkdata/te.go b/src/server/thinkdata/te.go index 76734023..a4c0f84b 100644 --- a/src/server/thinkdata/te.go +++ b/src/server/thinkdata/te.go @@ -2,8 +2,7 @@ package telog import ( "server/conf" - - "github.com/ThinkingDataAnalytics/go-sdk/v2/src/thinkingdata" + "server/thinkingdata" ) var Te thinkingdata.TDAnalytics diff --git a/src/server/thinkingdata/consumer_batch.go b/src/server/thinkingdata/consumer_batch.go new file mode 100644 index 00000000..8d559121 --- /dev/null +++ b/src/server/thinkingdata/consumer_batch.go @@ -0,0 +1,352 @@ +package thinkingdata + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" +) + +// TDBatchConsumer upload data to TE by http +type TDBatchConsumer struct { + serverUrl string // serverUrl + appId string // appId + timeout time.Duration // http timeout (mill second) + compress bool // is need compress + bufferMutex *sync.RWMutex + cacheMutex *sync.RWMutex // cache mutex + + buffer []Data + batchSize int // flush event count each time + cacheBuffer [][]Data // buffer + cacheCapacity int // buffer max count +} + +type TDBatchConfig struct { + ServerUrl string // serverUrl + AppId string // appId + BatchSize int // flush event count each time + Timeout int // http timeout (mill second) + Compress bool // enable compress data + AutoFlush bool // enable auto flush + Interval int // auto flush spacing (second) + CacheCapacity int // cache event count +} + +const ( + DefaultTimeOut = 30000 + DefaultBatchSize = 20 + MaxBatchSize = 200 + DefaultInterval = 30 + DefaultCacheCapacity = 50 +) + +// NewBatchConsumer create TDBatchConsumer +func NewBatchConsumer(serverUrl string, appId string) (TDConsumer, error) { + config := TDBatchConfig{ + ServerUrl: serverUrl, + AppId: appId, + Compress: true, + } + return initBatchConsumer(config) +} + +// NewBatchConsumerWithBatchSize create TDBatchConsumer +// serverUrl +// appId +// batchSize: flush event count each time +func NewBatchConsumerWithBatchSize(serverUrl string, appId string, batchSize int) (TDConsumer, error) { + config := TDBatchConfig{ + ServerUrl: serverUrl, + AppId: appId, + Compress: true, + BatchSize: batchSize, + } + return initBatchConsumer(config) +} + +// NewBatchConsumerWithCompress create TDBatchConsumer +// serverUrl +// appId +// compress: enable data compress +func NewBatchConsumerWithCompress(serverUrl string, appId string, compress bool) (TDConsumer, error) { + config := TDBatchConfig{ + ServerUrl: serverUrl, + AppId: appId, + Compress: compress, + } + return initBatchConsumer(config) +} + +func NewBatchConsumerWithConfig(config TDBatchConfig) (TDConsumer, error) { + return initBatchConsumer(config) +} + +func initBatchConsumer(config TDBatchConfig) (TDConsumer, error) { + if config.ServerUrl == "" { + msg := fmt.Sprint("ServerUrl not be empty") + tdLogInfo(msg) + return nil, errors.New(msg) + } + u, err := url.Parse(config.ServerUrl) + if err != nil { + return nil, err + } + u.Path = "/sync_server" + + var batchSize int + if config.BatchSize > MaxBatchSize { + batchSize = MaxBatchSize + } else if config.BatchSize <= 0 { + batchSize = DefaultBatchSize + } else { + batchSize = config.BatchSize + } + + var cacheCapacity int + if config.CacheCapacity <= 0 { + cacheCapacity = DefaultCacheCapacity + } else { + cacheCapacity = config.CacheCapacity + } + + var timeout int + if config.Timeout == 0 { + timeout = DefaultTimeOut + } else { + timeout = config.Timeout + } + + c := &TDBatchConsumer{ + serverUrl: u.String(), + appId: config.AppId, + timeout: time.Duration(timeout) * time.Millisecond, + compress: config.Compress, + bufferMutex: new(sync.RWMutex), + cacheMutex: new(sync.RWMutex), + batchSize: batchSize, + buffer: make([]Data, 0, batchSize), + cacheCapacity: cacheCapacity, + cacheBuffer: make([][]Data, 0, cacheCapacity), + } + + var interval int + if config.Interval == 0 { + interval = DefaultInterval + } else { + interval = config.Interval + } + if config.AutoFlush { + go func() { + ticker := time.NewTicker(time.Duration(interval) * time.Second) + defer ticker.Stop() + for { + <-ticker.C + _ = c.timerFlush() + } + }() + } + + tdLogInfo("Mode: batch consumer, appId: %s, serverUrl: %s", c.appId, c.serverUrl) + + return c, nil +} + +func (c *TDBatchConsumer) Add(d Data) error { + c.bufferMutex.Lock() + c.buffer = append(c.buffer, d) + c.bufferMutex.Unlock() + + tdLogInfo("Enqueue event data: %v", d) + + if c.getBufferLength() >= c.batchSize || c.getCacheLength() > 0 { + err := c.Flush() + return err + } + + return nil +} + +func (c *TDBatchConsumer) timerFlush() error { + tdLogInfo("timer flush data") + return c.innerFlush() +} + +func (c *TDBatchConsumer) Flush() error { + tdLogInfo("flush data") + return c.innerFlush() +} + +func (c *TDBatchConsumer) innerFlush() error { + + c.cacheMutex.Lock() + defer c.cacheMutex.Unlock() + + c.bufferMutex.Lock() + defer c.bufferMutex.Unlock() + + if len(c.buffer) == 0 && len(c.cacheBuffer) == 0 { + return nil + } + + defer func() { + if len(c.cacheBuffer) > c.cacheCapacity { + c.cacheBuffer = c.cacheBuffer[1:] + } + }() + + if len(c.cacheBuffer) == 0 || len(c.buffer) >= c.batchSize { + c.cacheBuffer = append(c.cacheBuffer, c.buffer) + c.buffer = make([]Data, 0, c.batchSize) + } + + err := c.uploadEvents() + + return err +} + +func (c *TDBatchConsumer) uploadEvents() error { + buffer := c.cacheBuffer[0] + + jsonBytes, err := json.Marshal(buffer) + if err == nil { + params := parseTime(jsonBytes) + for i := 0; i < 3; i++ { + statusCode, code, err := c.send(params, len(buffer)) + if statusCode == 200 { + c.cacheBuffer = c.cacheBuffer[1:] + switch code { + case 0: + tdLogInfo("send success: %v", params) + return nil + case 1, -1: + msg := "invalid data format" + tdLogError(msg) + return fmt.Errorf(msg) + case -2: + msg := "APP ID doesn't exist" + tdLogError(msg) + return fmt.Errorf(msg) + case -3: + msg := "invalid ip transmission" + tdLogError(msg) + return fmt.Errorf(msg) + default: + msg := "unknown error" + tdLogError(msg) + return fmt.Errorf(msg) + } + } + if err != nil { + if i == 2 { + return err + } + } + } + } + return err +} + +func (c *TDBatchConsumer) FlushAll() error { + for c.getCacheLength() > 0 || c.getBufferLength() > 0 { + if err := c.Flush(); err != nil { + if !strings.Contains(err.Error(), "ThinkingDataError") { + return err + } + } + } + return nil +} + +func (c *TDBatchConsumer) Close() error { + tdLogInfo("batch consumer close") + return c.FlushAll() +} + +func (c *TDBatchConsumer) IsStringent() bool { + return false +} + +func (c *TDBatchConsumer) send(data string, size int) (statusCode int, code int, err error) { + var encodedData string + var compressType = "gzip" + if c.compress { + encodedData, err = encodeData(data) + } else { + encodedData = data + compressType = "none" + } + if err != nil { + return 0, 0, err + } + postData := bytes.NewBufferString(encodedData) + + var resp *http.Response + req, _ := http.NewRequest("POST", c.serverUrl, postData) + req.Header["appid"] = []string{c.appId} + req.Header.Set("user-agent", "ta-go-sdk") + req.Header.Set("version", SdkVersion) + req.Header.Set("compress", compressType) + req.Header["TA-Integration-Type"] = []string{LibName} + req.Header["TA-Integration-Version"] = []string{SdkVersion} + req.Header["TA-Integration-Count"] = []string{strconv.Itoa(size)} + client := &http.Client{Timeout: c.timeout} + resp, err = client.Do(req) + + if err != nil { + return 0, 0, err + } + + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + var result struct { + Code int + } + + err = json.Unmarshal(body, &result) + if err != nil { + return resp.StatusCode, 1, err + } + + return resp.StatusCode, result.Code, nil + } else { + return resp.StatusCode, -1, nil + } +} + +// Gzip +func encodeData(data string) (string, error) { + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + + _, err := gw.Write([]byte(data)) + if err != nil { + gw.Close() + return "", err + } + gw.Close() + + return string(buf.Bytes()), nil +} + +func (c *TDBatchConsumer) getBufferLength() int { + c.bufferMutex.RLock() + defer c.bufferMutex.RUnlock() + return len(c.buffer) +} + +func (c *TDBatchConsumer) getCacheLength() int { + c.cacheMutex.RLock() + defer c.cacheMutex.RUnlock() + return len(c.cacheBuffer) +} diff --git a/src/server/thinkingdata/consumer_debug.go b/src/server/thinkingdata/consumer_debug.go new file mode 100644 index 00000000..2e7726de --- /dev/null +++ b/src/server/thinkingdata/consumer_debug.go @@ -0,0 +1,120 @@ +package thinkingdata + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" +) + +// TDDebugConsumer The data is reported one by one, and when an error occurs, the log will be printed on the console. +type TDDebugConsumer struct { + serverUrl string // serverUrl + appId string // appId + writeData bool // is archive to TE + deviceId string // be used to debug in TE +} + +// NewDebugConsumer init TDDebugConsumer +func NewDebugConsumer(serverUrl string, appId string) (TDConsumer, error) { + return NewDebugConsumerWithWriter(serverUrl, appId, true) +} + +func NewDebugConsumerWithWriter(serverUrl string, appId string, writeData bool) (TDConsumer, error) { + return NewDebugConsumerWithDeviceId(serverUrl, appId, writeData, "") +} + +func NewDebugConsumerWithDeviceId(serverUrl string, appId string, writeData bool, deviceId string) (TDConsumer, error) { + // enable console log + SetLogLevel(TDLogLevelDebug) + + if len(serverUrl) <= 0 { + msg := fmt.Sprint("ServerUrl not be empty") + tdLogError(msg) + return nil, errors.New(msg) + } + + u, err := url.Parse(serverUrl) + if err != nil { + return nil, err + } + + u.Path = "/data_debug" + + c := &TDDebugConsumer{serverUrl: u.String(), appId: appId, writeData: writeData, deviceId: deviceId} + + tdLogInfo("Mode: debug consumer, appId: %s, serverUrl: %s", c.appId, c.serverUrl) + + return c, nil +} + +func (c *TDDebugConsumer) Add(d Data) error { + jsonBytes, err := json.Marshal(d) + if err != nil { + return err + } + + var jsonStr string + // if properties has includes complex data, SDK need parse time with regular expression + if d.IsComplex { + jsonStr = parseTime(jsonBytes) + } else { + jsonStr = string(jsonBytes) + } + + tdLogInfo("%v", jsonStr) + + return c.send(jsonStr) +} + +func (c *TDDebugConsumer) Flush() error { + tdLogInfo("flush data") + return nil +} + +func (c *TDDebugConsumer) Close() error { + tdLogInfo("debug consumer close") + return nil +} + +func (c *TDDebugConsumer) IsStringent() bool { + return true +} + +func (c *TDDebugConsumer) send(data string) error { + var dryRun = "0" + if !c.writeData { + dryRun = "1" + } + postData := url.Values{"data": {data}, "appid": {c.appId}, "source": {"server"}, "dryRun": {dryRun}} + if len(c.deviceId) > 0 { + postData.Add("deviceId", c.deviceId) + } + resp, err := http.PostForm(c.serverUrl, postData) + if err != nil { + return err + } + + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + result := map[string]interface{}{} + err = json.Unmarshal(body, &result) + if err != nil { + return err + } + if uint64(result["errorLevel"].(float64)) != 0 { + msg := fmt.Sprintf("send to receiver failed with return content: %s", string(body)) + tdLogError(msg) + return errors.New(msg) + } else { + tdLogInfo("send success: %v", result) + } + } else { + return errors.New(fmt.Sprintf("Unexpected Status Code: %d", resp.StatusCode)) + } + return nil +} diff --git a/src/server/thinkingdata/consumer_log.go b/src/server/thinkingdata/consumer_log.go new file mode 100644 index 00000000..e39f45b3 --- /dev/null +++ b/src/server/thinkingdata/consumer_log.go @@ -0,0 +1,256 @@ +package thinkingdata + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "sync" + "time" +) + +type RotateMode int32 + +const ( + DefaultChannelSize = 1000 // channel size + ROTATE_DAILY RotateMode = 0 // by the day + ROTATE_HOURLY RotateMode = 1 // by the hour +) + +// TDLogConsumer write data to file, it works with LogBus +type TDLogConsumer struct { + directory string // directory of log file + dateFormat string // name format of log file + fileSize int64 // max size of single log file (MByte) + fileNamePrefix string // prefix of log file + currentFile *os.File // current file handler + wg sync.WaitGroup + ch chan []byte + mutex *sync.RWMutex + sdkClose bool +} + +type TDLogConsumerConfig struct { + Directory string // directory of log file + RotateMode RotateMode // rotate mode of log file + FileSize int // max size of single log file (MByte) + FileNamePrefix string // prefix of log file + ChannelSize int +} + +func NewLogConsumer(directory string, r RotateMode) (TDConsumer, error) { + return NewLogConsumerWithFileSize(directory, r, 0) +} + +// NewLogConsumerWithFileSize init TDLogConsumer +// directory: directory of log file +// r: rotate mode of log file. (in days / hours) +// size: max size of single log file (MByte) +func NewLogConsumerWithFileSize(directory string, r RotateMode, size int) (TDConsumer, error) { + config := TDLogConsumerConfig{ + Directory: directory, + RotateMode: r, + FileSize: size, + } + return NewLogConsumerWithConfig(config) +} + +func NewLogConsumerWithConfig(config TDLogConsumerConfig) (TDConsumer, error) { + var df string + switch config.RotateMode { + case ROTATE_DAILY: + df = "2006-01-02" + case ROTATE_HOURLY: + df = "2006-01-02-15" + default: + errStr := "unknown rotate mode" + tdLogInfo(errStr) + return nil, errors.New(errStr) + } + + chanSize := DefaultChannelSize + if config.ChannelSize > 0 { + chanSize = config.ChannelSize + } + + c := &TDLogConsumer{ + directory: config.Directory, + dateFormat: df, + fileSize: int64(config.FileSize * 1024 * 1024), + fileNamePrefix: config.FileNamePrefix, + wg: sync.WaitGroup{}, + ch: make(chan []byte, chanSize), + mutex: new(sync.RWMutex), + sdkClose: false, + } + + return c, c.init() +} + +func (c *TDLogConsumer) Add(d Data) error { + var err error = nil + c.mutex.Lock() + defer func() { + c.mutex.Unlock() + }() + if c.sdkClose { + err = errors.New("add event failed, SDK has been closed") + tdLogError(err.Error()) + } else { + jsonBytes, jsonErr := json.Marshal(d) + if jsonErr != nil { + err = jsonErr + } else { + c.ch <- jsonBytes + } + } + return err +} + +func (c *TDLogConsumer) Flush() error { + tdLogInfo("flush data") + var err error = nil + c.mutex.Lock() + if c.currentFile != nil { + err = c.currentFile.Sync() + } + c.mutex.Unlock() + return err +} + +func (c *TDLogConsumer) Close() error { + tdLogInfo("log consumer close") + + var err error = nil + c.mutex.Lock() + if c.sdkClose { + err = errors.New("[ThinkingData][error]: SDK has been closed") + } else { + close(c.ch) + c.wg.Wait() + if c.currentFile != nil { + _ = c.currentFile.Sync() + err = c.currentFile.Close() + c.currentFile = nil + } + } + c.sdkClose = true + c.mutex.Unlock() + return err +} + +func (c *TDLogConsumer) IsStringent() bool { + return false +} + +func (c *TDLogConsumer) constructFileName(timeStr string, i int) string { + fileNamePrefix := "" + if len(c.fileNamePrefix) != 0 { + fileNamePrefix = c.fileNamePrefix + "." + } + // is need paging + if c.fileSize > 0 { + return fmt.Sprintf("%s/%slog.%s_%d", c.directory, fileNamePrefix, timeStr, i) + } else { + return fmt.Sprintf("%s/%slog.%s", c.directory, fileNamePrefix, timeStr) + } +} + +func (c *TDLogConsumer) init() error { + fd, err := c.initLogFile() + if err != nil { + tdLogError("init log file failed: %s", err) + return err + } + c.currentFile = fd + + c.wg.Add(1) + + go func() { + defer func() { + c.wg.Done() + }() + for { + select { + case rec, ok := <-c.ch: + if !ok { + return + } + jsonStr := parseTime(rec) + tdLogInfo("write event data: %s", jsonStr) + c.writeToFile(jsonStr) + } + } + }() + + tdLogInfo("Mode: log consumer, log path: " + c.directory) + + return nil +} + +func (c *TDLogConsumer) initLogFile() (*os.File, error) { + _, err := os.Stat(c.directory) + if err != nil && os.IsNotExist(err) { + e := os.MkdirAll(c.directory, os.ModePerm) + if e != nil { + return nil, e + } + } + timeStr := time.Now().Format(c.dateFormat) + return os.OpenFile(c.constructFileName(timeStr, 0), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0664) +} + +var logFileIndex = 0 + +func (c *TDLogConsumer) writeToFile(str string) { + timeStr := time.Now().Format(c.dateFormat) + // paging by Rotate Mode and current file size + var newName string + fName := c.constructFileName(timeStr, logFileIndex) + + if c.currentFile == nil { + var openFileErr error + c.currentFile, openFileErr = os.OpenFile(fName, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0664) + if openFileErr != nil { + tdLogInfo("open log file failed: %s\n", openFileErr) + return + } + } + + if c.currentFile.Name() != fName { + newName = fName + } else if c.fileSize > 0 { + stat, _ := c.currentFile.Stat() + if stat.Size() > c.fileSize { + logFileIndex++ + newName = c.constructFileName(timeStr, logFileIndex) + } + } + if newName != "" { + err := c.currentFile.Close() + if err != nil { + tdLogInfo("close file failed: %s\n", err) + return + } + c.currentFile, err = os.OpenFile(fName, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0664) + if err != nil { + tdLogInfo("rotate log file failed: %s\n", err) + return + } + } + _, err := fmt.Fprintln(c.currentFile, str) + if err != nil { + tdLogInfo("LoggerWriter(%q): %s\n", c.currentFile.Name(), err) + return + } +} + +// Deprecated: please use TDLogConsumer +type LogConsumer struct { + TDLogConsumer +} + +// Deprecated: please use TDLogConsumerConfig +type LogConfig struct { + TDLogConsumerConfig +} diff --git a/src/server/thinkingdata/td_log.go b/src/server/thinkingdata/td_log.go new file mode 100644 index 00000000..a35ec036 --- /dev/null +++ b/src/server/thinkingdata/td_log.go @@ -0,0 +1,125 @@ +package thinkingdata + +import ( + "fmt" + "time" +) + +// SDK_LOG_PREFIX const +const SDK_LOG_PREFIX = "[ThinkingData]" + +var logInstance TDLogger + +type TDLogLevel int32 + +const ( + TDLogLevelOff TDLogLevel = 1 + TDLogLevelError TDLogLevel = 2 + TDLogLevelWarning TDLogLevel = 3 + TDLogLevelInfo TDLogLevel = 4 + TDLogLevelDebug TDLogLevel = 5 +) + +// default is TDLogLevelOff +var currentLogLevel = TDLogLevelOff + +// TDLogger User-defined log classes must comply with interface +type TDLogger interface { + Print(message string) +} + +// SetLogLevel Set the log output level +func SetLogLevel(level TDLogLevel) { + if level < TDLogLevelOff || level > TDLogLevelDebug { + fmt.Println(SDK_LOG_PREFIX + "log type error") + return + } else { + currentLogLevel = level + } +} + +// SetCustomLogger Set a custom log input class, usually you don't need to set it up. +func SetCustomLogger(logger TDLogger) { + if logger != nil { + logInstance = logger + } +} + +func tdLog(level TDLogLevel, format string, v ...interface{}) { + if level > currentLogLevel { + return + } + + var modeStr string + switch level { + case TDLogLevelError: + modeStr = "[Error] " + break + case TDLogLevelWarning: + modeStr = "[Warning] " + break + case TDLogLevelInfo: + modeStr = "[Info] " + break + case TDLogLevelDebug: + modeStr = "[Debug] " + break + default: + modeStr = "[Info] " + break + } + + if logInstance != nil { + msg := fmt.Sprintf(SDK_LOG_PREFIX+modeStr+format+"\n", v...) + logInstance.Print(msg) + } else { + logTime := fmt.Sprintf("[%v]", time.Now().Format("2006-01-02 15:04:05.000")) + fmt.Printf(logTime+SDK_LOG_PREFIX+modeStr+format+"\n", v...) + } +} + +func tdLogDebug(format string, v ...interface{}) { + tdLog(TDLogLevelDebug, format, v...) +} + +func tdLogInfo(format string, v ...interface{}) { + tdLog(TDLogLevelInfo, format, v...) +} + +func tdLogError(format string, v ...interface{}) { + tdLog(TDLogLevelError, format, v...) +} + +func tdLogWarning(format string, v ...interface{}) { + tdLog(TDLogLevelWarning, format, v...) +} + +// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff) +type LogType int32 + +// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff) +const ( + LoggerTypeOff LogType = 1 << 0 // disable log + LoggerTypePrint LogType = 1 << 1 // print on console + LoggerTypeWriteFile LogType = 1 << 2 // print to file + LoggerTypePrintAndWriteFile = LoggerTypePrint | LoggerTypeWriteFile // print both on console and file +) + +// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff) +type LoggerConfig struct { + Type LogType + Path string +} + +// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff) +func SetLoggerConfig(config LoggerConfig) { + if config.Type < LoggerTypeOff || config.Type > LoggerTypePrintAndWriteFile { + fmt.Println(SDK_LOG_PREFIX + "log type error") + return + } + if config.Type&LoggerTypeOff == LoggerTypeOff { + currentLogLevel = TDLogLevelOff + } else { + currentLogLevel = TDLogLevelInfo + } +} diff --git a/src/server/thinkingdata/thinkingdata.go b/src/server/thinkingdata/thinkingdata.go new file mode 100644 index 00000000..aeea763b --- /dev/null +++ b/src/server/thinkingdata/thinkingdata.go @@ -0,0 +1,314 @@ +package thinkingdata + +import ( + "encoding/json" + "errors" + "sync" + "time" +) + +const ( + Track = "track" + TrackUpdate = "track_update" + TrackOverwrite = "track_overwrite" + UserSet = "user_set" + UserUnset = "user_unset" + UserSetOnce = "user_setOnce" + UserAdd = "user_add" + UserAppend = "user_append" + UserUniqAppend = "user_uniq_append" + UserDel = "user_del" + + SdkVersion = "2.0.3" + LibName = "Golang" +) + +type Data struct { + IsComplex bool `json:"-"` // properties are nested or not + AccountId string `json:"#account_id,omitempty"` + DistinctId string `json:"#distinct_id,omitempty"` + Type string `json:"#type"` + Time string `json:"#time"` + Timestamp int64 `json:"#timestamp,omitempty"` + EventName string `json:"#event_name,omitempty"` + EventId string `json:"#event_id,omitempty"` + FirstCheckId string `json:"#first_check_id,omitempty"` + Ip string `json:"#ip,omitempty"` + UUID string `json:"#uuid,omitempty"` + AppId string `json:"#app_id,omitempty"` + Properties map[string]interface{} `json:"properties"` + PropertiesSummary string `json:"properties_summary,omitempty"` +} + +// TDConsumer define operation interface +type TDConsumer interface { + Add(d Data) error + Flush() error + Close() error + IsStringent() bool // check data or not. +} + +type TDAnalytics struct { + consumer TDConsumer + superProperties map[string]interface{} + mutex *sync.RWMutex + dynamicSuperProperties func() map[string]interface{} +} + +// New init SDK +func New(c TDConsumer) TDAnalytics { + tdLogInfo("init SDK success") + return TDAnalytics{ + consumer: c, + superProperties: make(map[string]interface{}), + mutex: new(sync.RWMutex), + } +} + +// GetSuperProperties get common properties +func (ta *TDAnalytics) GetSuperProperties() map[string]interface{} { + result := make(map[string]interface{}) + ta.mutex.Lock() + mergeProperties(result, ta.superProperties) + ta.mutex.Unlock() + return result +} + +// SetSuperProperties set common properties +func (ta *TDAnalytics) SetSuperProperties(superProperties map[string]interface{}) { + ta.mutex.Lock() + mergeProperties(ta.superProperties, superProperties) + ta.mutex.Unlock() +} + +// ClearSuperProperties clear common properties +func (ta *TDAnalytics) ClearSuperProperties() { + ta.mutex.Lock() + ta.superProperties = make(map[string]interface{}) + ta.mutex.Unlock() +} + +// SetDynamicSuperProperties set common properties dynamically. +// not recommend to add the operation which with a lot of computation +func (ta *TDAnalytics) SetDynamicSuperProperties(action func() map[string]interface{}) { + ta.mutex.Lock() + ta.dynamicSuperProperties = action + ta.mutex.Unlock() +} + +// GetDynamicSuperProperties dynamic common properties +func (ta *TDAnalytics) GetDynamicSuperProperties() map[string]interface{} { + result := make(map[string]interface{}) + ta.mutex.RLock() + if ta.dynamicSuperProperties != nil { + mergeProperties(result, ta.dynamicSuperProperties()) + } + ta.mutex.RUnlock() + return result +} + +// Track report ordinary event +func (ta *TDAnalytics) Track(accountId, distinctId, eventName string, properties map[string]interface{}) error { + return ta.track(accountId, distinctId, Track, eventName, "", properties) +} + +// TrackFirst report first event +func (ta *TDAnalytics) TrackFirst(accountId, distinctId, eventName, firstCheckId string, properties map[string]interface{}) error { + if len(firstCheckId) == 0 { + msg := "the 'firstCheckId' must be provided" + tdLogInfo(msg) + return errors.New(msg) + } + p := make(map[string]interface{}) + mergeProperties(p, properties) + p["#first_check_id"] = firstCheckId + return ta.track(accountId, distinctId, Track, eventName, "", p) +} + +// TrackUpdate report updatable event +func (ta *TDAnalytics) TrackUpdate(accountId, distinctId, eventName, eventId string, properties map[string]interface{}) error { + return ta.track(accountId, distinctId, TrackUpdate, eventName, eventId, properties) +} + +// TrackOverwrite report overridable event +func (ta *TDAnalytics) TrackOverwrite(accountId, distinctId, eventName, eventId string, properties map[string]interface{}) error { + return ta.track(accountId, distinctId, TrackOverwrite, eventName, eventId, properties) +} + +func (ta *TDAnalytics) track(accountId, distinctId, dataType, eventName, eventId string, properties map[string]interface{}) error { + defer func() { + if r := recover(); r != nil { + tdLogError("%+v\ndata: %+v", r, properties) + } + }() + + if len(eventName) == 0 { + msg := "the event name must be provided" + tdLogError(msg) + return errors.New(msg) + } + + // eventId not be null unless eventType is equal Track. + if len(eventId) == 0 && dataType != Track { + msg := "the event id must be provided" + tdLogError(msg) + return errors.New(msg) + } + + p := ta.GetSuperProperties() + dynamicSuperProperties := ta.GetDynamicSuperProperties() + + mergeProperties(p, dynamicSuperProperties) + // preset properties has the highest priority + p["#lib"] = LibName + p["#lib_version"] = SdkVersion + // custom properties + mergeProperties(p, properties) + + return ta.add(accountId, distinctId, dataType, eventName, eventId, p) +} + +// UserSet set user properties. would overwrite existing names. +func (ta *TDAnalytics) UserSet(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserSet, properties) +} + +// UserUnset clear the user properties of users. +func (ta *TDAnalytics) UserUnset(accountId string, distinctId string, s []string) error { + if len(s) == 0 { + msg := "invalid params for UserUnset: keys is nil" + tdLogInfo(msg) + return errors.New(msg) + } + prop := make(map[string]interface{}) + for _, v := range s { + prop[v] = 0 + } + return ta.user(accountId, distinctId, UserUnset, prop) +} + +func (ta *TDAnalytics) UserUnsetWithProperties(accountId string, distinctId string, properties map[string]interface{}) error { + if len(properties) == 0 { + msg := "invalid params for UserUnset: properties is nil" + tdLogInfo(msg) + return errors.New(msg) + } + return ta.user(accountId, distinctId, UserUnset, properties) +} + +// UserSetOnce set user properties, If such property had been set before, this message would be neglected. +func (ta *TDAnalytics) UserSetOnce(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserSetOnce, properties) +} + +// UserAdd to accumulate operations against the property. +func (ta *TDAnalytics) UserAdd(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserAdd, properties) +} + +// UserAppend to add user properties of array type. +func (ta *TDAnalytics) UserAppend(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserAppend, properties) +} + +// UserUniqAppend append user properties to array type by unique. +func (ta *TDAnalytics) UserUniqAppend(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserUniqAppend, properties) +} + +// UserDelete delete a user, This operation cannot be undone. +func (ta *TDAnalytics) UserDelete(accountId string, distinctId string) error { + return ta.user(accountId, distinctId, UserDel, nil) +} + +// UserDeleteWithProperties delete a user, This operation cannot be undone. +func (ta *TDAnalytics) UserDeleteWithProperties(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserDel, properties) +} + +func (ta *TDAnalytics) user(accountId, distinctId, dataType string, properties map[string]interface{}) error { + defer func() { + if r := recover(); r != nil { + tdLogError("%+v\ndata: %+v", r, properties) + } + }() + if properties == nil && dataType != UserDel { + msg := "invalid params for " + dataType + ": properties is nil" + tdLogError(msg) + return errors.New(msg) + } + p := make(map[string]interface{}) + mergeProperties(p, properties) + return ta.add(accountId, distinctId, dataType, "", "", p) +} + +// Flush report data immediately. +func (ta *TDAnalytics) Flush() error { + return ta.consumer.Flush() +} + +// Close and exit sdk +func (ta *TDAnalytics) Close() error { + err := ta.consumer.Close() + tdLogInfo("SDK close") + return err +} + +func (ta *TDAnalytics) add(accountId, distinctId, dataType, eventName, eventId string, properties map[string]interface{}) error { + if len(accountId) == 0 && len(distinctId) == 0 { + msg := "invalid parameters: account_id and distinct_id cannot be empty at the same time" + tdLogError(msg) + return errors.New(msg) + } + + // get "#ip" value in properties, empty string will be return when not found. + ip := extractStringProperty(properties, "#ip") + + // get "#app_id" value in properties, empty string will be return when not found. + appId := extractStringProperty(properties, "#app_id") + + // get "#time" value in properties, empty string will be return when not found. + eventTime := extractTime(properties) + + firstCheckId := extractStringProperty(properties, "#first_check_id") + + // get "#uuid" value in properties, empty string will be return when not found. + uuid := extractStringProperty(properties, "#uuid") + if len(uuid) == 0 { + uuid = generateUUID() + } + properties_summary, err := json.Marshal(properties) + if err != nil { + properties_summary = []byte{} + } + data := Data{ + AccountId: accountId, + DistinctId: distinctId, + Type: dataType, + Time: eventTime, + Timestamp: time.Now().Unix(), + EventName: eventName, + EventId: eventId, + FirstCheckId: firstCheckId, + Ip: ip, + UUID: uuid, + Properties: properties, + PropertiesSummary: string(properties_summary), + } + + if len(appId) > 0 { + data.AppId = appId + } + + err = formatProperties(&data, ta) + if err != nil { + return err + } + + return ta.consumer.Add(data) +} + +// Deprecated: please use TDConsumer +type Consumer interface { + TDConsumer +} diff --git a/src/server/thinkingdata/utils.go b/src/server/thinkingdata/utils.go new file mode 100644 index 00000000..3ac20f07 --- /dev/null +++ b/src/server/thinkingdata/utils.go @@ -0,0 +1,143 @@ +package thinkingdata + +import ( + "errors" + "fmt" + "github.com/google/uuid" + "os" + "reflect" + "regexp" + "time" +) + +const ( + DATE_FORMAT = "2006-01-02 15:04:05.000" + KEY_PATTERN = "^[a-zA-Z#][A-Za-z0-9_]{0,49}$" +) + +// A string of 50 letters and digits that starts with '#' or a letter +var keyPattern, _ = regexp.Compile(KEY_PATTERN) + +func mergeProperties(target, source map[string]interface{}) { + for k, v := range source { + target[k] = v + } +} + +func extractTime(p map[string]interface{}) string { + if t, ok := p["#time"]; ok { + delete(p, "#time") + switch v := t.(type) { + case string: + return v + case time.Time: + return v.Format(DATE_FORMAT) + default: + return time.Now().Format(DATE_FORMAT) + } + } + + return time.Now().Format(DATE_FORMAT) +} + +func extractStringProperty(p map[string]interface{}, key string) string { + if t, ok := p[key]; ok { + delete(p, key) + v, ok := t.(string) + if !ok { + fmt.Fprintln(os.Stderr, "Invalid data type for "+key) + } + return v + } + return "" +} + +func isNotNumber(v interface{}) bool { + switch v.(type) { + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: + case float32, float64: + default: + return true + } + return false +} + +func formatProperties(d *Data, ta *TDAnalytics) error { + + if d.EventName != "" { + matched := checkPattern([]byte(d.EventName)) + if !matched { + msg := "invalid event name: " + d.EventName + tdLogInfo(msg) + return errors.New(msg) + } + } + + if d.Properties != nil { + for k, v := range d.Properties { + if ta.consumer.IsStringent() { + isMatch := checkPattern([]byte(k)) + if !isMatch { + msg := "invalid property key: " + k + tdLogInfo(msg) + return errors.New(msg) + } + } + + if d.Type == UserAdd && isNotNumber(v) { + msg := "invalid property value: only numbers is supported by UserAdd" + tdLogInfo(msg) + return errors.New(msg) + } + + // check value + switch v.(type) { + case int: + case bool: + case float64: + case string: + case time.Time: + d.Properties[k] = v.(time.Time).Format(DATE_FORMAT) + case []string: + d.IsComplex = true + default: + d.IsComplex = true + } + } + } + + return nil +} + +func isNotArrayOrSlice(v interface{}) bool { + typeOf := reflect.TypeOf(v) + switch typeOf.Kind() { + case reflect.Array: + case reflect.Slice: + default: + return true + } + return false +} + +func checkPattern(name []byte) bool { + return keyPattern.Match(name) +} + +func parseTime(input []byte) string { + var re = regexp.MustCompile(`"((\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2})(?:\.(\d{3}))\d*)(Z|[\+-]\d{2}:\d{2})"`) + var substitution = "\"$2 $3.$4\"" + + for re.Match(input) { + input = re.ReplaceAll(input, []byte(substitution)) + } + return string(input) +} + +func generateUUID() string { + newUUID, err := uuid.NewUUID() + if err != nil { + return "" + } + return newUUID.String() +}