From cc2ae735b6c442fd49b5d2d9ae27d03a5927d512 Mon Sep 17 00:00:00 2001
From: Traian Schiau <55734665+trasc@users.noreply.github.com>
Date: Wed, 13 Mar 2024 12:12:24 +0200
Subject: [PATCH] [multikueue] Don't use reconnecting clients in WL reconnecting. (#1827)

* [multikueue] Don't use reconnecting clients in WL reconnecting.

* Add unit test
---
 .../multikueue/multikueuecluster.go           |  8 ++++----
 .../multikueue/multikueuecluster_test.go      |  2 +-
 .../admissionchecks/multikueue/workload.go    |  5 ++++-
 .../multikueue/workload_test.go               | 24 +++++++++++++++++++
 4 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
index 4ccaa46983..aa282d193c 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster.go
@@ -80,7 +80,7 @@ type remoteClient struct {
 	kubeconfig []byte
 	origin     string
 
-	forceReconnect     atomic.Bool
+	pendingReconnect   atomic.Bool
 	failedConnAttempts uint
 
 	// For unit testing only. There is no need of creating fully functional remote clients in the unit tests
@@ -138,7 +138,7 @@ func (*workloadKueueWatcher) GetWorkloadKey(o runtime.Object) (types.NamespacedN
 // If the encountered error is not permanent the duration after which a retry should be done is returned.
 func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) (*time.Duration, error) {
 	configChanged := !equality.Semantic.DeepEqual(kubeconfig, rc.kubeconfig)
-	if !configChanged && !rc.forceReconnect.Load() {
+	if !configChanged && !rc.pendingReconnect.Load() {
 		return nil, nil
 	}
 
@@ -182,7 +182,7 @@ func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) (
 		}
 	}
 
-	rc.forceReconnect.Store(false)
+	rc.pendingReconnect.Store(false)
 	rc.failedConnAttempts = 0
 	return nil, nil
 }
@@ -217,7 +217,7 @@ func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w multiKu
 		log.V(2).Info("Watch ended", "ctxErr", ctx.Err())
 		// If the context is not yet Done, queue a reconcile to attempt reconnection
 		if ctx.Err() == nil {
-			oldReconnect := rc.forceReconnect.Swap(true)
+			oldReconnect := rc.pendingReconnect.Swap(true)
 			//reconnect if this is the first watch failing.
 			if !oldReconnect {
 				log.V(2).Info("Queue reconcile for reconnect", "cluster", rc.clusterName)
diff --git a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
index 42187024a2..7e8ab1bdcd 100644
--- a/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
+++ b/pkg/controller/admissionchecks/multikueue/multikueuecluster_test.go
@@ -76,7 +76,7 @@ func newTestClient(config string, watchCancel func()) *remoteClient {
 
 func setReconnectState(rc *remoteClient, a uint) *remoteClient {
 	rc.failedConnAttempts = a
-	rc.forceReconnect.Store(true)
+	rc.pendingReconnect.Store(true)
 	return rc
 }
 
diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go
index e320670531..7862a00fdd 100644
--- a/pkg/controller/admissionchecks/multikueue/workload.go
+++ b/pkg/controller/admissionchecks/multikueue/workload.go
@@ -180,7 +180,10 @@ func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (m
 	clients := make(map[string]*remoteClient, len(cfg.Spec.Clusters))
 	for _, clusterName := range cfg.Spec.Clusters {
 		if client, found := w.clusters.controllerFor(clusterName); found {
-			clients[clusterName] = client
+			// Skip the client if its reconnect is ongoing.
+			if !client.pendingReconnect.Load() {
+				clients[clusterName] = client
+			}
 		}
 	}
 	if len(clients) == 0 {
diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go
index 4228fe4592..c004d43a23 100644
--- a/pkg/controller/admissionchecks/multikueue/workload_test.go
+++ b/pkg/controller/admissionchecks/multikueue/workload_test.go
@@ -64,6 +64,7 @@ func TestWlReconcile(t *testing.T) {
 		worker1Jobs       []batchv1.Job
 		// second worker
 		useSecondWorker      bool
+		worker2Reconnecting  bool
 		worker2OnDeleteError error
 		worker2OnGetError    error
 		worker2OnCreateError error
@@ -124,6 +125,26 @@ func TestWlReconcile(t *testing.T) {
 			},
 			wantError: errFake,
 		},
+		"reconnecting clients are skipped": {
+			reconcileFor: "wl1",
+			managersWorkloads: []kueue.Workload{
+				*baseWorkloadBuilder.Clone().
+					AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
+					ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
+					Obj(),
+			},
+			useSecondWorker:     true,
+			worker2Reconnecting: true,
+			worker2OnGetError:   errFake,
+
+			wantManagersWorkloads: []kueue.Workload{
+				*baseWorkloadBuilder.Clone().
+					AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
+					ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
+					Obj(),
+			},
+			wantError: nil,
+		},
 		"wl without reservation, clears the workload objects": {
 			reconcileFor: "wl1",
 			managersWorkloads: []kueue.Workload{
@@ -664,6 +685,9 @@ func TestWlReconcile(t *testing.T) {
 
 			w2remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "")
 			w2remoteClient.client = worker2Client
+			if tc.worker2Reconnecting {
+				w2remoteClient.pendingReconnect.Store(true)
+			}
 			cRec.remoteClients["worker2"] = w2remoteClient
 		}