kafka优化

This commit is contained in:
hahwu 2025-01-22 16:37:49 +08:00
parent a65662b22c
commit e6fa631b27
10 changed files with 245 additions and 37 deletions

View File

@ -4,15 +4,19 @@ db:
password: "o0WEwc46C5mo1qhemU9cFTTIaMMHsoarsIdVhi6vyA=="
port: 3306
games:
- name: "Merge_Pet"
topic: "Merge_Pet"
- name: "merge_pet_sdk"
topic: "merge_pet_sdk"
partition: 0
db_name: "Merge_Pet_Local"
- name: "Merge_Pet_Test"
topic: "Merge_Pet_Test"
db_name: "merge_pet"
- name: "pet_home"
topic: "pet_home"
partition: 0
db_name: "Merge_Pet_Test"
db_name: "pet_home"
- name: "Merge_Pet_online"
topic: "Merge_Pet_online"
partition: 0
db_name: "Merge_Pet_online"
db_name: "Merge_Pet_online"
- name: "merge_pet_london"
topic: "merge_pet_london"
partition: 0
db_name: "merge_pet_london"

View File

@ -53,6 +53,7 @@ func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error {
default:
err = event(SqlDb, m)
}
if err != nil {
tx.Rollback()
return err
@ -66,9 +67,8 @@ func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error {
func login(SqlDb *sqlx.DB, m kafka.Message) error {
var V map[string]interface{}
json.Unmarshal(m.Value, &V)
sql := "INSERT INTO log_login (`Uid`, `Event`, `Timestamp`) VALUES (?, ?, ?)"
_, err := SqlDb.Exec(sql, V["Uid"], m.Key, V["TimeStamp"])
sql := "INSERT INTO log_login (`Uid`, `Event`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?)"
_, err := SqlDb.Exec(sql, V["Uid"], m.Key, V["TimeStamp"], V["AppId"], V["ServerId"])
return err
}
@ -79,8 +79,8 @@ func event(SqlDb *sqlx.DB, m kafka.Message) error {
if err != nil {
return err
}
sql := "INSERT INTO log_event (`Uid`, `Event`, `Param`, `Timestamp`) VALUES (?, ?, ?, ?)"
_, err = SqlDb.Exec(sql, V["Uid"], m.Key, string(Param), V["TimeStamp"])
sql := "INSERT INTO log_event (`Uid`, `Event`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?)"
_, err = SqlDb.Exec(sql, V["Uid"], m.Key, string(Param), V["TimeStamp"], V["AppId"], V["ServerId"])
return err
}
@ -93,7 +93,7 @@ func pay(SqlDb *sqlx.DB, m kafka.Message) error {
return err
}
Uid := util.Int(V["Uid"])
sql := "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
sql := "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
_, err = SqlDb.Exec(sql, Uid, Info["OrderId"], Info["Price"], Info["PayChannelOrderId"], Info["ProductId"], Info["CreateTime"], Info["PayTime"], Info["PayType"], string(Param), V["TimeStamp"], Info["AppId"], Info["ServerId"])
return err
}

View File

@ -6,6 +6,7 @@ require (
github.com/go-sql-driver/mysql v1.8.1
github.com/jmoiron/sqlx v1.4.0
github.com/segmentio/kafka-go v0.4.47
golang.org/x/crypto v0.32.0
gopkg.in/yaml.v2 v2.4.0
)
@ -15,5 +16,5 @@ require (
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/stretchr/testify v1.9.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
golang.org/x/sys v0.29.0 // indirect
)

View File

@ -37,6 +37,8 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -58,11 +60,15 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg=
golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=

View File

@ -7,9 +7,7 @@ import (
"kafka-comsumer/statistics"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/segmentio/kafka-go"
@ -26,25 +24,26 @@ var d = make(chan int, 1)
func main() {
// 打开日志文件
logFile, err := os.OpenFile("./log/kafka-comsumer.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v", err)
}
defer logFile.Close()
// logFile, err := os.OpenFile("./log/kafka-comsumer.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
// if err != nil {
// log.Fatalf("Failed to open log file: %v", err)
// }
// defer logFile.Close()
// 将日志输出重定向到文件
log.SetOutput(logFile)
log.SetFlags(log.LstdFlags | log.Lshortfile)
Games := config.GetGames()
for _, game := range Games {
go comsumer(game)
}
go scheduleDailyTask()
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
sig := <-c
d <- 1
log.Printf("kafka comsumer closing down (signal: %v)\n", sig.String())
Wait.Wait()
// // 将日志输出重定向到文件
// log.SetOutput(logFile)
// log.SetFlags(log.LstdFlags | log.Lshortfile)
// Games := config.GetGames()
// for _, game := range Games {
// go comsumer(game)
// }
// go scheduleDailyTask()
// signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
// sig := <-c
// d <- 1
// log.Printf("kafka comsumer closing down (signal: %v)\n", sig.String())
// Wait.Wait()
statistics.Statistics()
}
func scheduleDailyTask() {
@ -100,7 +99,7 @@ func comsumer(Game *config.Game) {
Wait.Add(1)
err = db.ProcessMsg(sqlDb, m)
if err != nil {
log.Printf("comsumer %s error :%v", Game.Name, err)
log.Printf("comsumer m.key : %s %s error :%v", m.Key, Game.Name, err)
}
Wait.Done()
}

Binary file not shown.

2
kafka/release/pack.sh Normal file
View File

@ -0,0 +1,2 @@
cd /data/devops/kafka
GOOS=linux GOARCH=amd64 /usr/local/go/bin/go build -o /data/devops/kafka/release/kafka_comsumer main.go

60
kafka/sql/kafkalog.sql Normal file
View File

@ -0,0 +1,60 @@
/*==============================================================*/
/* Database name: sg_gamedb */
/* DBMS name: MySQL 5.5.17 */
/* Created on: 2014-10-16 10:00:00 */
/*==============================================================*/
create database if not exists merge_pet_london CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
use merge_pet_london;
CREATE TABLE IF NOT EXISTS `log_var` (
`Key` varchar(128) DEFAULT '' COMMENT '',
`Value` varchar(128) DEFAULT '' COMMENT '',
`Timestamp` int DEFAULT 0 COMMENT '时间',
PRIMARY KEY (`key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='登录日志';
CREATE TABLE IF NOT EXISTS `log_login` (
`id` int unsigned AUTO_INCREMENT COMMENT '自增id',
`Uid` int unsigned NOT NULL COMMENT '玩家id',
`AppId` int unsigned NOT NULL DEFAULT '0' COMMENT 'AppId',
`ServerId` int unsigned NOT NULL DEFAULT '0' COMMENT 'ServerId',
`Event` varchar(128) DEFAULT '' COMMENT '事件',
`Timestamp` int DEFAULT 0 COMMENT '时间',
PRIMARY KEY (`id`),
KEY `Event` (`Event`),
KEY `Uid` (`Uid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='登录日志';
CREATE TABLE IF NOT EXISTS `log_event` (
`id` int unsigned AUTO_INCREMENT COMMENT '自增id',
`Uid` int unsigned NOT NULL COMMENT '玩家id',
`AppId` int unsigned NOT NULL DEFAULT '0' COMMENT 'AppId',
`ServerId` int unsigned NOT NULL DEFAULT '0' COMMENT 'ServerId',
`Event` varchar(128) DEFAULT '' COMMENT '事件',
`Param` text COMMENT '参数',
`Timestamp` int DEFAULT 0 COMMENT '时间',
PRIMARY KEY (`id`),
KEY `Event` (`Event`),
KEY `Uid` (`Uid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='操作事件';
CREATE TABLE IF NOT EXISTS `log_order` (
`id` int unsigned AUTO_INCREMENT COMMENT '自增id',
`Uid` int unsigned NOT NULL COMMENT '玩家id',
`AppId` int unsigned NOT NULL DEFAULT '0' COMMENT 'AppId',
`ServerId` int unsigned NOT NULL DEFAULT '0' COMMENT 'ServerId',
`OrderId` varchar(128) DEFAULT '' COMMENT '订单号',
`Price` float NOT NULL DEFAULT '0' COMMENT '价格',
`PayChannelOrderId` varchar(128) DEFAULT '' COMMENT '支付渠道订单号',
`ProductId` int unsigned NOT NULL DEFAULT '0' COMMENT '商品id',
`CreateTime` int unsigned NOT NULL DEFAULT '0' COMMENT '创建时间',
`PayTime` int unsigned NOT NULL DEFAULT '0' COMMENT '支付时间',
`PayType` int unsigned NOT NULL DEFAULT '0' COMMENT '支付类型',
`Param` text COMMENT '参数',
`Timestamp` int DEFAULT 0 COMMENT '时间',
PRIMARY KEY (`id`),
KEY `OrderId` (`OrderId`),
KEY `Uid` (`Uid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单';

23
kafka/sql/statistics.sql Normal file
View File

@ -0,0 +1,23 @@
/*==============================================================*/
/* Database name: sg_gamedb */
/* DBMS name: MySQL 5.5.17 */
/* Created on: 2014-10-16 10:00:00 */
/*==============================================================*/
create database if not exists statistics CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
use statistics;
CREATE TABLE IF NOT EXISTS `remain` (
id int unsigned AUTO_INCREMENT COMMENT '自增id',
Date date NOT NULL COMMENT '日期',
AppId int unsigned NOT NULL DEFAULT '0' COMMENT 'AppId',
ServerId int unsigned NOT NULL DEFAULT '0' COMMENT 'ServerId',
Register int unsigned NOT NULL DEFAULT '0' COMMENT '注册',
SecondRemain int unsigned NOT NULL DEFAULT '0' COMMENT '次日留存',
ThirdRemain int unsigned NOT NULL DEFAULT '0' COMMENT '三日留存',
SeventhRemain int unsigned NOT NULL DEFAULT '0' COMMENT '七日留存',
FourteenthRemain int unsigned NOT NULL DEFAULT '0' COMMENT '十四日留存',
ThirtiethRemain int unsigned NOT NULL DEFAULT '0' COMMENT '三十日留存',
PRIMARY KEY (id),
KEY `Date` (`Date`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='留存';

View File

@ -1,7 +1,120 @@
package statistics
import "log"
import (
"context"
"fmt"
"log"
"net"
"time"
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
"golang.org/x/crypto/ssh"
)
func Statistics() {
log.Print("statistics")
for i := 0; i <= 30; i++ {
remain(-i)
}
}
func remain(Day int) {
now := time.Now()
now = now.AddDate(0, 0, Day)
Date := now.Format("2006-01-02")
SecondRemain := DayRemain(now, -1) // 计算次留
ThirdRemain := DayRemain(now, -2) // 计算三留
SeventhRemain := DayRemain(now, -6) // 计算七留
FourteenthRemain := DayRemain(now, -13) // 计算14留
ThirtiethRemain := DayRemain(now, -29) // 计算30留
Register := DayRemain(now, 0) // 计算注册
Db, err := getDb("statistics")
if err != nil {
log.Fatalf("failed to get db: %v", err)
}
defer Db.Close()
_, err = Db.Exec("INSERT INTO `remain` (`Date`, `SecondRemain`, `ThirdRemain`, `SeventhRemain`, `FourteenthRemain`, `ThirtiethRemain`, `Register`) VALUES (?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE SecondRemain = ?, ThirdRemain = ?, SeventhRemain = ?, FourteenthRemain = ?, ThirtiethRemain = ?, Register = ?", Date, SecondRemain, ThirdRemain, SeventhRemain, FourteenthRemain, ThirtiethRemain, Register, SecondRemain, ThirdRemain, SeventhRemain, FourteenthRemain, ThirtiethRemain, Register)
if err != nil {
log.Printf("failed to insert data: %v", err)
}
}
func DayRemain(now time.Time, Day int) int {
// now := time.Now()
todayMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Unix()
SecondRemainDate := now.AddDate(0, 0, Day).Format("2006-01-02")
parsedTime, err := time.Parse("2006-01-02", SecondRemainDate)
if err != nil {
log.Fatalf("failed to parse date: %v", err)
}
Timestamp := parsedTime.Unix()
log.Print("secondRemain:", Timestamp)
db, err := getDb("pet_home")
if err != nil {
log.Fatalf("failed to get db: %v", err)
}
defer db.Close()
type data struct {
Uid int `db:"Uid"`
}
dataList := []data{}
err = db.Select(&dataList, "SELECT Uid FROM log_login WHERE Event = 'register' and Timestamp >= ? and Timestamp <= ?", Timestamp, Timestamp+86399)
if err != nil {
log.Fatalf("failed to select data: %v", err)
}
if Day == 0 {
return len(dataList)
}
if len(dataList) == 0 {
return 0
}
UidStr := ""
for _, v := range dataList {
UidStr += fmt.Sprintf("%d,", v.Uid)
}
if len(UidStr) > 0 {
UidStr = UidStr[:len(UidStr)-1]
}
DataList2 := []data{}
Sql := fmt.Sprintf("SELECT `Uid` FROM `log_login` WHERE `Event` = 'Login_log' and `Uid` in ( %s ) and `Timestamp` >= ? group by `Uid`", UidStr)
err = db.Select(&DataList2, Sql, todayMidnight)
if err != nil {
log.Fatalf("failed to select data: %v", err)
}
return len(DataList2)
}
func getDb(DbName string) (*sqlx.DB, error) {
sshConfig := &ssh.ClientConfig{
User: "root",
Auth: []ssh.AuthMethod{
ssh.Password("-xLX]p!PQ1@SHm`A"),
},
HostKeyCallback: ssh.InsecureIgnoreHostKey(),
}
// 连接到 SSH 服务器
sshConn, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", "8.155.14.94", 22), sshConfig)
if err != nil {
return nil, fmt.Errorf("failed to dial SSH: %v", err)
}
// 创建到 MySQL 服务器的隧道
mysqlConn, err := sshConn.Dial("tcp", fmt.Sprintf("%s:%d", "rm-f8zd2030feam53n43.mysql.rds.aliyuncs.com", 3306))
if err != nil {
return nil, fmt.Errorf("failed to dial MySQL: %v", err)
}
// 注册 MySQL 驱动
mysql.RegisterDialContext("mysql+tcp", func(ctx context.Context, addr string) (net.Conn, error) {
return mysqlConn, nil
})
// 连接到 MySQL 数据库
dsn := fmt.Sprintf("%s:%s@mysql+tcp(%s:%d)/%s", "root", "Z4rf7eZZe500dxa", "rm-f8zd2030feam53n43.mysql.rds.aliyuncs.com", 3306, DbName)
db, err := sqlx.Open("mysql", dsn)
if err != nil {
return nil, fmt.Errorf("failed to open MySQL: %v", err)
}
return db, nil
}