Skip to content

Commit

Permalink
Use native finalizers to handle the cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Antonio Gamez Diaz <agamez@vmware.com>
  • Loading branch information
antgamdia committed Aug 17, 2023
1 parent 2f00e80 commit 6e70910
Showing 1 changed file with 238 additions and 59 deletions.
297 changes: 238 additions & 59 deletions cmd/apprepository-controller/server/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
log "k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
)

const controllerAgentName = "apprepository-controller"
Expand All @@ -62,6 +63,8 @@ const (
// LabelRepoNamespace is the label used to identify the repository namespace.
LabelRepoNamespace = "apprepositories.kubeapps.com/repo-namespace"

FinalizerName = "apprepositories.kubeapps.com/apprepo-cleanup-finalizer"

// MessageResourceExists is the message used for Events when a resource
// fails to sync due to a CronJob already existing
MessageResourceExists = "Resource %q already exists and is not managed by AppRepository"
Expand Down Expand Up @@ -141,13 +144,13 @@ func NewController(
newApp := newObj.(*apprepov1alpha1.AppRepository)
if !reflect.DeepEqual(oldApp.Spec, newApp.Spec) {
controller.enqueueAppRepo(newApp)
} else if !reflect.DeepEqual(oldApp.ObjectMeta, newApp.ObjectMeta) {
// handle updates in ObjectMeta (like finalizers)
controller.handleAppRepoMetaChangeOrDelete(newApp, false)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
controller.workqueue.AddRateLimited(key)
}
controller.handleAppRepoMetaChangeOrDelete(obj, true)
},
})
if err != nil {
Expand Down Expand Up @@ -298,32 +301,8 @@ func (c *Controller) syncHandler(key string) error {
// Get the AppRepository resource with this namespace/name
apprepo, err := c.appreposLister.AppRepositories(namespace).Get(name)
if err != nil {
// The AppRepository resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
log.Infof("AppRepository '%s' no longer exists so performing cleanup of charts from the DB", key)
// Trigger a Job to perform the cleanup of the charts in the DB corresponding to deleted AppRepository
_, err = c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newCleanupJob(c.conf.KubeappsNamespace, namespace, name, c.conf), metav1.CreateOptions{})
if err != nil {
log.Errorf("Unable to create cleanup job: %v", err)
return err
}

// TODO: Workaround until the sync jobs are moved to the repoNamespace (#1647)
// Delete the cronjob in the Kubeapps namespace to avoid re-syncing the repository
if c.conf.V1Beta1CronJobs {
err = c.kubeclientset.BatchV1beta1().CronJobs(c.conf.KubeappsNamespace).Delete(context.TODO(), cronJobName(namespace, name, false), metav1.DeleteOptions{})
} else {
err = c.kubeclientset.BatchV1().CronJobs(c.conf.KubeappsNamespace).Delete(context.TODO(), cronJobName(namespace, name, false), metav1.DeleteOptions{})
}

if err != nil && !errors.IsNotFound(err) {
log.Errorf("Unable to delete sync cronjob: %v", err)
return err
}
return nil
}
return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
// return nil if the error is a not-found as we want to let the flow continue
return ctrlclient.IgnoreNotFound(err)
}

cronjob, err := c.ensureCronJob(apprepo)
Expand All @@ -335,6 +314,25 @@ func (c *Controller) syncHandler(key string) error {
return err
}

// if the object is not being deleted, check the finalizers
if apprepo.GetDeletionTimestamp().IsZero() {
// check if it contains the finalizer, if not, add it and update the object
if !containsFinalizer(apprepo, FinalizerName) {
log.Infof("the AppRepository %q doesn't have a finalizer yet, adding one...", apprepo.GetName())

ok := addFinalizer(apprepo, FinalizerName)
if !ok {
return fmt.Errorf("error adding finalizer to the AppRepository %q", apprepo.GetName())
}
_, err = c.apprepoclientset.KubeappsV1alpha1().AppRepositories(namespace).Update(context.TODO(), apprepo, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("error updating the AppRepository %q: %v", apprepo.GetName(), err)
}
}
} else {
return fmt.Errorf("got an update event on a deletion-pending object")
}

