WIP: propagate PodSetUpdates to jobs
mimowo committed Oct 4, 2023
1 parent e162f85 commit 63dd636
Showing 3 changed files with 150 additions and 9 deletions.
34 changes: 25 additions & 9 deletions pkg/controller/jobframework/reconciler.go
@@ -39,6 +39,7 @@ import (
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/util/equality"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
"sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
@@ -642,9 +643,12 @@ func extractPriorityFromPodSets(podSets []kueue.PodSet) string {
}

type PodSetInfo struct {
Name string `json:"name"`
NodeSelector map[string]string `json:"nodeSelector"`
Count int32 `json:"count"`
Name string `json:"name"`
NodeSelector map[string]string `json:"nodeSelector"`
Count int32 `json:"count"`
Labels map[string]string `json:"labels"`
Annotations map[string]string `json:"annotations"`
Tolerations []corev1.Toleration `json:"tolerations"`
}

// getPodSetsInfoFromAdmission will extract podSetsInfo and podSets count from admitted workloads.
@@ -653,14 +657,17 @@ func (r *JobReconciler) getPodSetsInfoFromAdmission(ctx context.Context, w *kueu
return nil, nil
}

nodeSelectors := make([]PodSetInfo, len(w.Status.Admission.PodSetAssignments))
podSetsInfo := make([]PodSetInfo, len(w.Status.Admission.PodSetAssignments))

for i, podSetFlavor := range w.Status.Admission.PodSetAssignments {
processedFlvs := sets.NewString()
nodeSelector := PodSetInfo{
podSetInfo := PodSetInfo{
Name: podSetFlavor.Name,
NodeSelector: make(map[string]string),
Count: ptr.Deref(podSetFlavor.Count, w.Spec.PodSets[i].Count),
Labels: make(map[string]string),
Annotations: make(map[string]string),
Tolerations: make([]corev1.Toleration, 0),
}
for _, flvRef := range podSetFlavor.Flavors {
flvName := string(flvRef)
@@ -673,14 +680,23 @@ func (r *JobReconciler) getPodSetsInfoFromAdmission(ctx context.Context, w *kueu
return nil, err
}
for k, v := range flv.Spec.NodeLabels {
nodeSelector.NodeSelector[k] = v
podSetInfo.NodeSelector[k] = v
}
processedFlvs.Insert(flvName)
}

nodeSelectors[i] = nodeSelector
for _, admissionCheck := range w.Status.AdmissionChecks {
for _, podSetUpdate := range admissionCheck.PodSetUpdates {
if podSetUpdate.Name == podSetInfo.Name {
podSetInfo.Labels = utilmaps.MergeKeepFirst(podSetInfo.Labels, podSetUpdate.Labels)
podSetInfo.Annotations = utilmaps.MergeKeepFirst(podSetInfo.Annotations, podSetUpdate.Annotations)
podSetInfo.NodeSelector = utilmaps.MergeKeepFirst(podSetInfo.NodeSelector, podSetUpdate.NodeSelector)
podSetInfo.Tolerations = append(podSetInfo.Tolerations, podSetUpdate.Tolerations...)
}
}
}
podSetsInfo[i] = podSetInfo
}
return nodeSelectors, nil
return podSetsInfo, nil
}

func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job GenericJob, object client.Object) error {
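A note on the merge calls in this hunk: they rely on utilmaps.MergeKeepFirst, whose name suggests that on a key conflict the entry from the first argument is kept. Under that assumption, the flavor node labels already collected into podSetInfo take precedence over a PodSetUpdate's NodeSelector, and when several admission checks update the same pod set, the earlier check wins for conflicting keys while tolerations simply accumulate. Below is a minimal, self-contained sketch of that assumed keep-first behavior; it is illustrative only, not the actual Kueue helper.

package main

import "fmt"

// mergeKeepFirst sketches the assumed semantics of utilmaps.MergeKeepFirst:
// copy the second map first, then let the first map overwrite, so the first
// argument's value wins whenever both maps define the same key.
func mergeKeepFirst[K comparable, V any](first, second map[K]V) map[K]V {
	merged := make(map[K]V, len(first)+len(second))
	for k, v := range second {
		merged[k] = v
	}
	for k, v := range first {
		merged[k] = v // keep the first argument's value on conflict
	}
	return merged
}

func main() {
	flavorNodeLabels := map[string]string{"zone": "zone-a"}
	podSetUpdateSelector := map[string]string{"zone": "zone-b", "selector1": "selector-value1"}
	fmt.Println(mergeKeepFirst(flavorNodeLabels, podSetUpdateSelector))
	// Prints map[selector1:selector-value1 zone:zone-a] — the flavor's zone is kept.
}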
3 changes: 3 additions & 0 deletions pkg/controller/jobs/job/job_controller.go
@@ -226,6 +226,9 @@ func (j *Job) RunWithPodSetsInfo(podSetsInfo []jobframework.PodSetInfo) error {

info := podSetsInfo[0]
j.Spec.Template.Spec.NodeSelector = utilmaps.MergeKeepFirst(info.NodeSelector, j.Spec.Template.Spec.NodeSelector)
j.Annotations = utilmaps.MergeKeepFirst(info.Annotations, j.Annotations)
j.Labels = utilmaps.MergeKeepFirst(info.Labels, j.Labels)
j.Spec.Template.Spec.Tolerations = append(j.Spec.Template.Spec.Tolerations, info.Tolerations...)

if j.minPodsCount() != nil {
j.Spec.Parallelism = ptr.To(info.Count)
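The three lines added in this hunk place the PodSetUpdate labels and annotations on the Job object itself, while the node selector and tolerations go to its pod template spec. A small sketch of the expected shape after RunWithPodSetsInfo, using made-up values and assuming the same keep-first merge semantics as above (illustration only, not controller code):

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
)

func main() {
	job := &batchv1.Job{}

	// Labels and annotations from the PodSetUpdate land on the Job metadata.
	job.Labels = map[string]string{"label1": "label-value1"}
	job.Annotations = map[string]string{"ann1": "ann-value1"}

	// Node selector and tolerations land on the pod template spec.
	job.Spec.Template.Spec.NodeSelector = map[string]string{"selector1": "selector-value1"}
	job.Spec.Template.Spec.Tolerations = append(job.Spec.Template.Spec.Tolerations, corev1.Toleration{
		Key:      "selector1",
		Operator: corev1.TolerationOpEqual,
		Value:    "selector-value1",
		Effect:   corev1.TaintEffectNoSchedule,
	})

	fmt.Println(job.Labels, job.Spec.Template.Spec.NodeSelector, job.Spec.Template.Spec.Tolerations)
}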
122 changes: 122 additions & 0 deletions test/integration/controller/jobs/job/job_controller_test.go
@@ -56,6 +56,10 @@ const (
childJobName = jobName + "-child"
)

const (
flavorOnDemand = "on-demand"
)

var (
ignoreConditionTimestamps = cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")
)
@@ -714,6 +718,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
podsCountClusterQ *kueue.ClusterQueue
prodLocalQ *kueue.LocalQueue
devLocalQ *kueue.LocalQueue
clusterQueueAc *kueue.ClusterQueue
)

ginkgo.BeforeAll(func() {
@@ -758,6 +763,12 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
).Obj()
gomega.Expect(k8sClient.Create(ctx, prodClusterQ)).Should(gomega.Succeed())

clusterQueueAc = testing.MakeClusterQueue("prod-cq-with-checks").
ResourceGroup(
*testing.MakeFlavorQuotas(flavorOnDemand).Resource(corev1.ResourceCPU, "5").Obj(),
).AdmissionChecks("check").Obj()
gomega.Expect(k8sClient.Create(ctx, clusterQueueAc)).Should(gomega.Succeed())

devClusterQ = testing.MakeClusterQueue("dev-clusterqueue").
ResourceGroup(
*testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "5").Obj(),
@@ -781,11 +792,122 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, prodClusterQ, true)
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, devClusterQ, true)
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, podsCountClusterQ, true)
util.ExpectClusterQueueToBeDeleted(ctx, k8sClient, clusterQueueAc, true)
util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true)
util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, spotTaintedFlavor, true)
util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, spotUntaintedFlavor, true)
})

ginkgo.FWhen("the queue has admission checks", func() {
var (
localQueue *kueue.LocalQueue
)

ginkgo.FIt("labels, annotations, node selectors and tolerations should be propagated from admission check to job", func() {
createdJob := &batchv1.Job{}
createdWorkload := &kueue.Workload{}
jobLookupKey := types.NamespacedName{Name: jobName, Namespace: ns.Name}
wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(jobName), Namespace: ns.Name}

ginkgo.By("create admission check", func() {
ac := testing.MakeAdmissionCheck("check").Obj()
gomega.Expect(k8sClient.Create(ctx, ac)).To(gomega.Succeed())
})

ginkgo.By("create local queue", func() {
localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueueAc.Name).Obj()
gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
})

ginkgo.By("creating the job", func() {
job := testingjob.MakeJob(jobName, ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "2").Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())

wantSuspend := true
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
gomega.Consistently(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.ConsistentDuration, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
})

ginkgo.By("checking the workload is created", func() {
gomega.Eventually(func() error {
return k8sClient.Get(ctx, wlLookupKey, createdWorkload)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Expect(createdWorkload.Spec.QueueName).Should(gomega.Equal("queue"))
gomega.Expect(metav1.IsControlledBy(createdWorkload, createdJob)).To(gomega.BeTrue())
})

ginkgo.By("add labels, annotations, node selector and tolerations to the admission check", func() {
gomega.Eventually(func() error {
var newWL kueue.Workload
gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(createdWorkload), &newWL)).To(gomega.Succeed())
check := workload.FindAdmissionCheck(newWL.Status.AdmissionChecks, "check")
workload.SetAdmissionCheckState(&newWL.Status.AdmissionChecks, kueue.AdmissionCheckState{
Name: check.Name,
State: kueue.CheckStateReady,
PodSetUpdates: []kueue.PodSetUpdate{
{
Name: "main",
Labels: map[string]string{
"label1": "label-value1",
},
Annotations: map[string]string{
"ann1": "ann-value1",
},
NodeSelector: map[string]string{
"selector1": "selector-value1",
},
Tolerations: []corev1.Toleration{
{
Key: "selector1",
Value: "selector-value1",
Operator: corev1.TolerationOpEqual,
Effect: corev1.TaintEffectNoSchedule,
},
},
},
},
})
return k8sClient.Status().Update(ctx, &newWL)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
})

ginkgo.By("wait for the job to be admitted", func() {
wantSuspend := false
gomega.Eventually(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
gomega.Consistently(func() *bool {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
return createdJob.Spec.Suspend
}, util.ConsistentDuration, util.Interval).Should(gomega.Equal(ptr.To(wantSuspend)))
})

ginkgo.By("verify the PodSetUpdates are propagated to the running job", func() {
gomega.Expect(k8sClient.Get(ctx, jobLookupKey, createdJob)).Should(gomega.Succeed())
gomega.Expect(createdJob.Labels).Should(gomega.HaveKeyWithValue("label1", "label-value1"))
gomega.Expect(createdJob.Annotations).Should(gomega.HaveKeyWithValue("ann1", "ann-value1"))
gomega.Expect(createdJob.Spec.Template.Spec.NodeSelector).Should(gomega.HaveKeyWithValue("selector1", "selector-value1"))
gomega.Expect(createdJob.Spec.Template.Spec.Tolerations).Should(gomega.BeComparableTo(
[]corev1.Toleration{
{
Key: "selector1",
Value: "selector-value1",
Operator: corev1.TolerationOpEqual,
Effect: corev1.TaintEffectNoSchedule,
},
},
))
})
})
})

ginkgo.It("Should schedule jobs as they fit in their ClusterQueue", func() {
ginkgo.By("creating localQueues")
prodLocalQ = testing.MakeLocalQueue("prod-queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
