Skip to content

Commit

Permalink
Propagate podsetupdates to jobs (kubernetes-sigs#1180)
Browse files Browse the repository at this point in the history
* propagate podsetupdates to jobs

* Remarks

* Move test test

* Remove focus

* consistent naming for `podSetInfos`

* consistent naming

* revert unnecessary rename
  • Loading branch information
mimowo authored and PBundyra committed Oct 26, 2023
1 parent a929d8e commit 3990567
Show file tree
Hide file tree
Showing 23 changed files with 1,706 additions and 109 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ help: ## Display this help.
manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and CustomResourceDefinition objects.
$(CONTROLLER_GEN) \
rbac:roleName=manager-role output:rbac:artifacts:config=config/components/rbac\
crd output:crd:artifacts:config=config/components/crd/bases\
crd:generateEmbeddedObjectMeta=true output:crd:artifacts:config=config/components/crd/bases\
webhook output:webhook:artifacts:config=config/components/webhook\
paths="./..."

Expand Down
34 changes: 34 additions & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,23 @@ spec:
properties:
metadata:
description: 'Standard object''s metadata. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata'
properties:
annotations:
additionalProperties:
type: string
type: object
finalizers:
items:
type: string
type: array
labels:
additionalProperties:
type: string
type: object
name:
type: string
namespace:
type: string
type: object
spec:
description: 'Specification of the desired behavior of the
Expand Down Expand Up @@ -6880,6 +6897,23 @@ spec:
that will be copied into the PVC when
creating it. No other fields are allowed
and will be rejected during validation.
properties:
annotations:
additionalProperties:
type: string
type: object
finalizers:
items:
type: string
type: array
labels:
additionalProperties:
type: string
type: object
name:
type: string
namespace:
type: string
type: object
spec:
description: The specification for the
Expand Down
34 changes: 34 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ spec:
properties:
metadata:
description: 'Standard object''s metadata. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata'
properties:
annotations:
additionalProperties:
type: string
type: object
finalizers:
items:
type: string
type: array
labels:
additionalProperties:
type: string
type: object
name:
type: string
namespace:
type: string
type: object
spec:
description: 'Specification of the desired behavior of the
Expand Down Expand Up @@ -6867,6 +6884,23 @@ spec:
that will be copied into the PVC when
creating it. No other fields are allowed
and will be rejected during validation.
properties:
annotations:
additionalProperties:
type: string
type: object
finalizers:
items:
type: string
type: array
labels:
additionalProperties:
type: string
type: object
name:
type: string
namespace:
type: string
type: object
spec:
description: The specification for the
Expand Down
92 changes: 92 additions & 0 deletions pkg/controller/jobframework/podsetinfo.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package jobframework

import (
"maps"
"slices"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
)

type PodSetInfo struct {
Name string
Count int32
Annotations map[string]string
Labels map[string]string
NodeSelector map[string]string
Tolerations []corev1.Toleration
}

func (podSetInfo *PodSetInfo) Merge(o PodSetInfo) error {
if err := utilmaps.HaveConflict(podSetInfo.Annotations, o.Annotations); err != nil {
return BadPodSetsUpdateError("annotations", err)
}
if err := utilmaps.HaveConflict(podSetInfo.Labels, o.Labels); err != nil {
return BadPodSetsUpdateError("labels", err)
}
if err := utilmaps.HaveConflict(podSetInfo.NodeSelector, o.NodeSelector); err != nil {
return BadPodSetsUpdateError("nodeSelector", err)
}
podSetInfo.Annotations = utilmaps.MergeKeepFirst(podSetInfo.Annotations, o.Annotations)
podSetInfo.Labels = utilmaps.MergeKeepFirst(podSetInfo.Labels, o.Labels)
podSetInfo.NodeSelector = utilmaps.MergeKeepFirst(podSetInfo.NodeSelector, o.NodeSelector)
podSetInfo.Tolerations = append(podSetInfo.Tolerations, o.Tolerations...)
return nil
}

// Merge updates or appends the replica metadata & spec fields based on PodSetInfo.
// If returns error if there is a conflict.
func Merge(meta *metav1.ObjectMeta, spec *v1.PodSpec, info PodSetInfo) error {
if err := info.Merge(PodSetInfo{
Annotations: meta.Annotations,
Labels: meta.Labels,
NodeSelector: spec.NodeSelector,
Tolerations: spec.Tolerations,
}); err != nil {
return err
}
meta.Annotations = info.Annotations
meta.Labels = info.Labels
spec.NodeSelector = info.NodeSelector
spec.Tolerations = info.Tolerations
return nil
}

// Restore sets replica metadata and spec fields based on PodSetInfo.
// It returns true if there is any change.
func Restore(meta *metav1.ObjectMeta, spec *v1.PodSpec, info PodSetInfo) bool {
changed := false
if !maps.Equal(meta.Annotations, info.Annotations) {
meta.Annotations = maps.Clone(info.Annotations)
changed = true
}
if !maps.Equal(meta.Labels, info.Labels) {
meta.Labels = maps.Clone(info.Labels)
changed = true
}
if !maps.Equal(spec.NodeSelector, info.NodeSelector) {
spec.NodeSelector = maps.Clone(info.NodeSelector)
changed = true
}
if !slices.Equal(spec.Tolerations, info.Tolerations) {
spec.Tolerations = slices.Clone(info.Tolerations)
changed = true
}
return changed
}
57 changes: 38 additions & 19 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"errors"
"fmt"
"maps"
"slices"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -40,7 +41,7 @@ import (
"sigs.k8s.io/kueue/pkg/util/equality"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
utilpriority "sigs.k8s.io/kueue/pkg/util/priority"
"sigs.k8s.io/kueue/pkg/util/slices"
utilslices "sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
)

Expand All @@ -55,6 +56,7 @@ var (
ErrNoMatchingWorkloads = errors.New("no matching workloads")
ErrExtraWorkloads = errors.New("extra workloads")
ErrInvalidPodsetInfo = errors.New("invalid podset infos")
ErrInvalidPodSetUpdate = errors.New("invalid admission check PodSetUpdate")
)

// JobReconciler reconciles a GenericJob object
Expand Down Expand Up @@ -362,7 +364,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}

