Merge branch 'develop' into sdk
This commit is contained in:
commit
616bd52784
@ -34,7 +34,7 @@ func Init() {
|
||||
server.MaxConnNum = int(math.MaxInt32)
|
||||
server.PendingWriteNum = 1 << 14
|
||||
server.LenMsgLen = 4
|
||||
server.MaxMsgLen = 4096
|
||||
server.MaxMsgLen = 1 << 16
|
||||
server.NewAgent = newServerAgent
|
||||
server.Start()
|
||||
}
|
||||
|
||||
@ -93,7 +93,7 @@ func connectRemote(RemoteAddr string, ConnType int, ConnLabel string) error {
|
||||
client.ConnNum = 1
|
||||
client.PendingWriteNum = 1 << 14
|
||||
client.LenMsgLen = 4
|
||||
client.MaxMsgLen = 4096
|
||||
client.MaxMsgLen = 1 << 16
|
||||
client.NewAgent = newAgent
|
||||
client.ConnType = ConnType
|
||||
client.ConnLabel = ConnLabel
|
||||
|
||||
@ -17,8 +17,9 @@ var (
|
||||
LittleEndian = false
|
||||
|
||||
// skeleton conf
|
||||
GoLen = 10000
|
||||
TimerDispatcherLen = 10000
|
||||
AsynCallLen = 10000
|
||||
ChanRPCLen = 10000
|
||||
// 增加 goroutine 相关配置,避免 "Too many goroutines" 错误
|
||||
GoLen = 50000 // 从 10000 增加到 50000,控制并发 goroutine 数量
|
||||
TimerDispatcherLen = 50000 // 从 10000 增加到 50000,定时器队列长度
|
||||
AsynCallLen = 50000 // 从 10000 增加到 50000,异步调用队列长度
|
||||
ChanRPCLen = 50000 // 从 10000 增加到 50000,RPC 通道长度
|
||||
)
|
||||
|
||||
@ -213,3 +213,11 @@ func GetPartNumByAreaId(AreaId int) map[int]int {
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func GetAreaIdByIndoorId(IndoorId int) int {
|
||||
data, err := gamedata.GetDataByIntKey(INDOOR_PROGRESS, IndoorId)
|
||||
if err != nil {
|
||||
return 0
|
||||
}
|
||||
return gamedata.GetIntValue(data, "Scene")
|
||||
}
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package limitedTimeEventCfg
|
||||
|
||||
import (
|
||||
"math"
|
||||
"server/game/mod/item"
|
||||
GoUtil "server/game_util"
|
||||
"server/gamedata"
|
||||
@ -146,18 +147,32 @@ func GetSenceJackpotReward(Id int) []*item.Item {
|
||||
}
|
||||
|
||||
// 获取连击快手奖励
|
||||
func GetFastProduceReward(Times, Energy int) []*item.Item {
|
||||
func GetFastProduceReward(Energy int) []*item.Item {
|
||||
data, err := gamedata.GetData(CFG_LIMITED_TIME_EVENT_FAST)
|
||||
if err != nil {
|
||||
log.Debug("GetSceneDashReward err:%v", err)
|
||||
return nil
|
||||
}
|
||||
for _, v := range data {
|
||||
if Times == gamedata.GetIntValue(v, "Times") && Energy <= gamedata.GetIntValue(v, "Max") && Energy >= gamedata.GetIntValue(v, "Min") {
|
||||
return gamedata.GetItemList(v, "Items")
|
||||
}
|
||||
}
|
||||
if len(data) == 0 {
|
||||
return nil
|
||||
}
|
||||
// Convert map to slice for sorting
|
||||
type sortData struct {
|
||||
Id string
|
||||
Energy float64
|
||||
}
|
||||
sortedList := make([]sortData, 0)
|
||||
energy := float64(Energy) / 10.0
|
||||
for k, v := range data {
|
||||
dataEnergy := gamedata.GetFloatValue(v, "EnergyValue")
|
||||
sortedList = append(sortedList, sortData{k, math.Abs(energy - dataEnergy)})
|
||||
}
|
||||
// Sort by Energy in ascending order
|
||||
sort.Slice(sortedList, func(i, j int) bool {
|
||||
return sortedList[i].Energy < sortedList[j].Energy
|
||||
})
|
||||
|
||||
return gamedata.GetItemList(data[sortedList[0].Id], "Items")
|
||||
}
|
||||
|
||||
// 获取连击快手最大次数
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
{
|
||||
"AppID": 1,
|
||||
"AppID": 0,
|
||||
"LogLevel": "debug",
|
||||
"LogPath": "./log",
|
||||
"TCPAddr": ":3601",
|
||||
@ -9,7 +9,7 @@
|
||||
"MySqlUsr": "root",
|
||||
"MySqlPwd": "IOagNEq3C84c-20CmHEin5iODVc=",
|
||||
"MaxConnNum": 20000,
|
||||
"DbName": "Merge_Pet_1",
|
||||
"DbName": "merge_pet_1",
|
||||
"HttpPort": ":8081",
|
||||
"AppPath": "./app",
|
||||
"TELOGDIR" : "./teLog/",
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package db
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"server/MergeConst"
|
||||
@ -17,17 +18,50 @@ import (
|
||||
)
|
||||
|
||||
var SqlDb *sqlx.DB
|
||||
var sqlDbMu sync.Mutex
|
||||
var sqlDbMu sync.RWMutex
|
||||
|
||||
// GetDB 线程安全地获取数据库连接
|
||||
func GetDB() *sqlx.DB {
|
||||
sqlDbMu.RLock()
|
||||
defer sqlDbMu.RUnlock()
|
||||
return SqlDb
|
||||
}
|
||||
|
||||
// GetDBOrPanic 获取数据库连接,如果为 nil 则记录错误
|
||||
func GetDBOrPanic() *sqlx.DB {
|
||||
db := GetDB()
|
||||
if db == nil {
|
||||
log.Error("Database connection is nil, please check database initialization")
|
||||
}
|
||||
return db
|
||||
}
|
||||
|
||||
// EnsureDB 确保数据库连接可用,如果不可用则返回错误
|
||||
func EnsureDB() (*sqlx.DB, error) {
|
||||
db := GetDB()
|
||||
if db == nil {
|
||||
return nil, fmt.Errorf("database connection is nil")
|
||||
}
|
||||
if err := db.Ping(); err != nil {
|
||||
return nil, fmt.Errorf("database ping failed: %w", err)
|
||||
}
|
||||
return db, nil
|
||||
}
|
||||
|
||||
// 封装创建连接
|
||||
func connectMySQL() (*sqlx.DB, error) {
|
||||
MysqlPwd, _ := GoUtil.Decrypt(conf.Server.MySqlPwd)
|
||||
connect := fmt.Sprintf("%s:%s@(%s:%s)/%s", conf.Server.MySqlUsr, MysqlPwd, conf.Server.MySqlAddr, conf.Server.MySqlPort, conf.Server.DbName)
|
||||
// 减少超时时间,避免长时间阻塞
|
||||
connect := fmt.Sprintf("%s:%s@(%s:%s)/%s?timeout=10s&readTimeout=15s&writeTimeout=15s&parseTime=true", conf.Server.MySqlUsr, MysqlPwd, conf.Server.MySqlAddr, conf.Server.MySqlPort, conf.Server.DbName)
|
||||
db, err := sqlx.Connect("mysql", connect)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
db.SetMaxOpenConns(20)
|
||||
// 增加最大连接数,减少连接等待时间
|
||||
db.SetMaxOpenConns(100)
|
||||
db.SetMaxIdleConns(20)
|
||||
db.SetConnMaxLifetime(30 * time.Minute) // 减少连接生命周期
|
||||
db.SetConnMaxIdleTime(5 * time.Minute) // 减少空闲时间
|
||||
return db, nil
|
||||
}
|
||||
|
||||
@ -38,19 +72,29 @@ func InitDB() {
|
||||
log.Debug("connect mysql failed: %v", err)
|
||||
return
|
||||
}
|
||||
sqlDbMu.Lock()
|
||||
SqlDb = db
|
||||
sqlDbMu.Unlock()
|
||||
log.Debug("connect mysql success")
|
||||
|
||||
// 定时检测与重连
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Second)
|
||||
ticker := time.NewTicker(5 * time.Second) // 改为5秒检测一次,降低频率
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
sqlDbMu.Lock()
|
||||
sqlDbMu.RLock()
|
||||
cur := SqlDb
|
||||
sqlDbMu.Unlock()
|
||||
if cur == nil || cur.Ping() != nil {
|
||||
log.Debug("mysql ping failed, start reconnect")
|
||||
sqlDbMu.RUnlock()
|
||||
|
||||
if cur == nil {
|
||||
log.Debug("mysql connection is nil, start reconnect")
|
||||
ReconnectDB()
|
||||
continue
|
||||
}
|
||||
|
||||
// Ping 操作不持有锁,避免阻塞其他操作
|
||||
if err := cur.Ping(); err != nil {
|
||||
log.Debug("mysql ping failed: %v, start reconnect", err)
|
||||
ReconnectDB()
|
||||
}
|
||||
}
|
||||
@ -74,7 +118,12 @@ func ReconnectDB() {
|
||||
}
|
||||
|
||||
func SeriesTransaction(sqlstrs []string, params [][]any) (err error) {
|
||||
tx, err := SqlDb.Begin()
|
||||
sqlDb := GetDB()
|
||||
if sqlDb == nil {
|
||||
return fmt.Errorf("database connection is nil")
|
||||
}
|
||||
|
||||
tx, err := sqlDb.Begin()
|
||||
if err != nil {
|
||||
log.Debug("Transaction failed, err:%v\n", err)
|
||||
return err
|
||||
@ -379,20 +428,38 @@ func FormatAllMemLoadDb(u interface{}, tableName string, Exclude string) (err er
|
||||
}
|
||||
|
||||
func GetServerData(d interface{}, Key string) (err error) {
|
||||
sqlDb := GetDB()
|
||||
if sqlDb == nil {
|
||||
return fmt.Errorf("database connection is nil")
|
||||
}
|
||||
sql := "select * from t_server_mod where `key` = ?"
|
||||
err = SqlDb.Get(d, sql, Key)
|
||||
err = sqlDb.Get(d, sql, Key)
|
||||
return
|
||||
}
|
||||
|
||||
func SaveServerData(data *SqlServerModStruct) error {
|
||||
sqlDb := GetDB()
|
||||
if sqlDb == nil {
|
||||
return fmt.Errorf("database connection is nil")
|
||||
}
|
||||
sql := "update t_server_mod set `mData` = ? , `updateTime` = ? where `key` = ?"
|
||||
_, err := SqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key)
|
||||
_, err := sqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key)
|
||||
return err
|
||||
}
|
||||
|
||||
func SaveServerDataWithTx(tx *sql.Tx, data *SqlServerModStruct) error {
|
||||
sql := "update t_server_mod set `mData` = ? , `updateTime` = ? where `key` = ?"
|
||||
_, err := tx.Exec(sql, data.ModData, data.UpdataTime, data.Key)
|
||||
return err
|
||||
}
|
||||
|
||||
func InsertServerData(data *SqlServerModStruct) error {
|
||||
sqlDb := GetDB()
|
||||
if sqlDb == nil {
|
||||
return fmt.Errorf("database connection is nil")
|
||||
}
|
||||
sql := "insert into t_server_mod (`mData` , `updateTime` ,`key`) Values (?,?,?)"
|
||||
_, err := SqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key)
|
||||
_, err := sqlDb.Exec(sql, data.ModData, data.UpdataTime, data.Key)
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
@ -206,7 +206,7 @@ func AdminPlayerBack(a gate.Agent, res map[string]interface{}) {
|
||||
JsonBuff, _ := json.Marshal(res)
|
||||
response := &msg.AdminRes{}
|
||||
response.Func = "admin"
|
||||
response.Info = JsonBuff
|
||||
response.Info = string(JsonBuff)
|
||||
a.WriteMsg(response)
|
||||
}
|
||||
|
||||
|
||||
@ -52,6 +52,7 @@ func HandleAdminReq(args []interface{}) {
|
||||
}
|
||||
|
||||
func HandleClientReq(args []interface{}) {
|
||||
start := time.Now()
|
||||
if G_GameLogicPtr.SeverInfo.Status == SERVER_STATUS_CLOSE || G_GameLogicPtr.SeverInfo.Status == SERVER_STATUS_MAINTAIN {
|
||||
return // 服务器关闭或者维护中,不处理任何消息
|
||||
}
|
||||
@ -146,7 +147,7 @@ func HandleClientReq(args []interface{}) {
|
||||
ResRegisterAccount.ResultCode = 0
|
||||
data, _ := proto.Marshal(ResRegisterAccount)
|
||||
gl.PackResInfo(a, "ResRegisterAccount", data)
|
||||
case "ReqLogin":
|
||||
case "ReqLogin": // 登录请求
|
||||
detail := &msg.ReqLogin{}
|
||||
proto.Unmarshal(buf, detail)
|
||||
accountInfo := db.GetAccountInfoFromDb(detail.UserName)
|
||||
@ -162,11 +163,13 @@ func HandleClientReq(args []interface{}) {
|
||||
G_GameLogicPtr.PackLoginResInfo(a, ResLogin)
|
||||
return
|
||||
}
|
||||
newPlayer := false
|
||||
if ResLogin.DwUin > 0 {
|
||||
PlayerInfo := G_GameLogicPtr.GetPlayer(ResLogin.DwUin)
|
||||
err := G_GameLogicPtr.ReplaceExistPlayerAndAgent(a, PlayerInfo)
|
||||
if err != nil {
|
||||
PlayerInfo = G_GameLogicPtr.CreateNewPlayer(a, detail.UserName)
|
||||
newPlayer = true
|
||||
}
|
||||
if PlayerInfo.PlayMod.getBaseMod().IdCardName == "" && conf.Server.IdVerify {
|
||||
ResLogin.ResultCode = MergeConst.Protocol_Error_Id_Not_Verify
|
||||
@ -184,6 +187,9 @@ func HandleClientReq(args []interface{}) {
|
||||
p.(*Player).LoginBackData()
|
||||
p.(*Player).TeLog("Login_log", nil)
|
||||
}
|
||||
if newPlayer {
|
||||
log.Debug("uid : %d, init user process : %s, execTime : %v , isNew: %v", p.(*Player).M_DwUin, m.GetFunc(), time.Since(start), newPlayer)
|
||||
}
|
||||
p.(*Player).ProcessTrigger()
|
||||
case "ReqServerTime": // 获取服务器时间
|
||||
detail := &msg.ReqServerTime{}
|
||||
@ -213,15 +219,23 @@ func HandleClientReq(args []interface{}) {
|
||||
err := RunNetProcessByKey(m.GetFunc(), []interface{}{a, buf})
|
||||
if err != nil {
|
||||
log.Error("uid : %d, func : %s, err : %s", p.(*Player).M_DwUin, m.GetFunc(), err)
|
||||
p.(*Player).TeLog("func_exec_error", map[string]interface{}{
|
||||
"method_name": m.GetFunc(),
|
||||
"error_info": err.Error(),
|
||||
})
|
||||
p.(*Player).Recover(backup) //还原Player的数据
|
||||
return
|
||||
}
|
||||
p.(*Player).ProcessTrigger()
|
||||
p.(*Player).TeLog("func_exec_time", map[string]interface{}{
|
||||
"method_name": m.GetFunc(),
|
||||
"exec_time": fmt.Sprintf("%v", time.Since(start)),
|
||||
})
|
||||
}
|
||||
}
|
||||
p, b := internal.Agents.Load(a)
|
||||
if b {
|
||||
p.(*Player).SendClientRes()
|
||||
}
|
||||
|
||||
log.Debug("uid : %d, func : %s, execTime : %s ", p.(*Player).M_DwUin, m.GetFunc(), time.Since(start))
|
||||
}
|
||||
|
||||
@ -581,7 +581,12 @@ func ReqGmCommand_(player *Player, Command string) error {
|
||||
player.AddPlayroomUpvote(100100129)
|
||||
i := player.GetPlayroomUpvote(100100129)
|
||||
log.Debug("debug upvote:%d", i)
|
||||
|
||||
case "addLimitEvent":
|
||||
Id, _ := strconv.Atoi(arg[1])
|
||||
Cd, _ := strconv.Atoi(arg[2])
|
||||
LimitedTimeEventMod := player.PlayMod.getLimitedTimeEventMod()
|
||||
LimitedTimeEventMod.AddEvent(Id, Cd)
|
||||
player.PushClientRes(LimitedTimeEventMod.BackData())
|
||||
default:
|
||||
return fmt.Errorf("Player %d ReqGmCommand:%v not found", player.M_DwUin, arg)
|
||||
}
|
||||
|
||||
@ -197,6 +197,7 @@ func (L *LogMgr) InitManager() {
|
||||
}
|
||||
|
||||
func (L *LogMgr) AddLog(logs *Log) {
|
||||
return
|
||||
// 复制结构体和 Param map,避免并发修改导致 json.Marshal 时 panic
|
||||
copyLog := *logs
|
||||
|
||||
|
||||
@ -467,8 +467,6 @@ func (p *WorkerPool) spawnWorker() {
|
||||
// Worker 工作函数,监听其子上下文以便单独停止
|
||||
func (p *WorkerPool) worker(ctx context.Context, id int) {
|
||||
defer p.wg.Done()
|
||||
log.Debug("Worker %d started", id)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -768,6 +766,7 @@ func FriendMgrSend(m1 *msg.Msg) error {
|
||||
|
||||
// 异步发送消息到指定节点 节点不在线则保存消息
|
||||
func sendMessageAsync(m *msg.Msg, node int) error {
|
||||
log.Debug("[Middleware] Send Async message to node: %d, message: %v", node, m)
|
||||
err := mergeCluster.SendServerMsg(m, node)
|
||||
if err != nil && GoUtil.InArray(m.HandleType, save_msg_type) {
|
||||
saveMessage(m)
|
||||
@ -779,6 +778,7 @@ func sendMessageAsync(m *msg.Msg, node int) error {
|
||||
|
||||
// 同步消息到指定节点 节点不在线则保存消息
|
||||
func sendMessageSync(m *msg.Msg, node int) (*msg.Msg, error) {
|
||||
log.Debug("[Middleware] Send Sync message to node: %d, message: %v", node, m)
|
||||
msg, err := mergeCluster.CallServerMsg(m, node)
|
||||
if err != nil && conf.Server.ServerType == "center" && GoUtil.InArray(m.HandleType, save_msg_type) {
|
||||
saveMessage(m)
|
||||
|
||||
@ -339,14 +339,37 @@ func (cb *ChessBorad) GetEmitList() []int {
|
||||
|
||||
// 完成订单 移除棋子
|
||||
func (cb *ChessBorad) FinishOrder(ChessId []int) error {
|
||||
unlockChessList := cb.GetUnlockChessList()
|
||||
unlockChessMap := make(map[int]int)
|
||||
for _, v := range unlockChessList {
|
||||
unlockChessMap[v]++
|
||||
}
|
||||
|
||||
boardChess := []int{}
|
||||
BagChess := []int{}
|
||||
for _, v := range ChessId {
|
||||
if unlockChessMap[v] > 0 {
|
||||
unlockChessMap[v]--
|
||||
boardChess = append(boardChess, v)
|
||||
} else {
|
||||
BagChess = append(BagChess, v)
|
||||
}
|
||||
}
|
||||
for _, v := range boardChess {
|
||||
err := cb.FinishOrderChess(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, v := range BagChess {
|
||||
err := cb.FinishOrderChessByBag(v)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cb *ChessBorad) FinishOrderChess(Chess int) error {
|
||||
for k, v := range cb.ChessList {
|
||||
if v == Chess {
|
||||
@ -354,13 +377,17 @@ func (cb *ChessBorad) FinishOrderChess(Chess int) error {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return fmt.Errorf("order finish board chess id:%d not exist", Chess)
|
||||
}
|
||||
|
||||
func (cb *ChessBorad) FinishOrderChessByBag(Chess int) error {
|
||||
for k, v := range cb.ChessBag.List {
|
||||
if v.ChessId == Chess {
|
||||
cb.ChessBag.List[k] = ChessBagGrid{}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return errors.New("order finish chess id not exist")
|
||||
return fmt.Errorf("order finish bag chess id:%d not exist", Chess)
|
||||
}
|
||||
|
||||
// 棋子转换
|
||||
|
||||
@ -41,6 +41,12 @@ func (d *Decorate) InitData() {
|
||||
if len(d.PartCost) == 0 {
|
||||
d.initPartCost(d.AreaId)
|
||||
}
|
||||
for k := range d.PartCost {
|
||||
AreaId := decorateCfg.GetAreaIdByIndoorId(k)
|
||||
if AreaId < d.AreaId {
|
||||
delete(d.PartCost, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 装饰
|
||||
@ -64,6 +70,15 @@ func (d *Decorate) Decorate(areaId int, decorateId int) ([]*item.Item, error) {
|
||||
d.AreaId++
|
||||
d.Progress = 0
|
||||
d.FinishList = make(map[int]struct{})
|
||||
for k := range d.PartCost {
|
||||
AreaId := decorateCfg.GetAreaIdByIndoorId(k)
|
||||
if AreaId < d.AreaId {
|
||||
delete(d.PartCost, k)
|
||||
}
|
||||
}
|
||||
if len(d.PartCost) == 0 {
|
||||
d.initPartCost(d.AreaId)
|
||||
}
|
||||
}
|
||||
|
||||
d.DecorateNum++
|
||||
@ -93,6 +108,12 @@ func (d *Decorate) GetDecorateCostItem(AreaId, DecorateId int, DecorateOffIsExis
|
||||
PartItemList = PartItem.Items
|
||||
delete(d.PartCost, Id)
|
||||
}
|
||||
for k := range d.PartCost {
|
||||
AreaId := decorateCfg.GetAreaIdByIndoorId(k)
|
||||
if AreaId < d.AreaId {
|
||||
delete(d.PartCost, k)
|
||||
}
|
||||
}
|
||||
if len(d.PartCost) == 0 {
|
||||
d.initPartCost(d.AreaId + 1)
|
||||
}
|
||||
@ -155,10 +176,16 @@ func (d *Decorate) DecorateAll(Star int, DecorateOffIsExist bool) ([]*item.Item,
|
||||
d.AreaId++
|
||||
d.Progress = 0
|
||||
d.FinishList = make(map[int]struct{})
|
||||
for k := range d.PartCost {
|
||||
AreaId := decorateCfg.GetAreaIdByIndoorId(k)
|
||||
if AreaId < d.AreaId {
|
||||
delete(d.PartCost, k)
|
||||
}
|
||||
}
|
||||
if len(d.PartCost) == 0 {
|
||||
d.initPartCost(d.AreaId)
|
||||
}
|
||||
}
|
||||
SubItems = append(SubItems, item.NewItem(item.ITEM_STAR_ID, SubItem))
|
||||
return SubItems, AddItem, Num, DecorateList, Log, PetExp
|
||||
}
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
limitedTimeEventCfg "server/conf/limited_time_event"
|
||||
mergeDataCfg "server/conf/merge_data"
|
||||
"server/game/mod/item"
|
||||
"server/game/mod/order"
|
||||
GoUtil "server/game_util"
|
||||
"server/msg"
|
||||
)
|
||||
@ -268,30 +269,38 @@ func (l *LimitedTimeEventMod) ProgressBackData() *msg.ResLimitEventProgress {
|
||||
}
|
||||
|
||||
// 获取流星雨奖励
|
||||
func (l *LimitedTimeEventMod) GetMeteorReward(MergeList []int) []*item.Item {
|
||||
MaxLv := 0
|
||||
Star := 0
|
||||
func (l *LimitedTimeEventMod) GetMeteorReward(MergeList, EmitList []int) []*item.Item {
|
||||
eneryg := 0
|
||||
for _, v := range MergeList {
|
||||
ChessLv := mergeDataCfg.GetLvById(v)
|
||||
Star += mergeDataCfg.GetStarById(v)
|
||||
if ChessLv > MaxLv {
|
||||
MaxLv = ChessLv
|
||||
Color := mergeDataCfg.GetColorById(v)
|
||||
EmitId := order.GetEmitByColor(EmitList, Color)
|
||||
if EmitId == 0 {
|
||||
continue
|
||||
}
|
||||
NewChessLv := mergeDataCfg.DynamicLevRev(ChessLv, EmitId, Color)
|
||||
eneryg += int(math.Pow(2, float64(NewChessLv)))
|
||||
}
|
||||
Add := limitedTimeEventCfg.GetMeteorAdd(MaxLv)
|
||||
NewStar := int(float64(Star) * (float64(Add) / 100))
|
||||
NewStar = max(NewStar, 1)
|
||||
|
||||
NewStar := int(max(math.Ceil(float64(eneryg)/0.36*0.1), 1))
|
||||
return []*item.Item{{Id: item.ITEM_STAR_ID, Num: NewStar}}
|
||||
}
|
||||
|
||||
// 获取宝箱雨奖励
|
||||
func (l *LimitedTimeEventMod) GetChestReward(MergeList []int) []*item.Item {
|
||||
Star := 0
|
||||
func (l *LimitedTimeEventMod) GetChestReward(MergeList, EmitList []int) []*item.Item {
|
||||
eneryg := 0
|
||||
for _, v := range MergeList {
|
||||
Star += mergeDataCfg.GetStarById(v)
|
||||
ChessLv := mergeDataCfg.GetLvById(v)
|
||||
Color := mergeDataCfg.GetColorById(v)
|
||||
EmitId := order.GetEmitByColor(EmitList, Color)
|
||||
if EmitId == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
return limitedTimeEventCfg.GetChestReward(Star)
|
||||
NewChessLv := mergeDataCfg.DynamicLevRev(ChessLv, EmitId, Color)
|
||||
eneryg += int(math.Pow(2, float64(NewChessLv-1)))
|
||||
}
|
||||
star := math.Ceil(float64(eneryg) / 10 / 2.5)
|
||||
return []*item.Item{item.NewItem(item.ITEM_DIAMOND_ID, int(star))}
|
||||
}
|
||||
|
||||
// 获取场景冲刺奖励
|
||||
@ -307,24 +316,17 @@ func (l *LimitedTimeEventMod) GetSceneDashReward() (int, []*item.Item) {
|
||||
}
|
||||
|
||||
// 获取连击快手奖励
|
||||
func (l *LimitedTimeEventMod) GetFastProduceReward(Energy int) ([]*item.Item, int64, int, error) {
|
||||
func (l *LimitedTimeEventMod) GetFastProduceReward(Energy int) ([]*item.Item, error) {
|
||||
Event, ok := l.EventList[EVENT_TYPE_FAST_PRODUCE]
|
||||
if !ok {
|
||||
return nil, 0, 0, fmt.Errorf("FastProduce event not exist")
|
||||
return nil, fmt.Errorf("FastProduce event not exist")
|
||||
}
|
||||
|
||||
Now := GoUtil.Now()
|
||||
if Now < GoUtil.Int64(Event.Info["NextPlay"]) {
|
||||
return nil, 0, 0, fmt.Errorf("FastProduce CD")
|
||||
return nil, fmt.Errorf("FastProduce CD")
|
||||
}
|
||||
Times := GoUtil.Int(Event.Info["Times"])
|
||||
Times++
|
||||
Event.Info["Times"] = Times
|
||||
MaxTimes := limitedTimeEventCfg.GetFastProduceMaxTimes()
|
||||
Times = min(Times, MaxTimes)
|
||||
CD := limitedTimeEventCfg.GetFastCD()
|
||||
Event.Info["NextPlay"] = GoUtil.Now() + int64(CD) // CD5分钟
|
||||
return limitedTimeEventCfg.GetFastProduceReward(Times, Energy), GoUtil.Now() + int64(CD), Event.Info["Times"].(int), nil
|
||||
return limitedTimeEventCfg.GetFastProduceReward(Energy), nil
|
||||
}
|
||||
|
||||
func (l *LimitedTimeEventMod) ResetFastProduceCD() {
|
||||
@ -434,17 +436,18 @@ func (l *LimitedTimeEventMod) AddCatTrickEnergy(Energy int) {
|
||||
}
|
||||
|
||||
func (l *LimitedTimeEventMod) SubPaybackDay() error {
|
||||
if l.EventList[EVENT_TYPE_PAYBACK_DAY] == nil {
|
||||
return fmt.Errorf("PaybackDay event not exist")
|
||||
}
|
||||
d := l.EventList[EVENT_TYPE_PAYBACK_DAY].D.(*PaybackDay)
|
||||
if d.count <= 0 {
|
||||
return fmt.Errorf("PaybackDay count is 0")
|
||||
}
|
||||
d.count--
|
||||
if d.count <= 0 {
|
||||
delete(l.EventList, EVENT_TYPE_PAYBACK_DAY)
|
||||
}
|
||||
// 2026.1.20 改版 不限制次数
|
||||
// if l.EventList[EVENT_TYPE_PAYBACK_DAY] == nil {
|
||||
// return fmt.Errorf("PaybackDay event not exist")
|
||||
// }
|
||||
// d := l.EventList[EVENT_TYPE_PAYBACK_DAY].D.(*PaybackDay)
|
||||
// if d.count <= 0 {
|
||||
// return fmt.Errorf("PaybackDay count is 0")
|
||||
// }
|
||||
// d.count--
|
||||
// if d.count <= 0 {
|
||||
// delete(l.EventList, EVENT_TYPE_PAYBACK_DAY)
|
||||
// }
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -457,8 +460,9 @@ func (l *LimitedTimeEventMod) GetCatTrickReward() ([]*item.Item, error) {
|
||||
return nil, fmt.Errorf("CatTrick energy not enough")
|
||||
}
|
||||
d.Energy -= 100
|
||||
// TODO 放到配置中
|
||||
return []*item.Item{
|
||||
{Id: item.ITEM_DIAMOND_ID, Num: 1},
|
||||
{Id: item.ITEM_DIAMOND_ID, Num: 5},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
||||
@ -3,6 +3,7 @@ package game
|
||||
import (
|
||||
"server/game/mod/chess"
|
||||
"server/game/mod/decorate"
|
||||
limitedTimeEvent "server/game/mod/limited_time_event"
|
||||
)
|
||||
|
||||
func (p *Player) GetChessMod() *chess.ChessBorad {
|
||||
@ -12,3 +13,7 @@ func (p *Player) GetChessMod() *chess.ChessBorad {
|
||||
func (p *Player) GetDecorateMod() *decorate.Decorate {
|
||||
return p.PlayMod.getDecorateMod()
|
||||
}
|
||||
|
||||
func (p *Player) GetLimitEventMod() *limitedTimeEvent.LimitedTimeEventMod {
|
||||
return p.PlayMod.getLimitedTimeEventMod()
|
||||
}
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"server/MergeConst"
|
||||
"server/conf"
|
||||
baseCfg "server/conf/base"
|
||||
userCfg "server/conf/user"
|
||||
"server/db"
|
||||
@ -136,7 +137,7 @@ func (p *PlayerBaseData) SaveDataFromDB(Key interface{}) bool {
|
||||
sqlStruck.LoginTime = int32(BaseMod.LoginTime)
|
||||
sqlStruck.UserName = p.Data.UserName
|
||||
sqlStruck.LogoutTime = int32(BaseMod.LogoutTime)
|
||||
sqlStruck.Node = p.Data.Node
|
||||
sqlStruck.Node = int32(conf.Server.ServerID)
|
||||
sqlStruck.Rolecreatetime = p.Data.Rolecreatetime
|
||||
sqlStruck.NoAd = p.Data.NoAd
|
||||
sqlStruck.ChampshipsGroupID = p.Data.ChampshipsGroupID
|
||||
@ -611,6 +612,9 @@ func (p *PlayerBaseData) GetLastLoginTime() int {
|
||||
}
|
||||
|
||||
func (p *PlayerBaseData) GetName() string {
|
||||
if p == nil {
|
||||
return ""
|
||||
}
|
||||
return p.Data.UserName
|
||||
}
|
||||
|
||||
|
||||
@ -92,7 +92,7 @@ func (p *PlayerChessData) UpdatePlayerChessData(player *Player, buf []byte) erro
|
||||
for _, v := range update.MChessHandle {
|
||||
HandleStr += fmt.Sprintf("%v-%v-%v,", v.Id, v.ChessId, v.Type)
|
||||
}
|
||||
log.Debug("棋子数据不一致, %v===%v===%v===%v", HandleStr, LastMap, update.MChessData, player.PlayMod.getChessMod().GetChessList())
|
||||
log.Debug("棋子数据不一致地图, %v===%v===%v===%v", HandleStr, LastMap, update.MChessData, player.PlayMod.getChessMod().GetChessList())
|
||||
player.SendErrClienRes(res)
|
||||
player.TeLog("outsync_event", map[string]interface{}{
|
||||
"outsync_event": "UpdatePlayerChessDataFunc",
|
||||
@ -118,7 +118,7 @@ func (p *PlayerChessData) UpdateChessData(player *Player, MChessData map[string]
|
||||
Code: msg.RES_CODE_FAIL,
|
||||
Msg: "棋子数据不一致",
|
||||
}
|
||||
log.Debug("棋子数据不一致, %v---%v", p.Data.MChessData, player.PlayMod.getChessMod().GetChessList())
|
||||
log.Debug("棋子数据不一致地图, %v---%v---%v", player.PlayMod.getChessMod().ChessMap, p.Data.MChessData, player.PlayMod.getChessMod().GetChessList())
|
||||
player.SendErrClienRes(res)
|
||||
player.TeLog("outsync_event", map[string]interface{}{
|
||||
"outsync_event": "UpdatePlayerChessDataFunc",
|
||||
@ -165,7 +165,47 @@ func (p *PlayerChessData) checkChessEqual(player *Player) bool {
|
||||
for _, v := range p.Data.MChessData {
|
||||
bCopy = append(bCopy, int(v))
|
||||
}
|
||||
return SlicesEqual(aCopy, bCopy)
|
||||
isEqual := SlicesEqual(aCopy, bCopy)
|
||||
|
||||
if isEqual {
|
||||
return true
|
||||
}
|
||||
// 找出aCopy多的元素和少的元素
|
||||
aMap := make(map[int]int)
|
||||
bMap := make(map[int]int)
|
||||
|
||||
for _, v := range aCopy {
|
||||
aMap[v]++
|
||||
}
|
||||
for _, v := range bCopy {
|
||||
bMap[v]++
|
||||
}
|
||||
|
||||
extra := make([]int, 0)
|
||||
missing := make([]int, 0)
|
||||
|
||||
// 找出aCopy多的元素
|
||||
for k, v := range aMap {
|
||||
if bMap[k] < v {
|
||||
for i := 0; i < v-bMap[k]; i++ {
|
||||
extra = append(extra, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 找出aCopy少的元素
|
||||
for k, v := range bMap {
|
||||
if aMap[k] < v {
|
||||
for i := 0; i < v-aMap[k]; i++ {
|
||||
missing = append(missing, k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(extra) > 0 || len(missing) > 0 {
|
||||
log.Debug("棋子数据对比: aCopy多的元素=%v, aCopy少的元素=%v", extra, missing)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// 棋子操作
|
||||
|
||||
@ -27,6 +27,7 @@ import (
|
||||
"server/game/mod/quest"
|
||||
GoUtil "server/game_util"
|
||||
"server/msg"
|
||||
telog "server/thinkdata"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
@ -268,8 +269,10 @@ func (p *Player) OrderShip() {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// 避免为每个订单创建 goroutine,改为批量处理或同步处理
|
||||
for _, OrderInfo := range OrderList {
|
||||
go p.TriggerShippingOrderOrigin(&msg.ReqShippingOrder{
|
||||
// 直接同步处理,避免创建过多 goroutine
|
||||
p.TriggerShippingOrderOrigin(&msg.ReqShippingOrder{
|
||||
OrderSn: OrderInfo.OrderId,
|
||||
})
|
||||
}
|
||||
@ -417,6 +420,7 @@ func (p *Player) Login() {
|
||||
if Duration > 604800 {
|
||||
FriendMod := p.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_LOST_USER_RETURN, "")
|
||||
p.UpdateUserInfo()
|
||||
}
|
||||
|
||||
}
|
||||
@ -708,6 +712,7 @@ func (p *Player) HandleItem(itemList []*item.Item, Label string) error {
|
||||
})
|
||||
FriendMod := p.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_NEW_AVATAR_FRAME, "")
|
||||
p.UpdateUserInfo()
|
||||
p.PlayerDecoLog("avatar", Effect[0], Label)
|
||||
BackDataType[item.ITEM_TYPE_AVATAR] = struct{}{}
|
||||
case item.ITEM_TYPE_EMOJI: // 表情
|
||||
@ -719,6 +724,7 @@ func (p *Player) HandleItem(itemList []*item.Item, Label string) error {
|
||||
})
|
||||
FriendMod := p.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_NEW_EMOTION, "")
|
||||
p.UpdateUserInfo()
|
||||
p.PlayerDecoLog("emoji", Effect[0], Label)
|
||||
BackDataType[item.ITEM_TYPE_EMOJI] = struct{}{}
|
||||
case item.ITEM_TYPE_FACE: // 头像
|
||||
@ -730,6 +736,7 @@ func (p *Player) HandleItem(itemList []*item.Item, Label string) error {
|
||||
})
|
||||
FriendMod := p.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_NEW_AVATAR, "")
|
||||
p.UpdateUserInfo()
|
||||
p.PlayerDecoLog("face", Effect[0], Label)
|
||||
BackDataType[item.ITEM_TYPE_FACE] = struct{}{}
|
||||
case item.ITEM_TYPE_ACTIVITY_RACE: // 活动竞速
|
||||
@ -755,6 +762,7 @@ func (p *Player) HandleItem(itemList []*item.Item, Label string) error {
|
||||
Type, Name := playroomCfg.GetDecoInfo(Effect)
|
||||
FriendMod := p.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_NEW_DECORATION, "")
|
||||
p.UpdateUserInfo()
|
||||
p.TeLog("room_deco_get", map[string]interface{}{
|
||||
"room_deco_type": Type,
|
||||
"room_deco_name": Name,
|
||||
@ -769,6 +777,7 @@ func (p *Player) HandleItem(itemList []*item.Item, Label string) error {
|
||||
Name := playroomCfg.GetDressName(Effect)
|
||||
FriendMod := p.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_NEW_COSTUME, "")
|
||||
p.UpdateUserInfo()
|
||||
p.TeLog("pet_deco_get", map[string]interface{}{
|
||||
"pet_deco_type": Type,
|
||||
"pet_deco_name": Name,
|
||||
@ -1061,8 +1070,9 @@ func (p *Player) TeLog(Type string, Param map[string]interface{}) {
|
||||
}
|
||||
//Param["#zone_offset"] = -5
|
||||
// 游戏内TE日志
|
||||
//go telog.Te.Track(p.GetPlayerBaseMod().GetName(), p.GetPlayerBaseMod().GetName(), Type, Param)
|
||||
BaseMod := p.PlayMod.getBaseMod()
|
||||
UidStr := GoUtil.String(p.M_DwUin)
|
||||
go telog.Te.Track(BaseMod.Account, UidStr, Type, Param)
|
||||
//途游GA
|
||||
go ga.GAlogEvent(Type, BaseMod.Account, "", Param)
|
||||
}
|
||||
@ -1205,7 +1215,8 @@ func (p *Player) DispatcherHandle() {
|
||||
if msg != nil {
|
||||
p.wg.Done()
|
||||
log.Debug("player %d recive msg %v", p.M_DwUin, msg)
|
||||
go p.HandleMsg(msg.Clone())
|
||||
// 直接在当前 goroutine 中处理,避免创建过多 goroutine
|
||||
p.HandleMsg(msg.Clone())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -245,6 +245,9 @@ func RegHandbookAllReward(player *Player, buf []byte) error {
|
||||
})
|
||||
return err
|
||||
}
|
||||
FriendMod := player.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_COMPLETE_HANDBOOK_ACHIEVEMENT, req.Type)
|
||||
player.UpdateUserInfo()
|
||||
player.PushClientRes(&msg.ResHandbookAllReward{
|
||||
Code: msg.RES_CODE_SUCCESS,
|
||||
})
|
||||
@ -290,7 +293,7 @@ func ReqRewardOrder(player *Player, buf []byte) error {
|
||||
}
|
||||
}
|
||||
if LimitedTimeEventMod.CheckExist(limitedTimeEvent.EVENT_TYPE_METEOR_SHOW) { //流星雨活动
|
||||
AddItem := LimitedTimeEventMod.GetMeteorReward(mergeList)
|
||||
AddItem := LimitedTimeEventMod.GetMeteorReward(mergeList, ChessMod.GetStarEmitList())
|
||||
if len(AddItem) > 0 {
|
||||
player.TeLog("time_limited_event_action", map[string]interface{}{
|
||||
"event_type": limitedTimeEventCfg.GetEventName(limitedTimeEvent.EVENT_TYPE_METEOR_SHOW),
|
||||
@ -338,7 +341,7 @@ func ReqRewardOrder(player *Player, buf []byte) error {
|
||||
}
|
||||
|
||||
if LimitedTimeEventMod.CheckExist(limitedTimeEvent.EVENT_TYPE_CHEST_RAIN) { //宝箱雨活动
|
||||
ChestRainItems := LimitedTimeEventMod.GetChestReward(mergeList)
|
||||
ChestRainItems := LimitedTimeEventMod.GetChestReward(mergeList, ChessMod.GetStarEmitList())
|
||||
player.args["ResItemPopId"] = req.OrderId
|
||||
err = player.HandleItem(ChestRainItems, msg.ITEM_POP_LABEL_LimitEventChestRain.String())
|
||||
if err != nil {
|
||||
@ -560,7 +563,9 @@ func ReqDecorate(player *Player, buf []byte) error {
|
||||
if AreaId == 1 && DecorateId == 44 {
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_COMPLETE_CLOAKROOM, "")
|
||||
}
|
||||
|
||||
if AreaId != DecorateMod.AreaId {
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_COMPLETE_CHAPTER_SCENES, GoUtil.String(AreaId))
|
||||
}
|
||||
player.PlayMod.save()
|
||||
player.PushClientRes(DecorateMod.BackData())
|
||||
player.PushClientRes(&msg.ResDecorate{
|
||||
@ -1190,6 +1195,7 @@ func ReqCardCollectReward(player *Player, buf []byte) error {
|
||||
player.PlayMod.getChessMod().AddChessBuff(chess)
|
||||
player.PushClientRes(player.PlayMod.getOrderMod().BackData())
|
||||
}
|
||||
player.UpdateUserInfo()
|
||||
player.PlayMod.save()
|
||||
player.PushClientRes(CardMod.NotifyCard())
|
||||
player.PushClientRes(&msg.ResCardCollectReward{
|
||||
@ -1271,6 +1277,7 @@ func ReqAllCollectReward(player *Player, buf []byte) error {
|
||||
}
|
||||
FriendMod := player.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_COMPLETE_ALL_CARDS, "all")
|
||||
player.UpdateUserInfo()
|
||||
player.PlayMod.save()
|
||||
player.TeLog("ReqAllCollectReward", map[string]interface{}{
|
||||
"item_list": itemList,
|
||||
@ -1711,7 +1718,7 @@ func ReqFastProduceReward(player *Player, buf []byte) error {
|
||||
return err
|
||||
}
|
||||
LimitedTimeEventMod := player.PlayMod.getLimitedTimeEventMod()
|
||||
itemList, EndTime, Times, err := LimitedTimeEventMod.GetFastProduceReward(int(req.Energy))
|
||||
itemList, err := LimitedTimeEventMod.GetFastProduceReward(int(req.Energy))
|
||||
if err != nil {
|
||||
player.SendErrClienRes(&msg.ResFastProduceReward{
|
||||
Code: msg.RES_CODE_FAIL,
|
||||
@ -1731,14 +1738,10 @@ func ReqFastProduceReward(player *Player, buf []byte) error {
|
||||
player.TeLog("ReqFastProduceReward", map[string]interface{}{
|
||||
"energy": int(req.Energy),
|
||||
"item_list": itemList,
|
||||
"end_time": EndTime,
|
||||
"times": Times,
|
||||
})
|
||||
player.PushClientRes(LimitedTimeEventMod.BackData())
|
||||
player.PushClientRes(&msg.ResFastProduceReward{
|
||||
Code: msg.RES_CODE_SUCCESS,
|
||||
EndTime: EndTime,
|
||||
Num: int32(Times),
|
||||
})
|
||||
return nil
|
||||
}
|
||||
@ -2993,6 +2996,7 @@ func ReqChampshipReward(player *Player, buf []byte) error {
|
||||
if MaxId == ChampshipMod.Reward {
|
||||
FriendMod := player.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_CHAMPIONSHIP_PRIZE, "")
|
||||
player.UpdateUserInfo()
|
||||
}
|
||||
player.TeLog("championship_reward", map[string]interface{}{
|
||||
"season_id": GoUtil.ZeroTimestamp(),
|
||||
@ -3104,6 +3108,8 @@ func ReqFriendTLUpvote(player *Player, buf []byte) error {
|
||||
To: int(FUid),
|
||||
SendT: GoUtil.Now(),
|
||||
}
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_VISIT_UPVOTE, "")
|
||||
player.UpdateUserInfo()
|
||||
FriendMgrSend(m)
|
||||
player.PlayMod.save()
|
||||
player.PushClientRes(&msg.ResFriendTLUpvote{
|
||||
@ -3169,6 +3175,7 @@ func ReqChampshipRankReward(player *Player, buf []byte) error {
|
||||
if myPreRank <= 5 {
|
||||
FriendMod := player.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_CHAMPIONSHIP_RANK, GoUtil.String(myPreRank))
|
||||
player.UpdateUserInfo()
|
||||
}
|
||||
player.PlayMod.save()
|
||||
player.BackChampship()
|
||||
@ -3960,6 +3967,7 @@ func ReqPlayroomSelectReward(player *Player, buf []byte) error {
|
||||
})
|
||||
FriendMod := player.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_VISIT_GAME_PRIZE_1, "")
|
||||
player.UpdateUserInfo()
|
||||
PlayroomMod.ResetGame()
|
||||
player.PlayerDecoSetLog("emoji", int(req.EmojiId), "playroom_select_reward")
|
||||
player.PlayroomBackData()
|
||||
@ -4154,10 +4162,7 @@ func ReqPlayroomFlipReward(player *Player, buf []byte) error {
|
||||
if LimitedTimeEventMod.CheckExist(limitedTimeEvent.EVENT_TYPE_PET_THIEF) && Result == playroom.FLIP_TYPE_GOLD {
|
||||
player.GetPetThiefReward(Target)
|
||||
}
|
||||
if Result == playroom.FLIP_TYPE_GOLD {
|
||||
FriendMod := player.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_VISIT_GAME_PRIZE, "")
|
||||
}
|
||||
|
||||
err = player.HandleItem(Items1, msg.ITEM_POP_LABEL_PlayroomFlip.String())
|
||||
if err != nil {
|
||||
player.SendErrClienRes(&msg.ResPlayroomFlip{
|
||||
@ -4166,6 +4171,11 @@ func ReqPlayroomFlipReward(player *Player, buf []byte) error {
|
||||
})
|
||||
return err
|
||||
}
|
||||
if Result == playroom.FLIP_TYPE_GOLD {
|
||||
FriendMod := player.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_GET_VISIT_GAME_PRIZE, "")
|
||||
player.UpdateUserInfo()
|
||||
}
|
||||
FriendMgrSend(&MsqMod.Msg{
|
||||
From: int(player.M_DwUin),
|
||||
To: Target,
|
||||
@ -4651,6 +4661,7 @@ func ReqFriendTreasureEnd(player *Player, buf []byte) error {
|
||||
}
|
||||
FriendMod := player.PlayMod.getFriendMod()
|
||||
FriendMod.AddActLog(friend.ACT_LOG_TYPE_OPEN_PET_TREASURE, "")
|
||||
player.UpdateUserInfo()
|
||||
player.TeLog("pet_treasure_open", map[string]interface{}{
|
||||
"pet_treasure_step": FriendTreasureMod.Shift,
|
||||
"pet_treasure_box": FriendTreasureMod.BoxItems,
|
||||
|
||||
@ -43,8 +43,9 @@ func (s *ServerMod) init() {
|
||||
s.handler = make(map[int]interface{})
|
||||
s.update = false
|
||||
s.LoadData()
|
||||
// 直接调用 SaveData,不要创建新的 goroutine
|
||||
s.mDispatr.AfterFunc(time.Duration(PER_SAVE_TIME)*time.Second, func() {
|
||||
go s.SaveData()
|
||||
s.SaveData()
|
||||
})
|
||||
go func() {
|
||||
defer func() {
|
||||
@ -96,8 +97,9 @@ func (s *ServerMod) Send(msg *msg.Msg) {
|
||||
|
||||
// 同步请求
|
||||
func (s *ServerMod) Call(m *msg.Msg) (interface{}, error) {
|
||||
responseChan := make(chan interface{})
|
||||
errorChan := make(chan error)
|
||||
// 使用带缓冲的 channel 避免 goroutine 在超时时泄漏
|
||||
responseChan := make(chan interface{}, 1)
|
||||
errorChan := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
@ -129,8 +131,9 @@ func (s *ServerMod) Call(m *msg.Msg) (interface{}, error) {
|
||||
|
||||
// mysql 保存消息
|
||||
func (s *ServerMod) SaveData() {
|
||||
// 直接在定时器回调中执行,不创建新的 goroutine 避免泄漏
|
||||
s.mDispatr.AfterFunc(time.Duration(PER_SAVE_TIME+GoUtil.RandNum(5, 10))*time.Second, func() {
|
||||
go s.SaveData()
|
||||
s.SaveData()
|
||||
})
|
||||
DbData := db.SqlServerModStruct{}
|
||||
DbData.Key = s.key
|
||||
@ -139,23 +142,41 @@ func (s *ServerMod) SaveData() {
|
||||
DbData.ModData, err = GoUtil.GobMarshal(s.data)
|
||||
if err != nil {
|
||||
log.Error("SaveData Marshal failed,Mod Key: %s err:%v", s.key, err)
|
||||
return
|
||||
}
|
||||
// log.Debug("SaveData Marshal success,Mod Key: %s", s.key)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
|
||||
// 使用线程安全的方式获取数据库连接
|
||||
sqlDb := db.GetDB()
|
||||
if sqlDb == nil {
|
||||
log.Error("SaveData failed, database connection is nil, Mod Key: %s", s.key)
|
||||
return
|
||||
}
|
||||
|
||||
// 先测试连接是否可用
|
||||
if err := sqlDb.Ping(); err != nil {
|
||||
log.Error("SaveData failed, database ping error, Mod Key: %s err:%v", s.key, err)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
txOptions := &sql.TxOptions{}
|
||||
tx, err := db.SqlDb.BeginTx(ctx, txOptions)
|
||||
tx, err := sqlDb.BeginTx(ctx, txOptions)
|
||||
if err != nil {
|
||||
log.Error("SaveData sql begin tx failed,Mod Key: %s err:%v", s.key, err)
|
||||
log.Error("SaveData sql begin tx failed,Mod Key: %s err:%v, data size: %d bytes", s.key, err, len(DbData.ModData))
|
||||
return
|
||||
}
|
||||
err = db.SaveServerData(&DbData)
|
||||
err = db.SaveServerDataWithTx(tx, &DbData)
|
||||
if err != nil {
|
||||
tx.Rollback()
|
||||
log.Error("SaveData sql exec ,Mod Key: %s err:%v", s.key, err)
|
||||
log.Error("SaveData sql exec failed,Mod Key: %s err:%v", s.key, err)
|
||||
return
|
||||
}
|
||||
tx.Commit()
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
log.Error("SaveData sql commit failed,Mod Key: %s err:%v", s.key, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *ServerMod) LoadData() {
|
||||
|
||||
@ -185,9 +185,9 @@ func UnitLimitProgress(p *Player) error {
|
||||
|
||||
func UnitLimitedTimeEvent(p *Player) error {
|
||||
LimitedTimeEventMod := p.PlayMod.getLimitedTimeEventMod()
|
||||
|
||||
ChessMod := p.PlayMod.getChessMod()
|
||||
mergeList := []int{246, 15}
|
||||
AddItem := LimitedTimeEventMod.GetChestReward(mergeList)
|
||||
AddItem := LimitedTimeEventMod.GetChestReward(mergeList, ChessMod.GetStarEmitList())
|
||||
fmt.Print(AddItem)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"server/conf"
|
||||
"server/game"
|
||||
@ -13,6 +14,9 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 设置使用所有 CPU 核心
|
||||
runtime.GOMAXPROCS(runtime.NumCPU())
|
||||
|
||||
lconf.LogLevel = conf.Server.LogLevel
|
||||
lconf.LogPath = conf.Server.LogPath
|
||||
lconf.LogFlag = conf.LogFlag
|
||||
|
||||
@ -27231,7 +27231,7 @@ func (x *AdminReq) GetInfo() []byte {
|
||||
type AdminRes struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Func string `protobuf:"bytes,1,opt,name=Func,proto3" json:"Func,omitempty"`
|
||||
Info []byte `protobuf:"bytes,2,opt,name=Info,proto3" json:"Info,omitempty"`
|
||||
Info string `protobuf:"bytes,2,opt,name=Info,proto3" json:"Info,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@ -27273,11 +27273,11 @@ func (x *AdminRes) GetFunc() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *AdminRes) GetInfo() []byte {
|
||||
func (x *AdminRes) GetInfo() string {
|
||||
if x != nil {
|
||||
return x.Info
|
||||
}
|
||||
return nil
|
||||
return ""
|
||||
}
|
||||
|
||||
type ReqAdminInfo struct {
|
||||
@ -29602,7 +29602,7 @@ const file_proto_Gameapi_proto_rawDesc = "" +
|
||||
"\x04Info\x18\x02 \x01(\fR\x04Info\"2\n" +
|
||||
"\bAdminRes\x12\x12\n" +
|
||||
"\x04Func\x18\x01 \x01(\tR\x04Func\x12\x12\n" +
|
||||
"\x04Info\x18\x02 \x01(\fR\x04Info\" \n" +
|
||||
"\x04Info\x18\x02 \x01(\tR\x04Info\" \n" +
|
||||
"\fReqAdminInfo\x12\x10\n" +
|
||||
"\x03Uid\x18\x01 \x01(\x03R\x03Uid\"\x15\n" +
|
||||
"\x13ReqReloadServerMail\"\x0f\n" +
|
||||
|
||||
@ -18,7 +18,7 @@ const (
|
||||
releaseLevel = 1
|
||||
errorLevel = 2
|
||||
fatalLevel = 3
|
||||
|
||||
warnLevel = 4
|
||||
DEBUG_LEVEL = 0
|
||||
)
|
||||
|
||||
@ -27,6 +27,7 @@ const (
|
||||
printReleaseLevel = "[release] "
|
||||
printErrorLevel = "[error ] "
|
||||
printFatalLevel = "[fatal ] "
|
||||
printWarnLevel = "[warn ] "
|
||||
)
|
||||
|
||||
type Logger struct {
|
||||
@ -52,6 +53,8 @@ func New(strLevel string, pathname string, flag int) (*Logger, error) {
|
||||
level = errorLevel
|
||||
case "fatal":
|
||||
level = fatalLevel
|
||||
case "warn":
|
||||
level = warnLevel
|
||||
default:
|
||||
return nil, errors.New("unknown level: " + strLevel)
|
||||
}
|
||||
@ -100,6 +103,8 @@ func NewDailyLog(now time.Time, Level int, pathname string, flag int) error {
|
||||
level = errorLevel
|
||||
case fatalLevel:
|
||||
level = fatalLevel
|
||||
case warnLevel:
|
||||
level = warnLevel
|
||||
default:
|
||||
return fmt.Errorf("unknown level: %d", Level)
|
||||
}
|
||||
@ -145,6 +150,8 @@ func BindLoggerToFile(strLevel string, pathName string, flag int) (*Logger, erro
|
||||
level = errorLevel
|
||||
case "fatal":
|
||||
level = fatalLevel
|
||||
case "warn":
|
||||
level = warnLevel
|
||||
default:
|
||||
return nil, errors.New("unknown level: " + strLevel)
|
||||
}
|
||||
@ -207,6 +214,9 @@ func (logger *Logger) Error(format string, a ...interface{}) {
|
||||
func (logger *Logger) Fatal(format string, a ...interface{}) {
|
||||
logger.doPrintf(fatalLevel, printFatalLevel, format, a...)
|
||||
}
|
||||
func (logger *Logger) Warn(format string, a ...interface{}) {
|
||||
logger.doPrintf(warnLevel, printWarnLevel, format, a...)
|
||||
}
|
||||
|
||||
// It's dangerous to call the method on logging
|
||||
func Export(logger *Logger) {
|
||||
@ -235,6 +245,15 @@ func Release(format string, a ...interface{}) {
|
||||
gLogger.doPrintf(releaseLevel, printReleaseLevel, format, a...)
|
||||
}
|
||||
|
||||
func Warn(format string, a ...interface{}) {
|
||||
gloggerLock.Lock()
|
||||
defer gloggerLock.Unlock()
|
||||
if gLogger == nil {
|
||||
return
|
||||
}
|
||||
gLogger.doPrintf(warnLevel, printWarnLevel, format, a...)
|
||||
}
|
||||
|
||||
func Error(format string, a ...interface{}) {
|
||||
gloggerLock.Lock()
|
||||
defer gloggerLock.Unlock()
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
{
|
||||
"AppID": 1,
|
||||
"AppID": 0,
|
||||
"LogLevel": "debug",
|
||||
"LogPath": "./log",
|
||||
"TCPAddr": ":3601",
|
||||
@ -9,7 +9,7 @@
|
||||
"MySqlUsr": "root",
|
||||
"MySqlPwd": "IOagNEq3C84c-20CmHEin5iODVc=",
|
||||
"MaxConnNum": 20000,
|
||||
"DbName": "Merge_Pet_1",
|
||||
"DbName": "merge_pet_1",
|
||||
"HttpPort": ":8081",
|
||||
"AppPath": "./app",
|
||||
"TELOGDIR" : "./teLog/",
|
||||
|
||||
@ -1,7 +1,11 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
decorateCfg "server/conf/decorate"
|
||||
"server/db"
|
||||
"server/game"
|
||||
"server/pkg/github.com/name5566/leaf/log"
|
||||
"testing"
|
||||
)
|
||||
|
||||
@ -11,7 +15,7 @@ func TestFixDecorate(t *testing.T) {
|
||||
p.FixDecorate()
|
||||
|
||||
//
|
||||
p.InitPlayer("202601K111")
|
||||
p.InitPlayer("dee8eeb83ea3c54e48427b7ff20066fb")
|
||||
p.FixDecorate()
|
||||
|
||||
DecorateMod := p.GetDecorateMod()
|
||||
@ -19,3 +23,58 @@ func TestFixDecorate(t *testing.T) {
|
||||
DecorateMod.Progress = 22
|
||||
p.FixDecorate()
|
||||
}
|
||||
|
||||
func TestFixUserData(t *testing.T) {
|
||||
// 确保数据库已初始化
|
||||
if db.SqlDb == nil {
|
||||
db.InitDB()
|
||||
// 等待初始化完成
|
||||
for i := 0; i < 10; i++ {
|
||||
if db.SqlDb != nil {
|
||||
break
|
||||
}
|
||||
log.Warn("Waiting for database initialization...")
|
||||
// time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
if db.SqlDb == nil {
|
||||
t.Fatal("Database initialization failed")
|
||||
}
|
||||
|
||||
log.Warn("hello world")
|
||||
type account struct {
|
||||
Account string `db:"user_name"`
|
||||
}
|
||||
var accounts []account
|
||||
sqlDb := db.GetDB() // 使用线程安全的方式获取连接
|
||||
if sqlDb == nil {
|
||||
t.Fatal("Database connection is nil")
|
||||
}
|
||||
err := sqlDb.Select(&accounts, "SELECT `user_name` FROM t_account order by auto_id")
|
||||
if err != nil {
|
||||
t.Errorf("Failed to fetch accounts: %v", err)
|
||||
return
|
||||
}
|
||||
i := 0
|
||||
for _, acc := range accounts {
|
||||
i++
|
||||
fmt.Printf("Fixing account %d/%d: %s\n", i, len(accounts), acc.Account)
|
||||
account := acc.Account
|
||||
p := new(game.Player)
|
||||
p.InitPlayer(account)
|
||||
DecorateMod := p.GetDecorateMod()
|
||||
if DecorateMod.PartCost == nil {
|
||||
return
|
||||
}
|
||||
for k := range DecorateMod.PartCost {
|
||||
AreaId := decorateCfg.GetAreaIdByIndoorId(k)
|
||||
if AreaId < DecorateMod.AreaId {
|
||||
log.Debug("Fixing account: %s, PartId: %d, OldAreaId: %d, NewAreaId: %d\n", account, k, AreaId, DecorateMod.AreaId)
|
||||
}
|
||||
}
|
||||
p.Stop()
|
||||
p = nil
|
||||
}
|
||||
log.Debug("All accounts fixed")
|
||||
}
|
||||
|
||||
30
src/server/test/limit_test.go
Normal file
30
src/server/test/limit_test.go
Normal file
@ -0,0 +1,30 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"server/game"
|
||||
limitedTimeEvent "server/game/mod/limited_time_event"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestMetroRain(t *testing.T) {
|
||||
// 3625212
|
||||
p := new(game.Player)
|
||||
p.InitPlayer("3625212")
|
||||
ChessMod := p.GetChessMod()
|
||||
LimitEventMod := p.GetLimitEventMod()
|
||||
rewards := LimitEventMod.GetMeteorReward([]int{1, 22, 3}, ChessMod.GetStarEmitList())
|
||||
t.Logf("rewards: %v", rewards)
|
||||
}
|
||||
|
||||
func TestFast(t *testing.T) {
|
||||
p := new(game.Player)
|
||||
p.InitPlayer("3625212")
|
||||
LimitEventMod := p.GetLimitEventMod()
|
||||
LimitEventMod.AddEvent(limitedTimeEvent.EVENT_TYPE_FAST_PRODUCE, 60)
|
||||
items, err := LimitEventMod.GetFastProduceReward(50)
|
||||
if err != nil {
|
||||
t.Errorf("GetFastProduceReward error: %v", err)
|
||||
return
|
||||
}
|
||||
t.Logf("Fast produce items: %v", items)
|
||||
}
|
||||
@ -20,3 +20,23 @@ func TestOrderStart(t *testing.T) {
|
||||
star = int(float64(star)*float64(order_facotry)/1000+0.5) * 10
|
||||
fmt.Printf("star is %d", star)
|
||||
}
|
||||
|
||||
func TestOrderFinish(t *testing.T) {
|
||||
p1 := new(game.Player)
|
||||
p1.InitPlayer("3625212")
|
||||
game.G_GameLogicPtr.SetPlayer(p1)
|
||||
ChessMod := p1.GetChessMod()
|
||||
err := ChessMod.FinishOrder([]int{1, 2, 3})
|
||||
if err != nil {
|
||||
t.Errorf("finish order failed:%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestChestRain(t *testing.T) {
|
||||
p1 := new(game.Player)
|
||||
p1.InitPlayer("GSTTEST011")
|
||||
ChessMod := p1.GetChessMod()
|
||||
LimitEventMod := p1.GetLimitEventMod()
|
||||
f := LimitEventMod.GetChestReward([]int{928}, ChessMod.GetStarEmitList())
|
||||
fmt.Printf("chest rain reward:%v", f)
|
||||
}
|
||||
|
||||
@ -2,8 +2,7 @@ package telog
|
||||
|
||||
import (
|
||||
"server/conf"
|
||||
|
||||
"github.com/ThinkingDataAnalytics/go-sdk/v2/src/thinkingdata"
|
||||
"server/thinkingdata"
|
||||
)
|
||||
|
||||
var Te thinkingdata.TDAnalytics
|
||||
|
||||
352
src/server/thinkingdata/consumer_batch.go
Normal file
352
src/server/thinkingdata/consumer_batch.go
Normal file
@ -0,0 +1,352 @@
|
||||
package thinkingdata
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TDBatchConsumer upload data to TE by http
|
||||
type TDBatchConsumer struct {
|
||||
serverUrl string // serverUrl
|
||||
appId string // appId
|
||||
timeout time.Duration // http timeout (mill second)
|
||||
compress bool // is need compress
|
||||
bufferMutex *sync.RWMutex
|
||||
cacheMutex *sync.RWMutex // cache mutex
|
||||
|
||||
buffer []Data
|
||||
batchSize int // flush event count each time
|
||||
cacheBuffer [][]Data // buffer
|
||||
cacheCapacity int // buffer max count
|
||||
}
|
||||
|
||||
type TDBatchConfig struct {
|
||||
ServerUrl string // serverUrl
|
||||
AppId string // appId
|
||||
BatchSize int // flush event count each time
|
||||
Timeout int // http timeout (mill second)
|
||||
Compress bool // enable compress data
|
||||
AutoFlush bool // enable auto flush
|
||||
Interval int // auto flush spacing (second)
|
||||
CacheCapacity int // cache event count
|
||||
}
|
||||
|
||||
const (
|
||||
DefaultTimeOut = 30000
|
||||
DefaultBatchSize = 20
|
||||
MaxBatchSize = 200
|
||||
DefaultInterval = 30
|
||||
DefaultCacheCapacity = 50
|
||||
)
|
||||
|
||||
// NewBatchConsumer create TDBatchConsumer
|
||||
func NewBatchConsumer(serverUrl string, appId string) (TDConsumer, error) {
|
||||
config := TDBatchConfig{
|
||||
ServerUrl: serverUrl,
|
||||
AppId: appId,
|
||||
Compress: true,
|
||||
}
|
||||
return initBatchConsumer(config)
|
||||
}
|
||||
|
||||
// NewBatchConsumerWithBatchSize create TDBatchConsumer
|
||||
// serverUrl
|
||||
// appId
|
||||
// batchSize: flush event count each time
|
||||
func NewBatchConsumerWithBatchSize(serverUrl string, appId string, batchSize int) (TDConsumer, error) {
|
||||
config := TDBatchConfig{
|
||||
ServerUrl: serverUrl,
|
||||
AppId: appId,
|
||||
Compress: true,
|
||||
BatchSize: batchSize,
|
||||
}
|
||||
return initBatchConsumer(config)
|
||||
}
|
||||
|
||||
// NewBatchConsumerWithCompress create TDBatchConsumer
|
||||
// serverUrl
|
||||
// appId
|
||||
// compress: enable data compress
|
||||
func NewBatchConsumerWithCompress(serverUrl string, appId string, compress bool) (TDConsumer, error) {
|
||||
config := TDBatchConfig{
|
||||
ServerUrl: serverUrl,
|
||||
AppId: appId,
|
||||
Compress: compress,
|
||||
}
|
||||
return initBatchConsumer(config)
|
||||
}
|
||||
|
||||
func NewBatchConsumerWithConfig(config TDBatchConfig) (TDConsumer, error) {
|
||||
return initBatchConsumer(config)
|
||||
}
|
||||
|
||||
func initBatchConsumer(config TDBatchConfig) (TDConsumer, error) {
|
||||
if config.ServerUrl == "" {
|
||||
msg := fmt.Sprint("ServerUrl not be empty")
|
||||
tdLogInfo(msg)
|
||||
return nil, errors.New(msg)
|
||||
}
|
||||
u, err := url.Parse(config.ServerUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
u.Path = "/sync_server"
|
||||
|
||||
var batchSize int
|
||||
if config.BatchSize > MaxBatchSize {
|
||||
batchSize = MaxBatchSize
|
||||
} else if config.BatchSize <= 0 {
|
||||
batchSize = DefaultBatchSize
|
||||
} else {
|
||||
batchSize = config.BatchSize
|
||||
}
|
||||
|
||||
var cacheCapacity int
|
||||
if config.CacheCapacity <= 0 {
|
||||
cacheCapacity = DefaultCacheCapacity
|
||||
} else {
|
||||
cacheCapacity = config.CacheCapacity
|
||||
}
|
||||
|
||||
var timeout int
|
||||
if config.Timeout == 0 {
|
||||
timeout = DefaultTimeOut
|
||||
} else {
|
||||
timeout = config.Timeout
|
||||
}
|
||||
|
||||
c := &TDBatchConsumer{
|
||||
serverUrl: u.String(),
|
||||
appId: config.AppId,
|
||||
timeout: time.Duration(timeout) * time.Millisecond,
|
||||
compress: config.Compress,
|
||||
bufferMutex: new(sync.RWMutex),
|
||||
cacheMutex: new(sync.RWMutex),
|
||||
batchSize: batchSize,
|
||||
buffer: make([]Data, 0, batchSize),
|
||||
cacheCapacity: cacheCapacity,
|
||||
cacheBuffer: make([][]Data, 0, cacheCapacity),
|
||||
}
|
||||
|
||||
var interval int
|
||||
if config.Interval == 0 {
|
||||
interval = DefaultInterval
|
||||
} else {
|
||||
interval = config.Interval
|
||||
}
|
||||
if config.AutoFlush {
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Duration(interval) * time.Second)
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
<-ticker.C
|
||||
_ = c.timerFlush()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
tdLogInfo("Mode: batch consumer, appId: %s, serverUrl: %s", c.appId, c.serverUrl)
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *TDBatchConsumer) Add(d Data) error {
|
||||
c.bufferMutex.Lock()
|
||||
c.buffer = append(c.buffer, d)
|
||||
c.bufferMutex.Unlock()
|
||||
|
||||
tdLogInfo("Enqueue event data: %v", d)
|
||||
|
||||
if c.getBufferLength() >= c.batchSize || c.getCacheLength() > 0 {
|
||||
err := c.Flush()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TDBatchConsumer) timerFlush() error {
|
||||
tdLogInfo("timer flush data")
|
||||
return c.innerFlush()
|
||||
}
|
||||
|
||||
func (c *TDBatchConsumer) Flush() error {
|
||||
tdLogInfo("flush data")
|
||||
return c.innerFlush()
|
||||
}
|
||||
|
||||
func (c *TDBatchConsumer) innerFlush() error {
|
||||
|
||||
c.cacheMutex.Lock()
|
||||
defer c.cacheMutex.Unlock()
|
||||
|
||||
c.bufferMutex.Lock()
|
||||
defer c.bufferMutex.Unlock()
|
||||
|
||||
if len(c.buffer) == 0 && len(c.cacheBuffer) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if len(c.cacheBuffer) > c.cacheCapacity {
|
||||
c.cacheBuffer = c.cacheBuffer[1:]
|
||||
}
|
||||
}()
|
||||
|
||||
if len(c.cacheBuffer) == 0 || len(c.buffer) >= c.batchSize {
|
||||
c.cacheBuffer = append(c.cacheBuffer, c.buffer)
|
||||
c.buffer = make([]Data, 0, c.batchSize)
|
||||
}
|
||||
|
||||
err := c.uploadEvents()
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *TDBatchConsumer) uploadEvents() error {
|
||||
buffer := c.cacheBuffer[0]
|
||||
|
||||
jsonBytes, err := json.Marshal(buffer)
|
||||
if err == nil {
|
||||
params := parseTime(jsonBytes)
|
||||
for i := 0; i < 3; i++ {
|
||||
statusCode, code, err := c.send(params, len(buffer))
|
||||
if statusCode == 200 {
|
||||
c.cacheBuffer = c.cacheBuffer[1:]
|
||||
switch code {
|
||||
case 0:
|
||||
tdLogInfo("send success: %v", params)
|
||||
return nil
|
||||
case 1, -1:
|
||||
msg := "invalid data format"
|
||||
tdLogError(msg)
|
||||
return fmt.Errorf(msg)
|
||||
case -2:
|
||||
msg := "APP ID doesn't exist"
|
||||
tdLogError(msg)
|
||||
return fmt.Errorf(msg)
|
||||
case -3:
|
||||
msg := "invalid ip transmission"
|
||||
tdLogError(msg)
|
||||
return fmt.Errorf(msg)
|
||||
default:
|
||||
msg := "unknown error"
|
||||
tdLogError(msg)
|
||||
return fmt.Errorf(msg)
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if i == 2 {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *TDBatchConsumer) FlushAll() error {
|
||||
for c.getCacheLength() > 0 || c.getBufferLength() > 0 {
|
||||
if err := c.Flush(); err != nil {
|
||||
if !strings.Contains(err.Error(), "ThinkingDataError") {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TDBatchConsumer) Close() error {
|
||||
tdLogInfo("batch consumer close")
|
||||
return c.FlushAll()
|
||||
}
|
||||
|
||||
func (c *TDBatchConsumer) IsStringent() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *TDBatchConsumer) send(data string, size int) (statusCode int, code int, err error) {
|
||||
var encodedData string
|
||||
var compressType = "gzip"
|
||||
if c.compress {
|
||||
encodedData, err = encodeData(data)
|
||||
} else {
|
||||
encodedData = data
|
||||
compressType = "none"
|
||||
}
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
postData := bytes.NewBufferString(encodedData)
|
||||
|
||||
var resp *http.Response
|
||||
req, _ := http.NewRequest("POST", c.serverUrl, postData)
|
||||
req.Header["appid"] = []string{c.appId}
|
||||
req.Header.Set("user-agent", "ta-go-sdk")
|
||||
req.Header.Set("version", SdkVersion)
|
||||
req.Header.Set("compress", compressType)
|
||||
req.Header["TA-Integration-Type"] = []string{LibName}
|
||||
req.Header["TA-Integration-Version"] = []string{SdkVersion}
|
||||
req.Header["TA-Integration-Count"] = []string{strconv.Itoa(size)}
|
||||
client := &http.Client{Timeout: c.timeout}
|
||||
resp, err = client.Do(req)
|
||||
|
||||
if err != nil {
|
||||
return 0, 0, err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
var result struct {
|
||||
Code int
|
||||
}
|
||||
|
||||
err = json.Unmarshal(body, &result)
|
||||
if err != nil {
|
||||
return resp.StatusCode, 1, err
|
||||
}
|
||||
|
||||
return resp.StatusCode, result.Code, nil
|
||||
} else {
|
||||
return resp.StatusCode, -1, nil
|
||||
}
|
||||
}
|
||||
|
||||
// Gzip
|
||||
func encodeData(data string) (string, error) {
|
||||
var buf bytes.Buffer
|
||||
gw := gzip.NewWriter(&buf)
|
||||
|
||||
_, err := gw.Write([]byte(data))
|
||||
if err != nil {
|
||||
gw.Close()
|
||||
return "", err
|
||||
}
|
||||
gw.Close()
|
||||
|
||||
return string(buf.Bytes()), nil
|
||||
}
|
||||
|
||||
func (c *TDBatchConsumer) getBufferLength() int {
|
||||
c.bufferMutex.RLock()
|
||||
defer c.bufferMutex.RUnlock()
|
||||
return len(c.buffer)
|
||||
}
|
||||
|
||||
func (c *TDBatchConsumer) getCacheLength() int {
|
||||
c.cacheMutex.RLock()
|
||||
defer c.cacheMutex.RUnlock()
|
||||
return len(c.cacheBuffer)
|
||||
}
|
||||
120
src/server/thinkingdata/consumer_debug.go
Normal file
120
src/server/thinkingdata/consumer_debug.go
Normal file
@ -0,0 +1,120 @@
|
||||
package thinkingdata
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
// TDDebugConsumer The data is reported one by one, and when an error occurs, the log will be printed on the console.
|
||||
type TDDebugConsumer struct {
|
||||
serverUrl string // serverUrl
|
||||
appId string // appId
|
||||
writeData bool // is archive to TE
|
||||
deviceId string // be used to debug in TE
|
||||
}
|
||||
|
||||
// NewDebugConsumer init TDDebugConsumer
|
||||
func NewDebugConsumer(serverUrl string, appId string) (TDConsumer, error) {
|
||||
return NewDebugConsumerWithWriter(serverUrl, appId, true)
|
||||
}
|
||||
|
||||
func NewDebugConsumerWithWriter(serverUrl string, appId string, writeData bool) (TDConsumer, error) {
|
||||
return NewDebugConsumerWithDeviceId(serverUrl, appId, writeData, "")
|
||||
}
|
||||
|
||||
func NewDebugConsumerWithDeviceId(serverUrl string, appId string, writeData bool, deviceId string) (TDConsumer, error) {
|
||||
// enable console log
|
||||
SetLogLevel(TDLogLevelDebug)
|
||||
|
||||
if len(serverUrl) <= 0 {
|
||||
msg := fmt.Sprint("ServerUrl not be empty")
|
||||
tdLogError(msg)
|
||||
return nil, errors.New(msg)
|
||||
}
|
||||
|
||||
u, err := url.Parse(serverUrl)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u.Path = "/data_debug"
|
||||
|
||||
c := &TDDebugConsumer{serverUrl: u.String(), appId: appId, writeData: writeData, deviceId: deviceId}
|
||||
|
||||
tdLogInfo("Mode: debug consumer, appId: %s, serverUrl: %s", c.appId, c.serverUrl)
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *TDDebugConsumer) Add(d Data) error {
|
||||
jsonBytes, err := json.Marshal(d)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var jsonStr string
|
||||
// if properties has includes complex data, SDK need parse time with regular expression
|
||||
if d.IsComplex {
|
||||
jsonStr = parseTime(jsonBytes)
|
||||
} else {
|
||||
jsonStr = string(jsonBytes)
|
||||
}
|
||||
|
||||
tdLogInfo("%v", jsonStr)
|
||||
|
||||
return c.send(jsonStr)
|
||||
}
|
||||
|
||||
func (c *TDDebugConsumer) Flush() error {
|
||||
tdLogInfo("flush data")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TDDebugConsumer) Close() error {
|
||||
tdLogInfo("debug consumer close")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TDDebugConsumer) IsStringent() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *TDDebugConsumer) send(data string) error {
|
||||
var dryRun = "0"
|
||||
if !c.writeData {
|
||||
dryRun = "1"
|
||||
}
|
||||
postData := url.Values{"data": {data}, "appid": {c.appId}, "source": {"server"}, "dryRun": {dryRun}}
|
||||
if len(c.deviceId) > 0 {
|
||||
postData.Add("deviceId", c.deviceId)
|
||||
}
|
||||
resp, err := http.PostForm(c.serverUrl, postData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
body, _ := ioutil.ReadAll(resp.Body)
|
||||
result := map[string]interface{}{}
|
||||
err = json.Unmarshal(body, &result)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if uint64(result["errorLevel"].(float64)) != 0 {
|
||||
msg := fmt.Sprintf("send to receiver failed with return content: %s", string(body))
|
||||
tdLogError(msg)
|
||||
return errors.New(msg)
|
||||
} else {
|
||||
tdLogInfo("send success: %v", result)
|
||||
}
|
||||
} else {
|
||||
return errors.New(fmt.Sprintf("Unexpected Status Code: %d", resp.StatusCode))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
256
src/server/thinkingdata/consumer_log.go
Normal file
256
src/server/thinkingdata/consumer_log.go
Normal file
@ -0,0 +1,256 @@
|
||||
package thinkingdata
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type RotateMode int32
|
||||
|
||||
const (
|
||||
DefaultChannelSize = 1000 // channel size
|
||||
ROTATE_DAILY RotateMode = 0 // by the day
|
||||
ROTATE_HOURLY RotateMode = 1 // by the hour
|
||||
)
|
||||
|
||||
// TDLogConsumer write data to file, it works with LogBus
|
||||
type TDLogConsumer struct {
|
||||
directory string // directory of log file
|
||||
dateFormat string // name format of log file
|
||||
fileSize int64 // max size of single log file (MByte)
|
||||
fileNamePrefix string // prefix of log file
|
||||
currentFile *os.File // current file handler
|
||||
wg sync.WaitGroup
|
||||
ch chan []byte
|
||||
mutex *sync.RWMutex
|
||||
sdkClose bool
|
||||
}
|
||||
|
||||
type TDLogConsumerConfig struct {
|
||||
Directory string // directory of log file
|
||||
RotateMode RotateMode // rotate mode of log file
|
||||
FileSize int // max size of single log file (MByte)
|
||||
FileNamePrefix string // prefix of log file
|
||||
ChannelSize int
|
||||
}
|
||||
|
||||
func NewLogConsumer(directory string, r RotateMode) (TDConsumer, error) {
|
||||
return NewLogConsumerWithFileSize(directory, r, 0)
|
||||
}
|
||||
|
||||
// NewLogConsumerWithFileSize init TDLogConsumer
|
||||
// directory: directory of log file
|
||||
// r: rotate mode of log file. (in days / hours)
|
||||
// size: max size of single log file (MByte)
|
||||
func NewLogConsumerWithFileSize(directory string, r RotateMode, size int) (TDConsumer, error) {
|
||||
config := TDLogConsumerConfig{
|
||||
Directory: directory,
|
||||
RotateMode: r,
|
||||
FileSize: size,
|
||||
}
|
||||
return NewLogConsumerWithConfig(config)
|
||||
}
|
||||
|
||||
func NewLogConsumerWithConfig(config TDLogConsumerConfig) (TDConsumer, error) {
|
||||
var df string
|
||||
switch config.RotateMode {
|
||||
case ROTATE_DAILY:
|
||||
df = "2006-01-02"
|
||||
case ROTATE_HOURLY:
|
||||
df = "2006-01-02-15"
|
||||
default:
|
||||
errStr := "unknown rotate mode"
|
||||
tdLogInfo(errStr)
|
||||
return nil, errors.New(errStr)
|
||||
}
|
||||
|
||||
chanSize := DefaultChannelSize
|
||||
if config.ChannelSize > 0 {
|
||||
chanSize = config.ChannelSize
|
||||
}
|
||||
|
||||
c := &TDLogConsumer{
|
||||
directory: config.Directory,
|
||||
dateFormat: df,
|
||||
fileSize: int64(config.FileSize * 1024 * 1024),
|
||||
fileNamePrefix: config.FileNamePrefix,
|
||||
wg: sync.WaitGroup{},
|
||||
ch: make(chan []byte, chanSize),
|
||||
mutex: new(sync.RWMutex),
|
||||
sdkClose: false,
|
||||
}
|
||||
|
||||
return c, c.init()
|
||||
}
|
||||
|
||||
func (c *TDLogConsumer) Add(d Data) error {
|
||||
var err error = nil
|
||||
c.mutex.Lock()
|
||||
defer func() {
|
||||
c.mutex.Unlock()
|
||||
}()
|
||||
if c.sdkClose {
|
||||
err = errors.New("add event failed, SDK has been closed")
|
||||
tdLogError(err.Error())
|
||||
} else {
|
||||
jsonBytes, jsonErr := json.Marshal(d)
|
||||
if jsonErr != nil {
|
||||
err = jsonErr
|
||||
} else {
|
||||
c.ch <- jsonBytes
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *TDLogConsumer) Flush() error {
|
||||
tdLogInfo("flush data")
|
||||
var err error = nil
|
||||
c.mutex.Lock()
|
||||
if c.currentFile != nil {
|
||||
err = c.currentFile.Sync()
|
||||
}
|
||||
c.mutex.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *TDLogConsumer) Close() error {
|
||||
tdLogInfo("log consumer close")
|
||||
|
||||
var err error = nil
|
||||
c.mutex.Lock()
|
||||
if c.sdkClose {
|
||||
err = errors.New("[ThinkingData][error]: SDK has been closed")
|
||||
} else {
|
||||
close(c.ch)
|
||||
c.wg.Wait()
|
||||
if c.currentFile != nil {
|
||||
_ = c.currentFile.Sync()
|
||||
err = c.currentFile.Close()
|
||||
c.currentFile = nil
|
||||
}
|
||||
}
|
||||
c.sdkClose = true
|
||||
c.mutex.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *TDLogConsumer) IsStringent() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *TDLogConsumer) constructFileName(timeStr string, i int) string {
|
||||
fileNamePrefix := ""
|
||||
if len(c.fileNamePrefix) != 0 {
|
||||
fileNamePrefix = c.fileNamePrefix + "."
|
||||
}
|
||||
// is need paging
|
||||
if c.fileSize > 0 {
|
||||
return fmt.Sprintf("%s/%slog.%s_%d", c.directory, fileNamePrefix, timeStr, i)
|
||||
} else {
|
||||
return fmt.Sprintf("%s/%slog.%s", c.directory, fileNamePrefix, timeStr)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *TDLogConsumer) init() error {
|
||||
fd, err := c.initLogFile()
|
||||
if err != nil {
|
||||
tdLogError("init log file failed: %s", err)
|
||||
return err
|
||||
}
|
||||
c.currentFile = fd
|
||||
|
||||
c.wg.Add(1)
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
c.wg.Done()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case rec, ok := <-c.ch:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
jsonStr := parseTime(rec)
|
||||
tdLogInfo("write event data: %s", jsonStr)
|
||||
c.writeToFile(jsonStr)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
tdLogInfo("Mode: log consumer, log path: " + c.directory)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TDLogConsumer) initLogFile() (*os.File, error) {
|
||||
_, err := os.Stat(c.directory)
|
||||
if err != nil && os.IsNotExist(err) {
|
||||
e := os.MkdirAll(c.directory, os.ModePerm)
|
||||
if e != nil {
|
||||
return nil, e
|
||||
}
|
||||
}
|
||||
timeStr := time.Now().Format(c.dateFormat)
|
||||
return os.OpenFile(c.constructFileName(timeStr, 0), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0664)
|
||||
}
|
||||
|
||||
var logFileIndex = 0
|
||||
|
||||
func (c *TDLogConsumer) writeToFile(str string) {
|
||||
timeStr := time.Now().Format(c.dateFormat)
|
||||
// paging by Rotate Mode and current file size
|
||||
var newName string
|
||||
fName := c.constructFileName(timeStr, logFileIndex)
|
||||
|
||||
if c.currentFile == nil {
|
||||
var openFileErr error
|
||||
c.currentFile, openFileErr = os.OpenFile(fName, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0664)
|
||||
if openFileErr != nil {
|
||||
tdLogInfo("open log file failed: %s\n", openFileErr)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if c.currentFile.Name() != fName {
|
||||
newName = fName
|
||||
} else if c.fileSize > 0 {
|
||||
stat, _ := c.currentFile.Stat()
|
||||
if stat.Size() > c.fileSize {
|
||||
logFileIndex++
|
||||
newName = c.constructFileName(timeStr, logFileIndex)
|
||||
}
|
||||
}
|
||||
if newName != "" {
|
||||
err := c.currentFile.Close()
|
||||
if err != nil {
|
||||
tdLogInfo("close file failed: %s\n", err)
|
||||
return
|
||||
}
|
||||
c.currentFile, err = os.OpenFile(fName, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0664)
|
||||
if err != nil {
|
||||
tdLogInfo("rotate log file failed: %s\n", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
_, err := fmt.Fprintln(c.currentFile, str)
|
||||
if err != nil {
|
||||
tdLogInfo("LoggerWriter(%q): %s\n", c.currentFile.Name(), err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Deprecated: please use TDLogConsumer
|
||||
type LogConsumer struct {
|
||||
TDLogConsumer
|
||||
}
|
||||
|
||||
// Deprecated: please use TDLogConsumerConfig
|
||||
type LogConfig struct {
|
||||
TDLogConsumerConfig
|
||||
}
|
||||
125
src/server/thinkingdata/td_log.go
Normal file
125
src/server/thinkingdata/td_log.go
Normal file
@ -0,0 +1,125 @@
|
||||
package thinkingdata
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
// SDK_LOG_PREFIX const
|
||||
const SDK_LOG_PREFIX = "[ThinkingData]"
|
||||
|
||||
var logInstance TDLogger
|
||||
|
||||
type TDLogLevel int32
|
||||
|
||||
const (
|
||||
TDLogLevelOff TDLogLevel = 1
|
||||
TDLogLevelError TDLogLevel = 2
|
||||
TDLogLevelWarning TDLogLevel = 3
|
||||
TDLogLevelInfo TDLogLevel = 4
|
||||
TDLogLevelDebug TDLogLevel = 5
|
||||
)
|
||||
|
||||
// default is TDLogLevelOff
|
||||
var currentLogLevel = TDLogLevelOff
|
||||
|
||||
// TDLogger User-defined log classes must comply with interface
|
||||
type TDLogger interface {
|
||||
Print(message string)
|
||||
}
|
||||
|
||||
// SetLogLevel Set the log output level
|
||||
func SetLogLevel(level TDLogLevel) {
|
||||
if level < TDLogLevelOff || level > TDLogLevelDebug {
|
||||
fmt.Println(SDK_LOG_PREFIX + "log type error")
|
||||
return
|
||||
} else {
|
||||
currentLogLevel = level
|
||||
}
|
||||
}
|
||||
|
||||
// SetCustomLogger Set a custom log input class, usually you don't need to set it up.
|
||||
func SetCustomLogger(logger TDLogger) {
|
||||
if logger != nil {
|
||||
logInstance = logger
|
||||
}
|
||||
}
|
||||
|
||||
func tdLog(level TDLogLevel, format string, v ...interface{}) {
|
||||
if level > currentLogLevel {
|
||||
return
|
||||
}
|
||||
|
||||
var modeStr string
|
||||
switch level {
|
||||
case TDLogLevelError:
|
||||
modeStr = "[Error] "
|
||||
break
|
||||
case TDLogLevelWarning:
|
||||
modeStr = "[Warning] "
|
||||
break
|
||||
case TDLogLevelInfo:
|
||||
modeStr = "[Info] "
|
||||
break
|
||||
case TDLogLevelDebug:
|
||||
modeStr = "[Debug] "
|
||||
break
|
||||
default:
|
||||
modeStr = "[Info] "
|
||||
break
|
||||
}
|
||||
|
||||
if logInstance != nil {
|
||||
msg := fmt.Sprintf(SDK_LOG_PREFIX+modeStr+format+"\n", v...)
|
||||
logInstance.Print(msg)
|
||||
} else {
|
||||
logTime := fmt.Sprintf("[%v]", time.Now().Format("2006-01-02 15:04:05.000"))
|
||||
fmt.Printf(logTime+SDK_LOG_PREFIX+modeStr+format+"\n", v...)
|
||||
}
|
||||
}
|
||||
|
||||
func tdLogDebug(format string, v ...interface{}) {
|
||||
tdLog(TDLogLevelDebug, format, v...)
|
||||
}
|
||||
|
||||
func tdLogInfo(format string, v ...interface{}) {
|
||||
tdLog(TDLogLevelInfo, format, v...)
|
||||
}
|
||||
|
||||
func tdLogError(format string, v ...interface{}) {
|
||||
tdLog(TDLogLevelError, format, v...)
|
||||
}
|
||||
|
||||
func tdLogWarning(format string, v ...interface{}) {
|
||||
tdLog(TDLogLevelWarning, format, v...)
|
||||
}
|
||||
|
||||
// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff)
|
||||
type LogType int32
|
||||
|
||||
// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff)
|
||||
const (
|
||||
LoggerTypeOff LogType = 1 << 0 // disable log
|
||||
LoggerTypePrint LogType = 1 << 1 // print on console
|
||||
LoggerTypeWriteFile LogType = 1 << 2 // print to file
|
||||
LoggerTypePrintAndWriteFile = LoggerTypePrint | LoggerTypeWriteFile // print both on console and file
|
||||
)
|
||||
|
||||
// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff)
|
||||
type LoggerConfig struct {
|
||||
Type LogType
|
||||
Path string
|
||||
}
|
||||
|
||||
// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff)
|
||||
func SetLoggerConfig(config LoggerConfig) {
|
||||
if config.Type < LoggerTypeOff || config.Type > LoggerTypePrintAndWriteFile {
|
||||
fmt.Println(SDK_LOG_PREFIX + "log type error")
|
||||
return
|
||||
}
|
||||
if config.Type&LoggerTypeOff == LoggerTypeOff {
|
||||
currentLogLevel = TDLogLevelOff
|
||||
} else {
|
||||
currentLogLevel = TDLogLevelInfo
|
||||
}
|
||||
}
|
||||
314
src/server/thinkingdata/thinkingdata.go
Normal file
314
src/server/thinkingdata/thinkingdata.go
Normal file
@ -0,0 +1,314 @@
|
||||
package thinkingdata
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
Track = "track"
|
||||
TrackUpdate = "track_update"
|
||||
TrackOverwrite = "track_overwrite"
|
||||
UserSet = "user_set"
|
||||
UserUnset = "user_unset"
|
||||
UserSetOnce = "user_setOnce"
|
||||
UserAdd = "user_add"
|
||||
UserAppend = "user_append"
|
||||
UserUniqAppend = "user_uniq_append"
|
||||
UserDel = "user_del"
|
||||
|
||||
SdkVersion = "2.0.3"
|
||||
LibName = "Golang"
|
||||
)
|
||||
|
||||
type Data struct {
|
||||
IsComplex bool `json:"-"` // properties are nested or not
|
||||
AccountId string `json:"#account_id,omitempty"`
|
||||
DistinctId string `json:"#distinct_id,omitempty"`
|
||||
Type string `json:"#type"`
|
||||
Time string `json:"#time"`
|
||||
Timestamp int64 `json:"#timestamp,omitempty"`
|
||||
EventName string `json:"#event_name,omitempty"`
|
||||
EventId string `json:"#event_id,omitempty"`
|
||||
FirstCheckId string `json:"#first_check_id,omitempty"`
|
||||
Ip string `json:"#ip,omitempty"`
|
||||
UUID string `json:"#uuid,omitempty"`
|
||||
AppId string `json:"#app_id,omitempty"`
|
||||
Properties map[string]interface{} `json:"properties"`
|
||||
PropertiesSummary string `json:"properties_summary,omitempty"`
|
||||
}
|
||||
|
||||
// TDConsumer define operation interface
|
||||
type TDConsumer interface {
|
||||
Add(d Data) error
|
||||
Flush() error
|
||||
Close() error
|
||||
IsStringent() bool // check data or not.
|
||||
}
|
||||
|
||||
type TDAnalytics struct {
|
||||
consumer TDConsumer
|
||||
superProperties map[string]interface{}
|
||||
mutex *sync.RWMutex
|
||||
dynamicSuperProperties func() map[string]interface{}
|
||||
}
|
||||
|
||||
// New init SDK
|
||||
func New(c TDConsumer) TDAnalytics {
|
||||
tdLogInfo("init SDK success")
|
||||
return TDAnalytics{
|
||||
consumer: c,
|
||||
superProperties: make(map[string]interface{}),
|
||||
mutex: new(sync.RWMutex),
|
||||
}
|
||||
}
|
||||
|
||||
// GetSuperProperties get common properties
|
||||
func (ta *TDAnalytics) GetSuperProperties() map[string]interface{} {
|
||||
result := make(map[string]interface{})
|
||||
ta.mutex.Lock()
|
||||
mergeProperties(result, ta.superProperties)
|
||||
ta.mutex.Unlock()
|
||||
return result
|
||||
}
|
||||
|
||||
// SetSuperProperties set common properties
|
||||
func (ta *TDAnalytics) SetSuperProperties(superProperties map[string]interface{}) {
|
||||
ta.mutex.Lock()
|
||||
mergeProperties(ta.superProperties, superProperties)
|
||||
ta.mutex.Unlock()
|
||||
}
|
||||
|
||||
// ClearSuperProperties clear common properties
|
||||
func (ta *TDAnalytics) ClearSuperProperties() {
|
||||
ta.mutex.Lock()
|
||||
ta.superProperties = make(map[string]interface{})
|
||||
ta.mutex.Unlock()
|
||||
}
|
||||
|
||||
// SetDynamicSuperProperties set common properties dynamically.
|
||||
// not recommend to add the operation which with a lot of computation
|
||||
func (ta *TDAnalytics) SetDynamicSuperProperties(action func() map[string]interface{}) {
|
||||
ta.mutex.Lock()
|
||||
ta.dynamicSuperProperties = action
|
||||
ta.mutex.Unlock()
|
||||
}
|
||||
|
||||
// GetDynamicSuperProperties dynamic common properties
|
||||
func (ta *TDAnalytics) GetDynamicSuperProperties() map[string]interface{} {
|
||||
result := make(map[string]interface{})
|
||||
ta.mutex.RLock()
|
||||
if ta.dynamicSuperProperties != nil {
|
||||
mergeProperties(result, ta.dynamicSuperProperties())
|
||||
}
|
||||
ta.mutex.RUnlock()
|
||||
return result
|
||||
}
|
||||
|
||||
// Track report ordinary event
|
||||
func (ta *TDAnalytics) Track(accountId, distinctId, eventName string, properties map[string]interface{}) error {
|
||||
return ta.track(accountId, distinctId, Track, eventName, "", properties)
|
||||
}
|
||||
|
||||
// TrackFirst report first event
|
||||
func (ta *TDAnalytics) TrackFirst(accountId, distinctId, eventName, firstCheckId string, properties map[string]interface{}) error {
|
||||
if len(firstCheckId) == 0 {
|
||||
msg := "the 'firstCheckId' must be provided"
|
||||
tdLogInfo(msg)
|
||||
return errors.New(msg)
|
||||
}
|
||||
p := make(map[string]interface{})
|
||||
mergeProperties(p, properties)
|
||||
p["#first_check_id"] = firstCheckId
|
||||
return ta.track(accountId, distinctId, Track, eventName, "", p)
|
||||
}
|
||||
|
||||
// TrackUpdate report updatable event
|
||||
func (ta *TDAnalytics) TrackUpdate(accountId, distinctId, eventName, eventId string, properties map[string]interface{}) error {
|
||||
return ta.track(accountId, distinctId, TrackUpdate, eventName, eventId, properties)
|
||||
}
|
||||
|
||||
// TrackOverwrite report overridable event
|
||||
func (ta *TDAnalytics) TrackOverwrite(accountId, distinctId, eventName, eventId string, properties map[string]interface{}) error {
|
||||
return ta.track(accountId, distinctId, TrackOverwrite, eventName, eventId, properties)
|
||||
}
|
||||
|
||||
func (ta *TDAnalytics) track(accountId, distinctId, dataType, eventName, eventId string, properties map[string]interface{}) error {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
tdLogError("%+v\ndata: %+v", r, properties)
|
||||
}
|
||||
}()
|
||||
|
||||
if len(eventName) == 0 {
|
||||
msg := "the event name must be provided"
|
||||
tdLogError(msg)
|
||||
return errors.New(msg)
|
||||
}
|
||||
|
||||
// eventId not be null unless eventType is equal Track.
|
||||
if len(eventId) == 0 && dataType != Track {
|
||||
msg := "the event id must be provided"
|
||||
tdLogError(msg)
|
||||
return errors.New(msg)
|
||||
}
|
||||
|
||||
p := ta.GetSuperProperties()
|
||||
dynamicSuperProperties := ta.GetDynamicSuperProperties()
|
||||
|
||||
mergeProperties(p, dynamicSuperProperties)
|
||||
// preset properties has the highest priority
|
||||
p["#lib"] = LibName
|
||||
p["#lib_version"] = SdkVersion
|
||||
// custom properties
|
||||
mergeProperties(p, properties)
|
||||
|
||||
return ta.add(accountId, distinctId, dataType, eventName, eventId, p)
|
||||
}
|
||||
|
||||
// UserSet set user properties. would overwrite existing names.
|
||||
func (ta *TDAnalytics) UserSet(accountId string, distinctId string, properties map[string]interface{}) error {
|
||||
return ta.user(accountId, distinctId, UserSet, properties)
|
||||
}
|
||||
|
||||
// UserUnset clear the user properties of users.
|
||||
func (ta *TDAnalytics) UserUnset(accountId string, distinctId string, s []string) error {
|
||||
if len(s) == 0 {
|
||||
msg := "invalid params for UserUnset: keys is nil"
|
||||
tdLogInfo(msg)
|
||||
return errors.New(msg)
|
||||
}
|
||||
prop := make(map[string]interface{})
|
||||
for _, v := range s {
|
||||
prop[v] = 0
|
||||
}
|
||||
return ta.user(accountId, distinctId, UserUnset, prop)
|
||||
}
|
||||
|
||||
func (ta *TDAnalytics) UserUnsetWithProperties(accountId string, distinctId string, properties map[string]interface{}) error {
|
||||
if len(properties) == 0 {
|
||||
msg := "invalid params for UserUnset: properties is nil"
|
||||
tdLogInfo(msg)
|
||||
return errors.New(msg)
|
||||
}
|
||||
return ta.user(accountId, distinctId, UserUnset, properties)
|
||||
}
|
||||
|
||||
// UserSetOnce set user properties, If such property had been set before, this message would be neglected.
|
||||
func (ta *TDAnalytics) UserSetOnce(accountId string, distinctId string, properties map[string]interface{}) error {
|
||||
return ta.user(accountId, distinctId, UserSetOnce, properties)
|
||||
}
|
||||
|
||||
// UserAdd to accumulate operations against the property.
|
||||
func (ta *TDAnalytics) UserAdd(accountId string, distinctId string, properties map[string]interface{}) error {
|
||||
return ta.user(accountId, distinctId, UserAdd, properties)
|
||||
}
|
||||
|
||||
// UserAppend to add user properties of array type.
|
||||
func (ta *TDAnalytics) UserAppend(accountId string, distinctId string, properties map[string]interface{}) error {
|
||||
return ta.user(accountId, distinctId, UserAppend, properties)
|
||||
}
|
||||
|
||||
// UserUniqAppend append user properties to array type by unique.
|
||||
func (ta *TDAnalytics) UserUniqAppend(accountId string, distinctId string, properties map[string]interface{}) error {
|
||||
return ta.user(accountId, distinctId, UserUniqAppend, properties)
|
||||
}
|
||||
|
||||
// UserDelete delete a user, This operation cannot be undone.
|
||||
func (ta *TDAnalytics) UserDelete(accountId string, distinctId string) error {
|
||||
return ta.user(accountId, distinctId, UserDel, nil)
|
||||
}
|
||||
|
||||
// UserDeleteWithProperties delete a user, This operation cannot be undone.
|
||||
func (ta *TDAnalytics) UserDeleteWithProperties(accountId string, distinctId string, properties map[string]interface{}) error {
|
||||
return ta.user(accountId, distinctId, UserDel, properties)
|
||||
}
|
||||
|
||||
func (ta *TDAnalytics) user(accountId, distinctId, dataType string, properties map[string]interface{}) error {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
tdLogError("%+v\ndata: %+v", r, properties)
|
||||
}
|
||||
}()
|
||||
if properties == nil && dataType != UserDel {
|
||||
msg := "invalid params for " + dataType + ": properties is nil"
|
||||
tdLogError(msg)
|
||||
return errors.New(msg)
|
||||
}
|
||||
p := make(map[string]interface{})
|
||||
mergeProperties(p, properties)
|
||||
return ta.add(accountId, distinctId, dataType, "", "", p)
|
||||
}
|
||||
|
||||
// Flush report data immediately.
|
||||
func (ta *TDAnalytics) Flush() error {
|
||||
return ta.consumer.Flush()
|
||||
}
|
||||
|
||||
// Close and exit sdk
|
||||
func (ta *TDAnalytics) Close() error {
|
||||
err := ta.consumer.Close()
|
||||
tdLogInfo("SDK close")
|
||||
return err
|
||||
}
|
||||
|
||||
func (ta *TDAnalytics) add(accountId, distinctId, dataType, eventName, eventId string, properties map[string]interface{}) error {
|
||||
if len(accountId) == 0 && len(distinctId) == 0 {
|
||||
msg := "invalid parameters: account_id and distinct_id cannot be empty at the same time"
|
||||
tdLogError(msg)
|
||||
return errors.New(msg)
|
||||
}
|
||||
|
||||
// get "#ip" value in properties, empty string will be return when not found.
|
||||
ip := extractStringProperty(properties, "#ip")
|
||||
|
||||
// get "#app_id" value in properties, empty string will be return when not found.
|
||||
appId := extractStringProperty(properties, "#app_id")
|
||||
|
||||
// get "#time" value in properties, empty string will be return when not found.
|
||||
eventTime := extractTime(properties)
|
||||
|
||||
firstCheckId := extractStringProperty(properties, "#first_check_id")
|
||||
|
||||
// get "#uuid" value in properties, empty string will be return when not found.
|
||||
uuid := extractStringProperty(properties, "#uuid")
|
||||
if len(uuid) == 0 {
|
||||
uuid = generateUUID()
|
||||
}
|
||||
properties_summary, err := json.Marshal(properties)
|
||||
if err != nil {
|
||||
properties_summary = []byte{}
|
||||
}
|
||||
data := Data{
|
||||
AccountId: accountId,
|
||||
DistinctId: distinctId,
|
||||
Type: dataType,
|
||||
Time: eventTime,
|
||||
Timestamp: time.Now().Unix(),
|
||||
EventName: eventName,
|
||||
EventId: eventId,
|
||||
FirstCheckId: firstCheckId,
|
||||
Ip: ip,
|
||||
UUID: uuid,
|
||||
Properties: properties,
|
||||
PropertiesSummary: string(properties_summary),
|
||||
}
|
||||
|
||||
if len(appId) > 0 {
|
||||
data.AppId = appId
|
||||
}
|
||||
|
||||
err = formatProperties(&data, ta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ta.consumer.Add(data)
|
||||
}
|
||||
|
||||
// Deprecated: please use TDConsumer
|
||||
type Consumer interface {
|
||||
TDConsumer
|
||||
}
|
||||
143
src/server/thinkingdata/utils.go
Normal file
143
src/server/thinkingdata/utils.go
Normal file
@ -0,0 +1,143 @@
|
||||
package thinkingdata
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"os"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
DATE_FORMAT = "2006-01-02 15:04:05.000"
|
||||
KEY_PATTERN = "^[a-zA-Z#][A-Za-z0-9_]{0,49}$"
|
||||
)
|
||||
|
||||
// A string of 50 letters and digits that starts with '#' or a letter
|
||||
var keyPattern, _ = regexp.Compile(KEY_PATTERN)
|
||||
|
||||
func mergeProperties(target, source map[string]interface{}) {
|
||||
for k, v := range source {
|
||||
target[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
func extractTime(p map[string]interface{}) string {
|
||||
if t, ok := p["#time"]; ok {
|
||||
delete(p, "#time")
|
||||
switch v := t.(type) {
|
||||
case string:
|
||||
return v
|
||||
case time.Time:
|
||||
return v.Format(DATE_FORMAT)
|
||||
default:
|
||||
return time.Now().Format(DATE_FORMAT)
|
||||
}
|
||||
}
|
||||
|
||||
return time.Now().Format(DATE_FORMAT)
|
||||
}
|
||||
|
||||
func extractStringProperty(p map[string]interface{}, key string) string {
|
||||
if t, ok := p[key]; ok {
|
||||
delete(p, key)
|
||||
v, ok := t.(string)
|
||||
if !ok {
|
||||
fmt.Fprintln(os.Stderr, "Invalid data type for "+key)
|
||||
}
|
||||
return v
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func isNotNumber(v interface{}) bool {
|
||||
switch v.(type) {
|
||||
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
|
||||
case float32, float64:
|
||||
default:
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func formatProperties(d *Data, ta *TDAnalytics) error {
|
||||
|
||||
if d.EventName != "" {
|
||||
matched := checkPattern([]byte(d.EventName))
|
||||
if !matched {
|
||||
msg := "invalid event name: " + d.EventName
|
||||
tdLogInfo(msg)
|
||||
return errors.New(msg)
|
||||
}
|
||||
}
|
||||
|
||||
if d.Properties != nil {
|
||||
for k, v := range d.Properties {
|
||||
if ta.consumer.IsStringent() {
|
||||
isMatch := checkPattern([]byte(k))
|
||||
if !isMatch {
|
||||
msg := "invalid property key: " + k
|
||||
tdLogInfo(msg)
|
||||
return errors.New(msg)
|
||||
}
|
||||
}
|
||||
|
||||
if d.Type == UserAdd && isNotNumber(v) {
|
||||
msg := "invalid property value: only numbers is supported by UserAdd"
|
||||
tdLogInfo(msg)
|
||||
return errors.New(msg)
|
||||
}
|
||||
|
||||
// check value
|
||||
switch v.(type) {
|
||||
case int:
|
||||
case bool:
|
||||
case float64:
|
||||
case string:
|
||||
case time.Time:
|
||||
d.Properties[k] = v.(time.Time).Format(DATE_FORMAT)
|
||||
case []string:
|
||||
d.IsComplex = true
|
||||
default:
|
||||
d.IsComplex = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func isNotArrayOrSlice(v interface{}) bool {
|
||||
typeOf := reflect.TypeOf(v)
|
||||
switch typeOf.Kind() {
|
||||
case reflect.Array:
|
||||
case reflect.Slice:
|
||||
default:
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func checkPattern(name []byte) bool {
|
||||
return keyPattern.Match(name)
|
||||
}
|
||||
|
||||
func parseTime(input []byte) string {
|
||||
var re = regexp.MustCompile(`"((\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2})(?:\.(\d{3}))\d*)(Z|[\+-]\d{2}:\d{2})"`)
|
||||
var substitution = "\"$2 $3.$4\""
|
||||
|
||||
for re.Match(input) {
|
||||
input = re.ReplaceAll(input, []byte(substitution))
|
||||
}
|
||||
return string(input)
|
||||
}
|
||||
|
||||
func generateUUID() string {
|
||||
newUUID, err := uuid.NewUUID()
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return newUUID.String()
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user