devops/kafka/db/db.go
2025-12-12 11:40:38 +08:00

115 lines
3.2 KiB
Go

package db
import (
	"encoding/json"
	"fmt"
	"log"
	"strconv"

	"kafka-comsumer/config"
	"kafka-comsumer/util"

	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"github.com/segmentio/kafka-go"
)
// Container caches opened database handles, keyed by the configuration key
// passed to GetContainer. Access is not synchronized.
var Container = make(map[string]*sqlx.DB)
// GetContainer returns the cached *sqlx.DB for Key, opening and caching a new
// MySQL connection from config.GetDbConfig(Key) on first use.
//
// It returns nil when the configured password cannot be decrypted or the
// connection cannot be established; the reason is logged so that a nil result
// is diagnosable. Failed attempts are not cached, so a later call retries.
func GetContainer(Key string) *sqlx.DB {
	if db, ok := Container[Key]; ok {
		return db
	}

	DbConf := config.GetDbConfig(Key)
	// The stored password is encrypted; connecting with a bad decryption
	// result would only surface later as an opaque authentication failure.
	NewDbPass, err := util.Decrypt(DbConf.Password, util.SECRET_KEY)
	if err != nil {
		log.Printf("db %q: decrypt password: %v", Key, err)
		return nil
	}

	addr := DbConf.Host + ":" + strconv.Itoa(DbConf.Port)
	dsn := DbConf.User + ":" + NewDbPass + "@tcp(" + addr + ")/" + DbConf.DbNamee
	db, err := sqlx.Connect("mysql", dsn)
	if err != nil {
		// Log the address only; the DSN contains the clear-text password.
		log.Printf("db %q: connect to %s: %v", Key, addr, err)
		return nil
	}
	Container[Key] = db
	return db
}
// ProcessMsg routes a Kafka message to the insert handler selected by its key
// and then records the message offset in the database.
//
// Keys "Login_log", "Login_Out" and "register" go to login, "pay" goes to pay,
// and every other key goes to event. The handler runs in its own goroutine and
// its result is not awaited, so the offset is written regardless of whether
// the insert has finished or succeeded; handler failures are only logged.
//
// ProcessMsg always returns nil. A nil SqlDb makes it a no-op.
func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error {
	defer func() {
		if err := recover(); err != nil {
			log.Println("panic recover! err: ", err)
		}
	}()
	if SqlDb == nil {
		return nil
	}

	var handler func(*sqlx.DB, kafka.Message) error
	switch string(m.Key) {
	case "Login_log", "Login_Out", "register":
		handler = login
	case "pay":
		handler = pay
	default:
		handler = event
	}

	go func() {
		// recover only works in the goroutine that panicked, so the deferred
		// recover above cannot protect this one; without its own guard a
		// panicking handler would terminate the whole process.
		defer func() {
			if err := recover(); err != nil {
				log.Println("panic recover! err: ", err)
			}
		}()
		// Handlers log their own failures; there is no caller to report to.
		_ = handler(SqlDb, m)
	}()

	if err := offset(SqlDb, m); err != nil {
		log.Printf("sql offset err:%v", err)
	}
	return nil
}
// login decodes m.Value as a JSON object and inserts one row into log_login,
// using the message key as the Event column and the "Uid", "TimeStamp",
// "AppId" and "ServerId" entries of the object for the remaining columns.
//
// It returns (and logs) an error when the payload is not a JSON object or the
// insert fails.
func login(SqlDb *sqlx.DB, m kafka.Message) error {
	var V map[string]interface{}
	// Without this check a malformed payload leaves V nil and every column
	// below would be bound to NULL.
	if err := json.Unmarshal(m.Value, &V); err != nil {
		err = fmt.Errorf("login: decode message at offset %d: %w", m.Offset, err)
		log.Printf("sql login err:%v", err)
		return err
	}

	sql := "INSERT INTO log_login (`Uid`, `Event`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?)"
	_, err := SqlDb.Exec(sql, V["Uid"], m.Key, V["TimeStamp"], V["AppId"], V["ServerId"])
	if err != nil {
		log.Printf("sql login err:%v", err)
	}
	return err
}
// event decodes m.Value as a JSON object and inserts one row into log_event.
// The message key is stored as the Event column and the object's "Param"
// entry is re-encoded to JSON text for the Param column (an absent "Param"
// is stored as the text "null").
//
// It returns (and logs) an error when the payload is not a JSON object, when
// "Param" cannot be re-encoded, or when the insert fails.
func event(SqlDb *sqlx.DB, m kafka.Message) error {
	var V map[string]interface{}
	// Without this check a malformed payload leaves V nil and the row would
	// be written with NULL columns.
	if err := json.Unmarshal(m.Value, &V); err != nil {
		err = fmt.Errorf("event: decode message at offset %d: %w", m.Offset, err)
		log.Printf("sql event err:%v", err)
		return err
	}

	Param, err := json.Marshal(V["Param"])
	if err != nil {
		err = fmt.Errorf("event: encode Param at offset %d: %w", m.Offset, err)
		log.Printf("sql event err:%v", err)
		return err
	}

	sql := "INSERT INTO log_event (`Uid`, `Event`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?)"
	_, err = SqlDb.Exec(sql, V["Uid"], m.Key, string(Param), V["TimeStamp"], V["AppId"], V["ServerId"])
	if err != nil {
		log.Printf("sql event err:%v", err)
	}
	return err
}
// pay decodes m.Value as a JSON object and inserts one row into log_order.
// Order details are read from the nested "Param" object, which is also stored
// verbatim (re-encoded as JSON text) in the Param column; "Uid" and
// "TimeStamp" are read from the top-level object.
//
// It returns (and logs) an error when the payload is not a JSON object, when
// "Param" is missing or is not a JSON object, or when the insert fails.
func pay(SqlDb *sqlx.DB, m kafka.Message) error {
	var V map[string]interface{}
	if err := json.Unmarshal(m.Value, &V); err != nil {
		err = fmt.Errorf("pay: decode message at offset %d: %w", m.Offset, err)
		log.Printf("sql pay err:%v", err)
		return err
	}

	// Checked assertion: an unchecked one panics on a missing or non-object
	// "Param", which is fatal when pay runs in its own goroutine.
	Info, ok := V["Param"].(map[string]interface{})
	if !ok {
		err := fmt.Errorf("pay: message at offset %d: Param is %T, want a JSON object", m.Offset, V["Param"])
		log.Printf("sql pay err:%v", err)
		return err
	}

	Param, err := json.Marshal(Info)
	if err != nil {
		err = fmt.Errorf("pay: encode Param at offset %d: %w", m.Offset, err)
		log.Printf("sql pay err:%v", err)
		return err
	}

	Uid := util.Int(V["Uid"])
	// NOTE(review): AppId and ServerId are taken from Param here, whereas
	// login and event read them from the top-level object — confirm that the
	// pay payload really nests them.
	sql := "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	_, err = SqlDb.Exec(sql, Uid, Info["OrderId"], Info["Price"], Info["PayChannelOrderId"], Info["ProductId"], Info["CreateTime"], Info["PayTime"], Info["PayType"], string(Param), V["TimeStamp"], Info["AppId"], Info["ServerId"])
	if err != nil {
		log.Printf("sql pay err:%v", err)
	}
	return err
}
// offset upserts the "offset" row of log_var with m.Offset+1 (as a decimal
// string) and the current time from util.GetNowTime. The same three values
// are bound twice: once for the INSERT and once for the ON DUPLICATE KEY
// UPDATE clause. The error from Exec is returned as is.
func offset(SqlDb *sqlx.DB, m kafka.Message) error {
	const upsert = " INSERT INTO `log_var` (`Key` , `Value`, `Timestamp`) Values (?,?,?) ON DUPLICATE KEY UPDATE `key` = ? , `Value` = ? ,`Timestamp`=?"

	next := strconv.FormatInt(m.Offset+1, 10)
	now := util.GetNowTime()
	row := []interface{}{"offset", next, now}

	// Insert arguments followed by the identical update arguments.
	args := append(row, row...)
	_, err := SqlDb.Exec(upsert, args...)
	return err
}