kafka重连
This commit is contained in:
parent
094d646c8b
commit
0e30ce57e7
@ -3,6 +3,8 @@ package kafkaMiddleware
|
||||
import (
|
||||
"context"
|
||||
"server/conf"
|
||||
"server/pkg/github.com/name5566/leaf/log"
|
||||
"time"
|
||||
|
||||
"github.com/segmentio/kafka-go"
|
||||
)
|
||||
@ -17,6 +19,19 @@ func init() {
|
||||
// 连接至Kafka集群的Leader节点
|
||||
conn, err := kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition)
|
||||
if err != nil {
|
||||
log.Error("kafka.DialLeader err: %v", err)
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(time.Minute)
|
||||
conn, err = kafka.DialLeader(context.Background(), "tcp", "kafka-server:9092", topic, partition)
|
||||
if err == nil {
|
||||
KafkaMod = conn
|
||||
log.Release("Reconnected to Kafka")
|
||||
break
|
||||
}
|
||||
log.Error("kafka.DialLeader retry err: %v", err)
|
||||
}
|
||||
}()
|
||||
return
|
||||
}
|
||||
KafkaMod = conn
|
||||
|
||||
Loading…
Reference in New Issue
Block a user