From f7f0a654ec395f4b246138f8c90a6e0e9958f15a Mon Sep 17 00:00:00 2001 From: Abdullah Gharaibeh Date: Thu, 18 Aug 2022 21:12:42 +0000 Subject: [PATCH] Renamed Queue to LocalQueue --- PROJECT | 2 +- .../{queue_types.go => localqueue_types.go} | 31 +++-- apis/kueue/v1alpha1/zz_generated.deepcopy.go | 114 ++++++++-------- apis/kueue/webhooks/clusterqueue_webhook.go | 2 +- ...queue_webhook.go => localqueue_webhook.go} | 38 +++--- ...ook_test.go => localqueue_webhook_test.go} | 30 ++-- apis/kueue/webhooks/webhooks.go | 2 +- ...s.yaml => kueue.x-k8s.io_localqueues.yaml} | 23 ++-- config/crd/kustomization.yaml | 6 +- ...s.yaml => cainjection_in_localqueues.yaml} | 2 +- ...ueues.yaml => webhook_in_localqueues.yaml} | 2 +- config/rbac/kustomization.yaml | 4 +- ..._role.yaml => localqueue_editor_role.yaml} | 8 +- ..._role.yaml => localqueue_viewer_role.yaml} | 6 +- config/rbac/role.yaml | 6 +- config/samples/single-clusterqueue-setup.yaml | 2 +- config/webhook/manifests.yaml | 6 +- pkg/cache/cache.go | 26 ++-- pkg/cache/cache_test.go | 12 +- pkg/controller/core/core.go | 2 +- ...controller.go => localqueue_controller.go} | 74 +++++----- pkg/queue/cluster_queue_impl.go | 4 +- pkg/queue/cluster_queue_impl_test.go | 16 +-- pkg/queue/cluster_queue_interface.go | 4 +- pkg/queue/{queue.go => local_queue.go} | 16 +-- pkg/queue/manager.go | 66 ++++----- pkg/queue/manager_test.go | 128 +++++++++--------- pkg/scheduler/scheduler_test.go | 22 +-- pkg/util/testing/wrappers.go | 20 +-- .../core/clusterqueue_controller_test.go | 26 ++-- ..._test.go => localqueue_controller_test.go} | 22 +-- .../core/workload_controller_test.go | 28 ++-- .../controller/job/job_controller_test.go | 18 +-- test/integration/framework/framework.go | 4 +- test/integration/scheduler/scheduler_test.go | 60 ++++---- .../{queue_test.go => localqueue_test.go} | 8 +- 36 files changed, 422 insertions(+), 418 deletions(-) rename apis/kueue/v1alpha1/{queue_types.go => localqueue_types.go} (69%) rename apis/kueue/webhooks/{queue_webhook.go => localqueue_webhook.go} (55%) rename apis/kueue/webhooks/{queue_webhook_test.go => localqueue_webhook_test.go} (63%) rename config/crd/bases/{kueue.x-k8s.io_queues.yaml => kueue.x-k8s.io_localqueues.yaml} (78%) rename config/crd/patches/{cainjection_in_queues.yaml => cainjection_in_localqueues.yaml} (87%) rename config/crd/patches/{webhook_in_queues.yaml => webhook_in_localqueues.yaml} (91%) rename config/rbac/{queue_editor_role.yaml => localqueue_editor_role.yaml} (72%) rename config/rbac/{queue_viewer_role.yaml => localqueue_viewer_role.yaml} (83%) rename pkg/controller/core/{queue_controller.go => localqueue_controller.go} (63%) rename pkg/queue/{queue.go => local_queue.go} (76%) rename test/integration/controller/core/{queue_controller_test.go => localqueue_controller_test.go} (87%) rename test/integration/webhook/v1alpha1/{queue_test.go => localqueue_test.go} (88%) diff --git a/PROJECT b/PROJECT index 0dec423509..bc1f4fe595 100644 --- a/PROJECT +++ b/PROJECT @@ -11,7 +11,7 @@ resources: controller: true domain: x-k8s.io group: kueue - kind: Queue + kind: LocalQueue path: sigs.k8s.io/kueue/apis/kueue/v1alpha1 version: v1alpha1 webhooks: diff --git a/apis/kueue/v1alpha1/queue_types.go b/apis/kueue/v1alpha1/localqueue_types.go similarity index 69% rename from apis/kueue/v1alpha1/queue_types.go rename to apis/kueue/v1alpha1/localqueue_types.go index b5e5231b04..4de17651f2 100644 --- a/apis/kueue/v1alpha1/queue_types.go +++ b/apis/kueue/v1alpha1/localqueue_types.go @@ -20,19 +20,19 @@ import ( metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" ) -// QueueSpec defines the desired state of Queue -type QueueSpec struct { - // clusterQueue is a reference to a clusterQueue that backs this queue. +// LocalQueueSpec defines the desired state of LocalQueue +type LocalQueueSpec struct { + // clusterQueue is a reference to a clusterQueue that backs this localQueue. ClusterQueue ClusterQueueReference `json:"clusterQueue,omitempty"` } // ClusterQueueReference is the name of the ClusterQueue. type ClusterQueueReference string -// QueueStatus defines the observed state of Queue -type QueueStatus struct { +// LocalQueueStatus defines the observed state of LocalQueue +type LocalQueueStatus struct { // PendingWorkloads is the number of workloads currently admitted to this - // queue not yet admitted to a ClusterQueue. + // localQueue not yet admitted to a ClusterQueue. // +optional PendingWorkloads int32 `json:"pendingWorkloads"` } @@ -41,25 +41,26 @@ type QueueStatus struct { //+kubebuilder:subresource:status //+kubebuilder:printcolumn:name="ClusterQueue",JSONPath=".spec.clusterQueue",type=string,description="Backing ClusterQueue" //+kubebuilder:printcolumn:name="Pending Workloads",JSONPath=".status.pendingWorkloads",type=integer,description="Number of pending workloads" +//+kubebuilder:resource:shortName={queue,queues} -// Queue is the Schema for the queues API -type Queue struct { +// LocalQueue is the Schema for the localQueues API +type LocalQueue struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - Spec QueueSpec `json:"spec,omitempty"` - Status QueueStatus `json:"status,omitempty"` + Spec LocalQueueSpec `json:"spec,omitempty"` + Status LocalQueueStatus `json:"status,omitempty"` } -//+kubebuilder:object:root=true +// +kubebuilder:object:root=true -// QueueList contains a list of Queue -type QueueList struct { +// LocalQueueList contains a list of LocalQueue +type LocalQueueList struct { metav1.TypeMeta `json:",inline"` metav1.ListMeta `json:"metadata,omitempty"` - Items []Queue `json:"items"` + Items []LocalQueue `json:"items"` } func init() { - SchemeBuilder.Register(&Queue{}, &QueueList{}) + SchemeBuilder.Register(&LocalQueue{}, &LocalQueueList{}) } diff --git a/apis/kueue/v1alpha1/zz_generated.deepcopy.go b/apis/kueue/v1alpha1/zz_generated.deepcopy.go index 09164983ef..5754ccf3bb 100644 --- a/apis/kueue/v1alpha1/zz_generated.deepcopy.go +++ b/apis/kueue/v1alpha1/zz_generated.deepcopy.go @@ -184,45 +184,7 @@ func (in *Flavor) DeepCopy() *Flavor { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *PodSet) DeepCopyInto(out *PodSet) { - *out = *in - in.Spec.DeepCopyInto(&out.Spec) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodSet. -func (in *PodSet) DeepCopy() *PodSet { - if in == nil { - return nil - } - out := new(PodSet) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *PodSetFlavors) DeepCopyInto(out *PodSetFlavors) { - *out = *in - if in.Flavors != nil { - in, out := &in.Flavors, &out.Flavors - *out = make(map[corev1.ResourceName]string, len(*in)) - for key, val := range *in { - (*out)[key] = val - } - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodSetFlavors. 
-func (in *PodSetFlavors) DeepCopy() *PodSetFlavors { - if in == nil { - return nil - } - out := new(PodSetFlavors) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Queue) DeepCopyInto(out *Queue) { +func (in *LocalQueue) DeepCopyInto(out *LocalQueue) { *out = *in out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) @@ -230,18 +192,18 @@ func (in *Queue) DeepCopyInto(out *Queue) { out.Status = in.Status } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Queue. -func (in *Queue) DeepCopy() *Queue { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalQueue. +func (in *LocalQueue) DeepCopy() *LocalQueue { if in == nil { return nil } - out := new(Queue) + out := new(LocalQueue) in.DeepCopyInto(out) return out } // DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *Queue) DeepCopyObject() runtime.Object { +func (in *LocalQueue) DeepCopyObject() runtime.Object { if c := in.DeepCopy(); c != nil { return c } @@ -249,31 +211,31 @@ func (in *Queue) DeepCopyObject() runtime.Object { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *QueueList) DeepCopyInto(out *QueueList) { +func (in *LocalQueueList) DeepCopyInto(out *LocalQueueList) { *out = *in out.TypeMeta = in.TypeMeta in.ListMeta.DeepCopyInto(&out.ListMeta) if in.Items != nil { in, out := &in.Items, &out.Items - *out = make([]Queue, len(*in)) + *out = make([]LocalQueue, len(*in)) for i := range *in { (*in)[i].DeepCopyInto(&(*out)[i]) } } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QueueList. -func (in *QueueList) DeepCopy() *QueueList { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalQueueList. +func (in *LocalQueueList) DeepCopy() *LocalQueueList { if in == nil { return nil } - out := new(QueueList) + out := new(LocalQueueList) in.DeepCopyInto(out) return out } // DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. -func (in *QueueList) DeepCopyObject() runtime.Object { +func (in *LocalQueueList) DeepCopyObject() runtime.Object { if c := in.DeepCopy(); c != nil { return c } @@ -281,31 +243,69 @@ func (in *QueueList) DeepCopyObject() runtime.Object { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *QueueSpec) DeepCopyInto(out *QueueSpec) { +func (in *LocalQueueSpec) DeepCopyInto(out *LocalQueueSpec) { *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QueueSpec. -func (in *QueueSpec) DeepCopy() *QueueSpec { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalQueueSpec. +func (in *LocalQueueSpec) DeepCopy() *LocalQueueSpec { if in == nil { return nil } - out := new(QueueSpec) + out := new(LocalQueueSpec) in.DeepCopyInto(out) return out } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
-func (in *QueueStatus) DeepCopyInto(out *QueueStatus) { +func (in *LocalQueueStatus) DeepCopyInto(out *LocalQueueStatus) { *out = *in } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new QueueStatus. -func (in *QueueStatus) DeepCopy() *QueueStatus { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LocalQueueStatus. +func (in *LocalQueueStatus) DeepCopy() *LocalQueueStatus { if in == nil { return nil } - out := new(QueueStatus) + out := new(LocalQueueStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PodSet) DeepCopyInto(out *PodSet) { + *out = *in + in.Spec.DeepCopyInto(&out.Spec) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodSet. +func (in *PodSet) DeepCopy() *PodSet { + if in == nil { + return nil + } + out := new(PodSet) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PodSetFlavors) DeepCopyInto(out *PodSetFlavors) { + *out = *in + if in.Flavors != nil { + in, out := &in.Flavors, &out.Flavors + *out = make(map[corev1.ResourceName]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PodSetFlavors. +func (in *PodSetFlavors) DeepCopy() *PodSetFlavors { + if in == nil { + return nil + } + out := new(PodSetFlavors) in.DeepCopyInto(out) return out } diff --git a/apis/kueue/webhooks/clusterqueue_webhook.go b/apis/kueue/webhooks/clusterqueue_webhook.go index ed3cecf150..c29b5a20d3 100644 --- a/apis/kueue/webhooks/clusterqueue_webhook.go +++ b/apis/kueue/webhooks/clusterqueue_webhook.go @@ -36,7 +36,7 @@ import ( var ( // log is for logging in this package. - clusterQueueLog = ctrl.Log.WithName("cluster-queue-webhook") + clusterQueueLog = ctrl.Log.WithName("clusterqueue-webhook") queueingStrategies = sets.NewString(string(kueue.StrictFIFO), string(kueue.BestEffortFIFO)) ) diff --git a/apis/kueue/webhooks/queue_webhook.go b/apis/kueue/webhooks/localqueue_webhook.go similarity index 55% rename from apis/kueue/webhooks/queue_webhook.go rename to apis/kueue/webhooks/localqueue_webhook.go index a54516cd7a..bb4f9fe3a4 100644 --- a/apis/kueue/webhooks/queue_webhook.go +++ b/apis/kueue/webhooks/localqueue_webhook.go @@ -30,48 +30,48 @@ import ( ) // log is for logging in this package. -var queueLog = ctrl.Log.WithName("queue-webhook") +var localQueueLog = ctrl.Log.WithName("localqueue-webhook") -type QueueWebhook struct{} +type LocalQueueWebhook struct{} -func setupWebhookForQueue(mgr ctrl.Manager) error { +func setupWebhookForLocalQueue(mgr ctrl.Manager) error { return ctrl.NewWebhookManagedBy(mgr). - For(&kueue.Queue{}). - WithValidator(&QueueWebhook{}). + For(&kueue.LocalQueue{}). + WithValidator(&LocalQueueWebhook{}). 
Complete() } -// +kubebuilder:webhook:path=/validate-kueue-x-k8s-io-v1alpha1-queue,mutating=false,failurePolicy=fail,sideEffects=None,groups=kueue.x-k8s.io,resources=queues,verbs=create;update,versions=v1alpha1,name=vqueue.kb.io,admissionReviewVersions=v1 +// +kubebuilder:webhook:path=/validate-kueue-x-k8s-io-v1alpha1-localqueue,mutating=false,failurePolicy=fail,sideEffects=None,groups=kueue.x-k8s.io,resources=localqueues,verbs=create;update,versions=v1alpha1,name=vlocalqueue.kb.io,admissionReviewVersions=v1 -var _ webhook.CustomValidator = &QueueWebhook{} +var _ webhook.CustomValidator = &LocalQueueWebhook{} // ValidateCreate implements webhook.CustomValidator so a webhook will be registered for the type -func (w *QueueWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) error { - q := obj.(*kueue.Queue) - queueLog.V(5).Info("Validating create", "queue", klog.KObj(q)) - return ValidateQueue(q).ToAggregate() +func (w *LocalQueueWebhook) ValidateCreate(ctx context.Context, obj runtime.Object) error { + q := obj.(*kueue.LocalQueue) + localQueueLog.V(5).Info("Validating create", "localQueue", klog.KObj(q)) + return ValidateLocalQueue(q).ToAggregate() } // ValidateUpdate implements webhook.CustomValidator so a webhook will be registered for the type -func (w *QueueWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) error { - newQ := newObj.(*kueue.Queue) - oldQ := oldObj.(*kueue.Queue) - queueLog.V(5).Info("Validating update", "queue", klog.KObj(newQ)) - return ValidateQueueUpdate(newQ, oldQ).ToAggregate() +func (w *LocalQueueWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) error { + newQ := newObj.(*kueue.LocalQueue) + oldQ := oldObj.(*kueue.LocalQueue) + localQueueLog.V(5).Info("Validating update", "localQueue", klog.KObj(newQ)) + return ValidateLocalQueueUpdate(newQ, oldQ).ToAggregate() } // ValidateDelete implements webhook.CustomValidator so a webhook will be registered for the type -func (w *QueueWebhook) ValidateDelete(ctx context.Context, obj runtime.Object) error { +func (w *LocalQueueWebhook) ValidateDelete(ctx context.Context, obj runtime.Object) error { return nil } -func ValidateQueue(q *kueue.Queue) field.ErrorList { +func ValidateLocalQueue(q *kueue.LocalQueue) field.ErrorList { var allErrs field.ErrorList clusterQueuePath := field.NewPath("spec", "clusterQueue") allErrs = append(allErrs, validateNameReference(string(q.Spec.ClusterQueue), clusterQueuePath)...) 
return allErrs } -func ValidateQueueUpdate(newObj, oldObj *kueue.Queue) field.ErrorList { +func ValidateLocalQueueUpdate(newObj, oldObj *kueue.LocalQueue) field.ErrorList { return apivalidation.ValidateImmutableField(newObj.Spec.ClusterQueue, oldObj.Spec.ClusterQueue, field.NewPath("spec", "clusterQueue")) } diff --git a/apis/kueue/webhooks/queue_webhook_test.go b/apis/kueue/webhooks/localqueue_webhook_test.go similarity index 63% rename from apis/kueue/webhooks/queue_webhook_test.go rename to apis/kueue/webhooks/localqueue_webhook_test.go index a30a7f117b..0d5925153b 100644 --- a/apis/kueue/webhooks/queue_webhook_test.go +++ b/apis/kueue/webhooks/localqueue_webhook_test.go @@ -28,17 +28,17 @@ import ( ) const ( - testQueueName = "test-queue" - testQueueNamespace = "test-queue-ns" + testLocalQueueName = "test-queue" + testLocalQueueNamespace = "test-queue-ns" ) -func TestValidateQueueCreate(t *testing.T) { +func TestValidateLocalQueueCreate(t *testing.T) { testCases := map[string]struct { - queue *Queue + queue *LocalQueue wantErr field.ErrorList }{ "should reject queue creation with an invalid clusterQueue": { - queue: testingutil.MakeQueue(testQueueName, testQueueNamespace).ClusterQueue("invalid_cluster_queue").Obj(), + queue: testingutil.MakeLocalQueue(testLocalQueueName, testLocalQueueNamespace).ClusterQueue("invalid_cluster_queue").Obj(), wantErr: field.ErrorList{ field.Invalid(field.NewPath("spec").Child("clusterQueue"), "invalid_name", ""), }, @@ -46,37 +46,37 @@ func TestValidateQueueCreate(t *testing.T) { } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - errList := ValidateQueue(tc.queue) + errList := ValidateLocalQueue(tc.queue) if diff := cmp.Diff(tc.wantErr, errList, cmpopts.IgnoreFields(field.Error{}, "Detail", "BadValue")); diff != "" { - t.Errorf("ValidateQueueCreate() mismatch (-want +got):\n%s", diff) + t.Errorf("ValidateLocalQueueCreate() mismatch (-want +got):\n%s", diff) } }) } } -func TestValidateQueueUpdate(t *testing.T) { +func TestValidateLocalQueueUpdate(t *testing.T) { testCases := map[string]struct { - before, after *Queue + before, after *LocalQueue wantErr field.ErrorList }{ "clusterQueue cannot be updated": { - before: testingutil.MakeQueue(testQueueName, testQueueNamespace).ClusterQueue("foo").Obj(), - after: testingutil.MakeQueue(testQueueName, testQueueNamespace).ClusterQueue("bar").Obj(), + before: testingutil.MakeLocalQueue(testLocalQueueName, testLocalQueueNamespace).ClusterQueue("foo").Obj(), + after: testingutil.MakeLocalQueue(testLocalQueueName, testLocalQueueNamespace).ClusterQueue("bar").Obj(), wantErr: field.ErrorList{ field.Invalid(field.NewPath("spec").Child("clusterQueue"), nil, ""), }, }, "status could be updated": { - before: testingutil.MakeQueue(testQueueName, testQueueNamespace).Obj(), - after: testingutil.MakeQueue(testQueueName, testQueueNamespace).PendingWorkloads(10).Obj(), + before: testingutil.MakeLocalQueue(testLocalQueueName, testLocalQueueNamespace).Obj(), + after: testingutil.MakeLocalQueue(testLocalQueueName, testLocalQueueNamespace).PendingWorkloads(10).Obj(), wantErr: field.ErrorList{}, }, } for name, tc := range testCases { t.Run(name, func(t *testing.T) { - errList := ValidateQueueUpdate(tc.before, tc.after) + errList := ValidateLocalQueueUpdate(tc.before, tc.after) if diff := cmp.Diff(tc.wantErr, errList, cmpopts.IgnoreFields(field.Error{}, "Detail", "BadValue")); diff != "" { - t.Errorf("ValidateQueueUpdate() mismatch (-want +got):\n%s", diff) + t.Errorf("ValidateLocalQueueUpdate() mismatch (-want 
+got):\n%s", diff) } }) } diff --git a/apis/kueue/webhooks/webhooks.go b/apis/kueue/webhooks/webhooks.go index 1ba7f7c003..c1988623e7 100644 --- a/apis/kueue/webhooks/webhooks.go +++ b/apis/kueue/webhooks/webhooks.go @@ -33,7 +33,7 @@ func Setup(mgr ctrl.Manager) (string, error) { return "ClusterQueue", err } - if err := setupWebhookForQueue(mgr); err != nil { + if err := setupWebhookForLocalQueue(mgr); err != nil { return "Queue", err } return "", nil diff --git a/config/crd/bases/kueue.x-k8s.io_queues.yaml b/config/crd/bases/kueue.x-k8s.io_localqueues.yaml similarity index 78% rename from config/crd/bases/kueue.x-k8s.io_queues.yaml rename to config/crd/bases/kueue.x-k8s.io_localqueues.yaml index 1dc88a833c..47e42d59a9 100644 --- a/config/crd/bases/kueue.x-k8s.io_queues.yaml +++ b/config/crd/bases/kueue.x-k8s.io_localqueues.yaml @@ -5,14 +5,17 @@ metadata: annotations: controller-gen.kubebuilder.io/version: v0.9.2 creationTimestamp: null - name: queues.kueue.x-k8s.io + name: localqueues.kueue.x-k8s.io spec: group: kueue.x-k8s.io names: - kind: Queue - listKind: QueueList - plural: queues - singular: queue + kind: LocalQueue + listKind: LocalQueueList + plural: localqueues + shortNames: + - queue + - queues + singular: localqueue scope: Namespaced versions: - additionalPrinterColumns: @@ -27,7 +30,7 @@ spec: name: v1alpha1 schema: openAPIV3Schema: - description: Queue is the Schema for the queues API + description: LocalQueue is the Schema for the localQueues API properties: apiVersion: description: 'APIVersion defines the versioned schema of this representation @@ -42,19 +45,19 @@ spec: metadata: type: object spec: - description: QueueSpec defines the desired state of Queue + description: LocalQueueSpec defines the desired state of LocalQueue properties: clusterQueue: description: clusterQueue is a reference to a clusterQueue that backs - this queue. + this localQueue. type: string type: object status: - description: QueueStatus defines the observed state of Queue + description: LocalQueueStatus defines the observed state of LocalQueue properties: pendingWorkloads: description: PendingWorkloads is the number of workloads currently - admitted to this queue not yet admitted to a ClusterQueue. + admitted to this localQueue not yet admitted to a ClusterQueue. format: int32 type: integer type: object diff --git a/config/crd/kustomization.yaml b/config/crd/kustomization.yaml index 836010ccc4..30f76217fe 100644 --- a/config/crd/kustomization.yaml +++ b/config/crd/kustomization.yaml @@ -2,7 +2,7 @@ # since it depends on service name and namespace that are out of this kustomize package. # It should be run by config/default resources: -- bases/kueue.x-k8s.io_queues.yaml +- bases/kueue.x-k8s.io_localqueues.yaml - bases/kueue.x-k8s.io_clusterqueues.yaml - bases/kueue.x-k8s.io_workloads.yaml - bases/kueue.x-k8s.io_resourceflavors.yaml @@ -11,7 +11,7 @@ resources: patchesStrategicMerge: # [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix. # patches here are for enabling the conversion webhook for each CRD -#- patches/webhook_in_queues.yaml +#- patches/webhook_in_localqueues.yaml #- patches/webhook_in_clusterqueues.yaml #- patches/webhook_in_workloads.yaml #- patches/webhook_in_resourceflavors.yaml @@ -19,7 +19,7 @@ patchesStrategicMerge: # [CERTMANAGER] To enable cert-manager, uncomment all the sections with [CERTMANAGER] prefix. 
# patches here are for enabling the CA injection for each CRD -- patches/cainjection_in_queues.yaml +- patches/cainjection_in_localqueues.yaml #- patches/cainjection_in_clusterqueues.yaml - patches/cainjection_in_workloads.yaml #- patches/cainjection_in_resourceflavors.yaml diff --git a/config/crd/patches/cainjection_in_queues.yaml b/config/crd/patches/cainjection_in_localqueues.yaml similarity index 87% rename from config/crd/patches/cainjection_in_queues.yaml rename to config/crd/patches/cainjection_in_localqueues.yaml index 7f0ccccb7d..a0a85c03e9 100644 --- a/config/crd/patches/cainjection_in_queues.yaml +++ b/config/crd/patches/cainjection_in_localqueues.yaml @@ -4,4 +4,4 @@ kind: CustomResourceDefinition metadata: annotations: cert-manager.io/inject-ca-from: $(CERTIFICATE_NAMESPACE)/$(CERTIFICATE_NAME) - name: queues.kueue.x-k8s.io + name: localqueues.kueue.x-k8s.io diff --git a/config/crd/patches/webhook_in_queues.yaml b/config/crd/patches/webhook_in_localqueues.yaml similarity index 91% rename from config/crd/patches/webhook_in_queues.yaml rename to config/crd/patches/webhook_in_localqueues.yaml index 013fa35403..2d66459706 100644 --- a/config/crd/patches/webhook_in_queues.yaml +++ b/config/crd/patches/webhook_in_localqueues.yaml @@ -2,7 +2,7 @@ apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: - name: queues.kueue.x-k8s.io + name: localqueues.kueue.x-k8s.io spec: conversion: strategy: Webhook diff --git a/config/rbac/kustomization.yaml b/config/rbac/kustomization.yaml index 1f4c410947..846ada9a69 100644 --- a/config/rbac/kustomization.yaml +++ b/config/rbac/kustomization.yaml @@ -23,8 +23,8 @@ resources: - clusterqueue_viewer_role.yaml - job_editor_role.yaml - job_viewer_role.yaml -- queue_editor_role.yaml -- queue_viewer_role.yaml +- localqueue_editor_role.yaml +- localqueue_viewer_role.yaml - workload_editor_role.yaml - workload_viewer_role.yaml - resourceflavor_editor_role.yaml diff --git a/config/rbac/queue_editor_role.yaml b/config/rbac/localqueue_editor_role.yaml similarity index 72% rename from config/rbac/queue_editor_role.yaml rename to config/rbac/localqueue_editor_role.yaml index b2ad181450..689fe58245 100644 --- a/config/rbac/queue_editor_role.yaml +++ b/config/rbac/localqueue_editor_role.yaml @@ -1,15 +1,15 @@ -# permissions for end users to edit queues. +# permissions for end users to edit localqueues. 
apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: queue-editor-role + name: localqueue-editor-role labels: rbac.kueue.x-k8s.io/batch-admin: "true" rules: - apiGroups: - kueue.x-k8s.io resources: - - queues + - localqueues verbs: - create - delete @@ -21,6 +21,6 @@ rules: - apiGroups: - kueue.x-k8s.io resources: - - queues/status + - localqueues/status verbs: - get diff --git a/config/rbac/queue_viewer_role.yaml b/config/rbac/localqueue_viewer_role.yaml similarity index 83% rename from config/rbac/queue_viewer_role.yaml rename to config/rbac/localqueue_viewer_role.yaml index 3acc1e49c5..c7b908f650 100644 --- a/config/rbac/queue_viewer_role.yaml +++ b/config/rbac/localqueue_viewer_role.yaml @@ -2,7 +2,7 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRole metadata: - name: queue-viewer-role + name: localqueue-viewer-role labels: rbac.kueue.x-k8s.io/batch-admin: "true" rbac.kueue.x-k8s.io/batch-user: "true" @@ -10,7 +10,7 @@ rules: - apiGroups: - kueue.x-k8s.io resources: - - queues + - localqueues verbs: - get - list @@ -18,6 +18,6 @@ rules: - apiGroups: - kueue.x-k8s.io resources: - - queues/status + - localqueues/status verbs: - get diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index e4854123f8..c94b40a398 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -93,7 +93,7 @@ rules: - apiGroups: - kueue.x-k8s.io resources: - - queues + - localqueues verbs: - create - delete @@ -105,13 +105,13 @@ rules: - apiGroups: - kueue.x-k8s.io resources: - - queues/finalizers + - localqueues/finalizers verbs: - update - apiGroups: - kueue.x-k8s.io resources: - - queues/status + - localqueues/status verbs: - get - patch diff --git a/config/samples/single-clusterqueue-setup.yaml b/config/samples/single-clusterqueue-setup.yaml index ae4f1e709f..999ac015b2 100644 --- a/config/samples/single-clusterqueue-setup.yaml +++ b/config/samples/single-clusterqueue-setup.yaml @@ -22,7 +22,7 @@ spec: min: 36Gi --- apiVersion: kueue.x-k8s.io/v1alpha1 -kind: Queue +kind: LocalQueue metadata: namespace: default name: main diff --git a/config/webhook/manifests.yaml b/config/webhook/manifests.yaml index 7c6e20f7f5..9f2dad416f 100644 --- a/config/webhook/manifests.yaml +++ b/config/webhook/manifests.yaml @@ -96,9 +96,9 @@ webhooks: service: name: webhook-service namespace: system - path: /validate-kueue-x-k8s-io-v1alpha1-queue + path: /validate-kueue-x-k8s-io-v1alpha1-localqueue failurePolicy: Fail - name: vqueue.kb.io + name: vlocalqueue.kb.io rules: - apiGroups: - kueue.x-k8s.io @@ -108,7 +108,7 @@ webhooks: - CREATE - UPDATE resources: - - queues + - localqueues sideEffects: None - admissionReviewVersions: - v1 diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index cf259a6db6..1bcce30d47 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -293,7 +293,7 @@ func (c *ClusterQueue) updateWorkloadUsage(wi *workload.Info, m int64) { } } -func (c *ClusterQueue) addQueue(q *kueue.Queue) error { +func (c *ClusterQueue) addLocalQueue(q *kueue.LocalQueue) error { qKey := queueKey(q) if _, ok := c.admittedWorkloadsPerQueue[qKey]; ok { return errQueueAlreadyExists @@ -302,7 +302,7 @@ func (c *ClusterQueue) addQueue(q *kueue.Queue) error { // receiving the queue add event. 
workloads := 0 for _, wl := range c.Workloads { - if workloadBelongsToQueue(wl.Obj, q) { + if workloadBelongsToLocalQueue(wl.Obj, q) { workloads++ } } @@ -310,7 +310,7 @@ func (c *ClusterQueue) addQueue(q *kueue.Queue) error { return nil } -func (c *ClusterQueue) deleteQueue(q *kueue.Queue) { +func (c *ClusterQueue) deleteLocalQueue(q *kueue.LocalQueue) { qKey := queueKey(q) delete(c.admittedWorkloadsPerQueue, qKey) } @@ -414,7 +414,7 @@ func (c *Cache) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) err // On controller restart, an add ClusterQueue event may come after // add queue and workload, so here we explicitly list and add existing queues // and workloads. - var queues kueue.QueueList + var queues kueue.LocalQueueList if err := c.client.List(ctx, &queues, client.MatchingFields{queueClusterQueueKey: cq.Name}); err != nil { return fmt.Errorf("listing queues that match the clusterQueue: %w", err) } @@ -477,27 +477,27 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) { metrics.AdmittedActiveWorkloads.DeleteLabelValues(cq.Name) } -func (c *Cache) AddQueue(q *kueue.Queue) error { +func (c *Cache) AddLocalQueue(q *kueue.LocalQueue) error { c.Lock() defer c.Unlock() cq, ok := c.clusterQueues[string(q.Spec.ClusterQueue)] if !ok { return nil } - return cq.addQueue(q) + return cq.addLocalQueue(q) } -func (c *Cache) DeleteQueue(q *kueue.Queue) { +func (c *Cache) DeleteLocalQueue(q *kueue.LocalQueue) { c.Lock() defer c.Unlock() cq, ok := c.clusterQueues[string(q.Spec.ClusterQueue)] if !ok { return } - cq.deleteQueue(q) + cq.deleteLocalQueue(q) } -func (c *Cache) UpdateQueue(oldQ, newQ *kueue.Queue) error { +func (c *Cache) UpdateLocalQueue(oldQ, newQ *kueue.LocalQueue) error { if oldQ.Spec.ClusterQueue == newQ.Spec.ClusterQueue { return nil } @@ -505,11 +505,11 @@ func (c *Cache) UpdateQueue(oldQ, newQ *kueue.Queue) error { defer c.Unlock() cq, ok := c.clusterQueues[string(oldQ.Spec.ClusterQueue)] if ok { - cq.deleteQueue(oldQ) + cq.deleteLocalQueue(oldQ) } cq, ok = c.clusterQueues[string(newQ.Spec.ClusterQueue)] if ok { - return cq.addQueue(newQ) + return cq.addLocalQueue(newQ) } return nil } @@ -754,12 +754,12 @@ func SetupIndexes(indexer client.FieldIndexer) error { }) } -func workloadBelongsToQueue(wl *kueue.Workload, q *kueue.Queue) bool { +func workloadBelongsToLocalQueue(wl *kueue.Workload, q *kueue.LocalQueue) bool { return wl.Namespace == q.Namespace && wl.Spec.QueueName == q.Name } // Key is the key used to index the queue. 
-func queueKey(q *kueue.Queue) string { +func queueKey(q *kueue.LocalQueue) string { return fmt.Sprintf("%s/%s", q.Namespace, q.Name) } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index 3f9bc77a52..31f821cb01 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -1249,10 +1249,10 @@ func TestCacheQueueOperations(t *testing.T) { utiltesting.MakeClusterQueue("foo").Obj(), utiltesting.MakeClusterQueue("bar").Obj(), } - queues := []*kueue.Queue{ - utiltesting.MakeQueue("alpha", "ns1").ClusterQueue("foo").Obj(), - utiltesting.MakeQueue("beta", "ns2").ClusterQueue("foo").Obj(), - utiltesting.MakeQueue("gamma", "ns1").ClusterQueue("bar").Obj(), + queues := []*kueue.LocalQueue{ + utiltesting.MakeLocalQueue("alpha", "ns1").ClusterQueue("foo").Obj(), + utiltesting.MakeLocalQueue("beta", "ns2").ClusterQueue("foo").Obj(), + utiltesting.MakeLocalQueue("gamma", "ns1").ClusterQueue("bar").Obj(), } workloads := []*kueue.Workload{ utiltesting.MakeWorkload("job1", "ns1").Queue("alpha").Admit(utiltesting.MakeAdmission("foo").Obj()).Obj(), @@ -1278,7 +1278,7 @@ func TestCacheQueueOperations(t *testing.T) { if err := cl.Create(ctx, q.DeepCopy()); err != nil { return err } - if err := cache.AddQueue(q); err != nil { + if err := cache.AddLocalQueue(q); err != nil { return err } } @@ -1451,7 +1451,7 @@ func TestCacheQueueOperations(t *testing.T) { insertAllQueues, insertAllWorkloads, func(ctx context.Context, cl client.Client, cache *Cache) error { - cache.DeleteQueue(queues[0]) + cache.DeleteLocalQueue(queues[0]) return nil }, }, diff --git a/pkg/controller/core/core.go b/pkg/controller/core/core.go index 00166de74d..a22beff31a 100644 --- a/pkg/controller/core/core.go +++ b/pkg/controller/core/core.go @@ -32,7 +32,7 @@ func SetupControllers(mgr ctrl.Manager, qManager *queue.Manager, cc *cache.Cache if err := rfRec.SetupWithManager(mgr); err != nil { return "ResourceFlavor", err } - qRec := NewQueueReconciler(mgr.GetClient(), qManager, cc) + qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc) if err := qRec.SetupWithManager(mgr); err != nil { return "Queue", err } diff --git a/pkg/controller/core/queue_controller.go b/pkg/controller/core/localqueue_controller.go similarity index 63% rename from pkg/controller/core/queue_controller.go rename to pkg/controller/core/localqueue_controller.go index 8a6576f23d..a5d1cfb0ef 100644 --- a/pkg/controller/core/queue_controller.go +++ b/pkg/controller/core/localqueue_controller.go @@ -36,8 +36,8 @@ import ( "sigs.k8s.io/kueue/pkg/queue" ) -// QueueReconciler reconciles a Queue object -type QueueReconciler struct { +// LocalQueueReconciler reconciles a LocalQueue object +type LocalQueueReconciler struct { client client.Client log logr.Logger queues *queue.Manager @@ -45,9 +45,9 @@ type QueueReconciler struct { wlUpdateCh chan event.GenericEvent } -func NewQueueReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache) *QueueReconciler { - return &QueueReconciler{ - log: ctrl.Log.WithName("queue-reconciler"), +func NewLocalQueueReconciler(client client.Client, queues *queue.Manager, cache *cache.Cache) *LocalQueueReconciler { + return &LocalQueueReconciler{ + log: ctrl.Log.WithName("localqueue-reconciler"), queues: queues, cache: cache, client: client, @@ -55,31 +55,31 @@ func NewQueueReconciler(client client.Client, queues *queue.Manager, cache *cach } } -func (r *QueueReconciler) NotifyWorkloadUpdate(w *kueue.Workload) { +func (r *LocalQueueReconciler) NotifyWorkloadUpdate(w *kueue.Workload) { r.wlUpdateCh <- 
event.GenericEvent{Object: w} } //+kubebuilder:rbac:groups="",resources=events,verbs=create;watch;update -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=queues,verbs=get;list;watch;create;update;patch;delete -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=queues/status,verbs=get;update;patch -//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=queues/finalizers,verbs=update +//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=localqueues,verbs=get;list;watch;create;update;patch;delete +//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=localqueues/status,verbs=get;update;patch +//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=localqueues/finalizers,verbs=update -func (r *QueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - var queueObj kueue.Queue +func (r *LocalQueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + var queueObj kueue.LocalQueue if err := r.client.Get(ctx, req.NamespacedName, &queueObj); err != nil { // we'll ignore not-found errors, since there is nothing to do. return ctrl.Result{}, client.IgnoreNotFound(err) } - log := ctrl.LoggerFrom(ctx).WithValues("queue", klog.KObj(&queueObj)) + log := ctrl.LoggerFrom(ctx).WithValues("localQueue", klog.KObj(&queueObj)) ctx = ctrl.LoggerInto(ctx, log) - log.V(2).Info("Reconciling Queue") + log.V(2).Info("Reconciling LocalQueue") // Shallow copy enough for now. oldStatus := queueObj.Status pending, err := r.queues.PendingWorkloads(&queueObj) if err != nil { - r.log.Error(err, "Failed to retrieve queue status") + r.log.Error(err, "Failed to retrieve localQueue status") return ctrl.Result{}, err } @@ -91,55 +91,55 @@ func (r *QueueReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl return ctrl.Result{}, nil } -func (r *QueueReconciler) Create(e event.CreateEvent) bool { - q, match := e.Object.(*kueue.Queue) +func (r *LocalQueueReconciler) Create(e event.CreateEvent) bool { + q, match := e.Object.(*kueue.LocalQueue) if !match { // No need to interact with the queue manager for other objects. return true } - log := r.log.WithValues("queue", klog.KObj(q)) - log.V(2).Info("Queue create event") + log := r.log.WithValues("localQueue", klog.KObj(q)) + log.V(2).Info("LocalQueue create event") ctx := logr.NewContext(context.Background(), log) - if err := r.queues.AddQueue(ctx, q); err != nil { - log.Error(err, "Failed to add queue to the queueing system") + if err := r.queues.AddLocalQueue(ctx, q); err != nil { + log.Error(err, "Failed to add localQueue to the queueing system") } - if err := r.cache.AddQueue(q); err != nil { - log.Error(err, "Failed to add queue to the cache") + if err := r.cache.AddLocalQueue(q); err != nil { + log.Error(err, "Failed to add localQueue to the cache") } return true } -func (r *QueueReconciler) Delete(e event.DeleteEvent) bool { - q, match := e.Object.(*kueue.Queue) +func (r *LocalQueueReconciler) Delete(e event.DeleteEvent) bool { + q, match := e.Object.(*kueue.LocalQueue) if !match { // No need to interact with the queue manager for other objects. 
return true } - r.log.V(2).Info("Queue delete event", "queue", klog.KObj(q)) - r.queues.DeleteQueue(q) - r.cache.DeleteQueue(q) + r.log.V(2).Info("LocalQueue delete event", "localQueue", klog.KObj(q)) + r.queues.DeleteLocalQueue(q) + r.cache.DeleteLocalQueue(q) return true } -func (r *QueueReconciler) Update(e event.UpdateEvent) bool { - q, match := e.ObjectNew.(*kueue.Queue) +func (r *LocalQueueReconciler) Update(e event.UpdateEvent) bool { + q, match := e.ObjectNew.(*kueue.LocalQueue) if !match { // No need to interact with the queue manager for other objects. return true } - log := r.log.WithValues("queue", klog.KObj(q)) + log := r.log.WithValues("localQueue", klog.KObj(q)) log.V(2).Info("Queue update event") - if err := r.queues.UpdateQueue(q); err != nil { + if err := r.queues.UpdateLocalQueue(q); err != nil { log.Error(err, "Failed to update queue in the queueing system") } - oldQ := e.ObjectOld.(*kueue.Queue) - if err := r.cache.UpdateQueue(oldQ, q); err != nil { - log.Error(err, "Failed to update queue in the cache") + oldQ := e.ObjectOld.(*kueue.LocalQueue) + if err := r.cache.UpdateLocalQueue(oldQ, q); err != nil { + log.Error(err, "Failed to update localQueue in the cache") } return true } -func (r *QueueReconciler) Generic(e event.GenericEvent) bool { +func (r *LocalQueueReconciler) Generic(e event.GenericEvent) bool { r.log.V(3).Info("Got Workload event", "workload", klog.KObj(e.Object)) return true } @@ -174,9 +174,9 @@ func (h *qWorkloadHandler) Generic(e event.GenericEvent, q workqueue.RateLimitin } // SetupWithManager sets up the controller with the Manager. -func (r *QueueReconciler) SetupWithManager(mgr ctrl.Manager) error { +func (r *LocalQueueReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&kueue.Queue{}). + For(&kueue.LocalQueue{}). Watches(&source.Channel{Source: r.wlUpdateCh}, &qWorkloadHandler{}). WithEventFilter(r). 
Complete(r) diff --git a/pkg/queue/cluster_queue_impl.go b/pkg/queue/cluster_queue_impl.go index fa507e5e6d..308cd4fb96 100644 --- a/pkg/queue/cluster_queue_impl.go +++ b/pkg/queue/cluster_queue_impl.go @@ -71,7 +71,7 @@ func (c *ClusterQueueImpl) Cohort() string { return c.cohort } -func (c *ClusterQueueImpl) AddFromQueue(q *Queue) bool { +func (c *ClusterQueueImpl) AddFromLocalQueue(q *LocalQueue) bool { added := false for _, info := range q.items { if c.heap.PushIfNotPresent(info) { @@ -103,7 +103,7 @@ func (c *ClusterQueueImpl) Delete(w *kueue.Workload) { c.heap.Delete(key) } -func (c *ClusterQueueImpl) DeleteFromQueue(q *Queue) { +func (c *ClusterQueueImpl) DeleteFromLocalQueue(q *LocalQueue) { for _, w := range q.items { key := workload.Key(w.Obj) if wl := c.inadmissibleWorkloads[key]; wl != nil { diff --git a/pkg/queue/cluster_queue_impl_test.go b/pkg/queue/cluster_queue_impl_test.go index 52c59e5122..d6fe846b3f 100644 --- a/pkg/queue/cluster_queue_impl_test.go +++ b/pkg/queue/cluster_queue_impl_test.go @@ -127,28 +127,28 @@ func Test_Info(t *testing.T) { } } -func Test_AddFromQueue(t *testing.T) { +func Test_AddFromLocalQueue(t *testing.T) { cq := newClusterQueueImpl(keyFunc, byCreationTime) wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj() - queue := &Queue{ + queue := &LocalQueue{ items: map[string]*workload.Info{ wl.Name: workload.NewInfo(wl), }, } cq.PushOrUpdate(workload.NewInfo(wl)) - if added := cq.AddFromQueue(queue); added { + if added := cq.AddFromLocalQueue(queue); added { t.Error("expected workload not to be added") } cq.Delete(wl) - if added := cq.AddFromQueue(queue); !added { + if added := cq.AddFromLocalQueue(queue); !added { t.Error("workload should be added to the ClusterQueue") } } -func Test_DeleteFromQueue(t *testing.T) { +func Test_DeleteFromLocalQueue(t *testing.T) { cq := newClusterQueueImpl(keyFunc, byCreationTime) - q := utiltesting.MakeQueue("foo", "").ClusterQueue("cq").Obj() - qImpl := newQueue(q) + q := utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj() + qImpl := newLocalQueue(q) wl1 := utiltesting.MakeWorkload("wl1", "").Queue(q.Name).Obj() wl2 := utiltesting.MakeWorkload("wl2", "").Queue(q.Name).Obj() wl3 := utiltesting.MakeWorkload("wl3", "").Queue(q.Name).Obj() @@ -176,7 +176,7 @@ func Test_DeleteFromQueue(t *testing.T) { t.Errorf("clusterQueue's workload number in inadmissibleWorkloads not right, want %v, got %v", len(inadmissibleWorkloads), len(cq.inadmissibleWorkloads)) } - cq.DeleteFromQueue(qImpl) + cq.DeleteFromLocalQueue(qImpl) if cq.Pending() != 0 { t.Error("clusterQueue should be empty") } diff --git a/pkg/queue/cluster_queue_interface.go b/pkg/queue/cluster_queue_interface.go index 23b7b9391e..af921f3206 100644 --- a/pkg/queue/cluster_queue_interface.go +++ b/pkg/queue/cluster_queue_interface.go @@ -46,10 +46,10 @@ type ClusterQueue interface { // AddFromQueue pushes all workloads belonging to this queue to // the ClusterQueue. If at least one workload is added, returns true. // Otherwise returns false. - AddFromQueue(*Queue) bool + AddFromLocalQueue(*LocalQueue) bool // DeleteFromQueue removes all workloads belonging to this queue from // the ClusterQueue. - DeleteFromQueue(*Queue) + DeleteFromLocalQueue(*LocalQueue) // PushOrUpdate pushes the workload to ClusterQueue. // If the workload is already present, updates with the new one. 
diff --git a/pkg/queue/queue.go b/pkg/queue/local_queue.go similarity index 76% rename from pkg/queue/queue.go rename to pkg/queue/local_queue.go index 86e0b09327..b1dd163b46 100644 --- a/pkg/queue/queue.go +++ b/pkg/queue/local_queue.go @@ -29,20 +29,20 @@ func keyFunc(obj interface{}) string { } // Key is the key used to index the queue. -func Key(q *kueue.Queue) string { +func Key(q *kueue.LocalQueue) string { return fmt.Sprintf("%s/%s", q.Namespace, q.Name) } -// Queue is the internal implementation of kueue.Queue. -type Queue struct { +// LocalQueue is the internal implementation of kueue.LocalQueue. +type LocalQueue struct { Key string ClusterQueue string items map[string]*workload.Info } -func newQueue(q *kueue.Queue) *Queue { - qImpl := &Queue{ +func newLocalQueue(q *kueue.LocalQueue) *LocalQueue { + qImpl := &LocalQueue{ Key: Key(q), items: make(map[string]*workload.Info), } @@ -50,16 +50,16 @@ func newQueue(q *kueue.Queue) *Queue { return qImpl } -func (q *Queue) update(apiQueue *kueue.Queue) { +func (q *LocalQueue) update(apiQueue *kueue.LocalQueue) { q.ClusterQueue = string(apiQueue.Spec.ClusterQueue) } -func (q *Queue) AddOrUpdate(info *workload.Info) { +func (q *LocalQueue) AddOrUpdate(info *workload.Info) { key := workload.Key(info.Obj) q.items[key] = info } -func (q *Queue) AddIfNotPresent(w *workload.Info) bool { +func (q *LocalQueue) AddIfNotPresent(w *workload.Info) bool { key := workload.Key(w.Obj) _, ok := q.items[key] if !ok { diff --git a/pkg/queue/manager.go b/pkg/queue/manager.go index 9301748593..9195e6c3b2 100644 --- a/pkg/queue/manager.go +++ b/pkg/queue/manager.go @@ -50,7 +50,7 @@ type Manager struct { client client.Client statusChecker StatusChecker clusterQueues map[string]ClusterQueue - queues map[string]*Queue + localQueues map[string]*LocalQueue // Key is cohort's name. Value is a set of associated ClusterQueue names. cohorts map[string]sets.String @@ -60,7 +60,7 @@ func NewManager(client client.Client, checker StatusChecker) *Manager { m := &Manager{ client: client, statusChecker: checker, - queues: make(map[string]*Queue), + localQueues: make(map[string]*LocalQueue), clusterQueues: make(map[string]ClusterQueue), cohorts: make(map[string]sets.String), } @@ -89,7 +89,7 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e // Iterate through existing queues, as queues corresponding to this cluster // queue might have been added earlier. 
- var queues kueue.QueueList + var queues kueue.LocalQueueList if err := m.client.List(ctx, &queues, client.MatchingFields{queueClusterQueueKey: cq.Name}); err != nil { return fmt.Errorf("listing queues pointing to the cluster queue: %w", err) } @@ -99,9 +99,9 @@ func (m *Manager) AddClusterQueue(ctx context.Context, cq *kueue.ClusterQueue) e if string(q.Spec.ClusterQueue) != cq.Name { continue } - qImpl := m.queues[Key(&q)] + qImpl := m.localQueues[Key(&q)] if qImpl != nil { - added := cqImpl.AddFromQueue(qImpl) + added := cqImpl.AddFromLocalQueue(qImpl) addedWorkloads = addedWorkloads || added } } @@ -155,16 +155,16 @@ func (m *Manager) DeleteClusterQueue(cq *kueue.ClusterQueue) { m.deleteCohort(cohort, cq.Name) } -func (m *Manager) AddQueue(ctx context.Context, q *kueue.Queue) error { +func (m *Manager) AddLocalQueue(ctx context.Context, q *kueue.LocalQueue) error { m.Lock() defer m.Unlock() key := Key(q) - if _, ok := m.queues[key]; ok { + if _, ok := m.localQueues[key]; ok { return fmt.Errorf("queue %q already exists", q.Name) } - qImpl := newQueue(q) - m.queues[key] = qImpl + qImpl := newLocalQueue(q) + m.localQueues[key] = qImpl // Iterate through existing workloads, as workloads corresponding to this // queue might have been added earlier. var workloads kueue.WorkloadList @@ -180,26 +180,26 @@ func (m *Manager) AddQueue(ctx context.Context, q *kueue.Queue) error { qImpl.AddOrUpdate(workload.NewInfo(&w)) } cq := m.clusterQueues[qImpl.ClusterQueue] - if cq != nil && cq.AddFromQueue(qImpl) { + if cq != nil && cq.AddFromLocalQueue(qImpl) { m.Broadcast() } return nil } -func (m *Manager) UpdateQueue(q *kueue.Queue) error { +func (m *Manager) UpdateLocalQueue(q *kueue.LocalQueue) error { m.Lock() defer m.Unlock() - qImpl, ok := m.queues[Key(q)] + qImpl, ok := m.localQueues[Key(q)] if !ok { return errQueueDoesNotExist } if qImpl.ClusterQueue != string(q.Spec.ClusterQueue) { oldCQ := m.clusterQueues[qImpl.ClusterQueue] if oldCQ != nil { - oldCQ.DeleteFromQueue(qImpl) + oldCQ.DeleteFromLocalQueue(qImpl) } newCQ := m.clusterQueues[string(q.Spec.ClusterQueue)] - if newCQ != nil && newCQ.AddFromQueue(qImpl) { + if newCQ != nil && newCQ.AddFromLocalQueue(qImpl) { m.Broadcast() } } @@ -207,26 +207,26 @@ func (m *Manager) UpdateQueue(q *kueue.Queue) error { return nil } -func (m *Manager) DeleteQueue(q *kueue.Queue) { +func (m *Manager) DeleteLocalQueue(q *kueue.LocalQueue) { m.Lock() defer m.Unlock() key := Key(q) - qImpl := m.queues[key] + qImpl := m.localQueues[key] if qImpl == nil { return } cq := m.clusterQueues[qImpl.ClusterQueue] if cq != nil { - cq.DeleteFromQueue(qImpl) + cq.DeleteFromLocalQueue(qImpl) } - delete(m.queues, key) + delete(m.localQueues, key) } -func (m *Manager) PendingWorkloads(q *kueue.Queue) (int32, error) { +func (m *Manager) PendingWorkloads(q *kueue.LocalQueue) (int32, error) { m.RLock() defer m.RUnlock() - qImpl, ok := m.queues[Key(q)] + qImpl, ok := m.localQueues[Key(q)] if !ok { return 0, errQueueDoesNotExist } @@ -243,7 +243,7 @@ func (m *Manager) Pending(cq *kueue.ClusterQueue) int { func (m *Manager) QueueForWorkloadExists(wl *kueue.Workload) bool { m.RLock() defer m.RUnlock() - _, ok := m.queues[workload.QueueKey(wl)] + _, ok := m.localQueues[workload.QueueKey(wl)] return ok } @@ -254,7 +254,7 @@ func (m *Manager) QueueForWorkloadExists(wl *kueue.Workload) bool { func (m *Manager) ClusterQueueForWorkload(wl *kueue.Workload) (string, bool) { m.RLock() defer m.RUnlock() - q, ok := m.queues[workload.QueueKey(wl)] + q, ok := m.localQueues[workload.QueueKey(wl)] if 
!ok { return "", false } @@ -272,7 +272,7 @@ func (m *Manager) AddOrUpdateWorkload(w *kueue.Workload) bool { func (m *Manager) addOrUpdateWorkload(w *kueue.Workload) bool { qKey := workload.QueueKey(w) - q := m.queues[qKey] + q := m.localQueues[qKey] if q == nil { return false } @@ -303,7 +303,7 @@ func (m *Manager) RequeueWorkload(ctx context.Context, info *workload.Info, reas return false } - q := m.queues[workload.QueueKey(&w)] + q := m.localQueues[workload.QueueKey(&w)] if q == nil { return false } @@ -329,7 +329,7 @@ func (m *Manager) DeleteWorkload(w *kueue.Workload) { } func (m *Manager) deleteWorkloadFromQueueAndClusterQueue(w *kueue.Workload, qKey string) { - q := m.queues[qKey] + q := m.localQueues[qKey] if q == nil { return } @@ -348,7 +348,7 @@ func (m *Manager) QueueAssociatedInadmissibleWorkloads(ctx context.Context, w *k m.Lock() defer m.Unlock() - q := m.queues[workload.QueueKey(w)] + q := m.localQueues[workload.QueueKey(w)] if q == nil { return } @@ -459,10 +459,10 @@ func (m *Manager) Heads(ctx context.Context) []workload.Info { func (m *Manager) Dump() map[string]sets.String { m.Lock() defer m.Unlock() - if len(m.queues) == 0 { + if len(m.localQueues) == 0 { return nil } - dump := make(map[string]sets.String, len(m.queues)) + dump := make(map[string]sets.String, len(m.localQueues)) for key, cq := range m.clusterQueues { if elements, ok := cq.Dump(); ok { dump[key] = elements @@ -479,10 +479,10 @@ func (m *Manager) Dump() map[string]sets.String { func (m *Manager) DumpInadmissible() map[string]sets.String { m.Lock() defer m.Unlock() - if len(m.queues) == 0 { + if len(m.localQueues) == 0 { return nil } - dump := make(map[string]sets.String, len(m.queues)) + dump := make(map[string]sets.String, len(m.localQueues)) for key, cq := range m.clusterQueues { if elements, ok := cq.DumpInadmissible(); ok { dump[key] = elements @@ -509,7 +509,7 @@ func (m *Manager) heads() []workload.Info { wlCopy := *wl wlCopy.ClusterQueue = cqName workloads = append(workloads, wlCopy) - q := m.queues[workload.QueueKey(wl.Obj)] + q := m.localQueues[workload.QueueKey(wl.Obj)] delete(q.items, workload.Key(wl.Obj)) } return workloads @@ -561,8 +561,8 @@ func SetupIndexes(indexer client.FieldIndexer) error { if err != nil { return fmt.Errorf("setting index on queue for Workload: %w", err) } - err = indexer.IndexField(context.Background(), &kueue.Queue{}, queueClusterQueueKey, func(o client.Object) []string { - q := o.(*kueue.Queue) + err = indexer.IndexField(context.Background(), &kueue.LocalQueue{}, queueClusterQueueKey, func(o client.Object) []string { + q := o.(*kueue.LocalQueue) return []string{string(q.Spec.ClusterQueue)} }) if err != nil { diff --git a/pkg/queue/manager_test.go b/pkg/queue/manager_test.go index 19d1ef740a..bcdf6a78db 100644 --- a/pkg/queue/manager_test.go +++ b/pkg/queue/manager_test.go @@ -37,9 +37,9 @@ import ( const headsTimeout = 3 * time.Second -// TestAddQueueOrphans verifies that pods added before adding the queue are +// TestAddLocalQueueOrphans verifies that pods added before adding the queue are // present when the queue is added. 
-func TestAddQueueOrphans(t *testing.T) { +func TestAddLocalQueueOrphans(t *testing.T) { scheme := runtime.NewScheme() if err := kueue.AddToScheme(scheme); err != nil { t.Fatalf("Failed adding kueue scheme: %s", err) @@ -53,12 +53,12 @@ func TestAddQueueOrphans(t *testing.T) { utiltesting.MakeWorkload("a", "moon").Queue("foo").Obj(), ).Build() manager := NewManager(kClient, nil) - q := utiltesting.MakeQueue("foo", "earth").Obj() - if err := manager.AddQueue(context.Background(), q); err != nil { + q := utiltesting.MakeLocalQueue("foo", "earth").Obj() + if err := manager.AddLocalQueue(context.Background(), q); err != nil { t.Fatalf("Failed adding queue: %v", err) } - qImpl := manager.queues[Key(q)] - workloadNames := workloadNamesFromQ(qImpl) + qImpl := manager.localQueues[Key(q)] + workloadNames := workloadNamesFromLQ(qImpl) if diff := cmp.Diff(sets.NewString("earth/a", "earth/c"), workloadNames); diff != "" { t.Errorf("Unexpected items in queue foo (-want,+got):\n%s", diff) } @@ -72,9 +72,9 @@ func TestAddClusterQueueOrphans(t *testing.T) { t.Fatalf("Failed adding kueue scheme: %v", err) } now := time.Now() - queues := []*kueue.Queue{ - utiltesting.MakeQueue("foo", "").ClusterQueue("cq").Obj(), - utiltesting.MakeQueue("bar", "").ClusterQueue("cq").Obj(), + queues := []*kueue.LocalQueue{ + utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj(), + utiltesting.MakeLocalQueue("bar", "").ClusterQueue("cq").Obj(), } kClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects( utiltesting.MakeWorkload("a", "").Queue("foo").Creation(now.Add(time.Second)).Obj(), @@ -88,7 +88,7 @@ func TestAddClusterQueueOrphans(t *testing.T) { ctx := context.Background() manager := NewManager(kClient, nil) for _, q := range queues { - if err := manager.AddQueue(ctx, q); err != nil { + if err := manager.AddLocalQueue(ctx, q); err != nil { t.Fatalf("Failed adding queue %s: %v", q.Name, err) } } @@ -110,9 +110,9 @@ func TestUpdateQueue(t *testing.T) { utiltesting.MakeClusterQueue("cq1").Obj(), utiltesting.MakeClusterQueue("cq2").Obj(), } - queues := []*kueue.Queue{ - utiltesting.MakeQueue("foo", "").ClusterQueue("cq1").Obj(), - utiltesting.MakeQueue("bar", "").ClusterQueue("cq2").Obj(), + queues := []*kueue.LocalQueue{ + utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq1").Obj(), + utiltesting.MakeLocalQueue("bar", "").ClusterQueue("cq2").Obj(), } now := time.Now() workloads := []*kueue.Workload{ @@ -132,7 +132,7 @@ func TestUpdateQueue(t *testing.T) { } } for _, q := range queues { - if err := manager.AddQueue(ctx, q); err != nil { + if err := manager.AddLocalQueue(ctx, q); err != nil { t.Fatalf("Failed adding queue %s: %v", q.Name, err) } } @@ -142,7 +142,7 @@ func TestUpdateQueue(t *testing.T) { // Update cluster queue of first queue. 
 	queues[0].Spec.ClusterQueue = "cq2"
-	if err := manager.UpdateQueue(queues[0]); err != nil {
+	if err := manager.UpdateLocalQueue(queues[0]); err != nil {
 		t.Fatalf("Failed updating queue: %v", err)
 	}
@@ -170,12 +170,12 @@ func TestAddWorkload(t *testing.T) {
 	if err := manager.AddClusterQueue(context.Background(), cq); err != nil {
 		t.Fatalf("Failed adding clusterQueue %s: %v", cq.Name, err)
 	}
-	queues := []*kueue.Queue{
-		utiltesting.MakeQueue("foo", "earth").ClusterQueue("cq").Obj(),
-		utiltesting.MakeQueue("bar", "mars").Obj(),
+	queues := []*kueue.LocalQueue{
+		utiltesting.MakeLocalQueue("foo", "earth").ClusterQueue("cq").Obj(),
+		utiltesting.MakeLocalQueue("bar", "mars").Obj(),
 	}
 	for _, q := range queues {
-		if err := manager.AddQueue(context.Background(), q); err != nil {
+		if err := manager.AddLocalQueue(context.Background(), q); err != nil {
 			t.Fatalf("Failed adding queue %s: %v", q.Name, err)
 		}
 	}
@@ -238,16 +238,16 @@ func TestStatus(t *testing.T) {
 	}
 	now := time.Now().Truncate(time.Second)
-	queues := []kueue.Queue{
+	queues := []kueue.LocalQueue{
 		{
 			ObjectMeta: metav1.ObjectMeta{Name: "foo"},
-			Spec: kueue.QueueSpec{
+			Spec: kueue.LocalQueueSpec{
 				ClusterQueue: "fooCq",
 			},
 		},
 		{
 			ObjectMeta: metav1.ObjectMeta{Name: "bar"},
-			Spec: kueue.QueueSpec{
+			Spec: kueue.LocalQueueSpec{
 				ClusterQueue: "barCq",
 			},
 		},
@@ -285,7 +285,7 @@ func TestStatus(t *testing.T) {
 	manager := NewManager(fake.NewClientBuilder().WithScheme(scheme).Build(), nil)
 	for _, q := range queues {
-		if err := manager.AddQueue(ctx, &q); err != nil {
+		if err := manager.AddLocalQueue(ctx, &q); err != nil {
 			t.Errorf("Failed adding queue: %s", err)
 		}
 	}
@@ -295,7 +295,7 @@ func TestStatus(t *testing.T) {
 	}
 	cases := map[string]struct {
-		queue      *kueue.Queue
+		queue      *kueue.LocalQueue
 		wantStatus int32
 		wantErr    error
 	}{
@@ -310,7 +310,7 @@ func TestStatus(t *testing.T) {
 			wantErr: nil,
 		},
 		"fake": {
-			queue: &kueue.Queue{ObjectMeta: metav1.ObjectMeta{Name: "fake"}},
+			queue: &kueue.LocalQueue{ObjectMeta: metav1.ObjectMeta{Name: "fake"}},
 			wantStatus: 0,
 			wantErr: errQueueDoesNotExist,
 		},
@@ -334,9 +334,9 @@ func TestRequeueWorkloadStrictFIFO(t *testing.T) {
 		t.Fatalf("Failed adding kueue scheme: %s", err)
 	}
 	cq := utiltesting.MakeClusterQueue("cq").Obj()
-	queues := []*kueue.Queue{
-		utiltesting.MakeQueue("foo", "").ClusterQueue("cq").Obj(),
-		utiltesting.MakeQueue("bar", "").Obj(),
+	queues := []*kueue.LocalQueue{
+		utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj(),
+		utiltesting.MakeLocalQueue("bar", "").Obj(),
 	}
 	cases := []struct {
 		workload *kueue.Workload
@@ -401,7 +401,7 @@ func TestRequeueWorkloadStrictFIFO(t *testing.T) {
 		t.Fatalf("Failed adding cluster queue %s: %v", cq.Name, err)
 	}
 	for _, q := range queues {
-		if err := manager.AddQueue(ctx, q); err != nil {
+		if err := manager.AddLocalQueue(ctx, q); err != nil {
 			t.Fatalf("Failed adding queue %s: %v", q.Name, err)
 		}
 	}
@@ -431,7 +431,7 @@ func TestUpdateWorkload(t *testing.T) {
 	now := time.Now()
 	cases := map[string]struct {
 		clusterQueues []*kueue.ClusterQueue
-		queues        []*kueue.Queue
+		queues        []*kueue.LocalQueue
 		workloads     []*kueue.Workload
 		update        func(*kueue.Workload)
 		wantUpdated   bool
@@ -442,8 +442,8 @@ func TestUpdateWorkload(t *testing.T) {
 			clusterQueues: []*kueue.ClusterQueue{
 				utiltesting.MakeClusterQueue("cq").Obj(),
 			},
-			queues: []*kueue.Queue{
-				utiltesting.MakeQueue("foo", "").ClusterQueue("cq").Obj(),
+			queues: []*kueue.LocalQueue{
+				utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj(),
 			},
 			workloads: []*kueue.Workload{
 				utiltesting.MakeWorkload("a", "").Queue("foo").Creation(now).Obj(),
@@ -464,9 +464,9 @@ func TestUpdateWorkload(t *testing.T) {
 			clusterQueues: []*kueue.ClusterQueue{
 				utiltesting.MakeClusterQueue("cq").Obj(),
 			},
-			queues: []*kueue.Queue{
-				utiltesting.MakeQueue("foo", "").ClusterQueue("cq").Obj(),
-				utiltesting.MakeQueue("bar", "").ClusterQueue("cq").Obj(),
+			queues: []*kueue.LocalQueue{
+				utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj(),
+				utiltesting.MakeLocalQueue("bar", "").ClusterQueue("cq").Obj(),
 			},
 			workloads: []*kueue.Workload{
 				utiltesting.MakeWorkload("a", "").Queue("foo").Obj(),
@@ -488,9 +488,9 @@ func TestUpdateWorkload(t *testing.T) {
 				utiltesting.MakeClusterQueue("cq1").Obj(),
 				utiltesting.MakeClusterQueue("cq2").Obj(),
 			},
-			queues: []*kueue.Queue{
-				utiltesting.MakeQueue("foo", "").ClusterQueue("cq1").Obj(),
-				utiltesting.MakeQueue("bar", "").ClusterQueue("cq2").Obj(),
+			queues: []*kueue.LocalQueue{
+				utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq1").Obj(),
+				utiltesting.MakeLocalQueue("bar", "").ClusterQueue("cq2").Obj(),
 			},
 			workloads: []*kueue.Workload{
 				utiltesting.MakeWorkload("a", "").Queue("foo").Obj(),
@@ -512,8 +512,8 @@ func TestUpdateWorkload(t *testing.T) {
 			clusterQueues: []*kueue.ClusterQueue{
 				utiltesting.MakeClusterQueue("cq").Obj(),
 			},
-			queues: []*kueue.Queue{
-				utiltesting.MakeQueue("foo", "").ClusterQueue("cq").Obj(),
+			queues: []*kueue.LocalQueue{
+				utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj(),
 			},
 			workloads: []*kueue.Workload{
 				utiltesting.MakeWorkload("a", "").Queue("foo").Obj(),
@@ -532,8 +532,8 @@ func TestUpdateWorkload(t *testing.T) {
 			clusterQueues: []*kueue.ClusterQueue{
 				utiltesting.MakeClusterQueue("cq").Obj(),
 			},
-			queues: []*kueue.Queue{
-				utiltesting.MakeQueue("foo", "").ClusterQueue("cq").Obj(),
+			queues: []*kueue.LocalQueue{
+				utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj(),
 			},
 			workloads: []*kueue.Workload{
 				utiltesting.MakeWorkload("a", "").Queue("bar").Obj(),
@@ -560,7 +560,7 @@ func TestUpdateWorkload(t *testing.T) {
 				}
 			}
 			for _, q := range tc.queues {
-				if err := manager.AddQueue(ctx, q); err != nil {
+				if err := manager.AddLocalQueue(ctx, q); err != nil {
 					t.Fatalf("Adding queue %q: %v", q.Name, err)
 				}
 			}
@@ -572,7 +572,7 @@ func TestUpdateWorkload(t *testing.T) {
 			if updated := manager.UpdateWorkload(tc.workloads[0], wl); updated != tc.wantUpdated {
 				t.Errorf("UpdatedWorkload returned %t, want %t", updated, tc.wantUpdated)
 			}
-			q := manager.queues[workload.QueueKey(wl)]
+			q := manager.localQueues[workload.QueueKey(wl)]
 			if q != nil {
 				key := workload.Key(wl)
 				item := q.items[key]
@@ -599,8 +599,8 @@ func TestUpdateWorkload(t *testing.T) {
 				t.Errorf("Elements popped in the wrong order from clusterQueues (-want,+got):\n%s", diff)
 			}
 			queueMembers := make(map[string]sets.String)
-			for name, q := range manager.queues {
-				queueMembers[name] = workloadNamesFromQ(q)
+			for name, q := range manager.localQueues {
+				queueMembers[name] = workloadNamesFromLQ(q)
 			}
 			if diff := cmp.Diff(tc.wantQueueMembers, queueMembers); diff != "" {
 				t.Errorf("Elements present in wrong queues (-want,+got):\n%s", diff)
@@ -621,10 +621,10 @@ func TestHeads(t *testing.T) {
 		utiltesting.MakeClusterQueue("active-barCq").Obj(),
 		utiltesting.MakeClusterQueue("pending-bazCq").Obj(),
 	}
-	queues := []*kueue.Queue{
-		utiltesting.MakeQueue("foo", "").ClusterQueue("active-fooCq").Obj(),
-		utiltesting.MakeQueue("bar", "").ClusterQueue("active-barCq").Obj(),
-		utiltesting.MakeQueue("baz", "").ClusterQueue("pending-bazCq").Obj(),
+	queues := []*kueue.LocalQueue{
+		utiltesting.MakeLocalQueue("foo", "").ClusterQueue("active-fooCq").Obj(),
+		utiltesting.MakeLocalQueue("bar", "").ClusterQueue("active-barCq").Obj(),
+		utiltesting.MakeLocalQueue("baz", "").ClusterQueue("pending-bazCq").Obj(),
 	}
 	tests := []struct {
 		name string
@@ -675,7 +675,7 @@ func TestHeads(t *testing.T) {
 		}
 	}
 	for _, q := range queues {
-		if err := manager.AddQueue(ctx, q); err != nil {
+		if err := manager.AddLocalQueue(ctx, q); err != nil {
 			t.Fatalf("Failed adding queue %s: %s", q.Name, err)
 		}
 	}
@@ -719,16 +719,16 @@ func TestHeadsAsync(t *testing.T) {
 		Spec: kueue.WorkloadSpec{QueueName: "foo"},
 	}
 	var newWl kueue.Workload
-	queues := []kueue.Queue{
+	queues := []kueue.LocalQueue{
 		{
 			ObjectMeta: metav1.ObjectMeta{Name: "foo"},
-			Spec: kueue.QueueSpec{
+			Spec: kueue.LocalQueueSpec{
 				ClusterQueue: "fooCq",
 			},
 		},
 		{
 			ObjectMeta: metav1.ObjectMeta{Name: "bar"},
-			Spec: kueue.QueueSpec{
+			Spec: kueue.LocalQueueSpec{
 				ClusterQueue: "barCq",
 			},
 		},
@@ -741,7 +741,7 @@ func TestHeadsAsync(t *testing.T) {
 		"AddClusterQueue": {
 			initialObjs: []client.Object{&wl, &queues[0]},
 			op: func(ctx context.Context, mgr *Manager) {
-				if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
+				if err := mgr.AddLocalQueue(ctx, &queues[0]); err != nil {
 					t.Errorf("Failed adding queue: %s", err)
 				}
 				mgr.AddOrUpdateWorkload(&wl)
@@ -758,14 +758,14 @@ func TestHeadsAsync(t *testing.T) {
 				},
 			},
 		},
-		"AddQueue": {
+		"AddLocalQueue": {
 			initialObjs: []client.Object{&wl},
 			op: func(ctx context.Context, mgr *Manager) {
 				if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
 					t.Errorf("Failed adding clusterQueue: %v", err)
 				}
 				go func() {
-					if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
+					if err := mgr.AddLocalQueue(ctx, &queues[0]); err != nil {
 						t.Errorf("Failed adding queue: %s", err)
 					}
 				}()
@@ -782,7 +782,7 @@ func TestHeadsAsync(t *testing.T) {
 				if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
 					t.Errorf("Failed adding clusterQueue: %v", err)
 				}
-				if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
+				if err := mgr.AddLocalQueue(ctx, &queues[0]); err != nil {
 					t.Errorf("Failed adding queue: %s", err)
 				}
 				go func() {
@@ -801,7 +801,7 @@ func TestHeadsAsync(t *testing.T) {
 				if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
 					t.Errorf("Failed adding clusterQueue: %v", err)
 				}
-				if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
+				if err := mgr.AddLocalQueue(ctx, &queues[0]); err != nil {
 					t.Errorf("Failed adding queue: %s", err)
 				}
 				go func() {
@@ -823,7 +823,7 @@ func TestHeadsAsync(t *testing.T) {
 				if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
 					t.Errorf("Failed adding clusterQueue: %v", err)
 				}
-				if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
+				if err := mgr.AddLocalQueue(ctx, &queues[0]); err != nil {
 					t.Errorf("Failed adding queue: %s", err)
 				}
 				// Remove the initial workload from the manager.
@@ -845,7 +845,7 @@ func TestHeadsAsync(t *testing.T) {
 				if err := mgr.AddClusterQueue(ctx, clusterQueues[0]); err != nil {
 					t.Errorf("Failed adding clusterQueue: %v", err)
 				}
-				if err := mgr.AddQueue(ctx, &queues[0]); err != nil {
+				if err := mgr.AddLocalQueue(ctx, &queues[0]); err != nil {
 					t.Errorf("Failed adding queue: %s", err)
 				}
@@ -876,7 +876,7 @@ func TestHeadsAsync(t *testing.T) {
 				}
 			}
 			for _, q := range queues {
-				if err := mgr.AddQueue(ctx, &q); err != nil {
+				if err := mgr.AddLocalQueue(ctx, &q); err != nil {
 					t.Errorf("Failed adding queue: %s", err)
 				}
 			}
@@ -940,8 +940,8 @@ func popNamesFromCQ(cq ClusterQueue) []string {
 	return names
 }
-// workloadNamesFromQ returns all the names of the workloads in a queue.
-func workloadNamesFromQ(q *Queue) sets.String {
+// workloadNamesFromLQ returns all the names of the workloads in a localQueue.
+func workloadNamesFromLQ(q *LocalQueue) sets.String {
 	names := sets.NewString()
 	for k := range q.items {
 		names.Insert(k)
diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go
index 26cc54310d..ac06ef4217 100644
--- a/pkg/scheduler/scheduler_test.go
+++ b/pkg/scheduler/scheduler_test.go
@@ -195,13 +195,13 @@ func TestSchedule(t *testing.T) {
 			},
 		},
 	}
-	queues := []kueue.Queue{
+	queues := []kueue.LocalQueue{
 		{
 			ObjectMeta: metav1.ObjectMeta{
 				Namespace: "sales",
 				Name: "main",
 			},
-			Spec: kueue.QueueSpec{
+			Spec: kueue.LocalQueueSpec{
 				ClusterQueue: "sales",
 			},
 		},
@@ -210,7 +210,7 @@ func TestSchedule(t *testing.T) {
 				Namespace: "sales",
 				Name: "blocked",
 			},
-			Spec: kueue.QueueSpec{
+			Spec: kueue.LocalQueueSpec{
 				ClusterQueue: "eng-alpha",
 			},
 		},
@@ -219,7 +219,7 @@ func TestSchedule(t *testing.T) {
 				Namespace: "eng-alpha",
 				Name: "main",
 			},
-			Spec: kueue.QueueSpec{
+			Spec: kueue.LocalQueueSpec{
 				ClusterQueue: "eng-alpha",
 			},
 		},
@@ -228,7 +228,7 @@ func TestSchedule(t *testing.T) {
 				Namespace: "eng-beta",
 				Name: "main",
 			},
-			Spec: kueue.QueueSpec{
+			Spec: kueue.LocalQueueSpec{
 				ClusterQueue: "eng-beta",
 			},
 		},
@@ -237,7 +237,7 @@ func TestSchedule(t *testing.T) {
 				Namespace: "sales",
 				Name: "flavor-nonexistent-queue",
 			},
-			Spec: kueue.QueueSpec{
+			Spec: kueue.LocalQueueSpec{
 				ClusterQueue: "flavor-nonexistent-cq",
 			},
 		},
@@ -246,7 +246,7 @@ func TestSchedule(t *testing.T) {
 				Namespace: "sales",
 				Name: "cq-nonexistent-queue",
 			},
-			Spec: kueue.QueueSpec{
+			Spec: kueue.LocalQueueSpec{
 				ClusterQueue: "nonexistent-cq",
 			},
 		},
@@ -789,7 +789,7 @@ func TestSchedule(t *testing.T) {
 				t.Fatalf("Failed adding kueue scheme: %v", err)
 			}
 			clientBuilder := fake.NewClientBuilder().WithScheme(scheme).
-				WithLists(&kueue.WorkloadList{Items: tc.workloads}, &kueue.QueueList{Items: queues}).
+				WithLists(&kueue.WorkloadList{Items: tc.workloads}, &kueue.LocalQueueList{Items: queues}).
 				WithObjects(
 					&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "eng-alpha", Labels: map[string]string{"dep": "eng"}}},
 					&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "eng-beta", Labels: map[string]string{"dep": "eng"}}},
@@ -803,7 +803,7 @@ func TestSchedule(t *testing.T) {
 			qManager := queue.NewManager(cl, cqCache)
 			// Workloads are loaded into queues or clusterQueues as we add them.
 			for _, q := range queues {
-				if err := qManager.AddQueue(ctx, &q); err != nil {
+				if err := qManager.AddLocalQueue(ctx, &q); err != nil {
 					t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err)
 				}
 			}
@@ -1725,7 +1725,7 @@ var ignoreConditionTimestamps = cmpopts.IgnoreFields(metav1.Condition{}, "LastTr
 func TestRequeueAndUpdate(t *testing.T) {
 	cq := utiltesting.MakeClusterQueue("cq").Obj()
-	q1 := utiltesting.MakeQueue("q1", "ns1").ClusterQueue(cq.Name).Obj()
+	q1 := utiltesting.MakeLocalQueue("q1", "ns1").ClusterQueue(cq.Name).Obj()
 	w1 := utiltesting.MakeWorkload("w1", "ns1").Queue(q1.Name).Obj()
 	cases := []struct {
@@ -1804,7 +1804,7 @@ func TestRequeueAndUpdate(t *testing.T) {
 			cqCache := cache.New(cl)
 			qManager := queue.NewManager(cl, cqCache)
 			scheduler := New(qManager, cqCache, cl, recorder)
-			if err := qManager.AddQueue(ctx, q1); err != nil {
+			if err := qManager.AddLocalQueue(ctx, q1); err != nil {
 				t.Fatalf("Inserting queue %s/%s in manager: %v", q1.Namespace, q1.Name, err)
 			}
 			if err := qManager.AddClusterQueue(ctx, cq); err != nil {
diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go
index ddcb0f3cb6..fa965deb5b 100644
--- a/pkg/util/testing/wrappers.go
+++ b/pkg/util/testing/wrappers.go
@@ -243,12 +243,12 @@ func (w *AdmissionWrapper) Flavor(r corev1.ResourceName, f string) *AdmissionWra
 	return w
 }
-// QueueWrapper wraps a Queue.
-type QueueWrapper struct{ kueue.Queue }
+// LocalQueueWrapper wraps a LocalQueue.
+type LocalQueueWrapper struct{ kueue.LocalQueue }
-// MakeQueue creates a wrapper for a Queue.
-func MakeQueue(name, ns string) *QueueWrapper {
-	return &QueueWrapper{kueue.Queue{
+// MakeLocalQueue creates a wrapper for a LocalQueue.
+func MakeLocalQueue(name, ns string) *LocalQueueWrapper {
+	return &LocalQueueWrapper{kueue.LocalQueue{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: name,
 			Namespace: ns,
@@ -256,19 +256,19 @@ func MakeQueue(name, ns string) *QueueWrapper {
 	}}
 }
-// Obj returns the inner Queue.
-func (q *QueueWrapper) Obj() *kueue.Queue {
-	return &q.Queue
+// Obj returns the inner LocalQueue.
+func (q *LocalQueueWrapper) Obj() *kueue.LocalQueue {
+	return &q.LocalQueue
 }
 // ClusterQueue updates the clusterQueue the queue points to.
-func (q *QueueWrapper) ClusterQueue(c string) *QueueWrapper {
+func (q *LocalQueueWrapper) ClusterQueue(c string) *LocalQueueWrapper {
 	q.Spec.ClusterQueue = kueue.ClusterQueueReference(c)
 	return q
 }
 // PendingWorkloads updates the pendingWorkloads in status.
-func (q *QueueWrapper) PendingWorkloads(n int32) *QueueWrapper {
+func (q *LocalQueueWrapper) PendingWorkloads(n int32) *LocalQueueWrapper {
 	q.Status.PendingWorkloads = n
 	return q
 }
diff --git a/test/integration/controller/core/clusterqueue_controller_test.go b/test/integration/controller/core/clusterqueue_controller_test.go
index e032b2dc0f..1f20c5da27 100644
--- a/test/integration/controller/core/clusterqueue_controller_test.go
+++ b/test/integration/controller/core/clusterqueue_controller_test.go
@@ -72,7 +72,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
 	ginkgo.When("Reconciling clusterQueue status", func() {
 		var (
 			clusterQueue *kueue.ClusterQueue
-			queue        *kueue.Queue
+			localQueue   *kueue.LocalQueue
 		)
 		ginkgo.BeforeEach(func() {
@@ -84,8 +84,8 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
 					Flavor(testing.MakeFlavor(flavorModelA, "5").Max("10").Obj()).
 					Flavor(testing.MakeFlavor(flavorModelB, "5").Max("10").Obj()).Obj()).Obj()
 			gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed())
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
-			gomega.Expect(k8sClient.Create(ctx, queue)).To(gomega.Succeed())
+			localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
 		})
 		ginkgo.AfterEach(func() {
@@ -94,17 +94,17 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
 		ginkgo.It("Should update status when workloads are assigned and finish", func() {
 			workloads := []*kueue.Workload{
-				testing.MakeWorkload("one", ns.Name).Queue(queue.Name).
+				testing.MakeWorkload("one", ns.Name).Queue(localQueue.Name).
 					Request(corev1.ResourceCPU, "2").Request(resourceGPU, "2").Obj(),
-				testing.MakeWorkload("two", ns.Name).Queue(queue.Name).
+				testing.MakeWorkload("two", ns.Name).Queue(localQueue.Name).
 					Request(corev1.ResourceCPU, "3").Request(resourceGPU, "3").Obj(),
-				testing.MakeWorkload("three", ns.Name).Queue(queue.Name).
+				testing.MakeWorkload("three", ns.Name).Queue(localQueue.Name).
 					Request(corev1.ResourceCPU, "1").Request(resourceGPU, "1").Obj(),
-				testing.MakeWorkload("four", ns.Name).Queue(queue.Name).
+				testing.MakeWorkload("four", ns.Name).Queue(localQueue.Name).
 					Request(corev1.ResourceCPU, "1").Request(resourceGPU, "1").Obj(),
 				testing.MakeWorkload("five", ns.Name).Queue("other").
 					Request(corev1.ResourceCPU, "1").Request(resourceGPU, "1").Obj(),
-				testing.MakeWorkload("six", ns.Name).Queue(queue.Name).
+				testing.MakeWorkload("six", ns.Name).Queue(localQueue.Name).
 					Request(corev1.ResourceCPU, "1").Request(resourceGPU, "1").Obj(),
 			}
@@ -192,14 +192,14 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
 	ginkgo.When("Deleting clusterQueues", func() {
 		var (
-			cq    *kueue.ClusterQueue
-			queue *kueue.Queue
+			cq *kueue.ClusterQueue
+			lq *kueue.LocalQueue
 		)
 		ginkgo.BeforeEach(func() {
 			cq = testing.MakeClusterQueue("foo-cq").Obj()
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
-			gomega.Expect(k8sClient.Create(ctx, queue)).To(gomega.Succeed())
+			lq = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, lq)).To(gomega.Succeed())
 			gomega.Expect(k8sClient.Create(ctx, cq)).To(gomega.Succeed())
 		})
@@ -210,7 +210,7 @@ var _ = ginkgo.Describe("ClusterQueue controller", func() {
 		ginkgo.It("Should be stuck in termination until admitted workloads finished running", func() {
 			ginkgo.By("Admit workload")
 			admission := testing.MakeAdmission(cq.Name).Obj()
-			wl := testing.MakeWorkload("workload", ns.Name).Queue(queue.Name).Admit(admission).Obj()
+			wl := testing.MakeWorkload("workload", ns.Name).Queue(lq.Name).Admit(admission).Obj()
 			gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
 			ginkgo.By("Delete clusterQueue")
diff --git a/test/integration/controller/core/queue_controller_test.go b/test/integration/controller/core/localqueue_controller_test.go
similarity index 87%
rename from test/integration/controller/core/queue_controller_test.go
rename to test/integration/controller/core/localqueue_controller_test.go
index d98d6ef086..f404c359e3 100644
--- a/test/integration/controller/core/queue_controller_test.go
+++ b/test/integration/controller/core/localqueue_controller_test.go
@@ -33,7 +33,7 @@ import (
 var _ = ginkgo.Describe("Queue controller", func() {
 	var (
 		ns           *corev1.Namespace
-		queue        *kueue.Queue
+		queue        *kueue.LocalQueue
 		clusterQueue *kueue.ClusterQueue
 	)
@@ -52,7 +52,7 @@ var _ = ginkgo.Describe("Queue controller", func() {
 				Flavor(testing.MakeFlavor(flavorModelA, "5").Max("10").Obj()).
 				Flavor(testing.MakeFlavor(flavorModelB, "5").Max("10").Obj()).Obj()).Obj()
 		gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed())
-		queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+		queue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
 		gomega.Expect(k8sClient.Create(ctx, queue)).To(gomega.Succeed())
 	})
@@ -78,11 +78,11 @@ var _ = ginkgo.Describe("Queue controller", func() {
 		for _, w := range workloads {
 			gomega.Expect(k8sClient.Create(ctx, w)).To(gomega.Succeed())
 		}
-		gomega.Eventually(func() kueue.QueueStatus {
-			var updatedQueue kueue.Queue
+		gomega.Eventually(func() kueue.LocalQueueStatus {
+			var updatedQueue kueue.LocalQueue
 			gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
 			return updatedQueue.Status
-		}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.QueueStatus{PendingWorkloads: 3}))
+		}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{PendingWorkloads: 3}))
 		ginkgo.By("Admitting workloads")
 		for _, w := range workloads {
@@ -94,18 +94,18 @@ var _ = ginkgo.Describe("Queue controller", func() {
 				return k8sClient.Update(ctx, &newWL)
 			}, framework.Timeout, framework.Interval).Should(gomega.Succeed())
 		}
-		gomega.Eventually(func() kueue.QueueStatus {
-			var updatedQueue kueue.Queue
+		gomega.Eventually(func() kueue.LocalQueueStatus {
+			var updatedQueue kueue.LocalQueue
 			gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
 			return updatedQueue.Status
-		}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.QueueStatus{PendingWorkloads: 0}))
+		}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{PendingWorkloads: 0}))
 		ginkgo.By("Finishing workloads")
 		framework.FinishWorkloads(ctx, k8sClient, workloads...)
-		gomega.Eventually(func() kueue.QueueStatus {
-			var updatedQueue kueue.Queue
+		gomega.Eventually(func() kueue.LocalQueueStatus {
+			var updatedQueue kueue.LocalQueue
 			gomega.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(queue), &updatedQueue)).To(gomega.Succeed())
 			return updatedQueue.Status
-		}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.QueueStatus{}))
+		}, framework.Timeout, framework.Interval).Should(gomega.BeComparableTo(kueue.LocalQueueStatus{}))
 	})
 })
diff --git a/test/integration/controller/core/workload_controller_test.go b/test/integration/controller/core/workload_controller_test.go
index 1f1e5feb75..f262e6d400 100644
--- a/test/integration/controller/core/workload_controller_test.go
+++ b/test/integration/controller/core/workload_controller_test.go
@@ -40,7 +40,7 @@ var _ = ginkgo.Describe("Workload controller", func() {
 	var (
 		ns                   *corev1.Namespace
 		updatedQueueWorkload kueue.Workload
-		queue                *kueue.Queue
+		localQueue           *kueue.LocalQueue
 		wl                   *kueue.Workload
 		message              string
 		runtimeClass         *nodev1.RuntimeClass
@@ -62,7 +62,7 @@ var _ = ginkgo.Describe("Workload controller", func() {
 	ginkgo.AfterEach(func() {
 		clusterQueue = nil
-		queue = nil
+		localQueue = nil
 		updatedQueueWorkload = kueue.Workload{}
 		updatedCQ = kueue.ClusterQueue{}
 	})
@@ -101,14 +101,14 @@ var _ = ginkgo.Describe("Workload controller", func() {
 	ginkgo.When("the clusterqueue doesn't exist", func() {
 		ginkgo.BeforeEach(func() {
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue("fooclusterqueue").Obj()
-			gomega.Expect(k8sClient.Create(ctx, queue)).To(gomega.Succeed())
+			localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue("fooclusterqueue").Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
 		})
 		ginkgo.AfterEach(func() {
 			gomega.Expect(framework.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
 		})
 		ginkgo.It("Should update status when workloads are created", func() {
-			wl = testing.MakeWorkload("three", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "1").Obj()
+			wl = testing.MakeWorkload("three", ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
 			message = fmt.Sprintf("ClusterQueue %s doesn't exist", "fooclusterqueue")
 			gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
 			gomega.Eventually(func() []metav1.Condition {
@@ -130,8 +130,8 @@ var _ = ginkgo.Describe("Workload controller", func() {
 					Flavor(testing.MakeFlavor(flavorOnDemand, "5").Max("10").Obj()).Obj()).
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed())
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
-			gomega.Expect(k8sClient.Create(ctx, queue)).To(gomega.Succeed())
+			localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
 		})
 		ginkgo.AfterEach(func() {
 			gomega.Expect(framework.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
@@ -141,7 +141,7 @@ var _ = ginkgo.Describe("Workload controller", func() {
 		ginkgo.It("Should update the workload's condition", func() {
 			ginkgo.By("Create workload")
-			wl = testing.MakeWorkload("one", ns.Name).Queue(queue.Name).Request(corev1.ResourceCPU, "1").Obj()
+			wl = testing.MakeWorkload("one", ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "1").Obj()
 			gomega.Expect(k8sClient.Create(ctx, wl)).To(gomega.Succeed())
 			ginkgo.By("Admit workload")
@@ -165,8 +165,8 @@ var _ = ginkgo.Describe("Workload controller", func() {
 					Flavor(testing.MakeFlavor(flavorOnDemand, "5").Max("10").Obj()).Obj()).
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed())
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
-			gomega.Expect(k8sClient.Create(ctx, queue)).To(gomega.Succeed())
+			localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
 		})
 		ginkgo.AfterEach(func() {
 			gomega.Expect(framework.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
@@ -177,7 +177,7 @@ var _ = ginkgo.Describe("Workload controller", func() {
 		ginkgo.It("Should accumulate RuntimeClass's overhead", func() {
 			ginkgo.By("Create workload")
 			wl = testing.MakeWorkload("one", ns.Name).
-				Queue(queue.Name).
+				Queue(localQueue.Name).
 				Request(corev1.ResourceCPU, "1").
 				Admit(testing.MakeAdmission(clusterQueue.Name).
 					Flavor(corev1.ResourceCPU, flavorOnDemand).Obj()).
@@ -211,8 +211,8 @@ var _ = ginkgo.Describe("Workload controller", func() {
 					Flavor(testing.MakeFlavor(flavorOnDemand, "5").Max("10").Obj()).Obj()).
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, clusterQueue)).To(gomega.Succeed())
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
-			gomega.Expect(k8sClient.Create(ctx, queue)).To(gomega.Succeed())
+			localQueue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(clusterQueue.Name).Obj()
+			gomega.Expect(k8sClient.Create(ctx, localQueue)).To(gomega.Succeed())
 		})
 		ginkgo.AfterEach(func() {
 			gomega.Expect(framework.DeleteNamespace(ctx, k8sClient, ns)).To(gomega.Succeed())
@@ -222,7 +222,7 @@ var _ = ginkgo.Describe("Workload controller", func() {
 		ginkgo.It("Should not accumulate RuntimeClass's overhead", func() {
 			ginkgo.By("Create workload")
 			wl = testing.MakeWorkload("one", ns.Name).
-				Queue(queue.Name).
+				Queue(localQueue.Name).
 				Request(corev1.ResourceCPU, "1").
 				Admit(testing.MakeAdmission(clusterQueue.Name).
 					Flavor(corev1.ResourceCPU, flavorOnDemand).Obj()).
diff --git a/test/integration/controller/job/job_controller_test.go b/test/integration/controller/job/job_controller_test.go
index 5b01467292..360849929e 100644
--- a/test/integration/controller/job/job_controller_test.go
+++ b/test/integration/controller/job/job_controller_test.go
@@ -289,8 +289,8 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() {
 		spotUntaintedFlavor *kueue.ResourceFlavor
 		prodClusterQ        *kueue.ClusterQueue
 		devClusterQ         *kueue.ClusterQueue
-		prodQueue           *kueue.Queue
-		devQueue            *kueue.Queue
+		prodLocalQ          *kueue.LocalQueue
+		devLocalQ           *kueue.LocalQueue
 	)
 	ginkgo.BeforeEach(func() {
@@ -339,11 +339,11 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() {
 			Obj()
 		gomega.Expect(k8sClient.Create(ctx, devClusterQ)).Should(gomega.Succeed())
-		prodQueue = testing.MakeQueue("prod-queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
-		gomega.Expect(k8sClient.Create(ctx, prodQueue)).Should(gomega.Succeed())
+		prodLocalQ = testing.MakeLocalQueue("prod-queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
+		gomega.Expect(k8sClient.Create(ctx, prodLocalQ)).Should(gomega.Succeed())
-		devQueue = testing.MakeQueue("dev-queue", ns.Name).ClusterQueue(devClusterQ.Name).Obj()
-		gomega.Expect(k8sClient.Create(ctx, devQueue)).Should(gomega.Succeed())
+		devLocalQ = testing.MakeLocalQueue("dev-queue", ns.Name).ClusterQueue(devClusterQ.Name).Obj()
+		gomega.Expect(k8sClient.Create(ctx, devLocalQ)).Should(gomega.Succeed())
 	})
 	ginkgo.AfterEach(func() {
@@ -359,7 +359,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() {
 	ginkgo.It("Should schedule jobs as they fit in their ClusterQueue", func() {
 		ginkgo.By("checking the first prod job starts")
-		prodJob1 := testing.MakeJob("prod-job1", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "2").Obj()
+		prodJob1 := testing.MakeJob("prod-job1", ns.Name).Queue(prodLocalQ.Name).Request(corev1.ResourceCPU, "2").Obj()
 		gomega.Expect(k8sClient.Create(ctx, prodJob1)).Should(gomega.Succeed())
 		lookupKey1 := types.NamespacedName{Name: prodJob1.Name, Namespace: prodJob1.Namespace}
 		createdProdJob1 := &batchv1.Job{}
@@ -372,7 +372,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() {
 		framework.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1)
 		ginkgo.By("checking a second no-fit prod job does not start")
-		prodJob2 := testing.MakeJob("prod-job2", ns.Name).Queue(prodQueue.Name).Request(corev1.ResourceCPU, "5").Obj()
+		prodJob2 := testing.MakeJob("prod-job2", ns.Name).Queue(prodLocalQ.Name).Request(corev1.ResourceCPU, "5").Obj()
 		gomega.Expect(k8sClient.Create(ctx, prodJob2)).Should(gomega.Succeed())
 		lookupKey2 := types.NamespacedName{Name: prodJob2.Name, Namespace: prodJob2.Namespace}
 		createdProdJob2 := &batchv1.Job{}
@@ -384,7 +384,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() {
 		framework.ExpectAdmittedActiveWorkloadsMetric(prodClusterQ, 1)
 		ginkgo.By("checking a dev job starts")
-		devJob := testing.MakeJob("dev-job", ns.Name).Queue(devQueue.Name).Request(corev1.ResourceCPU, "5").Obj()
+		devJob := testing.MakeJob("dev-job", ns.Name).Queue(devLocalQ.Name).Request(corev1.ResourceCPU, "5").Obj()
 		gomega.Expect(k8sClient.Create(ctx, devJob)).Should(gomega.Succeed())
 		createdDevJob := &batchv1.Job{}
 		gomega.Eventually(func() *bool {
diff --git a/test/integration/framework/framework.go b/test/integration/framework/framework.go
index 244b18ae45..98ac18ffc5 100644
--- a/test/integration/framework/framework.go
+++ b/test/integration/framework/framework.go
@@ -164,7 +164,7 @@ func DeleteResourceFlavor(ctx context.Context, c client.Client, rf *kueue.Resour
 	return nil
 }
-func DeleteQueue(ctx context.Context, c client.Client, q *kueue.Queue) error {
+func DeleteQueue(ctx context.Context, c client.Client, q *kueue.LocalQueue) error {
 	if q != nil {
 		if err := c.Delete(ctx, q); err != nil && !apierrors.IsNotFound(err) {
 			return err
@@ -182,7 +182,7 @@ func DeleteNamespace(ctx context.Context, c client.Client, ns *corev1.Namespace)
 	if err != nil && !apierrors.IsNotFound(err) {
 		return err
 	}
-	if err := c.DeleteAllOf(ctx, &kueue.Queue{}, client.InNamespace(ns.Name)); err != nil && !apierrors.IsNotFound(err) {
+	if err := c.DeleteAllOf(ctx, &kueue.LocalQueue{}, client.InNamespace(ns.Name)); err != nil && !apierrors.IsNotFound(err) {
 		return err
 	}
 	if err := c.DeleteAllOf(ctx, &kueue.Workload{}, client.InNamespace(ns.Name)); err != nil && !apierrors.IsNotFound(err) {
diff --git a/test/integration/scheduler/scheduler_test.go b/test/integration/scheduler/scheduler_test.go
index aebfb4adf3..d291679fb4 100644
--- a/test/integration/scheduler/scheduler_test.go
+++ b/test/integration/scheduler/scheduler_test.go
@@ -77,8 +77,8 @@ var _ = ginkgo.Describe("Scheduler", func() {
 		var (
 			prodClusterQ *kueue.ClusterQueue
 			devClusterQ  *kueue.ClusterQueue
-			prodQueue    *kueue.Queue
-			devQueue     *kueue.Queue
+			prodQueue    *kueue.LocalQueue
+			devQueue     *kueue.LocalQueue
 		)
 		ginkgo.BeforeEach(func() {
@@ -103,10 +103,10 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, devClusterQ)).Should(gomega.Succeed())
-			prodQueue = testing.MakeQueue("prod-queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
+			prodQueue = testing.MakeLocalQueue("prod-queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, prodQueue)).Should(gomega.Succeed())
-			devQueue = testing.MakeQueue("dev-queue", ns.Name).ClusterQueue(devClusterQ.Name).Obj()
+			devQueue = testing.MakeLocalQueue("dev-queue", ns.Name).ClusterQueue(devClusterQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, devQueue)).Should(gomega.Succeed())
 		})
@@ -153,7 +153,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 		})
 		ginkgo.It("Should admit workloads according to their priorities", func() {
-			queue := testing.MakeQueue("queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
+			queue := testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj()
 			lowPriorityVal, highPriorityVal := int32(10), int32(100)
@@ -182,7 +182,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 	ginkgo.When("Handling workloads events", func() {
 		var (
 			cq    *kueue.ClusterQueue
-			queue *kueue.Queue
+			queue *kueue.LocalQueue
 		)
 		ginkgo.BeforeEach(func() {
@@ -197,7 +197,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 					Obj()).
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, cq)).Should(gomega.Succeed())
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
+			queue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())
 		})
@@ -254,7 +254,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				gomega.Expect(framework.DeleteClusterQueue(ctx, k8sClient, fooCQ)).Should(gomega.Succeed())
 			}()
-			fooQ := testing.MakeQueue("foo-queue", ns.Name).ClusterQueue(fooCQ.Name).Obj()
+			fooQ := testing.MakeLocalQueue("foo-queue", ns.Name).ClusterQueue(fooCQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, fooQ)).Should(gomega.Succeed())
 			ginkgo.By("First big workload starts")
@@ -296,7 +296,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 	ginkgo.When("Handling clusterQueue events", func() {
 		var (
 			cq    *kueue.ClusterQueue
-			queue *kueue.Queue
+			queue *kueue.LocalQueue
 		)
 		ginkgo.BeforeEach(func() {
@@ -308,7 +308,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 					Obj()).
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, cq)).Should(gomega.Succeed())
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
+			queue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())
 		})
@@ -344,9 +344,9 @@ var _ = ginkgo.Describe("Scheduler", func() {
 	ginkgo.When("Using clusterQueue NamespaceSelector", func() {
 		var (
 			cq       *kueue.ClusterQueue
-			queue    *kueue.Queue
+			queue    *kueue.LocalQueue
 			nsFoo    *corev1.Namespace
-			queueFoo *kueue.Queue
+			queueFoo *kueue.LocalQueue
 		)
 		ginkgo.BeforeEach(func() {
@@ -366,7 +366,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, cq)).Should(gomega.Succeed())
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
+			queue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())
 			nsFoo = &corev1.Namespace{
@@ -375,7 +375,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				},
 			}
 			gomega.Expect(k8sClient.Create(ctx, nsFoo)).To(gomega.Succeed())
-			queueFoo = testing.MakeQueue("foo", nsFoo.Name).ClusterQueue(cq.Name).Obj()
+			queueFoo = testing.MakeLocalQueue("foo", nsFoo.Name).ClusterQueue(cq.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, queueFoo)).Should(gomega.Succeed())
 		})
@@ -409,7 +409,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 	ginkgo.When("Referencing resourceFlavors in clusterQueue", func() {
 		var (
 			fooCQ *kueue.ClusterQueue
-			fooQ  *kueue.Queue
+			fooQ  *kueue.LocalQueue
 		)
 		ginkgo.BeforeEach(func() {
@@ -420,7 +420,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 					Obj()).
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, fooCQ)).Should(gomega.Succeed())
-			fooQ = testing.MakeQueue("foo-queue", ns.Name).ClusterQueue(fooCQ.Name).Obj()
+			fooQ = testing.MakeLocalQueue("foo-queue", ns.Name).ClusterQueue(fooCQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, fooQ)).Should(gomega.Succeed())
 		})
@@ -454,7 +454,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 	ginkgo.When("Using taints in resourceFlavors", func() {
 		var (
 			cq    *kueue.ClusterQueue
-			queue *kueue.Queue
+			queue *kueue.LocalQueue
 		)
 		ginkgo.BeforeEach(func() {
@@ -470,7 +470,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, cq)).Should(gomega.Succeed())
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
+			queue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())
 		})
@@ -515,7 +515,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 	ginkgo.When("Using affinity in resourceFlavors", func() {
 		var (
 			cq    *kueue.ClusterQueue
-			queue *kueue.Queue
+			queue *kueue.LocalQueue
 		)
 		ginkgo.BeforeEach(func() {
@@ -530,7 +530,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, cq)).Should(gomega.Succeed())
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
+			queue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())
 		})
@@ -594,7 +594,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, prodBEClusterQ)).Should(gomega.Succeed())
-			queue := testing.MakeQueue("queue", ns.Name).ClusterQueue(prodBEClusterQ.Name).Obj()
+			queue := testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(prodBEClusterQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())
 			ginkgo.By("checking a no-fit workload does not get admitted")
@@ -643,10 +643,10 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, devBEClusterQ)).Should(gomega.Succeed())
-			prodBEQueue := testing.MakeQueue("prod-be-queue", ns.Name).ClusterQueue(prodBEClusterQ.Name).Obj()
+			prodBEQueue := testing.MakeLocalQueue("prod-be-queue", ns.Name).ClusterQueue(prodBEClusterQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, prodBEQueue)).Should(gomega.Succeed())
-			devBEQueue := testing.MakeQueue("dev-be-queue", ns.Name).ClusterQueue(devBEClusterQ.Name).Obj()
+			devBEQueue := testing.MakeLocalQueue("dev-be-queue", ns.Name).ClusterQueue(devBEClusterQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, devBEQueue)).Should(gomega.Succeed())
 			wl1 := testing.MakeWorkload("wl-1", ns.Name).Queue(prodBEQueue.Name).Request(corev1.ResourceCPU, "11").Obj()
 			wl2 := testing.MakeWorkload("wl-2", ns.Name).Queue(devBEQueue.Name).Request(corev1.ResourceCPU, "11").Obj()
@@ -690,7 +690,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 	ginkgo.When("Scheduling with multi workloads", func() {
 		var (
 			cq    *kueue.ClusterQueue
-			queue *kueue.Queue
+			queue *kueue.LocalQueue
 		)
 		ginkgo.BeforeEach(func() {
 			gomega.Expect(k8sClient.Create(ctx, onDemandFlavor)).Should(gomega.Succeed())
@@ -702,7 +702,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 				Obj()
 			gomega.Expect(k8sClient.Create(ctx, cq)).Should(gomega.Succeed())
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
+			queue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())
 		})
@@ -783,7 +783,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 		})
 		ginkgo.It("Should schedule workloads by their priority strictly in StrictFIFO", func() {
-			strictFIFOQueue := testing.MakeQueue("strict-fifo-q", matchingNS.Name).ClusterQueue(strictFIFOClusterQ.Name).Obj()
+			strictFIFOQueue := testing.MakeLocalQueue("strict-fifo-q", matchingNS.Name).ClusterQueue(strictFIFOClusterQ.Name).Obj()
 			ginkgo.By("Creating workloads")
 			wl1 := testing.MakeWorkload("wl1", matchingNS.Name).Queue(strictFIFOQueue.
@@ -813,10 +813,10 @@ var _ = ginkgo.Describe("Scheduler", func() {
 		})
 		ginkgo.It("Workloads not matching namespaceSelector should not block others", func() {
-			notMatchingQueue := testing.MakeQueue("not-matching-queue", ns.Name).ClusterQueue(strictFIFOClusterQ.Name).Obj()
+			notMatchingQueue := testing.MakeLocalQueue("not-matching-queue", ns.Name).ClusterQueue(strictFIFOClusterQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, notMatchingQueue)).Should(gomega.Succeed())
-			matchingQueue := testing.MakeQueue("matching-queue", matchingNS.Name).ClusterQueue(strictFIFOClusterQ.Name).Obj()
+			matchingQueue := testing.MakeLocalQueue("matching-queue", matchingNS.Name).ClusterQueue(strictFIFOClusterQ.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, matchingQueue)).Should(gomega.Succeed())
 			ginkgo.By("Creating workloads")
@@ -840,7 +840,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 	ginkgo.When("Deleting clusterQueues", func() {
 		var (
 			cq    *kueue.ClusterQueue
-			queue *kueue.Queue
+			queue *kueue.LocalQueue
 		)
 		ginkgo.AfterEach(func() {
@@ -852,7 +852,7 @@ var _ = ginkgo.Describe("Scheduler", func() {
 			ginkgo.By("Create clusterQueue")
 			cq = testing.MakeClusterQueue("cluster-queue").Obj()
 			gomega.Expect(k8sClient.Create(ctx, cq)).Should(gomega.Succeed())
-			queue = testing.MakeQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
+			queue = testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(cq.Name).Obj()
 			gomega.Expect(k8sClient.Create(ctx, queue)).Should(gomega.Succeed())
 			ginkgo.By("New created workloads should be admitted")
diff --git a/test/integration/webhook/v1alpha1/queue_test.go b/test/integration/webhook/v1alpha1/localqueue_test.go
similarity index 88%
rename from test/integration/webhook/v1alpha1/queue_test.go
rename to test/integration/webhook/v1alpha1/localqueue_test.go
index 3128b1f625..a801bec14e 100644
--- a/test/integration/webhook/v1alpha1/queue_test.go
+++ b/test/integration/webhook/v1alpha1/localqueue_test.go
@@ -19,7 +19,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	"sigs.k8s.io/kueue/apis/kueue/v1alpha1"
+	kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha1"
 	"sigs.k8s.io/kueue/pkg/util/testing"
 )
@@ -29,20 +29,20 @@ var _ = ginkgo.Describe("Queue validating webhook", func() {
 	ginkgo.When("Updating a Queue", func() {
 		ginkgo.It("Should allow the change of status", func() {
 			ginkgo.By("Creating a new Queue")
-			obj := testing.MakeQueue(queueName, ns.Name).ClusterQueue("foo").Obj()
+			obj := testing.MakeLocalQueue(queueName, ns.Name).ClusterQueue("foo").Obj()
 			gomega.Expect(k8sClient.Create(ctx, obj)).Should(gomega.Succeed())
 			ginkgo.By("Updating the Queue status")
 			obj.Status.PendingWorkloads = 3
 			gomega.Expect(k8sClient.Status().Update(ctx, obj)).Should(gomega.Succeed())
-			var after v1alpha1.Queue
+			var after kueue.LocalQueue
 			gomega.Expect(k8sClient.Get(ctx, client.ObjectKey{Name: obj.Name, Namespace: obj.Namespace}, &after)).Should(gomega.Succeed())
 			gomega.Expect(after.Status.PendingWorkloads).Should(gomega.Equal(int32(3)))
 		})
 		ginkgo.It("Should reject the change of spec.clusterQueue", func() {
 			ginkgo.By("Creating a new Queue")
-			obj := testing.MakeQueue(queueName, ns.Name).ClusterQueue("foo").Obj()
+			obj := testing.MakeLocalQueue(queueName, ns.Name).ClusterQueue("foo").Obj()
 			gomega.Expect(k8sClient.Create(ctx, obj)).Should(gomega.Succeed())
 			ginkgo.By("Updating the Queue")