// If the CronJob is not controlled by this AppRepository resource and it is not a
// cronjob for an app repo in another namespace, then we should
// log a warning to the event recorder and return it.
Expand Down Expand Up @@ -408,6 +406,32 @@ func (c *Controller) ensureCronJob(apprepo *apprepov1alpha1.AppRepository) (meta
return cronjob, nil
}

func (c *Controller) cleanUpAppRepo(apprepo *apprepov1alpha1.AppRepository, namespace, name string) error {
log.Infof("Trying to perform clean-up tasks on AppRepository %q", name)
if apprepo == nil {
log.Errorf("AppRepository %q no longer exists. No clean-up operations can be performed automatically", name)
return nil
}

// Trigger a Job to perform the cleanup of the charts in the DB corresponding to deleted AppRepository
_, err := c.kubeclientset.BatchV1().Jobs(c.conf.KubeappsNamespace).Create(context.TODO(), newCleanupJob(c.conf.KubeappsNamespace, namespace, name, c.conf, apprepo), metav1.CreateOptions{})
if err != nil {
log.Errorf("Unable to create cleanup job for AppRepository %q: %v", name, err)
return err
}

// TODO: Workaround until the sync jobs are moved to the repoNamespace (#1647)
// Delete the cronjob in the Kubeapps namespace to avoid re-syncing the repository
err = c.kubeclientset.BatchV1().CronJobs(c.conf.KubeappsNamespace).Delete(context.TODO(), cronJobName(namespace, name, false), metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
log.Errorf("Unable to delete sync cronjob for AppRepository %q: %v", name, err)
return err
}
log.Infof("The clean-up tasks on AppRepository %q succeeded", name)

return nil
}

// belongsTo is similar to IsControlledBy, but enables us to establish a relationship
// between cronjobs and app repositories in different namespaces.
func objectBelongsTo(object, parent metav1.Object) bool {
Expand Down Expand Up @@ -473,6 +497,77 @@ func (c *Controller) handleObject(obj interface{}) {
}
}

func (c Controller) handleAppRepoMetaChangeOrDelete(obj interface{}, shouldDelete bool) {
var (
object metav1.Object
apprepo *apprepov1alpha1.AppRepository
ok bool
err error
)

if object, ok = obj.(metav1.Object); !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
runtime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
object, ok = tombstone.Obj.(metav1.Object)
if !ok {
runtime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
return
}
log.Infof("Recovered deleted object '%s' from tombstone", object.GetName())
}

if apprepo, ok = obj.(*apprepov1alpha1.AppRepository); !ok {
runtime.HandleError(fmt.Errorf("Error decoding object, invalid type"))
return
}

// if the object is not being deleted (ie, deletionTimestamp==0)
if apprepo.GetDeletionTimestamp().IsZero() && !containsFinalizer(apprepo, FinalizerName) {
// check if it contains the finalizer, if not, add it and update the object
log.Errorf("The AppRepository %q should be deleted, but doesn't have any finalizers. You might want to perform a manual clean-up", apprepo.GetName())
}

