优化kafka日志
This commit is contained in:
parent
11a93fa7e3
commit
4aac987725
@ -33,7 +33,7 @@ type Log struct {
|
||||
|
||||
func (L *LogMgr) InitManager() {
|
||||
L.McronSave = cron.New()
|
||||
L.L = make([]*Log, 0, 10)
|
||||
L.L = make([]*Log, 0, LOG_LENGTH)
|
||||
L.McronSave.AddFunc("@every 10s", func() {
|
||||
L.Lock.Lock()
|
||||
defer L.Lock.Unlock()
|
||||
@ -45,11 +45,16 @@ func (L *LogMgr) InitManager() {
|
||||
}
|
||||
return
|
||||
}
|
||||
for _, v := range L.L {
|
||||
Quene := L.L
|
||||
NewQuene := make([]*Log, 0, LOG_LENGTH)
|
||||
for _, v := range Quene {
|
||||
value, _ := json.Marshal(v)
|
||||
kafkaMiddleware.SendMsg([]byte(v.EventName), value)
|
||||
err := kafkaMiddleware.SendMsg([]byte(v.EventName), value)
|
||||
if err != nil {
|
||||
NewQuene = append(NewQuene, v)
|
||||
}
|
||||
L.L = L.L[:0]
|
||||
}
|
||||
L.L = NewQuene
|
||||
})
|
||||
L.McronSave.Start()
|
||||
}
|
||||
|
||||
@ -29,14 +29,20 @@ func init() {
|
||||
|
||||
func reconnectKafka() {
|
||||
partition := 0
|
||||
WaitTime := 5
|
||||
for {
|
||||
time.Sleep(time.Minute)
|
||||
WaitTime--
|
||||
if WaitTime <= 0 {
|
||||
log.Debug("Reconnected to Kafka failed")
|
||||
return
|
||||
}
|
||||
conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition)
|
||||
if err == nil {
|
||||
KafkaMod = conn
|
||||
log.Debug("Reconnected to Kafka")
|
||||
break
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
log.Debug("kafka.DialLeader retry err: %v", err)
|
||||
}
|
||||
}
|
||||
@ -52,7 +58,7 @@ func SendMsg(key, value []byte) error {
|
||||
})
|
||||
if err != nil {
|
||||
log.Debug("WriteMessages err: %v", err)
|
||||
go reconnectKafka()
|
||||
reconnectKafka()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user