系统优化

This commit is contained in:
hahwu 2025-03-19 17:28:46 +08:00
parent fcb838fa9a
commit c3798e7a29

View File

@ -2,6 +2,7 @@ package kafkaMiddleware
import (
"context"
"fmt"
"server/conf"
"server/pkg/github.com/name5566/leaf/log"
"time"
@ -12,10 +13,15 @@ import (
var KafkaMod *kafka.Conn
var topic string
const (
HOST = "kafka-server:9092"
PORT = "9092"
)
func init() {
topic = conf.Server.GameName
// 连接至Kafka集群的Leader节点
conn, err := kafka.DialContext(context.Background(), "tcp", "kafka-server:9092")
conn, err := kafka.DialContext(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT))
if err != nil {
log.Debug("kafka.DialContext err: %v", err)
go reconnectKafka()
@ -27,8 +33,8 @@ func init() {
return
}
for _, p := range Partitions {
if p.Leader.Host == "kafka-server" {
conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, p.ID)
if p.Leader.Host == HOST {
conn, err := kafka.DialLeader(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT), topic, p.ID)
if err != nil {
log.Fatal("kafka.DialLeader err: %v", err)
return
@ -49,7 +55,7 @@ func reconnectKafka() {
log.Debug("Reconnected to Kafka failed")
return
}
conn, err := kafka.DialContext(context.Background(), "tcp", "kafka-server:9092")
conn, err := kafka.DialContext(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT))
if err != nil {
log.Fatal("kafka.DialContext err: %v", err)
return
@ -57,8 +63,8 @@ func reconnectKafka() {
Partitions, err := conn.ReadPartitions(topic)
for _, p := range Partitions {
if p.Leader.Host == "kafka-server" {
conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, p.ID)
if p.Leader.Host == HOST {
conn, err := kafka.DialLeader(context.Background(), "tcp", fmt.Sprintf("%s:%s", HOST, PORT), topic, p.ID)
if err != nil {
log.Fatal("kafka.DialLeader err: %v", err)
return