WIP: propagate PodSetUpdates to jobs
mimowo committed Oct 5, 2023
1 parent e162f85 commit 0ce7d07
Showing 3 changed files with 237 additions and 10 deletions.
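In short, the commit extends `PodSetInfo` with labels, annotations and tolerations, fills those fields from any `PodSetUpdates` attached to the workload's admission checks, and has the batch Job integration apply them when the job is unsuspended (and restore the original values when it is suspended again). As a reading aid, here is a minimal sketch of the `PodSetUpdate` payload being propagated; the field names mirror the kueue v1beta1 types exercised in the integration test below, while the `main` wrapper is just example scaffolding:

```go
// A minimal sketch (not part of the commit): constructing the kind of
// PodSetUpdate an admission check can attach to a Workload. The field names
// mirror the kueue v1beta1 API used in the diff; everything else is scaffolding.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

func main() {
	update := kueue.PodSetUpdate{
		Name:         "main", // must match the pod set name for the update to be applied
		Labels:       map[string]string{"label1": "label-value1"},
		Annotations:  map[string]string{"ann1": "ann-value1"},
		NodeSelector: map[string]string{"selector1": "selector-value1"},
		Tolerations: []corev1.Toleration{{
			Key:      "selector1",
			Operator: corev1.TolerationOpEqual,
			Value:    "selector-value1",
			Effect:   corev1.TaintEffectNoSchedule,
		}},
	}
	fmt.Printf("pod set %q: %d label(s), %d annotation(s), %d toleration(s)\n",
		update.Name, len(update.Labels), len(update.Annotations), len(update.Tolerations))
}
```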
34 changes: 25 additions & 9 deletions pkg/controller/jobframework/reconciler.go
@@ -39,6 +39,7 @@ import (
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/equality"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
"sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
@@ -642,9 +643,12 @@ func extractPriorityFromPodSets(podSets []kueue.PodSet) string {
}

type PodSetInfo struct {
Name string `json:"name"`
NodeSelector map[string]string `json:"nodeSelector"`
Count int32 `json:"count"`
Name string `json:"name"`
NodeSelector map[string]string `json:"nodeSelector"`
Count int32 `json:"count"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
Tolerations []corev1.Toleration `json:"tolerations"`
}

// getPodSetsInfoFromAdmission will extract podSetsInfo and podSets count from admitted workloads.
@@ -653,14 +657,17 @@ func (r *JobReconciler) getPodSetsInfoFromAdmission(ctx context.Context, w *kueu
return nil, nil
}

nodeSelectors := make([]PodSetInfo, len(w.Status.Admission.PodSetAssignments))
podSetsInfo := make([]PodSetInfo, len(w.Status.Admission.PodSetAssignments))

for i, podSetFlavor := range w.Status.Admission.PodSetAssignments {
processedFlvs := sets.NewString()
nodeSelector := PodSetInfo{
podSetInfo := PodSetInfo{
Name: podSetFlavor.Name,
NodeSelector: make(map[string]string),
Count: ptr.Deref(podSetFlavor.Count, w.Spec.PodSets[i].Count),
Labels: make(map[string]string),
Annotations: make(map[string]string),
Tolerations: make([]corev1.Toleration, 0),
}
for _, flvRef := range podSetFlavor.Flavors {
flvName := string(flvRef)
@@ -673,14 +680,23 @@ func (r *JobReconciler) getPodSetsInfoFromAdmission(ctx context.Context, w *kueu
return nil, err
}
for k, v := range flv.Spec.NodeLabels {
nodeSelector.NodeSelector[k] = v
podSetInfo.NodeSelector[k] = v
}
processedFlvs.Insert(flvName)
}

nodeSelectors[i] = nodeSelector
for _, admissionCheck := range w.Status.AdmissionChecks {
for _, podSetUpdate := range admissionCheck.PodSetUpdates {
if podSetUpdate.Name == podSetInfo.Name {
podSetInfo.Labels = utilmaps.MergeKeepFirst(podSetInfo.Labels, podSetUpdate.Labels)
podSetInfo.Annotations = utilmaps.MergeKeepFirst(podSetInfo.Annotations, podSetUpdate.Annotations)
podSetInfo.NodeSelector = utilmaps.MergeKeepFirst(podSetInfo.NodeSelector, podSetUpdate.NodeSelector)
podSetInfo.Tolerations = append(podSetInfo.Tolerations, podSetUpdate.Tolerations...)
}
}
}
podSetsInfo[i] = podSetInfo
}
return nodeSelectors, nil
return podSetsInfo, nil
}

func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job GenericJob, object client.Object) error {
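One detail of the reconciler hunk above: `utilmaps.MergeKeepFirst` is always called with the values collected so far as its first argument, which suggests it keeps the first map's entry whenever both maps define the same key. Under that assumption, node labels coming from the assigned flavors win over an admission check's `NodeSelector` update, and an earlier admission check wins over a later one. A self-contained sketch of that behavior follows; `mergeKeepFirst` here is a stand-in for the real `utilmaps` helper, not its actual implementation:

```go
package main

import "fmt"

// mergeKeepFirst is a stand-in for utilmaps.MergeKeepFirst, assuming it keeps
// the value from the first map whenever both maps define the same key.
func mergeKeepFirst(first, second map[string]string) map[string]string {
	merged := make(map[string]string, len(first)+len(second))
	for k, v := range second {
		merged[k] = v
	}
	for k, v := range first {
		merged[k] = v // values from the first map win on conflicts
	}
	return merged
}

func main() {
	// Node labels from the assigned ResourceFlavor are merged first...
	fromFlavor := map[string]string{"zone": "zone-a"}
	// ...then the admission check's PodSetUpdate.NodeSelector is merged in.
	fromCheck := map[string]string{"zone": "zone-b", "gpu": "true"}

	merged := mergeKeepFirst(fromFlavor, fromCheck)
	fmt.Println(merged) // map[gpu:true zone:zone-a] — the flavor label wins
}
```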
12 changes: 11 additions & 1 deletion pkg/controller/jobs/job/job_controller.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"maps"
"slices"
"strconv"

batchv1 "k8s.io/api/batch/v1"
@@ -226,6 +227,9 @@ func (j *Job) RunWithPodSetsInfo(podSetsInfo []jobframework.PodSetInfo) error {

info := podSetsInfo[0]
j.Spec.Template.Spec.NodeSelector = utilmaps.MergeKeepFirst(info.NodeSelector, j.Spec.Template.Spec.NodeSelector)
j.Annotations = utilmaps.MergeKeepFirst(info.Annotations, j.Annotations)
j.Labels = utilmaps.MergeKeepFirst(info.Labels, j.Labels)
j.Spec.Template.Spec.Tolerations = append(j.Spec.Template.Spec.Tolerations, info.Tolerations...)

if j.minPodsCount() != nil {
j.Spec.Parallelism = ptr.To(info.Count)
@@ -251,10 +255,16 @@ func (j *Job) RestorePodSetsInfo(podSetsInfo []jobframework.PodSetInfo) bool {
}
}

if equality.Semantic.DeepEqual(j.Spec.Template.Spec.NodeSelector, podSetsInfo[0].NodeSelector) {
if equality.Semantic.DeepEqual(j.Spec.Template.Spec.NodeSelector, podSetsInfo[0].NodeSelector) &&
equality.Semantic.DeepEqual(j.Labels, podSetsInfo[0].Labels) &&
equality.Semantic.DeepEqual(j.Annotations, podSetsInfo[0].Annotations) &&
equality.Semantic.DeepEqual(j.Spec.Template.Spec.Tolerations, podSetsInfo[0].Tolerations) {
return changed
}
j.Spec.Template.Spec.NodeSelector = maps.Clone(podSetsInfo[0].NodeSelector)
j.Spec.Template.Spec.Tolerations = slices.Clone(podSetsInfo[0].Tolerations)
j.Annotations = maps.Clone(podSetsInfo[0].Annotations)
j.Labels = maps.Clone(podSetsInfo[0].Labels)
return true
}

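The `RunWithPodSetsInfo`/`RestorePodSetsInfo` pair above follows the same keep-first rule when unsuspending (the admission-time info wins over the Job template for overlapping keys) and clones the recorded values back on restore, so the Job object and the stored info never share map or slice instances. A rough, self-contained sketch of that round trip, with a toy struct standing in for `batchv1.Job` and the same assumed `mergeKeepFirst` helper:

```go
package main

import (
	"fmt"
	"maps"
)

// fakeJob is a toy stand-in for batchv1.Job; only the fields touched by the
// run/restore round trip are modeled.
type fakeJob struct {
	labels       map[string]string
	nodeSelector map[string]string
}

// mergeKeepFirst mirrors the assumed utilmaps.MergeKeepFirst semantics:
// on shared keys, the first map's value is kept.
func mergeKeepFirst(first, second map[string]string) map[string]string {
	merged := maps.Clone(second)
	if merged == nil {
		merged = map[string]string{}
	}
	maps.Copy(merged, first) // first overrides second on shared keys
	return merged
}

// run applies the admission-time info onto the job; info values win.
func run(j *fakeJob, infoLabels, infoSelector map[string]string) {
	j.labels = mergeKeepFirst(infoLabels, j.labels)
	j.nodeSelector = mergeKeepFirst(infoSelector, j.nodeSelector)
}

// restore puts the recorded values back, cloning so the job and the stored
// info never alias the same maps.
func restore(j *fakeJob, origLabels, origSelector map[string]string) {
	j.labels = maps.Clone(origLabels)
	j.nodeSelector = maps.Clone(origSelector)
}

func main() {
	j := &fakeJob{
		labels:       map[string]string{"team": "ml"},
		nodeSelector: map[string]string{"zone": "zone-a"},
	}
	origLabels := maps.Clone(j.labels)
	origSelector := maps.Clone(j.nodeSelector)

	run(j, map[string]string{"label1": "label-value1"}, map[string]string{"selector1": "selector-value1"})
	fmt.Println(j.labels, j.nodeSelector) // info merged in, existing keys kept

	restore(j, origLabels, origSelector)
	fmt.Println(j.labels, j.nodeSelector) // back to the recorded originals
}
```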
201 changes: 201 additions & 0 deletions test/integration/controller/jobs/job/job_controller_test.go
@@ -34,6 +34,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"

schedulingv1 "k8s.io/api/scheduling/v1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
@@ -786,6 +787,206 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, spotUntaintedFlavor, true)
})

ginkgo.When("the queue has admission checks", func() {
var (
clusterQueueAc *kueue.ClusterQueue
localQueue *kueue.LocalQueue
highPriorityClass *schedulingv1.PriorityClass
)

ginkgo.BeforeEach(func() {
clusterQueueAc = testing.MakeClusterQueue("prod-cq-with-checks").
ResourceGroup(
*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(),
).Preemption(kueue.ClusterQueuePreemption{
WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
}).AdmissionChecks("check").Obj()
gomega.Expect(k8sClient.Create(ctx, clusterQueueAc)).Should(gomega.Succeed())
localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueueAc.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
highPriorityClass = testing.MakePriorityClass("high").PriorityValue(100).Obj()
gomega.Expect(k8sClient.Create(ctx, highPriorityClass)).To(gomega.Succeed())
})

ginkgo.AfterEach(func() {
gomega.Expect(k8sClient.Delete(ctx, highPriorityClass)).To(gomega.Succeed())
gomega.Expect(util.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
gomega.Expect(util.DeleteLocalQueue(ctx, k8sClient, localQueue)).Should(gomega.Succeed())
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueueAc, true)
})

ginkgo.It("labels and annotations should be propagated from admission check to job", func() {
createdJob := &batchv1.Job{}
createdWorkload := &kueue.Workload{}
jobLookupKey := types.NamespacedName{Name: jobName, Namespace: ns.Name}
wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(jobName), Namespace: ns.Name}
jobHighName := "job-high"
jobHighLookupKey := types.NamespacedName{Name: jobHighName, Namespace: ns.Name}
createdHighJob := &batchv1.Job{}
createdHighWorkload := &kueue.Workload{}
wlHighLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(jobHighName), Namespace: ns.Name}

ginkgo.By("create admission check", func() {
ac := testing.MakeAdmissionCheck("check").Obj()
gomega.Expect(k8sClient.Create(ctx, ac)).To(gomega.Succeed())
})

ginkgo.By("creating the job with default priority", func() {
job := testingjob.MakeJob(jobName, ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "5").Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())

wantSuspend := true
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
gomega.Consistently(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.ConsistentDuration, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
})

ginkgo.By("checking the workload is created", func() {
gomega.Eventually(func() error {
return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Expect(createdWorkload.Spec.QueueName).Should(gomega.Equal("queue"))
gomega.Expect(metav1.IsControlledBy(createdWorkload, createdJob)).To(gomega.BeTrue())
})

ginkgo.By("add labels & annotations to the admission check", func() {
gomega.Eventually(func() error {
var newWL kueue.Workload
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(createdWorkload), &newWL)).To(gomega.Succeed())
check := workload.FindAdmissionCheck(newWL.Status.AdmissionChecks, "check")
workload.SetAdmissionCheckState(&newWL.Status.AdmissionChecks, kueue.AdmissionCheckState{
Name: check.Name,
State: kueue.CheckStateReady,
PodSetUpdates: []kueue.PodSetUpdate{
{
Name: "main",
Labels: map[string]string{
"label1": "label-value1",
},
Annotations: map[string]string{
"ann1": "ann-value1",
},
NodeSelector: map[string]string{
"selector1": "selector-value1",
},
Tolerations: []corev1.Toleration{
{
Key: "selector1",
Value: "selector-value1",
Operator: corev1.TolerationOpEqual,
Effect: corev1.TaintEffectNoSchedule,
},
},
},
},
})
return k8sClient.Status().Update(ctx, &newWL)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("await for the job to be admitted", func() {
wantSuspend := false
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
gomega.Consistently(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.ConsistentDuration, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
})

ginkgo.By("verify the PodSetUpdates are propagated to the running job", func() {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
gomega.Expect(createdJob.Labels).Should(gomega.HaveKeyWithValue("label1", "label-value1"))
gomega.Expect(createdJob.Annotations).Should(gomega.HaveKeyWithValue("ann1", "ann-value1"))
gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector).Should(gomega.HaveKeyWithValue("selector1", "selector-value1"))
gomega.Expect(createdJob.Spec.Template.Spec.Tolerations).Should(gomega.BeComparableTo(
[]corev1.Toleration{
{
Key: "selector1",
Value: "selector-value1",
Operator: corev1.TolerationOpEqual,
Effect: corev1.TaintEffectNoSchedule,
},
},
))
})

ginkgo.By("creating the job with high priority", func() {
jobHigh := testingjob.MakeJob(jobHighName, ns.Name).Queue(localQueue.Name).PriorityClass(highPriorityClass.Name).Request(corev1.ResourceCPU, "5").Obj()
gomega.Expect(k8sClient.Create(ctx, jobHigh)).Should(gomega.Succeed())

wantSuspend := true
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobHighLookupKey, createdHighJob)).Should(gomega.Succeed())
return createdHighJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
gomega.Consistently(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobHighLookupKey, createdHighJob)).Should(gomega.Succeed())
return createdHighJob.Spec.Suspend
}, util.ConsistentDuration, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
})

ginkgo.By("checking the workload is created", func() {
gomega.Eventually(func() error {
return k8sClient.Get(ctx, wlHighLookupKey, createdHighWorkload)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("make workload high ready to admit", func() {
gomega.Eventually(func() error {
var newWL kueue.Workload
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(createdHighWorkload), &newWL)).To(gomega.Succeed())
check := workload.FindAdmissionCheck(newWL.Status.AdmissionChecks, "check")
workload.SetAdmissionCheckState(&newWL.Status.AdmissionChecks, kueue.AdmissionCheckState{
Name: check.Name,
State: kueue.CheckStateReady,
})
return k8sClient.Status().Update(ctx, &newWL)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("await for the high priority job to be admitted", func() {
wantSuspend := false
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobHighLookupKey, createdHighJob)).Should(gomega.Succeed())
return createdHighJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
gomega.Consistently(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobHighLookupKey, createdHighJob)).Should(gomega.Succeed())
return createdHighJob.Spec.Suspend
}, util.ConsistentDuration, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
})

ginkgo.By("await for the default job to be preempted", func() {
wantSuspend := true
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
gomega.Consistently(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.ConsistentDuration, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
})

ginkgo.By("verify the PodSetUpdates are restored", func() {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
gomega.Expect(createdJob.Spec.Suspend).Should(gomega.Equal(ptr.To(true)))
gomega.Expect(createdJob.Labels).ShouldNot(gomega.HaveKey("label1"))
gomega.Expect(createdJob.Annotations).ShouldNot(gomega.HaveKey("ann1"))
gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector).ShouldNot(gomega.HaveKey("selector1"))
})

})
})

ginkgo.It("Should schedule jobs as they fit in their ClusterQueue", func() {
ginkgo.By("creating localQueues")
prodLocalQ = testing.MakeLocalQueue("prod-queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
