diff --git a/src/server/game/LogMgr.go b/src/server/game/LogMgr.go index e15dcd40..f53ad558 100644 --- a/src/server/game/LogMgr.go +++ b/src/server/game/LogMgr.go @@ -33,7 +33,7 @@ type Log struct { func (L *LogMgr) InitManager() { L.McronSave = cron.New() - L.L = make([]*Log, 0, 10) + L.L = make([]*Log, 0, LOG_LENGTH) L.McronSave.AddFunc("@every 10s", func() { L.Lock.Lock() defer L.Lock.Unlock() @@ -45,11 +45,16 @@ func (L *LogMgr) InitManager() { } return } - for _, v := range L.L { + Quene := L.L + NewQuene := make([]*Log, 0, LOG_LENGTH) + for _, v := range Quene { value, _ := json.Marshal(v) - kafkaMiddleware.SendMsg([]byte(v.EventName), value) + err := kafkaMiddleware.SendMsg([]byte(v.EventName), value) + if err != nil { + NewQuene = append(NewQuene, v) + } } - L.L = L.L[:0] + L.L = NewQuene }) L.McronSave.Start() } diff --git a/src/server/middleware/kafka/kafka.go b/src/server/middleware/kafka/kafka.go index 2836943b..521bc959 100644 --- a/src/server/middleware/kafka/kafka.go +++ b/src/server/middleware/kafka/kafka.go @@ -29,14 +29,20 @@ func init() { func reconnectKafka() { partition := 0 + WaitTime := 5 for { - time.Sleep(time.Minute) + WaitTime-- + if WaitTime <= 0 { + log.Debug("Reconnected to Kafka failed") + return + } conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) if err == nil { KafkaMod = conn log.Debug("Reconnected to Kafka") break } + time.Sleep(time.Second) log.Debug("kafka.DialLeader retry err: %v", err) } } @@ -52,7 +58,7 @@ func SendMsg(key, value []byte) error { }) if err != nil { log.Debug("WriteMessages err: %v", err) - go reconnectKafka() + reconnectKafka() } return err }