kafka增加断线重连功能

This commit is contained in:
hahwu 2025-01-08 15:23:05 +08:00
parent abd5039f98
commit cb8bba3537

View File

@ -19,24 +19,28 @@ func init() {
// 连接至Kafka集群的Leader节点 // 连接至Kafka集群的Leader节点
conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition) conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition)
if err != nil { if err != nil {
log.Error("kafka.DialLeader err: %v", err) log.Debug("kafka.DialLeader err: %v", err)
go func() { go reconnectKafka()
for {
time.Sleep(time.Minute)
conn, err = kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition)
if err == nil {
KafkaMod = conn
log.Release("Reconnected to Kafka")
break
}
log.Error("kafka.DialLeader retry err: %v", err)
}
}()
return return
} }
log.Release("Reconnected to Kafka")
KafkaMod = conn KafkaMod = conn
} }
func reconnectKafka() {
partition := 0
for {
time.Sleep(time.Minute)
conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition)
if err == nil {
KafkaMod = conn
log.Debug("Reconnected to Kafka")
break
}
log.Debug("kafka.DialLeader retry err: %v", err)
}
}
func SendMsg(key, value []byte) error { func SendMsg(key, value []byte) error {
if KafkaMod == nil { if KafkaMod == nil {
return nil return nil
@ -46,5 +50,9 @@ func SendMsg(key, value []byte) error {
Key: key, Key: key,
Value: value, Value: value,
}) })
if err != nil {
log.Debug("WriteMessages err: %v", err)
go reconnectKafka()
}
return err return err
} }