diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation.go index 68fedcbb41..6b0455046c 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation.go @@ -57,6 +57,9 @@ func (cts *ConsumerTemplateSpec) Validate(ctx context.Context) *apis.FieldError cts.Spec.Delivery.Validate(specCtx).ViaField("delivery"), cts.Spec.Subscriber.Validate(specCtx).ViaField("subscriber"), ) + if cts.Spec.Configs.Configs == nil { + err = err.Also(apis.ErrMissingField("spec.configs")) + } return err } diff --git a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation_test.go b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation_test.go index c8e9ff6a7b..24038ff54e 100644 --- a/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation_test.go +++ b/control-plane/pkg/apis/internals/kafka/eventing/v1alpha1/consumer_group_validation_test.go @@ -109,6 +109,9 @@ func TestConsumerGroup_Validate(t *testing.T) { Host: "127.0.0.1", }, }, + Configs: ConsumerConfigs{ + Configs: map[string]string{}, + }, }, }, }, @@ -136,6 +139,9 @@ func TestConsumerGroup_Validate(t *testing.T) { APIVersion: "flows.knative.dev/v1", }, }, + Configs: ConsumerConfigs{ + Configs: map[string]string{}, + }, }, }, }, @@ -191,6 +197,9 @@ func TestConsumerGroup_Validate(t *testing.T) { Host: "127.0.0.1", }, }, + Configs: ConsumerConfigs{ + Configs: map[string]string{}, + }, }, }, }, @@ -215,6 +224,9 @@ func TestConsumerGroup_Validate(t *testing.T) { Host: "127.0.0.1", }, }, + Configs: ConsumerConfigs{ + Configs: map[string]string{}, + }, }, }, }, @@ -243,6 +255,9 @@ func TestConsumerGroup_Validate(t *testing.T) { Host: "127.0.0.1", }, }, + Configs: ConsumerConfigs{ + Configs: map[string]string{}, + }, }, }, }, @@ -267,6 +282,9 @@ func TestConsumerGroup_Validate(t *testing.T) { Host: "127.0.0.1", }, }, + Configs: ConsumerConfigs{ + Configs: map[string]string{}, + }, }, }, }, diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index acc1da40f7..83fac8389b 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -304,10 +304,10 @@ func (r *Reconciler) deleteConsumerGroupMetadata(ctx context.Context, cg *kafkai bootstrapServers := kafka.BootstrapServersArray(cg.Spec.Template.Spec.Configs.Configs["bootstrap.servers"]) kafkaClusterAdminClient, err := r.GetKafkaClusterAdmin(ctx, bootstrapServers, kafakSecret) - defer kafkaClusterAdminClient.Close() if err != nil { return fmt.Errorf("cannot obtain Kafka cluster admin, %w", err) } + defer kafkaClusterAdminClient.Close() groupId := cg.Spec.Template.Spec.Configs.Configs["group.id"] if err = kafkaClusterAdminClient.DeleteConsumerGroup(groupId); err != nil && !errorIsOneOf(err, sarama.ErrUnknownTopicOrPartition, sarama.ErrGroupIDNotFound) { @@ -599,10 +599,10 @@ func (r *Reconciler) reconcileInitialOffset(ctx context.Context, cg *kafkaintern bootstrapServers := kafka.BootstrapServersArray(cg.Spec.Template.Spec.Configs.Configs["bootstrap.servers"]) kafkaClusterAdminClient, err := r.GetKafkaClusterAdmin(ctx, bootstrapServers, kafkaSecret) - defer kafkaClusterAdminClient.Close() if err != nil { return fmt.Errorf("cannot obtain Kafka cluster admin, %w", err) } + defer kafkaClusterAdminClient.Close() kafkaClient, err := r.GetKafkaClient(ctx, bootstrapServers, kafkaSecret) if err != nil {