diff --git a/src/server/middleware/kafka/kafka.go b/src/server/middleware/kafka/kafka.go index e6d68872..3ac0d515 100644 --- a/src/server/middleware/kafka/kafka.go +++ b/src/server/middleware/kafka/kafka.go @@ -5,74 +5,23 @@ import ( "fmt" "server/conf" "server/pkg/github.com/name5566/leaf/log" - "time" "github.com/segmentio/kafka-go" ) -var KafkaMod *kafka.Conn +var KafkaMod *kafka.Writer var topic string func init() { - topic = conf.Server.GameName - // 连接至Kafka集群的Leader节点 - conn, err := kafka.DialContext(context.Background(), "tcp", fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort)) - if err != nil { - log.Debug("kafka.DialContext err: %v", err) - go reconnectKafka() - return - } - Partitions, err := conn.ReadPartitions(topic) - if err != nil { - log.Fatal("kafka.DialContext err: %v", err) - return - } - for _, p := range Partitions { - if p.Leader.Host == conf.Server.KafkaHost { - log.Debug("Kafka connected partition :%v", p) - conn, err := kafka.DialLeader(context.Background(), "tcp", fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort), topic, p.ID) - if err != nil { - log.Fatal("kafka.DialLeader err: %v", err) - return - } - log.Release("Reconnected to Kafka") - KafkaMod = conn - return - } - } - log.Fatal("kafka.DialLeader err: %v", "no leader") + KafkaMod = newKafkaWriter(fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort), conf.Server.GameName) + log.Debug("KafkaMod init") } -func reconnectKafka() { - WaitTime := 5 - for { - WaitTime-- - if WaitTime <= 0 { - log.Debug("Reconnected to Kafka failed") - return - } - conn, err := kafka.DialContext(context.Background(), "tcp", fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort)) - if err != nil { - log.Fatal("kafka.DialContext err: %v", err) - return - } - Partitions, err := conn.ReadPartitions(topic) - - for _, p := range Partitions { - if p.Leader.Host == conf.Server.KafkaHost { - log.Debug("Kafka connected partition :%v", p) - conn, err := kafka.DialLeader(context.Background(), "tcp", fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort), topic, p.ID) - if err != nil { - log.Fatal("kafka.DialLeader err: %v", err) - return - } - log.Release("Reconnected to Kafka") - KafkaMod = conn - return - } - } - time.Sleep(time.Second) - log.Debug("kafka.DialLeader retry err: %v", err) +func newKafkaWriter(kafkaURL, topic string) *kafka.Writer { + return &kafka.Writer{ + Addr: kafka.TCP(kafkaURL), + Topic: topic, + Balancer: &kafka.LeastBytes{}, } } @@ -80,14 +29,13 @@ func SendMsg(key, value []byte) error { if KafkaMod == nil { return nil } - _, err := KafkaMod.WriteMessages(kafka.Message{ + err := KafkaMod.WriteMessages(context.Background(), kafka.Message{ Topic: topic, Key: key, Value: value, }) if err != nil { log.Debug("WriteMessages err: %v", err) - reconnectKafka() } return err }