Review remarks and cleanup
mimowo committed Mar 8, 2023
1 parent 4fecb4e commit 1d4a4ce
Showing 5 changed files with 44 additions and 71 deletions.
60 changes: 27 additions & 33 deletions pkg/controller/workload/job/job_controller.go
@@ -44,7 +44,6 @@ import (
)

var (
ownerKey = ".metadata.controller"
parentWorkloadKey = ".metadata.parentWorkload"
)

@@ -151,8 +150,6 @@ func (h *parentWorkloadHandler) queueReconcileForChildJob(object client.Object,
}
}

//var _ GenericJob = &BatchJob{}

type BatchJob struct {
batchv1.Job
}
@@ -162,11 +159,11 @@ func (b *BatchJob) Object() client.Object {
}

func (b *BatchJob) ParentWorkloadName() string {
return parentWorkloadName(&b.Job)
return b.Job.Annotations[constants.ParentWorkloadAnnotation]
}

func (b *BatchJob) QueueName() string {
return queueName(&b.Job)
return b.Job.Annotations[constants.QueueAnnotation]
}

func (b *BatchJob) IsSuspend() bool {
@@ -187,6 +184,10 @@ func (b *BatchJob) UnSuspend() error {
return nil
}

func (b *BatchJob) GetWorkloadName() string {
return GetWorkloadNameForJob(b.Name)
}

func (b *BatchJob) ResetStatus() bool {
// Reset start time so we can update the scheduling directives later when unsuspending.
if b.Status.StartTime == nil {
@@ -317,14 +318,15 @@ func (r *JobReconciler) SetupWithManager(mgr ctrl.Manager) error {
func SetupIndexes(ctx context.Context, indexer client.FieldIndexer) error {
if err := indexer.IndexField(ctx, &batchv1.Job{}, parentWorkloadKey, func(o client.Object) []string {
job := o.(*batchv1.Job)
if pwName := parentWorkloadName(job); pwName != "" {
batchJob := BatchJob{*job}
if pwName := batchJob.ParentWorkloadName(); pwName != "" {
return []string{pwName}
}
return nil
}); err != nil {
return err
}
return indexer.IndexField(ctx, &kueue.Workload{}, ownerKey, func(o client.Object) []string {
return indexer.IndexField(ctx, &kueue.Workload{}, jobframework.OwnerKey, func(o client.Object) []string {
// grab the Workload object, extract the owner...
wl := o.(*kueue.Workload)
owner := metav1.GetControllerOf(wl)
@@ -353,16 +355,15 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
// we'll ignore not-found errors, since there is nothing to do.
return ctrl.Result{}, client.IgnoreNotFound(err)
}
var genericJob GenericJob = &batchJob

log := ctrl.LoggerFrom(ctx).WithValues("job", klog.KObj(&batchJob.Job))
ctx = ctrl.LoggerInto(ctx, log)

isStandaloneJob := genericJob.ParentWorkloadName() == ""
isStandaloneJob := batchJob.ParentWorkloadName() == ""

// when manageJobsWithoutQueueName is disabled we only reconcile jobs that have
// either the queue-name or the parent-workload annotation set.
if !r.manageJobsWithoutQueueName && genericJob.QueueName() == "" && isStandaloneJob {
if !r.manageJobsWithoutQueueName && batchJob.QueueName() == "" && isStandaloneJob {
log.V(3).Info(fmt.Sprintf("Neither %s, nor %s annotation is set, ignoring the job", constants.QueueAnnotation, constants.ParentWorkloadAnnotation))
return ctrl.Result{}, nil
}
@@ -371,18 +372,19 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R

// 1. make sure there is only a single existing instance of the workload.
// If no workload exists and the job is unsuspended, we'll stop it immediately.
wl, err := EnsureOneWorkload(ctx, r.client, req, r.record, genericJob)
wl, err := jobframework.EnsureOneWorkload(ctx, r.client, req, r.record, &batchJob)
if err != nil {
log.Error(err, "Getting existing workloads")
return ctrl.Result{}, err
}

// 2. handle job is finished.
if condition, finished := genericJob.Finished(); finished {
if condition, finished := batchJob.Finished(); finished {
if wl == nil || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
return ctrl.Result{}, nil
}
if err := SetWorkloadCondition(ctx, r.client, wl, condition); err != nil {
apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
if err := r.client.Status().Update(ctx, wl); err != nil {
log.Error(err, "Updating workload status")
}
return ctrl.Result{}, nil
@@ -393,7 +395,7 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
if !isStandaloneJob {
return ctrl.Result{}, nil
}
err := r.handleJobWithNoWorkload(ctx, genericJob)
err := r.handleJobWithNoWorkload(ctx, &batchJob)
if err != nil {
log.Error(err, "Handling job with no workload")
}
@@ -405,7 +407,7 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
// handle a job when waitForPodsReady is enabled, and it is the main job
if r.waitForPodsReady {
log.V(5).Info("Handling a job when waitForPodsReady is enabled")
condition := generatePodsReadyCondition(genericJob, wl)
condition := generatePodsReadyCondition(&batchJob, wl)
// optimization to avoid sending the update request if the status didn't change
if !apimeta.IsStatusConditionPresentAndEqual(wl.Status.Conditions, condition.Type, condition.Status) {
log.V(3).Info(fmt.Sprintf("Updating the PodsReady condition with status: %v", condition.Status))
@@ -418,19 +420,19 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
}

// 5. handle job is suspended.
if genericJob.IsSuspend() {
if batchJob.IsSuspend() {
// start the job if the workload has been admitted, and the job is still suspended
if wl.Spec.Admission != nil {
log.V(2).Info("Job admitted, unsuspending")
err := StartJob(ctx, r.client, r.record, genericJob, wl)
err := jobframework.StartJob(ctx, r.client, r.record, &batchJob, wl)
if err != nil {
log.Error(err, "Unsuspending job")
}
return ctrl.Result{}, err
}

// update queue name if changed.
q := genericJob.QueueName()
q := batchJob.QueueName()
if wl.Spec.QueueName != q {
log.V(2).Info("Job changed queues, updating workload")
wl.Spec.QueueName = q
@@ -448,7 +450,7 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
if wl.Spec.Admission == nil {
// the job must be suspended if the workload is not yet admitted.
log.V(2).Info("Running job is not admitted by a cluster queue, suspending")
err := StopJob(ctx, r.client, r.record, genericJob, wl, "Not admitted by cluster queue")
err := jobframework.StopJob(ctx, r.client, r.record, &batchJob, wl, "Not admitted by cluster queue")
if err != nil {
log.Error(err, "Suspending job with non admitted workload")
}
@@ -460,37 +462,37 @@ func (r *JobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, nil
}

func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, genericJob GenericJob) error {
func (r *JobReconciler) handleJobWithNoWorkload(ctx context.Context, batchJob *BatchJob) error {
log := ctrl.LoggerFrom(ctx)

// Wait until there are no active pods.
if genericJob.IsActive() {
if batchJob.IsActive() {
log.V(2).Info("Job is suspended but still has active pods, waiting")
return nil
}

// Create the corresponding workload.
wl, err := ConstructWorkload(ctx, r.client, r.scheme, genericJob)
wl, err := jobframework.ConstructWorkload(ctx, r.client, r.scheme, batchJob)
if err != nil {
return err
}
if err = r.client.Create(ctx, wl); err != nil {
return err
}
r.record.Eventf(genericJob.Object(), corev1.EventTypeNormal, "CreatedWorkload",
r.record.Eventf(batchJob.Object(), corev1.EventTypeNormal, "CreatedWorkload",
"Created Workload: %v", workload.Key(wl))
return nil
}

func generatePodsReadyCondition(genericJob GenericJob, wl *kueue.Workload) metav1.Condition {
func generatePodsReadyCondition(batchJob *BatchJob, wl *kueue.Workload) metav1.Condition {
conditionStatus := metav1.ConditionFalse
message := "Not all pods are ready or succeeded"
// Once PodsReady=True it stays as long as the workload remains admitted to
// avoid unnecessary flickering of the condition when the pods transition from
// Ready to Completed. As pods finish, they transition first into the
// uncountedTerminatedPods staging area, before passing to the
// succeeded/failed counters.
if wl.Spec.Admission != nil && (genericJob.PodsReady() || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadPodsReady)) {
if wl.Spec.Admission != nil && (batchJob.PodsReady() || apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadPodsReady)) {
conditionStatus = metav1.ConditionTrue
message = "All pods were ready or succeeded since the workload admission"
}
@@ -502,14 +504,6 @@ func generatePodsReadyCondition(genericJob GenericJob, wl *kueue.Workload) metav
}
}

func queueName(job *batchv1.Job) string {
return job.Annotations[constants.QueueAnnotation]
}

func parentWorkloadName(job *batchv1.Job) string {
return job.Annotations[constants.ParentWorkloadAnnotation]
}

func GetWorkloadNameForJob(jobName string) string {
gvk := metav1.GroupVersionKind{Group: batchv1.SchemeGroupVersion.Group, Version: batchv1.SchemeGroupVersion.Version, Kind: "Job"}
return jobframework.GetWorkloadNameForOwnerWithGVK(jobName, &gvk)
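
Note on the pattern above: BatchJob now answers the queue-name and parent-workload questions itself, reading the Job's annotations directly instead of going through the deleted package-level helpers. A minimal self-contained sketch of that wrapper follows; the annotation keys are assumptions standing in for constants.QueueAnnotation and constants.ParentWorkloadAnnotation, whose real values live in sigs.k8s.io/kueue/pkg/constants.

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Assumed annotation keys; the canonical values are QueueAnnotation and
// ParentWorkloadAnnotation in sigs.k8s.io/kueue/pkg/constants.
const (
	queueAnnotation          = "kueue.x-k8s.io/queue-name"
	parentWorkloadAnnotation = "kueue.x-k8s.io/parent-workload-name"
)

// BatchJob mirrors the wrapper in the diff: it embeds batchv1.Job and reads
// Kueue metadata straight from the job's annotations.
type BatchJob struct {
	batchv1.Job
}

func (b *BatchJob) QueueName() string {
	return b.Annotations[queueAnnotation]
}

func (b *BatchJob) ParentWorkloadName() string {
	return b.Annotations[parentWorkloadAnnotation]
}

func main() {
	job := BatchJob{Job: batchv1.Job{ObjectMeta: metav1.ObjectMeta{
		Name:        "sample",
		Annotations: map[string]string{queueAnnotation: "main"},
	}}}
	// A job without the parent-workload annotation is standalone, which is
	// the condition the reconciler checks before creating a Workload for it.
	fmt.Println(job.QueueName(), job.ParentWorkloadName() == "")
}
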
7 changes: 5 additions & 2 deletions pkg/controller/workload/job/job_webhook.go
@@ -82,7 +82,8 @@ func (w *JobWebhook) Default(ctx context.Context, obj runtime.Object) error {
}
}

if queueName(job) == "" && !w.manageJobsWithoutQueueName {
batchJob := BatchJob{*job}
if batchJob.QueueName() == "" && !w.manageJobsWithoutQueueName {
return nil
}

@@ -127,7 +128,9 @@ func (w *JobWebhook) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.
func validateUpdate(oldJob, newJob *batchv1.Job) field.ErrorList {
allErrs := validateCreate(newJob)

if !*newJob.Spec.Suspend && (queueName(oldJob) != queueName(newJob)) {
oldBatchJob := BatchJob{*oldJob}
newBatchJob := BatchJob{*newJob}
if !*newJob.Spec.Suspend && (oldBatchJob.QueueName() != newBatchJob.QueueName()) {
allErrs = append(allErrs, field.Forbidden(suspendPath, "must not update queue name when job is unsuspended"))
}

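The rule being enforced is unchanged: a job may only change its queue while suspended. A distillation of that check, with a hypothetical validateQueueNameUpdate helper standing in for the field.Forbidden-based validation above:

package main

import (
	"errors"
	"fmt"
)

// validateQueueNameUpdate is a hypothetical distillation of the rule above:
// an unsuspended job must keep its queue name.
func validateQueueNameUpdate(oldQueue, newQueue string, suspended bool) error {
	if !suspended && oldQueue != newQueue {
		return errors.New("must not update queue name when job is unsuspended")
	}
	return nil
}

func main() {
	// Allowed: the job is suspended while changing queues.
	fmt.Println(validateQueueNameUpdate("queue-a", "queue-b", true))
	// Rejected: a running job tries to switch queues.
	fmt.Println(validateQueueNameUpdate("queue-a", "queue-b", false))
}
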
@@ -11,15 +11,14 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package job
package jobframework

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
@@ -61,7 +60,7 @@ func EnsureOneWorkload(ctx context.Context, cli client.Client, req ctrl.Request,

var workloads kueue.WorkloadList
if err := cli.List(ctx, &workloads, client.InNamespace(job.Object().GetNamespace()),
client.MatchingFields{ownerKey: job.Object().GetName()}); err != nil {
client.MatchingFields{OwnerKey: job.Object().GetName()}); err != nil {
log.Error(err, "Unable to list child workloads")
return nil, err
}
@@ -176,25 +175,11 @@ func StopJob(ctx context.Context, client client.Client, record record.EventRecor
return nil
}

// CreateWorkload will create a workload from the corresponding job.
func CreateWorkload(ctx context.Context, client client.Client, scheme *runtime.Scheme, job GenericJob) (*kueue.Workload, error) {
wl, err := ConstructWorkload(ctx, client, scheme, job)
if err != nil {
return nil, err
}

if err = client.Create(ctx, wl); err != nil {
return nil, err
}

return wl, nil
}

// ConstructWorkload will derive a workload from the corresponding job.
func ConstructWorkload(ctx context.Context, client client.Client, scheme *runtime.Scheme, job GenericJob) (*kueue.Workload, error) {
wl := &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: GetWorkloadNameForJob(job.Object().GetName()),
Name: job.GetWorkloadName(),
Namespace: job.Object().GetNamespace(),
},
Spec: kueue.WorkloadSpec{
@@ -248,19 +233,3 @@ func GetNodeSelectors(ctx context.Context, client client.Client, w *kueue.Worklo
}
return nodeSelectors, nil
}

// UpdateQueueNameIfChanged will update workload queue name if changed.
func UpdateQueueNameIfChanged(ctx context.Context, client client.Client, job GenericJob, wl *kueue.Workload) error {
queueName := job.QueueName()
if wl.Spec.QueueName != queueName {
wl.Spec.QueueName = queueName
return client.Update(ctx, wl)
}
return nil
}

// SetWorkloadCondition will update the workload condition by the provide one.
func SetWorkloadCondition(ctx context.Context, client client.Client, wl *kueue.Workload, condition metav1.Condition) error {
apimeta.SetStatusCondition(&wl.Status.Conditions, condition)
return client.Status().Update(ctx, wl)
}
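
The deleted helpers here (CreateWorkload, UpdateQueueNameIfChanged, SetWorkloadCondition) were one- or two-statement wrappers, and their callers now inline those statements, as the job_controller.go hunks above show. A sketch of the equivalent call-site code, assuming this commit's package layout:

package example

import (
	"context"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha2"
	"sigs.k8s.io/kueue/pkg/controller/workload/jobframework"
)

// createWorkloadFor shows what callers of the removed CreateWorkload helper
// now do inline: construct the Workload from the job, then create it.
func createWorkloadFor(ctx context.Context, c client.Client, s *runtime.Scheme, job jobframework.GenericJob) (*kueue.Workload, error) {
	wl, err := jobframework.ConstructWorkload(ctx, c, s, job)
	if err != nil {
		return nil, err
	}
	if err := c.Create(ctx, wl); err != nil {
		return nil, err
	}
	return wl, nil
}

// setCondition shows the inlined body of the removed SetWorkloadCondition
// helper: set the condition in memory, then push a status update.
func setCondition(ctx context.Context, c client.Client, wl *kueue.Workload, cond metav1.Condition) error {
	apimeta.SetStatusCondition(&wl.Status.Conditions, cond)
	return c.Status().Update(ctx, wl)
}
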
@@ -11,7 +11,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package job
package jobframework

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -20,6 +20,10 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha2"
)

const (
OwnerKey = ".metadata.controller"
)

type GenericJob interface {
// Object returns the job instance.
Object() client.Object
@@ -53,4 +57,6 @@ type GenericJob interface {
IsActive() bool
// PodsReady reports whether the job's derived pods are all ready now.
PodsReady() bool
// GetWorkloadName returns the name of the workload for the job.
GetWorkloadName() string
}
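
GenericJob is the seam that lets the jobframework helpers operate on any job-like type, with BatchJob in job_controller.go as its batch/v1 implementation. Below is a skeleton for just the methods visible in this hunk; the collapsed middle of the interface declares more, so this fragment alone does not satisfy GenericJob, and the status-counter readings of IsActive and PodsReady are assumptions rather than the real BatchJob code.

package example

import (
	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"sigs.k8s.io/kueue/pkg/controller/workload/jobframework"
)

// MyJob is a hypothetical wrapper illustrating the visible methods only.
type MyJob struct {
	batchv1.Job
}

// Object returns the job instance.
func (j *MyJob) Object() client.Object { return &j.Job }

// IsActive reports whether the job still has running pods (an assumed
// reading of the batch/v1 status counters).
func (j *MyJob) IsActive() bool { return j.Status.Active != 0 }

// PodsReady reports whether all expected pods are ready or already finished
// (again an assumed reading; the real BatchJob implementation is collapsed
// in the diff above).
func (j *MyJob) PodsReady() bool {
	ready := int32(0)
	if j.Status.Ready != nil {
		ready = *j.Status.Ready
	}
	parallelism := int32(1)
	if j.Spec.Parallelism != nil {
		parallelism = *j.Spec.Parallelism
	}
	return ready+j.Status.Succeeded >= parallelism
}

// GetWorkloadName mirrors GetWorkloadNameForJob from job_controller.go.
func (j *MyJob) GetWorkloadName() string {
	gvk := metav1.GroupVersionKind{Group: batchv1.SchemeGroupVersion.Group, Version: batchv1.SchemeGroupVersion.Version, Kind: "Job"}
	return jobframework.GetWorkloadNameForOwnerWithGVK(j.Name, &gvk)
}
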
3 changes: 2 additions & 1 deletion test/integration/controller/job/job_controller_test.go
@@ -33,6 +33,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1alpha2"
"sigs.k8s.io/kueue/pkg/constants"
workloadjob "sigs.k8s.io/kueue/pkg/controller/workload/job"
"sigs.k8s.io/kueue/pkg/controller/workload/jobframework"
"sigs.k8s.io/kueue/pkg/util/pointer"
"sigs.k8s.io/kueue/pkg/util/testing"
"sigs.k8s.io/kueue/pkg/workload"
@@ -110,7 +111,7 @@ var _ = ginkgo.Describe("Job controller", func() {
}, util.Timeout, util.Interval).Should(gomega.BeTrue())

ginkgo.By("checking a second non-matching workload is deleted")
secondWl, _ := workloadjob.ConstructWorkload(ctx, k8sClient, scheme.Scheme, &batchJob)
secondWl, _ := jobframework.ConstructWorkload(ctx, k8sClient, scheme.Scheme, &batchJob)
secondWl.Name = workloadjob.GetWorkloadNameForJob("second-workload")
secondWl.Spec.PodSets[0].Count = parallelism + 1
gomega.Expect(k8sClient.Create(ctx, secondWl)).Should(gomega.Succeed())