From fcb838fa9af9bd1b1ce217316e95d25dc2bf8012 Mon Sep 17 00:00:00 2001 From: hahwu <31872165+hahwu@users.noreply.github.com> Date: Wed, 19 Mar 2025 17:24:52 +0800 Subject: [PATCH] =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/conf/server.json | 2 +- src/server/game/UnitTest.go | 6 ++-- src/server/middleware/kafka/kafka.go | 48 ++++++++++++++++++++++------ 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/src/server/conf/server.json b/src/server/conf/server.json index 6d5ca858..c5fbf78a 100644 --- a/src/server/conf/server.json +++ b/src/server/conf/server.json @@ -14,7 +14,7 @@ "AppPath": "./app", "TELOGDIR" : "./teLog/", - "GameName": "Merge_Pet_Local", + "GameName": "pet_home", "GameID": 1, "ServerType":"node", diff --git a/src/server/game/UnitTest.go b/src/server/game/UnitTest.go index 47bc2c05..31b5174f 100644 --- a/src/server/game/UnitTest.go +++ b/src/server/game/UnitTest.go @@ -213,9 +213,7 @@ func UnitPlayroom(p *Player) error { } func UnitDailyTask(p *Player) error { - DailyTaskMod := p.PlayMod.getDailyTaskMod() - for i := 0; i < 1000; i++ { - DailyTaskMod.WeekUpdate() - } + FT := p.PlayMod.getFriendTreasureMod() + FT.ZeroUpdate() return nil } diff --git a/src/server/middleware/kafka/kafka.go b/src/server/middleware/kafka/kafka.go index f23c7061..a8ddec97 100644 --- a/src/server/middleware/kafka/kafka.go +++ b/src/server/middleware/kafka/kafka.go @@ -15,18 +15,33 @@ var topic string func init() { topic = conf.Server.GameName // 连接至Kafka集群的Leader节点 - conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server-3:9096", topic, conf.Server.Partition) + conn, err := kafka.DialContext(context.Background(), "tcp", "kafka-server:9092") if err != nil { - log.Debug("kafka.DialLeader err: %v", err) + log.Debug("kafka.DialContext err: %v", err) go reconnectKafka() return } - log.Release("Reconnected to Kafka") - KafkaMod = conn + Partitions, err := conn.ReadPartitions(topic) + if err != nil { + log.Fatal("kafka.DialContext err: %v", err) + return + } + for _, p := range Partitions { + if p.Leader.Host == "kafka-server" { + conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, p.ID) + if err != nil { + log.Fatal("kafka.DialLeader err: %v", err) + return + } + log.Release("Reconnected to Kafka") + KafkaMod = conn + return + } + } + log.Fatal("kafka.DialLeader err: %v", "no leader") } func reconnectKafka() { - partition := 0 WaitTime := 5 for { WaitTime-- @@ -34,11 +49,24 @@ func reconnectKafka() { log.Debug("Reconnected to Kafka failed") return } - conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) - if err == nil { - KafkaMod = conn - log.Debug("Reconnected to Kafka") - break + conn, err := kafka.DialContext(context.Background(), "tcp", "kafka-server:9092") + if err != nil { + log.Fatal("kafka.DialContext err: %v", err) + return + } + Partitions, err := conn.ReadPartitions(topic) + + for _, p := range Partitions { + if p.Leader.Host == "kafka-server" { + conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, p.ID) + if err != nil { + log.Fatal("kafka.DialLeader err: %v", err) + return + } + log.Release("Reconnected to Kafka") + KafkaMod = conn + return + } } time.Sleep(time.Second) log.Debug("kafka.DialLeader retry err: %v", err)