kafka优化

This commit is contained in:
hahwu 2025-06-11 15:59:01 +08:00
parent 1976d60e35
commit 4212d46663
2 changed files with 35 additions and 1 deletions

View File

@ -32,7 +32,7 @@
"RemoteAddr":"host.docker.internal:9001",
"Partition":3,
"KafkaHost":"kafka-server-2",
"KafkaHost":"kafka-server",
"KafkaPort":"9092",
"Version":"1.0.0"
}

View File

@ -16,6 +16,40 @@ func init() {
if conf.Server.GameName == "pet_home_local" {
return
}
topic = conf.Server.GameName
// Create a Kafka connection to the broker
conn, err := kafka.Dial("tcp", fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort))
if err != nil {
log.Debug("Failed to connect to Kafka broker: %v", err)
return
}
defer conn.Close()
// Check if topic exists, create if not
partitions, err := conn.ReadPartitions()
if err != nil {
log.Debug("Failed to read partitions: %v", err)
return
}
exists := false
for _, p := range partitions {
if p.Topic == topic {
exists = true
break
}
}
if !exists {
err = conn.CreateTopics(kafka.TopicConfig{
Topic: topic,
NumPartitions: 1,
ReplicationFactor: 1,
})
if err != nil {
log.Debug("Failed to create topic: %v", err)
return
}
}
KafkaMod = newKafkaWriter(fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort), conf.Server.GameName)
log.Debug("KafkaMod init")
}