fix bug about status absence when worker pod spec is invalid #606

Open · wants to merge 1 commit into base: master
20 changes: 18 additions & 2 deletions pkg/controller/mpi_job_controller.go
@@ -961,8 +961,13 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
// If an error occurs during Get/Create, we'll requeue the item so we
// can attempt processing again later. This could have been caused by a
// temporary network failure, or any other transient reason.
// However, if the error is caused by an invalid pod spec, retrying is
// futile and the job status should be set to failed.
if err != nil {
c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "worker pod creation failed: %v", err)
Collaborator:
This is only one of the cases where there could be an invalid Pod template.

It might be better to return this error and handle it more generically in syncHandler, so that we can cover the launcher pod, the worker pods, and any other validation errors:

if errs := validation.ValidateMPIJob(mpiJob); len(errs) != 0 {
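For illustration only (not part of this PR), a minimal sketch of the more generic handling being suggested: validate the whole MPIJob near the top of syncHandler and mark it failed instead of requeueing. The wiring of the recorder, updateMPIJobConditions, and the return value is an assumption, though the names mirror ones already used in this file.

if errs := validation.ValidateMPIJob(mpiJob); len(errs) != 0 {
	// The spec can never become valid on its own, so retrying is pointless.
	msg := fmt.Sprintf("MPIJob %s/%s has an invalid spec: %v", mpiJob.Namespace, mpiJob.Name, errs)
	c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, msg)
	updateMPIJobConditions(mpiJob, kubeflow.JobFailed, corev1.ConditionTrue, mpiJobFailedReason, msg)
	// Returning nil (rather than an error) keeps the workqueue from retrying;
	// the real handler would also need to persist the updated status.
	return nil
}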

Member:
Agree.

Author:
I have examined how Pod Spec validation is performed in the Kubernetes project. The relevant code can be found in the "k8s.io/kubernetes/pkg/apis/core/validation" package.
However, it seems that this package is not usable outside of the Kubernetes project.

Collaborator:
I didn't mean that you should use the validation code from Kubernetes.

I just meant that there are multiple cases in which we can't retry, and this PR only covers one of them.

if errors.IsInvalid(err) {
return workerPods, nil
}
return nil, err
}
// If the worker is not controlled by this MPIJob resource, we should log
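The new branch above stops retries with errors.IsInvalid. As a standalone reference (not code from this repository; the helper name is illustrative), a minimal sketch of that classification using only k8s.io/apimachinery:

package main

import (
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/validation/field"
)

// isPermanentCreateError reports whether a create error can never succeed on
// retry. The API server rejects malformed specs with StatusReasonInvalid,
// which is exactly what apierrors.IsInvalid checks for.
func isPermanentCreateError(err error) bool {
	return apierrors.IsInvalid(err)
}

func main() {
	// Build the kind of error the API server returns for a bad pod spec.
	fieldErrs := field.ErrorList{
		field.Invalid(field.NewPath("spec", "containers").Index(0).Child("image"), "", "image may not be empty"),
	}
	invalidErr := apierrors.NewInvalid(schema.GroupKind{Kind: "Pod"}, "worker-0", fieldErrs)

	fmt.Println(isPermanentCreateError(invalidErr))                       // true: do not requeue
	fmt.Println(isPermanentCreateError(fmt.Errorf("connection refused"))) // false: transient, requeue
}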
@@ -1076,7 +1081,6 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
running = 0
evict = 0
)

initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
//spec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
for i := 0; i < len(worker); i++ {
@@ -1100,7 +1104,19 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg)
}

if isMPIJobSuspended(mpiJob) {
// If workerSpec != nil, *workerSpec.Replicas != 0, and len(worker) == 0,
// the pod spec must be invalid, so mark the job as failed.
workerSpec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
if workerSpec != nil && len(worker) == 0 && *workerSpec.Replicas != 0 {
msg := "invalid pod spec"
c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, msg)
if mpiJob.Status.CompletionTime == nil {
now := metav1.Now()
mpiJob.Status.CompletionTime = &now
}
updateMPIJobConditions(mpiJob, kubeflow.JobFailed, corev1.ConditionTrue, mpiJobFailedReason, msg)
mpiJobsFailureCount.Inc()
} else if isMPIJobSuspended(mpiJob) {
msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobRunning, corev1.ConditionFalse, mpiJobSuspendedReason, msg)
} else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
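Downstream, the failure recorded by this change shows up on the job's conditions. A minimal sketch of how a caller could detect it, assuming the MPIJob status exposes the usual kubeflow Conditions slice with Type and Status fields (as the helpers above imply); this is not part of the PR:

// hasFailedCondition scans the conditions that updateMPIJobConditions appends.
func hasFailedCondition(mpiJob *kubeflow.MPIJob) bool {
	for _, cond := range mpiJob.Status.Conditions {
		if cond.Type == kubeflow.JobFailed && cond.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}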