diff --git a/kafka/conf/conf.yml b/kafka/conf/conf.yml index dba82db..7e9c1c9 100644 --- a/kafka/conf/conf.yml +++ b/kafka/conf/conf.yml @@ -4,15 +4,19 @@ db: password: "o0WEwc46C5mo1qhemU9cFTTIaMMHsoarsIdVhi6vyA==" port: 3306 games: - - name: "Merge_Pet" - topic: "Merge_Pet" + - name: "merge_pet_sdk" + topic: "merge_pet_sdk" partition: 0 - db_name: "Merge_Pet_Local" - - name: "Merge_Pet_Test" - topic: "Merge_Pet_Test" + db_name: "merge_pet" + - name: "pet_home" + topic: "pet_home" partition: 0 - db_name: "Merge_Pet_Test" + db_name: "pet_home" - name: "Merge_Pet_online" topic: "Merge_Pet_online" partition: 0 - db_name: "Merge_Pet_online" \ No newline at end of file + db_name: "Merge_Pet_online" + - name: "merge_pet_london" + topic: "merge_pet_london" + partition: 0 + db_name: "merge_pet_london" \ No newline at end of file diff --git a/kafka/db/db.go b/kafka/db/db.go index 9528fc1..b6e27c1 100644 --- a/kafka/db/db.go +++ b/kafka/db/db.go @@ -53,6 +53,7 @@ func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error { default: err = event(SqlDb, m) } + if err != nil { tx.Rollback() return err @@ -66,9 +67,8 @@ func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error { func login(SqlDb *sqlx.DB, m kafka.Message) error { var V map[string]interface{} json.Unmarshal(m.Value, &V) - sql := "INSERT INTO log_login (`Uid`, `Event`, `Timestamp`) VALUES (?, ?, ?)" - - _, err := SqlDb.Exec(sql, V["Uid"], m.Key, V["TimeStamp"]) + sql := "INSERT INTO log_login (`Uid`, `Event`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?)" + _, err := SqlDb.Exec(sql, V["Uid"], m.Key, V["TimeStamp"], V["AppId"], V["ServerId"]) return err } @@ -79,8 +79,8 @@ func event(SqlDb *sqlx.DB, m kafka.Message) error { if err != nil { return err } - sql := "INSERT INTO log_event (`Uid`, `Event`, `Param`, `Timestamp`) VALUES (?, ?, ?, ?)" - _, err = SqlDb.Exec(sql, V["Uid"], m.Key, string(Param), V["TimeStamp"]) + sql := "INSERT INTO log_event (`Uid`, `Event`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?)" + _, err = SqlDb.Exec(sql, V["Uid"], m.Key, string(Param), V["TimeStamp"], V["AppId"], V["ServerId"]) return err } @@ -93,7 +93,7 @@ func pay(SqlDb *sqlx.DB, m kafka.Message) error { return err } Uid := util.Int(V["Uid"]) - sql := "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + sql := "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" _, err = SqlDb.Exec(sql, Uid, Info["OrderId"], Info["Price"], Info["PayChannelOrderId"], Info["ProductId"], Info["CreateTime"], Info["PayTime"], Info["PayType"], string(Param), V["TimeStamp"], Info["AppId"], Info["ServerId"]) return err } diff --git a/kafka/go.mod b/kafka/go.mod index 5935ca7..20f0db9 100644 --- a/kafka/go.mod +++ b/kafka/go.mod @@ -6,6 +6,7 @@ require ( github.com/go-sql-driver/mysql v1.8.1 github.com/jmoiron/sqlx v1.4.0 github.com/segmentio/kafka-go v0.4.47 + golang.org/x/crypto v0.32.0 gopkg.in/yaml.v2 v2.4.0 ) @@ -15,5 +16,5 @@ require ( github.com/pierrec/lz4/v4 v4.1.21 // indirect github.com/stretchr/testify v1.9.0 // indirect golang.org/x/net v0.28.0 // indirect - golang.org/x/text v0.21.0 // indirect + golang.org/x/sys v0.29.0 // indirect ) diff --git a/kafka/go.sum b/kafka/go.sum index 35e231e..88d7c4b 100644 --- a/kafka/go.sum +++ b/kafka/go.sum @@ -37,6 +37,8 @@ github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5t golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -58,11 +60,15 @@ golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= +golang.org/x/term v0.28.0 h1:/Ts8HFuMR2E6IP/jlo7QVLZHggjKQbhu/7H0LJFr3Gg= +golang.org/x/term v0.28.0/go.mod h1:Sw/lC2IAUZ92udQNf3WodGtn4k/XoLyZoh8v/8uiwek= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= diff --git a/kafka/main.go b/kafka/main.go index 5da7c20..388b931 100644 --- a/kafka/main.go +++ b/kafka/main.go @@ -7,9 +7,7 @@ import ( "kafka-comsumer/statistics" "log" "os" - "os/signal" "sync" - "syscall" "time" "github.com/segmentio/kafka-go" @@ -26,25 +24,26 @@ var d = make(chan int, 1) func main() { // 打开日志文件 - logFile, err := os.OpenFile("./log/kafka-comsumer.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) - if err != nil { - log.Fatalf("Failed to open log file: %v", err) - } - defer logFile.Close() + // logFile, err := os.OpenFile("./log/kafka-comsumer.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) + // if err != nil { + // log.Fatalf("Failed to open log file: %v", err) + // } + // defer logFile.Close() - // 将日志输出重定向到文件 - log.SetOutput(logFile) - log.SetFlags(log.LstdFlags | log.Lshortfile) - Games := config.GetGames() - for _, game := range Games { - go comsumer(game) - } - go scheduleDailyTask() - signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) - sig := <-c - d <- 1 - log.Printf("kafka comsumer closing down (signal: %v)\n", sig.String()) - Wait.Wait() + // // 将日志输出重定向到文件 + // log.SetOutput(logFile) + // log.SetFlags(log.LstdFlags | log.Lshortfile) + // Games := config.GetGames() + // for _, game := range Games { + // go comsumer(game) + // } + // go scheduleDailyTask() + // signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL) + // sig := <-c + // d <- 1 + // log.Printf("kafka comsumer closing down (signal: %v)\n", sig.String()) + // Wait.Wait() + statistics.Statistics() } func scheduleDailyTask() { @@ -100,7 +99,7 @@ func comsumer(Game *config.Game) { Wait.Add(1) err = db.ProcessMsg(sqlDb, m) if err != nil { - log.Printf("comsumer %s error :%v", Game.Name, err) + log.Printf("comsumer m.key : %s %s error :%v", m.Key, Game.Name, err) } Wait.Done() } diff --git a/kafka/release/kafka_comsumer b/kafka/release/kafka_comsumer index fab4717..e5824c6 100644 Binary files a/kafka/release/kafka_comsumer and b/kafka/release/kafka_comsumer differ diff --git a/kafka/release/pack.sh b/kafka/release/pack.sh new file mode 100644 index 0000000..93449ea --- /dev/null +++ b/kafka/release/pack.sh @@ -0,0 +1,2 @@ +cd /data/devops/kafka +GOOS=linux GOARCH=amd64 /usr/local/go/bin/go build -o /data/devops/kafka/release/kafka_comsumer main.go \ No newline at end of file diff --git a/kafka/sql/kafkalog.sql b/kafka/sql/kafkalog.sql new file mode 100644 index 0000000..8f74338 --- /dev/null +++ b/kafka/sql/kafkalog.sql @@ -0,0 +1,60 @@ +/*==============================================================*/ +/* Database name: sg_gamedb */ +/* DBMS name: MySQL 5.5.17 */ +/* Created on: 2014-10-16 10:00:00 */ +/*==============================================================*/ + +create database if not exists merge_pet_london CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci; +use merge_pet_london; + + +CREATE TABLE IF NOT EXISTS `log_var` ( + `Key` varchar(128) DEFAULT '' COMMENT '键', + `Value` varchar(128) DEFAULT '' COMMENT '值', + `Timestamp` int DEFAULT 0 COMMENT '时间', + PRIMARY KEY (`key`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='登录日志'; + +CREATE TABLE IF NOT EXISTS `log_login` ( + `id` int unsigned AUTO_INCREMENT COMMENT '自增id', + `Uid` int unsigned NOT NULL COMMENT '玩家id', + `AppId` int unsigned NOT NULL DEFAULT '0' COMMENT 'AppId', + `ServerId` int unsigned NOT NULL DEFAULT '0' COMMENT 'ServerId', + `Event` varchar(128) DEFAULT '' COMMENT '事件', + `Timestamp` int DEFAULT 0 COMMENT '时间', + PRIMARY KEY (`id`), + KEY `Event` (`Event`), + KEY `Uid` (`Uid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='登录日志'; + +CREATE TABLE IF NOT EXISTS `log_event` ( + `id` int unsigned AUTO_INCREMENT COMMENT '自增id', + `Uid` int unsigned NOT NULL COMMENT '玩家id', + `AppId` int unsigned NOT NULL DEFAULT '0' COMMENT 'AppId', + `ServerId` int unsigned NOT NULL DEFAULT '0' COMMENT 'ServerId', + `Event` varchar(128) DEFAULT '' COMMENT '事件', + `Param` text COMMENT '参数', + `Timestamp` int DEFAULT 0 COMMENT '时间', + PRIMARY KEY (`id`), + KEY `Event` (`Event`), + KEY `Uid` (`Uid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='操作事件'; + +CREATE TABLE IF NOT EXISTS `log_order` ( + `id` int unsigned AUTO_INCREMENT COMMENT '自增id', + `Uid` int unsigned NOT NULL COMMENT '玩家id', + `AppId` int unsigned NOT NULL DEFAULT '0' COMMENT 'AppId', + `ServerId` int unsigned NOT NULL DEFAULT '0' COMMENT 'ServerId', + `OrderId` varchar(128) DEFAULT '' COMMENT '订单号', + `Price` float NOT NULL DEFAULT '0' COMMENT '价格', + `PayChannelOrderId` varchar(128) DEFAULT '' COMMENT '支付渠道订单号', + `ProductId` int unsigned NOT NULL DEFAULT '0' COMMENT '商品id', + `CreateTime` int unsigned NOT NULL DEFAULT '0' COMMENT '创建时间', + `PayTime` int unsigned NOT NULL DEFAULT '0' COMMENT '支付时间', + `PayType` int unsigned NOT NULL DEFAULT '0' COMMENT '支付类型', + `Param` text COMMENT '参数', + `Timestamp` int DEFAULT 0 COMMENT '时间', + PRIMARY KEY (`id`), + KEY `OrderId` (`OrderId`), + KEY `Uid` (`Uid`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单'; \ No newline at end of file diff --git a/kafka/sql/statistics.sql b/kafka/sql/statistics.sql new file mode 100644 index 0000000..a04dfab --- /dev/null +++ b/kafka/sql/statistics.sql @@ -0,0 +1,23 @@ +/*==============================================================*/ +/* Database name: sg_gamedb */ +/* DBMS name: MySQL 5.5.17 */ +/* Created on: 2014-10-16 10:00:00 */ +/*==============================================================*/ + +create database if not exists statistics CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci; +use statistics; + +CREATE TABLE IF NOT EXISTS `remain` ( + id int unsigned AUTO_INCREMENT COMMENT '自增id', + Date date NOT NULL COMMENT '日期', + AppId int unsigned NOT NULL DEFAULT '0' COMMENT 'AppId', + ServerId int unsigned NOT NULL DEFAULT '0' COMMENT 'ServerId', + Register int unsigned NOT NULL DEFAULT '0' COMMENT '注册', + SecondRemain int unsigned NOT NULL DEFAULT '0' COMMENT '次日留存', + ThirdRemain int unsigned NOT NULL DEFAULT '0' COMMENT '三日留存', + SeventhRemain int unsigned NOT NULL DEFAULT '0' COMMENT '七日留存', + FourteenthRemain int unsigned NOT NULL DEFAULT '0' COMMENT '十四日留存', + ThirtiethRemain int unsigned NOT NULL DEFAULT '0' COMMENT '三十日留存', + PRIMARY KEY (id), + KEY `Date` (`Date`) +)ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='留存'; diff --git a/kafka/statistics/statistics.go b/kafka/statistics/statistics.go index 9e10c05..ca0e406 100644 --- a/kafka/statistics/statistics.go +++ b/kafka/statistics/statistics.go @@ -1,7 +1,120 @@ package statistics -import "log" +import ( + "context" + "fmt" + "log" + "net" + "time" + + "github.com/go-sql-driver/mysql" + "github.com/jmoiron/sqlx" + "golang.org/x/crypto/ssh" +) func Statistics() { - log.Print("statistics") + for i := 0; i <= 30; i++ { + remain(-i) + } +} + +func remain(Day int) { + now := time.Now() + now = now.AddDate(0, 0, Day) + Date := now.Format("2006-01-02") + SecondRemain := DayRemain(now, -1) // 计算次留 + ThirdRemain := DayRemain(now, -2) // 计算三留 + SeventhRemain := DayRemain(now, -6) // 计算七留 + FourteenthRemain := DayRemain(now, -13) // 计算14留 + ThirtiethRemain := DayRemain(now, -29) // 计算30留 + Register := DayRemain(now, 0) // 计算注册 + Db, err := getDb("statistics") + if err != nil { + log.Fatalf("failed to get db: %v", err) + } + defer Db.Close() + _, err = Db.Exec("INSERT INTO `remain` (`Date`, `SecondRemain`, `ThirdRemain`, `SeventhRemain`, `FourteenthRemain`, `ThirtiethRemain`, `Register`) VALUES (?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE SecondRemain = ?, ThirdRemain = ?, SeventhRemain = ?, FourteenthRemain = ?, ThirtiethRemain = ?, Register = ?", Date, SecondRemain, ThirdRemain, SeventhRemain, FourteenthRemain, ThirtiethRemain, Register, SecondRemain, ThirdRemain, SeventhRemain, FourteenthRemain, ThirtiethRemain, Register) + if err != nil { + log.Printf("failed to insert data: %v", err) + } +} + +func DayRemain(now time.Time, Day int) int { + // now := time.Now() + todayMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()).Unix() + SecondRemainDate := now.AddDate(0, 0, Day).Format("2006-01-02") + parsedTime, err := time.Parse("2006-01-02", SecondRemainDate) + if err != nil { + log.Fatalf("failed to parse date: %v", err) + } + Timestamp := parsedTime.Unix() + log.Print("secondRemain:", Timestamp) + db, err := getDb("pet_home") + if err != nil { + log.Fatalf("failed to get db: %v", err) + } + defer db.Close() + type data struct { + Uid int `db:"Uid"` + } + dataList := []data{} + err = db.Select(&dataList, "SELECT Uid FROM log_login WHERE Event = 'register' and Timestamp >= ? and Timestamp <= ?", Timestamp, Timestamp+86399) + if err != nil { + log.Fatalf("failed to select data: %v", err) + } + if Day == 0 { + return len(dataList) + } + if len(dataList) == 0 { + return 0 + } + UidStr := "" + for _, v := range dataList { + UidStr += fmt.Sprintf("%d,", v.Uid) + } + if len(UidStr) > 0 { + UidStr = UidStr[:len(UidStr)-1] + } + DataList2 := []data{} + Sql := fmt.Sprintf("SELECT `Uid` FROM `log_login` WHERE `Event` = 'Login_log' and `Uid` in ( %s ) and `Timestamp` >= ? group by `Uid`", UidStr) + err = db.Select(&DataList2, Sql, todayMidnight) + if err != nil { + log.Fatalf("failed to select data: %v", err) + } + return len(DataList2) +} + +func getDb(DbName string) (*sqlx.DB, error) { + sshConfig := &ssh.ClientConfig{ + User: "root", + Auth: []ssh.AuthMethod{ + ssh.Password("-xLX]p!PQ1@SHm`A"), + }, + HostKeyCallback: ssh.InsecureIgnoreHostKey(), + } + + // 连接到 SSH 服务器 + sshConn, err := ssh.Dial("tcp", fmt.Sprintf("%s:%d", "8.155.14.94", 22), sshConfig) + if err != nil { + return nil, fmt.Errorf("failed to dial SSH: %v", err) + } + + // 创建到 MySQL 服务器的隧道 + mysqlConn, err := sshConn.Dial("tcp", fmt.Sprintf("%s:%d", "rm-f8zd2030feam53n43.mysql.rds.aliyuncs.com", 3306)) + if err != nil { + return nil, fmt.Errorf("failed to dial MySQL: %v", err) + } + + // 注册 MySQL 驱动 + mysql.RegisterDialContext("mysql+tcp", func(ctx context.Context, addr string) (net.Conn, error) { + return mysqlConn, nil + }) + // 连接到 MySQL 数据库 + dsn := fmt.Sprintf("%s:%s@mysql+tcp(%s:%d)/%s", "root", "Z4rf7eZZe500dxa", "rm-f8zd2030feam53n43.mysql.rds.aliyuncs.com", 3306, DbName) + db, err := sqlx.Open("mysql", dsn) + if err != nil { + return nil, fmt.Errorf("failed to open MySQL: %v", err) + } + + return db, nil }