diff --git a/src/server/conf/server.json b/src/server/conf/server.json index 82dbbe94..b441b645 100644 --- a/src/server/conf/server.json +++ b/src/server/conf/server.json @@ -32,7 +32,7 @@ "RemoteAddr":"host.docker.internal:9001", "Partition":3, - "KafkaHost":"kafka-server-2", + "KafkaHost":"kafka-server", "KafkaPort":"9092", "Version":"1.0.0" } diff --git a/src/server/middleware/kafka/kafka.go b/src/server/middleware/kafka/kafka.go index 586e0cf1..4739fd6b 100644 --- a/src/server/middleware/kafka/kafka.go +++ b/src/server/middleware/kafka/kafka.go @@ -16,6 +16,40 @@ func init() { if conf.Server.GameName == "pet_home_local" { return } + topic = conf.Server.GameName + + // Create a Kafka connection to the broker + conn, err := kafka.Dial("tcp", fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort)) + if err != nil { + log.Debug("Failed to connect to Kafka broker: %v", err) + return + } + defer conn.Close() + + // Check if topic exists, create if not + partitions, err := conn.ReadPartitions() + if err != nil { + log.Debug("Failed to read partitions: %v", err) + return + } + exists := false + for _, p := range partitions { + if p.Topic == topic { + exists = true + break + } + } + if !exists { + err = conn.CreateTopics(kafka.TopicConfig{ + Topic: topic, + NumPartitions: 1, + ReplicationFactor: 1, + }) + if err != nil { + log.Debug("Failed to create topic: %v", err) + return + } + } KafkaMod = newKafkaWriter(fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort), conf.Server.GameName) log.Debug("KafkaMod init") }