devops/kafka/db/db.go
2025-01-08 15:48:40 +08:00

110 lines
3.0 KiB
Go

package db
import (
	"encoding/json"
	"fmt"
	"strconv"

	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
	"github.com/segmentio/kafka-go"
	"kafka-comsumer/config"
	"kafka-comsumer/util"
)
// Container caches opened database handles, keyed by the database config key.
// GetContainer reads and fills it; the map itself carries no locking.
var Container = make(map[string]*sqlx.DB)
// GetContainer returns the cached *sqlx.DB for Key. On first use it opens a
// MySQL connection from config.GetDbConfig(Key), decrypting the configured
// password with util.SECRET_KEY, and stores the handle in Container.
//
// It returns nil when the password cannot be decrypted or the connection
// cannot be established, so callers must check the result for nil.
// Container is accessed without locking here.
func GetContainer(Key string) *sqlx.DB {
	if cached, ok := Container[Key]; ok {
		return cached
	}

	conf := config.GetDbConfig(Key)
	pass, decryptErr := util.Decrypt(conf.Password, util.SECRET_KEY)
	if decryptErr != nil {
		// Do not try to connect with a password that failed to decrypt.
		return nil
	}

	dsn := conf.User + ":" + pass + "@tcp(" + conf.Host + ":" + strconv.Itoa(conf.Port) + ")/" + conf.DbNamee
	db, err := sqlx.Connect("mysql", dsn)
	if err != nil {
		return nil
	}
	Container[Key] = db
	return db
}
// ProcessMsg stores one Kafka message together with the next offset to
// consume, inside a single database transaction.
//
// The message key selects the target table: "Login_log" and "Login_Out" are
// written to log_login, "pay" to log_order, and every other key to log_event.
// The offset row in log_var is written in the same transaction, so the
// message row and the offset are committed together or not at all.
//
// A nil SqlDb is treated as a no-op and returns nil. Any failure to begin the
// transaction, decode the message, execute a statement, or commit is returned
// to the caller; on a decode or statement failure the transaction is rolled
// back.
func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error {
	if SqlDb == nil {
		return nil
	}

	// Beginx reports failure as an error, where MustBegin would panic.
	tx, err := SqlDb.Beginx()
	if err != nil {
		return fmt.Errorf("beginning transaction for offset %d: %w", m.Offset, err)
	}

	// Every statement below runs on tx, not on SqlDb, so it is actually part
	// of the transaction.
	switch string(m.Key) {
	case "Login_log", "Login_Out":
		err = login(tx, m)
	case "pay":
		err = pay(tx, m)
	default:
		err = event(tx, m)
	}
	if err == nil {
		err = offset(tx, m)
	}
	if err != nil {
		// Best effort: the original failure is the error worth reporting.
		_ = tx.Rollback()
		return err
	}

	if err := tx.Commit(); err != nil {
		return fmt.Errorf("committing message at offset %d: %w", m.Offset, err)
	}
	return nil
}

// decodeValue parses the JSON object carried in the message value.
func decodeValue(m kafka.Message) (map[string]interface{}, error) {
	var fields map[string]interface{}
	if err := json.Unmarshal(m.Value, &fields); err != nil {
		return nil, fmt.Errorf("decoding %q message at offset %d: %w", m.Key, m.Offset, err)
	}
	return fields, nil
}

// login inserts a log_login row from the message's "Uid" and "TimeStamp"
// fields, using the message key as the event name.
// SqlDb may be a *sqlx.DB or a *sqlx.Tx.
func login(SqlDb sqlx.Execer, m kafka.Message) error {
	V, err := decodeValue(m)
	if err != nil {
		return err
	}
	const query = "INSERT INTO log_login (`Uid`, `Event`, `Timestamp`) VALUES (?, ?, ?)"
	_, err = SqlDb.Exec(query, V["Uid"], m.Key, V["TimeStamp"])
	return err
}

// event inserts a log_event row from the message's "Uid" and "TimeStamp"
// fields, storing the "Param" field re-encoded as a JSON string and the
// message key as the event name.
// SqlDb may be a *sqlx.DB or a *sqlx.Tx.
func event(SqlDb sqlx.Execer, m kafka.Message) error {
	V, err := decodeValue(m)
	if err != nil {
		return err
	}
	Param, err := json.Marshal(V["Param"])
	if err != nil {
		return err
	}
	const query = "INSERT INTO log_event (`Uid`, `Event`, `Param`, `Timestamp`) VALUES (?, ?, ?, ?)"
	_, err = SqlDb.Exec(query, V["Uid"], m.Key, string(Param), V["TimeStamp"])
	return err
}

// pay inserts a log_order row. The order columns are read from the message's
// "Param" object, which is also stored whole as a JSON string.
// It returns an error when "Param" is not a JSON object.
// SqlDb may be a *sqlx.DB or a *sqlx.Tx.
func pay(SqlDb sqlx.Execer, m kafka.Message) error {
	V, err := decodeValue(m)
	if err != nil {
		return err
	}
	// Checked assertion: a missing or non-object Param must not panic.
	Info, ok := V["Param"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("pay message at offset %d: Param is %T, want a JSON object", m.Offset, V["Param"])
	}
	Param, err := json.Marshal(Info)
	if err != nil {
		return err
	}
	const query = "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
	_, err = SqlDb.Exec(query, V["Uid"], Info["OrderId"], Info["Price"], Info["PayChannelOrderId"], Info["ProductId"], Info["CreateTime"], Info["PayTime"], Info["PayType"], string(Param), V["TimeStamp"])
	return err
}

// offset upserts the "offset" row in log_var with m.Offset+1 (the next offset
// to read) and the current time from util.GetNowTime.
// SqlDb may be a *sqlx.DB or a *sqlx.Tx.
func offset(SqlDb sqlx.Execer, m kafka.Message) error {
	const query = " INSERT INTO `log_var` (`Key` , `Value`, `Timestamp`) Values (?,?,?) ON DUPLICATE KEY UPDATE `key` = ? , `Value` = ? ,`Timestamp`=?"
	Value := strconv.FormatInt(m.Offset+1, 10)
	Timestamp := util.GetNowTime()
	_, err := SqlDb.Exec(query, "offset", Value, Timestamp, "offset", Value, Timestamp)
	return err
}
// GetOffset returns the Kafka offset stored under the "offset" key of
// log_var.
//
// It returns kafka.FirstOffset when the row cannot be read, is empty, or does
// not hold a valid base-10 integer.
func GetOffset(SqlDb *sqlx.DB) int64 {
	const query = "select `Value` from `log_var` where `Key` = 'offset'"

	var stored string
	if err := SqlDb.Get(&stored, query); err != nil || stored == "" {
		return kafka.FirstOffset
	}

	next, err := strconv.ParseInt(stored, 10, 64)
	if err != nil {
		// An unparsable value must not be silently read as offset 0.
		return kafka.FirstOffset
	}
	return next
}