115 lines
3.2 KiB
Go
115 lines
3.2 KiB
Go
package db
|
|
|
|
import (
|
|
"encoding/json"
|
|
"kafka-comsumer/config"
|
|
"kafka-comsumer/util"
|
|
"log"
|
|
"strconv"
|
|
|
|
_ "github.com/go-sql-driver/mysql"
|
|
"github.com/jmoiron/sqlx"
|
|
"github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
// Container holds the database handles opened so far, keyed by the name
// passed to GetContainer.
var Container = make(map[string]*sqlx.DB)
|
|
|
|
func GetContainer(Key string) *sqlx.DB {
|
|
v, ok := Container[Key]
|
|
if !ok {
|
|
DbConf := config.GetDbConfig(Key)
|
|
NewDbPass, _ := util.Decrypt(DbConf.Password, util.SECRET_KEY)
|
|
db, err := sqlx.Connect("mysql", DbConf.User+":"+NewDbPass+"@tcp("+DbConf.Host+":"+strconv.Itoa(DbConf.Port)+")/"+DbConf.DbNamee)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
Container[Key] = db
|
|
return db
|
|
}
|
|
return v
|
|
}
|
|
|
|
func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error {
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Println("panic recover! err: ", err)
|
|
}
|
|
}()
|
|
if SqlDb == nil {
|
|
return nil
|
|
}
|
|
// fmt.Printf("Message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
|
|
// tx := SqlDb.MustBegin()
|
|
//var err error
|
|
switch string(m.Key) {
|
|
case "Login_log":
|
|
go login(SqlDb, m)
|
|
case "Login_Out":
|
|
go login(SqlDb, m)
|
|
case "register":
|
|
go login(SqlDb, m)
|
|
case "pay":
|
|
go pay(SqlDb, m)
|
|
default:
|
|
go event(SqlDb, m)
|
|
}
|
|
offset(SqlDb, m)
|
|
// if err != nil {
|
|
// tx.Rollback()
|
|
// return err
|
|
// }
|
|
// tx.Commit()
|
|
// fmt.Println("msg process success")
|
|
return nil
|
|
}
|
|
|
|
func login(SqlDb *sqlx.DB, m kafka.Message) error {
|
|
var V map[string]interface{}
|
|
json.Unmarshal(m.Value, &V)
|
|
sql := "INSERT INTO log_login (`Uid`, `Event`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?)"
|
|
_, err := SqlDb.Exec(sql, V["Uid"], m.Key, V["TimeStamp"], V["AppId"], V["ServerId"])
|
|
if err != nil {
|
|
log.Printf("sql login err:%v", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func event(SqlDb *sqlx.DB, m kafka.Message) error {
|
|
var V map[string]interface{}
|
|
json.Unmarshal(m.Value, &V)
|
|
Param, err := json.Marshal(V["Param"])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sql := "INSERT INTO log_event (`Uid`, `Event`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?)"
|
|
_, err = SqlDb.Exec(sql, V["Uid"], m.Key, string(Param), V["TimeStamp"], V["AppId"], V["ServerId"])
|
|
if err != nil {
|
|
log.Printf("sql event err:%v", err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func pay(SqlDb *sqlx.DB, m kafka.Message) error {
|
|
var V map[string]interface{}
|
|
json.Unmarshal(m.Value, &V)
|
|
Param, err := json.Marshal(V["Param"])
|
|
Info := V["Param"].(map[string]interface{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
Uid := util.Int(V["Uid"])
|
|
sql := "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
|
|
_, err = SqlDb.Exec(sql, Uid, Info["OrderId"], Info["Price"], Info["PayChannelOrderId"], Info["ProductId"], Info["CreateTime"], Info["PayTime"], Info["PayType"], string(Param), V["TimeStamp"], Info["AppId"], Info["ServerId"])
|
|
if err != nil {
|
|
log.Printf("sql pay err:%v", err)
|
|
}
|
|
return err
|
|
}
|
|
func offset(SqlDb *sqlx.DB, m kafka.Message) error {
|
|
sql := " INSERT INTO `log_var` (`Key` , `Value`, `Timestamp`) Values (?,?,?) ON DUPLICATE KEY UPDATE `key` = ? , `Value` = ? ,`Timestamp`=?"
|
|
Value := strconv.FormatInt(m.Offset+1, 10)
|
|
Timestamp := util.GetNowTime()
|
|
_, err := SqlDb.Exec(sql, "offset", Value, Timestamp, "offset", Value, Timestamp)
|
|
return err
|
|
}
|