How to use ConsumeClaim method of kafka Package

Best Venom code snippet using kafka.ConsumeClaim

kafka_client.go

Source:kafka_client.go Github

copy

Full Screen

...138 w *KafkaClient139 taskCallback func(t *Task) *TaskResult140 compensateCallback func(t *Task) *TaskResult141}142// Setup is run at the beginning of a new session, before ConsumeClaim143func (wh *workerHandler) Setup(sarama.ConsumerGroupSession) error { return nil }144// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited145func (wh *workerHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }146// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().147func (wh *workerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {148 var wg sync.WaitGroup149 for m := range claim.Messages() {150 wg.Add(1)151 go func(m *sarama.ConsumerMessage) {152 defer wg.Done()153 currentMillis := time.Now().UnixNano() / int64(time.Millisecond)154 t := Task{}155 if err := json.Unmarshal(m.Value, &t); err != nil {156 fmt.Println("Bad payload", err, m.Topic, m.Partition, m.Offset)157 return158 }159 elapsedTime := int(currentMillis - t.StartTime)160 if t.AckTimeout > 0 && t.AckTimeout < elapsedTime || t.Timeout > 0 && t.Timeout < elapsedTime {161 fmt.Printf(`Already timeout: %s`, t.TaskID)162 return163 }164 tr := NewTaskResult(&t)165 tr.Status = TaskStatusInProgress166 wh.w.UpdateTask(tr)167 defer func() {168 if err := recover(); err != nil {169 tr.Status = TaskStatusFailed170 tr.Output = fmt.Sprintf("callback error: %v", err)171 wh.w.UpdateTask(tr)172 }173 }()174 switch t.Type {175 case TaskTypeTask:176 tr = wh.taskCallback(&t)177 wh.w.UpdateTask(tr)178 case TaskTypeCompensate:179 tr = wh.compensateCallback(&t)180 wh.w.UpdateTask(tr)181 default:182 tr.Status = TaskStatusFailed183 tr.Output = "Unknown task type"184 wh.w.UpdateTask(tr)185 }186 session.MarkMessage(m, "")187 }(m)188 }189 wg.Wait() // Wait for every task ran before pull for new batch190 return nil191}192// eventWatcherHandler193type eventWatcherHandler struct {194 w *KafkaClient195 ch chan interface{}196}197// Setup is run at the beginning of a new session, before ConsumeClaim198func (eh *eventWatcherHandler) Setup(sarama.ConsumerGroupSession) error { return nil }199// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited200func (eh *eventWatcherHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }201// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().202func (eh *eventWatcherHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {203 for m := range claim.Messages() {204 go func(m *sarama.ConsumerMessage) {205 session.MarkMessage(m, "")206 var be BaseEvent207 err := json.Unmarshal(m.Value, &be)208 if err != nil {209 log.Println(err)210 return211 }212 if be.IsError == true {213 switch be.Type {214 case EventTypeTransaction:215 var e EventTransactionError216 err := json.Unmarshal(m.Value, &e)...

Full Screen

Full Screen

sarama.go

Source:sarama.go Github

copy

Full Screen

