Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use future idle resources when checking if task can fit node #585

Merged
merged 1 commit into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {

predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
// Check for Resource Predicate
// TODO: We could not allocate resource to task from both node.Idle and node.Releasing now,
// after it is done, we could change the following compare to:
// clonedNode := node.Idle.Clone()
// if !task.InitResreq.LessEqual(clonedNode.Add(node.Releasing)) {
// ...
// }
if !task.InitResreq.LessEqual(node.Idle) && !task.InitResreq.LessEqual(node.Releasing) {
if !task.InitResreq.LessEqual(node.FutureIdle()) {
return api.NewFitError(task, node, api.NodeResourceFitFailed)
}

Expand Down Expand Up @@ -219,7 +213,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
task.Namespace, task.Name, node.Name)

// Allocate releasing resource to the task if any.
if task.InitResreq.LessEqual(node.Releasing) {
if task.InitResreq.LessEqual(node.FutureIdle()) {
klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
if err := stmt.Pipeline(task, node.Name); err != nil {
Expand Down
67 changes: 33 additions & 34 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
queues[queue.UID] = queue
}

if len(job.TaskStatusIndex[api.Pending]) != 0 {
if len(job.TaskStatusIndex[api.Pending]) != 0 && !ssn.JobPipelined(job) {
if _, found := preemptorsMap[job.Queue]; !found {
preemptorsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
Expand Down Expand Up @@ -98,6 +98,11 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
stmt := ssn.Statement()
assigned := false
for {
// If job is pipelined, then stop preempting.
if ssn.JobPipelined(preemptorJob) {
break
}

// If not preemptor tasks, next job.
if preemptorTasks[preemptorJob.UID].Empty() {
klog.V(3).Infof("No preemptor task in job <%s/%s>.",
Expand All @@ -107,7 +112,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {

preemptor := preemptorTasks[preemptorJob.UID].Pop().(*api.TaskInfo)

if preempted, _ := preempt(ssn, stmt, preemptor, ssn.Nodes, func(task *api.TaskInfo) bool {
if preempted, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
if task.Status != api.Running {
return false
Expand All @@ -122,16 +127,12 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
}); preempted {
assigned = true
}

// If job is not pipelined, keep preempting
if ssn.JobPipelined(preemptorJob) {
stmt.Commit()
break
}
}

// If job is not pipelined after try all tasks, next job.
if !ssn.JobPipelined(preemptorJob) {
// Commit changes only if job is pipelined, otherwise try next job.
if ssn.JobPipelined(preemptorJob) {
stmt.Commit()
} else {
stmt.Discard()
continue
}
Expand All @@ -155,7 +156,7 @@ func (alloc *preemptAction) Execute(ssn *framework.Session) {
preemptor := preemptorTasks[job.UID].Pop().(*api.TaskInfo)

stmt := ssn.Statement()
assigned, _ := preempt(ssn, stmt, preemptor, ssn.Nodes, func(task *api.TaskInfo) bool {
assigned, _ := preempt(ssn, stmt, preemptor, func(task *api.TaskInfo) bool {
// Ignore non running task.
if task.Status != api.Running {
return false
Expand All @@ -181,12 +182,11 @@ func preempt(
ssn *framework.Session,
stmt *framework.Statement,
preemptor *api.TaskInfo,
nodes map[string]*api.NodeInfo,
filter func(*api.TaskInfo) bool,
) (bool, error) {
assigned := false

allNodes := util.GetNodeList(nodes)
allNodes := util.GetNodeList(ssn.Nodes)

predicateNodes, _ := util.PredicateNodes(preemptor, allNodes, ssn.PredicateFn)

Expand All @@ -198,9 +198,6 @@ func preempt(
preemptor.Namespace, preemptor.Name, node.Name)

var preemptees []*api.TaskInfo
preempted := api.EmptyResource()
resreq := preemptor.InitResreq.Clone()

for _, task := range node.Tasks {
if filter == nil {
preemptees = append(preemptees, task.Clone())
Expand All @@ -211,7 +208,7 @@ func preempt(
victims := ssn.Preemptable(preemptor, preemptees)
metrics.UpdatePreemptionVictimsCount(len(victims))

if err := validateVictims(victims, resreq); err != nil {
if err := validateVictims(preemptor, node, victims); err != nil {
klog.V(3).Infof("No validated victims on Node <%s>: %v", node.Name, err)
continue
}
Expand All @@ -223,29 +220,31 @@ func preempt(
victimsQueue.Push(victim)
}
// Preempt victims for tasks, pick lowest priority task first.
preempted := api.EmptyResource()

for !victimsQueue.Empty() {
// If reclaimed enough resources, break loop to avoid Sub panic.
if preemptor.InitResreq.LessEqual(node.FutureIdle()) {
break
}
preemptee := victimsQueue.Pop().(*api.TaskInfo)
klog.Errorf("Try to preempt Task <%s/%s> for Tasks <%s/%s>",
klog.V(3).Infof("Try to preempt Task <%s/%s> for Tasks <%s/%s>",
preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name)
if err := stmt.Evict(preemptee, "preempt"); err != nil {
klog.Errorf("Failed to preempt Task <%s/%s> for Tasks <%s/%s>: %v",
preemptee.Namespace, preemptee.Name, preemptor.Namespace, preemptor.Name, err)
continue
}
preempted.Add(preemptee.Resreq)
// If reclaimed enough resources, break loop to avoid Sub panic.
if resreq.LessEqual(preempted) {
break
}
}

metrics.RegisterPreemptionAttempts()
klog.V(3).Infof("Preempted <%v> for task <%s/%s> requested <%v>.",
klog.V(3).Infof("Preempted <%v> for Task <%s/%s> requested <%v>.",
preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq)

if preemptor.InitResreq.LessEqual(preempted) {
if preemptor.InitResreq.LessEqual(node.FutureIdle()) {
if err := stmt.Pipeline(preemptor, node.Name); err != nil {
klog.Errorf("Failed to pipline Task <%s/%s> on Node <%s>",
klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
preemptor.Namespace, preemptor.Name, node.Name)
}

Expand All @@ -259,19 +258,19 @@ func preempt(
return assigned, nil
}

func validateVictims(victims []*api.TaskInfo, resreq *api.Resource) error {
func validateVictims(preemptor *api.TaskInfo, node *api.NodeInfo, victims []*api.TaskInfo) error {
if len(victims) == 0 {
return fmt.Errorf("no victims")
}

// If not enough resource, continue
allRes := api.EmptyResource()
for _, v := range victims {
allRes.Add(v.Resreq)
futureIdle := node.FutureIdle()
for _, victim := range victims {
futureIdle.Add(victim.Resreq)
}
if !resreq.LessEqual(allRes) {
return fmt.Errorf("not enough resources")
// Every resource of the preemptor needs to be less or equal than corresponding
// idle resource after preemption.
if !preemptor.InitResreq.LessEqual(futureIdle) {
return fmt.Errorf("not enough resources: requested <%v>, but future idle <%v>",
preemptor.InitResreq, futureIdle)
}

return nil
}
Loading