diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go
index 5277f67b62..3e2fe6a4ec 100644
--- a/pkg/scheduler/actions/allocate/allocate.go
+++ b/pkg/scheduler/actions/allocate/allocate.go
@@ -99,13 +99,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
     predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
         // Check for Resource Predicate
-        // TODO: We could not allocate resource to task from both node.Idle and node.Releasing now,
-        // after it is done, we could change the following compare to:
-        //    clonedNode := node.Idle.Clone()
-        //    if !task.InitResreq.LessEqual(clonedNode.Add(node.Releasing)) {
-        //    ...
-        //    }
-        if !task.InitResreq.LessEqual(node.Idle) && !task.InitResreq.LessEqual(node.Releasing) {
+        if !task.InitResreq.LessEqual(node.FutureIdle()) {
             return api.NewFitError(task, node, api.NodeResourceFitFailed)
         }
@@ -219,7 +213,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
             task.Namespace, task.Name, node.Name)
         // Allocate releasing resource to the task if any.
-        if task.InitResreq.LessEqual(node.Releasing) {
+        if task.InitResreq.LessEqual(node.FutureIdle()) {
             klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
                 task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
             if err := stmt.Pipeline(task, node.Name); err != nil {
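Both hunks above now key off node.FutureIdle(): the predicate admits a task that fits the node's eventual capacity, and the second hunk pipelines it when it only fits once releasing resources are freed. A minimal standalone sketch of that decision, using a simplified resource type rather than the real api.Resource (field names and values here are illustrative, not from the patch):

package main

import "fmt"

// res is a simplified stand-in for api.Resource (CPU in millicores, memory in bytes).
type res struct{ cpu, mem int64 }

func (r res) lessEqual(o res) bool { return r.cpu <= o.cpu && r.mem <= o.mem }
func (r res) add(o res) res        { return res{r.cpu + o.cpu, r.mem + o.mem} }
func (r res) sub(o res) res        { return res{r.cpu - o.cpu, r.mem - o.mem} }

func main() {
	req := res{cpu: 4000, mem: 4 << 30}       // task request
	idle := res{cpu: 1000, mem: 1 << 30}      // currently free on the node
	releasing := res{cpu: 4000, mem: 4 << 30} // held by terminating pods
	pipelined := res{cpu: 0, mem: 0}          // already promised to other pipelined tasks

	futureIdle := idle.add(releasing).sub(pipelined)

	switch {
	case req.lessEqual(idle):
		fmt.Println("allocate now")
	case req.lessEqual(futureIdle):
		fmt.Println("pipeline: wait for releasing resources") // the stmt.Pipeline branch above
	default:
		fmt.Println("does not fit on this node")
	}
}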
diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go
index c0aaadf222..e61ee6b48f 100644
--- a/pkg/scheduler/actions/preempt/preempt.go
+++ b/pkg/scheduler/actions/preempt/preempt.go
@@ -69,7 +69,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
             queues[queue.UID] = queue
         }
-        if len(job.TaskStatusIndex[api.Pending]) != 0 {
+        if len(job.TaskStatusIndex[api.Pending]) != 0 && !ssn.JobPipelined(job) {
             if _, found := preemptorsMap[job.Queue]; !found {
                 preemptorsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
             }
@@ -98,6 +98,11 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
             stmt := ssn.Statement()
             assigned := false
             for {
+                // If job is pipelined, then stop preempting.
+                if ssn.JobPipelined(preemptorJob) {
+                    break
+                }
+
                 // If not preemptor tasks, next job.
                 if preemptorTasks[preemptorJob.UID].Empty() {
                     klog.V(3).Infof("No preemptor task in job <%s/%s>.",
@@ -107,7 +112,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
                 preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)
-                if preempted, _ := preempt(ssn, stmt, preemptor, ssn.Nodes, func(task *api.TaskInfo) bool {
+                if preempted, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
                     // Ignore non running task.
                     if task.Status != api.Running {
                         return false
@@ -122,16 +127,12 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
                 }); preempted {
                     assigned = true
                 }
-
-                // If job is not pipelined, keep preempting
-                if ssn.JobPipelined(preemptorJob) {
-                    stmt.Commit()
-                    break
-                }
             }
-            // If job is not pipelined after try all tasks, next job.
-            if !ssn.JobPipelined(preemptorJob) {
+            // Commit changes only if job is pipelined, otherwise try next job.
+            if ssn.JobPipelined(preemptorJob) {
+                stmt.Commit()
+            } else {
                 stmt.Discard()
                 continue
             }
@@ -155,7 +156,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
             preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo)
             stmt := ssn.Statement()
-            assigned, _ := preempt(ssn, stmt, preemptor, ssn.Nodes, func(task *api.TaskInfo) bool {
+            assigned, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
                 // Ignore non running task.
                 if task.Status != api.Running {
                     return false
@@ -181,12 +182,11 @@ func preempt(
     ssn *framework.Session,
     stmt *framework.Statement,
     preemptor *api.TaskInfo,
-    nodes map[string]*api.NodeInfo,
     filter func(*api.TaskInfo) bool,
 ) (bool, error) {
     assigned := false
-    allNodes := util.GetNodeList(nodes)
+    allNodes := util.GetNodeList(ssn.Nodes)
     predicateNodes, _ := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)
@@ -198,9 +198,6 @@ func preempt(
             preemptor.Namespace, preemptor.Name, node.Name)
         var preemptees []*api.TaskInfo
-        preempted := api.EmptyResource()
-        resreq := preemptor.InitResreq.Clone()
-
         for _, task := range node.Tasks {
             if filter == nil {
                 preemptees = append(preemptees, task.Clone())
@@ -211,7 +208,7 @@ func preempt(
         victims := ssn.Preemptable(preemptor, preemptees)
         metrics.UpdatePreemptionVictimsCount(len(victims))
-        if err := validateVictims(victims, resreq); err != nil {
+        if err := validateVictims(preemptor, node, victims); err != nil {
             klog.V(3).Infof("No validated victims on Node <%s>: %v", node.Name, err)
             continue
         }
@@ -223,9 +220,15 @@ func preempt(
             victimsQueue.Push(victim)
         }
         // Preempt victims for tasks, pick lowest priority task first.
+        preempted := api.EmptyResource()
+
         for !victimsQueue.Empty() {
+            // If reclaimed enough resources, break loop to avoid Sub panic.
+            if preemptor.InitResreq.LessEqual(node.FutureIdle()) {
+                break
+            }
             preemptee := victimsQueue.Pop().(*api.TaskInfo)
-            klog.Errorf("Try to preempt Task <%s/%s> for Tasks <%s/%s>",
+            klog.V(3).Infof("Try to preempt Task <%s/%s> for Tasks <%s/%s>",
                 preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name)
             if err := stmt.Evict(preemptee, "preempt"); err != nil {
                 klog.Errorf("Failed to preempt Task <%s/%s> for Tasks <%s/%s>: %v",
@@ -233,19 +236,15 @@ func preempt(
                 continue
             }
             preempted.Add(preemptee.Resreq)
-            // If reclaimed enough resources, break loop to avoid Sub panic.
-            if resreq.LessEqual(preempted) {
-                break
-            }
         }
         metrics.RegisterPreemptionAttempts()
-        klog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
+        klog.V(3).Infof("Preempted <%v> for Task <%s/%s> requested <%v>.",
            preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq)
-        if preemptor.InitResreq.LessEqual(preempted) {
+        if preemptor.InitResreq.LessEqual(node.FutureIdle()) {
            if err := stmt.Pipeline(preemptor, node.Name); err != nil {
-                klog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
+                klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
                    preemptor.Namespace, preemptor.Name, node.Name)
            }
@@ -259,19 +258,19 @@ func preempt(
     return assigned, nil
 }

-func validateVictims(victims []*api.TaskInfo, resreq *api.Resource) error {
+func validateVictims(preemptor *api.TaskInfo, node *api.NodeInfo, victims []*api.TaskInfo) error {
     if len(victims) == 0 {
         return fmt.Errorf("no victims")
     }
-
-    // If not enough resource, continue
-    allRes := api.EmptyResource()
-    for _, v := range victims {
-        allRes.Add(v.Resreq)
+    futureIdle := node.FutureIdle()
+    for _, victim := range victims {
+        futureIdle.Add(victim.Resreq)
     }
-    if !resreq.LessEqual(allRes) {
-        return fmt.Errorf("not enough resources")
+    // Every resource of the preemptor needs to be less than or equal to the
+    // corresponding idle resource after preemption.
+    if !preemptor.InitResreq.LessEqual(futureIdle) {
+        return fmt.Errorf("not enough resources: requested <%v>, but future idle <%v>",
+            preemptor.InitResreq, futureIdle)
     }
-
     return nil
 }
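The reworked validateVictims asks a single question: if every proposed victim were evicted, would the preemptor fit within the node's future idle capacity? A self-contained sketch of the same check, again with a simplified resource type instead of api.Resource (names here are illustrative):

package main

import (
	"errors"
	"fmt"
)

type res struct{ cpu, mem int64 }

func (r res) lessEqual(o res) bool { return r.cpu <= o.cpu && r.mem <= o.mem }
func (r res) add(o res) res        { return res{r.cpu + o.cpu, r.mem + o.mem} }

// validateVictimsSketch mirrors the shape of the new validateVictims:
// start from the node's future idle resources and add back what the victims hold.
func validateVictimsSketch(request, futureIdle res, victims []res) error {
	if len(victims) == 0 {
		return errors.New("no victims")
	}
	for _, v := range victims {
		futureIdle = futureIdle.add(v)
	}
	if !request.lessEqual(futureIdle) {
		return fmt.Errorf("not enough resources: requested %+v, future idle %+v", request, futureIdle)
	}
	return nil
}

func main() {
	victims := []res{{1000, 1 << 30}, {1000, 1 << 30}}
	// 5 CPUs requested, 3 CPUs future idle plus 2 CPUs from victims -> fits.
	fmt.Println(validateVictimsSketch(res{5000, 5 << 30}, res{3000, 3 << 30}, victims))
	// 6 CPUs requested -> still short by 1 CPU even after evicting both victims.
	fmt.Println(validateVictimsSketch(res{6000, 6 << 30}, res{3000, 3 << 30}, victims))
}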
diff --git a/pkg/scheduler/actions/preempt/preempt_test.go b/pkg/scheduler/actions/preempt/preempt_test.go
index 9a46c675f8..99c02023e4 100644
--- a/pkg/scheduler/actions/preempt/preempt_test.go
+++ b/pkg/scheduler/actions/preempt/preempt_test.go
@@ -54,7 +54,7 @@ func TestPreempt(t *testing.T) {
         expected int
     }{
         {
-            name: "one Job with two Pods on one node",
+            name: "do not preempt if there are enough idle resources",
             podGroups: []*schedulingv2.PodGroup{
                 {
                     ObjectMeta: metav1.ObjectMeta{
@@ -62,7 +62,8 @@ func TestPreempt(t *testing.T) {
                         Namespace: "c1",
                     },
                     Spec: schedulingv2.PodGroupSpec{
-                        Queue: "q1",
+                        MinMember: 3,
+                        Queue:     "q1",
                     },
                 },
             },
@@ -70,10 +71,10 @@ func TestPreempt(t *testing.T) {
                 util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, util.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
                 util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, util.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
                 util.BuildPod("c1", "preemptor1", "", v1.PodPending, util.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
-                util.BuildPod("c1", "preemptor2", "", v1.PodPending, util.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
             },
+            // If there are enough idle resources on the node, then there is no need to preempt anything.
             nodes: []*v1.Node{
-                util.BuildNode("n1", util.BuildResourceList("3", "3Gi"), make(map[string]string)),
+                util.BuildNode("n1", util.BuildResourceList("10", "10G"), make(map[string]string)),
             },
             queues: []*schedulingv2.Queue{
                 {
@@ -85,10 +86,10 @@ func TestPreempt(t *testing.T) {
                     },
                 },
             },
-            expected: 1,
+            expected: 0,
         },
         {
-            name: "two Jobs on one node",
+            name: "do not preempt if job is pipelined",
             podGroups: []*schedulingv2.PodGroup{
                 {
                     ObjectMeta: metav1.ObjectMeta{
@@ -96,7 +97,8 @@ func TestPreempt(t *testing.T) {
                         Namespace: "c1",
                     },
                     Spec: schedulingv2.PodGroupSpec{
-                        Queue: "q1",
+                        MinMember: 1,
+                        Queue:     "q1",
                     },
                 },
                 {
@@ -105,19 +107,63 @@ func TestPreempt(t *testing.T) {
                         Namespace: "c1",
                     },
                     Spec: schedulingv2.PodGroupSpec{
-                        Queue: "q1",
+                        MinMember: 1,
+                        Queue:     "q1",
+                    },
+                },
+            },
+            // Both pg1 and pg2 jobs are pipelined, because enough pods are already running.
+            pods: []*v1.Pod{
+                util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, util.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
+                util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, util.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
+                util.BuildPod("c1", "preemptee3", "n1", v1.PodRunning, util.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
+                util.BuildPod("c1", "preemptor2", "", v1.PodPending, util.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
+            },
+            // All resources on the node will be in use.
+            nodes: []*v1.Node{
+                util.BuildNode("n1", util.BuildResourceList("3", "3G"), make(map[string]string)),
+            },
+            queues: []*schedulingv2.Queue{
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name: "q1",
+                    },
+                    Spec: schedulingv2.QueueSpec{
+                        Weight: 1,
+                    },
+                },
+            },
+            expected: 0,
+        },
+        {
+            name: "preempt one task of different job to fit both jobs on one node",
+            podGroups: []*schedulingv2.PodGroup{
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "pg1",
+                        Namespace: "c1",
+                    },
+                    Spec: schedulingv2.PodGroupSpec{
+                        MinMember: 1,
+                        Queue:     "q1",
+                    },
+                },
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "pg2",
+                        Namespace: "c1",
+                    },
+                    Spec: schedulingv2.PodGroupSpec{
+                        MinMember: 1,
+                        Queue:     "q1",
                     },
                 },
             },
             pods: []*v1.Pod{
-                // running pod with pg1, under c1
                 util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, util.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
-                // running pod with pg1, under c1
                 util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, util.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
-                // pending pod with pg2, under c1
                 util.BuildPod("c1", "preemptor1", "", v1.PodPending, util.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
-                // pending pod with pg2, under c1
                 util.BuildPod("c1", "preemptor2", "", v1.PodPending, util.BuildResourceList("1", "1G"), "pg2", make(map[string]string), make(map[string]string)),
             },
             nodes: []*v1.Node{
@@ -133,76 +179,130 @@ func TestPreempt(t *testing.T) {
                     },
                 },
             },
+            expected: 1,
+        },
+        {
+            name: "preempt enough tasks to fit large task of different job",
+            podGroups: []*schedulingv2.PodGroup{
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "pg1",
+                        Namespace: "c1",
+                    },
+                    Spec: schedulingv2.PodGroupSpec{
+                        MinMember: 1,
+                        Queue:     "q1",
+                    },
+                },
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:      "pg2",
+                        Namespace: "c1",
+                    },
+                    Spec: schedulingv2.PodGroupSpec{
+                        MinMember: 1,
+                        Queue:     "q1",
+                    },
+                },
+            },
+            // There are 3 CPUs and 3G of memory idle, and 3 running tasks each consuming 1 CPU and 1G of memory.
+            // A big task requiring 5 CPUs and 5G of memory should preempt 2 of the 3 running tasks to fit onto the node.
+            pods: []*v1.Pod{
+                util.BuildPod("c1", "preemptee1", "n1", v1.PodRunning, util.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
+                util.BuildPod("c1", "preemptee2", "n1", v1.PodRunning, util.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
+                util.BuildPod("c1", "preemptee3", "n1", v1.PodRunning, util.BuildResourceList("1", "1G"), "pg1", make(map[string]string), make(map[string]string)),
+                util.BuildPod("c1", "preemptor1", "", v1.PodPending, util.BuildResourceList("5", "5G"), "pg2", make(map[string]string), make(map[string]string)),
+            },
+            nodes: []*v1.Node{
+                util.BuildNode("n1", util.BuildResourceList("6", "6G"), make(map[string]string)),
+            },
+            queues: []*schedulingv2.Queue{
+                {
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name: "q1",
+                    },
+                    Spec: schedulingv2.QueueSpec{
+                        Weight: 1,
+                    },
+                },
+            },
             expected: 2,
         },
     }
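The expected eviction counts follow directly from the new stopping rule in preempt.go (preemptor.InitResreq.LessEqual(node.FutureIdle())). A tiny sketch of the arithmetic behind the last case, reduced to CPU integers; this is not the scheduler's actual loop, just its counting:

package main

import "fmt"

func main() {
	// Node allocatable 6 CPUs, three running 1-CPU tasks -> future idle starts at 3.
	request := 5
	futureIdle := 3
	victims := []int{1, 1, 1} // each eviction returns 1 CPU

	evictions := 0
	for _, v := range victims {
		if request <= futureIdle { // same stopping rule as the new victim loop
			break
		}
		futureIdle += v
		evictions++
	}
	fmt.Println(evictions) // 2 -> matches expected: 2
}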
-    allocate := New()
-
-    for i, test := range tests {
-        binder := &util.FakeBinder{
-            Binds:   map[string]string{},
-            Channel: make(chan string),
-        }
-        evictor := &util.FakeEvictor{
-            Evicts:  make([]string, 0),
-            Channel: make(chan string),
-        }
-        schedulerCache := &cache.SchedulerCache{
-            Nodes:         make(map[string]*api.NodeInfo),
-            Jobs:          make(map[api.JobID]*api.JobInfo),
-            Queues:        make(map[api.QueueID]*api.QueueInfo),
-            Binder:        binder,
-            Evictor:       evictor,
-            StatusUpdater: &util.FakeStatusUpdater{},
-            VolumeBinder:  &util.FakeVolumeBinder{},
-
-            Recorder: record.NewFakeRecorder(100),
-        }
-        for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
-        }
-        for _, pod := range test.pods {
-            schedulerCache.AddPod(pod)
-        }
-
-        for _, ss := range test.podGroups {
-            schedulerCache.AddPodGroupV1alpha2(ss)
-        }
-
-        for _, q := range test.queues {
-            schedulerCache.AddQueueV1alpha2(q)
-        }
-
-        trueValue := true
-        ssn := framework.OpenSession(schedulerCache, []conf.Tier{
-            {
-                Plugins: []conf.PluginOption{
-                    {
-                        Name:               "conformance",
-                        EnabledPreemptable: &trueValue,
-                    },
-                    {
-                        Name:               "gang",
-                        EnabledPreemptable: &trueValue,
+    preempt := New()
+
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            binder := &util.FakeBinder{
+                Binds:   map[string]string{},
+                Channel: make(chan string),
+            }
+            evictor := &util.FakeEvictor{
+                Channel: make(chan string),
+            }
+            schedulerCache := &cache.SchedulerCache{
+                Nodes:         make(map[string]*api.NodeInfo),
+                Jobs:          make(map[api.JobID]*api.JobInfo),
+                Queues:        make(map[api.QueueID]*api.QueueInfo),
+                Binder:        binder,
+                Evictor:       evictor,
+                StatusUpdater: &util.FakeStatusUpdater{},
+                VolumeBinder:  &util.FakeVolumeBinder{},
+
+                Recorder: record.NewFakeRecorder(100),
+            }
+            for _, node := range test.nodes {
+                schedulerCache.AddNode(node)
+            }
+            for _, pod := range test.pods {
+                schedulerCache.AddPod(pod)
+            }
+
+            for _, ss := range test.podGroups {
+                schedulerCache.AddPodGroupV1alpha2(ss)
+            }
+
+            for _, q := range test.queues {
+                schedulerCache.AddQueueV1alpha2(q)
+            }
+
+            trueValue := true
+            ssn := framework.OpenSession(schedulerCache, []conf.Tier{
+                {
+                    Plugins: []conf.PluginOption{
+                        {
+                            Name:               "conformance",
+                            EnabledPreemptable: &trueValue,
+                        },
+                        {
+                            Name:                "gang",
+                            EnabledPreemptable:  &trueValue,
+                            EnabledJobPipelined: &trueValue,
+                        },
                     },
                 },
-            },
-        })
-        defer framework.CloseSession(ssn)
+            })
+            defer framework.CloseSession(ssn)
-        allocate.Execute(ssn)
+            preempt.Execute(ssn)
-        for i := 0; i < test.expected; i++ {
+            for i := 0; i < test.expected; i++ {
+                select {
+                case <-evictor.Channel:
+                case <-time.After(time.Second):
+                    t.Errorf("not enough evictions")
+                }
+            }
             select {
-            case <-evictor.Channel:
-            case <-time.After(3 * time.Second):
-                t.Errorf("Failed to get evicting request.")
+            case key, opened := <-evictor.Channel:
+                if opened {
+                    t.Errorf("unexpected eviction: %s", key)
+                }
+            case <-time.After(50 * time.Millisecond):
+                // TODO: Active waiting here is not optimal, but there is no better way currently.
+                // Ideally we would like to wait for evict and bind request goroutines to finish first.
             }
-        }
-
-        if test.expected != len(evictor.Evicts) {
-            t.Errorf("case %d (%s): expected: %v, got %v ", i, test.name, test.expected, len(evictor.Evicts))
-        }
+        })
     }
 }
diff --git a/pkg/scheduler/actions/reclaim/reclaim_test.go b/pkg/scheduler/actions/reclaim/reclaim_test.go
index 784526173a..ac63dda3fc 100644
--- a/pkg/scheduler/actions/reclaim/reclaim_test.go
+++ b/pkg/scheduler/actions/reclaim/reclaim_test.go
@@ -108,7 +108,6 @@ func TestReclaim(t *testing.T) {
             Channel: make(chan string),
         }
         evictor := &util.FakeEvictor{
-            Evicts:  make([]string, 0),
             Channel: make(chan string),
         }
         schedulerCache := &cache.SchedulerCache{
@@ -164,8 +163,8 @@ func TestReclaim(t *testing.T) {
             }
         }
-        if test.expected != len(evictor.Evicts) {
-            t.Errorf("case %d (%s): expected: %v, got %v ", i, test.name, test.expected, len(evictor.Evicts))
+        if test.expected != len(evictor.Evicts()) {
+            t.Errorf("case %d (%s): expected: %v, got %v ", i, test.name, test.expected, len(evictor.Evicts()))
         }
     }
 }
diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go
index a62ce98dbd..df97598154 100644
--- a/pkg/scheduler/api/node_info.go
+++ b/pkg/scheduler/api/node_info.go
@@ -33,6 +33,8 @@ type NodeInfo struct {
     // The releasing resource on that node
     Releasing *Resource
+    // The pipelined resource on that node
+    Pipelined *Resource
     // The idle resource on that node
     Idle *Resource
     // The used resource on that node, including running and terminating
@@ -48,6 +50,13 @@ type NodeInfo struct {
     Others map[string]interface{}
 }

+// FutureIdle returns resources that will be idle in the future:
+//
+// That is current idle resources plus released resources minus pipelined resources.
+func (ni *NodeInfo) FutureIdle() *Resource {
+    return ni.Idle.Clone().Add(ni.Releasing).Sub(ni.Pipelined)
+}
+
 // NodeState defines the current state of node.
 type NodeState struct {
     Phase NodePhase
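To make the new accounting concrete: FutureIdle is plain resource arithmetic over the three counters the patch maintains. A minimal sketch of the same calculation with a simplified resource type (the real implementation uses api.Resource and its Clone/Add/Sub methods shown above):

package main

import "fmt"

// nodeSketch carries only the three counters FutureIdle needs; the real NodeInfo has many more fields.
type nodeSketch struct {
	idle, releasing, pipelined int64 // CPU millicores; memory works the same way
}

// futureIdle mirrors NodeInfo.FutureIdle: idle + releasing - pipelined.
func (n nodeSketch) futureIdle() int64 {
	return n.idle + n.releasing - n.pipelined
}

func main() {
	n := nodeSketch{
		idle:      1000, // free right now
		releasing: 3000, // held by terminating pods, coming back soon
		pipelined: 2000, // already promised to tasks waiting on this node
	}
	fmt.Println(n.futureIdle()) // 2000: what a new task can still count on
}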
@@ -61,6 +70,7 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
     if node == nil {
         ni = &NodeInfo{
             Releasing: EmptyResource(),
+            Pipelined: EmptyResource(),
             Idle:      EmptyResource(),
             Used:      EmptyResource(),
@@ -75,6 +85,7 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {
             Node: node,
             Releasing: EmptyResource(),
+            Pipelined: EmptyResource(),
             Idle:      NewResource(node.Status.Allocatable),
             Used:      EmptyResource(),
@@ -158,16 +169,23 @@ func (ni *NodeInfo) SetNode(node *v1.Node) {
     ni.Allocatable = NewResource(node.Status.Allocatable)
     ni.Capability = NewResource(node.Status.Capacity)
+    ni.Releasing = EmptyResource()
+    ni.Pipelined = EmptyResource()
     ni.Idle = NewResource(node.Status.Allocatable)
     ni.Used = EmptyResource()

-    for _, task := range ni.Tasks {
-        if task.Status == Releasing {
-            ni.Releasing.Add(task.Resreq)
+    for _, ti := range ni.Tasks {
+        switch ti.Status {
+        case Releasing:
+            ni.Idle.Sub(ti.Resreq)
+            ni.Releasing.Add(ti.Resreq)
+            ni.Used.Add(ti.Resreq)
+        case Pipelined:
+            ni.Pipelined.Add(ti.Resreq)
+        default:
+            ni.Idle.Sub(ti.Resreq)
+            ni.Used.Add(ti.Resreq)
         }
-
-        ni.Idle.Sub(task.Resreq)
-        ni.Used.Add(task.Resreq)
     }
 }
@@ -202,15 +220,15 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error {
                 return err
             }
             ni.Releasing.Add(ti.Resreq)
+            ni.Used.Add(ti.Resreq)
         case Pipelined:
-            ni.Releasing.Sub(ti.Resreq)
+            ni.Pipelined.Add(ti.Resreq)
         default:
             if err := ni.allocateIdleResource(ti); err != nil {
                 return err
             }
+            ni.Used.Add(ti.Resreq)
         }
-
-        ni.Used.Add(ti.Resreq)
     }
     ni.Tasks[key] = ti
@@ -233,13 +251,13 @@ func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error {
         case Releasing:
             ni.Releasing.Sub(task.Resreq)
             ni.Idle.Add(task.Resreq)
+            ni.Used.Sub(task.Resreq)
         case Pipelined:
-            ni.Releasing.Add(task.Resreq)
+            ni.Pipelined.Sub(task.Resreq)
         default:
             ni.Idle.Add(task.Resreq)
+            ni.Used.Sub(task.Resreq)
         }
-
-        ni.Used.Sub(task.Resreq)
     }
     delete(ni.Tasks, key)
diff --git a/pkg/scheduler/api/node_info_test.go b/pkg/scheduler/api/node_info_test.go
index 4ccbfb9985..6de73c8b32 100644
--- a/pkg/scheduler/api/node_info_test.go
+++ b/pkg/scheduler/api/node_info_test.go
@@ -57,6 +57,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
                 Idle:        buildResource("5000m", "7G"),
                 Used:        buildResource("3000m", "3G"),
                 Releasing:   EmptyResource(),
+                Pipelined:   EmptyResource(),
                 Allocatable: buildResource("8000m", "10G"),
                 Capability:  buildResource("8000m", "10G"),
                 State:       NodeState{Phase: Ready},
@@ -76,6 +77,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
                 Idle:        buildResource("2000m", "1G"),
                 Used:        EmptyResource(),
                 Releasing:   EmptyResource(),
+                Pipelined:   EmptyResource(),
                 Allocatable: buildResource("2000m", "1G"),
                 Capability:  buildResource("2000m", "1G"),
                 State:       NodeState{Phase: NotReady, Reason: "OutOfSync"},
@@ -124,6 +126,7 @@ func TestNodeInfo_RemovePod(t *testing.T) {
                 Idle:        buildResource("4000m", "6G"),
                 Used:        buildResource("4000m", "4G"),
                 Releasing:   EmptyResource(),
+                Pipelined:   EmptyResource(),
                 Allocatable: buildResource("8000m", "10G"),
                 Capability:  buildResource("8000m", "10G"),
                 State:       NodeState{Phase: Ready},
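One way to read the SetNode/AddTask/RemoveTask changes above: running and releasing tasks are charged to Used (and subtracted from Idle), while pipelined tasks are only charged to Pipelined, so they reduce FutureIdle without touching Idle or Used. A rough, self-contained sketch of that bookkeeping (not the real NodeInfo; status names and fields are simplified):

package main

import "fmt"

type status int

const (
	running status = iota
	releasing
	pipelined
)

// node tracks the same counters the patch maintains, as plain CPU millicores.
type node struct {
	allocatable, idle, used, releasingRes, pipelinedRes int64
}

func (n *node) addTask(st status, req int64) {
	switch st {
	case releasing:
		n.idle -= req
		n.releasingRes += req
		n.used += req
	case pipelined:
		n.pipelinedRes += req // not charged to Idle or Used yet
	default: // running
		n.idle -= req
		n.used += req
	}
}

func (n *node) futureIdle() int64 { return n.idle + n.releasingRes - n.pipelinedRes }

func main() {
	n := &node{allocatable: 8000, idle: 8000}
	n.addTask(running, 3000)
	n.addTask(releasing, 2000)
	n.addTask(pipelined, 1000)

	// Idle + Used still adds up to Allocatable for tasks that physically occupy the node.
	fmt.Println(n.idle+n.used == n.allocatable) // true
	fmt.Println(n.futureIdle())                 // 3000 + 2000 - 1000 = 4000
}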
diff --git a/pkg/scheduler/util/test_utils.go b/pkg/scheduler/util/test_utils.go
index 9384c3f46a..6a14189d77 100644
--- a/pkg/scheduler/util/test_utils.go
+++ b/pkg/scheduler/util/test_utils.go
@@ -114,10 +114,17 @@ func (fb *FakeBinder) Bind(p *v1.Pod, hostname string) error {
 // FakeEvictor is used as fake evictor
 type FakeEvictor struct {
     sync.Mutex
-    Evicts  []string
+    evicts  []string
     Channel chan string
 }

+// Evicts returns a copy of the evicted pods.
+func (fe *FakeEvictor) Evicts() []string {
+    fe.Lock()
+    defer fe.Unlock()
+    return append([]string{}, fe.evicts...)
+}
+
 // Evict is used by fake evictor to evict pods
 func (fe *FakeEvictor) Evict(p *v1.Pod) error {
     fe.Lock()
@@ -125,7 +132,7 @@ func (fe *FakeEvictor) Evict(p *v1.Pod) error {
     fmt.Println("PodName: ", p.Name)
     key := fmt.Sprintf("%v/%v", p.Namespace, p.Name)
-    fe.Evicts = append(fe.Evicts, key)
+    fe.evicts = append(fe.evicts, key)
     fe.Channel <- key
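The new accessor exists so tests can read the eviction list without racing against the eviction goroutine: the slice is only touched under the mutex and callers receive a copy. A hedged sketch of the same pattern outside the scheduler codebase (names below are illustrative, not from the patch):

package main

import (
	"fmt"
	"sync"
)

// recorder follows the same pattern as FakeEvictor: an unexported slice guarded by a mutex,
// exposed only through a method that returns a copy.
type recorder struct {
	sync.Mutex
	events []string
}

func (r *recorder) record(e string) {
	r.Lock()
	defer r.Unlock()
	r.events = append(r.events, e)
}

func (r *recorder) Events() []string {
	r.Lock()
	defer r.Unlock()
	return append([]string{}, r.events...) // copy, so callers never alias the guarded slice
}

func main() {
	r := &recorder{}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.record(fmt.Sprintf("c1/preemptee%d", i)) // like the namespace/name keys recorded by Evict
		}(i)
	}
	wg.Wait()
	fmt.Println(len(r.Events())) // 3, read safely after the writers finish
}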