diff --git a/protocol/kafka_sarama/v2/receiver.go b/protocol/kafka_sarama/v2/receiver.go index 4d4a633cb..c2b61f098 100644 --- a/protocol/kafka_sarama/v2/receiver.go +++ b/protocol/kafka_sarama/v2/receiver.go @@ -66,8 +66,7 @@ func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram return nil } m := NewMessageFromConsumerMessage(msg) - - r.incoming <- msgErr{ + msgErrObj := msgErr{ msg: binding.WithFinish(m, func(err error) { if protocol.IsACK(err) { session.MarkMessage(msg, "") @@ -75,6 +74,15 @@ func (r *Receiver) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram }), } + // Need to use select clause here, otherwise r.incoming <- msgErrObj can become a blocking operation, + // resulting in never reaching outside block's case <-session.Context().Done() + select { + case r.incoming <- msgErrObj: + // do nothing + case <-session.Context().Done(): + return nil + } + // Should return when `session.Context()` is done. // If not, will raise `ErrRebalanceInProgress` or `read tcp :: i/o timeout` when kafka rebalance. see: // https://github.com/Shopify/sarama/issues/1192