// if the object is being deleted and contains a finalizer
if !apprepo.GetDeletionTimestamp().IsZero() && containsFinalizer(apprepo, FinalizerName) {
// if the object is being deleted and contains a finalizer and the event is not a deletion event,
// then handle the finalizer-derived clean-up tasks and remove the finalizer
if !shouldDelete {
log.Infof("Starting the clean-up tasks derived from the finalizer of the AppRepository %q", apprepo.GetName())

// start the clean-up
err = c.cleanUpAppRepo(apprepo, apprepo.GetNamespace(), apprepo.GetName())
if err != nil {
log.Errorf("Error performing clean-up tasks derived from the finalizer of the AppRepository %q. The finalizer will be removed anyways. You might want to perform a manual clean-up. Error: %v", apprepo.GetName(), err)
}

// once everything is done, remove the finalizer from the list
ok := removeFinalizer(apprepo, FinalizerName)
if !ok {
log.Errorf("Error removing finalizer from the AppRepository %q: %v", apprepo.GetName(), err)
return
}

// update the CR removing the finalizer
_, err = c.apprepoclientset.KubeappsV1alpha1().AppRepositories(apprepo.GetNamespace()).Update(context.TODO(), apprepo, metav1.UpdateOptions{})
if err != nil {
log.Errorf("Error updating the AppRepository %q: %v", apprepo.GetName(), err)
return
}
} else {
// if the object is being deleted and it is a deletion event, then do nothing else
log.Infof("The AppRepository %q is now being deleted", apprepo.GetName())
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
c.workqueue.AddRateLimited(key)
}
}
}
return
}

// ownerReferencesForAppRepo returns populated owner references for app repos in the same namespace
// as the cronjob and nil otherwise.
func ownerReferencesForAppRepo(apprepo *apprepov1alpha1.AppRepository, childNamespace string) []metav1.OwnerReference {
Expand Down Expand Up @@ -668,49 +763,73 @@ func syncJobSpec(apprepo *apprepov1alpha1.AppRepository, config Config) batchv1.

// newCleanupJob triggers a job for the AppRepository resource. It also sets the
// appropriate OwnerReferences on the resource
func newCleanupJob(kubeappsNamespace, repoNamespace, name string, config Config) *batchv1.Job {
func newCleanupJob(kubeappsNamespace, repoNamespace, name string, config Config, apprepo *apprepov1alpha1.AppRepository) *batchv1.Job {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
GenerateName: deleteJobName(repoNamespace, name),
Namespace: kubeappsNamespace,
Annotations: config.ParsedCustomAnnotations,
Labels: config.ParsedCustomLabels,
},
Spec: cleanupJobSpec(repoNamespace, name, config),
Spec: cleanupJobSpec(repoNamespace, name, config, apprepo),
}
}

