567 lines
14 KiB
Go
567 lines
14 KiB
Go
package util
|
||
|
||
import (
|
||
"backend/Type"
|
||
"bytes"
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"io"
|
||
"log"
|
||
"time"
|
||
|
||
"github.com/elastic/go-elasticsearch/v8"
|
||
)
|
||
|
||
func GetEsClient() (*elasticsearch.Client, error) {
|
||
cfg := elasticsearch.Config{
|
||
Addresses: []string{
|
||
"http://kibana.bywaystudios.com:9200",
|
||
},
|
||
Username: "elastic",
|
||
Password: "bywaystudios",
|
||
}
|
||
|
||
client, err := elasticsearch.NewClient(cfg)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("创建 ES 客户端失败: %w", err)
|
||
}
|
||
|
||
// 测试连接
|
||
res, err := client.Info()
|
||
if err != nil {
|
||
return nil, fmt.Errorf("ES 连接测试失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
return nil, fmt.Errorf("ES 返回错误: %s", res.String())
|
||
}
|
||
|
||
return client, nil
|
||
}
|
||
|
||
// DSlSearch 执行 DSL 查询
|
||
// query 参数为查询 DSL 的 map 结构
|
||
// sort 参数为排序条件,传 nil 表示不排序
|
||
func DSlSearch(ctx context.Context, index string, query map[string]interface{}, from, size int, sort []map[string]interface{}) (map[string]interface{}, error) {
|
||
client, err := GetEsClient()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 构建完整查询
|
||
fullQuery := map[string]interface{}{
|
||
"query": query,
|
||
"from": from,
|
||
"size": size,
|
||
}
|
||
|
||
// 添加排序(如果提供)
|
||
if len(sort) > 0 {
|
||
fullQuery["sort"] = sort
|
||
}
|
||
|
||
var buf bytes.Buffer
|
||
if err := json.NewEncoder(&buf).Encode(fullQuery); err != nil {
|
||
return nil, fmt.Errorf("编码查询失败: %w", err)
|
||
}
|
||
fmt.Printf("%s", buf.String())
|
||
res, err := client.Search(
|
||
client.Search.WithContext(ctx),
|
||
client.Search.WithIndex(index),
|
||
client.Search.WithBody(&buf),
|
||
client.Search.WithTrackTotalHits(true),
|
||
)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("执行搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return nil, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
|
||
}
|
||
|
||
var result map[string]interface{}
|
||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||
return nil, fmt.Errorf("解析响应失败: %w", err)
|
||
}
|
||
|
||
return result, nil
|
||
}
|
||
|
||
// SimpleSearch 简单搜索(match 查询)
|
||
func SimpleSearch(ctx context.Context, index, field, value string, from, size int) (map[string]interface{}, error) {
|
||
query := map[string]interface{}{
|
||
"match": map[string]interface{}{
|
||
field: value,
|
||
},
|
||
}
|
||
return DSlSearch(ctx, index, query, from, size, nil)
|
||
}
|
||
|
||
// TermSearch 精确匹配查询
|
||
func TermSearch(ctx context.Context, index, field, value string, from, size int) (map[string]interface{}, error) {
|
||
query := map[string]interface{}{
|
||
"term": map[string]interface{}{
|
||
field: value,
|
||
},
|
||
}
|
||
return DSlSearch(ctx, index, query, from, size, nil)
|
||
}
|
||
|
||
func SearchAssetByUid(app, _uid int, from, size int, start, end int64, itemid int, param string) ([]*Type.AssetData, int64, error) {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Printf("SearchAssetByUid panic: %v", r)
|
||
}
|
||
}()
|
||
uid := Int(_uid)
|
||
ctx := context.Background()
|
||
region := GetAppRegion(app)
|
||
mustCondition := []map[string]interface{}{
|
||
{
|
||
"term": map[string]interface{}{
|
||
"game.#distinct_id.keyword": uid,
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]interface{}{
|
||
"fields.environment": region,
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]interface{}{
|
||
"game.#event_name": "asset_change",
|
||
},
|
||
},
|
||
{
|
||
"range": map[string]interface{}{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"gte": start,
|
||
"lte": end,
|
||
},
|
||
},
|
||
},
|
||
}
|
||
if itemid != 0 {
|
||
mustCondition = append(mustCondition, map[string]interface{}{
|
||
"term": map[string]interface{}{
|
||
"game.properties.item_id": itemid,
|
||
},
|
||
})
|
||
}
|
||
if param != "" {
|
||
mustCondition = append(mustCondition, map[string]interface{}{
|
||
"match": map[string]interface{}{
|
||
"game.properties.change_type": param,
|
||
},
|
||
})
|
||
}
|
||
query := map[string]interface{}{
|
||
"bool": map[string]interface{}{
|
||
"must": mustCondition,
|
||
},
|
||
}
|
||
|
||
// 添加降序排序
|
||
sort := []map[string]interface{}{
|
||
{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"order": "desc",
|
||
},
|
||
},
|
||
}
|
||
|
||
result, err := DSlSearch(ctx, "game-user-log*", query, from, size, sort)
|
||
if err != nil {
|
||
return nil, 0, err
|
||
}
|
||
|
||
// 获取总数
|
||
var total int64
|
||
hits := result["hits"].(map[string]interface{})
|
||
if totalObj, ok := hits["total"].(map[string]interface{}); ok {
|
||
if value, ok := totalObj["value"].(float64); ok {
|
||
total = int64(value)
|
||
}
|
||
}
|
||
|
||
var assets []*Type.AssetData
|
||
for _, hit := range hits["hits"].([]interface{}) {
|
||
source := hit.(map[string]interface{})["_source"].(map[string]interface{})
|
||
game := source["game"].(map[string]interface{})
|
||
properties := game["properties"].(map[string]interface{})
|
||
hitData := Type.AssetData{
|
||
ChangeNum: int(properties["change_num"].(float64)),
|
||
ChangeAfter: int(properties["change_after"].(float64)),
|
||
ChangeReason: properties["change_reason"].(string),
|
||
ChangeType: properties["change_type"].(string),
|
||
Timestamp: int64(game["#timestamp"].(float64)),
|
||
ItemId: int(properties["item_id"].(float64)),
|
||
}
|
||
assets = append(assets, &hitData)
|
||
}
|
||
return assets, total, nil
|
||
}
|
||
|
||
func SearchEventByUid(app, _uid int, from, size int, start, end int64, event_name string) ([]*Type.EventData, int64, error) {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Println("Recovered in SearchEventByUid", r)
|
||
}
|
||
}()
|
||
uid := Int(_uid)
|
||
ctx := context.Background()
|
||
region := GetAppRegion(app)
|
||
mustConditions := []map[string]interface{}{
|
||
{
|
||
"term": map[string]interface{}{
|
||
"game.#distinct_id.keyword": uid,
|
||
},
|
||
},
|
||
{
|
||
"term": map[string]interface{}{
|
||
"fields.environment": region,
|
||
},
|
||
},
|
||
{
|
||
"range": map[string]interface{}{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"gte": start,
|
||
"lte": end,
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
if event_name != "" {
|
||
mustConditions = append(mustConditions, map[string]interface{}{
|
||
"term": map[string]interface{}{
|
||
"game.#event_name": event_name,
|
||
},
|
||
})
|
||
}
|
||
|
||
query := map[string]interface{}{
|
||
"bool": map[string]interface{}{
|
||
"must": mustConditions,
|
||
"must_not": []map[string]interface{}{
|
||
{
|
||
"term": map[string]interface{}{
|
||
"game.#event_name": "asset_change",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
// 添加降序排序
|
||
sort := []map[string]interface{}{
|
||
{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"order": "desc",
|
||
},
|
||
},
|
||
}
|
||
result, err := DSlSearch(ctx, "game-user-log*", query, from, size, sort)
|
||
if err != nil {
|
||
return nil, 0, err
|
||
}
|
||
|
||
// 获取总数
|
||
var total int64
|
||
hits := result["hits"].(map[string]interface{})
|
||
if totalObj, ok := hits["total"].(map[string]interface{}); ok {
|
||
if value, ok := totalObj["value"].(float64); ok {
|
||
total = int64(value)
|
||
}
|
||
}
|
||
|
||
var assets []*Type.EventData
|
||
for _, hit := range hits["hits"].([]interface{}) {
|
||
source := hit.(map[string]interface{})["_source"].(map[string]interface{})
|
||
game := source["game"].(map[string]interface{})
|
||
//properites := game["properties"].(map[string]interface{})
|
||
properties_summary := game["properties_summary"].(string)
|
||
hitData := Type.EventData{
|
||
Event: game["#event_name"].(string),
|
||
Label: game["#event_name"].(string),
|
||
Param: properties_summary,
|
||
Timestamp: int(game["#timestamp"].(float64)),
|
||
}
|
||
assets = append(assets, &hitData)
|
||
}
|
||
return assets, total, nil
|
||
}
|
||
|
||
// CountDistinctUidLastHour 查询一个小时前的 game-user-log 索引中 game.#distinct_id 的去重个数
|
||
// 返回值:(当前小时的去重用户数, 一天前同一小时的去重用户数, error)
|
||
func CountDistinctUidLastHour() (int64, int64, error) {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Println("Recovered in CountDistinctUidLastHour", r)
|
||
}
|
||
}()
|
||
|
||
ctx := context.Background()
|
||
client, err := GetEsClient()
|
||
if err != nil {
|
||
return 0, 0, err
|
||
}
|
||
|
||
now := time.Now().Unix()
|
||
// 当前小时:一个小时前至今
|
||
oneHourAgo := (now - 3600)
|
||
nowMs := now
|
||
|
||
// 一天前同一小时:25小时前至24小时前
|
||
oneDayOneHourAgo := (now - 25*3600)
|
||
oneDayAgo := (now - 24*3600)
|
||
|
||
// 构建查询:使用过滤器聚合分别统计两个时间段
|
||
query := map[string]interface{}{
|
||
"query": map[string]interface{}{
|
||
"bool": map[string]interface{}{
|
||
"must": []map[string]interface{}{
|
||
{
|
||
"term": map[string]interface{}{
|
||
"fields.region.keyword": "us-newyork",
|
||
},
|
||
},
|
||
},
|
||
"should": []map[string]interface{}{
|
||
{
|
||
"range": map[string]interface{}{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"gte": oneHourAgo,
|
||
"lte": nowMs,
|
||
},
|
||
},
|
||
},
|
||
{
|
||
"range": map[string]interface{}{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"gte": oneDayOneHourAgo,
|
||
"lte": oneDayAgo,
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"minimum_should_match": 1,
|
||
},
|
||
},
|
||
"from": 0,
|
||
"aggs": map[string]interface{}{
|
||
"current_hour": map[string]interface{}{
|
||
"filter": map[string]interface{}{
|
||
"range": map[string]interface{}{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"gte": oneHourAgo,
|
||
"lte": nowMs,
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]interface{}{
|
||
"unique_users": map[string]interface{}{
|
||
"cardinality": map[string]interface{}{
|
||
"field": "game.#distinct_id.keyword",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"yesterday_hour": map[string]interface{}{
|
||
"filter": map[string]interface{}{
|
||
"range": map[string]interface{}{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"gte": oneDayOneHourAgo,
|
||
"lte": oneDayAgo,
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]interface{}{
|
||
"unique_users": map[string]interface{}{
|
||
"cardinality": map[string]interface{}{
|
||
"field": "game.#distinct_id.keyword",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
var buf bytes.Buffer
|
||
if err := json.NewEncoder(&buf).Encode(query); err != nil {
|
||
return 0, 0, fmt.Errorf("编码查询失败: %w", err)
|
||
}
|
||
fmt.Printf("%s", buf.String())
|
||
res, err := client.Search(
|
||
client.Search.WithContext(ctx),
|
||
client.Search.WithIndex("game-user-log*"),
|
||
client.Search.WithTrackTotalHits(true),
|
||
client.Search.WithBody(&buf),
|
||
)
|
||
if err != nil {
|
||
return 0, 0, fmt.Errorf("执行搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return 0, 0, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
|
||
}
|
||
|
||
var result map[string]interface{}
|
||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||
return 0, 0, fmt.Errorf("解析响应失败: %w", err)
|
||
}
|
||
|
||
// 从聚合结果中提取去重计数
|
||
var currentCount, yesterdayCount int64
|
||
|
||
if aggregations, ok := result["aggregations"].(map[string]interface{}); ok {
|
||
// 提取当前小时的数据
|
||
if currentHour, ok := aggregations["current_hour"].(map[string]interface{}); ok {
|
||
if uniqueUsers, ok := currentHour["unique_users"].(map[string]interface{}); ok {
|
||
if value, ok := uniqueUsers["value"].(float64); ok {
|
||
currentCount = int64(value)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 提取昨天同一小时的数据
|
||
if yesterdayHour, ok := aggregations["yesterday_hour"].(map[string]interface{}); ok {
|
||
if uniqueUsers, ok := yesterdayHour["unique_users"].(map[string]interface{}); ok {
|
||
if value, ok := uniqueUsers["value"].(float64); ok {
|
||
yesterdayCount = int64(value)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return currentCount, yesterdayCount, nil
|
||
}
|
||
|
||
func CountDistinctUidLastHourTest() interface{} {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
log.Println("Recovered in CountDistinctUidLastHour", r)
|
||
}
|
||
}()
|
||
|
||
ctx := context.Background()
|
||
client, err := GetEsClient()
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
now := time.Now().Unix()
|
||
// 当前小时:一个小时前至今
|
||
oneHourAgo := (now - 3600)
|
||
nowMs := now
|
||
|
||
//一天前同一小时:25小时前至24小时前
|
||
oneDayOneHourAgo := (now - 25*3600)
|
||
oneDayAgo := (now - 24*3600)
|
||
|
||
// 构建查询:使用过滤器聚合分别统计两个时间段
|
||
query := map[string]interface{}{
|
||
"query": map[string]interface{}{
|
||
"bool": map[string]interface{}{
|
||
"must": []map[string]interface{}{
|
||
{
|
||
"term": map[string]interface{}{
|
||
"fields.region.keyword": "us-newyork",
|
||
},
|
||
},
|
||
},
|
||
"should": []map[string]interface{}{
|
||
{
|
||
"range": map[string]interface{}{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"gte": oneHourAgo,
|
||
"lte": nowMs,
|
||
},
|
||
},
|
||
},
|
||
{
|
||
"range": map[string]interface{}{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"gte": oneDayOneHourAgo,
|
||
"lte": oneDayAgo,
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"minimum_should_match": 1,
|
||
},
|
||
},
|
||
"size": 0,
|
||
"from": 0,
|
||
"aggs": map[string]interface{}{
|
||
"current_hour": map[string]interface{}{
|
||
"filter": map[string]interface{}{
|
||
"range": map[string]interface{}{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"gte": oneHourAgo,
|
||
"lte": nowMs,
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]interface{}{
|
||
"unique_users": map[string]interface{}{
|
||
"cardinality": map[string]interface{}{
|
||
"field": "game.#distinct_id.keyword",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"yesterday_hour": map[string]interface{}{
|
||
"filter": map[string]interface{}{
|
||
"range": map[string]interface{}{
|
||
"game.#timestamp": map[string]interface{}{
|
||
"gte": oneDayOneHourAgo,
|
||
"lte": oneDayAgo,
|
||
},
|
||
},
|
||
},
|
||
"aggs": map[string]interface{}{
|
||
"unique_users": map[string]interface{}{
|
||
"cardinality": map[string]interface{}{
|
||
"field": "game.#distinct_id.keyword",
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
}
|
||
|
||
var buf bytes.Buffer
|
||
if err := json.NewEncoder(&buf).Encode(query); err != nil {
|
||
return fmt.Errorf("编码查询失败: %w", err)
|
||
}
|
||
|
||
res, err := client.Search(
|
||
client.Search.WithContext(ctx),
|
||
client.Search.WithIndex(".ds-game-user-log*"),
|
||
client.Search.WithBody(&buf),
|
||
client.Search.WithTrackTotalHits(true),
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("执行搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
|
||
}
|
||
|
||
var result map[string]interface{}
|
||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||
return fmt.Errorf("解析响应失败: %w", err)
|
||
}
|
||
return result
|
||
}
|