版本更新

This commit is contained in:
hahwu 2026-01-26 15:28:47 +08:00
parent 4d8c3dbfcf
commit 994baddfbd
13 changed files with 2410 additions and 1673 deletions

View File

@ -184,3 +184,19 @@ type AlibabaNotifyData struct {
AlarmTime string `json:"alarm_time"`
NotifyMsg string `json:"notify_msg"`
}
type AssetData struct {
ChangeNum int `json:"change_num"`
ChangeAfter int `json:"change_after"`
ChangeReason string `json:"change_reason"`
ChangeType string `json:"change_type"`
ItemId int `json:"item_id"`
Timestamp int64 `json:"timestamp"`
}
type EventData struct {
Event string `json:"Event"`
Label string `json:"Label"`
Param string `json:"Param"`
Timestamp int `json:"Timestamp"`
}

View File

@ -389,6 +389,9 @@ func SendStandardMsg(title, content, color string) (_err error) {
RobotCode: tea.String(robotCode),
// 卡片接收人
Recipients: []*string{},
AtUserIds: map[string]*string{
"@ALL": tea.String("@ALL"),
},
}
imGroupOpenSpaceModelLastMessageI18n := map[string]*string{
"ZH_CN": tea.String(lastMessage),
@ -428,6 +431,102 @@ func SendStandardMsg(title, content, color string) (_err error) {
ImGroupOpenDeliverModel: imGroupOpenDeliverModel,
OpenSpaceId: tea.String(fmt.Sprintf("dtv1.card//im_group.%s", openConversationId)),
UserIdType: tea.Int32(1),
// CardAtUserIds: []*string{tea.String("all")},
}
tryErr := func() (_e error) {
defer func() {
if r := tea.Recover(recover()); r != nil {
_e = r
}
}()
fmt.Println("createAndDeliverRequest", createAndDeliverRequest)
_, _err = client.CreateAndDeliverWithOptions(createAndDeliverRequest, createAndDeliverHeaders, &util.RuntimeOptions{})
if _err != nil {
return _err
}
return nil
}()
if tryErr != nil {
var err = &tea.SDKError{}
if _t, ok := tryErr.(*tea.SDKError); ok {
err = _t
} else {
err.Message = tea.String(tryErr.Error())
}
if !tea.BoolValue(util.Empty(err.Code)) && !tea.BoolValue(util.Empty(err.Message)) {
// err 中含有 code 和 message 属性,可帮助开发定位问题
}
}
return _err
}
func SendAliveMsg(title, content, color string) (_err error) {
client, _err := createCardClient()
if _err != nil {
return _err
}
accessToken, _ := GetToken()
robotCode := "dingrmgtodzxaik76jpc"
userId := ""
openConversationId := "cidivmW+tO/JGyIFM/XHNeQcA=="
templateId := "843a23ff-29d2-4efc-b7f4-2dea2766d7db.schema"
lastMessage := title
searchIcon := ""
searchDesc := ""
createAndDeliverHeaders := &dingtalkcard_1_0.CreateAndDeliverHeaders{}
createAndDeliverHeaders.XAcsDingtalkAccessToken = tea.String(accessToken)
imGroupOpenDeliverModel := &dingtalkcard_1_0.CreateAndDeliverRequestImGroupOpenDeliverModel{
RobotCode: tea.String(robotCode),
// 卡片接收人
Recipients: []*string{},
AtUserIds: map[string]*string{
"@ALL": tea.String("@ALL"),
},
}
imGroupOpenSpaceModelLastMessageI18n := map[string]*string{
"ZH_CN": tea.String(lastMessage),
}
imGroupOpenSpaceModelSearchSupport := &dingtalkcard_1_0.CreateAndDeliverRequestImGroupOpenSpaceModelSearchSupport{
SearchIcon: tea.String(searchIcon),
SearchDesc: tea.String(searchDesc),
}
imGroupOpenSpaceModel := &dingtalkcard_1_0.CreateAndDeliverRequestImGroupOpenSpaceModel{
SupportForward: tea.Bool(true),
LastMessageI18n: imGroupOpenSpaceModelLastMessageI18n,
SearchSupport: imGroupOpenSpaceModelSearchSupport,
}
// 此处使用了 MockData 作为测试数据,请结合真实场景设置卡片公有数据
cardDataCardParamMap := map[string]any{
"title": title,
"markdown": content,
"color": color,
"config": map[string]any{
"autoLayout": true,
},
}
cardDataError := error(nil)
_ = cardDataError
cardData := &dingtalkcard_1_0.CreateAndDeliverRequestCardData{
CardParamMap: convertJSONValuesToString(cardDataCardParamMap),
}
createAndDeliverRequest := &dingtalkcard_1_0.CreateAndDeliverRequest{
UserId: tea.String(userId),
CardTemplateId: tea.String(templateId),
// 用于标识卡片的唯一 ID业务需自行建立关联关系用于后续的卡片更新
OutTrackId: tea.String(fmt.Sprintf("standard-out-track-id-%d", time.Now().Unix())),
CallbackType: tea.String("STREAM"),
CardData: cardData,
ImGroupOpenSpaceModel: imGroupOpenSpaceModel,
ImGroupOpenDeliverModel: imGroupOpenDeliverModel,
OpenSpaceId: tea.String(fmt.Sprintf("dtv1.card//im_group.%s", openConversationId)),
UserIdType: tea.Int32(1),
// CardAtUserIds: []*string{tea.String("all")},
}
tryErr := func() (_e error) {
defer func() {

View File

@ -163,7 +163,7 @@ func main() {
go server.Server()
go model.InitToken() // 初始化 Token 列表
//go controller.USSendInfo() // 启动定时任务发送信息
//go monitor.UserAliveMonitor(0) // 用户存活监控
go monitor.UserAliveMonitor(0) // 用户存活监控
go monitor.ServerInfoMonitor() // 服务器信息监控
defer func() {
if err := recover(); err != nil {

View File

@ -1,8 +1,8 @@
package model
import (
"backend/Type"
util "backend/util"
"encoding/json"
"fmt"
)
@ -19,15 +19,15 @@ type Log struct {
type ResAsset struct {
Total int `json:"total"`
Data []*ResAssetDetail `json:"data"`
Data []*Type.AssetData `json:"data"`
Sum int `json:"sum"` // 总和
NSum int `json:"nsum"` // 负数和
PSum int `json:"psum"` // 正数和
}
type ResEvent struct {
Total int `json:"total"`
Data []*Event `json:"data"`
Total int `json:"total"`
Data []*Type.EventData `json:"data"`
}
type ResOrder struct {
@ -74,77 +74,24 @@ type Event struct {
}
func (m *Log) Asset() (*ResAsset, error) {
AppId, _ := util.ParseUid(m.Uid)
AppConfig, err := util.GetAppConfig(AppId)
resutl, total, err := util.SearchAssetByUid(m.Uid, (m.CurrentPage-1)*m.PageSize, m.CurrentPage*m.PageSize, m.StartTime, m.EndTime)
if err != nil {
return nil, err
}
StartTime := m.StartTime
EndTime := m.EndTime
Db := util.MPool.GetTopicDB(AppConfig.Topic)
defer Db.Close()
if Db == nil {
return nil, fmt.Errorf("failed to get mysql database")
}
assets := []*Event{}
value := make([]interface{}, 0)
totalValue := make([]interface{}, 0)
Sql := "SELECT * FROM log_event WHERE Uid = ? and `Event` = 'asset_change' and Timestamp >= ? and Timestamp <= ? "
TotalSql := "SELECT COUNT(*) FROM log_event WHERE Uid = ? and `Event` = 'asset_change' and Timestamp >= ? and Timestamp <= ? "
totalValue = append(totalValue, m.Uid, StartTime, EndTime)
value = append(value, m.Uid, StartTime, EndTime)
if m.ItemId != 0 {
Sql += "and Param like ? "
value = append(value, fmt.Sprintf("%%\"item_id\":%d%%", m.ItemId))
TotalSql += "and Param like ? "
totalValue = append(totalValue, fmt.Sprintf("%%\"item_id\":%d%%", m.ItemId))
}
if m.EventParam != "" {
Sql += "and Param like ? "
value = append(value, fmt.Sprintf("%%\"change_type\":\"%s\"%%", m.EventParam))
TotalSql += "and Param like ? "
totalValue = append(totalValue, fmt.Sprintf("%%\"change_type\":\"%s\"%%", m.EventParam))
}
value = append(value, (m.CurrentPage-1)*m.PageSize, m.PageSize)
Sql += "ORDER BY Timestamp DESC LIMIT ?, ?"
err = Db.Select(&assets, Sql, value...)
if err != nil {
return nil, fmt.Errorf("failed to get asset list: %v", err)
}
var total int
err = Db.QueryRow(TotalSql, totalValue...).Scan(&total)
if err != nil {
return nil, fmt.Errorf("failed to get asset count: %v", err)
}
resData := []*ResAssetDetail{}
Sum := 0
NSum := 0
PSum := 0
for _, asset := range assets {
param := map[string]interface{}{}
err := json.Unmarshal([]byte(asset.Param), &param)
if err != nil {
continue
}
Sum += util.Int(param["change_num"])
if param["change_type"].(string) == "gain" {
PSum += util.Int(param["change_num"])
for _, asset := range resutl {
Sum += asset.ChangeNum
if asset.ChangeType == "gain" {
PSum += asset.ChangeNum
} else {
NSum += util.Int(param["change_num"])
NSum += asset.ChangeNum
}
resData = append(resData, &ResAssetDetail{
Uid: asset.Uid,
ChangeType: param["change_type"].(string),
ChangeNum: util.Int(param["change_num"]),
ChangeAfter: util.Int(param["change_after"]),
ItemId: util.Int(param["item_id"]),
Timestamp: asset.Timestamp,
})
}
return &ResAsset{
Total: total,
Data: resData,
Total: int(total),
Data: resutl,
Sum: Sum,
NSum: NSum,
PSum: PSum,
@ -152,56 +99,13 @@ func (m *Log) Asset() (*ResAsset, error) {
}
func (m *Log) Event() (*ResEvent, error) {
AppId, _ := util.ParseUid(m.Uid)
AppConfig, err := util.GetAppConfig(AppId)
event, total, err := util.SearchEventByUid(m.Uid, (m.CurrentPage-1)*m.PageSize, m.CurrentPage*m.PageSize, m.StartTime, m.EndTime)
if err != nil {
return nil, err
}
Db := util.MPool.GetTopicDB(AppConfig.Topic)
defer Db.Close()
if Db == nil {
return nil, fmt.Errorf("failed to get mysql database")
}
StartTime := m.StartTime
EndTime := m.EndTime
assets := []*Event{}
Sql := "SELECT * FROM log_event WHERE Uid = ? and `Event` != 'asset_change' and Timestamp >= ? and Timestamp <= ? "
Value := make([]interface{}, 0)
Value = append(Value, m.Uid, StartTime, EndTime)
if m.EventParam != "" {
Sql += "and `Event` = ? "
Value = append(Value, m.EventParam)
}
Sql += "ORDER BY Timestamp DESC LIMIT ?, ?"
Value = append(Value, (m.CurrentPage-1)*m.PageSize, m.PageSize)
err = Db.Select(&assets, Sql, Value...)
// if m.EventParam != "" {
// err = Db.Select(&assets, "SELECT * FROM log_event WHERE Uid = ? and `Event` = ? ORDER BY Timestamp DESC LIMIT ?, ?", m.Uid, m.EventParam, (m.CurrentPage-1)*m.PageSize, m.PageSize)
// } else {
// err = Db.Select(&assets, "SELECT * FROM log_event WHERE Uid = ? and `Event` != 'asset_change' ORDER BY Timestamp DESC LIMIT ?, ?", m.Uid, (m.CurrentPage-1)*m.PageSize, m.PageSize)
// }
if err != nil {
return nil, fmt.Errorf("failed to get event list: %v", err)
}
var total int
totalSql := "SELECT COUNT(*) FROM log_event WHERE Uid = ? and `Event` != 'asset_change' and Timestamp >= ? and Timestamp <= ? "
totalValue := make([]interface{}, 0)
totalValue = append(totalValue, m.Uid, StartTime, EndTime)
if m.EventParam != "" {
totalSql += "and `Event` = ? "
totalValue = append(totalValue, m.EventParam)
}
err = Db.QueryRow(totalSql, totalValue...).Scan(&total)
//err = Db.QueryRow("SELECT COUNT(*) FROM log_event WHERE Uid = ? and `Event` != 'asset_change'", m.Uid).Scan(&total)
if err != nil {
return nil, fmt.Errorf("failed to get event count: %v", err)
}
for _, asset := range assets {
asset.Label = asset.Event
}
return &ResEvent{
Total: total,
Data: assets,
Total: int(total),
Data: event,
}, nil
}

View File

@ -2,11 +2,10 @@ package monitor
import (
"backend/Type"
"backend/feishu"
"backend/alibaba"
"backend/model"
"backend/util"
"fmt"
"log"
"time"
)
@ -15,49 +14,42 @@ func UserAliveMonitor(AppId int) {
Db := util.MPool.GetTopicDB(AppConfig.Topic)
defer Db.Close()
for {
//now := time.Now()
//next := now.Truncate(time.Hour).Add(time.Hour)
// time.Sleep(time.Until(next))
for {
// time window: the hour that just finished
endT := time.Now().Truncate(time.Hour)
startT := endT.Add(-time.Hour)
yStartT := startT.Add(-24 * time.Hour)
yEndT := endT.Add(-24 * time.Hour)
start := startT.Unix()
end := endT.Unix()
yStart := yStartT.Unix()
yEnd := yEndT.Unix()
var curCount, yCount int64
if err := Db.QueryRow("SELECT COUNT(DISTINCT Uid) FROM log_event WHERE Timestamp>=? AND Timestamp<?", start, end).Scan(&curCount); err != nil {
log.Printf("monitor current count query error: %v", err)
time.Sleep(time.Hour)
continue
}
if err := Db.QueryRow("SELECT COUNT(DISTINCT Uid) FROM log_event WHERE Timestamp>=? AND Timestamp<?", yStart, yEnd).Scan(&yCount); err != nil {
log.Printf("monitor yesterday count query error: %v", err)
time.Sleep(time.Hour)
continue
}
if yCount > 0 {
drop := float64(yCount-curCount) / float64(yCount)
if drop >= 0.3 && (yCount-curCount) >= int64(10) {
feishu.SendNoticeMsg("meowment", "用户存活监控",
fmt.Sprintf("监控时间段: %s ~ %s\n昨日活跃用户数: %d\n当前活跃用户数: %d\n用户流失率: %.2f%%",
startT.Format("2006-01-02 15:04:05"),
endT.Format("2006-01-02 15:04:05"),
yCount,
curCount,
drop*100))
}
}
time.Sleep(time.Until(time.Now().Truncate(time.Hour).Add(time.Hour)))
curCount, yCount, err := util.CountDistinctUidLastHour()
if err != nil {
continue
}
if yCount > 0 {
drop := float64(yCount-curCount) / float64(yCount)
str := `
# **游戏数据监控异常**
- 项目名称: meowment
- 监控项名称: 用户存活监控<br/>
-------------------------
监控时间段: %s ~ %s<br/>
昨日活跃用户数: %d<br/>
当前活跃用户数: %d<br/>
用户流失率: **%.2f%%**<br/>
<a>@所有人</a>
`
if drop >= 0.3 && (yCount-curCount) >= int64(10) {
alibaba.SendAliveMsg("服务器报警", fmt.Sprintf(str,
time.Now().Format("2006-01-02 15:04:05"),
time.Now().Add(time.Hour).Format("2006-01-02 15:04:05"),
yCount,
curCount,
drop*100), "red")
// feishu.SendNoticeMsg("meowment", "用户存活监控",
// fmt.Sprintf("监控时间段: %s ~ %s\n昨日活跃用户数: %d\n当前活跃用户数: %d\n用户流失率: %.2f%%",
// time.Now().Format("2006-01-02 15:04:05"),
// time.Now().Add(time.Hour).Format("2006-01-02 15:04:05"),
// yCount,
// curCount,
// drop*100))
}
}
time.Sleep(time.Until(time.Now().Truncate(time.Hour).Add(time.Hour)))
}
}

File diff suppressed because it is too large Load Diff

View File

@ -5,6 +5,7 @@ import (
logincommon "backend/sdk/login/common"
"backend/sdk/login/model/test"
"backend/sdk/login/model/tuyou"
"backend/util"
"fmt"
"io"
"log"
@ -57,6 +58,8 @@ func init() {
gin.DefaultWriter = logWriter
gin.DefaultErrorWriter = errWriter
common.Init()
util.InitBBolt()
go util.MonitorServerList()
}
func main() {

View File

@ -9,6 +9,7 @@ import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
@ -62,6 +63,7 @@ func Login(c *gin.Context) {
}
func (t *TuyouModel) VerifyToken(AppId int, Uid string, Token string) error {
now := time.Now().UnixMilli()
client := &http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
@ -115,6 +117,6 @@ func (t *TuyouModel) VerifyToken(AppId int, Uid string, Token string) error {
if respObj.Result.Verify != "ok" {
return fmt.Errorf("verify failed: verify=%s body=%s", respObj.Result.Verify, string(body))
}
log.Printf("Tuyou verify success: cost time %dms\n", time.Now().UnixMilli()-now)
return nil
}

View File

@ -129,6 +129,7 @@ func Charge(c *gin.Context) {
c.String(200, "success")
}
// http://localhost:5240/api/tuyou/charge?apiver=2&appId=20659&appInfo=%7B%22appId%22%3A0%2C%22serverId%22%3A1%2C%22orderId%22%3A%22order_105372_20260125225337HhpqbU%22%2C%22uid%22%3A105372%7D&chargedDiamonds=20&chargedRmbs=1.99&clientId=Android_5.00_tyGuest%2Cfacebook.googleplay.0-hall20659.googleplay.Meowment&consumeCoin=20&consumeId=d50b32601260117387&orderId=-&platformOrder=e50b3260126045d972&prodCount=1&prodId=TY206590059&prodPrice=1.99&userId=3790944&code=fd7532d651bed4c3041aa3e628bef80b
func Shipping(AppInfo base.Param) {
Adminreq := &msg.ReqAdminShipping{
OrderSn: AppInfo.OrderId,
@ -141,6 +142,7 @@ func Shipping(AppInfo base.Param) {
num++
if num > 100 {
log.Print("charge shipping break infinite loop;AppId:", AppInfo.AppId, ";ServerId:", AppInfo.ServerId, ";OrderId:", AppInfo.OrderId)
break
}
ws, err := util.GetWebsocket(AppInfo.AppId, AppInfo.ServerId)

530
util/es.go Normal file
View File

@ -0,0 +1,530 @@
package util
import (
"backend/Type"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"time"
"github.com/elastic/go-elasticsearch/v8"
)
func GetEsClient() (*elasticsearch.Client, error) {
cfg := elasticsearch.Config{
Addresses: []string{
"http://kibana.bywaystudios.com:9200",
},
Username: "elastic",
Password: "bywaystudios",
}
client, err := elasticsearch.NewClient(cfg)
if err != nil {
return nil, fmt.Errorf("创建 ES 客户端失败: %w", err)
}
// 测试连接
res, err := client.Info()
if err != nil {
return nil, fmt.Errorf("ES 连接测试失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
return nil, fmt.Errorf("ES 返回错误: %s", res.String())
}
return client, nil
}
// DSlSearch 执行 DSL 查询
// query 参数为查询 DSL 的 map 结构
// sort 参数为排序条件,传 nil 表示不排序
func DSlSearch(ctx context.Context, index string, query map[string]interface{}, from, size int, sort []map[string]interface{}) (map[string]interface{}, error) {
client, err := GetEsClient()
if err != nil {
return nil, err
}
// 构建完整查询
fullQuery := map[string]interface{}{
"query": query,
"from": from,
"size": size,
}
// 添加排序(如果提供)
if len(sort) > 0 {
fullQuery["sort"] = sort
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(fullQuery); err != nil {
return nil, fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(ctx),
client.Search.WithIndex(index),
client.Search.WithBody(&buf),
client.Search.WithTrackTotalHits(true),
)
if err != nil {
return nil, fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解析响应失败: %w", err)
}
return result, nil
}
// SimpleSearch 简单搜索match 查询)
func SimpleSearch(ctx context.Context, index, field, value string, from, size int) (map[string]interface{}, error) {
query := map[string]interface{}{
"match": map[string]interface{}{
field: value,
},
}
return DSlSearch(ctx, index, query, from, size, nil)
}
// TermSearch 精确匹配查询
func TermSearch(ctx context.Context, index, field, value string, from, size int) (map[string]interface{}, error) {
query := map[string]interface{}{
"term": map[string]interface{}{
field: value,
},
}
return DSlSearch(ctx, index, query, from, size, nil)
}
func SearchAssetByUid(_uid int, from, size int, start, end int64) ([]*Type.AssetData, int64, error) {
defer func() {
if r := recover(); r != nil {
log.Printf("SearchAssetByUid panic: %v", r)
}
}()
uid := Int(_uid)
ctx := context.Background()
query := map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{
{
"term": map[string]interface{}{
"game.#distinct_id.keyword": uid,
},
},
{
"term": map[string]interface{}{
"game.#event_name": "asset_change",
},
},
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": start,
"lte": end,
},
},
},
},
},
}
// 添加降序排序
sort := []map[string]interface{}{
{
"game.#timestamp": map[string]interface{}{
"order": "desc",
},
},
}
result, err := DSlSearch(ctx, "game-user-log", query, from, size, sort)
if err != nil {
return nil, 0, err
}
// 获取总数
var total int64
hits := result["hits"].(map[string]interface{})
if totalObj, ok := hits["total"].(map[string]interface{}); ok {
if value, ok := totalObj["value"].(float64); ok {
total = int64(value)
}
}
var assets []*Type.AssetData
for _, hit := range hits["hits"].([]interface{}) {
source := hit.(map[string]interface{})["_source"].(map[string]interface{})
game := source["game"].(map[string]interface{})
properties := game["properties"].(map[string]interface{})
hitData := Type.AssetData{
ChangeNum: int(properties["change_num"].(float64)),
ChangeAfter: int(properties["change_after"].(float64)),
ChangeReason: properties["change_reason"].(string),
ChangeType: properties["change_type"].(string),
Timestamp: int64(game["#timestamp"].(float64)),
ItemId: int(properties["item_id"].(float64)),
}
assets = append(assets, &hitData)
}
return assets, total, nil
}
func SearchEventByUid(_uid int, from, size int, start, end int64) ([]*Type.EventData, int64, error) {
defer func() {
if r := recover(); r != nil {
log.Println("Recovered in SearchEventByUid", r)
}
}()
uid := Int(_uid)
ctx := context.Background()
query := map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{
{
"term": map[string]interface{}{
"game.#distinct_id.keyword": uid,
},
},
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": start,
"lte": end,
},
},
},
},
"must_not": []map[string]interface{}{
{
"term": map[string]interface{}{
"game.#event_name": "asset_change",
},
},
},
},
}
// 添加降序排序
sort := []map[string]interface{}{
{
"game.#timestamp": map[string]interface{}{
"order": "desc",
},
},
}
result, err := DSlSearch(ctx, "game-user-log", query, from, size, sort)
if err != nil {
return nil, 0, err
}
// 获取总数
var total int64
hits := result["hits"].(map[string]interface{})
if totalObj, ok := hits["total"].(map[string]interface{}); ok {
if value, ok := totalObj["value"].(float64); ok {
total = int64(value)
}
}
var assets []*Type.EventData
for _, hit := range hits["hits"].([]interface{}) {
source := hit.(map[string]interface{})["_source"].(map[string]interface{})
game := source["game"].(map[string]interface{})
//properites := game["properties"].(map[string]interface{})
properties_summary := game["properties_summary"].(string)
hitData := Type.EventData{
Event: game["#event_name"].(string),
Label: game["#event_name"].(string),
Param: properties_summary,
Timestamp: int(game["#timestamp"].(float64)),
}
assets = append(assets, &hitData)
}
return assets, total, nil
}
// CountDistinctUidLastHour 查询一个小时前的 game-user-log 索引中 game.#distinct_id 的去重个数
// 返回值:(当前小时的去重用户数, 一天前同一小时的去重用户数, error)
func CountDistinctUidLastHour() (int64, int64, error) {
defer func() {
if r := recover(); r != nil {
log.Println("Recovered in CountDistinctUidLastHour", r)
}
}()
ctx := context.Background()
client, err := GetEsClient()
if err != nil {
return 0, 0, err
}
now := time.Now().Unix()
// 当前小时:一个小时前至今
oneHourAgo := (now - 3600)
nowMs := now
// 一天前同一小时25小时前至24小时前
oneDayOneHourAgo := (now - 25*3600)
oneDayAgo := (now - 24*3600)
// 构建查询:使用过滤器聚合分别统计两个时间段
query := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{
{
"term": map[string]interface{}{
"fields.region.keyword": "us-newyork",
},
},
},
"should": []map[string]interface{}{
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
},
"minimum_should_match": 1,
},
},
"size": 0,
"from": 0,
"aggs": map[string]interface{}{
"current_hour": map[string]interface{}{
"filter": map[string]interface{}{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
"aggs": map[string]interface{}{
"unique_users": map[string]interface{}{
"cardinality": map[string]interface{}{
"field": "game.#distinct_id.keyword",
},
},
},
},
"yesterday_hour": map[string]interface{}{
"filter": map[string]interface{}{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
"aggs": map[string]interface{}{
"unique_users": map[string]interface{}{
"cardinality": map[string]interface{}{
"field": "game.#distinct_id.keyword",
},
},
},
},
},
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return 0, 0, fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(ctx),
client.Search.WithIndex("game-user-log"),
client.Search.WithBody(&buf),
)
if err != nil {
return 0, 0, fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return 0, 0, fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return 0, 0, fmt.Errorf("解析响应失败: %w", err)
}
// 从聚合结果中提取去重计数
var currentCount, yesterdayCount int64
if aggregations, ok := result["aggregations"].(map[string]interface{}); ok {
// 提取当前小时的数据
if currentHour, ok := aggregations["current_hour"].(map[string]interface{}); ok {
if uniqueUsers, ok := currentHour["unique_users"].(map[string]interface{}); ok {
if value, ok := uniqueUsers["value"].(float64); ok {
currentCount = int64(value)
}
}
}
// 提取昨天同一小时的数据
if yesterdayHour, ok := aggregations["yesterday_hour"].(map[string]interface{}); ok {
if uniqueUsers, ok := yesterdayHour["unique_users"].(map[string]interface{}); ok {
if value, ok := uniqueUsers["value"].(float64); ok {
yesterdayCount = int64(value)
}
}
}
}
return currentCount, yesterdayCount, nil
}
func CountDistinctUidLastHourTest() interface{} {
defer func() {
if r := recover(); r != nil {
log.Println("Recovered in CountDistinctUidLastHour", r)
}
}()
ctx := context.Background()
client, err := GetEsClient()
if err != nil {
return err
}
now := time.Now().Unix()
// 当前小时:一个小时前至今
oneHourAgo := (now - 3600)
nowMs := now
//一天前同一小时25小时前至24小时前
oneDayOneHourAgo := (now - 25*3600)
oneDayAgo := (now - 24*3600)
// 构建查询:使用过滤器聚合分别统计两个时间段
query := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{
{
"term": map[string]interface{}{
"fields.region.keyword": "us-newyork",
},
},
},
"should": []map[string]interface{}{
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
},
"minimum_should_match": 1,
},
},
"size": 0,
"from": 0,
"aggs": map[string]interface{}{
"current_hour": map[string]interface{}{
"filter": map[string]interface{}{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneHourAgo,
"lte": nowMs,
},
},
},
"aggs": map[string]interface{}{
"unique_users": map[string]interface{}{
"cardinality": map[string]interface{}{
"field": "game.#distinct_id.keyword",
},
},
},
},
"yesterday_hour": map[string]interface{}{
"filter": map[string]interface{}{
"range": map[string]interface{}{
"game.#timestamp": map[string]interface{}{
"gte": oneDayOneHourAgo,
"lte": oneDayAgo,
},
},
},
"aggs": map[string]interface{}{
"unique_users": map[string]interface{}{
"cardinality": map[string]interface{}{
"field": "game.#distinct_id.keyword",
},
},
},
},
},
}
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return fmt.Errorf("编码查询失败: %w", err)
}
res, err := client.Search(
client.Search.WithContext(ctx),
client.Search.WithIndex("game-user-log"),
client.Search.WithBody(&buf),
client.Search.WithTrackTotalHits(true),
)
if err != nil {
return fmt.Errorf("执行搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("ES 搜索错误 [%s]: %s", res.Status(), string(bodyBytes))
}
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return fmt.Errorf("解析响应失败: %w", err)
}
return result
}

View File

@ -2,8 +2,10 @@ package util
import (
"fmt"
"log"
"math/rand"
"sync"
"time"
"github.com/gin-gonic/gin"
)
@ -20,8 +22,11 @@ type ServerConfig struct {
WorkDir string `db:"work_dir"`
Ecs int `db:"ecs"`
Name string `db:"ServerName"`
AppId int `db:"AppId"`
}
var serverList map[int][]ServerConfig
func LoginResponse(c *gin.Context, AppId, AreaCode int, Version string) {
Uid := c.Query("uid")
Port, Host, ServerId := GetUserInfo(AppId, AreaCode, Uid, Version)
@ -29,10 +34,11 @@ func LoginResponse(c *gin.Context, AppId, AreaCode int, Version string) {
}
func GetUserInfo(AppId, AreaCode int, Uid, Version string) (int, string, int) {
now := time.Now().UnixMilli()
lockUid(Uid)
defer unlockUid(Uid)
Db := MPool.GetGameDB()
defer Db.Close()
// Db := MPool.GetGameDB()
// defer Db.Close()
var ServerId int
var Host string
var Port int
@ -41,17 +47,21 @@ func GetUserInfo(AppId, AreaCode int, Uid, Version string) (int, string, int) {
if len(ServerList) == 0 || Uid == "" || Version == "" {
return 0, "", 0
}
appConf, _ := GetAppConfig(AppId)
PlayerDb := MPool.GetMysqlDB(appConf, 1)
defer PlayerDb.Close()
var node int
err := PlayerDb.QueryRow("SELECT node FROM t_player_baseinfo WHERE user_name = ?", Uid).Scan(&node)
// appConf, _ := GetAppConfig(AppId)
// PlayerDb := MPool.GetMysqlDB(appConf, 1)
// defer PlayerDb.Close()
// var node int
// err := PlayerDb.QueryRow("SELECT node FROM t_player_baseinfo WHERE user_name = ?", Uid).Scan(&node)
// if err != nil {
// fmt.Printf("GetUserInfo query node error: %v\n", err)
// }
node, err := GetUserConnectNode(Uid)
if err != nil {
fmt.Printf("GetUserInfo query node error: %v\n", err)
node = 0
}
if node > 0 {
for _, server := range ServerList {
if server.ServerId == node && server.Status == 1 {
if server.ServerId == node && server.Status == 1 && Version == server.Version {
ServerId = server.ServerId
Host = server.Host
Port = server.Port
@ -59,18 +69,17 @@ func GetUserInfo(AppId, AreaCode int, Uid, Version string) (int, string, int) {
}
}
}
new_server_list := make([]ServerConfig, 0)
// 先尝试找版本号完全匹配且未满的服务器
for _, server := range ServerList {
if server.Status != 1 {
continue
}
if Version != "" && server.Version == Version {
new_server_list = append(new_server_list, server)
}
}
if ServerId == 0 {
new_server_list := make([]ServerConfig, 0)
// 先尝试找版本号完全匹配且未满的服务器
for _, server := range ServerList {
if server.Status != 1 {
continue
}
if Version != "" && server.Version == Version {
new_server_list = append(new_server_list, server)
}
}
// 过滤出未满员且状态正常的候选服务器
// 加权随机选择
if len(new_server_list) > 0 {
@ -114,8 +123,9 @@ func GetUserInfo(AppId, AreaCode int, Uid, Version string) (int, string, int) {
}
// 更新玩家所在节点
if err == nil {
PlayerDb.Exec("update t_player_baseinfo set node = ? where user_name =?", ServerId, Uid)
SaveUserConnectNode(Uid, ServerId)
}
log.Printf("GetUserInfo selected server %d for uid %s, cost time %dms\n", ServerId, Uid, time.Now().UnixMilli()-now)
return Port, Host, ServerId
}
@ -169,24 +179,24 @@ func unlockUid(uid string) {
ref.mu.Unlock()
}
func GetServerInfo(AppId, AreaCode int) []ServerConfig {
Db := MPool.GetGameDB()
defer Db.Close()
rows, err := Db.Query("SELECT ServerId, Status, Host, Port, MaxOnline, Online, version, weight FROM server WHERE AppId = ? AND Status = 1 AND node_type = 1 ORDER BY ServerId", AppId)
if err != nil {
return nil
}
defer rows.Close()
var servers []ServerConfig
for rows.Next() {
var server ServerConfig
if err := rows.Scan(&server.ServerId, &server.Status, &server.Host, &server.Port, &server.MaxOnline, &server.Online, &server.Version, &server.Weight); err != nil {
continue
}
servers = append(servers, server)
}
return servers
}
// func GetServerInfo(AppId, AreaCode int) []ServerConfig {
// Db := MPool.GetGameDB()
// defer Db.Close()
// rows, err := Db.Query("SELECT ServerId, Status, Host, Port, MaxOnline, Online, version, weight FROM server WHERE AppId = ? AND Status = 1 AND node_type = 1 ORDER BY ServerId", AppId)
// if err != nil {
// return nil
// }
// defer rows.Close()
// var servers []ServerConfig
// for rows.Next() {
// var server ServerConfig
// if err := rows.Scan(&server.ServerId, &server.Status, &server.Host, &server.Port, &server.MaxOnline, &server.Online, &server.Version, &server.Weight); err != nil {
// continue
// }
// servers = append(servers, server)
// }
// return servers
// }
func GetServer(AppId, ServerId int) ServerConfig {
Db := MPool.GetGameDB()
@ -226,3 +236,38 @@ func abs(x int) int {
}
return x
}
func GetServerInfo(AppId, AreaCode int) []ServerConfig {
if serverList != nil {
return serverList[AppId]
}
return []ServerConfig{}
}
func MonitorServerList() {
_initServerList()
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
_initServerList()
}
}
func _initServerList() {
Db := MPool.GetGameDB()
defer Db.Close()
rows, err := Db.Query("SELECT AppId, ServerId, Status, Host, Port, MaxOnline, Online, version, weight FROM server WHERE Status = 1 AND node_type = 1 ORDER BY ServerId")
if err != nil {
return
}
defer rows.Close()
tempServerList := make(map[int][]ServerConfig)
for rows.Next() {
var server ServerConfig
if err := rows.Scan(&server.AppId, &server.ServerId, &server.Status, &server.Host, &server.Port, &server.MaxOnline, &server.Online, &server.Version, &server.Weight); err != nil {
continue
}
tempServerList[server.AppId] = append(tempServerList[server.AppId], server)
}
serverList = tempServerList
}

View File

@ -116,11 +116,11 @@ func PackMsg(m proto.Message) []byte {
return append([]byte{0, 2}, buf...)
}
func UnpackMsg(buf []byte, n int) ([]byte, error) {
func UnpackMsg(buf []byte, n int) (string, error) {
res := &msg.AdminRes{}
err := proto.Unmarshal(buf[2:n], res)
if err != nil {
return nil, err
return "", err
}
return res.Info, nil
}
@ -230,7 +230,7 @@ func SendAdminMsg(ws *websocket.Conn, req proto.Message) (map[string]interface{}
return nil, fmt.Errorf("failed to unpack message: %v", err)
}
r := make(map[string]interface{})
err = json.Unmarshal(resBuf, &r)
err = json.Unmarshal([]byte(resBuf), &r)
if err != nil {
log.Printf("Failed to unmarshal response: %v, %s", err, resBuf)
return nil, fmt.Errorf("failed to unmarshal response: %v, %s", err, resBuf)

View File

@ -1,7 +1,6 @@
package util
import (
"backend/common"
"fmt"
"net"
"time"
@ -10,18 +9,10 @@ import (
)
func GetWebsocket(AppId, ServerId int) (*websocket.Conn, error) {
App, err := GetAppConfig(AppId)
if err != nil {
return nil, err
}
Server, err := common.GetServerConfig(App.NodeName)
if err != nil {
return nil, err
}
ServerConfig, _ := GetServerConfig(AppId, ServerId)
origin := "http://localhost/"
if AppId == 0 {
Server.Host = "google.bywaystudios.com"
ServerConfig.Host = "google.bywaystudios.com"
}
url := fmt.Sprintf("ws://%s:%d/", ServerConfig.Host, ServerConfig.WsPort)