From 6aa2742197a8484572f83206ac6749826706ca7e Mon Sep 17 00:00:00 2001 From: nbajaj90 Date: Wed, 13 Sep 2023 17:17:56 +0530 Subject: [PATCH] context.Done() may never reach if waiting on r.incoming <- msgErr Signed-off-by: nbajaj90 --- protocol/kafka_sarama/v2/receiver.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/protocol/kafka_sarama/v2/receiver.go b/protocol/kafka_sarama/v2/receiver.go index 4d4a633cb..c2b61f098 100644 --- a/protocol/kafka_sarama/v2/receiver.go +++ b/protocol/kafka_sarama/v2/receiver.go @@ -66,8 +66,7 @@ func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram return nil } m := NewMessageFromConsumerMessage(msg) - - r.incoming <- msgErr{ + msgErrObj := msgErr{ msg: binding.WithFinish(m, func(err error) { if protocol.IsACK(err) { session.MarkMessage(msg, "") @@ -75,6 +74,15 @@ func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram }), } + // Need to use select clause here, otherwise r.incoming <- msgErrObj can become a blocking operation, + // resulting in never reaching outside block's case <-session.Context().Done() + select { + case r.incoming <- msgErrObj: + // do nothing + case <-session.Context().Done(): + return nil + } + // Should return when `session.Context()` is done. // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: // https://github.com/Shopify/sarama/issues/1192