Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove keda objects during reconciliation #3676

48 changes: 48 additions & 0 deletions control-plane/pkg/reconciler/consumergroup/consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@
} else {
// If KEDA is not installed or autoscaler feature disabled, do nothing
cg.MarkAutoscalerDisabled()
if err := r.deleteKedaObjects(ctx, cg); err != nil {
return err
}
}

if err := r.reconcileConsumers(ctx, cg); err != nil {
Expand Down Expand Up @@ -845,3 +848,48 @@
func keyOf(cg metav1.Object) string {
return types.NamespacedName{Namespace: cg.GetNamespace(), Name: cg.GetName()}.String()
}

// deleteKedaObjects is responsible for deleting Keda (Kubernetes-based Event Driven Autoscaling) objects, "scaled
// objects" and "trigger authentications" from all available namespaces.
//
// When Keda creates a scaled object, it also generates a Horizontal Pod Autoscaler. This resource will also be removed
// when the scaled object is deleted.
func (r *Reconciler) deleteKedaObjects(ctx context.Context, cg *kafkainternals.ConsumerGroup) error {

// get scaled object
scaledObjectName := keda.GenerateScaledObjectName(cg)
scaledObject, err := r.KedaClient.KedaV1alpha1().ScaledObjects(cg.Namespace).Get(ctx, scaledObjectName, metav1.GetOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to get Keda scaled object: %w", err)

Check warning on line 863 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L862-L863

Added lines #L862 - L863 were not covered by tests
}

if scaledObject == nil {
return nil
}

// if there is a trigger authentication tight to the consumer group, delete it, and then delete the scaled object
if metav1.IsControlledBy(scaledObject, cg) {
if len(cg.ObjectMeta.OwnerReferences) == 0 {
return fmt.Errorf("failed to delete Keda objects, missing owners reference: %w", err)
}
triggerAuthName := string(cg.ObjectMeta.OwnerReferences[0].UID)

err = r.KedaClient.KedaV1alpha1().TriggerAuthentications(cg.Namespace).Delete(ctx, triggerAuthName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete Keda trigger authentication: %w", err)

Check warning on line 879 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L870-L879

Added lines #L870 - L879 were not covered by tests
}

logging.FromContext(ctx).Infoln("Keda trigger authentication deleted")

// delete scaled object
err = r.KedaClient.KedaV1alpha1().ScaledObjects(cg.Namespace).Delete(ctx, scaledObjectName, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to delete Keda scaled object: %w", err)
}
if err == nil {
logging.FromContext(ctx).Infoln("Keda scaled object deleted")

Check warning on line 890 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L881-L890

Added lines #L881 - L890 were not covered by tests
}
}

Check warning on line 893 in control-plane/pkg/reconciler/consumergroup/consumergroup.go

View check run for this annotation

Codecov / codecov/patch

control-plane/pkg/reconciler/consumergroup/consumergroup.go#L893

Added line #L893 was not covered by tests
return nil
}
Loading