系统优化

This commit is contained in:
hahwu 2025-03-19 18:33:52 +08:00
parent daf7152619
commit 54611a0a96

View File

@ -5,74 +5,23 @@ import (
"fmt" "fmt"
"server/conf" "server/conf"
"server/pkg/github.com/name5566/leaf/log" "server/pkg/github.com/name5566/leaf/log"
"time"
"github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go"
) )
var KafkaMod *kafka.Conn var KafkaMod *kafka.Writer
var topic string var topic string
func init() { func init() {
topic = conf.Server.GameName KafkaMod = newKafkaWriter(fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort), conf.Server.GameName)
// 连接至Kafka集群的Leader节点 log.Debug("KafkaMod init")
conn, err := kafka.DialContext(context.Background(), "tcp", fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort))
if err != nil {
log.Debug("kafka.DialContext err: %v", err)
go reconnectKafka()
return
}
Partitions, err := conn.ReadPartitions(topic)
if err != nil {
log.Fatal("kafka.DialContext err: %v", err)
return
}
for _, p := range Partitions {
if p.Leader.Host == conf.Server.KafkaHost {
log.Debug("Kafka connected partition :%v", p)
conn, err := kafka.DialLeader(context.Background(), "tcp", fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort), topic, p.ID)
if err != nil {
log.Fatal("kafka.DialLeader err: %v", err)
return
}
log.Release("Reconnected to Kafka")
KafkaMod = conn
return
}
}
log.Fatal("kafka.DialLeader err: %v", "no leader")
} }
func reconnectKafka() { func newKafkaWriter(kafkaURL, topic string) *kafka.Writer {
WaitTime := 5 return &kafka.Writer{
for { Addr: kafka.TCP(kafkaURL),
WaitTime-- Topic: topic,
if WaitTime <= 0 { Balancer: &kafka.LeastBytes{},
log.Debug("Reconnected to Kafka failed")
return
}
conn, err := kafka.DialContext(context.Background(), "tcp", fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort))
if err != nil {
log.Fatal("kafka.DialContext err: %v", err)
return
}
Partitions, err := conn.ReadPartitions(topic)
for _, p := range Partitions {
if p.Leader.Host == conf.Server.KafkaHost {
log.Debug("Kafka connected partition :%v", p)
conn, err := kafka.DialLeader(context.Background(), "tcp", fmt.Sprintf("%s:%s", conf.Server.KafkaHost, conf.Server.KafkaPort), topic, p.ID)
if err != nil {
log.Fatal("kafka.DialLeader err: %v", err)
return
}
log.Release("Reconnected to Kafka")
KafkaMod = conn
return
}
}
time.Sleep(time.Second)
log.Debug("kafka.DialLeader retry err: %v", err)
} }
} }
@ -80,14 +29,13 @@ func SendMsg(key, value []byte) error {
if KafkaMod == nil { if KafkaMod == nil {
return nil return nil
} }
_, err := KafkaMod.WriteMessages(kafka.Message{ err := KafkaMod.WriteMessages(context.Background(), kafka.Message{
Topic: topic, Topic: topic,
Key: key, Key: key,
Value: value, Value: value,
}) })
if err != nil { if err != nil {
log.Debug("WriteMessages err: %v", err) log.Debug("WriteMessages err: %v", err)
reconnectKafka()
} }
return err return err
} }