111 lines
2.3 KiB
Go
111 lines
2.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"kafka-comsumer/config"
|
|
"kafka-comsumer/db"
|
|
"kafka-comsumer/statistics"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
var (
|
|
WorkChan = make(chan *kafka.Message, 100)
|
|
Wait sync.WaitGroup
|
|
)
|
|
var c = make(chan os.Signal, 1)
|
|
var d = make(chan int, 1)
|
|
|
|
// GOOS=linux GOARCH=amd64 go build -o /data/devops/kafka/release/kafka_comsumer main.go
|
|
|
|
func main() {
|
|
//打开日志文件
|
|
logFile, err := os.OpenFile("./log/kafka-comsumer.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
|
|
if err != nil {
|
|
log.Fatalf("Failed to open log file: %v", err)
|
|
}
|
|
defer logFile.Close()
|
|
|
|
// 将日志输出重定向到文件
|
|
log.SetOutput(logFile)
|
|
log.SetFlags(log.LstdFlags | log.Lshortfile)
|
|
Games := config.GetGames()
|
|
for _, game := range Games {
|
|
go comsumer(game)
|
|
}
|
|
go scheduleDailyTask()
|
|
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL)
|
|
sig := <-c
|
|
d <- 1
|
|
log.Printf("kafka comsumer closing down (signal: %v)\n", sig.String())
|
|
Wait.Wait()
|
|
// statistics.Statistics()
|
|
}
|
|
|
|
func scheduleDailyTask() {
|
|
for {
|
|
now := time.Now()
|
|
next := now.AddDate(0, 0, 1).Truncate(24 * time.Hour)
|
|
duration := next.Sub(now)
|
|
time.Sleep(duration)
|
|
|
|
// 执行统计函数
|
|
statistics.Statistics()
|
|
}
|
|
}
|
|
func getKafkaReader(kafkaURL, topic, groupID string) *kafka.Reader {
|
|
brokers := strings.Split(kafkaURL, ",")
|
|
return kafka.NewReader(kafka.ReaderConfig{
|
|
Brokers: brokers,
|
|
GroupID: groupID,
|
|
Topic: topic,
|
|
MinBytes: 10e3, // 10KB
|
|
MaxBytes: 10e6, // 10MB
|
|
})
|
|
}
|
|
func comsumer(Game *config.Game) {
|
|
sqlDb := db.GetContainer(Game.Name)
|
|
if sqlDb == nil {
|
|
log.Printf("comsumer db %s not exist \n", Game.Name)
|
|
return
|
|
}
|
|
|
|
r := getKafkaReader("kafka-server:9092", Game.Topic, "log")
|
|
defer r.Close()
|
|
ctx := context.Background()
|
|
log.Println("comsumer start ", Game.Name)
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
go func() {
|
|
<-d
|
|
cancel()
|
|
}()
|
|
|
|
for {
|
|
// 读取消息
|
|
m, err := r.ReadMessage(ctx)
|
|
if err != nil {
|
|
if ctx.Err() == context.Canceled {
|
|
log.Println("comsumer close ", Game.Name)
|
|
return
|
|
}
|
|
log.Printf("comsumer %s error :%v", Game.Name, err)
|
|
continue
|
|
}
|
|
Wait.Add(1)
|
|
err = db.ProcessMsg(sqlDb, m)
|
|
if err != nil {
|
|
log.Printf("comsumer m.key : %s %s error :%v", m.Key, Game.Name, err)
|
|
}
|
|
Wait.Done()
|
|
}
|
|
}
|