devops/kafka/main.go
2025-02-17 10:49:31 +08:00

109 lines
2.3 KiB
Go

package main
import (
"context"
"kafka-comsumer/config"
"kafka-comsumer/db"
"kafka-comsumer/statistics"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/segmentio/kafka-go"
)
// Package-level state shared by main and the consumer goroutines.
var (
	// WorkChan is a buffered channel (capacity 100) of Kafka messages.
	// NOTE(review): nothing in this file reads or writes it — confirm whether
	// it is still needed.
	WorkChan = make(chan *kafka.Message, 100)

	// Wait lets main block on shutdown until the work tracked by the
	// consumers has finished.
	Wait sync.WaitGroup

	// c receives the OS signals registered in main.
	c = make(chan os.Signal, 1)

	// d is the shutdown notification channel the consumer goroutines
	// receive from.
	d = make(chan int, 1)
)
// main is the entry point of the Kafka consumer service.
//
// It redirects the standard logger to ./log/kafka-comsumer.log, starts one
// consumer goroutine per game returned by config.GetGames plus the daily
// statistics scheduler, and then blocks until SIGINT or SIGTERM arrives. On
// shutdown it closes d, which wakes every goroutine receiving from it, and
// waits for all consumer goroutines to return before exiting.
//
// Build:
//
//	GOOS=linux GOARCH=amd64 go build -o /data/devops/kafka/release/kafka_comsumer main.go
func main() {
	// Open the log file.
	logFile, err := os.OpenFile("./log/kafka-comsumer.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		log.Fatalf("Failed to open log file: %v", err)
	}
	defer logFile.Close()

	// Redirect log output to the file.
	log.SetOutput(logFile)
	log.SetFlags(log.LstdFlags | log.Lshortfile)

	games := config.GetGames()
	for _, game := range games {
		game := game // per-iteration copy for toolchains older than Go 1.22
		// Track the whole consumer goroutine, so that Wait.Wait below only
		// returns once every consumer has stopped and released its reader.
		Wait.Add(1)
		go func() {
			defer Wait.Done()
			comsumer(game)
		}()
	}
	go scheduleDailyTask()

	// SIGKILL cannot be caught, so it is not registered here.
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	sig := <-c

	// Closing d (instead of sending a single value) notifies every receiver;
	// a single send would wake only one consumer.
	close(d)
	log.Printf("kafka comsumer closing down (signal: %v)\n", sig.String())
	Wait.Wait()
	// statistics.Statistics()
}
// scheduleDailyTask runs statistics.Statistics once per day at the next local
// midnight. It never returns and is meant to run in its own goroutine.
//
// The wake-up time is built with time.Date in the local time zone.
// Time.Truncate is deliberately not used: it rounds relative to the absolute
// zero time (midnight UTC), which is not local midnight in zones such as
// UTC+8.
func scheduleDailyTask() {
	for {
		now := time.Now()
		// time.Date normalizes an out-of-range day (e.g. the 32nd) into the
		// following month.
		next := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 0, 0, 0, now.Location())
		time.Sleep(next.Sub(now))
		// Run the statistics job.
		statistics.Statistics()
	}
}
// comsumer reads messages from the Kafka topic and partition configured for
// Game and passes each one to db.ProcessMsg until shutdown is signalled on d.
//
// It returns immediately when db.GetContainer has no database for Game.Name
// or when the starting offset cannot be applied to the reader. Read errors
// other than cancellation are logged and the loop keeps going; processing
// errors are logged and the message is skipped.
func comsumer(Game *config.Game) {
	sqlDb := db.GetContainer(Game.Name)
	if sqlDb == nil {
		log.Printf("comsumer db %s not exist \n", Game.Name)
		return
	}

	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"kafka-server:9092"},
		Topic:     Game.Topic,
		Partition: Game.Partition,
		MinBytes:  10e3, // 10KB
		MaxBytes:  10e6, // 10MB
	})
	defer func() {
		if err := r.Close(); err != nil {
			log.Printf("comsumer %s close reader error :%v", Game.Name, err)
		}
	}()

	// Start reading from the offset returned by db.GetOffset.
	offset := db.GetOffset(sqlDb)
	if err := r.SetOffset(offset); err != nil {
		// Do not consume from an unintended position.
		log.Printf("comsumer %s set offset %v error :%v", Game.Name, offset, err)
		return
	}

	log.Println("comsumer start ", Game.Name)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Cancel the read loop once shutdown is signalled on d. Also watching
	// ctx.Done lets this goroutine exit if the consumer returns first.
	go func() {
		select {
		case <-d:
			cancel()
		case <-ctx.Done():
		}
	}()

	for {
		// Read the next message; this blocks until one arrives or ctx is
		// cancelled.
		m, err := r.ReadMessage(ctx)
		if err != nil {
			if ctx.Err() != nil {
				log.Println("comsumer close ", Game.Name)
				return
			}
			log.Printf("comsumer %s error :%v", Game.Name, err)
			continue
		}

		// Mark the message as in flight so that shutdown waits for it.
		Wait.Add(1)
		err = db.ProcessMsg(sqlDb, m)
		if err != nil {
			log.Printf("comsumer m.key : %s %s error :%v", m.Key, Game.Name, err)
		}
		Wait.Done()
	}
}