From c3798e7a29b8e6e132113f272fd5c8975b7a8c15 Mon Sep 17 00:00:00 2001 From: hahwu <31872165+hahwu@users.noreply.github.com> Date: Wed, 19 Mar 2025 17:28:46 +0800 Subject: [PATCH] =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/middleware/kafka/kafka.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/server/middleware/kafka/kafka.go b/src/server/middleware/kafka/kafka.go index a8ddec97..3d81d681 100644 --- a/src/server/middleware/kafka/kafka.go +++ b/src/server/middleware/kafka/kafka.go @@ -2,6 +2,7 @@ package kafkaMiddleware import ( "context" + "fmt" "server/conf" "server/pkg/github.com/name5566/leaf/log" "time" @@ -12,10 +13,15 @@ import ( var KafkaMod *kafka.Conn var topic string +const ( + HOST = "kafka-server:9092" + PORT = "9092" +) + func init() { topic = conf.Server.GameName // 连接至Kafka集群的Leader节点 - conn, err := kafka.DialContext(context.Background(), "tcp", "kafka-server:9092") + conn, err := kafka.DialContext(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT)) if err != nil { log.Debug("kafka.DialContext err: %v", err) go reconnectKafka() @@ -27,8 +33,8 @@ func init() { return } for _, p := range Partitions { - if p.Leader.Host == "kafka-server" { - conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, p.ID) + if p.Leader.Host == HOST { + conn, err := kafka.DialLeader(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT), topic, p.ID) if err != nil { log.Fatal("kafka.DialLeader err: %v", err) return @@ -49,7 +55,7 @@ func reconnectKafka() { log.Debug("Reconnected to Kafka failed") return } - conn, err := kafka.DialContext(context.Background(), "tcp", "kafka-server:9092") + conn, err := kafka.DialContext(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT)) if err != nil { log.Fatal("kafka.DialContext err: %v", err) return @@ -57,8 +63,8 @@ func reconnectKafka() { Partitions, err := conn.ReadPartitions(topic) for _, p := range Partitions { - if p.Leader.Host == "kafka-server" { - conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, p.ID) + if p.Leader.Host == HOST { + conn, err := kafka.DialLeader(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT), topic, p.ID) if err != nil { log.Fatal("kafka.DialLeader err: %v", err) return