Commit

[multikueue] Don't use reconnecting clients in WL reconnecting. (kubernetes-sigs#1827)

* [multikueue] Don't use reconnecting clients in WL reconnecting.

* Add unit test
trasc authored and vsoch committed Apr 18, 2024
1 parent fc2746e commit cc2ae73
Showing 4 changed files with 33 additions and 6 deletions.
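
The commit is small but the idea generalizes. Below is a minimal, self-contained Go sketch (not Kueue code; fakeRemoteClient, onWatchEnded, and onReconnected are hypothetical names) of the lifecycle the diff gives the renamed pendingReconnect flag: it is set when a remote watch drops, only the first failure queues a reconcile, and a successful reconnection clears it again.

package main

import (
	"fmt"
	"sync/atomic"
)

// fakeRemoteClient is a hypothetical stand-in for multikueue's remoteClient.
type fakeRemoteClient struct {
	clusterName      string
	pendingReconnect atomic.Bool
}

// onWatchEnded mirrors the startWatcher change below: Swap(true) returns the
// previous value, so only the first failing watch queues a reconcile.
func (rc *fakeRemoteClient) onWatchEnded(queueReconcile func(cluster string)) {
	if !rc.pendingReconnect.Swap(true) {
		queueReconcile(rc.clusterName)
	}
}

// onReconnected mirrors the end of setConfig: a successful (re)connection
// clears the flag, making the client usable again.
func (rc *fakeRemoteClient) onReconnected() {
	rc.pendingReconnect.Store(false)
}

func main() {
	rc := &fakeRemoteClient{clusterName: "worker2"}
	queue := func(cluster string) { fmt.Println("queue reconcile for", cluster) }

	rc.onWatchEnded(queue) // first failure: queues one reconcile
	rc.onWatchEnded(queue) // further failures while pending: no duplicate reconcile
	fmt.Println("usable:", !rc.pendingReconnect.Load()) // false while reconnecting

	rc.onReconnected()
	fmt.Println("usable:", !rc.pendingReconnect.Load()) // true again
}

While the flag is set, the workload reconciler simply leaves that cluster out of its candidate set, as the workload.go hunk further down shows.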
@@ -80,7 +80,7 @@ type remoteClient struct {
 	kubeconfig []byte
 	origin     string
 
-	forceReconnect     atomic.Bool
+	pendingReconnect   atomic.Bool
 	failedConnAttempts uint
 
 	// For unit testing only. There is no need to create fully functional remote clients in the unit tests.
@@ -138,7 +138,7 @@ func (*workloadKueueWatcher) GetWorkloadKey(o runtime.Object) (types.NamespacedN
 // If the encountered error is not permanent the duration after which a retry should be done is returned.
 func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) (*time.Duration, error) {
 	configChanged := !equality.Semantic.DeepEqual(kubeconfig, rc.kubeconfig)
-	if !configChanged && !rc.forceReconnect.Load() {
+	if !configChanged && !rc.pendingReconnect.Load() {
 		return nil, nil
 	}
@@ -182,7 +182,7 @@ func (rc *remoteClient) setConfig(watchCtx context.Context, kubeconfig []byte) (
 		}
 	}
 
-	rc.forceReconnect.Store(false)
+	rc.pendingReconnect.Store(false)
 	rc.failedConnAttempts = 0
 	return nil, nil
 }
@@ -217,7 +217,7 @@ func (rc *remoteClient) startWatcher(ctx context.Context, kind string, w multiKu
 		log.V(2).Info("Watch ended", "ctxErr", ctx.Err())
 		// If the context is not yet Done, queue a reconcile to attempt reconnection.
 		if ctx.Err() == nil {
-			oldReconnect := rc.forceReconnect.Swap(true)
+			oldReconnect := rc.pendingReconnect.Swap(true)
			// Reconnect only if this is the first watch failing.
			if !oldReconnect {
				log.V(2).Info("Queue reconcile for reconnect", "cluster", rc.clusterName)
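
The setConfig doc comment above promises that a non-permanent connection error comes back with the duration after which a retry should be attempted. A small sketch of that (*time.Duration, error) contract from a caller's perspective; connectOnce is a hypothetical stand-in, and a real controller would requeue the reconcile after the returned duration rather than sleep.

package main

import (
	"errors"
	"fmt"
	"time"
)

// connectOnce is a hypothetical stand-in for remoteClient.setConfig: it returns
// (nil, nil) on success, (nil, err) for a permanent error, and (&d, err) when
// the caller should retry after d.
func connectOnce(attempt uint) (*time.Duration, error) {
	if attempt < 2 {
		d := time.Duration(attempt+1) * time.Second
		return &d, errors.New("transient connection error")
	}
	return nil, nil
}

func main() {
	for attempt := uint(0); ; attempt++ {
		retryAfter, err := connectOnce(attempt)
		if err == nil {
			fmt.Println("connected after", attempt, "failed attempts")
			return
		}
		if retryAfter == nil {
			fmt.Println("permanent error, giving up:", err)
			return
		}
		fmt.Println("retrying in", *retryAfter, "after:", err)
		time.Sleep(*retryAfter) // a controller would return a result with RequeueAfter instead
	}
}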
@@ -76,7 +76,7 @@ func newTestClient(config string, watchCancel func()) *remoteClient {
 
 func setReconnectState(rc *remoteClient, a uint) *remoteClient {
 	rc.failedConnAttempts = a
-	rc.forceReconnect.Store(true)
+	rc.pendingReconnect.Store(true)
 	return rc
 }

5 changes: 4 additions & 1 deletion pkg/controller/admissionchecks/multikueue/workload.go
@@ -180,7 +180,10 @@ func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (m
 	clients := make(map[string]*remoteClient, len(cfg.Spec.Clusters))
 	for _, clusterName := range cfg.Spec.Clusters {
 		if client, found := w.clusters.controllerFor(clusterName); found {
-			clients[clusterName] = client
+			// Skip the client if its reconnect is ongoing.
+			if !client.pendingReconnect.Load() {
+				clients[clusterName] = client
+			}
 		}
 	}
 	if len(clients) == 0 {
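
The hunk above is the behavioral core of the change: a cluster whose client is waiting to reconnect no longer contributes a remote client for the admission check. A compact sketch of that filtering; the error returned for the empty case is an assumption (the len(clients) == 0 branch is cut off in the hunk), and all names are hypothetical.

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

type fakeClient struct{ pendingReconnect atomic.Bool }

// errNoActiveClusters is a hypothetical sentinel for the empty-result case.
var errNoActiveClusters = errors.New("no active clusters")

// usableClients keeps only clusters that are not in the middle of a reconnect,
// mirroring the filtering added to remoteClientsForAC.
func usableClients(all map[string]*fakeClient) (map[string]*fakeClient, error) {
	clients := make(map[string]*fakeClient, len(all))
	for name, c := range all {
		// Skip the client if its reconnect is ongoing.
		if !c.pendingReconnect.Load() {
			clients[name] = c
		}
	}
	if len(clients) == 0 {
		return nil, errNoActiveClusters
	}
	return clients, nil
}

func main() {
	reconnecting := &fakeClient{}
	reconnecting.pendingReconnect.Store(true)

	_, err := usableClients(map[string]*fakeClient{"worker2": reconnecting})
	fmt.Println(err) // no active clusters: presumably surfaced as an error so the reconcile is retried later
}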
24 changes: 24 additions & 0 deletions pkg/controller/admissionchecks/multikueue/workload_test.go
@@ -64,6 +64,7 @@ func TestWlReconcile(t *testing.T) {
 		worker1Jobs []batchv1.Job
 		// second worker
 		useSecondWorker      bool
+		worker2Reconnecting  bool
 		worker2OnDeleteError error
 		worker2OnGetError    error
 		worker2OnCreateError error
@@ -124,6 +125,26 @@ func TestWlReconcile(t *testing.T) {
 			},
 			wantError: errFake,
 		},
+		"reconnecting clients are skipped": {
+			reconcileFor: "wl1",
+			managersWorkloads: []kueue.Workload{
+				*baseWorkloadBuilder.Clone().
+					AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
+					ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
+					Obj(),
+			},
+			useSecondWorker:     true,
+			worker2Reconnecting: true,
+			worker2OnGetError:   errFake,
+
+			wantManagersWorkloads: []kueue.Workload{
+				*baseWorkloadBuilder.Clone().
+					AdmissionCheck(kueue.AdmissionCheckState{Name: "ac1", State: kueue.CheckStatePending}).
+					ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1").
+					Obj(),
+			},
+			wantError: nil,
+		},
 		"wl without reservation, clears the workload objects": {
 			reconcileFor: "wl1",
 			managersWorkloads: []kueue.Workload{
@@ -664,6 +685,9 @@ func TestWlReconcile(t *testing.T) {
 
 			w2remoteClient := newRemoteClient(managerClient, nil, nil, defaultOrigin, "")
 			w2remoteClient.client = worker2Client
+			if tc.worker2Reconnecting {
+				w2remoteClient.pendingReconnect.Store(true)
+			}
 			cRec.remoteClients["worker2"] = w2remoteClient
 
 		}
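
The new test case above exercises the knob end to end through the Kueue harness. As a generic illustration of the same pattern (hypothetical types, not the Kueue test code), a table-driven test gating a fake client with the atomic flag might look like this.

package demo

import (
	"sync/atomic"
	"testing"
)

type stubClient struct{ pendingReconnect atomic.Bool }

// pick returns the client only when it is not waiting for a reconnect.
func pick(c *stubClient) (*stubClient, bool) {
	if c.pendingReconnect.Load() {
		return nil, false
	}
	return c, true
}

func TestPickSkipsReconnectingClients(t *testing.T) {
	cases := map[string]struct {
		reconnecting bool
		wantUsable   bool
	}{
		"connected client is used":       {reconnecting: false, wantUsable: true},
		"reconnecting client is skipped": {reconnecting: true, wantUsable: false},
	}
	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			c := &stubClient{}
			c.pendingReconnect.Store(tc.reconnecting)
			if _, usable := pick(c); usable != tc.wantUsable {
				t.Errorf("usable = %v, want %v", usable, tc.wantUsable)
			}
		})
	}
}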
