diff --git a/src/server/middleware/kafka/kafka.go b/src/server/middleware/kafka/kafka.go index a8ddec97..3d81d681 100644 --- a/src/server/middleware/kafka/kafka.go +++ b/src/server/middleware/kafka/kafka.go @@ -2,6 +2,7 @@ package kafkaMiddleware import ( "context" + "fmt" "server/conf" "server/pkg/github.com/name5566/leaf/log" "time" @@ -12,10 +13,15 @@ import ( var KafkaMod *kafka.Conn var topic string +const ( + HOST = "kafka-server:9092" + PORT = "9092" +) + func init() { topic = conf.Server.GameName // 连接至Kafka集群的Leader节点 - conn, err := kafka.DialContext(context.Background(), "tcp", "kafka-server:9092") + conn, err := kafka.DialContext(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT)) if err != nil { log.Debug("kafka.DialContext err: %v", err) go reconnectKafka() @@ -27,8 +33,8 @@ func init() { return } for _, p := range Partitions { - if p.Leader.Host == "kafka-server" { - conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, p.ID) + if p.Leader.Host == HOST { + conn, err := kafka.DialLeader(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT), topic, p.ID) if err != nil { log.Fatal("kafka.DialLeader err: %v", err) return @@ -49,7 +55,7 @@ func reconnectKafka() { log.Debug("Reconnected to Kafka failed") return } - conn, err := kafka.DialContext(context.Background(), "tcp", "kafka-server:9092") + conn, err := kafka.DialContext(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT)) if err != nil { log.Fatal("kafka.DialContext err: %v", err) return @@ -57,8 +63,8 @@ func reconnectKafka() { Partitions, err := conn.ReadPartitions(topic) for _, p := range Partitions { - if p.Leader.Host == "kafka-server" { - conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, p.ID) + if p.Leader.Host == HOST { + conn, err := kafka.DialLeader(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT), topic, p.ID) if err != nil { log.Fatal("kafka.DialLeader err: %v", err) return