admin_backend/util/es.go
2026-05-28 14:39:23 +08:00

1144 lines
27 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package util
import (
"backend/Type"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"sort"
"time"
"github.com/elastic/go-elasticsearch/v8"
)
func GetEsClient() (*elasticsearch.Client, error) {
cfg := elasticsearch.Config{
Addresses: []string{
"http://kibana.bywaystudios.com:9200",
},
Username: "elastic",
Password: "bywaystudios",
}
client, err := elasticsearch.NewClient(cfg)
if err != nil {
return nil, fmt.Errorf("创建 ES 客户端失败: %w", err)
}
// 测试连接
res, err := client.Info()
if err != nil {
return nil, fmt.Errorf("ES 连接测试失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return nil, fmt.Errorf("ES 返回错误: %s", res.String())
}
return client, nil
}
// DSlSearch 执行 DSL 查询
// query 参数为查询 DSL 的 map 结构
// sort 参数为排序条件,传 nil 表示不排序
func DSlSearch(ctx context.Context, index string, query map[string]any, from, size int, sort []map[string]any) (map[string]any, error) {
client, err := GetEsClient()
if err != nil {
return nil, err
}
// 构建完整查询
fullQuery := map[string]any{
"query": query,
"from": from,
"size": size,
}
// 添加排序(如果提供)
if len(sort) > 0 {
fullQuery["sort"] = sort
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(fullQuery); err != nil {
return nil, fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(ctx),
client.Search.WithIndex(index),
client.Search.WithBody(&buf),
client.Search.WithTrackTotalHits(true),
)
if err != nil {
return nil, fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]any
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解析响应失败: %w", err)
}
return result, nil
}
// SimpleSearch 简单搜索match 查询)
func SimpleSearch(ctx context.Context, index, field, value string, from, size int) (map[string]any, error) {
query := map[string]any{
"match": map[string]any{
field: value,
},
}
return DSlSearch(ctx, index, query, from, size, nil)
}
// TermSearch 精确匹配查询
func TermSearch(ctx context.Context, index, field, value string, from, size int) (map[string]any, error) {
query := map[string]any{
"term": map[string]any{
field: value,
},
}
return DSlSearch(ctx, index, query, from, size, nil)
}
type StatisticsOverview struct {
Register int64
TotalRegister int64
Recharge float64
TotalRecharge float64
RechargeUser int64
TotalRechargeUser int64
}
type StatisticsHeatTrend struct {
Key []string
DailyActive []int64
Register []int64
}
func CountStatisticsOverview(appId int, appOpenTime int64, tz string) (*StatisticsOverview, error) {
client, err := GetEsClient()
if err != nil {
return nil, err
}
todayStart, err := GetZeroTimestamp(tz, 0)
if err != nil {
return nil, err
}
now := time.Now().Unix()
if appOpenTime <= 0 {
appOpenTime = todayStart
}
region := GetAppRegion(appId)
query := map[string]any{
"size": 0,
"query": map[string]any{
"bool": map[string]any{
"must": []map[string]any{
{
"term": map[string]any{
"fields.environment": region,
},
},
{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": appOpenTime,
"lte": now,
},
},
},
},
},
},
"aggs": map[string]any{
"scopes": map[string]any{
"filters": map[string]any{
"filters": map[string]any{
"today": map[string]any{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": todayStart,
"lte": now,
},
},
},
"total": map[string]any{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": appOpenTime,
"lte": now,
},
},
},
},
},
"aggs": map[string]any{
"register": map[string]any{
"filter": map[string]any{
"term": map[string]any{
"game.#event_name": "register",
},
},
},
"login": map[string]any{
"filter": map[string]any{
"bool": map[string]any{
"minimum_should_match": 1,
"should": []map[string]any{
{
"term": map[string]any{
"game.#event_name": "Login_log",
},
},
{
"term": map[string]any{
"game.#event_name": "login",
},
},
{
"term": map[string]any{
"game.#event_name": "Login",
},
},
},
},
},
"aggs": map[string]any{
"uid_count": map[string]any{
"cardinality": map[string]any{
"field": "game.#distinct_id.keyword",
},
},
},
},
"pay": map[string]any{
"filter": map[string]any{
"term": map[string]any{
"game.#event_name": "pay",
},
},
"aggs": map[string]any{
"uid_count": map[string]any{
"cardinality": map[string]any{
"field": "game.#distinct_id.keyword",
},
},
"amount_sum": map[string]any{
"sum": map[string]any{
"script": map[string]any{
"source": "double v = 0; if (doc.containsKey('game.properties.price') && !doc['game.properties.price'].empty) { v = doc['game.properties.price'].value; } else if (doc.containsKey('game.properties.Price') && !doc['game.properties.Price'].empty) { v = doc['game.properties.Price'].value; } else if (doc.containsKey('game.properties.amount') && !doc['game.properties.amount'].empty) { v = doc['game.properties.amount'].value; } else if (doc.containsKey('game.properties.pay_amount') && !doc['game.properties.pay_amount'].empty) { v = doc['game.properties.pay_amount'].value; } return v;",
},
},
},
},
},
},
},
},
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return nil, fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(context.Background()),
client.Search.WithIndex("game-user-log*"),
client.Search.WithBody(&buf),
)
if err != nil {
return nil, fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]any
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解析响应失败: %w", err)
}
overview := &StatisticsOverview{}
aggs, ok := result["aggregations"].(map[string]any)
if !ok {
return overview, nil
}
scopes, ok := aggs["scopes"].(map[string]any)
if !ok {
return overview, nil
}
buckets, ok := scopes["buckets"].(map[string]any)
if !ok {
return overview, nil
}
parseScope := func(scopeName string) (int64, float64, int64, int64) {
scope, ok := buckets[scopeName].(map[string]any)
if !ok {
return 0, 0, 0, 0
}
registerCount := int64(0)
if registerAgg, ok := scope["register"].(map[string]any); ok {
if docCount, ok := registerAgg["doc_count"].(float64); ok {
registerCount = int64(docCount)
}
}
loginUser := int64(0)
if loginAgg, ok := scope["login"].(map[string]any); ok {
if uidCount, ok := loginAgg["uid_count"].(map[string]any); ok {
if value, ok := uidCount["value"].(float64); ok {
loginUser = int64(value)
}
}
}
payAmount := float64(0)
payUser := int64(0)
if payAgg, ok := scope["pay"].(map[string]any); ok {
if amountSum, ok := payAgg["amount_sum"].(map[string]any); ok {
if value, ok := amountSum["value"].(float64); ok {
payAmount = value
}
}
if uidCount, ok := payAgg["uid_count"].(map[string]any); ok {
if value, ok := uidCount["value"].(float64); ok {
payUser = int64(value)
}
}
}
return registerCount, payAmount, payUser, loginUser
}
overview.Register, overview.Recharge, overview.RechargeUser, _ = parseScope("today")
overview.TotalRegister, overview.TotalRecharge, overview.TotalRechargeUser, _ = parseScope("total")
return overview, nil
}
func CountStatisticsHeat7Days(appId int, tz string) (*StatisticsHeatTrend, error) {
client, err := GetEsClient()
if err != nil {
return nil, err
}
start, err := GetZeroTimestamp(tz, -6)
if err != nil {
return nil, err
}
now := time.Now().Unix()
region := GetAppRegion(appId)
dayKeys := make([]string, 0, 7)
dayLabels := make([]string, 0, 7)
dayFilters := make(map[string]any, 7)
for i := -6; i <= 0; i++ {
dayKey, err := GetDateStr(tz, i)
if err != nil {
return nil, err
}
dayStart, err := GetZeroTimestamp(tz, i)
if err != nil {
return nil, err
}
dayEnd := now
if i < 0 {
nextDayStart, err := GetZeroTimestamp(tz, i+1)
if err != nil {
return nil, err
}
dayEnd = nextDayStart - 1
}
dayKeys = append(dayKeys, dayKey)
dayLabels = append(dayLabels, dayKey[5:])
dayFilters[dayKey] = map[string]any{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": dayStart,
"lte": dayEnd,
},
},
}
}
query := map[string]any{
"size": 0,
"query": map[string]any{
"bool": map[string]any{
"must": []map[string]any{
{
"term": map[string]any{
"fields.environment": region,
},
},
{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": start,
"lte": now,
},
},
},
},
},
},
"aggs": map[string]any{
"days": map[string]any{
"filters": map[string]any{
"filters": dayFilters,
},
"aggs": map[string]any{
"register": map[string]any{
"filter": map[string]any{
"term": map[string]any{
"game.#event_name": "register",
},
},
},
"login": map[string]any{
"filter": map[string]any{
"bool": map[string]any{
"minimum_should_match": 1,
"should": []map[string]any{
{
"term": map[string]any{
"game.#event_name": "Login_log",
},
},
{
"term": map[string]any{
"game.#event_name": "login_log",
},
},
{
"term": map[string]any{
"game.#event_name": "login",
},
},
{
"term": map[string]any{
"game.#event_name": "Login",
},
},
},
},
},
"aggs": map[string]any{
"uid_count": map[string]any{
"cardinality": map[string]any{
"field": "game.#distinct_id.keyword",
},
},
},
},
},
},
},
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return nil, fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(context.Background()),
client.Search.WithIndex("game-user-log*"),
client.Search.WithBody(&buf),
)
if err != nil {
return nil, fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]any
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解析响应失败: %w", err)
}
registerByDay := make(map[string]int64)
activeByDay := make(map[string]int64)
aggs, _ := result["aggregations"].(map[string]any)
daysAgg, _ := aggs["days"].(map[string]any)
buckets, _ := daysAgg["buckets"].(map[string]any)
for day, b := range buckets {
bucket, ok := b.(map[string]any)
if !ok {
continue
}
if registerAgg, ok := bucket["register"].(map[string]any); ok {
if docCount, ok := registerAgg["doc_count"].(float64); ok {
registerByDay[day] = int64(docCount)
}
}
if loginAgg, ok := bucket["login"].(map[string]any); ok {
if uidCount, ok := loginAgg["uid_count"].(map[string]any); ok {
if value, ok := uidCount["value"].(float64); ok {
activeByDay[day] = int64(value)
}
}
}
}
trend := &StatisticsHeatTrend{
Key: make([]string, 0, 7),
DailyActive: make([]int64, 0, 7),
Register: make([]int64, 0, 7),
}
for idx, day := range dayKeys {
trend.Key = append(trend.Key, dayLabels[idx])
trend.DailyActive = append(trend.DailyActive, activeByDay[day])
trend.Register = append(trend.Register, registerByDay[day])
}
return trend, nil
}
func SearchAssetByUid(app, _uid int, from, size int, start, end int64, itemid int, param string) ([]*Type.AssetData, int64, error) {
defer func() {
if r := recover(); r != nil {
log.Printf("SearchAssetByUid panic: %v", r)
}
}()
uid := Int(_uid)
ctx := context.Background()
region := GetAppRegion(app)
mustCondition := []map[string]any{
{
"term": map[string]any{
"game.#distinct_id.keyword": uid,
},
},
{
"term": map[string]any{
"fields.environment": region,
},
},
{
"term": map[string]any{
"game.#event_name": "asset_change",
},
},
{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": start,
"lte": end,
},
},
},
}
if itemid != 0 {
mustCondition = append(mustCondition, map[string]any{
"term": map[string]any{
"game.properties.item_id": itemid,
},
})
}
if param != "" {
mustCondition = append(mustCondition, map[string]any{
"match": map[string]any{
"game.properties.change_type": param,
},
})
}
query := map[string]any{
"bool": map[string]any{
"must": mustCondition,
},
}
// 添加降序排序
sort := []map[string]any{
{
"game.#timestamp": map[string]any{
"order": "desc",
},
},
}
result, err := DSlSearch(ctx, "game-user-log*", query, from, size, sort)
if err != nil {
return nil, 0, err
}
// 获取总数
var total int64
hits := result["hits"].(map[string]any)
if totalObj, ok := hits["total"].(map[string]any); ok {
if value, ok := totalObj["value"].(float64); ok {
total = int64(value)
}
}
var assets []*Type.AssetData
for _, hit := range hits["hits"].([]any) {
source := hit.(map[string]any)["_source"].(map[string]any)
game := source["game"].(map[string]any)
properties := game["properties"].(map[string]any)
hitData := Type.AssetData{
ChangeNum: int(properties["change_num"].(float64)),
ChangeAfter: int(properties["change_after"].(float64)),
ChangeReason: properties["change_reason"].(string),
ChangeType: properties["change_type"].(string),
Timestamp: int64(game["#timestamp"].(float64)),
ItemId: int(properties["item_id"].(float64)),
}
assets = append(assets, &hitData)
}
return assets, total, nil
}
func SearchEventByUid(app, _uid int, from, size int, start, end int64, event_name string) ([]*Type.EventData, int64, error) {
defer func() {
if r := recover(); r != nil {
log.Println("Recovered in SearchEventByUid", r)
}
}()
uid := Int(_uid)
ctx := context.Background()
region := GetAppRegion(app)
mustConditions := []map[string]any{
{
"term": map[string]any{
"game.#distinct_id.keyword": uid,
},
},
{
"term": map[string]any{
"fields.environment": region,
},
},
{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": start,
"lte": end,
},
},
},
}
if event_name != "" {
mustConditions = append(mustConditions, map[string]any{
"term": map[string]any{
"game.#event_name": event_name,
},
})
}
query := map[string]any{
"bool": map[string]any{
"must": mustConditions,
"must_not": []map[string]any{
{
"term": map[string]any{
"game.#event_name": "asset_change",
},
},
{
"term": map[string]any{
"game.#event_name": "func_exec_time",
},
},
},
},
}
// 添加降序排序
sort := []map[string]any{
{
"game.#timestamp": map[string]any{
"order": "desc",
},
},
}
result, err := DSlSearch(ctx, "game-user-log*", query, from, size, sort)
if err != nil {
return nil, 0, err
}
// 获取总数
var total int64
hits := result["hits"].(map[string]any)
if totalObj, ok := hits["total"].(map[string]any); ok {
if value, ok := totalObj["value"].(float64); ok {
total = int64(value)
}
}
var assets []*Type.EventData
for _, hit := range hits["hits"].([]any) {
source := hit.(map[string]any)["_source"].(map[string]any)
game := source["game"].(map[string]any)
//properites := game["properties"].(map[string]any)
properties_summary := game["properties_summary"].(string)
hitData := Type.EventData{
Event: game["#event_name"].(string),
Label: game["#event_name"].(string),
Param: properties_summary,
Timestamp: int(game["#timestamp"].(float64)),
}
assets = append(assets, &hitData)
}
return assets, total, nil
}
func SearchLoginCountByUidMonth(app, _uid int, month string) ([]*Type.LoginDailyCount, int64, error) {
uid := Int(_uid)
if uid <= 0 {
return nil, 0, fmt.Errorf("invalid uid")
}
if month == "" {
return nil, 0, fmt.Errorf("month is required")
}
appConfig, err := GetAppConfig(app)
if err != nil {
return nil, 0, err
}
loc := time.UTC
if appConfig.Tz != "" {
loc, err = time.LoadLocation(appConfig.Tz)
if err != nil {
return nil, 0, fmt.Errorf("invalid app timezone: %w", err)
}
}
monthTime, err := time.ParseInLocation("2006-01", month, loc)
if err != nil {
return nil, 0, fmt.Errorf("invalid month format, expected YYYY-MM: %w", err)
}
monthStart := time.Date(monthTime.Year(), monthTime.Month(), 1, 0, 0, 0, 0, loc)
monthEnd := monthStart.AddDate(0, 1, 0).Add(-time.Second)
region := GetAppRegion(app)
dailyFilters := make(map[string]any)
for day := monthStart; !day.After(monthEnd); day = day.AddDate(0, 0, 1) {
dayStart := time.Date(day.Year(), day.Month(), day.Day(), 0, 0, 0, 0, loc)
dayEnd := dayStart.AddDate(0, 0, 1).Add(-time.Second)
dailyFilters[dayStart.Format("2006-01-02")] = map[string]any{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": dayStart.Unix(),
"lte": dayEnd.Unix(),
},
},
}
}
query := map[string]any{
"bool": map[string]any{
"must": []map[string]any{
{
"term": map[string]any{
"game.#distinct_id.keyword": uid,
},
},
{
"term": map[string]any{
"fields.environment": region,
},
},
{
"term": map[string]any{
"game.#event_name.keyword": "Login_log",
},
},
{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": monthStart.Unix(),
"lte": monthEnd.Unix(),
},
},
},
},
},
}
client, err := GetEsClient()
if err != nil {
return nil, 0, err
}
fullQuery := map[string]any{
"query": query,
"size": 0,
"aggs": map[string]any{
"daily_login": map[string]any{
"filters": map[string]any{
"filters": dailyFilters,
},
},
},
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(fullQuery); err != nil {
return nil, 0, fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(context.Background()),
client.Search.WithIndex("game-user-log*"),
client.Search.WithBody(&buf),
client.Search.WithTrackTotalHits(true),
)
if err != nil {
return nil, 0, fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, 0, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]any
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, 0, fmt.Errorf("解析响应失败: %w", err)
}
var total int64
if hits, ok := result["hits"].(map[string]any); ok {
if totalObj, ok := hits["total"].(map[string]any); ok {
if value, ok := totalObj["value"].(float64); ok {
total = int64(value)
}
}
}
counts := make(map[string]int64, len(dailyFilters))
if aggregations, ok := result["aggregations"].(map[string]any); ok {
if dailyLogin, ok := aggregations["daily_login"].(map[string]any); ok {
if buckets, ok := dailyLogin["buckets"].(map[string]any); ok {
for day, bucket := range buckets {
bucketMap, ok := bucket.(map[string]any)
if !ok {
continue
}
if docCount, ok := bucketMap["doc_count"].(float64); ok {
counts[day] = int64(docCount)
}
}
}
}
}
dates := make([]string, 0, len(dailyFilters))
for day := range dailyFilters {
dates = append(dates, day)
}
sort.Strings(dates)
data := make([]*Type.LoginDailyCount, 0, len(dates))
for _, day := range dates {
data = append(data, &Type.LoginDailyCount{
Date: day,
Count: counts[day],
})
}
return data, total, nil
}
// CountDistinctUidLastHour 查询一个小时前的 game-user-log 索引中 game.#distinct_id 的去重个数
// 返回值:(当前小时的去重用户数, 一天前同一小时的去重用户数, error)
func CountDistinctUidLastHour() (int64, int64, error) {
defer func() {
if r := recover(); r != nil {
log.Println("Recovered in CountDistinctUidLastHour", r)
}
}()
ctx := context.Background()
client, err := GetEsClient()
if err != nil {
return 0, 0, err
}
now := time.Now().Unix() - 600
// 当前小时:一个小时前至今
oneHourAgo := (now - 3600)
nowMs := now
// 一天前同一小时25小时前至24小时前
oneDayOneHourAgo := (now - 25*3600)
oneDayAgo := (now - 24*3600)
// 构建查询:使用过滤器聚合分别统计两个时间段
query := map[string]any{
"query": map[string]any{
"bool": map[string]any{
"must": []map[string]any{
{
"term": map[string]any{
"fields.region.keyword": "us-newyork",
},
},
},
"should": []map[string]any{
{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
},
"minimum_should_match": 1,
},
},
"from": 0,
"aggs": map[string]any{
"current_hour": map[string]any{
"filter": map[string]any{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
"aggs": map[string]any{
"unique_users": map[string]any{
"cardinality": map[string]any{
"field": "game.#distinct_id.keyword",
},
},
},
},
"yesterday_hour": map[string]any{
"filter": map[string]any{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
"aggs": map[string]any{
"unique_users": map[string]any{
"cardinality": map[string]any{
"field": "game.#distinct_id.keyword",
},
},
},
},
},
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return 0, 0, fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(ctx),
client.Search.WithIndex("game-user-log*"),
client.Search.WithTrackTotalHits(true),
client.Search.WithBody(&buf),
)
if err != nil {
return 0, 0, fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return 0, 0, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]any
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return 0, 0, fmt.Errorf("解析响应失败: %w", err)
}
// 从聚合结果中提取去重计数
var currentCount, yesterdayCount int64
if aggregations, ok := result["aggregations"].(map[string]any); ok {
// 提取当前小时的数据
if currentHour, ok := aggregations["current_hour"].(map[string]any); ok {
if uniqueUsers, ok := currentHour["unique_users"].(map[string]any); ok {
if value, ok := uniqueUsers["value"].(float64); ok {
currentCount = int64(value)
}
}
}
// 提取昨天同一小时的数据
if yesterdayHour, ok := aggregations["yesterday_hour"].(map[string]any); ok {
if uniqueUsers, ok := yesterdayHour["unique_users"].(map[string]any); ok {
if value, ok := uniqueUsers["value"].(float64); ok {
yesterdayCount = int64(value)
}
}
}
}
return currentCount, yesterdayCount, nil
}
func CountDistinctUidLastHourTest() any {
defer func() {
if r := recover(); r != nil {
log.Println("Recovered in CountDistinctUidLastHour", r)
}
}()
ctx := context.Background()
client, err := GetEsClient()
if err != nil {
return err
}
now := time.Now().Unix()
// 当前小时:一个小时前至今
oneHourAgo := (now - 3600)
nowMs := now
//一天前同一小时25小时前至24小时前
oneDayOneHourAgo := (now - 25*3600)
oneDayAgo := (now - 24*3600)
// 构建查询:使用过滤器聚合分别统计两个时间段
query := map[string]any{
"query": map[string]any{
"bool": map[string]any{
"must": []map[string]any{
{
"term": map[string]any{
"fields.region.keyword": "us-newyork",
},
},
},
"should": []map[string]any{
{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
},
"minimum_should_match": 1,
},
},
"size": 0,
"from": 0,
"aggs": map[string]any{
"current_hour": map[string]any{
"filter": map[string]any{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
"aggs": map[string]any{
"unique_users": map[string]any{
"cardinality": map[string]any{
"field": "game.#distinct_id.keyword",
},
},
},
},
"yesterday_hour": map[string]any{
"filter": map[string]any{
"range": map[string]any{
"game.#timestamp": map[string]any{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
"aggs": map[string]any{
"unique_users": map[string]any{
"cardinality": map[string]any{
"field": "game.#distinct_id.keyword",
},
},
},
},
},
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(ctx),
client.Search.WithIndex(".ds-game-user-log*"),
client.Search.WithBody(&buf),
client.Search.WithTrackTotalHits(true),
)
if err != nil {
return fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]any
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return fmt.Errorf("解析响应失败: %w", err)
}
return result
}