版本更新

This commit is contained in:
hahwu 2025-01-15 11:49:26 +08:00
parent 12ea321ac3
commit c568d1bfcb
13 changed files with 305 additions and 17 deletions

View File

@ -4,6 +4,7 @@ import (
"encoding/json"
"kafka-comsumer/config"
"kafka-comsumer/util"
"log"
"strconv"
_ "github.com/go-sql-driver/mysql"
@ -29,6 +30,11 @@ func GetContainer(Key string) *sqlx.DB {
}
func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error {
defer func() {
if err := recover(); err != nil {
log.Println("panic recover! err: ", err)
}
}()
if SqlDb == nil {
return nil
}
@ -40,6 +46,8 @@ func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error {
err = login(SqlDb, m)
case "Login_Out":
err = login(SqlDb, m)
case "register":
err = login(SqlDb, m)
case "pay":
err = pay(SqlDb, m)
default:
@ -84,8 +92,9 @@ func pay(SqlDb *sqlx.DB, m kafka.Message) error {
if err != nil {
return err
}
sql := "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
_, err = SqlDb.Exec(sql, V["Uid"], Info["OrderId"], Info["Price"], Info["PayChannelOrderId"], Info["ProductId"], Info["CreateTime"], Info["PayTime"], Info["PayType"], string(Param), V["TimeStamp"])
Uid := util.Int(V["Uid"])
sql := "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
_, err = SqlDb.Exec(sql, Uid, Info["OrderId"], Info["Price"], Info["PayChannelOrderId"], Info["ProductId"], Info["CreateTime"], Info["PayTime"], Info["PayType"], string(Param), V["TimeStamp"], Info["AppId"], Info["ServerId"])
return err
}

View File

@ -4,11 +4,13 @@ import (
"context"
"kafka-comsumer/config"
"kafka-comsumer/db"
"kafka-comsumer/statistics"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/segmentio/kafka-go"
)
@ -37,6 +39,7 @@ func main() {
for _, game := range Games {
go comsumer(game)
}
go scheduleDailyTask()
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
sig := <-c
d <- 1
@ -44,6 +47,18 @@ func main() {
Wait.Wait()
}
func scheduleDailyTask() {
for {
now := time.Now()
next := now.AddDate(0, 0, 1).Truncate(24 * time.Hour)
duration := next.Sub(now)
time.Sleep(duration)
// 执行统计函数
statistics.Statistics()
}
}
func comsumer(Game *config.Game) {
sqlDb := db.GetContainer(Game.Name)
if sqlDb == nil {

Binary file not shown.

View File

@ -0,0 +1,7 @@
package statistics
import "log"
func Statistics() {
log.Print("statistics")
}

View File

@ -7,6 +7,7 @@ import (
"encoding/base64"
"fmt"
"io"
"strconv"
"time"
)
@ -62,3 +63,26 @@ func GetNowTime() int64 {
T := time.Now()
return T.Unix()
}
func Int(a interface{}) int {
if a == nil {
return 0
}
switch v := a.(type) {
case int:
return v
case int32:
return int(v)
case int64:
return int(v)
case float64:
return int(v)
case string:
r, err := strconv.Atoi(v)
if err != nil {
return 0
}
return r
}
return 0
}

View File

@ -1,6 +1,6 @@
---
- name: Deploy GoLeaf release
hosts: zone1
hosts: google
remote_user: root
tasks:
@ -11,18 +11,18 @@
# dest: /usr/local
# - name: tar
# command: tar -xvf /usr/local/Goleaf.tar -C /usr/local/
- name: sh pack.sh
shell: sh /data/devops/pack.sh
delegate_to: localhost
# - name: sh pack.sh
# shell: sh /data/devops/pack.sh
# delegate_to: localhost
# - name: copy goleaf
# copy:
# src: /data/devops/source/main
# dest: /usr/local/Goleaf
- name: copy goleaf
copy:
src: /data/devops/source/main
dest: /usr/local/Goleaf
- name: copy goleaf
copy:
src: /data/devops/tool/tool
dest: /usr/local/Goleaf/tool/
src: /data/devops/MergePet/tool/tool
dest: /usr/local/game/tool/
# - name: copy config
# copy:

View File

@ -4,6 +4,8 @@
remote_user: root
tasks:
- name: pack
shell: cd /data/devops/kafka &&GOOS=linux GOARCH=amd64 /usr/local/go/bin/go build -o /data/devops/kafka/release/kafka_comsumer main.go
- name: copy main
copy:

BIN
script/MergeData.xlsx Normal file

Binary file not shown.

133
script/dynamicLv.py Normal file
View File

@ -0,0 +1,133 @@
import pandas as pd
import math
subEmitColor = ["Wood", "Clothes storage bag"]
file_path = './script/MergeData.xlsx'
df = pd.read_excel(file_path, engine='openpyxl')
df = df.drop(index=0)
df_emitter = df[(df['Type'] == 'Emitter') & (df['Emit_Product'].notnull())]
df_emitter = df_emitter.assign(Dynamic=df_emitter['Emit_Product'])
# for index, row in df_emitter.iterrows():
# print(f"Index: {index}")
# print(row['Emit_List'])
# a1 = row['Emit_List'].split(',')
# Product = row['Emit_Product'].split(',')
# main = row['Emit_Product'].split(',')[0]
# if len(a1) == 1:
# a2 = a1[0].split('=')
# color_values = df[df["Id"] == int(a2[0])]["Color"].values[0]
# if color_values in subEmitColor:
# print(f"Main: {main} - Sub: {color_values}")
# else:
# df_emitter.loc[index, 'Dynamic'] = f"{color_values}=0"
# continue
# d = {}
# for i in Product:
# d[i] = 0.0
# for i in a1 :
# a2 = i.split('=')
# color = df[df["Id"] == int(a2[0])]["Color"].values[0]
# d[color] += float(a2[1])*(int(a2[0])%10)
# c = {}
# mainNum = d[main]
# for k, i in d.items():
# if k == main:
# c[k] = 0
# else:
# x = round(mainNum/i)
# c[k] = round(math.log(x, 2))
# print(df_emitter.head())
def getDynamicValue1(df, index):
row = df.loc[index]
d = {}
Emit_List = row['Emit_List'].split(',')
Product = row['Emit_Product'].split(',')
main = row['Emit_Product'].split(',')[0]
if len(Emit_List) == 1:
a2 = Emit_List[0].split('=')
color_values = df[df["Id"] == int(a2[0])]["Color"].values[0]
d[color_values] = 0
return d
else:
for i in Product:
d[i] = 0.0
for i in Emit_List :
a2 = i.split('=')
color = df[df["Id"] == int(a2[0])]["Color"].values[0]
d[color] += float(a2[1])*(int(a2[0])%10)
c = {}
mainNum = d[main]
for k, i in d.items():
if k == main:
c[k] = 0
else:
x = mainNum/i
c[k] = math.floor(math.log(x, 2))
return c
def getDynamicValueG(df, index):
row = df.loc[index]
d = {}
Emit_List = row['Emit_List'].split(',')
Product = row['Emit_Product'].split(',')
main = row['Emit_Product'].split(',')[0]
if len(Emit_List) == 1:
a2 = Emit_List[0].split('=')
color_values = df[df["Id"] == int(a2[0])]["Color"].values[0]
d[color_values] = 0
return d
else:
for i in Product:
d[i] = 0.0
for i in Emit_List :
a2 = i.split('=')
color = df[df["Id"] == int(a2[0])]["Color"].values[0]
if color == "Clothes storage bag":
d["Dress"] += float(a2[1])*(int(a2[0])%10)
else:
d[color] += float(a2[1])*(int(a2[0])%10)
c = {}
mainNum = d[main]
for k, i in d.items():
if k == main:
c[k] = 0
else:
x = mainNum/i
c[k] = math.floor(math.log(x, 2))
return c
def getDynamicValue3(df, id):
return df[df['Id'] == id]['Dynamic'].values[0]
for index, row in df_emitter.iterrows():
if row['Emit_ID'] == 'I':
d = getDynamicValueG(df, index)
else:
continue
# if row['Emit_ID'] == 'B':
# d = getDynamicValue1(df, index)
# if row['Emit_ID'] == 'C':
# d = getDynamicValue1(df, index)
# if row['Emit_ID'] == 'D':
# d = getDynamicValue1(df, index)
# if row['Emit_ID'] == 'E':
# d = getDynamicValue1(df, index)
# if row['Emit_ID'] == 'F':
# d = getDynamicValue1(df, index)
# if row['Emit_ID'] == 'G':
# d = getDynamicValue1(df, index)
# if row['Emit_ID'] == 'H':
# d = getDynamicValue1(df, index)
# if row['Emit_ID'] == 'I':
# d = getDynamicValue1(df, index)
str = ""
for i,j in d.items():
str += f"{i}={j},"
str = str[:-1]
print(str)

64
script/sync_order.py Normal file
View File

@ -0,0 +1,64 @@
import pymysql
from sshtunnel import SSHTunnelForwarder
from pymysql.converters import escape_string
from ruamel.yaml import YAML
import math
# ===================config===================
server = SSHTunnelForwarder(
ssh_address_or_host=("47.254.83.25", 22),
ssh_username="root",
ssh_password="ByWayStudios01!",
remote_bind_address=("127.0.0.1",3306),
local_bind_address=('127.0.0.1',5143)
)
server.start()
db_host = server.local_bind_host
db_port = server.local_bind_port
conn = pymysql.connect(
host=db_host,
port=db_port,
user="root",
password="Xijing1!",
database="Merge_Pet_1"
)
cursor = conn.cursor()
# 查询所有记录
cursor.execute("select * from t_player_charge where PayStatus >= 3")
results = cursor.fetchall()
server2 = SSHTunnelForwarder(
ssh_address_or_host=("8.155.14.94", 22),
ssh_username="root",
ssh_password="-xLX]p!PQ1@SHm`A",
remote_bind_address=("rm-f8zd2030feam53n43.mysql.rds.aliyuncs.com",3306),
local_bind_address=('127.0.0.1',5144)
)
server2.start()
db_host2 = server2.local_bind_host
db_port2 = server2.local_bind_port
conn2 = pymysql.connect(
host=db_host2,
port=db_port2,
user="root",
password="Z4rf7eZZe500dxa",
database="merge_pet_online"
)
cursor2 = conn2.cursor()
for row in results:
Uid = int(row[1]) # 将字符串转换为整数
cursor2.execute("INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, '', 0, 0, 1)",
(Uid, row[2], row[6], row[14], row[3], row[8], row[9], row[10]))
print(row)
# 提交更改
conn2.commit()
cursor.close()
cursor2.close()
conn2.close()
conn.close()

View File

@ -68,14 +68,14 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"日期时间: 2024-12-26 20:39:44\n"
"日期时间: 2025-01-11 17:38:45\n"
]
}
],
@ -83,7 +83,7 @@
"from datetime import datetime\n",
"\n",
"# 示例时间戳\n",
"timestamp = 1735216784\n",
"timestamp = 1736588325\n",
"\n",
"# 将时间戳转换为日期时间\n",
"dt_object = datetime.fromtimestamp(timestamp)\n",

2
source/readme.md Normal file
View File

@ -0,0 +1,2 @@
docker run --name redis --volume=/data --network=bridge --workdir=/data -p 6379:6379 --restart=always -d redis
docker run --name mysql --volume=/data --network=bridge -p 3306:3306 -e MYSQL_ROOT_PASSWORD=Z4rf7eZZe500dxa --restart=always -d mysql:8

View File

@ -1,6 +1,9 @@
package main
import (
"crypto/aes"
"crypto/cipher"
"encoding/base64"
"encoding/json"
"fmt"
"log"
@ -16,7 +19,11 @@ import (
"gopkg.in/ini.v1"
)
// GOOS=linux GOARCH=amd64 go build -o /data/devops/Goleaf/tool/tool main.go
const (
SECRET_KEY = ")VQbB(vpy=U(wcp)"
)
// GOOS=linux GOARCH=amd64 go build -o /data/devops/MergePet/tool/tool main.go
var help = `
Usage: app.ini [options]
@ -154,7 +161,7 @@ func install() error {
// 创建数据库
dbUser := cfg.Section("mysql").Key("mysql_user").String()
dbPassword := cfg.Section("mysql").Key("mysql_password").String()
dbPassword, _ := Decrypt(cfg.Section("mysql").Key("mysql_password").String())
dbHost := cfg.Section("mysql").Key("mysql_host").String()
dbPort := cfg.Section("mysql").Key("mysql_port").String()
// log.Println("mysql", "-u"+dbUser, "-p"+dbPassword, "-h"+dbHost, "-P"+dbPort, "-e", fmt.Sprintf("CREATE DATABASE IF NOT EXISTS %s", dbName))
@ -483,3 +490,28 @@ func restart() error {
}
return nil
}
// 解密字符串
func Decrypt(cipherText string) (string, error) {
cipherTextBytes, err := base64.URLEncoding.DecodeString(cipherText)
if err != nil {
return "", err
}
block, err := aes.NewCipher([]byte(SECRET_KEY))
if err != nil {
return "", err
}
if len(cipherTextBytes) < aes.BlockSize {
return "", fmt.Errorf("cipherText too short")
}
iv := cipherTextBytes[:aes.BlockSize]
cipherTextBytes = cipherTextBytes[aes.BlockSize:]
stream := cipher.NewCFBDecrypter(block, iv)
stream.XORKeyStream(cipherTextBytes, cipherTextBytes)
return string(cipherTextBytes), nil
}