admin_backend/util/es.go
2026-02-06 15:31:37 +08:00

566 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package util
import (
"backend/Type"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"time"
"github.com/elastic/go-elasticsearch/v8"
)
func GetEsClient() (*elasticsearch.Client, error) {
cfg := elasticsearch.Config{
Addresses: []string{
"http://kibana.bywaystudios.com:9200",
},
Username: "elastic",
Password: "bywaystudios",
}
client, err := elasticsearch.NewClient(cfg)
if err != nil {
return nil, fmt.Errorf("创建 ES 客户端失败: %w", err)
}
// 测试连接
res, err := client.Info()
if err != nil {
return nil, fmt.Errorf("ES 连接测试失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return nil, fmt.Errorf("ES 返回错误: %s", res.String())
}
return client, nil
}
// DSlSearch 执行 DSL 查询
// query 参数为查询 DSL 的 map 结构
// sort 参数为排序条件,传 nil 表示不排序
func DSlSearch(ctx context.Context, index string, query map[string]interface{}, from, size int, sort []map[string]interface{}) (map[string]interface{}, error) {
client, err := GetEsClient()
if err != nil {
return nil, err
}
// 构建完整查询
fullQuery := map[string]interface{}{
"query": query,
"from": from,
"size": size,
}
// 添加排序(如果提供)
if len(sort) > 0 {
fullQuery["sort"] = sort
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(fullQuery); err != nil {
return nil, fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(ctx),
client.Search.WithIndex(index),
client.Search.WithBody(&buf),
client.Search.WithTrackTotalHits(true),
)
if err != nil {
return nil, fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解析响应失败: %w", err)
}
return result, nil
}
// SimpleSearch 简单搜索match 查询)
func SimpleSearch(ctx context.Context, index, field, value string, from, size int) (map[string]interface{}, error) {
query := map[string]interface{}{
"match": map[string]interface{}{
field: value,
},
}
return DSlSearch(ctx, index, query, from, size, nil)
}
// TermSearch 精确匹配查询
func TermSearch(ctx context.Context, index, field, value string, from, size int) (map[string]interface{}, error) {
query := map[string]interface{}{
"term": map[string]interface{}{
field: value,
},
}
return DSlSearch(ctx, index, query, from, size, nil)
}
func SearchAssetByUid(app, _uid int, from, size int, start, end int64, itemid int, param string) ([]*Type.AssetData, int64, error) {
defer func() {
if r := recover(); r != nil {
log.Printf("SearchAssetByUid panic: %v", r)
}
}()
uid := Int(_uid)
ctx := context.Background()
region := GetAppRegion(app)
mustCondition := []map[string]interface{}{
{
"term": map[string]interface{}{
"game.#distinct_id": uid,
},
},
{
"term": map[string]interface{}{
"fields.region": region,
},
},
{
"term": map[string]interface{}{
"game.#event_name": "asset_change",
},
},
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": start,
"lte": end,
},
},
},
}
if itemid != 0 {
mustCondition = append(mustCondition, map[string]interface{}{
"term": map[string]interface{}{
"game.properties.item_id": itemid,
},
})
}
if param != "" {
mustCondition = append(mustCondition, map[string]interface{}{
"match": map[string]interface{}{
"game.properties.change_type": param,
},
})
}
query := map[string]interface{}{
"bool": map[string]interface{}{
"must": mustCondition,
},
}
// 添加降序排序
sort := []map[string]interface{}{
{
"game.#timestamp": map[string]interface{}{
"order": "desc",
},
},
}
result, err := DSlSearch(ctx, "game-user-log*", query, from, size, sort)
if err != nil {
return nil, 0, err
}
// 获取总数
var total int64
hits := result["hits"].(map[string]interface{})
if totalObj, ok := hits["total"].(map[string]interface{}); ok {
if value, ok := totalObj["value"].(float64); ok {
total = int64(value)
}
}
var assets []*Type.AssetData
for _, hit := range hits["hits"].([]interface{}) {
source := hit.(map[string]interface{})["_source"].(map[string]interface{})
game := source["game"].(map[string]interface{})
properties := game["properties"].(map[string]interface{})
hitData := Type.AssetData{
ChangeNum: int(properties["change_num"].(float64)),
ChangeAfter: int(properties["change_after"].(float64)),
ChangeReason: properties["change_reason"].(string),
ChangeType: properties["change_type"].(string),
Timestamp: int64(game["#timestamp"].(float64)),
ItemId: int(properties["item_id"].(float64)),
}
assets = append(assets, &hitData)
}
return assets, total, nil
}
func SearchEventByUid(app, _uid int, from, size int, start, end int64, event_name string) ([]*Type.EventData, int64, error) {
defer func() {
if r := recover(); r != nil {
log.Println("Recovered in SearchEventByUid", r)
}
}()
uid := Int(_uid)
ctx := context.Background()
region := GetAppRegion(app)
mustConditions := []map[string]interface{}{
{
"term": map[string]interface{}{
"game.#distinct_id": uid,
},
},
{
"term": map[string]interface{}{
"fields.region": region,
},
},
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": start,
"lte": end,
},
},
},
}
if event_name != "" {
mustConditions = append(mustConditions, map[string]interface{}{
"term": map[string]interface{}{
"game.#event_name": event_name,
},
})
}
query := map[string]interface{}{
"bool": map[string]interface{}{
"must": mustConditions,
"must_not": []map[string]interface{}{
{
"term": map[string]interface{}{
"game.#event_name": "asset_change",
},
},
},
},
}
// 添加降序排序
sort := []map[string]interface{}{
{
"game.#timestamp": map[string]interface{}{
"order": "desc",
},
},
}
result, err := DSlSearch(ctx, "game-user-log*", query, from, size, sort)
if err != nil {
return nil, 0, err
}
// 获取总数
var total int64
hits := result["hits"].(map[string]interface{})
if totalObj, ok := hits["total"].(map[string]interface{}); ok {
if value, ok := totalObj["value"].(float64); ok {
total = int64(value)
}
}
var assets []*Type.EventData
for _, hit := range hits["hits"].([]interface{}) {
source := hit.(map[string]interface{})["_source"].(map[string]interface{})
game := source["game"].(map[string]interface{})
//properites := game["properties"].(map[string]interface{})
properties_summary := game["properties_summary"].(string)
hitData := Type.EventData{
Event: game["#event_name"].(string),
Label: game["#event_name"].(string),
Param: properties_summary,
Timestamp: int(game["#timestamp"].(float64)),
}
assets = append(assets, &hitData)
}
return assets, total, nil
}
// CountDistinctUidLastHour 查询一个小时前的 game-user-log 索引中 game.#distinct_id 的去重个数
// 返回值:(当前小时的去重用户数, 一天前同一小时的去重用户数, error)
func CountDistinctUidLastHour() (int64, int64, error) {
defer func() {
if r := recover(); r != nil {
log.Println("Recovered in CountDistinctUidLastHour", r)
}
}()
ctx := context.Background()
client, err := GetEsClient()
if err != nil {
return 0, 0, err
}
now := time.Now().Unix()
// 当前小时:一个小时前至今
oneHourAgo := (now - 3600)
nowMs := now
// 一天前同一小时25小时前至24小时前
oneDayOneHourAgo := (now - 25*3600)
oneDayAgo := (now - 24*3600)
// 构建查询:使用过滤器聚合分别统计两个时间段
query := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{
{
"term": map[string]interface{}{
"fields.region": "us-newyork",
},
},
},
"should": []map[string]interface{}{
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
},
"minimum_should_match": 1,
},
},
"from": 0,
"aggs": map[string]interface{}{
"current_hour": map[string]interface{}{
"filter": map[string]interface{}{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
"aggs": map[string]interface{}{
"unique_users": map[string]interface{}{
"cardinality": map[string]interface{}{
"field": "game.#distinct_id",
},
},
},
},
"yesterday_hour": map[string]interface{}{
"filter": map[string]interface{}{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
"aggs": map[string]interface{}{
"unique_users": map[string]interface{}{
"cardinality": map[string]interface{}{
"field": "game.#distinct_id",
},
},
},
},
},
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return 0, 0, fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(ctx),
client.Search.WithIndex("game-user-log*"),
client.Search.WithBody(&buf),
)
if err != nil {
return 0, 0, fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return 0, 0, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return 0, 0, fmt.Errorf("解析响应失败: %w", err)
}
// 从聚合结果中提取去重计数
var currentCount, yesterdayCount int64
if aggregations, ok := result["aggregations"].(map[string]interface{}); ok {
// 提取当前小时的数据
if currentHour, ok := aggregations["current_hour"].(map[string]interface{}); ok {
if uniqueUsers, ok := currentHour["unique_users"].(map[string]interface{}); ok {
if value, ok := uniqueUsers["value"].(float64); ok {
currentCount = int64(value)
}
}
}
// 提取昨天同一小时的数据
if yesterdayHour, ok := aggregations["yesterday_hour"].(map[string]interface{}); ok {
if uniqueUsers, ok := yesterdayHour["unique_users"].(map[string]interface{}); ok {
if value, ok := uniqueUsers["value"].(float64); ok {
yesterdayCount = int64(value)
}
}
}
}
return currentCount, yesterdayCount, nil
}
func CountDistinctUidLastHourTest() interface{} {
defer func() {
if r := recover(); r != nil {
log.Println("Recovered in CountDistinctUidLastHour", r)
}
}()
ctx := context.Background()
client, err := GetEsClient()
if err != nil {
return err
}
now := time.Now().Unix()
// 当前小时:一个小时前至今
oneHourAgo := (now - 3600)
nowMs := now
//一天前同一小时25小时前至24小时前
oneDayOneHourAgo := (now - 25*3600)
oneDayAgo := (now - 24*3600)
// 构建查询:使用过滤器聚合分别统计两个时间段
query := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{
{
"term": map[string]interface{}{
"fields.region.keyword": "us-newyork",
},
},
},
"should": []map[string]interface{}{
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
},
"minimum_should_match": 1,
},
},
"size": 0,
"from": 0,
"aggs": map[string]interface{}{
"current_hour": map[string]interface{}{
"filter": map[string]interface{}{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
"aggs": map[string]interface{}{
"unique_users": map[string]interface{}{
"cardinality": map[string]interface{}{
"field": "game.#distinct_id.keyword",
},
},
},
},
"yesterday_hour": map[string]interface{}{
"filter": map[string]interface{}{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
"aggs": map[string]interface{}{
"unique_users": map[string]interface{}{
"cardinality": map[string]interface{}{
"field": "game.#distinct_id.keyword",
},
},
},
},
},
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(ctx),
client.Search.WithIndex(".ds-game-user-log*"),
client.Search.WithBody(&buf),
client.Search.WithTrackTotalHits(true),
)
if err != nil {
return fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return fmt.Errorf("解析响应失败: %w", err)
}
return result
}