kafka增加断线重连功能

This commit is contained in:
hahwu 2025-01-08 15:23:05 +08:00
parent 0b60ff67ce
commit a6b5e94b24

View File

@ -19,24 +19,28 @@ func init() {
// 连接至Kafka集群的Leader节点
conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition)
if err != nil {
log.Error("kafka.DialLeader err: %v", err)
go func() {
for {
time.Sleep(time.Minute)
conn, err = kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition)
if err == nil {
KafkaMod = conn
log.Release("Reconnected to Kafka")
break
}
log.Error("kafka.DialLeader retry err: %v", err)
}
}()
log.Debug("kafka.DialLeader err: %v", err)
go reconnectKafka()
return
}
log.Release("Reconnected to Kafka")
KafkaMod = conn
}
func reconnectKafka() {
partition := 0
for {
time.Sleep(time.Minute)
conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition)
if err == nil {
KafkaMod = conn
log.Debug("Reconnected to Kafka")
break
}
log.Debug("kafka.DialLeader retry err: %v", err)
}
}
func SendMsg(key, value []byte) error {
if KafkaMod == nil {
return nil
@ -46,5 +50,9 @@ func SendMsg(key, value []byte) error {
Key: key,
Value: value,
})
if err != nil {
log.Debug("WriteMessages err: %v", err)
go reconnectKafka()
}
return err
}