From 4212d466630d7411a38af8d036497c7640188d77 Mon Sep 17 00:00:00 2001 From: hahwu <31872165+hahwu@users.noreply.github.com> Date: Wed, 11 Jun 2025 15:59:01 +0800 Subject: [PATCH] =?UTF-8?q?kafka=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/conf/server.json | 2 +- src/server/middleware/kafka/kafka.go | 34 ++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/src/server/conf/server.json b/src/server/conf/server.json index 82dbbe94..b441b645 100644 --- a/src/server/conf/server.json +++ b/src/server/conf/server.json @@ -32,7 +32,7 @@ "RemoteAddr":"host.docker.internal:9001", "Partition":3, - "KafkaHost":"kafka-server-2", + "KafkaHost":"kafka-server", "KafkaPort":"9092", "Version":"1.0.0" } diff --git a/src/server/middleware/kafka/kafka.go b/src/server/middleware/kafka/kafka.go index 586e0cf1..4739fd6b 100644 --- a/src/server/middleware/kafka/kafka.go +++ b/src/server/middleware/kafka/kafka.go @@ -16,6 +16,40 @@ func init() { if conf.Server.GameName == "pet_home_local" { return } + topic = conf.Server.GameName + + // Create a Kafka connection to the broker + conn, err := kafka.Dial("tcp", fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort)) + if err != nil { + log.Debug("Failed to connect to Kafka broker: %v", err) + return + } + defer conn.Close() + + // Check if topic exists, create if not + partitions, err := conn.ReadPartitions() + if err != nil { + log.Debug("Failed to read partitions: %v", err) + return + } + exists := false + for _, p := range partitions { + if p.Topic == topic { + exists = true + break + } + } + if !exists { + err = conn.CreateTopics(kafka.TopicConfig{ + Topic: topic, + NumPartitions: 1, + ReplicationFactor: 1, + }) + if err != nil { + log.Debug("Failed to create topic: %v", err) + return + } + } KafkaMod = newKafkaWriter(fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort), conf.Server.GameName) log.Debug("KafkaMod init") }