diff --git a/src/server/middleware/kafka/kafka.go b/src/server/middleware/kafka/kafka.go index a95e76b1..2836943b 100644 --- a/src/server/middleware/kafka/kafka.go +++ b/src/server/middleware/kafka/kafka.go @@ -19,24 +19,28 @@ func init() { // 连接至Kafka集群的Leader节点 conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) if err != nil { - log.Error("kafka.DialLeader err: %v", err) - go func() { - for { - time.Sleep(time.Minute) - conn, err = kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) - if err == nil { - KafkaMod = conn - log.Release("Reconnected to Kafka") - break - } - log.Error("kafka.DialLeader retry err: %v", err) - } - }() + log.Debug("kafka.DialLeader err: %v", err) + go reconnectKafka() return } + log.Release("Reconnected to Kafka") KafkaMod = conn } +func reconnectKafka() { + partition := 0 + for { + time.Sleep(time.Minute) + conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) + if err == nil { + KafkaMod = conn + log.Debug("Reconnected to Kafka") + break + } + log.Debug("kafka.DialLeader retry err: %v", err) + } +} + func SendMsg(key, value []byte) error { if KafkaMod == nil { return nil @@ -46,5 +50,9 @@ func SendMsg(key, value []byte) error { Key: key, Value: value, }) + if err != nil { + log.Debug("WriteMessages err: %v", err) + go reconnectKafka() + } return err }