...106// <-p.ready107// glog.Warningf("kafka consumer up and running...")108// return close109// }110// // Setup is run at the beginning of a new session, before ConsumeClaim111// func (p *Consumer) Setup(sarama.ConsumerGroupSession) error {112// // Mark the consumer as ready113// close(p.ready)114// return nil115// }116// // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited117// func (p *Consumer) Cleanup(sarama.ConsumerGroupSession) error {118// return nil119// }120// // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().121// func (p *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {122// // NOTE:123// // Do not move the code below to a goroutine.124// // The `ConsumeClaim` itself is called within a goroutine, see:125// // https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29126// // 具体消费消息127// if *config.Config.ChOpen {128// for message := range claim.Messages() {129// msg := string(message.Value)130// p.chPool <- msg131// glog.Infof("msg: %s, %v, %v", msg, config.Config.ChOpen)132// time.Sleep(time.Second)133// // 更新位移134// session.MarkMessage(message, "")135// }136// } else {137// glog.Infof("暂停消费msg:%v.", config.Config.ChOpen)138// time.Sleep(time.Second)139// }140// /*141// for {142// // if len(p.chOpen) == 0 {143// select {144// case p.chOpen <- 1:145// case message, ok := <-claim.Messages():146// if !ok {147// continue148// }149// msg := string(message.Value)150// <-p.chOpen151// p.chPool <- msg152// glog.Infof("msg: %s, %v, %v", msg, len(p.chOpen), &p.chOpen)153// time.Sleep(time.Second)154// // 更新位移155// session.MarkMessage(message, "")156// continue157// default:158// glog.Infof("msg default.")159// // time.Sleep(time.Second)160// break161// }162// // } else {163// // time.Sleep(time.Second)164// // }165// }166// */167// glog.Infof("ConsumeClaim...")168// return nil169// }170// // func main() {171// // k := NewKafka()172// // f := k.Init()173// // sigterm := make(chan os.Signal, 1)174// // signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)175// // select {176// // case <-sigterm:177// // glog.Infof("terminating: via signal")178// // }179// // f()180// // }181// /*...

Full Screen

Full Screen

kafka.go

Source:kafka.go Github

copy

Full Screen

...30 return nil31}32// Cleanup sarama.ConsumerGroupHandler接口定义实现.33// Cleanup() hook is called to allow the user to perform any final tasks34// before a rebalance once all the ConsumeClaim() loops have exited.35func (h *CustomConsumerGroupHandler) Cleanup(session sarama.ConsumerGroupSession) error {36 return nil37}38// ConsumeClaim sarama.ConsumerGroupHandler接口定义实现.39// ConsumeClaim() hook is called for each of the assigned claims.40func (h *CustomConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {41 for {42 select {43 case <-session.Context().Done():44 {45 log.Info().Msg("consumer group claim, context done")46 return nil47 }48 case msg, ok := <-claim.Messages():49 {50 if !ok {51 log.Warn().Msg("consumer group claim, messages channel closed")52 return nil53 }54 h.msgCh <- msg55 session.MarkMessage(msg, "")56 }57 }58 }59}60// Close 关闭并释放所有资源 (并发安全).61func (h *CustomConsumerGroupHandler) Close() {62 h.once.Do(func() { h.closer() })63 log.Info().Msg("unload kafka plugin")64}65// NewCustomConsumerGroupHandler 新建消费者.66func NewCustomConsumerGroupHandler(cfg *conf.KafkaConfig) (*CustomConsumerGroupHandler, error) {67 h := &CustomConsumerGroupHandler{68 cfg: cfg,69 msgCh: make(chan *sarama.ConsumerMessage),70 readyCh: make(chan struct{}),71 }72 var err error73 sc := sarama.NewConfig()74 sc.Version, err = sarama.ParseKafkaVersion(h.cfg.Version)75 if cfg.FromOldest {76 sc.Consumer.Offsets.Initial = sarama.OffsetOldest77 }78 if err != nil {79 log.Error().Err(err).Msgf("unsupported kafka version %s", h.cfg.Version)80 return nil, err81 }82 setKafkaAccessSettings(sc)83 h.consumerGroup, err = sarama.NewConsumerGroup(h.cfg.Brokers, h.cfg.ConsumeGroup, sc)84 if err != nil {85 log.Error().Err(err).Msgf("cannot create consumer group for brokers <%v>", h.cfg.Brokers)86 return nil, err87 }88 ctx, cancel := context.WithCancel(context.Background())89 kGroup := new(sync.WaitGroup)90 kGroup.Add(1)91 go func() {92 defer kGroup.Done()93 h.start(ctx, h.cfg.Topic)94 }()95 // 消费组准备完毕96 <-h.readyCh97 h.closer = func() {98 cancel()99 h.consumerGroup.Close()100 close(h.msgCh)101 kGroup.Wait()102 }103 log.Info().Msgf("load kafka plugin, version %s", h.cfg.Version)104 return h, nil105}106func (h *CustomConsumerGroupHandler) start(ctx context.Context, topics []string) {107 for {108 // Consume joins a cluster of consumers for a given list of topics and109 // starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.110 if err := h.consumerGroup.Consume(ctx, topics, h); err != nil {111 log.Error().Err(err).Msg("error occurs inside consumer group")112 }113 // 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)114 // and is assigned their "fair share" of partitions, aka 'claims'.115 // 2. Before processing starts, the handler's Setup() hook is called to notify the user116 // of the claims and allow any necessary preparation or alteration of state.117 // 3. For each of the assigned claims the handler's ConsumeClaim() function is then called118 // in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected119 // from concurrent reads/writes.120 // 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the121 // parent context is cancelled or when a server-side rebalance cycle is initiated.122 // 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called123 // to allow the user to perform any final tasks before a rebalance.124 // 6. Finally, marked offsets are committed one last time before claims are released.125 //126 // Please note, that once a rebalance is triggered, sessions must be completed within127 // Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit128 // as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout129 // is exceeded, the consumer will be removed from the group by Kafka, which will cause offset130 // commit failures.131 select {132 case <-ctx.Done():133 {134 log.Info().Msg("consumer group context done ...")135 return136 }137 case err := <-h.consumerGroup.Errors():138 {139 log.Error().Err(err).Msg("error occurs inside consumer group")140 }141 default:...