// cleanupJobSpec returns a batchv1.JobSpec for running the chart-repo delete job
func cleanupJobSpec(namespace, name string, config Config) batchv1.JobSpec {
return batchv1.JobSpec{
TTLSecondsAfterFinished: ttlLifetimeJobs(config),
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
// If there's an issue, delay till the next cron
RestartPolicy: corev1.RestartPolicyNever,
ImagePullSecrets: config.ImagePullSecretsRefs,
Containers: []corev1.Container{
{
Name: "delete",
Image: config.RepoSyncImage,
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{config.RepoSyncCommand},
Args: apprepoCleanupJobArgs(namespace, name, config),
Env: []corev1.EnvVar{
{
Name: "DB_PASSWORD",
ValueFrom: &corev1.EnvVarSource{
SecretKeyRef: &corev1.SecretKeySelector{
LocalObjectReference: corev1.LocalObjectReference{Name: config.DBSecretName},
Key: config.DBSecretKey,
},
},
},
},
func cleanupJobSpec(namespace, name string, config Config, apprepo *apprepov1alpha1.AppRepository) batchv1.JobSpec {
volumes := []corev1.Volume{}
volumeMounts := []corev1.VolumeMount{}
if apprepo.Spec.Auth.CustomCA != nil {
volumes = append(volumes, corev1.Volume{
Name: apprepo.Spec.Auth.CustomCA.SecretKeyRef.Name,
VolumeSource: corev1.VolumeSource{
Secret: &corev1.SecretVolumeSource{
SecretName: secretKeyRefForRepo(apprepo.Spec.Auth.CustomCA.SecretKeyRef, apprepo, config).Name,
Items: []corev1.KeyToPath{
{Key: apprepo.Spec.Auth.CustomCA.SecretKeyRef.Key, Path: "ca.crt"},
},
},
},
},
})
volumeMounts = append(volumeMounts, corev1.VolumeMount{
Name: apprepo.Spec.Auth.CustomCA.SecretKeyRef.Name,
ReadOnly: true,
MountPath: "/usr/local/share/ca-certificates",
})
}

// Get the predefined pod spec for the apprepo definition if exists
podTemplateSpec := apprepo.Spec.SyncJobPodTemplate
// Add labels
if len(podTemplateSpec.ObjectMeta.Labels) == 0 {
podTemplateSpec.ObjectMeta.Labels = map[string]string{}
}
for k, v := range jobLabels(apprepo, config) {
podTemplateSpec.ObjectMeta.Labels[k] = v
}
podTemplateSpec.ObjectMeta.Annotations = config.ParsedCustomAnnotations
// If there's an issue, won't restart
podTemplateSpec.Spec.RestartPolicy = corev1.RestartPolicyNever
// Populate container spec
if len(podTemplateSpec.Spec.Containers) == 0 {
podTemplateSpec.Spec.Containers = []corev1.Container{{}}
}
// Populate ImagePullSecrets spec
podTemplateSpec.Spec.ImagePullSecrets = append(podTemplateSpec.Spec.ImagePullSecrets, config.ImagePullSecretsRefs...)

podTemplateSpec.Spec.Containers[0].Name = "delete"
podTemplateSpec.Spec.Containers[0].Image = config.RepoSyncImage
podTemplateSpec.Spec.Containers[0].ImagePullPolicy = corev1.PullIfNotPresent
podTemplateSpec.Spec.Containers[0].Command = []string{config.RepoSyncCommand}
podTemplateSpec.Spec.Containers[0].Args = apprepoCleanupJobArgs(namespace, name, config)
podTemplateSpec.Spec.Containers[0].Env = append(podTemplateSpec.Spec.Containers[0].Env, apprepoSyncJobEnvVars(apprepo, config)...)
podTemplateSpec.Spec.Containers[0].VolumeMounts = append(podTemplateSpec.Spec.Containers[0].VolumeMounts, volumeMounts...)
// Add volumes
podTemplateSpec.Spec.Volumes = append(podTemplateSpec.Spec.Volumes, volumes...)

return batchv1.JobSpec{
TTLSecondsAfterFinished: ttlLifetimeJobs(config),
Template: podTemplateSpec,
}
}

Expand Down Expand Up @@ -892,3 +1011,63 @@ func dbFlags(config Config) []string {
"--database-name=" + config.DBName,
}
}

// the following pieces of code have been extracted from
// https://github.com/kubernetes-sigs/controller-runtime/blob/main/pkg/controller/controllerutil/controllerutil.go
// and modified to use the apimachinery object instead of the the controller-runtime object
// they are subject to the undermentioned license terms.

/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// ContainsFinalizer checks an Object that the provided finalizer is present.
func containsFinalizer(o metav1.Object, finalizer string) bool {
f := o.GetFinalizers()
for _, e := range f {
if e == finalizer {
return true
}
}
return false
}

// AddFinalizer accepts an Object and adds the provided finalizer if not present.
// It returns an indication of whether it updated the object's list of finalizers.
func addFinalizer(o metav1.Object, finalizer string) (finalizersUpdated bool) {
f := o.GetFinalizers()
for _, e := range f {
if e == finalizer {
return false
}
}
o.SetFinalizers(append(f, finalizer))
return true
}

// RemoveFinalizer accepts an Object and removes the provided finalizer if present.
// It returns an indication of whether it updated the object's list of finalizers.
func removeFinalizer(o metav1.Object, finalizer string) (finalizersUpdated bool) {
f := o.GetFinalizers()
for i := 0; i < len(f); i++ {
if f[i] == finalizer {
f = append(f[:i], f[i+1:]...)
i--
finalizersUpdated = true
}
}
o.SetFinalizers(f)
return
}

0 comments on commit 6e70910

Please sign in to comment.