Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove keda objects during reconciliation #3676

45 changes: 45 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@
} else {
// If KEDA is not installed or autoscaler feature disabled, do nothing
cg.MarkAutoscalerDisabled()
if err := r.deleteKedaObjects(ctx, cg); err != nil {
return err
}
}

if err := r.reconcileConsumers(ctx, cg); err != nil {
Expand Down Expand Up @@ -845,3 +848,45 @@
func keyOf(cg metav1.Object) string {
return types.NamespacedName{Namespace: cg.GetNamespace(), Name: cg.GetName()}.String()
}

// deleteKedaObjects is responsible for deleting Keda (Kubernetes-based Event Driven Autoscaling) objects, "scaled
// objects" and "trigger authentications" from all available namespaces.
//
// When Keda creates a scaled object, it also generates a Horizontal Pod Autoscaler. This resource will also be removed
// when the scaled object is deleted.
func (r *Reconciler) deleteKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {

// get scaled object
scaledObjectName := keda.GenerateScaledObjectName(cg)
scaledObject, err := r.KedaClient.KedaV1alpha1().ScaledObjects(cg.Namespace).Get(ctx, scaledObjectName, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get Keda scaled object: %w", err)
}

if scaledObject == nil {
return nil
}

// if there is a trigger authentication tight to the consumer group, delete it
if metav1.IsControlledBy(scaledObject, cg) {
triggerAuthName := string(cg.ObjectMeta.OwnerReferences[0].UID)

err = r.KedaClient.KedaV1alpha1().TriggerAuthentications(cg.Namespace).Delete(ctx, triggerAuthName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete Keda trigger authentication: %w", err)
}

Check warning on line 877 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L876-L877

Added lines #L876 - L877 were not covered by tests

logging.FromContext(ctx).Infoln("Keda trigger authentication deleted")
}

// delete scaled object
err = r.KedaClient.KedaV1alpha1().ScaledObjects(cg.Namespace).Delete(ctx, scaledObjectName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete Keda scaled object: %w", err)
}
if err == nil {
logging.FromContext(ctx).Infoln("Keda scaled object deleted")
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@converge sorry I missed this earlier, but this should also be in the if metav1.IsControlledBy() statement, as we don't want to delete the scaled object if we don't control it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@converge sorry I missed this earlier, but this should also be in the if metav1.IsControlledBy() statement, as we don't want to delete the scaled object if we don't control it

make sense! I have just updated, tks!


Check warning on line 890 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L884-L890

Added lines #L884 - L890 were not covered by tests
return nil
}

Check warning on line 892 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L892

Added line #L892 was not covered by tests
Loading