devops/kafka/main.go
2025-12-12 11:40:38 +08:00

154 lines
3.7 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"io"
"kafka-comsumer/config"
"kafka-comsumer/db"
"kafka-comsumer/statistics"
"log"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
"github.com/segmentio/kafka-go"
)
var (
WorkChan = make(chan *kafka.Message, 100)
Wait sync.WaitGroup
)
var (
rl *rotatelogs.RotateLogs
logWriter io.Writer
)
var c = make(chan os.Signal, 1)
var d = make(chan int, 1)
// GOOS=linux GOARCH=amd64 go build -o /data/devops/kafka/release/kafka_comsumer main.go
func main() {
// 确保日志目录存在
if err := os.MkdirAll("./log", 0755); err != nil {
log.Fatalf("failed to create log dir: %v", err)
}
// 使用按天轮转的日志文件,保留最近 30 个文件
var err error
rl, err = rotatelogs.New(
"./log/comsume.%Y-%m-%d.log",
rotatelogs.WithRotationTime(24*time.Hour),
rotatelogs.WithRotationCount(7),
)
if err != nil {
log.Fatalf("failed to initialize log rotator: %v", err)
}
// 打开一个普通的最新日志文件(不使用 symlink用于提供固定路径的最新日志
currFile, err := os.OpenFile("./log/comsume.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
// 如果打开失败,仍然继续使用轮转器
log.Printf("warning: failed to open current log file: %v", err)
logWriter = io.MultiWriter(rl)
} else {
// 同时输出到轮转日志、固定最新日志文件和控制台
logWriter = io.MultiWriter(rl, currFile, os.Stdout)
}
log.SetOutput(logWriter)
Games := config.GetGames()
for _, game := range Games {
go comsumer(game)
}
go scheduleDailyTask()
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
sig := <-c
d <- 1
log.Printf("kafka comsumer closing down (signal: %v)\n", sig.String())
Wait.Wait()
// statistics.Statistics()
}
func scheduleDailyTask() {
for {
now := time.Now()
next := now.AddDate(0, 0, 1).Truncate(24 * time.Hour)
duration := next.Sub(now)
time.Sleep(duration)
// 执行统计函数
statistics.Statistics()
}
}
// func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
// brokers := strings.Split(kafkaURL, ",")
// return kafka.NewReader(kafka.ReaderConfig{
// Brokers: brokers,
// GroupID: groupID,
// Topic: topic,
// MinBytes: 10e3, // 10KB
// MaxBytes: 10e6, // 10MB
// })
// }
func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
brokers := strings.Split(kafkaURL, ",")
return kafka.NewReader(kafka.ReaderConfig{
Brokers: brokers,
GroupID: groupID,
Topic: topic,
MinBytes: 1, // 降低最小聚合字节,减少等待
MaxBytes: 10e6, // 10MB
MaxWait: 100 * time.Millisecond, // 缩短等待聚合时间
Dialer: &kafka.Dialer{
Timeout: 3 * time.Second, // 减少建立连接超时,避免 ~9s 阻塞
DualStack: true,
},
})
}
func comsumer(Game *config.Game) {
sqlDb := db.GetContainer(Game.Name)
if sqlDb == nil {
log.Printf("comsumer db %s not exist \n", Game.Name)
return
}
r := getKafkaReader("kafka-server:9092", Game.Topic, "local")
defer r.Close()
ctx := context.Background()
log.Println("comsumer start ", Game.Name)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
<-d
cancel()
}()
for {
// 读取消息
m, err := r.FetchMessage(ctx)
if err != nil {
if ctx.Err() == context.Canceled {
log.Println("comsumer close ", Game.Name)
return
}
log.Printf("comsumer %s error :%v", Game.Name, err)
continue
}
log.Printf("comsumer %s received message: %s", Game.Name, m.Value)
r.CommitMessages(ctx, m)
Wait.Add(1)
err = db.ProcessMsg(sqlDb, m)
if err != nil {
log.Printf("comsumer m.key : %s %s error :%v", m.Key, Game.Name, err)
}
Wait.Done()
}
}