diff --git a/kafka/db/db.go b/kafka/db/db.go index f4f6c24..9528fc1 100644 --- a/kafka/db/db.go +++ b/kafka/db/db.go @@ -4,6 +4,7 @@ import ( "encoding/json" "kafka-comsumer/config" "kafka-comsumer/util" + "log" "strconv" _ "github.com/go-sql-driver/mysql" @@ -29,6 +30,11 @@ func GetContainer(Key string) *sqlx.DB { } func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error { + defer func() { + if err := recover(); err != nil { + log.Println("panic recover! err: ", err) + } + }() if SqlDb == nil { return nil } @@ -40,6 +46,8 @@ func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error { err = login(SqlDb, m) case "Login_Out": err = login(SqlDb, m) + case "register": + err = login(SqlDb, m) case "pay": err = pay(SqlDb, m) default: @@ -84,8 +92,9 @@ func pay(SqlDb *sqlx.DB, m kafka.Message) error { if err != nil { return err } - sql := "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" - _, err = SqlDb.Exec(sql, V["Uid"], Info["OrderId"], Info["Price"], Info["PayChannelOrderId"], Info["ProductId"], Info["CreateTime"], Info["PayTime"], Info["PayType"], string(Param), V["TimeStamp"]) + Uid := util.Int(V["Uid"]) + sql := "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + _, err = SqlDb.Exec(sql, Uid, Info["OrderId"], Info["Price"], Info["PayChannelOrderId"], Info["ProductId"], Info["CreateTime"], Info["PayTime"], Info["PayType"], string(Param), V["TimeStamp"], Info["AppId"], Info["ServerId"]) return err } diff --git a/kafka/main.go b/kafka/main.go index 859c46b..5da7c20 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -4,11 +4,13 @@ import ( "context" "kafka-comsumer/config" "kafka-comsumer/db" + "kafka-comsumer/statistics" "log" "os" "os/signal" "sync" "syscall" + "time" "github.com/segmentio/kafka-go" ) @@ -37,6 +39,7 @@ func main() { for _, game := range Games { go comsumer(game) } + go scheduleDailyTask() signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) sig := <-c d <- 1 @@ -44,6 +47,18 @@ func main() { Wait.Wait() } +func scheduleDailyTask() { + for { + now := time.Now() + next := now.AddDate(0, 0, 1).Truncate(24 * time.Hour) + duration := next.Sub(now) + time.Sleep(duration) + + // 执行统计函数 + statistics.Statistics() + } +} + func comsumer(Game *config.Game) { sqlDb := db.GetContainer(Game.Name) if sqlDb == nil { diff --git a/kafka/release/kafka_comsumer b/kafka/release/kafka_comsumer index fad710b..fab4717 100644 Binary files a/kafka/release/kafka_comsumer and b/kafka/release/kafka_comsumer differ diff --git a/kafka/statistics/statistics.go b/kafka/statistics/statistics.go new file mode 100644 index 0000000..9e10c05 --- /dev/null +++ b/kafka/statistics/statistics.go @@ -0,0 +1,7 @@ +package statistics + +import "log" + +func Statistics() { + log.Print("statistics") +} diff --git a/kafka/util/util.go b/kafka/util/util.go index 9274e9f..a9c0396 100644 --- a/kafka/util/util.go +++ b/kafka/util/util.go @@ -7,6 +7,7 @@ import ( "encoding/base64" "fmt" "io" + "strconv" "time" ) @@ -62,3 +63,26 @@ func GetNowTime() int64 { T := time.Now() return T.Unix() } + +func Int(a interface{}) int { + if a == nil { + return 0 + } + switch v := a.(type) { + case int: + return v + case int32: + return int(v) + case int64: + return int(v) + case float64: + return int(v) + case string: + r, err := strconv.Atoi(v) + if err != nil { + return 0 + } + return r + } + return 0 +} diff --git a/playbook/goleaf-com.yml b/playbook/goleaf-com.yml index ff3f5fe..e9b5867 100644 --- a/playbook/goleaf-com.yml +++ b/playbook/goleaf-com.yml @@ -1,6 +1,6 @@ --- - name: Deploy GoLeaf release - hosts: zone1 + hosts: google remote_user: root tasks: @@ -11,18 +11,18 @@ # dest: /usr/local # - name: tar # command: tar -xvf /usr/local/Goleaf.tar -C /usr/local/ - - name: sh pack.sh - shell: sh /data/devops/pack.sh - delegate_to: localhost + # - name: sh pack.sh + # shell: sh /data/devops/pack.sh + # delegate_to: localhost + # - name: copy goleaf + # copy: + # src: /data/devops/source/main + # dest: /usr/local/Goleaf - name: copy goleaf copy: - src: /data/devops/source/main - dest: /usr/local/Goleaf - - name: copy goleaf - copy: - src: /data/devops/tool/tool - dest: /usr/local/Goleaf/tool/ + src: /data/devops/MergePet/tool/tool + dest: /usr/local/game/tool/ # - name: copy config # copy: diff --git a/playbook/kafka.yml b/playbook/kafka.yml index d46339a..9426b60 100644 --- a/playbook/kafka.yml +++ b/playbook/kafka.yml @@ -4,6 +4,8 @@ remote_user: root tasks: + - name: pack + shell: cd /data/devops/kafka &&GOOS=linux GOARCH=amd64 /usr/local/go/bin/go build -o /data/devops/kafka/release/kafka_comsumer main.go - name: copy main copy: diff --git a/script/MergeData.xlsx b/script/MergeData.xlsx new file mode 100644 index 0000000..ce52c73 Binary files /dev/null and b/script/MergeData.xlsx differ diff --git a/script/dynamicLv.py b/script/dynamicLv.py new file mode 100644 index 0000000..9a58771 --- /dev/null +++ b/script/dynamicLv.py @@ -0,0 +1,133 @@ +import pandas as pd +import math + + +subEmitColor = ["Wood", "Clothes storage bag"] +file_path = './script/MergeData.xlsx' +df = pd.read_excel(file_path, engine='openpyxl') +df = df.drop(index=0) +df_emitter = df[(df['Type'] == 'Emitter') & (df['Emit_Product'].notnull())] +df_emitter = df_emitter.assign(Dynamic=df_emitter['Emit_Product']) + + + +# for index, row in df_emitter.iterrows(): +# print(f"Index: {index}") +# print(row['Emit_List']) +# a1 = row['Emit_List'].split(',') +# Product = row['Emit_Product'].split(',') +# main = row['Emit_Product'].split(',')[0] +# if len(a1) == 1: +# a2 = a1[0].split('=') +# color_values = df[df["Id"] == int(a2[0])]["Color"].values[0] +# if color_values in subEmitColor: +# print(f"Main: {main} - Sub: {color_values}") +# else: +# df_emitter.loc[index, 'Dynamic'] = f"{color_values}=0" +# continue +# d = {} +# for i in Product: +# d[i] = 0.0 +# for i in a1 : +# a2 = i.split('=') +# color = df[df["Id"] == int(a2[0])]["Color"].values[0] +# d[color] += float(a2[1])*(int(a2[0])%10) +# c = {} +# mainNum = d[main] +# for k, i in d.items(): +# if k == main: +# c[k] = 0 +# else: +# x = round(mainNum/i) +# c[k] = round(math.log(x, 2)) + +# print(df_emitter.head()) + +def getDynamicValue1(df, index): + row = df.loc[index] + d = {} + Emit_List = row['Emit_List'].split(',') + Product = row['Emit_Product'].split(',') + main = row['Emit_Product'].split(',')[0] + if len(Emit_List) == 1: + a2 = Emit_List[0].split('=') + color_values = df[df["Id"] == int(a2[0])]["Color"].values[0] + d[color_values] = 0 + return d + else: + for i in Product: + d[i] = 0.0 + for i in Emit_List : + a2 = i.split('=') + color = df[df["Id"] == int(a2[0])]["Color"].values[0] + d[color] += float(a2[1])*(int(a2[0])%10) + c = {} + mainNum = d[main] + for k, i in d.items(): + if k == main: + c[k] = 0 + else: + x = mainNum/i + c[k] = math.floor(math.log(x, 2)) + return c + +def getDynamicValueG(df, index): + row = df.loc[index] + d = {} + Emit_List = row['Emit_List'].split(',') + Product = row['Emit_Product'].split(',') + main = row['Emit_Product'].split(',')[0] + if len(Emit_List) == 1: + a2 = Emit_List[0].split('=') + color_values = df[df["Id"] == int(a2[0])]["Color"].values[0] + d[color_values] = 0 + return d + else: + for i in Product: + d[i] = 0.0 + for i in Emit_List : + a2 = i.split('=') + color = df[df["Id"] == int(a2[0])]["Color"].values[0] + if color == "Clothes storage bag": + d["Dress"] += float(a2[1])*(int(a2[0])%10) + else: + d[color] += float(a2[1])*(int(a2[0])%10) + c = {} + mainNum = d[main] + for k, i in d.items(): + if k == main: + c[k] = 0 + else: + x = mainNum/i + c[k] = math.floor(math.log(x, 2)) + return c +def getDynamicValue3(df, id): + return df[df['Id'] == id]['Dynamic'].values[0] + + +for index, row in df_emitter.iterrows(): + if row['Emit_ID'] == 'I': + d = getDynamicValueG(df, index) + else: + continue + # if row['Emit_ID'] == 'B': + # d = getDynamicValue1(df, index) + # if row['Emit_ID'] == 'C': + # d = getDynamicValue1(df, index) + # if row['Emit_ID'] == 'D': + # d = getDynamicValue1(df, index) + # if row['Emit_ID'] == 'E': + # d = getDynamicValue1(df, index) + # if row['Emit_ID'] == 'F': + # d = getDynamicValue1(df, index) + # if row['Emit_ID'] == 'G': + # d = getDynamicValue1(df, index) + # if row['Emit_ID'] == 'H': + # d = getDynamicValue1(df, index) + # if row['Emit_ID'] == 'I': + # d = getDynamicValue1(df, index) + str = "" + for i,j in d.items(): + str += f"{i}={j}," + str = str[:-1] + print(str) \ No newline at end of file diff --git a/script/sync_order.py b/script/sync_order.py new file mode 100644 index 0000000..ab6160b --- /dev/null +++ b/script/sync_order.py @@ -0,0 +1,64 @@ +import pymysql +from sshtunnel import SSHTunnelForwarder +from pymysql.converters import escape_string +from ruamel.yaml import YAML +import math + +# ===================config=================== + +server = SSHTunnelForwarder( + ssh_address_or_host=("47.254.83.25", 22), + ssh_username="root", + ssh_password="ByWayStudios01!", + remote_bind_address=("127.0.0.1",3306), + local_bind_address=('127.0.0.1',5143) +) +server.start() +db_host = server.local_bind_host +db_port = server.local_bind_port + +conn = pymysql.connect( + host=db_host, + port=db_port, + user="root", + password="Xijing1!", + database="Merge_Pet_1" +) +cursor = conn.cursor() + +# 查询所有记录 +cursor.execute("select * from t_player_charge where PayStatus >= 3") +results = cursor.fetchall() + + +server2 = SSHTunnelForwarder( + ssh_address_or_host=("8.155.14.94", 22), + ssh_username="root", + ssh_password="-xLX]p!PQ1@SHm`A", + remote_bind_address=("rm-f8zd2030feam53n43.mysql.rds.aliyuncs.com",3306), + local_bind_address=('127.0.0.1',5144) +) +server2.start() +db_host2 = server2.local_bind_host +db_port2 = server2.local_bind_port + +conn2 = pymysql.connect( + host=db_host2, + port=db_port2, + user="root", + password="Z4rf7eZZe500dxa", + database="merge_pet_online" +) +cursor2 = conn2.cursor() + +for row in results: + Uid = int(row[1]) # 将字符串转换为整数 + cursor2.execute("INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, '', 0, 0, 1)", + (Uid, row[2], row[6], row[14], row[3], row[8], row[9], row[10])) + print(row) +# 提交更改 +conn2.commit() +cursor.close() +cursor2.close() +conn2.close() +conn.close() diff --git a/script/tools.ipynb b/script/tools.ipynb index a8c63d6..18be04c 100644 --- a/script/tools.ipynb +++ b/script/tools.ipynb @@ -68,14 +68,14 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "日期时间: 2024-12-26 20:39:44\n" + "日期时间: 2025-01-11 17:38:45\n" ] } ], @@ -83,7 +83,7 @@ "from datetime import datetime\n", "\n", "# 示例时间戳\n", - "timestamp = 1735216784\n", + "timestamp = 1736588325\n", "\n", "# 将时间戳转换为日期时间\n", "dt_object = datetime.fromtimestamp(timestamp)\n", diff --git a/source/readme.md b/source/readme.md new file mode 100644 index 0000000..db5ffcd --- /dev/null +++ b/source/readme.md @@ -0,0 +1,2 @@ +docker run --name redis --volume=/data --network=bridge --workdir=/data -p 6379:6379 --restart=always -d redis +docker run --name mysql --volume=/data --network=bridge -p 3306:3306 -e MYSQL_ROOT_PASSWORD=Z4rf7eZZe500dxa --restart=always -d mysql:8 \ No newline at end of file diff --git a/tool/main.go b/tool/main.go index c68833e..61a072d 100644 --- a/tool/main.go +++ b/tool/main.go @@ -1,6 +1,9 @@ package main import ( + "crypto/aes" + "crypto/cipher" + "encoding/base64" "encoding/json" "fmt" "log" @@ -16,7 +19,11 @@ import ( "gopkg.in/ini.v1" ) -// GOOS=linux GOARCH=amd64 go build -o /data/devops/Goleaf/tool/tool main.go +const ( + SECRET_KEY = ")VQbB(vpy=U(wcp)" +) + +// GOOS=linux GOARCH=amd64 go build -o /data/devops/MergePet/tool/tool main.go var help = ` Usage: app.ini [options] @@ -154,7 +161,7 @@ func install() error { // 创建数据库 dbUser := cfg.Section("mysql").Key("mysql_user").String() - dbPassword := cfg.Section("mysql").Key("mysql_password").String() + dbPassword, _ := Decrypt(cfg.Section("mysql").Key("mysql_password").String()) dbHost := cfg.Section("mysql").Key("mysql_host").String() dbPort := cfg.Section("mysql").Key("mysql_port").String() // log.Println("mysql", "-u"+dbUser, "-p"+dbPassword, "-h"+dbHost, "-P"+dbPort, "-e", fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", dbName)) @@ -483,3 +490,28 @@ func restart() error { } return nil } + +// 解密字符串 +func Decrypt(cipherText string) (string, error) { + cipherTextBytes, err := base64.URLEncoding.DecodeString(cipherText) + if err != nil { + return "", err + } + + block, err := aes.NewCipher([]byte(SECRET_KEY)) + if err != nil { + return "", err + } + + if len(cipherTextBytes) < aes.BlockSize { + return "", fmt.Errorf("cipherText too short") + } + + iv := cipherTextBytes[:aes.BlockSize] + cipherTextBytes = cipherTextBytes[aes.BlockSize:] + + stream := cipher.NewCFBDecrypter(block, iv) + stream.XORKeyStream(cipherTextBytes, cipherTextBytes) + + return string(cipherTextBytes), nil +}