diff --git a/src/server/game/player_data.go b/src/server/game/player_data.go index 9cc3bda4..9dbe0acc 100644 --- a/src/server/game/player_data.go +++ b/src/server/game/player_data.go @@ -27,6 +27,7 @@ import ( "server/game/mod/quest" GoUtil "server/game_util" "server/msg" + telog "server/thinkdata" "strconv" "sync" "time" @@ -1061,7 +1062,7 @@ func (p *Player) TeLog(Type string, Param map[string]interface{}) { } //Param["#zone_offset"] = -5 // 游戏内TE日志 - //go telog.Te.Track(p.GetPlayerBaseMod().GetName(), p.GetPlayerBaseMod().GetName(), Type, Param) + go telog.Te.Track(p.GetPlayerBaseMod().GetName(), p.GetPlayerBaseMod().GetName(), Type, Param) BaseMod := p.PlayMod.getBaseMod() //途游GA go ga.GAlogEvent(Type, BaseMod.Account, "", Param) diff --git a/src/server/thinkdata/te.go b/src/server/thinkdata/te.go index 76734023..a4c0f84b 100644 --- a/src/server/thinkdata/te.go +++ b/src/server/thinkdata/te.go @@ -2,8 +2,7 @@ package telog import ( "server/conf" - - "github.com/ThinkingDataAnalytics/go-sdk/v2/src/thinkingdata" + "server/thinkingdata" ) var Te thinkingdata.TDAnalytics diff --git a/src/server/thinkingdata/consumer_batch.go b/src/server/thinkingdata/consumer_batch.go new file mode 100644 index 00000000..8d559121 --- /dev/null +++ b/src/server/thinkingdata/consumer_batch.go @@ -0,0 +1,352 @@ +package thinkingdata + +import ( + "bytes" + "compress/gzip" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" + "strconv" + "strings" + "sync" + "time" +) + +// TDBatchConsumer upload data to TE by http +type TDBatchConsumer struct { + serverUrl string // serverUrl + appId string // appId + timeout time.Duration // http timeout (mill second) + compress bool // is need compress + bufferMutex *sync.RWMutex + cacheMutex *sync.RWMutex // cache mutex + + buffer []Data + batchSize int // flush event count each time + cacheBuffer [][]Data // buffer + cacheCapacity int // buffer max count +} + +type TDBatchConfig struct { + ServerUrl string // serverUrl + AppId string // appId + BatchSize int // flush event count each time + Timeout int // http timeout (mill second) + Compress bool // enable compress data + AutoFlush bool // enable auto flush + Interval int // auto flush spacing (second) + CacheCapacity int // cache event count +} + +const ( + DefaultTimeOut = 30000 + DefaultBatchSize = 20 + MaxBatchSize = 200 + DefaultInterval = 30 + DefaultCacheCapacity = 50 +) + +// NewBatchConsumer create TDBatchConsumer +func NewBatchConsumer(serverUrl string, appId string) (TDConsumer, error) { + config := TDBatchConfig{ + ServerUrl: serverUrl, + AppId: appId, + Compress: true, + } + return initBatchConsumer(config) +} + +// NewBatchConsumerWithBatchSize create TDBatchConsumer +// serverUrl +// appId +// batchSize: flush event count each time +func NewBatchConsumerWithBatchSize(serverUrl string, appId string, batchSize int) (TDConsumer, error) { + config := TDBatchConfig{ + ServerUrl: serverUrl, + AppId: appId, + Compress: true, + BatchSize: batchSize, + } + return initBatchConsumer(config) +} + +// NewBatchConsumerWithCompress create TDBatchConsumer +// serverUrl +// appId +// compress: enable data compress +func NewBatchConsumerWithCompress(serverUrl string, appId string, compress bool) (TDConsumer, error) { + config := TDBatchConfig{ + ServerUrl: serverUrl, + AppId: appId, + Compress: compress, + } + return initBatchConsumer(config) +} + +func NewBatchConsumerWithConfig(config TDBatchConfig) (TDConsumer, error) { + return initBatchConsumer(config) +} + +func initBatchConsumer(config TDBatchConfig) (TDConsumer, error) { + if config.ServerUrl == "" { + msg := fmt.Sprint("ServerUrl not be empty") + tdLogInfo(msg) + return nil, errors.New(msg) + } + u, err := url.Parse(config.ServerUrl) + if err != nil { + return nil, err + } + u.Path = "/sync_server" + + var batchSize int + if config.BatchSize > MaxBatchSize { + batchSize = MaxBatchSize + } else if config.BatchSize <= 0 { + batchSize = DefaultBatchSize + } else { + batchSize = config.BatchSize + } + + var cacheCapacity int + if config.CacheCapacity <= 0 { + cacheCapacity = DefaultCacheCapacity + } else { + cacheCapacity = config.CacheCapacity + } + + var timeout int + if config.Timeout == 0 { + timeout = DefaultTimeOut + } else { + timeout = config.Timeout + } + + c := &TDBatchConsumer{ + serverUrl: u.String(), + appId: config.AppId, + timeout: time.Duration(timeout) * time.Millisecond, + compress: config.Compress, + bufferMutex: new(sync.RWMutex), + cacheMutex: new(sync.RWMutex), + batchSize: batchSize, + buffer: make([]Data, 0, batchSize), + cacheCapacity: cacheCapacity, + cacheBuffer: make([][]Data, 0, cacheCapacity), + } + + var interval int + if config.Interval == 0 { + interval = DefaultInterval + } else { + interval = config.Interval + } + if config.AutoFlush { + go func() { + ticker := time.NewTicker(time.Duration(interval) * time.Second) + defer ticker.Stop() + for { + <-ticker.C + _ = c.timerFlush() + } + }() + } + + tdLogInfo("Mode: batch consumer, appId: %s, serverUrl: %s", c.appId, c.serverUrl) + + return c, nil +} + +func (c *TDBatchConsumer) Add(d Data) error { + c.bufferMutex.Lock() + c.buffer = append(c.buffer, d) + c.bufferMutex.Unlock() + + tdLogInfo("Enqueue event data: %v", d) + + if c.getBufferLength() >= c.batchSize || c.getCacheLength() > 0 { + err := c.Flush() + return err + } + + return nil +} + +func (c *TDBatchConsumer) timerFlush() error { + tdLogInfo("timer flush data") + return c.innerFlush() +} + +func (c *TDBatchConsumer) Flush() error { + tdLogInfo("flush data") + return c.innerFlush() +} + +func (c *TDBatchConsumer) innerFlush() error { + + c.cacheMutex.Lock() + defer c.cacheMutex.Unlock() + + c.bufferMutex.Lock() + defer c.bufferMutex.Unlock() + + if len(c.buffer) == 0 && len(c.cacheBuffer) == 0 { + return nil + } + + defer func() { + if len(c.cacheBuffer) > c.cacheCapacity { + c.cacheBuffer = c.cacheBuffer[1:] + } + }() + + if len(c.cacheBuffer) == 0 || len(c.buffer) >= c.batchSize { + c.cacheBuffer = append(c.cacheBuffer, c.buffer) + c.buffer = make([]Data, 0, c.batchSize) + } + + err := c.uploadEvents() + + return err +} + +func (c *TDBatchConsumer) uploadEvents() error { + buffer := c.cacheBuffer[0] + + jsonBytes, err := json.Marshal(buffer) + if err == nil { + params := parseTime(jsonBytes) + for i := 0; i < 3; i++ { + statusCode, code, err := c.send(params, len(buffer)) + if statusCode == 200 { + c.cacheBuffer = c.cacheBuffer[1:] + switch code { + case 0: + tdLogInfo("send success: %v", params) + return nil + case 1, -1: + msg := "invalid data format" + tdLogError(msg) + return fmt.Errorf(msg) + case -2: + msg := "APP ID doesn't exist" + tdLogError(msg) + return fmt.Errorf(msg) + case -3: + msg := "invalid ip transmission" + tdLogError(msg) + return fmt.Errorf(msg) + default: + msg := "unknown error" + tdLogError(msg) + return fmt.Errorf(msg) + } + } + if err != nil { + if i == 2 { + return err + } + } + } + } + return err +} + +func (c *TDBatchConsumer) FlushAll() error { + for c.getCacheLength() > 0 || c.getBufferLength() > 0 { + if err := c.Flush(); err != nil { + if !strings.Contains(err.Error(), "ThinkingDataError") { + return err + } + } + } + return nil +} + +func (c *TDBatchConsumer) Close() error { + tdLogInfo("batch consumer close") + return c.FlushAll() +} + +func (c *TDBatchConsumer) IsStringent() bool { + return false +} + +func (c *TDBatchConsumer) send(data string, size int) (statusCode int, code int, err error) { + var encodedData string + var compressType = "gzip" + if c.compress { + encodedData, err = encodeData(data) + } else { + encodedData = data + compressType = "none" + } + if err != nil { + return 0, 0, err + } + postData := bytes.NewBufferString(encodedData) + + var resp *http.Response + req, _ := http.NewRequest("POST", c.serverUrl, postData) + req.Header["appid"] = []string{c.appId} + req.Header.Set("user-agent", "ta-go-sdk") + req.Header.Set("version", SdkVersion) + req.Header.Set("compress", compressType) + req.Header["TA-Integration-Type"] = []string{LibName} + req.Header["TA-Integration-Version"] = []string{SdkVersion} + req.Header["TA-Integration-Count"] = []string{strconv.Itoa(size)} + client := &http.Client{Timeout: c.timeout} + resp, err = client.Do(req) + + if err != nil { + return 0, 0, err + } + + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + var result struct { + Code int + } + + err = json.Unmarshal(body, &result) + if err != nil { + return resp.StatusCode, 1, err + } + + return resp.StatusCode, result.Code, nil + } else { + return resp.StatusCode, -1, nil + } +} + +// Gzip +func encodeData(data string) (string, error) { + var buf bytes.Buffer + gw := gzip.NewWriter(&buf) + + _, err := gw.Write([]byte(data)) + if err != nil { + gw.Close() + return "", err + } + gw.Close() + + return string(buf.Bytes()), nil +} + +func (c *TDBatchConsumer) getBufferLength() int { + c.bufferMutex.RLock() + defer c.bufferMutex.RUnlock() + return len(c.buffer) +} + +func (c *TDBatchConsumer) getCacheLength() int { + c.cacheMutex.RLock() + defer c.cacheMutex.RUnlock() + return len(c.cacheBuffer) +} diff --git a/src/server/thinkingdata/consumer_debug.go b/src/server/thinkingdata/consumer_debug.go new file mode 100644 index 00000000..2e7726de --- /dev/null +++ b/src/server/thinkingdata/consumer_debug.go @@ -0,0 +1,120 @@ +package thinkingdata + +import ( + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "net/url" +) + +// TDDebugConsumer The data is reported one by one, and when an error occurs, the log will be printed on the console. +type TDDebugConsumer struct { + serverUrl string // serverUrl + appId string // appId + writeData bool // is archive to TE + deviceId string // be used to debug in TE +} + +// NewDebugConsumer init TDDebugConsumer +func NewDebugConsumer(serverUrl string, appId string) (TDConsumer, error) { + return NewDebugConsumerWithWriter(serverUrl, appId, true) +} + +func NewDebugConsumerWithWriter(serverUrl string, appId string, writeData bool) (TDConsumer, error) { + return NewDebugConsumerWithDeviceId(serverUrl, appId, writeData, "") +} + +func NewDebugConsumerWithDeviceId(serverUrl string, appId string, writeData bool, deviceId string) (TDConsumer, error) { + // enable console log + SetLogLevel(TDLogLevelDebug) + + if len(serverUrl) <= 0 { + msg := fmt.Sprint("ServerUrl not be empty") + tdLogError(msg) + return nil, errors.New(msg) + } + + u, err := url.Parse(serverUrl) + if err != nil { + return nil, err + } + + u.Path = "/data_debug" + + c := &TDDebugConsumer{serverUrl: u.String(), appId: appId, writeData: writeData, deviceId: deviceId} + + tdLogInfo("Mode: debug consumer, appId: %s, serverUrl: %s", c.appId, c.serverUrl) + + return c, nil +} + +func (c *TDDebugConsumer) Add(d Data) error { + jsonBytes, err := json.Marshal(d) + if err != nil { + return err + } + + var jsonStr string + // if properties has includes complex data, SDK need parse time with regular expression + if d.IsComplex { + jsonStr = parseTime(jsonBytes) + } else { + jsonStr = string(jsonBytes) + } + + tdLogInfo("%v", jsonStr) + + return c.send(jsonStr) +} + +func (c *TDDebugConsumer) Flush() error { + tdLogInfo("flush data") + return nil +} + +func (c *TDDebugConsumer) Close() error { + tdLogInfo("debug consumer close") + return nil +} + +func (c *TDDebugConsumer) IsStringent() bool { + return true +} + +func (c *TDDebugConsumer) send(data string) error { + var dryRun = "0" + if !c.writeData { + dryRun = "1" + } + postData := url.Values{"data": {data}, "appid": {c.appId}, "source": {"server"}, "dryRun": {dryRun}} + if len(c.deviceId) > 0 { + postData.Add("deviceId", c.deviceId) + } + resp, err := http.PostForm(c.serverUrl, postData) + if err != nil { + return err + } + + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + body, _ := ioutil.ReadAll(resp.Body) + result := map[string]interface{}{} + err = json.Unmarshal(body, &result) + if err != nil { + return err + } + if uint64(result["errorLevel"].(float64)) != 0 { + msg := fmt.Sprintf("send to receiver failed with return content: %s", string(body)) + tdLogError(msg) + return errors.New(msg) + } else { + tdLogInfo("send success: %v", result) + } + } else { + return errors.New(fmt.Sprintf("Unexpected Status Code: %d", resp.StatusCode)) + } + return nil +} diff --git a/src/server/thinkingdata/consumer_log.go b/src/server/thinkingdata/consumer_log.go new file mode 100644 index 00000000..e39f45b3 --- /dev/null +++ b/src/server/thinkingdata/consumer_log.go @@ -0,0 +1,256 @@ +package thinkingdata + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "sync" + "time" +) + +type RotateMode int32 + +const ( + DefaultChannelSize = 1000 // channel size + ROTATE_DAILY RotateMode = 0 // by the day + ROTATE_HOURLY RotateMode = 1 // by the hour +) + +// TDLogConsumer write data to file, it works with LogBus +type TDLogConsumer struct { + directory string // directory of log file + dateFormat string // name format of log file + fileSize int64 // max size of single log file (MByte) + fileNamePrefix string // prefix of log file + currentFile *os.File // current file handler + wg sync.WaitGroup + ch chan []byte + mutex *sync.RWMutex + sdkClose bool +} + +type TDLogConsumerConfig struct { + Directory string // directory of log file + RotateMode RotateMode // rotate mode of log file + FileSize int // max size of single log file (MByte) + FileNamePrefix string // prefix of log file + ChannelSize int +} + +func NewLogConsumer(directory string, r RotateMode) (TDConsumer, error) { + return NewLogConsumerWithFileSize(directory, r, 0) +} + +// NewLogConsumerWithFileSize init TDLogConsumer +// directory: directory of log file +// r: rotate mode of log file. (in days / hours) +// size: max size of single log file (MByte) +func NewLogConsumerWithFileSize(directory string, r RotateMode, size int) (TDConsumer, error) { + config := TDLogConsumerConfig{ + Directory: directory, + RotateMode: r, + FileSize: size, + } + return NewLogConsumerWithConfig(config) +} + +func NewLogConsumerWithConfig(config TDLogConsumerConfig) (TDConsumer, error) { + var df string + switch config.RotateMode { + case ROTATE_DAILY: + df = "2006-01-02" + case ROTATE_HOURLY: + df = "2006-01-02-15" + default: + errStr := "unknown rotate mode" + tdLogInfo(errStr) + return nil, errors.New(errStr) + } + + chanSize := DefaultChannelSize + if config.ChannelSize > 0 { + chanSize = config.ChannelSize + } + + c := &TDLogConsumer{ + directory: config.Directory, + dateFormat: df, + fileSize: int64(config.FileSize * 1024 * 1024), + fileNamePrefix: config.FileNamePrefix, + wg: sync.WaitGroup{}, + ch: make(chan []byte, chanSize), + mutex: new(sync.RWMutex), + sdkClose: false, + } + + return c, c.init() +} + +func (c *TDLogConsumer) Add(d Data) error { + var err error = nil + c.mutex.Lock() + defer func() { + c.mutex.Unlock() + }() + if c.sdkClose { + err = errors.New("add event failed, SDK has been closed") + tdLogError(err.Error()) + } else { + jsonBytes, jsonErr := json.Marshal(d) + if jsonErr != nil { + err = jsonErr + } else { + c.ch <- jsonBytes + } + } + return err +} + +func (c *TDLogConsumer) Flush() error { + tdLogInfo("flush data") + var err error = nil + c.mutex.Lock() + if c.currentFile != nil { + err = c.currentFile.Sync() + } + c.mutex.Unlock() + return err +} + +func (c *TDLogConsumer) Close() error { + tdLogInfo("log consumer close") + + var err error = nil + c.mutex.Lock() + if c.sdkClose { + err = errors.New("[ThinkingData][error]: SDK has been closed") + } else { + close(c.ch) + c.wg.Wait() + if c.currentFile != nil { + _ = c.currentFile.Sync() + err = c.currentFile.Close() + c.currentFile = nil + } + } + c.sdkClose = true + c.mutex.Unlock() + return err +} + +func (c *TDLogConsumer) IsStringent() bool { + return false +} + +func (c *TDLogConsumer) constructFileName(timeStr string, i int) string { + fileNamePrefix := "" + if len(c.fileNamePrefix) != 0 { + fileNamePrefix = c.fileNamePrefix + "." + } + // is need paging + if c.fileSize > 0 { + return fmt.Sprintf("%s/%slog.%s_%d", c.directory, fileNamePrefix, timeStr, i) + } else { + return fmt.Sprintf("%s/%slog.%s", c.directory, fileNamePrefix, timeStr) + } +} + +func (c *TDLogConsumer) init() error { + fd, err := c.initLogFile() + if err != nil { + tdLogError("init log file failed: %s", err) + return err + } + c.currentFile = fd + + c.wg.Add(1) + + go func() { + defer func() { + c.wg.Done() + }() + for { + select { + case rec, ok := <-c.ch: + if !ok { + return + } + jsonStr := parseTime(rec) + tdLogInfo("write event data: %s", jsonStr) + c.writeToFile(jsonStr) + } + } + }() + + tdLogInfo("Mode: log consumer, log path: " + c.directory) + + return nil +} + +func (c *TDLogConsumer) initLogFile() (*os.File, error) { + _, err := os.Stat(c.directory) + if err != nil && os.IsNotExist(err) { + e := os.MkdirAll(c.directory, os.ModePerm) + if e != nil { + return nil, e + } + } + timeStr := time.Now().Format(c.dateFormat) + return os.OpenFile(c.constructFileName(timeStr, 0), os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0664) +} + +var logFileIndex = 0 + +func (c *TDLogConsumer) writeToFile(str string) { + timeStr := time.Now().Format(c.dateFormat) + // paging by Rotate Mode and current file size + var newName string + fName := c.constructFileName(timeStr, logFileIndex) + + if c.currentFile == nil { + var openFileErr error + c.currentFile, openFileErr = os.OpenFile(fName, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0664) + if openFileErr != nil { + tdLogInfo("open log file failed: %s\n", openFileErr) + return + } + } + + if c.currentFile.Name() != fName { + newName = fName + } else if c.fileSize > 0 { + stat, _ := c.currentFile.Stat() + if stat.Size() > c.fileSize { + logFileIndex++ + newName = c.constructFileName(timeStr, logFileIndex) + } + } + if newName != "" { + err := c.currentFile.Close() + if err != nil { + tdLogInfo("close file failed: %s\n", err) + return + } + c.currentFile, err = os.OpenFile(fName, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0664) + if err != nil { + tdLogInfo("rotate log file failed: %s\n", err) + return + } + } + _, err := fmt.Fprintln(c.currentFile, str) + if err != nil { + tdLogInfo("LoggerWriter(%q): %s\n", c.currentFile.Name(), err) + return + } +} + +// Deprecated: please use TDLogConsumer +type LogConsumer struct { + TDLogConsumer +} + +// Deprecated: please use TDLogConsumerConfig +type LogConfig struct { + TDLogConsumerConfig +} diff --git a/src/server/thinkingdata/td_log.go b/src/server/thinkingdata/td_log.go new file mode 100644 index 00000000..a35ec036 --- /dev/null +++ b/src/server/thinkingdata/td_log.go @@ -0,0 +1,125 @@ +package thinkingdata + +import ( + "fmt" + "time" +) + +// SDK_LOG_PREFIX const +const SDK_LOG_PREFIX = "[ThinkingData]" + +var logInstance TDLogger + +type TDLogLevel int32 + +const ( + TDLogLevelOff TDLogLevel = 1 + TDLogLevelError TDLogLevel = 2 + TDLogLevelWarning TDLogLevel = 3 + TDLogLevelInfo TDLogLevel = 4 + TDLogLevelDebug TDLogLevel = 5 +) + +// default is TDLogLevelOff +var currentLogLevel = TDLogLevelOff + +// TDLogger User-defined log classes must comply with interface +type TDLogger interface { + Print(message string) +} + +// SetLogLevel Set the log output level +func SetLogLevel(level TDLogLevel) { + if level < TDLogLevelOff || level > TDLogLevelDebug { + fmt.Println(SDK_LOG_PREFIX + "log type error") + return + } else { + currentLogLevel = level + } +} + +// SetCustomLogger Set a custom log input class, usually you don't need to set it up. +func SetCustomLogger(logger TDLogger) { + if logger != nil { + logInstance = logger + } +} + +func tdLog(level TDLogLevel, format string, v ...interface{}) { + if level > currentLogLevel { + return + } + + var modeStr string + switch level { + case TDLogLevelError: + modeStr = "[Error] " + break + case TDLogLevelWarning: + modeStr = "[Warning] " + break + case TDLogLevelInfo: + modeStr = "[Info] " + break + case TDLogLevelDebug: + modeStr = "[Debug] " + break + default: + modeStr = "[Info] " + break + } + + if logInstance != nil { + msg := fmt.Sprintf(SDK_LOG_PREFIX+modeStr+format+"\n", v...) + logInstance.Print(msg) + } else { + logTime := fmt.Sprintf("[%v]", time.Now().Format("2006-01-02 15:04:05.000")) + fmt.Printf(logTime+SDK_LOG_PREFIX+modeStr+format+"\n", v...) + } +} + +func tdLogDebug(format string, v ...interface{}) { + tdLog(TDLogLevelDebug, format, v...) +} + +func tdLogInfo(format string, v ...interface{}) { + tdLog(TDLogLevelInfo, format, v...) +} + +func tdLogError(format string, v ...interface{}) { + tdLog(TDLogLevelError, format, v...) +} + +func tdLogWarning(format string, v ...interface{}) { + tdLog(TDLogLevelWarning, format, v...) +} + +// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff) +type LogType int32 + +// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff) +const ( + LoggerTypeOff LogType = 1 << 0 // disable log + LoggerTypePrint LogType = 1 << 1 // print on console + LoggerTypeWriteFile LogType = 1 << 2 // print to file + LoggerTypePrintAndWriteFile = LoggerTypePrint | LoggerTypeWriteFile // print both on console and file +) + +// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff) +type LoggerConfig struct { + Type LogType + Path string +} + +// Deprecated: please use thinkingdata.SetLogLevel(thinkingdata.TDLogLevelOff) +func SetLoggerConfig(config LoggerConfig) { + if config.Type < LoggerTypeOff || config.Type > LoggerTypePrintAndWriteFile { + fmt.Println(SDK_LOG_PREFIX + "log type error") + return + } + if config.Type&LoggerTypeOff == LoggerTypeOff { + currentLogLevel = TDLogLevelOff + } else { + currentLogLevel = TDLogLevelInfo + } +} diff --git a/src/server/thinkingdata/thinkingdata.go b/src/server/thinkingdata/thinkingdata.go new file mode 100644 index 00000000..f6b5ae9d --- /dev/null +++ b/src/server/thinkingdata/thinkingdata.go @@ -0,0 +1,305 @@ +package thinkingdata + +import ( + "errors" + "sync" +) + +const ( + Track = "track" + TrackUpdate = "track_update" + TrackOverwrite = "track_overwrite" + UserSet = "user_set" + UserUnset = "user_unset" + UserSetOnce = "user_setOnce" + UserAdd = "user_add" + UserAppend = "user_append" + UserUniqAppend = "user_uniq_append" + UserDel = "user_del" + + SdkVersion = "2.0.3" + LibName = "Golang" +) + +type Data struct { + IsComplex bool `json:"-"` // properties are nested or not + AccountId string `json:"#account_id,omitempty"` + DistinctId string `json:"#distinct_id,omitempty"` + Type string `json:"#type"` + Time string `json:"#time"` + EventName string `json:"#event_name,omitempty"` + EventId string `json:"#event_id,omitempty"` + FirstCheckId string `json:"#first_check_id,omitempty"` + Ip string `json:"#ip,omitempty"` + UUID string `json:"#uuid,omitempty"` + AppId string `json:"#app_id,omitempty"` + Properties map[string]interface{} `json:"properties"` +} + +// TDConsumer define operation interface +type TDConsumer interface { + Add(d Data) error + Flush() error + Close() error + IsStringent() bool // check data or not. +} + +type TDAnalytics struct { + consumer TDConsumer + superProperties map[string]interface{} + mutex *sync.RWMutex + dynamicSuperProperties func() map[string]interface{} +} + +// New init SDK +func New(c TDConsumer) TDAnalytics { + tdLogInfo("init SDK success") + return TDAnalytics{ + consumer: c, + superProperties: make(map[string]interface{}), + mutex: new(sync.RWMutex), + } +} + +// GetSuperProperties get common properties +func (ta *TDAnalytics) GetSuperProperties() map[string]interface{} { + result := make(map[string]interface{}) + ta.mutex.Lock() + mergeProperties(result, ta.superProperties) + ta.mutex.Unlock() + return result +} + +// SetSuperProperties set common properties +func (ta *TDAnalytics) SetSuperProperties(superProperties map[string]interface{}) { + ta.mutex.Lock() + mergeProperties(ta.superProperties, superProperties) + ta.mutex.Unlock() +} + +// ClearSuperProperties clear common properties +func (ta *TDAnalytics) ClearSuperProperties() { + ta.mutex.Lock() + ta.superProperties = make(map[string]interface{}) + ta.mutex.Unlock() +} + +// SetDynamicSuperProperties set common properties dynamically. +// not recommend to add the operation which with a lot of computation +func (ta *TDAnalytics) SetDynamicSuperProperties(action func() map[string]interface{}) { + ta.mutex.Lock() + ta.dynamicSuperProperties = action + ta.mutex.Unlock() +} + +// GetDynamicSuperProperties dynamic common properties +func (ta *TDAnalytics) GetDynamicSuperProperties() map[string]interface{} { + result := make(map[string]interface{}) + ta.mutex.RLock() + if ta.dynamicSuperProperties != nil { + mergeProperties(result, ta.dynamicSuperProperties()) + } + ta.mutex.RUnlock() + return result +} + +// Track report ordinary event +func (ta *TDAnalytics) Track(accountId, distinctId, eventName string, properties map[string]interface{}) error { + return ta.track(accountId, distinctId, Track, eventName, "", properties) +} + +// TrackFirst report first event +func (ta *TDAnalytics) TrackFirst(accountId, distinctId, eventName, firstCheckId string, properties map[string]interface{}) error { + if len(firstCheckId) == 0 { + msg := "the 'firstCheckId' must be provided" + tdLogInfo(msg) + return errors.New(msg) + } + p := make(map[string]interface{}) + mergeProperties(p, properties) + p["#first_check_id"] = firstCheckId + return ta.track(accountId, distinctId, Track, eventName, "", p) +} + +// TrackUpdate report updatable event +func (ta *TDAnalytics) TrackUpdate(accountId, distinctId, eventName, eventId string, properties map[string]interface{}) error { + return ta.track(accountId, distinctId, TrackUpdate, eventName, eventId, properties) +} + +// TrackOverwrite report overridable event +func (ta *TDAnalytics) TrackOverwrite(accountId, distinctId, eventName, eventId string, properties map[string]interface{}) error { + return ta.track(accountId, distinctId, TrackOverwrite, eventName, eventId, properties) +} + +func (ta *TDAnalytics) track(accountId, distinctId, dataType, eventName, eventId string, properties map[string]interface{}) error { + defer func() { + if r := recover(); r != nil { + tdLogError("%+v\ndata: %+v", r, properties) + } + }() + + if len(eventName) == 0 { + msg := "the event name must be provided" + tdLogError(msg) + return errors.New(msg) + } + + // eventId not be null unless eventType is equal Track. + if len(eventId) == 0 && dataType != Track { + msg := "the event id must be provided" + tdLogError(msg) + return errors.New(msg) + } + + p := ta.GetSuperProperties() + dynamicSuperProperties := ta.GetDynamicSuperProperties() + + mergeProperties(p, dynamicSuperProperties) + // preset properties has the highest priority + p["#lib"] = LibName + p["#lib_version"] = SdkVersion + // custom properties + mergeProperties(p, properties) + + return ta.add(accountId, distinctId, dataType, eventName, eventId, p) +} + +// UserSet set user properties. would overwrite existing names. +func (ta *TDAnalytics) UserSet(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserSet, properties) +} + +// UserUnset clear the user properties of users. +func (ta *TDAnalytics) UserUnset(accountId string, distinctId string, s []string) error { + if len(s) == 0 { + msg := "invalid params for UserUnset: keys is nil" + tdLogInfo(msg) + return errors.New(msg) + } + prop := make(map[string]interface{}) + for _, v := range s { + prop[v] = 0 + } + return ta.user(accountId, distinctId, UserUnset, prop) +} + +func (ta *TDAnalytics) UserUnsetWithProperties(accountId string, distinctId string, properties map[string]interface{}) error { + if len(properties) == 0 { + msg := "invalid params for UserUnset: properties is nil" + tdLogInfo(msg) + return errors.New(msg) + } + return ta.user(accountId, distinctId, UserUnset, properties) +} + +// UserSetOnce set user properties, If such property had been set before, this message would be neglected. +func (ta *TDAnalytics) UserSetOnce(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserSetOnce, properties) +} + +// UserAdd to accumulate operations against the property. +func (ta *TDAnalytics) UserAdd(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserAdd, properties) +} + +// UserAppend to add user properties of array type. +func (ta *TDAnalytics) UserAppend(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserAppend, properties) +} + +// UserUniqAppend append user properties to array type by unique. +func (ta *TDAnalytics) UserUniqAppend(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserUniqAppend, properties) +} + +// UserDelete delete a user, This operation cannot be undone. +func (ta *TDAnalytics) UserDelete(accountId string, distinctId string) error { + return ta.user(accountId, distinctId, UserDel, nil) +} + +// UserDeleteWithProperties delete a user, This operation cannot be undone. +func (ta *TDAnalytics) UserDeleteWithProperties(accountId string, distinctId string, properties map[string]interface{}) error { + return ta.user(accountId, distinctId, UserDel, properties) +} + +func (ta *TDAnalytics) user(accountId, distinctId, dataType string, properties map[string]interface{}) error { + defer func() { + if r := recover(); r != nil { + tdLogError("%+v\ndata: %+v", r, properties) + } + }() + if properties == nil && dataType != UserDel { + msg := "invalid params for " + dataType + ": properties is nil" + tdLogError(msg) + return errors.New(msg) + } + p := make(map[string]interface{}) + mergeProperties(p, properties) + return ta.add(accountId, distinctId, dataType, "", "", p) +} + +// Flush report data immediately. +func (ta *TDAnalytics) Flush() error { + return ta.consumer.Flush() +} + +// Close and exit sdk +func (ta *TDAnalytics) Close() error { + err := ta.consumer.Close() + tdLogInfo("SDK close") + return err +} + +func (ta *TDAnalytics) add(accountId, distinctId, dataType, eventName, eventId string, properties map[string]interface{}) error { + if len(accountId) == 0 && len(distinctId) == 0 { + msg := "invalid parameters: account_id and distinct_id cannot be empty at the same time" + tdLogError(msg) + return errors.New(msg) + } + + // get "#ip" value in properties, empty string will be return when not found. + ip := extractStringProperty(properties, "#ip") + + // get "#app_id" value in properties, empty string will be return when not found. + appId := extractStringProperty(properties, "#app_id") + + // get "#time" value in properties, empty string will be return when not found. + eventTime := extractTime(properties) + + firstCheckId := extractStringProperty(properties, "#first_check_id") + + // get "#uuid" value in properties, empty string will be return when not found. + uuid := extractStringProperty(properties, "#uuid") + if len(uuid) == 0 { + uuid = generateUUID() + } + + data := Data{ + AccountId: accountId, + DistinctId: distinctId, + Type: dataType, + Time: eventTime, + EventName: eventName, + EventId: eventId, + FirstCheckId: firstCheckId, + Ip: ip, + UUID: uuid, + Properties: properties, + } + + if len(appId) > 0 { + data.AppId = appId + } + + err := formatProperties(&data, ta) + if err != nil { + return err + } + + return ta.consumer.Add(data) +} + +// Deprecated: please use TDConsumer +type Consumer interface { + TDConsumer +} diff --git a/src/server/thinkingdata/utils.go b/src/server/thinkingdata/utils.go new file mode 100644 index 00000000..3ac20f07 --- /dev/null +++ b/src/server/thinkingdata/utils.go @@ -0,0 +1,143 @@ +package thinkingdata + +import ( + "errors" + "fmt" + "github.com/google/uuid" + "os" + "reflect" + "regexp" + "time" +) + +const ( + DATE_FORMAT = "2006-01-02 15:04:05.000" + KEY_PATTERN = "^[a-zA-Z#][A-Za-z0-9_]{0,49}$" +) + +// A string of 50 letters and digits that starts with '#' or a letter +var keyPattern, _ = regexp.Compile(KEY_PATTERN) + +func mergeProperties(target, source map[string]interface{}) { + for k, v := range source { + target[k] = v + } +} + +func extractTime(p map[string]interface{}) string { + if t, ok := p["#time"]; ok { + delete(p, "#time") + switch v := t.(type) { + case string: + return v + case time.Time: + return v.Format(DATE_FORMAT) + default: + return time.Now().Format(DATE_FORMAT) + } + } + + return time.Now().Format(DATE_FORMAT) +} + +func extractStringProperty(p map[string]interface{}, key string) string { + if t, ok := p[key]; ok { + delete(p, key) + v, ok := t.(string) + if !ok { + fmt.Fprintln(os.Stderr, "Invalid data type for "+key) + } + return v + } + return "" +} + +func isNotNumber(v interface{}) bool { + switch v.(type) { + case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: + case float32, float64: + default: + return true + } + return false +} + +func formatProperties(d *Data, ta *TDAnalytics) error { + + if d.EventName != "" { + matched := checkPattern([]byte(d.EventName)) + if !matched { + msg := "invalid event name: " + d.EventName + tdLogInfo(msg) + return errors.New(msg) + } + } + + if d.Properties != nil { + for k, v := range d.Properties { + if ta.consumer.IsStringent() { + isMatch := checkPattern([]byte(k)) + if !isMatch { + msg := "invalid property key: " + k + tdLogInfo(msg) + return errors.New(msg) + } + } + + if d.Type == UserAdd && isNotNumber(v) { + msg := "invalid property value: only numbers is supported by UserAdd" + tdLogInfo(msg) + return errors.New(msg) + } + + // check value + switch v.(type) { + case int: + case bool: + case float64: + case string: + case time.Time: + d.Properties[k] = v.(time.Time).Format(DATE_FORMAT) + case []string: + d.IsComplex = true + default: + d.IsComplex = true + } + } + } + + return nil +} + +func isNotArrayOrSlice(v interface{}) bool { + typeOf := reflect.TypeOf(v) + switch typeOf.Kind() { + case reflect.Array: + case reflect.Slice: + default: + return true + } + return false +} + +func checkPattern(name []byte) bool { + return keyPattern.Match(name) +} + +func parseTime(input []byte) string { + var re = regexp.MustCompile(`"((\d{4}-\d{2}-\d{2})T(\d{2}:\d{2}:\d{2})(?:\.(\d{3}))\d*)(Z|[\+-]\d{2}:\d{2})"`) + var substitution = "\"$2 $3.$4\"" + + for re.Match(input) { + input = re.ReplaceAll(input, []byte(substitution)) + } + return string(input) +} + +func generateUUID() string { + newUUID, err := uuid.NewUUID() + if err != nil { + return "" + } + return newUUID.String() +}