From 0e30ce57e7c19022e6922e8d550a37c92df7762c Mon Sep 17 00:00:00 2001 From: hahwu <31872165+hahwu@users.noreply.github.com> Date: Fri, 3 Jan 2025 15:12:07 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E9=87=8D=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/middleware/kafka/kafka.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/src/server/middleware/kafka/kafka.go b/src/server/middleware/kafka/kafka.go index 9371fb8f..a95e76b1 100644 --- a/src/server/middleware/kafka/kafka.go +++ b/src/server/middleware/kafka/kafka.go @@ -3,6 +3,8 @@ package kafkaMiddleware import ( "context" "server/conf" + "server/pkg/github.com/name5566/leaf/log" + "time" "github.com/segmentio/kafka-go" ) @@ -17,6 +19,19 @@ func init() { // 连接至Kafka集群的Leader节点 conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) if err != nil { + log.Error("kafka.DialLeader err: %v", err) + go func() { + for { + time.Sleep(time.Minute) + conn, err = kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) + if err == nil { + KafkaMod = conn + log.Release("Reconnected to Kafka") + break + } + log.Error("kafka.DialLeader retry err: %v", err) + } + }() return } KafkaMod = conn