1144 lines
27 KiB
Go
1144 lines
27 KiB
Go
package util
|
||
|
||
import (
|
||
"backend/Type"
|
||
"bytes"
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"sort"
|
||
"time"
|
||
|
||
"github.com/elastic/go-elasticsearch/v8"
|
||
)
|
||
|
||
func GetEsClient() (*elasticsearch.Client, error) {
|
||
cfg := elasticsearch.Config{
|
||
Addresses: []string{
|
||
"http://kibana.bywaystudios.com:9200",
|
||
},
|
||
Username: "elastic",
|
||
Password: "bywaystudios",
|
||
}
|
||
|
||
client, err := elasticsearch.NewClient(cfg)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("创建 ES 客户端失败: %w", err)
|
||
}
|
||
|
||
// 测试连接
|
||
res, err := client.Info()
|
||
if err != nil {
|
||
return nil, fmt.Errorf("ES 连接测试失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
return nil, fmt.Errorf("ES 返回错误: %s", res.String())
|
||
}
|
||
|
||
return client, nil
|
||
}
|
||
|
||
// DSlSearch 执行 DSL 查询
|
||
// query 参数为查询 DSL 的 map 结构
|
||
// sort 参数为排序条件,传 nil 表示不排序
|
||
func DSlSearch(ctx context.Context, index string, query map[string]any, from, size int, sort []map[string]any) (map[string]any, error) {
|
||
client, err := GetEsClient()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 构建完整查询
|
||
fullQuery := map[string]any{
|
||
"query": query,
|
||
"from": from,
|
||
"size": size,
|
||
}
|
||
|
||
// 添加排序(如果提供)
|
||
if len(sort) > 0 {
|
||
fullQuery["sort"] = sort
|
||
}
|
||
|
||
var buf bytes.Buffer
|
||
if err := json.NewEncoder(&buf).Encode(fullQuery); err != nil {
|
||
return nil, fmt.Errorf("编码查询失败: %w", err)
|
||
}
|
||
res, err := client.Search(
|
||
client.Search.WithContext(ctx),
|
||
client.Search.WithIndex(index),
|
||
client.Search.WithBody(&buf),
|
||
client.Search.WithTrackTotalHits(true),
|
||
)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("执行搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return nil, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
|
||
}
|
||
|
||
var result map[string]any
|
||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||
return nil, fmt.Errorf("解析响应失败: %w", err)
|
||
}
|
||
|
||
return result, nil
|
||
}
|
||
|
||
// SimpleSearch 简单搜索(match 查询)
|
||
func SimpleSearch(ctx context.Context, index, field, value string, from, size int) (map[string]any, error) {
|
||
query := map[string]any{
|
||
"match": map[string]any{
|
||
field: value,
|
||
},
|
||
}
|
||
return DSlSearch(ctx, index, query, from, size, nil)
|
||
}
|
||
|
||
// TermSearch 精确匹配查询
|
||
func TermSearch(ctx context.Context, index, field, value string, from, size int) (map[string]any, error) {
|
||
query := map[string]any{
|
||
"term": map[string]any{
|
||
field: value,
|
||
},
|
||
}
|
||
return DSlSearch(ctx, index, query, from, size, nil)
|
||
}
|
||
|
||
// StatisticsOverview is the result of CountStatisticsOverview. Each metric
// comes as a pair: the value for today and the value for the whole
// ("total") window.
type StatisticsOverview struct {
	// Number of "register" events: today / total window.
	Register, TotalRegister int64
	// Sum of payment amounts over "pay" events: today / total window.
	Recharge, TotalRecharge float64
	// Distinct users with a "pay" event: today / total window.
	RechargeUser, TotalRechargeUser int64
}
|
||
|
||
// StatisticsHeatTrend is the result of CountStatisticsHeat7Days. The three
// slices are parallel: index i of each one describes the same day.
type StatisticsHeatTrend struct {
	// Day labels, in chronological order.
	Key []string
	// Per day: distinct users with a login event, then number of "register" events.
	DailyActive, Register []int64
}
|
||
|
||
func CountStatisticsOverview(appId int, appOpenTime int64, tz string) (*StatisticsOverview, error) {
|
||
client, err := GetEsClient()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
todayStart, err := GetZeroTimestamp(tz, 0)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
now := time.Now().Unix()
|
||
if appOpenTime <= 0 {
|
||
appOpenTime = todayStart
|
||
}
|
||
|
||
region := GetAppRegion(appId)
|
||
query := map[string]any{
|
||
"size": 0,
|
||
"query": map[string]any{
|
||
"bool": map[string]any{
|
||
"must": []map[string]any{
|
||
{
|
||
"term": map[string]any{
|
||
"fields.environment": region,
|
||
},
|
||
},
|
||
{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": appOpenTime,
|
||
"lte": now,
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]any{
|
||
"scopes": map[string]any{
|
||
"filters": map[string]any{
|
||
"filters": map[string]any{
|
||
"today": map[string]any{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": todayStart,
|
||
"lte": now,
|
||
},
|
||
},
|
||
},
|
||
"total": map[string]any{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": appOpenTime,
|
||
"lte": now,
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]any{
|
||
"register": map[string]any{
|
||
"filter": map[string]any{
|
||
"term": map[string]any{
|
||
"game.#event_name": "register",
|
||
},
|
||
},
|
||
},
|
||
"login": map[string]any{
|
||
"filter": map[string]any{
|
||
"bool": map[string]any{
|
||
"minimum_should_match": 1,
|
||
"should": []map[string]any{
|
||
{
|
||
"term": map[string]any{
|
||
"game.#event_name": "Login_log",
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]any{
|
||
"game.#event_name": "login",
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]any{
|
||
"game.#event_name": "Login",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]any{
|
||
"uid_count": map[string]any{
|
||
"cardinality": map[string]any{
|
||
"field": "game.#distinct_id.keyword",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"pay": map[string]any{
|
||
"filter": map[string]any{
|
||
"term": map[string]any{
|
||
"game.#event_name": "pay",
|
||
},
|
||
},
|
||
"aggs": map[string]any{
|
||
"uid_count": map[string]any{
|
||
"cardinality": map[string]any{
|
||
"field": "game.#distinct_id.keyword",
|
||
},
|
||
},
|
||
"amount_sum": map[string]any{
|
||
"sum": map[string]any{
|
||
"script": map[string]any{
|
||
"source": "double v = 0; if (doc.containsKey('game.properties.price') && !doc['game.properties.price'].empty) { v = doc['game.properties.price'].value; } else if (doc.containsKey('game.properties.Price') && !doc['game.properties.Price'].empty) { v = doc['game.properties.Price'].value; } else if (doc.containsKey('game.properties.amount') && !doc['game.properties.amount'].empty) { v = doc['game.properties.amount'].value; } else if (doc.containsKey('game.properties.pay_amount') && !doc['game.properties.pay_amount'].empty) { v = doc['game.properties.pay_amount'].value; } return v;",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
var buf bytes.Buffer
|
||
if err := json.NewEncoder(&buf).Encode(query); err != nil {
|
||
return nil, fmt.Errorf("编码查询失败: %w", err)
|
||
}
|
||
|
||
res, err := client.Search(
|
||
client.Search.WithContext(context.Background()),
|
||
client.Search.WithIndex("game-user-log*"),
|
||
client.Search.WithBody(&buf),
|
||
)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("执行搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return nil, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
|
||
}
|
||
|
||
var result map[string]any
|
||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||
return nil, fmt.Errorf("解析响应失败: %w", err)
|
||
}
|
||
|
||
overview := &StatisticsOverview{}
|
||
aggs, ok := result["aggregations"].(map[string]any)
|
||
if !ok {
|
||
return overview, nil
|
||
}
|
||
|
||
scopes, ok := aggs["scopes"].(map[string]any)
|
||
if !ok {
|
||
return overview, nil
|
||
}
|
||
|
||
buckets, ok := scopes["buckets"].(map[string]any)
|
||
if !ok {
|
||
return overview, nil
|
||
}
|
||
|
||
parseScope := func(scopeName string) (int64, float64, int64, int64) {
|
||
scope, ok := buckets[scopeName].(map[string]any)
|
||
if !ok {
|
||
return 0, 0, 0, 0
|
||
}
|
||
|
||
registerCount := int64(0)
|
||
if registerAgg, ok := scope["register"].(map[string]any); ok {
|
||
if docCount, ok := registerAgg["doc_count"].(float64); ok {
|
||
registerCount = int64(docCount)
|
||
}
|
||
}
|
||
|
||
loginUser := int64(0)
|
||
if loginAgg, ok := scope["login"].(map[string]any); ok {
|
||
if uidCount, ok := loginAgg["uid_count"].(map[string]any); ok {
|
||
if value, ok := uidCount["value"].(float64); ok {
|
||
loginUser = int64(value)
|
||
}
|
||
}
|
||
}
|
||
|
||
payAmount := float64(0)
|
||
payUser := int64(0)
|
||
if payAgg, ok := scope["pay"].(map[string]any); ok {
|
||
if amountSum, ok := payAgg["amount_sum"].(map[string]any); ok {
|
||
if value, ok := amountSum["value"].(float64); ok {
|
||
payAmount = value
|
||
}
|
||
}
|
||
if uidCount, ok := payAgg["uid_count"].(map[string]any); ok {
|
||
if value, ok := uidCount["value"].(float64); ok {
|
||
payUser = int64(value)
|
||
}
|
||
}
|
||
}
|
||
|
||
return registerCount, payAmount, payUser, loginUser
|
||
}
|
||
|
||
overview.Register, overview.Recharge, overview.RechargeUser, _ = parseScope("today")
|
||
overview.TotalRegister, overview.TotalRecharge, overview.TotalRechargeUser, _ = parseScope("total")
|
||
|
||
return overview, nil
|
||
}
|
||
|
||
func CountStatisticsHeat7Days(appId int, tz string) (*StatisticsHeatTrend, error) {
|
||
client, err := GetEsClient()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
start, err := GetZeroTimestamp(tz, -6)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
now := time.Now().Unix()
|
||
region := GetAppRegion(appId)
|
||
|
||
dayKeys := make([]string, 0, 7)
|
||
dayLabels := make([]string, 0, 7)
|
||
dayFilters := make(map[string]any, 7)
|
||
for i := -6; i <= 0; i++ {
|
||
dayKey, err := GetDateStr(tz, i)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
dayStart, err := GetZeroTimestamp(tz, i)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
dayEnd := now
|
||
if i < 0 {
|
||
nextDayStart, err := GetZeroTimestamp(tz, i+1)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
dayEnd = nextDayStart - 1
|
||
}
|
||
|
||
dayKeys = append(dayKeys, dayKey)
|
||
dayLabels = append(dayLabels, dayKey[5:])
|
||
dayFilters[dayKey] = map[string]any{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": dayStart,
|
||
"lte": dayEnd,
|
||
},
|
||
},
|
||
}
|
||
}
|
||
|
||
query := map[string]any{
|
||
"size": 0,
|
||
"query": map[string]any{
|
||
"bool": map[string]any{
|
||
"must": []map[string]any{
|
||
{
|
||
"term": map[string]any{
|
||
"fields.environment": region,
|
||
},
|
||
},
|
||
{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": start,
|
||
"lte": now,
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]any{
|
||
"days": map[string]any{
|
||
"filters": map[string]any{
|
||
"filters": dayFilters,
|
||
},
|
||
"aggs": map[string]any{
|
||
"register": map[string]any{
|
||
"filter": map[string]any{
|
||
"term": map[string]any{
|
||
"game.#event_name": "register",
|
||
},
|
||
},
|
||
},
|
||
"login": map[string]any{
|
||
"filter": map[string]any{
|
||
"bool": map[string]any{
|
||
"minimum_should_match": 1,
|
||
"should": []map[string]any{
|
||
{
|
||
"term": map[string]any{
|
||
"game.#event_name": "Login_log",
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]any{
|
||
"game.#event_name": "login_log",
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]any{
|
||
"game.#event_name": "login",
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]any{
|
||
"game.#event_name": "Login",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]any{
|
||
"uid_count": map[string]any{
|
||
"cardinality": map[string]any{
|
||
"field": "game.#distinct_id.keyword",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
var buf bytes.Buffer
|
||
if err := json.NewEncoder(&buf).Encode(query); err != nil {
|
||
return nil, fmt.Errorf("编码查询失败: %w", err)
|
||
}
|
||
|
||
res, err := client.Search(
|
||
client.Search.WithContext(context.Background()),
|
||
client.Search.WithIndex("game-user-log*"),
|
||
client.Search.WithBody(&buf),
|
||
)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("执行搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return nil, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
|
||
}
|
||
|
||
var result map[string]any
|
||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||
return nil, fmt.Errorf("解析响应失败: %w", err)
|
||
}
|
||
|
||
registerByDay := make(map[string]int64)
|
||
activeByDay := make(map[string]int64)
|
||
|
||
aggs, _ := result["aggregations"].(map[string]any)
|
||
daysAgg, _ := aggs["days"].(map[string]any)
|
||
buckets, _ := daysAgg["buckets"].(map[string]any)
|
||
for day, b := range buckets {
|
||
bucket, ok := b.(map[string]any)
|
||
if !ok {
|
||
continue
|
||
}
|
||
if registerAgg, ok := bucket["register"].(map[string]any); ok {
|
||
if docCount, ok := registerAgg["doc_count"].(float64); ok {
|
||
registerByDay[day] = int64(docCount)
|
||
}
|
||
}
|
||
if loginAgg, ok := bucket["login"].(map[string]any); ok {
|
||
if uidCount, ok := loginAgg["uid_count"].(map[string]any); ok {
|
||
if value, ok := uidCount["value"].(float64); ok {
|
||
activeByDay[day] = int64(value)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
trend := &StatisticsHeatTrend{
|
||
Key: make([]string, 0, 7),
|
||
DailyActive: make([]int64, 0, 7),
|
||
Register: make([]int64, 0, 7),
|
||
}
|
||
for idx, day := range dayKeys {
|
||
trend.Key = append(trend.Key, dayLabels[idx])
|
||
trend.DailyActive = append(trend.DailyActive, activeByDay[day])
|
||
trend.Register = append(trend.Register, registerByDay[day])
|
||
}
|
||
|
||
return trend, nil
|
||
}
|
||
|
||
func SearchAssetByUid(app, _uid int, from, size int, start, end int64, itemid int, param string) ([]*Type.AssetData, int64, error) {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Printf("SearchAssetByUid panic: %v", r)
|
||
}
|
||
}()
|
||
uid := Int(_uid)
|
||
ctx := context.Background()
|
||
region := GetAppRegion(app)
|
||
mustCondition := []map[string]any{
|
||
{
|
||
"term": map[string]any{
|
||
"game.#distinct_id.keyword": uid,
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]any{
|
||
"fields.environment": region,
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]any{
|
||
"game.#event_name": "asset_change",
|
||
},
|
||
},
|
||
{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": start,
|
||
"lte": end,
|
||
},
|
||
},
|
||
},
|
||
}
|
||
if itemid != 0 {
|
||
mustCondition = append(mustCondition, map[string]any{
|
||
"term": map[string]any{
|
||
"game.properties.item_id": itemid,
|
||
},
|
||
})
|
||
}
|
||
if param != "" {
|
||
mustCondition = append(mustCondition, map[string]any{
|
||
"match": map[string]any{
|
||
"game.properties.change_type": param,
|
||
},
|
||
})
|
||
}
|
||
query := map[string]any{
|
||
"bool": map[string]any{
|
||
"must": mustCondition,
|
||
},
|
||
}
|
||
|
||
// 添加降序排序
|
||
sort := []map[string]any{
|
||
{
|
||
"game.#timestamp": map[string]any{
|
||
"order": "desc",
|
||
},
|
||
},
|
||
}
|
||
|
||
result, err := DSlSearch(ctx, "game-user-log*", query, from, size, sort)
|
||
if err != nil {
|
||
return nil, 0, err
|
||
}
|
||
|
||
// 获取总数
|
||
var total int64
|
||
hits := result["hits"].(map[string]any)
|
||
if totalObj, ok := hits["total"].(map[string]any); ok {
|
||
if value, ok := totalObj["value"].(float64); ok {
|
||
total = int64(value)
|
||
}
|
||
}
|
||
|
||
var assets []*Type.AssetData
|
||
for _, hit := range hits["hits"].([]any) {
|
||
source := hit.(map[string]any)["_source"].(map[string]any)
|
||
game := source["game"].(map[string]any)
|
||
properties := game["properties"].(map[string]any)
|
||
hitData := Type.AssetData{
|
||
ChangeNum: int(properties["change_num"].(float64)),
|
||
ChangeAfter: int(properties["change_after"].(float64)),
|
||
ChangeReason: properties["change_reason"].(string),
|
||
ChangeType: properties["change_type"].(string),
|
||
Timestamp: int64(game["#timestamp"].(float64)),
|
||
ItemId: int(properties["item_id"].(float64)),
|
||
}
|
||
assets = append(assets, &hitData)
|
||
}
|
||
return assets, total, nil
|
||
}
|
||
|
||
func SearchEventByUid(app, _uid int, from, size int, start, end int64, event_name string) ([]*Type.EventData, int64, error) {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Println("Recovered in SearchEventByUid", r)
|
||
}
|
||
}()
|
||
uid := Int(_uid)
|
||
ctx := context.Background()
|
||
region := GetAppRegion(app)
|
||
mustConditions := []map[string]any{
|
||
{
|
||
"term": map[string]any{
|
||
"game.#distinct_id.keyword": uid,
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]any{
|
||
"fields.environment": region,
|
||
},
|
||
},
|
||
{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": start,
|
||
"lte": end,
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
if event_name != "" {
|
||
mustConditions = append(mustConditions, map[string]any{
|
||
"term": map[string]any{
|
||
"game.#event_name": event_name,
|
||
},
|
||
})
|
||
}
|
||
|
||
query := map[string]any{
|
||
"bool": map[string]any{
|
||
"must": mustConditions,
|
||
"must_not": []map[string]any{
|
||
{
|
||
"term": map[string]any{
|
||
"game.#event_name": "asset_change",
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]any{
|
||
"game.#event_name": "func_exec_time",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
// 添加降序排序
|
||
sort := []map[string]any{
|
||
{
|
||
"game.#timestamp": map[string]any{
|
||
"order": "desc",
|
||
},
|
||
},
|
||
}
|
||
result, err := DSlSearch(ctx, "game-user-log*", query, from, size, sort)
|
||
if err != nil {
|
||
return nil, 0, err
|
||
}
|
||
|
||
// 获取总数
|
||
var total int64
|
||
hits := result["hits"].(map[string]any)
|
||
if totalObj, ok := hits["total"].(map[string]any); ok {
|
||
if value, ok := totalObj["value"].(float64); ok {
|
||
total = int64(value)
|
||
}
|
||
}
|
||
|
||
var assets []*Type.EventData
|
||
for _, hit := range hits["hits"].([]any) {
|
||
source := hit.(map[string]any)["_source"].(map[string]any)
|
||
game := source["game"].(map[string]any)
|
||
//properites := game["properties"].(map[string]any)
|
||
properties_summary := game["properties_summary"].(string)
|
||
hitData := Type.EventData{
|
||
Event: game["#event_name"].(string),
|
||
Label: game["#event_name"].(string),
|
||
Param: properties_summary,
|
||
Timestamp: int(game["#timestamp"].(float64)),
|
||
}
|
||
assets = append(assets, &hitData)
|
||
}
|
||
return assets, total, nil
|
||
}
|
||
|
||
func SearchLoginCountByUidMonth(app, _uid int, month string) ([]*Type.LoginDailyCount, int64, error) {
|
||
uid := Int(_uid)
|
||
if uid <= 0 {
|
||
return nil, 0, fmt.Errorf("invalid uid")
|
||
}
|
||
if month == "" {
|
||
return nil, 0, fmt.Errorf("month is required")
|
||
}
|
||
|
||
appConfig, err := GetAppConfig(app)
|
||
if err != nil {
|
||
return nil, 0, err
|
||
}
|
||
|
||
loc := time.UTC
|
||
if appConfig.Tz != "" {
|
||
loc, err = time.LoadLocation(appConfig.Tz)
|
||
if err != nil {
|
||
return nil, 0, fmt.Errorf("invalid app timezone: %w", err)
|
||
}
|
||
}
|
||
|
||
monthTime, err := time.ParseInLocation("2006-01", month, loc)
|
||
if err != nil {
|
||
return nil, 0, fmt.Errorf("invalid month format, expected YYYY-MM: %w", err)
|
||
}
|
||
|
||
monthStart := time.Date(monthTime.Year(), monthTime.Month(), 1, 0, 0, 0, 0, loc)
|
||
monthEnd := monthStart.AddDate(0, 1, 0).Add(-time.Second)
|
||
region := GetAppRegion(app)
|
||
|
||
dailyFilters := make(map[string]any)
|
||
for day := monthStart; !day.After(monthEnd); day = day.AddDate(0, 0, 1) {
|
||
dayStart := time.Date(day.Year(), day.Month(), day.Day(), 0, 0, 0, 0, loc)
|
||
dayEnd := dayStart.AddDate(0, 0, 1).Add(-time.Second)
|
||
dailyFilters[dayStart.Format("2006-01-02")] = map[string]any{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": dayStart.Unix(),
|
||
"lte": dayEnd.Unix(),
|
||
},
|
||
},
|
||
}
|
||
}
|
||
|
||
query := map[string]any{
|
||
"bool": map[string]any{
|
||
"must": []map[string]any{
|
||
{
|
||
"term": map[string]any{
|
||
"game.#distinct_id.keyword": uid,
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]any{
|
||
"fields.environment": region,
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]any{
|
||
"game.#event_name.keyword": "Login_log",
|
||
},
|
||
},
|
||
{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": monthStart.Unix(),
|
||
"lte": monthEnd.Unix(),
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
client, err := GetEsClient()
|
||
if err != nil {
|
||
return nil, 0, err
|
||
}
|
||
|
||
fullQuery := map[string]any{
|
||
"query": query,
|
||
"size": 0,
|
||
"aggs": map[string]any{
|
||
"daily_login": map[string]any{
|
||
"filters": map[string]any{
|
||
"filters": dailyFilters,
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
var buf bytes.Buffer
|
||
if err := json.NewEncoder(&buf).Encode(fullQuery); err != nil {
|
||
return nil, 0, fmt.Errorf("编码查询失败: %w", err)
|
||
}
|
||
|
||
res, err := client.Search(
|
||
client.Search.WithContext(context.Background()),
|
||
client.Search.WithIndex("game-user-log*"),
|
||
client.Search.WithBody(&buf),
|
||
client.Search.WithTrackTotalHits(true),
|
||
)
|
||
if err != nil {
|
||
return nil, 0, fmt.Errorf("执行搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return nil, 0, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
|
||
}
|
||
|
||
var result map[string]any
|
||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||
return nil, 0, fmt.Errorf("解析响应失败: %w", err)
|
||
}
|
||
|
||
var total int64
|
||
if hits, ok := result["hits"].(map[string]any); ok {
|
||
if totalObj, ok := hits["total"].(map[string]any); ok {
|
||
if value, ok := totalObj["value"].(float64); ok {
|
||
total = int64(value)
|
||
}
|
||
}
|
||
}
|
||
|
||
counts := make(map[string]int64, len(dailyFilters))
|
||
if aggregations, ok := result["aggregations"].(map[string]any); ok {
|
||
if dailyLogin, ok := aggregations["daily_login"].(map[string]any); ok {
|
||
if buckets, ok := dailyLogin["buckets"].(map[string]any); ok {
|
||
for day, bucket := range buckets {
|
||
bucketMap, ok := bucket.(map[string]any)
|
||
if !ok {
|
||
continue
|
||
}
|
||
if docCount, ok := bucketMap["doc_count"].(float64); ok {
|
||
counts[day] = int64(docCount)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
dates := make([]string, 0, len(dailyFilters))
|
||
for day := range dailyFilters {
|
||
dates = append(dates, day)
|
||
}
|
||
sort.Strings(dates)
|
||
|
||
data := make([]*Type.LoginDailyCount, 0, len(dates))
|
||
for _, day := range dates {
|
||
data = append(data, &Type.LoginDailyCount{
|
||
Date: day,
|
||
Count: counts[day],
|
||
})
|
||
}
|
||
|
||
return data, total, nil
|
||
}
|
||
|
||
// CountDistinctUidLastHour 查询一个小时前的 game-user-log 索引中 game.#distinct_id 的去重个数
|
||
// 返回值:(当前小时的去重用户数, 一天前同一小时的去重用户数, error)
|
||
func CountDistinctUidLastHour() (int64, int64, error) {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Println("Recovered in CountDistinctUidLastHour", r)
|
||
}
|
||
}()
|
||
|
||
ctx := context.Background()
|
||
client, err := GetEsClient()
|
||
if err != nil {
|
||
return 0, 0, err
|
||
}
|
||
|
||
now := time.Now().Unix() - 600
|
||
// 当前小时:一个小时前至今
|
||
oneHourAgo := (now - 3600)
|
||
nowMs := now
|
||
|
||
// 一天前同一小时:25小时前至24小时前
|
||
oneDayOneHourAgo := (now - 25*3600)
|
||
oneDayAgo := (now - 24*3600)
|
||
|
||
// 构建查询:使用过滤器聚合分别统计两个时间段
|
||
query := map[string]any{
|
||
"query": map[string]any{
|
||
"bool": map[string]any{
|
||
"must": []map[string]any{
|
||
{
|
||
"term": map[string]any{
|
||
"fields.region.keyword": "us-newyork",
|
||
},
|
||
},
|
||
},
|
||
"should": []map[string]any{
|
||
{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": oneHourAgo,
|
||
"lte": nowMs,
|
||
},
|
||
},
|
||
},
|
||
{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": oneDayOneHourAgo,
|
||
"lte": oneDayAgo,
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"minimum_should_match": 1,
|
||
},
|
||
},
|
||
"from": 0,
|
||
"aggs": map[string]any{
|
||
"current_hour": map[string]any{
|
||
"filter": map[string]any{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": oneHourAgo,
|
||
"lte": nowMs,
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]any{
|
||
"unique_users": map[string]any{
|
||
"cardinality": map[string]any{
|
||
"field": "game.#distinct_id.keyword",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"yesterday_hour": map[string]any{
|
||
"filter": map[string]any{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": oneDayOneHourAgo,
|
||
"lte": oneDayAgo,
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]any{
|
||
"unique_users": map[string]any{
|
||
"cardinality": map[string]any{
|
||
"field": "game.#distinct_id.keyword",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
var buf bytes.Buffer
|
||
if err := json.NewEncoder(&buf).Encode(query); err != nil {
|
||
return 0, 0, fmt.Errorf("编码查询失败: %w", err)
|
||
}
|
||
res, err := client.Search(
|
||
client.Search.WithContext(ctx),
|
||
client.Search.WithIndex("game-user-log*"),
|
||
client.Search.WithTrackTotalHits(true),
|
||
client.Search.WithBody(&buf),
|
||
)
|
||
if err != nil {
|
||
return 0, 0, fmt.Errorf("执行搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return 0, 0, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
|
||
}
|
||
|
||
var result map[string]any
|
||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||
return 0, 0, fmt.Errorf("解析响应失败: %w", err)
|
||
}
|
||
|
||
// 从聚合结果中提取去重计数
|
||
var currentCount, yesterdayCount int64
|
||
|
||
if aggregations, ok := result["aggregations"].(map[string]any); ok {
|
||
// 提取当前小时的数据
|
||
if currentHour, ok := aggregations["current_hour"].(map[string]any); ok {
|
||
if uniqueUsers, ok := currentHour["unique_users"].(map[string]any); ok {
|
||
if value, ok := uniqueUsers["value"].(float64); ok {
|
||
currentCount = int64(value)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 提取昨天同一小时的数据
|
||
if yesterdayHour, ok := aggregations["yesterday_hour"].(map[string]any); ok {
|
||
if uniqueUsers, ok := yesterdayHour["unique_users"].(map[string]any); ok {
|
||
if value, ok := uniqueUsers["value"].(float64); ok {
|
||
yesterdayCount = int64(value)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return currentCount, yesterdayCount, nil
|
||
}
|
||
|
||
func CountDistinctUidLastHourTest() any {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Println("Recovered in CountDistinctUidLastHour", r)
|
||
}
|
||
}()
|
||
|
||
ctx := context.Background()
|
||
client, err := GetEsClient()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
now := time.Now().Unix()
|
||
// 当前小时:一个小时前至今
|
||
oneHourAgo := (now - 3600)
|
||
nowMs := now
|
||
|
||
//一天前同一小时:25小时前至24小时前
|
||
oneDayOneHourAgo := (now - 25*3600)
|
||
oneDayAgo := (now - 24*3600)
|
||
|
||
// 构建查询:使用过滤器聚合分别统计两个时间段
|
||
query := map[string]any{
|
||
"query": map[string]any{
|
||
"bool": map[string]any{
|
||
"must": []map[string]any{
|
||
{
|
||
"term": map[string]any{
|
||
"fields.region.keyword": "us-newyork",
|
||
},
|
||
},
|
||
},
|
||
"should": []map[string]any{
|
||
{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": oneHourAgo,
|
||
"lte": nowMs,
|
||
},
|
||
},
|
||
},
|
||
{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": oneDayOneHourAgo,
|
||
"lte": oneDayAgo,
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"minimum_should_match": 1,
|
||
},
|
||
},
|
||
"size": 0,
|
||
"from": 0,
|
||
"aggs": map[string]any{
|
||
"current_hour": map[string]any{
|
||
"filter": map[string]any{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": oneHourAgo,
|
||
"lte": nowMs,
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]any{
|
||
"unique_users": map[string]any{
|
||
"cardinality": map[string]any{
|
||
"field": "game.#distinct_id.keyword",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"yesterday_hour": map[string]any{
|
||
"filter": map[string]any{
|
||
"range": map[string]any{
|
||
"game.#timestamp": map[string]any{
|
||
"gte": oneDayOneHourAgo,
|
||
"lte": oneDayAgo,
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]any{
|
||
"unique_users": map[string]any{
|
||
"cardinality": map[string]any{
|
||
"field": "game.#distinct_id.keyword",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
var buf bytes.Buffer
|
||
if err := json.NewEncoder(&buf).Encode(query); err != nil {
|
||
return fmt.Errorf("编码查询失败: %w", err)
|
||
}
|
||
|
||
res, err := client.Search(
|
||
client.Search.WithContext(ctx),
|
||
client.Search.WithIndex(".ds-game-user-log*"),
|
||
client.Search.WithBody(&buf),
|
||
client.Search.WithTrackTotalHits(true),
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("执行搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
|
||
}
|
||
|
||
var result map[string]any
|
||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||
return fmt.Errorf("解析响应失败: %w", err)
|
||
}
|
||
return result
|
||
}
|