1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| func foo() { config := sarama.NewConfig() config.Version = sarama.V2_4_0_0
config.Net.SASL.Enable = true config.Net.SASL.Handshake = true config.Net.SASL.Version = sarama.SASLHandshakeV1 config.Net.SASL.User = "kafkaUser" config.Net.SASL.Password = "kafkaPass" config.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 config.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
addrs := []string{"kafkaAddr"} topics := []string{"topic"}
group, err := sarama.NewConsumerGroup(addrs, "groupId", config) if err != nil { return err } defer func() { _ = group.Close() }()
ctx := context.Background()
for { handler := consumeHandler{}
err := group.Consume(ctx, topics, handler) if err != nil { continue } } }
type consumeHandler struct{}
func (consumeHandler) Setup(_ sarama.ConsumerGroupSession) error { return nil } func (consumeHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil } func (consumeHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { for msg := range claim.Messages() { sess.MarkMessage(msg, "") } return nil }
|