From 9e433518a00d3db107a06f4d315f47ac175a3a7d Mon Sep 17 00:00:00 2001 From: hahwu <31872165+hahwu@users.noreply.github.com> Date: Wed, 8 Jan 2025 15:23:05 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E5=A2=9E=E5=8A=A0=E6=96=AD=E7=BA=BF?= =?UTF-8?q?=E9=87=8D=E8=BF=9E=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/middleware/kafka/kafka.go | 34 +++++++++++++++++----------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/server/middleware/kafka/kafka.go b/src/server/middleware/kafka/kafka.go index a95e76b1..2836943b 100644 --- a/src/server/middleware/kafka/kafka.go +++ b/src/server/middleware/kafka/kafka.go @@ -19,24 +19,28 @@ func init() { // 连接至Kafka集群的Leader节点 conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) if err != nil { - log.Error("kafka.DialLeader err: %v", err) - go func() { - for { - time.Sleep(time.Minute) - conn, err = kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) - if err == nil { - KafkaMod = conn - log.Release("Reconnected to Kafka") - break - } - log.Error("kafka.DialLeader retry err: %v", err) - } - }() + log.Debug("kafka.DialLeader err: %v", err) + go reconnectKafka() return } + log.Release("Reconnected to Kafka") KafkaMod = conn } +func reconnectKafka() { + partition := 0 + for { + time.Sleep(time.Minute) + conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) + if err == nil { + KafkaMod = conn + log.Debug("Reconnected to Kafka") + break + } + log.Debug("kafka.DialLeader retry err: %v", err) + } +} + func SendMsg(key, value []byte) error { if KafkaMod == nil { return nil @@ -46,5 +50,9 @@ func SendMsg(key, value []byte) error { Key: key, Value: value, }) + if err != nil { + log.Debug("WriteMessages err: %v", err) + go reconnectKafka() + } return err }