优化kafka日志

This commit is contained in:
hahwu 2025-01-10 17:18:05 +08:00
parent bc5cd1b769
commit 827a016102
2 changed files with 17 additions and 6 deletions

View File

@ -33,7 +33,7 @@ type Log struct {
func (L *LogMgr) InitManager() { func (L *LogMgr) InitManager() {
L.McronSave = cron.New() L.McronSave = cron.New()
L.L = make([]*Log, 0, 10) L.L = make([]*Log, 0, LOG_LENGTH)
L.McronSave.AddFunc("@every 10s", func() { L.McronSave.AddFunc("@every 10s", func() {
L.Lock.Lock() L.Lock.Lock()
defer L.Lock.Unlock() defer L.Lock.Unlock()
@ -45,11 +45,16 @@ func (L *LogMgr) InitManager() {
} }
return return
} }
for _, v := range L.L { Quene := L.L
NewQuene := make([]*Log, 0, LOG_LENGTH)
for _, v := range Quene {
value, _ := json.Marshal(v) value, _ := json.Marshal(v)
kafkaMiddleware.SendMsg([]byte(v.EventName), value) err := kafkaMiddleware.SendMsg([]byte(v.EventName), value)
if err != nil {
NewQuene = append(NewQuene, v)
}
} }
L.L = L.L[:0] L.L = NewQuene
}) })
L.McronSave.Start() L.McronSave.Start()
} }

View File

@ -29,14 +29,20 @@ func init() {
func reconnectKafka() { func reconnectKafka() {
partition := 0 partition := 0
WaitTime := 5
for { for {
time.Sleep(time.Minute) WaitTime--
if WaitTime <= 0 {
log.Debug("Reconnected to Kafka failed")
return
}
conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition)
if err == nil { if err == nil {
KafkaMod = conn KafkaMod = conn
log.Debug("Reconnected to Kafka") log.Debug("Reconnected to Kafka")
break break
} }
time.Sleep(time.Second)
log.Debug("kafka.DialLeader retry err: %v", err) log.Debug("kafka.DialLeader retry err: %v", err)
} }
} }
@ -52,7 +58,7 @@ func SendMsg(key, value []byte) error {
}) })
if err != nil { if err != nil {
log.Debug("WriteMessages err: %v", err) log.Debug("WriteMessages err: %v", err)
go reconnectKafka() reconnectKafka()
} }
return err return err
} }