154 lines
3.7 KiB
Go
154 lines
3.7 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"io"
|
||
"kafka-comsumer/config"
|
||
"kafka-comsumer/db"
|
||
"kafka-comsumer/statistics"
|
||
"log"
|
||
"os"
|
||
"os/signal"
|
||
"strings"
|
||
"sync"
|
||
"syscall"
|
||
"time"
|
||
|
||
rotatelogs "github.com/lestrrat-go/file-rotatelogs"
|
||
"github.com/segmentio/kafka-go"
|
||
)
|
||
|
||
var (
|
||
WorkChan = make(chan *kafka.Message, 100)
|
||
Wait sync.WaitGroup
|
||
)
|
||
var (
|
||
rl *rotatelogs.RotateLogs
|
||
logWriter io.Writer
|
||
)
|
||
var c = make(chan os.Signal, 1)
|
||
var d = make(chan int, 1)
|
||
|
||
// GOOS=linux GOARCH=amd64 go build -o /data/devops/kafka/release/kafka_comsumer main.go
|
||
|
||
func main() {
|
||
// 确保日志目录存在
|
||
if err := os.MkdirAll("./log", 0755); err != nil {
|
||
log.Fatalf("failed to create log dir: %v", err)
|
||
}
|
||
|
||
// 使用按天轮转的日志文件,保留最近 30 个文件
|
||
var err error
|
||
rl, err = rotatelogs.New(
|
||
"./log/comsume.%Y-%m-%d.log",
|
||
rotatelogs.WithRotationTime(24*time.Hour),
|
||
rotatelogs.WithRotationCount(7),
|
||
)
|
||
if err != nil {
|
||
log.Fatalf("failed to initialize log rotator: %v", err)
|
||
}
|
||
|
||
// 打开一个普通的最新日志文件(不使用 symlink),用于提供固定路径的最新日志
|
||
currFile, err := os.OpenFile("./log/comsume.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
|
||
if err != nil {
|
||
// 如果打开失败,仍然继续使用轮转器
|
||
log.Printf("warning: failed to open current log file: %v", err)
|
||
logWriter = io.MultiWriter(rl)
|
||
} else {
|
||
// 同时输出到轮转日志、固定最新日志文件和控制台
|
||
logWriter = io.MultiWriter(rl, currFile, os.Stdout)
|
||
}
|
||
|
||
log.SetOutput(logWriter)
|
||
|
||
Games := config.GetGames()
|
||
for _, game := range Games {
|
||
go comsumer(game)
|
||
}
|
||
go scheduleDailyTask()
|
||
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
|
||
sig := <-c
|
||
d <- 1
|
||
log.Printf("kafka comsumer closing down (signal: %v)\n", sig.String())
|
||
Wait.Wait()
|
||
// statistics.Statistics()
|
||
}
|
||
|
||
func scheduleDailyTask() {
|
||
for {
|
||
now := time.Now()
|
||
next := now.AddDate(0, 0, 1).Truncate(24 * time.Hour)
|
||
duration := next.Sub(now)
|
||
time.Sleep(duration)
|
||
|
||
// 执行统计函数
|
||
statistics.Statistics()
|
||
}
|
||
}
|
||
|
||
// func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
|
||
// brokers := strings.Split(kafkaURL, ",")
|
||
// return kafka.NewReader(kafka.ReaderConfig{
|
||
// Brokers: brokers,
|
||
// GroupID: groupID,
|
||
// Topic: topic,
|
||
// MinBytes: 10e3, // 10KB
|
||
// MaxBytes: 10e6, // 10MB
|
||
// })
|
||
// }
|
||
func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
|
||
brokers := strings.Split(kafkaURL, ",")
|
||
return kafka.NewReader(kafka.ReaderConfig{
|
||
Brokers: brokers,
|
||
GroupID: groupID,
|
||
Topic: topic,
|
||
MinBytes: 1, // 降低最小聚合字节,减少等待
|
||
MaxBytes: 10e6, // 10MB
|
||
MaxWait: 100 * time.Millisecond, // 缩短等待聚合时间
|
||
Dialer: &kafka.Dialer{
|
||
Timeout: 3 * time.Second, // 减少建立连接超时,避免 ~9s 阻塞
|
||
DualStack: true,
|
||
},
|
||
})
|
||
}
|
||
func comsumer(Game *config.Game) {
|
||
sqlDb := db.GetContainer(Game.Name)
|
||
if sqlDb == nil {
|
||
log.Printf("comsumer db %s not exist \n", Game.Name)
|
||
return
|
||
}
|
||
|
||
r := getKafkaReader("kafka-server:9092", Game.Topic, "local")
|
||
defer r.Close()
|
||
ctx := context.Background()
|
||
log.Println("comsumer start ", Game.Name)
|
||
ctx, cancel := context.WithCancel(ctx)
|
||
defer cancel()
|
||
|
||
go func() {
|
||
<-d
|
||
cancel()
|
||
}()
|
||
|
||
for {
|
||
// 读取消息
|
||
m, err := r.FetchMessage(ctx)
|
||
if err != nil {
|
||
if ctx.Err() == context.Canceled {
|
||
log.Println("comsumer close ", Game.Name)
|
||
return
|
||
}
|
||
log.Printf("comsumer %s error :%v", Game.Name, err)
|
||
continue
|
||
}
|
||
log.Printf("comsumer %s received message: %s", Game.Name, m.Value)
|
||
r.CommitMessages(ctx, m)
|
||
Wait.Add(1)
|
||
err = db.ProcessMsg(sqlDb, m)
|
||
if err != nil {
|
||
log.Printf("comsumer m.key : %s %s error :%v", m.Key, Game.Name, err)
|
||
}
|
||
Wait.Done()
|
||
}
|
||
}
|