Full Screen

Full Screen

ConsumeClaim

Using AI Code Generation

copy

Full Screen

1func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {2 for message := range claim.Messages() {3 fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s4", string(message.Value), message.Timestamp, message.Topic)5 session.MarkMessage(message, "")6 }7}8func (c *Consumer) Consume(ctx context.Context) {9 for {10 topics := []string{"test"}11 err := c.consumer.Consume(ctx, topics, handler)12 if err != nil {13 panic(err)14 }15 if ctx.Err() != nil {16 }17 }18}19func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {20 for message := range claim.Messages() {21 fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s22", string(message.Value), message.Timestamp, message.Topic)23 session.MarkMessage(message, "")24 }25}26func (c *Consumer) Consume(ctx context.Context) {27 for {28 topics := []string{"test"}29 err := c.consumer.Consume(ctx, topics, handler)30 if err != nil {31 panic(err)32 }33 if ctx.Err() != nil {34 }35 }36}37func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {38 for message := range claim.Messages() {39 fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s40", string(message.Value), message.Timestamp, message.Topic)41 session.MarkMessage(message, "")42 }43}44func (c *Consumer) Consume(ctx context.Context) {45 for {

Full Screen

Full Screen

ConsumeClaim

Using AI Code Generation

copy

Full Screen

1func (c *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {2 for msg := range claim.Messages() {3 fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s4", string(msg.Value), msg.Timestamp, msg.Topic)5 session.MarkMessage(msg, "")6 }7}8func (c *KafkaConsumer) Close() error {9 if err := c.consumer.Close(); err != nil {10 }11}12func (c *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error { return nil }13func (c *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error { return nil }14func (c *KafkaConsumer) Run() {15 for {16 if err := c.consumer.Consume(context.Background(), []string{c.topic}, c); err != nil {17 fmt.Println(err)18 }19 if ctx.Err() != nil {20 }21 c.ready = make(chan bool)22 }23}24func NewConsumerGroup(brokers, topic, group string) (*KafkaConsumer, error) {25 config := sarama.NewConfig()26 client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)27 if err != nil {28 }29 return &KafkaConsumer{30 ready: make(chan bool),31 }, nil32}33func main() {

Full Screen

Full Screen

ConsumeClaim

Using AI Code Generation

copy

Full Screen

1func (c *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {2 for message := range claim.Messages() {3 fmt.Fprintf(os.Stdout, "Message claimed: value = %s, timestamp = %v, topic = %s4", string(message.Value), message.Timestamp, message.Topic)5 sess.MarkMessage(message, "")6 }7}8func main() {9 config := sarama.NewConfig()10 consumer := Consumer{}11 client, err := sarama.NewClient([]string{"localhost:9092"}, config)12 if err != nil {13 fmt.Println("Error creating client: ", err)14 }15 consumerGroup, err := sarama.NewConsumerGroupFromClient("my-group", client)16 if err != nil {17 fmt.Println("Error creating consumer group client: ", err)18 }19 defer func() {20 if err := consumerGroup.Close(); err != nil {21 fmt.Println("Error closing client: ", err)22 }23 }()24 for {25 ctx, cancel := context.WithCancel(context.Background())26 go func() {27 for err := range consumerGroup.Errors() {28 fmt.Println("Error from consumer: ", err)29 }30 }()31 go func() {32 for {33 if err := consumerGroup.Consume(ctx, []string{"my-topic"}, &consumer); err != nil {34 fmt.Println("Error from consumer: ", err)35 }36 if ctx.Err() != nil {37 }38 cancel()

Full Screen

Full Screen

ConsumeClaim

Using AI Code Generation

copy

Full Screen

1import (2var (3func main() {4 config := sarama.NewConfig()5 brokers := []string{"localhost:9092"}6 topics := []string{"test"}7 consumer, err := sarama.NewConsumer(brokers, config)8 if err != nil {9 panic(err)10 }11 defer func() {12 if err := consumer.Close(); err != nil {13 panic(err)14 }15 }()16 signals := make(chan os.Signal, 1)17 signal.Notify(signals, os.Interrupt)18 partitionList, err := consumer.Partitions(topics[0])19 if err != nil {20 panic(err)21 }22 for partition := range partitionList {23 pc, err := consumer.ConsumePartition(topics[0], int32(partition), sarama.OffsetNewest)24 if err != nil {25 panic(err)26 }27 defer pc.AsyncClose()28 wg.Add(1)29 go func(sarama.PartitionConsumer) {30 defer wg.Done()31 for msg := range pc.Messages() {32 fmt.Printf("Partition:\t%d33 fmt.Printf("Offset:\t%d34 fmt.Printf("Key:\t%s35", string(msg.Key))36 fmt.Printf("Value:\t%s37", string(msg.Value))38 }39 }(pc)40 }41 wg.Wait()42 fmt.Println("Done consuming topic", topics[0])43}44import (45var (46func main() {47 config := sarama.NewConfig()48 brokers := []string{"localhost:9092"}49 topics := []string{"test"}

Full Screen

Full Screen

ConsumeClaim

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 wg.Add(1)4 go func() {5 defer wg.Done()6 config := sarama.NewConfig()7 brokers := []string{"localhost:9092"}8 consumer, err := sarama.NewConsumer(brokers, config)9 if err != nil {10 panic(err)11 }12 defer func() {13 if err := consumer.Close(); err != nil {14 panic(err)15 }16 }()17 groups, err := consumer.ListGroups()18 if err != nil {19 panic(err)20 }21 fmt.Println("Consumer Groups:", groups)22 partitionConsumer, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest)23 if err != nil {24 panic(err)25 }26 defer func() {27 if err := partitionConsumer.Close(); err != nil {28 panic(err)29 }30 }()31 signals := make(chan os.Signal, 1)32 signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)33 doneCh := make(chan struct{})34 go func() {35 for {36 select {37 case msg := <-partitionConsumer.Messages():38 fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s\n", string(msg.Value), msg.Timestamp, msg.Topic)39 case err := <-partitionConsumer.Errors():40 fmt.Printf("Error: %s41", err.Error())42 fmt.Println("Interrupt is detected")43 doneCh <- struct{}{}44 }45 }46 }()47 fmt.Println("Done consuming topic test")48 }()49 wg.Wait()50}

Full Screen

Full Screen

ConsumeClaim

Using AI Code Generation

copy

Full Screen

1import (2func main() {3 config := sarama.NewConfig()4 group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)5 if err != nil {6 panic(err)7 }8 defer func() {9 if err := group.Close(); err != nil {10 panic(err)11 }12 }()13 go func() {14 for err := range group.Errors() {15 fmt.Println(err)16 }17 }()18 for {19 topics := []string{"my-topic"}20 handler := &consumerGroupHandler{}21 err := group.Consume(context.Background(), topics, handler)22 if err != nil {23 panic(err)24 }25 }26}27import (28func main() {29 config := sarama.NewConfig()30 group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)31 if err != nil {32 panic(err)33 }34 defer func() {35 if err := group.Close(); err != nil {36 panic(err)37 }38 }()39 go func() {40 for err := range group.Errors() {41 fmt.Println(err)42 }43 }()44 for {45 topics := []string{"my-topic"}46 handler := &consumerGroupHandler{}47 err := group.Consume(context.Background(), topics, handler)48 if err != nil {49 panic(err)50 }51 }52}53import (54func main() {55 config := sarama.NewConfig()

Full Screen

Full Screen

ConsumeClaim

Using AI Code Generation

copy

Full Screen

1func main() {2 fmt.Println("Consumer")3 p := kafka.NewConsumer(&kafka.ConfigMap{4 })5 p.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)6 for run == true {7 msg, err := p.ReadMessage(-1)8 if err == nil {9 fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))10 } else {11 fmt.Printf("Consumer error: %v (%v)\n", err, msg)12 }13 }14 fmt.Printf("Closing consumer\n")15 p.Close()16}17func main() {18 fmt.Println("Consumer")19 p := kafka.NewConsumer(&kafka.ConfigMap{20 })21 p.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)22 for run == true {23 ev := p.Poll(100)24 if ev == nil {25 }26 switch e := ev.(type) {27 fmt.Printf("Message on %s: %s\n", e.TopicPartition, string(e.Value))28 fmt.Fprintf(os.Stderr, "%% Error: %v: %v29", e.Code(), e)30 fmt.Printf("Ignored %v31 }32 }33 fmt.Printf("Closing consumer\n")34 p.Close()35}36func main() {37 fmt.Println("Consumer")38 p := kafka.NewConsumer(&kafka.ConfigMap{39 })40 p.SubscribeTopics([]string{"myTopic", "^aRegex.*[Tt]opic"}, nil)

Full Screen

Full Screen

ConsumeClaim

Using AI Code Generation

copy

Full Screen

1import (2type Consumer struct {3}4func (c *Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {5 defer sess.MarkMessage(claim.Messages()[0], "")6 for message := range claim.Messages() {7 fmt.Printf("Message claimed: value = %s, timestamp = %v, topic = %s8", string(message.Value), message.Timestamp, message.Topic)9 }10}11func (c *Consumer) Setup(sess sarama.ConsumerGroupSession) error {12 close(c.ready)13}14func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {15}16func main() {17 config := sarama.NewConfig()18 client, err := sarama.NewClient([]string{"localhost:9092"}, config)19 if err != nil {20 panic(err)21 }22 consumer, err := sarama.NewConsumerGroupFromClient("my-group", client)23 if err != nil {24 panic(err)25 }26 signals := make(chan os.Signal, 1)27 signal.Notify(signals, os.Interrupt)28 go func() {29 for err := range consumer.Errors() {30 fmt.Println(err)31 }32 }()33 go func() {34 for ntf := range consumer.Notifications() {35 fmt.Println(ntf)36 }37 }()38 for {39 select {40 handler := Consumer{41 ready: make(chan bool),42 }43 ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)44 defer cancel()45 if err := consumer.Consume(ctx

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful