Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Add job suspend semantics #196

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/apis/common/v1/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,7 @@ type ControllerInterface interface {
// It will requeue the job in case of an error while creating/deleting services.
// Common implementation will be provided and User can still override this to implement their own reconcile logic
ReconcileServices(job metav1.Object, services []*v1.Service, rtype ReplicaType, spec *ReplicaSpec) error

// JobSuspended checks jobs whether suspended or not
JobSuspended(job interface{}) (*bool, error)
}
11 changes: 10 additions & 1 deletion pkg/controller.v1/common/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,11 @@ func (jc *JobController) ReconcileJobs(
}

oldStatus := jobStatus.DeepCopy()
if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) {
jobSuspended, err := jc.Controller.JobSuspended(job)
if err != nil {
return err
}
if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) || (jobSuspended != nil && *jobSuspended) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern here is Suspend is just a transition state, should we delete all the pods or just the active ones, leaving the completed pods(succeeded/failed) as they are.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If anything, it should have the same semantics as kubernetes Job, where we delete the running pods.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the current implementation is consistent with batch/job.

// If the Job is succeed or failed, delete all pods and services.
if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
return err
Expand Down Expand Up @@ -357,3 +361,8 @@ func (jc *JobController) CleanupJob(runPolicy *apiv1.RunPolicy, jobStatus apiv1.
func (jc *JobController) calcPGMinResources(minMember int32, replicas map[apiv1.ReplicaType]*apiv1.ReplicaSpec) *v1.ResourceList {
return CalcPGMinResources(minMember, replicas, jc.PriorityClassLister.Get)
}

func (jc *JobController) JobSuspended(job interface{}) (*bool, error) {
log.Infof("Not implemented.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am wondering if we should merge this since the feature is not completed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a base class default function in case Job subclasses(TFJob, MPIJob, etc.) do not implement this method.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it will be override in Job subclass which supports job suspend.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If DeletePodsAndServices has an implementation, I don't see why this function wouldn't have one.

return nil, nil
}