From fe553068c92ef56296f74a0194a327e9aac7976f Mon Sep 17 00:00:00 2001 From: GreenHand Date: Mon, 9 Oct 2023 16:42:37 +0800 Subject: [PATCH] Enable preemption and borrowing based on flavor (#849) * add preempt and borrow based on flavor * Add some details * Skip the flavors which has been considered in last schedule * update crd * fix test * add some tests * fix last scheduling context * add test for last scheduling context * add test cases * replace usage state data in lastAssignmentState with generation * add featuregate * update helm * enable feature gate in test * update comments and tests * update helm * rebase and update test * add unit test for generation * update test * fix test * modify according to commments * update AllocatableResourceGeneration logic * set last assignment to nil when assignment was skipped * add comments * add some tests * add in feature table * modify test * fix test * move FlavorResourceQuantities to cache remove Preemption setting in MakeClusterQueue --- apis/kueue/v1beta1/clusterqueue_types.go | 38 ++ apis/kueue/v1beta1/zz_generated.deepcopy.go | 20 + .../crd/kueue.x-k8s.io_clusterqueues.yaml | 29 ++ .../kueue/v1beta1/clusterqueuespec.go | 9 + .../kueue/v1beta1/flavorfungibility.go | 51 +++ client-go/applyconfiguration/utils.go | 2 + .../bases/kueue.x-k8s.io_clusterqueues.yaml | 29 ++ .../README.md | 10 +- pkg/cache/cache_test.go | 324 ++++++++++----- pkg/cache/clusterqueue.go | 27 +- pkg/cache/snapshot.go | 22 +- pkg/cache/snapshot_test.go | 97 +++-- pkg/features/kube_features.go | 13 +- .../flavorassigner/flavorassigner.go | 117 +++++- .../flavorassigner/flavorassigner_test.go | 355 +++++++++++++++- pkg/scheduler/preemption/preemption_test.go | 2 + pkg/scheduler/scheduler.go | 5 + pkg/scheduler/scheduler_test.go | 383 ++++++++++++++++++ pkg/util/testing/wrappers.go | 10 + pkg/webhooks/clusterqueue_webhook.go | 6 + pkg/workload/workload.go | 21 +- site/content/en/docs/installation/_index.md | 1 + .../jobs/job/job_controller_test.go 
| 4 + test/integration/scheduler/scheduler_test.go | 95 ++++- test/integration/webhook/clusterqueue_test.go | 13 +- test/util/util.go | 18 + 26 files changed, 1529 insertions(+), 172 deletions(-) create mode 100644 client-go/applyconfiguration/kueue/v1beta1/flavorfungibility.go diff --git a/apis/kueue/v1beta1/clusterqueue_types.go b/apis/kueue/v1beta1/clusterqueue_types.go index 20cacab21d..264276908a 100644 --- a/apis/kueue/v1beta1/clusterqueue_types.go +++ b/apis/kueue/v1beta1/clusterqueue_types.go @@ -72,6 +72,10 @@ type ClusterQueueSpec struct { // If set to an empty selector `{}`, then all namespaces are eligible. NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"` + // flavorFungibility defines whether a workload should try the next flavor + // before borrowing or preempting in the flavor being evaluated. + FlavorFungibility *FlavorFungibility `json:"flavorFungibility,omitempty"` + // preemption describes policies to preempt Workloads from this ClusterQueue // or the ClusterQueue's cohort. // @@ -274,6 +278,40 @@ const ( PreemptionPolicyLowerOrNewerEqualPriority PreemptionPolicy = "LowerOrNewerEqualPriority" ) +type FlavorFungibilityPolicy string + +const ( + Borrow FlavorFungibilityPolicy = "Borrow" + Preempt FlavorFungibilityPolicy = "Preempt" + TryNextFlavor FlavorFungibilityPolicy = "TryNextFlavor" +) + +// FlavorFungibility determines whether a workload should try the next flavor +// before borrowing or preempting in current flavor. +type FlavorFungibility struct { + // whenCanBorrow determines whether a workload should try the next flavor + // before borrowing in current flavor. The possible values are: + // + // - `Borrow` (default): allocate in current flavor if borrowing + // is possible. + // - `TryNextFlavor`: try next flavor even if the current + // flavor has enough resources to borrow. 
+ // + // +kubebuilder:validation:Enum={Borrow,TryNextFlavor} + // +kubebuilder:default="Borrow" + WhenCanBorrow FlavorFungibilityPolicy `json:"whenCanBorrow,omitempty"` + // whenCanPreempt determines whether a workload should try the next flavor + // before borrowing in current flavor. The possible values are: + // + // - `Preempt`: allocate in current flavor if it's possible to preempt some workloads. + // - `TryNextFlavor` (default): try next flavor even if there are enough + // candidates for preemption in the current flavor. + // + // +kubebuilder:validation:Enum={Preempt,TryNextFlavor} + // +kubebuilder:default="TryNextFlavor" + WhenCanPreempt FlavorFungibilityPolicy `json:"whenCanPreempt,omitempty"` +} + // ClusterQueuePreemption contains policies to preempt Workloads from this // ClusterQueue or the ClusterQueue's cohort. type ClusterQueuePreemption struct { diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go index b6c36c23c1..f2ea1f3574 100644 --- a/apis/kueue/v1beta1/zz_generated.deepcopy.go +++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go @@ -311,6 +311,11 @@ func (in *ClusterQueueSpec) DeepCopyInto(out *ClusterQueueSpec) { *out = new(v1.LabelSelector) (*in).DeepCopyInto(*out) } + if in.FlavorFungibility != nil { + in, out := &in.FlavorFungibility, &out.FlavorFungibility + *out = new(FlavorFungibility) + **out = **in + } if in.Preemption != nil { in, out := &in.Preemption, &out.Preemption *out = new(ClusterQueuePreemption) @@ -367,6 +372,21 @@ func (in *ClusterQueueStatus) DeepCopy() *ClusterQueueStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FlavorFungibility) DeepCopyInto(out *FlavorFungibility) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FlavorFungibility. 
+func (in *FlavorFungibility) DeepCopy() *FlavorFungibility { + if in == nil { + return nil + } + out := new(FlavorFungibility) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *FlavorQuotas) DeepCopyInto(out *FlavorQuotas) { *out = *in diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml index 3fc4ce14ac..8faa7201c9 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml @@ -84,6 +84,35 @@ spec: Validation of a cohort name is equivalent to that of object names: subdomain in DNS (RFC 1123)." type: string + flavorFungibility: + description: flavorFungibility defines whether a workload should try + the next flavor before borrowing or preempting in the flavor being + evaluated. + properties: + whenCanBorrow: + default: Borrow + description: "whenCanBorrow determines whether a workload should + try the next flavor before borrowing in current flavor. The + possible values are: \n - `Borrow` (default): allocate in current + flavor if borrowing is possible. - `TryNextFlavor`: try next + flavor even if the current flavor has enough resources to borrow." + enum: + - Borrow + - TryNextFlavor + type: string + whenCanPreempt: + default: TryNextFlavor + description: "whenCanPreempt determines whether a workload should + try the next flavor before borrowing in current flavor. The + possible values are: \n - `Preempt`: allocate in current flavor + if it's possible to preempt some workloads. - `TryNextFlavor` + (default): try next flavor even if there are enough candidates + for preemption in the current flavor." 
+ enum: + - Preempt + - TryNextFlavor + type: string + type: object namespaceSelector: description: namespaceSelector defines which namespaces are allowed to submit workloads to this clusterQueue. Beyond this basic support diff --git a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go index 3faa0c17cf..6920fc4449 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go +++ b/client-go/applyconfiguration/kueue/v1beta1/clusterqueuespec.go @@ -29,6 +29,7 @@ type ClusterQueueSpecApplyConfiguration struct { Cohort *string `json:"cohort,omitempty"` QueueingStrategy *kueuev1beta1.QueueingStrategy `json:"queueingStrategy,omitempty"` NamespaceSelector *v1.LabelSelector `json:"namespaceSelector,omitempty"` + FlavorFungibility *FlavorFungibilityApplyConfiguration `json:"flavorFungibility,omitempty"` Preemption *ClusterQueuePreemptionApplyConfiguration `json:"preemption,omitempty"` AdmissionChecks []string `json:"admissionChecks,omitempty"` } @@ -76,6 +77,14 @@ func (b *ClusterQueueSpecApplyConfiguration) WithNamespaceSelector(value v1.Labe return b } +// WithFlavorFungibility sets the FlavorFungibility field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the FlavorFungibility field is set to the value of the last call. +func (b *ClusterQueueSpecApplyConfiguration) WithFlavorFungibility(value *FlavorFungibilityApplyConfiguration) *ClusterQueueSpecApplyConfiguration { + b.FlavorFungibility = value + return b +} + // WithPreemption sets the Preemption field in the declarative configuration to the given value // and returns the receiver, so that objects can be built by chaining "With" function invocations. // If called multiple times, the Preemption field is set to the value of the last call. 
diff --git a/client-go/applyconfiguration/kueue/v1beta1/flavorfungibility.go b/client-go/applyconfiguration/kueue/v1beta1/flavorfungibility.go new file mode 100644 index 0000000000..8a2c2b7c27 --- /dev/null +++ b/client-go/applyconfiguration/kueue/v1beta1/flavorfungibility.go @@ -0,0 +1,51 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +// Code generated by applyconfiguration-gen. DO NOT EDIT. + +package v1beta1 + +import ( + v1beta1 "sigs.k8s.io/kueue/apis/kueue/v1beta1" +) + +// FlavorFungibilityApplyConfiguration represents an declarative configuration of the FlavorFungibility type for use +// with apply. +type FlavorFungibilityApplyConfiguration struct { + WhenCanBorrow *v1beta1.FlavorFungibilityPolicy `json:"whenCanBorrow,omitempty"` + WhenCanPreempt *v1beta1.FlavorFungibilityPolicy `json:"whenCanPreempt,omitempty"` +} + +// FlavorFungibilityApplyConfiguration constructs an declarative configuration of the FlavorFungibility type for use with +// apply. +func FlavorFungibility() *FlavorFungibilityApplyConfiguration { + return &FlavorFungibilityApplyConfiguration{} +} + +// WithWhenCanBorrow sets the WhenCanBorrow field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the WhenCanBorrow field is set to the value of the last call. 
+func (b *FlavorFungibilityApplyConfiguration) WithWhenCanBorrow(value v1beta1.FlavorFungibilityPolicy) *FlavorFungibilityApplyConfiguration { + b.WhenCanBorrow = &value + return b +} + +// WithWhenCanPreempt sets the WhenCanPreempt field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the WhenCanPreempt field is set to the value of the last call. +func (b *FlavorFungibilityApplyConfiguration) WithWhenCanPreempt(value v1beta1.FlavorFungibilityPolicy) *FlavorFungibilityApplyConfiguration { + b.WhenCanPreempt = &value + return b +} diff --git a/client-go/applyconfiguration/utils.go b/client-go/applyconfiguration/utils.go index dfae4a79e1..b7f5ffe654 100644 --- a/client-go/applyconfiguration/utils.go +++ b/client-go/applyconfiguration/utils.go @@ -50,6 +50,8 @@ func ForKind(kind schema.GroupVersionKind) interface{} { return &kueuev1beta1.ClusterQueueSpecApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("ClusterQueueStatus"): return &kueuev1beta1.ClusterQueueStatusApplyConfiguration{} + case v1beta1.SchemeGroupVersion.WithKind("FlavorFungibility"): + return &kueuev1beta1.FlavorFungibilityApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("FlavorQuotas"): return &kueuev1beta1.FlavorQuotasApplyConfiguration{} case v1beta1.SchemeGroupVersion.WithKind("FlavorUsage"): diff --git a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml index b8713d5364..e0fdcd7fb1 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml @@ -71,6 +71,35 @@ spec: Validation of a cohort name is equivalent to that of object names: subdomain in DNS (RFC 1123)." 
type: string + flavorFungibility: + description: flavorFungibility defines whether a workload should try + the next flavor before borrowing or preempting in the flavor being + evaluated. + properties: + whenCanBorrow: + default: Borrow + description: "whenCanBorrow determines whether a workload should + try the next flavor before borrowing in current flavor. The + possible values are: \n - `Borrow` (default): allocate in current + flavor if borrowing is possible. - `TryNextFlavor`: try next + flavor even if the current flavor has enough resources to borrow." + enum: + - Borrow + - TryNextFlavor + type: string + whenCanPreempt: + default: TryNextFlavor + description: "whenCanPreempt determines whether a workload should + try the next flavor before borrowing in current flavor. The + possible values are: \n - `Preempt`: allocate in current flavor + if it's possible to preempt some workloads. - `TryNextFlavor` + (default): try next flavor even if there are enough candidates + for preemption in the current flavor." + enum: + - Preempt + - TryNextFlavor + type: string + type: object namespaceSelector: description: namespaceSelector defines which namespaces are allowed to submit workloads to this clusterQueue. 
Beyond this basic support diff --git a/keps/582-preempt-based-on-flavor-order/README.md b/keps/582-preempt-based-on-flavor-order/README.md index 95bb5f00b7..5d21681f09 100644 --- a/keps/582-preempt-based-on-flavor-order/README.md +++ b/keps/582-preempt-based-on-flavor-order/README.md @@ -246,21 +246,21 @@ func assignFlavors(log logr.Logger, requests []workload.PodSetResources, podSets var assignment Assignment if lastAssignment != nil { assignment = Assignment{ - TotalBorrow: make(workload.FlavorResourceQuantities), + TotalBorrow: make(cache.FlavorResourceQuantities), PodSets: make([]PodSetAssignment, 0, len(requests)), LastState: *lastAssignment, - Usage: make(workload.FlavorResourceQuantities), + Usage: make(cache.FlavorResourceQuantities), } } else { assignment = Assignment{ - TotalBorrow: make(workload.FlavorResourceQuantities), + TotalBorrow: make(cache.FlavorResourceQuantities), PodSets: make([]PodSetAssignment, 0, len(requests)), LastState: workload.AssigmentClusterQueueState{ - LastAssignedFlavorIdx: make([]map[corev1.ResourceName]int, 0), + LastAssignedFlavorIdx: make([]map[corev1.ResourceName]int, 0, len(podSets)), CohortGeneration: 0, ClusterQueueGeneration: cq.Generation, }, - Usage: make(workload.FlavorResourceQuantities), + Usage: make(cache.FlavorResourceQuantities), } if cq.Cohort != nil { assignment.LastState.CohortGeneration = cq.Cohort.Generation diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index c7a2a8bd78..da778b1895 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -68,6 +68,13 @@ func TestCacheClusterQueueOperations(t *testing.T) { Cohort("two"). NamespaceSelector(nil). Obj(), + *utiltesting.MakeClusterQueue("f"). + Cohort("two"). + NamespaceSelector(nil). + FlavorFungibility(kueue.FlavorFungibility{ + WhenCanBorrow: kueue.TryNextFlavor, + }). 
+ Obj(), } setup := func(cache *Cache) error { cache.AddOrUpdateResourceFlavor( @@ -94,7 +101,8 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, wantClusterQueues: map[string]*ClusterQueue{ "a": { - Name: "a", + Name: "a", + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{{ @@ -109,6 +117,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { LabelKeys: sets.New("cpuType"), }}, NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, @@ -116,7 +125,8 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, }, "b": { - Name: "b", + Name: "b", + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{{ @@ -130,6 +140,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { LabelKeys: sets.New("cpuType"), }}, NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, @@ -137,23 +148,28 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, }, "c": { - Name: "c", - ResourceGroups: []ResourceGroup{}, - NamespaceSelector: labels.Nothing(), - Usage: FlavorResourceQuantities{}, - Status: active, - Preemption: defaultPreemption, + Name: "c", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, }, "d": { - Name: "d", - ResourceGroups: []ResourceGroup{}, - NamespaceSelector: labels.Nothing(), - Usage: FlavorResourceQuantities{}, - Status: active, - Preemption: defaultPreemption, + Name: "d", + AllocatableResourceGeneration: 1, + ResourceGroups: 
[]ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, }, "e": { - Name: "e", + Name: "e", + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{{ @@ -166,16 +182,30 @@ func TestCacheClusterQueueOperations(t *testing.T) { }}, }}, NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "nonexistent-flavor": {corev1.ResourceCPU: 0}, }, Status: pending, Preemption: defaultPreemption, }, + "f": { + Name: "f", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, + FlavorFungibility: kueue.FlavorFungibility{ + WhenCanBorrow: kueue.TryNextFlavor, + WhenCanPreempt: kueue.TryNextFlavor, + }, + }, }, wantCohorts: map[string]sets.Set[string]{ "one": sets.New("a", "b"), - "two": sets.New("c", "e"), + "two": sets.New("c", "e", "f"), }, }, { @@ -192,9 +222,11 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, wantClusterQueues: map[string]*ClusterQueue{ "foo": { - Name: "foo", - NamespaceSelector: labels.Everything(), - Status: active, + Name: "foo", + AllocatableResourceGeneration: 1, + NamespaceSelector: labels.Everything(), + Status: active, + FlavorFungibility: defaultFlavorFungibility, Preemption: kueue.ClusterQueuePreemption{ ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority, WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, @@ -218,7 +250,8 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, wantClusterQueues: map[string]*ClusterQueue{ "a": { - Name: "a", + Name: "a", + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{{ @@ 
-232,6 +265,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { }}, LabelKeys: sets.New("cpuType"), }}, + FlavorFungibility: defaultFlavorFungibility, NamespaceSelector: labels.Nothing(), Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, @@ -240,7 +274,8 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, }, "b": { - Name: "b", + Name: "b", + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{{ @@ -253,6 +288,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { }}, LabelKeys: sets.New("cpuType"), }}, + FlavorFungibility: defaultFlavorFungibility, NamespaceSelector: labels.Nothing(), Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, @@ -261,23 +297,28 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, }, "c": { - Name: "c", - ResourceGroups: []ResourceGroup{}, - NamespaceSelector: labels.Nothing(), - Usage: FlavorResourceQuantities{}, - Status: active, - Preemption: defaultPreemption, + Name: "c", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, }, "d": { - Name: "d", - ResourceGroups: []ResourceGroup{}, - NamespaceSelector: labels.Nothing(), - Usage: FlavorResourceQuantities{}, - Status: active, - Preemption: defaultPreemption, + Name: "d", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, }, "e": { - Name: "e", + Name: "e", + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{ @@ 
-292,16 +333,30 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, }}, NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "nonexistent-flavor": {corev1.ResourceCPU: 0}, }, Status: pending, Preemption: defaultPreemption, }, + "f": { + Name: "f", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, + FlavorFungibility: kueue.FlavorFungibility{ + WhenCanBorrow: kueue.TryNextFlavor, + WhenCanPreempt: kueue.TryNextFlavor, + }, + }, }, wantCohorts: map[string]sets.Set[string]{ "one": sets.New("a", "b"), - "two": sets.New("c", "e"), + "two": sets.New("c", "e", "f"), }, }, { @@ -343,7 +398,8 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, wantClusterQueues: map[string]*ClusterQueue{ "a": { - Name: "a", + Name: "a", + AllocatableResourceGeneration: 2, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{{ @@ -358,6 +414,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { LabelKeys: sets.New("cpuType", "region"), }}, NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, @@ -365,31 +422,38 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, }, "b": { - Name: "b", - ResourceGroups: []ResourceGroup{}, - NamespaceSelector: labels.Everything(), - Usage: FlavorResourceQuantities{}, - Status: active, - Preemption: defaultPreemption, + Name: "b", + AllocatableResourceGeneration: 2, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Everything(), + FlavorFungibility: defaultFlavorFungibility, + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, }, "c": { - Name: "c", - ResourceGroups: []ResourceGroup{}, - 
NamespaceSelector: labels.Nothing(), - Usage: FlavorResourceQuantities{}, - Status: active, - Preemption: defaultPreemption, + Name: "c", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, }, "d": { - Name: "d", - ResourceGroups: []ResourceGroup{}, - NamespaceSelector: labels.Nothing(), - Usage: FlavorResourceQuantities{}, - Status: active, - Preemption: defaultPreemption, + Name: "d", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, }, "e": { - Name: "e", + Name: "e", + AllocatableResourceGeneration: 2, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{{ @@ -404,16 +468,30 @@ func TestCacheClusterQueueOperations(t *testing.T) { LabelKeys: sets.New("cpuType", "region"), }}, NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, Status: active, Preemption: defaultPreemption, }, + "f": { + Name: "f", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, + FlavorFungibility: kueue.FlavorFungibility{ + WhenCanBorrow: kueue.TryNextFlavor, + WhenCanPreempt: kueue.TryNextFlavor, + }, + }, }, wantCohorts: map[string]sets.Set[string]{ "one": sets.New("b"), - "two": sets.New("a", "c", "e"), + "two": sets.New("a", "c", "e", "f"), }, }, { @@ -434,7 +512,8 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, wantClusterQueues: map[string]*ClusterQueue{ "b": { - Name: "b", + Name: "b", + 
AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{{ @@ -446,6 +525,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { LabelKeys: sets.New("cpuType"), }}, NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, @@ -453,15 +533,18 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, }, "c": { - Name: "c", - ResourceGroups: []ResourceGroup{}, - NamespaceSelector: labels.Nothing(), - Usage: FlavorResourceQuantities{}, - Status: active, - Preemption: defaultPreemption, + Name: "c", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, }, "e": { - Name: "e", + Name: "e", + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{ @@ -476,16 +559,30 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, }}, NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "nonexistent-flavor": {corev1.ResourceCPU: 0}, }, Status: pending, Preemption: defaultPreemption, }, + "f": { + Name: "f", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, + FlavorFungibility: kueue.FlavorFungibility{ + WhenCanBorrow: kueue.TryNextFlavor, + WhenCanPreempt: kueue.TryNextFlavor, + }, + }, }, wantCohorts: map[string]sets.Set[string]{ "one": sets.New("b"), - "two": sets.New("c", "e"), + "two": sets.New("c", "e", "f"), }, }, { @@ -502,7 +599,8 @@ func TestCacheClusterQueueOperations(t 
*testing.T) { }, wantClusterQueues: map[string]*ClusterQueue{ "a": { - Name: "a", + Name: "a", + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{{ @@ -517,6 +615,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { LabelKeys: sets.New("cpuType"), }}, NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, @@ -524,7 +623,8 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, }, "b": { - Name: "b", + Name: "b", + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{{ @@ -538,6 +638,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { LabelKeys: sets.New("cpuType"), }}, NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, @@ -545,23 +646,28 @@ func TestCacheClusterQueueOperations(t *testing.T) { Preemption: defaultPreemption, }, "c": { - Name: "c", - ResourceGroups: []ResourceGroup{}, - NamespaceSelector: labels.Nothing(), - Usage: FlavorResourceQuantities{}, - Status: active, - Preemption: defaultPreemption, + Name: "c", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, }, "d": { - Name: "d", - ResourceGroups: []ResourceGroup{}, - NamespaceSelector: labels.Nothing(), - Usage: FlavorResourceQuantities{}, - Status: active, - Preemption: defaultPreemption, + Name: "d", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, + Usage: 
FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, }, "e": { - Name: "e", + Name: "e", + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{{ CoveredResources: sets.New(corev1.ResourceCPU), Flavors: []FlavorQuotas{{ @@ -574,14 +680,28 @@ func TestCacheClusterQueueOperations(t *testing.T) { }}, }}, NamespaceSelector: labels.Nothing(), + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{"nonexistent-flavor": {corev1.ResourceCPU: 0}}, Status: active, Preemption: defaultPreemption, }, + "f": { + Name: "f", + AllocatableResourceGeneration: 1, + ResourceGroups: []ResourceGroup{}, + NamespaceSelector: labels.Nothing(), + Usage: FlavorResourceQuantities{}, + Status: active, + Preemption: defaultPreemption, + FlavorFungibility: kueue.FlavorFungibility{ + WhenCanBorrow: kueue.TryNextFlavor, + WhenCanPreempt: kueue.TryNextFlavor, + }, + }, }, wantCohorts: map[string]sets.Set[string]{ "one": sets.New("a", "b"), - "two": sets.New("c", "e"), + "two": sets.New("c", "e", "f"), }, }, { @@ -611,8 +731,9 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, wantClusterQueues: map[string]*ClusterQueue{ "foo": { - Name: "foo", - NamespaceSelector: labels.Everything(), + Name: "foo", + NamespaceSelector: labels.Everything(), + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{ { CoveredResources: sets.New[corev1.ResourceName]("cpu", "memory"), @@ -651,6 +772,7 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, }, }, + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "foo": { "cpu": 0, @@ -686,11 +808,13 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, wantClusterQueues: map[string]*ClusterQueue{ "foo": { - Name: "foo", - NamespaceSelector: labels.Everything(), - Status: pending, - Preemption: defaultPreemption, - AdmissionChecks: sets.New("check1", "check2"), + Name: "foo", + NamespaceSelector: labels.Everything(), + Status: pending, + Preemption: 
defaultPreemption, + AllocatableResourceGeneration: 1, + FlavorFungibility: defaultFlavorFungibility, + AdmissionChecks: sets.New("check1", "check2"), }, }, wantCohorts: map[string]sets.Set[string]{}, @@ -712,11 +836,13 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, wantClusterQueues: map[string]*ClusterQueue{ "foo": { - Name: "foo", - NamespaceSelector: labels.Everything(), - Status: active, - Preemption: defaultPreemption, - AdmissionChecks: sets.New("check1", "check2"), + Name: "foo", + NamespaceSelector: labels.Everything(), + Status: active, + Preemption: defaultPreemption, + AllocatableResourceGeneration: 1, + FlavorFungibility: defaultFlavorFungibility, + AdmissionChecks: sets.New("check1", "check2"), }, }, wantCohorts: map[string]sets.Set[string]{}, @@ -739,11 +865,13 @@ func TestCacheClusterQueueOperations(t *testing.T) { }, wantClusterQueues: map[string]*ClusterQueue{ "foo": { - Name: "foo", - NamespaceSelector: labels.Everything(), - Status: pending, - Preemption: defaultPreemption, - AdmissionChecks: sets.New("check1", "check2"), + Name: "foo", + NamespaceSelector: labels.Everything(), + Status: pending, + Preemption: defaultPreemption, + AllocatableResourceGeneration: 1, + FlavorFungibility: defaultFlavorFungibility, + AdmissionChecks: sets.New("check1", "check2"), }, }, wantCohorts: map[string]sets.Set[string]{}, diff --git a/pkg/cache/clusterqueue.go b/pkg/cache/clusterqueue.go index c36d200b25..7c5bdd7b9e 100644 --- a/pkg/cache/clusterqueue.go +++ b/pkg/cache/clusterqueue.go @@ -32,8 +32,12 @@ type ClusterQueue struct { WorkloadsNotReady sets.Set[string] NamespaceSelector labels.Selector Preemption kueue.ClusterQueuePreemption - Status metrics.ClusterQueueStatus + FlavorFungibility kueue.FlavorFungibility AdmissionChecks sets.Set[string] + Status metrics.ClusterQueueStatus + // AllocatableResourceGeneration will be increased when some admitted workloads are + // deleted, or the resource groups are changed. 
+ AllocatableResourceGeneration int64 // The following fields are not populated in a snapshot. @@ -52,6 +56,9 @@ type Cohort struct { // These fields are only populated for a snapshot. RequestableResources FlavorResourceQuantities Usage FlavorResourceQuantities + // This field will only be set in snapshot. This field equal to the sum of + // allocatable generation among its members. + AllocatableResourceGeneration int64 } type ResourceGroup struct { @@ -134,6 +141,8 @@ var defaultPreemption = kueue.ClusterQueuePreemption{ WithinClusterQueue: kueue.PreemptionPolicyNever, } +var defaultFlavorFungibility = kueue.FlavorFungibility{WhenCanBorrow: kueue.Borrow, WhenCanPreempt: kueue.TryNextFlavor} + func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, admissionChecks sets.Set[string]) error { c.updateResourceGroups(in.Spec.ResourceGroups) nsSelector, err := metav1.LabelSelectorAsSelector(in.Spec.NamespaceSelector) @@ -167,6 +176,18 @@ func (c *ClusterQueue) update(in *kueue.ClusterQueue, resourceFlavors map[kueue. 
c.Preemption = defaultPreemption } + if in.Spec.FlavorFungibility != nil { + c.FlavorFungibility = *in.Spec.FlavorFungibility + if c.FlavorFungibility.WhenCanBorrow == "" { + c.FlavorFungibility.WhenCanBorrow = defaultFlavorFungibility.WhenCanBorrow + } + if c.FlavorFungibility.WhenCanPreempt == "" { + c.FlavorFungibility.WhenCanPreempt = defaultFlavorFungibility.WhenCanPreempt + } + } else { + c.FlavorFungibility = defaultFlavorFungibility + } + return nil } @@ -196,6 +217,7 @@ func (c *ClusterQueue) updateResourceGroups(in []kueue.ResourceGroup) { rg.Flavors = append(rg.Flavors, fQuotas) } } + c.AllocatableResourceGeneration++ c.UpdateRGByResource() } @@ -316,6 +338,9 @@ func (c *ClusterQueue) deleteWorkload(w *kueue.Workload) { if c.podsReadyTracking && !apimeta.IsStatusConditionTrue(w.Status.Conditions, kueue.WorkloadPodsReady) { c.WorkloadsNotReady.Delete(k) } + // we only increase the AllocatableResourceGeneration cause the add of workload won't make more + // workloads fit in ClusterQueue. + c.AllocatableResourceGeneration++ delete(c.Workloads, k) reportAdmittedActiveWorkloads(wi.ClusterQueue, len(c.Workloads)) } diff --git a/pkg/cache/snapshot.go b/pkg/cache/snapshot.go index 5349a0b702..709390fb66 100644 --- a/pkg/cache/snapshot.go +++ b/pkg/cache/snapshot.go @@ -74,12 +74,14 @@ func (c *Cache) Snapshot() Snapshot { } for _, cohort := range c.cohorts { cohortCopy := newCohort(cohort.Name, cohort.Members.Len()) + cohortCopy.AllocatableResourceGeneration = 0 for cq := range cohort.Members { if cq.Active() { cqCopy := snap.ClusterQueues[cq.Name] cqCopy.accumulateResources(cohortCopy) cqCopy.Cohort = cohortCopy cohortCopy.Members.Insert(cqCopy) + cohortCopy.AllocatableResourceGeneration += cqCopy.AllocatableResourceGeneration } } } @@ -90,15 +92,17 @@ func (c *Cache) Snapshot() Snapshot { // objects and deep copies of changing ones. A reference to the cohort is not included. 
func (c *ClusterQueue) snapshot() *ClusterQueue { cc := &ClusterQueue{ - Name: c.Name, - ResourceGroups: c.ResourceGroups, // Shallow copy is enough. - RGByResource: c.RGByResource, // Shallow copy is enough. - Usage: make(FlavorResourceQuantities, len(c.Usage)), - Workloads: make(map[string]*workload.Info, len(c.Workloads)), - Preemption: c.Preemption, - NamespaceSelector: c.NamespaceSelector, - Status: c.Status, - AdmissionChecks: c.AdmissionChecks.Clone(), + Name: c.Name, + ResourceGroups: c.ResourceGroups, // Shallow copy is enough. + RGByResource: c.RGByResource, // Shallow copy is enough. + FlavorFungibility: c.FlavorFungibility, + AllocatableResourceGeneration: c.AllocatableResourceGeneration, + Usage: make(FlavorResourceQuantities, len(c.Usage)), + Workloads: make(map[string]*workload.Info, len(c.Workloads)), + Preemption: c.Preemption, + NamespaceSelector: c.NamespaceSelector, + Status: c.Status, + AdmissionChecks: c.AdmissionChecks.Clone(), } for fName, rUsage := range c.Usage { rUsageCopy := make(map[corev1.ResourceName]int64, len(rUsage)) diff --git a/pkg/cache/snapshot_test.go b/pkg/cache/snapshot_test.go index 801de4351e..fd187dd123 100644 --- a/pkg/cache/snapshot_test.go +++ b/pkg/cache/snapshot_test.go @@ -62,9 +62,11 @@ func TestSnapshot(t *testing.T) { wantSnapshot: Snapshot{ ClusterQueues: map[string]*ClusterQueue{ "a": { - Name: "a", - NamespaceSelector: labels.Everything(), - Status: active, + Name: "a", + NamespaceSelector: labels.Everything(), + Status: active, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, Workloads: map[string]*workload.Info{ "/alpha": workload.NewInfo( utiltesting.MakeWorkload("alpha", ""). 
@@ -73,9 +75,11 @@ func TestSnapshot(t *testing.T) { Preemption: defaultPreemption, }, "b": { - Name: "b", - NamespaceSelector: labels.Everything(), - Status: active, + Name: "b", + NamespaceSelector: labels.Everything(), + Status: active, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, Workloads: map[string]*workload.Info{ "/beta": workload.NewInfo( utiltesting.MakeWorkload("beta", ""). @@ -192,7 +196,8 @@ func TestSnapshot(t *testing.T) { }, wantSnapshot: func() Snapshot { cohort := &Cohort{ - Name: "borrowing", + Name: "borrowing", + AllocatableResourceGeneration: 2, RequestableResources: FlavorResourceQuantities{ "demand": { corev1.ResourceCPU: 100_000, @@ -219,8 +224,9 @@ func TestSnapshot(t *testing.T) { return Snapshot{ ClusterQueues: map[string]*ClusterQueue{ "a": { - Name: "a", - Cohort: cohort, + Name: "a", + Cohort: cohort, + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{ { CoveredResources: sets.New(corev1.ResourceCPU), @@ -241,6 +247,7 @@ func TestSnapshot(t *testing.T) { LabelKeys: sets.New("instance"), }, }, + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "demand": {corev1.ResourceCPU: 10_000}, "spot": {corev1.ResourceCPU: 0}, @@ -260,8 +267,9 @@ func TestSnapshot(t *testing.T) { Status: active, }, "b": { - Name: "b", - Cohort: cohort, + Name: "b", + Cohort: cohort, + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{ { CoveredResources: sets.New(corev1.ResourceCPU), @@ -283,6 +291,7 @@ func TestSnapshot(t *testing.T) { }}, }, }, + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "spot": { corev1.ResourceCPU: 10_000, @@ -321,7 +330,8 @@ func TestSnapshot(t *testing.T) { Status: active, }, "c": { - Name: "c", + Name: "c", + AllocatableResourceGeneration: 1, ResourceGroups: []ResourceGroup{ { CoveredResources: sets.New(corev1.ResourceCPU), @@ -333,6 +343,7 @@ func TestSnapshot(t *testing.T) { }}, }, }, + FlavorFungibility: 
defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "default": { corev1.ResourceCPU: 0, @@ -362,10 +373,12 @@ func TestSnapshot(t *testing.T) { wantSnapshot: Snapshot{ ClusterQueues: map[string]*ClusterQueue{ "with-preemption": { - Name: "with-preemption", - NamespaceSelector: labels.Everything(), - Status: active, - Workloads: map[string]*workload.Info{}, + Name: "with-preemption", + NamespaceSelector: labels.Everything(), + AllocatableResourceGeneration: 1, + Status: active, + Workloads: map[string]*workload.Info{}, + FlavorFungibility: defaultFlavorFungibility, Preemption: kueue.ClusterQueuePreemption{ ReclaimWithinCohort: kueue.PreemptionPolicyAny, WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, @@ -491,8 +504,9 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { remove: []string{"/c1-cpu", "/c1-memory-alpha", "/c1-memory-beta", "/c2-cpu-1", "/c2-cpu-2"}, want: func() Snapshot { cohort := &Cohort{ - Name: "cohort", - RequestableResources: initialCohortResources, + Name: "cohort", + AllocatableResourceGeneration: 2, + RequestableResources: initialCohortResources, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, "alpha": {corev1.ResourceMemory: 0}, @@ -502,10 +516,12 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { return Snapshot{ ClusterQueues: map[string]*ClusterQueue{ "c1": { - Name: "c1", - Cohort: cohort, - Workloads: make(map[string]*workload.Info), - ResourceGroups: cqCache.clusterQueues["c1"].ResourceGroups, + Name: "c1", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["c1"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, "alpha": {corev1.ResourceMemory: 0}, @@ -513,10 +529,12 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { }, }, "c2": { - Name: "c2", - Cohort: cohort, - Workloads: make(map[string]*workload.Info), - ResourceGroups: 
cqCache.clusterQueues["c2"].ResourceGroups, + Name: "c2", + Cohort: cohort, + Workloads: make(map[string]*workload.Info), + ResourceGroups: cqCache.clusterQueues["c2"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, }, @@ -529,8 +547,9 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { remove: []string{"/c1-cpu"}, want: func() Snapshot { cohort := &Cohort{ - Name: "cohort", - RequestableResources: initialCohortResources, + Name: "cohort", + AllocatableResourceGeneration: 2, + RequestableResources: initialCohortResources, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 2_000}, "alpha": {corev1.ResourceMemory: utiltesting.Gi}, @@ -546,7 +565,9 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { "/c1-memory-alpha": nil, "/c1-memory-beta": nil, }, - ResourceGroups: cqCache.clusterQueues["c1"].ResourceGroups, + AllocatableResourceGeneration: 1, + ResourceGroups: cqCache.clusterQueues["c1"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 0}, "alpha": {corev1.ResourceMemory: utiltesting.Gi}, @@ -560,7 +581,9 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { "/c2-cpu-1": nil, "/c2-cpu-2": nil, }, - ResourceGroups: cqCache.clusterQueues["c2"].ResourceGroups, + ResourceGroups: cqCache.clusterQueues["c2"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, + AllocatableResourceGeneration: 1, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 2_000}, }, @@ -573,8 +596,9 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { remove: []string{"/c1-memory-alpha"}, want: func() Snapshot { cohort := &Cohort{ - Name: "cohort", - RequestableResources: initialCohortResources, + Name: "cohort", + AllocatableResourceGeneration: 2, + RequestableResources: initialCohortResources, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 
3_000}, "alpha": {corev1.ResourceMemory: 0}, @@ -590,7 +614,9 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { "/c1-memory-alpha": nil, "/c1-memory-beta": nil, }, - ResourceGroups: cqCache.clusterQueues["c1"].ResourceGroups, + AllocatableResourceGeneration: 1, + ResourceGroups: cqCache.clusterQueues["c1"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 1_000}, "alpha": {corev1.ResourceMemory: 0}, @@ -604,7 +630,9 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { "/c2-cpu-1": nil, "/c2-cpu-2": nil, }, - ResourceGroups: cqCache.clusterQueues["c2"].ResourceGroups, + AllocatableResourceGeneration: 1, + ResourceGroups: cqCache.clusterQueues["c2"].ResourceGroups, + FlavorFungibility: defaultFlavorFungibility, Usage: FlavorResourceQuantities{ "default": {corev1.ResourceCPU: 2_000}, }, @@ -616,6 +644,7 @@ func TestSnapshotAddRemoveWorkload(t *testing.T) { } cmpOpts := append(snapCmpOpts, cmpopts.IgnoreFields(ClusterQueue{}, "NamespaceSelector", "Preemption", "Status"), + cmpopts.IgnoreFields(Cohort{}), cmpopts.IgnoreFields(Snapshot{}, "ResourceFlavors"), cmpopts.IgnoreTypes(&workload.Info{})) for name, tc := range cases { diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 8c3fc0f9fc..af9600dbd0 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -33,12 +33,20 @@ const ( // // Enables partial admission. PartialAdmission featuregate.Feature = "PartialAdmission" + // owner: @stuton // kep: https://github.com/kubernetes-sigs/kueue/tree/main/keps/168-pending-workloads-visibility // alpha: v0.5 // // Enables queue visibility. QueueVisibility featuregate.Feature = "QueueVisibility" + + // owner: @KunWuLuan + // kep: https://github.com/kubernetes-sigs/kueue/tree/main/keps/582-preempt-based-on-flavor-order + // beta: v0.5 + // + // Enables flavor fungibility. 
+ FlavorFungibility featuregate.Feature = "FlavorFungibility" ) func init() { @@ -52,8 +60,9 @@ func init() { // Entries are separated from each other with blank lines to avoid sweeping gofmt changes // when adding or removing one entry. var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ - PartialAdmission: {Default: false, PreRelease: featuregate.Alpha}, - QueueVisibility: {Default: false, PreRelease: featuregate.Alpha}, + PartialAdmission: {Default: false, PreRelease: featuregate.Alpha}, + QueueVisibility: {Default: false, PreRelease: featuregate.Alpha}, + FlavorFungibility: {Default: true, PreRelease: featuregate.Beta}, } func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) func() { diff --git a/pkg/scheduler/flavorassigner/flavorassigner.go b/pkg/scheduler/flavorassigner/flavorassigner.go index b8dd74c1a3..d3e1cdc8f6 100644 --- a/pkg/scheduler/flavorassigner/flavorassigner.go +++ b/pkg/scheduler/flavorassigner/flavorassigner.go @@ -34,12 +34,14 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/cache" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/workload" ) type Assignment struct { PodSets []PodSetAssignment TotalBorrow cache.FlavorResourceQuantities + LastState workload.AssigmentClusterQueueState // Usage is the accumulated Usage of resources as pod sets get // flavors assigned. 
@@ -218,9 +220,15 @@ func (m FlavorAssignmentMode) String() string { } type FlavorAssignment struct { - Name kueue.ResourceFlavorReference - Mode FlavorAssignmentMode - borrow int64 + Name kueue.ResourceFlavorReference + Mode FlavorAssignmentMode + FlavorIdx int + borrow int64 +} + +func lastAssignmentOutdated(wl *workload.Info, cq *cache.ClusterQueue) bool { + return cq.AllocatableResourceGeneration > wl.LastAssignment.ClusterQueueGeneration || + (cq.Cohort != nil && cq.Cohort.AllocatableResourceGeneration > wl.LastAssignment.CohortGeneration) } // AssignFlavors assigns flavors for each of the resources requested in each pod set. @@ -228,23 +236,40 @@ type FlavorAssignment struct { // be assigned immediately. Each assigned flavor is accompanied with a // FlavorAssignmentMode. func AssignFlavors(log logr.Logger, wl *workload.Info, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, cq *cache.ClusterQueue, counts []int32) Assignment { + if wl.LastAssignment != nil && lastAssignmentOutdated(wl, cq) { + wl.LastAssignment = nil + } + if len(counts) == 0 { - return assignFlavors(log, wl.TotalRequests, wl.Obj.Spec.PodSets, resourceFlavors, cq) + return assignFlavors(log, wl.TotalRequests, wl.Obj.Spec.PodSets, resourceFlavors, cq, wl.LastAssignment) } currentResources := make([]workload.PodSetResources, len(wl.TotalRequests)) for i := range wl.TotalRequests { currentResources[i] = *wl.TotalRequests[i].ScaledTo(counts[i]) } - return assignFlavors(log, currentResources, wl.Obj.Spec.PodSets, resourceFlavors, cq) + return assignFlavors(log, currentResources, wl.Obj.Spec.PodSets, resourceFlavors, cq, wl.LastAssignment) } -func assignFlavors(log logr.Logger, requests []workload.PodSetResources, podSets []kueue.PodSet, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, cq *cache.ClusterQueue) Assignment { +func assignFlavors(log logr.Logger, requests []workload.PodSetResources, podSets []kueue.PodSet, resourceFlavors 
map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, cq *cache.ClusterQueue, lastAssignment *workload.AssigmentClusterQueueState) Assignment { assignment := Assignment{ TotalBorrow: make(cache.FlavorResourceQuantities), PodSets: make([]PodSetAssignment, 0, len(requests)), Usage: make(cache.FlavorResourceQuantities), } + if lastAssignment != nil { + assignment.LastState = *lastAssignment + } else { + assignment.LastState = workload.AssigmentClusterQueueState{ + LastAssignedFlavorIdx: make([]map[corev1.ResourceName]int, 0, len(podSets)), + CohortGeneration: 0, + ClusterQueueGeneration: cq.AllocatableResourceGeneration, + } + if cq.Cohort != nil { + assignment.LastState.CohortGeneration = cq.Cohort.AllocatableResourceGeneration + } + } + for i, podSet := range requests { if _, found := cq.RGByResource[corev1.ResourcePods]; found { podSet.Requests[corev1.ResourcePods] = int64(podSet.Count) @@ -271,7 +296,14 @@ func assignFlavors(log logr.Logger, requests []workload.PodSetResources, podSets } break } - flavors, status := assignment.findFlavorForResourceGroup(log, rg, podSet.Requests, resourceFlavors, cq, &podSets[i].Template.Spec) + lastFlavorAssignment := -1 + if lastAssignment != nil && len(lastAssignment.LastAssignedFlavorIdx) > i { + idx, ok := lastAssignment.LastAssignedFlavorIdx[i][resName] + if ok { + lastFlavorAssignment = idx + } + } + flavors, status := assignment.findFlavorForResourceGroup(log, rg, podSet.Requests, resourceFlavors, cq, &podSets[i].Template.Spec, lastFlavorAssignment) if status.IsError() || len(flavors) == 0 { psAssignment.Flavors = nil psAssignment.Status = status @@ -305,6 +337,7 @@ func (psa *PodSetAssignment) append(flavors ResourceAssignment, status *Status) } func (a *Assignment) append(requests workload.Requests, psAssignment *PodSetAssignment) { + flavorIdx := make(map[corev1.ResourceName]int, len(psAssignment.Flavors)) a.PodSets = append(a.PodSets, *psAssignment) for resource, flvAssignment := range psAssignment.Flavors { if 
flvAssignment.borrow > 0 { @@ -319,7 +352,9 @@ func (a *Assignment) append(requests workload.Requests, psAssignment *PodSetAssi a.Usage[flvAssignment.Name] = make(map[corev1.ResourceName]int64) } a.Usage[flvAssignment.Name][resource] += requests[resource] + flavorIdx[resource] = flvAssignment.FlavorIdx } + a.LastState.LastAssignedFlavorIdx = append(a.LastState.LastAssignedFlavorIdx, flavorIdx) } // findFlavorForResourceGroup finds the flavor which can satisfy the resource @@ -332,7 +367,8 @@ func (a *Assignment) findFlavorForResourceGroup( requests workload.Requests, resourceFlavors map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor, cq *cache.ClusterQueue, - spec *corev1.PodSpec) (ResourceAssignment, *Status) { + spec *corev1.PodSpec, + lastAssignment int) (ResourceAssignment, *Status) { status := &Status{} requests = filterRequestedResources(requests, rg.CoveredResources) @@ -341,7 +377,11 @@ func (a *Assignment) findFlavorForResourceGroup( // We will only check against the flavors' labels for the resource. selector := flavorSelector(spec, rg.LabelKeys) - for _, flvQuotas := range rg.Flavors { + flavorIdx := -1 + for idx, flvQuotas := range rg.Flavors { + if features.Enabled(features.FlavorFungibility) && idx <= lastAssignment { + continue + } flavor, exist := resourceFlavors[flvQuotas.Name] if !exist { log.Error(nil, "Flavor not found", "Flavor", flvQuotas.Name) @@ -364,6 +404,8 @@ func (a *Assignment) findFlavorForResourceGroup( continue } + flavorIdx = idx + needsBorrowing := false assignments := make(ResourceAssignment, len(requests)) // Calculate representativeMode for this assignment as the worst mode among all requests. representativeMode := Fit @@ -377,6 +419,7 @@ func (a *Assignment) findFlavorForResourceGroup( if mode < representativeMode { representativeMode = mode } + needsBorrowing = needsBorrowing || (mode == Fit && borrow > 0) if representativeMode == NoFit { // The flavor doesn't fit, no need to check other resources. 
break @@ -389,18 +432,62 @@ func (a *Assignment) findFlavorForResourceGroup( } } - if representativeMode > bestAssignmentMode { - bestAssignment = assignments - bestAssignmentMode = representativeMode - if bestAssignmentMode == Fit { - // All the resources fit in the cohort, no need to check more flavors. - return bestAssignment, nil + if features.Enabled(features.FlavorFungibility) { + if !shouldTryNextFlavor(representativeMode, cq.FlavorFungibility, needsBorrowing) { + bestAssignment = assignments + bestAssignmentMode = representativeMode + break + } + if representativeMode > bestAssignmentMode { + bestAssignment = assignments + bestAssignmentMode = representativeMode + } + } else { + if representativeMode > bestAssignmentMode { + bestAssignment = assignments + bestAssignmentMode = representativeMode + if bestAssignmentMode == Fit { + // All the resources fit in the cohort, no need to check more flavors. + return bestAssignment, nil + } + } + } + } + + if features.Enabled(features.FlavorFungibility) { + for _, assignment := range bestAssignment { + if flavorIdx == len(rg.Flavors)-1 { + // we have reach the last flavor, try from the first flavor next time + assignment.FlavorIdx = -1 + } else { + assignment.FlavorIdx = flavorIdx } } + if bestAssignmentMode == Fit { + return bestAssignment, nil + } } return bestAssignment, status } +func shouldTryNextFlavor(representativeMode FlavorAssignmentMode, flavorFungibility kueue.FlavorFungibility, needsBorrowing bool) bool { + policyPreempt := flavorFungibility.WhenCanPreempt + policyBorrow := flavorFungibility.WhenCanBorrow + if representativeMode == Preempt && policyPreempt == kueue.Preempt { + return false + } + + if representativeMode == Fit && needsBorrowing && policyBorrow == kueue.Borrow { + return false + } + + if representativeMode == Fit && !needsBorrowing { + return false + } + + return true +} + func flavorSelector(spec *corev1.PodSpec, allowedKeys sets.Set[string]) nodeaffinity.RequiredNodeAffinity { // This 
function generally replicates the implementation of kube-scheduler's NodeAffintiy // Filter plugin as of v1.24. diff --git a/pkg/scheduler/flavorassigner/flavorassigner_test.go b/pkg/scheduler/flavorassigner/flavorassigner_test.go index 7bed137754..3a466cae4a 100644 --- a/pkg/scheduler/flavorassigner/flavorassigner_test.go +++ b/pkg/scheduler/flavorassigner/flavorassigner_test.go @@ -36,6 +36,10 @@ import ( ) func TestAssignFlavors(t *testing.T) { + defaultFlavorFungibility := kueue.FlavorFungibility{ + WhenCanBorrow: kueue.Borrow, + WhenCanPreempt: kueue.TryNextFlavor, + } resourceFlavors := map[kueue.ResourceFlavorReference]*kueue.ResourceFlavor{ "default": { ObjectMeta: metav1.ObjectMeta{Name: "default"}, @@ -1571,6 +1575,278 @@ func TestAssignFlavors(t *testing.T) { }, wantRepMode: Fit, }, + "preempt before try next flavor": { + wlPods: []kueue.PodSet{ + *utiltesting.MakePodSet("main", 1). + Request(corev1.ResourceCPU, "9"). + Obj(), + }, + clusterQueue: cache.ClusterQueue{ + FlavorFungibility: kueue.FlavorFungibility{ + WhenCanBorrow: kueue.Borrow, + WhenCanPreempt: kueue.Preempt, + }, + ResourceGroups: []cache.ResourceGroup{{ + CoveredResources: sets.New(corev1.ResourceCPU, corev1.ResourcePods), + Flavors: []cache.FlavorQuotas{{ + Name: "one", + Resources: map[corev1.ResourceName]*cache.ResourceQuota{ + corev1.ResourcePods: {Nominal: 10}, + corev1.ResourceCPU: {Nominal: 10000}, + }, + }, { + Name: "two", + Resources: map[corev1.ResourceName]*cache.ResourceQuota{ + corev1.ResourcePods: {Nominal: 10}, + corev1.ResourceCPU: {Nominal: 10000}, + }, + }}, + }}, + Usage: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 2000}, + }, + }, + wantRepMode: Preempt, + wantAssignment: Assignment{ + PodSets: []PodSetAssignment{{ + Name: "main", + Flavors: ResourceAssignment{ + corev1.ResourceCPU: {Name: "one", Mode: Preempt}, + corev1.ResourcePods: {Name: "one", Mode: Fit}, + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("9000m"), 
+ corev1.ResourcePods: resource.MustParse("1"), + }, + Status: &Status{ + reasons: []string{"insufficient unused quota for cpu in flavor one, 1 more needed"}, + }, + Count: 1, + }}, + Usage: cache.FlavorResourceQuantities{"one": {"cpu": 9000, "pods": 1}}, + }, + }, + "preempt try next flavor": { + wlPods: []kueue.PodSet{ + *utiltesting.MakePodSet("main", 1). + Request(corev1.ResourceCPU, "9"). + Obj(), + }, + clusterQueue: cache.ClusterQueue{ + FlavorFungibility: defaultFlavorFungibility, + ResourceGroups: []cache.ResourceGroup{{ + CoveredResources: sets.New(corev1.ResourceCPU, corev1.ResourcePods), + Flavors: []cache.FlavorQuotas{{ + Name: "one", + Resources: map[corev1.ResourceName]*cache.ResourceQuota{ + corev1.ResourcePods: {Nominal: 10}, + corev1.ResourceCPU: {Nominal: 10000}, + }, + }, { + Name: "two", + Resources: map[corev1.ResourceName]*cache.ResourceQuota{ + corev1.ResourcePods: {Nominal: 10}, + corev1.ResourceCPU: {Nominal: 10000}, + }, + }}, + }}, + Usage: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 2000}, + }, + }, + wantRepMode: Fit, + wantAssignment: Assignment{ + PodSets: []PodSetAssignment{{ + Name: "main", + Flavors: ResourceAssignment{ + corev1.ResourceCPU: {Name: "two", Mode: Fit}, + corev1.ResourcePods: {Name: "two", Mode: Fit}, + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("9000m"), + corev1.ResourcePods: resource.MustParse("1"), + }, + Count: 1, + }}, + Usage: cache.FlavorResourceQuantities{"two": {"cpu": 9000, "pods": 1}}, + }, + }, + "borrow try next flavor, found the first flavor": { + wlPods: []kueue.PodSet{ + *utiltesting.MakePodSet("main", 1). + Request(corev1.ResourceCPU, "9"). 
+ Obj(), + }, + clusterQueue: cache.ClusterQueue{ + Cohort: &cache.Cohort{ + Usage: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 2000}, + }, + RequestableResources: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 11000, corev1.ResourcePods: 10}, + "two": {corev1.ResourceCPU: 1000, corev1.ResourcePods: 10}, + }, + }, + FlavorFungibility: kueue.FlavorFungibility{ + WhenCanBorrow: kueue.TryNextFlavor, + WhenCanPreempt: kueue.TryNextFlavor, + }, + ResourceGroups: []cache.ResourceGroup{{ + CoveredResources: sets.New(corev1.ResourceCPU, corev1.ResourcePods), + Flavors: []cache.FlavorQuotas{{ + Name: "one", + Resources: map[corev1.ResourceName]*cache.ResourceQuota{ + corev1.ResourcePods: {Nominal: 10}, + corev1.ResourceCPU: {Nominal: 10000, BorrowingLimit: ptr.To[int64](1000)}, + }, + }, { + Name: "two", + Resources: map[corev1.ResourceName]*cache.ResourceQuota{ + corev1.ResourcePods: {Nominal: 10}, + corev1.ResourceCPU: {Nominal: 1000}, + }, + }}, + }}, + Usage: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 2000}, + }, + }, + wantRepMode: Fit, + wantAssignment: Assignment{ + TotalBorrow: cache.FlavorResourceQuantities{"one": {"cpu": 1000}}, + PodSets: []PodSetAssignment{{ + Name: "main", + Flavors: ResourceAssignment{ + corev1.ResourceCPU: {Name: "one", Mode: Fit}, + corev1.ResourcePods: {Name: "one", Mode: Fit}, + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("9000m"), + corev1.ResourcePods: resource.MustParse("1"), + }, + Count: 1, + }}, + Usage: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 9000, corev1.ResourcePods: 1}, + }, + }, + }, + "borrow try next flavor, found the second flavor": { + wlPods: []kueue.PodSet{ + *utiltesting.MakePodSet("main", 1). + Request(corev1.ResourceCPU, "9"). 
+ Obj(), + }, + clusterQueue: cache.ClusterQueue{ + Cohort: &cache.Cohort{ + Usage: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 2000}, + }, + RequestableResources: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 11000, corev1.ResourcePods: 10}, + "two": {corev1.ResourceCPU: 10000, corev1.ResourcePods: 10}, + }, + }, + FlavorFungibility: kueue.FlavorFungibility{ + WhenCanBorrow: kueue.TryNextFlavor, + WhenCanPreempt: kueue.TryNextFlavor, + }, + ResourceGroups: []cache.ResourceGroup{{ + CoveredResources: sets.New(corev1.ResourceCPU, corev1.ResourcePods), + Flavors: []cache.FlavorQuotas{{ + Name: "one", + Resources: map[corev1.ResourceName]*cache.ResourceQuota{ + corev1.ResourcePods: {Nominal: 10}, + corev1.ResourceCPU: {Nominal: 10000, BorrowingLimit: ptr.To[int64](1000)}, + }, + }, { + Name: "two", + Resources: map[corev1.ResourceName]*cache.ResourceQuota{ + corev1.ResourcePods: {Nominal: 10}, + corev1.ResourceCPU: {Nominal: 10000}, + }, + }}, + }}, + Usage: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 2000}, + }, + }, + wantRepMode: Fit, + wantAssignment: Assignment{ + PodSets: []PodSetAssignment{{ + Name: "main", + Flavors: ResourceAssignment{ + corev1.ResourceCPU: {Name: "two", Mode: Fit}, + corev1.ResourcePods: {Name: "two", Mode: Fit}, + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("9000m"), + corev1.ResourcePods: resource.MustParse("1"), + }, + Count: 1, + }}, + Usage: cache.FlavorResourceQuantities{ + "two": {corev1.ResourceCPU: 9000, corev1.ResourcePods: 1}, + }, + }, + }, + "borrow before try next flavor": { + wlPods: []kueue.PodSet{ + *utiltesting.MakePodSet("main", 1). + Request(corev1.ResourceCPU, "9"). 
+ Obj(), + }, + clusterQueue: cache.ClusterQueue{ + Cohort: &cache.Cohort{ + Usage: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 2000}, + }, + RequestableResources: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 11000, corev1.ResourcePods: 10}, + "two": {corev1.ResourceCPU: 10000, corev1.ResourcePods: 10}, + }, + }, + FlavorFungibility: defaultFlavorFungibility, + ResourceGroups: []cache.ResourceGroup{{ + CoveredResources: sets.New(corev1.ResourceCPU, corev1.ResourcePods), + Flavors: []cache.FlavorQuotas{{ + Name: "one", + Resources: map[corev1.ResourceName]*cache.ResourceQuota{ + corev1.ResourcePods: {Nominal: 10}, + corev1.ResourceCPU: {Nominal: 10000, BorrowingLimit: ptr.To[int64](1000)}, + }, + }, { + Name: "two", + Resources: map[corev1.ResourceName]*cache.ResourceQuota{ + corev1.ResourcePods: {Nominal: 10}, + corev1.ResourceCPU: {Nominal: 10000}, + }, + }}, + }}, + Usage: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 2000}, + }, + }, + wantRepMode: Fit, + wantAssignment: Assignment{ + TotalBorrow: cache.FlavorResourceQuantities{ + "one": {corev1.ResourceCPU: 1000}, + }, + PodSets: []PodSetAssignment{{ + Name: "main", + Flavors: ResourceAssignment{ + corev1.ResourceCPU: {Name: "one", Mode: Fit}, + corev1.ResourcePods: {Name: "one", Mode: Fit}, + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("9000m"), + corev1.ResourcePods: resource.MustParse("1"), + }, + Count: 1, + }}, + Usage: cache.FlavorResourceQuantities{"one": {"cpu": 9000, "pods": 1}}, + }, + }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { @@ -1585,6 +1861,12 @@ func TestAssignFlavors(t *testing.T) { ReclaimablePods: tc.wlReclaimablePods, }, }) + if tc.clusterQueue.FlavorFungibility.WhenCanBorrow == "" { + tc.clusterQueue.FlavorFungibility.WhenCanBorrow = kueue.Borrow + } + if tc.clusterQueue.FlavorFungibility.WhenCanPreempt == "" { + tc.clusterQueue.FlavorFungibility.WhenCanPreempt = kueue.TryNextFlavor + 
} tc.clusterQueue.UpdateWithFlavors(resourceFlavors) tc.clusterQueue.UpdateRGByResource() assignment := AssignFlavors(log, wlInfo, resourceFlavors, &tc.clusterQueue, nil) @@ -1592,9 +1874,80 @@ func TestAssignFlavors(t *testing.T) { t.Errorf("e.assignFlavors(_).RepresentativeMode()=%s, want %s", repMode, tc.wantRepMode) } - if diff := cmp.Diff(tc.wantAssignment, assignment, cmpopts.IgnoreUnexported(Assignment{}, FlavorAssignment{})); diff != "" { + if diff := cmp.Diff(tc.wantAssignment, assignment, cmpopts.IgnoreUnexported(Assignment{}, FlavorAssignment{}), cmpopts.IgnoreFields(Assignment{}, "LastState"), cmpopts.IgnoreFields(FlavorAssignment{}, "FlavorIdx")); diff != "" { t.Errorf("Unexpected assignment (-want,+got):\n%s", diff) } }) } } + +func TestLastAssignmentOutdated(t *testing.T) { + type args struct { + wl *workload.Info + cq *cache.ClusterQueue + } + tests := []struct { + name string + args args + want bool + }{ + { + name: "Cluster queue allocatableResourceIncreasedGen increased", + args: args{ + wl: &workload.Info{ + LastAssignment: &workload.AssigmentClusterQueueState{ + ClusterQueueGeneration: 0, + }, + }, + cq: &cache.ClusterQueue{ + Cohort: nil, + AllocatableResourceGeneration: 1, + }, + }, + want: true, + }, + { + name: "Cohort allocatableResourceIncreasedGen increased", + args: args{ + wl: &workload.Info{ + LastAssignment: &workload.AssigmentClusterQueueState{ + ClusterQueueGeneration: 0, + CohortGeneration: 0, + }, + }, + cq: &cache.ClusterQueue{ + Cohort: &cache.Cohort{ + AllocatableResourceGeneration: 1, + }, + AllocatableResourceGeneration: 0, + }, + }, + want: true, + }, + { + name: "AllocatableResourceGeneration not increased", + args: args{ + wl: &workload.Info{ + LastAssignment: &workload.AssigmentClusterQueueState{ + ClusterQueueGeneration: 0, + CohortGeneration: 0, + }, + }, + cq: &cache.ClusterQueue{ + Cohort: &cache.Cohort{ + AllocatableResourceGeneration: 0, + }, + AllocatableResourceGeneration: 0, + }, + }, + want: false, + }, + } + 
for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ if got := lastAssignmentOutdated(tt.args.wl, tt.args.cq); got != tt.want {
+ t.Errorf("LastAssignmentOutdated() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
diff --git a/pkg/scheduler/preemption/preemption_test.go b/pkg/scheduler/preemption/preemption_test.go
index 794a222f93..a0ef325b4f 100644
--- a/pkg/scheduler/preemption/preemption_test.go
+++ b/pkg/scheduler/preemption/preemption_test.go
@@ -42,6 +42,8 @@ import (
 var snapCmpOpts = []cmp.Option{
 cmpopts.EquateEmpty(),
 cmpopts.IgnoreUnexported(cache.ClusterQueue{}),
+ cmpopts.IgnoreFields(cache.Cohort{}, "AllocatableResourceGeneration"),
+ cmpopts.IgnoreFields(cache.ClusterQueue{}, "AllocatableResourceGeneration"),
 cmp.Transformer("Cohort.Members", func(s sets.Set[*cache.ClusterQueue]) sets.Set[string] {
 result := make(sets.Set[string], len(s))
 for cq := range s {
diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go
index 4ec5ffd0ce..37e787e01e 100644
--- a/pkg/scheduler/scheduler.go
+++ b/pkg/scheduler/scheduler.go
@@ -194,6 +194,10 @@ func (s *Scheduler) schedule(ctx context.Context) {
 if cycleCohortsUsage.hasCommonFlavorResources(cq.Cohort.Name, e.assignment.Usage) && !cq.Cohort.CanFit(sum) {
 e.status = skipped
 e.inadmissibleMsg = "other workloads in the cohort were prioritized"
+ // When this workload needs borrowing and another workload in the cohort does not
+ // need borrowing, the borrowing workload will be scheduled again later. In that
+ // case we should not skip the flavors that were already considered.
+ e.LastAssignment = nil continue } // Even if the workload will not be admitted after this point, due to preemption pending or other failures, @@ -308,6 +312,7 @@ func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, sna } else { e.assignment, e.preemptionTargets = s.getAssignments(log, &e.Info, &snap) e.inadmissibleMsg = e.assignment.Message() + e.Info.LastAssignment = &e.assignment.LastState } entries = append(entries, e) } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 9737eb8006..4ca818acbb 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -1263,6 +1263,389 @@ func TestEntryOrdering(t *testing.T) { } } +func TestLastSchedulingContext(t *testing.T) { + resourceFlavors := []*kueue.ResourceFlavor{ + {ObjectMeta: metav1.ObjectMeta{Name: "on-demand"}}, + {ObjectMeta: metav1.ObjectMeta{Name: "spot"}}, + } + clusterQueue := []kueue.ClusterQueue{ + *utiltesting.MakeClusterQueue("eng-alpha"). + QueueingStrategy(kueue.StrictFIFO). + Preemption(kueue.ClusterQueuePreemption{ + WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, + }). + FlavorFungibility(kueue.FlavorFungibility{ + WhenCanPreempt: kueue.Preempt, + }). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("on-demand"). + Resource(corev1.ResourceCPU, "50", "50").Obj(), + *utiltesting.MakeFlavorQuotas("spot"). + Resource(corev1.ResourceCPU, "100", "0").Obj(), + ).Obj(), + } + clusterQueue_cohort := []kueue.ClusterQueue{ + *utiltesting.MakeClusterQueue("eng-cohort-alpha"). + Cohort("cohort"). + QueueingStrategy(kueue.StrictFIFO). + Preemption(kueue.ClusterQueuePreemption{ + WithinClusterQueue: kueue.PreemptionPolicyNever, + ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority, + }). + FlavorFungibility(kueue.FlavorFungibility{ + WhenCanPreempt: kueue.Preempt, + WhenCanBorrow: kueue.Borrow, + }). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("on-demand"). 
+ Resource(corev1.ResourceCPU, "50", "50").Obj(), + *utiltesting.MakeFlavorQuotas("spot"). + Resource(corev1.ResourceCPU, "100", "0").Obj(), + ).Obj(), + *utiltesting.MakeClusterQueue("eng-cohort-beta"). + Cohort("cohort"). + QueueingStrategy(kueue.StrictFIFO). + Preemption(kueue.ClusterQueuePreemption{ + WithinClusterQueue: kueue.PreemptionPolicyNever, + ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority, + }). + FlavorFungibility(kueue.FlavorFungibility{ + WhenCanPreempt: kueue.Preempt, + WhenCanBorrow: kueue.Borrow, + }). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("on-demand"). + Resource(corev1.ResourceCPU, "50", "50").Obj(), + *utiltesting.MakeFlavorQuotas("spot"). + Resource(corev1.ResourceCPU, "100", "0").Obj(), + ).Obj(), + *utiltesting.MakeClusterQueue("eng-cohort-theta"). + Cohort("cohort"). + QueueingStrategy(kueue.StrictFIFO). + Preemption(kueue.ClusterQueuePreemption{ + WithinClusterQueue: kueue.PreemptionPolicyNever, + ReclaimWithinCohort: kueue.PreemptionPolicyLowerPriority, + }). + FlavorFungibility(kueue.FlavorFungibility{ + WhenCanPreempt: kueue.TryNextFlavor, + WhenCanBorrow: kueue.TryNextFlavor, + }). + ResourceGroup( + *utiltesting.MakeFlavorQuotas("on-demand"). + Resource(corev1.ResourceCPU, "50", "50").Obj(), + *utiltesting.MakeFlavorQuotas("spot"). 
+ Resource(corev1.ResourceCPU, "100", "0").Obj(), + ).Obj(), + } + + queues := []kueue.LocalQueue{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "main", + }, + Spec: kueue.LocalQueueSpec{ + ClusterQueue: "eng-alpha", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "main-alpha", + }, + Spec: kueue.LocalQueueSpec{ + ClusterQueue: "eng-cohort-alpha", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "main-beta", + }, + Spec: kueue.LocalQueueSpec{ + ClusterQueue: "eng-cohort-beta", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "main-theta", + }, + Spec: kueue.LocalQueueSpec{ + ClusterQueue: "eng-cohort-theta", + }, + }, + } + wl := utiltesting.MakeWorkload("low-1", "default"). + Request(corev1.ResourceCPU, "50"). + ReserveQuota(utiltesting.MakeAdmission("eng-alpha").Assignment(corev1.ResourceCPU, "on-demand", "50").Obj()). + Admitted(true). + Obj() + cases := []struct { + name string + cqs []kueue.ClusterQueue + admittedWorkloads []kueue.Workload + workloads []kueue.Workload + deletedWorkloads []kueue.Workload + wantPreempted sets.Set[string] + wantAdmissionsOnFirstSchedule map[string]kueue.Admission + wantAdmissionsOnSecondSchedule map[string]kueue.Admission + }{ + { + name: "scheduling context not changed", + cqs: clusterQueue, + admittedWorkloads: []kueue.Workload{ + *wl, + }, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("preemptor", "default"). + Queue("main"). + Request(corev1.ResourceCPU, "20"). 
+ Obj(), + }, + deletedWorkloads: []kueue.Workload{}, + wantPreempted: sets.Set[string]{}, + wantAdmissionsOnFirstSchedule: map[string]kueue.Admission{}, + wantAdmissionsOnSecondSchedule: map[string]kueue.Admission{ + "default/preemptor": *utiltesting.MakeAdmission("eng-alpha").Assignment(corev1.ResourceCPU, "spot", "20").Obj(), + "default/low-1": *utiltesting.MakeAdmission("eng-alpha").Assignment(corev1.ResourceCPU, "on-demand", "50").Obj(), + }, + }, + { + name: "some workloads were deleted", + cqs: clusterQueue, + admittedWorkloads: []kueue.Workload{ + *wl, + }, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("preemptor", "default"). + Queue("main"). + Request(corev1.ResourceCPU, "20"). + Obj(), + }, + deletedWorkloads: []kueue.Workload{ + *wl, + }, + wantPreempted: sets.Set[string]{}, + wantAdmissionsOnFirstSchedule: map[string]kueue.Admission{}, + wantAdmissionsOnSecondSchedule: map[string]kueue.Admission{ + "default/preemptor": *utiltesting.MakeAdmission("eng-alpha").Assignment(corev1.ResourceCPU, "on-demand", "20").Obj(), + }, + }, + { + name: "borrow before next flavor", + cqs: clusterQueue_cohort, + admittedWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("placeholder", "default"). + Request(corev1.ResourceCPU, "50"). + ReserveQuota(utiltesting.MakeAdmission("eng-cohort-alpha").Assignment(corev1.ResourceCPU, "on-demand", "50").Obj()). + Admitted(true). + Obj(), + }, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("borrower", "default"). + Queue("main-alpha"). + Request(corev1.ResourceCPU, "20"). + Obj(), + *utiltesting.MakeWorkload("workload1", "default"). + Queue("main-beta"). + Request(corev1.ResourceCPU, "20"). 
+ Obj(), + }, + deletedWorkloads: []kueue.Workload{}, + wantPreempted: sets.Set[string]{}, + wantAdmissionsOnFirstSchedule: map[string]kueue.Admission{ + "default/workload1": *utiltesting.MakeAdmission("eng-cohort-beta").Assignment(corev1.ResourceCPU, "on-demand", "20").Obj(), + "default/borrower": *utiltesting.MakeAdmission("eng-cohort-alpha").Assignment(corev1.ResourceCPU, "on-demand", "20").Obj(), + }, + wantAdmissionsOnSecondSchedule: map[string]kueue.Admission{ + "default/placeholder": *utiltesting.MakeAdmission("eng-cohort-alpha").Assignment(corev1.ResourceCPU, "on-demand", "50").Obj(), + "default/workload1": *utiltesting.MakeAdmission("eng-cohort-beta").Assignment(corev1.ResourceCPU, "on-demand", "20").Obj(), + "default/borrower": *utiltesting.MakeAdmission("eng-cohort-alpha").Assignment(corev1.ResourceCPU, "on-demand", "20").Obj(), + }, + }, + { + name: "borrow after all flavors", + cqs: clusterQueue_cohort, + admittedWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("placeholder", "default"). + Request(corev1.ResourceCPU, "50"). + ReserveQuota(utiltesting.MakeAdmission("eng-cohort-alpha").Assignment(corev1.ResourceCPU, "on-demand", "50").Obj()). + Admitted(true). + Obj(), + *utiltesting.MakeWorkload("placeholder1", "default"). + Request(corev1.ResourceCPU, "50"). + ReserveQuota(utiltesting.MakeAdmission("eng-cohort-theta").Assignment(corev1.ResourceCPU, "on-demand", "50").Obj()). + Admitted(true). + Obj(), + }, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("workload", "default"). + Queue("main-theta"). + Request(corev1.ResourceCPU, "20"). 
+ Obj(), + }, + deletedWorkloads: []kueue.Workload{}, + wantPreempted: sets.Set[string]{}, + wantAdmissionsOnFirstSchedule: map[string]kueue.Admission{ + "default/workload": *utiltesting.MakeAdmission("eng-cohort-theta").Assignment(corev1.ResourceCPU, "spot", "20").Obj(), + }, + wantAdmissionsOnSecondSchedule: map[string]kueue.Admission{ + "default/placeholder": *utiltesting.MakeAdmission("eng-cohort-alpha").Assignment(corev1.ResourceCPU, "on-demand", "50").Obj(), + "default/placeholder1": *utiltesting.MakeAdmission("eng-cohort-theta").Assignment(corev1.ResourceCPU, "on-demand", "50").Obj(), + "default/workload": *utiltesting.MakeAdmission("eng-cohort-theta").Assignment(corev1.ResourceCPU, "spot", "20").Obj(), + }, + }, + { + name: "when the next flavor is full", + cqs: clusterQueue_cohort, + admittedWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("placeholder", "default"). + Request(corev1.ResourceCPU, "40"). + ReserveQuota(utiltesting.MakeAdmission("eng-cohort-alpha").Assignment(corev1.ResourceCPU, "on-demand", "40").Obj()). + Admitted(true). + Obj(), + *utiltesting.MakeWorkload("placeholder1", "default"). + Request(corev1.ResourceCPU, "40"). + ReserveQuota(utiltesting.MakeAdmission("eng-cohort-theta").Assignment(corev1.ResourceCPU, "on-demand", "40").Obj()). + Admitted(true). + Obj(), + *utiltesting.MakeWorkload("placeholder2", "default"). + Request(corev1.ResourceCPU, "100"). + ReserveQuota(utiltesting.MakeAdmission("eng-cohort-theta").Assignment(corev1.ResourceCPU, "spot", "100").Obj()). + Admitted(true). + Obj(), + }, + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("workload", "default"). + Queue("main-theta"). + Request(corev1.ResourceCPU, "20"). 
+ Obj(), + }, + deletedWorkloads: []kueue.Workload{}, + wantPreempted: sets.Set[string]{}, + wantAdmissionsOnFirstSchedule: map[string]kueue.Admission{ + "default/workload": *utiltesting.MakeAdmission("eng-cohort-theta").Assignment(corev1.ResourceCPU, "on-demand", "20").Obj(), + }, + wantAdmissionsOnSecondSchedule: map[string]kueue.Admission{ + "default/placeholder": *utiltesting.MakeAdmission("eng-cohort-alpha").Assignment(corev1.ResourceCPU, "on-demand", "40").Obj(), + "default/placeholder1": *utiltesting.MakeAdmission("eng-cohort-theta").Assignment(corev1.ResourceCPU, "on-demand", "40").Obj(), + "default/placeholder2": *utiltesting.MakeAdmission("eng-cohort-theta").Assignment(corev1.ResourceCPU, "spot", "100").Obj(), + "default/workload": *utiltesting.MakeAdmission("eng-cohort-theta").Assignment(corev1.ResourceCPU, "on-demand", "20").Obj(), + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ctx, _ := utiltesting.ContextWithLog(t) + scheme := runtime.NewScheme() + + clientBuilder := utiltesting.NewClientBuilder(). + WithLists(&kueue.WorkloadList{Items: tc.admittedWorkloads}, + &kueue.WorkloadList{Items: tc.workloads}, + &kueue.ClusterQueueList{Items: tc.cqs}, + &kueue.LocalQueueList{Items: queues}). + WithObjects( + &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}}, + ) + cl := clientBuilder.Build() + broadcaster := record.NewBroadcaster() + recorder := broadcaster.NewRecorder(scheme, + corev1.EventSource{Component: constants.AdmissionName}) + cqCache := cache.New(cl) + qManager := queue.NewManager(cl, cqCache) + // Workloads are loaded into queues or clusterQueues as we add them. 
+ for _, q := range queues { + if err := qManager.AddLocalQueue(ctx, &q); err != nil { + t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err) + } + } + for i := range resourceFlavors { + cqCache.AddOrUpdateResourceFlavor(resourceFlavors[i]) + } + for _, cq := range tc.cqs { + if err := cqCache.AddClusterQueue(ctx, &cq); err != nil { + t.Fatalf("Inserting clusterQueue %s in cache: %v", cq.Name, err) + } + if err := qManager.AddClusterQueue(ctx, &cq); err != nil { + t.Fatalf("Inserting clusterQueue %s in manager: %v", cq.Name, err) + } + } + scheduler := New(qManager, cqCache, cl, recorder) + gotScheduled := make(map[string]kueue.Admission) + var mu sync.Mutex + scheduler.applyAdmission = func(ctx context.Context, w *kueue.Workload) error { + mu.Lock() + gotScheduled[workload.Key(w)] = *w.Status.Admission + mu.Unlock() + return nil + } + wg := sync.WaitGroup{} + scheduler.setAdmissionRoutineWrapper(routine.NewWrapper( + func() { wg.Add(1) }, + func() { wg.Done() }, + )) + gotPreempted := sets.New[string]() + scheduler.preemptor.OverrideApply(func(_ context.Context, w *kueue.Workload) error { + mu.Lock() + gotPreempted.Insert(workload.Key(w)) + mu.Unlock() + return nil + }) + + ctx, cancel := context.WithTimeout(ctx, queueingTimeout) + go qManager.CleanUpOnContext(ctx) + defer cancel() + + scheduler.schedule(ctx) + wg.Wait() + + if diff := cmp.Diff(tc.wantPreempted, gotPreempted); diff != "" { + t.Errorf("Unexpected preemptions (-want,+got):\n%s", diff) + } + if diff := cmp.Diff(tc.wantAdmissionsOnFirstSchedule, gotScheduled); diff != "" { + t.Errorf("Unexpected scheduled workloads (-want,+got):\n%s", diff) + } + + for _, wl := range tc.deletedWorkloads { + err := cl.Delete(ctx, &wl) + if err != nil { + t.Errorf("Delete workload failed: %v", err) + } + err = cqCache.DeleteWorkload(&wl) + if err != nil { + t.Errorf("Delete workload failed: %v", err) + } + } + + scheduler.schedule(ctx) + wg.Wait() + + if diff := cmp.Diff(tc.wantPreempted, 
gotPreempted); diff != "" {
+ t.Errorf("Unexpected preemptions (-want,+got):\n%s", diff)
+ }
+ // Verify assignments in cache.
+ gotAssignments := make(map[string]kueue.Admission)
+ snapshot := cqCache.Snapshot()
+ for cqName, c := range snapshot.ClusterQueues {
+ for name, w := range c.Workloads {
+ if !workload.IsAdmitted(w.Obj) {
+ t.Errorf("Workload %s is not admitted by a clusterQueue, but it is found as member of clusterQueue %s in the cache", name, cqName)
+ } else if string(w.Obj.Status.Admission.ClusterQueue) != cqName {
+ t.Errorf("Workload %s is admitted by clusterQueue %s, but it is found as member of clusterQueue %s in the cache", name, w.Obj.Status.Admission.ClusterQueue, cqName)
+ } else {
+ gotAssignments[name] = *w.Obj.Status.Admission
+ }
+ }
+ }
+ if diff := cmp.Diff(tc.wantAdmissionsOnSecondSchedule, gotAssignments); diff != "" {
+ t.Errorf("Unexpected assigned clusterQueues in cache (-want,+got):\n%s", diff)
+ }
+ })
+ }
+}
+
 var ignoreConditionTimestamps = cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")
 
 func TestRequeueAndUpdate(t *testing.T) {
diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go
index 9fa26f50f9..ed29321cbe 100644
--- a/pkg/util/testing/wrappers.go
+++ b/pkg/util/testing/wrappers.go
@@ -359,6 +359,10 @@ func MakeClusterQueue(name string) *ClusterQueueWrapper {
 Spec: kueue.ClusterQueueSpec{
 NamespaceSelector: &metav1.LabelSelector{},
 QueueingStrategy: kueue.BestEffortFIFO,
+ FlavorFungibility: &kueue.FlavorFungibility{
+ WhenCanBorrow: kueue.Borrow,
+ WhenCanPreempt: kueue.TryNextFlavor,
+ },
 },
 }}
 }
@@ -424,6 +428,12 @@ func (c *ClusterQueueWrapper) Preemption(p kueue.ClusterQueuePreemption) *Cluste
 return c
 }
 
+// FlavorFungibility sets the flavor fungibility policies.
+func (c *ClusterQueueWrapper) FlavorFungibility(p kueue.FlavorFungibility) *ClusterQueueWrapper {
+ c.Spec.FlavorFungibility = &p
+ return c
+}
+
 // FlavorQuotasWrapper wraps a FlavorQuotas object.
type FlavorQuotasWrapper struct{ kueue.FlavorQuotas } diff --git a/pkg/webhooks/clusterqueue_webhook.go b/pkg/webhooks/clusterqueue_webhook.go index 90a49af949..2baddb20de 100644 --- a/pkg/webhooks/clusterqueue_webhook.go +++ b/pkg/webhooks/clusterqueue_webhook.go @@ -67,6 +67,12 @@ func (w *ClusterQueueWebhook) Default(ctx context.Context, obj runtime.Object) e ReclaimWithinCohort: kueue.PreemptionPolicyNever, } } + if cq.Spec.FlavorFungibility == nil { + cq.Spec.FlavorFungibility = &kueue.FlavorFungibility{ + WhenCanBorrow: kueue.Borrow, + WhenCanPreempt: kueue.TryNextFlavor, + } + } return nil } diff --git a/pkg/workload/workload.go b/pkg/workload/workload.go index a1cc98a36b..26edfc7c60 100644 --- a/pkg/workload/workload.go +++ b/pkg/workload/workload.go @@ -39,6 +39,24 @@ var ( admissionManagedConditions = []string{kueue.WorkloadQuotaReserved, kueue.WorkloadEvicted, kueue.WorkloadAdmitted} ) +type AssigmentClusterQueueState struct { + LastAssignedFlavorIdx []map[corev1.ResourceName]int + CohortGeneration int64 + ClusterQueueGeneration int64 +} + +func (s *AssigmentClusterQueueState) Clone() *AssigmentClusterQueueState { + c := AssigmentClusterQueueState{ + LastAssignedFlavorIdx: make([]map[corev1.ResourceName]int, len(s.LastAssignedFlavorIdx)), + CohortGeneration: s.CohortGeneration, + ClusterQueueGeneration: s.ClusterQueueGeneration, + } + for ps, flavorIdx := range s.LastAssignedFlavorIdx { + c.LastAssignedFlavorIdx[ps] = maps.Clone(flavorIdx) + } + return &c +} + // Info holds a Workload object and some pre-processing. type Info struct { Obj *kueue.Workload @@ -46,7 +64,8 @@ type Info struct { TotalRequests []PodSetResources // Populated from the queue during admission or from the admission field if // already admitted. 
- ClusterQueue string
+ ClusterQueue string
+ LastAssignment *AssigmentClusterQueueState
 }
 
 type PodSetResources struct {
diff --git a/site/content/en/docs/installation/_index.md b/site/content/en/docs/installation/_index.md
index a6cc97923c..1ad49b78b4 100644
--- a/site/content/en/docs/installation/_index.md
+++ b/site/content/en/docs/installation/_index.md
@@ -227,3 +227,4 @@ The currently supported features are:
 |---------|---------|-------|-------|-------|
 | `PartialAdmission` | `false` | Alpha | 0.4 | |
 | `QueueVisibility` | `false` | Alpha | 0.5 | |
+| `FlavorFungibility` | `true` | Beta | 0.5 | |
diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go
index e1a514051e..6e6b146478 100644
--- a/test/integration/controller/jobs/job/job_controller_test.go
+++ b/test/integration/controller/jobs/job/job_controller_test.go
@@ -763,6 +763,10 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
 *testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "5").Obj(),
 *testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "5").Obj(),
 ).
+ FlavorFungibility(kueue.FlavorFungibility{
+ WhenCanBorrow: kueue.Borrow,
+ WhenCanPreempt: kueue.TryNextFlavor,
+ }).
 Preemption(kueue.ClusterQueuePreemption{
 WithinClusterQueue: kueue.PreemptionPolicyLowerPriority,
 }).
diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go index 29ccfdbc82..183cac564a 100644 --- a/test/integration/scheduler/scheduler_test.go +++ b/test/integration/scheduler/scheduler_test.go @@ -28,6 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + "sigs.k8s.io/kueue/pkg/features" "sigs.k8s.io/kueue/pkg/metrics" "sigs.k8s.io/kueue/pkg/util/testing" "sigs.k8s.io/kueue/pkg/workload" @@ -55,6 +56,7 @@ var _ = ginkgo.Describe("Scheduler", func() { GenerateName: "core-", }, } + _ = features.SetEnable(features.FlavorFungibility, true) gomega.Expect(k8sClient.Create(ctx, ns)).To(gomega.Succeed()) onDemandFlavor = testing.MakeResourceFlavor("on-demand").Label(instanceKey, "on-demand").Obj() @@ -921,6 +923,7 @@ var _ = ginkgo.Describe("Scheduler", func() { ginkgo.BeforeEach(func() { gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).Should(gomega.Succeed()) gomega.Expect(k8sClient.Create(ctx, spotTaintedFlavor)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Create(ctx, spotUntaintedFlavor)).Should(gomega.Succeed()) }) ginkgo.AfterEach(func() { @@ -931,6 +934,7 @@ var _ = ginkgo.Describe("Scheduler", func() { } util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, onDemandFlavor, true) util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, spotTaintedFlavor, true) + util.ExpectResourceFlavorToBeDeleted(ctx, k8sClient, spotUntaintedFlavor, true) }) ginkgo.It("Should admit workloads using borrowed ClusterQueue", func() { @@ -989,10 +993,10 @@ var _ = ginkgo.Describe("Scheduler", func() { prodQueue := testing.MakeLocalQueue("prod-queue", ns.Name).ClusterQueue(prodCQ.Name).Obj() gomega.Expect(k8sClient.Create(ctx, prodQueue)).Should(gomega.Succeed()) - podQueue := testing.MakeLocalQueue("dev-queue", ns.Name).ClusterQueue(devCQ.Name).Obj() - gomega.Expect(k8sClient.Create(ctx, podQueue)).Should(gomega.Succeed()) + devQueue := testing.MakeLocalQueue("dev-queue", 
ns.Name).ClusterQueue(devCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, devQueue)).Should(gomega.Succeed()) wl1 := testing.MakeWorkload("wl-1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "11").Obj() - wl2 := testing.MakeWorkload("wl-2", ns.Name).Queue(podQueue.Name).Request(corev1.ResourceCPU, "11").Obj() + wl2 := testing.MakeWorkload("wl-2", ns.Name).Queue(devQueue.Name).Request(corev1.ResourceCPU, "11").Obj() ginkgo.By("Creating two workloads") gomega.Expect(k8sClient.Create(ctx, wl1)).Should(gomega.Succeed()) @@ -1075,6 +1079,91 @@ var _ = ginkgo.Describe("Scheduler", func() { util.FinishWorkloads(ctx, k8sClient, pWl3) util.ExpectWorkloadsToHaveQuotaReservation(ctx, k8sClient, devCQ.Name, dWl1) }) + + ginkgo.It("Should try next flavor instead of borrowing", func() { + prodCQ = testing.MakeClusterQueue("prod-cq"). + Cohort("all"). + ResourceGroup(*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "10", "10").Obj()). + Obj() + gomega.Expect(k8sClient.Create(ctx, prodCQ)).Should(gomega.Succeed()) + + devCQ = testing.MakeClusterQueue("dev-cq"). + Cohort("all"). + FlavorFungibility(kueue.FlavorFungibility{WhenCanBorrow: kueue.TryNextFlavor}). + ResourceGroup( + *testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "10", "10").Obj(), + *testing.MakeFlavorQuotas("spot-tainted").Resource(corev1.ResourceCPU, "11").Obj()). 
+ Obj() + gomega.Expect(k8sClient.Create(ctx, devCQ)).Should(gomega.Succeed()) + + prodQueue := testing.MakeLocalQueue("prod-queue", ns.Name).ClusterQueue(prodCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, prodQueue)).Should(gomega.Succeed()) + + devQueue := testing.MakeLocalQueue("dev-queue", ns.Name).ClusterQueue(devCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, devQueue)).Should(gomega.Succeed()) + + ginkgo.By("Creating one workload") + wl1 := testing.MakeWorkload("wl-1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "9").Obj() + gomega.Expect(k8sClient.Create(ctx, wl1)).Should(gomega.Succeed()) + prodWl1Admission := testing.MakeAdmission(prodCQ.Name).Assignment(corev1.ResourceCPU, "on-demand", "9").Obj() + util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl1, prodWl1Admission) + util.ExpectPendingWorkloadsMetric(prodCQ, 0, 0) + util.ExpectAdmittedActiveWorkloadsMetric(prodCQ, 1) + util.ExpectAdmittedWorkloadsTotalMetric(prodCQ, 1) + + ginkgo.By("Creating another workload") + wl2 := testing.MakeWorkload("wl-2", ns.Name).Queue(devQueue.Name).Request(corev1.ResourceCPU, "11").Toleration(spotToleration).Obj() + gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed()) + prodWl2Admission := testing.MakeAdmission(devCQ.Name).Assignment(corev1.ResourceCPU, "spot-tainted", "11").Obj() + util.ExpectWorkloadToBeAdmittedAs(ctx, k8sClient, wl2, prodWl2Admission) + util.ExpectPendingWorkloadsMetric(devCQ, 0, 0) + util.ExpectAdmittedActiveWorkloadsMetric(devCQ, 1) + util.ExpectAdmittedWorkloadsTotalMetric(devCQ, 1) + }) + + ginkgo.It("Should preempt before try next flavor", func() { + prodCQ = testing.MakeClusterQueue("prod-cq"). + Cohort("all"). + ResourceGroup( + *testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "10", "10").Obj(), + *testing.MakeFlavorQuotas("spot-untainted").Resource(corev1.ResourceCPU, "11").Obj()). + Preemption(kueue.ClusterQueuePreemption{ + ReclaimWithinCohort: kueue.PreemptionPolicyAny, + }). 
+ FlavorFungibility(kueue.FlavorFungibility{WhenCanPreempt: kueue.Preempt}). + Obj() + gomega.Expect(k8sClient.Create(ctx, prodCQ)).Should(gomega.Succeed()) + + devCQ = testing.MakeClusterQueue("dev-cq"). + Cohort("all"). + FlavorFungibility(kueue.FlavorFungibility{WhenCanBorrow: kueue.TryNextFlavor}). + ResourceGroup(*testing.MakeFlavorQuotas("on-demand").Resource(corev1.ResourceCPU, "10", "10").Obj()). + Obj() + gomega.Expect(k8sClient.Create(ctx, devCQ)).Should(gomega.Succeed()) + + prodQueue := testing.MakeLocalQueue("prod-queue", ns.Name).ClusterQueue(prodCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, prodQueue)).Should(gomega.Succeed()) + + devQueue := testing.MakeLocalQueue("dev-queue", ns.Name).ClusterQueue(devCQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, devQueue)).Should(gomega.Succeed()) + + ginkgo.By("Creating two workloads") + wl1 := testing.MakeWorkload("wl-1", ns.Name).Priority(0).Queue(devQueue.Name).Request(corev1.ResourceCPU, "9").Obj() + wl2 := testing.MakeWorkload("wl-2", ns.Name).Priority(1).Queue(devQueue.Name).Request(corev1.ResourceCPU, "9").Obj() + gomega.Expect(k8sClient.Create(ctx, wl1)).Should(gomega.Succeed()) + gomega.Expect(k8sClient.Create(ctx, wl2)).Should(gomega.Succeed()) + util.ExpectPendingWorkloadsMetric(devCQ, 0, 0) + util.ExpectAdmittedActiveWorkloadsMetric(devCQ, 2) + util.ExpectAdmittedWorkloadsTotalMetric(devCQ, 2) + + ginkgo.By("Creating another workload") + wl3 := testing.MakeWorkload("wl-3", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "5").Obj() + gomega.Expect(k8sClient.Create(ctx, wl3)).Should(gomega.Succeed()) + util.ExpectWorkloadsToBePreempted(ctx, k8sClient, wl1) + util.ExpectPendingWorkloadsMetric(prodCQ, 0, 0) + util.ExpectAdmittedActiveWorkloadsMetric(prodCQ, 1) + util.ExpectAdmittedWorkloadsTotalMetric(prodCQ, 1) + }) }) ginkgo.When("Queueing with StrictFIFO", func() { diff --git a/test/integration/webhook/clusterqueue_test.go b/test/integration/webhook/clusterqueue_test.go index 
dc35ae1d2b..9a9a5a5022 100644 --- a/test/integration/webhook/clusterqueue_test.go +++ b/test/integration/webhook/clusterqueue_test.go @@ -42,6 +42,10 @@ const ( var _ = ginkgo.Describe("ClusterQueue Webhook", func() { var ns *corev1.Namespace + defaultFlavorFungibility := &kueue.FlavorFungibility{ + WhenCanBorrow: kueue.Borrow, + WhenCanPreempt: kueue.TryNextFlavor, + } ginkgo.BeforeEach(func() { ns = &corev1.Namespace{ @@ -79,7 +83,8 @@ var _ = ginkgo.Describe("ClusterQueue Webhook", func() { Finalizers: []string{kueue.ResourceInUseFinalizerName}, }, Spec: kueue.ClusterQueueSpec{ - QueueingStrategy: kueue.BestEffortFIFO, + QueueingStrategy: kueue.BestEffortFIFO, + FlavorFungibility: defaultFlavorFungibility, Preemption: &kueue.ClusterQueuePreemption{ WithinClusterQueue: kueue.PreemptionPolicyNever, ReclaimWithinCohort: kueue.PreemptionPolicyNever, @@ -93,6 +98,7 @@ var _ = ginkgo.Describe("ClusterQueue Webhook", func() { Name: "foo", }, Spec: kueue.ClusterQueueSpec{ + FlavorFungibility: defaultFlavorFungibility, Preemption: &kueue.ClusterQueuePreemption{ WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, ReclaimWithinCohort: kueue.PreemptionPolicyAny, @@ -105,7 +111,8 @@ var _ = ginkgo.Describe("ClusterQueue Webhook", func() { Finalizers: []string{kueue.ResourceInUseFinalizerName}, }, Spec: kueue.ClusterQueueSpec{ - QueueingStrategy: kueue.BestEffortFIFO, + QueueingStrategy: kueue.BestEffortFIFO, + FlavorFungibility: defaultFlavorFungibility, Preemption: &kueue.ClusterQueuePreemption{ WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, ReclaimWithinCohort: kueue.PreemptionPolicyAny, @@ -225,7 +232,7 @@ var _ = ginkgo.Describe("ClusterQueue Webhook", func() { testing.MakeClusterQueue("cluster-queue").Preemption(kueue.ClusterQueuePreemption{ ReclaimWithinCohort: kueue.PreemptionPolicyAny, WithinClusterQueue: kueue.PreemptionPolicyLowerPriority, - }).Obj(), + }).FlavorFungibility(*defaultFlavorFungibility).Obj(), isValid), ginkgo.Entry("Should forbid to 
create clusterQueue with unknown preemption.withinCohort", testing.MakeClusterQueue("cluster-queue").Preemption(kueue.ClusterQueuePreemption{ReclaimWithinCohort: "unknown"}).Obj(), diff --git a/test/util/util.go b/test/util/util.go index 28906cffec..4f26e00da3 100644 --- a/test/util/util.go +++ b/test/util/util.go @@ -194,6 +194,24 @@ func ExpectWorkloadsToBePending(ctx context.Context, k8sClient client.Client, wl }, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads are pending") } +func ExpectWorkloadsToBePreempted(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) { + gomega.EventuallyWithOffset(1, func() int { + preempted := 0 + var updatedWorkload kueue.Workload + for _, wl := range wls { + gomega.ExpectWithOffset(1, k8sClient.Get(ctx, client.ObjectKeyFromObject(wl), &updatedWorkload)).To(gomega.Succeed()) + cond := apimeta.FindStatusCondition(updatedWorkload.Status.Conditions, kueue.WorkloadEvicted) + if cond == nil { + continue + } + if cond.Status == metav1.ConditionTrue { + preempted++ + } + } + return preempted + }, Timeout, Interval).Should(gomega.Equal(len(wls)), "Not enough workloads are preempted") +} + func ExpectWorkloadsToBeWaiting(ctx context.Context, k8sClient client.Client, wls ...*kueue.Workload) { gomega.EventuallyWithOffset(1, func() int { pending := 0