diff --git a/control-plane/pkg/reconciler/consumergroup/consumergroup.go b/control-plane/pkg/reconciler/consumergroup/consumergroup.go index 31b11117ae..f4e7d10ff9 100644 --- a/control-plane/pkg/reconciler/consumergroup/consumergroup.go +++ b/control-plane/pkg/reconciler/consumergroup/consumergroup.go @@ -195,6 +195,9 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, cg *kafkainternals.Consu } else { // If KEDA is not installed or autoscaler feature disabled, do nothing cg.MarkAutoscalerDisabled() + if err := r.deleteKedaObjects(ctx, cg); err != nil { + return err + } } if err := r.reconcileConsumers(ctx, cg); err != nil { @@ -845,3 +848,48 @@ func metricTagsOf(ctx context.Context, cg *kafkainternals.ConsumerGroup) (contex func keyOf(cg metav1.Object) string { return types.NamespacedName{Namespace: cg.GetNamespace(), Name: cg.GetName()}.String() } + +// deleteKedaObjects is responsible for deleting Keda (Kubernetes-based Event Driven Autoscaling) objects, "scaled +// objects" and "trigger authentications" from all available namespaces. +// +// When Keda creates a scaled object, it also generates a Horizontal Pod Autoscaler. This resource will also be removed +// when the scaled object is deleted. +func (r *Reconciler) deleteKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error { + + // get scaled object + scaledObjectName := keda.GenerateScaledObjectName(cg) + scaledObject, err := r.KedaClient.KedaV1alpha1().ScaledObjects(cg.Namespace).Get(ctx, scaledObjectName, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to get Keda scaled object: %w", err) + } + + if scaledObject == nil { + return nil + } + + // if there is a trigger authentication tight to the consumer group, delete it, and then delete the scaled object + if metav1.IsControlledBy(scaledObject, cg) { + if len(cg.ObjectMeta.OwnerReferences) == 0 { + return fmt.Errorf("failed to delete Keda objects, missing owners reference: %w", err) + } + triggerAuthName := string(cg.ObjectMeta.OwnerReferences[0].UID) + + err = r.KedaClient.KedaV1alpha1().TriggerAuthentications(cg.Namespace).Delete(ctx, triggerAuthName, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete Keda trigger authentication: %w", err) + } + + logging.FromContext(ctx).Infoln("Keda trigger authentication deleted") + + // delete scaled object + err = r.KedaClient.KedaV1alpha1().ScaledObjects(cg.Namespace).Delete(ctx, scaledObjectName, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("failed to delete Keda scaled object: %w", err) + } + if err == nil { + logging.FromContext(ctx).Infoln("Keda scaled object deleted") + } + } + + return nil +}