func isPermanent(e error) bool {
return errors.Is(e, ErrInvalidPodsetInfo)
return errors.Is(e, ErrInvalidPodsetInfo) || errors.Is(e, ErrInvalidPodSetUpdate)
}

// IsParentJobManaged checks whether the parent job is managed by kueue.
Expand Down Expand Up @@ -513,7 +515,7 @@ func (r *JobReconciler) equivalentToWorkload(job GenericJob, object client.Objec

// startJob will unsuspend the job, and also inject the node affinity.
func (r *JobReconciler) startJob(ctx context.Context, job GenericJob, object client.Object, wl *kueue.Workload) error {
info, err := r.getPodSetsInfoFromAdmission(ctx, wl)
info, err := r.getPodSetsInfoFromStatus(ctx, wl)
if err != nil {
return err
}
Expand Down Expand Up @@ -643,26 +645,23 @@ func extractPriorityFromPodSets(podSets []kueue.PodSet) string {
return ""
}

type PodSetInfo struct {
Name string `json:"name"`
NodeSelector map[string]string `json:"nodeSelector"`
Count int32 `json:"count"`
}

// getPodSetsInfoFromAdmission will extract podSetsInfo and podSets count from admitted workloads.
func (r *JobReconciler) getPodSetsInfoFromAdmission(ctx context.Context, w *kueue.Workload) ([]PodSetInfo, error) {
// getPodSetsInfoFromStatus extracts podSetInfos from workload status, based on
// admission, and admission checks.
func (r *JobReconciler) getPodSetsInfoFromStatus(ctx context.Context, w *kueue.Workload) ([]PodSetInfo, error) {
if len(w.Status.Admission.PodSetAssignments) == 0 {
return nil, nil
}

nodeSelectors := make([]PodSetInfo, len(w.Status.Admission.PodSetAssignments))
podSetInfos := make([]PodSetInfo, len(w.Status.Admission.PodSetAssignments))

for i, podSetFlavor := range w.Status.Admission.PodSetAssignments {
processedFlvs := sets.NewString()
nodeSelector := PodSetInfo{
podSetInfo := PodSetInfo{
Name: podSetFlavor.Name,
NodeSelector: make(map[string]string),
Count: ptr.Deref(podSetFlavor.Count, w.Spec.PodSets[i].Count),
Labels: make(map[string]string),
Annotations: make(map[string]string),
}
for _, flvRef := range podSetFlavor.Flavors {
flvName := string(flvRef)
Expand All @@ -675,14 +674,27 @@ func (r *JobReconciler) getPodSetsInfoFromAdmission(ctx context.Context, w *kueu
return nil, err
}
for k, v := range flv.Spec.NodeLabels {
nodeSelector.NodeSelector[k] = v
podSetInfo.NodeSelector[k] = v
}
processedFlvs.Insert(flvName)
}

nodeSelectors[i] = nodeSelector
for _, admissionCheck := range w.Status.AdmissionChecks {
for _, podSetUpdate := range admissionCheck.PodSetUpdates {
if podSetUpdate.Name == podSetInfo.Name {
if err := podSetInfo.Merge(PodSetInfo{
Labels: podSetUpdate.Labels,
Annotations: podSetUpdate.Annotations,
Tolerations: podSetUpdate.Tolerations,
NodeSelector: podSetUpdate.NodeSelector,
}); err != nil {
return nil, fmt.Errorf("in admission check %q: %w", admissionCheck.Name, err)
}
}
}
}
podSetInfos[i] = podSetInfo
}
return nodeSelectors, nil
return podSetInfos, nil
}

func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, job GenericJob, object client.Object) error {
Expand Down Expand Up @@ -734,11 +746,14 @@ func getPodSetsInfoFromWorkload(wl *kueue.Workload) []PodSetInfo {
return nil
}

return slices.Map(wl.Spec.PodSets, func(ps *kueue.PodSet) PodSetInfo {
return utilslices.Map(wl.Spec.PodSets, func(ps *kueue.PodSet) PodSetInfo {
return PodSetInfo{
Name: ps.Name,
NodeSelector: maps.Clone(ps.Template.Spec.NodeSelector),
Count: ps.Count,
Annotations: maps.Clone(ps.Template.Annotations),
Labels: maps.Clone(ps.Template.Labels),
NodeSelector: maps.Clone(ps.Template.Spec.NodeSelector),
Tolerations: slices.Clone(ps.Template.Spec.Tolerations),
}
})
}
Expand Down Expand Up @@ -790,3 +805,7 @@ func resetMinCounts(in []kueue.PodSet) []kueue.PodSet {
func BadPodSetsInfoLenError(want, got int) error {
return fmt.Errorf("%w: expecting %d podset, got %d", ErrInvalidPodsetInfo, got, want)
}

func BadPodSetsUpdateError(update string, err error) error {
return fmt.Errorf("%w: conflict for %v: %v", ErrInvalidPodSetUpdate, update, err)
}
14 changes: 3 additions & 11 deletions pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ package job
import (
"context"
"fmt"
"maps"
"strconv"

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -41,7 +39,6 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
utilmaps "sigs.k8s.io/kueue/pkg/util/maps"
)

var (
Expand Down Expand Up @@ -225,15 +222,14 @@ func (j *Job) RunWithPodSetsInfo(podSetsInfo []jobframework.PodSetInfo) error {
}

info := podSetsInfo[0]
j.Spec.Template.Spec.NodeSelector = utilmaps.MergeKeepFirst(info.NodeSelector, j.Spec.Template.Spec.NodeSelector)

if j.minPodsCount() != nil {
j.Spec.Parallelism = ptr.To(info.Count)
if j.syncCompletionWithParallelism() {
j.Spec.Completions = j.Spec.Parallelism
}
}
return nil
return jobframework.Merge(&j.Spec.Template.ObjectMeta, &j.Spec.Template.Spec, info)
}

func (j *Job) RestorePodSetsInfo(podSetsInfo []jobframework.PodSetInfo) bool {
Expand All @@ -250,12 +246,8 @@ func (j *Job) RestorePodSetsInfo(podSetsInfo []jobframework.PodSetInfo) bool {
j.Spec.Completions = j.Spec.Parallelism
}
}

if equality.Semantic.DeepEqual(j.Spec.Template.Spec.NodeSelector, podSetsInfo[0].NodeSelector) {
return changed
}
j.Spec.Template.Spec.NodeSelector = maps.Clone(podSetsInfo[0].NodeSelector)
return true
changed = jobframework.Restore(&j.Spec.Template.ObjectMeta, &j.Spec.Template.Spec, podSetsInfo[0]) || changed
return changed
}

func (j *Job) Finished() (metav1.Condition, bool) {
Expand Down
Loading

0 comments on commit 3990567

Please sign in to comment.