kafka消费者优化

This commit is contained in:
hahwu 2025-12-04 12:16:39 +08:00
parent a90b658798
commit 82158467f9
5 changed files with 97 additions and 49 deletions

View File

@ -1,5 +1,5 @@
db:
host: "rm-f8zd2030feam53n43.mysql.rds.aliyuncs.com"
host: "rm-f8zd2030feam53n43io.mysql.rds.aliyuncs.com"
user: root
password: "o0WEwc46C5mo1qhemU9cFTTIaMMHsoarsIdVhi6vyA=="
port: 3306

View File

@ -39,27 +39,26 @@ func ProcessMsg(SqlDb *sqlx.DB, m kafka.Message) error {
return nil
}
// fmt.Printf("Message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
tx := SqlDb.MustBegin()
var err error
// tx := SqlDb.MustBegin()
//var err error
switch string(m.Key) {
case "Login_log":
err = login(SqlDb, m)
go login(SqlDb, m)
case "Login_Out":
err = login(SqlDb, m)
go login(SqlDb, m)
case "register":
err = login(SqlDb, m)
go login(SqlDb, m)
case "pay":
err = pay(SqlDb, m)
go pay(SqlDb, m)
default:
err = event(SqlDb, m)
go event(SqlDb, m)
}
if err != nil {
tx.Rollback()
return err
}
offset(SqlDb, m)
tx.Commit()
// if err != nil {
// tx.Rollback()
// return err
// }
// tx.Commit()
// fmt.Println("msg process success")
return nil
}
@ -69,6 +68,9 @@ func login(SqlDb *sqlx.DB, m kafka.Message) error {
json.Unmarshal(m.Value, &V)
sql := "INSERT INTO log_login (`Uid`, `Event`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?)"
_, err := SqlDb.Exec(sql, V["Uid"], m.Key, V["TimeStamp"], V["AppId"], V["ServerId"])
if err != nil {
log.Printf("sql login err:%v", err)
}
return err
}
@ -81,6 +83,9 @@ func event(SqlDb *sqlx.DB, m kafka.Message) error {
}
sql := "INSERT INTO log_event (`Uid`, `Event`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?)"
_, err = SqlDb.Exec(sql, V["Uid"], m.Key, string(Param), V["TimeStamp"], V["AppId"], V["ServerId"])
if err != nil {
log.Printf("sql event err:%v", err)
}
return err
}
@ -95,24 +100,8 @@ func pay(SqlDb *sqlx.DB, m kafka.Message) error {
Uid := util.Int(V["Uid"])
sql := "INSERT INTO log_order (`Uid`, `OrderId`, `Price`, `PayChannelOrderId`, `ProductId`, `CreateTime`, `PayTime`, `PayType`, `Param`, `Timestamp`, `AppId`, `ServerId`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
_, err = SqlDb.Exec(sql, Uid, Info["OrderId"], Info["Price"], Info["PayChannelOrderId"], Info["ProductId"], Info["CreateTime"], Info["PayTime"], Info["PayType"], string(Param), V["TimeStamp"], Info["AppId"], Info["ServerId"])
return err
}
func offset(SqlDb *sqlx.DB, m kafka.Message) error {
sql := " INSERT INTO `log_var` (`Key` , `Value`, `Timestamp`) Values (?,?,?) ON DUPLICATE KEY UPDATE `key` = ? , `Value` = ? ,`Timestamp`=?"
Value := strconv.FormatInt(m.Offset+1, 10)
Timestamp := util.GetNowTime()
_, err := SqlDb.Exec(sql, "offset", Value, Timestamp, "offset", Value, Timestamp)
return err
}
func GetOffset(SqlDb *sqlx.DB) int64 {
sql := "select `Value` from `log_var` where `Key` = 'offset'"
var offset string
err := SqlDb.Get(&offset, sql)
if offset == "" || err != nil {
return kafka.FirstOffset
if err != nil {
log.Printf("sql pay err:%v", err)
}
v, _ := strconv.ParseInt(offset, 10, 64)
return v
return err
}

View File

@ -13,8 +13,11 @@ require (
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible // indirect
github.com/lestrrat-go/strftime v1.1.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/stretchr/testify v1.9.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.10.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.29.0 // indirect
)

View File

@ -10,6 +10,10 @@ github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible h1:Y6sqxHMyB1D2YSzWkLibYKgg+SwmyFU9dF2hn6MdTj4=
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA=
github.com/lestrrat-go/strftime v1.1.1 h1:zgf8QCsgj27GlKBy3SU9/8MMgegZ8UCzlCyHYrUF0QU=
github.com/lestrrat-go/strftime v1.1.1/go.mod h1:YDrzHJAODYQ+xxvrn5SG01uFIQAeDTzpxNVppCz7Nmw=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
@ -17,6 +21,8 @@ github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxU
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/segmentio/kafka-go v0.4.47 h1:IqziR4pA3vrZq7YdRxaT3w1/5fvIH5qpCwstUanQQB0=
@ -27,6 +33,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=

View File

@ -2,6 +2,7 @@ package main
import (
"context"
"io"
"kafka-comsumer/config"
"kafka-comsumer/db"
"kafka-comsumer/statistics"
@ -13,6 +14,7 @@ import (
"syscall"
"time"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/segmentio/kafka-go"
)
@ -20,22 +22,48 @@ var (
WorkChan = make(chan *kafka.Message, 100)
Wait sync.WaitGroup
)
var (
rl *rotatelogs.RotateLogs
logWriter io.Writer
errWriter io.Writer
)
var c = make(chan os.Signal, 1)
var d = make(chan int, 1)
// GOOS=linux GOARCH=amd64 go build -o /data/devops/kafka/release/kafka_comsumer main.go
func main() {
//打开日志文件
logFile, err := os.OpenFile("./log/kafka-comsumer.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("Failed to open log file: %v", err)
// 确保日志目录存在
if err := os.MkdirAll("./log", 0755); err != nil {
log.Fatalf("failed to create log dir: %v", err)
}
defer logFile.Close()
// 将日志输出重定向到文件
log.SetOutput(logFile)
log.SetFlags(log.LstdFlags | log.Lshortfile)
// 使用按天轮转的日志文件,保留最近 30 个文件
var err error
rl, err = rotatelogs.New(
"./log/comsume.%Y-%m-%d.log",
rotatelogs.WithRotationTime(24*time.Hour),
rotatelogs.WithRotationCount(7),
)
if err != nil {
log.Fatalf("failed to initialize log rotator: %v", err)
}
// 打开一个普通的最新日志文件(不使用 symlink用于提供固定路径的最新日志
currFile, err := os.OpenFile("./log/comsume.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
// 如果打开失败,仍然继续使用轮转器+控制台
log.Printf("warning: failed to open current log file: %v", err)
logWriter = io.MultiWriter(rl, os.Stdout)
errWriter = io.MultiWriter(rl, os.Stderr)
} else {
// 同时输出到轮转日志、固定最新日志文件和控制台
logWriter = io.MultiWriter(rl, currFile, os.Stdout)
errWriter = io.MultiWriter(rl, currFile, os.Stderr)
}
log.SetOutput(logWriter)
Games := config.GetGames()
for _, game := range Games {
go comsumer(game)
@ -60,14 +88,31 @@ func scheduleDailyTask() {
statistics.Statistics()
}
}
// func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
// brokers := strings.Split(kafkaURL, ",")
// return kafka.NewReader(kafka.ReaderConfig{
// Brokers: brokers,
// GroupID: groupID,
// Topic: topic,
// MinBytes: 10e3, // 10KB
// MaxBytes: 10e6, // 10MB
// })
// }
func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
brokers := strings.Split(kafkaURL, ",")
return kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
Brokers: brokers,
GroupID: groupID,
Topic: topic,
MinBytes: 1, // 降低最小聚合字节,减少等待
MaxBytes: 10e6, // 10MB
MaxWait: 100 * time.Millisecond, // 缩短等待聚合时间
QueueCapacity: 5000, // 内部预取队列容量(根据流量调整)
Dialer: &kafka.Dialer{
Timeout: 3 * time.Second, // 减少建立连接超时,避免 ~9s 阻塞
DualStack: true,
},
})
}
func comsumer(Game *config.Game) {
@ -77,7 +122,7 @@ func comsumer(Game *config.Game) {
return
}
r := getKafkaReader("kafka-server:9092", Game.Topic, "log")
r := getKafkaReader("kafka-server:9092", Game.Topic, "local")
defer r.Close()
ctx := context.Background()
log.Println("comsumer start ", Game.Name)
@ -91,7 +136,7 @@ func comsumer(Game *config.Game) {
for {
// 读取消息
m, err := r.ReadMessage(ctx)
m, err := r.FetchMessage(ctx)
if err != nil {
if ctx.Err() == context.Canceled {
log.Println("comsumer close ", Game.Name)
@ -100,6 +145,9 @@ func comsumer(Game *config.Game) {
log.Printf("comsumer %s error :%v", Game.Name, err)
continue
}
jsonData := string(m.Value)
log.Printf("comsumer %s received message: %s", Game.Name, jsonData)
r.CommitMessages(ctx, m)
Wait.Add(1)
err = db.ProcessMsg(sqlDb, m)
if err != nil {